diff --git a/.gitignore b/.gitignore index 6bde8de..048acb5 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,6 @@ .env .vscode Cargo.lock +.clj-kondo/ +.lsp/ +Dockerfile_test diff --git a/Cargo.toml b/Cargo.toml index f3cf6c9..0692427 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,10 +12,10 @@ dotenv = "0.15" reqwest = { version = "0.11", features = ["json"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"]} -tokio-postgres = "0.7.10" -diesel = { version = "2.2.0", features = ["postgres", "r2d2"] } +tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread", "full", "time", "signal"]} +sqlx = { version = "0.7", features = [ "runtime-tokio", "postgres"] } anyhow = "1.0.58" +chrono = "0.4" # Logging tracing = "0.1.37" diff --git a/README.md b/README.md new file mode 100644 index 0000000..3361f60 --- /dev/null +++ b/README.md @@ -0,0 +1,35 @@ +**FHIR2SQL** +===================== + +A Rust-based application that synchronizes FHIR (Fast Healthcare Interoperability Resources) data with a PostgreSQL database. + +**Overview** +------------ + +This application connects to a FHIR server, retrieves data, and syncs it with a PostgreSQL database. It uses the `sqlx` library for database interactions and `reqwest` for making HTTP requests to the FHIR server. The application is designed to run continuously, syncing data at regular intervals. + +**Features** +------------ + +* Connects to a PostgreSQL database and creates necessary tables and triggers if they don't exist +* Retrieves FHIR data from a specified server and syncs it with the PostgreSQL database +* Supports regular syncing at a specified interval +* Handles errors and retries connections to the FHIR server and PostgreSQL database +* Supports graceful shutdown on SIGTERM and SIGINT signals + +**Components** +-------------- + +* `main.rs`: The main application entry point +* `db_utils.rs`: Database utility functions for connecting to PostgreSQL and creating tables and triggers +* `models.rs`: Data models for FHIR resources and database interactions +* `graceful_shutdown.rs`: Functions for handling graceful shutdown on SIGTERM and SIGINT signals + +**Getting Started** +------------------- + +To use this application, you'll need to: + +1. Install Rust and the required dependencies +2. Configure the application by setting environment variables for the FHIR server URL, PostgreSQL connection details, and syncing interval +3. Run the application using `cargo run` \ No newline at end of file diff --git a/diesel.toml b/diesel.toml deleted file mode 100644 index de53284..0000000 --- a/diesel.toml +++ /dev/null @@ -1,6 +0,0 @@ -# For documentation on how to configure this file, -# see https://diesel.rs/guides/configuring-diesel-cli - -[print_schema] -file = "src/schema.rs" -custom_type_derives = ["diesel::query_builder::QueryId", "Clone"] diff --git a/migrations/.keep b/migrations/.keep deleted file mode 100644 index e69de29..0000000 diff --git a/migrations/00000000000000_diesel_initial_setup/down.sql b/migrations/00000000000000_diesel_initial_setup/down.sql deleted file mode 100644 index a9f5260..0000000 --- a/migrations/00000000000000_diesel_initial_setup/down.sql +++ /dev/null @@ -1,6 +0,0 @@ --- This file was automatically created by Diesel to setup helper functions --- and other internal bookkeeping. This file is safe to edit, any future --- changes will be added to existing projects as new migrations. 
- -DROP FUNCTION IF EXISTS diesel_manage_updated_at(_tbl regclass); -DROP FUNCTION IF EXISTS diesel_set_updated_at(); diff --git a/migrations/00000000000000_diesel_initial_setup/up.sql b/migrations/00000000000000_diesel_initial_setup/up.sql deleted file mode 100644 index d68895b..0000000 --- a/migrations/00000000000000_diesel_initial_setup/up.sql +++ /dev/null @@ -1,36 +0,0 @@ --- This file was automatically created by Diesel to setup helper functions --- and other internal bookkeeping. This file is safe to edit, any future --- changes will be added to existing projects as new migrations. - - - - --- Sets up a trigger for the given table to automatically set a column called --- `updated_at` whenever the row is modified (unless `updated_at` was included --- in the modified columns) --- --- # Example --- --- ```sql --- CREATE TABLE users (id SERIAL PRIMARY KEY, updated_at TIMESTAMP NOT NULL DEFAULT NOW()); --- --- SELECT diesel_manage_updated_at('users'); --- ``` -CREATE OR REPLACE FUNCTION diesel_manage_updated_at(_tbl regclass) RETURNS VOID AS $$ -BEGIN - EXECUTE format('CREATE TRIGGER set_updated_at BEFORE UPDATE ON %s - FOR EACH ROW EXECUTE PROCEDURE diesel_set_updated_at()', _tbl); -END; -$$ LANGUAGE plpgsql; - -CREATE OR REPLACE FUNCTION diesel_set_updated_at() RETURNS trigger AS $$ -BEGIN - IF ( - NEW IS DISTINCT FROM OLD AND - NEW.updated_at IS NOT DISTINCT FROM OLD.updated_at - ) THEN - NEW.updated_at := current_timestamp; - END IF; - RETURN NEW; -END; -$$ LANGUAGE plpgsql; diff --git a/migrations/2024-06-18-075834_create_tables/down.sql b/migrations/2024-06-18-075834_create_tables/down.sql deleted file mode 100644 index 0029e40..0000000 --- a/migrations/2024-06-18-075834_create_tables/down.sql +++ /dev/null @@ -1,4 +0,0 @@ -drop table patients; -drop table specimen; -drop table conditions; -drop function update_last_updated(); diff --git a/migrations/2024-06-18-075834_create_tables/up.sql b/migrations/2024-06-18-075834_create_tables/up.sql deleted file mode 100644 index f960bca..0000000 --- a/migrations/2024-06-18-075834_create_tables/up.sql +++ /dev/null @@ -1,43 +0,0 @@ -CREATE TABLE patients ( - id SERIAL PRIMARY KEY, - created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - entry JSONB NOT NULL -); - -CREATE TABLE specimen ( - id SERIAL PRIMARY KEY, - created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - entry JSONB NOT NULL -); - -CREATE TABLE conditions ( - id SERIAL PRIMARY KEY, - created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - entry JSONB NOT NULL -); - -CREATE OR REPLACE FUNCTION update_last_updated() -RETURNS TRIGGER AS $$ -BEGIN - NEW.last_updated_at = CURRENT_TIMESTAMP; - RETURN NEW; -END; -$$ LANGUAGE plpgsql; - -CREATE TRIGGER update_last_updated_trigger -BEFORE UPDATE ON patients -FOR EACH ROW -EXECUTE PROCEDURE update_last_updated(); - -CREATE TRIGGER update_last_updated_trigger -BEFORE UPDATE ON specimen -FOR EACH ROW -EXECUTE PROCEDURE update_last_updated(); - -CREATE TRIGGER update_last_updated_trigger -BEFORE UPDATE ON conditions -FOR EACH ROW -EXECUTE PROCEDURE update_last_updated(); \ No newline at end of file diff --git a/src/db_utils.rs b/src/db_utils.rs new file mode 100644 index 0000000..3b8bd58 --- /dev/null +++ b/src/db_utils.rs @@ -0,0 +1,144 @@ +use sqlx::{postgres::PgPoolOptions, PgPool, Row}; +use tracing::{error, info}; +use anyhow::anyhow; 
+use reqwest::Client; + +pub async fn get_pg_connection_pool(pg_url: &str, num_attempts: u32) -> Result<PgPool, anyhow::Error> { + info!("Trying to establish a PostgreSQL connection pool"); + + let mut attempts = 0; + let mut err: Option<anyhow::Error> = None; + + while attempts < num_attempts { + info!("Attempt to connect to PostgreSQL {} of {}", attempts + 1, num_attempts); + match PgPoolOptions::new() + .max_connections(10) + .connect(&pg_url) + .await + { + Ok(pg_con_pool) => { + info!("PostgreSQL connection successful"); + return Ok(pg_con_pool) + }, + Err(e) => { + error!("Failed to connect to PostgreSQL. Attempt {} of {}: {}", attempts + 1, num_attempts, e); + err = Some(anyhow!(e)); + } + } + attempts += 1; + tokio::time::sleep(std::time::Duration::from_secs(5)).await; //@todo: move param somewhere else? + } + Err(err.unwrap_or_else(|| anyhow!("Failed to connect to PostgreSQL"))) +} + + +pub async fn check_blaze_connection(blaze_base_url: &str, num_attempts: u32) -> Result<bool, anyhow::Error> { + info!("Attempting to connect to Blaze"); + + let mut attempts = 0; + let mut err: Option<anyhow::Error> = None; + let client = Client::new(); + + while attempts < num_attempts { + info!("Attempt to connect to Blaze {} of {}", attempts + 1, num_attempts); + match client.get(format!("{}/health", blaze_base_url)).send().await { + Ok(_) => { + info!("Blaze connection successful"); + return Ok(true) + }, + Err(e) => { + error!("Failed to connect to Blaze. Attempt {} of {}: {}", attempts + 1, num_attempts, e); + err = Some(anyhow!(e)); + } + } + attempts += 1; + tokio::time::sleep(std::time::Duration::from_secs(5)).await; //@todo: move param somewhere else? + } + Err(err.unwrap_or_else(|| anyhow!("Failed to connect to Blaze"))) + +} + + +//function that checks whether all given required tables exist in pg +pub async fn pred_tables_exist(pg_con_pool: &PgPool, table_names: &Vec<&str>) -> Result<bool, anyhow::Error> { + info!("Checking whether PostgreSQL tables exist"); + + let table_query: &str = r#"select table_name from information_schema.tables;"#; + + let rows = sqlx::query(table_query) + .fetch_all(pg_con_pool) + .await + .map_err(|err| { + error!("Failed to execute query: {}", err); + anyhow::Error::new(err) + })?; + + let pg_table_names: Vec<String> = rows.into_iter().map(|row| row.get(0)).collect(); + let all_tables_exist = table_names.iter().all(|table_name| pg_table_names.contains(&table_name.to_string())); + + + Ok(all_tables_exist) +} + +//create necessary tables and triggers in pg if they don't exist yet +pub async fn create_tables(pg_con_pool: &PgPool) -> Result<(), anyhow::Error> { + info!("Creating PostgreSQL tables"); + + let create_tables_queries = vec![ + "CREATE TABLE IF NOT EXISTS patient ( + id SERIAL PRIMARY KEY, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + resource JSONB NOT NULL + )", + "CREATE TABLE IF NOT EXISTS specimen ( + id SERIAL PRIMARY KEY, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + resource JSONB NOT NULL + )", + "CREATE TABLE IF NOT EXISTS condition ( + id SERIAL PRIMARY KEY, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + resource JSONB NOT NULL + )", + "CREATE TABLE IF NOT EXISTS observation ( + id SERIAL PRIMARY KEY, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + resource JSONB NOT NULL + )", + "CREATE OR REPLACE FUNCTION 
update_last_updated() + RETURNS TRIGGER AS $$ + BEGIN + NEW.last_updated_at = CURRENT_TIMESTAMP; + RETURN NEW; + END; + $$ LANGUAGE plpgsql;", + "CREATE TRIGGER update_last_updated_trigger + BEFORE UPDATE ON patient + FOR EACH ROW + EXECUTE PROCEDURE update_last_updated();", + "CREATE TRIGGER update_last_updated_trigger + BEFORE UPDATE ON specimen + FOR EACH ROW + EXECUTE PROCEDURE update_last_updated();", + "CREATE TRIGGER update_last_updated_trigger + BEFORE UPDATE ON condition + FOR EACH ROW + EXECUTE PROCEDURE update_last_updated();", + "CREATE TRIGGER update_last_updated_trigger + BEFORE UPDATE ON observation + FOR EACH ROW + EXECUTE PROCEDURE update_last_updated();", + ]; + + for query in create_tables_queries { + sqlx::query(query) + .execute(pg_con_pool) + .await?; + } + + Ok(()) +} diff --git a/src/graceful_shutdown.rs b/src/graceful_shutdown.rs new file mode 100644 index 0000000..a416082 --- /dev/null +++ b/src/graceful_shutdown.rs @@ -0,0 +1,16 @@ +use tracing::info; + +pub async fn wait_for_signal() { + use tokio::signal::unix::{signal,SignalKind}; + let mut sigterm = signal(SignalKind::terminate()) + .expect("Unable to register shutdown handler"); + let mut sigint = signal(SignalKind::interrupt()) + .expect("Unable to register shutdown handler"); + let signal = tokio::select! { + _ = sigterm.recv() => "SIGTERM", + _ = sigint.recv() => "SIGINT" + }; + // The following does not print in docker-compose setups but it does when run individually. + // Probably a docker-compose issue. + info!("Received signal ({signal}) - shutting down gracefully."); +} diff --git a/src/main.rs b/src/main.rs index 53c6fa4..1bd2ed5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,59 +1,35 @@ +mod db_utils; +mod graceful_shutdown; +mod models; + +use db_utils::*; +use models::*; + +use tokio::select; +use tokio::time::{interval, Duration}; +use sqlx::{PgPool, Row}; use std::collections::BTreeMap; use anyhow::bail; +use anyhow::anyhow; use reqwest::Client; -use serde::{Deserialize, Serialize}; use serde_json; -use dotenv::dotenv; -use tokio_postgres::{Client as PgClient, NoTls}; -use tracing::{info, warn, error}; +use std::env; +use tracing::{error, info, warn}; use tracing_subscriber; - - -#[derive(Deserialize, Serialize)] -struct Entry { - resource: serde_json::Value, //interpret resource as Value -} - -#[derive(Deserialize, Serialize)] -struct Search { - mode: String, -} - -#[derive(Deserialize, Serialize)] -struct SearchSet { - id: String, - #[serde(rename = "type")] - type_: String, - entry: Option<Vec<Entry>>, - link: Vec<Link>, - total: u32, - #[serde(rename = "resourceType")] - resource_type: String, -} - -#[derive(Deserialize, Serialize)] -struct Link { - relation: String, - url: String, -} - -pub struct ResourceVersion { - resource_id: String, - version_id: i64 -} - -pub struct BTreeMapValue { - pk_id: i32, //pg row primary key. 
Needed for upsert - version_id: i64 //resource version_id -} - -pub struct PgInsertItem { - resource: String -} - -pub struct PgUpdateItem { - id: i32, - resource: String +use dotenv::dotenv; +use chrono::{NaiveTime, Timelike, Utc}; + +pub struct Config { + blaze_base_url: String, + pg_host: String, + pg_username: String, + pg_password: String, + pg_dbname: String, + pg_port: u16, + pg_batch_size: u32, + blaze_page_resource_count: u32, + blaze_num_connection_attempts: u32, + target_time: NaiveTime, } // read id and version_id from a resource if possible @@ -68,25 +44,28 @@ pub fn get_version(resource: serde_json::Value) -> Option<ResourceVersion> { } } -// get BTreeMap of all <resource_id, version_id> pairs in pg for the given resource type -pub async fn get_pg_resource_versions(table_name: &str, pg_client: &PgClient) -> Result<BTreeMap<String, BTreeMapValue>, anyhow::Error> { - let rows = pg_client.query( - &format!("SELECT id pk_id, resource::text FROM {};", table_name), &[]).await?; +// get BTreeMap of all <resource_id, version_id> pairs in pg for the given resource type +pub async fn get_pg_resource_versions(table_name: &str, pg_con_pool: &PgPool) -> Result<BTreeMap<String, PgVersion>, anyhow::Error> { + let query = format!("SELECT id pk_id, resource::text FROM {}", table_name); + let rows: Vec<(i32, String)> = sqlx::query_as(&query) + .fetch_all(pg_con_pool) + .await?; let mut pg_versions = BTreeMap::new(); for row in rows { - let pk_id: i32 = row.get("pk_id"); - let resource_str: String = row.get("resource"); + let pk_id: i32 = row.0; + let resource_str: String = row.1; let resource: serde_json::Value = match serde_json::from_str(&resource_str) { Ok(resource) => resource, Err(_) => continue - }; + }; + let res_version = get_version(resource); match res_version { Some(res_version) => { // only if resource_id is found something is inserted into resource_versions - pg_versions.insert(res_version.resource_id, BTreeMapValue {pk_id, version_id: res_version.version_id}); + pg_versions.insert(res_version.resource_id, PgVersion {pk_id, version_id: res_version.version_id}); } None => continue } @@ -94,15 +73,14 @@ pub async fn get_pg_resource_versions(table_name: &str, pg_client: &PgClient) -> Ok(pg_versions) } -pub async fn update_helper(pg_client: &mut PgClient, items: &[PgUpdateItem], table_name: &str) -> Result<(), anyhow::Error> { - let tx = pg_client.transaction().await?; +//do a batch update of pg rows +pub async fn update_helper(pg_con_pool: &PgPool, items: &[PgUpdateItem], table_name: &str) -> Result<(), sqlx::Error> { let values: Vec<_> = items .into_iter() .map(|item| format!("({}, $${}$$)", item.id, item.resource)) .collect(); - //@todo implement this let query = format!( "UPDATE {} SET resource = data.resource::jsonb FROM (VALUES {}) AS data(id, resource) WHERE data.id = {}.id", table_name, @@ -110,14 +88,15 @@ pub async fn update_helper(pg_client: &mut PgClient, items: &[PgUpdateItem], tab table_name ); - tx.execute(&query, &[]).await?; - tx.commit().await?; + sqlx::query(&query) + .execute(pg_con_pool) + .await?; Ok(()) } -pub async fn insert_helper(pg_client: &mut PgClient, items: &[PgInsertItem], table_name: &str) -> Result<(), anyhow::Error> { - let tx = pg_client.transaction().await?; +//do a batch insert of pg rows +pub async fn insert_helper(pg_con_pool: &PgPool, items: &[PgInsertItem], table_name: &str) -> Result<(), anyhow::Error> { let values: Vec<_> = items .into_iter() @@ -129,14 +108,15 @@ pub async fn insert_helper(pg_client: &mut PgClient, items: &[PgInsertItem], tab table_name, values.join(",") ); - tx.execute(&query, &[]).await?; - tx.commit().await?; + sqlx::query(&query) + 
.execute(pg_con_pool) + .await?; Ok(()) } -pub async fn delete_helper(pg_client: &mut PgClient, items: &[i32], table_name: &str) -> Result<(), anyhow::Error> { - let tx = pg_client.transaction().await?; +//do a batch delete of pg rows +pub async fn delete_helper(pg_con_pool: &PgPool, items: &[i32], table_name: &str) -> Result<(), anyhow::Error> { let values: Vec<_> = items .into_iter() @@ -148,217 +128,286 @@ pub async fn delete_helper(pg_client: &mut PgClient, items: &[i32], table_name: table_name, values.join(",") ); - tx.execute(&query, &[]).await?; - tx.commit().await?; + + sqlx::query(&query) + .execute(pg_con_pool) + .await?; Ok(()) } -async fn sync( - base_url: &str, - pg_client: &mut PgClient, - type_arg: &str, - batch_size: u32, - page_resource_count: u32 - ) -> Result<(), anyhow::Error>{ +// get the number of rows of a given pg table +pub async fn get_num_rows(pg_con_pool: &PgPool, table_name: &str) -> Result<i64, anyhow::Error> { + let query = format!("SELECT COUNT(*) FROM {}", table_name); + let row = sqlx::query(query.as_str()) + .fetch_one(pg_con_pool) + .await?; + let count: i64 = row.try_get(0)?; + Ok(count) +} + +pub async fn get_blaze_search_set(url: &str, client: &Client) -> Result<SearchSet, anyhow::Error> { + let res = client.get(url).send().await; + let res = res.inspect_err(|e| error!("Failed to get response from Blaze: {}", e))?; + let res_text = res.text().await.inspect_err(|e| error!("Failed to get payload: {}", e))?; + let search_set: SearchSet = serde_json::from_str(&res_text) + .inspect_err(|e| error!("Could not deserialize JSON to SearchSet: {}", e))?; + Ok(search_set) +} + +/// Synchronizes resources from Blaze to PostgreSQL +/// +/// pg_versions holds a BTreeMap that is used to look up resource version_ids currently in pg +/// resource-ids that are found in Blaze but not in pg: will be inserted +/// resource-ids that are found in Blaze with a different version_id than in pg: will be updated +/// resource-ids that are found in pg but not in Blaze: will be deleted from pg +/// # Parameters +/// +/// * `blaze_base_url`: The base URL of the Blaze API. +/// * `pg_con_pool`: A PostgreSQL connection pool. +/// * `type_arg`: The type of resource to synchronize (e.g. "Specimen", "Patient", etc.). +/// * `batch_size`: The number of resources to process in each batch. +/// * `page_resource_count`: The number of resources to fetch from Blaze in each page. +/// +/// # Returns +/// +/// A `Result` indicating whether the synchronization was successful. If an error occurs, it will be returned as an `anyhow::Error`. 
+async fn sync_blaze_2_pg( + blaze_base_url: &str, + pg_con_pool: &PgPool, + type_arg: &str, + batch_size: u32, + page_resource_count: u32 +) -> Result<(), anyhow::Error>{ // set pg table name to insert into or update - let table_name = match type_arg.to_lowercase().as_str() { - "specimen" => "specimen", - "patient" => "patients", - "condition" => "conditions", - "observation" => "observations", - _ => { - bail!("Invalid type_arg!"); - } - }; + let table_name = type_arg.to_lowercase(); + let client = Client::new(); // vectors holding batches of items to insert/update let mut update_batch: Vec<PgUpdateItem> = Vec::new(); let mut insert_batch: Vec<PgInsertItem> = Vec::new(); - let mut pg_versions = get_pg_resource_versions(table_name, &pg_client).await?; - - let client = Client::new(); - let mut url: String = format!("{}{}?_count={}", base_url, type_arg, page_resource_count); + let mut pg_versions = get_pg_resource_versions(&table_name, pg_con_pool).await?; + + let mut url: String = format!("{}/fhir/{}?_count={}&_history=current", + blaze_base_url, type_arg, page_resource_count); let mut update_counter: u32 = 0; - let mut insert_counter: u32 = 0; + let mut insert_counter: u32 = 0; + info!("Attempting to sync: {}", type_arg); loop { - let res = client.get(&url).send().await; - match res { - Ok(res) => { - let res_text = match res.text().await { - Ok(text) => text, - Err(err) => { - error!("failed to get payload: {}", err); - break; - } - }; - let search_set: Result<SearchSet, serde_json::Error> = serde_json::from_str(&res_text); - match search_set { - Ok(search_set) => { - let entries = match search_set.entry { - Some(entries) => entries, - None => break - }; - info!("Processing: {}", url); - for e in entries { - - let blaze_version = get_version(e.resource.clone()); - let blaze_version = match blaze_version { - Some(v) => v, - None => { - //@todo: log stuff (to pg?) and - continue - } - }; - - let resource_str = match serde_json::to_string(&e.resource) { - Ok(s) => s, - Err(e) => { - //@todo: log error - "{}".to_string() // use empty JSON object - } - }; - - match pg_versions.get(&blaze_version.resource_id) { - Some(pg_version) => { //resource is already in pg - if pg_version.version_id < blaze_version.version_id { - //Resource exists in pg but is outdated. 
- //Add e.resource for batch upsert into pg - update_counter += 1; - update_batch.push(PgUpdateItem {id: pg_version.pk_id, resource: resource_str.clone()}); - //remove all entries from pg_versions that can be found in Blaze - //=> remaining ones need to be deleted from pg - pg_versions.remove(&blaze_version.resource_id); - } - }, - None => { // current resource not (yet) in pg - //@todo: e.resource for batch insert into pg - insert_counter += 1; - insert_batch.push(PgInsertItem {resource: resource_str.clone()}); - } - } - - if update_counter > 0 && update_counter % batch_size == 0 { - update_helper(pg_client, &update_batch, table_name).await?; - update_batch.clear(); - - } else if insert_counter > 0 && insert_counter % batch_size == 0 { - insert_helper(pg_client, &insert_batch, table_name).await?; - insert_batch.clear(); - - } - - } - // extract link to next page if exists or break - let next_url = search_set.link.iter() - .find(|link| link.relation == "next") - .map(|link| link.url.clone()); - - if let Some(next_url) = next_url { - url = next_url; - } else { - break; - } - }, - Err(error) => { - bail!("Error deserializing JSON: {}", error); //@todo: log - } - } - }, - //no valid search response from Blaze - Err(err) => { - eprintln!("Failed to get response from Blaze: {}", err); //@todo: log + let search_set = get_blaze_search_set(&url, &client).await?; + let entries = match search_set.entry { + Some(entries) => entries, + None => { + warn!("Could not read entries from search set."); break; } + }; + for e in entries { + + let resource_str = match serde_json::to_string(&e.resource) { + Ok(resource) => resource, + Err(err) => { + warn!("Could not read resource from search set entry: {}", err); + continue + } + }; + + let blaze_version = get_version(e.resource.clone()); + let blaze_version = match blaze_version { + Some(v) => v, + None => { + warn!("Could not read resource version from Blaze search set entry."); + continue + } + }; + + match pg_versions.get(&blaze_version.resource_id) { + Some(pg_version) => { //Resource is already in pg + if pg_version.version_id < blaze_version.version_id || + pg_version.version_id > blaze_version.version_id + { + //Resource exists in pg but is outdated. 
+ //Add resource for batch update into pg + update_counter += 1; + update_batch.push(PgUpdateItem {id: pg_version.pk_id, resource: resource_str.clone()}); + //Remove all entries from pg_versions that can be found in Blaze + //=> remaining ones need to be deleted from pg + pg_versions.remove(&blaze_version.resource_id); + } else { //Blaze and pg versions match + pg_versions.remove(&blaze_version.resource_id); + } + }, + None => { // current resource not (yet) in pg + //Add resource for batch insert into pg + insert_counter += 1; + insert_batch.push(PgInsertItem {resource: resource_str.clone()}); + } + } + + if update_counter > 0 && update_counter % batch_size == 0 { + update_helper(pg_con_pool, &update_batch, &table_name).await?; + update_batch.clear(); + + } else if insert_counter > 0 && insert_counter % batch_size == 0 { + insert_helper(pg_con_pool, &insert_batch, &table_name).await?; + insert_batch.clear(); + } + + } + // extract link to next page if exists or break + let next_url = search_set.link.iter() + .find(|link| link.relation == "next") + .map(|link| link.url.clone()); + + if let Some(next_url) = next_url { + url = next_url; + } else { + break; } } //insert or update the last remaining resources if update_batch.len() > 0 { - update_helper(pg_client, &update_batch, table_name).await?; + update_counter += update_batch.len() as u32; + update_helper(pg_con_pool, &update_batch, &table_name).await?; } if insert_batch.len() > 0 { - insert_helper(pg_client, &insert_batch, table_name).await?; + insert_counter += insert_batch.len() as u32; + insert_helper(pg_con_pool, &insert_batch, &table_name).await?; } //remove rows from pg that were not encountered in Blaze let delete_ids: Vec<i32> = pg_versions.values().map(|value| value.pk_id).collect(); - if delete_ids.len() > 0 { - delete_helper(pg_client, &delete_ids, table_name).await?; + if delete_ids.len() > 0 { + delete_helper(pg_con_pool, &delete_ids, &table_name).await?; } + + info!("Updated {} rows", update_counter); + info!("Inserted {} rows", insert_counter); + info!("Deleted {} rows", delete_ids.len() as u32); + + //compare total entry counts between Blaze and pg + let row_count = get_num_rows(pg_con_pool, &table_name).await?; + let resource_count = get_blaze_search_set(&format!("{}/fhir/{}?_count=0", blaze_base_url, type_arg), &client) + .await + .map(|search_set| search_set.total as i64)?; + if row_count != resource_count { + warn!("{} entry counts do not match between Blaze and PostgreSQL", type_arg); + } else { + info!("{} entry counts match between Blaze and PostgreSQL \u{2705}", type_arg); + } + + Ok(()) +} + + +pub async fn run_sync(pg_con_pool: &PgPool, config: &Config) -> Result<(), anyhow::Error> { + let type_args: Vec<&str> = vec!["Specimen", "Patient", "Observation", "Condition"]; + let table_names: Vec<&str> = vec!["specimen", "patient", "observation", "condition"]; + //check preconditions for sync + let blaze_available = check_blaze_connection(&config.blaze_base_url, + config.blaze_num_connection_attempts).await?; + + if !blaze_available { + bail!("Aborting sync run because connection to Blaze could not be established"); + } + + let all_tables_exist = pred_tables_exist(pg_con_pool, &table_names).await?; + + if blaze_available && all_tables_exist { + info!("All tables found as expected"); + for type_arg in type_args { + sync_blaze_2_pg( + &config.blaze_base_url, + pg_con_pool, + type_arg, + config.pg_batch_size, + config.blaze_page_resource_count).await?; + } + } else if blaze_available && !all_tables_exist { + 
create_tables(pg_con_pool).await?; + for type_arg in type_args { + sync_blaze_2_pg( + &config.blaze_base_url, + pg_con_pool, + type_arg, + config.pg_batch_size, + config.blaze_page_resource_count).await?; + } + } Ok(()) } -//@todo: add check whether to number of blaze resources exactly matches the number of resources in pg after inserts #[tokio::main] async fn main() -> Result<(), anyhow::Error>{ - // Initialize tracing subscriber + tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) .init(); - //let args: Vec<String> = env::args().collect(); - /* - if args.len() != 3 || args[1] != "upsert" { - eprintln!("Usage: fhir2sql upsert . \n - must be one of observation, patient, specimen, or condition"); - return; - } - let type_arg = &args[2]; - // Call your upsert function with the type argument - upsert(type_arg); - */ - //@todo: add error handling and stuff - let type_arg = "Specimen"; - dotenv().ok(); - let blaze_base_url: String = dotenv::var("BLAZE_BASE_URL").expect("BLAZE_BASE_URL must be set"); - let host = dotenv::var("PG_HOST").expect("PG_HOST must be set"); - let port = dotenv::var("PG_PORT").expect("PG_PORT must be set"); - let username = dotenv::var("PG_USERNAME").expect("PG_USERNAME must be set"); - let password = dotenv::var("PG_PASSWORD").expect("PG_PASSWORD must be set"); //@todo: move out of .env - let dbname = dotenv::var("PG_DBNAME").expect("PG_DBNAME must be set"); - let batch_size = match dotenv::var("PG_BATCH_SIZE") { - Ok(val) => match val.parse::<u32>() { - Ok(num) => num, - Err(_) => { - eprintln!("PG_BATCH_SIZE must be a positive number. Using default value."); - 10000 // default value if parsing fails - } - }, - Err(_) => { - eprintln!("PG_BATCH_SIZE must be set. Using default value."); - 10000 // default value if env var is not set - } + //@todo: make use of clap crate? + let config = Config { + blaze_base_url: env::var("BLAZE_BASE_URL").expect("BLAZE_BASE_URL must be set"), + pg_host: env::var("PG_HOST").expect("PG_HOST must be set"), + pg_username: env::var("PG_USERNAME").expect("PG_USERNAME must be set"), + pg_password: env::var("PG_PASSWORD").expect("PG_PASSWORD must be set"), + pg_dbname: env::var("PG_DBNAME").expect("PG_DBNAME must be set"), + pg_port: 5432, + pg_batch_size: 10000, + blaze_page_resource_count: 5000, + blaze_num_connection_attempts: 20, + target_time: NaiveTime::from_hms_opt(3, 0, 0) + .ok_or_else(|| anyhow!("Invalid target time"))? }; - let page_resource_count = match dotenv::var("BLAZE_PAGE_RESOURCE_COUNT") { - Ok(val) => match val.parse::<u32>() { - Ok(num) => num, - Err(_) => { - eprintln!("BLAZE_PAGE_RESOURCE_COUNT not set. Using default value."); - 100 // default value if parsing failsf - } + + info!("🔥2🐘 fhir2sql started"); //@todo: replace with proper banner + + let pg_url = format!("postgresql://{}:{}@{}:{}/{}", + config.pg_username, + config.pg_password, + config.pg_host, + config.pg_port, + config.pg_dbname); + let pg_con_pool = get_pg_connection_pool(&pg_url, 10).await?; + + info!("Running initial sync"); + match run_sync(&pg_con_pool, &config).await { + Ok(()) => { + info!("Sync run successful"); }, - Err(_) => { - eprintln!("BLAZE_PAGE_RESOURCE_COUNT not set. 
Using default value."); - 100 // default value if env var is not set + Err(err) => { + error!("Sync run unsuccessful: {}", err); } - }; - - let con_str = &format!("host={} port={} user={} password={} dbname={}", host, port, username, password, dbname); - let (mut client, connection) = - tokio_postgres::connect(&con_str, NoTls).await?; } - tokio::spawn(async move { - if let Err(e) = connection.await { - //eprintln!("connection error: {}", e); - error!("Could not connect to PostgreSQL: {e}"); - } - }); + // main loop + info!("Entering regular sync schedule"); + let mut interval = interval(Duration::from_secs(60)); // execute every 1 minute - sync(&blaze_base_url, &mut client, type_arg, batch_size, page_resource_count).await?; + loop { + select! { + _ = interval.tick() => { + let now = Utc::now().naive_local().time(); + if now.hour() == config.target_time.hour() && now.minute() == config.target_time.minute() { + info!("Syncing at target time"); + match run_sync(&pg_con_pool, &config).await { + Ok(()) => { + info!("Sync run successful"); }, + Err(err) => { + error!("Sync run unsuccessful: {}", err); + } + } + } + } _ = graceful_shutdown::wait_for_signal() => { + break; + } + } + } + Ok(()) } diff --git a/src/models.rs b/src/models.rs new file mode 100644 index 0000000..b2abe3e --- /dev/null +++ b/src/models.rs @@ -0,0 +1,48 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Deserialize, Serialize)] +pub struct Entry { + pub resource: serde_json::Value, +} + +#[derive(Deserialize, Serialize)] +pub struct Search { + pub mode: String, +} + +#[derive(Deserialize, Serialize)] +pub struct SearchSet { + pub id: String, + #[serde(rename = "type")] + pub type_: String, + pub entry: Option<Vec<Entry>>, + pub link: Vec<Link>, + pub total: u32, + #[serde(rename = "resourceType")] + pub resource_type: String, +} + +#[derive(Deserialize, Serialize)] +pub struct Link { + pub relation: String, + pub url: String, +} + +pub struct ResourceVersion { + pub resource_id: String, + pub version_id: i64 +} + +pub struct PgVersion { + pub pk_id: i32, //pg row primary key. Needed for upsert + pub version_id: i64 //resource version_id +} + +pub struct PgInsertItem { + pub resource: String +} + +pub struct PgUpdateItem { + pub id: i32, + pub resource: String +} \ No newline at end of file diff --git a/src/schema.rs b/src/schema.rs deleted file mode 100644 index 7d348c1..0000000 --- a/src/schema.rs +++ /dev/null @@ -1,44 +0,0 @@ -// @generated automatically by Diesel CLI. - -diesel::table! { - conditions (id) { - id -> Int4, - created_at -> Timestamp, - last_updated_at -> Timestamp, - resource -> Jsonb, - } -} - -diesel::table! { - observations (id) { - id -> Int4, - created_at -> Timestamp, - last_updated_at -> Timestamp, - resource -> Jsonb, - } -} - -diesel::table! { - patients (id) { - id -> Int4, - created_at -> Timestamp, - last_updated_at -> Timestamp, - resource -> Jsonb, - } -} - -diesel::table! { - specimen (id) { - id -> Int4, - created_at -> Timestamp, - last_updated_at -> Timestamp, - resource -> Jsonb, - } -} - -diesel::allow_tables_to_appear_in_same_query!( - conditions, - observations, - patients, - specimen, -);