-
Notifications
You must be signed in to change notification settings - Fork 0
/
decode
executable file
·470 lines (390 loc) · 18.1 KB
/
decode
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
#!/usr/bin/python3
from scipy.io import wavfile
import numpy as np
from signal_processing_utilities import process_signal
import argparse
import bitstring
def convert_bytes_to_bit_string(data_to_decode, end_zero_padding):
    """Convert a byte string into a string of '0' and '1' characters.

    Args:
        data_to_decode (bytes): The bytes that will be converted into
                                bits. Every byte becomes exactly eight
                                bit characters, most significant bit
                                first.
        end_zero_padding (int): The number of zeroes that were padded
                                into the final byte. They are removed so
                                the resultant bit string can be properly
                                decompressed.

    Returns:
        bit_string (str): The string of bits that will need to be parsed
                          into hexadecimal pairs that represent the
                          bytes of the decompressed wave file.
    """
    # "08b" left-pads each byte to a full eight bits. Joining once avoids
    # the quadratic cost of growing a string with "+=" inside a loop.
    bit_string = "".join(format(byte, "08b") for byte in data_to_decode)

    # Drop the padding bits that only exist to fill the final byte.
    return bit_string[: len(bit_string) - end_zero_padding]
def find_key_by_value_in_node_mapping_dictionary(
    val_str_to_find, node_mapping_dictionary
):
    """Search the node_mapping_dictionary for the key of a given value.

    Args:
        val_str_to_find (str): This is expected to be a string of bits.
        node_mapping_dictionary (dict): The dictionary that maps
                                        hexadecimal values as keys to
                                        uniquely identifying strings of
                                        bits as values.

    Returns:
        key_mapped_to_value: The first key (in the dictionary's iteration
                             order) whose value equals val_str_to_find.
                             If the given string of bits is not found in
                             the dictionary, the return value is 'None'.
    """
    # A single lazy pass stops at the first match instead of copying all
    # keys and all values into two fresh lists on every call.
    return next(
        (
            key
            for key, value in node_mapping_dictionary.items()
            if value == val_str_to_find
        ),
        None,
    )
def huffman_decoding(
    huffman_encoded_data: bytes, unique_amplitude_l_exists: bool = False
):
    """Decode a huffman encoded string of bytes.

    The trailer of the data is read first: the last two bytes give the
    byte length of a table of 4-byte section indices that sits
    immediately before them. Those indices delimit, in order, the node
    mapping dictionary keys, the packed dictionary values, the run-length
    encoded lengths of those values and the encoded bit string (followed
    by the unique amplitudes when they are present). The last entry of
    the table is the number of zero bits padded onto the bit string.

    Args:
        huffman_encoded_data (bytes): This is the string of bytes to be
                                      decoded.
        unique_amplitude_l_exists (bool): Indicates if the encoded data
                                          contains a nested
                                          unique_amplitudes_l. If set to
                                          'True', the unique amplitudes
                                          are read from the bytes between
                                          section indices 3 and 4.
                                          Otherwise, the data was
                                          compressed using either huffman
                                          encoding exclusively or huffman
                                          encoding on the neural spike
                                          detected data. Defaults to
                                          False.

    Returns:
        decoded_wav_bytes (bytes): The byte string that has been decoded
                                   by the huffman decoding algorithm.
        unique_amplitude_l (numpy.ndarray | None): The int16 unique
                                   amplitudes stored in the data, or
                                   'None' when unique_amplitude_l_exists
                                   is False.

    Raises:
        ValueError: If the bit string contains bits that do not match any
                    code of the node mapping dictionary.
    """
    # Byte order is spelled out: "big" is what int.from_bytes assumes when
    # the argument is omitted, but omitting it is a TypeError before
    # Python 3.11.
    indices_length = int.from_bytes(huffman_encoded_data[-2:], byteorder="big")
    indices_bytes = huffman_encoded_data[-indices_length - 2 : -2]
    section_indices = [
        int.from_bytes(indices_bytes[offset : offset + 4], byteorder="big")
        for offset in range(0, len(indices_bytes), 4)
    ]

    # The final table entry is the zero padding of the encoded bit string.
    end_zero_padding = section_indices[-1]

    if unique_amplitude_l_exists:
        unique_amplitude_l = np.frombuffer(
            huffman_encoded_data[section_indices[3] : section_indices[4]],
            dtype=np.int16,
        )
    else:
        unique_amplitude_l = None

    # Node mapping dictionary keys: utf-8 text, two characters per key.
    keys_string = str(
        huffman_encoded_data[0 : section_indices[0]], encoding="utf-8"
    )
    keys_l = [
        keys_string[offset : offset + 2]
        for offset in range(0, len(keys_string), 2)
    ]

    # Node mapping dictionary values: bits packed into bytes, followed by
    # one trailing byte that holds the number of padding bits.
    packed_values = huffman_encoded_data[section_indices[0] : section_indices[1]]
    values_padding_length = packed_values[-1]
    values_bits = "".join(format(byte, "08b") for byte in packed_values[:-1])
    # Slice by an explicit end position: "[:-padding]" would wrongly
    # produce an empty string when the padding is zero.
    values_bits = values_bits[: max(len(values_bits) - values_padding_length, 0)]

    # The run-length encoded section yields the bit length of every value.
    value_lengths = process_signal.decode_rle(
        huffman_encoded_data[section_indices[1] : section_indices[2]]
    )
    values_l = []
    start_index = 0
    for value_length in value_lengths:
        values_l.append(values_bits[start_index : start_index + value_length])
        start_index += value_length

    # Pair every key with its code. Indexing (rather than zip) keeps the
    # IndexError when fewer codes than keys were stored.
    node_mapping_dictionary = {}
    for index, key_str in enumerate(keys_l):
        node_mapping_dictionary[key_str] = values_l[index]

    # Invert the mapping once so each code is resolved with a single hash
    # lookup. setdefault keeps the first key should two keys share a code.
    code_to_key = {}
    for key_str, code in node_mapping_dictionary.items():
        code_to_key.setdefault(code, key_str)
    max_code_length = max(map(len, code_to_key), default=0)

    bit_string = convert_bytes_to_bit_string(
        huffman_encoded_data[section_indices[2] : section_indices[3]],
        end_zero_padding,
    )

    # Parse the string of bits into hexadecimal values. A position is
    # advanced through the bit string instead of re-slicing the remainder
    # after every decoded symbol.
    hex_value_array = []
    position = 0
    total_bits = len(bit_string)
    while position < total_bits:
        longest_candidate = min(max_code_length, total_bits - position)
        for code_length in range(1, longest_candidate + 1):
            code = bit_string[position : position + code_length]
            if code in code_to_key:
                hex_value_array.append(code_to_key[code])
                position += code_length
                break
        else:
            # Without this guard, undecodable bits would loop forever.
            raise ValueError(
                "Bits starting at position {} do not match any huffman "
                "code.".format(position)
            )

    decoded_wav_bytes = bytes.fromhex("".join(hex_value_array))
    return decoded_wav_bytes, unique_amplitude_l
def read_encoded_file(compressed_file_path: str):
    """Read the complete contents of a huffman encoded file.

    Args:
        compressed_file_path (str): The path of the compressed file to
                                    decompress.

    Returns:
        huffman_encoded_data (bytes): The raw bytes of the file.
    """
    # "rb" instead of "rb+": the file is only read, so write permission
    # on the compressed file must not be required.
    with open(compressed_file_path, "rb") as file:
        return file.read()
def process_huffman_encoded_file(huffman_encoded_data):
    """Decode data that was compressed with huffman encoding exclusively.

    Args:
        huffman_encoded_data (bytes): This is the string of bytes to be
                                      decoded.

    Returns:
        rate (int): The rate at which the data was sampled. This value
                    is known in advance to be 19531.
        data (numpy.ndarray): The decoded bytes viewed as 16-bit signed
                              integer amplitudes.
    """
    # The sample rate is not stored in the data; it is known in advance.
    sample_rate = 19531

    # The second return value (unique amplitudes) is not used here.
    wav_bytes, _ = huffman_decoding(huffman_encoded_data)
    amplitudes = np.frombuffer(wav_bytes, dtype=np.int16)
    return sample_rate, amplitudes
def process_spike_detection_huffman_encoded_data(huffman_encoded_data):
    """Decode huffman encoded data that was encoded so as to only detect
    neural spikes.

    The bytes returned by huffman decoding are converted into the encoded
    data representation and then decoded by process_signal.

    Args:
        huffman_encoded_data (bytes): This is the string of bytes to be
                                      decoded. This data has also been
                                      encoded to include a format that
                                      contains the deconstructed
                                      representation of the original
                                      amplitudes.

    Returns:
        rate: The sample rate as returned by process_signal.decode_data.
        data: The amplitudes as returned by process_signal.decode_data.
    """
    wav_bytes, _ = huffman_decoding(huffman_encoded_data)
    sample_rate, amplitudes = process_signal.decode_data(
        process_signal.convert_byte_string_to_encoded_data(wav_bytes)
    )
    return sample_rate, amplitudes
def process_huffman_encoded_amplitude_indices(
    huffman_encoded_data, method_of_compression
):
    """Decode huffman encoded data that carries a list of unique
    amplitudes.

    The huffman decoded bytes are read as indices, and each index is
    replaced by the unique amplitude stored at that position.

    Args:
        huffman_encoded_data (bytes): The compressed representation of
                                      the huffman encoded data.
        method_of_compression (str): Selects the data type of the
                                     indices. 'u' means the indices are
                                     unsigned 8-bit integers; any other
                                     value (in practice 'w') means they
                                     are unsigned 16-bit integers.

    Returns:
        rate (int): The rate at which the data was sampled. This value
                    is known in advance to be 19531.
        data (numpy.ndarray): The unique amplitudes selected by the
                              decoded indices.
    """
    index_bytes, amplitude_table = huffman_decoding(
        huffman_encoded_data, unique_amplitude_l_exists=True
    )

    index_dtype = np.uint8 if method_of_compression == "u" else np.uint16
    amplitude_indices = np.frombuffer(index_bytes, dtype=index_dtype)

    # 19531 is the sample rate, which is known in advance.
    return 19531, amplitude_table[amplitude_indices]
def decompress(byte_string: bytes):
    """Decompress a byte string into a sample rate and amplitude array.

    The last byte of the input names the method of compression and
    selects the decoder that is applied to the remaining bytes.

    Args:
        byte_string (bytes): The compressed bytes, ending with the
                             one-byte method of compression
                             ('h', 'u', 'w' or 'n').

    Returns:
        rate: The sample rate reported by the selected decoder.
        data: The amplitudes reported by the selected decoder.

    Raises:
        ValueError: If the method of compression is not one of
                    'h', 'u', 'w' or 'n'.
    """
    method, payload = extract_method_of_compression(byte_string)

    if method not in ("h", "u", "w", "n"):
        raise ValueError("Method of compression is not 'h', 'u', 'w' or 'n'.")

    if method == "h":
        # Huffman encoded exclusively.
        rate, data = process_huffman_encoded_file(payload)
    elif method == "n":
        # Compressed using neural spike detection.
        rate, data = process_spike_detection_huffman_encoded_data(payload)
    else:
        # 'u' or 'w': a dictionary of unique amplitudes plus huffman
        # encoded indices into it.
        rate, data = process_huffman_encoded_amplitude_indices(payload, method)
    return rate, data
def extract_method_of_compression(huffman_encoded_data):
    """Split the trailing 'method_of_compression' byte off the data.

    Args:
        huffman_encoded_data (bytes): A string of bytes whose last byte
                                      is the method of compression.

    Returns:
        method_of_compression (str): The last byte decoded as utf-8. It
                                     is used to determine how the
                                     remaining data was encoded.
        huffman_encoded_data (bytes): The input without its last byte,
                                      ready to be parsed by the
                                      huffman_decoding function.
    """
    # Slices (not an index) are used so an empty input yields "" and b"".
    method_byte = huffman_encoded_data[-1:]
    remaining_data = huffman_encoded_data[:-1]
    return method_byte.decode(encoding="utf-8"), remaining_data
def initialize_argument_parser():
    """Initialize the argument parser with the command line arguments.

    Two positional arguments are registered: 'compressed_file_path' and
    'decompressed_file_path'.

    Returns:
        parser (argparse.ArgumentParser): The parser used to parse the
                                          command line arguments.
    """
    parser = argparse.ArgumentParser()

    # Each help value must be a plain string. A trailing comma inside the
    # parentheses would turn it into a tuple, which makes rendering
    # "--help" raise.
    parser.add_argument(
        "compressed_file_path",
        help=(
            "This is the compressed output file path. It is presumed"
            " to end this new file name with a '.brainwire' "
            "file extension. A sample file name is "
            "'compressed_file.wav.brainwire'."
        ),
    )
    parser.add_argument(
        "decompressed_file_path",
        help=(
            "This is the absolute file path to the reconstructed raw "
            "neural data. This is used to name the output file "
            "along with the extension. A sample file extension is "
            "'reconstructed_neural_data.wav.brainwire.copy'."
        ),
    )
    return parser
def main(args):
    """This is the main driver logic of the decode function.

    The compressed file is read, decompressed and written out as a wave
    file.

    Args:
        args: The parsed command line arguments. The attributes
              'compressed_file_path' and 'decompressed_file_path' are
              read.

    Raises:
        ValueError: If the method of compression stored in the file is
                    not 'h', 'u', 'w' or 'n'.
    """
    huffman_encoded_data = read_encoded_file(
        compressed_file_path=args.compressed_file_path
    )

    # decompress() owns the dispatch on the method of compression, so the
    # command line path and the library path cannot drift apart.
    rate, data = decompress(huffman_encoded_data)

    wavfile.write(
        filename=args.decompressed_file_path,
        rate=rate,
        data=data,
    )
if __name__ == "__main__":
    # Script entry point: parse the two positional paths, echo them back
    # to the user, then run the decoder.
    parser = initialize_argument_parser()
    args = parser.parse_args()
    echo_lines = (
        "compressed_file_path: {}".format(args.compressed_file_path),
        "decompressed_file_path: {}".format(args.decompressed_file_path),
    )
    for echo_line in echo_lines:
        print(echo_line)
    main(args)