diff --git a/.github/workflows/vss-integration.yml b/.github/workflows/vss-integration.yml index 81b63fdf9..8473ed413 100644 --- a/.github/workflows/vss-integration.yml +++ b/.github/workflows/vss-integration.yml @@ -44,5 +44,5 @@ jobs: run: | cd ldk-node export TEST_VSS_BASE_URL="http://localhost:8080/vss" - RUSTFLAGS="--cfg vss_test" cargo build --verbose --color always + RUSTFLAGS="--cfg vss_test" cargo test io::vss_store RUSTFLAGS="--cfg vss_test" cargo test --test integration_tests_vss diff --git a/src/builder.rs b/src/builder.rs index 0c843447a..195ac65c3 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1138,6 +1138,7 @@ fn build_with_store_internal( if e.kind() == std::io::ErrorKind::NotFound { Arc::new(RwLock::new(NodeMetrics::default())) } else { + log_error!(logger, "Failed to read node metrics from store: {}", e); return Err(BuildError::ReadFailed); } }, @@ -1201,7 +1202,8 @@ fn build_with_store_internal( Arc::clone(&kv_store), Arc::clone(&logger), )), - Err(_) => { + Err(e) => { + log_error!(logger, "Failed to read payment data from store: {}", e); return Err(BuildError::ReadFailed); }, }; @@ -1334,7 +1336,7 @@ fn build_with_store_internal( if e.kind() == lightning::io::ErrorKind::NotFound { Vec::new() } else { - log_error!(logger, "Failed to read channel monitors: {}", e.to_string()); + log_error!(logger, "Failed to read channel monitors from store: {}", e.to_string()); return Err(BuildError::ReadFailed); } }, @@ -1359,6 +1361,7 @@ fn build_with_store_internal( if e.kind() == std::io::ErrorKind::NotFound { Arc::new(Graph::new(config.network.into(), Arc::clone(&logger))) } else { + log_error!(logger, "Failed to read network graph from store: {}", e); return Err(BuildError::ReadFailed); } }, @@ -1379,6 +1382,7 @@ fn build_with_store_internal( Arc::clone(&logger), ))) } else { + log_error!(logger, "Failed to read scoring data from store: {}", e); return Err(BuildError::ReadFailed); } }, @@ -1448,7 +1452,7 @@ fn build_with_store_internal( ); let (_hash, channel_manager) = <(BlockHash, ChannelManager)>::read(&mut reader, read_args).map_err(|e| { - log_error!(logger, "Failed to read channel manager from KVStore: {}", e); + log_error!(logger, "Failed to read channel manager from store: {}", e); BuildError::ReadFailed })?; channel_manager @@ -1677,6 +1681,7 @@ fn build_with_store_internal( Arc::clone(&logger), )) } else { + log_error!(logger, "Failed to read output sweeper data from store: {}", e); return Err(BuildError::ReadFailed); } }, @@ -1689,6 +1694,7 @@ fn build_with_store_internal( if e.kind() == std::io::ErrorKind::NotFound { Arc::new(EventQueue::new(Arc::clone(&kv_store), Arc::clone(&logger))) } else { + log_error!(logger, "Failed to read event queue from store: {}", e); return Err(BuildError::ReadFailed); } }, @@ -1700,6 +1706,7 @@ fn build_with_store_internal( if e.kind() == std::io::ErrorKind::NotFound { Arc::new(PeerStore::new(Arc::clone(&kv_store), Arc::clone(&logger))) } else { + log_error!(logger, "Failed to read peer data from store: {}", e); return Err(BuildError::ReadFailed); } }, diff --git a/src/event.rs b/src/event.rs index db6ef13f1..eedfb1c14 100644 --- a/src/event.rs +++ b/src/event.rs @@ -9,7 +9,7 @@ use core::future::Future; use core::task::{Poll, Waker}; use std::collections::VecDeque; use std::ops::Deref; -use std::sync::{Arc, Condvar, Mutex}; +use std::sync::{Arc, Mutex}; use bitcoin::blockdata::locktime::absolute::LockTime; use bitcoin::secp256k1::PublicKey; @@ -287,7 +287,6 @@ where { queue: Arc>>, waker: Arc>>, - notifier: Condvar, kv_store: Arc, 
logger: L, } @@ -299,8 +298,7 @@ where pub(crate) fn new(kv_store: Arc, logger: L) -> Self { let queue = Arc::new(Mutex::new(VecDeque::new())); let waker = Arc::new(Mutex::new(None)); - let notifier = Condvar::new(); - Self { queue, waker, notifier, kv_store, logger } + Self { queue, waker, kv_store, logger } } pub(crate) fn add_event(&self, event: Event) -> Result<(), Error> { @@ -310,8 +308,6 @@ where self.persist_queue(&locked_queue)?; } - self.notifier.notify_one(); - if let Some(waker) = self.waker.lock().unwrap().take() { waker.wake(); } @@ -327,19 +323,12 @@ where EventFuture { event_queue: Arc::clone(&self.queue), waker: Arc::clone(&self.waker) }.await } - pub(crate) fn wait_next_event(&self) -> Event { - let locked_queue = - self.notifier.wait_while(self.queue.lock().unwrap(), |queue| queue.is_empty()).unwrap(); - locked_queue.front().unwrap().clone() - } - pub(crate) fn event_handled(&self) -> Result<(), Error> { { let mut locked_queue = self.queue.lock().unwrap(); locked_queue.pop_front(); self.persist_queue(&locked_queue)?; } - self.notifier.notify_one(); if let Some(waker) = self.waker.lock().unwrap().take() { waker.wake(); @@ -383,8 +372,7 @@ where let read_queue: EventQueueDeserWrapper = Readable::read(reader)?; let queue = Arc::new(Mutex::new(read_queue.0)); let waker = Arc::new(Mutex::new(None)); - let notifier = Condvar::new(); - Ok(Self { queue, waker, notifier, kv_store, logger }) + Ok(Self { queue, waker, kv_store, logger }) } } @@ -1637,7 +1625,6 @@ mod tests { // Check we get the expected event and that it is returned until we mark it handled. for _ in 0..5 { - assert_eq!(event_queue.wait_next_event(), expected_event); assert_eq!(event_queue.next_event_async().await, expected_event); assert_eq!(event_queue.next_event(), Some(expected_event.clone())); } @@ -1652,7 +1639,7 @@ mod tests { .unwrap(); let deser_event_queue = EventQueue::read(&mut &persisted_bytes[..], (Arc::clone(&store), logger)).unwrap(); - assert_eq!(deser_event_queue.wait_next_event(), expected_event); + assert_eq!(deser_event_queue.next_event_async().await, expected_event); event_queue.event_handled().unwrap(); assert_eq!(event_queue.next_event(), None); @@ -1721,32 +1708,5 @@ mod tests { } } assert_eq!(event_queue.next_event(), None); - - // Check we operate correctly, even when mixing and matching blocking and async API calls. - let (tx, mut rx) = tokio::sync::watch::channel(()); - let thread_queue = Arc::clone(&event_queue); - let thread_event = expected_event.clone(); - std::thread::spawn(move || { - let e = thread_queue.wait_next_event(); - assert_eq!(e, thread_event); - thread_queue.event_handled().unwrap(); - tx.send(()).unwrap(); - }); - - let thread_queue = Arc::clone(&event_queue); - let thread_event = expected_event.clone(); - std::thread::spawn(move || { - // Sleep a bit before we enqueue the events everybody is waiting for. 
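Aside: a minimal, self-contained sketch (not part of the patch) of the waker-based notification that replaces the removed Condvar. `add_event` wakes any parked async waiter, and the future re-checks the queue when polled. All names and types below are simplified stand-ins for the patched `EventQueue` / `EventFuture`.

use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

struct Queue<T> {
	events: Arc<Mutex<VecDeque<T>>>,
	waker: Arc<Mutex<Option<Waker>>>,
}

impl<T: Clone> Queue<T> {
	fn new() -> Self {
		Self { events: Arc::new(Mutex::new(VecDeque::new())), waker: Arc::new(Mutex::new(None)) }
	}

	fn add_event(&self, event: T) {
		self.events.lock().unwrap().push_back(event);
		// Instead of `Condvar::notify_one`, wake the parked async waiter, if any.
		if let Some(waker) = self.waker.lock().unwrap().take() {
			waker.wake();
		}
	}

	fn next_event_async(&self) -> EventFuture<T> {
		EventFuture { events: Arc::clone(&self.events), waker: Arc::clone(&self.waker) }
	}
}

struct EventFuture<T> {
	events: Arc<Mutex<VecDeque<T>>>,
	waker: Arc<Mutex<Option<Waker>>>,
}

impl<T: Clone> Future for EventFuture<T> {
	type Output = T;
	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
		let locked_events = self.events.lock().unwrap();
		if let Some(event) = locked_events.front() {
			return Poll::Ready(event.clone());
		}
		// Register the waker while still holding the queue lock to avoid a lost wakeup.
		*self.waker.lock().unwrap() = Some(cx.waker().clone());
		Poll::Pending
	}
}

#[tokio::main]
async fn main() {
	let queue = Arc::new(Queue::new());
	let waiter = Arc::clone(&queue);
	let handle = tokio::spawn(async move { waiter.next_event_async().await });
	queue.add_event("hello");
	assert_eq!(handle.await.unwrap(), "hello");
}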
- std::thread::sleep(Duration::from_millis(20)); - thread_queue.add_event(thread_event.clone()).unwrap(); - thread_queue.add_event(thread_event.clone()).unwrap(); - }); - - let e = event_queue.next_event_async().await; - assert_eq!(e, expected_event.clone()); - event_queue.event_handled().unwrap(); - - rx.changed().await.unwrap(); - assert_eq!(event_queue.next_event(), None); } } diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index 134ff7af2..ed8e13890 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -11,7 +11,7 @@ use std::future::Future; #[cfg(test)] use std::panic::RefUnwindSafe; use std::pin::Pin; -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -44,6 +44,11 @@ type CustomRetryPolicy = FilteredRetryPolicy< Box bool + 'static + Send + Sync>, >; +// We set this to a small number of threads that would still allow to make some progress if one +// would hit a blocking case +const INTERNAL_RUNTIME_WORKERS: usize = 2; +const VSS_IO_TIMEOUT: Duration = Duration::from_secs(5); + /// A [`KVStoreSync`] implementation that writes to and reads from a [VSS](https://github.com/lightningdevkit/vss-server/blob/main/README.md) backend. pub struct VssStore { inner: Arc, @@ -51,6 +56,13 @@ pub struct VssStore { // operations aren't sensitive to the order of execution. next_version: AtomicU64, runtime: Arc, + // A VSS-internal runtime we use to avoid any deadlocks we could hit when waiting on a spawned + // blocking task to finish while the blocked thread had acquired the reactor. In particular, + // this works around a previously-hit case where a concurrent call to + // `PeerManager::process_pending_events` -> `ChannelManager::get_and_clear_pending_msg_events` + // would deadlock when trying to acquire sync `Mutex` locks that are held by the thread + // currently being blocked waiting on the VSS operation to finish. 
+ internal_runtime: Option, } impl VssStore { @@ -60,7 +72,21 @@ impl VssStore { ) -> Self { let inner = Arc::new(VssStoreInner::new(base_url, store_id, vss_seed, header_provider)); let next_version = AtomicU64::new(1); - Self { inner, next_version, runtime } + let internal_runtime = Some( + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .thread_name_fn(|| { + static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); + let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); + format!("ldk-node-vss-runtime-{}", id) + }) + .worker_threads(INTERNAL_RUNTIME_WORKERS) + .max_blocking_threads(INTERNAL_RUNTIME_WORKERS) + .build() + .unwrap(), + ); + + Self { inner, next_version, runtime, internal_runtime } } // Same logic as for the obfuscated keys below, but just for locking, using the plaintext keys @@ -94,46 +120,122 @@ impl KVStoreSync for VssStore { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> io::Result> { - let fut = self.inner.read_internal(primary_namespace, secondary_namespace, key); - self.runtime.block_on(fut) + let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| { + debug_assert!(false, "Failed to access internal runtime"); + let msg = format!("Failed to access internal runtime"); + Error::new(ErrorKind::Other, msg) + })?; + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + let inner = Arc::clone(&self.inner); + let fut = + async move { inner.read_internal(primary_namespace, secondary_namespace, key).await }; + // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always + // times out. + let spawned_fut = internal_runtime.spawn(async move { + tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { + let msg = "VssStore::read timed out"; + Error::new(ErrorKind::Other, msg) + }) + }); + self.runtime.block_on(spawned_fut).expect("We should always finish")? } fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, ) -> io::Result<()> { - let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); + let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| { + debug_assert!(false, "Failed to access internal runtime"); + let msg = format!("Failed to access internal runtime"); + Error::new(ErrorKind::Other, msg) + })?; + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + let inner = Arc::clone(&self.inner); + let locking_key = self.build_locking_key(&primary_namespace, &secondary_namespace, &key); let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); - let fut = self.inner.write_internal( - inner_lock_ref, - locking_key, - version, - primary_namespace, - secondary_namespace, - key, - buf, - ); - self.runtime.block_on(fut) + let fut = async move { + inner + .write_internal( + inner_lock_ref, + locking_key, + version, + primary_namespace, + secondary_namespace, + key, + buf, + ) + .await + }; + // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always + // times out. + let spawned_fut = internal_runtime.spawn(async move { + tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { + let msg = "VssStore::write timed out"; + Error::new(ErrorKind::Other, msg) + }) + }); + self.runtime.block_on(spawned_fut).expect("We should always finish")? 
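For reference, a stripped-down standalone sketch of the pattern each `KVStoreSync` method now follows: spawn the operation onto the dedicated internal runtime, bound it with a timeout, then block on the join handle from the caller-provided runtime. This assumes plain tokio runtimes rather than the crate's `Runtime` wrapper, and uses a placeholder future instead of an actual VSS request.

use std::io::{Error, ErrorKind};
use std::time::Duration;

fn main() {
	// Stands in for the VSS-internal runtime the patch builds in `VssStore::new`.
	let internal = tokio::runtime::Builder::new_multi_thread()
		.enable_all()
		.worker_threads(2)
		.build()
		.unwrap();
	// Stands in for the caller-provided runtime the sync wrapper blocks on.
	let outer = tokio::runtime::Runtime::new().unwrap();

	// Spawn the (placeholder) I/O future on the internal runtime, bounded by a timeout.
	let spawned_fut = internal.spawn(async {
		tokio::time::timeout(Duration::from_secs(5), async {
			tokio::time::sleep(Duration::from_millis(10)).await;
			Ok::<Vec<u8>, Error>(vec![1, 2, 3])
		})
		.await
		.map_err(|_| Error::new(ErrorKind::Other, "read timed out"))?
	});

	// Block on the join handle from the outer runtime, mirroring
	// `self.runtime.block_on(spawned_fut).expect("We should always finish")?`.
	let res: Result<Vec<u8>, Error> = outer.block_on(spawned_fut).expect("We should always finish");
	assert_eq!(res.unwrap(), vec![1, 2, 3]);
}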
} fn remove( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> io::Result<()> { - let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); + let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| { + debug_assert!(false, "Failed to access internal runtime"); + let msg = format!("Failed to access internal runtime"); + Error::new(ErrorKind::Other, msg) + })?; + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + let inner = Arc::clone(&self.inner); + let locking_key = self.build_locking_key(&primary_namespace, &secondary_namespace, &key); let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); - let fut = self.inner.remove_internal( - inner_lock_ref, - locking_key, - version, - primary_namespace, - secondary_namespace, - key, - ); - self.runtime.block_on(fut) + let fut = async move { + inner + .remove_internal( + inner_lock_ref, + locking_key, + version, + primary_namespace, + secondary_namespace, + key, + ) + .await + }; + // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always + // times out. + let spawned_fut = internal_runtime.spawn(async move { + tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { + let msg = "VssStore::remove timed out"; + Error::new(ErrorKind::Other, msg) + }) + }); + self.runtime.block_on(spawned_fut).expect("We should always finish")? } fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { - let fut = self.inner.list_internal(primary_namespace, secondary_namespace); - self.runtime.block_on(fut) + let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| { + debug_assert!(false, "Failed to access internal runtime"); + let msg = format!("Failed to access internal runtime"); + Error::new(ErrorKind::Other, msg) + })?; + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let inner = Arc::clone(&self.inner); + let fut = async move { inner.list_internal(primary_namespace, secondary_namespace).await }; + // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always + // times out. + let spawned_fut = internal_runtime.spawn(async move { + tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { + let msg = "VssStore::list timed out"; + Error::new(ErrorKind::Other, msg) + }) + }); + self.runtime.block_on(spawned_fut).expect("We should always finish")? 
} } @@ -145,9 +247,9 @@ impl KVStore for VssStore { let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); let inner = Arc::clone(&self.inner); - Box::pin(async move { - inner.read_internal(&primary_namespace, &secondary_namespace, &key).await - }) + Box::pin( + async move { inner.read_internal(primary_namespace, secondary_namespace, key).await }, + ) } fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, @@ -164,9 +266,9 @@ impl KVStore for VssStore { inner_lock_ref, locking_key, version, - &primary_namespace, - &secondary_namespace, - &key, + primary_namespace, + secondary_namespace, + key, buf, ) .await @@ -187,9 +289,9 @@ impl KVStore for VssStore { inner_lock_ref, locking_key, version, - &primary_namespace, - &secondary_namespace, - &key, + primary_namespace, + secondary_namespace, + key, ) .await }) @@ -200,7 +302,14 @@ impl KVStore for VssStore { let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let inner = Arc::clone(&self.inner); - Box::pin(async move { inner.list_internal(&primary_namespace, &secondary_namespace).await }) + Box::pin(async move { inner.list_internal(primary_namespace, secondary_namespace).await }) + } +} + +impl Drop for VssStore { + fn drop(&mut self) { + let internal_runtime = self.internal_runtime.take(); + tokio::task::block_in_place(move || drop(internal_runtime)); } } @@ -300,11 +409,12 @@ impl VssStoreInner { } async fn read_internal( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + &self, primary_namespace: String, secondary_namespace: String, key: String, ) -> io::Result> { - check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?; + check_namespace_key_validity(&primary_namespace, &secondary_namespace, Some(&key), "read")?; - let obfuscated_key = self.build_obfuscated_key(primary_namespace, secondary_namespace, key); + let obfuscated_key = + self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key); let request = GetObjectRequest { store_id: self.store_id.clone(), key: obfuscated_key }; let resp = self.client.get_object(&request).await.map_err(|e| { let msg = format!( @@ -332,13 +442,18 @@ impl VssStoreInner { async fn write_internal( &self, inner_lock_ref: Arc>, locking_key: String, version: u64, - primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, ) -> io::Result<()> { - check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?; + check_namespace_key_validity( + &primary_namespace, + &secondary_namespace, + Some(&key), + "write", + )?; self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { let obfuscated_key = - self.build_obfuscated_key(primary_namespace, secondary_namespace, key); + self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key); let vss_version = -1; let storable = self.storable_builder.build(buf, vss_version); let request = PutObjectRequest { @@ -367,13 +482,18 @@ impl VssStoreInner { async fn remove_internal( &self, inner_lock_ref: Arc>, locking_key: String, version: u64, - primary_namespace: &str, secondary_namespace: &str, key: &str, + primary_namespace: String, secondary_namespace: String, key: String, ) -> io::Result<()> { - check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?; + check_namespace_key_validity( + 
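A short aside on why the `*_internal` helpers switch from `&str` to owned `String` parameters: the futures are now moved onto a spawned task, and `tokio::spawn` requires a `'static` future, so they can no longer borrow the caller's string slices. The sketch below uses hypothetical names, not the actual API.

use std::io;

// Hypothetical stand-in for the owned-parameter shape of `read_internal` after the patch.
async fn read_internal(primary_namespace: String, key: String) -> io::Result<Vec<u8>> {
	let _ = (primary_namespace, key);
	Ok(Vec::new())
}

fn main() {
	let rt = tokio::runtime::Runtime::new().unwrap();
	let primary_namespace = "payments".to_string();
	let key = "some_key".to_string();
	// Owned values can be moved into the `'static` future required by `spawn`; borrowed
	// `&str` parameters tied to the caller's stack frame could not.
	let handle = rt.spawn(async move { read_internal(primary_namespace, key).await });
	let bytes = rt.block_on(handle).expect("task panicked").expect("read failed");
	assert!(bytes.is_empty());
}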
&primary_namespace, + &secondary_namespace, + Some(&key), + "remove", + )?; self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { let obfuscated_key = - self.build_obfuscated_key(primary_namespace, secondary_namespace, key); + self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key); let request = DeleteObjectRequest { store_id: self.store_id.clone(), key_value: Some(KeyValue { key: obfuscated_key, version: -1, value: vec![] }), @@ -393,12 +513,12 @@ impl VssStoreInner { } async fn list_internal( - &self, primary_namespace: &str, secondary_namespace: &str, + &self, primary_namespace: String, secondary_namespace: String, ) -> io::Result> { - check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?; + check_namespace_key_validity(&primary_namespace, &secondary_namespace, None, "list")?; let keys = - self.list_all_keys(primary_namespace, secondary_namespace).await.map_err(|e| { + self.list_all_keys(&primary_namespace, &secondary_namespace).await.map_err(|e| { let msg = format!( "Failed to retrieve keys in namespace: {}/{} : {}", primary_namespace, secondary_namespace, e @@ -486,38 +606,40 @@ mod tests { use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng, RngCore}; - use tokio::runtime; use vss_client::headers::FixedHeaders; use super::*; use crate::io::test_utils::do_read_write_remove_list_persist; + use crate::logger::Logger; #[test] fn vss_read_write_remove_list_persist() { - let runtime = Arc::new(Runtime::new().unwrap()); let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap(); let mut rng = thread_rng(); let rand_store_id: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect(); let mut vss_seed = [0u8; 32]; rng.fill_bytes(&mut vss_seed); let header_provider = Arc::new(FixedHeaders::new(HashMap::new())); + let logger = Arc::new(Logger::new_log_facade()); + let runtime = Arc::new(Runtime::new(logger).unwrap()); let vss_store = - VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime).unwrap(); + VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime); do_read_write_remove_list_persist(&vss_store); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn vss_read_write_remove_list_persist_in_runtime_context() { - let runtime = Arc::new(Runtime::new().unwrap()); let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap(); let mut rng = thread_rng(); let rand_store_id: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect(); let mut vss_seed = [0u8; 32]; rng.fill_bytes(&mut vss_seed); let header_provider = Arc::new(FixedHeaders::new(HashMap::new())); + let logger = Arc::new(Logger::new_log_facade()); + let runtime = Arc::new(Runtime::new(logger).unwrap()); let vss_store = - VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime).unwrap(); + VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime); do_read_write_remove_list_persist(&vss_store); drop(vss_store) diff --git a/src/lib.rs b/src/lib.rs index f07b2def3..2ea704d27 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -749,7 +749,10 @@ impl Node { /// **Caution:** Users must handle events as quickly as possible to prevent a large event backlog, /// which can increase the memory footprint of [`Node`]. 
pub fn wait_next_event(&self) -> Event { - self.event_queue.wait_next_event() + let fut = self.event_queue.next_event_async(); + // We use our runtime for the sync variant to ensure `tokio::task::block_in_place` is + // always called if we'd ever hit this in an outer runtime context. + self.runtime.block_on(fut) } /// Confirm the last retrieved event handled. diff --git a/src/runtime.rs b/src/runtime.rs index 2275d5bea..1e9883ae4 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -67,7 +67,10 @@ impl Runtime { { let mut background_tasks = self.background_tasks.lock().unwrap(); let runtime_handle = self.handle(); - background_tasks.spawn_on(future, runtime_handle); + // Since it seems to make a difference to `tokio` (see + // https://docs.rs/tokio/latest/tokio/time/fn.timeout.html#panics) we make sure the futures + // are always put in an `async` / `.await` closure. + background_tasks.spawn_on(async { future.await }, runtime_handle); } pub fn spawn_cancellable_background_task(&self, future: F) @@ -76,7 +79,10 @@ impl Runtime { { let mut cancellable_background_tasks = self.cancellable_background_tasks.lock().unwrap(); let runtime_handle = self.handle(); - cancellable_background_tasks.spawn_on(future, runtime_handle); + // Since it seems to make a difference to `tokio` (see + // https://docs.rs/tokio/latest/tokio/time/fn.timeout.html#panics) we make sure the futures + // are always put in an `async` / `.await` closure. + cancellable_background_tasks.spawn_on(async { future.await }, runtime_handle); } pub fn spawn_background_processor_task(&self, future: F) @@ -107,7 +113,10 @@ impl Runtime { // to detect the outer context here, and otherwise use whatever was set during // initialization. let handle = tokio::runtime::Handle::try_current().unwrap_or(self.handle().clone()); - tokio::task::block_in_place(move || handle.block_on(future)) + // Since it seems to make a difference to `tokio` (see + // https://docs.rs/tokio/latest/tokio/time/fn.timeout.html#panics) we make sure the futures + // are always put in an `async` / `.await` closure. + tokio::task::block_in_place(move || handle.block_on(async { future.await })) } pub fn abort_cancellable_background_tasks(&self) { @@ -154,6 +163,9 @@ impl Runtime { self.background_processor_task.lock().unwrap().take() { let abort_handle = background_processor_task.abort_handle(); + // Since it seems to make a difference to `tokio` (see + // https://docs.rs/tokio/latest/tokio/time/fn.timeout.html#panics) we make sure the futures + // are always put in an `async` / `.await` closure. let timeout_res = self.block_on(async { tokio::time::timeout( Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS), diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 4d02895c7..05326b03d 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -49,7 +49,7 @@ use serde_json::{json, Value}; macro_rules! expect_event { ($node:expr, $event_type:ident) => {{ - match $node.wait_next_event() { + match $node.next_event_async().await { ref e @ Event::$event_type { .. } => { println!("{} got event {:?}", $node.node_id(), e); $node.event_handled().unwrap(); @@ -65,7 +65,7 @@ pub(crate) use expect_event; macro_rules! expect_channel_pending_event { ($node:expr, $counterparty_node_id:expr) => {{ - match $node.wait_next_event() { + match $node.next_event_async().await { ref e @ Event::ChannelPending { funding_txo, counterparty_node_id, .. 
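To illustrate the repeated runtime.rs comment, here is a tiny sketch, based on the linked tokio docs rather than behavior verified in this patch, of why wrapping a future in an `async` block matters: `tokio::time::timeout` wants an enabled timer when it is created, and building it inside an `async` block defers that until the future is polled on the target runtime.

use std::time::Duration;

fn main() {
	let rt = tokio::runtime::Runtime::new().unwrap();

	// The `timeout` is only constructed once `rt` polls the wrapper future, i.e. in a
	// context where a timer driver is guaranteed to be available.
	let wrapped = async { tokio::time::timeout(Duration::from_millis(50), async { 42u32 }).await };

	let res = rt.block_on(wrapped);
	assert_eq!(res.unwrap(), 42);
}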
} => { println!("{} got event {:?}", $node.node_id(), e); assert_eq!(counterparty_node_id, $counterparty_node_id); @@ -83,7 +83,7 @@ pub(crate) use expect_channel_pending_event; macro_rules! expect_channel_ready_event { ($node:expr, $counterparty_node_id:expr) => {{ - match $node.wait_next_event() { + match $node.next_event_async().await { ref e @ Event::ChannelReady { user_channel_id, counterparty_node_id, .. } => { println!("{} got event {:?}", $node.node_id(), e); assert_eq!(counterparty_node_id, Some($counterparty_node_id)); @@ -101,7 +101,7 @@ pub(crate) use expect_channel_ready_event; macro_rules! expect_payment_received_event { ($node:expr, $amount_msat:expr) => {{ - match $node.wait_next_event() { + match $node.next_event_async().await { ref e @ Event::PaymentReceived { payment_id, amount_msat, .. } => { println!("{} got event {:?}", $node.node_id(), e); assert_eq!(amount_msat, $amount_msat); @@ -123,7 +123,7 @@ pub(crate) use expect_payment_received_event; macro_rules! expect_payment_claimable_event { ($node:expr, $payment_id:expr, $payment_hash:expr, $claimable_amount_msat:expr) => {{ - match $node.wait_next_event() { + match $node.next_event_async().await { ref e @ Event::PaymentClaimable { payment_id, payment_hash, @@ -148,7 +148,7 @@ pub(crate) use expect_payment_claimable_event; macro_rules! expect_payment_successful_event { ($node:expr, $payment_id:expr, $fee_paid_msat:expr) => {{ - match $node.wait_next_event() { + match $node.next_event_async().await { ref e @ Event::PaymentSuccessful { payment_id, fee_paid_msat, .. } => { println!("{} got event {:?}", $node.node_id(), e); if let Some(fee_msat) = $fee_paid_msat { @@ -389,7 +389,7 @@ pub(crate) fn setup_node_for_async_payments( node } -pub(crate) fn generate_blocks_and_wait( +pub(crate) async fn generate_blocks_and_wait( bitcoind: &BitcoindClient, electrs: &E, num: usize, ) { let _ = bitcoind.create_wallet("ldk_node_test"); @@ -400,7 +400,7 @@ pub(crate) fn generate_blocks_and_wait( let address = bitcoind.new_address().expect("failed to get new address"); // TODO: expect this Result once the WouldBlock issue is resolved upstream. let _block_hashes_res = bitcoind.generate_to_address(num, &address); - wait_for_block(electrs, cur_height as usize + num); + wait_for_block(electrs, cur_height as usize + num).await; print!(" Done!"); println!("\n"); } @@ -420,14 +420,14 @@ pub(crate) fn invalidate_blocks(bitcoind: &BitcoindClient, num_blocks: usize) { assert!(new_cur_height + num_blocks == cur_height); } -pub(crate) fn wait_for_block(electrs: &E, min_height: usize) { +pub(crate) async fn wait_for_block(electrs: &E, min_height: usize) { let mut header = match electrs.block_headers_subscribe() { Ok(header) => header, Err(_) => { // While subscribing should succeed the first time around, we ran into some cases where // it didn't. Since we can't proceed without subscribing, we try again after a delay // and panic if it still fails. 
- std::thread::sleep(Duration::from_secs(3)); + tokio::time::sleep(Duration::from_secs(3)).await; electrs.block_headers_subscribe().expect("failed to subscribe to block headers") }, }; @@ -438,11 +438,12 @@ pub(crate) fn wait_for_block(electrs: &E, min_height: usize) { header = exponential_backoff_poll(|| { electrs.ping().expect("failed to ping electrs"); electrs.block_headers_pop().expect("failed to pop block header") - }); + }) + .await; } } -pub(crate) fn wait_for_tx(electrs: &E, txid: Txid) { +pub(crate) async fn wait_for_tx(electrs: &E, txid: Txid) { if electrs.transaction_get(&txid).is_ok() { return; } @@ -450,10 +451,11 @@ pub(crate) fn wait_for_tx(electrs: &E, txid: Txid) { exponential_backoff_poll(|| { electrs.ping().unwrap(); electrs.transaction_get(&txid).ok() - }); + }) + .await; } -pub(crate) fn wait_for_outpoint_spend(electrs: &E, outpoint: OutPoint) { +pub(crate) async fn wait_for_outpoint_spend(electrs: &E, outpoint: OutPoint) { let tx = electrs.transaction_get(&outpoint.txid).unwrap(); let txout_script = tx.output.get(outpoint.vout as usize).unwrap().clone().script_pubkey; @@ -467,10 +469,11 @@ pub(crate) fn wait_for_outpoint_spend(electrs: &E, outpoint: Out let is_spent = !electrs.script_get_history(&txout_script).unwrap().is_empty(); is_spent.then_some(()) - }); + }) + .await; } -pub(crate) fn exponential_backoff_poll(mut poll: F) -> T +pub(crate) async fn exponential_backoff_poll(mut poll: F) -> T where F: FnMut() -> Option, { @@ -487,26 +490,26 @@ where } assert!(tries < 20, "Reached max tries."); tries += 1; - std::thread::sleep(delay); + tokio::time::sleep(delay).await; } } -pub(crate) fn premine_and_distribute_funds( +pub(crate) async fn premine_and_distribute_funds( bitcoind: &BitcoindClient, electrs: &E, addrs: Vec
, amount: Amount, ) { - premine_blocks(bitcoind, electrs); + premine_blocks(bitcoind, electrs).await; - distribute_funds_unconfirmed(bitcoind, electrs, addrs, amount); - generate_blocks_and_wait(bitcoind, electrs, 1); + distribute_funds_unconfirmed(bitcoind, electrs, addrs, amount).await; + generate_blocks_and_wait(bitcoind, electrs, 1).await; } -pub(crate) fn premine_blocks(bitcoind: &BitcoindClient, electrs: &E) { +pub(crate) async fn premine_blocks(bitcoind: &BitcoindClient, electrs: &E) { let _ = bitcoind.create_wallet("ldk_node_test"); let _ = bitcoind.load_wallet("ldk_node_test"); - generate_blocks_and_wait(bitcoind, electrs, 101); + generate_blocks_and_wait(bitcoind, electrs, 101).await; } -pub(crate) fn distribute_funds_unconfirmed( +pub(crate) async fn distribute_funds_unconfirmed( bitcoind: &BitcoindClient, electrs: &E, addrs: Vec
, amount: Amount, ) -> Txid { let mut amounts = HashMap::::new(); @@ -524,7 +527,7 @@ pub(crate) fn distribute_funds_unconfirmed( .parse() .unwrap(); - wait_for_tx(electrs, txid); + wait_for_tx(electrs, txid).await; txid } @@ -543,7 +546,7 @@ pub(crate) fn prepare_rbf( (tx, fee_output_index) } -pub(crate) fn bump_fee_and_broadcast( +pub(crate) async fn bump_fee_and_broadcast( bitcoind: &BitcoindClient, electrs: &E, mut tx: Transaction, fee_output_index: usize, is_insert_block: bool, ) -> Transaction { @@ -573,10 +576,10 @@ pub(crate) fn bump_fee_and_broadcast( match bitcoind.send_raw_transaction(&tx) { Ok(res) => { if is_insert_block { - generate_blocks_and_wait(bitcoind, electrs, 1); + generate_blocks_and_wait(bitcoind, electrs, 1).await; } let new_txid: Txid = res.0.parse().unwrap(); - wait_for_tx(electrs, new_txid); + wait_for_tx(electrs, new_txid).await; return tx; }, Err(_) => { @@ -591,14 +594,14 @@ pub(crate) fn bump_fee_and_broadcast( panic!("Failed to bump fee after {} attempts", attempts); } -pub fn open_channel( +pub async fn open_channel( node_a: &TestNode, node_b: &TestNode, funding_amount_sat: u64, should_announce: bool, electrsd: &ElectrsD, ) -> OutPoint { - open_channel_push_amt(node_a, node_b, funding_amount_sat, None, should_announce, electrsd) + open_channel_push_amt(node_a, node_b, funding_amount_sat, None, should_announce, electrsd).await } -pub fn open_channel_push_amt( +pub async fn open_channel_push_amt( node_a: &TestNode, node_b: &TestNode, funding_amount_sat: u64, push_amount_msat: Option, should_announce: bool, electrsd: &ElectrsD, ) -> OutPoint { @@ -628,12 +631,12 @@ pub fn open_channel_push_amt( let funding_txo_a = expect_channel_pending_event!(node_a, node_b.node_id()); let funding_txo_b = expect_channel_pending_event!(node_b, node_a.node_id()); assert_eq!(funding_txo_a, funding_txo_b); - wait_for_tx(&electrsd.client, funding_txo_a.txid); + wait_for_tx(&electrsd.client, funding_txo_a.txid).await; funding_txo_a } -pub(crate) fn do_channel_full_cycle( +pub(crate) async fn do_channel_full_cycle( node_a: TestNode, node_b: TestNode, bitcoind: &BitcoindClient, electrsd: &E, allow_0conf: bool, expect_anchor_channel: bool, force_close: bool, ) { @@ -647,7 +650,8 @@ pub(crate) fn do_channel_full_cycle( electrsd, vec![addr_a, addr_b], Amount::from_sat(premine_amount_sat), - ); + ) + .await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); assert_eq!(node_a.list_balances().spendable_onchain_balance_sats, premine_amount_sat); @@ -706,10 +710,10 @@ pub(crate) fn do_channel_full_cycle( let funding_txo_b = expect_channel_pending_event!(node_b, node_a.node_id()); assert_eq!(funding_txo_a, funding_txo_b); - wait_for_tx(electrsd, funding_txo_a.txid); + wait_for_tx(electrsd, funding_txo_a.txid).await; if !allow_0conf { - generate_blocks_and_wait(&bitcoind, electrsd, 6); + generate_blocks_and_wait(&bitcoind, electrsd, 6).await; } node_a.sync_wallets().unwrap(); @@ -839,7 +843,7 @@ pub(crate) fn do_channel_full_cycle( let payment_id = node_a.bolt11_payment().send_using_amount(&invoice, overpaid_amount_msat, None).unwrap(); expect_event!(node_a, PaymentSuccessful); - let received_amount = match node_b.wait_next_event() { + let received_amount = match node_b.next_event_async().await { ref e @ Event::PaymentReceived { amount_msat, .. 
} => { println!("{} got event {:?}", std::stringify!(node_b), e); node_b.event_handled().unwrap(); @@ -877,7 +881,7 @@ pub(crate) fn do_channel_full_cycle( .unwrap(); expect_event!(node_a, PaymentSuccessful); - let received_amount = match node_b.wait_next_event() { + let received_amount = match node_b.next_event_async().await { ref e @ Event::PaymentReceived { amount_msat, .. } => { println!("{} got event {:?}", std::stringify!(node_b), e); node_b.event_handled().unwrap(); @@ -999,7 +1003,7 @@ pub(crate) fn do_channel_full_cycle( .send_with_custom_tlvs(keysend_amount_msat, node_b.node_id(), None, custom_tlvs.clone()) .unwrap(); expect_event!(node_a, PaymentSuccessful); - let next_event = node_b.wait_next_event(); + let next_event = node_b.next_event_async().await; let (received_keysend_amount, received_custom_records) = match next_event { ref e @ Event::PaymentReceived { amount_msat, ref custom_records, .. } => { println!("{} got event {:?}", std::stringify!(node_b), e); @@ -1049,7 +1053,7 @@ pub(crate) fn do_channel_full_cycle( println!("\nB close_channel (force: {})", force_close); if force_close { - std::thread::sleep(Duration::from_secs(1)); + tokio::time::sleep(Duration::from_secs(1)).await; node_a.force_close_channel(&user_channel_id, node_b.node_id(), None).unwrap(); } else { node_a.close_channel(&user_channel_id, node_b.node_id()).unwrap(); @@ -1058,9 +1062,9 @@ pub(crate) fn do_channel_full_cycle( expect_event!(node_a, ChannelClosed); expect_event!(node_b, ChannelClosed); - wait_for_outpoint_spend(electrsd, funding_txo_b); + wait_for_outpoint_spend(electrsd, funding_txo_b).await; - generate_blocks_and_wait(&bitcoind, electrsd, 1); + generate_blocks_and_wait(&bitcoind, electrsd, 1).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -1076,7 +1080,7 @@ pub(crate) fn do_channel_full_cycle( assert_eq!(counterparty_node_id, node_a.node_id()); let cur_height = node_b.status().current_best_block.height; let blocks_to_go = confirmation_height - cur_height; - generate_blocks_and_wait(&bitcoind, electrsd, blocks_to_go as usize); + generate_blocks_and_wait(&bitcoind, electrsd, blocks_to_go as usize).await; node_b.sync_wallets().unwrap(); node_a.sync_wallets().unwrap(); }, @@ -1089,7 +1093,7 @@ pub(crate) fn do_channel_full_cycle( PendingSweepBalance::BroadcastAwaitingConfirmation { .. } => {}, _ => panic!("Unexpected balance state!"), } - generate_blocks_and_wait(&bitcoind, electrsd, 1); + generate_blocks_and_wait(&bitcoind, electrsd, 1).await; node_b.sync_wallets().unwrap(); node_a.sync_wallets().unwrap(); @@ -1099,7 +1103,7 @@ pub(crate) fn do_channel_full_cycle( PendingSweepBalance::AwaitingThresholdConfirmations { .. } => {}, _ => panic!("Unexpected balance state!"), } - generate_blocks_and_wait(&bitcoind, electrsd, 5); + generate_blocks_and_wait(&bitcoind, electrsd, 5).await; node_b.sync_wallets().unwrap(); node_a.sync_wallets().unwrap(); @@ -1117,7 +1121,7 @@ pub(crate) fn do_channel_full_cycle( assert_eq!(counterparty_node_id, node_b.node_id()); let cur_height = node_a.status().current_best_block.height; let blocks_to_go = confirmation_height - cur_height; - generate_blocks_and_wait(&bitcoind, electrsd, blocks_to_go as usize); + generate_blocks_and_wait(&bitcoind, electrsd, blocks_to_go as usize).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); }, @@ -1130,7 +1134,7 @@ pub(crate) fn do_channel_full_cycle( PendingSweepBalance::BroadcastAwaitingConfirmation { .. 
} => {}, _ => panic!("Unexpected balance state!"), } - generate_blocks_and_wait(&bitcoind, electrsd, 1); + generate_blocks_and_wait(&bitcoind, electrsd, 1).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -1140,7 +1144,7 @@ pub(crate) fn do_channel_full_cycle( PendingSweepBalance::AwaitingThresholdConfirmations { .. } => {}, _ => panic!("Unexpected balance state!"), } - generate_blocks_and_wait(&bitcoind, electrsd, 5); + generate_blocks_and_wait(&bitcoind, electrsd, 5).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); } diff --git a/tests/integration_tests_cln.rs b/tests/integration_tests_cln.rs index 6fc72b2c2..38e345f15 100644 --- a/tests/integration_tests_cln.rs +++ b/tests/integration_tests_cln.rs @@ -25,8 +25,8 @@ use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Description}; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; -#[test] -fn test_cln() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_cln() { // Setup bitcoind / electrs clients let bitcoind_client = BitcoindClient::new_with_auth( "http://127.0.0.1:18443", @@ -36,7 +36,7 @@ fn test_cln() { let electrs_client = ElectrumClient::new("tcp://127.0.0.1:50001").unwrap(); // Give electrs a kick. - common::generate_blocks_and_wait(&bitcoind_client, &electrs_client, 1); + common::generate_blocks_and_wait(&bitcoind_client, &electrs_client, 1).await; // Setup LDK Node let config = common::random_config(true); @@ -54,7 +54,8 @@ fn test_cln() { &electrs_client, vec![address], premine_amount, - ); + ) + .await; // Setup CLN let sock = "/tmp/lightning-rpc"; @@ -67,7 +68,7 @@ fn test_cln() { if info.blockheight > 0 { break info; } - std::thread::sleep(std::time::Duration::from_millis(250)); + tokio::time::sleep(std::time::Duration::from_millis(250)).await; } }; let cln_node_id = PublicKey::from_str(&cln_info.id).unwrap(); @@ -92,8 +93,8 @@ fn test_cln() { .unwrap(); let funding_txo = common::expect_channel_pending_event!(node, cln_node_id); - common::wait_for_tx(&electrs_client, funding_txo.txid); - common::generate_blocks_and_wait(&bitcoind_client, &electrs_client, 6); + common::wait_for_tx(&electrs_client, funding_txo.txid).await; + common::generate_blocks_and_wait(&bitcoind_client, &electrs_client, 6).await; node.sync_wallets().unwrap(); let user_channel_id = common::expect_channel_ready_event!(node, cln_node_id); diff --git a/tests/integration_tests_lnd.rs b/tests/integration_tests_lnd.rs index 7dfc1e4f9..311a11c3c 100755 --- a/tests/integration_tests_lnd.rs +++ b/tests/integration_tests_lnd.rs @@ -34,7 +34,7 @@ async fn test_lnd() { let electrs_client = ElectrumClient::new("tcp://127.0.0.1:50001").unwrap(); // Give electrs a kick. 
- common::generate_blocks_and_wait(&bitcoind_client, &electrs_client, 1); + common::generate_blocks_and_wait(&bitcoind_client, &electrs_client, 1).await; // Setup LDK Node let config = common::random_config(true); @@ -52,7 +52,8 @@ async fn test_lnd() { &electrs_client, vec![address], premine_amount, - ); + ) + .await; // Setup LND let endpoint = "127.0.0.1:8081"; @@ -73,8 +74,8 @@ async fn test_lnd() { .unwrap(); let funding_txo = common::expect_channel_pending_event!(node, lnd_node_id); - common::wait_for_tx(&electrs_client, funding_txo.txid); - common::generate_blocks_and_wait(&bitcoind_client, &electrs_client, 6); + common::wait_for_tx(&electrs_client, funding_txo.txid).await; + common::generate_blocks_and_wait(&bitcoind_client, &electrs_client, 6).await; node.sync_wallets().unwrap(); let user_channel_id = common::expect_channel_ready_event!(node, lnd_node_id); diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 804bba876..e2d4207cd 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -39,72 +39,80 @@ use lightning_invoice::{Bolt11InvoiceDescription, Description}; use lightning_types::payment::{PaymentHash, PaymentPreimage}; use log::LevelFilter; -#[test] -fn channel_full_cycle() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn channel_full_cycle() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); - do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false); + do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false) + .await; } -#[test] -fn channel_full_cycle_electrum() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn channel_full_cycle_electrum() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Electrum(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); - do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false); + do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false) + .await; } -#[test] -fn channel_full_cycle_bitcoind_rpc_sync() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn channel_full_cycle_bitcoind_rpc_sync() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::BitcoindRpcSync(&bitcoind); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); - do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false); + do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false) + .await; } -#[test] -fn channel_full_cycle_bitcoind_rest_sync() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn channel_full_cycle_bitcoind_rest_sync() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::BitcoindRestSync(&bitcoind); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); - do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false); + do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false) + .await; } -#[test] -fn channel_full_cycle_force_close() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] 
+async fn channel_full_cycle_force_close() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); - do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, true); + do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, true) + .await; } -#[test] -fn channel_full_cycle_force_close_trusted_no_reserve() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn channel_full_cycle_force_close_trusted_no_reserve() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, true); - do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, true); + do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, true) + .await; } -#[test] -fn channel_full_cycle_0conf() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn channel_full_cycle_0conf() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, true, true, false); do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, true, true, false) + .await; } -#[test] -fn channel_full_cycle_legacy_staticremotekey() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn channel_full_cycle_legacy_staticremotekey() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, false, false); - do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, false, false); + do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, false, false) + .await; } -#[test] -fn channel_open_fails_when_funds_insufficient() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn channel_open_fails_when_funds_insufficient() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); @@ -119,7 +127,8 @@ fn channel_open_fails_when_funds_insufficient() { &electrsd.client, vec![addr_a, addr_b], Amount::from_sat(premine_amount_sat), - ); + ) + .await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); assert_eq!(node_a.list_balances().spendable_onchain_balance_sats, premine_amount_sat); @@ -138,8 +147,8 @@ fn channel_open_fails_when_funds_insufficient() { ); } -#[test] -fn multi_hop_sending() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn multi_hop_sending() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); @@ -162,7 +171,8 @@ fn multi_hop_sending() { &electrsd.client, addresses, Amount::from_sat(premine_amount_sat), - ); + ) + .await; for n in &nodes { n.sync_wallets().unwrap(); @@ -177,18 +187,18 @@ fn multi_hop_sending() { // \ / // (1M:0)- N3 -(1M:0) - open_channel(&nodes[0], &nodes[1], 100_000, true, &electrsd); - open_channel(&nodes[1], &nodes[2], 1_000_000, true, &electrsd); + open_channel(&nodes[0], &nodes[1], 100_000, true, &electrsd).await; + open_channel(&nodes[1], &nodes[2], 1_000_000, 
true, &electrsd).await; // We need to sync wallets in-between back-to-back channel opens from the same node so BDK // wallet picks up on the broadcast funding tx and doesn't double-spend itself. // // TODO: Remove once fixed in BDK. nodes[1].sync_wallets().unwrap(); - open_channel(&nodes[1], &nodes[3], 1_000_000, true, &electrsd); - open_channel(&nodes[2], &nodes[4], 1_000_000, true, &electrsd); - open_channel(&nodes[3], &nodes[4], 1_000_000, true, &electrsd); + open_channel(&nodes[1], &nodes[3], 1_000_000, true, &electrsd).await; + open_channel(&nodes[2], &nodes[4], 1_000_000, true, &electrsd).await; + open_channel(&nodes[3], &nodes[4], 1_000_000, true, &electrsd).await; - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; for n in &nodes { n.sync_wallets().unwrap(); @@ -206,7 +216,7 @@ fn multi_hop_sending() { expect_event!(nodes[4], ChannelReady); // Sleep a bit for gossip to propagate. - std::thread::sleep(std::time::Duration::from_secs(1)); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; let route_params = RouteParametersConfig { max_total_routing_fee_msat: Some(75_000), @@ -235,8 +245,8 @@ fn multi_hop_sending() { expect_payment_successful_event!(nodes[0], payment_id, Some(fee_paid_msat)); } -#[test] -fn start_stop_reinit() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn start_stop_reinit() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let config = random_config(true); @@ -265,7 +275,8 @@ fn start_stop_reinit() { &electrsd.client, vec![funding_address], expected_amount, - ); + ) + .await; node.sync_wallets().unwrap(); assert_eq!(node.list_balances().spendable_onchain_balance_sats, expected_amount.to_sat()); @@ -304,8 +315,8 @@ fn start_stop_reinit() { reinitialized_node.stop().unwrap(); } -#[test] -fn onchain_send_receive() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn onchain_send_receive() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); @@ -323,7 +334,8 @@ fn onchain_send_receive() { &electrsd.client, vec![addr_a.clone(), addr_b.clone()], Amount::from_sat(premine_amount_sat), - ); + ) + .await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -350,8 +362,8 @@ fn onchain_send_receive() { let channel_amount_sat = 1_000_000; let reserve_amount_sat = 25_000; - open_channel(&node_b, &node_a, channel_amount_sat, true, &electrsd); - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + open_channel(&node_b, &node_a, channel_amount_sat, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -393,7 +405,7 @@ fn onchain_send_receive() { let amount_to_send_sats = 54321; let txid = node_b.onchain_payment().send_to_address(&addr_a, amount_to_send_sats, None).unwrap(); - wait_for_tx(&electrsd.client, txid); + wait_for_tx(&electrsd.client, txid).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -420,7 +432,7 @@ fn onchain_send_receive() { assert_eq!(payment_a.amount_msat, payment_b.amount_msat); assert_eq!(payment_a.fee_paid_msat, payment_b.fee_paid_msat); - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; node_a.sync_wallets().unwrap(); 
node_b.sync_wallets().unwrap(); @@ -458,8 +470,8 @@ fn onchain_send_receive() { let addr_b = node_b.onchain_payment().new_address().unwrap(); let txid = node_a.onchain_payment().send_all_to_address(&addr_b, true, None).unwrap(); - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); - wait_for_tx(&electrsd.client, txid); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + wait_for_tx(&electrsd.client, txid).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -481,8 +493,8 @@ fn onchain_send_receive() { let addr_b = node_b.onchain_payment().new_address().unwrap(); let txid = node_a.onchain_payment().send_all_to_address(&addr_b, false, None).unwrap(); - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); - wait_for_tx(&electrsd.client, txid); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + wait_for_tx(&electrsd.client, txid).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -504,8 +516,8 @@ fn onchain_send_receive() { assert_eq!(node_b_payments.len(), 5); } -#[test] -fn onchain_send_all_retains_reserve() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn onchain_send_all_retains_reserve() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); @@ -522,7 +534,8 @@ fn onchain_send_all_retains_reserve() { &electrsd.client, vec![addr_a.clone(), addr_b.clone()], Amount::from_sat(premine_amount_sat), - ); + ) + .await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -532,8 +545,8 @@ fn onchain_send_all_retains_reserve() { // Send all over, with 0 reserve as we don't have any channels open. let txid = node_a.onchain_payment().send_all_to_address(&addr_b, true, None).unwrap(); - wait_for_tx(&electrsd.client, txid); - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + wait_for_tx(&electrsd.client, txid).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -550,15 +563,15 @@ fn onchain_send_all_retains_reserve() { .0 .parse() .unwrap(); - wait_for_tx(&electrsd.client, txid); - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + wait_for_tx(&electrsd.client, txid).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); assert_eq!(node_a.list_balances().spendable_onchain_balance_sats, reserve_amount_sat); // Open a channel. 
- open_channel(&node_b, &node_a, premine_amount_sat, false, &electrsd); - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + open_channel(&node_b, &node_a, premine_amount_sat, false, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); expect_channel_ready_event!(node_a, node_b.node_id()); @@ -573,8 +586,8 @@ fn onchain_send_all_retains_reserve() { // Send all over again, this time ensuring the reserve is accounted for let txid = node_b.onchain_payment().send_all_to_address(&addr_a, true, None).unwrap(); - wait_for_tx(&electrsd.client, txid); - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + wait_for_tx(&electrsd.client, txid).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -587,8 +600,8 @@ fn onchain_send_all_retains_reserve() { .contains(&node_a.list_balances().spendable_onchain_balance_sats)); } -#[test] -fn onchain_wallet_recovery() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn onchain_wallet_recovery() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); @@ -607,7 +620,8 @@ fn onchain_wallet_recovery() { &electrsd.client, vec![addr_1], Amount::from_sat(premine_amount_sat), - ); + ) + .await; original_node.sync_wallets().unwrap(); assert_eq!(original_node.list_balances().spendable_onchain_balance_sats, premine_amount_sat); @@ -620,9 +634,9 @@ fn onchain_wallet_recovery() { .0 .parse() .unwrap(); - wait_for_tx(&electrsd.client, txid); + wait_for_tx(&electrsd.client, txid).await; - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1).await; original_node.sync_wallets().unwrap(); assert_eq!( @@ -656,9 +670,9 @@ fn onchain_wallet_recovery() { .0 .parse() .unwrap(); - wait_for_tx(&electrsd.client, txid); + wait_for_tx(&electrsd.client, txid).await; - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1).await; recovered_node.sync_wallets().unwrap(); assert_eq!( @@ -667,20 +681,20 @@ fn onchain_wallet_recovery() { ); } -#[test] -fn test_rbf_via_mempool() { - run_rbf_test(false); +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_rbf_via_mempool() { + run_rbf_test(false).await; } -#[test] -fn test_rbf_via_direct_block_insertion() { - run_rbf_test(true); +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_rbf_via_direct_block_insertion() { + run_rbf_test(true).await; } // `is_insert_block`: // - `true`: transaction is mined immediately (no mempool), testing confirmed-Tx handling. // - `false`: transaction stays in mempool until confirmation, testing unconfirmed-Tx handling. 
-fn run_rbf_test(is_insert_block: bool) { +async fn run_rbf_test(is_insert_block: bool) { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source_bitcoind = TestChainSource::BitcoindRpcSync(&bitcoind); let chain_source_electrsd = TestChainSource::Electrum(&electrsd); @@ -701,7 +715,7 @@ fn run_rbf_test(is_insert_block: bool) { ]; let (bitcoind, electrs) = (&bitcoind.client, &electrsd.client); - premine_blocks(bitcoind, electrs); + premine_blocks(bitcoind, electrs).await; // Helpers declaration before starting the test let all_addrs = @@ -715,7 +729,8 @@ fn run_rbf_test(is_insert_block: bool) { electrs, all_addrs.clone(), Amount::from_sat(amount_sat), - ); + ) + .await; }; } macro_rules! validate_balances { @@ -745,14 +760,14 @@ fn run_rbf_test(is_insert_block: bool) { output.script_pubkey = new_addr.script_pubkey(); } }); - bump_fee_and_broadcast(bitcoind, electrs, tx, fee_output_index, is_insert_block); + bump_fee_and_broadcast(bitcoind, electrs, tx, fee_output_index, is_insert_block).await; validate_balances!(0, is_insert_block); // Not modifying the output scripts, but still bumping the fee. distribute_funds_all_nodes!(); validate_balances!(amount_sat, false); (tx, fee_output_index) = prepare_rbf(electrs, txid, &scripts_buf); - bump_fee_and_broadcast(bitcoind, electrs, tx, fee_output_index, is_insert_block); + bump_fee_and_broadcast(bitcoind, electrs, tx, fee_output_index, is_insert_block).await; validate_balances!(amount_sat, is_insert_block); let mut final_amount_sat = amount_sat * 2; @@ -766,7 +781,7 @@ fn run_rbf_test(is_insert_block: bool) { output.value = Amount::from_sat(output.value.to_sat() + value_sat); } }); - bump_fee_and_broadcast(bitcoind, electrs, tx, fee_output_index, is_insert_block); + bump_fee_and_broadcast(bitcoind, electrs, tx, fee_output_index, is_insert_block).await; final_amount_sat += value_sat; validate_balances!(final_amount_sat, is_insert_block); @@ -779,12 +794,12 @@ fn run_rbf_test(is_insert_block: bool) { output.value = Amount::from_sat(output.value.to_sat() - value_sat); } }); - bump_fee_and_broadcast(bitcoind, electrs, tx, fee_output_index, is_insert_block); + bump_fee_and_broadcast(bitcoind, electrs, tx, fee_output_index, is_insert_block).await; final_amount_sat -= value_sat; validate_balances!(final_amount_sat, is_insert_block); if !is_insert_block { - generate_blocks_and_wait(bitcoind, electrs, 1); + generate_blocks_and_wait(bitcoind, electrs, 1).await; validate_balances!(final_amount_sat, true); } @@ -795,15 +810,15 @@ fn run_rbf_test(is_insert_block: bool) { let txid = node.onchain_payment().send_all_to_address(&addr, true, None).unwrap(); txids.push(txid); }); - txids.iter().for_each(|txid| { - wait_for_tx(electrs, *txid); - }); - generate_blocks_and_wait(bitcoind, electrs, 6); + for txid in txids { + wait_for_tx(electrs, txid).await; + } + generate_blocks_and_wait(bitcoind, electrs, 6).await; validate_balances!(0, true); } -#[test] -fn sign_verify_msg() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn sign_verify_msg() { let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let config = random_config(true); let chain_source = TestChainSource::Esplora(&electrsd); @@ -816,8 +831,8 @@ fn sign_verify_msg() { assert!(node.verify_signature(msg, sig.as_str(), &pkey)); } -#[test] -fn connection_multi_listen() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn connection_multi_listen() { let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = 
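A minimal sketch (simplified signatures, not the test helpers themselves) of why the `txids.iter().for_each(..)` call above becomes a plain `for` loop: `.await` is only allowed directly inside an `async` context, and the closure handed to `for_each` is not one.

use std::time::Duration;

// Simplified stand-in for the async `wait_for_tx` test helper.
async fn wait_for_tx(txid: u32) {
	let _ = txid;
	tokio::time::sleep(Duration::from_millis(1)).await;
}

#[tokio::main]
async fn main() {
	let txids = vec![1u32, 2, 3];
	// `txids.iter().for_each(|txid| wait_for_tx(*txid).await)` would not compile, since the
	// closure body is not an async context; a `for` loop awaits each call directly.
	for txid in txids {
		wait_for_tx(txid).await;
	}
}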
TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, false, false); @@ -831,13 +846,13 @@ fn connection_multi_listen() { } } -#[test] -fn connection_restart_behavior() { - do_connection_restart_behavior(true); - do_connection_restart_behavior(false); +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn connection_restart_behavior() { + do_connection_restart_behavior(true).await; + do_connection_restart_behavior(false).await; } -fn do_connection_restart_behavior(persist: bool) { +async fn do_connection_restart_behavior(persist: bool) { let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, false, false); @@ -865,7 +880,7 @@ fn do_connection_restart_behavior(persist: bool) { node_a.start().unwrap(); // Sleep a bit to allow for the reconnect to happen. - std::thread::sleep(std::time::Duration::from_secs(5)); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; if persist { let peer_details_a = node_a.list_peers().first().unwrap().clone(); @@ -883,8 +898,8 @@ fn do_connection_restart_behavior(persist: bool) { } } -#[test] -fn concurrent_connections_succeed() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn concurrent_connections_succeed() { let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); @@ -910,8 +925,8 @@ fn concurrent_connections_succeed() { } } -#[test] -fn simple_bolt12_send_receive() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn simple_bolt12_send_receive() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); @@ -923,12 +938,13 @@ fn simple_bolt12_send_receive() { &electrsd.client, vec![address_a], Amount::from_sat(premine_amount_sat), - ); + ) + .await; node_a.sync_wallets().unwrap(); - open_channel(&node_a, &node_b, 4_000_000, true, &electrsd); + open_channel(&node_a, &node_b, 4_000_000, true, &electrsd).await; - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -938,11 +954,11 @@ fn simple_bolt12_send_receive() { // Sleep until we broadcasted a node announcement. while node_b.status().latest_node_announcement_broadcast_timestamp.is_none() { - std::thread::sleep(std::time::Duration::from_millis(10)); + tokio::time::sleep(std::time::Duration::from_millis(10)).await; } // Sleep one more sec to make sure the node announcement propagates. 
- std::thread::sleep(std::time::Duration::from_secs(1)); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; let expected_amount_msat = 100_000_000; let offer = @@ -1131,8 +1147,8 @@ fn simple_bolt12_send_receive() { assert_eq!(node_a_payments.first().unwrap().amount_msat, Some(overpaid_amount)); } -#[test] -fn async_payment() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn async_payment() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); @@ -1186,15 +1202,16 @@ fn async_payment() { &electrsd.client, vec![address_sender, address_sender_lsp, address_receiver_lsp, address_receiver], Amount::from_sat(premine_amount_sat), - ); + ) + .await; node_sender.sync_wallets().unwrap(); node_sender_lsp.sync_wallets().unwrap(); node_receiver_lsp.sync_wallets().unwrap(); node_receiver.sync_wallets().unwrap(); - open_channel(&node_sender, &node_sender_lsp, 400_000, false, &electrsd); - open_channel(&node_sender_lsp, &node_receiver_lsp, 400_000, true, &electrsd); + open_channel(&node_sender, &node_sender_lsp, 400_000, false, &electrsd).await; + open_channel(&node_sender_lsp, &node_receiver_lsp, 400_000, true, &electrsd).await; open_channel_push_amt( &node_receiver, &node_receiver_lsp, @@ -1202,9 +1219,10 @@ fn async_payment() { Some(200_000_000), false, &electrsd, - ); + ) + .await; - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; node_sender.sync_wallets().unwrap(); node_sender_lsp.sync_wallets().unwrap(); @@ -1238,7 +1256,7 @@ fn async_payment() { || !has_node_announcements(&node_receiver_lsp) || !has_node_announcements(&node_receiver) { - std::thread::sleep(std::time::Duration::from_millis(100)); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; } let recipient_id = vec![1, 2, 3]; @@ -1251,7 +1269,7 @@ fn async_payment() { break offer; } - std::thread::sleep(std::time::Duration::from_millis(100)); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; }; node_receiver.stop().unwrap(); @@ -1260,15 +1278,15 @@ fn async_payment() { node_sender.bolt12_payment().send_using_amount(&offer, 5_000, None, None).unwrap(); // Sleep to allow the payment reach a state where the htlc is held and waiting for the receiver to come online. 
- std::thread::sleep(std::time::Duration::from_millis(3000)); + tokio::time::sleep(std::time::Duration::from_millis(3000)).await; node_receiver.start().unwrap(); expect_payment_successful_event!(node_sender, Some(payment_id), None); } -#[test] -fn test_node_announcement_propagation() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_node_announcement_propagation() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); @@ -1306,14 +1324,15 @@ fn test_node_announcement_propagation() { &electrsd.client, vec![address_a], Amount::from_sat(premine_amount_sat), - ); + ) + .await; node_a.sync_wallets().unwrap(); // Open an announced channel from node_a to node_b - open_channel(&node_a, &node_b, 4_000_000, true, &electrsd); + open_channel(&node_a, &node_b, 4_000_000, true, &electrsd).await; - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -1323,11 +1342,11 @@ fn test_node_announcement_propagation() { // Wait until node_b broadcasts a node announcement while node_b.status().latest_node_announcement_broadcast_timestamp.is_none() { - std::thread::sleep(std::time::Duration::from_millis(10)); + tokio::time::sleep(std::time::Duration::from_millis(10)).await; } // Sleep to make sure the node announcement propagates - std::thread::sleep(std::time::Duration::from_secs(1)); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; // Get node info from the other node's perspective let node_a_info = node_b.network_graph().node(&NodeId::from_pubkey(&node_a.node_id())).unwrap(); @@ -1358,8 +1377,8 @@ fn test_node_announcement_propagation() { assert_eq!(node_b_announcement_info.addresses, node_b_listening_addresses); } -#[test] -fn generate_bip21_uri() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn generate_bip21_uri() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); @@ -1388,11 +1407,12 @@ fn generate_bip21_uri() { &electrsd.client, vec![address_a], Amount::from_sat(premined_sats), - ); + ) + .await; node_a.sync_wallets().unwrap(); - open_channel(&node_a, &node_b, 4_000_000, true, &electrsd); - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + open_channel(&node_a, &node_b, 4_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -1412,8 +1432,8 @@ fn generate_bip21_uri() { assert!(uqr_payment.contains("lno=")); } -#[test] -fn unified_qr_send_receive() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn unified_qr_send_receive() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); @@ -1427,11 +1447,12 @@ fn unified_qr_send_receive() { &electrsd.client, vec![address_a], Amount::from_sat(premined_sats), - ); + ) + .await; node_a.sync_wallets().unwrap(); - open_channel(&node_a, &node_b, 4_000_000, true, &electrsd); - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + open_channel(&node_a, &node_b, 4_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -1441,11 +1462,11 @@ fn unified_qr_send_receive() { // Sleep until we 
broadcast a node announcement. while node_b.status().latest_node_announcement_broadcast_timestamp.is_none() { - std::thread::sleep(std::time::Duration::from_millis(10)); + tokio::time::sleep(std::time::Duration::from_millis(10)).await; } // Sleep one more sec to make sure the node announcement propagates. - std::thread::sleep(std::time::Duration::from_secs(1)); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; let expected_amount_sats = 100_000; let expiry_sec = 4_000; @@ -1512,8 +1533,8 @@ fn unified_qr_send_receive() { }, }; - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); - wait_for_tx(&electrsd.client, txid); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + wait_for_tx(&electrsd.client, txid).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -1522,8 +1543,8 @@ fn unified_qr_send_receive() { assert_eq!(node_b.list_balances().total_lightning_balance_sats, 200_000); } -#[test] -fn lsps2_client_service_integration() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn lsps2_client_service_integration() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); @@ -1579,16 +1600,17 @@ fn lsps2_client_service_integration() { &electrsd.client, vec![service_addr, client_addr, payer_addr], Amount::from_sat(premine_amount_sat), - ); + ) + .await; service_node.sync_wallets().unwrap(); client_node.sync_wallets().unwrap(); payer_node.sync_wallets().unwrap(); // Open a channel payer -> service that will allow paying the JIT invoice println!("Opening channel payer_node -> service_node!"); - open_channel(&payer_node, &service_node, 5_000_000, false, &electrsd); + open_channel(&payer_node, &service_node, 5_000_000, false, &electrsd).await; - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; service_node.sync_wallets().unwrap(); payer_node.sync_wallets().unwrap(); expect_channel_ready_event!(payer_node, service_node.node_id()); @@ -1743,8 +1765,8 @@ fn lsps2_client_service_integration() { assert_eq!(client_node.payment(&payment_id).unwrap().status, PaymentStatus::Failed); } -#[test] -fn facade_logging() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn facade_logging() { let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); @@ -1761,8 +1783,8 @@ fn facade_logging() { } } -#[test] -fn spontaneous_send_with_custom_preimage() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn spontaneous_send_with_custom_preimage() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); @@ -1774,11 +1796,12 @@ fn spontaneous_send_with_custom_preimage() { &electrsd.client, vec![address_a], Amount::from_sat(premine_sat), - ); + ) + .await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); - open_channel(&node_a, &node_b, 500_000, true, &electrsd); - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + open_channel(&node_a, &node_b, 500_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); expect_channel_ready_event!(node_a, node_b.node_id()); diff --git a/tests/integration_tests_vss.rs 
b/tests/integration_tests_vss.rs index bdd876003..93f167dae 100644 --- a/tests/integration_tests_vss.rs +++ b/tests/integration_tests_vss.rs @@ -13,8 +13,8 @@ use std::collections::HashMap; use ldk_node::Builder; -#[test] -fn channel_full_cycle_with_vss_store() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn channel_full_cycle_with_vss_store() { let (bitcoind, electrsd) = common::setup_bitcoind_and_electrsd(); println!("== Node A =="); let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); @@ -52,5 +52,6 @@ fn channel_full_cycle_with_vss_store() { false, true, false, - ); + ) + .await; } diff --git a/tests/reorg_test.rs b/tests/reorg_test.rs index 03ace908f..491a37fd4 100644 --- a/tests/reorg_test.rs +++ b/tests/reorg_test.rs @@ -17,179 +17,187 @@ proptest! { #![proptest_config(proptest::test_runner::Config::with_cases(5))] #[test] fn reorg_test(reorg_depth in 1..=6usize, force_close in prop::bool::ANY) { - let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); - - let chain_source_bitcoind = TestChainSource::BitcoindRpcSync(&bitcoind); - let chain_source_electrsd = TestChainSource::Electrum(&electrsd); - let chain_source_esplora = TestChainSource::Esplora(&electrsd); - - macro_rules! config_node { - ($chain_source: expr, $anchor_channels: expr) => {{ - let config_a = random_config($anchor_channels); - let node = setup_node(&$chain_source, config_a, None); - node - }}; - } - let anchor_channels = true; - let nodes = vec![ - config_node!(chain_source_electrsd, anchor_channels), - config_node!(chain_source_bitcoind, anchor_channels), - config_node!(chain_source_esplora, anchor_channels), - ]; - - let (bitcoind, electrs) = (&bitcoind.client, &electrsd.client); - macro_rules! reorg { - ($reorg_depth: expr) => {{ - invalidate_blocks(bitcoind, $reorg_depth); - generate_blocks_and_wait(bitcoind, electrs, $reorg_depth); - }}; - } - - let amount_sat = 2_100_000; - let addr_nodes = - nodes.iter().map(|node| node.onchain_payment().new_address().unwrap()).collect::>(); - premine_and_distribute_funds(bitcoind, electrs, addr_nodes, Amount::from_sat(amount_sat)); - - macro_rules! sync_wallets { - () => { - nodes.iter().for_each(|node| node.sync_wallets().unwrap()) - }; - } - sync_wallets!(); - nodes.iter().for_each(|node| { - assert_eq!(node.list_balances().spendable_onchain_balance_sats, amount_sat); - assert_eq!(node.list_balances().total_onchain_balance_sats, amount_sat); - }); - - - let mut nodes_funding_tx = HashMap::new(); - let funding_amount_sat = 2_000_000; - for (node, next_node) in nodes.iter().zip(nodes.iter().cycle().skip(1)) { - let funding_txo = open_channel(node, next_node, funding_amount_sat, true, &electrsd); - nodes_funding_tx.insert(node.node_id(), funding_txo); - } - - generate_blocks_and_wait(bitcoind, electrs, 6); - sync_wallets!(); - - reorg!(reorg_depth); - sync_wallets!(); - - macro_rules! collect_channel_ready_events { - ($node:expr, $expected:expr) => {{ - let mut user_channels = HashMap::new(); - for _ in 0..$expected { - match $node.wait_next_event() { - Event::ChannelReady { user_channel_id, counterparty_node_id, .. 
} => { - $node.event_handled().unwrap(); - user_channels.insert(counterparty_node_id, user_channel_id); - }, - other => panic!("Unexpected event: {:?}", other), + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + + let chain_source_bitcoind = TestChainSource::BitcoindRpcSync(&bitcoind); + let chain_source_electrsd = TestChainSource::Electrum(&electrsd); + let chain_source_esplora = TestChainSource::Esplora(&electrsd); + + macro_rules! config_node { + ($chain_source: expr, $anchor_channels: expr) => {{ + let config_a = random_config($anchor_channels); + let node = setup_node(&$chain_source, config_a, None); + node + }}; + } + let anchor_channels = true; + let nodes = vec![ + config_node!(chain_source_electrsd, anchor_channels), + config_node!(chain_source_bitcoind, anchor_channels), + config_node!(chain_source_esplora, anchor_channels), + ]; + + let (bitcoind, electrs) = (&bitcoind.client, &electrsd.client); + macro_rules! reorg { + ($reorg_depth: expr) => {{ + invalidate_blocks(bitcoind, $reorg_depth); + generate_blocks_and_wait(bitcoind, electrs, $reorg_depth).await; + }}; + } + + let amount_sat = 2_100_000; + let addr_nodes = + nodes.iter().map(|node| node.onchain_payment().new_address().unwrap()).collect::>(); + premine_and_distribute_funds(bitcoind, electrs, addr_nodes, Amount::from_sat(amount_sat)).await; + + macro_rules! sync_wallets { + () => { + for node in &nodes { + node.sync_wallets().unwrap(); } - } - user_channels - }}; - } + }; + } + sync_wallets!(); + nodes.iter().for_each(|node| { + assert_eq!(node.list_balances().spendable_onchain_balance_sats, amount_sat); + assert_eq!(node.list_balances().total_onchain_balance_sats, amount_sat); + }); - let mut node_channels_id = HashMap::new(); - for (i, node) in nodes.iter().enumerate() { - assert_eq!( - node - .list_payments_with_filter(|p| p.direction == PaymentDirection::Outbound - && matches!(p.kind, PaymentKind::Onchain { .. })) - .len(), - 1 - ); - let user_channels = collect_channel_ready_events!(node, 2); - let next_node = nodes.get((i + 1) % nodes.len()).unwrap(); - let prev_node = nodes.get((i + nodes.len() - 1) % nodes.len()).unwrap(); + let mut nodes_funding_tx = HashMap::new(); + let funding_amount_sat = 2_000_000; + for (node, next_node) in nodes.iter().zip(nodes.iter().cycle().skip(1)) { + let funding_txo = open_channel(node, next_node, funding_amount_sat, true, &electrsd).await; + nodes_funding_tx.insert(node.node_id(), funding_txo); + } - assert!(user_channels.get(&Some(next_node.node_id())) != None); - assert!(user_channels.get(&Some(prev_node.node_id())) != None); + generate_blocks_and_wait(bitcoind, electrs, 6).await; + sync_wallets!(); + + reorg!(reorg_depth); + sync_wallets!(); + + macro_rules! collect_channel_ready_events { + ($node:expr, $expected:expr) => {{ + let mut user_channels = HashMap::new(); + for _ in 0..$expected { + match $node.next_event_async().await { + Event::ChannelReady { user_channel_id, counterparty_node_id, .. 
} => { + $node.event_handled().unwrap(); + user_channels.insert(counterparty_node_id, user_channel_id); + }, + other => panic!("Unexpected event: {:?}", other), + } + } + user_channels + }}; + } - let user_channel_id = - user_channels.get(&Some(next_node.node_id())).expect("Missing user channel for node"); - node_channels_id.insert(node.node_id(), *user_channel_id); - } + let mut node_channels_id = HashMap::new(); + for (i, node) in nodes.iter().enumerate() { + assert_eq!( + node + .list_payments_with_filter(|p| p.direction == PaymentDirection::Outbound + && matches!(p.kind, PaymentKind::Onchain { .. })) + .len(), + 1 + ); + + let user_channels = collect_channel_ready_events!(node, 2); + let next_node = nodes.get((i + 1) % nodes.len()).unwrap(); + let prev_node = nodes.get((i + nodes.len() - 1) % nodes.len()).unwrap(); + + assert!(user_channels.get(&Some(next_node.node_id())) != None); + assert!(user_channels.get(&Some(prev_node.node_id())) != None); + + let user_channel_id = + user_channels.get(&Some(next_node.node_id())).expect("Missing user channel for node"); + node_channels_id.insert(node.node_id(), *user_channel_id); + } - for (node, next_node) in nodes.iter().zip(nodes.iter().cycle().skip(1)) { - let user_channel_id = node_channels_id.get(&node.node_id()).expect("user channel id not exist"); - let funding = nodes_funding_tx.get(&node.node_id()).expect("Funding tx not exist"); + for (node, next_node) in nodes.iter().zip(nodes.iter().cycle().skip(1)) { + let user_channel_id = node_channels_id.get(&node.node_id()).expect("user channel id not exist"); + let funding = nodes_funding_tx.get(&node.node_id()).expect("Funding tx not exist"); - if force_close { - node.force_close_channel(&user_channel_id, next_node.node_id(), None).unwrap(); - } else { - node.close_channel(&user_channel_id, next_node.node_id()).unwrap(); - } + if force_close { + node.force_close_channel(&user_channel_id, next_node.node_id(), None).unwrap(); + } else { + node.close_channel(&user_channel_id, next_node.node_id()).unwrap(); + } - expect_event!(node, ChannelClosed); - expect_event!(next_node, ChannelClosed); + expect_event!(node, ChannelClosed); + expect_event!(next_node, ChannelClosed); - wait_for_outpoint_spend(electrs, *funding); - } + wait_for_outpoint_spend(electrs, *funding).await; + } - reorg!(reorg_depth); - sync_wallets!(); + reorg!(reorg_depth); + sync_wallets!(); - generate_blocks_and_wait(bitcoind, electrs, 1); - sync_wallets!(); + generate_blocks_and_wait(bitcoind, electrs, 1).await; + sync_wallets!(); - if force_close { - nodes.iter().for_each(|node| { - node.sync_wallets().unwrap(); - // If there is no more balance, there is nothing to process here. - if node.list_balances().lightning_balances.len() < 1 { - return; - } - match node.list_balances().lightning_balances[0] { - LightningBalance::ClaimableAwaitingConfirmations { - confirmation_height, - .. - } => { - let cur_height = node.status().current_best_block.height; - let blocks_to_go = confirmation_height - cur_height; - generate_blocks_and_wait(bitcoind, electrs, blocks_to_go as usize); - node.sync_wallets().unwrap(); - }, - _ => panic!("Unexpected balance state for node_hub!"), - } + if force_close { + for node in &nodes { + node.sync_wallets().unwrap(); + // If there is no more balance, there is nothing to process here. + if node.list_balances().lightning_balances.len() < 1 { + return; + } + match node.list_balances().lightning_balances[0] { + LightningBalance::ClaimableAwaitingConfirmations { + confirmation_height, + .. 
+ } => { + let cur_height = node.status().current_best_block.height; + let blocks_to_go = confirmation_height - cur_height; + generate_blocks_and_wait(bitcoind, electrs, blocks_to_go as usize).await; + node.sync_wallets().unwrap(); + }, + _ => panic!("Unexpected balance state for node_hub!"), + } - assert!(node.list_balances().lightning_balances.len() < 2); - assert!(node.list_balances().pending_balances_from_channel_closures.len() > 0); - match node.list_balances().pending_balances_from_channel_closures[0] { - PendingSweepBalance::BroadcastAwaitingConfirmation { .. } => {}, - _ => panic!("Unexpected balance state!"), - } + assert!(node.list_balances().lightning_balances.len() < 2); + assert!(node.list_balances().pending_balances_from_channel_closures.len() > 0); + match node.list_balances().pending_balances_from_channel_closures[0] { + PendingSweepBalance::BroadcastAwaitingConfirmation { .. } => {}, + _ => panic!("Unexpected balance state!"), + } - generate_blocks_and_wait(&bitcoind, electrs, 1); - node.sync_wallets().unwrap(); - assert!(node.list_balances().lightning_balances.len() < 2); - assert!(node.list_balances().pending_balances_from_channel_closures.len() > 0); - match node.list_balances().pending_balances_from_channel_closures[0] { - PendingSweepBalance::AwaitingThresholdConfirmations { .. } => {}, - _ => panic!("Unexpected balance state!"), + generate_blocks_and_wait(&bitcoind, electrs, 1).await; + node.sync_wallets().unwrap(); + assert!(node.list_balances().lightning_balances.len() < 2); + assert!(node.list_balances().pending_balances_from_channel_closures.len() > 0); + match node.list_balances().pending_balances_from_channel_closures[0] { + PendingSweepBalance::AwaitingThresholdConfirmations { .. } => {}, + _ => panic!("Unexpected balance state!"), + } } - }); - } + } - generate_blocks_and_wait(bitcoind, electrs, 6); - sync_wallets!(); + generate_blocks_and_wait(bitcoind, electrs, 6).await; + sync_wallets!(); - reorg!(reorg_depth); - sync_wallets!(); + reorg!(reorg_depth); + sync_wallets!(); - let fee_sat = 7000; - // Check balance after close channel - nodes.iter().for_each(|node| { - assert!(node.list_balances().spendable_onchain_balance_sats > amount_sat - fee_sat); - assert!(node.list_balances().spendable_onchain_balance_sats < amount_sat); + let fee_sat = 7000; + // Check balance after close channel + nodes.iter().for_each(|node| { + assert!(node.list_balances().spendable_onchain_balance_sats > amount_sat - fee_sat); + assert!(node.list_balances().spendable_onchain_balance_sats < amount_sat); - assert_eq!(node.list_balances().total_anchor_channels_reserve_sats, 0); - assert!(node.list_balances().lightning_balances.is_empty()); + assert_eq!(node.list_balances().total_anchor_channels_reserve_sats, 0); + assert!(node.list_balances().lightning_balances.is_empty()); - assert_eq!(node.next_event(), None); - }); + assert_eq!(node.next_event(), None); + }); + }) } }
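Taken together, the test changes in this diff follow one pattern: `#[test]` becomes a single-worker multi-thread `#[tokio::test]`, `std::thread::sleep` becomes `tokio::time::sleep(...).await`, `iter().for_each(..)` call sites that now need to `.await` become plain `for` loops, and the proptest-driven `reorg_test`, whose generated body cannot itself be an `async fn`, builds its own runtime and wraps the body in `block_on`. A condensed sketch of that pattern, with `some_async_helper` standing in for helpers such as `wait_for_tx` or `generate_blocks_and_wait` (and assuming tokio's `macros`, `time`, and `rt-multi-thread` features are available as dev-dependencies):

use std::time::Duration;

// Stand-in for an async test helper like `wait_for_tx` or
// `generate_blocks_and_wait`; illustrative only.
async fn some_async_helper() {
    tokio::time::sleep(Duration::from_millis(10)).await;
}

// Plain tests: the attribute replaces `#[test]` and the body awaits helpers
// instead of blocking the thread.
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn converted_test() {
    some_async_helper().await;
    // was: std::thread::sleep(Duration::from_secs(1));
    tokio::time::sleep(Duration::from_secs(1)).await;
    // `iter().for_each(|x| ...)` sites become `for` loops, since `.await`
    // is not allowed inside a non-async closure.
    for _ in 0..3 {
        some_async_helper().await;
    }
}

// Proptest bodies stay synchronous, so the async body is driven manually,
// mirroring the `reorg_test` change above.
#[test]
fn converted_proptest_style_test() {
    let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
    rt.block_on(async {
        some_async_helper().await;
    });
}

Keeping a single worker thread presumably leaves scheduling close to the previous synchronous tests while still providing the `.await` points the chain-source helpers now require.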