This repository has been archived by the owner on Aug 28, 2019. It is now read-only.

Commit

Workflow (#35)
Issue #12: Prototype that pipelines computational services
mguidon authored and sbenkler committed Dec 13, 2017
1 parent 68ae415 commit bc41fbe
Showing 36 changed files with 1,982 additions and 0 deletions.
18 changes: 18 additions & 0 deletions demos/workflow/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Build example computational service
# author: Manuel Guidon

build:
	python solver/build-scripts/build.py --imagename=sidecar-solver --version=1.1 --registry=masu.speag.com --namespace=comp.services
	docker build -t masu.speag.com/comp.backend/simcore.director:1.0 -f ./director/Dockerfile-prod ./director
	docker build -t masu.speag.com/comp.backend/simcore.sidecar:1.0 -f ./sidecar/Dockerfile-prod ./sidecar

publish:
	python solver/build-scripts/build.py --imagename=sidecar-solver --version=1.1 --registry=masu.speag.com --namespace=comp.services --publish
	docker push masu.speag.com/comp.backend/simcore.director:1.0
	docker push masu.speag.com/comp.backend/simcore.sidecar:1.0

demo:
	echo "this should launch the demo (detached if possible)"

stop:
	echo "this should stop the demo"
86 changes: 86 additions & 0 deletions demos/workflow/README.md
@@ -0,0 +1,86 @@
# Workflow

An experimental project for designing a complete workflow that includes pipeline extraction, job scheduling, result storage, and display, within a scalable Docker swarm.

## Description
The computational backend of simcore consists of a director that exposes all available services to the user via the frontend. It is also responsible for converting pipelines created by the user into inter-dependent jobs that can be scheduled asynchronously.
Scheduling is done with the Python Celery framework, using RabbitMQ as a broker and MongoDB as a backend to store data. The Celery workers are implemented as sidecars that can themselves run one-shot Docker containers with the actual computational services. The full stack can be deployed in a Docker swarm.
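Conceptually, converting a user pipeline into inter-dependent, asynchronously schedulable jobs is a topological-ordering problem. A minimal sketch of that idea (an illustration only; the job names are hypothetical and this helper is not part of this commit, which delegates scheduling to Celery):

```python
from collections import deque

def schedule_order(jobs):
    """Return a valid execution order for jobs given as
    {job_name: [names of jobs it depends on]} (Kahn's algorithm)."""
    indegree = {job: len(deps) for job, deps in jobs.items()}
    dependents = {job: [] for job in jobs}
    for job, deps in jobs.items():
        for dep in deps:
            dependents[dep].append(job)
    # jobs with no unmet dependencies can be dispatched immediately
    ready = deque(job for job, n in indegree.items() if n == 0)
    order = []
    while ready:
        job = ready.popleft()
        order.append(job)
        for nxt in dependents[job]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    if len(order) != len(jobs):
        raise ValueError("pipeline contains a cycle")
    return order

# e.g. a solver job whose output feeds a postprocessing job
print(schedule_order({"solve": [], "postprocess": ["solve"]}))
# → ['solve', 'postprocess']
```

Jobs whose dependencies are already satisfied can be handed to the workers in parallel; the rest wait until their predecessors finish.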

## Services, APIs and Documentation

### Director Service
Entry point for the frontend

```
localhost:8010/run_pipeline (POST)
localhost:8010/calc/0.0/1.0/100/"sin(x)"
```
Returns URLs with the job status and result.
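A pipeline run can be submitted by POSTing a descriptor (see the pipeline descriptor example below) to the director. A minimal sketch, assuming the stack is running locally and the director is published on port 8010; the parameter values here are illustrative:

```python
import json

# Pipeline descriptor in the format this README documents
payload = {
    "input": [
        {"name": "N", "value": 100},
        {"name": "xmin", "value": 0.0},
        {"name": "xmax", "value": 1.0},
        {"name": "func", "value": "sin(x)"},
    ],
    "container": {
        "name": "masu.speag.com/comp.services/sidecar-solver",
        "tag": "1.1",
    },
}

# With the stack up, submit it (requires the `requests` package):
#   import requests
#   resp = requests.post("http://localhost:8010/run_pipeline", json=payload)
#   print(resp.text)  # an HTML link to /check/<task-id>

print(json.dumps(payload, indent=2))
```

The response links to the `/check/<task-id>` endpoint, which reports the Celery task state and, once finished, renders the result.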

### Flower Service
Visualization of the underlying Celery task queue
```
localhost:5555
```

### Mongo Express
Visualization of the underlying MongoDB

```
localhost:8081
```

### RabbitMQ
Visualization of the RabbitMQ broker
```
localhost:15672
```

### Visualizer
Visualization of the Docker swarm
```
localhost:5000
```

### Registry
Visualization of the simcore Docker registry
```
masu.speag.com:5001
```


### Pipeline descriptor
Example:
```
{
    "input":
    [
        {
            "name": "N",
            "value": 10
        },
        {
            "name": "xmin",
            "value": -1.0
        },
        {
            "name": "xmax",
            "value": 1.0
        },
        {
            "name": "func",
            "value": "exp(x)*sin(x)"
        }
    ],
    "container":
    {
        "name": "masu.speag.com/comp.services/sidecar-solver",
        "tag": "1.1"
    }
}
```
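The director deduplicates runs by content-addressing this descriptor: it hashes the sorted-keys JSON of the `input` section, combines it with the hash of the pulled container image, and uses the result as the output cache key. A minimal sketch of that computation, following the hashing in `director.py` (the `"deadbeef"` image hash below is a placeholder; the real director reads it from the pulled Docker image ID):

```python
import hashlib
import json

descriptor = {
    "input": [
        {"name": "N", "value": 10},
        {"name": "xmin", "value": -1.0},
        {"name": "xmax", "value": 1.0},
        {"name": "func", "value": "exp(x)*sin(x)"},
    ],
    "container": {"name": "masu.speag.com/comp.services/sidecar-solver", "tag": "1.1"},
}

def output_cache_key(descriptor, container_hash):
    """Combine the input hash with the container image hash, as the director does."""
    # sort_keys makes the hash independent of key order in the descriptor
    input_hash = hashlib.sha256(
        json.dumps(descriptor["input"], sort_keys=True).encode("utf-8")
    ).hexdigest()
    combined = hashlib.sha256()
    combined.update(input_hash.encode("utf-8"))
    combined.update(container_hash.encode("utf-8"))
    return combined.hexdigest()

# placeholder image hash; in the director this comes from docker
print(output_cache_key(descriptor, "deadbeef"))
```

If a result with the same key already exists in MongoDB, the computation was already performed with identical inputs and the same solver image, so the stored output can be served directly.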

### Orchestration diagram
![workflow](images/comp.backend.png)

2 changes: 2 additions & 0 deletions demos/workflow/director/.gitignore
@@ -0,0 +1,2 @@
# examples with 3rd party code
example_images/comp.services/
13 changes: 13 additions & 0 deletions demos/workflow/director/Dockerfile
@@ -0,0 +1,13 @@
FROM continuumio/miniconda3
MAINTAINER Manuel Guidon <guidon@itis.ethz.ch>

RUN conda install flask plotly pymongo numpy
RUN conda install -c conda-forge celery
RUN pip install docker


EXPOSE 8010

WORKDIR /work

CMD ["python", "director.py"]
14 changes: 14 additions & 0 deletions demos/workflow/director/Dockerfile-prod
@@ -0,0 +1,14 @@
FROM continuumio/miniconda
MAINTAINER Manuel Guidon <guidon@itis.ethz.ch>

RUN conda install flask plotly pymongo numpy
RUN conda install -c conda-forge celery
RUN pip install docker

EXPOSE 8010

WORKDIR /work
ADD *.py /work/
ADD ./templates /work/templates

CMD ["python", "director.py"]
213 changes: 213 additions & 0 deletions demos/workflow/director/director.py
@@ -0,0 +1,213 @@
from flask import Flask, make_response, request, url_for, render_template
import json
import hashlib
from pymongo import MongoClient
import gridfs
from bson import ObjectId
from werkzeug.exceptions import NotFound, ServiceUnavailable
import requests
from worker import celery
from celery.result import AsyncResult
import celery.states as states
from celery import signature
import numpy as np
import docker
import sys
import plotly

app = Flask(__name__)


task_info = {}
# placeholder: no services are registered yet in this prototype
registered_services = {}

def create_graph(x, y):
    graph = [
        dict(
            data=[
                dict(
                    x=x,
                    y=y,
                    type='scatter'
                ),
            ],
            layout=dict(
                title='scatter plot'
            )
        )
    ]
    return graph


def nice_json(arg):
    response = make_response(json.dumps(arg, sort_keys=True, indent=4))
    response.headers['Content-type'] = "application/json"
    return response

def output_exists(output_hash):
    db_client = MongoClient("mongodb://database:27017/")
    output_database = db_client.output_database
    output_collections = output_database.output_collections
    exists = output_collections.find_one({"_hash": str(output_hash)})
    return exists is not None

def parse_input_data(data):
    if "input" in data:
        inp = data["input"]
        data_hash = hashlib.sha256(json.dumps(inp, sort_keys=True).encode('utf-8')).hexdigest()
        db_client = MongoClient("mongodb://database:27017/")
        input_database = db_client.input_database
        input_collections = input_database.input_collections
        exists = input_collections.find_one({"_hash": str(data_hash)})
        if exists is None:
            cp_data = data.copy()
            cp_data["_hash"] = [str(data_hash)]
            input_collections.insert_one(cp_data)

        return [data_hash, exists is not None]

def parse_container_data(data):
    if "container" in data:
        container = data["container"]
        container_name = container["name"]
        container_tag = container['tag']
        client = docker.from_env(version='auto')
        client.login(registry="masu.speag.com/v2", username="z43", password="z43")
        img = client.images.pull(container_name, tag=container_tag)
        container_hash = str(img.id).split(':')[1]
        return container_hash

def start_computation(data):
    try:
        # req = requests.post("http://sidecar:8000/setup", json=data)
        # req2 = requests.get("http://sidecar:8000/preprocess")
        # req3 = requests.get("http://sidecar:8000/process")
        # req4 = requests.get("http://sidecar:8000/postprocess")
        req = requests.post("http://sidecar:8000/run", json=data)

    except requests.exceptions.ConnectionError:
        raise ServiceUnavailable("The computational service is unavailable.")

@app.route('/add/<int:param1>/<int:param2>')
def add(param1, param2):
    task = celery.send_task('mytasks.add', args=[param1, param2], kwargs={})
    return "<a href='{url}'>check status of {id} </a>".format(
        id=task.id, url=url_for('check_task', id=task.id, _external=True))

@app.route('/check/<string:id>')
def check_task(id):
    res = celery.AsyncResult(id)
    if res.state == states.PENDING:
        return res.state
    else:
        db_client = MongoClient("mongodb://database:27017/")
        output_database = db_client.output_database
        output_collections = output_database.output_collections
        exists = output_collections.find_one({"_hash": str(res.result)})
        if exists is not None:
            file_db = db_client.file_db
            fs = gridfs.GridFS(file_db)
            for file_id in exists["ids"]:
                data = fs.get(file_id).read()
                data_array = np.fromstring(data, sep="\t")
                x = data_array[0::2]
                y = data_array[1::2]
                graph = create_graph(x, y)
                ids = ['graph-{}'.format(i) for i, _ in enumerate(graph)]
                # convert the plotly figure objects to their JSON equivalents
                graphJSON = json.dumps(graph, cls=plotly.utils.PlotlyJSONEncoder)

                return render_template('layouts/index.html',
                                       ids=ids,
                                       graphJSON=graphJSON)

        return str(res.result)


@app.route("/services", methods=['GET'])
def services():
    return nice_json(registered_services)

@app.route("/service/<id>", methods=['GET'])
def service(id):
    return nice_json(registered_services[id])

@app.route("/task/<id>", methods=['GET'])
def task(id):
    return "42"

@app.route("/run_pipeline", methods=['POST'])
def run_pipeline():
    data = request.get_json()
    hashstr = ""
    [input_hash, input_exists] = parse_input_data(data)

    container_hash = parse_container_data(data)

    combined = hashlib.sha256()
    combined.update(input_hash.encode('utf-8'))
    combined.update(container_hash.encode('utf-8'))
    output_hash = combined.hexdigest()

    output_ready = output_exists(output_hash)
    task = celery.send_task('mytasks.run', args=[data], kwargs={})

    return "<a href='{url}'>check status of {id} </a>".format(
        id=task.id, url=url_for('check_task', id=task.id, _external=True))


# @app.route("/calc", methods=['GET'])
@app.route('/calc/<float:x_min>/<float:x_max>/<int:N>/<string:f>')
def calc(x_min, x_max, N, f):
    # data = request.get_json()

    data_str = """{
        "input":
        [
            {
                "name": "N",
                "value": %s
            },
            {
                "name": "xmin",
                "value": %f
            },
            {
                "name": "xmax",
                "value": %f
            },
            {
                "name": "func",
                "value": "%s"
            }
        ],
        "container":
        {
            "name": "masu.speag.com/comp.services/sidecar-solver",
            "tag": "1.1"
        }
    }""" % (N, x_min, x_max, f)

    data = json.loads(data_str)
    print(type(data))
    sys.stdout.flush()
    hashstr = ""
    [input_hash, input_exists] = parse_input_data(data)

    container_hash = parse_container_data(data)

    combined = hashlib.sha256()
    combined.update(input_hash.encode('utf-8'))
    combined.update(container_hash.encode('utf-8'))
    output_hash = combined.hexdigest()

    output_ready = output_exists(output_hash)
    task = celery.send_task('mytasks.run', args=[data], kwargs={})

    return "<a href='{url}'>check status of {id} </a>".format(
        id=task.id, url=url_for('check_task', id=task.id, _external=True))


if __name__ == "__main__":
    app.run(port=8010, debug=True, host='0.0.0.0')
39 changes: 39 additions & 0 deletions demos/workflow/director/templates/layouts/index.html
@@ -0,0 +1,39 @@
<!doctype html>
<html>

<head>
</head>

<body>

{% for id in ids %}
<h3>{{id}}</h3>
<div id="{{id}}"></div>
{% endfor %}

</body>


<footer>
<!-- D3.js -->
<script src="https://cdnjs.cloudflare.com/ajax/libs/d3/3.5.6/d3.min.js"></script>
<!-- jQuery -->
<script src="https://code.jquery.com/jquery-2.1.4.min.js"></script>
<!-- Plotly.js -->
<script src="https://d14fo0winaifog.cloudfront.net/plotly-basic.js"></script>

<script type="text/javascript">

var graphs = {{graphJSON | safe}};
var ids = {{ids | safe}};

for(var i in graphs) {
Plotly.plot(ids[i], // the ID of the div, created above
graphs[i].data,
graphs[i].layout || {});
}

</script>
</footer>

</html>
