Skip to content

Commit

Permalink
zmq parse and insert tx in cache
Browse files Browse the repository at this point in the history
  • Loading branch information
RCasatta committed Sep 22, 2024
1 parent ea7816e commit bd06b79
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 13 deletions.
66 changes: 66 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ qr_code = { version = "2.0.0", features = ["bmp"] }
base64 = "0.21.0"
percent-encoding = "2.2.0"
bitcoin-private = "0.1.0"
bitcoin_slices = { version = "0.6.2", features = ["bitcoin", "slice_cache"] }
bitcoin_slices = { version = "0.6.2", features = ["bitcoin", "slice_cache", "sha2"] }
clap = { version = "4.2.7", features = ["derive", "env"] }
url = "2.3.1"
fxhash = "0.2.1"
Expand Down
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ pub enum Error {
#[error(transparent)]
Prometheus(#[from] prometheus::Error),

#[error(transparent)]
Zmq(#[from] async_zmq::Error),

#[error(transparent)]
ZmqSubscribe(#[from] async_zmq::SubscribeError),

#[error("Bitcoin core RPC chaininfo failed status_code:{0}")]
RpcChainInfo(StatusCode),

Expand Down
9 changes: 7 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::fmt::Display;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::PathBuf;
use std::sync::Arc;
use threads::zmq::update_tx_zmq;
use threads::zmq::update_tx_zmq_infallible;
use tokio::time::sleep;

mod base_text_decorator;
Expand Down Expand Up @@ -173,6 +173,8 @@ pub async fn inner_main(mut args: Arguments) -> Result<(), Error> {
// keep chain info updated
let shared_state_chain = shared_state.clone();
let shared_state_mempool = shared_state.clone();
let shared_state_zmq = shared_state.clone();

let chain_info_chain = chain_info.clone();

let shared_state_addresses = shared_state.clone();
Expand All @@ -194,7 +196,10 @@ pub async fn inner_main(mut args: Arguments) -> Result<(), Error> {
}

if let Some(socket) = zmq_rawtx {
let _ = tokio::spawn(async move { update_tx_zmq(&socket).await });
let _ =
tokio::spawn(
async move { update_tx_zmq_infallible(&socket, shared_state_zmq).await },
);
}

update_mempool(shared_state_mempool).await;
Expand Down
34 changes: 24 additions & 10 deletions src/threads/zmq.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,39 @@
use std::net::SocketAddr;
use std::{net::SocketAddr, sync::Arc};

use async_zmq::{subscribe, Context};
use bitcoin::{hashes::Hash, Txid};
use bitcoin_slices::{bsl, Parse};
use futures::StreamExt;

pub async fn update_tx_zmq(socket: &SocketAddr) {
use crate::{state::SharedState, Error};

pub async fn update_tx_zmq_infallible(socket: &SocketAddr, state: Arc<SharedState>) {
if let Err(e) = update_tx_zmq(socket, state).await {
log::error!("{:?}", e);
}
}

async fn update_tx_zmq(socket: &SocketAddr, state: Arc<SharedState>) -> Result<(), Error> {
log::info!("Start update_tx_zmq!");

let context = Context::new();
let url = format!("tcp://{socket}");

let mut sub = subscribe(&url)
.unwrap()
.with_context(&context)
.connect()
.unwrap();
sub.set_subscribe("rawtx").unwrap();
let mut sub = subscribe(&url).unwrap().with_context(&context).connect()?;
sub.set_subscribe("rawtx")?;

while let Some(msg) = sub.next().await {
// Received message is a type of Result<MessageBuf>
let msg = msg.unwrap();
// | "rawtx" | <serialized transaction> | <uint32 sequence number in Little Endian>
if let Some(tx) = msg.get(1) {
if let Ok(tx) = bsl::Transaction::parse(tx) {
let txid = tx.parsed().txid_sha2();
let txid = Txid::from_byte_array(txid.into());
log::info!("inserting {}", txid);

log::info!("{:?}", msg.iter());
let _ = state.txs.lock().await.insert(txid, tx.parsed());
}
}
}
Ok(())
}

0 comments on commit bd06b79

Please sign in to comment.