diff --git a/Cargo.lock b/Cargo.lock
index 960170f721..0c968d05fe 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3881,6 +3881,7 @@ dependencies = [
  "bs58 0.4.0",
  "bytes",
  "directory",
+ "environment",
  "eth1",
  "eth2",
  "ethereum_serde_utils",
@@ -5221,6 +5222,7 @@ version = "0.2.0"
 dependencies = [
  "chrono",
  "lighthouse_metrics",
+ "once_cell",
  "parking_lot 0.12.3",
  "serde",
  "serde_json",
diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs
index df133ce397..89cd935bd4 100644
--- a/beacon_node/client/src/builder.rs
+++ b/beacon_node/client/src/builder.rs
@@ -32,6 +32,7 @@ use execution_layer::ExecutionLayer;
 use futures::channel::mpsc::Receiver;
 use genesis::{interop_genesis_state, Eth1GenesisService, DEFAULT_ETH1_BLOCK_HASH};
 use lighthouse_network::{prometheus_client::registry::Registry, NetworkGlobals};
+use logging::SSE_LOGGING_COMPONENTS;
 use monitoring_api::{MonitoringHttpClient, ProcessType};
 use network::{NetworkConfig, NetworkSenders, NetworkService};
 use slasher::Slasher;
@@ -541,7 +542,7 @@ where
             beacon_processor_send: None,
             beacon_processor_reprocess_send: None,
             eth1_service: Some(genesis_service.eth1_service.clone()),
-            // sse_logging_components: runtime_context.sse_logging_components.clone(),
+            sse_logging_components: SSE_LOGGING_COMPONENTS.lock().unwrap().clone(),
         });

         // Discard the error from the oneshot.
@@ -773,7 +774,7 @@ where
                 beacon_processor_reprocess_send: Some(
                     beacon_processor_channels.work_reprocessing_tx.clone(),
                 ),
-                // sse_logging_components: runtime_context.sse_logging_components.clone(),
+                sse_logging_components: SSE_LOGGING_COMPONENTS.lock().unwrap().clone(),
                 // log: log.clone(),
             });
diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml
index 1d5764b7b1..7bf0d60d3d 100644
--- a/beacon_node/http_api/Cargo.toml
+++ b/beacon_node/http_api/Cargo.toml
@@ -44,6 +44,8 @@ store = { workspace = true }
 bytes = { workspace = true }
 beacon_processor = { workspace = true }
 rand = { workspace = true }
+serde_json = { workspace = true }
+environment = { workspace = true }

 [dev-dependencies]
 serde_json = { workspace = true }
diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs
index 077caf9df6..064729031b 100644
--- a/beacon_node/http_api/src/lib.rs
+++ b/beacon_node/http_api/src/lib.rs
@@ -44,6 +44,8 @@ pub use block_id::BlockId;
 use builder_states::get_next_withdrawals;
 use bytes::Bytes;
 use directory::DEFAULT_ROOT_DIR;
+use environment::SSE_LOG_CHANNEL_SIZE;
+use environment::{EnvironmentBuilder, LoggerConfig};
 use eth2::types::{
     self as api_types, BroadcastValidation, EndpointVersion, ForkChoice, ForkChoiceNode,
     LightClientUpdatesQuery, PublishBlockRequest, ValidatorBalancesRequestBody, ValidatorId,
@@ -54,6 +56,7 @@ use lighthouse_network::{types::SyncState, EnrExt, NetworkGlobals, PeerId, Pubsu
 use lighthouse_version::version_with_platform;
 use logging::crit;
 use logging::SSELoggingComponents;
+use logging::SSE_LOGGING_COMPONENTS;
 use network::{NetworkMessage, NetworkSenders, ValidatorSubscriptionMessage};
 use operation_pool::ReceivedPreCapella;
 use parking_lot::RwLock;
@@ -61,6 +64,7 @@ pub use publish_blocks::{
     publish_blinded_block, publish_block, reconstruct_block, ProvenancedBlock,
 };
 use serde::{Deserialize, Serialize};
+
 use slot_clock::SlotClock;
 use ssz::Encode;
 pub use state_id::StateId;
@@ -135,7 +139,7 @@ pub struct Context<T: BeaconChainTypes> {
     pub beacon_processor_send: Option<BeaconProcessorSend<T::EthSpec>>,
     pub beacon_processor_reprocess_send: Option<Sender<ReprocessQueueMessage>>,
     pub eth1_service: Option<eth1::Service>,
-    // pub sse_logging_components: Option<SSELoggingComponents>,
+    pub sse_logging_components: Option<SSELoggingComponents>,
 }

 /// Configuration for the HTTP server.
@@ -486,13 +490,6 @@ pub fn serve<T: BeaconChainTypes>(
         },
     );

-    // Create a `warp` filter that provides access to the logger.
-    let inner_ctx = ctx.clone();
-    // let log_filter = warp::any().map(move || inner_ctx.log.clone());
-
-    // let inner_components = ctx.sse_logging_components.clone();
-    // let sse_component_filter = warp::any().map(move || inner_components.clone());
-
     // Create a `warp` filter that provides access to local system information.
     let system_info = Arc::new(RwLock::new(sysinfo::System::new()));
     {
@@ -4526,49 +4523,43 @@ pub fn serve<T: BeaconChainTypes>(
     // Subscribe to logs via Server Side Events
     // /lighthouse/logs
-    // let lighthouse_log_events = warp::path("lighthouse")
-    //     .and(warp::path("logs"))
-    //     .and(warp::path::end())
-    //     .and(task_spawner_filter)
-    //     // .and(sse_component_filter)
-    //     .then(
-    //         |task_spawner: TaskSpawner| {
-    //             task_spawner.blocking_response_task(Priority::P1, move || {
-    //                 if let Some(logging_components) = sse_component {
-    //                     // Build a JSON stream
-    //                     let s = BroadcastStream::new(logging_components.sender.subscribe()).map(
-    //                         |msg| {
-    //                             match msg {
-    //                                 Ok(data) => {
-    //                                     // Serialize to json
-    //                                     match data.to_json_string() {
-    //                                         // Send the json as a Server Side Event
-    //                                         Ok(json) => Ok(Event::default().data(json)),
-    //                                         Err(e) => {
-    //                                             Err(warp_utils::reject::server_sent_event_error(
-    //                                                 format!("Unable to serialize to JSON {}", e),
-    //                                             ))
-    //                                         }
-    //                                     }
-    //                                 }
-    //                                 Err(e) => Err(warp_utils::reject::server_sent_event_error(
-    //                                     format!("Unable to receive event {}", e),
-    //                                 )),
-    //                             }
-    //                         },
-    //                     );
-
-    //                     Ok::<_, warp::Rejection>(warp::sse::reply(
-    //                         warp::sse::keep_alive().stream(s),
-    //                     ))
-    //                 } else {
-    //                     Err(warp_utils::reject::custom_server_error(
-    //                         "SSE Logging is not enabled".to_string(),
-    //                     ))
-    //                 }
-    //             })
-    //         },
-    //     );
+    let lighthouse_log_events = warp::path("lighthouse")
+        .and(warp::path("logs"))
+        .and(warp::path::end())
+        .and(task_spawner_filter)
+        .then(|task_spawner: TaskSpawner<T::EthSpec>| {
+            task_spawner.blocking_response_task(Priority::P1, move || {
+                if let Some(logging_components) = SSE_LOGGING_COMPONENTS.lock().unwrap().as_ref() {
+                    // Build a JSON stream
+                    let s =
+                        BroadcastStream::new(logging_components.sender.subscribe()).map(|msg| {
+                            match msg {
+                                Ok(data) => {
+                                    // Serialize to json
+                                    match serde_json::to_string(&data)
+                                        .map_err(|e| format!("{:?}", e))
+                                    {
+                                        // Send the json as a Server Side Event
+                                        Ok(json) => Ok(Event::default().data(json)),
+                                        Err(e) => Err(warp_utils::reject::server_sent_event_error(
+                                            format!("Unable to serialize to JSON {}", e),
+                                        )),
+                                    }
+                                }
+                                Err(e) => Err(warp_utils::reject::server_sent_event_error(
+                                    format!("Unable to receive event {}", e),
+                                )),
+                            }
+                        });
+
+                    Ok::<_, warp::Rejection>(warp::sse::reply(warp::sse::keep_alive().stream(s)))
+                } else {
+                    Err(warp_utils::reject::custom_server_error(
+                        "SSE Logging is not enabled".to_string(),
+                    ))
+                }
+            })
+        });

     // Define the ultimate set of routes that will be provided to the server.
     // Use `uor` rather than `or` in order to simplify types (see `UnifyingOrFilter`).
@@ -4654,7 +4645,7 @@ pub fn serve<T: BeaconChainTypes>(
         .uor(get_lighthouse_merge_readiness)
         .uor(get_events)
         .uor(get_expected_withdrawals)
-        // .uor(lighthouse_log_events.boxed())
+        .uor(lighthouse_log_events.boxed())
         .recover(warp_utils::reject::handle_rejection),
     )
     .boxed()
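// --- Illustrative consumer sketch (not part of the diff above) ---
// A minimal client for the `/lighthouse/logs` SSE endpoint wired up in `serve` above.
// Assumptions: the beacon node HTTP API listens on 127.0.0.1:5052, and `reqwest` (with the
// `stream` feature), `futures` and `tokio` are available; none of these names come from the PR.
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let response = reqwest::get("http://127.0.0.1:5052/lighthouse/logs").await?;
    let mut body = response.bytes_stream();
    let mut buffer = String::new();

    while let Some(chunk) = body.next().await {
        buffer.push_str(&String::from_utf8_lossy(&chunk?));
        // SSE frames are separated by a blank line; each frame carries a `data:` field
        // holding the JSON document produced by the logging layer.
        while let Some(end) = buffer.find("\n\n") {
            let frame: String = buffer.drain(..end + 2).collect();
            for line in frame.lines() {
                if let Some(json) = line.strip_prefix("data:") {
                    println!("{}", json.trim());
                }
            }
        }
    }
    Ok(())
}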
diff --git a/beacon_node/http_api/src/test_utils.rs b/beacon_node/http_api/src/test_utils.rs
index 4d0419686b..bba3801d64 100644
--- a/beacon_node/http_api/src/test_utils.rs
+++ b/beacon_node/http_api/src/test_utils.rs
@@ -235,7 +235,7 @@ pub async fn create_api_server_with_config(
         beacon_processor_send: Some(beacon_processor_send),
         beacon_processor_reprocess_send: Some(reprocess_send),
         eth1_service: Some(eth1_service),
-        // sse_logging_components: None,
+        sse_logging_components: None,
         // log,
     });
diff --git a/common/logging/Cargo.toml b/common/logging/Cargo.toml
index cac6d073f2..6d8652a470 100644
--- a/common/logging/Cargo.toml
+++ b/common/logging/Cargo.toml
@@ -23,3 +23,4 @@ tracing-core = { workspace = true }
 tracing-log = { workspace = true }
 tracing-subscriber = { workspace = true }
 tracing-appender = { workspace = true }
+once_cell = "1.17.1"
diff --git a/common/logging/src/lib.rs b/common/logging/src/lib.rs
index 9aaa77655e..2215de1b58 100644
--- a/common/logging/src/lib.rs
+++ b/common/logging/src/lib.rs
@@ -19,6 +19,7 @@ pub mod tracing_logging_layer;
 mod tracing_metrics_layer;

 pub use sse_logging_components::SSELoggingComponents;
+pub use sse_logging_components::SSE_LOGGING_COMPONENTS;
 pub use tracing_metrics_layer::MetricsLayer;

 /// The minimum interval between log messages indicating that a queue is full.
diff --git a/common/logging/src/sse_logging_components.rs b/common/logging/src/sse_logging_components.rs
index 244d09fbd1..352188f4b8 100644
--- a/common/logging/src/sse_logging_components.rs
+++ b/common/logging/src/sse_logging_components.rs
@@ -2,21 +2,31 @@
 //! there are subscribers to a HTTP SSE stream.

 use crate::async_record::AsyncRecord;
+use once_cell::sync::Lazy;
+use serde_json;
+use serde_json::json;
+use serde_json::Value;
 use slog::{Drain, OwnedKVList, Record};
 use std::panic::AssertUnwindSafe;
 use std::sync::Arc;
+use std::sync::Mutex;
 use tokio::sync::broadcast::Sender;
+use tracing::field::{Field, Visit};
+use tracing::{Event, Subscriber};
+use tracing_subscriber::layer::Context;
+use tracing_subscriber::Layer;

 /// Default log level for SSE Events.
 // NOTE: Made this a constant. Debug level seems to be pretty intense. Can make this
 // configurable later if needed.
-const LOG_LEVEL: slog::Level = slog::Level::Info;
-
+const LOG_LEVEL: tracing::Level = tracing::Level::INFO;
+pub static SSE_LOGGING_COMPONENTS: Lazy<Mutex<Option<SSELoggingComponents>>> =
+    Lazy::new(|| Mutex::new(None));
 /// The components required in the HTTP API task to receive logged events.
 #[derive(Clone)]
 pub struct SSELoggingComponents {
     /// The channel to receive events from.
-    pub sender: Arc<AssertUnwindSafe<Sender<AsyncRecord>>>,
+    pub sender: Arc<Sender<Arc<Value>>>,
 }

 impl SSELoggingComponents {
@@ -24,23 +34,65 @@ impl SSELoggingComponents {
     pub fn new(channel_size: usize) -> Self {
         let (sender, _receiver) = tokio::sync::broadcast::channel(channel_size);

-        let sender = Arc::new(AssertUnwindSafe(sender));
-        SSELoggingComponents { sender }
+        SSELoggingComponents {
+            sender: Arc::new(sender),
+        }
     }
 }

-impl Drain for SSELoggingComponents {
-    type Ok = ();
-    type Err = &'static str;
-
-    fn log(&self, record: &Record, logger_values: &OwnedKVList) -> Result<Self::Ok, Self::Err> {
-        if record.level().is_at_least(LOG_LEVEL) {
-            // Attempt to send the logs
-            match self.sender.send(AsyncRecord::from(record, logger_values)) {
-                Ok(_num_sent) => {} // Everything got sent
-                Err(_err) => {} // There are no subscribers, do nothing
-            }
+impl<S: Subscriber> Layer<S> for SSELoggingComponents {
+    fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
+        if *event.metadata().level() > LOG_LEVEL {
+            return;
         }
-        Ok(())
+
+        let mut visitor = TracingEventVisitor::new();
+        event.record(&mut visitor);
+        let log_entry = visitor.finish(event.metadata());
+
+        let _ = self.sender.send(Arc::new(log_entry));
+    }
+}
+struct TracingEventVisitor {
+    fields: serde_json::Map<String, Value>,
+}
+
+impl TracingEventVisitor {
+    fn new() -> Self {
+        TracingEventVisitor {
+            fields: serde_json::Map::new(),
+        }
+    }
+
+    fn finish(self, metadata: &tracing::Metadata<'_>) -> Value {
+        let mut log_entry = serde_json::Map::new();
+        log_entry.insert("time".to_string(), json!(chrono::Local::now().to_rfc3339()));
+        log_entry.insert("level".to_string(), json!(metadata.level().to_string()));
+        log_entry.insert("target".to_string(), json!(metadata.target()));
+        log_entry.insert("fields".to_string(), Value::Object(self.fields));
+        Value::Object(log_entry)
+    }
+}
+
+impl Visit for TracingEventVisitor {
+    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
+        self.fields
+            .insert(field.name().to_string(), json!(format!("{:?}", value)));
+    }
+
+    fn record_str(&mut self, field: &Field, value: &str) {
+        self.fields.insert(field.name().to_string(), json!(value));
+    }
+
+    fn record_i64(&mut self, field: &Field, value: i64) {
+        self.fields.insert(field.name().to_string(), json!(value));
+    }
+
+    fn record_u64(&mut self, field: &Field, value: u64) {
+        self.fields.insert(field.name().to_string(), json!(value));
+    }
+
+    fn record_bool(&mut self, field: &Field, value: bool) {
+        self.fields.insert(field.name().to_string(), json!(value));
     }
 }
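// --- Illustrative sketch (not part of the diff above) ---
// Exercises the new tracing `Layer` implementation in isolation: register the layer with a
// `tracing_subscriber` registry, emit one event, and read the JSON entry back off the broadcast
// channel. The channel size, field names and log message are arbitrary choices for the example,
// and the JSON shape shown in the comment is only approximate.
use logging::SSELoggingComponents;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

#[tokio::main]
async fn main() {
    let sse = SSELoggingComponents::new(64);
    // Subscribe before emitting so the broadcast channel has at least one receiver.
    let mut rx = sse.sender.subscribe();

    tracing_subscriber::registry().with(sse.clone()).init();

    tracing::info!(peer_id = "16Uiu2HAm...", "Peer connected");

    // Roughly: {"time":"...","level":"INFO","target":"...","fields":{"message":"Peer connected","peer_id":"16Uiu2HAm..."}}
    let entry = rx.recv().await.expect("one log entry");
    println!("{entry}");
}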
diff --git a/lighthouse/environment/src/lib.rs b/lighthouse/environment/src/lib.rs
index 80035808ef..d989c9c3a0 100644
--- a/lighthouse/environment/src/lib.rs
+++ b/lighthouse/environment/src/lib.rs
@@ -11,9 +11,9 @@ use eth2_config::Eth2Config;
 use eth2_network_config::Eth2NetworkConfig;
 use futures::channel::mpsc::{channel, Receiver, Sender};
 use futures::{future, StreamExt};
-
-// use logging::{SELoggingComponents};
 use logging::tracing_logging_layer::LoggingLayer;
+use logging::SSELoggingComponents;
+use logging::SSE_LOGGING_COMPONENTS;
 use serde::{Deserialize, Serialize};
 use std::io::{Result as IOResult, Write};
 use std::path::PathBuf;
@@ -25,7 +25,6 @@ use tracing_appender::non_blocking::NonBlocking;
 use tracing_appender::non_blocking::WorkerGuard;
 use tracing_appender::rolling::{RollingFileAppender, Rotation};
 use types::{EthSpec, GnosisEthSpec, MainnetEthSpec, MinimalEthSpec};
-
 #[cfg(target_family = "unix")]
 use {
     futures::Future,
@@ -37,7 +36,7 @@ use {futures::channel::oneshot, std::cell::RefCell};
 const LOG_CHANNEL_SIZE: usize = 16384;
-const SSE_LOG_CHANNEL_SIZE: usize = 2048;
+pub const SSE_LOG_CHANNEL_SIZE: usize = 2048;
 /// The maximum time in seconds the client will wait for all internal tasks to shutdown.
 const MAXIMUM_SHUTDOWN_TIME: u64 = 15;
@@ -90,7 +89,7 @@ pub struct RuntimeContext<E: EthSpec> {
     pub eth_spec_instance: E,
     pub eth2_config: Eth2Config,
     pub eth2_network_config: Option<Arc<Eth2NetworkConfig>>,
-    // pub sse_logging_components: Option<SSELoggingComponents>,
+    pub sse_logging_components: Option<SSELoggingComponents>,
 }

 impl<E: EthSpec> RuntimeContext<E> {
@@ -103,7 +102,7 @@ impl<E: EthSpec> RuntimeContext<E> {
             eth_spec_instance: self.eth_spec_instance.clone(),
             eth2_config: self.eth2_config.clone(),
             eth2_network_config: self.eth2_network_config.clone(),
-            // sse_logging_components: self.sse_logging_components.clone(),
+            sse_logging_components: SSE_LOGGING_COMPONENTS.lock().unwrap().clone(),
         }
     }
@@ -117,7 +116,7 @@ pub struct EnvironmentBuilder<E: EthSpec> {
     runtime: Option<Arc<Runtime>>,
     // log: Option<Logger>,
-    // sse_logging_components: Option<SSELoggingComponents>,
+    sse_logging_components: Option<SSELoggingComponents>,
     eth_spec_instance: E,
     eth2_config: Eth2Config,
     eth2_network_config: Option<Eth2NetworkConfig>,
@@ -129,7 +128,7 @@ impl EnvironmentBuilder<MinimalEthSpec> {
         Self {
             runtime: None,
             // log: None,
-            // sse_logging_components: None,
+            sse_logging_components: None,
             eth_spec_instance: MinimalEthSpec,
             eth2_config: Eth2Config::minimal(),
             eth2_network_config: None,
@@ -143,7 +142,7 @@ impl EnvironmentBuilder<MainnetEthSpec> {
         Self {
             runtime: None,
             // log: None,
-            // sse_logging_components: None,
+            sse_logging_components: None,
             eth_spec_instance: MainnetEthSpec,
             eth2_config: Eth2Config::mainnet(),
             eth2_network_config: None,
@@ -157,7 +156,7 @@ impl EnvironmentBuilder<GnosisEthSpec> {
         Self {
             runtime: None,
             // log: None,
-            // sse_logging_components: None,
+            sse_logging_components: None,
             eth_spec_instance: GnosisEthSpec,
             eth2_config: Eth2Config::gnosis(),
             eth2_network_config: None,
@@ -189,12 +188,6 @@ impl<E: EthSpec> EnvironmentBuilder<E> {
         Ok(())
     }

-    /// Initializes the logger using the specified configuration.
-    /// The logger is "async" because it has a dedicated thread that accepts logs and then
-    /// asynchronously flushes them to stdout/files/etc. This means the thread that raised the log
-    /// does not have to wait for the logs to be flushed.
-    /// The logger can be duplicated and more detailed logs can be output to `logfile`.
-    /// Note that background file logging will spawn a new thread.
     pub fn init_tracing(mut self, config: LoggerConfig) -> (Self, LoggingLayer, LoggingLayer) {
         let mut log_path = config.path.unwrap();
@@ -210,7 +203,7 @@ impl<E: EthSpec> EnvironmentBuilder<E> {
         }
         let Ok(file_appender) = RollingFileAppender::builder()
             .rotation(Rotation::DAILY)
-            .max_log_files(2)
+            .max_log_files(config.max_log_number)
             .filename_prefix("beacon")
             .filename_suffix("log")
             .build(path.clone())
@@ -233,9 +226,25 @@ impl<E: EthSpec> EnvironmentBuilder<E> {
             guard: stdout_guard,
         };

+        if config.sse_logging {
+            let mut global_sse_logging_component = SSE_LOGGING_COMPONENTS.lock().unwrap();
+
+            if global_sse_logging_component.is_none() {
+                *global_sse_logging_component =
+                    Some(SSELoggingComponents::new(SSE_LOG_CHANNEL_SIZE));
+            }
+        }
+
         (self, file_logging_layer, stdout_logging_layer)
     }

     /*
+    /// Initializes the logger using the specified configuration.
+    /// The logger is "async" because it has a dedicated thread that accepts logs and then
+    /// asynchronously flushes them to stdout/files/etc. This means the thread that raised the log
+    /// does not have to wait for the logs to be flushed.
+    /// The logger can be duplicated and more detailed logs can be output to `logfile`.
+    /// Note that background file logging will spawn a new thread.
+    pub fn initialize_logger(mut self, config: LoggerConfig) -> Result<Self, String> {
         // Setting up the initial logger format and build it.
         let stdout_drain = if let Some(ref format) = config.log_format {
@@ -383,7 +392,7 @@ impl<E: EthSpec> EnvironmentBuilder<E> {
             signal: Some(signal),
             exit,
             // log: self.log.ok_or("Cannot build environment without log")?,
-            // sse_logging_components: self.sse_logging_components,
+            sse_logging_components: SSE_LOGGING_COMPONENTS.lock().unwrap().clone(),
             eth_spec_instance: self.eth_spec_instance,
             eth2_config: self.eth2_config,
             eth2_network_config: self.eth2_network_config.map(Arc::new),
@@ -402,7 +411,7 @@ pub struct Environment<E: EthSpec> {
     signal: Option<async_channel::Sender<()>>,
     exit: async_channel::Receiver<()>,
     // log: Logger,
-    // sse_logging_components: Option<SSELoggingComponents>,
+    sse_logging_components: Option<SSELoggingComponents>,
     eth_spec_instance: E,
     pub eth2_config: Eth2Config,
     pub eth2_network_config: Option<Arc<Eth2NetworkConfig>>,
@@ -429,7 +438,7 @@ impl<E: EthSpec> Environment<E> {
             eth_spec_instance: self.eth_spec_instance.clone(),
             eth2_config: self.eth2_config.clone(),
             eth2_network_config: self.eth2_network_config.clone(),
-            // sse_logging_components: self.sse_logging_components.clone(),
+            sse_logging_components: SSE_LOGGING_COMPONENTS.lock().unwrap().clone(),
         }
     }
@@ -445,7 +454,7 @@ impl<E: EthSpec> Environment<E> {
             eth_spec_instance: self.eth_spec_instance.clone(),
             eth2_config: self.eth2_config.clone(),
             eth2_network_config: self.eth2_network_config.clone(),
-            // sse_logging_components: self.sse_logging_components.clone(),
+            sse_logging_components: SSE_LOGGING_COMPONENTS.lock().unwrap().clone(),
         }
     }
diff --git a/lighthouse/src/main.rs b/lighthouse/src/main.rs
index 5607244172..66aa7a7e0f 100644
--- a/lighthouse/src/main.rs
+++ b/lighthouse/src/main.rs
@@ -17,13 +17,12 @@ use ethereum_hashing::have_sha_extensions;
 use futures::TryFutureExt;
 use lighthouse_version::VERSION;
 use logging::crit;
-use logging::tracing_logging_layer::LoggingLayer;
+use logging::SSE_LOGGING_COMPONENTS;
 use malloc_utils::configure_memory_allocator;
 use std::backtrace::Backtrace;
 use std::path::PathBuf;
 use std::process::exit;
 use std::sync::LazyLock;
-use std::sync::{Arc, Mutex};
 use task_executor::ShutdownReason;
 use tracing::{info, level_filters::LevelFilter};
 use tracing_subscriber::EnvFilter;
@@ -644,6 +643,7 @@ fn run<E: EthSpec>(
         .with(discv5_layer)
         .with(file_logging_layer)
         .with(stdout_logging_layer)
+        .with(SSE_LOGGING_COMPONENTS.lock().unwrap().clone())
         .try_init()
     {
         eprintln!("Failed to initialize dependency logging: {e}");
diff --git a/validator_client/src/http_api/mod.rs b/validator_client/src/http_api/mod.rs
index 62588ff9a0..2a2fa45862 100644
--- a/validator_client/src/http_api/mod.rs
+++ b/validator_client/src/http_api/mod.rs
@@ -83,7 +83,7 @@ pub struct Context<T: SlotClock, E: EthSpec> {
     pub spec: Arc<ChainSpec>,
     pub config: Config,
     // pub log: Logger,
-    // pub sse_logging_components: Option<SSELoggingComponents>,
+    pub sse_logging_components: Option<SSELoggingComponents>,
     pub slot_clock: T,
     pub _phantom: PhantomData<E>,
 }
diff --git a/validator_client/src/http_api/test_utils.rs b/validator_client/src/http_api/test_utils.rs
index a760c630da..31423c1ead 100644
--- a/validator_client/src/http_api/test_utils.rs
+++ b/validator_client/src/http_api/test_utils.rs
@@ -134,7 +134,7 @@ impl ApiTester {
             spec,
             config: http_config,
             // log,
-            // sse_logging_components: None,
+            sse_logging_components: None,
             slot_clock,
             _phantom: PhantomData,
         });
diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs
index 980dc085f6..b7db963f17 100644
--- a/validator_client/src/lib.rs
+++ b/validator_client/src/lib.rs
@@ -44,6 +44,7 @@ use duties_service::{sync::SyncDutiesMap, DutiesService};
 use environment::RuntimeContext;
 use eth2::{reqwest::ClientBuilder, types::Graffiti, BeaconNodeHttpClient, StatusCode, Timeouts};
 use http_api::ApiSecret;
+use logging::SSE_LOGGING_COMPONENTS;
 use notifier::spawn_notifier;
 use parking_lot::RwLock;
 use preparation_service::{PreparationService, PreparationServiceBuilder};
@@ -65,7 +66,6 @@ use tokio::{
 use tracing::{debug, error, info, warn};
 use types::{EthSpec, Hash256, PublicKeyBytes};
 use validator_store::ValidatorStore;
-
 /// The interval between attempts to contact the beacon node during startup.
 const RETRY_DELAY: Duration = Duration::from_secs(2);
@@ -553,7 +553,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
             graffiti_flag: self.config.graffiti,
             spec: self.context.eth2_config.spec.clone(),
             config: self.config.http_api.clone(),
-            // sse_logging_components: self.context.sse_logging_components.clone(),
+            sse_logging_components: SSE_LOGGING_COMPONENTS.lock().unwrap().clone(),
             slot_clock: self.slot_clock.clone(),
             // log: log.clone(),
             _phantom: PhantomData,
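// --- Illustrative sketch (not part of the diff above) ---
// The access pattern this change relies on throughout: a process-wide `Lazy<Mutex<Option<T>>>`
// that `init_tracing` fills once and that each consumer (HTTP API, client builder, validator
// client) clones out of. Cloning inside the short-lived lock means no component ever stores the
// guard or holds it across an `.await`. The types below are stand-ins, not the ones in the PR.
use once_cell::sync::Lazy;
use std::sync::Mutex;

#[derive(Clone, Debug, PartialEq)]
struct Components {
    channel_size: usize,
}

static GLOBAL: Lazy<Mutex<Option<Components>>> = Lazy::new(|| Mutex::new(None));

/// Populate the global exactly once, mirroring the `is_none` guard in `init_tracing`.
fn init(channel_size: usize) {
    let mut guard = GLOBAL.lock().unwrap();
    if guard.is_none() {
        *guard = Some(Components { channel_size });
    }
}

/// Consumers take a cheap clone and release the lock immediately.
fn consumer() -> Option<Components> {
    GLOBAL.lock().unwrap().clone()
}

fn main() {
    init(2048);
    assert_eq!(consumer(), Some(Components { channel_size: 2048 }));
}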