Skip to content

Commit

Permalink
Merge pull request Tendrl#1083 from GowthamShanmugam/1602858
Browse files Browse the repository at this point in the history
Time out only the parent job when it has not been processed for a long time
  • Loading branch information
shtripat authored Apr 27, 2019
2 parents 7fcbe3b + 9455259 commit e30ac2c
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 73 deletions.
8 changes: 4 additions & 4 deletions tendrl/commons/flows/unmanage_cluster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ def run(self):
).load()
if _cluster.is_managed == "no":
if _cluster.current_job['job_name'] == self.__class__.__name__ \
and _cluster.current_job['status'] == 'finished':
raise FlowExecutionFailedError(
"Cluster is already in un-managed state"
)
and _cluster.current_job['status'] == 'finished':
raise FlowExecutionFailedError(
"Cluster is already in un-managed state"
)
if _cluster.current_job['status'] == 'in_progress' and \
(
'job_id' in _cluster.locked_by and
Expand Down
87 changes: 41 additions & 46 deletions tendrl/commons/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ def run(self):
_job_sync_interval = 5
NS.node_context = NS.node_context.load()
NS.tendrl_context = NS.tendrl_context.load()
if "tendrl/monitor" not in NS.node_context.tags:
if "tendrl/monitor" not in NS.node_context.tags and \
"tendrl/integration/monitoring" not in NS.node_context.tags:
# Storage node will enter in this block
if NS.tendrl_context.integration_id is None or \
NS.node_context.fqdn is None:
time.sleep(_job_sync_interval)
Expand Down Expand Up @@ -94,11 +96,10 @@ def process_job(jid):
pass

# tendrl-node-agent tagged as tendrl/monitor will ensure
# >10 min old "new" jobs are timed out and marked as
# "failed" (the parent job of these jobs will also be
# marked as "failed")
if "tendrl/monitor" in NS.node_context.tags and \
_timeout == "yes" and job.status == "new":
# >10 min old "new" parent jobs are timed out and marked
# as "failed"
if "tendrl/monitor" in NS.node_context.tags and _timeout == "yes" and \
job.status == "new" and job.payload.get('parent') is None:
_valid_until = job.valid_until

if _valid_until:
Expand All @@ -110,42 +111,31 @@ def process_job(jid):
# Job has "new" status since 10 minutes,
# mark status as "failed" and Job.error =
# "Timed out"
try:
job = job.load()
if job.status == "new":
job.status = "failed"
job.save()
except etcd.EtcdCompareFailed:
pass
else:
job = NS.tendrl.objects.Job(job_id=jid).load()
if job.status == "new":
_msg = str("Timed-out (>10min as 'new')")
job.errors = _msg
job.save()
if job.payload.get('parent') is None:
integration_id = NS.tendrl_context.integration_id
alert_utils.alert_job_status(
"failed",
"Job timed out (job_id: %s)" % jid,
integration_id=integration_id or
job.payload['parameters'].get(
'TendrlContext.integration_id'
),
cluster_name=NS.tendrl_context.cluster_name or
job.payload['parameters'].get(
'TendrlContext.cluster_name'
)
)
return
_msg = str("Timed-out (>10min as 'new')")
job.errors = _msg
job.status = "failed"
job.save()
integration_id = NS.tendrl_context.integration_id
alert_utils.alert_job_status(
"failed",
"Job timed out (job_id: %s)" % jid,
integration_id=integration_id or
job.payload['parameters'].get(
'TendrlContext.integration_id'
),
cluster_name=NS.tendrl_context.cluster_name or
job.payload['parameters'].get(
'TendrlContext.cluster_name'
)
)
return
else:
_now_plus_10 = time_utils.now() + datetime.timedelta(minutes=10)
_epoch_start = datetime.datetime(1970, 1, 1).replace(tzinfo=utc)

_now_plus_10_epoch = (_now_plus_10 -
_epoch_start).total_seconds()
time.sleep(7)
job = job.load()
job = NS.tendrl.objects.Job(job_id=jid).load()
if job.status == "new":
# To avoid server and storage node do save same time
job.valid_until = int(_now_plus_10_epoch)
Expand Down Expand Up @@ -190,7 +180,7 @@ def process_job(jid):
lock_info = dict(node_id=NS.node_context.node_id,
fqdn=NS.node_context.fqdn,
type=NS.type)
job = job.load()
job = NS.tendrl.objects.Job(job_id=jid).load()
job.locked_by = lock_info
job.status = "processing"
job.save(ttl=DEFAULT_JOB_TTL)
Expand All @@ -209,8 +199,8 @@ def process_job(jid):
obj_name, flow_name)
else:
runnable_flow = current_ns.ns.get_flow(flow_name)

job = job.load()
time.sleep(3)
job = NS.tendrl.objects.Job(job_id=jid).load()
lock_info = dict(node_id=NS.node_context.node_id,
fqdn=NS.node_context.fqdn,
type=NS.type)
Expand All @@ -219,28 +209,33 @@ def process_job(jid):

the_flow = runnable_flow(parameters=job.payload[
'parameters'], job_id=job.job_id)
# Tendrl server does not have fqdn in node_context
logger.log(
"info",
NS.publisher_id,
{"message": "Starting Job %s" %
job.job_id},
{"message": "Starting %s Job: %s on %s" %
(job.payload['run'].split('.')[-1],
job.job_id,
NS.node_context.fqdn or "server")},
job_id=job.job_id,
flow_id=the_flow.parameters['flow_id']
)

logger.log(
"info",
NS.publisher_id,
{"message": "Running %s" %
job.payload['run'].split('.')[-1]},
{"message": "Running %s job: %s on %s" %
(job.payload['run'].split('.')[-1],
job.job_id,
NS.node_context.fqdn or "server")},
job_id=job.job_id,
flow_id=the_flow.parameters['flow_id']
)

the_flow.run()

try:
job = job.load()
job = NS.tendrl.objects.Job(job_id=jid).load()
job.status = "finished"
job.save()
except etcd.EtcdCompareFailed:
Expand Down Expand Up @@ -305,7 +300,7 @@ def process_job(jid):
)

try:
job = job.load()
job = NS.tendrl.objects.Job(job_id=jid).load()
job.status = "failed"
job.save()
except etcd.EtcdCompareFailed:
Expand All @@ -314,7 +309,7 @@ def process_job(jid):
"job status invalid"
raise FlowExecutionFailedError(_msg)
else:
job = job.load()
job = NS.tendrl.objects.Job(job_id=jid).load()
job.errors = _trace
if job.payload.get('parent') is None:
alert_utils.alert_job_status(
Expand Down
24 changes: 12 additions & 12 deletions tendrl/commons/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
from inspect import getframeinfo
from inspect import stack
import json
from ruamel import yaml
import sys
from tendrl.commons.utils.time_utils import now # flake8:noqa
import traceback

is_collectd_imported = False
if '/usr/lib64/collectd' in sys.path:
is_collectd_imported = True
Expand All @@ -16,11 +20,6 @@
# An appropriate solution needs to be carved out


from tendrl.commons.utils.time_utils import now # flake8:noqa

import traceback
from ruamel import yaml

class Message(object):
"""At the time of message object intialization
Expand Down Expand Up @@ -96,13 +95,13 @@ def validate(self):
# Check payload type is dict
if type(self.payload) != dict:
return False

# Check mandatory fields
if (self.priority not in priorities or
self.node_id is None or
"message" not in self.payload):
return False

if self.job_id is not None:
if self.flow_id is None:
return False
Expand All @@ -119,7 +118,7 @@ def __init__(self, priority, publisher, payload):
# skip last function call
# This will give traceback upto before try function call
formatted_stack = traceback.extract_stack()[:-2]
_, _ , exc_traceback = sys.exc_info()
_, _, exc_traceback = sys.exc_info()
# This will give traceback inside try block
recent_call = traceback.extract_tb(exc_traceback)
caller = getframeinfo(stack()[1][0])
Expand Down Expand Up @@ -149,13 +148,14 @@ def format_exception(self, formatted_stack):
tb = []
for item in formatted_stack:
file, line, function, statement = item
tb.append({"file" : file,
"line" : line,
"function" : function,
"statement" : statement
tb.append({"file": file,
"line": line,
"function": function,
"statement": statement
})
return tb


# To serialize when json contains old message object
def serialize_message(obj):
if isinstance(obj, Message):
Expand Down
2 changes: 1 addition & 1 deletion tendrl/commons/tests/manager/test_manager_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def job_consume():
return "Thread Consuming Job"
monkeypatch.setattr(jobs, 'JobConsumerThread', job_consume)
manager = Manager("test")
assert manager._sds_sync_thread is "test"
assert manager._sds_sync_thread == "test"
assert manager._message_handler_thread is None
assert manager._job_consumer_thread == "Thread Consuming Job"

Expand Down
2 changes: 1 addition & 1 deletion tendrl/commons/tests/objects/test_objects_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def test_save():
with patch.object(Client, "write", return_value=True):
with patch.object(Client, "read",
return_value=maps.NamedDict(value="")):
obj.save(True)
obj.save(True)
with patch.object(Client, "write", return_value=True):
with patch.object(objects.BaseObject, "_hash",
return_value=None):
Expand Down
8 changes: 4 additions & 4 deletions tendrl/commons/tests/utils/test_ansible_module_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@ def test_AnsibleExecutableGenerationFailed_constructor():
ansible_obj = AnsibleExecutableGenerationFailed()
assert ansible_obj.message == "Executabe could not be generated for " \
"module None . Error: None"
ansible_obj = AnsibleExecutableGenerationFailed(module_path="Test\path",
ansible_obj = AnsibleExecutableGenerationFailed(module_path="Test\\path",
arguments="temp_args",
err="No Error")
assert ansible_obj.message == "Executabe could not be generated for " \
"module Test\path . Error: No Error"
"module Test\\path . Error: No Error"


def test_AnsibleModuleNotFound_constructor():
ansible_obj = AnsibleModuleNotFound()
assert ansible_obj.message == "Ansible module None not found"
ansible_obj = AnsibleModuleNotFound(module_path="Test\path")
assert ansible_obj.message == "Ansible module Test\path not found"
ansible_obj = AnsibleModuleNotFound(module_path="Test\\path")
assert ansible_obj.message == "Ansible module Test\\path not found"


@mock.patch('tendrl.commons.event.Event.__init__',
Expand Down
8 changes: 4 additions & 4 deletions tendrl/commons/utils/log_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def log(log_priority, publisher_id, log_payload, job_id=None,
)
)
except Exception:
if log_priority.lower() == "error":
sys.stderr.write(log_payload.get("message", "") + "\n")
else:
sys.stdout.write(log_payload.get("message", "") + "\n")
if log_priority.lower() == "error":
sys.stderr.write(log_payload.get("message", "") + "\n")
else:
sys.stdout.write(log_payload.get("message", "") + "\n")
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ commands = python check_commit_msg.py
# E123, E125 skipped as they are invalid PEP-8.
# see: http://flake8.readthedocs.io/en/latest/config.html#settings
show-source = True
ignore = E123,E125,W504
ignore = E123,E125,W504,E402
builtins = _, NS
exclude=.venv,.git,.tox,dist,doc,*lib/python*,*egg,build

Expand Down

0 comments on commit e30ac2c

Please sign in to comment.