Skip to content

Commit

Permalink
Merge pull request thingsboard#1452 from samson0v/master
Browse files Browse the repository at this point in the history
Added handling rpc methods to connectors on the new UI
  • Loading branch information
imbeacon authored Jul 8, 2024
2 parents 41b3166 + ac11412 commit 68de3a9
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 97 deletions.
50 changes: 37 additions & 13 deletions thingsboard_gateway/connectors/modbus/modbus_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ def server_side_rpc_handler(self, server_rpc_request):
if connector_type == self._connector_type:
rpc_method = rpc_method_name
server_rpc_request['device'] = server_rpc_request['params'].split(' ')[0].split('=')[-1]
except (IndexError, ValueError):
except (IndexError, ValueError, AttributeError):
pass

if server_rpc_request.get(DEVICE_SECTION_PARAMETER) is not None:
Expand Down Expand Up @@ -679,17 +679,24 @@ def server_side_rpc_handler(self, server_rpc_request):
break
else:
self.__log.error("Received rpc request, but method %s not found in config for %s.",
rpc_method,
self.get_name())
rpc_method,
self.get_name())
self.__gateway.send_rpc_reply(server_rpc_request[DEVICE_SECTION_PARAMETER],
server_rpc_request[DATA_PARAMETER][RPC_ID_PARAMETER],
{rpc_method: "METHOD NOT FOUND!"})
else:
self.__log.debug("Received RPC to connector: %r", server_rpc_request)
results = []
for device in self.__slaves:
server_rpc_request[DEVICE_SECTION_PARAMETER] = device.device_name
results.append(self.__process_request(server_rpc_request, server_rpc_request['params'], return_result=True))

return results

except Exception as e:
self.__log.exception(e)

def __process_request(self, content, rpc_command_config, request_type='RPC'):
def __process_request(self, content, rpc_command_config, request_type='RPC', return_result=False):
self.__log.debug('Processing %s request', request_type)
if rpc_command_config is not None:
device = ModbusConnector.__get_device_by_name(content[DEVICE_SECTION_PARAMETER], self.__slaves)
Expand Down Expand Up @@ -745,16 +752,33 @@ def __process_request(self, content, rpc_command_config, request_type='RPC'):
if content.get(RPC_ID_PARAMETER) or (content.get(DATA_PARAMETER) is not None
and content[DATA_PARAMETER].get(RPC_ID_PARAMETER)) is not None:
if isinstance(response, Exception) or isinstance(response, ExceptionResponse):
self.__gateway.send_rpc_reply(device=content[DEVICE_SECTION_PARAMETER],
req_id=content[DATA_PARAMETER].get(RPC_ID_PARAMETER),
content={
content[DATA_PARAMETER][RPC_METHOD_PARAMETER]: str(response)
},
success_sent=False)
if not return_result:
self.__gateway.send_rpc_reply(device=content[DEVICE_SECTION_PARAMETER],
req_id=content[DATA_PARAMETER].get(RPC_ID_PARAMETER),
content={
content[DATA_PARAMETER][RPC_METHOD_PARAMETER]: str(response)
},
success_sent=False)
else:
return {
'device': content[DEVICE_SECTION_PARAMETER],
'req_id': content[DATA_PARAMETER].get(RPC_ID_PARAMETER),
'content': {
content[DATA_PARAMETER][RPC_METHOD_PARAMETER]: str(response)
},
'success_sent': False
}
else:
self.__gateway.send_rpc_reply(device=content[DEVICE_SECTION_PARAMETER],
req_id=content[DATA_PARAMETER].get(RPC_ID_PARAMETER),
content=response)
if not return_result:
self.__gateway.send_rpc_reply(device=content[DEVICE_SECTION_PARAMETER],
req_id=content[DATA_PARAMETER].get(RPC_ID_PARAMETER),
content=response)
else:
return {
'device': content[DEVICE_SECTION_PARAMETER],
'req_id': content[DATA_PARAMETER].get(RPC_ID_PARAMETER),
'content': response
}

self.__log.debug("%r", response)

Expand Down
43 changes: 23 additions & 20 deletions thingsboard_gateway/connectors/mqtt/mqtt_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ def __process_rpc_request(self, content, rpc_config):
.replace("${methodName}", str(content['data']['method'])) \
.replace("${requestId}", str(content["data"]["id"]))

if content['device']:
if content.get('device'):
request_topic = request_topic.replace("${deviceName}", str(content["device"]))

request_topic = TBUtility.replace_params_tags(request_topic, content)
Expand Down Expand Up @@ -882,7 +882,7 @@ def __process_rpc_request(self, content, rpc_config):
return
if not expects_response or not defines_timeout:
self.__log.info("One-way RPC: sending ack to ThingsBoard immediately")
self.__gateway.send_rpc_reply(device=content["device"], req_id=content["data"]["id"],
self.__gateway.send_rpc_reply(device=content.get('device', ''), req_id=content["data"]["id"],
success_sent=True)

# Everything went out smoothly: RPC is served
Expand All @@ -907,28 +907,31 @@ def server_side_rpc_handler(self, content):
except ValueError:
pass

# check if RPC method is reserved get/set
if rpc_method == 'get' or rpc_method == 'set':
params = {}
for param in content['data']['params'].split(';'):
try:
(key, value) = param.split('=')
except ValueError:
continue
if content.get('device'):
# check if RPC method is reserved get/set
if rpc_method == 'get' or rpc_method == 'set':
params = {}
for param in content['data']['params'].split(';'):
try:
(key, value) = param.split('=')
except ValueError:
continue

if key and value:
params[key] = value
if key and value:
params[key] = value

return self.__process_rpc_request(content, params)
else:
# Check whether one of my RPC handlers can handle this request
for rpc_config in self.__server_side_rpc:
if search(rpc_config["deviceNameFilter"], content["device"]) \
and search(rpc_config["methodFilter"], rpc_method) is not None:
return self.__process_rpc_request(content, params)
else:
# Check whether one of my RPC handlers can handle this request
for rpc_config in self.__server_side_rpc:
if search(rpc_config["deviceNameFilter"], content["device"]) \
and search(rpc_config["methodFilter"], rpc_method) is not None:

return self.__process_rpc_request(content, rpc_config)
return self.__process_rpc_request(content, rpc_config)

self.__log.error("RPC not handled: %s", content)
self.__log.error("RPC not handled: %s", content)
else:
return self.__process_rpc_request(content, content['data']['params'])

@CustomCollectStatistics(start_stat_type='allBytesSentToDevices')
def _publish(self, request_topic, data_to_send, retain):
Expand Down
152 changes: 89 additions & 63 deletions thingsboard_gateway/connectors/opcua/opcua_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ def on_attributes_update(self, content):
def server_side_rpc_handler(self, content):
try:
if content.get('data') is None:
content['data'] = {'params': content['params'], 'method': content['method']}
content['data'] = {'params': content['params'], 'method': content['method'], 'id': content['id']}

rpc_method = content["data"].get("method")

Expand All @@ -528,76 +528,102 @@ def server_side_rpc_handler(self, content):
content['device'] = content['params'].split(' ')[0].split('=')[-1]
except (ValueError, IndexError):
self.__log.error('Invalid RPC method name: %s', rpc_method)
except AttributeError:
pass

# firstly check if a method is not service
if rpc_method == 'set' or rpc_method == 'get':
full_path = ''
args_list = []
device = content.get('device')
if content.get('device'):
# firstly check if a method is not service
if rpc_method == 'set' or rpc_method == 'get':
full_path = ''
args_list = []
device = content.get('device')

try:
args_list = content['data']['params'].split(';')
try:
args_list = content['data']['params'].split(';')

if 'ns' in content['data']['params']:
full_path = ';'.join(
[item for item in (args_list[0:-1] if rpc_method == 'set' else args_list)])
else:
full_path = args_list[0].split('=')[-1]
except IndexError:
self.__log.error('Not enough arguments. Expected min 2.')
self.__gateway.send_rpc_reply(device=device,
req_id=content['data'].get('id'),
content={content['data'][
'method']: 'Not enough arguments. Expected min 2.',
'code': 400})

result = {}
if rpc_method == 'get':
task = self.__loop.create_task(self.__read_value(full_path, result))

while not task.done():
sleep(.2)
elif rpc_method == 'set':
value = args_list[2].split('=')[-1]
task = self.__loop.create_task(self.__write_value(full_path, value, result))

while not task.done():
sleep(.2)

if 'ns' in content['data']['params']:
full_path = ';'.join([item for item in (args_list[0:-1] if rpc_method == 'set' else args_list)])
else:
full_path = args_list[0].split('=')[-1]
except IndexError:
self.__log.error('Not enough arguments. Expected min 2.')
self.__gateway.send_rpc_reply(device=device,
req_id=content['data'].get('id'),
content={content['data'][
'method']: 'Not enough arguments. Expected min 2.',
'code': 400})

result = {}
if rpc_method == 'get':
task = self.__loop.create_task(self.__read_value(full_path, result))

while not task.done():
sleep(.2)
elif rpc_method == 'set':
value = args_list[2].split('=')[-1]
task = self.__loop.create_task(self.__write_value(full_path, value, result))

while not task.done():
sleep(.2)

self.__gateway.send_rpc_reply(device=device,
req_id=content['data'].get('id'),
content={content['data']['method']: result})
else:
device = tuple(filter(lambda i: i.name == content['device'], self.__device_nodes))[0]

for rpc in device.config['rpc_methods']:
if rpc['method'] == content["data"]['method']:
arguments_from_config = rpc["arguments"]
arguments = content["data"].get("params") if content["data"].get(
"params") is not None else arguments_from_config
method_name = content['data']['method']

try:
result = {}
task = self.__loop.create_task(
self.__call_method(device.path, method_name, arguments, result))

while not task.done():
sleep(.2)

self.__gateway.send_rpc_reply(content["device"],
content["data"]["id"],
{content["data"]["method"]: result, "code": 200})
self.__log.debug("method %s result is: %s", rpc['method'], result)
except Exception as e:
self.__log.exception(e)
content={content['data']['method']: result})
else:
device = tuple(filter(lambda i: i.name == content['device'], self.__device_nodes))[0]

for rpc in device.config['rpc_methods']:
if rpc['method'] == content["data"]['method']:
arguments_from_config = rpc["arguments"]
arguments = content["data"].get("params") if content["data"].get(
"params") is not None else arguments_from_config
method_name = content['data']['method']

try:
result = {}
task = self.__loop.create_task(
self.__call_method(device.path, method_name, arguments, result))

while not task.done():
sleep(.2)

self.__gateway.send_rpc_reply(content["device"],
content["data"]["id"],
{content["data"]["method"]: result, "code": 200})
self.__log.debug("method %s result is: %s", rpc['method'], result)
except Exception as e:
self.__log.exception(e)
self.__gateway.send_rpc_reply(content["device"], content["data"]["id"],
{"error": str(e), "code": 500})
else:
self.__log.error("Method %s not found for device %s", rpc_method, content["device"])
self.__gateway.send_rpc_reply(content["device"], content["data"]["id"],
{"error": str(e), "code": 500})
else:
self.__log.error("Method %s not found for device %s", rpc_method, content["device"])
{"error": "%s - Method not found" % rpc_method,
"code": 404})
else:
results = []
for device in self.__device_nodes:
content['device'] = device.name

arguments = content['data']['params']["arguments"]

try:
result = {}
task = self.__loop.create_task(
self.__call_method(device.path, rpc_method, arguments, result))

while not task.done():
sleep(.2)

results.append(result)
self.__log.debug("method %s result is: %s", rpc_method, result)
except Exception as e:
self.__log.exception(e)
self.__gateway.send_rpc_reply(content["device"], content["data"]["id"],
{"error": "%s - Method not found" % rpc_method,
"code": 404})
{"error": str(e), "code": 500})

return results
except Exception as e:
self.__log.exception(e)

Expand Down
2 changes: 1 addition & 1 deletion thingsboard_gateway/gateway/tb_gateway_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -1242,7 +1242,7 @@ def _rpc_request_handler(self, request_id, content):
connector_name)
content['id'] = request_id
result = self.available_connectors_by_name[connector_name].server_side_rpc_handler(content) # noqa E501
elif module == 'gateway' or module in self.__remote_shell.shell_commands:
elif module == 'gateway' or (self.__remote_shell and module in self.__remote_shell.shell_commands):
result = self.__rpc_gateway_processing(request_id, content)
else:
log.error("Connector \"%s\" not found", module)
Expand Down

0 comments on commit 68de3a9

Please sign in to comment.