@@ -341,7 +341,7 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> {
341341
342342 // Do business logic only once per unique transaction_id
343343 let is_first_occurrence = processed_ids. insert ( tx. transaction_id ( ) ) ;
344-
344+
345345 if is_first_occurrence {
346346 match ( tx. transaction_id ( ) , confirmed_ids. get ( tx. transaction_id ( ) ) ) {
347347 // if the transaction id is noop, we don't do anything but still clean up
@@ -389,7 +389,10 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> {
389389 & mut tx_context,
390390 self . webhook_queue . clone ( ) ,
391391 ) {
392- tracing:: error!( "Failed to queue webhook for confirmed transaction: {}" , e) ;
392+ tracing:: error!(
393+ "Failed to queue webhook for confirmed transaction: {}" ,
394+ e
395+ ) ;
393396 }
394397 }
395398 }
@@ -425,7 +428,10 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> {
425428 & mut tx_context,
426429 self . webhook_queue . clone ( ) ,
427430 ) {
428- tracing:: error!( "Failed to queue webhook for replaced transaction: {}" , e) ;
431+ tracing:: error!(
432+ "Failed to queue webhook for replaced transaction: {}" ,
433+ e
434+ ) ;
429435 }
430436 }
431437
@@ -562,7 +568,7 @@ fn detect_violations<'a>(
562568}
563569
564570impl SafeRedisTransaction for CleanAndGetRecycledNonces < ' _ > {
565- type ValidationData = ( u64 , Vec < u64 > ) ;
571+ type ValidationData = ( u64 , Vec < u64 > , Option < u64 > ) ;
566572 type OperationResult = Vec < u64 > ;
567573
568574 fn name ( & self ) -> & str {
@@ -618,25 +624,37 @@ impl SafeRedisTransaction for CleanAndGetRecycledNonces<'_> {
618624 . filter ( |nonce| * nonce < highest_submitted_nonce)
619625 . collect ( ) ;
620626
621- Ok ( ( highest_submitted_nonce, recycled_nonces) )
627+ let current_optimistic_tx_count: Option < u64 > = conn
628+ . get ( self . keys . optimistic_transaction_count_key_name ( ) )
629+ . await ?;
630+
631+ Ok ( (
632+ highest_submitted_nonce,
633+ recycled_nonces,
634+ current_optimistic_tx_count,
635+ ) )
622636 }
623637
624638 fn operation (
625639 & self ,
626640 pipeline : & mut Pipeline ,
627- ( highest_submitted_nonce, recycled_nonces) : Self :: ValidationData ,
641+ ( highest_submitted_nonce, recycled_nonces, current_optimistic_tx_count ) : Self :: ValidationData ,
628642 ) -> Self :: OperationResult {
629643 pipeline. zrembyscore (
630644 self . keys . recycled_nonces_zset_name ( ) ,
631645 highest_submitted_nonce,
632646 "+inf" ,
633647 ) ;
634648
635- pipeline. set (
636- self . keys . optimistic_transaction_count_key_name ( ) ,
637- highest_submitted_nonce + 1 ,
638- ) ;
639-
649+ if let Some ( current_optimistic_tx_count) = current_optimistic_tx_count {
650+ // if the current optimistic tx count is floating too high, we need to bring it down
651+ if highest_submitted_nonce + 1 < current_optimistic_tx_count {
652+ pipeline. set (
653+ self . keys . optimistic_transaction_count_key_name ( ) ,
654+ highest_submitted_nonce + 1 ,
655+ ) ;
656+ }
657+ }
640658 recycled_nonces
641659 }
642660}
0 commit comments