Skip to content

Commit

Permalink
log orphan-related events via the event logger
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelsutton committed Dec 18, 2023
1 parent 3a7407a commit 2ced245
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 57 deletions.
201 changes: 151 additions & 50 deletions protocol/flows/src/flow_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ use kaspa_p2p_lib::{
use kaspa_utils::iter::IterExtensions;
use kaspa_utils::networking::PeerId;
use parking_lot::{Mutex, RwLock};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::time::Instant;
use std::{collections::hash_map::Entry, fmt::Display};
use std::{
iter::once,
ops::Deref,
Expand Down Expand Up @@ -69,26 +69,35 @@ const MAX_ORPHANS_UPPER_BOUND: usize = 1024;
/// The min time to wait before allowing another parallel request
const REQUEST_SCOPE_WAIT_TIME: Duration = Duration::from_secs(1);

/// Represents a block event to be logged
#[derive(Debug, PartialEq)]
pub enum BlockSource {
Relay,
Submit,
pub enum BlockLogEvent {
/// Accepted block via *relay*
Relay(Hash),
/// Accepted block via *submit block*
Submit(Hash),
/// Orphaned block with x missing roots
Orphaned(Hash, usize),
/// Detected a known orphan with x missing roots
OrphanRoots(Hash, usize),
/// Unorphaned x blocks with hash being a representative
Unorphaned(Hash, usize),
}

pub struct AcceptedBlockLogger {
pub struct BlockEventLogger {
bps: usize,
sender: UnboundedSender<(Hash, BlockSource)>,
receiver: Mutex<Option<UnboundedReceiver<(Hash, BlockSource)>>>,
sender: UnboundedSender<BlockLogEvent>,
receiver: Mutex<Option<UnboundedReceiver<BlockLogEvent>>>,
}

impl AcceptedBlockLogger {
impl BlockEventLogger {
pub fn new(bps: usize) -> Self {
let (sender, receiver) = unbounded_channel();
Self { bps, sender, receiver: Mutex::new(Some(receiver)) }
}

pub fn log(&self, hash: Hash, source: BlockSource) {
self.sender.send((hash, source)).unwrap();
pub fn log(&self, event: BlockLogEvent) {
self.sender.send(event).unwrap();
}

/// Start the logger listener. Must be called from an async tokio context
Expand All @@ -99,21 +108,110 @@ impl AcceptedBlockLogger {
let chunk_stream = UnboundedReceiverStream::new(receiver).chunks_timeout(chunk_limit, Duration::from_secs(1));
tokio::pin!(chunk_stream);
while let Some(chunk) = chunk_stream.next().await {
if let Some((i, h)) =
chunk.iter().filter_map(|(h, s)| if *s == BlockSource::Submit { Some(*h) } else { None }).enumerate().last()
{
let submit = i + 1; // i is the last index so i + 1 is the number of submit blocks
let relay = chunk.len() - submit;
match (submit, relay) {
(1, 0) => info!("Accepted block {} via submit block", h),
(n, 0) => info!("Accepted {} blocks ...{} via submit block", n, h),
(n, m) => info!("Accepted {} blocks ...{}, {} via relay and {} via submit block", n + m, h, m, n),
#[derive(Default)]
struct LogSummary {
// Representative
relay_rep: Option<Hash>,
submit_rep: Option<Hash>,
orphan_rep: Option<Hash>,
unorphan_rep: Option<Hash>,
// Counts
relay_count: usize,
submit_count: usize,
orphan_count: usize,
unorphan_count: usize,
orphan_roots_count: usize,
}

struct LogHash {
op: Option<Hash>,
}

impl From<Option<Hash>> for LogHash {
fn from(op: Option<Hash>) -> Self {
Self { op }
}
} else {
let h = chunk.last().expect("chunk is never empty").0;
match chunk.len() {
1 => info!("Accepted block {} via relay", h),
n => info!("Accepted {} blocks ...{} via relay", n, h),
}

impl Display for LogHash {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(hash) = self.op {
hash.fmt(f)
} else {
Ok(())
}
}
}

impl LogSummary {
fn relay(&self) -> LogHash {
self.relay_rep.into()
}

fn submit(&self) -> LogHash {
self.submit_rep.into()
}

fn orphan(&self) -> LogHash {
self.orphan_rep.into()
}

fn unorphan(&self) -> LogHash {
self.unorphan_rep.into()
}
}

let summary = chunk.into_iter().fold(LogSummary::default(), |mut summary, ev| {
match ev {
BlockLogEvent::Relay(hash) => {
summary.relay_count += 1;
summary.relay_rep = Some(hash)
}
BlockLogEvent::Submit(hash) => {
summary.submit_count += 1;
summary.submit_rep = Some(hash)
}
BlockLogEvent::Orphaned(hash, roots_count) => {
summary.orphan_roots_count += roots_count;
summary.orphan_count += 1;
summary.orphan_rep = Some(hash)
}
BlockLogEvent::OrphanRoots(_, roots_count) => {
summary.orphan_roots_count += roots_count;
}
BlockLogEvent::Unorphaned(hash, count) => {
summary.unorphan_count += count;
summary.unorphan_rep = Some(hash)
}
}
summary
});

match (summary.submit_count, summary.relay_count) {
(0, 0) => {}
(1, 0) => info!("Accepted block {} via submit block", summary.submit()),
(n, 0) => info!("Accepted {} blocks ...{} via submit block", n, summary.submit()),
(0, 1) => info!("Accepted block {} via relay", summary.relay()),
(0, m) => info!("Accepted {} blocks ...{} via relay", m, summary.relay()),
(n, m) => {
info!("Accepted {} blocks ...{}, {} via relay and {} via submit block", n + m, summary.submit(), m, n)
}
}

match (summary.unorphan_count, summary.orphan_count, summary.orphan_roots_count) {
(0, 0, 0) => {}
(1, 0, 0) => info!("Unorphaned block {}", summary.unorphan()),
(n, 0, 0) => info!("Unorphaned {} block(s) ...{}", n, summary.unorphan()),
(0, m, l) => info!("Orphaned {} block(s) ...{} and queued {} missing roots", m, summary.orphan(), l),
(n, m, l) => {
info!(
"Unorphaned {} block(s) ...{}, orphaned {} block(s) ...{} and queued {} missing roots",
n,
summary.unorphan(),
m,
summary.orphan(),
l
)
}
}
}
Expand All @@ -139,7 +237,7 @@ pub struct FlowContextInner {
notification_root: Arc<ConsensusNotificationRoot>,

// Special sampling logger used only for high-bps networks where logs must be throttled
accepted_block_logger: Option<AcceptedBlockLogger>,
block_event_logger: Option<BlockEventLogger>,

// Orphan parameters
orphan_resolution_range: u32,
Expand Down Expand Up @@ -240,7 +338,7 @@ impl FlowContext {
mining_manager,
tick_service,
notification_root,
accepted_block_logger: if config.bps() > 1 { Some(AcceptedBlockLogger::new(config.bps() as usize)) } else { None },
block_event_logger: if config.bps() > 1 { Some(BlockEventLogger::new(config.bps() as usize)) } else { None },
orphan_resolution_range,
max_orphans,
config,
Expand All @@ -261,7 +359,7 @@ impl FlowContext {
}

pub fn start_async_services(&self) {
if let Some(logger) = self.accepted_block_logger.as_ref() {
if let Some(logger) = self.block_event_logger.as_ref() {
logger.start();
}
}
Expand Down Expand Up @@ -352,11 +450,7 @@ impl FlowContext {
}

pub async fn add_orphan(&self, consensus: &ConsensusProxy, orphan_block: Block) -> Option<OrphanOutput> {
if self.is_log_throttled() {
debug!("Received a block with missing parents, adding to orphan pool: {}", orphan_block.hash());
} else {
info!("Received a block with missing parents, adding to orphan pool: {}", orphan_block.hash());
}
self.log_block_event(BlockLogEvent::Orphaned(orphan_block.hash(), 0));
self.orphans_pool.write().await.add_orphan(consensus, orphan_block).await
}

Expand All @@ -380,13 +474,17 @@ impl FlowContext {
Err(e) => warn!("Validation failed for orphan block {}: {}", block.hash(), e),
}
}
match unorphaned_blocks.len() {
0 => {}
1 => info!("Unorphaned block {}", unorphaned_blocks[0].0.hash()),
n => match self.is_log_throttled() {
true => info!("Unorphaned {} blocks ...{}", n, unorphaned_blocks.last().unwrap().0.hash()),
false => info!("Unorphaned {} blocks: {}", n, unorphaned_blocks.iter().map(|b| b.0.hash()).reusable_format(", ")),
},

// Log or send to event logger
if !unorphaned_blocks.is_empty() {
if let Some(logger) = self.block_event_logger.as_ref() {
logger.log(BlockLogEvent::Unorphaned(unorphaned_blocks[0].0.hash(), unorphaned_blocks.len()));
} else {
match unorphaned_blocks.len() {
1 => info!("Unorphaned block {}", unorphaned_blocks[0].0.hash()),
n => info!("Unorphaned {} blocks: {}", n, unorphaned_blocks.iter().map(|b| b.0.hash()).reusable_format(", ")),
}
}
}
unorphaned_blocks
}
Expand All @@ -410,26 +508,29 @@ impl FlowContext {
self.hub.broadcast(make_message!(Payload::InvRelayBlock, InvRelayBlockMessage { hash: Some(hash.into()) })).await;

self.on_new_block(consensus, block, virtual_state_task).await;
self.log_block_acceptance(hash, BlockSource::Submit);
self.log_block_event(BlockLogEvent::Submit(hash));

Ok(())
}

pub fn log_block_acceptance(&self, hash: Hash, source: BlockSource) {
if let Some(logger) = self.accepted_block_logger.as_ref() {
logger.log(hash, source)
pub fn log_block_event(&self, event: BlockLogEvent) {
if let Some(logger) = self.block_event_logger.as_ref() {
logger.log(event)
} else {
match source {
BlockSource::Relay => info!("Accepted block {} via relay", hash),
BlockSource::Submit => info!("Accepted block {} via submit block", hash),
match event {
BlockLogEvent::Relay(hash) => info!("Accepted block {} via relay", hash),
BlockLogEvent::Submit(hash) => info!("Accepted block {} via submit block", hash),
BlockLogEvent::Orphaned(orphan, _) => {
info!("Received a block with missing parents, adding to orphan pool: {}", orphan)
}
BlockLogEvent::OrphanRoots(orphan, roots_count) => {
info!("Block {} has {} missing ancestors. Adding them to the invs queue...", orphan, roots_count)
}
_ => {}
}
}
}

pub fn is_log_throttled(&self) -> bool {
self.accepted_block_logger.is_some()
}

/// Updates the mempool after a new block arrival, relays newly unorphaned transactions
/// and possibly rebroadcast manually added transactions when not in IBD.
///
Expand Down
10 changes: 3 additions & 7 deletions protocol/flows/src/v5/blockrelay/flow.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
flow_context::{BlockSource, FlowContext, RequestScope},
flow_context::{BlockLogEvent, FlowContext, RequestScope},
flow_trait::Flow,
flowcontext::orphans::OrphanOutput,
};
Expand Down Expand Up @@ -188,17 +188,13 @@ impl HandleRelayInvsFlow {
let ctx = self.ctx.clone();
tokio::spawn(async move {
ctx.on_new_block(&session, block, virtual_state_task).await;
ctx.log_block_acceptance(inv.hash, BlockSource::Relay);
ctx.log_block_event(BlockLogEvent::Relay(inv.hash));
});
}
}

fn enqueue_orphan_roots(&mut self, orphan: Hash, roots: Vec<Hash>, known_within_range: bool) {
if self.ctx.is_log_throttled() {
debug!("Block {} has {} missing ancestors. Adding them to the invs queue...", orphan, roots.len());
} else {
info!("Block {} has {} missing ancestors. Adding them to the invs queue...", orphan, roots.len());
}
self.ctx.log_block_event(BlockLogEvent::OrphanRoots(orphan, roots.len()));
self.invs_route.enqueue_indirect_invs(roots, known_within_range)
}

Expand Down

0 comments on commit 2ced245

Please sign in to comment.