Skip to content

Commit

Permalink
Recorder, camera and demo fixes. Also, some new audio files
Browse files Browse the repository at this point in the history
  • Loading branch information
yorevs committed Dec 18, 2024
1 parent d661a58 commit 80612e5
Show file tree
Hide file tree
Showing 16 changed files with 229 additions and 160 deletions.
52 changes: 29 additions & 23 deletions src/demo/components/camera_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

import os

min_distance = 1.6

# fmt: off
MENU = dedent(f"""\
> Camera Demo options
Expand All @@ -29,45 +31,49 @@
init_context("camera-demo")
photo: ImageMetadata
while opt := line_input(MENU, placeholder="Select an option"):
cursor.write()
cursor.writeln()
if opt == "1" and (name := strip_escapes(line_input("Photo name: "))):
pic_file, pic_data = camera.capture(name)
face_files, face_datas = camera.detect_faces(pic_data, name)
cursor.write()
cursor.write(f"Photo taken: {pic_file} Detected faces: {len(face_files)}")
cursor.writeln()
cursor.writeln(f"Photo taken: {pic_file} Detected faces: {len(face_files)}")
if opt == "2" and not (name := line_input("Press [Enter] key when ready")):
if photo := camera.identify():
cursor.write()
cursor.write(f"Identified person: {photo.caption} URI: {photo.uri} DIST: {photo.distance}")
cursor.writeln()
cursor.writeln(f"Identified person: {photo.caption} URI: {photo.uri} DIST: {photo.distance}")
open_command(photo.uri)
else:
cursor.write()
cursor.write("No identification was possible!")
cursor.writeln()
cursor.writeln("No identification was possible!")
while opt == "3" and (query := line_input("Query photo: ", "Type in the description (<empty> to return)")):
cursor.write()
cursor.write(f"Showing result for: {query}")
cursor.writeln()
cursor.writeln(f"Showing results for: {query}")
results: list[ImageMetadata] = store.query_image(query)
for photo in results:
cursor.write()
cursor.write(f"Showing photo: {photo.caption} URI: {photo.uri} DIST: {photo.distance}")
# Keep only the results whose distance does not exceed min_distance
for photo in list(filter(lambda r: r.distance <= min_distance, results)):
cursor.writeln()
cursor.writeln(f"Showing photo: {photo.caption} URI: {photo.uri} DIST: {photo.distance}")
open_command(photo.uri)
cursor.writeln()
while opt == "4" and (query := line_input("Query face: ", "Type in the description (<empty> to return)")):
cursor.write()
cursor.write(f"Showing result for: {query}")
cursor.writeln()
cursor.writeln(f"Showing results for: {query}")
results: list[ImageMetadata] = store.query_face(query)
for photo in results:
cursor.write()
cursor.write(f"Showing face: {photo.caption} URI: {photo.uri} DIST:", photo.distance)
# Keep only the results whose distance does not exceed min_distance
for photo in list(filter(lambda r: r.distance <= min_distance, results)):
cursor.writeln()
cursor.writeln(f"Showing face: {photo.caption} URI: {photo.uri} DIST: {photo.distance}")
open_command(photo.uri)
cursor.writeln()
if opt == "5":
count: int = store.sync_store(re_caption=False)
cursor.write()
cursor.write(f"Synchronized files: {count}")
cursor.writeln()
cursor.writeln(f"Synchronized files: {count}")
if opt == "6" and (query := line_input("Path to import: ", "File, folder path or glob")):
images, faces = camera.import_images(query.strip(), True)
cursor.write()
cursor.write(f"Imported images: {images} Detected faces: {faces}")
cursor.writeln()
cursor.writeln(f"Imported images: {images} Detected faces: {faces}")
if opt == "7":
print(f"```json\n{os.linesep.join(store.enlist())}\n```")
cursor.write(os.linesep)
cursor.write("Done")
cursor.writeln(os.linesep)
cursor.writeln("Done")
23 changes: 23 additions & 0 deletions src/demo/devel/eleven_labs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import os

import requests

ELEVENLABS_API_KEY = os.environ.get("ELEVENLABS_API_KEY")


def get_available_voices(timeout: float = 10.0):
    """Print the name and ID of every voice returned by the ElevenLabs voices endpoint.

    Sends a GET request to ``/v1/voices`` with the API key taken from the
    ``ELEVENLABS_API_KEY`` environment variable and prints one line per voice,
    sorted by name. On a non-200 response the status code and response body
    are printed instead.

    :param timeout: Seconds to wait for the server (forwarded to ``requests.get``).
    :raises requests.RequestException: If the request cannot be completed
        (e.g. connection error or timeout).
    """
    url = "https://api.elevenlabs.io/v1/voices"
    headers = {
        "xi-api-key": ELEVENLABS_API_KEY,
    }
    # Without an explicit timeout, requests can wait indefinitely on an unresponsive server.
    response = requests.get(url, headers=headers, timeout=timeout)
    if response.status_code != 200:
        print(f"Failed to retrieve voices: {response.status_code}, {response.text}")
        return

    voices = response.json().get("voices", [])
    for voice in sorted(voices, key=lambda v: v["name"]):
        print(f"Name: {voice['name']}, ID: {voice['voice_id']}")


if __name__ == "__main__":
    get_available_voices()
68 changes: 68 additions & 0 deletions src/demo/devel/eleven_labs_recon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import os

import speech_recognition as sr
import requests

# Replace these with your actual API keys
from askai.core.component.audio_player import player

# -------------------- Configuration --------------------

# ElevenLabs API key, read from the environment (None when the variable is unset).
ELEVENLABS_API_KEY = os.environ.get("ELEVENLABS_API_KEY")

# ElevenLabs Voice ID (obtained from the previous step)
# NOTE(review): "previous step" presumably means listing the account's voices — confirm.
VOICE_ID = "pqHfZKP75CvOlQylNhV4"

# Path to save the synthesized audio (relative, so it resolves against the current working directory)
AUDIO_OUTPUT_PATH = "response_audio.mp3"


def recognize_speech():
    """Capture one utterance from the microphone and transcribe it.

    The recognizer is first adjusted for ambient noise, then listens on the
    microphone, and the captured audio is sent to Google Speech Recognition.

    :return: The recognized text, or None when the audio could not be
        understood or the recognition service request failed.
    """
    engine = sr.Recognizer()
    with sr.Microphone() as mic:
        print("Adjusting for ambient noise... Please wait.")
        engine.adjust_for_ambient_noise(mic, duration=0.2)
        print("Listening... Please speak into the microphone.")
        captured = engine.listen(mic)

    # Using Google Speech Recognition
    try:
        transcript = engine.recognize_google(captured)
    except sr.UnknownValueError:
        print("Google Speech Recognition could not understand audio.")
        return None
    except sr.RequestError as err:
        print(f"Could not request results from Google Speech Recognition service; {err}")
        return None

    print(f"Recognized Text: {transcript}")
    return transcript


def synthesize_speech(text, timeout: float = 60.0):
    """Synthesize ``text`` with the ElevenLabs text-to-speech API and play the result.

    Posts the text to the text-to-speech endpoint for ``VOICE_ID``. On a 200
    response the returned audio bytes are written to ``AUDIO_OUTPUT_PATH`` and
    played through the audio player; otherwise the status code and response
    body are printed.

    :param text: The text to be converted to speech.
    :param timeout: Seconds to wait for the server (forwarded to ``requests.post``).
    :raises requests.RequestException: If the request cannot be completed
        (e.g. connection error or timeout).
    """
    print("Synthesizing speech with ElevenLabs...")
    url = f"https://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}"

    headers = {
        "Content-Type": "application/json",
        "xi-api-key": ELEVENLABS_API_KEY,
    }

    data = {"text": text, "voice_settings": {"stability": 0.75, "similarity_boost": 0.75}}

    # Without an explicit timeout, requests can wait indefinitely on an unresponsive server.
    response = requests.post(url, headers=headers, json=data, timeout=timeout)

    if response.status_code != 200:
        print(f"Failed to synthesize speech: {response.status_code}, {response.text}")
        return

    # Close the file before playback so the player reads fully flushed audio data.
    with open(AUDIO_OUTPUT_PATH, "wb") as f:
        f.write(response.content)
    print(f"Audio synthesized and saved as {AUDIO_OUTPUT_PATH}")
    player.play_audio_file(AUDIO_OUTPUT_PATH)


def main():
    """Recognize speech from the microphone and, if any text was recognized, speak it back."""
    spoken = recognize_speech()
    if not spoken:
        # Nothing was recognized (None or empty text), so there is nothing to synthesize.
        return
    synthesize_speech(spoken)


if __name__ == "__main__":
    main()
Binary file added src/demo/devel/response_audio.mp3
Binary file not shown.
58 changes: 29 additions & 29 deletions src/main/askai/core/component/audio_player.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
# -*- coding: utf-8 -*-

"""
@project: HsPyLib-AskAI
@package: askai.core.component
@file: audio_player.py
@created: Wed, 22 Feb 2024
@author: <B>H</B>ugo <B>S</B>aporetti <B>J</B>unior
@site: https://github.com/yorevs/askai
@license: MIT - Please refer to <https://opensource.org/licenses/MIT>
Copyright (c) 2024, AskAI
@project: HsPyLib-AskAI
@package: askai.core.component
@file: audio_player.py
@created: Wed, 22 Feb 2024
@author: <B>H</B>ugo <B>S</B>aporetti <B>J</B>unior
@site: https://github.com/yorevs/askai
@license: MIT - Please refer to <https://opensource.org/licenses/MIT>
Copyright (c) 2024, AskAI
"""
from askai.__classpath__ import classpath
from clitt.core.term.terminal import Terminal
Expand Down Expand Up @@ -39,28 +39,44 @@ class AudioPlayer(metaclass=Singleton):
SFX_DIR = str(classpath.resource_path) + "/sound-fx"

@staticmethod
def play_audio_file(path_to_audio_file: str | Path, tempo: int = 1) -> bool:
def play_audio_file(path_to_audio_file: str | Path, tempo: int = 1, loop: float | None = None) -> bool:
"""Play the specified audio file using the ffplay (ffmpeg) application.
:param path_to_audio_file: The path to the audio file (e.g., MP3) to be played.
:param tempo: The playback speed (default is 1).
:param loop: Loop count passed to ffplay's ``-loop`` option; when None or 0 the option is omitted (no looping).
:return: True if the audio file is played successfully, otherwise False.
"""
if file_is_not_empty(str(path_to_audio_file)):
try:
loop_args = f"-loop {loop} " if loop else ""
_, _, code = Terminal.shell_exec(
f'ffplay -af "atempo={tempo}" -v 0 -nodisp -autoexit {path_to_audio_file}'
f'ffplay -af "atempo={tempo}" -v 0 -nodisp -autoexit {loop_args}{path_to_audio_file}'
)
return code == ExitStatus.SUCCESS
except FileNotFoundError:
log.error("Audio file was not found: %s !", path_to_audio_file)

return False

def __init__(self):
@staticmethod
def play_sfx(filename: str, file_ext: Literal[".mp3", ".wav", ".m4a"] = ".mp3", loop: float | None = None) -> bool:
"""Play a sound effect audio file.
:param filename: The name of the sound effect file (without the extension).
:param file_ext: The file extension of the sound effect (default is ".mp3").
:param loop: Loop count forwarded to ``play_audio_file`` (None for no looping).
:return: True if the sound effect is played successfully, otherwise False.
"""
filename = f"{AudioPlayer.SFX_DIR}/{ensure_endswith(filename, file_ext)}"
check_argument(
which("ffplay") is not None, "ffmpeg::ffplay is required to play audio"
file_is_not_empty(filename),
f"Sound effects file does not exist: {filename}",
)

return AudioPlayer.play_audio_file(filename, loop=loop)

def __init__(self):
check_argument(which("ffplay") is not None, "ffmpeg::ffplay is required to play audio")

@lru_cache
def start_delay(self) -> float:
"""Determine the amount of delay before start streaming the text."""
Expand Down Expand Up @@ -93,21 +109,5 @@ def audio_length(self, path_to_audio_file: str) -> float:

return out

def play_sfx(
self, filename: str, file_ext: Literal[".mp3", ".wav", ".m4a"] = ".mp3"
) -> bool:
"""Play a sound effect audio file.
:param filename: The name of the sound effect file (without the extension).
:param file_ext: The file extension of the sound effect (default is ".mp3").
:return: True if the sound effect is played successfully, otherwise False.
"""
filename = f"{self.SFX_DIR}/{ensure_endswith(filename, file_ext)}"
check_argument(
file_is_not_empty(filename),
f"Sound effects file does not exist: {filename}",
)

return self.play_audio_file(filename)


assert (player := AudioPlayer().INSTANCE) is not None
Loading

0 comments on commit 80612e5

Please sign in to comment.