Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apiserver/pkg/server/ray_job_submission_service_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type RayJobSubmissionServiceServer struct {
// Create RayJobSubmissionServiceServer
func NewRayJobSubmissionServiceServer(clusterServer *ClusterServer, options *RayJobSubmissionServiceServerOptions) *RayJobSubmissionServiceServer {
zl := zerolog.New(os.Stdout).Level(zerolog.DebugLevel)
return &RayJobSubmissionServiceServer{clusterServer: clusterServer, options: options, log: zerologr.New(&zl).WithName("jobsubmissionservice"), dashboardClientFunc: utils.GetRayDashboardClientFunc(nil, false)}
return &RayJobSubmissionServiceServer{clusterServer: clusterServer, options: options, log: zerologr.New(&zl).WithName("jobsubmissionservice"), dashboardClientFunc: utils.GetRayDashboardClientFunc(nil, false, false)}
}

// Submit Ray job
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ require (
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion ray-operator/apis/config/v1alpha1/configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,20 @@ type Configuration struct {

// EnableMetrics indicates whether KubeRay operator should emit control plane metrics.
EnableMetrics bool `json:"enableMetrics,omitempty"`

// UseBackgroundGoroutine indicates that it wil use goroutine to fetch the job info from ray dashboard and
// store the job info in the cache
UseBackgroundGoroutine bool `json:"useBackgroundGoroutine,omitempty"`
}

func (config Configuration) GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy)
return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy, config.UseBackgroundGoroutine)
}

func (config Configuration) GetHttpProxyClient(mgr manager.Manager) func(hostIp, podNamespace, podName string, port int) utils.RayHttpProxyClientInterface {
return utils.GetRayHttpProxyClientFunc(mgr, config.UseKubernetesProxy)
}

func (config Configuration) DoesUseBackgroundGoroutine() bool {
return config.UseBackgroundGoroutine
}
36 changes: 26 additions & 10 deletions ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ray

import (
"context"
errs "errors"
"fmt"
"os"
"strconv"
Expand Down Expand Up @@ -42,11 +43,11 @@ const (
// RayJobReconciler reconciles a RayJob object
type RayJobReconciler struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder

dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error)
options RayJobReconcilerOptions
Recorder record.EventRecorder
options RayJobReconcilerOptions
Scheme *runtime.Scheme
dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error)
useBackgroundGoroutine bool
}

type RayJobReconcilerOptions struct {
Expand All @@ -58,11 +59,12 @@ type RayJobReconcilerOptions struct {
func NewRayJobReconciler(_ context.Context, mgr manager.Manager, options RayJobReconcilerOptions, provider utils.ClientProvider) *RayJobReconciler {
dashboardClientFunc := provider.GetDashboardClient(mgr)
return &RayJobReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("rayjob-controller"),
dashboardClientFunc: dashboardClientFunc,
options: options,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("rayjob-controller"),
dashboardClientFunc: dashboardClientFunc,
useBackgroundGoroutine: provider.DoesUseBackgroundGoroutine(),
options: options,
}
}

Expand Down Expand Up @@ -284,6 +286,10 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)

jobInfo, err := rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId)
if err != nil {
if errs.Is(err, dashboardclient.ErrAgain) {
logger.Info("The Ray job Info was not ready. Try again next iteration.", "JobId", rayJobInstance.Status.JobId)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
}
// If the Ray job was not found, GetJobInfo returns a BadRequest error.
if errors.IsBadRequest(err) {
if rayJobInstance.Spec.SubmissionMode == rayv1.HTTPMode {
Expand Down Expand Up @@ -753,6 +759,16 @@ func (r *RayJobReconciler) deleteClusterResources(ctx context.Context, rayJobIns
if !cluster.DeletionTimestamp.IsZero() {
logger.Info("The deletion of the associated RayCluster for RayJob is ongoing.", "RayCluster", cluster.Name)
} else {
if r.useBackgroundGoroutine {
rayDashboardClient, err := r.dashboardClientFunc(&cluster, rayJobInstance.Status.DashboardURL)
if err != nil {
logger.Error(err, "Failed to get dashboard client for RayJob")
}
// clear cache, and it will remove this job from the cache updating loop.
if err := rayDashboardClient.StopJob(ctx, rayJobInstance.Status.JobId); err != nil {
logger.Error(err, "Failed to stop job for RayJob")
}
Comment on lines +768 to +770
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it okay to call StopJob to remove the cache placeholder before deleting the RayCluster because the status of retry calls deleteClusterResources?

}
if err := r.Delete(ctx, &cluster); err != nil {
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, string(utils.FailedToDeleteRayCluster), "Failed to delete cluster %s/%s: %v", cluster.Namespace, cluster.Name, err)
return false, err
Expand Down
4 changes: 4 additions & 0 deletions ray-operator/controllers/ray/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func (testProvider TestClientProvider) GetHttpProxyClient(_ manager.Manager) fun
}
}

func (testProvider TestClientProvider) DoesUseBackgroundGoroutine() bool {
return false
}

func TestAPIs(t *testing.T) {
RegisterFailHandler(Fail)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package dashboardclient

import (
"context"
"errors"
"sync"
"time"

lru "github.com/hashicorp/golang-lru/v2"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types"
)

var ErrAgain = errors.New("EAGAIN")

const (
// TODO: make queue size and worker size configurable.
taskQueueSize = 128
workerSize = 8

queryInterval = 3 * time.Second

// TODO: consider a proper size for accommodating the all live job info
cacheSize = 10000
cacheExpiry = 10 * time.Minute
)

var (
// singleton
initWorkPool sync.Once
pool workerPool

// singleton
initCacheStorage sync.Once
cacheStorage *lru.Cache[string, *JobInfoCache]
)

type (
Task func() bool
JobInfoCache struct {
JobInfo *utiltypes.RayJobInfo
Err error
UpdateAt *time.Time
}

workerPool struct {
taskQueue chan Task
}
)

func (w *workerPool) init(taskQueueSize int, workerSize int, queryInterval time.Duration) {
w.taskQueue = make(chan Task, taskQueueSize)

for i := 0; i < workerSize; i++ {
// TODO: observability for these goroutine
// TODO: should we consider the stop ?
Copy link
Collaborator Author

@fscnick fscnick Oct 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we consider the stop? The goroutine fetches the JobInfo and keeps the copy of JobInfo in the memory not updating something. Once the program exits, all the copy are gone.

go func() {
for task := range w.taskQueue {
again := task()

if again {
time.AfterFunc(queryInterval, func() {
w.taskQueue <- task
})
Comment on lines +63 to +65
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wouldn't block the current goroutine.

}
}
}()
}
}

func (w *workerPool) PutTask(task Task) {
w.taskQueue <- task
}

var _ RayDashboardClientInterface = (*RayDashboardCacheClient)(nil)

type RayDashboardCacheClient struct {
client RayDashboardClientInterface
}

func (r *RayDashboardCacheClient) InitClient(client RayDashboardClientInterface) {
initWorkPool.Do(func() {
pool.init(taskQueueSize, workerSize, queryInterval)
})

initCacheStorage.Do(func() {
if cacheStorage == nil {
// the New() returns error only if the size is less or equal than zero.
cacheStorage, _ = lru.New[string, *JobInfoCache](cacheSize)
}

// expiry cache cleanup
go func() {
ticker := time.NewTicker(queryInterval * 10)
defer ticker.Stop()

// TODO: observability ?
// TODO: should we consider the stop ?
for range ticker.C {
keys := cacheStorage.Keys()
expiredThreshold := time.Now().Add(-cacheExpiry)
for _, key := range keys {
if cached, ok := cacheStorage.Peek(key); ok {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Peek wouldn't update the recent-ness of cache.

if cached.UpdateAt.Before(expiredThreshold) {
cacheStorage.Remove(key)
}
}
}
}
}()
})

r.client = client
}

func (r *RayDashboardCacheClient) UpdateDeployments(ctx context.Context, configJson []byte) error {
return r.client.UpdateDeployments(ctx, configJson)
}

func (r *RayDashboardCacheClient) GetServeDetails(ctx context.Context) (*utiltypes.ServeDetails, error) {
return r.client.GetServeDetails(ctx)
}

func (r *RayDashboardCacheClient) GetMultiApplicationStatus(ctx context.Context) (map[string]*utiltypes.ServeApplicationStatus, error) {
return r.client.GetMultiApplicationStatus(ctx)
}

func (r *RayDashboardCacheClient) GetJobInfo(ctx context.Context, jobId string) (*utiltypes.RayJobInfo, error) {
if cached, ok := cacheStorage.Get(jobId); ok {
return cached.JobInfo, cached.Err
}
currentTime := time.Now()
placeholder := &JobInfoCache{Err: ErrAgain, UpdateAt: &currentTime}

// Put a placeholder in storage. The cache will be updated only if the placeholder exists.
// The placeholder will be removed when StopJob or DeleteJob.
if cached, existed, _ := cacheStorage.PeekOrAdd(jobId, placeholder); existed {
return cached.JobInfo, cached.Err
}

task := func() bool {
jobInfoCache, existed := cacheStorage.Get(jobId)
if !existed {
return false
}

jobInfoCache.JobInfo, jobInfoCache.Err = r.client.GetJobInfo(ctx, jobId)
currentTime := time.Now()
jobInfoCache.UpdateAt = &currentTime

if _, existed := cacheStorage.ContainsOrAdd(jobId, jobInfoCache); !existed {
return false
}

return !rayv1.IsJobTerminal(jobInfoCache.JobInfo.JobStatus)
}

pool.PutTask(task)

return nil, ErrAgain
}

func (r *RayDashboardCacheClient) ListJobs(ctx context.Context) (*[]utiltypes.RayJobInfo, error) {
return r.client.ListJobs(ctx)
}

func (r *RayDashboardCacheClient) SubmitJob(ctx context.Context, rayJob *rayv1.RayJob) (string, error) {
return r.client.SubmitJob(ctx, rayJob)
}

func (r *RayDashboardCacheClient) SubmitJobReq(ctx context.Context, request *utiltypes.RayJobRequest) (string, error) {
return r.client.SubmitJobReq(ctx, request)
}

func (r *RayDashboardCacheClient) GetJobLog(ctx context.Context, jobName string) (*string, error) {
return r.client.GetJobLog(ctx, jobName)
}

func (r *RayDashboardCacheClient) StopJob(ctx context.Context, jobName string) error {
cacheStorage.Remove(jobName)
return r.client.StopJob(ctx, jobName)
}

func (r *RayDashboardCacheClient) DeleteJob(ctx context.Context, jobName string) error {
cacheStorage.Remove(jobName)
return r.client.DeleteJob(ctx, jobName)
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ var (
)

type RayDashboardClientInterface interface {
InitClient(client *http.Client, dashboardURL string)
Copy link
Collaborator Author

@fscnick fscnick Oct 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this method from the interface because the different implementation might have different input arguments.

UpdateDeployments(ctx context.Context, configJson []byte) error
// V2/multi-app Rest API
GetServeDetails(ctx context.Context) (*utiltypes.ServeDetails, error)
Expand Down
14 changes: 13 additions & 1 deletion ray-operator/controllers/ray/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@ func EnvVarByName(envName string, envVars []corev1.EnvVar) (corev1.EnvVar, bool)
type ClientProvider interface {
GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error)
GetHttpProxyClient(mgr manager.Manager) func(hostIp, podNamespace, podName string, port int) RayHttpProxyClientInterface
DoesUseBackgroundGoroutine() bool
}

func ManagedByExternalController(controllerName *string) *string {
Expand Down Expand Up @@ -877,7 +878,7 @@ func FetchHeadServiceURL(ctx context.Context, cli client.Client, rayCluster *ray
return headServiceURL, nil
}

func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool, useBackgroundGoroutine bool) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
return func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
dashboardClient := &dashboardclient.RayDashboardClient{}
if useKubernetesProxy {
Expand All @@ -897,12 +898,23 @@ func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) fun
mgr.GetHTTPClient(),
fmt.Sprintf("%s/api/v1/namespaces/%s/services/%s:dashboard/proxy", mgr.GetConfig().Host, rayCluster.Namespace, headSvcName),
)
if useBackgroundGoroutine {
dashboardCachedClient := &dashboardclient.RayDashboardCacheClient{}
dashboardCachedClient.InitClient(dashboardClient)
return dashboardCachedClient, nil
}
return dashboardClient, nil
}

dashboardClient.InitClient(&http.Client{
Timeout: 2 * time.Second,
}, "http://"+url)

if useBackgroundGoroutine {
dashboardCachedClient := &dashboardclient.RayDashboardCacheClient{}
dashboardCachedClient.InitClient(dashboardClient)
return dashboardCachedClient, nil
}
return dashboardClient, nil
}
}
Expand Down
1 change: 1 addition & 0 deletions ray-operator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/coder/websocket v1.8.13
github.com/go-logr/logr v1.4.3
github.com/go-logr/zapr v1.3.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/jarcoal/httpmock v1.4.0
github.com/onsi/ginkgo/v2 v2.23.4
github.com/onsi/gomega v1.37.0
Expand Down
2 changes: 2 additions & 0 deletions ray-operator/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading