Description
It is not possible to add a new consumer to an existing group with a StickyPartitionAssignor
due to the lack of __len__
method in dict_itemiterator
.
Kafka version 1.0.1 (https://archive.apache.org/dist/kafka/1.0.1/kafka_2.11-1.0.1.tgz)
Python 3.8
kafka-python 2.0.2
Suppose we have first_consumer.py
:
from kafka import KafkaConsumer
from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor
addr = "127.0.0.1:9092"
group_id = "some_group_id"
topic = ["first_topic"]
consumer = KafkaConsumer(
*topic,
partition_assignment_strategy=(StickyPartitionAssignor,),
bootstrap_servers=addr,
group_id=group_id)
while True:
consumer.poll(timeout_ms=4000, max_records=1)
print(consumer.assignment())
And second_consumer.py
from kafka import KafkaConsumer
from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor
addr = "127.0.0.1:9092"
group_id = "some_group_id"
topic = ["second_topic"]
consumer = KafkaConsumer(
*topic,
partition_assignment_strategy=(StickyPartitionAssignor,),
bootstrap_servers=addr,
group_id=group_id)
while True:
consumer.poll(timeout_ms=4000, max_records=1)
print(consumer.assignment())
Then we launch our script first_consumer.py
and wait him to get his assignment and then launch the second script second_consumer.py
.
This sequence leads to join_group_request
during the second consumer connection, which in turn calls assignor.metadata()
. Eventually, in assignor.metadata
we are trying to encode user_data
:
return b''.join(
[Int32.encode(len(items))] +
[self.array_of.encode(item) for item in items]
)
But items
is type of dict_iteritem
(which is six.iteritems) and has no __len__
defined. And this is the reason for the following exception:
File "first_consumer.py", line 12, in consume
records = consumer.poll(timeout_ms=4000, max_records=1)
File "/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 655, in poll
records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
File "/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 675, in _poll_once
self._coordinator.poll()
File "/venv/lib/python3.8/site-packages/kafka/coordinator/consumer.py", line 289, in poll
self.ensure_active_group()
File "/venv/lib/python3.8/site-packages/kafka/coordinator/base.py", line 390, in ensure_active_group
future = self._send_join_group_request()
File "/venv/lib/python3.8/site-packages/kafka/coordinator/base.py", line 453, in _send_join_group_request
for protocol, metadata in self.group_protocols()
File "/venv/lib/python3.8/site-packages/kafka/coordinator/consumer.py", line 154, in group_protocols
metadata = assignor.metadata(self._joined_subscription)
File "/venv/lib/python3.8/site-packages/kafka/coordinator/assignors/sticky/sticky_assignor.py", line 660, in metadata
user_data = data.encode()
File "/venv/lib/python3.8/site-packages/kafka/util.py", line 50, in __call__
return self.method()(self.target(), *args, **kwargs)
File "/venv/lib/python3.8/site-packages/kafka/protocol/struct.py", line 42, in _encode_self
return self.SCHEMA.encode(
File "/venv/lib/python3.8/site-packages/kafka/protocol/types.py", line 146, in encode
return b''.join([
File "/venv/lib/python3.8/site-packages/kafka/protocol/types.py", line 147, in <listcomp>
field.encode(item[i])
File "/venv/lib/python3.8/site-packages/kafka/protocol/types.py", line 185, in encode
[Int32.encode(len(items))] +
TypeError: object of type 'dict_itemiterator' has no len()
Maybe @aynroot can help here.