"""
Creates match sets from pairs of linked articles, assigns each match set an id, and writes out a mapping from
each id to each article in its match set.
"""
import argparse
import json
import multiprocessing
import os
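
# A sketch of the expected inputs, inferred from the readers below: files in the match and exclude
# directories are jsonls whose lines look like {"id1": "<orig_id>", "id2": "<orig_id>"}; files in the
# previous-id-mapping and ids-to-drop directories are jsonls whose lines contain
# {"orig_id": "<orig_id>", "merged_id": "carticle_..."} (ids_to_drop only needs the "merged_id" field).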


def create_cset_article_id(idx: int):
    """
    Create CSET article id, e.g. carticle_0000000001
    :param idx: article number
    :return: string in the form carticle_0000000001
    """
    zero_padding = "0" * (10 - len(str(idx)))
    return f"carticle_{zero_padding}{idx}"


def get_connected_articles(adj_list: dict, key: str) -> set:
    """
    Given a dict where a key-value pair corresponds to an article match and a particular article `key`,
    returns a set of articles matched to `key`.
    :param adj_list: a dict of key-value pairs corresponding to matched articles
    :param key: an article to match in `adj_list`
    :return: a set of matched articles
    """
    conn_articles = {key}
    # copy so we don't mutate the caller's adjacency list while traversing
    to_explore = set(adj_list[key])
    while len(to_explore) > 0:
        v = to_explore.pop()
        if v not in conn_articles:
            conn_articles.add(v)
            to_explore = to_explore.union(
                {k for k in adj_list[v] if k not in conn_articles}
            )
    return conn_articles
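
# For example (illustrative only): with adj_list = {"a": {"b"}, "b": {"a", "c"}, "c": {"b"}, "d": set()},
# get_connected_articles(adj_list, "a") returns {"a", "b", "c"}, and get_connected_articles(adj_list, "d")
# returns {"d"}.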


def get_exclude_matches(exclude_dir: str) -> dict:
    """
    Build dict mapping ids to sets of other ids they should not be matched to
    :param exclude_dir: directory of jsonl files containing article pairs that should not be matched together
    :return: dict mapping an id to a set of ids that are not valid matches
    """
    dont_match = {}
    if not exclude_dir:
        return dont_match
    for fi in os.listdir(exclude_dir):
        with open(os.path.join(exclude_dir, fi)) as f:
            for line in f:
                js = json.loads(line)
                if js["id1"] not in dont_match:
                    dont_match[js["id1"]] = set()
                if js["id2"] not in dont_match:
                    dont_match[js["id2"]] = set()
                dont_match[js["id1"]].add(js["id2"])
                dont_match[js["id2"]].add(js["id1"])
    return dont_match
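
# Illustrative: a single exclude line {"id1": "A", "id2": "B"} produces {"A": {"B"}, "B": {"A"}},
# so neither direction of that pair will be added to the adjacency list below.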


def create_match_sets(match_dir: str, exclude_dir: str = None) -> list:
    """
    Given a directory of exported jsonl files containing article matches, generates a list of sets of matched
    articles, including "transitive matches".
    :param match_dir: directory of jsonls containing matched orig_ids from exact metadata match
    :param exclude_dir: directory of jsonl files containing article pairs that should not be matched together
    :return: list of sets of matched articles
    """
    print("reading pairs to not match")
    dont_match = get_exclude_matches(exclude_dir)
    print("getting adjacency lists")
    adj_list = {}
    for fi in os.listdir(match_dir):
        with open(os.path.join(match_dir, fi)) as f:
            for line in f:
                js = json.loads(line)
                key1 = js["id1"]
                key2 = js["id2"]
                if key1 not in adj_list:
                    adj_list[key1] = set()
                if key2 not in dont_match.get(key1, set()):
                    adj_list[key1].add(key2)
                # even if we're in a scenario where (according to a changed metric) A matches B but B doesn't
                # match A, this will ensure they get added to the same match set
                if key2 not in adj_list:
                    adj_list[key2] = set()
                if key1 not in dont_match.get(key2, set()):
                    adj_list[key2].add(key1)
    print("getting connected articles")
    seen_ids = set()
    match_sets = []
    for k in adj_list.keys():
        if k in seen_ids:
            continue
        # grab every connected article
        match_set = get_connected_articles(adj_list, k)
        for matched_key in match_set:
            seen_ids.add(matched_key)
        match_sets.append(match_set)
    return match_sets
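
# Illustrative: if the match files contain the pairs ("A", "B") and ("B", "C"), the transitive closure puts
# all three articles in one match set, {"A", "B", "C"}, unless one of those pairs also appears in exclude_dir.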


def create_matches(
    match_sets: list, ids_to_drop: str, prev_id_mapping_dir: str = None
) -> iter:
    """
    Given a list of match sets, creates an id for each match set and yields batches of jsons mapping each
    article in a match set to its id
    :param match_sets: list of match sets
    :param ids_to_drop: directory of jsonl files containing merged ids that should not be reused
    :param prev_id_mapping_dir: optional dir containing previous id mappings in jsonl form
    :return: a generator of tuples with two elements: a list of jsons containing orig_id, merged_id matches to be
    written, and an identifier for the batch
    """
    prev_orig_to_merg = {}
    merg_to_orig = {}
    max_merg = "carticle_0"
    if prev_id_mapping_dir is not None:
        for fi in os.listdir(prev_id_mapping_dir):
            with open(os.path.join(prev_id_mapping_dir, fi)) as f:
                for line in f:
                    js = json.loads(line.strip())
                    orig_id = js["orig_id"]
                    merg_id = js["merged_id"]
                    assert orig_id not in prev_orig_to_merg
                    prev_orig_to_merg[orig_id] = merg_id
                    if merg_id not in merg_to_orig:
                        merg_to_orig[merg_id] = set()
                    merg_to_orig[merg_id].add(orig_id)
                    if merg_id > max_merg:
                        max_merg = merg_id
    ignore_ids = set()
    for fi in os.listdir(ids_to_drop):
        with open(os.path.join(ids_to_drop, fi)) as f:
            for line in f:
                js = json.loads(line.strip())
                ignore_ids.add(js["merged_id"])
    match_id = int(max_merg.split("carticle_")[1]) + 1
    batch_size = 1_000_000
    batch_count = 0
    batch = []
    for ms in match_sets:
        cset_article_id = None
        # if we have exactly one existing id, reuse it, even if new articles are matched to it.
        # if two articles that previously had different carticle ids are now in the same match set,
        # create a new carticle id
        existing_ids = {prev_orig_to_merg[m] for m in ms if m in prev_orig_to_merg}
        if len(existing_ids) == 1 and list(existing_ids)[0] not in ignore_ids:
            cset_article_id = existing_ids.pop()
        # In some cases, merged ids can "split apart", if their constituent articles no longer
        # match. We'll detect this case by checking whether the old set of articles assigned to
        # this merged id contains any entries missing from our current set
        if (cset_article_id and (len(merg_to_orig[cset_article_id] - set(ms)) > 0)) or (
            not cset_article_id
        ):
            cset_article_id = create_cset_article_id(match_id)
            match_id += 1
        for article in ms:
            match = {"merged_id": cset_article_id, "orig_id": article}
            if len(batch) == batch_size:
                yield batch, batch_count
                batch = [match]
                batch_count += 1
            else:
                batch.append(match)
    yield batch, batch_count
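
# Illustrative example of the id-reuse rules above: if {"A", "B"} previously mapped to carticle_0000000001
# and the new match set is {"A", "B", "C"}, the set keeps carticle_0000000001. If the new match set is just
# {"A"} (the old id "split apart"), or if "A" and "B" arrive with two different previous ids, a fresh id is
# minted instead.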


def write_batch(match_batch_with_output_dir: tuple) -> None:
    """
    Write a batch of matches to disk
    :param match_batch_with_output_dir: tuple of (a tuple containing a list of jsons containing a merged id and
    orig id, and an identifier for the batch), and a directory where matches should be written
    :return: None
    """
    match_batch, output_dir = match_batch_with_output_dir
    matches, batch_id = match_batch
    with open(os.path.join(output_dir, f"matches_{batch_id}.jsonl"), "w") as f:
        for match in matches:
            f.write(json.dumps(match) + "\n")


def write_matches(
    exact_match_dir,
    exclude_dir,
    ids_to_drop,
    prev_id_mapping_dir,
    output_dir,
) -> None:
    """
    Generate merged id-orig id pairs and write them out as a directory of jsonls
    :param exact_match_dir: directory of jsonls containing matched orig_ids from exact metadata match
    :param exclude_dir: directory of jsonls containing article pairs that should not be matched
    :param ids_to_drop: directory of jsonls containing merged ids that should not be reused
    :param prev_id_mapping_dir: directory of jsonls containing previous mapping between orig ids and merged ids
    :param output_dir: directory where jsonls containing new mappings between orig ids and merged ids should be
    written
    :return: None
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    with multiprocessing.Pool() as p:
        match_sets = create_match_sets(exact_match_dir, exclude_dir)
        match_batches = create_matches(match_sets, ids_to_drop, prev_id_mapping_dir)
        output_batches = ((mb, output_dir) for mb in match_batches)
        list(p.imap(write_batch, output_batches))
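
# Example invocation (directory names here are hypothetical placeholders):
#   python3 create_merge_ids.py --exact_match_dir exact_matches --exclude_dir exclude \
#     --ids_to_drop ids_to_drop --prev_id_mapping_dir prev_id_mapping --output_dir new_id_mappings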


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--exact_match_dir",
        required=True,
        help="directory of jsonls containing matched orig_ids from exact metadata match",
    )
    parser.add_argument(
        "--exclude_dir",
        required=True,
        help="directory of jsonls containing article pairs that should not be matched",
    )
    parser.add_argument(
        "--ids_to_drop",
        required=True,
        help="directory of jsonls containing merged ids that should not be reused",
    )
    parser.add_argument(
        "--prev_id_mapping_dir",
        help="directory of jsonls containing previous mapping between orig ids and merged ids",
    )
    parser.add_argument(
        "--output_dir",
        required=True,
        help="directory where jsonls containing new mappings between orig ids and "
        "merged ids should be written",
    )
    args = parser.parse_args()

    write_matches(
        args.exact_match_dir,
        args.exclude_dir,
        args.ids_to_drop,
        args.prev_id_mapping_dir,
        args.output_dir,
    )