From 911dacbe1a2e01e5fb83ed21795160b404ced1a2 Mon Sep 17 00:00:00 2001 From: fernandodeluret <38841537+fernandodeluret@users.noreply.github.com> Date: Mon, 3 Feb 2025 09:16:30 -0300 Subject: [PATCH] Migrate to channel and worker pool for download notifier (#217) * `wip` concurrent download metadata notifier with channels * `chore` Clean debugging changes * `chore` remove duplicated config files and debug trace * Refactor to download metadata publisher * Time metric for metadata josn publish --------- Co-authored-by: Kyle Espinola --- grpc-ingest/src/config.rs | 76 +++++----- grpc-ingest/src/ingester.rs | 169 +++++++++++++++------- grpc-ingest/src/prom.rs | 12 ++ program_transformers/src/bubblegum/mod.rs | 3 +- 4 files changed, 172 insertions(+), 88 deletions(-) diff --git a/grpc-ingest/src/config.rs b/grpc-ingest/src/config.rs index d778d1c14..578e98d7c 100644 --- a/grpc-ingest/src/config.rs +++ b/grpc-ingest/src/config.rs @@ -230,6 +230,44 @@ pub struct ConfigIngester { pub snapshots: ConfigIngestStream, pub accounts: ConfigIngestStream, pub transactions: ConfigIngestStream, + #[serde(default = "ConfigIngester::default_download_metadata_publish")] + pub download_metadata_publish: ConfigDownloadMetadataPublish, +} + +impl ConfigIngester { + pub fn default_download_metadata_publish() -> ConfigDownloadMetadataPublish { + ConfigDownloadMetadataPublish::default() + } +} + +#[derive(Debug, Clone, Deserialize, Default)] +pub struct ConfigDownloadMetadataPublish { + #[serde( + default = "ConfigDownloadMetadataPublish::default_max_concurrency", + deserialize_with = "deserialize_usize_str" + )] + pub max_concurrency: usize, + #[serde(default = "ConfigDownloadMetadataPublish::default_stream_name")] + pub stream_name: String, + #[serde( + default = "ConfigDownloadMetadataPublish::default_stream_maxlen", + deserialize_with = "deserialize_usize_str" + )] + pub stream_maxlen: usize, +} + +impl ConfigDownloadMetadataPublish { + pub fn default_stream_name() -> String { + "METADATA_JSON".to_owned() + } + + pub const fn default_max_concurrency() -> usize { + 10 + } + + pub const fn default_stream_maxlen() -> usize { + 10_000_000 + } } #[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)] @@ -287,11 +325,6 @@ impl ConfigPostgres { #[derive(Debug, Clone, Default, Deserialize)] pub struct ConfigIngesterDownloadMetadata { pub stream: ConfigIngestStream, - #[serde( - default = "ConfigIngesterDownloadMetadata::default_num_threads", - deserialize_with = "deserialize_usize_str" - )] - pub _num_threads: usize, #[serde( default = "ConfigIngesterDownloadMetadata::default_max_attempts", deserialize_with = "deserialize_usize_str" @@ -303,23 +336,6 @@ pub struct ConfigIngesterDownloadMetadata { rename = "request_timeout_ms" )] pub request_timeout: Duration, - #[serde( - default = "ConfigIngesterDownloadMetadata::default_stream_maxlen", - deserialize_with = "deserialize_usize_str" - )] - pub stream_maxlen: usize, - #[serde( - default = "ConfigIngesterDownloadMetadata::default_stream_max_size", - deserialize_with = "deserialize_usize_str" - )] - pub _pipeline_max_size: usize, - #[serde( - default = "ConfigIngesterDownloadMetadata::default_pipeline_max_idle", - deserialize_with = "deserialize_duration_str", - rename = "pipeline_max_idle_ms" - )] - pub _pipeline_max_idle: Duration, - #[serde( default = "ConfigIngesterDownloadMetadata::default_retry_max_delay_ms", deserialize_with = "deserialize_usize_str" @@ -333,22 +349,6 @@ pub struct ConfigIngesterDownloadMetadata { } impl ConfigIngesterDownloadMetadata { - pub const fn default_num_threads() -> usize { - 2 - } - - pub const fn default_pipeline_max_idle() -> Duration { - Duration::from_millis(10) - } - - pub const fn default_stream_max_size() -> usize { - 10 - } - - pub const fn default_stream_maxlen() -> usize { - 10_000_000 - } - pub const fn default_max_attempts() -> usize { 3 } diff --git a/grpc-ingest/src/ingester.rs b/grpc-ingest/src/ingester.rs index c72c456d0..39b06cf02 100644 --- a/grpc-ingest/src/ingester.rs +++ b/grpc-ingest/src/ingester.rs @@ -1,60 +1,124 @@ use { crate::{ - config::{ConfigIngester, REDIS_STREAM_DATA_KEY}, + config::{ConfigDownloadMetadataPublish, ConfigIngester, REDIS_STREAM_DATA_KEY}, postgres::{create_pool as pg_create_pool, report_pgpool}, - prom::redis_xadd_status_inc, + prom::{download_metadata_publish_time, redis_xadd_status_inc}, redis::{AccountHandle, DownloadMetadataJsonHandle, IngestStream, TransactionHandle}, util::create_shutdown, }, das_core::{ - DownloadMetadata, DownloadMetadataInfo, DownloadMetadataJsonRetryConfig, - DownloadMetadataNotifier, + create_download_metadata_notifier, DownloadMetadata, DownloadMetadataInfo, + DownloadMetadataJsonRetryConfig, }, - futures::{future::BoxFuture, stream::StreamExt}, + futures::stream::StreamExt, program_transformers::ProgramTransformer, redis::aio::MultiplexedConnection, std::sync::Arc, - tokio::time::{sleep, Duration}, + tokio::{ + sync::mpsc::{unbounded_channel, UnboundedSender}, + task::{JoinHandle, JoinSet}, + time::{sleep, Duration}, + }, tracing::warn, }; -fn download_metadata_notifier_v2( - connection: MultiplexedConnection, - stream: String, - stream_maxlen: usize, -) -> anyhow::Result { - Ok( - Box::new( - move |info: DownloadMetadataInfo| -> BoxFuture< - 'static, - Result<(), Box>, - > { +pub struct DownloadMetadataPublish { + handle: JoinHandle<()>, + sender: Option>, +} + +impl DownloadMetadataPublish { + pub fn new(handle: JoinHandle<()>, sender: UnboundedSender) -> Self { + Self { + handle, + sender: Some(sender), + } + } + + pub fn take_sender(&mut self) -> Option> { + self.sender.take() + } + + pub async fn stop(self) -> Result<(), tokio::task::JoinError> { + self.handle.await + } +} + +#[derive(Default)] +pub struct DownloadMetadataPublishBuilder { + config: Option, + connection: Option, +} + +impl DownloadMetadataPublishBuilder { + pub fn build() -> DownloadMetadataPublishBuilder { + DownloadMetadataPublishBuilder::default() + } + + pub fn config(mut self, config: ConfigDownloadMetadataPublish) -> Self { + self.config = Some(config); + self + } + + pub fn connection(mut self, connection: MultiplexedConnection) -> Self { + self.connection = Some(connection); + self + } + + pub fn start(self) -> DownloadMetadataPublish { + let config = self.config.expect("Config must be set"); + let connection = self.connection.expect("Connection must be set"); + + let (sender, mut rx) = unbounded_channel::(); + let stream = config.stream_name; + let stream_maxlen = config.stream_maxlen; + let worker_count = config.max_concurrency; + + let handle = tokio::spawn(async move { + let mut tasks = JoinSet::new(); + + while let Some(download_metadata_info) = rx.recv().await { + if tasks.len() >= worker_count { + tasks.join_next().await; + } + let mut connection = connection.clone(); let stream = stream.clone(); - Box::pin(async move { - - let info_bytes = serde_json::to_vec(&info)?; - - let xadd = redis::cmd("XADD") - .arg(&stream) - .arg("MAXLEN") - .arg("~") - .arg(stream_maxlen) - .arg("*") - .arg(REDIS_STREAM_DATA_KEY) - .arg(info_bytes) - .query_async::<_, redis::Value>(&mut connection) - .await; - - let status = xadd.map(|_| ()).map_err(|_| ()); - - redis_xadd_status_inc(&stream, "metadata_notifier",status, 1); - - Ok(()) - }) - }, - ), - ) + let start_time = tokio::time::Instant::now(); + + tasks.spawn(async move { + match serde_json::to_vec(&download_metadata_info) { + Ok(info_bytes) => { + let xadd = redis::cmd("XADD") + .arg(&stream) + .arg("MAXLEN") + .arg("~") + .arg(stream_maxlen) + .arg("*") + .arg(REDIS_STREAM_DATA_KEY) + .arg(info_bytes) + .query_async::<_, redis::Value>(&mut connection) + .await; + + let status = xadd.map(|_| ()).map_err(|_| ()); + + redis_xadd_status_inc(&stream, "metadata_json", status, 1); + let elapsed_time = start_time.elapsed().as_secs_f64(); + + download_metadata_publish_time(elapsed_time); + } + Err(_) => { + tracing::error!("download_metadata_info failed to bytes") + } + } + }); + } + + while tasks.join_next().await.is_some() {} + }); + + DownloadMetadataPublish::new(handle, sender) + } } pub async fn run(config: ConfigIngester) -> anyhow::Result<()> { @@ -62,14 +126,18 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> { let connection = redis_client.get_multiplexed_tokio_connection().await?; let pool = pg_create_pool(config.postgres).await?; - let download_metadata_stream = config.download_metadata.stream.clone(); - let download_metadata_stream_maxlen = config.download_metadata.stream_maxlen; + let mut download_metadata_publish = DownloadMetadataPublishBuilder::build() + .connection(connection.clone()) + .config(config.download_metadata_publish) + .start(); - let download_metadata_notifier = download_metadata_notifier_v2( - connection.clone(), - download_metadata_stream.name.clone(), - download_metadata_stream_maxlen, - )?; + let download_metadata_json_sender = download_metadata_publish + .take_sender() + .expect("Take ownership of sender"); + + let create_download_metadata_sender = download_metadata_json_sender.clone(); + let download_metadata_notifier = + create_download_metadata_notifier(create_download_metadata_sender).await; let program_transformer = Arc::new(ProgramTransformer::new( pool.clone(), @@ -111,7 +179,7 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> { let snapshots = IngestStream::build() .config(config.snapshots) .connection(connection.clone()) - .handler(AccountHandle::new(Arc::clone(&program_transformer))) + .handler(AccountHandle::new(program_transformer)) .start() .await?; @@ -145,6 +213,9 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> { .into_iter() .collect::>()?; + drop(download_metadata_json_sender); + download_metadata_publish.stop().await?; + report.abort(); pool.close().await; diff --git a/grpc-ingest/src/prom.rs b/grpc-ingest/src/prom.rs index b4ff567fe..486a1bb6a 100644 --- a/grpc-ingest/src/prom.rs +++ b/grpc-ingest/src/prom.rs @@ -102,6 +102,11 @@ lazy_static::lazy_static! { Opts::new("bubblegum_tree_corrupt_proofs", "Number of corrupt proofs in the bubblegum tree"), &["tree"] ).unwrap(); + + static ref DOWNLOAD_METADATA_PUBLISH_TIME: HistogramVec = HistogramVec::new( + HistogramOpts::new("download_metadata_publish_time", "Time taken for publish download notification to redis"), + &[] + ).unwrap(); } pub fn run_server(address: SocketAddr) -> anyhow::Result<()> { @@ -132,6 +137,7 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> { register!(BUBBLEGUM_TREE_NOT_FOUND_PROOFS); register!(BUBBLEGUM_TREE_CORRECT_PROOFS); register!(BUBBLEGUM_TREE_CORRUPT_PROOFS); + register!(DOWNLOAD_METADATA_PUBLISH_TIME); VERSION_INFO_METRIC .with_label_values(&[ @@ -267,6 +273,12 @@ pub fn download_metadata_json_task_status_count_inc(status: u16) { .inc(); } +pub fn download_metadata_publish_time(value: f64) { + DOWNLOAD_METADATA_PUBLISH_TIME + .with_label_values(&[]) + .observe(value); +} + #[derive(Debug, Clone, Copy)] pub enum ProgramTransformerTaskStatusKind { Success, diff --git a/program_transformers/src/bubblegum/mod.rs b/program_transformers/src/bubblegum/mod.rs index 03a800858..122818635 100644 --- a/program_transformers/src/bubblegum/mod.rs +++ b/program_transformers/src/bubblegum/mod.rs @@ -71,7 +71,8 @@ where delegate::delegate(parsing_result, bundle, txn, ix_str).await?; } InstructionName::MintV1 | InstructionName::MintToCollectionV1 => { - if let Some(info) = mint_v1::mint_v1(parsing_result, bundle, txn, ix_str).await? { + let mint = mint_v1::mint_v1(parsing_result, bundle, txn, ix_str).await?; + if let Some(info) = mint { download_metadata_notifier(info) .await .map_err(ProgramTransformerError::DownloadMetadataNotify)?;