Skip to content

Commit

Permalink
Attempt to connect lineage of tables pre- and post-BashOperators
Browse files Browse the repository at this point in the history
  • Loading branch information
jmelot committed May 14, 2024
1 parent 97aa649 commit 50c3b65
Showing 1 changed file with 20 additions and 0 deletions.
20 changes: 20 additions & 0 deletions linkage_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from datetime import datetime

from airflow import DAG
from airflow.lineage.entities import File
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
Expand Down Expand Up @@ -394,6 +395,23 @@
create_cset_ids = BashOperator(
task_id="create_cset_ids",
bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "bash run_ids_scripts.sh &> log &"',
# These are also inputs to `update_simhash_index` - in fact some of them are only directly used in that task -
# but I think just reporting them all on this task is ok since `update_simhash_index` doesn't update any files
# outside the VM it runs on and this task depends on all of these things, directly or indirectly
inlets=[
File(url=f"gcs:{bucket}.{tmp_dir}/article_pairs/*"),
File(url=f"gcs:{bucket}.{tmp_dir}/simhash_input/*"),
File(url=f"gcs:{bucket}.{gcs_folder}/simhash_indexes/*"),
File(url=f"gcs:{bucket}.{gcs_folder}/simhash_results/*"),
File(url=f"gcs:{bucket}.{tmp_dir}/unlink/*"),
File(url=f"gcs:{bucket}.{tmp_dir}/ids_to_drop/data*.jsonl"),
File(url=f"gcs:{bucket}.{tmp_dir}/prev_id_mapping/*"),
],
outlets=[
File(url=f"gcs:{bucket}.{tmp_dir}/id_mapping.jsonl"),
File(url=f"gcs:{bucket}.{gcs_folder}/simhash_results/*"),
File(url=f"gcs:{bucket}.{gcs_folder}/simhash_indexes/*"),
],
)

wait_for_cset_ids = GCSObjectExistenceSensor(
Expand Down Expand Up @@ -425,6 +443,8 @@
"fields_to_lid": "title,abstract",
"region": "us-east1",
},
inlets=[File(url=f"gcs:{bucket}.{tmp_dir}/lid_input/lid_input*")],
outlets=[File(url=f"gcs:{bucket}.{tmp_dir}/lid_output/lid*")],
)

# turn off the expensive godzilla of article linkage when we're done with it, then import the id mappings and
Expand Down

0 comments on commit 50c3b65

Please sign in to comment.