Commit
Merge pull request Tendrl#1079 from GowthamShanmugam/1645221
Un-manage should not fail when one or a few storage nodes are down
shtripat authored May 15, 2019
2 parents 71d5727 + 15d6d7f commit e6f20cd
Showing 12 changed files with 260 additions and 55 deletions.
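In summary, wherever the un-manage flow walks the cluster's nodes it now loads each node's context first and skips nodes whose status is down (logging a warning) instead of failing the whole flow; stale details of those down nodes are also pruned from etcd. A minimal standalone sketch of that skip-down-nodes pattern (illustrative only, not Tendrl code; the Node type, hostnames, and statuses below are made up):

from dataclasses import dataclass


@dataclass
class Node:
    node_id: str
    fqdn: str
    status: str  # "up" or "down"


def nodes_to_process(nodes):
    """Return only the nodes that are up; down nodes are logged and skipped."""
    reachable = []
    for node in nodes:
        if node.status.lower() == "down":
            # Previously a down node could abort the whole un-manage flow;
            # now it is reported and skipped.
            print("warning: skipping down node %s" % node.fqdn)
            continue
        reachable.append(node)
    return reachable


if __name__ == "__main__":
    cluster = [
        Node("n1", "node1.example.com", "up"),
        Node("n2", "node2.example.com", "down"),
    ]
    for node in nodes_to_process(cluster):
        print("creating stop-services job for %s" % node.fqdn)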
4 changes: 2 additions & 2 deletions tendrl/commons/flows/unmanage_cluster/__init__.py
@@ -31,10 +31,10 @@ def run(self):
raise FlowExecutionFailedError(
"Another job in progress for cluster."
" Please wait till the job finishes "
"(job_id: %s) (integration_id: %s) " %
"(job_id: %s) (cluster: %s) " %
(
_cluster.current_job['job_id'],
_cluster.integration_id
_cluster.short_name
)
)

2 changes: 1 addition & 1 deletion tendrl/commons/objects/cluster/__init__.py
@@ -20,7 +20,7 @@ def __init__(self, integration_id=None, short_name=None,
self.conf_overrides = conf_overrides
self.node_identifier = node_identifier
self.last_sync = last_sync
self.is_managed = is_managed
self.is_managed = is_managed or "no"
self.current_job = current_job
self.status = status
self.volume_profiling_flag = volume_profiling_flag
@@ -1,4 +1,5 @@
import etcd
import json

from tendrl.commons import objects
from tendrl.commons.utils import etcd_utils
@@ -83,6 +84,45 @@ def run(self):
# No cluster alerts, continue
pass

try:
index_key = "/indexes/tags/tendrl/integration/%s" % integration_id
_node_ids = etcd_utils.read(
index_key
).value
_node_ids = json.loads(_node_ids)
for _node_id in _node_ids[:]:
node_obj = NS.tendrl.objects.NodeContext(
node_id=_node_id
).load()
# Remove cluster indexes for down node
if node_obj.status.lower() == "down":
_node_ids.remove(_node_id)
# Removing down node details
logger.log(
"warning",
NS.publisher_id,
{
"message": "Deleting down node %s details" %
node_obj.fqdn
},
job_id=self.parameters['job_id'],
flow_id=self.parameters['flow_id'],
)
etcd_keys_to_delete.append(
"/nodes/%s" % _node_id
)
etcd_utils.write(index_key, json.dumps(_node_ids))
except (
etcd.EtcdKeyNotFound,
ValueError,
TypeError,
AttributeError,
IndexError
):
# If the index details are not present we don't need to stop the
# un-manage flow, because when node-agent works properly these
# details are populated again by the node sync
pass
# Remove the cluster details
for key in list(set(etcd_keys_to_delete)):
try:
@@ -105,5 +145,4 @@ def run(self):
).load()
cluster.short_name = ""
cluster.save()

return True
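The hunk above prunes down nodes from the integration index before the cluster keys are removed, so that a dead node's leftover entries cannot block cleanup. A rough standalone sketch of that logic, with a plain dict standing in for etcd (the key layout mirrors the diff; the status lookup and integration id are made up):

import json

store = {
    "/indexes/tags/tendrl/integration/abc": json.dumps(["n1", "n2"]),
}
node_status = {"n1": "UP", "n2": "DOWN"}  # assumed node statuses


def prune_down_nodes(integration_id):
    index_key = "/indexes/tags/tendrl/integration/%s" % integration_id
    node_ids = json.loads(store[index_key])
    keys_to_delete = []
    for node_id in node_ids[:]:  # iterate over a copy while mutating the list
        if node_status[node_id].lower() == "down":
            node_ids.remove(node_id)
            keys_to_delete.append("/nodes/%s" % node_id)
    store[index_key] = json.dumps(node_ids)  # write the trimmed index back
    return keys_to_delete  # these keys are later deleted with the cluster


print(prune_down_nodes("abc"))  # ['/nodes/n2']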
@@ -10,20 +10,20 @@ def __init__(self, *args, **kwargs):

def run(self):
integration_id = self.parameters['TendrlContext.integration_id']
logger.log(
"info",
NS.get("publisher_id", None),
{
"message": "Setting cluster %s is_managed to \"no\":" %
integration_id
},
job_id=self.parameters['job_id'],
flow_id=self.parameters['flow_id']
)
try:
_cluster = NS.tendrl.objects.Cluster(
integration_id=integration_id
).load()
logger.log(
"info",
NS.get("publisher_id", None),
{
"message": "Setting cluster %s is_managed to \"no\":" %
_cluster.short_name
},
job_id=self.parameters['job_id'],
flow_id=self.parameters['flow_id']
)
_cluster.is_managed = "no"
_cluster.save()
except etcd.EtcdKeyNotFound:
@@ -16,27 +16,42 @@ def run(self):
_cluster = NS.tendrl.objects.Cluster(
integration_id=integration_id
).load()

try:
# Get the cluster nodes
nodes = etcd_utils.read("/clusters/%s/nodes" % integration_id)
child_job_ids = []
node_ids = []
node_hostnames = []
for node in nodes.leaves:
node_id = node.key.split("/")[-1]
node_ids.append(node_id)
node_obj = NS.tendrl.objects.NodeContext(
node_id=node.key.split("/")[-1]
).load()
if node_obj.status.lower() == "down":
logger.log(
"warning",
NS.publisher_id,
{
"message": "Skipping stop integration service job "
"creation for the node %s, Status of node is "
"down" % node_obj.fqdn
},
job_id=self.parameters['job_id'],
flow_id=self.parameters['flow_id'],
)
continue
node_hostnames.append(node_obj.fqdn)
# Create jobs on nodes for stopping services
_job_id = str(uuid.uuid4())
params = {
"Services[]": ["tendrl-gluster-integration"]
}
payload = {
"tags": ["tendrl/node_%s" % node_id],
"tags": ["tendrl/node_%s" % node_obj.node_id],
"run": "tendrl.objects.Node.flows.StopServices",
"status": "new",
"parameters": params,
"parent": self.parameters["job_id"],
"type": "node"
"type": "node",
"node_fqdn": node_obj.fqdn
}
NS.tendrl.objects.Job(
job_id=_job_id,
@@ -50,7 +65,7 @@ def run(self):
{
"message": "Stop tendrl services (job: %s) "
"on %s in cluster %s" %
(_job_id, node_id, _cluster.short_name)
(_job_id, node_obj.fqdn, _cluster.short_name)
},
job_id=self.parameters['job_id'],
flow_id=self.parameters['flow_id'],
@@ -62,17 +77,6 @@ def run(self):
while True:
child_jobs_failed = []
if loop_count >= wait_count:
logger.log(
"error",
NS.publisher_id,
{
"message": "Stop service jobs on cluster(%s) not "
"yet complete on all nodes(%s). Timing out. "
% (_cluster.short_name, str(node_ids))
},
job_id=self.parameters['job_id'],
flow_id=self.parameters['flow_id'],
)
# Marking child jobs as failed which did not complete as
# the parent job has timed out. This has to be done
# explicitly because these jobs will still be processed
@@ -83,8 +87,34 @@ def run(self):
job_id=child_job_id
).load()
if child_job.status not in ["finished", "failed"]:
if child_job.status == "new":
logger.log(
"error",
NS.publisher_id,
{
"message": "Job %s not yet picked by "
"a node %s, Either node is down or "
"node-agent service is down" % (
child_job_id,
child_job.payload.get("node_fqdn")
)
},
job_id=self.parameters['job_id'],
flow_id=self.parameters['flow_id'],
)
child_job.status = "failed"
child_job.save()
logger.log(
"error",
NS.publisher_id,
{
"message": "Stop service jobs on cluster(%s) not "
"yet complete on all nodes(%s). Timing out. "
% (_cluster.short_name, str(node_hostnames))
},
job_id=self.parameters['job_id'],
flow_id=self.parameters['flow_id'],
)
return False
time.sleep(5)
finished = True
@@ -21,22 +21,38 @@ def run(self):
# Get the cluster nodes
nodes = etcd_utils.read("/clusters/%s/nodes" % integration_id)
child_job_ids = []
node_ids = []
node_hostnames = []
for node in nodes.leaves:
node_id = node.key.split("/")[-1]
node_ids.append(node_id)
node_obj = NS.tendrl.objects.NodeContext(
node_id=node.key.split("/")[-1]
).load()
if node_obj.status.lower() == "down":
logger.log(
"warning",
NS.publisher_id,
{
"message": "Skipping stop monitoring service job "
"creation for the node %s, Status of node is "
"down" % node_obj.fqdn
},
job_id=self.parameters['job_id'],
flow_id=self.parameters['flow_id'],
)
continue
node_hostnames.append(node_obj.fqdn)
# Create jobs on nodes for stopping services
_job_id = str(uuid.uuid4())
params = {
"Services[]": ["collectd"]
}
payload = {
"tags": ["tendrl/node_%s" % node_id],
"tags": ["tendrl/node_%s" % node_obj.node_id],
"run": "tendrl.objects.Node.flows.StopServices",
"status": "new",
"parameters": params,
"parent": self.parameters["job_id"],
"type": "node"
"type": "node",
"node_fqdn": node_obj.fqdn
}
NS.tendrl.objects.Job(
job_id=_job_id,
@@ -50,7 +66,7 @@ def run(self):
{
"message": "Stop tendrl services (job: %s) "
"on host %s in cluster %s" %
(_job_id, node_id, _cluster.short_name)
(_job_id, node_obj.fqdn, _cluster.short_name)
},
job_id=self.parameters['job_id'],
flow_id=self.parameters['flow_id'],
@@ -62,17 +78,6 @@ def run(self):
while True:
child_jobs_failed = []
if loop_count >= wait_count:
logger.log(
"error",
NS.publisher_id,
{
"message": "Stop service jobs on cluster(%s) not "
"yet complete on all nodes(%s). Timing out. "
% (_cluster.short_name, str(node_ids))
},
job_id=self.parameters['job_id'],
flow_id=self.parameters['flow_id'],
)
# Marking child jobs as failed which did not complete as
# the parent job has timed out. This has to be done
# explicitly because these jobs will still be processed
@@ -83,8 +88,34 @@ def run(self):
job_id=child_job_id
).load()
if child_job.status not in ["finished", "failed"]:
if child_job.status == "new":
logger.log(
"error",
NS.publisher_id,
{
"message": "Job %s not yet picked by "
"a node %s, Either node is down or "
"node-agent service is down" % (
child_job_id,
child_job.payload.get("node_fqdn")
)
},
job_id=self.parameters['job_id'],
flow_id=self.parameters['flow_id'],
)
child_job.status = "failed"
child_job.save()
logger.log(
"error",
NS.publisher_id,
{
"message": "Stop service jobs on cluster(%s) not "
"yet complete on all nodes(%s). Timing out. "
% (_cluster.short_name, str(node_hostnames))
},
job_id=self.parameters['job_id'],
flow_id=self.parameters['flow_id'],
)
return False
time.sleep(5)
finished = True
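Both stop-services atoms above share the same wait loop: poll the child jobs, and once the wait budget is exhausted, mark any job still in "new" (never picked up, typically because the node or its node-agent is down) as failed before timing out. A simplified standalone sketch of that loop (a dict stands in for the Job objects; statuses, counts, and the interval are illustrative):

import time

jobs = {"j1": "finished", "j2": "new"}  # assumed child-job statuses


def wait_for_children(child_job_ids, wait_count=2, interval=0.1):
    loop_count = 0
    while True:
        if loop_count >= wait_count:
            for job_id in child_job_ids:
                if jobs[job_id] not in ("finished", "failed"):
                    if jobs[job_id] == "new":
                        print("error: job %s never picked up (node down?)" % job_id)
                    jobs[job_id] = "failed"
            print("error: stop-service jobs timed out")
            return False
        if all(jobs[j] in ("finished", "failed") for j in child_job_ids):
            return True
        time.sleep(interval)
        loop_count += 1


print(wait_for_children(["j1", "j2"]))  # False: "j2" was never picked up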
13 changes: 12 additions & 1 deletion tendrl/commons/objects/definition/master.yaml
@@ -69,6 +69,7 @@ namespace.tendrl:
- Cluster.short_name
pre_run:
- tendrl.objects.Node.atoms.CheckRequiredServicesRunning
- tendrl.objects.GlusterPeer.atoms.GlusterServerPeerCheck
- tendrl.objects.Cluster.atoms.CheckClusterNodesUp
- tendrl.objects.Node.atoms.IsNodeTendrlManaged
- tendrl.objects.Cluster.atoms.ValidImportClusterParams
@@ -100,7 +101,6 @@ namespace.tendrl:
- Cluster.delete_telemetry_data
pre_run:
- tendrl.objects.Node.atoms.CheckRequiredServicesRunning
- tendrl.objects.Cluster.atoms.CheckClusterNodesUp
post_run:
- tendrl.objects.Cluster.atoms.IsClusterImportReady
run: tendrl.flows.UnmanageCluster
@@ -1204,6 +1204,17 @@ namespace.tendrl:
list: /clusters/{0}/nodes/{1}/alert_counters
help: "Cluster Alert Counter"
GlusterPeer:
atoms:
GlusterServerPeerCheck:
enabled: true
inputs:
mandatory:
- TendrlContext.integration_id
name: "Check all gluster peer nodes are detected by tendrl"
help: "Check all gluster peer nodes are detected by tendrl"
run: tendrl.objects.GlusterPeer.atoms.GlusterServerPeerCheck
type: check
uuid: e60bb309-677d-407c-a880-6e848d0d5534
enabled: true
attrs:
hostname:
Empty file.