Merge pull request #32 from georgetown-cset/other-table-renaming
Rename remaining tables
jmelot authored Aug 1, 2023
2 parents 4b5c296 + 3b070f2 commit caec1f7
Showing 15 changed files with 22 additions and 209 deletions.
51 changes: 17 additions & 34 deletions linkage_dag.py
@@ -20,7 +20,7 @@
     DAGS_DIR, get_default_args, get_post_success
 
 
-production_dataset = "gcp_cset_links_v3"
+production_dataset = "literature"
 staging_dataset = f"staging_{production_dataset}"
 
 with DAG("article_linkage_updater",
@@ -410,8 +410,7 @@
     # we're about to copy tables from staging to production, so do checks to make sure we haven't broken anything
     # along the way
     check_queries = []
-    production_tables = ["all_metadata_with_cld2_lid", "article_links", "article_links_with_dataset",
-                         "article_merged_meta", "references", "article_links_nested"]
+    production_tables = ["sources", "references", "all_metadata_with_cld2_lid"]
     for table_name in production_tables:
         check_queries.append(BigQueryCheckOperator(
             task_id="check_monotonic_increase_"+table_name.lower(),
@@ -420,15 +419,12 @@
             use_legacy_sql=False
         ))
 
-    for table_name, pk in [("article_links", "orig_id"), ("article_links_with_dataset", "orig_id"),
-                           ("article_merged_meta", "merged_id")]:
-        check_queries.append(BigQueryCheckOperator(
-            task_id="check_pks_are_unique_"+table_name.lower(),
-            sql=f"select count({pk}) = count(distinct({pk})) from {staging_dataset}.{table_name}",
-            use_legacy_sql=False
-        ))
-
     check_queries.extend([
+        BigQueryCheckOperator(
+            task_id="check_pks_are_unique_sources",
+            sql=f"select count(orig_id) = count(distinct(orig_id)) from {staging_dataset}.sources",
+            use_legacy_sql=False
+        ),
         BigQueryCheckOperator(
             task_id="all_ids_survived",
             sql=(f"select count(0) = 0 from (select id from {staging_dataset}.union_ids "
@@ -463,38 +459,26 @@
 
     # We're done! Checks passed, so copy to production and post success to slack
     start_production_cp = DummyOperator(task_id="start_production_cp")
+    success_alert = get_post_success("Article linkage update succeeded!", dag)
+    curr_date = datetime.now().strftime("%Y%m%d")
+    with open(f"{os.environ.get('DAGS_FOLDER')}/schemas/{gcs_folder}/table_descriptions.json") as f:
+        table_desc = json.loads(f.read())
 
-    push_to_production = []
     for table in production_tables:
-        push_to_production.append(BigQueryToBigQueryOperator(
+        push_to_production = BigQueryToBigQueryOperator(
             task_id="copy_"+table.lower(),
             source_project_dataset_tables=[f"{staging_dataset}.{table}"],
             destination_project_dataset_table=f"{production_dataset}.{table}",
             create_disposition="CREATE_IF_NEEDED",
             write_disposition="WRITE_TRUNCATE"
-        ))
-
-    wait_for_production_copy = DummyOperator(task_id="wait_for_production_copy")
-
-    snapshots = []
-    curr_date = datetime.now().strftime("%Y%m%d")
-    for table in ["article_links", "article_links_nested"]:
-        # mk the snapshot predictions table
-        snapshots.append(BigQueryToBigQueryOperator(
+        )
+        snapshot = BigQueryToBigQueryOperator(
             task_id=f"snapshot_{table}",
             source_project_dataset_tables=[f"{production_dataset}.{table}"],
             destination_project_dataset_table=f"{backup_dataset}.{table}_{curr_date}",
             create_disposition="CREATE_IF_NEEDED",
             write_disposition="WRITE_TRUNCATE"
-        ))
-
-    wait_for_snapshots = DummyOperator(task_id="wait_for_snapshots")
-
-    success_alert = get_post_success("Article linkage update succeeded!", dag)
-
-    with open(f"{os.environ.get('DAGS_FOLDER')}/schemas/{gcs_folder}/table_descriptions.json") as f:
-        table_desc = json.loads(f.read())
-    for table in production_tables:
+        )
         pop_descriptions = PythonOperator(
             task_id="populate_column_documentation_for_" + table,
             op_kwargs={
@@ -504,7 +488,7 @@
             },
             python_callable=update_table_descriptions
         )
-        wait_for_snapshots >> pop_descriptions >> success_alert
+        start_production_cp >> push_to_production >> snapshot >> pop_descriptions >> success_alert
 
     downstream_tasks = [
         TriggerDagRunOperator(task_id="trigger_article_classification", trigger_dag_id="article_classification"),
@@ -520,7 +504,6 @@
     (last_combination_query >> heavy_compute_inputs >> gce_instance_start >> [create_cset_ids, run_lid] >>
         gce_instance_stop >> [import_id_mapping, import_lid] >> start_final_transform_queries)
 
-    (last_transform_query >> check_queries >> start_production_cp >> push_to_production >> wait_for_production_copy >>
-        snapshots >> wait_for_snapshots)
+    last_transform_query >> check_queries >> start_production_cp
 
     success_alert >> downstream_tasks
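
Note on the checks in this DAG: BigQueryCheckOperator runs the given query and fails the task unless every value in the first returned row is truthy, which is why each check is written as a single boolean aggregate. A minimal sketch of the pattern, using a placeholder dataset name and the Airflow 1.x import path (illustrative only, not code from this repository):

    from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator  # Airflow 1.x import path

    # The query returns one row with one boolean; the operator marks the task
    # failed if that value is falsy, so duplicates in the primary key fail the run.
    check_pks_are_unique_sources = BigQueryCheckOperator(
        task_id="check_pks_are_unique_sources",
        sql="select count(orig_id) = count(distinct(orig_id)) from staging_literature.sources",  # placeholder dataset
        use_legacy_sql=False,
    )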
57 changes: 0 additions & 57 deletions schemas/all_metadata_norm.json

This file was deleted.

14 changes: 0 additions & 14 deletions schemas/article_links.json

This file was deleted.

14 changes: 0 additions & 14 deletions schemas/article_links_nested.json

This file was deleted.

38 changes: 0 additions & 38 deletions schemas/article_merged_meta.json

This file was deleted.

14 changes: 0 additions & 14 deletions schemas/paper_references_merged.json

This file was deleted.

File renamed without changes.
5 changes: 1 addition & 4 deletions schemas/table_descriptions.json
@@ -1,8 +1,5 @@
 {
     "all_metadata_with_cld2_lid": "All metadata for the articles used in linkage.",
-    "article_links": "Maps orig_ids to merged_ids",
-    "article_links_nested": "Maps one merged_id to all its constitutent orig_ids",
-    "article_links_with_dataset": "Maps a merged id to each of its orig_ids and their dataset",
-    "article_merged_meta": "*Deprecated* -- use gcp_cset_links_v2.corpus_merged",
+    "sources": "Maps a merged id to each of its orig_ids and their dataset",
     "references": "Maps a paper's merged id to the merged ids of papers it references"
 }
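
The DAG loads this table_descriptions.json map and hands it to update_table_descriptions in the populate_column_documentation tasks. As a hedged illustration only (not the repository's actual helper), applying such a name-to-description map with the google-cloud-bigquery client could look like this:

    import json

    from google.cloud import bigquery


    def apply_table_descriptions(project: str, dataset: str, path: str) -> None:
        """Set each table's description from a {table_name: description} JSON map."""
        client = bigquery.Client(project=project)
        with open(path) as f:
            descriptions = json.load(f)
        for table_name, description in descriptions.items():
            table = client.get_table(f"{project}.{dataset}.{table_name}")
            table.description = description
            client.update_table(table, ["description"])  # update only the description field


    # Example call with placeholder values:
    # apply_table_descriptions("my-gcp-project", "literature", "schemas/table_descriptions.json")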
7 changes: 2 additions & 5 deletions sequences/generate_merged_metadata.tsv
@@ -1,5 +1,2 @@
-article_links_with_meta
-article_links_nested
-article_merged_meta
-article_links_with_dataset
-references
+sources
+references
6 changes: 0 additions & 6 deletions sql/article_links_nested.sql

This file was deleted.

12 changes: 0 additions & 12 deletions sql/article_links_with_meta.sql

This file was deleted.

9 changes: 0 additions & 9 deletions sql/article_merged_meta.sql

This file was deleted.

2 changes: 1 addition & 1 deletion sql/references.sql
@@ -18,7 +18,7 @@ FROM (
   SELECT
     orig_id
   FROM
-    {{ staging_dataset }}.article_links_with_dataset )) AS references
+    {{ staging_dataset }}.sources )) AS references
 LEFT JOIN
   {{ staging_dataset }}.article_links AS links1
 ON
File renamed without changes.
2 changes: 1 addition & 1 deletion sql/union_metadata.sql
@@ -33,7 +33,7 @@ mapped_references as (
     meta
     cross join unnest(split(references, ",")) as orig_id_ref
     inner join
-    {{ production_dataset }}.article_links
+    {{ production_dataset }}.sources
     on orig_id_ref = orig_id
     group by id
 )
