-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfunctions.py
209 lines (163 loc) · 9.02 KB
/
functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
import skimage as ski
import caiman as cm
import numpy as np
import pandas as pd
from collections import defaultdict
import matplotlib.pyplot as plt
def reshape(Y, x_dim, y_dim, z_dim):
    """Reshape a flat movie into (n_frames, x, y, z).

    The first axis of `Y` interleaves time and z-planes (time * z_dim), so we
    first split it into (n_frames, z, x, y) and then move the z axis last.

    Parameters
    ----------
    Y : array-like with total size n_frames * z_dim * x_dim * y_dim
    x_dim, y_dim, z_dim : int
        Dimensions of a single imaged volume.

    Returns
    -------
    np.ndarray of shape (n_frames, x_dim, y_dim, z_dim)

    Raises
    ------
    ValueError
        If the video size is incompatible with the given dimensions.
    """
    # floor division keeps the frame count an exact int (int(a/b) goes through
    # float and can round incorrectly for very large movies)
    n_frames = Y.shape[0] // z_dim
    try:
        Y_reshaped = np.reshape(Y, (n_frames, z_dim, x_dim, y_dim))
    except ValueError as e:
        print(f"Y.shape: {Y.shape}, x_dim: {x_dim}, y_dim: {y_dim}, z_dim: {z_dim}")
        # chain the original exception so the root cause stays visible
        raise ValueError("The dimensions of the video are not compatible with the specified x,y,z dimensions. Please check the dimensions of the video and the x,y,z dimensions specified in experiment_info.py") from e
    # transpose from (t, z, x, y) to (t, x, y, z)
    return np.transpose(Y_reshaped, (0, 2, 3, 1))
def background_normalize(Y, background_frames, offset=1000, eps=1e-22):
    """Normalize a movie against its initial background frames (dF/F-style).

    Parameters
    ----------
    Y : np.ndarray of shape (frames, x, y, z)
        Movie whose first `background_frames` frames are background.
    background_frames : int
        Number of leading frames used to estimate the per-pixel background.
    offset : number, optional
        Constant added to the whole movie before normalizing (keeps the
        denominator away from zero for near-zero pixels).
    eps : float, optional
        Tiny guard added to the denominator to avoid division by zero.
        Default matches the original hard-coded constant (1e-22).

    Returns
    -------
    np.ndarray of the same shape: (Y - background) / background per pixel.
    """
    Y = Y + offset
    # per-pixel mean over the leading background frames
    background = np.mean(Y[0:background_frames, :, :, :], axis=0)
    # subtract and divide by background; eps guards against zero pixels
    Y_normalized = (Y - background) / (background + eps)
    return Y_normalized
def load_videos_into_list(vids, p, normalize=False):
    """Load each video in `vids`, reshape it to (frames, x, y, z), keep only
    the first n_frames_to_analyze frames, and optionally background-normalize.

    `p` is the experiment parameter dict (x_dim, y_dim, z_dim,
    n_frames_to_analyze, background_frames). Returns a list of arrays, one
    per input video, in the same order.
    """
    x_dim, y_dim, z_dim = p['x_dim'], p['y_dim'], p['z_dim']
    n_keep = p['n_frames_to_analyze']
    bg_frames = p['background_frames']
    loaded = []
    for path in vids:
        movie = reshape(cm.load(path), x_dim, y_dim, z_dim)
        assert movie.shape[0] >= n_keep, f"Number of frames in video is less than n_frames_to_analyze. Please specify a smaller number of frames to analyze."
        # trim to the analysis window before any further processing
        movie = movie[:n_keep]
        if normalize:
            movie = background_normalize(movie, bg_frames)
        loaded.append(movie)
    return loaded
def find_binary_mask(Y):
    """Compute a boolean foreground mask for a movie.

    Pipeline: average over time, median-filter spatially, then Otsu-threshold
    the smoothed image. Returns a boolean array with the spatial shape of Y.
    """
    # temporal smoothing: collapse the time axis
    mean_img = np.mean(Y, axis=0)
    # spatial smoothing: median filter across pixels
    mean_img = ski.filters.median(mean_img, behavior='ndimage')
    # Otsu picks the threshold separating foreground from background
    return mean_img > ski.filters.threshold_otsu(mean_img)
def extract_mean_activity_within_binary_mask(Y, binary_mask, p):
    """Average the movie over the voxels selected by `binary_mask`, per frame.

    Returns a 1-D array of length Y.shape[0] (one mean value per frame).
    """
    x_dim, y_dim, z_dim = p['x_dim'], p['y_dim'], p['z_dim']
    # flatten the mask and each frame's volume so boolean indexing lines up
    mask_flat = np.reshape(binary_mask, (x_dim * y_dim * z_dim))
    frames_flat = np.array(np.reshape(Y, (Y.shape[0], -1)))
    # mean over masked voxels for every frame
    return np.mean(frames_flat[:, mask_flat], axis=1)
def compute_max_responses(mean_activity_within_segment, puffs, n_frames_to_analyze):
    """Find the peak response value and peak frame for every sample x odor.

    The per-sample trace is a concatenation of equal-length intervals, one
    per puff, so puff i occupies frames [i*n, (i+1)*n). Returns two dicts:
    sample -> {odor_name: [peak values]} and sample -> {odor_name: [peak
    frame indices]}, with one list entry per trial of that odor.
    """
    maxs_by_samp = defaultdict(lambda: defaultdict(list))
    argmaxs_by_samp = defaultdict(lambda: defaultdict(list))
    for samp in mean_activity_within_segment:
        trace = mean_activity_within_segment[samp]
        peak_vals = defaultdict(list)
        peak_frames = defaultdict(list)  # frame of maximum activity per trial
        for idx, puff in enumerate(puffs):
            # slice out the interval for this puff
            window = trace[idx * n_frames_to_analyze:(idx + 1) * n_frames_to_analyze]
            peak_vals[puff.odor_name].append(np.max(window))
            peak_frames[puff.odor_name].append(np.argmax(window))
        # sanity check: at most two trials per odor are expected
        for odor in peak_vals:
            assert len(peak_vals[odor]) <= 2, f" for odor {odor} there were more than 2 trials. This is unexpected."
            assert len(peak_frames[odor]) <= 2, f" for odor {odor} there were more than 2 trials. This is unexpected."
        maxs_by_samp[samp] = peak_vals
        argmaxs_by_samp[samp] = peak_frames
    return maxs_by_samp, argmaxs_by_samp
def get_odor_name(index, odor_list, odor_encodings):
    """Return the decoded name of the odor at position `index` in odor_list."""
    return odor_encodings[odor_list[index]]
def calculate_AUC(activity_traces, puffs, p, test=False):
    """Area under the curve of the response peak for each sample x odor puff.

    For each puff's interval, the baseline is the median of the first
    `background_frames` frames. The AUC is integrated (trapezoid rule) over
    the contiguous run of above-baseline frames that contains the peak.

    Parameters
    ----------
    activity_traces : dict
        sample -> 1-D trace of concatenated per-puff intervals.
    puffs : sequence
        Puff objects with an `odor_name` attribute, in delivery order.
    p : dict
        Must contain 'n_frames_to_analyze' and 'background_frames'.
    test : bool, optional
        If True, plot diagnostics for puff index 4 of the first sample.

    Returns
    -------
    dict : sample -> {odor_name: [auc per trial]}

    Raises
    ------
    ValueError
        If an interval has no above-baseline run containing its peak
        (i.e. the whole interval sits at or below baseline).
    """
    n_frames_to_analyze = p['n_frames_to_analyze']
    background_frames = p['background_frames']
    aucs_by_samp = defaultdict(lambda: defaultdict(list))
    for samp in activity_traces:
        aucs_per_odor = defaultdict(list)
        for i, puff in enumerate(puffs):
            odor_name = puff.odor_name
            # videos were concatenated, so interval i corresponds to puff i
            interval = activity_traces[samp][i*n_frames_to_analyze:(i+1)*n_frames_to_analyze]
            baseline = np.median(interval[0:background_frames])
            peak_coord = np.argmax(interval)
            # all frame indices where activity exceeds baseline
            above_baseline = np.where(interval > baseline)[0]
            # break into runs of consecutive indices; a gap means activity
            # dipped back below baseline between runs
            not_consecutive = np.where(np.diff(above_baseline) != 1)[0]
            # +1 because np.diff output is one element shorter than its input
            split_indices = np.split(above_baseline, not_consecutive + 1)
            # BUGFIX: reset these every iteration -- previously start/end/
            # peak_interval leaked from the prior puff when no run matched,
            # silently producing a stale AUC
            peak_interval = None
            start = end = None
            for run in split_indices:
                if peak_coord in run:
                    start = run[0]
                    end = run[-1] + 1  # +1 so the slice includes the last above-baseline frame
                    peak_interval = interval[start:end]
                    break
            if peak_interval is None:
                # whole interval is at/below baseline; fail loudly instead of
                # reusing the previous puff's segment
                raise ValueError(f"No above-baseline segment contains the peak for sample {samp}, puff {i} ({odor_name})")
            # area under the curve of the peak segment
            auc = np.trapz(peak_interval, dx=1)
            aucs_per_odor[odor_name].append(auc)
            # diagnostic plot for one hand-picked puff/sample
            if test and i == 4 and samp == list(activity_traces.keys())[0]:
                plt.plot(interval)
                plt.axhline(baseline, color='black', linestyle='--')
                plt.axvline(peak_coord, color='red')
                plt.axvline(start, color='green')
                plt.axvline(end, color='green')
                print(auc)
        # sanity check: at most two trials per odor are expected
        for odor in aucs_per_odor:
            assert len(aucs_per_odor[odor]) <= 2, f" for odor {odor} there were more than 2 trials. This is unexpected."
        aucs_by_samp[samp] = aucs_per_odor
    return aucs_by_samp
def subtract_paraffin_trace(mean_activity_within_segment, puffs, n_frames_to_analyze):
    """Subtract the matching paraffin (solvent control) trace from every puff.

    Exactly two paraffin trials are expected. Puffs delivered in the first
    half of the session are corrected with the first paraffin trial, those in
    the second half with the second. Returns a dict of corrected 1-D traces
    keyed by sample.
    """
    corrected = {}
    num_odors = len(puffs)
    # delivery positions of the paraffin controls (one per trial)
    paraffin_indices = [puff.number for puff in puffs if puff.odor_name == "paraffin"]
    assert len(paraffin_indices) == 2, f"Expected to find 2 paraffin trials, but found {len(paraffin_indices)}"
    assert paraffin_indices[0] < paraffin_indices[1], f"Expected paraffin trial 1 to come before paraffin trial 2, but found {paraffin_indices}"
    for samp, trace in mean_activity_within_segment.items():
        # paraffin trace per trial: keys 0 (first) and 1 (second)
        paraffin_traces = {
            trial: trace[idx * n_frames_to_analyze:(idx + 1) * n_frames_to_analyze]
            for trial, idx in enumerate(paraffin_indices)
        }
        pieces = []
        for puff in puffs:
            pos = puff.number
            window = trace[pos * n_frames_to_analyze:(pos + 1) * n_frames_to_analyze]
            # first half of the session -> first paraffin trial, else second
            control = paraffin_traces[0] if pos <= (num_odors / 2) - 1 else paraffin_traces[1]
            pieces.append(window - control)
        corrected[samp] = np.concatenate(pieces)
    return corrected
def convert_to_df(dict, puffs):
    """Flatten per-sample {odor: [values]} results into one long-format DataFrame.

    Output columns: samp, subpop, trial, odor, value, odor_order (the
    delivery position of each odor's first trial).

    NOTE(review): the parameter name `dict` shadows the builtin; kept as-is so
    existing keyword callers are not broken.
    """
    per_samp_frames = []
    for samp in dict:
        frame = pd.DataFrame.from_dict(dict[samp])
        frame['samp'] = samp
        # sample ids look like '<prefix>_<subpop>...' -- take the 2nd field
        frame['subpop'] = samp.split('_')[1]
        # row index corresponds to trial number (1-based)
        frame['trial'] = frame.index + 1
        per_samp_frames.append(frame)
    combined = pd.concat(per_samp_frames).reset_index(drop=True)
    long_df = pd.melt(combined, id_vars=['samp', 'subpop', 'trial'], var_name='odor', value_name='value')
    # delivery order of each odor, taken from its trial-1 puff
    odor_order = {puff.odor_name: puff.number for puff in puffs if puff.trial == 1}
    long_df['odor_order'] = long_df['odor'].map(odor_order)
    return long_df