Skip to content

Commit

Permalink
Changed Unmanage flow - incorporated review comments
Browse files Browse the repository at this point in the history
tendrl-bug-id: #841
Signed-off-by: Anmol Sachan <anmol13694@gmail.com>
  • Loading branch information
anmolsachan committed May 28, 2018
1 parent 377ed71 commit ae376ce
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 201 deletions.
Empty file added __init__.py
Empty file.
55 changes: 55 additions & 0 deletions tendrl/commons/flows/service_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
""" This utility can be used to handle(start, stop, etc.) tendrl services.
"""

from tendrl.commons.utils import cmd_utils
from tendrl.commons.utils import log_utils as logger


def stop_service(services, params):
for service in services:
srv = NS.tendrl.objects.Service(service=service)
if not srv.running:
logger.log(
"debug",
NS.publisher_id,
{
"message": "%s not running on "
"%s" % (service, NS.node_context.fqdn)
},
job_id=params['job_id'],
flow_id=params['flow_id'],
)
continue

_cmd_str = "systemctl stop %s" % service
cmd = cmd_utils.Command(_cmd_str)
err, out, rc = cmd.run()
if err:
logger.log(
"error",
NS.publisher_id,
{
"message": "Could not stop %s"
" service. Error: %s" % (service, err)
},
job_id=params['job_id'],
flow_id=params['flow_id'],
)
return False

_cmd_str = "systemctl disable %s" % service
cmd = cmd_utils.Command(_cmd_str)
err, out, rc = cmd.run()
if err:
logger.log(
"error",
NS.publisher_id,
{
"message": "Could not disable %s"
" service. Error: %s" % (service, err)
},
job_id=params['job_id'],
flow_id=params['flow_id'],
)
return False
return True
209 changes: 108 additions & 101 deletions tendrl/commons/flows/unmanage_cluster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,29 @@ def run(self):
raise FlowExecutionFailedError(
"Cluster is already in un-managed state"
)
if (_cluster.status is not None and
_cluster.status != "" and
_cluster.current_job['status'] == 'in_progress' and
_cluster.status in
["importing", "unmanaging", "expanding"]):
# Checking if the cluster is being unmanaged by the parent job
_job = NS.tendrl.objects.Job(job_id=self.job_id).load()
if 'parent' in _job.payload:
if _job.payload['parent'] != _cluster.locked_by['job_id']:

try:
# To be executed on parent job accepting node.
if 'Node[]' not in self.parameters:
# Checking if another job is in progress or not on the cluster
if _cluster.current_job['status'] == 'in_progress' and \
(
'job_id' in _cluster.locked_by and
_cluster.locked_by['job_id'] != ""
) and (
_cluster.status in ['importing', 'unmanaging',
'expanding']
):
raise FlowExecutionFailedError(
"Another job in progress for cluster, "
"please wait till the job finishes (job_id: %s) "
"(integration_id: %s) " % (
"Another job in progress for cluster."
" Please wait till the job finishes "
"(job_id: %s) (integration_id: %s) " %
(
_cluster.current_job['job_id'],
_cluster.integration_id
)
)
else:
raise FlowExecutionFailedError(
"Another job in progress for cluster, please wait till "
"the job finishes (job_id: %s) (integration_id: %s) " % (
_cluster.current_job['job_id'],
_cluster.integration_id
)
)

try:
if 'Node[]' not in self.parameters:
# Lock the cluster if no other job is being executed.
_lock_details = {
'node_id': NS.node_context.node_id,
'fqdn': NS.node_context.fqdn,
Expand Down Expand Up @@ -125,89 +120,21 @@ def run(self):
continue

# Deleting cluster details
etcd_keys_to_delete = []
etcd_keys_to_delete.append(
"/clusters/%s/nodes" % integration_id
logger.log(
"info",
NS.publisher_id,
{
"message": "Deleting cluster details from etcd."
},
job_id=self.parameters['job_id'],
flow_id=self.parameters['flow_id'],
)
etcd_keys_to_delete.append(
"/clusters/%s/Bricks" % integration_id
)
etcd_keys_to_delete.append(
"/clusters/%s/Volumes" % integration_id
)
etcd_keys_to_delete.append(
"/clusters/%s/GlobalDetails" % integration_id
)
etcd_keys_to_delete.append(
"/clusters/%s/TendrlContext" % integration_id
)
etcd_keys_to_delete.append(
"/clusters/%s/Utilization" % integration_id
)
etcd_keys_to_delete.append(
"/clusters/%s/raw_map" % integration_id
)
etcd_keys_to_delete.append(
"/alerting/clusters/%s" % integration_id
)
nodes = etcd_utils.read(
"/clusters/%s/nodes" % integration_id
)
node_ids = []
for node in nodes.leaves:
node_id = node.key.split("/")[-1]
node_ids.append(node_id)
key = "/alerting/nodes/%s" % node_id
etcd_keys_to_delete.append(
key
)
try:
# delete node alerts from /alerting/alerts
node_alerts = etcd_utils.read(key)
for node_alert in node_alerts.leaves:
etcd_keys_to_delete.append(
"/alerting/alerts/%s" % node_alert.key.split(
"/")[-1]
)
except etcd.EtcdKeyNotFound:
# No node alerts, continue
pass

# Find the alerting/alerts entries to be deleted
try:
cluster_alert_ids = etcd_utils.read(
"/alerting/clusters/%s" % integration_id
)
for entry in cluster_alert_ids.leaves:
ca_id = entry.key.split("/")[-1]
etcd_keys_to_delete.append(
"/alerting/alerts/%s" % ca_id
)
except etcd.EtcdKeyNotFound:
# No cluster alerts, continue
pass
self.delete_cluster_details(integration_id)

# Remove the cluster details
for key in list(set(etcd_keys_to_delete)):
try:
etcd_utils.delete(key, recursive=True)
except etcd.EtcdKeyNotFound:
logger.log(
"debug",
NS.publisher_id,
{
"message": "%s key not found for deletion" %
key
},
job_id=self.parameters['job_id'],
flow_id=self.parameters['flow_id'],
)
continue
# remove short name
# Marking job as finished
_cluster = NS.tendrl.objects.Cluster(
integration_id=integration_id
).load()
_cluster.short_name = ""
_cluster.status = ""
_cluster.is_managed = "no"
_cluster.locked_by = {}
Expand Down Expand Up @@ -240,3 +167,83 @@ def run(self):
_cluster.errors = _errors
_cluster.save()
raise ex

def delete_cluster_details(self, integration_id):
etcd_keys_to_delete = []
etcd_keys_to_delete.append(
"/clusters/%s/nodes" % integration_id
)
etcd_keys_to_delete.append(
"/clusters/%s/Bricks" % integration_id
)
etcd_keys_to_delete.append(
"/clusters/%s/Volumes" % integration_id
)
etcd_keys_to_delete.append(
"/clusters/%s/GlobalDetails" % integration_id
)
etcd_keys_to_delete.append(
"/clusters/%s/TendrlContext" % integration_id
)
etcd_keys_to_delete.append(
"/clusters/%s/Utilization" % integration_id
)
etcd_keys_to_delete.append(
"/clusters/%s/raw_map" % integration_id
)
etcd_keys_to_delete.append(
"/alerting/clusters/%s" % integration_id
)
nodes = etcd_utils.read(
"/clusters/%s/nodes" % integration_id
)
node_ids = []
for node in nodes.leaves:
node_id = node.key.split("/")[-1]
node_ids.append(node_id)
key = "/alerting/nodes/%s" % node_id
etcd_keys_to_delete.append(
key
)
try:
# delete node alerts from /alerting/alerts
node_alerts = etcd_utils.read(key)
for node_alert in node_alerts.leaves:
etcd_keys_to_delete.append(
"/alerting/alerts/%s" % node_alert.key.split(
"/")[-1]
)
except etcd.EtcdKeyNotFound:
# No node alerts, continue
pass

# Find the alerting/alerts entries to be deleted
try:
cluster_alert_ids = etcd_utils.read(
"/alerting/clusters/%s" % integration_id
)
for entry in cluster_alert_ids.leaves:
ca_id = entry.key.split("/")[-1]
etcd_keys_to_delete.append(
"/alerting/alerts/%s" % ca_id
)
except etcd.EtcdKeyNotFound:
# No cluster alerts, continue
pass

# Remove the cluster details
for key in list(set(etcd_keys_to_delete)):
try:
etcd_utils.delete(key, recursive=True)
except etcd.EtcdKeyNotFound:
logger.log(
"debug",
NS.publisher_id,
{
"message": "%s key not found for deletion" %
key
},
job_id=self.parameters['job_id'],
flow_id=self.parameters['flow_id'],
)
continue
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ def run(self):
flow_id=self.parameters['flow_id']
)
loop_count = 0
# Wait for (no of nodes) * 6 minutes for unmanage to complete
wait_count = (len(node_list) - 1) * 36
# Wait for (no of nodes) * 1 minute for unmanage to complete
wait_count = (len(node_list) - 1) * 6
while True:
parent_job = NS.tendrl.objects.Job(
job_id=self.parameters['job_id']
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from tendrl.commons.flows import service_utils
from tendrl.commons import objects
from tendrl.commons.utils import cmd_utils
from tendrl.commons.utils import log_utils as logger


class StopIntegrationServices(objects.BaseAtom):
Expand All @@ -9,50 +8,4 @@ def __init__(self, *args, **kwargs):

def run(self):
services = ["tendrl-gluster-integration"]
for service in services:
srv = NS.tendrl.objects.Service(service=service)
if not srv.running:
logger.log(
"debug",
NS.publisher_id,
{
"message": "%s not running on "
"%s" % (service, NS.node_context.fqdn)
},
job_id=self.parameters['job_id'],
flow_id=self.parameters['flow_id'],
)
continue

_cmd_str = "systemctl stop %s" % service
cmd = cmd_utils.Command(_cmd_str)
err, out, rc = cmd.run()
if err:
logger.log(
"error",
NS.publisher_id,
{
"message": "Could not stop %s"
" service. Error: %s" % (service, err)
},
job_id=self.parameters['job_id'],
flow_id=self.parameters['flow_id'],
)
return False

_cmd_str = "systemctl disable %s" % service
cmd = cmd_utils.Command(_cmd_str)
err, out, rc = cmd.run()
if err:
logger.log(
"error",
NS.publisher_id,
{
"message": "Could not disable %s"
" service. Error: %s" % (service, err)
},
job_id=self.parameters['job_id'],
flow_id=self.parameters['flow_id'],
)
return False
return True
return service_utils.stop_service(services, self.parameters)
Loading

0 comments on commit ae376ce

Please sign in to comment.