Commit 60035c6

refactor(server): extract kubernetes compute driver (NVIDIA#817)
1 parent fdca543 commit 60035c6

31 files changed

Lines changed: 2858 additions & 1188 deletions

.agents/skills/debug-openshell-cluster/SKILL.md

Lines changed: 2 additions & 0 deletions
@@ -182,6 +182,8 @@ Component images (server, sandbox) can reach kubelet via two paths:
182182

183183
**Local/external pull mode** (default local via `mise run cluster`): Local images are tagged to the configured local registry base (default `127.0.0.1:5000/openshell/*`), pushed to that registry, and pulled by k3s via `registries.yaml` mirror endpoint (typically `host.docker.internal:5000`). The `cluster` task pushes prebuilt local tags (`openshell/*:dev`, falling back to `localhost:5000/openshell/*:dev` or `127.0.0.1:5000/openshell/*:dev`).
184184

185+
Gateway image builds now stage a partial Rust workspace from `deploy/docker/Dockerfile.images`. If cargo fails with a missing manifest under `/build/crates/...`, verify that every current gateway dependency crate (including `openshell-driver-kubernetes`) is copied into the staged workspace there.
186+
185187
```bash
186188
# Verify image refs currently used by openshell deployment
187189
openshell doctor exec -- kubectl -n openshell get statefulset openshell -o jsonpath="{.spec.template.spec.containers[*].image}"

Cargo.lock

Lines changed: 23 additions & 3 deletions

architecture/gateway.md

Lines changed: 26 additions & 22 deletions
@@ -70,19 +70,21 @@ graph TD
7070
| Persistence | `crates/openshell-server/src/persistence/mod.rs` | `Store` enum (SQLite/Postgres), generic object CRUD, protobuf codec |
7171
| Persistence: SQLite | `crates/openshell-server/src/persistence/sqlite.rs` | `SqliteStore` with sqlx |
7272
| Persistence: Postgres | `crates/openshell-server/src/persistence/postgres.rs` | `PostgresStore` with sqlx |
73-
| Sandbox K8s | `crates/openshell-server/src/sandbox/mod.rs` | `SandboxClient`, CRD creation/deletion, Kubernetes watcher, phase derivation |
73+
| Compute runtime | `crates/openshell-server/src/compute/mod.rs` | `ComputeRuntime`, gateway-owned sandbox lifecycle orchestration over a compute backend |
74+
| Compute driver: Kubernetes | `crates/openshell-driver-kubernetes/src/driver.rs` | Kubernetes CRD create/delete, endpoint resolution, watch stream, pod template translation |
7475
| Sandbox index | `crates/openshell-server/src/sandbox_index.rs` | `SandboxIndex` -- in-memory name/pod-to-id correlation |
75-
| Watch bus | `crates/openshell-server/src/sandbox_watch.rs` | `SandboxWatchBus`, `PlatformEventBus`, Kubernetes event tailer |
76+
| Watch bus | `crates/openshell-server/src/sandbox_watch.rs` | `SandboxWatchBus` -- in-memory broadcast for persisted sandbox updates |
7677
| Tracing bus | `crates/openshell-server/src/tracing_bus.rs` | `TracingLogBus` -- captures tracing events keyed by `sandbox_id` |
7778

7879
Proto definitions consumed by the gateway:
7980

8081
| Proto file | Package | Defines |
8182
|------------|---------|---------|
82-
| `proto/openshell.proto` | `openshell.v1` | `OpenShell` service, sandbox/provider/SSH/watch messages |
83+
| `proto/openshell.proto` | `openshell.v1` | `OpenShell` service, public sandbox resource model, provider/SSH/watch messages |
84+
| `proto/compute_driver.proto` | `openshell.compute.v1` | Internal `ComputeDriver` service, driver-native sandbox observations, endpoint resolution, compute watch stream envelopes |
8385
| `proto/inference.proto` | `openshell.inference.v1` | `Inference` service: `SetClusterInference`, `GetClusterInference`, `GetInferenceBundle` |
84-
| `proto/datamodel.proto` | `openshell.datamodel.v1` | `Sandbox`, `SandboxSpec`, `SandboxStatus`, `Provider`, `SandboxPhase` |
85-
| `proto/sandbox.proto` | `openshell.sandbox.v1` | `SandboxPolicy`, `NetworkPolicyRule`, `SettingValue`, `EffectiveSetting`, `SettingScope`, `PolicySource`, `GetSandboxSettingsRequest/Response`, `GetGatewaySettingsRequest/Response` |
86+
| `proto/datamodel.proto` | `openshell.datamodel.v1` | `Provider` |
87+
| `proto/sandbox.proto` | `openshell.sandbox.v1` | Sandbox supervisor policy, settings, and config messages |
8688

8789
## Startup Sequence
8890

@@ -94,11 +96,10 @@ The gateway boots in `main()` (`crates/openshell-server/src/main.rs`) and procee
9496
4. **Build `Config`** -- Assembles a `openshell_core::Config` from the parsed arguments.
9597
5. **Call `run_server()`** (`crates/openshell-server/src/lib.rs`):
9698
1. Connect to the persistence store (`Store::connect`), which auto-detects SQLite vs Postgres from the URL prefix and runs migrations.
97-
2. Create `SandboxClient` (initializes a `kube::Client` from in-cluster or kubeconfig).
99+
2. Create `ComputeRuntime` with the in-process Kubernetes compute backend (`KubernetesComputeDriver`).
98100
3. Build `ServerState` (shared via `Arc<ServerState>` across all handlers).
99101
4. **Spawn background tasks**:
100-
- `spawn_sandbox_watcher` -- watches Kubernetes Sandbox CRDs and syncs state to the store.
101-
- `spawn_kube_event_tailer` -- watches Kubernetes Events in the sandbox namespace and publishes them to the `PlatformEventBus`.
102+
- `ComputeRuntime::spawn_watchers` -- consumes the compute-driver watch stream, updates persisted sandbox records, and republishes platform events.
102103
5. Create `MultiplexService`.
103104
6. Bind `TcpListener` on `config.bind_address`.
104105
7. Optionally create `TlsAcceptor` from cert/key files.
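Step 5.1 says the store picks SQLite or Postgres from the URL prefix. A minimal sketch of that kind of dispatch follows; `Backend`, `detect_backend`, and the accepted prefixes are illustrative assumptions, not the actual `Store::connect` code.

```rust
/// Hypothetical stand-in for the two backends behind the `Store` enum.
#[derive(Debug, PartialEq, Eq)]
enum Backend {
    Sqlite,
    Postgres,
}

/// Choose a backend from the URL prefix.
/// Assumption: the prefixes below are the ones commonly used with sqlx;
/// the gateway may accept a different set.
fn detect_backend(database_url: &str) -> Result<Backend, String> {
    let url = database_url.trim();
    if url.starts_with("sqlite:") {
        Ok(Backend::Sqlite)
    } else if url.starts_with("postgres://") || url.starts_with("postgresql://") {
        Ok(Backend::Postgres)
    } else {
        Err(format!("unsupported database URL '{url}'"))
    }
}
```

Returning an error for unknown prefixes, rather than defaulting to one backend, keeps a mistyped URL from silently creating a local database.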
@@ -137,7 +138,7 @@ All handlers share an `Arc<ServerState>` (`crates/openshell-server/src/lib.rs`):
137138
pub struct ServerState {
138139
pub config: Config,
139140
pub store: Arc<Store>,
140-
pub sandbox_client: SandboxClient,
141+
pub compute: ComputeRuntime,
141142
pub sandbox_index: SandboxIndex,
142143
pub sandbox_watch_bus: SandboxWatchBus,
143144
pub tracing_log_bus: TracingLogBus,
@@ -148,10 +149,10 @@ pub struct ServerState {
148149
```
149150

150151
- **`store`** -- persistence backend (SQLite or Postgres) for all object types.
151-
- **`sandbox_client`** -- Kubernetes client scoped to the sandbox namespace; creates/deletes CRDs and resolves pod IPs.
152-
- **`sandbox_index`** -- in-memory bidirectional index mapping sandbox names and agent pod names to sandbox IDs. Used by the event tailer to correlate Kubernetes events.
152+
- **`compute`** -- gateway-owned compute orchestration. Persists sandbox lifecycle transitions, validates create requests through the compute backend, resolves exec/SSH endpoints, consumes the backend watch stream, and periodically reconciles orphaned `Provisioning` records that no longer have a backing compute resource.
153+
- **`sandbox_index`** -- in-memory bidirectional index mapping sandbox names and agent pod names to sandbox IDs. Updated from compute-driver sandbox snapshots.
153154
- **`sandbox_watch_bus`** -- `broadcast`-based notification bus keyed by sandbox ID. Producers call `notify(&id)` when the persisted sandbox record changes; consumers in `WatchSandbox` streams receive `()` signals and re-read the record.
154-
- **`tracing_log_bus`** -- captures `tracing` events that include a `sandbox_id` field and republishes them as `SandboxLogLine` messages. Maintains a per-sandbox tail buffer (default 200 entries). Also contains a nested `PlatformEventBus` for Kubernetes events.
155+
- **`tracing_log_bus`** -- captures `tracing` events that include a `sandbox_id` field and republishes them as `SandboxLogLine` messages. Maintains a per-sandbox tail buffer (default 200 entries). Also contains a nested `PlatformEventBus` for compute-driver platform events.
155156
- **`settings_mutex`** -- serializes settings mutations (global and sandbox) to prevent read-modify-write races. Held for the duration of any setting set/delete or global policy set/delete operation. See [Gateway Settings Channel](gateway-settings.md#global-policy-lifecycle).
156157

157158
## Protocol Multiplexing
@@ -380,7 +381,7 @@ All buses use `tokio::sync::broadcast` channels keyed by sandbox ID. Buffer size
380381

381382
Broadcast lag is translated to `Status::resource_exhausted` via `broadcast_to_status()`.
382383

383-
**Cleanup:** Each bus exposes a `remove(sandbox_id)` method that drops the broadcast sender (closing active receivers with `RecvError::Closed`) and frees internal map entries. Cleanup is wired into both the `handle_deleted` reconciler (Kubernetes watcher) and the `delete_sandbox` gRPC handler to prevent unbounded memory growth from accumulated entries for deleted sandboxes.
384+
**Cleanup:** Each bus exposes a `remove(sandbox_id)` method that drops the broadcast sender (closing active receivers with `RecvError::Closed`) and frees internal map entries. Cleanup is wired into the compute watch reconciler, the periodic orphan sweep for stale `Provisioning` records, and the `delete_sandbox` gRPC handler to prevent unbounded memory growth from accumulated entries for deleted sandboxes.
384385

385386
**Validation:** `WatchSandbox` validates that the sandbox exists before subscribing to any bus, preventing entries from being created for non-existent IDs. `PushSandboxLogs` validates sandbox existence once on the first batch of the stream.
386387

@@ -392,7 +393,7 @@ The `ExecSandbox` RPC (`crates/openshell-server/src/grpc.rs`) executes a command
392393

393394
1. Validate request: `sandbox_id`, `command`, and environment key format (`^[A-Za-z_][A-Za-z0-9_]*$`).
394395
2. Verify sandbox exists and is in `Ready` phase.
395-
3. Resolve target: prefer agent pod IP (via `sandbox_client.agent_pod_ip()`), fall back to Kubernetes service DNS (`<name>.<namespace>.svc.cluster.local`).
396+
3. Resolve target: prefer agent pod IP, fall back to Kubernetes service DNS (`<name>.<namespace>.svc.cluster.local`). If the sandbox is not connectable yet (for example the pod exists but has no IP), the gateway returns `FAILED_PRECONDITION` instead of surfacing the condition as an internal server fault.
396397
4. Build the remote command string: sort environment variables, shell-escape all values, prepend `cd <workdir> &&` if `workdir` is set.
397398
5. **Start a single-use SSH proxy**: binds an ephemeral local TCP port, accepts one connection, performs the NSSH1 handshake with the sandbox, and bidirectionally copies data.
398399
6. **Connect via `russh`**: establishes an SSH connection through the local proxy, authenticates with `none` auth as user `sandbox`, opens a session channel, and executes the command.
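Steps 1 and 4 above (environment key validation and remote command assembly) can be sketched as follows. The function names are hypothetical, and single-quoting the command arguments and the workdir is an assumption; the document only states that environment values are shell-escaped.

```rust
use std::collections::BTreeMap;

/// Hand-rolled check equivalent to `^[A-Za-z_][A-Za-z0-9_]*$`.
fn is_valid_env_key(key: &str) -> bool {
    let mut chars = key.chars();
    match chars.next() {
        Some(c) if c.is_ascii_alphabetic() || c == '_' => {}
        _ => return false, // empty key or invalid first character
    }
    chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
}

/// POSIX single-quote escaping: close the quote, emit an escaped quote, reopen.
fn shell_escape(value: &str) -> String {
    format!("'{}'", value.replace('\'', r"'\''"))
}

/// Hypothetical helper: build the command string sent over SSH.
fn build_remote_command(
    command: &[&str],
    env: &BTreeMap<String, String>,
    workdir: Option<&str>,
) -> Result<String, String> {
    let mut parts = Vec::new();
    // A BTreeMap iterates in sorted key order, which gives the "sort" step.
    for (key, value) in env {
        if !is_valid_env_key(key) {
            return Err(format!("invalid environment key '{key}'"));
        }
        parts.push(format!("{key}={}", shell_escape(value)));
    }
    // Assumption: command arguments are escaped the same way as env values.
    parts.extend(command.iter().map(|arg| shell_escape(arg)));
    let joined = parts.join(" ");
    Ok(match workdir {
        Some(dir) => format!("cd {} && {joined}", shell_escape(dir)),
        None => joined,
    })
}
```

Sorting the variables makes the generated command deterministic, which helps when comparing commands in logs or tests.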
@@ -499,27 +500,30 @@ The Helm chart template is at `deploy/helm/openshell/templates/statefulset.yaml`
499500

500501
### Sandbox CRD Management
501502

502-
`SandboxClient` (`crates/openshell-server/src/sandbox/mod.rs`) manages `agents.x-k8s.io/v1alpha1/Sandbox` CRDs.
503+
`KubernetesComputeDriver` (`crates/openshell-driver-kubernetes/src/driver.rs`) manages `agents.x-k8s.io/v1alpha1/Sandbox` CRDs behind the gateway's compute interface.
503504

504-
- **Create**: Translates a `Sandbox` proto into a Kubernetes `DynamicObject` with labels (`openshell.ai/sandbox-id`, `openshell.ai/managed-by: openshell`) and a spec that includes the pod template, environment variables, and gateway-required env vars (`OPENSHELL_SANDBOX_ID`, `OPENSHELL_ENDPOINT`, `OPENSHELL_SSH_LISTEN_ADDR`, etc.). When callers do not provide custom `volumeClaimTemplates`, the server injects a default `workspace` PVC and mounts it at `/sandbox` so the default sandbox home/workdir survives pod rescheduling.
505+
- **Get**: `GetSandbox` looks up a sandbox CRD by name and returns a driver-native platform observation (`openshell.compute.v1.DriverSandbox`) with raw status and condition data from the object.
506+
- **List**: `ListSandboxes` enumerates sandbox CRDs and returns driver-native platform observations for each, sorted by name for stable results.
507+
- **Create**: Translates an internal `openshell.compute.v1.DriverSandbox` message into a Kubernetes `DynamicObject` with labels (`openshell.ai/sandbox-id`, `openshell.ai/managed-by: openshell`) and a spec that includes the pod template, environment variables, and gateway-required env vars (`OPENSHELL_SANDBOX_ID`, `OPENSHELL_ENDPOINT`, `OPENSHELL_SSH_LISTEN_ADDR`, etc.). When callers do not provide custom `volumeClaimTemplates`, the driver injects a default `workspace` PVC and mounts it at `/sandbox` so the default sandbox home/workdir survives pod rescheduling.
505508
- **Delete**: Calls the Kubernetes API to delete the CRD by name. Returns `false` if already gone (404).
509+
- **Stop**: `proto/compute_driver.proto` now reserves `StopSandbox` for a non-destructive lifecycle transition. Resume is intentionally not a dedicated compute-driver RPC; the gateway is expected to auto-resume a stopped sandbox when a client connects or executes into it.
506510
- **Pod IP resolution**: `agent_pod_ip()` fetches the agent pod and reads `status.podIP`.
507511

508512
### Sandbox Watcher
509513

510-
`spawn_sandbox_watcher()` (`crates/openshell-server/src/sandbox/mod.rs`) runs a Kubernetes watcher on `Sandbox` CRDs and processes three event types:
514+
The Kubernetes driver emits `WatchSandboxes` events through `proto/compute_driver.proto`. `ComputeRuntime` consumes that stream, translates the driver-native snapshots into public `openshell.v1.Sandbox` resources, derives the public phase, and applies the results to the store.
511515

512-
- **Applied**: Extracts the sandbox ID from labels (or falls back to name prefix stripping), reads the CRD status, derives the phase, and upserts the sandbox record in the store. Notifies the watch bus.
516+
- **Applied**: Extracts the sandbox ID from labels (or falls back to name prefix stripping), reads the CRD status, emits a driver-native snapshot, and lets the gateway translate that into the stored public sandbox record. Notifies the watch bus.
513517
- **Deleted**: Removes the sandbox record from the store and the index. Notifies the watch bus.
514518
- **Restarted**: Re-processes all objects (full resync).
515519

516-
### Phase Derivation
520+
### Gateway Phase Derivation
517521

518-
`derive_phase()` maps Kubernetes condition state to `SandboxPhase`:
522+
`ComputeRuntime::derive_phase()` (`crates/openshell-server/src/compute/mod.rs`) maps driver-native compute status to the public `SandboxPhase` exposed by `proto/openshell.proto`:
519523

520524
| Condition | Phase |
521525
|-----------|-------|
522-
| `deletionTimestamp` is set | `Deleting` |
526+
| Driver status `deleting=true` | `Deleting` |
523527
| Ready condition `status=True` | `Ready` |
524528
| Ready condition `status=False`, terminal reason | `Error` |
525529
| Ready condition `status=False`, transient reason | `Provisioning` |
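The rows shown in this hunk can be read as a priority-ordered match. The sketch below is a simplified illustration, not `ComputeRuntime::derive_phase()` itself: `DriverStatus` is a hypothetical reduction of the driver-native status, the set of transient reasons is passed in because the hunk does not list it, and the handling of a missing Ready condition is an assumption.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SandboxPhase {
    Provisioning,
    Ready,
    Error,
    Deleting,
}

/// Hypothetical, simplified view of the driver-native status.
struct DriverStatus<'a> {
    deleting: bool,
    /// `Some((status_is_true, reason))` when a Ready condition is present.
    ready: Option<(bool, &'a str)>,
}

fn derive_phase(status: &DriverStatus<'_>, transient_reasons: &[&str]) -> SandboxPhase {
    // Deletion wins over any Ready condition, matching the first table row.
    if status.deleting {
        return SandboxPhase::Deleting;
    }
    match status.ready {
        Some((true, _)) => SandboxPhase::Ready,
        Some((false, reason)) if transient_reasons.contains(&reason) => {
            SandboxPhase::Provisioning
        }
        // Every other `Ready=False` reason is treated as terminal.
        Some((false, _)) => SandboxPhase::Error,
        // Assumption: no Ready condition yet means still provisioning.
        None => SandboxPhase::Provisioning,
    }
}
```

Checking `deleting` first means a sandbox that is being torn down never reports `Ready` or `Error`, whatever its last condition was.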
@@ -530,7 +534,7 @@ All other `Ready=False` reasons are treated as terminal failures (`Error` phase)
530534

531535
### Kubernetes Event Tailer
532536

533-
`spawn_kube_event_tailer()` (`crates/openshell-server/src/sandbox_watch.rs`) watches all Kubernetes `Event` objects in the sandbox namespace and correlates them to sandbox IDs using `SandboxIndex`:
537+
The Kubernetes driver also watches namespace-scoped Kubernetes `Event` objects and correlates them to sandbox IDs before emitting them as compute-driver platform events:
534538

535539
- Events involving `kind: Sandbox` are correlated by sandbox name.
536540
- Events involving `kind: Pod` are correlated by agent pod name.
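The two correlation rules above amount to a lookup keyed by the kind of the involved object. A minimal sketch, assuming a two-map layout for the index; the real `SandboxIndex` API and field names may differ.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory index from sandbox names and agent pod names
/// to sandbox IDs.
#[derive(Default)]
struct SandboxIndex {
    by_sandbox_name: HashMap<String, String>,
    by_pod_name: HashMap<String, String>,
}

impl SandboxIndex {
    /// Record (or refresh) the names known for a sandbox ID.
    fn upsert(&mut self, sandbox_id: &str, sandbox_name: &str, agent_pod: Option<&str>) {
        self.by_sandbox_name
            .insert(sandbox_name.to_string(), sandbox_id.to_string());
        if let Some(pod) = agent_pod {
            self.by_pod_name
                .insert(pod.to_string(), sandbox_id.to_string());
        }
    }

    /// Map an event's involved object (kind + name) to a sandbox ID.
    fn correlate(&self, kind: &str, name: &str) -> Option<&str> {
        let hit = match kind {
            "Sandbox" => self.by_sandbox_name.get(name),
            "Pod" => self.by_pod_name.get(name),
            _ => None, // other kinds are not correlated in this sketch
        };
        hit.map(String::as_str)
    }
}
```

Events whose involved object is of any other kind return `None` here and would be dropped rather than attributed to the wrong sandbox.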

crates/openshell-core/build.rs

Lines changed: 1 addition & 0 deletions
@@ -32,6 +32,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
3232
"../../proto/openshell.proto",
3333
"../../proto/datamodel.proto",
3434
"../../proto/sandbox.proto",
35+
"../../proto/compute_driver.proto",
3536
"../../proto/inference.proto",
3637
"../../proto/test.proto",
3738
];

crates/openshell-core/src/config.rs

Lines changed: 94 additions & 0 deletions
@@ -4,8 +4,48 @@
44
//! Configuration management for OpenShell components.
55
66
use serde::{Deserialize, Serialize};
7+
use std::fmt;
78
use std::net::SocketAddr;
89
use std::path::PathBuf;
10+
use std::str::FromStr;
11+
12+
/// Compute backends the gateway can orchestrate sandboxes through.
13+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
14+
#[serde(rename_all = "snake_case")]
15+
pub enum ComputeDriverKind {
16+
Kubernetes,
17+
Podman,
18+
}
19+
20+
impl ComputeDriverKind {
21+
#[must_use]
22+
pub const fn as_str(self) -> &'static str {
23+
match self {
24+
Self::Kubernetes => "kubernetes",
25+
Self::Podman => "podman",
26+
}
27+
}
28+
}
29+
30+
impl fmt::Display for ComputeDriverKind {
31+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32+
f.write_str(self.as_str())
33+
}
34+
}
35+
36+
impl FromStr for ComputeDriverKind {
37+
type Err = String;
38+
39+
fn from_str(value: &str) -> Result<Self, Self::Err> {
40+
match value.trim().to_ascii_lowercase().as_str() {
41+
"kubernetes" => Ok(Self::Kubernetes),
42+
"podman" => Ok(Self::Podman),
43+
other => Err(format!(
44+
"unsupported compute driver '{other}'. expected one of: kubernetes, podman"
45+
)),
46+
}
47+
}
48+
}
949

1050
/// Server configuration.
1151
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -24,6 +64,14 @@ pub struct Config {
2464
/// Database URL for persistence.
2565
pub database_url: String,
2666

67+
/// Compute drivers configured for the gateway.
68+
///
69+
/// The config shape allows multiple drivers so the gateway can evolve
70+
/// toward multi-backend routing. Current releases require exactly one
71+
/// configured driver.
72+
#[serde(default = "default_compute_drivers")]
73+
pub compute_drivers: Vec<ComputeDriverKind>,
74+
2775
/// Kubernetes namespace for sandboxes.
2876
#[serde(default = "default_sandbox_namespace")]
2977
pub sandbox_namespace: String,
@@ -120,6 +168,7 @@ impl Config {
120168
log_level: default_log_level(),
121169
tls,
122170
database_url: String::new(),
171+
compute_drivers: default_compute_drivers(),
123172
sandbox_namespace: default_sandbox_namespace(),
124173
sandbox_image: String::new(),
125174
sandbox_image_pull_policy: String::new(),
@@ -157,6 +206,16 @@ impl Config {
157206
self
158207
}
159208

209+
/// Create a new configuration with the configured compute drivers.
210+
#[must_use]
211+
pub fn with_compute_drivers<I>(mut self, drivers: I) -> Self
212+
where
213+
I: IntoIterator<Item = ComputeDriverKind>,
214+
{
215+
self.compute_drivers = drivers.into_iter().collect();
216+
self
217+
}
218+
160219
/// Create a new configuration with a sandbox namespace.
161220
#[must_use]
162221
pub fn with_sandbox_namespace(mut self, namespace: impl Into<String>) -> Self {
@@ -261,6 +320,10 @@ fn default_sandbox_namespace() -> String {
261320
"default".to_string()
262321
}
263322

323+
fn default_compute_drivers() -> Vec<ComputeDriverKind> {
324+
vec![ComputeDriverKind::Kubernetes]
325+
}
326+
264327
fn default_ssh_gateway_host() -> String {
265328
"127.0.0.1".to_string()
266329
}
@@ -284,3 +347,34 @@ const fn default_ssh_handshake_skew_secs() -> u64 {
284347
const fn default_ssh_session_ttl_secs() -> u64 {
285348
86400 // 24 hours
286349
}
350+
351+
#[cfg(test)]
352+
mod tests {
353+
use super::{ComputeDriverKind, Config};
354+
355+
#[test]
356+
fn compute_driver_kind_parses_supported_values() {
357+
assert_eq!(
358+
"kubernetes".parse::<ComputeDriverKind>().unwrap(),
359+
ComputeDriverKind::Kubernetes
360+
);
361+
assert_eq!(
362+
"podman".parse::<ComputeDriverKind>().unwrap(),
363+
ComputeDriverKind::Podman
364+
);
365+
}
366+
367+
#[test]
368+
fn compute_driver_kind_rejects_unknown_values() {
369+
let err = "docker".parse::<ComputeDriverKind>().unwrap_err();
370+
assert!(err.contains("unsupported compute driver 'docker'"));
371+
}
372+
373+
#[test]
374+
fn config_defaults_to_kubernetes_driver() {
375+
assert_eq!(
376+
Config::new(None).compute_drivers,
377+
vec![ComputeDriverKind::Kubernetes]
378+
);
379+
}
380+
}
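The `compute_drivers` doc comment says current releases require exactly one configured driver. The sketch below shows one way a caller might parse a comma-separated setting and enforce that rule. It uses a trimmed copy of `ComputeDriverKind` (serde derives omitted) so that it stands alone; `parse_single_driver` is a hypothetical helper that is not part of this commit.

```rust
use std::str::FromStr;

/// Trimmed copy of the enum from `config.rs`, without the serde derives.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ComputeDriverKind {
    Kubernetes,
    Podman,
}

impl FromStr for ComputeDriverKind {
    type Err = String;

    fn from_str(value: &str) -> Result<Self, Self::Err> {
        match value.trim().to_ascii_lowercase().as_str() {
            "kubernetes" => Ok(Self::Kubernetes),
            "podman" => Ok(Self::Podman),
            other => Err(format!(
                "unsupported compute driver '{other}'. expected one of: kubernetes, podman"
            )),
        }
    }
}

/// Hypothetical helper: parse a comma-separated list and require one entry.
fn parse_single_driver(raw: &str) -> Result<ComputeDriverKind, String> {
    let drivers = raw
        .split(',')
        .filter(|item| !item.trim().is_empty())
        .map(str::parse::<ComputeDriverKind>)
        .collect::<Result<Vec<_>, _>>()?;
    match drivers.as_slice() {
        [only] => Ok(*only),
        _ => Err(format!(
            "expected exactly one compute driver, got {}",
            drivers.len()
        )),
    }
}
```

Because `from_str` trims and lowercases its input, values such as ` Kubernetes ` parse successfully, while an empty list or two drivers are rejected by the length check.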
