Skip to content

Commit

Permalink
fix/ocp_missing_context
Browse files Browse the repository at this point in the history
the OCP query messages were dropping message source and destination
  • Loading branch information
JarbasAl committed Jul 12, 2024
1 parent 6a6ebbb commit 22c2cf0
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 11 deletions.
21 changes: 10 additions & 11 deletions ovos_core/intent_services/ocp_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ def handle_play_intent(self, message: Message):
skills=skills, message=message)

# tell OCP to play
self.bus.emit(Message('ovos.common_play.reset'))
self.bus.emit(message.forward('ovos.common_play.reset'))
if not results:
self.speak_dialog("cant.play",
data={"phrase": query,
Expand All @@ -580,10 +580,7 @@ def handle_play_intent(self, message: Message):
LOG.debug(f"OCP Best match: {best}")
results = [r for r in results if r.as_dict != best.as_dict]
results.insert(0, best)
self.bus.emit(Message('add_context',
{'context': "Playing",
'word': "",
'origin': OCP_ID}))
self.set_context("Playing", origin=OCP_ID)

# ovos-PHAL-plugin-mk1 will display music icon in response to play message
player = self.get_player(message)
Expand Down Expand Up @@ -896,15 +893,16 @@ def filter_results(self, results: list, phrase: str, lang: str,
def _search(self, phrase: str, media_type: MediaType, lang: str,
skills: Optional[List[str]] = None,
message: Optional[Message] = None) -> list:
self.bus.emit(Message("ovos.common_play.search.start"))
self.bus.emit(message.reply("ovos.common_play.search.start"))
self.enclosure.mouth_think() # animate mk1 mouth during search

# Now we place a query on the messsagebus for anyone who wants to
# attempt to service a 'play.request' message.
results = []
for r in self._execute_query(phrase,
media_type=media_type,
skills=skills):
skills=skills,
message=message):
results += r["results"]

results = self.normalize_results(results)
Expand All @@ -917,18 +915,19 @@ def _search(self, phrase: str, media_type: MediaType, lang: str,
else: # no filtering if skill explicitly requested
LOG.debug(f"Got {len(results)} usable results from {skills}")

self.bus.emit(Message("ovos.common_play.search.end"))
self.bus.emit(message.reply("ovos.common_play.search.end"))
return results

def _execute_query(self, phrase: str,
media_type: MediaType = Union[int, MediaType],
skills: Optional[List[str]] = None) -> list:
skills: Optional[List[str]] = None,
message: Optional[Message] = None) -> list:
""" actually send the search to OCP skills"""
media_type = self._normalize_media_enum(media_type)

with self.search_lock:
# stop any search still happening
self.bus.emit(Message("ovos.common_play.search.stop"))
self.bus.emit(message.reply("ovos.common_play.search.stop"))

query = OCPQuery(query=phrase, media_type=media_type,
config=self.config, bus=self.bus)
Expand All @@ -940,7 +939,7 @@ def _execute_query(self, phrase: str,
LOG.debug(f"{skill_id} can't handle {media_type} queries")
continue
LOG.debug(f"Searching OCP Skill: {skill_id}")
query.send(skill_id)
query.send(skill_id, source_message=message)
query.wait()
results += query.results

Expand Down
102 changes: 102 additions & 0 deletions test/end2end/routing/test_session.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import time
from time import sleep
from unittest import TestCase
from ovos_core.intent_services.ocp_service import PlayerState, MediaState, OCPPlayerProxy

from ovos_bus_client.message import Message
from ovos_bus_client.session import SessionManager, Session
Expand Down Expand Up @@ -81,3 +82,104 @@ def wait_for_n_messages(n):
else:
self.assertEqual(m.context["source"], "B")
self.assertEqual(m.context["destination"], "A")


class TestOCPRouting(TestCase):

def setUp(self):
self.skill_id = "skill-fake-fm.openvoiceos"
self.core = get_minicroft(self.skill_id)

def tearDown(self) -> None:
self.core.stop()

def test_no_session(self):
self.assertIsNotNone(self.core.intent_service.ocp)
messages = []

def new_msg(msg):
nonlocal messages
m = Message.deserialize(msg)
if m.msg_type in ["gui.status.request",
"ovos.common_play.status",
"ovos.skills.settings_changed"]:
return # skip these
messages.append(m)
print(len(messages), m.msg_type, m.context.get("source"), m.context.get("destination"))

def wait_for_n_messages(n):
nonlocal messages
t = time.time()
while len(messages) < n:
sleep(0.1)
if time.time() - t > 10:
raise RuntimeError("did not get the number of expected messages under 10 seconds")

self.core.bus.on("message", new_msg)

sess = Session("test-session",
pipeline=[
"converse",
"ocp_high"
])
self.core.intent_service.ocp.ocp_sessions[sess.session_id] = OCPPlayerProxy(
session_id=sess.session_id, available_extractors=[], ocp_available=True,
player_state=PlayerState.STOPPED, media_state=MediaState.NO_MEDIA)
utt = Message("recognizer_loop:utterance",
{"utterances": ["play some radio station"]},
{"session": sess.serialize(), # explicit
"source": "A", "destination": "B"})
self.core.bus.emit(utt)

# confirm all expected messages are sent
expected_messages = [
"recognizer_loop:utterance",
"intent.service.skills.activated",
"ovos.common_play.activate",
"ocp:play",
"enclosure.active_skill",
"speak",
"ovos.common_play.search.start",
"enclosure.mouth.think",
"ovos.common_play.search.stop", # any ongoing previous search
"ovos.common_play.query", # media type radio
# skill searching (radio)
"ovos.common_play.skill.search_start",
"ovos.common_play.query.response",
"ovos.common_play.query.response",
"ovos.common_play.query.response",
"ovos.common_play.query.response",
"ovos.common_play.query.response",
"ovos.common_play.skill.search_end",
"ovos.common_play.search.end",
# good results because of radio media type
"ovos.common_play.reset",
"add_context", # NowPlaying context
"ovos.common_play.play", # OCP api,
"ovos.utterance.handled" # handle_utterance returned (intent service)
]
wait_for_n_messages(len(expected_messages))

self.assertEqual(len(expected_messages), len(messages))

for idx, m in enumerate(messages):
self.assertEqual(m.msg_type, expected_messages[idx])

# verify that source and destination are swapped after utterance
for m in messages:
if m.msg_type in ["recognizer_loop:utterance"]:
self.assertEqual(m.context["source"], "A")
self.assertEqual(m.context["destination"], "B")
elif m.msg_type in ["ovos.common_play.play",
"ovos.common_play.reset",
"ovos.common_play.query"]:
# OCP messages that should make it to the client
self.assertEqual(m.context["source"], "B")
self.assertEqual(m.context["destination"], "A")
elif m.msg_type.startswith("ovos.common_play"):
# internal search messages, should not leak to external clients
self.assertEqual(messages[0].context["source"], "A")
self.assertEqual(messages[0].context["destination"], "B")
else:
self.assertEqual(m.context["source"], "B")
self.assertEqual(m.context["destination"], "A")

0 comments on commit 22c2cf0

Please sign in to comment.