Commit c325865

feat(events): improve events kafka pool & client retry
* Check consumers on access from kafka consumer pool
* Allow infinite retries from python actions consumer
* Locking fixes, race condition fixes with consumer pool
1 parent fac6c2f

File tree

14 files changed: +1450 −164 lines

datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_event_source.py

Lines changed: 4 additions & 0 deletions

@@ -62,6 +62,7 @@ class DataHubEventsSourceConfig(ConfigModel):
     consumer_id: Optional[str] = None  # Used to store offset for the consumer.
     lookback_days: Optional[int] = None
     reset_offsets: Optional[bool] = False
+    infinite_retry: Optional[bool] = False
 
     # Time and Exit Conditions.
     kill_after_idle_timeout: bool = False

@@ -106,6 +107,7 @@ def __init__(self, config: DataHubEventsSourceConfig, ctx: PipelineContext):
             graph=self.ctx.graph.graph,
             lookback_days=self.source_config.lookback_days,
             reset_offsets=self.source_config.reset_offsets,
+            infinite_retry=self.source_config.infinite_retry,
         )
 
         self.ack_manager = AckManager()

@@ -120,6 +122,7 @@ def _initialize_topic_consumers(
         graph: DataHubGraph,
         lookback_days: Optional[int],
         reset_offsets: Optional[bool],
+        infinite_retry: Optional[bool],
     ) -> Dict[str, DataHubEventsConsumer]:
         """
         Initialize DataHub consumers for each topic with appropriate consumer IDs.

@@ -156,6 +159,7 @@ def _initialize_topic_consumers(
             consumer_id=topic_consumer_id,
             lookback_days=lookback_days,
             reset_offsets=reset_offsets,
+            infinite_retry=infinite_retry,
         )
 
         return topic_consumers
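The flag threaded through above lands on `DataHubEventsConsumer` itself, so it can also be exercised directly. A minimal sketch, assuming an authenticated `DataHubGraph`, import paths implied by the file layout above, a placeholder server URL and token, and response fields as used in the unit tests below:

```python
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub_actions.plugin.source.acryl.datahub_cloud_events_consumer import (
    DataHubEventsConsumer,
)

# Assumed: a reachable DataHub Cloud instance and a valid access token.
graph = DataHubGraph(
    DatahubClientConfig(server="https://<your-instance>.acryl.io/gms", token="<token>")
)

# infinite_retry=True makes poll_events retry transient failures indefinitely.
consumer = DataHubEventsConsumer(
    graph=graph,
    consumer_id="my-action-consumer",  # hypothetical consumer id
    infinite_retry=True,
)
response = consumer.poll_events(topic="PlatformEvent_v1")
print(response.offsetId, response.count)
```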

datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_events_consumer.py

Lines changed: 36 additions & 9 deletions

@@ -11,9 +11,10 @@
     Timeout,
 )
 from tenacity import (
-    retry,
+    Retrying,
     retry_if_exception_type,
     stop_after_attempt,
+    stop_never,
     wait_exponential,
 )

@@ -49,6 +50,7 @@ def __init__(
         offset_id: Optional[str] = None,
         lookback_days: Optional[int] = None,
         reset_offsets: Optional[bool] = False,
+        infinite_retry: Optional[bool] = False,
     ):
         # 1) Always set self.consumer_id, even if None, so tests can assert it safely.
         self.consumer_id: Optional[str] = consumer_id

@@ -57,6 +59,9 @@ def __init__(
         self.graph: DataHubGraph = graph
         self.offset_id: Optional[str] = offset_id
         self.default_lookback_days: Optional[int] = lookback_days
+        self.infinite_retry: bool = (
+            infinite_retry if infinite_retry is not None else False
+        )
         self.offsets_store: Optional[
             DataHubEventsConsumerPlatformResourceOffsetsStore
         ] = None

@@ -83,14 +88,6 @@ def __init__(
         else:
             logger.debug("Starting DataHub Events Consumer with no consumer ID.")
 
-    @retry(
-        retry=retry_if_exception_type(
-            (HTTPError, ConnectionError, ChunkedEncodingError, Timeout)
-        ),
-        wait=wait_exponential(multiplier=1, min=2, max=30),
-        stop=stop_after_attempt(3),
-        reraise=True,
-    )
     def poll_events(
         self,
         topic: str,

@@ -101,6 +98,36 @@ def poll_events(
         """
         Fetch events for a specific topic.
         """
+        stop_condition = stop_never if self.infinite_retry else stop_after_attempt(15)
+
+        retry_strategy = Retrying(
+            retry=retry_if_exception_type(
+                (HTTPError, ConnectionError, ChunkedEncodingError, Timeout)
+            ),
+            wait=wait_exponential(multiplier=1, min=2, max=60),
+            stop=stop_condition,
+            reraise=True,
+        )
+
+        for attempt in retry_strategy:
+            with attempt:
+                return self._poll_events_impl(
+                    topic, offset_id, limit, poll_timeout_seconds
+                )
+
+        # This should never be reached due to reraise=True, but mypy needs it
+        raise RuntimeError("Retry strategy exhausted without returning or raising")
+
+    def _poll_events_impl(
+        self,
+        topic: str,
+        offset_id: Optional[str] = None,
+        limit: Optional[int] = None,
+        poll_timeout_seconds: Optional[int] = None,
+    ) -> ExternalEventsResponse:
+        """
+        Internal implementation of poll_events.
+        """
         endpoint = f"{self.base_url}/v1/events/poll"
 
         # If the caller provided an offset_id, use it; otherwise fall back to self.offset_id.
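The move from the `@retry` decorator to an imperative `Retrying` loop is what lets the stop policy depend on per-instance state (`self.infinite_retry`): a decorator's arguments are evaluated once at class-definition time, while the loop builds a fresh policy on every call. A minimal, self-contained sketch of the same tenacity pattern (the `FlakyClient` class and its failure mode are hypothetical):

```python
from tenacity import (
    Retrying,
    retry_if_exception_type,
    stop_after_attempt,
    stop_never,
    wait_exponential,
)


class FlakyClient:
    def __init__(self, infinite_retry: bool = False) -> None:
        self.infinite_retry = infinite_retry
        self.calls = 0

    def fetch(self) -> str:
        # The stop policy is chosen per call from instance state --
        # something a @retry decorator evaluated at import time cannot do.
        stop = stop_never if self.infinite_retry else stop_after_attempt(15)
        for attempt in Retrying(
            retry=retry_if_exception_type(ConnectionError),
            wait=wait_exponential(multiplier=1, min=2, max=60),
            stop=stop,
            reraise=True,  # surface the original exception when retries run out
        ):
            with attempt:
                return self._fetch_once()
        raise RuntimeError("unreachable: reraise=True propagates the last error")

    def _fetch_once(self) -> str:
        # Fail twice, then succeed, to exercise the retry loop.
        self.calls += 1
        if self.calls < 3:
            raise ConnectionError("simulated transient failure")
        return "ok"


print(FlakyClient().fetch())  # waits ~2s, then ~4s, then prints "ok"
```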

datahub-actions/tests/unit/plugin/source/acryl/test_datahub_cloud_events_consumer.py

Lines changed: 145 additions & 5 deletions

@@ -200,12 +200,12 @@ def test_poll_events_http_error(mock_graph: DataHubGraph) -> None:
     with patch(
         "requests.get", side_effect=HTTPError(response=dummy_response)
     ) as mock_get:
-        # The default Tenacity stop_after_attempt=3
+        # The default Tenacity stop_after_attempt=15
         with pytest.raises(HTTPError):
             consumer.poll_events(topic="TestTopic")
 
         # requests.get should be called multiple times due to retry
-        assert mock_get.call_count == 3
+        assert mock_get.call_count == 15
 
 
 def test_poll_events_connection_error(mock_graph: DataHubGraph) -> None:

@@ -225,7 +225,7 @@ def test_poll_events_connection_error(mock_graph: DataHubGraph) -> None:
             consumer.poll_events(topic="TestTopic")
 
         # requests.get should be called multiple times due to retry
-        assert mock_get.call_count == 3
+        assert mock_get.call_count == 15
 
 
 def test_poll_events_chunked_encoding_error(mock_graph: DataHubGraph) -> None:

@@ -245,7 +245,7 @@ def test_poll_events_chunked_encoding_error(mock_graph: DataHubGraph) -> None:
             consumer.poll_events(topic="TestTopic")
 
         # requests.get should be called multiple times due to retry
-        assert mock_get.call_count == 3
+        assert mock_get.call_count == 15
 
 
 def test_poll_events_timeout(mock_graph: DataHubGraph) -> None:

@@ -263,7 +263,7 @@ def test_poll_events_timeout(mock_graph: DataHubGraph) -> None:
             consumer.poll_events(topic="TestTopic")
 
         # requests.get should be called multiple times due to retry
-        assert mock_get.call_count == 3
+        assert mock_get.call_count == 15
 
 
 def test_get_events(

@@ -324,3 +324,143 @@ def test_commit_offsets_no_store(mock_graph: DataHubGraph) -> None:
     # No error should occur, and no calls to the store.
     # We just assert that the code doesn't raise an exception.
     # (No additional assert needed here.)
+
+
+def test_poll_events_infinite_retry_retries_more_than_default(
+    mock_graph: DataHubGraph,
+) -> None:
+    """
+    Test that when infinite_retry=True, poll_events retries more than the default 15 times.
+    """
+    consumer = DataHubEventsConsumer(
+        graph=mock_graph,
+        consumer_id="test-consumer",
+        offset_id="initial-offset",
+        infinite_retry=True,
+    )
+
+    dummy_response = Response()
+    # Fail 5 times, then succeed on the 6th attempt
+    call_count = 0
+
+    def side_effect(*args, **kwargs):
+        nonlocal call_count
+        call_count += 1
+        if call_count < 6:
+            raise HTTPError(response=dummy_response)
+        # Return a successful response on the 6th attempt
+        mock_response = MagicMock()
+        mock_response.json.return_value = {
+            "offsetId": "success-offset",
+            "count": 0,
+            "events": [],
+        }
+        mock_response.raise_for_status.return_value = None
+        return mock_response
+
+    with patch("requests.get", side_effect=side_effect) as mock_get:
+        result = consumer.poll_events(topic="TestTopic")
+
+    # Should have been called 6 times (5 failures + 1 success)
+    assert mock_get.call_count == 6
+    assert result.offsetId == "success-offset"
+
+
+def test_poll_events_infinite_retry_false_uses_default_retries(
+    mock_graph: DataHubGraph,
+) -> None:
+    """
+    Test that when infinite_retry=False (default), poll_events only retries 15 times.
+    """
+    consumer = DataHubEventsConsumer(
+        graph=mock_graph,
+        consumer_id="test-consumer",
+        offset_id="initial-offset",
+        infinite_retry=False,
+    )
+
+    dummy_response = Response()
+    with patch(
+        "requests.get", side_effect=HTTPError(response=dummy_response)
+    ) as mock_get:
+        with pytest.raises(HTTPError):
+            consumer.poll_events(topic="TestTopic")
+
+    # Should only retry 15 times (default behavior)
+    assert mock_get.call_count == 15
+
+
+def test_poll_events_infinite_retry_connection_error(
+    mock_graph: DataHubGraph,
+) -> None:
+    """
+    Test that infinite_retry works with ConnectionError and retries more than default (15).
+    """
+    consumer = DataHubEventsConsumer(
+        graph=mock_graph,
+        consumer_id="test-consumer",
+        offset_id="initial-offset",
+        infinite_retry=True,
+    )
+
+    # Fail 4 times, then succeed
+    call_count = 0
+
+    def side_effect(*args, **kwargs):
+        nonlocal call_count
+        call_count += 1
+        if call_count < 5:
+            raise ConnectionError("Connection Error")
+        # Return a successful response on the 5th attempt
+        mock_response = MagicMock()
+        mock_response.json.return_value = {
+            "offsetId": "success-offset",
+            "count": 0,
+            "events": [],
+        }
+        mock_response.raise_for_status.return_value = None
+        return mock_response
+
+    with patch("requests.get", side_effect=side_effect) as mock_get:
+        result = consumer.poll_events(topic="TestTopic")
+
+    # Should have been called 5 times (4 failures + 1 success)
+    assert mock_get.call_count == 5
+    assert result.offsetId == "success-offset"
+
+
+def test_poll_events_infinite_retry_timeout(mock_graph: DataHubGraph) -> None:
+    """
+    Test that infinite_retry works with Timeout and retries more than default.
+    """
+    consumer = DataHubEventsConsumer(
+        graph=mock_graph,
+        consumer_id="test-consumer",
+        offset_id="initial-offset",
+        infinite_retry=True,
+    )
+
+    # Fail 7 times, then succeed
+    call_count = 0
+
+    def side_effect(*args, **kwargs):
+        nonlocal call_count
+        call_count += 1
+        if call_count < 8:
+            raise Timeout("Request Timeout")
+        # Return a successful response on the 8th attempt
+        mock_response = MagicMock()
+        mock_response.json.return_value = {
+            "offsetId": "success-offset",
+            "count": 0,
+            "events": [],
+        }
+        mock_response.raise_for_status.return_value = None
+        return mock_response
+
+    with patch("requests.get", side_effect=side_effect) as mock_get:
+        result = consumer.poll_events(topic="TestTopic")
+
+    # Should have been called 8 times (7 failures + 1 success)
+    assert mock_get.call_count == 8
+    assert result.offsetId == "success-offset"

docs/actions/sources/datahub-cloud-event-source.md

Lines changed: 6 additions & 0 deletions

@@ -77,6 +77,7 @@ source:
       topics: ["PlatformEvent_v1"] # Add MetadataChangeLog_Versioned_v1 and / or MetadataChangeLog_Timeseries_v1 to generate raw MCL events.
       lookback_days: 7 # Look back 7 days for events
       reset_offsets: true # Ignore stored offsets and start fresh
+      infinite_retry: true # Enable infinite retry for connection failures (default: false)
       kill_after_idle_timeout: true # Enable shutdown after idle period
       idle_timeout_duration_seconds: 60 # Idle timeout set to 60 seconds
       event_processing_time_max_duration_seconds: 45 # Max processing time of 45 seconds per batch

@@ -94,6 +95,7 @@ Note that the `datahub` configuration block is **required** to connect to your D
 | `topics` || `PlatformEvent_v1` | The name of the topic from which events will be consumed. By default only produces `EntityChangeEvent_v1` events. To include `MetadataChangeLog_v1` events, set this value to include ["MetadataChangeLog_Versioned_v1", "MetadataChangeLog_Timeseries_v1"] |
 | `lookback_days` || None | Optional number of days to look back when polling for events. |
 | `reset_offsets` || `False` | When set to `True`, the consumer will ignore any stored offsets and start fresh. |
+| `infinite_retry` || `False` | When set to `True`, the consumer will retry indefinitely on connection failures (HTTPError, ConnectionError, ChunkedEncodingError, Timeout) with exponential backoff (2s to 60s). When `False` (default), it retries up to 15 times before failing. |
 | `kill_after_idle_timeout` || `False` | If `True`, stops the consumer after being idle for the specified timeout duration. |
 | `idle_timeout_duration_seconds` || `30` | Duration in seconds after which, if no events are received, the consumer is considered idle. |
 | `event_processing_time_max_duration_seconds` || `30` | Maximum allowed time in seconds for processing events before timing out. |

@@ -110,3 +112,7 @@ Yes, simply set `reset_offsets` to True for a single run of the action. Remember
 
 Today, there is undefined behavior deploying multiple actions with the same name using the DataHub Cloud Events Source.
 All events must be processed by a single running action
+
+3. How do I handle transient connection failures?
+
+By default, the consumer will retry up to 15 times on connection failures (HTTPError, ConnectionError, ChunkedEncodingError, Timeout) with exponential backoff (2s to 60s). If you expect longer outages or want to ensure the consumer continues retrying indefinitely, set `infinite_retry: true` in the source configuration. When enabled, the consumer will retry indefinitely with exponential backoff (starting at 2 seconds, increasing up to 60 seconds, then continuing at 60 seconds) until the connection is restored.
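To make the backoff schedule concrete: tenacity's `wait_exponential(multiplier=1, min=2, max=60)` waits roughly `2**n` seconds after the n-th failure, clamped to the `[2, 60]` range. A small sketch that reproduces the schedule (a simplified model for illustration, not the library code):

```python
def backoff_seconds(
    attempt: int, multiplier: float = 1.0, lo: float = 2.0, hi: float = 60.0
) -> float:
    # Simplified model of tenacity's wait_exponential:
    # multiplier * 2**attempt, clamped to [lo, hi].
    return max(lo, min(multiplier * 2**attempt, hi))


print([backoff_seconds(n) for n in range(1, 9)])
# [2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0, 60.0]
```

With the default 15 attempts, the 14 waits sum to about 602 seconds, so the consumer gives up after roughly 10 minutes; with `infinite_retry: true` it settles into retrying every 60 seconds.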

metadata-service/configuration/src/main/resources/application.yaml

Lines changed: 2 additions & 0 deletions

@@ -583,6 +583,8 @@ kafka:
   consumerPool:
     initialSize: ${KAFKA_CONSUMER_POOL_INITIAL_SIZE:1}
     maxSize: ${KAFKA_CONSUMER_POOL_MAX_SIZE:5}
+    validationTimeoutSeconds: ${KAFKA_CONSUMER_POOL_VALIDATION_TIMEOUT_SECONDS:5}
+    validationCacheIntervalMinutes: ${KAFKA_CONSUMER_POOL_VALIDATION_CACHE_INTERVAL_MINUTES:5}
 
   schemaRegistry:
     type: ${SCHEMA_REGISTRY_TYPE:KAFKA} # INTERNAL or KAFKA or AWS_GLUE
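These two settings correspond to the "check consumers on access" part of this commit: the pool validates a consumer before handing it out, bounds that check by `validationTimeoutSeconds`, and caches a successful result for `validationCacheIntervalMinutes` so hot paths avoid a broker round-trip on every acquire. The actual pool is Java code not included in this excerpt; the sketch below is a hypothetical Python rendering of the idea, using `confluent_kafka`'s `list_topics` call as the liveness probe:

```python
import time
from typing import Callable, Dict, List

from confluent_kafka import Consumer


class ValidatedConsumerPool:
    """Hypothetical sketch: validate consumers on access, caching a successful
    validation for a fixed interval (mirrors the validationTimeoutSeconds /
    validationCacheIntervalMinutes settings)."""

    def __init__(
        self,
        factory: Callable[[], Consumer],
        validation_timeout_seconds: int = 5,
        validation_cache_interval_minutes: int = 5,
    ) -> None:
        self._factory = factory
        self._timeout = validation_timeout_seconds
        self._cache_interval = validation_cache_interval_minutes * 60
        self._idle: List[Consumer] = []
        self._last_ok: Dict[int, float] = {}

    def acquire(self) -> Consumer:
        # Pop idle consumers until one passes validation; broken consumers
        # are closed and dropped instead of being handed to the caller.
        while self._idle:
            consumer = self._idle.pop()
            if self._is_valid(consumer):
                return consumer
            self._last_ok.pop(id(consumer), None)
            consumer.close()
        return self._factory()

    def release(self, consumer: Consumer) -> None:
        self._idle.append(consumer)

    def _is_valid(self, consumer: Consumer) -> bool:
        now = time.monotonic()
        last = self._last_ok.get(id(consumer))
        if last is not None and now - last < self._cache_interval:
            return True  # recently validated; skip the broker round-trip
        try:
            # Cheap metadata request, bounded by the validation timeout.
            consumer.list_topics(timeout=self._timeout)
        except Exception:
            return False
        self._last_ok[id(consumer)] = now
        return True
```

A production version would also need locking around acquire/release, which is exactly the kind of race the commit message's "locking fixes, race condition fixes with consumer pool" line calls out.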

metadata-service/events-service/build.gradle

Lines changed: 1 addition & 0 deletions

@@ -9,6 +9,7 @@ dependencies {
 
     implementation externalDependency.kafkaClients
     implementation externalDependency.kafkaAvroSerde
+    implementation project(':metadata-utils')
 
     testImplementation externalDependency.mockito
     testImplementation externalDependency.testng
