From 431ab07764cb80928b977b73ebb2ca2d4fa0db82 Mon Sep 17 00:00:00 2001 From: maxim-lixakov Date: Thu, 12 Dec 2024 12:14:12 +0300 Subject: [PATCH] [DOP-22267] - add removing orphan transfers from scheduler ---
Review notes (free-form text between the three-dash line and the diffstat; not part of the commit message or the diff):
- remove_orphan_jobs: the id lookup set is built from select(Transfer.id) rather than select(Transfer), because only the ids are read; whole Transfer rows no longer have to be loaded on every scheduler iteration. The change swaps one added line for another, so hunk line counts and the diffstat are unchanged; the "index 98c54edf..a417721e" blob hash predates this adjustment.
- remove_orphan_jobs: int(job.id) is applied to every job returned by scheduler.get_jobs(); presumably every job id is a stringified transfer id - TODO confirm against update_jobs, since a non-numeric id would raise ValueError here.
- send_job_to_celery: TransferNotFoundError is swallowed with a bare return; NOTE(review): consider logging the skipped transfer_id so a dropped run is traceable.
 syncmaster/scheduler/__main__.py | 3 +++ syncmaster/scheduler/transfer_job_manager.py | 20 +++++++++++++++++++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/syncmaster/scheduler/__main__.py b/syncmaster/scheduler/__main__.py index 0561457d..d00449c7 100755 --- a/syncmaster/scheduler/__main__.py +++ b/syncmaster/scheduler/__main__.py @@ -20,6 +20,8 @@ async def main(): while True: logger.info("Looking at the transfer table...") + + await transfer_job_manager.remove_orphan_jobs() transfers = await transfer_fetcher.fetch_updated_jobs() if transfers: @@ -29,6 +31,7 @@ async def main(): ", ".join(str(t.id) for t in transfers), ) transfer_job_manager.update_jobs(transfers) + transfer_fetcher.last_updated_at = max(t.updated_at for t in transfers) logger.info("Scheduler state has been updated. 
Last updated at: %s", transfer_fetcher.last_updated_at) diff --git a/syncmaster/scheduler/transfer_job_manager.py b/syncmaster/scheduler/transfer_job_manager.py index 98c54edf..a417721e 100644 --- a/syncmaster/scheduler/transfer_job_manager.py +++ b/syncmaster/scheduler/transfer_job_manager.py @@ -5,10 +5,12 @@ from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from kombu.exceptions import KombuError +from sqlalchemy import select from syncmaster.backend.services.unit_of_work import UnitOfWork from syncmaster.db.models import RunType, Status, Transfer from syncmaster.exceptions.run import CannotConnectToTaskQueueError +from syncmaster.exceptions.transfer import TransferNotFoundError from syncmaster.scheduler.celery import app as celery from syncmaster.scheduler.settings import SchedulerAppSettings as Settings from syncmaster.scheduler.utils import get_async_session @@ -47,6 +49,18 @@ def update_jobs(self, transfers: list[Transfer]) -> None: args=(transfer.id,), ) + async def remove_orphan_jobs(self) -> None: + all_jobs = self.scheduler.get_jobs() + settings = self.settings + + async with get_async_session(settings) as session: + transfer_ids = set((await session.execute(select(Transfer.id))).scalars().all()) + + for job in all_jobs: + transfer_id = int(job.id) + if transfer_id not in transfer_ids: + self.scheduler.remove_job(job.id) + @staticmethod async def send_job_to_celery(transfer_id: int) -> None: """ @@ -61,7 +75,11 @@ async def send_job_to_celery(transfer_id: int) -> None: async with get_async_session(settings) as session: unit_of_work = UnitOfWork(session=session, settings=settings) - transfer = await unit_of_work.transfer.read_by_id(transfer_id) + try: + transfer = await unit_of_work.transfer.read_by_id(transfer_id) + except TransferNotFoundError: + return + credentials_source = await unit_of_work.credentials.read(transfer.source_connection_id) credentials_target = await 
unit_of_work.credentials.read(transfer.target_connection_id)