From 496b9c9b137b569eb97147ca718d8997bade5c2d Mon Sep 17 00:00:00 2001 From: Seamus Abshere Date: Tue, 4 Mar 2025 09:27:49 -0500 Subject: [PATCH 1/4] --select-columns technically redundant with `xsv select`, but sometimes when processing very large files you don't want to emit all that data to a pipeline just to immediately discard it --- scrubcsv/CHANGELOG.md | 6 +++ scrubcsv/src/main.rs | 103 ++++++++++++++++++++++++++++++++++++---- scrubcsv/tests/tests.rs | 41 ++++++++++++++++ 3 files changed, 142 insertions(+), 8 deletions(-) diff --git a/scrubcsv/CHANGELOG.md b/scrubcsv/CHANGELOG.md index 983161a..d57fff2 100644 --- a/scrubcsv/CHANGELOG.md +++ b/scrubcsv/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.0.1] - 2025-03-04 + +### Added + +- Added `--select-columns` which accepts a CSV of headers (in their final post-processed form) and only returns those headers. While this is technically redundant with `xsv select`, it allows earlier field selection when processing very large files. + ## [1.0.0] - 2022-05-25 ### Added diff --git a/scrubcsv/src/main.rs b/scrubcsv/src/main.rs index 4bfe58a..6b999c1 100644 --- a/scrubcsv/src/main.rs +++ b/scrubcsv/src/main.rs @@ -93,6 +93,14 @@ struct Opt { #[structopt(long = "reserve-column-names")] reserve_column_names: Option, + /// Allow selecting columns by name. Should be submitted as a CSV list of column names. + /// If not specified, all columns will be included. + /// Column names should be specified in their final form, after any cleaning. + /// So if you pass `--clean-column-names=stable`, and your original column name is "Column Name", + /// you should pass "column_name" here. + #[structopt(long = "select-columns")] + select_columns: Option, + /// Drop any rows where the specified column is empty or NULL. Can be passed /// more than once. Useful for cleaning primary key columns before /// upserting. Uses the cleaned form of column names. @@ -226,13 +234,75 @@ fn run() -> Result<()> { hdr = new_hdr; } + // Calculate the number of expected columns read by the reader + // Different from the number of columns in the header + let expected_cols = hdr.len(); + + // If we have a list of columns to select, filter the header to only include those columns. + // Also create a Vec to track which columns to keep for later processing + let mut selected_cols = None; + // And store the length of the selected columns for later use + let mut selected_col_len = 0; + + if let Some(ref select_columns) = opt.select_columns { + let mut select_columns_rdr = csv::ReaderBuilder::new() + .has_headers(false) + .from_reader(select_columns.as_bytes()); + + let selected_columns_vec: Vec = match select_columns_rdr + .records() + .next() + { + Some(Ok(record)) => record.iter().map(|s| s.to_string()).collect(), + Some(Err(_)) => { + return Err(format_err!("The provided CSV of headers is invalid.")); + } + None => { + return Err(format_err!("The provided CSV of headers is empty.")); + } + }; + + // You might have submitted a single newline + if selected_columns_vec.is_empty() { + return Err(format_err!("The provided CSV of headers is empty.")); + } + + // Now we make sure that all of the selected columns are in the header + for col in selected_columns_vec.iter() { + if !hdr + .iter() + .any(|h| &String::from_utf8_lossy(h).to_string() == col) + { + return Err(format_err!( + "The provided CSV of headers does not contain the column {:?}", + col + )); + } + } + + selected_col_len = selected_columns_vec.len(); + let mut new_hdr = ByteRecord::default(); + let mut new_selected_cols = Vec::with_capacity(selected_col_len); + for col in hdr.iter() { + let col_str = String::from_utf8_lossy(col); + let keep = selected_columns_vec.contains(&col_str.to_string()); + debug!( + "column: {:?}, keep: {}, comparing against: {:?}", + col_str, keep, select_columns + ); + new_selected_cols.push(keep); + if keep { + new_hdr.push_field(col); + } + } + hdr = new_hdr; + selected_cols = Some(new_selected_cols); + } + // Write our header to our output. wtr.write_byte_record(&hdr) .context("cannot write headers")?; - // Calculate the number of expected columns. - let expected_cols = hdr.len(); - // Just in case --drop-row-if-null was passed, precompute which columns are // required to contain a value. let required_cols = hdr @@ -255,6 +325,7 @@ fn run() -> Result<()> { let use_fast_path = null_re.is_none() && !opt.replace_newlines && !opt.trim_whitespace + && opt.select_columns.is_none() && opt.drop_row_if_null.is_empty(); // Iterate over all the rows, checking to make sure they look reasonable. @@ -289,7 +360,21 @@ fn run() -> Result<()> { .context("cannot write record")?; } else { // We need to apply one or more cleanups, so run the slow path. - let cleaned = record.into_iter().map(|mut val: &[u8]| -> Cow<'_, [u8]> { + // Process each column, but only keep selected columns if specified + let mut cleaned = if !selected_cols.is_none() { + Vec::with_capacity(selected_col_len) + } else { + Vec::with_capacity(record.len()) + }; + + for (i, mut val) in record.into_iter().enumerate() { + // Skip this column if it's not in the selected columns + if let Some(ref selected_cols) = selected_cols { + if !selected_cols[i] { + continue; + } else { + } + } // Convert values matching `--null` regex to empty strings. if let Some(ref null_re) = null_re { if null_re.is_match(val) { @@ -317,21 +402,23 @@ fn run() -> Result<()> { } // Fix newlines. - if opt.replace_newlines + let processed_val = if opt.replace_newlines && (val.contains(&b'\n') || val.contains(&b'\r')) { NEWLINE_RE.replace_all(val, &b" "[..]) } else { Cow::Borrowed(val) - } - }); + }; + + cleaned.push(processed_val); + } if opt.drop_row_if_null.is_empty() { // Still somewhat fast! wtr.write_record(cleaned).context("cannot write record")?; } else { // We need to rebuild the record, check for null columns, // and only output the record if everything's OK. - let row = cleaned.collect::>>(); + let row = &cleaned; // Use the cleaned Vec directly for (value, &is_required_col) in row.iter().zip(required_cols.iter()) { // If the column is NULL but shouldn't be, bail on this row. if is_required_col && value.is_empty() { diff --git a/scrubcsv/tests/tests.rs b/scrubcsv/tests/tests.rs index 05cd114..28a667c 100644 --- a/scrubcsv/tests/tests.rs +++ b/scrubcsv/tests/tests.rs @@ -256,3 +256,44 @@ a,b,c "# ); } + +#[test] +fn select_columns() { + let testdir = TestDir::new("scrubcsv", "select_columns"); + let output = testdir + .cmd() + .arg("--select-columns=c1,c3") + .output_with_stdin( + r#"c1,c2,c3 +a,b,c +d,e,f +g,h,i +"#, + ) + .expect("error running scrubcsv"); + eprintln!("{}", output.stderr_str()); + assert_eq!( + output.stdout_str(), + r#"c1,c3 +a,c +d,f +g,i +"# + ); +} + +#[test] +fn select_columns_error_if_selected_columns_are_not_in_header() { + let testdir = TestDir::new( + "scrubcsv", + "select_columns_error_if_selected_columns_are_not_in_header", + ); + let output = testdir + .cmd() + .arg("--select-columns=a,b") + .output_with_stdin(r#"c1,c2,c3"#) + .expect_failure(); + assert!(output + .stderr_str() + .contains("The provided CSV of headers does not contain the column \"a\"")); +} From 0eafd41123d5105532aee3dff0837a1db1c74b57 Mon Sep 17 00:00:00 2001 From: Seamus Abshere Date: Tue, 4 Mar 2025 12:40:08 -0500 Subject: [PATCH 2/4] respect original order --- scrubcsv/src/main.rs | 86 ++++++++++++++++++++++++++++++++++------- scrubcsv/tests/tests.rs | 36 +++++++++++++++++ 2 files changed, 108 insertions(+), 14 deletions(-) diff --git a/scrubcsv/src/main.rs b/scrubcsv/src/main.rs index 6b999c1..ca0e8ba 100644 --- a/scrubcsv/src/main.rs +++ b/scrubcsv/src/main.rs @@ -242,7 +242,11 @@ fn run() -> Result<()> { // Also create a Vec to track which columns to keep for later processing let mut selected_cols = None; // And store the length of the selected columns for later use - let mut selected_col_len = 0; + let mut selected_cols_len = 0; + // Store if we need to reorder the columns + let mut selected_cols_require_reordering = false; + // Store the order of the selected columns + let mut selected_cols_order = None; if let Some(ref select_columns) = opt.select_columns { let mut select_columns_rdr = csv::ReaderBuilder::new() @@ -280,23 +284,50 @@ fn run() -> Result<()> { } } - selected_col_len = selected_columns_vec.len(); - let mut new_hdr = ByteRecord::default(); - let mut new_selected_cols = Vec::with_capacity(selected_col_len); + // The positions of selected columns in the original header + selected_cols_len = selected_columns_vec.len(); + let mut max_position_seen: i32 = -1; + let mut new_selected_cols: Vec = Vec::with_capacity(selected_cols_len); + // make an array of [final_position, original_position] + let mut final_to_original_position: Vec<(usize, usize)> = + Vec::with_capacity(selected_cols_len); + // this is the last time that i will know the original order of the columns (from hdr) + let mut original_position = 0; for col in hdr.iter() { let col_str = String::from_utf8_lossy(col); - let keep = selected_columns_vec.contains(&col_str.to_string()); - debug!( - "column: {:?}, keep: {}, comparing against: {:?}", - col_str, keep, select_columns - ); - new_selected_cols.push(keep); - if keep { - new_hdr.push_field(col); + match selected_columns_vec + .iter() + .position(|c| c == &col_str.to_string()) + { + Some(final_position) => { + new_selected_cols.push(true); + final_to_original_position + .push((final_position, original_position)); + original_position += 1; + if (final_position as i32) < max_position_seen { + selected_cols_require_reordering = true; + } else { + max_position_seen = final_position as i32; + } + } + None => new_selected_cols.push(false), } } + // so we'll have a final_to_original_position like { 2 => 1, 1 => 0, 0 => 2 } + // crete a "selected_cols_order" vec which has the values ordered by the keys + final_to_original_position.sort_by_key(|(k, _)| *k); + let new_selected_cols_order: Vec = + final_to_original_position.iter().map(|(_, v)| *v).collect(); + + // The new header is the selected columns + let mut new_hdr = ByteRecord::default(); + for col in selected_columns_vec.iter() { + new_hdr.push_field(col.as_bytes()); + } + hdr = new_hdr; selected_cols = Some(new_selected_cols); + selected_cols_order = Some(new_selected_cols_order); } // Write our header to our output. @@ -362,7 +393,7 @@ fn run() -> Result<()> { // We need to apply one or more cleanups, so run the slow path. // Process each column, but only keep selected columns if specified let mut cleaned = if !selected_cols.is_none() { - Vec::with_capacity(selected_col_len) + Vec::with_capacity(selected_cols_len) } else { Vec::with_capacity(record.len()) }; @@ -372,7 +403,6 @@ fn run() -> Result<()> { if let Some(ref selected_cols) = selected_cols { if !selected_cols[i] { continue; - } else { } } // Convert values matching `--null` regex to empty strings. @@ -412,6 +442,34 @@ fn run() -> Result<()> { cleaned.push(processed_val); } + if selected_cols_require_reordering { + // https://stackoverflow.com/a/69774341/310192 + fn sort_by_indices(data: &mut [T], mut indices: Vec) { + for idx in 0..data.len() { + if indices[idx] != idx { + let mut current_idx = idx; + loop { + let target_idx = indices[current_idx]; + indices[current_idx] = current_idx; + if indices[target_idx] == target_idx { + break; + } + data.swap(current_idx, target_idx); + current_idx = target_idx; + } + } + } + } + + // about 25% faster than creating a vector iteratively + sort_by_indices( + &mut cleaned, + selected_cols_order + .as_ref() + .expect("selected_cols_order should have a value here") + .clone(), + ); + } if opt.drop_row_if_null.is_empty() { // Still somewhat fast! wtr.write_record(cleaned).context("cannot write record")?; diff --git a/scrubcsv/tests/tests.rs b/scrubcsv/tests/tests.rs index 28a667c..5b5df99 100644 --- a/scrubcsv/tests/tests.rs +++ b/scrubcsv/tests/tests.rs @@ -297,3 +297,39 @@ fn select_columns_error_if_selected_columns_are_not_in_header() { .stderr_str() .contains("The provided CSV of headers does not contain the column \"a\"")); } + +#[test] +fn select_columns_respects_order() { + let testdir = TestDir::new("scrubcsv", "select_columns"); + let output = testdir + .cmd() + .arg("--select-columns=c5,c2,c4") + .output_with_stdin( + r#"c1,c2,c3,c4,c5 +c1-1,c2-1,c3-1,c4-1,c5-1 +c1-2,c2-2,c3-2,c4-2,c5-2 +c1-3,c2-3,c3-3,c4-3,c5-3 +"#, + ) + .expect("error running scrubcsv"); + eprintln!("{}", output.stderr_str()); + eprintln!("got"); + eprintln!("{}", output.stdout_str()); + eprintln!("expected"); + eprintln!( + "{}", + r#"c5,c2,c4 +c5-1,c2-1,c4-1 +c5-2,c2-2,c4-2 +c5-3,c2-3,c4-3 +"# + ); + assert_eq!( + output.stdout_str(), + r#"c5,c2,c4 +c5-1,c2-1,c4-1 +c5-2,c2-2,c4-2 +c5-3,c2-3,c4-3 +"# + ); +} From ccd07935fe0771f0bc76b916ec557ed6807e837a Mon Sep 17 00:00:00 2001 From: Seamus Abshere Date: Tue, 4 Mar 2025 16:39:06 -0500 Subject: [PATCH 3/4] cargo clippy --fix --- scrubcsv/src/main.rs | 2 +- scrubcsv/src/util.rs | 2 +- scrubcsv/tests/tests.rs | 17 ++++++++--------- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/scrubcsv/src/main.rs b/scrubcsv/src/main.rs index ca0e8ba..431203a 100644 --- a/scrubcsv/src/main.rs +++ b/scrubcsv/src/main.rs @@ -392,7 +392,7 @@ fn run() -> Result<()> { } else { // We need to apply one or more cleanups, so run the slow path. // Process each column, but only keep selected columns if specified - let mut cleaned = if !selected_cols.is_none() { + let mut cleaned = if selected_cols.is_some() { Vec::with_capacity(selected_cols_len) } else { Vec::with_capacity(record.len()) diff --git a/scrubcsv/src/util.rs b/scrubcsv/src/util.rs index d0e7227..705a8ac 100644 --- a/scrubcsv/src/util.rs +++ b/scrubcsv/src/util.rs @@ -20,7 +20,7 @@ impl FromStr for CharSpecifier { type Err = Error; fn from_str(s: &str) -> Result { - if s.as_bytes().len() == 1 { + if s.len() == 1 { Ok(CharSpecifier(Some(s.as_bytes()[0]))) } else { match s { diff --git a/scrubcsv/tests/tests.rs b/scrubcsv/tests/tests.rs index 5b5df99..3934711 100644 --- a/scrubcsv/tests/tests.rs +++ b/scrubcsv/tests/tests.rs @@ -47,7 +47,7 @@ fn stdin_and_delimiter_and_quiet() { let testdir = TestDir::new("scrubcsv", "stdin_and_delimiter_and_quiet"); let output = testdir .cmd() - .args(&["-d", "|"]) + .args(["-d", "|"]) .arg("-q") .output_with_stdin( "\ @@ -78,8 +78,8 @@ a\tb\tc ); let output = testdir .cmd() - .args(&["-d", r"\t"]) - .args(&["--quote", "none"]) + .args(["-d", r"\t"]) + .args(["--quote", "none"]) .arg("in.csv") .expect_success(); assert_eq!( @@ -133,7 +133,7 @@ fn null_normalization() { let testdir = TestDir::new("scrubcsv", "null_normalization"); let output = testdir .cmd() - .args(&["--null", "(?i)null|NIL"]) + .args(["--null", "(?i)null|NIL"]) .output_with_stdin("a,b,c,d,e\nnull,NIL,nil,,not null\n") .expect_success(); assert_eq!(output.stdout_str(), "a,b,c,d,e\n,,,,not null\n") @@ -144,7 +144,7 @@ fn null_normalization_of_null_bytes() { let testdir = TestDir::new("scrubcsv", "null_normalization_of_null_bytes"); let output = testdir .cmd() - .args(&["--null", "\\x00"]) + .args(["--null", "\\x00"]) .output_with_stdin("a,b\n\0,\n") .expect_success(); assert_eq!(output.stdout_str(), "a,b\n,\n") @@ -237,7 +237,7 @@ fn drop_row_if_null() { .cmd() .arg("--drop-row-if-null=c1") .arg("--drop-row-if-null=c2") - .args(&["--null", "NULL"]) + .args(["--null", "NULL"]) .output_with_stdin( r#"c1,c2,c3 1,, @@ -317,12 +317,11 @@ c1-3,c2-3,c3-3,c4-3,c5-3 eprintln!("{}", output.stdout_str()); eprintln!("expected"); eprintln!( - "{}", - r#"c5,c2,c4 + "c5,c2,c4 c5-1,c2-1,c4-1 c5-2,c2-2,c4-2 c5-3,c2-3,c4-3 -"# +" ); assert_eq!( output.stdout_str(), From 95f523333474512e22df1df453444efcdf1f1fb7 Mon Sep 17 00:00:00 2001 From: Seamus Abshere Date: Tue, 4 Mar 2025 16:43:26 -0500 Subject: [PATCH 4/4] don't let people give dup columns in select --- scrubcsv/src/main.rs | 7 +++++++ scrubcsv/tests/tests.rs | 30 +++++++++++++++++++----------- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/scrubcsv/src/main.rs b/scrubcsv/src/main.rs index 431203a..c577a52 100644 --- a/scrubcsv/src/main.rs +++ b/scrubcsv/src/main.rs @@ -9,6 +9,7 @@ use log::debug; use regex::{bytes::Regex as BytesRegex, Regex}; use std::{ borrow::Cow, + collections::HashSet, fs, io::{self, prelude::*}, path::PathBuf, @@ -283,6 +284,12 @@ fn run() -> Result<()> { )); } } + let mut seen = HashSet::new(); + if !selected_columns_vec.iter().all(|x| seen.insert(x)) { + return Err(format_err!( + "--select-columns cannot contain duplicate column names" + )); + } // The positions of selected columns in the original header selected_cols_len = selected_columns_vec.len(); diff --git a/scrubcsv/tests/tests.rs b/scrubcsv/tests/tests.rs index 3934711..539c669 100644 --- a/scrubcsv/tests/tests.rs +++ b/scrubcsv/tests/tests.rs @@ -312,17 +312,6 @@ c1-3,c2-3,c3-3,c4-3,c5-3 "#, ) .expect("error running scrubcsv"); - eprintln!("{}", output.stderr_str()); - eprintln!("got"); - eprintln!("{}", output.stdout_str()); - eprintln!("expected"); - eprintln!( - "c5,c2,c4 -c5-1,c2-1,c4-1 -c5-2,c2-2,c4-2 -c5-3,c2-3,c4-3 -" - ); assert_eq!( output.stdout_str(), r#"c5,c2,c4 @@ -332,3 +321,22 @@ c5-3,c2-3,c4-3 "# ); } + +#[test] +fn select_columns_handles_duplicate_selected_columns() { + let testdir = TestDir::new("scrubcsv", "select_columns"); + let output = testdir + .cmd() + .arg("--select-columns=c5,c5,c4") + .output_with_stdin( + r#"c1,c2,c3,c4,c5 +c1-1,c2-1,c3-1,c4-1,c5-1 +c1-2,c2-2,c3-2,c4-2,c5-2 +c1-3,c2-3,c3-3,c4-3,c5-3 +"#, + ) + .expect_failure(); + assert!(output + .stderr_str() + .contains("--select-columns cannot contain duplicate column names")); +}