-
Notifications
You must be signed in to change notification settings - Fork 37
Control Agent Development
Fig.1 Agent thread path of execution
The development process of a control agent, following the agent thread's path of execution (Fig. 1), is explained step by step below:
Step1: Load Agent Configuration
1.1 load parameters setting for agent
#1. @params agent
# Agent identity and timing values read from the agent configuration.
agent_id = get_config('agent_id')
LOG_DATA_PERIOD = get_config('poll_time')
device_monitor_time = get_config('device_monitor_time')
# Message-bus endpoints.
publish_address = 'ipc:///tmp/volttron-lite-agent-publish'
subscribe_address = 'ipc:///tmp/volttron-lite-agent-subscribe'
# Set to True to print the agent's knowledge and keyword mapping on every monitoring pass.
debug_agent = False
# Agent keyword -> every API keyword that is treated as the same quantity.
agentknowledge = dict(
    day=["day"],
    hour=["hour"],
    minute=["minute"],
    temperature=["temp", "temperature", "current_temp"],
    thermostat_mode=["tmode", "ther_mode", "thermostat_mode"],
    fan_mode=["fmode", "fan_mode"],
    heat_setpoint=["t_heat", "temp_heat", "heat_setpoint"],
    cool_setpoint=["t_cool", "temp_cool", "cool_setpoint"],
    thermostat_state=["tstate", "thermostat_state"],
    fan_state=["fstate", "fan_state"],
)
# Agent keyword -> API keyword; starts empty and is filled in while the device is monitored.
agentAPImapping = dict(
    temperature=[],
    thermostat_mode=[],
    fan_mode=[],
    heat_setpoint=[],
    cool_setpoint=[],
    thermostat_state=[],
    fan_state=[],
)

# 1.2 load parameters setting device information
#2. @params device_info
# Only model, type and address are read; the location and MAC entries are left disabled.
# building = get_config('building')
# zone = get_config('zone')
# room = get_config('room')
model = get_config('model')
device_type = get_config('type')
address = get_config('address')
# mac_address = get_config('mac_address')

# 1.3 load parameters for database interface
#3. @params agent & DB interfaces
# Connection settings passed to psycopg2.connect() when the agent is initialized.
db_host = get_config('db_host')
db_port = get_config('db_port')
db_database = get_config('db_database')
db_user = get_config('db_user')
db_password = get_config('db_password')
# Topic fragment shared by every UI <-> agent message of this agent (note the trailing slash).
_topic_Agent_UI = 'building1/999/thermostat/'+agent_id+'/'
# Topic on which readings are published for the sMAP data logger.
_topic_Agent_sMAP = 'datalogger/log/building1/999/thermostat/'+agent_id

# Step2: Initialize Device Object
2.1 create a device object from the API interface (with initial configurations)
#4. @params device_api
# The API interface module is chosen by name from the configuration and loaded dynamically.
api = get_config('api')
apiLib = importlib.import_module("testAPI."+api)
#4.1 initialize thermostat device object
Thermostat = apiLib.API(model=model, device_type=device_type, api=api, address=address, agent_id=agent_id)
print("{0}agent is initialized for {1} using API={2} at {3}".format(
    agent_id,
    Thermostat.get_variable('model'),
    Thermostat.get_variable('api'),
    Thermostat.get_variable('address')))

# 2.2 setup notification information
#5. @params notification_info
# Master switch: no e-mail is sent while this is False.
send_notification = False
# E-mail account and recipients, taken from the project settings.
email_fromaddr = settings.NOTIFICATION['email']['fromaddr']
email_recipients = settings.NOTIFICATION['email']['recipients']
email_username = settings.NOTIFICATION['email']['username']
email_password = settings.NOTIFICATION['email']['password']
email_mailServer = settings.NOTIFICATION['email']['mailServer']
# Temperature thresholds that trigger an alert.
alert_too_hot = settings.NOTIFICATION['thermostat']['too_hot']
alert_too_cold = settings.NOTIFICATION['thermostat']['too_cold']
# True while an alert has recently been sent; suppresses repeated alerts.
send_notification_status = False
# Hold-off between alerts; it is compared against a time.time() difference, i.e. seconds.
time_delay_send_notification = 600

# Step3: Initialize Agent
There are two essential methods to initialize an agent
3.1 __init__() method
3.1.1 initialize agent variables
3.1.2 setup connections with databases
3.1.3 send notification to building admin/user (optional)
class Agent(PublishMixin, BaseAgent):
    """Control agent for a single thermostat device.

    The agent keeps its view of the device (its "knowledge") in
    ``self.variables`` and holds the database connection used to store the
    current device status.
    """

    #1. agent initialization
    def __init__(self, **kwargs):
        """Set up agent state, connect to the database and optionally notify.

        Args:
            **kwargs: forwarded to the base classes; the same dict also
                becomes the initial content of ``self.variables``.
        """
        super(Agent, self).__init__(**kwargs)
        #1. initialize all agent variables
        self.variables = kwargs
        self.valid_data = False
        self._keep_alive = True
        self.first_time_update = True
        self.topic = _topic_Agent_sMAP
        self.flag = 1
        #2. setup connection with db -> Connect to bemossdb database
        # Define both handles up front so the attributes exist even when the
        # connection attempt fails.
        self.con = None
        self.cur = None
        try:
            self.con = psycopg2.connect(host=db_host, port=db_port, database=db_database, user=db_user,
                                        password=db_password)
            self.cur = self.con.cursor()  # open a cursor to perform database operations
            print("{} connects to the database name {} successfully".format(agent_id, db_database))
        except Exception:
            # Best effort: the agent keeps running without a database.
            # ``Exception`` (not a bare except) lets KeyboardInterrupt/SystemExit through.
            print("ERROR: {} fails to connect to the database name {}".format(agent_id, db_database))
        #3. send notification to notify building admin
        self.send_notification = send_notification
        self.send_notification_status = send_notification_status
        self.time_send_notification = 0
        if self.send_notification:
            self.subject = 'Message from ' + agent_id
            self.text = 'Now an agent device_type {} for {} with API {} at address {} is launched!'.format(
                Thermostat.get_variable('device_type'), Thermostat.get_variable('model'),
                Thermostat.get_variable('api'), Thermostat.get_variable('address'))
            emailService = EmailService()
            emailService.sendEmail(email_fromaddr, email_recipients, email_username, email_password, self.subject,
                                   self.text, email_mailServer)

    #These set and get methods allow scalability
    def set_variable(self, k, v):  # k=key, v=value
        """Insert or overwrite entry ``k`` of the agent's knowledge with ``v``."""
        self.variables[k] = v

    def get_variable(self, k):
        """Return the agent's knowledge for ``k``, or None when it is unknown."""
        return self.variables.get(k, None)  # default of get_variable is none

# 3.2 setup() method
#2. agent setup method
def setup(self):
    """Run the base-class setup, identify the device and schedule the first monitoring pass."""
    super(Agent, self).setup()
    #1. Do a one time push when we start up so we don't have to wait for the periodic
    Thermostat.identifyDevice()
    # NOTE(review): the first argument is presumably a delay in seconds - confirm against BaseAgent.timer.
    self.timer(1, self.deviceMonitorBehavior)

# Step4: DeviceMonitor Behavior
4.1 declare the DeviceMonitor Behavior to be the TickerBehavior (periodically run by specifying "device_monitor_time")
#deviceMonitorBehavior (TickerBehavior)
@periodic(device_monitor_time)
def deviceMonitorBehavior(self):
    """Poll the thermostat and propagate its current status.

    One pass performs, in order: read the device through the API interface
    and copy the readings into the agent's knowledge; write the status to
    the PostgreSQL table ``dashboard_current_status``; publish the readings
    for the sMAP logger; send an e-mail alert when a temperature threshold
    is crossed; and, when ``debug_agent`` is set, print the knowledge and
    the keyword mapping. Each of the first three steps is best effort: a
    failure is printed and the pass continues.
    """
    # 4.2 get current status of a device by calling getDeviceStatus() method of an API interface, then map
    # keywords and variables to agent knowledge
    #step1: get current status of a thermostat, then map keywords and variables to agent knowledge
    try:
        Thermostat.getDeviceStatus()
        #mapping variables from API to Agent's knowledge
        for APIKeyword, APIvariable in Thermostat.variables.items():
            if debug_agent:
                print(APIKeyword, APIvariable)
            agent_keyword = self.getKeyword(APIKeyword)  # resolve once, use for both updates
            self.set_variable(agent_keyword, APIvariable)  # set variables of agent from API variables
            agentAPImapping[agent_keyword] = APIKeyword  # map keyword of agent and API
    except Exception:
        print("device connection is not successful")
    if self.first_time_update:
        # On the very first pass, fall back to 70 for any setpoint the device did not report.
        for setpoint_name in ('heat_setpoint', 'cool_setpoint'):
            if self.get_variable(setpoint_name) is None:
                self.set_variable(setpoint_name, 70)
        self.first_time_update = False

    # 4.3 update PostgreSQL (meta-data database) with the current status of a device
    #step2: update PostgresQL (meta-data) database
    try:
        self.cur.execute("UPDATE dashboard_current_status SET temperature=%s WHERE id=%s",
                         (self.get_variable('temperature'), agent_id))
        self.con.commit()
        self.cur.execute("UPDATE dashboard_current_status SET fmode=%s WHERE id=%s",
                         (self.get_variable('fan_mode'), agent_id))
        self.con.commit()
        #TODO check thermostat mode
        if self.get_variable('thermostat_mode') == "HEAT":
            self.cur.execute("UPDATE dashboard_current_status SET setpoint=%s WHERE id=%s",
                             (self.get_variable('heat_setpoint'), agent_id))
            self.con.commit()
            self.cur.execute("UPDATE dashboard_current_status SET mode=%s WHERE id=%s", ('HEAT', agent_id))
            self.con.commit()
        elif self.get_variable('thermostat_mode') == "COOL":
            self.cur.execute("UPDATE dashboard_current_status SET setpoint=%s WHERE id=%s",
                             (self.get_variable('cool_setpoint'), agent_id))
            self.con.commit()
            self.cur.execute("UPDATE dashboard_current_status SET mode=%s WHERE id=%s", ('COOL', agent_id))
            self.con.commit()
        elif self.get_variable('thermostat_mode') == "OFF":
            self.cur.execute("UPDATE dashboard_current_status SET mode=%s WHERE id=%s", ('OFF', agent_id))
            self.con.commit()
        elif self.get_variable('thermostat_mode') == "AUTO":
            self.cur.execute("UPDATE dashboard_current_status SET mode=%s WHERE id=%s", ('AUTO', agent_id))
            self.con.commit()
        print("{} updates database name {} during deviceMonitorBehavior successfully".format(agent_id, db_database))
    except Exception:
        print("ERROR: {} failed to update database name {}".format(agent_id, db_database))

    # 4.4 update sMAP (time-series) database
    #step3: update sMAP (time-series) database
    try:
        self.publish_logdata1()
        self.publish_logdata2()
        self.publish_logdata3()
        self.publish_logdata4()
        self.publish_logdata5()
        self.publish_logdata6()
        self.publish_logdata7()
        print("{} success update sMAP database".format(agent_id))
    except Exception:
        print("ERROR: {} fails to update sMAP database".format(agent_id))

    # 4.5 send notification to building admin/user if the notified conditions is met
    #step4: send notification to a user if required
    if self.send_notification:
        if not self.send_notification_status:
            temperature = self.get_variable('temperature')
            alert_text = None
            if temperature is None:
                # No reading available (e.g. the device could not be reached): nothing to compare,
                # so no alert is raised for a missing value.
                pass
            elif temperature >= alert_too_hot:  # notify the user if there is something error
                alert_text = 'Too hot! now the current temperature is {0} F which is exceed {1} F'.format(
                    temperature, alert_too_hot)
            elif temperature <= alert_too_cold:
                # The too-cold message reports the too-cold threshold.
                alert_text = 'Too cold! now the current temperature is {0} F which is below {1} F'.format(
                    temperature, alert_too_cold)
            if alert_text is not None:
                print('send notification message from {}'.format(agent_id))
                self.text = alert_text
                emailService = EmailService()
                emailService.sendEmail(email_fromaddr, email_recipients, email_username, email_password,
                                       self.subject, self.text, email_mailServer)
                self.send_notification_status = True
                self.time_send_notification = time.time()
        # Re-arm alerting once the hold-off period since the last alert has elapsed.
        if time.time() - self.time_send_notification >= time_delay_send_notification:
            self.send_notification_status = False

    # 4.6 (if required) printout agent's knowledge and mapping fields for debugging purpose
    #step5: debug agent knowledge
    if debug_agent:
        print("printing agent's knowledge")
        for k, v in self.variables.items():
            print(k, v)
        print('')
        print("printing agentAPImapping's fields")
        # Iterate over a snapshot: the mapping is modified inside the loop.
        for k, v in list(agentAPImapping.items()):
            if k is None:
                agentAPImapping.update({v: v})
                agentAPImapping.pop(k)
        for k, v in agentAPImapping.items():
            print(k, v)

# Step5: UpdateDeviceStatus Behavior
5.1 Declare the UpdateDeviceStatus Behavior as a GenericBehavior: it runs whenever a message arrives whose topic exactly matches the topic given in the @matching.match_exact() decorator.
In this example, the topic is "'/ui/agent/'+_topic_Agent_UI+'device_status'" where "_topic_Agent_UI" is defined in the (1) Load Agent Configuration Step
#updateUIBehavior (generic behavior)
@matching.match_exact('/ui/agent/'+_topic_Agent_UI+'device_status')
# Note: for the inter-exchange topic between Applications layer and OS layer, please check the API between APP layer
# and OS layer
# for the inter-exchange topic between agents in the OS layer, please check #TODO add agent communication page
# 5.2 the main UpdateDeviceStatus behavior that will send back a current device status of an agent's knowledge to any
# entity (e.g. UI or another agent) that triggers this behavior
def updateUIBehavior(self, topic, headers, message, match):
    """Reply to a device-status request with the agent's current knowledge.

    The incoming topic, headers and message are only printed; the reply is
    a JSON object with the seven status fields, published on the
    ``device_status/response`` topic.
    """
    print(agent_id + " got\nTopic: {topic}".format(topic=topic))
    print("Headers: {headers}".format(headers=headers))
    print("Message: {message}\n".format(message=message))
    #reply message
    topic = '/agent/ui/'+_topic_Agent_UI+'device_status/response'
    now = datetime.utcnow().isoformat(' ') + 'Z'
    headers = {
        'AgentID': agent_id,
        headers_mod.CONTENT_TYPE: headers_mod.CONTENT_TYPE.JSON,
        headers_mod.DATE: now,
        headers_mod.FROM: agent_id,
        headers_mod.TO: 'ui'
    }
    _data = {
        'temperature': self.get_variable('temperature'),
        'thermostat_mode': self.get_variable('thermostat_mode'),
        'fan_mode': self.get_variable('fan_mode'),
        'heat_setpoint': self.get_variable('heat_setpoint'),
        'cool_setpoint': self.get_variable('cool_setpoint'),
        'thermostat_state': self.get_variable('thermostat_state'),
        'fan_state': self.get_variable('fan_state')
    }
    message = json.dumps(_data)
    message = message.encode(encoding='utf_8')
    self.publish(topic, headers, message)

# Step6: DeviceControl Behavior
6.1 Declare the DeviceControl Behavior as a GenericBehavior: it runs whenever a message arrives whose topic exactly matches the topic given in the @matching.match_exact() decorator.
In this example, the topic is "'/ui/agent/'+_topic_Agent_UI+'update'" where "_topic_Agent_UI" is defined in the (1) Load Agent Configuration Step
#deviceControlBehavior (generic behavior)
@matching.match_exact('/ui/agent/'+_topic_Agent_UI+'update')
# Note: for the inter-exchange topic between Applications layer and OS layer, please check the API between APP layer
# and OS layer
# for the inter-exchange topic between agents in the OS layer, please check #TODO add agent communication page
# 6.2 the main DeviceControl behavior will send a control message to change a current status of a device based on the
# received message from another entity (e.g. UI or another agent) that triggers this behavior
def deviceControlBehavior(self, topic, headers, message, match):
    """Apply a requested status change to the device and report the outcome.

    ``message[0]`` is validated with ``isPostmsgValid()``; a valid message is
    decoded from JSON and handed to ``Thermostat.setDeviceStatus()``. The
    plain-text reply ('success' or 'failure') is always published on the
    ``update/response`` topic, including when the message was rejected.
    """
    print(agent_id + " got\nTopic: {topic}".format(topic=topic))
    print("Headers: {headers}".format(headers=headers))
    print("Message: {message}\n".format(message=message))
    #step1: change device status according to the receive message
    if self.isPostmsgValid(message[0]):  # check if the data is valid
        # convert received message from string to JSON
        setDeviceStatusResult = Thermostat.setDeviceStatus(json.loads(message[0]))
        #TODO need to do additional checking whether the device setting is actually success!!!!!!!!
        #step2: update agent's knowledge on this device
        Thermostat.getDeviceStatus()
        reply = 'success' if setDeviceStatusResult else 'failure'
    else:
        print("The POST message is invalid, check thermostat_mode, heat_setpoint, cool_coolsetpoint "
              "setting and try again\n")
        reply = 'failure'
    #step3: send reply message back to the UI
    # The reply envelope is built outside the branches so a rejected message is also answered on
    # the response topic with reply headers, not echoed on the incoming topic with incoming headers.
    reply_topic = '/agent/ui/'+_topic_Agent_UI+'update/response'
    now = datetime.utcnow().isoformat(' ') + 'Z'
    reply_headers = {
        'AgentID': agent_id,
        headers_mod.CONTENT_TYPE: headers_mod.CONTENT_TYPE.PLAIN_TEXT,
        headers_mod.DATE: now,
    }
    self.publish(reply_topic, reply_headers, reply)

# Note: isPostmsgValid() method is called to verify the received message content from another entity (e.g. UI or
# another agent) before parsing and calling an API interface to actually communicate and control a device.
Step7: IdentifyDevice Behavior
7.1 Declare the DeviceIdentify Behavior as a GenericBehavior: it runs whenever a message arrives whose topic exactly matches the topic given in the @matching.match_exact() decorator.
In this example, the topic is "'/ui/agent/'+_topic_Agent_UI+'identify'" where "_topic_Agent_UI" is defined in the (1) Load Agent Configuration Step
#deviceIdentifyBehavior (generic behavior)
@matching.match_exact('/ui/agent/'+_topic_Agent_UI+'identify')
# Note: for the inter-exchange topic between Applications layer and OS layer, please check the API between APP layer
# and OS layer
# for the inter-exchange topic between agents in the OS layer, please check agent communication page
# 7.2 the main DeviceIdentify behavior will call a method in an API Interface to identify a device which is triggered
# by the received message from another entity (e.g. UI or another agent). The identification process depends on
# device type, available identification methods (e.g. LED blinking or sound beeping) which are defined in a
# corresponding API Interface of a particular device.
def deviceIdentifyBehavior(self, topic, headers, message, match):
    """Ask the device to identify itself and report the outcome.

    The incoming topic, headers and message are only printed. The truthiness
    of ``Thermostat.identifyDevice()`` decides whether 'success' or 'failure'
    is published on the ``identify/response`` topic.
    """
    print(agent_id + " got\nTopic: {topic}".format(topic=topic))
    print("Headers: {headers}".format(headers=headers))
    print("Message: {message}\n".format(message=message))
    #step1: change device status according to the receive message
    identifyDeviceResult = Thermostat.identifyDevice()
    #TODO need to do additional checking whether the device setting is actually success!!!!!!!!
    #step2: send reply message back to the UI
    topic = '/agent/ui/'+_topic_Agent_UI+'identify/response'
    now = datetime.utcnow().isoformat(' ') + 'Z'
    headers = {
        'AgentID': agent_id,
        headers_mod.CONTENT_TYPE: headers_mod.CONTENT_TYPE.PLAIN_TEXT,
        headers_mod.DATE: now,
    }
    message = 'success' if identifyDeviceResult else 'failure'
    self.publish(topic, headers, message)

# Helper methods for the Control Agent
Helper method 1: set() method to insert or update an agent's knowledge
def set_variable(self, k, v):  # k=key, v=value
    """Insert or overwrite entry ``k`` of the agent's knowledge with ``v``."""
    self.variables[k] = v

# Helper method 2: get() method to retrieve an agent's knowledge
def get_variable(self, k):
    """Return the agent's knowledge for ``k``, or None when it is unknown."""
    return self.variables.get(k, None)

# Helper method 3: getKeyword() method for agent's knowledge mapping
def getKeyword(self, APIKeyword):
    """Translate an API keyword into the agent's own keyword.

    Args:
        APIKeyword: a variable name as reported by the device API.

    Returns:
        The key of ``agentknowledge`` whose alias list contains
        ``APIKeyword``; ``APIKeyword`` itself when no alias list contains it,
        so unknown variables keep their API name.
    """
    for agent_keyword, aliases in agentknowledge.items():
        if APIKeyword in aliases:
            return agent_keyword
    # Not part of the agent's knowledge (also covers an empty agentknowledge).
    return APIKeyword

# Helper method 4: isPostmsgValid() method for checking validity of a post message that will be used by the
# deviceControl Behavior
def isPostmsgValid(self, postmsg):  # check validity of postmsg
    """Check that a control message carries a consistent mode/setpoint pair.

    Args:
        postmsg: the control message, either as JSON text (this is what the
            deviceControl behavior passes in) or as an already decoded dict.

    Returns:
        False when the message cannot be decoded into a JSON object, when
        ``thermostat_mode`` is "HEAT" and a ``cool_setpoint`` is present, or
        when ``thermostat_mode`` is "COOL" and a ``heat_setpoint`` is
        present; True otherwise.
    """
    try:
        # Decode JSON text; a dict is used as is. (Encoding and re-decoding the text would only
        # give the same string back, and the checks below would never run.)
        _data = postmsg if isinstance(postmsg, dict) else json.loads(postmsg)
    except (TypeError, ValueError):
        print("dataValidity failed to validate data comes from UI")
        return False
    if not isinstance(_data, dict):
        # Valid JSON, but not an object with named settings.
        print("dataValidity failed to validate data comes from UI")
        return False
    thermostat_mode = _data.get('thermostat_mode')
    if thermostat_mode == "HEAT" and 'cool_setpoint' in _data:
        return False
    if thermostat_mode == "COOL" and 'heat_setpoint' in _data:
        return False
    return True

# Helper method 5: publish_logdata() to send a device current status to the sMAP database by publishing a data to
# the Information Exchange Bus (IEB) that will be filtered and retrieved by the sMAP driver
def publish_logdata1(self):
    """Publish the current temperature reading on the sMAP logging topic.

    The reading is sent as a JSON document containing one
    ``[timestamp, value]`` pair, with the timestamp taken from
    ``time.time()`` truncated to whole seconds.

    Raises:
        TypeError: from ``float()`` when no temperature is known yet (the
            stored value is None).
    """
    headers = {
        headers_mod.FROM: agent_id,
        headers_mod.CONTENT_TYPE: headers_mod.CONTENT_TYPE.JSON,
    }
    mytime = int(time.time())
    content = {
        "temperature": {
            "Readings": [[mytime, float(self.get_variable("temperature"))]],
            "Units": "F",
            "data_type": "double"
        }
    }
    print("{} published temperature to an IEB".format(agent_id))
    self.publish(_topic_Agent_sMAP, headers, json.dumps(content))

# Reference: Final Agent Code Structure in Python