Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DOP-21799] Refactor celery initialization #165

Merged
merged 6 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docker/entrypoint_worker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,3 @@ set -e

# exec is required to forward all signals to the main process
exec python -m celery -A syncmaster.worker.celery worker --max-tasks-per-child=1 "$@"

5 changes: 0 additions & 5 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,6 @@
# If true, `todo` and `todoList` produce output, else they produce nothing.
todo_include_todos = False

# set the environment variable before imports
# TODO: remove after global init of WorkerAppSettings in worker/__init__.py
os.environ["SYNCMASTER__ENCRYPTION__CRYPTO_KEY"] = "crypto_key"
os.environ["SYNCMASTER__DATABASE__URL"] = "postgresql+asyncpg://syncmaster:changeme@db:5432/syncmaster"
os.environ["SYNCMASTER__BROKER__URL"] = "amqp://guest:guest@localhost:5672/"

# -- Options for HTMLHelp output ------------------------------------------

Expand Down
12 changes: 12 additions & 0 deletions syncmaster/backend/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from celery import Celery
from fastapi import FastAPI, HTTPException
from fastapi.exceptions import RequestValidationError
from pydantic import ValidationError
Expand All @@ -20,6 +21,15 @@
from syncmaster.exceptions import SyncmasterError


def celery_factory(settings: Settings) -> Celery:
app = Celery(
__name__,
broker=settings.broker.url,
backend="db+" + settings.database.sync_url,
)
return app


def application_factory(settings: Settings) -> FastAPI:
application = FastAPI(
title="Syncmaster",
Expand All @@ -30,6 +40,7 @@ def application_factory(settings: Settings) -> FastAPI:
redoc_url=None,
)
application.state.settings = settings
application.state.celery = celery_factory(settings)
application.include_router(api_router)
application.exception_handler(RequestValidationError)(validation_exception_handler)
application.exception_handler(ValidationError)(validation_exception_handler)
Expand All @@ -44,6 +55,7 @@ def application_factory(settings: Settings) -> FastAPI:
{
Settings: lambda: settings,
UnitOfWork: get_uow(session_factory, settings=settings),
Celery: lambda: application.state.celery,
},
)

Expand Down
3 changes: 2 additions & 1 deletion syncmaster/backend/api/v1/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import Annotated

from asgi_correlation_id import correlation_id
from celery import Celery
from fastapi import APIRouter, Depends, Query
from jinja2 import Template
from kombu.exceptions import KombuError
Expand All @@ -24,7 +25,6 @@
ReadRunSchema,
RunPageSchema,
)
from syncmaster.worker import celery

router = APIRouter(tags=["Runs"], responses=get_error_responses())

Expand Down Expand Up @@ -84,6 +84,7 @@ async def read_run(
async def start_run(
create_run_data: CreateRunSchema,
settings: Annotated[Settings, Depends(Stub(Settings))],
celery: Annotated[Celery, Depends(Stub(Celery))],
unit_of_work: UnitOfWork = Depends(UnitOfWork),
current_user: User = Depends(get_user(is_active=True)),
) -> ReadRunSchema:
Expand Down
14 changes: 12 additions & 2 deletions syncmaster/scheduler/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from syncmaster.scheduler.transfer_fetcher import TransferFetcher
from syncmaster.scheduler.transfer_job_manager import TransferJobManager
from celery import Celery

from syncmaster.scheduler.settings import SchedulerAppSettings


def celery_factory(settings: SchedulerAppSettings) -> Celery:
app = Celery(
__name__,
broker=settings.broker.url,
backend="db+" + settings.database.sync_url,
)
return app
7 changes: 7 additions & 0 deletions syncmaster/scheduler/celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from syncmaster.scheduler import celery_factory
from syncmaster.scheduler.settings import SchedulerAppSettings

# Global object, since the TransferJobManager.send_job_to_celery method is static
app = celery_factory(SchedulerAppSettings())
dolfinus marked this conversation as resolved.
Show resolved Hide resolved
7 changes: 5 additions & 2 deletions syncmaster/scheduler/transfer_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
from syncmaster.backend.services.unit_of_work import UnitOfWork
from syncmaster.db.models import RunType, Status, Transfer
from syncmaster.exceptions.run import CannotConnectToTaskQueueError
from syncmaster.scheduler.celery import app as celery
from syncmaster.scheduler.settings import SchedulerAppSettings as Settings
from syncmaster.scheduler.utils import get_async_session
from syncmaster.schemas.v1.connections.connection import ReadAuthDataSchema
from syncmaster.worker import celery


class TransferJobManager:
Expand Down Expand Up @@ -50,8 +50,11 @@ def update_jobs(self, transfers: list[Transfer]) -> None:
@staticmethod
async def send_job_to_celery(transfer_id: int) -> None:
"""
Do not pass additional arguments like settings,
1. Do not pass additional arguments like settings,
otherwise they will be serialized in jobs table.
2. Instance methods are bound to specific objects and cannot be reliably serialized
due to the weak reference problem. Use a static method instead, as it is not
object-specific and can be serialized.
"""
settings = Settings()

Expand Down
7 changes: 1 addition & 6 deletions syncmaster/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from syncmaster.worker.settings import WorkerAppSettings


def create_celery_app(settings) -> Celery:
def celery_factory(settings: WorkerAppSettings) -> Celery:
app = Celery(
__name__,
broker=settings.broker.url,
Expand All @@ -17,8 +17,3 @@ def create_celery_app(settings) -> Celery:
],
)
return app


# TODO: initialize celery app in __name__ == "__main__"
# then initialize celery app in backend via dependency injection and initialize in scheduler
celery = create_celery_app(WorkerAppSettings())
6 changes: 6 additions & 0 deletions syncmaster/worker/celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from syncmaster.worker import celery_factory
from syncmaster.worker.settings import WorkerAppSettings

app = celery_factory(WorkerAppSettings())
IlyasDevelopment marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion syncmaster/worker/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from syncmaster.db.repositories.utils import decrypt_auth_data
from syncmaster.exceptions.run import RunNotFoundError
from syncmaster.settings.log import setup_logging
from syncmaster.worker import celery
from syncmaster.worker.celery import app as celery
from syncmaster.worker.controller import TransferController
from syncmaster.worker.settings import WorkerAppSettings

Expand Down
26 changes: 24 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
import time
from collections.abc import AsyncGenerator, Callable
from pathlib import Path
from unittest.mock import AsyncMock, Mock

import pytest
import pytest_asyncio
from alembic.config import Config as AlembicConfig
from celery import Celery
from fastapi import FastAPI
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import (
AsyncEngine,
Expand All @@ -23,7 +25,7 @@
from syncmaster.backend.utils.jwt import sign_jwt
from syncmaster.db.models import Base
from syncmaster.scheduler.settings import SchedulerAppSettings
from syncmaster.worker import create_celery_app
from syncmaster.worker import celery_factory
from syncmaster.worker.settings import WorkerAppSettings
from tests.mocks import UserTestRoles
from tests.settings import TestSettings
Expand Down Expand Up @@ -129,6 +131,26 @@ async def session(sessionmaker: async_sessionmaker[AsyncSession]):
await session.close()


@pytest.fixture(scope="session")
def mocked_celery() -> Celery:
celery_app = Mock(Celery)
celery_app.send_task = AsyncMock()
return celery_app


@pytest_asyncio.fixture(scope="session")
async def app(settings: Settings, mocked_celery: Celery) -> FastAPI:
app = application_factory(settings=settings)
app.dependency_overrides[Celery] = lambda: mocked_celery
return app


@pytest_asyncio.fixture(scope="session")
async def client_with_mocked_celery(app: FastAPI) -> AsyncGenerator:
async with AsyncClient(app=app, base_url="http://testserver") as client:
yield client


@pytest_asyncio.fixture(scope="session")
async def client(settings: Settings) -> AsyncGenerator:
logger.info("START CLIENT FIXTURE")
Expand All @@ -140,7 +162,7 @@ async def client(settings: Settings) -> AsyncGenerator:

@pytest.fixture(scope="session", params=[{}])
def celery(worker_settings: WorkerAppSettings) -> Celery:
celery_app = create_celery_app(worker_settings)
celery_app = celery_factory(worker_settings)
return celery_app


Expand Down
2 changes: 1 addition & 1 deletion tests/test_integration/celery_test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from syncmaster.worker import celery
from syncmaster.worker.celery import app as celery

celery.conf.update(imports=list(celery.conf.imports) + ["tests.test_integration.test_scheduler.test_task"])
3 changes: 2 additions & 1 deletion tests/test_integration/test_scheduler/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

from syncmaster.backend.settings import ServerAppSettings as Settings
from syncmaster.db.models import Run, Status
from syncmaster.scheduler import TransferFetcher, TransferJobManager
from syncmaster.scheduler.transfer_fetcher import TransferFetcher
from syncmaster.scheduler.transfer_job_manager import TransferJobManager
from tests.mocks import MockTransfer

pytestmark = [pytest.mark.asyncio, pytest.mark.worker, pytest.mark.scheduler_integration]
Expand Down
2 changes: 1 addition & 1 deletion tests/test_integration/test_scheduler/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from syncmaster.db.models.run import Run, Status
from syncmaster.exceptions.run import RunNotFoundError
from syncmaster.worker import celery
from syncmaster.scheduler.celery import app as celery
from syncmaster.worker.base import WorkerTask


Expand Down
Loading
Loading