-
Notifications
You must be signed in to change notification settings - Fork 80
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore(pools): rewrite sqlite manager to run without locks using scc #2082
base: 02-25-fix_add_metrics_for_wf_engine
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PR Summary
This PR rewrites the SQLite connection manager to use lock-free concurrency with the SCC crate instead of traditional mutex locks.
- Adds
scc
crate (v2.3.3) toCargo.toml
for lock-free concurrent data structures - Replaces
HashMap
withscc::HashMap
for writer pools to eliminate contention - Restructures connection management with
OnceCell
for lazy initialization - Temporarily disables snapshot functionality with
todo!()
placeholders - Exposes Redis (6379), ClickHouse (9300, 9301), and NATS (4222) ports in docker-compose.yml for development
4 file(s) reviewed, 6 comment(s)
Edit PR Review Bot Settings | Greptile
//#[tokio::test] | ||
//async fn sqlite_snapshot_idempotence() -> GlobalResult<()> { | ||
// let (pools, db_name) = setup_test_db().await?; | ||
// | ||
// // Create initial database | ||
// let db = pools.sqlite(&db_name, false).await?; | ||
// { | ||
// let mut conn = db.conn().await?; | ||
// sqlx::query("CREATE TABLE test (id INTEGER PRIMARY KEY, value TEXT)") | ||
// .execute(&mut *conn) | ||
// .await?; | ||
// } | ||
// | ||
// // First snapshot should return true since we made changes | ||
// let snapshot_result = db.snapshot().await?; | ||
// assert!( | ||
// snapshot_result, | ||
// "First snapshot should return true due to table creation" | ||
// ); | ||
// | ||
// // Second snapshot with no changes should return false | ||
// let snapshot_result = db.snapshot().await?; | ||
// assert!( | ||
// !snapshot_result, | ||
// "Second snapshot should return false since no changes were made" | ||
// ); | ||
// | ||
// // Make a change to the database | ||
// { | ||
// let mut conn = db.conn().await?; | ||
// sqlx::query("INSERT INTO test (value) VALUES (?)") | ||
// .bind("test_value") | ||
// .execute(&mut *conn) | ||
// .await?; | ||
// } | ||
// | ||
// // Third snapshot should return true due to the INSERT | ||
// let snapshot_result = db.snapshot().await?; | ||
// assert!( | ||
// snapshot_result, | ||
// "Third snapshot should return true due to INSERT" | ||
// ); | ||
// | ||
// // Fourth snapshot with no new changes should return false | ||
// let snapshot_result = db.snapshot().await?; | ||
// assert!( | ||
// !snapshot_result, | ||
// "Fourth snapshot should return false since no new changes were made" | ||
// ); | ||
// | ||
// Ok(()) | ||
//} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: This test for snapshot idempotence has been commented out but not replaced. The test verifies important behavior: that snapshots only report changes when actual database modifications occur. Without this test, there's no verification that the new SCC-based implementation maintains this behavior correctly.
//// Return the snapshot result, not the rollback result | ||
//snapshot_result.map(|x| x.is_some()) | ||
|
||
Ok(todo!()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: Returning Ok(todo!())
will panic at runtime. This needs to be implemented before merging.
async fn evict_with_key(&self, key: &KeyPacked) -> GlobalResult<()> { | ||
tracing::debug!("evicting sqlite database"); | ||
|
||
let key_packed = Arc::new(key.pack_to_vec()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style: Creating a new Arc here is unnecessary since key
is already an Arc. This creates an extra allocation.
let key_packed = Arc::new(key.pack_to_vec()); | |
let key_packed = key.clone(); |
/// GC loop for SqlitePoolManager | ||
async fn manager_gc_loop(self: Arc<Self>, mut shutdown: broadcast::Receiver<()>) { | ||
// TODO: | ||
//let mut interval = tokio::time::interval(GC_INTERVAL); | ||
// | ||
//loop { | ||
// tokio::select! { | ||
// _ = interval.tick() => {}, | ||
// Ok(_) = shutdown.recv() => { | ||
// tracing::debug!("shutting down sqlite pool manager"); | ||
// break; | ||
// } | ||
// } | ||
// | ||
// // Anything last used before this instant will be removed | ||
// let expire_ts = Instant::now() - POOL_TTL; | ||
// | ||
// // Remove pools | ||
// { | ||
// let mut writer_pools_guard = self.writer_pools.lock().await; | ||
// let mut removed = 0; | ||
// | ||
// // Find entries to remove | ||
// let mut to_remove = Vec::new(); | ||
// for (k, v) in writer_pools_guard.iter() { | ||
// // TODO: Figure out how to do this without a mutex | ||
// if *v.last_access.lock().await <= expire_ts { | ||
// // Validate that this is the only reference to the database | ||
// let ref_count = Arc::strong_count(&v); | ||
// if ref_count == 1 { | ||
// to_remove.push(k.clone()); | ||
// } else { | ||
// tracing::warn!(?ref_count, ?k, "sqlite pool is expired and should have no references, but still has references"); | ||
// } | ||
// } | ||
// } | ||
// | ||
// // Evict each entry | ||
// for key in to_remove { | ||
// match self | ||
// .evict_database_inner(&key, &mut writer_pools_guard) | ||
// .await | ||
// { | ||
// Ok(_) => { | ||
// removed += 1; | ||
// } | ||
// Err(err) => { | ||
// tracing::error!( | ||
// ?err, | ||
// ?key, | ||
// "failed to evict sqlite db, will retry later" | ||
// ); | ||
// } | ||
// } | ||
// } | ||
// | ||
// tracing::debug!(?removed, total = ?writer_pools_guard.len(), "gc sqlite pools"); | ||
// } | ||
//} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: The GC loop is commented out but still spawned at line 209. This will result in a task that does nothing but wait for shutdown.
pub async fn snapshot(&self) -> GlobalResult<bool> { | ||
match self.manager.snapshot_with_key(&self.key_packed).await { | ||
Ok(x) => Ok(x), | ||
Err(err) => { | ||
tracing::error!( | ||
?err, | ||
"failed to snapshot on drop, will attempt snapshotting again when gc'd" | ||
); | ||
Ok(false) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style: This implementation swallows errors and always returns success (with false) when snapshot fails. Consider propagating the error or adding metrics to track failure rates.
) | ||
}); | ||
|
||
entry.last_access = Instant::now(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: Updating last_access
without synchronization could lead to race conditions if multiple threads access the same entry simultaneously.
dba8d32
to
dbc2e30
Compare
da80453
to
be6321d
Compare
dbc2e30
to
dba8d32
Compare
be6321d
to
7fb8933
Compare
Deploying rivet-hub with
|
Latest commit: |
c8342a8
|
Status: | ✅ Deploy successful! |
Preview URL: | https://23c1ed4f.rivet-hub-7jb.pages.dev |
Branch Preview URL: | https://02-25-chore-pools-rewrite-sq.rivet-hub-7jb.pages.dev |
7fb8933
to
c8342a8
Compare
dba8d32
to
dbc2e30
Compare
c8342a8
to
feecc72
Compare
dbc2e30
to
dba8d32
Compare
feecc72
to
c8342a8
Compare
dba8d32
to
dbc2e30
Compare
c8342a8
to
feecc72
Compare
feecc72
to
c8342a8
Compare
dbc2e30
to
dba8d32
Compare
dba8d32
to
dbc2e30
Compare
c8342a8
to
feecc72
Compare
Changes