From a218f918aa07c099f4c35e27925712905c0b4028 Mon Sep 17 00:00:00 2001 From: Jeff Hansen Date: Fri, 19 May 2023 17:09:44 -0400 Subject: [PATCH 1/4] cli: add log level option, disable hyper and h2 logs, use debug level --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/api/publisher.rs | 12 ++++---- src/api/subscriber.rs | 22 +++++++------- src/main.rs | 39 +++++++++++++++++++++---- src/subscriptions/subscription_actor.rs | 2 +- 6 files changed, 54 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7fc8947..514df50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -230,7 +230,7 @@ checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" [[package]] name = "deltio" -version = "0.1.0" +version = "0.2.0" dependencies = [ "async-stream", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 331c389..756bb6f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltio" -version = "0.1.0" +version = "0.2.0" edition = "2021" authors = ["Jeff Hansen"] description = "A Google Cloud Pub/Sub emulator alternative for local testing and CI" diff --git a/src/api/publisher.rs b/src/api/publisher.rs index 0c5df96..f08abbc 100644 --- a/src/api/publisher.rs +++ b/src/api/publisher.rs @@ -61,7 +61,7 @@ impl Publisher for PublisherService { message_storage_policy: None, }; - log::info!( + log::debug!( "{}: creating topic took {:?}", topic_name_str, start.elapsed() @@ -108,7 +108,7 @@ impl Publisher for PublisherService { message_ids: result.message_ids.iter().map(|m| m.to_string()).collect(), }); - log::info!( + log::debug!( "{}: publishing {} messages took {:?}", &topic_name, request.messages.len(), @@ -128,7 +128,7 @@ impl Publisher for PublisherService { let topic = self.get_topic_internal(&topic_name).await?; - log::info!("{}: getting topic took {:?}", &topic_name, start.elapsed()); + log::debug!("{}: getting topic took {:?}", &topic_name, start.elapsed()); Ok(Response::new(Topic { name: topic.name.to_string(), labels: Default::default(), @@ -176,7 +176,7 @@ impl Publisher for PublisherService { next_page_token: page_token.unwrap_or(String::default()), }; - log::info!( + log::debug!( "{}: listing {} topics took {:?}", &request.project, response.topics.len(), @@ -204,7 +204,7 @@ impl Publisher for PublisherService { ListSubscriptionsError::Closed => conflict(), })?; - log::info!( + log::debug!( "{}: listing {} subscriptions took {:?}", &topic_name, page.subscriptions.len(), @@ -246,7 +246,7 @@ impl Publisher for PublisherService { DeleteError::Closed => conflict(), })?; - log::info!("{}: deleting topic took {:?}", &topic_name, start.elapsed()); + log::debug!("{}: deleting topic took {:?}", &topic_name, start.elapsed()); Ok(Response::new(())) } diff --git a/src/api/subscriber.rs b/src/api/subscriber.rs index 091b01d..a43c858 100644 --- a/src/api/subscriber.rs +++ b/src/api/subscriber.rs @@ -89,7 +89,7 @@ impl Subscriber for SubscriberService { let subscription_info = subscription.get_info().await.map_err(|e| match e { GetInfoError::Closed => conflict(), })?; - log::info!( + log::debug!( "{}: creating subscription ({:?})", subscription_name.clone(), start.elapsed() @@ -121,7 +121,7 @@ impl Subscriber for SubscriberService { GetInfoError::Closed => conflict(), })?; - log::info!( + log::debug!( "{}: getting subscription ({:?})", subscription_name.clone(), start.elapsed() @@ -177,7 +177,7 @@ impl Subscriber for SubscriberService { next_page_token: page_token.unwrap_or(String::default()), }; - log::info!( + log::debug!( "{}: listing subscriptions ({:?})", &request.project, start.elapsed() @@ -194,11 +194,11 @@ impl Subscriber for SubscriberService { let subscription_name = parser::parse_subscription_name(&request.subscription)?; let subscription = get_subscription(&self.subscription_manager, &subscription_name)?; - log::info!("{}: deleting subscription", &subscription_name); + log::debug!("{}: deleting subscription", &subscription_name); subscription.delete().await.map_err(|e| match e { DeleteError::Closed => conflict(), })?; - log::info!( + log::debug!( "{}: deleting subscription ({:?})", subscription_name.clone(), start.elapsed() @@ -256,7 +256,7 @@ impl Subscriber for SubscriberService { AcknowledgeMessagesError::Closed => Status::internal("System is shutting down"), })?; - log::info!( + log::debug!( "{}: ack {} messages ({:?})", &subscription_name, &ack_ids.len(), @@ -279,7 +279,7 @@ impl Subscriber for SubscriberService { pull_messages(&subscription, request.max_messages as u16).await?; // If we got messages, return them. if !received_messages.is_empty() { - log::info!( + log::debug!( "{}: pulled {} messages", &subscription_name, received_messages.len() @@ -331,7 +331,7 @@ impl Subscriber for SubscriberService { let subscription_name = parser::parse_subscription_name(&request.subscription)?; let subscription = get_subscription(&self.subscription_manager, &subscription_name)?; - log::info!( + log::debug!( "{}: starting streaming pull ({:?})", subscription_name, start.elapsed() @@ -370,7 +370,7 @@ impl Subscriber for SubscriberService { // If we received any messages, yield them back. if !received_messages.is_empty() { - log::info!( + log::debug!( "{}: streaming-pulled {} messages", &subscription_name, received_messages.len() @@ -546,7 +546,7 @@ async fn handle_streaming_pull_request( AcknowledgeMessagesError::Closed => conflict(), })?; - log::info!( + log::debug!( "{}: acked {} messages ({:?})", &subscription.name, ack_id_count, @@ -571,7 +571,7 @@ async fn handle_streaming_pull_request( ModifyDeadlineError::Closed => conflict(), })?; - log::info!( + log::debug!( "{}: modified {} deadlines ({:?})", &subscription.name, modifications_count, diff --git a/src/main.rs b/src/main.rs index 12c9974..6c11f50 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,23 +9,40 @@ struct Cli { /// The hostname + port to listen on. #[arg(short, long, value_name = "ADDR", default_value = "0.0.0.0:8085")] bind: SocketAddr, + + /// The log level to use. + #[arg(short, long, value_name = "LEVEL", default_value = "info")] + log: LogLevelArg, +} + +#[derive(clap::ValueEnum, Clone)] +enum LogLevelArg { + Off, + Error, + Warn, + Info, + Debug, + Trace, } #[tokio::main] async fn main() -> Result<(), Box> { + // Parse the CLI arguments. + let args = Cli::parse(); + // Configure the logger. env_logger::builder() .format_target(false) - .filter_level(LevelFilter::Info) + .filter_level(map_log_level(args.log)) + // Turn off noisy library logs. + .filter_module("h2", LevelFilter::Off) + .filter_module("hyper", LevelFilter::Off) .parse_default_env() .init(); - // Parse the CLI arguments. - let args = Cli::parse(); - // Shutdown signal (Ctrl + C) let signal = async { - // Ignore errors. + // Ignore errors from the signal. let _ = tokio::signal::ctrl_c().await; }; @@ -40,3 +57,15 @@ async fn main() -> Result<(), Box> { Ok(()) } + +/// Maps the log level argument to the `LevelFilter` enum. +fn map_log_level(level: LogLevelArg) -> LevelFilter { + match level { + LogLevelArg::Off => LevelFilter::Off, + LogLevelArg::Error => LevelFilter::Error, + LogLevelArg::Warn => LevelFilter::Warn, + LogLevelArg::Info => LevelFilter::Info, + LogLevelArg::Debug => LevelFilter::Debug, + LogLevelArg::Trace => LevelFilter::Trace, + } +} diff --git a/src/subscriptions/subscription_actor.rs b/src/subscriptions/subscription_actor.rs index ccda589..cfaffa1 100644 --- a/src/subscriptions/subscription_actor.rs +++ b/src/subscriptions/subscription_actor.rs @@ -285,7 +285,7 @@ impl SubscriptionActor { /// Handles expired messages by putting them back into the backlog. fn handle_expired_messages(&mut self, expired: Vec) { - log::info!("{}: {} messages expired", &self.info.name, expired.len()); + log::debug!("{}: {} messages expired", &self.info.name, expired.len()); self.backlog .append(expired.into_iter().map(|p| p.into_message())); From 8904e1dc1c2a3b8af9cbd97b7ff3772daaae25d0 Mon Sep 17 00:00:00 2001 From: Jeff Hansen Date: Fri, 19 May 2023 17:18:36 -0400 Subject: [PATCH 2/4] cli: add option for running in single-threaded mode --- src/main.rs | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/src/main.rs b/src/main.rs index 6c11f50..6efb299 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,6 +13,10 @@ struct Cli { /// The log level to use. #[arg(short, long, value_name = "LEVEL", default_value = "info")] log: LogLevelArg, + + /// Whether to run Deltio on a single thread instead of a worker pool of threads (one per CPU) + #[arg(short, long)] + single_thread: bool, } #[derive(clap::ValueEnum, Clone)] @@ -25,11 +29,24 @@ enum LogLevelArg { Trace, } -#[tokio::main] -async fn main() -> Result<(), Box> { - // Parse the CLI arguments. +fn main() -> Result<(), Box> { let args = Cli::parse(); + let mut builder = if args.single_thread { + tokio::runtime::Builder::new_current_thread() + } else { + tokio::runtime::Builder::new_multi_thread() + }; + + builder + .enable_time() + .enable_io() + .thread_name("deltio worker"); + + let runtime = builder.build()?; + runtime.block_on(main_core(args)) +} +async fn main_core(args: Cli) -> Result<(), Box> { // Configure the logger. env_logger::builder() .format_target(false) From 6cf5e01bc08fb5bffc627b47c3a1928071509245 Mon Sep 17 00:00:00 2001 From: Jeff Hansen Date: Fri, 19 May 2023 17:34:09 -0400 Subject: [PATCH 3/4] perf: only collect system time for duration checking when debug logging is enabled --- src/api/publisher.rs | 35 +++++++++++++---------------- src/api/subscriber.rs | 52 +++++++++++++++++++------------------------ src/lib.rs | 1 + src/tracing/mod.rs | 31 ++++++++++++++++++++++++++ 4 files changed, 71 insertions(+), 48 deletions(-) create mode 100644 src/tracing/mod.rs diff --git a/src/api/publisher.rs b/src/api/publisher.rs index f08abbc..2442c3d 100644 --- a/src/api/publisher.rs +++ b/src/api/publisher.rs @@ -8,6 +8,7 @@ use crate::topics::{ PublishMessagesError, }; use crate::topics::{TopicMessage, TopicName}; +use crate::tracing::ActivitySpan; use bytes::Bytes; use std::collections::HashMap; use std::sync::Arc; @@ -39,7 +40,7 @@ impl PublisherService { #[async_trait::async_trait] impl Publisher for PublisherService { async fn create_topic(&self, request: Request) -> Result, Status> { - let start = std::time::Instant::now(); + let start = ActivitySpan::start(); let request = request.get_ref(); let topic_name = parser::parse_topic_name(&request.name)?; let topic_name_str = topic_name.to_string(); @@ -61,11 +62,7 @@ impl Publisher for PublisherService { message_storage_policy: None, }; - log::debug!( - "{}: creating topic took {:?}", - topic_name_str, - start.elapsed() - ); + log::debug!("{}: creating topic {}", topic_name_str, start); Ok(Response::new(response)) } @@ -82,7 +79,7 @@ impl Publisher for PublisherService { &self, request: Request, ) -> Result, Status> { - let start = std::time::Instant::now(); + let start = ActivitySpan::start(); let request = request.get_ref(); let topic_name = parser::parse_topic_name(&request.topic)?; @@ -109,10 +106,10 @@ impl Publisher for PublisherService { }); log::debug!( - "{}: publishing {} messages took {:?}", + "{}: publishing {} messages {}", &topic_name, request.messages.len(), - start.elapsed() + start ); Ok(response) @@ -122,13 +119,13 @@ impl Publisher for PublisherService { &self, request: Request, ) -> Result, Status> { - let start = std::time::Instant::now(); + let start = ActivitySpan::start(); let request = request.get_ref(); let topic_name = parser::parse_topic_name(&request.topic)?; let topic = self.get_topic_internal(&topic_name).await?; - log::debug!("{}: getting topic took {:?}", &topic_name, start.elapsed()); + log::debug!("{}: getting topic {}", &topic_name, start); Ok(Response::new(Topic { name: topic.name.to_string(), labels: Default::default(), @@ -144,7 +141,7 @@ impl Publisher for PublisherService { &self, request: Request, ) -> Result, Status> { - let start = std::time::Instant::now(); + let start = ActivitySpan::start(); let request = request.get_ref(); let paging = parser::parse_paging(request.page_size, &request.page_token)?; let project_id = parser::parse_project_id(&request.project)?; @@ -177,10 +174,10 @@ impl Publisher for PublisherService { }; log::debug!( - "{}: listing {} topics took {:?}", + "{}: listing {} topics {}", &request.project, response.topics.len(), - start.elapsed() + start ); Ok(Response::new(response)) } @@ -189,7 +186,7 @@ impl Publisher for PublisherService { &self, request: Request, ) -> Result, Status> { - let start = std::time::Instant::now(); + let start = ActivitySpan::start(); let request = request.get_ref(); let topic_name = parser::parse_topic_name(&request.topic)?; @@ -205,10 +202,10 @@ impl Publisher for PublisherService { })?; log::debug!( - "{}: listing {} subscriptions took {:?}", + "{}: listing {} subscriptions {}", &topic_name, page.subscriptions.len(), - start.elapsed() + start ); Ok(Response::new(ListTopicSubscriptionsResponse { subscriptions: page @@ -236,7 +233,7 @@ impl Publisher for PublisherService { &self, request: Request, ) -> Result, Status> { - let start = std::time::Instant::now(); + let start = ActivitySpan::start(); let request = request.get_ref(); let topic_name = parser::parse_topic_name(&request.topic)?; @@ -246,7 +243,7 @@ impl Publisher for PublisherService { DeleteError::Closed => conflict(), })?; - log::debug!("{}: deleting topic took {:?}", &topic_name, start.elapsed()); + log::debug!("{}: deleting topic {}", &topic_name, start); Ok(Response::new(())) } diff --git a/src/api/subscriber.rs b/src/api/subscriber.rs index a43c858..c4b9770 100644 --- a/src/api/subscriber.rs +++ b/src/api/subscriber.rs @@ -17,6 +17,7 @@ use crate::subscriptions::{ }; use crate::topics::topic_manager::TopicManager; use crate::topics::GetTopicError; +use crate::tracing::ActivitySpan; use futures::Stream; use std::pin::Pin; use std::sync::Arc; @@ -49,7 +50,7 @@ impl Subscriber for SubscriberService { &self, request: Request, ) -> Result, Status> { - let start = Instant::now(); + let start = ActivitySpan::start(); let request = request.get_ref(); let topic_name = parser::parse_topic_name(&request.topic)?; @@ -90,9 +91,9 @@ impl Subscriber for SubscriberService { GetInfoError::Closed => conflict(), })?; log::debug!( - "{}: creating subscription ({:?})", + "{}: creating subscription {}", subscription_name.clone(), - start.elapsed() + start ); Ok(Response::new(map_to_subscription_resource( &subscription, @@ -104,7 +105,7 @@ impl Subscriber for SubscriberService { &self, request: Request, ) -> Result, Status> { - let start = Instant::now(); + let start = ActivitySpan::start(); let request = request.get_ref(); let subscription_name = parser::parse_subscription_name(&request.subscription)?; @@ -122,9 +123,9 @@ impl Subscriber for SubscriberService { })?; log::debug!( - "{}: getting subscription ({:?})", + "{}: getting subscription {}", subscription_name.clone(), - start.elapsed() + start ); Ok(Response::new(map_to_subscription_resource( &subscription, @@ -145,7 +146,7 @@ impl Subscriber for SubscriberService { &self, request: Request, ) -> Result, Status> { - let start = Instant::now(); + let start = ActivitySpan::start(); let request = request.get_ref(); let paging = parser::parse_paging(request.page_size, &request.page_token)?; @@ -177,11 +178,7 @@ impl Subscriber for SubscriberService { next_page_token: page_token.unwrap_or(String::default()), }; - log::debug!( - "{}: listing subscriptions ({:?})", - &request.project, - start.elapsed() - ); + log::debug!("{}: listing subscriptions {}", &request.project, start); Ok(Response::new(response)) } @@ -189,7 +186,7 @@ impl Subscriber for SubscriberService { &self, request: Request, ) -> Result, Status> { - let start = Instant::now(); + let start = ActivitySpan::start(); let request = request.get_ref(); let subscription_name = parser::parse_subscription_name(&request.subscription)?; let subscription = get_subscription(&self.subscription_manager, &subscription_name)?; @@ -199,9 +196,9 @@ impl Subscriber for SubscriberService { DeleteError::Closed => conflict(), })?; log::debug!( - "{}: deleting subscription ({:?})", + "{}: deleting subscription {}", subscription_name.clone(), - start.elapsed() + start ); Ok(Response::new(())) } @@ -238,7 +235,7 @@ impl Subscriber for SubscriberService { &self, request: Request, ) -> Result, Status> { - let start = Instant::now(); + let start = ActivitySpan::start(); let request = request.get_ref(); let ack_ids = request .ack_ids @@ -257,10 +254,10 @@ impl Subscriber for SubscriberService { })?; log::debug!( - "{}: ack {} messages ({:?})", + "{}: ack {} messages {}", &subscription_name, &ack_ids.len(), - start.elapsed() + start ); Ok(Response::new(())) @@ -321,7 +318,7 @@ impl Subscriber for SubscriberService { &self, request: Request>, ) -> Result, Status> { - let start = Instant::now(); + let start = ActivitySpan::start(); let mut stream = request.into_inner(); let request = match stream.next().await { None => return Err(Status::cancelled("The request was canceled")), @@ -331,11 +328,7 @@ impl Subscriber for SubscriberService { let subscription_name = parser::parse_subscription_name(&request.subscription)?; let subscription = get_subscription(&self.subscription_manager, &subscription_name)?; - log::debug!( - "{}: starting streaming pull ({:?})", - subscription_name, - start.elapsed() - ); + log::debug!("{}: starting streaming pull {}", subscription_name, start); // Pulls messages and streams them to the client. let pull_stream = { @@ -531,7 +524,7 @@ async fn handle_streaming_pull_request( // Ack messages if appropriate. if !request.ack_ids.is_empty() { - let start = Instant::now(); + let start = ActivitySpan::start(); let ack_ids = request .ack_ids .iter() @@ -547,15 +540,16 @@ async fn handle_streaming_pull_request( })?; log::debug!( - "{}: acked {} messages ({:?})", + "{}: acked {} messages {}", &subscription.name, ack_id_count, - start.elapsed() + start ); } // Extend deadlines if requested to do so. if !request.modify_deadline_ack_ids.is_empty() { + let start = ActivitySpan::start(); let now = Instant::now(); let deadline_modifications = parser::parse_deadline_modifications( now, @@ -572,10 +566,10 @@ async fn handle_streaming_pull_request( })?; log::debug!( - "{}: modified {} deadlines ({:?})", + "{}: modified {} deadlines {}", &subscription.name, modifications_count, - now.elapsed() + start ); } diff --git a/src/lib.rs b/src/lib.rs index 6fc1524..7b2d54d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ pub mod paging; pub mod pubsub_proto; pub mod subscriptions; pub mod topics; +mod tracing; use crate::api::subscriber::SubscriberService; use crate::pubsub_proto::publisher_server::PublisherServer; diff --git a/src/tracing/mod.rs b/src/tracing/mod.rs new file mode 100644 index 0000000..d3f185e --- /dev/null +++ b/src/tracing/mod.rs @@ -0,0 +1,31 @@ +use log::log_enabled; +use log::Level::Debug; +use std::fmt::{Display, Formatter}; +use tokio::time::Instant; + +/// Tracks the start time of an activity span. +/// This is used for measuring how long operations take +/// but only when the appropriate log level is set to avoid +/// a potentially expensive syscall. +pub struct ActivitySpan(Option); + +impl ActivitySpan { + /// Starts a new activity span. + pub fn start() -> Self { + if log_enabled!(Debug) { + Self(Some(Instant::now())) + } else { + Self(None) + } + } +} + +impl Display for ActivitySpan { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + if let Some(started) = self.0 { + write!(f, "({:?})", &started.elapsed()) + } else { + Ok(()) + } + } +} From 449d59d9ce8b96631b04e31ffd4d0a5a6a878d4f Mon Sep 17 00:00:00 2001 From: Jeff Hansen Date: Fri, 19 May 2023 17:34:53 -0400 Subject: [PATCH 4/4] chore: update packages --- Cargo.lock | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 514df50..d67bbc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -182,9 +182,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "clap" -version = "4.2.7" +version = "4.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34d21f9bf1b425d2968943631ec91202fe5e837264063503708b83013f8fc938" +checksum = "93aae7a4192245f70fe75dd9157fc7b4a5bf53e88d30bd4396f7d8f9284d5acc" dependencies = [ "clap_builder", "clap_derive", @@ -193,9 +193,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.2.7" +version = "4.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "914c8c79fb560f238ef6429439a30023c862f7a28e688c58f7203f12b29970bd" +checksum = "4f423e341edefb78c9caba2d9c7f7687d0e72e89df3ce3394554754393ac3990" dependencies = [ "anstream", "anstyle", @@ -206,9 +206,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.2.0" +version = "4.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f9644cd56d6b87dbe899ef8b053e331c0637664e9e21a33dfcdc36093f5c5c4" +checksum = "191d9573962933b4027f932c600cd252ce27a8ad5979418fe78e43c07996f27b" dependencies = [ "heck", "proc-macro2", @@ -218,9 +218,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.4.1" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a2dd5a6fe8c6e3502f568a6353e5273bbb15193ad9a89e457b9970798efbea1" +checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b" [[package]] name = "colorchoice" @@ -772,9 +772,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.57" +version = "1.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4ec6d5fe0b140acb27c9a0444118cf55bfbb4e0b259739429abb4521dd67c16" +checksum = "fa1fb82fc0c281dd9671101b66b771ebbe1eaf967b96ac8740dcba4b70005ca8" dependencies = [ "unicode-ident", ]