Skip to content
This repository was archived by the owner on Jun 6, 2024. It is now read-only.

Commit e7e5e20

Browse files
committed
try fifo query queue
1 parent 53f02b3 commit e7e5e20

File tree

1 file changed

+32
-19
lines changed

1 file changed

+32
-19
lines changed

src/queue.rs

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::task::{
66
};
77
use dashmap::DashMap;
88
use datafusion::physical_plan::ExecutionPlan;
9-
use std::collections::BTreeMap;
9+
use std::collections::{BTreeMap, VecDeque};
1010
use std::hash::{Hash, Hasher};
1111
use std::sync::atomic::{AtomicU64, Ordering};
1212
use std::sync::Arc;
@@ -30,9 +30,9 @@ impl Copy for TaskId {}
3030

3131
#[derive(Debug)]
3232
pub struct State {
33-
// queue: Mutex<VecDeque<QueryKey>>,
3433
// The queue used to order queries by executor usage.
35-
queue: Mutex<BTreeMap<Duration, u64>>,
34+
// queue: Mutex<BTreeMap<Duration, u64>>,
35+
queue: Mutex<VecDeque<u64>>,
3636
start_ts: SystemTime,
3737

3838
query_id_counter: AtomicU64,
@@ -47,7 +47,8 @@ pub struct State {
4747
impl State {
4848
pub fn new(notify: Arc<Notify>) -> Self {
4949
Self {
50-
queue: Mutex::new(BTreeMap::new()),
50+
// queue: Mutex::new(BTreeMap::new()),
51+
queue: Mutex::new(VecDeque::new()),
5152
start_ts: SystemTime::now(),
5253
query_id_counter: AtomicU64::new(0),
5354
table: DashMap::new(),
@@ -67,7 +68,8 @@ impl State {
6768
query.time = time;
6869

6970
self.table.insert(id, RwLock::new(query));
70-
self.queue.lock().await.insert(time, id);
71+
// self.queue.lock().await.insert(time, id);
72+
self.queue.lock().await.push_back(id);
7173

7274
self.notify.notify_waiters();
7375
id
@@ -86,7 +88,8 @@ impl State {
8688
}
8789

8890
pub async fn next_task(&self) -> Option<(TaskId, Arc<dyn ExecutionPlan>)> {
89-
let Some((duration, query_id)) = self.queue.lock().await.pop_first() else {
91+
// let Some((duration, query_id)) = self.queue.lock().await.pop_first() else {
92+
let Some(query_id) = self.queue.lock().await.pop_front() else {
9093
return None;
9194
};
9295
let query = self.table.get(&query_id).unwrap();
@@ -102,7 +105,8 @@ impl State {
102105
.update_stage_status(task.task_id.stage_id, StageStatus::Running(0))
103106
.unwrap();
104107
if let QueryQueueStatus::Available = guard.get_queue_status() {
105-
self.queue.lock().await.insert(duration, query_id);
108+
// self.queue.lock().await.insert(duration, query_id);
109+
self.queue.lock().await.push_back(query_id);
106110
self.notify.notify_waiters();
107111
}
108112

@@ -129,14 +133,19 @@ impl State {
129133
_ => unreachable!(),
130134
}
131135

132-
let new_time = guard.time + SystemTime::now().duration_since(ts).unwrap();
133136
let mut queue = self.queue.lock().await;
134-
let _ = queue.remove(&guard.time);
135-
if let QueryQueueStatus::Available = guard.get_queue_status() {
136-
queue.insert(new_time, task_id.query_id);
137-
self.notify.notify_waiters();
137+
// let new_time = guard.time + SystemTime::now().duration_since(ts).unwrap();
138+
// let _ = queue.remove(&guard.time);
139+
// if let QueryQueueStatus::Available = guard.get_queue_status() {
140+
// queue.insert(new_time, task_id.query_id);
141+
// self.notify.notify_waiters();
142+
// }
143+
// guard.time = new_time;
144+
if QueryQueueStatus::Available == guard.get_queue_status()
145+
&& !queue.contains(&task_id.query_id)
146+
{
147+
queue.push_back(task_id.query_id);
138148
}
139-
guard.time = new_time;
140149
}
141150
}
142151

@@ -148,12 +157,18 @@ impl State {
148157
#[cfg(test)]
149158
mod tests {
150159
use rand::Rng;
151-
use std::{fs, time::{Duration, SystemTime}};
152-
use tokio::{sync::{Mutex, Notify}, time::sleep};
160+
use std::{
161+
fs,
162+
time::{Duration, SystemTime},
163+
};
164+
use tokio::{
165+
sync::{Mutex, Notify},
166+
time::sleep,
167+
};
153168

154-
use crate::{parser::ExecutionPlanParser, query_graph::QueryGraph};
155169
use crate::queue::State;
156170
use crate::task::TaskStatus;
171+
use crate::{parser::ExecutionPlanParser, query_graph::QueryGraph};
157172
use std::{cmp::min, sync::Arc};
158173

159174
// Deprecated, use test_queue_conc instead
@@ -282,9 +297,7 @@ mod tests {
282297
// Add a bunch of queries with staggered submission time
283298
let start_enqueue = SystemTime::now();
284299
for plan in long_plans {
285-
queue
286-
.add_query(Arc::clone(&plan))
287-
.await;
300+
queue.add_query(Arc::clone(&plan)).await;
288301
sleep(Duration::from_millis(10)).await;
289302
}
290303
let enq_time = SystemTime::now().duration_since(start_enqueue).unwrap();

0 commit comments

Comments
 (0)