Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Job notification should emit job status for successful and failed jobs #1165

Merged
merged 1 commit into from
Jan 20, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 54 additions & 9 deletions ballista/scheduler/src/cluster/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,24 +380,27 @@ impl JobState for InMemoryJobState {
Some(Status::Successful(_)) | Some(Status::Failed(_))
) {
self.completed_jobs
.insert(job_id.to_string(), (status, Some(graph.clone())));
.insert(job_id.to_string(), (status.clone(), Some(graph.clone())));
self.running_jobs.remove(job_id);
} else if let Some(old_status) =
self.running_jobs.insert(job_id.to_string(), status)
{
self.job_event_sender.send(&JobStateEvent::JobUpdated {
job_id: job_id.to_string(),
status: old_status,
})
} else {
// otherwise update running job
self.running_jobs.insert(job_id.to_string(), status.clone());
}

// Emit a job change event
// carrying the job's current status.
self.job_event_sender.send(&JobStateEvent::JobUpdated {
job_id: job_id.to_string(),
status,
});

Ok(())
}

async fn get_session(&self, session_id: &str) -> Result<Arc<SessionContext>> {
self.sessions
.get(session_id)
.map(|sess| sess.clone())
.map(|session_ctx| session_ctx.clone())
.ok_or_else(|| {
BallistaError::General(format!("No session for {session_id} found"))
})
Expand Down Expand Up @@ -500,11 +503,15 @@ mod test {

use crate::cluster::memory::InMemoryJobState;
use crate::cluster::test_util::{test_job_lifecycle, test_job_planning_failure};
use crate::cluster::{JobState, JobStateEvent};
use crate::test_utils::{
test_aggregation_plan, test_join_plan, test_two_aggregations_plan,
};
use ballista_core::error::Result;
use ballista_core::serde::protobuf::JobStatus;
use ballista_core::utils::{default_config_producer, default_session_builder};
use futures::StreamExt;
use tokio::sync::Barrier;

#[tokio::test]
async fn test_in_memory_job_lifecycle() -> Result<()> {
Expand Down Expand Up @@ -571,4 +578,42 @@ mod test {

Ok(())
}

/// Checks the job notifications published by `InMemoryJobState` while a job
/// runs through `test_job_lifecycle`.
///
/// The test subscribes to the job state event stream, drives a job through
/// its lifecycle, and then expects exactly two events, the last of which is a
/// `JobUpdated` event whose status is `Successful`.
#[tokio::test]
async fn test_in_memory_job_notification() -> Result<()> {
    let job_state = InMemoryJobState::new(
        "",
        Arc::new(default_session_builder),
        Arc::new(default_config_producer),
    );

    // Subscribe before the job starts so events are not missed.
    let updates = job_state.job_state_events().await?;

    // Two-party barrier: the collector task and this test body both wait on
    // it before proceeding.
    let start_gate = Arc::new(Barrier::new(2));
    let collector_gate = start_gate.clone();

    // Background task that gathers every event until the stream ends.
    // NOTE(review): presumably the stream terminates once `job_state` is
    // consumed by `test_job_lifecycle` and dropped -- confirm.
    let collector = tokio::spawn(async move {
        collector_gate.wait().await;
        updates.collect::<Vec<JobStateEvent>>().await
    });

    start_gate.wait().await;
    test_job_lifecycle(job_state, test_aggregation_plan(4).await).await?;

    let received = collector.await?;
    assert_eq!(2, received.len());

    // The final notification must report the successful job status.
    let ended_successfully = matches!(
        received.last().unwrap(),
        JobStateEvent::JobUpdated {
            status: JobStatus {
                status: Some(
                    ballista_core::serde::protobuf::job_status::Status::Successful(_)
                ),
                ..
            },
            ..
        }
    );
    assert!(ended_successfully, "JobUpdated status expected");

    Ok(())
}
}
Loading