fix: rest api /api/executors does not show executors if TaskSchedulingPolicy::PullStaged #1175

Merged · 3 commits · Feb 10, 2025
ballista/core/src/config.rs (5 additions, 0 deletions)
@@ -250,11 +250,16 @@ impl datafusion::config::ConfigExtension for BallistaConfig {

// an enum used to configure the scheduler policy
// needs to be visible to code generated by configure_me

/// Ballista supports both push-based and pull-based task scheduling.
/// It is recommended that you try both to determine which is the best for your use case.
#[derive(Clone, Copy, Debug, serde::Deserialize, Default)]
#[cfg_attr(feature = "build-binary", derive(clap::ValueEnum))]
pub enum TaskSchedulingPolicy {
/// Pull-based scheduling works in a similar way to Apache Spark
#[default]
PullStaged,
/// Push-based scheduling can result in lower latency.
PushStaged,
}

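The doc comments above describe the two policies at a high level. As a rough illustration only (none of this is Ballista's actual startup code; the enum is re-declared locally and `describe` is a hypothetical helper), the distinction boils down to which side initiates task assignment:

```rust
// Minimal standalone sketch of the pull/push distinction.
#[derive(Clone, Copy, Debug, Default)]
enum TaskSchedulingPolicy {
    /// Executors poll the scheduler for tasks (the default).
    #[default]
    PullStaged,
    /// The scheduler pushes tasks to executors it knows about.
    PushStaged,
}

fn describe(policy: TaskSchedulingPolicy) -> &'static str {
    match policy {
        TaskSchedulingPolicy::PullStaged => "pull-based: executors poll the scheduler for work",
        TaskSchedulingPolicy::PushStaged => "push-based: the scheduler assigns work to executors",
    }
}

fn main() {
    // PullStaged is the default, which is exactly the case the /api/executors fix targets.
    println!("{}", describe(TaskSchedulingPolicy::default()));
}
```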
ballista/scheduler/src/api/handlers.rs (2 additions, 2 deletions)
@@ -41,7 +41,7 @@ pub struct ExecutorMetaResponse {
pub id: String,
pub host: String,
pub port: u16,
pub last_seen: u128,
pub last_seen: Option<u128>,
}

#[derive(Debug, serde::Serialize)]
@@ -98,7 +98,7 @@ pub async fn get_executors<
id: metadata.id,
host: metadata.host,
port: metadata.port,
last_seen: duration.as_millis(),
last_seen: duration.map(|d| d.as_millis()),
})
.collect();

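The switch from `u128` to `Option<u128>` is what lets pull-based executors appear in the listing at all. A minimal sketch of the resulting payload, assuming `serde`/`serde_json` and reusing the field names from the struct above (the id/host/port values are made up):

```rust
use serde::Serialize;

#[derive(Serialize)]
struct ExecutorMetaResponse {
    id: String,
    host: String,
    port: u16,
    last_seen: Option<u128>,
}

fn main() {
    // Under TaskSchedulingPolicy::PullStaged no heartbeat may ever be recorded,
    // so last_seen is None and the executor is still listed with "last_seen": null.
    let executor = ExecutorMetaResponse {
        id: "executor-1".to_string(),
        host: "10.0.0.5".to_string(),
        port: 50051,
        last_seen: None,
    };
    println!("{}", serde_json::to_string_pretty(&executor).unwrap());
}
```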
ballista/scheduler/src/cluster/memory.rs (13 additions, 1 deletion)
@@ -197,6 +197,7 @@ impl ClusterState for InMemoryClusterState {
spec: ExecutorData,
) -> Result<()> {
let executor_id = metadata.id.clone();
log::debug!("registering executor: {}", executor_id);

self.save_executor_metadata(metadata).await?;
self.save_executor_heartbeat(ExecutorHeartbeat {
@@ -223,6 +224,11 @@ }
}

async fn save_executor_metadata(&self, metadata: ExecutorMetadata) -> Result<()> {
log::debug!("save executor metadata: {}", metadata.id);
// TODO: MM it would make sense to record the time when ExecutorMetadata is persisted.
// We could do that by adding an additional field to ExecutorMetadata representing the
// insert time. This information may be useful when reporting executor
// status and no heartbeat is available (in the case of `TaskSchedulingPolicy::PullStaged`).
self.executors.insert(metadata.id.clone(), metadata);
Ok(())
}
@@ -239,6 +245,7 @@ }
}

async fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) -> Result<()> {
log::debug!("saving executor heartbeat: {}", heartbeat.executor_id);
let executor_id = heartbeat.executor_id.clone();
if let Some(mut last) = self.heartbeats.get_mut(&executor_id) {
let _ = std::mem::replace(last.deref_mut(), heartbeat);
@@ -250,12 +257,13 @@ }
}

async fn remove_executor(&self, executor_id: &str) -> Result<()> {
log::debug!("removing executor: {}", executor_id);
{
let mut guard = self.task_slots.lock().await;

guard.remove(executor_id);
}

self.executors.remove(executor_id);
self.heartbeats.remove(executor_id);

Ok(())
@@ -271,6 +279,10 @@ fn get_executor_heartbeat(&self, executor_id: &str) -> Option<ExecutorHeartbeat> {
fn get_executor_heartbeat(&self, executor_id: &str) -> Option<ExecutorHeartbeat> {
self.heartbeats.get(executor_id).map(|r| r.value().clone())
}

async fn registered_executor_metadata(&self) -> Vec<ExecutorMetadata> {
self.executors.iter().map(|v| v.clone()).collect()
}
}

/// Implementation of `JobState` which keeps all state in memory. If using `InMemoryJobState`
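The new `registered_executor_metadata` reads from the executor-metadata map rather than the heartbeat map, which is why executors registered under `PullStaged` are now visible. A standalone sketch of that listing pattern, assuming the `dashmap` crate and a trimmed-down metadata struct:

```rust
use dashmap::DashMap;

#[derive(Clone, Debug)]
struct ExecutorMetadata {
    id: String,
    host: String,
    port: u16,
}

fn registered_executor_metadata(
    executors: &DashMap<String, ExecutorMetadata>,
) -> Vec<ExecutorMetadata> {
    // Each iterator item is a guard that derefs to the value; clone the value
    // out so no shard lock is held after the call returns.
    executors.iter().map(|entry| entry.value().clone()).collect()
}

fn main() {
    let executors: DashMap<String, ExecutorMetadata> = DashMap::new();
    executors.insert(
        "executor-1".to_string(),
        ExecutorMetadata {
            id: "executor-1".to_string(),
            host: "10.0.0.5".to_string(),
            port: 50051,
        },
    );
    println!("{:?}", registered_executor_metadata(&executors));
}
```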
ballista/scheduler/src/cluster/mod.rs (3 additions, 0 deletions)
@@ -186,6 +186,9 @@ pub trait ClusterState: Send + Sync + 'static {
/// Get executor metadata for the provided executor ID. Returns an error if the executor does not exist
async fn get_executor_metadata(&self, executor_id: &str) -> Result<ExecutorMetadata>;

/// Return the list of registered executors
async fn registered_executor_metadata(&self) -> Vec<ExecutorMetadata>;

/// Save the executor heartbeat
async fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) -> Result<()>;

ballista/scheduler/src/state/executor_manager.rs (15 additions, 15 deletions)
@@ -192,20 +192,16 @@ impl ExecutorManager {
}

/// Get a list of all executors along with the timestamp of their last recorded heartbeat
pub async fn get_executor_state(&self) -> Result<Vec<(ExecutorMetadata, Duration)>> {
let heartbeat_timestamps: Vec<(String, u64)> = self
.cluster_state
.executor_heartbeats()
.into_iter()
.map(|(executor_id, heartbeat)| (executor_id, heartbeat.timestamp))
.collect();

let mut state: Vec<(ExecutorMetadata, Duration)> = vec![];
for (executor_id, ts) in heartbeat_timestamps {
let duration = Duration::from_secs(ts);

let metadata = self.get_executor_metadata(&executor_id).await?;

pub async fn get_executor_state(
&self,
) -> Result<Vec<(ExecutorMetadata, Option<Duration>)>> {
let mut state: Vec<(ExecutorMetadata, Option<Duration>)> = vec![];
for metadata in self.cluster_state.registered_executor_metadata().await {
let duration = self
.cluster_state
.get_executor_heartbeat(&metadata.id)
.map(|hb| hb.timestamp)
.map(Duration::from_secs);
state.push((metadata, duration));
}

@@ -224,6 +220,10 @@ ///
///
/// For push-based one, we should use [`register_executor`], instead.
pub async fn save_executor_metadata(&self, metadata: ExecutorMetadata) -> Result<()> {
debug!(
"save executor metadata {} with {} task slots (pull-based registration)",
metadata.id, metadata.specification.task_slots
);
self.cluster_state.save_executor_metadata(metadata).await
}

@@ -238,7 +238,7 @@
specification: ExecutorData,
) -> Result<()> {
debug!(
"registering executor {} with {} task slots",
"registering executor {} with {} task slots (push-based registration)",
metadata.id, specification.total_task_slots
);

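The key change in `get_executor_state` is the lookup order: iterate the registered executors first and attach a heartbeat only if one exists, rather than iterating heartbeats and dropping executors that never sent one. A simplified, std-only sketch of that flow (types and values are stand-ins, not the real `ExecutorManager` API):

```rust
use std::collections::HashMap;
use std::time::Duration;

#[derive(Clone, Debug)]
struct ExecutorMetadata {
    id: String,
}

fn executor_state(
    registered: &[ExecutorMetadata],
    heartbeats: &HashMap<String, u64>, // executor_id -> heartbeat timestamp in seconds
) -> Vec<(ExecutorMetadata, Option<Duration>)> {
    registered
        .iter()
        .map(|meta| {
            // A missing heartbeat (typical under PullStaged) becomes None
            // rather than removing the executor from the listing.
            let last_seen = heartbeats.get(&meta.id).copied().map(Duration::from_secs);
            (meta.clone(), last_seen)
        })
        .collect()
}

fn main() {
    let registered = vec![ExecutorMetadata { id: "executor-1".to_string() }];
    let heartbeats = HashMap::new(); // nothing recorded yet
    println!("{:?}", executor_state(&registered, &heartbeats));
}
```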