diff --git a/control/cli.py b/control/cli.py index b61e5f934..644048587 100644 --- a/control/cli.py +++ b/control/cli.py @@ -187,8 +187,8 @@ def delete_bdev(self, args): argument("-n", "--subnqn", help="Subsystem NQN", required=True), argument("-s", "--serial", help="Serial number", required=False), argument("-m", "--max-namespaces", help="Maximum number of namespaces", type=int, default=0, required=False), - argument("-a", "--ana-reporting", help="Enable ANA reporting", type=bool, default=False, required=False), - argument("-t", "--enable-ha", help="Enable automatic HA" , type=bool, default=False, required=False), + argument("-a", "--ana-reporting", help="Enable ANA reporting", action='store_true', required=False), + argument("-t", "--enable-ha", help="Enable automatic HA", action='store_true', required=False), ]) def create_subsystem(self, args): diff --git a/control/grpc.py b/control/grpc.py index 0f452d62f..49722591c 100644 --- a/control/grpc.py +++ b/control/grpc.py @@ -23,6 +23,7 @@ from google.protobuf import json_format from .proto import gateway_pb2 as pb2 from .proto import gateway_pb2_grpc as pb2_grpc +from typing import Dict MAX_ANA_GROUPS = 4 @@ -126,6 +127,10 @@ def _alloc_cluster(self) -> str: ) return name + def get_state(self) -> Dict[str, str]: + local_state_dict = self.gateway_state.local.get_state() + return local_state_dict + def create_bdev_safe(self, request, context=None): """Creates a bdev from an RBD image.""" @@ -171,10 +176,9 @@ def create_bdev(self, request, context=None): with self.rpc_lock: return self.create_bdev_safe(request, context) - def find_bdev_namespaces(self, bdev_name): + def get_bdev_namespaces(self, bdev_name) -> list: ns_list = [] - local_state_dict = self.gateway_state.local.get_state() - local_state_keys = local_state_dict.keys() + local_state_dict = self.get_state() for key, val in local_state_dict.items(): if key.startswith(self.gateway_state.local.NAMESPACE_PREFIX): try: @@ -201,9 +205,11 @@ def delete_bdev_safe(self, request, context=None): """Deletes a bdev.""" self.logger.info(f"Received request to delete bdev {request.bdev_name}") - ns_list = self.find_bdev_namespaces(request.bdev_name) + ns_list = [] + if context: + ns_list = self.get_bdev_namespaces(request.bdev_name) for namespace in ns_list: - # We found a namespace still using this bdev. If --force was used we will try to remove this namespace. + # We found a namespace still using this bdev. If --force was used we will try to remove the namespace from OMAP. # Otherwise fail with EBUSY try: ns_nsid = namespace["nsid"] @@ -215,10 +221,8 @@ def delete_bdev_safe(self, request, context=None): if request.force: self.logger.info(f"Will remove namespace {ns_nsid} from {ns_nqn} as it is using bdev {request.bdev_name}") try: - req_rm_ns = pb2.remove_namespace_req(subsystem_nqn=ns_nqn, nsid=ns_nsid) - # We already hold the lock, so call the safe version, do not try to lock again - ret = self.remove_namespace_safe(req_rm_ns, context) - self.logger.info(f"Removed namespace {ns_nsid} from {ns_nqn}: {ret.status}") + self.gateway_state.remove_namespace(ns_nqn, str(ns_nsid)) + self.logger.info(f"Removed namespace {ns_nsid} from {ns_nqn}") except Exception as ex: self.logger.error(f"Error removing namespace {ns_nsid} from {ns_nqn}, will delete bdev {request.bdev_name} anyway: {ex}") pass @@ -412,10 +416,34 @@ def remove_namespace(self, request, context=None): with self.rpc_lock: return self.remove_namespace_safe(request, context) + def matching_host_exist(self, subsys_nqn, host_nqn) -> bool: + host_key = "_".join([self.gateway_state.local.HOST_PREFIX + subsys_nqn, host_nqn]) + if self.get_state().get(host_key): + return True + else: + return False + def add_host_safe(self, request, context=None): """Adds a host to a subsystem.""" try: + host_already_exist = self.matching_host_exist(request.subsystem_nqn, request.host_nqn) + if host_already_exist: + if request.host_nqn == "*": + self.logger.error(f"All hosts already allowed to {request.subsystem_nqn}") + req = {"subsystem_nqn": request.subsystem_nqn, "host_nqn": request.host_nqn, + "method": "nvmf_subsystem_allow_any_host", "req_id": 0} + ret = {"code": -errno.EEXIST, "message": f"All hosts already allowed to {request.subsystem_nqn}"} + else: + self.logger.error(f"Host {request.host_nqn} already added to {request.subsystem_nqn}") + req = {"subsystem_nqn": request.subsystem_nqn, "host_nqn": request.host_nqn, + "method": "nvmf_subsystem_add_host", "req_id": 0} + ret = {"code": -errno.EEXIST, "message": f"Host {request.host_nqn} already added to {request.subsystem_nqn}"} + msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2), + "Got JSON-RPC error response", + "response:", + json.dumps(ret, indent=2)]) + raise Exception(msg) if request.host_nqn == "*": # Allow any host access to subsystem self.logger.info(f"Received request to allow any host to" f" {request.subsystem_nqn}") @@ -506,6 +534,13 @@ def remove_host(self, request, context=None): with self.rpc_lock: return self.remove_host_safe(request, context) + def matching_listener_exist(self, nqn, gw_name, trtype, traddr, trsvcid) -> bool: + listener_key = "_".join([self.gateway_state.local.LISTENER_PREFIX + nqn, gw_name, trtype, traddr, trsvcid]) + if self.get_state().get(listener_key): + return True + else: + return False + def create_listener_safe(self, request, context=None): """Creates a listener for a subsystem at a given IP/Port.""" ret = True @@ -514,6 +549,20 @@ def create_listener_safe(self, request, context=None): f" {request.traddr}:{request.trsvcid}.") try: if request.gateway_name == self.gateway_name: + listener_already_exist = self.matching_listener_exist( + request.nqn, request.gateway_name, request.trtype, request.traddr, request.trsvcid) + if listener_already_exist: + self.logger.error(f"{request.nqn} already listens on address {request.traddr} port {request.trsvcid}") + req = {"nqn": request.nqn, "trtype": request.trtype, "traddr": request.traddr, + "gateway_name": request.gateway_name, + "trsvcid": request.trsvcid, "adrfam": request.adrfam, + "method": "nvmf_subsystem_add_listener", "req_id": 0} + ret = {"code": -errno.EEXIST, "message": f"{request.nqn} already listens on address {request.traddr} port {request.trsvcid}"} + msg = "\n".join(["request:", "%s" % json.dumps(req, indent=2), + "Got JSON-RPC error response", + "response:", + json.dumps(ret, indent=2)]) + raise Exception(msg) ret = rpc_nvmf.nvmf_subsystem_add_listener( self.spdk_rpc_client, nqn=request.nqn, @@ -533,15 +582,21 @@ def create_listener_safe(self, request, context=None): context.set_details(f"{ex}") return pb2.req_status() - state = self.gateway_state.omap.get_state() - for key,val in state.items(): - if (key.startswith(self.gateway_state.omap.SUBSYSTEM_PREFIX + request.nqn)): - self.logger.debug(f"values of key: {key} val: {val} \n") - req = json_format.Parse(val, pb2.create_subsystem_req()) - self.logger.info(f" enable_ha :{req.enable_ha} \n") - break + state = self.get_state() + req = None + subsys = state.get(self.gateway_state.local.SUBSYSTEM_PREFIX + request.nqn) + if subsys: + self.logger.debug(f"value of sub-system: {subsys}") + try: + req = json_format.Parse(subsys, pb2.create_subsystem_req()) + self.logger.info(f"enable_ha: {req.enable_ha}") + except Exception: + self.logger.error(f"Got exception trying to parse subsystem: {ex}") + pass + else: + self.logger.info(f"No sub-system for {request.nqn}") - if req.enable_ha: + if req and req.enable_ha: for x in range (MAX_ANA_GROUPS): try: ret = rpc_nvmf.nvmf_subsystem_listener_set_ana_state( diff --git a/tests/test_cli.py b/tests/test_cli.py index a8753b074..d883277c1 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -108,7 +108,7 @@ def test_create_bdev_ana(self, caplog, gateway): def test_create_subsystem_ana(self, caplog, gateway): - cli(["create_subsystem", "-n", subsystem, "-a", "true", "-t", "true"]) + cli(["create_subsystem", "-n", subsystem, "-a", "-t"]) assert "Failed to create" not in caplog.text cli(["get_subsystems"]) assert serial not in caplog.text