Add blocking mode for topic subscription #854

Open
@spenczar

Description

I'm working on short-lived command line scripts which interact with a Kafka broker. For example, one script reads a number of messages out of Kafka, changes a few of the serialized fields, and inserts them back into a new topic.

This is hard to do with confluent_kafka because its API is asynchronous and geared toward long-running processes rather than one-shot scripts. In particular, I'd like to have a script that looks like this:

import confluent_kafka

client = confluent_kafka.Consumer(config)
# assign() takes a list of TopicPartition objects.
client.assign([confluent_kafka.TopicPartition(topic=t, partition=0, offset=confluent_kafka.OFFSET_BEGINNING)])
while True:
    messages = client.consume(100, timeout=1)
    if len(messages) == 0:
        break  # We have reached the end of the topic partition.
    handle(messages)

But this doesn't work because client.assign returns immediately, before the assignment has actually taken effect, so the first client.consume call returns an empty list and the loop exits. This took me a few hours to understand. At a minimum, I wish the documentation were clearer about this.
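
One possible workaround, sketched here under assumptions rather than taken from the library: tolerate a few consecutive empty polls before deciding the partition is exhausted, so that empty results returned while the assignment is still settling do not end the loop. The helper name `drain` and the `max_empty_polls` parameter are hypothetical; the only consumer method it relies on is `consume(num_messages, timeout)`. Per-message error checks are omitted for brevity.

```python
def drain(consumer, handle, batch_size=100, timeout=1.0, max_empty_polls=5):
    """Consume batches until `max_empty_polls` polls in a row return nothing.

    `drain` and `max_empty_polls` are illustrative names, not part of
    confluent_kafka. Returns the number of messages passed to `handle`.
    """
    empty_polls = 0
    total = 0
    while empty_polls < max_empty_polls:
        messages = consumer.consume(batch_size, timeout=timeout)
        if not messages:
            # May be "assignment not ready yet" or "end of partition";
            # we cannot tell which, so count it and try again.
            empty_polls += 1
            continue
        empty_polls = 0  # Any data resets the idle counter.
        handle(messages)
        total += len(messages)
    return total
```

The trade-off is that the script always waits roughly `max_empty_polls * timeout` seconds at the end of the partition before exiting, and a slow broker could still exceed that grace period.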

There may be a way to do this with callbacks, but it gets hairy. A far nicer API would be client.assign(..., blocking=True), with an analogous option for the subscribe method. This mode would block until assignment either succeeds or fails, and raise an error if it fails.
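
For the subscribe case, the callback approach might look roughly like the sketch below. It assumes that `subscribe` accepts an `on_assign` callback called with `(consumer, partitions)`, and that the callback runs from inside `poll`. The helper name `subscribe_and_wait` and its parameters are hypothetical. It does not cover `assign`, which does not go through the rebalance callback.

```python
import time


def subscribe_and_wait(consumer, topics, timeout=10.0, poll_interval=0.1):
    """Subscribe, then poll until the on_assign callback has fired.

    Hypothetical helper, not part of confluent_kafka. Returns a tuple
    (partitions, early_messages); raises TimeoutError if no assignment
    arrives within `timeout` seconds.
    """
    assigned = []
    done = False

    def on_assign(_consumer, partitions):
        nonlocal done
        assigned.extend(partitions)
        done = True  # Set even when the assignment is empty.

    consumer.subscribe(topics, on_assign=on_assign)

    early_messages = []
    deadline = time.monotonic() + timeout
    while not done:
        if time.monotonic() >= deadline:
            raise TimeoutError(f"no partition assignment within {timeout}s")
        # Callbacks are served from poll(); keep anything it returns so
        # that no message is silently dropped.
        msg = consumer.poll(poll_interval)
        if msg is not None:
            early_messages.append(msg)
    return assigned, early_messages
```

Returning the messages seen during the wait keeps the helper from discarding data if a poll both serves the callback and delivers a message.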

I think this would be helpful when constructing the consumer or producer too. Right now, I have trouble knowing whether I have put in the right broker URL because the consumer or producer will retry connecting indefinitely without a loud error message.
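
In the meantime, one way to fail loudly on a bad broker URL may be to request cluster metadata with a timeout right after constructing the client. The sketch below assumes the client exposes `list_topics(timeout=...)`, which raises when no metadata arrives in time. The helper name `check_broker` is hypothetical, and it catches a broad `Exception` only to keep the example free of imports.

```python
def check_broker(client, timeout=5.0):
    """Return the sorted topic names, or raise ConnectionError.

    Hypothetical helper, not part of confluent_kafka. `client` is
    expected to be a Consumer or Producer (anything with list_topics).
    """
    try:
        metadata = client.list_topics(timeout=timeout)
    except Exception as exc:
        # Broad catch is an assumption made for this sketch; real code
        # would catch the library's own exception type instead.
        raise ConnectionError(
            f"could not fetch broker metadata within {timeout}s"
        ) from exc
    # metadata.topics maps topic name -> topic metadata.
    return sorted(metadata.topics)
```

Calling `check_broker(client)` once at startup turns a silently retrying connection into an immediate error.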

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
>>> import confluent_kafka
>>> confluent_kafka.libversion()
('1.4.0', 17039615)
>>> confluent_kafka.version()
('1.4.1', 17039616)

Metadata

Labels

component:consumer, enhancement