From bf5baa4e7ddacbf5423872aeb7d01b97ac2ec044 Mon Sep 17 00:00:00 2001 From: Daniel M Date: Sat, 14 Sep 2024 18:39:29 -0400 Subject: [PATCH] valkey tests --- poetry.lock | 23 ++++++--- pyproject.toml | 2 +- scheduler/admin/__init__.py | 2 +- .../{redis_models.py => ephemeral_models.py} | 0 scheduler/admin/task_models.py | 8 +-- scheduler/connection_types.py | 19 +++++++ scheduler/decorators.py | 6 +-- scheduler/management/commands/rqworker.py | 5 +- scheduler/models/scheduled_task.py | 4 +- scheduler/queues.py | 17 ++++--- scheduler/rq_classes.py | 25 +++++----- scheduler/settings.py | 50 +++++++++++++------ scheduler/tests/test_job_decorator.py | 10 ++-- scheduler/tests/test_worker.py | 6 +-- scheduler/tools.py | 4 +- scheduler/views.py | 6 +-- 16 files changed, 118 insertions(+), 69 deletions(-) rename scheduler/admin/{redis_models.py => ephemeral_models.py} (100%) create mode 100644 scheduler/connection_types.py diff --git a/poetry.lock b/poetry.lock index b9f5e78..94c636e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1637,16 +1637,23 @@ socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"] zstd = ["zstandard (>=0.18.0)"] [[package]] -name = "valkey-py" -version = "0.0.1" -description = "Python interface for valkey key-value store" -optional = true -python-versions = ">=3.7" +name = "valkey" +version = "6.0.1" +description = "Python client for Valkey forked from redis-py" +optional = false +python-versions = ">=3.8" files = [ - {file = "valkey-py-0.0.1.tar.gz", hash = "sha256:7a9e9e60975f7a474682537388be245baa78a6b522b789886470301860b3aaa8"}, - {file = "valkey_py-0.0.1-py3-none-any.whl", hash = "sha256:125fe45bbdd8b75192e2bcac665962ec1cf9a3451fa48cfda0a25b1a8678f130"}, + {file = "valkey-6.0.1-py3-none-any.whl", hash = "sha256:6702bf323e88e50ef0be37aad697bcc6334edd40cc66f01259265dd410fa22dc"}, + {file = "valkey-6.0.1.tar.gz", hash = "sha256:58f4628dc038ab5aa04eea6e75557309c9412a8c45e81ad42d53e42b9a36e7dc"}, ] +[package.dependencies] +async-timeout = {version = ">=4.0.3", markers = "python_version < \"3.11\""} + +[package.extras] +libvalkey = ["libvalkey (>=4.0.0)"] +ocsp = ["cryptography (>=36.0.1)", "pyopenssl (==23.2.1)", "requests (>=2.31.0)"] + [[package]] name = "virtualenv" version = "20.26.4" @@ -1765,4 +1772,4 @@ yaml = ["pyyaml"] [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "e6909a84bf45075e96b1180df40a5287771a71eecfd982576d0a9430d7e8b375" +content-hash = "f6f35ae29aa4944f12e0261def333a8634cf6e83d7d0c4fd662732691c816520" diff --git a/pyproject.toml b/pyproject.toml index 41c3105..f301a66 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,7 +47,7 @@ croniter = ">=2.0" click = "^8.1" rq = "^1.16" pyyaml = { version = "^6.0", optional = true } -valkey-py = { version = "^0.0.1", optional = true } +valkey = "6.0.1" [tool.poetry.dev-dependencies] poetry = "^1.8.3" diff --git a/scheduler/admin/__init__.py b/scheduler/admin/__init__.py index 5b0fa97..237e1c8 100644 --- a/scheduler/admin/__init__.py +++ b/scheduler/admin/__init__.py @@ -1,2 +1,2 @@ from .task_models import TaskAdmin # noqa: F401 -from .redis_models import QueueAdmin, WorkerAdmin # noqa: F401 +from .ephemeral_models import QueueAdmin, WorkerAdmin # noqa: F401 diff --git a/scheduler/admin/redis_models.py b/scheduler/admin/ephemeral_models.py similarity index 100% rename from scheduler/admin/redis_models.py rename to scheduler/admin/ephemeral_models.py diff --git a/scheduler/admin/task_models.py b/scheduler/admin/task_models.py index 0b5640e..96703d0 100644 --- a/scheduler/admin/task_models.py +++ b/scheduler/admin/task_models.py @@ -1,9 +1,11 @@ import redis +import valkey from django.contrib import admin, messages from django.contrib.contenttypes.admin import GenericStackedInline from django.utils.translation import gettext_lazy as _ from scheduler import tools +from scheduler.connection_types import ConnectionErrorType from scheduler.models import CronTask, TaskArg, TaskKwarg, RepeatableTask, ScheduledTask from scheduler.settings import SCHEDULER_CONFIG, logger from scheduler.tools import get_job_executions @@ -185,17 +187,17 @@ def change_view(self, request, object_id, form_url="", extra_context=None): obj = self.get_object(request, object_id) try: execution_list = get_job_executions(obj.queue, obj) - except redis.ConnectionError as e: + except (redis.ConnectionError, valkey.ConnectionError) as e: logger.warn(f"Could not get job executions: {e}") execution_list = list() - paginator = self.get_paginator(request, execution_list, SCHEDULER_CONFIG["EXECUTIONS_IN_PAGE"]) + paginator = self.get_paginator(request, execution_list, SCHEDULER_CONFIG.EXECUTIONS_IN_PAGE) page_number = request.GET.get("p", 1) page_obj = paginator.get_page(page_number) page_range = paginator.get_elided_page_range(page_obj.number) extra.update( { - "pagination_required": paginator.count > SCHEDULER_CONFIG["EXECUTIONS_IN_PAGE"], + "pagination_required": paginator.count > SCHEDULER_CONFIG.EXECUTIONS_IN_PAGE, "executions": page_obj, "page_range": page_range, "page_var": "p", diff --git a/scheduler/connection_types.py b/scheduler/connection_types.py new file mode 100644 index 0000000..fe7399a --- /dev/null +++ b/scheduler/connection_types.py @@ -0,0 +1,19 @@ +from typing import Union, Dict, Tuple, Type + +import redis +import valkey + +from scheduler.settings import Broker + +ConnectionErrorType = Union[redis.ConnectionError, valkey.ConnectionError] +ConnectionType = Union[redis.Redis, valkey.Valkey] +PipelineType = Union[redis.client.Pipeline, valkey.client.Pipeline] +RedisSentinel = redis.sentinel.Sentinel + +BrokerConnectionClass: Dict[Tuple[Broker,bool], Type] = { + # Map of (Broker, Strict flag) => Connection Class + (Broker.REDIS, False): redis.Redis, + (Broker.VALKEY, False): valkey.Valkey, + (Broker.REDIS, True): redis.StrictRedis, + (Broker.VALKEY, True): valkey.StrictValkey, +} diff --git a/scheduler/decorators.py b/scheduler/decorators.py index 84b0d0f..76c1a1c 100644 --- a/scheduler/decorators.py +++ b/scheduler/decorators.py @@ -31,10 +31,8 @@ def job(*args, **kwargs): except KeyError: raise QueueNotFoundError(f"Queue {queue} does not exist") - config = settings.SCHEDULER_CONFIG - - kwargs.setdefault("result_ttl", config.get("DEFAULT_RESULT_TTL")) - kwargs.setdefault("timeout", config.get("DEFAULT_TIMEOUT")) + kwargs.setdefault("result_ttl", settings.SCHEDULER_CONFIG.DEFAULT_RESULT_TTL) + kwargs.setdefault("timeout", settings.SCHEDULER_CONFIG.DEFAULT_TIMEOUT) decorator = rq_job_decorator(queue, *args, **kwargs) if func: diff --git a/scheduler/management/commands/rqworker.py b/scheduler/management/commands/rqworker.py index 0b1a5e7..cdf16e9 100644 --- a/scheduler/management/commands/rqworker.py +++ b/scheduler/management/commands/rqworker.py @@ -3,9 +3,10 @@ import sys import click +import redis +import valkey from django.core.management.base import BaseCommand from django.db import connections -from redis.exceptions import ConnectionError from rq.logutils import setup_loghandlers from scheduler.tools import create_worker @@ -109,6 +110,6 @@ def handle(self, **options): logging_level=log_level, max_jobs=options["max_jobs"], ) - except ConnectionError as e: + except (redis.ConnectionError, valkey.ConnectionError) as e: click.echo(str(e), err=True) sys.exit(1) diff --git a/scheduler/models/scheduled_task.py b/scheduler/models/scheduled_task.py index a2bfb28..ece0d2e 100644 --- a/scheduler/models/scheduled_task.py +++ b/scheduler/models/scheduled_task.py @@ -25,7 +25,7 @@ from scheduler.settings import QUEUES from scheduler.settings import logger -SCHEDULER_INTERVAL = settings.SCHEDULER_CONFIG["SCHEDULER_INTERVAL"] +SCHEDULER_INTERVAL = settings.SCHEDULER_CONFIG.SCHEDULER_INTERVAL def failure_callback(job, connection, result, *args, **kwargs): @@ -197,7 +197,7 @@ def _enqueue_args(self) -> Dict: @property def rqueue(self) -> DjangoQueue: - """Returns redis-queue for job""" + """Returns django-queue for job""" return get_queue(self.queue) def ready_for_schedule(self) -> bool: diff --git a/scheduler/queues.py b/scheduler/queues.py index 0d7dc3d..340ee3b 100644 --- a/scheduler/queues.py +++ b/scheduler/queues.py @@ -1,11 +1,12 @@ from typing import List, Dict import redis -from redis.sentinel import Sentinel +import valkey +from .connection_types import RedisSentinel, BrokerConnectionClass from .rq_classes import JobExecution, DjangoQueue, DjangoWorker -from .settings import get_config -from .settings import logger +from .settings import SCHEDULER_CONFIG +from .settings import logger, Broker _CONNECTION_PARAMS = { "URL", @@ -31,12 +32,12 @@ def _get_redis_connection(config, use_strict_redis=False): """ Returns a redis connection from a connection config """ - if get_config("FAKEREDIS"): + if SCHEDULER_CONFIG.BROKER == Broker.FAKEREDIS: import fakeredis redis_cls = fakeredis.FakeRedis if use_strict_redis else fakeredis.FakeStrictRedis else: - redis_cls = redis.StrictRedis if use_strict_redis else redis.Redis + redis_cls = BrokerConnectionClass[(SCHEDULER_CONFIG.BROKER, use_strict_redis)] logger.debug(f"Getting connection for {config}") if "URL" in config: if config.get("SSL") or config.get("URL").startswith("rediss://"): @@ -62,7 +63,7 @@ def _get_redis_connection(config, use_strict_redis=False): } connection_kwargs.update(config.get("CONNECTION_KWARGS", {})) sentinel_kwargs = config.get("SENTINEL_KWARGS", {}) - sentinel = Sentinel(config["SENTINELS"], sentinel_kwargs=sentinel_kwargs, **connection_kwargs) + sentinel = RedisSentinel(config["SENTINELS"], sentinel_kwargs=sentinel_kwargs, **connection_kwargs) return sentinel.master_for( service_name=config["MASTER_NAME"], redis_class=redis_cls, @@ -86,7 +87,7 @@ def get_connection(queue_settings, use_strict_redis=False): def get_queue( - name="default", default_timeout=None, is_async=None, autocommit=None, connection=None, **kwargs + name="default", default_timeout=None, is_async=None, autocommit=None, connection=None, **kwargs ) -> DjangoQueue: """Returns an DjangoQueue using parameters defined in `SCHEDULER_QUEUES`""" from .settings import QUEUES @@ -115,7 +116,7 @@ def get_all_workers(): try: curr_workers = set(DjangoWorker.all(connection=connection)) workers.update(curr_workers) - except redis.ConnectionError as e: + except (redis.ConnectionError, valkey.ConnectionError) as e: logger.error(f"Could not connect for queue {queue_name}: {e}") return workers diff --git a/scheduler/rq_classes.py b/scheduler/rq_classes.py index c537efa..5b961f6 100644 --- a/scheduler/rq_classes.py +++ b/scheduler/rq_classes.py @@ -2,8 +2,6 @@ import django from django.apps import apps -from redis import Redis -from redis.client import Pipeline from rq import Worker from rq.command import send_stop_job_command from rq.decorators import job @@ -24,6 +22,7 @@ from rq.worker import WorkerStatus from scheduler import settings +from scheduler.connection_types import PipelineType, ConnectionType MODEL_NAMES = ["ScheduledTask", "RepeatableTask", "CronTask"] @@ -67,11 +66,11 @@ def is_scheduled_task(self): def is_execution_of(self, scheduled_job): return ( - self.meta.get("task_type", None) == scheduled_job.TASK_TYPE - and self.meta.get("scheduled_task_id", None) == scheduled_job.id + self.meta.get("task_type", None) == scheduled_job.TASK_TYPE + and self.meta.get("scheduled_task_id", None) == scheduled_job.id ) - def stop_execution(self, connection: Redis): + def stop_execution(self, connection: ConnectionType): send_stop_job_command(connection, self.id) @@ -97,11 +96,11 @@ def __str__(self): return f"{self.name}/{','.join(self.queue_names())}" def _start_scheduler( - self, - burst: bool = False, - logging_level: str = "INFO", - date_format: str = "%H:%M:%S", - log_format: str = "%(asctime)s %(message)s", + self, + burst: bool = False, + logging_level: str = "INFO", + date_format: str = "%H:%M:%S", + log_format: str = "%(asctime)s %(message)s", ) -> None: """Starts the scheduler process. This is specifically designed to be run by the worker when running the `work()` method. @@ -145,14 +144,14 @@ def work(self, **kwargs) -> bool: kwargs.setdefault("with_scheduler", True) return super(DjangoWorker, self).work(**kwargs) - def _set_property(self, prop_name: str, val, pipeline: Optional[Pipeline] = None): + def _set_property(self, prop_name: str, val, pipeline: Optional[PipelineType] = None): connection = pipeline if pipeline is not None else self.connection if val is None: connection.hdel(self.key, prop_name) else: connection.hset(self.key, prop_name, val) - def _get_property(self, prop_name: str, pipeline: Optional[Pipeline] = None): + def _get_property(self, prop_name: str, pipeline: Optional[PipelineType] = None): connection = pipeline if pipeline is not None else self.connection return as_text(connection.hget(self.key, prop_name)) @@ -263,7 +262,7 @@ def last_job_id(self): class DjangoScheduler(RQScheduler): def __init__(self, *args, **kwargs): - kwargs.setdefault("interval", settings.SCHEDULER_CONFIG["SCHEDULER_INTERVAL"]) + kwargs.setdefault("interval", settings.SCHEDULER_CONFIG.SCHEDULER_INTERVAL) super(DjangoScheduler, self).__init__(*args, **kwargs) @staticmethod diff --git a/scheduler/settings.py b/scheduler/settings.py index c893bce..cf7e3dd 100644 --- a/scheduler/settings.py +++ b/scheduler/settings.py @@ -1,4 +1,7 @@ import logging +from dataclasses import dataclass +from enum import Enum +from typing import Callable from django.conf import settings from django.core.exceptions import ImproperlyConfigured @@ -6,13 +9,38 @@ logger = logging.getLogger(__package__) QUEUES = dict() -SCHEDULER_CONFIG = dict() + + +class Broker(Enum): + REDIS = "redis" + FAKEREDIS = "fakeredis" + VALKEY = "valkey" + + +@dataclass +class SchedulerConfig: + EXECUTIONS_IN_PAGE: int + DEFAULT_RESULT_TTL: int + DEFAULT_TIMEOUT: int + SCHEDULER_INTERVAL: int + BROKER: Broker + TOKEN_VALIDATION_METHOD: Callable[[str], bool] def _token_validation(token: str) -> bool: return False +SCHEDULER_CONFIG: SchedulerConfig = SchedulerConfig( + EXECUTIONS_IN_PAGE=20, + DEFAULT_RESULT_TTL=600, + DEFAULT_TIMEOUT=300, + SCHEDULER_INTERVAL=10, + BROKER=Broker.REDIS, + TOKEN_VALIDATION_METHOD=_token_validation, +) + + def conf_settings(): global QUEUES global SCHEDULER_CONFIG @@ -24,20 +52,14 @@ def conf_settings(): if QUEUES is None: raise ImproperlyConfigured("You have to define SCHEDULER_QUEUES in settings.py") - SCHEDULER_CONFIG = { - "EXECUTIONS_IN_PAGE": 20, - "DEFAULT_RESULT_TTL": 600, # 10 minutes - "DEFAULT_TIMEOUT": 300, # 5 minutes - "SCHEDULER_INTERVAL": 10, # 10 seconds - "FAKEREDIS": False, # For testing purposes - "TOKEN_VALIDATION_METHOD": _token_validation, # Access stats from another application using API tokens - } user_settings = getattr(settings, "SCHEDULER_CONFIG", {}) - SCHEDULER_CONFIG.update(user_settings) + if "FAKEREDIS" in user_settings: + user_settings["BROKER"] = Broker.FAKEREDIS if user_settings["FAKEREDIS"] else Broker.REDIS + user_settings.pop("FAKEREDIS") + for k in user_settings: + if k not in SCHEDULER_CONFIG.__annotations__: + raise ImproperlyConfigured(f"Unknown setting {k} in SCHEDULER_CONFIG") + setattr(SCHEDULER_CONFIG, k, user_settings[k]) conf_settings() - - -def get_config(key: str, default=None): - return SCHEDULER_CONFIG.get(key, None) diff --git a/scheduler/tests/test_job_decorator.py b/scheduler/tests/test_job_decorator.py index 19fa10e..53179ef 100644 --- a/scheduler/tests/test_job_decorator.py +++ b/scheduler/tests/test_job_decorator.py @@ -38,18 +38,18 @@ def test_job_decorator_no_params(self): test_job.delay() config = settings.SCHEDULER_CONFIG self._assert_job_with_func_and_props( - "default", test_job, config.get("DEFAULT_RESULT_TTL"), config.get("DEFAULT_TIMEOUT") + "default", test_job, config.DEFAULT_RESULT_TTL, config.DEFAULT_TIMEOUT ) def test_job_decorator_timeout(self): test_job_timeout.delay() config = settings.SCHEDULER_CONFIG - self._assert_job_with_func_and_props("default", test_job_timeout, config.get("DEFAULT_RESULT_TTL"), 1) + self._assert_job_with_func_and_props("default", test_job_timeout, config.DEFAULT_RESULT_TTL, 1) def test_job_decorator_result_ttl(self): test_job_result_ttl.delay() config = settings.SCHEDULER_CONFIG - self._assert_job_with_func_and_props("default", test_job_result_ttl, 1, config.get("DEFAULT_TIMEOUT")) + self._assert_job_with_func_and_props("default", test_job_result_ttl, 1, config.DEFAULT_TIMEOUT) def test_job_decorator_different_queue(self): test_job_diff_queue.delay() @@ -57,8 +57,8 @@ def test_job_decorator_different_queue(self): self._assert_job_with_func_and_props( "django_tasks_scheduler_test", test_job_diff_queue, - config.get("DEFAULT_RESULT_TTL"), - config.get("DEFAULT_TIMEOUT"), + config.DEFAULT_RESULT_TTL, + config.DEFAULT_TIMEOUT, ) def _assert_job_with_func_and_props(self, queue_name, expected_func, expected_result_ttl, expected_timeout): diff --git a/scheduler/tests/test_worker.py b/scheduler/tests/test_worker.py index 22aa24f..4b40bfb 100644 --- a/scheduler/tests/test_worker.py +++ b/scheduler/tests/test_worker.py @@ -34,12 +34,12 @@ def test_create_worker__with_name_containing_slash(self): self.assertEqual(name.replace("/", "."), worker1.name) def test_create_worker__scheduler_interval(self): - prev = settings.SCHEDULER_CONFIG["SCHEDULER_INTERVAL"] - settings.SCHEDULER_CONFIG["SCHEDULER_INTERVAL"] = 1 + prev = settings.SCHEDULER_CONFIG.SCHEDULER_INTERVAL + settings.SCHEDULER_CONFIG.SCHEDULER_INTERVAL = 1 worker = create_worker("default") worker.work(burst=True) self.assertEqual(worker.scheduler.interval, 1) - settings.SCHEDULER_CONFIG["SCHEDULER_INTERVAL"] = prev + settings.SCHEDULER_CONFIG.SCHEDULER_INTERVAL = prev def test_get_worker_with_custom_job_class(self): # Test with string representation of job_class diff --git a/scheduler/tools.py b/scheduler/tools.py index c4f94c5..b332574 100644 --- a/scheduler/tools.py +++ b/scheduler/tools.py @@ -8,7 +8,7 @@ from scheduler.queues import get_queues, logger, get_queue from scheduler.rq_classes import DjangoWorker, MODEL_NAMES -from scheduler.settings import get_config +from scheduler.settings import SCHEDULER_CONFIG, Broker def callable_func(callable_str: str): @@ -66,7 +66,7 @@ def create_worker(*queue_names, **kwargs): queues = get_queues(*queue_names) existing_workers = DjangoWorker.all(connection=queues[0].connection) existing_worker_names = set(map(lambda w: w.name, existing_workers)) - kwargs["fork_job_execution"] = not get_config("FAKEREDIS") + kwargs["fork_job_execution"] = (SCHEDULER_CONFIG.BROKER != Broker.FAKEREDIS) if kwargs.get("name", None) is None: kwargs["name"] = _calc_worker_name(existing_worker_names) diff --git a/scheduler/views.py b/scheduler/views.py index c532dd7..47e3dd3 100644 --- a/scheduler/views.py +++ b/scheduler/views.py @@ -47,7 +47,7 @@ def stats(request): def stats_json(request): auth_token = request.headers.get("Authorization") - token_validation_func = SCHEDULER_CONFIG.get("TOKEN_VALIDATION_METHOD") + token_validation_func = SCHEDULER_CONFIG.TOKEN_VALIDATION_METHOD if request.user.is_staff or (token_validation_func and auth_token and token_validation_func(auth_token)): return JsonResponse(get_statistics()) @@ -110,7 +110,7 @@ def get_statistics(run_maintenance_tasks=False): def _get_registry_job_list(queue, registry, page): - items_per_page = SCHEDULER_CONFIG["EXECUTIONS_IN_PAGE"] + items_per_page = SCHEDULER_CONFIG.EXECUTIONS_IN_PAGE num_jobs = len(registry) job_list = [] @@ -199,7 +199,7 @@ def worker_details(request, name): worker.total_working_time = worker.total_working_time / 1000 execution_list = get_worker_executions(worker) - paginator = Paginator(execution_list, SCHEDULER_CONFIG["EXECUTIONS_IN_PAGE"]) + paginator = Paginator(execution_list, SCHEDULER_CONFIG.EXECUTIONS_IN_PAGE) page_number = request.GET.get("p", 1) page_obj = paginator.get_page(page_number) page_range = paginator.get_elided_page_range(page_obj.number)