From 50c3b65ebad26cb380bca330803083004ef13d64 Mon Sep 17 00:00:00 2001
From: Jennifer Melot
Date: Tue, 14 May 2024 11:51:36 -0400
Subject: [PATCH 1/2] Attempt to connect lineage of tables pre- and post-BashOperators

---
 linkage_dag.py | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/linkage_dag.py b/linkage_dag.py
index 0124bb9..36510fc 100644
--- a/linkage_dag.py
+++ b/linkage_dag.py
@@ -3,6 +3,7 @@
 from datetime import datetime
 
 from airflow import DAG
+from airflow.lineage.entities import File
 from airflow.operators.bash import BashOperator
 from airflow.operators.dummy import DummyOperator
 from airflow.operators.python import PythonOperator
@@ -394,6 +395,23 @@
     create_cset_ids = BashOperator(
         task_id="create_cset_ids",
         bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "bash run_ids_scripts.sh &> log &"',
+        # These are also inputs to `update_simhash_index` - in fact some of them are only directly used in that task -
+        # but I think just reporting them all on this task is ok since `update_simhash_index` doesn't update any files
+        # outside the VM it runs on and this task depends on all of these things, directly or indirectly
+        inlets=[
+            File(url=f"gcs:{bucket}.{tmp_dir}/article_pairs/*"),
+            File(url=f"gcs:{bucket}.{tmp_dir}/simhash_input/*"),
+            File(url=f"gcs:{bucket}.{gcs_folder}/simhash_indexes/*"),
+            File(url=f"gcs:{bucket}.{gcs_folder}/simhash_results/*"),
+            File(url=f"gcs:{bucket}.{tmp_dir}/unlink/*"),
+            File(url=f"gcs:{bucket}.{tmp_dir}/ids_to_drop/data*.jsonl"),
+            File(url=f"gcs:{bucket}.{tmp_dir}/prev_id_mapping/*"),
+        ],
+        outlets=[
+            File(url=f"gcs:{bucket}.{tmp_dir}/id_mapping.jsonl"),
+            File(url=f"gcs:{bucket}.{gcs_folder}/simhash_results/*"),
+            File(url=f"gcs:{bucket}.{gcs_folder}/simhash_indexes/*"),
+        ],
     )
 
     wait_for_cset_ids = GCSObjectExistenceSensor(
@@ -425,6 +443,8 @@
             "fields_to_lid": "title,abstract",
             "region": "us-east1",
         },
+        inlets=[File(url=f"gcs:{bucket}.{tmp_dir}/lid_input/lid_input*")],
+        outlets=[File(url=f"gcs:{bucket}.{tmp_dir}/lid_output/lid*")],
     )
 
     # turn off the expensive godzilla of article linkage when we're done with it, then import the id mappings and
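
For context, the pattern the first commit relies on looks roughly like the standalone sketch below. It is not taken from linkage_dag.py: the DAG id, bucket, and paths are placeholders, and the BashOperator command is a stand-in for the real remote script. The point is only that File inlets/outlets from airflow.lineage.entities are plain metadata attached to the task for the lineage backend to report; the operator itself never reads or writes those GCS paths.

    # Sketch only -- placeholder names, not the DAG's real configuration.
    from datetime import datetime

    from airflow import DAG
    from airflow.lineage.entities import File
    from airflow.operators.bash import BashOperator

    with DAG(
        dag_id="lineage_file_sketch",
        start_date=datetime(2024, 1, 1),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        # The operator only runs a shell command; the File inlets/outlets are
        # lineage annotations describing what the remote work reads and writes.
        run_ids = BashOperator(
            task_id="run_ids",
            bash_command="echo 'work happens on a remote VM'",
            inlets=[File(url="gcs:example-bucket.tmp/article_pairs/*")],
            outlets=[File(url="gcs:example-bucket.tmp/id_mapping.jsonl")],
        )
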
From 5caaa210629627855a597619bb3b3440ebfd9af8 Mon Sep 17 00:00:00 2001
From: Jennifer Melot
Date: Fri, 17 May 2024 09:42:10 -0400
Subject: [PATCH 2/2] Try switching to BigQueryTable inlets and outlets...

---
 linkage_dag.py | 50 +++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 37 insertions(+), 13 deletions(-)

diff --git a/linkage_dag.py b/linkage_dag.py
index 36510fc..68497bf 100644
--- a/linkage_dag.py
+++ b/linkage_dag.py
@@ -3,7 +3,7 @@
 from datetime import datetime
 
 from airflow import DAG
-from airflow.lineage.entities import File
+from airflow.composer.data_lineage.entities import BigQueryTable
 from airflow.operators.bash import BashOperator
 from airflow.operators.dummy import DummyOperator
 from airflow.operators.python import PythonOperator
@@ -399,18 +399,32 @@
         # but I think just reporting them all on this task is ok since `update_simhash_index` doesn't update any files
         # outside the VM it runs on and this task depends on all of these things, directly or indirectly
         inlets=[
-            File(url=f"gcs:{bucket}.{tmp_dir}/article_pairs/*"),
-            File(url=f"gcs:{bucket}.{tmp_dir}/simhash_input/*"),
-            File(url=f"gcs:{bucket}.{gcs_folder}/simhash_indexes/*"),
-            File(url=f"gcs:{bucket}.{gcs_folder}/simhash_results/*"),
-            File(url=f"gcs:{bucket}.{tmp_dir}/unlink/*"),
-            File(url=f"gcs:{bucket}.{tmp_dir}/ids_to_drop/data*.jsonl"),
-            File(url=f"gcs:{bucket}.{tmp_dir}/prev_id_mapping/*"),
+            BigQueryTable(
+                project_id=project_id, dataset_id=production_dataset, table_id="sources"
+            ),
+            BigQueryTable(
+                project_id=project_id,
+                dataset_id=staging_dataset,
+                table_id="all_match_pairs_with_um",
+            ),
+            BigQueryTable(
+                project_id=project_id,
+                dataset_id=staging_dataset,
+                table_id="simhash_input",
+            ),
+            BigQueryTable(
+                project_id=project_id, dataset_id=staging_dataset, table_id="unlink"
+            ),
+            BigQueryTable(
+                project_id=project_id,
+                dataset_id=staging_dataset,
+                table_id="ids_to_drop",
+            ),
         ],
         outlets=[
-            File(url=f"gcs:{bucket}.{tmp_dir}/id_mapping.jsonl"),
-            File(url=f"gcs:{bucket}.{gcs_folder}/simhash_results/*"),
-            File(url=f"gcs:{bucket}.{gcs_folder}/simhash_indexes/*"),
+            BigQueryTable(
+                project_id=project_id, dataset_id=staging_dataset, table_id="id_mapping"
+            ),
         ],
     )
 
@@ -443,8 +457,18 @@
             "fields_to_lid": "title,abstract",
             "region": "us-east1",
         },
-        inlets=[File(url=f"gcs:{bucket}.{tmp_dir}/lid_input/lid_input*")],
-        outlets=[File(url=f"gcs:{bucket}.{tmp_dir}/lid_output/lid*")],
+        inlets=[
+            BigQueryTable(
+                project_id=project_id, dataset_id=staging_dataset, table_id="lid_input"
+            )
+        ],
+        outlets=[
+            BigQueryTable(
+                project_id=project_id,
+                dataset_id=staging_dataset,
+                table_id="all_metadata_with_cld2_lid",
+            )
+        ],
     )
 
     # turn off the expensive godzilla of article linkage when we're done with it, then import the id mappings and
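
The second commit swaps those File entities for Composer's table-level entities. A comparable sketch follows, again with placeholder project/dataset/table names, and assuming a Cloud Composer environment where the Composer-only airflow.composer.data_lineage.entities module and the data lineage integration are available.

    # Sketch only -- placeholder names, assumes Cloud Composer with data lineage enabled.
    from datetime import datetime

    from airflow import DAG
    from airflow.composer.data_lineage.entities import BigQueryTable
    from airflow.operators.bash import BashOperator

    with DAG(
        dag_id="lineage_bq_sketch",
        start_date=datetime(2024, 1, 1),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        # Declaring table-level inlets/outlets lets Composer report lineage between
        # tables touched indirectly (here, by a script running on a remote VM).
        create_ids = BashOperator(
            task_id="create_ids",
            bash_command="echo 'work happens on a remote VM'",
            inlets=[
                BigQueryTable(
                    project_id="example-project",
                    dataset_id="staging",
                    table_id="simhash_input",
                ),
            ],
            outlets=[
                BigQueryTable(
                    project_id="example-project",
                    dataset_id="staging",
                    table_id="id_mapping",
                ),
            ],
        )

Keying the lineage to BigQuery tables rather than to the intermediate GCS exports is what lets the tables consumed and produced around the BashOperators show up as connected, which appears to be the goal stated in the first commit's subject.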