From 8b584e5d6e3bf0f83bd11f9abd59a3a61f9d7179 Mon Sep 17 00:00:00 2001 From: Tasko Olevski Date: Fri, 9 May 2025 17:25:08 +0200 Subject: [PATCH 1/2] chore: run in single process --- bases/renku_data_services/data_api/main.py | 39 ++++++++-------------- projects/renku_data_service/Dockerfile | 1 + 2 files changed, 15 insertions(+), 25 deletions(-) diff --git a/bases/renku_data_services/data_api/main.py b/bases/renku_data_services/data_api/main.py index 0b938e83b..e83188baa 100644 --- a/bases/renku_data_services/data_api/main.py +++ b/bases/renku_data_services/data_api/main.py @@ -3,14 +3,12 @@ import argparse import asyncio from os import environ -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Final import sentry_sdk -import uvloop from sanic import Request, Sanic from sanic.log import logger from sanic.response import BaseHTTPResponse -from sanic.worker.loader import AppLoader from sentry_sdk.integrations.asyncio import AsyncioIntegration from sentry_sdk.integrations.grpc import GRPCIntegration from sentry_sdk.integrations.sanic import SanicIntegration, _context_enter, _context_exit, _set_transaction @@ -20,7 +18,7 @@ from renku_data_services.authz.admin_sync import sync_admins_from_keycloak from renku_data_services.base_models.core import APIUser from renku_data_services.data_api.app import register_all_handlers -from renku_data_services.data_api.prometheus import setup_app_metrics, setup_prometheus +from renku_data_services.data_api.prometheus import setup_prometheus from renku_data_services.errors.errors import ( ForbiddenError, MissingResourceError, @@ -36,27 +34,17 @@ import sentry_sdk._types -async def _solr_reindex(app: Sanic) -> None: +async def solr_reindex(config: Config) -> None: """Run a solr reindex of all data. This might be required after migrating the solr schema. 
""" - config = Config.from_env() + logger.info("starting SOLR reindexing.") reprovision = config.search_reprovisioning admin = APIUser(is_admin=True) await reprovision.run_reprovision(admin) -def solr_reindex(app_name: str) -> None: - """Runs a solr reindex.""" - app = Sanic(app_name) - setup_app_metrics(app) - - logger.info("Running SOLR reindex triggered by a migration") - asyncio.set_event_loop(uvloop.new_event_loop()) - asyncio.run(_solr_reindex(app)) - - def create_app() -> Sanic: """Create a Sanic application.""" config = Config.from_env() @@ -145,19 +133,22 @@ async def do_solr_migrations(app: Sanic) -> None: @app.before_server_start async def setup_rclone_validator(app: Sanic) -> None: + logger.info("Setting up rclone validator") validator = RCloneValidator() app.ext.dependency(validator) @app.main_process_ready - async def ready(app: Sanic) -> None: - """Application ready event handler.""" - logger.info("starting events background job.") - if getattr(app.ctx, "solr_reindex", False): - app.manager.manage("SolrReindex", solr_reindex, {"app_name": app.name}, transient=True) + async def do_solr_reindex(app: Sanic) -> None: + """Reindex solr if needed.""" + if not getattr(app.ctx, "solr_reindex", False): + return + app.add_task(solr_reindex(config), name="solr_reindex") return app +sanic_app: Final[Sanic] = create_app() + if __name__ == "__main__": parser = argparse.ArgumentParser(prog="Renku Data Services") # NOTE: K8s probes will fail if listening only on 127.0.0.1 - so we listen on 0.0.0.0 @@ -165,10 +156,8 @@ async def ready(app: Sanic) -> None: parser.add_argument("-p", "--port", default=8000, type=int, help="Port to listen on") parser.add_argument("--debug", action="store_true", help="Enable Sanic debug mode") parser.add_argument("--fast", action="store_true", help="Enable Sanic fast mode") + parser.add_argument("--workers", default=1, type=int, help="The number of workers to use.") parser.add_argument("-d", "--dev", action="store_true", help="Enable 
Sanic development mode") parser.add_argument("--single-process", action="store_true", help="Do not use multiprocessing.") args: dict[str, Any] = vars(parser.parse_args()) - loader = AppLoader(factory=create_app) - app = loader.load() - app.prepare(**args) - Sanic.serve(primary=app, app_loader=loader) + sanic_app.run(**args) diff --git a/projects/renku_data_service/Dockerfile b/projects/renku_data_service/Dockerfile index 67ca118f9..cf44c3fb3 100644 --- a/projects/renku_data_service/Dockerfile +++ b/projects/renku_data_service/Dockerfile @@ -47,3 +47,4 @@ USER $USER_UID:$USER_GID WORKDIR /app COPY --from=builder /app/env ./env ENTRYPOINT ["tini", "-g", "--", "env/bin/python", "-m", "renku_data_services.data_api.main"] +CMD ["--single-process"] From 3fe92ae6bec7797b01ad49ff522cee1ccf85edce Mon Sep 17 00:00:00 2001 From: Tasko Olevski Date: Fri, 9 May 2025 18:06:00 +0200 Subject: [PATCH 2/2] squashme: minor fix --- bases/renku_data_services/data_api/main.py | 6 +++--- bases/renku_data_services/data_api/prometheus.py | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/bases/renku_data_services/data_api/main.py b/bases/renku_data_services/data_api/main.py index e83188baa..cc9f83dff 100644 --- a/bases/renku_data_services/data_api/main.py +++ b/bases/renku_data_services/data_api/main.py @@ -115,13 +115,13 @@ async def handle_head(request: Request, response: BaseHTTPResponse) -> None: if request.method == "HEAD": response.body = None - @app.main_process_start + @app.before_server_start async def do_migrations(_: Sanic) -> None: logger.info("running migrations") run_migrations_for_app("common") await config.rp_repo.initialize(config.db.conn_url(async_client=False), config.default_resource_pool) - @app.main_process_start + @app.before_server_start async def do_solr_migrations(app: Sanic) -> None: logger.info(f"Running SOLR migrations at: {config.solr_config}") migrator = SchemaMigrator(config.solr_config) @@ -137,7 +137,7 @@ async def 
setup_rclone_validator(app: Sanic) -> None: validator = RCloneValidator() app.ext.dependency(validator) - @app.main_process_ready + @app.after_server_start async def do_solr_reindex(app: Sanic) -> None: """Reindex solr if needed.""" if not getattr(app.ctx, "solr_reindex", False): diff --git a/bases/renku_data_services/data_api/prometheus.py b/bases/renku_data_services/data_api/prometheus.py index 18d58ea40..75440dd40 100644 --- a/bases/renku_data_services/data_api/prometheus.py +++ b/bases/renku_data_services/data_api/prometheus.py @@ -44,7 +44,8 @@ async def collect_system_metrics(app: Sanic, name: str) -> None: async def collect_system_metrics_task(app: Sanic) -> None: """Background task to collect metrics.""" while True: - await collect_system_metrics(app, app.m.name) + name = app.name if not hasattr(app, "multiplexer") else app.multiplexer.name + await collect_system_metrics(app, name) await asyncio.sleep(5)