multiprocessing Python Kafka consumer client not getting back messages #925

@zclai

Description

A Python consumer client worked fine when run standalone, but it was unable to retrieve messages when run as a multiprocessing worker with the same configuration.

It looked like the consumers running as multiprocessing workers were unable to join the group because the consumer group was constantly rebalancing.
The server log did not show any abnormal messages.

I first observed this behavior when running the consumer workers against a 5-node Kafka cluster using GSSAPI. To isolate the problem and simplify troubleshooting, I now run the consumer workers against a single-node Kafka server without any security features.

I even tried setting a static group.instance.id for each worker, together with a long (6-minute) session.timeout.ms and a long (6-minute) max.poll.interval.ms, in an effort to avoid rebalancing, but that did not help either.
Even if I start only one consumer worker via multiprocessing, it still doesn't work.
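For reference, the per-worker settings described above could be expressed as the following configuration. This is a sketch: worker_conf is a hypothetical helper, the saListener-<n> naming scheme for group.instance.id is an assumption, and 6 minutes is written as 360000 ms.

```python
# Base settings from this report plus the settings tried to avoid rebalancing.
SIX_MINUTES_MS = 6 * 60 * 1000  # 360000 ms


def worker_conf(n):
    """Return the consumer configuration for worker n (hypothetical helper)."""
    return {
        "bootstrap.servers": "localhost:9092",
        "group.id": "python_example_group_2",
        "auto.offset.reset": "earliest",
        # Static membership: a stable id that must be unique per worker.
        "group.instance.id": "saListener-" + str(n),  # hypothetical naming scheme
        "session.timeout.ms": SIX_MINUTES_MS,
        "max.poll.interval.ms": SIX_MINUTES_MS,
    }
```

Each worker needs a distinct group.instance.id; if two live members share one, the broker fences the older member out of the group.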

It seems the Kafka client is not compatible with the Python multiprocessing library.
I would really appreciate any help in diagnosing this issue.
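One possible explanation worth ruling out before concluding the two are incompatible: on Linux, multiprocessing uses the fork start method by default (in Python versions before 3.14), and fork() copies only the calling thread into the child. librdkafka does its network and group-coordination work on background threads (the [thrd:main] and [thrd:GroupCoordinator] tags in the log lines), so a consumer created in the parent, as in the worker's __init__ below, would reach the child without those threads. The stdlib-only sketch below demonstrates just the thread-copying behavior; it does not touch Kafka, and _background is a hypothetical stand-in for a library's I/O thread.

```python
import multiprocessing as mp
import threading
import time


def _background():
    # Hypothetical stand-in for a library-owned background I/O thread.
    time.sleep(5)


def _report(queue):
    # Runs in the child: report how many threads the threading module sees.
    queue.put(threading.active_count())


def thread_counts():
    """Return (threads in the parent before forking, threads in the forked child)."""
    t = threading.Thread(target=_background, daemon=True)
    t.start()
    parent_count = threading.active_count()  # main thread + _background, at least

    ctx = mp.get_context("fork")  # POSIX only; the Linux default before Python 3.14
    q = ctx.Queue()
    p = ctx.Process(target=_report, args=(q,))
    p.start()
    child_count = q.get(timeout=10)
    p.join()
    return parent_count, child_count
```

On Linux, thread_counts() should report at least 2 threads in the parent and exactly 1 in the child: the child inherits the parent's memory, including any client objects, but not the threads that service them.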

The client configuration properties are as simple as they get:

    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "python_example_group_2",
        "auto.offset.reset": "earliest"
    }

How to reproduce

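The worker looks roughly like this:

    from multiprocessing import Process

    from confluent_kafka import DeserializingConsumer
    ...

    # Client configuration shown above
    conf = {
        "bootstrap.servers": "localhost:9092",
        "group.id": "python_example_group_2",
        "auto.offset.reset": "earliest",
    }


    class saListener(Process):
        def __init__(self, n):
            Process.__init__(self)
            self.ClientName = "saListener-" + str(n)
            ...  # elided in the original report (presumably sets self.topic)
            # __init__ runs in the parent process, before start() creates the child
            self.cons = DeserializingConsumer(conf)

        def connect(self):
            self.cons.subscribe([self.topic])

        def run(self):
            while True:
                msg = self.cons.poll(2.0)
                if msg is None:
                    print(self.ClientName + ": Waiting for message or event/error in poll()")
                    continue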

The controller looks like this:

    for n in range(4):
        lnr_instance = saListener(n)
        lnr_instance.connect()
        lnr_instance.start()
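In the code above, the DeserializingConsumer is created in __init__ and subscribed in connect(), and both run in the parent process before start(); only run() executes in the child. Because librdkafka depends on background threads that fork() does not copy, a commonly recommended pattern is to create and subscribe the consumer inside run(). The sketch below is a hypothetical rewrite along those lines, not a confirmed fix for this issue: the class name SaListener, the BASE_CONF constant, the topic argument, the stop_event shutdown mechanism, and the start_workers helper are all assumptions introduced for illustration.

```python
from multiprocessing import Event, Process

# Base settings from the report; the topic is passed in because it is elided above.
BASE_CONF = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "python_example_group_2",
    "auto.offset.reset": "earliest",
}


class SaListener(Process):
    """Hypothetical worker that creates its consumer only in the child process."""

    def __init__(self, n, topic, stop_event=None):
        super().__init__()
        self.client_name = "saListener-" + str(n)
        self.topic = topic
        self.stop_event = stop_event  # multiprocessing.Event, or None to run forever
        self.cons = None  # deliberately not created here: __init__ runs in the parent

    def run(self):
        # Imported and constructed here, in the child, so librdkafka's background
        # threads are started by the same process that polls the consumer.
        from confluent_kafka import DeserializingConsumer

        self.cons = DeserializingConsumer(dict(BASE_CONF))
        self.cons.subscribe([self.topic])
        try:
            while self.stop_event is None or not self.stop_event.is_set():
                # poll() also serves rebalance events; DeserializingConsumer raises
                # consume errors as exceptions rather than returning them.
                msg = self.cons.poll(2.0)
                if msg is None:
                    print(self.client_name + ": waiting for message or event/error in poll()")
                    continue
                print(f"{self.client_name}: partition {msg.partition()} offset {msg.offset()}")
        finally:
            self.cons.close()  # leave the group cleanly so the remaining members rebalance


def start_workers(topic, count=4):
    """Start `count` workers sharing one stop event; returns (workers, stop_event)."""
    stop_event = Event()
    workers = [SaListener(n, topic, stop_event) for n in range(count)]
    for w in workers:
        w.start()
    return workers, stop_event
```

A caller would invoke start_workers with the real topic name from under an `if __name__ == "__main__":` guard (required with the spawn start method), then call stop_event.set() and join() each worker to shut down.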

The debug output:

%7|1596816914.717|JOIN|rdkafka#consumer-3| [thrd:main]: Group "python_example_group_2": join with 1 (1) subscribed topic(s)
%7|1596816914.717|CGRPMETADATA|rdkafka#consumer-3| [thrd:main]: consumer join: metadata for subscription is up to date (1995ms old)
%7|1596816914.717|JOIN|rdkafka#consumer-3| [thrd:main]: localhost:9092/0: Joining group "python_example_group_2" with 1 subscribed topic(s)
%7|1596816914.717|CGRPJOINSTATE|rdkafka#consumer-3| [thrd:main]: Group "python_example_group_2" changed join state init -> wait-join (v1, state up)
%7|1596816914.718|SEND|rdkafka#consumer-3| [thrd:GroupCoordinator]: GroupCoordinator/0: Sent JoinGroupRequest (v5, 208 bytes @ 0, CorrId 3)
%7|1596816914.718|BROADCAST|rdkafka#consumer-3| [thrd:GroupCoordinator]: Broadcasting state change
%7|1596816914.721|RECV|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/0: Received HeartbeatResponse (v3, 6 bytes, CorrId 7, rtt 3.65ms)
%7|1596816914.723|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: Group "python_example_group_2" heartbeat error response in state up (join state wait-assign-rebalance_cb, 0 partition(s) assigned): Broker: Group rebalance in progress
%7|1596816914.723|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "python_example_group_2" is rebalancing in state up (join-state wait-assign-rebalance_cb) without assignment: group is rebalancing
%7|1596816914.724|RECV|rdkafka#consumer-3| [thrd:GroupCoordinator]: GroupCoordinator/0: Received JoinGroupResponse (v5, 64 bytes, CorrId 3, rtt 5.86ms)
%7|1596816914.724|JOINGROUP|rdkafka#consumer-3| [thrd:main]: JoinGroup response: GenerationId -1, Protocol , LeaderId , my MemberId rdkafka-e06cc2f9-bbdd-4a43-a204-d8d73eccce74, 0 members in group: Broker: Group member needs a valid member ID
%7|1596816914.724|REQERR|rdkafka#consumer-3| [thrd:main]: GroupCoordinator/0: JoinGroupRequest failed: Broker: Group member needs a valid member ID: explicit actions Ignore
%7|1596816914.724|MEMBERID|rdkafka#consumer-3| [thrd:main]: Group "python_example_group_2": updating member id "" -> "rdkafka-e06cc2f9-bbdd-4a43-a204-d8d73eccce74"
%7|1596816914.724|CGRPJOINSTATE|rdkafka#consumer-3| [thrd:main]: Group "python_example_group_2" changed join state wait-join -> init (v1, state up)
%7|1596816914.724|JOIN|rdkafka#consumer-3| [thrd:main]: Group "python_example_group_2": join with 1 (1) subscribed topic(s)
%7|1596816914.724|CGRPMETADATA|rdkafka#consumer-3| [thrd:main]: consumer join: metadata for subscription is up to date (2002ms old)
%7|1596816914.724|JOIN|rdkafka#consumer-3| [thrd:main]: localhost:9092/0: Joining group "python_example_group_2" with 1 subscribed topic(s)
%7|1596816914.724|CGRPJOINSTATE|rdkafka#consumer-3| [thrd:main]: Group "python_example_group_2" changed join state init -> wait-join (v1, state up)
%7|1596816914.725|SEND|rdkafka#consumer-3| [thrd:GroupCoordinator]: GroupCoordinator/0: Sent JoinGroupRequest (v5, 252 bytes @ 0, CorrId 4)
%7|1596816914.725|BROADCAST|rdkafka#consumer-3| [thrd:GroupCoordinator]: Broadcasting state change

%7|1596816914.728|JOIN|rdkafka#consumer-4| [thrd:main]: Group "python_example_group_2": join with 1 (1) subscribed topic(s)
%7|1596816914.728|CGRPMETADATA|rdkafka#consumer-4| [thrd:main]: consumer join: metadata for subscription is up to date (1998ms old)
%7|1596816914.728|JOIN|rdkafka#consumer-4| [thrd:main]: localhost:9092/0: Joining group "python_example_group_2" with 1 subscribed topic(s)
%7|1596816914.728|CGRPJOINSTATE|rdkafka#consumer-4| [thrd:main]: Group "python_example_group_2" changed join state init -> wait-join (v1, state up)
%7|1596816914.729|SEND|rdkafka#consumer-4| [thrd:GroupCoordinator]: GroupCoordinator/0: Sent JoinGroupRequest (v5, 208 bytes @ 0, CorrId 3)
%7|1596816914.729|BROADCAST|rdkafka#consumer-4| [thrd:GroupCoordinator]: Broadcasting state change
%7|1596816914.731|RECV|rdkafka#consumer-4| [thrd:GroupCoordinator]: GroupCoordinator/0: Received JoinGroupResponse (v5, 64 bytes, CorrId 3, rtt 2.06ms)
%7|1596816914.731|JOINGROUP|rdkafka#consumer-4| [thrd:main]: JoinGroup response: GenerationId -1, Protocol , LeaderId , my MemberId rdkafka-9050f7cd-9c12-4175-9a8a-ee3b9f172f4a, 0 members in group: Broker: Group member needs a valid member ID
%7|1596816914.731|REQERR|rdkafka#consumer-4| [thrd:main]: GroupCoordinator/0: JoinGroupRequest failed: Broker: Group member needs a valid member ID: explicit actions Ignore
%7|1596816914.731|MEMBERID|rdkafka#consumer-4| [thrd:main]: Group "python_example_group_2": updating member id "" -> "rdkafka-9050f7cd-9c12-4175-9a8a-ee3b9f172f4a"
%7|1596816914.731|CGRPJOINSTATE|rdkafka#consumer-4| [thrd:main]: Group "python_example_group_2" changed join state wait-join -> init (v1, state up)
%7|1596816914.731|JOIN|rdkafka#consumer-4| [thrd:main]: Group "python_example_group_2": join with 1 (1) subscribed topic(s)
%7|1596816914.731|CGRPMETADATA|rdkafka#consumer-4| [thrd:main]: consumer join: metadata for subscription is up to date (2000ms old)
%7|1596816914.731|JOIN|rdkafka#consumer-4| [thrd:main]: localhost:9092/0: Joining group "python_example_group_2" with 1 subscribed topic(s)
%7|1596816914.731|CGRPJOINSTATE|rdkafka#consumer-4| [thrd:main]: Group "python_example_group_2" changed join state init -> wait-join (v1, state up)
%7|1596816914.731|SEND|rdkafka#consumer-4| [thrd:GroupCoordinator]: GroupCoordinator/0: Sent JoinGroupRequest (v5, 252 bytes @ 0, CorrId 4)
%7|1596816914.731|BROADCAST|rdkafka#consumer-4| [thrd:GroupCoordinator]: Broadcasting state change

%7|1596816916.702|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1596816916.702|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "python_example_group_2": unassign done in state up (join state wait-assign-rebalance_cb): without new assignment: OffsetCommit done (__NO_OFFSET)
%7|1596816916.710|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1596816916.710|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "python_example_group_2": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (__NO_OFFSET)
%7|1596816916.718|COMMIT|rdkafka#consumer-3| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1596816916.718|UNASSIGN|rdkafka#consumer-3| [thrd:main]: Group "python_example_group_2": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (__NO_OFFSET)
%7|1596816916.727|COMMIT|rdkafka#consumer-4| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1596816916.727|UNASSIGN|rdkafka#consumer-4| [thrd:main]: Group "python_example_group_2": unassign done in state up (join state wait-join): without new assignment: OffsetCommit done (__NO_OFFSET)
...
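The trace is easier to follow per client handle. The hypothetical helper below assumes the `%level|timestamp|FACILITY|client| [thrd:...]: message` layout visible in these lines and records the last join state mentioned for each handle. On the excerpt above it reports rdkafka#consumer-1 in wait-assign-rebalance_cb and consumers 2 to 4 in wait-join, so no handle reaches a steady assigned state. wait-assign-rebalance_cb appears to mean the handle is waiting for the application to serve a rebalance event, which happens only when poll() is called on that handle.

```python
import re

# Assumed layout of each debug line, as seen in this report:
# %<level>|<unix time>|<FACILITY>|<client handle>| [thrd:<thread>]: <message>
LINE_RE = re.compile(r"^%(\d)\|([\d.]+)\|([A-Z0-9_]+)\|([^|]+)\| \[thrd:([^\]]*)\]: (.*)$")
# "changed join state A -> B" yields B; "(join state X" and "(join-state X" yield X.
STATE_RE = re.compile(r"join[ -]state (?:\S+ -> )?([\w-]+)")


def last_join_states(log_text):
    """Map each client handle to the last consumer-group join state mentioned for it."""
    states = {}
    for raw in log_text.splitlines():
        m = LINE_RE.match(raw.strip())
        if not m:
            continue  # skip blank lines, "...", and anything not in the debug format
        s = STATE_RE.search(m.group(6))
        if s:
            states[m.group(4)] = s.group(1)
    return states
```

Feeding it the full client log (for example, the contents of a hypothetical consumer.log file) gives a one-line-per-handle summary instead of interleaved output from four consumers.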

Checklist

Please provide the following information:

  • [x] confluent-kafka-python and librdkafka version (1.4.2, 1.4.0):
  • [x] Apache Kafka broker version: 2.4
  • [x] Client configuration: {...} see above
  • [x] Operating system: Red Hat Enterprise Linux 8
  • [x] Provide client logs (with 'debug': '..' as necessary): see above
  • [x] Provide broker log excerpts: nothing abnormal
  • [x] Critical issue

Metadata

Labels: bug, component:consumer, investigate further