Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class DataHubEventsSourceConfig(ConfigModel):
consumer_id: Optional[str] = None # Used to store offset for the consumer.
lookback_days: Optional[int] = None
reset_offsets: Optional[bool] = False
infinite_retry: Optional[bool] = False

# Time and Exit Conditions.
kill_after_idle_timeout: bool = False
Expand Down Expand Up @@ -106,6 +107,7 @@ def __init__(self, config: DataHubEventsSourceConfig, ctx: PipelineContext):
graph=self.ctx.graph.graph,
lookback_days=self.source_config.lookback_days,
reset_offsets=self.source_config.reset_offsets,
infinite_retry=self.source_config.infinite_retry,
)

self.ack_manager = AckManager()
Expand All @@ -120,6 +122,7 @@ def _initialize_topic_consumers(
graph: DataHubGraph,
lookback_days: Optional[int],
reset_offsets: Optional[bool],
infinite_retry: Optional[bool],
) -> Dict[str, DataHubEventsConsumer]:
"""
Initialize DataHub consumers for each topic with appropriate consumer IDs.
Expand Down Expand Up @@ -156,6 +159,7 @@ def _initialize_topic_consumers(
consumer_id=topic_consumer_id,
lookback_days=lookback_days,
reset_offsets=reset_offsets,
infinite_retry=infinite_retry,
)

return topic_consumers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
Timeout,
)
from tenacity import (
retry,
Retrying,
retry_if_exception_type,
stop_after_attempt,
stop_never,
wait_exponential,
)

Expand Down Expand Up @@ -49,6 +50,7 @@ def __init__(
offset_id: Optional[str] = None,
lookback_days: Optional[int] = None,
reset_offsets: Optional[bool] = False,
infinite_retry: Optional[bool] = False,
):
# 1) Always set self.consumer_id, even if None, so tests can assert it safely.
self.consumer_id: Optional[str] = consumer_id
Expand All @@ -57,6 +59,9 @@ def __init__(
self.graph: DataHubGraph = graph
self.offset_id: Optional[str] = offset_id
self.default_lookback_days: Optional[int] = lookback_days
self.infinite_retry: bool = (
infinite_retry if infinite_retry is not None else False
)
self.offsets_store: Optional[
DataHubEventsConsumerPlatformResourceOffsetsStore
] = None
Expand All @@ -83,14 +88,6 @@ def __init__(
else:
logger.debug("Starting DataHub Events Consumer with no consumer ID.")

@retry(
retry=retry_if_exception_type(
(HTTPError, ConnectionError, ChunkedEncodingError, Timeout)
),
wait=wait_exponential(multiplier=1, min=2, max=30),
stop=stop_after_attempt(3),
reraise=True,
)
def poll_events(
    self,
    topic: str,
    offset_id: Optional[str] = None,
    limit: Optional[int] = None,
    poll_timeout_seconds: Optional[int] = None,
) -> ExternalEventsResponse:
    """
    Fetch events for a specific topic, retrying transient transport failures.

    Retries on HTTPError, ConnectionError, ChunkedEncodingError, and Timeout
    with exponential backoff (min 2s, max 60s). When ``self.infinite_retry``
    is set the retry loop never gives up (``stop_never``); otherwise it stops
    after 15 attempts. ``reraise=True`` means the final underlying exception
    is propagated to the caller rather than a tenacity RetryError.

    Args:
        topic: Topic name to poll events from.
        offset_id: Offset to resume from; falls back to the consumer's
            stored offset when omitted (handled by the implementation).
        limit: Maximum number of events to fetch, if given.
        poll_timeout_seconds: Server-side long-poll timeout, if given.

    Returns:
        The ExternalEventsResponse from the events poll endpoint.

    Raises:
        HTTPError, ConnectionError, ChunkedEncodingError, Timeout: once the
            retry budget (if finite) is exhausted.
    """
    # Retry forever when configured to; otherwise bound the attempt count.
    stop_condition = stop_never if self.infinite_retry else stop_after_attempt(15)

    retry_strategy = Retrying(
        retry=retry_if_exception_type(
            (HTTPError, ConnectionError, ChunkedEncodingError, Timeout)
        ),
        wait=wait_exponential(multiplier=1, min=2, max=60),
        stop=stop_condition,
        reraise=True,
    )

    # Retrying yields attempt contexts; a successful call returns from
    # inside the `with` block, ending the loop.
    for attempt in retry_strategy:
        with attempt:
            return self._poll_events_impl(
                topic, offset_id, limit, poll_timeout_seconds
            )

    # Unreachable: reraise=True re-raises the last failure when the stop
    # condition triggers, but mypy cannot prove the loop returns or raises.
    raise RuntimeError("Retry strategy exhausted without returning or raising")

def _poll_events_impl(
self,
topic: str,
offset_id: Optional[str] = None,
limit: Optional[int] = None,
poll_timeout_seconds: Optional[int] = None,
) -> ExternalEventsResponse:
"""
Internal implementation of poll_events.
"""
endpoint = f"{self.base_url}/v1/events/poll"

# If the caller provided an offset_id, use it; otherwise fall back to self.offset_id.
Expand Down
Loading
Loading