Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 53 additions & 5 deletions server/internal/orchestrator/swarm/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,12 +653,26 @@ func (o *Orchestrator) buildServiceInstanceResources(spec *database.ServiceInsta
// instance. RAG only requires read access, so a single ServiceUserRoleRO is
// created per database node using the same canonical+per-node pattern as MCP.
func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) {
// Get service image.
serviceImage, err := o.serviceVersions.GetServiceImage(spec.ServiceSpec.ServiceType, spec.ServiceSpec.Version)
if err != nil {
return nil, fmt.Errorf("failed to get service image: %w", err)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// Parse the RAG service config to extract API keys.
ragConfig, errs := database.ParseRAGServiceConfig(spec.ServiceSpec.Config, false)
if len(errs) > 0 {
return nil, fmt.Errorf("failed to parse RAG service config: %w", errors.Join(errs...))
}

// Database network (shared with postgres instances).
databaseNetwork := &Network{
Scope: "swarm",
Driver: OverlayDriver,
Name: fmt.Sprintf("%s-database", spec.DatabaseID),
Allocator: o.dbNetworkAllocator,
}

canonicalROID := ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ServiceUserRoleRO)

// Canonical read-only role — runs on the node co-located with this instance.
Expand All @@ -670,7 +684,7 @@ func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstan
Mode: ServiceUserRoleRO,
}

orchestratorResources := []resource.Resource{canonicalRO}
orchestratorResources := []resource.Resource{databaseNetwork, canonicalRO}

// Per-node RO role for each additional database node so that RAG instances
// on other hosts can authenticate against their co-located Postgres.
Expand All @@ -691,12 +705,15 @@ func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstan
// Service data directory resource (host-side bind mount directory).
dataDirID := spec.ServiceInstanceID + "-data"
dataDir := &filesystem.DirResource{
ID: dataDirID,
HostID: spec.HostID,
Path: filepath.Join(o.cfg.DataDir, "services", spec.ServiceInstanceID),
ID: dataDirID,
HostID: spec.HostID,
Path: filepath.Join(o.cfg.DataDir, "services", spec.ServiceInstanceID),
OwnerUID: ragContainerUID,
OwnerGID: ragContainerUID,
}

// API key files resource — writes provider keys into a "keys" subdirectory.
// The keys subdirectory path is resolved at runtime from the parent DirResource.
keysResource := &RAGServiceKeysResource{
ServiceInstanceID: spec.ServiceInstanceID,
HostID: spec.HostID,
Expand All @@ -722,7 +739,38 @@ func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstan
DatabasePort: dbPort,
}

orchestratorResources = append(orchestratorResources, dataDir, keysResource, ragConfigRes)
// Service instance spec resource — holds the computed Docker Swarm service spec.
// KeysDirID is the parent data dir; the actual keys subdir path is derived at runtime.
serviceName := ServiceInstanceName(spec.ServiceSpec.ServiceType, spec.DatabaseID, spec.ServiceSpec.ServiceID, spec.HostID)
serviceInstanceSpec := &ServiceInstanceSpecResource{
ServiceInstanceID: spec.ServiceInstanceID,
ServiceSpec: spec.ServiceSpec,
DatabaseID: spec.DatabaseID,
DatabaseName: spec.DatabaseName,
HostID: spec.HostID,
ServiceName: serviceName,
Hostname: serviceName,
CohortMemberID: o.swarmNodeID,
ServiceImage: serviceImage,
Credentials: spec.Credentials,
DatabaseNetworkID: databaseNetwork.Name,
DatabaseHosts: spec.DatabaseHosts,
Port: spec.Port,
DataDirID: dataDirID,
}

// Service instance resource (actual Docker Swarm service).
serviceInstance := &ServiceInstanceResource{
ServiceInstanceID: spec.ServiceInstanceID,
DatabaseID: spec.DatabaseID,
ServiceName: serviceName,
ServiceID: spec.ServiceSpec.ServiceID,
ServiceSpecID: spec.ServiceSpec.ServiceID,
ServiceType: spec.ServiceSpec.ServiceType,
HostID: spec.HostID,
}

orchestratorResources = append(orchestratorResources, dataDir, keysResource, ragConfigRes, serviceInstanceSpec, serviceInstance)

return o.buildServiceInstanceResources(spec, orchestratorResources)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func TestExtractRAGAPIKeys_MultiPipeline(t *testing.T) {
}

func TestGenerateRAGInstanceResources_IncludesKeysResource(t *testing.T) {
o := &Orchestrator{}
o := newTestOrchestrator()
spec := &database.ServiceInstanceSpec{
ServiceInstanceID: "storefront-rag-host1",
ServiceSpec: &database.ServiceSpec{
Expand Down
115 changes: 72 additions & 43 deletions server/internal/orchestrator/swarm/rag_service_user_role_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,20 @@ package swarm
import (
"testing"

"github.com/pgEdge/control-plane/server/internal/config"
"github.com/pgEdge/control-plane/server/internal/database"
"github.com/pgEdge/control-plane/server/internal/filesystem"
"github.com/pgEdge/control-plane/server/internal/resource"
)

// newTestOrchestrator returns an Orchestrator with serviceVersions initialised
// from a minimal config, suitable for unit tests that call generateRAGInstanceResources.
func newTestOrchestrator() *Orchestrator {
return &Orchestrator{
serviceVersions: NewServiceVersions(config.Config{}),
}
}

// minimalRAGConfig returns a minimal valid RAG service config suitable for unit tests.
func minimalRAGConfig() map[string]any {
return map[string]any{
Expand Down Expand Up @@ -37,7 +46,7 @@ func minimalRAGConfig() map[string]any {
}

func TestGenerateRAGInstanceResources_ResourceList(t *testing.T) {
o := &Orchestrator{}
o := newTestOrchestrator()
spec := &database.ServiceInstanceSpec{
ServiceInstanceID: "storefront-rag-host1",
ServiceSpec: &database.ServiceSpec{
Expand Down Expand Up @@ -73,34 +82,46 @@ func TestGenerateRAGInstanceResources_ResourceList(t *testing.T) {
result.ServiceInstance.State, database.ServiceInstanceStateCreating)
}

// Single node: canonical RO ServiceUserRole + DirResource + RAGServiceKeysResource + RAGConfigResource.
if len(result.Resources) != 4 {
t.Fatalf("len(Resources) = %d, want 4", len(result.Resources))
// Single node: Network + canonical RO + DirResource + Keys + Config + InstanceSpec + ServiceInstance = 7.
if len(result.Resources) != 7 {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking suggestion: I think these tests could be improved by using the same testify library that we use elsewhere. For example, the three lines of this if statement could be replaced with:

require.Len(t, result.Resources, 7)

require.Len's default error message is roughly equivalent to the existing on here, and it doesn't need to be updated when the expected length changes. Side note - Claude always wants to fill in custom error messages for the testify assertions, but IMO they're not more useful than the defaults unless the check is non-intuitive and the message explains why the condition exists.

You might also be able to combine many of these assertions by using assert.Equal/require.Equal, which work great for complex structs, like:

assert.Equal(t, result, &database.ServiceInstanceResources{
    // ...
})

For me, that would be a lot easier to read and keep up-to-date.

One more suggestion to consider in this file: the file is named server/internal/orchestrator/swarm/rag_service_user_role_test.go but it has tests for other things besides the user role. Should it be renamed?

t.Fatalf("len(Resources) = %d, want 7", len(result.Resources))
}
if result.Resources[0].Identifier.Type != ResourceTypeServiceUserRole {
if result.Resources[0].Identifier.Type != ResourceTypeNetwork {
t.Errorf("Resources[0].Identifier.Type = %q, want %q",
result.Resources[0].Identifier.Type, ResourceTypeServiceUserRole)
}
wantID := ServiceUserRoleIdentifier("rag", ServiceUserRoleRO)
if result.Resources[0].Identifier != wantID {
t.Errorf("Resources[0].Identifier = %v, want %v", result.Resources[0].Identifier, wantID)
result.Resources[0].Identifier.Type, ResourceTypeNetwork)
}
if result.Resources[1].Identifier.Type != filesystem.ResourceTypeDir {
if result.Resources[1].Identifier.Type != ResourceTypeServiceUserRole {
t.Errorf("Resources[1].Identifier.Type = %q, want %q",
result.Resources[1].Identifier.Type, filesystem.ResourceTypeDir)
result.Resources[1].Identifier.Type, ResourceTypeServiceUserRole)
}
if result.Resources[2].Identifier.Type != ResourceTypeRAGServiceKeys {
wantID := ServiceUserRoleIdentifier("rag", ServiceUserRoleRO)
if result.Resources[1].Identifier != wantID {
t.Errorf("Resources[1].Identifier = %v, want %v", result.Resources[1].Identifier, wantID)
}
if result.Resources[2].Identifier.Type != filesystem.ResourceTypeDir {
t.Errorf("Resources[2].Identifier.Type = %q, want %q",
result.Resources[2].Identifier.Type, ResourceTypeRAGServiceKeys)
result.Resources[2].Identifier.Type, filesystem.ResourceTypeDir)
}
if result.Resources[3].Identifier.Type != ResourceTypeRAGConfig {
if result.Resources[3].Identifier.Type != ResourceTypeRAGServiceKeys {
t.Errorf("Resources[3].Identifier.Type = %q, want %q",
result.Resources[3].Identifier.Type, ResourceTypeRAGConfig)
result.Resources[3].Identifier.Type, ResourceTypeRAGServiceKeys)
}
if result.Resources[4].Identifier.Type != ResourceTypeRAGConfig {
t.Errorf("Resources[4].Identifier.Type = %q, want %q",
result.Resources[4].Identifier.Type, ResourceTypeRAGConfig)
}
if result.Resources[5].Identifier.Type != ResourceTypeServiceInstanceSpec {
t.Errorf("Resources[5].Identifier.Type = %q, want %q",
result.Resources[5].Identifier.Type, ResourceTypeServiceInstanceSpec)
}
if result.Resources[6].Identifier.Type != ResourceTypeServiceInstance {
t.Errorf("Resources[6].Identifier.Type = %q, want %q",
result.Resources[6].Identifier.Type, ResourceTypeServiceInstance)
}
}

func TestGenerateRAGInstanceResources_MultiNode(t *testing.T) {
o := &Orchestrator{}
o := newTestOrchestrator()
spec := &database.ServiceInstanceSpec{
ServiceInstanceID: "storefront-rag-host1",
ServiceSpec: &database.ServiceSpec{
Expand All @@ -125,19 +146,19 @@ func TestGenerateRAGInstanceResources_MultiNode(t *testing.T) {
t.Fatalf("generateRAGInstanceResources() error = %v", err)
}

// 3 nodes → canonical(n1) + per-node(n2) + per-node(n3) + dir + keys + config = 6 resources.
if len(result.Resources) != 6 {
t.Fatalf("len(Resources) = %d, want 6", len(result.Resources))
// 3 nodes → Network + canonical(n1) + per-node(n2) + per-node(n3) + dir + keys + config + spec + instance = 9.
if len(result.Resources) != 9 {
t.Fatalf("len(Resources) = %d, want 9", len(result.Resources))
}
// First three must be ServiceUserRole resources.
for i := 0; i < 3; i++ {
// Resources[0] is Network; Resources[1..3] are ServiceUserRole resources.
for i := 1; i < 4; i++ {
if result.Resources[i].Identifier.Type != ResourceTypeServiceUserRole {
t.Errorf("resource[%d] type = %q, want %q", i, result.Resources[i].Identifier.Type, ResourceTypeServiceUserRole)
}
}

// Canonical is first and has no CredentialSource
canonical, err := resource.ToResource[*ServiceUserRole](result.Resources[0])
// Canonical is index 1 and has no CredentialSource
canonical, err := resource.ToResource[*ServiceUserRole](result.Resources[1])
if err != nil {
t.Fatalf("ToResource canonical: %v", err)
}
Expand All @@ -150,7 +171,7 @@ func TestGenerateRAGInstanceResources_MultiNode(t *testing.T) {

// Per-node resources point back to canonical
canonicalID := ServiceUserRoleIdentifier("rag", ServiceUserRoleRO)
for i, rd := range result.Resources[1:3] {
for i, rd := range result.Resources[2:4] {
perNode, err := resource.ToResource[*ServiceUserRole](rd)
if err != nil {
t.Fatalf("ToResource per-node[%d]: %v", i, err)
Expand All @@ -163,23 +184,31 @@ func TestGenerateRAGInstanceResources_MultiNode(t *testing.T) {
}
}

// Data dir, keys, and config resource are appended last.
if result.Resources[3].Identifier.Type != filesystem.ResourceTypeDir {
t.Errorf("Resources[3].Identifier.Type = %q, want %q",
result.Resources[3].Identifier.Type, filesystem.ResourceTypeDir)
}
if result.Resources[4].Identifier.Type != ResourceTypeRAGServiceKeys {
// Dir, keys, config, spec, and instance are appended last.
if result.Resources[4].Identifier.Type != filesystem.ResourceTypeDir {
t.Errorf("Resources[4].Identifier.Type = %q, want %q",
result.Resources[4].Identifier.Type, ResourceTypeRAGServiceKeys)
result.Resources[4].Identifier.Type, filesystem.ResourceTypeDir)
}
if result.Resources[5].Identifier.Type != ResourceTypeRAGConfig {
if result.Resources[5].Identifier.Type != ResourceTypeRAGServiceKeys {
t.Errorf("Resources[5].Identifier.Type = %q, want %q",
result.Resources[5].Identifier.Type, ResourceTypeRAGConfig)
result.Resources[5].Identifier.Type, ResourceTypeRAGServiceKeys)
}
if result.Resources[6].Identifier.Type != ResourceTypeRAGConfig {
t.Errorf("Resources[6].Identifier.Type = %q, want %q",
result.Resources[6].Identifier.Type, ResourceTypeRAGConfig)
}
if result.Resources[7].Identifier.Type != ResourceTypeServiceInstanceSpec {
t.Errorf("Resources[7].Identifier.Type = %q, want %q",
result.Resources[7].Identifier.Type, ResourceTypeServiceInstanceSpec)
}
if result.Resources[8].Identifier.Type != ResourceTypeServiceInstance {
t.Errorf("Resources[8].Identifier.Type = %q, want %q",
result.Resources[8].Identifier.Type, ResourceTypeServiceInstance)
}
}

func TestGenerateRAGInstanceResources_MultiNode_CanonicalNotFirst(t *testing.T) {
o := &Orchestrator{}
o := newTestOrchestrator()
spec := &database.ServiceInstanceSpec{
ServiceInstanceID: "storefront-rag-host2",
ServiceSpec: &database.ServiceSpec{
Expand All @@ -204,13 +233,13 @@ func TestGenerateRAGInstanceResources_MultiNode_CanonicalNotFirst(t *testing.T)
t.Fatalf("generateRAGInstanceResources() error = %v", err)
}

// 3 nodes → canonical(n2) + per-node(n1) + per-node(n3) + dir + keys + config = 6 resources.
if len(result.Resources) != 6 {
t.Fatalf("len(Resources) = %d, want 6", len(result.Resources))
// 3 nodes → Network + canonical(n2) + per-node(n1) + per-node(n3) + dir + keys + config + spec + instance = 9.
if len(result.Resources) != 9 {
t.Fatalf("len(Resources) = %d, want 9", len(result.Resources))
}

// Canonical (index 0) must be n2 with no CredentialSource
canonical, err := resource.ToResource[*ServiceUserRole](result.Resources[0])
// Canonical (index 1, after Network) must be n2 with no CredentialSource
canonical, err := resource.ToResource[*ServiceUserRole](result.Resources[1])
if err != nil {
t.Fatalf("ToResource canonical: %v", err)
}
Expand All @@ -224,7 +253,7 @@ func TestGenerateRAGInstanceResources_MultiNode_CanonicalNotFirst(t *testing.T)
// Per-node resources must cover n1 and n3, not n2
canonicalID := ServiceUserRoleIdentifier("rag", ServiceUserRoleRO)
perNodeNames := make(map[string]bool)
for i, rd := range result.Resources[1:3] {
for i, rd := range result.Resources[2:4] {
perNode, err := resource.ToResource[*ServiceUserRole](rd)
if err != nil {
t.Fatalf("ToResource per-node[%d]: %v", i, err)
Expand All @@ -243,7 +272,7 @@ func TestGenerateRAGInstanceResources_MultiNode_CanonicalNotFirst(t *testing.T)
}

func TestGenerateServiceInstanceResources_RAGDispatch(t *testing.T) {
o := &Orchestrator{}
o := newTestOrchestrator()
spec := &database.ServiceInstanceSpec{
ServiceInstanceID: "db1-rag-host1",
ServiceSpec: &database.ServiceSpec{
Expand All @@ -268,7 +297,7 @@ func TestGenerateServiceInstanceResources_RAGDispatch(t *testing.T) {
}

func TestGenerateServiceInstanceResources_UnknownTypeReturnsError(t *testing.T) {
o := &Orchestrator{}
o := newTestOrchestrator()
spec := &database.ServiceInstanceSpec{
ServiceInstanceID: "db1-unknown-host1",
ServiceSpec: &database.ServiceSpec{
Expand Down
11 changes: 8 additions & 3 deletions server/internal/orchestrator/swarm/service_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type ServiceInstanceResource struct {
ServiceName string `json:"service_name"`
ServiceID string `json:"service_id"` // Docker Swarm service ID (set by Refresh)
ServiceSpecID string `json:"service_spec_id"` // Logical service ID from the spec (e.g. "mcp-server")
ServiceType string `json:"service_type"` // Service type (e.g. "mcp", "rag", "postgrest")
HostID string `json:"host_id"`
NeedsUpdate bool `json:"needs_update"`
}
Expand All @@ -60,11 +61,15 @@ func (s *ServiceInstanceResource) Executor() resource.Executor {
}

func (s *ServiceInstanceResource) Dependencies() []resource.Identifier {
return []resource.Identifier{
deps := []resource.Identifier{
ServiceUserRoleIdentifier(s.ServiceSpecID, ServiceUserRoleRO),
ServiceUserRoleIdentifier(s.ServiceSpecID, ServiceUserRoleRW),
ServiceInstanceSpecResourceIdentifier(s.ServiceInstanceID),
}
// RAG only has an RO role; all other service types also require an RW role.
if s.ServiceType != "rag" {
deps = append(deps, ServiceUserRoleIdentifier(s.ServiceSpecID, ServiceUserRoleRW))
}
deps = append(deps, ServiceInstanceSpecResourceIdentifier(s.ServiceInstanceID))
return deps
}

func (s *ServiceInstanceResource) TypeDependencies() []resource.Type {
Expand Down
18 changes: 17 additions & 1 deletion server/internal/orchestrator/swarm/service_instance_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package swarm
import (
"context"
"fmt"
"path/filepath"

"github.com/docker/docker/api/types/swarm"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -64,13 +65,21 @@ func (s *ServiceInstanceSpecResource) Dependencies() []resource.Identifier {
deps := []resource.Identifier{
NetworkResourceIdentifier(s.DatabaseNetworkID),
ServiceUserRoleIdentifier(s.ServiceSpec.ServiceID, ServiceUserRoleRO),
ServiceUserRoleIdentifier(s.ServiceSpec.ServiceID, ServiceUserRoleRW),
}
// RAG only has an RO role; all other service types also have an RW role.
if s.ServiceSpec.ServiceType != "rag" {
deps = append(deps, ServiceUserRoleIdentifier(s.ServiceSpec.ServiceID, ServiceUserRoleRW))
}
switch s.ServiceSpec.ServiceType {
case "mcp":
deps = append(deps, MCPConfigResourceIdentifier(s.ServiceInstanceID))
case "postgrest":
deps = append(deps, PostgRESTConfigResourceIdentifier(s.ServiceInstanceID))
case "rag":
deps = append(deps,
RAGConfigResourceIdentifier(s.ServiceInstanceID),
RAGServiceKeysResourceIdentifier(s.ServiceInstanceID),
)
default:
log.Warn().Str("service_type", s.ServiceSpec.ServiceType).Msg("unknown service type in dependencies")
}
Expand Down Expand Up @@ -115,6 +124,12 @@ func (s *ServiceInstanceSpecResource) Refresh(ctx context.Context, rc *resource.
return fmt.Errorf("failed to get service data dir path: %w", err)
}

// Resolve the keys directory path (RAG only): it lives at {dataPath}/keys.
var keysPath string
if s.ServiceSpec.ServiceType == "rag" {
keysPath = filepath.Join(dataPath, "keys")
}

spec, err := ServiceContainerSpec(&ServiceContainerSpecOptions{
ServiceSpec: s.ServiceSpec,
ServiceInstanceID: s.ServiceInstanceID,
Expand All @@ -131,6 +146,7 @@ func (s *ServiceInstanceSpecResource) Refresh(ctx context.Context, rc *resource.
TargetSessionAttrs: s.TargetSessionAttrs,
Port: s.Port,
DataPath: dataPath,
KeysPath: keysPath,
})
if err != nil {
return fmt.Errorf("failed to generate service container spec: %w", err)
Expand Down
Loading