Initialize ducktape setup #2021
Conversation
🎉 All Contributor License Agreements have been signed. Ready to merge.
Pull Request Overview
This PR introduces a new Ducktape-based integration test suite for the Confluent Kafka Python client, providing automated producer testing capabilities against an external Kafka instance running on localhost:9092.
- Adds comprehensive test infrastructure with a test runner script and Kafka client wrapper service
- Implements producer test cases covering basic message production, batch processing, and compression scenarios
- Provides documentation and setup instructions for running the integration tests
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| tests/ducktape/test_producer.py | Core producer test class with basic, parameterized batch, and matrix compression tests |
| tests/ducktape/services/kafka.py | Kafka client wrapper service for interacting with an external Kafka instance (see the sketch after this table) |
| tests/ducktape/services/__init__.py | Package initialization for ducktape services |
| tests/ducktape/run_ducktape_test.py | Test runner script with environment setup and command execution |
| tests/ducktape/__init__.py | Package initialization for ducktape tests |
| tests/ducktape/README.md | Documentation explaining prerequisites, usage, and available test cases |
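
For orientation, here is a rough sketch of the kind of wrapper `services/kafka.py` provides, built on `confluent_kafka.admin.AdminClient` against an external broker. This is an assumed shape for illustration only; the constructor arguments and method names are not taken from the PR's actual code.

```python
# Illustrative sketch only -- the PR's real KafkaClient may differ.
from confluent_kafka.admin import AdminClient, NewTopic


class KafkaClient:
    def __init__(self, bootstrap_servers="localhost:9092"):
        self.bootstrap_servers = bootstrap_servers
        self.admin = AdminClient({"bootstrap.servers": bootstrap_servers})

    def create_topic(self, name, partitions=1, replication_factor=1):
        # create_topics() returns a dict of {topic: future}; waiting on the future
        # surfaces any creation error (e.g. the topic already exists).
        futures = self.admin.create_topics(
            [NewTopic(name, num_partitions=partitions, replication_factor=replication_factor)])
        futures[name].result(timeout=30)

    def list_topics(self):
        # ClusterMetadata.topics is a dict keyed by topic name.
        return list(self.admin.list_topics(timeout=10).topics.keys())
```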
```python
# Verify topic exists
topics = self.kafka.list_topics()
assert topic_name in topics, f"Topic {topic_name} was not created. Available topics: {topics}"
```
Using a fixed sleep time is unreliable and can cause flaky tests. Consider implementing a retry mechanism or polling for topic existence instead of a hard-coded sleep.
Suggested change:

```python
# Wait for topic to be created by polling
timeout = 10  # seconds
poll_interval = 0.5  # seconds
start_time = time.time()
while True:
    topics = self.kafka.list_topics()
    if topic_name in topics:
        break
    if time.time() - start_time > timeout:
        raise AssertionError(f"Topic {topic_name} was not created within {timeout} seconds. Available topics: {topics}")
    time.sleep(poll_interval)
```
tests/ducktape/test_producer.py (Outdated)

```python
# Create topic
self.kafka.create_topic(topic_name, partitions=2, replication_factor=1)
time.sleep(2)
```
Using a fixed sleep time is unreliable and can cause flaky tests. Consider implementing a retry mechanism or polling for topic existence instead of a hard-coded sleep.
Suggested change — replace `time.sleep(2)` with:

```python
# Poll for topic existence instead of fixed sleep
timeout = 30  # seconds
poll_interval = 0.5  # seconds
start_time = time.time()
while True:
    if self.kafka.topic_exists(topic_name):
        break
    if time.time() - start_time > timeout:
        raise Exception(f"Topic {topic_name} did not become available within {timeout} seconds")
    time.sleep(poll_interval)
```
Overkill copilot suggestion
tests/ducktape/test_producer.py (Outdated)

```python
# Create topic
self.kafka.create_topic(topic_name, partitions=1, replication_factor=1)
time.sleep(2)
```
Using a fixed sleep time is unreliable and can cause flaky tests. Consider implementing a retry mechanism or polling for topic existence instead of a hard-coded sleep.
Suggested change — replace `time.sleep(2)` with:

```python
self._wait_for_topic(topic_name, timeout=30)
```
```python
# Wait for topic creation to complete
for topic_name, f in fs.items():
    try:
        f.result(timeout=30)  # Wait up to 30 seconds
```
[nitpick] The 30-second timeout for topic creation might be too long for test environments. Consider using a shorter timeout or making it configurable.
Suggested change — replace `f.result(timeout=30)` with:

```python
f.result(timeout=self.topic_creation_timeout)  # Wait up to configured seconds
```
tests/ducktape/run_ducktape_test.py (Outdated)

```python
import json

# Add the project root to Python path
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
```
Using multiple `os.path.dirname()` calls makes the path calculation fragile and hard to understand. Consider using `os.path.join()` with relative paths or `pathlib.Path` for more robust path handling.
Suggested change — replace the `os.path.dirname()` chain with:

```python
from pathlib import Path

# Add the project root to Python path
project_root = str(Path(__file__).resolve().parents[2])
```
```python
self.logger.info("Successfully connected to Kafka")

def test_basic_produce(self):
```
Let's add a `test_produce_with_schema_registry()` so we can run benchmark tests with SR 👍
Can also do this as a follow-up
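
A rough sketch of what such a `test_produce_with_schema_registry()` could look like as a method on the producer test class, assuming a Schema Registry at `http://localhost:8081` and Avro serialization; the endpoint, topic name, and schema below are illustrative, not from this PR.

```python
# Hypothetical follow-up test; assumes a Schema Registry at localhost:8081 and fastavro installed.
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField


def test_produce_with_schema_registry(self):
    # Would live on the producer test class alongside test_basic_produce().
    schema_str = '{"type": "record", "name": "Benchmark", "fields": [{"name": "value", "type": "string"}]}'
    sr_client = SchemaRegistryClient({"url": "http://localhost:8081"})
    serializer = AvroSerializer(sr_client, schema_str)
    producer = Producer({"bootstrap.servers": "localhost:9092"})
    topic = "sr-benchmark-topic"

    for i in range(100):
        # Serialize through Schema Registry, then produce the raw bytes.
        value = serializer({"value": f"msg-{i}"}, SerializationContext(topic, MessageField.VALUE))
        producer.produce(topic, value=value)
        producer.poll(0)

    remaining = producer.flush(30)
    assert remaining == 0, f"{remaining} Schema Registry messages were not delivered"
```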
tests/ducktape/run_ducktape_test.py (Outdated)

```python
import json

# Add the project root to Python path
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
```
This shouldn't be necessary given you have an `__init__.py` file to include this in the packaging, such that `python -m tests.ducktape.run_ducktape_test` should work with all the modules loaded.
If need be, we can move the integration tests within the module scope for path loading as an alternative.
tests/ducktape/run_ducktape_test.py (Outdated)

```python
# Check if ducktape is installed
try:
    import ducktape
```
Can just move this to the top of the file and make it a dev dependency; no need for import catches.
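
A minimal sketch of that change, assuming ducktape is declared in the project's dev/test requirements (the requirements file name below is an assumption, not from this PR):

```python
# run_ducktape_test.py (sketch): import at module top, no try/except guard.
# With ducktape listed in a dev requirements file (e.g. requirements/requirements-tests.txt,
# name assumed), a missing install fails fast with a clear ImportError.
import ducktape  # noqa: F401  -- presence check only; the CLI itself is invoked as a subprocess
```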
tests/ducktape/run_ducktape_test.py (Outdated)

```python
print()

# Set up environment with proper Python path
env = os.environ.copy()
```
If we use a `python -m` command below, we probably don't need this either.
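
A sketch of what the runner's command construction could look like without copying and patching the environment, assuming the runner is launched from the project root (e.g. via `python -m tests.ducktape.run_ducktape_test`) and that the `ducktape` CLI is on PATH; the paths and flags here are illustrative, not the PR's actual code.

```python
# Hypothetical sketch: run the ducktape CLI from the project root so test modules
# resolve as regular packages, without mutating os.environ.
import subprocess
from pathlib import Path


def run_ducktape(test_path="tests/ducktape/test_producer.py", results_dir="results"):
    project_root = Path(__file__).resolve().parents[2]  # assumes this file sits in tests/ducktape/
    cmd = [
        "ducktape",                       # ducktape's console entry point
        str(test_path),
        "--results-root", str(results_dir),  # ducktape's results-directory flag
    ]
    return subprocess.call(cmd, cwd=project_root)
```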
tests/ducktape/test_producer.py (Outdated)

```python
self.logger.info("Successfully produced %d messages to topic %s",
                 len(delivered_messages), topic_name)

@parametrize(num_messages=5)
```
We'll probably want to do time-based max throughput rather than time to complete N messages, but this is fine for the first PR setup.
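
For that follow-up, a time-bounded throughput loop might look roughly like this; the bootstrap address, topic name, message size, and duration are illustrative, and the `BufferError` handling keeps librdkafka's local queue draining.

```python
# Sketch of a time-based max-throughput measurement rather than "time to send N messages".
import time
from confluent_kafka import Producer


def measure_throughput(bootstrap_servers="localhost:9092", topic="ducktape-throughput",
                       duration=10.0, message_size=512):
    producer = Producer({"bootstrap.servers": bootstrap_servers})
    payload = b"x" * message_size
    delivered = 0

    def on_delivery(err, msg):
        nonlocal delivered
        if err is None:
            delivered += 1

    start = time.time()
    while time.time() - start < duration:
        try:
            producer.produce(topic, value=payload, callback=on_delivery)
        except BufferError:
            producer.poll(0.1)  # local queue full: let delivery callbacks drain it
        producer.poll(0)
    producer.flush(30)
    return delivered / (time.time() - start)  # delivered messages per second
```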
tests/ducktape/test_producer.py (Outdated)

```python
self.logger.info("Delivered: %d, Failed: %d", len(delivered_messages), len(failed_messages))

assert len(failed_messages) == 0, f"Some messages failed to deliver: {failed_messages}"
assert len(delivered_messages) == num_messages, \
```
TODO: we'll want to extract execution metrics and assert against bounds therein. A follow-up effort.
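
That follow-up could look something like this: gather per-message latencies in the delivery callback (e.g. via `msg.latency()`) plus throughput for the run, then assert against configurable bounds. The helper name and threshold values below are illustrative, not part of this PR.

```python
# Hypothetical shape of metric assertions; thresholds would come from test configuration.
import statistics


def assert_execution_metrics(latencies, delivered, elapsed,
                             min_throughput=100.0, max_p99_latency=2.0):
    if not latencies:
        raise AssertionError("No delivery latencies were recorded")
    throughput = delivered / elapsed
    # statistics.quantiles with n=100 yields 99 cut points; index 98 is the p99.
    p99 = statistics.quantiles(latencies, n=100)[98] if len(latencies) >= 100 else max(latencies)
    assert throughput >= min_throughput, \
        f"Throughput {throughput:.1f} msg/s below bound {min_throughput}"
    assert p99 <= max_p99_latency, \
        f"p99 latency {p99:.3f}s above bound {max_p99_latency}s"
```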
```
@@ -98,3 +98,29 @@ def list_topics(self):
        except Exception as e:
            self.logger.error("Failed to list topics: %s", e)
            return []

    def wait_for_topic(self, topic_name, max_wait_time=30, initial_wait=0.1):
```
Probably overkill for a copilot recommendation, but nice to have the abstraction.
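
One possible shape for that `wait_for_topic()` abstraction, polling `list_topics()` with exponential backoff from `initial_wait` up to `max_wait_time`; this is a sketch built around the signature shown in the diff above, not necessarily the PR's exact implementation.

```python
# Sketch: poll until the topic is visible, backing off between attempts.
import time


def wait_for_topic(self, topic_name, max_wait_time=30, initial_wait=0.1):
    deadline = time.time() + max_wait_time
    wait = initial_wait
    while time.time() < deadline:
        if topic_name in self.list_topics():
            return True
        time.sleep(wait)
        wait = min(wait * 2, 1.0)  # cap the backoff so polling stays responsive
    raise TimeoutError(f"Topic {topic_name} not visible after {max_wait_time}s")
```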
```python
start_time = time.time()
messages_sent = 0

while time.time() - start_time < test_duration:
```
Let's do a follow-up later that pulls this into an async context so we don't need to write the while/try loop in each test.
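
As an interim step before a full async refactor, the loop could at least move into a shared helper so each test only supplies parameters plus assertions; a rough sketch with hypothetical helper and method names:

```python
# Hypothetical shared helper plus a test body using it; names are illustrative.
import time
from confluent_kafka import Producer


def produce_for_duration(producer, topic, payload, duration, on_delivery=None):
    """Encapsulate the while/try produce loop so individual tests don't repeat it."""
    start, sent = time.time(), 0
    while time.time() - start < duration:
        try:
            producer.produce(topic, value=payload, callback=on_delivery)
            sent += 1
        except BufferError:
            producer.poll(0.1)  # local queue full: drain callbacks, then retry
        producer.poll(0)
    producer.flush(30)
    return sent


def test_steady_state_produce(self):
    producer = Producer({"bootstrap.servers": "localhost:9092"})
    sent = produce_for_duration(producer, "ducktape-test-topic", b"x" * 512, duration=10.0)
    assert sent > 0, "No messages were produced during the run"
```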
Summary
This pull request introduces a new Ducktape-based integration test suite for the Confluent Kafka Python client, enabling automated producer tests against a locally running Kafka instance.
Ducktape test infrastructure and runner:
- Adds a `run_ducktape_test.py` script to automate running Ducktape producer tests, with support for running all or specific test cases, environment setup, and result directory management.
- Adds `services/kafka.py` with a `KafkaClient` wrapper to interact with an external Kafka instance, including topic management and connection verification.

Producer test cases:
- Adds `test_producer.py` with the `SimpleProducerTest` class, covering basic message production, parameterized batch message tests, and matrix tests for different compression types (a rough sketch of the class shape follows below).

Documentation and structure:
- Adds documentation (`README.md`) explaining prerequisites, usage, and available test cases for the Ducktape producer tests.
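
To make the summary concrete, here is a minimal sketch of the shape such a `SimpleProducerTest` could take; the topic name, bootstrap address, and exact assertions are illustrative rather than the PR's actual code.

```python
# Illustrative only -- the PR's SimpleProducerTest covers more cases (compression matrix, etc.).
from confluent_kafka import Producer
from ducktape.mark import parametrize
from ducktape.tests.test import Test


class SimpleProducerTest(Test):
    BOOTSTRAP = "localhost:9092"

    @parametrize(num_messages=5)
    def test_batch_produce(self, num_messages):
        producer = Producer({"bootstrap.servers": self.BOOTSTRAP})
        delivered, failed = [], []

        def on_delivery(err, msg):
            # Route each delivery report to the matching bucket.
            (failed if err else delivered).append(msg)

        for i in range(num_messages):
            producer.produce("ducktape-test-topic", value=f"message-{i}".encode(),
                             callback=on_delivery)
            producer.poll(0)
        producer.flush(30)

        assert not failed, f"Some messages failed to deliver: {failed}"
        assert len(delivered) == num_messages
```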