Skip to content

Commit

Permalink
Type out redis message handlers (#163)
Browse files Browse the repository at this point in the history
  • Loading branch information
kespinola authored Oct 12, 2024
1 parent cde4594 commit 0005b3e
Show file tree
Hide file tree
Showing 3 changed files with 350 additions and 293 deletions.
113 changes: 28 additions & 85 deletions grpc-ingest/src/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use {
config::{ConfigIngester, REDIS_STREAM_DATA_KEY},
postgres::{create_pool as pg_create_pool, report_pgpool},
prom::redis_xack_inc,
redis::{IngestStream, RedisStreamMessage},
redis::{AccountHandle, DownloadMetadataJsonHandle, IngestStream, TransactionHandle},
util::create_shutdown,
},
das_core::{DownloadMetadata, DownloadMetadataInfo, DownloadMetadataNotifier},
futures::{future::BoxFuture, stream::StreamExt},
program_transformers::{AccountInfo, ProgramTransformer, TransactionInfo},
program_transformers::ProgramTransformer,
redis::aio::MultiplexedConnection,
std::sync::Arc,
tokio::time::{sleep, Duration},
Expand Down Expand Up @@ -60,112 +60,55 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> {
let download_metadata_stream = config.download_metadata.stream.clone();
let download_metadata_stream_maxlen = config.download_metadata.stream_maxlen;

let accounts_download_metadata_notifier = download_metadata_notifier_v2(
connection.clone(),
download_metadata_stream.name.clone(),
download_metadata_stream_maxlen,
)?;
let snapshots_download_metadata_notifier = download_metadata_notifier_v2(
connection.clone(),
download_metadata_stream.name.clone(),
download_metadata_stream_maxlen,
)?;
let transactions_download_metadata_notifier = download_metadata_notifier_v2(
let download_metadata_notifier = download_metadata_notifier_v2(
connection.clone(),
download_metadata_stream.name.clone(),
download_metadata_stream_maxlen,
)?;

let pt_accounts = Arc::new(ProgramTransformer::new(
let program_transformer = Arc::new(ProgramTransformer::new(
pool.clone(),
accounts_download_metadata_notifier,
));
let pt_snapshots = Arc::new(ProgramTransformer::new(
pool.clone(),
snapshots_download_metadata_notifier,
));
let pt_transactions = Arc::new(ProgramTransformer::new(
pool.clone(),
transactions_download_metadata_notifier,
download_metadata_notifier,
));
let http_client = reqwest::Client::builder()
.timeout(config.download_metadata.request_timeout)
.build()?;
let download_metadata = Arc::new(DownloadMetadata::new(http_client, pool.clone()));

let download_metadata_stream = IngestStream::build()
let download_metadata = Arc::new(DownloadMetadata::new(http_client, pool.clone()));
let download_metadatas = IngestStream::build()
.config(config.download_metadata.stream.clone())
.connection(connection.clone())
.handler(move |info| {
let download_metadata = Arc::clone(&download_metadata);

Box::pin(async move {
let info = DownloadMetadataInfo::try_parse_msg(info)?;

download_metadata
.handle_download(&info)
.await
.map_err(Into::into)
})
})
.handler(DownloadMetadataJsonHandle::new(Arc::clone(
&download_metadata,
)))
.start()
.await?;
let account_stream = IngestStream::build()
.config(config.accounts.clone())

let accounts = IngestStream::build()
.config(config.accounts)
.connection(connection.clone())
.handler(move |info| {
let pt_accounts = Arc::clone(&pt_accounts);

Box::pin(async move {
let info = AccountInfo::try_parse_msg(info)?;

pt_accounts
.handle_account_update(&info)
.await
.map_err(Into::into)
})
})
.handler(AccountHandle::new(Arc::clone(&program_transformer)))
.start()
.await?;
let transactions_stream = IngestStream::build()
.config(config.transactions.clone())

let transactions = IngestStream::build()
.config(config.transactions)
.connection(connection.clone())
.handler(move |info| {
let pt_transactions = Arc::clone(&pt_transactions);

Box::pin(async move {
let info = TransactionInfo::try_parse_msg(info)?;

pt_transactions
.handle_transaction(&info)
.await
.map_err(Into::into)
})
})
.handler(TransactionHandle::new(Arc::clone(&program_transformer)))
.start()
.await?;
let snapshot_stream = IngestStream::build()
.config(config.snapshots.clone())

let snapshots = IngestStream::build()
.config(config.snapshots)
.connection(connection.clone())
.handler(move |info| {
let pt_snapshots = Arc::clone(&pt_snapshots);

Box::pin(async move {
let info = AccountInfo::try_parse_msg(info)?;

pt_snapshots
.handle_account_update(&info)
.await
.map_err(Into::into)
})
})
.handler(AccountHandle::new(Arc::clone(&program_transformer)))
.start()
.await?;

let mut shutdown = create_shutdown()?;

let report_pool = pool.clone();
let report_handle = tokio::spawn(async move {
let report = tokio::spawn(async move {
let pool = report_pool.clone();
loop {
sleep(Duration::from_millis(100)).await;
Expand All @@ -177,12 +120,12 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> {
warn!("{signal} received, waiting for spawned tasks...");
}

report_handle.abort();
report.abort();

account_stream.stop().await?;
transactions_stream.stop().await?;
download_metadata_stream.stop().await?;
snapshot_stream.stop().await?;
accounts.stop().await?;
transactions.stop().await?;
download_metadatas.stop().await?;
snapshots.stop().await?;

pool.close().await;

Expand Down
12 changes: 12 additions & 0 deletions grpc-ingest/src/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ lazy_static::lazy_static! {
&["stream", "status"]
).unwrap();

static ref REDIS_XREAD_COUNT: IntCounterVec = IntCounterVec::new(
Opts::new("redis_xread_count", "Count of messages seen"),
&["stream"]
).unwrap();

static ref REDIS_XACK_COUNT: IntCounterVec = IntCounterVec::new(
Opts::new("redis_xack_count", "Total number of processed messages"),
&["stream"]
Expand Down Expand Up @@ -72,6 +77,7 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
register!(VERSION_INFO_METRIC);
register!(REDIS_STREAM_LENGTH);
register!(REDIS_XADD_STATUS_COUNT);
register!(REDIS_XREAD_COUNT);
register!(REDIS_XACK_COUNT);
register!(PGPOOL_CONNECTIONS);
register!(PROGRAM_TRANSFORMER_TASKS);
Expand Down Expand Up @@ -141,6 +147,12 @@ pub fn redis_xadd_status_inc(stream: &str, status: Result<(), ()>, delta: usize)
.inc_by(delta as u64);
}

pub fn redis_xread_inc(stream: &str, delta: usize) {
REDIS_XREAD_COUNT
.with_label_values(&[stream])
.inc_by(delta as u64)
}

pub fn redis_xack_inc(stream: &str, delta: usize) {
REDIS_XACK_COUNT
.with_label_values(&[stream])
Expand Down
Loading

0 comments on commit 0005b3e

Please sign in to comment.