From de28b6648b4626623d4b91c26dee1200a387af86 Mon Sep 17 00:00:00 2001
From: Gregor Lichtner
Date: Tue, 17 Dec 2024 17:01:01 +0100
Subject: [PATCH 1/2] feat: add recommendation, criterion details to database

---
 execution_engine/constants.py             |  1 +
 execution_engine/execution_engine.py      | 13 +++++++++++++
 execution_engine/omop/db/celida/tables.py |  9 ++++++++-
 execution_engine/task/runner.py           |  2 --
 4 files changed, 22 insertions(+), 3 deletions(-)

diff --git a/execution_engine/constants.py b/execution_engine/constants.py
index f76b5858..86a9ed59 100644
--- a/execution_engine/constants.py
+++ b/execution_engine/constants.py
@@ -60,6 +60,7 @@ class OMOPConcepts(Enum):
     Collection of standard concepts in the OMOP CDM.
     """
 
+    UNKNOWN = 0
     VISIT_TYPE_STILL_PATIENT = 32220
     BODY_HEIGHT = 3036277  # Body height (observation; from LOINC)
     BODY_WEIGHT_LOINC = 3025315  # Body weight (maps to LOINC code)
diff --git a/execution_engine/execution_engine.py b/execution_engine/execution_engine.py
index 80b2daba..77a8962d 100644
--- a/execution_engine/execution_engine.py
+++ b/execution_engine/execution_engine.py
@@ -10,6 +10,7 @@
 from execution_engine import __version__
 from execution_engine.builder import ExecutionEngineBuilder
 from execution_engine.clients import fhir_client, omopdb
+from execution_engine.constants import OMOPConcepts
 from execution_engine.converter.recommendation_factory import (
     FhirToRecommendationFactory,
 )
@@ -236,6 +237,7 @@ def register_recommendation(self, recommendation: cohort.Recommendation) -> None
             .values(
                 recommendation_name=recommendation.name,
                 recommendation_title=recommendation.title,
+                recommendation_description=recommendation.description,
                 recommendation_url=recommendation.url,
                 recommendation_version=recommendation.version,
                 recommendation_package_version=recommendation.package_version,
@@ -343,6 +345,17 @@ def register_criterion(self, criterion: Criterion) -> None:
             insert(result_db.Criterion)
             .values(
                 criterion_hash=crit_hash,
+                criterion_concept_id=(
+                    criterion.concept.concept_id
+                    if hasattr(criterion, "concept")
+                    else OMOPConcepts.UNKNOWN
+                ),
+                criterion_concept_name=(
+                    criterion.concept.concept_name
+                    if hasattr(criterion, "concept")
+                    else "unknown"
+                ),
+                criterion_json=criterion.json(),
                 criterion_description=criterion.description(),
             )
             .returning(result_db.Criterion.criterion_id)
diff --git a/execution_engine/omop/db/celida/tables.py b/execution_engine/omop/db/celida/tables.py
index 0db17e26..b3cdd304 100644
--- a/execution_engine/omop/db/celida/tables.py
+++ b/execution_engine/omop/db/celida/tables.py
@@ -37,7 +37,6 @@
 
 CohortCategoryEnum = Enum(CohortCategory, name="cohort_category", schema="public")
 
-
 class Recommendation(Base):  # noqa: D101
     __tablename__ = "recommendation"
     __table_args__ = {"schema": SCHEMA_NAME}
@@ -49,6 +48,7 @@ class Recommendation(Base):  # noqa: D101
     )
     recommendation_name: Mapped[str]
     recommendation_title: Mapped[str]
+    recommendation_description: Mapped[str]
     recommendation_url: Mapped[str] = mapped_column(
         String(255), nullable=False, index=True
     )
@@ -91,6 +91,13 @@ class Criterion(Base):  # noqa: D101
     )  # todo: add link to recommendation or 1:n to population/intervention pair?
     criterion_description: Mapped[str]
+    criterion_concept_id: Mapped[int] = mapped_column(
+        BigInteger(),
+        # ForeignKey(f"{OMOP_SCHEMA_NAME}.concept.concept_id"),
+        index=True,
+    )
+    criterion_concept_name: Mapped[str]
+    criterion_json: Mapped[str]
     criterion_hash: Mapped[str] = mapped_column(String(64), index=True, unique=True)
 
diff --git a/execution_engine/task/runner.py b/execution_engine/task/runner.py
index 9379fb9a..dafa2aca 100644
--- a/execution_engine/task/runner.py
+++ b/execution_engine/task/runner.py
@@ -191,7 +191,6 @@ def run(self, bind_params: dict) -> None:
         try:
             while len(self.completed_tasks) < len(self.tasks):
                 self.enqueue_ready_tasks()
-                logging.info(f"{len(self.completed_tasks)}/{len(self.tasks)} tasks")
 
                 if self.queue.empty() and not any(
                     task.status == TaskStatus.RUNNING for task in self.tasks
@@ -317,7 +316,6 @@ def task_executor_worker() -> None:
                     break
 
                 self.enqueue_ready_tasks()
-                logging.info(f"{len(self.completed_tasks)}/{len(self.tasks)} tasks")
 
                 if self.completed_tasks == self.enqueued_tasks and len(
                     self.completed_tasks

From 90fd86c4c09ea02e0adc22541689e72da4832910 Mon Sep 17 00:00:00 2001
From: Gregor Lichtner
Date: Tue, 17 Dec 2024 17:01:48 +0100
Subject: [PATCH 2/2] feat(viz-backend): add endpoints

---
 apps/viz-backend/app/database.py |  77 ++++++-
 apps/viz-backend/app/main.py     | 350 +++++++++++++++++++++++++++++--
 apps/viz-backend/app/models.py   |  15 +-
 3 files changed, 418 insertions(+), 24 deletions(-)

diff --git a/apps/viz-backend/app/database.py b/apps/viz-backend/app/database.py
index 4a8deeda..ad0495fb 100644
--- a/apps/viz-backend/app/database.py
+++ b/apps/viz-backend/app/database.py
@@ -1,7 +1,7 @@
 from urllib.parse import quote
 
 from settings import config
-from sqlalchemy import create_engine, event
+from sqlalchemy import MetaData, Table, create_engine, event
 from sqlalchemy.engine.interfaces import DBAPIConnection
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.orm import sessionmaker
@@ -46,3 +46,78 @@ def set_timezone(
 
 SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
 Base = declarative_base()
+
+metadata = MetaData()
+
+
+def reflect_tables() -> dict[str, Table]:
+    """
+    Reflect tables and views from the specified schemas. Returns dynamically reflected tables.
+ """ + result_schema = config.omop.result_schema # Schema for result views + data_schema = config.omop.data_schema # Schema for data tables + + interval_result = Table( + "interval_result", metadata, autoload_with=engine, schema=result_schema + ) + full_day_coverage = Table( + "full_day_coverage", metadata, autoload_with=engine, schema=result_schema + ) + partial_day_coverage = Table( + "partial_day_coverage", metadata, autoload_with=engine, schema=result_schema + ) + criterion = Table("criterion", metadata, autoload_with=engine, schema=result_schema) + + concept = Table("concept", metadata, autoload_with=engine, schema=data_schema) + person = Table("person", metadata, autoload_with=engine, schema=data_schema) + measurement = Table( + "measurement", metadata, autoload_with=engine, schema=data_schema + ) + visit_occurrence = Table( + "visit_occurrence", metadata, autoload_with=engine, schema=data_schema + ) + drug_exposure = Table( + "drug_exposure", metadata, autoload_with=engine, schema=data_schema + ) + observation = Table( + "observation", metadata, autoload_with=engine, schema=data_schema + ) + condition_occurrence = Table( + "condition_occurrence", metadata, autoload_with=engine, schema=data_schema + ) + procedure_occurrence = Table( + "procedure_occurrence", metadata, autoload_with=engine, schema=data_schema + ) + + return { + "interval_result": interval_result, + "full_day_coverage": full_day_coverage, + "partial_day_coverage": partial_day_coverage, + "criterion": criterion, + "concept": concept, + "person": person, + "measurement": measurement, + "visit_occurrence": visit_occurrence, + "drug_exposure": drug_exposure, + "observation": observation, + "condition_occurrence": condition_occurrence, + "procedure_occurrence": procedure_occurrence, + } + + +# Reflect tables and expose them +tables = reflect_tables() +interval_result = tables["interval_result"] +full_day_coverage = tables["full_day_coverage"] +partial_day_coverage = tables["partial_day_coverage"] +criterion = tables["criterion"] + +concept = tables["concept"] +person = tables["person"] +measurement = tables["measurement"] +visit_occurrence = tables["visit_occurrence"] +drug_exposure = tables["drug_exposure"] +observation = tables["observation"] +condition_occurrence = tables["condition_occurrence"] +procedure_occurrence = tables["procedure_occurrence"] diff --git a/apps/viz-backend/app/main.py b/apps/viz-backend/app/main.py index 830b1381..0f375e93 100644 --- a/apps/viz-backend/app/main.py +++ b/apps/viz-backend/app/main.py @@ -1,13 +1,28 @@ +import datetime import json import re from typing import List -from database import SessionLocal +from database import ( + SessionLocal, + concept, + condition_occurrence, + criterion, + drug_exposure, + full_day_coverage, + interval_result, + measurement, + observation, + partial_day_coverage, + person, + procedure_occurrence, + visit_occurrence, +) from fastapi import Depends, FastAPI, HTTPException, Query from fastapi.middleware.cors import CORSMiddleware -from models import Interval, Recommendation, RecommendationRun +from models import DayCoverage, Interval, Recommendation, RecommendationRun from settings import config -from sqlalchemy import text +from sqlalchemy import and_, func, join, or_, select, text, union from sqlalchemy.orm import Session app = FastAPI() @@ -103,7 +118,8 @@ def get_execution_runs(db: Session = Depends(get_db)) -> dict: result = db.execute( text( f""" - SELECT run_id, observation_start_datetime, observation_end_datetime, run_datetime, 
r.recommendation_name + SELECT run_id, observation_start_datetime, observation_end_datetime, run_datetime, + r.recommendation_name, r.recommendation_title, r.recommendation_url, r.recommendation_description FROM {result_schema}.execution_run INNER JOIN {result_schema}.recommendation r ON r.recommendation_id = execution_run.recommendation_id """ # nosec: result_schema is checked above (is_valid_identifier) @@ -151,32 +167,225 @@ def get_intervals( detail="Only one of person_id or person_source_value can be provided", ) - params: dict[str, int | str] = {"run_id": run_id} + # Base query + base_query = select(interval_result).where(interval_result.c.run_id == run_id) + # Add filters for person_id or person_source_value if person_id is not None: - query = f""" - SELECT * - FROM {result_schema}.interval_result - WHERE run_id = :run_id - AND person_id = :person_id - """ # nosec: result_schema is checked above (is_valid_identifier) - params["person_id"] = person_id + base_query = base_query.where(interval_result.c.person_id == person_id) elif person_source_value is not None: - query = f""" - SELECT ir.* - FROM {result_schema}.interval_result ir - INNER JOIN person p ON ir.person_id = p.person_id - WHERE ir.run_id = :run_id - AND p.person_source_value = :person_source_value - """ # nosec: result_schema is checked above (is_valid_identifier) - params["person_source_value"] = str(person_source_value) + joined_table = join( + interval_result, person, interval_result.c.person_id == person.c.person_id + ) + base_query = ( + select(interval_result) + .select_from(joined_table) + .where( + and_( + interval_result.c.run_id == run_id, + person.c.person_source_value == person_source_value, + ) + ) + ) + + # Execute query + result = db.execute(base_query) + rows = result.fetchall() + + if not rows: + raise HTTPException( + status_code=404, + detail=f"No intervals found for run_id {run_id} with the specified parameters.", + ) - result = db.execute(text(query), params) return result.fetchall() +@app.get("/criteria/{run_id}/{person_id}", response_model=List[dict]) +def get_criteria_for_person( + person_id: int, + run_id: int, + start_datetime: datetime.datetime, + end_datetime: datetime.datetime, + db: Session = Depends(get_db), +) -> list[dict]: + """ + Retrieve all raw data criteria for a person within a specified time range. 
+ """ + # Step 1: Get all criterion_ids from result_interval where criterion_id is not NULL + query = ( + select( + interval_result.c.criterion_id, + interval_result.c.cohort_category, + criterion.c.criterion_concept_id, + concept.c.concept_name, + ) + .join(criterion, interval_result.c.criterion_id == criterion.c.criterion_id) + .join( + concept, + criterion.c.criterion_concept_id == concept.c.concept_id, + isouter=True, + ) + .where( + and_( + interval_result.c.person_id == person_id, + interval_result.c.criterion_id.is_not(None), + # interval_result.c.interval_start >= start_datetime, + # interval_result.c.interval_end <= end_datetime, + interval_result.c.run_id == run_id, + ) + ) + .distinct() + ) + + result = db.execute(query) + rows = result.fetchall() + + return [ + { + "criterion_id": row.criterion_id, + "concept_id": row.criterion_concept_id, + "concept_name": row.concept_name, + "cohort_category": row.cohort_category, + } + for row in rows + ] + + +### SECOND ENDPOINT: Retrieve raw data for a person and criterion +@app.get("/person/{person_id}/data", response_model=List[dict]) +def get_raw_data_for_person( + person_id: int, + concept_id: int, + start_datetime: datetime.datetime, + end_datetime: datetime.datetime, + db: Session = Depends(get_db), +) -> list[dict]: + """ + Retrieve raw data for a person based on a criterion ID within a specified time range. + """ + # Step 1: Resolve domain_id from the concept table via criterion_id + domain_query = ( + select(concept.c.domain_id) + # .join(criterion, criterion.c.concept_id == concept.c.concept_id) + .where(concept.c.concept_id == concept_id) + ) + domain_result = db.execute(domain_query).scalar() + + if not domain_result: + raise HTTPException( + status_code=404, detail=f"No domain found for concept_id {concept_id}" + ) + + # Step 2: Identify the correct table based on domain_id + table_mapping = { + "Measurement": (tables["measurement"], measurement), + "Visit": (tables["visit_occurrence"], visit_occurrence), + "Drug": (tables["drug_exposure"], drug_exposure), + "Observation": (tables["observation"], observation), + "Condition": (tables["condition_occurrence"], condition_occurrence), + "Procedure": (tables["procedure_occurrence"], procedure_occurrence), + } + target_table = table_mapping.get(domain_result) + + if not target_table: + raise HTTPException( + status_code=400, detail=f"No table mapped for domain_id {domain_result}" + ) + + # condition_occurrence": { + # "concept_id": "condition_concept_id", + # "columns": [ + # "condition_start_datetime", + # "condition_end_datetime", + # ], + # "sort_keys": ["condition_start_datetime"], + # }, + + # Step 3: Query the relevant table + t = target_table[1] + columns = target_table[0] + col_concept = columns["concept_id"] + + if t == measurement: + cols = select( + t.c.measurement_datetime.label("start_datetime"), + t.c.value_as_number.label("value"), + t.c.unit_concept_id, + ) + col_datetime_start, col_datetime_end = ( + "measurement_datetime", + "measurement_datetime", + ) + elif t == observation: + cols = select( + t.c.observation_datetime.label("start_datetime"), + t.c.value_as_number.label("value"), + ) + col_datetime_start, col_datetime_end = ( + "observation_datetime", + "observation_datetime", + ) + elif t == drug_exposure: + cols = select( + t.c.drug_exposure_start_datetime.label("start_datetime"), + t.c.drug_exposure_end_datetime.label("end_datetime"), + t.c.quantity.label("value"), + t.c.route_concept_id, + ) + col_datetime_start, col_datetime_end = ( + 
"drug_exposure_start_datetime", + "drug_exposure_end_datetime", + ) + elif t == visit_occurrence: + cols = select( + t.c.visit_start_datetime.label("start_datetime"), + t.c.visit_end_datetime.label("end_datetime"), + ) + col_datetime_start, col_datetime_end = ( + "visit_start_datetime", + "visit_end_datetime", + ) + elif t == condition_occurrence: + cols = select( + t.c.condition_start_datetime.label("start_datetime"), + t.c.condition_end_datetime.label("end_datetime"), + ) + col_datetime_start, col_datetime_end = ( + "condition_start_datetime", + "condition_end_datetime", + ) + elif t == procedure_occurrence: + cols = select( + t.c.procedure_datetime.label("start_datetime"), + t.c.procedure_end_datetime.label("end_datetime"), + ) + col_datetime_start, col_datetime_end = ( + "procedure_datetime", + "procedure_end_datetime", + ) + else: + raise HTTPException(status_code=400, detail=f"No columns mapped for table {t}") + + query = cols.where( + and_( + t.c["person_id"] == person_id, + t.c[col_datetime_end] >= start_datetime, + t.c[col_datetime_start] <= end_datetime, + t.c[col_concept] == concept_id, + ) + ) + + # Execute query and return results + result = db.execute(query) + columns = result.keys() + rows = result.fetchall() + + return [dict(zip(columns, row)) for row in rows] + + ######################################### -# Person Data +# Person Data (for OMOP viewer) tables = { @@ -352,6 +561,103 @@ async def get_concepts( return data +@app.get("/full_day_coverage/{run_id}/", response_model=List[DayCoverage]) +def get_full_day_coverage_endpoint( + run_id: int, + person_id: int | None = None, + person_source_value: int | None = None, + valid_date: datetime.date = Query( + ..., description="The specified date to filter results." + ), + n_days: int = 10, + db: Session = Depends(get_db), +) -> list[DayCoverage]: + """ + Get full-day coverage intervals and combine with partial-day coverage entries + (filtered for cohort_category = 'POPULATION'), + including dates up to `n_days` before the specified valid_date. 
+ """ + if person_id is not None and person_source_value is not None: + raise HTTPException( + status_code=400, + detail="Only one of person_id or person_source_value can be provided", + ) + + if n_days > 10 or n_days < 1: + raise HTTPException( + status_code=400, detail="n_days must be between 1 and 10 (inclusive)" + ) + + # Calculate date range + date_lower_bound = valid_date - datetime.timedelta(days=n_days) + + # Base filters + base_filters = and_( + full_day_coverage.c.run_id == run_id, + full_day_coverage.c.valid_date.between(date_lower_bound, valid_date), + or_( + and_( + full_day_coverage.c.criterion_id.is_(None), + full_day_coverage.c.pi_pair_id.is_(None), + full_day_coverage.c.cohort_category == "POPULATION_INTERVENTION", + ), + full_day_coverage.c.cohort_category == "BASE", + ), + ) + + partial_filters = and_( + partial_day_coverage.c.run_id == run_id, + partial_day_coverage.c.valid_date.between(date_lower_bound, valid_date), + partial_day_coverage.c.cohort_category == "POPULATION", + ) + + # Add person filters + if person_id: + base_filters = and_(base_filters, full_day_coverage.c.person_id == person_id) + partial_filters = and_( + partial_filters, partial_day_coverage.c.person_id == person_id + ) + elif person_source_value: + base_filters = and_( + base_filters, + full_day_coverage.c.person_id == person.c.person_id, + person.c.person_source_value == person_source_value, + ) + partial_filters = and_( + partial_filters, + partial_day_coverage.c.person_id == person.c.person_id, + person.c.person_source_value == person_source_value, + ) + + # Build queries for full_day and partial_day + full_day_query = select( + full_day_coverage.c.person_id, + full_day_coverage.c.cohort_category, + func.date(full_day_coverage.c.valid_date).label("valid_date"), + ).where(base_filters) + + partial_day_query = select( + partial_day_coverage.c.person_id, + partial_day_coverage.c.cohort_category, + func.date(partial_day_coverage.c.valid_date).label("valid_date"), + ).where(partial_filters) + + # Combine queries using UNION + combined_query = union(full_day_query, partial_day_query) + + # Execute the combined query + result = db.execute(combined_query) + rows = result.fetchall() + + if not rows: + raise HTTPException( + status_code=404, + detail=f"No coverage data found for run_id {run_id} within the specified date range.", + ) + # Convert rows into the response model + return rows + + if __name__ == "__main__": import uvicorn diff --git a/apps/viz-backend/app/models.py b/apps/viz-backend/app/models.py index 18f66b23..f4b76a93 100644 --- a/apps/viz-backend/app/models.py +++ b/apps/viz-backend/app/models.py @@ -1,4 +1,4 @@ -from datetime import datetime +from datetime import date, datetime from pydantic import BaseModel @@ -27,6 +27,9 @@ class RecommendationRun(BaseModel): observation_end_datetime: datetime run_datetime: datetime recommendation_name: str + recommendation_title: str + recommendation_url: str + recommendation_description: str class Interval(BaseModel): @@ -43,3 +46,13 @@ class Interval(BaseModel): interval_start: datetime interval_end: datetime cohort_category: str + + +class DayCoverage(BaseModel): + """ + Represents a single day of coverage in a recommendation run. + """ + + person_id: int + cohort_category: str + valid_date: date # Add this to match `view_full_day_coverage`