Skip to content

Commit

Permalink
add types, etc.
Browse files Browse the repository at this point in the history
  • Loading branch information
cunla committed Sep 20, 2024
1 parent ad064ab commit c7c3992
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 74 deletions.
15 changes: 6 additions & 9 deletions scheduler/admin/ephemeral_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,12 @@ def has_change_permission(self, request, obj=None):
return True

def has_module_permission(self, request):
"""
return True if the given request has any permission in the given
app label.
Can be overridden by the user in subclasses. In such case it should
return True if the given request has permission to view the module on
the admin index page and access the module's index page. Overriding it
does not restrict access to the add, change or delete views. Use
`ModelAdmin.has_(add|change|delete)_permission` for that.
"""Returns True if the given request has any permission in the given app label.
Can be overridden by the user in subclasses. In such case, it should return True if the given request has
permission to view the module on the admin index page and access the module's index page. Overriding it does
not restrict access to the add, change or delete views. Use `ModelAdmin.has_(add|change|delete)_permission` for
that.
"""
return request.user.has_module_perms("django-tasks-scheduler")

Expand Down
4 changes: 2 additions & 2 deletions scheduler/admin/task_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from scheduler import tools
from scheduler.models import CronTask, TaskArg, TaskKwarg, RepeatableTask, ScheduledTask
from scheduler.settings import SCHEDULER_CONFIG, logger
from scheduler.tools import get_job_executions
from scheduler.tools import get_job_executions_for_task


class HiddenMixin(object):
Expand Down Expand Up @@ -185,7 +185,7 @@ def change_view(self, request, object_id, form_url="", extra_context=None):
extra = extra_context or {}
obj = self.get_object(request, object_id)
try:
execution_list = get_job_executions(obj.queue, obj)
execution_list = get_job_executions_for_task(obj.queue, obj)
except (redis.ConnectionError, valkey.ConnectionError) as e:
logger.warn(f"Could not get job executions: {e}")
execution_list = list()
Expand Down
76 changes: 33 additions & 43 deletions scheduler/rq_classes.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Any, Optional, Union
from typing import List, Optional, Union

import django
from django.apps import apps
Expand Down Expand Up @@ -37,43 +37,33 @@ def register_sentry(sentry_dsn, **opts):
rq_register_sentry(sentry_dsn, **opts)


def as_text(v: Union[bytes, str]) -> Optional[str]:
def as_str(v: Union[bytes, str]) -> Optional[str]:
"""Converts a bytes value to a string using `utf-8`.
:param v: The value (bytes or string)
:param v: The value (None/bytes/str)
:raises: ValueError: If the value is not bytes or string
:returns: Either the decoded string or None
"""
if v is None:
return None
elif isinstance(v, bytes):
if isinstance(v, bytes):
return v.decode("utf-8")
elif isinstance(v, str):
if isinstance(v, str):
return v
else:
raise ValueError("Unknown type %r" % type(v))


def compact(lst: List[Any]) -> List[Any]:
"""Remove `None` values from an iterable object.
:param lst: A list (or list-like) object
:returns: The list without None values
"""
return [item for item in lst if item is not None]
raise ValueError("Unknown type %r" % type(v))


class JobExecution(Job):
def __eq__(self, other):
def __eq__(self, other) -> bool:
return isinstance(other, Job) and self.id == other.id

@property
def is_scheduled_task(self):
def is_scheduled_task(self) -> bool:
return self.meta.get("scheduled_task_id", None) is not None

def is_execution_of(self, scheduled_job):
def is_execution_of(self, task: "ScheduledTask") -> bool:
return (
self.meta.get("task_type", None) == scheduled_job.TASK_TYPE
and self.meta.get("scheduled_task_id", None) == scheduled_job.id
self.meta.get("task_type", None) == task.TASK_TYPE and self.meta.get("scheduled_task_id", None) == task.id
)

def stop_execution(self, connection: ConnectionType):
Expand Down Expand Up @@ -138,7 +128,7 @@ def _start_scheduler(
proc = self.scheduler.start()
self._set_property("scheduler_pid", proc.pid)

def execute_job(self, job: "Job", queue: "Queue"):
def execute_job(self, job: "Job", queue: "Queue") -> None:
if self.fork_job_execution:
super(DjangoWorker, self).execute_job(job, queue)
else:
Expand All @@ -150,16 +140,17 @@ def work(self, **kwargs) -> bool:
kwargs.setdefault("with_scheduler", True)
return super(DjangoWorker, self).work(**kwargs)

def _set_property(self, prop_name: str, val, pipeline: Optional[PipelineType] = None):
def _set_property(self, prop_name: str, val, pipeline: Optional[PipelineType] = None) -> None:
connection = pipeline if pipeline is not None else self.connection
if val is None:
connection.hdel(self.key, prop_name)
else:
connection.hset(self.key, prop_name, val)

def _get_property(self, prop_name: str, pipeline: Optional[PipelineType] = None):
def _get_property(self, prop_name: str, pipeline: Optional[PipelineType] = None) -> Optional[str]:
connection = pipeline if pipeline is not None else self.connection
return as_text(connection.hget(self.key, prop_name))
res = connection.hget(self.key, prop_name)
return as_str(res)

def scheduler_pid(self) -> Optional[int]:
if len(self.queues) == 0:
Expand All @@ -170,6 +161,9 @@ def scheduler_pid(self) -> Optional[int]:


class DjangoQueue(Queue):
"""A subclass of RQ's QUEUE that allows jobs to be stored temporarily to be enqueued later at the end of Django's
request/response cycle."""

REGISTRIES = dict(
finished="finished_job_registry",
failed="failed_job_registry",
Expand All @@ -178,12 +172,8 @@ class DjangoQueue(Queue):
deferred="deferred_job_registry",
canceled="canceled_job_registry",
)
"""
A subclass of RQ's QUEUE that allows jobs to be stored temporarily to be
enqueued later at the end of Django's request/response cycle.
"""

def __init__(self, *args, **kwargs):
def __init__(self, *args, **kwargs) -> None:
kwargs["job_class"] = JobExecution
super(DjangoQueue, self).__init__(*args, **kwargs)

Expand All @@ -196,43 +186,43 @@ def get_registry(self, name: str) -> Union[None, BaseRegistry, "DjangoQueue"]:
return None

@property
def finished_job_registry(self):
def finished_job_registry(self) -> FinishedJobRegistry:
return FinishedJobRegistry(self.name, self.connection)

@property
def started_job_registry(self):
def started_job_registry(self) -> StartedJobRegistry:
return StartedJobRegistry(
self.name,
self.connection,
job_class=JobExecution,
)

@property
def deferred_job_registry(self):
def deferred_job_registry(self) -> DeferredJobRegistry:
return DeferredJobRegistry(
self.name,
self.connection,
job_class=JobExecution,
)

@property
def failed_job_registry(self):
def failed_job_registry(self) -> FailedJobRegistry:
return FailedJobRegistry(
self.name,
self.connection,
job_class=JobExecution,
)

@property
def scheduled_job_registry(self):
def scheduled_job_registry(self) -> ScheduledJobRegistry:
return ScheduledJobRegistry(
self.name,
self.connection,
job_class=JobExecution,
)

@property
def canceled_job_registry(self):
def canceled_job_registry(self) -> CanceledJobRegistry:
return CanceledJobRegistry(
self.name,
self.connection,
Expand All @@ -250,24 +240,24 @@ def get_all_job_ids(self) -> List[str]:
res.extend(self.canceled_job_registry.get_job_ids())
return res

def get_all_jobs(self):
def get_all_jobs(self) -> List[JobExecution]:
job_ids = self.get_all_job_ids()
return compact([self.fetch_job(job_id) for job_id in job_ids])
return list(filter(lambda j: j is not None, [self.fetch_job(job_id) for job_id in job_ids]))

def clean_registries(self):
def clean_registries(self) -> None:
self.started_job_registry.cleanup()
self.failed_job_registry.cleanup()
self.finished_job_registry.cleanup()

def remove_job_id(self, job_id: str):
def remove_job_id(self, job_id: str) -> None:
self.connection.lrem(self.key, 0, job_id)

def last_job_id(self):
def last_job_id(self) -> Optional[str]:
return self.connection.lindex(self.key, 0)


class DjangoScheduler(RQScheduler):
def __init__(self, *args, **kwargs):
def __init__(self, *args, **kwargs) -> None:
kwargs.setdefault("interval", settings.SCHEDULER_CONFIG.SCHEDULER_INTERVAL)
super(DjangoScheduler, self).__init__(*args, **kwargs)

Expand All @@ -281,10 +271,10 @@ def reschedule_all_jobs():
logger.debug(f"Rescheduling {str(item)}")
item.save()

def work(self):
def work(self) -> None:
django.setup()
super(DjangoScheduler, self).work()

def enqueue_scheduled_jobs(self):
def enqueue_scheduled_jobs(self) -> None:
self.reschedule_all_jobs()
super(DjangoScheduler, self).enqueue_scheduled_jobs()
1 change: 1 addition & 0 deletions scheduler/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def conf_settings():

user_settings = getattr(settings, "SCHEDULER_CONFIG", {})
if "FAKEREDIS" in user_settings:
logger.warning("Configuration using FAKEREDIS is deprecated. Use BROKER='fakeredis' instead")
user_settings["BROKER"] = Broker.FAKEREDIS if user_settings["FAKEREDIS"] else Broker.REDIS
user_settings.pop("FAKEREDIS")
for k in user_settings:
Expand Down
19 changes: 9 additions & 10 deletions scheduler/tools.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import importlib
import os
from typing import List, Any, Callable

import croniter
from django.apps import apps
from django.utils import timezone
from django.utils.module_loading import import_string

from scheduler.queues import get_queues, logger, get_queue
from scheduler.rq_classes import DjangoWorker, MODEL_NAMES
from scheduler.rq_classes import DjangoWorker, MODEL_NAMES, JobExecution
from scheduler.settings import SCHEDULER_CONFIG, Broker


def callable_func(callable_str: str):
def callable_func(callable_str: str) -> Callable:
path = callable_str.split(".")
module = importlib.import_module(".".join(path[:-1]))
func = getattr(module, path[-1])
Expand All @@ -28,7 +29,7 @@ def get_next_cron_time(cron_string) -> timezone.datetime:
return next_itr


def get_scheduled_task(task_model: str, task_id: int):
def get_scheduled_task(task_model: str, task_id: int) -> "BaseTask":
if task_model not in MODEL_NAMES:
raise ValueError(f"Job Model {task_model} does not exist, choices are {MODEL_NAMES}")
model = apps.get_model(app_label="scheduler", model_name=task_model)
Expand All @@ -38,7 +39,7 @@ def get_scheduled_task(task_model: str, task_id: int):
return task


def run_task(task_model: str, task_id: int):
def run_task(task_model: str, task_id: int) -> Any:
"""Run a scheduled job"""
scheduled_task = get_scheduled_task(task_model, task_id)
logger.debug(f"Running task {str(scheduled_task)}")
Expand All @@ -48,7 +49,7 @@ def run_task(task_model: str, task_id: int):
return res


def _calc_worker_name(existing_worker_names):
def _calc_worker_name(existing_worker_names) -> str:
hostname = os.uname()[1]
c = 1
worker_name = f"{hostname}-worker.{c}"
Expand All @@ -58,10 +59,8 @@ def _calc_worker_name(existing_worker_names):
return worker_name


def create_worker(*queue_names, **kwargs):
"""
Returns a Django worker for all queues or specified ones.
"""
def create_worker(*queue_names, **kwargs) -> DjangoWorker:
"""Returns a Django worker for all queues or specified ones."""

queues = get_queues(*queue_names)
existing_workers = DjangoWorker.all(connection=queues[0].connection)
Expand All @@ -84,7 +83,7 @@ def create_worker(*queue_names, **kwargs):
return worker


def get_job_executions(queue_name, scheduled_task):
def get_job_executions_for_task(queue_name, scheduled_task) -> List[JobExecution]:
queue = get_queue(queue_name)
job_list = queue.get_all_jobs()
res = list(filter(lambda j: j.is_execution_of(scheduled_task), job_list))
Expand Down
Loading

0 comments on commit c7c3992

Please sign in to comment.