
airflow-providers-couchbase

A custom Apache Airflow provider for Couchbase

Overview

The airflow-providers-couchbase package enables interactions with Couchbase clusters within Apache Airflow workflows. It provides a custom CouchbaseHook that allows users to seamlessly interact with Couchbase databases, execute queries, manage documents, and more.

For those new to Apache Airflow, a DAG (Directed Acyclic Graph) represents a workflow as a collection of tasks with directional dependencies. Each task in our example DAGs performs specific operations with Couchbase, such as querying data or processing documents.
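
As a taste of what this looks like in a DAG, here is a minimal sketch of a task built around the hook. The hook's import path, constructor argument, and get_conn() method are assumptions based on common Airflow provider conventions rather than the package's confirmed API; the example DAGs described below show the real usage.

    from datetime import datetime

    from airflow.decorators import dag, task

    # NOTE: the import path, constructor argument, and get_conn() method
    # below are assumptions based on common Airflow provider conventions;
    # check the example DAGs in this repository for the actual API.
    from airflow_providers_couchbase.hooks.couchbase import CouchbaseHook


    @dag(schedule=None, start_date=datetime(2024, 1, 1), catchup=False)
    def couchbase_hook_sketch():
        @task
        def list_airline_ids():
            hook = CouchbaseHook(couchbase_conn_id="couchbase_conn_id")
            cluster = hook.get_conn()  # assumed to return a couchbase Cluster
            result = cluster.query(
                "SELECT META().id AS id FROM `travel-sample`.inventory.airline LIMIT 5"
            )
            return [row["id"] for row in result]

        list_airline_ids()


    couchbase_hook_sketch()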

Installation

pip install airflow-providers-couchbase

Running a DAG Example Inside an Airflow Docker Container

Prerequisites

  • Docker and Docker Compose installed on your machine
  • A running Couchbase cluster accessible from your Docker container
  • The travel-sample bucket available in your Couchbase cluster

Connecting to a Local Couchbase Server

If you're running Couchbase Server locally (either installed natively or in Docker), you'll need to make some adjustments to connect from the Airflow containers:

  1. For locally installed Couchbase Server:

    • Update the Couchbase connection host to use your machine's IP address instead of localhost
    • Example: couchbase://192.168.1.100 (replace with your actual IP)
  2. For Couchbase Server running in Docker:

    • Add a Couchbase service to your docker-compose.yaml:
    services:
      couchbase:
        image: couchbase/server:latest
        ports:
          - "8091-8096:8091-8096"
          - "11210-11211:11210-11211"
        networks:
          - airflow-couchbase
    
      # In the official Airflow docker-compose file this is the shared
      # x-airflow-common section; adding the network here attaches every
      # Airflow service to it.
      airflow-common:
        networks:
          - airflow-couchbase
          
    networks:
      airflow-couchbase:
        driver: bridge
    • Update the Couchbase connection host to use the service name: couchbase://couchbase

Remember to initialize your Couchbase Server (a quick sanity check is sketched after this list):

  • Create a bucket named "travel-sample"
  • Import the travel-sample dataset
  • Create a user with appropriate permissions
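
Once the server is initialized, a quick sanity check with the Couchbase Python SDK confirms the cluster is reachable and travel-sample is queryable before Airflow gets involved. This is a minimal sketch; the host and credentials are placeholders for your own values.

    from datetime import timedelta

    from couchbase.auth import PasswordAuthenticator
    from couchbase.cluster import Cluster
    from couchbase.options import ClusterOptions

    # Placeholders: substitute your own host and credentials.
    cluster = Cluster(
        "couchbase://localhost",
        ClusterOptions(PasswordAuthenticator("Administrator", "password")),
    )
    cluster.wait_until_ready(timedelta(seconds=10))

    # Confirm the travel-sample bucket is queryable.
    for row in cluster.query(
        "SELECT COUNT(*) AS airlines FROM `travel-sample`.inventory.airline"
    ):
        print(row)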

Steps to Run the Example DAG

  1. Navigate to the Docker Directory:

    cd docker
  2. Build and Run the Docker Containers:

    # Initialize the Airflow database and create the first user account
    docker compose up airflow-init
    
    # Start all services defined in docker-compose.yml
    docker compose up --build
  3. Access the Airflow Web UI:

    • Open http://localhost:8080 in your browser (the port published by the official Airflow docker-compose setup)
    • Log in with the account created during airflow-init (the official setup defaults to airflow / airflow)
  4. Configure a Couchbase Connection:

    • Go to Admin -> Connections
    • Click "+" to add a new connection
    • Fill in the details:
      • Connection Id: couchbase_conn_id
      • Connection Type: Couchbase
      • Host: your_couchbase_host
      • Login: Your Couchbase username
      • Password: Your Couchbase password
      • Extra: Additional configuration parameters in JSON format (a programmatic alternative to this whole step is sketched after these steps)
  5. Trigger the DAG:

    • Go to DAGs view
    • Find "airflow_test_couchbase_cluster" DAG
    • Click "Play" to trigger a manual run
  6. Monitor the Execution:

    • Click on the DAG run to view progress
    • View logs, duration, and status for each task
    • For a visual guide to this process, see the step-by-step tutorial in docs/videos/airflow
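
As an alternative to step 4, Airflow connections can also be defined programmatically and injected through an environment variable. A sketch, assuming this provider registers "couchbase" as the connection type:

    from airflow.models.connection import Connection

    # Assumption: this provider registers "couchbase" as the connection type.
    conn = Connection(
        conn_id="couchbase_conn_id",
        conn_type="couchbase",
        host="couchbase",  # docker-compose service name, or your machine's IP
        login="your_username",
        password="your_password",
    )

    # Airflow reads connections from AIRFLOW_CONN_<CONN_ID> environment
    # variables, so exporting this URI replaces the UI configuration step.
    print(f"export AIRFLOW_CONN_COUCHBASE_CONN_ID='{conn.get_uri()}'")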

Example DAGs Explanation

1. airflow_test_couchbase_cluster.py

  • Basic Couchbase interaction example
  • Connects using CouchbaseHook
  • Queries "airline" collection
  • Extracts document IDs
  • Converts data to CSV

2. airflow_test_couchbase_scope.py

  • Advanced example with direct scope/collection access
  • Retrieves document IDs and full documents
  • Includes error handling
  • Processes complete documents

Both DAGs follow an ETL pattern (sketched in code after this list):

  • Extract:

    • Connect to Couchbase using CouchbaseHook
    • Query the "airline" collection in "travel-sample" bucket
    • Retrieve document IDs and data (airflow_test_couchbase_scope.py retrieves full documents, airflow_test_couchbase_cluster.py retrieves IDs only)
  • Transform:

    • Convert Couchbase query results to pandas DataFrame
    • Save DataFrame to a temporary CSV file using tempfile.NamedTemporaryFile
  • Load:

    • Read the temporary CSV file to verify data integrity
    • Clean up by removing the temporary file
    • (Placeholder: In production, you would upload to S3 or similar storage)
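
Condensed into plain functions, the pattern looks roughly like this. The hook's API is assumed as in the earlier sketch; the pandas and tempfile handling mirrors what the DAGs describe.

    import os
    import tempfile

    import pandas as pd

    # Assumed hook API, as in the earlier sketch; see the example DAGs for
    # the package's actual import path and method names.
    from airflow_providers_couchbase.hooks.couchbase import CouchbaseHook


    def extract() -> list:
        hook = CouchbaseHook(couchbase_conn_id="couchbase_conn_id")
        cluster = hook.get_conn()  # assumed to return a couchbase Cluster
        result = cluster.query(
            "SELECT META().id AS id FROM `travel-sample`.inventory.airline"
        )
        return [row for row in result]


    def transform(rows: list) -> str:
        df = pd.DataFrame(rows)
        tmp = tempfile.NamedTemporaryFile(suffix=".csv", delete=False)
        df.to_csv(tmp.name, index=False)  # save to a temporary CSV file
        return tmp.name


    def load(path: str) -> None:
        df = pd.read_csv(path)  # re-read to verify data integrity
        print(f"loaded {len(df)} rows")
        os.remove(path)  # clean up; in production, upload to S3 or similar


    load(transform(extract()))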

Clean Up

docker compose down -v

License

airflow-providers-couchbase is distributed under the terms of the MIT license.
