diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e417dc..8547f5b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,20 +1,80 @@ # Changelog -## [1.0.2a2](https://github.com/NeonGeckoCom/neon_messagebus/tree/1.0.2a2) (2023-07-18) +## [1.1.1a9](https://github.com/NeonGeckoCom/neon_messagebus/tree/1.1.1a9) (2023-12-14) -[Full Changelog](https://github.com/NeonGeckoCom/neon_messagebus/compare/1.0.2a1...1.0.2a2) +[Full Changelog](https://github.com/NeonGeckoCom/neon_messagebus/compare/1.1.1a8...1.1.1a9) **Merged pull requests:** -- Update dependencies and mark deprecation [\#58](https://github.com/NeonGeckoCom/neon_messagebus/pull/58) ([NeonDaniel](https://github.com/NeonDaniel)) +- Update dependencies to prep release [\#68](https://github.com/NeonGeckoCom/neon_messagebus/pull/68) ([NeonDaniel](https://github.com/NeonDaniel)) -## [1.0.2a1](https://github.com/NeonGeckoCom/neon_messagebus/tree/1.0.2a1) (2023-06-27) +## [1.1.1a8](https://github.com/NeonGeckoCom/neon_messagebus/tree/1.1.1a8) (2023-11-28) -[Full Changelog](https://github.com/NeonGeckoCom/neon_messagebus/compare/1.0.1...1.0.2a1) +[Full Changelog](https://github.com/NeonGeckoCom/neon_messagebus/compare/1.1.1a7...1.1.1a8) **Merged pull requests:** -- Update Docker to use OVOS\_CONFIG envvars [\#57](https://github.com/NeonGeckoCom/neon_messagebus/pull/57) ([NeonDaniel](https://github.com/NeonDaniel)) +- Add language support handler and Update dependencies [\#67](https://github.com/NeonGeckoCom/neon_messagebus/pull/67) ([NeonDaniel](https://github.com/NeonDaniel)) + +## [1.1.1a7](https://github.com/NeonGeckoCom/neon_messagebus/tree/1.1.1a7) (2023-11-18) + +[Full Changelog](https://github.com/NeonGeckoCom/neon_messagebus/compare/1.1.1a6...1.1.1a7) + +**Merged pull requests:** + +- Update MQ Connector with routing fix [\#66](https://github.com/NeonGeckoCom/neon_messagebus/pull/66) ([NeonDaniel](https://github.com/NeonDaniel)) + +## [1.1.1a6](https://github.com/NeonGeckoCom/neon_messagebus/tree/1.1.1a6) (2023-11-15) + +[Full Changelog](https://github.com/NeonGeckoCom/neon_messagebus/compare/1.1.1a5...1.1.1a6) + +**Merged pull requests:** + +- Update neon-messagebus-mq-connector to support response event routing [\#65](https://github.com/NeonGeckoCom/neon_messagebus/pull/65) ([NeonDaniel](https://github.com/NeonDaniel)) + +## [1.1.1a5](https://github.com/NeonGeckoCom/neon_messagebus/tree/1.1.1a5) (2023-11-14) + +[Full Changelog](https://github.com/NeonGeckoCom/neon_messagebus/compare/1.1.1a4...1.1.1a5) + +**Implemented enhancements:** + +- Refactor services into `NeonBusService` [\#32](https://github.com/NeonGeckoCom/neon_messagebus/issues/32) + +**Merged pull requests:** + +- Refactor Service and Update Dependencies [\#63](https://github.com/NeonGeckoCom/neon_messagebus/pull/63) ([NeonDaniel](https://github.com/NeonDaniel)) + +## [1.1.1a4](https://github.com/NeonGeckoCom/neon_messagebus/tree/1.1.1a4) (2023-11-14) + +[Full Changelog](https://github.com/NeonGeckoCom/neon_messagebus/compare/1.1.1a3...1.1.1a4) + +**Merged pull requests:** + +- Update dependencies and add Docker extras [\#64](https://github.com/NeonGeckoCom/neon_messagebus/pull/64) ([NeonDaniel](https://github.com/NeonDaniel)) + +## [1.1.1a3](https://github.com/NeonGeckoCom/neon_messagebus/tree/1.1.1a3) (2023-07-27) + +[Full Changelog](https://github.com/NeonGeckoCom/neon_messagebus/compare/1.1.1a2...1.1.1a3) + +**Merged pull requests:** + +- Update MQ dependencies for K8s compat [\#62](https://github.com/NeonGeckoCom/neon_messagebus/pull/62) ([NeonDaniel](https://github.com/NeonDaniel)) + +## [1.1.1a2](https://github.com/NeonGeckoCom/neon_messagebus/tree/1.1.1a2) (2023-07-25) + +[Full Changelog](https://github.com/NeonGeckoCom/neon_messagebus/compare/1.1.1a1...1.1.1a2) + +**Merged pull requests:** + +- Update configuration handling for Kubernetes compat [\#61](https://github.com/NeonGeckoCom/neon_messagebus/pull/61) ([NeonDaniel](https://github.com/NeonDaniel)) + +## [1.1.1a1](https://github.com/NeonGeckoCom/neon_messagebus/tree/1.1.1a1) (2023-07-25) + +[Full Changelog](https://github.com/NeonGeckoCom/neon_messagebus/compare/1.1.0...1.1.1a1) + +**Merged pull requests:** + +- Update Docker default config path [\#60](https://github.com/NeonGeckoCom/neon_messagebus/pull/60) ([NeonDaniel](https://github.com/NeonDaniel)) diff --git a/Dockerfile b/Dockerfile index 247e8a9..7b86c49 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,6 +5,7 @@ LABEL vendor=neon.ai \ ENV OVOS_CONFIG_BASE_FOLDER neon ENV OVOS_CONFIG_FILENAME neon.yaml +ENV XDG_CONFIG_HOME /config EXPOSE 8181 @@ -19,7 +20,7 @@ ADD . /neon_messagebus WORKDIR /neon_messagebus RUN pip install wheel \ - && pip install . + && pip install .[docker] COPY docker_overlay/ / diff --git a/docker_overlay/etc/neon/neon.yaml b/docker_overlay/etc/neon/neon.yaml index 2dcaa67..55f9071 100644 --- a/docker_overlay/etc/neon/neon.yaml +++ b/docker_overlay/etc/neon/neon.yaml @@ -10,6 +10,10 @@ websocket: ssl_cert: ssl_key: shared_connection: true +logs: + level_overrides: + warning: + - pika system: protected_keys: remote: diff --git a/neon_messagebus/service/__init__.py b/neon_messagebus/service/__init__.py index abc3096..2e926f3 100644 --- a/neon_messagebus/service/__init__.py +++ b/neon_messagebus/service/__init__.py @@ -33,10 +33,17 @@ from time import sleep from os.path import expanduser, isfile from threading import Thread, Event + +from ovos_bus_client import MessageBusClient, Message +from ovos_utils.process_utils import StatusCallbackMap, ProcessStatus from tornado import web, ioloop from ovos_utils.log import LOG +from ovos_config.config import Configuration from ovos_messagebus.event_handler import MessageBusEventHandler -from neon_messagebus.util.config import load_message_bus_config +from ovos_messagebus.load_config import load_message_bus_config + +from neon_messagebus.util.mq_connector import start_mq_connector +from neon_messagebus.util.signal_utils import SignalManager def on_ready(): @@ -64,38 +71,95 @@ def __init__(self, ready_hook=on_ready, error_hook=on_error, stopping_hook=on_stopping, alive_hook=on_alive, started_hook=on_started, config=None, debug=False, daemonic=False): - alive_hook() - self._started = Event() super().__init__() - self.config = config or load_message_bus_config() + callbacks = StatusCallbackMap(on_ready=ready_hook, + on_error=error_hook, + on_stopping=stopping_hook, + on_alive=alive_hook, + on_started=started_hook) + self.service_id = "bus" + self.status = ProcessStatus(self.service_id, callback_map=callbacks) + self.status.set_alive() + + self.config = config or Configuration() self.debug = debug self.daemon = daemonic self._stopping = Event() + self._running = Event() - self._started_hook = started_hook - self._ready_hook = ready_hook - self._stopping_hook = stopping_hook - self._error_hook = error_hook - + self._bus = None self._app = None self._loop = None + self._loop_thread = None + self._signal_manager = None + self._mq_connector = None @property def started(self) -> Event: - return self._started + return self._running def run(self): - self._started_hook() + self.status.set_started() self._stopping.clear() LOG.info('Starting message bus service...') self._init_tornado() self._listen() + self._loop_thread = Thread(target=ioloop.IOLoop.instance().start) + self._loop_thread.start() + + self._bus = self._init_bus_client() + self._init_signal_manager() + self._init_mq_connector() + + self.status.set_ready() + self._running.set() LOG.info('Message bus service started!') - ioloop.IOLoop.instance().start() - # self._stopping.wait() - # _loop.stop() - # self._started.set() + self._stopping.wait() + + def _init_bus_client(self) -> MessageBusClient: + config_dict = {k: v for k, v in self.config.get("websocket", {}).items() + if k in ("host", "port", "route", "ssl")} + config_dict['host'] = "0.0.0.0" + bus = MessageBusClient(**config_dict) + bus.run_in_thread() + bus.on('neon.languages.get', self._handle_get_languages) + + return bus + + def _handle_get_languages(self, message: Message): + """ + Handle a request to get languages supported by Neon Core. + @param message: neon.languages.get Message + """ + from neon_utils.language_utils import get_supported_languages + supported_langs = get_supported_languages() + self._bus.emit(message.response({"stt": list(supported_langs.stt), + "tts": list(supported_langs.tts), + "skills": list(supported_langs.skills) + })) + + def _init_signal_manager(self): + self._signal_manager = SignalManager(self._bus) + LOG.info("Signal Manager started") + + def _init_mq_connector(self): + if not self.config.get("MQ"): + LOG.info("No MQ Configuration") + return + try: + self._mq_connector = start_mq_connector(self.config) + if self._mq_connector: + LOG.info(f"MQ Connection Established to " + f"{self._mq_connector.config.get('server')}:" + f"{self._mq_connector.config.get('port')}") + else: + LOG.info("No MQ Credentials provided") + except ImportError as e: + LOG.warning(f"MQ Connector module not available: {e}") + except Exception as e: + LOG.error("Connector not started") + LOG.exception(e) def _init_tornado(self): # Disable all tornado logging so mycroft loglevel isn't overridden @@ -105,13 +169,14 @@ def _init_tornado(self): asyncio.set_event_loop(self._loop) def _listen(self): - routes = [(self.config.route, MessageBusEventHandler)] + config = load_message_bus_config(**self.config.get('websocket', {})) + routes = [(config.route, MessageBusEventHandler)] application = web.Application(routes, debug=self.debug) ssl_options = None - LOG.info(f"Starting Messagebus server with config: {self.config}") - if self.config.ssl: - cert = expanduser(self.config.ssl_cert) - key = expanduser(self.config.ssl_key) + LOG.info(f"Starting Messagebus server with config: {config}") + if config.ssl: + cert = expanduser(config.ssl_cert) + key = expanduser(config.ssl_key) if not isfile(key) or not isfile(cert): LOG.error( "ssl keys dont exist, falling back to unsecured socket") @@ -121,23 +186,36 @@ def _listen(self): ssl_options = {"certfile": cert, "keyfile": key} if ssl_options: LOG.info("wss listener started") - self._app = application.listen(self.config.port, self.config.host, + self._app = application.listen(config.port, config.host, ssl_options=ssl_options) else: LOG.info("ws listener started") - self._app = application.listen(self.config.port, self.config.host) + self._app = application.listen(config.port, config.host) def shutdown(self): LOG.info("Messagebus Server shutting down.") - self._stopping_hook() + self.status.set_stopping() self._app.stop() loop = ioloop.IOLoop.instance() loop.add_callback(loop.stop) sleep(1) loop.close() - self._loop.call_soon_threadsafe(self._loop.stop) + try: + self._loop.call_soon_threadsafe(self._loop.stop) + except RuntimeError as e: + LOG.debug(e) while self._loop.is_running(): LOG.debug("Waiting for loop to stop...") sleep(1) self._loop.close() + self._loop_thread.join() + + if self._mq_connector: + from pika.exceptions import StreamLostError + try: + self._mq_connector.stop() + except StreamLostError: + pass + + self._stopping.set() LOG.info("Messagebus service stopped") diff --git a/neon_messagebus/service/__main__.py b/neon_messagebus/service/__main__.py index f9fa31c..e979572 100644 --- a/neon_messagebus/service/__main__.py +++ b/neon_messagebus/service/__main__.py @@ -31,12 +31,8 @@ from ovos_utils.process_utils import reset_sigint_handler, PIDLock as Lock from neon_utils.log_utils import init_log from neon_utils.process_utils import start_malloc, snapshot_malloc, print_malloc -from ovos_bus_client.client import MessageBusClient from ovos_config.config import Configuration from neon_messagebus.service import NeonBusService -from neon_messagebus.util.signal_utils import SignalManager -from neon_messagebus.util.mq_connector import start_mq_connector -from neon_messagebus.util.config import load_message_bus_config def main(**kwargs): @@ -47,32 +43,14 @@ def main(**kwargs): config = Configuration() debug = Configuration().get('debug', False) malloc_running = start_malloc(config, stack_depth=4) - service = NeonBusService(debug=debug, daemonic=True, **kwargs) - service.start() - messagebus_config = load_message_bus_config() - config_dict = messagebus_config._asdict() - config_dict['host'] = "0.0.0.0" - if not service.started.wait(10): - LOG.warning("Timeout waiting for service start") - client = MessageBusClient(**config_dict) - SignalManager(client) - LOG.info("Signal Manager Initialized") - - connector = None - try: - connector = start_mq_connector(config_dict) - if connector: - LOG.info("MQ Connection Established") - else: - LOG.info("No MQ Credentials provided") - except ImportError as e: - LOG.warning(f"MQ Connector module not available: {e}") - except Exception as e: - LOG.error("Connector not started") - LOG.exception(e) + kwargs.setdefault("debug", debug) + kwargs.setdefault("config", config) - service._ready_hook() + service = NeonBusService(daemonic=True, **kwargs) + service.start() + LOG.debug("Waiting for exit signal") wait_for_exit_signal() + if malloc_running: try: print_malloc(snapshot_malloc()) @@ -80,14 +58,7 @@ def main(**kwargs): LOG.error(e) service.shutdown() - if connector: - from pika.exceptions import StreamLostError - try: - connector.stop() - except StreamLostError: - pass lock.delete() - LOG.info("Messagebus service stopped") def deprecated_entrypoint(): diff --git a/neon_messagebus/util/mq_connector.py b/neon_messagebus/util/mq_connector.py index a1d93bf..9764d29 100644 --- a/neon_messagebus/util/mq_connector.py +++ b/neon_messagebus/util/mq_connector.py @@ -30,21 +30,18 @@ from ovos_utils.log import LOG -def start_mq_connector(bus_config: dict): +def start_mq_connector(config: dict): """ Start the MQ Connector module to handle MQ API requests + @param config: Configuration object """ from neon_messagebus_mq_connector.controller import ChatAPIProxy - config = Configuration() - mq_config = config.get("MQ", {}) - bus_config = bus_config or dict(config).get("websocket") + config = config or Configuration() - if "neon_chat_api" not in mq_config.get("users", {}): + if "neon_chat_api" not in config.get("MQ", {}).get("users", {}): LOG.info("Skipping MQ Connector init") return None - bus_config = bus_config chat_connector = ChatAPIProxy(service_name="neon_chat_api", - config={"MQ": mq_config, - "MESSAGEBUS": bus_config}) + config=config) chat_connector.run(run_sync=False) return chat_connector diff --git a/requirements/docker.txt b/requirements/docker.txt new file mode 100644 index 0000000..86b909e --- /dev/null +++ b/requirements/docker.txt @@ -0,0 +1 @@ +neon-mana-utils~=0.2 \ No newline at end of file diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 2b6371b..064d0d0 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -1,9 +1,10 @@ -neon-messagebus-mq-connector~=0.3,>=0.3.4 +neon-messagebus-mq-connector~=0.4 +neon-mq-connector~=0.7,>=0.7.1 ovos-messagebus~=0.0.3 ovos_utils~=0.0.32 ovos-config~=0.0.10 -ovos-bus-client~=0.0.4 +ovos-bus-client==0.0.6a21 tornado~=6.0,>=6.0.3 neon_utils[network]~=1.6 click~=8.0 -click-default-group~=1.2 \ No newline at end of file +click-default-group~=1.2 diff --git a/setup.py b/setup.py index bb2fde2..46f3db4 100644 --- a/setup.py +++ b/setup.py @@ -68,6 +68,9 @@ def get_requirements(requirements_filename: str): url='https://github.com/NeonGeckoCom/neon_messagebus', license='BSD-3-Clause', install_requires=get_requirements("requirements.txt"), + extras_require={ + "docker": get_requirements("docker.txt") + }, author='Neongecko', author_email='developers@neon.ai', description="Neon Messagebus Module", diff --git a/tests/test_messagebus_service.py b/tests/test_messagebus_service.py index 5d4bf64..c65d47c 100644 --- a/tests/test_messagebus_service.py +++ b/tests/test_messagebus_service.py @@ -40,6 +40,9 @@ from neon_messagebus.service import NeonBusService +_mock_langs = Mock(stt={"stt1", "stt2"}, tts={"tts1", "tts2", "tts3"}, + skills={"skills1"}) + class TestMessagebusService(unittest.TestCase): def test_bus_service(self): called_count = 0 @@ -55,21 +58,24 @@ def _callback_method(message): started = Mock() ready = Mock() stopping = Mock() + service = NeonBusService(alive_hook=alive, started_hook=started, ready_hook=ready, stopping_hook=stopping, debug=True, daemonic=True) + # Test init alive.assert_called_once() started.assert_not_called() ready.assert_not_called() stopping.assert_not_called() + # Test service start service.start() + LOG.info("Waiting for service start") + self.assertTrue(service.started.wait(15)) alive.assert_called_once() started.assert_called_once() - ready.assert_not_called() + ready.assert_called_once() stopping.assert_not_called() - LOG.info("Waiting for service start") - self.assertTrue(service.started.wait(15)) - LOG.info("Service started") + # Test client connections for i in range(32): client = MessageBusClient() client.run_in_thread() @@ -87,7 +93,7 @@ def _callback_method(message): sleep(1) self.assertEqual(len(clients), called_count) - + # Test shutdown self.assertTrue(service.started.is_set()) self.assertTrue(service.is_alive()) service.shutdown() @@ -95,6 +101,22 @@ def _callback_method(message): service.join() self.assertFalse(service.is_alive()) + @patch("neon_utils.language_utils.get_supported_languages") + def test_get_languages(self, get_langs): + from ovos_utils.messagebus import FakeBus + get_langs.return_value = _mock_langs + bus = FakeBus() + service = NeonBusService() + service._bus = bus + on_langs = Mock() + bus.on("neon.languages.get.response", on_langs) + service._handle_get_languages(Message("neon.languages.get")) + on_langs.assert_called_once() + resp = on_langs.call_args[0][0] + self.assertEqual(resp.data, {"stt": list(_mock_langs.stt), + "tts": list(_mock_langs.tts), + "skills": list(_mock_langs.skills)}) + def test_service_shutdown(self): service = NeonBusService(daemonic=False) service.start() diff --git a/tests/test_utils.py b/tests/test_utils.py index 5c11b1e..c6866ca 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -36,6 +36,7 @@ import mock from ovos_bus_client import MessageBusClient, Message +from ovos_utils.messagebus import FakeBus sys.path.append(os.path.dirname(os.path.dirname(__file__))) from neon_messagebus.service import NeonBusService @@ -180,20 +181,17 @@ def test_decode_binary_message(self): class TestSignalUtils(unittest.TestCase): - @classmethod - def setUpClass(cls) -> None: - from neon_messagebus.util.signal_utils import SignalManager - cls.service = NeonBusService(debug=True, daemonic=True) - cls.service.start() - cls.service.started.wait() - cls.signal_manager = SignalManager() + from neon_messagebus.util.signal_utils import SignalManager + from neon_utils.signal_utils import init_signal_bus - @classmethod - def tearDownClass(cls) -> None: - cls.service.shutdown() + bus = FakeBus() + bus.connected_event = Event() + bus.connected_event.set() + signal_manager = SignalManager(bus) + init_signal_bus(bus) def test_00_init_signal_handlers(self): - from neon_utils.signal_utils import init_signal_handlers + from neon_utils.signal_utils import init_signal_handlers, init_signal_bus init_signal_handlers() from neon_utils.signal_utils import _manager_create_signal, \ _create_signal diff --git a/version.py b/version.py index c14a38c..098c280 100644 --- a/version.py +++ b/version.py @@ -26,4 +26,4 @@ # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -__version__ = "1.1.0" +__version__ = "2.0.0"