Skip to content

Commit 3c8240e

Browse files
authored
Implement transaction counts with flashblocks awareness in EoaExecutorWorker and related modules (#46)
* Implement transaction counts with flashblocks awareness in EoaExecutorWorker and related modules - Introduced a new `TransactionCounts` struct to encapsulate both latest and preconfirmed transaction counts, enhancing support for flashblocks. - Updated the `clean_submitted_transactions` method to utilize the new transaction counts for better transaction management. - Refactored the `confirm_flow` method in `EoaExecutorWorker` to fetch and handle transaction counts with flashblocks support, improving accuracy in nonce management and transaction processing. - Adjusted logic in `CleanSubmittedTransactions` to ensure proper handling of transactions based on the latest and preconfirmed counts, addressing potential issues with flashblocks propagation delays. These changes enhance the robustness and responsiveness of transaction handling in the EOA executor. * Refactor transaction handling in CleanSubmittedTransactions and EoaExecutorWorker - Updated the `CleanSubmittedTransactions` implementation to directly use the `to_redis_string_with_nonce` method, improving clarity and efficiency. - Introduced a new variable for the legacy Redis string to ensure proper removal of legacy formatted keys. - Modified the `clean_submitted_transactions` method in `EoaExecutorWorker` to utilize `saturating_sub` for transaction counts, enhancing robustness against underflow. These changes streamline transaction management and improve error handling in the EOA executor. * Refactor Flashblocks transaction count handling in RpcWithBlock - Improved the implementation of the `FlashblocksTransactionCount` trait to utilize a helper function for checking flashblocks chain IDs, enhancing code clarity and maintainability. - Streamlined the logic for fetching transaction counts based on whether the chain supports flashblocks, ensuring efficient parallel fetching for supported chains and single fetching for others. These changes enhance the robustness and readability of transaction count handling in the executor.
1 parent ee80eb0 commit 3c8240e

File tree

4 files changed

+185
-72
lines changed

4 files changed

+185
-72
lines changed

executors/src/eoa/store/atomic.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -593,12 +593,12 @@ impl AtomicEoaExecutorStore {
593593
pub async fn clean_submitted_transactions(
594594
&self,
595595
confirmed_transactions: &[ConfirmedTransaction],
596-
last_confirmed_nonce: u64,
596+
transaction_counts: crate::TransactionCounts,
597597
webhook_queue: Arc<twmq::Queue<WebhookJobHandler>>,
598598
) -> Result<CleanupReport, TransactionStoreError> {
599599
self.execute_with_watch_and_retry(&CleanSubmittedTransactions {
600600
confirmed_transactions,
601-
last_confirmed_nonce,
601+
transaction_counts,
602602
keys: &self.keys,
603603
webhook_queue,
604604
eoa_metrics: &self.eoa_metrics,

executors/src/eoa/store/submitted.rs

Lines changed: 102 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize};
88
use twmq::redis::{AsyncCommands, Pipeline, aio::ConnectionManager};
99

1010
use crate::{
11+
TransactionCounts,
1112
eoa::{
1213
EoaExecutorStore, EoaTransactionRequest,
1314
events::EoaExecutorEvent,
@@ -16,7 +17,7 @@ use crate::{
1617
TransactionStoreError, atomic::SafeRedisTransaction,
1718
},
1819
},
19-
metrics::{current_timestamp_ms, calculate_duration_seconds, EoaMetrics},
20+
metrics::{EoaMetrics, calculate_duration_seconds, current_timestamp_ms},
2021
webhook::{WebhookJobHandler, queue_webhook_envelopes},
2122
};
2223

@@ -194,13 +195,39 @@ impl SubmittedTransactionDehydrated {
194195
}
195196

196197
pub struct CleanSubmittedTransactions<'a> {
197-
pub last_confirmed_nonce: u64,
198+
pub transaction_counts: TransactionCounts,
198199
pub confirmed_transactions: &'a [ConfirmedTransaction],
199200
pub keys: &'a EoaExecutorStoreKeys,
200201
pub webhook_queue: Arc<twmq::Queue<WebhookJobHandler>>,
201202
pub eoa_metrics: &'a EoaMetrics,
202203
}
203204

205+
impl<'a> CleanSubmittedTransactions<'a> {
206+
/// Helper method to remove a transaction from Redis submitted state
207+
fn remove_transaction_from_redis_submitted_zset(
208+
&self,
209+
pipeline: &mut Pipeline,
210+
tx: &SubmittedTransactionHydrated,
211+
) {
212+
let (submitted_tx_redis_string, _nonce) = tx.to_redis_string_with_nonce();
213+
214+
pipeline.zrem(
215+
self.keys.submitted_transactions_zset_name(),
216+
&submitted_tx_redis_string,
217+
);
218+
219+
let (submitted_tx_legacy_redis_string, _nonce) = tx.to_legacy_redis_string_with_nonce();
220+
221+
// Also remove the legacy formatted key if present
222+
pipeline.zrem(
223+
self.keys.submitted_transactions_zset_name(),
224+
&submitted_tx_legacy_redis_string,
225+
);
226+
227+
pipeline.del(self.keys.transaction_hash_to_id_key_name(tx.hash()));
228+
}
229+
}
230+
204231
pub struct CleanAndGetRecycledNonces<'a> {
205232
pub keys: &'a EoaExecutorStoreKeys,
206233
}
@@ -254,11 +281,17 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> {
254281
conn: &mut ConnectionManager,
255282
store: &EoaExecutorStore,
256283
) -> Result<Self::ValidationData, TransactionStoreError> {
284+
// Fetch transactions up to the latest confirmed nonce for replacements
285+
// This includes both transactions that should be confirmed and those that might be replaced
286+
let max_nonce = std::cmp::max(
287+
self.transaction_counts.preconfirmed,
288+
self.transaction_counts.latest,
289+
);
257290
let submitted_txs: Vec<SubmittedTransactionStringWithNonce> = conn
258291
.zrangebyscore_withscores(
259292
self.keys.submitted_transactions_zset_name(),
260293
0,
261-
self.last_confirmed_nonce as isize,
294+
max_nonce as isize,
262295
)
263296
.await?;
264297

@@ -296,31 +329,31 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> {
296329
let mut replaced_transactions = Vec::with_capacity(submitted_txs.len());
297330

298331
for tx in &submitted_txs {
299-
// Clean up this hash from Redis (happens for ALL hashes)
300-
let (submitted_tx_redis_string, _nonce) = tx.clone().to_redis_string_with_nonce();
332+
let tx_nonce = tx.nonce();
301333

302-
pipeline.zrem(
303-
self.keys.submitted_transactions_zset_name(),
304-
&submitted_tx_redis_string,
305-
);
334+
// Only process transactions that are within range for either confirmation or replacement
335+
let should_process = tx_nonce <= self.transaction_counts.preconfirmed
336+
|| tx_nonce <= self.transaction_counts.latest;
306337

307-
// Also remove the legacy formmated key if present
308-
pipeline.zrem(
309-
self.keys.submitted_transactions_zset_name(),
310-
tx.to_legacy_redis_string_with_nonce(),
311-
);
312-
313-
pipeline.del(self.keys.transaction_hash_to_id_key_name(tx.hash()));
338+
if !should_process {
339+
continue;
340+
}
314341

315342
// Process each unique transaction_id once
316343
if processed_ids.insert(tx.transaction_id()) {
317344
match (tx.transaction_id(), confirmed_ids.get(tx.transaction_id())) {
318-
// if the transaction id is noop, we don't do anything
319-
(NO_OP_TRANSACTION_ID, _) => report.noop_count += 1,
345+
// if the transaction id is noop, we don't do anything but still clean up
346+
(NO_OP_TRANSACTION_ID, _) => {
347+
// Clean up noop transaction from Redis
348+
self.remove_transaction_from_redis_submitted_zset(pipeline, tx);
349+
report.noop_count += 1;
350+
}
320351

321352
// in case of a valid ID, we check if it's in the confirmed transactions
322353
// if it is confirmed, we succeed it and queue success jobs
323354
(id, Some(confirmed_tx)) => {
355+
// Clean up confirmed transaction from Redis
356+
self.remove_transaction_from_redis_submitted_zset(pipeline, tx);
324357
let data_key_name = self.keys.transaction_data_key_name(id);
325358
pipeline.hset(&data_key_name, "status", "confirmed");
326359
pipeline.hset(&data_key_name, "completed_at", now);
@@ -333,15 +366,13 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> {
333366
if let SubmittedTransactionHydrated::Real(tx) = tx {
334367
// Record metrics: transaction queued to mined for confirmed transactions
335368
let confirmed_timestamp = current_timestamp_ms();
336-
let queued_to_mined_duration = calculate_duration_seconds(
337-
tx.queued_at,
338-
confirmed_timestamp
339-
);
369+
let queued_to_mined_duration =
370+
calculate_duration_seconds(tx.queued_at, confirmed_timestamp);
340371
// Record metrics using the clean EoaMetrics abstraction
341372
self.eoa_metrics.record_transaction_confirmed(
342373
self.keys.eoa,
343374
self.keys.chain_id,
344-
queued_to_mined_duration
375+
queued_to_mined_duration,
345376
);
346377
if !tx.user_request.webhook_options.is_empty() {
347378
let event = EoaExecutorEvent {
@@ -372,32 +403,55 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> {
372403
// if the ID is not in the confirmed transactions, we queue it for pending
373404
_ => {
374405
if let SubmittedTransactionHydrated::Real(tx) = tx {
375-
// zadd_multiple expects (score, member)
376-
replaced_transactions.push((tx.queued_at, tx.transaction_id.clone()));
377-
378-
if !tx.user_request.webhook_options.is_empty() {
379-
let event = EoaExecutorEvent {
380-
transaction_id: tx.transaction_id.clone(),
381-
address: tx.user_request.from,
382-
};
383-
384-
let success_envelope =
385-
event.transaction_replaced_envelope(tx.data.clone());
386-
387-
let mut tx_context = self
388-
.webhook_queue
389-
.transaction_context_from_pipeline(pipeline);
390-
if let Err(e) = queue_webhook_envelopes(
391-
success_envelope,
392-
tx.user_request.webhook_options.clone(),
393-
&mut tx_context,
394-
self.webhook_queue.clone(),
395-
) {
396-
tracing::error!("Failed to queue webhook for fail: {}", e);
406+
// Only move to pending (replaced) if it's within the latest transaction count range
407+
// This prevents false replacements due to flashblocks propagation delays
408+
if tx_nonce <= self.transaction_counts.latest {
409+
// Clean up replaced transaction from Redis
410+
self.remove_transaction_from_redis_submitted_zset(
411+
pipeline,
412+
&SubmittedTransactionHydrated::Real(tx.clone()),
413+
);
414+
415+
// zadd_multiple expects (score, member)
416+
replaced_transactions
417+
.push((tx.queued_at, tx.transaction_id.clone()));
418+
419+
if !tx.user_request.webhook_options.is_empty() {
420+
let event = EoaExecutorEvent {
421+
transaction_id: tx.transaction_id.clone(),
422+
address: tx.user_request.from,
423+
};
424+
425+
let success_envelope =
426+
event.transaction_replaced_envelope(tx.data.clone());
427+
428+
let mut tx_context = self
429+
.webhook_queue
430+
.transaction_context_from_pipeline(pipeline);
431+
if let Err(e) = queue_webhook_envelopes(
432+
success_envelope,
433+
tx.user_request.webhook_options.clone(),
434+
&mut tx_context,
435+
self.webhook_queue.clone(),
436+
) {
437+
tracing::error!("Failed to queue webhook for fail: {}", e);
438+
}
397439
}
398-
}
399440

400-
report.moved_to_pending += 1;
441+
report.moved_to_pending += 1;
442+
} else {
443+
// Transaction is beyond latest confirmed nonce, keep it in submitted state
444+
// This happens when preconfirmed > latest due to flashblocks
445+
tracing::debug!(
446+
transaction_id = tx.transaction_id,
447+
transaction_hash = tx.transaction_hash,
448+
tx_nonce = tx_nonce,
449+
latest_confirmed_nonce = self.transaction_counts.latest,
450+
preconfirmed_nonce = self.transaction_counts.preconfirmed,
451+
"Keeping transaction in submitted state due to potential flashblocks propagation delay"
452+
);
453+
// Don't increment any counter - transaction stays in submitted state
454+
}
401455
}
402456
}
403457
}
@@ -413,7 +467,7 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> {
413467

414468
pipeline.set(
415469
self.keys.last_transaction_count_key_name(),
416-
self.last_confirmed_nonce + 1,
470+
self.transaction_counts.latest + 1,
417471
);
418472

419473
// Finalize report stats

executors/src/eoa/worker/confirm.rs

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use engine_core::{chain::Chain, error::AlloyRpcErrorToEngineError};
33
use serde::{Deserialize, Serialize};
44

55
use crate::{
6-
FlashblocksSupport,
6+
FlashblocksTransactionCount, TransactionCounts,
77
eoa::{
88
EoaExecutorStore,
99
store::{
@@ -32,39 +32,38 @@ impl<C: Chain> EoaExecutorWorker<C> {
3232
// ========== CONFIRM FLOW ==========
3333
#[tracing::instrument(skip_all, fields(worker_id = self.store.worker_id))]
3434
pub async fn confirm_flow(&self) -> Result<CleanupReport, EoaExecutorWorkerError> {
35-
// Get fresh on-chain transaction count
36-
let current_chain_transaction_count = self
35+
// Get fresh on-chain transaction counts (both latest and preconfirmed)
36+
let transaction_counts = self
3737
.chain
3838
.provider()
39-
.get_transaction_count(self.eoa)
40-
.with_flashblocks_support(self.chain.chain_id())
39+
.get_transaction_counts_with_flashblocks_support(self.eoa, self.chain.chain_id())
4140
.await
4241
.map_err(|e| {
4342
let engine_error = e.to_engine_error(&self.chain);
4443
EoaExecutorWorkerError::RpcError {
45-
message: format!("Failed to get transaction count: {engine_error}"),
44+
message: format!("Failed to get transaction counts: {engine_error}"),
4645
inner_error: engine_error,
4746
}
4847
})?;
4948

5049
if self.store.is_manual_reset_scheduled().await? {
5150
tracing::info!("Manual reset scheduled, executing now");
5251
self.store
53-
.reset_nonces(current_chain_transaction_count)
52+
.reset_nonces(transaction_counts.latest)
5453
.await?;
5554
}
5655

5756
let cached_transaction_count = match self.store.get_cached_transaction_count().await {
5857
Err(e) => match e {
5958
TransactionStoreError::NonceSyncRequired { .. } => {
6059
tracing::warn!(
61-
cached_transaction_count = current_chain_transaction_count,
60+
cached_transaction_count = transaction_counts.latest,
6261
"Nonce sync required, store was uninitialized, updating cached transaction count with current chain transaction count"
6362
);
6463
self.store
65-
.update_cached_transaction_count(current_chain_transaction_count)
64+
.update_cached_transaction_count(transaction_counts.latest)
6665
.await?;
67-
current_chain_transaction_count
66+
transaction_counts.latest
6867
}
6968
_ => return Err(e.into()),
7069
},
@@ -74,7 +73,7 @@ impl<C: Chain> EoaExecutorWorker<C> {
7473
let submitted_count = self.store.get_submitted_transactions_count().await?;
7574

7675
// no nonce progress
77-
if current_chain_transaction_count <= cached_transaction_count {
76+
if transaction_counts.preconfirmed <= cached_transaction_count {
7877
let current_health = self.get_eoa_health().await?;
7978
let now = EoaExecutorStore::now();
8079
// No nonce progress - check if we should attempt gas bumping for stalled nonce
@@ -97,14 +96,14 @@ impl<C: Chain> EoaExecutorWorker<C> {
9796
tracing::info!(
9897
time_since_movement = time_since_movement,
9998
stall_timeout = NONCE_STALL_LIMIT_MS,
100-
current_chain_nonce = current_chain_transaction_count,
99+
current_chain_nonce = transaction_counts.preconfirmed,
101100
cached_transaction_count = cached_transaction_count,
102101
"Nonce has been stalled, attempting gas bump"
103102
);
104103

105104
// Attempt gas bump for the next expected nonce
106105
if let Err(e) = self
107-
.attempt_gas_bump_for_stalled_nonce(current_chain_transaction_count)
106+
.attempt_gas_bump_for_stalled_nonce(transaction_counts.preconfirmed)
108107
.await
109108
{
110109
tracing::warn!(
@@ -141,7 +140,8 @@ impl<C: Chain> EoaExecutorWorker<C> {
141140
}
142141

143142
tracing::info!(
144-
current_chain_nonce = current_chain_transaction_count,
143+
current_chain_nonce_latest = transaction_counts.latest,
144+
current_chain_nonce_preconfirmed = transaction_counts.preconfirmed,
145145
cached_transaction_count = cached_transaction_count,
146146
"Processing confirmations"
147147
);
@@ -151,7 +151,7 @@ impl<C: Chain> EoaExecutorWorker<C> {
151151
let waiting_txs = self
152152
.store
153153
.get_submitted_transactions_below_chain_transaction_count(
154-
current_chain_transaction_count,
154+
transaction_counts.preconfirmed,
155155
)
156156
.await?;
157157

@@ -201,15 +201,18 @@ impl<C: Chain> EoaExecutorWorker<C> {
201201
.store
202202
.clean_submitted_transactions(
203203
&successes,
204-
current_chain_transaction_count - 1,
204+
TransactionCounts {
205+
latest: transaction_counts.latest.saturating_sub(1), // Use latest for replacement detection
206+
preconfirmed: transaction_counts.preconfirmed.saturating_sub(1), // Use preconfirmed for confirmation
207+
},
205208
self.webhook_queue.clone(),
206209
)
207210
.await?;
208211

209-
if current_chain_transaction_count != cached_transaction_count {
210-
if current_chain_transaction_count < cached_transaction_count {
212+
if transaction_counts.latest != cached_transaction_count {
213+
if transaction_counts.latest < cached_transaction_count {
211214
tracing::error!(
212-
current_chain_transaction_count = current_chain_transaction_count,
215+
current_chain_transaction_count = transaction_counts.latest,
213216
cached_transaction_count = cached_transaction_count,
214217
"Fresh fetched chain transaction count is lower than cached transaction count. \
215218
This indicates a re-org or RPC block lag. Engine will use the newest fetched transaction count from now (assuming re-org).\
@@ -219,7 +222,7 @@ impl<C: Chain> EoaExecutorWorker<C> {
219222
}
220223

221224
self.store
222-
.update_cached_transaction_count(current_chain_transaction_count)
225+
.update_cached_transaction_count(transaction_counts.latest)
223226
.await?;
224227
}
225228

0 commit comments

Comments
 (0)