Skip to content

Initialize ducktape setup #2021

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions tests/ducktape/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Ducktape Producer Tests

Ducktape-based producer tests for the Confluent Kafka Python client.

## Prerequisites

- `pip install ducktape confluent-kafka`
- Kafka running on `localhost:9092`

## Running Tests

```bash
# Run all tests
./tests/ducktape/run_ducktape_test.py

# Run specific test
./tests/ducktape/run_ducktape_test.py SimpleProducerTest.test_basic_produce
```

## Test Cases

- **test_basic_produce**: Basic message production with callbacks
- **test_produce_multiple_batches**: Parameterized tests (5, 10, 50 messages)
- **test_produce_with_compression**: Matrix tests (none, gzip, snappy)
1 change: 1 addition & 0 deletions tests/ducktape/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Ducktape tests for Confluent Kafka Python
93 changes: 93 additions & 0 deletions tests/ducktape/run_ducktape_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#!/usr/bin/env python3
"""
Simple test runner for Ducktape Producer tests
"""
import sys
import os
import subprocess
import tempfile
import json
import ducktape

def create_cluster_config():
    """Return the cluster configuration used for local ducktape runs.

    A LocalhostCluster with three nodes is enough for the producer tests,
    which only talk to an externally managed broker.
    """
    return {
        "cluster_type": "ducktape.cluster.localhost.LocalhostCluster",
        "num_nodes": 3,
    }

def create_test_config():
    """Return the runner configuration: test directory and results directory.

    The results directory lives under the system temp dir so repeated runs
    do not pollute the source tree.
    """
    here = os.path.dirname(os.path.abspath(__file__))
    return {
        "ducktape_dir": here,
        "results_dir": os.path.join(tempfile.gettempdir(), "ducktape_results"),
    }

def main():
    """Run the ducktape producer tests and return a process exit code.

    Optionally accepts a single CLI argument selecting one test
    (e.g. ``SimpleProducerTest.test_basic_produce``); otherwise the whole
    ``test_producer.py`` suite is run.
    """
    print("Confluent Kafka Python - Ducktape Producer Test Runner")
    print("=" * 60)

    print(f"Using ducktape version: {ducktape.__version__}")

    # confluent_kafka is a hard requirement for the tests themselves;
    # fail early with install instructions rather than a traceback.
    try:
        import confluent_kafka
    except ImportError:
        print("ERROR: confluent_kafka is not installed.")
        print("Install it with: pip install confluent-kafka")
        return 1
    print(f"Using confluent-kafka version: {confluent_kafka.version()}")

    # Ducktape takes file paths (not module paths) on its command line.
    suite_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_producer.py")
    if not os.path.exists(suite_file):
        print(f"ERROR: Test file not found: {suite_file}")
        return 1

    # Results go under the system temp dir so runs don't pollute the repo.
    results_root = os.path.join(tempfile.gettempdir(), "ducktape_results")
    os.makedirs(results_root, exist_ok=True)

    print(f"Test file: {suite_file}")
    print(f"Results directory: {results_root}")
    print()

    # Narrow the target to a single test when one was requested.
    selected = sys.argv[1] if len(sys.argv) > 1 else None
    if selected is not None:
        target = f"{suite_file}::{selected}"
        print(f"Running specific test: {selected}")
    else:
        target = suite_file
        print("Running all producer tests")

    cmd = [
        sys.executable, "-m", "ducktape",
        "--debug",                    # verbose ducktape output
        "--results-root", results_root,
        "--cluster", "ducktape.cluster.localhost.LocalhostCluster",
        "--default-num-nodes", "1",   # services here manage no nodes
        target,
    ]

    print("Command:", " ".join(cmd))
    print()

    # Hand control to ducktape; its exit status is ours.
    try:
        return subprocess.run(cmd).returncode
    except KeyboardInterrupt:
        print("\nTest interrupted by user")
        return 1
    except Exception as exc:
        print(f"Error running test: {exc}")
        return 1

# Script entry point: propagate main()'s return code as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
1 change: 1 addition & 0 deletions tests/ducktape/services/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Ducktape services for Confluent Kafka Python testing
126 changes: 126 additions & 0 deletions tests/ducktape/services/kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
"""
Kafka client wrapper for Ducktape tests
Assumes Kafka is already running on localhost:9092
"""
import time
from ducktape.services.service import Service


class KafkaClient(Service):
    """Thin ducktape ``Service`` wrapper around an externally managed Kafka.

    Assumes a broker is already listening on *bootstrap_servers*, so the
    ducktape start/stop/clean hooks are deliberate no-ops.  All broker
    interaction goes through confluent-kafka's ``AdminClient``.
    """

    def __init__(self, context, bootstrap_servers="localhost:9092"):
        # num_nodes=0: this service manages no ducktape nodes of its own.
        super(KafkaClient, self).__init__(context, num_nodes=0)
        self.bootstrap_servers_str = bootstrap_servers

    def _admin_client(self):
        """Build an AdminClient pointed at the external broker.

        Imported lazily so that methods can degrade gracefully when
        confluent_kafka is not installed (callers catch ImportError).
        """
        from confluent_kafka.admin import AdminClient
        return AdminClient({'bootstrap.servers': self.bootstrap_servers_str})

    def start_node(self, node):
        """No-op since we assume Kafka is already running"""
        self.logger.info("Assuming Kafka is already running on %s", self.bootstrap_servers_str)

    def stop_node(self, node):
        """No-op since we don't manage the Kafka service"""
        self.logger.info("Not stopping external Kafka service")

    def clean_node(self, node):
        """No-op since we don't manage any files"""
        self.logger.info("No cleanup needed for external Kafka service")

    def bootstrap_servers(self):
        """Get bootstrap servers string for clients"""
        return self.bootstrap_servers_str

    def verify_connection(self):
        """Return True if cluster metadata can be fetched, else False."""
        try:
            # Fetching metadata is the cheapest round-trip that proves the
            # broker is reachable.
            metadata = self._admin_client().list_topics(timeout=10)
            self.logger.info("Successfully connected to Kafka. Available topics: %s",
                             list(metadata.topics.keys()))
            return True
        except Exception as e:
            self.logger.error("Failed to connect to Kafka at %s: %s", self.bootstrap_servers_str, e)
            return False

    def create_topic(self, topic, partitions=1, replication_factor=1, creation_timeout=30):
        """Create *topic*, tolerating the topic already existing.

        :param topic: topic name to create
        :param partitions: number of partitions (default 1)
        :param replication_factor: replication factor (default 1)
        :param creation_timeout: seconds to wait for broker confirmation
            (new keyword argument; the default preserves the previous
            hard-coded 30-second behavior)
        """
        try:
            from confluent_kafka.admin import NewTopic

            topic_config = NewTopic(
                topic=topic,
                num_partitions=partitions,
                replication_factor=replication_factor
            )

            self.logger.info("Creating topic: %s (partitions=%d, replication=%d)",
                             topic, partitions, replication_factor)

            # create_topics() returns a dict of topic name -> future.
            fs = self._admin_client().create_topics([topic_config])

            # Wait for each future; "already exists" is fine for tests that
            # reuse topics, anything else is logged as a warning.
            for topic_name, f in fs.items():
                try:
                    f.result(timeout=creation_timeout)
                    self.logger.info("Topic %s created successfully", topic_name)
                except Exception as e:
                    if "already exists" in str(e).lower():
                        self.logger.info("Topic %s already exists", topic_name)
                    else:
                        self.logger.warning("Failed to create topic %s: %s", topic_name, e)

        except ImportError:
            self.logger.error("confluent_kafka not available for topic creation")
        except Exception as e:
            self.logger.error("Failed to create topic %s: %s", topic, e)

    def list_topics(self):
        """Return the list of topic names, or [] on any failure."""
        try:
            metadata = self._admin_client().list_topics(timeout=10)

            topics = list(metadata.topics.keys())
            self.logger.debug("Available topics: %s", topics)
            return topics

        except ImportError:
            self.logger.error("confluent_kafka not available for listing topics")
            return []
        except Exception as e:
            self.logger.error("Failed to list topics: %s", e)
            return []

    def wait_for_topic(self, topic_name, max_wait_time=30, initial_wait=0.1):
        """
        Wait for topic to be created with exponential backoff retry logic.

        :param topic_name: topic to wait for
        :param max_wait_time: give up after this many seconds (total)
        :param initial_wait: first sleep interval; doubles up to a 2s cap
        :return: True once the topic appears, False on timeout
        """
        wait_time = initial_wait
        total_wait = 0

        self.logger.info("Waiting for topic '%s' to be available...", topic_name)

        while total_wait < max_wait_time:
            topics = self.list_topics()
            if topic_name in topics:
                self.logger.info("Topic '%s' is ready after %.2fs", topic_name, total_wait)
                return True

            self.logger.debug("Topic '%s' not found, waiting %.2fs (total: %.2fs)",
                              topic_name, wait_time, total_wait)
            time.sleep(wait_time)
            total_wait += wait_time

            # Exponential backoff with max cap of 2 seconds
            wait_time = min(wait_time * 2, 2.0)

        self.logger.error("Timeout waiting for topic '%s' after %ds", topic_name, max_wait_time)
        return False
Loading