diff --git a/demos/workflow/Makefile b/demos/workflow/Makefile new file mode 100644 index 00000000..113d604c --- /dev/null +++ b/demos/workflow/Makefile @@ -0,0 +1,18 @@ +# Build example computational service +# author: Manuel Guidon + +build: + python solver/build-scripts/build.py --imagename=sidecar-solver --version=1.1 --registry=masu.speag.com --namespace=comp.services + docker build -t masu.speag.com/comp.backend/simcore.director:1.0 -f ./director/Dockerfile-prod ./director + docker build -t masu.speag.com/comp.backend/simcore.sidecar:1.0 -f ./sidecar/Dockerfile-prod ./sidecar + +publish: + python solver/build-scripts/build.py --imagename=sidecar-solver --version=1.1 --registry=masu.speag.com --namespace=comp.services --publish + docker push masu.speag.com/comp.backend/simcore.director:1.0 + docker push masu.speag.com/comp.backend/simcore.sidecar:1.0 + +demo: + echo "this should launch the demo (detached if possible)" + +stop: + echo "this should stop the demo" diff --git a/demos/workflow/README.md b/demos/workflow/README.md new file mode 100644 index 00000000..f7ab33de --- /dev/null +++ b/demos/workflow/README.md @@ -0,0 +1,86 @@ +# Workflow + +An experimental project for designing a complete workflow which includes pipeline extraction, job scheduling, result storage and display in the framework of a scalable docker swarm + +## Description +The computational backend of simcore consists of a director that exposes all available services to the user via the frontend. It is also responsible for converting pipelines created by the user into interdependent jobs that can be scheduled asynchronously. +Scheduling is done using the python celery framework with rabbitMQ as a broker and MongoDB as a backend to store data. The celery workers are implemented as sidecars that themselves can run one-shot docker containers with the actual computational services. The full stack can be deployed in a docker swarm.
+ +## Services, APIs and Documentation + +### Director Service +Entry point for frontend + +``` +localhost:8010/run_pipeline (POST) +localhost:8010/calc/0.0/1.0/100/"sin(x)" +``` +returns urls with job status and result + +### Flower Service +Visualization of underlying celery task queue +``` +localhost:5555 +``` + +### Mongo Express +Visualization of underlying MongoDB + +``` +localhost:8081 +``` + +### RabbitMQ +Visualization of broker +``` +localhost:15672 +``` + +### Visualizer +Visualization of docker swarm +``` +localhost:5000 +``` + +### Registry +Visualization of simcore docker registry +``` +masu.speag.com:5001 +``` + + +### Pipeline descriptor +Example: +``` +{ + "input": + [ + { + "name": "N", + "value": 10 + }, + { + "name": "xmin", + "value": -1.0 + }, + { + "name": "xmax", + "value": 1.0 + }, + { + "name": "func", + "value": "exp(x)*sin(x)" + } + ], + "container": + { + "name": "masu.speag.com/comp.services/sidecar-solver", + "tag": "1.1" + } +} + +``` + +### Orchestration diagram +![workflow](images/comp.backend.png) + diff --git a/demos/workflow/director/.gitignore b/demos/workflow/director/.gitignore new file mode 100644 index 00000000..f0ff5347 --- /dev/null +++ b/demos/workflow/director/.gitignore @@ -0,0 +1,2 @@ +# examples with 3rd party code +example_images/comp.services/ diff --git a/demos/workflow/director/Dockerfile b/demos/workflow/director/Dockerfile new file mode 100644 index 00000000..e88ee1e7 --- /dev/null +++ b/demos/workflow/director/Dockerfile @@ -0,0 +1,13 @@ +FROM continuumio/miniconda3 +MAINTAINER Manuel Guidon + +RUN conda install flask plotly pymongo numpy +RUN conda install -c conda-forge celery +RUN pip install docker + + +EXPOSE 8010 + +WORKDIR /work + +CMD ["python", "director.py"] diff --git a/demos/workflow/director/Dockerfile-prod b/demos/workflow/director/Dockerfile-prod new file mode 100644 index 00000000..0d84f217 --- /dev/null +++ b/demos/workflow/director/Dockerfile-prod @@ -0,0 +1,14 @@ +FROM continuumio/miniconda
+MAINTAINER Manuel Guidon + +RUN conda install flask plotly pymongo numpy +RUN conda install -c conda-forge celery +RUN pip install docker + +EXPOSE 8010 + +WORKDIR /work +ADD *.py /work/ +ADD ./templates /work/templates + +CMD ["python", "director.py"] diff --git a/demos/workflow/director/director.py b/demos/workflow/director/director.py new file mode 100644 index 00000000..b7109b9d --- /dev/null +++ b/demos/workflow/director/director.py @@ -0,0 +1,213 @@ +from flask import Flask, make_response, request, url_for, render_template +import json +import hashlib +from pymongo import MongoClient +import gridfs +from bson import ObjectId +from werkzeug.exceptions import NotFound, ServiceUnavailable +import requests +from worker import celery +from celery.result import AsyncResult +import celery.states as states +from celery import signature +import numpy as np +import docker +import sys +import plotly + +app = Flask(__name__) + + +task_info = {} + +def create_graph(x,y): + graph = [ + dict( + data=[ + dict( + x=x, + y=y, + type='scatter' + ), + ], + layout=dict( + title='scatter plot' + ) + ) + ] + return graph + + +def nice_json(arg): + response = make_response(json.dumps(arg, sort_keys = True, indent=4)) + response.headers['Content-type'] = "application/json" + return response + +def output_exists(output_hash): + db_client = MongoClient("mongodb://database:27017/") + output_database = db_client.output_database + output_collections = output_database.output_collections + exists = output_collections.find_one({"_hash" : str(output_hash)}) + return not exists is None + +def parse_input_data(data): + if "input" in data: + inp = data["input"] + data_hash = hashlib.sha256(json.dumps(inp, sort_keys=True).encode('utf-8')).hexdigest() + db_client = MongoClient("mongodb://database:27017/") + input_database = db_client.input_database + input_collections = input_database.input_collections + exists = input_collections.find_one({"_hash" : str(data_hash)}) + if exists == None: + 
cp_data = data.copy() + cp_data["_hash"] = [str(data_hash)] + input_collections.insert_one(cp_data) + + return [data_hash, not exists is None] + +def parse_container_data(data): + if "container" in data: + container = data["container"] + container_name = container["name"] + container_tag = container['tag'] + client = docker.from_env(version='auto') + client.login(registry="masu.speag.com/v2", username="z43", password="z43") + img = client.images.pull(container_name, tag=container_tag) + container_hash = str(img.id).split(':')[1] + return container_hash + +def start_computation(data): + try: + # req = requests.post("http://sidecar:8000/setup", json = data) + # req2 = requests.get("http://sidecar:8000/preprocess") + # req3 = requests.get("http://sidecar:8000/process") + # req4 = requests.get("http://sidecar:8000/postprocess") +# print data +# sys.stdout.flush() + req = requests.post("http://sidecar:8000/run", json = data) + + except requests.exceptions.ConnectionError: + raise ServiceUnavailable("The computational service is unavailable.") + +@app.route('/add//') +def add(param1,param2): + task = celery.send_task('mytasks.add', args=[param1, param2], kwargs={}) + return "check status of {id} ".format(id=task.id, + url=url_for('check_task',id=task.id,_external=True)) + +@app.route('/check/') +def check_task(id): + res = celery.AsyncResult(id) + if res.state==states.PENDING: + return res.state + else: + db_client = MongoClient("mongodb://database:27017/") + output_database = db_client.output_database + output_collections = output_database.output_collections + exists = output_collections.find_one({"_hash" : str(res.result)}) + if exists is not None: + file_db = db_client.file_db + fs = gridfs.GridFS(file_db) + for file_id in exists["ids"]: + data = fs.get(file_id).read() + data_array = np.fromstring(data, sep="\t") + x = data_array[0::2] + y = data_array[1::2] + graph = create_graph(x, y) + ids = ['graph-{}'.format(i) for i, _ in enumerate(graph)] + # objects to their 
JSON equivalents + graphJSON = json.dumps(graph, cls=plotly.utils.PlotlyJSONEncoder) + + return render_template('layouts/index.html', + ids=ids, + graphJSON=graphJSON) + + return str(res.result) + + +@app.route("/services", methods=['GET']) +def services(): + return nice_json(registered_services) + +@app.route("/service/", methods=['GET']) +def service(id): + return nice_json(registered_services[id]) + +@app.route("/task/", methods=['Get']) +def task(id): + return "42" + +@app.route("/run_pipeline", methods=['POST']) +def run_pipeline(): + data = request.get_json() + hashstr = "" + [input_hash, input_exists] = parse_input_data(data) + + container_hash = parse_container_data(data) + + combined = hashlib.sha256() + combined.update(input_hash.encode('utf-8')) + combined.update(container_hash.encode('utf-8')) + output_hash = combined.hexdigest() + + output_ready = output_exists(output_hash) + task = celery.send_task('mytasks.run', args=[data], kwargs={}) + + return "check status of {id} ".format(id=task.id, + url=url_for('check_task',id=task.id,_external=True)) + + +# @app.route("/calc", methods=['GET']) +@app.route('/calc////') +def calc(x_min, x_max, N, f): + #ata = request.get_json() + + data_str = """{ + "input": + [ + { + "name": "N", + "value": %s + }, + { + "name": "xmin", + "value": %f + }, + { + "name": "xmax", + "value": %f + }, + { + "name": "func", + "value": %s + } + ], + "container": + { + "name": "masu.speag.com/comp.services/sidecar-solver", + "tag": "1.1" + } + }""" % (N, x_min, x_max, f) + + data = json.loads(data_str) + print(type(data)) + sys.stdout.flush() + hashstr = "" + [input_hash, input_exists] = parse_input_data(data) + + container_hash = parse_container_data(data) + + combined = hashlib.sha256() + combined.update(input_hash.encode('utf-8')) + combined.update(container_hash.encode('utf-8')) + output_hash = combined.hexdigest() + + output_ready = output_exists(output_hash) + task = celery.send_task('mytasks.run', args=[data], kwargs={}) + + 
return "check status of {id} ".format(id=task.id, + url=url_for('check_task',id=task.id,_external=True)) + + +if __name__ == "__main__": + app.run(port=8010, debug=True, host='0.0.0.0') diff --git a/demos/workflow/director/templates/layouts/index.html b/demos/workflow/director/templates/layouts/index.html new file mode 100644 index 00000000..890165bd --- /dev/null +++ b/demos/workflow/director/templates/layouts/index.html @@ -0,0 +1,39 @@ + + + + + + + + + {% for id in ids %} +

{{id}}

+
+ {% endfor %} + + + + +
+ + + + + + + + +
+ + diff --git a/demos/workflow/director/worker.py b/demos/workflow/director/worker.py new file mode 100644 index 00000000..2f334aa4 --- /dev/null +++ b/demos/workflow/director/worker.py @@ -0,0 +1,10 @@ +import os +from celery import Celery + +env=os.environ +CELERY_BROKER_URL=env.get('CELERY_BROKER_URL','amqp://z43:z43@rabbit:5672') +CELERY_RESULT_BACKEND=env.get('CELERY_RESULT_BACKEND','rpc://') + +celery= Celery('tasks', + broker=CELERY_BROKER_URL, + backend=CELERY_RESULT_BACKEND) diff --git a/demos/workflow/docker-compose-swarm.yml b/demos/workflow/docker-compose-swarm.yml new file mode 100644 index 00000000..1e96de46 --- /dev/null +++ b/demos/workflow/docker-compose-swarm.yml @@ -0,0 +1,98 @@ +version: '3' +services: + director: + image: masu.speag.com/comp.backend/simcore.director:1.0 + volumes: + - /var/run/docker.sock:/var/run/docker.sock + ports: + - "8010:8010" + dns: + - 172.16.8.15 + deploy: + replicas: 1 + restart_policy: + condition: on-failure + placement: + constraints: [node.role == worker] + sidecar: + image: masu.speag.com/comp.backend/simcore.sidecar:1.0 + volumes: + - input:/input + - output:/output + - log:/log + - /var/run/docker.sock:/var/run/docker.sock + ports: + - "8000:8000" + dns: + - 172.16.8.15 + deploy: + replicas: 5 + restart_policy: + condition: on-failure + placement: + constraints: [node.role == worker] + database: + image: mongo:3.4.0 + environment: + - MONGO_DATA_DIR=/data/db + - MONGO_LOG_DIR=/dev/null + volumes: + - db:/data/db + ports: + - "28017:28017" + command: mongod --httpinterface --rest --smallfiles --logpath=/dev/null # --quiet + deploy: + replicas: 1 + restart_policy: + condition: on-failure + placement: + constraints: [node.role == manager] + database_ui: + image: mongo-express + ports: + - "8081:8081" + environment: + - ME_CONFIG_MONGODB_SERVER=database + depends_on: + - database + deploy: + replicas: 1 + restart_policy: + condition: on-failure + placement: + constraints: [node.role == manager] + visualizer: + 
image: dockersamples/visualizer:stable + ports: + - "5000:8080" + volumes: + - /var/run/docker.sock:/var/run/docker.sock + deploy: + placement: + constraints: [node.role == manager] + rabbit: + image: rabbitmq:3-management + environment: + - RABBITMQ_DEFAULT_USER=z43 + - RABBITMQ_DEFAULT_PASS=z43 + ports: + - "15672:15672" + deploy: + placement: + constraints: [node.role == manager] + flower: + image: ondrejit/flower:latest + command: --broker=amqp://z43:z43@rabbit:5672 + ports: + - 5555:5555 + depends_on: + - director + deploy: + placement: + constraints: [node.role == manager] +volumes: + input: + output: + log: + db: + diff --git a/demos/workflow/docker-compose.yml b/demos/workflow/docker-compose.yml new file mode 100644 index 00000000..65da3203 --- /dev/null +++ b/demos/workflow/docker-compose.yml @@ -0,0 +1,67 @@ +version: '2' +services: + director: + build: + context: . + dockerfile: ./director/Dockerfile + volumes: + - ./director:/work + - /var/run/docker.sock:/var/run/docker.sock + ports: + - "8010:8010" + depends_on: + - rabbit + sidecar: + build: + context: . 
+ dockerfile: ./sidecar/Dockerfile + volumes: + - input:/input + - output:/output + - log:/log + - ./sidecar:/work + - /var/run/docker.sock:/var/run/docker.sock + ports: + - "8000:8000" + dns: + - 172.16.8.15 + database: + image: mongo:3.4.0 + environment: + - MONGO_DATA_DIR=/data/db + - MONGO_LOG_DIR=/dev/null + volumes: + - db:/data/db + ports: + - "28017:28017" + command: mongod --httpinterface --rest --smallfiles --logpath=/dev/null # --quiet + database_ui: + image: mongo-express +# links: +# - database:mongo + ports: + - "8081:8081" + environment: + - ME_CONFIG_MONGODB_SERVER=database + depends_on: + - database + rabbit: + image: rabbitmq:3-management + environment: + - RABBITMQ_DEFAULT_USER=z43 + - RABBITMQ_DEFAULT_PASS=z43 + ports: + - "15672:15672" + flower: + image: ondrejit/flower:latest + command: --broker=amqp://z43:z43@rabbit:5672 + ports: + - 5555:5555 + depends_on: + - director +volumes: + input: + output: + log: + db: + diff --git a/demos/workflow/images/comp.backend.png b/demos/workflow/images/comp.backend.png new file mode 100755 index 00000000..6a7e00f3 Binary files /dev/null and b/demos/workflow/images/comp.backend.png differ diff --git a/demos/workflow/scripts/deploy.sh b/demos/workflow/scripts/deploy.sh new file mode 100755 index 00000000..1bcd71f8 --- /dev/null +++ b/demos/workflow/scripts/deploy.sh @@ -0,0 +1 @@ +docker login -u z43 -p z43 masu.speag.com && docker stack deploy -c docker-compose-swarm.yml workflow --with-registry-auth diff --git a/demos/workflow/scripts/do_forward.sh b/demos/workflow/scripts/do_forward.sh new file mode 100755 index 00000000..de85cb50 --- /dev/null +++ b/demos/workflow/scripts/do_forward.sh @@ -0,0 +1,6 @@ +./pf 5000 -e manager1 +./pf 8081 -e manager1 +./pf 15672 -e manager1 +./pf 5555 -e manager1 +./pf 8010 -e worker1 + diff --git a/demos/workflow/scripts/pf b/demos/workflow/scripts/pf new file mode 100755 index 00000000..c5af938a --- /dev/null +++ b/demos/workflow/scripts/pf @@ -0,0 +1,133 @@ +#!/bin/bash + 
+readonly PROGNAME=$(basename $0) + +port="${1}" +foreground="false" +stop="false" +environment="default" +quiet="false" +hostport="$1" + +usage="${PROGNAME} [-h] [-s] [-f] [-e] [-hp] -- Forwards a docker-machine port so that you can access it locally + +where: + -h, --help Show this help text + -s, --stop Stop the port forwarding process + -f, --foreground Run the docker-machine ssh client in foreground instead of background + -e, --environment The name of the docker-machine environment (default is default) + -q, --quiet Don't print anything to the console, not even errors + +examples: + # Port forward port 8047 in docker-machine environment default + \$ ${PROGNAME} 8047 + + # Port forward docker port 8047 to host port 8087 in docker-machine environment default + \$ ${PROGNAME} 8087:8047 + + # Port forward port 8047 in docker-machine dev + \$ ${PROGNAME} 8047 -e dev + + # Runs in foreground (port forwarding is automatically stopped when process is terminated) + \$ ${PROGNAME} 8047 -f + + # Stop the port forwarding for this port + \$ ${PROGNAME} 8047 -s" + +if [ $# -eq 0 ]; then + echo "$usage" + exit 1 +fi + +if [ -z "$1" ]; then + echo "You need to specify the port to forward" >&2 + echo "$usage" + exit 1 +fi + +if [ "$#" -ne 0 ]; then + while [ "$#" -gt 0 ] + do + case "$1" in + -h|--help) + echo "$usage" + exit 0 + ;; + -f|--foreground) + foreground="true" + ;; + -s|--stop) + stop="true" + ;; + -e|--environment) + environment="$2" + ;; + -q|--quiet) + quiet="true" + ;; + --) + break + ;; + -*) + echo "Invalid option '$1'. 
Use --help to see the valid options" >&2 + exit 1 + ;; + # an option argument, continue + *) ;; + esac + shift + done +fi + +pidport() { + lsof -n -i4TCP:$1 | grep --exclude-dir={.bzr,CVS,.git,.hg,.svn} LISTEN +} + +# Check if port contains ":", if so we should split +if [[ $port == *":"* ]]; then + # Split by : + ports=(${port//:/ }) + if [[ ${#ports[@]} != 2 ]]; then + if [[ $quiet == "false" ]]; then + echo "Port forwarding should be defined as hostport:targetport, for example: 8090:8080" + fi + exit 1 + fi + + + hostport=${ports[0]} + port=${ports[1]} +fi + + +if [[ ${stop} == "true" ]]; then + result=`pidport $hostport` + + if [ -z "${result}" ]; then + if [[ $quiet == "false" ]]; then + echo "Port $hostport is not forwarded, cannot stop" + fi + exit 1 + fi + + process=`echo "${result}" | awk '{ print $1 }'` + if [[ $process != "ssh" ]]; then + if [[ $quiet == "false" ]]; then + echo "Port $hostport is bound by process ${process} and not by docker-machine, won't stop" + fi + exit 1 + fi + + pid=`echo "${result}" | awk '{ print $2 }'` && + kill $pid && + echo "Stopped port forwarding for $hostport" +else + docker-machine ssh $environment `if [[ ${foreground} == "false" ]]; then echo "-f -N"; fi` -L $hostport:localhost:$port && + if [[ $quiet == "false" ]] && [[ $foreground == "false" ]]; then + printf "Forwarding port $port" + if [[ $hostport -ne $port ]]; then + printf " to host port $hostport" + fi + echo " in docker-machine environment $environment." 
+ fi +fi diff --git a/demos/workflow/scripts/swarm-node-vbox-setup.sh b/demos/workflow/scripts/swarm-node-vbox-setup.sh new file mode 100755 index 00000000..b40bda76 --- /dev/null +++ b/demos/workflow/scripts/swarm-node-vbox-setup.sh @@ -0,0 +1,67 @@ +#!/bin/bash + +# Swarm mode using Docker Machine + +managers=1 +workers=2 + +# create manager machines +echo "======> Creating $managers manager machines ..."; +for node in $(seq 1 $managers); +do + echo "======> Creating manager$node machine ..."; + docker-machine create -d virtualbox manager$node; +done + +# create worker machines +echo "======> Creating $workers worker machines ..."; +for node in $(seq 1 $workers); +do + echo "======> Creating worker$node machine ..."; + docker-machine create -d virtualbox worker$node; +done + +# list all machines +docker-machine ls + +# initialize swarm mode and create a manager +echo "======> Initializing first swarm manager ..." +docker-machine ssh manager1 "docker swarm init --listen-addr $(docker-machine ip manager1) --advertise-addr $(docker-machine ip manager1)" + +# get manager and worker tokens +export manager_token=`docker-machine ssh manager1 "docker swarm join-token manager -q"` +export worker_token=`docker-machine ssh manager1 "docker swarm join-token worker -q"` + +echo "manager_token: $manager_token" +echo "worker_token: $worker_token" + +# other masters join swarm +for node in $(seq 2 $managers); +do + echo "======> manager$node joining swarm as manager ..." + docker-machine ssh manager$node \ + "docker swarm join \ + --token $manager_token \ + --listen-addr $(docker-machine ip manager$node) \ + --advertise-addr $(docker-machine ip manager$node) \ + $(docker-machine ip manager1)" +done + +# show members of swarm +docker-machine ssh manager1 "docker node ls" + +# workers join swarm +for node in $(seq 1 $workers); +do + echo "======> worker$node joining swarm as worker ..." 
+ docker-machine ssh worker$node \ + "docker swarm join \ + --token $worker_token \ + --listen-addr $(docker-machine ip worker$node) \ + --advertise-addr $(docker-machine ip worker$node) \ + $(docker-machine ip manager1)" +done + +# show members of swarm +docker-machine ssh manager1 "docker node ls" + diff --git a/demos/workflow/scripts/swarm-node-vbox-teardown.sh b/demos/workflow/scripts/swarm-node-vbox-teardown.sh new file mode 100755 index 00000000..1ae83ac8 --- /dev/null +++ b/demos/workflow/scripts/swarm-node-vbox-teardown.sh @@ -0,0 +1,7 @@ +#!/bin/bash +### Warning: This will remove all docker machines running ### +# Stop machines +docker-machine stop $(docker-machine ls -q) + +# remove machines +docker-machine rm $(docker-machine ls -q) diff --git a/demos/workflow/sidecar/Dockerfile b/demos/workflow/sidecar/Dockerfile new file mode 100644 index 00000000..6614ce91 --- /dev/null +++ b/demos/workflow/sidecar/Dockerfile @@ -0,0 +1,12 @@ +FROM continuumio/miniconda3 +MAINTAINER Manuel Guidon + +RUN conda install flask plotly pymongo +RUN conda install -c conda-forge celery + +RUN pip install docker + +EXPOSE 8000 + +WORKDIR /work +ENTRYPOINT celery -A sidecar worker -c 1 --loglevel=info diff --git a/demos/workflow/sidecar/Dockerfile-prod b/demos/workflow/sidecar/Dockerfile-prod new file mode 100644 index 00000000..ac71c7da --- /dev/null +++ b/demos/workflow/sidecar/Dockerfile-prod @@ -0,0 +1,17 @@ +FROM continuumio/miniconda3 +MAINTAINER Manuel Guidon + +RUN conda install flask plotly pymongo +RUN conda install -c conda-forge celery + +RUN pip install docker + +EXPOSE 8000 + +WORKDIR /work +ADD *.py /work/ + +ENTRYPOINT celery -A sidecar worker -c 1 --loglevel=info + + + diff --git a/demos/workflow/sidecar/README.md b/demos/workflow/sidecar/README.md new file mode 100644 index 00000000..20f72899 --- /dev/null +++ b/demos/workflow/sidecar/README.md @@ -0,0 +1,31 @@ +# Sidecar: Use sidecar container to control computaional service + + +Overview +======== + 
+Sidecar is an experimental project for implenting the sidecar pattern +for the control of computational services + +Building Services +================= +To build the sidecar use + +``` +$ cd sidecar +$ docker-compose up +``` + +The solver can be build using + +``` +$ cd solver +$ make build +``` + + +APIs and Documentation +====================== + +## Sidecar (port 8000) + diff --git a/demos/workflow/sidecar/sidecar.py b/demos/workflow/sidecar/sidecar.py new file mode 100644 index 00000000..7530aab8 --- /dev/null +++ b/demos/workflow/sidecar/sidecar.py @@ -0,0 +1,191 @@ +import json +import hashlib +import docker +import os +import sys +import time +import shutil +import uuid +from celery import Celery +from concurrent.futures import ThreadPoolExecutor +from pymongo import MongoClient +import gridfs + +env=os.environ +CELERY_BROKER_URL=env.get('CELERY_BROKER_URL','amqp://z43:z43@rabbit:5672'), +CELERY_RESULT_BACKEND=env.get('CELERY_RESULT_BACKEND','rpc://') + +celery= Celery('tasks', + broker=CELERY_BROKER_URL, + backend=CELERY_RESULT_BACKEND) + + +io_dirs = {} +buddy = None +buddy_image ="" +buddy_image = 'solver' +job_id = "" + +def delete_contents(folder): + for the_file in os.listdir(folder): + file_path = os.path.join(folder, the_file) + try: + if os.path.isfile(file_path): + os.unlink(file_path) + elif os.path.isdir(file_path): shutil.rmtree(file_path) + except Exception as e: + print(e) + +def create_directories(): + global io_dirs + global job_id + job_id = str(uuid.uuid4()) + for d in ['input', 'output', 'log']: + dir = os.path.join("/", d, job_id) + io_dirs[d] = dir + if not os.path.exists(dir): + os.makedirs(dir) + else: + delete_contents(dir) + + +def parse_input_data(data): + global io_dirs + for d in data: + if "type" in d and d["type"] == "url": + r = requests.get(d["url"]) + filename = os.path.join(io_dirs['input'], d["name"]) + with open(filename, 'wb') as f: + f.write(r.content) + filename = os.path.join(io_dirs['input'], 'input.json') + with 
open(filename, 'w') as f: + f.write(json.dumps(data)) + +def fetch_container(data): + global buddy_image + buddy_name = data['name'] + buddy_tag = data['tag'] + client = docker.from_env(version='auto') + client.login(registry="masu.speag.com/v2", username="z43", password="z43") + img = client.images.pull(buddy_name, tag=buddy_tag) + buddy_image = buddy_name + ":" + buddy_tag + + +def prepare_input_and_container(data): + if 'input' in data: + parse_input_data(data['input']) + + if 'container' in data: + fetch_container(data['container']) + +def dump_log(): + global buddy_image + +def start_container(name, stage, io_env): + client = docker.from_env(version='auto') + buddy = client.containers.run(buddy_image, "run", + detach=False, remove=True, + volumes = {'workflow_input' : {'bind' : '/input'}, + 'workflow_output' : {'bind' : '/output'}, + 'workflow_log' : {'bind' : '/log'}}, + environment=io_env) + + # buddy.remove() + # hash output + output_hash = hash_job_output() + + store_job_output(output_hash) + + return output_hash + +def hash_job_output(): + output_hash = hashlib.sha256() + directory = io_dirs['output'] + + if not os.path.exists (directory): + return -1 + + try: + for root, dirs, files in os.walk(directory): + for names in files: + filepath = os.path.join(root,names) + try: + f1 = open(filepath, 'rb') + except: + # You can't open the file for some reason + f1.close() + continue + + while 1: + # Read file in as little chunks + buf = f1.read(4096) + if not buf : break + output_hash.update(buf) + f1.close() + except: + import traceback + # Print the stack traceback + traceback.print_exc() + return -2 + + return output_hash.hexdigest() + +def store_job_output(output_hash): + db_client = MongoClient("mongodb://database:27017/") + output_database = db_client.output_database + output_collections = output_database.output_collections + file_db = db_client.file_db + fs = gridfs.GridFS(file_db) + directory = io_dirs['output'] + data = {} + if not os.path.exists 
(directory): + return + try: + output_file_list = [] + ids = [] + for root, dirs, files in os.walk(directory): + for names in files: + filepath = os.path.join(root,names) + file_id = fs.put(open(filepath,'rb')) + ids.append(file_id) + with open(filepath, 'rb') as f: + file_data = f.read() + current = { 'filename' : names, 'contents' : file_data } + output_file_list.append(current) + + data["output"] = output_file_list + data["_hash"] = output_hash + data["ids"] = ids + + output_collections.insert_one(data) + + except: + import traceback + # Print the stack traceback + traceback.print_exc() + return -2 + +def do_run(data): + global buddy_image + global job_id + # add files if any and dump json + + prepare_input_and_container(data) + io_env = [] + io_env.append("INPUT_FOLDER=/input/"+job_id) + io_env.append("OUTPUT_FOLDER=/output/"+job_id) + io_env.append("LOG_FOLDER=/log/"+job_id) + + + return start_container(buddy_image, "run", io_env) + +@celery.task(name='mytasks.add') +def add(x, y): + time.sleep(5) # lets sleep for a while before doing the gigantic addition task! 
+ return x + y + +@celery.task(name='mytasks.run') +def run(data): + create_directories() + return str(do_run(data)) + diff --git a/demos/workflow/solver/Dockerfile b/demos/workflow/solver/Dockerfile new file mode 100644 index 00000000..86acea14 --- /dev/null +++ b/demos/workflow/solver/Dockerfile @@ -0,0 +1,17 @@ +FROM alpine + +MAINTAINER Manuel guidon + +RUN apk add --no-cache g++ bash jq + +WORKDIR /work + +ADD ./code /work +ADD ./simcore.io /simcore.io +RUN chmod +x /simcore.io/* + +ENV PATH="/simcore.io:${PATH}" + +RUN gcc -c -fPIC -lm tinyexpr.c -o libtiny.o +RUN g++ -std=c++11 -o test main.cpp libtiny.o +RUN rm *.cpp *.c *.h diff --git a/demos/workflow/solver/build-scripts/build.py b/demos/workflow/solver/build-scripts/build.py new file mode 100644 index 00000000..c007c8a3 --- /dev/null +++ b/demos/workflow/solver/build-scripts/build.py @@ -0,0 +1,66 @@ +import sys, os +from optparse import OptionParser +import docker +import json + +dir_path = os.path.dirname(os.path.realpath(__file__)) +parent_path = os.path.abspath(os.path.join(dir_path, os.pardir)) +path, folder_name = os.path.split(parent_path) + +def main(argv): + parser = OptionParser() + + parser.add_option( + "-i", "--imagename", dest="imagename", help="name for the image", default=folder_name) + + parser.add_option( + "-v", "--version", dest="version", help="version for the image", default='') + + parser.add_option( + "-r", "--registry", dest="registry", help="docker registry to use", default='') + + parser.add_option( + "-n", "--namespace", dest="namespace", help="which namespace to use", default="") + + parser.add_option( + "-p", "--publish", action="store_true", dest="publish", help="publish in registry") + + + (options, args) = parser.parse_args(sys.argv) + + registry = "" + namespace = "" + image_name = "" + version = "" + + _settings = {} + _input = {} + _output = {} + + labels = {} + for f in ["settings", "input", "output"]: + json_file = os.path.join(dir_path,''.join((f,".json"))) + with 
open(json_file) as json_data: + label_dict = json.load(json_data) + labels["io.simcore."+f] = str(label_dict[f]) + + tag = '' + if options.registry: + tag = options.registry + "/" + if options.namespace: + tag = tag + options.namespace + "/" + tag = tag + options.imagename + if options.version: + tag = tag + ":" + options.version + client = docker.from_env(version='auto') + + client.images.build(path=parent_path, tag=tag, labels=labels) + if options.publish: + client.login(registry=options.registry, username="z43", password="z43") + for line in client.api.push(tag, stream=True): + print line + + +if __name__ == "__main__": + main(sys.argv[1:]) + diff --git a/demos/workflow/solver/build-scripts/input.json b/demos/workflow/solver/build-scripts/input.json new file mode 100644 index 00000000..6092df5b --- /dev/null +++ b/demos/workflow/solver/build-scripts/input.json @@ -0,0 +1,14 @@ +{ + "input": [ + { + "name": "N", + "value": 10 , + "type" : "int" + }, + { + "name": "inputfile", + "url": "https://outbox.itis.ethz.ch/guidon/whassup.txt", + "type" : "url" + } + ] +} diff --git a/demos/workflow/solver/build-scripts/output.json b/demos/workflow/solver/build-scripts/output.json new file mode 100644 index 00000000..874df61c --- /dev/null +++ b/demos/workflow/solver/build-scripts/output.json @@ -0,0 +1,4 @@ +{ + "output": [ + ] +} diff --git a/demos/workflow/solver/build-scripts/settings.json b/demos/workflow/solver/build-scripts/settings.json new file mode 100644 index 00000000..5dbe2960 --- /dev/null +++ b/demos/workflow/solver/build-scripts/settings.json @@ -0,0 +1,16 @@ +{ + "settings": [ + { + "name": "N", + "text": "Number of iterations", + "type": "int", + "value":5 + }, + { + "name": "inputfile", + "text": "Inputfile for solver", + "type": "url", + "value": "" + } + ] +} diff --git a/demos/workflow/solver/code/libtiny.o b/demos/workflow/solver/code/libtiny.o new file mode 100644 index 00000000..bb600ddc Binary files /dev/null and 
b/demos/workflow/solver/code/libtiny.o differ diff --git a/demos/workflow/solver/code/main.cpp b/demos/workflow/solver/code/main.cpp new file mode 100644 index 00000000..3940bdcf --- /dev/null +++ b/demos/workflow/solver/code/main.cpp @@ -0,0 +1,71 @@ +extern "C" { + #include "tinyexpr.h" +} + +#include +#include +#include +#include +#include +#include +#include + + +int main(int argc, char* argv[]) +{ + int N = 100; + double xmin=0.f; + double xmax=1.f; + std::string filename = "output.dat"; + std::string func = "sin(x)"; + double x; + te_variable vars[] = {{"x", &x}}; + int err; + + if (argc>1) + { + xmin = std::atof(argv[1]); + } + if (argc>2) + { + xmax = std::atof(argv[2]); + } + if (argc>3) + { + N = std::atoi(argv[3]); + } + if (argc>4) + { + func = std::string(argv[4]); + } + if (argc>5) + { + filename = std::string(argv[5]); + } + + std::cout <(N); + + std::ofstream output; + output.open(filename); + for(int i=0; i<=N; i++) + { + x = xmin + static_cast(i)*dx; + double y = te_eval(expr); + double p = static_cast(i) / static_cast(N) * 100.f; + output << x << "\t" << y << std::endl; + std::cout << "Progress: " << p << " %" << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + output.close(); + + return 0; +} diff --git a/demos/workflow/solver/code/tinyexpr.c b/demos/workflow/solver/code/tinyexpr.c new file mode 100755 index 00000000..91a1848a --- /dev/null +++ b/demos/workflow/solver/code/tinyexpr.c @@ -0,0 +1,653 @@ +/* + * TINYEXPR - Tiny recursive descent parser and evaluation engine in C + * + * Copyright (c) 2015, 2016 Lewis Van Winkle + * + * http://CodePlea.com + * + * This software is provided 'as-is', without any express or implied + * warranty. In no event will the authors be held liable for any damages + * arising from the use of this software. 
+ * + * Permission is granted to anyone to use this software for any purpose, + * including commercial applications, and to alter it and redistribute it + * freely, subject to the following restrictions: + * + * 1. The origin of this software must not be misrepresented; you must not + * claim that you wrote the original software. If you use this software + * in a product, an acknowledgement in the product documentation would be + * appreciated but is not required. + * 2. Altered source versions must be plainly marked as such, and must not be + * misrepresented as being the original software. + * 3. This notice may not be removed or altered from any source distribution. + */ + +/* COMPILE TIME OPTIONS */ + +/* Exponentiation associativity: +For a^b^c = (a^b)^c and -a^b = (-a)^b do nothing. +For a^b^c = a^(b^c) and -a^b = -(a^b) uncomment the next line.*/ +/* #define TE_POW_FROM_RIGHT */ + +/* Logarithms +For log = base 10 log do nothing +For log = natural log uncomment the next line. */ +/* #define TE_NAT_LOG */ + +#include "tinyexpr.h" +#include +#include +#include +#include +#include + +#ifndef NAN +#define NAN (0.0/0.0) +#endif + +#ifndef INFINITY +#define INFINITY (1.0/0.0) +#endif + + +typedef double (*te_fun2)(double, double); + +enum { + TOK_NULL = TE_CLOSURE7+1, TOK_ERROR, TOK_END, TOK_SEP, + TOK_OPEN, TOK_CLOSE, TOK_NUMBER, TOK_VARIABLE, TOK_INFIX +}; + + +enum {TE_CONSTANT = 1}; + + +typedef struct state { + const char *start; + const char *next; + int type; + union {double value; const double *bound; const void *function;}; + void *context; + + const te_variable *lookup; + int lookup_len; +} state; + + +#define TYPE_MASK(TYPE) ((TYPE)&0x0000001F) + +#define IS_PURE(TYPE) (((TYPE) & TE_FLAG_PURE) != 0) +#define IS_FUNCTION(TYPE) (((TYPE) & TE_FUNCTION0) != 0) +#define IS_CLOSURE(TYPE) (((TYPE) & TE_CLOSURE0) != 0) +#define ARITY(TYPE) ( ((TYPE) & (TE_FUNCTION0 | TE_CLOSURE0)) ? ((TYPE) & 0x00000007) : 0 ) +#define NEW_EXPR(type, ...) 
new_expr((type), (const te_expr*[]){__VA_ARGS__}) + +static te_expr *new_expr(const int type, const te_expr *parameters[]) { + const int arity = ARITY(type); + const int psize = sizeof(void*) * arity; + const int size = (sizeof(te_expr) - sizeof(void*)) + psize + (IS_CLOSURE(type) ? sizeof(void*) : 0); + te_expr *ret = malloc(size); + memset(ret, 0, size); + if (arity && parameters) { + memcpy(ret->parameters, parameters, psize); + } + ret->type = type; + ret->bound = 0; + return ret; +} + + +void te_free_parameters(te_expr *n) { + if (!n) return; + switch (TYPE_MASK(n->type)) { + case TE_FUNCTION7: case TE_CLOSURE7: te_free(n->parameters[6]); + case TE_FUNCTION6: case TE_CLOSURE6: te_free(n->parameters[5]); + case TE_FUNCTION5: case TE_CLOSURE5: te_free(n->parameters[4]); + case TE_FUNCTION4: case TE_CLOSURE4: te_free(n->parameters[3]); + case TE_FUNCTION3: case TE_CLOSURE3: te_free(n->parameters[2]); + case TE_FUNCTION2: case TE_CLOSURE2: te_free(n->parameters[1]); + case TE_FUNCTION1: case TE_CLOSURE1: te_free(n->parameters[0]); + } +} + + +void te_free(te_expr *n) { + if (!n) return; + te_free_parameters(n); + free(n); +} + + +static double pi() {return 3.14159265358979323846;} +static double e() {return 2.71828182845904523536;} +static double fac(double a) {/* simplest version of fac */ + if (a < 0.0) + return NAN; + if (a > UINT_MAX) + return INFINITY; + unsigned int ua = (unsigned int)(a); + unsigned long int result = 1, i; + for (i = 1; i <= ua; i++) { + if (i > ULONG_MAX / result) + return INFINITY; + result *= i; + } + return (double)result; +} +static double ncr(double n, double r) { + if (n < 0.0 || r < 0.0 || n < r) return NAN; + if (n > UINT_MAX || r > UINT_MAX) return INFINITY; + unsigned long int un = (unsigned int)(n), ur = (unsigned int)(r), i; + unsigned long int result = 1; + if (ur > un / 2) ur = un - ur; + for (i = 1; i <= ur; i++) { + if (result > ULONG_MAX / (un - ur + i)) + return INFINITY; + result *= un - ur + i; + result /= i; + } + 
return result; +} +static double npr(double n, double r) {return ncr(n, r) * fac(r);} + +static const te_variable functions[] = { + /* must be in alphabetical order */ + {"abs", fabs, TE_FUNCTION1 | TE_FLAG_PURE, 0}, + {"acos", acos, TE_FUNCTION1 | TE_FLAG_PURE, 0}, + {"asin", asin, TE_FUNCTION1 | TE_FLAG_PURE, 0}, + {"atan", atan, TE_FUNCTION1 | TE_FLAG_PURE, 0}, + {"atan2", atan2, TE_FUNCTION2 | TE_FLAG_PURE, 0}, + {"ceil", ceil, TE_FUNCTION1 | TE_FLAG_PURE, 0}, + {"cos", cos, TE_FUNCTION1 | TE_FLAG_PURE, 0}, + {"cosh", cosh, TE_FUNCTION1 | TE_FLAG_PURE, 0}, + {"e", e, TE_FUNCTION0 | TE_FLAG_PURE, 0}, + {"exp", exp, TE_FUNCTION1 | TE_FLAG_PURE, 0}, + {"fac", fac, TE_FUNCTION1 | TE_FLAG_PURE, 0}, + {"floor", floor, TE_FUNCTION1 | TE_FLAG_PURE, 0}, + {"ln", log, TE_FUNCTION1 | TE_FLAG_PURE, 0}, +#ifdef TE_NAT_LOG + {"log", log, TE_FUNCTION1 | TE_FLAG_PURE, 0}, +#else + {"log", log10, TE_FUNCTION1 | TE_FLAG_PURE, 0}, +#endif + {"log10", log10, TE_FUNCTION1 | TE_FLAG_PURE, 0}, + {"ncr", ncr, TE_FUNCTION2 | TE_FLAG_PURE, 0}, + {"npr", npr, TE_FUNCTION2 | TE_FLAG_PURE, 0}, + {"pi", pi, TE_FUNCTION0 | TE_FLAG_PURE, 0}, + {"pow", pow, TE_FUNCTION2 | TE_FLAG_PURE, 0}, + {"sin", sin, TE_FUNCTION1 | TE_FLAG_PURE, 0}, + {"sinh", sinh, TE_FUNCTION1 | TE_FLAG_PURE, 0}, + {"sqrt", sqrt, TE_FUNCTION1 | TE_FLAG_PURE, 0}, + {"tan", tan, TE_FUNCTION1 | TE_FLAG_PURE, 0}, + {"tanh", tanh, TE_FUNCTION1 | TE_FLAG_PURE, 0}, + {0, 0, 0, 0} +}; + +static const te_variable *find_builtin(const char *name, int len) { + int imin = 0; + int imax = sizeof(functions) / sizeof(te_variable) - 2; + + /*Binary search.*/ + while (imax >= imin) { + const int i = (imin + ((imax-imin)/2)); + int c = strncmp(name, functions[i].name, len); + if (!c) c = '\0' - functions[i].name[len]; + if (c == 0) { + return functions + i; + } else if (c > 0) { + imin = i + 1; + } else { + imax = i - 1; + } + } + + return 0; +} + +static const te_variable *find_lookup(const state *s, const char *name, int len) { + int 
iters; + const te_variable *var; + if (!s->lookup) return 0; + + for (var = s->lookup, iters = s->lookup_len; iters; ++var, --iters) { + if (strncmp(name, var->name, len) == 0 && var->name[len] == '\0') { + return var; + } + } + return 0; +} + + + +static double add(double a, double b) {return a + b;} +static double sub(double a, double b) {return a - b;} +static double mul(double a, double b) {return a * b;} +static double divide(double a, double b) {return a / b;} +static double negate(double a) {return -a;} +static double comma(double a, double b) {(void)a; return b;} + + +void next_token(state *s) { + s->type = TOK_NULL; + + do { + + if (!*s->next){ + s->type = TOK_END; + return; + } + + /* Try reading a number. */ + if ((s->next[0] >= '0' && s->next[0] <= '9') || s->next[0] == '.') { + s->value = strtod(s->next, (char**)&s->next); + s->type = TOK_NUMBER; + } else { + /* Look for a variable or builtin function call. */ + if (s->next[0] >= 'a' && s->next[0] <= 'z') { + const char *start; + start = s->next; + while ((s->next[0] >= 'a' && s->next[0] <= 'z') || (s->next[0] >= '0' && s->next[0] <= '9') || (s->next[0] == '_')) s->next++; + + const te_variable *var = find_lookup(s, start, s->next - start); + if (!var) var = find_builtin(start, s->next - start); + + if (!var) { + s->type = TOK_ERROR; + } else { + switch(TYPE_MASK(var->type)) + { + case TE_VARIABLE: + s->type = TOK_VARIABLE; + s->bound = var->address; + break; + + case TE_CLOSURE0: case TE_CLOSURE1: case TE_CLOSURE2: case TE_CLOSURE3: + case TE_CLOSURE4: case TE_CLOSURE5: case TE_CLOSURE6: case TE_CLOSURE7: + s->context = var->context; + + case TE_FUNCTION0: case TE_FUNCTION1: case TE_FUNCTION2: case TE_FUNCTION3: + case TE_FUNCTION4: case TE_FUNCTION5: case TE_FUNCTION6: case TE_FUNCTION7: + s->type = var->type; + s->function = var->address; + break; + } + } + + } else { + /* Look for an operator or special character. 
*/ + switch (s->next++[0]) { + case '+': s->type = TOK_INFIX; s->function = add; break; + case '-': s->type = TOK_INFIX; s->function = sub; break; + case '*': s->type = TOK_INFIX; s->function = mul; break; + case '/': s->type = TOK_INFIX; s->function = divide; break; + case '^': s->type = TOK_INFIX; s->function = pow; break; + case '%': s->type = TOK_INFIX; s->function = fmod; break; + case '(': s->type = TOK_OPEN; break; + case ')': s->type = TOK_CLOSE; break; + case ',': s->type = TOK_SEP; break; + case ' ': case '\t': case '\n': case '\r': break; + default: s->type = TOK_ERROR; break; + } + } + } + } while (s->type == TOK_NULL); +} + + +static te_expr *list(state *s); +static te_expr *expr(state *s); +static te_expr *power(state *s); + +static te_expr *base(state *s) { + /* = | | {"(" ")"} | | "(" {"," } ")" | "(" ")" */ + te_expr *ret; + int arity; + + switch (TYPE_MASK(s->type)) { + case TOK_NUMBER: + ret = new_expr(TE_CONSTANT, 0); + ret->value = s->value; + next_token(s); + break; + + case TOK_VARIABLE: + ret = new_expr(TE_VARIABLE, 0); + ret->bound = s->bound; + next_token(s); + break; + + case TE_FUNCTION0: + case TE_CLOSURE0: + ret = new_expr(s->type, 0); + ret->function = s->function; + if (IS_CLOSURE(s->type)) ret->parameters[0] = s->context; + next_token(s); + if (s->type == TOK_OPEN) { + next_token(s); + if (s->type != TOK_CLOSE) { + s->type = TOK_ERROR; + } else { + next_token(s); + } + } + break; + + case TE_FUNCTION1: + case TE_CLOSURE1: + ret = new_expr(s->type, 0); + ret->function = s->function; + if (IS_CLOSURE(s->type)) ret->parameters[1] = s->context; + next_token(s); + ret->parameters[0] = power(s); + break; + + case TE_FUNCTION2: case TE_FUNCTION3: case TE_FUNCTION4: + case TE_FUNCTION5: case TE_FUNCTION6: case TE_FUNCTION7: + case TE_CLOSURE2: case TE_CLOSURE3: case TE_CLOSURE4: + case TE_CLOSURE5: case TE_CLOSURE6: case TE_CLOSURE7: + arity = ARITY(s->type); + + ret = new_expr(s->type, 0); + ret->function = s->function; + if 
(IS_CLOSURE(s->type)) ret->parameters[arity] = s->context; + next_token(s); + + if (s->type != TOK_OPEN) { + s->type = TOK_ERROR; + } else { + int i; + for(i = 0; i < arity; i++) { + next_token(s); + ret->parameters[i] = expr(s); + if(s->type != TOK_SEP) { + break; + } + } + if(s->type != TOK_CLOSE || i != arity - 1) { + s->type = TOK_ERROR; + } else { + next_token(s); + } + } + + break; + + case TOK_OPEN: + next_token(s); + ret = list(s); + if (s->type != TOK_CLOSE) { + s->type = TOK_ERROR; + } else { + next_token(s); + } + break; + + default: + ret = new_expr(0, 0); + s->type = TOK_ERROR; + ret->value = NAN; + break; + } + + return ret; +} + + +static te_expr *power(state *s) { + /* = {("-" | "+")} */ + int sign = 1; + while (s->type == TOK_INFIX && (s->function == add || s->function == sub)) { + if (s->function == sub) sign = -sign; + next_token(s); + } + + te_expr *ret; + + if (sign == 1) { + ret = base(s); + } else { + ret = NEW_EXPR(TE_FUNCTION1 | TE_FLAG_PURE, base(s)); + ret->function = negate; + } + + return ret; +} + +#ifdef TE_POW_FROM_RIGHT +static te_expr *factor(state *s) { + /* = {"^" } */ + te_expr *ret = power(s); + + int neg = 0; + te_expr *insertion = 0; + + if (ret->type == (TE_FUNCTION1 | TE_FLAG_PURE) && ret->function == negate) { + te_expr *se = ret->parameters[0]; + free(ret); + ret = se; + neg = 1; + } + + while (s->type == TOK_INFIX && (s->function == pow)) { + te_fun2 t = s->function; + next_token(s); + + if (insertion) { + /* Make exponentiation go right-to-left. 
*/ + te_expr *insert = NEW_EXPR(TE_FUNCTION2 | TE_FLAG_PURE, insertion->parameters[1], power(s)); + insert->function = t; + insertion->parameters[1] = insert; + insertion = insert; + } else { + ret = NEW_EXPR(TE_FUNCTION2 | TE_FLAG_PURE, ret, power(s)); + ret->function = t; + insertion = ret; + } + } + + if (neg) { + ret = NEW_EXPR(TE_FUNCTION1 | TE_FLAG_PURE, ret); + ret->function = negate; + } + + return ret; +} +#else +static te_expr *factor(state *s) { + /* = {"^" } */ + te_expr *ret = power(s); + + while (s->type == TOK_INFIX && (s->function == pow)) { + te_fun2 t = s->function; + next_token(s); + ret = NEW_EXPR(TE_FUNCTION2 | TE_FLAG_PURE, ret, power(s)); + ret->function = t; + } + + return ret; +} +#endif + + + +static te_expr *term(state *s) { + /* = {("*" | "/" | "%") } */ + te_expr *ret = factor(s); + + while (s->type == TOK_INFIX && (s->function == mul || s->function == divide || s->function == fmod)) { + te_fun2 t = s->function; + next_token(s); + ret = NEW_EXPR(TE_FUNCTION2 | TE_FLAG_PURE, ret, factor(s)); + ret->function = t; + } + + return ret; +} + + +static te_expr *expr(state *s) { + /* = {("+" | "-") } */ + te_expr *ret = term(s); + + while (s->type == TOK_INFIX && (s->function == add || s->function == sub)) { + te_fun2 t = s->function; + next_token(s); + ret = NEW_EXPR(TE_FUNCTION2 | TE_FLAG_PURE, ret, term(s)); + ret->function = t; + } + + return ret; +} + + +static te_expr *list(state *s) { + /* = {"," } */ + te_expr *ret = expr(s); + + while (s->type == TOK_SEP) { + next_token(s); + ret = NEW_EXPR(TE_FUNCTION2 | TE_FLAG_PURE, ret, expr(s)); + ret->function = comma; + } + + return ret; +} + + +#define TE_FUN(...) 
((double(*)(__VA_ARGS__))n->function) +#define M(e) te_eval(n->parameters[e]) + + +double te_eval(const te_expr *n) { + if (!n) return NAN; + + switch(TYPE_MASK(n->type)) { + case TE_CONSTANT: return n->value; + case TE_VARIABLE: return *n->bound; + + case TE_FUNCTION0: case TE_FUNCTION1: case TE_FUNCTION2: case TE_FUNCTION3: + case TE_FUNCTION4: case TE_FUNCTION5: case TE_FUNCTION6: case TE_FUNCTION7: + switch(ARITY(n->type)) { + case 0: return TE_FUN(void)(); + case 1: return TE_FUN(double)(M(0)); + case 2: return TE_FUN(double, double)(M(0), M(1)); + case 3: return TE_FUN(double, double, double)(M(0), M(1), M(2)); + case 4: return TE_FUN(double, double, double, double)(M(0), M(1), M(2), M(3)); + case 5: return TE_FUN(double, double, double, double, double)(M(0), M(1), M(2), M(3), M(4)); + case 6: return TE_FUN(double, double, double, double, double, double)(M(0), M(1), M(2), M(3), M(4), M(5)); + case 7: return TE_FUN(double, double, double, double, double, double, double)(M(0), M(1), M(2), M(3), M(4), M(5), M(6)); + default: return NAN; + } + + case TE_CLOSURE0: case TE_CLOSURE1: case TE_CLOSURE2: case TE_CLOSURE3: + case TE_CLOSURE4: case TE_CLOSURE5: case TE_CLOSURE6: case TE_CLOSURE7: + switch(ARITY(n->type)) { + case 0: return TE_FUN(void*)(n->parameters[0]); + case 1: return TE_FUN(void*, double)(n->parameters[1], M(0)); + case 2: return TE_FUN(void*, double, double)(n->parameters[2], M(0), M(1)); + case 3: return TE_FUN(void*, double, double, double)(n->parameters[3], M(0), M(1), M(2)); + case 4: return TE_FUN(void*, double, double, double, double)(n->parameters[4], M(0), M(1), M(2), M(3)); + case 5: return TE_FUN(void*, double, double, double, double, double)(n->parameters[5], M(0), M(1), M(2), M(3), M(4)); + case 6: return TE_FUN(void*, double, double, double, double, double, double)(n->parameters[6], M(0), M(1), M(2), M(3), M(4), M(5)); + case 7: return TE_FUN(void*, double, double, double, double, double, double, double)(n->parameters[7], M(0), M(1), 
M(2), M(3), M(4), M(5), M(6)); + default: return NAN; + } + + default: return NAN; + } + +} + +#undef TE_FUN +#undef M + +static void optimize(te_expr *n) { + /* Evaluates as much as possible. */ + if (n->type == TE_CONSTANT) return; + if (n->type == TE_VARIABLE) return; + + /* Only optimize out functions flagged as pure. */ + if (IS_PURE(n->type)) { + const int arity = ARITY(n->type); + int known = 1; + int i; + for (i = 0; i < arity; ++i) { + optimize(n->parameters[i]); + if (((te_expr*)(n->parameters[i]))->type != TE_CONSTANT) { + known = 0; + } + } + if (known) { + const double value = te_eval(n); + te_free_parameters(n); + n->type = TE_CONSTANT; + n->value = value; + } + } +} + + +te_expr *te_compile(const char *expression, const te_variable *variables, int var_count, int *error) { + state s; + s.start = s.next = expression; + s.lookup = variables; + s.lookup_len = var_count; + + next_token(&s); + te_expr *root = list(&s); + + if (s.type != TOK_END) { + te_free(root); + if (error) { + *error = (s.next - s.start); + if (*error == 0) *error = 1; + } + return 0; + } else { + optimize(root); + if (error) *error = 0; + return root; + } +} + + +double te_interp(const char *expression, int *error) { + te_expr *n = te_compile(expression, 0, 0, error); + double ret; + if (n) { + ret = te_eval(n); + te_free(n); + } else { + ret = NAN; + } + return ret; +} + +static void pn (const te_expr *n, int depth) { + int i, arity; + printf("%*s", depth, ""); + + switch(TYPE_MASK(n->type)) { + case TE_CONSTANT: printf("%f\n", n->value); break; + case TE_VARIABLE: printf("bound %p\n", n->bound); break; + + case TE_FUNCTION0: case TE_FUNCTION1: case TE_FUNCTION2: case TE_FUNCTION3: + case TE_FUNCTION4: case TE_FUNCTION5: case TE_FUNCTION6: case TE_FUNCTION7: + case TE_CLOSURE0: case TE_CLOSURE1: case TE_CLOSURE2: case TE_CLOSURE3: + case TE_CLOSURE4: case TE_CLOSURE5: case TE_CLOSURE6: case TE_CLOSURE7: + arity = ARITY(n->type); + printf("f%d", arity); + for(i = 0; i < arity; i++) { 
+ printf(" %p", n->parameters[i]); + } + printf("\n"); + for(i = 0; i < arity; i++) { + pn(n->parameters[i], depth + 1); + } + break; + } +} + + +void te_print(const te_expr *n) { + pn(n, 0); +} diff --git a/demos/workflow/solver/code/tinyexpr.h b/demos/workflow/solver/code/tinyexpr.h new file mode 100644 index 00000000..5d0dc0c8 --- /dev/null +++ b/demos/workflow/solver/code/tinyexpr.h @@ -0,0 +1,86 @@ +/* + * TINYEXPR - Tiny recursive descent parser and evaluation engine in C + * + * Copyright (c) 2015, 2016 Lewis Van Winkle + * + * http://CodePlea.com + * + * This software is provided 'as-is', without any express or implied + * warranty. In no event will the authors be held liable for any damages + * arising from the use of this software. + * + * Permission is granted to anyone to use this software for any purpose, + * including commercial applications, and to alter it and redistribute it + * freely, subject to the following restrictions: + * + * 1. The origin of this software must not be misrepresented; you must not + * claim that you wrote the original software. If you use this software + * in a product, an acknowledgement in the product documentation would be + * appreciated but is not required. + * 2. Altered source versions must be plainly marked as such, and must not be + * misrepresented as being the original software. + * 3. This notice may not be removed or altered from any source distribution. 
+ */ + +#ifndef __TINYEXPR_H__ +#define __TINYEXPR_H__ + + +#ifdef __cplusplus +extern "C" { +#endif + + + +typedef struct te_expr { + int type; + union {double value; const double *bound; const void *function;}; + void *parameters[1]; +} te_expr; + + +enum { + TE_VARIABLE = 0, + + TE_FUNCTION0 = 8, TE_FUNCTION1, TE_FUNCTION2, TE_FUNCTION3, + TE_FUNCTION4, TE_FUNCTION5, TE_FUNCTION6, TE_FUNCTION7, + + TE_CLOSURE0 = 16, TE_CLOSURE1, TE_CLOSURE2, TE_CLOSURE3, + TE_CLOSURE4, TE_CLOSURE5, TE_CLOSURE6, TE_CLOSURE7, + + TE_FLAG_PURE = 32 +}; + +typedef struct te_variable { + const char *name; + const void *address; + int type; + void *context; +} te_variable; + + + +/* Parses the input expression, evaluates it, and frees it. */ +/* Returns NaN on error. */ +double te_interp(const char *expression, int *error); + +/* Parses the input expression and binds variables. */ +/* Returns NULL on error. */ +te_expr *te_compile(const char *expression, const te_variable *variables, int var_count, int *error); + +/* Evaluates the expression. */ +double te_eval(const te_expr *n); + +/* Prints debugging information on the syntax tree. */ +void te_print(const te_expr *n); + +/* Frees the expression. */ +/* This is safe to call on NULL pointers. 
*/ +void te_free(te_expr *n); + + +#ifdef __cplusplus +} +#endif + +#endif /*__TINYEXPR_H__*/ diff --git a/demos/workflow/solver/simcore.io/do_postprocess b/demos/workflow/solver/simcore.io/do_postprocess new file mode 100644 index 00000000..b3917445 --- /dev/null +++ b/demos/workflow/solver/simcore.io/do_postprocess @@ -0,0 +1,2 @@ +#!/bin/bash +echo 'Running Postprocess' diff --git a/demos/workflow/solver/simcore.io/do_preprocess b/demos/workflow/solver/simcore.io/do_preprocess new file mode 100644 index 00000000..20eb2f44 --- /dev/null +++ b/demos/workflow/solver/simcore.io/do_preprocess @@ -0,0 +1,7 @@ +#!/bin/bash +arg1=$(cat $INPUT_FOLDER/input.json | jq '.[] | select(.name =="xmin") .value') +arg2=$(cat $INPUT_FOLDER/input.json | jq '.[] | select(.name =="xmax") .value') +arg3=$(cat $INPUT_FOLDER/input.json | jq '.[] | select(.name =="N") .value') +echo $arg1 > $INPUT_FOLDER/input +echo $arg2 >> $INPUT_FOLDER/input +echo $arg3 >> $INPUT_FOLDER/input diff --git a/demos/workflow/solver/simcore.io/do_process b/demos/workflow/solver/simcore.io/do_process new file mode 100644 index 00000000..3f2f31cd --- /dev/null +++ b/demos/workflow/solver/simcore.io/do_process @@ -0,0 +1,10 @@ +#!/bin/bash +arg1=$(cat $INPUT_FOLDER/input.json | jq '.[] | select(.name =="xmin") .value') +arg2=$(cat $INPUT_FOLDER/input.json | jq '.[] | select(.name =="xmax") .value') +arg3=$(cat $INPUT_FOLDER/input.json | jq '.[] | select(.name =="N") .value') +arg4=$(cat $INPUT_FOLDER/input.json | jq '.[] | select(.name =="func") .value') +temp="${arg4%\"}" +temp="${temp#\"}" +arg5=$OUTPUT_FOLDER/output + +./test $arg1 $arg2 $arg3 $temp $arg5 > $LOG_FOLDER/log.dat diff --git a/demos/workflow/solver/simcore.io/postprocess b/demos/workflow/solver/simcore.io/postprocess new file mode 100644 index 00000000..2033cb59 --- /dev/null +++ b/demos/workflow/solver/simcore.io/postprocess @@ -0,0 +1,2 @@ +#!/bin/bash +/bin/bash do_postprocess diff --git a/demos/workflow/solver/simcore.io/preprocess 
b/demos/workflow/solver/simcore.io/preprocess new file mode 100644 index 00000000..20a8fbe8 --- /dev/null +++ b/demos/workflow/solver/simcore.io/preprocess @@ -0,0 +1,2 @@ +#!/bin/bash +/bin/bash do_preprocess diff --git a/demos/workflow/solver/simcore.io/process b/demos/workflow/solver/simcore.io/process new file mode 100644 index 00000000..3af43012 --- /dev/null +++ b/demos/workflow/solver/simcore.io/process @@ -0,0 +1,2 @@ +#!/bin/bash +/bin/bash do_process diff --git a/demos/workflow/solver/simcore.io/run b/demos/workflow/solver/simcore.io/run new file mode 100644 index 00000000..64e9efc3 --- /dev/null +++ b/demos/workflow/solver/simcore.io/run @@ -0,0 +1,5 @@ +#!/bin/bash +/bin/bash do_preprocess +/bin/bash do_process +/bin/bash do_postprocess +