From 50c3b65ebad26cb380bca330803083004ef13d64 Mon Sep 17 00:00:00 2001 From: Jennifer Melot Date: Tue, 14 May 2024 11:51:36 -0400 Subject: [PATCH] Attempt to connect lineage of tables pre- and post-BashOperators --- linkage_dag.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/linkage_dag.py b/linkage_dag.py index 0124bb9..36510fc 100644 --- a/linkage_dag.py +++ b/linkage_dag.py @@ -3,6 +3,7 @@ from datetime import datetime from airflow import DAG +from airflow.lineage.entities import File from airflow.operators.bash import BashOperator from airflow.operators.dummy import DummyOperator from airflow.operators.python import PythonOperator @@ -394,6 +395,23 @@ create_cset_ids = BashOperator( task_id="create_cset_ids", bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "bash run_ids_scripts.sh &> log &"', + # These are also inputs to `update_simhash_index` - in fact some of them are only directly used in that task - + # but I think just reporting them all on this task is ok since `update_simhash_index` doesn't update any files + # outside the VM it runs on and this task depends on all of these things, directly or indirectly + inlets=[ + File(url=f"gcs:{bucket}.{tmp_dir}/article_pairs/*"), + File(url=f"gcs:{bucket}.{tmp_dir}/simhash_input/*"), + File(url=f"gcs:{bucket}.{gcs_folder}/simhash_indexes/*"), + File(url=f"gcs:{bucket}.{gcs_folder}/simhash_results/*"), + File(url=f"gcs:{bucket}.{tmp_dir}/unlink/*"), + File(url=f"gcs:{bucket}.{tmp_dir}/ids_to_drop/data*.jsonl"), + File(url=f"gcs:{bucket}.{tmp_dir}/prev_id_mapping/*"), + ], + outlets=[ + File(url=f"gcs:{bucket}.{tmp_dir}/id_mapping.jsonl"), + File(url=f"gcs:{bucket}.{gcs_folder}/simhash_results/*"), + File(url=f"gcs:{bucket}.{gcs_folder}/simhash_indexes/*"), + ], ) wait_for_cset_ids = GCSObjectExistenceSensor( @@ -425,6 +443,8 @@ "fields_to_lid": "title,abstract", "region": "us-east1", }, + inlets=[File(url=f"gcs:{bucket}.{tmp_dir}/lid_input/lid_input*")], + outlets=[File(url=f"gcs:{bucket}.{tmp_dir}/lid_output/lid*")], ) # turn off the expensive godzilla of article linkage when we're done with it, then import the id mappings and