diff --git a/linkage_dag.py b/linkage_dag.py
index 3335d0d..8c17a6b 100644
--- a/linkage_dag.py
+++ b/linkage_dag.py
@@ -335,6 +335,12 @@
             destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/unlink/data*.jsonl",
             export_format="NEWLINE_DELIMITED_JSON",
         ),
+        BigQueryToGCSOperator(
+            task_id="export_ids_to_drop",
+            source_project_dataset_table=f"{staging_dataset}.ids_to_drop",
+            destination_cloud_storage_uris=f"gs://{bucket}/{tmp_dir}/ids_to_drop/data*.jsonl",
+            export_format="NEWLINE_DELIMITED_JSON",
+        ),
     ]
 
     # Start up godzilla of article linkage, update simhash indexes of title+abstract, run simhash, then create the
@@ -362,6 +368,7 @@
         f"/snap/bin/gsutil -m cp -r gs://{bucket}/{gcs_folder}/simhash_indexes .",
         f"/snap/bin/gsutil -m cp -r gs://{bucket}/{gcs_folder}/simhash_results .",
         f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/unlink .",
+        f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/ids_to_drop .",
         f"/snap/bin/gsutil -m cp -r gs://{bucket}/{tmp_dir}/prev_id_mapping .",
         "mkdir new_simhash_indexes",
     ]
diff --git a/sequences/merge_combined_metadata.tsv b/sequences/merge_combined_metadata.tsv
index f3a2d5f..1bec41c 100644
--- a/sequences/merge_combined_metadata.tsv
+++ b/sequences/merge_combined_metadata.tsv
@@ -3,3 +3,4 @@ metadata_match
 all_match_pairs_with_um
 simhash_input
 lid_input
+ids_to_drop
\ No newline at end of file
diff --git a/sql/ids_to_drop.sql b/sql/ids_to_drop.sql
new file mode 100644
index 0000000..235308e
--- /dev/null
+++ b/sql/ids_to_drop.sql
@@ -0,0 +1,6 @@
+select distinct
+  merged_id
+from
+  literature.sources
+where
+  orig_id in (select id1 from staging_literature.unlink)
\ No newline at end of file
diff --git a/tests/static/test_create_match_keys/ids_to_drop/data.jsonl b/tests/static/test_create_match_keys/ids_to_drop/data.jsonl
new file mode 100644
index 0000000..bae7d7e
--- /dev/null
+++ b/tests/static/test_create_match_keys/ids_to_drop/data.jsonl
@@ -0,0 +1 @@
+{"merged_id": "carticle_0000000003"}
\ No newline at end of file
diff --git a/tests/static/test_create_match_keys/input/input.jsonl b/tests/static/test_create_match_keys/input/input.jsonl
index bd95067..b2effcf 100644
--- a/tests/static/test_create_match_keys/input/input.jsonl
+++ b/tests/static/test_create_match_keys/input/input.jsonl
@@ -3,3 +3,5 @@
 {"orig_id": "D", "merged_id": "carticle_0000000002"}
 {"orig_id": "E", "merged_id": "carticle_0000000002"}
 {"orig_id": "F", "merged_id": "carticle_0000000001"}
+{"orig_id": "I", "merged_id": "carticle_0000000003"}
+{"orig_id": "J", "merged_id": "carticle_0000000003"}
\ No newline at end of file
diff --git a/tests/test_create_merge_ids.py b/tests/test_create_merge_ids.py
index ef6fefc..ee4b8cd 100644
--- a/tests/test_create_merge_ids.py
+++ b/tests/test_create_merge_ids.py
@@ -75,26 +75,32 @@ def test_skip_matches(self):
         )
 
     def test_create_match_keys(self):
-        # the first set will contain two old elts from the same match set and one new elt; should keep its id
-        # the next will contain one elt from one match set, two from another; should change ids
-        # the last will contain only new ids; should get a new id
-        match_sets = [{"A", "B", "C"}, {"D", "E", "F"}, {"G", "H"}]
+        # The first set (A, B, C) contains two old elts from the same match set and one new elt; should keep its id.
+        # The next (D, E, F) contains one elt from one match set, two from another; should change ids.
+        # Another (G, H) contains only new ids; should get a new id.
+        # The last two sets ({I} and {J}) map to the same old merged id, which is listed in ids_to_drop;
+        # each should get a new id (this is the unlinking case).
+        match_sets = [{"A", "B", "C"}, {"D", "E", "F"}, {"G", "H"}, {"I"}, {"J"}]
         out_dir = os.path.join(static_dir, "test_create_match_keys", "output")
         if os.path.exists(out_dir):
             shutil.rmtree(out_dir)
         os.mkdir(out_dir)
         out_fi = os.path.join(out_dir, "output.jsonl")
         id_mapping_dir = os.path.join(static_dir, "test_create_match_keys", "input")
+        ids_to_drop = os.path.join(static_dir, "test_create_match_keys", "ids_to_drop")
         expected_output = [
             {"orig_id": "A", "merged_id": "carticle_0000000001"},
             {"orig_id": "B", "merged_id": "carticle_0000000001"},
             {"orig_id": "C", "merged_id": "carticle_0000000001"},
-            {"orig_id": "D", "merged_id": "carticle_0000000003"},
-            {"orig_id": "E", "merged_id": "carticle_0000000003"},
-            {"orig_id": "F", "merged_id": "carticle_0000000003"},
-            {"orig_id": "G", "merged_id": "carticle_0000000004"},
-            {"orig_id": "H", "merged_id": "carticle_0000000004"},
+            {"orig_id": "D", "merged_id": "carticle_0000000004"},
+            {"orig_id": "E", "merged_id": "carticle_0000000004"},
+            {"orig_id": "F", "merged_id": "carticle_0000000004"},
+            {"orig_id": "G", "merged_id": "carticle_0000000005"},
+            {"orig_id": "H", "merged_id": "carticle_0000000005"},
+            {"orig_id": "I", "merged_id": "carticle_0000000006"},
+            {"orig_id": "J", "merged_id": "carticle_0000000007"},
         ]
-        create_match_keys(match_sets, out_fi, id_mapping_dir)
+        print(expected_output)
+        create_match_keys(match_sets, out_fi, ids_to_drop, id_mapping_dir)
         out = [json.loads(x) for x in open(out_fi).readlines()]
         self.assertEqual(expected_output, sorted(out, key=lambda x: x["orig_id"]))
diff --git a/utils/create_merge_ids.py b/utils/create_merge_ids.py
index 77c2dc6..1680873 100644
--- a/utils/create_merge_ids.py
+++ b/utils/create_merge_ids.py
@@ -128,13 +128,14 @@ def create_match_sets(
 
 
 def create_match_keys(
-    match_sets: list, match_file: str, prev_id_mapping_dir: str = None
+    match_sets: list, match_file: str, ids_to_drop: str, prev_id_mapping_dir: str = None
 ):
     """
     Given a match set, creates an id for that match set, and writes out a jsonl mapping each article
     in the match set to that id
     :param match_sets: list of match sets
     :param match_file: file where id mapping should be written
+    :param ids_to_drop: dir containing jsonl files of merged ids that should not be reused
     :param prev_id_mapping_dir: optional dir containing previous id mappings in jsonl form
     :return: None
     """
@@ -152,6 +153,12 @@ def create_match_keys(
                 prev_orig_to_merg[orig_id] = merg_id
                 if merg_id > max_merg:
                     max_merg = merg_id
+    ignore_ids = set()
+    for fi in os.listdir(ids_to_drop):
+        with open(os.path.join(ids_to_drop, fi)) as f:
+            for line in f:
+                js = json.loads(line.strip())
+                ignore_ids.add(js["merged_id"])
     match_id = int(max_merg.split("carticle_")[1]) + 1
     num_new, num_old = 0, 0
     for ms in match_sets:
@@ -162,7 +169,7 @@ def create_match_keys(
             existing_ids = set(
                 [prev_orig_to_merg[m] for m in ms if m in prev_orig_to_merg]
             )
-            if len(existing_ids) == 1:
+            if len(existing_ids) == 1 and list(existing_ids)[0] not in ignore_ids:
                 cset_article_id = existing_ids.pop()
                 num_old += 1
             else:
@@ -189,6 +196,11 @@ def create_match_keys(
         required=True,
         help="directory of article pairs that should not be matched",
     )
+    parser.add_argument(
+        "--ids_to_drop",
+        required=True,
+        help="directory of jsonl files containing merged ids that should not be reused",
+    )
     parser.add_argument(
be written" ) @@ -209,4 +221,4 @@ def create_match_keys( match_sets = create_match_sets( args.match_dir, args.current_ids_dir, args.exclude_dir ) - create_match_keys(match_sets, args.merge_file, args.prev_id_mapping_dir) + create_match_keys(match_sets, args.merge_file, args.ids_to_drop, args.prev_id_mapping_dir) diff --git a/utils/run_ids_scripts.sh b/utils/run_ids_scripts.sh index 0660c5c..59a0db8 100644 --- a/utils/run_ids_scripts.sh +++ b/utils/run_ids_scripts.sh @@ -1,6 +1,6 @@ cd /mnt/disks/data/run gsutil rm gs://airflow-data-exchange/article_linkage/tmp/done_files/ids_are_done -python3 create_merge_ids.py --match_dir usable_ids --exclude_dir unlink --prev_id_mapping_dir prev_id_mapping --merge_file id_mapping.jsonl --current_ids_dir article_pairs +python3 create_merge_ids.py --match_dir usable_ids --exclude_dir unlink --ids_to_drop ids_to_drop --prev_id_mapping_dir prev_id_mapping --merge_file id_mapping.jsonl --current_ids_dir article_pairs /snap/bin/gsutil -m cp id_mapping.jsonl gs://airflow-data-exchange/article_linkage/tmp/ /snap/bin/gsutil -m cp simhash_results/* gs://airflow-data-exchange/article_linkage/simhash_results/ /snap/bin/gsutil -m cp new_simhash_indexes/* gs://airflow-data-exchange/article_linkage/simhash_indexes/