Skip to content

Commit

Permalink
Merge pull request #5 from icsi-berkeley/process_killing
Browse files Browse the repository at this point in the history
kills agents gracefully
  • Loading branch information
vivekraghuram authored Mar 8, 2017
2 parents 5591f19 + 338aecf commit 257dc68
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 47 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ __pycache__/

*.pyc
*.pyc
*.DS_Store
13 changes: 9 additions & 4 deletions src/main/nluas/Transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class TransportSecurityError(TransportError):
######################################################################
#
# The main class for Transport. On creation, sets up a thread to
# listen for incoming messages.
# listen for incoming messages.
#

class Transport():
Expand All @@ -107,6 +107,11 @@ def send(self, dest, ntuple):
self._pyre.shout(dest, json.dumps(ntuple).encode('utf-8'))
# send()

def broadcast(self, ntuple):
'''Send given ntuple to Transport all destinations. If the destination isn't listening then the message will (currently) be silently ignored.'''
self._pyre.shout(self._globalchannel, json.dumps(ntuple).encode('utf-8'))
# broadcast()

# Notes on subscribe
#
# The callback is called in the same thread that listens for pyre
Expand Down Expand Up @@ -150,7 +155,7 @@ def unsubscribe_all(self):
# unsubscribe_all()

# Notes on get()
#
#
# If you already subscribe to remote, temporarly overrides
# the subscribe. The subscribed callback will NOT be called.
# The subscription is replaced after get() returns.
Expand Down Expand Up @@ -187,7 +192,7 @@ def get_callback(tup, **kw):

# Set the subscription
self._subscribers[remote] = get_callback

# Wait for the callback to be called.
e.wait()

Expand Down Expand Up @@ -225,7 +230,7 @@ def __init__(self, myname, port=None, prefix=None):

# dict of remote name to callback. See subscribe method above.
self._subscribers = {}

# Callback for all message (or None if none registered)
self._subscribe_all = None

Expand Down
20 changes: 9 additions & 11 deletions src/main/nluas/app/core_solver.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""
Simple solver "core". Contains capabilities for unpacking
a JSON n-tuple, as well as routing this n-tuple based
on the predicate_type (command, query, assertion, etc.).
Other general capabilities can be added. The design
is general enough that the same "unpacking" and "routing"
Simple solver "core". Contains capabilities for unpacking
a JSON n-tuple, as well as routing this n-tuple based
on the predicate_type (command, query, assertion, etc.).
Other general capabilities can be added. The design
is general enough that the same "unpacking" and "routing"
method can be used, as long as a new method is written for a given
predicate_type.
predicate_type.
"Route_action" can be called by command/query/assertion methods,
to route each parameter to the task-specific method. E.g., "solve_move",
Expand Down Expand Up @@ -56,7 +56,6 @@ def __init__(self, args):
self.eventFeatures=None
self.parameter_templates = OrderedDict()
#self.initialize_templates()



def setup_solver_parser(self):
Expand All @@ -65,6 +64,8 @@ def setup_solver_parser(self):
return parser

def callback(self, ntuple):
if self.is_quit(ntuple):
return self.close()
self.solve(ntuple)

def initialize_templates(self):
Expand Down Expand Up @@ -109,7 +110,7 @@ def solve(self, ntuple):
def broadcast(self):
""" Here, does nothing. Later, an AgentSolver will broadcast information back to BossSolver. """
pass

def update_world(self, discovered=[]):
for item in discovered:
self.world.append(item)
Expand Down Expand Up @@ -189,9 +190,6 @@ def route_dispatch(self, dispatch_function, parameters):
""" Simply runs dispatch_function on PARAMETERS. """
return dispatch_function(parameters)

def close(self):
return

def check_for_clarification(self, ntuple):
""" Will need to be replaced by a process that checks whether ntuple needs clarification.
Requires some sort of context/world model. """
Expand Down
34 changes: 26 additions & 8 deletions src/main/nluas/core_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Author: seantrott <seantrott@icsi.berkeley.edu>
Defines a CoreAgent, which uses the Transport module. Can be initialized
by just feeding it a channel name. All "Agents" inherit from the CoreAgent.
by just feeding it a channel name. All "Agents" inherit from the CoreAgent.
------
See LICENSE.txt for licensing information.
Expand All @@ -14,6 +14,8 @@
import os
import sys
import logging
import json
import time

from collections import OrderedDict

Expand Down Expand Up @@ -52,7 +54,7 @@ def read_templates(self, filename):
return base

def unify_templates(self, child, parent):
""" Unifies a child and parent template. Adds all parent key-value pairs
""" Unifies a child and parent template. Adds all parent key-value pairs
unless the key already exists in the child. """
child.update({key:value for (key, value) in parent.items() if key not in child})
return child
Expand All @@ -69,6 +71,8 @@ def initialize(self, args):
self.logfile = args.logfile
self.loglevel = args.loglevel
self.logagent = args.logagent
self._keep_alive = True
self._broadcasted = False

def setup_parser(self):
parser = argparse.ArgumentParser()
Expand All @@ -78,17 +82,31 @@ def setup_parser(self):
parser.add_argument("-logagent", type=str, help="indicate agent responsible for logging output")
return parser

def close(self):
#self.transport.join()
print("Transport needs a QUIT procedure.")
sys.exit()
def close(self, quit_federation=False):
if not self._broadcasted:
self._broadcasted = True
self.transport.broadcast({"text": "QUIT", "type": "QUIT"}) # application-level quit

if quit_federation:
time.sleep(0.5)
self.transport.quit_federation() # transport-level quit

self._keep_alive = False

def keep_alive(self, func=None):
while self._keep_alive:
if func:
func()
else:
time.sleep(0.1)

def is_quit(self, ntuple):
""" Checks if an ntuple is the application quit message """
return "type" in ntuple and ntuple["type"] == 'QUIT'

def callback(self, ntuple):
print("{} received {}.".format(self.name, ntuple))

def subscribe_mass(self, ports):
for port in ports:
self.transport.subscribe(port, self.callback)


7 changes: 2 additions & 5 deletions src/main/nluas/language/text_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ def prompt(self):
#print("Clarification is: {}".format(self.clarification))
msg = input("> ")
if msg == "q":
self.transport.quit_federation()
quit()
self.close(True)
elif msg == None or msg =="":
pass
else:
Expand All @@ -67,6 +66,4 @@ def output_stream(self, tag, message):

if __name__ == "__main__":
text = TextAgent(sys.argv[1:])
while True:
text.prompt()

text.keep_alive(text.prompt())
26 changes: 7 additions & 19 deletions src/main/nluas/language/user_agent.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@

"""
The User-Agent (also called UI-Agent, Agent-UI) receives text/speech
as input, and produces an n-tuple, which it sends to a ProblemSolver.
as input, and produces an n-tuple, which it sends to a ProblemSolver.
It feeds the text through the ECG Analyzer (running on a local server)
to produce a SemSpec, which it then runs through the CoreSpecializer to produce
the n-tuple.
the n-tuple.
Interaction with the user is modulated through the output_stream method, which
allows designers to subclass the User-Agent and define a new mode of interaction.
Expand Down Expand Up @@ -78,7 +78,7 @@ def initialize_UI(self):

def initialize_analyzer(self):
self.analyzer = Analyzer(self.analyzer_port)

def initialize_specializer(self):
try:
self.specializer=CoreSpecializer(self.analyzer)
Expand Down Expand Up @@ -130,7 +130,7 @@ def speech_callback(self, ntuple):
#ntuple = json.loads(ntuple)
text = ntuple['text'].lower()
print("Got {}".format(text))
new_ntuple = self.process_input(text)
new_ntuple = self.process_input(text)
if new_ntuple and new_ntuple != "null" and "predicate_type" in new_ntuple:
self.transport.send(self.solve_destination, new_ntuple)

Expand All @@ -141,7 +141,9 @@ def text_callback(self, ntuple):
specialize = True
#ntuple = json.loads(ntuple)
msg = ntuple['text']
if ntuple['type'] == "standard":
if self.is_quit(ntuple):
self.close()
elif ntuple['type'] == "standard":
if msg == None or msg == "":
specialize = False
elif msg.lower() == "d":
Expand All @@ -159,7 +161,6 @@ def text_callback(self, ntuple):
self.clarification = False



def callback(self, ntuple):
print(ntuple)
#ntuple = self.decoder.convert_JSON_to_ntuple(ntuple)
Expand All @@ -182,7 +183,6 @@ def write_file(self, json_ntuple, msg):




def process_clarification(self, tag, msg, ntuple):
self.clarification = True
#self.output_stream(tag, msg)
Expand All @@ -203,15 +203,6 @@ def clarify_ntuple(self, ntuple, descriptor):
new[key] = value
return new


def prompt(self):
while True:
s = input("> ")
if s == "q":
self.transport.quit_federation()
quit()


def check_spelling(self, msg):
table = self.spell_checker.spell_check(msg)
if table:
Expand All @@ -229,6 +220,3 @@ def check_spelling(self, msg):

if __name__ == "__main__":
ui = UserAgent(sys.argv[1:])
ui.prompt()


0 comments on commit 257dc68

Please sign in to comment.