-
Notifications
You must be signed in to change notification settings - Fork 2
/
process.py
137 lines (121 loc) · 5.24 KB
/
process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import numpy as np
import cv2
from skimage.util import img_as_float
import tensorflow as tf
import matplotlib.pyplot as plt
import time
import scipy.io
from scipy.sparse import spdiags
import random
def resize_image(img, h_new, w_old, h_old):
    """Scale *img* down/up to height ``h_new``, preserving aspect ratio.

    Resizing the frame before face detection speeds the detector up.
    """
    scale = h_new / float(h_old)
    target_size = (int(w_old * scale), h_new)
    return cv2.resize(img, target_size, interpolation=cv2.INTER_AREA)
def non_skin_remove(patches):
    """Return *patches* if it contains enough skin-coloured pixels.

    The patch is converted to HSV and tested against a fixed skin-tone
    range; when more than 5 mask pixels match, the original BGR patch is
    returned. Otherwise the function falls through and returns None.
    """
    hsv_patch = cv2.cvtColor(patches, cv2.COLOR_BGR2HSV)
    skin_lo = np.array([0, 48, 80], dtype="uint8")
    skin_hi = np.array([20, 255, 255], dtype="uint8")
    mask = cv2.inRange(hsv_patch, skin_lo, skin_hi)
    if mask.sum() > 5:
        return patches
def preprocess_raw_video(videoFilePath, dim=36):
    """Read a video, crop the detected face of each frame to dim x dim,
    and build the two-branch network input.

    Parameters
    ----------
    videoFilePath : str
        Path of the video file to read with OpenCV.
    dim : int
        Side length of the square face crop (default 36).

    Returns
    -------
    (dXsub, fps, sample_img) where ``dXsub`` has 6 channels per frame:
    the normalized frame-difference (motion branch) concatenated with the
    standardized raw frame (appearance branch); ``fps`` is the video frame
    rate; ``sample_img`` is one randomly chosen preprocessed frame.
    """
    #########################################################################
    # set up
    t = []
    i = 0
    print(videoFilePath)
    vidObj = cv2.VideoCapture(videoFilePath)
    fps = vidObj.get(cv2.CAP_PROP_FPS)  # frame rate reported by the container
    print('fps', fps)
    totalFrames = int(vidObj.get(cv2.CAP_PROP_FRAME_COUNT))
    Xsub = np.zeros((totalFrames, dim, dim, 3), dtype=np.float32)
    height = vidObj.get(cv2.CAP_PROP_FRAME_HEIGHT)
    width = vidObj.get(cv2.CAP_PROP_FRAME_WIDTH)
    success, img = vidObj.read()
    face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
    #########################################################################
    # Crop each frame's face region into dim x dim
    while success:
        t.append(vidObj.get(cv2.CAP_PROP_POS_MSEC))  # current timestamp in millisecond
        img = resize_image(img, 300, width, height)  # shrink before detection for speed
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        faces = face_cascade.detectMultiScale(gray, 1.3, 5)
        for (x, y, w, h) in faces:
            roi = img[y:y + h, x:x + w]
            vidLxL = cv2.resize(roi, (dim, dim))
            vidLxL = cv2.rotate(vidLxL, cv2.ROTATE_90_CLOCKWISE)  # rotate 90 degrees
            vidLxL = cv2.cvtColor(vidLxL, cv2.COLOR_BGR2RGB)
            Xsub[i, :, :, :] = vidLxL / 255.0
        # NOTE: frames where no face is detected leave Xsub[i] all-zero.
        success, img = vidObj.read()  # read the next frame
        i = i + 1
    # BUG FIX: randint's upper bound is inclusive; len(Xsub) was out of range.
    n = random.randint(0, len(Xsub) - 1)
    sample_img = Xsub[n]
    #########################################################################
    # Normalized frame differences for the motion branch
    normalized_len = len(t) - 1
    print('length', len(t), len(Xsub))
    dXsub = np.zeros((normalized_len, dim, dim, 3), dtype=np.float32)
    # BUG FIX: loop ran to normalized_len - 1, leaving the last row all-zero.
    for j in range(normalized_len):
        denom = Xsub[j + 1, :, :, :] + Xsub[j, :, :, :]
        # BUG FIX: 0/0 produced NaN for undetected-face (all-zero) frames,
        # which poisoned np.std below; map zero denominators to 1 (diff is 0 there).
        denom = np.where(denom == 0, 1.0, denom)
        dXsub[j, :, :, :] = (Xsub[j + 1, :, :, :] - Xsub[j, :, :, :]) / denom
    dXsub = dXsub / np.std(dXsub)
    #########################################################################
    # Standardize raw frames for the appearance branch
    Xsub = Xsub - np.mean(Xsub)
    Xsub = Xsub / np.std(Xsub)
    Xsub = Xsub[:totalFrames - 1, :, :, :]
    #########################################################################
    # Stack motion (3ch) + appearance (3ch) along the channel axis
    dXsub = np.concatenate((dXsub, Xsub), axis=3)
    return dXsub, fps, sample_img
def detrend(signal, Lambda):
"""detrend(signal, Lambda) -> filtered_signal
This function applies a detrending filter.
This code is based on the following article "An advanced detrending method with application
to HRV analysis". Tarvainen et al., IEEE Trans on Biomedical Engineering, 2002.
*Parameters*
``signal`` (1d numpy array):
The signal where you want to remove the trend.
``Lambda`` (int):
The smoothing parameter.
*Returns*
``filtered_signal`` (1d numpy array):
The detrended signal.
"""
signal_length = signal.shape[0]
# observation matrix
H = np.identity(signal_length)
# second-order difference matrix
ones = np.ones(signal_length)
minus_twos = -2 * np.ones(signal_length)
diags_data = np.array([ones, minus_twos, ones])
diags_index = np.array([0, 1, 2])
D = spdiags(diags_data, diags_index, (signal_length - 2), signal_length).toarray()
filtered_signal = np.dot((H - np.linalg.inv(H + (Lambda ** 2) * np.dot(D.T, D))), signal)
return filtered_signal
def remove_outliers(x):
    """Replace IQR outliers in *x* with the median of *x*.

    A value is an outlier when it lies outside
    [Q1 - 1.5*IQR, Q3 + 1.5*IQR]. Returns a new array; *x* is untouched.
    """
    q1, q3 = np.quantile(x, 0.25), np.quantile(x, 0.75)
    iqr = q3 - q1
    lo, hi = q1 - 1.5 * iqr, q3 + 1.5 * iqr
    is_outlier = (x < lo) | (x > hi)
    return np.where(is_outlier, np.median(x), x)
from scipy import signal
def fourier_analysis(array, fps):
    """Estimate the dominant pulse frequency of *array* via a periodogram.

    Searches only the physiologically plausible band of 50-120 bpm.

    Parameters
    ----------
    array : 1d array-like
        The pulse signal samples.
    fps : float
        Sampling rate of *array* in Hz.

    Returns
    -------
    float
        The peak frequency in **Hz** (multiply by 60 for bpm).
    """
    # BUG FIX: this helper was named `remove_outliers`, shadowing the
    # module-level function of that name while doing something unrelated.
    def _first_index_above(arr, thr, fallback):
        """Index of the first element strictly greater than thr, or fallback."""
        return next((i for i, v in enumerate(arr) if v > thr), fallback)

    MinFreq = 50   # bpm
    MaxFreq = 120  # bpm
    freqs, psd = signal.periodogram(array, fs=fps, window=None,
                                    detrend='constant', return_onesided=True,
                                    scaling='density')
    # Widen the band by one bin on each side, as in the original code.
    min_idx = _first_index_above(freqs, MinFreq / 60.0, 1) - 1
    # Fallback avoids a StopIteration crash when fps is too low for the band.
    max_idx = _first_index_above(freqs, MaxFreq / 60.0, len(freqs) - 1) + 1
    hr_estimated = freqs[min_idx + np.argmax(psd[min_idx:max_idx])]
    return hr_estimated