Skip to content

Commit

Permalink
remove duplicated code
Browse files Browse the repository at this point in the history
  • Loading branch information
PSeitz committed Nov 4, 2024
1 parent 70a248b commit bd3834a
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 193 deletions.
193 changes: 2 additions & 191 deletions quickwit/quickwit-cli/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,23 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::borrow::Cow;
use std::collections::VecDeque;
use std::fmt::Display;
use std::io::{stdout, Stdout, Write};
use std::num::NonZeroUsize;
use std::ops::Div;
use std::path::PathBuf;
use std::str::FromStr;
use std::time::{Duration, Instant};
use std::{fmt, io};

use anyhow::{anyhow, bail, Context};
use bytesize::ByteSize;
use clap::{arg, Arg, ArgAction, ArgMatches, Command};
use colored::{ColoredString, Colorize};
use humantime::format_duration;
use colored::Colorize;
use indicatif::{ProgressBar, ProgressStyle};
use itertools::Itertools;
use numfmt::{Formatter, Scales};
use quickwit_actors::ActorHandle;
use quickwit_common::tower::{Rate, RateEstimator, SmaRateEstimator};
use quickwit_common::uri::Uri;
use quickwit_config::{ConfigFormat, IndexConfig};
use quickwit_indexing::models::IndexingStatistics;
use quickwit_indexing::IndexingPipeline;
use quickwit_metastore::{IndexMetadata, Split, SplitState};
use quickwit_proto::search::{CountHits, SortField, SortOrder};
use quickwit_proto::types::IndexId;
Expand All @@ -54,12 +47,11 @@ use tabled::settings::object::{FirstRow, Rows, Segment};
use tabled::settings::panel::Footer;
use tabled::settings::{Alignment, Disable, Format, Modify, Panel, Rotate, Style};
use tabled::{Table, Tabled};
use thousands::Separable;
use tracing::{debug, Level};

use crate::checklist::GREEN_COLOR;
use crate::stats::{mean, percentile, std_deviation};
use crate::{client_args, make_table, prompt_confirmation, ClientArgs, THROUGHPUT_WINDOW_SIZE};
use crate::{client_args, make_table, prompt_confirmation, ClientArgs};

pub fn build_index_command() -> Command {
Command::new("index")
Expand Down Expand Up @@ -1143,187 +1135,6 @@ pub async fn delete_index_cli(args: DeleteIndexArgs) -> anyhow::Result<()> {
Ok(())
}

/// Prints the indexing statistics of a running pipeline once per second, then
/// prints a final summary when the pipeline has exited.
///
/// Live statistics lines are only printed once at least one document has been
/// observed. When `is_stdin` is true, one last statistics line is printed
/// before the summary.
///
/// # Errors
///
/// Returns an error if the pipeline exit status is not a success, or if
/// displaying the statistics fails.
pub async fn start_statistics_reporting_loop(
    pipeline_handle: ActorHandle<IndexingPipeline>,
    is_stdin: bool,
) -> anyhow::Result<IndexingStatistics> {
    let mut out = stdout();
    let started_at = Instant::now();
    let mut throughput = ThroughputCalculator::new(started_at);
    // One tick per second: this is also the console refresh rate.
    let mut ticker = tokio::time::interval(Duration::from_secs(1));

    // TODO fixme. If the indexing pipeline exits, we still wait up to an entire tick before
    // noticing it. Ideally we should select between two futures.
    let mut pipeline_has_exited = false;
    while !pipeline_has_exited {
        ticker.tick().await;
        pipeline_handle.refresh_observe();

        let snapshot = pipeline_handle.last_observation();
        // Nothing worth displaying until at least one document has been seen.
        if snapshot.num_docs > 0 {
            display_statistics(&mut out, &mut throughput, &snapshot)?;
        }

        pipeline_has_exited = pipeline_handle.state().is_exit();
    }

    let (exit_status, final_statistics) = pipeline_handle.join().await;
    if !exit_status.is_success() {
        bail!(exit_status);
    }

    let total_docs = final_statistics.num_docs;
    // Without any document there is nothing to report.
    if total_docs == 0 {
        return Ok(final_statistics);
    }

    if is_stdin {
        display_statistics(&mut out, &mut throughput, &final_statistics)?;
    }

    // End of task report. The elapsed time is truncated to whole seconds.
    println!();
    let elapsed = Duration::from_secs(started_at.elapsed().as_secs());
    match final_statistics.num_invalid_docs {
        0 => println!(
            "Indexed {} documents in {}.",
            total_docs.separate_with_commas(),
            format_duration(elapsed)
        ),
        invalid_docs => {
            let valid_docs = (total_docs - invalid_docs).separate_with_commas();
            let error_rate = (invalid_docs as f64 / total_docs as f64) * 100.0;
            println!(
                "Indexed {} out of {} documents in {}. Failed to index {} document(s). {}\n",
                valid_docs,
                total_docs.separate_with_commas(),
                format_duration(elapsed),
                invalid_docs.separate_with_commas(),
                colorize_error_rate(error_rate),
            );
        }
    }

    Ok(final_statistics)
}

/// Formats `error_rate` (a percentage) as `(<rate>% error rate)` and colors it
/// by severity: yellow below 1%, orange below 5%, red otherwise.
fn colorize_error_rate(error_rate: f64) -> ColoredString {
    let message = format!("({error_rate:.1}% error rate)");
    match error_rate {
        rate if rate < 1.0 => message.yellow(),
        rate if rate < 5.0 => message.truecolor(255, 181, 46), //< Orange
        _ => message.red(),
    }
}

/// Small helper that writes headers and values to a borrowed standard output
/// handle.
struct Printer<'a> {
    /// Handle every method of this printer writes to.
    pub stdout: &'a mut Stdout,
}

impl Printer<'_> {
    /// Writes `header` in bright blue, preceded by a single space.
    pub fn print_header(&mut self, header: &str) -> io::Result<()> {
        let colored_header = header.bright_blue();
        self.stdout.write_fmt(format_args!(" {}", colored_header))
    }

    /// Writes the formatted value, preceded by a single space.
    pub fn print_value(&mut self, fmt_args: fmt::Arguments) -> io::Result<()> {
        self.stdout.write_fmt(format_args!(" {fmt_args}"))
    }

    /// Flushes the underlying standard output handle.
    pub fn flush(&mut self) -> io::Result<()> {
        Write::flush(&mut *self.stdout)
    }
}

/// Writes one line of indexing statistics to `stdout`.
///
/// The line contains, in order: number of docs, number of invalid docs
/// ("Parse errs"), number of published splits, input size in MB, throughput
/// in MB/s and the elapsed time formatted as `HH:MM:SS`.
///
/// # Errors
///
/// Returns an error if the elapsed time cannot be converted into a
/// `time::Duration`, or if writing to or flushing `stdout` fails.
fn display_statistics(
    stdout: &mut Stdout,
    throughput_calculator: &mut ThroughputCalculator,
    statistics: &IndexingStatistics,
) -> anyhow::Result<()> {
    let elapsed = time::Duration::try_from(throughput_calculator.elapsed_time())?;
    let hours = elapsed.whole_hours();
    let minutes = elapsed.whole_minutes() % 60;
    let seconds = elapsed.whole_seconds() % 60;
    let elapsed_time = format!("{:02}:{:02}:{:02}", hours, minutes, seconds);
    let throughput_mb_s = throughput_calculator.calculate(statistics.total_bytes_processed);

    // Each entry is a (header, already formatted value) pair, in display order.
    let columns = [
        ("Num docs", format!("{:>7}", statistics.num_docs)),
        ("Parse errs", format!("{:>5}", statistics.num_invalid_docs)),
        (
            "PublSplits",
            format!("{:>3}", statistics.num_published_splits),
        ),
        (
            "Input size",
            format!("{:>5}MB", statistics.total_bytes_processed / 1_000_000),
        ),
        ("Thrghput", format!("{throughput_mb_s:>5.2}MB/s")),
        // The last column terminates the line.
        ("Time", format!("{elapsed_time}\n")),
    ];

    let mut printer = Printer { stdout };
    for (header, value) in &columns {
        printer.print_header(header)?;
        printer.print_value(format_args!("{value}"))?;
    }
    Ok(printer.flush()?)
}

/// Computes a throughput in MB/s over a sliding window of
/// "total processed bytes" samples.
struct ThroughputCalculator {
    /// Time series of `(sample time, total processed bytes)`, oldest first.
    processed_bytes_values: VecDeque<(Instant, u64)>,
    /// Time at which this calculator was started.
    start_time: Instant,
}

impl ThroughputCalculator {
    /// Creates a new instance whose window is pre-filled with
    /// `THROUGHPUT_WINDOW_SIZE` samples of `(start_time, 0)`.
    pub fn new(start_time: Instant) -> Self {
        let processed_bytes_values: VecDeque<(Instant, u64)> = (0..THROUGHPUT_WINDOW_SIZE)
            .map(|_| (start_time, 0u64))
            .collect();
        Self {
            processed_bytes_values,
            start_time,
        }
    }

    /// Records `current_processed_bytes` as the newest sample and returns the
    /// throughput, in MB/s, measured against the oldest remaining sample.
    ///
    /// The elapsed time is clamped to at least one second. This method never
    /// panics: an empty window falls back to `(start_time, 0)` as reference
    /// sample, and a byte counter going backwards yields a throughput of `0.0`.
    pub fn calculate(&mut self, current_processed_bytes: u64) -> f64 {
        self.processed_bytes_values.pop_front();
        let current_instant = Instant::now();
        // With a window of size 0 or 1 the queue is empty at this point, so
        // fall back to the start of the measurement instead of unwrapping.
        let (first_instant, first_processed_bytes) = self
            .processed_bytes_values
            .front()
            .copied()
            .unwrap_or((self.start_time, 0u64));
        let elapsed_time = current_instant
            .saturating_duration_since(first_instant)
            .as_millis() as f64
            / 1_000f64;
        self.processed_bytes_values
            .push_back((current_instant, current_processed_bytes));
        // Saturate rather than overflow if the counter is not monotonic.
        current_processed_bytes.saturating_sub(first_processed_bytes) as f64
            / 1_000_000f64
            / elapsed_time.max(1f64)
    }

    /// Returns the time elapsed since `start_time`.
    pub fn elapsed_time(&self) -> Duration {
        self.start_time.elapsed()
    }
}

#[cfg(test)]
mod test {

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit bd3834a

Please sign in to comment.