From b5d3f354984b30c3801439d97661abfe550e5a49 Mon Sep 17 00:00:00 2001 From: moizpgedge Date: Mon, 13 Apr 2026 18:06:57 +0500 Subject: [PATCH] feat: stable random ports for service instances Adopt the random port allocation mechanism for supporting services (MCP, PostgREST, RAG), mirroring the existing instance port behaviour introduced in the `feat: stable random ports` commit. When a service spec sets `port: 0`, the Control Plane allocates a stable random port from the configured range, persists it in etcd, and reuses it on every subsequent re-plan. The port is released when the service is removed or the database is deleted. Changes: - Add `ServiceInstanceSpecStore` to persist `ServiceInstanceSpec` in etcd under `service_instance_specs/{database_id}/{service_instance_id}` - Add `CopyPortFrom` to `ServiceInstanceSpec` using the existing `reconcilePort` helper - Add `ReconcileServiceInstanceSpec` and `DeleteServiceInstanceSpec` to `database.Service`, following the same pattern as `ReconcileInstanceSpec` / `DeleteInstanceSpec` - Call `ReconcileServiceInstanceSpec` in the `GenerateServiceInstanceResources` activity before generating resources, so allocation runs on the correct host - Update `DeleteDatabase` to release service instance ports and include `ServiceInstanceSpec.DeleteByDatabaseID` in the deletion transaction - Update `DeleteServiceInstance` to call `DeleteServiceInstanceSpec` PLAT-510 --- server/internal/database/service.go | 112 +++++++++++++++++- server/internal/database/service_instance.go | 7 ++ .../database/service_instance_spec_store.go | 61 ++++++++++ server/internal/database/store.go | 34 +++--- .../generate_service_instance_resources.go | 7 +- 5 files changed, 203 insertions(+), 18 deletions(-) create mode 100644 server/internal/database/service_instance_spec_store.go diff --git a/server/internal/database/service.go b/server/internal/database/service.go index 09ef2338..914445ff 100644 --- a/server/internal/database/service.go +++ b/server/internal/database/service.go @@ -148,6 +148,18 @@ func (s *Service) DeleteDatabase(ctx context.Context, databaseID string) error { } } + serviceInstanceSpecs, err := s.store.ServiceInstanceSpec. + GetByDatabaseID(databaseID). + Exec(ctx) + if err != nil { + return fmt.Errorf("failed to get service instance specs: %w", err) + } + for _, spec := range serviceInstanceSpecs { + if err := s.releaseServiceInstancePort(ctx, spec.Spec); err != nil { + return err + } + } + var ops []storage.TxnOperation spec, err := s.store.Spec.GetByKey(databaseID).Exec(ctx) @@ -172,6 +184,7 @@ func (s *Service) DeleteDatabase(ctx context.Context, databaseID string) error { s.store.Instance.DeleteByDatabaseID(databaseID), s.store.InstanceSpec.DeleteByDatabaseID(databaseID), s.store.InstanceStatus.DeleteByDatabaseID(databaseID), + s.store.ServiceInstanceSpec.DeleteByDatabaseID(databaseID), ) if err := s.store.Txn(ops...).Commit(ctx); err != nil { @@ -675,6 +688,103 @@ func (s *Service) DeleteInstanceSpec(ctx context.Context, databaseID, instanceID return nil } +func (s *Service) ReconcileServiceInstanceSpec(ctx context.Context, spec *ServiceInstanceSpec) (*ServiceInstanceSpec, error) { + if s.cfg.HostID != spec.HostID { + return nil, fmt.Errorf("this service instance belongs to another host - this host='%s', service instance host='%s'", s.cfg.HostID, spec.HostID) + } + + var previous *ServiceInstanceSpec + stored, err := s.store.ServiceInstanceSpec. + GetByKey(spec.DatabaseID, spec.ServiceInstanceID). + Exec(ctx) + switch { + case err == nil: + previous = stored.Spec + spec.CopyPortFrom(previous) + case errors.Is(err, storage.ErrNotFound): + stored = &StoredServiceInstanceSpec{} + default: + return nil, fmt.Errorf("failed to get current spec for service instance '%s': %w", spec.ServiceInstanceID, err) + } + + var allocated []int + rollback := func(cause error) error { + rollbackCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + return errors.Join(cause, s.portsSvc.ReleasePort(rollbackCtx, spec.HostID, allocated...)) + } + + if spec.Port != nil && *spec.Port == 0 { + port, err := s.portsSvc.AllocatePort(ctx, spec.HostID) + if err != nil { + return nil, fmt.Errorf("failed to allocate port for service instance: %w", err) + } + allocated = append(allocated, port) + spec.Port = utils.PointerTo(port) + } + + stored.Spec = spec + err = s.store.ServiceInstanceSpec. + Update(stored). + Exec(ctx) + if err != nil { + return nil, rollback(fmt.Errorf("failed to persist updated service instance spec: %w", err)) + } + + if err := s.releasePreviousServiceInstancePort(ctx, previous, spec); err != nil { + return nil, err + } + + return spec, nil +} + +func (s *Service) DeleteServiceInstanceSpec(ctx context.Context, databaseID, serviceInstanceID string) error { + spec, err := s.store.ServiceInstanceSpec. + GetByKey(databaseID, serviceInstanceID). + Exec(ctx) + if errors.Is(err, storage.ErrNotFound) { + return nil + } else if err != nil { + return fmt.Errorf("failed to check if service instance spec exists: %w", err) + } + + if err := s.releaseServiceInstancePort(ctx, spec.Spec); err != nil { + return err + } + + _, err = s.store.ServiceInstanceSpec. + DeleteByKey(databaseID, serviceInstanceID). + Exec(ctx) + if err != nil { + return fmt.Errorf("failed to delete service instance spec: %w", err) + } + + return nil +} + +func (s *Service) releaseServiceInstancePort(ctx context.Context, spec *ServiceInstanceSpec) error { + err := s.portsSvc.ReleasePortIfDefined(ctx, spec.HostID, spec.Port) + if err != nil { + return fmt.Errorf("failed to release port for service instance '%s': %w", spec.ServiceInstanceID, err) + } + + return nil +} + +func (s *Service) releasePreviousServiceInstancePort(ctx context.Context, previous, new *ServiceInstanceSpec) error { + if previous == nil { + return nil + } + if portShouldBeReleased(previous.Port, new.Port) { + err := s.portsSvc.ReleasePortIfDefined(ctx, previous.HostID, previous.Port) + if err != nil { + return fmt.Errorf("failed to release previous service instance port: %w", err) + } + } + return nil +} + func (s *Service) releaseInstancePorts(ctx context.Context, spec *InstanceSpec) error { err := s.portsSvc.ReleasePortIfDefined(ctx, spec.HostID, spec.Port, spec.PatroniPort) if err != nil { @@ -865,7 +975,7 @@ func (s *Service) DeleteServiceInstance(ctx context.Context, databaseID, service return fmt.Errorf("failed to delete stored service instance status: %w", err) } - return nil + return s.DeleteServiceInstanceSpec(ctx, databaseID, serviceInstanceID) } func (s *Service) UpdateServiceInstanceStatus( diff --git a/server/internal/database/service_instance.go b/server/internal/database/service_instance.go index 49c7e692..7bf451f8 100644 --- a/server/internal/database/service_instance.go +++ b/server/internal/database/service_instance.go @@ -151,6 +151,13 @@ func GenerateDatabaseNetworkID(databaseID string) string { return databaseID } +// CopyPortFrom copies the port from the current (persisted) spec to this spec, +// retaining any previously allocated stable random port. This mirrors the +// reconcilePort logic used by InstanceSpec.CopySettingsFrom. +func (s *ServiceInstanceSpec) CopyPortFrom(current *ServiceInstanceSpec) { + s.Port = reconcilePort(current.Port, s.Port) +} + // ServiceInstanceSpec contains the specification for generating service instance resources. type ServiceInstanceSpec struct { ServiceInstanceID string diff --git a/server/internal/database/service_instance_spec_store.go b/server/internal/database/service_instance_spec_store.go new file mode 100644 index 00000000..8770d1ed --- /dev/null +++ b/server/internal/database/service_instance_spec_store.go @@ -0,0 +1,61 @@ +package database + +import ( + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/pgEdge/control-plane/server/internal/storage" +) + +type StoredServiceInstanceSpec struct { + storage.StoredValue + Spec *ServiceInstanceSpec `json:"spec"` +} + +type ServiceInstanceSpecStore struct { + client *clientv3.Client + root string +} + +func NewServiceInstanceSpecStore(client *clientv3.Client, root string) *ServiceInstanceSpecStore { + return &ServiceInstanceSpecStore{ + client: client, + root: root, + } +} + +func (s *ServiceInstanceSpecStore) Prefix() string { + return storage.Prefix("/", s.root, "service_instance_specs") +} + +func (s *ServiceInstanceSpecStore) DatabasePrefix(databaseID string) string { + return storage.Prefix(s.Prefix(), databaseID) +} + +func (s *ServiceInstanceSpecStore) Key(databaseID, serviceInstanceID string) string { + return storage.Key(s.DatabasePrefix(databaseID), serviceInstanceID) +} + +func (s *ServiceInstanceSpecStore) GetByKey(databaseID, serviceInstanceID string) storage.GetOp[*StoredServiceInstanceSpec] { + key := s.Key(databaseID, serviceInstanceID) + return storage.NewGetOp[*StoredServiceInstanceSpec](s.client, key) +} + +func (s *ServiceInstanceSpecStore) GetByDatabaseID(databaseID string) storage.GetMultipleOp[*StoredServiceInstanceSpec] { + prefix := s.DatabasePrefix(databaseID) + return storage.NewGetPrefixOp[*StoredServiceInstanceSpec](s.client, prefix) +} + +func (s *ServiceInstanceSpecStore) Update(item *StoredServiceInstanceSpec) storage.PutOp[*StoredServiceInstanceSpec] { + key := s.Key(item.Spec.DatabaseID, item.Spec.ServiceInstanceID) + return storage.NewUpdateOp(s.client, key, item) +} + +func (s *ServiceInstanceSpecStore) DeleteByKey(databaseID, serviceInstanceID string) storage.DeleteOp { + key := s.Key(databaseID, serviceInstanceID) + return storage.NewDeleteKeyOp(s.client, key) +} + +func (s *ServiceInstanceSpecStore) DeleteByDatabaseID(databaseID string) storage.DeleteOp { + prefix := s.DatabasePrefix(databaseID) + return storage.NewDeletePrefixOp(s.client, prefix) +} diff --git a/server/internal/database/store.go b/server/internal/database/store.go index b280aefc..8ce636da 100644 --- a/server/internal/database/store.go +++ b/server/internal/database/store.go @@ -7,26 +7,28 @@ import ( ) type Store struct { - client *clientv3.Client - Spec *SpecStore - Database *DatabaseStore - Instance *InstanceStore - InstanceStatus *InstanceStatusStore - InstanceSpec *InstanceSpecStore - ServiceInstance *ServiceInstanceStore - ServiceInstanceStatus *ServiceInstanceStatusStore + client *clientv3.Client + Spec *SpecStore + Database *DatabaseStore + Instance *InstanceStore + InstanceStatus *InstanceStatusStore + InstanceSpec *InstanceSpecStore + ServiceInstance *ServiceInstanceStore + ServiceInstanceStatus *ServiceInstanceStatusStore + ServiceInstanceSpec *ServiceInstanceSpecStore } func NewStore(client *clientv3.Client, root string) *Store { return &Store{ - client: client, - Spec: NewSpecStore(client, root), - Database: NewDatabaseStore(client, root), - Instance: NewInstanceStore(client, root), - InstanceStatus: NewInstanceStatusStore(client, root), - InstanceSpec: NewInstanceSpecStore(client, root), - ServiceInstance: NewServiceInstanceStore(client, root), - ServiceInstanceStatus: NewServiceInstanceStatusStore(client, root), + client: client, + Spec: NewSpecStore(client, root), + Database: NewDatabaseStore(client, root), + Instance: NewInstanceStore(client, root), + InstanceStatus: NewInstanceStatusStore(client, root), + InstanceSpec: NewInstanceSpecStore(client, root), + ServiceInstance: NewServiceInstanceStore(client, root), + ServiceInstanceStatus: NewServiceInstanceStatusStore(client, root), + ServiceInstanceSpec: NewServiceInstanceSpecStore(client, root), } } diff --git a/server/internal/workflows/activities/generate_service_instance_resources.go b/server/internal/workflows/activities/generate_service_instance_resources.go index 21c55744..bcc4b434 100644 --- a/server/internal/workflows/activities/generate_service_instance_resources.go +++ b/server/internal/workflows/activities/generate_service_instance_resources.go @@ -42,7 +42,12 @@ func (a *Activities) GenerateServiceInstanceResources( ) logger.Debug("generating service instance resources") - resources, err := a.Orchestrator.GenerateServiceInstanceResources(input.Spec) + spec, err := a.DatabaseService.ReconcileServiceInstanceSpec(ctx, input.Spec) + if err != nil { + return nil, fmt.Errorf("failed to reconcile service instance spec: %w", err) + } + + resources, err := a.Orchestrator.GenerateServiceInstanceResources(spec) if err != nil { return nil, fmt.Errorf("failed to generate service instance resources: %w", err) }