Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: executor supports pluggable arrow flight server #1170

Merged
merged 1 commit into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ballista/executor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ impl TryFrom<Config> for ExecutorProcessConfig {
override_runtime_producer: None,
override_logical_codec: None,
override_physical_codec: None,
override_arrow_flight_service: None,
})
}
}
115 changes: 80 additions & 35 deletions ballista/executor/src/executor_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ use crate::flight_service::BallistaFlightService;
use crate::metrics::LoggingMetricsCollector;
use crate::shutdown::Shutdown;
use crate::shutdown::ShutdownNotifier;
use crate::terminate;
use crate::{execution_loop, executor_server};
use crate::{terminate, ArrowFlightServerProvider};

pub struct ExecutorProcessConfig {
pub bind_host: String,
Expand Down Expand Up @@ -101,6 +101,8 @@ pub struct ExecutorProcessConfig {
pub override_logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
/// [PhysicalExtensionCodec] override option
pub override_physical_codec: Option<Arc<dyn PhysicalExtensionCodec>>,
/// [ArrowFlightServerProvider] implementation override option
pub override_arrow_flight_service: Option<Arc<ArrowFlightServerProvider>>,
}

impl ExecutorProcessConfig {
Expand Down Expand Up @@ -143,6 +145,7 @@ impl Default for ExecutorProcessConfig {
override_config_producer: None,
override_logical_codec: None,
override_physical_codec: None,
override_arrow_flight_service: None,
}
}
}
Expand All @@ -151,7 +154,7 @@ pub async fn start_executor_process(
opt: Arc<ExecutorProcessConfig>,
) -> ballista_core::error::Result<()> {
let addr = format!("{}:{}", opt.bind_host, opt.port);
let addr = addr.parse().map_err(|e: std::net::AddrParseError| {
let address = addr.parse().map_err(|e: std::net::AddrParseError| {
BallistaError::Configuration(e.to_string())
})?;

Expand All @@ -174,9 +177,12 @@ pub async fn start_executor_process(
opt.concurrent_tasks
};

info!("Running with config:");
info!("work_dir: {}", work_dir);
info!("concurrent_tasks: {}", concurrent_tasks);
info!(
"Executor starting ... (Datafusion Ballista {})",
BALLISTA_VERSION
);
info!("Executor working directory: {}", work_dir);
info!("Executor number of concurrent tasks: {}", concurrent_tasks);

// assign this executor an unique ID
let executor_id = Uuid::new_v4().to_string();
Expand Down Expand Up @@ -259,16 +265,16 @@ pub async fn start_executor_process(
"Could not connect to scheduler".to_string(),
)
}) {
Ok(conn) => {
Ok(connection) => {
info!("Connected to scheduler at {}", scheduler_url);
x = Some(conn);
x = Some(connection);
}
Err(e) => {
warn!(
"Failed to connect to scheduler at {} ({}); retrying ...",
scheduler_url, e
);
std::thread::sleep(time::Duration::from_millis(500));
tokio::time::sleep(time::Duration::from_millis(500)).await;
}
}
}
Expand All @@ -288,13 +294,15 @@ pub async fn start_executor_process(
let job_data_ttl_seconds = opt.job_data_ttl_seconds;

// Graceful shutdown notification
let shutdown_noti = ShutdownNotifier::new();
let shutdown_notification = ShutdownNotifier::new();

if opt.job_data_clean_up_interval_seconds > 0 {
let mut interval_time =
time::interval(Duration::from_secs(opt.job_data_clean_up_interval_seconds));
let mut shuffle_cleaner_shutdown = shutdown_noti.subscribe_for_shutdown();
let shuffle_cleaner_complete = shutdown_noti.shutdown_complete_tx.clone();

let mut shuffle_cleaner_shutdown = shutdown_notification.subscribe_for_shutdown();
let shuffle_cleaner_complete = shutdown_notification.shutdown_complete_tx.clone();

tokio::spawn(async move {
// As long as the shutdown notification has not been received
while !shuffle_cleaner_shutdown.is_shutdown() {
Expand Down Expand Up @@ -336,7 +344,7 @@ pub async fn start_executor_process(
executor.clone(),
default_codec,
stop_send,
&shutdown_noti,
&shutdown_notification,
)
.await?,
);
Expand All @@ -349,10 +357,19 @@ pub async fn start_executor_process(
)));
}
};
service_handlers.push(tokio::spawn(flight_server_run(
addr,
shutdown_noti.subscribe_for_shutdown(),
)));
let shutdown = shutdown_notification.subscribe_for_shutdown();
let override_flight = opt.override_arrow_flight_service.clone();

service_handlers.push(match override_flight {
None => {
info!("Starting built-in arrow flight service");
flight_server_task(address, shutdown).await
}
Some(flight_provider) => {
info!("Starting custom, user provided, arrow flight service");
(flight_provider)(address, shutdown)
}
});

let tasks_drained = TasksDrainedFuture(executor);

Expand Down Expand Up @@ -434,7 +451,7 @@ pub async fn start_executor_process(
shutdown_complete_tx,
notify_shutdown,
..
} = shutdown_noti;
} = shutdown_notification;

// When `notify_shutdown` is dropped, all components which have `subscribe`d will
// receive the shutdown signal and can exit
Expand All @@ -449,25 +466,21 @@ pub async fn start_executor_process(
}

// Arrow flight service
async fn flight_server_run(
addr: SocketAddr,
async fn flight_server_task(
address: SocketAddr,
mut grpc_shutdown: Shutdown,
) -> Result<(), BallistaError> {
let service = BallistaFlightService::new();
let server = FlightServiceServer::new(service);
info!(
"Ballista v{} Rust Executor Flight Server listening on {:?}",
BALLISTA_VERSION, addr
);

let shutdown_signal = grpc_shutdown.recv();
let server_future = create_grpc_server()
.add_service(server)
.serve_with_shutdown(addr, shutdown_signal);

server_future.await.map_err(|e| {
error!("Tonic error, Could not start Executor Flight Server.");
BallistaError::TonicError(e)
) -> JoinHandle<Result<(), BallistaError>> {
tokio::spawn(async move {
info!("Built-in arrow flight server listening on: {:?}", address);

let server_future = create_grpc_server()
.add_service(FlightServiceServer::new(BallistaFlightService::new()))
.serve_with_shutdown(address, grpc_shutdown.recv());

server_future.await.map_err(|e| {
error!("Could not start built-in arrow flight server.");
BallistaError::TonicError(e)
})
})
}

Expand Down Expand Up @@ -640,4 +653,36 @@ mod tests {
let count2 = fs::read_dir(work_dir.clone()).unwrap().count();
assert_eq!(count2, 0);
}

#[tokio::test]
async fn test_arrow_flight_provider_ergonomics() {
let config = crate::executor_process::ExecutorProcessConfig {
override_arrow_flight_service: Some(std::sync::Arc::new(
move |address, mut grpc_shutdown| {
tokio::spawn(async move {
log::info!(
"custom arrow flight server listening on: {:?}",
address
);

let server_future = ballista_core::utils::create_grpc_server()
.add_service(
arrow_flight::flight_service_server::FlightServiceServer::new(
crate::flight_service::BallistaFlightService::new(),
),
)
.serve_with_shutdown(address, grpc_shutdown.recv());

server_future.await.map_err(|e| {
log::error!("Could not start built-in arrow flight server.");
ballista_core::error::BallistaError::TonicError(e)
})
})
},
)),
..Default::default()
};

assert!(config.override_arrow_flight_service.is_some());
}
}
15 changes: 15 additions & 0 deletions ballista/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,33 @@ pub mod terminate;
mod cpu_bound_executor;
mod standalone;

use ballista_core::error::BallistaError;
use std::net::SocketAddr;

pub use standalone::new_standalone_executor;
pub use standalone::new_standalone_executor_from_builder;
pub use standalone::new_standalone_executor_from_state;

use log::info;

use crate::shutdown::Shutdown;
use ballista_core::serde::protobuf::{
task_status, FailedTask, OperatorMetricsSet, ShuffleWritePartition, SuccessfulTask,
TaskStatus,
};
use ballista_core::serde::scheduler::PartitionId;

/// [ArrowFlightServerProvider] provides a function which creates a new Arrow Flight server.
///
/// The function should take two arguments:
/// [SocketAddr] - the address to bind the server to
/// [Shutdown] - a shutdown signal to gracefully shutdown the server
/// Returns a [tokio::task::JoinHandle] which will be registered as service handler
///
pub type ArrowFlightServerProvider = dyn Fn(SocketAddr, Shutdown) -> tokio::task::JoinHandle<Result<(), BallistaError>>
+ Send
+ Sync;

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TaskExecutionTimes {
launch_time: u64,
Expand Down
Loading