-
Notifications
You must be signed in to change notification settings - Fork 1
/
recognize.py
112 lines (86 loc) · 3.14 KB
/
recognize.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import dejavu.fingerprint as fingerprint
import dejavu.decoder as decoder
import numpy as np
import pyaudio
import time
class BaseRecognizer(object):
    """Shared base for recognizers that query a ``dejavu`` object.

    The instance keeps the ``dejavu`` object it was given and a sample rate
    ``Fs`` (initialised from ``fingerprint.DEFAULT_FS``) that is forwarded
    to ``dejavu.find_matches``.
    """

    def __init__(self, dejavu):
        """Store ``dejavu`` and set ``Fs`` to the fingerprint default."""
        self.dejavu = dejavu
        self.Fs = fingerprint.DEFAULT_FS

    def _recognize(self, *data):
        """Collect matches for every positional argument and align them.

        Each item of ``data`` is passed to ``dejavu.find_matches`` together
        with ``Fs=self.Fs``; all results are gathered, in order, into one
        list that is handed to ``dejavu.align_matches``.

        Returns:
            Whatever ``dejavu.align_matches`` returns for the combined list.
        """
        combined = [
            hit
            for samples in data
            for hit in self.dejavu.find_matches(samples, Fs=self.Fs)
        ]
        return self.dejavu.align_matches(combined)

    def recognize(self):
        """Do nothing; subclasses provide the real implementation."""
        return None
class FileRecognizer(BaseRecognizer):
    """Recognizer that matches audio read from a file via ``decoder.read``."""

    def __init__(self, dejavu):
        """Initialise the base recognizer with ``dejavu``."""
        super(FileRecognizer, self).__init__(dejavu)

    def recognize_file(self, filename):
        """Decode ``filename``, look for a match and time the lookup.

        ``decoder.read`` is called with ``filename`` and
        ``self.dejavu.limit`` and yields three values: the frames to match,
        a sample rate (stored on ``self.Fs``) and a file hash, which is not
        used here.

        Returns:
            The result of ``_recognize``. When that result is truthy, the
            elapsed matching time in seconds is stored under its
            ``'match_time'`` key before it is returned.
        """
        channels, sample_rate, _unused_hash = decoder.read(
            filename, self.dejavu.limit
        )
        self.Fs = sample_rate

        started = time.time()
        result = self._recognize(*channels)
        elapsed = time.time() - started

        if not result:
            return result
        result['match_time'] = elapsed
        return result

    def recognize(self, filename):
        """Recognize the audio in ``filename``; see ``recognize_file``."""
        return self.recognize_file(filename)
class MicrophoneRecognizer(BaseRecognizer):
    """Recognizer that records from a PyAudio input stream and matches it.

    Typical manual use is ``start_recording()``, repeated
    ``process_recording()`` calls, ``stop_recording()`` and finally
    ``recognize_recording()``; ``recognize()`` bundles those steps.
    """

    # Defaults used by ``__init__`` and as ``start_recording`` defaults.
    default_chunksize = 8192
    default_format = pyaudio.paInt16
    default_channels = 2
    default_samplerate = 44100

    def __init__(self, dejavu):
        """Create the PyAudio handle and reset all recording state."""
        super(MicrophoneRecognizer, self).__init__(dejavu)
        self.audio = pyaudio.PyAudio()
        self.stream = None
        # One list of samples per channel; filled by process_recording().
        self.data = []
        self.channels = MicrophoneRecognizer.default_channels
        self.chunksize = MicrophoneRecognizer.default_chunksize
        self.samplerate = MicrophoneRecognizer.default_samplerate
        self.recorded = False

    def start_recording(self, channels=default_channels,
                        samplerate=default_samplerate,
                        chunksize=default_chunksize):
        """Open a fresh input stream and clear previously recorded data.

        Any stream that is still open is stopped and closed first.

        Args:
            channels: Number of input channels to open.
            samplerate: Rate passed to the audio stream.
            chunksize: Frames per buffer, also the read size used by
                ``process_recording``.
        """
        # NOTE(review): self.Fs (used by _recognize) is not updated to
        # ``samplerate`` here -- confirm whether non-default rates should
        # also change the fingerprinting rate.
        self.chunksize = chunksize
        self.channels = channels
        self.recorded = False
        self.samplerate = samplerate

        if self.stream:
            self.stream.stop_stream()
            self.stream.close()

        self.stream = self.audio.open(
            format=self.default_format,
            channels=channels,
            rate=samplerate,
            input=True,
            frames_per_buffer=chunksize,
        )

        self.data = [[] for _ in range(channels)]

    def process_recording(self):
        """Read one chunk from the stream and append it per channel."""
        data = self.stream.read(self.chunksize)
        # np.frombuffer replaces the deprecated binary mode of np.fromstring.
        nums = np.frombuffer(data, np.int16)
        # Samples are interleaved: every ``channels``-th value starting at
        # offset ``c`` belongs to channel ``c``.
        for c in range(self.channels):
            self.data[c].extend(nums[c::self.channels])

    def stop_recording(self):
        """Stop and close the stream and mark the recording as complete."""
        self.stream.stop_stream()
        self.stream.close()
        self.stream = None
        self.recorded = True

    def recognize_recording(self):
        """Match the recorded channels.

        Raises:
            NoRecordingError: If ``stop_recording`` has not completed a
                recording since the last ``start_recording``.
        """
        if not self.recorded:
            raise NoRecordingError("Recording was not complete/begun")
        return self._recognize(*self.data)

    def get_recorded_time(self):
        """Return the length of the recorded audio in seconds.

        Returns ``0.0`` when nothing has been recorded yet.
        """
        if not self.data:
            return 0.0
        # The original read the non-existent ``self.rate``; the sample rate
        # is stored as ``self.samplerate``. float() keeps the result
        # fractional on Python 2 as well.
        return len(self.data[0]) / float(self.samplerate)

    def recognize(self, seconds=10):
        """Record for roughly ``seconds`` seconds, then match the recording."""
        self.start_recording()
        for _ in range(0, int(self.samplerate / self.chunksize
                              * seconds)):
            self.process_recording()
        self.stop_recording()
        return self.recognize_recording()
class NoRecordingError(Exception):
    """Raised when recognition is requested before a recording is complete."""