@@ -80,7 +80,9 @@ impl From<KubernetesDriverError> for openshell_core::ComputeDriverError {
8080const KUBE_API_TIMEOUT : Duration = Duration :: from_secs ( 30 ) ;
8181
8282const SANDBOX_GROUP : & str = "agents.x-k8s.io" ;
83- const SANDBOX_VERSION : & str = "v1alpha1" ;
// Sandbox custom-resource API versions this driver can address.
83+ const SANDBOX_VERSION_V1BETA1 : & str = "v1beta1" ;
84+ const SANDBOX_VERSION_V1ALPHA1 : & str = "v1alpha1" ;
// Versions in the order they are probed when detecting which one the cluster serves:
// v1beta1 is tried first, then v1alpha1.
85+ const SANDBOX_VERSIONS : & [ & str ] = & [ SANDBOX_VERSION_V1BETA1 , SANDBOX_VERSION_V1ALPHA1 ] ;
8486pub const SANDBOX_KIND : & str = "Sandbox" ;
8587
8688const GPU_RESOURCE_NAME : & str = "nvidia.com/gpu" ;
@@ -256,16 +258,53 @@ impl KubernetesComputeDriver {
256258 & self . config . ssh_socket_path
257259 }
258260
259- fn watch_api ( & self ) -> Api < DynamicObject > {
260- let gvk = GroupVersionKind :: gvk ( SANDBOX_GROUP , SANDBOX_VERSION , SANDBOX_KIND ) ;
261+ fn sandbox_api_for_version ( & self , client : Client , version : & str ) -> Api < DynamicObject > {
262+ let gvk = GroupVersionKind :: gvk ( SANDBOX_GROUP , version , SANDBOX_KIND ) ;
261263 let resource = ApiResource :: from_gvk ( & gvk) ;
262- Api :: namespaced_with ( self . watch_client . clone ( ) , & self . config . namespace , & resource)
264+ Api :: namespaced_with ( client , & self . config . namespace , & resource)
263265 }
264266
265- fn api ( & self ) -> Api < DynamicObject > {
266- let gvk = GroupVersionKind :: gvk ( SANDBOX_GROUP , SANDBOX_VERSION , SANDBOX_KIND ) ;
267- let resource = ApiResource :: from_gvk ( & gvk) ;
268- Api :: namespaced_with ( self . client . clone ( ) , & self . config . namespace , & resource)
267+ fn api_for_version ( & self , version : & str ) -> Api < DynamicObject > {
268+ self . sandbox_api_for_version ( self . client . clone ( ) , version)
269+ }
270+
271+ fn watch_api_for_version ( & self , version : & str ) -> Api < DynamicObject > {
272+ self . sandbox_api_for_version ( self . watch_client . clone ( ) , version)
273+ }
274+
275+ async fn supported_api_version ( & self , watch_client : bool ) -> Result < & ' static str , String > {
276+ let client = if watch_client {
277+ self . watch_client . clone ( )
278+ } else {
279+ self . client . clone ( )
280+ } ;
281+ for version in SANDBOX_VERSIONS {
282+ let api = self . sandbox_api_for_version ( client. clone ( ) , version) ;
283+ match tokio:: time:: timeout ( KUBE_API_TIMEOUT , api. list ( & ListParams :: default ( ) . limit ( 1 ) ) )
284+ . await
285+ {
286+ Ok ( Ok ( _) ) => return Ok ( version) ,
287+ Ok ( Err ( err) ) if should_try_next_sandbox_api_version ( & err) => {
288+ debug ! (
289+ namespace = %self . config. namespace,
290+ sandbox_api_version = %version,
291+ error = %err,
292+ "Sandbox API version is not available; trying next supported version"
293+ ) ;
294+ }
295+ Ok ( Err ( err) ) => return Err ( err. to_string ( ) ) ,
296+ Err ( _elapsed) => {
297+ return Err ( format ! (
298+ "timed out after {}s waiting for Kubernetes API" ,
299+ KUBE_API_TIMEOUT . as_secs( )
300+ ) ) ;
301+ }
302+ }
303+ }
304+ Err ( format ! (
305+ "no supported Agent Sandbox API version is available; tried {}" ,
306+ SANDBOX_VERSIONS . join( ", " )
307+ ) )
269308 }
270309
271310 async fn has_gpu_capacity ( & self ) -> Result < bool , KubeError > {
@@ -306,7 +345,8 @@ impl KubernetesComputeDriver {
306345 "Fetching sandbox from Kubernetes"
307346 ) ;
308347
309- let api = self . api ( ) ;
348+ let version = self . supported_api_version ( false ) . await ?;
349+ let api = self . api_for_version ( version) ;
310350 match tokio:: time:: timeout ( KUBE_API_TIMEOUT , api. get ( name) ) . await {
311351 Ok ( Ok ( obj) ) => sandbox_from_object ( & self . config . namespace , obj) . map ( Some ) ,
312352 Ok ( Err ( KubeError :: Api ( err) ) ) if err. code == 404 => {
@@ -341,7 +381,8 @@ impl KubernetesComputeDriver {
341381 "Listing sandboxes from Kubernetes"
342382 ) ;
343383
344- let api = self . api ( ) ;
384+ let version = self . supported_api_version ( false ) . await ?;
385+ let api = self . api_for_version ( version) ;
345386 match tokio:: time:: timeout ( KUBE_API_TIMEOUT , api. list ( & ListParams :: default ( ) ) ) . await {
346387 Ok ( Ok ( list) ) => {
347388 let mut sandboxes = list
@@ -396,7 +437,11 @@ impl KubernetesComputeDriver {
396437 "Creating sandbox in Kubernetes"
397438 ) ;
398439
399- let gvk = GroupVersionKind :: gvk ( SANDBOX_GROUP , SANDBOX_VERSION , SANDBOX_KIND ) ;
440+ let version = self
441+ . supported_api_version ( false )
442+ . await
443+ . map_err ( KubernetesDriverError :: Message ) ?;
444+ let gvk = GroupVersionKind :: gvk ( SANDBOX_GROUP , version, SANDBOX_KIND ) ;
400445 let resource = ApiResource :: from_gvk ( & gvk) ;
401446 let mut obj = DynamicObject :: new ( name, & resource) ;
402447 obj. metadata = ObjectMeta {
@@ -430,7 +475,7 @@ impl KubernetesComputeDriver {
430475 . provider_spiffe_workload_api_socket_path ,
431476 } ;
432477 obj. data = sandbox_to_k8s_spec ( sandbox. spec . as_ref ( ) , & params) ;
433- let api = self . api ( ) ;
478+ let api = self . api_for_version ( version ) ;
434479
435480 match tokio:: time:: timeout ( KUBE_API_TIMEOUT , api. create ( & PostParams :: default ( ) , & obj) ) . await
436481 {
@@ -473,7 +518,8 @@ impl KubernetesComputeDriver {
473518 "Deleting sandbox from Kubernetes"
474519 ) ;
475520
476- let api = self . api ( ) ;
521+ let version = self . supported_api_version ( false ) . await ?;
522+ let api = self . api_for_version ( version) ;
477523 match tokio:: time:: timeout ( KUBE_API_TIMEOUT , api. delete ( name, & DeleteParams :: default ( ) ) )
478524 . await
479525 {
@@ -508,7 +554,8 @@ impl KubernetesComputeDriver {
508554 }
509555
510556 pub async fn sandbox_exists ( & self , name : & str ) -> Result < bool , String > {
511- let api = self . api ( ) ;
557+ let version = self . supported_api_version ( false ) . await ?;
558+ let api = self . api_for_version ( version) ;
512559 match tokio:: time:: timeout ( KUBE_API_TIMEOUT , api. get ( name) ) . await {
513560 Ok ( Ok ( _) ) => Ok ( true ) ,
514561 Ok ( Err ( KubeError :: Api ( err) ) ) if err. code == 404 => Ok ( false ) ,
@@ -524,7 +571,8 @@ impl KubernetesComputeDriver {
524571 #[ allow( clippy:: unused_async) ]
525572 pub async fn watch_sandboxes ( & self ) -> Result < WatchStream , String > {
526573 let namespace = self . config . namespace . clone ( ) ;
527- let sandbox_api = self . watch_api ( ) ;
574+ let version = self . supported_api_version ( true ) . await ?;
575+ let sandbox_api = self . watch_api_for_version ( version) ;
528576 let event_api: Api < KubeEventObj > = Api :: namespaced ( self . watch_client . clone ( ) , & namespace) ;
529577 let mut sandbox_stream = watcher:: watcher ( sandbox_api, watcher:: Config :: default ( ) ) . boxed ( ) ;
530578 let mut event_stream = watcher:: watcher ( event_api, watcher:: Config :: default ( ) ) . boxed ( ) ;
@@ -650,6 +698,14 @@ impl KubernetesComputeDriver {
650698 }
651699}
652700
701+ fn should_try_next_sandbox_api_version ( err : & KubeError ) -> bool {
702+ matches ! (
703+ err,
704+ KubeError :: Api ( api)
705+ if api. code == 404 && api. message. contains( "could not find the requested resource" )
706+ )
707+ }
708+
653709fn validate_gpu_request (
654710 gpu_requirements : Option < & GpuResourceRequirements > ,
655711) -> Result < ( ) , tonic:: Status > {
0 commit comments