|
12 | 12 | from kafka.protocol.admin import (
|
13 | 13 | CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
|
14 | 14 | ListGroupsRequest, DescribeGroupsRequest)
|
| 15 | +from kafka.protocol.commit import GroupCoordinatorRequest |
15 | 16 | from kafka.protocol.metadata import MetadataRequest
|
16 | 17 | from kafka.version import __version__
|
17 | 18 |
|
@@ -259,6 +260,44 @@ def _refresh_controller_id(self):
|
259 | 260 | "Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."
|
260 | 261 | .format(version))
|
261 | 262 |
|
| 263 | + def _find_group_coordinator_id(self, group_id): |
| 264 | + """Find the broker node_id of the coordinator of the given group. |
| 265 | +
|
| 266 | + Sends a FindCoordinatorRequest message to the cluster. Will block until |
| 267 | + the FindCoordinatorResponse is received. Any errors are immediately |
| 268 | + raised. |
| 269 | +
|
| 270 | + :param group_id: The consumer group ID. This is typically the group |
| 271 | + name as a string. |
| 272 | + :return: The node_id of the broker that is the coordinator. |
| 273 | + """ |
| 274 | + # Note: Java may change how this is implemented in KAFKA-6791. |
| 275 | + # |
| 276 | + # TODO add support for dynamically picking version of |
| 277 | + # GroupCoordinatorRequest which was renamed to FindCoordinatorRequest. |
| 278 | + # When I experimented with this, GroupCoordinatorResponse_v1 didn't |
| 279 | + # match GroupCoordinatorResponse_v0 and I couldn't figure out why. |
| 280 | + gc_request = GroupCoordinatorRequest[0](group_id) |
| 281 | + gc_response = self._send_request_to_node(self._client.least_loaded_node(), gc_request) |
| 282 | + # use the extra error checking in add_group_coordinator() rather than |
| 283 | + # immediately returning the group coordinator. |
| 284 | + success = self._client.cluster.add_group_coordinator(group_id, gc_response) |
| 285 | + if not success: |
| 286 | + error_type = Errors.for_code(gc_response.error_code) |
| 287 | + assert error_type is not Errors.NoError |
| 288 | + # Note: When error_type.retriable, Java will retry... see |
| 289 | + # KafkaAdminClient's handleFindCoordinatorError method |
| 290 | + raise error_type( |
| 291 | + "Could not identify group coordinator for group_id '{}' from response '{}'." |
| 292 | + .format(group_id, gc_response)) |
| 293 | + group_coordinator = self._client.cluster.coordinator_for_group(group_id) |
| 294 | + # will be None if the coordinator was never populated, which should never happen here |
| 295 | + assert group_coordinator is not None |
| 296 | + # will be -1 if add_group_coordinator() failed... but by this point the |
| 297 | + # error should have been raised. |
| 298 | + assert group_coordinator != -1 |
| 299 | + return group_coordinator |
| 300 | + |
262 | 301 | def _send_request_to_node(self, node, request):
|
263 | 302 | """Send a kafka protocol message to a specific broker. Will block until the message result is received.
|
264 | 303 |
|
|
0 commit comments