-
Notifications
You must be signed in to change notification settings - Fork 0
/
data.py
87 lines (61 loc) · 2.24 KB
/
data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import config
from ext import pickle_save, pickle_load
from glob import glob
from random import shuffle
from librosa.core import load
from torch import Tensor
from torch import cos, arange
from torch import no_grad
from numpy import pi
# from scipy.signal.windows import hann, hanning
from scipy.io.wavfile import write
##
def hann():
    """Build a Hann window of ``config.frame_len`` samples.

    The window is ``0.5 * (1 - cos(2*pi*n / N))`` for ``n = 0 .. N-1`` with
    ``N = config.frame_len``.

    Returns:
        A tensor of shape ``(1, frame_len)``.
    """
    n_samples = config.frame_len
    phase = 2*pi * arange(0, n_samples, 1) / n_samples
    raised_cosine = 1 - cos(phase)
    # Reshape to a single row before scaling.
    return 0.5 * raised_cosine.view(1, -1)
def ihann():
    """Return the element-wise reciprocal of the window built by ``hann()``.

    The first sample of the Hann window is exactly zero, so its reciprocal is
    infinite; that single entry is replaced by 0.

    Returns:
        A tensor with the same shape as ``hann()`` holding ``1 / hann()``,
        with the first sample along the last axis set to 0.
    """
    h = 1 / hann()
    # hann() is 2-D (1, frame_len): ``h[0] = 0`` would zero the whole row and
    # make the result all zeros. Index the last axis so that only the infinite
    # first sample is cleared.
    h[..., 0] = 0
    return h
##
def save_data():
    """Decode every ``.wav`` file in ``config.data_path`` and pickle the result.

    Each file matching ``config.data_path + '/*.wav'`` is loaded with librosa
    at ``config.sample_rate``; only the waveform (first element of the tuple
    returned by ``load``) is kept. The list of waveforms is handed to
    ``pickle_save`` with the target path ``config.data_path + '.pk'``.
    """
    wav_paths = glob(config.data_path + '/*.wav')
    # ``sr`` is keyword-only in recent librosa releases; passing it by name
    # works on both old and new versions, whereas the positional form raises
    # TypeError on the new ones.
    waveforms = [load(path, sr=config.sample_rate)[0] for path in wav_paths]
    pickle_save(waveforms, config.data_path + '.pk')
def load_data(frames=False):
    """Load the pickled sequences as tensors, either whole or cut into frames.

    Every sequence read from ``config.data_path + '.pk'`` is first trimmed to
    ``frame_len + k * frame_stride`` samples, with ``k`` the floor of
    ``(len(sequence) - frame_len) / frame_stride``, so that it splits into a
    whole number of strided frames.
    NOTE(review): sequences shorter than ``config.frame_len`` give a negative
    ``k``; confirm that such inputs never occur.

    Args:
        frames: When falsy, each sequence is returned as one tensor viewed as
            ``(1, 1, length)``. When truthy, each sequence is viewed as
            ``(1, length, 1)`` and sliced into windows of ``config.frame_len``
            samples taken every ``config.frame_stride`` samples.

    Returns:
        A list with one tensor per sequence, or a list with one list of frame
        tensors per sequence. Tensors are moved to the GPU with ``.cuda()``
        when ``config.use_gpu`` is truthy.
    """
    raw_sequences = pickle_load(config.data_path + '.pk')
    frame_len = config.frame_len
    stride = config.frame_stride

    sequences = []
    for raw in raw_sequences:
        # Number of samples that fill a whole number of strides after the
        # first frame.
        usable = frame_len + (len(raw) - frame_len) // stride * stride
        sequences.append(Tensor(raw[:usable]))

    if not frames:
        if config.use_gpu:
            sequences = [seq.cuda() for seq in sequences]
        return [seq.view(1, 1, -1) for seq in sequences]

    framed = []
    for seq in sequences:
        column = seq.view(1, -1, 1)
        hm_frames = (column.size(1) - frame_len) // stride + 1
        windows = []
        for index in range(hm_frames):
            start = index * stride
            # Frames are returned unwindowed (no Hann window is applied).
            windows.append(column[:, start:start + frame_len, :])
        framed.append(windows)

    if config.use_gpu:
        framed = [[window.cuda() for window in windows] for windows in framed]
    return framed
def split_data(data, dev_ratio=None, do_shuffle=False):
    """Split ``data`` into a training part and a development part.

    Args:
        data: List of items to split. It is shuffled in place when
            ``do_shuffle`` is true.
        dev_ratio: Fraction of ``data`` placed in the development part.
            ``None`` (the default) means "use ``config.dev_ratio``"; an
            explicit ``0`` means "no development part".
        do_shuffle: Shuffle ``data`` in place before splitting.

    Returns:
        A ``(train, dev)`` tuple. The first ``int(len(data) * (1 - dev_ratio))``
        items go to ``train`` and the rest to ``dev``. When the ratio is zero,
        ``data`` itself is returned together with an empty list.
    """
    # Only fall back to the configured ratio when none was given, so that an
    # explicit 0 is honoured instead of being replaced by the default.
    if dev_ratio is None:
        dev_ratio = config.dev_ratio
    if do_shuffle:
        shuffle(data)
    if not dev_ratio:
        return data, []
    hm_train = int(len(data) * (1 - dev_ratio))
    return data[:hm_train], data[hm_train:]
def batchify_data(data, batch_size=None, do_shuffle=True):
    """Cut ``data`` into consecutive batches of ``batch_size`` items.

    Args:
        data: List of items to batch. It is shuffled in place when
            ``do_shuffle`` is true.
        batch_size: Items per batch; a falsy value (``None`` or ``0``) selects
            ``config.batch_size``.
        do_shuffle: Shuffle ``data`` in place before batching.

    Returns:
        A list of full batches. Items left over after the last full batch are
        dropped. When ``data`` holds fewer than ``batch_size`` items, a single
        batch containing all of ``data`` is returned instead.
    """
    if not batch_size:
        batch_size = config.batch_size
    if do_shuffle:
        shuffle(data)
    # Integer floor division: the batch count is an exact integer quantity and
    # should not go through float division and truncation.
    hm_batches = len(data) // batch_size
    if not hm_batches:
        return [data]
    return [data[start:start + batch_size]
            for start in range(0, hm_batches * batch_size, batch_size)]
def file_output(file, sequence):
    """Write ``sequence`` to ``<file>.wav`` at ``config.sample_rate``.

    Args:
        file: Output path without the ``.wav`` extension.
        sequence: Audio samples with at least three dimensions; presumably a
            NumPy array shaped ``(1, 1, length)`` - TODO confirm with callers.
            It is resized in place to one dimension of ``shape[2]`` elements,
            so the caller's object is modified.
    """
    n_samples = sequence.shape[2]
    # In-place resize to a flat run of n_samples elements before writing.
    sequence.resize(n_samples)
    write(f'{file}.wav', config.sample_rate, sequence)
##
def main():
    """Script entry point: build the pickled dataset by calling ``save_data()``."""
    save_data()
# Run the dataset build only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()