Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed
- **RedisTransport background clients**: connection-resilience kwargs passed to the
transport (`socket_timeout`, `socket_connect_timeout`, `socket_keepalive`,
`socket_keepalive_options`, `health_check_interval`, `retry_on_timeout`,
`retry_on_error`, `max_connections`, and the `ssl_*` options) are now forwarded
to *both* long-lived background clients — the PEL reclaimer and the consumer-group
monitor. Previously each created its client with no socket timeout or keepalive
regardless of the transport's settings, so a silently dropped connection (e.g.
cloud Redis failover or an idle-connection reaper) left its blocking reads hung
indefinitely with no way to recover — and for the monitor, that hang struck
during the very failover it exists to recover from. `decode_responses` remains
pinned per client (`False` for the reclaimer's binary passthrough, `True` for the
monitor's string commands) and cannot be overridden by callers.

### Added
- **RedisTransport**: Exponential **retry backoff** for SDK-managed retries. New
`subscribe()` options `retry_backoff_multiplier` (default `1.0` = the previous
Expand Down
15 changes: 13 additions & 2 deletions sdk/eggai/transport/pending_reclaimer.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,15 @@
message body) can be used for application-level deduplication.
"""

def __init__(self, redis_url: str):
def __init__(self, redis_url: str, connection_kwargs: dict[str, Any] | None = None):
self._redis_url = redis_url
# Connection-resilience settings (socket_timeout, socket_keepalive,
# health_check_interval, retry_on_timeout, …) forwarded from the
# transport so this independent client recovers from a silently dropped
# connection the same way the broker does. Without them a blocking read
# against a half-dead socket (e.g. cloud Redis failover) hangs forever
# with no socket timeout to break it.
self._connection_kwargs: dict[str, Any] = connection_kwargs or {}
self._redis_client: aioredis.Redis | None = None
self._configs: dict[tuple[str, str, str], ReclaimerConfig] = {}
self._tasks: dict[tuple[str, str, str], asyncio.Task] = {}
Expand All @@ -157,7 +164,11 @@
"""Start one background task per registered config. Safe to call again after stop."""
# decode_responses=False: field values are kept as raw bytes so that
# FastStream's binary-encoded __data__ field is passed through unchanged.
self._redis_client = aioredis.from_url(self._redis_url, decode_responses=False)
# decode_responses is pinned here and must not be overridden by callers.
self._redis_client = aioredis.from_url(
self._redis_url,
**{**self._connection_kwargs, "decode_responses": False},

Check notice on line 170 in sdk/eggai/transport/pending_reclaimer.py

View workflow job for this annotation

GitHub Actions / QualOps Code Quality

LOW: maintainability

A `decode_responses` value supplied by a caller in `connection_kwargs` is silently discarded. The merge expression `{**self._connection_kwargs, "decode_responses": False}` does enforce the pin correctly, as the comment ("must not be overridden by callers") requires, but there is no validation or warning when a caller passes `decode_responses=True`, which could cause confusion during debugging.
)
for key, config in self._configs.items():
if key in self._tasks and not self._tasks[key].done():
continue
Expand Down
43 changes: 41 additions & 2 deletions sdk/eggai/transport/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,30 @@
# the stream and own their own PEL entries — the competing-consumers pattern.
_CONSUMER_INSTANCE = f"{socket.gethostname()}-{os.getpid()}"

# Connection settings that, when passed to the broker, must also be forwarded to
# the transport's other long-lived background clients — the PEL reclaimer and the
# consumer-group monitor — so every connection recovers from a silently dropped
# socket the same way. Kept to redis-py connection/resilience kwargs that are valid
# for ``aioredis.from_url`` — broker/FastStream-only kwargs (decoder, middlewares,
# asyncapi_*, …) are intentionally excluded. ``decode_responses`` is omitted on
# purpose: each background client pins it itself (False for the reclaimer's binary
# passthrough, True for the monitor's string commands).
_BACKGROUND_CLIENT_CONNECTION_KEYS = (

Check warning on line 35 in sdk/eggai/transport/redis.py

View workflow job for this annotation

GitHub Actions / QualOps Code Quality

MEDIUM: maintainability

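The `_BACKGROUND_CLIENT_CONNECTION_KEYS` tuple omits `retry` (the redis-py `Retry` object) and `ssl` (the boolean SSL flag), which are valid `aioredis.from_url` kwargs that users may pass to `RedisTransport`. Because only keys listed in this tuple are copied into `_connection_kwargs`, those two settings are never forwarded to the background clients.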
"socket_timeout",
"socket_connect_timeout",
"socket_keepalive",
"socket_keepalive_options",
"health_check_interval",
"retry_on_timeout",
"retry_on_error",
"max_connections",
"ssl_keyfile",
"ssl_certfile",
"ssl_cert_reqs",
"ssl_ca_certs",
"ssl_check_hostname",
)


@dataclass(frozen=True)
class _StreamGroupInfo:
Expand Down Expand Up @@ -132,6 +156,12 @@
else:
self.broker = RedisBroker(url, log_level=logging.INFO, **kwargs)
self._redis_url = url
# Forward connection-resilience kwargs to the transport's background clients
# (PEL reclaimer, group monitor) so they don't hang on a silently dropped
# connection while the broker recovers.
self._connection_kwargs: dict[str, Any] = {

Check warning on line 162 in sdk/eggai/transport/redis.py

View workflow job for this annotation

GitHub Actions / QualOps Code Quality

MEDIUM: bug

The `_monitor_stream_groups` background client is constructed with `**{**self._connection_kwargs, 'decode_responses': True}`, but `self._connection_kwargs` is populated from `kwargs` at construction time. If the user passed a `broker=` instance (lines 154-155), `kwargs` is still copied into `_connection_kwargs` even though it was not used to build the broker, potentially injecting unexpected kwargs into `aioredis.from_url`.
k: kwargs[k] for k in _BACKGROUND_CLIENT_CONNECTION_KEYS if k in kwargs
}
self._max_len = max_len
self._retry_max_len = retry_max_len
self._running = False
Expand Down Expand Up @@ -632,7 +662,14 @@
unprocessed messages are redelivered. Falls back to the original
group_create_id with MKSTREAM when the stream itself is gone.
"""
client = aioredis.from_url(self._redis_url, decode_responses=True)
# decode_responses=True (string commands here) is pinned and must not be
# overridden; the forwarded resilience kwargs (socket_timeout, keepalive, …)
# keep this client from hanging on a half-dead socket during the very
# failover this monitor exists to recover from.
client = aioredis.from_url(

Check notice on line 669 in sdk/eggai/transport/redis.py

View workflow job for this annotation

GitHub Actions / QualOps Code Quality

LOW: bug

The `_monitor_stream_groups` method creates a new Redis client on every invocation but the client is only closed in the `finally` block; if `aioredis.from_url` itself raises, the partially-constructed client may leak.
self._redis_url,
**{**self._connection_kwargs, "decode_responses": True},
)
try:
while self._running:
await asyncio.sleep(self._group_monitor_interval_s)
Expand Down Expand Up @@ -684,7 +721,9 @@
backoff_jitter: float = 0.0,
) -> tuple[str, str, str]:
if self._reclaimer_manager is None:
self._reclaimer_manager = PendingReclaimerManager(self._redis_url)
self._reclaimer_manager = PendingReclaimerManager(
self._redis_url, connection_kwargs=self._connection_kwargs
)
return self._reclaimer_manager.add(
ReclaimerConfig(
stream=self._get_stream_key(stream),
Expand Down
113 changes: 113 additions & 0 deletions sdk/tests/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -1441,6 +1441,119 @@ async def handler(message):
assert all(config.max_len == 250 for config in configs2)


@pytest.mark.asyncio
async def test_connection_kwargs_propagate_to_reclaimer():
    """The transport keeps only the connection-resilience kwargs it was given and
    hands that same set to the reclaimer manager created on subscribe."""
    resilience_kwargs = {
        "socket_timeout": 10.0,
        "socket_keepalive": True,
        "health_check_interval": 30,
        "retry_on_timeout": True,
    }
    # graceful_timeout is a broker/FastStream-only kwarg: it must be filtered out
    # instead of leaking into the reclaimer's aioredis.from_url call.
    transport = RedisTransport(**resilience_kwargs, graceful_timeout=20.0)

    assert transport._connection_kwargs == resilience_kwargs

    async def handler(message):
        return message

    await transport.subscribe(
        "orders",
        handler,
        handler_id="orders-handler-1",
        retry_on_idle_ms=500,
    )

    reclaimer = transport._reclaimer_manager
    assert reclaimer is not None
    assert reclaimer._connection_kwargs == resilience_kwargs


@pytest.mark.asyncio
async def test_reclaimer_start_applies_connection_kwargs(monkeypatch):
    """PendingReclaimerManager.start() passes its connection_kwargs through to
    from_url, yet always forces decode_responses=False so the binary passthrough
    cannot be switched off by a caller."""
    from eggai.transport import pending_reclaimer
    from eggai.transport.pending_reclaimer import PendingReclaimerManager

    redis_url = "redis://localhost:6379"
    captured: dict = {}

    class _FakeClient:
        async def aclose(self):
            pass

    def fake_from_url(url, **kwargs):
        # Record what the manager asked for instead of opening a connection.
        captured.update(url=url, kwargs=kwargs)
        return _FakeClient()

    monkeypatch.setattr(pending_reclaimer.aioredis, "from_url", fake_from_url)

    requested_kwargs = {
        "socket_timeout": 10.0,
        "socket_keepalive": True,
        # A caller-supplied decode_responses must lose to the pinned False.
        "decode_responses": True,
    }
    manager = PendingReclaimerManager(redis_url, connection_kwargs=requested_kwargs)
    await manager.start()

    assert captured["url"] == "redis://localhost:6379"
    forwarded = captured["kwargs"]
    assert forwarded["socket_timeout"] == 10.0
    assert forwarded["socket_keepalive"] is True
    assert forwarded["decode_responses"] is False

    await manager.stop()


@pytest.mark.asyncio
async def test_monitor_applies_connection_kwargs(monkeypatch):
    """The consumer-group monitor builds its client with the transport's resilience
    kwargs and with decode_responses forced to True, since its string commands
    need decoded replies and the flag must not be overridable."""
    from eggai.transport import redis as redis_module

    transport = RedisTransport(
        socket_timeout=10.0,
        socket_keepalive=True,
        health_check_interval=30,
    )

    captured: dict = {}

    class _FakeClient:
        async def aclose(self):
            pass

    def fake_from_url(url, **kwargs):
        # Record what the monitor asked for instead of opening a connection.
        captured.update(url=url, kwargs=kwargs)
        return _FakeClient()

    monkeypatch.setattr(redis_module.aioredis, "from_url", fake_from_url)

    # With _running still False the monitor creates its client, never enters the
    # loop body, and closes the client in its finally block, which is all that is
    # needed to observe the from_url call.
    await transport._monitor_stream_groups()

    assert captured["url"] == transport._redis_url
    forwarded = captured["kwargs"]
    assert forwarded["socket_timeout"] == 10.0
    assert forwarded["socket_keepalive"] is True
    assert forwarded["health_check_interval"] == 30
    assert forwarded["decode_responses"] is True


@pytest.mark.asyncio
async def test_publish_maxlen_bounds_stream_length():
"""Integration: publishing well past max_len keeps the stream approximately
Expand Down
Loading