diff --git a/kafka/client.py b/kafka/client.py index 39c89ba43..8408070f6 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -8,8 +8,8 @@ from kafka.common import (ErrorMapping, ErrorStrings, TopicAndPartition, ConnectionError, FailedPayloadsError, BrokerResponseError, PartitionUnavailableError, - LeaderUnavailableError, - KafkaUnavailableError) + LeaderUnavailableError, CoordinatorUnavailableError, + KafkaUnavailableError, ConsumerMetadataNotSupportedError) from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS from kafka.protocol import KafkaProtocol @@ -62,6 +62,23 @@ def _get_conn_for_broker(self, broker): return self._get_conn(broker.host, broker.port) + def _get_coordinator_for_consumer(self, consumer): + """ + Returns the coordinator for a consumer group as BrokerMetadata + """ + request_id = self._next_id() + request = KafkaProtocol.encode_consumer_metadata_request(self.client_id, + request_id, consumer) + + try: + response = self._send_broker_unaware_request(request_id, request) + broker = KafkaProtocol.decode_consumer_metadata_response(response) + except KafkaUnavailableError: + raise ConsumerMetadataNotSupportedError("Brokers do not support ConsumerMetadataRequest") + + log.debug("Broker metadata: %s", broker) + return broker + def _get_leader_for_partition(self, topic, partition): """ Returns the leader for a partition or None if the partition exists @@ -187,6 +204,60 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): # Order the accumulated responses by the original key order return (acc[k] for k in original_keys) if acc else () + def _send_consumer_aware_request(self, group, payloads, encoder_fn, decoder_fn): + """ + Send requests to the coordinator for the specified consumer group + + Params + ====== + group: the consumer group string for the request + payloads: list of object-like entities with a topic and + partition attribute + encoder_fn: a method to encode the list of payloads to a request 
body, + must accept client_id, correlation_id, and payloads as + keyword arguments + decoder_fn: a method to decode a response body into response objects. + The response objects must be object-like and have topic + and partition attributes + + Return + ====== + List of response objects in the same order as the supplied payloads + """ + # Get the coordinator for the consumer + broker = self._get_coordinator_for_consumer(group) + if broker is None: + raise CoordinatorUnavailableError( + "Coordinator not available for group %s" % group) + + # Send the list of request payloads + conn = self._get_conn_for_broker(broker) + requestId = self._next_id() + request = encoder_fn(client_id=self.client_id, + correlation_id=requestId, payloads=payloads) + + # Send the request, recv the response + try: + conn.send(requestId, request) + if decoder_fn is not None: + try: + response = conn.recv(requestId) + except ConnectionError, e: + log.warning("Could not receive response to request [%s] " + "from server %s: %s", request, conn, e) + raise FailedPayloadsError(payloads) + except ConnectionError, e: + log.warning("Could not send request [%s] to server %s: %s", + request, conn, e) + raise FailedPayloadsError(payloads) + + resp = [] + if decoder_fn is not None: + for response in decoder_fn(response): + resp.append(response) + + return resp + def __repr__(self): return '' % (self.client_id) @@ -373,7 +444,10 @@ def send_offset_commit_request(self, group, payloads=[], encoder = partial(KafkaProtocol.encode_offset_commit_request, group=group) decoder = KafkaProtocol.decode_offset_commit_response - resps = self._send_broker_aware_request(payloads, encoder, decoder) + try: + resps = self._send_consumer_aware_request(group, payloads, encoder, decoder) + except ConsumerMetadataNotSupportedError: + resps = self._send_broker_aware_request(payloads, encoder, decoder) out = [] for resp in resps: @@ -392,7 +466,10 @@ def send_offset_fetch_request(self, group, payloads=[], encoder = 
partial(KafkaProtocol.encode_offset_fetch_request, group=group) decoder = KafkaProtocol.decode_offset_fetch_response - resps = self._send_broker_aware_request(payloads, encoder, decoder) + try: + resps = self._send_consumer_aware_request(group, payloads, encoder, decoder) + except ConsumerMetadataNotSupportedError: + resps = self._send_broker_aware_request(payloads, encoder, decoder) out = [] for resp in resps: diff --git a/kafka/common.py b/kafka/common.py index 005e6dd06..59220a555 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -63,6 +63,9 @@ 10 : 'MESSAGE_SIZE_TOO_LARGE', 11 : 'STALE_CONTROLLER_EPOCH', 12 : 'OFFSET_METADATA_TOO_LARGE', + 14 : 'OFFSETS_LOAD_IN_PROGRESS', + 15 : 'CONSUMER_COORDINATOR_NOT_AVAILABLE', + 16 : 'NOT_COORDINATOR_FOR_CONSUMER', } class ErrorMapping(object): @@ -92,6 +95,18 @@ class LeaderUnavailableError(KafkaError): pass +class OffsetLoadInProgressError(KafkaError): + pass + + +class CoordinatorUnavailableError(KafkaError): + pass + + +class ConsumerMetadataNotSupportedError(KafkaError): + pass + + class PartitionUnavailableError(KafkaError): pass diff --git a/kafka/protocol.py b/kafka/protocol.py index 25be023eb..234729411 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -9,7 +9,9 @@ BrokerMetadata, PartitionMetadata, Message, OffsetAndMessage, ProduceResponse, FetchResponse, OffsetResponse, OffsetCommitResponse, OffsetFetchResponse, - BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall + BufferUnderflowError, ChecksumError, ConsumerFetchSizeTooSmall, + OffsetLoadInProgressError, CoordinatorUnavailableError, + ErrorMapping ) from kafka.util import ( read_short_string, read_int_string, relative_unpack, @@ -31,6 +33,7 @@ class KafkaProtocol(object): METADATA_KEY = 3 OFFSET_COMMIT_KEY = 8 OFFSET_FETCH_KEY = 9 + CONSUMER_METADATA_KEY = 10 ATTRIBUTE_CODEC_MASK = 0x03 CODEC_NONE = 0x00 @@ -411,6 +414,46 @@ def decode_metadata_response(cls, data): return brokers, topic_metadata + @classmethod + def 
encode_consumer_metadata_request(cls, client_id, correlation_id, consumer): + """ + Encode a ConsumerMetadataRequest + + Params + ====== + client_id: string + correlation_id: int + consumer: string + """ + message = cls._encode_message_header(client_id, correlation_id, + KafkaProtocol.CONSUMER_METADATA_KEY) + + message += struct.pack('>h%ds' % len(consumer), len(consumer), consumer) + + return write_int_string(message) + + @classmethod + def decode_consumer_metadata_response(cls, data): + """ + Decode bytes to a ConsumerMetadataResponse + + Params + ====== + data: bytes to decode + """ + ((correlation_id, error_code), cur) = relative_unpack('>ih', data, 0) + + if error_code == ErrorMapping.NO_ERROR: + # Broker info + ((nodeId, ), cur) = relative_unpack('>i', data, cur) + (host, cur) = read_short_string(data, cur) + ((port,), cur) = relative_unpack('>i', data, cur) + return BrokerMetadata(nodeId, host, port) + elif error_code == ErrorMapping.OFFSET_LOAD_IN_PROGRESS: + raise OffsetLoadInProgressError("Offset load in progress. Try again.") + elif error_code == ErrorMapping.CONSUMER_COORDINATOR_NOT_AVAILABLE: + raise CoordinatorUnavailableError("Consumer coordinator not available") + @classmethod def encode_offset_commit_request(cls, client_id, correlation_id, group, payloads):