Skip to content

Commit

Permalink
2.0.0 (#69)
Browse files Browse the repository at this point in the history
  • Loading branch information
NeonDaniel authored Dec 16, 2023
2 parents 373ef5b + 87f711a commit a04f51a
Show file tree
Hide file tree
Showing 12 changed files with 230 additions and 94 deletions.
72 changes: 66 additions & 6 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,80 @@
# Changelog

## [1.0.2a2](https://github.com/NeonGeckoCom/neon_messagebus/tree/1.0.2a2) (2023-07-18)
## [1.1.1a9](https://github.com/NeonGeckoCom/neon_messagebus/tree/1.1.1a9) (2023-12-14)

[Full Changelog](https://github.com/NeonGeckoCom/neon_messagebus/compare/1.0.2a1...1.0.2a2)
[Full Changelog](https://github.com/NeonGeckoCom/neon_messagebus/compare/1.1.1a8...1.1.1a9)

**Merged pull requests:**

- Update dependencies and mark deprecation [\#58](https://github.com/NeonGeckoCom/neon_messagebus/pull/58) ([NeonDaniel](https://github.com/NeonDaniel))
- Update dependencies to prep release [\#68](https://github.com/NeonGeckoCom/neon_messagebus/pull/68) ([NeonDaniel](https://github.com/NeonDaniel))

## [1.0.2a1](https://github.com/NeonGeckoCom/neon_messagebus/tree/1.0.2a1) (2023-06-27)
## [1.1.1a8](https://github.com/NeonGeckoCom/neon_messagebus/tree/1.1.1a8) (2023-11-28)

[Full Changelog](https://github.com/NeonGeckoCom/neon_messagebus/compare/1.0.1...1.0.2a1)
[Full Changelog](https://github.com/NeonGeckoCom/neon_messagebus/compare/1.1.1a7...1.1.1a8)

**Merged pull requests:**

- Update Docker to use OVOS\_CONFIG envvars [\#57](https://github.com/NeonGeckoCom/neon_messagebus/pull/57) ([NeonDaniel](https://github.com/NeonDaniel))
- Add language support handler and Update dependencies [\#67](https://github.com/NeonGeckoCom/neon_messagebus/pull/67) ([NeonDaniel](https://github.com/NeonDaniel))

## [1.1.1a7](https://github.com/NeonGeckoCom/neon_messagebus/tree/1.1.1a7) (2023-11-18)

[Full Changelog](https://github.com/NeonGeckoCom/neon_messagebus/compare/1.1.1a6...1.1.1a7)

**Merged pull requests:**

- Update MQ Connector with routing fix [\#66](https://github.com/NeonGeckoCom/neon_messagebus/pull/66) ([NeonDaniel](https://github.com/NeonDaniel))

## [1.1.1a6](https://github.com/NeonGeckoCom/neon_messagebus/tree/1.1.1a6) (2023-11-15)

[Full Changelog](https://github.com/NeonGeckoCom/neon_messagebus/compare/1.1.1a5...1.1.1a6)

**Merged pull requests:**

- Update neon-messagebus-mq-connector to support response event routing [\#65](https://github.com/NeonGeckoCom/neon_messagebus/pull/65) ([NeonDaniel](https://github.com/NeonDaniel))

## [1.1.1a5](https://github.com/NeonGeckoCom/neon_messagebus/tree/1.1.1a5) (2023-11-14)

[Full Changelog](https://github.com/NeonGeckoCom/neon_messagebus/compare/1.1.1a4...1.1.1a5)

**Implemented enhancements:**

- Refactor services into `NeonBusService` [\#32](https://github.com/NeonGeckoCom/neon_messagebus/issues/32)

**Merged pull requests:**

- Refactor Service and Update Dependencies [\#63](https://github.com/NeonGeckoCom/neon_messagebus/pull/63) ([NeonDaniel](https://github.com/NeonDaniel))

## [1.1.1a4](https://github.com/NeonGeckoCom/neon_messagebus/tree/1.1.1a4) (2023-11-14)

[Full Changelog](https://github.com/NeonGeckoCom/neon_messagebus/compare/1.1.1a3...1.1.1a4)

**Merged pull requests:**

- Update dependencies and add Docker extras [\#64](https://github.com/NeonGeckoCom/neon_messagebus/pull/64) ([NeonDaniel](https://github.com/NeonDaniel))

## [1.1.1a3](https://github.com/NeonGeckoCom/neon_messagebus/tree/1.1.1a3) (2023-07-27)

[Full Changelog](https://github.com/NeonGeckoCom/neon_messagebus/compare/1.1.1a2...1.1.1a3)

**Merged pull requests:**

- Update MQ dependencies for K8s compat [\#62](https://github.com/NeonGeckoCom/neon_messagebus/pull/62) ([NeonDaniel](https://github.com/NeonDaniel))

## [1.1.1a2](https://github.com/NeonGeckoCom/neon_messagebus/tree/1.1.1a2) (2023-07-25)

[Full Changelog](https://github.com/NeonGeckoCom/neon_messagebus/compare/1.1.1a1...1.1.1a2)

**Merged pull requests:**

- Update configuration handling for Kubernetes compat [\#61](https://github.com/NeonGeckoCom/neon_messagebus/pull/61) ([NeonDaniel](https://github.com/NeonDaniel))

## [1.1.1a1](https://github.com/NeonGeckoCom/neon_messagebus/tree/1.1.1a1) (2023-07-25)

[Full Changelog](https://github.com/NeonGeckoCom/neon_messagebus/compare/1.1.0...1.1.1a1)

**Merged pull requests:**

- Update Docker default config path [\#60](https://github.com/NeonGeckoCom/neon_messagebus/pull/60) ([NeonDaniel](https://github.com/NeonDaniel))



Expand Down
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ LABEL vendor=neon.ai \

ENV OVOS_CONFIG_BASE_FOLDER neon
ENV OVOS_CONFIG_FILENAME neon.yaml
ENV XDG_CONFIG_HOME /config

EXPOSE 8181

Expand All @@ -19,7 +20,7 @@ ADD . /neon_messagebus
WORKDIR /neon_messagebus

RUN pip install wheel \
&& pip install .
&& pip install .[docker]

COPY docker_overlay/ /

Expand Down
4 changes: 4 additions & 0 deletions docker_overlay/etc/neon/neon.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ websocket:
ssl_cert:
ssl_key:
shared_connection: true
logs:
level_overrides:
warning:
- pika
system:
protected_keys:
remote:
Expand Down
126 changes: 102 additions & 24 deletions neon_messagebus/service/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,17 @@
from time import sleep
from os.path import expanduser, isfile
from threading import Thread, Event

from ovos_bus_client import MessageBusClient, Message
from ovos_utils.process_utils import StatusCallbackMap, ProcessStatus
from tornado import web, ioloop
from ovos_utils.log import LOG
from ovos_config.config import Configuration
from ovos_messagebus.event_handler import MessageBusEventHandler
from neon_messagebus.util.config import load_message_bus_config
from ovos_messagebus.load_config import load_message_bus_config

from neon_messagebus.util.mq_connector import start_mq_connector
from neon_messagebus.util.signal_utils import SignalManager


def on_ready():
Expand Down Expand Up @@ -64,38 +71,95 @@ def __init__(self, ready_hook=on_ready, error_hook=on_error,
stopping_hook=on_stopping, alive_hook=on_alive,
started_hook=on_started,
config=None, debug=False, daemonic=False):
alive_hook()
self._started = Event()
super().__init__()
self.config = config or load_message_bus_config()
callbacks = StatusCallbackMap(on_ready=ready_hook,
on_error=error_hook,
on_stopping=stopping_hook,
on_alive=alive_hook,
on_started=started_hook)
self.service_id = "bus"
self.status = ProcessStatus(self.service_id, callback_map=callbacks)
self.status.set_alive()

self.config = config or Configuration()
self.debug = debug
self.daemon = daemonic
self._stopping = Event()
self._running = Event()

self._started_hook = started_hook
self._ready_hook = ready_hook
self._stopping_hook = stopping_hook
self._error_hook = error_hook

self._bus = None
self._app = None
self._loop = None
self._loop_thread = None
self._signal_manager = None
self._mq_connector = None

@property
def started(self) -> Event:
return self._started
return self._running

def run(self):
self._started_hook()
self.status.set_started()
self._stopping.clear()

LOG.info('Starting message bus service...')
self._init_tornado()
self._listen()
self._loop_thread = Thread(target=ioloop.IOLoop.instance().start)
self._loop_thread.start()

self._bus = self._init_bus_client()
self._init_signal_manager()
self._init_mq_connector()

self.status.set_ready()
self._running.set()
LOG.info('Message bus service started!')
ioloop.IOLoop.instance().start()
# self._stopping.wait()
# _loop.stop()
# self._started.set()
self._stopping.wait()

def _init_bus_client(self) -> MessageBusClient:
config_dict = {k: v for k, v in self.config.get("websocket", {}).items()
if k in ("host", "port", "route", "ssl")}
config_dict['host'] = "0.0.0.0"
bus = MessageBusClient(**config_dict)
bus.run_in_thread()
bus.on('neon.languages.get', self._handle_get_languages)

return bus

def _handle_get_languages(self, message: Message):
"""
Handle a request to get languages supported by Neon Core.
@param message: neon.languages.get Message
"""
from neon_utils.language_utils import get_supported_languages
supported_langs = get_supported_languages()
self._bus.emit(message.response({"stt": list(supported_langs.stt),
"tts": list(supported_langs.tts),
"skills": list(supported_langs.skills)
}))

def _init_signal_manager(self):
self._signal_manager = SignalManager(self._bus)
LOG.info("Signal Manager started")

def _init_mq_connector(self):
if not self.config.get("MQ"):
LOG.info("No MQ Configuration")
return
try:
self._mq_connector = start_mq_connector(self.config)
if self._mq_connector:
LOG.info(f"MQ Connection Established to "
f"{self._mq_connector.config.get('server')}:"
f"{self._mq_connector.config.get('port')}")
else:
LOG.info("No MQ Credentials provided")
except ImportError as e:
LOG.warning(f"MQ Connector module not available: {e}")
except Exception as e:
LOG.error("Connector not started")
LOG.exception(e)

def _init_tornado(self):
# Disable all tornado logging so mycroft loglevel isn't overridden
Expand All @@ -105,13 +169,14 @@ def _init_tornado(self):
asyncio.set_event_loop(self._loop)

def _listen(self):
routes = [(self.config.route, MessageBusEventHandler)]
config = load_message_bus_config(**self.config.get('websocket', {}))
routes = [(config.route, MessageBusEventHandler)]
application = web.Application(routes, debug=self.debug)
ssl_options = None
LOG.info(f"Starting Messagebus server with config: {self.config}")
if self.config.ssl:
cert = expanduser(self.config.ssl_cert)
key = expanduser(self.config.ssl_key)
LOG.info(f"Starting Messagebus server with config: {config}")
if config.ssl:
cert = expanduser(config.ssl_cert)
key = expanduser(config.ssl_key)
if not isfile(key) or not isfile(cert):
LOG.error(
"ssl keys dont exist, falling back to unsecured socket")
Expand All @@ -121,23 +186,36 @@ def _listen(self):
ssl_options = {"certfile": cert, "keyfile": key}
if ssl_options:
LOG.info("wss listener started")
self._app = application.listen(self.config.port, self.config.host,
self._app = application.listen(config.port, config.host,
ssl_options=ssl_options)
else:
LOG.info("ws listener started")
self._app = application.listen(self.config.port, self.config.host)
self._app = application.listen(config.port, config.host)

def shutdown(self):
LOG.info("Messagebus Server shutting down.")
self._stopping_hook()
self.status.set_stopping()
self._app.stop()
loop = ioloop.IOLoop.instance()
loop.add_callback(loop.stop)
sleep(1)
loop.close()
self._loop.call_soon_threadsafe(self._loop.stop)
try:
self._loop.call_soon_threadsafe(self._loop.stop)
except RuntimeError as e:
LOG.debug(e)
while self._loop.is_running():
LOG.debug("Waiting for loop to stop...")
sleep(1)
self._loop.close()
self._loop_thread.join()

if self._mq_connector:
from pika.exceptions import StreamLostError
try:
self._mq_connector.stop()
except StreamLostError:
pass

self._stopping.set()
LOG.info("Messagebus service stopped")
41 changes: 6 additions & 35 deletions neon_messagebus/service/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,8 @@
from ovos_utils.process_utils import reset_sigint_handler, PIDLock as Lock
from neon_utils.log_utils import init_log
from neon_utils.process_utils import start_malloc, snapshot_malloc, print_malloc
from ovos_bus_client.client import MessageBusClient
from ovos_config.config import Configuration
from neon_messagebus.service import NeonBusService
from neon_messagebus.util.signal_utils import SignalManager
from neon_messagebus.util.mq_connector import start_mq_connector
from neon_messagebus.util.config import load_message_bus_config


def main(**kwargs):
Expand All @@ -47,47 +43,22 @@ def main(**kwargs):
config = Configuration()
debug = Configuration().get('debug', False)
malloc_running = start_malloc(config, stack_depth=4)
service = NeonBusService(debug=debug, daemonic=True, **kwargs)
service.start()
messagebus_config = load_message_bus_config()
config_dict = messagebus_config._asdict()
config_dict['host'] = "0.0.0.0"
if not service.started.wait(10):
LOG.warning("Timeout waiting for service start")
client = MessageBusClient(**config_dict)
SignalManager(client)
LOG.info("Signal Manager Initialized")

connector = None
try:
connector = start_mq_connector(config_dict)
if connector:
LOG.info("MQ Connection Established")
else:
LOG.info("No MQ Credentials provided")
except ImportError as e:
LOG.warning(f"MQ Connector module not available: {e}")
except Exception as e:
LOG.error("Connector not started")
LOG.exception(e)
kwargs.setdefault("debug", debug)
kwargs.setdefault("config", config)

service._ready_hook()
service = NeonBusService(daemonic=True, **kwargs)
service.start()
LOG.debug("Waiting for exit signal")
wait_for_exit_signal()

if malloc_running:
try:
print_malloc(snapshot_malloc())
except Exception as e:
LOG.error(e)
service.shutdown()

if connector:
from pika.exceptions import StreamLostError
try:
connector.stop()
except StreamLostError:
pass
lock.delete()
LOG.info("Messagebus service stopped")


def deprecated_entrypoint():
Expand Down
13 changes: 5 additions & 8 deletions neon_messagebus/util/mq_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,18 @@
from ovos_utils.log import LOG


def start_mq_connector(bus_config: dict):
def start_mq_connector(config: dict):
"""
Start the MQ Connector module to handle MQ API requests
@param config: Configuration object
"""
from neon_messagebus_mq_connector.controller import ChatAPIProxy
config = Configuration()
mq_config = config.get("MQ", {})
bus_config = bus_config or dict(config).get("websocket")
config = config or Configuration()

if "neon_chat_api" not in mq_config.get("users", {}):
if "neon_chat_api" not in config.get("MQ", {}).get("users", {}):
LOG.info("Skipping MQ Connector init")
return None
bus_config = bus_config
chat_connector = ChatAPIProxy(service_name="neon_chat_api",
config={"MQ": mq_config,
"MESSAGEBUS": bus_config})
config=config)
chat_connector.run(run_sync=False)
return chat_connector
Loading

0 comments on commit a04f51a

Please sign in to comment.