diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..619c1b7 --- /dev/null +++ b/.flake8 @@ -0,0 +1,5 @@ +[flake8] +ignore = E203, E266, E501, W503, F403, F401 +max-line-length = 120 +max-complexity = 20 +select = B,C,E,F,W,T4,B9 diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml new file mode 100644 index 0000000..deb47f4 --- /dev/null +++ b/.github/workflows/main.yml @@ -0,0 +1,75 @@ +name: Python application + +on: [pull_request] + +jobs: + build: + name: tests-pass + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + - name: Set up Python 3.7 + uses: actions/setup-python@v1 + with: + python-version: 3.7 + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + - name: Test with pytest + run: | + coverage run -m pytest tests + coverage xml -o coverage/python.xml + - name: Report python coverage + uses: orgoro/coverage@v3 + with: + coverageFile: coverage/python.xml + token: ${{ secrets.GITHUB_TOKEN }} + # The next few steps only apply if you have javascript files + # - name: Setup node + # uses: actions/setup-node@v3 + # with: + # node-version: '18' + # - name: Test with jest + # shell: bash + # run: | + # npm install + # npm test -- --coverage --coverageReporters="json-summary" --coverageReporters="text" | tee ./coverage.txt + # shell: bash + # - name: Report javascript coverage + # uses: MishaKav/jest-coverage-comment@v1.0.20 + # with: + # title: "JavaScript Coverage" + # summary-title: "Summary" + # coverage-title: "Modified Files" + # github-token: ${{ secrets.GITHUB_TOKEN }} + # report-only-changed-files: true + # coverage-path: ./JS-FOLDER-NAME/coverage.txt + # coverage-summary-path: ./JS-FOLDER-NAME/coverage/coverage-summary.json + # coverage-path-prefix: JS-FOLDER-NAME/src/ + # - name: Build output files + # run: | + # npm run build + # - name: Check links in built files + # id: link_check + # run: | + # find public -name "*.js" -exec grep -Eo "(http|https):\/\/[^]\{\}\"'\\\(\)\> ]+" {} \; | sort -u > linklist.txt + # printf '%s\n%s\n%s\n' "# LinkChecker URL list" "# " "$(cat linklist.txt)" > linklist.txt + # linkchecker linklist.txt --check-extern --ignore-url="https://.*\.fastly\.net/.*" --ignore-url="https://.*\.mapbox\..*" --ignore-url=".*//a\W.*" --ignore-url="http://(a|x|тест)" -o failures > output.txt || true + # cat output.txt + # echo "num_links=$(wc -l < output.txt | sed 's/^ *//g')" >> $GITHUB_OUTPUT + # echo "links<> $GITHUB_OUTPUT + # echo "$(cat output.txt)" >> $GITHUB_OUTPUT + # echo "EOFdelimiter" >> $GITHUB_OUTPUT + # - name: Edit PR comment about link checking + # if: steps.link_check.outputs.num_links > 0 + # uses: thollander/actions-comment-pull-request@v2 + # with: + # message: | + # There are ${{ steps.link_check.outputs.num_links }} broken links. 
Check the code for these links: + # ${{ steps.link_check.outputs.links }} + # comment_tag: link_check_msg + - name: Run linting + run: | + pre-commit run --all-files diff --git a/.github/workflows/rebase-reminder.yml b/.github/workflows/rebase-reminder.yml new file mode 100644 index 0000000..0c30392 --- /dev/null +++ b/.github/workflows/rebase-reminder.yml @@ -0,0 +1,42 @@ +name: Rebase reminder +on: [pull_request, pull_request_review] + +jobs: + build: + name: rebuild-reminder + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 0 + - name: Find behind count + id: behind_count + run: | + echo "behind_count=$(git rev-list --count ${{ github.event.pull_request.head.sha }}..${{ github.event.pull_request.base.sha }})" >> $GITHUB_OUTPUT + - name: Find ahead count + id: ahead_count + run: | + echo "ahead_count=$(git rev-list --count ${{ github.event.pull_request.base.sha }}..${{ github.event.pull_request.head.sha }})" >> $GITHUB_OUTPUT + - name: Find combined count + id: combined_count + run: | + echo "combined_count=$(expr ${{steps.behind_count.outputs.behind_count}} + ${{steps.ahead_count.outputs.ahead_count}})" >> $GITHUB_OUTPUT + - name: Edit PR comment - rebasing + if: steps.behind_count.outputs.behind_count > 0 && steps.combined_count.outputs.combined_count > 3 + uses: thollander/actions-comment-pull-request@v1 + with: + message: | + Needs rebasing :bangbang: + behind_count is ${{ steps.behind_count.outputs.behind_count }} + ahead_count is ${{ steps.ahead_count.outputs.ahead_count }} + comment_includes: 'rebasing' + - name: Edit PR comment - no rebasing + if: steps.behind_count.outputs.behind_count == 0 || steps.combined_count.outputs.combined_count <= 3 + uses: thollander/actions-comment-pull-request@v1 + with: + message: | + No need for rebasing :+1: + behind_count is ${{ steps.behind_count.outputs.behind_count }} + ahead_count is ${{ steps.ahead_count.outputs.ahead_count }} + comment_includes: 'rebasing' diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..92379f6 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,37 @@ +# See https://pre-commit.com for more information +# See https://pre-commit.com/hooks.html for more hooks +repos: +- repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.1.0 + hooks: + - id: trailing-whitespace + exclude: "__snapshots__" + - id: end-of-file-fixer + - id: check-yaml + - id: check-added-large-files + - id: check-json +# The next step only applies if you have javascript files. +# There should be a package.json that installs eslint +# (or eslint-config-react-app if you are using gatsby). 
+# - repo: https://github.com/pre-commit/mirrors-eslint +# rev: v8.24.0 +# hooks: +# - id: eslint +- repo: https://github.com/PyCQA/isort + rev: 5.11.5 + hooks: + - id: isort +- repo: https://github.com/ambv/black + rev: 22.3.0 + hooks: + - id: black + language_version: python3 +- repo: https://github.com/PyCQA/flake8 + rev: 4.0.1 + hooks: + - id: flake8 +- repo: https://github.com/sqlfluff/sqlfluff + rev: 0.10.1 + hooks: + - id: sqlfluff-lint + - id: sqlfluff-fix diff --git a/.sqlfluff b/.sqlfluff new file mode 100644 index 0000000..7897d10 --- /dev/null +++ b/.sqlfluff @@ -0,0 +1,19 @@ +[sqlfluff] +dialect=bigquery +indent_unit = space +exclude_rules = L014,L018,L027,L032,L034,L042,L044,L031 + +[sqlfluff:rules] +max_line_length = 120 +tab_space_size = 2 + +[sqlfluff:rules:L010] +capitalisation_policy = upper + +[sqlfluff:rules:L030] +extended_capitalisation_policy = upper + +[sqlfluff:templater:jinja:context] +staging_dataset = staging_literature +production_dataset = literature +params = {"strong": "title", "other": "year", "additional_checks": "", "tables": "staging_literature.table"} diff --git a/.sqlfluffignore b/.sqlfluffignore new file mode 100644 index 0000000..4ed18f1 --- /dev/null +++ b/.sqlfluffignore @@ -0,0 +1,2 @@ +evaluation/ +tests/ diff --git a/README.md b/README.md index cdaf0f5..7625da5 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,15 @@ # Article Linking ![Python application](https://github.com/georgetown-cset/article-linking/workflows/Python%20application/badge.svg) -This repository contains a description and supporting code for CSET's current method of -cross-dataset article linking. Note that we use "article" very loosely, although in a way that to our knowledge +This repository contains a description and supporting code for CSET's current method of +cross-dataset article linking. Note that we use "article" very loosely, although in a way that to our knowledge is fairly consistent across corpora. Books, for example, are included. -For each article in arXiv, WOS, Papers With Code, Semantic Scholar, The Lens, and OpenAlex +For each article in arXiv, WOS, Papers With Code, Semantic Scholar, The Lens, and OpenAlex we normalized titles, abstracts, and author last names. For the purpose of matching, we filtered out -titles, abstracts, and DOIs that occurred more than 10 times in the corpus. We then considered each group of articles +titles, abstracts, and DOIs that occurred more than 10 times in the corpus. We then considered each group of articles within or across datasets that shared at least one of the following (non-null) metadata fields: - + * Normalized title * Normalized abstract * Citations @@ -19,8 +19,8 @@ as well as a match on one additional field above, or on * Publication year * Normalized author last names - -to correspond to one article in the merged dataset. We add to this set "near matches" of the concatenation + +to correspond to one article in the merged dataset. We add to this set "near matches" of the concatenation of the normalized title and abstract within a publication year, which we identify using simhash. To do this, we run the `linkage_dag.py` on airflow. The article linkage runs weekly, triggered by the `scholarly_lit_trigger` dag. @@ -31,15 +31,15 @@ For an English description of what the dag does, see [the documentation](methods We have three tables that are most likely to help you use article linkage. -- `gcp_cset_links_v2.article_links` - For each original ID (e.g., from WoS), gives the corresponding CSET ID. 
+- `gcp_cset_links_v2.article_links` - For each original ID (e.g., from WoS), gives the corresponding CSET ID. This is a many-to-one mapping. Please update your scripts to use `gcp_cset_links_v2.article_links_with_dataset`, which has an additional column that contains the dataset of the `orig_id`. - `gcp_cset_links_v2.all_metadata_with_cld2_lid` - provides CLD2 LID for the titles and abstracts of each -current version of each article's metadata. You can also use this table to get the metadata used in the +current version of each article's metadata. You can also use this table to get the metadata used in the match for each version of the raw articles. Note that the `id` column is _not_ unique as some corpora like WOS have multiple versions of the metadata for different languages. - `gcp_cset_links_v2.article_merged_metadata` - This maps the CSET `merged_id` to a set of merged metadata. -The merging method takes the maximum value of each metadata field across each matched article, which may not +The merging method takes the maximum value of each metadata field across each matched article, which may not be suitable for your purposes. diff --git a/evaluation/positive_match_no_simhash_for_annotators.sql b/evaluation/positive_match_no_simhash_for_annotators.sql index 269c267..98133f0 100644 --- a/evaluation/positive_match_no_simhash_for_annotators.sql +++ b/evaluation/positive_match_no_simhash_for_annotators.sql @@ -1,11 +1,11 @@ -select +select merged_id, data1.orig_id as orig_id1, data2.orig_id as orig_id2, data1.metadata as metadata1, data2.metadata as metadata2 from article_links_v3_eval.positive_match_no_simhash as data1 -inner join +inner join article_links_v3_eval.positive_match_no_simhash as data2 using(merged_id) -where data1.orig_id > data2.orig_id -- avoid having annotators annotate twice \ No newline at end of file +where data1.orig_id > data2.orig_id -- avoid having annotators annotate twice diff --git a/linkage_dag.py b/linkage_dag.py index 41700ef..035c1d7 100644 --- a/linkage_dag.py +++ b/linkage_dag.py @@ -1,41 +1,66 @@ import json import os +from datetime import datetime from airflow import DAG -from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator, BigQueryCheckOperator -from airflow.providers.google.cloud.transfers.bigquery_to_bigquery import BigQueryToBigQueryOperator -from airflow.providers.google.cloud.operators.dataflow import DataflowCreatePythonJobOperator -from airflow.providers.google.cloud.operators.compute import ComputeEngineStartInstanceOperator, ComputeEngineStopInstanceOperator -from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator -from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator -from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator -from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow.operators.bash import BashOperator from airflow.operators.dummy import DummyOperator from airflow.operators.python import PythonOperator -from datetime import datetime - +from airflow.operators.trigger_dagrun import TriggerDagRunOperator +from airflow.providers.google.cloud.operators.bigquery import ( + BigQueryCheckOperator, + BigQueryInsertJobOperator, +) +from airflow.providers.google.cloud.operators.compute import ( + ComputeEngineStartInstanceOperator, + ComputeEngineStopInstanceOperator, +) +from airflow.providers.google.cloud.operators.dataflow import ( + DataflowCreatePythonJobOperator, +) +from 
airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator +from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor +from airflow.providers.google.cloud.transfers.bigquery_to_bigquery import ( + BigQueryToBigQueryOperator, +) +from airflow.providers.google.cloud.transfers.bigquery_to_gcs import ( + BigQueryToGCSOperator, +) +from airflow.providers.google.cloud.transfers.gcs_to_bigquery import ( + GCSToBigQueryOperator, +) +from dataloader.airflow_utils.defaults import ( + DAGS_DIR, + DATA_BUCKET, + GCP_ZONE, + PROJECT_ID, + get_default_args, + get_post_success, +) from dataloader.scripts.populate_documentation import update_table_descriptions -from dataloader.airflow_utils.defaults import DATA_BUCKET, PROJECT_ID, GCP_ZONE, \ - DAGS_DIR, get_default_args, get_post_success - production_dataset = "literature" staging_dataset = f"staging_{production_dataset}" - -with DAG("article_linkage_updater", - default_args=get_default_args(), - description="Links articles across our scholarly lit holdings.", - schedule_interval=None, - user_defined_macros = {"staging_dataset": staging_dataset, "production_dataset": production_dataset} - ) as dag: +args = get_default_args(pocs=["Jennifer"]) +args["retries"] = 1 + +with DAG( + "article_linkage_updater", + default_args=args, + description="Links articles across our scholarly lit holdings.", + schedule_interval=None, + user_defined_macros={ + "staging_dataset": staging_dataset, + "production_dataset": production_dataset, + }, +) as dag: bucket = DATA_BUCKET gcs_folder = "article_linkage" tmp_dir = f"{gcs_folder}/tmp" raw_data_dir = f"{gcs_folder}/data" schema_dir = f"{gcs_folder}/schemas" sql_dir = f"sql/{gcs_folder}" - backup_dataset = production_dataset+"_backups" + backup_dataset = production_dataset + "_backups" project_id = PROJECT_ID gce_zone = GCP_ZONE gce_resource_id = "godzilla-of-article-linkage" @@ -43,9 +68,7 @@ # We keep several intermediate outputs in a tmp dir on gcs, so clean it out at the start of each run. 
We clean at # the start of the run so if the run fails we can examine the failed data clear_tmp_dir = GCSDeleteObjectsOperator( - task_id="clear_tmp_gcs_dir", - bucket_name=bucket, - prefix=tmp_dir + "/" + task_id="clear_tmp_gcs_dir", bucket_name=bucket, prefix=tmp_dir + "/" ) # Next, we'll run a different set of queries for each dataset to convert the metadata we use in the match to a @@ -54,27 +77,35 @@ metadata_sequences_end = [] for dataset in ["arxiv", "wos", "papers_with_code", "openalex", "s2", "lens"]: ds_commands = [] - query_list = [t.strip() for t in open(f"{DAGS_DIR}/sequences/" - f"{gcs_folder}/generate_{dataset}_metadata.tsv")] + query_list = [ + t.strip() + for t in open( + f"{DAGS_DIR}/sequences/" f"{gcs_folder}/generate_{dataset}_metadata.tsv" + ) + ] # run the queries needed to generate the metadata tables for query_name in query_list: - ds_commands.append(BigQueryInsertJobOperator( - task_id=query_name, - configuration={ - "query": { - "query": "{% include '" + f"{sql_dir}/{query_name}.sql" + "' %}", - "useLegacySql": False, - "destinationTable": { - "projectId": project_id, - "datasetId": staging_dataset, - "tableId": query_name - }, - "allowLargeResults": True, - "createDisposition": "CREATE_IF_NEEDED", - "writeDisposition": "WRITE_TRUNCATE" - } - }, - )) + ds_commands.append( + BigQueryInsertJobOperator( + task_id=query_name, + configuration={ + "query": { + "query": "{% include '" + + f"{sql_dir}/{query_name}.sql" + + "' %}", + "useLegacySql": False, + "destinationTable": { + "projectId": project_id, + "datasetId": staging_dataset, + "tableId": query_name, + }, + "allowLargeResults": True, + "createDisposition": "CREATE_IF_NEEDED", + "writeDisposition": "WRITE_TRUNCATE", + } + }, + ) + ) start = ds_commands[0] curr = ds_commands[0] for c in ds_commands[1:]: @@ -93,20 +124,22 @@ "destinationTable": { "projectId": project_id, "datasetId": staging_dataset, - "tableId": "union_ids" + "tableId": "union_ids", }, "allowLargeResults": True, "createDisposition": "CREATE_IF_NEEDED", - "writeDisposition": "WRITE_TRUNCATE" + "writeDisposition": "WRITE_TRUNCATE", } }, ) check_unique_input_ids = BigQueryCheckOperator( - task_id="check_unique_input_ids", - sql=(f"select count(distinct(id)) = count(id) from {staging_dataset}.union_ids"), - use_legacy_sql=False - ) + task_id="check_unique_input_ids", + sql=( + f"select count(distinct(id)) = count(id) from {staging_dataset}.union_ids" + ), + use_legacy_sql=False, + ) # We now take the union of all the metadata and export it to GCS for normalization via Dataflow. 
We then run # the Dataflow job, and import the outputs back into BQ @@ -119,11 +152,11 @@ "destinationTable": { "projectId": project_id, "datasetId": staging_dataset, - "tableId": "union_metadata" + "tableId": "union_metadata", }, "allowLargeResults": True, "createDisposition": "CREATE_IF_NEEDED", - "writeDisposition": "WRITE_TRUNCATE" + "writeDisposition": "WRITE_TRUNCATE", } }, ) @@ -132,7 +165,7 @@ task_id="export_metadata", source_project_dataset_table=f"{staging_dataset}.union_metadata", destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/union_meta/union*.jsonl", - export_format="NEWLINE_DELIMITED_JSON" + export_format="NEWLINE_DELIMITED_JSON", ) dataflow_options = { @@ -143,7 +176,7 @@ "region": "us-east1", "temp_location": f"gs://{bucket}/{tmp_dir}/clean_dataflow", "save_main_session": True, - "requirements_file": f"{DAGS_DIR}/requirements/article_linkage_text_clean_requirements.txt" + "requirements_file": f"{DAGS_DIR}/requirements/article_linkage_text_clean_requirements.txt", } clean_corpus = DataflowCreatePythonJobOperator( py_file=f"{DAGS_DIR}/linkage_scripts/clean_corpus.py", @@ -154,7 +187,7 @@ "input_dir": f"gs://{bucket}/{tmp_dir}/union_meta/union*", "output_dir": f"gs://{bucket}/{tmp_dir}/cleaned_meta/clean", "fields_to_clean": "title,abstract,last_names", - "region": "us-east1" + "region": "us-east1", }, ) @@ -166,14 +199,16 @@ destination_project_dataset_table=f"{staging_dataset}.all_metadata_norm", source_format="NEWLINE_DELIMITED_JSON", create_disposition="CREATE_IF_NEEDED", - write_disposition="WRITE_TRUNCATE" + write_disposition="WRITE_TRUNCATE", ) filter_norm_metadata = BigQueryInsertJobOperator( task_id="filter_norm_metadata", configuration={ "query": { - "query": "{% include '" + f"{sql_dir}/all_metadata_norm_filt.sql" + "' %}", + "query": "{% include '" + + f"{sql_dir}/all_metadata_norm_filt.sql" + + "' %}", "useLegacySql": False, "destinationTable": { "projectId": project_id, @@ -194,7 +229,7 @@ combine_queries = [] combine_tables = [] for strong in strong_indicators: - for other in strong_indicators+weak_indicators: + for other in strong_indicators + weak_indicators: if strong == other: continue table_name = f"{strong}_{other}" @@ -203,36 +238,49 @@ if other != "year": additional_checks += f' and (a.{other} != "")' if "references" in [strong, other]: - additional_checks += f' and array_length(split(a.references, ",")) > 2' - combine_queries.append(BigQueryInsertJobOperator( - task_id=table_name, - configuration={ - "query": { - "query": "{% include '" + f"{sql_dir}/match_template.sql" + "' %}", - "useLegacySql": False, - "destinationTable": { - "projectId": project_id, - "datasetId": staging_dataset, - "tableId": table_name - }, - "allowLargeResults": True, - "createDisposition": "CREATE_IF_NEEDED", - "writeDisposition": "WRITE_TRUNCATE" - } - }, - params={ - "strong": strong, - "other": other, - "additional_checks": additional_checks - } - )) + additional_checks += ' and array_length(split(a.references, ",")) > 2' + combine_queries.append( + BigQueryInsertJobOperator( + task_id=table_name, + configuration={ + "query": { + "query": "{% include '" + + f"{sql_dir}/match_template.sql" + + "' %}", + "useLegacySql": False, + "destinationTable": { + "projectId": project_id, + "datasetId": staging_dataset, + "tableId": table_name, + }, + "allowLargeResults": True, + "createDisposition": "CREATE_IF_NEEDED", + "writeDisposition": "WRITE_TRUNCATE", + } + }, + params={ + "strong": strong, + "other": other, + "additional_checks": additional_checks, + }, + ) + ) 
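    # Each (strong, other) pair above renders sql/match_template.sql into a staging
    # table named f"{strong}_{other}". The rendered query receives the pair plus any
    # extra non-null / reference-length checks through params ({"strong": ...,
    # "other": ..., "additional_checks": ...}), the same parameter shape stubbed in
    # .sqlfluff's jinja templater context so the template can be linted standalone.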
wait_for_combine = DummyOperator(task_id="wait_for_combine") - merge_combine_query_list = [t.strip() for t in open(f"{DAGS_DIR}/sequences/" - f"{gcs_folder}/merge_combined_metadata.tsv")] + merge_combine_query_list = [ + t.strip() + for t in open( + f"{DAGS_DIR}/sequences/" f"{gcs_folder}/merge_combined_metadata.tsv" + ) + ] last_combination_query = wait_for_combine - meta_match_queries = "\nunion all\n".join([f"select all1_id, all2_id from {staging_dataset}.{table}\nunion all\nselect all2_id as all1_id, all1_id as all2_id from {staging_dataset}.{table}" for table in combine_tables]) + meta_match_queries = "\nunion all\n".join( + [ + f"select all1_id, all2_id from {staging_dataset}.{table}\nunion all\nselect all2_id as all1_id, all1_id as all2_id from {staging_dataset}.{table}" + for table in combine_tables + ] + ) for query_name in merge_combine_query_list: next = BigQueryInsertJobOperator( task_id=query_name, @@ -243,14 +291,14 @@ "destinationTable": { "projectId": project_id, "datasetId": staging_dataset, - "tableId": query_name + "tableId": query_name, }, "allowLargeResults": True, "createDisposition": "CREATE_IF_NEEDED", - "writeDisposition": "WRITE_TRUNCATE" + "writeDisposition": "WRITE_TRUNCATE", } }, - params={"tables": meta_match_queries} + params={"tables": meta_match_queries}, ) last_combination_query >> next last_combination_query = next @@ -261,26 +309,38 @@ task_id="export_old_cset_ids", source_project_dataset_table=f"{production_dataset}.sources", destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/prev_id_mapping/prev_id_mapping*.jsonl", - export_format="NEWLINE_DELIMITED_JSON" + export_format="NEWLINE_DELIMITED_JSON", ), BigQueryToGCSOperator( task_id="export_article_pairs", source_project_dataset_table=f"{staging_dataset}.all_match_pairs_with_um", destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/article_pairs/article_pairs*.jsonl", - export_format="NEWLINE_DELIMITED_JSON" + export_format="NEWLINE_DELIMITED_JSON", ), BigQueryToGCSOperator( task_id="export_simhash_input", source_project_dataset_table=f"{staging_dataset}.simhash_input", destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/simhash_input/simhash_input*.jsonl", - export_format="NEWLINE_DELIMITED_JSON" + export_format="NEWLINE_DELIMITED_JSON", ), BigQueryToGCSOperator( task_id="export_lid_input", source_project_dataset_table=f"{staging_dataset}.lid_input", destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/lid_input/lid_input*.jsonl", - export_format="NEWLINE_DELIMITED_JSON" - ) + export_format="NEWLINE_DELIMITED_JSON", + ), + BigQueryToGCSOperator( + task_id="export_unlink", + source_project_dataset_table=f"{staging_dataset}.unlink", + destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/unlink/data*.jsonl", + export_format="NEWLINE_DELIMITED_JSON", + ), + BigQueryToGCSOperator( + task_id="export_ids_to_drop", + source_project_dataset_table=f"{staging_dataset}.ids_to_drop", + destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/ids_to_drop/data*.jsonl", + export_format="NEWLINE_DELIMITED_JSON", + ), ] # Start up godzilla of article linkage, update simhash indexes of title+abstract, run simhash, then create the @@ -289,10 +349,11 @@ project_id=project_id, zone=gce_zone, resource_id=gce_resource_id, - task_id="start-"+gce_resource_id + task_id="start-" + gce_resource_id, ) - vm_script_sequence = [ + prep_environment_script_sequence = [ + f"/snap/bin/gsutil cp gs://{bucket}/{gcs_folder}/vm_scripts/*.sh .", "cd /mnt/disks/data", "rm -rf run", "mkdir run", @@ -306,23 +367,40 @@ 
f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/simhash_input .", f"/snap/bin/gsutil -m cp -r gs://{bucket}/{gcs_folder}/simhash_indexes .", f"/snap/bin/gsutil -m cp -r gs://{bucket}/{gcs_folder}/simhash_results .", + f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/unlink .", + f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/ids_to_drop .", f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/prev_id_mapping .", "mkdir new_simhash_indexes", - ("python3 run_simhash.py simhash_input simhash_results --simhash_indexes simhash_indexes " - "--new_simhash_indexes new_simhash_indexes"), - "cp -r article_pairs usable_ids", - "cp simhash_results/* article_pairs/", - ("python3 create_merge_ids.py --match_dir usable_ids --prev_id_mapping_dir prev_id_mapping " - "--merge_file id_mapping.jsonl --current_ids_dir article_pairs"), - f"/snap/bin/gsutil -m cp id_mapping.jsonl gs://{bucket}/{gcs_folder}/tmp/", - f"/snap/bin/gsutil -m cp simhash_results/* gs://{bucket}/{gcs_folder}/simhash_results/", - f"/snap/bin/gsutil -m cp new_simhash_indexes/* gs://{bucket}/{gcs_folder}/simhash_indexes/" ] - vm_script = " && ".join(vm_script_sequence) + prep_environment_vm_script = " && ".join(prep_environment_script_sequence) + + prep_environment = BashOperator( + task_id="prep_environment", + bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "{prep_environment_vm_script}"', + ) + + update_simhash_index = BashOperator( + task_id="update_simhash_index", + bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "bash run_simhash_scripts.sh &> log &"', + ) + + wait_for_simhash_index = GCSObjectExistenceSensor( + task_id="wait_for_simhash_index", + bucket=DATA_BUCKET, + object=f"{tmp_dir}/done_files/simhash_is_done", + deferrable=True, + ) create_cset_ids = BashOperator( task_id="create_cset_ids", - bash_command=f"gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command \"{vm_script}\"" + bash_command=f'gcloud compute ssh jm3312@{gce_resource_id} --zone {gce_zone} --command "bash run_ids_scripts.sh &> log &"', + ) + + wait_for_cset_ids = GCSObjectExistenceSensor( + task_id="wait_for_cset_ids", + bucket=DATA_BUCKET, + object=f"{tmp_dir}/done_files/ids_are_done", + deferrable=True, ) # while the carticle ids are updating, run lid on the titles and abstracts @@ -334,7 +412,7 @@ "region": "us-east1", "temp_location": f"gs://{bucket}/{tmp_dir}/run_lid", "save_main_session": True, - "requirements_file": f"{DAGS_DIR}/requirements/article_linkage_lid_dataflow_requirements.txt" + "requirements_file": f"{DAGS_DIR}/requirements/article_linkage_lid_dataflow_requirements.txt", } run_lid = DataflowCreatePythonJobOperator( py_file=f"{DAGS_DIR}/linkage_scripts/run_lid.py", @@ -345,7 +423,7 @@ "input_dir": f"gs://{bucket}/{tmp_dir}/lid_input/lid_input*", "output_dir": f"gs://{bucket}/{tmp_dir}/lid_output/lid", "fields_to_lid": "title,abstract", - "region": "us-east1" + "region": "us-east1", }, ) @@ -355,7 +433,7 @@ project_id=project_id, zone=gce_zone, resource_id=gce_resource_id, - task_id="stop-"+gce_resource_id + task_id="stop-" + gce_resource_id, ) import_id_mapping = GCSToBigQueryOperator( @@ -366,7 +444,7 @@ destination_project_dataset_table=f"{staging_dataset}.id_mapping", source_format="NEWLINE_DELIMITED_JSON", create_disposition="CREATE_IF_NEEDED", - write_disposition="WRITE_TRUNCATE" + write_disposition="WRITE_TRUNCATE", ) import_lid = GCSToBigQueryOperator( @@ -377,14 +455,18 @@ 
destination_project_dataset_table=f"{staging_dataset}.all_metadata_with_cld2_lid", source_format="NEWLINE_DELIMITED_JSON", create_disposition="CREATE_IF_NEEDED", - write_disposition="WRITE_TRUNCATE" + write_disposition="WRITE_TRUNCATE", ) # generate the rest of the tables that will be copied to the production dataset start_final_transform_queries = DummyOperator(task_id="start_final_transform") - final_transform_queries = [t.strip() for t in open(f"{DAGS_DIR}/sequences/" - f"{gcs_folder}/generate_merged_metadata.tsv")] + final_transform_queries = [ + t.strip() + for t in open( + f"{DAGS_DIR}/sequences/" f"{gcs_folder}/generate_merged_metadata.tsv" + ) + ] last_transform_query = start_final_transform_queries for query_name in final_transform_queries: next = BigQueryInsertJobOperator( @@ -396,11 +478,11 @@ "destinationTable": { "projectId": project_id, "datasetId": staging_dataset, - "tableId": query_name + "tableId": query_name, }, "allowLargeResults": True, "createDisposition": "CREATE_IF_NEEDED", - "writeDisposition": "WRITE_TRUNCATE" + "writeDisposition": "WRITE_TRUNCATE", } }, ) @@ -414,30 +496,43 @@ staging_tables = ["sources", "references", all_metadata_table] production_tables = ["sources", "references"] for table_name in staging_tables: - compare_table_name = table_name if table_name != all_metadata_table else all_metadata_table+"_last_run" - compare_dataset = production_dataset if table_name != all_metadata_table else staging_dataset - check_queries.append(BigQueryCheckOperator( - task_id="check_monotonic_increase_"+table_name.lower(), - sql=(f"select (select count(0) from {staging_dataset}.{table_name}) >= " - f"(select 0.8*count(0) from {compare_dataset}.{compare_table_name})"), - use_legacy_sql=False - )) - - check_queries.extend([ - BigQueryCheckOperator( - task_id="check_pks_are_unique_sources", - sql=f"select count(orig_id) = count(distinct(orig_id)) from {staging_dataset}.sources", - use_legacy_sql=False - ), - BigQueryCheckOperator( - task_id="all_ids_survived", - sql=(f"select count(0) = 0 from (select id from {staging_dataset}.union_ids " - f"where id not in (select orig_id from {staging_dataset}.sources))"), - use_legacy_sql=False - ), - BigQueryCheckOperator( - task_id="all_trivial_matches_survived", - sql=f""" + compare_table_name = ( + table_name + if table_name != all_metadata_table + else all_metadata_table + "_last_run" + ) + compare_dataset = ( + production_dataset if table_name != all_metadata_table else staging_dataset + ) + check_queries.append( + BigQueryCheckOperator( + task_id="check_monotonic_increase_" + table_name.lower(), + sql=( + f"select (select count(0) from {staging_dataset}.{table_name}) >= " + f"(select 0.8*count(0) from {compare_dataset}.{compare_table_name})" + ), + use_legacy_sql=False, + ) + ) + + check_queries.extend( + [ + BigQueryCheckOperator( + task_id="check_pks_are_unique_sources", + sql=f"select count(orig_id) = count(distinct(orig_id)) from {staging_dataset}.sources", + use_legacy_sql=False, + ), + BigQueryCheckOperator( + task_id="all_ids_survived", + sql=( + f"select count(0) = 0 from (select id from {staging_dataset}.union_ids " + f"where id not in (select orig_id from {staging_dataset}.sources))" + ), + use_legacy_sql=False, + ), + BigQueryCheckOperator( + task_id="all_trivial_matches_survived", + sql=f""" select count(concat(all1_id, " ", all2_id)) = 0 from @@ -445,65 +540,74 @@ where concat(all1_id, " ", all2_id) not in ( select concat(links1.orig_id, " ", links2.orig_id) - from + from {staging_dataset}.sources links1 left join 
{staging_dataset}.sources links2 on links1.merged_id = links2.merged_id ) """, - use_legacy_sql=False - ), - BigQueryCheckOperator( - task_id="no_null_references", - sql=f"select count(0) = 0 from {staging_dataset}.references where merged_id is null or ref_id is null", - use_legacy_sql = False - ), - BigQueryCheckOperator( - task_id="no_null_datasets", - sql=f"select count(0) = 0 from {staging_dataset}.sources where dataset is null", - use_legacy_sql=False - ), - ]) + use_legacy_sql=False, + ), + BigQueryCheckOperator( + task_id="no_null_references", + sql=f"select count(0) = 0 from {staging_dataset}.references where merged_id is null or ref_id is null", + use_legacy_sql=False, + ), + BigQueryCheckOperator( + task_id="no_null_datasets", + sql=f"select count(0) = 0 from {staging_dataset}.sources where dataset is null", + use_legacy_sql=False, + ), + ] + ) # We're done! Checks passed, so copy to production and post success to slack start_production_cp = DummyOperator(task_id="start_production_cp") success_alert = get_post_success("Article linkage update succeeded!", dag) curr_date = datetime.now().strftime("%Y%m%d") - with open(f"{os.environ.get('DAGS_FOLDER')}/schemas/{gcs_folder}/table_descriptions.json") as f: + with open( + f"{os.environ.get('DAGS_FOLDER')}/schemas/{gcs_folder}/table_descriptions.json" + ) as f: table_desc = json.loads(f.read()) trigger_org_er_and_metadata_merge = TriggerDagRunOperator( task_id="trigger_org_er_and_metadata_merge", - trigger_dag_id="org_er_and_metadata_merge" + trigger_dag_id="org_er_and_metadata_merge", ) for table in production_tables: push_to_production = BigQueryToBigQueryOperator( - task_id="copy_"+table.lower(), + task_id="copy_" + table.lower(), source_project_dataset_tables=[f"{staging_dataset}.{table}"], destination_project_dataset_table=f"{production_dataset}.{table}", create_disposition="CREATE_IF_NEEDED", - write_disposition="WRITE_TRUNCATE" + write_disposition="WRITE_TRUNCATE", ) snapshot = BigQueryToBigQueryOperator( task_id=f"snapshot_{table}", source_project_dataset_tables=[f"{production_dataset}.{table}"], destination_project_dataset_table=f"{backup_dataset}.{table}_{curr_date}", create_disposition="CREATE_IF_NEEDED", - write_disposition="WRITE_TRUNCATE" + write_disposition="WRITE_TRUNCATE", ) pop_descriptions = PythonOperator( task_id="populate_column_documentation_for_" + table, op_kwargs={ "input_schema": f"{os.environ.get('DAGS_FOLDER')}/schemas/{gcs_folder}/{table}.json", "table_name": f"{production_dataset}.{table}", - "table_description": table_desc[table] + "table_description": table_desc[table], }, - python_callable=update_table_descriptions + python_callable=update_table_descriptions, + ) + ( + start_production_cp + >> push_to_production + >> snapshot + >> pop_descriptions + >> success_alert + >> trigger_org_er_and_metadata_merge ) - (start_production_cp >> push_to_production >> snapshot >> pop_descriptions >> success_alert >> - trigger_org_er_and_metadata_merge) # We don't show the "all metadata" table in the production dataset, but we do need to # be able to diff the current data from the data used in the last run in simhash_input @@ -512,7 +616,7 @@ source_project_dataset_tables=[f"{staging_dataset}.{all_metadata_table}"], destination_project_dataset_table=f"{staging_dataset}.{all_metadata_table}_last_run", create_disposition="CREATE_IF_NEEDED", - write_disposition="WRITE_TRUNCATE" + write_disposition="WRITE_TRUNCATE", ) snapshot_cld2 = BigQueryToBigQueryOperator( @@ -520,16 +624,43 @@ 
source_project_dataset_tables=[f"{staging_dataset}.{all_metadata_table}"], destination_project_dataset_table=f"{backup_dataset}.{all_metadata_table}_{curr_date}", create_disposition="CREATE_IF_NEEDED", - write_disposition="WRITE_TRUNCATE" + write_disposition="WRITE_TRUNCATE", ) start_production_cp >> copy_cld2 >> snapshot_cld2 >> success_alert # task structure clear_tmp_dir >> metadata_sequences_start - (metadata_sequences_end >> union_ids >> check_unique_input_ids >> union_metadata >> export_metadata >> - clean_corpus >> import_clean_metadata >> filter_norm_metadata >> combine_queries >> wait_for_combine) + ( + metadata_sequences_end + >> union_ids + >> check_unique_input_ids + >> union_metadata + >> export_metadata + >> clean_corpus + >> import_clean_metadata + >> filter_norm_metadata + >> combine_queries + >> wait_for_combine + ) - (last_combination_query >> heavy_compute_inputs >> gce_instance_start >> [create_cset_ids, run_lid] >> - gce_instance_stop >> [import_id_mapping, import_lid] >> start_final_transform_queries) + ( + last_combination_query + >> heavy_compute_inputs + >> gce_instance_start + >> prep_environment + >> update_simhash_index + >> wait_for_simhash_index + >> create_cset_ids + >> wait_for_cset_ids + >> gce_instance_stop + ) + + gce_instance_start >> run_lid >> gce_instance_stop + + ( + gce_instance_stop + >> [import_id_mapping, import_lid] + >> start_final_transform_queries + ) last_transform_query >> check_queries >> start_production_cp diff --git a/metadata_merge_trigger.py b/metadata_merge_trigger.py index 4eb2d52..feb12c2 100644 --- a/metadata_merge_trigger.py +++ b/metadata_merge_trigger.py @@ -1,51 +1,42 @@ +from datetime import datetime, timedelta + from airflow import DAG -from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow.operators.dummy import DummyOperator -from datetime import timedelta, datetime - -from dataloader.airflow_utils.slack import task_fail_slack_alert - - -default_args = { - "owner": "airflow", - "depends_on_past": False, - "start_date": datetime(2022, 3, 5), - "email": ["jennifer.melot@georgetown.edu"], - "email_on_failure": True, - "email_on_retry": True, - "retries": 0, - "retry_delay": timedelta(minutes=5), - "on_failure_callback": task_fail_slack_alert -} +from airflow.operators.trigger_dagrun import TriggerDagRunOperator +from dataloader.airflow_utils.defaults import get_default_args -with DAG("org_er_and_metadata_merge", - default_args=default_args, - description="Triggers Org ER and metadata merge dags", - schedule_interval=None, - catchup=False - ) as dag: +with DAG( + "org_er_and_metadata_merge", + default_args=get_default_args(pocs=["Jennifer"]), + description="Triggers Org ER and metadata merge dags", + schedule_interval=None, + catchup=False, +) as dag: trigger_orgfixes1 = TriggerDagRunOperator( task_id="trigger_orgfixes1", trigger_dag_id="org_fixes", - wait_for_completion=True + wait_for_completion=True, ) trigger_bulk_org_er_updater = TriggerDagRunOperator( task_id="trigger_bulk_org_er_updater", trigger_dag_id="bulk_org_er_updater", - wait_for_completion=True + wait_for_completion=True, ) trigger_orgfixes2 = TriggerDagRunOperator( task_id="trigger_orgfixes2", trigger_dag_id="org_fixes", - wait_for_completion=True + wait_for_completion=True, ) trigger_merge = TriggerDagRunOperator( task_id="trigger_merged_article_metadata_updater", trigger_dag_id="merged_article_metadata_updater", - wait_for_completion=True + wait_for_completion=True, ) - trigger_orgfixes1 >> trigger_bulk_org_er_updater >> 
trigger_orgfixes2 >> trigger_merge - - + ( + trigger_orgfixes1 + >> trigger_bulk_org_er_updater + >> trigger_orgfixes2 + >> trigger_merge + ) diff --git a/methods_documentation/overview.md b/methods_documentation/overview.md index 61a4385..fa82f34 100644 --- a/methods_documentation/overview.md +++ b/methods_documentation/overview.md @@ -1,4 +1,4 @@ -## Normalizing Article Metadata +## Normalizing Article Metadata We are merging five datasets, all of which are structured differently in our internal database. To match article metadata, we first need to extract the columns from this data that we want to use @@ -11,12 +11,12 @@ titles, abstracts, and pubyear so that a match on any of these combinations will the shared WOS id. Finally, for Semantic Scholar, we exclude any documents that have a non-null publication type that is one of Dataset, Editorial, LettersAndComments, News, or Review. -Having generated the metadata tables, we now need to normalize the metadata. To do this, we use +Having generated the metadata tables, we now need to normalize the metadata. To do this, we use the [clean_corpus](../utils/clean_corpus.py) script, which applies several text normalizations to the data. We "normalize" away even whitespace and punctuation. Having normalized our data, we now need to do within and cross-dataset matches, creating one master table -containing all pairs of matched articles. To do this, we use the series of queries in +containing all pairs of matched articles. To do this, we use the series of queries in `sequences/combine_metadata.tsv`. For two articles A and B to be considered a match, we require that they have a non-null match on at least one of: @@ -35,28 +35,28 @@ We then add back in any articles that didn't match anything else, and combine th will be passed to LID and to the simhash and article id assignment code. For LID, we run the CLD2 library on all titles and abstracts using the beam script in `utils/run_lid.py`, taking -the first language in the output. Note that this can result in the same article being assigned multiple +the first language in the output. Note that this can result in the same article being assigned multiple languages, since some articles have multiple versions of metadata. #### Merged article ID assignment To merge articles, we first need to apply one more matching method, which is based on simhash. On each update -of the data, we update a set of saved simhash indexes (one for each year of the data) that cover all articles -we have seen on previous runs of the code. We update these indexes with new articles, and then find similar -articles within the updated indexes. +of the data, we update a set of saved simhash indexes (one for each year of the data) that cover all articles +we have seen on previous runs of the code. We update these indexes with new articles, and then find similar +articles within the updated indexes. Next, we add all the simhash matches as match pairs, and run `utils/create_merge_ids.py`. This script identifies all groups of articles that have been either directly or transitively matched together. We then assign this set -of articles (the "match set") a "carticle" ID. If a match set has exactly one old carticle id previously assigned +of articles (the "match set") a "carticle" ID. If a match set has exactly one old carticle id previously assigned to any of the articles, it keeps that carticle id even if new articles (with no existing carticle id) are added to the match set. Otherwise, the match set gets a new carticle id. 
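As a rough sketch of that grouping and id-retention rule (illustrative only: the real logic lives in `utils/create_merge_ids.py`, and the function name, input shapes, and id format below are assumptions), a union-find over the match pairs yields the match sets, after which each set either reuses its single prior carticle id or gets a fresh one:

```python
import itertools
from collections import defaultdict


def assign_carticle_ids(match_pairs, prev_mapping, counter=None):
    """Group directly or transitively matched articles and assign carticle ids.

    match_pairs: iterable of (orig_id, orig_id) matches (self-pairs included for
    articles that matched nothing else). prev_mapping: orig_id -> old carticle id.
    """
    counter = counter or itertools.count(1)
    parent = {}

    def find(x):
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path compression
            x = parent[x]
        return x

    def union(a, b):
        parent[find(a)] = find(b)

    for a, b in match_pairs:
        union(a, b)

    # Collect the match sets (connected components of the match graph).
    match_sets = defaultdict(set)
    for orig_id in parent:
        match_sets[find(orig_id)].add(orig_id)

    merged = {}
    for members in match_sets.values():
        old_ids = {prev_mapping[m] for m in members if m in prev_mapping}
        if len(old_ids) == 1:
            # Exactly one previously assigned id: keep it, even if new articles joined.
            merged_id = old_ids.pop()
        else:
            # No prior id, or the set spans several old ids: mint a new one
            # (id format here is invented for illustration).
            merged_id = f"carticle_{next(counter):010d}"
        for m in members:
            merged[m] = merged_id
    return merged


# e.g. assign_carticle_ids([("W1", "A1"), ("A1", "S1")], {"W1": "carticle_0000000042"})
# maps all three orig ids to carticle_0000000042
```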
-Having obtained the carticle ids, we upload them back to BigQuery, and generate the final output tables, +Having obtained the carticle ids, we upload them back to BigQuery, and generate the final output tables, described in the README. #### Running LID -In parallel with creating the matches, we run the CLD2 library on all titles and abstracts using the beam -script in `utils/run_lid.py`. We take the first language in the output as the language of the whole text. -Note that this can result in the same merged carticle being assigned multiple languages, since some articles +In parallel with creating the matches, we run the CLD2 library on all titles and abstracts using the beam +script in `utils/run_lid.py`. We take the first language in the output as the language of the whole text. +Note that this can result in the same merged carticle being assigned multiple languages, since some articles have multiple versions of metadata. diff --git a/push_to_airflow.sh b/push_to_airflow.sh index 6cbb788..d3bf025 100755 --- a/push_to_airflow.sh +++ b/push_to_airflow.sh @@ -1,18 +1,15 @@ -gsutil cp linkage_dag.py gs://us-east1-production2023-cc1-01d75926-bucket/dags/ -gsutil cp scholarly_lit_trigger.py gs://us-east1-production2023-cc1-01d75926-bucket/dags/ -gsutil cp metadata_merge_trigger.py gs://us-east1-production2023-cc1-01d75926-bucket/dags/ -gsutil rm -r gs://us-east1-production2023-cc1-01d75926-bucket/dags/sql/article_linkage/* -gsutil -m cp sql/* gs://us-east1-production2023-cc1-01d75926-bucket/dags/sql/article_linkage/ -gsutil rm -r gs://us-east1-production2023-cc1-01d75926-bucket/dags/sequences/article_linkage/* -gsutil -m cp sequences/* gs://us-east1-production2023-cc1-01d75926-bucket/dags/sequences/article_linkage/ +gsutil cp linkage_dag.py gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/ +gsutil cp metadata_merge_trigger.py gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/ +gsutil rm -r gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/sql/article_linkage/* +gsutil -m cp sql/* gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/sql/article_linkage/ +gsutil rm -r gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/sequences/article_linkage/* +gsutil -m cp sequences/* gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/sequences/article_linkage/ gsutil rm -r gs://airflow-data-exchange/article_linkage/schemas/* gsutil cp schemas/* gs://airflow-data-exchange/article_linkage/schemas/ -gsutil rm -r gs://us-east1-production2023-cc1-01d75926-bucket/dags/schemas/article_linkage/* -gsutil -m cp schemas/* gs://us-east1-production2023-cc1-01d75926-bucket/dags/schemas/article_linkage/ -gsutil cp utils/create_merge_ids.py gs://airflow-data-exchange/article_linkage/vm_scripts/ -gsutil cp utils/run_simhash.py gs://airflow-data-exchange/article_linkage/vm_scripts/ -gsutil cp utils/my_simhash.py gs://airflow-data-exchange/article_linkage/vm_scripts/ -gsutil cp utils/article_linkage_lid_dataflow_requirements.txt gs://us-east1-production2023-cc1-01d75926-bucket/dags/requirements/article_linkage_lid_dataflow_requirements.txt -gsutil cp utils/article_linkage_text_clean_requirements.txt gs://us-east1-production2023-cc1-01d75926-bucket/dags/requirements/article_linkage_text_clean_requirements.txt -gsutil cp utils/clean_corpus.py gs://us-east1-production2023-cc1-01d75926-bucket/dags/linkage_scripts/ -gsutil cp utils/run_lid.py gs://us-east1-production2023-cc1-01d75926-bucket/dags/linkage_scripts/ +gsutil rm -r gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/schemas/article_linkage/* +gsutil 
-m cp schemas/* gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/schemas/article_linkage/ +gsutil cp utils/* gs://airflow-data-exchange/article_linkage/vm_scripts/ +gsutil cp utils/article_linkage_lid_dataflow_requirements.txt gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/requirements/article_linkage_lid_dataflow_requirements.txt +gsutil cp utils/article_linkage_text_clean_requirements.txt gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/requirements/article_linkage_text_clean_requirements.txt +gsutil cp utils/clean_corpus.py gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/linkage_scripts/ +gsutil cp utils/run_lid.py gs://us-east1-production-cc2-202-b42a7a54-bucket/dags/linkage_scripts/ diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..43139b7 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,14 @@ +[tool.black] +py36 = true +include = '\.pyi?$' +exclude = ''' +/( +\.git +| \.venv +| build +| dist +)/ +''' + +[tool.isort] +profile = "black" diff --git a/requirements.txt b/requirements.txt index d3c5899..5d84905 100644 --- a/requirements.txt +++ b/requirements.txt @@ -45,3 +45,5 @@ typing==3.7.4.1 typing-extensions==3.7.4.1 wcwidth==0.1.8 zipp==3.0.0 +pre-commit +coverage diff --git a/schemas/all_metadata_norm.json b/schemas/all_metadata_norm.json index b0d35ff..bbc4371 100644 --- a/schemas/all_metadata_norm.json +++ b/schemas/all_metadata_norm.json @@ -1,52 +1,52 @@ [ { - "mode": "NULLABLE", - "name": "id", + "mode": "NULLABLE", + "name": "id", "type": "STRING" - }, + }, { - "mode": "NULLABLE", - "name": "title", + "mode": "NULLABLE", + "name": "title", "type": "STRING" - }, + }, { - "mode": "NULLABLE", - "name": "abstract", + "mode": "NULLABLE", + "name": "abstract", "type": "STRING" - }, + }, { - "mode": "NULLABLE", - "name": "clean_doi", + "mode": "NULLABLE", + "name": "clean_doi", "type": "STRING" - }, + }, { - "mode": "NULLABLE", - "name": "year", + "mode": "NULLABLE", + "name": "year", "type": "INTEGER" - }, + }, { - "mode": "REPEATED", - "name": "last_names", + "mode": "REPEATED", + "name": "last_names", "type": "STRING" - }, + }, { - "mode": "NULLABLE", - "name": "references", + "mode": "NULLABLE", + "name": "references", "type": "STRING" - }, + }, { - "mode": "NULLABLE", - "name": "dataset", + "mode": "NULLABLE", + "name": "dataset", "type": "STRING" }, { - "mode": "NULLABLE", - "name": "title_norm", + "mode": "NULLABLE", + "name": "title_norm", "type": "STRING" - }, + }, { - "mode": "NULLABLE", - "name": "abstract_norm", + "mode": "NULLABLE", + "name": "abstract_norm", "type": "STRING" }, { diff --git a/schemas/all_metadata_with_cld2_lid.json b/schemas/all_metadata_with_cld2_lid.json index 364e650..9a7b333 100644 --- a/schemas/all_metadata_with_cld2_lid.json +++ b/schemas/all_metadata_with_cld2_lid.json @@ -1,97 +1,97 @@ [ { - "mode": "NULLABLE", - "name": "id", + "mode": "NULLABLE", + "name": "id", "type": "STRING", "description": "id of the article from its originating dataset" - }, + }, { - "mode": "NULLABLE", - "name": "title", + "mode": "NULLABLE", + "name": "title", "type": "STRING", "description": "Title of the article" - }, + }, { - "mode": "NULLABLE", - "name": "title_cld2_lid_success", + "mode": "NULLABLE", + "name": "title_cld2_lid_success", "type": "BOOLEAN", "description": "Whether pycld2 LID ran successfully for the title field" }, { - "mode": "NULLABLE", - "name": "title_cld2_lid_is_reliable", + "mode": "NULLABLE", + "name": "title_cld2_lid_is_reliable", "type": "BOOLEAN", "description": "pycld2's assessment of 
whether its LID output was reliable for the title field" }, { - "mode": "NULLABLE", - "name": "title_cld2_lid_first_result", + "mode": "NULLABLE", + "name": "title_cld2_lid_first_result", "type": "STRING", "description": "The language of the title field's first pycld2 LID result" }, { - "mode": "NULLABLE", - "name": "title_cld2_lid_first_result_short_code", + "mode": "NULLABLE", + "name": "title_cld2_lid_first_result_short_code", "type": "STRING", "description": "The short code of the title field's first pycld2 LID result" }, { - "mode": "NULLABLE", - "name": "abstract", + "mode": "NULLABLE", + "name": "abstract", "type": "STRING", "description": "Abstract of the paper" - }, + }, { - "mode": "NULLABLE", - "name": "abstract_cld2_lid_success", + "mode": "NULLABLE", + "name": "abstract_cld2_lid_success", "type": "BOOLEAN", "description": "Whether pycld2 LID ran successfully for the abstract field" }, { - "mode": "NULLABLE", - "name": "abstract_cld2_lid_is_reliable", + "mode": "NULLABLE", + "name": "abstract_cld2_lid_is_reliable", "type": "BOOLEAN", "description": "pycld2's assessment of whether its LID output was reliable for the abstract field" }, { - "mode": "NULLABLE", - "name": "abstract_cld2_lid_first_result", + "mode": "NULLABLE", + "name": "abstract_cld2_lid_first_result", "type": "STRING", "description": "The language of the abstract field's first pycld2 LID result" }, { - "mode": "NULLABLE", - "name": "abstract_cld2_lid_first_result_short_code", + "mode": "NULLABLE", + "name": "abstract_cld2_lid_first_result_short_code", "type": "STRING", "description": "The short code of the abstract field's first pycld2 LID result" }, { - "mode": "NULLABLE", - "name": "clean_doi", + "mode": "NULLABLE", + "name": "clean_doi", "type": "STRING", "description": "Normalized DOI" - }, + }, { - "mode": "NULLABLE", - "name": "year", + "mode": "NULLABLE", + "name": "year", "type": "INTEGER", "description": "Publication year" - }, + }, { - "mode": "REPEATED", - "name": "last_names", + "mode": "REPEATED", + "name": "last_names", "type": "STRING", "description": "Author last names" - }, + }, { - "mode": "NULLABLE", - "name": "references", + "mode": "NULLABLE", + "name": "references", "type": "STRING", "description": "Ids of papers referenced by this paper" - }, + }, { - "mode": "NULLABLE", - "name": "dataset", + "mode": "NULLABLE", + "name": "dataset", "type": "STRING", "description": "Dataset this paper comes from" } diff --git a/schemas/id_mapping.json b/schemas/id_mapping.json index f1e4bef..d2af472 100644 --- a/schemas/id_mapping.json +++ b/schemas/id_mapping.json @@ -11,4 +11,4 @@ "type": "STRING", "description": "Originating dataset's id for an article" } -] \ No newline at end of file +] diff --git a/schemas/sources.json b/schemas/sources.json index 7fb16d8..a2a09ff 100644 --- a/schemas/sources.json +++ b/schemas/sources.json @@ -17,4 +17,4 @@ "type": "STRING", "description": "Identifier for the dataset of article identified by orig_id" } -] \ No newline at end of file +] diff --git a/schemas/table_descriptions.json b/schemas/table_descriptions.json index 16b2d89..5b2ee2b 100644 --- a/schemas/table_descriptions.json +++ b/schemas/table_descriptions.json @@ -2,4 +2,4 @@ "all_metadata_with_cld2_lid": "All metadata for the articles used in linkage.", "sources": "Maps a merged id to each of its orig_ids and their dataset", "references": "Maps a paper's merged id to the merged ids of papers it references" -} \ No newline at end of file +} diff --git a/scholarly_lit_trigger.py b/scholarly_lit_trigger.py 
deleted file mode 100644 index 5f9a805..0000000 --- a/scholarly_lit_trigger.py +++ /dev/null @@ -1,42 +0,0 @@ -from airflow import DAG -from airflow.operators.trigger_dagrun import TriggerDagRunOperator -from airflow.operators.dummy import DummyOperator -from datetime import timedelta, datetime - -from dataloader.airflow_utils.slack import task_fail_slack_alert - - -default_args = { - "owner": "airflow", - "depends_on_past": False, - "start_date": datetime(2022, 3, 5), - "email": ["jennifer.melot@georgetown.edu"], - "email_on_failure": True, - "email_on_retry": True, - "retries": 0, - "retry_delay": timedelta(minutes=5), - "on_failure_callback": task_fail_slack_alert -} - -with DAG("scholarly_lit_trigger", - default_args=default_args, - description="Triggers series of scholarly literature dags", - schedule_interval="0 0 * * 6", - catchup=False - ) as dag: - - start = DummyOperator(task_id="start") - - trigger_linkage = TriggerDagRunOperator( - task_id="trigger_article_linkage_updater", - trigger_dag_id="article_linkage_updater", - wait_for_completion=True - ) - - for prerequisite_dag in ["clarivate_tables_updater", "semantic_scholar_updater"]: - trigger = TriggerDagRunOperator( - task_id="trigger_"+prerequisite_dag, - trigger_dag_id=prerequisite_dag, - wait_for_completion=True - ) - start >> trigger >> trigger_linkage diff --git a/sequences/generate_arxiv_metadata.tsv b/sequences/generate_arxiv_metadata.tsv index 3ef5cc8..dd939e7 100644 --- a/sequences/generate_arxiv_metadata.tsv +++ b/sequences/generate_arxiv_metadata.tsv @@ -1,3 +1,3 @@ arxiv_ids arxiv_authors -arxiv_metadata \ No newline at end of file +arxiv_metadata diff --git a/sequences/generate_lens_metadata.tsv b/sequences/generate_lens_metadata.tsv index c771c39..87adce3 100644 --- a/sequences/generate_lens_metadata.tsv +++ b/sequences/generate_lens_metadata.tsv @@ -1,2 +1,2 @@ lens_ids -lens_metadata \ No newline at end of file +lens_metadata diff --git a/sequences/generate_openalex_metadata.tsv b/sequences/generate_openalex_metadata.tsv index c5229ee..8b2b2b3 100644 --- a/sequences/generate_openalex_metadata.tsv +++ b/sequences/generate_openalex_metadata.tsv @@ -1,2 +1,2 @@ openalex_ids -openalex_metadata \ No newline at end of file +openalex_metadata diff --git a/sequences/generate_papers_with_code_metadata.tsv b/sequences/generate_papers_with_code_metadata.tsv index ef83a52..89a8822 100644 --- a/sequences/generate_papers_with_code_metadata.tsv +++ b/sequences/generate_papers_with_code_metadata.tsv @@ -1,2 +1,2 @@ papers_with_code_ids -papers_with_code_metadata \ No newline at end of file +papers_with_code_metadata diff --git a/sequences/generate_s2_metadata.tsv b/sequences/generate_s2_metadata.tsv index 1204c52..01c3bee 100644 --- a/sequences/generate_s2_metadata.tsv +++ b/sequences/generate_s2_metadata.tsv @@ -1,2 +1,2 @@ s2_ids -s2_metadata \ No newline at end of file +s2_metadata diff --git a/sequences/generate_wos_metadata.tsv b/sequences/generate_wos_metadata.tsv index 3eb533c..f0797b8 100644 --- a/sequences/generate_wos_metadata.tsv +++ b/sequences/generate_wos_metadata.tsv @@ -3,4 +3,4 @@ wos_ids wos_abstracts wos_pubyears wos_titles -wos_metadata \ No newline at end of file +wos_metadata diff --git a/sequences/merge_combined_metadata.tsv b/sequences/merge_combined_metadata.tsv index 76182e8..73b4071 100644 --- a/sequences/merge_combined_metadata.tsv +++ b/sequences/merge_combined_metadata.tsv @@ -2,4 +2,5 @@ arxiv_id_match metadata_match all_match_pairs_with_um simhash_input -lid_input \ No newline at end of file 
+lid_input +ids_to_drop diff --git a/setup.py b/setup.py index 9857d5b..bb15a2a 100644 --- a/setup.py +++ b/setup.py @@ -1,3 +1,3 @@ import setuptools -setuptools.setup(packages=setuptools.find_packages()) \ No newline at end of file +setuptools.setup(packages=setuptools.find_packages()) diff --git a/sql/all_match_pairs_with_um.sql b/sql/all_match_pairs_with_um.sql index 6362711..5751552 100644 --- a/sql/all_match_pairs_with_um.sql +++ b/sql/all_match_pairs_with_um.sql @@ -1,112 +1,113 @@ -- add "self matches" for the articles that didn't match anything (this can happen if the article has a lot of null -- fields) to the rest of the article match pairs WITH lens_matches AS ( - (SELECT - lens_id as id1, - CONCAT("https://openalex.org/", id.value) as id2 + (SELECT + lens_id AS id1, + CONCAT("https://openalex.org/", id.value) AS id2 FROM lens.scholarly CROSS JOIN - UNNEST(external_ids) as id + UNNEST(external_ids) AS id WHERE (id.type = "openalex") - AND lens_id in (select id from {{ staging_dataset }}.lens_ids) - ) - UNION ALL - ( + AND lens_id IN (SELECT id FROM {{ staging_dataset }}.lens_ids) + ) + UNION ALL + ( SELECT - CONCAT("https://openalex.org/", id.value) as id1, - lens_id as id2 + CONCAT("https://openalex.org/", id.value) AS id1, + lens_id AS id2 FROM lens.scholarly CROSS JOIN - UNNEST(external_ids) as id + UNNEST(external_ids) AS id WHERE (id.type = "openalex") - AND lens_id in (select id from {{ staging_dataset }}.lens_ids) - ) - UNION ALL - ( + AND lens_id IN (SELECT id FROM {{ staging_dataset }}.lens_ids) + ) + UNION ALL + ( SELECT - lens_id as id1, - alias_lens_id as id2 + lens_id AS id1, + alias_lens_id AS id2 FROM lens.scholarly - CROSS JOIN UNNEST(alias_lens_ids) as alias_lens_id - ) - UNION ALL - ( + CROSS JOIN UNNEST(alias_lens_ids) AS alias_lens_id + ) + UNION ALL + ( SELECT - alias_lens_id as id1, - lens_id as id2, + alias_lens_id AS id1, + lens_id AS id2 FROM lens.scholarly - CROSS JOIN UNNEST(alias_lens_ids) as alias_lens_id - ) - ), - raw_oa_arxiv_matches AS ( - SELECT - id as oa_id, - replace(regexp_replace(open_access.oa_url, r".*(/ftp/arxiv/papers/|/pdf/|/abs/)", ""), ".pdf", "") as arxiv_id - FROM - openalex.works - WHERE - (lower(open_access.oa_url) like "%/arxiv.org%") - ), - oa_arxiv_matches AS ( - SELECT - oa_id as id1, - arxiv_id as id2 - FROM - raw_oa_arxiv_matches - UNION ALL - SELECT - arxiv_id as id1, - oa_id as id2 - FROM - raw_oa_arxiv_matches - ), - pairs AS ( ( + CROSS JOIN UNNEST(alias_lens_ids) AS alias_lens_id + ) +), + +raw_oa_arxiv_matches AS ( + SELECT + id AS oa_id, + REPLACE(REGEXP_REPLACE(open_access.oa_url, r".*(/ftp/arxiv/papers/|/pdf/|/abs/)", ""), ".pdf", "") AS arxiv_id + FROM + openalex.works + WHERE + (LOWER(open_access.oa_url) LIKE "%/arxiv.org%") +), + +oa_arxiv_matches AS ( + SELECT + oa_id AS id1, + arxiv_id AS id2 + FROM + raw_oa_arxiv_matches + UNION ALL + SELECT + arxiv_id AS id1, + oa_id AS id2 + FROM + raw_oa_arxiv_matches +), + +pairs AS ( ( SELECT id AS id1, id AS id2 FROM - {{staging_dataset}}.all_metadata_norm_filt + {{ staging_dataset }}.all_metadata_norm_filt WHERE id NOT IN ( - SELECT - all1_id - FROM - {{staging_dataset}}.metadata_match) + SELECT all1_id + FROM + {{ staging_dataset }}.metadata_match) AND id NOT IN ( - SELECT - id1 - FROM - oa_arxiv_matches) + SELECT id1 + FROM + oa_arxiv_matches) AND id NOT IN ( - SELECT - id1 - FROM - lens_matches)) - UNION ALL ( - SELECT - all1_id AS id1, - all2_id AS id2 - FROM - {{staging_dataset}}.metadata_match) - UNION ALL + SELECT id1 + FROM + lens_matches)) + UNION ALL ( 
SELECT - id1, - id2 + all1_id AS id1, + all2_id AS id2 FROM - oa_arxiv_matches - UNION ALL - SELECT - id1, - id2 - FROM - lens_matches - ) -SELECT - DISTINCT id1, + {{ staging_dataset }}.metadata_match) + UNION ALL + SELECT + id1, + id2 + FROM + oa_arxiv_matches + UNION ALL + SELECT + id1, + id2 + FROM + lens_matches +) + +SELECT DISTINCT + id1, id2 FROM pairs diff --git a/sql/all_metadata_norm_filt.sql b/sql/all_metadata_norm_filt.sql index dde6e8a..35b7b6d 100644 --- a/sql/all_metadata_norm_filt.sql +++ b/sql/all_metadata_norm_filt.sql @@ -1,25 +1,39 @@ -with meaningfully_titled as ( - select title_norm from (select title_norm, count(distinct(id)) as num_ids from {{ staging_dataset }}.all_metadata_norm group by title_norm) where num_ids < 11 +WITH meaningfully_titled AS ( + SELECT title_norm + FROM (SELECT + title_norm, + count(distinct(id)) AS num_ids + FROM {{ staging_dataset }}.all_metadata_norm GROUP BY title_norm) WHERE num_ids < 11 ), -meaningfully_doied as ( - select clean_doi from (select clean_doi, count(distinct(id)) as num_ids from {{ staging_dataset }}.all_metadata_norm group by clean_doi) where num_ids < 11 +meaningfully_doied AS ( + SELECT clean_doi + FROM (SELECT + clean_doi, + count(distinct(id)) AS num_ids + FROM {{ staging_dataset }}.all_metadata_norm GROUP BY clean_doi) WHERE num_ids < 11 ), -meaningfully_abstracted as ( - select abstract_norm from (select abstract_norm, count(distinct(id)) as num_ids from {{ staging_dataset }}.all_metadata_norm group by abstract_norm) where num_ids < 11 +meaningfully_abstracted AS ( + SELECT abstract_norm + FROM (SELECT + abstract_norm, + count(distinct(id)) AS num_ids + FROM {{ staging_dataset }}.all_metadata_norm GROUP BY abstract_norm) WHERE num_ids < 11 ) -select +SELECT id, title, abstract, - case when clean_doi in (select clean_doi from meaningfully_doied) then clean_doi else null end as clean_doi, + CASE WHEN clean_doi IN (SELECT clean_doi FROM meaningfully_doied) THEN clean_doi END AS clean_doi, year, last_names, references, dataset, - case when title_norm in (select title_norm from meaningfully_titled) then title_norm else null end as title_norm, - case when abstract_norm in (select abstract_norm from meaningfully_abstracted) then abstract_norm else null end as abstract_norm, + CASE WHEN title_norm IN (SELECT title_norm FROM meaningfully_titled) THEN title_norm END AS title_norm, + CASE + WHEN abstract_norm IN (SELECT abstract_norm FROM meaningfully_abstracted) THEN abstract_norm + END AS abstract_norm, last_names_norm -from {{ staging_dataset }}.all_metadata_norm +FROM {{ staging_dataset }}.all_metadata_norm diff --git a/sql/arxiv_authors.sql b/sql/arxiv_authors.sql index 67c2dbd..689aabe 100644 --- a/sql/arxiv_authors.sql +++ b/sql/arxiv_authors.sql @@ -1,5 +1,5 @@ -- get arxiv author last names -select +SELECT id, - ARRAY(select keyname from UNNEST(authors.author)) as last_names -from gcp_cset_arxiv_metadata.arxiv_metadata_latest + ARRAY(SELECT keyname FROM UNNEST(authors.author)) AS last_names +FROM gcp_cset_arxiv_metadata.arxiv_metadata_latest diff --git a/sql/arxiv_id_match.sql b/sql/arxiv_id_match.sql index 838b836..0d7e43b 100644 --- a/sql/arxiv_id_match.sql +++ b/sql/arxiv_id_match.sql @@ -1,16 +1,22 @@ -- papers with code.papers_with_abstracts has an arxiv_id column that is often not null; use this to match -- on arxiv where possible -with arxiv_pwc_mapping as ( - select - arxiv.id as id1, - pwc.paper_url as id2 - from - gcp_cset_arxiv_metadata.arxiv_metadata_latest arxiv - inner join - 
papers_with_code.papers_with_abstracts pwc - on arxiv.id = pwc.arxiv_id +WITH arxiv_pwc_mapping AS ( + SELECT + arxiv_metadata_latest.id AS id1, + papers_with_abstracts.paper_url AS id2 + FROM + gcp_cset_arxiv_metadata.arxiv_metadata_latest + INNER JOIN + papers_with_code.papers_with_abstracts + ON arxiv_metadata_latest.id = papers_with_abstracts.arxiv_id ) -select id1 as all1_id, id2 as all2_id from arxiv_pwc_mapping -union all -select id2 as all1_id, id1 as all2_id from arxiv_pwc_mapping \ No newline at end of file +SELECT + id1 AS all1_id, + id2 AS all2_id +FROM arxiv_pwc_mapping +UNION ALL +SELECT + id2 AS all1_id, + id1 AS all2_id +FROM arxiv_pwc_mapping diff --git a/sql/arxiv_ids.sql b/sql/arxiv_ids.sql index 1f32f7b..a19f0bc 100644 --- a/sql/arxiv_ids.sql +++ b/sql/arxiv_ids.sql @@ -1,2 +1,2 @@ -- get arxiv article ids (used in validation) -select distinct(id) from gcp_cset_arxiv_metadata.arxiv_metadata_latest \ No newline at end of file +SELECT DISTINCT id FROM gcp_cset_arxiv_metadata.arxiv_metadata_latest diff --git a/sql/arxiv_metadata.sql b/sql/arxiv_metadata.sql index 2a84a3f..b27dff8 100644 --- a/sql/arxiv_metadata.sql +++ b/sql/arxiv_metadata.sql @@ -1,13 +1,13 @@ -- get arxiv metadata used for matching -select - p.id, - p.title, - p.abstract, - lower(p.doi) as clean_doi, - extract(year from p.created) as year, +SELECT + arxiv_metadata_latest.id, + arxiv_metadata_latest.title, + arxiv_metadata_latest.abstract, + lower(arxiv_metadata_latest.doi) AS clean_doi, + extract(YEAR FROM arxiv_metadata_latest.created) AS year, a.last_names, - null as references -- arxiv doesn't have references -from gcp_cset_arxiv_metadata.arxiv_metadata_latest p -left join -{{staging_dataset}}.arxiv_authors a -on a.id = p.id + NULL AS references --noqa: L029 +FROM gcp_cset_arxiv_metadata.arxiv_metadata_latest +LEFT JOIN + {{ staging_dataset }}.arxiv_authors AS a + ON a.id = arxiv_metadata_latest.id diff --git a/sql/author_references_doi.sql b/sql/author_references_doi.sql index 0e927f5..4da1bdb 100644 --- a/sql/author_references_doi.sql +++ b/sql/author_references_doi.sql @@ -1,8 +1,10 @@ -- find articles that match on normalized author last names, references, and doi -select a.id as all1_id, m.id as all2_id -from {{staging_dataset}}.all_metadata_norm_filt a -inner join -{{staging_dataset}}.all_metadata_norm_filt m -on ((a.last_names_norm = m.last_names_norm) and (m.last_names_norm is not null) and (a.last_names_norm != "") and - (a.references = m.references) and (a.references is not null) and (a.references != "") and - (a.clean_doi = m.clean_doi) and (a.clean_doi is not null) and (a.clean_doi != "")) +SELECT + a.id AS all1_id, + m.id AS all2_id +FROM {{ staging_dataset }}.all_metadata_norm_filt AS a +INNER JOIN + {{ staging_dataset }}.all_metadata_norm_filt AS m + ON ((a.last_names_norm = m.last_names_norm) AND (m.last_names_norm IS NOT NULL) AND (a.last_names_norm != "") + AND (a.references = m.references) AND (a.references IS NOT NULL) AND (a.references != "") + AND (a.clean_doi = m.clean_doi) AND (a.clean_doi IS NOT NULL) AND (a.clean_doi != "")) diff --git a/sql/ids_to_drop.sql b/sql/ids_to_drop.sql new file mode 100644 index 0000000..5362b89 --- /dev/null +++ b/sql/ids_to_drop.sql @@ -0,0 +1,5 @@ +SELECT DISTINCT merged_id +FROM + literature.sources +WHERE + orig_id IN (SELECT id1 FROM staging_literature.unlink) diff --git a/sql/lens_ids.sql b/sql/lens_ids.sql index 45e0065..1333e96 100644 --- a/sql/lens_ids.sql +++ b/sql/lens_ids.sql @@ -1,6 +1,5 @@ -- get lens article ids (used in validation) 
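The new sql/ids_to_drop.sql above collects every merged_id that a manually unlinked orig_id (recorded in staging_literature.unlink) currently maps to in literature.sources, so those merged ids can be retired rather than reused. A minimal sketch of how the exported result is read downstream, mirroring the ignore_ids loading added to create_match_keys later in this diff (the directory name is illustrative):

import json
import os

def load_ids_to_drop(ids_to_drop_dir: str) -> set:
    # Each exported file holds one {"merged_id": ...} object per line,
    # as in tests/static/test_create_match_keys/ids_to_drop/data.jsonl.
    retired = set()
    for fi in os.listdir(ids_to_drop_dir):
        with open(os.path.join(ids_to_drop_dir, fi)) as f:
            for line in f:
                retired.add(json.loads(line)["merged_id"])
    return retired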
-SELECT - DISTINCT(lens_id) as id +SELECT DISTINCT lens_id AS id FROM lens.scholarly -WHERE (publication_type IS NULL) OR (NOT (publication_type IN ("dataset", "editorial", "letter", "news", "review"))) +WHERE (publication_type IS NULL) OR (NOT(publication_type IN ("dataset", "editorial", "letter", "news", "review"))) diff --git a/sql/lens_metadata.sql b/sql/lens_metadata.sql index 13aa881..7a4f7d1 100644 --- a/sql/lens_metadata.sql +++ b/sql/lens_metadata.sql @@ -1,5 +1,5 @@ WITH - dois AS ( +dois AS ( -- in some cases, lens provides more than one doi per article SELECT lens_id, @@ -10,7 +10,8 @@ WITH UNNEST(external_ids) AS id WHERE id.type = "doi" ), - author_last_names AS ( + +author_last_names AS ( SELECT lens_id, ARRAY_AGG(author.last_name) AS last_names @@ -22,16 +23,18 @@ WITH author.last_name IS NOT NULL GROUP BY lens_id ), - out_citations AS ( + +out_citations AS ( SELECT scholarly.lens_id, - STRING_AGG(reference.lens_id) AS references + STRING_AGG(reference.lens_id) AS references --noqa: L029 FROM lens.scholarly CROSS JOIN UNNEST(references) AS reference GROUP BY lens_id ) + SELECT scholarly.lens_id AS id, title, @@ -44,15 +47,15 @@ FROM lens.scholarly LEFT JOIN dois -USING - (lens_id) + USING + (lens_id) LEFT JOIN author_last_names -USING - (lens_id) + USING + (lens_id) LEFT JOIN out_citations -USING - (lens_id) + USING + (lens_id) WHERE - lens_id IN (SELECT id from {{ staging_dataset }}.lens_ids) + lens_id IN (SELECT id FROM {{ staging_dataset }}.lens_ids) diff --git a/sql/lid_input.sql b/sql/lid_input.sql index 54a1fb2..ba4e90b 100644 --- a/sql/lid_input.sql +++ b/sql/lid_input.sql @@ -1,2 +1,11 @@ -- prepare input for LID (and downstream, for the combined metadata table) -select id, title, abstract, clean_doi, year, last_names, references, dataset from {{staging_dataset}}.all_metadata_norm +SELECT + id, + title, + abstract, + clean_doi, + year, + last_names, + references, + dataset +FROM {{ staging_dataset }}.all_metadata_norm diff --git a/sql/match_template.sql b/sql/match_template.sql index 7e299ad..6fd0e44 100644 --- a/sql/match_template.sql +++ b/sql/match_template.sql @@ -1,8 +1,10 @@ -- find articles that match on one of the stronger indicators (title, abstract, doi, references) and one other indicator -select a.id as all1_id, m.id as all2_id -from {{staging_dataset}}.all_metadata_norm_filt as a -inner join -{{staging_dataset}}.all_metadata_norm_filt as m -on (a.{{ params.strong }} = m.{{ params.strong }}) and - (a.{{ params.strong }} is not null) and (a.{{ params.strong }} != "") and - (a.{{ params.other }} = m.{{ params.other }}) and (a.{{ params.other }} is not null) {{params.additional_checks}} +SELECT + a.id AS all1_id, + m.id AS all2_id +FROM {{ staging_dataset }}.all_metadata_norm_filt AS a +INNER JOIN + {{ staging_dataset }}.all_metadata_norm_filt AS m + ON (a.{{ params.strong }} = m.{{ params.strong }}) + AND (a.{{ params.strong }} IS NOT NULL) AND (a.{{ params.strong }} != "") + AND (a.{{ params.other }} = m.{{ params.other }}) AND (a.{{ params.other }} IS NOT NULL) {{ params.additional_checks }} diff --git a/sql/metadata_match.sql b/sql/metadata_match.sql index c963dd9..180b5c7 100644 --- a/sql/metadata_match.sql +++ b/sql/metadata_match.sql @@ -1,4 +1,7 @@ -- get combined set of metadata matches -select distinct all1_id, all2_id from ( -{{ params.tables }} -) \ No newline at end of file +SELECT DISTINCT + all1_id, + all2_id +FROM ( + {{ params.tables }} +) diff --git a/sql/openalex_ids.sql b/sql/openalex_ids.sql index 7ae3c7f..59f629a 100644 --- 
a/sql/openalex_ids.sql +++ b/sql/openalex_ids.sql @@ -1,10 +1,9 @@ -- get openalex article ids (used in validation) -SELECT - DISTINCT(id) +SELECT DISTINCT id FROM openalex.works WHERE (type IS NULL) OR NOT (type IN ("dataset", "peer-review", - "grant")) \ No newline at end of file + "grant")) diff --git a/sql/openalex_metadata.sql b/sql/openalex_metadata.sql index e78817f..5979fa2 100644 --- a/sql/openalex_metadata.sql +++ b/sql/openalex_metadata.sql @@ -1,6 +1,6 @@ -- get openalex combined metadata used in matching WITH - author_names AS ( +author_names AS ( SELECT id, ARRAY_AGG(authorship.author.display_name) AS last_names @@ -12,6 +12,7 @@ WITH authorship.author.display_name IS NOT NULL GROUP BY id ) + SELECT id, title, @@ -21,20 +22,19 @@ SELECT -- full names, not last names, but the cleaning script will turn them into last names last_names, ARRAY_TO_STRING(ARRAY( - SELECT - r + SELECT r FROM UNNEST(referenced_works) AS r ORDER BY - r), ",") AS references + r), ",") AS references --noqa: L029 FROM openalex.works LEFT JOIN author_names -USING - (id) + USING + (id) WHERE (type IS NULL) OR NOT (type IN ("dataset", "peer-review", - "grant")) \ No newline at end of file + "grant")) diff --git a/sql/papers_with_code_ids.sql b/sql/papers_with_code_ids.sql index 838934a..c158dc2 100644 --- a/sql/papers_with_code_ids.sql +++ b/sql/papers_with_code_ids.sql @@ -1,2 +1,2 @@ -- get pwc ids (used in validation) -select distinct paper_url as id from papers_with_code.papers_with_abstracts \ No newline at end of file +SELECT DISTINCT paper_url AS id FROM papers_with_code.papers_with_abstracts diff --git a/sql/papers_with_code_metadata.sql b/sql/papers_with_code_metadata.sql index d138aff..b6fe969 100644 --- a/sql/papers_with_code_metadata.sql +++ b/sql/papers_with_code_metadata.sql @@ -1,9 +1,9 @@ -- aggregate pwc metadata -select - paper_url as id, +SELECT + paper_url AS id, title, abstract, - extract(year from date) as year, + extract(YEAR FROM date) AS year, -- these are actually full names, but they will be turned into last names by the cleaning script - authors as last_names -from papers_with_code.papers_with_abstracts \ No newline at end of file + authors AS last_names +FROM papers_with_code.papers_with_abstracts diff --git a/sql/references.sql b/sql/references.sql index 51b003e..7299d6a 100644 --- a/sql/references.sql +++ b/sql/references.sql @@ -12,23 +12,23 @@ WITH references AS ( UNNEST(SPLIT(references, ",")) AS reference WHERE reference IN ( - SELECT - orig_id - FROM - {{ staging_dataset }}.sources ) + SELECT orig_id + FROM + {{ staging_dataset }}.sources ) ) -SELECT - DISTINCT referencing_papers.merged_id AS merged_id, + +SELECT DISTINCT + referencing_papers.merged_id AS merged_id, referenced_papers.merged_id AS ref_id FROM references LEFT JOIN {{ staging_dataset }}.sources AS referencing_papers -ON -references.id = referencing_papers.orig_id + ON + references.id = referencing_papers.orig_id LEFT JOIN {{ staging_dataset }}.sources AS referenced_papers -ON -references.reference = referenced_papers.orig_id + ON + references.reference = referenced_papers.orig_id WHERE (referencing_papers.merged_id IS NOT NULL) AND (referenced_papers.merged_id IS NOT NULL) diff --git a/sql/s2_ids.sql b/sql/s2_ids.sql index 6a1a9e5..ceba4c2 100644 --- a/sql/s2_ids.sql +++ b/sql/s2_ids.sql @@ -1,7 +1,9 @@ -- get s2 article ids (used in validation) -SELECT - DISTINCT(CAST(corpusid AS string)) as id +SELECT DISTINCT CAST(corpusid AS STRING) AS id FROM semantic_scholar.papers -LEFT JOIN unnest(publicationtypes) as 
publication_type -WHERE (publication_type IS NULL) OR (NOT (publication_type IN ("Dataset", "Editorial", "LettersAndComments", "News", "Review"))) \ No newline at end of file +LEFT JOIN UNNEST(publicationtypes) AS publication_type +WHERE + ( + publication_type IS NULL + ) OR (NOT(publication_type IN ("Dataset", "Editorial", "LettersAndComments", "News", "Review"))) diff --git a/sql/s2_metadata.sql b/sql/s2_metadata.sql index 5f92fdb..55fccae 100644 --- a/sql/s2_metadata.sql +++ b/sql/s2_metadata.sql @@ -3,16 +3,17 @@ WITH paper_authors AS ( SELECT corpusid, -- full names, not last names, but the cleaning script will turn them into last names - ARRAY_AGG(author.name) as last_names + ARRAY_AGG(author.name) AS last_names FROM semantic_scholar.papers - CROSS JOIN UNNEST(authors) as author + CROSS JOIN UNNEST(authors) AS author GROUP BY corpusid ), + paper_references AS ( SELECT - citingcorpusid as corpusid, - ARRAY_TO_STRING(ARRAY_AGG(CAST(citedcorpusid AS string) ORDER BY citedcorpusid), ",") as references + citingcorpusid AS corpusid, + ARRAY_TO_STRING(ARRAY_AGG(CAST(citedcorpusid AS STRING) ORDER BY citedcorpusid), ",") AS references --noqa: L029 FROM semantic_scholar.citations GROUP BY @@ -20,24 +21,24 @@ paper_references AS ( ) SELECT - CAST(corpusid AS string) AS id, + CAST(corpusid AS STRING) AS id, title, abstract, REPLACE(LOWER(externalids.DOI), "https://doi.org/", "") AS clean_doi, - EXTRACT(year FROM publicationdate) as year, + EXTRACT(YEAR FROM publicationdate) AS year, last_names, references FROM semantic_scholar.papers INNER JOIN {{ staging_dataset }}.s2_ids -ON CAST(corpusid AS string) = id + ON CAST(corpusid AS STRING) = id LEFT JOIN semantic_scholar.abstracts -USING(corpusid) + USING (corpusid) LEFT JOIN paper_authors -USING(corpusid) + USING (corpusid) LEFT JOIN paper_references -USING(corpusid) \ No newline at end of file + USING (corpusid) diff --git a/sql/simhash_input.sql b/sql/simhash_input.sql index 9310842..3a04e6f 100644 --- a/sql/simhash_input.sql +++ b/sql/simhash_input.sql @@ -1,18 +1,18 @@ -- get input for simhash, which is articles with not null titles and abstracts that have not already been matched -select +SELECT id, year, - concat(title_norm, abstract_norm) as normalized_text -from {{staging_dataset}}.all_metadata_norm_filt -where - (year is not null) and - (title_norm is not null) and (title_norm != "") and - (abstract_norm is not null) and (abstract_norm != "") and - id not in ( - select a.id from {{staging_dataset}}.all_metadata_norm_filt a - left join - {{staging_dataset}}.all_metadata_with_cld2_lid_last_run b - on a.id = b.id - where (a.title = b.title) and (a.abstract = b.abstract) and (a.year = b.year) and (a.title != "") and - (a.title is not null) and (a.abstract != "") and (a.abstract is not null) and (a.year is not null) - ) + concat(title_norm, abstract_norm) AS normalized_text +FROM {{ staging_dataset }}.all_metadata_norm_filt +WHERE + (year IS NOT NULL) + AND (title_norm IS NOT NULL) AND (title_norm != "") + AND (abstract_norm IS NOT NULL) AND (abstract_norm != "") + AND id NOT IN ( + SELECT a.id FROM {{ staging_dataset }}.all_metadata_norm_filt AS a + LEFT JOIN + {{ staging_dataset }}.all_metadata_with_cld2_lid_last_run AS b + ON a.id = b.id + WHERE (a.title = b.title) AND (a.abstract = b.abstract) AND (a.year = b.year) AND (a.title != "") + AND (a.title IS NOT NULL) AND (a.abstract != "") AND (a.abstract IS NOT NULL) AND (a.year IS NOT NULL) + ) diff --git a/sql/sources.sql b/sql/sources.sql index 9a4b1d3..ccdde1e 100644 --- a/sql/sources.sql 
+++ b/sql/sources.sql @@ -1,9 +1,9 @@ -- add orig_id dataset to the sources table -select distinct +SELECT DISTINCT a.merged_id, a.orig_id, b.dataset -from {{staging_dataset}}.id_mapping a -inner join -{{staging_dataset}}.union_metadata b -on a.orig_id = b.id +FROM {{ staging_dataset }}.id_mapping AS a +INNER JOIN + {{ staging_dataset }}.union_metadata AS b + ON a.orig_id = b.id diff --git a/sql/union_ids.sql b/sql/union_ids.sql index ac07028..45efde3 100644 --- a/sql/union_ids.sql +++ b/sql/union_ids.sql @@ -1,12 +1,12 @@ -- glue all the ids together (used in validation) -select id from {{ staging_dataset }}.arxiv_ids +SELECT id FROM {{ staging_dataset }}.arxiv_ids UNION ALL -select id from {{ staging_dataset }}.wos_ids +SELECT id FROM {{ staging_dataset }}.wos_ids UNION ALL -select id from {{ staging_dataset }}.papers_with_code_ids +SELECT id FROM {{ staging_dataset }}.papers_with_code_ids UNION ALL -select id from {{ staging_dataset }}.openalex_ids +SELECT id FROM {{ staging_dataset }}.openalex_ids UNION ALL -select id from {{ staging_dataset }}.s2_ids +SELECT id FROM {{ staging_dataset }}.s2_ids UNION ALL -select id from {{ staging_dataset }}.lens_ids +SELECT id FROM {{ staging_dataset }}.lens_ids diff --git a/sql/union_metadata.sql b/sql/union_metadata.sql index 5b3bf8c..18e3cff 100644 --- a/sql/union_metadata.sql +++ b/sql/union_metadata.sql @@ -1,44 +1,87 @@ -- glue all the metadata together into one table -with meta as ( - select cast(id as string) as id, title, abstract, clean_doi, cast(year as int64) as year, last_names, - null as references, "arxiv" as dataset - from {{staging_dataset}}.arxiv_metadata +WITH meta AS ( + SELECT + cast(id AS STRING) AS id, + title, + abstract, + clean_doi, + cast(year AS INT64) AS year, + last_names, + NULL AS references, --noqa: L029 + "arxiv" AS dataset + FROM {{ staging_dataset }}.arxiv_metadata UNION ALL - select cast(id as string) as id, title, abstract, clean_doi, cast(year as int64) as year, last_names, - references, "wos" as dataset - from {{staging_dataset}}.wos_metadata + SELECT + cast(id AS STRING) AS id, + title, + abstract, + clean_doi, + cast(year AS INT64) AS year, + last_names, + references, + "wos" AS dataset + FROM {{ staging_dataset }}.wos_metadata UNION ALL - select cast(id as string) as id, title, abstract, null as clean_doi, cast(year as int64) as year, last_names, - null as references, "pwc" as dataset - from {{staging_dataset}}.papers_with_code_metadata + SELECT + cast(id AS STRING) AS id, + title, + abstract, + NULL AS clean_doi, + cast(year AS INT64) AS year, + last_names, + NULL AS references, --noqa: L029 + "pwc" AS dataset + FROM {{ staging_dataset }}.papers_with_code_metadata UNION ALL - select id, title, abstract, clean_doi, year, last_names, - references, "openalex" as dataset - from {{staging_dataset}}.openalex_metadata + SELECT + id, + title, + abstract, + clean_doi, + year, + last_names, + references, + "openalex" AS dataset + FROM {{ staging_dataset }}.openalex_metadata UNION ALL - select id, title, abstract, clean_doi, year, last_names, - references, "s2" as dataset - from {{staging_dataset}}.s2_metadata + SELECT + id, + title, + abstract, + clean_doi, + year, + last_names, + references, + "s2" AS dataset + FROM {{ staging_dataset }}.s2_metadata UNION ALL - select id, title, abstract, clean_doi, year, last_names, - references, "lens" as dataset - from {{staging_dataset}}.lens_metadata + SELECT + id, + title, + abstract, + clean_doi, + year, + last_names, + references, + "lens" AS dataset + FROM {{ 
staging_dataset }}.lens_metadata ), + -- add merged id refs -mapped_references as ( - select +mapped_references AS ( + SELECT id, - array_to_string(array_agg(distinct merged_id order by merged_id), ",") as references - from + array_to_string(array_agg(DISTINCT merged_id ORDER BY merged_id), ",") AS references --noqa: L029 + FROM meta - cross join unnest(split(references, ",")) as orig_id_ref - inner join + CROSS JOIN unnest(split(references, ",")) AS orig_id_ref + INNER JOIN {{ production_dataset }}.sources - on orig_id_ref = orig_id - group by id + ON orig_id_ref = orig_id + GROUP BY id ) -select +SELECT id, title, abstract, @@ -47,10 +90,10 @@ select last_names, references, dataset -from +FROM meta -union all -select +UNION ALL +SELECT id, title, abstract, @@ -59,9 +102,9 @@ select last_names, mapped_references.references, dataset -from +FROM meta -inner join +INNER JOIN mapped_references -using(id) -where array_length(split(meta.references, ",")) = array_length(split(mapped_references.references, ",")) + USING (id) +WHERE array_length(split(meta.references, ",")) = array_length(split(mapped_references.references, ",")) diff --git a/sql/wos_abstracts.sql b/sql/wos_abstracts.sql index 9d93672..c20d9fa 100644 --- a/sql/wos_abstracts.sql +++ b/sql/wos_abstracts.sql @@ -1,4 +1,6 @@ -- get wos abstracts. Note that there may be multiple versions of the abstracts (see abstract_id) -SELECT id, STRING_AGG(paragraph_text, "\n" ORDER BY CAST(paragraph_id AS INT64) ASC) as abstract +SELECT + id, + STRING_AGG(paragraph_text, "\n" ORDER BY CAST(paragraph_id AS INT64) ASC) AS abstract FROM gcp_cset_clarivate.wos_abstract_paragraphs_latest -GROUP BY id, abstract_id; \ No newline at end of file +GROUP BY id, abstract_id; diff --git a/sql/wos_authors.sql b/sql/wos_authors.sql index fccee3b..a3ab828 100644 --- a/sql/wos_authors.sql +++ b/sql/wos_authors.sql @@ -1,6 +1,7 @@ -- get author last names -select - id, array_agg(last_name IGNORE NULLS) as last_names -from gcp_cset_clarivate.wos_summary_names_latest -where role="author" -group by id +SELECT + id, + array_agg(last_name IGNORE NULLS) AS last_names +FROM gcp_cset_clarivate.wos_summary_names_latest +WHERE role = "author" +GROUP BY id diff --git a/sql/wos_ids.sql b/sql/wos_ids.sql index 701561d..a08fa5d 100644 --- a/sql/wos_ids.sql +++ b/sql/wos_ids.sql @@ -1,2 +1,2 @@ -- get wos ids (used in validation) -select distinct(id) from gcp_cset_clarivate.wos_dynamic_identifiers_latest \ No newline at end of file +SELECT DISTINCT id FROM gcp_cset_clarivate.wos_dynamic_identifiers_latest diff --git a/sql/wos_metadata.sql b/sql/wos_metadata.sql index af5cd9f..e74e7ea 100644 --- a/sql/wos_metadata.sql +++ b/sql/wos_metadata.sql @@ -8,30 +8,35 @@ SELECT d.last_names, e.references FROM - {{staging_dataset}}.wos_ids ids + {{ staging_dataset }}.wos_ids AS ids LEFT JOIN - {{staging_dataset}}.wos_pubyears a -ON - ids.id = a.id + {{ staging_dataset }}.wos_pubyears AS a + ON + ids.id = a.id LEFT JOIN - {{staging_dataset}}.wos_titles b -ON - ids.id = b.id + {{ staging_dataset }}.wos_titles AS b + ON + ids.id = b.id LEFT JOIN - {{staging_dataset}}.wos_abstracts c -ON - ids.id = c.id + {{ staging_dataset }}.wos_abstracts AS c + ON + ids.id = c.id LEFT JOIN - {{staging_dataset}}.wos_authors d -ON - ids.id = d.id + {{ staging_dataset }}.wos_authors AS d + ON + ids.id = d.id LEFT JOIN - (select id, string_agg(ref_id order by ref_id) as references - from gcp_cset_clarivate.wos_references_latest group by id) e -ON - ids.id = e.id + (SELECT + id, + string_agg(ref_id ORDER BY 
ref_id) AS references --noqa: L029 + FROM gcp_cset_clarivate.wos_references_latest GROUP BY id) AS e + ON + ids.id = e.id LEFT JOIN - (select id, lower(identifier_value) as clean_doi from gcp_cset_clarivate.wos_dynamic_identifiers_latest where - (identifier_type="doi") and (identifier_value is not null)) f -ON - ids.id = f.id \ No newline at end of file + (SELECT + id, + lower(identifier_value) AS clean_doi + FROM gcp_cset_clarivate.wos_dynamic_identifiers_latest WHERE + (identifier_type = "doi") AND (identifier_value IS NOT NULL)) AS f + ON + ids.id = f.id diff --git a/sql/wos_pubyears.sql b/sql/wos_pubyears.sql index b106722..0921811 100644 --- a/sql/wos_pubyears.sql +++ b/sql/wos_pubyears.sql @@ -1,2 +1,5 @@ -- get wos publication year -select id, pubyear as year from gcp_cset_clarivate.wos_summary_latest \ No newline at end of file +SELECT + id, + pubyear AS year +FROM gcp_cset_clarivate.wos_summary_latest diff --git a/sql/wos_titles.sql b/sql/wos_titles.sql index 41e4377..56b3e7c 100644 --- a/sql/wos_titles.sql +++ b/sql/wos_titles.sql @@ -1,7 +1,7 @@ -- get wos titles (note that there may be more than one per article in different languages) -select +SELECT id, - title_id as title_id, - title as title -from gcp_cset_clarivate.wos_titles_latest -where title_type="item" + title_id AS title_id, + title AS title +FROM gcp_cset_clarivate.wos_titles_latest +WHERE title_type = "item" diff --git a/tests/static/test_create_match_keys/ids_to_drop/data.jsonl b/tests/static/test_create_match_keys/ids_to_drop/data.jsonl new file mode 100644 index 0000000..22cb0f2 --- /dev/null +++ b/tests/static/test_create_match_keys/ids_to_drop/data.jsonl @@ -0,0 +1 @@ +{"merged_id": "carticle_0000000003"} diff --git a/tests/static/test_create_match_keys/input/input.jsonl b/tests/static/test_create_match_keys/input/input.jsonl index bd95067..1876580 100644 --- a/tests/static/test_create_match_keys/input/input.jsonl +++ b/tests/static/test_create_match_keys/input/input.jsonl @@ -3,3 +3,5 @@ {"orig_id": "D", "merged_id": "carticle_0000000002"} {"orig_id": "E", "merged_id": "carticle_0000000002"} {"orig_id": "F", "merged_id": "carticle_0000000001"} +{"orig_id": "I", "merged_id": "carticle_0000000003"} +{"orig_id": "J", "merged_id": "carticle_0000000003"} diff --git a/tests/static/test_skip_matches_ids/data.jsonl b/tests/static/test_skip_matches_ids/data.jsonl new file mode 100644 index 0000000..a8b7d05 --- /dev/null +++ b/tests/static/test_skip_matches_ids/data.jsonl @@ -0,0 +1,6 @@ +{"id1": "A", "id2": "B"} +{"id1": "B", "id2": "A"} +{"id1": "B", "id2": "C"} +{"id1": "C", "id2": "B"} +{"id1": "D", "id2": "E"} +{"id1": "E", "id2": "D"} diff --git a/tests/static/test_skip_matches_ids_to_skip/data.jsonl b/tests/static/test_skip_matches_ids_to_skip/data.jsonl new file mode 100644 index 0000000..673f259 --- /dev/null +++ b/tests/static/test_skip_matches_ids_to_skip/data.jsonl @@ -0,0 +1,2 @@ +{"id1": "B", "id2": "C"} +{"id1": "D", "id2": "E"} diff --git a/tests/test_clean_corpus.py b/tests/test_clean_corpus.py index 7225b7e..3fe07e3 100644 --- a/tests/test_clean_corpus.py +++ b/tests/test_clean_corpus.py @@ -1,5 +1,6 @@ import json import unittest + from utils.clean_corpus import Scrub @@ -10,18 +11,27 @@ class TestCleanCorpus(unittest.TestCase): def test_strip_copyright_yes(self): self.assertEqual("test", self.scrubber.strip_copyright("test (C) 2014 test1")) self.assertEqual("test", self.scrubber.strip_copyright("test (c) 2014 test1")) - self.assertEqual("test", self.scrubber.strip_copyright("test copyright 2014 
test1")) + self.assertEqual( + "test", self.scrubber.strip_copyright("test copyright 2014 test1") + ) def test_strip_copyright_no(self): - self.assertEqual("test copyright test1", self.scrubber.strip_copyright("test copyright test1")) - self.assertEqual("(a) the first item (b) the second item (c) the third item", - self.scrubber.strip_copyright("(a) the first item (b) the second item (c) the third item")) + self.assertEqual( + "test copyright test1", + self.scrubber.strip_copyright("test copyright test1"), + ) + self.assertEqual( + "(a) the first item (b) the second item (c) the third item", + self.scrubber.strip_copyright( + "(a) the first item (b) the second item (c) the third item" + ), + ) def test_clean_text_data(self): input_record = { "title": "Something un-normalized!", "abstract": "你好世界 copyright (C) 2014", - "last_names": ["smith", "amy s. li", "界"] + "last_names": ["smith", "amy s. li", "界"], } expected_output_record = { "title": "Something un-normalized!", @@ -29,7 +39,9 @@ def test_clean_text_data(self): "last_names": ["smith", "amy s. li", "界"], "title_norm": "somethingunnormalized", "abstract_norm": "你好世界", - "last_names_norm": "li smith 界" + "last_names_norm": "li smith 界", } scrubber_output_generator = self.scrubber.process(json.dumps(input_record)) - self.assertEqual(json.dumps(expected_output_record), list(scrubber_output_generator)[0]) + self.assertEqual( + json.dumps(expected_output_record), list(scrubber_output_generator)[0] + ) diff --git a/tests/test_create_merge_ids.py b/tests/test_create_merge_ids.py index 7fb090c..ee4b8cd 100644 --- a/tests/test_create_merge_ids.py +++ b/tests/test_create_merge_ids.py @@ -1,11 +1,13 @@ import json +import os import shutil import unittest -import os -from utils.create_merge_ids import create_match_sets, create_match_keys + +from utils.create_merge_ids import create_match_keys, create_match_sets static_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static") + class TestGetCombinedMap(unittest.TestCase): maxDiff = None @@ -35,43 +37,70 @@ def test_get_combined_map5(self): match_dir = os.path.join(static_dir, "test_get_combined_map5") result_set_large = {"A", "B", "C", "D", "E"} result_set_small = {"F", "G", "H"} - expected_result = sorted([result_set_small, result_set_large], key=lambda k : len(k)) - actual_result = sorted(create_match_sets(match_dir), key=lambda k : len(k)) + expected_result = sorted( + [result_set_small, result_set_large], key=lambda k: len(k) + ) + actual_result = sorted(create_match_sets(match_dir), key=lambda k: len(k)) self.assertEqual(actual_result, expected_result) def test_get_match_sets_with_extra_id(self): # test with three disconnected sets. The set A - E will have one extra id (E) that should get filtered, and # the "small" set F-H will all be extra ids that should be filtered. The other small set I-J will have ids # distributed across two id files, but the set should be included. 
- match_dir = os.path.join(static_dir, "test_get_match_sets_with_extra_id", "match_pairs") + match_dir = os.path.join( + static_dir, "test_get_match_sets_with_extra_id", "match_pairs" + ) ids_dir = os.path.join(static_dir, "test_get_match_sets_with_extra_id", "ids") result_set_large = {"A", "B", "C", "D"} result_set_small = {"I", "J"} - expected_result = sorted([result_set_small, result_set_large], key=lambda k : len(k)) - actual_result = sorted(create_match_sets(match_dir, ids_dir), key=lambda k : len(k)) + expected_result = sorted( + [result_set_small, result_set_large], key=lambda k: len(k) + ) + actual_result = sorted( + create_match_sets(match_dir, ids_dir), key=lambda k: len(k) + ) self.assertEqual(actual_result, expected_result) + def test_skip_matches(self): + # test without matches excluded + match_dir = os.path.join(static_dir, "test_skip_matches_ids") + expected_result_no_excludes = [{"A", "B", "C"}, {"D", "E"}] + self.assertEqual(create_match_sets(match_dir), expected_result_no_excludes) + # test with matches excluded + exclude_dir = os.path.join(static_dir, "test_skip_matches_ids_to_skip") + expected_result_excludes = [{"A", "B"}, {"C"}, {"D"}, {"E"}] + self.assertEqual( + create_match_sets(match_dir, exclude_dir=exclude_dir), + expected_result_excludes, + ) + def test_create_match_keys(self): - # the first set will contain two old elts from the same match set and one new elt; should keep its id - # the next will contain one elt from one match set, two from another; should change ids - # the last will contain only new ids; should get a new id - match_sets = [{"A", "B", "C"}, {"D", "E", "F"}, {"G", "H"}] + # The first set (A, B, C) contains two old elts from the same match set and one new elt; should keep its id. + # The next (D, E, F) contains one elt from one match set, two from another; should change ids. + # Another (G, H) contains only new ids; should get a new id. + # The last two (I and J) are two different match sets that share an old id and are in ids_to_drop; + # each should get a new id (this is in case of unlinking). 
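        # The assignment rule exercised here, restated as a compact runnable
        # sketch (names mirror create_match_keys below; prev_map and retired
        # stand in for the prev_id_mapping and ids_to_drop fixtures):
        prev_map = {"I": "carticle_0000000003", "J": "carticle_0000000003"}
        retired = {"carticle_0000000003"}
        for match_set in [{"I"}, {"J"}]:
            existing = {prev_map[m] for m in match_set if m in prev_map}
            reuse = len(existing) == 1 and next(iter(existing)) not in retired
            # reuse is False for both sets: their shared old id is retired, so
            # each singleton set is assigned the next freshly minted carticle id.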
+ match_sets = [{"A", "B", "C"}, {"D", "E", "F"}, {"G", "H"}, {"I"}, {"J"}] out_dir = os.path.join(static_dir, "test_create_match_keys", "output") if os.path.exists(out_dir): shutil.rmtree(out_dir) os.mkdir(out_dir) out_fi = os.path.join(out_dir, "output.jsonl") id_mapping_dir = os.path.join(static_dir, "test_create_match_keys", "input") + ids_to_drop = os.path.join(static_dir, "test_create_match_keys", "ids_to_drop") expected_output = [ {"orig_id": "A", "merged_id": "carticle_0000000001"}, {"orig_id": "B", "merged_id": "carticle_0000000001"}, {"orig_id": "C", "merged_id": "carticle_0000000001"}, - {"orig_id": "D", "merged_id": "carticle_0000000003"}, - {"orig_id": "E", "merged_id": "carticle_0000000003"}, - {"orig_id": "F", "merged_id": "carticle_0000000003"}, - {"orig_id": "G", "merged_id": "carticle_0000000004"}, - {"orig_id": "H", "merged_id": "carticle_0000000004"}, + {"orig_id": "D", "merged_id": "carticle_0000000004"}, + {"orig_id": "E", "merged_id": "carticle_0000000004"}, + {"orig_id": "F", "merged_id": "carticle_0000000004"}, + {"orig_id": "G", "merged_id": "carticle_0000000005"}, + {"orig_id": "H", "merged_id": "carticle_0000000005"}, + {"orig_id": "I", "merged_id": "carticle_0000000006"}, + {"orig_id": "J", "merged_id": "carticle_0000000007"}, ] - create_match_keys(match_sets, out_fi, id_mapping_dir) + print(expected_output) + create_match_keys(match_sets, out_fi, ids_to_drop, id_mapping_dir) out = [json.loads(x) for x in open(out_fi).readlines()] self.assertEqual(expected_output, sorted(out, key=lambda x: x["orig_id"])) diff --git a/tests/test_make_unlink_rows.py b/tests/test_make_unlink_rows.py new file mode 100644 index 0000000..640e6d5 --- /dev/null +++ b/tests/test_make_unlink_rows.py @@ -0,0 +1,41 @@ +import json +import os +import shutil +import unittest + +from utils.make_unlink_rows import make_pairs + +static_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static") + + +class TestMakeUnlinkRows(unittest.TestCase): + @staticmethod + def gen_sort_key(pair: tuple) -> str: + return f"{pair[0]}-{pair[1]}" + + def test_make_pairs(self): + manual_to_orig = {"1": {"a", "b"}, "2": {"d", "e"}, "3": {"f"}} + expected_output = sorted( + [ + ("a", "d"), + ("a", "e"), + ("a", "f"), + ("b", "d"), + ("b", "e"), + ("b", "f"), + ("d", "a"), + ("d", "b"), + ("d", "f"), + ("e", "a"), + ("e", "b"), + ("e", "f"), + ("f", "a"), + ("f", "b"), + ("f", "d"), + ("f", "e"), + ], + key=self.gen_sort_key, + ) + self.assertEqual( + expected_output, sorted(make_pairs(manual_to_orig), key=self.gen_sort_key) + ) diff --git a/utils/clean_corpus.py b/utils/clean_corpus.py index 16406b4..3c29c82 100644 --- a/utils/clean_corpus.py +++ b/utils/clean_corpus.py @@ -3,18 +3,25 @@ import json import re import unicodedata -import apache_beam as beam +from typing import Iterable +import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions -from gensim.parsing.preprocessing import * +from gensim.parsing.preprocessing import ( + preprocess_string, + strip_non_alphanum, + strip_numeric, + strip_punctuation, + strip_tags, +) from gensim.utils import deaccent -from typing import Iterable """ This script normalizes string fields as needed for matching. The resulting normalized data should only be used for matching purposes as it does not contain whitespace! """ + class Scrub(beam.DoFn): """ Beam pipeline to normalize data for linkage. 
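The hunk above only reorders imports; for orientation, Scrub's output shape is the one asserted in tests/test_clean_corpus.py: normalized fields are lowercased and stripped of tags, punctuation, numerals and (for title/abstract) whitespace, so they are only useful as match keys. A minimal local sketch, assuming the scrubber is constructed with the same field list the Beam pipeline passes via Scrub(fields_to_clean):

import json
from utils.clean_corpus import Scrub

scrubber = Scrub(["title", "abstract", "last_names"])
record = {
    "title": "Something un-normalized!",
    "abstract": "你好世界 copyright (C) 2014",
    "last_names": ["smith", "amy s. li", "界"],
}
cleaned = json.loads(list(scrubber.process(json.dumps(record)))[0])
# cleaned["title_norm"] == "somethingunnormalized"
# cleaned["abstract_norm"] == "你好世界"
# cleaned["last_names_norm"] == "li smith 界"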
@@ -49,13 +56,21 @@ def clean_text_data(self, value_to_clean: object, field: str) -> list: """ if value_to_clean is None: return None - cleaning_functions = [lambda x: unicodedata.normalize("NFKC", x), deaccent, strip_tags] + cleaning_functions = [ + lambda x: unicodedata.normalize("NFKC", x), + deaccent, + strip_tags, + ] if field == "abstract": cleaning_functions.append(self.strip_copyright) cleaning_functions += [strip_punctuation, strip_numeric, strip_non_alphanum] if field in ["last_names", "last_name"]: # text is a list, make it into a string - last_names = [x.strip().split()[-1].lower() for x in value_to_clean if len(x.split()) > 0] + last_names = [ + x.strip().split()[-1].lower() + for x in value_to_clean + if len(x.split()) > 0 + ] value_to_clean = " ".join(sorted(last_names)) clean_string_parts = preprocess_string(value_to_clean, cleaning_functions) return [x.strip().lower() for x in clean_string_parts] @@ -86,13 +101,17 @@ def process(self, record_str) -> Iterable: elif field in ["title", "abstract", "last_name", "last_names"]: cleaned = self.clean_text_data(js[field], field) delimiter = "" if field in ["title", "abstract"] else " " - clean_record[field+"_norm"] = delimiter.join(cleaned) if cleaned else None + clean_record[field + "_norm"] = ( + delimiter.join(cleaned) if cleaned else None + ) else: - raise ValueError(field+" is not supported by clean_corpus") + raise ValueError(field + " is not supported by clean_corpus") yield json.dumps(clean_record) -def run_pipeline(input_dir: str, output_dir: str, fields_to_clean: list, pipeline_args: list) -> None: +def run_pipeline( + input_dir: str, output_dir: str, fields_to_clean: list, pipeline_args: list +) -> None: """ Run a beam pipeline that cleans all records within all files in input_dir :param input_dir: Directory of jsonl files to clean. Can be local or gcs @@ -102,20 +121,25 @@ def run_pipeline(input_dir: str, output_dir: str, fields_to_clean: list, pipelin :return: None """ with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p: - (p | "Read from Text" >> beam.io.ReadFromText(input_dir) + ( + p + | "Read from Text" >> beam.io.ReadFromText(input_dir) | "Scrub Text" >> beam.ParDo(Scrub(fields_to_clean)) - | "Write to Text" >> beam.io.WriteToText(output_dir)) + | "Write to Text" >> beam.io.WriteToText(output_dir) + ) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--input_dir", required=True) parser.add_argument("--output_dir", required=True) - parser.add_argument("--fields_to_clean", required=True, - help="comma-separated list of fields that should be cleaned within each record") + parser.add_argument( + "--fields_to_clean", + required=True, + help="comma-separated list of fields that should be cleaned within each record", + ) args, pipeline_args = parser.parse_known_args() - run_pipeline(args.input_dir, args.output_dir, args.fields_to_clean.split(","), pipeline_args) - - - + run_pipeline( + args.input_dir, args.output_dir, args.fields_to_clean.split(","), pipeline_args + ) diff --git a/utils/create_merge_ids.py b/utils/create_merge_ids.py index 29b7acd..299d133 100644 --- a/utils/create_merge_ids.py +++ b/utils/create_merge_ids.py @@ -7,13 +7,14 @@ each id to each article in its match set. """ + def create_cset_article_id(idx: int): """ Create CSET article id, e.g. 
carticle_0000000001 :param idx: article number :return: string in the form carticle_0000000001 """ - zero_padding = "0"*(10-len(str(idx))) + zero_padding = "0" * (10 - len(str(idx))) return f"carticle_{zero_padding}{idx}" @@ -31,7 +32,9 @@ def get_connected_edges(adj_list: dict, key: str) -> set: v = to_explore.pop() if v not in conn_edges: conn_edges.add(v) - to_explore = to_explore.union({k for k in adj_list[v] if k not in conn_edges}) + to_explore = to_explore.union( + {k for k in adj_list[v] if k not in conn_edges} + ) return conn_edges @@ -45,7 +48,7 @@ def get_usable_ids(ids_dir: str) -> set: return None usable_ids = set() for fi in os.listdir(ids_dir): - print("reading "+fi) + print("reading " + fi) with open(os.path.join(ids_dir, fi)) as f: for line in f: js = json.loads(line) @@ -53,14 +56,41 @@ def get_usable_ids(ids_dir: str) -> set: return usable_ids -def create_match_sets(match_dir: str, current_ids_dir: str = None) -> list: +def get_exclude_matches(exclude_dir: str) -> dict: + """ + Build dict mapping ids to sets of other ids they should not be matched to + :param exclude_dir: directory of jsonl files containing article pairs that should not be matched together + :return: dict mapping an id to a set of ids that are not valid matches + """ + dont_match = {} + if not exclude_dir: + return dont_match + for fi in os.listdir(exclude_dir): + with open(os.path.join(exclude_dir, fi)) as f: + for line in f: + js = json.loads(line) + if js["id1"] not in dont_match: + dont_match[js["id1"]] = set() + if js["id2"] not in dont_match: + dont_match[js["id2"]] = set() + dont_match[js["id1"]].add(js["id2"]) + dont_match[js["id2"]].add(js["id1"]) + return dont_match + + +def create_match_sets( + match_dir: str, current_ids_dir: str = None, exclude_dir: str = None +) -> list: """ Given a directory of exported jsonl files containing article matches, generates a list of sets of matched articles, including "transitive matches". - :param match_dir: directory of exported jsonl files containing article matches, with keys "`dataset`1_id" and "`dataset`2_id" + :param match_dir: directory of exported jsonl files containing article matches :param current_ids_dir: optional dir containing the current set of ids to use in jsonl form. 
If None, all ids will be used + :param exclude_dir: directory of jsonl files containing article pairs that should not be matched together :return: list of sets of matched articles """ + print("reading pairs to not match") + dont_match = get_exclude_matches(exclude_dir) print("getting adjacency lists") adj_list = {} usable_ids = get_usable_ids(current_ids_dir) @@ -70,16 +100,20 @@ def create_match_sets(match_dir: str, current_ids_dir: str = None) -> list: js = json.loads(line) key1 = js["id1"] key2 = js["id2"] - if (usable_ids is not None) and ((key1 not in usable_ids) or (key2 not in usable_ids)): + if (usable_ids is not None) and ( + (key1 not in usable_ids) or (key2 not in usable_ids) + ): continue if key1 not in adj_list: adj_list[key1] = set() - adj_list[key1].add(key2) + if key2 not in dont_match.get(key1, set()): + adj_list[key1].add(key2) # even if we're in a scenario where (according to a changed metric) A matches B but B doesn't match A, # this will ensure they get added to the same match set if key2 not in adj_list: adj_list[key2] = set() - adj_list[key2].add(key1) + if key1 not in dont_match.get(key2, set()): + adj_list[key2].add(key1) seen_ids = set() match_sets = [] for k in adj_list.keys(): @@ -93,12 +127,15 @@ def create_match_sets(match_dir: str, current_ids_dir: str = None) -> list: return match_sets -def create_match_keys(match_sets: list, match_file: str, prev_id_mapping_dir: str = None): +def create_match_keys( + match_sets: list, match_file: str, ids_to_drop: str, prev_id_mapping_dir: str = None +): """ Given a match set, creates an id for that match set, and writes out a jsonl mapping each article in the match set to that id :param match_sets: list of match sets :param match_file: file where id mapping should be written + :param ids_to_drop: directory containing merged ids that should not be used in jsonl form :param prev_id_mapping_dir: optional dir containing previous id mappings in jsonl form :return: None """ @@ -116,15 +153,23 @@ def create_match_keys(match_sets: list, match_file: str, prev_id_mapping_dir: st prev_orig_to_merg[orig_id] = merg_id if merg_id > max_merg: max_merg = merg_id - match_id = int(max_merg.split("carticle_")[1])+1 + ignore_ids = set() + for fi in os.listdir(ids_to_drop): + with open(os.path.join(ids_to_drop, fi)) as f: + for line in f: + js = json.loads(line.strip()) + ignore_ids.add(js["merged_id"]) + match_id = int(max_merg.split("carticle_")[1]) + 1 num_new, num_old = 0, 0 for ms in match_sets: cset_article_id = None # if we have exactly one existing id, reuse it, even if new articles are matched to it. 
# if two articles that previously had different carticle ids are now in the same match set, # create a new carticle id - existing_ids = set([prev_orig_to_merg[m] for m in ms if m in prev_orig_to_merg]) - if len(existing_ids) == 1: + existing_ids = set( + [prev_orig_to_merg[m] for m in ms if m in prev_orig_to_merg] + ) + if len(existing_ids) == 1 and list(existing_ids)[0] not in ignore_ids: cset_article_id = existing_ids.pop() num_old += 1 else: @@ -132,24 +177,50 @@ def create_match_keys(match_sets: list, match_file: str, prev_id_mapping_dir: st num_new += 1 match_id += 1 for article in ms: - out.write(json.dumps({ - "merged_id": cset_article_id, - "orig_id": article - })+"\n") + out.write( + json.dumps({"merged_id": cset_article_id, "orig_id": article}) + + "\n" + ) print(f"wrote {num_new} new ids and reused {num_old} ids") if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument("--match_dir", required=True, - help="directory of exported jsonl from bigquery containing pairs of article matches") - parser.add_argument("--merge_file", required=True, help="file where merged ids should be written") - parser.add_argument("--prev_id_mapping_dir", - help="directory of exported jsonl from bigquery containing pairs of article matches") - parser.add_argument("--current_ids_dir", help=("directory containing jsonl with one key, 'id'. " - "These are the ids that should be included in output. " - "If None, no ids will be filtered.")) + parser.add_argument( + "--match_dir", + required=True, + help="directory of exported jsonl from bigquery containing pairs of article matches", + ) + parser.add_argument( + "--exclude_dir", + required=True, + help="directory of article pairs that should not be matched", + ) + parser.add_argument( + "--ids_to_drop", + required=True, + help="file containing ids that should not be used", + ) + parser.add_argument( + "--merge_file", required=True, help="file where merged ids should be written" + ) + parser.add_argument( + "--prev_id_mapping_dir", + help="directory of exported jsonl from bigquery containing pairs of article matches", + ) + parser.add_argument( + "--current_ids_dir", + help=( + "directory containing jsonl with one key, 'id'. " + "These are the ids that should be included in output. " + "If None, no ids will be filtered." 
+ ), + ) args = parser.parse_args() - match_sets = create_match_sets(args.match_dir, args.current_ids_dir) - create_match_keys(match_sets, args.merge_file, args.prev_id_mapping_dir) + match_sets = create_match_sets( + args.match_dir, args.current_ids_dir, args.exclude_dir + ) + create_match_keys( + match_sets, args.merge_file, args.ids_to_drop, args.prev_id_mapping_dir + ) diff --git a/utils/make_unlink_rows.py b/utils/make_unlink_rows.py new file mode 100644 index 0000000..b9a8305 --- /dev/null +++ b/utils/make_unlink_rows.py @@ -0,0 +1,58 @@ +import argparse +import csv + + +def make_pairs(manual_to_orig: dict) -> list: + """ + Make all pairs of ids that should be unlinked + :param manual_to_orig: Dict mapping manually assigned ids to original ids that we believe to be the same article + :return: A list of pairs of ids that should not be linked together + """ + pairs = [] + for manual1 in manual_to_orig: + for orig1 in manual_to_orig[manual1]: + for manual2 in manual_to_orig: + if manual1 == manual2: + continue + for orig2 in manual_to_orig[manual2]: + pairs.append((orig1, orig2)) + return pairs + + +def write_unlink_rows(unlinking_file: str, output_file: str) -> None: + """ + Write a sql file containing a query that adds new rows to the staging_literature.unlink table + :param unlinking_file: CSV containing two columns, `manual_id` (a manually assigned id marking articles that are the same), + and `orig_id`, the id for the article in its source corpus + :param output_file: SQL file containing a query that adds new rows to staging_literature.unlink + :return: None + """ + manual_to_orig = {} + with open(unlinking_file) as f: + for line in csv.DictReader(f): + if line["manual_id"] not in manual_to_orig: + manual_to_orig[line["manual_id"]] = set() + manual_to_orig[line["manual_id"]].add(line["orig_id"]) + pairs = make_pairs(manual_to_orig) + with open(output_file, mode="w") as out: + out.write( + "create or replace table staging_literature.unlink as\nselect id1, id2 from staging_literature.unlink\nunion all\n" + ) + out.write( + "\nunion all\n".join( + [f'select "{id1}" as id1, "{id2}" as id2' for id1, id2 in pairs] + ) + ) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "unlinking_file", help="csv with two columns: manual_id and orig_id" + ) + parser.add_argument( + "output_file", help="file where query adding new rows should be written" + ) + args = parser.parse_args() + + write_unlink_rows(args.unlinking_file, args.output_file) diff --git a/utils/mk_dataset_pkl.py b/utils/mk_dataset_pkl.py index 2799323..44bf0a6 100644 --- a/utils/mk_dataset_pkl.py +++ b/utils/mk_dataset_pkl.py @@ -10,8 +10,12 @@ parser.add_argument("output_pkl_file") args = parser.parse_args() - fields = [args.source+"_title", args.source+"_abstract", args.source+"_last_names"] - meta_map = { f: {} for f in fields } + fields = [ + args.source + "_title", + args.source + "_abstract", + args.source + "_last_names", + ] + meta_map = {f: {} for f in fields} for fi in os.listdir(args.input_dir): for line in open(os.path.join(args.input_dir, fi)): js = json.loads(line) @@ -21,6 +25,6 @@ val = " ".join(sorted([x.split()[-1] for x in js[field]])) if val not in meta_map[field]: meta_map[field][val] = [] - meta_map[field][val].append(js[args.source+"_id"]) + meta_map[field][val].append(js[args.source + "_id"]) - pickle.dump(meta_map, open(args.output_pkl_file, mode="wb")) \ No newline at end of file + pickle.dump(meta_map, open(args.output_pkl_file, mode="wb")) diff --git 
a/utils/mk_id_pkl.py b/utils/mk_id_pkl.py index 6926de3..9a98e06 100644 --- a/utils/mk_id_pkl.py +++ b/utils/mk_id_pkl.py @@ -14,10 +14,13 @@ for fi in os.listdir(args.input_dir): for line in open(os.path.join(args.input_dir, fi)): js = json.loads(line) - id_map[js[args.source+"_id"]] = { - args.source+"_title": js[args.source+"_title"], - args.source+"_abstract": js[args.source+"_abstract"], - args.source+"_last_names": " ".join(sorted([x.split()[-1] for x in js[args.source+"_last_names"]])) + id_map[js[args.source + "_id"]] = { + args.source + "_title": js[args.source + "_title"], + args.source + "_abstract": js[args.source + "_abstract"], + args.source + + "_last_names": " ".join( + sorted([x.split()[-1] for x in js[args.source + "_last_names"]]) + ), } - pickle.dump(id_map, open(args.output_pkl_file, mode="wb")) \ No newline at end of file + pickle.dump(id_map, open(args.output_pkl_file, mode="wb")) diff --git a/utils/mk_indexes.py b/utils/mk_indexes.py index 8226bb1..31ad558 100644 --- a/utils/mk_indexes.py +++ b/utils/mk_indexes.py @@ -5,6 +5,7 @@ import os import pickle import re + from simhash import Simhash, SimhashIndex """ @@ -13,21 +14,26 @@ def get_features(s: str) -> list: - ''' + """ The default feature extraction method, from https://github.com/leonsim/simhash - ''' + """ width = 3 s = s.lower() s = re.sub(r"[^\w]+", "", s) - return [s[i:i + width] for i in range(max(len(s) - width + 1, 1))] + return [s[i : i + width] for i in range(max(len(s) - width + 1, 1))] def write_sim_strings(data_fi: str, output_fi: str) -> None: - ''' + """ Does the similarity matching and writes out the outputs. Basic method from from https://github.com/leonsim/simhash - ''' - data_ids_and_values = [line.strip().split("\t") for line in open(data_fi).readlines()] - objs = [(article_id, Simhash(get_features(article_text))) for article_id, article_text in data_ids_and_values] + """ + data_ids_and_values = [ + line.strip().split("\t") for line in open(data_fi).readlines() + ] + objs = [ + (article_id, Simhash(get_features(article_text))) + for article_id, article_text in data_ids_and_values + ] index = SimhashIndex(objs, k=3) pickle.dump(index, open(output_fi, mode="wb")) @@ -35,11 +41,21 @@ def write_sim_strings(data_fi: str, output_fi: str) -> None: if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("input_dir", help="directory of jsonl") - parser.add_argument("output_dir", help="directory where output indexes should be written") + parser.add_argument( + "output_dir", help="directory where output indexes should be written" + ) args = parser.parse_args() years = [y.strip(".tsv") for y in os.listdir(args.input_dir)] print("running simhash indexes") with multiprocessing.Pool() as p: - p.starmap(write_sim_strings, - [(os.path.join(args.input_dir, year+".tsv"), os.path.join(args.output_dir, year+".pkl")) for year in years]) + p.starmap( + write_sim_strings, + [ + ( + os.path.join(args.input_dir, year + ".tsv"), + os.path.join(args.output_dir, year + ".pkl"), + ) + for year in years + ], + ) diff --git a/utils/my_simhash.py b/utils/my_simhash.py index 52f6c17..7a15493 100644 --- a/utils/my_simhash.py +++ b/utils/my_simhash.py @@ -3,12 +3,12 @@ # it has some logging which was overwhelming airflow removed from __future__ import division, unicode_literals -import re -import sys +import collections import hashlib import logging import numbers -import collections +import re +import sys from itertools import groupby if sys.version_info[0] >= 3: @@ -22,17 +22,14 @@ def 
_hashfunc(x): class Simhash(object): - - def __init__( - self, value, f=64, reg=r'[\w\u4e00-\u9fcc]+', hashfunc=None, log=None - ): + def __init__(self, value, f=64, reg=r"[\w\u4e00-\u9fcc]+", hashfunc=None, log=None): """ `f` is the dimensions of fingerprints `reg` is meaningful only when `value` is basestring and describes what is considered to be a letter inside parsed string. Regexp object can also be specified (some attempt to handle any letters - is to specify reg=re.compile(r'\w', re.UNICODE)) + is to specify reg=re.compile(r'\w', re.UNICODE)) # noqa: W605 `hashfunc` accepts a utf-8 encoded string and returns a unsigned integer in at least `f` bits. @@ -61,7 +58,7 @@ def __init__( elif isinstance(value, numbers.Integral): self.value = value else: - raise Exception('Bad parameter with type {}'.format(type(value))) + raise Exception("Bad parameter with type {}".format(type(value))) def __eq__(self, other): """ @@ -72,17 +69,17 @@ def __eq__(self, other): return self.value == other.value def _slide(self, content, width=4): - return [content[i:i + width] for i in range(max(len(content) - width + 1, 1))] + return [content[i : i + width] for i in range(max(len(content) - width + 1, 1))] def _tokenize(self, content): content = content.lower() - content = ''.join(re.findall(self.reg, content)) + content = "".join(re.findall(self.reg, content)) ans = self._slide(content) return ans def build_by_text(self, content): features = self._tokenize(content) - features = {k:sum(1 for _ in g) for k, g in groupby(sorted(features))} + features = {k: sum(1 for _ in g) for k, g in groupby(sorted(features))} return self.build_by_features(features) def build_by_features(self, features): @@ -97,16 +94,16 @@ def build_by_features(self, features): features = features.items() for f in features: if isinstance(f, basestring): - h = self.hashfunc(f.encode('utf-8')) + h = self.hashfunc(f.encode("utf-8")) w = 1 else: assert isinstance(f, collections.Iterable) - h = self.hashfunc(f[0].encode('utf-8')) + h = self.hashfunc(f[0].encode("utf-8")) w = f[1] for i in range(self.f): v[i] += w if h & masks[i] else -w # use reversed binary str to keep the backward compatibility - binary_str = ''.join(['0' if i <= 0 else '1' for i in v[::-1]]) + binary_str = "".join(["0" if i <= 0 else "1" for i in v[::-1]]) self.value = int(binary_str, 2) def distance(self, another): @@ -120,7 +117,6 @@ def distance(self, another): class SimhashIndex(object): - def __init__(self, objs, f=64, k=2, log=None): """ `objs` is a list of (obj_id, simhash) @@ -137,13 +133,13 @@ def __init__(self, objs, f=64, k=2, log=None): else: self.log = log - self.log.info('Initializing %s data.', count) + self.log.info("Initializing %s data.", count) self.bucket = collections.defaultdict(set) for i, q in enumerate(objs): if i % 10000 == 0 or i == count - 1: - self.log.info('%s/%s', i + 1, count) + self.log.info("%s/%s", i + 1, count) self.add(*q) @@ -158,12 +154,12 @@ def get_near_dups(self, simhash): for key in self.get_keys(simhash): dups = self.bucket[key] - self.log.debug('key:%s', key) -# if len(dups) > 200: -# self.log.warning('Big bucket found. key:%s, len:%s', key, len(dups)) + self.log.debug("key:%s", key) + # if len(dups) > 200: + # self.log.warning('Big bucket found. 
diff --git a/utils/run_ids_scripts.sh b/utils/run_ids_scripts.sh
new file mode 100644
index 0000000..59a0db8
--- /dev/null
+++ b/utils/run_ids_scripts.sh
@@ -0,0 +1,9 @@
+cd /mnt/disks/data/run
+gsutil rm gs://airflow-data-exchange/article_linkage/tmp/done_files/ids_are_done
+python3 create_merge_ids.py --match_dir usable_ids --exclude_dir unlink --ids_to_drop ids_to_drop --prev_id_mapping_dir prev_id_mapping --merge_file id_mapping.jsonl --current_ids_dir article_pairs
+/snap/bin/gsutil -m cp id_mapping.jsonl gs://airflow-data-exchange/article_linkage/tmp/
+/snap/bin/gsutil -m cp simhash_results/* gs://airflow-data-exchange/article_linkage/simhash_results/
+/snap/bin/gsutil -m cp new_simhash_indexes/* gs://airflow-data-exchange/article_linkage/simhash_indexes/
+/snap/bin/gsutil -m cp new_simhash_indexes/* gs://airflow-data-exchange/article_linkage/simhash_indexes_archive/$(date +%F)/
+touch ids_are_done
+gsutil cp ids_are_done gs://airflow-data-exchange/article_linkage/tmp/done_files/
diff --git a/utils/run_lid.py b/utils/run_lid.py
index 233cf80..1ed7f35 100644
--- a/utils/run_lid.py
+++ b/utils/run_lid.py
@@ -3,20 +3,21 @@
 import argparse
 import ast
-import chardet
 import json
 import logging
-import pycld2 as cld2
-import apache_beam as beam
+from typing import Iterable
 
+import apache_beam as beam
+import chardet
+import pycld2 as cld2
 from apache_beam.options.pipeline_options import PipelineOptions
-from typing import Iterable
 
 
 class LangId(beam.DoFn):
     """
     Beam pipeline to do Language ID.
     """
+
     def __init__(self, fields_to_lid: list) -> None:
         self.fields_to_lid = fields_to_lid
@@ -36,18 +37,21 @@ def add_cld2_outputs(self, record: dict) -> None:
                 is_reliable, text_bytes_found, details = cld2.detect(record[field])
             except cld2.error as e:
                 logging.warning("utf-8 failed, attempting to use result of chardet")
+                logging.warning(e)
                 encoding = chardet.detect(record[field].encode("utf-8"))["encoding"]
                 if encoding is None:
-                    encoding = "latin-1" # last-ditch effort...
-                    is_reliable, text_bytes_found, details = cld2.detect(record[field].encode("utf-8").decode(encoding))
-                record[field+"_cld2_lid_success"] = True
+                    encoding = "latin-1"  # last-ditch effort...
+                is_reliable, text_bytes_found, details = cld2.detect(
+                    record[field].encode("utf-8").decode(encoding)
+                )
+                record[field + "_cld2_lid_success"] = True
                 record[field + "_cld2_lid_is_reliable"] = is_reliable
                 # details looks like: (('RUSSIAN', 'ru', 98, 404.0), ('Unknown', 'un', 0, 0.0), ('Unknown', 'un', 0, 0.0))
                 # and we want the first language
                 record[field + "_cld2_lid_first_result"] = details[0][0]
                 record[field + "_cld2_lid_first_result_short_code"] = details[0][1]
                 # convert from tuple
-                #record[field + "_cld2_lid_details"] = [list(d) for d in details]
+                # record[field + "_cld2_lid_details"] = [list(d) for d in details]
             except cld2.error as e:
                 logging.warning(e)
             except UnicodeDecodeError as e:
@@ -58,7 +62,10 @@ def process(self, record_str: str) -> Iterable:
         self.add_cld2_outputs(record)
         yield json.dumps(record)
 
-def run_pipeline(input_dir: str, output_dir: str, fields_to_lid: list, pipeline_args: list) -> None:
+
+def run_pipeline(
+    input_dir: str, output_dir: str, fields_to_lid: list, pipeline_args: list
+) -> None:
     """
     Run a beam pipeline that cleans all records within all files in input_dir
     :param input_dir: Directory of jsonl files to run LID on. Can be local or gcs
@@ -68,17 +75,25 @@ def run_pipeline(input_dir: str, output_dir: str, fields_to_lid: list, pipeline_
     :return: None
     """
     with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:
-        (p | "Read from Text" >> beam.io.ReadFromText(input_dir)
+        (
+            p
+            | "Read from Text" >> beam.io.ReadFromText(input_dir)
             | "Run LID" >> beam.ParDo(LangId(fields_to_lid))
-            | "Write to Text" >> beam.io.WriteToText(output_dir))
+            | "Write to Text" >> beam.io.WriteToText(output_dir)
+        )
 
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
     parser.add_argument("--input_dir", required=True)
     parser.add_argument("--output_dir", required=True)
-    parser.add_argument("--fields_to_lid", required=True,
-                        help="comma-separated list of fields that should have lid run on them")
+    parser.add_argument(
+        "--fields_to_lid",
+        required=True,
+        help="comma-separated list of fields that should have lid run on them",
+    )
     args, pipeline_args = parser.parse_known_args()
-    run_pipeline(args.input_dir, args.output_dir, args.fields_to_lid.split(","), pipeline_args)
\ No newline at end of file
+    run_pipeline(
+        args.input_dir, args.output_dir, args.fields_to_lid.split(","), pipeline_args
+    )
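For reference (not part of the patch), the core of `LangId.add_cld2_outputs` above is pycld2 detection with a chardet retry when cld2 rejects the input. The same pattern in isolation, with a made-up sample string:

import chardet
import pycld2 as cld2

text = "Это аннотация статьи"  # hypothetical field value
try:
    is_reliable, text_bytes_found, details = cld2.detect(text)
except cld2.error:
    # cld2 chokes on some byte sequences; guess an encoding and retry,
    # falling back to latin-1 as the pipeline above does.
    encoding = chardet.detect(text.encode("utf-8"))["encoding"] or "latin-1"
    is_reliable, text_bytes_found, details = cld2.detect(
        text.encode("utf-8").decode(encoding)
    )

# details is a tuple of (language_name, language_code, percent, score) tuples;
# the pipeline keeps only the first, most likely, language.
print(details[0][0], details[0][1], is_reliable)

Run without a --runner flag, the Beam pipeline falls back to the DirectRunner, so passing --input_dir, --output_dir, and a comma-separated --fields_to_lid (field names depend on the input schema) is enough to smoke-test it locally; any extra flags are handed through to Beam.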
diff --git a/utils/run_simhash.py b/utils/run_simhash.py
index 95057b2..3ca56c0 100644
--- a/utils/run_simhash.py
+++ b/utils/run_simhash.py
@@ -4,8 +4,8 @@
 import os
 import pickle
 import re
-
 from datetime import datetime
+
 from my_simhash import Simhash, SimhashIndex
@@ -16,15 +16,22 @@ def get_features(s: str) -> list:
     width = 3
     s = s.lower()
     s = re.sub(r"[^\w]+", "", s)
-    return [s[i:i + width] for i in range(max(len(s) - width + 1, 1))]
+    return [s[i : i + width] for i in range(max(len(s) - width + 1, 1))]
 
 
-def write_sim_strings(data_fi: str, output_fi: str, input_index: str = None, output_index: str = None) -> None:
+def write_sim_strings(
+    data_fi: str, output_fi: str, input_index: str = None, output_index: str = None
+) -> None:
     """
     Does the similarity matching and writes out the outputs.
     Basic method from from https://github.com/leonsim/simhash
     """
-    data_ids_and_values = [line.strip().split("\t") for line in open(data_fi).readlines()]
-    objs = [(article_id, Simhash(get_features(article_text))) for article_id, article_text in data_ids_and_values]
+    data_ids_and_values = [
+        line.strip().split("\t") for line in open(data_fi).readlines()
+    ]
+    objs = [
+        (article_id, Simhash(get_features(article_text)))
+        for article_id, article_text in data_ids_and_values
+    ]
     index = None
     if (input_index is None) or not os.path.exists(input_index):
         index = SimhashIndex(objs, k=3)
@@ -32,7 +39,7 @@ def write_sim_strings(data_fi: str, output_fi: str, input_index: str = None, out
         index = pickle.load(open(input_index, mode="rb"))
         for obj_id, obj in objs:
             index.add(obj_id, obj)
-        print("writing updated index to "+output_index)
+        print("writing updated index to " + output_index)
         pickle.dump(index, open(output_index, mode="wb"))
 
     with open(output_fi, mode="w") as out:
@@ -66,7 +73,9 @@ def get_year_partition(input_dir: str, output_dir: str) -> list:
             js = json.loads(line)
             year = js["year"]
             if year not in year_to_outfi:
-                year_to_outfi[year] = open(os.path.join(output_dir, year+".tsv"), mode="w")
+                year_to_outfi[year] = open(
+                    os.path.join(output_dir, year + ".tsv"), mode="w"
+                )
             year_to_outfi[year].write(f"{js['id']}\t{js['normalized_text']}\n")
     for year in year_to_outfi:
         year_to_outfi[year].close()
@@ -78,17 +87,35 @@ def get_year_partition(input_dir: str, output_dir: str) -> list:
     parser.add_argument("input_dir", help="directory of jsonl")
     parser.add_argument("--tmp_dir", default="simhash-tmp")
     parser.add_argument("--simhash_indexes", help="current simhash indexes")
-    parser.add_argument("--new_simhash_indexes", help="location where updated indexes should be written")
-    parser.add_argument("output_dir", help=("directory where output matches should be written. " "Outputs will be in the form `year`.jsonl"))
+    parser.add_argument(
+        "--new_simhash_indexes", help="location where updated indexes should be written"
+    )
+    parser.add_argument(
+        "output_dir",
+        help=(
+            "directory where output matches should be written. "
+            "Outputs will be in the form `year`.jsonl"
+        ),
+    )
     args = parser.parse_args()
 
     years = get_year_partition(args.input_dir, args.tmp_dir)
     print("running simhash")
     day = datetime.now().strftime("%Y-%m-%d")
    with multiprocessing.Pool() as p:
-        p.starmap(write_sim_strings,
-                  [(os.path.join(args.tmp_dir, year+".tsv"), os.path.join(args.output_dir, f"{year}_{day}.jsonl"),
-                    None if args.simhash_indexes is None else os.path.join(args.simhash_indexes, f"{year}.pkl"),
-                    None if args.new_simhash_indexes is None else os.path.join(args.new_simhash_indexes, f"{year}.pkl"))
-                   for year in years])
+        p.starmap(
+            write_sim_strings,
+            [
+                (
+                    os.path.join(args.tmp_dir, year + ".tsv"),
+                    os.path.join(args.output_dir, f"{year}_{day}.jsonl"),
+                    None
+                    if args.simhash_indexes is None
+                    else os.path.join(args.simhash_indexes, f"{year}.pkl"),
+                    None
+                    if args.new_simhash_indexes is None
+                    else os.path.join(args.new_simhash_indexes, f"{year}.pkl"),
+                )
+                for year in years
+            ],
+        )
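A condensed sketch (hypothetical paths, featurization elided) of the incremental branch of `write_sim_strings` above: when last run's index pickle for a year exists, this run's fingerprints are folded into it and the merged index is written to the new-indexes directory:

import pickle

from my_simhash import Simhash

rows = [line.strip().split("\t") for line in open("simhash-tmp/2020.tsv")]
# The real script hashes get_features(text) rather than the raw string.
objs = [(article_id, Simhash(text)) for article_id, text in rows]

index = pickle.load(open("simhash_indexes/2020.pkl", mode="rb"))
for obj_id, obj in objs:
    index.add(obj_id, obj)  # fold this run's articles into last run's index

pickle.dump(index, open("new_simhash_indexes/2020.pkl", mode="wb"))

The shell wrappers tie this into the Airflow flow: run_simhash_scripts.sh (below) copies the per-year matches into article_pairs, and run_ids_scripts.sh (above) archives the refreshed indexes to GCS and drops the done-file markers.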
" + "Outputs will be in the form `year`.jsonl" + ), + ) args = parser.parse_args() years = get_year_partition(args.input_dir, args.tmp_dir) print("running simhash") day = datetime.now().strftime("%Y-%m-%d") with multiprocessing.Pool() as p: - p.starmap(write_sim_strings, - [(os.path.join(args.tmp_dir, year+".tsv"), os.path.join(args.output_dir, f"{year}_{day}.jsonl"), - None if args.simhash_indexes is None else os.path.join(args.simhash_indexes, f"{year}.pkl"), - None if args.new_simhash_indexes is None else os.path.join(args.new_simhash_indexes, f"{year}.pkl")) - for year in years]) + p.starmap( + write_sim_strings, + [ + ( + os.path.join(args.tmp_dir, year + ".tsv"), + os.path.join(args.output_dir, f"{year}_{day}.jsonl"), + None + if args.simhash_indexes is None + else os.path.join(args.simhash_indexes, f"{year}.pkl"), + None + if args.new_simhash_indexes is None + else os.path.join(args.new_simhash_indexes, f"{year}.pkl"), + ) + for year in years + ], + ) diff --git a/utils/run_simhash_scripts.sh b/utils/run_simhash_scripts.sh new file mode 100644 index 0000000..7b76579 --- /dev/null +++ b/utils/run_simhash_scripts.sh @@ -0,0 +1,7 @@ +cd /mnt/disks/data/run +gsutil rm gs://airflow-data-exchange/article_linkage/tmp/done_files/simhash_is_done +python3 run_simhash.py simhash_input simhash_results --simhash_indexes simhash_indexes --new_simhash_indexes new_simhash_indexes +cp -r article_pairs usable_ids +cp simhash_results/* article_pairs/ +touch simhash_is_done +gsutil cp simhash_is_done gs://airflow-data-exchange/article_linkage/tmp/done_files/ diff --git a/utils/setup.py b/utils/setup.py index 9857d5b..bb15a2a 100644 --- a/utils/setup.py +++ b/utils/setup.py @@ -1,3 +1,3 @@ import setuptools -setuptools.setup(packages=setuptools.find_packages()) \ No newline at end of file +setuptools.setup(packages=setuptools.find_packages())