diff --git a/aircan/dags/api_ckan_import_to_bq.bkp.py b/aircan/dags/api_ckan_import_to_bq.bkp.py
new file mode 100644
index 0000000..0068dea
--- /dev/null
+++ b/aircan/dags/api_ckan_import_to_bq.bkp.py
@@ -0,0 +1,77 @@
+
+import logging
+import time
+import json
+import ast
+from datetime import date, datetime
+
+# Local imports
+from aircan.dependencies.google_cloud.bigquery_handler import bq_import_csv
+
+# Third-party library imports
+from airflow import DAG
+from airflow.exceptions import AirflowException
+
+from airflow.models import Variable
+from airflow.operators.python_operator import PythonOperator
+from airflow.utils.dates import days_ago
+
+
+args = {
+    'start_date': days_ago(0),
+    'params': {
+        "resource": {
+            "path": "path/to/my.csv",
+            "format": "CSV",
+            "ckan_resource_id": "res-id-123",
+            "schema": {
+                "fields": "['field1', 'field2']"
+            }
+        },
+        "ckan_config": {
+            "api_key": "API_KEY",
+            "site_url": "URL",
+        },
+        "big_query": {
+            "bq_project_id": "bigquery_project_id",
+            "bq_dataset_id": "bigquery_dataset_id"
+        },
+        "output_bucket": str(date.today())
+    }
+}
+
+dag = DAG(
+    dag_id='ckan_api_import_to_bq',
+    default_args=args,
+    schedule_interval=None
+)
+
+def task_import_resource_to_bq(**context):
+    logging.info('Invoking import resource to bigquery')
+    logging.info("resource: {}".format(context['params'].get('resource', {})))
+
+    gc_file_url = context['params'].get('big_query', {}).get('gcs_uri')
+    bq_project_id = context['params'].get('big_query', {}).get('bq_project_id')
+    bq_dataset_id = context['params'].get('big_query', {}).get('bq_dataset_id')
+    bq_table_name = context['params'].get('big_query', {}).get('bq_table_name')
+    logging.info("bq_table_name: {}".format(bq_table_name))
+
+    raw_schema = context['params'].get('resource', {}).get('schema')
+    eval_schema = json.loads(raw_schema)
+    eval_schema = ast.literal_eval(eval_schema)
+    schema = eval_schema.get('fields')
+    logging.info("SCHEMA: {}".format(schema))
+
+    # sample bq_table_id: "bigquerytest-271707.nhs_test.dag_test"
+    bq_table_id = '%s.%s.%s' % (bq_project_id, bq_dataset_id, bq_table_name)
+    logging.info('Importing %s to BQ %s' % (gc_file_url, bq_table_id))
+    ckan_conf = context['params'].get('ckan_config', {})
+    ckan_conf['resource_id'] = context['params'].get('resource', {}).get('ckan_resource_id')
+    bq_import_csv(bq_table_id, gc_file_url, schema, ckan_conf)
+
+import_resource_to_bq_task = PythonOperator(
+    task_id='import_resource_to_bq',
+    provide_context=True,
+    python_callable=task_import_resource_to_bq,
+    dag=dag,
+)
\ No newline at end of file
diff --git a/aircan/dags/api_ckan_import_to_bq.py b/aircan/dags/api_ckan_import_to_bq.py
index 0068dea..d38166d 100644
--- a/aircan/dags/api_ckan_import_to_bq.py
+++ b/aircan/dags/api_ckan_import_to_bq.py
@@ -6,7 +6,8 @@
 from datetime import date, datetime
 
 # Local imports
-from aircan.dependencies.google_cloud.bigquery_handler import bq_import_csv
+from aircan.dependencies.google_cloud.bigquery_handler_v2 import bq_import_csv
+from aircan.dependencies.utils import aircan_status_update_nhs as aircan_status_update
 
 # Third-party library imports
 from airflow import DAG
@@ -15,7 +16,7 @@
 from airflow.models import Variable
 from airflow.operators.python_operator import PythonOperator
 from airflow.utils.dates import days_ago
-
+import traceback
 
 args = {
     'start_date': days_ago(0),
@@ -41,12 +42,15 @@
 }
 
 dag = DAG(
-    dag_id='ckan_api_import_to_bq',
+    dag_id='ckan_api_import_to_bq_v2',
    default_args=args,
     schedule_interval=None
 )
 
 def task_import_resource_to_bq(**context):
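+    """Load the resource CSV from GCS into BigQuery and report run status back to CKAN."""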
+    ckan_api_key = context['params'].get('ckan_config', {}).get('api_key')
+    ckan_site_url = context['params'].get('ckan_config', {}).get('site_url')
     logging.info('Invoking import resource to bigquery')
     logging.info("resource: {}".format(context['params'].get('resource', {})))
 
@@ -58,7 +62,8 @@ def task_import_resource_to_bq(**context):
 
     raw_schema = context['params'].get('resource', {}).get('schema')
     eval_schema = json.loads(raw_schema)
-    eval_schema = ast.literal_eval(eval_schema)
+    if isinstance(eval_schema, str):
+        eval_schema = ast.literal_eval(eval_schema)
     schema = eval_schema.get('fields')
     logging.info("SCHEMA: {}".format(schema))
 
@@ -67,11 +72,23 @@ def task_import_resource_to_bq(**context):
     logging.info('Importing %s to BQ %s' % (gc_file_url, bq_table_id))
     ckan_conf = context['params'].get('ckan_config', {})
     ckan_conf['resource_id'] = context['params'].get('resource', {}).get('ckan_resource_id')
+    dag_run_id = context['run_id']
+    res_id = ckan_conf.get('resource_id')
+    ckan_conf['dag_run_id'] = dag_run_id
     bq_import_csv(bq_table_id, gc_file_url, schema, ckan_conf)
+    status_dict = {
+        'dag_run_id': dag_run_id,
+        'resource_id': res_id,
+        'state': 'complete',
+        'message': 'Data ingestion completed successfully for "{res_id}".'.format(
+            res_id=res_id),
+        'clear_logs': True
+    }
+    aircan_status_update(ckan_site_url, ckan_api_key, status_dict)
 
 import_resource_to_bq_task = PythonOperator(
-    task_id='import_resource_to_bq',
+    task_id='import_resource_to_bq_v2',
     provide_context=True,
     python_callable=task_import_resource_to_bq,
     dag=dag,
-)
\ No newline at end of file
+)
diff --git a/aircan/dags/api_ckan_import_to_bq_v2.py b/aircan/dags/api_ckan_import_to_bq_v2.py
new file mode 100644
index 0000000..d38166d
--- /dev/null
+++ b/aircan/dags/api_ckan_import_to_bq_v2.py
@@ -0,0 +1,94 @@
+
+import logging
+import time
+import json
+import ast
+from datetime import date, datetime
+
+# Local imports
+from aircan.dependencies.google_cloud.bigquery_handler_v2 import bq_import_csv
+from aircan.dependencies.utils import aircan_status_update_nhs as aircan_status_update
+
+# Third-party library imports
+from airflow import DAG
+from airflow.exceptions import AirflowException
+
+from airflow.models import Variable
+from airflow.operators.python_operator import PythonOperator
+from airflow.utils.dates import days_ago
+import traceback
+
+args = {
+    'start_date': days_ago(0),
+    'params': {
+        "resource": {
+            "path": "path/to/my.csv",
+            "format": "CSV",
+            "ckan_resource_id": "res-id-123",
+            "schema": {
+                "fields": "['field1', 'field2']"
+            }
+        },
+        "ckan_config": {
+            "api_key": "API_KEY",
+            "site_url": "URL",
+        },
+        "big_query": {
+            "bq_project_id": "bigquery_project_id",
+            "bq_dataset_id": "bigquery_dataset_id"
+        },
+        "output_bucket": str(date.today())
+    }
+}
+
+dag = DAG(
+    dag_id='ckan_api_import_to_bq_v2',
+    default_args=args,
+    schedule_interval=None
+)
+
+def task_import_resource_to_bq(**context):
+    ckan_api_key = context['params'].get('ckan_config', {}).get('api_key')
+    ckan_site_url = context['params'].get('ckan_config', {}).get('site_url')
+    logging.info('Invoking import resource to bigquery')
+    logging.info("resource: {}".format(context['params'].get('resource', {})))
+
+    gc_file_url = context['params'].get('big_query', {}).get('gcs_uri')
+    bq_project_id = context['params'].get('big_query', {}).get('bq_project_id')
+    bq_dataset_id = context['params'].get('big_query', {}).get('bq_dataset_id')
+    bq_table_name = context['params'].get('big_query', {}).get('bq_table_name')
+    logging.info("bq_table_name: {}".format(bq_table_name))
+
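+    # The schema param may be JSON-encoded and, in some payloads, doubly encoded as a Python-literal string; the next lines unwrap both layers.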
{}".format(bq_table_name)) + + raw_schema = context['params'].get('resource', {}).get('schema') + eval_schema = json.loads(raw_schema) + if isinstance(eval_schema, str): + eval_schema = ast.literal_eval(eval_schema) + schema = eval_schema.get('fields') + logging.info("SCHEMA: {}".format(schema)) + + # sample bq_table_id: "bigquerytest-271707.nhs_test.dag_test" + bq_table_id = '%s.%s.%s' % (bq_project_id, bq_dataset_id, bq_table_name) + logging.info('Importing %s to BQ %s' % (gc_file_url, bq_table_id)) + ckan_conf = context['params'].get('ckan_config', {}) + ckan_conf['resource_id'] = context['params'].get('resource', {}).get('ckan_resource_id') + dag_run_id = context['run_id'] + res_id = ckan_conf.get('resource_id') + ckan_conf['dag_run_id'] = dag_run_id + bq_import_csv(bq_table_id, gc_file_url, schema, ckan_conf) + status_dict = { + 'dag_run_id': dag_run_id, + 'resource_id': res_id, + 'state': 'complete', + 'message': 'Data ingestion completed successfully for "{res_id}".'.format( + res_id=res_id), + 'clear_logs': True + } + aircan_status_update(ckan_site_url, ckan_api_key, status_dict) + +import_resource_to_bq_task = PythonOperator( + task_id='import_resource_to_bq_v2', + provide_context=True, + python_callable=task_import_resource_to_bq, + dag=dag, +) diff --git a/aircan/dependencies/google_cloud/bigquery_handler_v2.py b/aircan/dependencies/google_cloud/bigquery_handler_v2.py new file mode 100644 index 0000000..33bb39c --- /dev/null +++ b/aircan/dependencies/google_cloud/bigquery_handler_v2.py @@ -0,0 +1,152 @@ +from google.cloud import bigquery +import google.api_core.exceptions +from aircan.dependencies.utils import AirflowCKANException, aircan_status_update_nhs as aircan_status_update +import json +import logging + +def replace_all(dict, string): + for key in dict: + string = string.replace(key, dict[key]) + return string + +def bq_import_csv(table_id, gcs_path, table_schema, ckan_conf): + try: + client = bigquery.Client() + + try: + job_config = bigquery.LoadJobConfig() + + schema = bq_schema_from_table_schema(table_schema) + job_config.schema = schema + + job_config.skip_leading_rows = 1 + job_config.source_format = bigquery.SourceFormat.CSV + # overwrite a Table + job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE + load_job = client.load_table_from_uri( + gcs_path, table_id, job_config=job_config + ) + + load_job.result() # Waits for table load to complete. + destination_table = client.get_table(table_id) + except Exception as e: + logging.info(e) + # Use a list to build the string components efficiently. + error_lines = [] + error_lines.append( + "BigQuery Load Job Failed with a BadRequest." + ) + error_lines.append(f"Original API message: {e}") + + # The key part: Iterate through the e.errors list and append to our list. + if load_job.errors: + error_lines.append("\n--- Detailed Error Breakdown ---") + logging.info(load_job.errors) + for i, error in enumerate(load_job.errors): + # Format each error dictionary into a readable line. + line = ( + f"Error {i+1}: " + f"Reason: {error.get('reason', 'N/A')}, " + f"Location: {error.get('location', 'N/A')}, " + f"Message: {error.get('message', 'N/A')}" + ) + error_lines.append(line) + else: + error_lines.append("No detailed errors were provided in the exception.") + + # Join the list of lines into a single string with newlines. 
+            #status_dict = {
+            #    'res_id': ckan_conf.get('resource_id'),
+            #    'state': 'progress',
+            #    'message': 'Data ingestion using provided schema failed, trying to autodetect schema.',
+            #    'dag_run_id': ckan_conf.get('dag_run_id')
+            #}
+            #aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
+            #job_config = bigquery.LoadJobConfig()
+            #job_config.autodetect = True
+
+            #job_config.skip_leading_rows = 1
+            #job_config.source_format = bigquery.SourceFormat.CSV
+            ## overwrite a Table
+            #job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
+            #load_job = client.load_table_from_uri(
+            #    gcs_path, table_id, job_config=job_config
+            #)
+            #load_job.result()  # Waits for table load to complete.
+            #destination_table = client.get_table(table_id)
+        status_dict = {
+            'res_id': ckan_conf.get('resource_id'),
+            'state': 'progress',
+            'message': 'Data ingestion is in progress.',
+            'dag_run_id': ckan_conf.get('dag_run_id')
+        }
+        aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
+        if destination_table:
+            status_dict = {
+                'res_id': ckan_conf.get('resource_id'),
+                'state': 'complete',
+                'message': "Ingestion Completed",
+                'dag_run_id': ckan_conf.get('dag_run_id')
+            }
+            aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
+            return {'success': True, 'message': 'BigQuery Table created successfully.'}
+    except Exception as e:
+        replacers = {
+            'gs://dx-nhs-staging-giftless/': '',
+            'gs://dx-nhs-production-giftless/': '',
+            'gs://dx-nhs-prod-giftless/': '',
+            'https://bigquery.googleapis.com/bigquery/v2/projects/datopian-dx/jobs?prettyPrint=false': '',
+            'datopian-dx': '',
+            'bigquery': '',
+            'googleapi': '',
+            'google': ''
+        }
+        sanitized = replace_all(replacers, str(e))
+        logging.info(sanitized)
+        status_dict = {
+            'res_id': ckan_conf.get('resource_id'),
+            'dag_run_id': ckan_conf.get('dag_run_id'),
+            'state': 'failed',
+            'message': sanitized
+        }
+        aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
+        raise AirflowCKANException('Data ingestion has failed.', sanitized)
+
+
+def bq_schema_from_table_schema(table_schema):
+    mapping = {
+        'string': 'STRING',
+        'number': 'NUMERIC',
+        'integer': 'NUMERIC',
+        'boolean': 'BOOLEAN',
+        'object': 'STRING',
+        'array': 'STRING',
+        'date': 'DATE',
+        'time': 'TIME',
+        'datetime': 'DATETIME',
+        'year': 'NUMERIC',
+        'yearmonth': 'STRING',
+        'duration': 'DATETIME',
+        'geopoint': 'GEOGRAPHY',
+        'geojson': 'STRING',
+        'any': 'STRING'
+    }
+
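+    # Unmapped Table Schema types fall through unchanged; every field is created as NULLABLE.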
+    def _convert(field):
+        # TODO: support for e.g. required
+        return bigquery.SchemaField(field['name'],
+                                    mapping.get(field['type'], field['type']),
+                                    'NULLABLE'
+                                    )
+    return [_convert(field) for field in table_schema]
diff --git a/aircan/dependencies/utils.py b/aircan/dependencies/utils.py
index 916932f..f8b73c8 100644
--- a/aircan/dependencies/utils.py
+++ b/aircan/dependencies/utils.py
@@ -72,6 +72,41 @@ def days_ago(n, hour=0, minute=0, second=0, microsecond=0):
         time(hour, minute, second, microsecond, tzinfo=timezone.TIMEZONE),
     )
 
+def aircan_status_update_nhs(site_url, ckan_api_key, status_dict):
+    """
+    Update the AirCan run status (pending, error, progress, complete)
+    on CKAN with a message.
+    """
+    logging.info('Updating data loading status')
+    try:
+        request_data = {
+            'dag_run_id': status_dict.get('dag_run_id', ''),
+            'resource_id': status_dict.get('res_id', ''),
+            'state': status_dict.get('state', ''),
+            'last_updated': str(datetime.utcnow()),
+            'message': status_dict.get('message', ''),
+        }
+
+        if status_dict.get('error', False):
+            request_data.update({'error': {
+                'message': status_dict.get('error', '')
+            }})
+
+        url = urljoin(site_url, '/api/3/action/aircan_status_update')
+        response = requests.post(url,
+                                 data=json.dumps(request_data),
+                                 headers={'Content-Type': 'application/json',
+                                          'Authorization': ckan_api_key})
+        logging.info(response.text)
+        if response.status_code == 200:
+            logging.info('Loading status updated successfully in CKAN.')
+            return {'success': True}
+        else:
+            logging.error(response.json())
+            return response.json()
+    except Exception as e:
+        logging.error('Failed to update status in CKAN. {0}'.format(e))
+
 def aircan_status_update(site_url, ckan_api_key, status_dict):
     """
     Update aircan run status like pending, error, process, complete
@@ -325,4 +360,4 @@ def join_path(path, *paths):
     """
     for p in paths:
         path = os.path.join(path, p)
-    return path
\ No newline at end of file
+    return path