Skip to content

Commit

Permalink
fix: benchmark with multiple producers (#4348)
Browse files Browse the repository at this point in the history
* fix: benchmark with many producers

* fix: interval latency on benchmark
  • Loading branch information
fraidev authored Jan 21, 2025
1 parent d7f262f commit 58c526b
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 116 deletions.
29 changes: 11 additions & 18 deletions crates/fluvio-benchmark/src/producer_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,23 +50,18 @@ impl ProducerBenchmark {
let mut tx_controls = Vec::new();
let mut workers_jh = Vec::new();

let (stat_sender, stat_receiver) = unbounded();
let (stats_sender, stats_receiver) = unbounded();
let (end_sender, end_receiver) = unbounded();
let stat_collector =
StatCollector::create(config.num_records, end_sender.clone(), stats_sender.clone());

// Set up producers
for producer_id in 0..config.num_producers {
let (event_sender, event_receiver) = unbounded();
stat_collector.add_producer(event_receiver);
println!("starting up producer {}", producer_id);
let stat_collector = StatCollector::create(
config.num_records,
end_sender.clone(),
stat_sender.clone(),
event_receiver,
);
let (tx_control, rx_control) = unbounded();
let worker =
ProducerWorker::new(producer_id, config.clone(), stat_collector, event_sender)
.await?;
let worker = ProducerWorker::new(producer_id, config.clone(), event_sender).await?;
let jh = spawn(timeout(
config.worker_timeout,
ProducerDriver::main_loop(rx_control, worker),
Expand All @@ -80,28 +75,26 @@ impl ProducerBenchmark {

loop {
select! {
stat_rx = stat_receiver.recv() => {
stat_rx = stats_receiver.recv() => {
if let Ok(stat) = stat_rx {
let human_readable_bytes = ByteSize(stat.bytes_per_sec).to_string();
println!(
//TODO: fix inteval latency
//"{} records sent, {} records/sec: ({}/sec), {:.2}ms avg latency, {:.2}ms max latency",
"{} records sent, {} records/sec: ({}/sec)",
"{} records sent, {} records/sec: ({}/sec), {:.2}ms avg latency, {:.2}ms max latency",
stat.record_send, stat.records_per_sec, human_readable_bytes,
//utils::nanos_to_ms_pritable(stat.latency_avg), utils::nanos_to_ms_pritable(stat.latency_max)
utils::nanos_to_ms_pritable(stat.latency_avg), utils::nanos_to_ms_pritable(stat.latency_max)
);
}
}
end = end_receiver.recv() => {
if let Ok(end) = end {
let mut latency_yaml = String::new();
latency_yaml.push_str(&format!("{:.2}ms avg latency, {:.2}ms max latency",
utils::nanos_to_ms_pritable(end.histogram.mean() as u64),
utils::nanos_to_ms_pritable(end.histogram.value_at_quantile(1.0))));
utils::nanos_to_ms_pritable(end.latencies_histogram.mean() as u64),
utils::nanos_to_ms_pritable(end.latencies_histogram.value_at_quantile(1.0))));
for percentile in [0.5, 0.95, 0.99] {
latency_yaml.push_str(&format!(
", {:.2}ms p{percentile:4.2}",
utils::nanos_to_ms_pritable(end.histogram.value_at_quantile(percentile)),
utils::nanos_to_ms_pritable(end.latencies_histogram.value_at_quantile(percentile)),
));
}
println!();
Expand Down
4 changes: 0 additions & 4 deletions crates/fluvio-benchmark/src/producer_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use futures_util::future::BoxFuture;

use crate::{
config::{ProducerConfig, RecordKeyAllocationStrategy},
stats_collector::StatCollector,
utils,
};

Expand Down Expand Up @@ -42,13 +41,11 @@ impl ProducerCallback for BenchmarkProducerCallback {
pub(crate) struct ProducerWorker {
fluvio_producer: TopicProducerPool,
records_to_send: Vec<BenchmarkRecord>,
_stat: StatCollector,
}
impl ProducerWorker {
pub(crate) async fn new(
id: u64,
config: ProducerConfig,
stat: StatCollector,
event_sender: Sender<ProduceCompletionBatchEvent>,
) -> Result<Self> {
let fluvio = Fluvio::connect().await?;
Expand Down Expand Up @@ -78,7 +75,6 @@ impl ProducerWorker {
Ok(ProducerWorker {
fluvio_producer,
records_to_send,
_stat: stat,
})
}

Expand Down
Loading

0 comments on commit 58c526b

Please sign in to comment.