From 0130346f2857b765b519c036831760c7e6563508 Mon Sep 17 00:00:00 2001 From: tsiaeggai Date: Mon, 8 Jun 2026 13:57:14 +0300 Subject: [PATCH 1/2] fix(transport): forward connection-resilience kwargs to the PEL reclaimer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The reclaimer created its own Redis client with no socket timeout or keepalive regardless of the settings passed to the transport, so a silently dropped connection (cloud Redis failover, idle-connection reaper) left its blocking reads hung indefinitely with no way to recover. Forward the broker's connection-resilience kwargs (socket_timeout, socket_keepalive, health_check_interval, retry_on_timeout, ssl_*, …) to PendingReclaimerManager so its independent client recovers the same way the broker does. decode_responses stays pinned to False for binary passthrough and cannot be overridden by callers. --- sdk/CHANGELOG.md | 12 ++++ sdk/eggai/transport/pending_reclaimer.py | 15 ++++- sdk/eggai/transport/redis.py | 31 +++++++++- sdk/tests/test_redis.py | 76 ++++++++++++++++++++++++ 4 files changed, 131 insertions(+), 3 deletions(-) diff --git a/sdk/CHANGELOG.md b/sdk/CHANGELOG.md index 94c0a988..ac8b9b06 100644 --- a/sdk/CHANGELOG.md +++ b/sdk/CHANGELOG.md @@ -7,6 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed +- **RedisTransport / PEL reclaimer**: connection-resilience kwargs passed to the + transport (`socket_timeout`, `socket_connect_timeout`, `socket_keepalive`, + `socket_keepalive_options`, `health_check_interval`, `retry_on_timeout`, + `retry_on_error`, `max_connections`, and the `ssl_*` options) are now forwarded + to the background reclaimer's independent Redis client. Previously the reclaimer + created its client with no socket timeout or keepalive regardless of the + transport's settings, so a silently dropped connection (e.g. cloud Redis + failover or an idle-connection reaper) left its blocking reads hung indefinitely + with no way to recover. `decode_responses` remains pinned to `False` for binary + passthrough and cannot be overridden by callers. + ### Added - **RedisTransport**: Exponential **retry backoff** for SDK-managed retries. New `subscribe()` options `retry_backoff_multiplier` (default `1.0` = the previous diff --git a/sdk/eggai/transport/pending_reclaimer.py b/sdk/eggai/transport/pending_reclaimer.py index de477e35..053fa0bc 100644 --- a/sdk/eggai/transport/pending_reclaimer.py +++ b/sdk/eggai/transport/pending_reclaimer.py @@ -138,8 +138,15 @@ class PendingReclaimerManager: message body) can be used for application-level deduplication. """ - def __init__(self, redis_url: str): + def __init__(self, redis_url: str, connection_kwargs: dict[str, Any] | None = None): self._redis_url = redis_url + # Connection-resilience settings (socket_timeout, socket_keepalive, + # health_check_interval, retry_on_timeout, …) forwarded from the + # transport so this independent client recovers from a silently dropped + # connection the same way the broker does. Without them a blocking read + # against a half-dead socket (e.g. cloud Redis failover) hangs forever + # with no socket timeout to break it. + self._connection_kwargs: dict[str, Any] = connection_kwargs or {} self._redis_client: aioredis.Redis | None = None self._configs: dict[tuple[str, str, str], ReclaimerConfig] = {} self._tasks: dict[tuple[str, str, str], asyncio.Task] = {} @@ -157,7 +164,11 @@ async def start(self) -> None: """Start one background task per registered config. Safe to call again after stop.""" # decode_responses=False: field values are kept as raw bytes so that # FastStream's binary-encoded __data__ field is passed through unchanged. - self._redis_client = aioredis.from_url(self._redis_url, decode_responses=False) + # decode_responses is pinned here and must not be overridden by callers. + self._redis_client = aioredis.from_url( + self._redis_url, + **{**self._connection_kwargs, "decode_responses": False}, + ) for key, config in self._configs.items(): if key in self._tasks and not self._tasks[key].done(): continue diff --git a/sdk/eggai/transport/redis.py b/sdk/eggai/transport/redis.py index 3c4a0d89..a5202a8a 100644 --- a/sdk/eggai/transport/redis.py +++ b/sdk/eggai/transport/redis.py @@ -24,6 +24,28 @@ # the stream and own their own PEL entries — the competing-consumers pattern. _CONSUMER_INSTANCE = f"{socket.gethostname()}-{os.getpid()}" +# Connection settings that, when passed to the broker, must also be forwarded to +# the PEL reclaimer's independent Redis client so both connections recover from a +# silently dropped socket the same way. Kept to redis-py connection/resilience +# kwargs that are valid for ``aioredis.from_url`` — broker/FastStream-only kwargs +# (decoder, middlewares, asyncapi_*, …) are intentionally excluded. ``decode_responses`` +# is omitted on purpose: the reclaimer pins it to False for binary passthrough. +_RECLAIMER_CONNECTION_KEYS = ( + "socket_timeout", + "socket_connect_timeout", + "socket_keepalive", + "socket_keepalive_options", + "health_check_interval", + "retry_on_timeout", + "retry_on_error", + "max_connections", + "ssl_keyfile", + "ssl_certfile", + "ssl_cert_reqs", + "ssl_ca_certs", + "ssl_check_hostname", +) + @dataclass(frozen=True) class _StreamGroupInfo: @@ -132,6 +154,11 @@ def __init__( else: self.broker = RedisBroker(url, log_level=logging.INFO, **kwargs) self._redis_url = url + # Forward connection-resilience kwargs to the reclaimer's own client so it + # doesn't hang on a silently dropped connection while the broker recovers. + self._connection_kwargs: dict[str, Any] = { + k: kwargs[k] for k in _RECLAIMER_CONNECTION_KEYS if k in kwargs + } self._max_len = max_len self._retry_max_len = retry_max_len self._running = False @@ -684,7 +711,9 @@ def _setup_reclaimer( backoff_jitter: float = 0.0, ) -> tuple[str, str, str]: if self._reclaimer_manager is None: - self._reclaimer_manager = PendingReclaimerManager(self._redis_url) + self._reclaimer_manager = PendingReclaimerManager( + self._redis_url, connection_kwargs=self._connection_kwargs + ) return self._reclaimer_manager.add( ReclaimerConfig( stream=self._get_stream_key(stream), diff --git a/sdk/tests/test_redis.py b/sdk/tests/test_redis.py index fc925968..27ed23ee 100644 --- a/sdk/tests/test_redis.py +++ b/sdk/tests/test_redis.py @@ -1441,6 +1441,82 @@ async def handler(message): assert all(config.max_len == 250 for config in configs2) +@pytest.mark.asyncio +async def test_connection_kwargs_propagate_to_reclaimer(): + """Connection-resilience kwargs passed to the transport reach the reclaimer's + own client, so it recovers from a dropped connection like the broker does.""" + transport = RedisTransport( + socket_timeout=10.0, + socket_keepalive=True, + health_check_interval=30, + retry_on_timeout=True, + # A broker/FastStream-only kwarg that must NOT leak into the reclaimer's + # aioredis.from_url call. + graceful_timeout=20.0, + ) + + assert transport._connection_kwargs == { + "socket_timeout": 10.0, + "socket_keepalive": True, + "health_check_interval": 30, + "retry_on_timeout": True, + } + + async def handler(message): + return message + + await transport.subscribe( + "orders", handler, handler_id="orders-handler-1", retry_on_idle_ms=500 + ) + + assert transport._reclaimer_manager is not None + assert transport._reclaimer_manager._connection_kwargs == { + "socket_timeout": 10.0, + "socket_keepalive": True, + "health_check_interval": 30, + "retry_on_timeout": True, + } + + +@pytest.mark.asyncio +async def test_reclaimer_start_applies_connection_kwargs(monkeypatch): + """start() forwards connection_kwargs to from_url while pinning + decode_responses=False (callers cannot override the binary-passthrough flag).""" + from eggai.transport import pending_reclaimer + from eggai.transport.pending_reclaimer import PendingReclaimerManager + + captured: dict = {} + + class _FakeClient: + async def aclose(self): + pass + + def fake_from_url(url, **kwargs): + captured["url"] = url + captured["kwargs"] = kwargs + return _FakeClient() + + monkeypatch.setattr(pending_reclaimer.aioredis, "from_url", fake_from_url) + + manager = PendingReclaimerManager( + "redis://localhost:6379", + connection_kwargs={ + "socket_timeout": 10.0, + "socket_keepalive": True, + # Even if a caller sneaks decode_responses in, it must stay False. + "decode_responses": True, + }, + ) + await manager.start() + + assert captured["url"] == "redis://localhost:6379" + assert captured["kwargs"]["socket_timeout"] == 10.0 + assert captured["kwargs"]["socket_keepalive"] is True + assert captured["kwargs"]["decode_responses"] is False + + await manager.stop() + + @pytest.mark.asyncio async def test_publish_maxlen_bounds_stream_length(): """Integration: publishing well past max_len keeps the stream approximately From b485b38ba22b19847a84ed047a56da7844414248 Mon Sep 17 00:00:00 2001 From: tsiaeggai Date: Mon, 15 Jun 2026 10:33:16 +0300 Subject: [PATCH 2/2] fix(transport): forward connection-resilience kwargs to the group monitor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The consumer-group monitor (_monitor_stream_groups) created its own long-lived background Redis client with no socket timeout or keepalive, so a silently dropped connection left its blocking reads hung indefinitely — and that hang struck during the very failover the monitor exists to recover from. Forward the same resilience kwargs already harvested for the PEL reclaimer to the monitor's client, pinning decode_responses=True for its string commands. Rename _RECLAIMER_CONNECTION_KEYS to _BACKGROUND_CLIENT_CONNECTION_KEYS since it now feeds both background clients. --- sdk/CHANGELOG.md | 16 +++++++++------- sdk/eggai/transport/redis.py | 30 +++++++++++++++++++---------- sdk/tests/test_redis.py | 37 ++++++++++++++++++++++++++++++++++++ 3 files changed, 66 insertions(+), 17 deletions(-) diff --git a/sdk/CHANGELOG.md b/sdk/CHANGELOG.md index ac8b9b06..e3971ba3 100644 --- a/sdk/CHANGELOG.md +++ b/sdk/CHANGELOG.md @@ -8,16 +8,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Fixed -- **RedisTransport / PEL reclaimer**: connection-resilience kwargs passed to the +- **RedisTransport background clients**: connection-resilience kwargs passed to the transport (`socket_timeout`, `socket_connect_timeout`, `socket_keepalive`, `socket_keepalive_options`, `health_check_interval`, `retry_on_timeout`, `retry_on_error`, `max_connections`, and the `ssl_*` options) are now forwarded - to the background reclaimer's independent Redis client. Previously the reclaimer - created its client with no socket timeout or keepalive regardless of the - transport's settings, so a silently dropped connection (e.g. cloud Redis - failover or an idle-connection reaper) left its blocking reads hung indefinitely - with no way to recover. `decode_responses` remains pinned to `False` for binary - passthrough and cannot be overridden by callers. + to *both* long-lived background clients — the PEL reclaimer and the consumer-group + monitor. Previously each created its client with no socket timeout or keepalive + regardless of the transport's settings, so a silently dropped connection (e.g. + cloud Redis failover or an idle-connection reaper) left its blocking reads hung + indefinitely with no way to recover — and for the monitor, that hang struck + during the very failover it exists to recover from. `decode_responses` remains + pinned per client (`False` for the reclaimer's binary passthrough, `True` for the + monitor's string commands) and cannot be overridden by callers. ### Added - **RedisTransport**: Exponential **retry backoff** for SDK-managed retries. New diff --git a/sdk/eggai/transport/redis.py b/sdk/eggai/transport/redis.py index a5202a8a..7c78e405 100644 --- a/sdk/eggai/transport/redis.py +++ b/sdk/eggai/transport/redis.py @@ -25,12 +25,14 @@ _CONSUMER_INSTANCE = f"{socket.gethostname()}-{os.getpid()}" # Connection settings that, when passed to the broker, must also be forwarded to -# the PEL reclaimer's independent Redis client so both connections recover from a -# silently dropped socket the same way. Kept to redis-py connection/resilience -# kwargs that are valid for ``aioredis.from_url`` — broker/FastStream-only kwargs -# (decoder, middlewares, asyncapi_*, …) are intentionally excluded. ``decode_responses`` -# is omitted on purpose: the reclaimer pins it to False for binary passthrough. -_RECLAIMER_CONNECTION_KEYS = ( +# the transport's other long-lived background clients — the PEL reclaimer and the +# consumer-group monitor — so every connection recovers from a silently dropped +# socket the same way. Kept to redis-py connection/resilience kwargs that are valid +# for ``aioredis.from_url`` — broker/FastStream-only kwargs (decoder, middlewares, +# asyncapi_*, …) are intentionally excluded. ``decode_responses`` is omitted on +# purpose: each background client pins it itself (False for the reclaimer's binary +# passthrough, True for the monitor's string commands). +_BACKGROUND_CLIENT_CONNECTION_KEYS = ( "socket_timeout", "socket_connect_timeout", "socket_keepalive", @@ -154,10 +156,11 @@ def __init__( else: self.broker = RedisBroker(url, log_level=logging.INFO, **kwargs) self._redis_url = url - # Forward connection-resilience kwargs to the reclaimer's own client so it - # doesn't hang on a silently dropped connection while the broker recovers. + # Forward connection-resilience kwargs to the transport's background clients + # (PEL reclaimer, group monitor) so they don't hang on a silently dropped + # connection while the broker recovers. self._connection_kwargs: dict[str, Any] = { - k: kwargs[k] for k in _RECLAIMER_CONNECTION_KEYS if k in kwargs + k: kwargs[k] for k in _BACKGROUND_CLIENT_CONNECTION_KEYS if k in kwargs } self._max_len = max_len self._retry_max_len = retry_max_len @@ -659,7 +662,14 @@ async def _monitor_stream_groups(self) -> None: unprocessed messages are redelivered. Falls back to the original group_create_id with MKSTREAM when the stream itself is gone. """ - client = aioredis.from_url(self._redis_url, decode_responses=True) + # decode_responses=True (string commands here) is pinned and must not be + # overridden; the forwarded resilience kwargs (socket_timeout, keepalive, …) + # keep this client from hanging on a half-dead socket during the very + # failover this monitor exists to recover from. + client = aioredis.from_url( + self._redis_url, + **{**self._connection_kwargs, "decode_responses": True}, + ) try: while self._running: await asyncio.sleep(self._group_monitor_interval_s) diff --git a/sdk/tests/test_redis.py b/sdk/tests/test_redis.py index 27ed23ee..d6980b76 100644 --- a/sdk/tests/test_redis.py +++ b/sdk/tests/test_redis.py @@ -1517,6 +1517,43 @@ def fake_from_url(url, **kwargs): await manager.stop() +@pytest.mark.asyncio +async def test_monitor_applies_connection_kwargs(monkeypatch): + """The group monitor's background client gets the transport's resilience + kwargs while pinning decode_responses=True (its string commands need decoding, + and that flag must not be overridable).""" + from eggai.transport import redis as redis_module + + transport = RedisTransport( + socket_timeout=10.0, + socket_keepalive=True, + health_check_interval=30, + ) + + captured: dict = {} + + class _FakeClient: + async def aclose(self): + pass + + def fake_from_url(url, **kwargs): + captured["url"] = url + captured["kwargs"] = kwargs + return _FakeClient() + + monkeypatch.setattr(redis_module.aioredis, "from_url", fake_from_url) + + # _running stays False, so the monitor creates its client, skips the loop body, + # and closes the client via the finally block — enough to capture from_url. + await transport._monitor_stream_groups() + + assert captured["url"] == transport._redis_url + assert captured["kwargs"]["socket_timeout"] == 10.0 + assert captured["kwargs"]["socket_keepalive"] is True + assert captured["kwargs"]["health_check_interval"] == 30 + assert captured["kwargs"]["decode_responses"] is True + + @pytest.mark.asyncio async def test_publish_maxlen_bounds_stream_length(): """Integration: publishing well past max_len keeps the stream approximately