Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/branch-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,22 @@ jobs:
kubernetes-e2e:
needs: [pr_metadata, build-gateway, build-supervisor]
if: needs.pr_metadata.outputs.should_run == 'true' && needs.pr_metadata.outputs.run_core_e2e == 'true'
strategy:
fail-fast: false
matrix:
include:
- agent_sandbox_api: v1beta1
agent_sandbox_version: v0.5.0
- agent_sandbox_api: v1alpha1
agent_sandbox_version: v0.4.6
permissions:
contents: read
packages: read
uses: ./.github/workflows/e2e-kubernetes-test.yml
with:
image-tag: ${{ github.sha }}
job-name: Kubernetes E2E (Rust smoke, Agent Sandbox ${{ matrix.agent_sandbox_api }})
agent-sandbox-version: ${{ matrix.agent_sandbox_version }}

kubernetes-ha-e2e:
needs: [pr_metadata, build-gateway, build-supervisor]
Expand Down
6 changes: 6 additions & 0 deletions .github/workflows/e2e-kubernetes-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ on:
required: false
type: string
default: ""
agent-sandbox-version:
description: "Agent Sandbox release to install before OpenShell"
required: false
type: string
default: "v0.5.0"
mise-version:
description: "mise version to install on the bare Kubernetes e2e runner"
required: false
Expand Down Expand Up @@ -114,6 +119,7 @@ jobs:

- name: Run Kubernetes E2E (Rust smoke)
env:
AGENT_SANDBOX_VERSION: ${{ inputs.agent-sandbox-version }}
OPENSHELL_E2E_KUBE_CONTEXT: kind-${{ env.KIND_CLUSTER_NAME }}
OPENSHELL_E2E_KUBE_EXTRA_VALUES: ${{ inputs.extra-helm-values }}
OPENSHELL_E2E_KUBE_EXTERNAL_POSTGRES_SECRET: ${{ inputs.external-postgres-secret }}
Expand Down
11 changes: 7 additions & 4 deletions architecture/gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,13 @@ token through `IssueSandboxToken`. The gateway validates that projected token
with Kubernetes `TokenReview`, requires the configured sandbox service account,
checks the returned pod binding against the live pod UID, and verifies the pod's
controlling `Sandbox` ownerReference against the live Sandbox CR UID and
sandbox-id label before minting the gateway JWT. Supervisors renew gateway JWTs
in memory before expiry only while the sandbox record still exists. Older tokens
are not server-revoked; shared deployments bound replay exposure with short
`gateway_jwt.ttl_secs` lifetimes. The config default is
sandbox-id label before minting the gateway JWT. The bootstrap path accepts
both `agents.x-k8s.io/v1beta1` ownerReferences from newer Agent Sandbox
controllers and `agents.x-k8s.io/v1alpha1` ownerReferences from existing
deployments. Supervisors renew gateway JWTs in memory before expiry only while
the sandbox record still exists. Older tokens are not server-revoked; shared
deployments bound replay exposure with short `gateway_jwt.ttl_secs` lifetimes.
The config default is
`gateway_jwt.ttl_secs = 0` for local single-player Docker, Podman, and VM
gateways; those tokens carry `exp = 0` and do not expire. Kubernetes and other
shared deployments should set a positive TTL.
Expand Down
10 changes: 7 additions & 3 deletions crates/openshell-driver-kubernetes/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@ credential injection, policy polling, logs, and the gateway relay.

## Sandbox Resource

The driver works with the `agents.x-k8s.io/v1alpha1` `Sandbox` custom resource.
Driver events map Kubernetes object state and platform events into the shared
compute-driver protobuf surface used by the gateway.
The driver works with the `agents.x-k8s.io` `Sandbox` custom resource. It
detects the served Sandbox API at runtime, caches the selected API version for
the gateway process, and uses `v1beta1` when available before falling back to
`v1alpha1`. Restart the gateway after an in-place Agent Sandbox upgrade so the
driver can detect served API versions again. Driver events map Kubernetes object
state and platform events into the shared compute-driver protobuf surface used
by the gateway.

Kubernetes API calls use explicit timeouts so gRPC handlers do not block
indefinitely when the API server is slow or unavailable.
Expand Down
173 changes: 147 additions & 26 deletions crates/openshell-driver-kubernetes/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ use openshell_core::proto_struct::{struct_to_json_object, value_to_json};
use serde::Deserialize;
use std::collections::BTreeMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::{OnceCell, mpsc};
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, info, warn};

Expand Down Expand Up @@ -80,12 +81,19 @@ impl From<KubernetesDriverError> for openshell_core::ComputeDriverError {
const KUBE_API_TIMEOUT: Duration = Duration::from_secs(30);

const SANDBOX_GROUP: &str = "agents.x-k8s.io";
const SANDBOX_VERSION: &str = "v1alpha1";
const SANDBOX_VERSION_V1BETA1: &str = "v1beta1";
const SANDBOX_VERSION_V1ALPHA1: &str = "v1alpha1";
const SANDBOX_VERSIONS: &[&str] = &[SANDBOX_VERSION_V1BETA1, SANDBOX_VERSION_V1ALPHA1];
pub const SANDBOX_KIND: &str = "Sandbox";

const GPU_RESOURCE_NAME: &str = "nvidia.com/gpu";
const SPIFFE_WORKLOAD_API_VOLUME_NAME: &str = "spiffe-workload-api";

/// Pairs a dynamic `Sandbox` API handle with the `ApiResource` it was built
/// from, so callers can both issue requests and construct new objects for the
/// same group/version/kind.
struct AgentSandboxApi {
/// Namespaced dynamic API handle used for get/list/create/delete/watch calls.
api: Api<DynamicObject>,
/// Resource descriptor derived from the selected group/version/kind; used
/// when constructing a new `DynamicObject` for creation.
resource: ApiResource,
}

// This POC treats the selected Struct as a driver-local typed schema. Once the
// Kubernetes shape stabilizes, these serde structs may move to driver-local
// protobuf definitions, but the typed decode should stay inside this driver.
Expand Down Expand Up @@ -190,6 +198,7 @@ const WORKSPACE_SENTINEL: &str = ".workspace-initialized";
pub struct KubernetesComputeDriver {
client: Client,
watch_client: Client,
sandbox_api_version: Arc<OnceCell<&'static str>>,
config: KubernetesComputeConfig,
}

Expand Down Expand Up @@ -232,6 +241,7 @@ impl KubernetesComputeDriver {
Ok(Self {
client,
watch_client,
sandbox_api_version: Arc::new(OnceCell::new()),
config,
})
}
Expand All @@ -256,16 +266,68 @@ impl KubernetesComputeDriver {
&self.config.ssh_socket_path
}

fn watch_api(&self) -> Api<DynamicObject> {
let gvk = GroupVersionKind::gvk(SANDBOX_GROUP, SANDBOX_VERSION, SANDBOX_KIND);
fn agent_sandbox_api(&self, client: Client, sandbox_api_version: &str) -> AgentSandboxApi {
let gvk = GroupVersionKind::gvk(SANDBOX_GROUP, sandbox_api_version, SANDBOX_KIND);
let resource = ApiResource::from_gvk(&gvk);
Api::namespaced_with(self.watch_client.clone(), &self.config.namespace, &resource)
let api = Api::namespaced_with(client, &self.config.namespace, &resource);
AgentSandboxApi { api, resource }
}

fn api(&self) -> Api<DynamicObject> {
let gvk = GroupVersionKind::gvk(SANDBOX_GROUP, SANDBOX_VERSION, SANDBOX_KIND);
let resource = ApiResource::from_gvk(&gvk);
Api::namespaced_with(self.client.clone(), &self.config.namespace, &resource)
/// Resolves the Sandbox API version to use and returns an API handle bound to
/// `client` for that version.
///
/// # Errors
///
/// Propagates the error string from `supported_sandbox_api_version` when no
/// version could be selected.
async fn supported_agent_sandbox_api(&self, client: Client) -> Result<AgentSandboxApi, String> {
    self.supported_sandbox_api_version(client.clone())
        .await
        .map(|selected_version| self.agent_sandbox_api(client, selected_version))
}

/// Returns the Sandbox API version stored in `self.sandbox_api_version`,
/// running detection through `get_or_try_init` when the cell is still empty.
///
/// # Errors
///
/// Returns the error string produced by
/// `detect_supported_sandbox_api_version` when detection fails.
async fn supported_sandbox_api_version(&self, client: Client) -> Result<&'static str, String> {
    let detect = || async move { self.detect_supported_sandbox_api_version(client).await };
    let selected = self.sandbox_api_version.get_or_try_init(detect).await?;
    Ok(*selected)
}

/// Probes each entry of `SANDBOX_VERSIONS` in order and returns the first one
/// whose namespaced `list` call (limited to one item) succeeds.
///
/// A probe error accepted by `should_try_next_sandbox_api_version` moves on to
/// the next candidate. Any other API error, or a probe exceeding
/// `KUBE_API_TIMEOUT`, aborts detection immediately.
///
/// # Errors
///
/// Returns a message describing the API error, the timeout, or the fact that
/// none of the supported versions was available.
async fn detect_supported_sandbox_api_version(
    &self,
    client: Client,
) -> Result<&'static str, String> {
    for &candidate in SANDBOX_VERSIONS {
        let probe = self.agent_sandbox_api(client.clone(), candidate);
        let params = ListParams::default().limit(1);
        let outcome = tokio::time::timeout(KUBE_API_TIMEOUT, probe.api.list(&params)).await;

        let err = match outcome {
            Err(_elapsed) => {
                return Err(format!(
                    "timed out after {}s waiting for Kubernetes API",
                    KUBE_API_TIMEOUT.as_secs()
                ));
            }
            Ok(Ok(_)) => {
                debug!(
                    namespace = %self.config.namespace,
                    sandbox_api_version = %candidate,
                    "Selected Agent Sandbox API version"
                );
                return Ok(candidate);
            }
            Ok(Err(err)) => err,
        };

        // Only "version not served" style failures fall through to the next
        // candidate; everything else is surfaced to the caller as-is.
        if !should_try_next_sandbox_api_version(&err) {
            return Err(err.to_string());
        }
        debug!(
            namespace = %self.config.namespace,
            sandbox_api_version = %candidate,
            error = %err,
            "Sandbox API version is not available; trying next supported version"
        );
    }

    Err(format!(
        "no supported Agent Sandbox API version is available; tried {}",
        SANDBOX_VERSIONS.join(", ")
    ))
}

async fn has_gpu_capacity(&self) -> Result<bool, KubeError> {
Expand Down Expand Up @@ -306,8 +368,10 @@ impl KubernetesComputeDriver {
"Fetching sandbox from Kubernetes"
);

let api = self.api();
match tokio::time::timeout(KUBE_API_TIMEOUT, api.get(name)).await {
let agent_sandbox_api = self
.supported_agent_sandbox_api(self.client.clone())
.await?;
match tokio::time::timeout(KUBE_API_TIMEOUT, agent_sandbox_api.api.get(name)).await {
Ok(Ok(obj)) => sandbox_from_object(&self.config.namespace, obj).map(Some),
Ok(Err(KubeError::Api(err))) if err.code == 404 => {
debug!(sandbox_name = %name, "Sandbox not found in Kubernetes");
Expand Down Expand Up @@ -341,8 +405,15 @@ impl KubernetesComputeDriver {
"Listing sandboxes from Kubernetes"
);

let api = self.api();
match tokio::time::timeout(KUBE_API_TIMEOUT, api.list(&ListParams::default())).await {
let agent_sandbox_api = self
.supported_agent_sandbox_api(self.client.clone())
.await?;
match tokio::time::timeout(
KUBE_API_TIMEOUT,
agent_sandbox_api.api.list(&ListParams::default()),
)
.await
{
Ok(Ok(list)) => {
let mut sandboxes = list
.items
Expand Down Expand Up @@ -396,9 +467,11 @@ impl KubernetesComputeDriver {
"Creating sandbox in Kubernetes"
);

let gvk = GroupVersionKind::gvk(SANDBOX_GROUP, SANDBOX_VERSION, SANDBOX_KIND);
let resource = ApiResource::from_gvk(&gvk);
let mut obj = DynamicObject::new(name, &resource);
let agent_sandbox_api = self
.supported_agent_sandbox_api(self.client.clone())
.await
.map_err(KubernetesDriverError::Message)?;
let mut obj = DynamicObject::new(name, &agent_sandbox_api.resource);
obj.metadata = ObjectMeta {
name: Some(name.to_string()),
namespace: Some(self.config.namespace.clone()),
Expand Down Expand Up @@ -430,9 +503,11 @@ impl KubernetesComputeDriver {
.provider_spiffe_workload_api_socket_path,
};
obj.data = sandbox_to_k8s_spec(sandbox.spec.as_ref(), &params);
let api = self.api();

match tokio::time::timeout(KUBE_API_TIMEOUT, api.create(&PostParams::default(), &obj)).await
match tokio::time::timeout(
KUBE_API_TIMEOUT,
agent_sandbox_api.api.create(&PostParams::default(), &obj),
)
.await
{
Ok(Ok(_result)) => {
info!(
Expand Down Expand Up @@ -473,9 +548,14 @@ impl KubernetesComputeDriver {
"Deleting sandbox from Kubernetes"
);

let api = self.api();
match tokio::time::timeout(KUBE_API_TIMEOUT, api.delete(name, &DeleteParams::default()))
.await
let agent_sandbox_api = self
.supported_agent_sandbox_api(self.client.clone())
.await?;
match tokio::time::timeout(
KUBE_API_TIMEOUT,
agent_sandbox_api.api.delete(name, &DeleteParams::default()),
)
.await
{
Ok(Ok(_response)) => {
info!(sandbox_name = %name, "Sandbox deleted from Kubernetes");
Expand Down Expand Up @@ -508,8 +588,10 @@ impl KubernetesComputeDriver {
}

pub async fn sandbox_exists(&self, name: &str) -> Result<bool, String> {
let api = self.api();
match tokio::time::timeout(KUBE_API_TIMEOUT, api.get(name)).await {
let agent_sandbox_api = self
.supported_agent_sandbox_api(self.client.clone())
.await?;
match tokio::time::timeout(KUBE_API_TIMEOUT, agent_sandbox_api.api.get(name)).await {
Ok(Ok(_)) => Ok(true),
Ok(Err(KubeError::Api(err))) if err.code == 404 => Ok(false),
Ok(Err(err)) => Err(err.to_string()),
Expand All @@ -524,9 +606,12 @@ impl KubernetesComputeDriver {
#[allow(clippy::unused_async)]
pub async fn watch_sandboxes(&self) -> Result<WatchStream, String> {
let namespace = self.config.namespace.clone();
let sandbox_api = self.watch_api();
let agent_sandbox_api = self
.supported_agent_sandbox_api(self.watch_client.clone())
.await?;
let event_api: Api<KubeEventObj> = Api::namespaced(self.watch_client.clone(), &namespace);
let mut sandbox_stream = watcher::watcher(sandbox_api, watcher::Config::default()).boxed();
let mut sandbox_stream =
watcher::watcher(agent_sandbox_api.api, watcher::Config::default()).boxed();
let mut event_stream = watcher::watcher(event_api, watcher::Config::default()).boxed();
let (tx, rx) = mpsc::channel(256);

Expand Down Expand Up @@ -650,6 +735,14 @@ impl KubernetesComputeDriver {
}
}

/// Reports whether a failed probe should be treated as "this Sandbox API
/// version is unavailable" so that the next supported version is tried.
fn should_try_next_sandbox_api_version(err: &KubeError) -> bool {
    // Kubernetes answers with a structured 404 for some missing API resources
    // and with a raw "404 page not found" body for others. Either way the
    // probed group/version is unavailable, so only the status code is checked.
    match err {
        KubeError::Api(response) => response.code == 404,
        _ => false,
    }
}

fn validate_gpu_request(
gpu_requirements: Option<&GpuResourceRequirements>,
) -> Result<(), tonic::Status> {
Expand Down Expand Up @@ -2013,6 +2106,34 @@ mod tests {
}
}

/// Test helper: builds a `KubeError::Api` carrying `code` and `message`, with
/// the status text set to "404 Not Found" for 404 and "Failure" otherwise.
fn kube_api_error(code: u16, message: &str) -> KubeError {
    let status = match code {
        404 => "404 Not Found",
        _ => "Failure",
    };
    let response = kube::core::ErrorResponse {
        status: status.to_string(),
        message: message.to_string(),
        reason: "Failed to parse error data".to_string(),
        code,
    };
    KubeError::Api(response)
}

/// Both the structured and the raw-body flavours of a 404 must make the probe
/// advance to the next supported Sandbox API version.
#[test]
fn sandbox_api_version_probe_retries_on_structured_and_raw_404() {
    let not_found_messages = [
        "could not find the requested resource",
        "404 page not found\n",
    ];
    for message in not_found_messages {
        let err = kube_api_error(404, message);
        assert!(should_try_next_sandbox_api_version(&err));
    }
}

/// A non-404 API error (here a 403) must not trigger a fallback to the next
/// Sandbox API version.
#[test]
fn sandbox_api_version_probe_keeps_non_404_errors() {
    let forbidden = kube_api_error(403, "sandboxes.agents.x-k8s.io is forbidden");
    let should_retry = should_try_next_sandbox_api_version(&forbidden);
    assert!(!should_retry);
}

#[test]
fn driver_config_rejects_invalid_shape() {
let template = SandboxTemplate {
Expand Down
Loading
Loading