Skip to content

Commit

Permalink
add sse logging
Browse files Browse the repository at this point in the history
  • Loading branch information
ThreeHrSleep committed Oct 14, 2024
1 parent 3314eba commit 63002ff
Show file tree
Hide file tree
Showing 13 changed files with 158 additions and 99 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use execution_layer::ExecutionLayer;
use futures::channel::mpsc::Receiver;
use genesis::{interop_genesis_state, Eth1GenesisService, DEFAULT_ETH1_BLOCK_HASH};
use lighthouse_network::{prometheus_client::registry::Registry, NetworkGlobals};
use logging::SSE_LOGGING_COMPONENTS;
use monitoring_api::{MonitoringHttpClient, ProcessType};
use network::{NetworkConfig, NetworkSenders, NetworkService};
use slasher::Slasher;
Expand Down Expand Up @@ -541,7 +542,7 @@ where
beacon_processor_send: None,
beacon_processor_reprocess_send: None,
eth1_service: Some(genesis_service.eth1_service.clone()),
// sse_logging_components: runtime_context.sse_logging_components.clone(),
sse_logging_components: SSE_LOGGING_COMPONENTS.lock().unwrap().clone(),
});

// Discard the error from the oneshot.
Expand Down Expand Up @@ -773,7 +774,7 @@ where
beacon_processor_reprocess_send: Some(
beacon_processor_channels.work_reprocessing_tx.clone(),
),
// sse_logging_components: runtime_context.sse_logging_components.clone(),
sse_logging_components: SSE_LOGGING_COMPONENTS.lock().unwrap().clone(),
// log: log.clone(),
});

Expand Down
2 changes: 2 additions & 0 deletions beacon_node/http_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ store = { workspace = true }
bytes = { workspace = true }
beacon_processor = { workspace = true }
rand = { workspace = true }
serde_json = { workspace = true }
environment = { workspace = true }

[dev-dependencies]
serde_json = { workspace = true }
Expand Down
95 changes: 43 additions & 52 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub use block_id::BlockId;
use builder_states::get_next_withdrawals;
use bytes::Bytes;
use directory::DEFAULT_ROOT_DIR;
use environment::SSE_LOG_CHANNEL_SIZE;
use environment::{EnvironmentBuilder, LoggerConfig};
use eth2::types::{
self as api_types, BroadcastValidation, EndpointVersion, ForkChoice, ForkChoiceNode,
LightClientUpdatesQuery, PublishBlockRequest, ValidatorBalancesRequestBody, ValidatorId,
Expand All @@ -54,13 +56,15 @@ use lighthouse_network::{types::SyncState, EnrExt, NetworkGlobals, PeerId, Pubsu
use lighthouse_version::version_with_platform;
use logging::crit;
use logging::SSELoggingComponents;
use logging::SSE_LOGGING_COMPONENTS;
use network::{NetworkMessage, NetworkSenders, ValidatorSubscriptionMessage};
use operation_pool::ReceivedPreCapella;
use parking_lot::RwLock;
pub use publish_blocks::{
publish_blinded_block, publish_block, reconstruct_block, ProvenancedBlock,
};
use serde::{Deserialize, Serialize};

use slot_clock::SlotClock;
use ssz::Encode;
pub use state_id::StateId;
Expand Down Expand Up @@ -135,7 +139,7 @@ pub struct Context<T: BeaconChainTypes> {
pub beacon_processor_send: Option<BeaconProcessorSend<T::EthSpec>>,
pub beacon_processor_reprocess_send: Option<Sender<ReprocessQueueMessage>>,
pub eth1_service: Option<eth1::Service>,
// pub sse_logging_components: Option<SSELoggingComponents>,
pub sse_logging_components: Option<SSELoggingComponents>,
}

/// Configuration for the HTTP server.
Expand Down Expand Up @@ -486,13 +490,6 @@ pub fn serve<T: BeaconChainTypes>(
},
);

// Create a `warp` filter that provides access to the logger.
let inner_ctx = ctx.clone();
// let log_filter = warp::any().map(move || inner_ctx.log.clone());

// let inner_components = ctx.sse_logging_components.clone();
// let sse_component_filter = warp::any().map(move || inner_components.clone());

// Create a `warp` filter that provides access to local system information.
let system_info = Arc::new(RwLock::new(sysinfo::System::new()));
{
Expand Down Expand Up @@ -4526,49 +4523,43 @@ pub fn serve<T: BeaconChainTypes>(

// Subscribe to logs via Server Side Events
// /lighthouse/logs
// let lighthouse_log_events = warp::path("lighthouse")
// .and(warp::path("logs"))
// .and(warp::path::end())
// .and(task_spawner_filter)
// // .and(sse_component_filter)
// .then(
// |task_spawner: TaskSpawner<T::EthSpec>| {
// task_spawner.blocking_response_task(Priority::P1, move || {
// if let Some(logging_components) = sse_component {
// // Build a JSON stream
// let s = BroadcastStream::new(logging_components.sender.subscribe()).map(
// |msg| {
// match msg {
// Ok(data) => {
// // Serialize to json
// match data.to_json_string() {
// // Send the json as a Server Side Event
// Ok(json) => Ok(Event::default().data(json)),
// Err(e) => {
// Err(warp_utils::reject::server_sent_event_error(
// format!("Unable to serialize to JSON {}", e),
// ))
// }
// }
// }
// Err(e) => Err(warp_utils::reject::server_sent_event_error(
// format!("Unable to receive event {}", e),
// )),
// }
// },
// );

// Ok::<_, warp::Rejection>(warp::sse::reply(
// warp::sse::keep_alive().stream(s),
// ))
// } else {
// Err(warp_utils::reject::custom_server_error(
// "SSE Logging is not enabled".to_string(),
// ))
// }
// })
// },
// );
let lighthouse_log_events = warp::path("lighthouse")
.and(warp::path("logs"))
.and(warp::path::end())
.and(task_spawner_filter)
.then(|task_spawner: TaskSpawner<T::EthSpec>| {
task_spawner.blocking_response_task(Priority::P1, move || {
if let Some(logging_components) = SSE_LOGGING_COMPONENTS.lock().unwrap().as_ref() {
// Build a JSON stream
let s =
BroadcastStream::new(logging_components.sender.subscribe()).map(|msg| {
match msg {
Ok(data) => {
// Serialize to json
match serde_json::to_string(&data)
.map_err(|e| format!("{:?}", e))
{
// Send the json as a Server Side Event
Ok(json) => Ok(Event::default().data(json)),
Err(e) => Err(warp_utils::reject::server_sent_event_error(
format!("Unable to serialize to JSON {}", e),
)),
}
}
Err(e) => Err(warp_utils::reject::server_sent_event_error(
format!("Unable to receive event {}", e),
)),
}
});

Ok::<_, warp::Rejection>(warp::sse::reply(warp::sse::keep_alive().stream(s)))
} else {
Err(warp_utils::reject::custom_server_error(
"SSE Logging is not enabled".to_string(),
))
}
})
});

// Define the ultimate set of routes that will be provided to the server.
// Use `uor` rather than `or` in order to simplify types (see `UnifyingOrFilter`).
Expand Down Expand Up @@ -4654,7 +4645,7 @@ pub fn serve<T: BeaconChainTypes>(
.uor(get_lighthouse_merge_readiness)
.uor(get_events)
.uor(get_expected_withdrawals)
// .uor(lighthouse_log_events.boxed())
.uor(lighthouse_log_events.boxed())
.recover(warp_utils::reject::handle_rejection),
)
.boxed()
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/http_api/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ pub async fn create_api_server_with_config<T: BeaconChainTypes>(
beacon_processor_send: Some(beacon_processor_send),
beacon_processor_reprocess_send: Some(reprocess_send),
eth1_service: Some(eth1_service),
// sse_logging_components: None,
sse_logging_components: None,
// log,
});

Expand Down
1 change: 1 addition & 0 deletions common/logging/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ tracing-core = { workspace = true }
tracing-log = { workspace = true }
tracing-subscriber = { workspace = true }
tracing-appender = { workspace = true }
once_cell = "1.17.1"
1 change: 1 addition & 0 deletions common/logging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod tracing_logging_layer;
mod tracing_metrics_layer;

pub use sse_logging_components::SSELoggingComponents;
pub use sse_logging_components::SSE_LOGGING_COMPONENTS;
pub use tracing_metrics_layer::MetricsLayer;

/// The minimum interval between log messages indicating that a queue is full.
Expand Down
86 changes: 69 additions & 17 deletions common/logging/src/sse_logging_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,97 @@
//! there are subscribers to a HTTP SSE stream.

use crate::async_record::AsyncRecord;
use once_cell::sync::Lazy;
use serde_json;
use serde_json::json;
use serde_json::Value;
use slog::{Drain, OwnedKVList, Record};
use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::sync::broadcast::Sender;
use tracing::field::{Field, Visit};
use tracing::{Event, Subscriber};
use tracing_subscriber::layer::Context;
use tracing_subscriber::Layer;

/// Default log level for SSE Events.
// NOTE: Made this a constant. Debug level seems to be pretty intense. Can make this
// configurable later if needed.
const LOG_LEVEL: slog::Level = slog::Level::Info;

const LOG_LEVEL: tracing::Level = tracing::Level::INFO;
pub static SSE_LOGGING_COMPONENTS: Lazy<Mutex<Option<SSELoggingComponents>>> =
Lazy::new(|| Mutex::new(None));
/// The components required in the HTTP API task to receive logged events.
#[derive(Clone)]
pub struct SSELoggingComponents {
/// The channel to receive events from.
pub sender: Arc<AssertUnwindSafe<Sender<AsyncRecord>>>,
pub sender: Arc<Sender<Arc<Value>>>,
}

impl SSELoggingComponents {
/// Create a new SSE drain.
pub fn new(channel_size: usize) -> Self {
let (sender, _receiver) = tokio::sync::broadcast::channel(channel_size);

let sender = Arc::new(AssertUnwindSafe(sender));
SSELoggingComponents { sender }
SSELoggingComponents {
sender: Arc::new(sender),
}
}
}

impl Drain for SSELoggingComponents {
type Ok = ();
type Err = &'static str;

fn log(&self, record: &Record, logger_values: &OwnedKVList) -> Result<Self::Ok, Self::Err> {
if record.level().is_at_least(LOG_LEVEL) {
// Attempt to send the logs
match self.sender.send(AsyncRecord::from(record, logger_values)) {
Ok(_num_sent) => {} // Everything got sent
Err(_err) => {} // There are no subscribers, do nothing
}
impl<S: Subscriber> Layer<S> for SSELoggingComponents {
fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
if *event.metadata().level() > LOG_LEVEL {
return;
}
Ok(())

let mut visitor = TracingEventVisitor::new();
event.record(&mut visitor);
let log_entry = visitor.finish(event.metadata());

let _ = self.sender.send(Arc::new(log_entry));
}
}
struct TracingEventVisitor {
fields: serde_json::Map<String, Value>,
}

impl TracingEventVisitor {
fn new() -> Self {
TracingEventVisitor {
fields: serde_json::Map::new(),
}
}

fn finish(self, metadata: &tracing::Metadata<'_>) -> Value {
let mut log_entry = serde_json::Map::new();
log_entry.insert("time".to_string(), json!(chrono::Local::now().to_rfc3339()));
log_entry.insert("level".to_string(), json!(metadata.level().to_string()));
log_entry.insert("target".to_string(), json!(metadata.target()));
log_entry.insert("fields".to_string(), Value::Object(self.fields));
Value::Object(log_entry)
}
}

impl Visit for TracingEventVisitor {
fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
self.fields
.insert(field.name().to_string(), json!(format!("{:?}", value)));
}

fn record_str(&mut self, field: &Field, value: &str) {
self.fields.insert(field.name().to_string(), json!(value));
}

fn record_i64(&mut self, field: &Field, value: i64) {
self.fields.insert(field.name().to_string(), json!(value));
}

fn record_u64(&mut self, field: &Field, value: u64) {
self.fields.insert(field.name().to_string(), json!(value));
}

fn record_bool(&mut self, field: &Field, value: bool) {
self.fields.insert(field.name().to_string(), json!(value));
}
}
Loading

0 comments on commit 63002ff

Please sign in to comment.