diff --git a/ovos_core/intent_services/ocp_service.py b/ovos_core/intent_services/ocp_service.py index a2985f31f1f5..16671213b248 100644 --- a/ovos_core/intent_services/ocp_service.py +++ b/ovos_core/intent_services/ocp_service.py @@ -564,7 +564,7 @@ def handle_play_intent(self, message: Message): skills=skills, message=message) # tell OCP to play - self.bus.emit(Message('ovos.common_play.reset')) + self.bus.emit(message.forward('ovos.common_play.reset')) if not results: self.speak_dialog("cant.play", data={"phrase": query, @@ -580,10 +580,7 @@ def handle_play_intent(self, message: Message): LOG.debug(f"OCP Best match: {best}") results = [r for r in results if r.as_dict != best.as_dict] results.insert(0, best) - self.bus.emit(Message('add_context', - {'context': "Playing", - 'word': "", - 'origin': OCP_ID})) + self.set_context("Playing", origin=OCP_ID) # ovos-PHAL-plugin-mk1 will display music icon in response to play message player = self.get_player(message) @@ -896,7 +893,7 @@ def filter_results(self, results: list, phrase: str, lang: str, def _search(self, phrase: str, media_type: MediaType, lang: str, skills: Optional[List[str]] = None, message: Optional[Message] = None) -> list: - self.bus.emit(Message("ovos.common_play.search.start")) + self.bus.emit(message.reply("ovos.common_play.search.start")) self.enclosure.mouth_think() # animate mk1 mouth during search # Now we place a query on the messsagebus for anyone who wants to @@ -904,7 +901,8 @@ def _search(self, phrase: str, media_type: MediaType, lang: str, results = [] for r in self._execute_query(phrase, media_type=media_type, - skills=skills): + skills=skills, + message=message): results += r["results"] results = self.normalize_results(results) @@ -917,18 +915,19 @@ def _search(self, phrase: str, media_type: MediaType, lang: str, else: # no filtering if skill explicitly requested LOG.debug(f"Got {len(results)} usable results from {skills}") - self.bus.emit(Message("ovos.common_play.search.end")) + self.bus.emit(message.reply("ovos.common_play.search.end")) return results def _execute_query(self, phrase: str, media_type: MediaType = Union[int, MediaType], - skills: Optional[List[str]] = None) -> list: + skills: Optional[List[str]] = None, + message: Optional[Message] = None) -> list: """ actually send the search to OCP skills""" media_type = self._normalize_media_enum(media_type) with self.search_lock: # stop any search still happening - self.bus.emit(Message("ovos.common_play.search.stop")) + self.bus.emit(message.reply("ovos.common_play.search.stop")) query = OCPQuery(query=phrase, media_type=media_type, config=self.config, bus=self.bus) @@ -940,7 +939,7 @@ def _execute_query(self, phrase: str, LOG.debug(f"{skill_id} can't handle {media_type} queries") continue LOG.debug(f"Searching OCP Skill: {skill_id}") - query.send(skill_id) + query.send(skill_id, source_message=message) query.wait() results += query.results diff --git a/test/end2end/routing/test_session.py b/test/end2end/routing/test_session.py index 989892792d97..65d98e7c2aaf 100644 --- a/test/end2end/routing/test_session.py +++ b/test/end2end/routing/test_session.py @@ -1,6 +1,7 @@ import time from time import sleep from unittest import TestCase +from ovos_core.intent_services.ocp_service import PlayerState, MediaState, OCPPlayerProxy from ovos_bus_client.message import Message from ovos_bus_client.session import SessionManager, Session @@ -81,3 +82,104 @@ def wait_for_n_messages(n): else: self.assertEqual(m.context["source"], "B") self.assertEqual(m.context["destination"], "A") + + +class TestOCPRouting(TestCase): + + def setUp(self): + self.skill_id = "skill-fake-fm.openvoiceos" + self.core = get_minicroft(self.skill_id) + + def tearDown(self) -> None: + self.core.stop() + + def test_no_session(self): + self.assertIsNotNone(self.core.intent_service.ocp) + messages = [] + + def new_msg(msg): + nonlocal messages + m = Message.deserialize(msg) + if m.msg_type in ["gui.status.request", + "ovos.common_play.status", + "ovos.skills.settings_changed"]: + return # skip these + messages.append(m) + print(len(messages), m.msg_type, m.context.get("source"), m.context.get("destination")) + + def wait_for_n_messages(n): + nonlocal messages + t = time.time() + while len(messages) < n: + sleep(0.1) + if time.time() - t > 10: + raise RuntimeError("did not get the number of expected messages under 10 seconds") + + self.core.bus.on("message", new_msg) + + sess = Session("test-session", + pipeline=[ + "converse", + "ocp_high" + ]) + self.core.intent_service.ocp.ocp_sessions[sess.session_id] = OCPPlayerProxy( + session_id=sess.session_id, available_extractors=[], ocp_available=True, + player_state=PlayerState.STOPPED, media_state=MediaState.NO_MEDIA) + utt = Message("recognizer_loop:utterance", + {"utterances": ["play some radio station"]}, + {"session": sess.serialize(), # explicit + "source": "A", "destination": "B"}) + self.core.bus.emit(utt) + + # confirm all expected messages are sent + expected_messages = [ + "recognizer_loop:utterance", + "intent.service.skills.activated", + "ovos.common_play.activate", + "ocp:play", + "enclosure.active_skill", + "speak", + "ovos.common_play.search.start", + "enclosure.mouth.think", + "ovos.common_play.search.stop", # any ongoing previous search + "ovos.common_play.query", # media type radio + # skill searching (radio) + "ovos.common_play.skill.search_start", + "ovos.common_play.query.response", + "ovos.common_play.query.response", + "ovos.common_play.query.response", + "ovos.common_play.query.response", + "ovos.common_play.query.response", + "ovos.common_play.skill.search_end", + "ovos.common_play.search.end", + # good results because of radio media type + "ovos.common_play.reset", + "add_context", # NowPlaying context + "ovos.common_play.play", # OCP api, + "ovos.utterance.handled" # handle_utterance returned (intent service) + ] + wait_for_n_messages(len(expected_messages)) + + self.assertEqual(len(expected_messages), len(messages)) + + for idx, m in enumerate(messages): + self.assertEqual(m.msg_type, expected_messages[idx]) + + # verify that source and destination are swapped after utterance + for m in messages: + if m.msg_type in ["recognizer_loop:utterance"]: + self.assertEqual(m.context["source"], "A") + self.assertEqual(m.context["destination"], "B") + elif m.msg_type in ["ovos.common_play.play", + "ovos.common_play.reset", + "ovos.common_play.query"]: + # OCP messages that should make it to the client + self.assertEqual(m.context["source"], "B") + self.assertEqual(m.context["destination"], "A") + elif m.msg_type.startswith("ovos.common_play"): + # internal search messages, should not leak to external clients + self.assertEqual(messages[0].context["source"], "A") + self.assertEqual(messages[0].context["destination"], "B") + else: + self.assertEqual(m.context["source"], "B") + self.assertEqual(m.context["destination"], "A")