From ebadd0b9173f0bf15f97c889ff961882aadbbf62 Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Mon, 25 Nov 2024 16:30:31 -0800 Subject: [PATCH 01/10] Defines an SIO and MQ `configured_personas_changed` event that is emitted any time the server makes a change to the database Leaves existing logic untouched to allow clients to request an update from the server at any time Relates to https://github.com/NeonGeckoCom/neon-llm-core/issues/8 --- chat_server/blueprints/personas.py | 19 ++++++++++++-- services/klatchat_observer/controller.py | 33 +++++++++++++++++++++--- 2 files changed, 46 insertions(+), 6 deletions(-) diff --git a/chat_server/blueprints/personas.py b/chat_server/blueprints/personas.py index f299652e..c4a5ec2b 100644 --- a/chat_server/blueprints/personas.py +++ b/chat_server/blueprints/personas.py @@ -25,9 +25,12 @@ # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +import json + from fastapi import APIRouter from starlette.responses import JSONResponse +from chat_server.server_utils.api_dependencies import CurrentUserModel from chat_server.server_utils.enums import RequestModelType, UserRoles from chat_server.server_utils.http_exceptions import ( ItemNotFoundException, @@ -47,7 +50,7 @@ PersonaData, ) from chat_server.server_utils.api_dependencies.validators import permitted_access - +from chat_server.sio.server import sio from utils.database_utils.mongo_utils import MongoFilter, MongoLogicalOperators from utils.database_utils.mongo_utils.queries.wrapper import MongoDocumentsAPI @@ -61,7 +64,7 @@ async def list_personas( current_user: CurrentUserData, request_model: ListPersonasQueryModel = permitted_access(ListPersonasQueryModel), -): +) -> JSONResponse: """Lists personas matching query params""" filters = [] if request_model.llms: @@ -112,6 +115,7 @@ async def add_persona( if existing_model: raise DuplicatedItemException MongoDocumentsAPI.PERSONAS.add_item(data=request_model.model_dump()) + await _notify_personas_changed() return KlatAPIResponse.OK @@ -131,6 +135,7 @@ async def set_persona( MongoDocumentsAPI.PERSONAS.update_item( filters=mongo_filter, data=request_model.model_dump() ) + await _notify_personas_changed() return KlatAPIResponse.OK @@ -140,6 +145,7 @@ async def delete_persona( ): """Deletes persona""" MongoDocumentsAPI.PERSONAS.delete_item(item_id=request_model.persona_id) + await _notify_personas_changed() return KlatAPIResponse.OK @@ -157,4 +163,13 @@ async def toggle_persona_state( ) if updated_data.matched_count == 0: raise ItemNotFoundException + await _notify_personas_changed() return KlatAPIResponse.OK + + +async def _notify_personas_changed(): + response = await list_personas(CurrentUserModel(_id="", nickname="", + first_name="", last_name=""), + ListPersonasQueryModel(only_enabled=True)) + enabled_personas = json.loads(response.body.decode()) + sio.emit("configured_personas_changed", enabled_personas) diff --git a/services/klatchat_observer/controller.py b/services/klatchat_observer/controller.py index f2baf2e6..39687eeb 100644 --- a/services/klatchat_observer/controller.py +++ b/services/klatchat_observer/controller.py @@ -30,6 +30,8 @@ import time from queue import Queue +from typing import Optional + import cachetools.func from threading import Event, Timer @@ -323,7 +325,7 @@ def get_neon_service(self, wait_timeout: int = 10) -> None: LOG.info("Joining sync consumer") sync_consumer.join() if not self.neon_service_event.is_set(): - LOG.warning(f"Failed to get neon_service in {wait_timeout} seconds") + LOG.warning(f"Failed to get neon response in {wait_timeout} seconds") self.__neon_service_id = "" def register_sio_handlers(self): @@ -351,6 +353,8 @@ def register_sio_handlers(self): ) self._sio.on("prompts_data_updated", handler=self.forward_prompts_data_update) self._sio.on("auth_expired", handler=self._handle_auth_expired) + self._sio.on("configured_personas_changed", + handler=self._handle_personas_changed) def connect_sio(self): """ @@ -435,6 +439,7 @@ def get_neon_request_structure(msg_data: dict): if requested_skill == "tts": utterance = msg_data.pop("utterance", "") or msg_data.pop("text", "") request_dict = { + "msg_type": "neon.get_tts", "data": { "utterance": utterance, "text": utterance, @@ -443,12 +448,14 @@ def get_neon_request_structure(msg_data: dict): } elif requested_skill == "stt": request_dict = { + "msg_type": "neon.get_stt", "data": { "audio_data": msg_data.pop("audio_data", msg_data["message_body"]), } } else: request_dict = { + "msg_type": "recognizer_loop:utterance", "data": { "utterances": [msg_data["message_body"]], }, @@ -458,13 +465,17 @@ def get_neon_request_structure(msg_data: dict): return request_dict def _handle_neon_recipient(self, recipient_data: dict, msg_data: dict): + """ + Handle a chat message intended for Neon. + """ msg_data.setdefault("message_body", msg_data.pop("messageText", "")) msg_data.setdefault("message_id", msg_data.pop("messageID", "")) recipient_data.setdefault("context", {}) pattern = re.compile("Neon", re.IGNORECASE) msg_data["message_body"] = ( - pattern.sub("", msg_data["message_body"], 1).strip("<>@,.:|- ").capitalize() + pattern.sub("", msg_data["message_body"], 1).strip("<>@,.:|- \n") ) + # This is really referencing an MQ endpoint (i.e. stt, tts), not a skill msg_data.setdefault( "requested_skill", recipient_data["context"].pop("service", "recognizer") ) @@ -838,6 +849,7 @@ def on_subminds_state(self, body: dict): @create_mq_callback() def on_get_configured_personas(self, body: dict): + # Handles request to get all defined personas response_data = self._fetch_persona_api(user_id=body.get("user_id")) response_data["items"] = [ item @@ -880,7 +892,7 @@ def handle_get_prompts_data(self, body: dict): LOG.error(f"Failed to fetch prompts from {url}: {ex}") @cachetools.func.ttl_cache(ttl=15) - def _fetch_persona_api(self, user_id: str) -> dict: + def _fetch_persona_api(self, user_id: Optional[str]) -> dict: query_string = self._build_persona_api_query(user_id=user_id) url = f"{self.server_url}/personas/list?{query_string}" try: @@ -892,12 +904,25 @@ def _fetch_persona_api(self, user_id: str) -> dict: data = {"items": []} return data + def _handle_personas_changed(self, data: dict): + """ + SIO handler called when configured personas are modified. This emits an + MQ message to allow any connected listeners to maintain a set of known + personas. + """ + self.send_message( + request_data=data, + vhost=self.get_vhost("llm"), + queue="configured_personas_changed", + expiration=5000, + ) + def _refresh_default_persona_llms(self, data): for item in data["items"]: if default_llm := item.get("default_llm"): self.default_persona_llms[item["id"]] = item["id"] + "_" + default_llm - def _build_persona_api_query(self, user_id: str) -> str: + def _build_persona_api_query(self, user_id: Optional[str]) -> str: url_query_params = f"only_enabled=true" if user_id: url_query_params += f"&user_id={user_id}" From 4cb870027174f51870881a0949f0694604bb14ec Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Tue, 26 Nov 2024 13:22:57 -0800 Subject: [PATCH 02/10] Refactor persona change notifications to be scoped per-LLM to match existing implementation --- chat_server/blueprints/personas.py | 36 ++++++++++++++++++------ services/klatchat_observer/controller.py | 13 +++++---- 2 files changed, 35 insertions(+), 14 deletions(-) diff --git a/chat_server/blueprints/personas.py b/chat_server/blueprints/personas.py index c4a5ec2b..52f6bdfa 100644 --- a/chat_server/blueprints/personas.py +++ b/chat_server/blueprints/personas.py @@ -27,6 +27,7 @@ # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import json +from typing import List, Optional from fastapi import APIRouter from starlette.responses import JSONResponse @@ -115,7 +116,7 @@ async def add_persona( if existing_model: raise DuplicatedItemException MongoDocumentsAPI.PERSONAS.add_item(data=request_model.model_dump()) - await _notify_personas_changed() + await _notify_personas_changed(request_model.supported_llms) return KlatAPIResponse.OK @@ -135,7 +136,7 @@ async def set_persona( MongoDocumentsAPI.PERSONAS.update_item( filters=mongo_filter, data=request_model.model_dump() ) - await _notify_personas_changed() + await _notify_personas_changed(request_model.supported_llms) return KlatAPIResponse.OK @@ -167,9 +168,28 @@ async def toggle_persona_state( return KlatAPIResponse.OK -async def _notify_personas_changed(): - response = await list_personas(CurrentUserModel(_id="", nickname="", - first_name="", last_name=""), - ListPersonasQueryModel(only_enabled=True)) - enabled_personas = json.loads(response.body.decode()) - sio.emit("configured_personas_changed", enabled_personas) +async def _notify_personas_changed(supported_llms: Optional[List[str]] = None): + """ + Emit an SIO event for each LLM affected by a persona change. This sends a + complete set of personas rather than only the changed one to prevent sync + conflicts and simplify client-side logic. + :param supported_llms: List of LLM names affected by a transaction. If None, + then updates all LLMs listed in database configuration + """ + resp = await list_personas(CurrentUserModel(_id="", nickname="", + first_name="", last_name=""), + ListPersonasQueryModel(only_enabled=True)) + enabled_personas = json.loads(resp.body.decode()) + valid_personas = {} + if supported_llms: + # Only broadcast updates for LLMs affected by an insert/change request + for llm in supported_llms: + valid_personas[llm] = [per for per in enabled_personas["items"] if + llm in per["supported_llms"]] + else: + # Delete request does not have LLM context, update everything + for persona in enabled_personas["items"]: + for llm in persona["supported_llms"]: + valid_personas.setdefault(llm, []) + valid_personas[llm].append(persona) + sio.emit("configured_personas_changed", {"personas": valid_personas}) diff --git a/services/klatchat_observer/controller.py b/services/klatchat_observer/controller.py index 39687eeb..5e3ab81a 100644 --- a/services/klatchat_observer/controller.py +++ b/services/klatchat_observer/controller.py @@ -910,12 +910,13 @@ def _handle_personas_changed(self, data: dict): MQ message to allow any connected listeners to maintain a set of known personas. """ - self.send_message( - request_data=data, - vhost=self.get_vhost("llm"), - queue="configured_personas_changed", - expiration=5000, - ) + for llm, personas in data["personas"].items(): + self.send_message( + request_data={"items": personas}, + vhost=self.get_vhost("llm"), + queue=f"{llm}_personas_input", + expiration=5000, + ) def _refresh_default_persona_llms(self, data): for item in data["items"]: From 9d28b8b29f6bfda531fc548e53f954687344a521 Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Tue, 26 Nov 2024 13:51:03 -0800 Subject: [PATCH 03/10] Remove irrelevant changes to observer Change comment to docstring --- services/klatchat_observer/controller.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/services/klatchat_observer/controller.py b/services/klatchat_observer/controller.py index 5e3ab81a..e6381c3c 100644 --- a/services/klatchat_observer/controller.py +++ b/services/klatchat_observer/controller.py @@ -325,7 +325,7 @@ def get_neon_service(self, wait_timeout: int = 10) -> None: LOG.info("Joining sync consumer") sync_consumer.join() if not self.neon_service_event.is_set(): - LOG.warning(f"Failed to get neon response in {wait_timeout} seconds") + LOG.warning(f"Failed to get neon_service in {wait_timeout} seconds") self.__neon_service_id = "" def register_sio_handlers(self): @@ -439,7 +439,6 @@ def get_neon_request_structure(msg_data: dict): if requested_skill == "tts": utterance = msg_data.pop("utterance", "") or msg_data.pop("text", "") request_dict = { - "msg_type": "neon.get_tts", "data": { "utterance": utterance, "text": utterance, @@ -448,14 +447,12 @@ def get_neon_request_structure(msg_data: dict): } elif requested_skill == "stt": request_dict = { - "msg_type": "neon.get_stt", "data": { "audio_data": msg_data.pop("audio_data", msg_data["message_body"]), } } else: request_dict = { - "msg_type": "recognizer_loop:utterance", "data": { "utterances": [msg_data["message_body"]], }, @@ -473,9 +470,8 @@ def _handle_neon_recipient(self, recipient_data: dict, msg_data: dict): recipient_data.setdefault("context", {}) pattern = re.compile("Neon", re.IGNORECASE) msg_data["message_body"] = ( - pattern.sub("", msg_data["message_body"], 1).strip("<>@,.:|- \n") + pattern.sub("", msg_data["message_body"], 1).strip("<>@,.:|- ").capitalize() ) - # This is really referencing an MQ endpoint (i.e. stt, tts), not a skill msg_data.setdefault( "requested_skill", recipient_data["context"].pop("service", "recognizer") ) @@ -849,7 +845,9 @@ def on_subminds_state(self, body: dict): @create_mq_callback() def on_get_configured_personas(self, body: dict): - # Handles request to get all defined personas + """ + Handles requests to get all defined personas for a specific LLM service + """ response_data = self._fetch_persona_api(user_id=body.get("user_id")) response_data["items"] = [ item From 754e4c9c02d2adaed24a9dc307426145f3bc5ae6 Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Tue, 26 Nov 2024 15:12:05 -0800 Subject: [PATCH 04/10] Refactor `notify_personas_changed` into `server_utils` submodule per review --- chat_server/blueprints/personas.py | 39 +++----------- chat_server/server_utils/socketio_utils.py | 63 ++++++++++++++++++++++ 2 files changed, 69 insertions(+), 33 deletions(-) create mode 100644 chat_server/server_utils/socketio_utils.py diff --git a/chat_server/blueprints/personas.py b/chat_server/blueprints/personas.py index 52f6bdfa..94104f9f 100644 --- a/chat_server/blueprints/personas.py +++ b/chat_server/blueprints/personas.py @@ -25,13 +25,10 @@ # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -import json -from typing import List, Optional from fastapi import APIRouter from starlette.responses import JSONResponse -from chat_server.server_utils.api_dependencies import CurrentUserModel from chat_server.server_utils.enums import RequestModelType, UserRoles from chat_server.server_utils.http_exceptions import ( ItemNotFoundException, @@ -51,7 +48,7 @@ PersonaData, ) from chat_server.server_utils.api_dependencies.validators import permitted_access -from chat_server.sio.server import sio +from chat_server.server_utils.socketio_utils import notify_personas_changed from utils.database_utils.mongo_utils import MongoFilter, MongoLogicalOperators from utils.database_utils.mongo_utils.queries.wrapper import MongoDocumentsAPI @@ -116,7 +113,7 @@ async def add_persona( if existing_model: raise DuplicatedItemException MongoDocumentsAPI.PERSONAS.add_item(data=request_model.model_dump()) - await _notify_personas_changed(request_model.supported_llms) + await notify_personas_changed(request_model.supported_llms) return KlatAPIResponse.OK @@ -136,7 +133,7 @@ async def set_persona( MongoDocumentsAPI.PERSONAS.update_item( filters=mongo_filter, data=request_model.model_dump() ) - await _notify_personas_changed(request_model.supported_llms) + await notify_personas_changed(request_model.supported_llms) return KlatAPIResponse.OK @@ -146,7 +143,7 @@ async def delete_persona( ): """Deletes persona""" MongoDocumentsAPI.PERSONAS.delete_item(item_id=request_model.persona_id) - await _notify_personas_changed() + await notify_personas_changed() return KlatAPIResponse.OK @@ -164,32 +161,8 @@ async def toggle_persona_state( ) if updated_data.matched_count == 0: raise ItemNotFoundException - await _notify_personas_changed() + await notify_personas_changed() return KlatAPIResponse.OK -async def _notify_personas_changed(supported_llms: Optional[List[str]] = None): - """ - Emit an SIO event for each LLM affected by a persona change. This sends a - complete set of personas rather than only the changed one to prevent sync - conflicts and simplify client-side logic. - :param supported_llms: List of LLM names affected by a transaction. If None, - then updates all LLMs listed in database configuration - """ - resp = await list_personas(CurrentUserModel(_id="", nickname="", - first_name="", last_name=""), - ListPersonasQueryModel(only_enabled=True)) - enabled_personas = json.loads(resp.body.decode()) - valid_personas = {} - if supported_llms: - # Only broadcast updates for LLMs affected by an insert/change request - for llm in supported_llms: - valid_personas[llm] = [per for per in enabled_personas["items"] if - llm in per["supported_llms"]] - else: - # Delete request does not have LLM context, update everything - for persona in enabled_personas["items"]: - for llm in persona["supported_llms"]: - valid_personas.setdefault(llm, []) - valid_personas[llm].append(persona) - sio.emit("configured_personas_changed", {"personas": valid_personas}) + diff --git a/chat_server/server_utils/socketio_utils.py b/chat_server/server_utils/socketio_utils.py new file mode 100644 index 00000000..b2d31fa4 --- /dev/null +++ b/chat_server/server_utils/socketio_utils.py @@ -0,0 +1,63 @@ +# NEON AI (TM) SOFTWARE, Software Development Kit & Application Framework +# All trademark and other rights reserved by their respective owners +# Copyright 2008-2022 Neongecko.com Inc. +# Contributors: Daniel McKnight, Guy Daniels, Elon Gasper, Richard Leeds, +# Regina Bloomstine, Casimiro Ferreira, Andrii Pernatii, Kirill Hrymailo +# BSD-3 License +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from this +# software without specific prior written permission. +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, +# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import json + +from typing import Optional, List + +from chat_server.server_utils.api_dependencies import (CurrentUserModel, + ListPersonasQueryModel) +from chat_server.sio.server import sio + + +async def notify_personas_changed(supported_llms: Optional[List[str]] = None): + """ + Emit an SIO event for each LLM affected by a persona change. This sends a + complete set of personas rather than only the changed one to prevent sync + conflicts and simplify client-side logic. + :param supported_llms: List of LLM names affected by a transaction. If None, + then updates all LLMs listed in database configuration + """ + from chat_server.blueprints.personas import list_personas + resp = await list_personas(CurrentUserModel(_id="", nickname="", + first_name="", last_name=""), + ListPersonasQueryModel(only_enabled=True)) + enabled_personas = json.loads(resp.body.decode()) + valid_personas = {} + if supported_llms: + # Only broadcast updates for LLMs affected by an insert/change request + for llm in supported_llms: + valid_personas[llm] = [per for per in enabled_personas["items"] if + llm in per["supported_llms"]] + else: + # Delete request does not have LLM context, update everything + for persona in enabled_personas["items"]: + for llm in persona["supported_llms"]: + valid_personas.setdefault(llm, []) + valid_personas[llm].append(persona) + sio.emit("configured_personas_changed", {"personas": valid_personas}) From b3b56559d57e87582fc84fcac112551e59980513 Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Mon, 2 Dec 2024 17:30:19 -0800 Subject: [PATCH 05/10] Add locking around `configured_personas_changed` to ensure timestamps are in the same order of persona responses Includes `update_time` in TTL cached query so cached responses include accurate timestamps --- chat_server/server_utils/socketio_utils.py | 42 +++++++++++++--------- services/klatchat_observer/controller.py | 5 ++- 2 files changed, 29 insertions(+), 18 deletions(-) diff --git a/chat_server/server_utils/socketio_utils.py b/chat_server/server_utils/socketio_utils.py index b2d31fa4..2c4412ab 100644 --- a/chat_server/server_utils/socketio_utils.py +++ b/chat_server/server_utils/socketio_utils.py @@ -27,14 +27,19 @@ # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import json +from time import time from typing import Optional, List +from asyncio import Lock from chat_server.server_utils.api_dependencies import (CurrentUserModel, ListPersonasQueryModel) from chat_server.sio.server import sio +_LOCK = Lock() + + async def notify_personas_changed(supported_llms: Optional[List[str]] = None): """ Emit an SIO event for each LLM affected by a persona change. This sends a @@ -44,20 +49,23 @@ async def notify_personas_changed(supported_llms: Optional[List[str]] = None): then updates all LLMs listed in database configuration """ from chat_server.blueprints.personas import list_personas - resp = await list_personas(CurrentUserModel(_id="", nickname="", - first_name="", last_name=""), - ListPersonasQueryModel(only_enabled=True)) - enabled_personas = json.loads(resp.body.decode()) - valid_personas = {} - if supported_llms: - # Only broadcast updates for LLMs affected by an insert/change request - for llm in supported_llms: - valid_personas[llm] = [per for per in enabled_personas["items"] if - llm in per["supported_llms"]] - else: - # Delete request does not have LLM context, update everything - for persona in enabled_personas["items"]: - for llm in persona["supported_llms"]: - valid_personas.setdefault(llm, []) - valid_personas[llm].append(persona) - sio.emit("configured_personas_changed", {"personas": valid_personas}) + async with _LOCK: + resp = await list_personas(CurrentUserModel(_id="", nickname="", + first_name="", last_name=""), + ListPersonasQueryModel(only_enabled=True)) + update_time = time() + enabled_personas = json.loads(resp.body.decode()) + valid_personas = {} + if supported_llms: + # Only broadcast updates for LLMs affected by an insert/change request + for llm in supported_llms: + valid_personas[llm] = [per for per in enabled_personas["items"] if + llm in per["supported_llms"]] + else: + # Delete request does not have LLM context, update everything + for persona in enabled_personas["items"]: + for llm in persona["supported_llms"]: + valid_personas.setdefault(llm, []) + valid_personas[llm].append(persona) + sio.emit("configured_personas_changed", {"personas": valid_personas, + "update_time": update_time}) diff --git a/services/klatchat_observer/controller.py b/services/klatchat_observer/controller.py index e6381c3c..d87c40a4 100644 --- a/services/klatchat_observer/controller.py +++ b/services/klatchat_observer/controller.py @@ -896,6 +896,7 @@ def _fetch_persona_api(self, user_id: Optional[str]) -> dict: try: response = self._fetch_klat_server(url=url) data = response.json() + data['update_time'] = time.time() self._refresh_default_persona_llms(data=data) except KlatAPIAuthorizationError: LOG.error(f"Failed to fetch personas from {url = }") @@ -910,7 +911,9 @@ def _handle_personas_changed(self, data: dict): """ for llm, personas in data["personas"].items(): self.send_message( - request_data={"items": personas}, + request_data={ + "items": personas, + "update_time": data.get("update_time") or time.time()}, vhost=self.get_vhost("llm"), queue=f"{llm}_personas_input", expiration=5000, From f4c3d635dc84fffd63d03059b46f14efe1c73756 Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Thu, 5 Dec 2024 12:13:54 -0800 Subject: [PATCH 06/10] Remove unnecessary newline changes --- chat_server/blueprints/personas.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/chat_server/blueprints/personas.py b/chat_server/blueprints/personas.py index 94104f9f..2497459c 100644 --- a/chat_server/blueprints/personas.py +++ b/chat_server/blueprints/personas.py @@ -163,6 +163,3 @@ async def toggle_persona_state( raise ItemNotFoundException await notify_personas_changed() return KlatAPIResponse.OK - - - From 29eb7a227637f996a1ee400a049585ff51ef3b83 Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Mon, 6 Jan 2025 14:06:45 -0800 Subject: [PATCH 07/10] Note limitation of `notify_personas_changed` implementation to address feedback --- chat_server/blueprints/personas.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/chat_server/blueprints/personas.py b/chat_server/blueprints/personas.py index 2497459c..e004bbc9 100644 --- a/chat_server/blueprints/personas.py +++ b/chat_server/blueprints/personas.py @@ -113,6 +113,7 @@ async def add_persona( if existing_model: raise DuplicatedItemException MongoDocumentsAPI.PERSONAS.add_item(data=request_model.model_dump()) + # TODO: This will not scale to multiple server instances await notify_personas_changed(request_model.supported_llms) return KlatAPIResponse.OK @@ -133,6 +134,7 @@ async def set_persona( MongoDocumentsAPI.PERSONAS.update_item( filters=mongo_filter, data=request_model.model_dump() ) + # TODO: This will not scale to multiple server instances await notify_personas_changed(request_model.supported_llms) return KlatAPIResponse.OK @@ -143,6 +145,7 @@ async def delete_persona( ): """Deletes persona""" MongoDocumentsAPI.PERSONAS.delete_item(item_id=request_model.persona_id) + # TODO: This will not scale to multiple server instances await notify_personas_changed() return KlatAPIResponse.OK @@ -161,5 +164,6 @@ async def toggle_persona_state( ) if updated_data.matched_count == 0: raise ItemNotFoundException + # TODO: This will not scale to multiple server instances await notify_personas_changed() return KlatAPIResponse.OK From 8311f68133c88927569394f83032e3c474df4e8d Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Mon, 6 Jan 2025 14:20:29 -0800 Subject: [PATCH 08/10] Refactor `socketio_utils` to `persona_utils` Move `list_personas` logic to a util method Support no current user in `list_personas` to retrieve all personas with no user filtering --- chat_server/blueprints/personas.py | 30 ++------------ .../{socketio_utils.py => persona_utils.py} | 41 ++++++++++++++++--- 2 files changed, 38 insertions(+), 33 deletions(-) rename chat_server/server_utils/{socketio_utils.py => persona_utils.py} (69%) diff --git a/chat_server/blueprints/personas.py b/chat_server/blueprints/personas.py index e004bbc9..af85fbdb 100644 --- a/chat_server/blueprints/personas.py +++ b/chat_server/blueprints/personas.py @@ -48,7 +48,7 @@ PersonaData, ) from chat_server.server_utils.api_dependencies.validators import permitted_access -from chat_server.server_utils.socketio_utils import notify_personas_changed +from chat_server.server_utils.persona_utils import notify_personas_changed, list_personas as _list_personas from utils.database_utils.mongo_utils import MongoFilter, MongoLogicalOperators from utils.database_utils.mongo_utils.queries.wrapper import MongoDocumentsAPI @@ -61,34 +61,10 @@ @router.get("/list") async def list_personas( current_user: CurrentUserData, - request_model: ListPersonasQueryModel = permitted_access(ListPersonasQueryModel), + request_model: ListPersonasQueryModel = permitted_access(ListPersonasQueryModel) ) -> JSONResponse: """Lists personas matching query params""" - filters = [] - if request_model.llms: - filters.append( - MongoFilter( - key="supported_llms", - value=request_model.llms, - logical_operator=MongoLogicalOperators.ALL, - ) - ) - if request_model.user_id and request_model.user_id != "*": - filters.append(MongoFilter(key="user_id", value=request_model.user_id)) - else: - user_filter = [{"user_id": None}, {"user_id": current_user.user_id}] - filters.append( - MongoFilter(value=user_filter, logical_operator=MongoLogicalOperators.OR) - ) - if request_model.only_enabled: - filters.append(MongoFilter(key="enabled", value=True)) - items = MongoDocumentsAPI.PERSONAS.list_items( - filters=filters, result_as_cursor=False - ) - for item in items: - item["id"] = item.pop("_id") - item["enabled"] = item.get("enabled", False) - return JSONResponse(content={"items": items}) + return await _list_personas(current_user, request_model) @router.get("/get/{persona_id}") diff --git a/chat_server/server_utils/socketio_utils.py b/chat_server/server_utils/persona_utils.py similarity index 69% rename from chat_server/server_utils/socketio_utils.py rename to chat_server/server_utils/persona_utils.py index 2c4412ab..44475402 100644 --- a/chat_server/server_utils/socketio_utils.py +++ b/chat_server/server_utils/persona_utils.py @@ -32,10 +32,12 @@ from typing import Optional, List from asyncio import Lock -from chat_server.server_utils.api_dependencies import (CurrentUserModel, - ListPersonasQueryModel) -from chat_server.sio.server import sio +from starlette.responses import JSONResponse +from chat_server.server_utils.api_dependencies import ListPersonasQueryModel, CurrentUserData +from chat_server.sio.server import sio +from utils.database_utils.mongo_utils import MongoFilter, MongoLogicalOperators +from utils.database_utils.mongo_utils.queries.wrapper import MongoDocumentsAPI _LOCK = Lock() @@ -48,10 +50,8 @@ async def notify_personas_changed(supported_llms: Optional[List[str]] = None): :param supported_llms: List of LLM names affected by a transaction. If None, then updates all LLMs listed in database configuration """ - from chat_server.blueprints.personas import list_personas async with _LOCK: - resp = await list_personas(CurrentUserModel(_id="", nickname="", - first_name="", last_name=""), + resp = await list_personas(None, ListPersonasQueryModel(only_enabled=True)) update_time = time() enabled_personas = json.loads(resp.body.decode()) @@ -69,3 +69,32 @@ async def notify_personas_changed(supported_llms: Optional[List[str]] = None): valid_personas[llm].append(persona) sio.emit("configured_personas_changed", {"personas": valid_personas, "update_time": update_time}) + + +async def list_personas(current_user: CurrentUserData, + request_model: ListPersonasQueryModel) -> JSONResponse: + filters = [] + if request_model.llms: + filters.append( + MongoFilter( + key="supported_llms", + value=request_model.llms, + logical_operator=MongoLogicalOperators.ALL, + ) + ) + if request_model.user_id and request_model.user_id != "*": + filters.append(MongoFilter(key="user_id", value=request_model.user_id)) + elif current_user: + user_filter = [{"user_id": None}, {"user_id": current_user.user_id}] + filters.append( + MongoFilter(value=user_filter, logical_operator=MongoLogicalOperators.OR) + ) + if request_model.only_enabled: + filters.append(MongoFilter(key="enabled", value=True)) + items = MongoDocumentsAPI.PERSONAS.list_items( + filters=filters, result_as_cursor=False + ) + for item in items: + item["id"] = item.pop("_id") + item["enabled"] = item.get("enabled", False) + return JSONResponse(content={"items": items}) \ No newline at end of file From 8960e39c8e5839438c473116ef7f90eebd29749a Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Mon, 6 Jan 2025 14:21:03 -0800 Subject: [PATCH 09/10] Round `time` to an `int` per review --- chat_server/server_utils/persona_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chat_server/server_utils/persona_utils.py b/chat_server/server_utils/persona_utils.py index 44475402..639130c7 100644 --- a/chat_server/server_utils/persona_utils.py +++ b/chat_server/server_utils/persona_utils.py @@ -53,7 +53,7 @@ async def notify_personas_changed(supported_llms: Optional[List[str]] = None): async with _LOCK: resp = await list_personas(None, ListPersonasQueryModel(only_enabled=True)) - update_time = time() + update_time = round(time()) enabled_personas = json.loads(resp.body.decode()) valid_personas = {} if supported_llms: From f418abedacf96e8ce10ea601a9bfc8ba5adb732b Mon Sep 17 00:00:00 2001 From: NeonKirill Date: Mon, 27 Jan 2025 17:13:02 +0100 Subject: [PATCH 10/10] Refactored event-driven persona updates to work per single persona update --- chat_server/blueprints/personas.py | 35 ++++-- chat_server/server_utils/persona_utils.py | 100 ------------------ chat_server/sio/handlers/user_message.py | 1 + services/klatchat_observer/controller.py | 47 ++++++-- .../mongo_utils/queries/dao/abc.py | 13 ++- 5 files changed, 73 insertions(+), 123 deletions(-) delete mode 100644 chat_server/server_utils/persona_utils.py diff --git a/chat_server/blueprints/personas.py b/chat_server/blueprints/personas.py index af85fbdb..d5cf13f8 100644 --- a/chat_server/blueprints/personas.py +++ b/chat_server/blueprints/personas.py @@ -48,7 +48,6 @@ PersonaData, ) from chat_server.server_utils.api_dependencies.validators import permitted_access -from chat_server.server_utils.persona_utils import notify_personas_changed, list_personas as _list_personas from utils.database_utils.mongo_utils import MongoFilter, MongoLogicalOperators from utils.database_utils.mongo_utils.queries.wrapper import MongoDocumentsAPI @@ -64,7 +63,31 @@ async def list_personas( request_model: ListPersonasQueryModel = permitted_access(ListPersonasQueryModel) ) -> JSONResponse: """Lists personas matching query params""" - return await _list_personas(current_user, request_model) + filters = [] + if request_model.llms: + filters.append( + MongoFilter( + key="supported_llms", + value=request_model.llms, + logical_operator=MongoLogicalOperators.ALL, + ) + ) + if request_model.user_id and request_model.user_id != "*": + filters.append(MongoFilter(key="user_id", value=request_model.user_id)) + elif current_user: + user_filter = [{"user_id": None}, {"user_id": current_user.user_id}] + filters.append( + MongoFilter(value=user_filter, logical_operator=MongoLogicalOperators.OR) + ) + if request_model.only_enabled: + filters.append(MongoFilter(key="enabled", value=True)) + items = MongoDocumentsAPI.PERSONAS.list_items( + filters=filters, result_as_cursor=False + ) + for item in items: + item["id"] = item.pop("_id") + item["enabled"] = item.get("enabled", False) + return JSONResponse(content={"items": items}) @router.get("/get/{persona_id}") @@ -89,8 +112,6 @@ async def add_persona( if existing_model: raise DuplicatedItemException MongoDocumentsAPI.PERSONAS.add_item(data=request_model.model_dump()) - # TODO: This will not scale to multiple server instances - await notify_personas_changed(request_model.supported_llms) return KlatAPIResponse.OK @@ -110,8 +131,6 @@ async def set_persona( MongoDocumentsAPI.PERSONAS.update_item( filters=mongo_filter, data=request_model.model_dump() ) - # TODO: This will not scale to multiple server instances - await notify_personas_changed(request_model.supported_llms) return KlatAPIResponse.OK @@ -121,8 +140,6 @@ async def delete_persona( ): """Deletes persona""" MongoDocumentsAPI.PERSONAS.delete_item(item_id=request_model.persona_id) - # TODO: This will not scale to multiple server instances - await notify_personas_changed() return KlatAPIResponse.OK @@ -140,6 +157,4 @@ async def toggle_persona_state( ) if updated_data.matched_count == 0: raise ItemNotFoundException - # TODO: This will not scale to multiple server instances - await notify_personas_changed() return KlatAPIResponse.OK diff --git a/chat_server/server_utils/persona_utils.py b/chat_server/server_utils/persona_utils.py deleted file mode 100644 index 639130c7..00000000 --- a/chat_server/server_utils/persona_utils.py +++ /dev/null @@ -1,100 +0,0 @@ -# NEON AI (TM) SOFTWARE, Software Development Kit & Application Framework -# All trademark and other rights reserved by their respective owners -# Copyright 2008-2022 Neongecko.com Inc. -# Contributors: Daniel McKnight, Guy Daniels, Elon Gasper, Richard Leeds, -# Regina Bloomstine, Casimiro Ferreira, Andrii Pernatii, Kirill Hrymailo -# BSD-3 License -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# 1. Redistributions of source code must retain the above copyright notice, -# this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# 3. Neither the name of the copyright holder nor the names of its -# contributors may be used to endorse or promote products derived from this -# software without specific prior written permission. -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, -# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR -# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, -# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, -# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING -# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import json -from time import time - -from typing import Optional, List -from asyncio import Lock - -from starlette.responses import JSONResponse - -from chat_server.server_utils.api_dependencies import ListPersonasQueryModel, CurrentUserData -from chat_server.sio.server import sio -from utils.database_utils.mongo_utils import MongoFilter, MongoLogicalOperators -from utils.database_utils.mongo_utils.queries.wrapper import MongoDocumentsAPI - -_LOCK = Lock() - - -async def notify_personas_changed(supported_llms: Optional[List[str]] = None): - """ - Emit an SIO event for each LLM affected by a persona change. This sends a - complete set of personas rather than only the changed one to prevent sync - conflicts and simplify client-side logic. - :param supported_llms: List of LLM names affected by a transaction. If None, - then updates all LLMs listed in database configuration - """ - async with _LOCK: - resp = await list_personas(None, - ListPersonasQueryModel(only_enabled=True)) - update_time = round(time()) - enabled_personas = json.loads(resp.body.decode()) - valid_personas = {} - if supported_llms: - # Only broadcast updates for LLMs affected by an insert/change request - for llm in supported_llms: - valid_personas[llm] = [per for per in enabled_personas["items"] if - llm in per["supported_llms"]] - else: - # Delete request does not have LLM context, update everything - for persona in enabled_personas["items"]: - for llm in persona["supported_llms"]: - valid_personas.setdefault(llm, []) - valid_personas[llm].append(persona) - sio.emit("configured_personas_changed", {"personas": valid_personas, - "update_time": update_time}) - - -async def list_personas(current_user: CurrentUserData, - request_model: ListPersonasQueryModel) -> JSONResponse: - filters = [] - if request_model.llms: - filters.append( - MongoFilter( - key="supported_llms", - value=request_model.llms, - logical_operator=MongoLogicalOperators.ALL, - ) - ) - if request_model.user_id and request_model.user_id != "*": - filters.append(MongoFilter(key="user_id", value=request_model.user_id)) - elif current_user: - user_filter = [{"user_id": None}, {"user_id": current_user.user_id}] - filters.append( - MongoFilter(value=user_filter, logical_operator=MongoLogicalOperators.OR) - ) - if request_model.only_enabled: - filters.append(MongoFilter(key="enabled", value=True)) - items = MongoDocumentsAPI.PERSONAS.list_items( - filters=filters, result_as_cursor=False - ) - for item in items: - item["id"] = item.pop("_id") - item["enabled"] = item.get("enabled", False) - return JSONResponse(content={"items": items}) \ No newline at end of file diff --git a/chat_server/sio/handlers/user_message.py b/chat_server/sio/handlers/user_message.py index 609732ff..21bf6619 100644 --- a/chat_server/sio/handlers/user_message.py +++ b/chat_server/sio/handlers/user_message.py @@ -184,6 +184,7 @@ async def broadcast(sid, data): msg_type = data.pop("msg_type", None) msg_receivers = data.pop("to", None) if msg_type: + LOG.info(f"received broadcast message - {msg_type}") await sio.emit( msg_type, data=data, diff --git a/services/klatchat_observer/controller.py b/services/klatchat_observer/controller.py index d87c40a4..8cb1af54 100644 --- a/services/klatchat_observer/controller.py +++ b/services/klatchat_observer/controller.py @@ -353,8 +353,11 @@ def register_sio_handlers(self): ) self._sio.on("prompts_data_updated", handler=self.forward_prompts_data_update) self._sio.on("auth_expired", handler=self._handle_auth_expired) - self._sio.on("configured_personas_changed", - handler=self._handle_personas_changed) + # Persona update events + self._sio.on("persona_updated", + handler=self._handle_persona_updated) + self._sio.on("persona_deleted", + handler=self._handle_persona_deleted) def connect_sio(self): """ @@ -903,19 +906,47 @@ def _fetch_persona_api(self, user_id: Optional[str]) -> dict: data = {"items": []} return data - def _handle_personas_changed(self, data: dict): + def _handle_persona_updated(self, data: dict): """ SIO handler called when configured personas are modified. This emits an MQ message to allow any connected listeners to maintain a set of known personas. + + If list of supported llms excludes personas from the previous configuration - emits "delete persona" event to those LLMs + """ + old_state = data.get('old_state', {}) + new_state = data.get('new_state', {}) + + excluded_supported_llms = set(old_state['supported_llms']) - set(new_state['supported_llms']) + + if excluded_supported_llms: + self._handle_persona_deleted(data={ + "persona_name": old_state['persona_name'], + "user_id": old_state['user_id'], + "supported_llms": excluded_supported_llms, + }) + + for llm in new_state['supported_llms']: + self.send_message( + request_data=new_state, + vhost=self.get_vhost("llm"), + exchange=f"{llm}_persona_updated", + exchange_type=ExchangeType.fanout, + expiration=5000, + ) + + def _handle_persona_deleted(self, data: dict): + """ + SIO handler called when configured personas are deleted. This emits an + MQ message to allow any connected listeners to maintain a set of known + personas. """ - for llm, personas in data["personas"].items(): + for llm in data.pop('supported_llms', []): self.send_message( - request_data={ - "items": personas, - "update_time": data.get("update_time") or time.time()}, + request_data=data, vhost=self.get_vhost("llm"), - queue=f"{llm}_personas_input", + exchange=f"{llm}_persona_deleted", + exchange_type=ExchangeType.fanout, expiration=5000, ) diff --git a/utils/database_utils/mongo_utils/queries/dao/abc.py b/utils/database_utils/mongo_utils/queries/dao/abc.py index b542ad7d..4129a0bc 100644 --- a/utils/database_utils/mongo_utils/queries/dao/abc.py +++ b/utils/database_utils/mongo_utils/queries/dao/abc.py @@ -29,6 +29,8 @@ from abc import ABC, abstractmethod import pymongo +import pymongo.results + from neon_sftp import NeonSFTPConnector from utils.database_utils import DatabaseController @@ -149,7 +151,8 @@ def _build_list_items_filter( mongo_filters.append(contains_filter) return mongo_filters - def _build_contains_filter(self, key, lookup_set) -> MongoFilter | None: + @staticmethod + def _build_contains_filter(key, lookup_set) -> MongoFilter | None: mongo_filter = None if key and lookup_set: lookup_set = list(set(lookup_set)) @@ -160,13 +163,13 @@ def _build_contains_filter(self, key, lookup_set) -> MongoFilter | None: ) return mongo_filter - def add_item(self, data: dict) -> bool: + def add_item(self, data: dict) -> pymongo.results.InsertOneResult: """Inserts provided data into the object's document""" return self._execute_query(command=MongoCommands.INSERT_ONE, data=data) def update_item( self, filters: list[dict | MongoFilter], data: dict, data_action: str = "set" - ) -> bool: + ) -> pymongo.results.UpdateResult: """Updates provided data into the object's document""" return self._execute_query( command=MongoCommands.UPDATE_ONE, @@ -177,7 +180,7 @@ def update_item( def update_items( self, filters: list[dict | MongoFilter], data: dict, data_action: str = "set" - ) -> bool: + ) -> pymongo.results.UpdateResult: """Updates provided data into the object's documents""" return self._execute_query( command=MongoCommands.UPDATE_MANY, @@ -196,7 +199,7 @@ def get_item( def delete_item( self, item_id: str = None, filters: list[dict | MongoFilter] = None - ) -> None: + ) -> pymongo.results.DeleteResult: filters = self._build_item_selection_filters(item_id=item_id, filters=filters) if not filters: raise