analysis.py
# Copyright 2018 Martin Haesemeyer. All rights reserved.
#
# Licensed under the MIT license
"""
Module for data analysis classes and functions
"""
from mo_types import MoTypes
from global_defs import GlobalDefs
import numpy as np
from data_stores import SimulationStore, ActivityStore
from core import GradientStandards
import os
import h5py
from sklearn.manifold import SpectralEmbedding
from sklearn.cluster import SpectralClustering
class Analyzer:
"""
Class to abstract analysis by model system
"""
def __init__(self, mo: MoTypes, std: GradientStandards, sim_store_name, act_store_name):
"""
Creates a new analyzer
        :param mo: Object to identify the model organism and its network models
:param std: Standardizations for model inputs
:param sim_store_name: File name of persistent storage of simulation data (or None)
:param act_store_name: File name of persistent storage of activity data (or None)
"""
self.mo = mo
self.std = std
self.sim_store_name = sim_store_name
self.act_store_name = act_store_name
def run_simulation(self, path, sim_type, network_state, debug=False, drop_list=None):
"""
        Uses a model identified by path to run a naive, a trained, and optionally an ideal or unit-dropped simulation
:param path: The model path
:param sim_type: The simulation type to run ("r"adial or "l"inear)
:param network_state: The state of the network ("naive", "trained", "ideal", "bfevolve")
:param debug: If true, also return debug dictionary
        :param drop_list: If not None, a list that will be fed to det_drop to determine which units are kept
            (1) or dropped (0)
:return:
[0]: Array of x,y,angle positions
[1]: If debug=True dictionary of simulation debug information
"""
with SimulationStore(self.sim_store_name, self.std, self.mo) as sim_store:
if debug:
return sim_store.get_sim_debug(path, sim_type, network_state, drop_list)
else:
return sim_store.get_sim_pos(path, sim_type, network_state, drop_list)
def temperature_activity(self, path, temperature, network_id):
"""
Uses a model identified by path and returns activity of all cells in the temperature branch
:param path: The model path
        :param temperature: The temperature stimulus
:param network_id: The network id for constructing correct unit ids
:return:
[0]: n-timepoints x m-neurons matrix of responses
[1]: 3 x m-neurons matrix with network_id in row 0, layer index in row 1, and unit index in row 2
"""
with ActivityStore(self.act_store_name, self.std, self.mo) as act_store:
return act_store.get_cell_responses(path, temperature, network_id)
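# Hedged usage sketch (not part of the original module) of how Analyzer might be
# driven end-to-end. "std" and "model_path" are assumptions supplied by the
# caller - a GradientStandards instance and a trained model checkpoint path are
# created elsewhere in this codebase. Passing None for both store names simply
# skips persistent storage.
def _example_analyzer_usage(std, model_path):
    ana = Analyzer(MoTypes(False), std, None, None)
    # trained radial-gradient simulation; returns an array of x, y, angle positions
    pos = ana.run_simulation(model_path, "r", "trained")
    # temperature-branch responses of network 0 to a hypothetical ramp stimulus
    temps = np.linspace(22, 37, 1000)
    responses, unit_ids = ana.temperature_activity(model_path, temps, 0)
    return pos, responses, unit_ids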
def bin_simulation(pos, bins: np.ndarray, simdir="r"):
"""
Bin simulation results along the simulation direction, normalizing occupancy in case of radial simulation
:param pos: Position array obtained from running simulation
:param bins: Array containing bin edges
:param simdir: Determines whether occupancy should be calculated along (r)adius, (x)- or (y)-axis
:return: Relative occupancy (corrected if radial)
"""
if simdir not in ["r", "x", "y"]:
raise ValueError("simdir has to be one of (r)adius, (x)- or (y)-axis")
if simdir == "r":
quantpos = np.sqrt(np.sum(pos[:, :2] ** 2, 1))
elif simdir == "x":
quantpos = pos[:, 0]
else:
quantpos = pos[:, 1]
bin_centers = bins[:-1] + np.diff(bins) / 2
h = np.histogram(quantpos, bins)[0].astype(float)
# for radial histogram normalize by radius to offset area increase
if simdir == "r":
h = h / bin_centers
h = h / h.sum()
return h
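# Minimal self-contained sketch (not part of the original module) of
# bin_simulation on fabricated positions; the position values and bin edges
# are made up for illustration.
def _example_bin_simulation():
    fake_pos = np.abs(np.random.randn(5000, 3)) * 30  # columns: x, y, angle
    edges = np.linspace(0, 100, 21)
    occupancy = bin_simulation(fake_pos, edges, "r")
    assert np.isclose(occupancy.sum(), 1)  # relative occupancy sums to 1
    return occupancy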
def bin_simulation_pt(pos, bins: np.ndarray):
"""
Bin simulation result facing angles
:param pos: Position array obtained from running simulation
:param bins: Array containing bin edges
:return: Relative occupancy
"""
quantpos = MoTypes(False).pt_sim.facing_angle(pos[:, 0], pos[:, 1], pos[:, 2])
    # remap angles to the range [-pi, pi]
quantpos[quantpos > np.pi] = quantpos[quantpos > np.pi] - 2*np.pi
quantpos[quantpos < -np.pi] = quantpos[quantpos < -np.pi] + 2*np.pi
h = np.histogram(quantpos, bins)[0].astype(float)
h = h / h.sum()
return h
def temp_convert(distances, sim_type):
"""
Converts center or origin distances into temperature values according to our standard simulation types
"""
circle_sim_params = GlobalDefs.circle_sim_params
lin_sim_params = GlobalDefs.lin_sim_params
if sim_type == "r":
return distances / circle_sim_params["radius"] * (circle_sim_params["t_max"] - circle_sim_params["t_min"]) \
+ circle_sim_params["t_min"]
else:
return distances / lin_sim_params["radius"] * (lin_sim_params["t_max"] - lin_sim_params["t_min"]) \
+ lin_sim_params["t_min"]
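# Sketch (illustrative, not from the original source) of converting radial
# distances into gradient temperatures via the real circle_sim_params.
def _example_temp_convert():
    radius = GlobalDefs.circle_sim_params["radius"]
    dists = np.array([0, radius / 2, radius])
    # maps a distance of 0 to t_min and the full radius to t_max
    return temp_convert(dists, "r")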
def create_det_drop_list(network_id, cluster_ids, unit_ids, clust_to_drop, shuffle=False, branch='t'):
"""
Creates a network specific list of deterministic unit drop vectors for a network branch
:param network_id: The id of the network for which to generate drop vectors
:param cluster_ids: For each analyzed unit its cluster membership
    :param unit_ids: 3 x n-units matrix with network id (row 0), layer index (row 1) and unit index (row 2)
    :param clust_to_drop: Id or list of ids of cluster members to drop
    :param shuffle: If True, the drop indicator will be shuffled in each layer
:param branch: The dictionary identifier to give to the drop list
:return: Dictionary with list of deterministic drop vectors
"""
if type(clust_to_drop) is not list:
clust_to_drop = [clust_to_drop]
# use unit_ids to identify topology of network and drop requested units
det_drop = []
for l_id in np.unique(unit_ids[1, unit_ids[0, :] == network_id]):
in_network_layer = np.logical_and(unit_ids[0, :] == network_id, unit_ids[1, :] == l_id)
drop = np.ones(in_network_layer.sum())
for cd in clust_to_drop:
drop[cluster_ids[in_network_layer] == cd] = 0
if shuffle:
np.random.shuffle(drop)
det_drop.append(drop)
return {branch: det_drop}
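# Sketch of create_det_drop_list on a fabricated topology (all ids invented):
# network 0 with two layers of four units each; members of cluster 1 are dropped.
def _example_det_drop():
    unit_ids = np.array([[0] * 8,             # row 0: network id
                         [0] * 4 + [1] * 4,   # row 1: layer index
                         list(range(4)) * 2])  # row 2: unit index within layer
    cluster_ids = np.array([0, 1, 1, 2, 0, 0, 1, 2])
    drop = create_det_drop_list(0, cluster_ids, unit_ids, 1)
    # drop["t"] is a per-layer list of keep (1) / drop (0) vectors, here
    # [1, 0, 0, 1] for layer 0 and [1, 1, 0, 1] for layer 1
    return drop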
def quant_pos(all_pos: np.ndarray, sim_type: str):
"""
Uses simulation type to compute and return the gradient position relevant for temperature calculation
"""
if sim_type == "r":
return np.sqrt(np.sum(all_pos[:, :2] ** 2, 1))
else:
# assume gradient runs in x-direction for linear simulation
return all_pos[:, 0]
def temp_error(all_pos: np.ndarray, sim_type: str):
"""
    Compute the (radius corrected) average deviation in degrees from the preferred temperature in a simulation run
:param all_pos: Position array of a simulation
:param sim_type: The type of simulation ("r"adial or "l"inear)
:return: The average deviation from preferred temperature
"""
qp = quant_pos(all_pos, sim_type)
temp_pos = temp_convert(qp, sim_type)
if sim_type == "r":
# form a weighted average, considering points of larger radius less since just by
# chance they will be visited more often
        weights = 1 / qp
        weights[np.isinf(weights)] = 0  # only occurs when (0, 0) was picked as the starting point
        sum_of_weights = np.nansum(weights)
        weighted_sum = np.nansum(np.abs(temp_pos - GlobalDefs.tPreferred) * weights)
        return weighted_sum / sum_of_weights
    return np.mean(np.abs(temp_pos - GlobalDefs.tPreferred))
def preferred_fraction(all_pos: np.ndarray, sim_type: str, delta_t=1.0):
"""
Calculates the (radius corrected) occupancy within delta_t degrees of the preferred temperature
:param all_pos: Position array of a simulation
:param sim_type: The type of simulation ("r"adial or "l"inear)
    :param delta_t: The fraction within +/- delta_t degrees of the preferred temperature will be computed
:return: Fraction spent around preferred temperature
"""
qp = quant_pos(all_pos, sim_type)
temp_pos = temp_convert(qp, sim_type)
weights = np.ones(qp.size) if sim_type == "l" else 1/qp
weights[np.isinf(weights)] = 0
sum_of_weights = np.nansum(weights)
in_delta = np.logical_and(temp_pos > GlobalDefs.tPreferred-delta_t, temp_pos < GlobalDefs.tPreferred+delta_t)
return np.nansum(weights[in_delta]) / sum_of_weights
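# Self-contained sketch of the gradient-error metrics on fabricated positions
# scattered around the radius at which the gradient crosses tPreferred (the
# position distribution is an assumption made for the demo).
def _example_error_metrics():
    p = GlobalDefs.circle_sim_params
    # radius at which the radial gradient equals the preferred temperature
    r_pref = (GlobalDefs.tPreferred - p["t_min"]) / (p["t_max"] - p["t_min"]) * p["radius"]
    angles = np.random.rand(1000) * 2 * np.pi
    radii = r_pref + np.random.randn(1000)
    fake_pos = np.stack([radii * np.cos(angles), radii * np.sin(angles), angles], 1)
    return temp_error(fake_pos, "r"), preferred_fraction(fake_pos, "r")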
def trial_average(mat, n_trials):
"""
Computes the trial average of each trace in mat
:param mat: n-timepoints x m-cells matrix of traces
:param n_trials: The number of trials
:return: Trial average activity of shape (n-timepoints/n_trials) x m-cells
"""
if mat.shape[0] % n_trials != 0:
        raise ValueError("Number of timepoints can't be divided into selected number of trials")
t_length = mat.shape[0] // n_trials
return np.mean(mat.reshape((n_trials, t_length, mat.shape[1])), 0)
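# Sketch: trial-averaging a trace that simply repeats three times should
# recover the single-trial trace (sizes are arbitrary demo values).
def _example_trial_average():
    one_trial = np.sin(np.linspace(0, 2 * np.pi, 100))[:, None]  # 100 x 1 cell
    three_trials = np.tile(one_trial, (3, 1))  # 300 timepoints, 3 trials
    avg = trial_average(three_trials, 3)
    assert np.allclose(avg, one_trial)
    return avg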
def rank_error(y_real: np.ndarray, prediction: np.ndarray):
"""
Compute prediction rank error
    :param y_real: Matrix (samples x features) of real outputs
    :param prediction: Matrix (samples x features) of network predictions
:return: Average rank error across all samples
"""
nsamples = y_real.shape[0]
if prediction.shape[0] != nsamples:
raise ValueError("y_real and prediction need to have same number of samples")
err_sum = 0
for (y, p) in zip(y_real, prediction):
r_y = np.unique(y, return_inverse=True)[1]
r_p = np.unique(p, return_inverse=True)[1]
err_sum += np.sum(np.abs(r_y - r_p))
return err_sum / nsamples
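# Sketch of rank_error on two hand-made samples: a rank-preserving prediction
# scores 0, a fully reversed ranking scores the maximal per-sample error.
def _example_rank_error():
    y = np.array([[1.0, 2.0, 3.0, 4.0]])
    same_order = np.array([[10.0, 20.0, 30.0, 40.0]])  # ranks match -> error 0
    reversed_order = same_order[:, ::-1]  # ranks reversed -> error 8
    return rank_error(y, same_order), rank_error(y, reversed_order)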
def behavior_by_temperature(db_dict: dict, all_temps: np.ndarray, bins: np.ndarray):
"""
Computes frequency of behavior selection by temperature
:param db_dict: Debug dictionary created during simulation run
:param all_temps: For each simulation position the occupied temperature
:param bins: The bin-edges for dividing the temperature space
:return: A dictionary with behaviors as keys and probability in each bin as values
"""
selector = np.logical_and(db_dict["sel_behav"] != '', db_dict["sel_behav"] != 'N')
behavior_types = np.unique(db_dict["sel_behav"][selector])
    occupancy_counts = np.histogram(all_temps, bins)[0].astype(float)
result = {k: np.zeros(bins.size-1) for k in behavior_types}
for behav in behavior_types:
b_temps = db_dict["curr_temp"][db_dict["sel_behav"] == behav]
        result[behav] = np.histogram(b_temps, bins)[0].astype(float) / occupancy_counts
return result
def behavior_by_delta_temp(db_dict: dict, bins: np.ndarray):
"""
Computes frequency of behavior by delta-temperature achieved during the preceding bout
:param db_dict: Debug dictionary created during simulation run
:param bins: The bin-edges for dividing the bout delta temperature space
:return: A dictionary with behaviors as keys and probability in each bin as values
"""
selector = np.logical_and(db_dict["sel_behav"] != '', db_dict["sel_behav"] != 'N')
behavior_types = np.unique(db_dict["sel_behav"][selector])
all_behavs = db_dict["sel_behav"][selector]
all_btemps = db_dict["curr_temp"][selector]
all_deltas = np.zeros(all_btemps.size)
for i in range(1, all_btemps.size):
all_deltas[i] = all_btemps[i] - all_btemps[i-1]
    ad_counts = np.histogram(all_deltas, bins)[0].astype(float)
result = {k: np.zeros(bins.size - 1) for k in behavior_types}
for behav in behavior_types:
b_dtemps = all_deltas[all_behavs == behav]
        result[behav] = np.histogram(b_dtemps, bins)[0].astype(float) / ad_counts
return result
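# Sketch of the behavior-frequency helpers on a fabricated debug dictionary.
# The behavior codes ('S', 'L', 'R') and temperature range are invented here;
# only the exclusion of '' and 'N' follows from the functions above.
def _example_behavior_frequencies():
    behavs = np.random.choice(np.array(['S', 'L', 'R', 'N', '']), 500)
    db_dict = {"sel_behav": behavs, "curr_temp": np.random.rand(500) * 10 + 25}
    by_temp = behavior_by_temperature(db_dict, db_dict["curr_temp"], np.linspace(25, 35, 6))
    by_delta = behavior_by_delta_temp(db_dict, np.linspace(-2, 2, 9))
    return by_temp, by_delta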
def _cluster_responses(response_mat, n_clusters, avg_trials, mem_save, corr_cut=0.6):
"""
Clusters the neuron responses using spectral clustering
:param response_mat: The response matrix with all neuron responses
:param n_clusters: The desired number of clusters
:param avg_trials: If True, perform 3-trial averaging on response_mat input
    :param mem_save: If > 0 compute initial clustering on a random 1/mem_save subset of the data
:param corr_cut: The correlation cutoff to consider a given neuron to be part of a cluster
:return:
[0]: The cluster ids
[1]: 3D embedding coordinates for plotting
"""
if avg_trials:
# create trial average
response_mat = trial_average(response_mat, 3)
    # compute pairwise correlations, optionally on a random subset to save memory
    if mem_save > 0:
        sampler = np.random.rand(response_mat.shape[1])
        sample = response_mat[:, sampler < 1 / mem_save].copy()
    else:
        sample = response_mat
    pw_corrs = np.corrcoef(sample.T)
    pw_corrs[np.isnan(pw_corrs)] = 0
    pw_corrs[pw_corrs < 0.2] = 0
    # perform spectral clustering
    spec_clust = SpectralClustering(n_clusters, affinity="precomputed")
    spec_clust_ids = spec_clust.fit_predict(pw_corrs)
    spec_emb = SpectralEmbedding(3, affinity="precomputed")
    coords = spec_emb.fit_transform(pw_corrs)
    # use correlation to cluster centroids to determine final cluster membership
    regressors = np.zeros((response_mat.shape[0], n_clusters))
    for i in range(n_clusters):
        regressors[:, i] = np.mean(sample[:, spec_clust_ids == i], 1)
clust_ids = np.full(response_mat.shape[1], np.nan)
for i in range(response_mat.shape[1]):
max_ix = -1
max_corr = 0
for j in range(n_clusters):
c = np.corrcoef(response_mat[:, i], regressors[:, j])[0, 1]
if c >= corr_cut and c > max_corr:
max_ix = j
max_corr = c
clust_ids[i] = max_ix
return clust_ids, coords
def cluster_activity(n_regs, all_cells, cluster_file=None, avg_trials=True, mem_save=0):
    """
    Clusters the given cell responses, loading a previous result from cluster_file if possible
    :param n_regs: The desired number of clusters
    :param all_cells: n-timepoints x m-neurons matrix of responses to cluster
    :param cluster_file: If not None, name of an hdf5 file to load a matching clustering from or save the result to
    :param avg_trials: If True, perform 3-trial averaging on the input before clustering
    :param mem_save: If > 0 compute initial clustering on a random 1/mem_save subset of the data
    :return:
        [0]: The cluster ids
        [1]: 3D embedding coordinates for plotting
    """
clust_ids, coords = None, None
load_success = False
if cluster_file is not None and os.path.exists(cluster_file):
clfile = h5py.File(cluster_file, "r")
        # ensure that the same number of clusters was formed in the file
if np.array(clfile["n_regs"]) == n_regs:
clust_ids = np.array(clfile["clust_ids"])
coords = np.array(clfile["coords"])
# ensure that the same number of cells was clustered - don't check coords as they may
# be subsampled
if clust_ids.size == all_cells.shape[1]:
load_success = True
clfile.close()
if not load_success:
clust_ids, coords = _cluster_responses(all_cells, n_regs, avg_trials, mem_save)
if cluster_file is not None:
clfile = h5py.File(cluster_file, "w")
clfile.create_dataset("n_regs", data=n_regs)
clfile.create_dataset("clust_ids", data=clust_ids)
clfile.create_dataset("coords", data=coords)
clfile.close()
return clust_ids, coords
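# Sketch of cluster_activity on synthetic data: 60 "cells" derived from three
# noisy prototype responses, tiled into a fake 3-trial structure (all sizes
# and the noise level are made-up demo values).
def _example_cluster_activity():
    t = np.linspace(0, 4 * np.pi, 200)
    protos = np.stack([np.sin(t), np.cos(t), np.sin(2 * t)], 1)  # 200 x 3
    cells = np.repeat(protos, 20, 1) + np.random.randn(200, 60) * 0.1
    cells = np.tile(cells, (3, 1))  # 600 timepoints = 3 trials of 200
    clust_ids, coords = cluster_activity(3, cells, None, True, 0)
    return clust_ids, coords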
def turn_coherence(behaviors, n_steps):
"""
Computes the probability of keeping a turn direction, ignoring intervening stay and straight bouts
:param behaviors: Array of consecutive behaviors - 0-stay, 1-straight, 2-left, 3-right
    :param n_steps: The number of steps over which to compute probabilities
:return: n_steps long array of probabilities of keeping the same turn direction
"""
behaviors = behaviors[behaviors > 1].copy()
instances = []
for i, b in enumerate(behaviors):
if i >= behaviors.size-n_steps:
break
following = behaviors[i+1:i+n_steps+1]
instances.append(following == b)
return np.mean(instances, 0)
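# Sketch of turn_coherence on a fabricated behavior sequence in which left
# turns strongly dominate, interleaved with straight bouts (codes follow the
# docstring: 1-straight, 2-left, 3-right).
def _example_turn_coherence():
    turns = np.where(np.random.rand(2000) < 0.2, 3, 2)  # mostly left turns
    behaviors = np.where(np.random.rand(2000) < 0.5, turns, 1)  # mix in straights
    return turn_coherence(behaviors, 5)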