changes according to review

davidmscholz committed Jul 25, 2024
1 parent 846819a commit e3ecf43
Showing 3 changed files with 47 additions and 89 deletions.
12 changes: 6 additions & 6 deletions src/db_utils.rs
@@ -85,7 +85,7 @@ pub async fn create_tables(pg_con_pool: &PgPool) -> Result<(), anyhow::Error> {
info!("Creating PostgreSQL tables");

let create_tables_queries = vec![
"CREATE TABLE IF NOT EXISTS patients (
"CREATE TABLE IF NOT EXISTS patient (
id SERIAL PRIMARY KEY,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
@@ -97,13 +97,13 @@ pub async fn create_tables(pg_con_pool: &PgPool) -> Result<(), anyhow::Error> {
last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
resource JSONB NOT NULL
)",
"CREATE TABLE IF NOT EXISTS conditions (
"CREATE TABLE IF NOT EXISTS condition (
id SERIAL PRIMARY KEY,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
resource JSONB NOT NULL
)",
"CREATE TABLE IF NOT EXISTS observations (
"CREATE TABLE IF NOT EXISTS observation (
id SERIAL PRIMARY KEY,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
@@ -117,19 +117,19 @@ pub async fn create_tables(pg_con_pool: &PgPool) -> Result<(), anyhow::Error> {
END;
$$ LANGUAGE plpgsql;",
"CREATE TRIGGER update_last_updated_trigger
BEFORE UPDATE ON patients
BEFORE UPDATE ON patient
FOR EACH ROW
EXECUTE PROCEDURE update_last_updated();",
"CREATE TRIGGER update_last_updated_trigger
BEFORE UPDATE ON specimen
FOR EACH ROW
EXECUTE PROCEDURE update_last_updated();",
"CREATE TRIGGER update_last_updated_trigger
BEFORE UPDATE ON conditions
BEFORE UPDATE ON condition
FOR EACH ROW
EXECUTE PROCEDURE update_last_updated();",
"CREATE TRIGGER update_last_updated_trigger
BEFORE UPDATE ON observations
BEFORE UPDATE ON observation
FOR EACH ROW
EXECUTE PROCEDURE update_last_updated();",
];
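
Note: the update_last_updated() function that these CREATE TRIGGER statements execute is only partially visible in this hunk (just its closing END; $$ LANGUAGE plpgsql; appears above). A minimal sketch of what such a plpgsql function typically looks like, written in the same style as the other entries in create_tables_queries, is shown below; the actual definition in src/db_utils.rs may differ.

    // Hypothetical sketch, not taken from this commit: a trigger function that stamps
    // last_updated_at on every UPDATE, matching the column defined in the tables above.
    let update_last_updated_query =
        "CREATE OR REPLACE FUNCTION update_last_updated()
         RETURNS TRIGGER AS $$
         BEGIN
             NEW.last_updated_at = CURRENT_TIMESTAMP;
             RETURN NEW;
         END;
         $$ LANGUAGE plpgsql;";
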
122 changes: 40 additions & 82 deletions src/main.rs
@@ -6,10 +6,11 @@ use db_utils::*;
use models::*;

use tokio::select;
use tokio::time::{interval, Duration, sleep_until, Instant};
use tokio::time::{interval, Duration};
use sqlx::{PgPool, Row};
use std::collections::BTreeMap;
use anyhow::bail;
use anyhow::anyhow;
use reqwest::Client;
use serde_json;
use std::env;
@@ -43,8 +43,8 @@ pub fn get_version(resource: serde_json::Value) -> Option<ResourceVersion> {
}
}

// get BTreeMap of all <resource_id, BTreeMapValue> pairs in pg for the given resource type
pub async fn get_pg_resource_versions(table_name: &str, pg_con_pool: &PgPool) -> Result<BTreeMap<String,BTreeMapValue>, anyhow::Error> {
// get BTreeMap of all <resource_id, PgVersion> pairs in pg for the given resource type
pub async fn get_pg_resource_versions(table_name: &str, pg_con_pool: &PgPool) -> Result<BTreeMap<String,PgVersion>, anyhow::Error> {
let query = format!("SELECT id pk_id, resource::text FROM {}", table_name);
let rows: Vec<(i32, String)> = sqlx::query_as(&query)
.fetch_all(pg_con_pool)
@@ -58,12 +58,13 @@ pub async fn get_pg_resource_versions(table_name: &str, pg_con_pool: &PgPool) ->
let resource: serde_json::Value = match serde_json::from_str(&resource_str) {
Ok(resource) => resource,
Err(_) => continue
};
};

let res_version = get_version(resource);
match res_version {
Some(res_version) => {
// an entry is only inserted into pg_versions if a resource id and version could be extracted
pg_versions.insert(res_version.resource_id, BTreeMapValue {pk_id, version_id: res_version.version_id});
pg_versions.insert(res_version.resource_id, PgVersion {pk_id, version_id: res_version.version_id});
}
None => continue
}
@@ -144,45 +146,13 @@ pub async fn get_num_rows(pg_con_pool: &PgPool, table_name: &str) -> Result<i64,
Ok(count)
}

pub async fn get_blaze_search_set(url: &str) -> Result<SearchSet, anyhow::Error> {
let client = Client::new();
pub async fn get_blaze_search_set(url: &str, client: &Client) -> Result<SearchSet, anyhow::Error> {
let res = client.get(url).send().await;
match res {
Ok(res) => {
let res_text = match res.text().await {
Ok(text) => text,
Err(err) => {
error!("Failed to get payload: {}", err);
"".to_string()
}
};
let search_set: Result<SearchSet, serde_json::Error> = serde_json::from_str(&res_text);
match search_set {
Ok(search_set) => {
Ok(search_set)
} Err(err) => {
error!("Could not deserialize JSON to SearchSet: {}", err);
Err(anyhow::Error::new(err))
}
}
},
//no valid search response from Blaze
Err(err) => {
error!("Failed to get response from Blaze: {}", err);
Err(anyhow::Error::new(err))
}
}
}

//get the number of resources of a specific type from blaze
pub async fn get_blaze_resource_count(blaze_base_url: &str, type_arg: &str) -> Result<i64, anyhow::Error> {

match get_blaze_search_set(&format!("{}/fhir/{}?_count=0", blaze_base_url, type_arg)).await {
Ok(search_set) => {
Ok(search_set.total as i64)
},
Err(err) => Err(err)
}
let res = res.inspect_err(|e| error!("Failed to get response from Blaze: {}", e))?;
let res_text = res.text().await.inspect_err(|e| error!("Failed to get payload: {}", e))?;
let search_set: SearchSet = serde_json::from_str(&res_text)
.inspect_err(|e| error!("Could not deserialize JSON to SearchSet: {}", e))?;
Ok(search_set)
}

/// Synchronizes resources from Blaze to PostgreSQL
@@ -193,8 +163,8 @@ pub async fn get_blaze_resource_count(blaze_base_url: &str, type_arg: &str) -> R
/// resource-ids that are found in pg but not in blaze: will be deleted from pg
/// # Parameters
///
/// * `base_url`: The base URL of the Blaze API.
/// * `pg_client`: A mutable reference to a PostgreSQL client.
/// * `blaze_base_url`: The base URL of the Blaze API.
/// * `pg_con_pool`: A PostgreSQL connection pool.
/// * `type_arg`: The type of resource to synchronize (e.g. "specimen", "patient", etc.).
/// * `batch_size`: The number of resources to process in each batch.
/// * `page_resource_count`: The number of resources to fetch from Blaze in each page.
@@ -211,29 +181,22 @@ async fn sync_blaze_2_pg(
) -> Result<(), anyhow::Error>{

// set pg table name to insert into or update
let table_name = match type_arg.to_lowercase().as_str() {
"specimen" => "specimen",
"patient" => "patients",
"condition" => "conditions",
"observation" => "observations",
_ => {
bail!("Invalid type_arg!");
}
};
let table_name = type_arg.to_lowercase();
let client = Client::new();

// vectors holding batches of items to insert/update
let mut update_batch: Vec<PgUpdateItem> = Vec::new();
let mut insert_batch: Vec<PgInsertItem> = Vec::new();

let mut pg_versions = get_pg_resource_versions(table_name, pg_con_pool).await?;
let mut pg_versions = get_pg_resource_versions(&table_name, pg_con_pool).await?;

let mut url: String = format!("{}/fhir/{}?_count={}&_history=current",
blaze_base_url, type_arg, page_resource_count);
let mut update_counter: u32 = 0;
let mut insert_counter: u32 = 0;
info!("Attempting to sync: {}", &type_arg);
info!("Attempting to sync: {}", type_arg);
loop {
let search_set = get_blaze_search_set(&url).await?;
let search_set = get_blaze_search_set(&url, &client).await?;
let entries = match search_set.entry {
Some(entries) => entries,
None => {
@@ -251,13 +214,8 @@
continue
}
};

let resource_str = match serde_json::to_string(&e.resource) {
Ok(s) => s,
Err(_) => {
"{}".to_string()
}
};

let resource_str = serde_json::to_string(&e.resource).unwrap_or("{}".to_string());

match pg_versions.get(&blaze_version.resource_id) {
Some(pg_version) => { //Resource is already in pg
@@ -283,13 +241,12 @@
}

if update_counter > 0 && update_counter % batch_size == 0 {
update_helper(pg_con_pool, &update_batch, table_name).await?;
update_helper(pg_con_pool, &update_batch, &table_name).await?;
update_batch.clear();

} else if insert_counter > 0 && insert_counter % batch_size == 0 {
insert_helper(pg_con_pool, &insert_batch, table_name).await?;
insert_helper(pg_con_pool, &insert_batch, &table_name).await?;
insert_batch.clear();

}

}
@@ -307,25 +264,27 @@
//insert or update the last remaining resources
if update_batch.len() > 0 {
update_counter += update_batch.len() as u32;
update_helper(pg_con_pool, &update_batch, table_name).await?;
update_helper(pg_con_pool, &update_batch, &table_name).await?;
}
if insert_batch.len() > 0 {
insert_counter += insert_batch.len() as u32;
insert_helper(pg_con_pool, &insert_batch, table_name).await?;
insert_helper(pg_con_pool, &insert_batch, &table_name).await?;
}
//remove rows from pg that were not encountered in blaze
let delete_ids: Vec<i32> = pg_versions.values().map(|value| value.pk_id).collect();
if delete_ids.len() > 0 {
delete_helper(pg_con_pool, &delete_ids, table_name).await?;
delete_helper(pg_con_pool, &delete_ids, &table_name).await?;
}

info!("Updated {} rows", update_counter);
info!("Inserted {} rows", insert_counter);
info!("Deleted {} rows", delete_ids.len() as u32);

//compare total entry counts between blaze and pg
let row_count = get_num_rows(pg_con_pool, table_name).await?;
let resource_count = get_blaze_resource_count(blaze_base_url, type_arg).await?;
let row_count = get_num_rows(pg_con_pool, &table_name).await?;
let resource_count = get_blaze_search_set(&format!("{}/fhir/{}?_count=0", blaze_base_url, type_arg), &client)
.await
.map(|search_set| search_set.total as i64)?;
if row_count != resource_count {
warn!("{} entry counts do not match between Blaze and PostgreSQL", type_arg);
} else {
Expand All @@ -336,14 +295,18 @@ async fn sync_blaze_2_pg(
}


pub async fn run_sync(pg_con_pool: &PgPool, config: &Config) -> Result<(), anyhow::Error>{
pub async fn run_sync(pg_con_pool: &PgPool, config: &Config) -> Result<(), anyhow::Error> {
let type_args: Vec<&str> = vec!["Specimen", "Patient", "Observation", "Condition"];

let table_names: Vec<&str> = vec!["specimen", "patients", "observations", "conditions"];
let table_names: Vec<&str> = vec!["specimen", "patient", "observation", "condition"];

//check preconditions for sync
let blaze_available = check_blaze_connection(&config.blaze_base_url,
config.blaze_num_connection_attempts).await?;

if !blaze_available {
bail!("Aborting sync run because connection to Blaze could not be established");
}

let all_tables_exist = pred_tables_exist(pg_con_pool, &table_names).await?;

if blaze_available && all_tables_exist {
@@ -391,13 +354,8 @@ async fn main() -> Result<(), anyhow::Error>{
pg_batch_size: 10000,
blaze_page_resource_count: 5000,
blaze_num_connection_attempts: 20,
target_time: match NaiveTime::from_hms_opt(3, 0, 0) {
Some(time) => time,
None => {
error!("Invalid target time");
return Err(anyhow::Error::msg("Invalid target time"));
}
}
target_time: NaiveTime::from_hms_opt(3, 0, 0)
.ok_or_else(|| anyhow!("Invalid target time"))?
};

info!("fhir2sql started"); //@todo: replace with proper banner
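
Note: as described in the doc comment of sync_blaze_2_pg above (resources present in both Blaze and pg are updated when their version differs, resources only in Blaze are inserted, resources only in pg are deleted), the sync boils down to a per-resource decision against the pg_versions map. The following self-contained sketch illustrates only that decision; names such as SyncAction and decide are invented for illustration, and the real function additionally handles paging through Blaze bundles, batching, and the SQL helper calls.

    // Hypothetical sketch of the per-resource decision in sync_blaze_2_pg.
    // ResourceVersion/PgVersion mirror the structs in src/models.rs.
    use std::collections::BTreeMap;

    struct ResourceVersion { resource_id: String, version_id: i64 }
    struct PgVersion { pk_id: i32, version_id: i64 }

    enum SyncAction { Update { pk_id: i32 }, Insert, Skip }

    fn decide(blaze: &ResourceVersion, pg_versions: &mut BTreeMap<String, PgVersion>) -> SyncAction {
        // Removing the entry marks the id as "seen in Blaze"; whatever is left in the
        // map after the loop corresponds to rows that only exist in pg and gets deleted.
        match pg_versions.remove(&blaze.resource_id) {
            Some(pg) if pg.version_id != blaze.version_id => SyncAction::Update { pk_id: pg.pk_id },
            Some(_) => SyncAction::Skip, // same version: nothing to do
            None => SyncAction::Insert,  // unknown to pg: insert
        }
    }

    fn main() {
        let mut pg_versions = BTreeMap::new();
        pg_versions.insert("p1".to_string(), PgVersion { pk_id: 1, version_id: 1 });
        let blaze = ResourceVersion { resource_id: "p1".to_string(), version_id: 2 };
        match decide(&blaze, &mut pg_versions) {
            SyncAction::Update { pk_id } => println!("update pg row {}", pk_id),
            SyncAction::Insert => println!("insert new row"),
            SyncAction::Skip => println!("already up to date"),
        }
        // Any ids still present in pg_versions would be collected into delete_ids.
    }
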
2 changes: 1 addition & 1 deletion src/models.rs
@@ -33,7 +33,7 @@ pub struct ResourceVersion {
pub version_id: i64
}

pub struct BTreeMapValue {
pub struct PgVersion {
pub pk_id: i32, //pg row primary key. Needed for upsert
pub version_id: i64 //resource version_id
}
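
Note: the renamed PgVersion pairs with ResourceVersion, which get_version in src/main.rs (its body is not shown in this diff) builds from a FHIR resource. A purely illustrative sketch of such an extraction, assuming the values come from the resource's id and meta.versionId fields and that serde_json is available, could look like this:

    // Hypothetical sketch only; the real get_version may differ.
    pub struct ResourceVersion { pub resource_id: String, pub version_id: i64 }

    pub fn get_version(resource: serde_json::Value) -> Option<ResourceVersion> {
        // FHIR resources carry their logical id in "id" and their version in "meta.versionId".
        let resource_id = resource.get("id")?.as_str()?.to_string();
        let version_id = resource.get("meta")?.get("versionId")?.as_str()?.parse::<i64>().ok()?;
        Some(ResourceVersion { resource_id, version_id })
    }

    fn main() {
        let res = serde_json::json!({"resourceType": "Patient", "id": "p1", "meta": {"versionId": "3"}});
        let v = get_version(res).expect("id and versionId present");
        println!("{} @ version {}", v.resource_id, v.version_id);
    }
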
