Skip to content

Commit

Permalink
need a redo
Browse files Browse the repository at this point in the history
  • Loading branch information
arttii committed Feb 28, 2017
1 parent e8a9fa1 commit 3d5e9eb
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 71 deletions.
15 changes: 12 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,21 @@ services:
# MESOS_IMAGE_PROVISIONER_BACKEND=aufs
## To enable docker containerizer too`
- MESOS_CONTAINERIZERS=mesos,docker
- MESOS_SWITCH_USER=0
volumes:
- /sys/fs/cgroup:/sys/fs/cgroup
# ## To cache docker downloaded images
# # /tmp/mesos/store/docker:/tmp/mesos/store/docker
- /tmp/mesos/store/docker:/tmp/mesos/store/docker
# ## To enable docker containerizer
- /usr/bin/docker:/usr/bin/docker
- /usr/bin/docker:/usr/local/bin/docker
- /var/run/docker.sock:/var/run/docker.sock
- /cgroup:/cgroup
- /sys:/sys

privileged: true

# marathon:
# network_mode: "host"
# image: mesosphere/marathon:v1.3.7
# command: --master zk://localhost:2181/mesos --zk zk://localhost:2181/marathon
# restart: "always"
#
145 changes: 81 additions & 64 deletions mentor/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,25 @@
import logging
from uuid import uuid4
from mentos.utils import decode_data,encode_data
from functools import wraps
from six import iteritems,iterkeys,get_function_code

log = logging.getLogger(__name__)

logging.getLogger().setLevel(logging.DEBUG)
# u('string') replaces the forwards-incompatible u'string'
if six.PY3:
def u(string):
return string
else:
import codecs

def u(string):
return codecs.unicode_escape_decode(string)[0]

# dict.iteritems(), dict.iterkeys() is also incompatible
if six.PY3:
iteritems = dict.items
iterkeys = dict.keys
else:
iteritems = dict.iteritems
iterkeys = dict.iterkeys

def bunchify(x):



def decode_message(x):
""" Recursively transforms a dictionary into a Message via copy.
"""
if isinstance(x, dict):
return Message((k, decode_message(v)) for k, v in iteritems(x))
elif isinstance(x, (list, tuple)):
return type(x)(decode_message(v) for v in x)
else:
return x


class Message(dict):
Expand Down Expand Up @@ -109,6 +103,10 @@ def __repr__(self):
def __dir__(self):
return self.keys()

@staticmethod
def fromDict(d):
return decode_message(d)

def Cpus(value):
return Message({'name': 'cpus', 'role': '*', 'scalar': Message({'value':value}), 'type': 'SCALAR'})

Expand Down Expand Up @@ -236,7 +234,9 @@ def ports(self):


class TaskInfo(ResourceMixin, Message):
pass
@staticmethod
def fromDict(d):
return TaskInfo(**decode_message(d))



Expand All @@ -255,25 +255,12 @@ def agent_id(self):
except KeyError:
return self["slave_id"]

@staticmethod
def fromDict(d):
return Offer(**decode_message(d))


class PythonTaskStatus(Message):

def __init__(self, data=None, **kwargs):
super(PythonTaskStatus, self).__init__(**kwargs)
self.labels = Message(labels=Message(key='python'))
self.data = encode_data(cloudpickle.dumps(data))

@property
def result(self):
return cloudpickle.loads(decode_data(self["data"]))

@property
def exception(self):
try:
return remote_exception(*self.data)
except:
return None
class TaskStatus(Message):

def is_staging(self):
return self.state == 'TASK_STAGING'
Expand All @@ -300,6 +287,33 @@ def has_failed(self):
def has_terminated(self):
return self.has_succeeded() or self.has_failed()


class PythonTaskStatus(TaskStatus):

def __init__(self, data=None, **kwargs):

self.labels = Message(labels=[Message(key='python')])
self.data = encode_data(cloudpickle.dumps(kwargs.get("data",None)))

super(PythonTaskStatus, self).__init__(**kwargs)

@property
def result(self):
return cloudpickle.loads(decode_data(self["data"]))

@property
def exception(self):
try:
return remote_exception(*self.result)
except:
return None



@staticmethod
def fromDict(d):
return PythonTaskStatus(**decode_message(d))

# TODO create custom messages per executor
class PythonTask(TaskInfo):

Expand All @@ -316,26 +330,12 @@ def __init__(self, fn=None, args=[], kwargs={},
self.retries = retries
self.attempt = 1

self.labels = Message(labels=Message(key='python'))
self.labels = Message(labels=[Message(key='python')])

def __call__(self):
fn, args, kwargs = cloudpickle.loads(decode_data(self.data))
return fn(*args, **kwargs)

def retry(self, status):
if self.attempt < self.retries:
log.info('Task {} attempt #{} rescheduled due to failure with state '
'{} and message {}'.format(self.task_id, self.attempt,
status.state, status.message))
self.attempt += 1
status.state = 'TASK_STAGING'
else:
log.error('Aborting due to task {} failed for {} attempts in state '
'{} with message {}'.format(self.task_id, self.retries,
status.state, status.message))
raise RuntimeError('Task {} failed with state {} and message {}'.format(
self.task_id, status.state, status.message))

def update(self, status):
self.on_update(status)
if status.has_succeeded():
Expand All @@ -355,15 +355,6 @@ def on_fail(self, status):
log.error('Task {} has been failed with state {} due to {}'.format(
self.task_id.value, status.state, status.message))

try:
raise status.exception # won't retry due to code error in PythonTaskStatus
except KeyError as e:
# not a code error, e.g. problem during deployment
self.retry(status)
else:
log.error('Aborting due to task {} failed with state {} and message '
'{}'.format(self.task_id, status.state, status.message))


class PythonExecutor(Message):

Expand All @@ -373,7 +364,7 @@ def __init__(self, id, docker='satyr', force_pull=False,
envs={}, uris=[], **kwds):
super(PythonExecutor, self).__init__(**kwds)
self.container = Message(
type='MESOS',
type='DOCKER',
mesos=Message(
image=Message(type='DOCKER',
docker=Message(name=docker))))
Expand All @@ -385,7 +376,7 @@ def __init__(self, id, docker='satyr', force_pull=False,
self.envs = envs
self.uris = uris

self.labels = Message(labels=Message(key='python'))
self.labels = Message(labels=[Message(key='python')])

@property
def docker(self):
Expand Down Expand Up @@ -420,4 +411,30 @@ def envs(self):
@envs.setter
def envs(self, value):
envs = [{'name': k, 'value': v} for k, v in value.items()]
self.command.environment = Message(variables=envs)
self.command.environment = Message(variables=envs)




def transform(repeat=False,**trigger):
def decorator(func):
names = getattr(func,'_names',None)
if names is None:
code = get_function_code(func)
names = code.co_varnames[:code.co_argcount]
@wraps(func)
def decorated(*args,**kwargs):
all_args = kwargs.copy()
for n,v in zip(names,args):
all_args[n] = v
for k,v in trigger.items():
if k in all_args:
if repeat:
all_args[k] = [v.fromDict(arg) for arg in all_args[k]]
else:
all_args[k] = v.fromDict(all_args[k])
return func(**all_args)
decorated._names = names
return decorated
return decorator

6 changes: 3 additions & 3 deletions mentor/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from mentor.constraint import pour
from mentos.interface import Scheduler
from mentor.placement import bfd
from mentor.messages import TaskInfo,Offer,Message
from mentor.messages import TaskInfo,Offer,Message,transform,TaskStatus,PythonTaskStatus
from mentos.scheduler import SchedulerDriver
from mentor.utils import Interruptable, timeout

Expand Down Expand Up @@ -59,8 +59,8 @@ def submit(self, task): # supports commandtask, pythontask etc.
assert isinstance(task, TaskInfo)
self.tasks[task.task_id.value] = task

@transform(repeat=True,offers=Offer)
def on_offers(self, driver, offers):
offers = [Offer(f) for f in offers]
log.info('Received offers: {}'.format(sum(offers)))
self.report()

Expand Down Expand Up @@ -90,8 +90,8 @@ def on_offers(self, driver, offers):
except Exception as ex:
log.exception('Exception occured during task launch!')

@transform(repeat=False, status=PythonTaskStatus)
def on_update(self, driver, status):
#status = Message(**status)
task = self.tasks[status.task_id.value]
log.info('Updated task {} state to {}'.format(status.task_id,
status.state))
Expand Down
2 changes: 1 addition & 1 deletion mentor/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def test_scheduler_retries(mocker):
#states = ['TASK_STARTING', 'TASK_FAILED', 'TASK_FAILED']
states = ['TASK_FAILED', 'TASK_FAILED', 'TASK_FAILED']
for ((args, kwargs), state) in zip(sched.on_update.call_args_list, states):
assert args[1].state == state
assert args[1]["state"] == state


def test_scheduler_constraints(mocker):
Expand Down

0 comments on commit 3d5e9eb

Please sign in to comment.