diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1cbee72..8cb7e5a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,20 @@
+# Samply.Focus v0.8.0 2024-11-04
+
+In this release, we add support for four types of SQL queries for Exliquid and Organoids.
+
+## Major changes
+* Allowlist of SQL queries
+
+
+# Samply.Focus v0.7.0 2024-09-24
+
+In this release, we extend the supported data backends beyond CQL-enabled FHIR stores. We now support PostgreSQL as well. Usage instructions are included in the README.
+
+## Major changes
+* PostgreSQL support added
+
+
+
 # Focus -- 2023-02-08
 
 This is the initial release of Focus, a task distribution application designed for working with Samply.Beam. Currently, only Samply.Blaze is supported as an endpoint, but other endpoints can easily be integrated.
diff --git a/Cargo.toml b/Cargo.toml
index ee17b0b..b9d6527 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "focus"
-version = "0.6.0"
+version = "0.9.0"
 edition = "2021"
 license = "Apache-2.0"
 
@@ -11,34 +11,37 @@
 base64 = "0.22.1"
 reqwest = { version = "0.12", default-features = false, features = ["json", "default-tls"] }
 serde = { version = "1.0.152", features = ["serde_derive"] }
 serde_json = "1.0"
-thiserror = "1.0.38"
+thiserror = "2.0.3"
 chrono = "0.4.31"
 indexmap = "2.1.0"
-tokio = { version = "1.25.0", default_features = false, features = ["signal", "rt-multi-thread", "macros"] }
+tokio = { version = "1.25.0", default-features = false, features = ["signal", "rt-multi-thread", "macros"] }
 beam-lib = { git = "https://github.com/samply/beam", branch = "develop", features = ["http-util"] }
-laplace_rs = {git = "https://github.com/samply/laplace-rs.git", tag = "v0.3.0" }
+laplace_rs = {git = "https://github.com/samply/laplace-rs.git", tag = "v0.4.0" }
 uuid = "1.8.0"
 rand = { default-features = false, version = "0.8.5" }
 futures-util = { version = "0.3", default-features = false, features = ["std"] }
-sqlx = { version = "0.7.4", features = [ "runtime-tokio", "postgres", "macros", "chrono"] }
-sqlx-pgrow-serde = "0.2.0"
 tryhard = "0.5"
 
 # Logging
-tracing = { version = "0.1.37", default_features = false }
-tracing-subscriber = { version = "0.3.11", default_features = false, features = ["env-filter", "ansi"] }
+tracing = { version = "0.1.37", default-features = false }
+tracing-subscriber = { version = "0.3.11", default-features = false, features = ["env-filter", "ansi"] }
 
 # Global variables
 once_cell = "1.18"
 
 # Command Line Interface
-clap = { version = "4", default_features = false, features = ["std", "env", "derive", "help", "color"] }
+clap = { version = "4", default-features = false, features = ["std", "env", "derive", "help", "color"] }
+
+# Query via SQL
+sqlx = { version = "0.8.2", features = [ "runtime-tokio", "postgres", "macros", "chrono", "rust_decimal", "uuid"], optional = true }
+kurtbuilds_sqlx_serde = { version = "0.3.2", features = [ "json", "decimal", "chrono", "uuid"], optional = true }
 
 [features]
 default = []
 bbmri = []
-dktk = []
+dktk = ["query-sql"]
+query-sql = ["dep:sqlx", "dep:kurtbuilds_sqlx_serde"]
 
 [dev-dependencies]
 pretty_assertions = "1.4.0"
diff --git a/README.md b/README.md
index 1718f1a..54a5828 100644
--- a/README.md
+++ b/README.md
@@ -51,10 +51,19 @@
 PROJECTS_NO_OBFUSCATION = "exliquid;dktk_supervisors;exporter;ehds2" # Projects
 QUERIES_TO_CACHE = "queries_to_cache.conf" # The path to a file containing base64 encoded queries whose results are to be cached.
If not set, no results are cached PROVIDER = "name" #EUCAIM provider name PROVIDER_ICON = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABAQMAAAAl21bKAAAAA1BMVEUAAACnej3aAAAAAXRSTlMAQObYZgAAAApJREFUCNdjYAAAAAIAAeIhvDMAAAAASUVORK5CYII=" # Base64 encoded EUCAIM provider icon -AUTH_HEADER = "ApiKey XXXX" #Authorization header +AUTH_HEADER = "ApiKey XXXX" #Authorization header; if the endpoint type is Blaze or BlazeAndSql, this header is used for the Exporter target application, and the syntax is AUTH_HEADER = "XXXX" where "XXXX" is the API key +``` + +In order to use Postgres querying, a Docker image built with the feature "dktk" needs to be used and this optional variable set: +```bash POSTGRES_CONNECTION_STRING = "postgresql://postgres:Test.123@localhost:5432/postgres" # Postgres connection string ``` +Additionally when using Postgres this optional variable can be set: +```bash +MAX_DB_ATTEMPTS = "8" # Max number of attempts to connect to the database; default value: 8 +``` + Obfuscating zero counts is by default switched off. To enable obfuscating zero counts, set the env. variable `OBFUSCATE_ZERO`. Optionally, you can provide the `TLS_CA_CERTIFICATES_DIR` environment variable to add additional trusted certificates, e.g., if you have a TLS-terminating proxy server in place. The application respects the `HTTP_PROXY`, `HTTPS_PROXY`, `ALL_PROXY`, `NO_PROXY`, and their respective lowercase equivalents. @@ -81,9 +90,9 @@ Creating a sample task containing an abstract syntax tree (AST) query using curl curl -v -X POST -H "Content-Type: application/json" --data '{"id":"7fffefff-ffef-fcff-feef-feffffffffff","from":"app1.proxy1.broker","to":["app1.proxy1.broker"],"ttl":"10s","failure_strategy":{"retry":{"backoff_millisecs":1000,"max_tries":5}},"metadata":{"project":"bbmri"},"body":"eyJsYW5nIjoiYXN0IiwicGF5bG9hZCI6ImV5SmhjM1FpT25zaWIzQmxjbUZ1WkNJNklrOVNJaXdpWTJocGJHUnlaVzRpT2x0N0ltOXdaWEpoYm1RaU9pSkJUa1FpTENKamFHbHNaSEpsYmlJNlczc2liM0JsY21GdVpDSTZJazlTSWl3aVkyaHBiR1J5Wlc0aU9sdDdJbXRsZVNJNkltZGxibVJsY2lJc0luUjVjR1VpT2lKRlVWVkJURk1pTENKemVYTjBaVzBpT2lJaUxDSjJZV3gxWlNJNkltMWhiR1VpZlN4N0ltdGxlU0k2SW1kbGJtUmxjaUlzSW5SNWNHVWlPaUpGVVZWQlRGTWlMQ0p6ZVhOMFpXMGlPaUlpTENKMllXeDFaU0k2SW1abGJXRnNaU0o5WFgxZGZWMTlMQ0pwWkNJNkltRTJaakZqWTJZekxXVmlaakV0TkRJMFppMDVaRFk1TFRSbE5XUXhNelZtTWpNME1DSjkifQ=="}' -H "Authorization: ApiKey app1.proxy1.broker App1Secret" http://localhost:8081/v1/tasks ``` -Creating a sample SQL task for a `SELECT_TABLES` query using curl: +Creating a sample SQL task for a `SELECT_TEST` query using curl: ```bash - curl -v -X POST -H "Content-Type: application/json" --data '{"id":"7fffefff-ffef-fcff-feef-feffffffffff","from":"app1.proxy1.broker","to":["app1.proxy1.broker"],"ttl":"10s","failure_strategy":{"retry":{"backoff_millisecs":1000,"max_tries":5}},"metadata":{"project":"exliquid"},"body":"eyJwYXlsb2FkIjoiU0VMRUNUX1RBQkxFUyJ9"}' -H "Authorization: ApiKey app1.proxy1.broker App1Secret" http://localhost:8081/v1/tasks + curl -v -X POST -H "Content-Type: application/json" --data '{"id":"7fffefff-ffef-fcff-feef-feffffffffff","from":"app1.proxy1.broker","to":["app1.proxy1.broker"],"ttl":"10s","failure_strategy":{"retry":{"backoff_millisecs":1000,"max_tries":5}},"metadata":{"project":"exliquid"},"body":"eyJwYXlsb2FkIjoiU0VMRUNUX1RFU1QifQ=="}' -H "Authorization: ApiKey app1.proxy1.broker App1Secret" http://localhost:8081/v1/tasks ``` Creating a sample [Exporter](https://github.com/samply/exporter) "execute" task containing an Exporter query using curl: diff --git a/resources/cql/DHKI_STRAT_ENCOUNTER_STRATIFIER 
b/resources/cql/DHKI_STRAT_ENCOUNTER_STRATIFIER new file mode 100644 index 0000000..ffd36d2 --- /dev/null +++ b/resources/cql/DHKI_STRAT_ENCOUNTER_STRATIFIER @@ -0,0 +1,5 @@ +define Encounter: +if InInitialPopulation then [Encounter] else {} as List + +define function Departments(encounter FHIR.Encounter): +encounter.identifier.where(system = 'http://dktk.dkfz.de/fhir/sid/hki-department').value.first() diff --git a/resources/cql/DHKI_STRAT_MEDICATION_STRATIFIER b/resources/cql/DHKI_STRAT_MEDICATION_STRATIFIER new file mode 100644 index 0000000..835ee49 --- /dev/null +++ b/resources/cql/DHKI_STRAT_MEDICATION_STRATIFIER @@ -0,0 +1,5 @@ +define MedicationStatement: +if InInitialPopulation then [MedicationStatement] else {} as List + +define function AppliedMedications(medication FHIR.MedicationStatement): +medication.medication.coding.code.last() diff --git a/resources/cql/DHKI_STRAT_SPECIMEN_STRATIFIER b/resources/cql/DHKI_STRAT_SPECIMEN_STRATIFIER new file mode 100644 index 0000000..75534e4 --- /dev/null +++ b/resources/cql/DHKI_STRAT_SPECIMEN_STRATIFIER @@ -0,0 +1,8 @@ +define Specimen: +if InInitialPopulation then [Specimen] else {} as List + +define function SampleType(specimen FHIR.Specimen): +specimen.type.coding.where(system = 'https://fhir.bbmri.de/CodeSystem/SampleMaterialType').code.first() + +define function SampleSubtype(specimen FHIR.Specimen): +specimen.type.text.first() diff --git a/resources/cql/DKTK_STRAT_AGE_STRATIFIER b/resources/cql/DKTK_STRAT_AGE_STRATIFIER index 6cb9744..9efc998 100644 --- a/resources/cql/DKTK_STRAT_AGE_STRATIFIER +++ b/resources/cql/DKTK_STRAT_AGE_STRATIFIER @@ -4,5 +4,12 @@ from [Condition] C where C.extension.where(url='http://hl7.org/fhir/StructureDefinition/condition-related').empty() and C.onset is not null sort by date from onset asc) +define FirstDiagnosis: +First( +from [Condition] C +sort by date from onset asc) + define AgeClass: -if (PrimaryDiagnosis.onset is null) then 'unknown' else ToString((AgeInYearsAt(FHIRHelpers.ToDateTime(PrimaryDiagnosis.onset)) div 10) * 10) +if (PrimaryDiagnosis.onset is null) +then ToString((AgeInYearsAt(FHIRHelpers.ToDateTime(FirstDiagnosis.onset)) div 10) * 10) +else ToString((AgeInYearsAt(FHIRHelpers.ToDateTime(PrimaryDiagnosis.onset)) div 10) * 10) diff --git a/resources/cql/DKTK_STRAT_GENETIC_VARIANT b/resources/cql/DKTK_STRAT_GENETIC_VARIANT new file mode 100644 index 0000000..1021262 --- /dev/null +++ b/resources/cql/DKTK_STRAT_GENETIC_VARIANT @@ -0,0 +1,5 @@ +define GeneticVariantCount: +if InInitialPopulation then [Observation: Code '69548-6' from loinc] else {} as List + +define GeneticVariantCode: +First (from [Observation: Code '69548-6' from loinc] O return O.component.where(code.coding contains Code '48018-6' from loinc).value.coding.code.first()) diff --git a/resources/sql/EXLIQUID_SAMPLE_3LEVELS b/resources/sql/EXLIQUID_SAMPLE_3LEVELS new file mode 100644 index 0000000..35597db --- /dev/null +++ b/resources/sql/EXLIQUID_SAMPLE_3LEVELS @@ -0,0 +1,43 @@ +/* +Exliquid query for sites with 'legacy' exliquid specimen documentation (3 level hierarchy versus 'virtual' mother sample). +For current expected documentation see: https://wiki.verbis.dkfz.de/pages/viewpage.action?pageId=294716167. 
+*/ +with t as ( + select + (s.resource ->> 'id')::text s_id, + (s_coding ->> 'code')::text sample_type + from specimen s, jsonb_array_elements(s.resource -> 'type' -> 'coding') as s_coding + where s_coding ->> 'system' = 'https://fhir.bbmri.de/CodeSystem/SampleMaterialType' +), +t2 as ( + SELECT + s_ali.resource ->> 'id' s_ali_id, + sample_type_ali.sample_type as s_ali_type, + (s_ali.resource -> 'container' -> 0 -> 'specimenQuantity' ->> 'value')::float s_ali_amountrest, + s_ali_grp.resource ->> 'id' s_ali_grp_id, + sample_type_ali_grp.sample_type as s_ali_grp_type, + (s_ali_grp.resource -> 'container' -> 0 -> 'specimenQuantity' ->> 'value')::float s_ali_grp_amountrest, + s_mother.resource ->> 'id' s_mother_id, + sample_type_mother.sample_type as s_mother_type, + (s_mother.resource -> 'container' -> 0 -> 'specimenQuantity' ->> 'value')::float s_mother_amountrest, + s_mother.resource -> 'subject' ->> 'reference' as patient_id + FROM specimen s_ali + JOIN specimen s_ali_grp ON (s_ali.resource->'parent'->0->>'reference')::text = (s_ali_grp.resource->>'resourceType')::text || '/' || (s_ali_grp.resource->>'id')::text + JOIN specimen s_mother ON (s_ali_grp.resource->'parent'->0->>'reference')::text = (s_mother.resource->>'resourceType')::text || '/' || (s_mother.resource->>'id')::text + join t as sample_type_ali on s_ali.resource ->> 'id' = sample_type_ali.s_id + join t as sample_type_ali_grp on s_ali_grp.resource ->> 'id' = sample_type_ali_grp.s_id + join t as sample_type_mother on s_mother.resource ->> 'id' = sample_type_mother.s_id + where (s_ali.resource -> 'container' -> 0 -> 'specimenQuantity' ->> 'value')::float > 0 +), +t3 as ( +select distinct + t2.patient_id, + c.resource -> 'code' -> 'coding' -> 0 ->> 'code' icd10_code, + c.resource -> 'code' ->> 'text' diag_desc, + t2.s_mother_type +from t2 +join condition c on t2.patient_id = c.resource -> 'subject' ->> 'reference' +) +select icd10_code, diag_desc, count(distinct patient_id) patient_count, s_mother_type, count(s_mother_type) sample_count +from t3 +group by icd10_code, diag_desc, patient_id, s_mother_type; \ No newline at end of file diff --git a/resources/sql/SELECT_TABLES b/resources/sql/SELECT_TABLES deleted file mode 100644 index c59f3b3..0000000 --- a/resources/sql/SELECT_TABLES +++ /dev/null @@ -1 +0,0 @@ -SELECT * FROM pg_catalog.pg_tables \ No newline at end of file diff --git a/resources/sql/SELECT_TEST b/resources/sql/SELECT_TEST new file mode 100644 index 0000000..8f90872 --- /dev/null +++ b/resources/sql/SELECT_TEST @@ -0,0 +1 @@ +SELECT 10 AS VALUE, quote_literal('Hello Rustaceans') AS GREETING, 4.7 as FLOATY, CURRENT_DATE AS TODAY; \ No newline at end of file diff --git a/resources/sql/SIORGP_PUBLIC_MAIN b/resources/sql/SIORGP_PUBLIC_MAIN new file mode 100644 index 0000000..d15737f --- /dev/null +++ b/resources/sql/SIORGP_PUBLIC_MAIN @@ -0,0 +1,78 @@ +/* +SIorgP MetPredict project +The approach chosen here is to minimize the number of tasks generated and thus network traffic via Beam +=> one large query that returns the most necessary fields over multiple smaller queries +*/ +with t as ( +select +o.resource->'subject'->>'reference' as pat_ref, +o.resource->'code'->'coding'->0->>'code' as crf, + component->'code'->'coding'->0->>'code' AS code, + COALESCE( + component->'valueCodeableConcept'->'coding'->0->>'code', + component->>'valueDateTime', + component->'valueQuantity'->>'value', + component->>'valueString' + ) AS value +FROM + observation o , + jsonb_array_elements(o.resource->'component') AS component +where 
o.resource->'code'->'coding'->0->>'code' like 'SIOrgP%' +), +t2 AS ( +select t.value as pat_pseudonym, + -- t.crf, + p.resource->>'gender' as gender, + p.resource->>'birthDate' as birth_date, + t5.value as organoid_id, + t2.value as location_primary_tumor, + t7.value as location_primary_tumor_precise, + t3.value as therapy, + t4.value as metastases_therapy, + t6.value::integer as age_at_enrollment +from t +left join t t2 on t.pat_ref = t2.pat_ref and t2.code='SIOP_LOCALISATION_PRIMARY_TUMOR' +left join t t3 on t.pat_ref = t3.pat_ref and t3.code='SIOP_NEOADJ_T_RECTAL_CARCINOMA' +left join t t4 on t.pat_ref = t4.pat_ref and t4.code='SIOP_NEOADJ_CTX_MET' +left join t t5 on t.pat_ref = t5.pat_ref and t5.code like 'SIOP_SAMPLE_M0%_PSEUDONYM' +left join t t6 on t.pat_ref = t6.pat_ref and t6.code='SIOP_AGE_STUDY_ENROLLMENT' +left join t t7 on t.pat_ref = t7.pat_ref and t7.code='SIOP_LOCALISATION_PRIMARY_TUMOR_COLON' +left join patient p on t.pat_ref = 'Patient/' || (p.resource->>'id')::text +where t.crf like 'SIOrgP - MetPredict - Visite 1%' and t.code = 'SIOP_PATIENT_PSEUDONYM' +), +t8 as ( + select pat_pseudonym, count(distinct organoid_id) n_organoids + from t2 + group by pat_pseudonym +) +-- patients having <= 3 organoids +select 'MetPredict' as project, 'pat_pdos_leq_3' as field, (select count(distinct pat_pseudonym) from t8 where n_organoids <= 3) as value +union +-- patients having 4 organoids +select 'MetPredict' as project, 'pat_pdos_4' as field, (select count(distinct pat_pseudonym) from t8 where n_organoids = 4) as value +union +-- patients having 5 organoids +select 'MetPredict' as project, 'pat_pdos_5' as field, (select count(distinct pat_pseudonym) from t8 where n_organoids = 5) as value +union +-- patients having > 5 organoids +select 'MetPredict' as project, 'pat_pdos_gt_5' as field, (select count(distinct pat_pseudonym) from t8 where n_organoids > 5) as value +union +-- the total number of patients +select 'MetPredict' as project, 'n_patients' as field, (select count(distinct pat_pseudonym) from t2) as value +union +-- the total number of organoids +select 'MetPredict' as project, 'n_organoids' as field, (select count(distinct organoid_id) from t2) as value +union +select 'MetPredict' as project, 'gender_male' as field, (select count(distinct pat_pseudonym) from t2 where gender = 'male') as value +union +select 'MetPredict' as project, 'gender_female' as field, (select count(distinct pat_pseudonym) from t2 where gender = 'female') as value +union +select 'MetPredict' as project, '<=30' as field, (select count(distinct pat_pseudonym) from t2 where age_at_enrollment <= 30) as value +union +select 'MetPredict' as project, '31-40' as field, (select count(distinct pat_pseudonym) from t2 where age_at_enrollment >= 31 and age_at_enrollment <= 40) as value +union +select 'MetPredict' as project, '41-50' as field, (select count(distinct pat_pseudonym) from t2 where age_at_enrollment >= 41 and age_at_enrollment <= 50) as value +union +select 'MetPredict' as project, '51-60' as field, (select count(distinct pat_pseudonym) from t2 where age_at_enrollment >= 51 and age_at_enrollment <= 60) as value +union +select 'MetPredict' as project, '>=61' as field, (select count(distinct pat_pseudonym) from t2 where age_at_enrollment >= 61) as value; diff --git a/resources/sql/SIORGP_PUBLIC_NPAT b/resources/sql/SIORGP_PUBLIC_NPAT new file mode 100644 index 0000000..609feb7 --- /dev/null +++ b/resources/sql/SIORGP_PUBLIC_NPAT @@ -0,0 +1,5 @@ +-- Test query. 
Number of patients that have a documented visit 1 +select count(distinct p.resource) +from observation o +join patient p on o.resource->'subject'->>'reference' = 'Patient/' || (p.resource->>'id')::text +where o.resource->'code'->'coding'->0->>'code' like 'SIOrgP - MetPredict - Visite 1%'; \ No newline at end of file diff --git a/resources/sql/SIORGP_PUBLIC_NVISIT2B b/resources/sql/SIORGP_PUBLIC_NVISIT2B new file mode 100644 index 0000000..72d5058 --- /dev/null +++ b/resources/sql/SIORGP_PUBLIC_NVISIT2B @@ -0,0 +1,4 @@ +-- Test query: Number of observations for visit 2b as a lower bound for the number of expected organoids +select count(o) +from observation o +where o.resource->'code'->'coding'->0->>'code' like 'SIOrgP - MetPredict - Visite 2b%'; \ No newline at end of file diff --git a/resources/test/result_current.cql b/resources/test/result_current.cql new file mode 100644 index 0000000..ca99e6a --- /dev/null +++ b/resources/test/result_current.cql @@ -0,0 +1,89 @@ +library Retrieve +using FHIR version '4.0.0' +include FHIRHelpers version '4.0.0' + +codesystem icd10: 'http://hl7.org/fhir/sid/icd-10' +codesystem SampleMaterialType: 'https://fhir.bbmri.de/CodeSystem/SampleMaterialType' +codesystem icd10gm: 'http://fhir.de/CodeSystem/dimdi/icd-10-gm' +codesystem icd10gmnew: 'http://fhir.de/CodeSystem/bfarm/icd-10-gm' +codesystem StorageTemperature: 'https://fhir.bbmri.de/CodeSystem/StorageTemperature' + + +context Patient + +define AgeClass: +if (Patient.birthDate is null) then 'unknown' else ToString((AgeInYears() div 10) * 10) + +define Gender: +if (Patient.gender is null) then 'unknown' else Patient.gender + +define Custodian: + First(from Specimen.extension E + where E.url = 'https://fhir.bbmri.de/StructureDefinition/Custodian' + return (E.value as Reference).identifier.value) + +define function SampleType(specimen FHIR.Specimen): + case FHIRHelpers.ToCode(specimen.type.coding.where(system = 'https://fhir.bbmri.de/CodeSystem/SampleMaterialType').first()) + when Code 'plasma-edta' from SampleMaterialType then 'blood-plasma' + when Code 'plasma-citrat' from SampleMaterialType then 'blood-plasma' + when Code 'plasma-heparin' from SampleMaterialType then 'blood-plasma' + when Code 'plasma-cell-free' from SampleMaterialType then 'blood-plasma' + when Code 'plasma-other' from SampleMaterialType then 'blood-plasma' + when Code 'plasma' from SampleMaterialType then 'blood-plasma' + when Code 'tissue-formalin' from SampleMaterialType then 'tissue-ffpe' + when Code 'tumor-tissue-ffpe' from SampleMaterialType then 'tissue-ffpe' + when Code 'normal-tissue-ffpe' from SampleMaterialType then 'tissue-ffpe' + when Code 'other-tissue-ffpe' from SampleMaterialType then 'tissue-ffpe' + when Code 'tumor-tissue-frozen' from SampleMaterialType then 'tissue-frozen' + when Code 'normal-tissue-frozen' from SampleMaterialType then 'tissue-frozen' + when Code 'other-tissue-frozen' from SampleMaterialType then 'tissue-frozen' + when Code 'tissue-paxgene-or-else' from SampleMaterialType then 'tissue-other' + when Code 'derivative' from SampleMaterialType then 'derivative-other' + when Code 'liquid' from SampleMaterialType then 'liquid-other' + when Code 'tissue' from SampleMaterialType then 'tissue-other' + when Code 'serum' from SampleMaterialType then 'blood-serum' + when Code 'cf-dna' from SampleMaterialType then 'dna' + when Code 'g-dna' from SampleMaterialType then 'dna' + when Code 'blood-plasma' from SampleMaterialType then 'blood-plasma' + when Code 'tissue-ffpe' from SampleMaterialType then 'tissue-ffpe' + 
when Code 'tissue-frozen' from SampleMaterialType then 'tissue-frozen' + when Code 'tissue-other' from SampleMaterialType then 'tissue-other' + when Code 'derivative-other' from SampleMaterialType then 'derivative-other' + when Code 'liquid-other' from SampleMaterialType then 'liquid-other' + when Code 'blood-serum' from SampleMaterialType then 'blood-serum' + when Code 'dna' from SampleMaterialType then 'dna' + when Code 'buffy-coat' from SampleMaterialType then 'buffy-coat' + when Code 'urine' from SampleMaterialType then 'urine' + when Code 'ascites' from SampleMaterialType then 'ascites' + when Code 'saliva' from SampleMaterialType then 'saliva' + when Code 'csf-liquor' from SampleMaterialType then 'csf-liquor' + when Code 'bone-marrow' from SampleMaterialType then 'bone-marrow' + when Code 'peripheral-blood-cells-vital' from SampleMaterialType then 'peripheral-blood-cells-vital' + when Code 'stool-faeces' from SampleMaterialType then 'stool-faeces' + when Code 'rna' from SampleMaterialType then 'rna' + when Code 'whole-blood' from SampleMaterialType then 'whole-blood' + when Code 'swab' from SampleMaterialType then 'swab' + when Code 'dried-whole-blood' from SampleMaterialType then 'dried-whole-blood' + when null then 'Unknown' + else 'Unknown' + end +define Specimen: + if InInitialPopulation then [Specimen] S where (((((FHIRHelpers.ToDateTime(S.collection.collected) between @1900-01-01 and @2024-10-25) )) and (((((S.extension.where(url='https://fhir.bbmri.de/StructureDefinition/StorageTemperature').value.coding.code contains 'temperature2to10'))))))) else {} as List + +define Diagnosis: +if InInitialPopulation then [Condition] else {} as List + +define function DiagnosisCode(condition FHIR.Condition): +condition.code.coding.where(system = 'http://fhir.de/CodeSystem/bfarm/icd-10-gm').code.first() + +define function DiagnosisCode(condition FHIR.Condition, specimen FHIR.Specimen): +Coalesce( + condition.code.coding.where(system = 'http://hl7.org/fhir/sid/icd-10').code.first(), + condition.code.coding.where(system = 'http://fhir.de/CodeSystem/dimdi/icd-10-gm').code.first(), + condition.code.coding.where(system = 'http://fhir.de/CodeSystem/bfarm/icd-10-gm').code.first(), + specimen.extension.where(url='https://fhir.bbmri.de/StructureDefinition/SampleDiagnosis').value.coding.code.first() + ) + +define InInitialPopulation: +((((((Patient.gender = 'male')))) and ((((((exists[Condition: Code 'C61' from icd10]) or (exists[Condition: Code 'C61' from icd10gm]) or (exists[Condition: Code 'C61' from icd10gmnew])) or (exists from [Specimen] S where (S.extension.where(url='https://fhir.bbmri.de/StructureDefinition/SampleDiagnosis').value.coding.code contains 'C61')))))) and (( AgeInYears() between Ceiling(10) and Ceiling(90)))) or (((exists from [Specimen] S +where FHIRHelpers.ToDateTime(S.collection.collected) between @1900-01-01 and @2024-10-25 )) and ((((exists from [Specimen] S where (S.extension.where(url='https://fhir.bbmri.de/StructureDefinition/StorageTemperature').value.coding contains Code 'temperature2to10' from StorageTemperature) )))))) \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index 7a09748..d45a408 100644 --- a/src/config.rs +++ b/src/config.rs @@ -20,7 +20,9 @@ pub enum Obfuscate { pub enum EndpointType { Blaze, Omop, + #[cfg(feature = "query-sql")] BlazeAndSql, + #[cfg(feature = "query-sql")] Sql, } @@ -29,7 +31,9 @@ impl fmt::Display for EndpointType { match self { EndpointType::Blaze => write!(f, "blaze"), EndpointType::Omop => write!(f, "omop"), + 
#[cfg(feature = "query-sql")] EndpointType::BlazeAndSql => write!(f, "blaze_and_sql"), + #[cfg(feature = "query-sql")] EndpointType::Sql => write!(f, "sql"), } } @@ -158,9 +162,15 @@ struct CliArgs { #[clap(long, env, value_parser)] auth_header: Option, - /// Database connection string + /// Postgres connection string + #[cfg(feature = "query-sql")] #[clap(long, env, value_parser)] postgres_connection_string: Option, + + /// Max number of attempts to connect to the database + #[cfg(feature = "query-sql")] + #[clap(long, env, value_parser, default_value = "8")] + max_db_attempts: u32, } pub(crate) struct Config { @@ -188,7 +198,10 @@ pub(crate) struct Config { pub provider: Option, pub provider_icon: Option, pub auth_header: Option, + #[cfg(feature = "query-sql")] pub postgres_connection_string: Option, + #[cfg(feature = "query-sql")] + pub max_db_attempts: u32, } impl Config { @@ -230,7 +243,10 @@ impl Config { provider: cli_args.provider, provider_icon: cli_args.provider_icon, auth_header: cli_args.auth_header, + #[cfg(feature = "query-sql")] postgres_connection_string: cli_args.postgres_connection_string, + #[cfg(feature = "query-sql")] + max_db_attempts: cli_args.max_db_attempts, client, }; Ok(config) diff --git a/src/cql.rs b/src/cql.rs index 0468dfe..fc9f2b2 100644 --- a/src/cql.rs +++ b/src/cql.rs @@ -7,7 +7,7 @@ use crate::projects::{ use base64::{prelude::BASE64_STANDARD as BASE64, Engine as _}; use chrono::offset::Utc; -use chrono::DateTime; +use chrono::{DateTime, NaiveDate, NaiveTime}; use indexmap::set::IndexSet; use tracing::info; use uuid::Uuid; @@ -146,9 +146,21 @@ pub fn process( match condition.value { ast::ConditionValue::DateRange(date_range) => { let datetime_str_min = date_range.min.as_str(); - let datetime_min: DateTime = datetime_str_min - .parse() - .map_err(|_| FocusError::AstInvalidDateFormat(date_range.min))?; + + let datetime_min_maybe: Result, _> = datetime_str_min.parse(); + + let datetime_min: DateTime = if let Ok(datetime) = datetime_min_maybe { + datetime + } else { + let naive_date_maybe = NaiveDate::parse_from_str(datetime_str_min, "%Y-%m-%d"); //FIXME remove once Lens2 behaves, only return the error + + if let Ok(naive_date) = naive_date_maybe { + DateTime::::from_naive_utc_and_offset(naive_date.and_time(NaiveTime::default()), Utc) + } else { + return Err(FocusError::AstInvalidDateFormat(date_range.min)); + } + }; + let date_str_min = format!("@{}", datetime_min.format("%Y-%m-%d")); condition_string = @@ -156,15 +168,26 @@ pub fn process( filter_string = filter_string.replace("{{D1}}", date_str_min.as_str()); // no condition needed, "" stays "" - let datetime_max: DateTime = date_range.max - .as_str() - .parse() - .map_err(|_| FocusError::AstInvalidDateFormat(date_range.max))?; + let datetime_str_max = date_range.max.as_str(); + let datetime_max_maybe: Result, _> = datetime_str_max.parse(); + + let datetime_max: DateTime = if let Ok(datetime) = datetime_max_maybe { + datetime + } else { + let naive_date_maybe = NaiveDate::parse_from_str(datetime_str_max, "%Y-%m-%d"); //FIXME remove once Lens2 behaves, only return the error + + if let Ok(naive_date) = naive_date_maybe { + DateTime::::from_naive_utc_and_offset(naive_date.and_time(NaiveTime::default()), Utc) + } else { + return Err(FocusError::AstInvalidDateFormat(date_range.max)); + } + }; let date_str_max = format!("@{}", datetime_max.format("%Y-%m-%d")); condition_string = condition_string.replace("{{D2}}", date_str_max.as_str()); filter_string = filter_string.replace("{{D2}}", date_str_max.as_str()); + 
// no condition needed, "" stays "" } ast::ConditionValue::NumRange(num_range) => { @@ -176,6 +199,7 @@ pub fn process( filter_string.replace("{{D1}}", num_range.min.to_string().as_str()); // no condition needed, "" stays "" filter_string = filter_string.replace("{{D2}}", num_range.max.to_string().as_str()); + // no condition needed, "" stays "" } other => { @@ -228,6 +252,7 @@ pub fn process( if !filter_string.is_empty() { filter_string = filter_humongous_string + ")"; } + } other => { return Err(FocusError::AstOperatorValueMismatch(format!( @@ -273,6 +298,7 @@ pub fn process( if !filter_string.is_empty() { filter_string = filter_humongous_string + ")"; } + } other => { return Err(FocusError::AstOperatorValueMismatch(format!( @@ -294,6 +320,7 @@ pub fn process( } filter_cond += filter_string.as_str(); // no condition needed, "" can be added with no change + } ast::Child::Operation(operation) => { @@ -315,9 +342,13 @@ pub fn process( retrieval_cond += operator_str; if !filter_cond.is_empty() { filter_cond += operator_str; + } } } + if let Some(pos) = filter_cond.rfind(')') { + _ = filter_cond.split_off(pos + 1); + } } } @@ -359,8 +390,9 @@ mod test { const LENS2: &str = r#"{"ast":{"children":[{"children":[{"children":[{"key":"gender","system":"","type":"EQUALS","value":"male"},{"key":"gender","system":"","type":"EQUALS","value":"female"}],"operand":"OR"},{"children":[{"key":"diagnosis","system":"","type":"EQUALS","value":"C41"},{"key":"diagnosis","system":"","type":"EQUALS","value":"C50"}],"operand":"OR"},{"children":[{"key":"sample_kind","system":"","type":"EQUALS","value":"tissue-frozen"},{"key":"sample_kind","system":"","type":"EQUALS","value":"blood-serum"}],"operand":"OR"}],"operand":"AND"},{"children":[{"children":[{"key":"gender","system":"","type":"EQUALS","value":"male"}],"operand":"OR"},{"children":[{"key":"diagnosis","system":"","type":"EQUALS","value":"C41"},{"key":"diagnosis","system":"","type":"EQUALS","value":"C50"}],"operand":"OR"},{"children":[{"key":"sample_kind","system":"","type":"EQUALS","value":"liquid-other"},{"key":"sample_kind","system":"","type":"EQUALS","value":"rna"},{"key":"sample_kind","system":"","type":"EQUALS","value":"urine"}],"operand":"OR"},{"children":[{"key":"storage_temperature","system":"","type":"EQUALS","value":"temperatureRoom"},{"key":"storage_temperature","system":"","type":"EQUALS","value":"four_degrees"}],"operand":"OR"}],"operand":"AND"}],"operand":"OR"},"id":"a6f1ccf3-ebf1-424f-9d69-4e5d135f2340"}"#; - const EMPTY: &str = - r#"{"ast":{"children":[],"operand":"OR"}, "id":"a6f1ccf3-ebf1-424f-9d69-4e5d135f2340"}"#; + const EMPTY: &str = r#"{"ast":{"children":[],"operand":"OR"}, "id":"a6f1ccf3-ebf1-424f-9d69-4e5d135f2340"}"#; + + const CURRENT: &str = r#"{"ast":{"operand":"OR","children":[{"operand":"AND","children":[{"operand":"OR","children":[{"key":"gender","type":"EQUALS","system":"","value":"male"}]},{"operand":"OR","children":[{"key":"diagnosis","type":"EQUALS","system":"http://fhir.de/CodeSystem/dimdi/icd-10-gm","value":"C61"}]},{"operand":"OR","children":[{"key":"donor_age","type":"BETWEEN","system":"","value":{"min":10,"max":90}}]}]},{"operand":"AND","children":[{"operand":"OR","children":[{"key":"sampling_date","type":"BETWEEN","system":"","value":{"min":"1900-01-01","max":"2024-10-25"}}]},{"operand":"OR","children":[{"key":"storage_temperature","type":"EQUALS","system":"","value":"temperature2to10"}]}]}]},"id":"53b4414e-75e4-401b-b794-20a2936e1be5"}"#; #[test] fn test_common() { @@ -411,6 +443,12 @@ mod test { 
generate_cql(serde_json::from_str(EMPTY).unwrap()).unwrap(), include_str!("../resources/test/result_empty.cql").to_string() ); + + pretty_assertions::assert_eq!( + generate_cql(serde_json::from_str(CURRENT).unwrap()).unwrap(), + include_str!("../resources/test/result_current.cql").to_string() + ); + } #[test] diff --git a/src/db.rs b/src/db.rs index fad0553..a58a69d 100644 --- a/src/db.rs +++ b/src/db.rs @@ -2,7 +2,7 @@ use crate::errors::FocusError; use serde::{Deserialize, Serialize}; use serde_json::Value; use sqlx::{postgres::PgPoolOptions, postgres::PgRow, PgPool}; -use sqlx_pgrow_serde::SerMapPgRow; +use sqlx_serde::SerMapPgRow; use std::{collections::HashMap, time::Duration}; use tracing::{warn, info, debug}; @@ -14,7 +14,7 @@ pub struct SqlQuery { include!(concat!(env!("OUT_DIR"), "/sql_replace_map.rs")); -pub async fn get_pg_connection_pool(pg_url: &str, max_attempts: u32) -> Result { +pub async fn get_pg_connection_pool(pg_url: &str, max_db_attempts: u32) -> Result { info!("Trying to establish a PostgreSQL connection pool"); tryhard::retry_fn(|| async { @@ -28,7 +28,7 @@ pub async fn get_pg_connection_pool(pg_url: &str, max_attempts: u32) -> Result

Result, FocusEr sqlx::query(query) .fetch_all(pool) .await - .map_err(FocusError::ErrorExecutingQuery) + .map_err(FocusError::ErrorExecutingSqlQuery) } pub async fn process_sql_task(pool: &PgPool, key: &str) -> Result, FocusError> { @@ -62,3 +62,27 @@ pub fn serialize_rows(rows: Vec) -> Result { Ok(Value::Array(rows_json)) } + +#[cfg(test)] +mod test { + use super::*; + + #[tokio::test] + #[ignore] //TODO mock DB + async fn serialize() { + let pool = + get_pg_connection_pool("postgresql://postgres:secret@localhost:5432/postgres", 1) + .await + .unwrap(); + + let rows = run_query(&pool, SQL_REPLACE_MAP.get("SELECT_TEST").unwrap()) + .await + .unwrap(); + + let rows_json = serialize_rows(rows).unwrap(); + + assert!(rows_json.is_array()); + + assert_ne!(rows_json[0]["floaty"], Value::Null); + } +} \ No newline at end of file diff --git a/src/errors.rs b/src/errors.rs index 8cccda1..1470b09 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -63,12 +63,13 @@ pub enum FocusError { MissingExporterTaskType, #[error("Cannot connect to database: {0}")] CannotConnectToDatabase(String), - #[error("Error executing query: {0}")] - ErrorExecutingQuery(sqlx::Error), #[error("QueryResultBad: {0}")] QueryResultBad(String), #[error("Query not allowed: {0}")] QueryNotAllowed(String), + #[cfg(feature = "query-sql")] + #[error("Error executing SQL query: {0}")] + ErrorExecutingSqlQuery(sqlx::Error), } impl FocusError { diff --git a/src/main.rs b/src/main.rs index 75528bb..6aa85b4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,20 +8,21 @@ mod errors; mod graceful_shutdown; mod logger; -mod db; mod exporter; mod intermediate_rep; mod projects; mod task_processing; mod util; +#[cfg(feature = "query-sql")] +mod db; + use base64::engine::general_purpose; use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _}; use beam_lib::{TaskRequest, TaskResult}; use futures_util::future::BoxFuture; use futures_util::FutureExt; use laplace_rs::ObfCache; -use sqlx::PgPool; use tokio::sync::Mutex; use crate::blaze::{parse_blaze_query_payload_ast, AstQuery}; @@ -118,22 +119,51 @@ pub async fn main() -> ExitCode { } } -async fn main_loop() -> ExitCode { - let db_pool = if let Some(connection_string) = CONFIG.postgres_connection_string.clone() { - match db::get_pg_connection_pool(&connection_string, 8).await { +#[cfg(not(feature = "query-sql"))] +type DbPool = (); + +#[cfg(feature = "query-sql")] +type DbPool = sqlx::PgPool; + +#[cfg(not(feature = "query-sql"))] +async fn get_db_pool() -> Result,ExitCode> { + Ok(None) +} + +#[cfg(feature = "query-sql")] +async fn get_db_pool() -> Result,ExitCode> { + use tracing::info; + + if let Some(connection_string) = CONFIG.postgres_connection_string.clone() { + match db::get_pg_connection_pool(&connection_string, CONFIG.max_db_attempts).await { Err(e) => { error!("Error connecting to database: {}", e); - return ExitCode::from(8); + Err(ExitCode::from(8)) + } + + Ok(pool) => { + info!("Postgresql connection established"); + Ok(Some(pool)) } - Ok(pool) => Some(pool), } } else { - None + Ok(None) + } +} + +async fn main_loop() -> ExitCode { + let db_pool = match get_db_pool().await { + Ok(pool) => pool, + Err(code) => { + return code; + }, }; let endpoint_service_available: fn() -> BoxFuture<'static, bool> = match CONFIG.endpoint_type { EndpointType::Blaze => || blaze::check_availability().boxed(), EndpointType::Omop => || async { true }.boxed(), // TODO health check + #[cfg(feature = "query-sql")] EndpointType::BlazeAndSql => || blaze::check_availability().boxed(), + #[cfg(feature 
= "query-sql")] EndpointType::Sql => || async { true }.boxed(), }; let mut failures = 0; @@ -169,7 +199,7 @@ async fn process_task( task: &BeamTask, obf_cache: Arc>, report_cache: Arc>, - db_pool: Option, + db_pool: Option, ) -> Result { debug!("Processing task {}", task.id); @@ -217,6 +247,7 @@ async fn process_task( ) .await }, + #[cfg(feature = "query-sql")] EndpointType::BlazeAndSql => { let mut generated_from_ast: bool = false; let data = base64_decode(&task.body)?; @@ -260,6 +291,7 @@ async fn process_task( .await } }, + #[cfg(feature="query-sql")] EndpointType::Sql => { let data = base64_decode(&task.body)?; let query_maybe: Result = serde_json::from_slice(&(data));