forked from p2/ClinicalTrialsNLP
-
Notifications
You must be signed in to change notification settings - Fork 8
/
nlp.py
206 lines (158 loc) · 5.74 KB
/
nlp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
#!/usr/bin/env python
#
# cTAKES and RegEx wizardry
#
# 2012-12-14 Created by Pascal Pfiffner
#
import os
import re
import logging
class NLPProcessing(object):
    """ Abstract base class for handling NLP pipelines.

    The public methods (`run`, `write_input`, `parse_output`) make sure
    `prepare()` has been executed once and then delegate to the
    underscore-prefixed hooks, which subclasses are expected to override.
    """

    def __init__(self):
        self.name = 'nlp'           # used in error messages
        self.bin = '.'              # not read by this base class
        self.root = None            # working directory, see `set_relative_root`
        self.cleanup = True         # not read by this base class
        self.did_prepare = False    # flipped to True by `prepare()`

    # -------------------------------------------------------------------------- Preparations
    def set_relative_root(self, directory):
        """ Stores the absolute path of `directory` as root directory; falls
        back to the current working directory if `directory` is None. """
        self.root = os.path.abspath(directory if directory is not None else '.')

    def prepare(self):
        """ Performs steps necessary to setup the pipeline, such as creating
        input and output directories or pipes. """
        self._prepare()
        self.did_prepare = True

    def _prepare(self):
        """ Creates the root directory (including missing parent directories)
        and then lets subclasses create their own directories.

        Raises an Exception if no root has been set or if the root directory
        does not exist after preparation; an OSError from directory creation
        is passed through.
        """
        if self.root is None:
            raise Exception("No root directory defined for NLP process %s" % self.name)

        if not os.path.exists(self.root):
            try:
                # makedirs instead of mkdir: the root may be nested below
                # directories that do not exist yet
                os.makedirs(self.root)
            except OSError:
                # tolerate the directory having been created in the meantime,
                # re-raise every real failure
                if not os.path.isdir(self.root):
                    raise

        self._create_directories_if_needed()

        if not os.path.exists(self.root):
            raise Exception(
                "Failed to create root directory for NLP process %s" % self.name)

    def _create_directories_if_needed(self):
        """ Override to create directories needed to run the pipeline. """
        pass

    # -------------------------------------------------------------------------- Running
    def run(self):
        """ Runs the NLP pipeline, raises an exception on error. """
        if not self.did_prepare:
            self.prepare()
        self._run()

    def _run(self):
        """ Internal use, subclasses should override this method since it is
        called after necessary preparation has been performed. """
        raise Exception("Cannot run an abstract NLP pipeline class instance")

    def write_input(self, text, filename):
        """ Prepares the pipeline if needed, then hands `text` and `filename`
        to `_write_input` and returns its result. """
        if not self.did_prepare:
            self.prepare()
        return self._write_input(text, filename)

    def _write_input(self, text, filename):
        """ Override to write `text` to the pipeline's input; the base
        implementation does nothing and returns False. """
        return False

    def parse_output(self, filename, **kwargs):
        """ Prepares the pipeline if needed, then returns whatever
        `_parse_output` produces for `filename`. """
        if not self.did_prepare:
            self.prepare()
        return self._parse_output(filename, **kwargs)

    def _parse_output(self, filename, **kwargs):
        """ return a dictionary (or None) like:
        { 'snomed': [1, 2, 2], 'rxnorm': [4, 5, 6] }
        """
        return None
# ------------------------------------------------------------------------------ Helper Functions
def split_inclusion_exclusion(string):
    """ Returns a tuple of lists describing inclusion and exclusion criteria.

    The text is split into paragraphs at blank lines. A paragraph starting
    with "inclusion criteria" (and not mentioning "exclusion") switches to
    collecting inclusion criteria, one containing "exclusion criteria" (and
    not mentioning "inclusion") switches to collecting exclusion criteria.
    Header paragraphs themselves are not collected.

    If one of the two lists ends up empty, paragraphs seen before the first
    header are appended to the inclusion list. Exclusion criteria that were
    found are always returned, even when no inclusion section exists.

    Raises an Exception if `string` is empty or None.
    """
    if not string:
        raise Exception('No string given')

    # paragraphs are separated by at least one blank line
    rows = re.split(r'(?:\n\s*){2,}', string)

    missed = []         # paragraphs seen before any header
    inc = []
    exc = []
    at_inc = False
    at_exc = False

    for row in rows:
        if len(row) < 1 or 'none' == row:
            continue

        # collapse all whitespace runs so criteria are single-line
        clean = re.sub(r'[\n\s]+', ' ', row).strip()

        # detect switching to inclusion criteria
        # exclusion criteria sometimes say "None if patients fulfill inclusion
        # criteria.", try to avoid detecting that as header!
        if re.search(r'^[^\w]*inclusion criteria', clean, re.IGNORECASE) is not None \
                and re.search(r'exclusion', clean, re.IGNORECASE) is None:
            at_inc = True
            at_exc = False

        # detect switching to exclusion criteria
        elif re.search(r'exclusion criteria', clean, re.IGNORECASE) is not None \
                and re.search(r'inclusion', clean, re.IGNORECASE) is None:
            at_inc = False
            at_exc = True

        # assign accordingly
        elif at_inc:
            inc.append(clean)
        elif at_exc:
            exc.append(clean)
        else:
            missed.append(clean)

    # if there was no inclusion/exclusion split, we assume the unassigned text
    # describes inclusion criteria; exclusion criteria that were explicitly
    # found are kept instead of being thrown away
    if not inc or not exc:
        logging.debug(
            "No explicit separation of inclusion/exclusion criteria found, assuming the text to describe inclusion criteria")
        inc.extend(missed)

    return (inc, exc)
def list_to_sentences(string):
""" Splits text at newlines and puts it back together after stripping new-
lines and enumeration symbols, joined by a period.
"""
if string is None:
return None
lines = string.splitlines()
curr = ''
processed = []
for line in lines:
stripped = line.strip()
# empty line
if 0 == len(stripped):
if curr:
processed.append(re.sub(r'\.\s*$', '', curr))
curr = ''
# beginning a new fragment
elif not curr or 0 == len(curr):
curr = re.sub(r'^[-\d\.\(\)]+\s*', '', stripped)
# new line item? true when it starts with "-", "1." or "1)" (with
# optional dash) or if the indent level is less than before (simple
# whitespace count) (NO LONGER IMPLEMENTED)
elif re.match(r'^-\s+', stripped) \
or re.match(r'^\d+\.\s+', stripped) \
or re.match(r'^(-\s*)?\d+\)\s+', stripped):
if curr:
processed.append(re.sub(r'\.\s*$', '', curr))
curr = re.sub(r'^(-|(\d+\.)|((-\s*)?\d+\)))\s*', '', stripped)
# append to previous fragment
else:
curr = '%s %s' % (curr, stripped)
if curr:
processed.append(re.sub(r'\.\s*$', '', curr))
sentences = '. '.join(processed) if len(processed) > 0 else ''
if len(sentences) > 0:
sentences += '.'
return sentences
def list_trim(string):
    """ Trim text phrases that are part of the string because the string was
    pulled off of a list, e.g. a leading "-" or "1."

    Surrounding whitespace is removed and whitespace runs are collapsed to a
    single space before a leading "-", "1." and "1)" (with optional dash) are
    stripped, in that order. Returns the trimmed string.
    """
    # str.strip() returns a new string, the result must be assigned; leading
    # whitespace would otherwise defeat the "^"-anchored patterns below
    string = string.strip()
    string = re.sub(r'\s+', ' ', string)                        # multi-whitespace
    string = re.sub(r'^-\s+', '', string, count=1)              # leading "-"
    string = re.sub(r'^\d+\.\s+', '', string, count=1)          # leading "1."
    string = re.sub(r'^(-\s*)?\d+\)\s+', '', string, count=1)   # leading "1)" with optional dash
    return string