#!/usr/bin/env python3
"""
Test runner for the Ducktape producer tests.

Builds and executes a ``python -m ducktape`` command against
``tests/ducktape/test_producer.py`` using a localhost cluster, writing
results under the system temp directory.

Usage:
    run_ducktape_test.py [TestClass.test_method]
"""
import os
import subprocess
import sys
import tempfile


def create_cluster_config():
    """Return a minimal localhost cluster configuration for ducktape."""
    return {
        "cluster_type": "ducktape.cluster.localhost.LocalhostCluster",
        "num_nodes": 3,
    }


def create_test_config():
    """Return test-runner configuration (test dir and results dir)."""
    return {
        "ducktape_dir": os.path.dirname(os.path.abspath(__file__)),
        "results_dir": os.path.join(tempfile.gettempdir(), "ducktape_results"),
    }


def _check_dependencies():
    """Verify that the required third-party packages are importable.

    Returns True on success; on failure prints an actionable message and
    returns False.  Checking both packages here keeps the runner from dying
    with a bare ImportError traceback (previously ``import ducktape`` at
    module level did exactly that while confluent_kafka got a friendly check).
    """
    try:
        import ducktape
    except ImportError:
        print("ERROR: ducktape is not installed.")
        print("Install it with: pip install ducktape")
        return False
    try:
        import confluent_kafka
    except ImportError:
        print("ERROR: confluent_kafka is not installed.")
        print("Install it with: pip install confluent-kafka")
        return False
    print(f"Using ducktape version: {ducktape.__version__}")
    print(f"Using confluent-kafka version: {confluent_kafka.version()}")
    return True


def main():
    """Run the ducktape producer tests; return a process exit code."""
    print("Confluent Kafka Python - Ducktape Producer Test Runner")
    print("=" * 60)

    if not _check_dependencies():
        return 1

    # ducktape expects file paths (optionally ``file::Class.method``),
    # not dotted module paths.
    test_dir = os.path.dirname(os.path.abspath(__file__))
    test_file = os.path.join(test_dir, "test_producer.py")

    if not os.path.exists(test_file):
        print(f"ERROR: Test file not found: {test_file}")
        return 1

    results_dir = os.path.join(tempfile.gettempdir(), "ducktape_results")
    os.makedirs(results_dir, exist_ok=True)

    print(f"Test file: {test_file}")
    print(f"Results directory: {results_dir}")
    print()

    cmd = [
        sys.executable, "-m", "ducktape",
        "--debug",                     # verbose ducktape output
        "--results-root", results_dir,
        "--cluster", "ducktape.cluster.localhost.LocalhostCluster",
        "--default-num-nodes", "1",    # services manage no real nodes
        test_file,
    ]

    # An optional positional argument selects a single test, e.g.
    # ``SimpleProducerTest.test_basic_produce``.
    if len(sys.argv) > 1:
        test_method = sys.argv[1]
        cmd[-1] = f"{test_file}::{test_method}"
        print(f"Running specific test: {test_method}")
    else:
        print("Running all producer tests")

    print("Command:", " ".join(cmd))
    print()

    try:
        return subprocess.run(cmd).returncode
    except KeyboardInterrupt:
        print("\nTest interrupted by user")
        return 1
    except Exception as e:  # surface any launch failure as a failing exit code
        print(f"Error running test: {e}")
        return 1


if __name__ == "__main__":
    sys.exit(main())
"""
Kafka client wrapper for Ducktape tests
Assumes Kafka is already running on localhost:9092
"""
import time
from ducktape.services.service import Service


class KafkaClient(Service):
    """Kafka client wrapper - assumes an external Kafka is already running.

    The service manages no nodes of its own (num_nodes=0), so all node
    lifecycle hooks are no-ops; the admin helpers talk directly to the
    configured bootstrap servers.
    """

    def __init__(self, context, bootstrap_servers="localhost:9092"):
        # num_nodes=0: the broker is external, nothing to provision.
        super(KafkaClient, self).__init__(context, num_nodes=0)
        self.bootstrap_servers_str = bootstrap_servers

    def start_node(self, node):
        """No-op since we assume Kafka is already running."""
        self.logger.info("Assuming Kafka is already running on %s", self.bootstrap_servers_str)

    def stop_node(self, node):
        """No-op since we don't manage the Kafka service."""
        self.logger.info("Not stopping external Kafka service")

    def clean_node(self, node):
        """No-op since we don't manage any files."""
        self.logger.info("No cleanup needed for external Kafka service")

    def bootstrap_servers(self):
        """Get bootstrap servers string for clients."""
        return self.bootstrap_servers_str

    def _admin_client(self):
        """Build an AdminClient for the configured brokers.

        Imported lazily so this service can be constructed even when
        confluent_kafka is not installed; callers handle ImportError.
        Extracted here because three methods previously duplicated the
        construction.
        """
        from confluent_kafka.admin import AdminClient
        return AdminClient({'bootstrap.servers': self.bootstrap_servers_str})

    def verify_connection(self):
        """Return True if Kafka answers a metadata request, else False."""
        try:
            metadata = self._admin_client().list_topics(timeout=10)
            self.logger.info("Successfully connected to Kafka. Available topics: %s",
                             list(metadata.topics.keys()))
            return True
        except Exception as e:
            self.logger.error("Failed to connect to Kafka at %s: %s",
                              self.bootstrap_servers_str, e)
            return False

    def create_topic(self, topic, partitions=1, replication_factor=1):
        """Create *topic*, tolerating prior existence; failures are logged."""
        try:
            from confluent_kafka.admin import NewTopic

            topic_config = NewTopic(
                topic=topic,
                num_partitions=partitions,
                replication_factor=replication_factor
            )

            self.logger.info("Creating topic: %s (partitions=%d, replication=%d)",
                             topic, partitions, replication_factor)

            fs = self._admin_client().create_topics([topic_config])

            # Block until creation settles so callers may rely on the topic.
            for topic_name, f in fs.items():
                try:
                    f.result(timeout=30)  # Wait up to 30 seconds
                    self.logger.info("Topic %s created successfully", topic_name)
                except Exception as e:
                    # NOTE(review): string matching on the error text; comparing
                    # against KafkaError.TOPIC_ALREADY_EXISTS would be sturdier.
                    if "already exists" in str(e).lower():
                        self.logger.info("Topic %s already exists", topic_name)
                    else:
                        self.logger.warning("Failed to create topic %s: %s", topic_name, e)

        except ImportError:
            self.logger.error("confluent_kafka not available for topic creation")
        except Exception as e:
            self.logger.error("Failed to create topic %s: %s", topic, e)

    def list_topics(self):
        """Return the list of topic names, or [] on any failure."""
        try:
            metadata = self._admin_client().list_topics(timeout=10)
            topics = list(metadata.topics.keys())
            self.logger.debug("Available topics: %s", topics)
            return topics
        except ImportError:
            self.logger.error("confluent_kafka not available for listing topics")
            return []
        except Exception as e:
            self.logger.error("Failed to list topics: %s", e)
            return []

    def wait_for_topic(self, topic_name, max_wait_time=30, initial_wait=0.1):
        """Wait for *topic_name* to appear, polling with exponential backoff.

        Returns True once the topic is listed, False after *max_wait_time*
        seconds have elapsed without seeing it.
        """
        wait_time = initial_wait
        total_wait = 0

        self.logger.info("Waiting for topic '%s' to be available...", topic_name)

        while total_wait < max_wait_time:
            if topic_name in self.list_topics():
                self.logger.info("Topic '%s' is ready after %.2fs", topic_name, total_wait)
                return True

            self.logger.debug("Topic '%s' not found, waiting %.2fs (total: %.2fs)",
                              topic_name, wait_time, total_wait)
            time.sleep(wait_time)
            total_wait += wait_time

            # Exponential backoff with max cap of 2 seconds
            wait_time = min(wait_time * 2, 2.0)

        self.logger.error("Timeout waiting for topic '%s' after %ds", topic_name, max_wait_time)
        return False
"""
Ducktape test for Confluent Kafka Python Producer
Assumes Kafka is already running on localhost:9092
"""
import time
from ducktape.tests.test import Test
from ducktape.mark import matrix, parametrize

from tests.ducktape.services.kafka import KafkaClient

try:
    from confluent_kafka import Producer, KafkaError
except ImportError:
    # confluent_kafka is optional at collection time; each test skips itself
    # when Producer is None.
    Producer = None
    KafkaError = None


class SimpleProducerTest(Test):
    """Test basic producer functionality with external Kafka."""

    def __init__(self, test_context):
        super(SimpleProducerTest, self).__init__(test_context=test_context)
        # Set up Kafka client (assumes external Kafka running).
        self.kafka = KafkaClient(test_context, bootstrap_servers="localhost:9092")

    def setUp(self):
        """Verify the external Kafka broker is reachable before each test."""
        self.logger.info("Verifying connection to external Kafka at localhost:9092")
        if not self.kafka.verify_connection():
            raise Exception("Cannot connect to Kafka at localhost:9092. "
                            "Please ensure Kafka is running.")
        self.logger.info("Successfully connected to Kafka")

    def _prepare_topic(self, topic_name, partitions=1):
        """Create *topic_name* and block until it is visible in metadata."""
        self.kafka.create_topic(topic_name, partitions=partitions, replication_factor=1)
        topic_ready = self.kafka.wait_for_topic(topic_name, max_wait_time=30)
        assert topic_ready, (f"Topic {topic_name} was not created within timeout. "
                             f"Available topics: {self.kafka.list_topics()}")

    def _produce_for_duration(self, producer, topic_name, duration,
                              value_fn, key_fn, delivery_callback, poll_every=100):
        """Produce messages to *topic_name* for *duration* seconds.

        value_fn / key_fn map the running message index to the payload and
        key.  Polls every *poll_every* messages so delivery callbacks fire,
        and backs off briefly on BufferError when the local queue is full.
        Returns (messages_sent, actual_duration).  Extracted because the
        three test methods previously duplicated this loop verbatim.
        """
        start_time = time.time()
        messages_sent = 0
        while time.time() - start_time < duration:
            try:
                producer.produce(
                    topic=topic_name,
                    value=value_fn(messages_sent),
                    key=key_fn(messages_sent),
                    callback=delivery_callback
                )
                messages_sent += 1
                # Poll periodically to trigger delivery callbacks.
                if messages_sent % poll_every == 0:
                    producer.poll(0)
            except BufferError:
                # If buffer is full, poll and wait briefly, then retry.
                producer.poll(0.001)
        return messages_sent, time.time() - start_time

    def test_basic_produce(self):
        """Test basic message production throughput over 5 seconds (time-based)."""
        if Producer is None:
            self.logger.error("confluent_kafka not available, skipping test")
            return

        topic_name = "test-topic"
        test_duration = 5.0  # 5 seconds
        self._prepare_topic(topic_name, partitions=1)

        producer_config = {
            'bootstrap.servers': self.kafka.bootstrap_servers(),
            'client.id': 'ducktape-test-producer'
        }
        self.logger.info("Creating producer with config: %s", producer_config)
        producer = Producer(producer_config)

        # Track delivery results.
        delivered_messages = []
        failed_messages = []

        def delivery_callback(err, msg):
            """Delivery report callback."""
            if err is not None:
                self.logger.error("Message delivery failed: %s", err)
                failed_messages.append(err)
            else:
                delivered_messages.append(msg)

        self.logger.info("Producing messages for %.1f seconds to topic %s",
                         test_duration, topic_name)
        messages_sent, actual_duration = self._produce_for_duration(
            producer, topic_name, test_duration,
            value_fn=lambda i: f"Test message {i}",
            key_fn=lambda i: f"key-{i}",
            delivery_callback=delivery_callback,
            poll_every=100)

        # Flush to ensure all messages are sent.
        self.logger.info("Flushing producer...")
        producer.flush(timeout=30)

        send_throughput = messages_sent / actual_duration
        delivery_throughput = len(delivered_messages) / actual_duration

        self.logger.info("Time-based production results:")
        self.logger.info("  Duration: %.2f seconds", actual_duration)
        self.logger.info("  Messages sent: %d", messages_sent)
        self.logger.info("  Messages delivered: %d", len(delivered_messages))
        self.logger.info("  Messages failed: %d", len(failed_messages))
        self.logger.info("  Send throughput: %.2f msg/s", send_throughput)
        self.logger.info("  Delivery throughput: %.2f msg/s", delivery_throughput)

        assert messages_sent > 0, "No messages were sent during test duration"
        assert len(delivered_messages) > 0, "No messages were delivered"
        assert send_throughput > 10, (
            f"Send throughput too low: {send_throughput:.2f} msg/s (expected > 10 msg/s)")

        self.logger.info("Successfully completed time-based basic production test")

    @parametrize(test_duration=2)
    @parametrize(test_duration=5)
    @parametrize(test_duration=10)
    def test_produce_multiple_batches(self, test_duration):
        """Test batch throughput over different time durations (time-based)."""
        if Producer is None:
            self.logger.error("confluent_kafka not available, skipping test")
            return

        topic_name = f"batch-test-topic-{test_duration}s"
        self._prepare_topic(topic_name, partitions=2)

        producer_config = {
            'bootstrap.servers': self.kafka.bootstrap_servers(),
            'client.id': f'batch-test-producer-{test_duration}s',
            'batch.size': 1000,  # Small batch size for testing
            'linger.ms': 100     # Small linger time for testing
        }
        producer = Producer(producer_config)

        delivered_count = [0]  # Use list to modify from callback

        def delivery_callback(err, msg):
            if err is None:
                delivered_count[0] += 1
            else:
                self.logger.error("Delivery failed: %s", err)

        self.logger.info("Producing batches for %d seconds", test_duration)
        messages_sent, actual_duration = self._produce_for_duration(
            producer, topic_name, test_duration,
            value_fn=lambda i: f"Batch message {i}",
            key_fn=lambda i: f"batch-key-{i % 10}",  # modulo spreads keys
            delivery_callback=delivery_callback,
            poll_every=100)

        producer.flush(timeout=30)

        send_throughput = messages_sent / actual_duration
        delivery_throughput = delivered_count[0] / actual_duration

        self.logger.info("Batch production results (%ds):", test_duration)
        self.logger.info("  Duration: %.2f seconds", actual_duration)
        self.logger.info("  Messages sent: %d", messages_sent)
        self.logger.info("  Messages delivered: %d", delivered_count[0])
        self.logger.info("  Send throughput: %.2f msg/s", send_throughput)
        self.logger.info("  Delivery throughput: %.2f msg/s", delivery_throughput)

        assert messages_sent > 0, "No messages were sent during test duration"
        assert delivered_count[0] > 0, "No messages were delivered"
        assert send_throughput > 10, f"Send throughput too low: {send_throughput:.2f} msg/s"

        self.logger.info("Successfully completed %ds batch production test", test_duration)

    @matrix(compression_type=['none', 'gzip', 'snappy'])
    def test_produce_with_compression(self, compression_type):
        """Test compression throughput over 5 seconds (time-based)."""
        if Producer is None:
            self.logger.error("confluent_kafka not available, skipping test")
            return

        topic_name = f"compression-test-{compression_type}"
        test_duration = 5.0  # 5 seconds
        self._prepare_topic(topic_name, partitions=1)

        producer_config = {
            'bootstrap.servers': self.kafka.bootstrap_servers(),
            'client.id': f'compression-test-{compression_type}',
            'compression.type': compression_type
        }
        producer = Producer(producer_config)

        # Larger messages make compression effects visible.
        large_message = "x" * 1000  # 1KB message
        delivered_count = [0]

        def delivery_callback(err, msg):
            if err is None:
                delivered_count[0] += 1

        self.logger.info("Producing messages with %s compression for %.1f seconds",
                         compression_type, test_duration)
        messages_sent, actual_duration = self._produce_for_duration(
            producer, topic_name, test_duration,
            value_fn=lambda i: f"{large_message}-{i}",
            key_fn=lambda i: f"comp-key-{i}",
            delivery_callback=delivery_callback,
            poll_every=10)  # poll more often: big payloads fill the queue faster

        producer.flush(timeout=30)

        send_throughput = messages_sent / actual_duration
        delivery_throughput = delivered_count[0] / actual_duration
        # Approximate per-message size based on the final index width.
        message_size_kb = len(f"{large_message}-{messages_sent}") / 1024
        throughput_mb_s = (delivery_throughput * message_size_kb) / 1024

        self.logger.info("Compression production results (%s):", compression_type)
        self.logger.info("  Duration: %.2f seconds", actual_duration)
        self.logger.info("  Messages sent: %d", messages_sent)
        self.logger.info("  Messages delivered: %d", delivered_count[0])
        self.logger.info("  Message size: %.1f KB", message_size_kb)
        self.logger.info("  Send throughput: %.2f msg/s", send_throughput)
        self.logger.info("  Delivery throughput: %.2f msg/s", delivery_throughput)
        self.logger.info("  Data throughput: %.2f MB/s", throughput_mb_s)

        assert messages_sent > 0, "No messages were sent during test duration"
        assert delivered_count[0] > 0, "No messages were delivered"
        assert send_throughput > 5, (
            f"Send throughput too low for {compression_type}: {send_throughput:.2f} msg/s")

        self.logger.info("Successfully completed %s compression test", compression_type)

    def tearDown(self):
        """Clean up test environment."""
        self.logger.info("Test completed - external Kafka service remains running")