Skip to content

StreamingCredentialProvider support #3445

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 57 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
0a8f770
Added StreamingCredentialProvider interface
vladvildanov Nov 19, 2024
8272c73
StreamingCredentialProvider support
vladvildanov Nov 27, 2024
7021c7b
Removed debug statement
vladvildanov Nov 27, 2024
400ba2a
Changed an approach to handle multiple connection pools
vladvildanov Dec 3, 2024
9561032
Added support for RedisCluster
vladvildanov Dec 3, 2024
dfcd488
Merge branch 'master' of github.com:redis/redis-py into vv-tba-support
vladvildanov Dec 4, 2024
833968d
Added dispatching of custom connection pool
vladvildanov Dec 4, 2024
3848b57
Extended CredentialProvider interface with async API
vladvildanov Dec 5, 2024
fa9bc3c
Changed method implementation
vladvildanov Dec 5, 2024
1776679
Added support for async API
vladvildanov Dec 9, 2024
87a1ffa
Removed unused lock
vladvildanov Dec 9, 2024
24714ae
Added async API
vladvildanov Dec 10, 2024
0327f36
Merge branch 'master' of github.com:redis/redis-py into vv-tba-support
vladvildanov Dec 10, 2024
6dae71b
Added support for single connection client
vladvildanov Dec 11, 2024
32fc374
Added core functionality
vladvildanov Dec 11, 2024
c2eef78
Revert debug call
vladvildanov Dec 11, 2024
1a1b211
Added package to setup.py
vladvildanov Dec 11, 2024
974ad4f
Added handling of in-use connections
vladvildanov Dec 12, 2024
66a53ea
Added testing
vladvildanov Dec 12, 2024
2cad8b0
Changed fixture name
vladvildanov Dec 12, 2024
7eb6600
Added marker
vladvildanov Dec 12, 2024
5facdae
Marked tests with correct annotations
vladvildanov Dec 13, 2024
ee2ce1a
Added better cancelation handling
vladvildanov Dec 13, 2024
835ede7
Removed another annotation
vladvildanov Dec 16, 2024
e14d680
Added support for async cluster
vladvildanov Dec 16, 2024
90204e7
Added pipeline tests
vladvildanov Dec 17, 2024
0de0f4d
Added support for Pub/Sub
vladvildanov Dec 17, 2024
46e2f94
Added support for Pub/Sub in cluster
vladvildanov Dec 18, 2024
5488726
Added an option to parse endpoint from endpoints.json
vladvildanov Dec 18, 2024
76e9dea
Updated package names and ENV variables
vladvildanov Dec 18, 2024
b697e27
Moved SSL certificates code into context of class
vladvildanov Dec 19, 2024
c24ab17
Fixed fixtures for async
vladvildanov Dec 19, 2024
68ebdee
Fixed test
vladvildanov Dec 19, 2024
98fa92f
Added better endpoitns handling
vladvildanov Dec 20, 2024
e84d77a
Changed variable names
vladvildanov Dec 20, 2024
4ccd380
Added logging
vladvildanov Dec 20, 2024
6e7ad70
Fixed broken tests
vladvildanov Dec 20, 2024
a9c200c
Added TODO for SSL tests
vladvildanov Dec 20, 2024
4527bf0
Added error propagation to main thread
vladvildanov Dec 20, 2024
ac1164e
Added single connection lock
vladvildanov Dec 20, 2024
96aeb68
Codestyle fixes
vladvildanov Dec 20, 2024
9cada36
Added missing methods
vladvildanov Dec 20, 2024
92356bb
Removed wrong annotation
vladvildanov Dec 20, 2024
bd89ff8
Fixed tests
vladvildanov Dec 20, 2024
fcfdcb8
Codestyle fix
vladvildanov Dec 20, 2024
063f0d5
Updated EventListener instantiation inside of class
vladvildanov Dec 20, 2024
b15358b
Fixed variable name
vladvildanov Dec 20, 2024
e691162
Fixed variable names
vladvildanov Dec 20, 2024
ce1e10c
Fixed variable name
vladvildanov Dec 20, 2024
5de68a6
Added EventException
vladvildanov Dec 20, 2024
2851a7c
Codestyle fix
vladvildanov Dec 20, 2024
87c4e7e
Removed redundant code
vladvildanov Dec 20, 2024
d890193
Codestyle fix
vladvildanov Dec 20, 2024
04f3511
Updated test case
vladvildanov Dec 20, 2024
67f1d13
Fixed tests
vladvildanov Dec 20, 2024
c3d099d
Fixed test
vladvildanov Dec 20, 2024
a7233b0
Removed dependency
vladvildanov Dec 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/run-tests/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ runs:

if (( $REDIS_MAJOR_VERSION < 7 )) && [ "$protocol" == "3" ]; then
echo "Skipping module tests: Modules doesn't support RESP3 for Redis versions < 7"
invoke standalone-tests --redis-mod-url=${REDIS_MOD_URL} $eventloop --protocol="${protocol}" --extra-markers="not redismod"
invoke standalone-tests --redis-mod-url=${REDIS_MOD_URL} $eventloop --protocol="${protocol}" --extra-markers="not redismod and not cp_integration"
else
invoke standalone-tests --redis-mod-url=${REDIS_MOD_URL} $eventloop --protocol="${protocol}"
fi
Expand Down
1 change: 1 addition & 0 deletions dev_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ uvloop
vulture>=2.3.0
wheel>=0.30.0
numpy>=1.24.0
redispy-entraid-credentials @ git+https://github.com/redis-developer/redispy-entra-credentials.git/@main
1 change: 1 addition & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ markers =
asyncio: marker for async tests
replica: replica tests
experimental: run only experimental tests
cp_integration: credential provider integration tests
asyncio_mode = auto
timeout = 30
filterwarnings =
Expand Down
43 changes: 42 additions & 1 deletion redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@
list_or_args,
)
from redis.credentials import CredentialProvider
from redis.event import (
AfterPooledConnectionsInstantiationEvent,
AfterPubSubConnectionInstantiationEvent,
AfterSingleConnectionInstantiationEvent,
ClientType,
EventDispatcher,
)
from redis.exceptions import (
ConnectionError,
ExecAbortError,
Expand Down Expand Up @@ -233,6 +240,7 @@ def __init__(
redis_connect_func=None,
credential_provider: Optional[CredentialProvider] = None,
protocol: Optional[int] = 2,
event_dispatcher: Optional[EventDispatcher] = None,
):
"""
Initialize a new Redis client.
Expand All @@ -242,6 +250,10 @@ def __init__(
To retry on TimeoutError, `retry_on_timeout` can also be set to `True`.
"""
kwargs: Dict[str, Any]
if event_dispatcher is None:
self._event_dispatcher = EventDispatcher()
else:
self._event_dispatcher = event_dispatcher
# auto_close_connection_pool only has an effect if connection_pool is
# None. It is assumed that if connection_pool is not None, the user
# wants to manage the connection pool themselves.
Expand Down Expand Up @@ -320,9 +332,19 @@ def __init__(
# This arg only used if no pool is passed in
self.auto_close_connection_pool = auto_close_connection_pool
connection_pool = ConnectionPool(**kwargs)
self._event_dispatcher.dispatch(
AfterPooledConnectionsInstantiationEvent(
[connection_pool], ClientType.ASYNC, credential_provider
)
)
else:
# If a pool is passed in, do not close it
self.auto_close_connection_pool = False
self._event_dispatcher.dispatch(
AfterPooledConnectionsInstantiationEvent(
[connection_pool], ClientType.ASYNC, credential_provider
)
)

self.connection_pool = connection_pool
self.single_connection_client = single_connection_client
Expand Down Expand Up @@ -354,6 +376,12 @@ async def initialize(self: _RedisT) -> _RedisT:
async with self._single_conn_lock:
if self.connection is None:
self.connection = await self.connection_pool.get_connection("_")

self._event_dispatcher.dispatch(
AfterSingleConnectionInstantiationEvent(
self.connection, ClientType.ASYNC, self._single_conn_lock
)
)
return self

def set_response_callback(self, command: str, callback: ResponseCallbackT):
Expand Down Expand Up @@ -521,7 +549,9 @@ def pubsub(self, **kwargs) -> "PubSub":
subscribe to channels and listen for messages that get published to
them.
"""
return PubSub(self.connection_pool, **kwargs)
return PubSub(
self.connection_pool, event_dispatcher=self._event_dispatcher, **kwargs
)

def monitor(self) -> "Monitor":
return Monitor(self.connection_pool)
Expand Down Expand Up @@ -759,7 +789,12 @@ def __init__(
ignore_subscribe_messages: bool = False,
encoder=None,
push_handler_func: Optional[Callable] = None,
event_dispatcher: Optional["EventDispatcher"] = None,
):
if event_dispatcher is None:
self._event_dispatcher = EventDispatcher()
else:
self._event_dispatcher = event_dispatcher
self.connection_pool = connection_pool
self.shard_hint = shard_hint
self.ignore_subscribe_messages = ignore_subscribe_messages
Expand Down Expand Up @@ -876,6 +911,12 @@ async def connect(self):
if self.push_handler_func is not None and not HIREDIS_AVAILABLE:
self.connection._parser.set_pubsub_push_handler(self.push_handler_func)

self._event_dispatcher.dispatch(
AfterPubSubConnectionInstantiationEvent(
self.connection, self.connection_pool, ClientType.ASYNC, self._lock
)
)

async def _disconnect_raise_connect(self, conn, error):
"""
Close the connection and raise an exception
Expand Down
54 changes: 54 additions & 0 deletions redis/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from redis.asyncio.connection import Connection, DefaultParser, SSLConnection, parse_url
from redis.asyncio.lock import Lock
from redis.asyncio.retry import Retry
from redis.auth.token import TokenInterface
from redis.backoff import default_backoff
from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis
from redis.cluster import (
Expand All @@ -45,6 +46,7 @@
from redis.commands import READ_COMMANDS, AsyncRedisClusterCommands
from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
from redis.credentials import CredentialProvider
from redis.event import AfterAsyncClusterInstantiationEvent, EventDispatcher
from redis.exceptions import (
AskError,
BusyLoadingError,
Expand All @@ -57,6 +59,7 @@
MaxConnectionsError,
MovedError,
RedisClusterException,
RedisError,
ResponseError,
SlotNotCoveredError,
TimeoutError,
Expand Down Expand Up @@ -270,6 +273,7 @@ def __init__(
ssl_ciphers: Optional[str] = None,
protocol: Optional[int] = 2,
address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
event_dispatcher: Optional[EventDispatcher] = None,
) -> None:
if db:
raise RedisClusterException(
Expand Down Expand Up @@ -366,11 +370,17 @@ def __init__(
if host and port:
startup_nodes.append(ClusterNode(host, port, **self.connection_kwargs))

if event_dispatcher is None:
self._event_dispatcher = EventDispatcher()
else:
self._event_dispatcher = event_dispatcher

self.nodes_manager = NodesManager(
startup_nodes,
require_full_coverage,
kwargs,
address_remap=address_remap,
event_dispatcher=self._event_dispatcher,
)
self.encoder = Encoder(encoding, encoding_errors, decode_responses)
self.read_from_replicas = read_from_replicas
Expand Down Expand Up @@ -929,6 +939,8 @@ class ClusterNode:
__slots__ = (
"_connections",
"_free",
"_lock",
"_event_dispatcher",
"connection_class",
"connection_kwargs",
"host",
Expand Down Expand Up @@ -966,6 +978,9 @@ def __init__(

self._connections: List[Connection] = []
self._free: Deque[Connection] = collections.deque(maxlen=self.max_connections)
self._event_dispatcher = self.connection_kwargs.get("event_dispatcher", None)
if self._event_dispatcher is None:
self._event_dispatcher = EventDispatcher()

def __repr__(self) -> str:
return (
Expand Down Expand Up @@ -1082,10 +1097,38 @@ async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:

return ret

async def re_auth_callback(self, token: TokenInterface):
tmp_queue = collections.deque()
while self._free:
conn = self._free.popleft()
await conn.retry.call_with_retry(
lambda: conn.send_command(
"AUTH", token.try_get("oid"), token.get_value()
),
lambda error: self._mock(error),
)
await conn.retry.call_with_retry(
lambda: conn.read_response(), lambda error: self._mock(error)
)
tmp_queue.append(conn)

while tmp_queue:
conn = tmp_queue.popleft()
self._free.append(conn)

async def _mock(self, error: RedisError):
"""
Dummy functions, needs to be passed as error callback to retry object.
:param error:
:return:
"""
pass


class NodesManager:
__slots__ = (
"_moved_exception",
"_event_dispatcher",
"connection_kwargs",
"default_node",
"nodes_cache",
Expand All @@ -1102,6 +1145,7 @@ def __init__(
require_full_coverage: bool,
connection_kwargs: Dict[str, Any],
address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
event_dispatcher: Optional[EventDispatcher] = None,
) -> None:
self.startup_nodes = {node.name: node for node in startup_nodes}
self.require_full_coverage = require_full_coverage
Expand All @@ -1113,6 +1157,10 @@ def __init__(
self.slots_cache: Dict[int, List["ClusterNode"]] = {}
self.read_load_balancer = LoadBalancer()
self._moved_exception: MovedError = None
if event_dispatcher is None:
self._event_dispatcher = EventDispatcher()
else:
self._event_dispatcher = event_dispatcher

def get_node(
self,
Expand Down Expand Up @@ -1230,6 +1278,12 @@ async def initialize(self) -> None:
try:
# Make sure cluster mode is enabled on this node
try:
self._event_dispatcher.dispatch(
AfterAsyncClusterInstantiationEvent(
self.nodes_cache,
self.connection_kwargs.get("credential_provider", None),
)
)
cluster_slots = await startup_node.execute_command("CLUSTER SLOTS")
except ResponseError:
raise RedisClusterException(
Expand Down
Loading
Loading