From 87a6a6002562a0c3b0521015d1593a9a76a1929b Mon Sep 17 00:00:00 2001 From: Luccas Mateus Date: Wed, 2 Jul 2025 14:36:49 -0300 Subject: [PATCH 1/7] NHS Import to BQ --- aircan/dags/api_ckan_import_to_bq_v2.py | 76 +++++++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 aircan/dags/api_ckan_import_to_bq_v2.py diff --git a/aircan/dags/api_ckan_import_to_bq_v2.py b/aircan/dags/api_ckan_import_to_bq_v2.py new file mode 100644 index 0000000..50802de --- /dev/null +++ b/aircan/dags/api_ckan_import_to_bq_v2.py @@ -0,0 +1,76 @@ + +import logging +import time +import json +import ast +from datetime import date, datetime + +# Local imports +from aircan.dependencies.google_cloud.bigquery_handler import bq_import_csv + +# Third-party library imports +from airflow import DAG +from airflow.exceptions import AirflowException + +from airflow.models import Variable +from airflow.operators.python_operator import PythonOperator +from airflow.utils.dates import days_ago + + +args = { + 'start_date': days_ago(0), + 'params': { + "resource": { + "path": "path/to/my.csv", + "format": "CSV", + "ckan_resource_id": "res-id-123", + "schema": { + "fields": "['field1', 'field2']" + } + }, + "ckan_config": { + "api_key": "API_KEY", + "site_url": "URL", + }, + "big_query": { + "bq_project_id": "bigquery_project_id", + "bq_dataset_id": "bigquery_dataset_id" + }, + "output_bucket": str(date.today()) + } +} + +dag = DAG( + dag_id='ckan_api_import_to_bq_v2', + default_args=args, + schedule_interval=None +) + +def task_import_resource_to_bq(**context): + logging.info('Invoking import resource to bigquery') + logging.info("resource: {}".format(context['params'].get('resource', {}))) + + gc_file_url = context['params'].get('big_query', {}).get('gcs_uri') + bq_project_id = context['params'].get('big_query', {}).get('bq_project_id') + bq_dataset_id = context['params'].get('big_query', {}).get('bq_dataset_id') + bq_table_name = context['params'].get('big_query', {}).get('bq_table_name') + logging.info("bq_table_name: {}".format(bq_table_name)) + + raw_schema = context['params'].get('resource', {}).get('schema') + eval_schema = json.loads(raw_schema) + schema = eval_schema.get('fields') + logging.info("SCHEMA: {}".format(schema)) + + # sample bq_table_id: "bigquerytest-271707.nhs_test.dag_test" + bq_table_id = '%s.%s.%s' % (bq_project_id, bq_dataset_id, bq_table_name) + logging.info('Importing %s to BQ %s' % (gc_file_url, bq_table_id)) + ckan_conf = context['params'].get('ckan_config', {}) + ckan_conf['resource_id'] = context['params'].get('resource', {}).get('ckan_resource_id') + bq_import_csv(bq_table_id, gc_file_url, schema, ckan_conf) + +import_resource_to_bq_task = PythonOperator( + task_id='import_resource_to_bq', + provide_context=True, + python_callable=task_import_resource_to_bq, + dag=dag, +) From b0859e9aacf764811f5d26666caac08bc21b4474 Mon Sep 17 00:00:00 2001 From: Luccas Mateus Date: Wed, 2 Jul 2025 14:38:07 -0300 Subject: [PATCH 2/7] New task as well --- aircan/dags/api_ckan_import_to_bq_v2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aircan/dags/api_ckan_import_to_bq_v2.py b/aircan/dags/api_ckan_import_to_bq_v2.py index 50802de..1f040fd 100644 --- a/aircan/dags/api_ckan_import_to_bq_v2.py +++ b/aircan/dags/api_ckan_import_to_bq_v2.py @@ -69,7 +69,7 @@ def task_import_resource_to_bq(**context): bq_import_csv(bq_table_id, gc_file_url, schema, ckan_conf) import_resource_to_bq_task = PythonOperator( - task_id='import_resource_to_bq', + task_id='import_resource_to_bq_v2', provide_context=True, python_callable=task_import_resource_to_bq, dag=dag, From 832667c18267a2ddcc4671fb0eb725ba1e9a8af9 Mon Sep 17 00:00:00 2001 From: Luccas Mateus Date: Wed, 2 Jul 2025 14:56:25 -0300 Subject: [PATCH 3/7] Add safeguard --- aircan/dags/api_ckan_import_to_bq_v2.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/aircan/dags/api_ckan_import_to_bq_v2.py b/aircan/dags/api_ckan_import_to_bq_v2.py index 1f040fd..eec779e 100644 --- a/aircan/dags/api_ckan_import_to_bq_v2.py +++ b/aircan/dags/api_ckan_import_to_bq_v2.py @@ -58,6 +58,8 @@ def task_import_resource_to_bq(**context): raw_schema = context['params'].get('resource', {}).get('schema') eval_schema = json.loads(raw_schema) + if isinstance(eval_schema, str): + eval_schema = ast.literal_eval(eval_schema) schema = eval_schema.get('fields') logging.info("SCHEMA: {}".format(schema)) From 4358c47b34965a7c399b12885343f48ec7ab6362 Mon Sep 17 00:00:00 2001 From: Luccas Mateus Date: Thu, 3 Jul 2025 10:33:44 -0300 Subject: [PATCH 4/7] v2 schema detection --- aircan/dags/api_ckan_import_to_bq_v2.py | 2 +- .../google_cloud/bigquery_handler_v2.py | 95 +++++++++++++++++++ 2 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 aircan/dependencies/google_cloud/bigquery_handler_v2.py diff --git a/aircan/dags/api_ckan_import_to_bq_v2.py b/aircan/dags/api_ckan_import_to_bq_v2.py index eec779e..2652a7e 100644 --- a/aircan/dags/api_ckan_import_to_bq_v2.py +++ b/aircan/dags/api_ckan_import_to_bq_v2.py @@ -6,7 +6,7 @@ from datetime import date, datetime # Local imports -from aircan.dependencies.google_cloud.bigquery_handler import bq_import_csv +from aircan.dependencies.google_cloud.bigquery_handler_v2 import bq_import_csv # Third-party library imports from airflow import DAG diff --git a/aircan/dependencies/google_cloud/bigquery_handler_v2.py b/aircan/dependencies/google_cloud/bigquery_handler_v2.py new file mode 100644 index 0000000..5fab896 --- /dev/null +++ b/aircan/dependencies/google_cloud/bigquery_handler_v2.py @@ -0,0 +1,95 @@ +from google.cloud import bigquery +import google.api_core.exceptions +from aircan.dependencies.utils import AirflowCKANException, aircan_status_update +import json +import logging + +def replace_all(dict, string): + for key in dict: + string = string.replace(key, dict[key]) + return string + +def bq_import_csv(table_id, gcs_path, table_schema, ckan_conf): + try: + client = bigquery.Client() + + job_config = bigquery.LoadJobConfig() + + schema = bq_schema_from_table_schema(table_schema) + job_config.schema = schema + + job_config.skip_leading_rows = 1 + job_config.source_format = bigquery.SourceFormat.CSV + # overwrite a Table + job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE + # set 'True' for schema autodetect but turning it off since we define schema in explicitly when publishing data using datapub + # job_config.autodetect = True + load_job = client.load_table_from_uri( + gcs_path, table_id, job_config=job_config + ) + + load_job.result() # Waits for table load to complete. + destination_table = client.get_table(table_id) + status_dict = { + 'res_id': ckan_conf.get('resource_id'), + 'state': 'progress', + 'message': 'Data ingestion is in progress.' + } + aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict) + if destination_table: + status_dict = { + 'res_id': ckan_conf.get('resource_id'), + 'state': 'complete', + 'message': "Ingession Completed" + } + aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict) + return {'success': True, 'message': 'BigQuery Table created successfully.'} + except Exception as e: + replacers = { + 'gs://dx-nhs-staging-giftless/': '', + 'gs://dx-nhs-production-giftless/': '', + 'gs://dx-nhs-prod-giftless/': '', + 'https://bigquery.googleapis.com/bigquery/v2/projects/datopian-dx/jobs?prettyPrint=false': '', + 'datopian-dx': '', + 'bigquery': '', + 'googleapi': '', + 'google': '' + + } + e = replace_all(replacers,str(e)) + logging.info(e) + status_dict = { + 'res_id': ckan_conf.get('resource_id'), + 'state': 'failed', + 'message': str(e) + } + aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict) + raise AirflowCKANException('Data ingestion has failed.', str(e)) + + +def bq_schema_from_table_schema(table_schema): + mapping = { + 'string': 'STRING', + 'number': 'NUMERIC', + 'integer': 'NUMERIC', + 'boolean': 'BOOLEAN', + 'object': 'STRING', + 'array': 'STRING', + 'date': 'DATE', + 'time': 'TIME', + 'datetime': 'DATETIME', + 'year': 'NUMERIC', + 'yearmonth': 'STRING', + 'duration': 'DATETIME', + 'geopoint': 'GEOPOINT', + 'geojson': 'STRING', + 'any': 'STRING' + } + + def _convert(field): + # Â TODO: support for e.g. required + return bigquery.SchemaField(field['name'], + mapping.get(field['type'], field['type']), + 'NULLABLE' + ) + return [_convert(field) for field in table_schema] From 58bfb3019efa5387d24a499855097c04f0d7fd99 Mon Sep 17 00:00:00 2001 From: Luccas Mateus Date: Tue, 15 Jul 2025 14:07:51 -0300 Subject: [PATCH 5/7] Add logging --- aircan/dags/api_ckan_import_to_bq_v2.py | 27 ++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/aircan/dags/api_ckan_import_to_bq_v2.py b/aircan/dags/api_ckan_import_to_bq_v2.py index 2652a7e..429deb2 100644 --- a/aircan/dags/api_ckan_import_to_bq_v2.py +++ b/aircan/dags/api_ckan_import_to_bq_v2.py @@ -7,6 +7,7 @@ # Local imports from aircan.dependencies.google_cloud.bigquery_handler_v2 import bq_import_csv +from aircan.dependencies.utils import aircan_status_update # Third-party library imports from airflow import DAG @@ -47,6 +48,8 @@ ) def task_import_resource_to_bq(**context): + ckan_api_key = context['params'].get('ckan_config', {}).get('api_key') + ckan_site_url = context['params'].get('ckan_config', {}).get('site_url') logging.info('Invoking import resource to bigquery') logging.info("resource: {}".format(context['params'].get('resource', {}))) @@ -68,7 +71,29 @@ def task_import_resource_to_bq(**context): logging.info('Importing %s to BQ %s' % (gc_file_url, bq_table_id)) ckan_conf = context['params'].get('ckan_config', {}) ckan_conf['resource_id'] = context['params'].get('resource', {}).get('ckan_resource_id') - bq_import_csv(bq_table_id, gc_file_url, schema, ckan_conf) + dag_run_id = context['dag_run'].run_id + res_id = ckan_conf.get('resource_id') + try: + bq_import_csv(bq_table_id, gc_file_url, schema, ckan_conf) + status_dict = { + 'dag_run_id': dag_run_id, + 'resource_id': res_id, + 'state': 'complete', + 'message': 'Data ingestion completed successfully for "{res_id}".'.format( + res_id=res_id), + 'clear_logs': True + } + aircan_status_update(ckan_site_url, ckan_api_key, status_dict) + except Exception as e: + status_dict = { + 'dag_run_id': dag_run_id, + 'resource_id': res_id, + 'state': 'failed', + 'message': str(e), + 'clear_logs': True + } + aircan_status_update(ckan_site_url, ckan_api_key, status_dict) + raise Exception(str(e)) import_resource_to_bq_task = PythonOperator( task_id='import_resource_to_bq_v2', From 3a2c8486e82ca532988ae7116044045415712ca2 Mon Sep 17 00:00:00 2001 From: Luccas Mateus Date: Wed, 16 Jul 2025 09:27:22 -0300 Subject: [PATCH 6/7] Infer from schema on error --- aircan/dags/api_ckan_import_to_bq_v2.py | 38 +++++--------- .../google_cloud/bigquery_handler_v2.py | 52 +++++++++++++------ aircan/dependencies/utils.py | 38 +++++++++++++- 3 files changed, 86 insertions(+), 42 deletions(-) diff --git a/aircan/dags/api_ckan_import_to_bq_v2.py b/aircan/dags/api_ckan_import_to_bq_v2.py index 429deb2..d38166d 100644 --- a/aircan/dags/api_ckan_import_to_bq_v2.py +++ b/aircan/dags/api_ckan_import_to_bq_v2.py @@ -7,7 +7,7 @@ # Local imports from aircan.dependencies.google_cloud.bigquery_handler_v2 import bq_import_csv -from aircan.dependencies.utils import aircan_status_update +from aircan.dependencies.utils import aircan_status_update_nhs as aircan_status_update # Third-party library imports from airflow import DAG @@ -16,7 +16,7 @@ from airflow.models import Variable from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago - +import traceback args = { 'start_date': days_ago(0), @@ -71,29 +71,19 @@ def task_import_resource_to_bq(**context): logging.info('Importing %s to BQ %s' % (gc_file_url, bq_table_id)) ckan_conf = context['params'].get('ckan_config', {}) ckan_conf['resource_id'] = context['params'].get('resource', {}).get('ckan_resource_id') - dag_run_id = context['dag_run'].run_id + dag_run_id = context['run_id'] res_id = ckan_conf.get('resource_id') - try: - bq_import_csv(bq_table_id, gc_file_url, schema, ckan_conf) - status_dict = { - 'dag_run_id': dag_run_id, - 'resource_id': res_id, - 'state': 'complete', - 'message': 'Data ingestion completed successfully for "{res_id}".'.format( - res_id=res_id), - 'clear_logs': True - } - aircan_status_update(ckan_site_url, ckan_api_key, status_dict) - except Exception as e: - status_dict = { - 'dag_run_id': dag_run_id, - 'resource_id': res_id, - 'state': 'failed', - 'message': str(e), - 'clear_logs': True - } - aircan_status_update(ckan_site_url, ckan_api_key, status_dict) - raise Exception(str(e)) + ckan_conf['dag_run_id'] = dag_run_id + bq_import_csv(bq_table_id, gc_file_url, schema, ckan_conf) + status_dict = { + 'dag_run_id': dag_run_id, + 'resource_id': res_id, + 'state': 'complete', + 'message': 'Data ingestion completed successfully for "{res_id}".'.format( + res_id=res_id), + 'clear_logs': True + } + aircan_status_update(ckan_site_url, ckan_api_key, status_dict) import_resource_to_bq_task = PythonOperator( task_id='import_resource_to_bq_v2', diff --git a/aircan/dependencies/google_cloud/bigquery_handler_v2.py b/aircan/dependencies/google_cloud/bigquery_handler_v2.py index 5fab896..ec6613d 100644 --- a/aircan/dependencies/google_cloud/bigquery_handler_v2.py +++ b/aircan/dependencies/google_cloud/bigquery_handler_v2.py @@ -1,6 +1,6 @@ from google.cloud import bigquery import google.api_core.exceptions -from aircan.dependencies.utils import AirflowCKANException, aircan_status_update +from aircan.dependencies.utils import AirflowCKANException, aircan_status_update_nhs as aircan_status_update import json import logging @@ -13,34 +13,51 @@ def bq_import_csv(table_id, gcs_path, table_schema, ckan_conf): try: client = bigquery.Client() - job_config = bigquery.LoadJobConfig() + try: + job_config = bigquery.LoadJobConfig() - schema = bq_schema_from_table_schema(table_schema) - job_config.schema = schema + schema = bq_schema_from_table_schema(table_schema) + job_config.schema = schema - job_config.skip_leading_rows = 1 - job_config.source_format = bigquery.SourceFormat.CSV - # overwrite a Table - job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE - # set 'True' for schema autodetect but turning it off since we define schema in explicitly when publishing data using datapub - # job_config.autodetect = True - load_job = client.load_table_from_uri( - gcs_path, table_id, job_config=job_config - ) + job_config.skip_leading_rows = 1 + job_config.source_format = bigquery.SourceFormat.CSV + # overwrite a Table + job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE + # set 'True' for schema autodetect but turning it off since we define schema in explicitly when publishing data using datapub + # job_config.autodetect = True + load_job = client.load_table_from_uri( + gcs_path, table_id, job_config=job_config + ) - load_job.result() # Waits for table load to complete. - destination_table = client.get_table(table_id) + load_job.result() # Waits for table load to complete. + destination_table = client.get_table(table_id) + except Exception as e: + job_config = bigquery.LoadJobConfig() + + job_config.skip_leading_rows = 1 + job_config.source_format = bigquery.SourceFormat.CSV + # overwrite a Table + job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE + # set 'True' for schema autodetect but turning it off since we define schema in explicitly when publishing data using datapub + # job_config.autodetect = True + load_job = client.load_table_from_uri( + gcs_path, table_id, job_config=job_config + ) + load_job.result() # Waits for table load to complete. + destination_table = client.get_table(table_id) status_dict = { 'res_id': ckan_conf.get('resource_id'), 'state': 'progress', - 'message': 'Data ingestion is in progress.' + 'message': 'Data ingestion is in progress.', + 'dag_run_id': ckan_conf.get('dag_run_id') } aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict) if destination_table: status_dict = { 'res_id': ckan_conf.get('resource_id'), 'state': 'complete', - 'message': "Ingession Completed" + 'message': "Ingession Completed", + 'dag_run_id': ckan_conf.get('dag_run_id') } aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict) return {'success': True, 'message': 'BigQuery Table created successfully.'} @@ -60,6 +77,7 @@ def bq_import_csv(table_id, gcs_path, table_schema, ckan_conf): logging.info(e) status_dict = { 'res_id': ckan_conf.get('resource_id'), + 'dag_run_id': ckan_conf.get('dag_run_id'), 'state': 'failed', 'message': str(e) } diff --git a/aircan/dependencies/utils.py b/aircan/dependencies/utils.py index 916932f..f8b73c8 100644 --- a/aircan/dependencies/utils.py +++ b/aircan/dependencies/utils.py @@ -72,6 +72,42 @@ def days_ago(n, hour=0, minute=0, second=0, microsecond=0): time(hour, minute, second, microsecond, tzinfo=timezone.TIMEZONE), ) +def aircan_status_update_nhs (site_url, ckan_api_key, status_dict): + """ + Update aircan run status like pending, error, process, complete + on ckan with message. + """ + logging.info('Updating data loading status') + try: + request_data = { + 'dag_run_id': status_dict.get('dag_run_id', ''), + 'resource_id': status_dict.get('res_id', ''), + 'state': status_dict.get('state', ''), + 'last_updated': str(datetime.utcnow()), + 'message': status_dict.get('message', ''), + } + + if status_dict.get('error', False): + request_data.update({'error': { + 'message' : status_dict.get('error', '') + }}) + + url = urljoin(site_url, '/api/3/action/aircan_status_update') + response = requests.post(url, + data=json.dumps(request_data), + headers={'Content-Type': 'application/json', + 'Authorization': ckan_api_key}) + print(response.text) + if response.status_code == 200: + resource_json = response.json() + logging.info('Loading status updated successfully in CKAN.') + return {'success': True} + else: + print(response.json()) + return response.json() + except Exception as e: + logging.error('Failed to update status in CKAN. {0}'.format(e)) + def aircan_status_update(site_url, ckan_api_key, status_dict): """ Update aircan run status like pending, error, process, complete @@ -325,4 +361,4 @@ def join_path(path, *paths): """ for p in paths: path = os.path.join(path, p) - return path \ No newline at end of file + return path From e9f14b77a5ac7a47084ff2280024378ee18e27ca Mon Sep 17 00:00:00 2001 From: Luccas Mateus Date: Mon, 21 Jul 2025 08:57:28 -0300 Subject: [PATCH 7/7] Update bg handler --- aircan/dags/api_ckan_import_to_bq.bkp.py | 77 +++++++++++++++++++ aircan/dags/api_ckan_import_to_bq.py | 28 +++++-- .../google_cloud/bigquery_handler_v2.py | 67 ++++++++++++---- 3 files changed, 152 insertions(+), 20 deletions(-) create mode 100644 aircan/dags/api_ckan_import_to_bq.bkp.py diff --git a/aircan/dags/api_ckan_import_to_bq.bkp.py b/aircan/dags/api_ckan_import_to_bq.bkp.py new file mode 100644 index 0000000..0068dea --- /dev/null +++ b/aircan/dags/api_ckan_import_to_bq.bkp.py @@ -0,0 +1,77 @@ + +import logging +import time +import json +import ast +from datetime import date, datetime + +# Local imports +from aircan.dependencies.google_cloud.bigquery_handler import bq_import_csv + +# Third-party library imports +from airflow import DAG +from airflow.exceptions import AirflowException + +from airflow.models import Variable +from airflow.operators.python_operator import PythonOperator +from airflow.utils.dates import days_ago + + +args = { + 'start_date': days_ago(0), + 'params': { + "resource": { + "path": "path/to/my.csv", + "format": "CSV", + "ckan_resource_id": "res-id-123", + "schema": { + "fields": "['field1', 'field2']" + } + }, + "ckan_config": { + "api_key": "API_KEY", + "site_url": "URL", + }, + "big_query": { + "bq_project_id": "bigquery_project_id", + "bq_dataset_id": "bigquery_dataset_id" + }, + "output_bucket": str(date.today()) + } +} + +dag = DAG( + dag_id='ckan_api_import_to_bq', + default_args=args, + schedule_interval=None +) + +def task_import_resource_to_bq(**context): + logging.info('Invoking import resource to bigquery') + logging.info("resource: {}".format(context['params'].get('resource', {}))) + + gc_file_url = context['params'].get('big_query', {}).get('gcs_uri') + bq_project_id = context['params'].get('big_query', {}).get('bq_project_id') + bq_dataset_id = context['params'].get('big_query', {}).get('bq_dataset_id') + bq_table_name = context['params'].get('big_query', {}).get('bq_table_name') + logging.info("bq_table_name: {}".format(bq_table_name)) + + raw_schema = context['params'].get('resource', {}).get('schema') + eval_schema = json.loads(raw_schema) + eval_schema = ast.literal_eval(eval_schema) + schema = eval_schema.get('fields') + logging.info("SCHEMA: {}".format(schema)) + + # sample bq_table_id: "bigquerytest-271707.nhs_test.dag_test" + bq_table_id = '%s.%s.%s' % (bq_project_id, bq_dataset_id, bq_table_name) + logging.info('Importing %s to BQ %s' % (gc_file_url, bq_table_id)) + ckan_conf = context['params'].get('ckan_config', {}) + ckan_conf['resource_id'] = context['params'].get('resource', {}).get('ckan_resource_id') + bq_import_csv(bq_table_id, gc_file_url, schema, ckan_conf) + +import_resource_to_bq_task = PythonOperator( + task_id='import_resource_to_bq', + provide_context=True, + python_callable=task_import_resource_to_bq, + dag=dag, +) \ No newline at end of file diff --git a/aircan/dags/api_ckan_import_to_bq.py b/aircan/dags/api_ckan_import_to_bq.py index 0068dea..d38166d 100644 --- a/aircan/dags/api_ckan_import_to_bq.py +++ b/aircan/dags/api_ckan_import_to_bq.py @@ -6,7 +6,8 @@ from datetime import date, datetime # Local imports -from aircan.dependencies.google_cloud.bigquery_handler import bq_import_csv +from aircan.dependencies.google_cloud.bigquery_handler_v2 import bq_import_csv +from aircan.dependencies.utils import aircan_status_update_nhs as aircan_status_update # Third-party library imports from airflow import DAG @@ -15,7 +16,7 @@ from airflow.models import Variable from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago - +import traceback args = { 'start_date': days_ago(0), @@ -41,12 +42,14 @@ } dag = DAG( - dag_id='ckan_api_import_to_bq', + dag_id='ckan_api_import_to_bq_v2', default_args=args, schedule_interval=None ) def task_import_resource_to_bq(**context): + ckan_api_key = context['params'].get('ckan_config', {}).get('api_key') + ckan_site_url = context['params'].get('ckan_config', {}).get('site_url') logging.info('Invoking import resource to bigquery') logging.info("resource: {}".format(context['params'].get('resource', {}))) @@ -58,7 +61,8 @@ def task_import_resource_to_bq(**context): raw_schema = context['params'].get('resource', {}).get('schema') eval_schema = json.loads(raw_schema) - eval_schema = ast.literal_eval(eval_schema) + if isinstance(eval_schema, str): + eval_schema = ast.literal_eval(eval_schema) schema = eval_schema.get('fields') logging.info("SCHEMA: {}".format(schema)) @@ -67,11 +71,23 @@ def task_import_resource_to_bq(**context): logging.info('Importing %s to BQ %s' % (gc_file_url, bq_table_id)) ckan_conf = context['params'].get('ckan_config', {}) ckan_conf['resource_id'] = context['params'].get('resource', {}).get('ckan_resource_id') + dag_run_id = context['run_id'] + res_id = ckan_conf.get('resource_id') + ckan_conf['dag_run_id'] = dag_run_id bq_import_csv(bq_table_id, gc_file_url, schema, ckan_conf) + status_dict = { + 'dag_run_id': dag_run_id, + 'resource_id': res_id, + 'state': 'complete', + 'message': 'Data ingestion completed successfully for "{res_id}".'.format( + res_id=res_id), + 'clear_logs': True + } + aircan_status_update(ckan_site_url, ckan_api_key, status_dict) import_resource_to_bq_task = PythonOperator( - task_id='import_resource_to_bq', + task_id='import_resource_to_bq_v2', provide_context=True, python_callable=task_import_resource_to_bq, dag=dag, -) \ No newline at end of file +) diff --git a/aircan/dependencies/google_cloud/bigquery_handler_v2.py b/aircan/dependencies/google_cloud/bigquery_handler_v2.py index ec6613d..33bb39c 100644 --- a/aircan/dependencies/google_cloud/bigquery_handler_v2.py +++ b/aircan/dependencies/google_cloud/bigquery_handler_v2.py @@ -23,8 +23,6 @@ def bq_import_csv(table_id, gcs_path, table_schema, ckan_conf): job_config.source_format = bigquery.SourceFormat.CSV # overwrite a Table job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE - # set 'True' for schema autodetect but turning it off since we define schema in explicitly when publishing data using datapub - # job_config.autodetect = True load_job = client.load_table_from_uri( gcs_path, table_id, job_config=job_config ) @@ -32,19 +30,60 @@ def bq_import_csv(table_id, gcs_path, table_schema, ckan_conf): load_job.result() # Waits for table load to complete. destination_table = client.get_table(table_id) except Exception as e: - job_config = bigquery.LoadJobConfig() - - job_config.skip_leading_rows = 1 - job_config.source_format = bigquery.SourceFormat.CSV - # overwrite a Table - job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE - # set 'True' for schema autodetect but turning it off since we define schema in explicitly when publishing data using datapub - # job_config.autodetect = True - load_job = client.load_table_from_uri( - gcs_path, table_id, job_config=job_config + logging.info(e) + # Use a list to build the string components efficiently. + error_lines = [] + error_lines.append( + "BigQuery Load Job Failed with a BadRequest." ) - load_job.result() # Waits for table load to complete. - destination_table = client.get_table(table_id) + error_lines.append(f"Original API message: {e}") + + # The key part: Iterate through the e.errors list and append to our list. + if load_job.errors: + error_lines.append("\n--- Detailed Error Breakdown ---") + logging.info(load_job.errors) + for i, error in enumerate(load_job.errors): + # Format each error dictionary into a readable line. + line = ( + f"Error {i+1}: " + f"Reason: {error.get('reason', 'N/A')}, " + f"Location: {error.get('location', 'N/A')}, " + f"Message: {error.get('message', 'N/A')}" + ) + error_lines.append(line) + else: + error_lines.append("No detailed errors were provided in the exception.") + + # Join the list of lines into a single string with newlines. + error_report_string = "\n".join(error_lines) + logging.info(error_report_string) + status_dict = { + 'res_id': ckan_conf.get('resource_id'), + 'state': 'failed', + 'message': error_report_string, + 'dag_run_id': ckan_conf.get('dag_run_id') + } + aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict) + raise AirflowCKANException('Data ingestion has failed.', str(e)) + #status_dict = { + # 'res_id': ckan_conf.get('resource_id'), + # 'state': 'progress', + # 'message': 'Data ingestion using provided schema failed, trying to autodetect schema.', + # 'dag_run_id': ckan_conf.get('dag_run_id') + #} + #aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict) + #job_config = bigquery.LoadJobConfig() + #job_config.autodetect = True + + #job_config.skip_leading_rows = 1 + #job_config.source_format = bigquery.SourceFormat.CSV + ## overwrite a Table + #job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE + #load_job = client.load_table_from_uri( + # gcs_path, table_id, job_config=job_config + #) + #load_job.result() # Waits for table load to complete. + #destination_table = client.get_table(table_id) status_dict = { 'res_id': ckan_conf.get('resource_id'), 'state': 'progress',