
Commit 913e17b

add goroutine
Signed-off-by: You-Cheng Lin (Owen) <[email protected]>
1 parent d59cbd8 commit 913e17b

1 file changed

ray-operator/controllers/ray/rayjob_controller.go

Lines changed: 51 additions & 24 deletions
@@ -6,6 +6,7 @@ import (
 	"os"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/go-logr/logr"
@@ -36,6 +37,8 @@ const (
 	PythonUnbufferedEnvVarName = "PYTHONUNBUFFERED"
 )
 
+var jobInfoMap sync.Map
+
 // RayJobReconciler reconciles a RayJob object
 type RayJobReconciler struct {
 	client.Client
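For context, `sync.Map` is Go's concurrency-safe map: `Store` and `Load` need no external locking, and `Load` returns an untyped value that must be type-asserted back, exactly as the reconcile loop below does with `utils.RayJobInfo`. A minimal, self-contained sketch of the pattern (the `RayJobInfo` struct here is a simplified stand-in, not the real `utils.RayJobInfo`):

```go
package main

import (
	"fmt"
	"sync"
)

// RayJobInfo is a simplified stand-in for utils.RayJobInfo.
type RayJobInfo struct {
	JobStatus string
}

// cache plays the role of the package-level jobInfoMap.
var cache sync.Map

func main() {
	// Store a value under the job name; sync.Map is safe for concurrent
	// use, which suits reconciles running on multiple workers.
	cache.Store("rayjob-sample", RayJobInfo{JobStatus: "RUNNING"})

	// Load returns (any, bool); the value must be type-asserted back.
	if v, ok := cache.Load("rayjob-sample"); ok {
		info := v.(RayJobInfo)
		fmt.Println(info.JobStatus) // RUNNING
	}
}
```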
@@ -256,32 +259,40 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
 		}
 
 		// Check the current status of ray jobs
-		rayDashboardClient, err := r.dashboardClientFunc(rayClusterInstance, rayJobInstance.Status.DashboardURL)
-		if err != nil {
-			return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
-		}
-
-		jobInfo, err := rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId)
-		if err != nil {
-			// If the Ray job was not found, GetJobInfo returns a BadRequest error.
-			if rayJobInstance.Spec.SubmissionMode == rayv1.HTTPMode && errors.IsBadRequest(err) {
-				logger.Info("The Ray job was not found. Submit a Ray job via an HTTP request.", "JobId", rayJobInstance.Status.JobId)
-				if _, err := rayDashboardClient.SubmitJob(ctx, rayJobInstance); err != nil {
-					logger.Error(err, "Failed to submit the Ray job", "JobId", rayJobInstance.Status.JobId)
-					return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
+		jobInfoFromMapInterface, exists := jobInfoMap.Load(rayJobInstance.Name)
+		var jobInfoFromMap utils.RayJobInfo
+		if exists {
+			jobInfoFromMap = jobInfoFromMapInterface.(utils.RayJobInfo)
+			logger.Info("JobInfoFromMap exists", "JobId", rayJobInstance.Status.JobId, "JobInfo", jobInfoFromMap.JobStatus)
+		} else {
+			rayDashboardClient, err := r.dashboardClientFunc(rayClusterInstance, rayJobInstance.Status.DashboardURL)
+			if err != nil {
+				return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
+			}
+			jobInfo, err := rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId)
+			if err != nil {
+				// If the Ray job was not found, GetJobInfo returns a BadRequest error.
+				if rayJobInstance.Spec.SubmissionMode == rayv1.HTTPMode && errors.IsBadRequest(err) {
+					logger.Info("The Ray job was not found. Submit a Ray job via an HTTP request.", "JobId", rayJobInstance.Status.JobId)
+					if _, err := rayDashboardClient.SubmitJob(ctx, rayJobInstance); err != nil {
+						logger.Error(err, "Failed to submit the Ray job", "JobId", rayJobInstance.Status.JobId)
+						return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
+					}
+					return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
 				}
-				return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
+				logger.Error(err, "Failed to get job info", "JobId", rayJobInstance.Status.JobId)
+				return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
 			}
-			logger.Error(err, "Failed to get job info", "JobId", rayJobInstance.Status.JobId)
-			return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
+			jobInfoFromMap = *jobInfo
+			jobInfoMap.Store(rayJobInstance.Name, jobInfoFromMap)
 		}
 
 		// If the JobStatus is in a terminal status, such as SUCCEEDED, FAILED, or STOPPED, it is impossible for the Ray job
 		// to transition to any other. Additionally, RayJob does not currently support retries. Hence, we can mark the RayJob
 		// as "Complete" or "Failed" to avoid unnecessary reconciliation.
 		jobDeploymentStatus := rayv1.JobDeploymentStatusRunning
 		reason := rayv1.JobFailedReason("")
-		isJobTerminal := rayv1.IsJobTerminal(jobInfo.JobStatus)
+		isJobTerminal := rayv1.IsJobTerminal(jobInfoFromMap.JobStatus)
 		// If in K8sJobMode, further refine the terminal condition by checking if the submitter Job has finished.
 		// See https://github.com/ray-project/kuberay/pull/1919 for reasons.
 		if utils.HasSubmitter(rayJobInstance) {
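Distilled, this hunk turns the status check into a read-through cache: serve from `jobInfoMap` on a hit, otherwise fetch once from the dashboard and seed the map. A hedged sketch of that control flow, reusing the `cache` and `RayJobInfo` stand-ins from the previous snippet (`fetch` is a hypothetical stand-in for the dashboard client's `GetJobInfo` call, not part of the commit):

```go
// getJobInfo is a read-through cache: return the cached entry if present,
// otherwise fetch synchronously once and seed the cache for later reconciles.
func getJobInfo(name string, fetch func() (RayJobInfo, error)) (RayJobInfo, error) {
	if v, ok := cache.Load(name); ok {
		return v.(RayJobInfo), nil
	}
	info, err := fetch()
	if err != nil {
		return RayJobInfo{}, err
	}
	cache.Store(name, info)
	return info, nil
}
```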
@@ -290,27 +301,42 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
 
 		if isJobTerminal {
 			jobDeploymentStatus = rayv1.JobDeploymentStatusComplete
-			if jobInfo.JobStatus == rayv1.JobStatusFailed {
+			if jobInfoFromMap.JobStatus == rayv1.JobStatusFailed {
 				jobDeploymentStatus = rayv1.JobDeploymentStatusFailed
 				reason = rayv1.AppFailed
 			}
+		} else {
+			go func() {
+				rayDashboardClient, err := r.dashboardClientFunc(rayClusterInstance, rayJobInstance.Status.DashboardURL)
+				if err != nil {
+					logger.Error(err, "Failed to get Job client", "JobId", rayJobInstance.Status.JobId)
+					return
+				}
+				jobInfo, err := rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId)
+				if err != nil {
+					logger.Error(err, "Failed to get job info", "JobId", rayJobInstance.Status.JobId)
+					return
+				}
+				jobInfoMap.Store(rayJobInstance.Name, *jobInfo)
+			}()
 		}
 
 		// Always update RayClusterStatus along with JobStatus and JobDeploymentStatus updates.
 		rayJobInstance.Status.RayClusterStatus = rayClusterInstance.Status
-		rayJobInstance.Status.JobStatus = jobInfo.JobStatus
+		rayJobInstance.Status.JobStatus = jobInfoFromMap.JobStatus
 		rayJobInstance.Status.JobDeploymentStatus = jobDeploymentStatus
 		rayJobInstance.Status.Reason = reason
-		rayJobInstance.Status.Message = jobInfo.Message
+		rayJobInstance.Status.Message = jobInfoFromMap.Message
 		// It is safe to convert uint64 to int64 here because Ray Core uses `int64_t` under the hood,
 		// even though the type defined in `message JobTableData` (gcs.proto) is `uint64`.
 		// See `gcs_job_manager.cc` and the function `current_sys_time_ms` for more details.
-		if jobInfo.StartTime != 0 {
-			rayJobInstance.Status.RayJobStatusInfo.StartTime = &metav1.Time{Time: time.UnixMilli(utils.SafeUint64ToInt64(jobInfo.StartTime))}
+		if jobInfoFromMap.StartTime != 0 {
+			rayJobInstance.Status.RayJobStatusInfo.StartTime = &metav1.Time{Time: time.UnixMilli(utils.SafeUint64ToInt64(jobInfoFromMap.StartTime))}
 		}
-		if jobInfo.EndTime != 0 {
-			rayJobInstance.Status.RayJobStatusInfo.EndTime = &metav1.Time{Time: time.UnixMilli(utils.SafeUint64ToInt64(jobInfo.EndTime))}
+		if jobInfoFromMap.EndTime != 0 {
+			rayJobInstance.Status.RayJobStatusInfo.EndTime = &metav1.Time{Time: time.UnixMilli(utils.SafeUint64ToInt64(jobInfoFromMap.EndTime))}
 		}
+
 	case rayv1.JobDeploymentStatusSuspending, rayv1.JobDeploymentStatusRetrying:
 		// The `suspend` operation should be atomic. In other words, if users set the `suspend` flag to true and then immediately
 		// set it back to false, either all of the RayJob's associated resources should be cleaned up, or no resources should be
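For non-terminal jobs, the new `else` branch refreshes the cache in a fire-and-forget goroutine, so the next reconcile reads a recent status from the map instead of blocking on a dashboard round trip. One caveat worth noting: the goroutine captures the reconcile `ctx`, which may already be cancelled by the time it runs. A sketch of the same pattern using a detached context with a timeout instead (an assumption for illustration, not what the commit does), again reusing the stand-ins from the earlier snippets and assuming `context` and `time` are imported:

```go
// refreshAsync updates the cached job info in the background.
// context.Background() plus a timeout detaches the fetch from the reconcile
// request's lifetime; on error the previously cached value simply keeps
// being served until a later refresh succeeds.
func refreshAsync(name string, fetch func(context.Context) (RayJobInfo, error)) {
	go func() {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		info, err := fetch(ctx)
		if err != nil {
			return // keep serving the stale cached entry
		}
		cache.Store(name, info)
	}()
}
```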
@@ -364,6 +390,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
 	// If this RayJob uses an existing RayCluster (i.e., ClusterSelector is set), we should not delete the RayCluster.
 	ttlSeconds := rayJobInstance.Spec.TTLSecondsAfterFinished
 	nowTime := time.Now()
+	jobInfoMap.Delete(rayJobInstance.Name)
 	shutdownTime := rayJobInstance.Status.EndTime.Add(time.Duration(ttlSeconds) * time.Second)
 	logger.Info(string(rayJobInstance.Status.JobDeploymentStatus),
 		"ShutdownAfterJobFinishes", rayJobInstance.Spec.ShutdownAfterJobFinishes,
