fix(transport): forward connection-resilience kwargs to the PEL reclaimer#243
Conversation
…imer The reclaimer created its own Redis client with no socket timeout or keepalive regardless of the settings passed to the transport, so a silently dropped connection (cloud Redis failover, idle-connection reaper) left its blocking reads hung indefinitely with no way to recover. Forward the broker's connection-resilience kwargs (socket_timeout, socket_keepalive, health_check_interval, retry_on_timeout, ssl_*, …) to PendingReclaimerManager so its independent client recovers the same way the broker does. decode_responses stays pinned to False for binary passthrough and cannot be overridden by callers.
QualOps Code Quality AnalysisStatus: Summary
🟡 Medium Issues (2)
📊 Full ReportPowered by QualOps |
…itor The consumer-group monitor (_monitor_stream_groups) created its own long-lived background Redis client with no socket timeout or keepalive, so a silently dropped connection left its blocking reads hung indefinitely — and that hang struck during the very failover the monitor exists to recover from. Forward the same resilience kwargs already harvested for the PEL reclaimer to the monitor's client, pinning decode_responses=True for its string commands. Rename _RECLAIMER_CONNECTION_KEYS to _BACKGROUND_CLIENT_CONNECTION_KEYS since it now feeds both background clients.
The PEL reclaimer runs its own background Redis client, created with
aioredis.from_url(url, decode_responses=False)and nothing else — so itignored whatever connection settings the transport was given. Even when the
broker was configured with
socket_timeout/socket_keepalive/health_check_interval, the reclaimer's client had none of them. When theunderlying connection was silently dropped (cloud Redis failover, an
idle-connection reaper, a network blip), the reclaimer's blocking reads had no
socket timeout or keepalive to break them and hung indefinitely, with no path
to recovery short of a process restart.
This PR forwards the transport's connection-resilience kwargs to the reclaimer's
client so both connections recover the same way. The forwarded keys are a
whitelist of redis-py connection/resilience options valid for
aioredis.from_url; broker/FastStream-only kwargs (decoder, middlewares,asyncapi_*, …) are intentionally excluded.
decode_responsesstays pinned toFalsefor binary passthrough and cannot be overridden by callers.The change is opt-in and fully backward compatible: when no connection kwargs
are passed (or a pre-built
broker=is supplied), the reclaimer behaves exactlyas before.
Closes #244
Type of Change
Related Issue
Fixes #244
Changes Made
RedisTransport.__init__captures connection-resilience kwargs from**kwargsintoself._connection_kwargsvia a_RECLAIMER_CONNECTION_KEYSwhitelist (socket_timeout,socket_connect_timeout,socket_keepalive,socket_keepalive_options,health_check_interval,retry_on_timeout,retry_on_error,max_connections,ssl_*). Copied, not popped — the broker still receives them too._setup_reclaimerpassesconnection_kwargs=self._connection_kwargstoPendingReclaimerManager.PendingReclaimerManager.__init__acceptsconnection_kwargs;start()applies them infrom_url, withdecode_responses=Falsepinned last so callers cannot break binary passthrough.test_connection_kwargs_propagate_to_reclaimer(transport → manager propagation, broker-only kwargs excluded) andtest_reclaimer_start_applies_connection_kwargs(kwargs reachfrom_url;decode_responsesforced toFalse).sdk/CHANGELOG.mdunder[Unreleased] → Fixed.Migration
None — fully backward compatible. No on-wire or API changes; the reclaimer only
gains the connection settings already passed to the transport.
Testing
make test)Local run against a Redis 7 container: 48 passed including the 2 new tests. Two
pre-existing failures (
test_backoff_*) are unrelated to this change — a localfaststream-version mismatch (
BinaryMessageFormatV1.encodenot awaitable) thatalso fails on the clean base commit.
Changelog
sdk/CHANGELOG.mdunder[Unreleased]section (required for code changes)Checklist
make lintpasses)make formatapplied)