diff --git a/server/internal/database/service.go b/server/internal/database/service.go index 1d168bbc..5fa90e2a 100644 --- a/server/internal/database/service.go +++ b/server/internal/database/service.go @@ -149,6 +149,18 @@ func (s *Service) DeleteDatabase(ctx context.Context, databaseID string) error { } } + serviceInstanceSpecs, err := s.store.ServiceInstanceSpec. + GetByDatabaseID(databaseID). + Exec(ctx) + if err != nil { + return fmt.Errorf("failed to get service instance specs: %w", err) + } + for _, spec := range serviceInstanceSpecs { + if err := s.releaseServiceInstancePort(ctx, spec.Spec); err != nil { + return err + } + } + var ops []storage.TxnOperation spec, err := s.store.Spec.GetByKey(databaseID).Exec(ctx) @@ -174,6 +186,7 @@ func (s *Service) DeleteDatabase(ctx context.Context, databaseID string) error { s.store.InstanceSpec.DeleteByDatabaseID(databaseID), s.store.InstanceStatus.DeleteByDatabaseID(databaseID), s.store.ScriptResult.DeleteByDatabaseID(databaseID), + s.store.ServiceInstanceSpec.DeleteByDatabaseID(databaseID), ) if err := s.store.Txn(ops...).Commit(ctx); err != nil { @@ -723,6 +736,103 @@ func (s *Service) DeleteInstanceSpec(ctx context.Context, databaseID, instanceID return nil } +func (s *Service) ReconcileServiceInstanceSpec(ctx context.Context, spec *ServiceInstanceSpec) (*ServiceInstanceSpec, error) { + if s.cfg.HostID != spec.HostID { + return nil, fmt.Errorf("this service instance belongs to another host - this host='%s', service instance host='%s'", s.cfg.HostID, spec.HostID) + } + + var previous *ServiceInstanceSpec + stored, err := s.store.ServiceInstanceSpec. + GetByKey(spec.DatabaseID, spec.ServiceInstanceID). + Exec(ctx) + switch { + case err == nil: + previous = stored.Spec + spec.CopyPortFrom(previous) + case errors.Is(err, storage.ErrNotFound): + stored = &StoredServiceInstanceSpec{} + default: + return nil, fmt.Errorf("failed to get current spec for service instance '%s': %w", spec.ServiceInstanceID, err) + } + + var allocated []int + rollback := func(cause error) error { + rollbackCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + return errors.Join(cause, s.portsSvc.ReleasePort(rollbackCtx, spec.HostID, allocated...)) + } + + if spec.Port != nil && *spec.Port == 0 { + port, err := s.portsSvc.AllocatePort(ctx, spec.HostID) + if err != nil { + return nil, fmt.Errorf("failed to allocate port for service instance: %w", err) + } + allocated = append(allocated, port) + spec.Port = utils.PointerTo(port) + } + + stored.Spec = spec + err = s.store.ServiceInstanceSpec. + Update(stored). + Exec(ctx) + if err != nil { + return nil, rollback(fmt.Errorf("failed to persist updated service instance spec: %w", err)) + } + + if err := s.releasePreviousServiceInstancePort(ctx, previous, spec); err != nil { + return nil, err + } + + return spec, nil +} + +func (s *Service) DeleteServiceInstanceSpec(ctx context.Context, databaseID, serviceInstanceID string) error { + spec, err := s.store.ServiceInstanceSpec. + GetByKey(databaseID, serviceInstanceID). + Exec(ctx) + if errors.Is(err, storage.ErrNotFound) { + return nil + } else if err != nil { + return fmt.Errorf("failed to check if service instance spec exists: %w", err) + } + + if err := s.releaseServiceInstancePort(ctx, spec.Spec); err != nil { + return err + } + + _, err = s.store.ServiceInstanceSpec. + DeleteByKey(databaseID, serviceInstanceID). + Exec(ctx) + if err != nil { + return fmt.Errorf("failed to delete service instance spec: %w", err) + } + + return nil +} + +func (s *Service) releaseServiceInstancePort(ctx context.Context, spec *ServiceInstanceSpec) error { + err := s.portsSvc.ReleasePortIfDefined(ctx, spec.HostID, spec.Port) + if err != nil { + return fmt.Errorf("failed to release port for service instance '%s': %w", spec.ServiceInstanceID, err) + } + + return nil +} + +func (s *Service) releasePreviousServiceInstancePort(ctx context.Context, previous, new *ServiceInstanceSpec) error { + if previous == nil { + return nil + } + if portShouldBeReleased(previous.Port, new.Port) { + err := s.portsSvc.ReleasePortIfDefined(ctx, previous.HostID, previous.Port) + if err != nil { + return fmt.Errorf("failed to release previous service instance port: %w", err) + } + } + return nil +} + func (s *Service) releaseInstancePorts(ctx context.Context, spec *InstanceSpec) error { err := s.portsSvc.ReleasePortIfDefined(ctx, spec.HostID, spec.Port, spec.PatroniPort) if err != nil { @@ -913,7 +1023,7 @@ func (s *Service) DeleteServiceInstance(ctx context.Context, databaseID, service return fmt.Errorf("failed to delete stored service instance status: %w", err) } - return nil + return s.DeleteServiceInstanceSpec(ctx, databaseID, serviceInstanceID) } func (s *Service) UpdateServiceInstanceStatus( diff --git a/server/internal/database/service_instance.go b/server/internal/database/service_instance.go index eeb748e4..0a5b533f 100644 --- a/server/internal/database/service_instance.go +++ b/server/internal/database/service_instance.go @@ -151,6 +151,13 @@ func GenerateDatabaseNetworkID(databaseID string) string { return databaseID } +// CopyPortFrom copies the port from the current (persisted) spec to this spec, +// retaining any previously allocated stable random port. This mirrors the +// reconcilePort logic used by InstanceSpec.CopySettingsFrom. +func (s *ServiceInstanceSpec) CopyPortFrom(current *ServiceInstanceSpec) { + s.Port = reconcilePort(current.Port, s.Port) +} + // ServiceInstanceSpec contains the specification for generating service instance resources. type ServiceInstanceSpec struct { ServiceInstanceID string diff --git a/server/internal/database/service_instance_spec_store.go b/server/internal/database/service_instance_spec_store.go new file mode 100644 index 00000000..8770d1ed --- /dev/null +++ b/server/internal/database/service_instance_spec_store.go @@ -0,0 +1,61 @@ +package database + +import ( + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/pgEdge/control-plane/server/internal/storage" +) + +type StoredServiceInstanceSpec struct { + storage.StoredValue + Spec *ServiceInstanceSpec `json:"spec"` +} + +type ServiceInstanceSpecStore struct { + client *clientv3.Client + root string +} + +func NewServiceInstanceSpecStore(client *clientv3.Client, root string) *ServiceInstanceSpecStore { + return &ServiceInstanceSpecStore{ + client: client, + root: root, + } +} + +func (s *ServiceInstanceSpecStore) Prefix() string { + return storage.Prefix("/", s.root, "service_instance_specs") +} + +func (s *ServiceInstanceSpecStore) DatabasePrefix(databaseID string) string { + return storage.Prefix(s.Prefix(), databaseID) +} + +func (s *ServiceInstanceSpecStore) Key(databaseID, serviceInstanceID string) string { + return storage.Key(s.DatabasePrefix(databaseID), serviceInstanceID) +} + +func (s *ServiceInstanceSpecStore) GetByKey(databaseID, serviceInstanceID string) storage.GetOp[*StoredServiceInstanceSpec] { + key := s.Key(databaseID, serviceInstanceID) + return storage.NewGetOp[*StoredServiceInstanceSpec](s.client, key) +} + +func (s *ServiceInstanceSpecStore) GetByDatabaseID(databaseID string) storage.GetMultipleOp[*StoredServiceInstanceSpec] { + prefix := s.DatabasePrefix(databaseID) + return storage.NewGetPrefixOp[*StoredServiceInstanceSpec](s.client, prefix) +} + +func (s *ServiceInstanceSpecStore) Update(item *StoredServiceInstanceSpec) storage.PutOp[*StoredServiceInstanceSpec] { + key := s.Key(item.Spec.DatabaseID, item.Spec.ServiceInstanceID) + return storage.NewUpdateOp(s.client, key, item) +} + +func (s *ServiceInstanceSpecStore) DeleteByKey(databaseID, serviceInstanceID string) storage.DeleteOp { + key := s.Key(databaseID, serviceInstanceID) + return storage.NewDeleteKeyOp(s.client, key) +} + +func (s *ServiceInstanceSpecStore) DeleteByDatabaseID(databaseID string) storage.DeleteOp { + prefix := s.DatabasePrefix(databaseID) + return storage.NewDeletePrefixOp(s.client, prefix) +} diff --git a/server/internal/database/store.go b/server/internal/database/store.go index e9ada83c..2c53f1c9 100644 --- a/server/internal/database/store.go +++ b/server/internal/database/store.go @@ -7,28 +7,30 @@ import ( ) type Store struct { - client *clientv3.Client - Spec *SpecStore - Database *DatabaseStore - Instance *InstanceStore - InstanceStatus *InstanceStatusStore - InstanceSpec *InstanceSpecStore - ScriptResult *ScriptResultStore - ServiceInstance *ServiceInstanceStore - ServiceInstanceStatus *ServiceInstanceStatusStore + client *clientv3.Client + Spec *SpecStore + Database *DatabaseStore + Instance *InstanceStore + InstanceStatus *InstanceStatusStore + InstanceSpec *InstanceSpecStore + ScriptResult *ScriptResultStore + ServiceInstance *ServiceInstanceStore + ServiceInstanceStatus *ServiceInstanceStatusStore + ServiceInstanceSpec *ServiceInstanceSpecStore } func NewStore(client *clientv3.Client, root string) *Store { return &Store{ - client: client, - Spec: NewSpecStore(client, root), - Database: NewDatabaseStore(client, root), - Instance: NewInstanceStore(client, root), - InstanceStatus: NewInstanceStatusStore(client, root), - InstanceSpec: NewInstanceSpecStore(client, root), - ScriptResult: NewScriptResultStore(client, root), - ServiceInstance: NewServiceInstanceStore(client, root), - ServiceInstanceStatus: NewServiceInstanceStatusStore(client, root), + client: client, + Spec: NewSpecStore(client, root), + Database: NewDatabaseStore(client, root), + Instance: NewInstanceStore(client, root), + InstanceStatus: NewInstanceStatusStore(client, root), + InstanceSpec: NewInstanceSpecStore(client, root), + ScriptResult: NewScriptResultStore(client, root), + ServiceInstance: NewServiceInstanceStore(client, root), + ServiceInstanceStatus: NewServiceInstanceStatusStore(client, root), + ServiceInstanceSpec: NewServiceInstanceSpecStore(client, root), } } diff --git a/server/internal/workflows/activities/generate_service_instance_resources.go b/server/internal/workflows/activities/generate_service_instance_resources.go index 21c55744..bcc4b434 100644 --- a/server/internal/workflows/activities/generate_service_instance_resources.go +++ b/server/internal/workflows/activities/generate_service_instance_resources.go @@ -42,7 +42,12 @@ func (a *Activities) GenerateServiceInstanceResources( ) logger.Debug("generating service instance resources") - resources, err := a.Orchestrator.GenerateServiceInstanceResources(input.Spec) + spec, err := a.DatabaseService.ReconcileServiceInstanceSpec(ctx, input.Spec) + if err != nil { + return nil, fmt.Errorf("failed to reconcile service instance spec: %w", err) + } + + resources, err := a.Orchestrator.GenerateServiceInstanceResources(spec) if err != nil { return nil, fmt.Errorf("failed to generate service instance resources: %w", err) }