Merge branch 'master' into mentos
kszucs authored Mar 10, 2017
2 parents 3d5e9eb + 772b952 commit 67de2be
Showing 29 changed files with 2,598 additions and 75 deletions.
66 changes: 33 additions & 33 deletions README.md
@@ -1,14 +1,14 @@
-[![Build Status](http://drone.lensa.com:8000/api/badges/lensacom/satyr/status.svg)](http://drone.lensa.com:8000/lensacom/satyr)
-[![Join the chat at https://gitter.im/lensacom/satyr](https://badges.gitter.im/lensacom/satyr.svg)](https://gitter.im/lensacom/satyr?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
+[![Build Status](http://drone.daskos.com:8000/api/badges/daskos/mentor/status.svg)](http://drone.daskos.com:8000/daskos/mentor)
+[![Join the chat at https://gitter.im/daskos/mentor](https://badges.gitter.im/daskos/mentor.svg)](https://gitter.im/daskos/mentor?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)

-![satyr](https://s3.amazonaws.com/lensa-rnd-misc/satyr2.png)
+![mentor](https://s3.amazonaws.com/daskos-rnd-misc/mentor2.png)

# An extensible Mesos library for Python
###### aka. the distributed snake-charmer


-Satyr's intention is to simplify the process of writing python frameworks
-for Mesos. Satyr provides multiple components and interfaces to cover various
+Mentor's intention is to simplify the process of writing python frameworks
+for Mesos. Mentor provides multiple components and interfaces to cover various
levels of complexity needs.

## Notable Features
@@ -21,7 +21,7 @@ levels of complexity needs.

## Install

-`pip install mentor` or use [lensa/satyr](https://hub.docker.com/r/lensa/satyr/) Docker image
+`pip install mentor` or use [daskos/mentor](https://hub.docker.com/r/daskos/mentor/) Docker image

Requirements:
- mesos.interface (installable via pip)
@@ -40,8 +40,8 @@ It's almost identical to python's
but runs processes on a Mesos cluster (concurrently).

```python
-from satyr.apis.futures import MesosPoolExecutor
-from satyr.proxies.messages import Cpus, Mem
+from mentor.apis.futures import MesosPoolExecutor
+from mentor.proxies.messages import Cpus, Mem

with MesosPoolExecutor(name='futures-pool') as executor:
def mul(a, b):
@@ -63,10 +63,10 @@ but runs processes on a Mesos cluster (concurrently).

```python
from __future__ import print_function
-from satyr.apis.multiprocessing import Pool
+from mentor.apis.multiprocessing import Pool


-with Pool(name='satyr-pool') as pool:
+with Pool(name='mentor-pool') as pool:
def mul(a, b):
return a * b

@@ -88,32 +88,32 @@ Basic scheduler to submit various kinds of workloads, e.g.:

```python
from __future__ import print_function
-from satyr.scheduler import QueueScheduler, Running
-from satyr.messages import PythonTask
-from satyr.proxies.messages import Disk, Mem, Cpus
+from mentor.scheduler import QueueScheduler, Running
+from mentor.messages import PythonTask
+from mentor.proxies.messages import Disk, Mem, Cpus


scheduler = QueueScheduler()
-task = PythonTask(fn=sum, args=[range(10)], name='satyr-task',
+task = PythonTask(fn=sum, args=[range(10)], name='mentor-task',
resources=[Cpus(0.1), Mem(128), Disk(512)])

-with Running(scheduler, name='satyr-scheduler'):
+with Running(scheduler, name='mentor-scheduler'):
res = scheduler.submit(task) # return AsyncResult
print(res.get(timeout=30))
```

### Custom Scheduler

You can make your own scheduler built on QueueScheduler or for more complex
-needs there's a [Scheduler](satyr/interface.py) interface which you can use
+needs there's a [Scheduler](mentor/interface.py) interface which you can use
to create one from scratch. (However in this case you'll have to implement
-some of the functionalities already in [QueueScheduler](satyr/scheduler.py))
+some of the functionalities already in [QueueScheduler](mentor/scheduler.py))

```python
from __future__ import print_function
-from satyr.scheduler import QueueScheduler, Running
-from satyr.messages import PythonTask
-from satyr.proxies.messages import Disk, Mem, Cpus
+from mentor.scheduler import QueueScheduler, Running
+from mentor.messages import PythonTask
+from mentor.proxies.messages import Disk, Mem, Cpus


class CustomScheduler(QueueScheduler):
@@ -131,10 +131,10 @@


scheduler = CustomScheduler()
-task = PythonTask(fn=sum, args=[range(9)], name='satyr-task',
+task = PythonTask(fn=sum, args=[range(9)], name='mentor-task',
resources=[Cpus(0.1), Mem(128), Disk(512)])

-with Running(scheduler, name='satyr-custom-scheduler'):
+with Running(scheduler, name='mentor-custom-scheduler'):
res = scheduler.submit(task)
print(res.get(timeout=60))
```
@@ -145,8 +145,8 @@ helping hand with comparable Offers and TaskInfos (basic arithmetic operators
are also overloaded).

```python
-from satyr.interface import Scheduler
-from satyr.proxies.messages import Offer, TaskInfo
+from mentor.interface import Scheduler
+from mentor.proxies.messages import Offer, TaskInfo


class CustomScheduler(Scheduler):
@@ -164,7 +164,7 @@

## Optimized Task Placement

-Satyr implements multiple weighted heuristics to solve the
+Mentor implements multiple weighted heuristics to solve the
[Bin-Packing Problem](https://en.wikipedia.org/wiki/Bin_packing_problem):

- First-Fit
@@ -173,7 +173,7 @@ Satyr implements multiple weighted heuristics to solve the
- Best-Fit
- Best-Fit-Decreasing

-see [binpack.py](satyr/binpack.py).
+see [binpack.py](mentor/binpack.py).
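
As a rough sketch (not part of the library docs): the plain First-Fit heuristic only needs items and targets that support comparison, subtraction and summation (in mentor these are typically TaskInfos and Offers with overloaded operators), so simple integers are enough to see the packing behaviour:

```python
from mentor.binpack import ff

# Hypothetical numeric stand-ins: task demands and agent capacities.
tasks = [2, 4, 1, 3]
agents = [5, 5]

bins, skipped = ff(tasks, agents)
# bins    -> [(5, [2, 1]), (5, [4])]
# skipped -> [3], since neither bin has enough room left
```

Items that cannot be placed are returned in the second element rather than raising an error.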

The benefits of using bin-packing have been proven by
[Netflix/Fenzo](https://github.com/Netflix/Fenzo) in
@@ -188,24 +188,24 @@ value with `/bin/sh -c`. Also, if you want to run your task in a Docker
container you can provide some additional information for the task.

```python
-from satyr.proxies.messages import TaskInfo, CommandInfo
+from mentor.proxies.messages import TaskInfo, CommandInfo


task = TaskInfo(name='command-task', command=CommandInfo(value='echo 100'))
task.container.type = 'DOCKER'
-task.container.docker.image = 'lensacom/satyr:latest'
+task.container.docker.image = 'daskos/mentor:latest'
```

### Python

-[PythonTask](/satyr/messages.py) is capable of running arbitrary python code on
+[PythonTask](/mentor/messages.py) is capable of running arbitrary python code on
your cluster. It sends [cloudpickled](https://github.com/cloudpipe/cloudpickle)
methods and arguments to the matched mesos-slave for execution.
-Note that python tasks run in the [lensa/satyr](https://hub.docker.com/r/lensa/satyr/)
+Note that python tasks run in the [daskos/mentor](https://hub.docker.com/r/daskos/mentor/)
Docker container by default.

```python
-from satyr.messages import PythonTask
+from mentor.messages import PythonTask


# You can pass a function or a lambda in place of sum for fn.
@@ -214,15 +214,15 @@ task = PythonTask(name='python-task', fn=sum, args=[range(5)])

## Custom Task

-Custom tasks can be written by extending [TaskInfo](/satyr/proxies/messages.py)
+Custom tasks can be written by extending [TaskInfo](/mentor/proxies/messages.py)
or any existing descendants.
If you're walking down the former path you'll most likely have to deal with
protobuf in your code; worry not, we have some magic wrappers for you to provide
customizable messages.

```python
from __future__ import print_function
-from satyr.proxies.messages import TaskInfo
+from mentor.proxies.messages import TaskInfo
from mesos.interface import mesos_pb2


1 change: 0 additions & 1 deletion mentor/__init__.py
@@ -6,7 +6,6 @@
from .executor import ThreadExecutor, ProcessExecutor, ExecutorDriver
from .messages import PythonTask, PythonTaskStatus


__version__ = _pkg_resources.get_distribution('mentor').version

__all__ = ('QueueScheduler',
2 changes: 1 addition & 1 deletion mentor/apis/futures.py
@@ -99,4 +99,4 @@ def map(self, func, *iterables, **kwargs):
def shutdown(self, wait=True):
if wait:
self.scheduler.wait(-1)
-        self.stop()
+        self.stop()
2 changes: 1 addition & 1 deletion mentor/apis/multiprocessing.py
@@ -79,4 +79,4 @@ def apply_async(self, func, args=[], kwds={}, name='multiprocessing',
task = PythonTask(name=name, fn=func, args=args, kwargs=kwds,
resources=resources, executor=executor, **kwargs)
self.scheduler.submit(task)
-        return AsyncResult(task)
+        return AsyncResult(task)
6 changes: 3 additions & 3 deletions mentor/apis/tests/test_futures.py
@@ -66,9 +66,9 @@ def raiser():

with MesosPoolExecutor(name='futures-pool') as executor:
future = executor.submit(raiser)
-        e = future.exception(timeout=30)
-        assert isinstance(e, RemoteException)
-        assert isinstance(e, TypeError)
+        exc = future.exception(timeout=30)
+        assert isinstance(exc, RemoteException)
+        assert isinstance(exc, TypeError)


def test_multiple_submit(resources):
2 changes: 1 addition & 1 deletion mentor/apis/tests/test_multiprocessing.py
@@ -72,7 +72,7 @@ def feed(i, queue):
results = [cp.loads(queue.get()) for i in range(4)]
assert sorted(results) == range(4)


def test_map_async(resources):
with Pool(name='test-pool') as pool:
results = pool.map_async(
132 changes: 132 additions & 0 deletions mentor/binpack.py
@@ -0,0 +1,132 @@
from __future__ import absolute_import, division, print_function

import operator


def weight(items, **kwargs):
if not len(kwargs):
raise ValueError('Missing attribute for weighting items!')
scaled = []
for attr, weight in kwargs.items():
values = [float(getattr(item, attr)) for item in items]
try:
s = sum(values)
scaled.append([weight * (v / s) for v in values])
except ZeroDivisionError:
            # s equals zero, so attr won't contribute
scaled.append([0] * len(items))

return map(sum, zip(*scaled))


def ff(items, targets):
"""First-Fit
This is perhaps the simplest packing heuristic;
it simply packs items in the next available bin.
Complexity O(n^2)
"""
bins = [(target, []) for target in targets]
skip = []

for item in items:
for target, content in bins:
if item <= (target - sum(content)):
content.append(item)
break
else:
skip.append(item)
return bins, skip


def ffd(items, targets, **kwargs):
"""First-Fit Decreasing
This is perhaps the simplest packing heuristic;
it simply packs items in the next available bin.
This algorithm differs only from Next-Fit Decreasing
in having a 'sort'; that is, the items are pre-sorted
(largest to smallest).
Complexity O(n^2)
"""
sizes = zip(items, weight(items, **kwargs))
sizes = sorted(sizes, key=operator.itemgetter(1), reverse=True)
items = map(operator.itemgetter(0), sizes)
return ff(items, targets)


def mr(items, targets, **kwargs):
"""Max-Rest
Complexity O(n^2)
"""
bins = [(target, []) for target in targets]
skip = []

for item in items:
capacities = [target - sum(content) for target, content in bins]
weighted = weight(capacities, **kwargs)

(target, content), capacity, _ = max(zip(bins, capacities, weighted),
key=operator.itemgetter(2))
if item <= capacity:
content.append(item)
else:
skip.append(item)
return bins, skip


def mrpq(items, targets):
"""Max-Rest Priority Queue
Complexity O(n*log(n))
"""
raise NotImplementedError()


def bf(items, targets, **kwargs):
"""Best-Fit
Complexity O(n^2)
"""
bins = [(target, []) for target in targets]
skip = []

for item in items:
containers = []
capacities = []
for target, content in bins:
capacity = target - sum(content)
if item <= capacity:
containers.append(content)
capacities.append(capacity - item)

if len(capacities):
weighted = zip(containers, weight(capacities, **kwargs))
content, _ = min(weighted, key=operator.itemgetter(1))
content.append(item)
else:
skip.append(item)
return bins, skip


def bfd(items, targets, **kwargs):
"""Best-Fit Decreasing
Complexity O(n^2)
"""
sizes = zip(items, weight(items, **kwargs))
sizes = sorted(sizes, key=operator.itemgetter(1), reverse=True)
items = map(operator.itemgetter(0), sizes)
return bf(items, targets, **kwargs)


def bfh(items, targets):
"""Best-Fit-Heap
Slightly Improved Complexity
"""
raise NotImplementedError()
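
To see what `weight` computes for the decreasing and best-fit variants above, here is a small sketch; the `Res` namedtuple is a hypothetical stand-in for an Offer or TaskInfo, and any object exposing the named attributes would work. Each item's share of every requested dimension is scaled by that dimension's weight, and the shares are summed into a single size:

```python
from collections import namedtuple

from mentor.binpack import weight

# Hypothetical stand-in for a resource-bearing message (Offer / TaskInfo).
Res = namedtuple('Res', ['cpus', 'mem'])

offers = [Res(cpus=1, mem=512), Res(cpus=2, mem=1024), Res(cpus=1, mem=2048)]

# Equal weighting of each offer's cpu share and memory share.
print(list(weight(offers, cpus=1.0, mem=1.0)))
# -> approximately [0.39, 0.79, 0.82]
```

The decreasing variants (`ffd`, `bfd`) simply sort the items by this combined size before running the underlying heuristic.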
2 changes: 1 addition & 1 deletion mentor/executor.py
@@ -103,4 +103,4 @@ def on_outbound_error(self, driver, event):
import logging
logging.basicConfig(level=logging.DEBUG)
driver = ExecutorDriver(ThreadExecutor())
-    driver.start(block=True)
+    driver.start(block=True)