|
4 | 4 | import logging
|
5 | 5 | import socket
|
6 | 6 | from kafka.client_async import KafkaClient, selectors
|
| 7 | +import kafka.errors as Errors |
7 | 8 | from kafka.errors import (
|
8 | 9 | IncompatibleBrokerVersion, KafkaConfigurationError, KafkaConnectionError,
|
9 | 10 | NodeNotReadyError, NotControllerError)
|
10 | 11 | from kafka.metrics import MetricConfig, Metrics
|
11 | 12 | from kafka.protocol.admin import (
|
12 | 13 | CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
|
13 | 14 | ListGroupsRequest, DescribeGroupsRequest)
|
| 15 | +from kafka.protocol.commit import GroupCoordinatorRequest |
14 | 16 | from kafka.protocol.metadata import MetadataRequest
|
15 | 17 | from kafka.version import __version__
|
16 | 18 |
|
@@ -243,6 +245,44 @@ def _refresh_controller_id(self):
|
243 | 245 | "The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
|
244 | 246 | .format(version))
|
245 | 247 |
|
| 248 | + def _find_group_coordinator_id(self, group_id): |
| 249 | + """Find the broker node_id of the coordinator of the given group. |
| 250 | +
|
| 251 | + Sends a FindCoordinatorRequest message to the cluster. Will block until |
| 252 | + the FindCoordinatorResponse is received. Any errors are immediately |
| 253 | + raised. |
| 254 | +
|
| 255 | + :param group_id: The consumer group ID. This is typically the group |
| 256 | + name as a string. |
| 257 | + :return: The node_id of the broker that is the coordinator. |
| 258 | + """ |
| 259 | + # Note: Java may change how this is implemented in KAFKA-6791. |
| 260 | + # |
| 261 | + # TODO add support for dynamically picking version of |
| 262 | + # GroupCoordinatorRequest which was renamed to FindCoordinatorRequest. |
| 263 | + # When I experimented with this, GroupCoordinatorResponse_v1 didn't |
| 264 | + # match GroupCoordinatorResponse_v0 and I couldn't figure out why. |
| 265 | + gc_request = GroupCoordinatorRequest[0](group_id) |
| 266 | + gc_response = self._send_request_to_node(self._client.least_loaded_node(), gc_request) |
| 267 | + # use the extra error checking in add_group_coordinator() rather than |
| 268 | + # immediately returning the group coordinator. |
| 269 | + success = self._client.cluster.add_group_coordinator(group_id, gc_response) |
| 270 | + if not success: |
| 271 | + error_type = Errors.for_code(gc_response.error_code) |
| 272 | + assert error_type is not Errors.NoError |
| 273 | + # Note: When error_type.retriable, Java will retry... see |
| 274 | + # KafkaAdminClient's handleFindCoordinatorError method |
| 275 | + raise error_type( |
| 276 | + "Could not identify group coordinator for group_id '{}' from response '{}'." |
| 277 | + .format(group_id, gc_response)) |
| 278 | + group_coordinator = self._client.cluster.coordinator_for_group(group_id) |
| 279 | + # will be None if the coordinator was never populated, which should never happen here |
| 280 | + assert group_coordinator is not None |
| 281 | + # will be -1 if add_group_coordinator() failed... but by this point the |
| 282 | + # error should have been raised. |
| 283 | + assert group_coordinator != -1 |
| 284 | + return group_coordinator |
| 285 | + |
246 | 286 | def _send_request_to_node(self, node, request):
|
247 | 287 | """Send a kafka protocol message to a specific broker. Will block until the message result is received.
|
248 | 288 |
|
|
0 commit comments