diff --git a/backend/pyproject.toml b/backend/pyproject.toml
index 188053a..6f1a4c2 100644
--- a/backend/pyproject.toml
+++ b/backend/pyproject.toml
@@ -40,7 +40,7 @@ dependencies = [
     "oxrdflib",
     "SPARQLWrapper",
     "python-dotenv",
-    "decentriq_platform ==0.24.4", # TODO: conflict with pydantic 2
+    "decentriq_platform >=0.26.0", # TODO: conflict with pydantic 2
     "curies",
     # "pydantic >=2.0.0",
     # "pydantic-settings",
diff --git a/backend/src/auth.py b/backend/src/auth.py
index a23b933..8385c5c 100644
--- a/backend/src/auth.py
+++ b/backend/src/auth.py
@@ -118,14 +118,14 @@ async def auth_callback(code: str) -> RedirectResponse:
 
     # Check in payload if logged in user has the required permissions
     if (
-        "https://explorer.icare4cvd.eu"
-        in access_payload["aud"]
+        "https://explorer.icare4cvd.eu" in access_payload["aud"]
         and "read:icare4cvd-dataset-descriptions" in access_payload["permissions"]
     ):
         # Reuse expiration time from decentriq Auth0 access token
         exp_timestamp = access_payload["exp"]
         jwt_token = create_access_token(
-            data={"email": id_payload["email"], "access_token": token["access_token"]}, expires_timestamp=exp_timestamp
+            data={"email": id_payload["email"], "access_token": token["access_token"]},
+            expires_timestamp=exp_timestamp,
         )
 
         # NOTE: Redirect to react frontend
diff --git a/backend/src/decentriq.py b/backend/src/decentriq.py
index 3b6ccfd..778ad34 100644
--- a/backend/src/decentriq.py
+++ b/backend/src/decentriq.py
@@ -2,7 +2,15 @@
 from typing import Any
 
 import decentriq_platform as dq
-import decentriq_platform.sql as dqsql
+
+# import decentriq_platform.sql as dqsql
+from decentriq_platform.analytics import (
+    AnalyticsDcrBuilder,
+    Column,
+    PrimitiveType,
+    PythonComputeNodeDefinition,
+    RawDataNodeDefinition,
+)
 from fastapi import APIRouter, Depends, HTTPException
 
 from src.auth import get_current_user
@@ -13,61 +21,56 @@
 router = APIRouter()
 
 
-def get_cohort_schema(cohort_dict: Cohort) -> list[tuple[str, dqsql.PrimitiveType, bool]]:
+def get_cohort_schema(cohort_dict: Cohort) -> list[Column]:
     """Convert cohort variables to Decentriq schema"""
     schema = []
     for variable_id, variable_info in cohort_dict.variables.items():
-        prim_type = dqsql.PrimitiveType.STRING
+        prim_type = PrimitiveType.STRING
         if variable_info.var_type == "FLOAT":
-            prim_type = dqsql.PrimitiveType.FLOAT64
+            prim_type = PrimitiveType.FLOAT
         if variable_info.var_type == "INT":
-            prim_type = dqsql.PrimitiveType.INT64
+            prim_type = PrimitiveType.INTEGER
         nullable = bool(variable_info.na != 0)
-        schema.append((variable_id, prim_type, nullable))
+
+        schema.append(Column(name=variable_id, format_type=prim_type, is_nullable=nullable))
+        # schema.append((variable_id, prim_type, nullable))
     return schema
 
 
 # https://docs.decentriq.com/sdk/python-getting-started
 def create_provision_dcr(user: Any, cohort: Cohort) -> dict[str, Any]:
     """Initialize a Data Clean Room in Decentriq when a new cohort is uploaded"""
-    users = [user["email"]]
-
     # Establish connection to Decentriq
     client = dq.create_client(settings.decentriq_email, settings.decentriq_token)
-    enclave_specs = dq.enclave_specifications.versions(
-        [
-            "decentriq.driver:v20",
-            "decentriq.sql-worker:v12",
-        ]
-    )
-    auth, _ = client.create_auth_using_decentriq_pki(enclave_specs)
-    session = client.create_session(auth, enclave_specs)
 
     # Creation of a Data Clean Room (DCR)
-    builder = dq.DataRoomBuilder(f"iCare4CVD DCR provision {cohort.cohort_id}", enclave_specs=enclave_specs)
+    dcr_title = f"iCARE4CVD DCR provision {cohort.cohort_id}"
+    builder = (
+        AnalyticsDcrBuilder(client=client)
+        .with_name(dcr_title)
+        .with_owner(user["email"])
+        .with_description(f"A data clean room to provision the data for the {cohort.cohort_id} cohort")
+    )
 
     # Create data node for cohort
-    data_node_builder = dqsql.TabularDataNodeBuilder(cohort.cohort_id, schema=get_cohort_schema(cohort))
-    data_node_builder.add_to_builder(builder, authentication=client.decentriq_pki_authentication, users=users)
-
-    builder.add_user_permission(
-        email=user["email"],
-        authentication_method=client.decentriq_pki_authentication,
-        permissions=[dq.Permissions.update_data_room_status()],  # To delete the DCR
+    data_node_id = cohort.cohort_id.replace(" ", "-")
+    builder.add_node_definition(RawDataNodeDefinition(name=data_node_id, is_required=True))
+    # TODO: providing schema is broken in new SDK
+    # builder.add_node_definition(TableDataNodeDefinition(name=data_node_id, columns=get_cohort_schema(cohort), is_required=True))
+
+    builder.add_participant(
+        user["email"],
+        data_owner_of=[data_node_id],
     )
 
-    # Build and publish DCR
-    data_room = builder.build()
-    data_room_id = session.publish_data_room(data_room)
-
-    dcr_desc = client.get_data_room_description(data_room_id, enclave_specs)
-    dcr_url = f"https://platform.decentriq.com/datarooms/p/{data_room_id}"
+    dcr_definition = builder.build()
+    dcr = client.publish_analytics_dcr(dcr_definition)
+    dcr_url = f"https://platform.decentriq.com/datarooms/p/{dcr.id}"
 
     return {
         "message": f"Data Clean Room for {cohort.cohort_id} provisioned at {dcr_url}",
         "identifier": cohort.cohort_id,
         "dcr_url": dcr_url,
-        "dcr_title": dcr_desc["title"],
-        "dcr": dcr_desc,
+        "dcr_title": dcr_title,
         **cohort.dict(),
     }
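To make the new schema mapping concrete, here is a minimal sketch of what get_cohort_schema now returns. FakeVar and FakeCohort are illustrative stand-ins that only duck-type the attributes the function reads (the real models are not part of this diff); the Column/PrimitiveType usage is exactly what the hunk above introduces.

from dataclasses import dataclass, field

from src.decentriq import get_cohort_schema


@dataclass
class FakeVar:  # stand-in: the real variable model is not shown in this diff
    var_type: str
    na: int


@dataclass
class FakeCohort:  # stand-in: only the attributes get_cohort_schema reads
    cohort_id: str
    variables: dict[str, FakeVar] = field(default_factory=dict)


cohort = FakeCohort(
    cohort_id="demo-cohort",
    variables={"age": FakeVar(var_type="INT", na=0), "weight": FakeVar(var_type="FLOAT", na=3)},
)
for col in get_cohort_schema(cohort):
    print(col.name, col.format_type, col.is_nullable)
# age    -> PrimitiveType.INTEGER, not nullable (na == 0)
# weight -> PrimitiveType.FLOAT, nullable (3 missing values)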
@@ -148,50 +151,46 @@ async def create_compute_dcr(
 
     # Establish connection to Decentriq
     client = dq.create_client(settings.decentriq_email, settings.decentriq_token)
-    enclave_specs = dq.enclave_specifications.versions(
-        [
-            "decentriq.driver:v20",
-            "decentriq.sql-worker:v12",
-            # "decentriq.python-ml-worker-32-64:v21",
-            # "decentriq.r-latex-worker-32-32:v16",
-        ]
-    )
-    auth, _ = client.create_auth_using_decentriq_pki(enclave_specs)
-    session = client.create_session(auth, enclave_specs)
 
     # Creation of a Data Clean Room (DCR)
+    data_nodes = []
     dcr_count = len(client.get_data_room_descriptions())
-    builder = dq.DataRoomBuilder(f"iCare4CVD DCR compute {dcr_count}", enclave_specs=enclave_specs)
+    dcr_title = f"iCARE4CVD DCR compute {dcr_count}"
+    builder = (
+        AnalyticsDcrBuilder(client=client)
+        .with_name(dcr_title)
+        .with_owner(user["email"])
+        .with_description("A data clean room to run computations on cohorts for the iCARE4CVD project")
+    )
+
+    # builder = dq.DataRoomBuilder(f"iCare4CVD DCR compute {dcr_count}", enclave_specs=enclave_specs)
 
     # Convert cohort variables to decentriq schema
     for cohort_id, cohort in selected_cohorts.items():
         # Create data node for cohort
-        data_node_builder = dqsql.TabularDataNodeBuilder(cohort_id, schema=get_cohort_schema(cohort))
-        data_node_builder.add_to_builder(builder, authentication=client.decentriq_pki_authentication, users=users)
-
-        # Add empty list of permissions
-        builder.add_user_permission(
-            email=user["email"],
-            authentication_method=client.decentriq_pki_authentication,
-            permissions=[
-                dq.Permissions.update_data_room_status(),  # To delete the DCR
-                # dq.Permissions.leaf_crud(data_node_id),
-                # dq.Permissions.execute_compute(uppercase_text_node_id),
-                # dq.Permissions.retrieve_compute_result(uppercase_text_node_id),
-            ],
+        data_node_id = cohort_id.replace(" ", "-")
+        builder.add_node_definition(RawDataNodeDefinition(name=data_node_id, is_required=True))
+        # TODO: providing schema is broken in new SDK
+        # builder.add_node_definition(TableDataNodeDefinition(name=data_node_id, columns=get_cohort_schema(cohort), is_required=True))
+        data_nodes.append(data_node_id)
+
+    # Add python data preparation script
+    builder.add_node_definition(
+        PythonComputeNodeDefinition(name="prepare-data", script=pandas_script, dependencies=data_nodes)
     )
 
-    # Build and publish DCR
-    data_room = builder.build()
-    data_room_id = session.publish_data_room(data_room)
+    # Add user permissions: the user owns every cohort data node and can run the compute node
+    builder.add_participant(user["email"], data_owner_of=data_nodes, analyst_of=["prepare-data"])
 
-    dcr_desc = client.get_data_room_description(data_room_id, enclave_specs)
-    dcr_url = f"https://platform.decentriq.com/datarooms/p/{data_room_id}"
+    # Build and publish DCR
+    dcr_definition = builder.build()
+    dcr = client.publish_analytics_dcr(dcr_definition)
+    dcr_url = f"https://platform.decentriq.com/datarooms/p/{dcr.id}"
 
     return {
         "message": f"Data Clean Room available for compute at {dcr_url}",
         "dcr_url": dcr_url,
-        "dcr_title": dcr_desc["title"],
-        "dcr": dcr_desc,
+        "dcr_title": dcr_title,
+        # "dcr": dcr_desc,
         "merge_script": pandas_script,
         **cohorts_request,
     }
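The pandas_script passed to the prepare-data node is built elsewhere in this module; a hypothetical minimal version could look like the sketch below. It assumes Decentriq's documented enclave convention that each dependency is mounted under /input/<node-name> and that results must be written to /output; verify the exact paths against the SDK version pinned above.

import glob

import pandas as pd

# Each RawDataNode dependency is assumed to be mounted under /input/<data-node-id>/
dfs = []
for path in glob.glob("/input/*/*.csv"):
    df = pd.read_csv(path)
    df["cohort"] = path.split("/")[2]  # remember which data node the rows came from
    dfs.append(df)

# Analysts retrieve whatever the script writes to /output
pd.concat(dfs, ignore_index=True).to_csv("/output/merged_cohorts.csv", index=False)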
diff --git a/backend/src/upload.py b/backend/src/upload.py
index bbcce22..f7b87b6 100644
--- a/backend/src/upload.py
+++ b/backend/src/upload.py
@@ -2,13 +2,13 @@
 import os
 import shutil
 from datetime import datetime
-from typing import Any
 from re import sub
+from typing import Any
 
 import pandas as pd
 import requests
 from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile
-from rdflib import DCTERMS, XSD, Dataset, Graph, Literal, URIRef
+from rdflib import XSD, Dataset, Graph, Literal, URIRef
 from rdflib.namespace import DC, RDF, RDFS
 from SPARQLWrapper import SPARQLWrapper
 
@@ -146,10 +146,29 @@ def parse_categorical_string(s: str) -> list[dict[str, str]]:
     return result
 
 
-COLUMNS_LIST = ["VARIABLE NAME", "VARIABLE LABEL", "VAR TYPE", "UNITS", "CATEGORICAL", "COUNT", "NA", "MIN", "MAX", "Definition", "Formula", "ICD-10", "ATC-DDD", "LOINC", "SNOMED-CT", "OMOP", "Visits"]
+COLUMNS_LIST = [
+    "VARIABLE NAME",
+    "VARIABLE LABEL",
+    "VAR TYPE",
+    "UNITS",
+    "CATEGORICAL",
+    "COUNT",
+    "NA",
+    "MIN",
+    "MAX",
+    "Definition",
+    "Formula",
+    "ICD-10",
+    "ATC-DDD",
+    "LOINC",
+    "SNOMED-CT",
+    "OMOP",
+    "Visits",
+]
 ACCEPTED_DATATYPES = ["STR", "FLOAT", "INT", "DATETIME"]
 ID_COLUMNS_NAMESPACES = {"ICD-10": "icd10", "SNOMED-CT": "snomedct", "ATC-DDD": "atc", "LOINC": "loinc"}
+
 
 def create_uri_from_id(row):
     """Build concepts URIs from the ID provided in the various columns of the data dictionary"""
     uris_list = []
@@ -158,16 +177,18 @@ def create_uri_from_id(row):
         if row[column]:
             if "," in str(row[column]):  # Handle list of IDs separated by comma
                 ids = str(row[column]).split(",")
-                uris_list.extend([converter.expand(f"{ID_COLUMNS_NAMESPACES[column]}:{identif.strip()}") for identif in ids])
+                uris_list.extend(
+                    [converter.expand(f"{ID_COLUMNS_NAMESPACES[column]}:{identif.strip()}") for identif in ids]
+                )
             else:
                 uris_list.append(converter.expand(f"{ID_COLUMNS_NAMESPACES[column]}:{str(row[column]).strip()}"))
     return ", ".join(uris_list)
 
-
 def to_camelcase(s: str) -> str:
-    s = sub(r"(_|-)+", " ", s).title().replace(" ", "")
-    return ''.join([s[0].lower(), s[1:]])
+    s = sub(r"(_|-)+", " ", s).title().replace(" ", "")
+    return "".join([s[0].lower(), s[1:]])
+
 
 def load_cohort_dict_file(dict_path: str, cohort_id: str, user_email: str) -> Dataset:
     """Parse the cohort dictionary uploaded as excel or CSV spreadsheet, and load it to the triplestore"""
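A doctest-style sanity check of the two helpers touched above (illustration, not part of the diff). The create_uri_from_id behaviour depends on the module-level curies converter, so it is described in a comment rather than asserted:

from src.upload import to_camelcase

assert to_camelcase("VAR TYPE") == "varType"
assert to_camelcase("ATC-DDD") == "atcDdd"  # "-" becomes a word boundary before camelCasing
# create_uri_from_id expands each ID through the curies converter registered in this
# module, e.g. a row with "ICD-10" = "I50, I10" yields the two expanded icd10 IRIs
# joined by ", ".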
@@ -405,7 +426,10 @@ def init_triplestore() -> None:
     g = init_graph(onto_graph_uri)
     ntriple_g = init_graph()
     # ntriple_g.parse("https://raw.githubusercontent.com/vemonet/omop-cdm-owl/main/omop_cdm_v6.ttl", format="turtle")
-    ntriple_g.parse("https://raw.githubusercontent.com/MaastrichtU-IDS/cohort-explorer/main/cohort-explorer-ontology.ttl", format="turtle")
+    ntriple_g.parse(
+        "https://raw.githubusercontent.com/MaastrichtU-IDS/cohort-explorer/main/cohort-explorer-ontology.ttl",
+        format="turtle",
+    )
     # Trick to convert ntriples to nquads with a given graph
     for s, p, o in ntriple_g.triples((None, None, None)):
         g.add((s, p, o, onto_graph_uri))
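The ntriples-to-nquads trick above can be reproduced standalone with plain rdflib, which makes it easy to test outside the app. init_graph is this repo's helper and is replaced here by rdflib's Dataset directly; the named-graph IRI is a hypothetical placeholder:

from rdflib import Dataset, Graph, URIRef

onto_graph_uri = URIRef("https://example.org/graph/ontology")  # hypothetical graph IRI
ds = Dataset()
triples = Graph()
triples.parse(
    "https://raw.githubusercontent.com/MaastrichtU-IDS/cohort-explorer/main/cohort-explorer-ontology.ttl",
    format="turtle",
)
# Re-adding each triple with a fourth graph term turns it into a quad, so the data
# lands in the intended named graph when stored or serialized
for s, p, o in triples:
    ds.add((s, p, o, onto_graph_uri))
print(ds.serialize(format="nquads")[:200])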