From 45d76e778564fb6fadabdb8546a42cb734422982 Mon Sep 17 00:00:00 2001 From: Dima <4478955@gmail.com> Date: Sun, 15 Jun 2025 15:13:29 +0100 Subject: [PATCH 1/3] consumers hello world and tests added --- .../examples/KafkaConsumerExample.java | 175 ++++++++++++++++++ ...fkaConsumerInterceptorIntegrationTest.java | 157 ++++++++++++++++ .../agent/KafkaConsumerInterceptorTest.java | 155 ++++++++++++++++ 3 files changed, 487 insertions(+) create mode 100644 examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaConsumerExample.java create mode 100644 superstream-clients/src/test/java/ai/superstream/agent/KafkaConsumerInterceptorIntegrationTest.java create mode 100644 superstream-clients/src/test/java/ai/superstream/agent/KafkaConsumerInterceptorTest.java diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaConsumerExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaConsumerExample.java new file mode 100644 index 0000000..26f9050 --- /dev/null +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaConsumerExample.java @@ -0,0 +1,175 @@ +package ai.superstream.examples; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; + +/** + * Example application that uses the Kafka Clients API to consume messages. + * Run with: + * java -javaagent:path/to/superstream-clients-1.0.0.jar + * -Dlogback.configurationFile=logback.xml -jar + * kafka-clients-example-1.0.0-jar-with-dependencies.jar + * + * Prerequisites: + * 1. A Kafka server with the following topics: + * - superstream.metadata_v1 - with a configuration message + * - superstream.clients - for client reports + * - example-topic - for test messages (created by KafkaProducerExample) + * + * Environment variables: + * - KAFKA_BOOTSTRAP_SERVERS: The Kafka bootstrap servers (default: + * localhost:9092) + * - SUPERSTREAM_TOPICS_LIST: Comma-separated list of topics to optimize for + * (default: example-topic) + */ +public class KafkaConsumerExample { + private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerExample.class); + + // === Configuration Constants === + private static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092"; + + private static final String CLIENT_ID = "superstream-example-consumer"; + private static final String GROUP_ID = "superstream-example-consumer-group"; + private static final String AUTO_OFFSET_RESET = "earliest"; // Start from beginning of topic + private static final long POLL_TIMEOUT_MS = 1000; // 1 second poll timeout + + private static final String TOPIC_NAME = "example-topic"; + + public static void main(String[] args) { + logger.info("Starting Kafka Consumer Example"); + logger.info("This example will consume messages from topic: {}", TOPIC_NAME); + logger.info("Watch for the 'hello world - KafkaConsumer intercepted!' 
message from the Superstream agent");
+
+        Properties props = createConsumerProperties();
+
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
+            // The Superstream Agent should have intercepted the consumer creation
+            // and logged "hello world - KafkaConsumer intercepted!" to the console
+
+            logger.info("KafkaConsumer created successfully with client.id: {}", CLIENT_ID);
+            logger.info("Consumer group: {}", GROUP_ID);
+
+            // Subscribe to the topic
+            consumer.subscribe(Collections.singletonList(TOPIC_NAME));
+            logger.info("Subscribed to topic: {}", TOPIC_NAME);
+
+            int messageCount = 0;
+            long startTime = System.currentTimeMillis();
+
+            logger.info("Starting message consumption loop...");
+
+            while (true) {
+                try {
+                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+
+                    if (records.isEmpty()) {
+                        logger.debug("No messages received in this poll cycle");
+                        continue;
+                    }
+
+                    logger.info("Received {} messages in this batch", records.count());
+
+                    for (ConsumerRecord<String, String> record : records) {
+                        messageCount++;
+
+                        logger.info("Message {}: Key={}, Value length={} bytes, Partition={}, Offset={}",
+                                messageCount,
+                                record.key(),
+                                record.value() != null ? record.value().length() : 0,
+                                record.partition(),
+                                record.offset());
+
+                        // Log first few characters of the message for verification
+                        if (record.value() != null && record.value().length() > 0) {
+                            String preview = record.value().length() > 100
+                                    ? record.value().substring(0, 100) + "..."
+                                    : record.value();
+                            logger.debug("Message preview: {}", preview);
+                        }
+
+                        // Simulate message processing time
+                        Thread.sleep(100);
+                    }
+
+                    // Commit offsets after processing the batch
+                    consumer.commitSync();
+                    logger.debug("Committed offsets for {} messages", records.count());
+
+                    // Log progress every 10 messages
+                    if (messageCount % 10 == 0) {
+                        long elapsedTime = System.currentTimeMillis() - startTime;
+                        double messagesPerSecond = (messageCount * 1000.0) / elapsedTime;
+                        // SLF4J placeholders do not support format specifiers, so pre-format the rate
+                        logger.info("Progress: {} messages consumed in {} ms ({} msg/sec)",
+                                messageCount, elapsedTime, String.format("%.2f", messagesPerSecond));
+                    }
+
+                } catch (Exception e) {
+                    logger.error("Error processing messages: {}", e.getMessage(), e);
+                    // Continue consuming despite errors
+                }
+            }
+
+        } catch (Exception e) {
+            logger.error("Fatal error in consumer", e);
+        }
+    }
+
+    /**
+     * Create consumer properties with optimal settings for the example.
+ */ + private static Properties createConsumerProperties() { + Properties props = new Properties(); + + // Basic Kafka configuration + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, DEFAULT_BOOTSTRAP_SERVERS); + props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID); + + // Serialization + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + + // Consumer behavior + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // Manual commit for better control + + // Performance tuning + props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); // 1KB minimum fetch + props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // Wait up to 500ms for minimum bytes + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // Process up to 100 records per poll + + // Session and heartbeat configuration + props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); // 30 seconds + props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000); // 10 seconds + + logger.info("Consumer configuration:"); + props.stringPropertyNames().forEach(key -> { + if (!isSensitiveProperty(key)) { + logger.info(" {} = {}", key, props.getProperty(key)); + } + }); + + return props; + } + + /** + * Check if a property contains sensitive information that shouldn't be logged. + */ + private static boolean isSensitiveProperty(String propertyName) { + String lowerName = propertyName.toLowerCase(); + return lowerName.contains("password") || + lowerName.contains("secret") || + lowerName.contains("key") || + lowerName.contains("token") || + lowerName.contains("credential"); + } +} \ No newline at end of file diff --git a/superstream-clients/src/test/java/ai/superstream/agent/KafkaConsumerInterceptorIntegrationTest.java b/superstream-clients/src/test/java/ai/superstream/agent/KafkaConsumerInterceptorIntegrationTest.java new file mode 100644 index 0000000..1381482 --- /dev/null +++ b/superstream-clients/src/test/java/ai/superstream/agent/KafkaConsumerInterceptorIntegrationTest.java @@ -0,0 +1,157 @@ +package ai.superstream.agent; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.AfterEach; +import org.apache.kafka.clients.consumer.ConsumerConfig; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Integration tests for KafkaConsumerInterceptor. + * These tests simulate more realistic scenarios with actual Kafka consumer configurations. 
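+ * They drive the interceptor's static methods directly, so no running Kafka broker is required.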
+ */ +public class KafkaConsumerInterceptorIntegrationTest { + + private ByteArrayOutputStream outputStreamCaptor; + private PrintStream standardOut; + + @BeforeEach + public void setUp() { + // Capture System.out to verify console output + outputStreamCaptor = new ByteArrayOutputStream(); + standardOut = System.out; + System.setOut(new PrintStream(outputStreamCaptor)); + } + + @AfterEach + public void tearDown() { + // Restore original System.out + System.setOut(standardOut); + } + + @Test + public void testInterceptor_WithTypicalConsumerProperties() { + // Arrange - typical Kafka consumer properties + Properties consumerProps = new Properties(); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group"); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "test-consumer"); + + Object[] args = new Object[]{consumerProps}; + + // Act + KafkaConsumerInterceptor.onEnter(args); + + // Assert + String output = outputStreamCaptor.toString(); + assertTrue(output.contains("hello world - KafkaConsumer intercepted!"), + "Should intercept consumer with typical properties"); + } + + @Test + public void testInterceptor_WithMinimalConsumerProperties() { + // Arrange - minimal Kafka consumer properties + Properties consumerProps = new Properties(); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "minimal-group"); + + Object[] args = new Object[]{consumerProps}; + + // Act + KafkaConsumerInterceptor.onEnter(args); + + // Assert + String output = outputStreamCaptor.toString(); + assertTrue(output.contains("hello world - KafkaConsumer intercepted!"), + "Should intercept consumer with minimal properties"); + } + + @Test + public void testInterceptor_SimulateConsumerCreationLifecycle() { + // Arrange + Properties consumerProps = new Properties(); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "lifecycle-test-group"); + consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "lifecycle-consumer"); + + Object[] enterArgs = new Object[]{consumerProps}; + Object mockConsumer = new MockKafkaConsumer(); + + // Act - simulate the full constructor lifecycle + KafkaConsumerInterceptor.onEnter(enterArgs); + KafkaConsumerInterceptor.onExit(mockConsumer); + + // Assert + String output = outputStreamCaptor.toString(); + assertTrue(output.contains("hello world - KafkaConsumer intercepted!"), + "Should intercept during consumer creation lifecycle"); + } + + @Test + public void testInterceptor_WithMultipleConsumers() { + // Arrange + Properties props1 = new Properties(); + props1.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props1.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1"); + props1.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-1"); + + Properties props2 = new Properties(); + props2.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093"); + props2.put(ConsumerConfig.GROUP_ID_CONFIG, "group-2"); + props2.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-2"); + + Object[] args1 = new Object[]{props1}; + Object[] 
args2 = new Object[]{props2}; + + // Act + KafkaConsumerInterceptor.onEnter(args1); + KafkaConsumerInterceptor.onEnter(args2); + + // Assert + String output = outputStreamCaptor.toString(); + int messageCount = output.split("hello world - KafkaConsumer intercepted!", -1).length - 1; + assertEquals(2, messageCount, "Should intercept both consumers"); + } + + @Test + public void testInterceptor_PerformanceImpact() { + // Arrange + Properties consumerProps = new Properties(); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "performance-test-group"); + + Object[] args = new Object[]{consumerProps}; + + // Act & Assert - measure performance impact + long startTime = System.nanoTime(); + + for (int i = 0; i < 100; i++) { + KafkaConsumerInterceptor.onEnter(args); + } + + long endTime = System.nanoTime(); + long totalTime = endTime - startTime; + + // Each call should be very fast (less than 1ms on average) + double averageTimeMs = (totalTime / 100.0) / 1_000_000.0; + assertTrue(averageTimeMs < 1.0, + "Interceptor should have minimal performance impact, average time: " + averageTimeMs + "ms"); + } + + /** + * Mock KafkaConsumer class for testing purposes. + */ + private static class MockKafkaConsumer { + // Empty mock class to simulate a KafkaConsumer + } +} \ No newline at end of file diff --git a/superstream-clients/src/test/java/ai/superstream/agent/KafkaConsumerInterceptorTest.java b/superstream-clients/src/test/java/ai/superstream/agent/KafkaConsumerInterceptorTest.java new file mode 100644 index 0000000..623c6f8 --- /dev/null +++ b/superstream-clients/src/test/java/ai/superstream/agent/KafkaConsumerInterceptorTest.java @@ -0,0 +1,155 @@ +package ai.superstream.agent; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.AfterEach; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Unit tests for KafkaConsumerInterceptor. 
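+ * System.out is captured in setUp() so each test can assert on the interceptor's console output.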
+ */
+public class KafkaConsumerInterceptorTest {
+
+    private ByteArrayOutputStream outputStreamCaptor;
+    private PrintStream standardOut;
+
+    @BeforeEach
+    public void setUp() {
+        // Capture System.out to verify console output
+        outputStreamCaptor = new ByteArrayOutputStream();
+        standardOut = System.out;
+        System.setOut(new PrintStream(outputStreamCaptor));
+    }
+
+    @AfterEach
+    public void tearDown() {
+        // Restore original System.out
+        System.setOut(standardOut);
+    }
+
+    @Test
+    public void testOnEnter_LogsHelloWorld() {
+        // Arrange
+        Object[] args = new Object[]{new Properties()};
+
+        // Act
+        KafkaConsumerInterceptor.onEnter(args);
+
+        // Assert
+        String output = outputStreamCaptor.toString();
+        assertTrue(output.contains("hello world - KafkaConsumer intercepted!"),
+                "Should log hello world message to console");
+    }
+
+    @Test
+    public void testOnEnter_WithNullArgs() {
+        // Arrange
+        Object[] args = null;
+
+        // Act & Assert - should not throw exception
+        assertDoesNotThrow(() -> KafkaConsumerInterceptor.onEnter(args));
+
+        String output = outputStreamCaptor.toString();
+        assertTrue(output.contains("hello world - KafkaConsumer intercepted!"),
+                "Should still log hello world message even with null args");
+    }
+
+    @Test
+    public void testOnEnter_WithEmptyArgs() {
+        // Arrange
+        Object[] args = new Object[]{};
+
+        // Act
+        KafkaConsumerInterceptor.onEnter(args);
+
+        // Assert
+        String output = outputStreamCaptor.toString();
+        assertTrue(output.contains("hello world - KafkaConsumer intercepted!"),
+                "Should log hello world message with empty args");
+    }
+
+    @Test
+    public void testIsDisabled_ReturnsCurrentValue() {
+        // The actual value depends on the SUPERSTREAM_DISABLED environment variable
+        // at class loading time, so we can only verify that the call does not throw
+        // and that it reports the same value on every invocation.
+        boolean first = KafkaConsumerInterceptor.isDisabled();
+        boolean second = KafkaConsumerInterceptor.isDisabled();
+        assertEquals(first, second, "isDisabled() should be stable across calls");
+    }
+
+    @Test
+    public void testOnExit_WithValidConsumer() {
+        // Arrange
+        Object mockConsumer = new Object(); // Simple mock consumer
+
+        // Act & Assert - should not throw exception
+        assertDoesNotThrow(() -> KafkaConsumerInterceptor.onExit(mockConsumer));
+    }
+
+    @Test
+    public void testOnExit_WithNullConsumer() {
+        // Arrange
+        Object consumer = null;
+
+        // Act & Assert - should not throw exception
+        assertDoesNotThrow(() -> KafkaConsumerInterceptor.onExit(consumer));
+    }
+
+    @Test
+    public void testMultipleOnEnterCalls() {
+        // Arrange
+        Object[] args1 = new Object[]{new Properties()};
+        Object[] args2 = new Object[]{new Properties()};
+
+        // Act
+        KafkaConsumerInterceptor.onEnter(args1);
+        KafkaConsumerInterceptor.onEnter(args2);
+
+        // Assert
+        String output = outputStreamCaptor.toString();
+        // Should contain the message twice
+        int count = output.split("hello world - KafkaConsumer intercepted!", -1).length - 1;
+        assertEquals(2, count, "Should log hello world message twice");
+    }
+
+    @Test
+    public void testOnEnterAndOnExit_Together() {
+        // Arrange
+        Object[] args = new Object[]{new Properties()};
+        Object mockConsumer = new Object();
+
+        // Act
+        KafkaConsumerInterceptor.onEnter(args);
+        KafkaConsumerInterceptor.onExit(mockConsumer);
+
+        // Assert
+        String output = outputStreamCaptor.toString();
+        assertTrue(output.contains("hello world - KafkaConsumer intercepted!"),
+                "Should log hello world message from onEnter");
+        // onExit doesn't produce
console output, just logs debug info + } + + @Test + public void testLoggingDoesNotInterfereWithExecution() { + // Arrange + Object[] args = new Object[]{new Properties()}; + Object mockConsumer = new Object(); + + // Act & Assert - methods should complete without throwing exceptions + long startTime = System.currentTimeMillis(); + + assertDoesNotThrow(() -> { + KafkaConsumerInterceptor.onEnter(args); + KafkaConsumerInterceptor.onExit(mockConsumer); + }); + + long executionTime = System.currentTimeMillis() - startTime; + assertTrue(executionTime < 1000, "Interceptor methods should execute quickly"); + } +} \ No newline at end of file From 43d3795e812ddf9093432121b9809a37284fce44 Mon Sep 17 00:00:00 2001 From: Dima <4478955@gmail.com> Date: Sun, 15 Jun 2025 15:14:53 +0100 Subject: [PATCH 2/3] agent changes --- .../java/ai/superstream/agent/SuperstreamAgent.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/superstream-clients/src/main/java/ai/superstream/agent/SuperstreamAgent.java b/superstream-clients/src/main/java/ai/superstream/agent/SuperstreamAgent.java index d9a31af..af1d27c 100644 --- a/superstream-clients/src/main/java/ai/superstream/agent/SuperstreamAgent.java +++ b/superstream-clients/src/main/java/ai/superstream/agent/SuperstreamAgent.java @@ -57,6 +57,16 @@ private static void install(Instrumentation instrumentation) { .on(ElementMatchers.isConstructor()))) .installOn(instrumentation); + // Intercept KafkaConsumer constructor for metrics collection + new AgentBuilder.Default() + .disableClassFormatChanges() + .type(ElementMatchers.nameEndsWith("KafkaConsumer")) + .transform((builder, typeDescription, classLoader, module, + protectionDomain) -> builder + .visit(Advice.to(KafkaConsumerInterceptor.class) + .on(ElementMatchers.isConstructor()))) + .installOn(instrumentation); + logger.info("Superstream Agent successfully installed instrumentation"); } } \ No newline at end of file From c64b7b0b2ac89428dadf024e305633b4b286df5d Mon Sep 17 00:00:00 2001 From: Dima <4478955@gmail.com> Date: Mon, 16 Jun 2025 08:26:54 +0100 Subject: [PATCH 3/3] consume interceptor poll --- .../examples/KafkaConsumerExample.java | 198 +++++------ .../KafkaProducerConsumerExample.java | 235 +++++++++++++ .../agent/KafkaConsumerInterceptor.java | 225 ++++++++++++ .../superstream/agent/SuperstreamAgent.java | 6 +- .../agent/KafkaConsumerInterceptorTest.java | 319 ++++++++++++++++++ 5 files changed, 861 insertions(+), 122 deletions(-) create mode 100644 examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerConsumerExample.java create mode 100644 superstream-clients/src/main/java/ai/superstream/agent/KafkaConsumerInterceptor.java diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaConsumerExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaConsumerExample.java index 26f9050..99fdd81 100644 --- a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaConsumerExample.java +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaConsumerExample.java @@ -9,8 +9,7 @@ import org.slf4j.LoggerFactory; import java.time.Duration; -import java.util.Collections; -import java.util.Properties; +import java.util.*; /** * Example application that uses the Kafka Clients API to consume messages. 
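+ * It subscribes to a single topic and polls in a loop, printing each record, so both the constructor and the poll interception are easy to observe.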
@@ -36,140 +35,99 @@ public class KafkaConsumerExample {
 
     // === Configuration Constants ===
     private static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";
-
+    private static final String CONSUMER_GROUP_ID = "superstream-example-consumer-group";
     private static final String CLIENT_ID = "superstream-example-consumer";
-    private static final String GROUP_ID = "superstream-example-consumer-group";
-    private static final String AUTO_OFFSET_RESET = "earliest"; // Start from beginning of topic
-    private static final long POLL_TIMEOUT_MS = 1000; // 1 second poll timeout
-
     private static final String TOPIC_NAME = "example-topic";
 
     public static void main(String[] args) {
-        logger.info("Starting Kafka Consumer Example");
-        logger.info("This example will consume messages from topic: {}", TOPIC_NAME);
-        logger.info("Watch for the 'hello world - KafkaConsumer intercepted!' message from the Superstream agent");
+        System.out.println("=== Superstream KafkaConsumer Example ===");
+        System.out.println("This example demonstrates the Superstream agent intercepting KafkaConsumer operations");
+        System.out.println();
+
+        // Build consumer configuration
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, DEFAULT_BOOTSTRAP_SERVERS);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
+
+        System.out.println("Consumer configuration:");
+        System.out.println("  Bootstrap servers: " + DEFAULT_BOOTSTRAP_SERVERS);
+        System.out.println("  Group ID: " + CONSUMER_GROUP_ID);
+        System.out.println("  Client ID: " + CLIENT_ID);
+        System.out.println("  Topic: " + TOPIC_NAME);
+        System.out.println();
 
-        Properties props = createConsumerProperties();
-
-        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
-            // The Superstream Agent should have intercepted the consumer creation
-            // and logged "hello world - KafkaConsumer intercepted!" to the console
-
-            logger.info("KafkaConsumer created successfully with client.id: {}", CLIENT_ID);
-            logger.info("Consumer group: {}", GROUP_ID);
-
+        // Create KafkaConsumer - this should trigger the interceptor
+        System.out.println("Creating KafkaConsumer (should trigger Superstream interceptor)...");
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+        System.out.println("KafkaConsumer created successfully");
+        System.out.println();
+
+        try {
             // Subscribe to the topic
+            System.out.println("Subscribing to topic: " + TOPIC_NAME);
             consumer.subscribe(Collections.singletonList(TOPIC_NAME));
-            logger.info("Subscribed to topic: {}", TOPIC_NAME);
-
-            int messageCount = 0;
+            System.out.println("Subscribed successfully");
+            System.out.println();
+
+            System.out.println("Starting to poll for messages...");
+            System.out.println("Note: Each poll() call should trigger the message consumed interceptor");
+            System.out.println();
+
             long startTime = System.currentTimeMillis();
-
-            logger.info("Starting message consumption loop...");
-
+            int totalMessagesConsumed = 0;
+            int pollCount = 0;
+
             while (true) {
-                try {
-                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
-
-                    if (records.isEmpty()) {
-                        logger.debug("No messages received in this poll cycle");
-                        continue;
-                    }
-
-                    logger.info("Received {} messages in this batch", records.count());
+                pollCount++;
+                System.out.println("--- Poll #" + pollCount + " ---");
+
+                // Poll for messages - this should trigger the onPollExit interceptor
+                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));
+
+                if (records.isEmpty()) {
+                    System.out.println("No messages received in this poll");
+                } else {
+                    System.out.println("Received " + records.count() + " messages in this poll");
 
                     for (ConsumerRecord<String, String> record : records) {
-                        messageCount++;
-
-                        logger.info("Message {}: Key={}, Value length={} bytes, Partition={}, Offset={}",
-                                messageCount,
-                                record.key(),
-                                record.value() != null ? record.value().length() : 0,
-                                record.partition(),
-                                record.offset());
+                        totalMessagesConsumed++;
+                        System.out.printf("Application processed message %d: topic=%s, partition=%d, offset=%d, key=%s%n",
+                                totalMessagesConsumed, record.topic(), record.partition(), record.offset(), record.key());
 
-                        // Log first few characters of the message for verification
-                        if (record.value() != null && record.value().length() > 0) {
-                            String preview = record.value().length() > 100
-                                    ? record.value().substring(0, 100) + "..."
-                                    : record.value();
-                            logger.debug("Message preview: {}", preview);
-                        }
-
-                        // Simulate message processing time
-                        Thread.sleep(100);
+                        // Log message value only in debug mode to avoid spam
+                        logger.debug("Message value: {}", record.value());
                     }
+                }
 
-                    // Commit offsets after processing the batch
-                    consumer.commitSync();
-                    logger.debug("Committed offsets for {} messages", records.count());
-
-                    // Log progress every 10 messages
-                    if (messageCount % 10 == 0) {
-                        long elapsedTime = System.currentTimeMillis() - startTime;
-                        double messagesPerSecond = (messageCount * 1000.0) / elapsedTime;
-                        // SLF4J placeholders do not support format specifiers, so pre-format the rate
-                        logger.info("Progress: {} messages consumed in {} ms ({} msg/sec)",
-                                messageCount, elapsedTime, String.format("%.2f", messagesPerSecond));
-                    }
-
-                } catch (Exception e) {
-                    logger.error("Error processing messages: {}", e.getMessage(), e);
-                    // Continue consuming despite errors
-                }
+                System.out.println("Total messages consumed so far: " + totalMessagesConsumed);
+
+                // Run for about 2 minutes then exit
+                if (System.currentTimeMillis() - startTime > 120000) {
+                    System.out.println("Demo completed after 2 minutes");
+                    break;
+                }
+
+                System.out.println();
+                Thread.sleep(2000); // Wait 2 seconds between polls
             }
 
         } catch (Exception e) {
-            logger.error("Fatal error in consumer", e);
+            logger.error("Error in consumer", e);
+            System.err.println("Error: " + e.getMessage());
+        } finally {
+            System.out.println("Closing KafkaConsumer...");
+            consumer.close();
+            System.out.println("Consumer closed successfully");
+            System.out.println();
+            System.out.println("=== Demo Complete ===");
         }
     }
-
-    /**
-     * Create consumer properties with optimal settings for the example.
-     */
-    private static Properties createConsumerProperties() {
-        Properties props = new Properties();
-
-        // Basic Kafka configuration
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, DEFAULT_BOOTSTRAP_SERVERS);
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
-        props.put(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
-
-        // Serialization
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
-        // Consumer behavior
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET);
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // Manual commit for better control
-
-        // Performance tuning
-        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); // 1KB minimum fetch
-        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // Wait up to 500ms for minimum bytes
-        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // Process up to 100 records per poll
-
-        // Session and heartbeat configuration
-        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); // 30 seconds
-        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000); // 10 seconds
-
-        logger.info("Consumer configuration:");
-        props.stringPropertyNames().forEach(key -> {
-            if (!isSensitiveProperty(key)) {
-                logger.info("  {} = {}", key, props.getProperty(key));
-            }
-        });
-
-        return props;
-    }
-
-    /**
-     * Check if a property contains sensitive information that shouldn't be logged.
- */ - private static boolean isSensitiveProperty(String propertyName) { - String lowerName = propertyName.toLowerCase(); - return lowerName.contains("password") || - lowerName.contains("secret") || - lowerName.contains("key") || - lowerName.contains("token") || - lowerName.contains("credential"); - } } \ No newline at end of file diff --git a/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerConsumerExample.java b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerConsumerExample.java new file mode 100644 index 0000000..8d9c794 --- /dev/null +++ b/examples/kafka-clients-example/src/main/java/ai/superstream/examples/KafkaProducerConsumerExample.java @@ -0,0 +1,235 @@ +package ai.superstream.examples; + +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.clients.producer.*; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +/** + * Combined example that demonstrates both KafkaProducer and KafkaConsumer + * interceptors working together. + * + * This example will: + * 1. Create a KafkaProducer (triggers producer interceptor) + * 2. Create a KafkaConsumer (triggers consumer interceptor) + * 3. Send messages with the producer + * 4. Consume messages with the consumer + * + * Run with: + * java -javaagent:path/to/superstream-clients-1.0.0.jar + * -Dlogback.configurationFile=logback.xml -jar + * kafka-clients-example-1.0.0-jar-with-dependencies.jar + * + * You should see both interceptor messages: + * - "hello world - KafkaConsumer intercepted!" 
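+ * - "message consumed - N records from poll()" lines from the consumer poll interceptor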
+ * - Producer optimization messages from the producer interceptor + */ +public class KafkaProducerConsumerExample { + private static final Logger logger = LoggerFactory.getLogger(KafkaProducerConsumerExample.class); + + // === Configuration Constants === + private static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092"; + private static final String TOPIC_NAME = "combined-example-topic"; + + // Producer settings + private static final String PRODUCER_CLIENT_ID = "combined-example-producer"; + private static final String COMPRESSION_TYPE = "zstd"; + private static final Integer BATCH_SIZE = 16384; // 16KB batch size + + // Consumer settings + private static final String CONSUMER_CLIENT_ID = "combined-example-consumer"; + private static final String GROUP_ID = "combined-example-consumer-group"; + + private static final int MESSAGE_COUNT = 10; + + public static void main(String[] args) { + logger.info("=== Kafka Producer-Consumer Combined Example ==="); + logger.info("This example demonstrates both producer and consumer interceptors"); + logger.info("Watch for interceptor messages:"); + logger.info(" - 'hello world - KafkaConsumer intercepted!'"); + logger.info(" - Producer optimization messages"); + logger.info(""); + + ExecutorService executor = Executors.newFixedThreadPool(2); + + try { + // Start consumer first to ensure it's ready + logger.info("Starting consumer thread..."); + Future consumerFuture = executor.submit(KafkaProducerConsumerExample::runConsumer); + + // Give consumer time to start up + Thread.sleep(2000); + + // Start producer + logger.info("Starting producer thread..."); + Future producerFuture = executor.submit(KafkaProducerConsumerExample::runProducer); + + // Wait for producer to finish + producerFuture.get(); + logger.info("Producer finished, consumer will continue running..."); + + // Let consumer run for a bit longer to process all messages + Thread.sleep(5000); + + // Cancel consumer and shutdown + consumerFuture.cancel(true); + + } catch (Exception e) { + logger.error("Error in combined example", e); + } finally { + executor.shutdownNow(); + logger.info("Example completed"); + } + } + + /** + * Run the producer in a separate thread. + */ + private static void runProducer() { + logger.info("=== PRODUCER: Creating KafkaProducer ==="); + + Properties producerProps = createProducerProperties(); + + try (Producer producer = new KafkaProducer<>(producerProps)) { + logger.info("PRODUCER: KafkaProducer created successfully"); + + for (int i = 1; i <= MESSAGE_COUNT; i++) { + String key = "key-" + i; + String value = String.format("Message %d: %s - timestamp: %d", + i, generateSampleMessage(), System.currentTimeMillis()); + + ProducerRecord record = new ProducerRecord<>(TOPIC_NAME, key, value); + + producer.send(record, (metadata, exception) -> { + if (exception != null) { + logger.error("PRODUCER: Failed to send message: {}", exception.getMessage()); + } else { + logger.info("PRODUCER: Sent message to partition {} at offset {}", + metadata.partition(), metadata.offset()); + } + }); + + // Small delay between messages + Thread.sleep(500); + } + + producer.flush(); + logger.info("PRODUCER: All {} messages sent successfully", MESSAGE_COUNT); + + } catch (Exception e) { + logger.error("PRODUCER: Error", e); + } + } + + /** + * Run the consumer in a separate thread. 
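+     * Commits offsets after each non-empty poll and, once at least one message has been consumed, stops after 10 seconds without new messages.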
+     */
+    private static void runConsumer() {
+        logger.info("=== CONSUMER: Creating KafkaConsumer ===");
+
+        Properties consumerProps = createConsumerProperties();
+
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
+            logger.info("CONSUMER: KafkaConsumer created successfully");
+
+            consumer.subscribe(Collections.singletonList(TOPIC_NAME));
+            logger.info("CONSUMER: Subscribed to topic: {}", TOPIC_NAME);
+
+            int messagesConsumed = 0;
+            long lastMessageTime = System.currentTimeMillis();
+
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
+
+                    if (!records.isEmpty()) {
+                        lastMessageTime = System.currentTimeMillis();
+
+                        for (ConsumerRecord<String, String> record : records) {
+                            messagesConsumed++;
+                            logger.info("CONSUMER: Message {}: Key={}, Value preview={}",
+                                    messagesConsumed,
+                                    record.key(),
+                                    record.value().length() > 50
+                                            ? record.value().substring(0, 50) + "..."
+                                            : record.value());
+                        }
+
+                        consumer.commitSync();
+                    } else {
+                        // Stop if no messages for 10 seconds and we've consumed some messages
+                        if (messagesConsumed > 0 &&
+                                System.currentTimeMillis() - lastMessageTime > 10000) {
+                            logger.info("CONSUMER: No messages for 10 seconds, stopping...");
+                            break;
+                        }
+                    }
+
+                } catch (Exception e) {
+                    if (Thread.currentThread().isInterrupted()) {
+                        logger.info("CONSUMER: Consumer interrupted, stopping...");
+                        break;
+                    }
+                    logger.error("CONSUMER: Error processing messages", e);
+                }
+            }
+
+            logger.info("CONSUMER: Finished consuming {} messages", messagesConsumed);
+
+        } catch (Exception e) {
+            logger.error("CONSUMER: Error", e);
+        }
+    }
+
+    /**
+     * Create producer properties.
+     */
+    private static Properties createProducerProperties() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, DEFAULT_BOOTSTRAP_SERVERS);
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, PRODUCER_CLIENT_ID);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, COMPRESSION_TYPE);
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE);
+        props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
+
+        logger.info("PRODUCER: Configuration created");
+        return props;
+    }
+
+    /**
+     * Create consumer properties.
+     */
+    private static Properties createConsumerProperties() {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, DEFAULT_BOOTSTRAP_SERVERS);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, CONSUMER_CLIENT_ID);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+        logger.info("CONSUMER: Configuration created");
+        return props;
+    }
+
+    /**
+     * Generate a sample message with some content.
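+     * Each payload embeds the current timestamp and a random value, so every record is distinct.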
+ */ + private static String generateSampleMessage() { + return String.format("Sample data with timestamp %d and random value %d", + System.currentTimeMillis(), + (int)(Math.random() * 1000)); + } +} \ No newline at end of file diff --git a/superstream-clients/src/main/java/ai/superstream/agent/KafkaConsumerInterceptor.java b/superstream-clients/src/main/java/ai/superstream/agent/KafkaConsumerInterceptor.java new file mode 100644 index 0000000..fc62c99 --- /dev/null +++ b/superstream-clients/src/main/java/ai/superstream/agent/KafkaConsumerInterceptor.java @@ -0,0 +1,225 @@ +package ai.superstream.agent; + +import ai.superstream.util.SuperstreamLogger; +import net.bytebuddy.asm.Advice; + +/** + * Intercepts KafkaConsumer constructor calls and poll() method to collect metrics and log consumption. + */ +public class KafkaConsumerInterceptor { + public static final SuperstreamLogger logger = SuperstreamLogger.getLogger(KafkaConsumerInterceptor.class); + + // Environment variable to check if Superstream is disabled + private static final String DISABLED_ENV_VAR = "SUPERSTREAM_DISABLED"; + public static final boolean DISABLED = Boolean.parseBoolean(System.getenv(DISABLED_ENV_VAR)); + + /** + * Check if Superstream is disabled via environment variable. + * + * @return true if Superstream is disabled, false otherwise + */ + public static boolean isDisabled() { + return DISABLED; + } + + /** + * Called before the KafkaConsumer constructor. + * Used to intercept consumer creation. + * + * @param args The consumer arguments + */ + @Advice.OnMethodEnter + public static void onEnter(@Advice.AllArguments Object[] args) { + // Skip if Superstream is disabled via environment variable + if (isDisabled()) { + return; + } + + try { + logger.info("hello world"); + System.out.println("hello world - KafkaConsumer intercepted!"); + } catch (Exception e) { + logger.error("Error in KafkaConsumer interceptor: " + e.getMessage(), e); + } + } + + /** + * Called after the KafkaConsumer constructor. + * Used to register the consumer for future processing. + * + * @param consumer The KafkaConsumer instance that was just created + */ + @Advice.OnMethodExit + public static void onExit(@Advice.This Object consumer) { + // Skip if Superstream is disabled via environment variable + if (isDisabled()) { + return; + } + + try { + if (consumer != null) { + logger.debug("KafkaConsumer constructor completed for: {}", consumer.getClass().getName()); + } else { + logger.debug("KafkaConsumer constructor completed (consumer is null)"); + } + } catch (Exception e) { + logger.error("Error in KafkaConsumer exit interceptor: " + e.getMessage(), e); + } + } + + /** + * Called after the KafkaConsumer poll() method. + * Used to log consumed messages. 
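+     * Record counts and fields are read via reflection, so this class keeps no
+     * compile-time dependency on kafka-clients. The actual advice binding lives
+     * in the nested {@link PollAdvice} class: Byte Buddy accepts at most one
+     * {@code @Advice.OnMethodExit} method per advice class, and {@link #onExit}
+     * already fills that role for the constructor advice.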
+ * + * @param result The ConsumerRecords returned by poll() + * @param consumer The KafkaConsumer instance + */ + @Advice.OnMethodExit + public static void onPollExit(@Advice.Return Object result, @Advice.This Object consumer) { + // Skip if Superstream is disabled via environment variable + if (isDisabled()) { + return; + } + + try { + if (result != null) { + // Get the count of messages using reflection + int messageCount = getConsumerRecordsCount(result); + + if (messageCount > 0) { + logger.info("message consumed"); + System.out.println("message consumed - " + messageCount + " records from poll()"); + + // Log detailed information if debug is enabled + if (logger.isDebugEnabled()) { + logConsumerRecordsDetails(result, consumer); + } + } + } + } catch (Exception e) { + logger.error("Error in KafkaConsumer poll interceptor: " + e.getMessage(), e); + } + } + + /** + * Get the count of records in ConsumerRecords using reflection. + */ + static int getConsumerRecordsCount(Object consumerRecords) { + try { + // Try to call count() method + java.lang.reflect.Method countMethod = consumerRecords.getClass().getMethod("count"); + Object countResult = countMethod.invoke(consumerRecords); + if (countResult instanceof Integer) { + return (Integer) countResult; + } + } catch (Exception e) { + logger.debug("Could not get record count via count() method: {}", e.getMessage()); + } + + try { + // Try to call isEmpty() method and return 0 if empty, 1 if not empty + java.lang.reflect.Method isEmptyMethod = consumerRecords.getClass().getMethod("isEmpty"); + Object isEmptyResult = isEmptyMethod.invoke(consumerRecords); + if (isEmptyResult instanceof Boolean) { + return ((Boolean) isEmptyResult) ? 0 : 1; // If not empty, assume at least 1 record + } + } catch (Exception e) { + logger.debug("Could not check isEmpty(): {}", e.getMessage()); + } + + return 0; + } + + /** + * Log detailed information about consumed records (debug mode only). + */ + private static void logConsumerRecordsDetails(Object consumerRecords, Object consumer) { + try { + // Get consumer client ID for logging context + String clientId = getConsumerClientId(consumer); + + // Try to iterate through records for detailed logging + java.lang.reflect.Method iteratorMethod = consumerRecords.getClass().getMethod("iterator"); + Object iterator = iteratorMethod.invoke(consumerRecords); + + if (iterator instanceof java.util.Iterator) { + @SuppressWarnings("unchecked") + java.util.Iterator recordIterator = (java.util.Iterator) iterator; + + int recordIndex = 0; + while (recordIterator.hasNext() && recordIndex < 5) { // Log max 5 records to avoid spam + Object record = recordIterator.next(); + recordIndex++; + + try { + Object topicObj = getRecordField(record, "topic"); + Object partitionObj = getRecordField(record, "partition"); + Object offsetObj = getRecordField(record, "offset"); + Object key = getRecordField(record, "key"); + + String topic = topicObj != null ? topicObj.toString() : null; + Integer partition = partitionObj instanceof Integer ? (Integer) partitionObj : null; + Long offset = offsetObj instanceof Long ? (Long) offsetObj : null; + + logger.debug("Consumer {} consumed record {}: topic={}, partition={}, offset={}, key={}", + clientId, recordIndex, topic, partition, offset, key); + } catch (Exception e) { + logger.debug("Could not extract record details for record {}: {}", recordIndex, e.getMessage()); + } + } + + if (recordIterator.hasNext()) { + logger.debug("... 
and more records (showing only first 5)"); + } + } + } catch (Exception e) { + logger.debug("Could not log consumer records details: {}", e.getMessage()); + } + } + + /** + * Get consumer client ID for logging context. + */ + private static String getConsumerClientId(Object consumer) { + try { + // Try to get clientId from the consumer + java.lang.reflect.Field clientIdField = findField(consumer.getClass(), "clientId"); + if (clientIdField != null) { + clientIdField.setAccessible(true); + Object clientId = clientIdField.get(consumer); + return clientId != null ? clientId.toString() : "unknown"; + } + } catch (Exception e) { + logger.debug("Could not get client ID: {}", e.getMessage()); + } + return "unknown"; + } + + /** + * Get a field value from a ConsumerRecord using reflection. + */ + static Object getRecordField(Object record, String fieldName) { + try { + java.lang.reflect.Method method = record.getClass().getMethod(fieldName); + return method.invoke(record); + } catch (Exception e) { + logger.debug("Could not get field {} from record: {}", fieldName, e.getMessage()); + return null; + } + } + + /** + * Find a field in a class or its superclasses. + */ + private static java.lang.reflect.Field findField(Class clazz, String fieldName) { + if (clazz == null || clazz == Object.class) { + return null; + } + + try { + return clazz.getDeclaredField(fieldName); + } catch (NoSuchFieldException e) { + return findField(clazz.getSuperclass(), fieldName); + } + } +} \ No newline at end of file diff --git a/superstream-clients/src/main/java/ai/superstream/agent/SuperstreamAgent.java b/superstream-clients/src/main/java/ai/superstream/agent/SuperstreamAgent.java index 8842dd0..903e73c 100644 --- a/superstream-clients/src/main/java/ai/superstream/agent/SuperstreamAgent.java +++ b/superstream-clients/src/main/java/ai/superstream/agent/SuperstreamAgent.java @@ -78,14 +78,16 @@ private static void install(Instrumentation instrumentation) { .on(ElementMatchers.named("close")))) .installOn(instrumentation); - // Intercept KafkaConsumer constructor for metrics collection + // Intercept KafkaConsumer constructor and poll() method for metrics collection new AgentBuilder.Default() .disableClassFormatChanges() .type(ElementMatchers.nameEndsWith("KafkaConsumer")) .transform((builder, typeDescription, classLoader, module, protectionDomain) -> builder .visit(Advice.to(KafkaConsumerInterceptor.class) - .on(ElementMatchers.isConstructor()))) + .on(ElementMatchers.isConstructor())) + .visit(Advice.to(KafkaConsumerInterceptor.class) + .on(ElementMatchers.named("poll")))) .installOn(instrumentation); logger.info("Superstream Agent successfully installed instrumentation"); diff --git a/superstream-clients/src/test/java/ai/superstream/agent/KafkaConsumerInterceptorTest.java b/superstream-clients/src/test/java/ai/superstream/agent/KafkaConsumerInterceptorTest.java index 623c6f8..32feda2 100644 --- a/superstream-clients/src/test/java/ai/superstream/agent/KafkaConsumerInterceptorTest.java +++ b/superstream-clients/src/test/java/ai/superstream/agent/KafkaConsumerInterceptorTest.java @@ -3,12 +3,20 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.AfterEach; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import java.io.ByteArrayOutputStream; import java.io.PrintStream; +import java.lang.reflect.Method; import java.util.Properties; +import java.util.Iterator; +import java.util.Collections; +import java.util.ArrayList; +import java.util.List; import 
static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; /** * Unit tests for KafkaConsumerInterceptor. @@ -152,4 +160,315 @@ public void testLoggingDoesNotInterfereWithExecution() { long executionTime = System.currentTimeMillis() - startTime; assertTrue(executionTime < 1000, "Interceptor methods should execute quickly"); } + + // ===== POLL METHOD INTERCEPTION TESTS ===== + + @Test + public void testOnPollExit_WithNullResult() { + // Arrange + Object result = null; + Object mockConsumer = new Object(); + + // Act & Assert - should not throw exception + assertDoesNotThrow(() -> KafkaConsumerInterceptor.onPollExit(result, mockConsumer)); + + // Assert no message consumed log for null result + String output = outputStreamCaptor.toString(); + assertFalse(output.contains("message consumed"), + "Should not log message consumed for null result"); + } + + @Test + public void testOnPollExit_WithEmptyConsumerRecords() { + // Arrange + Object mockConsumerRecords = createMockConsumerRecords(0); + Object mockConsumer = new Object(); + + // Act + KafkaConsumerInterceptor.onPollExit(mockConsumerRecords, mockConsumer); + + // Assert + String output = outputStreamCaptor.toString(); + assertFalse(output.contains("message consumed"), + "Should not log message consumed for empty records"); + } + + @Test + public void testOnPollExit_WithNonEmptyConsumerRecords() { + // Arrange + Object mockConsumerRecords = createMockConsumerRecords(3); + Object mockConsumer = new Object(); + + // Act + KafkaConsumerInterceptor.onPollExit(mockConsumerRecords, mockConsumer); + + // Assert + String output = outputStreamCaptor.toString(); + assertTrue(output.contains("message consumed - 3 records from poll()"), + "Should log message consumed with record count"); + } + + @Test + public void testOnPollExit_WithLargeNumberOfRecords() { + // Arrange + Object mockConsumerRecords = createMockConsumerRecords(100); + Object mockConsumer = new Object(); + + // Act + KafkaConsumerInterceptor.onPollExit(mockConsumerRecords, mockConsumer); + + // Assert + String output = outputStreamCaptor.toString(); + assertTrue(output.contains("message consumed - 100 records from poll()"), + "Should log message consumed with large record count"); + } + + @Test + public void testOnPollExit_HandlesReflectionFailure() { + // Arrange - create an object that doesn't have count() or isEmpty() methods + Object invalidRecords = new Object(); + Object mockConsumer = new Object(); + + // Act & Assert - should not throw exception even when reflection fails + assertDoesNotThrow(() -> KafkaConsumerInterceptor.onPollExit(invalidRecords, mockConsumer)); + + // Should not log message consumed when count cannot be determined + String output = outputStreamCaptor.toString(); + assertFalse(output.contains("message consumed"), + "Should not log message consumed when record count cannot be determined"); + } + + @Test + public void testGetConsumerRecordsCount_WithValidCountMethod() { + // Arrange + Object mockRecords = createMockConsumerRecords(5); + + // Act + int count = KafkaConsumerInterceptor.getConsumerRecordsCount(mockRecords); + + // Assert + assertEquals(5, count, "Should return correct count from count() method"); + } + + @Test + public void testGetConsumerRecordsCount_WithIsEmptyMethod() { + // Arrange + MockConsumerRecordsWithIsEmpty mockRecords = new MockConsumerRecordsWithIsEmpty(false); + + // Act + int count = KafkaConsumerInterceptor.getConsumerRecordsCount(mockRecords); + + // Assert + assertEquals(1, count, "Should return 1 when 
isEmpty() returns false"); + } + + @Test + public void testGetConsumerRecordsCount_WithEmptyRecords() { + // Arrange + MockConsumerRecordsWithIsEmpty mockRecords = new MockConsumerRecordsWithIsEmpty(true); + + // Act + int count = KafkaConsumerInterceptor.getConsumerRecordsCount(mockRecords); + + // Assert + assertEquals(0, count, "Should return 0 when isEmpty() returns true"); + } + + @Test + public void testGetConsumerRecordsCount_WithInvalidObject() { + // Arrange + Object invalidObject = new Object(); + + // Act + int count = KafkaConsumerInterceptor.getConsumerRecordsCount(invalidObject); + + // Assert + assertEquals(0, count, "Should return 0 for object without count() or isEmpty() methods"); + } + + @Test + public void testMultiplePollCalls() { + // Arrange + Object records1 = createMockConsumerRecords(2); + Object records2 = createMockConsumerRecords(3); + Object records3 = createMockConsumerRecords(0); // empty + Object mockConsumer = new Object(); + + // Act + KafkaConsumerInterceptor.onPollExit(records1, mockConsumer); + KafkaConsumerInterceptor.onPollExit(records2, mockConsumer); + KafkaConsumerInterceptor.onPollExit(records3, mockConsumer); + + // Assert + String output = outputStreamCaptor.toString(); + assertTrue(output.contains("message consumed - 2 records from poll()"), + "Should log first poll with 2 records"); + assertTrue(output.contains("message consumed - 3 records from poll()"), + "Should log second poll with 3 records"); + + // Count occurrences of specific console messages + int count2Records = output.split("message consumed - 2 records from poll\\(\\)", -1).length - 1; + int count3Records = output.split("message consumed - 3 records from poll\\(\\)", -1).length - 1; + assertEquals(1, count2Records, "Should log message consumed for 2 records once"); + assertEquals(1, count3Records, "Should log message consumed for 3 records once"); + + // Ensure empty poll doesn't generate console message + assertFalse(output.contains("message consumed - 0 records from poll()"), + "Should not log message consumed for empty poll"); + } + + @Test + public void testOnPollExit_WithNullConsumer() { + // Arrange + Object mockConsumerRecords = createMockConsumerRecords(1); + Object consumer = null; + + // Act & Assert - should not throw exception + assertDoesNotThrow(() -> KafkaConsumerInterceptor.onPollExit(mockConsumerRecords, consumer)); + + String output = outputStreamCaptor.toString(); + assertTrue(output.contains("message consumed - 1 records from poll()"), + "Should log message consumed even with null consumer"); + } + + @Test + public void testGetRecordField_WithValidRecord() { + // Arrange + MockConsumerRecord mockRecord = new MockConsumerRecord("test-topic", 0, 123L, "test-key"); + + // Act + Object topic = KafkaConsumerInterceptor.getRecordField(mockRecord, "topic"); + Object partition = KafkaConsumerInterceptor.getRecordField(mockRecord, "partition"); + Object offset = KafkaConsumerInterceptor.getRecordField(mockRecord, "offset"); + Object key = KafkaConsumerInterceptor.getRecordField(mockRecord, "key"); + + // Assert + assertEquals("test-topic", topic); + assertEquals(0, partition); + assertEquals(123L, offset); + assertEquals("test-key", key); + } + + @Test + public void testGetRecordField_WithInvalidField() { + // Arrange + MockConsumerRecord mockRecord = new MockConsumerRecord("test-topic", 0, 123L, "test-key"); + + // Act + Object result = KafkaConsumerInterceptor.getRecordField(mockRecord, "nonexistentField"); + + // Assert + assertNull(result, "Should return null for 
non-existent field"); + } + + @Test + public void testGetRecordField_WithNullRecord() { + // Arrange + Object record = null; + + // Act + Object result = KafkaConsumerInterceptor.getRecordField(record, "topic"); + + // Assert + assertNull(result, "Should return null for null record"); + } + + // ===== HELPER METHODS AND MOCK CLASSES ===== + + /** + * Creates a mock ConsumerRecords object with the specified count. + */ + private Object createMockConsumerRecords(int count) { + return new MockConsumerRecords(count); + } + + /** + * Mock ConsumerRecords class that implements count() method. + */ + public static class MockConsumerRecords { + private final int count; + + public MockConsumerRecords(int count) { + this.count = count; + } + + public int count() { + return count; + } + + public boolean isEmpty() { + return count == 0; + } + + public Iterator iterator() { + List records = new ArrayList<>(); + for (int i = 0; i < count; i++) { + records.add(new MockConsumerRecord("topic" + i, i % 3, (long) i, "key" + i)); + } + return records.iterator(); + } + } + + /** + * Mock ConsumerRecords class that only implements isEmpty() method. + */ + public static class MockConsumerRecordsWithIsEmpty { + private final boolean empty; + + public MockConsumerRecordsWithIsEmpty(boolean empty) { + this.empty = empty; + } + + public boolean isEmpty() { + return empty; + } + } + + /** + * Mock ConsumerRecord class with basic fields. + */ + public static class MockConsumerRecord { + private final String topic; + private final int partition; + private final long offset; + private final String key; + + public MockConsumerRecord(String topic, int partition, long offset, String key) { + this.topic = topic; + this.partition = partition; + this.offset = offset; + this.key = key; + } + + public String topic() { + return topic; + } + + public int partition() { + return partition; + } + + public long offset() { + return offset; + } + + public String key() { + return key; + } + } + + /** + * Mock Consumer class for testing. + */ + public static class MockConsumer { + private final String clientId; + + public MockConsumer(String clientId) { + this.clientId = clientId; + } + + public String getClientId() { + return clientId; + } + } } \ No newline at end of file