Skip to content

Commit 39044eb

Browse files
committed
feat(events): improve events kafka pool & client retry
* Check consumers on access from kafka consumer pool * Allow infinite retries from python actions consumer * Locking fixes, race condition fixes with consumer pool
1 parent ba7842d commit 39044eb

File tree

17 files changed

+1904
-174
lines changed

17 files changed

+1904
-174
lines changed

datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_event_source.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ class DataHubEventsSourceConfig(ConfigModel):
6262
consumer_id: Optional[str] = None # Used to store offset for the consumer.
6363
lookback_days: Optional[int] = None
6464
reset_offsets: Optional[bool] = False
65+
infinite_retry: Optional[bool] = False
6566

6667
# Time and Exit Conditions.
6768
kill_after_idle_timeout: bool = False
@@ -106,6 +107,7 @@ def __init__(self, config: DataHubEventsSourceConfig, ctx: PipelineContext):
106107
graph=self.ctx.graph.graph,
107108
lookback_days=self.source_config.lookback_days,
108109
reset_offsets=self.source_config.reset_offsets,
110+
infinite_retry=self.source_config.infinite_retry,
109111
)
110112

111113
self.ack_manager = AckManager()
@@ -120,6 +122,7 @@ def _initialize_topic_consumers(
120122
graph: DataHubGraph,
121123
lookback_days: Optional[int],
122124
reset_offsets: Optional[bool],
125+
infinite_retry: Optional[bool],
123126
) -> Dict[str, DataHubEventsConsumer]:
124127
"""
125128
Initialize DataHub consumers for each topic with appropriate consumer IDs.
@@ -156,6 +159,7 @@ def _initialize_topic_consumers(
156159
consumer_id=topic_consumer_id,
157160
lookback_days=lookback_days,
158161
reset_offsets=reset_offsets,
162+
infinite_retry=infinite_retry,
159163
)
160164

161165
return topic_consumers

datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_events_consumer.py

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@
1111
Timeout,
1212
)
1313
from tenacity import (
14-
retry,
14+
Retrying,
1515
retry_if_exception_type,
1616
stop_after_attempt,
17+
stop_never,
1718
wait_exponential,
1819
)
1920

@@ -49,6 +50,7 @@ def __init__(
4950
offset_id: Optional[str] = None,
5051
lookback_days: Optional[int] = None,
5152
reset_offsets: Optional[bool] = False,
53+
infinite_retry: Optional[bool] = False,
5254
):
5355
# 1) Always set self.consumer_id, even if None, so tests can assert it safely.
5456
self.consumer_id: Optional[str] = consumer_id
@@ -57,6 +59,9 @@ def __init__(
5759
self.graph: DataHubGraph = graph
5860
self.offset_id: Optional[str] = offset_id
5961
self.default_lookback_days: Optional[int] = lookback_days
62+
self.infinite_retry: bool = (
63+
infinite_retry if infinite_retry is not None else False
64+
)
6065
self.offsets_store: Optional[
6166
DataHubEventsConsumerPlatformResourceOffsetsStore
6267
] = None
@@ -83,14 +88,6 @@ def __init__(
8388
else:
8489
logger.debug("Starting DataHub Events Consumer with no consumer ID.")
8590

86-
@retry(
87-
retry=retry_if_exception_type(
88-
(HTTPError, ConnectionError, ChunkedEncodingError, Timeout)
89-
),
90-
wait=wait_exponential(multiplier=1, min=2, max=30),
91-
stop=stop_after_attempt(3),
92-
reraise=True,
93-
)
9491
def poll_events(
9592
self,
9693
topic: str,
@@ -101,6 +98,36 @@ def poll_events(
10198
"""
10299
Fetch events for a specific topic.
103100
"""
101+
stop_condition = stop_never if self.infinite_retry else stop_after_attempt(15)
102+
103+
retry_strategy = Retrying(
104+
retry=retry_if_exception_type(
105+
(HTTPError, ConnectionError, ChunkedEncodingError, Timeout)
106+
),
107+
wait=wait_exponential(multiplier=1, min=2, max=60),
108+
stop=stop_condition,
109+
reraise=True,
110+
)
111+
112+
for attempt in retry_strategy:
113+
with attempt:
114+
return self._poll_events_impl(
115+
topic, offset_id, limit, poll_timeout_seconds
116+
)
117+
118+
# This should never be reached due to reraise=True, but mypy needs it
119+
raise RuntimeError("Retry strategy exhausted without returning or raising")
120+
121+
def _poll_events_impl(
122+
self,
123+
topic: str,
124+
offset_id: Optional[str] = None,
125+
limit: Optional[int] = None,
126+
poll_timeout_seconds: Optional[int] = None,
127+
) -> ExternalEventsResponse:
128+
"""
129+
Internal implementation of poll_events.
130+
"""
104131
endpoint = f"{self.base_url}/v1/events/poll"
105132

106133
# If the caller provided an offset_id, use it; otherwise fall back to self.offset_id.

0 commit comments

Comments
 (0)