diff --git a/scheduler/admin/ephemeral_models.py b/scheduler/admin/ephemeral_models.py index bcb3421..a556577 100644 --- a/scheduler/admin/ephemeral_models.py +++ b/scheduler/admin/ephemeral_models.py @@ -13,15 +13,12 @@ def has_change_permission(self, request, obj=None): return True def has_module_permission(self, request): - """ - return True if the given request has any permission in the given - app label. - - Can be overridden by the user in subclasses. In such case it should - return True if the given request has permission to view the module on - the admin index page and access the module's index page. Overriding it - does not restrict access to the add, change or delete views. Use - `ModelAdmin.has_(add|change|delete)_permission` for that. + """Returns True if the given request has any permission in the given app label. + + Can be overridden by the user in subclasses. In such case, it should return True if the given request has + permission to view the module on the admin index page and access the module's index page. Overriding it does + not restrict access to the add, change or delete views. Use `ModelAdmin.has_(add|change|delete)_permission` for + that. """ return request.user.has_module_perms("django-tasks-scheduler") diff --git a/scheduler/admin/task_models.py b/scheduler/admin/task_models.py index fc5fb50..639765a 100644 --- a/scheduler/admin/task_models.py +++ b/scheduler/admin/task_models.py @@ -7,7 +7,7 @@ from scheduler import tools from scheduler.models import CronTask, TaskArg, TaskKwarg, RepeatableTask, ScheduledTask from scheduler.settings import SCHEDULER_CONFIG, logger -from scheduler.tools import get_job_executions +from scheduler.tools import get_job_executions_for_task class HiddenMixin(object): @@ -185,7 +185,7 @@ def change_view(self, request, object_id, form_url="", extra_context=None): extra = extra_context or {} obj = self.get_object(request, object_id) try: - execution_list = get_job_executions(obj.queue, obj) + execution_list = get_job_executions_for_task(obj.queue, obj) except (redis.ConnectionError, valkey.ConnectionError) as e: logger.warn(f"Could not get job executions: {e}") execution_list = list() diff --git a/scheduler/rq_classes.py b/scheduler/rq_classes.py index 99bc1ec..a14b926 100644 --- a/scheduler/rq_classes.py +++ b/scheduler/rq_classes.py @@ -1,4 +1,4 @@ -from typing import List, Any, Optional, Union +from typing import List, Optional, Union import django from django.apps import apps @@ -37,43 +37,33 @@ def register_sentry(sentry_dsn, **opts): rq_register_sentry(sentry_dsn, **opts) -def as_text(v: Union[bytes, str]) -> Optional[str]: +def as_str(v: Union[bytes, str]) -> Optional[str]: """Converts a bytes value to a string using `utf-8`. - :param v: The value (bytes or string) + :param v: The value (None/bytes/str) :raises: ValueError: If the value is not bytes or string :returns: Either the decoded string or None """ if v is None: return None - elif isinstance(v, bytes): + if isinstance(v, bytes): return v.decode("utf-8") - elif isinstance(v, str): + if isinstance(v, str): return v - else: - raise ValueError("Unknown type %r" % type(v)) - - -def compact(lst: List[Any]) -> List[Any]: - """Remove `None` values from an iterable object. - :param lst: A list (or list-like) object - :returns: The list without None values - """ - return [item for item in lst if item is not None] + raise ValueError("Unknown type %r" % type(v)) class JobExecution(Job): - def __eq__(self, other): + def __eq__(self, other) -> bool: return isinstance(other, Job) and self.id == other.id @property - def is_scheduled_task(self): + def is_scheduled_task(self) -> bool: return self.meta.get("scheduled_task_id", None) is not None - def is_execution_of(self, scheduled_job): + def is_execution_of(self, task: "ScheduledTask") -> bool: return ( - self.meta.get("task_type", None) == scheduled_job.TASK_TYPE - and self.meta.get("scheduled_task_id", None) == scheduled_job.id + self.meta.get("task_type", None) == task.TASK_TYPE and self.meta.get("scheduled_task_id", None) == task.id ) def stop_execution(self, connection: ConnectionType): @@ -138,7 +128,7 @@ def _start_scheduler( proc = self.scheduler.start() self._set_property("scheduler_pid", proc.pid) - def execute_job(self, job: "Job", queue: "Queue"): + def execute_job(self, job: "Job", queue: "Queue") -> None: if self.fork_job_execution: super(DjangoWorker, self).execute_job(job, queue) else: @@ -150,16 +140,17 @@ def work(self, **kwargs) -> bool: kwargs.setdefault("with_scheduler", True) return super(DjangoWorker, self).work(**kwargs) - def _set_property(self, prop_name: str, val, pipeline: Optional[PipelineType] = None): + def _set_property(self, prop_name: str, val, pipeline: Optional[PipelineType] = None) -> None: connection = pipeline if pipeline is not None else self.connection if val is None: connection.hdel(self.key, prop_name) else: connection.hset(self.key, prop_name, val) - def _get_property(self, prop_name: str, pipeline: Optional[PipelineType] = None): + def _get_property(self, prop_name: str, pipeline: Optional[PipelineType] = None) -> Optional[str]: connection = pipeline if pipeline is not None else self.connection - return as_text(connection.hget(self.key, prop_name)) + res = connection.hget(self.key, prop_name) + return as_str(res) def scheduler_pid(self) -> Optional[int]: if len(self.queues) == 0: @@ -170,6 +161,9 @@ def scheduler_pid(self) -> Optional[int]: class DjangoQueue(Queue): + """A subclass of RQ's QUEUE that allows jobs to be stored temporarily to be enqueued later at the end of Django's + request/response cycle.""" + REGISTRIES = dict( finished="finished_job_registry", failed="failed_job_registry", @@ -178,12 +172,8 @@ class DjangoQueue(Queue): deferred="deferred_job_registry", canceled="canceled_job_registry", ) - """ - A subclass of RQ's QUEUE that allows jobs to be stored temporarily to be - enqueued later at the end of Django's request/response cycle. - """ - def __init__(self, *args, **kwargs): + def __init__(self, *args, **kwargs) -> None: kwargs["job_class"] = JobExecution super(DjangoQueue, self).__init__(*args, **kwargs) @@ -196,11 +186,11 @@ def get_registry(self, name: str) -> Union[None, BaseRegistry, "DjangoQueue"]: return None @property - def finished_job_registry(self): + def finished_job_registry(self) -> FinishedJobRegistry: return FinishedJobRegistry(self.name, self.connection) @property - def started_job_registry(self): + def started_job_registry(self) -> StartedJobRegistry: return StartedJobRegistry( self.name, self.connection, @@ -208,7 +198,7 @@ def started_job_registry(self): ) @property - def deferred_job_registry(self): + def deferred_job_registry(self) -> DeferredJobRegistry: return DeferredJobRegistry( self.name, self.connection, @@ -216,7 +206,7 @@ def deferred_job_registry(self): ) @property - def failed_job_registry(self): + def failed_job_registry(self) -> FailedJobRegistry: return FailedJobRegistry( self.name, self.connection, @@ -224,7 +214,7 @@ def failed_job_registry(self): ) @property - def scheduled_job_registry(self): + def scheduled_job_registry(self) -> ScheduledJobRegistry: return ScheduledJobRegistry( self.name, self.connection, @@ -232,7 +222,7 @@ def scheduled_job_registry(self): ) @property - def canceled_job_registry(self): + def canceled_job_registry(self) -> CanceledJobRegistry: return CanceledJobRegistry( self.name, self.connection, @@ -250,24 +240,24 @@ def get_all_job_ids(self) -> List[str]: res.extend(self.canceled_job_registry.get_job_ids()) return res - def get_all_jobs(self): + def get_all_jobs(self) -> List[JobExecution]: job_ids = self.get_all_job_ids() - return compact([self.fetch_job(job_id) for job_id in job_ids]) + return list(filter(lambda j: j is not None, [self.fetch_job(job_id) for job_id in job_ids])) - def clean_registries(self): + def clean_registries(self) -> None: self.started_job_registry.cleanup() self.failed_job_registry.cleanup() self.finished_job_registry.cleanup() - def remove_job_id(self, job_id: str): + def remove_job_id(self, job_id: str) -> None: self.connection.lrem(self.key, 0, job_id) - def last_job_id(self): + def last_job_id(self) -> Optional[str]: return self.connection.lindex(self.key, 0) class DjangoScheduler(RQScheduler): - def __init__(self, *args, **kwargs): + def __init__(self, *args, **kwargs) -> None: kwargs.setdefault("interval", settings.SCHEDULER_CONFIG.SCHEDULER_INTERVAL) super(DjangoScheduler, self).__init__(*args, **kwargs) @@ -281,10 +271,10 @@ def reschedule_all_jobs(): logger.debug(f"Rescheduling {str(item)}") item.save() - def work(self): + def work(self) -> None: django.setup() super(DjangoScheduler, self).work() - def enqueue_scheduled_jobs(self): + def enqueue_scheduled_jobs(self) -> None: self.reschedule_all_jobs() super(DjangoScheduler, self).enqueue_scheduled_jobs() diff --git a/scheduler/settings.py b/scheduler/settings.py index cf7e3dd..db770be 100644 --- a/scheduler/settings.py +++ b/scheduler/settings.py @@ -54,6 +54,7 @@ def conf_settings(): user_settings = getattr(settings, "SCHEDULER_CONFIG", {}) if "FAKEREDIS" in user_settings: + logger.warning("Configuration using FAKEREDIS is deprecated. Use BROKER='fakeredis' instead") user_settings["BROKER"] = Broker.FAKEREDIS if user_settings["FAKEREDIS"] else Broker.REDIS user_settings.pop("FAKEREDIS") for k in user_settings: diff --git a/scheduler/tools.py b/scheduler/tools.py index 476fff0..f9f1d99 100644 --- a/scheduler/tools.py +++ b/scheduler/tools.py @@ -1,5 +1,6 @@ import importlib import os +from typing import List, Any, Callable import croniter from django.apps import apps @@ -7,11 +8,11 @@ from django.utils.module_loading import import_string from scheduler.queues import get_queues, logger, get_queue -from scheduler.rq_classes import DjangoWorker, MODEL_NAMES +from scheduler.rq_classes import DjangoWorker, MODEL_NAMES, JobExecution from scheduler.settings import SCHEDULER_CONFIG, Broker -def callable_func(callable_str: str): +def callable_func(callable_str: str) -> Callable: path = callable_str.split(".") module = importlib.import_module(".".join(path[:-1])) func = getattr(module, path[-1]) @@ -28,7 +29,7 @@ def get_next_cron_time(cron_string) -> timezone.datetime: return next_itr -def get_scheduled_task(task_model: str, task_id: int): +def get_scheduled_task(task_model: str, task_id: int) -> "BaseTask": if task_model not in MODEL_NAMES: raise ValueError(f"Job Model {task_model} does not exist, choices are {MODEL_NAMES}") model = apps.get_model(app_label="scheduler", model_name=task_model) @@ -38,7 +39,7 @@ def get_scheduled_task(task_model: str, task_id: int): return task -def run_task(task_model: str, task_id: int): +def run_task(task_model: str, task_id: int) -> Any: """Run a scheduled job""" scheduled_task = get_scheduled_task(task_model, task_id) logger.debug(f"Running task {str(scheduled_task)}") @@ -48,7 +49,7 @@ def run_task(task_model: str, task_id: int): return res -def _calc_worker_name(existing_worker_names): +def _calc_worker_name(existing_worker_names) -> str: hostname = os.uname()[1] c = 1 worker_name = f"{hostname}-worker.{c}" @@ -58,10 +59,8 @@ def _calc_worker_name(existing_worker_names): return worker_name -def create_worker(*queue_names, **kwargs): - """ - Returns a Django worker for all queues or specified ones. - """ +def create_worker(*queue_names, **kwargs) -> DjangoWorker: + """Returns a Django worker for all queues or specified ones.""" queues = get_queues(*queue_names) existing_workers = DjangoWorker.all(connection=queues[0].connection) @@ -84,7 +83,7 @@ def create_worker(*queue_names, **kwargs): return worker -def get_job_executions(queue_name, scheduled_task): +def get_job_executions_for_task(queue_name, scheduled_task) -> List[JobExecution]: queue = get_queue(queue_name) job_list = queue.get_all_jobs() res = list(filter(lambda j: j.is_execution_of(scheduled_task), job_list)) diff --git a/scheduler/views.py b/scheduler/views.py index 47e3dd3..d70d359 100644 --- a/scheduler/views.py +++ b/scheduler/views.py @@ -6,7 +6,7 @@ from django.contrib import admin, messages from django.contrib.admin.views.decorators import staff_member_required from django.core.paginator import Paginator -from django.http import JsonResponse +from django.http import JsonResponse, HttpResponse, HttpRequest from django.http.response import HttpResponseNotFound, Http404, HttpResponseBadRequest from django.shortcuts import redirect from django.shortcuts import render @@ -158,7 +158,7 @@ def jobs_view(request, queue_name: str, registry_name: str): @never_cache @staff_member_required -def queue_workers(request, queue_name): +def queue_workers(request: HttpRequest, queue_name: str) -> HttpResponse: queue = get_queue(queue_name) all_workers = DjangoWorker.all(queue.connection) for w in all_workers: @@ -175,7 +175,7 @@ def queue_workers(request, queue_name): @never_cache @staff_member_required -def workers(request): +def workers(request: HttpRequest) -> HttpResponse: all_workers = get_all_workers() worker_list = [worker for worker in all_workers] @@ -188,7 +188,7 @@ def workers(request): @never_cache @staff_member_required -def worker_details(request, name): +def worker_details(request: HttpRequest, name: str) -> HttpResponse: queue, worker = None, None workers = get_all_workers() worker = next((w for w in workers if w.name == name), None) @@ -233,7 +233,7 @@ def _find_job(job_id: str) -> Tuple[Optional[DjangoQueue], Optional[JobExecution @never_cache @staff_member_required -def job_detail(request, job_id: str): +def job_detail(request: HttpRequest, job_id: str) -> HttpResponse: queue, job = _find_job(job_id) if job is None: return HttpResponseBadRequest(f"Job {escape(job_id)} does not exist, maybe its TTL has passed") @@ -261,7 +261,7 @@ def job_detail(request, job_id: str): @never_cache @staff_member_required -def clear_queue_registry(request, queue_name, registry_name): +def clear_queue_registry(request: HttpRequest, queue_name: str, registry_name: str) -> HttpResponse: queue = get_queue(queue_name) registry = queue.get_registry(registry_name) if registry is None: @@ -306,7 +306,7 @@ def clear_queue_registry(request, queue_name, registry_name): @never_cache @staff_member_required -def requeue_all(request, queue_name, registry_name): +def requeue_all(request: HttpRequest, queue_name: str, registry_name: str) -> HttpResponse: queue = get_queue(queue_name) registry = queue.get_registry(registry_name) if registry is None: @@ -344,7 +344,7 @@ def requeue_all(request, queue_name, registry_name): @never_cache @staff_member_required -def confirm_action(request, queue_name): +def confirm_action(request: HttpRequest, queue_name: str) -> HttpResponse: queue = get_queue(queue_name) next_url = request.META.get("HTTP_REFERER") or reverse("queue_registry_jobs", args=[queue_name, "queued"]) try: @@ -378,7 +378,7 @@ def confirm_action(request, queue_name): @never_cache @staff_member_required -def actions(request, queue_name): +def actions(request: HttpRequest, queue_name: str) -> HttpResponse: queue = get_queue(queue_name) next_url = request.POST.get("next_url") or reverse("queue_registry_jobs", args=[queue_name, "queued"]) try: @@ -430,7 +430,7 @@ def actions(request, queue_name): @never_cache @staff_member_required -def job_action(request, job_id: str, action: str): +def job_action(request: HttpRequest, job_id: str, action: str) -> HttpResponse: queue, job = _find_job(job_id) if job is None: return HttpResponseBadRequest(f"Job {escape(job_id)} does not exist, maybe its TTL has passed")