diff --git a/src/job/mod.rs b/src/job/mod.rs index 84395d86..06007f41 100644 --- a/src/job/mod.rs +++ b/src/job/mod.rs @@ -278,7 +278,16 @@ async fn sync_all_wallets( .expect("couldn't build JobExecutor") .execute(|_| async move { for (account_id, wallet_id) in wallets.all_ids().await? { - let _ = spawn_sync_wallet(&pool, SyncWalletData::new(account_id, wallet_id)).await; + if let Err(err) = + spawn_sync_wallet(&pool, SyncWalletData::new(account_id, wallet_id)).await + { + tracing::error!( + account_id = %account_id, + wallet_id = %wallet_id, + error = %err, + "failed to spawn sync_wallet" + ); + } } Ok::<(), JobError>(()) }) diff --git a/src/job/sync_wallet.rs b/src/job/sync_wallet.rs index ec14e931..19378e3e 100644 --- a/src/job/sync_wallet.rs +++ b/src/job/sync_wallet.rs @@ -19,6 +19,10 @@ use crate::{ wallet::*, }; use std::collections::HashMap; +use std::time::Duration; + +const BDK_SYNC_WARN_AFTER: Duration = Duration::from_secs(20 * 60); +const BDK_SYNC_HARD_TIMEOUT: Duration = Duration::from_secs(30 * 60); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SyncWalletData { @@ -105,7 +109,56 @@ pub async fn execute( let (blockchain, current_height) = init_electrum(&deps.blockchain_cfg.electrum_url).await?; span.record("current_height", current_height); let latest_change_settle_height = wallet.config.latest_change_settle_height(current_height); - keychain_wallet.sync(blockchain).await?; + let sync_start = tokio::time::Instant::now(); + let sync_fut = keychain_wallet.sync(blockchain); + tokio::pin!(sync_fut); + + let warn_timer = tokio::time::sleep(BDK_SYNC_WARN_AFTER); + tokio::pin!(warn_timer); + + let hard_timer = tokio::time::sleep(BDK_SYNC_HARD_TIMEOUT); + tokio::pin!(hard_timer); + + let mut warned = false; + let mut hard_threshold_reached = false; + + loop { + tokio::select! { + res = &mut sync_fut => { + if hard_threshold_reached { + tracing::info!( + wallet_id = %data.wallet_id, + keychain_id = %keychain_id, + elapsed_secs = sync_start.elapsed().as_secs(), + hard_timeout_secs = BDK_SYNC_HARD_TIMEOUT.as_secs(), + "bdk sync finished after hard-timeout threshold" + ); + } + res?; + break; + } + _ = &mut warn_timer, if !warned => { + warned = true; + tracing::warn!( + wallet_id = %data.wallet_id, + keychain_id = %keychain_id, + elapsed_secs = sync_start.elapsed().as_secs(), + warn_after_secs = BDK_SYNC_WARN_AFTER.as_secs(), + "bdk sync exceeded warning threshold" + ); + } + _ = &mut hard_timer, if !hard_threshold_reached => { + hard_threshold_reached = true; + tracing::error!( + wallet_id = %data.wallet_id, + keychain_id = %keychain_id, + elapsed_secs = sync_start.elapsed().as_secs(), + hard_timeout_secs = BDK_SYNC_HARD_TIMEOUT.as_secs(), + "bdk sync exceeded hard-timeout threshold; waiting for sync completion" + ); + } + } + } let bdk_txs = Transactions::new(keychain_id, pool.clone()); let bdk_utxos = BdkUtxos::new(keychain_id, pool.clone()); let mut txs_to_skip = Vec::new();