Skip to content

Commit

Permalink
Revert changes in xref release extraction
Browse files Browse the repository at this point in the history
Not sure why the polars approach doesn't work.

It needs fixing, and this approach works, so we will use it for the release.
  • Loading branch information
afg1 committed Oct 5, 2023
1 parent 84a56c4 commit a97b8b8
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 62 deletions.
38 changes: 29 additions & 9 deletions files/precompute/fetch-xref-info.sql
Original file line number Diff line number Diff line change
@@ -1,11 +1,31 @@
-- Stage all (rna id, URS, release) triples in a temp table so the
-- aggregation below works against an indexed relation instead of
-- re-scanning the xref join.
CREATE TEMP TABLE xref_releases AS
SELECT
    rna.id AS rna_id,
    xref.upi,
    xref.last
FROM xref
JOIN rna
ON
    rna.upi = xref.upi
;

CREATE INDEX ix_xref_releases_upi ON xref_releases(upi);

-- Emit the highest release seen for each sequence, ordered by rna id so
-- downstream consumers can treat the stream as sorted by id.
COPY (
SELECT
    rna_id,
    upi,
    MAX(last) AS last
FROM xref_releases
GROUP BY rna_id, upi
ORDER BY rna_id ASC
) TO STDOUT (FORMAT CSV)
129 changes: 76 additions & 53 deletions utils/precompute/src/releases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,60 +109,83 @@ pub fn write_max(filename: &Path, output: &Path) -> Result<()> {
/// }
/// ```
pub fn select_new(xrefs: &Path, known: &Path, output: &Path, streaming: bool) -> Result<()> {
println!("{:?} {:?} {:?}", xrefs, known, streaming);
let known_path = known.to_str().unwrap().to_owned();
let xrefs_path = xrefs.to_str().unwrap().to_owned();
let xref_records: LazyFrame = LazyCsvReader::new(xrefs_path)
.has_header(false)
.low_memory(streaming)
.finish()?
.rename(vec!["column_1", "column_2", "column_3"], vec!["id", "upi", "last"])
.group_by(["id"])
.agg([col("last").max().alias("last"), col("upi").first().alias("upi")])
.sort("id", Default::default());


let known_records: LazyFrame = LazyCsvReader::new(known_path)
.has_header(false)
.low_memory(streaming)
.finish()
.unwrap()
.rename(vec!["column_1", "column_2", "column_3"], vec!["id", "upi", "last"])
.group_by(["id"])
.agg([col("last").max().alias("last")])
.sort("id", Default::default());


let selection: LazyFrame = xref_records
.join(
known_records,
[col("id")],
[col("id")],
JoinArgs::new(JoinType::Outer),
)
.rename(vec!["last", "last_right"], vec!["last_xref", "last_precompute"])
.with_columns([
col("last_xref").gt(col("last_precompute")).alias("selected"),
col("last_precompute").gt(col("last_xref")).alias("error_state"),
]);
// .select([col("upi"), col("selected"), col("error_state")]);

let check: LazyFrame = selection.clone();

// // check we are not in a catastrophic error state - precompute should never be newer than
// // xref
let selected_urs = selection.filter(col("selected").eq(true)).with_streaming(streaming).collect()?;
let error_urs = check.filter(col("error_state").eq(true)).with_streaming(streaming).collect()?;
if error_urs.height() > 0 {
return Err(anyhow!("Precompute newer than xref for these UPIs: {:?}", error_urs));
// println!("{:?} {:?} {:?}", xrefs, known, streaming);
// let known_path = known.to_str().unwrap().to_owned();
// let xrefs_path = xrefs.to_str().unwrap().to_owned();
// let xref_records: LazyFrame = LazyCsvReader::new(xrefs_path)
// .has_header(false)
// .low_memory(streaming)
// .finish()?
// .rename(vec!["column_1", "column_2", "column_3"], vec!["id", "upi", "last"])
// .group_by(["upi"])
// .agg([col("last").max().alias("last"), col("id").first().alias("id")])
// .sort("id", Default::default());


// let known_records: LazyFrame = LazyCsvReader::new(known_path)
// .has_header(false)
// .low_memory(streaming)
// .finish()
// .unwrap()
// .rename(vec!["column_1", "column_2", "column_3"], vec!["id", "upi", "last"])
// .group_by(["upi"])
// .agg([col("last").max().alias("last"), col("id").first().alias("id")])
// .sort("id", Default::default());


// let selection: LazyFrame = xref_records
// .join(
// known_records,
// [col("upi")],
// [col("upi")],
// JoinArgs::new(JoinType::Outer),
// )
// .rename(vec!["last", "last_right"], vec!["last_xref", "last_precompute"])
// .with_columns([
// col("last_xref").gt(col("last_precompute")).alias("selected"),
// col("last_precompute").gt(col("last_xref")).alias("error_state"),
// ]);
// // .select([col("upi"), col("selected"), col("error_state")]);

// let check: LazyFrame = selection.clone();

// // // check we are not in a catastrophic error state - precompute should never be newer than
// // // xref
// let selected_urs = selection.filter(col("selected").eq(true)).with_streaming(streaming).collect()?;
// let error_urs = check.filter(col("error_state").eq(true)).with_streaming(streaming).collect()?;
// if error_urs.height() > 0 {
// return Err(anyhow!("Precompute newer than xref for these UPIs: {:?}", error_urs));
// }
// println!("{:?}", selected_urs);

// let mut selected_upis =
// selected_urs.select(["upi"])?.unique(None, UniqueKeepStrategy::First, None)?;

// let out_stream: File = File::create(output).unwrap();
// CsvWriter::new(out_stream).has_header(false).finish(&mut selected_upis)?;

let xref_records = entries(xrefs)?.map(|e: UrsEntry| (e.id, e)).assume_sorted_by_key();
let known_records = entries(known)?.map(|e: UrsEntry| (e.id, e)).assume_sorted_by_key();

let mut writer = csv::Writer::from_writer(File::create(output)?);
let pairs = xref_records.outer_join(known_records);
for (_key, (xref, pre)) in pairs {
match (xref, pre) {
(Some(x), Some(p)) => match x.release.cmp(&p.release) {
Less => Err(anyhow!(
"This should never happen, too small release for {:?} vs {:?}",
&x,
&p
))?,
Equal => (),
Greater => writer.write_record(&[x.urs])?,
},
(Some(x), None) => writer.write_record(&[x.urs])?,
(None, Some(_)) => (),
(None, None) => (),
}
}
println!("{:?}", selected_urs);

let mut selected_upis =
selected_urs.select(["upi"])?.unique(None, UniqueKeepStrategy::First, None)?;

let out_stream: File = File::create(output).unwrap();
CsvWriter::new(out_stream).has_header(false).finish(&mut selected_upis)?;
writer.flush()?;

Ok(())
}
Expand Down

0 comments on commit a97b8b8

Please sign in to comment.