Skip to content

Commit 2b42d4f

Browse files
committed
Switch to a separate executor for RPC calls to avoid tokio hangs
See the comment in the commit for more info on why we have to do this.
1 parent 04aaa24 commit 2b42d4f

File tree

1 file changed

+78
-35
lines changed

1 file changed

+78
-35
lines changed

src/bitcoind_client.rs

Lines changed: 78 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@ use lightning_block_sync::rpc::RpcClient;
2525
use lightning_block_sync::{AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource};
2626
use serde_json;
2727
use std::collections::HashMap;
28+
use std::future::Future;
2829
use std::str::FromStr;
2930
use std::sync::atomic::{AtomicU32, Ordering};
3031
use std::sync::Arc;
3132
use std::time::Duration;
3233

34+
use tokio::runtime::{self, Runtime};
35+
3336
pub struct BitcoindClient {
3437
pub(crate) bitcoind_rpc_client: Arc<RpcClient>,
3538
network: Network,
@@ -38,7 +41,8 @@ pub struct BitcoindClient {
3841
rpc_user: String,
3942
rpc_password: String,
4043
fees: Arc<HashMap<ConfirmationTarget, AtomicU32>>,
41-
handle: tokio::runtime::Handle,
44+
main_runtime_handle: runtime::Handle,
45+
inner_runtime: Runtime,
4246
logger: Arc<FilesystemLogger>,
4347
}
4448

@@ -66,7 +70,7 @@ const MIN_FEERATE: u32 = 253;
6670
impl BitcoindClient {
6771
pub(crate) async fn new(
6872
host: String, port: u16, rpc_user: String, rpc_password: String, network: Network,
69-
handle: tokio::runtime::Handle, logger: Arc<FilesystemLogger>,
73+
handle: runtime::Handle, logger: Arc<FilesystemLogger>,
7074
) -> std::io::Result<Self> {
7175
let http_endpoint = HttpEndpoint::for_host(host.clone()).with_port(port);
7276
let rpc_credentials =
@@ -95,6 +99,10 @@ impl BitcoindClient {
9599
fees.insert(ConfirmationTarget::ChannelCloseMinimum, AtomicU32::new(MIN_FEERATE));
96100
fees.insert(ConfirmationTarget::OutputSpendingFee, AtomicU32::new(MIN_FEERATE));
97101

102+
let mut builder = runtime::Builder::new_multi_thread();
103+
let inner_runtime =
104+
builder.enable_all().worker_threads(1).thread_name("rpc-worker").build().unwrap();
105+
98106
let client = Self {
99107
bitcoind_rpc_client: Arc::new(bitcoind_rpc_client),
100108
host,
@@ -103,7 +111,8 @@ impl BitcoindClient {
103111
rpc_password,
104112
network,
105113
fees: Arc::new(fees),
106-
handle: handle.clone(),
114+
main_runtime_handle: handle.clone(),
115+
inner_runtime,
107116
logger,
108117
};
109118
BitcoindClient::poll_for_fee_estimates(
@@ -226,10 +235,42 @@ impl BitcoindClient {
226235
});
227236
}
228237

238+
fn run_future_in_blocking_context<F: Future + Send + 'static>(&self, future: F) -> F::Output
239+
where
240+
F::Output: Send + 'static,
241+
{
242+
// Tokio deliberately makes it nigh impossible to block on a future in a sync context that
243+
// is running in an async task (which makes it really hard to interact with sync code that
244+
// has callbacks in an async project).
245+
//
246+
// Reading the docs, it *seems* like
247+
// `tokio::task::block_in_place(|| tokio::runtime::Handle::block_on(future))` should do the
248+
// trick, and 99.999% of the time it does! But tokio has a "non-stealable I/O driver" - if
249+
// the task we're running happens to, by sheer luck, be holding the "I/O driver" when we go
250+
// into a `block_in_place` call, and the inner future requires I/O (which of course it
251+
// does, it's a future!), the whole thing will come to a grinding halt as no other thread is
252+
// allowed to poll I/O until the blocked one finishes.
253+
//
254+
// This is, of course, nuts, and an almost trivial performance penalty of occasional
255+
// additional wakeups would solve this, but tokio refuses to do so because any performance
256+
// penalty at all would be too much (tokio issue #4730).
257+
//
258+
// Instead, we have to do a rather insane dance - we have to spawn the `future` we want to
259+
// run on a *different* (threaded) tokio runtime (doing the `block_in_place` dance to avoid
260+
// blocking too many threads on the main runtime). We want to block on that `future` being
261+
// run on the other runtime's threads, but tokio only provides `block_on` to do so, which
262+
// runs the `future` itself on the current thread, panicking if this thread is already a
263+
// part of a tokio runtime (which in this case it is - the main tokio runtime). Thus, we
264+
// have to `spawn` the `future` on the secondary runtime and then `block_on` the resulting
265+
// `JoinHandle` on the main runtime.
266+
tokio::task::block_in_place(move || {
267+
self.main_runtime_handle.block_on(self.inner_runtime.spawn(future)).unwrap()
268+
})
269+
}
270+
229271
pub fn get_new_rpc_client(&self) -> RpcClient {
230272
let http_endpoint = HttpEndpoint::for_host(self.host.clone()).with_port(self.port);
231-
let rpc_credentials =
232-
base64::encode(format!("{}:{}", self.rpc_user.clone(), self.rpc_password.clone()));
273+
let rpc_credentials = base64::encode(format!("{}:{}", self.rpc_user, self.rpc_password));
233274
RpcClient::new(&rpc_credentials, http_endpoint)
234275
}
235276

@@ -273,22 +314,28 @@ impl BitcoindClient {
273314
.unwrap();
274315
}
275316

276-
pub async fn sign_raw_transaction_with_wallet(&self, tx_hex: String) -> SignedTx {
317+
pub fn sign_raw_transaction_with_wallet(
318+
&self, tx_hex: String,
319+
)-> impl Future<Output = SignedTx> {
277320
let tx_hex_json = serde_json::json!(tx_hex);
278-
self.bitcoind_rpc_client
279-
.call_method("signrawtransactionwithwallet", &vec![tx_hex_json])
280-
.await
281-
.unwrap()
321+
let rpc_client = self.get_new_rpc_client();
322+
async move {
323+
rpc_client
324+
.call_method("signrawtransactionwithwallet", &vec![tx_hex_json])
325+
.await
326+
.unwrap()
327+
}
282328
}
283329

284-
pub async fn get_new_address(&self) -> Address {
330+
pub fn get_new_address(&self) -> impl Future<Output = Address> {
285331
let addr_args = vec![serde_json::json!("LDK output address")];
286-
let addr = self
287-
.bitcoind_rpc_client
288-
.call_method::<NewAddress>("getnewaddress", &addr_args)
289-
.await
290-
.unwrap();
291-
Address::from_str(addr.0.as_str()).unwrap().require_network(self.network).unwrap()
332+
let network = self.network;
333+
let rpc_client = self.get_new_rpc_client();
334+
async move {
335+
let addr =
336+
rpc_client.call_method::<NewAddress>("getnewaddress", &addr_args).await.unwrap();
337+
Address::from_str(addr.0.as_str()).unwrap().require_network(network).unwrap()
338+
}
292339
}
293340

294341
pub async fn get_blockchain_info(&self) -> BlockchainInfo {
@@ -298,11 +345,11 @@ impl BitcoindClient {
298345
.unwrap()
299346
}
300347

301-
pub async fn list_unspent(&self) -> ListUnspentResponse {
302-
self.bitcoind_rpc_client
303-
.call_method::<ListUnspentResponse>("listunspent", &vec![])
304-
.await
305-
.unwrap()
348+
pub fn list_unspent(&self) -> impl Future<Output = ListUnspentResponse> {
349+
let rpc_client = self.get_new_rpc_client();
350+
async move {
351+
rpc_client.call_method::<ListUnspentResponse>("listunspent", &vec![]).await.unwrap()
352+
}
306353
}
307354
}
308355

@@ -324,7 +371,7 @@ impl BroadcasterInterface for BitcoindClient {
324371
let txn = txs.iter().map(|tx| encode::serialize_hex(tx)).collect::<Vec<_>>();
325372
let bitcoind_rpc_client = Arc::clone(&self.bitcoind_rpc_client);
326373
let logger = Arc::clone(&self.logger);
327-
self.handle.spawn(async move {
374+
self.main_runtime_handle.spawn(async move {
328375
let res = if txn.len() == 1 {
329376
let tx_json = serde_json::json!(txn[0]);
330377
bitcoind_rpc_client
@@ -355,17 +402,15 @@ impl BroadcasterInterface for BitcoindClient {
355402

356403
impl ChangeDestinationSource for BitcoindClient {
357404
fn get_change_destination_script(&self) -> Result<ScriptBuf, ()> {
358-
tokio::task::block_in_place(move || {
359-
Ok(self.handle.block_on(async move { self.get_new_address().await.script_pubkey() }))
360-
})
405+
let future = self.get_new_address();
406+
Ok(self.run_future_in_blocking_context(async move { future.await.script_pubkey() }))
361407
}
362408
}
363409

364410
impl WalletSource for BitcoindClient {
365411
fn list_confirmed_utxos(&self) -> Result<Vec<Utxo>, ()> {
366-
let utxos = tokio::task::block_in_place(move || {
367-
self.handle.block_on(async move { self.list_unspent().await }).0
368-
});
412+
let future = self.list_unspent();
413+
let utxos = self.run_future_in_blocking_context(async move { future.await.0 });
369414
Ok(utxos
370415
.into_iter()
371416
.filter_map(|utxo| {
@@ -398,18 +443,16 @@ impl WalletSource for BitcoindClient {
398443
}
399444

400445
fn get_change_script(&self) -> Result<ScriptBuf, ()> {
401-
tokio::task::block_in_place(move || {
402-
Ok(self.handle.block_on(async move { self.get_new_address().await.script_pubkey() }))
403-
})
446+
let future = self.get_new_address();
447+
Ok(self.run_future_in_blocking_context(async move { future.await.script_pubkey() }))
404448
}
405449

406450
fn sign_psbt(&self, tx: Psbt) -> Result<Transaction, ()> {
407451
let mut tx_bytes = Vec::new();
408452
let _ = tx.unsigned_tx.consensus_encode(&mut tx_bytes).map_err(|_| ());
409453
let tx_hex = hex_utils::hex_str(&tx_bytes);
410-
let signed_tx = tokio::task::block_in_place(move || {
411-
self.handle.block_on(async move { self.sign_raw_transaction_with_wallet(tx_hex).await })
412-
});
454+
let future = self.sign_raw_transaction_with_wallet(tx_hex);
455+
let signed_tx = self.run_future_in_blocking_context(async move { future.await });
413456
let signed_tx_bytes = hex_utils::to_vec(&signed_tx.hex).ok_or(())?;
414457
Transaction::consensus_decode(&mut signed_tx_bytes.as_slice()).map_err(|_| ())
415458
}

0 commit comments

Comments
 (0)