Skip to content

Commit ac11412

Browse files
committed
Added handling rpc methods to connectors on the new UI
1 parent 41b3166 commit ac11412

File tree

4 files changed

+150
-97
lines changed

4 files changed

+150
-97
lines changed

thingsboard_gateway/connectors/modbus/modbus_connector.py

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -642,7 +642,7 @@ def server_side_rpc_handler(self, server_rpc_request):
642642
if connector_type == self._connector_type:
643643
rpc_method = rpc_method_name
644644
server_rpc_request['device'] = server_rpc_request['params'].split(' ')[0].split('=')[-1]
645-
except (IndexError, ValueError):
645+
except (IndexError, ValueError, AttributeError):
646646
pass
647647

648648
if server_rpc_request.get(DEVICE_SECTION_PARAMETER) is not None:
@@ -679,17 +679,24 @@ def server_side_rpc_handler(self, server_rpc_request):
679679
break
680680
else:
681681
self.__log.error("Received rpc request, but method %s not found in config for %s.",
682-
rpc_method,
683-
self.get_name())
682+
rpc_method,
683+
self.get_name())
684684
self.__gateway.send_rpc_reply(server_rpc_request[DEVICE_SECTION_PARAMETER],
685685
server_rpc_request[DATA_PARAMETER][RPC_ID_PARAMETER],
686686
{rpc_method: "METHOD NOT FOUND!"})
687687
else:
688688
self.__log.debug("Received RPC to connector: %r", server_rpc_request)
689+
results = []
690+
for device in self.__slaves:
691+
server_rpc_request[DEVICE_SECTION_PARAMETER] = device.device_name
692+
results.append(self.__process_request(server_rpc_request, server_rpc_request['params'], return_result=True))
693+
694+
return results
695+
689696
except Exception as e:
690697
self.__log.exception(e)
691698

692-
def __process_request(self, content, rpc_command_config, request_type='RPC'):
699+
def __process_request(self, content, rpc_command_config, request_type='RPC', return_result=False):
693700
self.__log.debug('Processing %s request', request_type)
694701
if rpc_command_config is not None:
695702
device = ModbusConnector.__get_device_by_name(content[DEVICE_SECTION_PARAMETER], self.__slaves)
@@ -745,16 +752,33 @@ def __process_request(self, content, rpc_command_config, request_type='RPC'):
745752
if content.get(RPC_ID_PARAMETER) or (content.get(DATA_PARAMETER) is not None
746753
and content[DATA_PARAMETER].get(RPC_ID_PARAMETER)) is not None:
747754
if isinstance(response, Exception) or isinstance(response, ExceptionResponse):
748-
self.__gateway.send_rpc_reply(device=content[DEVICE_SECTION_PARAMETER],
749-
req_id=content[DATA_PARAMETER].get(RPC_ID_PARAMETER),
750-
content={
751-
content[DATA_PARAMETER][RPC_METHOD_PARAMETER]: str(response)
752-
},
753-
success_sent=False)
755+
if not return_result:
756+
self.__gateway.send_rpc_reply(device=content[DEVICE_SECTION_PARAMETER],
757+
req_id=content[DATA_PARAMETER].get(RPC_ID_PARAMETER),
758+
content={
759+
content[DATA_PARAMETER][RPC_METHOD_PARAMETER]: str(response)
760+
},
761+
success_sent=False)
762+
else:
763+
return {
764+
'device': content[DEVICE_SECTION_PARAMETER],
765+
'req_id': content[DATA_PARAMETER].get(RPC_ID_PARAMETER),
766+
'content': {
767+
content[DATA_PARAMETER][RPC_METHOD_PARAMETER]: str(response)
768+
},
769+
'success_sent': False
770+
}
754771
else:
755-
self.__gateway.send_rpc_reply(device=content[DEVICE_SECTION_PARAMETER],
756-
req_id=content[DATA_PARAMETER].get(RPC_ID_PARAMETER),
757-
content=response)
772+
if not return_result:
773+
self.__gateway.send_rpc_reply(device=content[DEVICE_SECTION_PARAMETER],
774+
req_id=content[DATA_PARAMETER].get(RPC_ID_PARAMETER),
775+
content=response)
776+
else:
777+
return {
778+
'device': content[DEVICE_SECTION_PARAMETER],
779+
'req_id': content[DATA_PARAMETER].get(RPC_ID_PARAMETER),
780+
'content': response
781+
}
758782

759783
self.__log.debug("%r", response)
760784

thingsboard_gateway/connectors/mqtt/mqtt_connector.py

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -853,7 +853,7 @@ def __process_rpc_request(self, content, rpc_config):
853853
.replace("${methodName}", str(content['data']['method'])) \
854854
.replace("${requestId}", str(content["data"]["id"]))
855855

856-
if content['device']:
856+
if content.get('device'):
857857
request_topic = request_topic.replace("${deviceName}", str(content["device"]))
858858

859859
request_topic = TBUtility.replace_params_tags(request_topic, content)
@@ -882,7 +882,7 @@ def __process_rpc_request(self, content, rpc_config):
882882
return
883883
if not expects_response or not defines_timeout:
884884
self.__log.info("One-way RPC: sending ack to ThingsBoard immediately")
885-
self.__gateway.send_rpc_reply(device=content["device"], req_id=content["data"]["id"],
885+
self.__gateway.send_rpc_reply(device=content.get('device', ''), req_id=content["data"]["id"],
886886
success_sent=True)
887887

888888
# Everything went out smoothly: RPC is served
@@ -907,28 +907,31 @@ def server_side_rpc_handler(self, content):
907907
except ValueError:
908908
pass
909909

910-
# check if RPC method is reserved get/set
911-
if rpc_method == 'get' or rpc_method == 'set':
912-
params = {}
913-
for param in content['data']['params'].split(';'):
914-
try:
915-
(key, value) = param.split('=')
916-
except ValueError:
917-
continue
910+
if content.get('device'):
911+
# check if RPC method is reserved get/set
912+
if rpc_method == 'get' or rpc_method == 'set':
913+
params = {}
914+
for param in content['data']['params'].split(';'):
915+
try:
916+
(key, value) = param.split('=')
917+
except ValueError:
918+
continue
918919

919-
if key and value:
920-
params[key] = value
920+
if key and value:
921+
params[key] = value
921922

922-
return self.__process_rpc_request(content, params)
923-
else:
924-
# Check whether one of my RPC handlers can handle this request
925-
for rpc_config in self.__server_side_rpc:
926-
if search(rpc_config["deviceNameFilter"], content["device"]) \
927-
and search(rpc_config["methodFilter"], rpc_method) is not None:
923+
return self.__process_rpc_request(content, params)
924+
else:
925+
# Check whether one of my RPC handlers can handle this request
926+
for rpc_config in self.__server_side_rpc:
927+
if search(rpc_config["deviceNameFilter"], content["device"]) \
928+
and search(rpc_config["methodFilter"], rpc_method) is not None:
928929

929-
return self.__process_rpc_request(content, rpc_config)
930+
return self.__process_rpc_request(content, rpc_config)
930931

931-
self.__log.error("RPC not handled: %s", content)
932+
self.__log.error("RPC not handled: %s", content)
933+
else:
934+
return self.__process_rpc_request(content, content['data']['params'])
932935

933936
@CustomCollectStatistics(start_stat_type='allBytesSentToDevices')
934937
def _publish(self, request_topic, data_to_send, retain):

thingsboard_gateway/connectors/opcua/opcua_connector.py

Lines changed: 89 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,7 @@ def on_attributes_update(self, content):
516516
def server_side_rpc_handler(self, content):
517517
try:
518518
if content.get('data') is None:
519-
content['data'] = {'params': content['params'], 'method': content['method']}
519+
content['data'] = {'params': content['params'], 'method': content['method'], 'id': content['id']}
520520

521521
rpc_method = content["data"].get("method")
522522

@@ -528,76 +528,102 @@ def server_side_rpc_handler(self, content):
528528
content['device'] = content['params'].split(' ')[0].split('=')[-1]
529529
except (ValueError, IndexError):
530530
self.__log.error('Invalid RPC method name: %s', rpc_method)
531+
except AttributeError:
532+
pass
531533

532-
# firstly check if a method is not service
533-
if rpc_method == 'set' or rpc_method == 'get':
534-
full_path = ''
535-
args_list = []
536-
device = content.get('device')
534+
if content.get('device'):
535+
# firstly check if a method is not service
536+
if rpc_method == 'set' or rpc_method == 'get':
537+
full_path = ''
538+
args_list = []
539+
device = content.get('device')
537540

538-
try:
539-
args_list = content['data']['params'].split(';')
541+
try:
542+
args_list = content['data']['params'].split(';')
543+
544+
if 'ns' in content['data']['params']:
545+
full_path = ';'.join(
546+
[item for item in (args_list[0:-1] if rpc_method == 'set' else args_list)])
547+
else:
548+
full_path = args_list[0].split('=')[-1]
549+
except IndexError:
550+
self.__log.error('Not enough arguments. Expected min 2.')
551+
self.__gateway.send_rpc_reply(device=device,
552+
req_id=content['data'].get('id'),
553+
content={content['data'][
554+
'method']: 'Not enough arguments. Expected min 2.',
555+
'code': 400})
556+
557+
result = {}
558+
if rpc_method == 'get':
559+
task = self.__loop.create_task(self.__read_value(full_path, result))
560+
561+
while not task.done():
562+
sleep(.2)
563+
elif rpc_method == 'set':
564+
value = args_list[2].split('=')[-1]
565+
task = self.__loop.create_task(self.__write_value(full_path, value, result))
566+
567+
while not task.done():
568+
sleep(.2)
540569

541-
if 'ns' in content['data']['params']:
542-
full_path = ';'.join([item for item in (args_list[0:-1] if rpc_method == 'set' else args_list)])
543-
else:
544-
full_path = args_list[0].split('=')[-1]
545-
except IndexError:
546-
self.__log.error('Not enough arguments. Expected min 2.')
547570
self.__gateway.send_rpc_reply(device=device,
548571
req_id=content['data'].get('id'),
549-
content={content['data'][
550-
'method']: 'Not enough arguments. Expected min 2.',
551-
'code': 400})
552-
553-
result = {}
554-
if rpc_method == 'get':
555-
task = self.__loop.create_task(self.__read_value(full_path, result))
556-
557-
while not task.done():
558-
sleep(.2)
559-
elif rpc_method == 'set':
560-
value = args_list[2].split('=')[-1]
561-
task = self.__loop.create_task(self.__write_value(full_path, value, result))
562-
563-
while not task.done():
564-
sleep(.2)
565-
566-
self.__gateway.send_rpc_reply(device=device,
567-
req_id=content['data'].get('id'),
568-
content={content['data']['method']: result})
569-
else:
570-
device = tuple(filter(lambda i: i.name == content['device'], self.__device_nodes))[0]
571-
572-
for rpc in device.config['rpc_methods']:
573-
if rpc['method'] == content["data"]['method']:
574-
arguments_from_config = rpc["arguments"]
575-
arguments = content["data"].get("params") if content["data"].get(
576-
"params") is not None else arguments_from_config
577-
method_name = content['data']['method']
578-
579-
try:
580-
result = {}
581-
task = self.__loop.create_task(
582-
self.__call_method(device.path, method_name, arguments, result))
583-
584-
while not task.done():
585-
sleep(.2)
586-
587-
self.__gateway.send_rpc_reply(content["device"],
588-
content["data"]["id"],
589-
{content["data"]["method"]: result, "code": 200})
590-
self.__log.debug("method %s result is: %s", rpc['method'], result)
591-
except Exception as e:
592-
self.__log.exception(e)
572+
content={content['data']['method']: result})
573+
else:
574+
device = tuple(filter(lambda i: i.name == content['device'], self.__device_nodes))[0]
575+
576+
for rpc in device.config['rpc_methods']:
577+
if rpc['method'] == content["data"]['method']:
578+
arguments_from_config = rpc["arguments"]
579+
arguments = content["data"].get("params") if content["data"].get(
580+
"params") is not None else arguments_from_config
581+
method_name = content['data']['method']
582+
583+
try:
584+
result = {}
585+
task = self.__loop.create_task(
586+
self.__call_method(device.path, method_name, arguments, result))
587+
588+
while not task.done():
589+
sleep(.2)
590+
591+
self.__gateway.send_rpc_reply(content["device"],
592+
content["data"]["id"],
593+
{content["data"]["method"]: result, "code": 200})
594+
self.__log.debug("method %s result is: %s", rpc['method'], result)
595+
except Exception as e:
596+
self.__log.exception(e)
597+
self.__gateway.send_rpc_reply(content["device"], content["data"]["id"],
598+
{"error": str(e), "code": 500})
599+
else:
600+
self.__log.error("Method %s not found for device %s", rpc_method, content["device"])
593601
self.__gateway.send_rpc_reply(content["device"], content["data"]["id"],
594-
{"error": str(e), "code": 500})
595-
else:
596-
self.__log.error("Method %s not found for device %s", rpc_method, content["device"])
602+
{"error": "%s - Method not found" % rpc_method,
603+
"code": 404})
604+
else:
605+
results = []
606+
for device in self.__device_nodes:
607+
content['device'] = device.name
608+
609+
arguments = content['data']['params']["arguments"]
610+
611+
try:
612+
result = {}
613+
task = self.__loop.create_task(
614+
self.__call_method(device.path, rpc_method, arguments, result))
615+
616+
while not task.done():
617+
sleep(.2)
618+
619+
results.append(result)
620+
self.__log.debug("method %s result is: %s", rpc_method, result)
621+
except Exception as e:
622+
self.__log.exception(e)
597623
self.__gateway.send_rpc_reply(content["device"], content["data"]["id"],
598-
{"error": "%s - Method not found" % rpc_method,
599-
"code": 404})
624+
{"error": str(e), "code": 500})
600625

626+
return results
601627
except Exception as e:
602628
self.__log.exception(e)
603629

thingsboard_gateway/gateway/tb_gateway_service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1242,7 +1242,7 @@ def _rpc_request_handler(self, request_id, content):
12421242
connector_name)
12431243
content['id'] = request_id
12441244
result = self.available_connectors_by_name[connector_name].server_side_rpc_handler(content) # noqa E501
1245-
elif module == 'gateway' or module in self.__remote_shell.shell_commands:
1245+
elif module == 'gateway' or (self.__remote_shell and module in self.__remote_shell.shell_commands):
12461246
result = self.__rpc_gateway_processing(request_id, content)
12471247
else:
12481248
log.error("Connector \"%s\" not found", module)

0 commit comments

Comments
 (0)