Skip to content

Commit 81c4303

Browse files
authored
Merge pull request #124 from opentensor/fix/thewhaleking/handle-incorect-timeout
Handle Incorrect Timeouts
2 parents a88a787 + 2e06364 commit 81c4303

File tree

1 file changed

+25
-4
lines changed

1 file changed

+25
-4
lines changed

async_substrate_interface/async_substrate.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import logging
1010
import ssl
1111
import time
12+
import warnings
1213
from unittest.mock import AsyncMock
1314
from hashlib import blake2b
1415
from typing import (
@@ -531,15 +532,31 @@ def __init__(
531532
self._exit_task = None
532533
self._open_subscriptions = 0
533534
self._options = options if options else {}
534-
self.last_received = time.time()
535+
try:
536+
now = asyncio.get_running_loop().time()
537+
except RuntimeError:
538+
warnings.warn(
539+
"You are instantiating the AsyncSubstrateInterface Websocket outside of an event loop. "
540+
"Verify this is intended."
541+
)
542+
now = asyncio.new_event_loop().time()
543+
self.last_received = now
544+
self.last_sent = now
535545

536546
async def __aenter__(self):
537547
async with self._lock:
538548
self._in_use += 1
539549
await self.connect()
540550
return self
541551

552+
@staticmethod
553+
async def loop_time() -> float:
554+
return asyncio.get_running_loop().time()
555+
542556
async def connect(self, force=False):
557+
now = await self.loop_time()
558+
self.last_received = now
559+
self.last_sent = now
543560
if self._exit_task:
544561
self._exit_task.cancel()
545562
if not self._initialized or force:
@@ -595,7 +612,7 @@ async def _recv(self) -> None:
595612
try:
596613
# TODO consider wrapping this in asyncio.wait_for and use that for the timeout logic
597614
response = json.loads(await self.ws.recv(decode=False))
598-
self.last_received = time.time()
615+
self.last_received = await self.loop_time()
599616
async with self._lock:
600617
# note that these 'subscriptions' are all waiting sent messages which have not received
601618
# responses, and thus are not the same as RPC 'subscriptions', which are unique
@@ -631,12 +648,12 @@ async def send(self, payload: dict) -> int:
631648
Returns:
632649
id: the internal ID of the request (incremented int)
633650
"""
634-
# async with self._lock:
635651
original_id = get_next_id()
636652
# self._open_subscriptions += 1
637653
await self.max_subscriptions.acquire()
638654
try:
639655
await self.ws.send(json.dumps({**payload, **{"id": original_id}}))
656+
self.last_sent = await self.loop_time()
640657
return original_id
641658
except (ConnectionClosed, ssl.SSLError, EOFError):
642659
async with self._lock:
@@ -2126,7 +2143,11 @@ async def _make_rpc_request(
21262143

21272144
if request_manager.is_complete:
21282145
break
2129-
if time.time() - self.ws.last_received >= self.retry_timeout:
2146+
if (
2147+
(current_time := await self.ws.loop_time()) - self.ws.last_received
2148+
>= self.retry_timeout
2149+
and current_time - self.ws.last_sent >= self.retry_timeout
2150+
):
21302151
if attempt >= self.max_retries:
21312152
logger.warning(
21322153
f"Timed out waiting for RPC requests {attempt} times. Exiting."

0 commit comments

Comments
 (0)