Skip to content

Commit 339cfe6

Browse files
fix(openai_responses): preserve 503 retryable errors in stream (#3460)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
1 parent 40e3ba1 commit 339cfe6

2 files changed

Lines changed: 131 additions & 5 deletions

File tree

crates/forge_app/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ mod operation;
2020
mod orch;
2121
#[cfg(test)]
2222
mod orch_spec;
23-
mod retry;
23+
pub mod retry;
2424
mod search_dedup;
2525
mod services;
2626
mod set_conversation_id;

crates/forge_repo/src/provider/openai_responses/repository.rs

Lines changed: 130 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -249,11 +249,22 @@ impl<T: HttpInfra> OpenAIResponsesProvider<T> {
249249
}
250250
}
251251
Err(forge_eventsource::Error::StreamEnded) => None,
252-
Err(forge_eventsource::Error::InvalidStatusCode(_, response))
253-
| Err(forge_eventsource::Error::InvalidContentType(_, response)) => {
252+
Err(forge_eventsource::Error::InvalidStatusCode(status, response)) => {
254253
let (_, reason) = read_http_error_reason(*response).await;
255-
Some(Err(anyhow::anyhow!(reason)
256-
.context(format_http_context(None, "POST", &url))))
254+
Some(Err(anyhow::Error::from(
255+
forge_app::dto::openai::Error::InvalidStatusCode(status.as_u16()),
256+
)
257+
.context(reason)
258+
.context(format_http_context(None, "POST", &url))))
259+
}
260+
Err(forge_eventsource::Error::InvalidContentType(_, response)) => {
261+
let status = response.status();
262+
let (_, reason) = read_http_error_reason(*response).await;
263+
Some(Err(anyhow::Error::from(
264+
forge_app::dto::openai::Error::InvalidStatusCode(status.as_u16()),
265+
)
266+
.context(reason)
267+
.context(format_http_context(None, "POST", &url))))
257268
}
258269
Err(e) => {
259270
Some(Err(anyhow::Error::from(e)
@@ -1678,4 +1689,119 @@ mod tests {
16781689
assert!(err_str.contains("/v1/responses"), "missing url: {err_str}");
16791690
Ok(())
16801691
}
1692+
1693+
/// Tests that a 503 Service Unavailable error from the SSE endpoint is
1694+
/// correctly classified as retryable by the retry logic.
1695+
#[tokio::test]
1696+
async fn test_stream_503_error_is_retryable() -> anyhow::Result<()> {
1697+
let mut fixture = MockServer::new().await;
1698+
let _mock = fixture
1699+
.mock_post_error("/v1/responses", "upstream connec", 503)
1700+
.await;
1701+
1702+
let provider = openai_responses(
1703+
"test-api-key",
1704+
&format!("{}/v1/chat/completions", fixture.url()),
1705+
);
1706+
let infra = Arc::new(MockHttpClient { client: reqwest::Client::new() });
1707+
let provider_impl = OpenAIResponsesProvider::new(provider, infra);
1708+
let context = ChatContext::default()
1709+
.add_message(ContextMessage::user("Hi", None))
1710+
.stream(true);
1711+
1712+
let mut stream = provider_impl
1713+
.chat(&ModelId::from("gpt-4o"), context)
1714+
.await?;
1715+
1716+
let actual = stream.next().await.expect("stream should yield one item");
1717+
assert!(actual.is_err());
1718+
let error = actual.unwrap_err();
1719+
1720+
// Verify the status code is preserved in the error
1721+
let expected = Some(503u16);
1722+
assert_eq!(retry::get_api_status_code(&error), expected);
1723+
1724+
// Verify it is classified as retryable
1725+
let retry_config =
1726+
forge_config::RetryConfig::default().status_codes(vec![429, 500, 502, 503, 504]);
1727+
let retry_error = retry::into_retry(error, &retry_config);
1728+
assert!(
1729+
retry_error
1730+
.downcast_ref::<forge_domain::Error>()
1731+
.is_some_and(|e| { matches!(e, forge_domain::Error::Retryable(_)) }),
1732+
"503 error should be classified as retryable"
1733+
);
1734+
1735+
Ok(())
1736+
}
1737+
1738+
/// Tests that the retry_with_config mechanism will actually retry an
1739+
/// operation that produces a 503 error from the OpenAI Responses stream.
1740+
#[tokio::test]
1741+
async fn test_503_error_triggers_retry() -> anyhow::Result<()> {
1742+
use std::sync::atomic::{AtomicUsize, Ordering};
1743+
1744+
let mut fixture = MockServer::new().await;
1745+
let _mock = fixture
1746+
.mock_post_error("/v1/responses", "upstream connec", 503)
1747+
.await;
1748+
1749+
let provider = openai_responses(
1750+
"test-api-key",
1751+
&format!("{}/v1/chat/completions", fixture.url()),
1752+
);
1753+
let infra = Arc::new(MockHttpClient { client: reqwest::Client::new() });
1754+
let provider_impl = OpenAIResponsesProvider::new(provider, infra);
1755+
let retry_config = forge_config::RetryConfig::default()
1756+
.status_codes(vec![429, 500, 502, 503, 504])
1757+
.max_attempts(3usize)
1758+
.min_delay_ms(1u64);
1759+
1760+
let attempt_count = Arc::new(AtomicUsize::new(0));
1761+
let attempt_count_clone = attempt_count.clone();
1762+
1763+
let result: anyhow::Result<()> = forge_app::retry::retry_with_config(
1764+
&retry_config,
1765+
|| {
1766+
let provider_impl = provider_impl.clone();
1767+
let retry_config = retry_config.clone();
1768+
attempt_count_clone.fetch_add(1, Ordering::SeqCst);
1769+
async move {
1770+
let context = ChatContext::default()
1771+
.add_message(ContextMessage::user("Hi", None))
1772+
.stream(true);
1773+
1774+
let mut stream = provider_impl
1775+
.chat(&ModelId::from("gpt-4o"), context)
1776+
.await
1777+
.map_err(|e| retry::into_retry(e, &retry_config))?;
1778+
1779+
// Drain the stream to surface the 503 error
1780+
while let Some(item) = stream.next().await {
1781+
let _ = item.map_err(|e| retry::into_retry(e, &retry_config))?;
1782+
}
1783+
1784+
// The first attempt should never reach here (503 error),
1785+
// but if the mock server stops returning 503, we succeed.
1786+
Ok(())
1787+
}
1788+
},
1789+
None::<fn(&anyhow::Error, std::time::Duration)>,
1790+
)
1791+
.await;
1792+
1793+
// The operation should have failed after exhausting retries
1794+
assert!(result.is_err(), "Expected error after retries");
1795+
1796+
// Verify that the operation was retried (1 initial + up to max_attempts
1797+
// retries)
1798+
let actual_attempts = attempt_count.load(Ordering::SeqCst);
1799+
let expected_min_attempts = 2; // At least initial + 1 retry
1800+
assert!(
1801+
actual_attempts >= expected_min_attempts,
1802+
"Expected at least {expected_min_attempts} attempts, got {actual_attempts}"
1803+
);
1804+
1805+
Ok(())
1806+
}
16811807
}

0 commit comments

Comments
 (0)