
fix: prevent mid-stream retry from duplicating published tokens (#5923)#6541

Open
EsraaKamel11 wants to merge 1 commit into aden-hive:main from EsraaKamel11:fix/stream-retry-duplication-5923-clean

Conversation

@EsraaKamel11

Summary

Fixes #5923 — when the LiteLLM streaming layer retried after a mid-stream RateLimitError or other transient error, it re-streamed the response from the beginning, duplicating content that had already been yielded to callers and published to the event bus.

Since published events cannot be recalled, the retry must be abandoned when a partial stream has already been emitted.

Root Cause

LiteLLMProvider.stream() has an internal retry loop for transient errors. Both the RateLimitError handler and the generic transient-error handler would unconditionally continue — restarting the entire stream from the beginning — even when accumulated_text was non-empty (i.e., chunks had already been yielded upstream and emitted on the event bus).

Before — both handlers did this unconditionally:

except RateLimitError as e:
    if attempt < RATE_LIMIT_MAX_RETRIES:
        wait = _compute_retry_delay(...)
        await asyncio.sleep(wait)
        continue   # re-streams from token 0, duplicating all prior output

The event bus publishes token deltas eagerly as they stream in. There is no mechanism to retract already-published events, so retrying produced a second copy of every token the client had already received.
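The failure mode above can be reproduced in miniature. This is a standalone sketch with hypothetical names (not the actual LiteLLMProvider code): a naive retry loop that restarts a stream from the beginning after a mid-stream error, re-publishing every token the caller already received.

```python
# Minimal sketch of the bug: publishing is eager and fire-and-forget,
# so a retry that restarts from the first token duplicates prior output.

published = []  # stands in for the event bus (no retract mechanism)


def flaky_stream(tokens, fail_after):
    """Returns an attempt factory that raises once, after `fail_after` tokens."""
    state = {"failed": False}

    def attempt():
        for i, tok in enumerate(tokens):
            if not state["failed"] and i == fail_after:
                state["failed"] = True
                raise RuntimeError("transient error mid-stream")
            yield tok

    return attempt


def stream_with_naive_retry(make_attempt, max_retries=1):
    for _ in range(max_retries + 1):
        try:
            for tok in make_attempt():
                published.append(tok)  # published eagerly, cannot be recalled
                yield tok
            return
        except RuntimeError:
            continue  # restarts from the first token -- the bug


list(stream_with_naive_retry(flaky_stream(["a", "b", "c"], fail_after=2)))
# published is now ["a", "b", "a", "b", "c"] -- "a" and "b" duplicated
```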

Fix

In both exception handlers, check accumulated_text before retrying. If any text has already been yielded, emit a recoverable=True StreamErrorEvent and return immediately. EventLoopNode's existing empty-response guard at line 1706 detects the non-empty accumulated_text and suppresses the outer retry, preserving the partial turn.

except RateLimitError as e:
    if attempt < RATE_LIMIT_MAX_RETRIES:
        if accumulated_text:
            # Text already published to event bus — cannot be recalled.
            # Yield recoverable error; EventLoopNode will commit the partial
            # text and skip the outer retry (accumulated_text non-empty).
            yield StreamErrorEvent(error=str(e), recoverable=True)
            return
        wait = _compute_retry_delay(attempt, exception=e)
        ...
        continue
except Exception as e:
    if _is_stream_transient_error(e) and attempt < RATE_LIMIT_MAX_RETRIES:
        if accumulated_text:
            yield StreamErrorEvent(error=str(e), recoverable=True)
            return
        ...
        continue
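The guarded behavior can be sketched in isolation. This is a simplified standalone model (hypothetical names, not the real provider code): once any text has been yielded, the error is surfaced as a recoverable event instead of retrying, so no token is ever published twice.

```python
# Sketch of the guard: retry only while nothing has been published yet.

published = []  # stands in for the event bus


def guarded_stream(make_attempt, max_retries=1):
    accumulated_text = ""
    for _ in range(max_retries + 1):
        try:
            for tok in make_attempt():
                accumulated_text += tok
                published.append(tok)
                yield ("delta", tok)
            return
        except RuntimeError as e:
            if accumulated_text:
                # Already-published text cannot be recalled: abandon retry.
                yield ("error", str(e))
                return
            continue  # nothing published yet -- safe to retry from scratch


state = {"failed": False}


def attempt():
    for i, tok in enumerate(["a", "b", "c"]):
        if not state["failed"] and i == 2:
            state["failed"] = True
            raise RuntimeError("transient mid-stream error")
        yield tok


events = list(guarded_stream(attempt))
# published == ["a", "b"]; events end with a recoverable ("error", ...) event
```

Here the second attempt never runs: the caller receives the two deltas it already saw plus a terminal recoverable error, mirroring the StreamErrorEvent(recoverable=True) path in the fix.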

Changes

File                                 Change
core/framework/llm/litellm.py        Add accumulated_text guard to RateLimitError handler (L989) and transient Exception handler (L1012)
core/tests/test_event_loop_node.py   Add PartialStreamThenErrorLLM helper and TestMidStreamRetryNoDuplication class with 5 new tests

Tests

5 new automated tests added to TestMidStreamRetryNoDuplication in core/tests/test_event_loop_node.py — all passing (74/74 total):

  • test_mid_stream_error_no_duplicate_deltas_3_chunks — 3 chunks + error -> exactly 3 deltas on bus, no outer retry
  • test_mid_stream_error_no_duplicate_deltas_50_chunks — 50 chunks + error -> exactly 50 deltas, no outer retry
  • test_mid_stream_error_at_chunk_0_triggers_outer_retry — error before first chunk -> outer retry fires correctly, no duplication
  • test_mid_stream_tool_only_error_inner_retry_unaffected — tool-only error -> inner retry safe, guard does not block
  • test_mid_stream_recoverable_error_partial_text_committed — partial text committed to history, _call_index == 1

Notes

  • Guard uses accumulated_text only, not tool_calls_acc — tool deltas are buffered locally and never published before stream completion, so mid-tool-stream errors remain safe to retry internally
  • The empty-stream retry path (L1019-1020) is correctly guarded by not has_content and is unaffected
  • EventLoopNode empty-response guard at runtime_logger.py:1706 is the cooperating mechanism that absorbs the early exit without crashing the turn
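The cooperating outer guard can be sketched as follows. This is a hypothetical standalone version of the behavior described above (the function name and shapes are illustrative, not the actual EventLoopNode code): only an empty stream escalates to a retryable error; a non-empty partial turn is committed to history instead.

```python
# Sketch of the outer guard: escalate only when nothing reached the client.

def handle_stream_result(accumulated_text: str, history: list) -> None:
    if not accumulated_text:
        # Nothing was published: safe to let the outer retry fire.
        raise ConnectionError("empty response from stream")
    # Partial text already reached the client: commit it, skip the retry.
    history.append({"role": "assistant", "content": accumulated_text})


history = []
handle_stream_result("partial answer", history)
# history now holds the committed partial assistant turn
```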

…ent bus

Fixes aden-hive#5923

LiteLLMProvider.stream() retried on transient errors and rate limits
without checking whether TextDeltaEvents had already been yielded and
published to the event bus. When an error fired after K chunks had
streamed, the retry replayed the full response from token 1 — permanently
concatenating the partial first attempt with the complete second attempt
in the client UI stream. EventBus.publish() is fire-and-forget with no
retract mechanism, making the corruption irreversible.

With RATE_LIMIT_MAX_RETRIES=10, up to 11 concatenated partial attempts
could reach the client before a terminal error. Tool-call-only streams
were unaffected (tool deltas are buffered, never yielded as
TextDeltaEvents).

Fix: add a guard in both exception handlers — if accumulated_text is
non-empty when an error fires, yield StreamErrorEvent(recoverable=True)
and return instead of retrying. EventLoopNode._do_stream() commits the
partial text to conversation history and does not trigger an outer retry
(line 1706 condition requires accumulated_text == '' to raise
ConnectionError). Clean restart without touching the already-published
stream.

Guard uses accumulated_text only, not tool_calls_acc — tool deltas are
buffered locally and never published before stream completion, so
mid-tool-stream errors remain safe to retry internally.

Tests added (5 new, 74/74 passing):
- test_mid_stream_error_no_duplicate_deltas_3_chunks: 3 chunks + error
  -> exactly 3 deltas on bus, no outer retry
- test_mid_stream_error_no_duplicate_deltas_50_chunks: 50 chunks + error
  -> exactly 50 deltas, no outer retry
- test_mid_stream_error_at_chunk_0_triggers_outer_retry: error before
  first chunk -> outer retry fires, exactly 2 deltas from success path
- test_mid_stream_tool_only_error_inner_retry_unaffected: tool-only
  error -> inner retry safe, no duplication
- test_mid_stream_recoverable_error_partial_text_committed: partial text
  committed to history, call_index == 1


Successfully merging this pull request may close these issues.

[Bug]: LiteLLMProvider.stream() retries mid-stream without guard — clients receive duplicated partial response concatenated with full retry response
