Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/test deployed dags #481

Merged
merged 23 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cornflow-dags/DAG/activate_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ def solve(**kwargs):
**kwargs
)
with dag:
if not app.notify:
notify = getattr(app, "notify", True)
if not notify:
Comment on lines +31 to +32
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change @AlejandraGalan done over your code is to make sure that this change is compatible with older versions cornflow-client just in case

t1 = PythonOperator(task_id=app.name, python_callable=solve)
else:
t1 = PythonOperator(
Expand Down
3 changes: 0 additions & 3 deletions cornflow-dags/DAG/bar_cutting/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@ class BarCutting(ApplicationCore):
solution = Solution
solvers = dict(mip=MipModel, CG=ColumnGeneration)
schema = load_json(os.path.join(os.path.dirname(__file__), "./schemas/config.json"))
schema["properties"]["solver"]["enum"].append("mip.cbc")
schema["properties"]["solver"]["enum"].append("CG.cbc")

@property
def test_cases(self) -> List[Union[Dict, Tuple[Dict, Dict]]]:

options_instance = ["data/example_instance_1.json"]

options_solution = ["data/example_solution_1.json"]
Expand Down
4 changes: 2 additions & 2 deletions cornflow-dags/DAG/bar_cutting/schemas/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
"properties": {
"solver": {
"type": "string",
"enum": ["mip", "CG"],
"default": "mip"
"enum": ["mip", "CG", "mip.cbc", "CG.cbc"],
"default": "mip.cbc"
},
"timeLimit": {
"type": "number"
Expand Down
12 changes: 1 addition & 11 deletions cornflow-dags/DAG/facility_location/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,7 @@ class FacilityLocation(ApplicationCore):
instance = Instance
solution = Solution
solvers = dict(Pyomo=PyomoSolver)
schema = get_empty_schema(
properties=dict(timeLimit=dict(type="number")),
solvers=list(solvers.keys()) + ["Pyomo.cbc"],
)

def get_solver(self, name: str = "Pyomo") -> Union[Type[Experiment], None]:
if "." in name:
solver, _ = name.split(".")
else:
solver = name
return self.solvers.get(solver)
schema = load_json(os.path.join(os.path.dirname(__file__), "./schemas/config.json"))

@property
def test_cases(self):
Expand Down
14 changes: 14 additions & 0 deletions cornflow-dags/DAG/facility_location/schemas/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"$schema": "http://json-schema.org/schema#",
"type": "object",
"properties": {
"solver": {
"type": "string",
"enum": ["Pyomo", "Pyomo.cbc"],
"default": "Pyomo.cbc"
},
"timeLimit": {
"type": "number"
}
}
}
126 changes: 126 additions & 0 deletions cornflow-dags/DAG/run_deployed_dags.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import json
import logging
import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.secrets.environment_variables import EnvironmentVariablesBackend
from airflow.utils.db import create_session
from cornflow_client import CornFlowApiError
from cornflow_client.airflow.dag_utilities import connect_to_cornflow

default_args = {
"owner": "baobab",
"depends_on_past": False,
"start_date": datetime(2020, 2, 1),
"email": [""],
"email_on_failure": False,
"email_on_retry": False,
"retries": -1,
"retry_delay": timedelta(minutes=1),
"catchup": False,
}

logger = logging.getLogger("airflow.task")


def run_examples(**kwargs):
with create_session() as session:
current_examples = {
var.key: json.loads(var.get_val())["instance_1"]
for var in session.query(Variable)
if "_examples" in var.key
}

current_examples = {k: v for k, v in current_examples.items() if v != {}}

cf_client = connect_to_cornflow(EnvironmentVariablesBackend())
executions = []

for key, instance in current_examples.items():
schema = key.split("z_")[1].split("_examples")[0]

try:
response = cf_client.create_instance(
data=instance, name=f"Automatic_instance_run_{schema}", schema=schema
)
except CornFlowApiError as e:
logger.info(e)
logger.info(
f"Instance example for schema {schema} had an error on creation"
)
continue

instance_id = response["id"]

config = {"timeLimit": 60, "msg": True, "seconds": 60}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@marioncottard I am getting a lot of weird behaviours with this part of timeLimit and seconds. Could you start preparing a PR that modifies all example DAGs to use the translation dictionaries that we had created?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay


try:
response = cf_client.create_execution(
instance_id=instance_id,
config=config,
name=f"Automatic_execution_run_{schema}",
schema=schema,
)
except CornFlowApiError as e:
logger.info(e)
logger.info(
f"Execution example for schema {schema} had an error on creation"
)
continue

execution_id = response["id"]
executions.append((execution_id, schema))

limit = (len(executions) + 1) * 60
start = datetime.utcnow()

while executions and datetime.utcnow() - start < timedelta(seconds=limit):
for index, (execution, schema) in enumerate(executions):
try:
response = cf_client.get_status(execution_id=execution)
except CornFlowApiError as e:
logger.info(e)
logger.info(
f"Execution {execution} of schema {schema} had an error on status retrieval"
)
executions.pop(index)
continue

if response["state"] in (1, 2):
logger.info(
f"Execution {execution} of schema {schema} finished successfully"
)
executions.pop(index)
elif response["state"] in (-1, -2, -3, -4, -5, -6):
logger.info(f"Execution {execution} of schema {schema} failed")
executions.pop(index)
else:
continue

time.sleep(15)

if len(executions):
for execution, schema in executions:
logger.info(
f"Execution {execution} of schema {schema} could not be checked"
)

logger.info("Automatic test process finished")


dag = DAG(
"run_deployed_models",
default_args=default_args,
schedule_interval=None,
catchup=False,
)

run_examples_task = PythonOperator(
task_id="run_examples",
python_callable=run_examples,
provide_context=True,
dag=dag,
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
STATUS_TIME_LIMIT,
SOLUTION_STATUS_FEASIBLE,
SOLUTION_STATUS_INFEASIBLE,
PYOMO_STOP_MAPPING
PYOMO_STOP_MAPPING,
)
from pytups import SuperDict
from two_dimension_bin_packing.core import Experiment, Solution
Expand Down Expand Up @@ -257,6 +257,7 @@ def solve(self, options):
_, solver_name = solver_name.split(".")

# Setting solver
# TODO: review solver options translation
if solver_name == "gurobi":
opt = SolverFactory(solver_name, solver_io="python")
else:
Expand All @@ -274,14 +275,12 @@ def solve(self, options):
# Check status
if status in ["error", "unknown", "warning"]:
return dict(
status=termination_condition,
status_sol=SOLUTION_STATUS_INFEASIBLE
status=termination_condition, status_sol=SOLUTION_STATUS_INFEASIBLE
)
elif status == "aborted":
if termination_condition != STATUS_TIME_LIMIT:
return dict(
status=termination_condition,
status_sol=SOLUTION_STATUS_INFEASIBLE
status=termination_condition, status_sol=SOLUTION_STATUS_INFEASIBLE
)

solution_dict = SuperDict()
Expand All @@ -304,7 +303,4 @@ def solve(self, options):
self.solution = Solution.from_dict(solution_dict)
# self.plot_solution()

return dict(
status=termination_condition,
status_sol=SOLUTION_STATUS_FEASIBLE
)
return dict(status=termination_condition, status_sol=SOLUTION_STATUS_FEASIBLE)
13 changes: 4 additions & 9 deletions cornflow-dags/DAG/update_all_schemas.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
# General imports
import importlib as il
import os
import sys

# Partial imports
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow import DAG
from airflow.utils.db import create_session
from datetime import datetime, timedelta
from typing import List

# Import from cornflow environment
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.utils.db import create_session
from cornflow_client import ApplicationCore


default_args = {
"owner": "baobab",
"depends_on_past": False,
Expand Down
9 changes: 3 additions & 6 deletions cornflow-dags/DAG/update_dag_registry.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
# Full imports


# Partial imports
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import DagModel
from airflow.operators.python import PythonOperator
from airflow.secrets.environment_variables import EnvironmentVariablesBackend
from airflow.utils.db import create_session
from cornflow_client import CornFlow
from cornflow_client.airflow.dag_utilities import connect_to_cornflow
from datetime import datetime, timedelta

from update_all_schemas import get_new_apps

default_args = {
Expand All @@ -27,7 +25,6 @@

def update_dag_registry(**kwargs):
with create_session() as session:

model_dags = [
dag
for dag in session.query(DagModel)
Expand Down
Loading
Loading