-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
68 lines (48 loc) · 1.85 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import logging
from collections.abc import AsyncGenerator
from aidbox_python_sdk.main import init_client, register_app, setup_routes
from aidbox_python_sdk.settings import Settings
from aiohttp import BasicAuth, ClientSession, web
from fhirpy import AsyncFHIRClient
from app import app_keys as ak
from app import config, operations # noqa: F401
from app.sdk import sdk
# Configure logging once at import time: INFO threshold, and each record
# rendered as "<time> - <logger name> - <level> - <message>".
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
async def init_fhir_client(settings: Settings, prefix: str = "") -> AsyncFHIRClient:
    """Create an ``AsyncFHIRClient`` using the app's init credentials.

    Args:
        settings: Source of ``APP_INIT_URL``, ``APP_INIT_CLIENT_ID`` and
            ``APP_INIT_CLIENT_SECRET``.
        prefix: Text appended verbatim to ``APP_INIT_URL`` to form the
            client's base URL (no separator is inserted).

    Returns:
        A client whose ``authorization`` is the encoded basic-auth
        credentials and whose resources are dumped via ``model_dump()``.
    """
    credentials = BasicAuth(
        login=settings.APP_INIT_CLIENT_ID,
        password=settings.APP_INIT_CLIENT_SECRET,
    )
    base_url = f"{settings.APP_INIT_URL}{prefix}"

    def _dump(resource):
        # Resources are expected to expose ``model_dump()``.
        return resource.model_dump()

    return AsyncFHIRClient(
        base_url,
        authorization=credentials.encode(),
        dump_resource=_dump,
    )
async def fhir_client_ctx(app: web.Application) -> AsyncGenerator[None, None]:
    """Cleanup context that attaches a FHIR client to ``app``.

    The client is built by ``init_fhir_client`` from ``app[ak.settings]``
    with the ``"/fhir"`` prefix and stored under ``ak.fhir_client``.
    Nothing is done after the ``yield``.
    """
    settings = app[ak.settings]
    fhir_client = await init_fhir_client(settings, "/fhir")
    app[ak.fhir_client] = fhir_client
    yield
async def client_ctx(app: web.Application) -> AsyncGenerator[None, None]:
    """Cleanup context that attaches the SDK client to ``app``.

    Awaits ``init_client`` with ``app[ak.settings]`` and stores the result
    under ``ak.client``. Nothing is done after the ``yield``.
    """
    settings = app[ak.settings]
    sdk_client = await init_client(settings)
    app[ak.client] = sdk_client
    yield
async def app_ctx(app: web.Application) -> AsyncGenerator[None, None]:
    """Cleanup context that registers the app via ``register_app``.

    Reads ``app[ak.sdk]`` and ``app[ak.client]``, so both keys must already
    be set on ``app`` when this context starts. Nothing is done after the
    ``yield``.
    """
    sdk_instance = app[ak.sdk]
    sdk_client = app[ak.client]
    await register_app(sdk_instance, sdk_client)
    yield
async def client_session_ctx(app: web.Application) -> AsyncGenerator[None, None]:
    """Cleanup context that owns the application's shared ``ClientSession``.

    A new session is created, stored under ``ak.session`` while the
    generator is suspended at the ``yield``, and closed once the generator
    exits.
    """
    # ``async with`` closes the session on every exit path. A bare
    # ``yield`` followed by ``await session.close()`` skips the close when
    # the generator is closed or finalised instead of being resumed
    # normally, leaking the session and its connector.
    async with ClientSession() as session:
        app[ak.session] = session
        yield
def create_app() -> web.Application:
    """Build the aiohttp application with SDK state, contexts and routes.

    Stores ``sdk`` and ``sdk.settings`` on the application, registers the
    cleanup contexts, then calls ``setup_routes``.

    Returns:
        The configured ``web.Application``.
    """
    application = web.Application()
    application[ak.sdk] = sdk
    application[ak.settings] = sdk.settings

    # Order is significant: app_ctx reads app[ak.client], which client_ctx
    # assigns, so client_ctx has to be registered ahead of it.
    for ctx in (client_session_ctx, client_ctx, fhir_client_ctx, app_ctx):
        application.cleanup_ctx.append(ctx)

    setup_routes(application)
    return application
async def create_gunicorn_app() -> web.Application:
    """Coroutine wrapper around ``create_app``.

    NOTE(review): presumably the application factory handed to gunicorn's
    aiohttp worker; confirm against the deployment config.

    Returns:
        The application produced by ``create_app()``.
    """
    application = create_app()
    return application