Skip to content

Commit 38ba83a

Browse files
committed
add Async get Jobinfo method
Signed-off-by: You-Cheng Lin (Owen) <[email protected]>
1 parent ecf40fe commit 38ba83a

File tree

9 files changed

+50
-42
lines changed

9 files changed

+50
-42
lines changed

apiserver/pkg/server/ray_job_submission_service_server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ type RayJobSubmissionServiceServer struct {
4040
// Create RayJobSubmissionServiceServer
4141
func NewRayJobSubmissionServiceServer(clusterServer *ClusterServer, options *RayJobSubmissionServiceServerOptions) *RayJobSubmissionServiceServer {
4242
zl := zerolog.New(os.Stdout).Level(zerolog.DebugLevel)
43-
return &RayJobSubmissionServiceServer{clusterServer: clusterServer, options: options, log: zerologr.New(&zl).WithName("jobsubmissionservice"), dashboardClientFunc: utils.GetRayDashboardClientFunc(nil, false)}
43+
return &RayJobSubmissionServiceServer{clusterServer: clusterServer, options: options, log: zerologr.New(&zl).WithName("jobsubmissionservice"), dashboardClientFunc: utils.GetRayDashboardClientFunc(nil, false, nil, nil)}
4444
}
4545

4646
// Submit Ray job

ray-operator/apis/config/v1alpha1/configuration_types.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package v1alpha1
22

33
import (
4+
"sync"
5+
46
corev1 "k8s.io/api/core/v1"
57
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
68
"sigs.k8s.io/controller-runtime/pkg/manager"
@@ -84,8 +86,8 @@ type Configuration struct {
8486
EnableMetrics bool `json:"enableMetrics,omitempty"`
8587
}
8688

87-
func (config Configuration) GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (utils.RayDashboardClientInterface, error) {
88-
return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy)
89+
func (config Configuration) GetDashboardClient(mgr manager.Manager, jobInfoMap *sync.Map, taskQueue chan func()) func(rayCluster *rayv1.RayCluster, url string) (utils.RayDashboardClientInterface, error) {
90+
return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy, jobInfoMap, taskQueue)
8991
}
9092

9193
func (config Configuration) GetHttpProxyClient(mgr manager.Manager) func(hostIp, podNamespace, podName string, port int) utils.RayHttpProxyClientInterface {

ray-operator/controllers/ray/rayjob_controller.go

Lines changed: 17 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -36,23 +36,14 @@ const (
3636
RayJobDefaultRequeueDuration = 3 * time.Second
3737
RayJobDefaultClusterSelectorKey = "ray.io/cluster"
3838
PythonUnbufferedEnvVarName = "PYTHONUNBUFFERED"
39+
DashboardWorkerNum = 100
40+
TaskQueueSize = 500
3941
)
4042

4143
var jobInfoMap sync.Map
4244

4345
// Simple worker pool for job info updates
44-
var jobInfoChan = make(chan func(), 300) // Unbuffered channel with unlimited capacity
45-
46-
func init() {
47-
// Start 10 worker goroutines that will live for the entire program
48-
for i := 0; i < 100; i++ {
49-
go func() {
50-
for task := range jobInfoChan {
51-
task() // Execute the function
52-
}
53-
}()
54-
}
55-
}
46+
var taskQueue = make(chan func(), TaskQueueSize) // Unbuffered channel with unlimited capacity
5647

5748
// RayJobReconciler reconciles a RayJob object
5849
type RayJobReconciler struct {
@@ -70,7 +61,14 @@ type RayJobReconcilerOptions struct {
7061

7162
// NewRayJobReconciler returns a new reconcile.Reconciler
7263
func NewRayJobReconciler(_ context.Context, mgr manager.Manager, options RayJobReconcilerOptions, provider utils.ClientProvider) *RayJobReconciler {
73-
dashboardClientFunc := provider.GetDashboardClient(mgr)
64+
dashboardClientFunc := provider.GetDashboardClient(mgr, &jobInfoMap, taskQueue)
65+
for i := 0; i < DashboardWorkerNum; i++ {
66+
go func() {
67+
for task := range taskQueue {
68+
task()
69+
}
70+
}()
71+
}
7472
return &RayJobReconciler{
7573
Client: mgr.GetClient(),
7674
Scheme: mgr.GetScheme(),
@@ -274,7 +272,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
274272
}
275273

276274
// Check the current status of ray jobs
277-
jobInfoFromMapInterface, exists := jobInfoMap.Load(rayJobInstance.Name)
275+
jobInfoFromMapInterface, exists := jobInfoMap.Load(rayJobInstance.Status.JobId)
278276
var jobInfoFromMap utiltypes.RayJobInfo
279277
if exists {
280278
jobInfoFromMap = jobInfoFromMapInterface.(utiltypes.RayJobInfo)
@@ -299,7 +297,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
299297
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
300298
}
301299
jobInfoFromMap = *jobInfo
302-
jobInfoMap.Store(rayJobInstance.Name, jobInfoFromMap)
300+
jobInfoMap.Store(rayJobInstance.Status.JobId, jobInfoFromMap)
303301
}
304302

305303
// If the JobStatus is in a terminal status, such as SUCCEEDED, FAILED, or STOPPED, it is impossible for the Ray job
@@ -321,26 +319,11 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
321319
reason = rayv1.AppFailed
322320
}
323321
} else {
324-
// Submit to simple worker pool instead of creating new goroutine
325-
select {
326-
case jobInfoChan <- func() {
327-
rayDashboardClient, err := r.dashboardClientFunc(rayClusterInstance, rayJobInstance.Status.DashboardURL)
328-
if err != nil {
329-
logger.Error(err, "Failed to get Job client", "JobId", rayJobInstance.Status.JobId)
330-
return
331-
}
332-
jobInfo, err := rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId)
333-
if err != nil {
334-
logger.Error(err, "Failed to get job info", "JobId", rayJobInstance.Status.JobId)
335-
return
336-
}
337-
jobInfoMap.Store(rayJobInstance.Name, *jobInfo)
338-
}:
339-
// Task submitted successfully
340-
default:
341-
// Channel full, skip this update
342-
logger.V(1).Info("Worker pool busy, skipping job info update")
322+
rayDashboardClient, err := r.dashboardClientFunc(rayClusterInstance, rayJobInstance.Status.DashboardURL)
323+
if err != nil {
324+
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
343325
}
326+
rayDashboardClient.AsyncGetJobInfo(ctx, rayJobInstance.Status.JobId)
344327
}
345328

346329
// Always update RayClusterStatus along with JobStatus and JobDeploymentStatus updates.

ray-operator/controllers/ray/rayservice_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ type RayServiceReconciler struct {
5959

6060
// NewRayServiceReconciler returns a new reconcile.Reconciler
6161
func NewRayServiceReconciler(_ context.Context, mgr manager.Manager, provider utils.ClientProvider) *RayServiceReconciler {
62-
dashboardClientFunc := provider.GetDashboardClient(mgr)
62+
dashboardClientFunc := provider.GetDashboardClient(mgr, nil, nil)
6363
httpProxyClientFunc := provider.GetHttpProxyClient(mgr)
6464
return &RayServiceReconciler{
6565
Client: mgr.GetClient(),

ray-operator/controllers/ray/suite_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package ray
1818
import (
1919
"os"
2020
"path/filepath"
21+
"sync"
2122
"testing"
2223

2324
. "github.com/onsi/ginkgo/v2"
@@ -51,7 +52,7 @@ var (
5152

5253
type TestClientProvider struct{}
5354

54-
func (testProvider TestClientProvider) GetDashboardClient(_ manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (utils.RayDashboardClientInterface, error) {
55+
func (testProvider TestClientProvider) GetDashboardClient(_ manager.Manager, _ *sync.Map, _ chan func()) func(rayCluster *rayv1.RayCluster, url string) (utils.RayDashboardClientInterface, error) {
5556
return func(_ *rayv1.RayCluster, _ string) (utils.RayDashboardClientInterface, error) {
5657
return fakeRayDashboardClient, nil
5758
}

ray-operator/controllers/ray/utils/dashboard_httpclient.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"io"
88
"net/http"
9+
"sync"
910

1011
"k8s.io/apimachinery/pkg/api/errors"
1112
"k8s.io/apimachinery/pkg/util/json"
@@ -35,10 +36,13 @@ type RayDashboardClientInterface interface {
3536
GetJobLog(ctx context.Context, jobName string) (*string, error)
3637
StopJob(ctx context.Context, jobName string) error
3738
DeleteJob(ctx context.Context, jobName string) error
39+
AsyncGetJobInfo(ctx context.Context, jobId string)
3840
}
3941

4042
type RayDashboardClient struct {
4143
client *http.Client
44+
jobInfoMap *sync.Map
45+
taskQueue chan func()
4246
dashboardURL string
4347
}
4448

@@ -342,3 +346,13 @@ func UnmarshalRuntimeEnvYAML(runtimeEnvYAML string) (utiltypes.RuntimeEnvType, e
342346
}
343347
return runtimeEnv, nil
344348
}
349+
350+
func (r *RayDashboardClient) AsyncGetJobInfo(ctx context.Context, jobId string) {
351+
r.taskQueue <- func() {
352+
jobInfo, err := r.GetJobInfo(ctx, jobId)
353+
if err != nil {
354+
return
355+
}
356+
r.jobInfoMap.Store(jobId, *jobInfo)
357+
}
358+
}

ray-operator/controllers/ray/utils/fake_serve_httpclient.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,6 @@ func (r *FakeRayDashboardClient) StopJob(_ context.Context, _ string) (err error
7272
func (r *FakeRayDashboardClient) DeleteJob(_ context.Context, _ string) error {
7373
return nil
7474
}
75+
76+
func (r *FakeRayDashboardClient) AsyncGetJobInfo(_ context.Context, _ string) {
77+
}

ray-operator/controllers/ray/utils/util.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"reflect"
1212
"strconv"
1313
"strings"
14+
"sync"
1415
"time"
1516
"unicode"
1617

@@ -640,7 +641,7 @@ func EnvVarByName(envName string, envVars []corev1.EnvVar) (corev1.EnvVar, bool)
640641
}
641642

642643
type ClientProvider interface {
643-
GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (RayDashboardClientInterface, error)
644+
GetDashboardClient(mgr manager.Manager, jobInfoMap *sync.Map, taskQueue chan func()) func(rayCluster *rayv1.RayCluster, url string) (RayDashboardClientInterface, error)
644645
GetHttpProxyClient(mgr manager.Manager) func(hostIp, podNamespace, podName string, port int) RayHttpProxyClientInterface
645646
}
646647

@@ -757,7 +758,7 @@ func FetchHeadServiceURL(ctx context.Context, cli client.Client, rayCluster *ray
757758
return headServiceURL, nil
758759
}
759760

760-
func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) func(rayCluster *rayv1.RayCluster, url string) (RayDashboardClientInterface, error) {
761+
func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool, jobInfoMap *sync.Map, taskQueue chan func()) func(rayCluster *rayv1.RayCluster, url string) (RayDashboardClientInterface, error) {
761762
return func(rayCluster *rayv1.RayCluster, url string) (RayDashboardClientInterface, error) {
762763
if useKubernetesProxy {
763764
var err error
@@ -774,6 +775,8 @@ func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) fun
774775
// configured to communicate with the Kubernetes API server.
775776
client: mgr.GetHTTPClient(),
776777
dashboardURL: fmt.Sprintf("%s/api/v1/namespaces/%s/services/%s:dashboard/proxy", mgr.GetConfig().Host, rayCluster.Namespace, headSvcName),
778+
jobInfoMap: jobInfoMap,
779+
taskQueue: taskQueue,
777780
}, nil
778781
}
779782

@@ -782,6 +785,8 @@ func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) fun
782785
Timeout: 2 * time.Second,
783786
},
784787
dashboardURL: "http://" + url,
788+
jobInfoMap: jobInfoMap,
789+
taskQueue: taskQueue,
785790
}, nil
786791
}
787792
}

ray-operator/test/sampleyaml/support.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func QueryDashboardGetAppStatus(t Test, rayCluster *rayv1.RayCluster) func(Gomeg
7575

7676
g.Expect(err).ToNot(HaveOccurred())
7777
url := fmt.Sprintf("127.0.0.1:%d", localPort)
78-
rayDashboardClientFunc := utils.GetRayDashboardClientFunc(nil, false)
78+
rayDashboardClientFunc := utils.GetRayDashboardClientFunc(nil, false, nil, nil)
7979
rayDashboardClient, err := rayDashboardClientFunc(rayCluster, url)
8080
g.Expect(err).ToNot(HaveOccurred())
8181
serveDetails, err := rayDashboardClient.GetServeDetails(t.Ctx())

0 commit comments

Comments
 (0)