Skip to content

Commit

Permalink
valkey tests
Browse files Browse the repository at this point in the history
  • Loading branch information
cunla committed Sep 14, 2024
1 parent 950b6ce commit bf5baa4
Show file tree
Hide file tree
Showing 16 changed files with 118 additions and 69 deletions.
23 changes: 15 additions & 8 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ croniter = ">=2.0"
click = "^8.1"
rq = "^1.16"
pyyaml = { version = "^6.0", optional = true }
valkey-py = { version = "^0.0.1", optional = true }
valkey = "6.0.1"

[tool.poetry.dev-dependencies]
poetry = "^1.8.3"
Expand Down
2 changes: 1 addition & 1 deletion scheduler/admin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
from .task_models import TaskAdmin # noqa: F401
from .redis_models import QueueAdmin, WorkerAdmin # noqa: F401
from .ephemeral_models import QueueAdmin, WorkerAdmin # noqa: F401
File renamed without changes.
8 changes: 5 additions & 3 deletions scheduler/admin/task_models.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import redis
import valkey
from django.contrib import admin, messages
from django.contrib.contenttypes.admin import GenericStackedInline
from django.utils.translation import gettext_lazy as _

from scheduler import tools
from scheduler.connection_types import ConnectionErrorType
from scheduler.models import CronTask, TaskArg, TaskKwarg, RepeatableTask, ScheduledTask
from scheduler.settings import SCHEDULER_CONFIG, logger
from scheduler.tools import get_job_executions
Expand Down Expand Up @@ -185,17 +187,17 @@ def change_view(self, request, object_id, form_url="", extra_context=None):
obj = self.get_object(request, object_id)
try:
execution_list = get_job_executions(obj.queue, obj)
except redis.ConnectionError as e:
except (redis.ConnectionError, valkey.ConnectionError) as e:
logger.warn(f"Could not get job executions: {e}")
execution_list = list()
paginator = self.get_paginator(request, execution_list, SCHEDULER_CONFIG["EXECUTIONS_IN_PAGE"])
paginator = self.get_paginator(request, execution_list, SCHEDULER_CONFIG.EXECUTIONS_IN_PAGE)
page_number = request.GET.get("p", 1)
page_obj = paginator.get_page(page_number)
page_range = paginator.get_elided_page_range(page_obj.number)

extra.update(
{
"pagination_required": paginator.count > SCHEDULER_CONFIG["EXECUTIONS_IN_PAGE"],
"pagination_required": paginator.count > SCHEDULER_CONFIG.EXECUTIONS_IN_PAGE,
"executions": page_obj,
"page_range": page_range,
"page_var": "p",
Expand Down
19 changes: 19 additions & 0 deletions scheduler/connection_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from typing import Union, Dict, Tuple, Type

import redis
import valkey

from scheduler.settings import Broker

ConnectionErrorType = Union[redis.ConnectionError, valkey.ConnectionError]
ConnectionType = Union[redis.Redis, valkey.Valkey]
PipelineType = Union[redis.client.Pipeline, valkey.client.Pipeline]
RedisSentinel = redis.sentinel.Sentinel

BrokerConnectionClass: Dict[Tuple[Broker,bool], Type] = {
# Map of (Broker, Strict flag) => Connection Class
(Broker.REDIS, False): redis.Redis,
(Broker.VALKEY, False): valkey.Valkey,
(Broker.REDIS, True): redis.StrictRedis,
(Broker.VALKEY, True): valkey.StrictValkey,
}
6 changes: 2 additions & 4 deletions scheduler/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@ def job(*args, **kwargs):
except KeyError:
raise QueueNotFoundError(f"Queue {queue} does not exist")

config = settings.SCHEDULER_CONFIG

kwargs.setdefault("result_ttl", config.get("DEFAULT_RESULT_TTL"))
kwargs.setdefault("timeout", config.get("DEFAULT_TIMEOUT"))
kwargs.setdefault("result_ttl", settings.SCHEDULER_CONFIG.DEFAULT_RESULT_TTL)
kwargs.setdefault("timeout", settings.SCHEDULER_CONFIG.DEFAULT_TIMEOUT)

decorator = rq_job_decorator(queue, *args, **kwargs)
if func:
Expand Down
5 changes: 3 additions & 2 deletions scheduler/management/commands/rqworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
import sys

import click
import redis
import valkey
from django.core.management.base import BaseCommand
from django.db import connections
from redis.exceptions import ConnectionError
from rq.logutils import setup_loghandlers

from scheduler.tools import create_worker
Expand Down Expand Up @@ -109,6 +110,6 @@ def handle(self, **options):
logging_level=log_level,
max_jobs=options["max_jobs"],
)
except ConnectionError as e:
except (redis.ConnectionError, valkey.ConnectionError) as e:
click.echo(str(e), err=True)
sys.exit(1)
4 changes: 2 additions & 2 deletions scheduler/models/scheduled_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from scheduler.settings import QUEUES
from scheduler.settings import logger

SCHEDULER_INTERVAL = settings.SCHEDULER_CONFIG["SCHEDULER_INTERVAL"]
SCHEDULER_INTERVAL = settings.SCHEDULER_CONFIG.SCHEDULER_INTERVAL


def failure_callback(job, connection, result, *args, **kwargs):
Expand Down Expand Up @@ -197,7 +197,7 @@ def _enqueue_args(self) -> Dict:

@property
def rqueue(self) -> DjangoQueue:
"""Returns redis-queue for job"""
"""Returns django-queue for job"""
return get_queue(self.queue)

def ready_for_schedule(self) -> bool:
Expand Down
17 changes: 9 additions & 8 deletions scheduler/queues.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from typing import List, Dict

import redis
from redis.sentinel import Sentinel
import valkey

from .connection_types import RedisSentinel, BrokerConnectionClass
from .rq_classes import JobExecution, DjangoQueue, DjangoWorker
from .settings import get_config
from .settings import logger
from .settings import SCHEDULER_CONFIG
from .settings import logger, Broker

_CONNECTION_PARAMS = {
"URL",
Expand All @@ -31,12 +32,12 @@ def _get_redis_connection(config, use_strict_redis=False):
"""
Returns a redis connection from a connection config
"""
if get_config("FAKEREDIS"):
if SCHEDULER_CONFIG.BROKER == Broker.FAKEREDIS:
import fakeredis

redis_cls = fakeredis.FakeRedis if use_strict_redis else fakeredis.FakeStrictRedis
else:
redis_cls = redis.StrictRedis if use_strict_redis else redis.Redis
redis_cls = BrokerConnectionClass[(SCHEDULER_CONFIG.BROKER, use_strict_redis)]
logger.debug(f"Getting connection for {config}")
if "URL" in config:
if config.get("SSL") or config.get("URL").startswith("rediss://"):
Expand All @@ -62,7 +63,7 @@ def _get_redis_connection(config, use_strict_redis=False):
}
connection_kwargs.update(config.get("CONNECTION_KWARGS", {}))
sentinel_kwargs = config.get("SENTINEL_KWARGS", {})
sentinel = Sentinel(config["SENTINELS"], sentinel_kwargs=sentinel_kwargs, **connection_kwargs)
sentinel = RedisSentinel(config["SENTINELS"], sentinel_kwargs=sentinel_kwargs, **connection_kwargs)
return sentinel.master_for(
service_name=config["MASTER_NAME"],
redis_class=redis_cls,
Expand All @@ -86,7 +87,7 @@ def get_connection(queue_settings, use_strict_redis=False):


def get_queue(
name="default", default_timeout=None, is_async=None, autocommit=None, connection=None, **kwargs
name="default", default_timeout=None, is_async=None, autocommit=None, connection=None, **kwargs
) -> DjangoQueue:
"""Returns an DjangoQueue using parameters defined in `SCHEDULER_QUEUES`"""
from .settings import QUEUES
Expand Down Expand Up @@ -115,7 +116,7 @@ def get_all_workers():
try:
curr_workers = set(DjangoWorker.all(connection=connection))
workers.update(curr_workers)
except redis.ConnectionError as e:
except (redis.ConnectionError, valkey.ConnectionError) as e:
logger.error(f"Could not connect for queue {queue_name}: {e}")
return workers

Expand Down
25 changes: 12 additions & 13 deletions scheduler/rq_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import django
from django.apps import apps
from redis import Redis
from redis.client import Pipeline
from rq import Worker
from rq.command import send_stop_job_command
from rq.decorators import job
Expand All @@ -24,6 +22,7 @@
from rq.worker import WorkerStatus

from scheduler import settings
from scheduler.connection_types import PipelineType, ConnectionType

MODEL_NAMES = ["ScheduledTask", "RepeatableTask", "CronTask"]

Expand Down Expand Up @@ -67,11 +66,11 @@ def is_scheduled_task(self):

def is_execution_of(self, scheduled_job):
return (
self.meta.get("task_type", None) == scheduled_job.TASK_TYPE
and self.meta.get("scheduled_task_id", None) == scheduled_job.id
self.meta.get("task_type", None) == scheduled_job.TASK_TYPE
and self.meta.get("scheduled_task_id", None) == scheduled_job.id
)

def stop_execution(self, connection: Redis):
def stop_execution(self, connection: ConnectionType):
send_stop_job_command(connection, self.id)


Expand All @@ -97,11 +96,11 @@ def __str__(self):
return f"{self.name}/{','.join(self.queue_names())}"

def _start_scheduler(
self,
burst: bool = False,
logging_level: str = "INFO",
date_format: str = "%H:%M:%S",
log_format: str = "%(asctime)s %(message)s",
self,
burst: bool = False,
logging_level: str = "INFO",
date_format: str = "%H:%M:%S",
log_format: str = "%(asctime)s %(message)s",
) -> None:
"""Starts the scheduler process.
This is specifically designed to be run by the worker when running the `work()` method.
Expand Down Expand Up @@ -145,14 +144,14 @@ def work(self, **kwargs) -> bool:
kwargs.setdefault("with_scheduler", True)
return super(DjangoWorker, self).work(**kwargs)

def _set_property(self, prop_name: str, val, pipeline: Optional[Pipeline] = None):
def _set_property(self, prop_name: str, val, pipeline: Optional[PipelineType] = None):
connection = pipeline if pipeline is not None else self.connection
if val is None:
connection.hdel(self.key, prop_name)
else:
connection.hset(self.key, prop_name, val)

def _get_property(self, prop_name: str, pipeline: Optional[Pipeline] = None):
def _get_property(self, prop_name: str, pipeline: Optional[PipelineType] = None):
connection = pipeline if pipeline is not None else self.connection
return as_text(connection.hget(self.key, prop_name))

Expand Down Expand Up @@ -263,7 +262,7 @@ def last_job_id(self):

class DjangoScheduler(RQScheduler):
def __init__(self, *args, **kwargs):
kwargs.setdefault("interval", settings.SCHEDULER_CONFIG["SCHEDULER_INTERVAL"])
kwargs.setdefault("interval", settings.SCHEDULER_CONFIG.SCHEDULER_INTERVAL)
super(DjangoScheduler, self).__init__(*args, **kwargs)

@staticmethod
Expand Down
50 changes: 36 additions & 14 deletions scheduler/settings.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,46 @@
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Callable

from django.conf import settings
from django.core.exceptions import ImproperlyConfigured

logger = logging.getLogger(__package__)

QUEUES = dict()
SCHEDULER_CONFIG = dict()


class Broker(Enum):
REDIS = "redis"
FAKEREDIS = "fakeredis"
VALKEY = "valkey"


@dataclass
class SchedulerConfig:
EXECUTIONS_IN_PAGE: int
DEFAULT_RESULT_TTL: int
DEFAULT_TIMEOUT: int
SCHEDULER_INTERVAL: int
BROKER: Broker
TOKEN_VALIDATION_METHOD: Callable[[str], bool]


def _token_validation(token: str) -> bool:
return False


SCHEDULER_CONFIG: SchedulerConfig = SchedulerConfig(
EXECUTIONS_IN_PAGE=20,
DEFAULT_RESULT_TTL=600,
DEFAULT_TIMEOUT=300,
SCHEDULER_INTERVAL=10,
BROKER=Broker.REDIS,
TOKEN_VALIDATION_METHOD=_token_validation,
)


def conf_settings():
global QUEUES
global SCHEDULER_CONFIG
Expand All @@ -24,20 +52,14 @@ def conf_settings():
if QUEUES is None:
raise ImproperlyConfigured("You have to define SCHEDULER_QUEUES in settings.py")

SCHEDULER_CONFIG = {
"EXECUTIONS_IN_PAGE": 20,
"DEFAULT_RESULT_TTL": 600, # 10 minutes
"DEFAULT_TIMEOUT": 300, # 5 minutes
"SCHEDULER_INTERVAL": 10, # 10 seconds
"FAKEREDIS": False, # For testing purposes
"TOKEN_VALIDATION_METHOD": _token_validation, # Access stats from another application using API tokens
}
user_settings = getattr(settings, "SCHEDULER_CONFIG", {})
SCHEDULER_CONFIG.update(user_settings)
if "FAKEREDIS" in user_settings:
user_settings["BROKER"] = Broker.FAKEREDIS if user_settings["FAKEREDIS"] else Broker.REDIS
user_settings.pop("FAKEREDIS")
for k in user_settings:
if k not in SCHEDULER_CONFIG.__annotations__:
raise ImproperlyConfigured(f"Unknown setting {k} in SCHEDULER_CONFIG")
setattr(SCHEDULER_CONFIG, k, user_settings[k])


conf_settings()


def get_config(key: str, default=None):
return SCHEDULER_CONFIG.get(key, None)
10 changes: 5 additions & 5 deletions scheduler/tests/test_job_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,27 @@ def test_job_decorator_no_params(self):
test_job.delay()
config = settings.SCHEDULER_CONFIG
self._assert_job_with_func_and_props(
"default", test_job, config.get("DEFAULT_RESULT_TTL"), config.get("DEFAULT_TIMEOUT")
"default", test_job, config.DEFAULT_RESULT_TTL, config.DEFAULT_TIMEOUT
)

def test_job_decorator_timeout(self):
test_job_timeout.delay()
config = settings.SCHEDULER_CONFIG
self._assert_job_with_func_and_props("default", test_job_timeout, config.get("DEFAULT_RESULT_TTL"), 1)
self._assert_job_with_func_and_props("default", test_job_timeout, config.DEFAULT_RESULT_TTL, 1)

def test_job_decorator_result_ttl(self):
test_job_result_ttl.delay()
config = settings.SCHEDULER_CONFIG
self._assert_job_with_func_and_props("default", test_job_result_ttl, 1, config.get("DEFAULT_TIMEOUT"))
self._assert_job_with_func_and_props("default", test_job_result_ttl, 1, config.DEFAULT_TIMEOUT)

def test_job_decorator_different_queue(self):
test_job_diff_queue.delay()
config = settings.SCHEDULER_CONFIG
self._assert_job_with_func_and_props(
"django_tasks_scheduler_test",
test_job_diff_queue,
config.get("DEFAULT_RESULT_TTL"),
config.get("DEFAULT_TIMEOUT"),
config.DEFAULT_RESULT_TTL,
config.DEFAULT_TIMEOUT,
)

def _assert_job_with_func_and_props(self, queue_name, expected_func, expected_result_ttl, expected_timeout):
Expand Down
Loading

0 comments on commit bf5baa4

Please sign in to comment.