From dc1bce079bd7b4cfb01cd054b2b8ed5eafa11819 Mon Sep 17 00:00:00 2001 From: Aidan Smith Date: Thu, 2 May 2024 04:25:43 -0400 Subject: [PATCH 1/4] Initial working 'State' --- src/query_graph.rs | 11 ++++- src/queue.rs | 117 +++++++++++++++++++++++++++++++++++++++++++-- src/server.rs | 79 +++++++++++++++++------------- 3 files changed, 168 insertions(+), 39 deletions(-) diff --git a/src/query_graph.rs b/src/query_graph.rs index 3c05800..c8b75d3 100644 --- a/src/query_graph.rs +++ b/src/query_graph.rs @@ -3,7 +3,6 @@ use crate::composable_database::{QueryStatus, TaskId}; use crate::task::{Task, TaskStatus}; use crate::task_queue::TaskQueue; use datafusion::arrow::datatypes::Schema; -use datafusion::common::JoinSide; use datafusion::physical_plan::aggregates::AggregateExec; use datafusion::physical_plan::joins::{ CrossJoinExec, HashJoinExec, NestedLoopJoinExec, SortMergeJoinExec, SymmetricHashJoinExec, @@ -15,6 +14,7 @@ use datafusion::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; use std::collections::HashMap; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use std::time::Duration; // TODO Change to Waiting, Ready, Running(vec[taskid]), Finished(vec[locations?]) #[derive(Clone, Debug, Default)] @@ -48,10 +48,11 @@ pub struct QueryGraph { tid_counter: AtomicU64, // TODO: add mutex to stages and make elements pointers to avoid copying pub stages: Vec, // Can be a vec since all stages in a query are enumerated from 0. task_queue: TaskQueue, // Ready tasks in this graph + pub time: Duration, } impl QueryGraph { - pub async fn new(query_id: u64, plan: Arc) -> Self { + pub fn new(query_id: u64, plan: Arc) -> Self { // Build stages. let mut builder = GraphBuilder::new(); let stages = builder.build(plan.clone()); @@ -63,6 +64,7 @@ impl QueryGraph { tid_counter: AtomicU64::new(0), stages, task_queue: TaskQueue::new(), + time: Duration::new(0, 0), }; // Build tasks for leaf stages. @@ -121,6 +123,7 @@ impl QueryGraph { let outputs = stage.outputs.clone(); if outputs.is_empty() { + println!("QueryGraph::update_stage_status: Query {} is done.", self.query_id); self.status = QueryStatus::Done; return Ok(()); } @@ -161,6 +164,10 @@ impl QueryGraph { } // fn build_tasks(&mut self) + pub fn get_plan(&self, stage_id: u64) -> Arc { + let plan = self.stages[stage_id as usize].plan.clone(); + plan + } } #[derive(Clone, Debug)] diff --git a/src/queue.rs b/src/queue.rs index 54021de..9cca6f3 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -5,12 +5,15 @@ use crate::task::{ TaskStatus::{self, *}, }; use crate::SchedulerError; -use std::collections::{BTreeSet, HashMap}; +use dashmap::DashMap; +use datafusion::physical_plan::ExecutionPlan; +use datafusion_proto::bytes::physical_plan_to_bytes; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::hash::{Hash, Hasher}; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::{Duration, SystemTime}; -use datafusion_proto::bytes::physical_plan_to_bytes; -use tokio::sync::{Mutex, Notify}; +use tokio::sync::{Mutex, Notify, RwLock}; // Must implement here since generated TaskId does not derive Hash. 
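// Invariant worth stating: the Hash impl must agree with TaskId's PartialEq
// (values that compare equal must hash equally), since TaskId is used as the
// key of the running_tasks DashMap introduced below. Hashing a field that
// PartialEq ignores would silently break those lookups.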
impl Hash for TaskId { @@ -48,6 +51,114 @@ pub struct Queue { avail: Arc, } +pub struct State { + // queue: Mutex>, + queue: Mutex>, + start_ts: SystemTime, + + query_id_counter: AtomicU64, + table: DashMap>, + running_tasks: DashMap, + notify: Arc, +} + +impl State { + pub fn new(notify: Arc) -> Self { + Self { + queue: Mutex::new(BTreeMap::new()), + start_ts: SystemTime::now(), + query_id_counter: AtomicU64::new(0), + table: DashMap::new(), + running_tasks: DashMap::new(), + notify, + } + } + + fn next_query_id(&self) -> u64 { + self.query_id_counter.fetch_add(1, Ordering::SeqCst) + } + + pub async fn add_query(&self, plan: Arc) -> u64 { + let id = self.next_query_id(); + let mut query = QueryGraph::new(id, plan); + let time = SystemTime::now().duration_since(self.start_ts).unwrap(); + query.time = time; + + self.table.insert(id, RwLock::new(query)); + self.queue.lock().await.insert(time, id); + + self.notify.notify_waiters(); + id + } + + pub async fn get_query_status(&self, query_id: u64) -> Option { + let status = self.table.get(&query_id)?.read().await.status; + if status == QueryStatus::Done { + self.table.remove(&query_id); + } + Some(status) + } + + pub async fn abort_query(&self, query_id: u64) { + todo!() + } + + pub async fn next_task(&self) -> Option<(TaskId, Arc)> { + let Some((duration, query_id)) = self.queue.lock().await.pop_first() else { + return None; + }; + let query = self.table.get(&query_id).unwrap(); + let mut guard = query.write().await; + + let mut task = guard.next_task(); + task.status = Running(SystemTime::now()); + let task_id = task.task_id; + let plan = guard.get_plan(task.task_id.stage_id); + + // Update query to reflect running task. Requeue if more tasks are available. + guard + .update_stage_status(task.task_id.stage_id, StageStatus::Running(0)) + .unwrap(); + if let QueryQueueStatus::Available = guard.get_queue_status() { + self.queue.lock().await.insert(duration, query_id); + self.notify.notify_waiters(); + } + + self.running_tasks.insert(task_id, task); + Some((task_id, plan)) + } + + pub async fn report_task(&self, task_id: TaskId, status: TaskStatus) { + if let Some((_, task)) = self.running_tasks.remove(&task_id) { + println!("Updating {:?} status to {:?}", task_id, status); + let TaskStatus::Running(ts) = task.status else { + println!("Task removed with status {:?}", task.status); + panic!("Task removed but is not running."); + }; + let query = self.table.get(&task_id.query_id).unwrap(); + let mut guard = query.write().await; + + match status { + TaskStatus::Finished => guard + .update_stage_status(task_id.stage_id, StageStatus::Finished(0)) + .unwrap(), + TaskStatus::Failed => todo!(), + TaskStatus::Aborted => todo!(), + _ => unreachable!(), + } + + let new_time = guard.time + SystemTime::now().duration_since(ts).unwrap(); + let mut queue = self.queue.lock().await; + let _ = queue.remove(&guard.time); + if let QueryQueueStatus::Available = guard.get_queue_status() { + queue.insert(new_time, task_id.query_id); + self.notify.notify_waiters(); + } + guard.time = new_time; + } + } +} + // Notify variable is shared with scheduler service to control task dispatch. 
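// Note that `notify_waiters()` only wakes tasks already parked in
// `notified()`; it does not store a permit. A minimal consumer sketch of the
// dispatch handshake, assuming the same names as `SchedulerService::next_task`
// (`state: Arc<State>`, `notify: Arc<Notify>`):
//
//     loop {
//         if let Some((task_id, plan)) = state.next_task().await {
//             break (task_id, plan); // serialize `plan`, send to an executor
//         }
//         notify.notified().await; // block until a query becomes runnable
//     }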
impl Queue { pub fn new(avail: Arc) -> Self { diff --git a/src/server.rs b/src/server.rs index c2b82d1..14f1e62 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4,24 +4,22 @@ use crate::composable_database::{ QueryJobStatusArgs, QueryJobStatusRet, QueryStatus, ScheduleQueryArgs, ScheduleQueryRet, TaskId, }; -use crate::intermediate_results::{get_results, TaskKey}; use crate::mock_catalog::load_catalog; -use crate::parser::ExecutionPlanParser; use crate::query_graph::{QueryGraph, StageStatus}; use crate::queue::Queue; +use crate::queue::State; +use crate::task::TaskStatus; use crate::SchedulerError; -use datafusion::arrow::util::pretty::print_batches; use datafusion::execution::context::SessionContext; -use datafusion_proto::bytes::physical_plan_from_bytes; +use datafusion_proto::bytes::{physical_plan_from_bytes, physical_plan_to_bytes}; use std::fmt; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use tokio::sync::{Mutex, Notify}; -use tokio::time::{sleep, Duration}; -use tonic::transport::Server; use tonic::{Request, Response, Status}; pub struct SchedulerService { + state: Arc, queue: Arc>, ctx: Arc, // If we support changing the catalog at runtime, this should be a RwLock. query_id_counter: AtomicU64, @@ -30,11 +28,7 @@ pub struct SchedulerService { impl fmt::Debug for SchedulerService { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "SchedulerService {{ queue: {:?} }}", - self.queue, - ) + write!(f, "SchedulerService {{ queue: {:?} }}", self.queue,) } } @@ -42,6 +36,7 @@ impl SchedulerService { pub async fn new(catalog_path: &str) -> Self { let avail = Arc::new(Notify::new()); Self { + state: Arc::new(State::new(avail.clone())), queue: Arc::new(Mutex::new(Queue::new(Arc::clone(&avail)))), ctx: load_catalog(catalog_path).await, query_id_counter: AtomicU64::new(0), @@ -59,19 +54,28 @@ impl SchedulerService { task_id_opt: Option, ) -> Result<(TaskId, Vec), SchedulerError> { if let Some(task_id) = task_id_opt { - let mut queue = self.queue.lock().await; - // Remove the current task from the queue. - queue.remove_task(task_id, StageStatus::Finished(0)).await; + // let mut queue = self.queue.lock().await; + // // Remove the current task from the queue. + // queue.remove_task(task_id, StageStatus::Finished(0)).await; + self.state.report_task(task_id, TaskStatus::Finished).await; } loop { - let mut queue = self.queue.lock().await; - if let Some(new_task_id) = queue.next_task().await { - let stage = queue - .get_plan_bytes(new_task_id.query_id, new_task_id.stage_id) - .await?; - return Ok((new_task_id, stage)); + // let mut queue = self.queue.lock().await; + // if let Some(new_task_id) = queue.next_task().await { + // let stage = queue + // .get_plan_bytes(new_task_id.query_id, new_task_id.stage_id) + // .await?; + // return Ok((new_task_id, stage)); + // } + // drop(queue); + if let Some((task_id, plan)) = self.state.next_task().await { + let bytes = physical_plan_to_bytes(plan) + .expect("Failed to serialize physical plan") + .to_vec(); + println!("SchedulerService: Sending task {:?}", task_id); + return Ok((task_id, bytes)); } - drop(queue); + println!("SchedulerService: Waiting for new tasks."); self.avail.notified().await; } } @@ -94,13 +98,13 @@ impl SchedulerApi for SchedulerService { let plan = physical_plan_from_bytes(bytes.as_slice(), &self.ctx) .expect("Failed to deserialize physical plan"); - // println!("schedule_query: received plan {:?}", plan); - // Build a query graph, store in query table, enqueue new tasks. 
- let qid = self.next_query_id(); - let query = QueryGraph::new(qid, plan).await; - self.queue.lock().await.add_query(qid, Arc::new(Mutex::new(query))).await; + let qid = self.state.add_query(plan).await; + // Build a query graph, store in query table, enqueue new tasks. + // let qid = self.next_query_id(); + // let query = QueryGraph::new(qid, plan); + // self.queue.lock().await.add_query(qid, Arc::new(Mutex::new(query))).await; let response = ScheduleQueryRet { query_id: qid }; Ok(Response::new(response)) @@ -113,8 +117,14 @@ impl SchedulerApi for SchedulerService { ) -> Result, Status> { let QueryJobStatusArgs { query_id } = request.into_inner(); - let status = self.queue.lock().await.get_query_status(query_id).await; + let status = self + .state + .get_query_status(query_id) + .await + .unwrap_or(QueryStatus::NotFound); + // let status = self.queue.lock().await.get_query_status(query_id).await; if status == QueryStatus::Done { + println!("SchedulerService: Query {} is done.", query_id); let stage_id = 0; // let final_result = get_results(&TaskKey { stage_id, query_id }) // .await @@ -127,8 +137,8 @@ impl SchedulerApi for SchedulerService { return Ok(Response::new(QueryJobStatusRet { query_status: QueryStatus::Done.into(), - stage_id: stage_id, - query_id: query_id + stage_id, + query_id, })); // ****************** END CHANGES FROM INTEGRATION TESTING****************// } @@ -145,7 +155,8 @@ impl SchedulerApi for SchedulerService { ) -> Result, Status> { // TODO: Actually call executor API to abort query. let AbortQueryArgs { query_id } = request.into_inner(); - self.queue.lock().await.abort_query(query_id).await; + // self.queue.lock().await.abort_query(query_id).await; + self.state.abort_query(query_id).await; let response = AbortQueryRet { aborted: true }; Ok(Response::new(response)) } @@ -243,10 +254,10 @@ mod tests { test_file ); } - println!( - "test_scheduler: queued {} tasks.", - scheduler_service.queue.lock().await.size() - ); + // println!( + // "test_scheduler: queued {} tasks.", + // scheduler_service.queue.lock().await.size() + // ); // TODO: add concurrent test eventually let mut send_task = NotifyTaskStateArgs { From 1fd2286a2ee9a5ff5c7a4d8f6530398c843a9c85 Mon Sep 17 00:00:00 2001 From: Aidan Smith Date: Thu, 2 May 2024 04:48:05 -0400 Subject: [PATCH 2/4] Clean up dead code and make tests run --- src/queue.rs | 272 +++++--------------------------------------------- src/server.rs | 29 +----- 2 files changed, 26 insertions(+), 275 deletions(-) diff --git a/src/queue.rs b/src/queue.rs index 9cca6f3..3e70ce4 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -30,35 +30,19 @@ impl Eq for TaskId {} impl Copy for TaskId {} -#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] -struct QueryKey { - pub ft: Duration, - pub qid: u64, -} - #[derive(Debug)] -pub struct Queue { - // The queue used to order queries by executor usage. - queue: BTreeSet, - // The startup time of the queue, used to calculate new global passes. - start_ts: SystemTime, - // Structure that maps query IDs to query keys. - query_map: HashMap>, Arc>)>, - // table: DashMap>, - // List of currently running tasks. - running_task_map: HashMap, - // Notify primitive that signals when new tasks are ready. - avail: Arc, -} - pub struct State { // queue: Mutex>, + // The queue used to order queries by executor usage. queue: Mutex>, start_ts: SystemTime, query_id_counter: AtomicU64, + // Structure that maps query IDs to query keys. table: DashMap>, + // List of currently running tasks. 
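+    // Entries are inserted by next_task() when a task is dispatched and
+    // removed by report_task() once the executor reports a terminal status.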
running_tasks: DashMap, + // Notify primitive that signals when new tasks are ready. notify: Arc, } @@ -157,169 +141,9 @@ impl State { guard.time = new_time; } } -} - -// Notify variable is shared with scheduler service to control task dispatch. -impl Queue { - pub fn new(avail: Arc) -> Self { - Self { - queue: BTreeSet::new(), - start_ts: SystemTime::now(), - query_map: HashMap::new(), - running_task_map: HashMap::new(), - avail, - } - } - - // Send the status update for a task to its query graph structure. - // Based on the query's availability, change its priority. - // Used in both add_running_task and remove_task. - async fn update_status( - &mut self, - old_key: QueryKey, - new_key: QueryKey, - finished_stage_id: u64, - finished_stage_status: StageStatus, - ) { - // Get the graph for this query - let graph = Arc::clone(&self.query_map.get(&old_key.qid).unwrap().1); - // Temporarily remove query from queue, if present, and get its graph - let _ = self.queue.remove(&old_key); - - // If graph has more tasks available, re-insert query and notify - let mut guard = graph.lock().await; - guard - .update_stage_status(finished_stage_id, finished_stage_status) - .unwrap(); - if let QueryQueueStatus::Available = guard.get_queue_status() { - self.queue.insert(new_key); - self.avail.notify_waiters(); - } - } - - // Mark this task as running. - async fn add_running_task(&mut self, mut task: Task, key: QueryKey) { - // Change the task's status to running. - task.status = Running(SystemTime::now()); - // Add the task to the list of running tasks. - self.running_task_map.insert(task.task_id, task.clone()); - // Send the update to the query graph and reorder queue. - // WARNING: stage_status may not be 'running' if tasks and stages are not 1:1 - self.update_status( - key.clone(), - key, - task.task_id.stage_id, - StageStatus::Running(0), - ) - .await; - } - - /* Get the minimum element of the queue, or None if the queue is empty */ - fn min(&mut self) -> Option { - self.queue.pop_first() - } - - #[cfg(test)] - pub fn size(&self) -> usize { - self.queue.len() - } - - // TODO(makototomokiyo): make sure stride actually works - pub async fn add_query(&mut self, qid: u64, graph: Arc>) { - let key = QueryKey { - // running: 0, - ft: SystemTime::now().duration_since(self.start_ts).unwrap(), - qid, - }; - self.query_map - .insert(qid, (Arc::new(Mutex::new(key)), Arc::clone(&graph))); - self.queue.insert(key); - self.avail.notify_waiters(); - } - - /* - Remove this task from the list of running tasks and mark it as done. - This function forwards task info to the task's query graph, - updating it if necessary. - */ - // TODO: handle aborted queries - pub async fn remove_task(&mut self, task_id: TaskId, finished_stage_status: StageStatus) { - // Remove the task from the running map. - let task = self.running_task_map.remove(&task_id).unwrap(); - debug_assert!(task.task_id == task_id); - // Get the query ID. - let query = task_id.query_id; - // Get the key corresponding to the task's query. - let mut key = self.query_map.get(&query).unwrap().0.lock().await; - // Ensure the task is running. - if let Running(start_ts) = task.status { - let old_key = *key; - // Increment the query's pass using the task's elapsed time. 
- (*key).ft += SystemTime::now().duration_since(start_ts).unwrap(); - let new_key = *key; - drop(key); // to avoid double mutable borrow - self.update_status(old_key, new_key, task_id.stage_id, finished_stage_status) - .await; - } else { - panic!("Task removed but is not running."); - } - } - - /* - Return the next task, or None if the queue is empty. - Blocking is handled in the server. - */ - pub async fn next_task(&mut self) -> Option { - if let Some(key) = self.min() { - // If a query is available, get its next task - let graph = &self.query_map.get(&key.qid).unwrap().1; - println!("Queue size before getting task: {:#?}", self.queue.len()); - let new_task = graph.lock().await.next_task(); - debug_assert!(matches!(new_task.status, TaskStatus::Ready)); - - self.add_running_task(new_task.clone(), key).await; - Some(new_task.task_id) - } else { - None - } - } - - pub async fn get_query_status(&mut self, qid: u64) -> QueryStatus { - if let Some(query_entry) = self.query_map.get(&qid) { - let status = query_entry.1.lock().await.status; - let key = *query_entry.0.lock().await; - // If query is done, return DONE and delete from table - if status == QueryStatus::Done { - self.query_map.remove(&key.qid).expect("Query not found."); - } - return status; - } else { - return QueryStatus::NotFound; - } - } - - pub async fn abort_query(&mut self, qid: u64) { - if let Some(query_entry) = self.query_map.get(&qid) { - query_entry.1.lock().await.abort(); - self.query_map.remove(&qid); - } - } - - pub async fn get_plan_bytes( - &self, - query_id: u64, - stage_id: u64, - ) -> Result, SchedulerError> { - let t = &self.query_map; - if let Some((_, graph)) = t.get(&query_id) { - let plan = Arc::clone(&graph.lock().await.stages[stage_id as usize].plan); - Ok(physical_plan_to_bytes(plan) - .expect("Failed to serialize physical plan") - .to_vec()) - } else { - Err(SchedulerError::Error("Graph not found.".to_string())) - } + pub async fn size(&self) -> usize { + self.queue.lock().await.len() } } @@ -327,68 +151,38 @@ impl Queue { mod tests { use rand::Rng; use std::fs; - use std::time::Duration; use tokio::sync::{Mutex, Notify}; - use tokio::time::sleep; use crate::parser::ExecutionPlanParser; - use crate::{ - composable_database::TaskId, - query_graph::{QueryGraph, StageStatus}, - queue::{QueryKey, Queue}, - }; - use std::{ - cmp::min, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, - time::SystemTime, - }; - - // Test that query keys compare properly. 
- #[tokio::test] - async fn test_query_key_cmp() { - let then = SystemTime::now(); - let now1 = SystemTime::now().duration_since(then).unwrap(); - sleep(Duration::from_secs(1)).await; - let now2 = SystemTime::now().duration_since(then).unwrap(); - - let key1 = QueryKey { - ft: now1.clone(), - qid: 1, - }; - let key2 = QueryKey { ft: now2, qid: 0 }; - let key3 = QueryKey { ft: now1, qid: 0 }; - // Make sure durations are compared first - assert!(key1 < key2); - // Then qids - assert!(key3 < key1); - } + use crate::queue::State; + use crate::task::TaskStatus; + use std::{cmp::min, sync::Arc}; // Deprecated, use test_queue_conc instead #[tokio::test] async fn test_queue() { - let test_file = concat!(env!("CARGO_MANIFEST_DIR"), "/test_sql/expr.slt"); + let test_file = concat!( + env!("CARGO_MANIFEST_DIR"), + "/test_sql/test_select_multiple.sql" + ); let catalog_path = concat!(env!("CARGO_MANIFEST_DIR"), "/test_data/"); - let mut queue = Box::new(Queue::new(Arc::new(Notify::new()))); + let queue = Box::new(State::new(Arc::new(Notify::new()))); let parser = ExecutionPlanParser::new(catalog_path).await; println!("test_scheduler: Testing file {}", test_file); if let Ok(physical_plans) = parser.get_execution_plan_from_file(&test_file).await { let mut qid = 0; // Add a bunch of queries for plan in &physical_plans { - let graph = QueryGraph::new(qid, Arc::clone(plan)).await; - queue.add_query(qid, Arc::new(Mutex::new(graph))).await; + queue.add_query(Arc::clone(plan)).await; qid += 1; } - let mut tasks: Vec = Vec::new(); + let mut tasks = Vec::new(); for _ in 0..qid { tasks.push(queue.next_task().await.expect("No tasks left in queue.")); } for _ in 0..qid { queue - .remove_task(tasks.pop().unwrap(), StageStatus::Finished(0)) + .report_task(tasks.pop().unwrap().0, TaskStatus::Finished) .await; } } else { @@ -400,7 +194,7 @@ mod tests { async fn test_queue_conc() { let test_sql_dir = concat!(env!("CARGO_MANIFEST_DIR"), "/test_sql/"); let catalog_path = concat!(env!("CARGO_MANIFEST_DIR"), "/test_data/"); - let queue = Arc::new(Mutex::new(Queue::new(Arc::new(Notify::new())))); + let queue = Arc::new(State::new(Arc::new(Notify::new()))); let parser = ExecutionPlanParser::new(catalog_path).await; println!("test_queue_conc: Testing files in {}", test_sql_dir); @@ -411,29 +205,17 @@ mod tests { physical_plans.extend(parser.get_execution_plan_from_file(&path).await.unwrap()); } - // let physical_plans = parser - // .get_execution_plan_from_file(&test_file) - // .await - // .unwrap(); println!("Got {} plans.", physical_plans.len()); let nplans = min(physical_plans.len(), 400); let plans = physical_plans[..nplans].to_vec(); - let qid: Arc> = Arc::new(Mutex::new(AtomicU64::new(0))); // Add a bunch of queries let mut jobs = Vec::new(); for plan in plans { let queue_clone = Arc::clone(&queue); - let qid_clone = Arc::clone(&qid); // Spawn threads that each enqueue a job jobs.push(tokio::spawn(async move { - let query_id = qid_clone.lock().await.fetch_add(1, Ordering::SeqCst); - let graph = QueryGraph::new(query_id, Arc::clone(&plan)).await; - queue_clone - .lock() - .await - .add_query(query_id, Arc::new(Mutex::new(graph))) - .await; + let _ = queue_clone.add_query(Arc::clone(&plan)).await; })); } @@ -444,18 +226,14 @@ mod tests { jobs.push(tokio::spawn(async move { // Get a plan, looping until one is available loop { - let task_opt = queue_clone.lock().await.next_task().await; - if let Some(task) = task_opt { - queue_clone - .lock() - .await - .remove_task(task, StageStatus::Finished(0)) - .await; + let task_opt = 
queue_clone.next_task().await; + if let Some((task, _plan)) = task_opt { + queue_clone.report_task(task, TaskStatus::Finished).await; } let time = rand::thread_rng().gen_range(200..4000); tokio::time::sleep(tokio::time::Duration::from_millis(time)).await; // Return if no more queries left. - if queue_clone.lock().await.queue.len() == 0 { + if queue_clone.size().await == 0 { return; } } @@ -468,8 +246,8 @@ mod tests { } // println!("Queued {} queries.", qid.lock().await.load(Ordering::SeqCst)); // make sure no more tasks remain - assert!(Arc::clone(&queue).lock().await.next_task().await.is_none()); - assert!(queue.lock().await.size() == 0); + assert!(Arc::clone(&queue).next_task().await.is_none()); + assert!(queue.size().await == 0); println!("Finished {:?} tasks.", nplans); } } diff --git a/src/server.rs b/src/server.rs index 14f1e62..45c336a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -6,7 +6,6 @@ use crate::composable_database::{ }; use crate::mock_catalog::load_catalog; use crate::query_graph::{QueryGraph, StageStatus}; -use crate::queue::Queue; use crate::queue::State; use crate::task::TaskStatus; use crate::SchedulerError; @@ -20,15 +19,13 @@ use tonic::{Request, Response, Status}; pub struct SchedulerService { state: Arc, - queue: Arc>, ctx: Arc, // If we support changing the catalog at runtime, this should be a RwLock. - query_id_counter: AtomicU64, avail: Arc, } impl fmt::Debug for SchedulerService { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "SchedulerService {{ queue: {:?} }}", self.queue,) + write!(f, "SchedulerService {{ state: {:?} }}", self.state,) } } @@ -37,37 +34,20 @@ impl SchedulerService { let avail = Arc::new(Notify::new()); Self { state: Arc::new(State::new(avail.clone())), - queue: Arc::new(Mutex::new(Queue::new(Arc::clone(&avail)))), ctx: load_catalog(catalog_path).await, - query_id_counter: AtomicU64::new(0), avail, } } - fn next_query_id(&self) -> u64 { - self.query_id_counter.fetch_add(1, Ordering::SeqCst) - } - // Get the next task from the queue. async fn next_task( &self, task_id_opt: Option, ) -> Result<(TaskId, Vec), SchedulerError> { if let Some(task_id) = task_id_opt { - // let mut queue = self.queue.lock().await; - // // Remove the current task from the queue. - // queue.remove_task(task_id, StageStatus::Finished(0)).await; self.state.report_task(task_id, TaskStatus::Finished).await; } loop { - // let mut queue = self.queue.lock().await; - // if let Some(new_task_id) = queue.next_task().await { - // let stage = queue - // .get_plan_bytes(new_task_id.query_id, new_task_id.stage_id) - // .await?; - // return Ok((new_task_id, stage)); - // } - // drop(queue); if let Some((task_id, plan)) = self.state.next_task().await { let bytes = physical_plan_to_bytes(plan) .expect("Failed to serialize physical plan") @@ -101,11 +81,6 @@ impl SchedulerApi for SchedulerService { let qid = self.state.add_query(plan).await; - // Build a query graph, store in query table, enqueue new tasks. 
- // let qid = self.next_query_id(); - // let query = QueryGraph::new(qid, plan); - // self.queue.lock().await.add_query(qid, Arc::new(Mutex::new(query))).await; - let response = ScheduleQueryRet { query_id: qid }; Ok(Response::new(response)) } @@ -122,7 +97,6 @@ impl SchedulerApi for SchedulerService { .get_query_status(query_id) .await .unwrap_or(QueryStatus::NotFound); - // let status = self.queue.lock().await.get_query_status(query_id).await; if status == QueryStatus::Done { println!("SchedulerService: Query {} is done.", query_id); let stage_id = 0; @@ -155,7 +129,6 @@ impl SchedulerApi for SchedulerService { ) -> Result, Status> { // TODO: Actually call executor API to abort query. let AbortQueryArgs { query_id } = request.into_inner(); - // self.queue.lock().await.abort_query(query_id).await; self.state.abort_query(query_id).await; let response = AbortQueryRet { aborted: true }; Ok(Response::new(response)) From 5dafdf54a732f8f7eee8e57964198965ed1e3544 Mon Sep 17 00:00:00 2001 From: Aidan Smith Date: Thu, 2 May 2024 05:47:09 -0400 Subject: [PATCH 3/4] remove warnings --- src/bin/mock.rs | 2 -- src/executor_client.rs | 3 +-- src/frontend.rs | 4 ++-- src/integration_test.rs | 12 +++++------- src/mock_optimizer.rs | 2 +- src/parser.rs | 4 +--- src/query_graph.rs | 16 ++++++++-------- src/queue.rs | 12 ++++++++++-- src/server.rs | 6 ++---- src/task.rs | 1 + src/task_queue.rs | 11 ----------- 11 files changed, 31 insertions(+), 42 deletions(-) diff --git a/src/bin/mock.rs b/src/bin/mock.rs index 0afc7a0..d186723 100644 --- a/src/bin/mock.rs +++ b/src/bin/mock.rs @@ -122,7 +122,6 @@ async fn main() { const CONFIG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/executors.toml"); const CATALOG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/test_data"); -const LOG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/executor_logs"); const POLL_INTERVAL: Duration = Duration::from_millis(100); // creates server, executors, and the frontend @@ -165,7 +164,6 @@ pub async fn run_single_query( drop(frontend_lock); return Ok(()); } - unreachable!(); } async fn interactive_mode() { diff --git a/src/executor_client.rs b/src/executor_client.rs index 670886d..8817a88 100644 --- a/src/executor_client.rs +++ b/src/executor_client.rs @@ -32,14 +32,13 @@ use crate::composable_database::scheduler_api_client::SchedulerApiClient; use crate::composable_database::QueryStatus::InProgress; -use crate::composable_database::{NotifyTaskStateArgs, NotifyTaskStateRet, QueryStatus, TaskId}; +use crate::composable_database::{NotifyTaskStateArgs, NotifyTaskStateRet, QueryStatus}; use crate::frontend::JobInfo; use crate::intermediate_results::{insert_results, rewrite_query, TaskKey}; use crate::mock_catalog::load_catalog; use crate::mock_executor::MockExecutor; use chrono::Utc; use datafusion::execution::context::SessionContext; -use datafusion::physical_plan::ExecutionPlan; use datafusion_proto::bytes::physical_plan_from_bytes; use std::path::Path; use std::path::PathBuf; diff --git a/src/frontend.rs b/src/frontend.rs index 03368f1..6f61e30 100644 --- a/src/frontend.rs +++ b/src/frontend.rs @@ -230,7 +230,7 @@ impl MockFrontend { let existing_value = self.jobs.insert( query_id, JobInfo { - query_id: query_id, + query_id, submitted_at: Utc::now(), sql_string: sql_string.to_string(), result: None, @@ -280,7 +280,7 @@ impl MockFrontend { // eprintln!("Polling!"); assert!(self.scheduler_api_client.is_some()); - let mut client = self.scheduler_api_client.as_mut().unwrap(); + let client = 
self.scheduler_api_client.as_mut().unwrap(); let keys: Vec = self.jobs.keys().cloned().collect(); for query_id in keys { diff --git a/src/integration_test.rs b/src/integration_test.rs index 55c7dfd..ed30d8d 100644 --- a/src/integration_test.rs +++ b/src/integration_test.rs @@ -8,21 +8,18 @@ use crate::server::SchedulerService; use datafusion::arrow::array::RecordBatch; use datafusion::error::DataFusionError; use datafusion::logical_expr::{col, Expr}; -use datafusion::prelude::{concat, SessionContext}; -use serde::{Deserialize, Serialize}; +use datafusion::prelude::SessionContext; use std::sync::Arc; use tokio::sync::Mutex; use tonic::transport::Server; pub struct IntegrationTest { catalog_path: String, - config_path: String, ctx: Arc, config: Config, pub frontend: Arc>, } -const CONFIG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/executors.toml"); pub const CATALOG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/test_data"); const LOG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/executor_logs"); @@ -37,7 +34,6 @@ impl IntegrationTest { ctx, config, catalog_path, - config_path, frontend: Arc::new(Mutex::new(frontend)), } } @@ -195,7 +191,6 @@ impl IntegrationTest { mod tests { use crate::integration_test::IntegrationTest; use crate::parser::ExecutionPlanParser; - // use crate::CATALOG_PATH; use super::*; use datafusion::arrow::array::{Int32Array, RecordBatch}; use datafusion::arrow::datatypes::{DataType, Field, Schema}; @@ -203,12 +198,15 @@ mod tests { use std::sync::Arc; use tokio::fs; + const CONFIG_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/executors.toml"); + async fn initialize_integration_test() -> IntegrationTest { let catalog_path = concat!(env!("CARGO_MANIFEST_DIR"), "/test_data"); - let config_path = concat!(env!("CARGO_MANIFEST_DIR"), "/executors.toml"); + let config_path = CONFIG_PATH; IntegrationTest::new(catalog_path.to_string(), config_path.to_string()).await } + #[allow(dead_code)] pub async fn get_all_tpch_queries_test() -> Vec { let parser = ExecutionPlanParser::new(CATALOG_PATH).await; let mut res = Vec::new(); diff --git a/src/mock_optimizer.rs b/src/mock_optimizer.rs index bba534e..6e609f0 100644 --- a/src/mock_optimizer.rs +++ b/src/mock_optimizer.rs @@ -16,7 +16,7 @@ use crate::mock_catalog::load_catalog; use datafusion::error::DataFusionError; -use datafusion::execution::context::{SessionContext, SessionState}; +use datafusion::execution::context::SessionContext; use datafusion::logical_expr::LogicalPlan; use datafusion::physical_plan::ExecutionPlan; use std::sync::Arc; diff --git a/src/parser.rs b/src/parser.rs index a182679..8bf81c8 100644 --- a/src/parser.rs +++ b/src/parser.rs @@ -26,16 +26,14 @@ use crate::mock_catalog::load_catalog; use datafusion::{ arrow::{ - array::{RecordBatch, RecordBatchReader}, + array::RecordBatch, ipc::{reader::FileReader, writer::FileWriter}, }, error::{DataFusionError, Result}, execution::context::SessionContext, physical_plan::ExecutionPlan, - physical_planner::PhysicalPlanner, }; use datafusion_proto::bytes::{physical_plan_from_bytes, physical_plan_to_bytes}; -use futures::TryFutureExt; use sqlparser::{dialect::GenericDialect, parser::Parser}; use std::{fmt, io::Cursor, sync::Arc}; use tokio::{fs::File, io::AsyncReadExt}; diff --git a/src/query_graph.rs b/src/query_graph.rs index c8b75d3..4a11645 100644 --- a/src/query_graph.rs +++ b/src/query_graph.rs @@ -1,15 +1,15 @@ #![allow(dead_code)] -use crate::composable_database::{QueryStatus, TaskId}; -use crate::task::{Task, TaskStatus}; +use 
crate::composable_database::QueryStatus; +use crate::task::Task; use crate::task_queue::TaskQueue; use datafusion::arrow::datatypes::Schema; -use datafusion::physical_plan::aggregates::AggregateExec; -use datafusion::physical_plan::joins::{ - CrossJoinExec, HashJoinExec, NestedLoopJoinExec, SortMergeJoinExec, SymmetricHashJoinExec, -}; -use datafusion::physical_plan::limit::GlobalLimitExec; +// use datafusion::physical_plan::joins::{ +// use datafusion::physical_plan::aggregates::AggregateExec; +// CrossJoinExec, HashJoinExec, NestedLoopJoinExec, SortMergeJoinExec, SymmetricHashJoinExec, +// }; +// use datafusion::physical_plan::limit::GlobalLimitExec; +// use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::placeholder_row::PlaceholderRowExec; -use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; use std::collections::HashMap; use std::sync::atomic::{AtomicU64, Ordering}; diff --git a/src/queue.rs b/src/queue.rs index 3734440..a135f0b 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -81,8 +81,15 @@ impl State { Some(status) } + // TODO: Graceful abort. pub async fn abort_query(&self, query_id: u64) { - todo!() + if let Some(query) = self.table.get(&query_id) { + { + let mut guard = query.write().await; + guard.abort(); + } + self.table.remove(&query_id); + } } pub async fn next_task(&self) -> Option<(TaskId, Arc)> { @@ -140,6 +147,7 @@ impl State { } } + #[allow(dead_code)] pub async fn size(&self) -> usize { self.queue.lock().await.len() } @@ -149,7 +157,7 @@ impl State { mod tests { use rand::Rng; use std::{fs, time::{Duration, SystemTime}}; - use tokio::{sync::{Mutex, Notify}, time::sleep}; + use tokio::{sync::Notify, time::sleep}; use crate::{parser::ExecutionPlanParser, query_graph::QueryGraph}; use crate::queue::State; diff --git a/src/server.rs b/src/server.rs index 67facc8..e5e5180 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,20 +1,18 @@ -use crate::composable_database::scheduler_api_server::{SchedulerApi, SchedulerApiServer}; +use crate::composable_database::scheduler_api_server::SchedulerApi; use crate::composable_database::{ AbortQueryArgs, AbortQueryRet, NotifyTaskStateArgs, NotifyTaskStateRet, QueryInfo, QueryJobStatusArgs, QueryJobStatusRet, QueryStatus, ScheduleQueryArgs, ScheduleQueryRet, TaskId, }; use crate::mock_catalog::load_catalog; -use crate::query_graph::{QueryGraph, StageStatus}; use crate::queue::State; use crate::task::TaskStatus; use crate::SchedulerError; use datafusion::execution::context::SessionContext; use datafusion_proto::bytes::{physical_plan_from_bytes, physical_plan_to_bytes}; use std::fmt; -use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use tokio::sync::{Mutex, Notify}; +use tokio::sync::Notify; use tonic::{Request, Response, Status}; pub struct SchedulerService { diff --git a/src/task.rs b/src/task.rs index 336ed6d..23e8421 100644 --- a/src/task.rs +++ b/src/task.rs @@ -3,6 +3,7 @@ use std::time::SystemTime; // TODO: some of these don't do anything since // the task is only created when it is ready +#[allow(dead_code)] #[derive(Debug, Clone)] pub enum TaskStatus { Ready, diff --git a/src/task_queue.rs b/src/task_queue.rs index 5fc9339..475fd98 100644 --- a/src/task_queue.rs +++ b/src/task_queue.rs @@ -31,17 +31,6 @@ impl TaskQueue { self.queue.pop_front().expect("Queue has no tasks.") } - pub fn push_tasks(&mut self, tasks: Vec) { - self.queue.extend(tasks); - } - - pub fn push_task(&mut self, task: Task) { - 
self.queue.push_back(task); - } - - pub fn pop_task(&mut self) -> Option { - self.queue.pop_front() - } } #[cfg(test)] From 8936b1e41fac9ed8e7313bbde78530b5c3b6efb5 Mon Sep 17 00:00:00 2001 From: Aidan Smith Date: Thu, 2 May 2024 05:48:38 -0400 Subject: [PATCH 4/4] try fifo query queue --- src/queue.rs | 45 ++++++++++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/src/queue.rs b/src/queue.rs index a135f0b..4d6b253 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -6,11 +6,11 @@ use crate::task::{ }; use dashmap::DashMap; use datafusion::physical_plan::ExecutionPlan; -use std::collections::BTreeMap; +use std::collections::VecDeque; use std::hash::{Hash, Hasher}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use std::time::{Duration, SystemTime}; +use std::time::SystemTime; use tokio::sync::{Mutex, Notify, RwLock}; // Must implement here since generated TaskId does not derive Hash. @@ -30,9 +30,9 @@ impl Copy for TaskId {} #[derive(Debug)] pub struct State { - // queue: Mutex>, // The queue used to order queries by executor usage. - queue: Mutex>, + // queue: Mutex>, + queue: Mutex>, start_ts: SystemTime, query_id_counter: AtomicU64, @@ -47,7 +47,8 @@ pub struct State { impl State { pub fn new(notify: Arc) -> Self { Self { - queue: Mutex::new(BTreeMap::new()), + // queue: Mutex::new(BTreeMap::new()), + queue: Mutex::new(VecDeque::new()), start_ts: SystemTime::now(), query_id_counter: AtomicU64::new(0), table: DashMap::new(), @@ -67,7 +68,8 @@ impl State { query.time = time; self.table.insert(id, RwLock::new(query)); - self.queue.lock().await.insert(time, id); + // self.queue.lock().await.insert(time, id); + self.queue.lock().await.push_back(id); self.notify.notify_waiters(); id @@ -93,7 +95,8 @@ impl State { } pub async fn next_task(&self) -> Option<(TaskId, Arc)> { - let Some((duration, query_id)) = self.queue.lock().await.pop_first() else { + // let Some((duration, query_id)) = self.queue.lock().await.pop_first() else { + let Some(query_id) = self.queue.lock().await.pop_front() else { return None; }; let query = self.table.get(&query_id).unwrap(); @@ -109,7 +112,8 @@ impl State { .update_stage_status(task.task_id.stage_id, StageStatus::Running(0)) .unwrap(); if let QueryQueueStatus::Available = guard.get_queue_status() { - self.queue.lock().await.insert(duration, query_id); + // self.queue.lock().await.insert(duration, query_id); + self.queue.lock().await.push_back(query_id); self.notify.notify_waiters(); } @@ -120,7 +124,7 @@ impl State { pub async fn report_task(&self, task_id: TaskId, status: TaskStatus) { if let Some((_, task)) = self.running_tasks.remove(&task_id) { println!("Updating {:?} status to {:?}", task_id, status); - let TaskStatus::Running(ts) = task.status else { + let TaskStatus::Running(_ts) = task.status else { println!("Task removed with status {:?}", task.status); panic!("Task removed but is not running."); }; @@ -136,14 +140,19 @@ impl State { _ => unreachable!(), } - let new_time = guard.time + SystemTime::now().duration_since(ts).unwrap(); let mut queue = self.queue.lock().await; - let _ = queue.remove(&guard.time); - if let QueryQueueStatus::Available = guard.get_queue_status() { - queue.insert(new_time, task_id.query_id); - self.notify.notify_waiters(); + // let new_time = guard.time + SystemTime::now().duration_since(ts).unwrap(); + // let _ = queue.remove(&guard.time); + // if let QueryQueueStatus::Available = guard.get_queue_status() { + // queue.insert(new_time, task_id.query_id); + // 
self.notify.notify_waiters(); + // } + // guard.time = new_time; + if QueryQueueStatus::Available == guard.get_queue_status() + && !queue.contains(&task_id.query_id) + { + queue.push_back(task_id.query_id); } - guard.time = new_time; } } @@ -159,9 +168,9 @@ mod tests { use std::{fs, time::{Duration, SystemTime}}; use tokio::{sync::Notify, time::sleep}; - use crate::{parser::ExecutionPlanParser, query_graph::QueryGraph}; use crate::queue::State; use crate::task::TaskStatus; + use crate::{parser::ExecutionPlanParser, query_graph::QueryGraph}; use std::{cmp::min, sync::Arc}; // Deprecated, use test_queue_conc instead @@ -290,9 +299,7 @@ mod tests { // Add a bunch of queries with staggered submission time let start_enqueue = SystemTime::now(); for plan in long_plans { - queue - .add_query(Arc::clone(&plan)) - .await; + queue.add_query(Arc::clone(&plan)).await; sleep(Duration::from_millis(10)).await; } let enq_time = SystemTime::now().duration_since(start_enqueue).unwrap();