Skip to content

Commit c48c744

Browse files
committed
feat: deploy RAG server container
1 parent b6b6f3b commit c48c744

7 files changed

Lines changed: 170 additions & 63 deletions

File tree

server/internal/orchestrator/swarm/orchestrator.go

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -653,12 +653,26 @@ func (o *Orchestrator) buildServiceInstanceResources(spec *database.ServiceInsta
653653
// instance. RAG only requires read access, so a single ServiceUserRoleRO is
654654
// created per database node using the same canonical+per-node pattern as MCP.
655655
func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) {
656+
// Get service image.
657+
serviceImage, err := o.serviceVersions.GetServiceImage(spec.ServiceSpec.ServiceType, spec.ServiceSpec.Version)
658+
if err != nil {
659+
return nil, fmt.Errorf("failed to get service image: %w", err)
660+
}
661+
656662
// Parse the RAG service config to extract API keys.
657663
ragConfig, errs := database.ParseRAGServiceConfig(spec.ServiceSpec.Config, false)
658664
if len(errs) > 0 {
659665
return nil, fmt.Errorf("failed to parse RAG service config: %w", errors.Join(errs...))
660666
}
661667

668+
// Database network (shared with postgres instances).
669+
databaseNetwork := &Network{
670+
Scope: "swarm",
671+
Driver: OverlayDriver,
672+
Name: fmt.Sprintf("%s-database", spec.DatabaseID),
673+
Allocator: o.dbNetworkAllocator,
674+
}
675+
662676
canonicalROID := ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ServiceUserRoleRO)
663677

664678
// Canonical read-only role — runs on the node co-located with this instance.
@@ -670,7 +684,7 @@ func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstan
670684
Mode: ServiceUserRoleRO,
671685
}
672686

673-
orchestratorResources := []resource.Resource{canonicalRO}
687+
orchestratorResources := []resource.Resource{databaseNetwork, canonicalRO}
674688

675689
// Per-node RO role for each additional database node so that RAG instances
676690
// on other hosts can authenticate against their co-located Postgres.
@@ -691,12 +705,15 @@ func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstan
691705
// Service data directory resource (host-side bind mount directory).
692706
dataDirID := spec.ServiceInstanceID + "-data"
693707
dataDir := &filesystem.DirResource{
694-
ID: dataDirID,
695-
HostID: spec.HostID,
696-
Path: filepath.Join(o.cfg.DataDir, "services", spec.ServiceInstanceID),
708+
ID: dataDirID,
709+
HostID: spec.HostID,
710+
Path: filepath.Join(o.cfg.DataDir, "services", spec.ServiceInstanceID),
711+
OwnerUID: ragContainerUID,
712+
OwnerGID: ragContainerUID,
697713
}
698714

699715
// API key files resource — writes provider keys into a "keys" subdirectory.
716+
// The keys subdirectory path is resolved at runtime from the parent DirResource.
700717
keysResource := &RAGServiceKeysResource{
701718
ServiceInstanceID: spec.ServiceInstanceID,
702719
HostID: spec.HostID,
@@ -722,7 +739,38 @@ func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstan
722739
DatabasePort: dbPort,
723740
}
724741

725-
orchestratorResources = append(orchestratorResources, dataDir, keysResource, ragConfigRes)
742+
// Service instance spec resource — holds the computed Docker Swarm service spec.
743+
// DataDirID references the parent data dir; the keys subdir path ({dataPath}/keys) is derived from it at runtime.
744+
serviceName := ServiceInstanceName(spec.ServiceSpec.ServiceType, spec.DatabaseID, spec.ServiceSpec.ServiceID, spec.HostID)
745+
serviceInstanceSpec := &ServiceInstanceSpecResource{
746+
ServiceInstanceID: spec.ServiceInstanceID,
747+
ServiceSpec: spec.ServiceSpec,
748+
DatabaseID: spec.DatabaseID,
749+
DatabaseName: spec.DatabaseName,
750+
HostID: spec.HostID,
751+
ServiceName: serviceName,
752+
Hostname: serviceName,
753+
CohortMemberID: o.swarmNodeID,
754+
ServiceImage: serviceImage,
755+
Credentials: spec.Credentials,
756+
DatabaseNetworkID: databaseNetwork.Name,
757+
DatabaseHosts: spec.DatabaseHosts,
758+
Port: spec.Port,
759+
DataDirID: dataDirID,
760+
}
761+
762+
// Service instance resource (actual Docker Swarm service).
763+
serviceInstance := &ServiceInstanceResource{
764+
ServiceInstanceID: spec.ServiceInstanceID,
765+
DatabaseID: spec.DatabaseID,
766+
ServiceName: serviceName,
767+
ServiceID: spec.ServiceSpec.ServiceID,
768+
ServiceSpecID: spec.ServiceSpec.ServiceID,
769+
ServiceType: spec.ServiceSpec.ServiceType,
770+
HostID: spec.HostID,
771+
}
772+
773+
orchestratorResources = append(orchestratorResources, dataDir, keysResource, ragConfigRes, serviceInstanceSpec, serviceInstance)
726774

727775
return o.buildServiceInstanceResources(spec, orchestratorResources)
728776
}

server/internal/orchestrator/swarm/rag_service_keys_resource_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ func TestExtractRAGAPIKeys_MultiPipeline(t *testing.T) {
234234
}
235235

236236
func TestGenerateRAGInstanceResources_IncludesKeysResource(t *testing.T) {
237-
o := &Orchestrator{}
237+
o := newTestOrchestrator()
238238
spec := &database.ServiceInstanceSpec{
239239
ServiceInstanceID: "storefront-rag-host1",
240240
ServiceSpec: &database.ServiceSpec{

server/internal/orchestrator/swarm/rag_service_user_role_test.go

Lines changed: 72 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,20 @@ package swarm
33
import (
44
"testing"
55

6+
"github.com/pgEdge/control-plane/server/internal/config"
67
"github.com/pgEdge/control-plane/server/internal/database"
78
"github.com/pgEdge/control-plane/server/internal/filesystem"
89
"github.com/pgEdge/control-plane/server/internal/resource"
910
)
1011

12+
// newTestOrchestrator returns an Orchestrator with serviceVersions initialised
13+
// from a minimal config, suitable for unit tests that call generateRAGInstanceResources.
14+
func newTestOrchestrator() *Orchestrator {
15+
return &Orchestrator{
16+
serviceVersions: NewServiceVersions(config.Config{}),
17+
}
18+
}
19+
1120
// minimalRAGConfig returns a minimal valid RAG service config suitable for unit tests.
1221
func minimalRAGConfig() map[string]any {
1322
return map[string]any{
@@ -37,7 +46,7 @@ func minimalRAGConfig() map[string]any {
3746
}
3847

3948
func TestGenerateRAGInstanceResources_ResourceList(t *testing.T) {
40-
o := &Orchestrator{}
49+
o := newTestOrchestrator()
4150
spec := &database.ServiceInstanceSpec{
4251
ServiceInstanceID: "storefront-rag-host1",
4352
ServiceSpec: &database.ServiceSpec{
@@ -73,34 +82,46 @@ func TestGenerateRAGInstanceResources_ResourceList(t *testing.T) {
7382
result.ServiceInstance.State, database.ServiceInstanceStateCreating)
7483
}
7584

76-
// Single node: canonical RO ServiceUserRole + DirResource + RAGServiceKeysResource + RAGConfigResource.
77-
if len(result.Resources) != 4 {
78-
t.Fatalf("len(Resources) = %d, want 4", len(result.Resources))
85+
// Single node: Network + canonical RO + DirResource + Keys + Config + InstanceSpec + ServiceInstance = 7.
86+
if len(result.Resources) != 7 {
87+
t.Fatalf("len(Resources) = %d, want 7", len(result.Resources))
7988
}
80-
if result.Resources[0].Identifier.Type != ResourceTypeServiceUserRole {
89+
if result.Resources[0].Identifier.Type != ResourceTypeNetwork {
8190
t.Errorf("Resources[0].Identifier.Type = %q, want %q",
82-
result.Resources[0].Identifier.Type, ResourceTypeServiceUserRole)
83-
}
84-
wantID := ServiceUserRoleIdentifier("rag", ServiceUserRoleRO)
85-
if result.Resources[0].Identifier != wantID {
86-
t.Errorf("Resources[0].Identifier = %v, want %v", result.Resources[0].Identifier, wantID)
91+
result.Resources[0].Identifier.Type, ResourceTypeNetwork)
8792
}
88-
if result.Resources[1].Identifier.Type != filesystem.ResourceTypeDir {
93+
if result.Resources[1].Identifier.Type != ResourceTypeServiceUserRole {
8994
t.Errorf("Resources[1].Identifier.Type = %q, want %q",
90-
result.Resources[1].Identifier.Type, filesystem.ResourceTypeDir)
95+
result.Resources[1].Identifier.Type, ResourceTypeServiceUserRole)
9196
}
92-
if result.Resources[2].Identifier.Type != ResourceTypeRAGServiceKeys {
97+
wantID := ServiceUserRoleIdentifier("rag", ServiceUserRoleRO)
98+
if result.Resources[1].Identifier != wantID {
99+
t.Errorf("Resources[1].Identifier = %v, want %v", result.Resources[1].Identifier, wantID)
100+
}
101+
if result.Resources[2].Identifier.Type != filesystem.ResourceTypeDir {
93102
t.Errorf("Resources[2].Identifier.Type = %q, want %q",
94-
result.Resources[2].Identifier.Type, ResourceTypeRAGServiceKeys)
103+
result.Resources[2].Identifier.Type, filesystem.ResourceTypeDir)
95104
}
96-
if result.Resources[3].Identifier.Type != ResourceTypeRAGConfig {
105+
if result.Resources[3].Identifier.Type != ResourceTypeRAGServiceKeys {
97106
t.Errorf("Resources[3].Identifier.Type = %q, want %q",
98-
result.Resources[3].Identifier.Type, ResourceTypeRAGConfig)
107+
result.Resources[3].Identifier.Type, ResourceTypeRAGServiceKeys)
108+
}
109+
if result.Resources[4].Identifier.Type != ResourceTypeRAGConfig {
110+
t.Errorf("Resources[4].Identifier.Type = %q, want %q",
111+
result.Resources[4].Identifier.Type, ResourceTypeRAGConfig)
112+
}
113+
if result.Resources[5].Identifier.Type != ResourceTypeServiceInstanceSpec {
114+
t.Errorf("Resources[5].Identifier.Type = %q, want %q",
115+
result.Resources[5].Identifier.Type, ResourceTypeServiceInstanceSpec)
116+
}
117+
if result.Resources[6].Identifier.Type != ResourceTypeServiceInstance {
118+
t.Errorf("Resources[6].Identifier.Type = %q, want %q",
119+
result.Resources[6].Identifier.Type, ResourceTypeServiceInstance)
99120
}
100121
}
101122

102123
func TestGenerateRAGInstanceResources_MultiNode(t *testing.T) {
103-
o := &Orchestrator{}
124+
o := newTestOrchestrator()
104125
spec := &database.ServiceInstanceSpec{
105126
ServiceInstanceID: "storefront-rag-host1",
106127
ServiceSpec: &database.ServiceSpec{
@@ -125,19 +146,19 @@ func TestGenerateRAGInstanceResources_MultiNode(t *testing.T) {
125146
t.Fatalf("generateRAGInstanceResources() error = %v", err)
126147
}
127148

128-
// 3 nodes → canonical(n1) + per-node(n2) + per-node(n3) + dir + keys + config = 6 resources.
129-
if len(result.Resources) != 6 {
130-
t.Fatalf("len(Resources) = %d, want 6", len(result.Resources))
149+
// 3 nodes → Network + canonical(n1) + per-node(n2) + per-node(n3) + dir + keys + config + spec + instance = 9.
150+
if len(result.Resources) != 9 {
151+
t.Fatalf("len(Resources) = %d, want 9", len(result.Resources))
131152
}
132-
// First three must be ServiceUserRole resources.
133-
for i := 0; i < 3; i++ {
153+
// Resources[0] is Network; Resources[1..3] are ServiceUserRole resources.
154+
for i := 1; i < 4; i++ {
134155
if result.Resources[i].Identifier.Type != ResourceTypeServiceUserRole {
135156
t.Errorf("resource[%d] type = %q, want %q", i, result.Resources[i].Identifier.Type, ResourceTypeServiceUserRole)
136157
}
137158
}
138159

139-
// Canonical is first and has no CredentialSource
140-
canonical, err := resource.ToResource[*ServiceUserRole](result.Resources[0])
160+
// Canonical is index 1 and has no CredentialSource
161+
canonical, err := resource.ToResource[*ServiceUserRole](result.Resources[1])
141162
if err != nil {
142163
t.Fatalf("ToResource canonical: %v", err)
143164
}
@@ -150,7 +171,7 @@ func TestGenerateRAGInstanceResources_MultiNode(t *testing.T) {
150171

151172
// Per-node resources point back to canonical
152173
canonicalID := ServiceUserRoleIdentifier("rag", ServiceUserRoleRO)
153-
for i, rd := range result.Resources[1:3] {
174+
for i, rd := range result.Resources[2:4] {
154175
perNode, err := resource.ToResource[*ServiceUserRole](rd)
155176
if err != nil {
156177
t.Fatalf("ToResource per-node[%d]: %v", i, err)
@@ -163,23 +184,31 @@ func TestGenerateRAGInstanceResources_MultiNode(t *testing.T) {
163184
}
164185
}
165186

166-
// Data dir, keys, and config resource are appended last.
167-
if result.Resources[3].Identifier.Type != filesystem.ResourceTypeDir {
168-
t.Errorf("Resources[3].Identifier.Type = %q, want %q",
169-
result.Resources[3].Identifier.Type, filesystem.ResourceTypeDir)
170-
}
171-
if result.Resources[4].Identifier.Type != ResourceTypeRAGServiceKeys {
187+
// Dir, keys, config, spec, and instance are appended last.
188+
if result.Resources[4].Identifier.Type != filesystem.ResourceTypeDir {
172189
t.Errorf("Resources[4].Identifier.Type = %q, want %q",
173-
result.Resources[4].Identifier.Type, ResourceTypeRAGServiceKeys)
190+
result.Resources[4].Identifier.Type, filesystem.ResourceTypeDir)
174191
}
175-
if result.Resources[5].Identifier.Type != ResourceTypeRAGConfig {
192+
if result.Resources[5].Identifier.Type != ResourceTypeRAGServiceKeys {
176193
t.Errorf("Resources[5].Identifier.Type = %q, want %q",
177-
result.Resources[5].Identifier.Type, ResourceTypeRAGConfig)
194+
result.Resources[5].Identifier.Type, ResourceTypeRAGServiceKeys)
195+
}
196+
if result.Resources[6].Identifier.Type != ResourceTypeRAGConfig {
197+
t.Errorf("Resources[6].Identifier.Type = %q, want %q",
198+
result.Resources[6].Identifier.Type, ResourceTypeRAGConfig)
199+
}
200+
if result.Resources[7].Identifier.Type != ResourceTypeServiceInstanceSpec {
201+
t.Errorf("Resources[7].Identifier.Type = %q, want %q",
202+
result.Resources[7].Identifier.Type, ResourceTypeServiceInstanceSpec)
203+
}
204+
if result.Resources[8].Identifier.Type != ResourceTypeServiceInstance {
205+
t.Errorf("Resources[8].Identifier.Type = %q, want %q",
206+
result.Resources[8].Identifier.Type, ResourceTypeServiceInstance)
178207
}
179208
}
180209

181210
func TestGenerateRAGInstanceResources_MultiNode_CanonicalNotFirst(t *testing.T) {
182-
o := &Orchestrator{}
211+
o := newTestOrchestrator()
183212
spec := &database.ServiceInstanceSpec{
184213
ServiceInstanceID: "storefront-rag-host2",
185214
ServiceSpec: &database.ServiceSpec{
@@ -204,13 +233,13 @@ func TestGenerateRAGInstanceResources_MultiNode_CanonicalNotFirst(t *testing.T)
204233
t.Fatalf("generateRAGInstanceResources() error = %v", err)
205234
}
206235

207-
// 3 nodes → canonical(n2) + per-node(n1) + per-node(n3) + dir + keys + config = 6 resources.
208-
if len(result.Resources) != 6 {
209-
t.Fatalf("len(Resources) = %d, want 6", len(result.Resources))
236+
// 3 nodes → Network + canonical(n2) + per-node(n1) + per-node(n3) + dir + keys + config + spec + instance = 9.
237+
if len(result.Resources) != 9 {
238+
t.Fatalf("len(Resources) = %d, want 9", len(result.Resources))
210239
}
211240

212-
// Canonical (index 0) must be n2 with no CredentialSource
213-
canonical, err := resource.ToResource[*ServiceUserRole](result.Resources[0])
241+
// Canonical (index 1, after Network) must be n2 with no CredentialSource
242+
canonical, err := resource.ToResource[*ServiceUserRole](result.Resources[1])
214243
if err != nil {
215244
t.Fatalf("ToResource canonical: %v", err)
216245
}
@@ -224,7 +253,7 @@ func TestGenerateRAGInstanceResources_MultiNode_CanonicalNotFirst(t *testing.T)
224253
// Per-node resources must cover n1 and n3, not n2
225254
canonicalID := ServiceUserRoleIdentifier("rag", ServiceUserRoleRO)
226255
perNodeNames := make(map[string]bool)
227-
for i, rd := range result.Resources[1:3] {
256+
for i, rd := range result.Resources[2:4] {
228257
perNode, err := resource.ToResource[*ServiceUserRole](rd)
229258
if err != nil {
230259
t.Fatalf("ToResource per-node[%d]: %v", i, err)
@@ -243,7 +272,7 @@ func TestGenerateRAGInstanceResources_MultiNode_CanonicalNotFirst(t *testing.T)
243272
}
244273

245274
func TestGenerateServiceInstanceResources_RAGDispatch(t *testing.T) {
246-
o := &Orchestrator{}
275+
o := newTestOrchestrator()
247276
spec := &database.ServiceInstanceSpec{
248277
ServiceInstanceID: "db1-rag-host1",
249278
ServiceSpec: &database.ServiceSpec{
@@ -268,7 +297,7 @@ func TestGenerateServiceInstanceResources_RAGDispatch(t *testing.T) {
268297
}
269298

270299
func TestGenerateServiceInstanceResources_UnknownTypeReturnsError(t *testing.T) {
271-
o := &Orchestrator{}
300+
o := newTestOrchestrator()
272301
spec := &database.ServiceInstanceSpec{
273302
ServiceInstanceID: "db1-unknown-host1",
274303
ServiceSpec: &database.ServiceSpec{

server/internal/orchestrator/swarm/service_instance.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ type ServiceInstanceResource struct {
3434
ServiceName string `json:"service_name"`
3535
ServiceID string `json:"service_id"` // Docker Swarm service ID (set by Refresh)
3636
ServiceSpecID string `json:"service_spec_id"` // Logical service ID from the spec (e.g. "mcp-server")
37+
ServiceType string `json:"service_type"` // Service type (e.g. "mcp", "rag", "postgrest")
3738
HostID string `json:"host_id"`
3839
NeedsUpdate bool `json:"needs_update"`
3940
}
@@ -60,11 +61,15 @@ func (s *ServiceInstanceResource) Executor() resource.Executor {
6061
}
6162

6263
func (s *ServiceInstanceResource) Dependencies() []resource.Identifier {
63-
return []resource.Identifier{
64+
deps := []resource.Identifier{
6465
ServiceUserRoleIdentifier(s.ServiceSpecID, ServiceUserRoleRO),
65-
ServiceUserRoleIdentifier(s.ServiceSpecID, ServiceUserRoleRW),
66-
ServiceInstanceSpecResourceIdentifier(s.ServiceInstanceID),
6766
}
67+
// RAG only has an RO role; all other service types also require an RW role.
68+
if s.ServiceType != "rag" {
69+
deps = append(deps, ServiceUserRoleIdentifier(s.ServiceSpecID, ServiceUserRoleRW))
70+
}
71+
deps = append(deps, ServiceInstanceSpecResourceIdentifier(s.ServiceInstanceID))
72+
return deps
6873
}
6974

7075
func (s *ServiceInstanceResource) TypeDependencies() []resource.Type {

server/internal/orchestrator/swarm/service_instance_spec.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package swarm
33
import (
44
"context"
55
"fmt"
6+
"path/filepath"
67

78
"github.com/docker/docker/api/types/swarm"
89
"github.com/rs/zerolog/log"
@@ -64,13 +65,18 @@ func (s *ServiceInstanceSpecResource) Dependencies() []resource.Identifier {
6465
deps := []resource.Identifier{
6566
NetworkResourceIdentifier(s.DatabaseNetworkID),
6667
ServiceUserRoleIdentifier(s.ServiceSpec.ServiceID, ServiceUserRoleRO),
67-
ServiceUserRoleIdentifier(s.ServiceSpec.ServiceID, ServiceUserRoleRW),
68+
}
69+
// RAG only has an RO role; all other service types also have an RW role.
70+
if s.ServiceSpec.ServiceType != "rag" {
71+
deps = append(deps, ServiceUserRoleIdentifier(s.ServiceSpec.ServiceID, ServiceUserRoleRW))
6872
}
6973
switch s.ServiceSpec.ServiceType {
7074
case "mcp":
7175
deps = append(deps, MCPConfigResourceIdentifier(s.ServiceInstanceID))
7276
case "postgrest":
7377
deps = append(deps, PostgRESTConfigResourceIdentifier(s.ServiceInstanceID))
78+
case "rag":
79+
deps = append(deps, RAGConfigResourceIdentifier(s.ServiceInstanceID))
7480
default:
7581
log.Warn().Str("service_type", s.ServiceSpec.ServiceType).Msg("unknown service type in dependencies")
7682
}
@@ -115,6 +121,12 @@ func (s *ServiceInstanceSpecResource) Refresh(ctx context.Context, rc *resource.
115121
return fmt.Errorf("failed to get service data dir path: %w", err)
116122
}
117123

124+
// Resolve the keys directory path (RAG only): it lives at {dataPath}/keys.
125+
var keysPath string
126+
if s.ServiceSpec.ServiceType == "rag" {
127+
keysPath = filepath.Join(dataPath, "keys")
128+
}
129+
118130
spec, err := ServiceContainerSpec(&ServiceContainerSpecOptions{
119131
ServiceSpec: s.ServiceSpec,
120132
ServiceInstanceID: s.ServiceInstanceID,
@@ -131,6 +143,7 @@ func (s *ServiceInstanceSpecResource) Refresh(ctx context.Context, rc *resource.
131143
TargetSessionAttrs: s.TargetSessionAttrs,
132144
Port: s.Port,
133145
DataPath: dataPath,
146+
KeysPath: keysPath,
134147
})
135148
if err != nil {
136149
return fmt.Errorf("failed to generate service container spec: %w", err)

0 commit comments

Comments
 (0)