Merge pull request #43 from georgetown-cset/42-lineage
Attempt to connect lineage of tables pre- and post-BashOperators
jmelot authored May 17, 2024
2 parents 97aa649 + 5caaa21 commit f451003
Showing 1 changed file with 44 additions and 0 deletions.
linkage_dag.py (44 additions, 0 deletions)
@@ -3,6 +3,7 @@
from datetime import datetime

from airflow import DAG
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
@@ -394,6 +395,37 @@
create_cset_ids = BashOperator(
task_id="create_cset_ids",
bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "bash run_ids_scripts.sh &> log &"',
# These are also inputs to `update_simhash_index` - in fact some of them are only directly used in that task -
# but I think just reporting them all on this task is ok since `update_simhash_index` doesn't update any files
# outside the VM it runs on and this task depends on all of these things, directly or indirectly
inlets=[
BigQueryTable(
project_id=project_id, dataset_id=production_dataset, table_id="sources"
),
BigQueryTable(
project_id=project_id,
dataset_id=staging_dataset,
table_id="all_match_pairs_with_um",
),
BigQueryTable(
project_id=project_id,
dataset_id=staging_dataset,
table_id="simhash_input",
),
BigQueryTable(
project_id=project_id, dataset_id=staging_dataset, table_id="unlink"
),
BigQueryTable(
project_id=project_id,
dataset_id=staging_dataset,
table_id="ids_to_drop",
),
],
outlets=[
BigQueryTable(
project_id=project_id, dataset_id=staging_dataset, table_id="id_mapping"
),
],
)

wait_for_cset_ids = GCSObjectExistenceSensor(
@@ -425,6 +457,18 @@
"fields_to_lid": "title,abstract",
"region": "us-east1",
},
inlets=[
BigQueryTable(
project_id=project_id, dataset_id=staging_dataset, table_id="lid_input"
)
],
outlets=[
BigQueryTable(
project_id=project_id,
dataset_id=staging_dataset,
table_id="all_metadata_with_cld2_lid",
)
],
)

# turn off the expensive godzilla of article linkage when we're done with it, then import the id mappings and
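The pattern this commit introduces is to declare BigQuery tables as `inlets`/`outlets` on tasks whose actual work happens inside an opaque bash command, so Composer can still report table-level lineage. Below is a minimal sketch of that pattern, not part of this commit: it assumes a Cloud Composer environment where airflow.composer.data_lineage.entities is available, and the project, dataset, and table names are placeholders.

# lineage_sketch.py - hedged illustration only; names are hypothetical
from datetime import datetime

from airflow import DAG
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.operators.bash import BashOperator

with DAG(
    "lineage_sketch",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
) as dag:
    # Composer reads `inlets`/`outlets` on each task and reports the table-level
    # lineage, even though the BashOperator itself gives Airflow no visibility
    # into which tables the remote script reads or writes.
    transform = BashOperator(
        task_id="transform",
        bash_command="echo 'real work would run on a remote VM here'",
        inlets=[
            BigQueryTable(
                project_id="my-project", dataset_id="staging", table_id="input_table"
            ),
        ],
        outlets=[
            BigQueryTable(
                project_id="my-project", dataset_id="staging", table_id="output_table"
            ),
        ],
    )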
