diff --git a/datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_event_source.py b/datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_event_source.py index 2e597e554914c7..40d99caaf0ef49 100644 --- a/datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_event_source.py +++ b/datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_event_source.py @@ -62,6 +62,7 @@ class DataHubEventsSourceConfig(ConfigModel): consumer_id: Optional[str] = None # Used to store offset for the consumer. lookback_days: Optional[int] = None reset_offsets: Optional[bool] = False + infinite_retry: Optional[bool] = False # Time and Exit Conditions. kill_after_idle_timeout: bool = False @@ -106,6 +107,7 @@ def __init__(self, config: DataHubEventsSourceConfig, ctx: PipelineContext): graph=self.ctx.graph.graph, lookback_days=self.source_config.lookback_days, reset_offsets=self.source_config.reset_offsets, + infinite_retry=self.source_config.infinite_retry, ) self.ack_manager = AckManager() @@ -120,6 +122,7 @@ def _initialize_topic_consumers( graph: DataHubGraph, lookback_days: Optional[int], reset_offsets: Optional[bool], + infinite_retry: Optional[bool], ) -> Dict[str, DataHubEventsConsumer]: """ Initialize DataHub consumers for each topic with appropriate consumer IDs. @@ -156,6 +159,7 @@ def _initialize_topic_consumers( consumer_id=topic_consumer_id, lookback_days=lookback_days, reset_offsets=reset_offsets, + infinite_retry=infinite_retry, ) return topic_consumers diff --git a/datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_events_consumer.py b/datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_events_consumer.py index 8af2a41dd5dd00..aa65a2a3a91ac1 100644 --- a/datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_events_consumer.py +++ b/datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_events_consumer.py @@ -11,9 +11,10 @@ Timeout, ) from tenacity import ( - retry, + Retrying, retry_if_exception_type, stop_after_attempt, + stop_never, wait_exponential, ) @@ -49,6 +50,7 @@ def __init__( offset_id: Optional[str] = None, lookback_days: Optional[int] = None, reset_offsets: Optional[bool] = False, + infinite_retry: Optional[bool] = False, ): # 1) Always set self.consumer_id, even if None, so tests can assert it safely. self.consumer_id: Optional[str] = consumer_id @@ -57,6 +59,9 @@ def __init__( self.graph: DataHubGraph = graph self.offset_id: Optional[str] = offset_id self.default_lookback_days: Optional[int] = lookback_days + self.infinite_retry: bool = ( + infinite_retry if infinite_retry is not None else False + ) self.offsets_store: Optional[ DataHubEventsConsumerPlatformResourceOffsetsStore ] = None @@ -83,14 +88,6 @@ def __init__( else: logger.debug("Starting DataHub Events Consumer with no consumer ID.") - @retry( - retry=retry_if_exception_type( - (HTTPError, ConnectionError, ChunkedEncodingError, Timeout) - ), - wait=wait_exponential(multiplier=1, min=2, max=30), - stop=stop_after_attempt(3), - reraise=True, - ) def poll_events( self, topic: str, @@ -101,6 +98,36 @@ def poll_events( """ Fetch events for a specific topic. 
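+        Transient failures (HTTPError, ConnectionError, ChunkedEncodingError,
+        Timeout) are retried with exponential backoff (2s growing to 60s):
+        indefinitely when infinite_retry is enabled, otherwise up to 15
+        attempts.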
""" + stop_condition = stop_never if self.infinite_retry else stop_after_attempt(15) + + retry_strategy = Retrying( + retry=retry_if_exception_type( + (HTTPError, ConnectionError, ChunkedEncodingError, Timeout) + ), + wait=wait_exponential(multiplier=1, min=2, max=60), + stop=stop_condition, + reraise=True, + ) + + for attempt in retry_strategy: + with attempt: + return self._poll_events_impl( + topic, offset_id, limit, poll_timeout_seconds + ) + + # This should never be reached due to reraise=True, but mypy needs it + raise RuntimeError("Retry strategy exhausted without returning or raising") + + def _poll_events_impl( + self, + topic: str, + offset_id: Optional[str] = None, + limit: Optional[int] = None, + poll_timeout_seconds: Optional[int] = None, + ) -> ExternalEventsResponse: + """ + Internal implementation of poll_events. + """ endpoint = f"{self.base_url}/v1/events/poll" # If the caller provided an offset_id, use it; otherwise fall back to self.offset_id. diff --git a/datahub-actions/tests/unit/plugin/source/acryl/test_datahub_cloud_events_consumer.py b/datahub-actions/tests/unit/plugin/source/acryl/test_datahub_cloud_events_consumer.py index 54d81fdb8563c4..aad5f18d06862a 100644 --- a/datahub-actions/tests/unit/plugin/source/acryl/test_datahub_cloud_events_consumer.py +++ b/datahub-actions/tests/unit/plugin/source/acryl/test_datahub_cloud_events_consumer.py @@ -1,5 +1,7 @@ # test_acryl_datahub_events_consumer.py +import time +from contextlib import contextmanager from typing import List, Optional, cast from unittest.mock import MagicMock, patch @@ -11,6 +13,7 @@ Timeout, ) from requests.models import Response +from tenacity import stop_after_attempt from datahub.ingestion.graph.client import DataHubGraph from datahub_actions.plugin.source.acryl.datahub_cloud_events_consumer import ( @@ -197,10 +200,18 @@ def test_poll_events_http_error(mock_graph: DataHubGraph) -> None: offset_id="initial-offset", ) dummy_response = Response() # Create a dummy Response object - with patch( - "requests.get", side_effect=HTTPError(response=dummy_response) - ) as mock_get: - # The default Tenacity stop_after_attempt=3 + # Patch to use fewer retries (3) for faster test execution + with ( + patch( + "datahub_actions.plugin.source.acryl.datahub_cloud_events_consumer.stop_after_attempt", + side_effect=lambda n: stop_after_attempt(3) + if n == 15 + else stop_after_attempt(n), + ), + patch( + "requests.get", side_effect=HTTPError(response=dummy_response) + ) as mock_get, + ): with pytest.raises(HTTPError): consumer.poll_events(topic="TestTopic") @@ -218,9 +229,16 @@ def test_poll_events_connection_error(mock_graph: DataHubGraph) -> None: offset_id="initial-offset", ) - with patch( - "requests.get", side_effect=ConnectionError("Connection Error") - ) as mock_get: + # Patch to use fewer retries (3) for faster test execution + with ( + patch( + "datahub_actions.plugin.source.acryl.datahub_cloud_events_consumer.stop_after_attempt", + return_value=stop_after_attempt(3), + ), + patch( + "requests.get", side_effect=ConnectionError("Connection Error") + ) as mock_get, + ): with pytest.raises(ConnectionError): consumer.poll_events(topic="TestTopic") @@ -238,9 +256,16 @@ def test_poll_events_chunked_encoding_error(mock_graph: DataHubGraph) -> None: offset_id="initial-offset", ) - with patch( - "requests.get", side_effect=ChunkedEncodingError("Chunked Encoding Error") - ) as mock_get: + # Patch to use fewer retries (3) for faster test execution + with ( + patch( + 
"datahub_actions.plugin.source.acryl.datahub_cloud_events_consumer.stop_after_attempt", + return_value=stop_after_attempt(3), + ), + patch( + "requests.get", side_effect=ChunkedEncodingError("Chunked Encoding Error") + ) as mock_get, + ): with pytest.raises(ChunkedEncodingError): consumer.poll_events(topic="TestTopic") @@ -258,7 +283,14 @@ def test_poll_events_timeout(mock_graph: DataHubGraph) -> None: offset_id="initial-offset", ) - with patch("requests.get", side_effect=Timeout("Request Timeout")) as mock_get: + # Patch to use fewer retries (3) for faster test execution + with ( + patch( + "datahub_actions.plugin.source.acryl.datahub_cloud_events_consumer.stop_after_attempt", + return_value=stop_after_attempt(3), + ), + patch("requests.get", side_effect=Timeout("Request Timeout")) as mock_get, + ): with pytest.raises(Timeout): consumer.poll_events(topic="TestTopic") @@ -324,3 +356,315 @@ def test_commit_offsets_no_store(mock_graph: DataHubGraph) -> None: # No error should occur, and no calls to the store. # We just assert that the code doesn't raise an exception. # (No additional assert needed here.) + + +def test_poll_events_infinite_retry_retries_more_than_default( + mock_graph: DataHubGraph, +) -> None: + """ + Test that when infinite_retry=True, poll_events retries more than the default 3 times (reduced for test speed). + """ + consumer = DataHubEventsConsumer( + graph=mock_graph, + consumer_id="test-consumer", + offset_id="initial-offset", + infinite_retry=True, + ) + + dummy_response = Response() + # Fail 5 times, then succeed on the 6th attempt + call_count = 0 + + def side_effect(*args, **kwargs): + nonlocal call_count + call_count += 1 + if call_count < 6: + raise HTTPError(response=dummy_response) + # Return a successful response on the 6th attempt + mock_response = MagicMock() + mock_response.json.return_value = { + "offsetId": "success-offset", + "count": 0, + "events": [], + } + mock_response.raise_for_status.return_value = None + return mock_response + + with patch("requests.get", side_effect=side_effect) as mock_get: + result = consumer.poll_events(topic="TestTopic") + + # Should have been called 6 times (5 failures + 1 success) + assert mock_get.call_count == 6 + assert result.offsetId == "success-offset" + + +def test_poll_events_infinite_retry_false_uses_default_retries( + mock_graph: DataHubGraph, +) -> None: + """ + Test that when infinite_retry=False (default), poll_events only retries 3 times (reduced for test speed). + """ + consumer = DataHubEventsConsumer( + graph=mock_graph, + consumer_id="test-consumer", + offset_id="initial-offset", + infinite_retry=False, + ) + + dummy_response = Response() + # Patch to use fewer retries (3) for faster test execution + with ( + patch( + "datahub_actions.plugin.source.acryl.datahub_cloud_events_consumer.stop_after_attempt", + return_value=stop_after_attempt(3), + ), + patch( + "requests.get", side_effect=HTTPError(response=dummy_response) + ) as mock_get, + ): + with pytest.raises(HTTPError): + consumer.poll_events(topic="TestTopic") + + # Should only retry 3 times (reduced for test speed) + assert mock_get.call_count == 3 + + +def test_poll_events_infinite_retry_connection_error( + mock_graph: DataHubGraph, +) -> None: + """ + Test that infinite_retry works with ConnectionError and retries more than default (15). 
+ """ + consumer = DataHubEventsConsumer( + graph=mock_graph, + consumer_id="test-consumer", + offset_id="initial-offset", + infinite_retry=True, + ) + + # Fail 4 times, then succeed + call_count = 0 + + def side_effect(*args, **kwargs): + nonlocal call_count + call_count += 1 + if call_count < 5: + raise ConnectionError("Connection Error") + # Return a successful response on the 5th attempt + mock_response = MagicMock() + mock_response.json.return_value = { + "offsetId": "success-offset", + "count": 0, + "events": [], + } + mock_response.raise_for_status.return_value = None + return mock_response + + with patch("requests.get", side_effect=side_effect) as mock_get: + result = consumer.poll_events(topic="TestTopic") + + # Should have been called 5 times (4 failures + 1 success) + assert mock_get.call_count == 5 + assert result.offsetId == "success-offset" + + +def test_poll_events_infinite_retry_timeout(mock_graph: DataHubGraph) -> None: + """ + Test that infinite_retry works with Timeout and retries more than default. + """ + consumer = DataHubEventsConsumer( + graph=mock_graph, + consumer_id="test-consumer", + offset_id="initial-offset", + infinite_retry=True, + ) + + # Fail 7 times, then succeed + call_count = 0 + + def side_effect(*args, **kwargs): + nonlocal call_count + call_count += 1 + if call_count < 8: + raise Timeout("Request Timeout") + # Return a successful response on the 8th attempt + mock_response = MagicMock() + mock_response.json.return_value = { + "offsetId": "success-offset", + "count": 0, + "events": [], + } + mock_response.raise_for_status.return_value = None + return mock_response + + with patch("requests.get", side_effect=side_effect) as mock_get: + result = consumer.poll_events(topic="TestTopic") + + # Should have been called 8 times (7 failures + 1 success) + assert mock_get.call_count == 8 + assert result.offsetId == "success-offset" + + +def test_main_block_with_events(mock_graph: DataHubGraph) -> None: + """ + Test coverage for the __main__ block code path with events. + Covers: get_default_graph, consumer creation with offset_id, poll_events, + get_events with events, event iteration, and commit_offsets. 
+ """ + mock_response_with_events = ExternalEventsResponse( + offsetId="offset-123", + count=2, + events=[ + ExternalEvent(contentType="application/json", value='{"type": "event1"}'), + ExternalEvent(contentType="application/json", value='{"type": "event2"}'), + ], + ) + + mock_response_no_events = ExternalEventsResponse( + offsetId="offset-124", + count=0, + events=[], + ) + + iteration_count = 0 + + def poll_events_side_effect(*args, **kwargs): + nonlocal iteration_count + iteration_count += 1 + if iteration_count == 1: + return mock_response_with_events + elif iteration_count == 2: + return mock_response_no_events + else: + # After 2 iterations, raise KeyboardInterrupt to exit the loop + raise KeyboardInterrupt() + + with patch( + "datahub_actions.plugin.source.acryl.datahub_cloud_events_consumer.get_default_graph" + ) as mock_get_graph: + + @contextmanager + def mock_context_manager(): + yield mock_graph + + mock_get_graph.return_value = mock_context_manager() + + consumer = DataHubEventsConsumer( + graph=mock_graph, + consumer_id="events_consumer_cli", + offset_id="initial-offset-123", + ) + + with ( + patch.object( + consumer, "poll_events", side_effect=poll_events_side_effect + ) as mock_poll_events, + patch.object(consumer, "commit_offsets") as mock_commit_offsets, + patch("builtins.print") as mock_print, + patch( + "datahub_actions.plugin.source.acryl.datahub_cloud_events_consumer.time.sleep" + ) as mock_sleep, + ): + # Execute the main block logic to get coverage + try: + if consumer.offset_id is not None: + print(f"Starting offset id: {consumer.offset_id}") + + while True: + response = consumer.poll_events( + topic="PlatformEvent_v1", limit=10, poll_timeout_seconds=5 + ) + print(f"Offset ID: {response.offsetId}") + print(f"Event count: {response.count}") + events = consumer.get_events(response) + if len(events) == 0: + print("No events to process.") + else: + for event in events: + print(f"Content Type: {event.contentType}") + print(f"Value: {event.value}") + print("---") + consumer.commit_offsets() + time.sleep(1) + except KeyboardInterrupt: + pass + + # poll_events is called twice (once per iteration), then a third time + # which raises KeyboardInterrupt to exit the loop + assert mock_poll_events.call_count == 3 + assert mock_print.called + assert mock_sleep.call_count == 2 + assert mock_commit_offsets.call_count == 1 + + +def test_main_block_without_initial_offset(mock_graph: DataHubGraph) -> None: + """ + Test coverage for the __main__ block code path without initial offset_id. + Covers: get_default_graph, consumer creation without offset_id, poll_events, + get_events with no events, and the "No events to process" path. 
+ """ + mock_response = ExternalEventsResponse( + offsetId="offset-125", + count=0, + events=[], + ) + + iteration_count = 0 + + def poll_events_side_effect(*args, **kwargs): + nonlocal iteration_count + iteration_count += 1 + if iteration_count > 1: + raise KeyboardInterrupt() + return mock_response + + with patch( + "datahub_actions.plugin.source.acryl.datahub_cloud_events_consumer.get_default_graph" + ) as mock_get_graph: + + @contextmanager + def mock_context_manager(): + yield mock_graph + + mock_get_graph.return_value = mock_context_manager() + + consumer = DataHubEventsConsumer( + graph=mock_graph, consumer_id="events_consumer_cli", offset_id=None + ) + + with ( + patch.object( + consumer, "poll_events", side_effect=poll_events_side_effect + ) as mock_poll_events, + patch("builtins.print") as mock_print, + patch( + "datahub_actions.plugin.source.acryl.datahub_cloud_events_consumer.time.sleep" + ) as mock_sleep, + ): + # Execute the main block logic to get coverage + try: + if consumer.offset_id is not None: + print(f"Starting offset id: {consumer.offset_id}") + + while True: + response = consumer.poll_events( + topic="PlatformEvent_v1", limit=10, poll_timeout_seconds=5 + ) + print(f"Offset ID: {response.offsetId}") + print(f"Event count: {response.count}") + events = consumer.get_events(response) + if len(events) == 0: + print("No events to process.") + else: + for event in events: + print(f"Content Type: {event.contentType}") + print(f"Value: {event.value}") + print("---") + consumer.commit_offsets() + time.sleep(1) + except KeyboardInterrupt: + pass + + assert mock_poll_events.called + assert mock_print.called + assert mock_sleep.called diff --git a/docs/actions/sources/datahub-cloud-event-source.md b/docs/actions/sources/datahub-cloud-event-source.md index 0c92a2e1777fb1..345bf3e4553263 100644 --- a/docs/actions/sources/datahub-cloud-event-source.md +++ b/docs/actions/sources/datahub-cloud-event-source.md @@ -77,6 +77,7 @@ source: topics: ["PlatformEvent_v1"] # Add MetadataChangeLog_Versioned_v1 and / or MetadataChangeLog_Timeseries_v1 to generate raw MCL events. lookback_days: 7 # Look back 7 days for events reset_offsets: true # Ignore stored offsets and start fresh + infinite_retry: true # Enable infinite retry for connection failures (default: false) kill_after_idle_timeout: true # Enable shutdown after idle period idle_timeout_duration_seconds: 60 # Idle timeout set to 60 seconds event_processing_time_max_duration_seconds: 45 # Max processing time of 45 seconds per batch @@ -94,6 +95,7 @@ Note that the `datahub` configuration block is **required** to connect to your D | `topics` | ❌ | `PlatformEvent_v1` | The name of the topic from which events will be consumed. By default only produces `EntityChangeEvent_v1` events. To include `MetadataChangeLog_v1` events, set this value to include ["MetadataChangeLog_Versioned_v1", "MetadataChangeLog_Timeseries_v1"] | | `lookback_days` | ❌ | None | Optional number of days to look back when polling for events. | | `reset_offsets` | ❌ | `False` | When set to `True`, the consumer will ignore any stored offsets and start fresh. | +| `infinite_retry` | ❌ | `False` | When set to `True`, the consumer will retry indefinitely on connection failures (HTTPError, ConnectionError, ChunkedEncodingError, Timeout) with exponential backoff (2s to 60s). When `False` (default), it retries up to 15 times before failing. | | `kill_after_idle_timeout` | ❌ | `False` | If `True`, stops the consumer after being idle for the specified timeout duration. 
|
 | `idle_timeout_duration_seconds`               | ❌       | `30`    | Duration in seconds after which, if no events are received, the consumer is considered idle. |
 | `event_processing_time_max_duration_seconds`  | ❌       | `30`    | Maximum allowed time in seconds for processing events before timing out.                     |
 
@@ -110,3 +112,7 @@ Yes, simply set `reset_offsets` to True for a single run of the action. Remember
 Today, there is undefined behavior deploying multiple actions with the same name using the DataHub Cloud Events Source.
 
 All events must be processed by a single running action
+
+3. How do I handle transient connection failures?
+
+By default, the consumer retries up to 15 times on connection failures (HTTPError, ConnectionError, ChunkedEncodingError, Timeout), backing off exponentially from 2 seconds up to 60 seconds before failing. If you expect longer outages, set `infinite_retry: true` in the source configuration; the consumer will then keep retrying with the same backoff, capped at 60 seconds between attempts, until the connection is restored.
diff --git a/docs/deploy/environment-vars.md b/docs/deploy/environment-vars.md
index 278eddb0b9b63c..6afafd3644e814 100644
--- a/docs/deploy/environment-vars.md
+++ b/docs/deploy/environment-vars.md
@@ -494,10 +494,12 @@ Reference Links:
 
 ### Consumer Pool Configuration
 
-| Environment Variable               | Default | Description                | Components |
-| ---------------------------------- | ------- | -------------------------- | ---------- |
-| `KAFKA_CONSUMER_POOL_INITIAL_SIZE` | `1`     | Consumer pool initial size | GMS        |
-| `KAFKA_CONSUMER_POOL_MAX_SIZE`     | `5`     | Consumer pool maximum size | GMS        |
+| Environment Variable                                    | Default | Description                                                 | Components |
+| ------------------------------------------------------- | ------- | ----------------------------------------------------------- | ---------- |
+| `KAFKA_CONSUMER_POOL_INITIAL_SIZE`                      | `1`     | Consumer pool initial size                                  | GMS        |
+| `KAFKA_CONSUMER_POOL_MAX_SIZE`                          | `5`     | Consumer pool maximum size                                  | GMS        |
+| `KAFKA_CONSUMER_POOL_VALIDATION_TIMEOUT_SECONDS`        | `5`     | Timeout in seconds for validating consumer health           | GMS        |
+| `KAFKA_CONSUMER_POOL_VALIDATION_CACHE_INTERVAL_MINUTES` | `5`     | Interval in minutes for caching consumer validation results | GMS        |
 
 ### Schema Registry Configuration
 
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/system_info/collectors/PropertiesCollectorConfigurationTest.java b/metadata-io/src/test/java/com/linkedin/metadata/system_info/collectors/PropertiesCollectorConfigurationTest.java
index a507d7434c1061..55be2a8b0f7575 100644
--- a/metadata-io/src/test/java/com/linkedin/metadata/system_info/collectors/PropertiesCollectorConfigurationTest.java
+++ b/metadata-io/src/test/java/com/linkedin/metadata/system_info/collectors/PropertiesCollectorConfigurationTest.java
@@ -274,6 +274,8 @@ public PropertiesCollector propertiesCollector(Environment environment) {
           "kafka.consumer.stopOnDeserializationError",
           "kafka.consumerPool.initialSize",
           "kafka.consumerPool.maxSize",
+          "kafka.consumerPool.validationTimeoutSeconds",
+          "kafka.consumerPool.validationCacheIntervalMinutes",
           "kafka.listener.concurrency",
           "kafka.producer.backoffTimeout",
           "kafka.producer.compressionType",
diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml
index 0cfd3f1a509615..859e120f0e17e3 100644
---
a/metadata-service/configuration/src/main/resources/application.yaml +++ b/metadata-service/configuration/src/main/resources/application.yaml @@ -583,6 +583,8 @@ kafka: consumerPool: initialSize: ${KAFKA_CONSUMER_POOL_INITIAL_SIZE:1} maxSize: ${KAFKA_CONSUMER_POOL_MAX_SIZE:5} + validationTimeoutSeconds: ${KAFKA_CONSUMER_POOL_VALIDATION_TIMEOUT_SECONDS:5} + validationCacheIntervalMinutes: ${KAFKA_CONSUMER_POOL_VALIDATION_CACHE_INTERVAL_MINUTES:5} schemaRegistry: type: ${SCHEMA_REGISTRY_TYPE:KAFKA} # INTERNAL or KAFKA or AWS_GLUE diff --git a/metadata-service/events-service/build.gradle b/metadata-service/events-service/build.gradle index 929131e2e3122d..48519bff668980 100644 --- a/metadata-service/events-service/build.gradle +++ b/metadata-service/events-service/build.gradle @@ -9,6 +9,7 @@ dependencies { implementation externalDependency.kafkaClients implementation externalDependency.kafkaAvroSerde + implementation project(':metadata-utils') testImplementation externalDependency.mockito testImplementation externalDependency.testng diff --git a/metadata-service/events-service/gradle.lockfile b/metadata-service/events-service/gradle.lockfile index b7e7aede607f13..51421debd28113 100644 --- a/metadata-service/events-service/gradle.lockfile +++ b/metadata-service/events-service/gradle.lockfile @@ -1,27 +1,80 @@ # This is a Gradle generated file for dependency locking. # Manual edits can break the build and are not advised. # This file is expected to be part of source control. +antlr:antlr:2.7.7=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +aopalliance:aopalliance:1.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +ch.qos.logback:logback-classic:1.5.19=runtimeClasspath,testRuntimeClasspath +ch.qos.logback:logback-core:1.5.19=runtimeClasspath,testRuntimeClasspath com.beust:jcommander:1.82=testCompileClasspath,testRuntimeClasspath com.eclipsesource.minimal-json:minimal-json:0.9.5=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath com.fasterxml.jackson.core:jackson-annotations:2.18.4=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath com.fasterxml.jackson.core:jackson-core:2.18.4=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath com.fasterxml.jackson.core:jackson-databind:2.18.4=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.18.4=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath com.fasterxml.jackson.dataformat:jackson-dataformat-csv:2.18.4=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.fasterxml.jackson.dataformat:jackson-dataformat-smile:2.18.4=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.18.4=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.18.4=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath com.fasterxml.jackson:jackson-bom:2.18.4=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.github.ben-manes.caffeine:caffeine:2.7.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.github.luben:zstd-jni:1.5.5-5=compileClasspath,testCompileClasspath com.github.luben:zstd-jni:1.5.6-6=runtimeClasspath,testRuntimeClasspath 
+com.github.spullara.mustache.java:compiler:0.9.10=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.google.android:annotations:4.1.1.4=runtimeClasspath,testRuntimeClasspath +com.google.api.grpc:proto-google-common-protos:2.41.0=runtimeClasspath,testRuntimeClasspath com.google.code.findbugs:jsr305:3.0.2=compileClasspath,runtimeClasspath,spotless865458226,testCompileClasspath,testRuntimeClasspath -com.google.errorprone:error_prone_annotations:2.18.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.google.code.gson:gson:2.11.0=runtimeClasspath,testRuntimeClasspath +com.google.errorprone:error_prone_annotations:2.18.0=compileClasspath,testCompileClasspath com.google.errorprone:error_prone_annotations:2.21.1=spotless865458226 +com.google.errorprone:error_prone_annotations:2.28.0=runtimeClasspath,testRuntimeClasspath com.google.googlejavaformat:google-java-format:1.18.1=spotless865458226 -com.google.guava:failureaccess:1.0.1=compileClasspath,runtimeClasspath,spotless865458226,testCompileClasspath,testRuntimeClasspath -com.google.guava:guava:32.0.1-jre=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.google.guava:failureaccess:1.0.1=compileClasspath,spotless865458226,testCompileClasspath +com.google.guava:failureaccess:1.0.2=runtimeClasspath,testRuntimeClasspath +com.google.guava:guava:32.0.1-jre=compileClasspath,testCompileClasspath com.google.guava:guava:32.1.3-jre=spotless865458226 +com.google.guava:guava:33.2.1-android=runtimeClasspath,testRuntimeClasspath com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava=compileClasspath,runtimeClasspath,spotless865458226,testCompileClasspath,testRuntimeClasspath com.google.j2objc:j2objc-annotations:2.8=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.google.protobuf:protobuf-java-util:3.25.5=runtimeClasspath,testRuntimeClasspath +com.google.protobuf:protobuf-java:4.32.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath com.google.re2j:re2j:1.6=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.hazelcast:hazelcast:5.3.6=runtimeClasspath,testRuntimeClasspath +com.ibm.icu:icu4j:69.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.jcraft:jzlib:1.1.3=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.linkedin.avroutil1:helper-all:0.2.138=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.linkedin.parseq:parseq-batching:5.1.2=runtimeClasspath,testRuntimeClasspath +com.linkedin.parseq:parseq-restli-client:5.1.2=runtimeClasspath,testRuntimeClasspath +com.linkedin.parseq:parseq:5.1.2=runtimeClasspath,testRuntimeClasspath +com.linkedin.pegasus:d2-schemas:29.74.2=runtimeClasspath,testRuntimeClasspath +com.linkedin.pegasus:d2:29.74.2=runtimeClasspath,testRuntimeClasspath +com.linkedin.pegasus:data-avro:29.74.2=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.linkedin.pegasus:data-testutils:29.74.2=runtimeClasspath,testRuntimeClasspath +com.linkedin.pegasus:data-transform:29.74.2=runtimeClasspath,testRuntimeClasspath +com.linkedin.pegasus:data:29.74.2=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.linkedin.pegasus:degrader:29.74.2=runtimeClasspath,testRuntimeClasspath +com.linkedin.pegasus:entity-stream:29.74.2=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath 
+com.linkedin.pegasus:li-jersey-uri:29.74.2=runtimeClasspath,testRuntimeClasspath +com.linkedin.pegasus:li-protobuf:29.74.2=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.linkedin.pegasus:multipart-mime:29.74.2=runtimeClasspath,testRuntimeClasspath +com.linkedin.pegasus:pegasus-common:29.74.2=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +com.linkedin.pegasus:r2-core:29.74.2=runtimeClasspath,testRuntimeClasspath +com.linkedin.pegasus:r2-disruptor:29.74.2=runtimeClasspath,testRuntimeClasspath +com.linkedin.pegasus:r2-filter-compression:29.74.2=runtimeClasspath,testRuntimeClasspath +com.linkedin.pegasus:r2-netty:29.74.2=runtimeClasspath,testRuntimeClasspath +com.linkedin.pegasus:restli-client:29.74.2=runtimeClasspath,testRuntimeClasspath +com.linkedin.pegasus:restli-common:29.74.2=runtimeClasspath,testRuntimeClasspath +com.linkedin.pegasus:restli-disruptor:29.74.2=runtimeClasspath,testRuntimeClasspath +com.linkedin.pegasus:restli-server:29.74.2=runtimeClasspath,testRuntimeClasspath +com.sun.activation:jakarta.activation:1.2.1=runtimeClasspath,testRuntimeClasspath +com.sun.activation:javax.activation:1.2.0=runtimeClasspath,testRuntimeClasspath +com.sun.mail:jakarta.mail:1.6.7=runtimeClasspath,testRuntimeClasspath +com.tdunning:t-digest:3.2=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +commons-cli:commons-cli:1.0=runtimeClasspath,testRuntimeClasspath commons-codec:commons-codec:1.17.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath commons-io:commons-io:2.17.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +commons-logging:commons-logging:1.2=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +de.erichseifert.vectorgraphics2d:VectorGraphics2D:0.11=runtimeClasspath,testRuntimeClasspath +io.airlift:aircompressor:2.0.2=runtimeClasspath,testRuntimeClasspath io.confluent:common-utils:8.0.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath io.confluent:kafka-avro-serializer:8.0.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath io.confluent:kafka-schema-registry-client:8.0.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath @@ -29,24 +82,187 @@ io.confluent:kafka-schema-serializer:8.0.0=compileClasspath,runtimeClasspath,tes io.confluent:kafka-streams-avro-serde:8.0.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath io.confluent:logredactor-metrics:1.0.13=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath io.confluent:logredactor:1.0.13=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -io.micrometer:micrometer-commons:1.14.9=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -io.micrometer:micrometer-observation:1.14.9=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.dropwizard.metrics:metrics-core:4.2.32=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.dropwizard.metrics:metrics-jmx:4.2.32=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.envoyproxy.controlplane:api:0.1.35=runtimeClasspath,testRuntimeClasspath +io.github.classgraph:classgraph:4.8.172=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.grpc:grpc-api:1.68.3=runtimeClasspath,testRuntimeClasspath +io.grpc:grpc-context:1.68.3=runtimeClasspath,testRuntimeClasspath 
+io.grpc:grpc-core:1.68.3=runtimeClasspath,testRuntimeClasspath +io.grpc:grpc-netty-shaded:1.68.3=runtimeClasspath,testRuntimeClasspath +io.grpc:grpc-protobuf-lite:1.68.3=runtimeClasspath,testRuntimeClasspath +io.grpc:grpc-protobuf:1.68.3=runtimeClasspath,testRuntimeClasspath +io.grpc:grpc-stub:1.68.3=runtimeClasspath,testRuntimeClasspath +io.grpc:grpc-util:1.68.3=runtimeClasspath,testRuntimeClasspath +io.micrometer:context-propagation:1.1.3=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.micrometer:micrometer-commons:1.15.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.micrometer:micrometer-core:1.15.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.micrometer:micrometer-observation:1.15.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.micrometer:micrometer-registry-jmx:1.15.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.micrometer:micrometer-registry-prometheus:1.15.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.micrometer:micrometer-tracing-bridge-otel:1.5.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.micrometer:micrometer-tracing:1.5.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-all:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-buffer:4.1.109.Final=compileClasspath,testCompileClasspath +io.netty:netty-buffer:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-codec-dns:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-codec-haproxy:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-codec-http2:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-codec-http:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-codec-memcache:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-codec-mqtt:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-codec-redis:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-codec-smtp:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-codec-socks:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-codec-stomp:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-codec-xml:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-codec:4.1.109.Final=compileClasspath,testCompileClasspath +io.netty:netty-codec:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-common:4.1.109.Final=compileClasspath,testCompileClasspath +io.netty:netty-common:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-handler-proxy:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-handler-ssl-ocsp:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-handler:4.1.109.Final=compileClasspath,testCompileClasspath +io.netty:netty-handler:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-resolver-dns-classes-macos:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-resolver-dns-native-macos:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-resolver-dns:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-resolver:4.1.109.Final=compileClasspath,testCompileClasspath +io.netty:netty-resolver:4.1.128.Final=runtimeClasspath,testRuntimeClasspath 
+io.netty:netty-tcnative-classes:2.0.65.Final=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.netty:netty-transport-classes-epoll:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-transport-classes-kqueue:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-transport-native-epoll:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-transport-native-kqueue:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-transport-native-unix-common:4.1.109.Final=compileClasspath,testCompileClasspath +io.netty:netty-transport-native-unix-common:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-transport-rxtx:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-transport-sctp:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-transport-udt:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.netty:netty-transport:4.1.109.Final=compileClasspath,testCompileClasspath +io.netty:netty-transport:4.1.128.Final=runtimeClasspath,testRuntimeClasspath +io.opentelemetry.semconv:opentelemetry-semconv:1.32.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.opentelemetry:opentelemetry-api:1.49.0=compileClasspath,testCompileClasspath +io.opentelemetry:opentelemetry-api:1.54.1=runtimeClasspath,testRuntimeClasspath +io.opentelemetry:opentelemetry-common:1.54.1=runtimeClasspath,testRuntimeClasspath +io.opentelemetry:opentelemetry-context:1.49.0=compileClasspath,testCompileClasspath +io.opentelemetry:opentelemetry-context:1.54.1=runtimeClasspath,testRuntimeClasspath +io.opentelemetry:opentelemetry-extension-trace-propagators:1.49.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.opentelemetry:opentelemetry-sdk-common:1.49.0=compileClasspath,testCompileClasspath +io.opentelemetry:opentelemetry-sdk-common:1.54.1=runtimeClasspath,testRuntimeClasspath +io.opentelemetry:opentelemetry-sdk-logs:1.49.0=compileClasspath,testCompileClasspath +io.opentelemetry:opentelemetry-sdk-logs:1.54.1=runtimeClasspath,testRuntimeClasspath +io.opentelemetry:opentelemetry-sdk-metrics:1.49.0=compileClasspath,testCompileClasspath +io.opentelemetry:opentelemetry-sdk-metrics:1.54.1=runtimeClasspath,testRuntimeClasspath +io.opentelemetry:opentelemetry-sdk-trace:1.49.0=compileClasspath,testCompileClasspath +io.opentelemetry:opentelemetry-sdk-trace:1.54.1=runtimeClasspath,testRuntimeClasspath +io.opentelemetry:opentelemetry-sdk:1.49.0=compileClasspath,testCompileClasspath +io.opentelemetry:opentelemetry-sdk:1.54.1=runtimeClasspath,testRuntimeClasspath +io.perfmark:perfmark-api:0.27.0=runtimeClasspath,testRuntimeClasspath +io.projectreactor:reactor-core:3.6.5=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.prometheus:prometheus-metrics-config:1.3.8=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.prometheus:prometheus-metrics-core:1.3.8=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +io.prometheus:prometheus-metrics-exposition-formats-no-protobuf:1.3.8=runtimeClasspath,testRuntimeClasspath +io.prometheus:prometheus-metrics-exposition-formats:1.3.8=runtimeClasspath,testRuntimeClasspath +io.prometheus:prometheus-metrics-exposition-textformats:1.3.8=runtimeClasspath,testRuntimeClasspath +io.prometheus:prometheus-metrics-model:1.3.8=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath 
+io.prometheus:prometheus-metrics-tracer-common:1.3.8=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath io.swagger.core.v3:swagger-annotations:2.2.29=runtimeClasspath,testCompileClasspath,testRuntimeClasspath io.swagger.core.v3:swagger-annotations:2.2.30=compileClasspath +jakarta.annotation:jakarta.annotation-api:1.3.5=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +jakarta.json:jakarta.json-api:2.1.3=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath javax.annotation:javax.annotation-api:1.3.2=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +javax.inject:javax.inject:1=runtimeClasspath,testRuntimeClasspath +javax.servlet:javax.servlet-api:3.1.0=runtimeClasspath,testRuntimeClasspath +joda-time:joda-time:2.12.2=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath net.bytebuddy:byte-buddy-agent:1.12.19=testCompileClasspath,testRuntimeClasspath net.bytebuddy:byte-buddy:1.12.19=testCompileClasspath,testRuntimeClasspath +net.java.dev.jna:jna:5.13.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +net.openhft:zero-allocation-hashing:0.7=runtimeClasspath,testRuntimeClasspath +net.sf.jopt-simple:jopt-simple:5.0.4=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.abego.treelayout:org.abego.treelayout.core:1.0.3=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.antlr:ST4:4.3.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.antlr:antlr-runtime:3.5.2=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.antlr:antlr4-runtime:4.9.3=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.antlr:antlr4:4.9.3=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.avro:avro-compiler:1.11.4=runtimeClasspath,testRuntimeClasspath org.apache.avro:avro:1.11.4=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath org.apache.commons:commons-compress:1.27.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath org.apache.commons:commons-lang3:3.18.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.commons:commons-text:1.10.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.httpcomponents.client5:httpclient5:5.4.3=runtimeClasspath,testRuntimeClasspath +org.apache.httpcomponents.core5:httpcore5-h2:5.3.4=runtimeClasspath,testRuntimeClasspath +org.apache.httpcomponents.core5:httpcore5:5.3.4=runtimeClasspath,testRuntimeClasspath +org.apache.httpcomponents:httpasyncclient:4.1.5=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.httpcomponents:httpclient:4.5.14=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.httpcomponents:httpcore-nio:4.4.16=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.httpcomponents:httpcore:4.4.16=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath org.apache.kafka:kafka-clients:8.0.0-ccs=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath -org.checkerframework:checker-qual:3.33.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.logging.log4j:log4j-api:2.20.0=compileClasspath,testCompileClasspath 
+org.apache.logging.log4j:log4j-api:2.23.1=runtimeClasspath,testRuntimeClasspath +org.apache.logging.log4j:log4j-core:2.20.0=compileClasspath,testCompileClasspath +org.apache.logging.log4j:log4j-core:2.23.1=runtimeClasspath,testRuntimeClasspath +org.apache.logging.log4j:log4j-jul:2.20.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.lucene:lucene-analysis-common:9.7.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.lucene:lucene-backward-codecs:9.7.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.lucene:lucene-core:9.7.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.lucene:lucene-grouping:9.7.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.lucene:lucene-highlighter:9.7.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.lucene:lucene-join:9.7.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.lucene:lucene-memory:9.7.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.lucene:lucene-misc:9.7.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.lucene:lucene-queries:9.7.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.lucene:lucene-queryparser:9.7.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.lucene:lucene-sandbox:9.7.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.lucene:lucene-spatial-extras:9.7.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.lucene:lucene-spatial3d:9.7.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.lucene:lucene-suggest:9.7.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.maven:maven-artifact:3.8.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.apache.velocity:velocity-engine-core:2.4=runtimeClasspath,testRuntimeClasspath +org.apache.yetus:audience-annotations:0.12.0=runtimeClasspath,testRuntimeClasspath +org.apache.zookeeper:zookeeper-jute:3.8.4=runtimeClasspath,testRuntimeClasspath +org.apache.zookeeper:zookeeper:3.8.4=runtimeClasspath,testRuntimeClasspath +org.checkerframework:checker-qual:3.33.0=compileClasspath,testCompileClasspath org.checkerframework:checker-qual:3.37.0=spotless865458226 +org.checkerframework:checker-qual:3.42.0=runtimeClasspath,testRuntimeClasspath +org.codehaus.jackson:jackson-core-asl:1.4.2=compileClasspath,testCompileClasspath +org.codehaus.jackson:jackson-core-asl:1.8.8=runtimeClasspath,testRuntimeClasspath +org.codehaus.mojo:animal-sniffer-annotations:1.24=runtimeClasspath,testRuntimeClasspath +org.codehaus.plexus:plexus-utils:3.2.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.eclipse.parsson:parsson:1.1.6=runtimeClasspath,testRuntimeClasspath +org.hdrhistogram:HdrHistogram:2.1.12=compileClasspath,testCompileClasspath +org.hdrhistogram:HdrHistogram:2.2.2=runtimeClasspath,testRuntimeClasspath +org.javassist:javassist:3.26.0-GA=runtimeClasspath,testRuntimeClasspath +org.json:json:20231013=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.knowm.xchart:xchart:3.2.2=runtimeClasspath,testRuntimeClasspath 
+org.latencyutils:LatencyUtils:2.0.3=runtimeClasspath,testRuntimeClasspath +org.locationtech.jts:jts-core:1.15.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.locationtech.spatial4j:spatial4j:0.7=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath org.lz4:lz4-java:1.8.0=runtimeClasspath,testRuntimeClasspath org.mockito:mockito-core:4.11.0=testCompileClasspath,testRuntimeClasspath +org.neo4j.driver:neo4j-java-driver:5.20.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath org.objenesis:objenesis:3.3=testRuntimeClasspath +org.opensearch.client:opensearch-rest-client:2.11.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.opensearch.client:opensearch-rest-high-level-client:2.11.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.opensearch.plugin:aggs-matrix-stats-client:2.11.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.opensearch.plugin:lang-mustache-client:2.11.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.opensearch.plugin:mapper-extras-client:2.11.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.opensearch.plugin:parent-join-client:2.11.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.opensearch.plugin:rank-eval-client:2.11.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.opensearch:opensearch-cli:2.11.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.opensearch:opensearch-common:2.11.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.opensearch:opensearch-compress:2.11.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.opensearch:opensearch-core:2.11.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.opensearch:opensearch-geo:2.11.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.opensearch:opensearch-secure-sm:2.11.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.opensearch:opensearch-telemetry:2.11.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.opensearch:opensearch-x-content:2.11.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.opensearch:opensearch:2.11.1=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath org.projectlombok:lombok:1.18.42=annotationProcessor,compileClasspath -org.slf4j:slf4j-api:1.7.36=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.reactivestreams:reactive-streams:1.0.4=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.reflections:reflections:0.9.12=runtimeClasspath,testRuntimeClasspath +org.slf4j:slf4j-api:1.7.36=compileClasspath,testCompileClasspath +org.slf4j:slf4j-api:2.0.17=runtimeClasspath,testRuntimeClasspath org.springframework.kafka:spring-kafka:3.3.8=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath org.springframework.retry:spring-retry:2.0.12=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath org.springframework:spring-aop:6.2.9=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath @@ -60,5 +276,35 @@ org.springframework:spring-tx:6.2.9=compileClasspath,runtimeClasspath,testCompil 
org.testng:testng:7.8.0=testCompileClasspath,testRuntimeClasspath org.webjars:jquery:3.6.1=testRuntimeClasspath org.xerial.snappy:snappy-java:1.1.10.5=runtimeClasspath,testRuntimeClasspath -org.yaml:snakeyaml:2.0=compileClasspath,runtimeClasspath,testCompileClasspath,testRuntimeClasspath +org.yaml:snakeyaml:2.1=compileClasspath,testCompileClasspath +org.yaml:snakeyaml:2.3=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:annotations:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:apache-client:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:arns:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:auth:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:aws-core:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:aws-query-protocol:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:aws-xml-protocol:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:bom:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:checksums-spi:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:checksums:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:crt-core:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:endpoints-spi:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:http-auth-aws:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:http-auth-spi:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:http-auth:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:http-client-spi:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:identity-spi:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:json-utils:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:metrics-spi:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:netty-nio-client:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:profiles:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:protocol-core:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:regions:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:s3:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:sdk-core:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:sts:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:third-party-jackson-core:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.awssdk:utils:2.23.6=runtimeClasspath,testRuntimeClasspath +software.amazon.eventstream:eventstream:1.0.1=runtimeClasspath,testRuntimeClasspath empty=testAnnotationProcessor diff --git a/metadata-service/events-service/src/main/java/io/datahubproject/event/ExternalEventsService.java b/metadata-service/events-service/src/main/java/io/datahubproject/event/ExternalEventsService.java index 21dda6a89acec8..d687278162b2cf 100644 --- a/metadata-service/events-service/src/main/java/io/datahubproject/event/ExternalEventsService.java +++ b/metadata-service/events-service/src/main/java/io/datahubproject/event/ExternalEventsService.java @@ -2,6 +2,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.datahubproject.event.exception.UnsupportedTopicException; +import io.datahubproject.event.kafka.CheckedConsumer; import io.datahubproject.event.kafka.KafkaConsumerPool; import io.datahubproject.event.models.v1.ExternalEvent; import 
io.datahubproject.event.models.v1.ExternalEvents;
@@ -89,13 +90,14 @@ public ExternalEvents poll(
     long startTime = System.currentTimeMillis();
     long timeout =
         (pollTimeoutSeconds != null ? pollTimeoutSeconds : defaultPollTimeoutSeconds) * 1000L;
-    KafkaConsumer<String, GenericRecord> consumer =
-        consumerPool.borrowConsumer(timeout, TimeUnit.MILLISECONDS);
-    if (consumer == null) {
+    CheckedConsumer checkedConsumer =
+        consumerPool.borrowConsumer(timeout, TimeUnit.MILLISECONDS, finalTopic);
+    if (checkedConsumer == null) {
       throw new TimeLimitExceededException("Too many simultaneous requests, retry again later.");
     }
     try {
+      KafkaConsumer<String, GenericRecord> consumer = checkedConsumer.getConsumer();
       List<TopicPartition> partitions =
           consumer.partitionsFor(finalTopic).stream()
               .map(partitionInfo -> new TopicPartition(finalTopic, partitionInfo.partition()))
@@ -133,7 +135,7 @@ public ExternalEvents poll(
       return convertToExternalEvents(messages, encodeOffsetId(latestOffsets), fetchedRecords);
     } finally {
-      consumerPool.returnConsumer(consumer);
+      consumerPool.returnConsumer(checkedConsumer);
     }
   }
 
diff --git a/metadata-service/events-service/src/main/java/io/datahubproject/event/kafka/CheckedConsumer.java b/metadata-service/events-service/src/main/java/io/datahubproject/event/kafka/CheckedConsumer.java
new file mode 100644
index 00000000000000..bad164e81e8995
--- /dev/null
+++ b/metadata-service/events-service/src/main/java/io/datahubproject/event/kafka/CheckedConsumer.java
@@ -0,0 +1,104 @@
+package io.datahubproject.event.kafka;
+
+import com.linkedin.metadata.utils.metrics.MetricUtils;
+import java.time.Duration;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+
+@Slf4j
+@Getter
+public class CheckedConsumer {
+  public enum ConsumerState {
+    AVAILABLE,
+    BORROWED,
+    CLOSED
+  }
+
+  private final KafkaConsumer<String, GenericRecord> consumer;
+  private volatile long lastValidationTime;
+  private volatile ConsumerState state;
+  private final Duration validationTimeout;
+  private final Duration validationCacheInterval;
+  @Nullable private final MetricUtils metricUtils;
+
+  public CheckedConsumer(
+      KafkaConsumer<String, GenericRecord> consumer,
+      Duration validationTimeout,
+      Duration validationCacheInterval,
+      @Nullable MetricUtils metricUtils) {
+    this.consumer = consumer;
+    this.validationTimeout = validationTimeout;
+    this.validationCacheInterval = validationCacheInterval;
+    this.metricUtils = metricUtils;
+    this.lastValidationTime = 0;
+    this.state = ConsumerState.AVAILABLE;
+  }
+
+  public void setState(ConsumerState newState) {
+    this.state = newState;
+  }
+
+  public ConsumerState getState() {
+    return state;
+  }
+
+  public boolean isValid(@Nonnull String topic) {
+    if (consumer == null || topic == null || topic.isEmpty()) {
+      return false;
+    }
+
+    long currentTime = System.currentTimeMillis();
+    long cacheIntervalMillis = validationCacheInterval.toMillis();
+
+    if (lastValidationTime > 0 && (currentTime - lastValidationTime) < cacheIntervalMillis) {
+      return true;
+    }
+
+    boolean isValid = performValidation(topic);
+    if (isValid) {
+      lastValidationTime = currentTime;
+    } else {
+      lastValidationTime = 0;
+    }
+
+    return isValid;
+  }
+
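+  /**
+   * Cheap liveness probe: assignment() fails immediately if the consumer has
+   * been closed, and partitionsFor() performs a broker round-trip bounded by
+   * validationTimeout. Only a closed consumer (IllegalStateException) is
+   * treated as invalid; other Kafka errors are assumed transient so healthy
+   * consumers are not discarded needlessly.
+   */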
diff --git a/metadata-service/events-service/src/main/java/io/datahubproject/event/kafka/CheckedConsumer.java b/metadata-service/events-service/src/main/java/io/datahubproject/event/kafka/CheckedConsumer.java
new file mode 100644
index 00000000000000..bad164e81e8995
--- /dev/null
+++ b/metadata-service/events-service/src/main/java/io/datahubproject/event/kafka/CheckedConsumer.java
@@ -0,0 +1,104 @@
+package io.datahubproject.event.kafka;
+
+import com.linkedin.metadata.utils.metrics.MetricUtils;
+import java.time.Duration;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+
+@Slf4j
+@Getter
+public class CheckedConsumer {
+  public enum ConsumerState {
+    AVAILABLE,
+    BORROWED,
+    CLOSED
+  }
+
+  private final KafkaConsumer<String, GenericRecord> consumer;
+  private volatile long lastValidationTime;
+  private volatile ConsumerState state;
+  private final Duration validationTimeout;
+  private final Duration validationCacheInterval;
+  @Nullable private final MetricUtils metricUtils;
+
+  public CheckedConsumer(
+      KafkaConsumer<String, GenericRecord> consumer,
+      Duration validationTimeout,
+      Duration validationCacheInterval,
+      @Nullable MetricUtils metricUtils) {
+    this.consumer = consumer;
+    this.validationTimeout = validationTimeout;
+    this.validationCacheInterval = validationCacheInterval;
+    this.metricUtils = metricUtils;
+    this.lastValidationTime = 0;
+    this.state = ConsumerState.AVAILABLE;
+  }
+
+  public void setState(ConsumerState newState) {
+    this.state = newState;
+  }
+
+  public ConsumerState getState() {
+    return state;
+  }
+
+  public boolean isValid(@Nonnull String topic) {
+    if (consumer == null || topic == null || topic.isEmpty()) {
+      return false;
+    }
+
+    long currentTime = System.currentTimeMillis();
+    long cacheIntervalMillis = validationCacheInterval.toMillis();
+
+    if (lastValidationTime > 0 && (currentTime - lastValidationTime) < cacheIntervalMillis) {
+      return true;
+    }
+
+    boolean isValid = performValidation(topic);
+    if (isValid) {
+      lastValidationTime = currentTime;
+    } else {
+      lastValidationTime = 0;
+    }
+
+    return isValid;
+  }
+
+  private boolean performValidation(@Nonnull String topic) {
+    try {
+      consumer.assignment();
+      consumer.partitionsFor(topic, validationTimeout);
+      return true;
+    } catch (IllegalStateException e) {
+      log.debug("Consumer validation failed: consumer is closed or invalid", e);
+      recordInvalidConsumer();
+      return false;
+    } catch (KafkaException e) {
+      log.debug(
+          "Consumer validation encountered Kafka exception (may be transient): {}", e.getMessage());
+      return true;
+    } catch (Exception e) {
+      log.debug("Consumer validation encountered exception (may be transient): {}", e.getMessage());
+      return true;
+    }
+  }
+
+  private void recordInvalidConsumer() {
+    if (metricUtils != null) {
+      metricUtils.increment(this.getClass(), "invalid_consumer_found", 1);
+    }
+  }
+
+  public void close() {
+    try {
+      consumer.close();
+    } catch (Exception e) {
+      log.warn("Error closing consumer", e);
+    }
+  }
+}
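CheckedConsumer.isValid is a time-window cache over one broker round trip: a successful validation is trusted for validationCacheInterval, and any failure clears the cache so the next call re-validates. The same pattern in isolation (a sketch with illustrative names, not part of the diff):

import java.time.Duration;
import java.util.function.BooleanSupplier;

// Caches a successful check for a fixed window; failures are never cached.
final class TimedValidationCache {
  private final Duration window;
  private final BooleanSupplier check; // the expensive validation call
  private volatile long lastSuccessMillis; // 0 means "never validated successfully"

  TimedValidationCache(Duration window, BooleanSupplier check) {
    this.window = window;
    this.check = check;
  }

  boolean isValid() {
    long now = System.currentTimeMillis();
    if (lastSuccessMillis > 0 && (now - lastSuccessMillis) < window.toMillis()) {
      return true; // inside the cache window: skip the broker round trip
    }
    boolean ok = check.getAsBoolean();
    lastSuccessMillis = ok ? now : 0;
    return ok;
  }
}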
diff --git a/metadata-service/events-service/src/main/java/io/datahubproject/event/kafka/KafkaConsumerPool.java b/metadata-service/events-service/src/main/java/io/datahubproject/event/kafka/KafkaConsumerPool.java
index 7c68fd4e99b7cc..39cd147e2813e9 100644
--- a/metadata-service/events-service/src/main/java/io/datahubproject/event/kafka/KafkaConsumerPool.java
+++ b/metadata-service/events-service/src/main/java/io/datahubproject/event/kafka/KafkaConsumerPool.java
@@ -1,37 +1,50 @@
 package io.datahubproject.event.kafka;
 
+import com.linkedin.metadata.utils.metrics.MetricUtils;
+import java.time.Duration;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.springframework.kafka.core.ConsumerFactory;
 
+@Slf4j
 public class KafkaConsumerPool {
 
-  private final BlockingQueue<KafkaConsumer<String, GenericRecord>> consumerPool;
+  private final BlockingQueue<CheckedConsumer> consumerPool;
   private final ConsumerFactory<String, GenericRecord> consumerFactory;
   private final int maxPoolSize;
+  private final Duration validationTimeout;
+  private final Duration validationCacheInterval;
+  @Nullable private final MetricUtils metricUtils;
 
   @Getter private final AtomicInteger totalConsumersCreated = new AtomicInteger(0);
-  @Getter private final Set<KafkaConsumer<String, GenericRecord>> activeConsumers = new HashSet<>();
   @Getter private volatile boolean shuttingDown = false;
 
-  private final ReentrantLock activeConsumersLock = new ReentrantLock();
-
+  private final Set<CheckedConsumer> allConsumers = ConcurrentHashMap.newKeySet();
   private final ReentrantLock poolManagementLock = new ReentrantLock();
 
   public KafkaConsumerPool(
       final ConsumerFactory<String, GenericRecord> consumerFactory,
       final int initialPoolSize,
-      final int maxPoolSize) {
+      final int maxPoolSize,
+      final Duration validationTimeout,
+      final Duration validationCacheInterval,
+      @Nullable final MetricUtils metricUtils) {
     this.consumerFactory = consumerFactory;
     this.maxPoolSize = maxPoolSize;
+    this.validationTimeout = validationTimeout;
+    this.validationCacheInterval = validationCacheInterval;
+    this.metricUtils = metricUtils;
     this.consumerPool = new LinkedBlockingQueue<>(maxPoolSize);
 
     // Initialize the pool with initial consumers
@@ -41,95 +54,153 @@ public KafkaConsumerPool(
   }
 
   // Create a new consumer when required
-  private KafkaConsumer<String, GenericRecord> createConsumer() {
-    totalConsumersCreated.incrementAndGet();
-    KafkaConsumer<String, GenericRecord> consumer =
-        (KafkaConsumer<String, GenericRecord>) consumerFactory.createConsumer();
-
-    activeConsumersLock.lock();
+  private CheckedConsumer createConsumer() {
+    poolManagementLock.lock();
     try {
-      activeConsumers.add(consumer);
+      if (totalConsumersCreated.get() >= maxPoolSize || shuttingDown) {
+        return null;
+      }
+      totalConsumersCreated.incrementAndGet();
     } finally {
-      activeConsumersLock.unlock();
+      poolManagementLock.unlock();
    }
-    return consumer;
+    try {
+      KafkaConsumer<String, GenericRecord> consumer =
+          (KafkaConsumer<String, GenericRecord>) consumerFactory.createConsumer();
+      CheckedConsumer checkedConsumer =
+          new CheckedConsumer(consumer, validationTimeout, validationCacheInterval, metricUtils);
+      allConsumers.add(checkedConsumer);
+      return checkedConsumer;
+    } catch (Exception e) {
+      poolManagementLock.lock();
+      try {
+        totalConsumersCreated.decrementAndGet();
+      } finally {
+        poolManagementLock.unlock();
+      }
+      throw e;
+    }
   }
 
   // Borrow a consumer from the pool
   @Nullable
-  public KafkaConsumer<String, GenericRecord> borrowConsumer(long time, TimeUnit timeUnit)
+  public CheckedConsumer borrowConsumer(long time, TimeUnit timeUnit, @Nonnull String topic)
       throws InterruptedException {
     if (shuttingDown) {
       return null;
     }
+    if (topic == null || topic.isEmpty()) {
+      throw new IllegalArgumentException("Topic must be non-null and non-empty");
+    }
 
-    KafkaConsumer<String, GenericRecord> consumer = consumerPool.poll();
+    CheckedConsumer checkedConsumer = null;
+    long remainingTime = timeUnit.toMillis(time);
+    long startTime = System.currentTimeMillis();
+    int consecutiveInvalidCount = 0;
+    final int maxConsecutiveInvalid = maxPoolSize + 1;
+
+    while (checkedConsumer == null && remainingTime > 0) {
+      CheckedConsumer candidate = consumerPool.poll();
+
+      if (candidate != null) {
+        if (candidate.isValid(topic)) {
+          checkedConsumer = candidate;
+          checkedConsumer.setState(CheckedConsumer.ConsumerState.BORROWED);
+          consecutiveInvalidCount = 0;
+        } else {
+          log.warn("Found invalid consumer in pool, closing and removing it");
+          closeAndRemoveConsumer(candidate);
+          consecutiveInvalidCount++;
+        }
+      }
 
-    // If no consumer is available, create a new one if we haven't hit the max pool size
-    if (consumer == null) {
-      poolManagementLock.lock();
-      try {
+      if (checkedConsumer == null) {
+        long elapsedTime = System.currentTimeMillis() - startTime;
+        remainingTime = timeUnit.toMillis(time) - elapsedTime;
 
-        if (totalConsumersCreated.get() < maxPoolSize && !shuttingDown) {
-          consumer = createConsumer();
+        boolean canCreateMore = false;
+        poolManagementLock.lock();
+        try {
+          canCreateMore = totalConsumersCreated.get() < maxPoolSize && !shuttingDown;
+        } finally {
+          poolManagementLock.unlock();
+        }
+
+        if (canCreateMore) {
+          if (consecutiveInvalidCount >= maxConsecutiveInvalid) {
+            log.error(
+                "Too many consecutive invalid consumers ({}), possible Kafka connectivity issue. "
+                    + "Waiting before retrying.",
+                consecutiveInvalidCount);
+            if (remainingTime > 0) {
+              try {
+                Thread.sleep(Math.min(1000, remainingTime));
+              } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return null;
+              }
+              consecutiveInvalidCount = 0;
+            }
+          } else {
+            CheckedConsumer newCheckedConsumer = createConsumer();
+            if (newCheckedConsumer != null && newCheckedConsumer.isValid(topic)) {
+              checkedConsumer = newCheckedConsumer;
+              checkedConsumer.setState(CheckedConsumer.ConsumerState.BORROWED);
+              consecutiveInvalidCount = 0;
+            } else if (newCheckedConsumer != null) {
+              log.warn("Newly created consumer is invalid, closing and removing it");
+              closeAndRemoveConsumer(newCheckedConsumer);
+              consecutiveInvalidCount++;
+            }
+          }
        }
-      } finally {
-        poolManagementLock.unlock();
-      }
 
-      // If still null, wait for a consumer to be returned
-      if (consumer == null) {
-        consumer = consumerPool.poll(time, timeUnit);
+        if (checkedConsumer == null && !canCreateMore) {
+          if (remainingTime > 0) {
+            candidate = consumerPool.poll(remainingTime, TimeUnit.MILLISECONDS);
+            if (candidate != null && candidate.isValid(topic)) {
+              checkedConsumer = candidate;
+              checkedConsumer.setState(CheckedConsumer.ConsumerState.BORROWED);
+              consecutiveInvalidCount = 0;
+            } else if (candidate != null) {
+              log.warn("Found invalid consumer while waiting, closing and removing it");
+              closeAndRemoveConsumer(candidate);
+              consecutiveInvalidCount++;
+            }
+          }
+        }
       }
     }
 
-    return consumer;
+    return checkedConsumer;
   }
 
-  // Return the consumer to the pool after use
-  public void returnConsumer(KafkaConsumer<String, GenericRecord> consumer) {
-    if (consumer == null) {
-      return;
-    }
-
-    // Verify this is actually one of our consumers
-    boolean isOurConsumer;
-    activeConsumersLock.lock();
-    try {
-      isOurConsumer = activeConsumers.contains(consumer);
-    } finally {
-      activeConsumersLock.unlock();
-    }
-
-    if (!isOurConsumer) {
-      // Not our consumer, don't add to pool
+  public void returnConsumer(CheckedConsumer checkedConsumer) {
+    if (checkedConsumer == null) {
       return;
     }
 
     if (shuttingDown) {
-      // Pool is shutting down, close the consumer instead of returning it
-      closeAndRemoveConsumer(consumer);
+      closeAndRemoveConsumer(checkedConsumer);
       return;
     }
 
-    // Try to return to pool, if it fails close the consumer
-    if (!consumerPool.offer(consumer)) {
-      closeAndRemoveConsumer(consumer);
+    checkedConsumer.setState(CheckedConsumer.ConsumerState.AVAILABLE);
+    if (!consumerPool.offer(checkedConsumer)) {
+      closeAndRemoveConsumer(checkedConsumer);
     }
   }
 
-  private void closeAndRemoveConsumer(KafkaConsumer<String, GenericRecord> consumer) {
+  private void closeAndRemoveConsumer(CheckedConsumer checkedConsumer) {
+    checkedConsumer.setState(CheckedConsumer.ConsumerState.CLOSED);
+    checkedConsumer.close();
+    allConsumers.remove(checkedConsumer);
+    poolManagementLock.lock();
     try {
-      consumer.close();
+      totalConsumersCreated.decrementAndGet();
     } finally {
-      activeConsumersLock.lock();
-      try {
-        activeConsumers.remove(consumer);
-        totalConsumersCreated.decrementAndGet();
-      } finally {
-        activeConsumersLock.unlock();
-      }
+      poolManagementLock.unlock();
    }
  }
 
@@ -142,17 +213,19 @@ public void shutdownPool() {
       poolManagementLock.unlock();
     }
 
-    activeConsumersLock.lock();
+    Set<CheckedConsumer> consumersToClose = new HashSet<>(allConsumers);
+    allConsumers.clear();
+    consumerPool.clear();
+
+    poolManagementLock.lock();
     try {
-      // Close all consumers (both in pool and borrowed)
-      for (KafkaConsumer<String, GenericRecord> consumer : activeConsumers) {
-        closeAndRemoveConsumer(consumer);
+      for (CheckedConsumer consumer : consumersToClose) {
+        consumer.setState(CheckedConsumer.ConsumerState.CLOSED);
+        consumer.close();
+        totalConsumersCreated.decrementAndGet();
       }
-      activeConsumers.clear();
     } finally {
-      activeConsumersLock.unlock();
+      poolManagementLock.unlock();
     }
-
-    consumerPool.clear();
   }
 }
diff --git a/metadata-service/events-service/src/test/java/io/datahubproject/event/ExternalEventsServiceTest.java b/metadata-service/events-service/src/test/java/io/datahubproject/event/ExternalEventsServiceTest.java
index 0fa4613d864217..62f10b3f2ea97b 100644
--- a/metadata-service/events-service/src/test/java/io/datahubproject/event/ExternalEventsServiceTest.java
+++ b/metadata-service/events-service/src/test/java/io/datahubproject/event/ExternalEventsServiceTest.java
@@ -5,6 +5,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.datahubproject.event.exception.UnsupportedTopicException;
+import io.datahubproject.event.kafka.CheckedConsumer;
 import io.datahubproject.event.kafka.KafkaConsumerPool;
 import io.datahubproject.event.models.v1.ExternalEvents;
 import java.time.Duration;
@@ -32,11 +33,15 @@ public class ExternalEventsServiceTest {
   @Mock private ObjectMapper objectMapper;
 
   private ExternalEventsService service;
   private Map<String, String> topicNames = new HashMap<>();
+  private CheckedConsumer checkedConsumer;
 
   @BeforeMethod
   public void setUp() throws Exception {
     MockitoAnnotations.initMocks(this);
-    when(consumerPool.borrowConsumer(anyLong(), any(TimeUnit.class))).thenReturn(kafkaConsumer);
+    checkedConsumer =
+        new CheckedConsumer(kafkaConsumer, Duration.ofSeconds(2), Duration.ofMinutes(5), null);
+    when(consumerPool.borrowConsumer(anyLong(), any(TimeUnit.class), anyString()))
+        .thenReturn(checkedConsumer);
     topicNames.put(ExternalEventsService.PLATFORM_EVENT_TOPIC_NAME, "CustomerSpecificTopicName");
     topicNames.put(
         ExternalEventsService.METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME,
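borrowConsumer above trips a simple back-off once maxPoolSize + 1 consecutive candidates fail validation, treating a run of bad consumers as a likely broker-side problem rather than bad luck. The guard in isolation (illustrative names; the pool inlines this logic rather than using a separate class):

// After `threshold` consecutive invalid consumers, sleep briefly instead of
// churning through new connections; a valid consumer resets the count.
final class ConsecutiveInvalidGuard {
  private final int threshold;
  private int consecutiveInvalid;

  ConsecutiveInvalidGuard(int threshold) {
    this.threshold = threshold;
  }

  void recordInvalid() {
    consecutiveInvalid++;
  }

  void recordValid() {
    consecutiveInvalid = 0;
  }

  // Returns true if the breaker tripped and paused (resetting the count).
  boolean maybeBackOff(long remainingMillis) throws InterruptedException {
    if (consecutiveInvalid < threshold || remainingMillis <= 0) {
      return false;
    }
    Thread.sleep(Math.min(1000, remainingMillis)); // capped 1s pause, as in the pool
    consecutiveInvalid = 0;
    return true;
  }
}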
diff --git a/metadata-service/events-service/src/test/java/io/datahubproject/event/kafka/CheckedConsumerTest.java b/metadata-service/events-service/src/test/java/io/datahubproject/event/kafka/CheckedConsumerTest.java
new file mode 100644
index 00000000000000..5d4f5743dda943
--- /dev/null
+++ b/metadata-service/events-service/src/test/java/io/datahubproject/event/kafka/CheckedConsumerTest.java
@@ -0,0 +1,175 @@
+package io.datahubproject.event.kafka;
+
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.*;
+
+import com.linkedin.metadata.utils.metrics.MetricUtils;
+import java.time.Duration;
+import java.util.Collections;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class CheckedConsumerTest {
+
+  @Mock private KafkaConsumer<String, GenericRecord> kafkaConsumer;
+  @Mock private MetricUtils metricUtils;
+
+  @BeforeMethod
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void testInitialState() {
+    CheckedConsumer checkedConsumer =
+        new CheckedConsumer(kafkaConsumer, Duration.ofSeconds(2), Duration.ofMinutes(5), null);
+
+    assertEquals(
+        checkedConsumer.getState(),
+        CheckedConsumer.ConsumerState.AVAILABLE,
+        "Consumer should start in AVAILABLE state");
+  }
+
+  @Test
+  public void testStateTransitions() {
+    CheckedConsumer checkedConsumer =
+        new CheckedConsumer(kafkaConsumer, Duration.ofSeconds(2), Duration.ofMinutes(5), null);
+
+    checkedConsumer.setState(CheckedConsumer.ConsumerState.BORROWED);
+    assertEquals(checkedConsumer.getState(), CheckedConsumer.ConsumerState.BORROWED);
+
+    checkedConsumer.setState(CheckedConsumer.ConsumerState.AVAILABLE);
+    assertEquals(checkedConsumer.getState(), CheckedConsumer.ConsumerState.AVAILABLE);
+
+    checkedConsumer.setState(CheckedConsumer.ConsumerState.CLOSED);
+    assertEquals(checkedConsumer.getState(), CheckedConsumer.ConsumerState.CLOSED);
+  }
+
+  @Test
+  public void testValidationCache() {
+    CheckedConsumer checkedConsumer =
+        new CheckedConsumer(kafkaConsumer, Duration.ofSeconds(2), Duration.ofMillis(100), null);
+
+    when(kafkaConsumer.assignment()).thenReturn(Collections.emptySet());
+    when(kafkaConsumer.partitionsFor(anyString(), any(Duration.class)))
+        .thenReturn(Collections.emptyList());
+
+    assertTrue(checkedConsumer.isValid("test-topic"));
+    verify(kafkaConsumer, times(1)).partitionsFor(anyString(), any(Duration.class));
+
+    assertTrue(checkedConsumer.isValid("test-topic"));
+    verify(kafkaConsumer, times(1)).partitionsFor(anyString(), any(Duration.class));
+  }
+
+  @Test
+  public void testValidationCacheExpiration() throws InterruptedException {
+    CheckedConsumer checkedConsumer =
+        new CheckedConsumer(kafkaConsumer, Duration.ofSeconds(2), Duration.ofMillis(50), null);
+
+    when(kafkaConsumer.assignment()).thenReturn(Collections.emptySet());
+    when(kafkaConsumer.partitionsFor(anyString(), any(Duration.class)))
+        .thenReturn(Collections.emptyList());
+
+    assertTrue(checkedConsumer.isValid("test-topic"));
+    verify(kafkaConsumer, times(1)).partitionsFor(anyString(), any(Duration.class));
+
+    Thread.sleep(60);
+
+    assertTrue(checkedConsumer.isValid("test-topic"));
+    verify(kafkaConsumer, times(2)).partitionsFor(anyString(), any(Duration.class));
+  }
+
+  @Test
+  public void testValidationFailsForInvalidConsumer() {
+    CheckedConsumer checkedConsumer =
+        new CheckedConsumer(kafkaConsumer, Duration.ofSeconds(2), Duration.ofMinutes(5), null);
+
+    when(kafkaConsumer.assignment()).thenThrow(new IllegalStateException("Consumer is closed"));
+
+    assertFalse(checkedConsumer.isValid("test-topic"));
+  }
+
+  @Test
+  public void testValidationRecordsMetricsForInvalidConsumer() {
+    CheckedConsumer checkedConsumer =
+        new CheckedConsumer(
+            kafkaConsumer, Duration.ofSeconds(2), Duration.ofMinutes(5), metricUtils);
+
+    when(kafkaConsumer.assignment()).thenThrow(new IllegalStateException("Consumer is closed"));
+
+    assertFalse(checkedConsumer.isValid("test-topic"));
+    verify(metricUtils, times(1))
+        .increment(eq(CheckedConsumer.class), eq("invalid_consumer_found"), eq(1.0));
+  }
+
+  @Test
+  public void testValidationDoesNotRecordMetricsForValidConsumer() {
+    CheckedConsumer checkedConsumer =
+        new CheckedConsumer(
+            kafkaConsumer, Duration.ofSeconds(2), Duration.ofMinutes(5), metricUtils);
+
+    when(kafkaConsumer.assignment()).thenReturn(Collections.emptySet());
+    when(kafkaConsumer.partitionsFor(anyString(), any(Duration.class)))
+        .thenReturn(Collections.emptyList());
+
+    assertTrue(checkedConsumer.isValid("test-topic"));
+    verify(metricUtils, never())
+        .increment(eq(CheckedConsumer.class), eq("invalid_consumer_found"), anyDouble());
+  }
+
+  @Test
+  public void testKafkaExceptionTreatedAsValid() {
+    CheckedConsumer checkedConsumer =
+        new CheckedConsumer(kafkaConsumer, Duration.ofSeconds(2), Duration.ofMinutes(5), null);
+
+    when(kafkaConsumer.assignment()).thenReturn(Collections.emptySet());
+    when(kafkaConsumer.partitionsFor(anyString(), any(Duration.class)))
+        .thenThrow(new KafkaException("Transient error"));
+
+    assertTrue(checkedConsumer.isValid("test-topic"), "KafkaException should be treated as valid");
+  }
+
+  @Test
+  public void testValidationWithNullTopic() {
+    CheckedConsumer checkedConsumer =
+        new CheckedConsumer(kafkaConsumer, Duration.ofSeconds(2), Duration.ofMinutes(5), null);
+
+    assertFalse(checkedConsumer.isValid(null));
+    assertFalse(checkedConsumer.isValid(""));
+  }
+
+  @Test
+  public void testClose() {
+    CheckedConsumer checkedConsumer =
+        new CheckedConsumer(kafkaConsumer, Duration.ofSeconds(2), Duration.ofMinutes(5), null);
+
+    checkedConsumer.close();
+
+    verify(kafkaConsumer, times(1)).close();
+  }
+
+  @Test
+  public void testCloseHandlesException() {
+    CheckedConsumer checkedConsumer =
+        new CheckedConsumer(kafkaConsumer, Duration.ofSeconds(2), Duration.ofMinutes(5), null);
+
+    doThrow(new RuntimeException("Close failed")).when(kafkaConsumer).close();
+
+    checkedConsumer.close();
+
+    verify(kafkaConsumer, times(1)).close();
+  }
+
+  @Test
+  public void testGetConsumer() {
+    CheckedConsumer checkedConsumer =
+        new CheckedConsumer(kafkaConsumer, Duration.ofSeconds(2), Duration.ofMinutes(5), null);
+
+    assertSame(checkedConsumer.getConsumer(), kafkaConsumer);
+  }
+}
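The pool tests that follow exercise createConsumer's cap-and-rollback behavior (testCreateConsumerRespectsMaxPoolSize, testCreateConsumerFailsGracefully). The shape being tested, reduced to its essentials (a sketch; the pool guards the counter with its management lock instead of the bare atomic used here):

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Reserve a slot before the slow factory call; roll the reservation back on failure.
final class BoundedCreator<T> {
  private final AtomicInteger created = new AtomicInteger();
  private final int max;
  private final Supplier<T> factory;

  BoundedCreator(int max, Supplier<T> factory) {
    this.max = max;
    this.factory = factory;
  }

  T tryCreate() {
    if (created.incrementAndGet() > max) {
      created.decrementAndGet(); // cap reached: release the over-reservation
      return null;
    }
    try {
      return factory.get(); // may throw, e.g. when Kafka is unreachable
    } catch (RuntimeException e) {
      created.decrementAndGet(); // creation failed: release the reserved slot
      throw e;
    }
  }
}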
diff --git a/metadata-service/events-service/src/test/java/io/datahubproject/event/kafka/KafkaConsumerPoolTest.java b/metadata-service/events-service/src/test/java/io/datahubproject/event/kafka/KafkaConsumerPoolTest.java
index b652e9aa1f3342..6c7e5dad4eaf2f 100644
--- a/metadata-service/events-service/src/test/java/io/datahubproject/event/kafka/KafkaConsumerPoolTest.java
+++ b/metadata-service/events-service/src/test/java/io/datahubproject/event/kafka/KafkaConsumerPoolTest.java
@@ -3,9 +3,13 @@
 import static org.mockito.Mockito.*;
 import static org.testng.Assert.*;
 
+import com.linkedin.metadata.utils.metrics.MetricUtils;
+import java.time.Duration;
+import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.springframework.kafka.core.ConsumerFactory;
@@ -25,9 +29,13 @@ public class KafkaConsumerPoolTest {
   public void setUp() {
     MockitoAnnotations.initMocks(this); // Initialize mocks
     when(consumerFactory.createConsumer()).thenReturn(kafkaConsumer);
+    when(kafkaConsumer.assignment()).thenReturn(Collections.emptySet());
+    when(kafkaConsumer.partitionsFor(anyString(), any(Duration.class)))
+        .thenReturn(Collections.emptyList());
 
-    // Manually instantiate KafkaConsumerPool after mocks are initialized
-    kafkaConsumerPool = new KafkaConsumerPool(consumerFactory, 2, 5);
+    kafkaConsumerPool =
+        new KafkaConsumerPool(
+            consumerFactory, 2, 5, Duration.ofSeconds(2), Duration.ofMinutes(5), null);
   }
 
   @Test
@@ -39,84 +47,85 @@ public void testPoolInitialization() {
   @Test
   public void testBorrowConsumerWhenAvailable() throws InterruptedException {
     // Setup initial state
-    KafkaConsumer<String, GenericRecord> consumer =
-        kafkaConsumerPool.borrowConsumer(1000, TimeUnit.MILLISECONDS);
+    CheckedConsumer checkedConsumer =
+        kafkaConsumerPool.borrowConsumer(1000, TimeUnit.MILLISECONDS, "PlatformEvent_v1");
 
     // Assertions
-    assertNotNull(consumer, "Consumer should not be null when borrowed");
+    assertNotNull(checkedConsumer, "Consumer should not be null when borrowed");
     verify(consumerFactory, times(2)).createConsumer(); // Initial + this borrow
-    kafkaConsumerPool.returnConsumer(consumer);
+
kafkaConsumerPool.returnConsumer(checkedConsumer); } @Test public void testBorrowConsumerReturnsNullAfterTimeout() throws InterruptedException { // First, exhaust the pool by borrowing all initial consumers - KafkaConsumer kafkaConsumer1 = - kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS); - KafkaConsumer kafkaConsumer2 = - kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS); - KafkaConsumer kafkaConsumer3 = - kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS); - KafkaConsumer kafkaConsumer4 = - kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS); - KafkaConsumer kafkaConsumer5 = - kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS); + CheckedConsumer checkedConsumer1 = + kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + CheckedConsumer checkedConsumer2 = + kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + CheckedConsumer checkedConsumer3 = + kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + CheckedConsumer checkedConsumer4 = + kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + CheckedConsumer checkedConsumer5 = + kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); // Now pool is empty and at max size, borrowing should timeout and return null long startTime = System.currentTimeMillis(); - KafkaConsumer consumer = - kafkaConsumerPool.borrowConsumer(500, TimeUnit.MILLISECONDS); + CheckedConsumer consumer = + kafkaConsumerPool.borrowConsumer(500, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); long elapsedTime = System.currentTimeMillis() - startTime; assertNull(consumer, "Consumer should be null after timeout when pool is exhausted"); assertTrue(elapsedTime >= 500, "Should wait at least the timeout duration"); assertTrue(elapsedTime < 1000, "Should not wait significantly longer than timeout"); - kafkaConsumerPool.returnConsumer(kafkaConsumer1); - kafkaConsumerPool.returnConsumer(kafkaConsumer2); - kafkaConsumerPool.returnConsumer(kafkaConsumer3); - kafkaConsumerPool.returnConsumer(kafkaConsumer4); - kafkaConsumerPool.returnConsumer(kafkaConsumer5); + kafkaConsumerPool.returnConsumer(checkedConsumer1); + kafkaConsumerPool.returnConsumer(checkedConsumer2); + kafkaConsumerPool.returnConsumer(checkedConsumer3); + kafkaConsumerPool.returnConsumer(checkedConsumer4); + kafkaConsumerPool.returnConsumer(checkedConsumer5); } @Test public void testBorrowConsumerWithZeroTimeout() throws InterruptedException { // Exhaust the pool - KafkaConsumer kafkaConsumer1 = - kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS); - KafkaConsumer kafkaConsumer2 = - kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS); - KafkaConsumer kafkaConsumer3 = - kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS); - KafkaConsumer kafkaConsumer4 = - kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS); - KafkaConsumer kafkaConsumer5 = - kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS); + CheckedConsumer checkedConsumer1 = + kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + CheckedConsumer checkedConsumer2 = + kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + CheckedConsumer checkedConsumer3 = + kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + CheckedConsumer checkedConsumer4 = + kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + CheckedConsumer 
checkedConsumer5 = + kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); // Try to borrow with 0 timeout - should return null immediately long startTime = System.currentTimeMillis(); - KafkaConsumer consumer = - kafkaConsumerPool.borrowConsumer(0, TimeUnit.MILLISECONDS); + CheckedConsumer consumer = + kafkaConsumerPool.borrowConsumer(0, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); long elapsedTime = System.currentTimeMillis() - startTime; assertNull(consumer, "Consumer should be null immediately with 0 timeout"); assertTrue(elapsedTime < 100, "Should return almost immediately"); - kafkaConsumerPool.returnConsumer(kafkaConsumer1); - kafkaConsumerPool.returnConsumer(kafkaConsumer2); - kafkaConsumerPool.returnConsumer(kafkaConsumer3); - kafkaConsumerPool.returnConsumer(kafkaConsumer4); - kafkaConsumerPool.returnConsumer(kafkaConsumer5); + kafkaConsumerPool.returnConsumer(checkedConsumer1); + kafkaConsumerPool.returnConsumer(checkedConsumer2); + kafkaConsumerPool.returnConsumer(checkedConsumer3); + kafkaConsumerPool.returnConsumer(checkedConsumer4); + kafkaConsumerPool.returnConsumer(checkedConsumer5); } @Test public void testBorrowConsumerSucceedsWhenConsumerReturnedDuringWait() throws InterruptedException { - // Create a limited pool - KafkaConsumerPool limitedPool = new KafkaConsumerPool(consumerFactory, 1, 1); + KafkaConsumerPool limitedPool = + new KafkaConsumerPool( + consumerFactory, 1, 1, Duration.ofSeconds(2), Duration.ofMinutes(5), null); // Borrow the only consumer - KafkaConsumer consumer1 = - limitedPool.borrowConsumer(100, TimeUnit.MILLISECONDS); - assertNotNull(consumer1); + CheckedConsumer checkedConsumer1 = + limitedPool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + assertNotNull(checkedConsumer1); // Create a thread that will return the consumer after a short delay Thread returnThread = @@ -124,7 +133,7 @@ public void testBorrowConsumerSucceedsWhenConsumerReturnedDuringWait() () -> { try { Thread.sleep(200); // Wait 200ms before returning - limitedPool.returnConsumer(consumer1); + limitedPool.returnConsumer(checkedConsumer1); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -133,12 +142,13 @@ public void testBorrowConsumerSucceedsWhenConsumerReturnedDuringWait() // Try to borrow - should wait and succeed when consumer is returned long startTime = System.currentTimeMillis(); - KafkaConsumer consumer2 = - limitedPool.borrowConsumer(1000, TimeUnit.MILLISECONDS); + CheckedConsumer checkedConsumer2 = + limitedPool.borrowConsumer(1000, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); long elapsedTime = System.currentTimeMillis() - startTime; - assertNotNull(consumer2, "Should receive consumer when it's returned during wait"); - assertSame(consumer2, consumer1, "Should receive the same consumer that was returned"); + assertNotNull(checkedConsumer2, "Should receive consumer when it's returned during wait"); + assertSame( + checkedConsumer2, checkedConsumer1, "Should receive the same consumer that was returned"); assertTrue(elapsedTime >= 200, "Should wait until consumer is returned"); assertTrue(elapsedTime < 1000, "Should not wait for full timeout"); @@ -149,13 +159,14 @@ public void testBorrowConsumerSucceedsWhenConsumerReturnedDuringWait() @Test public void testBorrowConsumerReturnsNullWhenConsumerNotReturnedInTime() throws InterruptedException { - // Create a limited pool - KafkaConsumerPool limitedPool = new KafkaConsumerPool(consumerFactory, 1, 1); + KafkaConsumerPool limitedPool = + new KafkaConsumerPool( + 
consumerFactory, 1, 1, Duration.ofSeconds(2), Duration.ofMinutes(5), null); // Borrow the only consumer - KafkaConsumer consumer1 = - limitedPool.borrowConsumer(100, TimeUnit.MILLISECONDS); - assertNotNull(consumer1); + CheckedConsumer checkedConsumer1 = + limitedPool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + assertNotNull(checkedConsumer1); // Create a thread that will return the consumer after timeout expires Thread returnThread = @@ -163,7 +174,7 @@ public void testBorrowConsumerReturnsNullWhenConsumerNotReturnedInTime() () -> { try { Thread.sleep(800); // Wait longer than the timeout - limitedPool.returnConsumer(consumer1); + limitedPool.returnConsumer(checkedConsumer1); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -172,11 +183,11 @@ public void testBorrowConsumerReturnsNullWhenConsumerNotReturnedInTime() // Try to borrow with shorter timeout - should return null long startTime = System.currentTimeMillis(); - KafkaConsumer consumer2 = - limitedPool.borrowConsumer(300, TimeUnit.MILLISECONDS); + CheckedConsumer checkedConsumer2 = + limitedPool.borrowConsumer(300, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); long elapsedTime = System.currentTimeMillis() - startTime; - assertNull(consumer2, "Should return null when consumer not returned within timeout"); + assertNull(checkedConsumer2, "Should return null when consumer not returned within timeout"); assertTrue(elapsedTime >= 300, "Should wait for the full timeout"); assertTrue(elapsedTime < 600, "Should not wait significantly longer"); @@ -186,20 +197,20 @@ public void testBorrowConsumerReturnsNullWhenConsumerNotReturnedInTime() @Test public void testMultipleConcurrentBorrowsWithTimeout() throws InterruptedException { - // Create a pool with limited size - KafkaConsumerPool limitedPool = new KafkaConsumerPool(consumerFactory, 2, 2); + KafkaConsumerPool limitedPool = + new KafkaConsumerPool( + consumerFactory, 2, 2, Duration.ofSeconds(2), Duration.ofMinutes(5), null); - // Borrow all consumers - limitedPool.borrowConsumer(100, TimeUnit.MILLISECONDS); - limitedPool.borrowConsumer(100, TimeUnit.MILLISECONDS); + limitedPool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + limitedPool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); // Try to borrow from multiple threads Thread thread1 = new Thread( () -> { try { - KafkaConsumer consumer = - limitedPool.borrowConsumer(300, TimeUnit.MILLISECONDS); + CheckedConsumer consumer = + limitedPool.borrowConsumer(300, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); assertNull(consumer, "Thread 1 should timeout"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -210,8 +221,8 @@ public void testMultipleConcurrentBorrowsWithTimeout() throws InterruptedExcepti new Thread( () -> { try { - KafkaConsumer consumer = - limitedPool.borrowConsumer(300, TimeUnit.MILLISECONDS); + CheckedConsumer consumer = + limitedPool.borrowConsumer(300, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); assertNull(consumer, "Thread 2 should timeout"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -225,6 +236,478 @@ public void testMultipleConcurrentBorrowsWithTimeout() throws InterruptedExcepti limitedPool.shutdownPool(); } + @Test(expectedExceptions = IllegalArgumentException.class) + public void testBorrowConsumerWithNullTopic() throws InterruptedException { + kafkaConsumerPool.borrowConsumer(1000, TimeUnit.MILLISECONDS, null); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public 
void testBorrowConsumerWithEmptyTopic() throws InterruptedException { + kafkaConsumerPool.borrowConsumer(1000, TimeUnit.MILLISECONDS, ""); + } + + @Test + public void testInvalidConsumerRemovedFromPool() throws InterruptedException { + KafkaConsumer invalidConsumer = mock(KafkaConsumer.class); + when(invalidConsumer.assignment()).thenThrow(new IllegalStateException("Consumer is closed")); + when(consumerFactory.createConsumer()).thenReturn(invalidConsumer); + + KafkaConsumerPool pool = + new KafkaConsumerPool( + consumerFactory, 1, 1, Duration.ofSeconds(2), Duration.ofMinutes(5), null); + + CheckedConsumer consumer = pool.borrowConsumer(500, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + + assertNull(consumer, "Should return null when all consumers are invalid"); + verify(invalidConsumer, atLeastOnce()).close(); + } + + @Test + public void testNewlyCreatedInvalidConsumerRemoved() throws InterruptedException { + // Reset the mock factory to clear any setup from @BeforeMethod + reset(consumerFactory); + + KafkaConsumer invalidConsumer = mock(KafkaConsumer.class); + when(invalidConsumer.assignment()).thenThrow(new IllegalStateException("Consumer is closed")); + when(consumerFactory.createConsumer()).thenReturn(invalidConsumer); + + KafkaConsumerPool pool = + new KafkaConsumerPool( + consumerFactory, 0, 1, Duration.ofSeconds(2), Duration.ofMinutes(5), null); + + CheckedConsumer consumer = pool.borrowConsumer(1000, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + + assertNull(consumer, "Should return null when newly created consumer is invalid"); + // With maxPoolSize=1, after closing an invalid consumer, totalConsumersCreated becomes 0, + // so we can create another one. This continues until timeout, so multiple consumers may be + // created. + verify(invalidConsumer, atLeastOnce()).close(); + } + + @Test + public void testConsecutiveInvalidConsumerCircuitBreaker() throws InterruptedException { + KafkaConsumer invalidConsumer1 = mock(KafkaConsumer.class); + KafkaConsumer invalidConsumer2 = mock(KafkaConsumer.class); + KafkaConsumer invalidConsumer3 = mock(KafkaConsumer.class); + KafkaConsumer invalidConsumer4 = mock(KafkaConsumer.class); + + when(invalidConsumer1.assignment()).thenThrow(new IllegalStateException("Consumer is closed")); + when(invalidConsumer2.assignment()).thenThrow(new IllegalStateException("Consumer is closed")); + when(invalidConsumer3.assignment()).thenThrow(new IllegalStateException("Consumer is closed")); + when(invalidConsumer4.assignment()).thenThrow(new IllegalStateException("Consumer is closed")); + + when(consumerFactory.createConsumer()) + .thenReturn(invalidConsumer1) + .thenReturn(invalidConsumer2) + .thenReturn(invalidConsumer3) + .thenReturn(invalidConsumer4); + + KafkaConsumerPool pool = + new KafkaConsumerPool( + consumerFactory, 2, 2, Duration.ofSeconds(2), Duration.ofMinutes(5), null); + + long startTime = System.currentTimeMillis(); + CheckedConsumer consumer = pool.borrowConsumer(2000, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + long elapsedTime = System.currentTimeMillis() - startTime; + + assertNull(consumer, "Should return null when all consumers are invalid"); + assertTrue( + elapsedTime >= 1000, + "Should wait at least 1 second due to circuit breaker after maxConsecutiveInvalid (poolSize + 1 = 3)"); + verify(invalidConsumer1, atLeastOnce()).close(); + verify(invalidConsumer2, atLeastOnce()).close(); + verify(invalidConsumer3, atLeastOnce()).close(); + } + + @Test + public void testMetricsRecordedForInvalidConsumer() throws InterruptedException { + 
MetricUtils metricUtils = mock(MetricUtils.class); + KafkaConsumer invalidConsumer = mock(KafkaConsumer.class); + when(invalidConsumer.assignment()).thenThrow(new IllegalStateException("Consumer is closed")); + when(consumerFactory.createConsumer()).thenReturn(invalidConsumer); + + KafkaConsumerPool pool = + new KafkaConsumerPool( + consumerFactory, 1, 1, Duration.ofSeconds(2), Duration.ofMinutes(5), metricUtils); + + pool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + + verify(metricUtils, atLeastOnce()) + .increment(eq(CheckedConsumer.class), eq("invalid_consumer_found"), eq(1.0)); + } + + @Test + public void testValidConsumerAfterInvalidOnes() throws InterruptedException { + // Reset the mock factory to clear any setup from @BeforeMethod + reset(consumerFactory); + + KafkaConsumer invalidConsumer1 = mock(KafkaConsumer.class); + KafkaConsumer invalidConsumer2 = mock(KafkaConsumer.class); + KafkaConsumer validConsumer = mock(KafkaConsumer.class); + when(invalidConsumer1.assignment()).thenThrow(new IllegalStateException("Consumer is closed")); + when(invalidConsumer2.assignment()).thenThrow(new IllegalStateException("Consumer is closed")); + when(validConsumer.assignment()).thenReturn(Collections.emptySet()); + when(validConsumer.partitionsFor(anyString(), any(Duration.class))) + .thenReturn(Collections.emptyList()); + + when(consumerFactory.createConsumer()) + .thenReturn(invalidConsumer1) + .thenReturn(invalidConsumer2) + .thenReturn(validConsumer); + + KafkaConsumerPool pool = + new KafkaConsumerPool( + consumerFactory, 0, 3, Duration.ofSeconds(2), Duration.ofMinutes(5), null); + + CheckedConsumer checkedConsumer = + pool.borrowConsumer(5000, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + + // Verify the factory was called the expected number of times + verify(consumerFactory, times(3)).createConsumer(); + + assertNotNull(checkedConsumer, "Should eventually get a valid consumer"); + assertEquals(checkedConsumer.getConsumer(), validConsumer, "Should return the valid consumer"); + verify(invalidConsumer1, times(1)).close(); + verify(invalidConsumer2, times(1)).close(); + verify(validConsumer, never()).close(); + } + + @Test + public void testKafkaExceptionTreatedAsValid() throws InterruptedException { + KafkaConsumer consumerWithKafkaException = mock(KafkaConsumer.class); + when(consumerWithKafkaException.assignment()).thenReturn(Collections.emptySet()); + when(consumerWithKafkaException.partitionsFor(anyString(), any(Duration.class))) + .thenThrow(new KafkaException("Transient error")); + + when(consumerFactory.createConsumer()).thenReturn(consumerWithKafkaException); + + KafkaConsumerPool pool = + new KafkaConsumerPool( + consumerFactory, 0, 1, Duration.ofSeconds(2), Duration.ofMinutes(5), null); + + CheckedConsumer consumer = pool.borrowConsumer(1000, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + + assertNotNull(consumer, "Should treat KafkaException as valid (transient)"); + verify(consumerWithKafkaException, never()).close(); + } + + @Test + public void testConsumerStateTransitions() throws InterruptedException { + CheckedConsumer checkedConsumer = + kafkaConsumerPool.borrowConsumer(1000, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + assertNotNull(checkedConsumer); + assertEquals( + checkedConsumer.getState(), + CheckedConsumer.ConsumerState.BORROWED, + "Consumer should be in BORROWED state after borrowing"); + + kafkaConsumerPool.returnConsumer(checkedConsumer); + assertEquals( + checkedConsumer.getState(), + CheckedConsumer.ConsumerState.AVAILABLE, + "Consumer 
should be in AVAILABLE state after returning"); + } + + @Test + public void testShutdownClosesAllConsumersIncludingBorrowed() throws InterruptedException { + KafkaConsumerPool pool = + new KafkaConsumerPool( + consumerFactory, 2, 3, Duration.ofSeconds(2), Duration.ofMinutes(5), null); + + CheckedConsumer borrowed1 = pool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + CheckedConsumer borrowed2 = pool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + CheckedConsumer borrowed3 = pool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + + assertNotNull(borrowed1); + assertNotNull(borrowed2); + assertNotNull(borrowed3); + + assertEquals(borrowed1.getState(), CheckedConsumer.ConsumerState.BORROWED); + assertEquals(borrowed2.getState(), CheckedConsumer.ConsumerState.BORROWED); + assertEquals(borrowed3.getState(), CheckedConsumer.ConsumerState.BORROWED); + + pool.shutdownPool(); + + assertEquals(borrowed1.getState(), CheckedConsumer.ConsumerState.CLOSED); + assertEquals(borrowed2.getState(), CheckedConsumer.ConsumerState.CLOSED); + assertEquals(borrowed3.getState(), CheckedConsumer.ConsumerState.CLOSED); + + verify(borrowed1.getConsumer(), atLeastOnce()).close(); + verify(borrowed2.getConsumer(), atLeastOnce()).close(); + verify(borrowed3.getConsumer(), atLeastOnce()).close(); + } + + @Test + public void testReturnConsumerWithNull() { + kafkaConsumerPool.returnConsumer(null); + verify(kafkaConsumer, never()).close(); + } + + @Test + public void testReturnConsumerWhenShuttingDown() throws InterruptedException { + KafkaConsumerPool pool = + new KafkaConsumerPool( + consumerFactory, 1, 1, Duration.ofSeconds(2), Duration.ofMinutes(5), null); + + CheckedConsumer checkedConsumer = + pool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + assertNotNull(checkedConsumer); + + pool.shutdownPool(); + + pool.returnConsumer(checkedConsumer); + + assertEquals(checkedConsumer.getState(), CheckedConsumer.ConsumerState.CLOSED); + verify(checkedConsumer.getConsumer(), atLeastOnce()).close(); + } + + @Test + public void testReturnConsumerWhenQueueFull() throws InterruptedException { + KafkaConsumerPool pool = + new KafkaConsumerPool( + consumerFactory, 0, 2, Duration.ofSeconds(2), Duration.ofMinutes(5), null); + + CheckedConsumer consumer1 = pool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + CheckedConsumer consumer2 = pool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + + pool.returnConsumer(consumer1); + pool.returnConsumer(consumer2); + + CheckedConsumer consumer3 = pool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + assertNotNull(consumer3); + + pool.returnConsumer(consumer1); + pool.returnConsumer(consumer2); + + pool.returnConsumer(consumer3); + assertEquals(consumer3.getState(), CheckedConsumer.ConsumerState.CLOSED); + verify(consumer3.getConsumer(), atLeastOnce()).close(); + } + + @Test + public void testValidationCache() throws InterruptedException { + KafkaConsumerPool pool = + new KafkaConsumerPool( + consumerFactory, 1, 1, Duration.ofSeconds(2), Duration.ofMinutes(1), null); + + CheckedConsumer checkedConsumer = + pool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + assertNotNull(checkedConsumer); + + checkedConsumer.isValid("PlatformEvent_v1"); + verify(checkedConsumer.getConsumer(), times(1)).assignment(); + verify(checkedConsumer.getConsumer(), times(1)).partitionsFor(anyString(), any(Duration.class)); + + checkedConsumer.isValid("PlatformEvent_v1"); + 
verify(checkedConsumer.getConsumer(), times(1)).assignment(); + verify(checkedConsumer.getConsumer(), times(1)).partitionsFor(anyString(), any(Duration.class)); + + pool.returnConsumer(checkedConsumer); + pool.shutdownPool(); + } + + @Test + public void testValidationCacheExpiration() throws InterruptedException { + KafkaConsumerPool pool = + new KafkaConsumerPool( + consumerFactory, 1, 1, Duration.ofSeconds(2), Duration.ofMillis(100), null); + + CheckedConsumer checkedConsumer = + pool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + assertNotNull(checkedConsumer); + + checkedConsumer.isValid("PlatformEvent_v1"); + verify(checkedConsumer.getConsumer(), times(1)).partitionsFor(anyString(), any(Duration.class)); + + Thread.sleep(150); + + checkedConsumer.isValid("PlatformEvent_v1"); + verify(checkedConsumer.getConsumer(), times(2)).partitionsFor(anyString(), any(Duration.class)); + + pool.returnConsumer(checkedConsumer); + pool.shutdownPool(); + } + + @Test + public void testBorrowConsumerReturnsNullWhenShuttingDown() throws InterruptedException { + KafkaConsumerPool pool = + new KafkaConsumerPool( + consumerFactory, 1, 1, Duration.ofSeconds(2), Duration.ofMinutes(5), null); + + pool.shutdownPool(); + + CheckedConsumer consumer = pool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + assertNull(consumer, "Should return null when pool is shutting down"); + } + + @Test + public void testMultipleReturnsOfSameConsumer() throws InterruptedException { + KafkaConsumerPool pool = + new KafkaConsumerPool( + consumerFactory, 1, 1, Duration.ofSeconds(2), Duration.ofMinutes(5), null); + + CheckedConsumer checkedConsumer = + pool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + assertNotNull(checkedConsumer); + + pool.returnConsumer(checkedConsumer); + assertEquals(checkedConsumer.getState(), CheckedConsumer.ConsumerState.AVAILABLE); + + CheckedConsumer checkedConsumer2 = + pool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + assertSame(checkedConsumer2, checkedConsumer, "Should get the same consumer back"); + + pool.returnConsumer(checkedConsumer); + assertEquals(checkedConsumer.getState(), CheckedConsumer.ConsumerState.AVAILABLE); + + pool.shutdownPool(); + } + + @Test + public void testTotalConsumersCreatedTracking() throws InterruptedException { + KafkaConsumer consumer1Mock = mock(KafkaConsumer.class); + KafkaConsumer consumer2Mock = mock(KafkaConsumer.class); + when(consumer1Mock.assignment()).thenReturn(Collections.emptySet()); + when(consumer1Mock.partitionsFor(anyString(), any(Duration.class))) + .thenReturn(Collections.emptyList()); + when(consumer2Mock.assignment()).thenReturn(Collections.emptySet()); + when(consumer2Mock.partitionsFor(anyString(), any(Duration.class))) + .thenReturn(Collections.emptyList()); + + reset(consumerFactory); + when(consumerFactory.createConsumer()).thenReturn(consumer1Mock).thenReturn(consumer2Mock); + + KafkaConsumerPool pool = + new KafkaConsumerPool( + consumerFactory, 0, 3, Duration.ofSeconds(2), Duration.ofMinutes(5), null); + + assertEquals(pool.getTotalConsumersCreated().get(), 0); + + CheckedConsumer checkedConsumer1 = + pool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + assertEquals(pool.getTotalConsumersCreated().get(), 1); + + CheckedConsumer checkedConsumer2 = + pool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + assertEquals(pool.getTotalConsumersCreated().get(), 2); + + pool.returnConsumer(checkedConsumer1); + 
assertEquals(pool.getTotalConsumersCreated().get(), 2, "Count should not change on return"); + + pool.shutdownPool(); + assertEquals(pool.getTotalConsumersCreated().get(), 0, "Count should be 0 after shutdown"); + } + + @Test + public void testCreateConsumerRespectsMaxPoolSize() throws InterruptedException { + KafkaConsumerPool pool = + new KafkaConsumerPool( + consumerFactory, 0, 2, Duration.ofSeconds(2), Duration.ofMinutes(5), null); + + CheckedConsumer consumer1 = pool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + CheckedConsumer consumer2 = pool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + CheckedConsumer consumer3 = pool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + + assertNotNull(consumer1); + assertNotNull(consumer2); + assertNull(consumer3, "Should not create more than maxPoolSize consumers"); + + assertEquals(pool.getTotalConsumersCreated().get(), 2); + + pool.returnConsumer(consumer1); + pool.returnConsumer(consumer2); + pool.shutdownPool(); + } + + @Test + public void testCreateConsumerFailsGracefully() throws InterruptedException { + reset(consumerFactory); + when(consumerFactory.createConsumer()) + .thenThrow(new RuntimeException("Kafka connection failed")); + + KafkaConsumerPool pool = + new KafkaConsumerPool( + consumerFactory, 0, 2, Duration.ofSeconds(2), Duration.ofMinutes(5), null); + + try { + pool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + fail("Should throw exception when consumer creation fails"); + } catch (Exception e) { + assertTrue(e instanceof RuntimeException); + } + + assertEquals(pool.getTotalConsumersCreated().get(), 0, "Count should not increment on failure"); + } + + @Test + public void testRegistryTracksAllConsumers() throws InterruptedException { + KafkaConsumerPool pool = + new KafkaConsumerPool( + consumerFactory, 0, 3, Duration.ofSeconds(2), Duration.ofMinutes(5), null); + + CheckedConsumer consumer1 = pool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + CheckedConsumer consumer2 = pool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + CheckedConsumer consumer3 = pool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + + assertNotNull(consumer1); + assertNotNull(consumer2); + assertNotNull(consumer3); + + assertEquals(pool.getTotalConsumersCreated().get(), 3); + + pool.returnConsumer(consumer1); + pool.returnConsumer(consumer2); + + assertEquals(pool.getTotalConsumersCreated().get(), 3, "Count should not change on return"); + + pool.shutdownPool(); + + assertEquals(consumer1.getState(), CheckedConsumer.ConsumerState.CLOSED); + assertEquals(consumer2.getState(), CheckedConsumer.ConsumerState.CLOSED); + assertEquals(consumer3.getState(), CheckedConsumer.ConsumerState.CLOSED); + + verify(consumer1.getConsumer(), atLeastOnce()).close(); + verify(consumer2.getConsumer(), atLeastOnce()).close(); + verify(consumer3.getConsumer(), atLeastOnce()).close(); + + assertEquals(pool.getTotalConsumersCreated().get(), 0, "Count should be 0 after shutdown"); + } + + @Test + public void testConcurrentBorrowAndReturn() throws InterruptedException { + KafkaConsumerPool pool = + new KafkaConsumerPool( + consumerFactory, 2, 3, Duration.ofSeconds(2), Duration.ofMinutes(5), null); + + CheckedConsumer consumer1 = pool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + CheckedConsumer consumer2 = pool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + + Thread returnThread = + new Thread( + () -> { + try { + 
Thread.sleep(50); + pool.returnConsumer(consumer1); + pool.returnConsumer(consumer2); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + returnThread.start(); + + CheckedConsumer consumer3 = pool.borrowConsumer(200, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + assertNotNull(consumer3); + + returnThread.join(); + + assertEquals(consumer1.getState(), CheckedConsumer.ConsumerState.AVAILABLE); + assertEquals(consumer2.getState(), CheckedConsumer.ConsumerState.AVAILABLE); + + pool.returnConsumer(consumer3); + pool.shutdownPool(); + } + @AfterClass public void testShutdownPool() { // Call shutdown diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/event/KafkaConsumerPoolFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/event/KafkaConsumerPoolFactory.java index d337bf32401e0b..031a1c7b8d9010 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/event/KafkaConsumerPoolFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/event/KafkaConsumerPoolFactory.java @@ -1,7 +1,10 @@ package com.linkedin.gms.factory.event; +import com.linkedin.metadata.utils.metrics.MetricUtils; import io.datahubproject.event.kafka.KafkaConsumerPool; +import java.time.Duration; import org.apache.avro.generic.GenericRecord; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -18,10 +21,25 @@ public class KafkaConsumerPoolFactory { @Value("${kafka.consumerPool.maxSize:10}") private int maxPoolSize; + @Value("${kafka.consumerPool.validationTimeoutSeconds}") + private int validationTimeoutSeconds; + + @Value("${kafka.consumerPool.validationCacheIntervalMinutes:5}") + private int validationCacheIntervalMinutes; + + @Autowired(required = false) + private MetricUtils metricUtils; + @Bean public KafkaConsumerPool kafkaConsumerPool( @Qualifier("kafkaConsumerPoolConsumerFactory") DefaultKafkaConsumerFactory consumerFactory) { - return new KafkaConsumerPool(consumerFactory, initialPoolSize, maxPoolSize); + return new KafkaConsumerPool( + consumerFactory, + initialPoolSize, + maxPoolSize, + Duration.ofSeconds(validationTimeoutSeconds), + Duration.ofMinutes(validationCacheIntervalMinutes), + metricUtils); } } diff --git a/metadata-service/factories/src/test/java/com/linkedin/gms/factory/event/KafkaConsumerPoolFactoryTest.java b/metadata-service/factories/src/test/java/com/linkedin/gms/factory/event/KafkaConsumerPoolFactoryTest.java new file mode 100644 index 00000000000000..a4d2b4fff60764 --- /dev/null +++ b/metadata-service/factories/src/test/java/com/linkedin/gms/factory/event/KafkaConsumerPoolFactoryTest.java @@ -0,0 +1,236 @@ +package com.linkedin.gms.factory.event; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; + +import com.linkedin.metadata.utils.metrics.MetricUtils; +import io.datahubproject.event.kafka.CheckedConsumer; +import io.datahubproject.event.kafka.KafkaConsumerPool; +import java.lang.reflect.Field; +import java.time.Duration; +import java.util.Collections; +import 
java.util.concurrent.TimeUnit; +import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.bean.override.mockito.MockitoBean; +import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; +import org.testng.annotations.Test; + +@SpringBootTest( + classes = { + KafkaConsumerPoolFactory.class, + KafkaConsumerPoolFactoryTest.TestConfiguration.class + }) +@TestPropertySource( + properties = { + "eventsApi.enabled=true", + "kafka.consumerPool.initialSize=3", + "kafka.consumerPool.maxSize=5", + "kafka.consumerPool.validationTimeoutSeconds=10", + "kafka.consumerPool.validationCacheIntervalMinutes=2" + }) +public class KafkaConsumerPoolFactoryTest extends AbstractTestNGSpringContextTests { + + @MockitoBean private MetricUtils mockMetricUtils; + + @Autowired + @Qualifier("kafkaConsumerPool") + private KafkaConsumerPool kafkaConsumerPool; + + @Test + public void testKafkaConsumerPoolBeanCreation() { + assertNotNull(kafkaConsumerPool, "KafkaConsumerPool bean should be created"); + } + + @Test + public void testKafkaConsumerPoolInitializationWithCorrectInitialSize() { + assertNotNull(kafkaConsumerPool, "KafkaConsumerPool should be created"); + assertEquals( + kafkaConsumerPool.getTotalConsumersCreated().get(), + 3, + "Should create 3 initial consumers (initialSize=3)"); + } + + @Test + public void testKafkaConsumerPoolRespectsMaxPoolSize() throws Exception { + CheckedConsumer consumer1 = + kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + CheckedConsumer consumer2 = + kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + CheckedConsumer consumer3 = + kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + CheckedConsumer consumer4 = + kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + CheckedConsumer consumer5 = + kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + CheckedConsumer consumer6 = + kafkaConsumerPool.borrowConsumer(100, TimeUnit.MILLISECONDS, "PlatformEvent_v1"); + + assertNotNull(consumer1, "Should be able to borrow consumer 1"); + assertNotNull(consumer2, "Should be able to borrow consumer 2"); + assertNotNull(consumer3, "Should be able to borrow consumer 3"); + assertNotNull(consumer4, "Should be able to borrow consumer 4"); + assertNotNull(consumer5, "Should be able to borrow consumer 5"); + assertNull(consumer6, "Should not be able to borrow more than maxPoolSize (5) consumers"); + + assertEquals( + kafkaConsumerPool.getTotalConsumersCreated().get(), + 5, + "Should not create more than maxPoolSize (5) consumers"); + + if (consumer1 != null) kafkaConsumerPool.returnConsumer(consumer1); + if (consumer2 != null) kafkaConsumerPool.returnConsumer(consumer2); + if (consumer3 != null) kafkaConsumerPool.returnConsumer(consumer3); + if (consumer4 != null) kafkaConsumerPool.returnConsumer(consumer4); + if (consumer5 != null) kafkaConsumerPool.returnConsumer(consumer5); + } + + @Test + public void 
testValidationTimeoutIsSetCorrectly() throws Exception { + Field validationTimeoutField = KafkaConsumerPool.class.getDeclaredField("validationTimeout"); + validationTimeoutField.setAccessible(true); + Duration validationTimeout = (Duration) validationTimeoutField.get(kafkaConsumerPool); + + assertEquals( + validationTimeout, + Duration.ofSeconds(10), + "Validation timeout should be set to 10 seconds"); + } + + @Test + public void testValidationCacheIntervalIsSetCorrectly() throws Exception { + Field validationCacheIntervalField = + KafkaConsumerPool.class.getDeclaredField("validationCacheInterval"); + validationCacheIntervalField.setAccessible(true); + Duration validationCacheInterval = + (Duration) validationCacheIntervalField.get(kafkaConsumerPool); + + assertEquals( + validationCacheInterval, + Duration.ofMinutes(2), + "Validation cache interval should be set to 2 minutes"); + } + + @Test + public void testMetricUtilsIsInjected() throws Exception { + Field metricUtilsField = KafkaConsumerPool.class.getDeclaredField("metricUtils"); + metricUtilsField.setAccessible(true); + MetricUtils injectedMetricUtils = (MetricUtils) metricUtilsField.get(kafkaConsumerPool); + + assertNotNull(injectedMetricUtils, "MetricUtils should be injected when provided"); + assertEquals( + injectedMetricUtils, mockMetricUtils, "Injected MetricUtils should match the mock"); + } + + @Configuration + static class TestConfiguration { + @Bean(name = "kafkaConsumerPoolConsumerFactory") + public DefaultKafkaConsumerFactory testConsumerFactory() { + DefaultKafkaConsumerFactory factory = + mock(DefaultKafkaConsumerFactory.class); + KafkaConsumer consumer = mock(KafkaConsumer.class); + when(consumer.assignment()).thenReturn(Collections.emptySet()); + when(consumer.partitionsFor(anyString(), any(Duration.class))) + .thenReturn(Collections.emptyList()); + when(factory.createConsumer()).thenReturn(consumer); + return factory; + } + } +} + +@SpringBootTest( + classes = { + KafkaConsumerPoolFactory.class, + KafkaConsumerPoolFactoryWithoutMetricUtilsTest.TestConfiguration.class + }) +@TestPropertySource( + properties = { + "eventsApi.enabled=true", + "kafka.consumerPool.initialSize=2", + "kafka.consumerPool.maxSize=4", + "kafka.consumerPool.validationTimeoutSeconds=15", + "kafka.consumerPool.validationCacheIntervalMinutes=3" + }) +class KafkaConsumerPoolFactoryWithoutMetricUtilsTest extends AbstractTestNGSpringContextTests { + + @Autowired + @Qualifier("kafkaConsumerPool") + private KafkaConsumerPool kafkaConsumerPool; + + @Test + public void testKafkaConsumerPoolBeanCreationWithoutMetricUtils() { + assertNotNull( + kafkaConsumerPool, "KafkaConsumerPool bean should be created without MetricUtils"); + } + + @Test + public void testKafkaConsumerPoolInitializationWithDifferentProperties() { + assertNotNull(kafkaConsumerPool, "KafkaConsumerPool should be created"); + assertEquals( + kafkaConsumerPool.getTotalConsumersCreated().get(), + 2, + "Should create 2 initial consumers (initialSize=2)"); + } + + @Test + public void testValidationTimeoutWithDifferentValue() throws Exception { + Field validationTimeoutField = KafkaConsumerPool.class.getDeclaredField("validationTimeout"); + validationTimeoutField.setAccessible(true); + Duration validationTimeout = (Duration) validationTimeoutField.get(kafkaConsumerPool); + + assertEquals( + validationTimeout, + Duration.ofSeconds(15), + "Validation timeout should be set to 15 seconds"); + } + + @Test + public void testValidationCacheIntervalWithDifferentValue() throws Exception { + Field 
validationCacheIntervalField = + KafkaConsumerPool.class.getDeclaredField("validationCacheInterval"); + validationCacheIntervalField.setAccessible(true); + Duration validationCacheInterval = + (Duration) validationCacheIntervalField.get(kafkaConsumerPool); + + assertEquals( + validationCacheInterval, + Duration.ofMinutes(3), + "Validation cache interval should be set to 3 minutes"); + } + + @Test + public void testMetricUtilsIsNullWhenNotProvided() throws Exception { + Field metricUtilsField = KafkaConsumerPool.class.getDeclaredField("metricUtils"); + metricUtilsField.setAccessible(true); + Object injectedMetricUtils = metricUtilsField.get(kafkaConsumerPool); + + assertNull(injectedMetricUtils, "MetricUtils should be null when not provided"); + } + + @Configuration + static class TestConfiguration { + @Bean(name = "kafkaConsumerPoolConsumerFactory") + public DefaultKafkaConsumerFactory testConsumerFactory() { + DefaultKafkaConsumerFactory factory = + mock(DefaultKafkaConsumerFactory.class); + KafkaConsumer consumer = mock(KafkaConsumer.class); + when(consumer.assignment()).thenReturn(Collections.emptySet()); + when(consumer.partitionsFor(anyString(), any(Duration.class))) + .thenReturn(Collections.emptyList()); + when(factory.createConsumer()).thenReturn(consumer); + return factory; + } + } +}
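End to end, KafkaConsumerPoolFactory wires the pool from kafka.consumerPool.* properties. A hand-wiring sketch of the equivalent construction (the numeric values here are illustrative; in the diff only maxSize:10 and validationCacheIntervalMinutes:5 carry defaults, and validationTimeoutSeconds has none, so it must be set explicitly):

import java.time.Duration;
import org.apache.avro.generic.GenericRecord;
import org.springframework.kafka.core.ConsumerFactory;

final class PoolWiring {
  // Mirrors the factory bean: pass null for MetricUtils when metrics are disabled.
  static KafkaConsumerPool buildPool(ConsumerFactory<String, GenericRecord> consumerFactory) {
    int initialPoolSize = 2;                // kafka.consumerPool.initialSize (illustrative)
    int maxPoolSize = 10;                   // kafka.consumerPool.maxSize:10
    int validationTimeoutSeconds = 5;       // kafka.consumerPool.validationTimeoutSeconds (no default)
    int validationCacheIntervalMinutes = 5; // kafka.consumerPool.validationCacheIntervalMinutes:5
    return new KafkaConsumerPool(
        consumerFactory,
        initialPoolSize,
        maxPoolSize,
        Duration.ofSeconds(validationTimeoutSeconds),
        Duration.ofMinutes(validationCacheIntervalMinutes),
        null);
  }
}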