Skip to content

Commit fad40b8

Browse files
committed
Re-introduce internal runtime to VssStore
In order to avoid the recently, discovered blocking-task-deadlock (in which the task holding the runtime reactor got blocked and hence stopped polling VSS write tasks), we where re-introduce an internal runtime to the `VssStore`, on which we spawn the tasks, while still using `block_on` of our regular runtime for async-sync conversions. This also finally fixes our VSS CI.
1 parent fa5b029 commit fad40b8

File tree

1 file changed

+144
-49
lines changed

1 file changed

+144
-49
lines changed

src/io/vss_store.rs

Lines changed: 144 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::future::Future;
1111
#[cfg(test)]
1212
use std::panic::RefUnwindSafe;
1313
use std::pin::Pin;
14-
use std::sync::atomic::{AtomicU64, Ordering};
14+
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1515
use std::sync::{Arc, Mutex};
1616
use std::time::Duration;
1717

@@ -44,13 +44,16 @@ type CustomRetryPolicy = FilteredRetryPolicy<
4444
Box<dyn Fn(&VssError) -> bool + 'static + Send + Sync>,
4545
>;
4646

47+
const INTERNAL_RUNTIME_WORKERS: usize = 2;
48+
4749
/// A [`KVStoreSync`] implementation that writes to and reads from a [VSS](https://github.com/lightningdevkit/vss-server/blob/main/README.md) backend.
4850
pub struct VssStore {
4951
inner: Arc<VssStoreInner>,
5052
// Version counter to ensure that writes are applied in the correct order. It is assumed that read and list
5153
// operations aren't sensitive to the order of execution.
5254
next_version: AtomicU64,
5355
runtime: Arc<Runtime>,
56+
internal_runtime: Option<tokio::runtime::Runtime>,
5457
}
5558

5659
impl VssStore {
@@ -60,7 +63,21 @@ impl VssStore {
6063
) -> Self {
6164
let inner = Arc::new(VssStoreInner::new(base_url, store_id, vss_seed, header_provider));
6265
let next_version = AtomicU64::new(1);
63-
Self { inner, next_version, runtime }
66+
let internal_runtime = Some(
67+
tokio::runtime::Builder::new_multi_thread()
68+
.enable_all()
69+
.thread_name_fn(|| {
70+
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
71+
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
72+
format!("ldk-node-vss-runtime-{}", id)
73+
})
74+
.worker_threads(INTERNAL_RUNTIME_WORKERS)
75+
.max_blocking_threads(INTERNAL_RUNTIME_WORKERS)
76+
.build()
77+
.unwrap(),
78+
);
79+
80+
Self { inner, next_version, runtime, internal_runtime }
6481
}
6582

6683
// Same logic as for the obfuscated keys below, but just for locking, using the plaintext keys
@@ -94,46 +111,106 @@ impl KVStoreSync for VssStore {
94111
fn read(
95112
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
96113
) -> io::Result<Vec<u8>> {
97-
let fut = self.inner.read_internal(primary_namespace, secondary_namespace, key);
98-
self.runtime.block_on(fut)
114+
let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| {
115+
debug_assert!(false, "Failed to access internal runtime");
116+
let msg = format!("Failed to access internal runtime");
117+
Error::new(ErrorKind::Other, msg)
118+
})?;
119+
let primary_namespace = primary_namespace.to_string();
120+
let secondary_namespace = secondary_namespace.to_string();
121+
let key = key.to_string();
122+
let inner = Arc::clone(&self.inner);
123+
let fut =
124+
async move { inner.read_internal(primary_namespace, secondary_namespace, key).await };
125+
let spawned_fut = internal_runtime.spawn(fut);
126+
self.runtime.block_on(async move { spawned_fut.await }).map_err(|e| {
127+
let msg = format!("Failed to join read future: {}", e);
128+
Error::new(ErrorKind::Other, msg)
129+
})?
99130
}
100131

101132
fn write(
102133
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
103134
) -> io::Result<()> {
104-
let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
135+
let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| {
136+
debug_assert!(false, "Failed to access internal runtime");
137+
let msg = format!("Failed to access internal runtime");
138+
Error::new(ErrorKind::Other, msg)
139+
})?;
140+
let primary_namespace = primary_namespace.to_string();
141+
let secondary_namespace = secondary_namespace.to_string();
142+
let key = key.to_string();
143+
let inner = Arc::clone(&self.inner);
144+
let locking_key = self.build_locking_key(&primary_namespace, &secondary_namespace, &key);
105145
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
106-
let fut = self.inner.write_internal(
107-
inner_lock_ref,
108-
locking_key,
109-
version,
110-
primary_namespace,
111-
secondary_namespace,
112-
key,
113-
buf,
114-
);
115-
self.runtime.block_on(fut)
146+
let fut = async move {
147+
inner
148+
.write_internal(
149+
inner_lock_ref,
150+
locking_key,
151+
version,
152+
primary_namespace,
153+
secondary_namespace,
154+
key,
155+
buf,
156+
)
157+
.await
158+
};
159+
let spawned_fut = internal_runtime.spawn(fut);
160+
self.runtime.block_on(async move { spawned_fut.await }).map_err(|e| {
161+
let msg = format!("Failed to join write future: {}", e);
162+
Error::new(ErrorKind::Other, msg)
163+
})?
116164
}
117165

118166
fn remove(
119167
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
120168
) -> io::Result<()> {
121-
let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
169+
let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| {
170+
debug_assert!(false, "Failed to access internal runtime");
171+
let msg = format!("Failed to access internal runtime");
172+
Error::new(ErrorKind::Other, msg)
173+
})?;
174+
let primary_namespace = primary_namespace.to_string();
175+
let secondary_namespace = secondary_namespace.to_string();
176+
let key = key.to_string();
177+
let inner = Arc::clone(&self.inner);
178+
let locking_key = self.build_locking_key(&primary_namespace, &secondary_namespace, &key);
122179
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
123-
let fut = self.inner.remove_internal(
124-
inner_lock_ref,
125-
locking_key,
126-
version,
127-
primary_namespace,
128-
secondary_namespace,
129-
key,
130-
);
131-
self.runtime.block_on(fut)
180+
let fut = async move {
181+
inner
182+
.remove_internal(
183+
inner_lock_ref,
184+
locking_key,
185+
version,
186+
primary_namespace,
187+
secondary_namespace,
188+
key,
189+
)
190+
.await
191+
};
192+
let spawned_fut = internal_runtime.spawn(fut);
193+
self.runtime.block_on(async move { spawned_fut.await }).map_err(|e| {
194+
let msg = format!("Failed to join remove future: {}", e);
195+
Error::new(ErrorKind::Other, msg)
196+
})?
132197
}
133198

134199
fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> {
135-
let fut = self.inner.list_internal(primary_namespace, secondary_namespace);
136-
self.runtime.block_on(fut)
200+
let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| {
201+
debug_assert!(false, "Failed to access internal runtime");
202+
let msg = format!("Failed to access internal runtime");
203+
Error::new(ErrorKind::Other, msg)
204+
})?;
205+
let primary_namespace = primary_namespace.to_string();
206+
let secondary_namespace = secondary_namespace.to_string();
207+
let inner = Arc::clone(&self.inner);
208+
let fut = async move { inner.list_internal(primary_namespace, secondary_namespace).await };
209+
let spawned_fut = internal_runtime.spawn(fut);
210+
self.runtime.block_on(async move { spawned_fut.await }).map_err(|e| {
211+
let msg = format!("Failed to join list future: {}", e);
212+
Error::new(ErrorKind::Other, msg)
213+
})?
137214
}
138215
}
139216

@@ -145,9 +222,9 @@ impl KVStore for VssStore {
145222
let secondary_namespace = secondary_namespace.to_string();
146223
let key = key.to_string();
147224
let inner = Arc::clone(&self.inner);
148-
Box::pin(async move {
149-
inner.read_internal(&primary_namespace, &secondary_namespace, &key).await
150-
})
225+
Box::pin(
226+
async move { inner.read_internal(primary_namespace, secondary_namespace, key).await },
227+
)
151228
}
152229
fn write(
153230
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
@@ -164,9 +241,9 @@ impl KVStore for VssStore {
164241
inner_lock_ref,
165242
locking_key,
166243
version,
167-
&primary_namespace,
168-
&secondary_namespace,
169-
&key,
244+
primary_namespace,
245+
secondary_namespace,
246+
key,
170247
buf,
171248
)
172249
.await
@@ -187,9 +264,9 @@ impl KVStore for VssStore {
187264
inner_lock_ref,
188265
locking_key,
189266
version,
190-
&primary_namespace,
191-
&secondary_namespace,
192-
&key,
267+
primary_namespace,
268+
secondary_namespace,
269+
key,
193270
)
194271
.await
195272
})
@@ -200,7 +277,14 @@ impl KVStore for VssStore {
200277
let primary_namespace = primary_namespace.to_string();
201278
let secondary_namespace = secondary_namespace.to_string();
202279
let inner = Arc::clone(&self.inner);
203-
Box::pin(async move { inner.list_internal(&primary_namespace, &secondary_namespace).await })
280+
Box::pin(async move { inner.list_internal(primary_namespace, secondary_namespace).await })
281+
}
282+
}
283+
284+
impl Drop for VssStore {
285+
fn drop(&mut self) {
286+
let internal_runtime = self.internal_runtime.take();
287+
tokio::task::block_in_place(move || drop(internal_runtime));
204288
}
205289
}
206290

@@ -300,11 +384,12 @@ impl VssStoreInner {
300384
}
301385

302386
async fn read_internal(
303-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
387+
&self, primary_namespace: String, secondary_namespace: String, key: String,
304388
) -> io::Result<Vec<u8>> {
305-
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;
389+
check_namespace_key_validity(&primary_namespace, &secondary_namespace, Some(&key), "read")?;
306390

307-
let obfuscated_key = self.build_obfuscated_key(primary_namespace, secondary_namespace, key);
391+
let obfuscated_key =
392+
self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);
308393
let request = GetObjectRequest { store_id: self.store_id.clone(), key: obfuscated_key };
309394
let resp = self.client.get_object(&request).await.map_err(|e| {
310395
let msg = format!(
@@ -332,13 +417,18 @@ impl VssStoreInner {
332417

333418
async fn write_internal(
334419
&self, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>, locking_key: String, version: u64,
335-
primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
420+
primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>,
336421
) -> io::Result<()> {
337-
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
422+
check_namespace_key_validity(
423+
&primary_namespace,
424+
&secondary_namespace,
425+
Some(&key),
426+
"write",
427+
)?;
338428

339429
self.execute_locked_write(inner_lock_ref, locking_key, version, async move || {
340430
let obfuscated_key =
341-
self.build_obfuscated_key(primary_namespace, secondary_namespace, key);
431+
self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);
342432
let vss_version = -1;
343433
let storable = self.storable_builder.build(buf, vss_version);
344434
let request = PutObjectRequest {
@@ -367,13 +457,18 @@ impl VssStoreInner {
367457

368458
async fn remove_internal(
369459
&self, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>, locking_key: String, version: u64,
370-
primary_namespace: &str, secondary_namespace: &str, key: &str,
460+
primary_namespace: String, secondary_namespace: String, key: String,
371461
) -> io::Result<()> {
372-
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?;
462+
check_namespace_key_validity(
463+
&primary_namespace,
464+
&secondary_namespace,
465+
Some(&key),
466+
"remove",
467+
)?;
373468

374469
self.execute_locked_write(inner_lock_ref, locking_key, version, async move || {
375470
let obfuscated_key =
376-
self.build_obfuscated_key(primary_namespace, secondary_namespace, key);
471+
self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);
377472
let request = DeleteObjectRequest {
378473
store_id: self.store_id.clone(),
379474
key_value: Some(KeyValue { key: obfuscated_key, version: -1, value: vec![] }),
@@ -393,12 +488,12 @@ impl VssStoreInner {
393488
}
394489

395490
async fn list_internal(
396-
&self, primary_namespace: &str, secondary_namespace: &str,
491+
&self, primary_namespace: String, secondary_namespace: String,
397492
) -> io::Result<Vec<String>> {
398-
check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?;
493+
check_namespace_key_validity(&primary_namespace, &secondary_namespace, None, "list")?;
399494

400495
let keys =
401-
self.list_all_keys(primary_namespace, secondary_namespace).await.map_err(|e| {
496+
self.list_all_keys(&primary_namespace, &secondary_namespace).await.map_err(|e| {
402497
let msg = format!(
403498
"Failed to retrieve keys in namespace: {}/{} : {}",
404499
primary_namespace, secondary_namespace, e

0 commit comments

Comments
 (0)