diff --git a/linkage_dag.py b/linkage_dag.py index 6ef5374..9bb800d 100644 --- a/linkage_dag.py +++ b/linkage_dag.py @@ -339,6 +339,7 @@ ) prep_environment_script_sequence = [ + f"/snap/bin/gsutil cp gs://{bucket}/{gcs_folder}/vm_scripts/*.sh .", "cd /mnt/disks/data", "rm -rf run", "mkdir run", @@ -364,7 +365,7 @@ update_simhash_index = BashOperator( task_id="update_simhash_index", - bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "bash run_simhash_scripts.sh &"', + bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "bash run_simhash_scripts.sh &> log &"', ) wait_for_simhash_index = GCSObjectExistenceSensor( @@ -374,20 +375,9 @@ deferrable=True ) - ids_script_sequence = [ - ( - "python3 create_merge_ids.py --match_dir usable_ids --prev_id_mapping_dir prev_id_mapping " - "--merge_file id_mapping.jsonl --current_ids_dir article_pairs" - ), - f"/snap/bin/gsutil -m cp id_mapping.jsonl gs://{bucket}/{gcs_folder}/tmp/", - f"/snap/bin/gsutil -m cp simhash_results/* gs://{bucket}/{gcs_folder}/simhash_results/", - f"/snap/bin/gsutil -m cp new_simhash_indexes/* gs://{bucket}/{gcs_folder}/simhash_indexes/", - ] - ids_vm_script = " && ".join(ids_script_sequence) - create_cset_ids = BashOperator( task_id="create_cset_ids", - bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "{ids_vm_script}"', + bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "bash run_ids_scripts.sh &> log &"', ) wait_for_cset_ids = GCSObjectExistenceSensor( diff --git a/push_to_airflow.sh b/push_to_airflow.sh index b15fa86..6e5e6ad 100755 --- a/push_to_airflow.sh +++ b/push_to_airflow.sh @@ -9,9 +9,7 @@ gsutil rm -r gs://airflow-data-exchange/article_linkage/schemas/* gsutil cp schemas/* gs://airflow-data-exchange/article_linkage/schemas/ gsutil rm -r gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/schemas/article_linkage/* gsutil -m cp schemas/* gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/schemas/article_linkage/ -gsutil cp utils/create_merge_ids.py gs://airflow-data-exchange/article_linkage/vm_scripts/ -gsutil cp utils/run_simhash.py gs://airflow-data-exchange/article_linkage/vm_scripts/ -gsutil cp utils/my_simhash.py gs://airflow-data-exchange/article_linkage/vm_scripts/ +gsutil cp utils/* gs://airflow-data-exchange/article_linkage/vm_scripts/ gsutil cp utils/article_linkage_lid_dataflow_requirements.txt gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/requirements/article_linkage_lid_dataflow_requirements.txt gsutil cp utils/article_linkage_text_clean_requirements.txt gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/requirements/article_linkage_text_clean_requirements.txt gsutil cp utils/clean_corpus.py gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/linkage_scripts/ diff --git a/sql/arxiv_id_match.sql b/sql/arxiv_id_match.sql index 5291f15..0d7e43b 100644 --- a/sql/arxiv_id_match.sql +++ b/sql/arxiv_id_match.sql @@ -2,13 +2,13 @@ -- on arxiv where possible WITH arxiv_pwc_mapping AS ( SELECT - gcp_cset_arxiv_metadata.arxiv_metadata_latest.id AS id1, - papers_with_code.papers_with_abstracts.paper_url AS id2 + arxiv_metadata_latest.id AS id1, + papers_with_abstracts.paper_url AS id2 FROM gcp_cset_arxiv_metadata.arxiv_metadata_latest INNER JOIN papers_with_code.papers_with_abstracts - ON gcp_cset_arxiv_metadata.arxiv_metadata_latest.id = papers_with_code.papers_with_abstracts.arxiv_id + ON arxiv_metadata_latest.id = papers_with_abstracts.arxiv_id ) SELECT diff --git a/sql/arxiv_metadata.sql b/sql/arxiv_metadata.sql index 5a73675..b27dff8 100644 --- a/sql/arxiv_metadata.sql +++ b/sql/arxiv_metadata.sql @@ -1,13 +1,13 @@ -- get arxiv metadata used for matching SELECT - gcp_cset_arxiv_metadata.arxiv_metadata_latest.id, - gcp_cset_arxiv_metadata.arxiv_metadata_latest.title, - gcp_cset_arxiv_metadata.arxiv_metadata_latest.abstract, - lower(gcp_cset_arxiv_metadata.arxiv_metadata_latest.doi) AS clean_doi, - extract(YEAR FROM gcp_cset_arxiv_metadata.arxiv_metadata_latest.created) AS year, + arxiv_metadata_latest.id, + arxiv_metadata_latest.title, + arxiv_metadata_latest.abstract, + lower(arxiv_metadata_latest.doi) AS clean_doi, + extract(YEAR FROM arxiv_metadata_latest.created) AS year, a.last_names, NULL AS references --noqa: L029 FROM gcp_cset_arxiv_metadata.arxiv_metadata_latest LEFT JOIN {{ staging_dataset }}.arxiv_authors AS a - ON a.id = gcp_cset_arxiv_metadata.arxiv_metadata_latest.id + ON a.id = arxiv_metadata_latest.id diff --git a/utils/run_ids_scripts.sh b/utils/run_ids_scripts.sh index ba7c2b3..3af086a 100644 --- a/utils/run_ids_scripts.sh +++ b/utils/run_ids_scripts.sh @@ -1,3 +1,4 @@ +cd /mnt/disks/data/run python3 create_merge_ids.py --match_dir usable_ids --prev_id_mapping_dir prev_id_mapping --merge_file id_mapping.jsonl --current_ids_dir article_pairs /snap/bin/gsutil -m cp id_mapping.jsonl gs://airflow-data-exchange/article_linkage/tmp/ /snap/bin/gsutil -m cp simhash_results/* gs://airflow-data-exchange/article_linkage/simhash_results/ diff --git a/utils/run_simhash_scripts.sh b/utils/run_simhash_scripts.sh index f7c0924..78db4a3 100644 --- a/utils/run_simhash_scripts.sh +++ b/utils/run_simhash_scripts.sh @@ -1,3 +1,4 @@ +cd /mnt/disks/data/run python3 run_simhash.py simhash_input simhash_results --simhash_indexes simhash_indexes --new_simhash_indexes new_simhash_indexes cp -r article_pairs usable_ids cp simhash_results/* article_pairs/