diff --git a/Cargo.toml b/Cargo.toml index f20616a..fd36865 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,20 @@ bytes = "1.10.0" bincode = "1.3.3" solana-entry = "2" solana-pubkey = "2" +solana-sdk = "=2.2.1" + +# UDP shred reconstruction dependencies +solana-ledger = { git = "https://github.com/jito-foundation/jito-solana.git", branch = "eric/v2.2-merkle-recovery" } +solana-perf = "=2.2.1" +solana-streamer = "=2.2.1" +solana-net-utils = "=2.2.1" +solana-metrics = "=2.2.1" +ahash = "0.8" +itertools = "0.14.0" +crossbeam-channel = "0.5.8" +rand = "0.8" +socket2 = "0.5" +tikv-jemallocator = "0.6" lazy_static = "1.5.0" dashmap = "6" comfy-table = "7.1.0" @@ -37,3 +51,9 @@ thorstreamer-grpc-client = "0.1.3" [build-dependencies] tonic-prost-build = "0.14" protoc-bin-vendored = "3" + +[profile.release] +opt-level = 3 +lto = "fat" +codegen-units = 1 +strip = true diff --git a/src/backend.rs b/src/backend.rs index a0518c0..9c7e374 100644 --- a/src/backend.rs +++ b/src/backend.rs @@ -558,10 +558,17 @@ impl BackendConfigPayload { impl BackendEndpointPayload { fn from_endpoint(endpoint: &Endpoint, provided_ip: Option<String>) -> Self { + let (kind_str, url) = match endpoint.kind { + // Backend doesn't know UDP variant: map kind and wrap bare socket addr as URL + crate::config::EndpointKind::UdpShredstream => { + ("shredstream", format!("udp://{}", endpoint.url)) + } + _ => (endpoint.kind.as_str(), endpoint.url.clone()), + }; Self { name: endpoint.name.clone(), - url: endpoint.url.clone(), - kind: Some(endpoint.kind.as_str().to_string()), + url, + kind: Some(kind_str.to_string()), resolved_ip: provided_ip, } } @@ -577,6 +584,13 @@ async fn build_endpoint_payloads(endpoints: &[Endpoint]) -> Vec<BackendEndpointPayload> fn resolve_endpoint_ip(endpoint: &Endpoint) -> Option<String> { + // UDP endpoints use bare socket addresses (e.g. 
"127.0.0.1:8001"), not URLs + if matches!(endpoint.kind, crate::config::EndpointKind::UdpShredstream) { + return endpoint.url.parse::<std::net::SocketAddr>() + .ok() + .map(|addr| addr.ip().to_string()); + } + let parsed = match Url::parse(&endpoint.url) { Ok(url) => url, Err(err) => { diff --git a/src/config.rs b/src/config.rs index d2f5ef4..9022a53 100644 --- a/src/config.rs +++ b/src/config.rs @@ -48,6 +48,8 @@ pub enum EndpointKind { Shredstream, Shreder, Jetstream, + #[serde(alias = "udpshredstream")] + UdpShredstream, } #[derive(Debug, Clone, Copy, Default, Deserialize, Serialize)] @@ -88,6 +90,7 @@ impl EndpointKind { EndpointKind::Shredstream => "shredstream", EndpointKind::Shreder => "shreder", EndpointKind::Jetstream => "jetstream", + EndpointKind::UdpShredstream => "udpshredstream", } } } diff --git a/src/deshred.rs b/src/deshred.rs new file mode 100644 index 0000000..3cdfd32 --- /dev/null +++ b/src/deshred.rs @@ -0,0 +1,486 @@ +use std::{collections::HashSet, hash::Hash, sync::atomic::{AtomicU64, Ordering}}; + +use itertools::Itertools; +use tracing::{debug, warn}; +use solana_ledger::{ + blockstore::MAX_DATA_SHREDS_PER_SLOT, + shred::{ + merkle::{Shred, ShredCode}, + ReedSolomonCache, ShredType, Shredder, + }, +}; +use solana_perf::packet::PacketBatch; +use solana_sdk::clock::{Slot, MAX_PROCESSING_AGE}; + +// metrics for deshred ops +pub struct DeshredMetrics { + pub recovered_count: AtomicU64, + pub entry_count: AtomicU64, + pub txn_count: AtomicU64, + pub unknown_start_position_count: AtomicU64, + pub fec_recovery_error_count: AtomicU64, + pub bincode_deserialize_error_count: AtomicU64, + pub unknown_start_position_error_count: AtomicU64, +} + +impl Default for DeshredMetrics { + fn default() -> Self { + Self { + recovered_count: AtomicU64::new(0), + entry_count: AtomicU64::new(0), + txn_count: AtomicU64::new(0), + unknown_start_position_count: AtomicU64::new(0), + fec_recovery_error_count: AtomicU64::new(0), + bincode_deserialize_error_count: AtomicU64::new(0), + 
unknown_start_position_error_count: AtomicU64::new(0), + } + } +} + +#[derive(Default, Debug, Copy, Clone, Eq, PartialEq)] +enum ShredStatus { + #[default] + Unknown, + /// Shred that is **not** marked as DATA_COMPLETE_SHRED + NotDataComplete, + /// Shred that is marked as DATA_COMPLETE_SHRED + DataComplete, +} + +/// Tracks per-slot shred information for data shreds +/// Guaranteed to have MAX_DATA_SHREDS_PER_SLOT entries in each Vec +#[derive(Debug)] +pub struct ShredsStateTracker { + data_status: Vec<ShredStatus>, + data_shreds: Vec<Option<Shred>>, + already_recovered_fec_sets: Vec<bool>, + already_deshredded: Vec<bool>, +} + +impl Default for ShredsStateTracker { + fn default() -> Self { + Self { + data_status: vec![ShredStatus::Unknown; MAX_DATA_SHREDS_PER_SLOT], + data_shreds: vec![None; MAX_DATA_SHREDS_PER_SLOT], + already_recovered_fec_sets: vec![false; MAX_DATA_SHREDS_PER_SLOT], + already_deshredded: vec![false; MAX_DATA_SHREDS_PER_SLOT], + } + } +} + +const SLOT_LOOKBACK: Slot = 50; + +pub fn reconstruct_shreds( + packet_batch: PacketBatch, + all_shreds: &mut ahash::HashMap< + Slot, + ( + ahash::HashMap<u32, HashSet<ComparableShred>>, + ShredsStateTracker, + ), + >, + slot_fec_indexes_to_iterate: &mut Vec<(Slot, u32)>, + deshredded_entries: &mut Vec<(Slot, Vec<solana_entry::entry::Entry>)>, + highest_slot_seen: &mut Slot, + rs_cache: &ReedSolomonCache, + metrics: &DeshredMetrics, +) -> usize { + deshredded_entries.clear(); + slot_fec_indexes_to_iterate.clear(); + + // ingest all packets + for packet in packet_batch.iter().filter_map(|p| p.data(..)) { + // Skip heartbeat/keepalive packets (real shreds are ~1228 bytes) + if packet.len() < 64 { + continue; + } + match solana_ledger::shred::Shred::new_from_serialized_shred(packet.to_vec()) + .and_then(Shred::try_from) + { + Ok(shred) => { + let slot = shred.common_header().slot; + let index = shred.index() as usize; + let fec_set_index = shred.fec_set_index(); + let (all_shreds, state_tracker) = all_shreds.entry(slot).or_default(); + if highest_slot_seen.saturating_sub(SLOT_LOOKBACK) > slot { + debug!( 
"Old shred slot: {slot}, fec_set_index: {fec_set_index}, index: {index}" + ); + continue; + } + if state_tracker.already_recovered_fec_sets[fec_set_index as usize] + || state_tracker.already_deshredded[index] + { + debug!("Already completed slot: {slot}, fec_set_index: {fec_set_index}, index: {index}"); + continue; + } + let Some(_shred_index) = update_state_tracker(&shred, state_tracker) else { + continue; + }; + + all_shreds + .entry(fec_set_index) + .or_default() + .insert(ComparableShred(shred)); + slot_fec_indexes_to_iterate.push((slot, fec_set_index)); + *highest_slot_seen = std::cmp::max(*highest_slot_seen, slot); + } + Err(_) => {} + } + } + slot_fec_indexes_to_iterate.sort_unstable(); + slot_fec_indexes_to_iterate.dedup(); + + // try recovering by FEC set + let mut total_recovered_count = 0; + for (slot, fec_set_index) in slot_fec_indexes_to_iterate.iter() { + let (all_shreds, state_tracker) = all_shreds.entry(*slot).or_default(); + let shreds = all_shreds.entry(*fec_set_index).or_default(); + let ( + num_expected_data_shreds, + num_expected_coding_shreds, + num_data_shreds, + num_coding_shreds, + ) = get_data_shred_info(shreds); + + // haven't received last data shred, haven't seen any coding shreds, so wait until more arrive + let min_shreds_needed_to_recover = num_expected_data_shreds as usize; + if num_expected_data_shreds == 0 + || shreds.len() < min_shreds_needed_to_recover + || num_data_shreds == num_expected_data_shreds + { + continue; + } + + // try to recover if we have enough shreds in the FEC set + let merkle_shreds = shreds + .iter() + .sorted_by_key(|s| (u8::MAX - s.shred_type() as u8, s.index())) + .map(|s| s.0.clone()) + .collect_vec(); + let recovered = match solana_ledger::shred::merkle::recover(merkle_shreds, rs_cache) { + Ok(r) => r, + Err(e) => { + warn!( + "Failed to recover shreds for slot {slot} fec_set_index {fec_set_index}. 
\ + num_expected_data_shreds: {num_expected_data_shreds}, num_data_shreds: {num_data_shreds} \ + num_expected_coding_shreds: {num_expected_coding_shreds} num_coding_shreds: {num_coding_shreds} Err: {e}", + ); + continue; + } + }; + + let mut fec_set_recovered_count = 0; + for shred in recovered { + match shred { + Ok(shred) => { + if update_state_tracker(&shred, state_tracker).is_none() { + continue; + } + total_recovered_count += 1; + fec_set_recovered_count += 1; + } + Err(e) => warn!( + "Failed to recover shred for slot {slot}, fec set: {fec_set_index}. Err: {e}" + ), + } + } + + if fec_set_recovered_count > 0 { + debug!("recovered slot: {slot}, fec_index: {fec_set_index}, recovered count: {fec_set_recovered_count}"); + state_tracker.already_recovered_fec_sets[*fec_set_index as usize] = true; + shreds.clear(); + } + } + + // deshred and bincode deserialize + for (slot, fec_set_index) in slot_fec_indexes_to_iterate.iter() { + let (_all_shreds, state_tracker) = all_shreds.entry(*slot).or_default(); + let Some((start_data_complete_idx, end_data_complete_idx, unknown_start)) = + get_indexes(state_tracker, *fec_set_index as usize) + else { + continue; + }; + if unknown_start { + metrics + .unknown_start_position_count + .fetch_add(1, Ordering::Relaxed); + } + + let to_deshred = + &state_tracker.data_shreds[start_data_complete_idx..=end_data_complete_idx]; + let deshredded_payload = match Shredder::deshred( + to_deshred.iter().map(|s| s.as_ref().unwrap().payload()), + ) { + Ok(v) => v, + Err(e) => { + warn!("slot {slot} failed to deshred start_data_complete_idx: {start_data_complete_idx}, end_data_complete_idx: {end_data_complete_idx}. 
Err: {e}"); + metrics + .fec_recovery_error_count + .fetch_add(1, Ordering::Relaxed); + if unknown_start { + metrics + .unknown_start_position_error_count + .fetch_add(1, Ordering::Relaxed); + } + continue; + } + }; + + let entries = match bincode::deserialize::<Vec<solana_entry::entry::Entry>>( + &deshredded_payload, + ) { + Ok(entries) => entries, + Err(e) => { + debug!( + "Failed to deserialize bincode payload of size {} for slot {slot}, \ + start: {start_data_complete_idx}, end: {end_data_complete_idx}, \ + unknown_start: {unknown_start}. Err: {e}", + deshredded_payload.len() + ); + metrics + .bincode_deserialize_error_count + .fetch_add(1, Ordering::Relaxed); + if unknown_start { + metrics + .unknown_start_position_error_count + .fetch_add(1, Ordering::Relaxed); + } + continue; + } + }; + metrics + .entry_count + .fetch_add(entries.len() as u64, Ordering::Relaxed); + let txn_count: u64 = entries.iter().map(|e| e.transactions.len() as u64).sum(); + metrics.txn_count.fetch_add(txn_count, Ordering::Relaxed); + debug!( + "Decoded slot: {slot} start: {start_data_complete_idx} end: {end_data_complete_idx} entries: {}, txns: {txn_count}", + entries.len(), + ); + + deshredded_entries.push((*slot, entries)); + to_deshred.iter().for_each(|shred| { + let Some(shred) = shred.as_ref() else { + return; + }; + state_tracker.already_recovered_fec_sets[shred.fec_set_index() as usize] = true; + state_tracker.already_deshredded[shred.index() as usize] = true; + }) + } + + // cleanup old slots + if all_shreds.len() > MAX_PROCESSING_AGE { + let slot_threshold = highest_slot_seen.saturating_sub(SLOT_LOOKBACK); + let mut incomplete_count = 0u64; + all_shreds.retain(|slot, (fec_set_indexes, state_tracker)| { + if *slot >= slot_threshold { + return true; + } + for (fec_set_index, _shreds) in fec_set_indexes.iter() { + if !state_tracker.already_recovered_fec_sets[*fec_set_index as usize] { + incomplete_count += 1; + } + } + false + }); + if incomplete_count > 0 { + warn!("Cleaned up old slots with {incomplete_count} 
incomplete FEC sets"); + } + } + + if total_recovered_count > 0 { + metrics + .recovered_count + .fetch_add(total_recovered_count as u64, Ordering::Relaxed); + } + + total_recovered_count +} + +/// Return the inclusive range of shreds that constitute one complete segment +fn get_indexes( + tracker: &ShredsStateTracker, + index: usize, +) -> Option<(usize, usize, bool)> { + if index >= tracker.data_status.len() { + return None; + } + + // find the right boundary (first DataComplete >= index) + let mut end = index; + while end < tracker.data_status.len() { + if tracker.already_deshredded[end] { + return None; + } + match &tracker.data_status[end] { + ShredStatus::Unknown => return None, + ShredStatus::DataComplete => break, + ShredStatus::NotDataComplete => end += 1, + } + } + if end == tracker.data_status.len() { + return None; + } + + if end == 0 { + return Some((0, 0, false)); + } + if index == 0 { + return Some((0, end, false)); + } + + // find the left boundary (prev DataComplete + 1) + let mut start = index; + let mut next = start - 1; + loop { + match tracker.data_status[next] { + ShredStatus::NotDataComplete => { + if tracker.already_deshredded[next] { + return None; + } + if next == 0 { + return Some((0, end, false)); + } + start = next; + next -= 1; + } + ShredStatus::DataComplete => return Some((start, end, false)), + ShredStatus::Unknown => return Some((start, end, true)), + } + } +} + +/// Upon receiving a new shred, update the state tracker. +/// Returns shred index on new insert, None if already exists. 
+fn update_state_tracker(shred: &Shred, state_tracker: &mut ShredsStateTracker) -> Option<usize> { + let index = shred.index() as usize; + if state_tracker.already_recovered_fec_sets[shred.fec_set_index() as usize] { + return None; + } + if shred.shred_type() == ShredType::Data + && (state_tracker.data_shreds[index].is_some() + || !matches!(state_tracker.data_status[index], ShredStatus::Unknown)) + { + return None; + } + if let Shred::ShredData(s) = &shred { + state_tracker.data_shreds[index] = Some(shred.clone()); + if s.data_complete() || s.last_in_slot() { + state_tracker.data_status[index] = ShredStatus::DataComplete; + } else { + state_tracker.data_status[index] = ShredStatus::NotDataComplete; + } + }; + Some(index) +} + +/// Check if we can reconstruct (having minimum number of data + coding shreds) +fn get_data_shred_info( + shreds: &HashSet<ComparableShred>, +) -> (u16, u16, u16, u16) { + let mut num_expected_data_shreds = 0; + let mut num_expected_coding_shreds = 0; + let mut num_data_shreds = 0; + let mut num_coding_shreds = 0; + for shred in shreds { + match &shred.0 { + Shred::ShredCode(s) => { + num_coding_shreds += 1; + num_expected_data_shreds = s.coding_header.num_data_shreds; + num_expected_coding_shreds = s.coding_header.num_coding_shreds; + } + Shred::ShredData(s) => { + num_data_shreds += 1; + if num_expected_data_shreds == 0 && (s.data_complete() || s.last_in_slot()) { + num_expected_data_shreds = + (shred.0.index() - shred.0.fec_set_index()) as u16 + 1; + } + } + } + } + ( + num_expected_data_shreds, + num_expected_coding_shreds, + num_data_shreds, + num_coding_shreds, + ) +} + +/// Issue: datashred equality comparison is wrong due to data size being smaller than the 1203 bytes allocated +#[derive(Clone, Debug, Eq)] +pub struct ComparableShred(Shred); + +impl std::ops::Deref for ComparableShred { + type Target = Shred; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Hash for ComparableShred { + fn hash<H: std::hash::Hasher>(&self, state: &mut H) { + match &self.0 { + 
Shred::ShredCode(s) => { + s.common_header.hash(state); + s.coding_header.hash(state); + } + Shred::ShredData(s) => { + s.common_header.hash(state); + s.data_header.hash(state); + } + } + } +} + +impl PartialEq for ComparableShred { + fn eq(&self, other: &Self) -> bool { + match &self.0 { + Shred::ShredCode(s1) => match &other.0 { + Shred::ShredCode(s2) => { + let solana_ledger::shred::ShredVariant::MerkleCode { + proof_size, + chained: _, + resigned, + } = s1.common_header.shred_variant + else { + return false; + }; + + let comparison_len = + <ShredCode as solana_ledger::shred::traits::Shred>::SIZE_OF_PAYLOAD + .saturating_sub( + usize::from(proof_size) + * solana_ledger::shred::merkle::SIZE_OF_MERKLE_PROOF_ENTRY + + if resigned { + solana_ledger::shred::SIZE_OF_SIGNATURE + } else { + 0 + }, + ); + + s1.coding_header == s2.coding_header + && s1.common_header == s2.common_header + && s1.payload[..comparison_len] == s2.payload[..comparison_len] + } + Shred::ShredData(_) => false, + }, + Shred::ShredData(s1) => match &other.0 { + Shred::ShredCode(_) => false, + Shred::ShredData(s2) => { + let Ok(s1_data) = solana_ledger::shred::layout::get_data(self.payload()) else { + return false; + }; + let Ok(s2_data) = solana_ledger::shred::layout::get_data(other.payload()) + else { + return false; + }; + s1.data_header == s2.data_header + && s1.common_header == s2.common_header + && s1_data == s2_data + } + }, + } + } +} diff --git a/src/main.rs b/src/main.rs index a217f3c..8907bdf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,6 @@ +#[global_allocator] +static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; + pub use { bs58, bytes::Bytes, @@ -18,6 +21,7 @@ pub use { mod analysis; mod backend; mod config; +mod deshred; mod proto; mod providers; mod utils; @@ -276,6 +280,22 @@ async fn main() -> Result<()> { None }; + // Drain the forwarder before backend finalization so all queued signatures are sent + let _ = signature_queues.take(); + if let Some(stop) = forwarder_stop.as_ref() { + stop.store(true, 
Ordering::Release); + } + if let Some(join) = signature_forwarder.take() + && let Err(err) = join.join() + { + warn!( + "Signature forwarder thread terminated unexpectedly: {:?}", + err + ); + } + + let signatures_collected = shared_counter.load(Ordering::Acquire); + if let Some(handle) = backend_handle { if run_aborted { if let Some(run_id) = backend_run_id.as_ref() { @@ -284,10 +304,19 @@ async fn main() -> Result<()> { info!("Skipping backend finalisation due to user abort"); } // Dropping the handle without calling finish() prevents the backend run from being saved. + } else if signatures_collected == 0 { + let run_id = backend_run_id + .clone() + .unwrap_or_else(|| "unknown".to_string()); + warn!( + run_id = %run_id, + "Skipping backend finalisation: no signatures were forwarded (all endpoints must observe the same transaction)" + ); } else { let run_id = backend_run_id .clone() .unwrap_or_else(|| "unknown".to_string()); + info!(run_id = %run_id, signatures = signatures_collected, "Finalising backend stream"); match handle.finish().await { Ok(result) => { info!(run_id = %run_id, "Backend completed run"); @@ -300,20 +329,6 @@ async fn main() -> Result<()> { } } - let _ = signature_queues.take(); - if let Some(stop) = forwarder_stop.as_ref() { - stop.store(true, Ordering::Release); - } - - if let Some(join) = signature_forwarder - && let Err(err) = join.join() - { - warn!( - "Signature forwarder thread terminated unexpectedly: {:?}", - err - ); - } - if !run_aborted { if let Some(summary) = run_summary.as_ref() { analysis::display_run_summary(summary); diff --git a/src/providers/mod.rs b/src/providers/mod.rs index 0149e11..bc1a4e9 100644 --- a/src/providers/mod.rs +++ b/src/providers/mod.rs @@ -21,6 +21,7 @@ pub mod jetstream; pub mod shreder; pub mod shredstream; pub mod thor; +pub mod udp_shredstream; pub mod yellowstone; mod yellowstone_client; @@ -41,6 +42,7 @@ pub fn create_provider(kind: &EndpointKind) -> Box<dyn GeyserProvider> { EndpointKind::Shreder => 
Box::new(shreder::ShrederProvider), EndpointKind::Shredstream => Box::new(shredstream::ShredstreamProvider), EndpointKind::Jetstream => Box::new(jetstream::JetstreamProvider), + EndpointKind::UdpShredstream => Box::new(udp_shredstream::UdpShredstreamProvider), } } diff --git a/src/providers/udp_shredstream.rs b/src/providers/udp_shredstream.rs new file mode 100644 index 0000000..32142e8 --- /dev/null +++ b/src/providers/udp_shredstream.rs @@ -0,0 +1,327 @@ +use std::{ + collections::HashSet, + error::Error, + net::SocketAddr, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; + +use solana_ledger::shred::ReedSolomonCache; +use solana_perf::{ + deduper::Deduper, + packet::PacketBatchRecycler, + recycler::Recycler, +}; + +use solana_pubkey::Pubkey; +use solana_sdk::clock::Slot; +use solana_streamer::streamer::{self, StreamerReceiveStats}; +use tokio::task; +use tracing::{info, warn}; + +use crate::{ + config::{Config, Endpoint}, + deshred::{self, ComparableShred, DeshredMetrics, ShredsStateTracker}, + utils::{TransactionData, get_current_timestamp}, +}; + +use super::{ + GeyserProvider, ProviderContext, + common::{TransactionAccumulator, build_signature_envelope, enqueue_signature}, +}; + +// Deduper constants (same as proxy) +const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001; +const DEDUPER_NUM_BITS: u64 = 637_534_199; // 76MB +const DEDUPER_RESET_CYCLE: Duration = Duration::from_secs(5 * 60); + +pub struct UdpShredstreamProvider; + +impl GeyserProvider for UdpShredstreamProvider { + fn process( + &self, + endpoint: Endpoint, + config: Config, + context: ProviderContext, + ) -> task::JoinHandle<Result<(), Box<dyn Error + Send + Sync>>> { + task::spawn(async move { + let handle = task::spawn_blocking(move || { + run_udp_shred_receiver(endpoint, config, context) + }); + handle.await? 
+ }) + } +} + +fn run_udp_shred_receiver( + endpoint: Endpoint, + config: Config, + context: ProviderContext, +) -> Result<(), Box<dyn Error + Send + Sync>> { + let ProviderContext { + shutdown_tx, + mut shutdown_rx, + start_wallclock_secs, + start_instant, + comparator, + signature_tx, + shared_counter, + shared_shutdown, + target_transactions, + total_producers, + progress, + } = context; + + let account_pubkey: Pubkey = config.account.parse()?; + let endpoint_name = endpoint.name.clone(); + + // Parse bind address from endpoint.url (format: "0.0.0.0:20000") + let bind_addr: SocketAddr = endpoint.url.parse() + .map_err(|e| format!("Failed to parse UDP bind address '{}': {}", endpoint.url, e))?; + + info!(endpoint = %endpoint_name, url = %bind_addr, "Connecting"); + + let exit = Arc::new(AtomicBool::new(false)); + let num_threads = std::thread::available_parallelism() + .map(|p| usize::from(p).min(4)) + .unwrap_or(2); + + // Bind UDP sockets with SO_REUSEPORT for multi-threaded reception + let (_port, sockets) = solana_net_utils::multi_bind_in_range_with_config( + bind_addr.ip(), + (bind_addr.port(), bind_addr.port() + 1), + solana_net_utils::SocketConfig::default().reuseport(true), + num_threads, + ) + .map_err(|e| format!("Failed to bind UDP sockets on {}: {}", bind_addr, e))?; + + // Increase receive buffer to 256KB per socket to handle burst shred arrivals + for socket in &sockets { + let sock_ref = socket2::SockRef::from(socket); + if let Err(e) = sock_ref.set_recv_buffer_size(256 * 1024) { + warn!(endpoint = %endpoint_name, error = %e, "Failed to set SO_RCVBUF to 256KB"); + } + } + + info!(endpoint = %endpoint_name, sockets = sockets.len(), port = bind_addr.port(), "Connected"); + + let recycler: PacketBatchRecycler = Recycler::warmed(100, 1024); + let forward_stats = Arc::new(StreamerReceiveStats::new("udp_shred_receiver")); + + // Single channel: all streamer receivers → reconstruction thread (no forwarder threads) + let (packet_tx, packet_rx) = crossbeam_channel::bounded(2048); + 
+ // Start streamer receiver threads (one per socket, all share the same sender) + let mut thread_handles = Vec::new(); + for (i, socket) in sockets.into_iter().enumerate() { + let listen_thread = streamer::receiver( + format!("udpShred{i}"), + Arc::new(socket), + exit.clone(), + packet_tx.clone(), + recycler.clone(), + forward_stats.clone(), + Duration::default(), + false, + None, + false, + ); + thread_handles.push(listen_thread); + } + drop(packet_tx); // drop sender so reconstruction thread can detect disconnection + + // Unified dedup + reconstruction thread: eliminates per-socket forwarder threads + // Single deduper sees ALL packets → better dedup quality, no RwLock overhead + let metrics = Arc::new(DeshredMetrics::default()); + let exit_clone = exit.clone(); + let endpoint_name_recon = endpoint_name.clone(); + + let reconstruct_thread = std::thread::Builder::new() + .name("udpShredRecon".to_string()) + .spawn(move || { + // Inline deduper — no RwLock needed since single-threaded + let mut deduper = Deduper::<2, [u8]>::new( + &mut rand::thread_rng(), + DEDUPER_NUM_BITS, + ); + let mut rng = rand::thread_rng(); + let mut last_dedup_reset = std::time::Instant::now(); + + let mut all_shreds = ahash::HashMap::< + Slot, + ( + ahash::HashMap<u32, HashSet<ComparableShred>>, + ShredsStateTracker, + ), + >::default(); + let mut slot_fec_indexes_to_iterate = Vec::<(Slot, u32)>::new(); + let mut deshredded_entries = Vec::<(Slot, Vec<solana_entry::entry::Entry>)>::new(); + let mut highest_slot_seen: Slot = 0; + let rs_cache = ReedSolomonCache::default(); + + let mut accumulator = TransactionAccumulator::new(); + let mut transaction_count = 0usize; + let mut next_log_time = std::time::Instant::now() + Duration::from_secs(30); + + while !exit_clone.load(Ordering::Relaxed) { + match packet_rx.recv_timeout(Duration::from_millis(100)) { + Ok(mut pkt_batch) => { + // Inline dedup — eliminates channel hop + RwLock from forwarder threads + solana_perf::deduper::dedup_packets_and_count_discards( + &deduper, + std::slice::from_mut(&mut 
pkt_batch), + ); + + // Periodic dedup reset + if last_dedup_reset.elapsed() >= Duration::from_secs(2) { + deduper.maybe_reset( + &mut rng, + DEDUPER_FALSE_POSITIVE_RATE, + DEDUPER_RESET_CYCLE, + ); + last_dedup_reset = std::time::Instant::now(); + } + + deshred::reconstruct_shreds( + pkt_batch, + &mut all_shreds, + &mut slot_fec_indexes_to_iterate, + &mut deshredded_entries, + &mut highest_slot_seen, + &rs_cache, + &metrics, + ); + + for (_slot, entries) in deshredded_entries.drain(..) { + for entry in entries { + for tx in entry.transactions { + let has_account = tx + .message + .static_account_keys() + .iter() + .any(|key| key == &account_pubkey); + + if !has_account { + continue; + } + + let wallclock = get_current_timestamp(); + let elapsed = start_instant.elapsed(); + let signature = tx.signatures[0].to_string(); + + let tx_data = TransactionData { + wallclock_secs: wallclock, + elapsed_since_start: elapsed, + start_wallclock_secs, + }; + + let updated = accumulator.record( + signature.clone(), + tx_data.clone(), + ); + + if updated { + if let Some(envelope) = build_signature_envelope( + &comparator, + &endpoint_name_recon, + &signature, + tx_data, + total_producers, + ) { + if let Some(target) = target_transactions { + let shared = shared_counter + .fetch_add(1, Ordering::AcqRel) + + 1; + if let Some(tracker) = progress.as_ref() { + tracker.record(shared); + } + if shared >= target + && !shared_shutdown.swap(true, Ordering::AcqRel) + { + info!( + endpoint = %endpoint_name_recon, + target, + "Reached shared signature target; broadcasting shutdown" + ); + let _ = shutdown_tx.send(()); + } + } + + if let Some(sender) = signature_tx.as_ref() { + enqueue_signature(sender, &endpoint_name_recon, &signature, envelope); + } + } + } + + transaction_count += 1; + } + } + } + + // Periodic stats logging + if std::time::Instant::now() >= next_log_time { + let recovered = metrics.recovered_count.load(Ordering::Relaxed); + let entries = 
metrics.entry_count.load(Ordering::Relaxed); + let txns = metrics.txn_count.load(Ordering::Relaxed); + info!( + endpoint = %endpoint_name_recon, + recovered, + entries, + txns, + matched = transaction_count, + unique = accumulator.len(), + "UDP shreds stats" + ); + next_log_time = std::time::Instant::now() + Duration::from_secs(30); + } + } + Err(crossbeam_channel::RecvTimeoutError::Timeout) => {} + Err(crossbeam_channel::RecvTimeoutError::Disconnected) => break, + } + } + + // Flush accumulated signatures to comparator + let unique_signatures = accumulator.len(); + let collected = accumulator.into_inner(); + comparator.add_batch(&endpoint_name_recon, collected); + info!( + endpoint = %endpoint_name_recon, + total_transactions = transaction_count, + unique_signatures, + "Reconstruction thread exiting" + ); + })?; + thread_handles.push(reconstruct_thread); + + // Wait for shutdown signal + loop { + match shutdown_rx.try_recv() { + Ok(_) | Err(tokio::sync::broadcast::error::TryRecvError::Closed) => { + info!(endpoint = %endpoint_name, "Received shutdown signal"); + break; + } + Err(tokio::sync::broadcast::error::TryRecvError::Lagged(_)) => { + info!(endpoint = %endpoint_name, "Shutdown signal lagged"); + break; + } + Err(tokio::sync::broadcast::error::TryRecvError::Empty) => {} + } + if exit.load(Ordering::Relaxed) { + break; + } + std::thread::sleep(Duration::from_millis(100)); + } + + // Signal all threads to exit + exit.store(true, Ordering::Relaxed); + + // Wait for threads to finish + for handle in thread_handles { + let _ = handle.join(); + } + + info!(endpoint = %endpoint_name, "UDP shred receiver stopped"); + Ok(()) +}