Migrate to channel and worker pool for download notifier #217

Merged
76 changes: 38 additions & 38 deletions grpc-ingest/src/config.rs
@@ -230,6 +230,44 @@ pub struct ConfigIngester {
pub snapshots: ConfigIngestStream,
pub accounts: ConfigIngestStream,
pub transactions: ConfigIngestStream,
#[serde(default = "ConfigIngester::default_download_metadata_publish")]
pub download_metadata_publish: ConfigDownloadMetadataPublish,
}

impl ConfigIngester {
pub fn default_download_metadata_publish() -> ConfigDownloadMetadataPublish {
ConfigDownloadMetadataPublish::default()
}
}

#[derive(Debug, Clone, Deserialize, Default)]
pub struct ConfigDownloadMetadataPublish {
#[serde(
default = "ConfigDownloadMetadataPublish::default_max_concurrency",
deserialize_with = "deserialize_usize_str"
)]
pub max_concurrency: usize,
#[serde(default = "ConfigDownloadMetadataPublish::default_stream_name")]
pub stream_name: String,
#[serde(
default = "ConfigDownloadMetadataPublish::default_stream_maxlen",
deserialize_with = "deserialize_usize_str"
)]
pub stream_maxlen: usize,
}

impl ConfigDownloadMetadataPublish {
pub fn default_stream_name() -> String {
"METADATA_JSON".to_owned()
}

pub const fn default_max_concurrency() -> usize {
10
}

pub const fn default_stream_maxlen() -> usize {
10_000_000
}
}

#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)]
@@ -287,11 +325,6 @@ impl ConfigPostgres {
#[derive(Debug, Clone, Default, Deserialize)]
pub struct ConfigIngesterDownloadMetadata {
pub stream: ConfigIngestStream,
#[serde(
default = "ConfigIngesterDownloadMetadata::default_num_threads",
deserialize_with = "deserialize_usize_str"
)]
pub _num_threads: usize,
#[serde(
default = "ConfigIngesterDownloadMetadata::default_max_attempts",
deserialize_with = "deserialize_usize_str"
@@ -303,23 +336,6 @@ pub struct ConfigIngesterDownloadMetadata {
rename = "request_timeout_ms"
)]
pub request_timeout: Duration,
#[serde(
default = "ConfigIngesterDownloadMetadata::default_stream_maxlen",
deserialize_with = "deserialize_usize_str"
)]
pub stream_maxlen: usize,
#[serde(
default = "ConfigIngesterDownloadMetadata::default_stream_max_size",
deserialize_with = "deserialize_usize_str"
)]
pub _pipeline_max_size: usize,
#[serde(
default = "ConfigIngesterDownloadMetadata::default_pipeline_max_idle",
deserialize_with = "deserialize_duration_str",
rename = "pipeline_max_idle_ms"
)]
pub _pipeline_max_idle: Duration,

#[serde(
default = "ConfigIngesterDownloadMetadata::default_retry_max_delay_ms",
deserialize_with = "deserialize_usize_str"
@@ -333,22 +349,6 @@ pub struct ConfigIngesterDownloadMetadata {
}

impl ConfigIngesterDownloadMetadata {
pub const fn default_num_threads() -> usize {
2
}

pub const fn default_pipeline_max_idle() -> Duration {
Duration::from_millis(10)
}

pub const fn default_stream_max_size() -> usize {
10
}

pub const fn default_stream_maxlen() -> usize {
10_000_000
}

pub const fn default_max_attempts() -> usize {
3
}
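For context, the new `download_metadata_publish` section is fully optional: every field carries a serde default (stream `METADATA_JSON`, `max_concurrency` 10, `stream_maxlen` 10,000,000). Below is a minimal, self-contained sketch of that default behavior — not the crate's code; the real struct routes numeric fields through `deserialize_usize_str`, which is replaced by plain `usize` here, and `serde_yaml` is assumed only for illustration.

```rust
// Sketch: how an omitted or empty `download_metadata_publish` block resolves to defaults.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct DownloadMetadataPublish {
    #[serde(default = "default_max_concurrency")]
    max_concurrency: usize,
    #[serde(default = "default_stream_name")]
    stream_name: String,
    #[serde(default = "default_stream_maxlen")]
    stream_maxlen: usize,
}

fn default_max_concurrency() -> usize {
    10
}

fn default_stream_name() -> String {
    "METADATA_JSON".to_owned()
}

fn default_stream_maxlen() -> usize {
    10_000_000
}

fn main() {
    // An empty mapping takes every default.
    let cfg: DownloadMetadataPublish = serde_yaml::from_str("{}").expect("valid config");
    assert_eq!(cfg.stream_name, "METADATA_JSON");
    assert_eq!(cfg.max_concurrency, 10);
    assert_eq!(cfg.stream_maxlen, 10_000_000);

    // Individual fields can still be overridden.
    let cfg: DownloadMetadataPublish =
        serde_yaml::from_str("max_concurrency: 20").expect("valid config");
    assert_eq!(cfg.max_concurrency, 20);
}
```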
169 changes: 120 additions & 49 deletions grpc-ingest/src/ingester.rs
@@ -1,75 +1,143 @@
use {
crate::{
config::{ConfigIngester, REDIS_STREAM_DATA_KEY},
config::{ConfigDownloadMetadataPublish, ConfigIngester, REDIS_STREAM_DATA_KEY},
postgres::{create_pool as pg_create_pool, report_pgpool},
prom::redis_xadd_status_inc,
prom::{download_metadata_publish_time, redis_xadd_status_inc},
redis::{AccountHandle, DownloadMetadataJsonHandle, IngestStream, TransactionHandle},
util::create_shutdown,
},
das_core::{
DownloadMetadata, DownloadMetadataInfo, DownloadMetadataJsonRetryConfig,
DownloadMetadataNotifier,
create_download_metadata_notifier, DownloadMetadata, DownloadMetadataInfo,
DownloadMetadataJsonRetryConfig,
},
futures::{future::BoxFuture, stream::StreamExt},
futures::stream::StreamExt,
program_transformers::ProgramTransformer,
redis::aio::MultiplexedConnection,
std::sync::Arc,
tokio::time::{sleep, Duration},
tokio::{
sync::mpsc::{unbounded_channel, UnboundedSender},
task::{JoinHandle, JoinSet},
time::{sleep, Duration},
},
tracing::warn,
};

fn download_metadata_notifier_v2(
connection: MultiplexedConnection,
stream: String,
stream_maxlen: usize,
) -> anyhow::Result<DownloadMetadataNotifier> {
Ok(
Box::new(
move |info: DownloadMetadataInfo| -> BoxFuture<
'static,
Result<(), Box<dyn std::error::Error + Send + Sync>>,
> {
pub struct DownloadMetadataPublish {
handle: JoinHandle<()>,
sender: Option<UnboundedSender<DownloadMetadataInfo>>,
}

impl DownloadMetadataPublish {
pub fn new(handle: JoinHandle<()>, sender: UnboundedSender<DownloadMetadataInfo>) -> Self {
Self {
handle,
sender: Some(sender),
}
}

pub fn take_sender(&mut self) -> Option<UnboundedSender<DownloadMetadataInfo>> {
self.sender.take()
}

pub async fn stop(self) -> Result<(), tokio::task::JoinError> {
self.handle.await
}
}

#[derive(Default)]
pub struct DownloadMetadataPublishBuilder {
config: Option<ConfigDownloadMetadataPublish>,
connection: Option<MultiplexedConnection>,
}

impl DownloadMetadataPublishBuilder {
pub fn build() -> DownloadMetadataPublishBuilder {
DownloadMetadataPublishBuilder::default()
}

pub fn config(mut self, config: ConfigDownloadMetadataPublish) -> Self {
self.config = Some(config);
self
}

pub fn connection(mut self, connection: MultiplexedConnection) -> Self {
self.connection = Some(connection);
self
}

pub fn start(self) -> DownloadMetadataPublish {
let config = self.config.expect("Config must be set");
let connection = self.connection.expect("Connection must be set");

let (sender, mut rx) = unbounded_channel::<DownloadMetadataInfo>();
let stream = config.stream_name;
let stream_maxlen = config.stream_maxlen;
let worker_count = config.max_concurrency;

let handle = tokio::spawn(async move {
let mut tasks = JoinSet::new();

while let Some(download_metadata_info) = rx.recv().await {
if tasks.len() >= worker_count {
tasks.join_next().await;
}

let mut connection = connection.clone();
let stream = stream.clone();
Box::pin(async move {

let info_bytes = serde_json::to_vec(&info)?;

let xadd = redis::cmd("XADD")
.arg(&stream)
.arg("MAXLEN")
.arg("~")
.arg(stream_maxlen)
.arg("*")
.arg(REDIS_STREAM_DATA_KEY)
.arg(info_bytes)
.query_async::<_, redis::Value>(&mut connection)
.await;

let status = xadd.map(|_| ()).map_err(|_| ());

redis_xadd_status_inc(&stream, "metadata_notifier", status, 1);

Ok(())
})
},
),
)
let start_time = tokio::time::Instant::now();

tasks.spawn(async move {
match serde_json::to_vec(&download_metadata_info) {
Ok(info_bytes) => {
let xadd = redis::cmd("XADD")
.arg(&stream)
.arg("MAXLEN")
.arg("~")
.arg(stream_maxlen)
.arg("*")
.arg(REDIS_STREAM_DATA_KEY)
.arg(info_bytes)
.query_async::<_, redis::Value>(&mut connection)
.await;

let status = xadd.map(|_| ()).map_err(|_| ());

redis_xadd_status_inc(&stream, "metadata_json", status, 1);
let elapsed_time = start_time.elapsed().as_secs_f64();

download_metadata_publish_time(elapsed_time);
}
Err(_) => {
tracing::error!("failed to serialize download_metadata_info to bytes")
}
}
});
}

while tasks.join_next().await.is_some() {}
});

DownloadMetadataPublish::new(handle, sender)
}
}

pub async fn run(config: ConfigIngester) -> anyhow::Result<()> {
let redis_client = redis::Client::open(config.redis)?;
let connection = redis_client.get_multiplexed_tokio_connection().await?;
let pool = pg_create_pool(config.postgres).await?;

let download_metadata_stream = config.download_metadata.stream.clone();
let download_metadata_stream_maxlen = config.download_metadata.stream_maxlen;
let mut download_metadata_publish = DownloadMetadataPublishBuilder::build()
.connection(connection.clone())
.config(config.download_metadata_publish)
.start();

let download_metadata_notifier = download_metadata_notifier_v2(
connection.clone(),
download_metadata_stream.name.clone(),
download_metadata_stream_maxlen,
)?;
let download_metadata_json_sender = download_metadata_publish
.take_sender()
.expect("Take ownership of sender");

let create_download_metadata_sender = download_metadata_json_sender.clone();
let download_metadata_notifier =
create_download_metadata_notifier(create_download_metadata_sender).await;

let program_transformer = Arc::new(ProgramTransformer::new(
pool.clone(),
@@ -111,7 +179,7 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> {
let snapshots = IngestStream::build()
.config(config.snapshots)
.connection(connection.clone())
.handler(AccountHandle::new(Arc::clone(&program_transformer)))
.handler(AccountHandle::new(program_transformer))
.start()
.await?;

@@ -145,6 +213,9 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> {
.into_iter()
.collect::<anyhow::Result<()>>()?;

drop(download_metadata_json_sender);
download_metadata_publish.stop().await?;

report.abort();

pool.close().await;
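The core of this change is the channel + worker-pool pattern in `DownloadMetadataPublishBuilder::start`: an unbounded mpsc channel feeds a `JoinSet` whose in-flight tasks are capped at `max_concurrency` by awaiting `join_next()` before each spawn, and shutdown works by dropping the sender so the receive loop ends, remaining tasks are drained, and the spawned task's `JoinHandle` resolves. A standalone sketch of the same pattern (not the PR code; the XADD publish is replaced by a stand-in sleep):

```rust
// Sketch: bounded worker pool over an unbounded channel, with graceful shutdown.
use tokio::{sync::mpsc::unbounded_channel, task::JoinSet};

#[tokio::main]
async fn main() {
    let worker_count = 4;
    let (tx, mut rx) = unbounded_channel::<u64>();

    let handle = tokio::spawn(async move {
        let mut tasks = JoinSet::new();

        while let Some(item) = rx.recv().await {
            // Bound concurrency: once the pool is full, wait for one task to finish.
            if tasks.len() >= worker_count {
                tasks.join_next().await;
            }
            tasks.spawn(async move {
                // Stand-in for the Redis XADD publish.
                tokio::time::sleep(std::time::Duration::from_millis(item)).await;
            });
        }

        // Channel closed: drain whatever is still running.
        while tasks.join_next().await.is_some() {}
    });

    for i in 0..20 {
        tx.send(i).expect("receiver is still alive");
    }

    // Mirrors the shutdown order in `run()`: drop the sender, then await the handle.
    drop(tx);
    handle.await.expect("publish worker panicked");
}
```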
12 changes: 12 additions & 0 deletions grpc-ingest/src/prom.rs
@@ -102,6 +102,11 @@ lazy_static::lazy_static! {
Opts::new("bubblegum_tree_corrupt_proofs", "Number of corrupt proofs in the bubblegum tree"),
&["tree"]
).unwrap();

static ref DOWNLOAD_METADATA_PUBLISH_TIME: HistogramVec = HistogramVec::new(
HistogramOpts::new("download_metadata_publish_time", "Time taken to publish a download notification to Redis"),
&[]
).unwrap();
}

pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
@@ -132,6 +137,7 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
register!(BUBBLEGUM_TREE_NOT_FOUND_PROOFS);
register!(BUBBLEGUM_TREE_CORRECT_PROOFS);
register!(BUBBLEGUM_TREE_CORRUPT_PROOFS);
register!(DOWNLOAD_METADATA_PUBLISH_TIME);

VERSION_INFO_METRIC
.with_label_values(&[
@@ -267,6 +273,12 @@ pub fn download_metadata_json_task_status_count_inc(status: u16) {
.inc();
}

pub fn download_metadata_publish_time(value: f64) {
DOWNLOAD_METADATA_PUBLISH_TIME
.with_label_values(&[])
.observe(value);
}

#[derive(Debug, Clone, Copy)]
pub enum ProgramTransformerTaskStatusKind {
Success,
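The new `DOWNLOAD_METADATA_PUBLISH_TIME` histogram has no labels and is observed in seconds from `tokio::time::Instant::elapsed()`. A minimal sketch of the same label-less histogram outside the PR's `prom.rs`, using the `prometheus` crate directly and its default bucket boundaries (the PR does not override them); the registry and text encoding stand in for the existing `/metrics` server:

```rust
// Sketch: label-less latency histogram, observed in seconds and rendered as text.
use prometheus::{Encoder, HistogramOpts, HistogramVec, Registry, TextEncoder};

fn main() {
    let publish_time = HistogramVec::new(
        HistogramOpts::new(
            "download_metadata_publish_time",
            "Time taken to publish a download notification to Redis",
        ),
        &[],
    )
    .expect("valid histogram options");

    let registry = Registry::new();
    registry
        .register(Box::new(publish_time.clone()))
        .expect("metric registers once");

    // Equivalent of calling download_metadata_publish_time(elapsed) in prom.rs.
    let start = std::time::Instant::now();
    std::thread::sleep(std::time::Duration::from_millis(5));
    publish_time
        .with_label_values(&[])
        .observe(start.elapsed().as_secs_f64());

    // Render the Prometheus text exposition format, as a /metrics endpoint would.
    let mut buffer = Vec::new();
    TextEncoder::new()
        .encode(&registry.gather(), &mut buffer)
        .expect("encode metrics");
    println!("{}", String::from_utf8(buffer).expect("utf8 metrics"));
}
```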
3 changes: 2 additions & 1 deletion program_transformers/src/bubblegum/mod.rs
@@ -71,7 +71,8 @@ where
delegate::delegate(parsing_result, bundle, txn, ix_str).await?;
}
InstructionName::MintV1 | InstructionName::MintToCollectionV1 => {
if let Some(info) = mint_v1::mint_v1(parsing_result, bundle, txn, ix_str).await? {
let mint = mint_v1::mint_v1(parsing_result, bundle, txn, ix_str).await?;
if let Some(info) = mint {
download_metadata_notifier(info)
.await
.map_err(ProgramTransformerError::DownloadMetadataNotify)?;