Skip to content

Commit

Permalink
Feature/test deployed dags (#481)
Browse files Browse the repository at this point in the history
* DAG to run the examples automatically

* Changed the way we check for the notification parameter

* Use getattr

* Small fixes

* Another change

* logging

* Parse json

* Don't send solver

* Added notify to rostering

* Updated path to log based on new log folder structure

* Small adjustment to run deployed dags

* Change the logging

* Small changes to dags and cornflow server so the config gets extended with the default values defined.

* Deleted unused print

* Changes to dags to be able to be solved automatically

* Changed config schema of facility_location

* Small change to facility location

* Change time limit

* Added seconds

* Small change to schema generator to debug

* changed print for click.echo

* Commented out the schema from models cli command and all its related code

* Added more unit testing for the executions.
Deleted unreachable code on executions
  • Loading branch information
ggsdc authored Oct 10, 2023
1 parent b0f0195 commit 8590599
Show file tree
Hide file tree
Showing 22 changed files with 704 additions and 261 deletions.
3 changes: 2 additions & 1 deletion cornflow-dags/DAG/activate_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ def solve(**kwargs):
**kwargs
)
with dag:
if not app.notify:
notify = getattr(app, "notify", True)
if not notify:
t1 = PythonOperator(task_id=app.name, python_callable=solve)
else:
t1 = PythonOperator(
Expand Down
10 changes: 0 additions & 10 deletions cornflow-dags/DAG/bar_cutting/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@ class BarCutting(ApplicationCore):
solution = Solution
solvers = dict(mip=MipModel, CG=ColumnGeneration)
schema = load_json(os.path.join(os.path.dirname(__file__), "./schemas/config.json"))
schema["properties"]["solver"]["enum"].append("mip.cbc")
schema["properties"]["solver"]["enum"].append("CG.cbc")

@property
def test_cases(self) -> List[Union[Dict, Tuple[Dict, Dict]]]:

options_instance = ["data/example_instance_1.json"]

options_solution = ["data/example_solution_1.json"]
Expand All @@ -44,10 +41,3 @@ def test_cases(self) -> List[Union[Dict, Tuple[Dict, Dict]]]:
)
for i in range(len(options_instance))
]

def get_solver(self, name: str = "mip") -> Union[Type[Experiment], None]:
if "." in name:
solver, _ = name.split(".")
else:
solver = name
return self.solvers.get(solver)
4 changes: 2 additions & 2 deletions cornflow-dags/DAG/bar_cutting/schemas/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
"properties": {
"solver": {
"type": "string",
"enum": ["mip", "CG"],
"default": "mip"
"enum": ["mip", "CG", "mip.cbc", "CG.cbc"],
"default": "mip.cbc"
},
"timeLimit": {
"type": "number"
Expand Down
12 changes: 1 addition & 11 deletions cornflow-dags/DAG/facility_location/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,7 @@ class FacilityLocation(ApplicationCore):
instance = Instance
solution = Solution
solvers = dict(Pyomo=PyomoSolver)
schema = get_empty_schema(
properties=dict(timeLimit=dict(type="number")),
solvers=list(solvers.keys()) + ["Pyomo.cbc"],
)

def get_solver(self, name: str = "Pyomo") -> Union[Type[Experiment], None]:
if "." in name:
solver, _ = name.split(".")
else:
solver = name
return self.solvers.get(solver)
schema = load_json(os.path.join(os.path.dirname(__file__), "./schemas/config.json"))

@property
def test_cases(self):
Expand Down
14 changes: 14 additions & 0 deletions cornflow-dags/DAG/facility_location/schemas/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"$schema": "http://json-schema.org/schema#",
"type": "object",
"properties": {
"solver": {
"type": "string",
"enum": ["Pyomo", "Pyomo.cbc"],
"default": "Pyomo.cbc"
},
"timeLimit": {
"type": "number"
}
}
}
126 changes: 126 additions & 0 deletions cornflow-dags/DAG/run_deployed_dags.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import json
import logging
import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.secrets.environment_variables import EnvironmentVariablesBackend
from airflow.utils.db import create_session
from cornflow_client import CornFlowApiError
from cornflow_client.airflow.dag_utilities import connect_to_cornflow

# Default task arguments applied to every task in this DAG.
default_args = {
    "owner": "baobab",
    "depends_on_past": False,
    "start_date": datetime(2020, 2, 1),
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    # NOTE(review): Airflow documents `retries` as a non-negative count; -1 is
    # not a documented value — confirm the intent (possibly "retry forever").
    "retries": -1,
    "retry_delay": timedelta(minutes=1),
    # NOTE(review): `catchup` is a DAG-level parameter, not a task default —
    # it is presumably ignored here; it is also set on the DAG object itself.
    "catchup": False,
}

# Task-scoped logger so messages appear in the Airflow task log.
logger = logging.getLogger("airflow.task")


def run_examples(**kwargs):
    """Create and solve one example instance for every deployed DAG.

    Reads the ``*_examples`` Airflow Variables, creates a cornflow instance
    plus an execution for each example, then polls the execution statuses
    until every one finishes, fails, or a global time limit expires.

    Results are only logged; nothing is returned.

    :param kwargs: Airflow context passed by ``provide_context=True`` (unused).
    """
    with create_session() as session:
        # Each "*_examples" Variable holds a JSON document whose "instance_1"
        # entry is an example instance for that schema.
        current_examples = {
            var.key: json.loads(var.get_val())["instance_1"]
            for var in session.query(Variable)
            if "_examples" in var.key
        }

    # Skip schemas that ship no example data.
    current_examples = {k: v for k, v in current_examples.items() if v != {}}

    cf_client = connect_to_cornflow(EnvironmentVariablesBackend())
    executions = []

    for key, instance in current_examples.items():
        # Recover the schema name from the variable key
        # (assumes the "z_<schema>_examples" naming convention — TODO confirm).
        schema = key.split("z_")[1].split("_examples")[0]

        try:
            response = cf_client.create_instance(
                data=instance, name=f"Automatic_instance_run_{schema}", schema=schema
            )
        except CornFlowApiError as e:
            logger.info(e)
            logger.info(
                f"Instance example for schema {schema} had an error on creation"
            )
            continue

        instance_id = response["id"]

        # Both "timeLimit" and "seconds" are sent so solvers using either
        # config convention stop after 60 seconds.
        config = {"timeLimit": 60, "msg": True, "seconds": 60}

        try:
            response = cf_client.create_execution(
                instance_id=instance_id,
                config=config,
                name=f"Automatic_execution_run_{schema}",
                schema=schema,
            )
        except CornFlowApiError as e:
            logger.info(e)
            logger.info(
                f"Execution example for schema {schema} had an error on creation"
            )
            continue

        execution_id = response["id"]
        executions.append((execution_id, schema))

    # Allow roughly one minute per launched execution before giving up.
    limit = (len(executions) + 1) * 60
    start = datetime.utcnow()

    while executions and datetime.utcnow() - start < timedelta(seconds=limit):
        # Rebuild the list of still-pending executions instead of popping by
        # index while iterating (the original pop-in-enumerate skipped the
        # element that followed every removed one, leaving some unchecked).
        pending = []
        for execution, schema in executions:
            try:
                response = cf_client.get_status(execution_id=execution)
            except CornFlowApiError as e:
                logger.info(e)
                logger.info(
                    f"Execution {execution} of schema {schema} had an error on status retrieval"
                )
                # Drop it: retrying a failing status call would block forever.
                continue

            if response["state"] in (1, 2):
                logger.info(
                    f"Execution {execution} of schema {schema} finished successfully"
                )
            elif response["state"] in (-1, -2, -3, -4, -5, -6):
                logger.info(f"Execution {execution} of schema {schema} failed")
            else:
                # Still queued / running: check again on the next pass.
                pending.append((execution, schema))

        executions = pending
        time.sleep(15)

    # Anything still pending when the time limit expired.
    for execution, schema in executions:
        logger.info(
            f"Execution {execution} of schema {schema} could not be checked"
        )

    logger.info("Automatic test process finished")


# Manually-triggered DAG (no schedule) that runs the examples of every
# deployed model through cornflow.
with DAG(
    "run_deployed_models",
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
) as dag:
    run_examples_task = PythonOperator(
        task_id="run_examples",
        python_callable=run_examples,
        provide_context=True,
    )
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
STATUS_TIME_LIMIT,
SOLUTION_STATUS_FEASIBLE,
SOLUTION_STATUS_INFEASIBLE,
PYOMO_STOP_MAPPING
PYOMO_STOP_MAPPING,
)
from pytups import SuperDict
from two_dimension_bin_packing.core import Experiment, Solution
Expand Down Expand Up @@ -257,6 +257,7 @@ def solve(self, options):
_, solver_name = solver_name.split(".")

# Setting solver
# TODO: review solver options translation
if solver_name == "gurobi":
opt = SolverFactory(solver_name, solver_io="python")
else:
Expand All @@ -274,14 +275,12 @@ def solve(self, options):
# Check status
if status in ["error", "unknown", "warning"]:
return dict(
status=termination_condition,
status_sol=SOLUTION_STATUS_INFEASIBLE
status=termination_condition, status_sol=SOLUTION_STATUS_INFEASIBLE
)
elif status == "aborted":
if termination_condition != STATUS_TIME_LIMIT:
return dict(
status=termination_condition,
status_sol=SOLUTION_STATUS_INFEASIBLE
status=termination_condition, status_sol=SOLUTION_STATUS_INFEASIBLE
)

solution_dict = SuperDict()
Expand All @@ -304,7 +303,4 @@ def solve(self, options):
self.solution = Solution.from_dict(solution_dict)
# self.plot_solution()

return dict(
status=termination_condition,
status_sol=SOLUTION_STATUS_FEASIBLE
)
return dict(status=termination_condition, status_sol=SOLUTION_STATUS_FEASIBLE)
13 changes: 4 additions & 9 deletions cornflow-dags/DAG/update_all_schemas.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
# General imports
import importlib as il
import os
import sys

# Partial imports
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow import DAG
from airflow.utils.db import create_session
from datetime import datetime, timedelta
from typing import List

# Import from cornflow environment
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.utils.db import create_session
from cornflow_client import ApplicationCore


default_args = {
"owner": "baobab",
"depends_on_past": False,
Expand Down
9 changes: 3 additions & 6 deletions cornflow-dags/DAG/update_dag_registry.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
# Full imports


# Partial imports
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import DagModel
from airflow.operators.python import PythonOperator
from airflow.secrets.environment_variables import EnvironmentVariablesBackend
from airflow.utils.db import create_session
from cornflow_client import CornFlow
from cornflow_client.airflow.dag_utilities import connect_to_cornflow
from datetime import datetime, timedelta

from update_all_schemas import get_new_apps

default_args = {
Expand All @@ -27,7 +25,6 @@

def update_dag_registry(**kwargs):
with create_session() as session:

model_dags = [
dag
for dag in session.query(DagModel)
Expand Down
Loading

0 comments on commit 8590599

Please sign in to comment.