|
11 | 11 | from kafka.protocol.admin import (
|
12 | 12 | CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
|
13 | 13 | ListGroupsRequest, DescribeGroupsRequest)
|
| 14 | +from kafka.protocol.commit import GroupCoordinatorRequest |
14 | 15 | from kafka.protocol.metadata import MetadataRequest
|
15 | 16 | from kafka.version import __version__
|
16 | 17 |
|
@@ -243,6 +244,44 @@ def _refresh_controller_id(self):
|
243 | 244 | "The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
|
244 | 245 | .format(version))
|
245 | 246 |
|
| 247 | + def _find_group_coordinator_id(self, group_id): |
| 248 | + """Find the broker node_id of the coordinator of the given group. |
| 249 | +
|
| 250 | + Sends a FindCoordinatorRequest message to the cluster. Will block until |
| 251 | + the FindCoordinatorResponse is received. Any errors are immediately |
| 252 | + raised. |
| 253 | +
|
| 254 | + :param group_id: The consumer group ID. This is typically the group |
| 255 | + name as a string. |
| 256 | + :return: The node_id of the broker that is the coordinator. |
| 257 | + """ |
| 258 | + # Note: Java may change how this is implemented in KAFKA-6791. |
| 259 | + # |
| 260 | + # TODO add support for dynamically picking version of |
| 261 | + # GroupCoordinatorRequest which was renamed to FindCoordinatorRequest. |
| 262 | + # When I experimented with this, GroupCoordinatorResponse_v1 didn't |
| 263 | + # match GroupCoordinatorResponse_v0 and I couldn't figure out why. |
| 264 | + gc_request = GroupCoordinatorRequest[0](group_id) |
| 265 | + gc_response = self._send_request_to_node(self._client.least_loaded_node(), gc_request) |
| 266 | + # use the extra error checking in add_group_coordinator() rather than |
| 267 | + # immediately returning the group coordinator. |
| 268 | + success = self._client.cluster.add_group_coordinator(group_id, gc_response) |
| 269 | + if not success: |
| 270 | + error_type = Errors.for_code(gc_response.error_code) |
| 271 | + assert error_type is not Errors.NoError |
| 272 | + # Note: When error_type.retriable, Java will retry... see |
| 273 | + # KafkaAdminClient's handleFindCoordinatorError method |
| 274 | + raise error_type( |
| 275 | + "Could not identify group coordinator for group_id '{}' from response '{}'." |
| 276 | + .format(group_id, gc_response)) |
| 277 | + group_coordinator = self._client.cluster.coordinator_for_group(group_id) |
| 278 | + # will be None if the coordinator was never populated, which should never happen here |
| 279 | + assert group_coordinator is not None |
| 280 | + # will be -1 if add_group_coordinator() failed... but by this point the |
| 281 | + # error should have been raised. |
| 282 | + assert group_coordinator != -1 |
| 283 | + return group_coordinator |
| 284 | + |
246 | 285 | def _send_request_to_node(self, node, request):
|
247 | 286 | """Send a kafka protocol message to a specific broker. Will block until the message result is received.
|
248 | 287 |
|
|
0 commit comments