From 6bc4ba68c8f7004ccfd6ce4deb7fb12b36326d8d Mon Sep 17 00:00:00 2001 From: Bogdan Date: Thu, 7 Aug 2025 11:54:18 +0300 Subject: [PATCH 1/4] chore: repro spread panic --- examples/Cargo.toml | 11 +- examples/src/main.rs | 793 +++++++++++++++++++++++++++++++++++++------ 2 files changed, 696 insertions(+), 108 deletions(-) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index ea69dc85..634fef25 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -3,4 +3,13 @@ name = "wt-examples" version = "0.1.0" edition = "2021" -[dependencies] \ No newline at end of file +[dependencies] +worktable = {path = "../", version = "0.7.2" } +tokio = { version = "1.39.2", features = ["full"] } +rkyv = { version = "0.8.9", features = ["uuid-1"] } +eyre = "0.6.12" +serde = { version = "1.0.215", features = ["derive"] } +futures = "0.3.30" +uuid = { version = "1.8.0", features = ["v4", "serde"] } +derive_more = { version = "1.0.0", features = ["full"] } +atomic_float = "1.1.0" diff --git a/examples/src/main.rs b/examples/src/main.rs index 15514a6a..f1af587a 100644 --- a/examples/src/main.rs +++ b/examples/src/main.rs @@ -1,107 +1,686 @@ -// use futures::executor::block_on; -// use worktable::prelude::*; -// use worktable::worktable; -// -// #[tokio::main] -// async fn main() { -// // describe WorkTable -// worktable!( -// name: My, -// persist: true, -// columns: { -// id: u64 primary_key autoincrement, -// val: i64, -// test: i32, -// attr: String, -// attr2: i32, -// attr_float: f64, -// attr_string: String, -// -// }, -// indexes: { -// idx1: attr, -// idx2: attr2 unique, -// idx3: attr_string, -// }, -// queries: { -// update: { -// ValById(val) by id, -// AllAttrById(attr, attr2) by id, -// UpdateOptionalById(test) by id, -// }, -// delete: { -// ByAttr() by attr, -// ById() by id, -// } -// } -// ); -// -// // Init Worktable -// let config = PersistenceConfig::new("data", "data"); -// let my_table = MyWorkTable::new(config).await.unwrap(); -// -// // WT rows (has prefix My because of table name) -// let row = MyRow { -// val: 777, -// attr: "Attribute0".to_string(), -// attr2: 345, -// test: 1, -// id: 0, -// attr_float: 100.0, -// attr_string: "String_attr0".to_string(), -// }; -// -// for i in 2..1000000_i64 { -// let row = MyRow { -// val: 777, -// attr: format!("Attribute{}", i), -// attr2: 345 + i as i32, -// test: i as i32, -// id: i as u64, -// attr_float: 100.0 + i as f64, -// attr_string: format!("String_attr{}", i), -// }; -// -// my_table.insert(row).unwrap(); -// } -// -// // insert -// let pk: MyPrimaryKey = my_table.insert(row).expect("primary key"); -// -// // Select ALL records from WT -// let _select_all = my_table.select_all().execute(); -// //println!("Select All {:?}", select_all); -// -// // Select All records with attribute TEST -// let _select_all = my_table.select_all().execute(); -// //println!("Select All {:?}", select_all); -// -// // Select by Idx -// //let _select_by_attr = my_table -// // .select_by_attr("Attribute1".to_string()) -// // .execute() -// //r .unwrap(); -// -// //for row in select_by_attr { -// // println!("Select by idx, row {:?}", row); -// //} -// -// // Update Value query -// let update = my_table.update_val_by_id(ValByIdQuery { val: 1337 }, pk.clone()); -// let _ = block_on(update); -// -// let _select_all = my_table.select_all().execute(); -// //println!("Select after update val {:?}", select_all); -// -// let delete = my_table.delete(pk); -// let _ = block_on(delete); -// -// let _select_all = my_table.select_all().execute(); -// //println!("Select after delete {:?}", select_all); -// -// let info = my_table.system_info(); -// -// println!("{info}"); -// } - -fn main() {} +use eyre::bail; +use rkyv::{Archive, Serialize}; +use uuid::timestamp; +use worktable::prelude::{MemStat, SizeMeasurable}; + +use std::{collections::HashMap, sync::Arc}; + +use tokio::sync::RwLock; +use worktable::{WorkTableError, select::SelectQueryExecutor}; + +use atomic_float::AtomicF64; +use futures::executor::block_on; +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::time::{SystemTime, UNIX_EPOCH}; +use worktable::prelude::*; +use worktable::worktable; + +#[derive( + Archive, + Clone, + Copy, + Debug, + Default, + serde::Deserialize, + rkyv::Deserialize, + Eq, + Hash, + MemStat, + Ord, + Serialize, + PartialEq, + PartialOrd, +)] +#[rkyv(compare(PartialEq), derive(Debug))] +pub enum Exchange { + #[default] + Unset, + BinanceFutures, + BinanceSpot, + HyperliquidPerpetuals, + HyperliquidSpot, + GateioSpot, + GateioFutures, + KucoinSpot, + KucoinFutures, + BybitSpot, + BybitFutures, + BsxFutures, + BitgetSpot, + BitgetFutures, + BitmexSpot, + VertexSpot, + VertexFutures, + OKXSpot, + OKXFutures, + DeribitFutures, +} + +impl SizeMeasurable for Exchange { + fn aligned_size(&self) -> usize { + size_of::() + } +} + +impl Exchange { + pub const TOTAL: usize = 19; + + pub fn is_hyper(&self) -> bool { + *self == Exchange::HyperliquidPerpetuals + } + + pub fn is_left(&self) -> bool { + *self == Exchange::BinanceFutures + } + + pub fn is_right(&self) -> bool { + *self == Exchange::HyperliquidPerpetuals + } + + pub fn all() -> [Self; Self::TOTAL] { + [ + Self::BinanceFutures, + Self::BinanceSpot, + Self::HyperliquidPerpetuals, + Self::HyperliquidSpot, + Self::GateioSpot, + Self::GateioFutures, + Self::KucoinSpot, + Self::KucoinFutures, + Self::BybitSpot, + Self::BybitFutures, + Self::BsxFutures, + Self::BitgetSpot, + Self::BitgetFutures, + Self::BitmexSpot, + Self::VertexSpot, + Self::VertexFutures, + Self::OKXSpot, + Self::OKXFutures, + Self::DeribitFutures, + ] + } + + pub const fn as_str(&self) -> &'static str { + match self { + Exchange::Unset => "Unset", + Exchange::BinanceFutures => "BIN-futures", + Exchange::BinanceSpot => "BIN-spot", + Exchange::HyperliquidPerpetuals => "HYP-perps", + Exchange::HyperliquidSpot => "HYP-spot", + Exchange::GateioSpot => "GAT-spot", + Exchange::GateioFutures => "GAT-futures", + Exchange::KucoinSpot => "KUC-spot", + Exchange::KucoinFutures => "KUC-futures", + Exchange::BybitSpot => "BYB-spot", + Exchange::BybitFutures => "BYB-futures", + Exchange::BsxFutures => "BSX-futures", + Exchange::BitgetSpot => "BGT-spot", + Exchange::BitgetFutures => "BGT-futures", + Exchange::BitmexSpot => "BMX-spot", + Exchange::VertexSpot => "VTX-spot", + Exchange::VertexFutures => "VTX-futures", + Exchange::OKXSpot => "OKX-spot", + Exchange::OKXFutures => "OKX-futures", + Exchange::DeribitFutures => "DBT-futures", + } + } +} + +impl std::fmt::Display for Exchange { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } +} + +impl AsRef for Exchange { + fn as_ref(&self) -> &str { + self.as_str() + } +} + +worktable!( + name: Spread, + columns: { + id: u32 primary_key autoincrement, + left_timestamp: u64, + right_timestamp: u64, + left_exchange: Exchange, + right_exchange: Exchange, + left_price: f64, + right_price: f64, + spread: f64, + symbol: String, + key: String, + left_key: String, + right_key: String, + }, + indexes: { + key_idx: key unique, + symbol_idx: symbol, + left_key_idx: left_key, + right_key_idx: right_key + }, + queries: { + in_place: { + Left(left_price, left_timestamp, spread) by id, + Right(right_price, right_timestamp, spread) by id, + } + update: { + ExchangeLeft(left_exchange, left_key, key) by id, + ExchangeRight(right_exchange, right_key, key) by id, + } + } +); + +#[derive(Debug)] +struct MedianBucket { + ts: AtomicU64, + idx: AtomicUsize, + values: RwLock>, +} + +impl MedianBucket { + async fn add(&self, now: u64, value: f64) { + let prev_ts = self.ts.load(Ordering::Acquire); + + if prev_ts != now + && self + .ts + .compare_exchange(prev_ts, now, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + { + let values = self.values.write().await; + for val in values.iter() { + val.store(f64::NAN, Ordering::Relaxed); + } + self.idx.store(0, Ordering::Relaxed); + } + + let values = self.values.read().await; + let idx = self.idx.fetch_add(1, Ordering::Relaxed); + + if idx >= values.len() { + drop(values); + let mut values = self.values.write().await; + values.push(AtomicF64::new(value)); + } else { + values[idx].store(value, Ordering::Relaxed); + } + } + + async fn collect(&self, cutoff: u64) -> Option> { + if self.ts.load(Ordering::Acquire) <= cutoff { + return None; + } + + let values = self.values.read().await; + Some( + values + .iter() + .filter_map(|v| { + let x = v.load(Ordering::Relaxed); + if x.is_nan() { None } else { Some(x) } + }) + .collect(), + ) + } +} + +#[derive(Debug)] +struct Bucket { + ts: AtomicU64, + sum: AtomicF64, + count: AtomicUsize, +} + +#[derive(Debug)] +struct Window { + secs: u64, + start: AtomicU64, + avg_buckets: Box<[Bucket]>, + med_buckets: Box<[MedianBucket]>, +} + +impl Window { + fn new(secs: u64) -> Self { + let mut avg_vec = Vec::with_capacity(secs as usize); + let mut med_vec = Vec::with_capacity(secs as usize); + for _ in 0..secs { + avg_vec.push(Bucket { + ts: AtomicU64::new(0), + sum: AtomicF64::new(0.0), + count: AtomicUsize::new(0), + }); + med_vec.push(MedianBucket { + ts: AtomicU64::new(0), + values: RwLock::new(Vec::new()), + idx: AtomicUsize::new(0), + }); + } + Self { + secs, + start: AtomicU64::new(0), + avg_buckets: avg_vec.into_boxed_slice(), + med_buckets: med_vec.into_boxed_slice(), + } + } + + async fn add(&self, value: f64) { + let now = current_epoch_seconds(); + _ = self + .start + .compare_exchange(0, now, Ordering::Release, Ordering::Relaxed); + + let Ok(idx) = usize::try_from(now % self.secs) else { + return; + }; + + let avg_bucket = &self.avg_buckets[idx]; + let prev_ts = avg_bucket.ts.load(Ordering::Acquire); + if prev_ts != now + && avg_bucket + .ts + .compare_exchange(prev_ts, now, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + { + avg_bucket.sum.store(0.0, Ordering::Relaxed); + avg_bucket.count.store(0, Ordering::Relaxed); + } + avg_bucket.sum.fetch_add(value, Ordering::Relaxed); + avg_bucket.count.fetch_add(1, Ordering::Relaxed); + + let med_bucket = &self.med_buckets[idx]; + med_bucket.add(now, value).await; + } + + fn average(&self) -> Option { + let now = current_epoch_seconds(); + let start = self.start.load(Ordering::Acquire); + if start == 0 || now < start.saturating_add(self.secs) { + return None; + } + + let cutoff = now.saturating_sub(self.secs); + let mut total = 0.0; + let mut cnt = 0usize; + + for bucket in self.avg_buckets.iter() { + let ts = bucket.ts.load(Ordering::Acquire); + if ts > cutoff { + total += bucket.sum.load(Ordering::Relaxed); + cnt += bucket.count.load(Ordering::Relaxed); + } + } + + if cnt == 0 { + None + } else { + Some(total / (cnt as f64)) + } + } + + async fn median(&self) -> Option { + let now = current_epoch_seconds(); + let cutoff = now.saturating_sub(self.secs); + + let mut all_values = Vec::new(); + + for bucket in self.med_buckets.iter() { + if let Some(mut vals) = bucket.collect(cutoff).await { + all_values.append(&mut vals); + } + } + + if all_values.is_empty() { + return None; + } + + all_values.sort_by(|a, b| a.partial_cmp(b).unwrap()); + let mid = all_values.len() / 2; + Some(if all_values.len() % 2 == 0 { + (all_values[mid - 1] + all_values[mid]) / 2.0 + } else { + all_values[mid] + }) + } +} + +#[derive(Debug, Clone)] +pub struct SpreadWindow { + secs: u64, + windows: Arc>>, +} + +impl SpreadWindow { + pub fn new(secs: u64) -> Self { + Self { + secs, + windows: Arc::new(RwLock::new(HashMap::new())), + } + } + + pub async fn add(&self, symbol: &str, spread: f64) { + let guard = self.windows.read().await; + if let Some(window) = guard.get(symbol) { + window.add(spread).await; + return; + } + drop(guard); + let mut guard = self.windows.write().await; + let window = Window::new(self.secs); + window.add(spread).await; + guard.insert(symbol.to_string(), window); + } + + pub async fn average(&self, symbol: &str) -> Option { + let guard = self.windows.read().await; + guard.get(symbol)?.average() + } + + pub async fn median(&self, symbol: &str) -> Option { + let guard = self.windows.read().await; + guard.get(symbol)?.median().await + } +} + +fn current_epoch_seconds() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("system time before UNIX EPOCH") + .as_secs() +} + +fn get_basis_point(operand: f64, comparator: f64) -> f64 { + let left = operand - comparator; + let right = (operand + comparator) / 2.0; + (left / right) * 10_000.0 +} + +pub struct SpreadManager { + spread_wt: Arc, + windows: Arc>>, +} + +impl SpreadManager { + pub fn new(spread_wt: Arc) -> Self { + Self { + spread_wt, + windows: Arc::new(RwLock::new(HashMap::new())), + } + } + + pub async fn record( + &self, + exchange: Exchange, + symbol: &str, + ask_price: Option, + bid_price: Option, + timestamp: u64, + ) -> eyre::Result<()> { + let key = format!("{}-{}", symbol, exchange); + let left = self.spread_wt.select_by_left_key(key.clone()).execute()?; + + if left.is_empty() { + let id = self.spread_wt.get_next_pk().into(); + if let Err(e) = self.spread_wt.insert(SpreadRow { + id, + key: format!("unset-left-{key}"), + symbol: symbol.to_string(), + left_timestamp: timestamp, + left_price: bid_price.unwrap_or_default(), + left_exchange: exchange, + left_key: key.clone(), + spread: 0.0, + right_exchange: Exchange::Unset, + right_key: String::new(), + right_price: 0.0, + right_timestamp: 0, + }) { + if !matches!(e, WorkTableError::AlreadyExists(..)) { + println!("failed to write unset left spread: {e}"); + return Err(e.into()); + } + }; + } + let right = self.spread_wt.select_by_right_key(key.clone()).execute()?; + if right.is_empty() { + let id = self.spread_wt.get_next_pk().into(); + if let Err(e) = self.spread_wt.insert(SpreadRow { + id, + key: format!("unset-right-{key}"), + symbol: symbol.to_string(), + right_timestamp: timestamp, + right_price: ask_price.unwrap_or_default(), + left_price: 0.0, + right_exchange: exchange, + right_key: key.clone(), + spread: 0.0, + left_exchange: Exchange::Unset, + left_key: String::new(), + left_timestamp: 0, + }) { + if !matches!(e, WorkTableError::AlreadyExists(..)) { + println!("failed to write unset right spread: {e}"); + return Err(e.into()); + } + }; + } + + let spreads = self + .spread_wt + .select_by_symbol(symbol.to_string()) + .execute()?; + + for spread in spreads { + if spread.left_exchange == Exchange::Unset { + if spread.right_exchange == exchange { + continue; + } + let query = ExchangeLeftQuery { + left_exchange: exchange, + left_key: key.clone(), + key: format!("{}-{}", key, spread.right_key), + }; + let mut unset = spread.clone(); + unset.id = self.spread_wt.get_next_pk().into(); + if let Err(e) = self.spread_wt.update_exchange_left(query, spread.id).await { + if !matches!(e, WorkTableError::AlreadyExists(..)) { + println!("failed to update left spread exchange: {e}"); + return Err(e.into()); + } + } else { + self.windows + .write() + .await + .insert((exchange, spread.right_exchange), SpreadWindow::new(15)); + } + if let Err(e) = self.spread_wt.insert(unset.clone()) { + if !matches!(e, WorkTableError::AlreadyExists(..)) { + println!("failed to copy unset left spread: {e}, {unset:?}"); + return Err(e.into()); + } + }; + } + + if spread.right_exchange == Exchange::Unset { + if spread.left_exchange == exchange { + continue; + } + let query = ExchangeRightQuery { + right_exchange: exchange, + right_key: key.clone(), + key: format!("{}-{}", spread.left_key, key), + }; + let mut unset = spread.clone(); + unset.id = self.spread_wt.get_next_pk().into(); + if let Err(e) = self.spread_wt.update_exchange_right(query, spread.id).await { + if !matches!(e, WorkTableError::AlreadyExists(..)) { + println!("failed to update right spread exchange: {e}"); + return Err(e.into()); + } + } else { + self.windows + .write() + .await + .insert((spread.left_exchange, exchange), SpreadWindow::new(15)); + } + if let Err(e) = self.spread_wt.insert(unset.clone()) { + if !matches!(e, WorkTableError::AlreadyExists(..)) { + println!("failed to copy unset right spread: {e}, {unset:?}"); + return Err(e.into()); + } + }; + } + } + + let spreads = self + .spread_wt + .select_by_symbol(symbol.to_string()) + .execute()?; + + for spread in spreads { + if spread.right_exchange == Exchange::Unset || spread.left_exchange == Exchange::Unset { + continue; + } + let value = if spread.left_exchange == exchange { + let bid_price = bid_price.unwrap_or(spread.left_price); + let value = get_basis_point(bid_price, spread.right_price); + + if let Err(e) = self + .spread_wt + .update_left_in_place( + |x| { + let (left_price, left_timestamp, spread) = x; + *left_timestamp = timestamp.into(); + *left_price = bid_price.into(); + *spread = value.into(); + }, + spread.id, + ) + .await + { + println!("failed to update left spread: {e}"); + return Err(e); + }; + value + } else if spread.right_exchange == exchange { + let ask_price = ask_price.unwrap_or(spread.right_price); + let value = get_basis_point(spread.left_price, ask_price); + + if let Err(e) = self + .spread_wt + .update_right_in_place( + |x| { + let (right_price, right_timestamp, spread) = x; + *right_timestamp = timestamp.into(); + *right_price = ask_price.into(); + *spread = value.into(); + }, + spread.id, + ) + .await + { + { + println!("failed to update right spread: {e}"); + return Err(e); + }; + }; + value + } else { + continue; + }; + + self.windows + .read() + .await + .get(&(spread.left_exchange, spread.right_exchange)) + .expect("window should exist") + .add(symbol, value) + .await; + } + + Ok(()) + } + + pub async fn median( + &self, + left_exchange: Exchange, + right_exchange: Exchange, + symbol: &str, + ) -> Option { + self.windows + .read() + .await + .get(&(left_exchange, right_exchange))? + .median(symbol) + .await + } +} + +#[tokio::main(flavor = "multi_thread", worker_threads = 16)] +async fn main() { + let spread_wt = Arc::new(SpreadWorkTable::default()); + + let manager = Arc::new(SpreadManager::new(spread_wt.clone())); + + let mut symbols = Vec::new(); + + for i in 0..50000 { + let mut s = String::new(); + for j in 0..i { + s.push_str(j.to_string().as_str()); + } + symbols.push(s); + } + symbols.push(String::new()); + + let symbols = Arc::new(symbols); + + let ts_seq = Arc::new(AtomicU64::new(0)); + let ask_seq = Arc::new(AtomicF64::new(0.0)); + let bid_seq = Arc::new(AtomicF64::new(0.0)); + + let mut seq = Arc::new(AtomicUsize::new(0)); + let mut handles = Vec::new(); + + for i in 0..16 { + let ask_seq = ask_seq.clone(); + let bid_seq = bid_seq.clone(); + let ts_seq = ts_seq.clone(); + let manager = manager.clone(); + let seq = seq.clone(); + let symbols = symbols.clone(); + handles.push(tokio::spawn(async move { + for _ in 0..1_000_00 { + let timestamp = ts_seq.fetch_add(12, Ordering::Relaxed); + let idx = seq.fetch_add(1, Ordering::Relaxed) % 50000; + let symbol = symbols[idx].clone(); + let exchange = match idx % 3 { + 0 => Exchange::HyperliquidPerpetuals, + 1 => Exchange::BinanceFutures, + _ => Exchange::BybitFutures, + }; + let ask_price = ask_seq.fetch_add(1.0, Ordering::Relaxed); + let bid_price = bid_seq.fetch_add(10.0, Ordering::Relaxed); + // println!("record {exchange} {ask_price} {bid_price}"); + if let Err(e) = manager + .record( + exchange, + &symbol, + Some(ask_price), + Some(bid_price), + timestamp, + ) + .await + { + println!("failed to record spread: {e:?}"); + }; + } + })); + } + + for handle in handles { + println!("{:?}", handle.await); + } +} From caebae20853f8d59f10dc443799ddfa08c10f0ab Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Fri, 8 Aug 2025 00:45:56 +0300 Subject: [PATCH 2/4] apply unsized node stability fix --- Cargo.toml | 4 +- examples/Cargo.toml | 3 +- examples/src/main.rs | 1380 +++++++++-------- src/index/unsized_node.rs | 72 +- .../indexset/process_create_node.wt.idx | Bin 49152 -> 49152 bytes .../indexset/process_insert_at.wt.idx | Bin 49152 -> 49152 bytes .../process_insert_at_big_amount.wt.idx | Bin 65536 -> 65536 bytes .../process_create_node.wt.idx | Bin 49152 -> 49152 bytes .../process_create_node_after_remove.wt.idx | Bin 65536 -> 65536 bytes .../process_create_second_node.wt.idx | Bin 65536 -> 65536 bytes .../process_insert_at.wt.idx | Bin 49152 -> 49152 bytes .../process_insert_at_big_amount.wt.idx | Bin 49152 -> 49152 bytes .../process_insert_at_removed_place.wt.idx | Bin 49152 -> 49152 bytes ...ocess_insert_at_with_node_id_update.wt.idx | Bin 49152 -> 49152 bytes .../process_remove_at.wt.idx | Bin 49152 -> 49152 bytes .../process_remove_at_node_id.wt.idx | Bin 49152 -> 49152 bytes .../process_remove_node.wt.idx | Bin 65536 -> 65536 bytes .../process_split_node.wt.idx | Bin 65536 -> 65536 bytes .../indexset/process_create_node.wt.idx | Bin 49152 -> 49152 bytes .../indexset/process_insert_at.wt.idx | Bin 49152 -> 49152 bytes .../process_insert_at_big_amount.wt.idx | Bin 65536 -> 65536 bytes .../process_create_node.wt.idx | Bin 49152 -> 49152 bytes .../process_create_node_after_remove.wt.idx | Bin 65536 -> 65536 bytes .../process_create_second_node.wt.idx | Bin 65536 -> 65536 bytes .../process_insert_at.wt.idx | Bin 49152 -> 49152 bytes .../process_insert_at_big_amount.wt.idx | Bin 49152 -> 49152 bytes .../process_insert_at_removed_place.wt.idx | Bin 49152 -> 49152 bytes ...ocess_insert_at_with_node_id_update.wt.idx | Bin 49152 -> 49152 bytes .../process_remove_at.wt.idx | Bin 49152 -> 49152 bytes .../process_remove_at_node_id.wt.idx | Bin 49152 -> 49152 bytes .../process_remove_node.wt.idx | Bin 65536 -> 65536 bytes .../process_split_node.wt.idx | Bin 65536 -> 65536 bytes tests/worktable/index/insert.rs | 106 ++ 33 files changed, 840 insertions(+), 725 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 485f3c91..cf02da5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,9 +27,9 @@ lockfree = { version = "0.5.1" } fastrand = "2.3.0" futures = "0.3.30" uuid = { version = "1.10.0", features = ["v4", "v7"] } -data_bucket = "0.2.10" +# data_bucket = "0.2.10" # data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "page_cdc_correction", version = "0.2.7" } -# data_bucket = { path = "../DataBucket", version = "0.2.9" } +data_bucket = { path = "../DataBucket", version = "0.2.10" } performance_measurement_codegen = { path = "performance_measurement/codegen", version = "0.1.0", optional = true } performance_measurement = { path = "performance_measurement", version = "0.1.0", optional = true } # indexset = { version = "0.12.3", features = ["concurrent", "cdc", "multimap"] } diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 634fef25..417d9a10 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -worktable = {path = "../", version = "0.7.2" } +worktable = { path = "../", version = "0.7.2" } tokio = { version = "1.39.2", features = ["full"] } rkyv = { version = "0.8.9", features = ["uuid-1"] } eyre = "0.6.12" @@ -13,3 +13,4 @@ futures = "0.3.30" uuid = { version = "1.8.0", features = ["v4", "serde"] } derive_more = { version = "1.0.0", features = ["full"] } atomic_float = "1.1.0" +rand = "0.9.1" diff --git a/examples/src/main.rs b/examples/src/main.rs index f1af587a..012bbf0d 100644 --- a/examples/src/main.rs +++ b/examples/src/main.rs @@ -1,686 +1,694 @@ -use eyre::bail; -use rkyv::{Archive, Serialize}; -use uuid::timestamp; -use worktable::prelude::{MemStat, SizeMeasurable}; - -use std::{collections::HashMap, sync::Arc}; - -use tokio::sync::RwLock; -use worktable::{WorkTableError, select::SelectQueryExecutor}; - -use atomic_float::AtomicF64; -use futures::executor::block_on; -use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; -use std::time::{SystemTime, UNIX_EPOCH}; -use worktable::prelude::*; -use worktable::worktable; - -#[derive( - Archive, - Clone, - Copy, - Debug, - Default, - serde::Deserialize, - rkyv::Deserialize, - Eq, - Hash, - MemStat, - Ord, - Serialize, - PartialEq, - PartialOrd, -)] -#[rkyv(compare(PartialEq), derive(Debug))] -pub enum Exchange { - #[default] - Unset, - BinanceFutures, - BinanceSpot, - HyperliquidPerpetuals, - HyperliquidSpot, - GateioSpot, - GateioFutures, - KucoinSpot, - KucoinFutures, - BybitSpot, - BybitFutures, - BsxFutures, - BitgetSpot, - BitgetFutures, - BitmexSpot, - VertexSpot, - VertexFutures, - OKXSpot, - OKXFutures, - DeribitFutures, -} - -impl SizeMeasurable for Exchange { - fn aligned_size(&self) -> usize { - size_of::() - } -} - -impl Exchange { - pub const TOTAL: usize = 19; - - pub fn is_hyper(&self) -> bool { - *self == Exchange::HyperliquidPerpetuals - } - - pub fn is_left(&self) -> bool { - *self == Exchange::BinanceFutures - } - - pub fn is_right(&self) -> bool { - *self == Exchange::HyperliquidPerpetuals - } - - pub fn all() -> [Self; Self::TOTAL] { - [ - Self::BinanceFutures, - Self::BinanceSpot, - Self::HyperliquidPerpetuals, - Self::HyperliquidSpot, - Self::GateioSpot, - Self::GateioFutures, - Self::KucoinSpot, - Self::KucoinFutures, - Self::BybitSpot, - Self::BybitFutures, - Self::BsxFutures, - Self::BitgetSpot, - Self::BitgetFutures, - Self::BitmexSpot, - Self::VertexSpot, - Self::VertexFutures, - Self::OKXSpot, - Self::OKXFutures, - Self::DeribitFutures, - ] - } - - pub const fn as_str(&self) -> &'static str { - match self { - Exchange::Unset => "Unset", - Exchange::BinanceFutures => "BIN-futures", - Exchange::BinanceSpot => "BIN-spot", - Exchange::HyperliquidPerpetuals => "HYP-perps", - Exchange::HyperliquidSpot => "HYP-spot", - Exchange::GateioSpot => "GAT-spot", - Exchange::GateioFutures => "GAT-futures", - Exchange::KucoinSpot => "KUC-spot", - Exchange::KucoinFutures => "KUC-futures", - Exchange::BybitSpot => "BYB-spot", - Exchange::BybitFutures => "BYB-futures", - Exchange::BsxFutures => "BSX-futures", - Exchange::BitgetSpot => "BGT-spot", - Exchange::BitgetFutures => "BGT-futures", - Exchange::BitmexSpot => "BMX-spot", - Exchange::VertexSpot => "VTX-spot", - Exchange::VertexFutures => "VTX-futures", - Exchange::OKXSpot => "OKX-spot", - Exchange::OKXFutures => "OKX-futures", - Exchange::DeribitFutures => "DBT-futures", - } - } -} - -impl std::fmt::Display for Exchange { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str(self.as_str()) - } -} - -impl AsRef for Exchange { - fn as_ref(&self) -> &str { - self.as_str() - } -} - -worktable!( - name: Spread, - columns: { - id: u32 primary_key autoincrement, - left_timestamp: u64, - right_timestamp: u64, - left_exchange: Exchange, - right_exchange: Exchange, - left_price: f64, - right_price: f64, - spread: f64, - symbol: String, - key: String, - left_key: String, - right_key: String, - }, - indexes: { - key_idx: key unique, - symbol_idx: symbol, - left_key_idx: left_key, - right_key_idx: right_key - }, - queries: { - in_place: { - Left(left_price, left_timestamp, spread) by id, - Right(right_price, right_timestamp, spread) by id, - } - update: { - ExchangeLeft(left_exchange, left_key, key) by id, - ExchangeRight(right_exchange, right_key, key) by id, - } - } -); - -#[derive(Debug)] -struct MedianBucket { - ts: AtomicU64, - idx: AtomicUsize, - values: RwLock>, -} - -impl MedianBucket { - async fn add(&self, now: u64, value: f64) { - let prev_ts = self.ts.load(Ordering::Acquire); - - if prev_ts != now - && self - .ts - .compare_exchange(prev_ts, now, Ordering::AcqRel, Ordering::Acquire) - .is_ok() - { - let values = self.values.write().await; - for val in values.iter() { - val.store(f64::NAN, Ordering::Relaxed); - } - self.idx.store(0, Ordering::Relaxed); - } - - let values = self.values.read().await; - let idx = self.idx.fetch_add(1, Ordering::Relaxed); - - if idx >= values.len() { - drop(values); - let mut values = self.values.write().await; - values.push(AtomicF64::new(value)); - } else { - values[idx].store(value, Ordering::Relaxed); - } - } - - async fn collect(&self, cutoff: u64) -> Option> { - if self.ts.load(Ordering::Acquire) <= cutoff { - return None; - } - - let values = self.values.read().await; - Some( - values - .iter() - .filter_map(|v| { - let x = v.load(Ordering::Relaxed); - if x.is_nan() { None } else { Some(x) } - }) - .collect(), - ) - } -} - -#[derive(Debug)] -struct Bucket { - ts: AtomicU64, - sum: AtomicF64, - count: AtomicUsize, -} - -#[derive(Debug)] -struct Window { - secs: u64, - start: AtomicU64, - avg_buckets: Box<[Bucket]>, - med_buckets: Box<[MedianBucket]>, -} - -impl Window { - fn new(secs: u64) -> Self { - let mut avg_vec = Vec::with_capacity(secs as usize); - let mut med_vec = Vec::with_capacity(secs as usize); - for _ in 0..secs { - avg_vec.push(Bucket { - ts: AtomicU64::new(0), - sum: AtomicF64::new(0.0), - count: AtomicUsize::new(0), - }); - med_vec.push(MedianBucket { - ts: AtomicU64::new(0), - values: RwLock::new(Vec::new()), - idx: AtomicUsize::new(0), - }); - } - Self { - secs, - start: AtomicU64::new(0), - avg_buckets: avg_vec.into_boxed_slice(), - med_buckets: med_vec.into_boxed_slice(), - } - } - - async fn add(&self, value: f64) { - let now = current_epoch_seconds(); - _ = self - .start - .compare_exchange(0, now, Ordering::Release, Ordering::Relaxed); - - let Ok(idx) = usize::try_from(now % self.secs) else { - return; - }; - - let avg_bucket = &self.avg_buckets[idx]; - let prev_ts = avg_bucket.ts.load(Ordering::Acquire); - if prev_ts != now - && avg_bucket - .ts - .compare_exchange(prev_ts, now, Ordering::AcqRel, Ordering::Acquire) - .is_ok() - { - avg_bucket.sum.store(0.0, Ordering::Relaxed); - avg_bucket.count.store(0, Ordering::Relaxed); - } - avg_bucket.sum.fetch_add(value, Ordering::Relaxed); - avg_bucket.count.fetch_add(1, Ordering::Relaxed); - - let med_bucket = &self.med_buckets[idx]; - med_bucket.add(now, value).await; - } - - fn average(&self) -> Option { - let now = current_epoch_seconds(); - let start = self.start.load(Ordering::Acquire); - if start == 0 || now < start.saturating_add(self.secs) { - return None; - } - - let cutoff = now.saturating_sub(self.secs); - let mut total = 0.0; - let mut cnt = 0usize; - - for bucket in self.avg_buckets.iter() { - let ts = bucket.ts.load(Ordering::Acquire); - if ts > cutoff { - total += bucket.sum.load(Ordering::Relaxed); - cnt += bucket.count.load(Ordering::Relaxed); - } - } - - if cnt == 0 { - None - } else { - Some(total / (cnt as f64)) - } - } - - async fn median(&self) -> Option { - let now = current_epoch_seconds(); - let cutoff = now.saturating_sub(self.secs); - - let mut all_values = Vec::new(); - - for bucket in self.med_buckets.iter() { - if let Some(mut vals) = bucket.collect(cutoff).await { - all_values.append(&mut vals); - } - } - - if all_values.is_empty() { - return None; - } - - all_values.sort_by(|a, b| a.partial_cmp(b).unwrap()); - let mid = all_values.len() / 2; - Some(if all_values.len() % 2 == 0 { - (all_values[mid - 1] + all_values[mid]) / 2.0 - } else { - all_values[mid] - }) - } -} - -#[derive(Debug, Clone)] -pub struct SpreadWindow { - secs: u64, - windows: Arc>>, -} - -impl SpreadWindow { - pub fn new(secs: u64) -> Self { - Self { - secs, - windows: Arc::new(RwLock::new(HashMap::new())), - } - } - - pub async fn add(&self, symbol: &str, spread: f64) { - let guard = self.windows.read().await; - if let Some(window) = guard.get(symbol) { - window.add(spread).await; - return; - } - drop(guard); - let mut guard = self.windows.write().await; - let window = Window::new(self.secs); - window.add(spread).await; - guard.insert(symbol.to_string(), window); - } - - pub async fn average(&self, symbol: &str) -> Option { - let guard = self.windows.read().await; - guard.get(symbol)?.average() - } - - pub async fn median(&self, symbol: &str) -> Option { - let guard = self.windows.read().await; - guard.get(symbol)?.median().await - } -} - -fn current_epoch_seconds() -> u64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("system time before UNIX EPOCH") - .as_secs() -} - -fn get_basis_point(operand: f64, comparator: f64) -> f64 { - let left = operand - comparator; - let right = (operand + comparator) / 2.0; - (left / right) * 10_000.0 -} - -pub struct SpreadManager { - spread_wt: Arc, - windows: Arc>>, -} - -impl SpreadManager { - pub fn new(spread_wt: Arc) -> Self { - Self { - spread_wt, - windows: Arc::new(RwLock::new(HashMap::new())), - } - } - - pub async fn record( - &self, - exchange: Exchange, - symbol: &str, - ask_price: Option, - bid_price: Option, - timestamp: u64, - ) -> eyre::Result<()> { - let key = format!("{}-{}", symbol, exchange); - let left = self.spread_wt.select_by_left_key(key.clone()).execute()?; - - if left.is_empty() { - let id = self.spread_wt.get_next_pk().into(); - if let Err(e) = self.spread_wt.insert(SpreadRow { - id, - key: format!("unset-left-{key}"), - symbol: symbol.to_string(), - left_timestamp: timestamp, - left_price: bid_price.unwrap_or_default(), - left_exchange: exchange, - left_key: key.clone(), - spread: 0.0, - right_exchange: Exchange::Unset, - right_key: String::new(), - right_price: 0.0, - right_timestamp: 0, - }) { - if !matches!(e, WorkTableError::AlreadyExists(..)) { - println!("failed to write unset left spread: {e}"); - return Err(e.into()); - } - }; - } - let right = self.spread_wt.select_by_right_key(key.clone()).execute()?; - if right.is_empty() { - let id = self.spread_wt.get_next_pk().into(); - if let Err(e) = self.spread_wt.insert(SpreadRow { - id, - key: format!("unset-right-{key}"), - symbol: symbol.to_string(), - right_timestamp: timestamp, - right_price: ask_price.unwrap_or_default(), - left_price: 0.0, - right_exchange: exchange, - right_key: key.clone(), - spread: 0.0, - left_exchange: Exchange::Unset, - left_key: String::new(), - left_timestamp: 0, - }) { - if !matches!(e, WorkTableError::AlreadyExists(..)) { - println!("failed to write unset right spread: {e}"); - return Err(e.into()); - } - }; - } - - let spreads = self - .spread_wt - .select_by_symbol(symbol.to_string()) - .execute()?; - - for spread in spreads { - if spread.left_exchange == Exchange::Unset { - if spread.right_exchange == exchange { - continue; - } - let query = ExchangeLeftQuery { - left_exchange: exchange, - left_key: key.clone(), - key: format!("{}-{}", key, spread.right_key), - }; - let mut unset = spread.clone(); - unset.id = self.spread_wt.get_next_pk().into(); - if let Err(e) = self.spread_wt.update_exchange_left(query, spread.id).await { - if !matches!(e, WorkTableError::AlreadyExists(..)) { - println!("failed to update left spread exchange: {e}"); - return Err(e.into()); - } - } else { - self.windows - .write() - .await - .insert((exchange, spread.right_exchange), SpreadWindow::new(15)); - } - if let Err(e) = self.spread_wt.insert(unset.clone()) { - if !matches!(e, WorkTableError::AlreadyExists(..)) { - println!("failed to copy unset left spread: {e}, {unset:?}"); - return Err(e.into()); - } - }; - } - - if spread.right_exchange == Exchange::Unset { - if spread.left_exchange == exchange { - continue; - } - let query = ExchangeRightQuery { - right_exchange: exchange, - right_key: key.clone(), - key: format!("{}-{}", spread.left_key, key), - }; - let mut unset = spread.clone(); - unset.id = self.spread_wt.get_next_pk().into(); - if let Err(e) = self.spread_wt.update_exchange_right(query, spread.id).await { - if !matches!(e, WorkTableError::AlreadyExists(..)) { - println!("failed to update right spread exchange: {e}"); - return Err(e.into()); - } - } else { - self.windows - .write() - .await - .insert((spread.left_exchange, exchange), SpreadWindow::new(15)); - } - if let Err(e) = self.spread_wt.insert(unset.clone()) { - if !matches!(e, WorkTableError::AlreadyExists(..)) { - println!("failed to copy unset right spread: {e}, {unset:?}"); - return Err(e.into()); - } - }; - } - } - - let spreads = self - .spread_wt - .select_by_symbol(symbol.to_string()) - .execute()?; - - for spread in spreads { - if spread.right_exchange == Exchange::Unset || spread.left_exchange == Exchange::Unset { - continue; - } - let value = if spread.left_exchange == exchange { - let bid_price = bid_price.unwrap_or(spread.left_price); - let value = get_basis_point(bid_price, spread.right_price); - - if let Err(e) = self - .spread_wt - .update_left_in_place( - |x| { - let (left_price, left_timestamp, spread) = x; - *left_timestamp = timestamp.into(); - *left_price = bid_price.into(); - *spread = value.into(); - }, - spread.id, - ) - .await - { - println!("failed to update left spread: {e}"); - return Err(e); - }; - value - } else if spread.right_exchange == exchange { - let ask_price = ask_price.unwrap_or(spread.right_price); - let value = get_basis_point(spread.left_price, ask_price); - - if let Err(e) = self - .spread_wt - .update_right_in_place( - |x| { - let (right_price, right_timestamp, spread) = x; - *right_timestamp = timestamp.into(); - *right_price = ask_price.into(); - *spread = value.into(); - }, - spread.id, - ) - .await - { - { - println!("failed to update right spread: {e}"); - return Err(e); - }; - }; - value - } else { - continue; - }; - - self.windows - .read() - .await - .get(&(spread.left_exchange, spread.right_exchange)) - .expect("window should exist") - .add(symbol, value) - .await; - } - - Ok(()) - } - - pub async fn median( - &self, - left_exchange: Exchange, - right_exchange: Exchange, - symbol: &str, - ) -> Option { - self.windows - .read() - .await - .get(&(left_exchange, right_exchange))? - .median(symbol) - .await - } -} - -#[tokio::main(flavor = "multi_thread", worker_threads = 16)] -async fn main() { - let spread_wt = Arc::new(SpreadWorkTable::default()); - - let manager = Arc::new(SpreadManager::new(spread_wt.clone())); - - let mut symbols = Vec::new(); - - for i in 0..50000 { - let mut s = String::new(); - for j in 0..i { - s.push_str(j.to_string().as_str()); - } - symbols.push(s); - } - symbols.push(String::new()); - - let symbols = Arc::new(symbols); - - let ts_seq = Arc::new(AtomicU64::new(0)); - let ask_seq = Arc::new(AtomicF64::new(0.0)); - let bid_seq = Arc::new(AtomicF64::new(0.0)); - - let mut seq = Arc::new(AtomicUsize::new(0)); - let mut handles = Vec::new(); - - for i in 0..16 { - let ask_seq = ask_seq.clone(); - let bid_seq = bid_seq.clone(); - let ts_seq = ts_seq.clone(); - let manager = manager.clone(); - let seq = seq.clone(); - let symbols = symbols.clone(); - handles.push(tokio::spawn(async move { - for _ in 0..1_000_00 { - let timestamp = ts_seq.fetch_add(12, Ordering::Relaxed); - let idx = seq.fetch_add(1, Ordering::Relaxed) % 50000; - let symbol = symbols[idx].clone(); - let exchange = match idx % 3 { - 0 => Exchange::HyperliquidPerpetuals, - 1 => Exchange::BinanceFutures, - _ => Exchange::BybitFutures, - }; - let ask_price = ask_seq.fetch_add(1.0, Ordering::Relaxed); - let bid_price = bid_seq.fetch_add(10.0, Ordering::Relaxed); - // println!("record {exchange} {ask_price} {bid_price}"); - if let Err(e) = manager - .record( - exchange, - &symbol, - Some(ask_price), - Some(bid_price), - timestamp, - ) - .await - { - println!("failed to record spread: {e:?}"); - }; - } - })); - } - - for handle in handles { - println!("{:?}", handle.await); - } -} +// use eyre::bail; +// use rkyv::{Archive, Serialize}; +// use uuid::timestamp; +// use worktable::prelude::{MemStat, SizeMeasurable}; +// +// use std::{collections::HashMap, sync::Arc}; +// +// use tokio::sync::RwLock; +// use worktable::{select::SelectQueryExecutor, WorkTableError}; +// +// use atomic_float::AtomicF64; +// use futures::executor::block_on; +// use rand::distr::Alphanumeric; +// use rand::Rng; +// use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +// use std::time::{SystemTime, UNIX_EPOCH}; +// use worktable::prelude::*; +// use worktable::worktable; +// +// #[derive( +// Archive, +// Clone, +// Copy, +// Debug, +// Default, +// serde::Deserialize, +// rkyv::Deserialize, +// Eq, +// Hash, +// MemStat, +// Ord, +// Serialize, +// PartialEq, +// PartialOrd, +// )] +// #[rkyv(compare(PartialEq), derive(Debug))] +// pub enum Exchange { +// #[default] +// Unset, +// BinanceFutures, +// BinanceSpot, +// HyperliquidPerpetuals, +// HyperliquidSpot, +// GateioSpot, +// GateioFutures, +// KucoinSpot, +// KucoinFutures, +// BybitSpot, +// BybitFutures, +// BsxFutures, +// BitgetSpot, +// BitgetFutures, +// BitmexSpot, +// VertexSpot, +// VertexFutures, +// OKXSpot, +// OKXFutures, +// DeribitFutures, +// } +// +// impl SizeMeasurable for Exchange { +// fn aligned_size(&self) -> usize { +// size_of::() +// } +// } +// +// impl Exchange { +// pub const TOTAL: usize = 19; +// +// pub fn is_hyper(&self) -> bool { +// *self == Exchange::HyperliquidPerpetuals +// } +// +// pub fn is_left(&self) -> bool { +// *self == Exchange::BinanceFutures +// } +// +// pub fn is_right(&self) -> bool { +// *self == Exchange::HyperliquidPerpetuals +// } +// +// pub fn all() -> [Self; Self::TOTAL] { +// [ +// Self::BinanceFutures, +// Self::BinanceSpot, +// Self::HyperliquidPerpetuals, +// Self::HyperliquidSpot, +// Self::GateioSpot, +// Self::GateioFutures, +// Self::KucoinSpot, +// Self::KucoinFutures, +// Self::BybitSpot, +// Self::BybitFutures, +// Self::BsxFutures, +// Self::BitgetSpot, +// Self::BitgetFutures, +// Self::BitmexSpot, +// Self::VertexSpot, +// Self::VertexFutures, +// Self::OKXSpot, +// Self::OKXFutures, +// Self::DeribitFutures, +// ] +// } +// +// pub const fn as_str(&self) -> &'static str { +// match self { +// Exchange::Unset => "Unset", +// Exchange::BinanceFutures => "BIN-futures", +// Exchange::BinanceSpot => "BIN-spot", +// Exchange::HyperliquidPerpetuals => "HYP-perps", +// Exchange::HyperliquidSpot => "HYP-spot", +// Exchange::GateioSpot => "GAT-spot", +// Exchange::GateioFutures => "GAT-futures", +// Exchange::KucoinSpot => "KUC-spot", +// Exchange::KucoinFutures => "KUC-futures", +// Exchange::BybitSpot => "BYB-spot", +// Exchange::BybitFutures => "BYB-futures", +// Exchange::BsxFutures => "BSX-futures", +// Exchange::BitgetSpot => "BGT-spot", +// Exchange::BitgetFutures => "BGT-futures", +// Exchange::BitmexSpot => "BMX-spot", +// Exchange::VertexSpot => "VTX-spot", +// Exchange::VertexFutures => "VTX-futures", +// Exchange::OKXSpot => "OKX-spot", +// Exchange::OKXFutures => "OKX-futures", +// Exchange::DeribitFutures => "DBT-futures", +// } +// } +// } +// +// impl std::fmt::Display for Exchange { +// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +// f.write_str(self.as_str()) +// } +// } +// +// impl AsRef for Exchange { +// fn as_ref(&self) -> &str { +// self.as_str() +// } +// } +// +// worktable!( +// name: Spread, +// columns: { +// id: u32 primary_key autoincrement, +// left_timestamp: u64, +// right_timestamp: u64, +// left_exchange: Exchange, +// right_exchange: Exchange, +// left_price: f64, +// right_price: f64, +// spread: f64, +// symbol: String, +// key: String, +// left_key: String, +// right_key: String, +// }, +// indexes: { +// key_idx: key unique, +// symbol_idx: symbol, +// left_key_idx: left_key, +// right_key_idx: right_key +// }, +// queries: { +// in_place: { +// Left(left_price, left_timestamp, spread) by id, +// Right(right_price, right_timestamp, spread) by id, +// } +// update: { +// ExchangeLeft(left_exchange, left_key, key) by id, +// ExchangeRight(right_exchange, right_key, key) by id, +// } +// } +// ); +// +// #[derive(Debug)] +// struct MedianBucket { +// ts: AtomicU64, +// idx: AtomicUsize, +// values: RwLock>, +// } +// +// impl MedianBucket { +// async fn add(&self, now: u64, value: f64) { +// let prev_ts = self.ts.load(Ordering::Acquire); +// +// if prev_ts != now +// && self +// .ts +// .compare_exchange(prev_ts, now, Ordering::AcqRel, Ordering::Acquire) +// .is_ok() +// { +// let values = self.values.write().await; +// for val in values.iter() { +// val.store(f64::NAN, Ordering::Relaxed); +// } +// self.idx.store(0, Ordering::Relaxed); +// } +// +// let values = self.values.read().await; +// let idx = self.idx.fetch_add(1, Ordering::Relaxed); +// +// if idx >= values.len() { +// drop(values); +// let mut values = self.values.write().await; +// values.push(AtomicF64::new(value)); +// } else { +// values[idx].store(value, Ordering::Relaxed); +// } +// } +// +// async fn collect(&self, cutoff: u64) -> Option> { +// if self.ts.load(Ordering::Acquire) <= cutoff { +// return None; +// } +// +// let values = self.values.read().await; +// Some( +// values +// .iter() +// .filter_map(|v| { +// let x = v.load(Ordering::Relaxed); +// if x.is_nan() { +// None +// } else { +// Some(x) +// } +// }) +// .collect(), +// ) +// } +// } +// +// #[derive(Debug)] +// struct Bucket { +// ts: AtomicU64, +// sum: AtomicF64, +// count: AtomicUsize, +// } +// +// #[derive(Debug)] +// struct Window { +// secs: u64, +// start: AtomicU64, +// avg_buckets: Box<[Bucket]>, +// med_buckets: Box<[MedianBucket]>, +// } +// +// impl Window { +// fn new(secs: u64) -> Self { +// let mut avg_vec = Vec::with_capacity(secs as usize); +// let mut med_vec = Vec::with_capacity(secs as usize); +// for _ in 0..secs { +// avg_vec.push(Bucket { +// ts: AtomicU64::new(0), +// sum: AtomicF64::new(0.0), +// count: AtomicUsize::new(0), +// }); +// med_vec.push(MedianBucket { +// ts: AtomicU64::new(0), +// values: RwLock::new(Vec::new()), +// idx: AtomicUsize::new(0), +// }); +// } +// Self { +// secs, +// start: AtomicU64::new(0), +// avg_buckets: avg_vec.into_boxed_slice(), +// med_buckets: med_vec.into_boxed_slice(), +// } +// } +// +// async fn add(&self, value: f64) { +// let now = current_epoch_seconds(); +// _ = self +// .start +// .compare_exchange(0, now, Ordering::Release, Ordering::Relaxed); +// +// let Ok(idx) = usize::try_from(now % self.secs) else { +// return; +// }; +// +// let avg_bucket = &self.avg_buckets[idx]; +// let prev_ts = avg_bucket.ts.load(Ordering::Acquire); +// if prev_ts != now +// && avg_bucket +// .ts +// .compare_exchange(prev_ts, now, Ordering::AcqRel, Ordering::Acquire) +// .is_ok() +// { +// avg_bucket.sum.store(0.0, Ordering::Relaxed); +// avg_bucket.count.store(0, Ordering::Relaxed); +// } +// avg_bucket.sum.fetch_add(value, Ordering::Relaxed); +// avg_bucket.count.fetch_add(1, Ordering::Relaxed); +// +// let med_bucket = &self.med_buckets[idx]; +// med_bucket.add(now, value).await; +// } +// +// fn average(&self) -> Option { +// let now = current_epoch_seconds(); +// let start = self.start.load(Ordering::Acquire); +// if start == 0 || now < start.saturating_add(self.secs) { +// return None; +// } +// +// let cutoff = now.saturating_sub(self.secs); +// let mut total = 0.0; +// let mut cnt = 0usize; +// +// for bucket in self.avg_buckets.iter() { +// let ts = bucket.ts.load(Ordering::Acquire); +// if ts > cutoff { +// total += bucket.sum.load(Ordering::Relaxed); +// cnt += bucket.count.load(Ordering::Relaxed); +// } +// } +// +// if cnt == 0 { +// None +// } else { +// Some(total / (cnt as f64)) +// } +// } +// +// async fn median(&self) -> Option { +// let now = current_epoch_seconds(); +// let cutoff = now.saturating_sub(self.secs); +// +// let mut all_values = Vec::new(); +// +// for bucket in self.med_buckets.iter() { +// if let Some(mut vals) = bucket.collect(cutoff).await { +// all_values.append(&mut vals); +// } +// } +// +// if all_values.is_empty() { +// return None; +// } +// +// all_values.sort_by(|a, b| a.partial_cmp(b).unwrap()); +// let mid = all_values.len() / 2; +// Some(if all_values.len() % 2 == 0 { +// (all_values[mid - 1] + all_values[mid]) / 2.0 +// } else { +// all_values[mid] +// }) +// } +// } +// +// #[derive(Debug, Clone)] +// pub struct SpreadWindow { +// secs: u64, +// windows: Arc>>, +// } +// +// impl SpreadWindow { +// pub fn new(secs: u64) -> Self { +// Self { +// secs, +// windows: Arc::new(RwLock::new(HashMap::new())), +// } +// } +// +// pub async fn add(&self, symbol: &str, spread: f64) { +// let guard = self.windows.read().await; +// if let Some(window) = guard.get(symbol) { +// window.add(spread).await; +// return; +// } +// drop(guard); +// let mut guard = self.windows.write().await; +// let window = Window::new(self.secs); +// window.add(spread).await; +// guard.insert(symbol.to_string(), window); +// } +// +// pub async fn average(&self, symbol: &str) -> Option { +// let guard = self.windows.read().await; +// guard.get(symbol)?.average() +// } +// +// pub async fn median(&self, symbol: &str) -> Option { +// let guard = self.windows.read().await; +// guard.get(symbol)?.median().await +// } +// } +// +// fn current_epoch_seconds() -> u64 { +// SystemTime::now() +// .duration_since(UNIX_EPOCH) +// .expect("system time before UNIX EPOCH") +// .as_secs() +// } +// +// fn get_basis_point(operand: f64, comparator: f64) -> f64 { +// let left = operand - comparator; +// let right = (operand + comparator) / 2.0; +// (left / right) * 10_000.0 +// } +// +// pub struct SpreadManager { +// spread_wt: Arc, +// windows: Arc>>, +// } +// +// impl SpreadManager { +// pub fn new(spread_wt: Arc) -> Self { +// Self { +// spread_wt, +// windows: Arc::new(RwLock::new(HashMap::new())), +// } +// } +// +// pub async fn record( +// &self, +// exchange: Exchange, +// symbol: &str, +// ask_price: Option, +// bid_price: Option, +// timestamp: u64, +// ) -> eyre::Result<()> { +// let key = format!("{}-{}", symbol, exchange); +// let left = self.spread_wt.select_by_left_key(key.clone()).execute()?; +// +// if left.is_empty() { +// let id = self.spread_wt.get_next_pk().into(); +// if let Err(e) = self.spread_wt.insert(SpreadRow { +// id, +// key: format!("unset-left-{key}"), +// symbol: symbol.to_string(), +// left_timestamp: timestamp, +// left_price: bid_price.unwrap_or_default(), +// left_exchange: exchange, +// left_key: key.clone(), +// spread: 0.0, +// right_exchange: Exchange::Unset, +// right_key: String::new(), +// right_price: 0.0, +// right_timestamp: 0, +// }) { +// if !matches!(e, WorkTableError::AlreadyExists(..)) { +// println!("failed to write unset left spread: {e}"); +// return Err(e.into()); +// } +// }; +// } +// let right = self.spread_wt.select_by_right_key(key.clone()).execute()?; +// if right.is_empty() { +// let id = self.spread_wt.get_next_pk().into(); +// if let Err(e) = self.spread_wt.insert(SpreadRow { +// id, +// key: format!("unset-right-{key}"), +// symbol: symbol.to_string(), +// right_timestamp: timestamp, +// right_price: ask_price.unwrap_or_default(), +// left_price: 0.0, +// right_exchange: exchange, +// right_key: key.clone(), +// spread: 0.0, +// left_exchange: Exchange::Unset, +// left_key: String::new(), +// left_timestamp: 0, +// }) { +// if !matches!(e, WorkTableError::AlreadyExists(..)) { +// println!("failed to write unset right spread: {e}"); +// return Err(e.into()); +// } +// }; +// } +// +// let spreads = self +// .spread_wt +// .select_by_symbol(symbol.to_string()) +// .execute()?; +// +// for spread in spreads { +// if spread.left_exchange == Exchange::Unset { +// if spread.right_exchange == exchange { +// continue; +// } +// let query = ExchangeLeftQuery { +// left_exchange: exchange, +// left_key: key.clone(), +// key: format!("{}-{}", key, spread.right_key), +// }; +// let mut unset = spread.clone(); +// unset.id = self.spread_wt.get_next_pk().into(); +// if let Err(e) = self.spread_wt.update_exchange_left(query, spread.id).await { +// if !matches!(e, WorkTableError::AlreadyExists(..)) { +// println!("failed to update left spread exchange: {e}"); +// return Err(e.into()); +// } +// } else { +// self.windows +// .write() +// .await +// .insert((exchange, spread.right_exchange), SpreadWindow::new(15)); +// } +// if let Err(e) = self.spread_wt.insert(unset.clone()) { +// if !matches!(e, WorkTableError::AlreadyExists(..)) { +// println!("failed to copy unset left spread: {e}, {unset:?}"); +// return Err(e.into()); +// } +// }; +// } +// +// if spread.right_exchange == Exchange::Unset { +// if spread.left_exchange == exchange { +// continue; +// } +// let query = ExchangeRightQuery { +// right_exchange: exchange, +// right_key: key.clone(), +// key: format!("{}-{}", spread.left_key, key), +// }; +// let mut unset = spread.clone(); +// unset.id = self.spread_wt.get_next_pk().into(); +// if let Err(e) = self.spread_wt.update_exchange_right(query, spread.id).await { +// if !matches!(e, WorkTableError::AlreadyExists(..)) { +// println!("failed to update right spread exchange: {e}"); +// return Err(e.into()); +// } +// } else { +// self.windows +// .write() +// .await +// .insert((spread.left_exchange, exchange), SpreadWindow::new(15)); +// } +// if let Err(e) = self.spread_wt.insert(unset.clone()) { +// if !matches!(e, WorkTableError::AlreadyExists(..)) { +// println!("failed to copy unset right spread: {e}, {unset:?}"); +// return Err(e.into()); +// } +// }; +// } +// } +// +// let spreads = self +// .spread_wt +// .select_by_symbol(symbol.to_string()) +// .execute()?; +// +// for spread in spreads { +// if spread.right_exchange == Exchange::Unset || spread.left_exchange == Exchange::Unset { +// continue; +// } +// let value = if spread.left_exchange == exchange { +// let bid_price = bid_price.unwrap_or(spread.left_price); +// let value = get_basis_point(bid_price, spread.right_price); +// +// if let Err(e) = self +// .spread_wt +// .update_left_in_place( +// |x| { +// let (left_price, left_timestamp, spread) = x; +// *left_timestamp = timestamp.into(); +// *left_price = bid_price.into(); +// *spread = value.into(); +// }, +// spread.id, +// ) +// .await +// { +// println!("failed to update left spread: {e}"); +// return Err(e); +// }; +// value +// } else if spread.right_exchange == exchange { +// let ask_price = ask_price.unwrap_or(spread.right_price); +// let value = get_basis_point(spread.left_price, ask_price); +// +// if let Err(e) = self +// .spread_wt +// .update_right_in_place( +// |x| { +// let (right_price, right_timestamp, spread) = x; +// *right_timestamp = timestamp.into(); +// *right_price = ask_price.into(); +// *spread = value.into(); +// }, +// spread.id, +// ) +// .await +// { +// { +// println!("failed to update right spread: {e}"); +// return Err(e); +// }; +// }; +// value +// } else { +// continue; +// }; +// +// self.windows +// .read() +// .await +// .get(&(spread.left_exchange, spread.right_exchange)) +// .expect("window should exist") +// .add(symbol, value) +// .await; +// } +// +// Ok(()) +// } +// +// pub async fn median( +// &self, +// left_exchange: Exchange, +// right_exchange: Exchange, +// symbol: &str, +// ) -> Option { +// self.windows +// .read() +// .await +// .get(&(left_exchange, right_exchange))? +// .median(symbol) +// .await +// } +// } +// +// #[tokio::main(flavor = "multi_thread", worker_threads = 16)] +// async fn main() { +// let spread_wt = Arc::new(SpreadWorkTable::default()); +// +// let manager = Arc::new(SpreadManager::new(spread_wt.clone())); +// +// let mut symbols = Vec::new(); +// for _ in 0..5000 { +// let s: String = rand::rng() +// .sample_iter(&Alphanumeric) +// .take(7) +// .map(char::from) +// .collect(); +// symbols.push(s); +// } +// symbols.push(String::new()); +// +// let symbols = Arc::new(symbols); +// +// let ts_seq = Arc::new(AtomicU64::new(0)); +// let ask_seq = Arc::new(AtomicF64::new(0.0)); +// let bid_seq = Arc::new(AtomicF64::new(0.0)); +// +// let mut seq = Arc::new(AtomicUsize::new(0)); +// let mut handles = Vec::new(); +// +// for i in 0..16 { +// let ask_seq = ask_seq.clone(); +// let bid_seq = bid_seq.clone(); +// let ts_seq = ts_seq.clone(); +// let manager = manager.clone(); +// let seq = seq.clone(); +// let symbols = symbols.clone(); +// handles.push(tokio::spawn(async move { +// for _ in 0..500_000 { +// let timestamp = ts_seq.fetch_add(12, Ordering::Relaxed); +// let idx = seq.fetch_add(1, Ordering::Relaxed) % 5000; +// let symbol = symbols[idx].clone(); +// let exchange = match idx % 3 { +// 0 => Exchange::HyperliquidPerpetuals, +// 1 => Exchange::BinanceFutures, +// _ => Exchange::BybitFutures, +// }; +// let ask_price = ask_seq.fetch_add(1.0, Ordering::Relaxed); +// let bid_price = bid_seq.fetch_add(10.0, Ordering::Relaxed); +// // println!("record {exchange} {ask_price} {bid_price}"); +// if let Err(e) = manager +// .record( +// exchange, +// &symbol, +// Some(ask_price), +// Some(bid_price), +// timestamp, +// ) +// .await +// { +// println!("failed to record spread: {e:?}"); +// }; +// } +// })); +// } +// +// for handle in handles { +// println!("{:?}", handle.await); +// } +// } + +fn main() {} diff --git a/src/index/unsized_node.rs b/src/index/unsized_node.rs index 40f7033a..6b62e8d5 100644 --- a/src/index/unsized_node.rs +++ b/src/index/unsized_node.rs @@ -16,7 +16,7 @@ where { inner: Vec, length_capacity: usize, - length_without_deleted: usize, + removed_length: usize, length: usize, } @@ -45,9 +45,23 @@ where inner, length, length_capacity, - length_without_deleted: length, + removed_length: 0, } } + + pub fn rebuild(&mut self) { + self.length = self + .inner + .last() + .expect("should not rebuild on empty node") + .aligned_size(); + self.length += UNSIZED_HEADER_LENGTH as usize; + for value in self.inner.iter() { + self.length += value.aligned_size(); + self.length += UnsizedIndexPageUtility::::slots_value_size(); + } + self.removed_length = 0; + } } impl NodeLike for UnsizedNode @@ -59,7 +73,7 @@ where inner: Vec::new(), length_capacity: capacity, length: UNSIZED_HEADER_LENGTH as usize, - length_without_deleted: UNSIZED_HEADER_LENGTH as usize, + removed_length: 0, } } @@ -68,10 +82,10 @@ where } fn halve(&mut self) -> Self { - let middle_length = (self.length_without_deleted + let middle_length = (self.length + - self.removed_length - (self.max().unwrap().aligned_size() + UNSIZED_HEADER_LENGTH as usize)) / 2; - let current_node_id_size = self.max().unwrap().aligned_size(); let mut middle_variance = f64::INFINITY; let mut ind = false; let mut i = 1; @@ -79,7 +93,9 @@ where let mut middle_idx = 0; let mut iter = self.inner.iter(); while !ind { - let val = iter.next().expect("we should stop before node's end"); + let Some(val) = iter.next() else { + break; + }; current_length += val.aligned_size(); current_length += UnsizedIndexPageUtility::::slots_value_size(); let current_middle_variance = @@ -96,19 +112,8 @@ where } let new_inner = self.inner.split_off(middle_idx); - let node_id_len = new_inner.last().unwrap().aligned_size(); - let split = Self { - inner: new_inner, - length_capacity: self.length_capacity, - length: self.length_without_deleted - (current_node_id_size + current_length) - + node_id_len, - length_without_deleted: self.length_without_deleted - - (current_node_id_size + current_length) - + node_id_len, - }; - self.length = - current_length + self.max().unwrap().aligned_size() + UNSIZED_HEADER_LENGTH as usize; - self.length_without_deleted = self.length; + let split = Self::from_inner(new_inner, self.length_capacity); + self.rebuild(); split } @@ -134,13 +139,10 @@ where // Node id is stored separately too, so we need to count node_id twice self.length -= node_id_len; self.length += value_size; - self.length_without_deleted -= node_id_len; - self.length_without_deleted += value_size; } self.length += value_size; self.length += UnsizedIndexPageUtility::::slots_value_size(); - self.length_without_deleted += value_size; - self.length_without_deleted += UnsizedIndexPageUtility::::slots_value_size(); + (true, idx) } (false, idx) => (false, idx), @@ -172,16 +174,14 @@ where where T: Borrow, { - let node_id_len = self.max().map(|v| v.aligned_size()).unwrap_or(0); // TODO: Refactor this when empty links logic will be added to the page if let Some((val, i)) = NodeLike::delete(&mut self.inner, value) { - let new_node_id_len = self.max().map(|v| v.aligned_size()).unwrap_or(0); - if new_node_id_len != node_id_len { - self.length_without_deleted -= node_id_len; - self.length_without_deleted += new_node_id_len; + self.removed_length += + val.aligned_size() + UnsizedIndexPageUtility::::slots_value_size(); + + if self.removed_length > self.length_capacity / 2 { + self.rebuild() } - self.length_without_deleted -= val.aligned_size(); - self.length_without_deleted -= UnsizedIndexPageUtility::::slots_value_size(); Some((val, i)) } else { None @@ -246,10 +246,10 @@ mod test { assert_eq!(node.length, node.length_capacity); let split = node.halve(); assert_eq!(node.length, 152); - assert_eq!(node.length_without_deleted, 152); + assert_eq!(node.removed_length, 0); assert_eq!(node.inner.len(), 2); assert_eq!(split.length, 136); - assert_eq!(split.length_without_deleted, 136); + assert_eq!(split.removed_length, 0); assert_eq!(split.inner.len(), 1); } @@ -258,10 +258,10 @@ mod test { let mut node = UnsizedNode::::with_capacity(200); node.insert(String::from_utf8(vec![b'1'; 16]).unwrap()); assert_eq!(node.length, 120); - assert_eq!(node.length_without_deleted, 120); + assert_eq!(node.removed_length, 0); node.delete(&String::from_utf8(vec![b'1'; 16]).unwrap()); assert_eq!(node.length, 120); - assert_eq!(node.length_without_deleted, 64); + assert_eq!(node.removed_length, 32); } #[test] @@ -270,10 +270,10 @@ mod test { node.insert(String::from_utf8(vec![b'1'; 16]).unwrap()); node.insert(String::from_utf8(vec![b'2'; 24]).unwrap()); assert_eq!(node.length, 168); - assert_eq!(node.length_without_deleted, 168); + assert_eq!(node.removed_length, 0); node.delete(&String::from_utf8(vec![b'2'; 24]).unwrap()); assert_eq!(node.length, 168); - assert_eq!(node.length_without_deleted, 120); + assert_eq!(node.removed_length, 40); } #[test] diff --git a/tests/data/expected/space_index_unsized/indexset/process_create_node.wt.idx b/tests/data/expected/space_index_unsized/indexset/process_create_node.wt.idx index b95bb2ab788d6418005db6745ff7dec7c7e47a83..4b55de097f548bc941500660aa72368299c76496 100644 GIT binary patch delta 20 bcmZo@U~Xt&-Vo5h$gtVBq1k@&mHj#ZPb3H& delta 21 ccmZo@U~Xt&-Vo5RIi#V%exd-&4lQiv1A)O~eQA delta 21 ccmZo@U~Xt&-Vo5RIiw-Pexd-&O=vS$qz2_PS$M@-z>4rKx(tkfhmGO!OagYD(C?Kf%*}U diff --git a/tests/data/expected/space_index_unsized/process_create_node.wt.idx b/tests/data/expected/space_index_unsized/process_create_node.wt.idx index 4161a40fe2a7b11d7c1827768b84235444a35623..4601d2a9d8ab49ce266afc1877d79755fcb83a25 100644 GIT binary patch delta 20 bcmZo@U~Xt&-Vo5h$gtVBq1k@&mHj#ZPb3H& delta 21 ccmZo@U~Xt&-Vo5RIi#V%exd-&MxShW!BmP6P+| delta 21 ccmZo@U~Xt&-Vo5RIiw-Rexd-& diff --git a/tests/data/expected/space_index_unsized/process_insert_at_big_amount.wt.idx b/tests/data/expected/space_index_unsized/process_insert_at_big_amount.wt.idx index b4ad84ef137f2f09662fa0da4e6e9a818910d994..48cd3566fde21610c68154e7182081f02daa7243 100644 GIT binary patch delta 22 dcmZo@U~Xt&-VoBj#K5pQsNs>$W{b5agaBNr2$BE* delta 22 ecmZo@U~Xt&-VoBTIilf#&O`y0%@%7<2mt_Tj0rga diff --git a/tests/data/expected/space_index_unsized/process_insert_at_removed_place.wt.idx b/tests/data/expected/space_index_unsized/process_insert_at_removed_place.wt.idx index ff84db94e1a51ca445b7fd8a94edf55388724a9e..2edd226eec08bfdd1b09ba3c1b17254058773048 100644 GIT binary patch delta 20 bcmZo@U~Xt&-Vo5h$gtVBf!%&`#QqfkOGF2t delta 21 ccmZo@U~Xt&-Vo5RIi!Kbexd-&4lQiv1A)O~eQA delta 21 ccmZo@U~Xt&-Vo5RIiw-Pexd-&MxShW!BmP6P+| delta 21 ccmZo@U~Xt&-Vo5RIiw-Rexd-& diff --git a/tests/data/expected/space_index_unsized/process_remove_at_node_id.wt.idx b/tests/data/expected/space_index_unsized/process_remove_at_node_id.wt.idx index 531577f64e95de315a2a567feb40ec89235d686b..bd772e9aba840151edf9e838997c92b8e43c1f35 100644 GIT binary patch delta 20 bcmZo@U~Xt&-r&)|$gtV9A>DrRhW!BmO^gTd delta 21 ccmZo@U~Xt&-r&)&*{31Jexd-&|#44ch8T=0^ diff --git a/tests/data/space_index_unsized/indexset/process_create_node.wt.idx b/tests/data/space_index_unsized/indexset/process_create_node.wt.idx index b95bb2ab788d6418005db6745ff7dec7c7e47a83..4b55de097f548bc941500660aa72368299c76496 100644 GIT binary patch delta 20 bcmZo@U~Xt&-Vo5h$gtVBq1k@&mHj#ZPb3H& delta 21 ccmZo@U~Xt&-Vo5RIi#V%exd-&4lQiv1A)O~eQA delta 21 ccmZo@U~Xt&-Vo5RIiw-Pexd-&O=vS$qz2_PS$M@-z>4rKx(tkfhmGO!OagYD(C?Kf%*}U diff --git a/tests/data/space_index_unsized/process_create_node.wt.idx b/tests/data/space_index_unsized/process_create_node.wt.idx index 4161a40fe2a7b11d7c1827768b84235444a35623..4601d2a9d8ab49ce266afc1877d79755fcb83a25 100644 GIT binary patch delta 20 bcmZo@U~Xt&-Vo5h$gtVBq1k@&mHj#ZPb3H& delta 21 ccmZo@U~Xt&-Vo5RIi#V%exd-&MxShW!BmP6P+| delta 21 ccmZo@U~Xt&-Vo5RIiw-Rexd-& diff --git a/tests/data/space_index_unsized/process_insert_at_big_amount.wt.idx b/tests/data/space_index_unsized/process_insert_at_big_amount.wt.idx index b4ad84ef137f2f09662fa0da4e6e9a818910d994..48cd3566fde21610c68154e7182081f02daa7243 100644 GIT binary patch delta 22 dcmZo@U~Xt&-VoBj#K5pQsNs>$W{b5agaBNr2$BE* delta 22 ecmZo@U~Xt&-VoBTIilf#&O`y0%@%7<2mt_Tj0rga diff --git a/tests/data/space_index_unsized/process_insert_at_removed_place.wt.idx b/tests/data/space_index_unsized/process_insert_at_removed_place.wt.idx index ff84db94e1a51ca445b7fd8a94edf55388724a9e..2edd226eec08bfdd1b09ba3c1b17254058773048 100644 GIT binary patch delta 20 bcmZo@U~Xt&-Vo5h$gtVBf!%&`#QqfkOGF2t delta 21 ccmZo@U~Xt&-Vo5RIi!Kbexd-&4lQiv1A)O~eQA delta 21 ccmZo@U~Xt&-Vo5RIiw-Pexd-&MxShW!BmP6P+| delta 21 ccmZo@U~Xt&-Vo5RIiw-Rexd-& diff --git a/tests/data/space_index_unsized/process_remove_at_node_id.wt.idx b/tests/data/space_index_unsized/process_remove_at_node_id.wt.idx index 531577f64e95de315a2a567feb40ec89235d686b..bd772e9aba840151edf9e838997c92b8e43c1f35 100644 GIT binary patch delta 20 bcmZo@U~Xt&-r&)|$gtV9A>DrRhW!BmO^gTd delta 21 ccmZo@U~Xt&-r&)&*{31Jexd-&|#44ch8T=0^ diff --git a/tests/worktable/index/insert.rs b/tests/worktable/index/insert.rs index 833c05de..93e67a2c 100644 --- a/tests/worktable/index/insert.rs +++ b/tests/worktable/index/insert.rs @@ -236,6 +236,53 @@ fn insert_when_unique_violated() { h.join().unwrap(); } +#[test] +fn insert_after_unique_violated() { + let table = Arc::new(TestWorkTable::default()); + + let row = TestRow { + id: table.get_next_pk().into(), + val: 13, + attr1: "Attribute".to_string(), + attr2: i16::MIN, + attr3: 123456789, + attr4: "Attribute4".to_string(), + }; + let _ = table.insert(row.clone()).unwrap(); + + let row_new_attr_2 = 128; + let row_new_attr_4 = row.attr4.clone(); + + for _ in 0..5_000 { + let row = TestRow { + id: table.get_next_pk().into(), + val: 13, + attr1: "Attribute".to_string(), + attr2: row_new_attr_2, + attr3: 123456789, + attr4: row_new_attr_4.clone(), + }; + assert!(table.insert(row).is_err()); + } + + for i in 2..5_000 { + let attr2 = if i % 2 == 0 { + i as i16 / 2 + } else { + -i as i16 / 2 + }; + let row = TestRow { + id: table.get_next_pk().into(), + val: 13, + attr1: "Attribute".to_string(), + attr2, + attr3: 123456789, + attr4: format!("{i}"), + }; + assert!(table.insert(row).is_ok()); + } +} + #[test] fn insert_when_pk_violated() { let table = Arc::new(TestWorkTable::default()); @@ -275,3 +322,62 @@ fn insert_when_pk_violated() { h.join().unwrap(); } + +worktable!( + name: TestStrings, + columns: { + id: u64 primary_key autoincrement, + attr1: String, + attr2: String, + attr3: String, + }, + indexes: { + attr1_idx: attr1, + attr2_idx: attr2 unique, + attr4_idx: attr3 unique, + } +); + +#[test] +fn insert_after_unique_violated_strings() { + let table = Arc::new(TestStringsWorkTable::default()); + + let row = TestStringsRow { + id: table.get_next_pk().into(), + attr1: "Attribute_1".to_string(), + attr2: "Attribute_2".to_string(), + attr3: "Attribute_3".to_string(), + }; + let _ = table.insert(row.clone()).unwrap(); + + let row_new_attr_3 = row.attr3.clone(); + for _ in 0..5_000 { + let row = TestStringsRow { + id: table.get_next_pk().into(), + attr1: "Attribute_1_NEW".to_string(), + attr2: "Attribute_2_NEW".to_string(), + attr3: row_new_attr_3.clone(), + }; + assert!(table.insert(row).is_err()); + } + let row_new_attr_2 = row.attr2.clone(); + for i in 0..5_000 { + let row = TestStringsRow { + id: table.get_next_pk().into(), + attr1: "Attribute_1_NEW".to_string(), + attr2: row_new_attr_2.clone(), + attr3: format!("Attribute_3_{i}"), + }; + assert!(table.insert(row).is_err()); + } + + for i in 0..5_000 { + let row = TestStringsRow { + id: table.get_next_pk().into(), + attr1: format!("Attribute_1_{i}"), + attr2: format!("Attribute_2_{i}"), + attr3: format!("Attribute_3_{i}"), + }; + assert!(table.insert(row).is_ok()); + } +} From 6d8a551276f4998ec5d3399527670a2072ba7593 Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Fri, 8 Aug 2025 00:59:52 +0300 Subject: [PATCH 3/4] clippy --- tests/worktable/index/update_full.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/worktable/index/update_full.rs b/tests/worktable/index/update_full.rs index b4411115..fa57ca17 100644 --- a/tests/worktable/index/update_full.rs +++ b/tests/worktable/index/update_full.rs @@ -306,7 +306,7 @@ async fn update_by_full_row_with_reinsert_and_primary_key_violation() { }; test_table.insert(row2.clone()).unwrap(); let mut update = row1.clone(); - update.id = row2.id.clone(); + update.id = row2.id; update.attr1 = "TEST_______________________1".to_string(); assert!(test_table.update(update).await.is_err()); @@ -389,7 +389,7 @@ async fn update_by_full_row_with_secondary_unique_violation() { }; test_table.insert(row2.clone()).unwrap(); let mut update = row1.clone(); - update.attr2 = row2.attr2.clone(); + update.attr2 = row2.attr2; assert!(test_table.update(update).await.is_err()); assert_eq!(test_table.select(row1.id).unwrap(), row1); From 345d085777df8d499c841498e46af32d631144fd Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Fri, 8 Aug 2025 01:38:40 +0300 Subject: [PATCH 4/4] bump --- Cargo.toml | 6 +- examples/Cargo.toml | 2 +- examples/src/main.rs | 693 ------------------------------------------- 3 files changed, 4 insertions(+), 697 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index cf02da5c..e665a16a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["codegen", "examples", "performance_measurement", "performance_measur [package] name = "worktable" -version = "0.7.2" +version = "0.8.0" edition = "2024" authors = ["Handy-caT"] license = "MIT" @@ -27,9 +27,9 @@ lockfree = { version = "0.5.1" } fastrand = "2.3.0" futures = "0.3.30" uuid = { version = "1.10.0", features = ["v4", "v7"] } -# data_bucket = "0.2.10" +data_bucket = "0.3.0" # data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "page_cdc_correction", version = "0.2.7" } -data_bucket = { path = "../DataBucket", version = "0.2.10" } +# data_bucket = { path = "../DataBucket", version = "0.2.10" } performance_measurement_codegen = { path = "performance_measurement/codegen", version = "0.1.0", optional = true } performance_measurement = { path = "performance_measurement", version = "0.1.0", optional = true } # indexset = { version = "0.12.3", features = ["concurrent", "cdc", "multimap"] } diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 417d9a10..26698b74 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -worktable = { path = "../", version = "0.7.2" } +worktable = { path = "../", version = "0.8.0" } tokio = { version = "1.39.2", features = ["full"] } rkyv = { version = "0.8.9", features = ["uuid-1"] } eyre = "0.6.12" diff --git a/examples/src/main.rs b/examples/src/main.rs index 012bbf0d..f328e4d9 100644 --- a/examples/src/main.rs +++ b/examples/src/main.rs @@ -1,694 +1 @@ -// use eyre::bail; -// use rkyv::{Archive, Serialize}; -// use uuid::timestamp; -// use worktable::prelude::{MemStat, SizeMeasurable}; -// -// use std::{collections::HashMap, sync::Arc}; -// -// use tokio::sync::RwLock; -// use worktable::{select::SelectQueryExecutor, WorkTableError}; -// -// use atomic_float::AtomicF64; -// use futures::executor::block_on; -// use rand::distr::Alphanumeric; -// use rand::Rng; -// use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; -// use std::time::{SystemTime, UNIX_EPOCH}; -// use worktable::prelude::*; -// use worktable::worktable; -// -// #[derive( -// Archive, -// Clone, -// Copy, -// Debug, -// Default, -// serde::Deserialize, -// rkyv::Deserialize, -// Eq, -// Hash, -// MemStat, -// Ord, -// Serialize, -// PartialEq, -// PartialOrd, -// )] -// #[rkyv(compare(PartialEq), derive(Debug))] -// pub enum Exchange { -// #[default] -// Unset, -// BinanceFutures, -// BinanceSpot, -// HyperliquidPerpetuals, -// HyperliquidSpot, -// GateioSpot, -// GateioFutures, -// KucoinSpot, -// KucoinFutures, -// BybitSpot, -// BybitFutures, -// BsxFutures, -// BitgetSpot, -// BitgetFutures, -// BitmexSpot, -// VertexSpot, -// VertexFutures, -// OKXSpot, -// OKXFutures, -// DeribitFutures, -// } -// -// impl SizeMeasurable for Exchange { -// fn aligned_size(&self) -> usize { -// size_of::() -// } -// } -// -// impl Exchange { -// pub const TOTAL: usize = 19; -// -// pub fn is_hyper(&self) -> bool { -// *self == Exchange::HyperliquidPerpetuals -// } -// -// pub fn is_left(&self) -> bool { -// *self == Exchange::BinanceFutures -// } -// -// pub fn is_right(&self) -> bool { -// *self == Exchange::HyperliquidPerpetuals -// } -// -// pub fn all() -> [Self; Self::TOTAL] { -// [ -// Self::BinanceFutures, -// Self::BinanceSpot, -// Self::HyperliquidPerpetuals, -// Self::HyperliquidSpot, -// Self::GateioSpot, -// Self::GateioFutures, -// Self::KucoinSpot, -// Self::KucoinFutures, -// Self::BybitSpot, -// Self::BybitFutures, -// Self::BsxFutures, -// Self::BitgetSpot, -// Self::BitgetFutures, -// Self::BitmexSpot, -// Self::VertexSpot, -// Self::VertexFutures, -// Self::OKXSpot, -// Self::OKXFutures, -// Self::DeribitFutures, -// ] -// } -// -// pub const fn as_str(&self) -> &'static str { -// match self { -// Exchange::Unset => "Unset", -// Exchange::BinanceFutures => "BIN-futures", -// Exchange::BinanceSpot => "BIN-spot", -// Exchange::HyperliquidPerpetuals => "HYP-perps", -// Exchange::HyperliquidSpot => "HYP-spot", -// Exchange::GateioSpot => "GAT-spot", -// Exchange::GateioFutures => "GAT-futures", -// Exchange::KucoinSpot => "KUC-spot", -// Exchange::KucoinFutures => "KUC-futures", -// Exchange::BybitSpot => "BYB-spot", -// Exchange::BybitFutures => "BYB-futures", -// Exchange::BsxFutures => "BSX-futures", -// Exchange::BitgetSpot => "BGT-spot", -// Exchange::BitgetFutures => "BGT-futures", -// Exchange::BitmexSpot => "BMX-spot", -// Exchange::VertexSpot => "VTX-spot", -// Exchange::VertexFutures => "VTX-futures", -// Exchange::OKXSpot => "OKX-spot", -// Exchange::OKXFutures => "OKX-futures", -// Exchange::DeribitFutures => "DBT-futures", -// } -// } -// } -// -// impl std::fmt::Display for Exchange { -// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { -// f.write_str(self.as_str()) -// } -// } -// -// impl AsRef for Exchange { -// fn as_ref(&self) -> &str { -// self.as_str() -// } -// } -// -// worktable!( -// name: Spread, -// columns: { -// id: u32 primary_key autoincrement, -// left_timestamp: u64, -// right_timestamp: u64, -// left_exchange: Exchange, -// right_exchange: Exchange, -// left_price: f64, -// right_price: f64, -// spread: f64, -// symbol: String, -// key: String, -// left_key: String, -// right_key: String, -// }, -// indexes: { -// key_idx: key unique, -// symbol_idx: symbol, -// left_key_idx: left_key, -// right_key_idx: right_key -// }, -// queries: { -// in_place: { -// Left(left_price, left_timestamp, spread) by id, -// Right(right_price, right_timestamp, spread) by id, -// } -// update: { -// ExchangeLeft(left_exchange, left_key, key) by id, -// ExchangeRight(right_exchange, right_key, key) by id, -// } -// } -// ); -// -// #[derive(Debug)] -// struct MedianBucket { -// ts: AtomicU64, -// idx: AtomicUsize, -// values: RwLock>, -// } -// -// impl MedianBucket { -// async fn add(&self, now: u64, value: f64) { -// let prev_ts = self.ts.load(Ordering::Acquire); -// -// if prev_ts != now -// && self -// .ts -// .compare_exchange(prev_ts, now, Ordering::AcqRel, Ordering::Acquire) -// .is_ok() -// { -// let values = self.values.write().await; -// for val in values.iter() { -// val.store(f64::NAN, Ordering::Relaxed); -// } -// self.idx.store(0, Ordering::Relaxed); -// } -// -// let values = self.values.read().await; -// let idx = self.idx.fetch_add(1, Ordering::Relaxed); -// -// if idx >= values.len() { -// drop(values); -// let mut values = self.values.write().await; -// values.push(AtomicF64::new(value)); -// } else { -// values[idx].store(value, Ordering::Relaxed); -// } -// } -// -// async fn collect(&self, cutoff: u64) -> Option> { -// if self.ts.load(Ordering::Acquire) <= cutoff { -// return None; -// } -// -// let values = self.values.read().await; -// Some( -// values -// .iter() -// .filter_map(|v| { -// let x = v.load(Ordering::Relaxed); -// if x.is_nan() { -// None -// } else { -// Some(x) -// } -// }) -// .collect(), -// ) -// } -// } -// -// #[derive(Debug)] -// struct Bucket { -// ts: AtomicU64, -// sum: AtomicF64, -// count: AtomicUsize, -// } -// -// #[derive(Debug)] -// struct Window { -// secs: u64, -// start: AtomicU64, -// avg_buckets: Box<[Bucket]>, -// med_buckets: Box<[MedianBucket]>, -// } -// -// impl Window { -// fn new(secs: u64) -> Self { -// let mut avg_vec = Vec::with_capacity(secs as usize); -// let mut med_vec = Vec::with_capacity(secs as usize); -// for _ in 0..secs { -// avg_vec.push(Bucket { -// ts: AtomicU64::new(0), -// sum: AtomicF64::new(0.0), -// count: AtomicUsize::new(0), -// }); -// med_vec.push(MedianBucket { -// ts: AtomicU64::new(0), -// values: RwLock::new(Vec::new()), -// idx: AtomicUsize::new(0), -// }); -// } -// Self { -// secs, -// start: AtomicU64::new(0), -// avg_buckets: avg_vec.into_boxed_slice(), -// med_buckets: med_vec.into_boxed_slice(), -// } -// } -// -// async fn add(&self, value: f64) { -// let now = current_epoch_seconds(); -// _ = self -// .start -// .compare_exchange(0, now, Ordering::Release, Ordering::Relaxed); -// -// let Ok(idx) = usize::try_from(now % self.secs) else { -// return; -// }; -// -// let avg_bucket = &self.avg_buckets[idx]; -// let prev_ts = avg_bucket.ts.load(Ordering::Acquire); -// if prev_ts != now -// && avg_bucket -// .ts -// .compare_exchange(prev_ts, now, Ordering::AcqRel, Ordering::Acquire) -// .is_ok() -// { -// avg_bucket.sum.store(0.0, Ordering::Relaxed); -// avg_bucket.count.store(0, Ordering::Relaxed); -// } -// avg_bucket.sum.fetch_add(value, Ordering::Relaxed); -// avg_bucket.count.fetch_add(1, Ordering::Relaxed); -// -// let med_bucket = &self.med_buckets[idx]; -// med_bucket.add(now, value).await; -// } -// -// fn average(&self) -> Option { -// let now = current_epoch_seconds(); -// let start = self.start.load(Ordering::Acquire); -// if start == 0 || now < start.saturating_add(self.secs) { -// return None; -// } -// -// let cutoff = now.saturating_sub(self.secs); -// let mut total = 0.0; -// let mut cnt = 0usize; -// -// for bucket in self.avg_buckets.iter() { -// let ts = bucket.ts.load(Ordering::Acquire); -// if ts > cutoff { -// total += bucket.sum.load(Ordering::Relaxed); -// cnt += bucket.count.load(Ordering::Relaxed); -// } -// } -// -// if cnt == 0 { -// None -// } else { -// Some(total / (cnt as f64)) -// } -// } -// -// async fn median(&self) -> Option { -// let now = current_epoch_seconds(); -// let cutoff = now.saturating_sub(self.secs); -// -// let mut all_values = Vec::new(); -// -// for bucket in self.med_buckets.iter() { -// if let Some(mut vals) = bucket.collect(cutoff).await { -// all_values.append(&mut vals); -// } -// } -// -// if all_values.is_empty() { -// return None; -// } -// -// all_values.sort_by(|a, b| a.partial_cmp(b).unwrap()); -// let mid = all_values.len() / 2; -// Some(if all_values.len() % 2 == 0 { -// (all_values[mid - 1] + all_values[mid]) / 2.0 -// } else { -// all_values[mid] -// }) -// } -// } -// -// #[derive(Debug, Clone)] -// pub struct SpreadWindow { -// secs: u64, -// windows: Arc>>, -// } -// -// impl SpreadWindow { -// pub fn new(secs: u64) -> Self { -// Self { -// secs, -// windows: Arc::new(RwLock::new(HashMap::new())), -// } -// } -// -// pub async fn add(&self, symbol: &str, spread: f64) { -// let guard = self.windows.read().await; -// if let Some(window) = guard.get(symbol) { -// window.add(spread).await; -// return; -// } -// drop(guard); -// let mut guard = self.windows.write().await; -// let window = Window::new(self.secs); -// window.add(spread).await; -// guard.insert(symbol.to_string(), window); -// } -// -// pub async fn average(&self, symbol: &str) -> Option { -// let guard = self.windows.read().await; -// guard.get(symbol)?.average() -// } -// -// pub async fn median(&self, symbol: &str) -> Option { -// let guard = self.windows.read().await; -// guard.get(symbol)?.median().await -// } -// } -// -// fn current_epoch_seconds() -> u64 { -// SystemTime::now() -// .duration_since(UNIX_EPOCH) -// .expect("system time before UNIX EPOCH") -// .as_secs() -// } -// -// fn get_basis_point(operand: f64, comparator: f64) -> f64 { -// let left = operand - comparator; -// let right = (operand + comparator) / 2.0; -// (left / right) * 10_000.0 -// } -// -// pub struct SpreadManager { -// spread_wt: Arc, -// windows: Arc>>, -// } -// -// impl SpreadManager { -// pub fn new(spread_wt: Arc) -> Self { -// Self { -// spread_wt, -// windows: Arc::new(RwLock::new(HashMap::new())), -// } -// } -// -// pub async fn record( -// &self, -// exchange: Exchange, -// symbol: &str, -// ask_price: Option, -// bid_price: Option, -// timestamp: u64, -// ) -> eyre::Result<()> { -// let key = format!("{}-{}", symbol, exchange); -// let left = self.spread_wt.select_by_left_key(key.clone()).execute()?; -// -// if left.is_empty() { -// let id = self.spread_wt.get_next_pk().into(); -// if let Err(e) = self.spread_wt.insert(SpreadRow { -// id, -// key: format!("unset-left-{key}"), -// symbol: symbol.to_string(), -// left_timestamp: timestamp, -// left_price: bid_price.unwrap_or_default(), -// left_exchange: exchange, -// left_key: key.clone(), -// spread: 0.0, -// right_exchange: Exchange::Unset, -// right_key: String::new(), -// right_price: 0.0, -// right_timestamp: 0, -// }) { -// if !matches!(e, WorkTableError::AlreadyExists(..)) { -// println!("failed to write unset left spread: {e}"); -// return Err(e.into()); -// } -// }; -// } -// let right = self.spread_wt.select_by_right_key(key.clone()).execute()?; -// if right.is_empty() { -// let id = self.spread_wt.get_next_pk().into(); -// if let Err(e) = self.spread_wt.insert(SpreadRow { -// id, -// key: format!("unset-right-{key}"), -// symbol: symbol.to_string(), -// right_timestamp: timestamp, -// right_price: ask_price.unwrap_or_default(), -// left_price: 0.0, -// right_exchange: exchange, -// right_key: key.clone(), -// spread: 0.0, -// left_exchange: Exchange::Unset, -// left_key: String::new(), -// left_timestamp: 0, -// }) { -// if !matches!(e, WorkTableError::AlreadyExists(..)) { -// println!("failed to write unset right spread: {e}"); -// return Err(e.into()); -// } -// }; -// } -// -// let spreads = self -// .spread_wt -// .select_by_symbol(symbol.to_string()) -// .execute()?; -// -// for spread in spreads { -// if spread.left_exchange == Exchange::Unset { -// if spread.right_exchange == exchange { -// continue; -// } -// let query = ExchangeLeftQuery { -// left_exchange: exchange, -// left_key: key.clone(), -// key: format!("{}-{}", key, spread.right_key), -// }; -// let mut unset = spread.clone(); -// unset.id = self.spread_wt.get_next_pk().into(); -// if let Err(e) = self.spread_wt.update_exchange_left(query, spread.id).await { -// if !matches!(e, WorkTableError::AlreadyExists(..)) { -// println!("failed to update left spread exchange: {e}"); -// return Err(e.into()); -// } -// } else { -// self.windows -// .write() -// .await -// .insert((exchange, spread.right_exchange), SpreadWindow::new(15)); -// } -// if let Err(e) = self.spread_wt.insert(unset.clone()) { -// if !matches!(e, WorkTableError::AlreadyExists(..)) { -// println!("failed to copy unset left spread: {e}, {unset:?}"); -// return Err(e.into()); -// } -// }; -// } -// -// if spread.right_exchange == Exchange::Unset { -// if spread.left_exchange == exchange { -// continue; -// } -// let query = ExchangeRightQuery { -// right_exchange: exchange, -// right_key: key.clone(), -// key: format!("{}-{}", spread.left_key, key), -// }; -// let mut unset = spread.clone(); -// unset.id = self.spread_wt.get_next_pk().into(); -// if let Err(e) = self.spread_wt.update_exchange_right(query, spread.id).await { -// if !matches!(e, WorkTableError::AlreadyExists(..)) { -// println!("failed to update right spread exchange: {e}"); -// return Err(e.into()); -// } -// } else { -// self.windows -// .write() -// .await -// .insert((spread.left_exchange, exchange), SpreadWindow::new(15)); -// } -// if let Err(e) = self.spread_wt.insert(unset.clone()) { -// if !matches!(e, WorkTableError::AlreadyExists(..)) { -// println!("failed to copy unset right spread: {e}, {unset:?}"); -// return Err(e.into()); -// } -// }; -// } -// } -// -// let spreads = self -// .spread_wt -// .select_by_symbol(symbol.to_string()) -// .execute()?; -// -// for spread in spreads { -// if spread.right_exchange == Exchange::Unset || spread.left_exchange == Exchange::Unset { -// continue; -// } -// let value = if spread.left_exchange == exchange { -// let bid_price = bid_price.unwrap_or(spread.left_price); -// let value = get_basis_point(bid_price, spread.right_price); -// -// if let Err(e) = self -// .spread_wt -// .update_left_in_place( -// |x| { -// let (left_price, left_timestamp, spread) = x; -// *left_timestamp = timestamp.into(); -// *left_price = bid_price.into(); -// *spread = value.into(); -// }, -// spread.id, -// ) -// .await -// { -// println!("failed to update left spread: {e}"); -// return Err(e); -// }; -// value -// } else if spread.right_exchange == exchange { -// let ask_price = ask_price.unwrap_or(spread.right_price); -// let value = get_basis_point(spread.left_price, ask_price); -// -// if let Err(e) = self -// .spread_wt -// .update_right_in_place( -// |x| { -// let (right_price, right_timestamp, spread) = x; -// *right_timestamp = timestamp.into(); -// *right_price = ask_price.into(); -// *spread = value.into(); -// }, -// spread.id, -// ) -// .await -// { -// { -// println!("failed to update right spread: {e}"); -// return Err(e); -// }; -// }; -// value -// } else { -// continue; -// }; -// -// self.windows -// .read() -// .await -// .get(&(spread.left_exchange, spread.right_exchange)) -// .expect("window should exist") -// .add(symbol, value) -// .await; -// } -// -// Ok(()) -// } -// -// pub async fn median( -// &self, -// left_exchange: Exchange, -// right_exchange: Exchange, -// symbol: &str, -// ) -> Option { -// self.windows -// .read() -// .await -// .get(&(left_exchange, right_exchange))? -// .median(symbol) -// .await -// } -// } -// -// #[tokio::main(flavor = "multi_thread", worker_threads = 16)] -// async fn main() { -// let spread_wt = Arc::new(SpreadWorkTable::default()); -// -// let manager = Arc::new(SpreadManager::new(spread_wt.clone())); -// -// let mut symbols = Vec::new(); -// for _ in 0..5000 { -// let s: String = rand::rng() -// .sample_iter(&Alphanumeric) -// .take(7) -// .map(char::from) -// .collect(); -// symbols.push(s); -// } -// symbols.push(String::new()); -// -// let symbols = Arc::new(symbols); -// -// let ts_seq = Arc::new(AtomicU64::new(0)); -// let ask_seq = Arc::new(AtomicF64::new(0.0)); -// let bid_seq = Arc::new(AtomicF64::new(0.0)); -// -// let mut seq = Arc::new(AtomicUsize::new(0)); -// let mut handles = Vec::new(); -// -// for i in 0..16 { -// let ask_seq = ask_seq.clone(); -// let bid_seq = bid_seq.clone(); -// let ts_seq = ts_seq.clone(); -// let manager = manager.clone(); -// let seq = seq.clone(); -// let symbols = symbols.clone(); -// handles.push(tokio::spawn(async move { -// for _ in 0..500_000 { -// let timestamp = ts_seq.fetch_add(12, Ordering::Relaxed); -// let idx = seq.fetch_add(1, Ordering::Relaxed) % 5000; -// let symbol = symbols[idx].clone(); -// let exchange = match idx % 3 { -// 0 => Exchange::HyperliquidPerpetuals, -// 1 => Exchange::BinanceFutures, -// _ => Exchange::BybitFutures, -// }; -// let ask_price = ask_seq.fetch_add(1.0, Ordering::Relaxed); -// let bid_price = bid_seq.fetch_add(10.0, Ordering::Relaxed); -// // println!("record {exchange} {ask_price} {bid_price}"); -// if let Err(e) = manager -// .record( -// exchange, -// &symbol, -// Some(ask_price), -// Some(bid_price), -// timestamp, -// ) -// .await -// { -// println!("failed to record spread: {e:?}"); -// }; -// } -// })); -// } -// -// for handle in handles { -// println!("{:?}", handle.await); -// } -// } - fn main() {}