Skip to content

RESP3 response callbacks #2798

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
response callbacks
  • Loading branch information
dvora-h committed Jun 14, 2023
commit c08d0acd3c58df1be7089582be60e0849e0d422a
2 changes: 1 addition & 1 deletion redis/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ async def on_connect(self) -> None:
raise AuthenticationError("Invalid Username or Password")

# if resp version is specified, switch to it
elif self.protocol != 2:
elif self.protocol not in [2, "2"]:
if isinstance(self._parser, _AsyncRESP2Parser):
self.set_parser(_AsyncRESP3Parser)
# update cluster exception classes
Expand Down
69 changes: 31 additions & 38 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ def parse_set_result(response, **options):

class AbstractRedis:
RESPONSE_CALLBACKS = {
**string_keys_to_dict("EXPIRE EXPIREAT PEXPIRE PEXPIREAT", bool),
**string_keys_to_dict("EXPIRE EXPIREAT PEXPIRE PEXPIREAT AUTH", bool),
**string_keys_to_dict("EXISTS", int),
**string_keys_to_dict("INCRBYFLOAT HINCRBYFLOAT", float),
**string_keys_to_dict("READONLY", bool_ok),
Expand Down Expand Up @@ -785,86 +785,82 @@ class AbstractRedis:
**string_keys_to_dict("XREAD XREADGROUP", parse_xread),
"COMMAND GETKEYS": lambda r: list(map(str_if_bytes, r)),
**string_keys_to_dict("SORT", sort_return_tuples),
"PING": lambda r: str_if_bytes(r) == "PONG",
"ACL SETUSER": bool_ok,
"PUBSUB NUMSUB": parse_pubsub_numsub,
"SCRIPT FLUSH": bool_ok,
"SCRIPT LOAD": str_if_bytes,
"ACL GETUSER": parse_acl_getuser,
"CONFIG SET": bool_ok,
**string_keys_to_dict("XREVRANGE XRANGE", parse_stream_list),
"XCLAIM": parse_xclaim,

}

RESP2_RESPONSE_CALLBACKS = {
"CONFIG GET": parse_config_get,
**string_keys_to_dict(
"SDIFF SINTER SMEMBERS SUNION", lambda r: r and set(r) or set()
),
**string_keys_to_dict(
"ZPOPMAX ZPOPMIN ZINTER ZDIFF ZUNION ZRANGE ZRANGEBYSCORE "
"ZREVRANGE ZREVRANGEBYSCORE",
zset_score_pairs,
),
**string_keys_to_dict("ZSCORE ZINCRBY", float_or_none),
"ZADD": parse_zadd,
"ZMSCORE": parse_zmscore,
"HGETALL": lambda r: r and pairs_to_dict(r) or {},
"MEMORY STATS": parse_memory_stats,
"MODULE LIST": lambda r: [pairs_to_dict(m) for m in r],

# **string_keys_to_dict(
# "AUTH COPY "
# "COPY "
# "HEXISTS HMSET MOVE MSETNX PERSIST "
# "PSETEX RENAMENX SISMEMBER SMOVE SETEX SETNX",
# bool,
# ),
# **string_keys_to_dict(
# "BITCOUNT BITPOS DECRBY DEL EXISTS GEOADD GETBIT HDEL HLEN "
# "HSTRLEN INCRBY LINSERT LLEN LPUSHX PFADD PFCOUNT RPUSHX SADD "
# "SCARD SDIFFSTORE SETBIT SETRANGE SINTERSTORE SREM STRLEN "
# "SUNIONSTORE UNLINK XACK XDEL XLEN XTRIM ZCARD ZLEXCOUNT ZREM "
# "ZREMRANGEBYLEX ZREMRANGEBYRANK ZREMRANGEBYSCORE",
# int,
# ),
# **string_keys_to_dict(
# # these return OK, or int if redis-server is >=1.3.4
# "LPUSH RPUSH",
# lambda r: isinstance(r, int) and r or str_if_bytes(r) == "OK",
# ),
# **string_keys_to_dict("ZSCORE ZINCRBY", float_or_none),
# **string_keys_to_dict(
# "FLUSHALL FLUSHDB LSET LTRIM MSET PFMERGE ASKING READWRITE "
# "RENAME SAVE SELECT SHUTDOWN SLAVEOF SWAPDB WATCH UNWATCH ",
# bool_ok,
# ),
# **string_keys_to_dict(
# "SDIFF SINTER SMEMBERS SUNION", lambda r: r and set(r) or set()
# ),
# **string_keys_to_dict(
# "ZPOPMAX ZPOPMIN ZINTER ZDIFF ZUNION ZRANGE ZRANGEBYSCORE "
# "ZREVRANGE ZREVRANGEBYSCORE",
# zset_score_pairs,
# ),
# **string_keys_to_dict("ZRANK ZREVRANK", int_or_none),
# **string_keys_to_dict("XREVRANGE XRANGE", parse_stream_list),
# **string_keys_to_dict("BGREWRITEAOF BGSAVE", lambda r: True),
# "ACL DELUSER": int,
# "ACL GETUSER": parse_acl_getuser,
# "ACL HELP": lambda r: list(map(str_if_bytes, r)),
# "ACL LIST": lambda r: list(map(str_if_bytes, r)),
# "ACL LOAD": bool_ok,
# "ACL SAVE": bool_ok,
# "ACL SETUSER": bool_ok,
# "ACL USERS": lambda r: list(map(str_if_bytes, r)),
# "CLIENT UNBLOCK": lambda r: r and int(r) == 1 or False,
# "CLIENT PAUSE": bool_ok,
# "CLIENT GETREDIR": int,
# "CLUSTER ADDSLOTSRANGE": bool_ok,
# "CLUSTER DELSLOTSRANGE": bool_ok,
# "CLUSTER GETKEYSINSLOT": lambda r: list(map(str_if_bytes, r)),
# "CLUSTER REPLICAS": parse_cluster_nodes,
# "CLUSTER SET-CONFIG-EPOCH": bool_ok,
# "COMMAND COUNT": int,
# "CONFIG GET": parse_config_get,
# "CONFIG RESETSTAT": bool_ok,
# "CONFIG SET": bool_ok,
# "DEBUG OBJECT": parse_debug_object,
# "FUNCTION DELETE": bool_ok,
# "FUNCTION FLUSH": bool_ok,
# "FUNCTION RESTORE": bool_ok,
# "HGETALL": lambda r: r and pairs_to_dict(r) or {},
# "MEMORY PURGE": bool_ok,
# "MEMORY STATS": parse_memory_stats,
# "MEMORY USAGE": int_or_none,
# "MODULE LOAD": parse_module_result,
# "MODULE UNLOAD": parse_module_result,
# "MODULE LIST": lambda r: [pairs_to_dict(m) for m in r],
# "OBJECT": parse_object,
# "PING": lambda r: str_if_bytes(r) == "PONG",
# "QUIT": bool_ok,
# "STRALGO": parse_stralgo,
# "PUBSUB NUMSUB": parse_pubsub_numsub,
# "RANDOMKEY": lambda r: r and r or None,
# "SCRIPT EXISTS": lambda r: list(map(bool, r)),
# "SCRIPT FLUSH": bool_ok,
# "SCRIPT KILL": bool_ok,
# "SCRIPT LOAD": str_if_bytes,
# "SENTINEL CKQUORUM": bool_ok,
# "SENTINEL FAILOVER": bool_ok,
# "SENTINEL FLUSHCONFIG": bool_ok,
Expand All @@ -877,17 +873,12 @@ class AbstractRedis:
# "SENTINEL SENTINELS": parse_sentinel_slaves_and_sentinels,
# "SENTINEL SET": bool_ok,
# "SENTINEL SLAVES": parse_sentinel_slaves_and_sentinels,
# "SLOWLOG LEN": int,
# "SLOWLOG RESET": bool_ok,
# "XCLAIM": parse_xclaim,
# "XGROUP CREATE": bool_ok,
# "XGROUP DELCONSUMER": int,
# "XGROUP DESTROY": bool,
# "XGROUP SETID": bool_ok,
# "XINFO CONSUMERS": parse_list_of_dicts,
# "XINFO GROUPS": parse_list_of_dicts,
# "ZADD": parse_zadd,
# "ZMSCORE": parse_zmscore,
"XINFO GROUPS": parse_list_of_dicts,
}

RESP3_RESPONSE_CALLBACKS = {
Expand Down Expand Up @@ -1128,6 +1119,8 @@ def __init__(

if self.connection_pool.connection_kwargs.get("protocol") in ["3", 3]:
self.response_callbacks.update(self.__class__.RESP3_RESPONSE_CALLBACKS)
else:
self.response_callbacks.update(self.__class__.RESP2_RESPONSE_CALLBACKS)

def __repr__(self):
return f"{type(self).__name__}<{repr(self.connection_pool)}>"
Expand Down
4 changes: 2 additions & 2 deletions redis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ def on_connect(self):
auth_args = cred_provider.get_credentials()
# if resp version is specified and we have auth args,
# we need to send them via HELLO
if auth_args and self.protocol != 2:
if auth_args and self.protocol not in [2, "2"]:
if isinstance(self._parser, _RESP2Parser):
self.set_parser(_RESP3Parser)
# update cluster exception classes
Expand Down Expand Up @@ -321,7 +321,7 @@ def on_connect(self):
raise AuthenticationError("Invalid Username or Password")

# if resp version is specified, switch to it
elif self.protocol != 2:
elif self.protocol not in [2, "2"]:
if isinstance(self._parser, _RESP2Parser):
self.set_parser(_RESP3Parser)
# update cluster exception classes
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

REDIS_INFO = {}
default_redis_url = "redis://localhost:6379/0"
default_redismod_url = "redis://localhost:36379"
default_redismod_url = "redis://localhost:6379"
default_redis_unstable_url = "redis://localhost:6378"

# default ssl client ignores verification for the purpose of testing
Expand Down
21 changes: 10 additions & 11 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def test_response_callbacks(self, r):
assert r["a"] == "static"

def test_case_insensitive_command_names(self, r):
assert r.response_callbacks["del"] == r.response_callbacks["DEL"]
assert r.response_callbacks["ping"] == r.response_callbacks["PING"]


class TestRedisCommands:
Expand Down Expand Up @@ -152,9 +152,8 @@ def teardown():

r.acl_setuser(username, keys=["*"], commands=["+set"])
assert r.acl_dryrun(username, "set", "key", "value") == b"OK"
assert r.acl_dryrun(username, "get", "key").startswith(
b"This user has no permissions to run the"
)
no_permissions_message = b"user has no permissions to run the"
assert no_permissions_message in r.acl_dryrun(username, "get", "key")

@skip_if_server_version_lt("6.0.0")
@skip_if_redis_enterprise()
Expand Down Expand Up @@ -232,12 +231,12 @@ def teardown():
enabled=True,
reset=True,
passwords=["+pass1", "+pass2"],
categories=["+set", "+@hash", "-geo"],
categories=["+set", "+@hash", "-@geo"],
commands=["+get", "+mget", "-hset"],
keys=["cache:*", "objects:*"],
)
acl = r.acl_getuser(username)
assert set(acl["categories"]) == {"-@all", "+@set", "+@hash"}
assert set(acl["categories"]) == {"-@all", "+@set", "+@hash", "-@geo"}
assert set(acl["commands"]) == {"+get", "+mget", "-hset"}
assert acl["enabled"] is True
assert "on" in acl["flags"]
Expand Down Expand Up @@ -315,7 +314,7 @@ def teardown():
selectors=[("+set", "%W~app*")],
)
acl = r.acl_getuser(username)
assert set(acl["categories"]) == {"-@all", "+@set", "+@hash"}
assert set(acl["categories"]) == {"-@all", "+@set", "+@hash", "-@geo"}
assert set(acl["commands"]) == {"+get", "+mget", "-hset"}
assert acl["enabled"] is True
assert "on" in acl["flags"]
Expand All @@ -325,7 +324,7 @@ def teardown():
assert_resp_response(
r,
acl["selectors"],
["commands", "-@all +set", "keys", "%W~app*", "channels", ""],
[["commands", "-@all +set", "keys", "%W~app*", "channels", ""]],
[{"commands": "-@all +set", "keys": "%W~app*", "channels": ""}],
)

Expand Down Expand Up @@ -4214,7 +4213,7 @@ def test_xgroup_setid(self, r):
]
assert r.xinfo_groups(stream) == expected

@skip_if_server_version_lt("5.0.0")
@skip_if_server_version_lt("7.2.0")
def test_xinfo_consumers(self, r):
stream = "stream"
group = "group"
Expand All @@ -4230,8 +4229,8 @@ def test_xinfo_consumers(self, r):
info = r.xinfo_consumers(stream, group)
assert len(info) == 2
expected = [
{"name": consumer1.encode(), "pending": 1},
{"name": consumer2.encode(), "pending": 2},
{"name": consumer1.encode(), "pending": 1, "inactive": 2},
{"name": consumer2.encode(), "pending": 2, "inactive": 2},
]

# we can't determine the idle time, so just make sure it's an int
Expand Down