
Getting ValueError: Invalid file object: None in check_version when connecting #2450

Closed
@dgoldenberg-ias

Description


Environment

kafka-python = "^2.0.2"
python = 3.10.15

We're using AWS MSK with Kafka version 3.5.1.

Code

import traceback

from kafka import KafkaAdminClient

BOOTSTRAP_SERVERS = (
    "srv1.us-east-1.amazonaws.com:9092"
    ",srv2.us-east-1.amazonaws.com:9092"
    ",srv3.us-east-1.amazonaws.com:9092"
)

def run_util() -> None:
    admin_client = None
    try:
        # Create an admin client
        print(f">> Connecting to: '{BOOTSTRAP_SERVERS}'...")
        # api_version=(3, 5, 1)
        admin_client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS)

        # Retrieve and print the list of topics
        topics = admin_client.list_topics()
        print("Kafka Topics:")
        for topic in topics:
            print(f"- {topic}")

    except Exception as e:
        print(f"An error occurred while creating the admin client: {e}")
        traceback.print_exc()
    finally:
        # Close the admin client if it was created
        if admin_client:
            try:
                admin_client.close()
            except Exception as close_e:
                print(f"Error while closing the admin client: {close_e}")
                traceback.print_exc()

if __name__ == "__main__":
    run_util()
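To see what the client does before the failure, it can help to turn on kafka-python's debug logging. kafka-python's modules create their loggers with logging.getLogger(__name__), so they all sit under the `kafka` logger namespace (for example `kafka.conn` and `kafka.client_async`). A minimal sketch, assuming standard-library logging to stderr is acceptable; enable_kafka_debug_logging is a hypothetical helper name:

```python
import logging


def enable_kafka_debug_logging() -> None:
    """Hypothetical helper: show kafka-python's DEBUG records without making every library verbose."""
    logging.basicConfig(
        level=logging.WARNING,  # root logger stays at WARNING for other libraries
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
    )
    # Child loggers such as kafka.conn inherit this level; their records
    # propagate to the root handler installed by basicConfig above.
    logging.getLogger("kafka").setLevel(logging.DEBUG)
```

Calling enable_kafka_debug_logging() in the `if __name__ == "__main__":` block, before run_util(), should print the connection attempts logged by kafka.conn leading up to the exception.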

Observations

When I run this code locally, I get the error below. Curiously, when I run it from a Databricks notebook, I don't get the error; the Python version there is 3.10.12.
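Since the same script behaves differently locally and on Databricks, comparing the two environments directly may narrow things down. Note that the Poetry constraint "^2.0.2" allows any kafka-python release from 2.0.2 up to (but not including) 3.0.0, so the two environments are not guaranteed to run the same release. A minimal standard-library sketch; environment_report is a hypothetical helper, and it assumes the package is installed under the distribution name kafka-python (pass other names if a fork is installed instead):

```python
import platform
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, Iterable, Optional


def environment_report(dists: Iterable[str] = ("kafka-python",)) -> Dict[str, Optional[str]]:
    """Hypothetical helper: collect interpreter and package versions for side-by-side comparison."""
    report: Dict[str, Optional[str]] = {
        "python": platform.python_version(),
        "implementation": platform.python_implementation(),
        "platform": platform.platform(),
    }
    for dist in dists:
        try:
            report[dist] = version(dist)
        except PackageNotFoundError:
            report[dist] = None  # not installed under this distribution name
    return report


if __name__ == "__main__":
    for key, value in environment_report().items():
        print(f"{key}: {value}")
```

Running this both locally and in the notebook and diffing the output shows whether the kafka-python release differs between the two.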

Error stack trace

An error occurred while creating the admin client: Invalid file object: None
Traceback (most recent call last):
  File "/msk_proj/with_kafka_python/msk_kp_util_list_topics.py", line 49, in run_util
    admin_client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS)
  File "/.venv310/lib/python3.10/site-packages/kafka/admin/client.py", line 208, in __init__
    self._client = KafkaClient(metrics=self._metrics,
  File "/.venv310/lib/python3.10/site-packages/kafka/client_async.py", line 244, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/.venv310/lib/python3.10/site-packages/kafka/client_async.py", line 909, in check_version
    version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
  File "/.venv310/lib/python3.10/site-packages/kafka/conn.py", line 1254, in check_version
    selector.register(self._sock, selectors.EVENT_READ)
  File "/opt/homebrew/Cellar/python@3.10/3.10.15/Frameworks/Python.framework/Versions/3.10/lib/python3.10/selectors.py", line 518, in register
    key = super().register(fileobj, events, data)
  File "/opt/homebrew/Cellar/python@3.10/3.10.15/Frameworks/Python.framework/Versions/3.10/lib/python3.10/selectors.py", line 239, in register
    key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)
  File "/opt/homebrew/Cellar/python@3.10/3.10.15/Frameworks/Python.framework/Versions/3.10/lib/python3.10/selectors.py", line 226, in _fileobj_lookup
    return _fileobj_to_fd(fileobj)
  File "/opt/homebrew/Cellar/python@3.10/3.10.15/Frameworks/Python.framework/Versions/3.10/lib/python3.10/selectors.py", line 39, in _fileobj_to_fd
    raise ValueError("Invalid file object: "
ValueError: Invalid file object: None
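The last frames show that selectors rejects the object passed to register() because it is None, which means self._sock on the connection was None when check_version reached that line. A minimal standard-library reproduction of the same message (register_none is a hypothetical name, not part of kafka-python):

```python
import selectors


def register_none() -> str:
    """Register None with a selector, as check_version would if self._sock were None."""
    with selectors.DefaultSelector() as selector:
        try:
            selector.register(None, selectors.EVENT_READ)
        except ValueError as exc:
            return str(exc)
    return "no error"


print(register_none())  # prints: Invalid file object: None
```

So the error itself is only a symptom: the underlying question is why the broker connection no longer had a socket at that point.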

Specifying the Kafka API version explicitly

I also tried specifying the Kafka API version explicitly:

admin_client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS, api_version=(3, 5, 1))

This produces the same error.

Network?

The network seems OK; the broker port is reachable with telnet:

telnet srv1.us-east-1.amazonaws.com 9092
Trying 10.34.11.23...
Connected to srv1.amazonaws.com.
Escape character is '^]'.

Basic socket operations

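I used the code below to verify that a plain socket connection works:

import socket

sock = None
try:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(10)  # fail instead of hanging if the broker is unreachable
    sock.connect(('srv1.us-east-1.amazonaws.com', 9092))
    print("Connection successful!")
except Exception as e:
    print(f"Socket connection failed: {e}")
finally:
    # sock is still None if socket creation itself failed
    if sock is not None:
        sock.close()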
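A successful TCP connect shows the port is reachable, but not that the listener answers Kafka requests. A minimal sketch, assuming the listener on 9092 is plaintext (no TLS or SASL), that hand-encodes an ApiVersions v0 request (API key 18, request header v1, empty body) and decodes the v0 response. The helper names build_api_versions_v0, parse_api_versions_v0, recv_exact and probe are hypothetical, not kafka-python APIs:

```python
import socket
import struct
from typing import Dict, Tuple

API_VERSIONS_KEY = 18  # ApiVersions request key in the Kafka protocol


def build_api_versions_v0(correlation_id: int, client_id: str) -> bytes:
    """Encode a size-prefixed ApiVersions v0 request (header v1, empty body)."""
    cid = client_id.encode("utf-8")
    # api_key INT16, api_version INT16, correlation_id INT32, client_id STRING (INT16 length + bytes)
    payload = struct.pack(">hhih", API_VERSIONS_KEY, 0, correlation_id, len(cid)) + cid
    return struct.pack(">i", len(payload)) + payload


def parse_api_versions_v0(payload: bytes) -> Tuple[int, int, Dict[int, Tuple[int, int]]]:
    """Decode an ApiVersions v0 response (without the 4-byte size prefix).

    Returns (correlation_id, error_code, {api_key: (min_version, max_version)}).
    """
    correlation_id, error_code, count = struct.unpack_from(">ihi", payload, 0)
    offset = 10  # 4 (correlation_id) + 2 (error_code) + 4 (array length)
    versions: Dict[int, Tuple[int, int]] = {}
    for _ in range(count):
        key, lo, hi = struct.unpack_from(">hhh", payload, offset)
        versions[key] = (lo, hi)
        offset += 6
    return correlation_id, error_code, versions


def recv_exact(sock: socket.socket, n: int) -> bytes:
    """Read exactly n bytes, failing loudly if the peer closes the connection."""
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("broker closed the connection")
        buf += chunk
    return buf


def probe(host: str, port: int, timeout: float = 10.0) -> Tuple[int, int, Dict[int, Tuple[int, int]]]:
    """Send one ApiVersions v0 request and return the decoded response."""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(build_api_versions_v0(1, "probe"))
        (size,) = struct.unpack(">i", recv_exact(sock, 4))
        return parse_api_versions_v0(recv_exact(sock, size))
```

For example, print(probe("srv1.us-east-1.amazonaws.com", 9092)) should print a version table if the listener speaks plaintext Kafka; a ConnectionError or timeout would instead suggest the broker is dropping the connection, which would fit the symptom in the traceback.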

The exception occurs at this line in the check_version method (kafka/conn.py):

selector.register(self._sock, selectors.EVENT_READ)

Here is the surrounding code:

        for version, request in test_cases:
            if not self.connect_blocking(timeout_at - time.time()):
                reset_override_configs()
                raise Errors.NodeNotReadyError()
            f = self.send(request)
            # HACK: sleeping to wait for socket to send bytes
            time.sleep(0.1)
            # when broker receives an unrecognized request API
            # it abruptly closes our socket.
            # so we attempt to send a second request immediately
            # that we believe it will definitely recognize (metadata)
            # the attempt to write to a disconnected socket should
            # immediately fail and allow us to infer that the prior
            # request was unrecognized
            mr = self.send(MetadataRequest[0](topics))

            selector = self.config['selector']()
            selector.register(self._sock, selectors.EVENT_READ)
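One plausible reading, not confirmed in this issue: the comments in this excerpt note that the broker may abruptly close the socket after an unrecognized request. If kafka-python reacts to that by closing the connection (which, as far as I can tell, resets self._sock to None), the following selector.register call would receive None and fail exactly as in the traceback. The sketch below is not a patch to kafka-python; wait_readable is a hypothetical helper showing the defensive pattern of checking for a torn-down socket before registering it, so the caller gets a connection error instead of a ValueError:

```python
import selectors
import socket
from typing import Optional


def wait_readable(sock: Optional[socket.socket], timeout: float) -> bool:
    """Hypothetical helper (not part of kafka-python).

    Returns True if `sock` becomes readable within `timeout` seconds, False otherwise.
    Raises ConnectionError instead of letting selectors raise ValueError when the
    socket has already been torn down (i.e. is None).
    """
    if sock is None:
        raise ConnectionError("socket was closed before waiting for a response")
    with selectors.DefaultSelector() as selector:
        selector.register(sock, selectors.EVENT_READ)
        # select() returns a list of (key, events) pairs; empty on timeout
        return bool(selector.select(timeout))
```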

Any idea what causes this, or is there a workaround? Thanks.
