From 10fbe41064cd67e2a9d3b627e92e4f48e68e6754 Mon Sep 17 00:00:00 2001 From: davidmscholz Date: Wed, 10 Jul 2024 15:59:29 +0200 Subject: [PATCH 01/15] switch to sqlx --- .gitignore | 7 + Cargo.toml | 30 ++++ Cross.toml | 5 + Dockerfile | 14 ++ deny.toml | 215 +++++++++++++++++++++++++++ src/db_utils.rs | 86 +++++++++++ src/main.rs | 382 ++++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 739 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 Cross.toml create mode 100644 Dockerfile create mode 100644 deny.toml create mode 100644 src/db_utils.rs create mode 100644 src/main.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..048acb5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +/target +.env +.vscode +Cargo.lock +.clj-kondo/ +.lsp/ +Dockerfile_test diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..a75cb1e --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "fhir2sql" +version = "0.1.0" +edition = "2021" +license = "Apache-2.0" + +[workspace] +members = ["."] + +[dependencies] +dotenv = "0.15" +reqwest = { version = "0.11", features = ["json"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"]} +sqlx = { version = "0.7", features = [ "runtime-tokio", "postgres"] } +diesel = { version = "2.2.0", features = ["postgres", "r2d2"] } +diesel_migrations = "2.2.0" +anyhow = "1.0.58" + +# Logging +tracing = "0.1.37" +tracing-subscriber = "0.3.17" + +[profile.release] +#opt-level = "z" # Optimize for size. +lto = true # Enable Link Time Optimization +codegen-units = 1 # Reduce number of codegen units to increase optimizations. +panic = "abort" # Abort on panic +strip = true # Automatically strip symbols from the binary. \ No newline at end of file diff --git a/Cross.toml b/Cross.toml new file mode 100644 index 0000000..e253e5b --- /dev/null +++ b/Cross.toml @@ -0,0 +1,5 @@ +[target.aarch64-unknown-linux-gnu] +pre-build = ["dpkg --add-architecture arm64 && apt-get update && apt-get install --assume-yes libssl-dev:arm64 && rm -rf /var/lib/apt/lists/*"] + +[target.x86_64-unknown-linux-gnu] +pre-build = ["dpkg --add-architecture amd64 && apt-get update && apt-get install --assume-yes libssl-dev:amd64 && rm -rf /var/lib/apt/lists/*"] diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..1815dc5 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,14 @@ +# This Dockerfile is infused with magic to speedup the build. +# In particular, it requires built binaries to be present (see COPY directive). 
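+#
+# A hypothetical local invocation (illustrative only, not part of this patch; it
+# assumes the binary was already cross-compiled into ./artifacts/, mirroring the
+# CI layout, and that docker buildx sets TARGETARCH from --platform):
+#
+#   docker buildx build --platform linux/amd64 \
+#     --build-arg COMPONENT=fhir2sql --build-arg FEATURE= \
+#     -t samply/fhir2sql .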
+
+FROM alpine AS chmodder
+ARG TARGETARCH
+ARG COMPONENT
+ARG FEATURE
+COPY /artifacts/binaries-$TARGETARCH$FEATURE/$COMPONENT /app/$COMPONENT
+RUN chmod +x /app/*
+
+FROM gcr.io/distroless/cc-debian12
+ARG COMPONENT
+COPY --from=chmodder /app/$COMPONENT /usr/local/bin/fhir2sql
+ENTRYPOINT [ "/usr/local/bin/fhir2sql" ]
diff --git a/deny.toml b/deny.toml
new file mode 100644
index 0000000..fbf4fae
--- /dev/null
+++ b/deny.toml
@@ -0,0 +1,215 @@
+# This template contains all of the possible sections and their default values
+
+# Note that all fields that take a lint level have these possible values:
+# * deny - An error will be produced and the check will fail
+# * warn - A warning will be produced, but the check will not fail
+# * allow - No warning or error will be produced, though in some cases a note
+# will be
+
+# The values provided in this template are the default values that will be used
+# when any section or field is not specified in your own configuration
+
+# If 1 or more target triples (and optionally, target_features) are specified,
+# only the specified targets will be checked when running `cargo deny check`.
+# This means, if a particular package is only ever used as a target specific
+# dependency, such as, for example, the `nix` crate only being used via the
+# `target_family = "unix"` configuration, that only having windows targets in
+# this list would mean the nix crate, as well as any of its exclusive
+# dependencies not shared by any other crates, would be ignored, as the target
+# list here is effectively saying which targets you are building for.
+targets = [
+    # The triple can be any string, but only the target triples built in to
+    # rustc (as of 1.40) can be checked against actual config expressions
+    #{ triple = "x86_64-unknown-linux-musl" },
+    # You can also specify which target_features you promise are enabled for a
+    # particular target. target_features are currently not validated against
+    # the actual valid features supported by the target architecture.
+    #{ triple = "wasm32-unknown-unknown", features = ["atomics"] },
+]
+
+# This section is considered when running `cargo deny check advisories`
+# More documentation for the advisories section can be found here:
+# https://embarkstudios.github.io/cargo-deny/checks/advisories/cfg.html
+[advisories]
+# The path where the advisory database is cloned/fetched into
+db-path = "~/.cargo/advisory-db"
+# The url(s) of the advisory databases to use
+db-urls = ["https://github.com/rustsec/advisory-db"]
+# The lint level for security vulnerabilities
+vulnerability = "deny"
+# The lint level for unmaintained crates
+unmaintained = "warn"
+# The lint level for crates that have been yanked from their source registry
+yanked = "warn"
+# The lint level for crates with security notices. Note that as of
+# 2019-12-17 there are no security notice advisories in
+# https://github.com/rustsec/advisory-db
+notice = "warn"
+# A list of advisory IDs to ignore. Note that ignored advisories will still
+# output a note when they are encountered.
+ignore = [
+    "RUSTSEC-2020-0071", #used only by build-data as a build dependency
+]
+# Threshold for security vulnerabilities, any vulnerability with a CVSS score
+# lower than the range specified will be ignored. Note that ignored advisories
+# will still output a note when they are encountered.
+# * None - CVSS Score 0.0 +# * Low - CVSS Score 0.1 - 3.9 +# * Medium - CVSS Score 4.0 - 6.9 +# * High - CVSS Score 7.0 - 8.9 +# * Critical - CVSS Score 9.0 - 10.0 +#severity-threshold = + +# If this is true, then cargo deny will use the git executable to fetch advisory database. +# If this is false, then it uses a built-in git library. +# Setting this to true can be helpful if you have special authentication requirements that cargo-deny does not support. +# See Git Authentication for more information about setting up git authentication. +#git-fetch-with-cli = true + +# This section is considered when running `cargo deny check licenses` +# More documentation for the licenses section can be found here: +# https://embarkstudios.github.io/cargo-deny/checks/licenses/cfg.html +[licenses] +# The lint level for crates which do not have a detectable license +unlicensed = "deny" +# List of explicitly allowed licenses +# See https://spdx.org/licenses/ for list of possible licenses +# [possible values: any SPDX 3.11 short identifier (+ optional exception)]. +allow = [ + "MIT", + "Apache-2.0", + "ISC", + "BSD-3-Clause", + "Unicode-DFS-2016", + "Unicode-3.0", + #"Apache-2.0 WITH LLVM-exception", +] +# List of explicitly disallowed licenses +# See https://spdx.org/licenses/ for list of possible licenses +# [possible values: any SPDX 3.11 short identifier (+ optional exception)]. +deny = [ + #"Nokia", +] +# Lint level for licenses considered copyleft +copyleft = "warn" +# Blanket approval or denial for OSI-approved or FSF Free/Libre licenses +# * both - The license will be approved if it is both OSI-approved *AND* FSF +# * either - The license will be approved if it is either OSI-approved *OR* FSF +# * osi-only - The license will be approved if is OSI-approved *AND NOT* FSF +# * fsf-only - The license will be approved if is FSF *AND NOT* OSI-approved +# * neither - This predicate is ignored and the default lint level is used +allow-osi-fsf-free = "neither" +# Lint level used when no other predicates are matched +# 1. License isn't in the allow or deny lists +# 2. License isn't copyleft +# 3. License isn't OSI/FSF, or allow-osi-fsf-free = "neither" +default = "deny" +# The confidence threshold for detecting a license from license text. +# The higher the value, the more closely the license text must be to the +# canonical license text of a valid SPDX license file. +# [possible values: any between 0.0 and 1.0]. +confidence-threshold = 0.8 +# Allow 1 or more licenses on a per-crate basis, so that particular licenses +# aren't accepted for every possible crate as with the normal allow list +exceptions = [ + # Each entry is the crate and version constraint, and its specific allow + # list + #{ allow = ["Zlib"], name = "adler32", version = "*" }, +] + +# Some crates don't have (easily) machine readable licensing information, +# adding a clarification entry for it allows you to manually specify the +# licensing information +#[[licenses.clarify]] +# The name of the crate the clarification applies to +#name = "ring" +# The optional version constraint for the crate +#version = "*" +# The SPDX expression for the license requirements of the crate +#expression = "MIT AND ISC AND OpenSSL" +# One or more files in the crate's source used as the "source of truth" for +# the license expression. 
If the contents match, the clarification will be used +# when running the license check, otherwise the clarification will be ignored +# and the crate will be checked normally, which may produce warnings or errors +# depending on the rest of your configuration +#license-files = [ + # Each entry is a crate relative path, and the (opaque) hash of its contents + #{ path = "LICENSE", hash = 0xbd0eed23 } +#] + +[licenses.private] +# If true, ignores workspace crates that aren't published, or are only +# published to private registries. +# To see how to mark a crate as unpublished (to the official registry), +# visit https://doc.rust-lang.org/cargo/reference/manifest.html#the-publish-field. +ignore = false +# One or more private registries that you might publish crates to, if a crate +# is only published to private registries, and ignore is true, the crate will +# not have its license(s) checked +registries = [ + #"https://sekretz.com/registry +] + +# This section is considered when running `cargo deny check bans`. +# More documentation about the 'bans' section can be found here: +# https://embarkstudios.github.io/cargo-deny/checks/bans/cfg.html +[bans] +# Lint level for when multiple versions of the same crate are detected +multiple-versions = "warn" +# Lint level for when a crate version requirement is `*` +wildcards = "allow" +# The graph highlighting used when creating dotgraphs for crates +# with multiple versions +# * lowest-version - The path to the lowest versioned duplicate is highlighted +# * simplest-path - The path to the version with the fewest edges is highlighted +# * all - Both lowest-version and simplest-path are used +highlight = "all" +# List of crates that are allowed. Use with care! +allow = [ + #{ name = "ansi_term", version = "=0.11.0" }, +] +# List of crates to deny +deny = [ + # Each entry the name of a crate and a version range. If version is + # not specified, all versions will be matched. + #{ name = "ansi_term", version = "=0.11.0" }, + # + # Wrapper crates can optionally be specified to allow the crate when it + # is a direct dependency of the otherwise banned crate + #{ name = "ansi_term", version = "=0.11.0", wrappers = [] }, +] +# Certain crates/versions that will be skipped when doing duplicate detection. +skip = [ + #{ name = "ansi_term", version = "=0.11.0" }, +] +# Similarly to `skip` allows you to skip certain crates during duplicate +# detection. Unlike skip, it also includes the entire tree of transitive +# dependencies starting at the specified crate, up to a certain depth, which is +# by default infinite +skip-tree = [ + #{ name = "ansi_term", version = "=0.11.0", depth = 20 }, +] + +# This section is considered when running `cargo deny check sources`. +# More documentation about the 'sources' section can be found here: +# https://embarkstudios.github.io/cargo-deny/checks/sources/cfg.html +[sources] +# Lint level for what to happen when a crate from a crate registry that is not +# in the allow list is encountered +unknown-registry = "warn" +# Lint level for what to happen when a crate from a git repository that is not +# in the allow list is encountered +unknown-git = "warn" +# List of URLs for allowed crate registries. Defaults to the crates.io index +# if not specified. If it is specified but empty, no registries are allowed. 
+allow-registry = ["https://github.com/rust-lang/crates.io-index"] +# List of URLs for allowed Git repositories +allow-git = [] + +[sources.allow-org] +# 1 or more github.com organizations to allow git sources for +github = ["samply"] +# 1 or more gitlab.com organizations to allow git sources for +#gitlab = [""] +# 1 or more bitbucket.org organizations to allow git sources for +#bitbucket = [""] diff --git a/src/db_utils.rs b/src/db_utils.rs new file mode 100644 index 0000000..782b5d3 --- /dev/null +++ b/src/db_utils.rs @@ -0,0 +1,86 @@ +use sqlx::{PgPool, Row}; +use tracing::{error, info}; + + +pub async fn pred_tables_exist(pg_con_pool: &PgPool, table_names: &Vec<&str>) -> Result { + info!("Checking whether PostgreSQL tables exist"); + + let table_query: &str = r#"select table_name from information_schema.tables;"#; + + let rows = sqlx::query(table_query) + .fetch_all(pg_con_pool) + .await + .map_err(|err| { + error!("Failed to execute query: {}", err); + anyhow::Error::new(err) + })?; + + let pg_table_names: Vec = rows.into_iter().map(|row| row.get(0)).collect(); + let all_tables_exist = table_names.iter().all(|table_name| pg_table_names.contains(&table_name.to_string())); + + + Ok(all_tables_exist) +} + + +pub async fn create_tables(pg_con_pool: &PgPool) -> Result<(), anyhow::Error> { + info!("Creating PostgreSQL tables"); + + let create_tables_queries = vec![ + "CREATE TABLE IF NOT EXISTS patients ( + id SERIAL PRIMARY KEY, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + resource JSONB NOT NULL + )", + "CREATE TABLE IF NOT EXISTS specimen ( + id SERIAL PRIMARY KEY, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + resource JSONB NOT NULL + )", + "CREATE TABLE IF NOT EXISTS conditions ( + id SERIAL PRIMARY KEY, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + resource JSONB NOT NULL + )", + "CREATE TABLE IF NOT EXISTS observations ( + id SERIAL PRIMARY KEY, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + resource JSONB NOT NULL + )", + "CREATE OR REPLACE FUNCTION update_last_updated() + RETURNS TRIGGER AS $$ + BEGIN + NEW.last_updated_at = CURRENT_TIMESTAMP; + RETURN NEW; + END; + $$ LANGUAGE plpgsql;", + "CREATE TRIGGER update_last_updated_trigger + BEFORE UPDATE ON patients + FOR EACH ROW + EXECUTE PROCEDURE update_last_updated();", + "CREATE TRIGGER update_last_updated_trigger + BEFORE UPDATE ON specimen + FOR EACH ROW + EXECUTE PROCEDURE update_last_updated();", + "CREATE TRIGGER update_last_updated_trigger + BEFORE UPDATE ON conditions + FOR EACH ROW + EXECUTE PROCEDURE update_last_updated();", + "CREATE TRIGGER update_last_updated_trigger + BEFORE UPDATE ON observations + FOR EACH ROW + EXECUTE PROCEDURE update_last_updated();", + ]; + + for query in create_tables_queries { + sqlx::query(query) + .execute(pg_con_pool) + .await?; + } + + Ok(()) +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..0159891 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,382 @@ +mod db_utils; +use db_utils::*; +use sqlx::{postgres::PgPoolOptions, PgPool}; + +use std::collections::BTreeMap; +use anyhow::bail; +use reqwest::Client; +use serde::{Deserialize, Serialize}; +use serde_json; +use std::env; +use tracing::{info, warn, error}; +use tracing_subscriber; +use dotenv::dotenv; + + 
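+// The structs below model only the slice of a FHIR searchset Bundle that this
+// tool touches. An illustrative payload they can deserialize (field values are
+// made up; a real Blaze response carries complete FHIR resources in `resource`):
+//
+// {
+//   "resourceType": "Bundle",
+//   "id": "example-page",
+//   "type": "searchset",
+//   "total": 1,
+//   "link": [ { "relation": "next", "url": "http://blaze:8080/fhir/Patient?__page=2" } ],
+//   "entry": [ { "resource": { "resourceType": "Patient", "id": "p1", "meta": { "versionId": "2" } } } ]
+// }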
+#[derive(Deserialize, Serialize)]
+struct Entry {
+    resource: serde_json::Value,
+}
+
+#[derive(Deserialize, Serialize)]
+struct Search {
+    mode: String,
+}
+
+#[derive(Deserialize, Serialize)]
+struct SearchSet {
+    id: String,
+    #[serde(rename = "type")]
+    type_: String,
+    entry: Option<Vec<Entry>>,
+    link: Vec<Link>,
+    total: u32,
+    #[serde(rename = "resourceType")]
+    resource_type: String,
+}
+
+#[derive(Deserialize, Serialize)]
+struct Link {
+    relation: String,
+    url: String,
+}
+
+pub struct ResourceVersion {
+    resource_id: String,
+    version_id: i64
+}
+
+pub struct BTreeMapValue {
+    pk_id: i32, //pg row primary key. Needed for upsert
+    version_id: i64 //resource version_id
+}
+
+pub struct PgInsertItem {
+    resource: String
+}
+
+pub struct PgUpdateItem {
+    id: i32,
+    resource: String
+}
+
+// read id and version_id from a resource if possible
+pub fn get_version(resource: serde_json::Value) -> Option<ResourceVersion> {
+    let resource_id = resource.get("id").and_then(|v| v.as_str()).map(|s| s.to_string());
+    let version_id = resource.get("meta").and_then(|v| v.get("versionId")).and_then(|v| v.as_str()).map(|s| s.parse::<i64>().ok()).flatten();
+
+    if let (Some(resource_id), Some(version_id)) = (resource_id, version_id) {
+        Some(ResourceVersion { version_id, resource_id })
+    } else {
+        None
+    }
+}
+
+// get BTreeMap of all <resource_id, version_id> pairs in pg for the given resource type
+pub async fn get_pg_resource_versions(table_name: &str, pg_con_pool: &PgPool) -> Result<BTreeMap<String, BTreeMapValue>, anyhow::Error> {
+    let query = format!("SELECT id pk_id, resource::text FROM {}", table_name);
+    let rows: Vec<(i32, String)> = sqlx::query_as(&query)
+        .fetch_all(pg_con_pool)
+        .await?;
+
+    let mut pg_versions = BTreeMap::new();
+
+    for row in rows {
+        let pk_id: i32 = row.0;
+        let resource_str: String = row.1;
+        let resource: serde_json::Value = match serde_json::from_str(&resource_str) {
+            Ok(resource) => resource,
+            Err(_) => continue
+        };
+        let res_version = get_version(resource);
+        match res_version {
+            Some(res_version) => {
+                // only if resource_id is found something is inserted into resource_versions
+                pg_versions.insert(res_version.resource_id, BTreeMapValue {pk_id, version_id: res_version.version_id});
+            }
+            None => continue
+        }
+    }
+    Ok(pg_versions)
+}
+
+pub async fn update_helper(pg_con_pool: &PgPool, items: &[PgUpdateItem], table_name: &str) -> Result<(), sqlx::Error> {
+
+    let values: Vec<_> = items
+        .into_iter()
+        .map(|item| format!("({}, $${}$$)", item.id, item.resource))
+        .collect();
+
+    let query = format!(
+        "UPDATE {} SET resource = data.resource::jsonb FROM (VALUES {}) AS data(id, resource) WHERE data.id = {}.id",
+        table_name,
+        values.join(","),
+        table_name
+    );
+
+    sqlx::query(&query)
+        .execute(pg_con_pool)
+        .await?;
+
+    Ok(())
+}
+
+pub async fn insert_helper(pg_con_pool: &PgPool, items: &[PgInsertItem], table_name: &str) -> Result<(), anyhow::Error> {
+
+    let values: Vec<_> = items
+        .into_iter()
+        .map(|item| format!("($${}$$)", item.resource))
+        .collect();
+
+    let query = format!(
+        "INSERT INTO {} (resource) values {};",
+        table_name,
+        values.join(",")
+    );
+
+    sqlx::query(&query)
+        .execute(pg_con_pool)
+        .await?;
+    Ok(())
+}
+
+pub async fn delete_helper(pg_con_pool: &PgPool, items: &[i32], table_name: &str) -> Result<(), anyhow::Error> {
+
+    let values: Vec<_> = items
+        .into_iter()
+        .map(|item| format!("{}", item))
+        .collect();
+
+    let query = format!(
+        "DELETE FROM {} where id in ({});",
+        table_name,
+        values.join(",")
+    );
+
+    sqlx::query(&query)
+        .execute(pg_con_pool)
+        .await?;
+
+    Ok(())
+}
+
+
+/// Synchronizes resources
from Blaze to PostgreSQL +/// +/// pg_versions holds a BTreeMap that is used to look up resource version_ids currently in pg +/// resource-ids that are found in Blaze but not in pg: will be inserted +/// resource-ids that are found in Blaze with a different version_id than in pg: will be updated +/// resource-ids that are found in pg but not in blaze : will be deleted from pg +/// # Parameters +/// +/// * `base_url`: The base URL of the Blaze API. +/// * `pg_client`: A mutable reference to a PostgreSQL client. +/// * `type_arg`: The type of resource to synchronize (e.g. "specimen", "patient", etc.). +/// * `batch_size`: The number of resources to process in each batch. +/// * `page_resource_count`: The number of resources to fetch from Blaze in each page. +/// +/// # Returns +/// +/// A `Result` indicating whether the synchronization was successful. If an error occurs, it will be returned as an `anyhow::Error`. +async fn sync( + base_url: &str, + pg_con_pool: &PgPool, + type_arg: &str, + batch_size: u32, + page_resource_count: u32 + ) -> Result<(), anyhow::Error>{ + + // set pg table name to insert into or update + let table_name = match type_arg.to_lowercase().as_str() { + "specimen" => "specimen", + "patient" => "patients", + "condition" => "conditions", + "observation" => "observations", + _ => { + bail!("Invalid type_arg!"); + } + }; + + // vectors holding batches of items to insert/update + let mut update_batch: Vec = Vec::new(); + let mut insert_batch: Vec = Vec::new(); + + let mut pg_versions = get_pg_resource_versions(table_name, pg_con_pool).await?; + + let client = Client::new(); + let mut url: String = format!("{}{}?_count={}&_history=current", base_url, type_arg, page_resource_count); + let mut update_counter: u32 = 0; + let mut insert_counter: u32 = 0; + info!("Attempting to sync: {}", &type_arg); + loop { + let res = client.get(&url).send().await; + match res { + Ok(res) => { + let res_text = match res.text().await { + Ok(text) => text, + Err(err) => { + error!("Failed to get payload: {}", err); + break; + } + }; + let search_set: Result = serde_json::from_str(&res_text); + match search_set { + Ok(search_set) => { + let entries = match search_set.entry { + Some(entries) => entries, + None => { + warn!("Could not read entries from search set."); + break; + } + }; + for e in entries { + + let blaze_version = get_version(e.resource.clone()); + let blaze_version = match blaze_version { + Some(v) => v, + None => { + warn!("Could not read resource version from Blaze search set entry."); + continue + } + }; + + let resource_str = match serde_json::to_string(&e.resource) { + Ok(s) => s, + Err(_) => { + "{}".to_string() + } + }; + + match pg_versions.get(&blaze_version.resource_id) { + Some(pg_version) => { //Resource is already in pg + if pg_version.version_id < blaze_version.version_id || + pg_version.version_id > blaze_version.version_id + { + //Resource exists in pg but is outdated. 
+ //Add resource for batch update into pg + update_counter += 1; + update_batch.push(PgUpdateItem {id: pg_version.pk_id, resource: resource_str.clone()}); + //Remove all entries from pg_versions that can be found in Blaze + //=> remaining ones need to be deleted from pg + pg_versions.remove(&blaze_version.resource_id); + } + }, + None => { // current resource not (yet) in pg + //Add resource for batch insert into pg + insert_counter += 1; + insert_batch.push(PgInsertItem {resource: resource_str.clone()}); + } + } + + if update_counter > 0 && update_counter % batch_size == 0 { + update_helper(pg_con_pool, &update_batch, table_name).await?; + update_batch.clear(); + + } else if insert_counter > 0 && insert_counter % batch_size == 0 { + insert_helper(pg_con_pool, &insert_batch, table_name).await?; + insert_batch.clear(); + + } + + } + // extract link to next page if exists or break + let next_url = search_set.link.iter() + .find(|link| link.relation == "next") + .map(|link| link.url.clone()); + + if let Some(next_url) = next_url { + url = next_url; + } else { + break; + } + }, + Err(err) => { + error!("Could not deserialize JSON to SearchSet: {}", err); + break; + } + } + }, + //no valid search response from Blaze + Err(err) => { + error!("Failed to get response from Blaze: {}", err); + break; + } + } + } + //insert or update the last remaining resources + if update_batch.len() > 0 { + update_helper(pg_con_pool, &update_batch, table_name).await?; + } + if insert_batch.len() > 0 { + insert_helper(pg_con_pool, &insert_batch, table_name).await?; + } + //remove rows from pg that were not encountered in blaze + let delete_ids: Vec = pg_versions.values().map(|value| value.pk_id).collect(); + if delete_ids.len() > 0 { + delete_helper(pg_con_pool, &delete_ids, table_name).await?; + } + Ok(()) +} + + +//@todo: add check whether to number of blaze resources exactly matches the number of resources in pg after inserts/updates/deletes +//@todo: add main loop that performs sync, sleeps and exits gracefully +//@todo: check blaze and pg connections and retry if connection can't be established +pub async fn main_loop(pg_con_pool: &PgPool, blaze_base_url: &str) -> Result<(), anyhow::Error>{ + let type_args: Vec<&str> = vec!["Specimen", "Patient", "Observation", "Condition"]; + + let page_resource_count = 1000; //the number of resources to return per page by blaze + let batch_size = 10000; //the number of resources to insert or update per batch in PostgreSQL + let table_names: Vec<&str> = vec!["specimen", "patients", "observations", "conditions"]; + + let all_tables_exist = pred_tables_exist(pg_con_pool, &table_names).await?; + if all_tables_exist { + info!("All tables found as expected"); + for type_arg in type_args { + sync(&blaze_base_url, pg_con_pool, type_arg, batch_size, page_resource_count).await?; + } + } else { + create_tables(pg_con_pool).await?; + for type_arg in type_args { + sync(&blaze_base_url, pg_con_pool, type_arg, batch_size, page_resource_count).await?; + } + } + Ok(()) +} + + +#[tokio::main] +async fn main() -> Result<(), anyhow::Error>{ + + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + .init(); + + dotenv().ok(); + + //@todo: make use of clap crate? 
+ let blaze_base_url: String = env::var("BLAZE_BASE_URL").expect("BLAZE_BASE_URL must be set"); + let host_name = env::var("PG_HOST").expect("PG_HOST must be set"); + let user_name = env::var("PG_USERNAME").expect("PG_USERNAME must be set"); + let password = env::var("PG_PASSWORD").expect("PG_PASSWORD must be set"); + let db_name = env::var("PG_DBNAME").expect("PG_DBNAME must be set"); + let port: u16 = 5432; + + info!("fhir2sql started"); //@todo: replace with proper banner + + let pg_url = format!("postgresql://{}:{}@{}:{}/{}", user_name, password, host_name, port, db_name); + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + + let pg_con_pool = PgPoolOptions::new() + .max_connections(10) + .connect(&pg_url) + .await + .map_err(|err| { + error!("Failed to connect to PostgreSQL: {}", err); + anyhow::Error::new(err) + })?; + + + main_loop(&pg_con_pool, &blaze_base_url).await?; + Ok(()) +} From 6094ac78d3e79aa984133f6694b77706f3497e89 Mon Sep 17 00:00:00 2001 From: davidmscholz Date: Wed, 10 Jul 2024 15:59:57 +0200 Subject: [PATCH 02/15] switch to sqlx --- .github/workflows/rust.yml | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 .github/workflows/rust.yml diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml new file mode 100644 index 0000000..5d91baa --- /dev/null +++ b/.github/workflows/rust.yml @@ -0,0 +1,24 @@ +name: Build with rust and docker + +on: + push: + workflow_dispatch: + pull_request: + schedule: + # Fetch new base image updates every night at 1am + - cron: '0 1 * * *' + +jobs: + build-with-samply: + uses: samply/github-workflows/.github/workflows/rust.yml@main + with: + image-prefix: "samply/" + components: '[ "fhir2sql" ]' + #architectures: '[ "amd64", "arm64" ]' + #profile: debug + test-via-script: false + #features: '[ "bbmri", "dktk", "" ]' + push-to: ${{ (github.ref_protected == true || github.event_name == 'workflow_dispatch') && 'dockerhub' || 'ghcr' }} + secrets: + DOCKERHUB_USERNAME: ${{ secrets.DOCKERHUB_USERNAME }} + DOCKERHUB_TOKEN: ${{ secrets.DOCKERHUB_TOKEN }} From f1324e3fc631a0c1f3a44a3f0bbddec1722344de Mon Sep 17 00:00:00 2001 From: davidmscholz Date: Thu, 11 Jul 2024 08:59:22 +0200 Subject: [PATCH 03/15] fix cargo.toml --- Cargo.toml | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a3efa46..3342169 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,14 +13,7 @@ reqwest = { version = "0.11", features = ["json"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"]} -<<<<<<< HEAD -tokio-postgres = "0.7.10" -diesel = { version = "2.2.0", features = ["postgres", "r2d2"] } -======= -sqlx = { version = "0.7", features = [ "runtime-tokio", "postgres"] } -diesel = { version = "2.2.0", features = ["postgres", "r2d2"] } -diesel_migrations = "2.2.0" ->>>>>>> switch_to_sqlx +sqlx = { version = "0.7", features = [ "runtime-tokio", "postgres"] } anyhow = "1.0.58" # Logging From 3bd9d6c178616cb469900e1c23c601e9b659a9f5 Mon Sep 17 00:00:00 2001 From: davidmscholz Date: Thu, 11 Jul 2024 12:47:28 +0200 Subject: [PATCH 04/15] retry connections --- src/db_utils.rs | 57 ++++++++++++++++++++++++++++++++++++++++++++++++- src/main.rs | 32 +++++++++++++-------------- 2 files changed, 71 insertions(+), 18 deletions(-) diff --git a/src/db_utils.rs b/src/db_utils.rs index 782b5d3..86dd2a5 100644 --- a/src/db_utils.rs +++ b/src/db_utils.rs @@ -1,6 +1,61 @@ -use sqlx::{PgPool, Row}; +use 
sqlx::{postgres::PgPoolOptions, PgPool, Row}; use tracing::{error, info}; +use anyhow::anyhow; +use reqwest::Client; +pub async fn get_pg_connection_pool(pg_url: &str, num_attempts: u32) -> Result { + info!("Trying to establish a PostgreSQL connection pool"); + + let mut attempts = 0; + let mut err: Option = None; + + while attempts < num_attempts { + info!("Attempt to connect to PostgreSQL {} of {}", attempts + 1, num_attempts); + match PgPoolOptions::new() + .max_connections(10) + .connect(&pg_url) + .await + { + Ok(pg_con_pool) => { + info!("PostgreSQL connection successfull \u{2705}"); + return Ok(pg_con_pool) + }, + Err(e) => { + error!("Failed to connect to PostgreSQL. Attempt {} of {}: {}", attempts + 1, num_attempts, e); + err = Some(anyhow!(e)); + } + } + attempts += 1; + tokio::time::sleep(std::time::Duration::from_secs(5)).await; //@todo: move param somewhere else? + } + Err(err.unwrap_or_else(|| anyhow!("Failed to connect to PostgreSQL"))) +} + +pub async fn check_blaze_connection(blaze_base_url: &str, num_attempts: u32) -> Result { + info!("Attempting to connect to Blaze"); + + let mut attempts = 0; + let mut err: Option = None; + let client = Client::new(); + + while attempts < num_attempts { + info!("Attempt to connect to Blaze {} of {}", attempts + 1, num_attempts); + match client.get(format!("{}/health", blaze_base_url)).send().await { + Ok(_) => { + info!("Blaze connection successfull \u{2705}"); + return Ok(true) + }, + Err(e) => { + error!("Failed to connect to Blaze. Attempt {} of {}: {}", attempts + 1, num_attempts, e); + err = Some(anyhow!(e)); + } + } + attempts += 1; + tokio::time::sleep(std::time::Duration::from_secs(5)).await; //@todo: move param somewhere else? + } + Err(err.unwrap_or_else(|| anyhow!("Failed to connect to PostgreSQL"))) + +} pub async fn pred_tables_exist(pg_con_pool: &PgPool, table_names: &Vec<&str>) -> Result { info!("Checking whether PostgreSQL tables exist"); diff --git a/src/main.rs b/src/main.rs index 0159891..d700da2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,11 @@ mod db_utils; use db_utils::*; -use sqlx::{postgres::PgPoolOptions, PgPool}; +use tokio::prelude::*; +use tokio::time::{interval, Duration}; +use tokio::signal::unix::{signal, SignalKind}; +use ctrlc; +use sqlx::PgPool; use std::collections::BTreeMap; use anyhow::bail; use reqwest::Client; @@ -12,7 +16,6 @@ use tracing::{info, warn, error}; use tracing_subscriber; use dotenv::dotenv; - #[derive(Deserialize, Serialize)] struct Entry { resource: serde_json::Value, @@ -204,7 +207,8 @@ async fn sync( let mut pg_versions = get_pg_resource_versions(table_name, pg_con_pool).await?; let client = Client::new(); - let mut url: String = format!("{}{}?_count={}&_history=current", base_url, type_arg, page_resource_count); + let mut url: String = format!("{}/fhir/{}?_count={}&_history=current", + base_url, type_arg, page_resource_count); let mut update_counter: u32 = 0; let mut insert_counter: u32 = 0; info!("Attempting to sync: {}", &type_arg); @@ -321,7 +325,6 @@ async fn sync( //@todo: add check whether to number of blaze resources exactly matches the number of resources in pg after inserts/updates/deletes //@todo: add main loop that performs sync, sleeps and exits gracefully -//@todo: check blaze and pg connections and retry if connection can't be established pub async fn main_loop(pg_con_pool: &PgPool, blaze_base_url: &str) -> Result<(), anyhow::Error>{ let type_args: Vec<&str> = vec!["Specimen", "Patient", "Observation", "Condition"]; @@ -329,13 +332,17 @@ pub async fn 
main_loop(pg_con_pool: &PgPool, blaze_base_url: &str) -> Result<(), let batch_size = 10000; //the number of resources to insert or update per batch in PostgreSQL let table_names: Vec<&str> = vec!["specimen", "patients", "observations", "conditions"]; + //check preconditions for sync + //@todo: move param somewhere else? + let blaze_available = check_blaze_connection(blaze_base_url, 10).await?; let all_tables_exist = pred_tables_exist(pg_con_pool, &table_names).await?; - if all_tables_exist { - info!("All tables found as expected"); + + if blaze_available && all_tables_exist { + info!("All tables found as expected \u{2705}"); for type_arg in type_args { sync(&blaze_base_url, pg_con_pool, type_arg, batch_size, page_resource_count).await?; } - } else { + } else if blaze_available && !all_tables_exist { create_tables(pg_con_pool).await?; for type_arg in type_args { sync(&blaze_base_url, pg_con_pool, type_arg, batch_size, page_resource_count).await?; @@ -365,17 +372,8 @@ async fn main() -> Result<(), anyhow::Error>{ info!("fhir2sql started"); //@todo: replace with proper banner let pg_url = format!("postgresql://{}:{}@{}:{}/{}", user_name, password, host_name, port, db_name); - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - - let pg_con_pool = PgPoolOptions::new() - .max_connections(10) - .connect(&pg_url) - .await - .map_err(|err| { - error!("Failed to connect to PostgreSQL: {}", err); - anyhow::Error::new(err) - })?; + let pg_con_pool = get_pg_connection_pool(&pg_url, 10).await?; main_loop(&pg_con_pool, &blaze_base_url).await?; Ok(()) From 3052c30de6f2cd75a5282e101adbb2ee0ef128d9 Mon Sep 17 00:00:00 2001 From: davidmscholz Date: Thu, 11 Jul 2024 15:23:30 +0200 Subject: [PATCH 05/15] add main loop --- Cargo.toml | 3 +- src/db_utils.rs | 2 +- src/graceful_shutdown.rs | 16 ++++++++++ src/main.rs | 65 +++++++++++++++++++++++++++++++++------- 4 files changed, 73 insertions(+), 13 deletions(-) create mode 100644 src/graceful_shutdown.rs diff --git a/Cargo.toml b/Cargo.toml index 3342169..0692427 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,9 +12,10 @@ dotenv = "0.15" reqwest = { version = "0.11", features = ["json"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"]} +tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread", "full", "time", "signal"]} sqlx = { version = "0.7", features = [ "runtime-tokio", "postgres"] } anyhow = "1.0.58" +chrono = "0.4" # Logging tracing = "0.1.37" diff --git a/src/db_utils.rs b/src/db_utils.rs index 86dd2a5..dda372e 100644 --- a/src/db_utils.rs +++ b/src/db_utils.rs @@ -53,7 +53,7 @@ pub async fn check_blaze_connection(blaze_base_url: &str, num_attempts: u32) -> attempts += 1; tokio::time::sleep(std::time::Duration::from_secs(5)).await; //@todo: move param somewhere else? } - Err(err.unwrap_or_else(|| anyhow!("Failed to connect to PostgreSQL"))) + Err(err.unwrap_or_else(|| anyhow!("Failed to connect to Blaze"))) } diff --git a/src/graceful_shutdown.rs b/src/graceful_shutdown.rs new file mode 100644 index 0000000..a416082 --- /dev/null +++ b/src/graceful_shutdown.rs @@ -0,0 +1,16 @@ +use tracing::info; + +pub async fn wait_for_signal() { + use tokio::signal::unix::{signal,SignalKind}; + let mut sigterm = signal(SignalKind::terminate()) + .expect("Unable to register shutdown handler"); + let mut sigint = signal(SignalKind::interrupt()) + .expect("Unable to register shutdown handler"); + let signal = tokio::select! 
{ + _ = sigterm.recv() => "SIGTERM", + _ = sigint.recv() => "SIGINT" + }; + // The following does not print in docker-compose setups but it does when run individually. + // Probably a docker-compose error. + info!("Received signal ({signal}) - shutting down gracefully."); +} diff --git a/src/main.rs b/src/main.rs index d700da2..0dd46cc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,9 @@ mod db_utils; -use db_utils::*; +mod graceful_shutdown; -use tokio::prelude::*; -use tokio::time::{interval, Duration}; -use tokio::signal::unix::{signal, SignalKind}; -use ctrlc; +use db_utils::*; +use tokio::select; +use tokio::time::{interval, Duration, sleep_until, Instant}; use sqlx::PgPool; use std::collections::BTreeMap; use anyhow::bail; @@ -12,9 +11,10 @@ use reqwest::Client; use serde::{Deserialize, Serialize}; use serde_json; use std::env; -use tracing::{info, warn, error}; +use tracing::{error, info, warn}; use tracing_subscriber; use dotenv::dotenv; +use chrono::{NaiveTime, Timelike, Utc}; #[derive(Deserialize, Serialize)] struct Entry { @@ -324,8 +324,7 @@ async fn sync( //@todo: add check whether to number of blaze resources exactly matches the number of resources in pg after inserts/updates/deletes -//@todo: add main loop that performs sync, sleeps and exits gracefully -pub async fn main_loop(pg_con_pool: &PgPool, blaze_base_url: &str) -> Result<(), anyhow::Error>{ +pub async fn run_sync(pg_con_pool: &PgPool, blaze_base_url: &str) -> Result<(), anyhow::Error>{ let type_args: Vec<&str> = vec!["Specimen", "Patient", "Observation", "Condition"]; let page_resource_count = 1000; //the number of resources to return per page by blaze @@ -361,7 +360,7 @@ async fn main() -> Result<(), anyhow::Error>{ dotenv().ok(); - //@todo: make use of clap crate? + //@todo: make use of clap crate? let blaze_base_url: String = env::var("BLAZE_BASE_URL").expect("BLAZE_BASE_URL must be set"); let host_name = env::var("PG_HOST").expect("PG_HOST must be set"); let user_name = env::var("PG_USERNAME").expect("PG_USERNAME must be set"); @@ -372,9 +371,53 @@ async fn main() -> Result<(), anyhow::Error>{ info!("fhir2sql started"); //@todo: replace with proper banner let pg_url = format!("postgresql://{}:{}@{}:{}/{}", user_name, password, host_name, port, db_name); - let pg_con_pool = get_pg_connection_pool(&pg_url, 10).await?; + + info!("Running initial sync"); + match run_sync(&pg_con_pool, &blaze_base_url).await { + Ok(()) => { + info!("Sync run successfull"); + }, + Err(err) => { + error!("Sync run unsuccessfull: {}", err); + } + } + + // main loop + let mut interval = interval(Duration::from_secs(60)); // execute every 1 minute + //@todo: move target_time param somewhere else? + let target_time = match NaiveTime::from_hms_opt(3, 0, 0) { + Some(time) => time, + None => { + error!("Invalid target time"); + return Err(anyhow::Error::msg("Invalid target time")); + } + }; + + loop { + select! 
{ + _ = interval.tick() => { + let now = Utc::now().naive_local().time(); + if now.hour() == target_time.hour() && now.minute() == target_time.minute() { + info!("Syncing at target time"); + match run_sync(&pg_con_pool, &blaze_base_url).await { + Ok(()) => { + info!("Sync run successfull"); + }, + Err(err) => { + error!("Sync run unsuccessfull: {}", err); + } + } + } + } _ = graceful_shutdown::wait_for_signal() => { + break; + } + _ = async { + let instant = Instant::now() + chrono::Duration::seconds(60).to_std().unwrap(); + sleep_until(instant).await; + } => {} + } + } - main_loop(&pg_con_pool, &blaze_base_url).await?; Ok(()) } From 84c608da3b9f2bd87bf2a183abfbe3c238ff4a40 Mon Sep 17 00:00:00 2001 From: davidmscholz Date: Fri, 12 Jul 2024 10:32:15 +0200 Subject: [PATCH 06/15] add entry count comparison after sync --- src/db_utils.rs | 4 +-- src/main.rs | 82 +++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 72 insertions(+), 14 deletions(-) diff --git a/src/db_utils.rs b/src/db_utils.rs index dda372e..0fa275e 100644 --- a/src/db_utils.rs +++ b/src/db_utils.rs @@ -17,7 +17,7 @@ pub async fn get_pg_connection_pool(pg_url: &str, num_attempts: u32) -> Result
<PgPool, anyhow::Error>
{ - info!("PostgreSQL connection successfull \u{2705}"); + info!("PostgreSQL connection successfull"); return Ok(pg_con_pool) }, Err(e) => { @@ -42,7 +42,7 @@ pub async fn check_blaze_connection(blaze_base_url: &str, num_attempts: u32) -> info!("Attempt to connect to Blaze {} of {}", attempts + 1, num_attempts); match client.get(format!("{}/health", blaze_base_url)).send().await { Ok(_) => { - info!("Blaze connection successfull \u{2705}"); + info!("Blaze connection successfull"); return Ok(true) }, Err(e) => { diff --git a/src/main.rs b/src/main.rs index 0dd46cc..3f05949 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,7 @@ mod graceful_shutdown; use db_utils::*; use tokio::select; use tokio::time::{interval, Duration, sleep_until, Instant}; -use sqlx::PgPool; +use sqlx::{PgPool, Row}; use std::collections::BTreeMap; use anyhow::bail; use reqwest::Client; @@ -163,6 +163,46 @@ pub async fn delete_helper(pg_con_pool: &PgPool, items: &[i32], table_name: &str Ok(()) } +// get the number of rows of a given pg table +pub async fn get_num_rows(pg_con_pool: &PgPool, table_name: &str) -> Result { + let query = format!("SELECT COUNT(*) FROM {}", table_name); + let row = sqlx::query(query.as_str()) + .fetch_one(pg_con_pool) + .await?; + let count: i64 = row.try_get(0)?; + Ok(count) +} + +//get the number of resources of a specific type from blaze +pub async fn get_blaze_resource_count(blaze_base_url: &str, type_arg: &str) -> Result { + let client = Client::new(); + let res = client.get(format!("{}/fhir/{}?_count=0", blaze_base_url, type_arg)).send().await; + match res { + Ok(res) => { + let res_text = match res.text().await { + Ok(text) => text, + Err(err) => { + error!("Failed to get payload: {}", err); + "".to_string() + } + }; + let search_set: Result = serde_json::from_str(&res_text); + match search_set { + Ok(search_set) => { + Ok(search_set.total as i64) + } Err(err) => { + error!("Could not deserialize JSON to SearchSet: {}", err); + Err(anyhow::Error::new(err)) + } + } + }, + //no valid search response from Blaze + Err(err) => { + error!("Failed to get response from Blaze: {}", err); + Err(anyhow::Error::new(err)) + } + } +} /// Synchronizes resources from Blaze to PostgreSQL /// @@ -181,8 +221,8 @@ pub async fn delete_helper(pg_con_pool: &PgPool, items: &[i32], table_name: &str /// # Returns /// /// A `Result` indicating whether the synchronization was successful. If an error occurs, it will be returned as an `anyhow::Error`. 
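+/// # Example (illustrative sketch of the per-resource decision; not a doctest)
+///
+/// ```ignore
+/// // For a Blaze entry with resource_id "p1" and version_id 2:
+/// match pg_versions.get("p1") {
+///     // version differs -> queue a batch UPDATE, then drop "p1" from pg_versions
+///     Some(pg) if pg.version_id != 2 => { /* update */ }
+///     // versions match -> nothing to write, but still drop "p1" from pg_versions
+///     Some(_) => { /* skip */ }
+///     // unknown to PostgreSQL -> queue a batch INSERT
+///     None => { /* insert */ }
+/// }
+/// // Whatever is left in pg_versions after the last page exists only in
+/// // PostgreSQL and is deleted by delete_helper at the end of the sync.
+/// ```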
-async fn sync( - base_url: &str, +async fn sync_blaze_2_pg( + blaze_base_url: &str, pg_con_pool: &PgPool, type_arg: &str, batch_size: u32, @@ -208,7 +248,7 @@ async fn sync( let client = Client::new(); let mut url: String = format!("{}/fhir/{}?_count={}&_history=current", - base_url, type_arg, page_resource_count); + blaze_base_url, type_arg, page_resource_count); let mut update_counter: u32 = 0; let mut insert_counter: u32 = 0; info!("Attempting to sync: {}", &type_arg); @@ -263,6 +303,8 @@ async fn sync( //Remove all entries from pg_versions that can be found in Blaze //=> remaining ones need to be deleted from pg pg_versions.remove(&blaze_version.resource_id); + } else { //Blaze and pg versions match + pg_versions.remove(&blaze_version.resource_id); } }, None => { // current resource not (yet) in pg @@ -309,42 +351,57 @@ async fn sync( } //insert or update the last remaining resources if update_batch.len() > 0 { + update_counter += update_batch.len() as u32; update_helper(pg_con_pool, &update_batch, table_name).await?; } if insert_batch.len() > 0 { + insert_counter += insert_batch.len() as u32; insert_helper(pg_con_pool, &insert_batch, table_name).await?; } //remove rows from pg that were not encountered in blaze let delete_ids: Vec = pg_versions.values().map(|value| value.pk_id).collect(); - if delete_ids.len() > 0 { + if delete_ids.len() > 0 { delete_helper(pg_con_pool, &delete_ids, table_name).await?; - } + } + + info!("Updated {} rows", update_counter); + info!("Inserted {} rows", insert_counter); + info!("Deleted {} rows", delete_ids.len() as u32); + + //compare total entry counts between blaze and pg + let row_count = get_num_rows(pg_con_pool, table_name).await?; + let resource_count = get_blaze_resource_count(blaze_base_url, type_arg).await?; + if row_count != resource_count { + warn!("{} entry counts do not match between Blaze and PostgreSQL", type_arg); + } else { + info!("{} entry counts match between Blaze and PostgreSQL \u{2705}", type_arg); + } + Ok(()) } -//@todo: add check whether to number of blaze resources exactly matches the number of resources in pg after inserts/updates/deletes pub async fn run_sync(pg_con_pool: &PgPool, blaze_base_url: &str) -> Result<(), anyhow::Error>{ let type_args: Vec<&str> = vec!["Specimen", "Patient", "Observation", "Condition"]; - let page_resource_count = 1000; //the number of resources to return per page by blaze + let page_resource_count = 5000; //the number of resources to return per page by blaze let batch_size = 10000; //the number of resources to insert or update per batch in PostgreSQL let table_names: Vec<&str> = vec!["specimen", "patients", "observations", "conditions"]; //check preconditions for sync - //@todo: move param somewhere else? + //@todo: move num_attempts param somewhere else? 
let blaze_available = check_blaze_connection(blaze_base_url, 10).await?; let all_tables_exist = pred_tables_exist(pg_con_pool, &table_names).await?; if blaze_available && all_tables_exist { - info!("All tables found as expected \u{2705}"); + info!("All tables found as expected"); for type_arg in type_args { - sync(&blaze_base_url, pg_con_pool, type_arg, batch_size, page_resource_count).await?; + sync_blaze_2_pg(&blaze_base_url, pg_con_pool, type_arg, batch_size, page_resource_count).await?; } } else if blaze_available && !all_tables_exist { create_tables(pg_con_pool).await?; for type_arg in type_args { - sync(&blaze_base_url, pg_con_pool, type_arg, batch_size, page_resource_count).await?; + sync_blaze_2_pg(&blaze_base_url, pg_con_pool, type_arg, batch_size, page_resource_count).await?; } } Ok(()) @@ -384,6 +441,7 @@ async fn main() -> Result<(), anyhow::Error>{ } // main loop + info!("Entering regular sync schedule"); let mut interval = interval(Duration::from_secs(60)); // execute every 1 minute //@todo: move target_time param somewhere else? let target_time = match NaiveTime::from_hms_opt(3, 0, 0) { From 45fced1877a3ed1251ecc07d83952666be2425a0 Mon Sep 17 00:00:00 2001 From: davidmscholz Date: Fri, 12 Jul 2024 12:00:30 +0200 Subject: [PATCH 07/15] first cleanup --- .gitignore | 3 - src/db_utils.rs | 3 + src/main.rs | 246 +++++++++++++++++++----------------------------- src/models.rs | 48 ++++++++++ src/schema.rs | 44 --------- 5 files changed, 146 insertions(+), 198 deletions(-) create mode 100644 src/models.rs delete mode 100644 src/schema.rs diff --git a/.gitignore b/.gitignore index 20dac00..048acb5 100644 --- a/.gitignore +++ b/.gitignore @@ -2,9 +2,6 @@ .env .vscode Cargo.lock -<<<<<<< HEAD -======= .clj-kondo/ .lsp/ Dockerfile_test ->>>>>>> switch_to_sqlx diff --git a/src/db_utils.rs b/src/db_utils.rs index 0fa275e..4e669ff 100644 --- a/src/db_utils.rs +++ b/src/db_utils.rs @@ -31,6 +31,7 @@ pub async fn get_pg_connection_pool(pg_url: &str, num_attempts: u32) -> Result
<PgPool, anyhow::Error> {
 pub async fn check_blaze_connection(blaze_base_url: &str, num_attempts: u32) ->
Result { info!("Attempting to connect to Blaze"); @@ -57,6 +58,8 @@ pub async fn check_blaze_connection(blaze_base_url: &str, num_attempts: u32) -> } + +//function that checks wheter a given list of required tables exist in pg pub async fn pred_tables_exist(pg_con_pool: &PgPool, table_names: &Vec<&str>) -> Result { info!("Checking whether PostgreSQL tables exist"); diff --git a/src/main.rs b/src/main.rs index 3f05949..ee74926 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,14 +1,16 @@ mod db_utils; mod graceful_shutdown; +mod models; use db_utils::*; +use models::*; + use tokio::select; use tokio::time::{interval, Duration, sleep_until, Instant}; use sqlx::{PgPool, Row}; use std::collections::BTreeMap; use anyhow::bail; use reqwest::Client; -use serde::{Deserialize, Serialize}; use serde_json; use std::env; use tracing::{error, info, warn}; @@ -16,52 +18,7 @@ use tracing_subscriber; use dotenv::dotenv; use chrono::{NaiveTime, Timelike, Utc}; -#[derive(Deserialize, Serialize)] -struct Entry { - resource: serde_json::Value, -} - -#[derive(Deserialize, Serialize)] -struct Search { - mode: String, -} - -#[derive(Deserialize, Serialize)] -struct SearchSet { - id: String, - #[serde(rename = "type")] - type_: String, - entry: Option>, - link: Vec, - total: u32, - #[serde(rename = "resourceType")] - resource_type: String, -} - -#[derive(Deserialize, Serialize)] -struct Link { - relation: String, - url: String, -} - -pub struct ResourceVersion { - resource_id: String, - version_id: i64 -} - -pub struct BTreeMapValue { - pk_id: i32, //pg row primary key. Needed for upsert - version_id: i64 //resource version_id -} - -pub struct PgInsertItem { - resource: String -} -pub struct PgUpdateItem { - id: i32, - resource: String -} // read id and version_id from a resource if possible pub fn get_version(resource: serde_json::Value) -> Option { @@ -103,6 +60,7 @@ pub async fn get_pg_resource_versions(table_name: &str, pg_con_pool: &PgPool) -> Ok(pg_versions) } +//do a batch update of pg rows pub async fn update_helper(pg_con_pool: &PgPool, items: &[PgUpdateItem], table_name: &str) -> Result<(), sqlx::Error> { let values: Vec<_> = items @@ -124,6 +82,7 @@ pub async fn update_helper(pg_con_pool: &PgPool, items: &[PgUpdateItem], table_n Ok(()) } +//do a batch insert of pg rows pub async fn insert_helper(pg_con_pool: &PgPool, items: &[PgInsertItem], table_name: &str) -> Result<(), anyhow::Error> { let values: Vec<_> = items @@ -143,6 +102,7 @@ pub async fn insert_helper(pg_con_pool: &PgPool, items: &[PgInsertItem], table_n Ok(()) } +//do a batch delete of pg rows pub async fn delete_helper(pg_con_pool: &PgPool, items: &[i32], table_name: &str) -> Result<(), anyhow::Error> { let values: Vec<_> = items @@ -173,10 +133,9 @@ pub async fn get_num_rows(pg_con_pool: &PgPool, table_name: &str) -> Result Result { +pub async fn get_blaze_search_set(url: &str) -> Result { let client = Client::new(); - let res = client.get(format!("{}/fhir/{}?_count=0", blaze_base_url, type_arg)).send().await; + let res = client.get(url).send().await; match res { Ok(res) => { let res_text = match res.text().await { @@ -189,7 +148,7 @@ pub async fn get_blaze_resource_count(blaze_base_url: &str, type_arg: &str) -> R let search_set: Result = serde_json::from_str(&res_text); match search_set { Ok(search_set) => { - Ok(search_set.total as i64) + Ok(search_set) } Err(err) => { error!("Could not deserialize JSON to SearchSet: {}", err); Err(anyhow::Error::new(err)) @@ -204,6 +163,17 @@ pub async fn get_blaze_resource_count(blaze_base_url: &str, 
type_arg: &str) -> R } } +//get the number of resources of a specific type from blaze +pub async fn get_blaze_resource_count(blaze_base_url: &str, type_arg: &str) -> Result { + + match get_blaze_search_set(&format!("{}/fhir/{}?_count=0", blaze_base_url, type_arg)).await { + Ok(search_set) => { + Ok(search_set.total as i64) + }, + Err(err) => Err(err) + } +} + /// Synchronizes resources from Blaze to PostgreSQL /// /// pg_versions holds a BTreeMap that is used to look up resource version_ids currently in pg @@ -222,13 +192,13 @@ pub async fn get_blaze_resource_count(blaze_base_url: &str, type_arg: &str) -> R /// /// A `Result` indicating whether the synchronization was successful. If an error occurs, it will be returned as an `anyhow::Error`. async fn sync_blaze_2_pg( - blaze_base_url: &str, - pg_con_pool: &PgPool, - type_arg: &str, - batch_size: u32, - page_resource_count: u32 - ) -> Result<(), anyhow::Error>{ - + blaze_base_url: &str, + pg_con_pool: &PgPool, + type_arg: &str, + batch_size: u32, + page_resource_count: u32 +) -> Result<(), anyhow::Error>{ + // set pg table name to insert into or update let table_name = match type_arg.to_lowercase().as_str() { "specimen" => "specimen", @@ -245,108 +215,82 @@ async fn sync_blaze_2_pg( let mut insert_batch: Vec = Vec::new(); let mut pg_versions = get_pg_resource_versions(table_name, pg_con_pool).await?; - - let client = Client::new(); + let mut url: String = format!("{}/fhir/{}?_count={}&_history=current", blaze_base_url, type_arg, page_resource_count); let mut update_counter: u32 = 0; let mut insert_counter: u32 = 0; info!("Attempting to sync: {}", &type_arg); loop { - let res = client.get(&url).send().await; - match res { - Ok(res) => { - let res_text = match res.text().await { - Ok(text) => text, - Err(err) => { - error!("Failed to get payload: {}", err); - break; - } - }; - let search_set: Result = serde_json::from_str(&res_text); - match search_set { - Ok(search_set) => { - let entries = match search_set.entry { - Some(entries) => entries, - None => { - warn!("Could not read entries from search set."); - break; - } - }; - for e in entries { - - let blaze_version = get_version(e.resource.clone()); - let blaze_version = match blaze_version { - Some(v) => v, - None => { - warn!("Could not read resource version from Blaze search set entry."); - continue - } - }; - - let resource_str = match serde_json::to_string(&e.resource) { - Ok(s) => s, - Err(_) => { - "{}".to_string() - } - }; - - match pg_versions.get(&blaze_version.resource_id) { - Some(pg_version) => { //Resource is already in pg - if pg_version.version_id < blaze_version.version_id || - pg_version.version_id > blaze_version.version_id - { - //Resource exists in pg but is outdated. 
- //Add resource for batch update into pg - update_counter += 1; - update_batch.push(PgUpdateItem {id: pg_version.pk_id, resource: resource_str.clone()}); - //Remove all entries from pg_versions that can be found in Blaze - //=> remaining ones need to be deleted from pg - pg_versions.remove(&blaze_version.resource_id); - } else { //Blaze and pg versions match - pg_versions.remove(&blaze_version.resource_id); - } - }, - None => { // current resource not (yet) in pg - //Add resource for batch insert into pg - insert_counter += 1; - insert_batch.push(PgInsertItem {resource: resource_str.clone()}); - } - } - - if update_counter > 0 && update_counter % batch_size == 0 { - update_helper(pg_con_pool, &update_batch, table_name).await?; - update_batch.clear(); - - } else if insert_counter > 0 && insert_counter % batch_size == 0 { - insert_helper(pg_con_pool, &insert_batch, table_name).await?; - insert_batch.clear(); - - } - - } - // extract link to next page if exists or break - let next_url = search_set.link.iter() - .find(|link| link.relation == "next") - .map(|link| link.url.clone()); - - if let Some(next_url) = next_url { - url = next_url; - } else { - break; - } - }, - Err(err) => { - error!("Could not deserialize JSON to SearchSet: {}", err); - break; - } - } - }, - //no valid search response from Blaze - Err(err) => { - error!("Failed to get response from Blaze: {}", err); + let search_set = get_blaze_search_set(&url).await?; + let entries = match search_set.entry { + Some(entries) => entries, + None => { + warn!("Could not read entries from search set."); break; } + }; + for e in entries { + + let blaze_version = get_version(e.resource.clone()); + let blaze_version = match blaze_version { + Some(v) => v, + None => { + warn!("Could not read resource version from Blaze search set entry."); + continue + } + }; + + let resource_str = match serde_json::to_string(&e.resource) { + Ok(s) => s, + Err(_) => { + "{}".to_string() + } + }; + + match pg_versions.get(&blaze_version.resource_id) { + Some(pg_version) => { //Resource is already in pg + if pg_version.version_id < blaze_version.version_id || + pg_version.version_id > blaze_version.version_id + { + //Resource exists in pg but is outdated. 
+                        //Add resource for batch update into pg
+                        update_counter += 1;
+                        update_batch.push(PgUpdateItem {id: pg_version.pk_id, resource: resource_str.clone()});
+                        //Remove all entries from pg_versions that can be found in Blaze
+                        //=> remaining ones need to be deleted from pg
+                        pg_versions.remove(&blaze_version.resource_id);
+                    } else { //Blaze and pg versions match
+                        pg_versions.remove(&blaze_version.resource_id);
+                    }
+                },
+                None => { // current resource not (yet) in pg
+                    //Add resource for batch insert into pg
+                    insert_counter += 1;
+                    insert_batch.push(PgInsertItem {resource: resource_str.clone()});
+                }
+            }
+
+            if update_counter > 0 && update_counter % batch_size == 0 {
+                update_helper(pg_con_pool, &update_batch, table_name).await?;
+                update_batch.clear();
+
+            } else if insert_counter > 0 && insert_counter % batch_size == 0 {
+                insert_helper(pg_con_pool, &insert_batch, table_name).await?;
+                insert_batch.clear();
+
+            }
+
+        }
+        // extract link to next page if exists or break
+        let next_url = search_set.link.iter()
+            .find(|link| link.relation == "next")
+            .map(|link| link.url.clone());
+
+        if let Some(next_url) = next_url {
+            url = next_url;
+        } else {
+            break;
         }
     }
 
     //insert or update the last remaining resources
diff --git a/src/models.rs b/src/models.rs
new file mode 100644
index 0000000..ff0d760
--- /dev/null
+++ b/src/models.rs
@@ -0,0 +1,48 @@
+use serde::{Deserialize, Serialize};
+
+#[derive(Deserialize, Serialize)]
+pub struct Entry {
+    pub resource: serde_json::Value,
+}
+
+#[derive(Deserialize, Serialize)]
+pub struct Search {
+    pub mode: String,
+}
+
+#[derive(Deserialize, Serialize)]
+pub struct SearchSet {
+    pub id: String,
+    #[serde(rename = "type")]
+    pub type_: String,
+    pub entry: Option<Vec<Entry>>,
+    pub link: Vec<Link>,
+    pub total: u32,
+    #[serde(rename = "resourceType")]
+    pub resource_type: String,
+}
+
+#[derive(Deserialize, Serialize)]
+pub struct Link {
+    pub relation: String,
+    pub url: String,
+}
+
+pub struct ResourceVersion {
+    pub resource_id: String,
+    pub version_id: i64
+}
+
+pub struct BTreeMapValue {
+    pub pk_id: i32, //pg row primary key. Needed for upsert
+    pub version_id: i64 //resource version_id
+}
+
+pub struct PgInsertItem {
+    pub resource: String
+}
+
+pub struct PgUpdateItem {
+    pub id: i32,
+    pub resource: String
+}
\ No newline at end of file
diff --git a/src/schema.rs b/src/schema.rs
deleted file mode 100644
index 7d348c1..0000000
--- a/src/schema.rs
+++ /dev/null
@@ -1,44 +0,0 @@
-// @generated automatically by Diesel CLI.
-
-diesel::table! {
-    conditions (id) {
-        id -> Int4,
-        created_at -> Timestamp,
-        last_updated_at -> Timestamp,
-        resource -> Jsonb,
-    }
-}
-
-diesel::table! {
-    observations (id) {
-        id -> Int4,
-        created_at -> Timestamp,
-        last_updated_at -> Timestamp,
-        resource -> Jsonb,
-    }
-}
-
-diesel::table! {
-    patients (id) {
-        id -> Int4,
-        created_at -> Timestamp,
-        last_updated_at -> Timestamp,
-        resource -> Jsonb,
-    }
-}
-
-diesel::table! {
-    specimen (id) {
-        id -> Int4,
-        created_at -> Timestamp,
-        last_updated_at -> Timestamp,
-        resource -> Jsonb,
-    }
-}
-
-diesel::allow_tables_to_appear_in_same_query!(
-    conditions,
-    observations,
-    patients,
-    specimen,
-);

From edf946c812160550db2864ec9d147065638d04ab Mon Sep 17 00:00:00 2001
From: davidmscholz
Date: Fri, 12 Jul 2024 12:40:08 +0200
Subject: [PATCH 08/15] second cleanup

---
 src/db_utils.rs |  2 +-
 src/main.rs     | 86 +++++++++++++++++++++++++++++++------------------
 2 files changed, 56 insertions(+), 32 deletions(-)

diff --git a/src/db_utils.rs b/src/db_utils.rs
index 4e669ff..257fdaa 100644
--- a/src/db_utils.rs
+++ b/src/db_utils.rs
@@ -80,7 +80,7 @@ pub async fn pred_tables_exist(pg_con_pool: &PgPool, table_names: &Vec<&str>) -> Result<bool, anyhow::Error> {
 
     Ok(all_tables_exist)
 }
 
-
+//create necessary tables and triggers in pg if they don't exist yet
 pub async fn create_tables(pg_con_pool: &PgPool) -> Result<(), anyhow::Error> {
     info!("Creating PostgreSQL tables");
 
diff --git a/src/main.rs b/src/main.rs
index ee74926..a9588b0 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -18,7 +18,18 @@ use tracing_subscriber;
 use dotenv::dotenv;
 use chrono::{NaiveTime, Timelike, Utc};
 
-
+pub struct Config {
+    blaze_base_url: String,
+    pg_host: String,
+    pg_username: String,
+    pg_password: String,
+    pg_dbname: String,
+    pg_port: u16,
+    pg_batch_size: u32,
+    blaze_page_resource_count: u32,
+    blaze_num_connection_attempts: u32,
+    target_time: NaiveTime,
+}
 
 // read id and version_id from a resource if possible
 pub fn get_version(resource: serde_json::Value) -> Option<ResourceVersion> {
@@ -325,27 +336,35 @@ async fn sync_blaze_2_pg(
 }
 
-pub async fn run_sync(pg_con_pool: &PgPool, blaze_base_url: &str) -> Result<(), anyhow::Error>{
+pub async fn run_sync(pg_con_pool: &PgPool, config: &Config) -> Result<(), anyhow::Error>{
 
     let type_args: Vec<&str> = vec!["Specimen", "Patient", "Observation", "Condition"];
-    let page_resource_count = 5000; //the number of resources to return per page by blaze
-    let batch_size = 10000; //the number of resources to insert or update per batch in PostgreSQL
     let table_names: Vec<&str> = vec!["specimen", "patients", "observations", "conditions"];
 
     //check preconditions for sync
     //@todo: move num_attempts param somewhere else?
-    let blaze_available = check_blaze_connection(blaze_base_url, 10).await?;
+    let blaze_available = check_blaze_connection(&config.blaze_base_url, 10).await?;
     let all_tables_exist = pred_tables_exist(pg_con_pool, &table_names).await?;
 
     if blaze_available && all_tables_exist {
         info!("All tables found as expected");
         for type_arg in type_args {
-            sync_blaze_2_pg(&blaze_base_url, pg_con_pool, type_arg, batch_size, page_resource_count).await?;
+            sync_blaze_2_pg(
+                &config.blaze_base_url,
+                pg_con_pool,
+                type_arg,
+                config.pg_batch_size,
+                config.blaze_page_resource_count).await?;
         }
     } else if blaze_available && !all_tables_exist {
         create_tables(pg_con_pool).await?;
         for type_arg in type_args {
-            sync_blaze_2_pg(&blaze_base_url, pg_con_pool, type_arg, batch_size, page_resource_count).await?;
+            sync_blaze_2_pg(
+                &config.blaze_base_url,
+                pg_con_pool,
+                type_arg,
+                config.pg_batch_size,
+                config.blaze_page_resource_count).await?;
         }
     }
     Ok(())
@@ -362,20 +381,37 @@ async fn main() -> Result<(), anyhow::Error>{
     dotenv().ok();
 
     //@todo: make use of clap crate?
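Aside: get_version appears throughout this series only as hunk context, so its body is never part of any diff. A minimal sketch of what such a helper can look like — assuming standard FHIR JSON (a top-level `id` plus a stringified `meta.versionId`) and mirroring ResourceVersion from src/models.rs; this is an illustration, not the author's actual implementation:

// Illustrative sketch only — not code from this patch series.
// Assumes FHIR's standard top-level `id` and `meta.versionId` fields.
use serde_json::Value;

pub struct ResourceVersion {
    pub resource_id: String, // FHIR logical id
    pub version_id: i64,     // parsed meta.versionId
}

pub fn get_version(resource: Value) -> Option<ResourceVersion> {
    // FHIR resources carry their logical id at the top level ...
    let resource_id = resource.get("id")?.as_str()?.to_string();
    // ... and their version as the string meta.versionId.
    let version_id = resource
        .get("meta")?
        .get("versionId")?
        .as_str()?
        .parse::<i64>()
        .ok()?;
    Some(ResourceVersion { resource_id, version_id })
}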
- let blaze_base_url: String = env::var("BLAZE_BASE_URL").expect("BLAZE_BASE_URL must be set"); - let host_name = env::var("PG_HOST").expect("PG_HOST must be set"); - let user_name = env::var("PG_USERNAME").expect("PG_USERNAME must be set"); - let password = env::var("PG_PASSWORD").expect("PG_PASSWORD must be set"); - let db_name = env::var("PG_DBNAME").expect("PG_DBNAME must be set"); - let port: u16 = 5432; + let config = Config { + blaze_base_url: env::var("BLAZE_BASE_URL").expect("BLAZE_BASE_URL must be set"), + pg_host: env::var("PG_HOST").expect("PG_HOST must be set"), + pg_username: env::var("PG_USERNAME").expect("PG_USERNAME must be set"), + pg_password: env::var("PG_PASSWORD").expect("PG_PASSWORD must be set"), + pg_dbname: env::var("PG_DBNAME").expect("PG_DBNAME must be set"), + pg_port: 5432, + pg_batch_size: 10000, + blaze_page_resource_count: 5000, + blaze_num_connection_attempts: 20, + target_time: match NaiveTime::from_hms_opt(3, 0, 0) { + Some(time) => time, + None => { + error!("Invalid target time"); + return Err(anyhow::Error::msg("Invalid target time")); + } + } + }; info!("fhir2sql started"); //@todo: replace with proper banner - let pg_url = format!("postgresql://{}:{}@{}:{}/{}", user_name, password, host_name, port, db_name); + let pg_url = format!("postgresql://{}:{}@{}:{}/{}", + config.pg_username, + config.pg_password, + config.pg_host, + config.pg_port, + config.pg_dbname); let pg_con_pool = get_pg_connection_pool(&pg_url, 10).await?; info!("Running initial sync"); - match run_sync(&pg_con_pool, &blaze_base_url).await { + match run_sync(&pg_con_pool, &config).await { Ok(()) => { info!("Sync run successfull"); }, @@ -387,22 +423,14 @@ async fn main() -> Result<(), anyhow::Error>{ // main loop info!("Entering regular sync schedule"); let mut interval = interval(Duration::from_secs(60)); // execute every 1 minute - //@todo: move target_time param somewhere else? - let target_time = match NaiveTime::from_hms_opt(3, 0, 0) { - Some(time) => time, - None => { - error!("Invalid target time"); - return Err(anyhow::Error::msg("Invalid target time")); - } - }; - + loop { select! 
{ - _ = interval.tick() => { + _ = interval.tick() => { let now = Utc::now().naive_local().time(); - if now.hour() == target_time.hour() && now.minute() == target_time.minute() { + if now.hour() == config.target_time.hour() && now.minute() == config.target_time.minute() { info!("Syncing at target time"); - match run_sync(&pg_con_pool, &blaze_base_url).await { + match run_sync(&pg_con_pool, &config).await { Ok(()) => { info!("Sync run successfull"); }, @@ -414,10 +442,6 @@ async fn main() -> Result<(), anyhow::Error>{ } _ = graceful_shutdown::wait_for_signal() => { break; } - _ = async { - let instant = Instant::now() + chrono::Duration::seconds(60).to_std().unwrap(); - sleep_until(instant).await; - } => {} } } From 0ac3ba688462524294ba49bf3e405ea48bb240f8 Mon Sep 17 00:00:00 2001 From: davidmscholz Date: Fri, 12 Jul 2024 12:41:31 +0200 Subject: [PATCH 09/15] remove diesel.toml --- diesel.toml | 6 ------ 1 file changed, 6 deletions(-) delete mode 100644 diesel.toml diff --git a/diesel.toml b/diesel.toml deleted file mode 100644 index de53284..0000000 --- a/diesel.toml +++ /dev/null @@ -1,6 +0,0 @@ -# For documentation on how to configure this file, -# see https://diesel.rs/guides/configuring-diesel-cli - -[print_schema] -file = "src/schema.rs" -custom_type_derives = ["diesel::query_builder::QueryId", "Clone"] From 3a9b4ff90232691f030379618d556a57eab7ff1b Mon Sep 17 00:00:00 2001 From: davidmscholz Date: Fri, 12 Jul 2024 12:42:49 +0200 Subject: [PATCH 10/15] remove diesel migrations --- migrations/.keep | 0 .../down.sql | 6 --- .../up.sql | 36 ---------------- .../2024-06-18-075834_create_tables/down.sql | 4 -- .../2024-06-18-075834_create_tables/up.sql | 43 ------------------- 5 files changed, 89 deletions(-) delete mode 100644 migrations/.keep delete mode 100644 migrations/00000000000000_diesel_initial_setup/down.sql delete mode 100644 migrations/00000000000000_diesel_initial_setup/up.sql delete mode 100644 migrations/2024-06-18-075834_create_tables/down.sql delete mode 100644 migrations/2024-06-18-075834_create_tables/up.sql diff --git a/migrations/.keep b/migrations/.keep deleted file mode 100644 index e69de29..0000000 diff --git a/migrations/00000000000000_diesel_initial_setup/down.sql b/migrations/00000000000000_diesel_initial_setup/down.sql deleted file mode 100644 index a9f5260..0000000 --- a/migrations/00000000000000_diesel_initial_setup/down.sql +++ /dev/null @@ -1,6 +0,0 @@ --- This file was automatically created by Diesel to setup helper functions --- and other internal bookkeeping. This file is safe to edit, any future --- changes will be added to existing projects as new migrations. - -DROP FUNCTION IF EXISTS diesel_manage_updated_at(_tbl regclass); -DROP FUNCTION IF EXISTS diesel_set_updated_at(); diff --git a/migrations/00000000000000_diesel_initial_setup/up.sql b/migrations/00000000000000_diesel_initial_setup/up.sql deleted file mode 100644 index d68895b..0000000 --- a/migrations/00000000000000_diesel_initial_setup/up.sql +++ /dev/null @@ -1,36 +0,0 @@ --- This file was automatically created by Diesel to setup helper functions --- and other internal bookkeeping. This file is safe to edit, any future --- changes will be added to existing projects as new migrations. 
- - - - --- Sets up a trigger for the given table to automatically set a column called --- `updated_at` whenever the row is modified (unless `updated_at` was included --- in the modified columns) --- --- # Example --- --- ```sql --- CREATE TABLE users (id SERIAL PRIMARY KEY, updated_at TIMESTAMP NOT NULL DEFAULT NOW()); --- --- SELECT diesel_manage_updated_at('users'); --- ``` -CREATE OR REPLACE FUNCTION diesel_manage_updated_at(_tbl regclass) RETURNS VOID AS $$ -BEGIN - EXECUTE format('CREATE TRIGGER set_updated_at BEFORE UPDATE ON %s - FOR EACH ROW EXECUTE PROCEDURE diesel_set_updated_at()', _tbl); -END; -$$ LANGUAGE plpgsql; - -CREATE OR REPLACE FUNCTION diesel_set_updated_at() RETURNS trigger AS $$ -BEGIN - IF ( - NEW IS DISTINCT FROM OLD AND - NEW.updated_at IS NOT DISTINCT FROM OLD.updated_at - ) THEN - NEW.updated_at := current_timestamp; - END IF; - RETURN NEW; -END; -$$ LANGUAGE plpgsql; diff --git a/migrations/2024-06-18-075834_create_tables/down.sql b/migrations/2024-06-18-075834_create_tables/down.sql deleted file mode 100644 index 0029e40..0000000 --- a/migrations/2024-06-18-075834_create_tables/down.sql +++ /dev/null @@ -1,4 +0,0 @@ -drop table patients; -drop table specimen; -drop table conditions; -drop function update_last_updated(); diff --git a/migrations/2024-06-18-075834_create_tables/up.sql b/migrations/2024-06-18-075834_create_tables/up.sql deleted file mode 100644 index f960bca..0000000 --- a/migrations/2024-06-18-075834_create_tables/up.sql +++ /dev/null @@ -1,43 +0,0 @@ -CREATE TABLE patients ( - id SERIAL PRIMARY KEY, - created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - entry JSONB NOT NULL -); - -CREATE TABLE specimen ( - id SERIAL PRIMARY KEY, - created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - entry JSONB NOT NULL -); - -CREATE TABLE conditions ( - id SERIAL PRIMARY KEY, - created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - entry JSONB NOT NULL -); - -CREATE OR REPLACE FUNCTION update_last_updated() -RETURNS TRIGGER AS $$ -BEGIN - NEW.last_updated_at = CURRENT_TIMESTAMP; - RETURN NEW; -END; -$$ LANGUAGE plpgsql; - -CREATE TRIGGER update_last_updated_trigger -BEFORE UPDATE ON patients -FOR EACH ROW -EXECUTE PROCEDURE update_last_updated(); - -CREATE TRIGGER update_last_updated_trigger -BEFORE UPDATE ON specimen -FOR EACH ROW -EXECUTE PROCEDURE update_last_updated(); - -CREATE TRIGGER update_last_updated_trigger -BEFORE UPDATE ON conditions -FOR EACH ROW -EXECUTE PROCEDURE update_last_updated(); \ No newline at end of file From a5c064f04648390cedf916c6ecd3e20e1c153f06 Mon Sep 17 00:00:00 2001 From: davidmscholz Date: Fri, 12 Jul 2024 12:47:16 +0200 Subject: [PATCH 11/15] fix blaze_num_connection_attempts param --- src/main.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main.rs b/src/main.rs index a9588b0..deb25d0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -341,9 +341,9 @@ pub async fn run_sync(pg_con_pool: &PgPool, config: &Config) -> Result<(), anyho let table_names: Vec<&str> = vec!["specimen", "patients", "observations", "conditions"]; - //check preconditions for sync - //@todo: move num_attempts param somewhere else? 
- let blaze_available = check_blaze_connection(&config.blaze_base_url, 10).await?; + //check preconditions for sync + let blaze_available = check_blaze_connection(&config.blaze_base_url, + config.blaze_num_connection_attempts).await?; let all_tables_exist = pred_tables_exist(pg_con_pool, &table_names).await?; if blaze_available && all_tables_exist { From af4cc6e6da23f780be172392ba117d5f2ef1ab37 Mon Sep 17 00:00:00 2001 From: davidmscholz Date: Fri, 12 Jul 2024 18:04:10 +0200 Subject: [PATCH 12/15] add readme --- README.md | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..93f294d --- /dev/null +++ b/README.md @@ -0,0 +1,37 @@ +Here is a concise and catchy README.md for your code base: + +**FHIR2SQL** +===================== + +A Rust-based application that synchronizes FHIR (Fast Healthcare Interoperability Resources) data with a PostgreSQL database. + +**Overview** +------------ + +This application connects to a FHIR server, retrieves data, and syncs it with a PostgreSQL database. It uses the `sqlx` library for database interactions and `reqwest` for making HTTP requests to the FHIR server. The application is designed to run continuously, syncing data at regular intervals. + +**Features** +------------ + +* Connects to a PostgreSQL database and creates necessary tables and triggers if they don't exist +* Retrieves FHIR data from a specified server and syncs it with the PostgreSQL database +* Supports regular syncing at a specified interval +* Handles errors and retries connections to the FHIR server and PostgreSQL database +* Supports graceful shutdown on SIGTERM and SIGINT signals + +**Components** +-------------- + +* `main.rs`: The main application entry point +* `db_utils.rs`: Database utility functions for connecting to PostgreSQL and creating tables and triggers +* `models.rs`: Data models for FHIR resources and database interactions +* `graceful_shutdown.rs`: Functions for handling graceful shutdown on SIGTERM and SIGINT signals + +**Getting Started** +------------------- + +To use this application, you'll need to: + +1. Install Rust and the required dependencies +2. Configure the application by setting environment variables for the FHIR server URL, PostgreSQL connection details, and syncing interval +3. 
Run the application using `cargo run`
\ No newline at end of file

From 846819a5169e8a576cbf7dfece873b9313166050 Mon Sep 17 00:00:00 2001
From: davidmscholz
Date: Fri, 12 Jul 2024 18:06:42 +0200
Subject: [PATCH 13/15] fix readme

---
 README.md | 2 --
 1 file changed, 2 deletions(-)

diff --git a/README.md b/README.md
index 93f294d..3361f60 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,3 @@
-Here is a concise and catchy README.md for your code base:
-
 **FHIR2SQL**
 =====================
 

From e3ecf4331b2d60d19eb974a64c62d7526f83f099 Mon Sep 17 00:00:00 2001
From: davidmscholz
Date: Thu, 25 Jul 2024 15:37:52 +0200
Subject: [PATCH 14/15] changes according to review

---
 src/db_utils.rs |  12 ++---
 src/main.rs     | 122 ++++++++++++++++--------------------------------
 src/models.rs   |   2 +-
 3 files changed, 47 insertions(+), 89 deletions(-)

diff --git a/src/db_utils.rs b/src/db_utils.rs
index 257fdaa..3b8bd58 100644
--- a/src/db_utils.rs
+++ b/src/db_utils.rs
@@ -85,7 +85,7 @@ pub async fn create_tables(pg_con_pool: &PgPool) -> Result<(), anyhow::Error> {
     info!("Creating PostgreSQL tables");
 
     let create_tables_queries = vec![
-        "CREATE TABLE IF NOT EXISTS patients (
+        "CREATE TABLE IF NOT EXISTS patient (
         id SERIAL PRIMARY KEY,
         created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
         last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
         resource JSONB NOT NULL
        )",
        "CREATE TABLE IF NOT EXISTS specimen (
         id SERIAL PRIMARY KEY,
         created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
         last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
         resource JSONB NOT NULL
        )",
-       "CREATE TABLE IF NOT EXISTS conditions (
+       "CREATE TABLE IF NOT EXISTS condition (
         id SERIAL PRIMARY KEY,
         created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
         last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
         resource JSONB NOT NULL
        )",
-       "CREATE TABLE IF NOT EXISTS observations (
+       "CREATE TABLE IF NOT EXISTS observation (
         id SERIAL PRIMARY KEY,
         created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
         last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
         resource JSONB NOT NULL
        )",
        "CREATE OR REPLACE FUNCTION update_last_updated()
        RETURNS TRIGGER AS $$
        BEGIN
            NEW.last_updated_at = CURRENT_TIMESTAMP;
            RETURN NEW;
        END;
        $$ LANGUAGE plpgsql;",
        "CREATE TRIGGER update_last_updated_trigger
-       BEFORE UPDATE ON patients
+       BEFORE UPDATE ON patient
        FOR EACH ROW
        EXECUTE PROCEDURE update_last_updated();",
        "CREATE TRIGGER update_last_updated_trigger
        BEFORE UPDATE ON specimen
        FOR EACH ROW
        EXECUTE PROCEDURE update_last_updated();",
        "CREATE TRIGGER update_last_updated_trigger
-       BEFORE UPDATE ON conditions
+       BEFORE UPDATE ON condition
        FOR EACH ROW
        EXECUTE PROCEDURE update_last_updated();",
        "CREATE TRIGGER update_last_updated_trigger
-       BEFORE UPDATE ON observations
+       BEFORE UPDATE ON observation
        FOR EACH ROW
        EXECUTE PROCEDURE update_last_updated();",
     ];
diff --git a/src/main.rs b/src/main.rs
index deb25d0..419f9b7 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -6,10 +6,11 @@ use db_utils::*;
 use models::*;
 
 use tokio::select;
-use tokio::time::{interval, Duration, sleep_until, Instant};
+use tokio::time::{interval, Duration};
 use sqlx::{PgPool, Row};
 use std::collections::BTreeMap;
 use anyhow::bail;
+use anyhow::anyhow;
 use reqwest::Client;
 use serde_json;
 use std::env;
@@ -43,8 +44,8 @@ pub fn get_version(resource: serde_json::Value) -> Option<ResourceVersion> {
     }
 }
 
-// get BTreeMap of all <resource_id, version_id> pairs in pg for the given resource type
-pub async fn get_pg_resource_versions(table_name: &str, pg_con_pool: &PgPool) -> Result<BTreeMap<String, BTreeMapValue>, anyhow::Error> {
+// get BTreeMap of all <resource_id, version_id> pairs in pg for the given resource type
+pub async fn get_pg_resource_versions(table_name: &str, pg_con_pool: &PgPool) -> Result<BTreeMap<String, PgVersion>, anyhow::Error> {
     let query = format!("SELECT id pk_id, resource::text FROM {}", table_name);
     let rows: Vec<(i32, String)> = sqlx::query_as(&query)
         .fetch_all(pg_con_pool)
         .await?;
@@ -58,12 +59,13 @@ pub async fn get_pg_resource_versions(table_name: &str, pg_con_pool: &PgPool) -> Result<BTreeMap<String, BTreeMapValue>, anyhow::Error> {
         let resource: serde_json::Value = match serde_json::from_str(&resource_str) {
             Ok(resource) => resource,
             Err(_) => continue
-        };
+        };
+
         let res_version = get_version(resource);
         match res_version {
             Some(res_version) => {
                 // only if resource_id is found something is inserted into resource_versions
-                pg_versions.insert(res_version.resource_id, BTreeMapValue {pk_id, version_id: res_version.version_id});
+                pg_versions.insert(res_version.resource_id, PgVersion {pk_id, version_id: res_version.version_id});
             }
             None => continue
         }
@@ -144,45 +146,13 @@ pub async fn get_num_rows(pg_con_pool: &PgPool, table_name: &str) -> Result<i64, anyhow::Error> {
 
-pub async fn get_blaze_search_set(url: &str) -> Result<SearchSet, anyhow::Error> {
-    let client = Client::new();
+pub async fn get_blaze_search_set(url: &str, client: &Client) -> Result<SearchSet, anyhow::Error> {
     let res = client.get(url).send().await;
-    match res {
-        Ok(res) => {
-            let res_text = match res.text().await {
-                Ok(text) => text,
-                Err(err) => {
-                    error!("Failed to get payload: {}", err);
-                    "".to_string()
-                }
-            };
-            let search_set: Result<SearchSet, serde_json::Error> = serde_json::from_str(&res_text);
-            match search_set {
-                Ok(search_set) => {
-                    Ok(search_set)
-                } Err(err) => {
-                    error!("Could not deserialize JSON to SearchSet: {}", err);
-                    Err(anyhow::Error::new(err))
-                }
-            }
-        },
-        //no valid search response from Blaze
-        Err(err) => {
-            error!("Failed to get response from Blaze: {}", err);
-            Err(anyhow::Error::new(err))
-        }
-    }
-}
-
-//get the number of resources of a specific type from blaze
-pub async fn get_blaze_resource_count(blaze_base_url: &str, type_arg: &str) -> Result<i64, anyhow::Error> {
-
-    match get_blaze_search_set(&format!("{}/fhir/{}?_count=0", blaze_base_url, type_arg)).await {
-        Ok(search_set) => {
-            Ok(search_set.total as i64)
-        },
-        Err(err) => Err(err)
-    }
+    let res = res.inspect_err(|e| error!("Failed to get response from Blaze: {}", e))?;
+    let res_text = res.text().await.inspect_err(|e| error!("Failed to get payload: {}", e))?;
+    let search_set: SearchSet = serde_json::from_str(&res_text)
+        .inspect_err(|e| error!("Could not deserialize JSON to SearchSet: {}", e))?;
+    Ok(search_set)
 }
 
 /// Synchronizes resources from Blaze to PostgreSQL
 ///
 /// resource-ids that are found in pg but not in blaze : will be deleted from pg
 /// # Parameters
 ///
-/// * `base_url`: The base URL of the Blaze API.
-/// * `pg_client`: A mutable reference to a PostgreSQL client.
+/// * `blaze_base_url`: The base URL of the Blaze API.
+/// * `pg_con_pool`: A PostgreSQL connection pool.
 /// * `type_arg`: The type of resource to synchronize (e.g. "specimen", "patient", etc.).
 /// * `batch_size`: The number of resources to process in each batch.
 /// * `page_resource_count`: The number of resources to fetch from Blaze in each page. 
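The doc comment above pins down the reconciliation rule: equal version_ids are skipped, differing ones become updates, ids unknown to pg become inserts, and whatever is left over in pg at the end is deleted. As a compact restatement of that rule in one pure function — PgVersion mirrors src/models.rs, while SyncAction and classify are names invented here purely for illustration (the patch inlines this logic in sync_blaze_2_pg rather than factoring it out):

// Illustrative sketch of the reconciliation rule — not code from this patch.
use std::collections::BTreeMap;

pub struct PgVersion {
    pub pk_id: i32,      // pg row primary key, needed for the update path
    pub version_id: i64, // resource version_id
}

pub enum SyncAction {
    Insert,
    Update { pk_id: i32 },
    Skip,
}

// Removing matched ids as we go means that whatever remains in pg_versions
// once all Blaze pages are consumed is exactly the set of rows to delete.
pub fn classify(
    pg_versions: &mut BTreeMap<String, PgVersion>,
    resource_id: &str,
    blaze_version_id: i64,
) -> SyncAction {
    match pg_versions.remove(resource_id) {
        // Present in pg with a different version => update in place.
        Some(pg) if pg.version_id != blaze_version_id => SyncAction::Update { pk_id: pg.pk_id },
        // Present with the same version => nothing to do.
        Some(_) => SyncAction::Skip,
        // Unknown to pg => insert.
        None => SyncAction::Insert,
    }
}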
@@ -211,29 +181,22 @@ async fn sync_blaze_2_pg(
 ) -> Result<(), anyhow::Error>{
 
     // set pg table name to insert into or update
-    let table_name = match type_arg.to_lowercase().as_str() {
-        "specimen" => "specimen",
-        "patient" => "patients",
-        "condition" => "conditions",
-        "observation" => "observations",
-        _ => {
-            bail!("Invalid type_arg!");
-        }
-    };
+    let table_name = type_arg.to_lowercase();
+    let client = Client::new();
 
     // vectors holding batches of items to insert/update
     let mut update_batch: Vec<PgUpdateItem> = Vec::new();
     let mut insert_batch: Vec<PgInsertItem> = Vec::new();
 
-    let mut pg_versions = get_pg_resource_versions(table_name, pg_con_pool).await?;
+    let mut pg_versions = get_pg_resource_versions(&table_name, pg_con_pool).await?;
 
     let mut url: String = format!("{}/fhir/{}?_count={}&_history=current", blaze_base_url, type_arg, page_resource_count);
     let mut update_counter: u32 = 0;
     let mut insert_counter: u32 = 0;
-    info!("Attempting to sync: {}", &type_arg);
+    info!("Attempting to sync: {}", type_arg);
     loop {
-        let search_set = get_blaze_search_set(&url).await?;
+        let search_set = get_blaze_search_set(&url, &client).await?;
         let entries = match search_set.entry {
             Some(entries) => entries,
             None => {
@@ -251,13 +214,8 @@ async fn sync_blaze_2_pg(
                 continue
             }
         };
-
-        let resource_str = match serde_json::to_string(&e.resource) {
-            Ok(s) => s,
-            Err(_) => {
-                "{}".to_string()
-            }
-        };
+
+        let resource_str = serde_json::to_string(&e.resource).unwrap_or("{}".to_string());
 
         match pg_versions.get(&blaze_version.resource_id) {
             Some(pg_version) => { //Resource is already in pg
@@ -283,13 +241,12 @@ async fn sync_blaze_2_pg(
             }
 
             if update_counter > 0 && update_counter % batch_size == 0 {
-                update_helper(pg_con_pool, &update_batch, table_name).await?;
+                update_helper(pg_con_pool, &update_batch, &table_name).await?;
                 update_batch.clear();
 
             } else if insert_counter > 0 && insert_counter % batch_size == 0 {
-                insert_helper(pg_con_pool, &insert_batch, table_name).await?;
+                insert_helper(pg_con_pool, &insert_batch, &table_name).await?;
                 insert_batch.clear();
-
             }
 
         }
@@ -307,16 +264,16 @@ async fn sync_blaze_2_pg(
     //insert or update the last remaining resources
     if update_batch.len() > 0 {
         update_counter += update_batch.len() as u32;
-        update_helper(pg_con_pool, &update_batch, table_name).await?;
+        update_helper(pg_con_pool, &update_batch, &table_name).await?;
     }
     if insert_batch.len() > 0 {
         insert_counter += insert_batch.len() as u32;
-        insert_helper(pg_con_pool, &insert_batch, table_name).await?;
+        insert_helper(pg_con_pool, &insert_batch, &table_name).await?;
     }
     //remove rows from pg that were not encountered in blaze
     let delete_ids: Vec<i32> = pg_versions.values().map(|value| value.pk_id).collect();
     if delete_ids.len() > 0 {
-        delete_helper(pg_con_pool, &delete_ids, table_name).await?;
+        delete_helper(pg_con_pool, &delete_ids, &table_name).await?;
     }
 
     info!("Updated {} rows", update_counter);
@@ -324,8 +281,10 @@ async fn sync_blaze_2_pg(
     info!("Deleted {} rows", delete_ids.len() as u32);
 
     //compare total entry counts between blaze and pg
-    let row_count = get_num_rows(pg_con_pool, table_name).await?;
-    let resource_count = get_blaze_resource_count(blaze_base_url, type_arg).await?;
+    let row_count = get_num_rows(pg_con_pool, &table_name).await?;
+    let resource_count = get_blaze_search_set(&format!("{}/fhir/{}?_count=0", blaze_base_url, type_arg), &client)
+        .await
+        .map(|search_set| search_set.total as i64)?;
     if row_count != resource_count {
         warn!("{} entry counts do not match between Blaze and PostgreSQL", type_arg);
     } else {
@@ -336,14 
+295,18 @@ async fn sync_blaze_2_pg( } -pub async fn run_sync(pg_con_pool: &PgPool, config: &Config) -> Result<(), anyhow::Error>{ +pub async fn run_sync(pg_con_pool: &PgPool, config: &Config) -> Result<(), anyhow::Error> { let type_args: Vec<&str> = vec!["Specimen", "Patient", "Observation", "Condition"]; - - let table_names: Vec<&str> = vec!["specimen", "patients", "observations", "conditions"]; + let table_names: Vec<&str> = vec!["specimen", "patient", "observation", "condition"]; //check preconditions for sync let blaze_available = check_blaze_connection(&config.blaze_base_url, config.blaze_num_connection_attempts).await?; + + if !blaze_available { + bail!("Aborting sync run because connection to Blaze could not be established"); + } + let all_tables_exist = pred_tables_exist(pg_con_pool, &table_names).await?; if blaze_available && all_tables_exist { @@ -391,13 +354,8 @@ async fn main() -> Result<(), anyhow::Error>{ pg_batch_size: 10000, blaze_page_resource_count: 5000, blaze_num_connection_attempts: 20, - target_time: match NaiveTime::from_hms_opt(3, 0, 0) { - Some(time) => time, - None => { - error!("Invalid target time"); - return Err(anyhow::Error::msg("Invalid target time")); - } - } + target_time: NaiveTime::from_hms_opt(3, 0, 0) + .ok_or_else(|| anyhow!("Invalid target time"))? }; info!("fhir2sql started"); //@todo: replace with proper banner diff --git a/src/models.rs b/src/models.rs index ff0d760..b2abe3e 100644 --- a/src/models.rs +++ b/src/models.rs @@ -33,7 +33,7 @@ pub struct ResourceVersion { pub version_id: i64 } -pub struct BTreeMapValue { +pub struct PgVersion { pub pk_id: i32, //pg row primary key. Needed for upsert pub version_id: i64 //resource version_id } From d2730262f815b7f2d8e0e14ddec81801a059c654 Mon Sep 17 00:00:00 2001 From: davidmscholz Date: Fri, 26 Jul 2024 12:06:13 +0200 Subject: [PATCH 15/15] better error handling --- src/main.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/main.rs b/src/main.rs index 419f9b7..1bd2ed5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -204,7 +204,15 @@ async fn sync_blaze_2_pg( break; } }; - for e in entries { + for e in entries { + + let resource_str = match serde_json::to_string(&e.resource) { + Ok(resource) => resource, + Err(err) => { + warn!("Could not read resource from search set entry: {}", err); + continue + } + }; let blaze_version = get_version(e.resource.clone()); let blaze_version = match blaze_version { @@ -215,8 +223,6 @@ async fn sync_blaze_2_pg( } }; - let resource_str = serde_json::to_string(&e.resource).unwrap_or("{}".to_string()); - match pg_versions.get(&blaze_version.resource_id) { Some(pg_version) => { //Resource is already in pg if pg_version.version_id < blaze_version.version_id || @@ -358,7 +364,7 @@ async fn main() -> Result<(), anyhow::Error>{ .ok_or_else(|| anyhow!("Invalid target time"))? }; - info!("fhir2sql started"); //@todo: replace with proper banner + info!("🔥2🐘 fhir2sql started"); //@todo: replace with proper banner let pg_url = format!("postgresql://{}:{}@{}:{}/{}", config.pg_username,