From b4b181b02c9a172f7f71a2fe0a831c10c26f9888 Mon Sep 17 00:00:00 2001 From: Rebecca Date: Fri, 11 Aug 2023 11:30:55 -0400 Subject: [PATCH 1/2] Remove cld-lid table from the production dataset --- linkage_dag.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/linkage_dag.py b/linkage_dag.py index 2d05466..151a3c8 100644 --- a/linkage_dag.py +++ b/linkage_dag.py @@ -409,8 +409,9 @@ # we're about to copy tables from staging to production, so do checks to make sure we haven't broken anything # along the way check_queries = [] - production_tables = ["sources", "references", "all_metadata_with_cld2_lid"] - for table_name in production_tables: + staging_tables = ["sources", "references", "all_metadata_with_cld2_lid"] + production_tables = ["sources", "references"] + for table_name in staging_tables: check_queries.append(BigQueryCheckOperator( task_id="check_monotonic_increase_"+table_name.lower(), sql=(f"select (select count(0) from {staging_dataset}.{table_name}) >= " From 2d2f835f0750615331960c76e79e7b899d28e073 Mon Sep 17 00:00:00 2001 From: Jennifer Melot Date: Wed, 16 Aug 2023 14:59:55 -0400 Subject: [PATCH 2/2] Update pipeline and simhash_input to referene a copy of the cld2 table from the last run of the pipeline --- linkage_dag.py | 22 +++++++++++++++++++++- sql/simhash_input.sql | 2 +- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/linkage_dag.py b/linkage_dag.py index 151a3c8..4711aa8 100644 --- a/linkage_dag.py +++ b/linkage_dag.py @@ -409,7 +409,8 @@ # we're about to copy tables from staging to production, so do checks to make sure we haven't broken anything # along the way check_queries = [] - staging_tables = ["sources", "references", "all_metadata_with_cld2_lid"] + all_metadata_table = "all_metadata_with_cld2_lid" + staging_tables = ["sources", "references", all_metadata_table] production_tables = ["sources", "references"] for table_name in staging_tables: check_queries.append(BigQueryCheckOperator( @@ -490,6 +491,25 @@ ) start_production_cp >> push_to_production >> snapshot >> pop_descriptions >> success_alert + # We don't show the "all metadata" table in the production dataset, but we do need to + # be able to diff the current data from the data used in the last run in simhash_input + copy_cld2 = BigQueryToBigQueryOperator( + task_id=f"copy_{all_metadata_table}", + source_project_dataset_tables=[f"{staging_dataset}.{all_metadata_table}"], + destination_project_dataset_table=f"{staging_dataset}.{all_metadata_table}_last_run", + create_disposition="CREATE_IF_NEEDED", + write_disposition="WRITE_TRUNCATE" + ) + + snapshot_cld2 = BigQueryToBigQueryOperator( + task_id=f"snapshot_{all_metadata_table}", + source_project_dataset_tables=[f"{staging_dataset}.{all_metadata_table}"], + destination_project_dataset_table=f"{backup_dataset}.{all_metadata_table}_{curr_date}", + create_disposition="CREATE_IF_NEEDED", + write_disposition="WRITE_TRUNCATE" + ) + start_production_cp >> copy_cld2 >> snapshot_cld2 >> success_alert + # task structure clear_tmp_dir >> metadata_sequences_start (metadata_sequences_end >> union_ids >> check_unique_input_ids >> union_metadata >> export_metadata >> diff --git a/sql/simhash_input.sql b/sql/simhash_input.sql index a8d395a..9310842 100644 --- a/sql/simhash_input.sql +++ b/sql/simhash_input.sql @@ -11,7 +11,7 @@ where id not in ( select a.id from {{staging_dataset}}.all_metadata_norm_filt a left join - {{production_dataset}}.all_metadata_with_cld2_lid b + {{staging_dataset}}.all_metadata_with_cld2_lid_last_run b on a.id = b.id where (a.title = b.title) and (a.abstract = b.abstract) and (a.year = b.year) and (a.title != "") and (a.title is not null) and (a.abstract != "") and (a.abstract is not null) and (a.year is not null)