Skip to content

Commit

Permalink
feat: add state machine for showcasing command parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
m-barker committed Mar 19, 2024
1 parent 5225ac9 commit 3009811
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 5 deletions.
3 changes: 2 additions & 1 deletion skills/src/lasr_skills/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
from .listen_for import ListenFor
from .play_motion import PlayMotion
from .receive_object import ReceiveObject
from .handover_object import HandoverObject
from .handover_object import HandoverObject
from .ask_and_listen import AskAndListen
1 change: 1 addition & 0 deletions tasks/gpsr/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ include_directories(
catkin_install_python(PROGRAMS
scripts/parse_gpsr_xmls.py
nodes/commands/question_answer
nodes/command_parser
DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION}
)

Expand Down
172 changes: 172 additions & 0 deletions tasks/gpsr/nodes/command_parser
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
#!/usr/bin/env python3
import argparse
import smach
import rospy
from typing import Dict

from gpsr.load_known_data import GPSRDataLoader
from gpsr.regex_command_parser import Configuration, gpsr_compile_and_parse
from lasr_skills import AskAndListen, Say

ENGISH_MAPPING = {
"goToLoc": "Go to location",
"findPrsInRoom": "Find people in room",
"meetPrsAtBeac": "Meet person at beacon",
"countPrsInRoom": "Count people in room",
"tellPrsInfoInLoc": "Tell person information in location",
"talkInfoToGestPrsInRoom": "Talk information to guest in room",
"answerToGestPrsInRoom": "Answer to guest in room",
"followNameFromBeacToRoom": "Follow name from beacon to room",
"guideNameFromBeacToBeac": "Guide name from beacon to beacon",
"guidePrsFromBeacToBeac": "Guide person from beacon to beacon",
"guideClothPrsFromBeacToBeac": "Guide clothed person from beacon to beacon",
"greetClothDscInRm": "Greet clothed description in room",
"greetNameInRm": "Greet name in room",
"meetNameAtLocThenFindInRm": "Meet name at location then find in room",
"countClothPrsInRoom": "Count clothed people in room",
"tellPrsInfoAtLocToPrsAtLoc": "Tell person information at location to person at location",
"followPrsAtLoc": "Follow person at location",
"takeObjFromPlcmt": "Take object from placement",
"findObjInRoom": "Find object in room",
"countObjOnPlcmt": "Count object on placement",
"tellObjPropOnPlcmt": "Tell object properties on placement",
"bringMeObjFromPlcmt": "Bring me object from placement",
"tellCatPropOnPlcmt": "Tell category properties on placement",
}


def parse_args() -> Dict:
parser = argparse.ArgumentParser(description="GPSR Command Parser")
parser.add_argument(
"--data-dir",
type=str,
default="../data/mock_data/",
help="Path to the directory that contains the data json files.",
)
known, unknown = parser.parse_known_args()
return vars(known)


class ParseCommand(smach.State):
def __init__(self, data_config: Configuration):
smach.State.__init__(
self,
outcomes=["succeeded", "failed"],
input_keys=["transcribed_speech"],
output_keys=["command"],
)
self.data_config = data_config

def execute(self, userdata):
try:
userdata.command = gpsr_compile_and_parse(
self.data_config, userdata.transcribed_speech.lower()
)
except Exception as e:
rospy.logerr(e)
return "failed"
return "succeeded"


class OutputParsedCommand(smach.State):
def __init__(self):
smach.State.__init__(
self,
outcomes=["succeeded", "failed"],
input_keys=["command"],
output_keys=["command_string"],
)

def execute(self, userdata):
try:
command = userdata.command
english_command = ENGISH_MAPPING[command["command"]]
command_parameters = command[command["command"]]
rospy.loginfo(f"Command: {english_command}")
rospy.loginfo(f"Parameters: {command_parameters}")
tts_phrase = f"I parsed the command as you want me to: {english_command}, with the following parameters:"
for key, value in command_parameters.items():
if isinstance(value, list):
value = " and ".join(value)
tts_phrase += f" {key}: {value},"
except Exception as e:
rospy.logerr(e)
return "failed"
rospy.loginfo(tts_phrase)
userdata.command_string = tts_phrase
return "succeeded"


class CommandParserStateMachine(smach.StateMachine):
def __init__(self, config: Configuration):
smach.StateMachine.__init__(
self,
outcomes=["succeeded", "failed"],
input_keys=["tts_phrase", "command_string"],
output_keys=["command"],
)
self.config = config
with self:
smach.StateMachine.add(
"GET_COMMAND",
AskAndListen(),
transitions={"succeeded": "PARSE_COMMAND", "failed": "GET_COMMAND"},
remapping={
"tts_phrase": "tts_phrase",
"transcribed_speech": "transcribed_speech",
},
)
smach.StateMachine.add(
"PARSE_COMMAND",
ParseCommand(data_config=self.config),
transitions={
"succeeded": "OUTPUT_PARSED_COMMAND",
"failed": "GET_COMMAND",
},
remapping={
"transcribed_speech": "transcribed_speech",
"command": "command",
},
)
smach.StateMachine.add(
"OUTPUT_PARSED_COMMAND",
OutputParsedCommand(),
transitions={
"succeeded": "SAY_PARSED_COMMAND",
"failed": "GET_COMMAND",
},
remapping={"command": "command", "tts_phrase": "tts_phrase"},
)
smach.StateMachine.add(
"SAY_PARSED_COMMAND",
Say(),
transitions={
"succeeded": "GET_COMMAND",
"aborted": "GET_COMMAND",
"preempted": "GET_COMMAND",
},
remapping={"text": "command_string"},
)


if __name__ == "__main__":
rospy.init_node("gpsr_command_parser")
args = parse_args()
data_loader = GPSRDataLoader(data_dir=args["data_dir"])
gpsr_known_data: Dict = data_loader.load_data()
config = Configuration(
{
"person_names": gpsr_known_data["names"],
"location_names": gpsr_known_data["non_placeable_locations"],
"placement_location_names": gpsr_known_data["placeable_locations"],
"room_names": gpsr_known_data["rooms"],
"object_names": gpsr_known_data["objects"],
"object_categories_plural": gpsr_known_data["categories_plural"],
"object_categories_singular": gpsr_known_data["categories_singular"],
}
)
rospy.loginfo("GPSR Command Parser: Initialized")
sm = CommandParserStateMachine(config)
sm.userdata.tts_phrase = "I am ready to receive a command; ask away!"
result = sm.execute()
rospy.spin()
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ def _load_locations(self) -> Tuple[List[str], List[str]]:
for location, attributes in locations.items():
if attributes["placeable"]:
placeable_locations.append(location)
else:
non_placeable_locations.append(location)
non_placeable_locations.append(location)

return placeable_locations, non_placeable_locations

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,12 +351,22 @@ def gpsr_parse(matches: Dict[str, str]):
write_into[actual_key] = value
else:
write_into[key] = value

return result


def gpsr_compile_and_parse(config: Configuration, input: str) -> dict:
return gpsr_parse(re.compile(gpsr_regex(config)).match(input).groupdict())
input = input.lower()
# remove punctuation
input = re.sub(r"[^\w\s]", "", input)
print(input)
if input[0] == " ":
input = input[1:]

regex_str = gpsr_regex(config)
regex = re.compile(regex_str)
matches = regex.match(input)
matches = matches.groupdict()
return gpsr_parse(matches)


if __name__ == "__main__":
Expand Down

0 comments on commit 3009811

Please sign in to comment.