Skip to content

Commit 25e4423

Browse files
committed
feat(server,sandbox): move SSH connect and exec onto supervisor session relay
Introduce a persistent supervisor-to-gateway session (the ConnectSupervisor bidirectional gRPC RPC) and migrate /connect/ssh and ExecSandbox onto relay channels coordinated through it.

Architecture:
- gRPC control plane: carries session lifecycle (hello, heartbeat) and relay lifecycle (RelayOpen, RelayOpenResult, RelayClose).
- HTTP data plane: for each relay, the supervisor opens a reverse HTTP CONNECT to /relay/{channel_id} on the gateway; the gateway bridges the client stream with the supervisor stream.
- The supervisor is a dumb byte bridge with no SSH/NSSH1 awareness; the gateway sends the NSSH1 preface through the relay.

Key changes:
- Add ConnectSupervisor RPC and session/relay proto messages.
- Add gateway session registry (SupervisorSessionRegistry) with pending-relay map for channel correlation.
- Add /relay/{channel_id} HTTP CONNECT endpoint.
- Rewire /connect/ssh: session lookup + RelayOpen instead of direct TCP dial to sandbox:2222.
- Rewire ExecSandbox: relay-based proxy instead of direct sandbox dial.
- Add supervisor session client with reconnect and relay bridge.
- Remove ResolveSandboxEndpoint from proto, gateway, and K8s driver.

Closes OS-86
1 parent 25d2530 commit 25e4423

27 files changed

Lines changed: 1408 additions & 694 deletions

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/openshell-cli/tests/ensure_providers_integration.rs

Lines changed: 17 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -11,13 +11,14 @@ use openshell_core::proto::open_shell_server::{OpenShell, OpenShellServer};
1111
use openshell_core::proto::{
1212
CreateProviderRequest, CreateSandboxRequest, CreateSshSessionRequest, CreateSshSessionResponse,
1313
DeleteProviderRequest, DeleteProviderResponse, DeleteSandboxRequest, DeleteSandboxResponse,
14-
ExecSandboxEvent, ExecSandboxRequest, GetGatewayConfigRequest, GetGatewayConfigResponse,
15-
GetProviderRequest, GetSandboxConfigRequest, GetSandboxConfigResponse,
16-
GetSandboxProviderEnvironmentRequest, GetSandboxProviderEnvironmentResponse, GetSandboxRequest,
17-
HealthRequest, HealthResponse, ListProvidersRequest, ListProvidersResponse,
18-
ListSandboxesRequest, ListSandboxesResponse, Provider, ProviderResponse,
19-
RevokeSshSessionRequest, RevokeSshSessionResponse, SandboxResponse, SandboxStreamEvent,
20-
ServiceStatus, UpdateProviderRequest, WatchSandboxRequest,
14+
ExecSandboxEvent, ExecSandboxRequest, GatewayMessage, GetGatewayConfigRequest,
15+
GetGatewayConfigResponse, GetProviderRequest, GetSandboxConfigRequest,
16+
GetSandboxConfigResponse, GetSandboxProviderEnvironmentRequest,
17+
GetSandboxProviderEnvironmentResponse, GetSandboxRequest, HealthRequest, HealthResponse,
18+
ListProvidersRequest, ListProvidersResponse, ListSandboxesRequest, ListSandboxesResponse,
19+
Provider, ProviderResponse, RevokeSshSessionRequest, RevokeSshSessionResponse, SandboxResponse,
20+
SandboxStreamEvent, ServiceStatus, SupervisorMessage, UpdateProviderRequest,
21+
WatchSandboxRequest,
2122
};
2223
use rcgen::{
2324
BasicConstraints, Certificate, CertificateParams, ExtendedKeyUsagePurpose, IsCa, KeyPair,
@@ -298,6 +299,8 @@ impl OpenShell for TestOpenShell {
298299
tokio_stream::wrappers::ReceiverStream<Result<SandboxStreamEvent, Status>>;
299300
type ExecSandboxStream =
300301
tokio_stream::wrappers::ReceiverStream<Result<ExecSandboxEvent, Status>>;
302+
type ConnectSupervisorStream =
303+
tokio_stream::wrappers::ReceiverStream<Result<GatewayMessage, Status>>;
301304

302305
async fn watch_sandbox(
303306
&self,
@@ -423,6 +426,13 @@ impl OpenShell for TestOpenShell {
423426
) -> Result<Response<openshell_core::proto::GetDraftHistoryResponse>, Status> {
424427
Err(Status::unimplemented("not implemented in test"))
425428
}
429+
430+
async fn connect_supervisor(
431+
&self,
432+
_request: tonic::Request<tonic::Streaming<SupervisorMessage>>,
433+
) -> Result<Response<Self::ConnectSupervisorStream>, Status> {
434+
Err(Status::unimplemented("not implemented in test"))
435+
}
426436
}
427437

428438
// ── TLS helpers ──────────────────────────────────────────────────────

crates/openshell-cli/tests/mtls_integration.rs

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -200,6 +200,9 @@ impl OpenShell for TestOpenShell {
200200
>;
201201
type ExecSandboxStream =
202202
tokio_stream::wrappers::ReceiverStream<Result<ExecSandboxEvent, Status>>;
203+
type ConnectSupervisorStream = tokio_stream::wrappers::ReceiverStream<
204+
Result<openshell_core::proto::GatewayMessage, Status>,
205+
>;
203206

204207
async fn watch_sandbox(
205208
&self,
@@ -325,6 +328,13 @@ impl OpenShell for TestOpenShell {
325328
) -> Result<Response<openshell_core::proto::GetDraftHistoryResponse>, Status> {
326329
Err(Status::unimplemented("not implemented in test"))
327330
}
331+
332+
async fn connect_supervisor(
333+
&self,
334+
_request: tonic::Request<tonic::Streaming<openshell_core::proto::SupervisorMessage>>,
335+
) -> Result<Response<Self::ConnectSupervisorStream>, Status> {
336+
Err(Status::unimplemented("not implemented in test"))
337+
}
328338
}
329339

330340
fn build_ca() -> (Certificate, KeyPair) {

crates/openshell-cli/tests/provider_commands_integration.rs

Lines changed: 17 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -7,13 +7,14 @@ use openshell_core::proto::open_shell_server::{OpenShell, OpenShellServer};
77
use openshell_core::proto::{
88
CreateProviderRequest, CreateSandboxRequest, CreateSshSessionRequest, CreateSshSessionResponse,
99
DeleteProviderRequest, DeleteProviderResponse, DeleteSandboxRequest, DeleteSandboxResponse,
10-
ExecSandboxEvent, ExecSandboxRequest, GetGatewayConfigRequest, GetGatewayConfigResponse,
11-
GetProviderRequest, GetSandboxConfigRequest, GetSandboxConfigResponse,
12-
GetSandboxProviderEnvironmentRequest, GetSandboxProviderEnvironmentResponse, GetSandboxRequest,
13-
HealthRequest, HealthResponse, ListProvidersRequest, ListProvidersResponse,
14-
ListSandboxesRequest, ListSandboxesResponse, Provider, ProviderResponse,
15-
RevokeSshSessionRequest, RevokeSshSessionResponse, SandboxResponse, SandboxStreamEvent,
16-
ServiceStatus, UpdateProviderRequest, WatchSandboxRequest,
10+
ExecSandboxEvent, ExecSandboxRequest, GatewayMessage, GetGatewayConfigRequest,
11+
GetGatewayConfigResponse, GetProviderRequest, GetSandboxConfigRequest,
12+
GetSandboxConfigResponse, GetSandboxProviderEnvironmentRequest,
13+
GetSandboxProviderEnvironmentResponse, GetSandboxRequest, HealthRequest, HealthResponse,
14+
ListProvidersRequest, ListProvidersResponse, ListSandboxesRequest, ListSandboxesResponse,
15+
Provider, ProviderResponse, RevokeSshSessionRequest, RevokeSshSessionResponse, SandboxResponse,
16+
SandboxStreamEvent, ServiceStatus, SupervisorMessage, UpdateProviderRequest,
17+
WatchSandboxRequest,
1718
};
1819
use rcgen::{
1920
BasicConstraints, Certificate, CertificateParams, ExtendedKeyUsagePurpose, IsCa, KeyPair,
@@ -252,6 +253,8 @@ impl OpenShell for TestOpenShell {
252253
tokio_stream::wrappers::ReceiverStream<Result<SandboxStreamEvent, Status>>;
253254
type ExecSandboxStream =
254255
tokio_stream::wrappers::ReceiverStream<Result<ExecSandboxEvent, Status>>;
256+
type ConnectSupervisorStream =
257+
tokio_stream::wrappers::ReceiverStream<Result<GatewayMessage, Status>>;
255258

256259
async fn watch_sandbox(
257260
&self,
@@ -377,6 +380,13 @@ impl OpenShell for TestOpenShell {
377380
) -> Result<Response<openshell_core::proto::GetDraftHistoryResponse>, Status> {
378381
Err(Status::unimplemented("not implemented in test"))
379382
}
383+
384+
async fn connect_supervisor(
385+
&self,
386+
_request: tonic::Request<tonic::Streaming<SupervisorMessage>>,
387+
) -> Result<Response<Self::ConnectSupervisorStream>, Status> {
388+
Err(Status::unimplemented("not implemented in test"))
389+
}
380390
}
381391

382392
fn install_rustls_provider() {

crates/openshell-cli/tests/sandbox_create_lifecycle_integration.rs

Lines changed: 17 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -8,14 +8,14 @@ use openshell_core::proto::open_shell_server::{OpenShell, OpenShellServer};
88
use openshell_core::proto::{
99
CreateProviderRequest, CreateSandboxRequest, CreateSshSessionRequest, CreateSshSessionResponse,
1010
DeleteProviderRequest, DeleteProviderResponse, DeleteSandboxRequest, DeleteSandboxResponse,
11-
ExecSandboxEvent, ExecSandboxRequest, GetGatewayConfigRequest, GetGatewayConfigResponse,
12-
GetProviderRequest, GetSandboxConfigRequest, GetSandboxConfigResponse,
13-
GetSandboxProviderEnvironmentRequest, GetSandboxProviderEnvironmentResponse, GetSandboxRequest,
14-
HealthRequest, HealthResponse, ListProvidersRequest, ListProvidersResponse,
15-
ListSandboxesRequest, ListSandboxesResponse, PlatformEvent, ProviderResponse,
16-
RevokeSshSessionRequest, RevokeSshSessionResponse, Sandbox, SandboxPhase, SandboxResponse,
17-
SandboxStreamEvent, ServiceStatus, UpdateProviderRequest, WatchSandboxRequest,
18-
sandbox_stream_event,
11+
ExecSandboxEvent, ExecSandboxRequest, GatewayMessage, GetGatewayConfigRequest,
12+
GetGatewayConfigResponse, GetProviderRequest, GetSandboxConfigRequest,
13+
GetSandboxConfigResponse, GetSandboxProviderEnvironmentRequest,
14+
GetSandboxProviderEnvironmentResponse, GetSandboxRequest, HealthRequest, HealthResponse,
15+
ListProvidersRequest, ListProvidersResponse, ListSandboxesRequest, ListSandboxesResponse,
16+
PlatformEvent, ProviderResponse, RevokeSshSessionRequest, RevokeSshSessionResponse, Sandbox,
17+
SandboxPhase, SandboxResponse, SandboxStreamEvent, ServiceStatus, SupervisorMessage,
18+
UpdateProviderRequest, WatchSandboxRequest, sandbox_stream_event,
1919
};
2020
use rcgen::{
2121
BasicConstraints, Certificate, CertificateParams, ExtendedKeyUsagePurpose, IsCa, KeyPair,
@@ -242,6 +242,8 @@ impl OpenShell for TestOpenShell {
242242
tokio_stream::wrappers::ReceiverStream<Result<SandboxStreamEvent, Status>>;
243243
type ExecSandboxStream =
244244
tokio_stream::wrappers::ReceiverStream<Result<ExecSandboxEvent, Status>>;
245+
type ConnectSupervisorStream =
246+
tokio_stream::wrappers::ReceiverStream<Result<GatewayMessage, Status>>;
245247

246248
async fn watch_sandbox(
247249
&self,
@@ -403,6 +405,13 @@ impl OpenShell for TestOpenShell {
403405
) -> Result<Response<openshell_core::proto::GetDraftHistoryResponse>, Status> {
404406
Err(Status::unimplemented("not implemented in test"))
405407
}
408+
409+
async fn connect_supervisor(
410+
&self,
411+
_request: tonic::Request<tonic::Streaming<SupervisorMessage>>,
412+
) -> Result<Response<Self::ConnectSupervisorStream>, Status> {
413+
Err(Status::unimplemented("not implemented in test"))
414+
}
406415
}
407416

408417
fn install_rustls_provider() {

crates/openshell-cli/tests/sandbox_name_fallback_integration.rs

Lines changed: 16 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -8,12 +8,13 @@ use openshell_core::proto::open_shell_server::{OpenShell, OpenShellServer};
88
use openshell_core::proto::{
99
CreateProviderRequest, CreateSandboxRequest, CreateSshSessionRequest, CreateSshSessionResponse,
1010
DeleteProviderRequest, DeleteProviderResponse, DeleteSandboxRequest, DeleteSandboxResponse,
11-
ExecSandboxEvent, ExecSandboxRequest, GetGatewayConfigRequest, GetGatewayConfigResponse,
12-
GetProviderRequest, GetSandboxConfigRequest, GetSandboxConfigResponse,
13-
GetSandboxProviderEnvironmentRequest, GetSandboxProviderEnvironmentResponse, GetSandboxRequest,
14-
HealthRequest, HealthResponse, ListProvidersRequest, ListProvidersResponse,
15-
ListSandboxesRequest, ListSandboxesResponse, ProviderResponse, Sandbox, SandboxResponse,
16-
SandboxStreamEvent, ServiceStatus, UpdateProviderRequest, WatchSandboxRequest,
11+
ExecSandboxEvent, ExecSandboxRequest, GatewayMessage, GetGatewayConfigRequest,
12+
GetGatewayConfigResponse, GetProviderRequest, GetSandboxConfigRequest,
13+
GetSandboxConfigResponse, GetSandboxProviderEnvironmentRequest,
14+
GetSandboxProviderEnvironmentResponse, GetSandboxRequest, HealthRequest, HealthResponse,
15+
ListProvidersRequest, ListProvidersResponse, ListSandboxesRequest, ListSandboxesResponse,
16+
ProviderResponse, Sandbox, SandboxResponse, SandboxStreamEvent, ServiceStatus,
17+
SupervisorMessage, UpdateProviderRequest, WatchSandboxRequest,
1718
};
1819
use rcgen::{
1920
BasicConstraints, Certificate, CertificateParams, ExtendedKeyUsagePurpose, IsCa, KeyPair,
@@ -210,6 +211,8 @@ impl OpenShell for TestOpenShell {
210211
tokio_stream::wrappers::ReceiverStream<Result<SandboxStreamEvent, Status>>;
211212
type ExecSandboxStream =
212213
tokio_stream::wrappers::ReceiverStream<Result<ExecSandboxEvent, Status>>;
214+
type ConnectSupervisorStream =
215+
tokio_stream::wrappers::ReceiverStream<Result<GatewayMessage, Status>>;
213216

214217
async fn watch_sandbox(
215218
&self,
@@ -335,6 +338,13 @@ impl OpenShell for TestOpenShell {
335338
) -> Result<Response<openshell_core::proto::GetDraftHistoryResponse>, Status> {
336339
Err(Status::unimplemented("not implemented in test"))
337340
}
341+
342+
async fn connect_supervisor(
343+
&self,
344+
_request: tonic::Request<tonic::Streaming<SupervisorMessage>>,
345+
) -> Result<Response<Self::ConnectSupervisorStream>, Status> {
346+
Err(Status::unimplemented("not implemented in test"))
347+
}
338348
}
339349

340350
// ── helpers ───────────────────────────────────────────────────────────

crates/openshell-driver-kubernetes/src/driver.rs

Lines changed: 3 additions & 66 deletions
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@
55
66
use crate::config::KubernetesComputeConfig;
77
use futures::{Stream, StreamExt, TryStreamExt};
8-
use k8s_openapi::api::core::v1::{Event as KubeEventObj, Node, Pod};
8+
use k8s_openapi::api::core::v1::{Event as KubeEventObj, Node};
99
use kube::api::{Api, ApiResource, DeleteParams, ListParams, PostParams};
1010
use kube::core::gvk::GroupVersionKind;
1111
use kube::core::{DynamicObject, ObjectMeta};
@@ -15,12 +15,10 @@ use openshell_core::proto::compute::v1::{
1515
DriverCondition as SandboxCondition, DriverPlatformEvent as PlatformEvent,
1616
DriverSandbox as Sandbox, DriverSandboxSpec as SandboxSpec,
1717
DriverSandboxStatus as SandboxStatus, DriverSandboxTemplate as SandboxTemplate,
18-
GetCapabilitiesResponse, ResolveSandboxEndpointResponse, SandboxEndpoint,
19-
WatchSandboxesDeletedEvent, WatchSandboxesEvent, WatchSandboxesPlatformEvent,
20-
WatchSandboxesSandboxEvent, sandbox_endpoint, watch_sandboxes_event,
18+
GetCapabilitiesResponse, WatchSandboxesDeletedEvent, WatchSandboxesEvent,
19+
WatchSandboxesPlatformEvent, WatchSandboxesSandboxEvent, watch_sandboxes_event,
2120
};
2221
use std::collections::BTreeMap;
23-
use std::net::IpAddr;
2422
use std::pin::Pin;
2523
use std::time::Duration;
2624
use tokio::sync::mpsc;
@@ -271,21 +269,6 @@ impl KubernetesComputeDriver {
271269
&self.config.ssh_handshake_secret
272270
}
273271

274-
async fn agent_pod_ip(&self, pod_name: &str) -> Result<Option<IpAddr>, KubeError> {
275-
let api: Api<Pod> = Api::namespaced(self.client.clone(), &self.config.namespace);
276-
match api.get(pod_name).await {
277-
Ok(pod) => {
278-
let ip = pod
279-
.status
280-
.and_then(|status| status.pod_ip)
281-
.and_then(|ip| ip.parse().ok());
282-
Ok(ip)
283-
}
284-
Err(KubeError::Api(err)) if err.code == 404 => Ok(None),
285-
Err(err) => Err(err),
286-
}
287-
}
288-
289272
pub async fn create_sandbox(&self, sandbox: &Sandbox) -> Result<(), KubernetesDriverError> {
290273
let name = sandbox.name.as_str();
291274
info!(
@@ -407,52 +390,6 @@ impl KubernetesComputeDriver {
407390
}
408391
}
409392

410-
pub async fn resolve_sandbox_endpoint(
411-
&self,
412-
sandbox: &Sandbox,
413-
) -> Result<ResolveSandboxEndpointResponse, KubernetesDriverError> {
414-
if let Some(status) = sandbox.status.as_ref()
415-
&& !status.instance_id.is_empty()
416-
{
417-
match self.agent_pod_ip(&status.instance_id).await {
418-
Ok(Some(ip)) => {
419-
return Ok(ResolveSandboxEndpointResponse {
420-
endpoint: Some(SandboxEndpoint {
421-
target: Some(sandbox_endpoint::Target::Ip(ip.to_string())),
422-
port: u32::from(self.config.ssh_port),
423-
}),
424-
});
425-
}
426-
Ok(None) => {
427-
return Err(KubernetesDriverError::Precondition(
428-
"sandbox agent pod IP is not available".to_string(),
429-
));
430-
}
431-
Err(err) => {
432-
return Err(KubernetesDriverError::Message(format!(
433-
"failed to resolve agent pod IP: {err}"
434-
)));
435-
}
436-
}
437-
}
438-
439-
if sandbox.name.is_empty() {
440-
return Err(KubernetesDriverError::Precondition(
441-
"sandbox has no name".to_string(),
442-
));
443-
}
444-
445-
Ok(ResolveSandboxEndpointResponse {
446-
endpoint: Some(SandboxEndpoint {
447-
target: Some(sandbox_endpoint::Target::Host(format!(
448-
"{}.{}.svc.cluster.local",
449-
sandbox.name, self.config.namespace
450-
))),
451-
port: u32::from(self.config.ssh_port),
452-
}),
453-
})
454-
}
455-
456393
pub async fn watch_sandboxes(&self) -> Result<WatchStream, String> {
457394
let namespace = self.config.namespace.clone();
458395
let sandbox_api = self.api();

crates/openshell-driver-kubernetes/src/grpc.rs

Lines changed: 1 addition & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -5,8 +5,7 @@ use futures::{Stream, StreamExt};
55
use openshell_core::proto::compute::v1::{
66
CreateSandboxRequest, CreateSandboxResponse, DeleteSandboxRequest, DeleteSandboxResponse,
77
GetCapabilitiesRequest, GetCapabilitiesResponse, GetSandboxRequest, GetSandboxResponse,
8-
ListSandboxesRequest, ListSandboxesResponse, ResolveSandboxEndpointRequest,
9-
ResolveSandboxEndpointResponse, StopSandboxRequest, StopSandboxResponse,
8+
ListSandboxesRequest, ListSandboxesResponse, StopSandboxRequest, StopSandboxResponse,
109
ValidateSandboxCreateRequest, ValidateSandboxCreateResponse, WatchSandboxesEvent,
1110
WatchSandboxesRequest, compute_driver_server::ComputeDriver,
1211
};
@@ -128,21 +127,6 @@ impl ComputeDriver for ComputeDriverService {
128127
Ok(Response::new(DeleteSandboxResponse { deleted }))
129128
}
130129

131-
async fn resolve_sandbox_endpoint(
132-
&self,
133-
request: Request<ResolveSandboxEndpointRequest>,
134-
) -> Result<Response<ResolveSandboxEndpointResponse>, Status> {
135-
let sandbox = request
136-
.into_inner()
137-
.sandbox
138-
.ok_or_else(|| Status::invalid_argument("sandbox is required"))?;
139-
self.driver
140-
.resolve_sandbox_endpoint(&sandbox)
141-
.await
142-
.map(Response::new)
143-
.map_err(status_from_driver_error)
144-
}
145-
146130
type WatchSandboxesStream =
147131
Pin<Box<dyn Stream<Item = Result<WatchSandboxesEvent, Status>> + Send + 'static>>;
148132

crates/openshell-sandbox/Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -51,8 +51,15 @@ rcgen = { workspace = true }
5151
webpki-roots = { workspace = true }
5252

5353
# HTTP
54+
hyper = { workspace = true }
55+
hyper-util = { workspace = true }
56+
http = "1"
57+
http-body-util = "0.1"
5458
bytes = { workspace = true }
5559

60+
# UUID
61+
uuid = { workspace = true }
62+
5663
# Encoding
5764
base64 = { workspace = true }
5865

crates/openshell-sandbox/src/grpc_client.rs

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -74,6 +74,11 @@ async fn connect_channel(endpoint: &str) -> Result<Channel> {
7474
.wrap_err("failed to connect to OpenShell server")
7575
}
7676

77+
/// Create a channel to the OpenShell server (public for use by supervisor_session).
78+
pub async fn connect_channel_pub(endpoint: &str) -> Result<Channel> {
79+
connect_channel(endpoint).await
80+
}
81+
7782
/// Connect to the OpenShell server (mTLS or plaintext based on endpoint scheme).
7883
async fn connect(endpoint: &str) -> Result<OpenShellClient<Channel>> {
7984
let channel = connect_channel(endpoint).await?;

0 commit comments

Comments
 (0)