Skip to content

Commit b62c9c7

Browse files
Refactore report.py. Simpler progress reports posting
1 parent 9dd3125 commit b62c9c7

File tree

1 file changed

+25
-60
lines changed

1 file changed

+25
-60
lines changed

cwl_airflow/utilities/report.py

Lines changed: 25 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import json
33
import jwt
44
import logging
5-
import requests
65

76
from airflow.models import Variable
87
from airflow.utils.state import State
@@ -11,31 +10,22 @@
1110

1211
CONN_ID = "process_report"
1312
ROUTES = {
14-
"progress": "progress",
15-
"results": "results",
16-
"status": "status"
13+
"progress": "airflow/progress",
14+
"results": "airflow/results",
15+
"status": "airflow/status"
1716
}
1817
PRIVATE_KEY = "process_report_private_key"
1918
ALGORITHM = "process_report_algorithm"
2019

20+
http_hook = HttpHook(method="POST", http_conn_id=CONN_ID) # won't fail even if CONN_ID doesn't exist
2121

22-
def prepare_connection(conn_id, route):
23-
http_hook = HttpHook(http_conn_id=conn_id)
24-
session = http_hook.get_conn()
25-
url = "/".join(
26-
[
27-
u.strip("/") for u in [http_hook.base_url, session.headers["endpoint"], route]
28-
]
29-
)
30-
return http_hook, session, url
3122

32-
33-
def sign_with_jwt(data, private_key=None, algorithm=None):
23+
def sign_with_jwt(data):
3424
try:
3525
data = jwt.encode(
3626
payload=data,
37-
key=private_key or Variable.get(PRIVATE_KEY),
38-
algorithm=algorithm or Variable.get(ALGORITHM)
27+
key=Variable.get(PRIVATE_KEY),
28+
algorithm=Variable.get(ALGORITHM)
3929
).decode("utf-8")
4030
except Exception as err:
4131
logging.debug(f"Failed to sign data with JWT key. \n {err}")
@@ -45,27 +35,19 @@ def sign_with_jwt(data, private_key=None, algorithm=None):
4535
def post_progress(context, from_task=None):
4636
from_task = False if from_task is None else from_task
4737
try:
48-
http_hook, session, url = prepare_connection(CONN_ID, ROUTES["progress"])
4938
dag_run = context["dag_run"]
5039
len_tis = len(dag_run.get_task_instances())
5140
len_tis_success = len(dag_run.get_task_instances(state=State.SUCCESS)) + int(from_task)
5241
data = sign_with_jwt(
53-
data={
54-
"state": dag_run.state,
55-
"dag_id": dag_run.dag_id,
56-
"run_id": dag_run.run_id,
42+
{
43+
"state": dag_run.state,
44+
"dag_id": dag_run.dag_id,
45+
"run_id": dag_run.run_id,
5746
"progress": int(len_tis_success / len_tis * 100),
58-
"error": context["reason"] if dag_run.state == State.FAILED else ""
47+
"error": context["reason"] if dag_run.state == State.FAILED else ""
5948
}
6049
)
61-
prepped_request = session.prepare_request(
62-
requests.Request(
63-
"POST",
64-
url,
65-
json={"payload": data}
66-
)
67-
)
68-
http_hook.run_and_check(session, prepped_request, {})
50+
http_hook.run(endpoint=ROUTES["progress"], json={"payload": data})
6951
except Exception as err:
7052
logging.debug(f"Failed to POST progress updates. \n {err}")
7153

@@ -76,11 +58,10 @@ def post_results(context):
7658
isinstance(task, CWLJobGatherer) to find the proper task because of the
7759
endless import loop (file where we define CWLJobGatherer class import this
7860
file). If CWLDAG is contsructed with custom gatherer node, posting results
79-
might not work.
61+
might not work. We need to except missing results file as the same callback
62+
is used for clean_dag_run DAG
8063
"""
81-
8264
try:
83-
http_hook, session, url = prepare_connection(CONN_ID, ROUTES["results"])
8465
dag_run = context["dag_run"]
8566
results = {}
8667
try:
@@ -89,47 +70,31 @@ def post_results(context):
8970
results = json.load(input_stream)
9071
except Exception as err:
9172
logging.debug(f"Failed to read results. \n {err}")
92-
9373
data = sign_with_jwt(
94-
data={
95-
"dag_id": dag_run.dag_id,
96-
"run_id": dag_run.run_id,
74+
{
75+
"dag_id": dag_run.dag_id,
76+
"run_id": dag_run.run_id,
9777
"results": results
9878
}
9979
)
100-
prepped_request = session.prepare_request(
101-
requests.Request(
102-
"POST",
103-
url,
104-
json={"payload": data}
105-
)
106-
)
107-
http_hook.run_and_check(session, prepped_request, {})
80+
http_hook.run(endpoint=ROUTES["results"], json={"payload": data})
10881
except Exception as err:
10982
logging.debug(f"Failed to POST results. \n {err}")
11083

11184

11285
def post_status(context):
11386
try:
114-
http_hook, session, url = prepare_connection(CONN_ID, ROUTES["status"])
11587
dag_run = context["dag_run"]
11688
ti = context["ti"]
11789
data = sign_with_jwt(
118-
data={
119-
"state": ti.state,
120-
"dag_id": dag_run.dag_id,
121-
"run_id": dag_run.run_id,
122-
"task_id": ti.task_id
90+
{
91+
"state": ti.state,
92+
"dag_id": dag_run.dag_id,
93+
"run_id": dag_run.run_id,
94+
"task_id": ti.task_id
12395
}
12496
)
125-
prepped_request = session.prepare_request(
126-
requests.Request(
127-
"POST",
128-
url,
129-
json={"payload": data}
130-
)
131-
)
132-
http_hook.run_and_check(session, prepped_request, {})
97+
http_hook.run(endpoint=ROUTES["status"], json={"payload": data})
13398
except Exception as err:
13499
logging.debug(f"Failed to POST status updates. \n {err}")
135100

0 commit comments

Comments
 (0)