-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathttt.py
executable file
·414 lines (347 loc) · 13.6 KB
/
ttt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
#!/usr/bin/env python
import json
import os
import subprocess
import time
from datetime import datetime
from pathlib import Path
import apprise
import requests
from better_profanity import profanity
# Raise our nice value by 5 (i.e. lower our scheduling priority a little).
# This work matters, but it should not get in the way of the rest of the
# system.
os.nice(5)

# Pick the transcription backend from the environment:
#   1. TTT_DEEPGRAM_KEY set   -> Deepgram
#   2. TTT_WHISPERCPP_URL set -> whisper.cpp
#   3. otherwise              -> local transformers model
# torch and transformers are imported only in the last case so that no GPU
# RAM is spent when another service does the transcription.
if os.environ.get("TTT_DEEPGRAM_KEY"):
    whisper_variant = "deepgram"
elif os.environ.get("TTT_WHISPERCPP_URL"):
    whisper_variant = "whispercpp"
else:
    whisper_variant = "transformers"
    import torch
    from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline

    # Build the model and pipeline once, at import time, so the transcribe
    # function only has to call the PIPE constant.
    if torch.cuda.is_available():
        device = "cuda:0"
        torch_dtype = torch.float16
    else:
        device = "cpu"
        torch_dtype = torch.float32

    model_id = os.environ.get(
        "TTT_TRANSFORMERS_MODEL_ID", "openai/whisper-large-v3-turbo"
    )
    print(f"We are using {torch_dtype} on {device} with {model_id}")

    model = AutoModelForSpeechSeq2Seq.from_pretrained(
        model_id,
        torch_dtype=torch_dtype,
        low_cpu_mem_usage=True,
        use_safetensors=True,
    )
    model.to(device)

    processor = AutoProcessor.from_pretrained(model_id)

    PIPE = pipeline(
        "automatic-speech-recognition",
        model=model,
        tokenizer=processor.tokenizer,
        feature_extractor=processor.feature_extractor,
        torch_dtype=torch_dtype,
        device=device,
    )

# "stroke" must not be censored: when an ambulance is being dispatched it is
# a medical term, not profanity, so it is whitelisted in the filter.
profanity.load_censor_words(whitelist_words=["stroke"])
def transcribe_transformers(calljson, audiofile):
    """Transcribe a call recording with the local transformers pipeline.

    Args:
        calljson (dict): Call metadata; it is updated in place.
        audiofile (str | Path): Location of the recording to transcribe.

    Returns:
        dict: The same ``calljson`` object, with its ``"text"`` key set to
        the ``"text"`` field of the pipeline result.
    """
    # The pipeline is given a plain string path rather than a Path object.
    # NOTE(review): return_timestamps is presumably enabled so recordings
    # longer than a single Whisper window can be processed -- confirm.
    generation_options = {"return_timestamps": True}
    transcription = PIPE(str(audiofile), generate_kwargs=generation_options)

    calljson["text"] = transcription["text"]
    return calljson
def send_notifications(calljson, audiofile, destinations):
    """Send the transcript of one call to its configured Apprise destination.

    The transcript is run through the profanity filter, a title is built from
    the talkgroup description and the call start time, and the notification
    is delivered to the URL registered for the call's system / talkgroup.

    Args:
        calljson (dict): Call metadata. The keys ``text``,
            ``talkgroup_description``, ``start_time``, ``short_name`` and
            ``talkgroup`` are read.
        audiofile (str | Path): Path to the call recording; only used when
            the ``TTT_ATTACH_AUDIO`` environment variable is enabled.
        destinations (dict): Nested mapping
            ``{short_name: {talkgroup: apprise_url}}``.

    Returns:
        None. Delivery is best effort: a missing destination or a failure
        while sending is reported on stdout and does not raise.
    """
    # Run the AI-generated text through the profanity filter.
    body = profanity.censor(calljson["text"])
    title = (
        calljson["talkgroup_description"]
        + " @ "
        + str(datetime.fromtimestamp(calljson["start_time"]))
    )
    short_name = str(calljson["short_name"])
    talkgroup = str(calljson["talkgroup"])

    # A talkgroup without a configured destination is an expected situation;
    # report it precisely and move on.
    try:
        notify_url = destinations[short_name][talkgroup]
    except KeyError:
        print(
            "Notification generation failed. No destination for "
            f"{short_name}/{talkgroup} in destinations.csv"
        )
        return

    # TTT_ATTACH_AUDIO enables the audio attachment when set to a truthy
    # string ("true", "1" or "t", case-insensitive).
    attach_audio = os.environ.get("TTT_ATTACH_AUDIO", "False").lower() in (
        "true",
        "1",
        "t",
    )

    # Keep one bad notification from stopping the caller, but catch only
    # Exception so Ctrl-C / SystemExit still work, and report the real error
    # instead of blaming the destinations file.
    try:
        apobj = apprise.Apprise()
        apobj.add(notify_url)
        if attach_audio:
            audio_notification(audiofile, apobj, body, title)
        else:
            apobj.notify(
                body=body,
                title=title,
            )
    except Exception as e:
        print(f"Notification generation failed: {e!r}")
def audio_notification(audiofile, apobj, body, title):
    """Send a notification with the call audio attached when possible.

    The recording is re-encoded with ffmpeg to a mono 64k AAC ``.m4a`` file
    stored next to the original (same name, different suffix) and attached
    to the notification. If ffmpeg fails, times out after 30 seconds, or
    cannot be started at all, the notification is sent as text only.

    Args:
        audiofile (str | Path): Path to the source audio file.
        apobj: Apprise object used to deliver the notification.
        body (str): Body of the notification.
        title (str): Title of the notification.

    Returns:
        None
    """
    aacfile = Path(audiofile).with_suffix(".m4a")
    ffmpeg_cmd = [
        "ffmpeg",
        "-y",
        "-hide_banner",
        "-loglevel",
        "error",
        "-i",
        str(audiofile),
        "-ac",
        "1",
        "-af",
        "highpass=f=200,lowpass=f=3000,anlmdn,loudnorm=i=-14",
        "-b:a",
        "64k",
        "-c:a",
        "aac",
        str(aacfile),
    ]

    try:
        # Step 1: try to encode. Any encoding problem downgrades the
        # notification to text only instead of losing it.
        try:
            subprocess.run(ffmpeg_cmd, check=True, timeout=30)
        except subprocess.CalledProcessError:
            print(
                f"ffmpeg file conversion error with {aacfile}. We will skip audio on this file and post text only."
            )
            encoded = False
        except subprocess.TimeoutExpired:
            print(
                f"ffmpeg file conversion error exceeded 30 seconds on {aacfile}. We will skip audio on this file and post text only."
            )
            encoded = False
        except OSError as e:
            # ffmpeg is missing or not executable.
            print(
                f"Could not run ffmpeg ({e}). We will skip audio on this file and post text only."
            )
            encoded = False
        else:
            encoded = True

        # Step 2: deliver, with the attachment only if encoding succeeded.
        if encoded:
            apobj.notify(
                body=body,
                title=title,
                attach=str(aacfile),
            )
        else:
            apobj.notify(
                body=body,
                title=title,
            )
    finally:
        # Step 3: always remove the temporary m4a (even if notify raised).
        # The source audio and json are unlinked later by the caller.
        try:
            aacfile.unlink()
        except FileNotFoundError:
            print(f"File {aacfile} not found.")
        except PermissionError:
            print(f"No permission to delete {aacfile}.")
def import_notification_destinations():
    """Import notification destinations from ``destinations.csv``.

    The file is read from the current working directory. Its first row is
    treated as a header and skipped. In every following row the first column
    is the outer key, the second column the sub-key and the third column the
    value. Columns beyond the third are ignored.

    Blank lines and rows with fewer than three columns are skipped (a
    warning is printed for non-empty malformed rows) instead of aborting the
    import.

    Returns:
        dict: Nested mapping ``{column1: {column2: column3}}``. When the same
        (column1, column2) pair appears more than once, the last row wins.

    Raises:
        OSError: If ``destinations.csv`` cannot be opened.
    """
    import csv

    destinations = {}
    with open("destinations.csv", newline="") as inp:
        reader = csv.reader(inp)
        next(reader, None)  # skip the headers
        for row in reader:
            if len(row) < 3:
                # csv.reader yields [] for blank lines; anything else that is
                # too short is a malformed entry worth mentioning.
                if any(cell.strip() for cell in row):
                    print(f"Skipping malformed row in destinations.csv: {row}")
                continue
            short_name, talkgroup, url = row[:3]
            destinations.setdefault(short_name, {})[talkgroup] = url
    return destinations
def main():
    """Poll the transcribe queue forever, transcribing and notifying per call.

    Notification destinations are loaded once at start-up. The loop then
    looks for JSON files under ``media/transcribe``, transcribes the ``.wav``
    file that shares each JSON file's name using the backend chosen by
    ``whisper_variant``, sends a notification with the transcript, and
    finally deletes both the JSON and the audio file.

    Returns:
        None. The loop has no exit condition.

    Note:
        Exceptions raised while reading the JSON or while transcribing are
        not handled here and propagate to the caller.
    """
    # Import the apprise destinations to send calls
    destinations = import_notification_destinations()

    # Backend choice: deepgram if TTT_DEEPGRAM_KEY is set, else whisper.cpp
    # if TTT_WHISPERCPP_URL is set, else transformers.
    if whisper_variant == "deepgram":
        transcribe = transcribe_deepgram
    elif whisper_variant == "whispercpp":
        transcribe = transcribe_whispercpp
    else:
        transcribe = transcribe_transformers

    queue_dir = Path("media/transcribe")

    while True:
        # Every json (any letter case) below the queue directory, ordered by
        # creation time.
        pending = sorted(
            queue_dir.rglob("*.[jJ][sS][oO][nN]"), key=os.path.getctime
        )

        # Nothing queued: wait 5 seconds and poll again.
        if not pending:
            print("Empty queue. Sleep 5 seconds and check again.")
            time.sleep(5)
            continue

        # We seem to race the filesystem when a file is first detected, so
        # give it 3 seconds to settle before working through the list.
        time.sleep(3)

        for jsonfile in pending:
            # The recording sits next to the json with a .wav suffix.
            audiofile = jsonfile.with_suffix(".wav")
            print(f"Processing: {audiofile}")

            calljson = json.loads(jsonfile.read_text())
            calljson = transcribe(calljson, audiofile)

            # Whisper tends to emit just "you" for a recording without
            # speech. Such calls are not worth a notification; they are only
            # cleaned up below.
            if calljson["text"].strip() != "you":
                send_notifications(calljson, audiofile, destinations)

            # Remove the processed files from the transcribe directory.
            for finished in (jsonfile, audiofile):
                try:
                    finished.unlink()
                except FileNotFoundError:
                    print(f"File {finished} not found.")
                except PermissionError:
                    print(f"No permission to delete {finished}.")
def transcribe_whispercpp(calljson, audiofile):
    """Transcribe an audio file by posting it to a whisper.cpp server.

    The recording is sent as multipart/form-data to the ``/inference``
    endpoint of the server named by ``TTT_WHISPERCPP_URL`` (default
    ``http://whisper:8080``). The JSON reply is merged on top of the call
    metadata.

    Args:
        calljson (dict): Call metadata.
        audiofile (Path): Path to the audio file; its bytes are uploaded.

    Returns:
        dict: A new dictionary containing ``calljson`` overlaid with every
        key of the server's JSON response.

    Raises:
        RuntimeError: If the request fails or returns an error status.
    """
    base_url = os.environ.get("TTT_WHISPERCPP_URL", "http://whisper:8080")

    # Multipart form fields; each is sent without a filename.
    form_fields = {
        "file": (None, audiofile.read_bytes()),
        "temperature": (None, "0.0"),
        "temperature_inc": (None, "0.2"),
        "response_format": (None, "json"),
    }
    endpoint = f"{base_url}/inference"

    try:
        response = requests.post(endpoint, files=form_fields, timeout=10)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"A request error occurred while trying to post to whisper.cpp: {e}")
        raise RuntimeError(
            "A request error occurred while trying to post to whisper.cpp."
        ) from e

    # Overlay the reply onto the call data. NOTE(review): the caller reads
    # calljson["text"], so the reply is presumably expected to carry a "text"
    # key -- confirm against the whisper.cpp server response.
    return dict(calljson, **response.json())
def transcribe_deepgram(calljson, audiofile):
    """Transcribe an audio file with the Deepgram API.

    The raw bytes of the recording are posted to Deepgram's ``/v1/listen``
    endpoint, authenticated with the ``TTT_DEEPGRAM_KEY`` environment
    variable. The transcript of the first alternative of the first channel
    is stored under ``"text"`` in the call metadata.

    Args:
        calljson (dict): Call metadata; it is updated in place.
        audiofile (Path): Path to the audio file to upload.

    Returns:
        dict: The same ``calljson`` object with ``"text"`` set.

    Raises:
        RuntimeError: If the request fails or returns an error status.
    """
    api_key = os.environ.get("TTT_DEEPGRAM_KEY")
    request_headers = {
        "Authorization": f"Token {api_key}",
        "Content-Type": "audio/wav",
    }
    query = {
        "model": "nova-2-phonecall",
        "language": "en-US",
        "smart_format": "true",
    }
    audio_bytes = audiofile.read_bytes()

    try:
        response = requests.post(
            "https://api.deepgram.com/v1/listen",
            params=query,
            headers=request_headers,
            data=audio_bytes,
            timeout=10,
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"A request error occurred while trying to post to Deepgram: {e}")
        raise RuntimeError(
            "A request error occurred while trying to post to Deepgram."
        ) from e

    # Pull the transcript out of Deepgram's reply and store it as "text",
    # the key the whisper-based backends use.
    payload = response.json()
    best_alternative = payload["results"]["channels"][0]["alternatives"][0]
    calljson["text"] = best_alternative["transcript"]
    return calljson
# Script entry point: start the polling loop only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()