
Consumer compatible with asyncio #1017

@gwaramadze

Description

A bit of a context question: what is the optimal consumption pattern if I have more than one topic, and possibly multiple partitions per topic, to be handled by a single application instance? I feel like doing this is an anti-pattern:

...
consumer = Consumer(config)
consumer.subscribe(["topic1", "topic2"])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    elif msg.error():
        print('error: {}'.format(msg.error()))
    elif msg.topic() == "topic1":
        pass  # topic1 logic
    elif msg.topic() == "topic2":
        pass  # topic2 logic
...

Let's assume that the topic1 logic is a lightweight filter/repartition step (discard most of the stream, rehash a new key, and publish to topic2) and the topic2 logic is IO-bound. That seems counterproductive; it's like serving a grocery and a pharmacy from a single queue. Now, how do I optimize this?
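
For concreteness, here is roughly what I mean by the topic1 step; the predicate and the rehashing function are made-up placeholders, not my real logic:

import confluent_kafka

producer = confluent_kafka.Producer({"bootstrap.servers": "localhost:9092"})

def is_relevant(value):
    # Made-up predicate: keep only a small fraction of the stream.
    return value is not None and value.startswith(b"keep:")

def derive_key(value):
    # Made-up rehash: derive a new partitioning key from the payload.
    return value.split(b":", 1)[1][:16]

def repartition(msg):
    # topic1 logic: discard most messages, rekey the rest onto topic2.
    if not is_relevant(msg.value()):
        return
    producer.produce("topic2", key=derive_key(msg.value()), value=msg.value())
    producer.poll(0)  # serve delivery callbacks without blocking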

I have read this mind-blowing blog post https://www.confluent.io/blog/introducing-confluent-parallel-message-processing-client/

I figure that key-level parallelism is the holy grail, not available in the Python world as of now. But first things first: a good-enough step would be to shuffle each topic off to a separate Consumer instance, hopefully with asyncio rather than multiprocessing.
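
For reference, my rough mental model of the key-level parallelism I mentioned, sketched on top of asyncio: messages sharing a key land on the same queue (preserving per-key ordering) while distinct keys are processed concurrently. The handler and the message source are made-up, and offset management is ignored entirely:

import asyncio

async def handle(msg):
    # Made-up per-message IO, e.g. an HTTP call or a DB write.
    await asyncio.sleep(0.01)

async def dispatch_by_key(messages, num_workers=8):
    # messages is assumed to be an async iterator of Kafka messages.
    queues = [asyncio.Queue(maxsize=100) for _ in range(num_workers)]

    async def worker(queue):
        while True:
            msg = await queue.get()
            await handle(msg)
            queue.task_done()

    workers = [asyncio.create_task(worker(q)) for q in queues]
    try:
        async for msg in messages:
            # Same key -> same queue -> per-key ordering is preserved.
            # (A stable hash would be needed across restarts/processes.)
            await queues[hash(msg.key()) % num_workers].put(msg)
        await asyncio.gather(*(queue.join() for queue in queues))
    finally:
        for w in workers:
            w.cancel()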

I have read through the related issues #185 and #100,
the famous blog post https://www.confluent.io/blog/kafka-python-asyncio-integration/,
and several other resources now lost somewhere in my gazillion open browser tabs 😆

I have come up with a snippet of code that I am kindly requesting a review of. It generally works in a local environment; I wonder what you think. Does this approach make sense, or is it a disaster waiting to happen the moment I put some serious load on it? Thanks in advance.

import asyncio
import functools
import logging
import signal
import sys

import confluent_kafka

signal.signal(signal.SIGTERM, lambda *args: sys.exit())
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

config = {
    "group.id": "consumer-group-name",
    "bootstrap.servers": "localhost:9092",
}


async def consume(config, topic):
    consumer = confluent_kafka.Consumer(config)
    consumer.subscribe([topic])
    loop = asyncio.get_running_loop()
    # poll() blocks, so run it in the default thread pool executor to keep
    # the event loop responsive.
    poll = functools.partial(consumer.poll, 0.1)
    try:
        log.info(f"Starting consumer: {topic}")
        while True:
            message = await loop.run_in_executor(None, poll)
            if message is None:
                continue
            if message.error():
                log.error(f"Consumer error: {message.error()}")
                continue
            # TODO: Inject topic-specific logic here
            log.info(f"Consuming message: {message.value()}")
    finally:
        log.info(f"Closing consumer: {topic}")
        consumer.close()


# Bind the shared config so main() only needs to pass the topic.
consume = functools.partial(consume, config)


async def main():
    await asyncio.gather(
        consume("topic1"),
        consume("topic2"),
    )

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except (KeyboardInterrupt, SystemExit):
        log.info("Application shutdown complete")
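
One variation I am also wondering about: giving each consumer its own single-threaded executor instead of sharing the loop's default thread pool, so a given Consumer instance is only ever called from one thread, and injecting the topic-specific logic as an async handler. A sketch of that (the handler parameter is my own addition):

import asyncio
import concurrent.futures
import functools

import confluent_kafka

async def consume(config, topic, handler):
    consumer = confluent_kafka.Consumer(config)
    consumer.subscribe([topic])
    loop = asyncio.get_running_loop()
    poll = functools.partial(consumer.poll, 0.1)
    # Dedicated executor: this Consumer is only ever touched by one thread.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        try:
            while True:
                message = await loop.run_in_executor(executor, poll)
                if message is None or message.error():
                    continue
                await handler(message)
        finally:
            consumer.close()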

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): ('1.5.0', 17104896) and ('1.5.0', 17105151), respectively
  • Apache Kafka broker version: confluentinc/cp-kafka:6.0.0 locally and Confluent Cloud clusters on production
  • Client configuration: very minimal for starters, available in the snippet
  • Operating system: Python 3.8.5 on macOS, but production apps usually run on Debian Buster images

Labels

  • code:python — Issues that are specific to Python or versions of Python independent of library logic
  • component:consumer — Issues tied specifically to consumer logic or code paths
  • enhancement — Requesting a feature change
  • priority:high — Maintainer triage tag for indicating high impact or criticality issues
  • question — A question about how to use or about expected behavior of the library
  • size:large — Maintainer triage tag for indicating change required is particularly large or complex
