-
-
Notifications
You must be signed in to change notification settings - Fork 973
support redis cluster transport #2204
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
support redis cluster transport #2204
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2204 +/- ##
==========================================
+ Coverage 81.60% 81.89% +0.28%
==========================================
Files 77 78 +1
Lines 9540 9963 +423
Branches 1162 1238 +76
==========================================
+ Hits 7785 8159 +374
- Misses 1563 1591 +28
- Partials 192 213 +21 ☔ View full report in Codecov by Sentry. |
auvipy
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets see if the CI passes
dca3e75 to
9983557
Compare
f4d275a to
f5c7356
Compare
c87be51 to
6986510
Compare
|
Only one test is failing, due to connection failure |
|
Check linter error :) |
|
Sorry for the failure, I will fix it and improve the test coverage. |
|
I am following it and already had reviewed it twice. Will have an in depth review again tomorrow. No worries. Thanks for picking my work |
Added docs for kombu.transport.rediscluster, now I'm sure there won't be problems anymore |
Nusnus
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Finally that everything passes on the CI, we can properly review the code.
That being said, we’re currently in a release phase so we can’t merge it until we complete the release, after the new year holidays.
Good work and thank you for fixing everything.
|
Would this also support Redis cluster for backend? |
|
|
||
| conn_params['connection_pool_class'] = ManagedConnectionPool | ||
|
|
||
| conn_params['url'] = f'redis://{conn_params["host"]}:{conn_params["port"]}' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zs-neo thanks for all of your work on this! I'm testing out this PR right now and noticed that TLS support is broken. it looks as though it might be as simple as using the rediss scheme here with the ssl logic above?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm very sorry for the late reply. Thank you for your attention to this PR! I agree with you and I will add support and testing for TLS.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
did you add the test for TLS?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Adds native support for Redis Cluster as a transport backend in Kombu, including key prefixing and cluster-aware polling.
- Introduces a new
rediscluster.pytransport module with cluster-aware clients, pipelining, pub/sub, and QoS. - Adds comprehensive unit tests for cluster behavior under failover, key prefixing, and polling.
- Updates package exports and documentation to include the new
redisclustertransport.
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| t/unit/transport/test_rediscluster.py | New unit tests covering cluster transport behaviors |
| kombu/transport/rediscluster.py | Cluster-aware transport implementation |
| kombu/transport/init.py | Registers rediscluster in the transport mapping |
| docs/reference/kombu.transport.rediscluster.rst | Documentation stub for the rediscluster transport |
Comments suppressed due to low confidence (3)
t/unit/transport/test_rediscluster.py:1161
- [nitpick] Using
setas a variable name shadows the built-inset. Consider renaming this mock to avoid confusion (e.g.,mock_set).
set = client.set = Mock()
kombu/transport/rediscluster.py:651
- Catching
Exceptionmay hide unexpected errors. Restrict the except clause to anticipated exception types (e.g.,redis.exceptions.RedisError).
def _do_restore_message(self, payload, exchange, routing_key,
kombu/transport/rediscluster.py:301
- Parameter name
typeshadows the built-intype. Rename it to something likeevent_typefor clarity.
def _register(self, channel, client, conn, type):
|
it is scheduled for v5.7 release |
| target_node.redis_connection = None | ||
| self.client.nodes_manager.initialize() | ||
| raise | ||
| except MovedError: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
During a master-replica transition, for example, in a cluster with a total of 6 nodes (3 master nodes and 3 replica nodes), when a node undergoes a master-replica switch, an error will occur in _brpop_read.
[2025-08-06 11:38:40,268: CRITICAL/MainProcess] Unrecoverable error: ResponseError('UNBLOCKED force unblock from blocking operation, instance state changed (master -> replica?)')
Traceback (most recent call last):
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\celery\worker\worker.py", line 203, in start
self.blueprint.start(self)
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\celery\bootsteps.py", line 116, in start
step.start(parent)
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\celery\bootsteps.py", line 365, in start
return self.obj.start()
^^^^^^^^^^^^^^^^
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\celery\worker\consumer\consumer.py", line 341, in start
blueprint.start(self)
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\celery\bootsteps.py", line 116, in start
step.start(parent)
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\celery\worker\consumer\consumer.py", line 772, in start
c.loop(*c.loop_args())
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\celery\worker\loops.py", line 143, in synloop
_loop_cycle()
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\celery\worker\loops.py", line 132, in _loop_cycle
connection.drain_events(timeout=2.0)
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\kombu\connection.py", line 341, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\kombu\transport\virtual\base.py", line 997, in drain_events
get(self._deliver, timeout=timeout)
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\kombu\transport\redis.py", line 598, in get
ret = self.handle_event(fileno, event)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\celery_demo\kombu_rediscluster.py", line 379, in handle_event
return self.on_readable(fileno), self
^^^^^^^^^^^^^^^^^^^^^^^^
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\celery_demo\kombu_rediscluster.py", line 352, in on_readable
chan.handlers[type](**{'conn': conn})
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\celery_demo\kombu_rediscluster.py", line 512, in _brpop_read
dest__item = conn.read_response('BRPOP', **options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "c:\mine\workspace\my-demo\python\celery-redis-cluster-demo\.venv\Lib\site-packages\redis\connection.py", line 666, in read_response
raise response
redis.exceptions.ResponseError: UNBLOCKED force unblock from blocking operation, instance state changed (master -> replica?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rfenner we need this to be addressed
|
It would be nice to see this land sooon as I'm trying to use AWS serverless valkey and it runs in a cluster mode so Celery won't run on it. It's going to become more common for people to want to run their projects on serverless versions since they tend to offer scalable storage and not a fixed amount like you would get when you use a specific instance type. I guess my workaround for now will be to spin up a normal version just for celery to use. |
for more information, see https://pre-commit.ci
Co-authored-by: bashir-abdelwahed <51696104+bashir-abdelwahed@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
5e5de4c to
134a2bb
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
kombu/transport/rediscluster.py:1
- Typo in variable name: 'disconect' should be 'disconnect'
"""Redis cluster transport module for Kombu.
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
|
Hello team, any update on this one? Looks like it's very close to get shipped to next release? |
|
@auvipy Looks like it's a documentation issue, is it transient, or is there something else needed? |
|
this isslated for v5.7.0 release |
auvipy
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
=================================== FAILURES ===================================
_____________________ test_Channel.test_prefixed_pipeline ______________________
self = <t.unit.transport.test_rediscluster.test_Channel object at 0x7f074f0af7c0>
mock_initialize =
mock_execute_command =
@patch("redis.cluster.RedisCluster.execute_command")
@patch('redis.cluster.NodesManager.initialize')
def test_prefixed_pipeline(self, mock_initialize, mock_execute_command):
client = redis.PrefixedStrictRedis(global_keyprefix='foo_', startup_nodes=[ClusterNode()])
pipeline = client.pipeline()
t/unit/transport/test_rediscluster.py:614:
self = <kombu.transport.rediscluster.PrefixedStrictRedis object at 0x7f0736200ca0>
transaction = False, shard_hint = None
Restoring 2 unacknowledged message(s)
def pipeline(self, transaction=False, shard_hint=None):
if shard_hint:
raise RedisClusterException("shard_hint is deprecated in cluster mode")
if transaction:
raise RedisClusterException("transaction is deprecated in cluster mode")
return PrefixedRedisPipeline(
nodes_manager=self.nodes_manager,
commands_parser=self.commands_parser,
startup_nodes=self.nodes_manager.startup_nodes,
result_callbacks=self.result_callbacks,
cluster_response_callbacks=self.cluster_response_callbacks,
cluster_error_retry_attempts=self.cluster_error_retry_attempts,
read_from_replicas=self.read_from_replicas,
reinitialize_steps=self.reinitialize_steps,
lock=self._lock,
global_keyprefix=self.global_keyprefix,
)
E AttributeError: 'PrefixedStrictRedis' object has no attribute 'cluster_error_retry_attempts'
kombu/transport/rediscluster.py:133: AttributeError
=============================== warnings summary ===============================
.tox/3.9-unit/lib/python3.9/site-packages/coverage/inorout.py:473
/home/runner/_work/kombu/kombu/.tox/3.9-unit/lib/python3.9/site-packages/coverage/inorout.py:473: CoverageWarning: --include is ignored because --source is set (include-ignored); see https://coverage.readthedocs.io/en/7.10.7/messages.html#warning-include-ignored
self.warn("--include is ignored because --source is set", slug="include-ignored")
.tox/3.9-unit/lib/python3.9/site-packages/google/api_core/_python_version_support.py:252
/home/runner/_work/kombu/kombu/.tox/3.9-unit/lib/python3.9/site-packages/google/api_core/_python_version_support.py:252: FutureWarning: You are using a Python version (3.9.23) past its end of life. Google will update google.api_core with critical bug fixes on a best-effort basis, but not with any other fixes or features. Please upgrade to the latest Python version, or at least Python 3.10, and then update google.api_core.
warnings.warn(message, FutureWarning)


Attempt to address #1021
Thank you very much for your code, it helps us a lot. @auvipy
We use redis-py instead of redis-py-cluster because redis-py-cluster has been merged into redis-py.
Celery works fine on our cluster with multi producers and multi consumers, when a node goes down, it can automatically switch.