Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 17 additions & 28 deletions bases/renku_data_services/data_api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@
import argparse
import asyncio
from os import environ
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Final

import sentry_sdk
import uvloop
from sanic import Request, Sanic
from sanic.log import logger
from sanic.response import BaseHTTPResponse
from sanic.worker.loader import AppLoader
from sentry_sdk.integrations.asyncio import AsyncioIntegration
from sentry_sdk.integrations.grpc import GRPCIntegration
from sentry_sdk.integrations.sanic import SanicIntegration, _context_enter, _context_exit, _set_transaction
Expand All @@ -20,7 +18,7 @@
from renku_data_services.authz.admin_sync import sync_admins_from_keycloak
from renku_data_services.base_models.core import APIUser
from renku_data_services.data_api.app import register_all_handlers
from renku_data_services.data_api.prometheus import setup_app_metrics, setup_prometheus
from renku_data_services.data_api.prometheus import setup_prometheus
from renku_data_services.errors.errors import (
ForbiddenError,
MissingResourceError,
Expand All @@ -36,27 +34,17 @@
import sentry_sdk._types


async def _solr_reindex(app: Sanic) -> None:
async def solr_reindex(config: Config) -> None:
"""Run a solr reindex of all data.

This might be required after migrating the solr schema.
"""
config = Config.from_env()
logger.info("starting SOLR reindexing.")
reprovision = config.search_reprovisioning
admin = APIUser(is_admin=True)
await reprovision.run_reprovision(admin)


def solr_reindex(app_name: str) -> None:
"""Runs a solr reindex."""
app = Sanic(app_name)
setup_app_metrics(app)

logger.info("Running SOLR reindex triggered by a migration")
asyncio.set_event_loop(uvloop.new_event_loop())
asyncio.run(_solr_reindex(app))


def create_app() -> Sanic:
"""Create a Sanic application."""
config = Config.from_env()
Expand Down Expand Up @@ -127,13 +115,13 @@ async def handle_head(request: Request, response: BaseHTTPResponse) -> None:
if request.method == "HEAD":
response.body = None

@app.main_process_start
@app.before_server_start
async def do_migrations(_: Sanic) -> None:
logger.info("running migrations")
run_migrations_for_app("common")
await config.rp_repo.initialize(config.db.conn_url(async_client=False), config.default_resource_pool)

@app.main_process_start
@app.before_server_start
async def do_solr_migrations(app: Sanic) -> None:
logger.info(f"Running SOLR migrations at: {config.solr_config}")
migrator = SchemaMigrator(config.solr_config)
Expand All @@ -145,30 +133,31 @@ async def do_solr_migrations(app: Sanic) -> None:

@app.before_server_start
async def setup_rclone_validator(app: Sanic) -> None:
logger.info("Setting up rclone validator")
validator = RCloneValidator()
app.ext.dependency(validator)

@app.main_process_ready
async def ready(app: Sanic) -> None:
"""Application ready event handler."""
logger.info("starting events background job.")
if getattr(app.ctx, "solr_reindex", False):
app.manager.manage("SolrReindex", solr_reindex, {"app_name": app.name}, transient=True)
@app.after_server_start
async def do_solr_reindex(app: Sanic) -> None:
"""Reindex solr if needed."""
if not getattr(app.ctx, "solr_reindex", False):
return
app.add_task(solr_reindex(config), name="solr_reindex")

return app


sanic_app: Final[Sanic] = create_app()

if __name__ == "__main__":
parser = argparse.ArgumentParser(prog="Renku Data Services")
# NOTE: K8s probes will fail if listening only on 127.0.0.1 - so we listen on 0.0.0.0
parser.add_argument("-H", "--host", default="0.0.0.0", help="Host to listen on") # nosec B104
parser.add_argument("-p", "--port", default=8000, type=int, help="Port to listen on")
parser.add_argument("--debug", action="store_true", help="Enable Sanic debug mode")
parser.add_argument("--fast", action="store_true", help="Enable Sanic fast mode")
parser.add_argument("--workers", default=1, type=int, help="The number of workers to use.")
parser.add_argument("-d", "--dev", action="store_true", help="Enable Sanic development mode")
parser.add_argument("--single-process", action="store_true", help="Do not use multiprocessing.")
args: dict[str, Any] = vars(parser.parse_args())
loader = AppLoader(factory=create_app)
app = loader.load()
app.prepare(**args)
Sanic.serve(primary=app, app_loader=loader)
sanic_app.run(**args)
3 changes: 2 additions & 1 deletion bases/renku_data_services/data_api/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ async def collect_system_metrics(app: Sanic, name: str) -> None:
async def collect_system_metrics_task(app: Sanic) -> None:
"""Background task to collect metrics."""
while True:
await collect_system_metrics(app, app.m.name)
name = app.name if not hasattr(app.multiplexer) else app.multiplexer.name
await collect_system_metrics(app, name)
await asyncio.sleep(5)


Expand Down
1 change: 1 addition & 0 deletions projects/renku_data_service/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,4 @@ USER $USER_UID:$USER_GID
WORKDIR /app
COPY --from=builder /app/env ./env
ENTRYPOINT ["tini", "-g", "--", "env/bin/python", "-m", "renku_data_services.data_api.main"]
CMD ["--single-process"]
Loading