Skip to content
This repository was archived by the owner on Feb 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,7 @@ impl<'a> RequestBuilder<'a> {
async fn send(self) -> Result<UnaryResponse, ApiError> {
let request = self.request;

let mut retry_backoffs: Option<RetryBackoff> = self
let mut retry_backoff: Option<RetryBackoff> = self
.retry_enabled
.then(|| self.client.retry_builder.build());

Expand Down Expand Up @@ -873,13 +873,13 @@ impl<'a> RequestBuilder<'a> {
};

if err.is_retryable()
&& let Some(backoff) = retry_backoffs.as_mut().and_then(|b| b.next())
&& let Some(backoff) = retry_backoff.as_mut().and_then(|b| b.next())
{
let backoff = retry_after.map_or(backoff, |ra| ra.max(backoff));
debug!(
%err,
?backoff,
num_retries_remaining = retry_backoffs.as_ref().map(|b| b.remaining()).unwrap_or(0),
num_retries_remaining = retry_backoff.as_ref().map(|b| b.remaining()).unwrap_or(0),
"retrying request"
);
tokio::time::sleep(backoff).await;
Expand All @@ -888,7 +888,7 @@ impl<'a> RequestBuilder<'a> {
%err,
is_retryable = err.is_retryable(),
retry_enabled = self.retry_enabled,
retries_exhausted = retry_backoffs.as_ref().is_none_or(|b| b.is_exhausted()),
retries_exhausted = retry_backoff.as_ref().is_none_or(|b| b.is_exhausted()),
"not retrying request"
);
return Err(err);
Expand Down
36 changes: 18 additions & 18 deletions src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,53 +45,53 @@ impl RetryBackoffBuilder {
RetryBackoff {
min_base_delay: self.min_base_delay,
max_base_delay: self.max_base_delay,
max_attempts: self.max_retries,
cur_attempt: 0,
max_retries: self.max_retries,
cur_retry: 0,
}
}
}

pub struct RetryBackoff {
min_base_delay: Duration,
max_base_delay: Duration,
max_attempts: u32,
cur_attempt: u32,
max_retries: u32,
cur_retry: u32,
}

impl RetryBackoff {
pub fn remaining(&self) -> u32 {
self.max_attempts.saturating_sub(self.cur_attempt)
self.max_retries.saturating_sub(self.cur_retry)
}

pub fn is_exhausted(&self) -> bool {
self.cur_attempt >= self.max_attempts
self.cur_retry >= self.max_retries
}

pub fn reset(&mut self) {
self.cur_attempt = 0;
self.cur_retry = 0;
}

pub fn attempts_used(&self) -> u32 {
self.cur_attempt
pub fn used(&self) -> u32 {
self.cur_retry
}
}

impl Iterator for RetryBackoff {
type Item = Duration;

fn next(&mut self) -> Option<Self::Item> {
if self.cur_attempt == self.max_attempts {
if self.cur_retry == self.max_retries {
return None;
}
let base_delay = (self
.min_base_delay
.saturating_mul(2u32.saturating_pow(self.cur_attempt)))
.saturating_mul(2u32.saturating_pow(self.cur_retry)))
.min(self.max_base_delay);
let jitter =
Duration::try_from_secs_f64(base_delay.as_secs_f64() * rng().random_range(0.0..=1.0))
.unwrap_or(Duration::MAX);
let delay = base_delay + jitter;
self.cur_attempt += 1;
self.cur_retry += 1;
Some(delay)
}
}
Expand Down Expand Up @@ -131,31 +131,31 @@ mod tests {
fn backoff_with_reset() {
let mut backoff = RetryBackoffBuilder::default().with_max_retries(3).build();

assert_eq!(backoff.attempts_used(), 0);
assert_eq!(backoff.used(), 0);
assert_eq!(backoff.remaining(), 3);
assert!(!backoff.is_exhausted());

assert!(backoff.next().is_some());
assert_eq!(backoff.attempts_used(), 1);
assert_eq!(backoff.used(), 1);
assert_eq!(backoff.remaining(), 2);
assert!(!backoff.is_exhausted());

backoff.reset();

assert_eq!(backoff.attempts_used(), 0);
assert_eq!(backoff.used(), 0);
assert_eq!(backoff.remaining(), 3);
assert!(!backoff.is_exhausted());

assert!(backoff.next().is_some());
assert_eq!(backoff.attempts_used(), 1);
assert_eq!(backoff.used(), 1);
assert_eq!(backoff.remaining(), 2);

assert!(backoff.next().is_some());
assert_eq!(backoff.attempts_used(), 2);
assert_eq!(backoff.used(), 2);
assert_eq!(backoff.remaining(), 1);

assert!(backoff.next().is_some());
assert_eq!(backoff.attempts_used(), 3);
assert_eq!(backoff.used(), 3);
assert_eq!(backoff.remaining(), 0);
assert!(backoff.is_exhausted());

Expand Down
10 changes: 5 additions & 5 deletions src/session/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ async fn run_session_with_retry(
stashed_submission: None,
};
let mut prev_total_acked_records = 0;
let mut retry_backoffs = retry_builder.build();
let mut retry_backoff = retry_builder.build();

loop {
let result = run_session(&client, &stream, &mut state, buffer_size).await;
Expand All @@ -421,7 +421,7 @@ async fn run_session_with_retry(
Err(err) => {
if prev_total_acked_records < state.total_acked_records {
prev_total_acked_records = state.total_acked_records;
retry_backoffs.reset();
retry_backoff.reset();
}

let retry_policy_compliant = retry_policy_compliant(
Expand All @@ -431,20 +431,20 @@ async fn run_session_with_retry(

if retry_policy_compliant
&& err.is_retryable()
&& let Some(backoff) = retry_backoffs.next()
&& let Some(backoff) = retry_backoff.next()
{
debug!(
%err,
?backoff,
num_retries_remaining = retry_backoffs.remaining(),
num_retries_remaining = retry_backoff.remaining(),
"retrying append session"
);
tokio::time::sleep(backoff).await;
} else {
debug!(
%err,
retry_policy_compliant,
retries_exhausted = retry_backoffs.is_exhausted(),
retries_exhausted = retry_backoff.is_exhausted(),
"not retrying append session"
);

Expand Down
15 changes: 7 additions & 8 deletions src/session/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ pub async fn read_session(
mut end: ReadEnd,
ignore_command_records: bool,
) -> Result<Streaming<ReadBatch>, ReadSessionError> {
let retry_builder = retry_builder(&client.config.retry);
let mut retry_backoffs = retry_builder.build();
let mut retry_backoff = retry_builder(&client.config.retry).build();
let baseline_wait = end.wait;
let mut last_tail_at: Option<Instant> = None;

Expand All @@ -64,11 +63,11 @@ pub async fn read_session(
.await
{
Ok(batches) => {
retry_backoffs.reset();
retry_backoff.reset();
break batches;
}
Err(err) => {
if can_retry(&err, &mut retry_backoffs).await {
if can_retry(&err, &mut retry_backoff).await {
continue;
}
return Err(err);
Expand All @@ -91,7 +90,7 @@ pub async fn read_session(
).await {
Ok(b) => batches = Some(b),
Err(err) => {
if can_retry(&err, &mut retry_backoffs).await {
if can_retry(&err, &mut retry_backoff).await {
continue;
}
yield Err(err);
Expand All @@ -107,8 +106,8 @@ pub async fn read_session(
.await
{
Some(Ok(batch)) => {
if retry_backoffs.attempts_used() > 0 {
retry_backoffs.reset();
if retry_backoff.used() > 0 {
retry_backoff.reset();
}

if batch.tail.is_some() {
Expand Down Expand Up @@ -136,7 +135,7 @@ pub async fn read_session(
}
Some(Err(err)) => {
batches = None;
if can_retry(&err, &mut retry_backoffs).await {
if can_retry(&err, &mut retry_backoff).await {
continue;
}
yield Err(err);
Expand Down
Loading