77 changes: 77 additions & 0 deletions aircan/dags/api_ckan_import_to_bq.bkp.py
@@ -0,0 +1,77 @@

import logging
import time
import json
import ast
from datetime import date, datetime

# Local imports
from aircan.dependencies.google_cloud.bigquery_handler import bq_import_csv

# Third-party library imports
from airflow import DAG
from airflow.exceptions import AirflowException

from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


args = {
'start_date': days_ago(0),
'params': {
"resource": {
"path": "path/to/my.csv",
"format": "CSV",
"ckan_resource_id": "res-id-123",
"schema": {
"fields": "['field1', 'field2']"
}
},
"ckan_config": {
"api_key": "API_KEY",
"site_url": "URL",
},
"big_query": {
"bq_project_id": "bigquery_project_id",
"bq_dataset_id": "bigquery_dataset_id"
},
"output_bucket": str(date.today())
}
}

dag = DAG(
dag_id='ckan_api_import_to_bq',
default_args=args,
schedule_interval=None
)

def task_import_resource_to_bq(**context):
logging.info('Invoking import resource to bigquery')
logging.info("resource: {}".format(context['params'].get('resource', {})))

gc_file_url = context['params'].get('big_query', {}).get('gcs_uri')
bq_project_id = context['params'].get('big_query', {}).get('bq_project_id')
bq_dataset_id = context['params'].get('big_query', {}).get('bq_dataset_id')
bq_table_name = context['params'].get('big_query', {}).get('bq_table_name')
logging.info("bq_table_name: {}".format(bq_table_name))

raw_schema = context['params'].get('resource', {}).get('schema')
eval_schema = json.loads(raw_schema)
eval_schema = ast.literal_eval(eval_schema)

🛠️ Refactor suggestion

Potential schema parsing bug.

The code calls ast.literal_eval() unconditionally on the result of json.loads(), which raises a ValueError when the schema has already been decoded to a dict, since ast.literal_eval() only accepts a string or AST node. The main version in api_ckan_import_to_bq.py handles this more robustly with an isinstance check.

Consider using the enhanced logic from the main DAG file:

 eval_schema = json.loads(raw_schema)
-eval_schema = ast.literal_eval(eval_schema)
+if isinstance(eval_schema, str):
+    eval_schema = ast.literal_eval(eval_schema)
🤖 Prompt for AI Agents
In aircan/dags/api_ckan_import_to_bq.bkp.py at line 61, the code calls
ast.literal_eval on eval_schema without checking its type, which can cause
errors if eval_schema is already a dictionary. To fix this, add an isinstance
check before calling ast.literal_eval: only parse eval_schema if it is a string,
otherwise leave it as is. This will make the schema parsing more robust and
prevent potential errors.
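
For illustration, a minimal standalone sketch of the guarded double-decoding the suggestion describes; the helper name parse_schema_fields is hypothetical and not part of the PR:

import ast
import json

def parse_schema_fields(raw_schema):
    # The schema may arrive JSON-encoded; json.loads returns a str when the
    # payload was double-encoded, and only then is ast.literal_eval needed.
    eval_schema = json.loads(raw_schema)
    if isinstance(eval_schema, str):
        eval_schema = ast.literal_eval(eval_schema)
    return eval_schema.get('fields')

# Both encodings decode to the same field list:
assert parse_schema_fields('{"fields": ["field1", "field2"]}') == ['field1', 'field2']
assert parse_schema_fields('"{\'fields\': [\'field1\', \'field2\']}"') == ['field1', 'field2']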

schema = eval_schema.get('fields')
logging.info("SCHEMA: {}".format(schema))

# sample bq_table_id: "bigquerytest-271707.nhs_test.dag_test"
bq_table_id = '%s.%s.%s' % (bq_project_id, bq_dataset_id, bq_table_name)
logging.info('Importing %s to BQ %s' % (gc_file_url, bq_table_id))
ckan_conf = context['params'].get('ckan_config', {})
ckan_conf['resource_id'] = context['params'].get('resource', {}).get('ckan_resource_id')
bq_import_csv(bq_table_id, gc_file_url, schema, ckan_conf)

import_resource_to_bq_task = PythonOperator(
task_id='import_resource_to_bq',
provide_context=True,
python_callable=task_import_resource_to_bq,
dag=dag,
)
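
For context, a sketch of a trigger-time conf matching the sample params above. Every value is a placeholder, and it assumes the deployment lets dag_run conf override params, as this project's API-triggered DAGs rely on. The double-encoded schema is what this backup DAG's unconditional ast.literal_eval call requires (see the comment above):

import json

conf = {
    "resource": {
        "ckan_resource_id": "res-id-123",
        # Double-encoded on purpose: json.loads yields a string that
        # ast.literal_eval then parses into a dict.
        "schema": json.dumps(str({"fields": [{"name": "field1", "type": "string"}]})),
    },
    "ckan_config": {"api_key": "API_KEY", "site_url": "https://ckan.example.org"},
    "big_query": {
        "gcs_uri": "gs://example-bucket/my.csv",
        "bq_project_id": "example-project",
        "bq_dataset_id": "example_dataset",
        "bq_table_name": "example_table",
    },
}
# e.g. airflow trigger_dag ckan_api_import_to_bq --conf "$CONF_JSON"
print(json.dumps(conf))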
28 changes: 22 additions & 6 deletions aircan/dags/api_ckan_import_to_bq.py
@@ -6,7 +6,8 @@
from datetime import date, datetime

# Local imports
from aircan.dependencies.google_cloud.bigquery_handler import bq_import_csv
from aircan.dependencies.google_cloud.bigquery_handler_v2 import bq_import_csv
from aircan.dependencies.utils import aircan_status_update_nhs as aircan_status_update

# Third-party library imports
from airflow import DAG
@@ -15,7 +16,7 @@
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

import traceback

args = {
'start_date': days_ago(0),
@@ -41,12 +42,14 @@
}

dag = DAG(
dag_id='ckan_api_import_to_bq',
dag_id='ckan_api_import_to_bq_v2',
default_args=args,
schedule_interval=None
)

def task_import_resource_to_bq(**context):
ckan_api_key = context['params'].get('ckan_config', {}).get('api_key')
ckan_site_url = context['params'].get('ckan_config', {}).get('site_url')
logging.info('Invoking import resource to bigquery')
logging.info("resource: {}".format(context['params'].get('resource', {})))

@@ -58,7 +61,8 @@ def task_import_resource_to_bq(**context):

raw_schema = context['params'].get('resource', {}).get('schema')
eval_schema = json.loads(raw_schema)
eval_schema = ast.literal_eval(eval_schema)
if isinstance(eval_schema, str):
eval_schema = ast.literal_eval(eval_schema)
schema = eval_schema.get('fields')
logging.info("SCHEMA: {}".format(schema))

@@ -67,11 +71,23 @@
logging.info('Importing %s to BQ %s' % (gc_file_url, bq_table_id))
ckan_conf = context['params'].get('ckan_config', {})
ckan_conf['resource_id'] = context['params'].get('resource', {}).get('ckan_resource_id')
dag_run_id = context['run_id']
res_id = ckan_conf.get('resource_id')
ckan_conf['dag_run_id'] = dag_run_id
bq_import_csv(bq_table_id, gc_file_url, schema, ckan_conf)
status_dict = {
'dag_run_id': dag_run_id,
'resource_id': res_id,
'state': 'complete',
'message': 'Data ingestion completed successfully for "{res_id}".'.format(
res_id=res_id),
'clear_logs': True
}
aircan_status_update(ckan_site_url, ckan_api_key, status_dict)

import_resource_to_bq_task = PythonOperator(
task_id='import_resource_to_bq',
task_id='import_resource_to_bq_v2',
provide_context=True,
python_callable=task_import_resource_to_bq,
dag=dag,
)
)
93 changes: 93 additions & 0 deletions aircan/dags/api_ckan_import_to_bq_v2.py
@@ -0,0 +1,93 @@

import logging
import time
import json
import ast
from datetime import date, datetime

# Local imports
from aircan.dependencies.google_cloud.bigquery_handler_v2 import bq_import_csv
from aircan.dependencies.utils import aircan_status_update_nhs as aircan_status_update

# Third-party library imports
from airflow import DAG
from airflow.exceptions import AirflowException

from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
import traceback

args = {
'start_date': days_ago(0),
'params': {
"resource": {
"path": "path/to/my.csv",
"format": "CSV",
"ckan_resource_id": "res-id-123",
"schema": {
"fields": "['field1', 'field2']"
}
},
"ckan_config": {
"api_key": "API_KEY",
"site_url": "URL",
},
"big_query": {
"bq_project_id": "bigquery_project_id",
"bq_dataset_id": "bigquery_dataset_id"
},
"output_bucket": str(date.today())
}
}

dag = DAG(
dag_id='ckan_api_import_to_bq_v2',
default_args=args,
schedule_interval=None
)

def task_import_resource_to_bq(**context):
ckan_api_key = context['params'].get('ckan_config', {}).get('api_key')
ckan_site_url = context['params'].get('ckan_config', {}).get('site_url')
logging.info('Invoking import resource to bigquery')
logging.info("resource: {}".format(context['params'].get('resource', {})))

gc_file_url = context['params'].get('big_query', {}).get('gcs_uri')
bq_project_id = context['params'].get('big_query', {}).get('bq_project_id')
bq_dataset_id = context['params'].get('big_query', {}).get('bq_dataset_id')
bq_table_name = context['params'].get('big_query', {}).get('bq_table_name')
logging.info("bq_table_name: {}".format(bq_table_name))

raw_schema = context['params'].get('resource', {}).get('schema')
eval_schema = json.loads(raw_schema)
if isinstance(eval_schema, str):
eval_schema = ast.literal_eval(eval_schema)
schema = eval_schema.get('fields')
logging.info("SCHEMA: {}".format(schema))

# sample bq_table_id: "bigquerytest-271707.nhs_test.dag_test"
bq_table_id = '%s.%s.%s' % (bq_project_id, bq_dataset_id, bq_table_name)
logging.info('Importing %s to BQ %s' % (gc_file_url, bq_table_id))
ckan_conf = context['params'].get('ckan_config', {})
ckan_conf['resource_id'] = context['params'].get('resource', {}).get('ckan_resource_id')
dag_run_id = context['run_id']
res_id = ckan_conf.get('resource_id')
ckan_conf['dag_run_id'] = dag_run_id
bq_import_csv(bq_table_id, gc_file_url, schema, ckan_conf)
status_dict = {
'dag_run_id': dag_run_id,
'resource_id': res_id,
'state': 'complete',
'message': 'Data ingestion completed successfully for "{res_id}".'.format(
res_id=res_id),
'clear_logs': True
}
aircan_status_update(ckan_site_url, ckan_api_key, status_dict)

import_resource_to_bq_task = PythonOperator(
task_id='import_resource_to_bq_v2',
provide_context=True,
python_callable=task_import_resource_to_bq,
dag=dag,
)
152 changes: 152 additions & 0 deletions aircan/dependencies/google_cloud/bigquery_handler_v2.py
@@ -0,0 +1,152 @@
from google.cloud import bigquery
import google.api_core.exceptions
from aircan.dependencies.utils import AirflowCKANException, aircan_status_update_nhs as aircan_status_update
import json
import logging

def replace_all(dict, string):
for key in dict:
string = string.replace(key, dict[key])
return string

def bq_import_csv(table_id, gcs_path, table_schema, ckan_conf):
try:
client = bigquery.Client()

try:
job_config = bigquery.LoadJobConfig()

schema = bq_schema_from_table_schema(table_schema)
job_config.schema = schema

job_config.skip_leading_rows = 1
job_config.source_format = bigquery.SourceFormat.CSV
# overwrite a Table
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
load_job = client.load_table_from_uri(
gcs_path, table_id, job_config=job_config
)

load_job.result() # Waits for table load to complete.
destination_table = client.get_table(table_id)
except Exception as e:
logging.info(e)
# Use a list to build the string components efficiently.
error_lines = []
error_lines.append(
"BigQuery Load Job Failed with a BadRequest."
)
error_lines.append(f"Original API message: {e}")

# The key part: Iterate through the e.errors list and append to our list.
if load_job.errors:
error_lines.append("\n--- Detailed Error Breakdown ---")
logging.info(load_job.errors)
for i, error in enumerate(load_job.errors):
# Format each error dictionary into a readable line.
line = (
f"Error {i+1}: "
f"Reason: {error.get('reason', 'N/A')}, "
f"Location: {error.get('location', 'N/A')}, "
f"Message: {error.get('message', 'N/A')}"
)
error_lines.append(line)
else:
error_lines.append("No detailed errors were provided in the exception.")

# Join the list of lines into a single string with newlines.
error_report_string = "\n".join(error_lines)
logging.info(error_report_string)
status_dict = {
'res_id': ckan_conf.get('resource_id'),
'state': 'failed',
'message': error_report_string,
'dag_run_id': ckan_conf.get('dag_run_id')
}
aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
raise AirflowCKANException('Data ingestion has failed.', str(e))

🛠️ Refactor suggestion

Use proper exception chaining.

When re-raising exceptions within an except block, use raise ... from err to preserve the original exception context.

-            raise AirflowCKANException('Data ingestion has failed.', str(e))
+            raise AirflowCKANException('Data ingestion has failed.', str(e)) from e
🧰 Tools
🪛 Ruff (0.12.2)

67-67: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🤖 Prompt for AI Agents
In aircan/dependencies/google_cloud/bigquery_handler_v2.py at line 67, the
exception is re-raised without proper chaining, losing the original error
context. Modify the raise statement to use "raise AirflowCKANException('Data
ingestion has failed.', str(e)) from e" to preserve the original exception
context.
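
A self-contained sketch of what the from e form preserves; AirflowCKANException is stubbed here for illustration:

class AirflowCKANException(Exception):
    pass  # stand-in for aircan.dependencies.utils.AirflowCKANException

try:
    try:
        raise ValueError("CSV column count mismatch")
    except Exception as e:
        # 'from e' sets __cause__, so tracebacks print "The above exception
        # was the direct cause of the following exception" instead of the
        # misleading "During handling of the above exception, another
        # exception occurred".
        raise AirflowCKANException('Data ingestion has failed.', str(e)) from e
except AirflowCKANException as exc:
    assert isinstance(exc.__cause__, ValueError)  # original context kept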

#status_dict = {
# 'res_id': ckan_conf.get('resource_id'),
# 'state': 'progress',
# 'message': 'Data ingestion using provided schema failed, trying to autodetect schema.',
# 'dag_run_id': ckan_conf.get('dag_run_id')
#}
#aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
#job_config = bigquery.LoadJobConfig()
#job_config.autodetect = True

#job_config.skip_leading_rows = 1
#job_config.source_format = bigquery.SourceFormat.CSV
## overwrite a Table
#job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
#load_job = client.load_table_from_uri(
# gcs_path, table_id, job_config=job_config
#)
#load_job.result() # Waits for table load to complete.
#destination_table = client.get_table(table_id)
status_dict = {
'res_id': ckan_conf.get('resource_id'),
'state': 'progress',
'message': 'Data ingestion is in progress.',
'dag_run_id': ckan_conf.get('dag_run_id')
}
aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
if destination_table:
status_dict = {
'res_id': ckan_conf.get('resource_id'),
'state': 'complete',
'message': "Ingession Completed",
'dag_run_id': ckan_conf.get('dag_run_id')
}
aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
return {'success': True, 'message': 'BigQuery Table created successfully.'}
except Exception as e:
replacers = {
'gs://dx-nhs-staging-giftless/': '',
'gs://dx-nhs-production-giftless/': '',
'gs://dx-nhs-prod-giftless/': '',
'https://bigquery.googleapis.com/bigquery/v2/projects/datopian-dx/jobs?prettyPrint=false': '',
'datopian-dx': '',
'bigquery': '',
'googleapi': '',
'google': ''

}
Comment on lines +104 to +114

🛠️ Refactor suggestion

Make error sanitization configurable.

The hardcoded bucket names and project IDs make this code environment-specific and brittle. Consider sourcing the replacer map from configuration instead.

-        replacers = {
-            'gs://dx-nhs-staging-giftless/': '',
-            'gs://dx-nhs-production-giftless/': '',
-            'gs://dx-nhs-prod-giftless/': '',
-            'https://bigquery.googleapis.com/bigquery/v2/projects/datopian-dx/jobs?prettyPrint=false': '',
-            'datopian-dx': '',
-            'bigquery': '',
-            'googleapi': '',
-            'google': ''
-
-        }
+        # Configurable sanitization - could be passed as parameter or from config
+        replacers = ckan_conf.get('error_sanitization', {
+            'gs://dx-nhs-staging-giftless/': '',
+            'gs://dx-nhs-production-giftless/': '',
+            'gs://dx-nhs-prod-giftless/': '',
+            'https://bigquery.googleapis.com/bigquery/v2/projects/datopian-dx/jobs?prettyPrint=false': '',
+            'datopian-dx': '',
+            'bigquery': '',
+            'googleapi': '',
+            'google': ''
+        })

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In aircan/dependencies/google_cloud/bigquery_handler_v2.py around lines 48 to
58, the error sanitization uses hardcoded bucket names and project IDs, making
it environment-specific and brittle. Refactor the code to accept these replacer
values from a configuration source such as environment variables or a config
file, allowing different environments to specify their own values without
changing the code.
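
One possible shape for that refactor, sketched under the assumption that a JSON map is supplied through an environment variable; AIRCAN_ERROR_REPLACERS is a made-up name, not an existing setting:

import json
import os

def load_replacers(default=None):
    # Each deployment masks its own bucket names and project IDs instead of
    # hardcoding them in the handler.
    raw = os.environ.get('AIRCAN_ERROR_REPLACERS')
    return json.loads(raw) if raw else (default or {})

def replace_all(replacers, text):
    for key, value in replacers.items():
        text = text.replace(key, value)
    return text

# e.g. export AIRCAN_ERROR_REPLACERS='{"gs://my-bucket/": "", "my-project": ""}'
sanitized = replace_all(load_replacers(), 'load of gs://my-bucket/data.csv failed')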

e = replace_all(replacers,str(e))
logging.info(e)
status_dict = {
'res_id': ckan_conf.get('resource_id'),
'dag_run_id': ckan_conf.get('dag_run_id'),
'state': 'failed',
'message': str(e)
}
aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
raise AirflowCKANException('Data ingestion has failed.', str(e))

🛠️ Refactor suggestion

Use proper exception chaining.

When re-raising exceptions within an except block, use raise ... from err to preserve the original exception context.

-        raise AirflowCKANException('Data ingestion has failed.', str(e))
+        raise AirflowCKANException('Data ingestion has failed.', str(e)) from e
🧰 Tools
🪛 Ruff (0.12.2)

124-124: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🤖 Prompt for AI Agents
In aircan/dependencies/google_cloud/bigquery_handler_v2.py at line 124, the
exception is re-raised without proper chaining, losing the original error
context. Modify the raise statement to use "raise AirflowCKANException('Data
ingestion has failed.', str(e)) from e" to preserve the original exception
context for better debugging.



def bq_schema_from_table_schema(table_schema):
mapping = {
'string': 'STRING',
'number': 'NUMERIC',
'integer': 'NUMERIC',
'boolean': 'BOOLEAN',
'object': 'STRING',
'array': 'STRING',
'date': 'DATE',
'time': 'TIME',
'datetime': 'DATETIME',
'year': 'NUMERIC',
'yearmonth': 'STRING',
'duration': 'DATETIME',
'geopoint': 'GEOGRAPHY',
'geojson': 'STRING',
'any': 'STRING'
}

def _convert(field):
# TODO: support for e.g. required
return bigquery.SchemaField(field['name'],
mapping.get(field['type'], field['type']),
'NULLABLE'
)
return [_convert(field) for field in table_schema]
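
A quick usage sketch for the mapping above; the two fields are hypothetical, and this assumes it runs alongside bq_schema_from_table_schema as defined in this module:

table_schema = [
    {'name': 'nhs_number', 'type': 'string'},
    {'name': 'admission_date', 'type': 'date'},
]
fields = bq_schema_from_table_schema(table_schema)
# Unknown types pass through unchanged, and every field is NULLABLE.
assert (fields[0].name, fields[0].field_type, fields[0].mode) == ('nhs_number', 'STRING', 'NULLABLE')
assert fields[1].field_type == 'DATE'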