Skip to content

Commit 6c52747

Browse files
committed
feat: support valkey
1 parent efd1247 commit 6c52747

File tree

2 files changed

+83
-23
lines changed

2 files changed

+83
-23
lines changed

src/socketio/async_redis_manager.py

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import pickle
3+
from urllib.parse import urlparse
34

45
try: # pragma: no cover
56
from redis import asyncio as aioredis
@@ -12,6 +13,15 @@
1213
aioredis = None
1314
RedisError = None
1415

16+
try: # pragma: no cover
17+
from valkey import asyncio as valkey
18+
from valkey.exceptions import ValkeyError
19+
except ImportError: # pragma: no cover
20+
valkey = None
21+
ValkeyError = None
22+
23+
24+
1525
from .async_pubsub_manager import AsyncPubSubManager
1626
from .redis_manager import parse_redis_sentinel_url
1727

@@ -47,38 +57,58 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
4757

4858
def __init__(self, url='redis://localhost:6379/0', channel='socketio',
4959
write_only=False, logger=None, redis_options=None):
50-
if aioredis is None:
60+
if aioredis is None and valkey is None:
5161
raise RuntimeError('Redis package is not installed '
52-
'(Run "pip install redis" in your virtualenv).')
53-
if not hasattr(aioredis.Redis, 'from_url'):
62+
'(Run "pip install redis" or "pip install valkey" '
63+
'in your virtualenv).')
64+
if aioredis and not hasattr(aioredis.Redis, 'from_url'):
5465
raise RuntimeError('Version 2 of aioredis package is required.')
5566
super().__init__(channel=channel, write_only=write_only, logger=logger)
5667
self.redis_url = url
5768
self.redis_options = redis_options or {}
5869
self._redis_connect()
5970

71+
def _get_redis_module_and_error(self):
72+
parsed_url = urlparse(self.redis_url)
73+
schema = parsed_url.scheme.split('+', 1)[0].lower()
74+
if schema == 'redis':
75+
if aioredis is None or RedisError is None:
76+
raise RuntimeError('Redis package is not installed '
77+
'(Run "pip install redis" in your virtualenv).')
78+
return aioredis, RedisError
79+
if schema == 'valkey':
80+
if valkey is None or ValkeyError is None:
81+
raise RuntimeError('Valkey package is not installed '
82+
'(Run "pip install valkey" in your virtualenv).')
83+
return valkey, ValkeyError
84+
error_msg = f'Unsupported Redis URL schema: {schema}'
85+
raise ValueError(error_msg)
86+
6087
def _redis_connect(self):
61-
if not self.redis_url.startswith('redis+sentinel://'):
62-
self.redis = aioredis.Redis.from_url(self.redis_url,
63-
**self.redis_options)
64-
else:
88+
module, _ = self._get_redis_module_and_error()
89+
parsed_url = urlparse(self.redis_url)
90+
if parsed_url.scheme in {"redis+sentinel", "valkey+sentinel"}:
6591
sentinels, service_name, connection_kwargs = \
6692
parse_redis_sentinel_url(self.redis_url)
6793
kwargs = self.redis_options
6894
kwargs.update(connection_kwargs)
69-
sentinel = aioredis.sentinel.Sentinel(sentinels, **kwargs)
95+
sentinel = module.sentinel.Sentinel(sentinels, **kwargs)
7096
self.redis = sentinel.master_for(service_name or self.channel)
97+
else:
98+
self.redis = module.Redis.from_url(self.redis_url,
99+
**self.redis_options)
71100
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
72101

73102
async def _publish(self, data):
74103
retry = True
104+
_, error = self._get_redis_module_and_error()
75105
while True:
76106
try:
77107
if not retry:
78108
self._redis_connect()
79109
return await self.redis.publish(
80110
self.channel, pickle.dumps(data))
81-
except RedisError:
111+
except error:
82112
if retry:
83113
self._get_logger().error('Cannot publish to redis... '
84114
'retrying')
@@ -91,6 +121,7 @@ async def _publish(self, data):
91121
async def _redis_listen_with_retries(self):
92122
retry_sleep = 1
93123
connect = False
124+
_, error = self._get_redis_module_and_error()
94125
while True:
95126
try:
96127
if connect:
@@ -99,10 +130,10 @@ async def _redis_listen_with_retries(self):
99130
retry_sleep = 1
100131
async for message in self.pubsub.listen():
101132
yield message
102-
except RedisError:
133+
except error:
103134
self._get_logger().error('Cannot receive from redis... '
104135
'retrying in '
105-
'{} secs'.format(retry_sleep))
136+
f'{retry_sleep} secs')
106137
connect = True
107138
await asyncio.sleep(retry_sleep)
108139
retry_sleep *= 2

src/socketio/redis_manager.py

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,17 @@
55

66
try:
77
import redis
8+
from redis.exceptions import RedisError
89
except ImportError:
910
redis = None
11+
RedisError = None
12+
13+
try:
14+
import valkey
15+
from valkey.exceptions import ValkeyError
16+
except ImportError:
17+
valkey = None
18+
ValkeyError = None
1019

1120
from .pubsub_manager import PubSubManager
1221

@@ -18,7 +27,7 @@ def parse_redis_sentinel_url(url):
1827
redis+sentinel://[:password]@host1:port1,host2:port2,.../db/service_name
1928
"""
2029
parsed_url = urlparse(url)
21-
if parsed_url.scheme != 'redis+sentinel':
30+
if parsed_url.scheme not in {'redis+sentinel', 'valkey+sentinel'}:
2231
raise ValueError('Invalid Redis Sentinel URL')
2332
sentinels = []
2433
for host_port in parsed_url.netloc.split('@')[-1].split(','):
@@ -71,10 +80,10 @@ class RedisManager(PubSubManager): # pragma: no cover
7180

7281
def __init__(self, url='redis://localhost:6379/0', channel='socketio',
7382
write_only=False, logger=None, redis_options=None):
74-
if redis is None:
83+
if redis is None and valkey is None:
7584
raise RuntimeError('Redis package is not installed '
76-
'(Run "pip install redis" in your '
77-
'virtualenv).')
85+
'(Run "pip install redis" or "pip install valkey" '
86+
'in your virtualenv).')
7887
super().__init__(channel=channel, write_only=write_only, logger=logger)
7988
self.redis_url = url
8089
self.redis_options = redis_options or {}
@@ -95,27 +104,46 @@ def initialize(self):
95104
'Redis requires a monkey patched socket library to work '
96105
'with ' + self.server.async_mode)
97106

107+
def _get_redis_module_and_error(self):
108+
parsed_url = urlparse(self.redis_url)
109+
schema = parsed_url.scheme.split('+', 1)[0].lower()
110+
if schema == 'redis':
111+
if redis is None or RedisError is None:
112+
raise RuntimeError('Redis package is not installed '
113+
'(Run "pip install redis" in your virtualenv).')
114+
return redis, RedisError
115+
if schema == 'valkey':
116+
if valkey is None or ValkeyError is None:
117+
raise RuntimeError('Valkey package is not installed '
118+
'(Run "pip install valkey" in your virtualenv).')
119+
return valkey, ValkeyError
120+
error_msg = f'Unsupported Redis URL schema: {schema}'
121+
raise ValueError(error_msg)
122+
98123
def _redis_connect(self):
99-
if not self.redis_url.startswith('redis+sentinel://'):
100-
self.redis = redis.Redis.from_url(self.redis_url,
101-
**self.redis_options)
102-
else:
124+
module, _ = self._get_redis_module_and_error()
125+
parsed_url = urlparse(self.redis_url)
126+
if parsed_url.scheme in {"redis+sentinel", "valkey+sentinel"}:
103127
sentinels, service_name, connection_kwargs = \
104128
parse_redis_sentinel_url(self.redis_url)
105129
kwargs = self.redis_options
106130
kwargs.update(connection_kwargs)
107-
sentinel = redis.sentinel.Sentinel(sentinels, **kwargs)
131+
sentinel = module.sentinel.Sentinel(sentinels, **kwargs)
108132
self.redis = sentinel.master_for(service_name or self.channel)
133+
else:
134+
self.redis = module.Redis.from_url(self.redis_url,
135+
**self.redis_options)
109136
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
110137

111138
def _publish(self, data):
112139
retry = True
140+
_, error = self._get_redis_module_and_error()
113141
while True:
114142
try:
115143
if not retry:
116144
self._redis_connect()
117145
return self.redis.publish(self.channel, pickle.dumps(data))
118-
except redis.exceptions.RedisError:
146+
except error:
119147
if retry:
120148
logger.error('Cannot publish to redis... retrying')
121149
retry = False
@@ -126,16 +154,17 @@ def _publish(self, data):
126154
def _redis_listen_with_retries(self):
127155
retry_sleep = 1
128156
connect = False
157+
_, error = self._get_redis_module_and_error()
129158
while True:
130159
try:
131160
if connect:
132161
self._redis_connect()
133162
self.pubsub.subscribe(self.channel)
134163
retry_sleep = 1
135164
yield from self.pubsub.listen()
136-
except redis.exceptions.RedisError:
165+
except error:
137166
logger.error('Cannot receive from redis... '
138-
'retrying in {} secs'.format(retry_sleep))
167+
f'retrying in {retry_sleep} secs')
139168
connect = True
140169
time.sleep(retry_sleep)
141170
retry_sleep *= 2

0 commit comments

Comments
 (0)