Conversation

@luccasmmg luccasmmg commented Jul 2, 2025

Summary by CodeRabbit

  • New Features
    • Introduced a new manual workflow to import CKAN resource data into BigQuery.
    • Added automated CSV data loading from cloud storage into BigQuery with schema support and error handling.
    • Enhanced status update functionality with support for additional metadata during data import.
    • Improved import process visibility with detailed logging and status updates.

coderabbitai bot commented Jul 2, 2025


Walkthrough

A new Airflow DAG, ckan_api_import_to_bq_v2, has been added to import data from a CKAN resource into BigQuery. The DAG uses a PythonOperator to execute a function that extracts parameters, prepares configurations, and triggers a CSV import into BigQuery. The DAG is configured for manual execution. Additionally, a new BigQuery handler module provides functions to import CSV to BigQuery tables with schema mapping, error sanitization, and ingestion status updates. A utility function to update CKAN run status with DAG run ID support was also added.
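To make the parameter extraction concrete, here is a sketch of what a manual trigger payload (`dag_run.conf`) for such a DAG might look like. Every field value below is a made-up placeholder, and the exact key names are assumptions based on the parameters discussed in this review, not the committed contract:

```python
# Hypothetical trigger payload; all values are illustrative placeholders.
trigger_conf = {
    "ckan_config": {
        "site_url": "https://ckan.example.org",  # assumed key name
        "api_key": "xxx",                        # assumed key name
    },
    "resource": {
        "ckan_resource_id": "abc-123",           # assumed key name
        "schema": '{"fields": [{"name": "id", "type": "integer"}]}',
    },
    "big_query": {
        "bq_project_id": "my-project",
        "bq_dataset_id": "my_dataset",
        "bq_table_name": "my_table",
        "gcs_uri": "gs://my-bucket/data.csv",
    },
}

# The task derives the fully qualified table id from these parts:
bq = trigger_conf["big_query"]
bq_table_id = "{}.{}.{}".format(
    bq["bq_project_id"], bq["bq_dataset_id"], bq["bq_table_name"]
)
print(bq_table_id)  # my-project.my_dataset.my_table
```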

Changes

File Change Summary
aircan/dags/api_ckan_import_to_bq_v2.py Added new DAG ckan_api_import_to_bq_v2 with a PythonOperator task and function to import CKAN resource data into BigQuery, enhanced with DAG run ID and improved schema parsing.
aircan/dependencies/google_cloud/bigquery_handler_v2.py Added new module with functions to import CSV to BigQuery, map table schemas, sanitize error messages, and update ingestion status.
aircan/dependencies/utils.py Added aircan_status_update_nhs function to update CKAN status with DAG run ID support and enhanced payload construction.

Sequence Diagram(s)

sequenceDiagram
    participant Trigger
    participant Airflow DAG
    participant PythonOperator
    participant task_import_resource_to_bq
    participant bq_import_csv

    Trigger->>Airflow DAG: Trigger DAG (manual/external)
    Airflow DAG->>PythonOperator: Run import_resource_to_bq_v2 task
    PythonOperator->>task_import_resource_to_bq: Execute function
    task_import_resource_to_bq->>bq_import_csv: Call to import CSV to BigQuery
    bq_import_csv-->>task_import_resource_to_bq: Return status
    task_import_resource_to_bq-->>PythonOperator: Log and complete

Estimated code review effort

2 (10–30 minutes)

Poem

A new DAG hops into view,
To fetch CKAN data, fresh as dew.
With Python tasks and logging bright,
It loads to BigQuery, day or night.
No schedule set, just on demand—
Data flows with a rabbit’s hand!
🐇✨


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (3)
aircan/dags/api_ckan_import_to_bq_v2.py (3)

3-3: Remove unused imports.

The static analysis tools correctly identified several unused imports that should be removed to keep the code clean.

-import time
-import ast
-from datetime import date, datetime
+from datetime import date
-from airflow.exceptions import AirflowException
-from airflow.models import Variable

Also applies to: 5-5, 6-6, 13-13, 15-15


43-47: Fix formatting for PEP 8 compliance.

Add proper spacing before the function definition.

 }
+

 dag = DAG(

71-76: Fix formatting and update deprecated parameter.

  1. Formatting: Add proper spacing after the function definition
  2. Deprecation: The provide_context=True parameter is deprecated in newer Airflow versions
+

 import_resource_to_bq_task = PythonOperator(
     task_id='import_resource_to_bq_v2',
-    provide_context=True,
     python_callable=task_import_resource_to_bq,
     dag=dag,
 )

Note: The provide_context=True parameter is automatically handled in newer Airflow versions when using **context in the function signature.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8f8a520 and b0859e9.

📒 Files selected for processing (1)
  • aircan/dags/api_ckan_import_to_bq_v2.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.11.9)
aircan/dags/api_ckan_import_to_bq_v2.py

3-3: time imported but unused

Remove unused import: time

(F401)


5-5: ast imported but unused

Remove unused import: ast

(F401)


6-6: datetime.datetime imported but unused

Remove unused import: datetime.datetime

(F401)


13-13: airflow.exceptions.AirflowException imported but unused

Remove unused import: airflow.exceptions.AirflowException

(F401)


15-15: airflow.models.Variable imported but unused

Remove unused import: airflow.models.Variable

(F401)

🪛 Flake8 (7.2.0)
aircan/dags/api_ckan_import_to_bq_v2.py

[error] 3-3: 'time' imported but unused

(F401)


[error] 5-5: 'ast' imported but unused

(F401)


[error] 6-6: 'datetime.datetime' imported but unused

(F401)


[error] 13-13: 'airflow.exceptions.AirflowException' imported but unused

(F401)


[error] 15-15: 'airflow.models.Variable' imported but unused

(F401)


[error] 49-49: expected 2 blank lines, found 1

(E302)


[error] 71-71: expected 2 blank lines after class or function definition, found 1

(E305)

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (2)
aircan/dags/api_ckan_import_to_bq_v2.py (2)

22-40: Parameter structure issues remain unaddressed.

The default parameters still don't match what the task function expects, as previously identified. The task function looks for big_query.gcs_uri and big_query.bq_table_name which aren't defined in the default args.


49-71: Critical error handling and validation issues remain unaddressed.

The function still has the same critical issues that were previously identified:

  • Parameter access without validation that will cause KeyError exceptions
  • Unsafe JSON parsing without error handling
  • No error handling for the BigQuery import operation
  • Potential security concerns with logging sensitive information

These issues will cause runtime failures and should be addressed.

🧹 Nitpick comments (2)
aircan/dags/api_ckan_import_to_bq_v2.py (2)

3-3: Remove unused imports.

Static analysis tools have identified several unused imports that should be removed to keep the code clean.

-import time
-from datetime import date, datetime
+from datetime import date
-from airflow.exceptions import AirflowException
-from airflow.models import Variable

Also applies to: 6-6, 13-13, 15-15


49-49: Fix formatting issues.

Missing required blank lines before function and after class/function definitions.

+
 def task_import_resource_to_bq(**context):

 )
+

Also applies to: 73-73

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b0859e9 and 832667c.

📒 Files selected for processing (1)
  • aircan/dags/api_ckan_import_to_bq_v2.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.11.9)
aircan/dags/api_ckan_import_to_bq_v2.py

3-3: time imported but unused

Remove unused import: time

(F401)


6-6: datetime.datetime imported but unused

Remove unused import: datetime.datetime

(F401)


13-13: airflow.exceptions.AirflowException imported but unused

Remove unused import: airflow.exceptions.AirflowException

(F401)


15-15: airflow.models.Variable imported but unused

Remove unused import: airflow.models.Variable

(F401)

🪛 Flake8 (7.2.0)
aircan/dags/api_ckan_import_to_bq_v2.py

[error] 3-3: 'time' imported but unused

(F401)


[error] 6-6: 'datetime.datetime' imported but unused

(F401)


[error] 13-13: 'airflow.exceptions.AirflowException' imported but unused

(F401)


[error] 15-15: 'airflow.models.Variable' imported but unused

(F401)


[error] 49-49: expected 2 blank lines, found 1

(E302)


[error] 73-73: expected 2 blank lines after class or function definition, found 1

(E305)

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

♻️ Duplicate comments (2)
aircan/dags/api_ckan_import_to_bq_v2.py (2)

35-38: Add missing BigQuery parameters.

The default parameters are missing required BigQuery parameters that the task function expects to access.

Based on the past review comments, this issue was previously identified but the parameters gcs_uri and bq_table_name are still missing from the default args.

         "big_query": {
             "bq_project_id": "bigquery_project_id",
-            "bq_dataset_id": "bigquery_dataset_id"
+            "bq_dataset_id": "bigquery_dataset_id",
+            "gcs_uri": "gs://bucket/path/to/file.csv",
+            "bq_table_name": "table_name"
         },

49-71: Add comprehensive error handling and parameter validation.

The task function has multiple critical issues that will cause runtime failures, as previously identified in past reviews.

The function still lacks proper parameter validation and error handling. Here's a comprehensive fix:

 def task_import_resource_to_bq(**context):
     logging.info('Invoking import resource to bigquery')
-    logging.info("resource: {}".format(context['params'].get('resource', {})))
-
-    gc_file_url = context['params'].get('big_query', {}).get('gcs_uri')
-    bq_project_id = context['params'].get('big_query', {}).get('bq_project_id')
-    bq_dataset_id = context['params'].get('big_query', {}).get('bq_dataset_id')
-    bq_table_name = context['params'].get('big_query', {}).get('bq_table_name')
+    
+    # Validate required parameters
+    params = context.get('params', {})
+    big_query_params = params.get('big_query', {})
+    resource_params = params.get('resource', {})
+    
+    required_bq_params = ['gcs_uri', 'bq_project_id', 'bq_dataset_id', 'bq_table_name']
+    for param in required_bq_params:
+        if not big_query_params.get(param):
+            raise ValueError(f"Missing required parameter: big_query.{param}")
+    
+    gc_file_url = big_query_params['gcs_uri']
+    bq_project_id = big_query_params['bq_project_id']
+    bq_dataset_id = big_query_params['bq_dataset_id']
+    bq_table_name = big_query_params['bq_table_name']
     logging.info("bq_table_name: {}".format(bq_table_name))
     
-    raw_schema = context['params'].get('resource', {}).get('schema')
-    eval_schema = json.loads(raw_schema)
-    if isinstance(eval_schema, str):
-        eval_schema = ast.literal_eval(eval_schema)
-    schema = eval_schema.get('fields')
+    raw_schema = resource_params.get('schema', {})
+    try:
+        if isinstance(raw_schema, str):
+            eval_schema = json.loads(raw_schema)
+        else:
+            eval_schema = raw_schema
+        schema = eval_schema.get('fields', [])
+    except (json.JSONDecodeError, AttributeError) as e:
+        raise ValueError(f"Invalid schema format: {e}")
+        
     logging.info("SCHEMA: {}".format(schema))

     # sample bq_table_id: "bigquerytest-271707.nhs_test.dag_test"
     bq_table_id = '%s.%s.%s' % (bq_project_id, bq_dataset_id, bq_table_name)          
     logging.info('Importing %s to BQ %s' % (gc_file_url, bq_table_id))
-    ckan_conf = context['params'].get('ckan_config', {})
-    ckan_conf['resource_id'] = context['params'].get('resource', {}).get('ckan_resource_id')
-    bq_import_csv(bq_table_id, gc_file_url, schema, ckan_conf)
+    
+    ckan_conf = params.get('ckan_config', {}).copy()
+    ckan_conf['resource_id'] = resource_params.get('ckan_resource_id')
+    
+    try:
+        bq_import_csv(bq_table_id, gc_file_url, schema, ckan_conf)
+        logging.info("Successfully imported data to BigQuery")
+    except Exception as e:
+        logging.error(f"Failed to import data to BigQuery: {e}")
+        raise
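Stripped of the Airflow context, the fail-fast validation pattern the diff above proposes can be exercised on its own. This is a minimal standalone sketch, not the committed code:

```python
def validate_bq_params(big_query_params):
    """Raise ValueError naming the first missing/empty required parameter."""
    required = ['gcs_uri', 'bq_project_id', 'bq_dataset_id', 'bq_table_name']
    for param in required:
        if not big_query_params.get(param):
            raise ValueError(f"Missing required parameter: big_query.{param}")
    return True

# A complete params dict passes...
validate_bq_params({
    'gcs_uri': 'gs://bucket/path/to/file.csv',
    'bq_project_id': 'proj',
    'bq_dataset_id': 'ds',
    'bq_table_name': 'tbl',
})

# ...while a missing key fails with a descriptive error instead of a
# KeyError deep inside the import logic.
try:
    validate_bq_params({'bq_project_id': 'proj'})
except ValueError as err:
    print(err)  # Missing required parameter: big_query.gcs_uri
```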
🧹 Nitpick comments (5)
aircan/dependencies/google_cloud/bigquery_handler_v2.py (4)

2-2: Remove unused imports.

Static analysis correctly identified unused imports that should be removed to clean up the code.

-import google.api_core.exceptions
 from aircan.dependencies.utils import AirflowCKANException, aircan_status_update
-import json
 import logging

Also applies to: 4-4


7-10: Improve parameter naming and function design.

The parameter name dict shadows the built-in dict type, which can cause confusion. Also, the function could be more robust.

-def replace_all(dict, string):
-    for key in dict:
-        string = string.replace(key, dict[key])
-    return string
+def replace_all(replacements, text):
+    """Replace multiple substrings in text using a dictionary of replacements."""
+    for old, new in replacements.items():
+        text = text.replace(old, new)
+    return text
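The renamed helper behaves identically to the original; a quick behavioral check (the bucket and project strings here are invented, not the real sanitization targets):

```python
def replace_all(replacements, text):
    """Replace multiple substrings in text using a dictionary of replacements."""
    for old, new in replacements.items():
        text = text.replace(old, new)
    return text

sanitized = replace_all(
    {"gs://internal-bucket/": "", "secret-project": "<project>"},
    "load failed for gs://internal-bucket/data.csv in secret-project",
)
print(sanitized)  # load failed for data.csv in <project>
```

Note that replacements are applied in dictionary insertion order, which matters when one key is a substring of another.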

59-59: Fix formatting and exception chaining.

Address the static analysis warnings for proper formatting and exception handling.

-        e = replace_all(replacers,str(e))
+        e = replace_all(replacers, str(e))
         logging.info(e)
         status_dict = {
             'res_id': ckan_conf.get('resource_id'),
             'state': 'failed',
             'message': str(e)
         }
         aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
-        raise AirflowCKANException('Data ingestion has failed.', str(e))
+        raise AirflowCKANException('Data ingestion has failed.', str(e)) from e

Also applies to: 67-67
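The `from e` form matters because it records the original exception as `__cause__`, so the underlying BigQuery failure stays visible in tracebacks instead of being reported as an error raised "during handling" of another. A minimal illustration, using stand-in exception classes rather than the project's real `AirflowCKANException`:

```python
class IngestionError(Exception):
    """Stand-in for the project's AirflowCKANException."""

def load():
    raise RuntimeError("BigQuery job failed")

try:
    try:
        load()
    except RuntimeError as e:
        # `from e` chains the low-level error as the cause of the new one
        raise IngestionError("Data ingestion has failed.") from e
except IngestionError as err:
    assert isinstance(err.__cause__, RuntimeError)
    print(err.__cause__)  # BigQuery job failed
```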


90-90: Address the TODO comment.

The TODO comment indicates missing functionality for required fields in the schema.

The TODO comment suggests support for required fields is missing. Do you want me to implement this feature or create an issue to track it?
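One possible shape for that feature: if the incoming schema follows the Frictionless/Table Schema convention (an assumption about the input format, not something confirmed by this module), `constraints.required` could map to BigQuery's `REQUIRED` mode. A deliberately tiny sketch:

```python
def to_bq_field(field):
    """Map a Table-Schema-style field dict to a BigQuery-style schema dict.

    Treats `constraints.required: true` as mode REQUIRED, everything else
    as NULLABLE. The type map is intentionally minimal for illustration.
    """
    type_map = {"integer": "INT64", "number": "FLOAT64", "string": "STRING"}
    required = field.get("constraints", {}).get("required", False)
    return {
        "name": field["name"],
        "type": type_map.get(field.get("type", "string"), "STRING"),
        "mode": "REQUIRED" if required else "NULLABLE",
    }

fields = [
    {"name": "id", "type": "integer", "constraints": {"required": True}},
    {"name": "note", "type": "string"},
]
print([to_bq_field(f) for f in fields])
```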

aircan/dags/api_ckan_import_to_bq_v2.py (1)

3-3: Remove unused imports.

Static analysis correctly identified several unused imports that should be removed to clean up the code.

 import logging
-import time
 import json
 import ast
-from datetime import date, datetime
+from datetime import date

 # Local imports
 from aircan.dependencies.google_cloud.bigquery_handler_v2 import bq_import_csv

 # Third-party library imports
 from airflow import DAG
-from airflow.exceptions import AirflowException

-from airflow.models import Variable
 from airflow.operators.python_operator import PythonOperator
 from airflow.utils.dates import days_ago

Also applies to: 6-6, 13-13, 15-15

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 832667c and 4358c47.

📒 Files selected for processing (2)
  • aircan/dags/api_ckan_import_to_bq_v2.py (1 hunks)
  • aircan/dependencies/google_cloud/bigquery_handler_v2.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
aircan/dependencies/google_cloud/bigquery_handler_v2.py (1)
aircan/dependencies/utils.py (2)
  • AirflowCKANException (165-172)
  • aircan_status_update (75-106)
aircan/dags/api_ckan_import_to_bq_v2.py (2)
aircan/dependencies/google_cloud/bigquery_handler_v2.py (1)
  • bq_import_csv (12-67)
aircan/dependencies/utils.py (1)
  • days_ago (69-73)
🪛 Ruff (0.11.9)
aircan/dependencies/google_cloud/bigquery_handler_v2.py

2-2: google.api_core.exceptions imported but unused

Remove unused import: google.api_core.exceptions

(F401)


4-4: json imported but unused

Remove unused import: json

(F401)


67-67: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

aircan/dags/api_ckan_import_to_bq_v2.py

3-3: time imported but unused

Remove unused import: time

(F401)


6-6: datetime.datetime imported but unused

Remove unused import: datetime.datetime

(F401)


13-13: airflow.exceptions.AirflowException imported but unused

Remove unused import: airflow.exceptions.AirflowException

(F401)


15-15: airflow.models.Variable imported but unused

Remove unused import: airflow.models.Variable

(F401)

🪛 Flake8 (7.2.0)
aircan/dependencies/google_cloud/bigquery_handler_v2.py

[error] 2-2: 'google.api_core.exceptions' imported but unused

(F401)


[error] 4-4: 'json' imported but unused

(F401)


[error] 7-7: expected 2 blank lines, found 1

(E302)


[error] 12-12: expected 2 blank lines, found 1

(E302)


[error] 59-59: missing whitespace after ','

(E231)

aircan/dags/api_ckan_import_to_bq_v2.py

[error] 3-3: 'time' imported but unused

(F401)


[error] 6-6: 'datetime.datetime' imported but unused

(F401)


[error] 13-13: 'airflow.exceptions.AirflowException' imported but unused

(F401)


[error] 15-15: 'airflow.models.Variable' imported but unused

(F401)


[error] 49-49: expected 2 blank lines, found 1

(E302)


[error] 73-73: expected 2 blank lines after class or function definition, found 1

(E305)

Comment on lines +48 to +58
replacers = {
'gs://dx-nhs-staging-giftless/': '',
'gs://dx-nhs-production-giftless/': '',
'gs://dx-nhs-prod-giftless/': '',
'https://bigquery.googleapis.com/bigquery/v2/projects/datopian-dx/jobs?prettyPrint=false': '',
'datopian-dx': '',
'bigquery': '',
'googleapi': '',
'google': ''

}

🛠️ Refactor suggestion

Make error sanitization configurable.

The hardcoded bucket names and project IDs make this code environment-specific and brittle. Consider making this configurable.

-        replacers = {
-            'gs://dx-nhs-staging-giftless/': '',
-            'gs://dx-nhs-production-giftless/': '',
-            'gs://dx-nhs-prod-giftless/': '',
-            'https://bigquery.googleapis.com/bigquery/v2/projects/datopian-dx/jobs?prettyPrint=false': '',
-            'datopian-dx': '',
-            'bigquery': '',
-            'googleapi': '',
-            'google': ''
-
-        }
+        # Configurable sanitization - could be passed as parameter or from config
+        replacers = ckan_conf.get('error_sanitization', {
+            'gs://dx-nhs-staging-giftless/': '',
+            'gs://dx-nhs-production-giftless/': '',
+            'gs://dx-nhs-prod-giftless/': '',
+            'https://bigquery.googleapis.com/bigquery/v2/projects/datopian-dx/jobs?prettyPrint=false': '',
+            'datopian-dx': '',
+            'bigquery': '',
+            'googleapi': '',
+            'google': ''
+        })

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In aircan/dependencies/google_cloud/bigquery_handler_v2.py around lines 48 to
58, the error sanitization uses hardcoded bucket names and project IDs, making
it environment-specific and brittle. Refactor the code to accept these replacer
values from a configuration source such as environment variables or a config
file, allowing different environments to specify their own values without
changing the code.
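One way to act on that prompt is to read the replacer map from an environment variable holding JSON, so each environment can supply its own sensitive strings. The variable name below is made up for illustration:

```python
import json
import os

def load_replacers(env_var="ERROR_SANITIZATION_MAP", default=None):
    """Load a {substring: replacement} map from a JSON environment variable.

    Falls back to `default` (or an empty map) when the variable is unset,
    so behavior degrades gracefully in unconfigured environments.
    """
    raw = os.environ.get(env_var)
    if not raw:
        return default or {}
    return json.loads(raw)

os.environ["ERROR_SANITIZATION_MAP"] = json.dumps({"gs://my-bucket/": ""})
print(load_replacers())  # {'gs://my-bucket/': ''}
```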

Comment on lines 39 to 46
if destination_table:
status_dict = {
'res_id': ckan_conf.get('resource_id'),
'state': 'complete',
'message': "Ingession Completed"
}
aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
return {'success': True, 'message': 'BigQuery Table created successfully.'}

🛠️ Refactor suggestion

Simplify redundant condition and fix typo.

The condition if destination_table: is always true since get_table() would raise an exception if the table doesn't exist. Also, there's a typo in the message.

-        if destination_table:
-            status_dict = {
-                'res_id': ckan_conf.get('resource_id'),
-                'state': 'complete',
-                'message': "Ingession Completed"
-            }
-            aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
-            return {'success': True, 'message': 'BigQuery Table created successfully.'}
+        # Update status to complete
+        status_dict = {
+            'res_id': ckan_conf.get('resource_id'),
+            'state': 'complete',
+            'message': "Ingestion Completed"
+        }
+        aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
+        return {'success': True, 'message': 'BigQuery Table created successfully.'}
🤖 Prompt for AI Agents
In aircan/dependencies/google_cloud/bigquery_handler_v2.py around lines 39 to
46, remove the redundant if condition checking destination_table since
get_table() raises an exception if the table doesn't exist, making the check
unnecessary. Also, fix the typo in the message from "Ingession Completed" to
"Ingestion Completed".

Comment on lines 33 to 38
status_dict = {
'res_id': ckan_conf.get('resource_id'),
'state': 'progress',
'message': 'Data ingestion is in progress.'
}
aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)

⚠️ Potential issue

Fix incorrect status update timing.

The status update logic is incorrect - it sends "progress" status after the job has already completed successfully. This should be sent before starting the job.

+        # Update status before starting ingestion
+        status_dict = {
+            'res_id': ckan_conf.get('resource_id'),
+            'state': 'progress',
+            'message': 'Data ingestion is in progress.'
+        }
+        aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
+        
         load_job = client.load_table_from_uri(
             gcs_path, table_id, job_config=job_config
         )

         load_job.result()  # Waits for table load to complete.
         destination_table = client.get_table(table_id)
-        status_dict = {
-            'res_id': ckan_conf.get('resource_id'),
-            'state': 'progress',
-            'message': 'Data ingestion is in progress.'
-        }
-        aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
🤖 Prompt for AI Agents
In aircan/dependencies/google_cloud/bigquery_handler_v2.py around lines 33 to
38, the status update indicating "progress" is sent after the job completes,
which is incorrect. Move the status update call with 'state': 'progress' to
before the job starts to correctly reflect the ingestion progress status.
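The ordering the review asks for — "progress" before the blocking load, "complete" only after it succeeds — can be sketched with a stand-in status reporter. Nothing here is the project's real API; the recorder just captures call order:

```python
updates = []

def status_update(state, message):
    """Stand-in for aircan_status_update; records states in call order."""
    updates.append(state)

def run_load(load_job):
    # Report progress BEFORE the blocking call, so CKAN reflects the
    # in-flight state while the load is still running.
    status_update("progress", "Data ingestion is in progress.")
    load_job()  # stand-in for load_job.result(), which blocks until done
    status_update("complete", "Ingestion Completed")

run_load(lambda: None)
print(updates)  # ['progress', 'complete']
```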

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (2)
aircan/dags/api_ckan_import_to_bq_v2.py (2)

23-41: Parameter structure still has inconsistencies.

The default parameters don't include required keys that the task function expects to access, specifically gcs_uri and bq_table_name in the big_query section.

        "big_query": {
            "bq_project_id": "bigquery_project_id",
-            "bq_dataset_id": "bigquery_dataset_id"
+            "bq_dataset_id": "bigquery_dataset_id",
+            "gcs_uri": "gs://bucket/path/to/file.csv",
+            "bq_table_name": "table_name"
        },

50-67: Add parameter validation and improve error handling.

The function accesses nested dictionary keys without validation and has unsafe JSON parsing that could cause runtime failures.

 def task_import_resource_to_bq(**context):
     ckan_api_key = context['params'].get('ckan_config', {}).get('api_key')
     ckan_site_url = context['params'].get('ckan_config', {}).get('site_url')
     logging.info('Invoking import resource to bigquery')
-    logging.info("resource: {}".format(context['params'].get('resource', {})))
+    
+    # Validate required parameters
+    params = context.get('params', {})
+    big_query_params = params.get('big_query', {})
+    resource_params = params.get('resource', {})
+    
+    required_bq_params = ['gcs_uri', 'bq_project_id', 'bq_dataset_id', 'bq_table_name']
+    for param in required_bq_params:
+        if not big_query_params.get(param):
+            raise ValueError(f"Missing required parameter: big_query.{param}")

-    gc_file_url = context['params'].get('big_query', {}).get('gcs_uri')
-    bq_project_id = context['params'].get('big_query', {}).get('bq_project_id')
-    bq_dataset_id = context['params'].get('big_query', {}).get('bq_dataset_id')
-    bq_table_name = context['params'].get('big_query', {}).get('bq_table_name')
+    gc_file_url = big_query_params['gcs_uri']
+    bq_project_id = big_query_params['bq_project_id']
+    bq_dataset_id = big_query_params['bq_dataset_id']
+    bq_table_name = big_query_params['bq_table_name']
     logging.info("bq_table_name: {}".format(bq_table_name))
     
-    raw_schema = context['params'].get('resource', {}).get('schema')
-    eval_schema = json.loads(raw_schema)
-    if isinstance(eval_schema, str):
-        eval_schema = ast.literal_eval(eval_schema)
-    schema = eval_schema.get('fields')
+    raw_schema = resource_params.get('schema', {})
+    try:
+        if isinstance(raw_schema, str):
+            eval_schema = json.loads(raw_schema)
+        else:
+            eval_schema = raw_schema
+        if isinstance(eval_schema, str):
+            eval_schema = ast.literal_eval(eval_schema)
+        schema = eval_schema.get('fields', [])
+    except (json.JSONDecodeError, ValueError, SyntaxError) as e:
+        raise ValueError(f"Invalid schema format: {e}")
     logging.info("SCHEMA: {}".format(schema))
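The double decode in the diff above exists because the schema can arrive as a JSON string that itself wraps a stringified dict. A standalone sketch of that tolerant parse (function name is mine, not the DAG's):

```python
import ast
import json

def parse_schema_fields(raw_schema):
    """Return the 'fields' list from a schema that may be a dict, a JSON
    string, or a JSON string whose payload is a Python-literal string."""
    if isinstance(raw_schema, str):
        raw_schema = json.loads(raw_schema)
    if isinstance(raw_schema, str):  # double-encoded case
        raw_schema = ast.literal_eval(raw_schema)
    return raw_schema.get("fields", [])

# All three encodings yield the same fields list:
assert parse_schema_fields({"fields": [{"name": "id"}]}) == [{"name": "id"}]
assert parse_schema_fields('{"fields": [{"name": "id"}]}') == [{"name": "id"}]
assert parse_schema_fields(json.dumps("{'fields': [{'name': 'id'}]}")) == [{"name": "id"}]
```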
🧹 Nitpick comments (2)
aircan/dags/api_ckan_import_to_bq_v2.py (2)

3-3: Remove unused imports.

Several imports are not used in the code and should be removed to improve code cleanliness.

-import time
-from datetime import date, datetime
+from datetime import date
-from airflow.exceptions import AirflowException
-from airflow.models import Variable

Also applies to: 6-6, 14-14, 16-16


98-103: Remove the deprecated provide_context parameter.

The provide_context=True parameter is deprecated in newer versions of Airflow. Modern Airflow automatically provides context to callable functions.

import_resource_to_bq_task = PythonOperator(
    task_id='import_resource_to_bq_v2',
-    provide_context=True,
    python_callable=task_import_resource_to_bq,
    dag=dag,
)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4358c47 and 58bfb30.

📒 Files selected for processing (1)
  • aircan/dags/api_ckan_import_to_bq_v2.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.12.2)
aircan/dags/api_ckan_import_to_bq_v2.py

3-3: time imported but unused

Remove unused import: time

(F401)


6-6: datetime.datetime imported but unused

Remove unused import: datetime.datetime

(F401)


14-14: airflow.exceptions.AirflowException imported but unused

Remove unused import: airflow.exceptions.AirflowException

(F401)


16-16: airflow.models.Variable imported but unused

Remove unused import: airflow.models.Variable

(F401)


96-96: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🪛 Flake8 (7.2.0)
aircan/dags/api_ckan_import_to_bq_v2.py

[error] 3-3: 'time' imported but unused

(F401)


[error] 6-6: 'datetime.datetime' imported but unused

(F401)


[error] 14-14: 'airflow.exceptions.AirflowException' imported but unused

(F401)


[error] 16-16: 'airflow.models.Variable' imported but unused

(F401)


[error] 79-79: continuation line missing indentation or outdented

(E122)


[error] 80-80: continuation line missing indentation or outdented

(E122)


[error] 81-81: continuation line missing indentation or outdented

(E122)


[error] 82-82: continuation line missing indentation or outdented

(E122)


[error] 84-84: continuation line missing indentation or outdented

(E122)


[error] 89-89: continuation line missing indentation or outdented

(E122)


[error] 90-90: continuation line missing indentation or outdented

(E122)


[error] 91-91: continuation line missing indentation or outdented

(E122)


[error] 92-92: continuation line missing indentation or outdented

(E122)


[error] 93-93: continuation line missing indentation or outdented

(E122)

Comment on lines 78 to 85
status_dict = {
'dag_run_id': dag_run_id,
'resource_id': res_id,
'state': 'complete',
'message': 'Data ingestion completed successfully for "{res_id}".'.format(
res_id=res_id),
'clear_logs': True
}

⚠️ Potential issue

Fix indentation issues in status dictionaries.

The status dictionaries have incorrect indentation that will cause syntax errors.

        status_dict = {
-        'dag_run_id': dag_run_id,
-        'resource_id': res_id,
-        'state': 'complete',
-        'message': 'Data ingestion completed successfully for "{res_id}".'.format(
-                    res_id=res_id),
-        'clear_logs': True
+            'dag_run_id': dag_run_id,
+            'resource_id': res_id,
+            'state': 'complete',
+            'message': 'Data ingestion completed successfully for "{res_id}".'.format(
+                        res_id=res_id),
+            'clear_logs': True
         }
        status_dict = {
-        'dag_run_id': dag_run_id,
-        'resource_id': res_id,
-        'state': 'failed',
-        'message': str(e),
-        'clear_logs': True
+            'dag_run_id': dag_run_id,
+            'resource_id': res_id,
+            'state': 'failed',
+            'message': str(e),
+            'clear_logs': True
         }

Also applies to: 88-95

🧰 Tools
🪛 Flake8 (7.2.0)

[error] 79-79: continuation line missing indentation or outdented

(E122)


[error] 80-80: continuation line missing indentation or outdented

(E122)


[error] 81-81: continuation line missing indentation or outdented

(E122)


[error] 82-82: continuation line missing indentation or outdented

(E122)


[error] 84-84: continuation line missing indentation or outdented

(E122)

🤖 Prompt for AI Agents
In aircan/dags/api_ckan_import_to_bq_v2.py around lines 78 to 85 and 88 to 95,
the status dictionaries have incorrect indentation causing syntax errors. Adjust
the indentation so that all key-value pairs inside the dictionaries are aligned
properly under the dictionary variable assignment, ensuring consistent use of
spaces for indentation to fix the syntax issues.

'clear_logs': True
}
aircan_status_update(ckan_site_url, ckan_api_key, status_dict)
raise Exception(str(e))

🛠️ Refactor suggestion

Improve exception handling to preserve original context.

The current exception handling loses the original exception context. Use proper exception chaining.

-        raise Exception(str(e))
+        raise Exception(str(e)) from e
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
raise Exception(str(e))
raise Exception(str(e)) from e
🧰 Tools
🪛 Ruff (0.12.2)

96-96: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🤖 Prompt for AI Agents
In aircan/dags/api_ckan_import_to_bq_v2.py at line 96, the current exception
handling raises a new Exception with the string of the caught exception, which
loses the original traceback context. To fix this, use the "raise ... from e"
syntax to chain the new exception to the original one, preserving the full
exception context for better debugging.
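The effect of `raise ... from e` is easy to demonstrate in isolation: the original exception survives as `__cause__` on the re-raised one, so the full traceback chain is preserved. A small self-contained sketch (exception messages illustrative):

```python
# Demonstrates what `raise ... from e` preserves: the original exception
# is attached as __cause__ of the new one instead of being lost.
def ingest():
    try:
        raise ValueError("bad CSV row")
    except ValueError as e:
        raise RuntimeError("Data ingestion has failed.") from e

try:
    ingest()
except RuntimeError as err:
    # The chained cause is the original ValueError.
    assert isinstance(err.__cause__, ValueError)
    assert str(err.__cause__) == "bad CSV row"
```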

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (4)
aircan/dependencies/google_cloud/bigquery_handler_v2.py (3)

48-62: Fix status update timing and typo.

The status update logic is incorrect - it sends "progress" status after the job has already completed successfully. This should be sent before starting the job. Also, there's a typo in "Ingession".

Apply this diff to fix the timing and typo:

+        # Update status before starting ingestion
+        status_dict = {
+            'res_id': ckan_conf.get('resource_id'),
+            'state': 'progress',
+            'message': 'Data ingestion is in progress.',
+            'dag_run_id': ckan_conf.get('dag_run_id')
+        }
+        aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
+        
         load_job.result()  # Waits for table load to complete.
         destination_table = client.get_table(table_id)
-        status_dict = {
-            'res_id': ckan_conf.get('resource_id'),
-            'state': 'progress',
-            'message': 'Data ingestion is in progress.',
-            'dag_run_id': ckan_conf.get('dag_run_id')
-        }
-        aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
         if destination_table:
             status_dict = {
                 'res_id': ckan_conf.get('resource_id'),
                 'state': 'complete',
-                'message': "Ingession Completed",
+                'message': "Ingestion Completed",
-            'dag_run_id': ckan_conf.get('dag_run_id')
+                'dag_run_id': ckan_conf.get('dag_run_id')
             }
             aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
             return {'success': True, 'message': 'BigQuery Table created successfully.'}

85-85: Fix exception handling to preserve original context.

The current exception handling loses the original exception context. Use proper exception chaining.

Apply this diff to preserve the exception context:

-        raise AirflowCKANException('Data ingestion has failed.', str(e))
+        raise AirflowCKANException('Data ingestion has failed.', str(e)) from e

65-75: Make error sanitization configurable.

The hardcoded bucket names and project IDs make this code environment-specific and brittle. Consider making this configurable.

Apply this diff to make the sanitization configurable:

-        replacers = {
-            'gs://dx-nhs-staging-giftless/': '',
-            'gs://dx-nhs-production-giftless/': '',
-            'gs://dx-nhs-prod-giftless/': '',
-            'https://bigquery.googleapis.com/bigquery/v2/projects/datopian-dx/jobs?prettyPrint=false': '',
-            'datopian-dx': '',
-            'bigquery': '',
-            'googleapi': '',
-            'google': ''
-
-        }
+        # Configurable sanitization - could be passed as parameter or from config
+        replacers = ckan_conf.get('error_sanitization', {
+            'gs://dx-nhs-staging-giftless/': '',
+            'gs://dx-nhs-production-giftless/': '',
+            'gs://dx-nhs-prod-giftless/': '',
+            'https://bigquery.googleapis.com/bigquery/v2/projects/datopian-dx/jobs?prettyPrint=false': '',
+            'datopian-dx': '',
+            'bigquery': '',
+            'googleapi': '',
+            'google': ''
+        })
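The sanitization itself is a simple sequential substring replacement. A self-contained sketch of a `replace_all`-style helper driven by a configurable replacers dict (the bucket and project values below are illustrative placeholders, not the real deployment names):

```python
def replace_all(replacers, text):
    """Strip configured sensitive substrings from an error message."""
    for needle, replacement in replacers.items():
        text = text.replace(needle, replacement)
    return text

# Replacers supplied by configuration rather than hardcoded in the handler.
replacers = {
    'gs://example-bucket/': '',
    'my-gcp-project': '<project>',
}
msg = "Load failed for gs://example-bucket/data.csv in my-gcp-project"
assert replace_all(replacers, msg) == "Load failed for data.csv in <project>"
```

Note that replacement order matters when one needle is a substring of another, so longer, more specific patterns should come first in the config.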
aircan/dags/api_ckan_import_to_bq_v2.py (1)

78-85: Fix indentation issues in status dictionary.

The status dictionary has incorrect indentation that will cause syntax errors.

Apply this diff to fix the indentation:

     status_dict = {
-    'dag_run_id': dag_run_id,
-    'resource_id': res_id,
-    'state': 'complete',
-    'message': 'Data ingestion completed successfully for "{res_id}".'.format(
-                res_id=res_id),
-    'clear_logs': True
+        'dag_run_id': dag_run_id,
+        'resource_id': res_id,
+        'state': 'complete',
+        'message': 'Data ingestion completed successfully for "{res_id}".'.format(
+                    res_id=res_id),
+        'clear_logs': True
     }
🧹 Nitpick comments (3)
aircan/dependencies/google_cloud/bigquery_handler_v2.py (2)

2-4: Remove unused imports.

The imports google.api_core.exceptions and json are not used in the code.

Apply this diff to remove the unused imports:

 from google.cloud import bigquery
-import google.api_core.exceptions
 from aircan.dependencies.utils import AirflowCKANException, aircan_status_update_nhs as aircan_status_update
-import json
 import logging

34-34: Remove unused variable assignment.

The variable e is assigned in the exception block but never used.

Apply this diff to remove the unused assignment:

-        except Exception as e:
+        except Exception:
aircan/dags/api_ckan_import_to_bq_v2.py (1)

3-3: Remove unused imports.

Several imports are not used in the code and should be removed to keep the code clean.

Apply this diff to remove the unused imports:

 import logging
-import time
 import json
 import ast
-from datetime import date, datetime
+from datetime import date

 # Local imports
 from aircan.dependencies.google_cloud.bigquery_handler_v2 import bq_import_csv
 from aircan.dependencies.utils import aircan_status_update_nhs as aircan_status_update

 # Third-party library imports
 from airflow import DAG
-from airflow.exceptions import AirflowException

-from airflow.models import Variable
 from airflow.operators.python_operator import PythonOperator
 from airflow.utils.dates import days_ago
-import traceback

Also applies to: 6-6, 14-14, 16-16, 19-19

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 58bfb30 and 3a2c848.

📒 Files selected for processing (3)
  • aircan/dags/api_ckan_import_to_bq_v2.py (1 hunks)
  • aircan/dependencies/google_cloud/bigquery_handler_v2.py (1 hunks)
  • aircan/dependencies/utils.py (2 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
aircan/dependencies/google_cloud/bigquery_handler_v2.py (1)
aircan/dependencies/utils.py (1)
  • aircan_status_update_nhs (75-109)
🪛 Ruff (0.12.2)
aircan/dependencies/utils.py

102-102: Local variable resource_json is assigned to but never used

Remove assignment to unused variable resource_json

(F841)

aircan/dependencies/google_cloud/bigquery_handler_v2.py

2-2: google.api_core.exceptions imported but unused

Remove unused import: google.api_core.exceptions

(F401)


4-4: json imported but unused

Remove unused import: json

(F401)


34-34: Local variable e is assigned to but never used

Remove assignment to unused variable e

(F841)


85-85: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

aircan/dags/api_ckan_import_to_bq_v2.py

3-3: time imported but unused

Remove unused import: time

(F401)


6-6: datetime.datetime imported but unused

Remove unused import: datetime.datetime

(F401)


14-14: airflow.exceptions.AirflowException imported but unused

Remove unused import: airflow.exceptions.AirflowException

(F401)


16-16: airflow.models.Variable imported but unused

Remove unused import: airflow.models.Variable

(F401)


19-19: traceback imported but unused

Remove unused import: traceback

(F401)

🪛 Flake8 (7.2.0)
aircan/dependencies/utils.py

[error] 97-97: continuation line under-indented for visual indent

(E128)


[error] 98-98: continuation line under-indented for visual indent

(E128)


[error] 99-99: continuation line under-indented for visual indent

(E128)


[error] 102-102: local variable 'resource_json' is assigned to but never used

(F841)

aircan/dependencies/google_cloud/bigquery_handler_v2.py

[error] 2-2: 'google.api_core.exceptions' imported but unused

(F401)


[error] 4-4: 'json' imported but unused

(F401)


[error] 34-34: local variable 'e' is assigned to but never used

(F841)


[error] 60-60: continuation line missing indentation or outdented

(E122)

aircan/dags/api_ckan_import_to_bq_v2.py

[error] 3-3: 'time' imported but unused

(F401)


[error] 6-6: 'datetime.datetime' imported but unused

(F401)


[error] 14-14: 'airflow.exceptions.AirflowException' imported but unused

(F401)


[error] 16-16: 'airflow.models.Variable' imported but unused

(F401)


[error] 19-19: 'traceback' imported but unused

(F401)


[error] 79-79: continuation line missing indentation or outdented

(E122)


[error] 80-80: continuation line missing indentation or outdented

(E122)


[error] 81-81: continuation line missing indentation or outdented

(E122)


[error] 82-82: continuation line missing indentation or outdented

(E122)


[error] 84-84: continuation line missing indentation or outdented

(E122)

🔇 Additional comments (1)
aircan/dags/api_ckan_import_to_bq_v2.py (1)

50-87: LGTM! Good implementation of the BigQuery import task.

The function correctly extracts parameters, handles schema parsing, constructs the BigQuery table ID, and calls the import function. The addition of dag_run_id support and proper status updates enhance the functionality.

Comment on lines +75 to +109
def aircan_status_update_nhs (site_url, ckan_api_key, status_dict):
"""
Update aircan run status like pending, error, process, complete
on ckan with message.
"""
logging.info('Updating data loading status')
try:
request_data = {
'dag_run_id': status_dict.get('dag_run_id', ''),
'resource_id': status_dict.get('res_id', ''),
'state': status_dict.get('state', ''),
'last_updated': str(datetime.utcnow()),
'message': status_dict.get('message', ''),
}

if status_dict.get('error', False):
request_data.update({'error': {
'message' : status_dict.get('error', '')
}})

url = urljoin(site_url, '/api/3/action/aircan_status_update')
response = requests.post(url,
data=json.dumps(request_data),
headers={'Content-Type': 'application/json',
'Authorization': ckan_api_key})
print(response.text)
if response.status_code == 200:
resource_json = response.json()
logging.info('Loading status updated successfully in CKAN.')
return {'success': True}
else:
print(response.json())
return response.json()
except Exception as e:
logging.error('Failed to update status in CKAN. {0}'.format(e))

⚠️ Potential issue

Fix unused variable, indentation, and consistency issues.

The new function has several issues that need to be addressed:

  1. Unused variable - resource_json is assigned but never used
  2. Indentation issues - continuation lines are not properly indented
  3. Inconsistent logging - uses print() statements while the existing aircan_status_update function doesn't
  4. Missing return statement - exception handler doesn't return anything

Apply this diff to fix the issues:

def aircan_status_update_nhs (site_url, ckan_api_key, status_dict):
    """
    Update aircan run status like pending, error, process, complete 
    on ckan with message.
    """
    logging.info('Updating data loading status')
    try:
        request_data = { 
            'dag_run_id': status_dict.get('dag_run_id', ''),
            'resource_id': status_dict.get('res_id', ''),
            'state': status_dict.get('state', ''),
            'last_updated': str(datetime.utcnow()),
            'message': status_dict.get('message', ''),
        }

        if status_dict.get('error', False):
            request_data.update({'error': {
                'message' : status_dict.get('error', '')
            }})

        url = urljoin(site_url, '/api/3/action/aircan_status_update')
        response = requests.post(url,
-                        data=json.dumps(request_data),
-                        headers={'Content-Type': 'application/json',
-                                'Authorization': ckan_api_key})
-        print(response.text)
+                         data=json.dumps(request_data),
+                         headers={'Content-Type': 'application/json',
+                                  'Authorization': ckan_api_key})
        if response.status_code == 200:
-            resource_json = response.json()
            logging.info('Loading status updated successfully in CKAN.')
            return {'success': True}
        else:
-            print(response.json())
            return response.json()
    except Exception as e:
        logging.error('Failed to update status in CKAN. {0}'.format(e))
+        return {'success': False, 'error': str(e)}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def aircan_status_update_nhs (site_url, ckan_api_key, status_dict):
"""
Update aircan run status like pending, error, process, complete
on ckan with message.
"""
logging.info('Updating data loading status')
try:
request_data = {
'dag_run_id': status_dict.get('dag_run_id', ''),
'resource_id': status_dict.get('res_id', ''),
'state': status_dict.get('state', ''),
'last_updated': str(datetime.utcnow()),
'message': status_dict.get('message', ''),
}
if status_dict.get('error', False):
request_data.update({'error': {
'message' : status_dict.get('error', '')
}})
url = urljoin(site_url, '/api/3/action/aircan_status_update')
response = requests.post(url,
data=json.dumps(request_data),
headers={'Content-Type': 'application/json',
'Authorization': ckan_api_key})
print(response.text)
if response.status_code == 200:
resource_json = response.json()
logging.info('Loading status updated successfully in CKAN.')
return {'success': True}
else:
print(response.json())
return response.json()
except Exception as e:
logging.error('Failed to update status in CKAN. {0}'.format(e))
def aircan_status_update_nhs (site_url, ckan_api_key, status_dict):
"""
Update aircan run status like pending, error, process, complete
on ckan with message.
"""
logging.info('Updating data loading status')
try:
request_data = {
'dag_run_id': status_dict.get('dag_run_id', ''),
'resource_id': status_dict.get('res_id', ''),
'state': status_dict.get('state', ''),
'last_updated': str(datetime.utcnow()),
'message': status_dict.get('message', ''),
}
if status_dict.get('error', False):
request_data.update({'error': {
'message': status_dict.get('error', '')
}})
url = urljoin(site_url, '/api/3/action/aircan_status_update')
response = requests.post(
url,
data=json.dumps(request_data),
headers={
'Content-Type': 'application/json',
'Authorization': ckan_api_key
}
)
if response.status_code == 200:
logging.info('Loading status updated successfully in CKAN.')
return {'success': True}
else:
return response.json()
except Exception as e:
logging.error('Failed to update status in CKAN. {0}'.format(e))
return {'success': False, 'error': str(e)}
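The payload construction in the fixed function can be factored out and unit-tested without any HTTP call. A sketch (the helper name is hypothetical; the key names mirror the function above):

```python
from datetime import datetime

def build_status_payload(status_dict):
    """Construct the JSON body sent to CKAN's aircan_status_update action."""
    payload = {
        'dag_run_id': status_dict.get('dag_run_id', ''),
        'resource_id': status_dict.get('res_id', ''),
        'state': status_dict.get('state', ''),
        'last_updated': str(datetime.utcnow()),
        'message': status_dict.get('message', ''),
    }
    # Only attach the error sub-object when an error message is present.
    if status_dict.get('error'):
        payload['error'] = {'message': status_dict['error']}
    return payload

payload = build_status_payload({'res_id': 'abc', 'state': 'failed', 'error': 'boom'})
assert payload['resource_id'] == 'abc'
assert payload['error'] == {'message': 'boom'}
```

Separating payload building from the `requests.post` call also makes the indentation and unused-variable issues above much less likely to recur.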
🧰 Tools
🪛 Ruff (0.12.2)

102-102: Local variable resource_json is assigned to but never used

Remove assignment to unused variable resource_json

(F841)

🪛 Flake8 (7.2.0)

[error] 97-97: continuation line under-indented for visual indent

(E128)


[error] 98-98: continuation line under-indented for visual indent

(E128)


[error] 99-99: continuation line under-indented for visual indent

(E128)


[error] 102-102: local variable 'resource_json' is assigned to but never used

(F841)

🤖 Prompt for AI Agents
In aircan/dependencies/utils.py from lines 75 to 109, fix the
aircan_status_update_nhs function by removing the unused variable resource_json,
replacing print statements with logging calls for consistency, correcting the
indentation of continuation lines to align properly, and adding a return
statement in the exception handler to return a failure indication or error
details.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (12)
aircan/dependencies/google_cloud/bigquery_handler_v2.py (7)

7-10: Avoid shadowing built-in types with parameter names.

The parameter name dict shadows the built-in dict type, which is poor practice and can lead to confusion.

-def replace_all(dict, string):
-    for key in dict:
-        string = string.replace(key, dict[key])
+def replace_all(replacers, string):
+    for key in replacers:
+        string = string.replace(key, replacers[key])
     return string

2-2: Remove unused import.

The google.api_core.exceptions import is not used in this module.

-import google.api_core.exceptions

4-4: Remove unused import.

The json import is not used in this module.

-import json

68-86: Remove commented-out code block.

This large block of commented code should be removed to improve code cleanliness and maintainability.

-            #status_dict = {
-            #    'res_id': ckan_conf.get('resource_id'),
-            #    'state': 'progress',
-            #    'message': 'Data ingestion using provided schema failed, trying to autodetect schema.',
-            #    'dag_run_id': ckan_conf.get('dag_run_id')
-            #}
-            #aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
-            #job_config = bigquery.LoadJobConfig()
-            #job_config.autodetect = True
-
-            #job_config.skip_leading_rows = 1
-            #job_config.source_format = bigquery.SourceFormat.CSV
-            ## overwrite a Table
-            #job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
-            #load_job = client.load_table_from_uri(
-            #    gcs_path, table_id, job_config=job_config
-            #)
-            #load_job.result()  # Waits for table load to complete.
-            #destination_table = client.get_table(table_id)

99-99: Fix indentation issue.

The line continuation has incorrect indentation.

-            'dag_run_id': ckan_conf.get('dag_run_id')
+            'dag_run_id': ckan_conf.get('dag_run_id')

147-147: Fix character encoding issue in comment.

The comment contains a non-ASCII character that should be removed.

-        # Â TODO: support for e.g. required
+        # TODO: support for e.g. required

147-147: Consider implementing required field support.

The TODO comment indicates missing support for required fields. Currently all fields are set to NULLABLE mode.

Would you like me to implement support for required fields based on the schema definition?
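One possible shape for that support, sketched with plain dicts rather than `bigquery.SchemaField` to stay self-contained. It assumes each CKAN field dict may carry a boolean `required` key — an assumption, since the schema contract is not shown in this PR:

```python
# Sketch: map CKAN field descriptors to BigQuery-style field specs,
# defaulting to NULLABLE and honoring a hypothetical 'required' flag.
TYPE_MAP = {
    'integer': 'INT64',
    'number': 'FLOAT64',
    'string': 'STRING',
}

def map_field(field):
    return {
        'name': field['name'],
        'type': TYPE_MAP.get(field.get('type', 'string'), 'STRING'),
        'mode': 'REQUIRED' if field.get('required') else 'NULLABLE',
    }

mapped = [map_field(f) for f in [
    {'name': 'id', 'type': 'integer', 'required': True},
    {'name': 'note', 'type': 'string'},
]]
assert mapped[0]['mode'] == 'REQUIRED'
assert mapped[1] == {'name': 'note', 'type': 'STRING', 'mode': 'NULLABLE'}
```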

aircan/dags/api_ckan_import_to_bq.py (2)

19-19: Remove unused import.

The traceback import is not used anywhere in the code.

-import traceback

78-85: Fix indentation issues in status dictionary.

The status dictionary has inconsistent indentation that should be corrected for better readability.

-    status_dict = {
-    'dag_run_id': dag_run_id,
-    'resource_id': res_id,
-    'state': 'complete',
-    'message': 'Data ingestion completed successfully for "{res_id}".'.format(
-                res_id=res_id),
-    'clear_logs': True
-    }
+    status_dict = {
+        'dag_run_id': dag_run_id,
+        'resource_id': res_id,
+        'state': 'complete',
+        'message': 'Data ingestion completed successfully for "{res_id}".'.format(
+            res_id=res_id),
+        'clear_logs': True
+    }
aircan/dags/api_ckan_import_to_bq.bkp.py (3)

3-3: Remove unused imports.

Several imports are not used in this backup file: time, datetime.datetime, AirflowException, and Variable.

-import time

6-6: Remove unused datetime import.

The datetime import from datetime module is not used.

-from datetime import date, datetime
+from datetime import date

13-15: Remove unused Airflow imports.

Both AirflowException and Variable imports are unused in this file.

-from airflow.exceptions import AirflowException
-
-from airflow.models import Variable
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3a2c848 and e9f14b7.

📒 Files selected for processing (3)
  • aircan/dags/api_ckan_import_to_bq.bkp.py (1 hunks)
  • aircan/dags/api_ckan_import_to_bq.py (5 hunks)
  • aircan/dependencies/google_cloud/bigquery_handler_v2.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
aircan/dags/api_ckan_import_to_bq.bkp.py (2)
aircan/dependencies/google_cloud/bigquery_handler_v2.py (1)
  • bq_import_csv (12-124)
aircan/dags/api_ckan_import_to_bq.py (1)
  • task_import_resource_to_bq (50-86)
🪛 Ruff (0.12.2)
aircan/dags/api_ckan_import_to_bq.py

19-19: traceback imported but unused

Remove unused import: traceback

(F401)

aircan/dags/api_ckan_import_to_bq.bkp.py

3-3: time imported but unused

Remove unused import: time

(F401)


6-6: datetime.datetime imported but unused

Remove unused import: datetime.datetime

(F401)


13-13: airflow.exceptions.AirflowException imported but unused

Remove unused import: airflow.exceptions.AirflowException

(F401)


15-15: airflow.models.Variable imported but unused

Remove unused import: airflow.models.Variable

(F401)

aircan/dependencies/google_cloud/bigquery_handler_v2.py

2-2: google.api_core.exceptions imported but unused

Remove unused import: google.api_core.exceptions

(F401)


4-4: json imported but unused

Remove unused import: json

(F401)


67-67: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


124-124: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🪛 Flake8 (7.2.0)
aircan/dags/api_ckan_import_to_bq.py

[error] 19-19: 'traceback' imported but unused

(F401)


[error] 79-79: continuation line missing indentation or outdented

(E122)


[error] 80-80: continuation line missing indentation or outdented

(E122)


[error] 81-81: continuation line missing indentation or outdented

(E122)


[error] 82-82: continuation line missing indentation or outdented

(E122)


[error] 84-84: continuation line missing indentation or outdented

(E122)

aircan/dags/api_ckan_import_to_bq.bkp.py

[error] 3-3: 'time' imported but unused

(F401)


[error] 6-6: 'datetime.datetime' imported but unused

(F401)


[error] 13-13: 'airflow.exceptions.AirflowException' imported but unused

(F401)


[error] 15-15: 'airflow.models.Variable' imported but unused

(F401)

aircan/dependencies/google_cloud/bigquery_handler_v2.py

[error] 2-2: 'google.api_core.exceptions' imported but unused

(F401)


[error] 4-4: 'json' imported but unused

(F401)


[error] 68-68: block comment should start with '# '

(E265)


[error] 73-73: block comment should start with '# '

(E265)


[error] 74-74: block comment should start with '# '

(E265)


[error] 75-75: block comment should start with '# '

(E265)


[error] 76-76: block comment should start with '# '

(E265)


[error] 78-78: block comment should start with '# '

(E265)


[error] 79-79: block comment should start with '# '

(E265)


[error] 81-81: block comment should start with '# '

(E265)


[error] 82-82: block comment should start with '# '

(E265)


[error] 84-84: block comment should start with '# '

(E265)


[error] 85-85: block comment should start with '# '

(E265)


[error] 86-86: block comment should start with '# '

(E265)


[error] 99-99: continuation line missing indentation or outdented

(E122)

🔇 Additional comments (3)
aircan/dags/api_ckan_import_to_bq.py (2)

64-65: Good defensive programming with conditional schema parsing.

The addition of the isinstance check before calling ast.literal_eval prevents errors when the schema is already parsed as a dict rather than a string.


74-76: Well-structured DAG run ID handling.

The extraction and inclusion of dag_run_id in the CKAN configuration enables better tracking and status reporting.

aircan/dags/api_ckan_import_to_bq.bkp.py (1)

9-9: Consider using the updated BigQuery handler.

This backup file imports from bigquery_handler while the main version uses bigquery_handler_v2, which includes enhanced error handling and status updates.

Verify if this backup file is still needed or if it should be updated to use the v2 handler for consistency.


raw_schema = context['params'].get('resource', {}).get('schema')
eval_schema = json.loads(raw_schema)
eval_schema = ast.literal_eval(eval_schema)

🛠️ Refactor suggestion

Potential schema parsing vulnerability.

The code directly calls ast.literal_eval() without checking if eval_schema is already a dict, which could cause errors if the schema is already parsed. The main version in api_ckan_import_to_bq.py handles this more robustly with an isinstance check.

Consider using the enhanced logic from the main DAG file:

 eval_schema = json.loads(raw_schema)
-eval_schema = ast.literal_eval(eval_schema)
+if isinstance(eval_schema, str):
+    eval_schema = ast.literal_eval(eval_schema)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
eval_schema = ast.literal_eval(eval_schema)
eval_schema = json.loads(raw_schema)
if isinstance(eval_schema, str):
eval_schema = ast.literal_eval(eval_schema)
🤖 Prompt for AI Agents
In aircan/dags/api_ckan_import_to_bq.bkp.py at line 61, the code calls
ast.literal_eval on eval_schema without checking its type, which can cause
errors if eval_schema is already a dictionary. To fix this, add an isinstance
check before calling ast.literal_eval: only parse eval_schema if it is a string,
otherwise leave it as is. This will make the schema parsing more robust and
prevent potential errors.

'dag_run_id': ckan_conf.get('dag_run_id')
}
aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
raise AirflowCKANException('Data ingestion has failed.', str(e))

🛠️ Refactor suggestion

Use proper exception chaining.

When re-raising exceptions within an except block, use raise ... from err to preserve the original exception context.

-            raise AirflowCKANException('Data ingestion has failed.', str(e))
+            raise AirflowCKANException('Data ingestion has failed.', str(e)) from e
🧰 Tools
🪛 Ruff (0.12.2)

67-67: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🤖 Prompt for AI Agents
In aircan/dependencies/google_cloud/bigquery_handler_v2.py at line 67, the
exception is re-raised without proper chaining, losing the original error
context. Modify the raise statement to use "raise AirflowCKANException('Data
ingestion has failed.', str(e)) from e" to preserve the original exception
context.
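The effect of `from e` is that the original error stays reachable on `__cause__`, which matters here because the BigQuery error is the real diagnostic. A small illustration with a stand-in exception class (AirflowCKANException below is a placeholder for the project's class, and the ValueError message is invented):

```python
class AirflowCKANException(Exception):
    """Stand-in for the project's exception type."""

def ingest():
    # Simulate a failure deep in the BigQuery load step.
    raise ValueError("column 'price' could not be parsed as FLOAT")

def run_with_chaining():
    try:
        ingest()
    except Exception as e:
        # 'from e' records the original error on __cause__ and marks
        # the chain as explicit, rather than the implicit __context__
        # that raise-without-from produces inside an except block.
        raise AirflowCKANException('Data ingestion has failed.', str(e)) from e

try:
    run_with_chaining()
except AirflowCKANException as exc:
    print(type(exc.__cause__).__name__)  # the original ValueError survives
    print(exc.__cause__)
```

With chaining, the Airflow task log shows both tracebacks joined by "The above exception was the direct cause of the following exception", instead of the misleading "During handling of the above exception, another exception occurred".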

'message': str(e)
}
aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
raise AirflowCKANException('Data ingestion has failed.', str(e))
🛠️ Refactor suggestion

Use proper exception chaining.

When re-raising exceptions within an except block, use raise ... from err to preserve the original exception context.

-        raise AirflowCKANException('Data ingestion has failed.', str(e))
+        raise AirflowCKANException('Data ingestion has failed.', str(e)) from e
🧰 Tools
🪛 Ruff (0.12.2)

124-124: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🤖 Prompt for AI Agents
In aircan/dependencies/google_cloud/bigquery_handler_v2.py at line 124, the
exception is re-raised without proper chaining, losing the original error
context. Modify the raise statement to use "raise AirflowCKANException('Data
ingestion has failed.', str(e)) from e" to preserve the original exception
context for better debugging.
