From 3c116139f72ba4cb22be90db0714d409d05a5bef Mon Sep 17 00:00:00 2001 From: Daniel McKnight <34697904+NeonDaniel@users.noreply.github.com> Date: Wed, 22 May 2024 16:51:22 -0700 Subject: [PATCH] Implement websocket client class (#14) # Description Implements a WS communication-based client Refactor to minimize duplicate code # Issues # Other Notes - Goes with https://github.com/NeonGeckoCom/neon-hana/pull/20 --------- Co-authored-by: Daniel McKnight --- .github/workflows/publish_test_build.yml | 1 + README.md | 16 ++ neon_nodes/__init__.py | 27 ++ neon_nodes/configuration/system.yaml | 5 + neon_nodes/voice_client.py | 42 +--- neon_nodes/websocket_client.py | 305 +++++++++++++++++++++++ requirements/voice_client.txt | 3 +- requirements/websocket_client.txt | 2 + setup.py | 3 +- 9 files changed, 371 insertions(+), 33 deletions(-) create mode 100644 neon_nodes/websocket_client.py create mode 100644 requirements/websocket_client.txt diff --git a/.github/workflows/publish_test_build.yml b/.github/workflows/publish_test_build.yml index de99eed..b03636d 100644 --- a/.github/workflows/publish_test_build.yml +++ b/.github/workflows/publish_test_build.yml @@ -15,6 +15,7 @@ jobs: with: version_file: "neon_nodes/version.py" publish_prerelease: true + update_changelog: true trigger_os_build: runs-on: ubuntu-latest needs: publish_alpha_release diff --git a/README.md b/README.md index de32b82..697de17 100644 --- a/README.md +++ b/README.md @@ -7,3 +7,19 @@ processing, and presenting a response to the user. The voice client will start a service that listens for a wake word on the local system, sends recorded speech to a HANA endpoint for processing, and plays back the response. + +## Websocket Client +The websocket client starts a local listener service and establishes a websocket +connection to a remove HANA server. Compared to the Voice Client, this has +lower latency and allows for asynchronous messages from the HANA server. + +## Configuration +This service is configured via `~/.config/neon/neon.yaml`. + +```yaml +neon_node: + description: Neon Node # Friendly description of the node + hana_address: https://hana.neonaiservices.com # Hana server HTTP address + hana_username: node_user # Hana node user username + hana_password: node_password # Hana node user password +``` diff --git a/neon_nodes/__init__.py b/neon_nodes/__init__.py index d782cbb..5ce744f 100644 --- a/neon_nodes/__init__.py +++ b/neon_nodes/__init__.py @@ -23,3 +23,30 @@ # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from mock import Mock +from ovos_utils.log import LOG + +class MockTransformers(Mock): + def transform(self, chunk): + return chunk, dict() + + +def on_ready(): + LOG.info("ready") + + +def on_stopping(): + LOG.info("stopping") + + +def on_error(e="unknown"): + LOG.error(e) + + +def on_alive(): + LOG.debug("alive") + + +def on_started(): + LOG.debug("started") \ No newline at end of file diff --git a/neon_nodes/configuration/system.yaml b/neon_nodes/configuration/system.yaml index ee4bf1d..c980ae1 100644 --- a/neon_nodes/configuration/system.yaml +++ b/neon_nodes/configuration/system.yaml @@ -1,8 +1,13 @@ neon_node: description: Neon Node + # TODO: Stable release address + hana_address: https://hana.neonaibeta.com + hana_username: neon + hana_password: neon microphone: module: ovos-microphone-plugin-alsa listener: + sample_rate: 16000 wake_word: hey_mycroft stand_up_word: "" VAD: diff --git a/neon_nodes/voice_client.py b/neon_nodes/voice_client.py index 7e62d2f..981f2d4 100644 --- a/neon_nodes/voice_client.py +++ b/neon_nodes/voice_client.py @@ -47,30 +47,7 @@ from pydub import AudioSegment from pydub.playback import play - -class MockTransformers(Mock): - def transform(self, chunk): - return chunk, dict() - - -def on_ready(): - LOG.info("ready") - - -def on_stopping(): - LOG.info("stopping") - - -def on_error(e="unknown"): - LOG.error(e) - - -def on_alive(): - LOG.debug("alive") - - -def on_started(): - LOG.debug("started") +from neon_nodes import on_alive, on_error, on_ready, on_started, on_stopping, MockTransformers class NeonVoiceClient: @@ -81,7 +58,9 @@ def __init__(self, bus=None, ready_hook=on_ready, error_hook=on_error, self.stopping_hook = stopping_hook alive_hook() self.config = Configuration() - self._device_data = self.config.get('neon_node', {}) + self._node_config = self.config.get('neon_node', {}) + self._hana_address = self._node_config.get('hana_address') + LOG.init(self.config.get("logging")) self.bus = bus or FakeBus() self.lang = self.config.get('lang') or "en-us" @@ -108,7 +87,7 @@ def __init__(self, bus=None, ready_hook=on_ready, error_hook=on_error, self._error_sound = None self._network_info = dict() - self._node_data = dict() + self._node_data = {"description": self._node_config.get("description")} started_hook() self.run() @@ -240,18 +219,21 @@ def get_audio_response(self, audio: bytes): audio_data = b64encode(audio).decode("utf-8") transcript = request_backend("neon/get_stt", {"encoded_audio": audio_data, - "lang_code": self.lang}) + "lang_code": self.lang}, + server_url=self._hana_address) transcribed = transcript['transcripts'][0] LOG.info(transcribed) response = request_backend("neon/get_response", {"lang_code": self.lang, "user_profile": self.user_profile, "node_data": self.node_data, - "utterance": transcribed}) + "utterance": transcribed}, + server_url=self._hana_address) answer = response['answer'] LOG.info(answer) audio = request_backend("neon/get_tts", {"lang_code": self.lang, - "to_speak": answer}) + "to_speak": answer}, + server_url=self._hana_address) audio_bytes = b64decode(audio['encoded_audio']) play(AudioSegment.from_file(io.BytesIO(audio_bytes), format="wav")) LOG.info(f"Playback completed") @@ -272,6 +254,4 @@ def main(*args, **kwargs): if __name__ == "__main__": - # environ.setdefault("OVOS_CONFIG_BASE_FOLDER", "neon") - # environ.setdefault("OVOS_CONFIG_FILENAME", "diana.yaml") main() diff --git a/neon_nodes/websocket_client.py b/neon_nodes/websocket_client.py new file mode 100644 index 0000000..917230b --- /dev/null +++ b/neon_nodes/websocket_client.py @@ -0,0 +1,305 @@ +# NEON AI (TM) SOFTWARE, Software Development Kit & Application Development System +# All trademark and other rights reserved by their respective owners +# Copyright 2008-2021 Neongecko.com Inc. +# BSD-3 +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from this +# software without specific prior written permission. +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, +# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import io +import json +import requests + +from os.path import join, isfile, dirname +from threading import Thread, Event +from unittest.mock import Mock +from base64 import b64decode, b64encode + +from ovos_plugin_manager.microphone import OVOSMicrophoneFactory +from ovos_plugin_manager.vad import OVOSVADFactory +from ovos_dinkum_listener.voice_loop.voice_loop import DinkumVoiceLoop +from ovos_dinkum_listener.voice_loop.hotwords import HotwordContainer +from ovos_config.config import Configuration +from ovos_utils.messagebus import FakeBus +from ovos_utils.log import LOG +from ovos_bus_client.message import Message +from neon_utils.net_utils import get_adapter_info +from neon_utils.user_utils import get_default_user_config +from speech_recognition import AudioData +from pydub import AudioSegment +from pydub.playback import play +from websocket import WebSocketApp + +from neon_nodes import on_alive, on_error, on_ready, on_started, on_stopping, MockTransformers + + +class NeonWebsocketClient: + def __init__(self, bus=None, ready_hook=on_ready, error_hook=on_error, + stopping_hook=on_stopping, alive_hook=on_alive, + started_hook=on_started): + self.error_hook = error_hook + self.stopping_hook = stopping_hook + alive_hook() + self.config = Configuration() + node_config = self.config["neon_node"] + server_addr = node_config["hana_address"] + self._connected = Event() + + auth_data = requests.post(f"{server_addr}/auth/login", json={ + "username": node_config["hana_username"], + "password": node_config["hana_password"]}).json() + LOG.info(auth_data) + + def ws_connect(*_, **__): + self._connected.set() + + def ws_disconnect(*_, **__): + if not self._connected.is_set(): + LOG.info("WS disconnected on shutdown") + return + error = "Websocket unexpectedly disconnected" + self.error_hook(error) + raise ConnectionError(error) + + def ws_error(_, exception): + self.error_hook(exception) + raise ConnectionError(f"Failed to connect: {exception}") + + ws_address = server_addr.replace("http", "ws", 1) + self.websocket = WebSocketApp(f"{ws_address}/node/v1?token={auth_data['access_token']}", + on_message=self._on_ws_data, + on_open=ws_connect, + on_error=ws_error, + on_close=ws_disconnect) + Thread(target=self.websocket.run_forever, daemon=True).start() + self._device_data = self.config.get('neon_node', {}) + LOG.init(self.config.get("logging")) + self.bus = bus or FakeBus() + self.lang = self.config.get('lang') or "en-us" + self._mic = OVOSMicrophoneFactory.create(self.config) + self._mic.start() + self._hotwords = HotwordContainer(self.bus) + self._hotwords.load_hotword_engines() + self._vad = OVOSVADFactory.create(self.config) + + self._voice_loop = DinkumVoiceLoop(mic=self._mic, + hotwords=self._hotwords, + stt=Mock(), + fallback_stt=Mock(), + vad=self._vad, + transformers=MockTransformers(), + stt_audio_callback=self.on_stt_audio, + listenword_audio_callback= + self.on_hotword_audio) + self._voice_loop.start() + self._voice_thread = None + self._watchdog_event = Event() + + self._listening_sound = None + self._error_sound = None + + self._network_info = dict() + self._node_data = dict() + + started_hook() + self.run() + self._wait_for_connection() + ready_hook() + + def _wait_for_connection(self): + LOG.debug("Waiting for WS connection") + if not self._connected.wait(30): + error = f"Timeout waiting for connection to {self.websocket.url}" + self.error_hook(error) + raise TimeoutError(error) + + @property + def listening_sound(self) -> AudioSegment: + """ + Get an AudioSegment representation of the configured listening sound + """ + if not self._listening_sound: + res_file = Configuration().get('sounds').get('start_listening') + if not isfile(res_file): + res_file = join(dirname(__file__), "res", "start_listening.wav") + self._listening_sound = AudioSegment.from_file(res_file, + format="wav") + return self._listening_sound + + @property + def error_sound(self) -> AudioSegment: + """ + Get an AudioSegment representation of the configured error sound + """ + if not self._error_sound: + res_file = Configuration().get('sounds').get('error') + if not isfile(res_file): + res_file = join(dirname(__file__), "res", "error.wav") + self._error_sound = AudioSegment.from_file(res_file, format="wav") + return self._error_sound + + @property + def network_info(self) -> dict: + """ + Get networking information about this client, including IP addresses and + MAC address. + """ + if not self._network_info: + self._network_info = get_adapter_info() + public_ip = requests.get('https://api.ipify.org').text + self._network_info["public"] = public_ip + LOG.debug(f"Resolved network info: {self._network_info}") + return self._network_info + + @property + def node_data(self): + """ + Get information about this node from configuration and networking status + """ + if not self._node_data: + self._node_data = {"device_description": self._node_data.get( + 'description', 'node voice client'), + "networking": { + "local_ip": self.network_info.get('ipv4'), + "public_ip": self.network_info.get('public'), + "mac_address": self.network_info.get('mac')} + } + LOG.info(f"Resolved node_data: {self._node_data}") + return self._node_data + + @property + def user_profile(self) -> dict: + """ + Get a user profile from local disk + """ + return get_default_user_config() + + def _on_ws_data(self, _, serialized: str): + try: + message = Message.deserialize(serialized) + self.on_response(message) + except Exception as e: + LOG.exception(e) + + def run(self): + """ + Start the voice thread as a daemon and return + """ + try: + self._voice_thread = Thread(target=self._voice_loop.run, + daemon=True) + self._voice_thread.start() + except Exception as e: + self.error_hook(repr(e)) + raise e + + def watchdog(self): + """ + Runs in a loop to make sure the voice loop is running. If the loop is + unexpectedly stopped, raise an exception to kill this process. + """ + try: + while not self._watchdog_event.wait(30): + if not self._voice_thread.is_alive(): + self.error_hook("11") + raise RuntimeError("Voice Thread not alive") + if not self._voice_loop._is_running: + self.error_hook("12") + raise RuntimeError("Voice Loop not running") + except KeyboardInterrupt: + self.shutdown() + + def on_stt_audio(self, audio_bytes: bytes, context: dict): + """ + Callback when there is a recorded STT segment. + @param audio_bytes: bytes of recorded audio + @param context: dict context associated with recorded audio + """ + LOG.debug(f"Got {len(audio_bytes)} bytes of audio") + wav_data = AudioData(audio_bytes, self._mic.sample_rate, + self._mic.sample_width).get_wav_data() + try: + self.on_input(wav_data) + except Exception as e: + play(self.error_sound) + # Unknown error, restart to be safe + self.error_hook(repr(e)) + raise e + + def on_hotword_audio(self, audio: bytes, context: dict): + """ + Callback when a hotword is detected. + @param audio: bytes of detected hotword audio + @param context: dict context associated with recorded hotword + """ + payload = context + msg_type = "recognizer_loop:wakeword" + play(self.listening_sound) + LOG.info(f"Emitting hotword event: {msg_type}") + # emit ww event + self.bus.emit(Message(msg_type, payload, context)) + # TODO: Optionally save/upload hotword audio + + def on_input(self, audio: bytes): + """ + Handle recorded audio input and get/speak a response. + @param audio: bytes of STT audio + """ + audio_data = b64encode(audio).decode("utf-8") + data = {"msg_type": "neon.audio_input", + "data": {"audio_data": audio_data, "lang": self.lang}} + self.websocket.send(json.dumps(data)) + + def on_response(self, message: Message): + if message.msg_type == "klat.response": + LOG.info(f"Response=" + f"{message.data['responses'][self.lang]['sentence']}") + encoded_audio = message.data['responses'][self.lang]['audio'] + audio_bytes = b64decode(encoded_audio.get('female') or + encoded_audio.get('male')) + play(AudioSegment.from_file(io.BytesIO(audio_bytes), format="wav")) + LOG.info(f"Playback completed") + elif message.msg_type == "neon.alert_expired": + LOG.info(f"Alert expired: {message.data}") + elif message.msg_type == "neon.audio_input.response": + LOG.info(f"Got STT: {message.data.get('transcripts')}") + else: + LOG.warning(f"Ignoring message: {message.msg_type}") + + def shutdown(self): + """ + Cleanly stop all threads and shutdown this service + """ + self.stopping_hook() + self._connected.clear() + self.websocket.close() + self._watchdog_event.set() + self._voice_loop.stop() + self._voice_thread.join(30) + + +def main(*args, **kwargs): + client = NeonWebsocketClient(*args, **kwargs) + client.watchdog() + + +if __name__ == "__main__": + main() diff --git a/requirements/voice_client.txt b/requirements/voice_client.txt index 52854d5..d39808f 100644 --- a/requirements/voice_client.txt +++ b/requirements/voice_client.txt @@ -1,5 +1,6 @@ neon-utils[network]~=1.9 ovos-dinkum-listener==0.0.3a29 +# TODO: webrtcvad fails to run in OS image ovos-vad-plugin-silero~=0.0.1 ovos-microphone-plugin-alsa~=0.0.0 # ovos-microphone-plugin-sounddevice Not working on Mark2 where alsa is @@ -8,6 +9,6 @@ pydub~=0.25 SpeechRecognition~=3.10 sdnotify~=0.3 requests~=2.28 - +mock~=5.0 # TODO: Below patching unpinned dependency upstream allowing 2.0.0 which is incompatible numpy~=1.26 diff --git a/requirements/websocket_client.txt b/requirements/websocket_client.txt new file mode 100644 index 0000000..28bdc51 --- /dev/null +++ b/requirements/websocket_client.txt @@ -0,0 +1,2 @@ +websockets~=12.0 + diff --git a/setup.py b/setup.py index 9f295fc..f234ea7 100644 --- a/setup.py +++ b/setup.py @@ -73,7 +73,8 @@ def get_requirements(requirements_filename: str): author_email='developers@neon.ai', license='BSD-3-Clause', packages=find_packages(), - extras_require={"voice-client": get_requirements("voice_client.txt")}, + extras_require={"voice-client": get_requirements("voice_client.txt"), + "websocket-client": get_requirements("websocket_client.txt")}, package_data={'neon_nodes': ['res/*']}, zip_safe=True, classifiers=[