diff --git a/src/api.rs b/src/api.rs index a43c54c..34939a2 100644 --- a/src/api.rs +++ b/src/api.rs @@ -821,7 +821,7 @@ impl<'a> RequestBuilder<'a> { async fn send(self) -> Result { let request = self.request; - let mut retry_backoffs: Option = self + let mut retry_backoff: Option = self .retry_enabled .then(|| self.client.retry_builder.build()); @@ -873,13 +873,13 @@ impl<'a> RequestBuilder<'a> { }; if err.is_retryable() - && let Some(backoff) = retry_backoffs.as_mut().and_then(|b| b.next()) + && let Some(backoff) = retry_backoff.as_mut().and_then(|b| b.next()) { let backoff = retry_after.map_or(backoff, |ra| ra.max(backoff)); debug!( %err, ?backoff, - num_retries_remaining = retry_backoffs.as_ref().map(|b| b.remaining()).unwrap_or(0), + num_retries_remaining = retry_backoff.as_ref().map(|b| b.remaining()).unwrap_or(0), "retrying request" ); tokio::time::sleep(backoff).await; @@ -888,7 +888,7 @@ impl<'a> RequestBuilder<'a> { %err, is_retryable = err.is_retryable(), retry_enabled = self.retry_enabled, - retries_exhausted = retry_backoffs.as_ref().is_none_or(|b| b.is_exhausted()), + retries_exhausted = retry_backoff.as_ref().is_none_or(|b| b.is_exhausted()), "not retrying request" ); return Err(err); diff --git a/src/retry.rs b/src/retry.rs index 4ec25ee..31039b5 100644 --- a/src/retry.rs +++ b/src/retry.rs @@ -45,8 +45,8 @@ impl RetryBackoffBuilder { RetryBackoff { min_base_delay: self.min_base_delay, max_base_delay: self.max_base_delay, - max_attempts: self.max_retries, - cur_attempt: 0, + max_retries: self.max_retries, + cur_retry: 0, } } } @@ -54,25 +54,25 @@ impl RetryBackoffBuilder { pub struct RetryBackoff { min_base_delay: Duration, max_base_delay: Duration, - max_attempts: u32, - cur_attempt: u32, + max_retries: u32, + cur_retry: u32, } impl RetryBackoff { pub fn remaining(&self) -> u32 { - self.max_attempts.saturating_sub(self.cur_attempt) + self.max_retries.saturating_sub(self.cur_retry) } pub fn is_exhausted(&self) -> bool { - self.cur_attempt >= self.max_attempts + self.cur_retry >= self.max_retries } pub fn reset(&mut self) { - self.cur_attempt = 0; + self.cur_retry = 0; } - pub fn attempts_used(&self) -> u32 { - self.cur_attempt + pub fn used(&self) -> u32 { + self.cur_retry } } @@ -80,18 +80,18 @@ impl Iterator for RetryBackoff { type Item = Duration; fn next(&mut self) -> Option { - if self.cur_attempt == self.max_attempts { + if self.cur_retry == self.max_retries { return None; } let base_delay = (self .min_base_delay - .saturating_mul(2u32.saturating_pow(self.cur_attempt))) + .saturating_mul(2u32.saturating_pow(self.cur_retry))) .min(self.max_base_delay); let jitter = Duration::try_from_secs_f64(base_delay.as_secs_f64() * rng().random_range(0.0..=1.0)) .unwrap_or(Duration::MAX); let delay = base_delay + jitter; - self.cur_attempt += 1; + self.cur_retry += 1; Some(delay) } } @@ -131,31 +131,31 @@ mod tests { fn backoff_with_reset() { let mut backoff = RetryBackoffBuilder::default().with_max_retries(3).build(); - assert_eq!(backoff.attempts_used(), 0); + assert_eq!(backoff.used(), 0); assert_eq!(backoff.remaining(), 3); assert!(!backoff.is_exhausted()); assert!(backoff.next().is_some()); - assert_eq!(backoff.attempts_used(), 1); + assert_eq!(backoff.used(), 1); assert_eq!(backoff.remaining(), 2); assert!(!backoff.is_exhausted()); backoff.reset(); - assert_eq!(backoff.attempts_used(), 0); + assert_eq!(backoff.used(), 0); assert_eq!(backoff.remaining(), 3); assert!(!backoff.is_exhausted()); assert!(backoff.next().is_some()); - assert_eq!(backoff.attempts_used(), 1); + assert_eq!(backoff.used(), 1); assert_eq!(backoff.remaining(), 2); assert!(backoff.next().is_some()); - assert_eq!(backoff.attempts_used(), 2); + assert_eq!(backoff.used(), 2); assert_eq!(backoff.remaining(), 1); assert!(backoff.next().is_some()); - assert_eq!(backoff.attempts_used(), 3); + assert_eq!(backoff.used(), 3); assert_eq!(backoff.remaining(), 0); assert!(backoff.is_exhausted()); diff --git a/src/session/append.rs b/src/session/append.rs index 900e6a4..51377c1 100644 --- a/src/session/append.rs +++ b/src/session/append.rs @@ -409,7 +409,7 @@ async fn run_session_with_retry( stashed_submission: None, }; let mut prev_total_acked_records = 0; - let mut retry_backoffs = retry_builder.build(); + let mut retry_backoff = retry_builder.build(); loop { let result = run_session(&client, &stream, &mut state, buffer_size).await; @@ -421,7 +421,7 @@ async fn run_session_with_retry( Err(err) => { if prev_total_acked_records < state.total_acked_records { prev_total_acked_records = state.total_acked_records; - retry_backoffs.reset(); + retry_backoff.reset(); } let retry_policy_compliant = retry_policy_compliant( @@ -431,12 +431,12 @@ async fn run_session_with_retry( if retry_policy_compliant && err.is_retryable() - && let Some(backoff) = retry_backoffs.next() + && let Some(backoff) = retry_backoff.next() { debug!( %err, ?backoff, - num_retries_remaining = retry_backoffs.remaining(), + num_retries_remaining = retry_backoff.remaining(), "retrying append session" ); tokio::time::sleep(backoff).await; @@ -444,7 +444,7 @@ async fn run_session_with_retry( debug!( %err, retry_policy_compliant, - retries_exhausted = retry_backoffs.is_exhausted(), + retries_exhausted = retry_backoff.is_exhausted(), "not retrying append session" ); diff --git a/src/session/read.rs b/src/session/read.rs index bab744d..6ad9a00 100644 --- a/src/session/read.rs +++ b/src/session/read.rs @@ -47,8 +47,7 @@ pub async fn read_session( mut end: ReadEnd, ignore_command_records: bool, ) -> Result, ReadSessionError> { - let retry_builder = retry_builder(&client.config.retry); - let mut retry_backoffs = retry_builder.build(); + let mut retry_backoff = retry_builder(&client.config.retry).build(); let baseline_wait = end.wait; let mut last_tail_at: Option = None; @@ -64,11 +63,11 @@ pub async fn read_session( .await { Ok(batches) => { - retry_backoffs.reset(); + retry_backoff.reset(); break batches; } Err(err) => { - if can_retry(&err, &mut retry_backoffs).await { + if can_retry(&err, &mut retry_backoff).await { continue; } return Err(err); @@ -91,7 +90,7 @@ pub async fn read_session( ).await { Ok(b) => batches = Some(b), Err(err) => { - if can_retry(&err, &mut retry_backoffs).await { + if can_retry(&err, &mut retry_backoff).await { continue; } yield Err(err); @@ -107,8 +106,8 @@ pub async fn read_session( .await { Some(Ok(batch)) => { - if retry_backoffs.attempts_used() > 0 { - retry_backoffs.reset(); + if retry_backoff.used() > 0 { + retry_backoff.reset(); } if batch.tail.is_some() { @@ -136,7 +135,7 @@ pub async fn read_session( } Some(Err(err)) => { batches = None; - if can_retry(&err, &mut retry_backoffs).await { + if can_retry(&err, &mut retry_backoff).await { continue; } yield Err(err);