Skip to content

Commit e586e31

Browse files
authored
feat: deploy RAG server container (#333)
* feat: deploy RAG server container
1 parent 93f273f commit e586e31

8 files changed

Lines changed: 368 additions & 309 deletions

File tree

server/internal/orchestrator/swarm/orchestrator.go

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -653,12 +653,37 @@ func (o *Orchestrator) buildServiceInstanceResources(spec *database.ServiceInsta
653653
// instance. RAG only requires read access, so a single ServiceUserRoleRO is
654654
// created per database node using the same canonical+per-node pattern as MCP.
655655
func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) {
656+
// Get service image.
657+
serviceImage, err := o.serviceVersions.GetServiceImage(spec.ServiceSpec.ServiceType, spec.ServiceSpec.Version)
658+
if err != nil {
659+
return nil, fmt.Errorf("failed to get service image: %w", err)
660+
}
661+
662+
// Validate compatibility with database version.
663+
if spec.PgEdgeVersion != nil {
664+
if err := serviceImage.ValidateCompatibility(
665+
spec.PgEdgeVersion.PostgresVersion,
666+
spec.PgEdgeVersion.SpockVersion,
667+
); err != nil {
668+
return nil, fmt.Errorf("service %q version %q is not compatible with this database: %w",
669+
spec.ServiceSpec.ServiceType, spec.ServiceSpec.Version, err)
670+
}
671+
}
672+
656673
// Parse the RAG service config to extract API keys.
657674
ragConfig, errs := database.ParseRAGServiceConfig(spec.ServiceSpec.Config, false)
658675
if len(errs) > 0 {
659676
return nil, fmt.Errorf("failed to parse RAG service config: %w", errors.Join(errs...))
660677
}
661678

679+
// Database network (shared with postgres instances).
680+
databaseNetwork := &Network{
681+
Scope: "swarm",
682+
Driver: OverlayDriver,
683+
Name: fmt.Sprintf("%s-database", spec.DatabaseID),
684+
Allocator: o.dbNetworkAllocator,
685+
}
686+
662687
canonicalROID := ServiceUserRoleIdentifier(spec.ServiceSpec.ServiceID, ServiceUserRoleRO)
663688

664689
// Canonical read-only role — runs on the node co-located with this instance.
@@ -670,7 +695,7 @@ func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstan
670695
Mode: ServiceUserRoleRO,
671696
}
672697

673-
orchestratorResources := []resource.Resource{canonicalRO}
698+
orchestratorResources := []resource.Resource{databaseNetwork, canonicalRO}
674699

675700
// Per-node RO role for each additional database node so that RAG instances
676701
// on other hosts can authenticate against their co-located Postgres.
@@ -691,12 +716,15 @@ func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstan
691716
// Service data directory resource (host-side bind mount directory).
692717
dataDirID := spec.ServiceInstanceID + "-data"
693718
dataDir := &filesystem.DirResource{
694-
ID: dataDirID,
695-
HostID: spec.HostID,
696-
Path: filepath.Join(o.cfg.DataDir, "services", spec.ServiceInstanceID),
719+
ID: dataDirID,
720+
HostID: spec.HostID,
721+
Path: filepath.Join(o.cfg.DataDir, "services", spec.ServiceInstanceID),
722+
OwnerUID: ragContainerUID,
723+
OwnerGID: ragContainerUID,
697724
}
698725

699726
// API key files resource — writes provider keys into a "keys" subdirectory.
727+
// The keys subdirectory path is resolved at runtime from the parent DirResource.
700728
keysResource := &RAGServiceKeysResource{
701729
ServiceInstanceID: spec.ServiceInstanceID,
702730
HostID: spec.HostID,
@@ -722,7 +750,38 @@ func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstan
722750
DatabasePort: dbPort,
723751
}
724752

725-
orchestratorResources = append(orchestratorResources, dataDir, keysResource, ragConfigRes)
753+
// Service instance spec resource — holds the computed Docker Swarm service spec.
754+
// KeysDirID is the parent data dir; the actual keys subdir path is derived at runtime.
755+
serviceName := ServiceInstanceName(spec.ServiceSpec.ServiceType, spec.DatabaseID, spec.ServiceSpec.ServiceID, spec.HostID)
756+
serviceInstanceSpec := &ServiceInstanceSpecResource{
757+
ServiceInstanceID: spec.ServiceInstanceID,
758+
ServiceSpec: spec.ServiceSpec,
759+
DatabaseID: spec.DatabaseID,
760+
DatabaseName: spec.DatabaseName,
761+
HostID: spec.HostID,
762+
ServiceName: serviceName,
763+
Hostname: serviceName,
764+
CohortMemberID: o.swarmNodeID,
765+
ServiceImage: serviceImage,
766+
Credentials: spec.Credentials,
767+
DatabaseNetworkID: databaseNetwork.Name,
768+
DatabaseHosts: spec.DatabaseHosts,
769+
Port: spec.Port,
770+
DataDirID: dataDirID,
771+
}
772+
773+
// Service instance resource (actual Docker Swarm service).
774+
serviceInstance := &ServiceInstanceResource{
775+
ServiceInstanceID: spec.ServiceInstanceID,
776+
DatabaseID: spec.DatabaseID,
777+
ServiceName: serviceName,
778+
ServiceID: spec.ServiceSpec.ServiceID,
779+
ServiceSpecID: spec.ServiceSpec.ServiceID,
780+
ServiceType: spec.ServiceSpec.ServiceType,
781+
HostID: spec.HostID,
782+
}
783+
784+
orchestratorResources = append(orchestratorResources, dataDir, keysResource, ragConfigRes, serviceInstanceSpec, serviceInstance)
726785

727786
return o.buildServiceInstanceResources(spec, orchestratorResources)
728787
}
Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
package swarm
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
"github.com/stretchr/testify/require"
8+
9+
"github.com/pgEdge/control-plane/server/internal/config"
10+
"github.com/pgEdge/control-plane/server/internal/database"
11+
"github.com/pgEdge/control-plane/server/internal/ds"
12+
"github.com/pgEdge/control-plane/server/internal/filesystem"
13+
"github.com/pgEdge/control-plane/server/internal/resource"
14+
)
15+
16+
// newTestOrchestrator returns an Orchestrator with serviceVersions initialised
17+
// from a minimal config, suitable for unit tests that call generateRAGInstanceResources.
18+
func newTestOrchestrator() *Orchestrator {
19+
return &Orchestrator{
20+
serviceVersions: NewServiceVersions(config.Config{}),
21+
}
22+
}
23+
24+
// minimalRAGConfig returns a minimal valid RAG service config suitable for unit tests.
25+
func minimalRAGConfig() map[string]any {
26+
return map[string]any{
27+
"pipelines": []any{
28+
map[string]any{
29+
"name": "default",
30+
"tables": []any{
31+
map[string]any{
32+
"table": "docs",
33+
"text_column": "content",
34+
"vector_column": "embedding",
35+
},
36+
},
37+
"embedding_llm": map[string]any{
38+
"provider": "openai",
39+
"model": "text-embedding-3-small",
40+
"api_key": "sk-embed",
41+
},
42+
"rag_llm": map[string]any{
43+
"provider": "anthropic",
44+
"model": "claude-sonnet-4-5",
45+
"api_key": "sk-ant",
46+
},
47+
},
48+
},
49+
}
50+
}
51+
52+
func TestGenerateRAGInstanceResources_ResourceList(t *testing.T) {
53+
o := newTestOrchestrator()
54+
spec := &database.ServiceInstanceSpec{
55+
ServiceInstanceID: "storefront-rag-host1",
56+
ServiceSpec: &database.ServiceSpec{
57+
ServiceID: "rag",
58+
ServiceType: "rag",
59+
Version: "latest",
60+
Config: minimalRAGConfig(),
61+
},
62+
DatabaseID: "storefront",
63+
DatabaseName: "storefront",
64+
HostID: "host-1",
65+
NodeName: "n1",
66+
}
67+
68+
result, err := o.generateRAGInstanceResources(spec)
69+
require.NoError(t, err)
70+
71+
require.NotNil(t, result.ServiceInstance)
72+
assert.Equal(t, spec.ServiceInstanceID, result.ServiceInstance.ServiceInstanceID)
73+
assert.Equal(t, spec.HostID, result.ServiceInstance.HostID)
74+
assert.Equal(t, database.ServiceInstanceStateCreating, result.ServiceInstance.State)
75+
76+
// Single node: Network + canonical RO + DirResource + Keys + Config + InstanceSpec + ServiceInstance = 7.
77+
require.Len(t, result.Resources, 7)
78+
assert.Equal(t, ResourceTypeNetwork, result.Resources[0].Identifier.Type)
79+
assert.Equal(t, ResourceTypeServiceUserRole, result.Resources[1].Identifier.Type)
80+
assert.Equal(t, ServiceUserRoleIdentifier("rag", ServiceUserRoleRO), result.Resources[1].Identifier)
81+
assert.Equal(t, filesystem.ResourceTypeDir, result.Resources[2].Identifier.Type)
82+
assert.Equal(t, ResourceTypeRAGServiceKeys, result.Resources[3].Identifier.Type)
83+
assert.Equal(t, ResourceTypeRAGConfig, result.Resources[4].Identifier.Type)
84+
assert.Equal(t, ResourceTypeServiceInstanceSpec, result.Resources[5].Identifier.Type)
85+
assert.Equal(t, ResourceTypeServiceInstance, result.Resources[6].Identifier.Type)
86+
}
87+
88+
func TestGenerateRAGInstanceResources_MultiNode(t *testing.T) {
89+
o := newTestOrchestrator()
90+
spec := &database.ServiceInstanceSpec{
91+
ServiceInstanceID: "storefront-rag-host1",
92+
ServiceSpec: &database.ServiceSpec{
93+
ServiceID: "rag",
94+
ServiceType: "rag",
95+
Version: "latest",
96+
Config: minimalRAGConfig(),
97+
},
98+
DatabaseID: "storefront",
99+
DatabaseName: "storefront",
100+
HostID: "host-1",
101+
NodeName: "n1",
102+
DatabaseNodes: []*database.NodeInstances{
103+
{NodeName: "n1"},
104+
{NodeName: "n2"},
105+
{NodeName: "n3"},
106+
},
107+
}
108+
109+
result, err := o.generateRAGInstanceResources(spec)
110+
require.NoError(t, err)
111+
112+
// 3 nodes → Network + canonical(n1) + per-node(n2) + per-node(n3) + dir + keys + config + spec + instance = 9.
113+
require.Len(t, result.Resources, 9)
114+
115+
// Resources[0] is Network; Resources[1..3] are ServiceUserRole resources.
116+
for i := 1; i < 4; i++ {
117+
assert.Equal(t, ResourceTypeServiceUserRole, result.Resources[i].Identifier.Type)
118+
}
119+
120+
// Canonical is index 1 and has no CredentialSource.
121+
canonical, err := resource.ToResource[*ServiceUserRole](result.Resources[1])
122+
require.NoError(t, err)
123+
assert.Nil(t, canonical.CredentialSource)
124+
assert.Equal(t, ServiceUserRoleRO, canonical.Mode)
125+
126+
// Per-node resources point back to canonical.
127+
canonicalID := ServiceUserRoleIdentifier("rag", ServiceUserRoleRO)
128+
for i, rd := range result.Resources[2:4] {
129+
perNode, err := resource.ToResource[*ServiceUserRole](rd)
130+
require.NoErrorf(t, err, "ToResource per-node[%d]", i)
131+
assert.Equalf(t, &canonicalID, perNode.CredentialSource, "per-node[%d].CredentialSource", i)
132+
assert.Equalf(t, ServiceUserRoleRO, perNode.Mode, "per-node[%d].Mode", i)
133+
}
134+
135+
assert.Equal(t, filesystem.ResourceTypeDir, result.Resources[4].Identifier.Type)
136+
assert.Equal(t, ResourceTypeRAGServiceKeys, result.Resources[5].Identifier.Type)
137+
assert.Equal(t, ResourceTypeRAGConfig, result.Resources[6].Identifier.Type)
138+
assert.Equal(t, ResourceTypeServiceInstanceSpec, result.Resources[7].Identifier.Type)
139+
assert.Equal(t, ResourceTypeServiceInstance, result.Resources[8].Identifier.Type)
140+
}
141+
142+
func TestGenerateRAGInstanceResources_MultiNode_CanonicalNotFirst(t *testing.T) {
143+
o := newTestOrchestrator()
144+
spec := &database.ServiceInstanceSpec{
145+
ServiceInstanceID: "storefront-rag-host2",
146+
ServiceSpec: &database.ServiceSpec{
147+
ServiceID: "rag",
148+
ServiceType: "rag",
149+
Version: "latest",
150+
Config: minimalRAGConfig(),
151+
},
152+
DatabaseID: "storefront",
153+
DatabaseName: "storefront",
154+
HostID: "host-2",
155+
NodeName: "n2", // canonical is n2, not at index 0
156+
DatabaseNodes: []*database.NodeInstances{
157+
{NodeName: "n1"},
158+
{NodeName: "n2"},
159+
{NodeName: "n3"},
160+
},
161+
}
162+
163+
result, err := o.generateRAGInstanceResources(spec)
164+
require.NoError(t, err)
165+
166+
// 3 nodes → Network + canonical(n2) + per-node(n1) + per-node(n3) + dir + keys + config + spec + instance = 9.
167+
require.Len(t, result.Resources, 9)
168+
169+
// Canonical (index 1, after Network) must be n2 with no CredentialSource.
170+
canonical, err := resource.ToResource[*ServiceUserRole](result.Resources[1])
171+
require.NoError(t, err)
172+
assert.Nil(t, canonical.CredentialSource)
173+
assert.Equal(t, "n2", canonical.NodeName)
174+
175+
// Per-node resources must cover n1 and n3, not n2.
176+
canonicalID := ServiceUserRoleIdentifier("rag", ServiceUserRoleRO)
177+
perNodeNames := make(map[string]bool)
178+
for i, rd := range result.Resources[2:4] {
179+
perNode, err := resource.ToResource[*ServiceUserRole](rd)
180+
require.NoErrorf(t, err, "ToResource per-node[%d]", i)
181+
assert.Equalf(t, &canonicalID, perNode.CredentialSource, "per-node[%d].CredentialSource", i)
182+
perNodeNames[perNode.NodeName] = true
183+
}
184+
assert.False(t, perNodeNames["n2"], "canonical node n2 must not appear in per-node resources")
185+
assert.True(t, perNodeNames["n1"], "n1 must be a per-node resource")
186+
assert.True(t, perNodeNames["n3"], "n3 must be a per-node resource")
187+
}
188+
189+
func TestGenerateServiceInstanceResources_RAGDispatch(t *testing.T) {
190+
o := newTestOrchestrator()
191+
spec := &database.ServiceInstanceSpec{
192+
ServiceInstanceID: "db1-rag-host1",
193+
ServiceSpec: &database.ServiceSpec{
194+
ServiceID: "rag",
195+
ServiceType: "rag",
196+
Version: "latest",
197+
Config: minimalRAGConfig(),
198+
},
199+
DatabaseID: "db1",
200+
DatabaseName: "db1",
201+
HostID: "host-1",
202+
NodeName: "n1",
203+
}
204+
205+
result, err := o.GenerateServiceInstanceResources(spec)
206+
require.NoError(t, err)
207+
require.NotNil(t, result)
208+
}
209+
210+
func TestGenerateServiceInstanceResources_UnknownTypeReturnsError(t *testing.T) {
211+
o := newTestOrchestrator()
212+
spec := &database.ServiceInstanceSpec{
213+
ServiceInstanceID: "db1-unknown-host1",
214+
ServiceSpec: &database.ServiceSpec{
215+
ServiceID: "unknown",
216+
ServiceType: "unknown",
217+
Version: "latest",
218+
},
219+
DatabaseID: "db1",
220+
DatabaseName: "db1",
221+
HostID: "host-1",
222+
NodeName: "n1",
223+
}
224+
225+
_, err := o.GenerateServiceInstanceResources(spec)
226+
require.Error(t, err)
227+
}
228+
229+
func TestGenerateRAGInstanceResources_IncompatibleVersion(t *testing.T) {
230+
o := newTestOrchestrator()
231+
// Override the "rag/latest" image with a constraint requiring PG >= 18.
232+
o.serviceVersions.addServiceImage("rag", "latest", &ServiceImage{
233+
Tag: "rag-server:latest",
234+
PostgresConstraint: &ds.VersionConstraint{
235+
Min: ds.MustParseVersion("18"),
236+
},
237+
})
238+
239+
spec := &database.ServiceInstanceSpec{
240+
ServiceInstanceID: "db1-rag-host1",
241+
ServiceSpec: &database.ServiceSpec{
242+
ServiceID: "rag",
243+
ServiceType: "rag",
244+
Version: "latest",
245+
Config: minimalRAGConfig(),
246+
},
247+
DatabaseID: "db1",
248+
DatabaseName: "db1",
249+
HostID: "host-1",
250+
NodeName: "n1",
251+
PgEdgeVersion: ds.MustPgEdgeVersion("17", "5.0.0"),
252+
}
253+
254+
_, err := o.generateRAGInstanceResources(spec)
255+
require.ErrorContains(t, err, "not compatible")
256+
}

server/internal/orchestrator/swarm/rag_service_keys_resource_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ func TestExtractRAGAPIKeys_MultiPipeline(t *testing.T) {
234234
}
235235

236236
func TestGenerateRAGInstanceResources_IncludesKeysResource(t *testing.T) {
237-
o := &Orchestrator{}
237+
o := newTestOrchestrator()
238238
spec := &database.ServiceInstanceSpec{
239239
ServiceInstanceID: "storefront-rag-host1",
240240
ServiceSpec: &database.ServiceSpec{

0 commit comments

Comments
 (0)