forked from PhilippeNguyen/keras_wavenet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun_wavenet.py
139 lines (121 loc) · 5.72 KB
/
run_wavenet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import argparse
import keras
import keras.backend as K
from scipy.io.wavfile import read
from keras_wavenet.utils.wavenet_utils import (simple_load_wavfile,
sample_to_categorical,
categorical_to_sample)
from keras_wavenet.layers.wavenet import custom_objs
from keras_wavenet.utils.queued_wavenet_utils import load_model,batch_model_reset_states
from keras_wavenet.utils.wavenet_utils import inv_mu_law_numpy
from keras_wavenet.utils.audio_generator_utils import WavGenerator
from keras_wavenet.models.audio_outputs import get_output_processor
import pickle
import numpy as np
import sys
import librosa
import os
import json
from functools import partial
fs = os.path.sep
'''Converts and tests the wavenet model using the queued method described
here: https://arxiv.org/pdf/1611.09482.pdf
It is much more efficient than the naive method.
'''
def synthesize(model, encoding, num_timesteps,
               init_state=None, verbose=False,
               preprocess_func_str="lambda x : x/128.",
               output_processor='sparse_categorical',
               output_processor_kwargs=None):
    """Generate audio autoregressively, one timestep per model call.

    At every step the model is fed the encoding frame that covers the
    current timestep together with the previous (preprocessed) sample,
    a new sample is drawn from the model output, and that sample is both
    stored and fed back as the next input.

    Args:
        model: model exposing ``predict`` and ``output_shape``; it is called
            as ``model.predict([encoding_frame, previous_sample])``.
        encoding: array of shape ``(num_batch, encoding_len, channels)``.
        num_timesteps: number of audio samples to generate per batch item.
            Intended to be a multiple of ``encoding_len``; when it is not,
            the last encoding frame is reused for the trailing timesteps.
        init_state: first "previous sample" fed to the model. Defaults to
            zeros of shape ``(num_batch, 1, 1)``.
        verbose: when true, write a progress counter to stdout every 100
            timesteps.
        preprocess_func_str: Python expression (evaluated with ``eval``)
            yielding a callable that maps a freshly drawn sample to the
            next model input.
        output_processor: name passed to ``get_output_processor``. Names
            ending in ``'tfp'`` get the model bound into the sampler call.
        output_processor_kwargs: extra arguments forwarded to
            ``get_output_processor``.

    Returns:
        ``float32`` array of shape ``(num_batch, num_timesteps, 1)`` holding
        the raw drawn samples (before ``preprocess_func`` is applied).

    Raises:
        ValueError: if ``encoding`` has no frames along its second axis.
    """
    batch_model_reset_states(model)
    num_ch = 1
    num_batch, encoding_len, _ = encoding.shape
    if encoding_len < 1:
        raise ValueError("encoding must contain at least one frame")
    full_audio = np.zeros((num_batch, num_timesteps, num_ch), dtype=np.float32)

    # NOTE(security): eval() executes arbitrary code. preprocess_func_str
    # must only ever come from a trusted config file.
    preprocess_func = eval(preprocess_func_str)

    output_channels = model.output_shape[-1]
    output_proc = get_output_processor(output_processor, output_channels,
                                       output_processor_kwargs)
    if output_processor.endswith('tfp'):
        # These processors take the model as a keyword argument when sampling.
        sampler = partial(output_proc.sample, model=model)
    else:
        sampler = output_proc.sample

    # Number of audio timesteps covered by one encoding frame. Kept at >= 1
    # so that num_timesteps < encoding_len does not divide by zero below.
    encoding_hop = max(1, num_timesteps // encoding_len)
    last_enc_idx = encoding_len - 1

    if init_state is None:
        y = np.zeros((num_batch, 1, num_ch), dtype=np.float32)
    else:
        y = init_state

    for idx in range(num_timesteps):
        if idx % 100 == 0 and verbose:
            sys.stdout.write("\r "+str(idx)+" / " + str(num_timesteps))
            sys.stdout.flush()
        # Clamp so a num_timesteps that is not an exact multiple of
        # encoding_len reuses the final frame instead of indexing past it.
        enc_idx = min(idx // encoding_hop, last_enc_idx)
        enc = np.expand_dims(encoding[:, enc_idx, :], axis=1)
        model_out = model.predict([enc, y])
        # new_sample is indexed as a 3-D array below; presumably
        # (num_batch, 1, num_ch) — confirm against the output processor.
        new_sample = sampler(model_out)
        y = preprocess_func(new_sample)
        full_audio[:, idx, :] = new_sample[:, 0, :]
    return full_audio
# Command-line interface. --model, --output_folder and --config_json are all
# used unconditionally further down, so they are marked required: a missing
# one is reported by argparse as a usage error instead of surfacing later as
# an AttributeError/TypeError on None.
parser = argparse.ArgumentParser()
parser.add_argument('--model', dest='model',
                    action='store', required=True,
                    help='path to the model hdf5')
# Passed as the directory argument of generator.flow_from_directory.
parser.add_argument('--folder', dest='folder',
                    action='store', default=None,
                    help='path to the wavfile to encode')
parser.add_argument('--output_folder', dest='output_folder',
                    action='store', required=True,
                    help='path write output samples')
parser.add_argument('--config_json', dest='config_json',
                    action='store', required=True,
                    help='path to the config json')
# When omitted, the expected_len stored in the config is used instead.
parser.add_argument('--num_timesteps', dest='num_timesteps',
                    action='store', default=None, type=int,
                    help='number of timesteps to generate,must be a multiple of the encoding_len')
args = parser.parse_args()
# Make sure the output folder path ends with a separator and exists.
output_folder = args.output_folder if args.output_folder.endswith(fs) else args.output_folder+fs
os.makedirs(output_folder, exist_ok=True)

# Load the training config; the context manager closes the file handle
# (the previous bare open() left it to the garbage collector).
with open(args.config_json, 'r') as config_file:
    config_json = json.load(config_file)

sample_rate = config_json['generator_dict']['load_kwargs']['sample_rate']

# A --num_timesteps given on the command line overrides the config value.
if args.num_timesteps is None:
    num_timesteps = config_json['generator_dict']['expected_len']
else:
    num_timesteps = args.num_timesteps

# Keep the generator settings consistent with the chosen length.
config_json['generator_dict']['expected_len'] = num_timesteps
config_json['generator_dict']['target_size'] = [num_timesteps, 1]

encoding_size = config_json['model_dict']['latent_size']
preprocess_func_str = config_json['model_dict']['preprocess_func_str']
used_mu_law = config_json['generator_dict']['mu_law']
output_processor = config_json['model_dict']['output_processor']
output_processor_kwargs = config_json['model_dict']['output_processor_kwargs']

batch_size = 32

# Build the data generator from the config and draw a single batch from
# the input folder.
generator = WavGenerator(**config_json['generator_dict'])
generator.random_transforms = False
train_gen = generator.flow_from_directory(args.folder,
                                          shuffle=True,
                                          follow_links=True,
                                          batch_size=batch_size,
                                          )
test_x, test_y, filenames = train_gen.next(return_filenames=True)

#need to handle stochastic encoding
# Non-queued model from 'input_1' to 'z_mean', used to encode the batch.
encoder = load_model(args.model, queued=False,
                     new_inputs=['input_1'],
                     batch_input_shapes=[(None, num_timesteps, 1)],
                     new_outputs=['z_mean'],
                     custom_objects=custom_objs)
encoding_1 = encoder.predict(test_x)

# Queued model that consumes one encoding frame and one previous sample
# per call.
model = load_model(args.model, queued=True,
                   new_inputs=['decoder_input', 'temporal_shift'],
                   batch_input_shapes=[(None, 1, encoding_size), (None, 1, 1)],
                   custom_objects=custom_objs, batch_size=batch_size)

print('synthesizing')
# Initial "previous sample": zeros run through the generator's own
# preprocessing pipeline.
init_val = np.zeros((batch_size, 1, 1), dtype=np.float32)
init_val = generator.preprocess_pipeline(init_val)
full_audio = synthesize(model, encoding_1, num_timesteps, verbose=True,
                        preprocess_func_str=preprocess_func_str,
                        output_processor=output_processor,
                        init_state=init_val,
                        output_processor_kwargs=output_processor_kwargs)

# Undo mu-law companding if the generator applied it.
if used_mu_law:
    full_audio = inv_mu_law_numpy(full_audio)

# Write one wav file per batch item.
# NOTE(review): librosa.output.write_wav was removed in librosa 0.8; this
# call needs an older librosa — confirm the pinned version.
for idx, waveform in enumerate(full_audio):
    out_path = os.path.join(output_folder, 'test_'+str(idx)+'.wav')
    librosa.output.write_wav(out_path, waveform, sr=sample_rate)