From a97b8b8ab58eaf19180635f0eb0330c73981f965 Mon Sep 17 00:00:00 2001 From: Andrew Green Date: Thu, 5 Oct 2023 10:43:37 +0100 Subject: [PATCH] Revert changes in xref release extraction Not sure why the plars approach doesn't work. It needs fixing and this approach works so we will use it for the release --- files/precompute/fetch-xref-info.sql | 38 ++++++-- utils/precompute/src/releases.rs | 129 ++++++++++++++++----------- 2 files changed, 105 insertions(+), 62 deletions(-) diff --git a/files/precompute/fetch-xref-info.sql b/files/precompute/fetch-xref-info.sql index 15a5ca5d..99f7dc4b 100644 --- a/files/precompute/fetch-xref-info.sql +++ b/files/precompute/fetch-xref-info.sql @@ -1,11 +1,31 @@ -COPY( - SELECT - rna.id, - xref.upi, - xref.last - FROM xref - JOIN rna - ON - rna.upi = xref.upi +CREATE TEMP TABLE xref_releases AS +SELECT +SELECT + rna.id, + rna.id as rna_id, + xref.upi, + xref.upi, + xref.last + xref.last +FROM xref +FROM xref +JOIN rna +JOIN rna +ON +ON + rna.upi = xref.upi + rna.upi = xref.upi +; + +CREATE INDEX ix_xref_releases_upi ON xref_releases(upi); + +COPY ( +SELECT + rna_id, + upi, + max(last) +from xref_releases +group by rna_id, upi +order by rna_id ASC ) TO STDOUT (FORMAT CSV) diff --git a/utils/precompute/src/releases.rs b/utils/precompute/src/releases.rs index bbb2309d..2e80d23b 100644 --- a/utils/precompute/src/releases.rs +++ b/utils/precompute/src/releases.rs @@ -109,60 +109,83 @@ pub fn write_max(filename: &Path, output: &Path) -> Result<()> { /// } /// ``` pub fn select_new(xrefs: &Path, known: &Path, output: &Path, streaming: bool) -> Result<()> { - println!("{:?} {:?} {:?}", xrefs, known, streaming); - let known_path = known.to_str().unwrap().to_owned(); - let xrefs_path = xrefs.to_str().unwrap().to_owned(); - let xref_records: LazyFrame = LazyCsvReader::new(xrefs_path) - .has_header(false) - .low_memory(streaming) - .finish()? - .rename(vec!["column_1", "column_2", "column_3"], vec!["id", "upi", "last"]) - .group_by(["id"]) - .agg([col("last").max().alias("last"), col("upi").first().alias("upi")]) - .sort("id", Default::default()); - - - let known_records: LazyFrame = LazyCsvReader::new(known_path) - .has_header(false) - .low_memory(streaming) - .finish() - .unwrap() - .rename(vec!["column_1", "column_2", "column_3"], vec!["id", "upi", "last"]) - .group_by(["id"]) - .agg([col("last").max().alias("last")]) - .sort("id", Default::default()); - - - let selection: LazyFrame = xref_records - .join( - known_records, - [col("id")], - [col("id")], - JoinArgs::new(JoinType::Outer), - ) - .rename(vec!["last", "last_right"], vec!["last_xref", "last_precompute"]) - .with_columns([ - col("last_xref").gt(col("last_precompute")).alias("selected"), - col("last_precompute").gt(col("last_xref")).alias("error_state"), - ]); - // .select([col("upi"), col("selected"), col("error_state")]); - - let check: LazyFrame = selection.clone(); - - // // check we are not in a catastrophic error state - precompute should never be newer than - // // xref - let selected_urs = selection.filter(col("selected").eq(true)).with_streaming(streaming).collect()?; - let error_urs = check.filter(col("error_state").eq(true)).with_streaming(streaming).collect()?; - if error_urs.height() > 0 { - return Err(anyhow!("Precompute newer than xref for these UPIs: {:?}", error_urs)); + // println!("{:?} {:?} {:?}", xrefs, known, streaming); + // let known_path = known.to_str().unwrap().to_owned(); + // let xrefs_path = xrefs.to_str().unwrap().to_owned(); + // let xref_records: LazyFrame = LazyCsvReader::new(xrefs_path) + // .has_header(false) + // .low_memory(streaming) + // .finish()? + // .rename(vec!["column_1", "column_2", "column_3"], vec!["id", "upi", "last"]) + // .group_by(["upi"]) + // .agg([col("last").max().alias("last"), col("id").first().alias("id")]) + // .sort("id", Default::default()); + + + // let known_records: LazyFrame = LazyCsvReader::new(known_path) + // .has_header(false) + // .low_memory(streaming) + // .finish() + // .unwrap() + // .rename(vec!["column_1", "column_2", "column_3"], vec!["id", "upi", "last"]) + // .group_by(["upi"]) + // .agg([col("last").max().alias("last"), col("id").first().alias("id")]) + // .sort("id", Default::default()); + + + // let selection: LazyFrame = xref_records + // .join( + // known_records, + // [col("upi")], + // [col("upi")], + // JoinArgs::new(JoinType::Outer), + // ) + // .rename(vec!["last", "last_right"], vec!["last_xref", "last_precompute"]) + // .with_columns([ + // col("last_xref").gt(col("last_precompute")).alias("selected"), + // col("last_precompute").gt(col("last_xref")).alias("error_state"), + // ]); + // // .select([col("upi"), col("selected"), col("error_state")]); + + // let check: LazyFrame = selection.clone(); + + // // // check we are not in a catastrophic error state - precompute should never be newer than + // // // xref + // let selected_urs = selection.filter(col("selected").eq(true)).with_streaming(streaming).collect()?; + // let error_urs = check.filter(col("error_state").eq(true)).with_streaming(streaming).collect()?; + // if error_urs.height() > 0 { + // return Err(anyhow!("Precompute newer than xref for these UPIs: {:?}", error_urs)); + // } + // println!("{:?}", selected_urs); + + // let mut selected_upis = + // selected_urs.select(["upi"])?.unique(None, UniqueKeepStrategy::First, None)?; + + // let out_stream: File = File::create(output).unwrap(); + // CsvWriter::new(out_stream).has_header(false).finish(&mut selected_upis)?; + + let xref_records = entries(xrefs)?.map(|e: UrsEntry| (e.id, e)).assume_sorted_by_key(); + let known_records = entries(known)?.map(|e: UrsEntry| (e.id, e)).assume_sorted_by_key(); + + let mut writer = csv::Writer::from_writer(File::create(output)?); + let pairs = xref_records.outer_join(known_records); + for (_key, (xref, pre)) in pairs { + match (xref, pre) { + (Some(x), Some(p)) => match x.release.cmp(&p.release) { + Less => Err(anyhow!( + "This should never happen, too small release for {:?} vs {:?}", + &x, + &p + ))?, + Equal => (), + Greater => writer.write_record(&[x.urs])?, + }, + (Some(x), None) => writer.write_record(&[x.urs])?, + (None, Some(_)) => (), + (None, None) => (), + } } - println!("{:?}", selected_urs); - - let mut selected_upis = - selected_urs.select(["upi"])?.unique(None, UniqueKeepStrategy::First, None)?; - - let out_stream: File = File::create(output).unwrap(); - CsvWriter::new(out_stream).has_header(false).finish(&mut selected_upis)?; + writer.flush()?; Ok(()) }