diff --git a/apiserver/pkg/server/ray_job_submission_service_server.go b/apiserver/pkg/server/ray_job_submission_service_server.go index 4fdead1e50f..596b991821e 100644 --- a/apiserver/pkg/server/ray_job_submission_service_server.go +++ b/apiserver/pkg/server/ray_job_submission_service_server.go @@ -41,7 +41,7 @@ type RayJobSubmissionServiceServer struct { // Create RayJobSubmissionServiceServer func NewRayJobSubmissionServiceServer(clusterServer *ClusterServer, options *RayJobSubmissionServiceServerOptions) *RayJobSubmissionServiceServer { zl := zerolog.New(os.Stdout).Level(zerolog.DebugLevel) - return &RayJobSubmissionServiceServer{clusterServer: clusterServer, options: options, log: zerologr.New(&zl).WithName("jobsubmissionservice"), dashboardClientFunc: utils.GetRayDashboardClientFunc(nil, false)} + return &RayJobSubmissionServiceServer{clusterServer: clusterServer, options: options, log: zerologr.New(&zl).WithName("jobsubmissionservice"), dashboardClientFunc: utils.GetRayDashboardClientFunc(nil, false, false)} } // Submit Ray job diff --git a/go.mod b/go.mod index d5dab3bca19..1ce8746cd50 100644 --- a/go.mod +++ b/go.mod @@ -67,6 +67,7 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect diff --git a/go.sum b/go.sum index afff157cfd8..2eb06da7d9e 100644 --- a/go.sum +++ b/go.sum @@ -107,6 +107,8 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92Bcuy github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 h1:5ZPtiqj0JL5oKWmcsq4VMaAW5ukBEgSGXEN89zeH1Jo= 
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3/go.mod h1:ndYquD05frm2vACXE1nsccT4oJzjhw2arTS2cpUD1PI= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jarcoal/httpmock v1.4.0 h1:BvhqnH0JAYbNudL2GMJKgOHe2CtKlzJ/5rWKyp+hc2k= diff --git a/ray-operator/apis/config/v1alpha1/configuration_types.go b/ray-operator/apis/config/v1alpha1/configuration_types.go index 910ec0d11ab..55e1269c6a6 100644 --- a/ray-operator/apis/config/v1alpha1/configuration_types.go +++ b/ray-operator/apis/config/v1alpha1/configuration_types.go @@ -86,12 +86,20 @@ type Configuration struct { // EnableMetrics indicates whether KubeRay operator should emit control plane metrics. EnableMetrics bool `json:"enableMetrics,omitempty"` + + // UseBackgroundGoroutine indicates that it will use a goroutine to fetch the job info from ray dashboard and + // store the job info in the cache + UseBackgroundGoroutine bool `json:"useBackgroundGoroutine,omitempty"` } func (config Configuration) GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { - return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy) + return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy, config.UseBackgroundGoroutine) } func (config Configuration) GetHttpProxyClient(mgr manager.Manager) func(hostIp, podNamespace, podName string, port int) utils.RayHttpProxyClientInterface { return utils.GetRayHttpProxyClientFunc(mgr, config.UseKubernetesProxy) } + +func (config Configuration) DoesUseBackgroundGoroutine() bool { + return config.UseBackgroundGoroutine +} diff --git 
a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go index 5da57de6491..1d05e625bdc 100644 --- a/ray-operator/controllers/ray/rayjob_controller.go +++ b/ray-operator/controllers/ray/rayjob_controller.go @@ -2,6 +2,7 @@ package ray import ( "context" + errs "errors" "fmt" "os" "strconv" @@ -42,11 +43,11 @@ const ( // RayJobReconciler reconciles a RayJob object type RayJobReconciler struct { client.Client - Scheme *runtime.Scheme - Recorder record.EventRecorder - - dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) - options RayJobReconcilerOptions + Recorder record.EventRecorder + options RayJobReconcilerOptions + Scheme *runtime.Scheme + dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) + useBackgroundGoroutine bool } type RayJobReconcilerOptions struct { @@ -58,11 +59,12 @@ type RayJobReconcilerOptions struct { func NewRayJobReconciler(_ context.Context, mgr manager.Manager, options RayJobReconcilerOptions, provider utils.ClientProvider) *RayJobReconciler { dashboardClientFunc := provider.GetDashboardClient(mgr) return &RayJobReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - Recorder: mgr.GetEventRecorderFor("rayjob-controller"), - dashboardClientFunc: dashboardClientFunc, - options: options, + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Recorder: mgr.GetEventRecorderFor("rayjob-controller"), + dashboardClientFunc: dashboardClientFunc, + useBackgroundGoroutine: provider.DoesUseBackgroundGoroutine(), + options: options, } } @@ -284,6 +286,10 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request) jobInfo, err := rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId) if err != nil { + if errs.Is(err, dashboardclient.ErrAgain) { + logger.Info("The Ray job Info was not ready. 
Try again next iteration.", "JobId", rayJobInstance.Status.JobId) + return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil + } // If the Ray job was not found, GetJobInfo returns a BadRequest error. if errors.IsBadRequest(err) { if rayJobInstance.Spec.SubmissionMode == rayv1.HTTPMode { @@ -753,6 +759,16 @@ func (r *RayJobReconciler) deleteClusterResources(ctx context.Context, rayJobIns if !cluster.DeletionTimestamp.IsZero() { logger.Info("The deletion of the associated RayCluster for RayJob is ongoing.", "RayCluster", cluster.Name) } else { + if r.useBackgroundGoroutine { + rayDashboardClient, err := r.dashboardClientFunc(&cluster, rayJobInstance.Status.DashboardURL) + if err != nil { + logger.Error(err, "Failed to get dashboard client for RayJob") + } + // clear cache, and it will remove this job from the cache updating loop. + if err := rayDashboardClient.StopJob(ctx, rayJobInstance.Status.JobId); err != nil { + logger.Error(err, "Failed to stop job for RayJob") + } + } if err := r.Delete(ctx, &cluster); err != nil { r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, string(utils.FailedToDeleteRayCluster), "Failed to delete cluster %s/%s: %v", cluster.Namespace, cluster.Name, err) return false, err diff --git a/ray-operator/controllers/ray/suite_test.go b/ray-operator/controllers/ray/suite_test.go index 85c913e7bd6..e2ab5e16196 100644 --- a/ray-operator/controllers/ray/suite_test.go +++ b/ray-operator/controllers/ray/suite_test.go @@ -64,6 +64,10 @@ func (testProvider TestClientProvider) GetHttpProxyClient(_ manager.Manager) fun } } +func (testProvider TestClientProvider) DoesUseBackgroundGoroutine() bool { + return false +} + func TestAPIs(t *testing.T) { RegisterFailHandler(Fail) diff --git a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go new file mode 100644 index 00000000000..38deb5825b1 --- /dev/null +++ 
b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_cache_client.go @@ -0,0 +1,188 @@ +package dashboardclient + +import ( + "context" + "errors" + "sync" + "time" + + lru "github.com/hashicorp/golang-lru/v2" + + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" + utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types" +) + +var ErrAgain = errors.New("EAGAIN") + +const ( + // TODO: make queue size and worker size configurable. + taskQueueSize = 128 + workerSize = 8 + + queryInterval = 3 * time.Second + + // TODO: consider a proper size for accommodating all live job info + cacheSize = 10000 + cacheExpiry = 10 * time.Minute +) + +var ( + // singleton + initWorkPool sync.Once + pool workerPool + + // singleton + initCacheStorage sync.Once + cacheStorage *lru.Cache[string, *JobInfoCache] +) + +type ( + Task func() bool + JobInfoCache struct { + JobInfo *utiltypes.RayJobInfo + Err error + UpdateAt *time.Time + } + + workerPool struct { + taskQueue chan Task + } +) + +func (w *workerPool) init(taskQueueSize int, workerSize int, queryInterval time.Duration) { + w.taskQueue = make(chan Task, taskQueueSize) + + for i := 0; i < workerSize; i++ { + // TODO: observability for these goroutines + // TODO: should we consider the stop ? + go func() { + for task := range w.taskQueue { + again := task() + + if again { + time.AfterFunc(queryInterval, func() { + w.taskQueue <- task + }) + } + } + }() + } +} + +func (w *workerPool) PutTask(task Task) { + w.taskQueue <- task +} + +var _ RayDashboardClientInterface = (*RayDashboardCacheClient)(nil) + +type RayDashboardCacheClient struct { + client RayDashboardClientInterface +} + +func (r *RayDashboardCacheClient) InitClient(client RayDashboardClientInterface) { + initWorkPool.Do(func() { + pool.init(taskQueueSize, workerSize, queryInterval) + }) + + initCacheStorage.Do(func() { + if cacheStorage == nil { + // New() returns an error only if the size is less than or equal to zero. 
+ cacheStorage, _ = lru.New[string, *JobInfoCache](cacheSize) + } + + // expiry cache cleanup + go func() { + ticker := time.NewTicker(queryInterval * 10) + defer ticker.Stop() + + // TODO: observability ? + // TODO: should we consider the stop ? + for range ticker.C { + keys := cacheStorage.Keys() + expiredThreshold := time.Now().Add(-cacheExpiry) + for _, key := range keys { + if cached, ok := cacheStorage.Peek(key); ok { + if cached.UpdateAt.Before(expiredThreshold) { + cacheStorage.Remove(key) + } + } + } + } + }() + }) + + r.client = client +} + +func (r *RayDashboardCacheClient) UpdateDeployments(ctx context.Context, configJson []byte) error { + return r.client.UpdateDeployments(ctx, configJson) +} + +func (r *RayDashboardCacheClient) GetServeDetails(ctx context.Context) (*utiltypes.ServeDetails, error) { + return r.client.GetServeDetails(ctx) +} + +func (r *RayDashboardCacheClient) GetMultiApplicationStatus(ctx context.Context) (map[string]*utiltypes.ServeApplicationStatus, error) { + return r.client.GetMultiApplicationStatus(ctx) +} + +func (r *RayDashboardCacheClient) GetJobInfo(ctx context.Context, jobId string) (*utiltypes.RayJobInfo, error) { + if cached, ok := cacheStorage.Get(jobId); ok { + return cached.JobInfo, cached.Err + } + currentTime := time.Now() + placeholder := &JobInfoCache{Err: ErrAgain, UpdateAt: &currentTime} + + // Put a placeholder in storage. The cache will be updated only if the placeholder exists. + // The placeholder will be removed when StopJob or DeleteJob. 
+ if cached, existed, _ := cacheStorage.PeekOrAdd(jobId, placeholder); existed { + return cached.JobInfo, cached.Err + } + + task := func() bool { + jobInfoCache, existed := cacheStorage.Get(jobId) + if !existed { + return false + } + + jobInfoCache.JobInfo, jobInfoCache.Err = r.client.GetJobInfo(ctx, jobId) + currentTime := time.Now() + jobInfoCache.UpdateAt = &currentTime + + if _, existed := cacheStorage.ContainsOrAdd(jobId, jobInfoCache); !existed { + return false + } + + return !rayv1.IsJobTerminal(jobInfoCache.JobInfo.JobStatus) + } + + pool.PutTask(task) + + return nil, ErrAgain +} + +func (r *RayDashboardCacheClient) ListJobs(ctx context.Context) (*[]utiltypes.RayJobInfo, error) { + return r.client.ListJobs(ctx) +} + +func (r *RayDashboardCacheClient) SubmitJob(ctx context.Context, rayJob *rayv1.RayJob) (string, error) { + return r.client.SubmitJob(ctx, rayJob) +} + +func (r *RayDashboardCacheClient) SubmitJobReq(ctx context.Context, request *utiltypes.RayJobRequest) (string, error) { + return r.client.SubmitJobReq(ctx, request) +} + +func (r *RayDashboardCacheClient) GetJobLog(ctx context.Context, jobName string) (*string, error) { + return r.client.GetJobLog(ctx, jobName) +} + +func (r *RayDashboardCacheClient) StopJob(ctx context.Context, jobName string) error { + cacheStorage.Remove(jobName) + return r.client.StopJob(ctx, jobName) +} + +func (r *RayDashboardCacheClient) DeleteJob(ctx context.Context, jobName string) error { + cacheStorage.Remove(jobName) + return r.client.DeleteJob(ctx, jobName) +} diff --git a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go index d360ebc0af9..570ce082243 100644 --- a/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go +++ b/ray-operator/controllers/ray/utils/dashboardclient/dashboard_httpclient.go @@ -27,7 +27,6 @@ var ( ) type RayDashboardClientInterface interface { - InitClient(client *http.Client, 
dashboardURL string) UpdateDeployments(ctx context.Context, configJson []byte) error // V2/multi-app Rest API GetServeDetails(ctx context.Context) (*utiltypes.ServeDetails, error) diff --git a/ray-operator/controllers/ray/utils/util.go b/ray-operator/controllers/ray/utils/util.go index 540162bf9f4..31a93664b64 100644 --- a/ray-operator/controllers/ray/utils/util.go +++ b/ray-operator/controllers/ray/utils/util.go @@ -653,6 +653,7 @@ func EnvVarByName(envName string, envVars []corev1.EnvVar) (corev1.EnvVar, bool) type ClientProvider interface { GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) GetHttpProxyClient(mgr manager.Manager) func(hostIp, podNamespace, podName string, port int) RayHttpProxyClientInterface + DoesUseBackgroundGoroutine() bool } func ManagedByExternalController(controllerName *string) *string { @@ -877,7 +878,7 @@ func FetchHeadServiceURL(ctx context.Context, cli client.Client, rayCluster *ray return headServiceURL, nil } -func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { +func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool, useBackgroundGoroutine bool) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { return func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) { dashboardClient := &dashboardclient.RayDashboardClient{} if useKubernetesProxy { @@ -897,12 +898,23 @@ func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) fun mgr.GetHTTPClient(), fmt.Sprintf("%s/api/v1/namespaces/%s/services/%s:dashboard/proxy", mgr.GetConfig().Host, rayCluster.Namespace, headSvcName), ) + if useBackgroundGoroutine { + dashboardCachedClient := &dashboardclient.RayDashboardCacheClient{} + 
dashboardCachedClient.InitClient(dashboardClient) + return dashboardCachedClient, nil + } return dashboardClient, nil } dashboardClient.InitClient(&http.Client{ Timeout: 2 * time.Second, }, "http://"+url) + + if useBackgroundGoroutine { + dashboardCachedClient := &dashboardclient.RayDashboardCacheClient{} + dashboardCachedClient.InitClient(dashboardClient) + return dashboardCachedClient, nil + } return dashboardClient, nil } } diff --git a/ray-operator/go.mod b/ray-operator/go.mod index 907c9fd9e8b..cbe6d63098b 100644 --- a/ray-operator/go.mod +++ b/ray-operator/go.mod @@ -7,6 +7,7 @@ require ( github.com/coder/websocket v1.8.13 github.com/go-logr/logr v1.4.3 github.com/go-logr/zapr v1.3.0 + github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/jarcoal/httpmock v1.4.0 github.com/onsi/ginkgo/v2 v2.23.4 github.com/onsi/gomega v1.37.0 diff --git a/ray-operator/go.sum b/ray-operator/go.sum index 26b72b129ae..d9a20efaf4e 100644 --- a/ray-operator/go.sum +++ b/ray-operator/go.sum @@ -53,6 +53,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5THxAzdVpqr6/geYxZytqFMBCOtn/ujyeo= github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/jarcoal/httpmock v1.4.0 h1:BvhqnH0JAYbNudL2GMJKgOHe2CtKlzJ/5rWKyp+hc2k= github.com/jarcoal/httpmock v1.4.0/go.mod h1:ftW1xULwo+j0R0JJkJIIi7UKigZUXCLLanykgjwBXL0= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= diff --git a/ray-operator/main.go b/ray-operator/main.go index ceba7d4772e..feb675e717f 100644 --- a/ray-operator/main.go +++ b/ray-operator/main.go 
@@ -73,6 +73,7 @@ func main() { var enableMetrics bool var qps float64 var burst int + var useBackgroundGoroutine bool // TODO: remove flag-based config once Configuration API graduates to v1. flag.StringVar(&metricsAddr, "metrics-addr", configapi.DefaultMetricsAddr, "The address the metric endpoint binds to.") @@ -106,6 +107,7 @@ func main() { flag.BoolVar(&enableMetrics, "enable-metrics", false, "Enable the emission of control plane metrics.") flag.Float64Var(&qps, "qps", float64(configapi.DefaultQPS), "The QPS value for the client communicating with the Kubernetes API server.") flag.IntVar(&burst, "burst", configapi.DefaultBurst, "The maximum burst for throttling requests from this client to the Kubernetes API server.") + flag.BoolVar(&useBackgroundGoroutine, "use-background-goroutine", false, "Enable the background goroutine for fetching job info in RayJob.") opts := k8szap.Options{ TimeEncoder: zapcore.ISO8601TimeEncoder, @@ -138,6 +140,7 @@ func main() { config.EnableMetrics = enableMetrics config.QPS = &qps config.Burst = &burst + config.UseBackgroundGoroutine = useBackgroundGoroutine } stdoutEncoder, err := newLogEncoder(logStdoutEncoder) diff --git a/ray-operator/test/sampleyaml/support.go b/ray-operator/test/sampleyaml/support.go index 42ffd19e0a2..f470295001b 100644 --- a/ray-operator/test/sampleyaml/support.go +++ b/ray-operator/test/sampleyaml/support.go @@ -75,7 +75,7 @@ func QueryDashboardGetAppStatus(t Test, rayCluster *rayv1.RayCluster) func(Gomeg g.Expect(err).ToNot(HaveOccurred()) url := fmt.Sprintf("127.0.0.1:%d", localPort) - rayDashboardClientFunc := utils.GetRayDashboardClientFunc(nil, false) + rayDashboardClientFunc := utils.GetRayDashboardClientFunc(nil, false, false) rayDashboardClient, err := rayDashboardClientFunc(rayCluster, url) g.Expect(err).ToNot(HaveOccurred()) serveDetails, err := rayDashboardClient.GetServeDetails(t.Ctx())