Skip to content

Commit

Permalink
feat:add job-class kwarg to rqworker (#160)
Browse files Browse the repository at this point in the history
  • Loading branch information
gabriels1234 committed Sep 14, 2024
1 parent 0b6d3b5 commit 8a09f51
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 3 deletions.
4 changes: 4 additions & 0 deletions scheduler/management/commands/rqworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import click
from django.core.management.base import BaseCommand
from django.db import connections
from django.template.defaultfilters import default
from redis.exceptions import ConnectionError
from rq.logutils import setup_loghandlers

Expand Down Expand Up @@ -47,6 +48,8 @@ def add_arguments(self, parser):
help='Maximum number of jobs to execute before terminating worker')
parser.add_argument('--fork-job-execution', action='store', default=True, dest='fork_job_execution', type=bool,
help='Fork job execution to another process')
parser.add_argument('--job-class', action='store', dest='job_class',
help='Jobs class to use')
parser.add_argument(
'queues', nargs='*', type=str,
help='The queues to work on, separated by space, all queues should be using the same redis')
Expand All @@ -71,6 +74,7 @@ def handle(self, **options):
w = create_worker(
*queues,
name=options['name'],
job_class=options.get('job_class'),
default_worker_ttl=options['worker_ttl'],
fork_job_execution=options['fork_job_execution'], )

Expand Down
11 changes: 8 additions & 3 deletions scheduler/rq_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,14 @@ def stop_execution(self, connection: Redis):

class DjangoWorker(Worker):
def __init__(self, *args, **kwargs):
self.fork_job_execution = kwargs.pop('fork_job_execution', True)
kwargs['job_class'] = JobExecution
kwargs['queue_class'] = DjangoQueue
self.fork_job_execution = kwargs.pop("fork_job_execution", True)
job_class = kwargs.get("job_class") or JobExecution
if not isinstance(job_class, type) or not issubclass(job_class, JobExecution):
raise ValueError("job_class must be a subclass of JobExecution")

# Update kwargs with the potentially modified job_class
kwargs["job_class"] = job_class
kwargs["queue_class"] = DjangoQueue
super(DjangoWorker, self).__init__(*args, **kwargs)

def __eq__(self, other):
Expand Down
33 changes: 33 additions & 0 deletions scheduler/tests/test_mgmt_cmds.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,39 @@ def test_rqworker__no_queues_params(self):
for job in jobs:
self.assertTrue(job.is_failed)

def test_rqworker__job_class_param__green(self):
queue = get_queue('default')

# enqueue some jobs that will fail
jobs = []
job_ids = []
for _ in range(0, 3):
job = queue.enqueue(failing_job)
jobs.append(job)
job_ids.append(job.id)

# Create a worker to execute these jobs
call_command('rqworker', '--job-class', 'scheduler.rq_classes.JobExecution', fork_job_execution=False, burst=True)

# check if all jobs are really failed
for job in jobs:
self.assertTrue(job.is_failed)

def test_rqworker__bad_job_class__fail(self):
queue = get_queue('default')

# enqueue some jobs that will fail
jobs = []
job_ids = []
for _ in range(0, 3):
job = queue.enqueue(failing_job)
jobs.append(job)
job_ids.append(job.id)

# Create a worker to execute these jobs
with self.assertRaises(ImportError):
call_command('rqworker', '--job-class', 'rq.badclass', fork_job_execution=False, burst=True)

def test_rqworker__run_jobs(self):
queue = get_queue('default')

Expand Down
12 changes: 12 additions & 0 deletions scheduler/tests/test_worker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import os
import uuid

from rq.job import Job
from scheduler.rq_classes import JobExecution
from scheduler.tests.testtools import SchedulerBaseCase
from scheduler.tools import create_worker
from . import test_settings # noqa
Expand Down Expand Up @@ -38,3 +40,13 @@ def test_create_worker__scheduler_interval(self):
worker.work(burst=True)
self.assertEqual(worker.scheduler.interval, 1)
settings.SCHEDULER_CONFIG['SCHEDULER_INTERVAL'] = prev

def test_get_worker_with_custom_job_class(self):
# Test with string representation of job_class
worker = create_worker('default', job_class='scheduler.rq_classes.JobExecution')
self.assertTrue(issubclass(worker.job_class, Job))
self.assertTrue(issubclass(worker.job_class, JobExecution))

def test_get_worker_without_custom_job_class(self):
worker = create_worker('default')
self.assertTrue(issubclass(worker.job_class, JobExecution))
10 changes: 10 additions & 0 deletions scheduler/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import croniter
from django.apps import apps
from django.utils import timezone
from django.utils.module_loading import import_string

from scheduler.queues import get_queues, logger, get_queue
from scheduler.rq_classes import DjangoWorker, MODEL_NAMES
Expand Down Expand Up @@ -71,6 +72,15 @@ def create_worker(*queue_names, **kwargs):
kwargs['name'] = _calc_worker_name(existing_worker_names)

kwargs['name'] = kwargs['name'].replace('/', '.')

# Handle job_class if provided
if 'job_class' not in kwargs or kwargs["job_class"] is None:
kwargs['job_class'] = 'scheduler.rq_classes.JobExecution'
try:
kwargs['job_class'] = import_string(kwargs['job_class'])
except ImportError:
raise ImportError(f"Could not import job class {kwargs['job_class']}")

worker = DjangoWorker(queues, connection=queues[0].connection, **kwargs)
return worker

Expand Down

0 comments on commit 8a09f51

Please sign in to comment.