async fixes in kernel usage handler #177

Merged
merged 1 commit into from
Feb 18, 2023
35 changes: 14 additions & 21 deletions jupyter_resource_usage/api.py
@@ -1,21 +1,15 @@
import json
from concurrent.futures import ThreadPoolExecutor
from inspect import isawaitable

import psutil
import zmq
import zmq.asyncio
from jupyter_client.jsonutil import date_default
from jupyter_server.base.handlers import APIHandler
from jupyter_server.utils import url_path_join
from packaging import version
from tornado import web
from tornado.concurrent import run_on_executor

try:
    # Traitlets >= 4.3.3
    from traitlets import Callable
except ImportError:
    from .utils import Callable


try:
    import ipykernel
@@ -24,8 +18,6 @@
 except ImportError:
     USAGE_IS_SUPPORTED = False

-MAX_RETRIES = 3
-

 class ApiHandler(APIHandler):
     executor = ThreadPoolExecutor(max_workers=5)
@@ -113,17 +105,18 @@ async def get(self, matched_part=None, *args, **kwargs):
         usage_request = session.msg("usage_request", {})

         control_channel.send(usage_request)
-        poller = zmq.Poller()
+        poller = zmq.asyncio.Poller()
         control_socket = control_channel.socket
         poller.register(control_socket, zmq.POLLIN)
-        for i in range(1, MAX_RETRIES + 1):
Review comment by the PR author (Collaborator):

I don't think a second iteration could ever be taken, because the only possible outcomes are:

  1. events is empty, in which case an empty dict is written, or
  2. events contains the control socket, in which case the response is written,

both of which end with break.

Only the control socket is registered with the poller, so there is no way for the continue branch to be taken, which means the loop could only ever run one iteration.
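
A minimal sketch of the reasoning in this comment, assuming the poller has exactly one registered socket, so a poll result is either empty or contains that socket. `old_loop`, `fake_poll`, and the string `"control"` are hypothetical stand-ins written for illustration; they are not part of jupyter_resource_usage or pyzmq.

```python
MAX_RETRIES = 3
POLLIN = 1  # placeholder event mask for the fake poll results


def old_loop(fake_poll, control_socket="control"):
    """Mirror the control flow of the removed retry loop.

    fake_poll(timeout_ms) is a hypothetical stand-in for poller.poll and
    returns a list of (socket, event) pairs.
    Returns (iterations, outcome).
    """
    iterations = 0
    for i in range(1, MAX_RETRIES + 1):
        iterations += 1
        timeout_ms = 1000 * i
        events = dict(fake_poll(timeout_ms))
        if not events:
            # old code: write an empty dict, then break
            return iterations, "empty"
        if control_socket not in events:
            # needs a non-empty result without the control socket,
            # which cannot happen when it is the only registered socket
            continue
        # old code: write the response, then break
        return iterations, "response"
    return iterations, "exhausted"


# With a single registered socket there are only two possible poll results.
timed_out = old_loop(lambda timeout_ms: [])
ready = old_loop(lambda timeout_ms: [("control", POLLIN)])

# The schedule the loop was meant to provide: 1 s + 2 s + 3 s.
intended_total_ms = sum(1000 * i for i in range(1, MAX_RETRIES + 1))
```

Both cases return after the first iteration, which suggests the old code waited at most 1 second in practice rather than the intended 6 seconds.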

-            timeout_ms = 1000 * i
-            events = dict(poller.poll(timeout_ms))
-            if not events:
-                self.write(json.dumps({}))
-                break
-            if control_socket not in events:
-                continue
-            res = await client.control_channel.get_msg(timeout=0)
+        # previous behavior was 3 retries: 1 + 2 + 3 = 6 seconds
+        timeout_ms = 6_000
+        events = dict(await poller.poll(timeout_ms))
+        if control_socket not in events:
+            self.write(json.dumps({}))
+        else:
+            res = client.control_channel.get_msg(timeout=0)
+            if isawaitable(res):
+                # control_channel.get_msg may return a Future,
+                # depending on configured KernelManager class
+                res = await res
             self.write(json.dumps(res, default=date_default))
-            break
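
The `isawaitable` check in the new code can be illustrated in isolation. The sketch below uses only the standard library; `SyncChannel`, `AsyncChannel`, and `read_msg` are hypothetical names that stand in for the two kinds of control channel the code comment mentions, and they are not the jupyter_client classes.

```python
import asyncio
from inspect import isawaitable


class SyncChannel:
    """Hypothetical channel whose get_msg returns the message directly."""

    def get_msg(self, timeout=None):
        return {"content": "sync"}


class AsyncChannel:
    """Hypothetical channel whose get_msg returns a coroutine."""

    async def get_msg(self, timeout=None):
        return {"content": "async"}


async def read_msg(channel):
    res = channel.get_msg(timeout=0)
    if isawaitable(res):
        # Await only when the call actually produced an awaitable.
        res = await res
    return res


async def main():
    return await read_msg(SyncChannel()), await read_msg(AsyncChannel())


sync_res, async_res = asyncio.run(main())
```

Calling `await` unconditionally, as the removed line did, would raise a `TypeError` for a channel like `SyncChannel`, because a plain dict cannot be awaited.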
1 change: 1 addition & 0 deletions pyproject.toml
@@ -39,6 +39,7 @@ dependencies = [
     "jupyter_server>=1.0",
     "prometheus_client",
     "psutil~=5.6",
+    "pyzmq>=19",
 ]
 dynamic = ["version"]
