|
1 | | -import pytest, os, tempfile |
| 1 | +import pytest, mock, os, time, random, threading, tempfile |
| 2 | +from tests.mocks import * |
2 | 3 | from auroraapi.audio import * |
| 4 | +from auroraapi.audio import _is_silent, _pyaudio_record |
3 | 5 |
|
4 | 6 | class TestAudioFile(object): |
5 | | - def test_create(self): |
| 7 | + def setup(self): |
6 | 8 | with open("tests/assets/hw.wav", "rb") as f: |
7 | | - d = f.read() |
8 | | - a = AudioFile(d) |
9 | | - assert isinstance(a, AudioFile) |
10 | | - assert len(a.audio.raw_data) + 44 == len(d) |
| 9 | + self.wav_data = f.read() |
| 10 | + |
| 11 | + def test_create(self): |
| 12 | + a = AudioFile(self.wav_data) |
| 13 | + assert isinstance(a, AudioFile) |
| 14 | + assert len(a.audio.raw_data) + 44 == len(self.wav_data) |
11 | 15 |
|
12 | 16 | def test_write_to_file(self): |
13 | 17 | path = os.path.join(tempfile.gettempdir(), "out.wav") |
14 | | - with open("tests/assets/hw.wav", "rb") as f: |
15 | | - d1 = f.read() |
16 | | - a1 = AudioFile(d1) |
17 | | - a1.write_to_file(path) |
| 18 | + a1 = AudioFile(self.wav_data) |
| 19 | + a1.write_to_file(path) |
18 | 20 |
|
19 | | - with open(path, "rb") as f: |
20 | | - d2 = f.read() |
21 | | - a2 = AudioFile(d2) |
22 | | - assert d2 == d1 |
23 | | - assert a1.audio == a2.audio |
| 21 | + with open(path, "rb") as f: |
| 22 | + d2 = f.read() |
| 23 | + a2 = AudioFile(d2) |
| 24 | + assert d2 == self.wav_data |
| 25 | + assert a1.audio == a2.audio |
24 | 26 |
|
25 | 27 | os.remove(path) |
26 | 28 |
|
27 | 29 | def test_get_wav(self): |
28 | | - with open("tests/assets/hw.wav", "rb") as f: |
29 | | - d = f.read() |
30 | | - a = AudioFile(d) |
31 | | - w = a.get_wav() |
32 | | - assert d == w |
| 30 | + a = AudioFile(self.wav_data) |
| 31 | + wav = a.get_wav() |
| 32 | + assert self.wav_data == wav |
33 | 33 |
|
34 | 34 | def test_pad(self): |
35 | | - with open("tests/assets/hw.wav", "rb") as f: |
36 | | - d = f.read() |
37 | | - a = AudioFile(d) |
38 | | - # adds 1s of padding to each side (2 x (16000 Hz * 16 bits/sample * 1 byte/8bits)) |
39 | | - pa = a.pad(1) |
40 | | - w = pa.get_wav() |
41 | | - assert len(w) == len(d) + 64000 |
42 | | - assert w[-1] == 0 |
43 | | - assert w[44] == 0 |
44 | | - assert w[44 + 32000] == d[44] |
| 35 | + a = AudioFile(self.wav_data) |
| 36 | + # adds 1s of padding to each side (2 x (16000 Hz * 16 bits/sample * 1 byte/8bits)) |
| 37 | + pa = a.pad(1) |
| 38 | + wav = pa.get_wav() |
| 39 | + assert len(wav) == len(self.wav_data) + 64000 |
| 40 | + assert wav[-1] == 0 |
| 41 | + assert wav[44] == 0 |
| 42 | + assert wav[44 + 32000] == self.wav_data[44] |
45 | 43 |
|
46 | 44 | def test_pad_left(self): |
47 | | - with open("tests/assets/hw.wav", "rb") as f: |
48 | | - d = f.read() |
49 | | - a = AudioFile(d) |
50 | | - # adds 1s of padding to left side (1 x (16000 Hz * 16 bits/sample * 1 byte/8bits)) |
51 | | - pa = a.pad_left(1) |
52 | | - w = pa.get_wav() |
53 | | - assert len(w) == len(d) + 32000 |
54 | | - assert w[-1] == d[-1] |
55 | | - assert w[44] == 0 |
56 | | - assert w[44 + 32000] == d[44] |
| 45 | + a = AudioFile(self.wav_data) |
| 46 | + # adds 1s of padding to left side (1 x (16000 Hz * 16 bits/sample * 1 byte/8bits)) |
| 47 | + pa = a.pad_left(1) |
| 48 | + wav = pa.get_wav() |
| 49 | + assert len(wav) == len(self.wav_data) + 32000 |
| 50 | + assert wav[-1] == self.wav_data[-1] |
| 51 | + assert wav[44] == 0 |
| 52 | + assert wav[44 + 32000] == self.wav_data[44] |
57 | 53 |
|
58 | 54 | def test_pad_right(self): |
59 | | - with open("tests/assets/hw.wav", "rb") as f: |
60 | | - d = f.read() |
61 | | - a = AudioFile(d) |
62 | | - # adds 1s of padding to right side (1 x (16000 Hz * 16 bits/sample * 1 byte/8bits)) |
63 | | - pa = a.pad_right(1) |
64 | | - w = pa.get_wav() |
65 | | - assert len(w) == len(d) + 32000 |
66 | | - assert w[-1] == 0 |
67 | | - assert w[44] == d[44] |
68 | | - assert w[-32000] == 0 |
| 55 | + a = AudioFile(self.wav_data) |
| 56 | + # adds 1s of padding to right side (1 x (16000 Hz * 16 bits/sample * 1 byte/8bits)) |
| 57 | + pa = a.pad_right(1) |
| 58 | + wav = pa.get_wav() |
| 59 | + assert len(wav) == len(self.wav_data) + 32000 |
| 60 | + assert wav[-1] == 0 |
| 61 | + assert wav[44] == self.wav_data[44] |
| 62 | + assert wav[-32000] == 0 |
69 | 63 |
|
70 | 64 | def test_trim_silent(self): |
| 65 | + # replace all data with zeros: |
| 66 | + # d = [x for x in self.wav_data[44:]] |
| 67 | + d = self.wav_data[0:44] + bytes(r'\0' * len(self.wav_data[44:]), 'utf8') |
| 68 | + a = AudioFile(d) |
| 69 | + t = a.trim_silent() |
| 70 | + # TODO: actually add a test here. for now, it just checks for compilation |
| 71 | + # also, for some reason, trim_silent doesn't work, so figure that out |
| 72 | + |
| 73 | + def test_play(self): |
| 74 | + with mock.patch('pyaudio.PyAudio', new=MockPyAudio): |
| 75 | + a = AudioFile(self.wav_data) |
| 76 | + a.play() |
| 77 | + |
| 78 | + # for some reason, a.play() plays a couple of extra bytes, so we can't do |
| 79 | + # an exact equality check here |
| 80 | + assert len(MockStream.data) >= len(self.wav_data[44:]) |
| 81 | + assert (len(MockStream.data) - len(self.wav_data[44:]))/len(MockStream.data) < 0.001 |
| 82 | + MockStream.reset_data() |
| 83 | + |
| 84 | + def test_play_stop(self): |
| 85 | + def stop_audio(timeout, audio): |
| 86 | + time.sleep(timeout) |
| 87 | + audio.stop() |
| 88 | + |
| 89 | + def play_audio(audio): |
| 90 | + audio.play() |
| 91 | + |
| 92 | + with mock.patch('pyaudio.PyAudio', new=MockPyAudio): |
| 93 | + a = AudioFile(self.wav_data) |
| 94 | + |
| 95 | + t1 = threading.Thread(target=play_audio, args=(a,)) |
| 96 | + t2 = threading.Thread(target=stop_audio, args=(0.1, a)) |
| 97 | + |
| 98 | + t1.start() |
| 99 | + t2.start() |
| 100 | + t1.join() |
| 101 | + t2.join() |
| 102 | + |
| 103 | + # we stopped playback after 0.1 seconds, so expect the stream audio len |
| 104 | + # to be much less than the input audio len (TODO: make this more precise) |
| 105 | + assert len(MockStream.data) < len(self.wav_data[44:]) |
| 106 | + assert (len(MockStream.data) - len(self.wav_data[44:]))/len(self.wav_data[44:]) < 0.5 |
| 107 | + MockStream.reset_data() |
| 108 | + |
| 109 | +class TestAudio(object): |
| 110 | + def setup(self): |
71 | 111 | with open("tests/assets/hw.wav", "rb") as f: |
72 | | - d = f.read() |
73 | | - a = AudioFile(d) |
74 | | - t = a.trim_silent() |
75 | | - # TODO: actually add a test here. for now, it just checks for compilation |
| 112 | + self.wav_data = f.read() |
76 | 113 |
|
77 | | - # def test_play(self): |
| 114 | + def test_record(self): |
| 115 | + with mock.patch('auroraapi.audio._pyaudio_record', new=mock_pyaudio_record): |
| 116 | + a = record() |
| 117 | + assert a.get_wav() == self.wav_data |
78 | 118 |
|
| 119 | + def test_stream(self): |
| 120 | + with mock.patch('auroraapi.audio._pyaudio_record', new=mock_pyaudio_record): |
| 121 | + first = True |
| 122 | + header = [] |
| 123 | + data = [] |
| 124 | + count = 0 |
| 125 | + for chunk in stream(): |
| 126 | + if first: |
| 127 | + header = chunk |
| 128 | + first = False |
| 129 | + data.extend(chunk) |
| 130 | + |
| 131 | + assert len(header) == 44 |
| 132 | + assert len(data) == len(self.wav_data) |
| 133 | + |
| 134 | + def test__is_silent_empty(self): |
| 135 | + assert _is_silent([]) |
| 136 | + |
| 137 | + def test__is_silent_quiet(self): |
| 138 | + assert _is_silent([random.randint(0, SILENT_THRESH - 1) for i in range(1024)]) |
| 139 | + |
| 140 | + def test__is_silent_mixed(self): |
| 141 | + assert not _is_silent([random.randint(0, 2*SILENT_THRESH) for i in range(1024)]) |
| 142 | + |
| 143 | + def test__is_silent_loud(self): |
| 144 | + assert not _is_silent([random.randint(SILENT_THRESH//2, 3*SILENT_THRESH) for i in range(1024)]) |
| 145 | + |
| 146 | + def test__pyaudio_record_silence(self): |
| 147 | + # set record mode to silent, and start loud, so that we don't infinitly |
| 148 | + # remove silent data |
| 149 | + MockStream.read_mode = "silent" |
| 150 | + MockStream.start_loud = True |
| 151 | + with mock.patch('pyaudio.PyAudio', new=MockPyAudio): |
| 152 | + # should record up to 1 second of silence |
| 153 | + data = [] |
| 154 | + for chunk in _pyaudio_record(0, 1.0): |
| 155 | + data.extend(chunk) |
| 156 | + assert len(data) == 16384 |
79 | 157 |
|
| 158 | + def test__pyaudio_record_mixed(self): |
| 159 | + # set record mode to random noise |
| 160 | + MockStream.read_mode = "random" |
| 161 | + with mock.patch('pyaudio.PyAudio', new=MockPyAudio): |
| 162 | + # should record up to 1 second of silence |
| 163 | + data = [] |
| 164 | + for chunk in _pyaudio_record(1.0, 0): |
| 165 | + data.extend(chunk) |
| 166 | + assert len(data) >= 16384 |
| 167 | + |
| 168 | + def test__pyaudio_record_loud(self): |
| 169 | + # set record mode to loud |
| 170 | + MockStream.read_mode = "loud" |
| 171 | + with mock.patch('pyaudio.PyAudio', new=MockPyAudio): |
| 172 | + # should record up to 1 second of silence |
| 173 | + data = [] |
| 174 | + for chunk in _pyaudio_record(1.0, 0): |
| 175 | + data.extend(chunk) |
| 176 | + assert len(data) == 16384 |
| 177 | + |
| 178 | + |
0 commit comments