diff --git a/src/bench.rs b/src/bench.rs index a21f953..7362682 100644 --- a/src/bench.rs +++ b/src/bench.rs @@ -46,7 +46,11 @@ async fn run_commands_on_single_thread(limiter: Arc, config: Client // prepare pipeline let mut p = Vec::new(); for _ in 0..pipeline_cnt { - p.push(cmd.gen_cmd()); + if context.is_loading { + p.push(cmd.gen_cmd_with_lock()); + } else { + p.push(cmd.gen_cmd()); + } } let instant = std::time::Instant::now(); client.run_commands(p).await; @@ -60,7 +64,7 @@ async fn run_commands_on_single_thread(limiter: Arc, config: Client local.await; } -fn wait_finish(case: &Case, mut auto_connection: AutoConnection, mut context: SharedContext, mut wg: WaitGroup, load: bool, quiet: bool) -> BenchmarkResult { +fn wait_finish(case: &Case, mut auto_connection: AutoConnection, mut context: SharedContext, mut wg: WaitGroup, quiet: bool) -> BenchmarkResult { let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); let mut result = BenchmarkResult::default(); @@ -93,10 +97,10 @@ fn wait_finish(case: &Case, mut auto_connection: AutoConnection, mut context: Sh result.qps = (cnt - overall_cnt_overhead) as f64 / overall_time.elapsed().as_secs_f64(); } if !quiet { - if load { - print!("\r\x1B[2KData loading qps: {:.0}, {:.2}%", qps, histogram.cnt() as f64 / case.count as f64 * 100f64); + if context.is_loading { + println!("\x1B[F\x1B[2KData loading qps: {:.0}, {:.2}%", qps, histogram.cnt() as f64 / case.count as f64 * 100f64); } else { - print!("\r\x1B[2Kqps: {:.0}(overall {:.0}), conn: {}, {}", qps, result.qps, conn, histogram); + println!("\x1B[F\x1B[2Kqps: {:.0}(overall {:.0}), conn: {}, {}", qps, result.qps, conn, histogram); } } std::io::stdout().flush().unwrap(); @@ -113,10 +117,10 @@ fn wait_finish(case: &Case, mut auto_connection: AutoConnection, mut context: Sh } } let conn: u64 = auto_connection.active_conn(); - if load { - print!("\r\x1B[2KData loaded, qps: {:.0}, time elapsed: {:.2}s\n", result.qps, overall_time.elapsed().as_secs_f64()); + if context.is_loading { + println!("\x1B[F\x1B[2KData loaded, qps: {:.0}, time elapsed: {:.2}s\n", result.qps, overall_time.elapsed().as_secs_f64()); } else { - print!("\r\x1B[2Kqps: {:.0}, conn: {}, {}\n", result.qps, conn, histogram) + println!("\x1B[F\x1B[2Kqps: {:.0}, conn: {}, {}\n", result.qps, conn, histogram) }; result.avg_latency_ms = histogram.avg() as f64 / 1_000.0; result.p99_latency_ms = histogram.percentile(0.99) as f64 / 1_000.0; @@ -139,7 +143,7 @@ pub fn do_benchmark(client_config: ClientConfig, cores: Vec, case: Case, lo let mut thread_handlers = Vec::new(); let wg = WaitGroup::new(); let core_ids = core_affinity::get_core_ids().unwrap(); - let context = SharedContext::new(case.count, case.seconds); + let context = SharedContext::new(case.count, case.seconds, load); for inx in 0..cores.len() { let client_config = client_config.clone(); let case = case.clone(); @@ -159,7 +163,7 @@ pub fn do_benchmark(client_config: ClientConfig, cores: Vec, case: Case, lo } // log thread - let result = wait_finish(&case, auto_connection, context, wg, load, quiet); + let result = wait_finish(&case, auto_connection, context, wg, quiet); // join all threads for thread_handler in thread_handlers { diff --git a/src/command/mod.rs b/src/command/mod.rs index 392386c..2e5b17c 100644 --- a/src/command/mod.rs +++ b/src/command/mod.rs @@ -33,22 +33,30 @@ impl Command { } pub fn gen_cmd(&mut self) -> redis::Cmd { let mut cmd = redis::Cmd::new(); + let mut cmd_str = String::new(); for ph in self.argv.iter_mut() { for arg in ph.gen() { - cmd.arg(arg); + cmd_str.push_str(&arg); } } + for word in cmd_str.split_whitespace() { + cmd.arg(word); + } cmd } #[allow(dead_code)] pub fn gen_cmd_with_lock(&mut self) -> redis::Cmd { - let mut cmd = redis::Cmd::new(); let _lock = self.lock.lock().unwrap(); + let mut cmd = redis::Cmd::new(); + let mut cmd_str = String::new(); for ph in self.argv.iter_mut() { for arg in ph.gen() { - cmd.arg(arg); + cmd_str.push_str(&arg); } } + for word in cmd_str.split_whitespace() { + cmd.arg(word); + } cmd } pub fn to_string(&self) -> String { diff --git a/src/command/parser.rs b/src/command/parser.rs index b5edcd0..3481e4c 100644 --- a/src/command/parser.rs +++ b/src/command/parser.rs @@ -3,7 +3,6 @@ use nom::{ sequence::delimited, branch::alt, bytes::complete::{is_not, tag}, - character::complete::multispace0, multi::many0, combinator::{map, all_consuming}, }; @@ -13,16 +12,15 @@ fn parse_string(input: &str) -> IResult<&str, PlaceholderEnum> { let s = alt(( delimited(tag("\""), is_not("\""), tag("\"")), delimited(tag("\'"), is_not("\'"), tag("\'")), - delimited(multispace0, is_not("{ "), multispace0) - )); + is_not("{") + )); map(s, PlaceholderEnum::new_string)(input) } fn parse_placeholder(input: &str) -> IResult<&str, PlaceholderEnum> { let inner = delimited(tag("{"), is_not("}"), tag("}")); - let eat_whitespace = delimited(multispace0, inner, multispace0); - map(eat_whitespace, PlaceholderEnum::new)(input) + map(inner, PlaceholderEnum::new)(input) } @@ -36,7 +34,7 @@ mod tests { #[test] fn test_root() { - let (nm, args) = match parse_all("aa {key sequence 100} bbb") { + let (nm, args) = match parse_all("aa test_{key sequence 100} bbb") { Ok((nm, args)) => (nm, args), Err(e) => { println!("Error: {:?}", e); diff --git a/src/shared_context.rs b/src/shared_context.rs index c288382..95bebc1 100644 --- a/src/shared_context.rs +++ b/src/shared_context.rs @@ -8,6 +8,7 @@ use std::time::Instant; #[derive(Clone)] pub struct SharedContext { + pub is_loading: bool, // limit by max_count current_count: Arc, max_count: u64, @@ -24,8 +25,9 @@ pub struct SharedContext { } impl SharedContext { - pub fn new(max_count: u64, max_seconds: u64) -> Self { + pub fn new(max_count: u64, max_seconds: u64, is_loading: bool) -> Self { SharedContext { + is_loading, current_count: Arc::new(AtomicU64::new(0)), max_count, instant: Arc::new(RwLock::new(None)),