Skip to content

Commit

Permalink
feat:add job-class kwarg to rqworker
Browse files Browse the repository at this point in the history
  • Loading branch information
gabriels1234 committed Sep 10, 2024
1 parent 8fb696c commit 4140d64
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 3 deletions.
3 changes: 3 additions & 0 deletions scheduler/management/commands/rqworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ def add_arguments(self, parser):
help='Maximum number of jobs to execute before terminating worker')
parser.add_argument('--fork-job-execution', action='store', default=True, dest='fork_job_execution', type=bool,
help='Fork job execution to another process')
parser.add_argument('--job-class', action='store', dest='job_class',
help='Jobs class to use')
parser.add_argument(
'queues', nargs='*', type=str,
help='The queues to work on, separated by space, all queues should be using the same redis')
Expand All @@ -71,6 +73,7 @@ def handle(self, **options):
w = create_worker(
*queues,
name=options['name'],
job_class=options.get('job_class'),
default_worker_ttl=options['worker_ttl'],
fork_job_execution=options['fork_job_execution'], )

Expand Down
16 changes: 13 additions & 3 deletions scheduler/rq_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,19 @@ def stop_execution(self, connection: Redis):

class DjangoWorker(Worker):
def __init__(self, *args, **kwargs):
self.fork_job_execution = kwargs.pop('fork_job_execution', True)
kwargs['job_class'] = JobExecution
kwargs['queue_class'] = DjangoQueue
self.fork_job_execution = kwargs.pop("fork_job_execution", True)
job_class = kwargs.get("job_class", JobExecution)
if not isinstance(job_class, type) or not issubclass(job_class, JobExecution):
if isinstance(job_class, type):
job_class = type("JobExecutionWrapper", (JobExecution, job_class), {})
else:
job_class = JobExecution

# Update kwargs with the potentially modified job_class
kwargs["job_class"] = job_class

kwargs["job_class"] = job_class
kwargs["queue_class"] = DjangoQueue
super(DjangoWorker, self).__init__(*args, **kwargs)

def __eq__(self, other):
Expand Down
8 changes: 8 additions & 0 deletions scheduler/tests/test_worker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import os
import uuid

from rq.job import Job
from scheduler.rq_classes import JobExecution
from scheduler.tests.testtools import SchedulerBaseCase
from scheduler.tools import create_worker
from . import test_settings # noqa
Expand Down Expand Up @@ -38,3 +40,9 @@ def test_create_worker__scheduler_interval(self):
worker.work(burst=True)
self.assertEqual(worker.scheduler.interval, 1)
settings.SCHEDULER_CONFIG['SCHEDULER_INTERVAL'] = prev

def test_get_worker_custom_classes(self):
# Test with string representation of job_class
worker = create_worker('default', job_class='rq.job.Job')
self.assertTrue(issubclass(worker.job_class, Job))
self.assertTrue(issubclass(worker.job_class, JobExecution))
6 changes: 6 additions & 0 deletions scheduler/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import croniter
from django.apps import apps
from django.utils import timezone
from django.utils.module_loading import import_string

from scheduler.queues import get_queues, logger, get_queue
from scheduler.rq_classes import DjangoWorker, MODEL_NAMES
Expand Down Expand Up @@ -71,6 +72,11 @@ def create_worker(*queue_names, **kwargs):
kwargs['name'] = _calc_worker_name(existing_worker_names)

kwargs['name'] = kwargs['name'].replace('/', '.')

# Handle job_class if provided
if 'job_class' in kwargs and isinstance(kwargs['job_class'], str):
kwargs['job_class'] = import_string(kwargs['job_class'])

worker = DjangoWorker(queues, connection=queues[0].connection, **kwargs)
return worker

Expand Down

0 comments on commit 4140d64

Please sign in to comment.