diff --git a/src/db_utils.rs b/src/db_utils.rs index 257fdaa..3b8bd58 100644 --- a/src/db_utils.rs +++ b/src/db_utils.rs @@ -85,7 +85,7 @@ pub async fn create_tables(pg_con_pool: &PgPool) -> Result<(), anyhow::Error> { info!("Creating PostgreSQL tables"); let create_tables_queries = vec![ - "CREATE TABLE IF NOT EXISTS patients ( + "CREATE TABLE IF NOT EXISTS patient ( id SERIAL PRIMARY KEY, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, @@ -97,13 +97,13 @@ pub async fn create_tables(pg_con_pool: &PgPool) -> Result<(), anyhow::Error> { last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, resource JSONB NOT NULL )", - "CREATE TABLE IF NOT EXISTS conditions ( + "CREATE TABLE IF NOT EXISTS condition ( id SERIAL PRIMARY KEY, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, resource JSONB NOT NULL )", - "CREATE TABLE IF NOT EXISTS observations ( + "CREATE TABLE IF NOT EXISTS observation ( id SERIAL PRIMARY KEY, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, @@ -117,7 +117,7 @@ pub async fn create_tables(pg_con_pool: &PgPool) -> Result<(), anyhow::Error> { END; $$ LANGUAGE plpgsql;", "CREATE TRIGGER update_last_updated_trigger - BEFORE UPDATE ON patients + BEFORE UPDATE ON patient FOR EACH ROW EXECUTE PROCEDURE update_last_updated();", "CREATE TRIGGER update_last_updated_trigger @@ -125,11 +125,11 @@ pub async fn create_tables(pg_con_pool: &PgPool) -> Result<(), anyhow::Error> { FOR EACH ROW EXECUTE PROCEDURE update_last_updated();", "CREATE TRIGGER update_last_updated_trigger - BEFORE UPDATE ON conditions + BEFORE UPDATE ON condition FOR EACH ROW EXECUTE PROCEDURE update_last_updated();", "CREATE TRIGGER update_last_updated_trigger - BEFORE UPDATE ON observations + BEFORE UPDATE ON observation FOR EACH ROW EXECUTE PROCEDURE update_last_updated();", ]; diff --git a/src/main.rs b/src/main.rs index deb25d0..419f9b7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,10 +6,11 @@ use db_utils::*; use models::*; use tokio::select; -use tokio::time::{interval, Duration, sleep_until, Instant}; +use tokio::time::{interval, Duration}; use sqlx::{PgPool, Row}; use std::collections::BTreeMap; use anyhow::bail; +use anyhow::anyhow; use reqwest::Client; use serde_json; use std::env; @@ -43,8 +44,8 @@ pub fn get_version(resource: serde_json::Value) -> Option { } } -// get BTreeMap of all pairs in pg for the given resource type -pub async fn get_pg_resource_versions(table_name: &str, pg_con_pool: &PgPool) -> Result, anyhow::Error> { +// get BTreeMap of all pairs in pg for the given resource type +pub async fn get_pg_resource_versions(table_name: &str, pg_con_pool: &PgPool) -> Result, anyhow::Error> { let query = format!("SELECT id pk_id, resource::text FROM {}", table_name); let rows: Vec<(i32, String)> = sqlx::query_as(&query) .fetch_all(pg_con_pool) @@ -58,12 +59,13 @@ pub async fn get_pg_resource_versions(table_name: &str, pg_con_pool: &PgPool) -> let resource: serde_json::Value = match serde_json::from_str(&resource_str) { Ok(resource) => resource, Err(_) => continue - }; + }; + let res_version = get_version(resource); match res_version { Some(res_version) => { // only if resource_id is found something is inserted into resource_versions - pg_versions.insert(res_version.resource_id, BTreeMapValue {pk_id, version_id: res_version.version_id}); + pg_versions.insert(res_version.resource_id, PgVersion {pk_id, version_id: res_version.version_id}); } None => continue } @@ -144,45 +146,13 @@ pub async fn get_num_rows(pg_con_pool: &PgPool, table_name: &str) -> Result Result { - let client = Client::new(); +pub async fn get_blaze_search_set(url: &str, client: &Client) -> Result { let res = client.get(url).send().await; - match res { - Ok(res) => { - let res_text = match res.text().await { - Ok(text) => text, - Err(err) => { - error!("Failed to get payload: {}", err); - "".to_string() - } - }; - let search_set: Result = serde_json::from_str(&res_text); - match search_set { - Ok(search_set) => { - Ok(search_set) - } Err(err) => { - error!("Could not deserialize JSON to SearchSet: {}", err); - Err(anyhow::Error::new(err)) - } - } - }, - //no valid search response from Blaze - Err(err) => { - error!("Failed to get response from Blaze: {}", err); - Err(anyhow::Error::new(err)) - } - } -} - -//get the number of resources of a specific type from blaze -pub async fn get_blaze_resource_count(blaze_base_url: &str, type_arg: &str) -> Result { - - match get_blaze_search_set(&format!("{}/fhir/{}?_count=0", blaze_base_url, type_arg)).await { - Ok(search_set) => { - Ok(search_set.total as i64) - }, - Err(err) => Err(err) - } + let res = res.inspect_err(|e| error!("Failed to get response from Blaze: {}", e))?; + let res_text = res.text().await.inspect_err(|e| error!("Failed to get payload: {}", e))?; + let search_set: SearchSet = serde_json::from_str(&res_text) + .inspect_err(|e| error!("Could not deserialize JSON to SearchSet: {}", e))?; + Ok(search_set) } /// Synchronizes resources from Blaze to PostgreSQL @@ -193,8 +163,8 @@ pub async fn get_blaze_resource_count(blaze_base_url: &str, type_arg: &str) -> R /// resource-ids that are found in pg but not in blaze : will be deleted from pg /// # Parameters /// -/// * `base_url`: The base URL of the Blaze API. -/// * `pg_client`: A mutable reference to a PostgreSQL client. +/// * `blaze_base_url`: The base URL of the Blaze API. +/// * `pg_con_pool`: A PostgreSQL connection pool. /// * `type_arg`: The type of resource to synchronize (e.g. "specimen", "patient", etc.). /// * `batch_size`: The number of resources to process in each batch. /// * `page_resource_count`: The number of resources to fetch from Blaze in each page. @@ -211,29 +181,22 @@ async fn sync_blaze_2_pg( ) -> Result<(), anyhow::Error>{ // set pg table name to insert into or update - let table_name = match type_arg.to_lowercase().as_str() { - "specimen" => "specimen", - "patient" => "patients", - "condition" => "conditions", - "observation" => "observations", - _ => { - bail!("Invalid type_arg!"); - } - }; + let table_name = type_arg.to_lowercase(); + let client = Client::new(); // vectors holding batches of items to insert/update let mut update_batch: Vec = Vec::new(); let mut insert_batch: Vec = Vec::new(); - let mut pg_versions = get_pg_resource_versions(table_name, pg_con_pool).await?; + let mut pg_versions = get_pg_resource_versions(&table_name, pg_con_pool).await?; let mut url: String = format!("{}/fhir/{}?_count={}&_history=current", blaze_base_url, type_arg, page_resource_count); let mut update_counter: u32 = 0; let mut insert_counter: u32 = 0; - info!("Attempting to sync: {}", &type_arg); + info!("Attempting to sync: {}", type_arg); loop { - let search_set = get_blaze_search_set(&url).await?; + let search_set = get_blaze_search_set(&url, &client).await?; let entries = match search_set.entry { Some(entries) => entries, None => { @@ -251,13 +214,8 @@ async fn sync_blaze_2_pg( continue } }; - - let resource_str = match serde_json::to_string(&e.resource) { - Ok(s) => s, - Err(_) => { - "{}".to_string() - } - }; + + let resource_str = serde_json::to_string(&e.resource).unwrap_or("{}".to_string()); match pg_versions.get(&blaze_version.resource_id) { Some(pg_version) => { //Resource is already in pg @@ -283,13 +241,12 @@ async fn sync_blaze_2_pg( } if update_counter > 0 && update_counter % batch_size == 0 { - update_helper(pg_con_pool, &update_batch, table_name).await?; + update_helper(pg_con_pool, &update_batch, &table_name).await?; update_batch.clear(); } else if insert_counter > 0 && insert_counter % batch_size == 0 { - insert_helper(pg_con_pool, &insert_batch, table_name).await?; + insert_helper(pg_con_pool, &insert_batch, &table_name).await?; insert_batch.clear(); - } } @@ -307,16 +264,16 @@ async fn sync_blaze_2_pg( //insert or update the last remaining resources if update_batch.len() > 0 { update_counter += update_batch.len() as u32; - update_helper(pg_con_pool, &update_batch, table_name).await?; + update_helper(pg_con_pool, &update_batch, &table_name).await?; } if insert_batch.len() > 0 { insert_counter += insert_batch.len() as u32; - insert_helper(pg_con_pool, &insert_batch, table_name).await?; + insert_helper(pg_con_pool, &insert_batch, &table_name).await?; } //remove rows from pg that were not encountered in blaze let delete_ids: Vec = pg_versions.values().map(|value| value.pk_id).collect(); if delete_ids.len() > 0 { - delete_helper(pg_con_pool, &delete_ids, table_name).await?; + delete_helper(pg_con_pool, &delete_ids, &table_name).await?; } info!("Updated {} rows", update_counter); @@ -324,8 +281,10 @@ async fn sync_blaze_2_pg( info!("Deleted {} rows", delete_ids.len() as u32); //compare total entry counts between blaze and pg - let row_count = get_num_rows(pg_con_pool, table_name).await?; - let resource_count = get_blaze_resource_count(blaze_base_url, type_arg).await?; + let row_count = get_num_rows(pg_con_pool, &table_name).await?; + let resource_count = get_blaze_search_set(&format!("{}/fhir/{}?_count=0", blaze_base_url, type_arg), &client) + .await + .map(|search_set| search_set.total as i64)?; if row_count != resource_count { warn!("{} entry counts do not match between Blaze and PostgreSQL", type_arg); } else { @@ -336,14 +295,18 @@ async fn sync_blaze_2_pg( } -pub async fn run_sync(pg_con_pool: &PgPool, config: &Config) -> Result<(), anyhow::Error>{ +pub async fn run_sync(pg_con_pool: &PgPool, config: &Config) -> Result<(), anyhow::Error> { let type_args: Vec<&str> = vec!["Specimen", "Patient", "Observation", "Condition"]; - - let table_names: Vec<&str> = vec!["specimen", "patients", "observations", "conditions"]; + let table_names: Vec<&str> = vec!["specimen", "patient", "observation", "condition"]; //check preconditions for sync let blaze_available = check_blaze_connection(&config.blaze_base_url, config.blaze_num_connection_attempts).await?; + + if !blaze_available { + bail!("Aborting sync run because connection to Blaze could not be established"); + } + let all_tables_exist = pred_tables_exist(pg_con_pool, &table_names).await?; if blaze_available && all_tables_exist { @@ -391,13 +354,8 @@ async fn main() -> Result<(), anyhow::Error>{ pg_batch_size: 10000, blaze_page_resource_count: 5000, blaze_num_connection_attempts: 20, - target_time: match NaiveTime::from_hms_opt(3, 0, 0) { - Some(time) => time, - None => { - error!("Invalid target time"); - return Err(anyhow::Error::msg("Invalid target time")); - } - } + target_time: NaiveTime::from_hms_opt(3, 0, 0) + .ok_or_else(|| anyhow!("Invalid target time"))? }; info!("fhir2sql started"); //@todo: replace with proper banner diff --git a/src/models.rs b/src/models.rs index ff0d760..b2abe3e 100644 --- a/src/models.rs +++ b/src/models.rs @@ -33,7 +33,7 @@ pub struct ResourceVersion { pub version_id: i64 } -pub struct BTreeMapValue { +pub struct PgVersion { pub pk_id: i32, //pg row primary key. Needed for upsert pub version_id: i64 //resource version_id }