Skip to content

Commit

Permalink
Migrate to channel and worker pool for download notifier (#217)
Browse files Browse the repository at this point in the history
* `wip` concurrent download metadata notifier with channels

* `chore` Clean debugging changes

* `chore` remove duplicated config files and debug trace

* Refactor to download metadata publisher

* Time metric for metadata JSON publish

---------

Co-authored-by: Kyle Espinola <kyle.s.espinola@gmail.com>
  • Loading branch information
2 people authored and Nagaprasadvr committed Feb 4, 2025
1 parent 5aa04bd commit 911dacb
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 88 deletions.
76 changes: 38 additions & 38 deletions grpc-ingest/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,44 @@ pub struct ConfigIngester {
pub snapshots: ConfigIngestStream,
pub accounts: ConfigIngestStream,
pub transactions: ConfigIngestStream,
#[serde(default = "ConfigIngester::default_download_metadata_publish")]
pub download_metadata_publish: ConfigDownloadMetadataPublish,
}

impl ConfigIngester {
pub fn default_download_metadata_publish() -> ConfigDownloadMetadataPublish {
ConfigDownloadMetadataPublish::default()
}
}

/// Configuration for the download-metadata publisher worker pool, which pushes
/// `DownloadMetadataInfo` records onto a Redis stream via `XADD`.
#[derive(Debug, Clone, Deserialize, Default)]
pub struct ConfigDownloadMetadataPublish {
    /// Maximum number of concurrent publish tasks in the worker pool.
    #[serde(
        default = "ConfigDownloadMetadataPublish::default_max_concurrency",
        deserialize_with = "deserialize_usize_str"
    )]
    pub max_concurrency: usize,
    /// Name of the Redis stream to publish to.
    #[serde(default = "ConfigDownloadMetadataPublish::default_stream_name")]
    pub stream_name: String,
    /// Approximate maximum length of the Redis stream (`XADD MAXLEN ~`).
    #[serde(
        default = "ConfigDownloadMetadataPublish::default_stream_maxlen",
        deserialize_with = "deserialize_usize_str"
    )]
    pub stream_maxlen: usize,
}

impl ConfigDownloadMetadataPublish {
    /// Default worker-pool size for concurrent publishes.
    pub const fn default_max_concurrency() -> usize {
        10
    }

    /// Default Redis stream name for published metadata-JSON records.
    pub fn default_stream_name() -> String {
        String::from("METADATA_JSON")
    }

    /// Default approximate cap on the Redis stream length.
    pub const fn default_stream_maxlen() -> usize {
        10_000_000
    }
}

#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)]
Expand Down Expand Up @@ -287,11 +325,6 @@ impl ConfigPostgres {
#[derive(Debug, Clone, Default, Deserialize)]
pub struct ConfigIngesterDownloadMetadata {
pub stream: ConfigIngestStream,
#[serde(
default = "ConfigIngesterDownloadMetadata::default_num_threads",
deserialize_with = "deserialize_usize_str"
)]
pub _num_threads: usize,
#[serde(
default = "ConfigIngesterDownloadMetadata::default_max_attempts",
deserialize_with = "deserialize_usize_str"
Expand All @@ -303,23 +336,6 @@ pub struct ConfigIngesterDownloadMetadata {
rename = "request_timeout_ms"
)]
pub request_timeout: Duration,
#[serde(
default = "ConfigIngesterDownloadMetadata::default_stream_maxlen",
deserialize_with = "deserialize_usize_str"
)]
pub stream_maxlen: usize,
#[serde(
default = "ConfigIngesterDownloadMetadata::default_stream_max_size",
deserialize_with = "deserialize_usize_str"
)]
pub _pipeline_max_size: usize,
#[serde(
default = "ConfigIngesterDownloadMetadata::default_pipeline_max_idle",
deserialize_with = "deserialize_duration_str",
rename = "pipeline_max_idle_ms"
)]
pub _pipeline_max_idle: Duration,

#[serde(
default = "ConfigIngesterDownloadMetadata::default_retry_max_delay_ms",
deserialize_with = "deserialize_usize_str"
Expand All @@ -333,22 +349,6 @@ pub struct ConfigIngesterDownloadMetadata {
}

impl ConfigIngesterDownloadMetadata {
pub const fn default_num_threads() -> usize {
2
}

pub const fn default_pipeline_max_idle() -> Duration {
Duration::from_millis(10)
}

pub const fn default_stream_max_size() -> usize {
10
}

pub const fn default_stream_maxlen() -> usize {
10_000_000
}

pub const fn default_max_attempts() -> usize {
3
}
Expand Down
169 changes: 120 additions & 49 deletions grpc-ingest/src/ingester.rs
Original file line number Diff line number Diff line change
@@ -1,75 +1,143 @@
use {
crate::{
config::{ConfigIngester, REDIS_STREAM_DATA_KEY},
config::{ConfigDownloadMetadataPublish, ConfigIngester, REDIS_STREAM_DATA_KEY},
postgres::{create_pool as pg_create_pool, report_pgpool},
prom::redis_xadd_status_inc,
prom::{download_metadata_publish_time, redis_xadd_status_inc},
redis::{AccountHandle, DownloadMetadataJsonHandle, IngestStream, TransactionHandle},
util::create_shutdown,
},
das_core::{
DownloadMetadata, DownloadMetadataInfo, DownloadMetadataJsonRetryConfig,
DownloadMetadataNotifier,
create_download_metadata_notifier, DownloadMetadata, DownloadMetadataInfo,
DownloadMetadataJsonRetryConfig,
},
futures::{future::BoxFuture, stream::StreamExt},
futures::stream::StreamExt,
program_transformers::ProgramTransformer,
redis::aio::MultiplexedConnection,
std::sync::Arc,
tokio::time::{sleep, Duration},
tokio::{
sync::mpsc::{unbounded_channel, UnboundedSender},
task::{JoinHandle, JoinSet},
time::{sleep, Duration},
},
tracing::warn,
};

fn download_metadata_notifier_v2(
connection: MultiplexedConnection,
stream: String,
stream_maxlen: usize,
) -> anyhow::Result<DownloadMetadataNotifier> {
Ok(
Box::new(
move |info: DownloadMetadataInfo| -> BoxFuture<
'static,
Result<(), Box<dyn std::error::Error + Send + Sync>>,
> {
/// Handle to the spawned download-metadata publisher task together with the
/// channel endpoint used to feed it `DownloadMetadataInfo` records.
pub struct DownloadMetadataPublish {
    handle: JoinHandle<()>,
    sender: Option<UnboundedSender<DownloadMetadataInfo>>,
}

impl DownloadMetadataPublish {
    /// Wrap a spawned publisher task and its input channel.
    pub fn new(handle: JoinHandle<()>, sender: UnboundedSender<DownloadMetadataInfo>) -> Self {
        DownloadMetadataPublish {
            sender: Some(sender),
            handle,
        }
    }

    /// Move the sender out of this handle, leaving `None` behind.
    /// Returns `None` if it was already taken.
    pub fn take_sender(&mut self) -> Option<UnboundedSender<DownloadMetadataInfo>> {
        self.sender.take()
    }

    /// Wait for the publisher task to finish. Callers should drop all senders
    /// first so the task's receive loop can terminate.
    pub async fn stop(self) -> Result<(), tokio::task::JoinError> {
        self.handle.await
    }
}

/// Builder for `DownloadMetadataPublish`; both fields must be set before
/// calling `start()`.
#[derive(Default)]
pub struct DownloadMetadataPublishBuilder {
    // Publish settings (stream name, maxlen, worker count); required.
    config: Option<ConfigDownloadMetadataPublish>,
    // Redis connection used for XADD publishes; required.
    connection: Option<MultiplexedConnection>,
}

impl DownloadMetadataPublishBuilder {
pub fn build() -> DownloadMetadataPublishBuilder {
DownloadMetadataPublishBuilder::default()
}

pub fn config(mut self, config: ConfigDownloadMetadataPublish) -> Self {
self.config = Some(config);
self
}

pub fn connection(mut self, connection: MultiplexedConnection) -> Self {
self.connection = Some(connection);
self
}

pub fn start(self) -> DownloadMetadataPublish {
let config = self.config.expect("Config must be set");
let connection = self.connection.expect("Connection must be set");

let (sender, mut rx) = unbounded_channel::<DownloadMetadataInfo>();
let stream = config.stream_name;
let stream_maxlen = config.stream_maxlen;
let worker_count = config.max_concurrency;

let handle = tokio::spawn(async move {
let mut tasks = JoinSet::new();

while let Some(download_metadata_info) = rx.recv().await {
if tasks.len() >= worker_count {
tasks.join_next().await;
}

let mut connection = connection.clone();
let stream = stream.clone();
Box::pin(async move {

let info_bytes = serde_json::to_vec(&info)?;

let xadd = redis::cmd("XADD")
.arg(&stream)
.arg("MAXLEN")
.arg("~")
.arg(stream_maxlen)
.arg("*")
.arg(REDIS_STREAM_DATA_KEY)
.arg(info_bytes)
.query_async::<_, redis::Value>(&mut connection)
.await;

let status = xadd.map(|_| ()).map_err(|_| ());

redis_xadd_status_inc(&stream, "metadata_notifier",status, 1);

Ok(())
})
},
),
)
let start_time = tokio::time::Instant::now();

tasks.spawn(async move {
match serde_json::to_vec(&download_metadata_info) {
Ok(info_bytes) => {
let xadd = redis::cmd("XADD")
.arg(&stream)
.arg("MAXLEN")
.arg("~")
.arg(stream_maxlen)
.arg("*")
.arg(REDIS_STREAM_DATA_KEY)
.arg(info_bytes)
.query_async::<_, redis::Value>(&mut connection)
.await;

let status = xadd.map(|_| ()).map_err(|_| ());

redis_xadd_status_inc(&stream, "metadata_json", status, 1);
let elapsed_time = start_time.elapsed().as_secs_f64();

download_metadata_publish_time(elapsed_time);
}
Err(_) => {
tracing::error!("download_metadata_info failed to bytes")
}
}
});
}

while tasks.join_next().await.is_some() {}
});

DownloadMetadataPublish::new(handle, sender)
}
}

pub async fn run(config: ConfigIngester) -> anyhow::Result<()> {
let redis_client = redis::Client::open(config.redis)?;
let connection = redis_client.get_multiplexed_tokio_connection().await?;
let pool = pg_create_pool(config.postgres).await?;

let download_metadata_stream = config.download_metadata.stream.clone();
let download_metadata_stream_maxlen = config.download_metadata.stream_maxlen;
let mut download_metadata_publish = DownloadMetadataPublishBuilder::build()
.connection(connection.clone())
.config(config.download_metadata_publish)
.start();

let download_metadata_notifier = download_metadata_notifier_v2(
connection.clone(),
download_metadata_stream.name.clone(),
download_metadata_stream_maxlen,
)?;
let download_metadata_json_sender = download_metadata_publish
.take_sender()
.expect("Take ownership of sender");

let create_download_metadata_sender = download_metadata_json_sender.clone();
let download_metadata_notifier =
create_download_metadata_notifier(create_download_metadata_sender).await;

let program_transformer = Arc::new(ProgramTransformer::new(
pool.clone(),
Expand Down Expand Up @@ -111,7 +179,7 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> {
let snapshots = IngestStream::build()
.config(config.snapshots)
.connection(connection.clone())
.handler(AccountHandle::new(Arc::clone(&program_transformer)))
.handler(AccountHandle::new(program_transformer))
.start()
.await?;

Expand Down Expand Up @@ -145,6 +213,9 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> {
.into_iter()
.collect::<anyhow::Result<()>>()?;

drop(download_metadata_json_sender);
download_metadata_publish.stop().await?;

report.abort();

pool.close().await;
Expand Down
12 changes: 12 additions & 0 deletions grpc-ingest/src/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ lazy_static::lazy_static! {
Opts::new("bubblegum_tree_corrupt_proofs", "Number of corrupt proofs in the bubblegum tree"),
&["tree"]
).unwrap();

static ref DOWNLOAD_METADATA_PUBLISH_TIME: HistogramVec = HistogramVec::new(
HistogramOpts::new("download_metadata_publish_time", "Time taken for publish download notification to redis"),
&[]
).unwrap();
}

pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
Expand Down Expand Up @@ -132,6 +137,7 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
register!(BUBBLEGUM_TREE_NOT_FOUND_PROOFS);
register!(BUBBLEGUM_TREE_CORRECT_PROOFS);
register!(BUBBLEGUM_TREE_CORRUPT_PROOFS);
register!(DOWNLOAD_METADATA_PUBLISH_TIME);

VERSION_INFO_METRIC
.with_label_values(&[
Expand Down Expand Up @@ -267,6 +273,12 @@ pub fn download_metadata_json_task_status_count_inc(status: u16) {
.inc();
}

/// Record one download-metadata publish duration (seconds) in the
/// `download_metadata_publish_time` histogram.
pub fn download_metadata_publish_time(value: f64) {
    let histogram = DOWNLOAD_METADATA_PUBLISH_TIME.with_label_values(&[]);
    histogram.observe(value);
}

#[derive(Debug, Clone, Copy)]
pub enum ProgramTransformerTaskStatusKind {
Success,
Expand Down
3 changes: 2 additions & 1 deletion program_transformers/src/bubblegum/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ where
delegate::delegate(parsing_result, bundle, txn, ix_str).await?;
}
InstructionName::MintV1 | InstructionName::MintToCollectionV1 => {
if let Some(info) = mint_v1::mint_v1(parsing_result, bundle, txn, ix_str).await? {
let mint = mint_v1::mint_v1(parsing_result, bundle, txn, ix_str).await?;
if let Some(info) = mint {
download_metadata_notifier(info)
.await
.map_err(ProgramTransformerError::DownloadMetadataNotify)?;
Expand Down

0 comments on commit 911dacb

Please sign in to comment.