
Commit 53f02b3

Merge branch 'main' into cleanup

Fix scheduler test

2 parents: 1fd2286 + 847a5d4

File tree: 9 files changed, +97 −40 lines

.gitignore
Lines changed: 1 addition & 0 deletions

@@ -23,4 +23,5 @@ test_data/
 **/*.tbl
 
 executor_logs/*
+
 job_summary.json

docs/architecture.png
Binary file changed (207 KB, −64.1 KB); contents not shown.

docs/design_doc.md
Lines changed: 10 additions & 8 deletions

@@ -14,14 +14,12 @@
 
 **75% Goals:**
 - Able to break down a physical plan into a distributed QUERY plan.
-- Achieve both inter-QUERY and intra-QUERY parallelism.
-- Provide job status.
 - End-to-end correctness/performance testing framework.
+- Provide job status.
 
 **100% Goals:**
-- Implement data shuffling between QUERY stages.
+- Achieve both inter-QUERY and intra-QUERY parallelism.
 - Cost-based and dynamic priority scheduling for better fairness.
-- Data-locality optimizations.
 - Able to abort/cancel a QUERY.
 
 **125% Goals:**
@@ -30,16 +28,18 @@
 
 # Architectural Design
 
-![Project Proposal Architecture](project_proposal_arch.png "Project Proposal Architecture Diagram")
+![Project Proposal Architecture](architecture.png "Project Proposal Architecture Diagram")
+
 
 **Architectural Components:**
 - **DAG Parser:** Parses a Datafusion ExecutionPlan into a DAG of stages, where each stage consists of tasks that can be completed without shuffling intermediate results. After decomposing the work, it then enqueues tasks into a work queue in a breadth-first manner.
-- **Work Queue:** A concurrent queue (initially FIFO) where tasks are enqueued by the DAG Parser. Each QUERY submitted by the optimizer also has a cost, allowing for heuristic adjustments to the ordering.
+- **Work Queue:** A concurrent queue where tasks are enqueued by the DAG Parser. Each QUERY submitted by the optimizer also has a cost, allowing for heuristic adjustments to the ordering.
 - **Work Threads (tokio):** Tokio threads are created for each executor node to handle communications.
 - **QueryID Table:** An in-memory data structure mapping QueryIDs to a DAG of remaining QUERY fragments and cost estimates retrieved from the optimizer.
 - **Executors:** Each executor is connected to the scheduler and the other executors via gRPC (tonic).
-- **Intermediate Results**: Intermediate results are stored as a thread-safe HashMap<TaskKey, Vec<RecordBatch> in shared memory. All executors will be able to access intermediate results without having to serialize/deserialize data.
+- **Intermediate Results**: Intermediate results are stored as a thread-safe hashmap in shared memory. All executors will be able to access intermediate results without having to serialize/deserialize data.
 
+![Task Dispatch Loop](task_dispatch_loop.png "Task Dispatch Loop")
 **Workflow:**
 1. Receives Datafusion ExecutionPlans from Query Optimizer and parses them into DAG, then stores in QueryID Table.
 2. Leaves of DAG are added to work queue that work threads can pull from.
@@ -62,6 +62,8 @@ Individual components within the scheduler will be unit tested using Rust’s te
 
 The end-to-end testing framework is composed of three primary components: the mock frontend, the mock catalog, and the mock executors.
 
+
+![E2E Testing Architecture](e2e-testing-arch.png)
 ### 1. Frontend
 The `MockFrontend` class is responsible for:
 - Establishing and maintaining a connection with the scheduler.
@@ -106,12 +108,12 @@ These consist of DataFusion executors and gRPC clients that execute tasks, recei
 ### Performance Benchmarking
 To assess the scheduler's capacity to handle complex OLAP queries and maintain high throughput, we intend to use the integration test framework to simultaneously submit all 22 TPC-H queries across a cluster of executors. We will collect the following metrics:
 
+- **Speedup from Scaling Out Executors**: Measure the speedup gained from increasing the number of executors.
 - **Execution Time for Each Query**: Measure the duration from submission to completion for each query.
 - **Busy-Idle Time Ratio for Each Executor**: Record periods of activity and inactivity for each executor throughout the test.
 
 Additionally, we plan to develop data visualization tools in Python to present the results more effectively.
 
-![E2E Testing Architecture](e2e_testing_arch.png)
 
 ## Future Composability with Other Components
 The mock optimizer and executor can be directly replaced with alternative implementations without necessitating any additional changes to the system. While the catalog, cache, and storage layers are not directly integrated into the testing system, we plan to encapsulate most of the logic within the mock catalog to simplify future integration.
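The **Intermediate Results** component described in this design doc amounts to a process-wide concurrent map from a task's identity to its Arrow record batches, shared by every executor in the same process. Below is a minimal sketch of such a table, assuming `dashmap` and `once_cell` as dependencies and a `(query_id, stage_id)` key; the names mirror `src/intermediate_results.rs` (whose `get_results` is async and awaited in `src/frontend.rs`), but the signatures and return types here are illustrative, not the crate's actual API:

```rust
// Hypothetical sketch of a shared intermediate-results table; the real
// `src/intermediate_results.rs` module may differ in shape and signatures.
use dashmap::DashMap;
use datafusion::arrow::record_batch::RecordBatch;
use once_cell::sync::Lazy;

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskKey {
    pub query_id: u64,
    pub stage_id: u64,
}

// One table shared by all executor threads in this process, so stage
// outputs never have to be serialized/deserialized between tasks.
static INTERMEDIATE_RESULTS: Lazy<DashMap<TaskKey, Vec<RecordBatch>>> =
    Lazy::new(DashMap::new);

/// Record the output batches of a finished stage.
pub fn insert_results(key: TaskKey, batches: Vec<RecordBatch>) {
    INTERMEDIATE_RESULTS.insert(key, batches);
}

/// Fetch (a clone of) the batches produced for `key`, if any exist yet.
pub fn get_results(key: &TaskKey) -> Option<Vec<RecordBatch>> {
    INTERMEDIATE_RESULTS.get(key).map(|entry| entry.value().clone())
}
```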

docs/task_dispatch_loop.png
Binary file (226 KB); contents not shown.

src/bin/server.rs
Lines changed: 1 addition & 2 deletions

@@ -1,10 +1,9 @@
-use tonic::transport::Server;
 use scheduler2::composable_database::scheduler_api_server::SchedulerApiServer;
 use scheduler2::server::SchedulerService;
+use tonic::transport::Server;
 
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
-
     let addr = "0.0.0.0:15721".parse().unwrap();
 
     let catalog_path = concat!(env!("CARGO_MANIFEST_DIR"), "/test_files/");

src/frontend.rs
Lines changed: 9 additions & 4 deletions

@@ -48,6 +48,7 @@ use crate::composable_database::scheduler_api_client::SchedulerApiClient;
 use crate::composable_database::QueryJobStatusArgs;
 use crate::composable_database::QueryStatus;
 use crate::composable_database::QueryStatus::InProgress;
+use crate::intermediate_results::{get_results, TaskKey};
 use crate::mock_catalog::load_catalog;
 use crate::mock_optimizer::Optimizer;
 use crate::parser::ExecutionPlanParser;
@@ -56,7 +57,6 @@ use datafusion::logical_expr::LogicalPlan;
 use datafusion::prelude::SessionContext;
 use serde::{Deserialize, Serialize};
 use tonic::Request;
-use crate::intermediate_results::{get_results, TaskKey};
 
 #[derive(Clone, Serialize, Deserialize)]
 pub struct JobInfo {
@@ -329,10 +329,15 @@ impl MockFrontend {
         // }
         // };
 
-        let results = get_results(&TaskKey{stage_id: status.stage_id, query_id: status.query_id}).await
-            .expect("api.rs: query is done but no results in table");
+        let results = get_results(&TaskKey {
+            stage_id: status.stage_id,
+            query_id: status.query_id,
+        })
+        .await
+        .expect("api.rs: query is done but no results in table");
 
-        let flattened_results: Vec<RecordBatch> = results.into_iter().flat_map(|r| r.into_iter()).collect();
+        let flattened_results: Vec<RecordBatch> =
+            results.into_iter().flat_map(|r| r.into_iter()).collect();
 
         let updated_job_info = JobInfo {
             query_id,
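For context on the reformatted lookup above: once a query finishes, the frontend fetches its intermediate results and flattens the nested batches into a single `Vec<RecordBatch>`. A tiny illustration, assuming `get_results` returns one `Vec<RecordBatch>` per task or partition (the exact return type lives in `src/intermediate_results.rs`):

```rust
use datafusion::arrow::record_batch::RecordBatch;

// Illustrative only: collapse per-task result vectors into one flat list,
// mirroring the `flat_map` in the frontend code above.
fn flatten_results(per_task: Vec<Vec<RecordBatch>>) -> Vec<RecordBatch> {
    per_task.into_iter().flat_map(|r| r.into_iter()).collect()
}
```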

src/queue.rs
Lines changed: 59 additions & 6 deletions

@@ -4,11 +4,9 @@ use crate::task::{
     Task,
     TaskStatus::{self, *},
 };
-use crate::SchedulerError;
 use dashmap::DashMap;
 use datafusion::physical_plan::ExecutionPlan;
-use datafusion_proto::bytes::physical_plan_to_bytes;
-use std::collections::{BTreeMap, BTreeSet, HashMap};
+use std::collections::BTreeMap;
 use std::hash::{Hash, Hasher};
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
@@ -150,10 +148,10 @@ impl State {
 #[cfg(test)]
 mod tests {
     use rand::Rng;
-    use std::fs;
-    use tokio::sync::{Mutex, Notify};
+    use std::{fs, time::{Duration, SystemTime}};
+    use tokio::{sync::{Mutex, Notify}, time::sleep};
 
-    use crate::parser::ExecutionPlanParser;
+    use crate::{parser::ExecutionPlanParser, query_graph::QueryGraph};
     use crate::queue::State;
     use crate::task::TaskStatus;
     use std::{cmp::min, sync::Arc};
@@ -250,4 +248,59 @@ mod tests {
         assert!(queue.size().await == 0);
         println!("Finished {:?} tasks.", nplans);
     }
+
+    // Test correctness of stride scheduling algorithm.
+    #[tokio::test]
+    async fn test_stride() {
+        let test_sql_dir = concat!(env!("CARGO_MANIFEST_DIR"), "/test_sql/");
+        let catalog_path = concat!(env!("CARGO_MANIFEST_DIR"), "/test_data/");
+        let queue = Box::new(State::new(Arc::new(Notify::new())));
+        let parser = ExecutionPlanParser::new(catalog_path).await;
+        println!("test_queue_conc: Testing files in {}", test_sql_dir);
+
+        // Generate list of all tpch plans
+        let mut physical_plans = Vec::new();
+        for file in fs::read_dir(test_sql_dir).unwrap() {
+            let path_buf = file.unwrap().path();
+            let path = path_buf.to_str().unwrap();
+            physical_plans.extend(parser.get_execution_plan_from_file(&path).await.unwrap());
+        }
+
+        println!("Got {} plans.", physical_plans.len());
+        let mut long_plans = Vec::new();
+
+        // Generate list of plans with at least `rounds` stages
+        let rounds = 5;
+        for plan in physical_plans {
+            let graph = QueryGraph::new(0, Arc::clone(&plan));
+            if graph.stages.len() >= rounds {
+                long_plans.push(plan);
+            }
+        }
+        let nplans = long_plans.len();
+
+        // Add a bunch of queries with staggered submission time
+        let start_enqueue = SystemTime::now();
+        for plan in long_plans {
+            queue
+                .add_query(Arc::clone(&plan))
+                .await;
+            sleep(Duration::from_millis(10)).await;
+        }
+        let enq_time = SystemTime::now().duration_since(start_enqueue).unwrap();
+
+        // Ensure correct order of queue
+        for rnd in 0..rounds {
+            for i in 0..nplans {
+                let (task, _) = queue.next_task().await.unwrap();
+                // Queries should be processed in order
+                assert_eq!(task.query_id, i as u64);
+                // "process" for at least as long as (max_pass - min_pass)
+                sleep(enq_time).await;
+                // Return task; update query's pass
+                queue.report_task(task, TaskStatus::Finished).await;
+                println!("(Round {}) Query {}/{} ok.", rnd + 1, i + 1, nplans);
+            }
+        }
+    }
 }
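The new `test_stride` test exercises stride scheduling: queries are admitted with staggered submission times, tasks must come back in query order on every round, and reporting a finished task advances that query's pass. A rough sketch of the bookkeeping this implies; `StrideQueue`, `BASE_STRIDE`, and the priority handling are illustrative assumptions rather than the actual fields of `queue::State`:

```rust
use std::collections::BTreeMap;

// Illustrative constant: a query's pass increment is BASE_STRIDE / priority.
const BASE_STRIDE: u64 = 1 << 20;

struct QueryEntry {
    stride: u64, // higher priority => smaller stride => scheduled more often
}

#[derive(Default)]
struct StrideQueue {
    // Keyed by (pass, query_id); the entry with the smallest pass runs next.
    by_pass: BTreeMap<(u64, u64), QueryEntry>,
}

impl StrideQueue {
    /// Admit a query at the current minimum pass so it can neither starve
    /// nor monopolize the queue.
    fn add_query(&mut self, query_id: u64, priority: u64) {
        let min_pass = self.by_pass.keys().next().map(|&(p, _)| p).unwrap_or(0);
        let entry = QueryEntry { stride: BASE_STRIDE / priority.max(1) };
        self.by_pass.insert((min_pass, query_id), entry);
    }

    /// Hand out one task for the lowest-pass query, then advance its pass by
    /// its stride and reinsert it (the update `report_task` would perform).
    fn next_query(&mut self) -> Option<u64> {
        let (&(pass, query_id), _) = self.by_pass.iter().next()?;
        let entry = self.by_pass.remove(&(pass, query_id)).unwrap();
        self.by_pass.insert((pass + entry.stride, query_id), entry);
        Some(query_id)
    }
}
```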

src/server.rs
Lines changed: 17 additions & 20 deletions

@@ -78,7 +78,6 @@ impl SchedulerApi for SchedulerService {
 
         let plan = physical_plan_from_bytes(bytes.as_slice(), &self.ctx)
             .expect("Failed to deserialize physical plan");
-
         let qid = self.state.add_query(plan).await;
 
         let response = ScheduleQueryRet { query_id: qid };
@@ -181,8 +180,8 @@ mod tests {
 
     #[tokio::test]
     async fn test_scheduler() {
-        let test_file = concat!(env!("CARGO_MANIFEST_DIR"), "/test_files/expr.slt");
-        let catalog_path = concat!(env!("CARGO_MANIFEST_DIR"), "/test_files/");
+        let test_file = concat!(env!("CARGO_MANIFEST_DIR"), "/test_sql/1.sql");
+        let catalog_path = concat!(env!("CARGO_MANIFEST_DIR"), "/test_data/");
         let scheduler_service = Box::new(SchedulerService::new(catalog_path).await);
         let parser = ExecutionPlanParser::new(catalog_path).await;
         println!("test_scheduler: Testing file {}", test_file);
@@ -227,29 +226,27 @@ mod tests {
                 test_file
             );
         }
-        // println!(
-        //     "test_scheduler: queued {} tasks.",
-        //     scheduler_service.queue.lock().await.size()
-        // );
+        println!(
+            "test_scheduler: queued {} tasks.",
+            scheduler_service.state.size().await
+        );
 
         // TODO: add concurrent test eventually
         let mut send_task = NotifyTaskStateArgs {
             task: None,
             success: true,
         };
         // may not terminate
-        loop {
-            let ret = scheduler_service
-                .notify_task_state(Request::new(send_task.clone()))
-                .await
-                .unwrap();
-            let NotifyTaskStateRet {
-                has_new_task,
-                task,
-                physical_plan,
-            } = ret.into_inner();
-            assert!(task.is_some());
-            send_task.task = task;
-        }
+        let ret = scheduler_service
+            .notify_task_state(Request::new(send_task.clone()))
+            .await
+            .unwrap();
+        let NotifyTaskStateRet {
+            has_new_task,
+            task,
+            physical_plan,
+        } = ret.into_inner();
+        println!("test_scheduler: Received task {:?}", task.unwrap());
+        send_task.task = task;
     }
 }
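The simplified test above drives a single `notify_task_state` round trip; in a running system an executor would keep calling it in a loop, reporting each finished task and picking up the next one. A hedged sketch of that executor-side loop, using only the message shapes visible in this diff (`NotifyTaskStateArgs { task, success }`, `NotifyTaskStateRet { has_new_task, task, physical_plan }`); the connection address, module paths, and error handling are assumptions, and the repository's real executor code may differ:

```rust
// Hypothetical executor poll loop; not the repository's executor implementation.
use datafusion::prelude::SessionContext;
use datafusion_proto::bytes::physical_plan_from_bytes;
use scheduler2::composable_database::scheduler_api_client::SchedulerApiClient;
use scheduler2::composable_database::NotifyTaskStateArgs;
use tonic::Request;

async fn executor_poll_loop() -> Result<(), Box<dyn std::error::Error>> {
    // Address assumed to match the bind address in src/bin/server.rs.
    let mut client = SchedulerApiClient::connect("http://0.0.0.0:15721").await?;
    let ctx = SessionContext::new();

    // First call reports no finished task; later calls report the task we
    // just completed and ask for a new one.
    let mut args = NotifyTaskStateArgs { task: None, success: true };

    loop {
        let reply = client
            .notify_task_state(Request::new(args.clone()))
            .await?
            .into_inner();

        if !reply.has_new_task {
            break; // nothing left to run
        }

        // Deserialize the plan; a real executor would run it here and write
        // its output into the shared intermediate-results table.
        let _plan = physical_plan_from_bytes(reply.physical_plan.as_slice(), &ctx)?;
        args = NotifyTaskStateArgs { task: reply.task, success: true };
    }
    Ok(())
}
```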
