Skip to content

Commit 6def9a4

Browse files
authored
Merge pull request #115 from datopian/dag/data-dictionary
Added a DAG that reports resources with empty data dictionaries.
2 parents c13ea91 + 66bccff commit 6def9a4

File tree

2 files changed

+148
-1
lines changed

2 files changed

+148
-1
lines changed
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
import logging
2+
import datetime
3+
import time
4+
from pprint import pprint
5+
6+
from airflow import DAG
7+
from airflow.exceptions import AirflowException
8+
from airflow.models import Variable
9+
import requests
10+
from urllib.parse import urljoin
11+
from airflow.operators.python_operator import PythonOperator
12+
from airflow.utils.dates import days_ago
13+
from airflow.operators.email_operator import EmailOperator
14+
from sendgrid import SendGridAPIClient
15+
from sendgrid.helpers.mail import Mail, To
16+
from string import Template
17+
from time import sleep
18+
19+
# Deployment settings, read from Airflow Variables when this file is parsed.
CKAN_URL = Variable.get("NG_CKAN_URL")
SENDGRID_KEY = Variable.get("NG_SENDGRID_API_KEY")
MAIL_FROM = Variable.get("NG_MAIL_FROM")
# Comma-separated recipient list; it is split on "," when the email is built.
MAIL_TO = Variable.get("NG_MAIL_TO")

# Default arguments applied to every task of the DAG.
# NOTE(review): the start date is computed relative to "now" on every parse;
# confirm this interacts as intended with the weekly schedule below.
args = dict(start_date=days_ago(0))

# Cron expression "0 0 * * 0": 00:00 every Sunday.
dag = DAG(
    "ng_data_example_logs",
    schedule_interval="0 0 * * 0",
    default_args=args,
)
33+
34+
def get_all_datastore_tables():
    """Return the names of all tables listed in the CKAN DataStore metadata.

    Calls ``datastore_search`` on the ``_table_metadata`` resource of the
    CKAN instance at ``CKAN_URL``.

    Returns:
        list: The ``name`` of every record in the response. An empty list is
        returned when CKAN answers with an HTTP error status (the response
        body is logged), so callers can always iterate over the result.
    """
    logging.info("Retriving all datastore tables.")
    # NOTE(review): no ``limit`` is sent, so the server-side default page size
    # applies -- confirm whether pagination is needed for large sites.
    url = urljoin(CKAN_URL, "/api/3/action/datastore_search?resource_id=_table_metadata")
    try:
        response = requests.get(
            url,
            headers={"User-Agent": "ckan-others/latest (internal API call from airflow dag)"},
        )
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        # Previously the error body (a string) was returned here, which the
        # caller then iterated character by character as if it were a list
        # of resource ids.
        logging.error("Failed to retrieve datastore tables: %s", e.response.text)
        return []
    return [record["name"] for record in response.json()["result"]["records"]]
45+
46+
47+
def get_data_dictionary(res_id):
    """Fetch the field definitions of one DataStore resource.

    Calls ``datastore_search`` with ``limit=0`` so that no rows are
    requested and only the ``fields`` part of the result is used.

    Args:
        res_id: Identifier of the DataStore resource to inspect.

    Returns:
        The ``fields`` list from the CKAN response, or ``False`` when CKAN
        answers with an HTTP error status (the failure is logged).
    """
    logging.info("Retriving fields infomation for {res_id}".format(res_id=res_id))
    try:
        response = requests.get(
            urljoin(CKAN_URL, "/api/3/action/datastore_search"),
            # Let requests build the query string so the id is URL-encoded.
            params={"resource_id": res_id, "limit": 0},
            headers={"User-Agent": "ckan-others/latest (internal API call from airflow dag)"},
        )
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        logging.error(
            "Failed to get fields infomation for {res_id}: {error}".format(res_id=res_id, error=e)
        )
        return False
    return response.json()["result"]["fields"]
61+
62+
63+
def check_empty_data_dictionary(ds, **kwargs):
    """Collect the DataStore resources whose data dictionary is empty.

    A resource is reported when its field list could be fetched and none of
    the fields carries an ``"info"`` key.

    Args:
        ds: Unused; accepted because the task is run with
            ``provide_context=True``.
        **kwargs: Remaining task context; unused.

    Returns:
        list: Resource ids without any data dictionary entry. The task's
        return value is later pulled via XCom by ``dispatch_email``.
    """
    all_datastore_tables = get_all_datastore_tables()
    # The lookup may hand back something other than a list when it fails
    # (e.g. an error string or None); iterating that would produce bogus
    # per-character requests or a TypeError.
    if not isinstance(all_datastore_tables, list):
        logging.error("Could not retrieve the datastore tables; nothing to check.")
        return []

    logging.info(
        "Checking data dictionaries of {0} datastore tables.".format(len(all_datastore_tables))
    )
    report = []
    for res_id in all_datastore_tables:
        # Delay 500ms between requests.
        sleep(0.5)
        fields = get_data_dictionary(res_id)
        # Falsy ``fields`` means the lookup failed or returned nothing; skip it.
        if fields and not any("info" in field for field in fields):
            report.append(res_id)
    return report
76+
77+
78+
def HTML_report_generate(resource_list):
    """Render the HTML body of the empty-data-dictionary report.

    Args:
        resource_list: Iterable of resource ids; each one becomes a table
            row whose "Data Dictionary" column reads "Empty".

    Returns:
        str: A complete HTML document with one table row per resource.
    """
    rows = "".join(
        u"<tr><td>{0}</td><td>{1}</td></tr>".format(resource_id, "Empty")
        for resource_id in resource_list
    )

    page = Template("""\
<!DOCTYPE html>
<html>
<head>
<style>
table {
font-family: arial, sans-serif;
border-collapse: collapse;
width: 100%;
}
td, th {
border: 1px solid #dddddd;
text-align: left;
padding: 8px;
}
tr:nth-child(even) {
background-color: #dddddd;
}
</style>
</head>
<body>
<h2>List of the resource with empty data dictionaries.</h2>
<table>
<tr>
<th>Resource ID</th>
<th>Data Dictionary</th>
</tr>
$table_row
</table>
</body>
</html>
""")
    # safe_substitute leaves any unknown "$..." placeholder untouched
    # instead of raising.
    return page.safe_substitute(table_row=rows)
116+
117+
def dispatch_email(**context):
    """Email the empty-data-dictionary report through SendGrid.

    Pulls the list returned by the ``check_empty_data_dictionary`` task from
    XCom and, when it is non-empty, sends it as an HTML table to every
    address in the comma-separated ``MAIL_TO`` setting.

    Args:
        **context: Airflow task context; only ``task_instance`` is used.
    """
    resource_list = context["task_instance"].xcom_pull(task_ids="check_empty_data_dictionary")
    if not resource_list:
        logging.info("No empty data dictionaries to report; no email sent.")
        return

    # Tolerate "a@x.org, b@x.org" and trailing commas in the setting.
    recipients = [
        To(address.strip()) for address in MAIL_TO.split(",") if address.strip()
    ]
    message = Mail(
        from_email=MAIL_FROM,
        to_emails=recipients,
        subject="Empty data dictionary report.",
        html_content=HTML_report_generate(resource_list),
    )
    try:
        sg = SendGridAPIClient(SENDGRID_KEY)
        response = sg.send(message)
    except Exception:
        # Exceptions have no ``.message`` attribute on Python 3; log the
        # exception with its traceback instead.
        # NOTE(review): the failure is still swallowed, so the task is
        # marked successful even when no email went out -- confirm intent.
        logging.exception("Failed to send the empty data dictionary report.")
    else:
        logging.info("{0} - Report successfully sent via email.".format(response.status_code))
131+
132+
133+
# Step 1: scan the DataStore and return the resources without a data dictionary.
check_empty_data_dictionary_task = PythonOperator(
    dag=dag,
    task_id="check_empty_data_dictionary",
    python_callable=check_empty_data_dictionary,
    provide_context=True,
)

# Step 2: email the list produced by step 1 (read back through XCom).
dispatch_email_task = PythonOperator(
    dag=dag,
    task_id="dispatch_email",
    python_callable=dispatch_email,
    provide_context=True,
)

# The email task runs only after the check task.
check_empty_data_dictionary_task.set_downstream(dispatch_email_task)

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,6 @@ werkzeug==0.16.1 # via apache-airflow, flask, flask-caching, flask-jwt-
103103
wtforms==2.3.1 # via flask-admin, flask-wtf
104104
zipp==3.1.0 # via importlib-metadata
105105
zope.deprecation==4.4.0 # via apache-airflow
106-
106+
sendgrid==6.9.4 # via sendgrid
107107
# The following packages are considered to be unsafe in a requirements file:
108108
# setuptools

0 commit comments

Comments
 (0)