-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgira_cde.py
369 lines (317 loc) · 19.6 KB
/
gira_cde.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
import logging, logging.handlers
from configparser import ConfigParser
from datetime import date, datetime
from pprint import pprint
import redcap_enums as renums
from redcap_cde import RedcapCDE
from r4_gira_clin_var import R4GiraClinVar
import omop_cde
from emailer import Emailer
from errorhandler import ErrorHandler
if __name__ == "__main__":
try:
# Setup logging
error_handler = ErrorHandler(logging.WARNING)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
fh = logging.handlers.RotatingFileHandler('gira_cde.log', maxBytes=10000000, backupCount=10)
fh.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s:%(levelname)s:%(name)s: %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)
logger.addHandler(fh)
logger.addHandler(ch)
logger.info('===============================================')
logger.info('Begin GIRA CDE script')
# Read config file
parser = ConfigParser(inline_comment_prefixes=['#'])
parser.read('./config.ini')
# Development environment configuraiton
DEVELOPMENT = parser.getboolean('GENERAL', 'DEVELOPMENT')
# REDCap
redcap_api_endpoint = parser.get('REDCAP', 'LOCAL_REDCAP_URL')
redcap_api_token = parser.get('REDCAP', 'LOCAL_REDCAP_API_KEY')
# R4
r4_api_endpoint = parser.get('R4', 'R4_URL')
r4_api_token = parser.get('R4', 'R4_API_KEY')
# Email
email_host = parser.get('EMAIL', 'SMTP_HOST')
email_port = parser.get('EMAIL', 'SMTP_PORT')
email_from = parser.get('EMAIL', 'FROM_ADDR')
email_to = parser.get('EMAIL', 'TO_ADDRS')
email_to = [e.strip() for e in email_to.split(';') if e.strip()] # split emails by ; and get rid of empty
# OMOP
omop_instance = parser.get('OMOP', 'DB_NAME')
logger.info(f'OMOP instance: {omop_instance}')
# Emailer to notify dev of failures
emailer = Emailer(email_host, email_port, email_from, email_to)
# Redcap configuration
redcap_cde = RedcapCDE(redcap_api_endpoint, redcap_api_token)
# R4 configuration
r4 = R4GiraClinVar(r4_api_endpoint, r4_api_token)
# While we're developing the script, force double check of which projects we're working on
CHECK_BEFORE_RUNNING = DEVELOPMENT
redcap_project_title = redcap_cde.export_project_info()['project_title']
if CHECK_BEFORE_RUNNING and (input(f'Working on redcap project: {redcap_project_title}. Enter the project title to continue:\n') != redcap_project_title):
print('Exiting')
exit()
r4_project_title = r4.export_project_info()['project_title']
if CHECK_BEFORE_RUNNING and (input(f'Working on R4 project: {r4_project_title}. Enter the project title to continue:\n') != r4_project_title):
print('Exiting')
exit()
timestamp = datetime.now().isoformat()
# Get participants still needing CDE
participant_info = redcap_cde.get_participants_needing_cde(omop_instance)
if not participant_info:
logger.info('No participants need CDE')
else:
logger.info(f'{len(participant_info)} participants have been collected for CDE')
# Currently in development. Show what information has been collecetd and verify before continuing to send data out
if CHECK_BEFORE_RUNNING:
pprint(participant_info)
if input('Enter "yes" to continue: ') != 'yes':
logger.debug('Exiting script prior to performing CDE.')
exit()
# OMOP connection
logger.debug('Connecting to OMOP SQL Server')
sql_engine = omop_cde.sql_server_engine()
omop = omop_cde.OmopExtractor(sql_engine.connect())
for p in participant_info:
record_id = p[RedcapCDE.FIELD_RECORD_ID]
dob_str = p[RedcapCDE.FIELD_DOB]
dob_date = date.fromisoformat(dob_str)
mrn = p[RedcapCDE.FIELD_MRN]
cde_status = p[RedcapCDE.FIELD_GIRA_CDE_STATUS]
cde_script_message = p[RedcapCDE.FIELD_GIRA_CDE_SCRIPT_OUTPUT]
logger.info(f'Starting CDE. CUIMC ID:{record_id}, MRN:{mrn}, DOB:{dob_str}')
# If there was a previous script message, add a visual separator for new script message
if cde_script_message:
cde_script_message += '===============================\n'
# Initiate cde_result and keep track of queried information
cde_result = {
RedcapCDE.FIELD_RECORD_ID: record_id,
RedcapCDE.FIELD_GIRA_CDE_MRN_QUERIED: mrn,
RedcapCDE.FIELD_GIRA_CDE_DOB_QUERIED: dob_str,
RedcapCDE.FIELD_GIRA_CDE_OMOP_INSTANCE: omop_instance
}
cde_script_message += f"{timestamp}\nmrn:{mrn}\ndob:{dob_str}\ndatabase:{omop_instance}\n"
# Find the patient info in OMOP
match: omop_cde.MrnMatch = None
match, dob_sim = omop.find_closest_patient(mrn, dob_date)
if not match:
# No matching MRNs were found.
if cde_status == RedcapCDE.GiraCdeStatus.MRN_NOT_FOUND_PROCEED.value:
# Approved to proceed filling in participant's clinical variables with missing values
msg = 'MRN was not found in OMOP database, but approved to proceed with missing values'
logger.info(msg)
cde_script_message += f'{msg}\n'
cde_result[RedcapCDE.FIELD_GIRA_CDE_SCRIPT_OUTPUT] = cde_script_message
cde_result[RedcapCDE.FIELD_EHR_PARTICIPANT_FIRST_NAME_LOCAL] = p[RedcapCDE.FIELD_NAME_FIRST]
cde_result[RedcapCDE.FIELD_EHR_PARTICIPANT_LAST_NAME_LOCAL] = p[RedcapCDE.FIELD_NAME_LAST]
cde_result[RedcapCDE.FIELD_EHR_DATE_OF_BIRTH_LOCAL] = RedcapCDE.MISSING_DATE
RedcapCDE.fill_missing_values(cde_result)
cde_result[RedcapCDE.FIELD_GIRA_CDE_STATUS] = RedcapCDE.GiraCdeStatus.COMPLETED.value
cde_result[RedcapCDE.FIELD_GIRA_CDE_REVIEW_STATUS] = RedcapCDE.GiraReviewStatus.NOT_NEEDED.value
cde_result[RedcapCDE.FIELD_GIRA_CLINICAL_VARIABLES_LOCAL_COMPLETE] = renums.Complete.INCOMPLETE.value
redcap_cde.update_gira_cde(cde_result)
else:
msg = 'MRN was not found in OMOP database'
logger.info(msg)
cde_script_message += f'{msg}\n'
cde_result[RedcapCDE.FIELD_GIRA_CDE_SCRIPT_OUTPUT] = cde_script_message
cde_result[RedcapCDE.FIELD_GIRA_CDE_STATUS] = RedcapCDE.GiraCdeStatus.MRN_NOT_FOUND.value
redcap_cde.update_gira_cde(cde_result)
continue
# Check DOB
cde_result[RedcapCDE.FIELD_EHR_DATE_OF_BIRTH_LOCAL] = match.dob.isoformat()
if dob_sim != 1:
# DOB in REDCap does not match EHR
if cde_status == RedcapCDE.GiraCdeStatus.BDMM_PROCEED.value:
# Instructed to proceed with CDE despite DOB mismatch
msg = 'DOB mismatched, but instructed to proceed'
logger.info(msg)
cde_script_message += f"{msg}\n"
else:
msg = 'DOB found in OMOP does not match with REDCap. Review needed.'
logger.info(msg)
cde_script_message += f'{msg}\n'
cde_result[RedcapCDE.FIELD_GIRA_CDE_SCRIPT_OUTPUT] = cde_script_message
cde_result[RedcapCDE.FIELD_GIRA_CDE_STATUS] = RedcapCDE.GiraCdeStatus.BDMM_REVIEW_NEEDED.value
redcap_cde.update_gira_cde(cde_result)
continue
# OMOP doesn't have names. Copy from REDCap
cde_result[RedcapCDE.FIELD_EHR_PARTICIPANT_FIRST_NAME_LOCAL] = p[RedcapCDE.FIELD_NAME_FIRST]
cde_result[RedcapCDE.FIELD_EHR_PARTICIPANT_LAST_NAME_LOCAL] = p[RedcapCDE.FIELD_NAME_LAST]
msg = f'Extracting for OMOP person_id: {match.omop_person_id}'
logger.info(msg)
cde_script_message += f'{msg}\n'
# Allergies
allergies_count = omop.count_positive_allergy_tests(match.omop_person_id)
cde_result[RedcapCDE.FIELD_COUNT_POSITIVE_ALLERGY_LOCAL] = allergies_count
cde_result[RedcapCDE.FIELD_ALLERGY_TEST_FLAG_LOCAL] = 1 if allergies_count >= 2 else 0
# Numerical labs
needs_review = list()
dbp, sbp, hdl, cho, tri, a1c = omop.extract_gira_labs(match.omop_person_id)
# systolic blood pressure
if sbp is not None:
sbp_value = sbp.value_as_number
cde_result[RedcapCDE.FIELD_SBP_LAB_NAME_LOCAL] = sbp.concept_name
cde_result[RedcapCDE.FIELD_SBP_DATE_AT_EVENT_LOCAL] = sbp.measurement_date.isoformat()
cde_result[RedcapCDE.FIELD_SBP_MEASUREMENT_CONCEPT_ID_LOCAL] = str(sbp.measurement_concept_id)
cde_result[RedcapCDE.FIELD_SBP_VALUE_MOST_RECENT_LOCAL] = sbp_value
if sbp_value < 60 or sbp_value > 240:
needs_review.append('SBP [60-240]')
else:
RedcapCDE.fill_missing_sbp(cde_result)
# diastolic blood pressure
if dbp is not None:
dbp_value = dbp.value_as_number
cde_result[RedcapCDE.FIELD_DBP_LAB_NAME_LOCAL] = dbp.concept_name
cde_result[RedcapCDE.FIELD_DBP_DATE_AT_EVENT_LOCAL] = dbp.measurement_date.isoformat()
cde_result[RedcapCDE.FIELD_DBP_MEASUREMENT_CONCEPT_ID_LOCAL] = str(dbp.measurement_concept_id)
cde_result[RedcapCDE.FIELD_DBP_VALUE_MOST_RECENT_LOCAL] = dbp_value
if dbp_value < 0 or (sbp is not None and (dbp_value >= sbp_value)):
needs_review.append('DBP [0-SBP]')
else:
RedcapCDE.fill_missing_dbp(cde_result)
# HDL
if hdl is not None:
hdl_value = hdl.value_as_number
cde_result[RedcapCDE.FIELD_HDL_LAB_NAME_LOCAL] = hdl.concept_name
cde_result[RedcapCDE.FIELD_HDL_DATE_AT_EVENT_LOCAL] = hdl.measurement_date.isoformat()
cde_result[RedcapCDE.FIELD_HDL_MEASUREMENT_CONCEPT_ID_LOCAL] = str(hdl.measurement_concept_id)
cde_result[RedcapCDE.FIELD_HDL_VALUE_MOST_RECENT_LOCAL] = hdl_value
if hdl_value < 5 or hdl_value > 200:
needs_review.append('HDL [5-200]')
else:
RedcapCDE.fill_missing_hdl(cde_result)
# Total Cholesterol
if cho is not None:
cho_value = cho.value_as_number
cde_result[RedcapCDE.FIELD_TOTALCHOLEST_LAB_NAME_LOCAL] = cho.concept_name
cde_result[RedcapCDE.FIELD_TOTALCHOLEST_DATE_AT_EVENT_LOCAL] = cho.measurement_date.isoformat()
cde_result[RedcapCDE.FIELD_TOTALCHOLEST_MEASUREMENT_CONCEPT_ID_LOCAL] = str(cho.measurement_concept_id)
cde_result[RedcapCDE.FIELD_TOTALCHOLEST_VALUE_MOST_RECENT_LOCAL] = cho_value
if cho_value < 50 or cho_value > 1000:
needs_review.append('total cholesterol [50-1000]')
else:
RedcapCDE.fill_missing_cho(cde_result)
# Triglyceride
if tri is not None:
cde_result[RedcapCDE.FIELD_TRIGLYCERIDE_LAB_NAME_LOCAL] = tri.concept_name
cde_result[RedcapCDE.FIELD_TRIGLYCERIDE_DATE_AT_EVENT_LOCAL] = tri.measurement_date.isoformat()
cde_result[RedcapCDE.FIELD_TRIGLYCERIDE_MEASUREMENT_CONCEPT_ID_LOCAL] = str(tri.measurement_concept_id)
cde_result[RedcapCDE.FIELD_TRIGLYCERIDE_VALUE_MOST_RECENT_LOCAL] = tri.value_as_number
else:
RedcapCDE.fill_missing_tri(cde_result)
# A1c
if a1c is not None:
a1c_value = a1c.value_as_number
cde_result[RedcapCDE.FIELD_A1C_LAB_NAME_LOCAL] = a1c.concept_name
cde_result[RedcapCDE.FIELD_A1C_DATE_AT_EVENT_LOCAL] = a1c.measurement_date.isoformat()
cde_result[RedcapCDE.FIELD_A1C_MEASUREMENT_CONCEPT_ID_LOCAL] = str(a1c.measurement_concept_id)
cde_result[RedcapCDE.FIELD_A1C_VALUE_MOST_RECENT_LOCAL] = a1c_value
if a1c_value < 2 or a1c_value > 20:
needs_review.append('a1c [2-20]')
else:
RedcapCDE.fill_missing_a1c(cde_result)
# Wheeze
RedcapCDE.fill_missing_wheeze(cde_result)
wheeze_events = omop.wheeze_events(match.omop_person_id)
if len(wheeze_events) >= 1:
cde_result[RedcapCDE.FIELD_AGE_AT_FIRST_WHEEZE_EVENT_LOCAL] = f'{wheeze_events[0].age:0.3f}'
if len(wheeze_events) >= 2:
cde_result[RedcapCDE.FIELD_AGE_AT_FIRST_WHEEZE_EVENT_LOCAL] = f'{wheeze_events[1].age:0.3f}'
cde_result[RedcapCDE.FIELD_WHEEZING_FLAG_LOCAL] = 1
# Eczema
RedcapCDE.fill_missing_eczema(cde_result)
eczema_events = omop.eczema_events(match.omop_person_id)
if len(eczema_events) >= 1:
cde_result[RedcapCDE.FIELD_AGE_AT_FIRST_ECZEMA_EVENT_LOCAL] = f'{eczema_events[0].age:0.3f}'
if len(eczema_events) >= 2:
cde_result[RedcapCDE.FIELD_AGE_AT_SECOND_ECZEMA_EVENT_LOCAL] = f'{eczema_events[1].age:0.3f}'
cde_result[RedcapCDE.FIELD_ECZEMA_FLAG_LOCAL] = 1
msg = 'CDE complete'
logger.info(msg)
cde_script_message += f'{msg}\n'
# Check which labs need review
if needs_review:
msg = f'The following labs need review (lab [range]): {", ".join(needs_review)}'
logger.info(msg)
cde_script_message += f'{msg}\n'
cde_result[RedcapCDE.FIELD_GIRA_CDE_REVIEW_STATUS] = RedcapCDE.GiraReviewStatus.NEEDED.value
cde_result[RedcapCDE.FIELD_GIRA_CLINICAL_VARIABLES_LOCAL_COMPLETE] = renums.Complete.INCOMPLETE.value
else:
msg = 'No review needed for lab measurements'
logger.debug(msg)
cde_script_message += f'{msg}\n'
cde_result[RedcapCDE.FIELD_GIRA_CDE_REVIEW_STATUS] = RedcapCDE.GiraReviewStatus.NOT_NEEDED.value
# Mark this instrument as incomplete (until uploaded to R4)
cde_result[RedcapCDE.FIELD_GIRA_CLINICAL_VARIABLES_LOCAL_COMPLETE] = renums.Complete.INCOMPLETE.value
# Update local GIRA CDE instrument
cde_result[RedcapCDE.FIELD_GIRA_CDE_STATUS] = RedcapCDE.GiraCdeStatus.COMPLETED.value
cde_result[RedcapCDE.FIELD_GIRA_CDE_SCRIPT_OUTPUT] = cde_script_message
logger.debug(cde_result)
redcap_cde.update_gira_cde(cde_result)
logger.info('Finished CDE')
# Get participants with CDE needing upload to R4
participant_info = redcap_cde.get_participants_to_upload_r4()
if not participant_info:
logger.info('No participants with CDE needing upload to R4')
else:
logger.info(f'{len(participant_info)} participants have been collected for R4 upload')
# Currently in development. Show what information has been collecetd and verify before continuing to send data out
if CHECK_BEFORE_RUNNING:
pprint(participant_info)
if input('Enter "yes" to continue: ') != 'yes':
logger.debug('Exiting script prior to performing R4 upload.')
exit()
for p in participant_info:
record_id = p[RedcapCDE.FIELD_RECORD_ID]
r4_record_id = p[RedcapCDE.FIELD_R4_RECORD_ID]
r4_script_message = p[RedcapCDE.FIELD_GIRA_CDE_R4_SCRIPT_OUTPUT]
logger.info(f'Starting R4 Upload. CUIMC ID:{record_id}, R4 Record ID:{r4_record_id}')
# If there was a previous script message, add a visual separator for new script message
if r4_script_message:
r4_script_message += '===============================\n'
r4_script_message += f"{timestamp}\nUploading to R4\n"
# Initiate cde_result and gira clinical variable record
cde_result = {
RedcapCDE.FIELD_RECORD_ID: record_id
}
gira_cv_record = {
RedcapCDE.FIELD_R4_RECORD_ID: r4_record_id,
R4GiraClinVar.FIELD_GIRA_CLINICAL_VARIABLES_COMPLETE: renums.Complete.COMPLETE.value
}
if DEVELOPMENT:
# In dev environment, using redcap project clone instead of R4, which uses cuimc_id as record_id
gira_cv_record[RedcapCDE.FIELD_RECORD_ID] = record_id
# Copy over all data fields in GIRA Clinical Variables Instrument
# All data fields have a copy in the GIRA Clinical Variables local instrument with a "_local" suffix appended
for f in R4GiraClinVar.DATA_FIELDS:
gira_cv_record[f] = p[f + RedcapCDE.LOCAL_FIELD_SUFFIX]
# Import to R4
update_result = r4.import_records([gira_cv_record])
if update_result['count'] != 1:
logger.error(f'R4 not updated correctly for participant {record_id}. Update result: {update_result}')
r4_script_message += 'Upload to R4 failed\n'
cde_result[RedcapCDE.FIELD_GIRA_CDE_R4_SCRIPT_OUTPUT] = r4_script_message
else:
logger.info('R4 GIRA CDE instrument updated')
r4_script_message += 'Upload to R4 succeeded\n'
cde_result[RedcapCDE.FIELD_GIRA_CDE_R4_SCRIPT_OUTPUT] = r4_script_message
cde_result[RedcapCDE.FIELD_GIRA_CLINICAL_VARIABLES_LOCAL_COMPLETE] = renums.Complete.COMPLETE.value
# Update local instrument with status
update_result = redcap_cde.update_gira_cde(cde_result)
if update_result['count'] != 1:
logger.error(f'Local CDE not updated correctly for participant {record_id}. Update result: {update_result}')
else:
logger.info('Local GIRA CDE instrument updated')
logger.info('Script completed normally')
except Exception as e:
logger.exception(e)
finally:
if error_handler.fired:
emailer.sendmail('GIRA CDE error', 'An issue occurred in the GIRA CDE script. Please check the logs.')