"""
Collection of functions to parse XML files
==> Avoid code duplication & others
"""
import xmltodict
import json
import os, sys
from collections import Counter, OrderedDict
import numpy as np
import pandas as pd
import time, datetime
import re
from bidict import bidict # bidirectional dictionary - allows for looking up key from value
### Read/Write JSON
def get_xml_as_dict(filepath: str):
    """Read an XML file into a nested dict (via xmltodict)."""
    with open(filepath) as in_file:
        xml = in_file.read()
        d = xmltodict.parse(xml)
        return d


def get_json_as_dict(filepath: str):
    """Read a JSON file into a dict."""
    with open(filepath) as in_file:
        d = json.load(in_file)
        return d


def dump_json(d: dict, filepath: str):
    """Write a dict to a JSON file."""
    with open(filepath, 'w') as out_file:
        json.dump(d, out_file)
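
# Minimal round-trip sketch (the file names are illustrative, not from this repo):
#     d = get_xml_as_dict("transcript.xml")   # nested dict mirroring the XML tree
#     dump_json(d, "transcript.json")
#     d = get_json_as_dict("transcript.json")
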
### Parse XML
def parse_w(d: dict, replace_name=False):
    """
    Input:
    -------
    d: dict
        data inside a text tag (w)
    replace_name: bool
        whether to replace the parent/child name with a specific tag (default False; currently unused)

    Output:
    -------
    loc: int or None
        position of the word in the utterance (None when it cannot be determined)
    word: str
    lemma: str
    pos: str
    is_shortened: bool
    """
    kys = list(d.keys())
    word = d["#text"]
    lemma = ""
    pos = ""
    if "@untranscribed" in kys:  # currently not taken into account
        loc = 0
    elif "mor" in kys:  # @index starts at 1
        loc = int(d["mor"]["gra"]["@index"]) - 1
        try:
            lemma = d["mor"]["mw"]["stem"]
            pos = "_".join(list(d["mor"]["mw"]["pos"].values()))
        except KeyError as e:
            if str(e) == "'mw'":  # sometimes mw is a list - compound words such as "butterfly", "raincoat"...
                # in this case, mwc only contains the whole pos, but mw is a list with individual pos and stems
                lemma = "".join([x["stem"] for x in d["mor"]["mwc"]["mw"]])
                pos = "_".join(list(d["mor"]["mwc"]["pos"].values()))
        if "mor-post" in d["mor"].keys():  # can be a list too
            if isinstance(d["mor"]["mor-post"], list):
                lemma += " " + " ".join([mp_x["mw"]["stem"] for mp_x in d["mor"]["mor-post"]])
                pos += " " + " ".join(["_".join(list(mp_x["mw"]["pos"].values())) for mp_x in d["mor"]["mor-post"]])
            else:  # OrderedDict
                lemma += " " + d["mor"]["mor-post"]["mw"]["stem"]
                pos += " " + "_".join(list(d["mor"]["mor-post"]["mw"]["pos"].values()))
    elif "@type" in kys and d["@type"] == "fragment":
        # TODO: see u327 - cannot be taken into account
        loc = None
    elif "@type" in kys and d["@type"] == "filler":
        loc = None
    else:
        loc = None
    is_shortened = ("shortening" in kys)
    return loc, word, lemma, pos, is_shortened
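
# Hedged example of parse_w on a hand-built dict mimicking xmltodict's output
# for a <w> element (field values are illustrative, not from a real transcript):
#     w = {"#text": "doggy", "mor": {"gra": {"@index": "2"},
#                                    "mw": {"stem": "doggy", "pos": {"c": "n"}}}}
#     parse_w(w)  # ==> (1, 'doggy', 'doggy', 'n', False)
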
def missing_position(d: dict):  # TODO: see u258
    # min is supposed to be 0 and max is supposed to be len(d) - 1
    if len(d) == 0:
        return [0]
    else:
        mx = max(d.keys())
        # missing ranks, plus the next free position (mirrors the [0] case above)
        return sorted(list(set(range(0, mx + 1)) - set(d.keys()))) + [mx + 1]
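
# Example: with word indices {0, 1, 3} filled, position 2 is missing and the
# next free slot is 4:
#     missing_position({0: 'a', 1: 'b', 3: 'c'})  # ==> [2, 4]
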
def age_months(s: str) -> int:
    """Age is stored in formats such as "P1Y08M" or "P1Y01M14D" (or just "P1Y"); return the age in months.

    Input:
    -------
    s: `str`
        formatted age in the raw data

    Output:
    -------
    age: `int`
    """
    pat = re.compile("^P([0-9]{1,2})Y([0-9]{2})M")
    try:
        age = re.findall(pat, s)[0]
        age = int(age[0]) * 12 + int(age[1])
    except IndexError:  # no month field, e.g. "P1Y"
        pat = re.compile("^P([0-9]{1,2})Y")
        age = re.findall(pat, s)[0]
        age = int(age) * 12
    return age
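
# Examples (formats taken from the docstring above):
#     age_months("P1Y08M")     # ==> 20
#     age_months("P2Y11M10D")  # ==> 35 (days are ignored)
#     age_months("P3Y")        # ==> 36 (no month field)
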
def adapt_punct(s: str) -> str:
    """Add a space before a punctuation group (==> tokens) if the punctuation is ? ! .
    """
    return re.sub(re.compile("([a-z]+)([.?!]+)"), r'\1 \2', s)
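
# Example:
#     adapt_punct("that's a doggy.")  # ==> "that's a doggy ."
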
def parse_xml(d: dict):
    """
    Input:
    -------
    d: dict
        JSON-like data read from a CHILDES interaction XML file

    Output:
    -------
    new_shape: dict
        JSON structure similar to Datcha JSON
    lines: list of dict
        main data to be written
    errors: list
        ids of utterances generating errors (unsolved patterns in parse_w)
    n_prop: list
        collected proper nouns (pos == 'n_prop')
    """
    punct = {
        'p': '.', 'q': '?', 'trail off': '...', 'e': '!', 'interruption': '+/',
        "interruption question": '+/?',
        "quotation next line": '',
        "quotation precedes": '',
        "trail off question": '...?',
        "comma": ',',
        "broken for coding": '',
        "self interruption": '-'
    }
    new_shape = {"header": {}, "annotation": {}, "documents": []}  # JSON
    lines = []
    errors = []
for k,v in d["CHAT"].items():
if k[0] == '@':
new_shape["header"][k] = v
# storing participant
for locutor in d["CHAT"]["Participants"]["participant"]:
if locutor["@id"] == "CHI":
new_shape["header"]["target_child"] = {
'name': locutor["@name"] if "@name" in locutor.keys() else "Unknown",
'age': age_months(locutor["@age"]) if "@age" in locutor.keys() else 0
}
if "@language" in locutor.keys():
new_shape["header"]['language'] = locutor["@language"]
# storing annotator
for cmt in (d["CHAT"]["comment"] if isinstance(d["CHAT"]["comment"], list) else [d["CHAT"]["comment"]]):
if cmt["@type"] == "Transcriber":
new_shape["header"]["transcriber"] = cmt["#text"]
# counter for names
n_prop = []
for utterance in d["CHAT"]["u"]:
#print(utterance["@uID"])
doc = {"by": utterance["@who"], "id": utterance["@uID"][1:], "tokens":[], "segments":{}}
# words
l_words = {}
l_lemmas = {}
l_pos = {}
ut_keys = utterance.keys()
for key in ut_keys:
if key == "w":
for w_word in (utterance["w"] if type(utterance["w"]) == list else [utterance["w"]]): # str or dict/OrderedDict transformed
if isinstance(w_word, str):
loc = 1 if (len(l_words) == 0) else (max(l_words.keys())+1)
l_words[loc] = w_word
elif isinstance(w_word, dict) or isinstance(w_word, OrderedDict):
# if the word has a location, it can replace words with _no_ location.
loc, word, lemma, pos, _ = parse_w(w_word) # is_shortened not used rn
if loc is not None:
l_words[loc] = word
l_lemmas[loc] = lemma
l_pos[loc] = pos
if pos == 'n_prop':
n_prop.append(word)
else:
errors.append(utterance["@uID"])
if key == "g":
l_g = (utterance["g"] if isinstance(utterance["g"], list) else [utterance["g"]])
for utter_g in l_g:
# no respect of order
if ("g" in utter_g.keys()): # nested g ==> take into account later
l_g += utter_g["g"] if isinstance(utter_g["g"], list) else [utter_g["g"]]
if ("w" in utter_g.keys()): # nested w
utter_gw = utter_g["w"] if isinstance(utter_g["w"], list) else [utter_g["w"]]
for w_word in utter_gw:
if isinstance(w_word, str): # TODO: check place in sentence (could be overwritten)
loc = 1 if (len(l_words) == 0) else (max(l_words.keys())+1)
l_words[loc] = w_word
else:
loc, word, lemma, pos, _ = parse_w(w_word) # is_shortened not used rn
if loc is not None:
l_words[loc] = word
l_lemmas[loc] = lemma
l_pos[loc] = pos
if pos == 'n_prop':
n_prop.append(word)
else:
errors.append(utterance["@uID"])
if key == "a": # either dict, list of non existent
for l in (utterance["a"] if type(utterance["a"]) == list else [utterance["a"]]):
if l["@type"] == "time stamp":
doc["time"] = l["#text"]
elif l["@type"] == "speech act":
# warning: l["#text"] == TAG is not necessary clean
try:
tag = l["#text"].upper().strip().replace('0', 'O').replace(';',':').replace('-',':')
tag = tag.replace('|','') # extra pipe found
except:
print("\tTag Error:", l["#text"], utterance["@uID"])
if tag[:2] == '$ ':
tag = tag[2:]
doc["segments"]["label"] = tag
elif l["@type"] == "gesture":
doc["segments"]["action"] = l["#text"]
elif l["@type"] == "action":
doc["segments"]["action"] = l["#text"]
elif l["@type"] == "actions": # same as previous :|
doc["segments"]["action"] = l["#text"]
# translations
elif l["@type"] == "english translation":
doc["segments"]["translation"] = adapt_punct(l["#text"])
if key == "t" or key == "tagMarker":
# either punctuation location is specified or is added when it appears in the sentence
pct = punct[utterance["t"]["@type"]]
if ("mor" in utterance["t"].keys()) and ("gra" in utterance["t"]["mor"].keys()) and (utterance["t"]["mor"]["gra"]["@relation"] == "PUNCT"):
loc = int(utterance["t"]["mor"]["gra"]["@index"]) -1
l_words[loc] = pct
l_lemmas[loc] = pct
else:
# TODO append to rest of the sentence
loc = 1 if (len(l_words) == 0) else (max(l_words.keys())+1)
l_words[loc] = pct
        # Once the utterance has been cleared: create the list of tokens
        # TODO: before doing that, check that all ranks are accounted for
        for i, k in enumerate(sorted(list(l_words.keys()))):
            doc["tokens"].append({
                "id": i,
                "word": l_words[k],
                "lemma": None if k not in l_lemmas.keys() else l_lemmas[k],
                "pos": None if k not in l_pos.keys() else l_pos[k],
            })
        sentence = " ".join([x["word"] for x in doc["tokens"]])
        doc["segments"]["end"] = len(sentence.split(' '))
        doc["segments"]["sentence"] = sentence
        doc["segments"]["lemmas"] = " ".join([x["lemma"] for x in doc["tokens"] if x["lemma"] is not None])
        doc["segments"]["pos"] = " ".join([x["pos"] for x in doc["tokens"] if x["pos"] is not None])
        # split tags
        if "label" in doc["segments"].keys():
            doc["segments"]["label_int"] = select_tag(doc["segments"]["label"], keep_part='first')
            doc["segments"]["label_illoc"] = select_tag(doc["segments"]["label"], keep_part='second')
            doc["segments"]["label_ilcat"] = select_tag(doc["segments"]["label"], keep_part='adapt_second')
        else:
            doc["segments"]["label"] = None
            doc["segments"]["label_int"] = None
            doc["segments"]["label_illoc"] = None
            doc["segments"]["label_ilcat"] = None
        # add to json
        new_shape['documents'].append(doc)
        # add to tsv output
        line = format_line(doc)
        lines.append(line)
    return new_shape, lines, errors, n_prop
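
# End-to-end usage sketch (the file names are illustrative; parse_xml expects
# the dict produced by get_xml_as_dict on a CHILDES CHAT-XML transcript):
#     d = get_xml_as_dict("transcript.xml")
#     new_shape, lines, errors, n_prop = parse_xml(d)
#     dump_json(new_shape, "transcript.json")
#     pd.DataFrame(lines).to_csv("transcript.tsv", sep="\t", index=False)
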
### Tag modification
ILLOC = pd.read_csv('illocutionary_force_code.csv', sep=' ', header=0, keep_default_na=False).set_index('Code')


def select_tag(s: str, keep_part='all'):
    if s[:2] == '$ ':  # some tags have errors
        s = s[2:]
    if keep_part == 'all':
        return s.strip()
    # tag must start with '$'; otherwise remove the space.
    # split on ' ' if there is more than one tag - keep the first
    s = s.strip().replace('$', '').split(' ')[0]
    if len(s) == 5:
        s = s[:3] + ':' + s[3:]  # a few instances in Gaeltacht of unsplit tags
    l = s.split(':')
    if keep_part == 'first':  # aka 'interchange'
        return check_interchange(l[0])
    elif keep_part == 'second':  # aka 'illocutionary'
        return None if len(l) < 2 else check_illocutionary(l[1])
    else:  # keep_part == 'adapt_second', aka illocutionary category
        return None if len(l) < 2 else adapt_tag(check_illocutionary(l[1]))
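
# Hedged examples with a well-formed tag and a 5-character unsplit tag:
#     select_tag("$DJF:ST", keep_part='first')   # ==> 'DJF' (interchange)
#     select_tag("$DJF:ST", keep_part='second')  # ==> 'ST'  (illocutionary)
#     select_tag("DJFST", keep_part='second')    # ==> 'ST'  (colon re-inserted)
# keep_part='adapt_second' additionally maps the illocutionary tag to its
# category via the ILLOC table loaded above.
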
def adapt_tag(s: str):
    return None if s not in ILLOC.index.tolist() else ILLOC.loc[s]['Name'][:3].upper()


def check_interchange(tag: str):
    int_errors = {
        "DJ6F": "DJF", "DCCA": "DCC", "RN": None,
        'D': None, 'DJFA': "DJF", 'DCJF': "DJF", 'DNIA': "NIA",
        'YY': "YYY", 'DCCC': "DCC", 'DDJF': "DJF", 'DC': "DCC", "SDS": "DSS"
    }
    if tag in int_errors.keys():
        return int_errors[tag]
    return tag


def check_illocutionary(tag: str):
    il_errors = {"AS": "SA", "CTP": "CT"}
    if tag in il_errors.keys():
        return il_errors[tag]
    return tag


def dataset_labels(dataname: str, add_empty_labels: bool = False) -> bidict:
    """For a given tag scheme, return all possible labels; the order is used to index labels in the data.

    Input:
    -------
    dataname: `str`
        column name, must be one of `SPA_1`, `SPA_2`, `SPA_2A`

    Output:
    -------
    b: `bidict`
        dictionary `{label: index}` to be used to transform the data
    """
    if dataname == "SPA_1":
        labels = ['DRP', 'DSS', 'NCS', 'NFA', 'NIN', 'NMA', 'PRO', 'PSS', 'SAT', 'TXT', 'OOO', 'YYY', 'MRK', 'NIA', 'CMO', 'DCA', 'DFW', 'DHA', 'DHS', 'DRE', 'DCC', 'DJF', 'DNP']
    elif dataname == "SPA_2":
        labels = ["AC", "AD", "AL", "CL", "CS", "DR", "GI", "GR", "RD", "RP", "RQ", "SS", "WD", "CX", "EA", "EI", "EC", "EX", "RT", "SC", "FP", "PA", "PD", "PF", "SI", "TD", "DC", "DP", "ND", "YD", "CM", "EM", "EN", "ES", "MK", "TO", "XA", "AP", "CN", "DW", "ST", "WS", "AQ", "AA", "AN", "EQ", "NA", "QA", "QN", "RA", "SA", "TA", "TQ", "YQ", "YA", "PR", "TX", "AB", "CR", "DS", "ED", "ET", "PM", "RR", "CT", "YY", "OO"]
    elif dataname == "SPA_2A":
        labels = ['DIR', 'SPE', 'VOC', 'STA', 'QUE', 'TEX', 'MAR', 'PER', 'COM', 'EVA', 'DEC', 'DEM']
    else:
        raise RuntimeError(f"Unknown indexer: {dataname}")
    if add_empty_labels:
        labels.append("NOL")  # no label for this sentence
        labels.append("NAT")  # not a valid tag
        labels.append("NEE")  # not enough examples
    return bidict({label: i for i, label in enumerate(labels)})
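
# Example: a bidict allows lookup in both directions (label -> index and back):
#     b = dataset_labels("SPA_2A")
#     b["DIR"]      # ==> 0
#     b.inverse[0]  # ==> 'DIR'
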
def check_tag_pattern(data_path: str) -> str:
    """Deduce the tag column from the dataset name."""
    j = re.compile('spa_[0-9]{1}[a]{0,1}')
    pat = re.findall(j, data_path)
    if len(pat) > 0:
        return pat[0]
    return None
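
# Example (hypothetical dataset path):
#     check_tag_pattern("data/spa_2a_train.tsv")  # ==> 'spa_2a'
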
#### name_change
def replace_pnoun(word):
    """Replace a parent's or child's first name with a generic speaker tag."""
    parents = ['Mommy', 'Mom', 'Daddy', 'Mama', 'Momma', 'Ma', 'Mummy', 'Papa']
    children = ['Sarah', 'Bryce', 'James', 'Colin', 'Liam', 'Christina', 'Elena', 'Christopher', 'Matthew', 'Margaret', 'Corrina', 'Michael', 'Erin', 'Kate', 'Zachary', 'Andrew', 'John', 'David', 'Jamie', 'Erica', 'Nathan', 'Max', 'Abigail', 'Sara', 'Jenessa', 'Benjamin', 'Rory', 'Amanda', 'Alexandra', 'Daniel', 'Norman', 'Lindsay', 'Rachel', 'Paula', 'Zackary', 'Kristen', 'Joanna', 'Laura', 'Meghan', 'Krystal', 'Elana', 'Anne', 'Elizabeth', 'Chi', 'Corinna', 'Eleanora', 'Laurie']  # first names - full
    children += ['Maggie', 'Zack', 'Brycie', 'Chrissie', 'Zach', 'Annie', 'El', 'Dan', 'Matt', 'Matty', 'Johnny', 'Mika', 'Elly', 'Micha', 'Mikey', 'Mickey', 'Chrissy', 'Chris', 'Abbie', 'Lexy', 'Meg', 'Andy', 'Liz', 'Mike', 'Abby', 'Danny', 'Col', 'Kryst', 'Ben']  # nicknames
    if word in parents:
        return '__MOT__'
    if word in children:
        return '__CHI__'
    return word
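
# Examples:
#     replace_pnoun("Mommy")  # ==> '__MOT__'
#     replace_pnoun("Sarah")  # ==> '__CHI__'
#     replace_pnoun("table")  # ==> 'table' (unchanged)
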
#### different line formats
def update_line_format(line_format):
    if isinstance(line_format, str) and line_format == "default_daad":
        line_format = ["spa_all", "utterance", "time_stamp", "speaker", "sentence"]
    elif isinstance(line_format, str) and line_format == "extended_daad":
        line_format = ["spa_all", "utterance", "time_stamp", "speaker", "sentence", "lemmas", "pos", "not_continuous"]
    elif isinstance(line_format, str) or not isinstance(line_format, list):  # unknown string or other type
        raise ValueError("line_format should be a str with value 'default_daad' or 'extended_daad', or a list")
    return line_format


def format_line(document):
    """
    Input:
    -------
    document: dict
        JSON data for the given utterance

    Output:
    -------
    locations: dict
        mapping from column name to value for one output line
    """
    locations = {
        "utterance": document["id"],
        "spa_all": document["segments"]["label"],
        "spa_1": document["segments"]["label_int"],
        "spa_2": document["segments"]["label_illoc"],
        "spa_2a": document["segments"]["label_ilcat"],
        "time_stamp": None if 'time' not in document.keys() else document["time"],
        "speaker": document["by"],
        "sentence": document["segments"]["sentence"],
        "lemmas": document["segments"]["lemmas"],
        "pos": document["segments"]["pos"],
        "translation": None if 'translation' not in document["segments"].keys() else document["segments"]["translation"],
        "action": None if 'action' not in document["segments"].keys() else document["segments"]["action"]
    }
    return locations
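

if __name__ == "__main__":
    # Lightweight smoke test of the pure-string helpers above - a minimal sketch;
    # the XML pipeline itself needs a real CHILDES file, and importing this module
    # already requires 'illocutionary_force_code.csv' for the ILLOC table.
    assert age_months("P1Y08M") == 20
    assert adapt_punct("are you there?") == "are you there ?"
    assert replace_pnoun("Mommy") == "__MOT__"
    labels = dataset_labels("SPA_2A")
    assert labels["DIR"] == 0 and labels.inverse[0] == "DIR"
    assert check_tag_pattern("data/spa_2a_train.tsv") == "spa_2a"
    print("utils.py smoke test passed")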