Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notification ttl #227

Merged
merged 12 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion notification-server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion notification-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
authors = ["Concordium AG developers@concordium.com"]
edition = "2021"
name = "notification-server"
version = "0.3.4"
version = "0.3.5"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand All @@ -12,6 +12,7 @@ axum = "0.7"
axum-prometheus = "0.7"
backoff = { version = "0.4", features = ["tokio"] }
bytes = "1.6"
chrono = "0.4.38"
clap = { version = "4.5", features = ["derive", "env"] }
concordium-rust-sdk = { version = "*", path = "../deps/concordium-rust-sdk" }
deadpool-postgres = "0.14"
Expand Down
49 changes: 47 additions & 2 deletions notification-server/src/bin/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use axum_prometheus::{
metrics_exporter_prometheus::PrometheusBuilder,
};
use backoff::{future::retry, ExponentialBackoff};
use chrono::Utc;
use clap::Parser;
use concordium_rust_sdk::{
types::AbsoluteBlockHeight,
Expand Down Expand Up @@ -98,6 +99,14 @@ struct Args {
env = "NOTIFICATION_SERVER_PROMETHEUS_ADDRESS"
)]
listen_address: Option<std::net::SocketAddr>,
#[arg(
long = "notification-ttl-mins",
default_value_t = 60,
help = "This variable defines the maximum allowable time (in minutes) after which a \
notification is no longer being emitted.",
env = "NOTIFICATION_SERVER_NOTIFICATION_TTL_MIN"
)]
notification_ttl_min: u64,
}

const DATABASE_RETRY_DELAY: Duration = Duration::from_secs(1);
Expand Down Expand Up @@ -346,6 +355,37 @@ async fn traverse_chain(
processed_height
}

/// Retrieves the block height from the earliest block which is no earlier than
/// the current time minus the notification TTL. Compares this height with the
/// current block height processed and returns the max function of those two
/// values. If an error occurs while obtaining this value it returns an error.
async fn max_block_height_of_current_and_time_based_lower_bound(
concordium_client: &mut Client,
current_height: AbsoluteBlockHeight,
lower_bound_block_time: Duration,
) -> anyhow::Result<AbsoluteBlockHeight> {
let current_block_height = concordium_client
.get_consensus_info()
.await?
.last_finalized_block_height;
let lower_bound_block_height = AbsoluteBlockHeight {
// We are not faster than 2 sec per block, hence this should be conservative enough
height: current_block_height.height - lower_bound_block_time.as_secs(),
};
let lower_bound_time: chrono::DateTime<Utc> = Utc::now() - lower_bound_block_time;
let time_ago_block = concordium_client
lassemand marked this conversation as resolved.
Show resolved Hide resolved
.find_first_finalized_block_no_earlier_than(lower_bound_block_height.., lower_bound_time)
.await?;

if time_ago_block.block_height > current_height {
lassemand marked this conversation as resolved.
Show resolved Hide resolved
let blocks_skipped_count = time_ago_block.block_height.height - current_height.height;
info!("Skipping {} blocks", blocks_skipped_count);
counter!("block.process_skipped").increment(blocks_skipped_count);
return Ok(time_ago_block.block_height);
}
Ok(current_height)
}

#[tokio::main(flavor = "multi_thread")]
async fn main() -> anyhow::Result<()> {
dotenv().ok();
Expand Down Expand Up @@ -409,12 +449,17 @@ async fn main() -> anyhow::Result<()> {
let gcloud = GoogleCloud::new(http_client, retry_policy, service_account, &project_id);
let database_connection = DatabaseConnection::create(args.db_connection).await?;
let mut concordium_client = Client::new(endpoint).await?;
let mut height = if let Some(height) = database_connection
let mut height = if let Some(current_height) = database_connection
.get_processed_block_height()
.await
.context("Failed to get processed block height")?
{
height
max_block_height_of_current_and_time_based_lower_bound(
&mut concordium_client,
current_height,
Duration::from_secs(args.notification_ttl_min * 60),
)
.await?
} else {
concordium_client
.get_consensus_info()
Expand Down