Can't add a consumer to an existing group with a StickyPartitionAssignor #2153

Closed
@joein

It is not possible to add a new consumer to an existing group that uses StickyPartitionAssignor, because the dict_itemiterator object passed to the encoder has no __len__ method.

Kafka version 1.0.1 (https://archive.apache.org/dist/kafka/1.0.1/kafka_2.11-1.0.1.tgz)
Python 3.8
kafka-python 2.0.2

Suppose we have first_consumer.py:

from kafka import KafkaConsumer
from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor

addr = "127.0.0.1:9092"
group_id = "some_group_id"
topic = ["first_topic"]
consumer = KafkaConsumer(
    *topic,
    partition_assignment_strategy=(StickyPartitionAssignor,),
    bootstrap_servers=addr,
    group_id=group_id,
)
while True:
    consumer.poll(timeout_ms=4000, max_records=1)
    print(consumer.assignment())

And second_consumer.py:

from kafka import KafkaConsumer
from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor

addr = "127.0.0.1:9092"
group_id = "some_group_id"
topic = ["second_topic"]
consumer = KafkaConsumer(
    *topic,
    partition_assignment_strategy=(StickyPartitionAssignor,),
    bootstrap_servers=addr,
    group_id=group_id,
)
while True:
    consumer.poll(timeout_ms=4000, max_records=1)
    print(consumer.assignment())

Then we launch first_consumer.py, wait for it to get its assignment, and then launch second_consumer.py.
This sequence leads to a join-group request (_send_join_group_request) when the second consumer connects, which in turn calls assignor.metadata(). Eventually, assignor.metadata() tries to encode user_data:

return b''.join(
    [Int32.encode(len(items))] +
    [self.array_of.encode(item) for item in items]
)

But items is a dict_itemiterator (the result of six.iteritems), which does not define __len__. This is the reason for the following exception:

File "first_consumer.py", line 12, in consume
    records = consumer.poll(timeout_ms=4000, max_records=1)
  File "/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 655, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 675, in _poll_once
    self._coordinator.poll()
  File "/venv/lib/python3.8/site-packages/kafka/coordinator/consumer.py", line 289, in poll
    self.ensure_active_group()
  File "/venv/lib/python3.8/site-packages/kafka/coordinator/base.py", line 390, in ensure_active_group
    future = self._send_join_group_request()
  File "/venv/lib/python3.8/site-packages/kafka/coordinator/base.py", line 453, in _send_join_group_request
    for protocol, metadata in self.group_protocols()
  File "/venv/lib/python3.8/site-packages/kafka/coordinator/consumer.py", line 154, in group_protocols
    metadata = assignor.metadata(self._joined_subscription)
  File "/venv/lib/python3.8/site-packages/kafka/coordinator/assignors/sticky/sticky_assignor.py", line 660, in metadata
    user_data = data.encode()
  File "/venv/lib/python3.8/site-packages/kafka/util.py", line 50, in __call__
    return self.method()(self.target(), *args, **kwargs)
  File "/venv/lib/python3.8/site-packages/kafka/protocol/struct.py", line 42, in _encode_self
    return self.SCHEMA.encode(
  File "/venv/lib/python3.8/site-packages/kafka/protocol/types.py", line 146, in encode
    return b''.join([
  File "/venv/lib/python3.8/site-packages/kafka/protocol/types.py", line 147, in <listcomp>
    field.encode(item[i])
  File "/venv/lib/python3.8/site-packages/kafka/protocol/types.py", line 185, in encode
    [Int32.encode(len(items))] +
TypeError: object of type 'dict_itemiterator' has no len()
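
The failure can be reproduced without a broker. Below is a minimal sketch that mimics the shape of the array encoder shown above; encode_int32, encode_array, and encode_entry are hypothetical stand-ins, not kafka-python functions, and iter(d.items()) stands in for what six.iteritems returns on Python 3. It also shows one possible workaround (materializing the items into a list before encoding), which is an assumption on my side and not necessarily the right fix for the library.

```python
import struct


def encode_int32(value):
    # Hypothetical stand-in for Int32.encode: big-endian signed 32-bit integer.
    return struct.pack(">i", value)


def encode_array(items, encode_item):
    # Length-prefixed array, same shape as the snippet above:
    # the length comes first, so `items` must support len().
    return b"".join([encode_int32(len(items))] + [encode_item(item) for item in items])


def encode_entry(entry):
    # Encode one (topic, partitions) pair: int16 name length, name, partition array.
    topic, partitions = entry
    name = topic.encode("utf-8")
    return struct.pack(">h", len(name)) + name + encode_array(partitions, encode_int32)


partitions_by_topic = {"first_topic": [0, 1]}

# On Python 3, iterating lazily over the items yields a dict_itemiterator,
# which has no __len__, so the length prefix cannot be computed.
error_message = None
try:
    encode_array(iter(partitions_by_topic.items()), encode_entry)
except TypeError as exc:
    error_message = str(exc)

# Possible workaround: turn the items into a list before encoding.
encoded = encode_array(list(partitions_by_topic.items()), encode_entry)

print(error_message)
print(len(encoded))
```

The lazy iterator is fine for the list comprehension, but the len() call needs a sized container, which is why wrapping the items in list() is enough in this sketch.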

Maybe @aynroot can help here.
