Skip to content

Support for consumer-aware OffsetFetchRequest and OffsetCommitRequest #164

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 81 additions & 4 deletions kafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from kafka.common import (ErrorMapping, ErrorStrings, TopicAndPartition,
ConnectionError, FailedPayloadsError,
BrokerResponseError, PartitionUnavailableError,
LeaderUnavailableError,
KafkaUnavailableError)
LeaderUnavailableError, CoordinatorUnavailableError,
KafkaUnavailableError, ConsumerMetadataNotSupportedError)

from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
Expand Down Expand Up @@ -62,6 +62,23 @@ def _get_conn_for_broker(self, broker):

return self._get_conn(broker.host, broker.port)

def _get_coordinator_for_consumer(self, consumer):
    """
    Look up the offset coordinator broker for a consumer group.

    Params
    ======
    consumer: the consumer group string to locate the coordinator for

    Return
    ======
    BrokerMetadata for the group's coordinator (may be None if the
    response carried no broker)

    Raises
    ======
    ConsumerMetadataNotSupportedError: if no broker answers the
        ConsumerMetadataRequest (e.g. the cluster predates the API)
    """
    correlation_id = self._next_id()
    request = KafkaProtocol.encode_consumer_metadata_request(
        self.client_id, correlation_id, consumer)

    try:
        # Any broker can answer a ConsumerMetadataRequest, so use the
        # broker-unaware send path.
        response = self._send_broker_unaware_request(correlation_id, request)
        coordinator = KafkaProtocol.decode_consumer_metadata_response(response)
    except KafkaUnavailableError:
        raise ConsumerMetadataNotSupportedError(
            "Brokers do not support ConsumerMetadataRequest")

    log.debug("Broker metadata: %s", coordinator)
    return coordinator

def _get_leader_for_partition(self, topic, partition):
"""
Returns the leader for a partition or None if the partition exists
Expand Down Expand Up @@ -187,6 +204,60 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
# Order the accumulated responses by the original key order
return (acc[k] for k in original_keys) if acc else ()

def _send_consumer_aware_request(self, group, payloads, encoder_fn, decoder_fn):
    """
    Send requests to the coordinator for the specified consumer group

    Params
    ======
    group: the consumer group string for the request
    payloads: list of object-like entities with a topic and
              partition attribute
    encoder_fn: a method to encode the list of payloads to a request body,
                must accept client_id, correlation_id, and payloads as
                keyword arguments
    decoder_fn: a method to decode a response body into response objects.
                The response objects must be object-like and have topic
                and partition attributes. May be None, in which case no
                response is read and an empty list is returned.

    Return
    ======
    List of response objects in the same order as the supplied payloads

    Raises
    ======
    CoordinatorUnavailableError: if no coordinator is known for the group
    FailedPayloadsError: if the request could not be sent, or the
        response could not be received
    """
    # Get the coordinator for the consumer group; unlike the
    # broker-aware path, every payload goes to this single broker.
    broker = self._get_coordinator_for_consumer(group)
    if broker is None:
        raise CoordinatorUnavailableError(
            "Coordinator not available for group %s" % group)

    # Encode the list of request payloads for the coordinator
    conn = self._get_conn_for_broker(broker)
    request_id = self._next_id()
    request = encoder_fn(client_id=self.client_id,
                         correlation_id=request_id, payloads=payloads)

    # Send the request, recv the response
    response = None
    try:
        conn.send(request_id, request)
        if decoder_fn is not None:
            try:
                response = conn.recv(request_id)
            except ConnectionError as e:
                log.warning("Could not receive response to request [%s] "
                            "from server %s: %s", request, conn, e)
                raise FailedPayloadsError(payloads)
    except ConnectionError as e:
        log.warning("Could not send request [%s] to server %s: %s",
                    request, conn, e)
        raise FailedPayloadsError(payloads)

    # Decode into response objects; with no decoder there is nothing
    # to return (fire-and-forget request).
    resp = []
    if decoder_fn is not None:
        for decoded in decoder_fn(response):
            resp.append(decoded)

    return resp

def __repr__(self):
    """Return a debug representation carrying the client id."""
    return '<KafkaClient client_id={0}>'.format(self.client_id)

Expand Down Expand Up @@ -373,7 +444,10 @@ def send_offset_commit_request(self, group, payloads=[],
encoder = partial(KafkaProtocol.encode_offset_commit_request,
group=group)
decoder = KafkaProtocol.decode_offset_commit_response
resps = self._send_broker_aware_request(payloads, encoder, decoder)
try:
resps = self._send_consumer_aware_request(group, payloads, encoder, decoder)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this should be a flag. If it fails once, won't it fail every time?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is the situation where the client survives a cluster upgrade. It could be considered a corner case, and not worth supporting, however. I'm more concerned about the behavior in _send_broker_aware_request that will cause this request to get sent to every broker even if it's unsupported.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't it always require two round trips if you aren't using a kafka server that supports this API request (eg, any production kafka cluster outside LinkedIn?)

except ConsumerMetadataNotSupportedError:
resps = self._send_broker_aware_request(payloads, encoder, decoder)

out = []
for resp in resps:
Expand All @@ -392,7 +466,10 @@ def send_offset_fetch_request(self, group, payloads=[],
encoder = partial(KafkaProtocol.encode_offset_fetch_request,
group=group)
decoder = KafkaProtocol.decode_offset_fetch_response
resps = self._send_broker_aware_request(payloads, encoder, decoder)
try:
resps = self._send_consumer_aware_request(group, payloads, encoder, decoder)
except ConsumerMetadataNotSupportedError:
resps = self._send_broker_aware_request(payloads, encoder, decoder)

out = []
for resp in resps:
Expand Down
15 changes: 15 additions & 0 deletions kafka/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@
10 : 'MESSAGE_SIZE_TOO_LARGE',
11 : 'STALE_CONTROLLER_EPOCH',
12 : 'OFFSET_METADATA_TOO_LARGE',
14 : 'OFFSETS_LOAD_IN_PROGRESS',

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible typo? Should say 'OFFSET_LOAD_IN_PROGRESS' to be consistent with protocol.py, 452.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pull request is pretty old, and was made against the dev branch. It might have changed names since then.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it’s really old and needs to be rebased. I haven’t had a chance to do it yet, however.

-Todd

From: Mark Roberts <[email protected]mailto:[email protected]>
Reply-To: mumrah/kafka-python <[email protected]mailto:[email protected]>
Date: Monday, December 8, 2014 at 9:45 AM
To: mumrah/kafka-python <[email protected]mailto:[email protected]>
Cc: Todd Palino <[email protected]mailto:[email protected]>
Subject: Re: [kafka-python] Support for consumer-aware OffsetFetchRequest and OffsetCommitRequest (#164)

In kafka/common.py:

@@ -63,6 +63,9 @@
10 : 'MESSAGE_SIZE_TOO_LARGE',
11 : 'STALE_CONTROLLER_EPOCH',
12 : 'OFFSET_METADATA_TOO_LARGE',

  • 14 : 'OFFSETS_LOAD_IN_PROGRESS',

This pull request is pretty old, and was made against the dev branch. It might have changed names since then.


Reply to this email directly or view it on GitHubhttps://github.com//pull/164/files#r21470157.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right. Checked this out hoping to resolve some offset commit issues
against 0.8.2b and got an exception due to this instead; didn't look
at the date :).

On Mon Dec 08 2014 at 18:03:47 toddpalino [email protected] wrote:

In kafka/common.py:

@@ -63,6 +63,9 @@
10 : 'MESSAGE_SIZE_TOO_LARGE',
11 : 'STALE_CONTROLLER_EPOCH',
12 : 'OFFSET_METADATA_TOO_LARGE',

  • 14 : 'OFFSETS_LOAD_IN_PROGRESS',

Yeah, it’s really old and needs to be rebased. I haven’t had a chance to
do it yet, however.
-Todd From: Mark Roberts <[email protected]<mailto:
[email protected]>> Reply-To: mumrah/kafka-python <
[email protected]:[email protected]> Date: Monday,
December 8, 2014 at 9:45 AM To: mumrah/kafka-python <
[email protected]:[email protected]>
Cc: Todd Palino <[email protected]mailto:[email protected]>
Subject: Re: [kafka-python] Support for consumer-aware OffsetFetchRequest
and OffsetCommitRequest (#164
#164) In kafka/common.py:
@@ -63,6 +63,9 @@ 10 : 'MESSAGE_SIZE_TOO_LARGE', 11 :
'STALE_CONTROLLER_EPOCH', 12 : 'OFFSET_METADATA_TOO_LARGE', + 14 :
'OFFSETS_LOAD_IN_PROGRESS',
This pull request is pretty old, and was made against the dev branch. It
might have changed names since then. — Reply to this email directly or view
it on GitHub<
https://github.com/mumrah/kafka-python/pull/164/files#r21470157>.


Reply to this email directly or view it on GitHub
https://github.com/mumrah/kafka-python/pull/164/files#r21471308.

15 : 'CONSUMER_COORDINATOR_NOT_AVAILABLE',
16 : 'NOT_COORDINATOR_FOR_CONSUMER',
}

class ErrorMapping(object):
Expand Down Expand Up @@ -92,6 +95,18 @@ class LeaderUnavailableError(KafkaError):
pass


class OffsetLoadInProgressError(KafkaError):
    """Broker answered OFFSETS_LOAD_IN_PROGRESS (14); retry the request."""


class CoordinatorUnavailableError(KafkaError):
    """No offset coordinator is currently available for the consumer group."""


class ConsumerMetadataNotSupportedError(KafkaError):
    """No broker in the cluster responded to a ConsumerMetadataRequest."""


class PartitionUnavailableError(KafkaError):
    """The requested topic/partition is not available."""

Expand Down
45 changes: 44 additions & 1 deletion kafka/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
BrokerMetadata, PartitionMetadata, Message, OffsetAndMessage,
ProduceResponse, FetchResponse, OffsetResponse,
OffsetCommitResponse, OffsetFetchResponse,
BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall
BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall,
OffsetLoadInProgressError, CoordinatorUnavailableError,
ErrorMapping
)
from kafka.util import (
read_short_string, read_int_string, relative_unpack,
Expand All @@ -31,6 +33,7 @@ class KafkaProtocol(object):
METADATA_KEY = 3
OFFSET_COMMIT_KEY = 8
OFFSET_FETCH_KEY = 9
CONSUMER_METADATA_KEY = 10

ATTRIBUTE_CODEC_MASK = 0x03
CODEC_NONE = 0x00
Expand Down Expand Up @@ -411,6 +414,46 @@ def decode_metadata_response(cls, data):

return brokers, topic_metadata

@classmethod
def encode_consumer_metadata_request(cls, client_id, correlation_id, consumer):
    """
    Encode a ConsumerMetadataRequest

    Params
    ======
    client_id: string
    correlation_id: int
    consumer: string, the consumer group to look up
    """
    header = cls._encode_message_header(client_id, correlation_id,
                                        KafkaProtocol.CONSUMER_METADATA_KEY)

    # The request body is a single short-string: the consumer group name
    group_len = len(consumer)
    body = struct.pack('>h%ds' % group_len, group_len, consumer)

    return write_int_string(header + body)

@classmethod
def decode_consumer_metadata_response(cls, data):
    """
    Decode bytes to a ConsumerMetadataResponse

    Params
    ======
    data: bytes to decode

    Return
    ======
    BrokerMetadata for the coordinator on success; None for error
    codes other than the two handled below
    """
    ((correlation_id, error_code), cur) = relative_unpack('>ih', data, 0)

    if error_code == ErrorMapping.OFFSET_LOAD_IN_PROGRESS:
        raise OffsetLoadInProgressError("Offset load in progress. Try again.")
    if error_code == ErrorMapping.CONSUMER_COORDINATOR_NOT_AVAILABLE:
        raise CoordinatorUnavailableError("Consumer coordinator not available")
    if error_code != ErrorMapping.NO_ERROR:
        # Unrecognized error codes produce no metadata (same implicit
        # None as the original fall-through)
        return None

    # Success: the payload carries the coordinator broker's info
    ((node_id,), cur) = relative_unpack('>i', data, cur)
    (host, cur) = read_short_string(data, cur)
    ((port,), cur) = relative_unpack('>i', data, cur)
    return BrokerMetadata(node_id, host, port)

@classmethod
def encode_offset_commit_request(cls, client_id, correlation_id,
group, payloads):
Expand Down