From 4e2e35b0b2797f6c6d1c8c3b9f19fa5a25acc9a2 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Thu, 14 Aug 2025 21:36:27 +0530 Subject: [PATCH 1/4] Initialize ducktape setup --- tests/ducktape/README.md | 24 ++++ tests/ducktape/__init__.py | 1 + tests/ducktape/run_ducktape_test.py | 107 ++++++++++++++ tests/ducktape/services/__init__.py | 1 + tests/ducktape/services/kafka.py | 100 +++++++++++++ tests/ducktape/test_producer.py | 216 ++++++++++++++++++++++++++++ 6 files changed, 449 insertions(+) create mode 100644 tests/ducktape/README.md create mode 100644 tests/ducktape/__init__.py create mode 100755 tests/ducktape/run_ducktape_test.py create mode 100644 tests/ducktape/services/__init__.py create mode 100644 tests/ducktape/services/kafka.py create mode 100644 tests/ducktape/test_producer.py diff --git a/tests/ducktape/README.md b/tests/ducktape/README.md new file mode 100644 index 000000000..0925aec2a --- /dev/null +++ b/tests/ducktape/README.md @@ -0,0 +1,24 @@ +# Ducktape Producer Tests + +Ducktape-based producer tests for the Confluent Kafka Python client. + +## Prerequisites + +- `pip install ducktape confluent-kafka` +- Kafka running on `localhost:9092` + +## Running Tests + +```bash +# Run all tests +./tests/ducktape/run_ducktape_test.py + +# Run specific test +./tests/ducktape/run_ducktape_test.py SimpleProducerTest.test_basic_produce +``` + +## Test Cases + +- **test_basic_produce**: Basic message production with callbacks +- **test_produce_multiple_batches**: Parameterized tests (5, 10, 50 messages) +- **test_produce_with_compression**: Matrix tests (none, gzip, snappy) \ No newline at end of file diff --git a/tests/ducktape/__init__.py b/tests/ducktape/__init__.py new file mode 100644 index 000000000..e4e265020 --- /dev/null +++ b/tests/ducktape/__init__.py @@ -0,0 +1 @@ +# Ducktape tests for Confluent Kafka Python diff --git a/tests/ducktape/run_ducktape_test.py b/tests/ducktape/run_ducktape_test.py new file mode 100755 index 000000000..217259266 --- /dev/null +++ b/tests/ducktape/run_ducktape_test.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python3 +""" +Simple test runner for Ducktape Producer tests +""" +import sys +import os +import subprocess +import tempfile +import json + +# Add the project root to Python path +project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +sys.path.insert(0, project_root) + +def create_cluster_config(): + """Create a simple cluster configuration for local testing""" + cluster_config = { + "cluster_type": "ducktape.cluster.localhost.LocalhostCluster", + "num_nodes": 3 + } + return cluster_config + +def create_test_config(): + """Create test configuration file""" + config = { + "ducktape_dir": os.path.dirname(os.path.abspath(__file__)), + "results_dir": os.path.join(tempfile.gettempdir(), "ducktape_results") + } + return config + +def main(): + """Run the ducktape producer test""" + print("Confluent Kafka Python - Ducktape Producer Test Runner") + print("=" * 60) + + # Check if ducktape is installed + try: + import ducktape + print(f"Using ducktape version: {ducktape.__version__}") + except ImportError: + print("ERROR: ducktape is not installed.") + print("Install it with: pip install ducktape") + return 1 + + # Check if confluent_kafka is available + try: + import confluent_kafka + print(f"Using confluent-kafka version: {confluent_kafka.version()}") + except ImportError: + print("ERROR: confluent_kafka is not installed.") + print("Install it with: pip install confluent-kafka") + return 1 + + # Get test directory 
and file + test_dir = os.path.dirname(os.path.abspath(__file__)) + test_file = os.path.join(test_dir, "test_producer.py") + + if not os.path.exists(test_file): + print(f"ERROR: Test file not found: {test_file}") + return 1 + + # Create results directory + results_dir = os.path.join(tempfile.gettempdir(), "ducktape_results") + os.makedirs(results_dir, exist_ok=True) + + print(f"Test file: {test_file}") + print(f"Results directory: {results_dir}") + print() + + # Build ducktape command + cmd = [ + "ducktape", + "--debug", # Enable debug output + "--results-root", results_dir, + "--cluster", "ducktape.cluster.localhost.LocalhostCluster", + "--default-num-nodes", "1", # Reduced since we don't need nodes for services + test_file + ] + + # Add specific test if provided as argument + if len(sys.argv) > 1: + test_method = sys.argv[1] + cmd[-1] = f"{test_file}::{test_method}" + print(f"Running specific test: {test_method}") + else: + print("Running all producer tests") + + print("Command:", " ".join(cmd)) + print() + + # Set up environment with proper Python path + env = os.environ.copy() + env['PYTHONPATH'] = project_root + + # Run the test + try: + result = subprocess.run(cmd, cwd=project_root, env=env) + return result.returncode + except KeyboardInterrupt: + print("\nTest interrupted by user") + return 1 + except Exception as e: + print(f"Error running test: {e}") + return 1 + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/ducktape/services/__init__.py b/tests/ducktape/services/__init__.py new file mode 100644 index 000000000..63f18ed42 --- /dev/null +++ b/tests/ducktape/services/__init__.py @@ -0,0 +1 @@ +# Ducktape services for Confluent Kafka Python testing diff --git a/tests/ducktape/services/kafka.py b/tests/ducktape/services/kafka.py new file mode 100644 index 000000000..733fe54f9 --- /dev/null +++ b/tests/ducktape/services/kafka.py @@ -0,0 +1,100 @@ +""" +Kafka client wrapper for Ducktape tests +Assumes Kafka is already running on localhost:9092 +""" +import time +from ducktape.services.service import Service + + +class KafkaClient(Service): + """Kafka client wrapper - assumes external Kafka is running""" + + def __init__(self, context, bootstrap_servers="localhost:9092"): + # Use num_nodes=0 since we're not managing any nodes + super(KafkaClient, self).__init__(context, num_nodes=0) + self.bootstrap_servers_str = bootstrap_servers + + def start_node(self, node): + """No-op since we assume Kafka is already running""" + self.logger.info("Assuming Kafka is already running on %s", self.bootstrap_servers_str) + + def stop_node(self, node): + """No-op since we don't manage the Kafka service""" + self.logger.info("Not stopping external Kafka service") + + def clean_node(self, node): + """No-op since we don't manage any files""" + self.logger.info("No cleanup needed for external Kafka service") + + def bootstrap_servers(self): + """Get bootstrap servers string for clients""" + return self.bootstrap_servers_str + + def verify_connection(self): + """Verify that Kafka is accessible""" + try: + from confluent_kafka.admin import AdminClient + admin_client = AdminClient({'bootstrap.servers': self.bootstrap_servers_str}) + + # Try to get cluster metadata to verify connection + metadata = admin_client.list_topics(timeout=10) + self.logger.info("Successfully connected to Kafka. 
Available topics: %s", + list(metadata.topics.keys())) + return True + except Exception as e: + self.logger.error("Failed to connect to Kafka at %s: %s", self.bootstrap_servers_str, e) + return False + + def create_topic(self, topic, partitions=1, replication_factor=1): + """Create a topic using Kafka admin client""" + try: + from confluent_kafka.admin import AdminClient, NewTopic + + admin_client = AdminClient({'bootstrap.servers': self.bootstrap_servers_str}) + + topic_config = NewTopic( + topic=topic, + num_partitions=partitions, + replication_factor=replication_factor + ) + + self.logger.info("Creating topic: %s (partitions=%d, replication=%d)", + topic, partitions, replication_factor) + + # Create topic + fs = admin_client.create_topics([topic_config]) + + # Wait for topic creation to complete + for topic_name, f in fs.items(): + try: + f.result(timeout=30) # Wait up to 30 seconds + self.logger.info("Topic %s created successfully", topic_name) + except Exception as e: + if "already exists" in str(e).lower(): + self.logger.info("Topic %s already exists", topic_name) + else: + self.logger.warning("Failed to create topic %s: %s", topic_name, e) + + except ImportError: + self.logger.error("confluent_kafka not available for topic creation") + except Exception as e: + self.logger.error("Failed to create topic %s: %s", topic, e) + + def list_topics(self): + """List all topics using admin client""" + try: + from confluent_kafka.admin import AdminClient + + admin_client = AdminClient({'bootstrap.servers': self.bootstrap_servers_str}) + metadata = admin_client.list_topics(timeout=10) + + topics = list(metadata.topics.keys()) + self.logger.debug("Available topics: %s", topics) + return topics + + except ImportError: + self.logger.error("confluent_kafka not available for listing topics") + return [] + except Exception as e: + self.logger.error("Failed to list topics: %s", e) + return [] diff --git a/tests/ducktape/test_producer.py b/tests/ducktape/test_producer.py new file mode 100644 index 000000000..15adb1bf5 --- /dev/null +++ b/tests/ducktape/test_producer.py @@ -0,0 +1,216 @@ +""" +Ducktape test for Confluent Kafka Python Producer +Assumes Kafka is already running on localhost:9092 +""" +import time +from ducktape.tests.test import Test +from ducktape.mark import matrix, parametrize + +from tests.ducktape.services.kafka import KafkaClient + +try: + from confluent_kafka import Producer, KafkaError +except ImportError: + # Handle case where confluent_kafka is not installed + Producer = None + KafkaError = None + + +class SimpleProducerTest(Test): + """Test basic producer functionality with external Kafka""" + + def __init__(self, test_context): + super(SimpleProducerTest, self).__init__(test_context=test_context) + + # Set up Kafka client (assumes external Kafka running) + self.kafka = KafkaClient(test_context, bootstrap_servers="localhost:9092") + + def setUp(self): + """Set up test environment""" + self.logger.info("Verifying connection to external Kafka at localhost:9092") + + if not self.kafka.verify_connection(): + raise Exception("Cannot connect to Kafka at localhost:9092. 
" + "Please ensure Kafka is running.") + + self.logger.info("Successfully connected to Kafka") + + def test_basic_produce(self): + """Test basic message production to a topic""" + if Producer is None: + self.logger.error("confluent_kafka not available, skipping test") + return + + topic_name = "test-topic" + num_messages = 10 + + # Create topic + self.kafka.create_topic(topic_name, partitions=1, replication_factor=1) + + # Wait for topic to be created + time.sleep(2) + + # Verify topic exists + topics = self.kafka.list_topics() + assert topic_name in topics, f"Topic {topic_name} was not created. Available topics: {topics}" + + # Configure producer + producer_config = { + 'bootstrap.servers': self.kafka.bootstrap_servers(), + 'client.id': 'ducktape-test-producer' + } + + self.logger.info("Creating producer with config: %s", producer_config) + producer = Producer(producer_config) + + # Track delivery results + delivered_messages = [] + failed_messages = [] + + def delivery_callback(err, msg): + """Delivery report callback""" + if err is not None: + self.logger.error("Message delivery failed: %s", err) + failed_messages.append(err) + else: + self.logger.debug("Message delivered to %s [%d] at offset %d", + msg.topic(), msg.partition(), msg.offset()) + delivered_messages.append(msg) + + # Produce messages + self.logger.info("Producing %d messages to topic %s", num_messages, topic_name) + for i in range(num_messages): + message_value = f"Test message {i}" + message_key = f"key-{i}" + + producer.produce( + topic=topic_name, + value=message_value, + key=message_key, + callback=delivery_callback + ) + + # Poll to trigger delivery callbacks + producer.poll(0) + + # Flush to ensure all messages are sent + self.logger.info("Flushing producer...") + producer.flush(timeout=30) + + # Verify all messages were delivered + self.logger.info("Delivered: %d, Failed: %d", len(delivered_messages), len(failed_messages)) + + assert len(failed_messages) == 0, f"Some messages failed to deliver: {failed_messages}" + assert len(delivered_messages) == num_messages, \ + f"Expected {num_messages} delivered messages, got {len(delivered_messages)}" + + self.logger.info("Successfully produced %d messages to topic %s", + len(delivered_messages), topic_name) + + @parametrize(num_messages=5) + @parametrize(num_messages=10) + @parametrize(num_messages=50) + def test_produce_multiple_batches(self, num_messages): + """Test producing different numbers of messages""" + if Producer is None: + self.logger.error("confluent_kafka not available, skipping test") + return + + topic_name = f"batch-test-topic-{num_messages}" + + # Create topic + self.kafka.create_topic(topic_name, partitions=2, replication_factor=1) + time.sleep(2) + + # Configure producer + producer_config = { + 'bootstrap.servers': self.kafka.bootstrap_servers(), + 'client.id': f'batch-test-producer-{num_messages}', + 'batch.size': 1000, # Small batch size for testing + 'linger.ms': 100 # Small linger time for testing + } + + producer = Producer(producer_config) + + delivered_count = [0] # Use list to modify from callback + + def delivery_callback(err, msg): + if err is None: + delivered_count[0] += 1 + else: + self.logger.error("Delivery failed: %s", err) + + # Produce messages + self.logger.info("Producing %d messages in batches", num_messages) + for i in range(num_messages): + producer.produce( + topic=topic_name, + value=f"Batch message {i}", + key=f"batch-key-{i % 10}", # Use modulo for key distribution + callback=delivery_callback + ) + + # Poll occasionally + if i 
% 10 == 0: + producer.poll(0) + + # Final flush + producer.flush(timeout=30) + + # Verify results + assert delivered_count[0] == num_messages, \ + f"Expected {num_messages} delivered, got {delivered_count[0]}" + + self.logger.info("Successfully produced %d messages in batches", delivered_count[0]) + + @matrix(compression_type=['none', 'gzip', 'snappy']) + def test_produce_with_compression(self, compression_type): + """Test producing messages with different compression types""" + if Producer is None: + self.logger.error("confluent_kafka not available, skipping test") + return + + topic_name = f"compression-test-{compression_type}" + + # Create topic + self.kafka.create_topic(topic_name, partitions=1, replication_factor=1) + time.sleep(2) + + # Configure producer with compression + producer_config = { + 'bootstrap.servers': self.kafka.bootstrap_servers(), + 'client.id': f'compression-test-{compression_type}', + 'compression.type': compression_type + } + + producer = Producer(producer_config) + + # Create larger messages to test compression + large_message = "x" * 1000 # 1KB message + delivered_count = [0] + + def delivery_callback(err, msg): + if err is None: + delivered_count[0] += 1 + + # Produce messages + self.logger.info("Producing messages with %s compression", compression_type) + for i in range(20): + producer.produce( + topic=topic_name, + value=f"{large_message}-{i}", + key=f"comp-key-{i}", + callback=delivery_callback + ) + producer.poll(0) + + producer.flush(timeout=30) + + assert delivered_count[0] == 20, \ + f"Expected 20 delivered messages, got {delivered_count[0]}" + + self.logger.info("Successfully produced 20 messages with %s compression", compression_type) + + def tearDown(self): + """Clean up test environment""" + self.logger.info("Test completed - external Kafka service remains running") From f910e5294499369ef7f7813c67f5f4ef288530ab Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Sat, 16 Aug 2025 14:27:32 +0530 Subject: [PATCH 2/4] Fix ducktape module resolution --- tests/ducktape/run_ducktape_test.py | 26 ++++++-------------------- 1 file changed, 6 insertions(+), 20 deletions(-) diff --git a/tests/ducktape/run_ducktape_test.py b/tests/ducktape/run_ducktape_test.py index 217259266..824b11701 100755 --- a/tests/ducktape/run_ducktape_test.py +++ b/tests/ducktape/run_ducktape_test.py @@ -7,10 +7,7 @@ import subprocess import tempfile import json - -# Add the project root to Python path -project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -sys.path.insert(0, project_root) +import ducktape def create_cluster_config(): """Create a simple cluster configuration for local testing""" @@ -33,14 +30,7 @@ def main(): print("Confluent Kafka Python - Ducktape Producer Test Runner") print("=" * 60) - # Check if ducktape is installed - try: - import ducktape - print(f"Using ducktape version: {ducktape.__version__}") - except ImportError: - print("ERROR: ducktape is not installed.") - print("Install it with: pip install ducktape") - return 1 + print(f"Using ducktape version: {ducktape.__version__}") # Check if confluent_kafka is available try: @@ -51,7 +41,7 @@ def main(): print("Install it with: pip install confluent-kafka") return 1 - # Get test directory and file + # Get test file path (ducktape expects file paths, not module paths) test_dir = os.path.dirname(os.path.abspath(__file__)) test_file = os.path.join(test_dir, "test_producer.py") @@ -67,9 +57,9 @@ def main(): print(f"Results directory: {results_dir}") print() - # Build ducktape 
command
+    # Build ducktape command; run the module via the current interpreter
     cmd = [
-        "ducktape",
+        sys.executable, "-m", "ducktape",
         "--debug",  # Enable debug output
         "--results-root", results_dir,
         "--cluster", "ducktape.cluster.localhost.LocalhostCluster",
@@ -88,13 +78,9 @@ def main():
     print("Command:", " ".join(cmd))
     print()
 
-    # Set up environment with proper Python path
-    env = os.environ.copy()
-    env['PYTHONPATH'] = project_root
-
     # Run the test
     try:
-        result = subprocess.run(cmd, cwd=project_root, env=env)
+        result = subprocess.run(cmd)
         return result.returncode
     except KeyboardInterrupt:
         print("\nTest interrupted by user")

From a86559a500f74db5e6a4e7af6df37b5c6d682aa5 Mon Sep 17 00:00:00 2001
From: Kaushik Raina
Date: Sat, 16 Aug 2025 14:30:45 +0530
Subject: [PATCH 3/4] Add wait_for_topic helper with timeout and retry logic

---
 tests/ducktape/services/kafka.py | 26 ++++++++++++++++++++++++++
 tests/ducktape/test_producer.py | 19 +++++++++++--------
 2 files changed, 37 insertions(+), 8 deletions(-)

diff --git a/tests/ducktape/services/kafka.py b/tests/ducktape/services/kafka.py
index 733fe54f9..55433e98b 100644
--- a/tests/ducktape/services/kafka.py
+++ b/tests/ducktape/services/kafka.py
@@ -98,3 +98,29 @@ def list_topics(self):
         except Exception as e:
             self.logger.error("Failed to list topics: %s", e)
             return []
+
+    def wait_for_topic(self, topic_name, max_wait_time=30, initial_wait=0.1):
+        """
+        Wait for topic to be created with exponential backoff retry logic.
+        """
+        wait_time = initial_wait
+        total_wait = 0
+
+        self.logger.info("Waiting for topic '%s' to be available...", topic_name)
+
+        while total_wait < max_wait_time:
+            topics = self.list_topics()
+            if topic_name in topics:
+                self.logger.info("Topic '%s' is ready after %.2fs", topic_name, total_wait)
+                return True
+
+            self.logger.debug("Topic '%s' not found, waiting %.2fs (total: %.2fs)",
+                              topic_name, wait_time, total_wait)
+            time.sleep(wait_time)
+            total_wait += wait_time
+
+            # Exponential backoff with max cap of 2 seconds
+            wait_time = min(wait_time * 2, 2.0)
+
+        self.logger.error("Timeout waiting for topic '%s' after %ds", topic_name, max_wait_time)
+        return False
\ No newline at end of file
diff --git a/tests/ducktape/test_producer.py b/tests/ducktape/test_producer.py
index 15adb1bf5..e3ab8a03c 100644
--- a/tests/ducktape/test_producer.py
+++ b/tests/ducktape/test_producer.py
@@ -47,12 +47,9 @@ def test_basic_produce(self):
         # Create topic
         self.kafka.create_topic(topic_name, partitions=1, replication_factor=1)
 
-        # Wait for topic to be created
-        time.sleep(2)
-
-        # Verify topic exists
-        topics = self.kafka.list_topics()
-        assert topic_name in topics, f"Topic {topic_name} was not created. Available topics: {topics}"
+        # Wait for topic to be available with retry logic
+        topic_ready = self.kafka.wait_for_topic(topic_name, max_wait_time=30)
+        assert topic_ready, f"Topic {topic_name} was not created within timeout. 
Available topics: {self.kafka.list_topics()}" # Configure producer producer_config = { @@ -120,7 +117,10 @@ def test_produce_multiple_batches(self, num_messages): # Create topic self.kafka.create_topic(topic_name, partitions=2, replication_factor=1) - time.sleep(2) + + # Wait for topic to be available with retry logic + topic_ready = self.kafka.wait_for_topic(topic_name, max_wait_time=30) + assert topic_ready, f"Topic {topic_name} was not created within timeout" # Configure producer producer_config = { @@ -174,7 +174,10 @@ def test_produce_with_compression(self, compression_type): # Create topic self.kafka.create_topic(topic_name, partitions=1, replication_factor=1) - time.sleep(2) + + # Wait for topic to be available with retry logic + topic_ready = self.kafka.wait_for_topic(topic_name, max_wait_time=30) + assert topic_ready, f"Topic {topic_name} was not created within timeout" # Configure producer with compression producer_config = { From adf9490c732659046f4147eb625721f52d9da6df Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Sat, 16 Aug 2025 19:34:12 +0530 Subject: [PATCH 4/4] Convert test to throughput based approach --- tests/ducktape/test_producer.py | 212 ++++++++++++++++++++++---------- 1 file changed, 145 insertions(+), 67 deletions(-) diff --git a/tests/ducktape/test_producer.py b/tests/ducktape/test_producer.py index e3ab8a03c..d7b86b74b 100644 --- a/tests/ducktape/test_producer.py +++ b/tests/ducktape/test_producer.py @@ -36,13 +36,13 @@ def setUp(self): self.logger.info("Successfully connected to Kafka") def test_basic_produce(self): - """Test basic message production to a topic""" + """Test basic message production throughput over 5 seconds (time-based)""" if Producer is None: self.logger.error("confluent_kafka not available, skipping test") return topic_name = "test-topic" - num_messages = 10 + test_duration = 5.0 # 5 seconds # Create topic self.kafka.create_topic(topic_name, partitions=1, replication_factor=1) @@ -70,50 +70,70 @@ def delivery_callback(err, msg): self.logger.error("Message delivery failed: %s", err) failed_messages.append(err) else: - self.logger.debug("Message delivered to %s [%d] at offset %d", - msg.topic(), msg.partition(), msg.offset()) delivered_messages.append(msg) - # Produce messages - self.logger.info("Producing %d messages to topic %s", num_messages, topic_name) - for i in range(num_messages): - message_value = f"Test message {i}" - message_key = f"key-{i}" - - producer.produce( - topic=topic_name, - value=message_value, - key=message_key, - callback=delivery_callback - ) - - # Poll to trigger delivery callbacks - producer.poll(0) + # Time-based message production + self.logger.info("Producing messages for %.1f seconds to topic %s", test_duration, topic_name) + start_time = time.time() + messages_sent = 0 + + while time.time() - start_time < test_duration: + message_value = f"Test message {messages_sent}" + message_key = f"key-{messages_sent}" + try: + producer.produce( + topic=topic_name, + value=message_value, + key=message_key, + callback=delivery_callback + ) + messages_sent += 1 + + # Poll frequently to trigger delivery callbacks + if messages_sent % 100 == 0: + producer.poll(0) + except BufferError: + # If buffer is full, poll and wait briefly + producer.poll(0.001) + continue + + actual_duration = time.time() - start_time + # Flush to ensure all messages are sent self.logger.info("Flushing producer...") producer.flush(timeout=30) - # Verify all messages were delivered - self.logger.info("Delivered: %d, Failed: %d", 
len(delivered_messages), len(failed_messages)) + # Calculate throughput + send_throughput = messages_sent / actual_duration + delivery_throughput = len(delivered_messages) / actual_duration - assert len(failed_messages) == 0, f"Some messages failed to deliver: {failed_messages}" - assert len(delivered_messages) == num_messages, \ - f"Expected {num_messages} delivered messages, got {len(delivered_messages)}" - - self.logger.info("Successfully produced %d messages to topic %s", - len(delivered_messages), topic_name) + # Verify results + self.logger.info("Time-based production results:") + self.logger.info(" Duration: %.2f seconds", actual_duration) + self.logger.info(" Messages sent: %d", messages_sent) + self.logger.info(" Messages delivered: %d", len(delivered_messages)) + self.logger.info(" Messages failed: %d", len(failed_messages)) + self.logger.info(" Send throughput: %.2f msg/s", send_throughput) + self.logger.info(" Delivery throughput: %.2f msg/s", delivery_throughput) + + # Basic performance assertions + assert messages_sent > 0, "No messages were sent during test duration" + assert len(delivered_messages) > 0, "No messages were delivered" + assert send_throughput > 10, f"Send throughput too low: {send_throughput:.2f} msg/s (expected > 10 msg/s)" + + self.logger.info("Successfully completed time-based basic production test") - @parametrize(num_messages=5) - @parametrize(num_messages=10) - @parametrize(num_messages=50) - def test_produce_multiple_batches(self, num_messages): - """Test producing different numbers of messages""" + @parametrize(test_duration=2) + @parametrize(test_duration=5) + @parametrize(test_duration=10) + def test_produce_multiple_batches(self, test_duration): + """Test batch throughput over different time durations (time-based)""" if Producer is None: self.logger.error("confluent_kafka not available, skipping test") return - topic_name = f"batch-test-topic-{num_messages}" + topic_name = f"batch-test-topic-{test_duration}s" # Create topic self.kafka.create_topic(topic_name, partitions=2, replication_factor=1) @@ -122,10 +142,10 @@ def test_produce_multiple_batches(self, num_messages): topic_ready = self.kafka.wait_for_topic(topic_name, max_wait_time=30) assert topic_ready, f"Topic {topic_name} was not created within timeout" - # Configure producer + # Configure producer with batch settings producer_config = { 'bootstrap.servers': self.kafka.bootstrap_servers(), - 'client.id': f'batch-test-producer-{num_messages}', + 'client.id': f'batch-test-producer-{test_duration}s', 'batch.size': 1000, # Small batch size for testing 'linger.ms': 100 # Small linger time for testing } @@ -140,37 +160,62 @@ def delivery_callback(err, msg): else: self.logger.error("Delivery failed: %s", err) - # Produce messages - self.logger.info("Producing %d messages in batches", num_messages) - for i in range(num_messages): - producer.produce( - topic=topic_name, - value=f"Batch message {i}", - key=f"batch-key-{i % 10}", # Use modulo for key distribution - callback=delivery_callback - ) - - # Poll occasionally - if i % 10 == 0: - producer.poll(0) + # Time-based batch message production + self.logger.info("Producing batches for %d seconds", test_duration) + start_time = time.time() + messages_sent = 0 + + while time.time() - start_time < test_duration: + try: + producer.produce( + topic=topic_name, + value=f"Batch message {messages_sent}", + key=f"batch-key-{messages_sent % 10}", # Use modulo for key distribution + callback=delivery_callback + ) + messages_sent += 1 + # Poll occasionally to 
trigger callbacks + if messages_sent % 100 == 0: + producer.poll(0) + except BufferError: + # If buffer is full, poll and wait briefly + producer.poll(0.001) + continue + + actual_duration = time.time() - start_time + # Final flush producer.flush(timeout=30) + # Calculate throughput + send_throughput = messages_sent / actual_duration + delivery_throughput = delivered_count[0] / actual_duration + # Verify results - assert delivered_count[0] == num_messages, \ - f"Expected {num_messages} delivered, got {delivered_count[0]}" + self.logger.info("Batch production results (%ds):", test_duration) + self.logger.info(" Duration: %.2f seconds", actual_duration) + self.logger.info(" Messages sent: %d", messages_sent) + self.logger.info(" Messages delivered: %d", delivered_count[0]) + self.logger.info(" Send throughput: %.2f msg/s", send_throughput) + self.logger.info(" Delivery throughput: %.2f msg/s", delivery_throughput) + + # Performance assertions + assert messages_sent > 0, "No messages were sent during test duration" + assert delivered_count[0] > 0, "No messages were delivered" + assert send_throughput > 10, f"Send throughput too low: {send_throughput:.2f} msg/s" - self.logger.info("Successfully produced %d messages in batches", delivered_count[0]) + self.logger.info("Successfully completed %ds batch production test", test_duration) @matrix(compression_type=['none', 'gzip', 'snappy']) def test_produce_with_compression(self, compression_type): - """Test producing messages with different compression types""" + """Test compression throughput over 5 seconds (time-based)""" if Producer is None: self.logger.error("confluent_kafka not available, skipping test") return topic_name = f"compression-test-{compression_type}" + test_duration = 5.0 # 5 seconds # Create topic self.kafka.create_topic(topic_name, partitions=1, replication_factor=1) @@ -188,31 +233,64 @@ def test_produce_with_compression(self, compression_type): producer = Producer(producer_config) - # Create larger messages to test compression + # Create larger messages to test compression effectiveness large_message = "x" * 1000 # 1KB message delivered_count = [0] def delivery_callback(err, msg): if err is None: delivered_count[0] += 1 + + # Time-based message production with compression + self.logger.info("Producing messages with %s compression for %.1f seconds", compression_type, test_duration) + start_time = time.time() + messages_sent = 0 + + while time.time() - start_time < test_duration: + try: + producer.produce( + topic=topic_name, + value=f"{large_message}-{messages_sent}", + key=f"comp-key-{messages_sent}", + callback=delivery_callback + ) + messages_sent += 1 - # Produce messages - self.logger.info("Producing messages with %s compression", compression_type) - for i in range(20): - producer.produce( - topic=topic_name, - value=f"{large_message}-{i}", - key=f"comp-key-{i}", - callback=delivery_callback - ) - producer.poll(0) - + # Poll frequently to prevent buffer overflow + if messages_sent % 10 == 0: + producer.poll(0) + except BufferError: + # If buffer is full, poll and wait briefly + producer.poll(0.001) + continue + + actual_duration = time.time() - start_time + producer.flush(timeout=30) - assert delivered_count[0] == 20, \ - f"Expected 20 delivered messages, got {delivered_count[0]}" + # Calculate throughput + send_throughput = messages_sent / actual_duration + delivery_throughput = delivered_count[0] / actual_duration + message_size_kb = len(f"{large_message}-{messages_sent}") / 1024 + throughput_mb_s = (delivery_throughput * 
message_size_kb) / 1024 + + # Verify results + self.logger.info("Compression production results (%s):", compression_type) + self.logger.info(" Duration: %.2f seconds", actual_duration) + self.logger.info(" Messages sent: %d", messages_sent) + self.logger.info(" Messages delivered: %d", delivered_count[0]) + self.logger.info(" Message size: %.1f KB", message_size_kb) + self.logger.info(" Send throughput: %.2f msg/s", send_throughput) + self.logger.info(" Delivery throughput: %.2f msg/s", delivery_throughput) + self.logger.info(" Data throughput: %.2f MB/s", throughput_mb_s) + + # Performance assertions + assert messages_sent > 0, "No messages were sent during test duration" + assert delivered_count[0] > 0, "No messages were delivered" + assert send_throughput > 5, f"Send throughput too low for {compression_type}: {send_throughput:.2f} msg/s" - self.logger.info("Successfully produced 20 messages with %s compression", compression_type) + self.logger.info("Successfully completed %s compression test", compression_type) + def tearDown(self): """Clean up test environment"""
         self.logger.info("Test completed - external Kafka service remains running")
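
Reviewer note (sketch, not part of the patch series): all three tests assert on producer delivery reports only; nothing ever reads the messages back from the broker. If consume-side verification is wanted as a follow-up, something along the lines below could sit next to KafkaClient. It assumes the same external Kafka on localhost:9092; the helper name count_messages and the group id are illustrative, not an existing API of this repo:

    from confluent_kafka import Consumer

    def count_messages(topic_name, bootstrap_servers="localhost:9092", idle_timeout=5.0):
        """Read a topic from the beginning and count messages until the stream goes idle."""
        consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': 'ducktape-verify',    # illustrative group id
            'auto.offset.reset': 'earliest',  # start from the beginning of the topic
            'enable.auto.commit': False,      # one-shot check, no offsets to persist
        })
        consumer.subscribe([topic_name])
        count = 0
        idle = 0.0
        try:
            while idle < idle_timeout:
                msg = consumer.poll(timeout=1.0)
                if msg is None:
                    idle += 1.0               # nothing arrived within this poll window
                    continue
                if msg.error():
                    continue                  # skip transient per-message errors
                count += 1
                idle = 0.0                    # reset the idle clock on real messages
        finally:
            consumer.close()
        return count

Wired into test_basic_produce, assert count_messages(topic_name) >= len(delivered_messages) would close the loop between delivery callbacks and broker state (>= rather than == because producer retries make delivery at-least-once).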