# -------------------------------------------------------
# Image processing functions
# Useful for brain MRI analysis
#
# Sergi Valverde 2018
# svalverde@eia.udg.edu
#
# -------------------------------------------------------
import numpy as np
import nibabel as nib
from scipy.ndimage import label
from scipy.ndimage import labeled_comprehension as lc
from scipy.interpolate import interp1d
from sklearn.mixture import GaussianMixture
import SimpleITK as sitk


def filter_regions_volume(input_mask, threshold=0.5, min_volume=10):
    """
    Remove candidate regions whose volume is at most min_volume voxels.
    An initial threshold is applied first to binarize the input
    probabilities before filtering.
    Inputs:
    - input_mask: 3D np.ndarray with region probabilities
    - threshold: binarization threshold (def: 0.5)
    - min_volume: minimum region size kept, in voxels (def: 10)
    Outputs:
    - output_mask: 3D binary np.ndarray (input_mask >= threshold) where
      regions with volume <= min_volume have been removed
    - prob_mask: 3D np.ndarray where each surviving region is filled with
      its median input probability
    """
    mask = input_mask >= threshold
    regions, num_regions = label(mask)
    labels = np.arange(1, num_regions + 1)
    output_mask = np.zeros_like(mask)
    prob_mask = np.zeros(mask.shape)
    if num_regions > 0:
        # volume (voxel count) of each connected region
        region_vol = lc(mask, regions, labels, np.sum, int, 0)
        for l in labels:
            if region_vol[l - 1] > min_volume:
                current_voxels = np.stack(np.where(regions == l), axis=1)
                # median input probability of the surviving region
                int_median = np.median(input_mask[current_voxels[:, 0],
                                                  current_voxels[:, 1],
                                                  current_voxels[:, 2]])
                output_mask[current_voxels[:, 0],
                            current_voxels[:, 1],
                            current_voxels[:, 2]] = 1
                prob_mask[current_voxels[:, 0],
                          current_voxels[:, 1],
                          current_voxels[:, 2]] = int_median
    return output_mask, prob_mask
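
# Example usage (a minimal sketch; 'lesion_prob.nii.gz' is a hypothetical
# probability map produced by a segmentation model):
#
#   prob = nib.load('lesion_prob.nii.gz').get_fdata()
#   bin_mask, prob_mask = filter_regions_volume(prob, threshold=0.5,
#                                               min_volume=20)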


def histogram_matching(mov_scan, ref_scan,
                       histogram_levels=2048,
                       match_points=100,
                       set_th_mean=True):
    """
    Histogram matching following the method developed in
    Nyul et al. 2001 (ITK implementation)
    Inputs:
    - mov_scan: np.ndarray containing the image to normalize
    - ref_scan: np.ndarray containing the reference image
    - histogram_levels: number of histogram levels
    - match_points: number of matched points
    - set_th_mean: whether to threshold at the mean intensity
    Output:
    - histogram-matched image (np.ndarray)
    """
    # convert np arrays into itk image objects
    ref = sitk.GetImageFromArray(ref_scan.astype('float32'))
    mov = sitk.GetImageFromArray(mov_scan.astype('float32'))
    # perform histogram matching
    matcher = sitk.HistogramMatchingImageFilter()
    matcher.SetNumberOfHistogramLevels(histogram_levels)
    matcher.SetNumberOfMatchPoints(match_points)
    matcher.SetThresholdAtMeanIntensity(set_th_mean)
    matched_vol = matcher.Execute(mov, ref)
    return sitk.GetArrayFromImage(matched_vol)
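
# Example usage (a minimal sketch; 't1_mov.nii.gz' and 't1_ref.nii.gz' are
# hypothetical moving and reference scans):
#
#   mov = nib.load('t1_mov.nii.gz').get_fdata()
#   ref = nib.load('t1_ref.nii.gz').get_fdata()
#   matched = histogram_matching(mov, ref)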


def n4_normalization(input_scan, max_iters=400, levels=1):
    """
    N4 bias field correction (N4ITK: Improved N3 Bias Correction,
    Tustison et al. 2010)
    Inputs:
    - input_scan: np.ndarray containing the image to process
    - max_iters: maximum number of iterations per fitting level
    - levels: number of fitting levels
    Output:
    - bias-corrected image (np.ndarray)
    """
    # convert np array into an itk image object
    scan = sitk.GetImageFromArray(input_scan.astype('float32'))
    # estimate a foreground mask and run the N4 corrector
    mask = sitk.OtsuThreshold(scan, 0, 1, 200)
    corrector = sitk.N4BiasFieldCorrectionImageFilter()
    corrector.SetMaximumNumberOfIterations([max_iters] * levels)
    output = corrector.Execute(scan, mask)
    return sitk.GetArrayFromImage(output)
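
# Example usage (a minimal sketch; 't1_raw.nii.gz' is a hypothetical raw
# T1-w scan with a bias field):
#
#   t1 = nib.load('t1_raw.nii.gz').get_fdata()
#   t1_corrected = n4_normalization(t1, max_iters=400, levels=3)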


def normalize_data(im,
                   norm_type='standard',
                   brainmask=None,
                   datatype=np.float32):
    """
    Intensity normalization
    Inputs:
    - im: input data
    - norm_type: 'standard' (zero mean, unit variance) or
      'zero_one' (min-max scaling to [0, 1])
    - brainmask: optional brain mask (nonzero voxels are used if None)
    Output:
    - normalized image
    """
    mask = np.copy(im > 0 if brainmask is None else brainmask)
    if norm_type == 'standard':
        im = im.astype(dtype=datatype) - im[np.nonzero(im)].mean()
        im = im / im[np.nonzero(im)].std()
    if norm_type == 'zero_one':
        # min-max scaling: map [min, max] to [0, 1]
        min_int = im.min()
        max_int = im.max()
        im = (im.astype(dtype=datatype) - min_int) / (max_int - min_int)
    # do not apply normalization to non-brain parts
    # im[mask==0] = 0
    return im
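
# Example usage (a minimal sketch comparing the two normalization modes;
# 't1.nii.gz' is a hypothetical input scan):
#
#   t1 = nib.load('t1.nii.gz').get_fdata()
#   t1_std = normalize_data(t1, norm_type='standard')
#   t1_01 = normalize_data(t1, norm_type='zero_one')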


def nyul_apply_standard_scale(input_image,
                              standard_hist,
                              input_mask=None,
                              interp_type='linear'):
    """
    Based on J. Reinhold's code:
    https://github.com/jcreinhold/intensity-normalization
    Use the Nyul and Udupa method ([1, 2]) to normalize the intensities
    of an MRI image passed as input.
    Args:
        input_image (np.ndarray): input image to normalize
        standard_hist (str): path to the learned standard histogram landmarks
        input_mask (np.ndarray): optional brain mask
    Returns:
        normalized (np.ndarray): normalized input image
    References:
        [1] L. G. Nyul and J. K. Udupa, “On Standardizing the MR Image
            Intensity Scale,” Magn. Reson. Med., vol. 42, pp. 1072–1081,
            1999.
        [2] M. Shah, Y. Xiao, N. Subbanna, S. Francis, D. L. Arnold,
            D. L. Collins, and T. Arbel, “Evaluating intensity
            normalization on MRIs of human brain with multiple sclerosis,”
            Med. Image Anal., vol. 15, no. 2, pp. 267–282, 2011.
    """
    # load the learned standard scale and the percentiles
    standard_scale, percs = np.load(standard_hist)
    # apply the transformation to the image
    return do_hist_normalization(input_image,
                                 percs,
                                 standard_scale,
                                 input_mask,
                                 interp_type=interp_type)
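
# Example usage (a minimal sketch; 'standard_hist.npy' is a hypothetical
# file produced by nyul_train_standard_scale below):
#
#   img = nib.load('t1.nii.gz').get_fdata()
#   img_norm = nyul_apply_standard_scale(img, 'standard_hist.npy')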


def get_landmarks(img, percs):
    """
    Get the landmarks for the Nyul and Udupa normalization method for a
    specific image.
    Based on J. Reinhold's code:
    https://github.com/jcreinhold/intensity-normalization
    Args:
        img (np.ndarray): image (or masked intensities) on which to find
            landmarks
        percs (np.ndarray): corresponding landmark percentiles to extract
    Returns:
        landmarks (np.ndarray): intensity values corresponding to percs in img
    """
    landmarks = np.percentile(img, percs)
    return landmarks


def nyul_train_standard_scale(img_fns,
                              mask_fns=None,
                              i_min=1,
                              i_max=99,
                              i_s_min=1,
                              i_s_max=100,
                              l_percentile=10,
                              u_percentile=90,
                              step=10):
    """
    Determine the standard scale for a set of images.
    Based on J. Reinhold's code:
    https://github.com/jcreinhold/intensity-normalization
    Args:
        img_fns (list): set of NIfTI MR image paths which are to be normalized
        mask_fns (list): set of corresponding masks (if not provided, estimated)
        i_min (float): minimum percentile to consider in the images
        i_max (float): maximum percentile to consider in the images
        i_s_min (float): minimum percentile on the standard scale
        i_s_max (float): maximum percentile on the standard scale
        l_percentile (int): middle percentile lower bound (e.g., for deciles 10)
        u_percentile (int): middle percentile upper bound (e.g., for deciles 90)
        step (int): step for middle percentiles (e.g., for deciles 10)
    Returns:
        standard_scale (np.ndarray): average landmark intensity for images
        percs (np.ndarray): array of all percentiles used
    """
    # estimate masks if they are not provided as a parameter
    mask_fns = [None] * len(img_fns) if mask_fns is None else mask_fns
    percs = np.concatenate(([i_min],
                            np.arange(l_percentile, u_percentile + 1, step),
                            [i_max]))
    standard_scale = np.zeros(len(percs))
    # process each image in order to build the standard scale
    for i, (img_fn, mask_fn) in enumerate(zip(img_fns, mask_fns)):
        print('processing scan ', img_fn)
        img_data = nib.load(img_fn).get_fdata()
        mask = nib.load(mask_fn) if mask_fn is not None else None
        mask_data = img_data > img_data.mean() \
            if mask is None else mask.get_fdata()
        masked = img_data[mask_data > 0]
        landmarks = get_landmarks(masked, percs)
        min_p = np.percentile(masked, i_min)
        max_p = np.percentile(masked, i_max)
        # map the [i_min, i_max] percentile range onto the standard scale
        f = interp1d([min_p, max_p], [i_s_min, i_s_max])
        landmarks = np.array(f(landmarks))
        standard_scale += landmarks
    standard_scale = standard_scale / len(img_fns)
    return standard_scale, percs
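
# Example usage (a minimal sketch; the image paths are hypothetical; the
# saved file can then be consumed by nyul_apply_standard_scale above):
#
#   img_fns = ['sub-01_t1.nii.gz', 'sub-02_t1.nii.gz']
#   standard_scale, percs = nyul_train_standard_scale(img_fns)
#   np.save('standard_hist.npy', np.array([standard_scale, percs]))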


def do_hist_normalization(input_image,
                          landmark_percs,
                          standard_scale,
                          mask=None,
                          interp_type='linear'):
    """
    Do the Nyul and Udupa histogram normalization routine with a given set
    of learned landmarks.
    Based on J. Reinhold's code:
    https://github.com/jcreinhold/intensity-normalization
    Args:
        input_image (np.ndarray): image on which to find landmarks
        landmark_percs (np.ndarray): corresponding landmark percentiles of
            the standard scale
        standard_scale (np.ndarray): landmarks on the standard scale
        mask (np.ndarray): foreground mask for input_image
        interp_type (str): interpolation type ('linear' is the only
            supported option)
    Returns:
        normalized (np.ndarray): normalized image
    """
    mask_data = input_image > input_image.mean() if mask is None else mask
    masked = input_image[mask_data > 0]
    landmarks = get_landmarks(masked, landmark_percs)
    if interp_type != 'linear':
        raise ValueError('only linear interpolation is supported')
    f = interp1d(landmarks, standard_scale, fill_value='extrapolate')
    # apply the transformation to the input image
    return f(input_image)
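
# Example usage (a minimal sketch; 'standard_hist.npy' and
# 'brainmask.nii.gz' are hypothetical files, showing direct use with an
# explicit foreground mask):
#
#   img = nib.load('t1.nii.gz').get_fdata()
#   standard_scale, percs = np.load('standard_hist.npy')
#   brainmask = nib.load('brainmask.nii.gz').get_fdata()
#   img_norm = do_hist_normalization(img, percs, standard_scale,
#                                    mask=brainmask)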


def gmm_clustering(img_data, n_classes=3, max_iter=50, brainmask=None):
    """
    GMM clustering
    Inputs:
    - img_data: np.ndarray containing the image to process
    - n_classes: number of classes
    - max_iter: maximum number of EM iterations to perform
    - brainmask: input brainmask (nonzero voxels are used if None)
    Outputs:
    - res_array: list of n_classes np.ndarrays with output probabilities,
      ordered by increasing class mean intensity
    - labels: np.ndarray with resulting class labels (1..n_classes inside
      the brainmask, 0 outside)
    """
    if brainmask is None:
        brainmask = img_data > 0
    # fit the GMM on the brain voxels only
    brain = np.expand_dims(img_data[brainmask > 0].flatten(), 1)
    gmm = GaussianMixture(n_components=n_classes, max_iter=max_iter)
    gmm.fit(brain)
    # reorder the classes by increasing mean intensity
    classes_ = np.argsort(gmm.means_.T.squeeze())
    # predict probabilities
    res = gmm.predict_proba(brain)
    res_array = []
    for c in classes_:
        prob_class = np.zeros(img_data.shape)
        prob_class[brainmask > 0] = res[:, c]
        res_array.append(prob_class)
    # predict labels (offset by 1 so that the background stays 0)
    labels = np.argmax(res_array, axis=0)
    labels[brainmask > 0] += 1
    return res_array, labels
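
# Example usage (a minimal sketch; three-class tissue clustering where the
# classes come back ordered by mean intensity, e.g. CSF < GM < WM on T1-w):
#
#   t1 = nib.load('t1.nii.gz').get_fdata()
#   probs, labels = gmm_clustering(t1, n_classes=3)
#   csf_prob, gm_prob, wm_prob = probs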