Description
Environment
kafka-python: "^2.0.2"
Python: 3.10.15
Broker: AWS MSK, Kafka version 3.5.1
Code
```python
import traceback

from kafka import KafkaAdminClient

BOOTSTRAP_SERVERS = (
    "srv1.us-east-1.amazonaws.com:9092"
    ",srv2.us-east-1.amazonaws.com:9092"
    ",srv3.us-east-1.amazonaws.com:9092"
)


def run_util() -> None:
    admin_client = None
    try:
        # Create an admin client
        print(f">> Connecting to: '{BOOTSTRAP_SERVERS}'...")
        # api_version=(3, 5, 1)
        admin_client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS)
        # Retrieve and print the list of topics
        topics = admin_client.list_topics()
        print("Kafka Topics:")
        for topic in topics:
            print(f"- {topic}")
    except Exception as e:
        print(f"An error occurred while creating the admin client: {e}")
        traceback.print_exc()
    finally:
        # Close the admin client if it was created
        if admin_client:
            try:
                admin_client.close()
            except Exception as close_e:
                print(f"Error while closing the admin client: {close_e}")
                traceback.print_exc()


if __name__ == "__main__":
    run_util()
```
Observations
When I run this code locally, I get the error below. Curiously, when I run it from a Databricks notebook (Python 3.10.12 there), the error does not occur.
Error stack trace
```
An error occurred while creating the admin client: Invalid file object: None
Traceback (most recent call last):
  File "/msk_proj/with_kafka_python/msk_kp_util_list_topics.py", line 49, in run_util
    admin_client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS)
  File "/.venv310/lib/python3.10/site-packages/kafka/admin/client.py", line 208, in __init__
    self._client = KafkaClient(metrics=self._metrics,
  File "/.venv310/lib/python3.10/site-packages/kafka/client_async.py", line 244, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/.venv310/lib/python3.10/site-packages/kafka/client_async.py", line 909, in check_version
    version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
  File "/.venv310/lib/python3.10/site-packages/kafka/conn.py", line 1254, in check_version
    selector.register(self._sock, selectors.EVENT_READ)
  File "/opt/homebrew/Cellar/python@3.10/3.10.15/Frameworks/Python.framework/Versions/3.10/lib/python3.10/selectors.py", line 518, in register
    key = super().register(fileobj, events, data)
  File "/opt/homebrew/Cellar/python@3.10/3.10.15/Frameworks/Python.framework/Versions/3.10/lib/python3.10/selectors.py", line 239, in register
    key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)
  File "/opt/homebrew/Cellar/python@3.10/3.10.15/Frameworks/Python.framework/Versions/3.10/lib/python3.10/selectors.py", line 226, in _fileobj_lookup
    return _fileobj_to_fd(fileobj)
  File "/opt/homebrew/Cellar/python@3.10/3.10.15/Frameworks/Python.framework/Versions/3.10/lib/python3.10/selectors.py", line 39, in _fileobj_to_fd
    raise ValueError("Invalid file object: "
ValueError: Invalid file object: None
```
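Because the same code fails locally but works in a Databricks notebook, one way to narrow this down is to compare the two environments. The following is a minimal sketch, not part of the original report: `describe_environment` is a hypothetical helper, and it assumes the client is installed under the distribution name `kafka-python` (the name used in the dependency spec above). Running it in both places would show, for example, whether a different kafka-python release is installed on Databricks.

```python
import platform
from importlib.metadata import PackageNotFoundError, version


def describe_environment(packages=("kafka-python",)):
    """Collect interpreter and package versions for a side-by-side comparison.

    Hypothetical helper for diagnosis only; not part of kafka-python.
    """
    info = {
        "python": platform.python_version(),
        "implementation": platform.python_implementation(),
        "platform": platform.platform(),
    }
    for name in packages:
        try:
            # Look up the installed distribution's version string
            info[name] = version(name)
        except PackageNotFoundError:
            info[name] = "not installed"
    return info


if __name__ == "__main__":
    for key, value in describe_environment().items():
        print(f"{key}: {value}")
```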
Specifying the Kafka API version explicitly
I also tried passing the API version explicitly:
```python
admin_client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS, api_version=(3, 5, 1))
```
This fails with the same error.
Network?
Connectivity looks fine; telnet to the broker port succeeds:
```
telnet srv1.us-east-1.amazonaws.com 9092
Trying 10.34.11.23...
Connected to srv1.amazonaws.com.
Escape character is '^]'.
```
Basic socket operations
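I used the code below to verify that a plain TCP socket can reach the broker:
```python
import socket

try:
    # create_connection resolves the host and connects; the timeout avoids hanging forever,
    # and the with-block closes the socket even if something fails after connecting
    with socket.create_connection(("srv1.us-east-1.amazonaws.com", 9092), timeout=10):
        print("Connection successful!")
except OSError as e:
    print(f"Socket connection failed: {e}")
```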
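A successful TCP connect only shows that the port accepts connections; it does not show that the broker answers Kafka requests on it. A minimal sketch of a protocol-level check, assuming an ApiVersions v0 request (API key 18, empty body, big-endian size-prefixed framing as described in the Kafka protocol guide): it sends one request and reports whether the broker replies or drops the connection. The helper names `build_api_versions_request`, `recv_exact` and `probe` are hypothetical and not part of kafka-python.

```python
import socket
import struct

API_VERSIONS_KEY = 18  # ApiVersions request key in the Kafka protocol (assumed v0 framing)


def build_api_versions_request(correlation_id: int, client_id: str = "probe") -> bytes:
    """Encode a size-prefixed ApiVersions v0 request with an empty body."""
    client = client_id.encode("utf-8")
    # Header v1: api_key INT16, api_version INT16, correlation_id INT32, client_id STRING
    header = struct.pack(">hhih", API_VERSIONS_KEY, 0, correlation_id, len(client)) + client
    return struct.pack(">i", len(header)) + header


def recv_exact(sock: socket.socket, n: int):
    """Read exactly n bytes, or return None if the peer closes the connection first."""
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            return None  # orderly close (EOF) from the peer
        buf += chunk
    return buf


def probe(host: str, port: int, timeout: float = 5.0) -> str:
    """Send one ApiVersions v0 request and describe how the broker reacted."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            sock.sendall(build_api_versions_request(correlation_id=1))
            size_bytes = recv_exact(sock, 4)
            if size_bytes is None:
                return "closed: broker dropped the connection without replying"
            (size,) = struct.unpack(">i", size_bytes)
            if size < 6:
                return f"unexpected: response size {size} is too small"
            payload = recv_exact(sock, size)
            if payload is None:
                return "closed: broker dropped the connection mid-response"
            # Response: correlation_id INT32, then error_code INT16
            correlation_id, error_code = struct.unpack(">ih", payload[:6])
            return f"reply: correlation_id={correlation_id}, error_code={error_code}"
    except ConnectionError as exc:
        # Covers resets and broken pipes raised while sending or reading
        return f"closed: {exc!r}"
```

For example, `print(probe("srv1.us-east-1.amazonaws.com", 9092))`. A "closed" result would be consistent with the broker dropping the connection during kafka-python's version probe, while a reply with `error_code=0` would suggest the listener does speak Kafka on that port.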
The exception is raised at line 1254 of `kafka/conn.py` (commit 5bb126b), in the `check_version` method:
```python
for version, request in test_cases:
    if not self.connect_blocking(timeout_at - time.time()):
        reset_override_configs()
        raise Errors.NodeNotReadyError()
    f = self.send(request)
    # HACK: sleeping to wait for socket to send bytes
    time.sleep(0.1)
    # when broker receives an unrecognized request API
    # it abruptly closes our socket.
    # so we attempt to send a second request immediately
    # that we believe it will definitely recognize (metadata)
    # the attempt to write to a disconnected socket should
    # immediately fail and allow us to infer that the prior
    # request was unrecognized
    mr = self.send(MetadataRequest[0](topics))

    selector = self.config['selector']()
    selector.register(self._sock, selectors.EVENT_READ)
```
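The error message itself can be reproduced with the standard library alone: `selectors` raises `ValueError: Invalid file object: None` whenever `register()` is given None instead of a socket. This suggests (an assumption based on the trace, not verified against kafka-python internals) that `self._sock` had already been reset to None by the time `selector.register()` ran, for example because the connection was closed after one of the two `send()` calls above. A minimal sketch showing the reproduction and the kind of guard that would turn it into a clearer error; `register_for_read` and `reproduce_error` are hypothetical helpers, not kafka-python functions.

```python
import selectors
import socket


def reproduce_error():
    """Register None with a selector and return the resulting error message."""
    selector = selectors.DefaultSelector()
    try:
        selector.register(None, selectors.EVENT_READ)
    except ValueError as exc:
        return str(exc)
    finally:
        selector.close()
    return None


def register_for_read(sock):
    """Register sock for read events, failing clearly if the connection is gone.

    Hypothetical guard: shows the None check that would avoid passing None
    to selector.register(); kafka-python does not expose this helper.
    """
    if sock is None:
        # No socket left to watch: the connection was torn down before we got here
        raise ConnectionError("connection closed before a response could be read")
    selector = selectors.DefaultSelector()
    selector.register(sock, selectors.EVENT_READ)
    return selector


if __name__ == "__main__":
    print(reproduce_error())
```

Passing a closed socket object instead of None produces a different message (`Invalid file descriptor: -1`), so the literal `None` in the reported error points at the socket attribute itself being unset rather than at a closed socket.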
Any cause or workaround? Thanks.