-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* feat: add TASK option * refactor: print_warning and print_question methods * feat: add WhisperService class for audio transcription * refactor: input prompt formatting in audio_utils.py * feat: add AudioDeviceManager class and test cases * feat: add AudioRecorder class and its unit tests * feat: add AudioProcessor and unit tests for audio processing functionality * feat: add AudioService class and its unit tests * refactor: remove audio_transcriber test file * refactor: audio transcriber to use services * docs: add TASK configuration option. Expanded text.
- Loading branch information
1 parent
a067b05
commit 6202f21
Showing
19 changed files
with
821 additions
and
559 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
from src.audio_utils import ( | ||
choose_audio_device, | ||
choose_sample_rate, | ||
find_supported_sample_rates, | ||
) | ||
from src.cli_interface import CliInterface | ||
|
||
|
||
class AudioDeviceManager:
    """
    Resolve the input device index and sample rate used for recording.

    The selection runs once, at construction time, and the result is exposed
    through the ``device_index`` and ``chosen_sample_rate`` attributes.
    """

    def __init__(self, pyaudio_instance):
        """
        Store the PyAudio instance and run the device / sample-rate selection.
        :param pyaudio_instance: The PyAudio instance handed to the selection helpers.
        """
        self.pyaudio_instance = pyaudio_instance
        self.device_index, self.chosen_sample_rate = self.setup_audio_device()

    def setup_audio_device(self):
        """
        Pick an audio device, look up its supported sample rates and pick one of them.
        If no supported sample rate is found, an error is printed, the PyAudio
        instance is terminated and the application exits with status 1.
        :return: A tuple containing the device index and the chosen sample rate.
        :raises SystemExit: If the device reports no supported sample rates.
        """
        device_index = choose_audio_device(self.pyaudio_instance)
        supported_rates = find_supported_sample_rates(self.pyaudio_instance, device_index)
        if not supported_rates:
            CliInterface.print_error("No supported sample rates found for the device.")
            self.pyaudio_instance.terminate()
            # Raise SystemExit directly instead of calling the ``exit`` builtin:
            # ``exit`` is installed by the ``site`` module and is missing when the
            # interpreter runs with -S or from a frozen build.
            raise SystemExit(1)
        chosen_sample_rate = choose_sample_rate(supported_rates)
        return device_index, chosen_sample_rate
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
import audioop | ||
import math | ||
import os | ||
import tempfile | ||
import time | ||
import wave | ||
from queue import Queue | ||
from threading import Thread | ||
|
||
import pyaudio | ||
|
||
from src.cli_interface import CliInterface | ||
from src.config import RECORDING_DURATION | ||
from src.whisper_service import WhisperService | ||
|
||
|
||
class AudioProcessor:
    """
    Buffer incoming audio, cut it into chunks and feed them to the transcription service.

    Audio arrives through :meth:`audio_callback`. Once the buffer holds
    ``desired_length`` bytes it is written to a temporary WAV file and queued.
    A worker thread, started at construction time, takes queued chunks and
    passes them to ``WhisperService.transcribe_audio_chunk``.
    """

    def __init__(self, chosen_sample_rate):
        """
        Set up the queue, the audio buffer and the transcription service, then start the worker thread.
        :param chosen_sample_rate: The sample rate of the recorded audio.
        """
        self.processing_queue = Queue()
        self.audio_buffer = bytes()
        # Bytes per chunk: the factor 2 is the sample width of the 16-bit mono
        # audio written by process_and_queue_chunk.
        # NOTE(review): RECORDING_DURATION is presumably in seconds - confirm in src.config.
        self.desired_length = chosen_sample_rate * 2 * RECORDING_DURATION
        self.chosen_sample_rate = chosen_sample_rate
        self.whisper_transcription = WhisperService()
        self.is_processing = True
        # Reuse the public starter instead of duplicating the thread set-up here.
        self.start_processing_thread()

    def start_processing_thread(self):
        """
        Start a new thread for processing audio chunks.
        """
        self.processing_thread = Thread(target=self.process_audio_chunks_queue)
        self.processing_thread.start()

    def process_audio_chunks_queue(self):
        """
        Continuously process audio chunks from the queue until the processing is stopped.
        The loop keeps running after ``is_processing`` is cleared until the queue is drained.
        A chunk whose transcription raises is reported and skipped, so the worker stays alive.
        """
        while self.is_processing or not self.processing_queue.empty():
            if self.processing_queue.empty():
                time.sleep(0.1)  # Sleep briefly to avoid busy waiting
                continue

            audio_chunk, temp_file_path, volume_db = self.processing_queue.get()
            try:
                self.whisper_transcription.transcribe_audio_chunk(temp_file_path, volume_db)
            except Exception as error:
                # Thread boundary: if this thread died the queue would never drain
                # and callers polling is_processing_completed() would wait forever.
                CliInterface.print_error("Failed to process audio chunk: {}".format(error))
                continue
            CliInterface.print_success(
                "Processed audio chunk with volume {:.2f} dB and size {} bytes.".format(volume_db, len(audio_chunk))
            )

    def stop_processing(self):
        """
        Stop the processing of audio chunks and wait for the processing thread to finish.
        Afterwards the transcription service is asked to output its results.
        """
        self.is_processing = False
        self.processing_thread.join()
        self.whisper_transcription.output_transcription_results()

    def is_processing_completed(self):
        """
        Check if the processing of audio chunks is completed.
        :return: True if the processing queue is empty and there are no active tasks in the transcription service.
        """
        return self.processing_queue.empty() and self.whisper_transcription.active_tasks == 0

    def process_audio_chunk_volume(self, in_data):
        """
        Calculate the volume of an audio chunk in decibels.
        :param in_data: The audio data.
        :return: The volume of the audio data in decibels, or negative infinity when the RMS is zero.
        """
        # NOTE(review): audioop was removed from the standard library in Python 3.13
        # (PEP 594); this call needs a replacement before moving to that version.
        rms = audioop.rms(in_data, 2)  # Assuming 16-bit audio
        return 20 * math.log10(rms) if rms > 0 else -float("inf")

    def audio_callback(self, in_data, _frame_count, _time_info, _status):
        """
        Callback function for the audio stream.
        Adds the incoming audio data to the buffer and processes it when it reaches the desired length.
        :param in_data: The incoming audio data.
        :param _frame_count: The number of frames in the audio data (unused).
        :param _time_info: Information about the timing of the audio data (unused).
        :param _status: The status of the audio stream (unused).
        :return: A tuple containing None and pyaudio.paContinue, indicating that the stream should continue.
        """
        self.audio_buffer += in_data
        if len(self.audio_buffer) >= self.desired_length:
            self.process_and_queue_chunk()
        return (None, pyaudio.paContinue)

    def process_and_queue_chunk(self):
        """
        Process an audio chunk from the buffer, calculate its volume, and add it to the queue for transcription.
        The buffer is written to a temporary WAV file and then cleared.
        """
        # NOTE(review): the temporary file is never deleted in this class;
        # presumably WhisperService removes it after transcription - verify.
        temp_file, temp_file_path = tempfile.mkstemp(suffix=".wav")
        os.close(temp_file)
        with wave.open(temp_file_path, "wb") as wave_file:
            wave_file.setnchannels(1)
            wave_file.setsampwidth(pyaudio.get_sample_size(pyaudio.paInt16))
            wave_file.setframerate(self.chosen_sample_rate)
            wave_file.writeframes(self.audio_buffer)

        # Queue the chunk only after the WAV file has been closed, so the worker
        # thread never opens a file that is still being written.
        volume_db = self.process_audio_chunk_volume(self.audio_buffer)
        self.processing_queue.put((self.audio_buffer, temp_file_path, volume_db))

        self.audio_buffer = bytes()  # Clear the buffer for the next chunk

    def finalize_recording(self):
        """
        Process the remaining audio data in the buffer when the recording is finalized.
        Does nothing when the buffer is empty.
        """
        if len(self.audio_buffer) > 0:
            self.process_and_queue_chunk()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
import time | ||
|
||
import pyaudio | ||
|
||
from src.cli_interface import CliInterface, start_pause_message | ||
from src.config import FRAMES_PER_BUFFER | ||
|
||
|
||
class AudioRecorder:
    """
    Open and close the PyAudio input stream and track whether recording is active.

    The device index and sample rate come from the audio device manager;
    recorded data is delivered to the audio processor's ``audio_callback``.
    """

    def __init__(self, audio_device_manager, pyaudio_instance, audio_processor):
        """
        Store the collaborators; no stream is opened until recording starts.
        :param audio_device_manager: Provides ``device_index`` and ``chosen_sample_rate``.
        :param pyaudio_instance: The PyAudio instance used to open the input stream.
        :param audio_processor: Receives the audio data and reports processing completion.
        """
        self.audio_device_manager = audio_device_manager
        self.pyaudio_instance = pyaudio_instance
        self.audio_processor = audio_processor
        self.recording = False
        self.stream = None

    def toggle_recording(self):
        """
        Toggle the recording state.
        If the application is currently recording, it will be paused.
        If the application is currently paused, it will start recording.
        """
        if self.recording:
            self.pause_recording()
            print(CliInterface.colorize("\r\n\u23f8", bold=True) + " Recording paused. " + start_pause_message)
        else:
            self.start_recording()
            print(CliInterface.colorize("\r\n\u25cf", red=True) + " Recording started. " + start_pause_message)

    def start_recording(self):
        """
        Start recording audio. Opens a new stream with the chosen audio device and sample rate.
        The stream's callback function is set to the audio processor's audio callback function.
        Does nothing when a recording is already in progress.
        """
        if self.recording:
            return

        CliInterface.print_info("Initializing recording...")
        self.stream = self.pyaudio_instance.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=self.audio_device_manager.chosen_sample_rate,
            input=True,
            input_device_index=self.audio_device_manager.device_index,
            frames_per_buffer=FRAMES_PER_BUFFER,
            stream_callback=self.audio_processor.audio_callback,
        )
        self.stream.start_stream()
        self.recording = True

    def pause_recording(self, stop=False):
        """
        Pause recording audio. Stop the audio stream and wait for the audio processor to finish processing.
        If stop is True, it indicates that the recording is being stopped, not just paused.
        :param stop: Whether the recording is being stopped.
        """
        CliInterface.print_info(
            "Stopping recording... wait for processing to complete"
            if stop
            else "Pausing recording... wait for processing to complete"
        )

        if self.recording:
            # Reset the state first so a failure while stopping cannot leave the
            # recorder pointing at a dead stream.
            stream, self.stream = self.stream, None
            self.recording = False
            if stream:
                try:
                    stream.stop_stream()
                finally:
                    # Always release the stream, even when stopping it fails.
                    stream.close()
            # Flush whatever is left in the processor's buffer as a final chunk.
            self.audio_processor.finalize_recording()

        while not self.audio_processor.is_processing_completed():
            time.sleep(0.1)  # Adjust sleep time as necessary
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
import os | ||
import sys | ||
import termios | ||
import tty | ||
|
||
import pyaudio | ||
from pynput import keyboard | ||
|
||
from src.audio_device_manager import AudioDeviceManager | ||
from src.audio_processor import AudioProcessor | ||
from src.audio_recorder import AudioRecorder | ||
from src.cli_interface import CliInterface, start_pause_message | ||
|
||
|
||
class AudioService:
    """
    Wire the audio device manager, processor and recorder together and drive them from the keyboard.
    """

    def __init__(self):
        """
        Print the welcome message, create the PyAudio instance and build the device manager,
        the audio processor and the audio recorder, then print the start/pause hint.
        """
        CliInterface.print_welcome()
        self.pyaudio_instance = pyaudio.PyAudio()
        self.audio_device_manager = AudioDeviceManager(self.pyaudio_instance)
        self.audio_processor = AudioProcessor(self.audio_device_manager.chosen_sample_rate)
        self.audio_recorder = AudioRecorder(self.audio_device_manager, self.pyaudio_instance, self.audio_processor)
        CliInterface.print_info(start_pause_message)

    def on_key_press(self, key):
        """
        Handle a key press event. If the space bar is pressed, toggle the recording state.
        If the escape key is pressed, stop recording and processing, terminate the PyAudio instance, and exit the application.
        :param key: The key that was pressed.
        :return: False after the escape key was handled, None for every other key.
        """
        if key == keyboard.Key.space:
            self.audio_recorder.toggle_recording()
        elif key == keyboard.Key.esc:
            if self.audio_recorder.recording:
                self.audio_recorder.pause_recording(stop=True)
            self.audio_processor.stop_processing()
            self.audio_recorder.pyaudio_instance.terminate()
            CliInterface.print_exit()
            # NOTE(review): returning False is presumably what stops the pynput
            # listener - confirm against the pynput documentation.
            return False
        return None

    def run(self):
        """
        Start the main loop of the application. Listens for key press events and handles them with the on_key_press function.
        When standard input is a terminal, it is switched to cbreak mode while listening
        and its previous settings are restored afterwards.
        """
        stdin = sys.stdin
        # Ask the stream itself whether it is a terminal: a replaced or captured
        # stdin may have no file descriptor, in which case fileno() would raise.
        if stdin is None or not stdin.isatty():
            self._listen_for_keys()
            return

        stdin_fd = stdin.fileno()
        # Save the current terminal settings
        old_settings = termios.tcgetattr(stdin_fd)
        try:
            # Disable echoing
            tty.setcbreak(stdin_fd)
            self._listen_for_keys()
        finally:
            # Restore the old terminal settings
            termios.tcsetattr(stdin_fd, termios.TCSADRAIN, old_settings)

    def _listen_for_keys(self):
        """
        Run a keyboard listener that dispatches to on_key_press and block until it finishes.
        """
        with keyboard.Listener(on_press=self.on_key_press) as listener:  # type: ignore
            listener.join()
Oops, something went wrong.