diff --git a/sdk/CHANGELOG.md b/sdk/CHANGELOG.md index 94c0a988..e3971ba3 100644 --- a/sdk/CHANGELOG.md +++ b/sdk/CHANGELOG.md @@ -7,6 +7,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed +- **RedisTransport background clients**: connection-resilience kwargs passed to the + transport (`socket_timeout`, `socket_connect_timeout`, `socket_keepalive`, + `socket_keepalive_options`, `health_check_interval`, `retry_on_timeout`, + `retry_on_error`, `max_connections`, and the `ssl_*` options) are now forwarded + to *both* long-lived background clients — the PEL reclaimer and the consumer-group + monitor. Previously each created its client with no socket timeout or keepalive + regardless of the transport's settings, so a silently dropped connection (e.g. + cloud Redis failover or an idle-connection reaper) left its blocking reads hung + indefinitely with no way to recover — and for the monitor, that hang struck + during the very failover it exists to recover from. `decode_responses` remains + pinned per client (`False` for the reclaimer's binary passthrough, `True` for the + monitor's string commands) and cannot be overridden by callers. + ### Added - **RedisTransport**: Exponential **retry backoff** for SDK-managed retries. New `subscribe()` options `retry_backoff_multiplier` (default `1.0` = the previous diff --git a/sdk/eggai/transport/pending_reclaimer.py b/sdk/eggai/transport/pending_reclaimer.py index de477e35..053fa0bc 100644 --- a/sdk/eggai/transport/pending_reclaimer.py +++ b/sdk/eggai/transport/pending_reclaimer.py @@ -138,8 +138,15 @@ class PendingReclaimerManager: message body) can be used for application-level deduplication. """ - def __init__(self, redis_url: str): + def __init__(self, redis_url: str, connection_kwargs: dict[str, Any] | None = None): self._redis_url = redis_url + # Connection-resilience settings (socket_timeout, socket_keepalive, + # health_check_interval, retry_on_timeout, …) forwarded from the + # transport so this independent client recovers from a silently dropped + # connection the same way the broker does. Without them a blocking read + # against a half-dead socket (e.g. cloud Redis failover) hangs forever + # with no socket timeout to break it. + self._connection_kwargs: dict[str, Any] = connection_kwargs or {} self._redis_client: aioredis.Redis | None = None self._configs: dict[tuple[str, str, str], ReclaimerConfig] = {} self._tasks: dict[tuple[str, str, str], asyncio.Task] = {} @@ -157,7 +164,11 @@ async def start(self) -> None: """Start one background task per registered config. Safe to call again after stop.""" # decode_responses=False: field values are kept as raw bytes so that # FastStream's binary-encoded __data__ field is passed through unchanged. - self._redis_client = aioredis.from_url(self._redis_url, decode_responses=False) + # decode_responses is pinned here and must not be overridden by callers. + self._redis_client = aioredis.from_url( + self._redis_url, + **{**self._connection_kwargs, "decode_responses": False}, + ) for key, config in self._configs.items(): if key in self._tasks and not self._tasks[key].done(): continue diff --git a/sdk/eggai/transport/redis.py b/sdk/eggai/transport/redis.py index 3c4a0d89..7c78e405 100644 --- a/sdk/eggai/transport/redis.py +++ b/sdk/eggai/transport/redis.py @@ -24,6 +24,30 @@ # the stream and own their own PEL entries — the competing-consumers pattern. _CONSUMER_INSTANCE = f"{socket.gethostname()}-{os.getpid()}" +# Connection settings that, when passed to the broker, must also be forwarded to +# the transport's other long-lived background clients — the PEL reclaimer and the +# consumer-group monitor — so every connection recovers from a silently dropped +# socket the same way. Kept to redis-py connection/resilience kwargs that are valid +# for ``aioredis.from_url`` — broker/FastStream-only kwargs (decoder, middlewares, +# asyncapi_*, …) are intentionally excluded. ``decode_responses`` is omitted on +# purpose: each background client pins it itself (False for the reclaimer's binary +# passthrough, True for the monitor's string commands). +_BACKGROUND_CLIENT_CONNECTION_KEYS = ( + "socket_timeout", + "socket_connect_timeout", + "socket_keepalive", + "socket_keepalive_options", + "health_check_interval", + "retry_on_timeout", + "retry_on_error", + "max_connections", + "ssl_keyfile", + "ssl_certfile", + "ssl_cert_reqs", + "ssl_ca_certs", + "ssl_check_hostname", +) + @dataclass(frozen=True) class _StreamGroupInfo: @@ -132,6 +156,12 @@ def __init__( else: self.broker = RedisBroker(url, log_level=logging.INFO, **kwargs) self._redis_url = url + # Forward connection-resilience kwargs to the transport's background clients + # (PEL reclaimer, group monitor) so they don't hang on a silently dropped + # connection while the broker recovers. + self._connection_kwargs: dict[str, Any] = { + k: kwargs[k] for k in _BACKGROUND_CLIENT_CONNECTION_KEYS if k in kwargs + } self._max_len = max_len self._retry_max_len = retry_max_len self._running = False @@ -632,7 +662,14 @@ async def _monitor_stream_groups(self) -> None: unprocessed messages are redelivered. Falls back to the original group_create_id with MKSTREAM when the stream itself is gone. """ - client = aioredis.from_url(self._redis_url, decode_responses=True) + # decode_responses=True (string commands here) is pinned and must not be + # overridden; the forwarded resilience kwargs (socket_timeout, keepalive, …) + # keep this client from hanging on a half-dead socket during the very + # failover this monitor exists to recover from. + client = aioredis.from_url( + self._redis_url, + **{**self._connection_kwargs, "decode_responses": True}, + ) try: while self._running: await asyncio.sleep(self._group_monitor_interval_s) @@ -684,7 +721,9 @@ def _setup_reclaimer( backoff_jitter: float = 0.0, ) -> tuple[str, str, str]: if self._reclaimer_manager is None: - self._reclaimer_manager = PendingReclaimerManager(self._redis_url) + self._reclaimer_manager = PendingReclaimerManager( + self._redis_url, connection_kwargs=self._connection_kwargs + ) return self._reclaimer_manager.add( ReclaimerConfig( stream=self._get_stream_key(stream), diff --git a/sdk/tests/test_redis.py b/sdk/tests/test_redis.py index fc925968..d6980b76 100644 --- a/sdk/tests/test_redis.py +++ b/sdk/tests/test_redis.py @@ -1441,6 +1441,119 @@ async def handler(message): assert all(config.max_len == 250 for config in configs2) +@pytest.mark.asyncio +async def test_connection_kwargs_propagate_to_reclaimer(): + """Connection-resilience kwargs passed to the transport reach the reclaimer's + own client, so it recovers from a dropped connection like the broker does.""" + transport = RedisTransport( + socket_timeout=10.0, + socket_keepalive=True, + health_check_interval=30, + retry_on_timeout=True, + # A broker/FastStream-only kwarg that must NOT leak into the reclaimer's + # aioredis.from_url call. + graceful_timeout=20.0, + ) + + assert transport._connection_kwargs == { + "socket_timeout": 10.0, + "socket_keepalive": True, + "health_check_interval": 30, + "retry_on_timeout": True, + } + + async def handler(message): + return message + + await transport.subscribe( + "orders", handler, handler_id="orders-handler-1", retry_on_idle_ms=500 + ) + + assert transport._reclaimer_manager is not None + assert transport._reclaimer_manager._connection_kwargs == { + "socket_timeout": 10.0, + "socket_keepalive": True, + "health_check_interval": 30, + "retry_on_timeout": True, + } + + +@pytest.mark.asyncio +async def test_reclaimer_start_applies_connection_kwargs(monkeypatch): + """start() forwards connection_kwargs to from_url while pinning + decode_responses=False (callers cannot override the binary-passthrough flag).""" + from eggai.transport import pending_reclaimer + from eggai.transport.pending_reclaimer import PendingReclaimerManager + + captured: dict = {} + + class _FakeClient: + async def aclose(self): + pass + + def fake_from_url(url, **kwargs): + captured["url"] = url + captured["kwargs"] = kwargs + return _FakeClient() + + monkeypatch.setattr(pending_reclaimer.aioredis, "from_url", fake_from_url) + + manager = PendingReclaimerManager( + "redis://localhost:6379", + connection_kwargs={ + "socket_timeout": 10.0, + "socket_keepalive": True, + # Even if a caller sneaks decode_responses in, it must stay False. + "decode_responses": True, + }, + ) + await manager.start() + + assert captured["url"] == "redis://localhost:6379" + assert captured["kwargs"]["socket_timeout"] == 10.0 + assert captured["kwargs"]["socket_keepalive"] is True + assert captured["kwargs"]["decode_responses"] is False + + await manager.stop() + + +@pytest.mark.asyncio +async def test_monitor_applies_connection_kwargs(monkeypatch): + """The group monitor's background client gets the transport's resilience + kwargs while pinning decode_responses=True (its string commands need decoding, + and that flag must not be overridable).""" + from eggai.transport import redis as redis_module + + transport = RedisTransport( + socket_timeout=10.0, + socket_keepalive=True, + health_check_interval=30, + ) + + captured: dict = {} + + class _FakeClient: + async def aclose(self): + pass + + def fake_from_url(url, **kwargs): + captured["url"] = url + captured["kwargs"] = kwargs + return _FakeClient() + + monkeypatch.setattr(redis_module.aioredis, "from_url", fake_from_url) + + # _running stays False, so the monitor creates its client, skips the loop body, + # and closes the client via the finally block — enough to capture from_url. + await transport._monitor_stream_groups() + + assert captured["url"] == transport._redis_url + assert captured["kwargs"]["socket_timeout"] == 10.0 + assert captured["kwargs"]["socket_keepalive"] is True + assert captured["kwargs"]["health_check_interval"] == 30 + assert captured["kwargs"]["decode_responses"] is True + + @pytest.mark.asyncio async def test_publish_maxlen_bounds_stream_length(): """Integration: publishing well past max_len keeps the stream approximately