Skip to content

Commit e53f27f

Browse files
authored
Merge pull request #127 from opentensor/feat/thewhaleking/improvements
Edge Case Fixes
2 parents d88d2b7 + a46e1b8 commit e53f27f

File tree

8 files changed

+411
-133
lines changed

8 files changed

+411
-133
lines changed
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
name: Run Tests
2+
3+
on:
4+
push:
5+
branches: [main, staging]
6+
pull_request:
7+
branches: [main, staging]
8+
workflow_dispatch:
9+
10+
jobs:
11+
find-tests:
12+
runs-on: ubuntu-latest
13+
steps:
14+
- name: Check-out repository
15+
uses: actions/checkout@v4
16+
17+
- name: Find test files
18+
id: get-tests
19+
run: |
20+
test_files=$(find tests -name "test*.py" | jq -R -s -c 'split("\n") | map(select(. != ""))')
21+
echo "::set-output name=test-files::$test_files"
22+
23+
pull-docker-image:
24+
runs-on: ubuntu-latest
25+
steps:
26+
- name: Log in to GitHub Container Registry
27+
run: echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io -u $GITHUB_ACTOR --password-stdin
28+
29+
- name: Pull Docker Image
30+
run: docker pull ghcr.io/opentensor/subtensor-localnet:devnet-ready
31+
32+
- name: Save Docker Image to Cache
33+
run: docker save -o subtensor-localnet.tar ghcr.io/opentensor/subtensor-localnet:devnet-ready
34+
35+
- name: Upload Docker Image as Artifact
36+
uses: actions/upload-artifact@v4
37+
with:
38+
name: subtensor-localnet
39+
path: subtensor-localnet.tar
40+
41+
run-unit-tests:
42+
name: ${{ matrix.test-file }} / Python ${{ matrix.python-version }}
43+
needs:
44+
- find-tests
45+
- pull-docker-image
46+
runs-on: ubuntu-latest
47+
timeout-minutes: 30
48+
strategy:
49+
fail-fast: false
50+
max-parallel: 32
51+
matrix:
52+
test-file: ${{ fromJson(needs.find-tests.outputs.test-files) }}
53+
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
54+
55+
steps:
56+
- name: Check-out repository
57+
uses: actions/checkout@v4
58+
59+
- name: Install uv
60+
uses: astral-sh/setup-uv@v4
61+
with:
62+
python-version: ${{ matrix.python-version }}
63+
64+
- name: Install dependencies
65+
run: |
66+
uv venv .venv
67+
source .venv/bin/activate
68+
uv pip install .[dev]
69+
70+
- name: Download Docker Image
71+
uses: actions/download-artifact@v4
72+
with:
73+
name: subtensor-localnet
74+
75+
- name: Load Docker Image
76+
run: docker load -i subtensor-localnet.tar
77+
78+
- name: Run pytest
79+
run: |
80+
source .venv/bin/activate
81+
uv run pytest ${{ matrix.test-file }} -v -s

async_substrate_interface/async_substrate.py

Lines changed: 39 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
TYPE_CHECKING,
2323
)
2424

25-
import asyncstdlib as a
2625
from bt_decode import MetadataV15, PortableRegistry, decode as decode_by_type_string
2726
from scalecodec.base import ScaleBytes, ScaleType, RuntimeConfigurationObject
2827
from scalecodec.types import (
@@ -58,7 +57,7 @@
5857
get_next_id,
5958
rng as random,
6059
)
61-
from async_substrate_interface.utils.cache import async_sql_lru_cache
60+
from async_substrate_interface.utils.cache import async_sql_lru_cache, CachedFetcher
6261
from async_substrate_interface.utils.decoding import (
6362
_determine_if_old_runtime_call,
6463
_bt_decode_to_dict_or_list,
@@ -539,14 +538,17 @@ def __init__(
539538
"You are instantiating the AsyncSubstrateInterface Websocket outside of an event loop. "
540539
"Verify this is intended."
541540
)
542-
now = asyncio.new_event_loop().time()
541+
# default value for in case there's no running asyncio loop
542+
# this really doesn't matter in most cases, as it's only used for comparison on the first call to
543+
# see how long it's been since the last call
544+
now = 0.0
543545
self.last_received = now
544546
self.last_sent = now
547+
self._in_use_ids = set()
545548

546549
async def __aenter__(self):
547-
async with self._lock:
548-
self._in_use += 1
549-
await self.connect()
550+
self._in_use += 1
551+
await self.connect()
550552
return self
551553

552554
@staticmethod
@@ -559,18 +561,19 @@ async def connect(self, force=False):
559561
self.last_sent = now
560562
if self._exit_task:
561563
self._exit_task.cancel()
562-
if not self._initialized or force:
563-
self._initialized = True
564-
try:
565-
self._receiving_task.cancel()
566-
await self._receiving_task
567-
await self.ws.close()
568-
except (AttributeError, asyncio.CancelledError):
569-
pass
570-
self.ws = await asyncio.wait_for(
571-
connect(self.ws_url, **self._options), timeout=10
572-
)
573-
self._receiving_task = asyncio.create_task(self._start_receiving())
564+
async with self._lock:
565+
if not self._initialized or force:
566+
try:
567+
self._receiving_task.cancel()
568+
await self._receiving_task
569+
await self.ws.close()
570+
except (AttributeError, asyncio.CancelledError):
571+
pass
572+
self.ws = await asyncio.wait_for(
573+
connect(self.ws_url, **self._options), timeout=10
574+
)
575+
self._receiving_task = asyncio.create_task(self._start_receiving())
576+
self._initialized = True
574577

575578
async def __aexit__(self, exc_type, exc_val, exc_tb):
576579
async with self._lock: # TODO is this actually what I want to happen?
@@ -619,6 +622,7 @@ async def _recv(self) -> None:
619622
self._open_subscriptions -= 1
620623
if "id" in response:
621624
self._received[response["id"]] = response
625+
self._in_use_ids.remove(response["id"])
622626
elif "params" in response:
623627
self._received[response["params"]["subscription"]] = response
624628
else:
@@ -649,6 +653,9 @@ async def send(self, payload: dict) -> int:
649653
id: the internal ID of the request (incremented int)
650654
"""
651655
original_id = get_next_id()
656+
while original_id in self._in_use_ids:
657+
original_id = get_next_id()
658+
self._in_use_ids.add(original_id)
652659
# self._open_subscriptions += 1
653660
await self.max_subscriptions.acquire()
654661
try:
@@ -674,7 +681,7 @@ async def retrieve(self, item_id: int) -> Optional[dict]:
674681
self.max_subscriptions.release()
675682
return item
676683
except KeyError:
677-
await asyncio.sleep(0.001)
684+
await asyncio.sleep(0.1)
678685
return None
679686

680687

@@ -725,6 +732,7 @@ def __init__(
725732
)
726733
else:
727734
self.ws = AsyncMock(spec=Websocket)
735+
728736
self._lock = asyncio.Lock()
729737
self.config = {
730738
"use_remote_preset": use_remote_preset,
@@ -748,6 +756,12 @@ def __init__(
748756
self.registry_type_map = {}
749757
self.type_id_to_name = {}
750758
self._mock = _mock
759+
self._block_hash_fetcher = CachedFetcher(512, self._get_block_hash)
760+
self._parent_hash_fetcher = CachedFetcher(512, self._get_parent_block_hash)
761+
self._runtime_info_fetcher = CachedFetcher(16, self._get_block_runtime_info)
762+
self._runtime_version_for_fetcher = CachedFetcher(
763+
512, self._get_block_runtime_version_for
764+
)
751765

752766
async def __aenter__(self):
753767
if not self._mock:
@@ -1869,9 +1883,8 @@ async def get_metadata(self, block_hash=None) -> MetadataV15:
18691883

18701884
return runtime.metadata_v15
18711885

1872-
@a.lru_cache(maxsize=512)
18731886
async def get_parent_block_hash(self, block_hash):
1874-
return await self._get_parent_block_hash(block_hash)
1887+
return await self._parent_hash_fetcher.execute(block_hash)
18751888

18761889
async def _get_parent_block_hash(self, block_hash):
18771890
block_header = await self.rpc_request("chain_getHeader", [block_hash])
@@ -1916,9 +1929,8 @@ async def get_storage_by_key(self, block_hash: str, storage_key: str) -> Any:
19161929
"Unknown error occurred during retrieval of events"
19171930
)
19181931

1919-
@a.lru_cache(maxsize=16)
19201932
async def get_block_runtime_info(self, block_hash: str) -> dict:
1921-
return await self._get_block_runtime_info(block_hash)
1933+
return await self._runtime_info_fetcher.execute(block_hash)
19221934

19231935
get_block_runtime_version = get_block_runtime_info
19241936

@@ -1929,9 +1941,8 @@ async def _get_block_runtime_info(self, block_hash: str) -> dict:
19291941
response = await self.rpc_request("state_getRuntimeVersion", [block_hash])
19301942
return response.get("result")
19311943

1932-
@a.lru_cache(maxsize=512)
19331944
async def get_block_runtime_version_for(self, block_hash: str):
1934-
return await self._get_block_runtime_version_for(block_hash)
1945+
return await self._runtime_version_for_fetcher.execute(block_hash)
19351946

19361947
async def _get_block_runtime_version_for(self, block_hash: str):
19371948
"""
@@ -2149,14 +2160,14 @@ async def _make_rpc_request(
21492160
and current_time - self.ws.last_sent >= self.retry_timeout
21502161
):
21512162
if attempt >= self.max_retries:
2152-
logger.warning(
2163+
logger.error(
21532164
f"Timed out waiting for RPC requests {attempt} times. Exiting."
21542165
)
21552166
raise MaxRetriesExceeded("Max retries reached.")
21562167
else:
21572168
self.ws.last_received = time.time()
21582169
await self.ws.connect(force=True)
2159-
logger.error(
2170+
logger.warning(
21602171
f"Timed out waiting for RPC requests. "
21612172
f"Retrying attempt {attempt + 1} of {self.max_retries}"
21622173
)
@@ -2240,9 +2251,8 @@ async def rpc_request(
22402251
else:
22412252
raise SubstrateRequestException(result[payload_id][0])
22422253

2243-
@a.lru_cache(maxsize=512)
22442254
async def get_block_hash(self, block_id: int) -> str:
2245-
return await self._get_block_hash(block_id)
2255+
return await self._block_hash_fetcher.execute(block_id)
22462256

22472257
async def _get_block_hash(self, block_id: int) -> str:
22482258
return (await self.rpc_request("chain_getBlockHash", [block_id]))["result"]

async_substrate_interface/sync_substrate.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2497,13 +2497,13 @@ def runtime_call(
24972497
Returns:
24982498
ScaleType from the runtime call
24992499
"""
2500-
self.init_runtime(block_hash=block_hash)
2500+
runtime = self.init_runtime(block_hash=block_hash)
25012501

25022502
if params is None:
25032503
params = {}
25042504

25052505
try:
2506-
metadata_v15_value = self.runtime.metadata_v15.value()
2506+
metadata_v15_value = runtime.metadata_v15.value()
25072507

25082508
apis = {entry["name"]: entry for entry in metadata_v15_value["apis"]}
25092509
api_entry = apis[api]

async_substrate_interface/utils/cache.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
1+
import asyncio
2+
from collections import OrderedDict
13
import functools
24
import os
35
import pickle
46
import sqlite3
57
from pathlib import Path
8+
from typing import Callable, Any
9+
610
import asyncstdlib as a
711

12+
813
USE_CACHE = True if os.getenv("NO_CACHE") != "1" else False
914
CACHE_LOCATION = (
1015
os.path.expanduser(
@@ -139,3 +144,54 @@ async def inner(self, *args, **kwargs):
139144
return inner
140145

141146
return decorator
147+
148+
149+
class LRUCache:
150+
def __init__(self, max_size: int):
151+
self.max_size = max_size
152+
self.cache = OrderedDict()
153+
154+
def set(self, key, value):
155+
if key in self.cache:
156+
self.cache.move_to_end(key)
157+
self.cache[key] = value
158+
if len(self.cache) > self.max_size:
159+
self.cache.popitem(last=False)
160+
161+
def get(self, key):
162+
if key in self.cache:
163+
# Mark as recently used
164+
self.cache.move_to_end(key)
165+
return self.cache[key]
166+
return None
167+
168+
169+
class CachedFetcher:
170+
def __init__(self, max_size: int, method: Callable):
171+
self._inflight: dict[int, asyncio.Future] = {}
172+
self._method = method
173+
self._cache = LRUCache(max_size=max_size)
174+
175+
async def execute(self, single_arg: Any) -> str:
176+
if item := self._cache.get(single_arg):
177+
return item
178+
179+
if single_arg in self._inflight:
180+
result = await self._inflight[single_arg]
181+
return result
182+
183+
loop = asyncio.get_running_loop()
184+
future = loop.create_future()
185+
self._inflight[single_arg] = future
186+
187+
try:
188+
result = await self._method(single_arg)
189+
self._cache.set(single_arg, result)
190+
future.set_result(result)
191+
return result
192+
except Exception as e:
193+
# Propagate errors
194+
future.set_exception(e)
195+
raise
196+
finally:
197+
self._inflight.pop(single_arg, None)

0 commit comments

Comments
 (0)