-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/test deployed dags #481
Changes from all commits
fc59dc4
af3202b
829c079
8cfccfa
76eb206
30fc1e6
be735d0
d69f492
2de0cb6
bce45f9
1ea1b26
5ddaf4a
8c26a27
8af8806
c55446b
89d2509
3f45e63
e87144f
355ecdb
fb07a6c
a031bc1
7ebc464
1b2ab68
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
{
  "$schema": "http://json-schema.org/schema#",
  "type": "object",
  "properties": {
    "solver": {
      "type": "string",
      "enum": ["Pyomo", "Pyomo.cbc"],
      "default": "Pyomo.cbc"
    },
    "timeLimit": {
      "type": "number"
    }
  }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
import json | ||
import logging | ||
import time | ||
from datetime import datetime, timedelta | ||
|
||
from airflow import DAG | ||
from airflow.models import Variable | ||
from airflow.operators.python import PythonOperator | ||
from airflow.secrets.environment_variables import EnvironmentVariablesBackend | ||
from airflow.utils.db import create_session | ||
from cornflow_client import CornFlowApiError | ||
from cornflow_client.airflow.dag_utilities import connect_to_cornflow | ||
|
||
# Task-level defaults shared by every task in this DAG.
# NOTE(review): retries=-1 means "retry forever", and "catchup" is not a
# task-level default argument (the DAG below sets its own catchup flag) --
# TODO confirm both are intentional.
default_args = dict(
    owner="baobab",
    depends_on_past=False,
    start_date=datetime(2020, 2, 1),
    email=[""],
    email_on_failure=False,
    email_on_retry=False,
    retries=-1,
    retry_delay=timedelta(minutes=1),
    catchup=False,
)
|
||
# Use Airflow's task logger so messages show up in the task-instance log view.
logger = logging.getLogger("airflow.task")
|
||
|
||
def run_examples(**kwargs):
    """Create and run an example execution for every deployed schema.

    Reads every Airflow Variable whose key contains ``_examples`` and takes
    its ``instance_1`` entry as test data.  For each non-empty example it
    creates an instance and an execution on the Cornflow server, then polls
    the execution statuses every 15 s until all finish or an overall budget
    of one minute per execution (plus one minute of slack) is exhausted.

    Failures are logged and skipped so one broken schema does not abort the
    whole test run; this task never raises for a failed execution.
    """
    # Collect the example instances stored as Airflow Variables.
    # Keys are assumed to look like "...z_<schema>_examples" -- see the
    # split below; TODO confirm the naming scheme against the writer DAG.
    with create_session() as session:
        current_examples = {
            var.key: json.loads(var.get_val())["instance_1"]
            for var in session.query(Variable)
            if "_examples" in var.key
        }

    # Drop schemas that ship no example data.
    current_examples = {k: v for k, v in current_examples.items() if v != {}}

    cf_client = connect_to_cornflow(EnvironmentVariablesBackend())
    executions = []

    for key, instance in current_examples.items():
        # Extract the schema name from the variable key.
        schema = key.split("z_")[1].split("_examples")[0]

        try:
            response = cf_client.create_instance(
                data=instance, name=f"Automatic_instance_run_{schema}", schema=schema
            )
        except CornFlowApiError as e:
            logger.info(e)
            logger.info(
                f"Instance example for schema {schema} had an error on creation"
            )
            continue

        instance_id = response["id"]

        # NOTE(review): "timeLimit" and "seconds" duplicate the same limit
        # for different solver back-ends; pending migration to the config
        # translation dictionaries (see PR discussion).
        config = {"timeLimit": 60, "msg": True, "seconds": 60}

        try:
            response = cf_client.create_execution(
                instance_id=instance_id,
                config=config,
                name=f"Automatic_execution_run_{schema}",
                schema=schema,
            )
        except CornFlowApiError as e:
            logger.info(e)
            logger.info(
                f"Execution example for schema {schema} had an error on creation"
            )
            continue

        executions.append((response["id"], schema))

    # Overall polling budget: one minute per execution plus one of slack.
    limit = (len(executions) + 1) * 60
    start = datetime.utcnow()

    while executions and datetime.utcnow() - start < timedelta(seconds=limit):
        # Rebuild the list of still-pending executions each pass instead of
        # popping from the list while iterating it: the previous
        # pop(index)-inside-enumerate skipped the element right after every
        # removal and popped the wrong index after multiple removals.
        pending = []
        for execution, schema in executions:
            try:
                response = cf_client.get_status(execution_id=execution)
            except CornFlowApiError as e:
                logger.info(e)
                logger.info(
                    f"Execution {execution} of schema {schema} had an error on status retrieval"
                )
                # Drop it: its status can no longer be checked.
                continue

            state = response["state"]
            if state in (1, 2):
                logger.info(
                    f"Execution {execution} of schema {schema} finished successfully"
                )
            elif state in (-1, -2, -3, -4, -5, -6):
                logger.info(f"Execution {execution} of schema {schema} failed")
            else:
                # Still running: check it again on the next pass.
                pending.append((execution, schema))

        executions = pending
        if executions:
            time.sleep(15)

    # Anything left over timed out within the polling budget.
    for execution, schema in executions:
        logger.info(f"Execution {execution} of schema {schema} could not be checked")

    logger.info("Automatic test process finished")
|
||
|
||
# DAG that exercises the example instances of every deployed schema.
# It only runs when triggered manually (schedule_interval=None) and
# therefore never back-fills past runs (catchup=False).
dag = DAG(
    dag_id="run_deployed_models",
    catchup=False,
    schedule_interval=None,
    default_args=default_args,
)
|
||
# Single task: create and poll example executions for every deployed schema.
# NOTE(review): provide_context=True is an Airflow 1.x-era flag; on Airflow 2
# the context is always passed and the argument is kept only for backward
# compatibility -- TODO confirm the minimum Airflow version targeted.
run_examples_task = PythonOperator(
    task_id="run_examples",
    python_callable=run_examples,
    provide_context=True,
    dag=dag,
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@AlejandraGalan — this change made on top of your code is just to make sure it stays compatible with older versions of `cornflow-client`, just in case.