Develop #3

Merged
merged 16 commits into from
Jul 29, 2024
3 changes: 3 additions & 0 deletions .gitignore
@@ -2,3 +2,6 @@
.env
.vscode
Cargo.lock
.clj-kondo/
.lsp/
Dockerfile_test
6 changes: 3 additions & 3 deletions Cargo.toml
@@ -12,10 +12,10 @@ dotenv = "0.15"
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"]}
tokio-postgres = "0.7.10"
diesel = { version = "2.2.0", features = ["postgres", "r2d2"] }
tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread", "full", "time", "signal"]}
sqlx = { version = "0.7", features = [ "runtime-tokio", "postgres"] }
anyhow = "1.0.58"
chrono = "0.4"

# Logging
tracing = "0.1.37"
35 changes: 35 additions & 0 deletions README.md
@@ -0,0 +1,35 @@
**FHIR2SQL**
=====================

A Rust-based application that synchronizes FHIR (Fast Healthcare Interoperability Resources) data with a PostgreSQL database.

**Overview**
------------

This application connects to a FHIR server, retrieves data, and syncs it with a PostgreSQL database. It uses the `sqlx` library for database interactions and `reqwest` for making HTTP requests to the FHIR server. The application is designed to run continuously, syncing data at regular intervals.

**Features**
------------

* Connects to a PostgreSQL database and creates necessary tables and triggers if they don't exist
* Retrieves FHIR data from a specified server and syncs it with the PostgreSQL database
* Supports regular syncing at a specified interval
* Retries failed connections to the FHIR server and the PostgreSQL database, waiting a fixed interval between attempts
* Supports graceful shutdown on SIGTERM and SIGINT signals

**Components**
--------------

* `main.rs`: The main application entry point
* `db_utils.rs`: Database utility functions for connecting to PostgreSQL and creating tables and triggers
* `models.rs`: Data models for FHIR resources and database interactions
* `graceful_shutdown.rs`: Functions for handling graceful shutdown on SIGTERM and SIGINT signals

**Getting Started**
-------------------

To use this application, you'll need to:

1. Install Rust and the required dependencies
2. Configure the application by setting environment variables for the FHIR server URL, PostgreSQL connection details, and syncing interval
3. Run the application using `cargo run`
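
Step 2 could be implemented roughly as follows. This is a minimal sketch, not the code in this repository: the variable names `FHIR_BASE_URL`, `POSTGRES_URL`, and `SYNC_INTERVAL_SECS`, as well as the `Config` type, are assumptions chosen for illustration. It uses only the standard library.

```rust
use std::time::Duration;

/// Settings the application needs at startup.
/// The struct and its field names are hypothetical.
#[derive(Debug, PartialEq)]
pub struct Config {
    pub fhir_base_url: String,
    pub pg_url: String,
    pub sync_interval: Duration,
}

impl Config {
    /// Build a `Config` from any key -> value lookup, so the parsing logic
    /// can be exercised without touching the process environment.
    pub fn from_lookup(lookup: impl Fn(&str) -> Option<String>) -> Result<Self, String> {
        // Turn a missing variable into a readable error message.
        let required = |key: &str| {
            lookup(key).ok_or_else(|| format!("missing environment variable {key}"))
        };

        let fhir_base_url = required("FHIR_BASE_URL")?;
        let pg_url = required("POSTGRES_URL")?;
        let secs = required("SYNC_INTERVAL_SECS")?
            .parse::<u64>()
            .map_err(|e| format!("SYNC_INTERVAL_SECS must be whole seconds: {e}"))?;

        Ok(Config {
            fhir_base_url,
            pg_url,
            sync_interval: Duration::from_secs(secs),
        })
    }

    /// Read the same (assumed) variables from the real process environment.
    pub fn from_env() -> Result<Self, String> {
        Self::from_lookup(|key| std::env::var(key).ok())
    }
}
```

Calling `Config::from_env()` once at startup and failing fast on an `Err` keeps a misconfigured deployment from reaching the sync loop.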
6 changes: 0 additions & 6 deletions diesel.toml

This file was deleted.

Empty file removed migrations/.keep
Empty file.
6 changes: 0 additions & 6 deletions migrations/00000000000000_diesel_initial_setup/down.sql

This file was deleted.

36 changes: 0 additions & 36 deletions migrations/00000000000000_diesel_initial_setup/up.sql

This file was deleted.

4 changes: 0 additions & 4 deletions migrations/2024-06-18-075834_create_tables/down.sql

This file was deleted.

43 changes: 0 additions & 43 deletions migrations/2024-06-18-075834_create_tables/up.sql

This file was deleted.

144 changes: 144 additions & 0 deletions src/db_utils.rs
@@ -0,0 +1,144 @@
use sqlx::{postgres::PgPoolOptions, PgPool, Row};
use tracing::{error, info};
use anyhow::anyhow;
use reqwest::Client;

pub async fn get_pg_connection_pool(pg_url: &str, num_attempts: u32) -> Result<PgPool, anyhow::Error> {
info!("Trying to establish a PostgreSQL connection pool");

let mut attempts = 0;
let mut err: Option<anyhow::Error> = None;

while attempts < num_attempts {
info!("Attempt to connect to PostgreSQL {} of {}", attempts + 1, num_attempts);
match PgPoolOptions::new()
.max_connections(10)
.connect(&pg_url)
.await
{
Ok(pg_con_pool) => {
info!("PostgreSQL connection successful");
return Ok(pg_con_pool)
},
Err(e) => {
error!("Failed to connect to PostgreSQL. Attempt {} of {}: {}", attempts + 1, num_attempts, e);
err = Some(anyhow!(e));
}
}
attempts += 1;
tokio::time::sleep(std::time::Duration::from_secs(5)).await; //@todo: move param somewhere else?
}
Err(err.unwrap_or_else(|| anyhow!("Failed to connect to PostgreSQL")))
}


pub async fn check_blaze_connection(blaze_base_url: &str, num_attempts: u32) -> Result<bool, anyhow::Error> {
info!("Attempting to connect to Blaze");

let mut attempts = 0;
let mut err: Option<anyhow::Error> = None;
let client = Client::new();

while attempts < num_attempts {
info!("Attempt to connect to Blaze {} of {}", attempts + 1, num_attempts);
match client.get(format!("{}/health", blaze_base_url)).send().await {
Ok(_) => {
info!("Blaze connection successful");
return Ok(true)
},
Err(e) => {
error!("Failed to connect to Blaze. Attempt {} of {}: {}", attempts + 1, num_attempts, e);
err = Some(anyhow!(e));
}
}
attempts += 1;
tokio::time::sleep(std::time::Duration::from_secs(5)).await; //@todo: move param somewhere else?
}
Err(err.unwrap_or_else(|| anyhow!("Failed to connect to Blaze")))

}
Comment on lines +6 to +59
Member

You might want to consider using tryhard for the retries, as we ended up doing in focus as well.

Contributor Author

For focus the goal was to implement exponential backoff, right? That makes sense to me. In this case, a scheduled job is supposed to run that may take up considerable resources on the Bridgehead servers. For the time being I'd prefer to have a very easy-to-understand window of time in which sync_blaze_2_pg is supposed to run.
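
A sketch of the fixed-delay retry discussed in this thread, written with the standard library only so it runs on its own. `retry_fixed` and its signature are hypothetical and not part of this PR; the loops in `db_utils.rs` are async and sleep via `tokio::time::sleep`. With a fixed delay, the worst-case wait is simply `(num_attempts - 1) * delay` plus the time spent in the attempts themselves. This sketch skips the sleep after the final failed attempt, which the loops in the diff do not.

```rust
use std::time::Duration;

/// Run `op` up to `num_attempts` times, pausing `delay` between attempts.
/// Returns the first success, or the last error wrapped in `Some`.
/// `Err(None)` means `num_attempts` was zero and `op` never ran.
/// (Hypothetical helper, blocking variant for illustration.)
pub fn retry_fixed<T, E>(
    num_attempts: u32,
    delay: Duration,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, Option<E>> {
    let mut last_err = None;
    for attempt in 1..=num_attempts {
        match op(attempt) {
            Ok(value) => return Ok(value),
            Err(e) => last_err = Some(e),
        }
        // Only wait if another attempt will follow.
        if attempt < num_attempts {
            std::thread::sleep(delay);
        }
    }
    Err(last_err)
}
```

Both `get_pg_connection_pool` and `check_blaze_connection` follow this shape, so a shared helper along these lines would remove the duplicated loop while keeping the run window predictable.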



//function that checks whether all tables in a given list of required tables exist in pg
pub async fn pred_tables_exist(pg_con_pool: &PgPool, table_names: &Vec<&str>) -> Result<bool, anyhow::Error> {
info!("Checking whether PostgreSQL tables exist");

let table_query: &str = r#"select table_name from information_schema.tables;"#;

let rows = sqlx::query(table_query)
.fetch_all(pg_con_pool)
.await
.map_err(|err| {
error!("Failed to execute query: {}", err);
anyhow::Error::new(err)
})?;

let pg_table_names: Vec<String> = rows.into_iter().map(|row| row.get(0)).collect();
let all_tables_exist = table_names.iter().all(|table_name| pg_table_names.contains(&table_name.to_string()));


Ok(all_tables_exist)
}
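
The membership check in `pred_tables_exist` can be sketched in isolation as below. `missing_tables` is a hypothetical helper, not part of this PR; it assumes the existing table names have already been fetched and returns the names that are absent instead of a single boolean, which would let a caller log exactly which tables need to be created.

```rust
use std::collections::HashSet;

/// Return the entries of `required` that do not appear in `existing`,
/// preserving the order of `required`. (Hypothetical helper.)
pub fn missing_tables<'a>(required: &[&'a str], existing: &[String]) -> Vec<&'a str> {
    // A set gives constant-time lookups instead of scanning the Vec per name.
    let existing: HashSet<&str> = existing.iter().map(String::as_str).collect();
    required
        .iter()
        .copied()
        .filter(|name| !existing.contains(name))
        .collect()
}
```

An empty result corresponds to `all_tables_exist == true` in the function above.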

//create necessary tables and triggers in pg if they don't exist yet
pub async fn create_tables(pg_con_pool: &PgPool) -> Result<(), anyhow::Error> {
info!("Creating PostgreSQL tables");

let create_tables_queries = vec![
"CREATE TABLE IF NOT EXISTS patient (
id SERIAL PRIMARY KEY,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
resource JSONB NOT NULL
)",
"CREATE TABLE IF NOT EXISTS specimen (
id SERIAL PRIMARY KEY,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
resource JSONB NOT NULL
)",
"CREATE TABLE IF NOT EXISTS condition (
id SERIAL PRIMARY KEY,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
resource JSONB NOT NULL
)",
"CREATE TABLE IF NOT EXISTS observation (
id SERIAL PRIMARY KEY,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
resource JSONB NOT NULL
)",
"CREATE OR REPLACE FUNCTION update_last_updated()
RETURNS TRIGGER AS $$
BEGIN
NEW.last_updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;",
"CREATE TRIGGER update_last_updated_trigger
BEFORE UPDATE ON patient
FOR EACH ROW
EXECUTE PROCEDURE update_last_updated();",
"CREATE TRIGGER update_last_updated_trigger
BEFORE UPDATE ON specimen
FOR EACH ROW
EXECUTE PROCEDURE update_last_updated();",
"CREATE TRIGGER update_last_updated_trigger
BEFORE UPDATE ON condition
FOR EACH ROW
EXECUTE PROCEDURE update_last_updated();",
"CREATE TRIGGER update_last_updated_trigger
BEFORE UPDATE ON observation
FOR EACH ROW
EXECUTE PROCEDURE update_last_updated();",
];

for query in create_tables_queries {
sqlx::query(query)
.execute(pg_con_pool)
.await?;
}

Ok(())
}
16 changes: 16 additions & 0 deletions src/graceful_shutdown.rs
@@ -0,0 +1,16 @@
use tracing::info;

pub async fn wait_for_signal() {
use tokio::signal::unix::{signal,SignalKind};
Member

This will only compile on Unix now because of the import. I usually just use tokio::signal::ctrl_c, which sadly does not work in Docker for some reason, so I set STOPSIGNAL sigint in the Dockerfile.

let mut sigterm = signal(SignalKind::terminate())
.expect("Unable to register shutdown handler");
let mut sigint = signal(SignalKind::interrupt())
.expect("Unable to register shutdown handler");
let signal = tokio::select! {
_ = sigterm.recv() => "SIGTERM",
_ = sigint.recv() => "SIGINT"
};
// The following does not print in docker-compose setups but it does when run individually.
// Probably a docker-compose error.
info!("Received signal ({signal}) - shutting down gracefully.");
}