Skip to content

Commit f73134e

Browse files
authored
Merge pull request #137 from samply/feature/exporter-status
exporter query status task type
2 parents 50ab353 + 9a094b9 commit f73134e

File tree

6 files changed

+191
-27
lines changed

6 files changed

+191
-27
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "focus"
3-
version = "0.5.1"
3+
version = "0.5.2"
44
edition = "2021"
55
license = "Apache-2.0"
66

src/blaze.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ pub async fn run_cql_query(library: &Value, measure: &Value) -> Result<String, F
114114
let url: String = if let Ok(value) = get_json_field(&measure.to_string(), "url") {
115115
value.to_string().replace('"', "")
116116
} else {
117-
return Err(FocusError::CQLQueryError());
117+
return Err(FocusError::CQLQueryError);
118118
};
119119
debug!("Evaluating the Measure with canonical URL: {}", url);
120120

src/errors.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ pub enum FocusError {
1111
#[error("FHIR Measure evaluation error in Blaze: {0}")]
1212
MeasureEvaluationErrorBlaze(String),
1313
#[error("CQL query error")]
14-
CQLQueryError(),
14+
CQLQueryError,
1515
#[error("Unable to retrieve tasks from Beam: {0}")]
1616
UnableToRetrieveTasksHttp(beam_lib::BeamError),
1717
#[error("Unable to answer task: {0}")]
@@ -38,15 +38,18 @@ pub enum FocusError {
3838
UnableToPostAst(reqwest::Error),
3939
#[error("Unable to post Exporter query: {0}")]
4040
UnableToPostExporterQuery(reqwest::Error),
41+
#[error("Unable to get Exporter query status: {0}")]
42+
UnableToGetExporterQueryStatus(reqwest::Error),
4143
#[error("Exporter query error in Reqwest: {0}")]
4244
ExporterQueryErrorReqwest(String),
4345
#[error("AST Posting error in Reqwest: {0}")]
4446
AstPostingErrorReqwest(String),
4547
#[error("Invalid Header Value: {0}")]
4648
InvalidHeaderValue(http::header::InvalidHeaderValue),
4749
#[error("Missing Exporter Endpoint")]
48-
MissingExporterEndpoint(),
49-
50+
MissingExporterEndpoint,
51+
#[error("Missing Exporter Task Type")]
52+
MissingExporterTaskType,
5053
}
5154

5255
impl FocusError {

src/exporter.rs

Lines changed: 90 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,23 @@ use http::header;
22
use http::HeaderMap;
33
use http::HeaderValue;
44
use http::StatusCode;
5+
use serde::Deserialize;
6+
use serde::Serialize;
7+
use serde_json::Value;
8+
use std::str;
59
use tracing::{debug, warn};
610

711
use crate::config::CONFIG;
812
use crate::errors::FocusError;
13+
use crate::util;
14+
15+
#[derive(Clone, PartialEq, Debug, Copy, Serialize, Deserialize)]
16+
#[serde(rename_all = "UPPERCASE")]
17+
pub enum TaskType {
18+
Execute,
19+
Create,
20+
Status,
21+
}
922

1023
struct Params {
1124
method: &'static str,
@@ -25,21 +38,13 @@ const EXECUTE: Params = Params {
2538
done: "executed",
2639
};
2740

28-
pub async fn post_exporter_query(body: &String, execute: bool) -> Result<String, FocusError> {
41+
pub async fn post_exporter_query(body: &String, task_type: TaskType) -> Result<String, FocusError> {
2942
let Some(exporter_url) = &CONFIG.exporter_url else {
30-
return Err(FocusError::MissingExporterEndpoint());
43+
return Err(FocusError::MissingExporterEndpoint);
3144
};
3245

33-
let exporter_params = if execute { EXECUTE } else { CREATE };
34-
debug!("{} exporter query...", exporter_params.doing);
35-
3646
let mut headers = HeaderMap::new();
3747

38-
headers.insert(
39-
header::CONTENT_TYPE,
40-
HeaderValue::from_static("application/json"),
41-
);
42-
4348
if let Some(auth_header_value) = CONFIG.auth_header.clone() {
4449
headers.insert(
4550
"x-api-key",
@@ -48,6 +53,81 @@ pub async fn post_exporter_query(body: &String, execute: bool) -> Result<String,
4853
);
4954
}
5055

56+
if task_type == TaskType::Status {
57+
let value: Value = serde_json::from_str(
58+
String::from_utf8(util::base64_decode(&body)?)
59+
.map_err(|e| {
60+
FocusError::DeserializationError(format!(
61+
r#"Task body is not a valid string {}"#,
62+
e
63+
))
64+
})?
65+
.as_str(),
66+
)
67+
.map_err(|e| {
68+
FocusError::DeserializationError(format!(r#"Task body is not a valid JSON: {}"#, e))
69+
})?;
70+
let id = value["query-execution-id"].as_str();
71+
if id.is_none() {
72+
return Err(FocusError::ParsingError(format!(
73+
r#"Body does not contain the id of the query to check the status of: {}"#,
74+
value
75+
)));
76+
}
77+
let id: &str = id.unwrap(); //we already made sure that it is not None
78+
79+
let resp = CONFIG
80+
.client
81+
.get(format!("{}status?query-execution-id={}", exporter_url, id))
82+
.headers(headers)
83+
.send()
84+
.await
85+
.map_err(FocusError::UnableToGetExporterQueryStatus)?;
86+
87+
debug!("asked for status for query id= {} ", id);
88+
89+
match resp.status() {
90+
StatusCode::OK => {
91+
let text = resp.text().await;
92+
match text {
93+
Ok(ok_text) => {
94+
return Ok(ok_text);
95+
}
96+
Err(e) => {
97+
warn!(
98+
"The code was 200 OK, but can't get the body of the Exporter's response for status of the query id={}, {}", id, e);
99+
return Err(FocusError::ExporterQueryErrorReqwest(format!(
100+
"Error while checking the status of the query id={}, the code was 200 OK, but can't get the body of the Exporter's response: {}",
101+
id, e
102+
)));
103+
}
104+
}
105+
}
106+
code => {
107+
warn!(
108+
"Got unexpected code {code} while checking the status of the query id={}, {:?}",
109+
id, resp
110+
);
111+
return Err(FocusError::ExporterQueryErrorReqwest(format!(
112+
"Error while checking the status of the query id={}, {:?}",
113+
id, resp
114+
)));
115+
}
116+
};
117+
}
118+
119+
let exporter_params = if task_type == TaskType::Execute {
120+
EXECUTE
121+
} else {
122+
CREATE
123+
};
124+
debug!("{} exporter query...", exporter_params.doing);
125+
126+
headers.insert(
127+
header::CONTENT_TYPE,
128+
HeaderValue::from_static("application/json"),
129+
);
130+
51131
let resp = CONFIG
52132
.client
53133
.post(format!("{}{}", exporter_url, exporter_params.method))
@@ -73,10 +153,8 @@ pub async fn post_exporter_query(body: &String, execute: bool) -> Result<String,
73153
"Error while {} query, the code was 200 OK, but can't get the body of the Exporter's response: {:?}",
74154
exporter_params.doing, body
75155
)));
76-
77156
}
78157
}
79-
80158
}
81159
code => {
82160
warn!(

src/main.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,7 @@ type BeamResult = TaskResult<beam_lib::RawString>;
4848
#[derive(Debug, Deserialize, Serialize, Clone)]
4949
struct Metadata {
5050
project: String,
51-
#[serde(default)]
52-
execute: bool,
51+
task_type: Option<exporter::TaskType>,
5352
}
5453

5554
#[derive(Debug, Clone, Default)]
@@ -149,7 +148,7 @@ async fn process_task(
149148

150149
let metadata: Metadata = serde_json::from_value(task.metadata.clone()).unwrap_or(Metadata {
151150
project: "default_obfuscation".to_string(),
152-
execute: true,
151+
task_type: None
153152
});
154153

155154
if metadata.project == "focus-healthcheck" {
@@ -160,10 +159,12 @@ async fn process_task(
160159
"healthy".into()
161160
));
162161
}
163-
164162
if metadata.project == "exporter" {
163+
if metadata.task_type.is_none() {
164+
return Err(FocusError::MissingExporterTaskType)
165+
}
165166
let body = &task.body;
166-
return Ok(run_exporter_query(task, body, metadata.execute).await)?;
167+
return Ok(run_exporter_query(task, body, metadata.task_type.unwrap()).await)?; //we already made sure that it is not None
167168
}
168169

169170
if CONFIG.endpoint_type == EndpointType::Blaze {
@@ -337,7 +338,7 @@ async fn run_intermediate_rep_query(
337338
async fn run_exporter_query(
338339
task: &BeamTask,
339340
body: &String,
340-
execute: bool,
341+
task_type: exporter::TaskType,
341342
) -> Result<BeamResult, FocusError> {
342343
let mut err = beam::beam_result::perm_failed(
343344
CONFIG.beam_app_id_long.clone(),
@@ -346,7 +347,7 @@ async fn run_exporter_query(
346347
String::new(),
347348
);
348349

349-
let exporter_result = exporter::post_exporter_query(body, execute).await?;
350+
let exporter_result = exporter::post_exporter_query(body, task_type).await?;
350351

351352
let result = beam_result(task.to_owned(), exporter_result).unwrap_or_else(|e| {
352353
err.body = beam_lib::RawString(e.to_string());
@@ -402,19 +403,28 @@ fn beam_result(task: BeamTask, measure_report: String) -> Result<BeamResult, Foc
402403
))
403404
}
404405

406+
405407
#[cfg(test)]
406408
mod test {
407409
use super::*;
408410

409411
const METADATA_STRING: &str = r#"{"project": "exliquid"}"#;
412+
const METADATA_STRING_EXPORTER: &str = r#"{"project": "exporter", "task_type": "EXECUTE"}"#;
410413

411414
#[test]
412415
fn test_metadata_deserialization_default() {
413416
let metadata: Metadata = serde_json::from_str(METADATA_STRING).unwrap_or(Metadata {
414417
project: "default_obfuscation".to_string(),
415-
execute: true,
418+
task_type: None
416419
});
417420

418-
assert!(!metadata.execute);
421+
assert_eq!(metadata.task_type, None);
419422
}
420-
}
423+
424+
#[test]
425+
fn test_metadata_deserialization_exporter() {
426+
let metadata: Metadata = serde_json::from_str(METADATA_STRING_EXPORTER).unwrap();
427+
428+
assert_eq!(metadata.task_type, Some(exporter::TaskType::Execute));
429+
}
430+
}

src/projects/shared/mod.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
use std::collections::HashMap;
2+
3+
use indexmap::IndexSet;
4+
5+
use super::{CriterionRole, Project, ProjectName};
6+
7+
#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Hash)]
8+
pub(crate) struct Shared;
9+
10+
impl Project for Shared {
11+
fn append_code_lists(&self, map: &mut HashMap<&'static str, &'static str>) {
12+
map.extend(
13+
[
14+
("icd10", "http://hl7.org/fhir/sid/icd-10"),
15+
("icd10gm", "http://fhir.de/CodeSystem/dimdi/icd-10-gm"),
16+
("icd10gmnew", "http://fhir.de/CodeSystem/bfarm/icd-10-gm"),
17+
("loinc", "http://loinc.org"),
18+
(
19+
"SampleMaterialType",
20+
"https://fhir.bbmri.de/CodeSystem/SampleMaterialType",
21+
),
22+
(
23+
"StorageTemperature",
24+
"https://fhir.bbmri.de/CodeSystem/StorageTemperature",
25+
),
26+
(
27+
"FastingStatus",
28+
"http://terminology.hl7.org/CodeSystem/v2-0916",
29+
),
30+
(
31+
"SmokingStatus",
32+
"http://hl7.org/fhir/uv/ips/ValueSet/current-smoking-status-uv-ips",
33+
),
34+
]);
35+
}
36+
37+
fn append_observation_loinc_codes(&self, map: &mut HashMap<&'static str, &'static str>) {
38+
map.extend(
39+
[
40+
("body_weight", "29463-7"),
41+
("bmi", "39156-5"),
42+
("smoking_status", "72166-2"),
43+
]);
44+
}
45+
46+
fn append_criterion_code_lists(&self, _map: &mut HashMap<&str, Vec<&str>>) {
47+
// none
48+
}
49+
50+
fn append_cql_snippets(&self, _map: &mut HashMap<(&str, CriterionRole), &str>) {
51+
// none
52+
}
53+
54+
fn append_mandatory_code_lists(&self, _set: &mut IndexSet<&str>) {
55+
// none
56+
}
57+
58+
fn append_cql_template(&self, _template: &mut String) {
59+
// none
60+
}
61+
62+
fn append_body(&self, _body: &mut String) {
63+
// none
64+
}
65+
66+
fn name(&self) -> &'static ProjectName {
67+
&ProjectName::NotSpecified
68+
}
69+
70+
fn append_sample_type_workarounds(&self, _map: &mut HashMap<&str, Vec<&str>>) {
71+
//none
72+
}
73+
}

0 commit comments

Comments
 (0)