From 3d5e9ebfdfb0cde1cf3e360d79c6ce58ddc4fd8b Mon Sep 17 00:00:00 2001 From: arttii Date: Tue, 28 Feb 2017 09:56:44 +0100 Subject: [PATCH] need a redo --- docker-compose.yml | 15 +++- mentor/messages.py | 145 ++++++++++++++++++--------------- mentor/scheduler.py | 6 +- mentor/tests/test_scheduler.py | 2 +- 4 files changed, 97 insertions(+), 71 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 59cf667..1491af1 100755 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -50,12 +50,21 @@ services: # MESOS_IMAGE_PROVISIONER_BACKEND=aufs ## To enable docker containerizer too` - MESOS_CONTAINERIZERS=mesos,docker + - MESOS_SWITCH_USER=0 volumes: - /sys/fs/cgroup:/sys/fs/cgroup # ## To cache docker downloaded images - # # /tmp/mesos/store/docker:/tmp/mesos/store/docker + - /tmp/mesos/store/docker:/tmp/mesos/store/docker # ## To enable docker containerizer - - /usr/bin/docker:/usr/bin/docker + - /usr/bin/docker:/usr/local/bin/docker - /var/run/docker.sock:/var/run/docker.sock + - /cgroup:/cgroup + - /sys:/sys + privileged: true - \ No newline at end of file +# marathon: +# network_mode: "host" +# image: mesosphere/marathon:v1.3.7 +# command: --master zk://localhost:2181/mesos --zk zk://localhost:2181/marathon +# restart: "always" +# \ No newline at end of file diff --git a/mentor/messages.py b/mentor/messages.py index fb3a3c8..3ad9984 100644 --- a/mentor/messages.py +++ b/mentor/messages.py @@ -6,31 +6,25 @@ import logging from uuid import uuid4 from mentos.utils import decode_data,encode_data +from functools import wraps +from six import iteritems,iterkeys,get_function_code + log = logging.getLogger(__name__) logging.getLogger().setLevel(logging.DEBUG) -# u('string') replaces the forwards-incompatible u'string' -if six.PY3: - def u(string): - return string -else: - import codecs - - def u(string): - return codecs.unicode_escape_decode(string)[0] - -# dict.iteritems(), dict.iterkeys() is also incompatible -if six.PY3: - iteritems = dict.items - iterkeys = dict.keys -else: - iteritems = dict.iteritems - iterkeys = dict.iterkeys - -def bunchify(x): + + + +def decode_message(x): """ Recursively transforms a dictionary into a Message via copy. """ + if isinstance(x, dict): + return Message((k, decode_message(v)) for k, v in iteritems(x)) + elif isinstance(x, (list, tuple)): + return type(x)(decode_message(v) for v in x) + else: + return x class Message(dict): @@ -109,6 +103,10 @@ def __repr__(self): def __dir__(self): return self.keys() + @staticmethod + def fromDict(d): + return decode_message(d) + def Cpus(value): return Message({'name': 'cpus', 'role': '*', 'scalar': Message({'value':value}), 'type': 'SCALAR'}) @@ -236,7 +234,9 @@ def ports(self): class TaskInfo(ResourceMixin, Message): - pass + @staticmethod + def fromDict(d): + return TaskInfo(**decode_message(d)) @@ -255,25 +255,12 @@ def agent_id(self): except KeyError: return self["slave_id"] + @staticmethod + def fromDict(d): + return Offer(**decode_message(d)) -class PythonTaskStatus(Message): - - def __init__(self, data=None, **kwargs): - super(PythonTaskStatus, self).__init__(**kwargs) - self.labels = Message(labels=Message(key='python')) - self.data = encode_data(cloudpickle.dumps(data)) - - @property - def result(self): - return cloudpickle.loads(decode_data(self["data"])) - - @property - def exception(self): - try: - return remote_exception(*self.data) - except: - return None +class TaskStatus(Message): def is_staging(self): return self.state == 'TASK_STAGING' @@ -300,6 +287,33 @@ def has_failed(self): def has_terminated(self): return self.has_succeeded() or self.has_failed() + +class PythonTaskStatus(TaskStatus): + + def __init__(self, data=None, **kwargs): + + self.labels = Message(labels=[Message(key='python')]) + self.data = encode_data(cloudpickle.dumps(kwargs.get("data",None))) + + super(PythonTaskStatus, self).__init__(**kwargs) + + @property + def result(self): + return cloudpickle.loads(decode_data(self["data"])) + + @property + def exception(self): + try: + return remote_exception(*self.result) + except: + return None + + + + @staticmethod + def fromDict(d): + return PythonTaskStatus(**decode_message(d)) + # TODO create custom messages per executor class PythonTask(TaskInfo): @@ -316,26 +330,12 @@ def __init__(self, fn=None, args=[], kwargs={}, self.retries = retries self.attempt = 1 - self.labels = Message(labels=Message(key='python')) + self.labels = Message(labels=[Message(key='python')]) def __call__(self): fn, args, kwargs = cloudpickle.loads(decode_data(self.data)) return fn(*args, **kwargs) - def retry(self, status): - if self.attempt < self.retries: - log.info('Task {} attempt #{} rescheduled due to failure with state ' - '{} and message {}'.format(self.task_id, self.attempt, - status.state, status.message)) - self.attempt += 1 - status.state = 'TASK_STAGING' - else: - log.error('Aborting due to task {} failed for {} attempts in state ' - '{} with message {}'.format(self.task_id, self.retries, - status.state, status.message)) - raise RuntimeError('Task {} failed with state {} and message {}'.format( - self.task_id, status.state, status.message)) - def update(self, status): self.on_update(status) if status.has_succeeded(): @@ -355,15 +355,6 @@ def on_fail(self, status): log.error('Task {} has been failed with state {} due to {}'.format( self.task_id.value, status.state, status.message)) - try: - raise status.exception # won't retry due to code error in PythonTaskStatus - except KeyError as e: - # not a code error, e.g. problem during deployment - self.retry(status) - else: - log.error('Aborting due to task {} failed with state {} and message ' - '{}'.format(self.task_id, status.state, status.message)) - class PythonExecutor(Message): @@ -373,7 +364,7 @@ def __init__(self, id, docker='satyr', force_pull=False, envs={}, uris=[], **kwds): super(PythonExecutor, self).__init__(**kwds) self.container = Message( - type='MESOS', + type='DOCKER', mesos=Message( image=Message(type='DOCKER', docker=Message(name=docker)))) @@ -385,7 +376,7 @@ def __init__(self, id, docker='satyr', force_pull=False, self.envs = envs self.uris = uris - self.labels = Message(labels=Message(key='python')) + self.labels = Message(labels=[Message(key='python')]) @property def docker(self): @@ -420,4 +411,30 @@ def envs(self): @envs.setter def envs(self, value): envs = [{'name': k, 'value': v} for k, v in value.items()] - self.command.environment = Message(variables=envs) \ No newline at end of file + self.command.environment = Message(variables=envs) + + + + +def transform(repeat=False,**trigger): + def decorator(func): + names = getattr(func,'_names',None) + if names is None: + code = get_function_code(func) + names = code.co_varnames[:code.co_argcount] + @wraps(func) + def decorated(*args,**kwargs): + all_args = kwargs.copy() + for n,v in zip(names,args): + all_args[n] = v + for k,v in trigger.items(): + if k in all_args: + if repeat: + all_args[k] = [v.fromDict(arg) for arg in all_args[k]] + else: + all_args[k] = v.fromDict(all_args[k]) + return func(**all_args) + decorated._names = names + return decorated + return decorator + diff --git a/mentor/scheduler.py b/mentor/scheduler.py index f47b156..39882e1 100644 --- a/mentor/scheduler.py +++ b/mentor/scheduler.py @@ -10,7 +10,7 @@ from mentor.constraint import pour from mentos.interface import Scheduler from mentor.placement import bfd -from mentor.messages import TaskInfo,Offer,Message +from mentor.messages import TaskInfo,Offer,Message,transform,TaskStatus,PythonTaskStatus from mentos.scheduler import SchedulerDriver from mentor.utils import Interruptable, timeout @@ -59,8 +59,8 @@ def submit(self, task): # supports commandtask, pythontask etc. assert isinstance(task, TaskInfo) self.tasks[task.task_id.value] = task + @transform(repeat=True,offers=Offer) def on_offers(self, driver, offers): - offers = [Offer(f) for f in offers] log.info('Received offers: {}'.format(sum(offers))) self.report() @@ -90,8 +90,8 @@ def on_offers(self, driver, offers): except Exception as ex: log.exception('Exception occured during task launch!') + @transform(repeat=False, status=PythonTaskStatus) def on_update(self, driver, status): - #status = Message(**status) task = self.tasks[status.task_id.value] log.info('Updated task {} state to {}'.format(status.task_id, status.state)) diff --git a/mentor/tests/test_scheduler.py b/mentor/tests/test_scheduler.py index 87e74e3..2dfcf7a 100644 --- a/mentor/tests/test_scheduler.py +++ b/mentor/tests/test_scheduler.py @@ -128,7 +128,7 @@ def test_scheduler_retries(mocker): #states = ['TASK_STARTING', 'TASK_FAILED', 'TASK_FAILED'] states = ['TASK_FAILED', 'TASK_FAILED', 'TASK_FAILED'] for ((args, kwargs), state) in zip(sched.on_update.call_args_list, states): - assert args[1].state == state + assert args[1]["state"] == state def test_scheduler_constraints(mocker):