Skip to content

Commit db79781

Browse files
committed
Re-introduce internal runtime to VssStore
In order to avoid the recently, discovered blocking-task-deadlock (in which the task holding the runtime reactor got blocked and hence stopped polling VSS write tasks), we where re-introduce an internal runtime to the `VssStore`, on which we spawn the tasks, while still using `block_on` of our regular runtime for async-sync conversions. This also finally fixes our VSS CI.
1 parent fa5b029 commit db79781

File tree

1 file changed

+161
-49
lines changed

1 file changed

+161
-49
lines changed

src/io/vss_store.rs

Lines changed: 161 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::future::Future;
1111
#[cfg(test)]
1212
use std::panic::RefUnwindSafe;
1313
use std::pin::Pin;
14-
use std::sync::atomic::{AtomicU64, Ordering};
14+
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1515
use std::sync::{Arc, Mutex};
1616
use std::time::Duration;
1717

@@ -44,13 +44,17 @@ type CustomRetryPolicy = FilteredRetryPolicy<
4444
Box<dyn Fn(&VssError) -> bool + 'static + Send + Sync>,
4545
>;
4646

47+
const INTERNAL_RUNTIME_WORKERS: usize = 2;
48+
const VSS_IO_TIMEOUT: Duration = Duration::from_secs(5);
49+
4750
/// A [`KVStoreSync`] implementation that writes to and reads from a [VSS](https://github.com/lightningdevkit/vss-server/blob/main/README.md) backend.
4851
pub struct VssStore {
4952
inner: Arc<VssStoreInner>,
5053
// Version counter to ensure that writes are applied in the correct order. It is assumed that read and list
5154
// operations aren't sensitive to the order of execution.
5255
next_version: AtomicU64,
5356
runtime: Arc<Runtime>,
57+
internal_runtime: Option<tokio::runtime::Runtime>,
5458
}
5559

5660
impl VssStore {
@@ -60,7 +64,21 @@ impl VssStore {
6064
) -> Self {
6165
let inner = Arc::new(VssStoreInner::new(base_url, store_id, vss_seed, header_provider));
6266
let next_version = AtomicU64::new(1);
63-
Self { inner, next_version, runtime }
67+
let internal_runtime = Some(
68+
tokio::runtime::Builder::new_multi_thread()
69+
.enable_all()
70+
.thread_name_fn(|| {
71+
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
72+
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
73+
format!("ldk-node-vss-runtime-{}", id)
74+
})
75+
.worker_threads(INTERNAL_RUNTIME_WORKERS)
76+
.max_blocking_threads(INTERNAL_RUNTIME_WORKERS)
77+
.build()
78+
.unwrap(),
79+
);
80+
81+
Self { inner, next_version, runtime, internal_runtime }
6482
}
6583

6684
// Same logic as for the obfuscated keys below, but just for locking, using the plaintext keys
@@ -94,46 +112,122 @@ impl KVStoreSync for VssStore {
94112
fn read(
95113
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
96114
) -> io::Result<Vec<u8>> {
97-
let fut = self.inner.read_internal(primary_namespace, secondary_namespace, key);
98-
self.runtime.block_on(fut)
115+
let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| {
116+
debug_assert!(false, "Failed to access internal runtime");
117+
let msg = format!("Failed to access internal runtime");
118+
Error::new(ErrorKind::Other, msg)
119+
})?;
120+
let primary_namespace = primary_namespace.to_string();
121+
let secondary_namespace = secondary_namespace.to_string();
122+
let key = key.to_string();
123+
let inner = Arc::clone(&self.inner);
124+
let fut =
125+
async move { inner.read_internal(primary_namespace, secondary_namespace, key).await };
126+
// TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
127+
// times out.
128+
let spawned_fut = internal_runtime.spawn(async move {
129+
tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
130+
let msg = "VssStore::read timed out";
131+
Error::new(ErrorKind::Other, msg)
132+
})
133+
});
134+
self.runtime.block_on(spawned_fut).expect("We should always finish")?
99135
}
100136

101137
fn write(
102138
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
103139
) -> io::Result<()> {
104-
let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
140+
let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| {
141+
debug_assert!(false, "Failed to access internal runtime");
142+
let msg = format!("Failed to access internal runtime");
143+
Error::new(ErrorKind::Other, msg)
144+
})?;
145+
let primary_namespace = primary_namespace.to_string();
146+
let secondary_namespace = secondary_namespace.to_string();
147+
let key = key.to_string();
148+
let inner = Arc::clone(&self.inner);
149+
let locking_key = self.build_locking_key(&primary_namespace, &secondary_namespace, &key);
105150
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
106-
let fut = self.inner.write_internal(
107-
inner_lock_ref,
108-
locking_key,
109-
version,
110-
primary_namespace,
111-
secondary_namespace,
112-
key,
113-
buf,
114-
);
115-
self.runtime.block_on(fut)
151+
let fut = async move {
152+
inner
153+
.write_internal(
154+
inner_lock_ref,
155+
locking_key,
156+
version,
157+
primary_namespace,
158+
secondary_namespace,
159+
key,
160+
buf,
161+
)
162+
.await
163+
};
164+
// TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
165+
// times out.
166+
let spawned_fut = internal_runtime.spawn(async move {
167+
tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
168+
let msg = "VssStore::write timed out";
169+
Error::new(ErrorKind::Other, msg)
170+
})
171+
});
172+
self.runtime.block_on(spawned_fut).expect("We should always finish")?
116173
}
117174

118175
fn remove(
119176
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
120177
) -> io::Result<()> {
121-
let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
178+
let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| {
179+
debug_assert!(false, "Failed to access internal runtime");
180+
let msg = format!("Failed to access internal runtime");
181+
Error::new(ErrorKind::Other, msg)
182+
})?;
183+
let primary_namespace = primary_namespace.to_string();
184+
let secondary_namespace = secondary_namespace.to_string();
185+
let key = key.to_string();
186+
let inner = Arc::clone(&self.inner);
187+
let locking_key = self.build_locking_key(&primary_namespace, &secondary_namespace, &key);
122188
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
123-
let fut = self.inner.remove_internal(
124-
inner_lock_ref,
125-
locking_key,
126-
version,
127-
primary_namespace,
128-
secondary_namespace,
129-
key,
130-
);
131-
self.runtime.block_on(fut)
189+
let fut = async move {
190+
inner
191+
.remove_internal(
192+
inner_lock_ref,
193+
locking_key,
194+
version,
195+
primary_namespace,
196+
secondary_namespace,
197+
key,
198+
)
199+
.await
200+
};
201+
// TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
202+
// times out.
203+
let spawned_fut = internal_runtime.spawn(async move {
204+
tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
205+
let msg = "VssStore::remove timed out";
206+
Error::new(ErrorKind::Other, msg)
207+
})
208+
});
209+
self.runtime.block_on(spawned_fut).expect("We should always finish")?
132210
}
133211

134212
fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> {
135-
let fut = self.inner.list_internal(primary_namespace, secondary_namespace);
136-
self.runtime.block_on(fut)
213+
let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| {
214+
debug_assert!(false, "Failed to access internal runtime");
215+
let msg = format!("Failed to access internal runtime");
216+
Error::new(ErrorKind::Other, msg)
217+
})?;
218+
let primary_namespace = primary_namespace.to_string();
219+
let secondary_namespace = secondary_namespace.to_string();
220+
let inner = Arc::clone(&self.inner);
221+
let fut = async move { inner.list_internal(primary_namespace, secondary_namespace).await };
222+
// TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
223+
// times out.
224+
let spawned_fut = internal_runtime.spawn(async move {
225+
tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| {
226+
let msg = "VssStore::list timed out";
227+
Error::new(ErrorKind::Other, msg)
228+
})
229+
});
230+
self.runtime.block_on(spawned_fut).expect("We should always finish")?
137231
}
138232
}
139233

@@ -145,9 +239,9 @@ impl KVStore for VssStore {
145239
let secondary_namespace = secondary_namespace.to_string();
146240
let key = key.to_string();
147241
let inner = Arc::clone(&self.inner);
148-
Box::pin(async move {
149-
inner.read_internal(&primary_namespace, &secondary_namespace, &key).await
150-
})
242+
Box::pin(
243+
async move { inner.read_internal(primary_namespace, secondary_namespace, key).await },
244+
)
151245
}
152246
fn write(
153247
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
@@ -164,9 +258,9 @@ impl KVStore for VssStore {
164258
inner_lock_ref,
165259
locking_key,
166260
version,
167-
&primary_namespace,
168-
&secondary_namespace,
169-
&key,
261+
primary_namespace,
262+
secondary_namespace,
263+
key,
170264
buf,
171265
)
172266
.await
@@ -187,9 +281,9 @@ impl KVStore for VssStore {
187281
inner_lock_ref,
188282
locking_key,
189283
version,
190-
&primary_namespace,
191-
&secondary_namespace,
192-
&key,
284+
primary_namespace,
285+
secondary_namespace,
286+
key,
193287
)
194288
.await
195289
})
@@ -200,7 +294,14 @@ impl KVStore for VssStore {
200294
let primary_namespace = primary_namespace.to_string();
201295
let secondary_namespace = secondary_namespace.to_string();
202296
let inner = Arc::clone(&self.inner);
203-
Box::pin(async move { inner.list_internal(&primary_namespace, &secondary_namespace).await })
297+
Box::pin(async move { inner.list_internal(primary_namespace, secondary_namespace).await })
298+
}
299+
}
300+
301+
impl Drop for VssStore {
302+
fn drop(&mut self) {
303+
let internal_runtime = self.internal_runtime.take();
304+
tokio::task::block_in_place(move || drop(internal_runtime));
204305
}
205306
}
206307

@@ -300,11 +401,12 @@ impl VssStoreInner {
300401
}
301402

302403
async fn read_internal(
303-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
404+
&self, primary_namespace: String, secondary_namespace: String, key: String,
304405
) -> io::Result<Vec<u8>> {
305-
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;
406+
check_namespace_key_validity(&primary_namespace, &secondary_namespace, Some(&key), "read")?;
306407

307-
let obfuscated_key = self.build_obfuscated_key(primary_namespace, secondary_namespace, key);
408+
let obfuscated_key =
409+
self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);
308410
let request = GetObjectRequest { store_id: self.store_id.clone(), key: obfuscated_key };
309411
let resp = self.client.get_object(&request).await.map_err(|e| {
310412
let msg = format!(
@@ -332,13 +434,18 @@ impl VssStoreInner {
332434

333435
async fn write_internal(
334436
&self, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>, locking_key: String, version: u64,
335-
primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
437+
primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>,
336438
) -> io::Result<()> {
337-
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
439+
check_namespace_key_validity(
440+
&primary_namespace,
441+
&secondary_namespace,
442+
Some(&key),
443+
"write",
444+
)?;
338445

339446
self.execute_locked_write(inner_lock_ref, locking_key, version, async move || {
340447
let obfuscated_key =
341-
self.build_obfuscated_key(primary_namespace, secondary_namespace, key);
448+
self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);
342449
let vss_version = -1;
343450
let storable = self.storable_builder.build(buf, vss_version);
344451
let request = PutObjectRequest {
@@ -367,13 +474,18 @@ impl VssStoreInner {
367474

368475
async fn remove_internal(
369476
&self, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>, locking_key: String, version: u64,
370-
primary_namespace: &str, secondary_namespace: &str, key: &str,
477+
primary_namespace: String, secondary_namespace: String, key: String,
371478
) -> io::Result<()> {
372-
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?;
479+
check_namespace_key_validity(
480+
&primary_namespace,
481+
&secondary_namespace,
482+
Some(&key),
483+
"remove",
484+
)?;
373485

374486
self.execute_locked_write(inner_lock_ref, locking_key, version, async move || {
375487
let obfuscated_key =
376-
self.build_obfuscated_key(primary_namespace, secondary_namespace, key);
488+
self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);
377489
let request = DeleteObjectRequest {
378490
store_id: self.store_id.clone(),
379491
key_value: Some(KeyValue { key: obfuscated_key, version: -1, value: vec![] }),
@@ -393,12 +505,12 @@ impl VssStoreInner {
393505
}
394506

395507
async fn list_internal(
396-
&self, primary_namespace: &str, secondary_namespace: &str,
508+
&self, primary_namespace: String, secondary_namespace: String,
397509
) -> io::Result<Vec<String>> {
398-
check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?;
510+
check_namespace_key_validity(&primary_namespace, &secondary_namespace, None, "list")?;
399511

400512
let keys =
401-
self.list_all_keys(primary_namespace, secondary_namespace).await.map_err(|e| {
513+
self.list_all_keys(&primary_namespace, &secondary_namespace).await.map_err(|e| {
402514
let msg = format!(
403515
"Failed to retrieve keys in namespace: {}/{} : {}",
404516
primary_namespace, secondary_namespace, e

0 commit comments

Comments
 (0)