-
Notifications
You must be signed in to change notification settings - Fork 44
/
train_12ECG_classifier.py
89 lines (71 loc) · 2.69 KB
/
train_12ECG_classifier.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#!/usr/bin/env python
import numpy as np, os, sys, joblib
from scipy.io import loadmat
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from get_12ECG_features import get_12ECG_features
def train_12ECG_classifier(input_directory, output_directory):
# Load data.
print('Loading data...')
header_files = []
for f in os.listdir(input_directory):
g = os.path.join(input_directory, f)
if not f.lower().startswith('.') and f.lower().endswith('hea') and os.path.isfile(g):
header_files.append(g)
classes = get_classes(input_directory, header_files)
num_classes = len(classes)
num_files = len(header_files)
recordings = list()
headers = list()
for i in range(num_files):
recording, header = load_challenge_data(header_files[i])
recordings.append(recording)
headers.append(header)
# Train model.
print('Training model...')
features = list()
labels = list()
for i in range(num_files):
recording = recordings[i]
header = headers[i]
tmp = get_12ECG_features(recording, header)
features.append(tmp)
for l in header:
if l.startswith('#Dx:'):
labels_act = np.zeros(num_classes)
arrs = l.strip().split(' ')
for arr in arrs[1].split(','):
class_index = classes.index(arr.rstrip()) # Only use first positive index
labels_act[class_index] = 1
labels.append(labels_act)
features = np.array(features)
labels = np.array(labels)
# Replace NaN values with mean values
imputer=SimpleImputer().fit(features)
features=imputer.transform(features)
# Train the classifier
model = RandomForestClassifier().fit(features,labels)
# Save model.
print('Saving model...')
final_model={'model':model, 'imputer':imputer,'classes':classes}
filename = os.path.join(output_directory, 'finalized_model.sav')
joblib.dump(final_model, filename, protocol=0)
# Load challenge data.
def load_challenge_data(header_file):
with open(header_file, 'r') as f:
header = f.readlines()
mat_file = header_file.replace('.hea', '.mat')
x = loadmat(mat_file)
recording = np.asarray(x['val'], dtype=np.float64)
return recording, header
# Find unique classes.
def get_classes(input_directory, filenames):
classes = set()
for filename in filenames:
with open(filename, 'r') as f:
for l in f:
if l.startswith('#Dx'):
tmp = l.split(': ')[1].split(',')
for c in tmp:
classes.add(c.strip())
return sorted(classes)