# pipeline_functions.py
import numpy as np
import pandas as pd
import scanpy as sc
import matplotlib.pyplot as plt
from sklearn.mixture import GaussianMixture  # renamed from GMM in older scikit-learn versions
from sklearn.metrics import confusion_matrix
from matplotlib.colors import rgb2hex
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()  # the neural network below was written against the TF v1 API
def training_data_select(data, marker, celltypes, cell_types, correct_ind):
    '''
    :param data: input single-cell data (AnnData)
    :param marker: marker genes (array)
    :param celltypes: matrix indicating which marker genes correspond to which cell type
        (rows = cell types, columns = marker genes; 1 = ON, -1 = OFF, 2 = "maybe")
    :param cell_types: array of the cell type names being searched for
    :param correct_ind: indices of cell types to keep; lets you exclude subtypes if necessary
        (default: np.arange(len(celltypes)))
    :return labels: cell type labels for the ideal examples of each expected cell type
    :return ideal_index: indices of the ideal cells within the full data set (important for reshuffling later on)
    :return training_cells: expression profiles of the ideal cells
    :return test_cells: the remainder of the data set (cells not chosen as ideal examples); this provides
        the training/testing split for the neural network classifier
    '''
    # find the column corresponding to each marker gene
    ind = np.zeros(len(marker), dtype=int)
    for i in range(len(ind)):
        ind[i] = np.where(data.var_names == marker[i])[0][0]  # column index of the marker
    on_off = np.zeros((data.X.shape[0], len(marker)))  # cells x marker genes - stores binary expression calls
    # fit GMMs for each marker gene in the master list
    for j in range(len(marker)):
        expr = data.X[:, ind[j]]
        expr = expr.reshape(-1, 1)  # reshape to a column vector for model fitting
        # fit models with 1-5 components (capped at 5 for simplicity's sake)
        N = np.arange(1, 6)
        models = [None for p in range(len(N))]
        for k in range(len(N)):
            models[k] = GaussianMixture(N[k]).fit(expr.toarray())
        # compute the AIC and the BIC
        AIC = [m.aic(expr.toarray()) for m in models]
        BIC = [m.bic(expr.toarray()) for m in models]  # the Lauffenburger paper chooses the best GMM by minimizing BIC
        # example for plotting BIC against the number of Gaussian components:
        # plt.plot(N, BIC, marker='o')
        # plt.xlabel('Number of Gaussian components in fitted GMM')
        # plt.ylabel('Bayesian information criterion (BIC)')
        # plt.show()
        # choose the best model and determine which cells are ON/OFF for this marker
        M_best = models[np.argmin(BIC)]  # best GMM by minimizing BIC
        x = np.linspace(-0.5, np.max(expr.toarray()), len(expr.toarray()))
        logprob = M_best.score(x.reshape(-1, 1))
        pdf = np.exp(logprob)  # density estimate; useful for inspecting percentiles of marker gene expression
        # cluster membership under the chosen model determines ON/OFF states
        gmm_labels = M_best.predict(expr.toarray())
        gmm_avg = np.zeros(len(np.unique(gmm_labels)))
        for h in np.unique(gmm_labels):
            gmm_ind = np.where(gmm_labels == h)[0]
            gmm_avg[h] = np.mean(expr.toarray()[gmm_ind])
        ind_min = np.argmin(expr.toarray())  # cell with the smallest expression, marks the lowest-mean GMM cluster
        ind_max = np.argmax(expr.toarray())  # cell with the largest expression (currently unused)
        if len(np.unique(gmm_labels)) > 2:
            # with >2 components, call a cell ON only if it lies outside the two lowest-mean clusters
            ind_sort = np.argsort(gmm_avg)
            on_marker0 = np.where(gmm_labels != ind_sort[0])[0]
            on_marker1 = np.where(gmm_labels != ind_sort[1])[0]
            on_marker = np.intersect1d(on_marker0, on_marker1)
        else:
            on_marker = np.where(gmm_labels != gmm_labels[ind_min])[0]  # ON for cells not in the lowest cluster
        # record the ON/OFF determination for each cell in the data set
        for i in range(len(on_marker)):
            on_off[on_marker[i], j] = 1
    labels = []
    ideal_index = []
    for i in correct_ind:  # for each cell type - intended for easy exclusion of cell subtypes with extra logic
        test_on = np.where(celltypes[i, :] == 1)[0]  # indices of ON marker genes
        # note: the code is not currently set up to handle cell types with no ON genes
        for j in range(data.X.shape[0]):  # for each cell in the data set
            # hold on to ideal cells together with their cell type label
            ongenes = on_off[j, test_on]
            all_on = np.sum(ongenes)
            if all_on == len(test_on):
                # check that all OFF markers are indeed off
                test_off = np.where(celltypes[i, :] == -1)[0]
                if test_off.shape[0] == 0 or np.sum(on_off[j, test_off]) == 0:
                    # one last check for "maybe" genes: at least two of them must be ON
                    test_mayb = np.where(celltypes[i, :] == 2)[0]
                    if test_mayb.shape[0] == 0 or np.sum(on_off[j, test_mayb]) >= 2:
                        # this threshold should probably be adaptable/user-defined
                        labels.append(i)
                        ideal_index.append(j)
    # remove cells that have been classified as more than one cell type
    ideal_arr = np.asarray(ideal_index)
    doub = np.unique([x for x in ideal_index if ideal_index.count(x) > 1])
    g = np.array([])
    for i in range(len(doub)):
        g = np.hstack((g, np.where(ideal_arr == doub[i])[0]))  # indices of doubled elements
    ideal_index = np.delete(ideal_arr, g.astype('int'))
    labels = np.delete(np.asarray(labels), g.astype('int'))  # converted to numpy array
    training_cells = data.X[ideal_index, :]
    print('Percent of data chosen for training set: ', len(labels) / data.X.shape[0])
    print('Cell type breakdown:')
    for k in range(len(cell_types)):
        perc = np.where(labels == k)[0]
        print('{}: {}'.format(cell_types[k], len(perc)))
    # test_cells: the remainder of the data set
    indices = np.arange(0, data.X.shape[0]).tolist()
    test_ind = np.delete(indices, ideal_index)
    test_cells = data.X[test_ind, :]
    return labels, ideal_index, training_cells, test_cells
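

# Illustrative usage of training_data_select (a sketch; `adata`, `marker_genes`,
# `type_matrix`, and `type_names` are placeholder names, not objects defined in
# this module). `type_matrix` is (n_types x n_markers) with 1 = ON, -1 = OFF,
# 2 = "maybe":
#
#   marker_genes = np.array(['CD3E', 'CD19'])
#   type_matrix = np.array([[1, -1],    # T cell: CD3E ON, CD19 OFF
#                           [-1, 1]])   # B cell: CD19 ON, CD3E OFF
#   type_names = ['T cell', 'B cell']
#   labs, ideal_idx, train_x, test_x = training_data_select(
#       adata, marker_genes, type_matrix, type_names,
#       np.arange(len(type_matrix)))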
def viz_training_data(tot_data, tot_lab, tot_ideal_ind, types, emb, emb_type, cmap, title, fig_size, leg_adjust):
    '''
    :param tot_data: single-cell data from which the training set is being selected (AnnData)
    :param tot_lab: labels assigned to the ideal cell type examples
    :param tot_ideal_ind: indices of the ideal examples
    :param types: expected cell types (listed explicitly)
    :param emb: 2D embedding (t-SNE or other 2D projection) of the full data set
    :param emb_type: string naming the 2D embedding type, e.g. 'UMAP'
    :param cmap: colormap of choice (used to label the different cell types in a visually pleasing manner);
        a dictionary or subscriptable set of colors
    :param title: title of the output figure
    :param fig_size: figure size (int, int)
    :param leg_adjust: adjustment (if necessary) to force the legend inside the window
    :return: None (displays the figure)
    '''
    # establish indices in the original data for the training/testing split
    indices = np.arange(0, tot_data.shape[0])
    test_ind = np.delete(indices, tot_ideal_ind)
    all_labels = np.zeros(tot_data.shape[0])
    # build a label array marking ideal examples (by type) and test data (-1)
    for i in range(len(tot_ideal_ind)):
        all_labels[tot_ideal_ind[i]] = tot_lab[i]
    for j in range(len(test_ind)):
        all_labels[test_ind[j]] = -1
    fig, ax = plt.subplots(figsize=fig_size)
    scatter_x = emb[:, 0]
    scatter_y = emb[:, 1]
    for g in np.unique(all_labels):
        i = np.where(all_labels == g)
        if g == -1:
            ax.scatter(scatter_x[i], scatter_y[i], label='Test data', c='xkcd:light grey', alpha=0.75, s=6)
        else:
            ax.scatter(scatter_x[i], scatter_y[i], label=types[int(g)], c=rgb2hex(cmap[int(g)]), s=6)
    ax.legend(loc='center left', bbox_to_anchor=(1, 0.5), prop={'size': 8})
    plt.title(title)
    ax.axes.get_xaxis().set_ticks([])
    ax.axes.get_yaxis().set_ticks([])
    plt.xlabel(emb_type + '1')
    plt.ylabel(emb_type + '2')
    plt.subplots_adjust(right=leg_adjust)
    plt.show()
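

# Example call for viz_training_data (a sketch, assuming `adata` carries a UMAP
# embedding in adata.obsm['X_umap'] and `labs`/`ideal_idx` come from the
# training_data_select sketch above; the colormap choice is arbitrary):
#
#   palette = plt.cm.tab10.colors
#   viz_training_data(adata, labs, ideal_idx, type_names,
#                     adata.obsm['X_umap'], 'UMAP', palette,
#                     'Selected training cells', (8, 6), 0.8)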
def one_hot_encode(labels):
    '''
    :param labels: cell type labels (integer-coded)
    :return labels_onehot: the same cell type labels, one-hot encoded
    '''
    labels_onehot = np.zeros((len(labels), len(np.unique(labels))))
    for i in range(len(labels)):
        labels_onehot[i, labels[i]] = 1
    return labels_onehot
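

# Minimal self-check for one_hot_encode (invented labels, illustrative only):
# with labels [0, 2, 1, 0] there are three unique classes, so the result is a
# (4, 3) matrix with a single 1 per row, e.g. row 0 -> [1, 0, 0].
#
#   onehot = one_hot_encode(np.array([0, 2, 1, 0]))
#   assert onehot.shape == (4, 3) and onehot[0, 0] == 1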
def cell_type_classifier(ideal_labels, ideal_cells, test_cells, train_ind, learning_rate, training_epochs, batch_size, display_step):
    '''
    :param ideal_labels: one-hot encoded cell type labels for the ideal examples
    :param ideal_cells: "ideal" examples of each cell type, as selected by training_data_select
    :param test_cells: cells that are not in the training/validation set
    :param train_ind: indices of the ideal cells randomly selected for training; the remaining
        ideal cells are reserved for validation
    :param learning_rate: controls the increments by which the network weights change during training
    :param training_epochs: the number of iterations over which the neural network is allowed to learn
    :param batch_size: size of the subsets of training data used in each training epoch
    :param display_step: how frequently the training epoch cost is displayed
    :return pred_lab: cell type labels assigned to each cell in the testing data set
    :return likelihood: probability associated with each assigned cell type label
    :return colorm: confusion matrix for the validation set
    :return pred: the softmax output tensor of the network
    '''
    labels = ideal_labels[train_ind.astype('int'), :]  # 2D one-hot encoded labels
    training_cells = ideal_cells[train_ind.astype('int'), :]
    valid_ind = np.delete(np.arange(len(ideal_labels)), train_ind.astype('int'))
    valid_lab = ideal_labels[valid_ind.astype('int'), :]
    valid_cells = ideal_cells[valid_ind.astype('int'), :]
    rows = training_cells.shape[1]
    x = tf.placeholder(tf.float32, [None, rows])  # input data shape
    y = tf.placeholder(tf.float32, [None, labels.shape[1]])
    # hidden layer variables + transformation
    W_3 = tf.Variable(tf.truncated_normal([rows, 750], stddev=1e-4))
    b_3 = tf.Variable(tf.random_normal([750]))
    hidden = tf.nn.relu(tf.matmul(x, W_3) + b_3)
    # output layer weights + biases - the hidden layer width (750, originally 130) can be altered
    W = tf.Variable(tf.truncated_normal([750, labels.shape[1]], stddev=1e-4))
    b = tf.Variable(tf.random_normal([labels.shape[1]]))
    # after the hidden layer - a further nonlinearity could go here
    after = tf.matmul(hidden, W) + b
    # output layer: softmax is the final (third) layer
    pred = tf.nn.softmax(after)
    # minimize error using cross entropy [an alternate entropy formulation could be chosen here];
    # the 1e-10 offset keeps the log finite if/when a predicted probability hits 0
    # (clipping, or TensorFlow's built-in cross-entropy ops, would also work)
    cost = tf.reduce_mean(-tf.reduce_sum(y * tf.log(pred + 1e-10), reduction_indices=1))
    # gradient descent optimization
    optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(cost)
    # initialize the variables (i.e. assign their default values)
    init = tf.global_variables_initializer()
    # start training
    with tf.Session() as sess:
        # run the initializer
        sess.run(init)
        # training cycle
        for epoch in range(training_epochs):
            avg_cost = 0.
            total_batch = int(training_cells.shape[0] / batch_size)
            # loop over all batches
            for i in range(total_batch):
                # batch x is a random sample of training cells; batch y holds the matching
                # training cell LABELS - mini-batching helps guard against overfitting
                rand_ind = np.arange(0, training_cells.shape[0])
                rand = np.random.choice(rand_ind, batch_size)
                batch_xs = training_cells[rand, :].todense()  # batch_size randomly chosen cells
                batch_ys = labels[rand, :]
                # run the optimization op (backprop) and cost op (to get the loss value)
                _, c = sess.run([optimizer, cost], feed_dict={x: batch_xs, y: batch_ys})
                # compute the average loss
                avg_cost += c / total_batch
            # display logs per epoch step
            if (epoch + 1) % display_step == 0:
                print("Epoch:", '%04d' % (epoch + 1), "cost=", "{:.9f}".format(avg_cost))
        print("Optimization Finished!")
        # evaluate classification accuracy on the validation set
        correct_prediction = tf.equal(tf.argmax(pred, 1), tf.argmax(y, 1))
        accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
        print("Accuracy:", accuracy.eval({x: valid_cells.todense(),
                                          y: valid_lab}))
        # confusion matrix generation for the validation set
        x_batch, y_true_batch = valid_cells, valid_lab
        feed_dict = {
            x: x_batch.todense(),
            y: y_true_batch,
        }
        true = sess.run(pred, feed_dict=feed_dict)
        lab_not_hot = np.argmax(valid_lab, axis=1)
        prediction = np.argmax(true, axis=1)
        colorm = confusion_matrix(lab_not_hot, prediction)
        # testing data (i.e. the rest of the data set)
        true1 = sess.run(pred, feed_dict={x: test_cells.todense()})
        likelihood = np.amax(true1, axis=1)
        pred_lab = np.argmax(true1, axis=1)
    return pred_lab, likelihood, colorm, pred
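

# Sketch of a classifier run (hyperparameter values here are illustrative, not
# tuned; `train_x`/`test_x` are the sparse matrices from training_data_select
# and `labs` is integer-coded, hence the one-hot step). 80% of the ideal cells
# are drawn for training and the remainder is left for validation:
#
#   onehot = one_hot_encode(labs)
#   train_ind = np.random.choice(np.arange(len(labs)),
#                                int(0.8 * len(labs)), replace=False)
#   pred_lab, likelihood, conf_mat, pred_op = cell_type_classifier(
#       onehot, train_x, test_x, train_ind,
#       learning_rate=0.05, training_epochs=100,
#       batch_size=100, display_step=10)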
def process_label(tot_prob, tot_lab, total_predicted_lab, tot_ideal_ind, total_data, thresh):
    '''
    :param tot_prob: probabilities associated with the assigned cell type labels; important
        when setting a confidence threshold
    :param tot_lab: cell type labels associated with the ideal (training/validation) data set
    :param total_predicted_lab: cell type labels output by the neural network
    :param tot_ideal_ind: indices of the ideal data set within the total data set; important for
        reshuffling cell type labels back into their proper order w.r.t. the original data set
    :param total_data: the total data set (used for its shape)
    :param thresh: probability threshold below which cells are flagged as "poorly classified" (-1)
    :return total_lab_reshuff: the cell type classification labels, put back into the proper order
    :return tot_prob_reshuff: the matching probabilities, with ideal cells encoded as 1.01
    '''
    # 0. threshold based on likelihood
    total_predicted_lab_ = np.zeros(len(total_predicted_lab))
    for k in range(len(tot_prob)):
        if tot_prob[k] < thresh:
            total_predicted_lab_[k] = -1
        else:
            total_predicted_lab_[k] = total_predicted_lab[k]
    # 1. reorder to match the original data set
    indices = np.arange(0, total_data.X.shape[0])
    test_ind = np.delete(indices, tot_ideal_ind.astype('int'))
    total_lab_reshuff = np.zeros(total_data.X.shape[0])
    tot_prob_reshuff = np.zeros(total_data.X.shape[0])
    for i in range(len(tot_ideal_ind)):
        total_lab_reshuff[tot_ideal_ind[i]] = tot_lab[i]
        tot_prob_reshuff[tot_ideal_ind[i]] = 1.01  # encode "ideal" cells in the probability info
    for j in range(len(test_ind)):
        total_lab_reshuff[test_ind[j]] = total_predicted_lab_[j]
        tot_prob_reshuff[test_ind[j]] = tot_prob[j]
    return total_lab_reshuff, tot_prob_reshuff
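

# Putting it together (a sketch; continues the placeholder names used above).
# process_label maps low-confidence calls to -1, restores the original cell
# order, and returns probabilities aligned to that order, so the results can be
# attached straight to the AnnData object (the 0.75 threshold is illustrative):
#
#   final_lab, final_prob = process_label(likelihood, labs, pred_lab,
#                                         np.asarray(ideal_idx), adata, 0.75)
#   adata.obs['pipeline_label'] = final_lab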