From 3fdaba20b80c7eee56ecf63c365f1da47ee9625d Mon Sep 17 00:00:00 2001 From: Anmol Sachan Date: Mon, 28 May 2018 11:55:29 +0530 Subject: [PATCH] Changed Unmanage flow - incorporated review comments tendrl-bug-id: Tendrl/commons#841 bugzilla: 1583590 Signed-off-by: Anmol Sachan --- __init__.py | 0 tendrl/commons/flows/service_utils.py | 55 +++++ .../flows/unmanage_cluster/__init__.py | 209 +++++++++--------- .../atoms/unmanage_cluster/__init__.py | 4 +- .../stop_integration_services/__init__.py | 51 +---- .../stop_monitoring_services/__init__.py | 51 +---- 6 files changed, 169 insertions(+), 201 deletions(-) create mode 100644 __init__.py create mode 100644 tendrl/commons/flows/service_utils.py diff --git a/__init__.py b/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tendrl/commons/flows/service_utils.py b/tendrl/commons/flows/service_utils.py new file mode 100644 index 00000000..aff91936 --- /dev/null +++ b/tendrl/commons/flows/service_utils.py @@ -0,0 +1,55 @@ +""" This utility can be used to handle(start, stop, etc.) tendrl services. +""" + +from tendrl.commons.utils import cmd_utils +from tendrl.commons.utils import log_utils as logger + + +def stop_service(services, params): + for service in services: + srv = NS.tendrl.objects.Service(service=service) + if not srv.running: + logger.log( + "debug", + NS.publisher_id, + { + "message": "%s not running on " + "%s" % (service, NS.node_context.fqdn) + }, + job_id=params['job_id'], + flow_id=params['flow_id'], + ) + continue + + _cmd_str = "systemctl stop %s" % service + cmd = cmd_utils.Command(_cmd_str) + err, out, rc = cmd.run() + if err: + logger.log( + "error", + NS.publisher_id, + { + "message": "Could not stop %s" + " service. Error: %s" % (service, err) + }, + job_id=params['job_id'], + flow_id=params['flow_id'], + ) + return False + + _cmd_str = "systemctl disable %s" % service + cmd = cmd_utils.Command(_cmd_str) + err, out, rc = cmd.run() + if err: + logger.log( + "error", + NS.publisher_id, + { + "message": "Could not disable %s" + " service. Error: %s" % (service, err) + }, + job_id=params['job_id'], + flow_id=params['flow_id'], + ) + return False + return True diff --git a/tendrl/commons/flows/unmanage_cluster/__init__.py b/tendrl/commons/flows/unmanage_cluster/__init__.py index a4ee7b7b..ac131e9d 100644 --- a/tendrl/commons/flows/unmanage_cluster/__init__.py +++ b/tendrl/commons/flows/unmanage_cluster/__init__.py @@ -25,34 +25,29 @@ def run(self): raise FlowExecutionFailedError( "Cluster is already in un-managed state" ) - if (_cluster.status is not None and - _cluster.status != "" and - _cluster.current_job['status'] == 'in_progress' and - _cluster.status in - ["importing", "unmanaging", "expanding"]): - # Checking if the cluster is being unmanaged by the parent job - _job = NS.tendrl.objects.Job(job_id=self.job_id).load() - if 'parent' in _job.payload: - if _job.payload['parent'] != _cluster.locked_by['job_id']: + + try: + # To be executed on parent job accepting node. + if 'Node[]' not in self.parameters: + # Checking if another job is in progress or not on the cluster + if _cluster.current_job['status'] == 'in_progress' and \ + ( + 'job_id' in _cluster.locked_by and + _cluster.locked_by['job_id'] != "" + ) and ( + _cluster.status in ['importing', 'unmanaging', + 'expanding'] + ): raise FlowExecutionFailedError( - "Another job in progress for cluster, " - "please wait till the job finishes (job_id: %s) " - "(integration_id: %s) " % ( + "Another job in progress for cluster." + " Please wait till the job finishes " + "(job_id: %s) (integration_id: %s) " % + ( _cluster.current_job['job_id'], _cluster.integration_id ) ) - else: - raise FlowExecutionFailedError( - "Another job in progress for cluster, please wait till " - "the job finishes (job_id: %s) (integration_id: %s) " % ( - _cluster.current_job['job_id'], - _cluster.integration_id - ) - ) - - try: - if 'Node[]' not in self.parameters: + # Lock the cluster if no other job is being executed. _lock_details = { 'node_id': NS.node_context.node_id, 'fqdn': NS.node_context.fqdn, @@ -125,89 +120,21 @@ def run(self): continue # Deleting cluster details - etcd_keys_to_delete = [] - etcd_keys_to_delete.append( - "/clusters/%s/nodes" % integration_id + logger.log( + "info", + NS.publisher_id, + { + "message": "Deleting cluster details from etcd." + }, + job_id=self.parameters['job_id'], + flow_id=self.parameters['flow_id'], ) - etcd_keys_to_delete.append( - "/clusters/%s/Bricks" % integration_id - ) - etcd_keys_to_delete.append( - "/clusters/%s/Volumes" % integration_id - ) - etcd_keys_to_delete.append( - "/clusters/%s/GlobalDetails" % integration_id - ) - etcd_keys_to_delete.append( - "/clusters/%s/TendrlContext" % integration_id - ) - etcd_keys_to_delete.append( - "/clusters/%s/Utilization" % integration_id - ) - etcd_keys_to_delete.append( - "/clusters/%s/raw_map" % integration_id - ) - etcd_keys_to_delete.append( - "/alerting/clusters/%s" % integration_id - ) - nodes = etcd_utils.read( - "/clusters/%s/nodes" % integration_id - ) - node_ids = [] - for node in nodes.leaves: - node_id = node.key.split("/")[-1] - node_ids.append(node_id) - key = "/alerting/nodes/%s" % node_id - etcd_keys_to_delete.append( - key - ) - try: - # delete node alerts from /alerting/alerts - node_alerts = etcd_utils.read(key) - for node_alert in node_alerts.leaves: - etcd_keys_to_delete.append( - "/alerting/alerts/%s" % node_alert.key.split( - "/")[-1] - ) - except etcd.EtcdKeyNotFound: - # No node alerts, continue - pass - - # Find the alerting/alerts entries to be deleted - try: - cluster_alert_ids = etcd_utils.read( - "/alerting/clusters/%s" % integration_id - ) - for entry in cluster_alert_ids.leaves: - ca_id = entry.key.split("/")[-1] - etcd_keys_to_delete.append( - "/alerting/alerts/%s" % ca_id - ) - except etcd.EtcdKeyNotFound: - # No cluster alerts, continue - pass + self.delete_cluster_details(integration_id) - # Remove the cluster details - for key in list(set(etcd_keys_to_delete)): - try: - etcd_utils.delete(key, recursive=True) - except etcd.EtcdKeyNotFound: - logger.log( - "debug", - NS.publisher_id, - { - "message": "%s key not found for deletion" % - key - }, - job_id=self.parameters['job_id'], - flow_id=self.parameters['flow_id'], - ) - continue - # remove short name + # Marking job as finished _cluster = NS.tendrl.objects.Cluster( integration_id=integration_id ).load() - _cluster.short_name = "" _cluster.status = "" _cluster.is_managed = "no" _cluster.locked_by = {} @@ -240,3 +167,83 @@ def run(self): _cluster.errors = _errors _cluster.save() raise ex + + def delete_cluster_details(self, integration_id): + etcd_keys_to_delete = [] + etcd_keys_to_delete.append( + "/clusters/%s/nodes" % integration_id + ) + etcd_keys_to_delete.append( + "/clusters/%s/Bricks" % integration_id + ) + etcd_keys_to_delete.append( + "/clusters/%s/Volumes" % integration_id + ) + etcd_keys_to_delete.append( + "/clusters/%s/GlobalDetails" % integration_id + ) + etcd_keys_to_delete.append( + "/clusters/%s/TendrlContext" % integration_id + ) + etcd_keys_to_delete.append( + "/clusters/%s/Utilization" % integration_id + ) + etcd_keys_to_delete.append( + "/clusters/%s/raw_map" % integration_id + ) + etcd_keys_to_delete.append( + "/alerting/clusters/%s" % integration_id + ) + nodes = etcd_utils.read( + "/clusters/%s/nodes" % integration_id + ) + node_ids = [] + for node in nodes.leaves: + node_id = node.key.split("/")[-1] + node_ids.append(node_id) + key = "/alerting/nodes/%s" % node_id + etcd_keys_to_delete.append( + key + ) + try: + # delete node alerts from /alerting/alerts + node_alerts = etcd_utils.read(key) + for node_alert in node_alerts.leaves: + etcd_keys_to_delete.append( + "/alerting/alerts/%s" % node_alert.key.split( + "/")[-1] + ) + except etcd.EtcdKeyNotFound: + # No node alerts, continue + pass + + # Find the alerting/alerts entries to be deleted + try: + cluster_alert_ids = etcd_utils.read( + "/alerting/clusters/%s" % integration_id + ) + for entry in cluster_alert_ids.leaves: + ca_id = entry.key.split("/")[-1] + etcd_keys_to_delete.append( + "/alerting/alerts/%s" % ca_id + ) + except etcd.EtcdKeyNotFound: + # No cluster alerts, continue + pass + + # Remove the cluster details + for key in list(set(etcd_keys_to_delete)): + try: + etcd_utils.delete(key, recursive=True) + except etcd.EtcdKeyNotFound: + logger.log( + "debug", + NS.publisher_id, + { + "message": "%s key not found for deletion" % + key + }, + job_id=self.parameters['job_id'], + flow_id=self.parameters['flow_id'], + ) + continue diff --git a/tendrl/commons/objects/cluster/atoms/unmanage_cluster/__init__.py b/tendrl/commons/objects/cluster/atoms/unmanage_cluster/__init__.py index d5ec1bdf..d7e04b4c 100644 --- a/tendrl/commons/objects/cluster/atoms/unmanage_cluster/__init__.py +++ b/tendrl/commons/objects/cluster/atoms/unmanage_cluster/__init__.py @@ -87,8 +87,8 @@ def run(self): flow_id=self.parameters['flow_id'] ) loop_count = 0 - # Wait for (no of nodes) * 6 minutes for unmanage to complete - wait_count = (len(node_list) - 1) * 36 + # Wait for (no of nodes) * 1 minute for unmanage to complete + wait_count = (len(node_list) - 1) * 6 while True: parent_job = NS.tendrl.objects.Job( job_id=self.parameters['job_id'] diff --git a/tendrl/commons/objects/node/atoms/stop_integration_services/__init__.py b/tendrl/commons/objects/node/atoms/stop_integration_services/__init__.py index b0a79cdc..c8d19add 100644 --- a/tendrl/commons/objects/node/atoms/stop_integration_services/__init__.py +++ b/tendrl/commons/objects/node/atoms/stop_integration_services/__init__.py @@ -1,6 +1,5 @@ +from tendrl.commons.flows import service_utils from tendrl.commons import objects -from tendrl.commons.utils import cmd_utils -from tendrl.commons.utils import log_utils as logger class StopIntegrationServices(objects.BaseAtom): @@ -9,50 +8,4 @@ def __init__(self, *args, **kwargs): def run(self): services = ["tendrl-gluster-integration"] - for service in services: - srv = NS.tendrl.objects.Service(service=service) - if not srv.running: - logger.log( - "debug", - NS.publisher_id, - { - "message": "%s not running on " - "%s" % (service, NS.node_context.fqdn) - }, - job_id=self.parameters['job_id'], - flow_id=self.parameters['flow_id'], - ) - continue - - _cmd_str = "systemctl stop %s" % service - cmd = cmd_utils.Command(_cmd_str) - err, out, rc = cmd.run() - if err: - logger.log( - "error", - NS.publisher_id, - { - "message": "Could not stop %s" - " service. Error: %s" % (service, err) - }, - job_id=self.parameters['job_id'], - flow_id=self.parameters['flow_id'], - ) - return False - - _cmd_str = "systemctl disable %s" % service - cmd = cmd_utils.Command(_cmd_str) - err, out, rc = cmd.run() - if err: - logger.log( - "error", - NS.publisher_id, - { - "message": "Could not disable %s" - " service. Error: %s" % (service, err) - }, - job_id=self.parameters['job_id'], - flow_id=self.parameters['flow_id'], - ) - return False - return True + return service_utils.stop_service(services, self.parameters) diff --git a/tendrl/commons/objects/node/atoms/stop_monitoring_services/__init__.py b/tendrl/commons/objects/node/atoms/stop_monitoring_services/__init__.py index d3114dca..b1f566b7 100644 --- a/tendrl/commons/objects/node/atoms/stop_monitoring_services/__init__.py +++ b/tendrl/commons/objects/node/atoms/stop_monitoring_services/__init__.py @@ -1,6 +1,5 @@ +from tendrl.commons.flows import service_utils from tendrl.commons import objects -from tendrl.commons.utils import cmd_utils -from tendrl.commons.utils import log_utils as logger class StopMonitoringServices(objects.BaseAtom): @@ -9,50 +8,4 @@ def __init__(self, *args, **kwargs): def run(self): services = ["collectd"] - for service in services: - srv = NS.tendrl.objects.Service(service=service) - if not srv.running: - logger.log( - "debug", - NS.publisher_id, - { - "message": "%s not running on " - "%s" % (service, NS.node_context.fqdn) - }, - job_id=self.parameters['job_id'], - flow_id=self.parameters['flow_id'], - ) - continue - - _cmd_str = "systemctl stop %s" % service - cmd = cmd_utils.Command(_cmd_str) - err, out, rc = cmd.run() - if err: - logger.log( - "error", - NS.publisher_id, - { - "message": "Could not stop %s" - " service. Error: %s" % (service, err) - }, - job_id=self.parameters['job_id'], - flow_id=self.parameters['flow_id'], - ) - return False - - _cmd_str = "systemctl disable %s" % service - cmd = cmd_utils.Command(_cmd_str) - err, out, rc = cmd.run() - if err: - logger.log( - "error", - NS.publisher_id, - { - "message": "Could not disable %s" - " service. Error: %s" % (service, err) - }, - job_id=self.parameters['job_id'], - flow_id=self.parameters['flow_id'], - ) - return False - return True + return service_utils.stop_service(services, self.parameters)