From 84e9cc5a6aa8d3427876ad01083c0f72a90f0a2c Mon Sep 17 00:00:00 2001 From: Pedro Martins Date: Mon, 11 Nov 2024 16:02:49 -0300 Subject: [PATCH 1/3] Update models.py transaction to be using the user defined router Attemps to resolve the issue #182 --- django_apscheduler/models.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/django_apscheduler/models.py b/django_apscheduler/models.py index 7a5efb6..e34d4c1 100644 --- a/django_apscheduler/models.py +++ b/django_apscheduler/models.py @@ -1,6 +1,6 @@ from datetime import timedelta, datetime -from django.db import models, transaction +from django.db import models, transaction, routers from django.db.models import UniqueConstraint from django.utils import timezone from django.utils.translation import gettext_lazy as _ @@ -162,7 +162,8 @@ def atomic_update_or_create( finished = finished.timestamp() try: - with transaction.atomic(): + db = router.db_for_write(DjangoJobExecution) + with transaction.atomic(using=db): job_execution = DjangoJobExecution.objects.select_for_update().get( job_id=job_id, run_time=run_time ) From 3614b31023834dfccdae0824b87afbee879dc1e0 Mon Sep 17 00:00:00 2001 From: Pedro Martins Date: Mon, 11 Nov 2024 16:07:31 -0300 Subject: [PATCH 2/3] Update jobstores.py --- django_apscheduler/jobstores.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/django_apscheduler/jobstores.py b/django_apscheduler/jobstores.py index cdca9f3..2f0ee92 100644 --- a/django_apscheduler/jobstores.py +++ b/django_apscheduler/jobstores.py @@ -11,7 +11,7 @@ from apscheduler.schedulers.base import BaseScheduler from django import db -from django.db import transaction, IntegrityError +from django.db import transaction, IntegrityError, router from django_apscheduler import util from django_apscheduler.models import DjangoJob, DjangoJobExecution @@ -233,7 +233,8 @@ def get_all_jobs(self): @util.retry_on_db_operational_error def add_job(self, job: AppSchedulerJob): - with transaction.atomic(): + db = router.db_for_write(DjangoJob) + with transaction.atomic(using=db): try: return DjangoJob.objects.create( id=job.id, @@ -246,7 +247,8 @@ def add_job(self, job: AppSchedulerJob): @util.retry_on_db_operational_error def update_job(self, job: AppSchedulerJob): # Acquire lock for update - with transaction.atomic(): + db = router.db_for_write(DjangoJob) + with transaction.atomic(using=db): try: db_job = DjangoJob.objects.select_for_update().get(id=job.id) @@ -262,7 +264,8 @@ def update_job(self, job: AppSchedulerJob): @util.retry_on_db_operational_error def remove_job(self, job_id: str): - with transaction.atomic(): + db = router.db_for_write(DjangoJob) + with transaction.atomic(using=db): try: DjangoJob.objects.select_for_update().get(id=job_id).delete() except DjangoJob.DoesNotExist: From b30712c5de0d7c75f51fb458a7c0474760d94c6d Mon Sep 17 00:00:00 2001 From: Pedro Martins Date: Mon, 11 Nov 2024 16:10:48 -0300 Subject: [PATCH 3/3] Update 0005_migrate_name_to_id.py --- .../migrations/0005_migrate_name_to_id.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/django_apscheduler/migrations/0005_migrate_name_to_id.py b/django_apscheduler/migrations/0005_migrate_name_to_id.py index a55b02e..aef4cc7 100644 --- a/django_apscheduler/migrations/0005_migrate_name_to_id.py +++ b/django_apscheduler/migrations/0005_migrate_name_to_id.py @@ -9,25 +9,26 @@ def migrate_name_to_id(apps, schema_editor): JobExecutionModel = apps.get_model("django_apscheduler", "DjangoJobExecution") migrated_id_mappings = {} migrated_job_executions = [] - + db = schema_editor.connection.alias + # Copy 'name' to 'id'. - for job in JobModel.objects.all(): + for job in JobModel.objects.using(db).all(): migrated_id_mappings[job.id] = job.name job.id = job.name job.name = f"{job.name}_tmp" job.save() # Update all job execution references - for job_execution in JobExecutionModel.objects.filter( + for job_execution in JobExecutionModel.objects.using(db).filter( job_id__in=migrated_id_mappings ): job_execution.job_id = migrated_id_mappings[job_execution.job_id] migrated_job_executions.append(job_execution) - JobExecutionModel.objects.bulk_update(migrated_job_executions, ["job_id"]) + JobExecutionModel.objects.using(db).bulk_update(migrated_job_executions, ["job_id"]) # Remove old jobs - JobModel.objects.filter(id__in=migrated_id_mappings).delete() + JobModel.objects.using(db).filter(id__in=migrated_id_mappings).delete() class Migration(migrations.Migration):