Programmatically create and destroy linkage VM on each run
Closes #50
jmelot committed Oct 25, 2024
1 parent 4f3e24f commit 2756b12
Showing 2 changed files with 75 additions and 26 deletions.
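The change swaps the permanently provisioned "godzilla-of-article-linkage" VM for an instance that the DAG creates at the start of each run and deletes at the end, using the Google provider's Compute Engine operators together with Airflow setup/teardown tasks. The sketch below is not part of the diff; it is a minimal illustration of that lifecycle pattern with placeholder project, zone, and instance names (the real values and task wiring are in the linkage_dag.py diff that follows).

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.google.cloud.operators.compute import (
    ComputeEngineDeleteInstanceOperator,
    ComputeEngineInsertInstanceOperator,
    ComputeEngineStartInstanceOperator,
    ComputeEngineStopInstanceOperator,
)

PROJECT = "example-project"    # placeholder
ZONE = "us-east1-b"            # placeholder
INSTANCE = "ephemeral-worker"  # placeholder

with DAG("ephemeral_vm_example", start_date=datetime(2024, 1, 1), schedule=None):
    # Create the instance from scratch; `body` follows the Compute Engine
    # instances.insert request format, as in the DAG below.
    create = ComputeEngineInsertInstanceOperator(
        task_id="create_vm",
        project_id=PROJECT,
        zone=ZONE,
        body={
            "name": INSTANCE,
            "machine_type": f"zones/{ZONE}/machineTypes/e2-standard-4",
            "disks": [
                {
                    "boot": True,
                    "auto_delete": True,
                    "initialize_params": {
                        "source_image": "projects/ubuntu-os-cloud/global/images/family/ubuntu-2204-lts"
                    },
                }
            ],
            "network_interfaces": [{"access_configs": [{"name": "External NAT"}]}],
        },
    )
    start = ComputeEngineStartInstanceOperator(
        task_id="start_vm", project_id=PROJECT, zone=ZONE, resource_id=INSTANCE
    )
    # Stand-in for the ssh-driven work (prep_environment, create_cset_ids, run_lid, ...)
    work = EmptyOperator(task_id="do_work_on_vm")
    stop = ComputeEngineStopInstanceOperator(
        task_id="stop_vm", project_id=PROJECT, zone=ZONE, resource_id=INSTANCE
    )
    delete = ComputeEngineDeleteInstanceOperator(
        task_id="delete_vm", project_id=PROJECT, zone=ZONE, resource_id=INSTANCE
    )

    # start/stop are marked as setup/teardown, so the instance is stopped even if
    # the work fails; delete also depends on the work itself, so a failed run
    # leaves the stopped VM around for inspection.
    create >> start.as_setup() >> work >> stop.as_teardown() >> delete
    start >> stop   # pair the setup with its teardown, as in the DAG below
    work >> delete  # mirrors wait_for_cset_ids >> gce_instance_delete below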
99 changes: 74 additions & 25 deletions linkage_dag.py
@@ -13,6 +13,8 @@
BigQueryInsertJobOperator,
)
from airflow.providers.google.cloud.operators.compute import (
ComputeEngineDeleteInstanceOperator,
ComputeEngineInsertInstanceOperator,
ComputeEngineStartInstanceOperator,
ComputeEngineStopInstanceOperator,
)
@@ -64,8 +66,7 @@
sql_dir = f"sql/{gcs_folder}"
backup_dataset = production_dataset + "_backups"
project_id = PROJECT_ID
-gce_zone = GCP_ZONE
-gce_resource_id = "godzilla-of-article-linkage"
+gce_resource_id = "article-linkage"

# We keep several intermediate outputs in a tmp dir on gcs, so clean it out at the start of each run. We clean at
# the start of the run so if the run fails we can examine the failed data
@@ -307,7 +308,7 @@
last_combination_query >> next
last_combination_query = next

-# Now, we need to prep some inputs for RAM and CPU-intensive code that will run on "godzilla of article linkage".
+# Now, we need to prep some inputs for RAM and CPU-intensive code that will run on the linkage VM
heavy_compute_inputs = [
BigQueryToGCSOperator(
task_id="export_old_cset_ids",
@@ -341,19 +342,55 @@
),
]

# Start up godzilla of article linkage, create the merged ids
gce_instance_create = ComputeEngineInsertInstanceOperator(
task_id=f"create_{gce_resource_id}",
project_id=PROJECT_ID,
zone=GCP_ZONE,
body={
"name": gce_resource_id,
"machine_type": f"zones/{GCP_ZONE}/machineTypes/m1-megamem-96",
"disks": [
{
"boot": True,
"auto_delete": True,
"initialize_params": {
"disk_size_gb": "500",
"disk_type": f"zones/{GCP_ZONE}/diskTypes/pd-balanced",
"source_image": "projects/ubuntu-os-cloud/global/images/ubuntu-2204-jammy-v20240927",
},
}
],
"network_interfaces": [
{
"access_configs": [
{"name": "External NAT", "network_tier": "PREMIUM"}
],
"stack_type": "IPV4_ONLY",
"subnetwork": "regions/us-east1/subnetworks/default",
}
],
"service_accounts": [
{
"email": "dataloader@gcp-cset-projects.iam.gserviceaccount.com",
"scopes": [
"https://www.googleapis.com/auth/devstorage.full_control",
"https://www.googleapis.com/auth/cloud-platform",
],
}
],
},
)

gce_instance_start = ComputeEngineStartInstanceOperator(
-project_id=project_id,
-zone=gce_zone,
+task_id=f"start-{gce_resource_id}",
+project_id=PROJECT_ID,
+zone=GCP_ZONE,
resource_id=gce_resource_id,
-task_id="start-" + gce_resource_id,
)

prep_environment_script_sequence = [
f"/snap/bin/gsutil cp gs://{bucket}/{gcs_folder}/vm_scripts/*.sh .",
"cd /mnt/disks/data",
"rm -rf run",
"mkdir run",
"rm -rf run; mkdir run",
"cd run",
f"/snap/bin/gsutil cp gs://{bucket}/{gcs_folder}/vm_scripts/* .",
"rm -rf input_data",
@@ -369,12 +406,12 @@

prep_environment = BashOperator(
task_id="prep_environment",
-bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "{prep_environment_vm_script}"',
+bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {GCP_ZONE} --command "{prep_environment_vm_script}"',
)

create_cset_ids = BashOperator(
task_id="create_cset_ids",
-bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "bash run_ids_scripts.sh &> log &"',
+bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {GCP_ZONE} --command "bash run_ids_scripts.sh &> log &"',
inlets=[
BigQueryTable(
project_id=project_id, dataset_id=production_dataset, table_id="sources"
@@ -398,6 +435,7 @@
project_id=project_id, dataset_id=staging_dataset, table_id="id_mapping"
),
],
retries=0,
)

wait_for_cset_ids = GCSObjectExistenceSensor(
@@ -407,6 +445,20 @@
deferrable=True,
)

gce_instance_stop = ComputeEngineStopInstanceOperator(
project_id=PROJECT_ID,
zone=GCP_ZONE,
resource_id=gce_resource_id,
task_id=f"stop-{gce_resource_id}",
)

gce_instance_delete = ComputeEngineDeleteInstanceOperator(
task_id=f"delete_{gce_resource_id}",
project_id=PROJECT_ID,
zone=GCP_ZONE,
resource_id=gce_resource_id,
)

# while the article ids are updating, run lid on the titles and abstracts
lid_dataflow_options = {
"project": project_id,
@@ -445,15 +497,6 @@
on_execute_callback=clear_gcs_dir(DATA_BUCKET, f"{tmp_dir}/lid_output/lid"),
)

-# turn off the expensive godzilla of article linkage when we're done with it, then import the id mappings and
-# lid back into BQ
-gce_instance_stop = ComputeEngineStopInstanceOperator(
-project_id=project_id,
-zone=gce_zone,
-resource_id=gce_resource_id,
-task_id="stop-" + gce_resource_id,
-)

import_id_mapping = GCSToBigQueryOperator(
task_id="import_id_mapping",
bucket=bucket,
@@ -668,17 +711,23 @@
(
last_combination_query
>> heavy_compute_inputs
->> gce_instance_start
+>> gce_instance_create
+>> gce_instance_start.as_setup()
>> prep_environment
>> create_cset_ids
>> wait_for_cset_ids
->> gce_instance_stop
+>> gce_instance_stop.as_teardown()
+>> gce_instance_delete
)

-gce_instance_start >> run_lid >> gce_instance_stop
+# Ensure that delete doesn't run if we're in the teardown condition so we'll have a chance to review the outputs
+wait_for_cset_ids >> gce_instance_delete
+gce_instance_start >> gce_instance_stop

+gce_instance_create >> run_lid >> gce_instance_delete

(
-gce_instance_stop
+gce_instance_delete
>> [import_id_mapping, import_lid]
>> start_final_transform_queries
)
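One detail worth spelling out: create_cset_ids launches the ids job in the background over ssh ("bash run_ids_scripts.sh &> log &"), so that task returns as soon as the script has been kicked off. The DAG then blocks on a marker object in GCS, via the deferrable GCSObjectExistenceSensor, that the VM-side scripts presumably write when they finish (the run_ids_scripts.sh diff below shows the marker being cleared at the start of a run). A minimal sketch of that handshake, with illustrative bucket, paths, user, and task names rather than the exact values used here:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor

# Illustrative values only; the DAG above uses its own bucket, paths, and ssh user.
VM = "article-linkage"
ZONE = "us-east1-b"
DONE_BUCKET = "example-bucket"
DONE_OBJECT = "article_linkage/tmp/done_files/ids_are_done"

with DAG("ids_handshake_example", start_date=datetime(2024, 1, 1), schedule=None):
    # Kick off the long-running job on the VM and return immediately: the trailing
    # "&" backgrounds the script, so this task only confirms that the launch worked.
    launch_ids_job = BashOperator(
        task_id="launch_ids_job",
        bash_command=f'gcloud compute ssh user@{VM} --zone {ZONE} '
        f'--command "bash run_ids_scripts.sh &> log &"',
        retries=0,  # the DAG above also disables retries on this launch step
    )

    # Wait, without holding a worker slot (deferrable=True), for the marker object
    # the VM-side scripts write to GCS once the id mapping has finished.
    wait_for_ids = GCSObjectExistenceSensor(
        task_id="wait_for_ids",
        bucket=DONE_BUCKET,
        object=DONE_OBJECT,
        deferrable=True,
    )

    launch_ids_job >> wait_for_ids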
2 changes: 1 addition & 1 deletion utils/run_ids_scripts.sh
@@ -1,4 +1,4 @@
-cd /mnt/disks/data/run
+cd run
gsutil rm gs://airflow-data-exchange/article_linkage/tmp/done_files/ids_are_done
python3 create_merge_ids.py --exact_match_dir exact_matches --exclude_dir unlink --ids_to_drop ids_to_drop --prev_id_mapping_dir prev_id_mapping --output_dir new_id_mappings
/snap/bin/gsutil -m cp -r new_id_mappings gs://airflow-data-exchange/article_linkage/tmp/
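For context on this change: the new instance has only a boot disk, with no separately mounted data volume, so the working directory is now a run/ folder under the ssh user's home directory, created fresh by the DAG's prep_environment task, and the script simply does "cd run" relative to where the ssh session lands. The exact way the DAG flattens prep_environment_script_sequence into prep_environment_vm_script is not shown in the diff; the command list and join below are assumptions for illustration only.

# Hypothetical illustration (not from the diff) of flattening a list of shell steps
# into a single "gcloud compute ssh --command" invocation, the pattern used by the
# prep_environment task above.
prep_environment_script_sequence = [
    "rm -rf run; mkdir run",
    "cd run",
    "echo 'fetch inputs here'",
]
prep_environment_vm_script = " && ".join(prep_environment_script_sequence)

ssh_command = (
    "gcloud compute ssh user@article-linkage --zone us-east1-b "
    f'--command "{prep_environment_vm_script}"'
)
print(ssh_command)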