70 commits
08c2c86
ADO-330 Basic Hbase provider implementation
Dimitrionian Dec 15, 2025
60cba34
ADO-330 Advanced Hbase provider implementation
Dimitrionian Dec 16, 2025
e43cd78
ADO-330 Change branch to cache dependencies
Dimitrionian Dec 16, 2025
c6da543
ADO-330 Build from the current branch
Dimitrionian Dec 16, 2025
a3164f4
ADO-330 Fix google-re2 version
Dimitrionian Dec 16, 2025
e35c1e4
ADO-330 Fix uv version
Dimitrionian Dec 16, 2025
df2a2a9
ADO-330 Add FERNET_KEY from env
Dimitrionian Dec 17, 2025
1f9822f
ADO-330 Fix params
Dimitrionian Dec 17, 2025
4df3e03
ADO-330 Create backup/restore, intermediate results
Dimitrionian Dec 17, 2025
6527d17
Merge pull request #1 from arenadata/feature/ado-330
Dimitrionian Dec 17, 2025
5fb9b15
ADO-330 Create backup/restore, different approach
Dimitrionian Dec 18, 2025
7e354e9
ADO-330 Use SSH to make backups
Dimitrionian Dec 19, 2025
8e68d67
ADO-330 Fix SSH backup logic, fix build problems
Dimitrionian Dec 19, 2025
3809919
ADO-330 Fix SSH full backup logic
Dimitrionian Dec 19, 2025
e00a236
ADO-330 Restore SSH full backup logic
Dimitrionian Dec 19, 2025
5a82441
ADO-330 Fix tests
Dimitrionian Dec 19, 2025
44f26a0
ADO-368 Update documentation
Dimitrionian Dec 19, 2025
269d5b4
Merge pull request #3 from arenadata/feature/ado-368
Dimitrionian Dec 19, 2025
a7c244b
ADO-336 Basic Kerberos implementation
Dimitrionian Dec 22, 2025
8f2572e
ADO-336 Remove redundant library use
Dimitrionian Dec 22, 2025
8cb57d9
ADO-336 Add example dag
Dimitrionian Dec 22, 2025
f9796c3
ADO-336 Add basic operations with Kerberos support
Dimitrionian Dec 23, 2025
136ac50
ADO-336 Refactor to use strategy
Dimitrionian Dec 23, 2025
58285db
ADO-336 Finish and test strategy
Dimitrionian Dec 24, 2025
b3e8a0d
ADO-336 Fix all tests
Dimitrionian Dec 24, 2025
a2c8378
ADO-336 Make example_hbase dag idempotent
Dimitrionian Dec 24, 2025
1d0233a
ADO-336 Make example_hbase_advanced dag idempotent
Dimitrionian Dec 24, 2025
a237472
ADO-336 Backup fix
Dimitrionian Dec 24, 2025
eb1e1d4
ADO-336 Add the restore backup functionality dag
Dimitrionian Dec 24, 2025
031dfa2
ADO-336 Fix dags examples
Dimitrionian Dec 25, 2025
d2fb979
ADO-336 Fix tests
Dimitrionian Dec 25, 2025
a7055e0
ADO-336 Mask sensitive data in logs
Dimitrionian Dec 25, 2025
30e53d0
ADO-336 Test the sensitive data masking in logs
Dimitrionian Dec 25, 2025
8333c67
ADO-336 Attempt to establish SSL over Thrift, naive approach
Dimitrionian Dec 25, 2025
dbee6d2
ADO-336 Expand happybase to handle SSL via Thrift
Dimitrionian Dec 25, 2025
7e5f9da
ADO-336 Provide retry logic
Dimitrionian Dec 25, 2025
fd1b76d
ADO-336 Test ssl dag with proxy tunnel
Dimitrionian Dec 26, 2025
3233a99
ADO-336 Add ssl proxy example
Dimitrionian Dec 26, 2025
7703da0
ADO-336 Add the connections documentation
Dimitrionian Dec 26, 2025
9386bf1
ADO-336 Fulfill the changelog
Dimitrionian Dec 26, 2025
60287d9
ADO-336 Update the docs index
Dimitrionian Dec 26, 2025
02b3013
ADO-336 Update the security docs
Dimitrionian Dec 26, 2025
3df6ac1
Merge pull request #5 from arenadata/feature/ado-336
Dimitrionian Dec 26, 2025
dab6266
ADO-334 Use connection pools for hbase thrift operations
Dimitrionian Dec 29, 2025
237e7fb
ADO-334 Optimize bulk inserts
Dimitrionian Dec 29, 2025
591392b
ADO-334 Make bulk inserts connection pooled
Dimitrionian Dec 29, 2025
97d3bf8
ADO-334 Update documentation
Dimitrionian Dec 30, 2025
a25d09a
Merge pull request #6 from arenadata/feature/ado-334
Dimitrionian Dec 30, 2025
1d975c6
Fix sphinxcontrib-serializinghtml version
Dimitrionian Jan 12, 2026
da36222
Upgrade pip
Dimitrionian Jan 12, 2026
9b6a47f
Merge pull request #7 from arenadata/feature/build-3.10
Dimitrionian Jan 13, 2026
daa5207
Include hbase into prod list
Dimitrionian Jan 16, 2026
373d333
Remove Exception handling to forward an exception higher to allow Air…
Dimitrionian Jan 16, 2026
88f8541
Remove redundant dags
Dimitrionian Jan 16, 2026
69202f3
Use enums instead of strings
Dimitrionian Jan 16, 2026
aba2d1f
Add HBaseRowCountSensor warning
Dimitrionian Jan 16, 2026
c1a4a44
Compare bytes directly to avoid UnicodeDecodeError on HBase binary data
Dimitrionian Jan 16, 2026
075d0b2
Merge pull request #9 from arenadata/feature/hbase-provider-review
Dimitrionian Jan 16, 2026
a31c495
Add current date for Hbase provider
Dimitrionian Jan 16, 2026
931de7f
Merge pull request #10 from arenadata/feature/hbase-provider-review
Dimitrionian Jan 16, 2026
4f79af7
Generalize a default connection name
Dimitrionian Jan 19, 2026
02f3ccf
Remove JAVA_HOME hardcode
Dimitrionian Jan 19, 2026
899cc34
Remove hardcoded connection parameters
Dimitrionian Jan 19, 2026
8323b43
Remove hasattr and delattr
Dimitrionian Jan 19, 2026
ba1ce2d
Remove redundant sss_conn_id parameter
Dimitrionian Jan 19, 2026
ad9c415
Add if_exists parameter for better table handling
Dimitrionian Jan 19, 2026
e57e610
Add if_not_exists parameter for better table handling
Dimitrionian Jan 19, 2026
512f113
Fix docstrings
Dimitrionian Jan 19, 2026
c2098e2
Make encoding arbitrary in HBaseScanOperator and HBaseBatchGetOperator
Dimitrionian Jan 19, 2026
1275e9d
Fix SSH tests as it failed in python package mode. Now it's universal
Dimitrionian Jan 20, 2026
3 changes: 3 additions & 0 deletions .dockerignore
@@ -116,6 +116,9 @@ airflow/www/static/docs
**/.DS_Store
**/Thumbs.db

+# Exclude non-existent standard provider to prevent entry point issues
+airflow/providers/standard
+
# Exclude docs generated files
docs/_build/
docs/_api/
2 changes: 1 addition & 1 deletion Dockerfile
@@ -36,7 +36,7 @@
# much smaller.
#
# Use the same builder frontend version for everyone
-ARG AIRFLOW_EXTRAS="aiobotocore,amazon,async,celery,cncf-kubernetes,common-io,docker,elasticsearch,fab,ftp,google,google-auth,graphviz,grpc,hashicorp,http,ldap,microsoft-azure,mysql,odbc,openlineage,pandas,postgres,redis,sendgrid,sftp,slack,snowflake,ssh,statsd,uv,virtualenv"
+ARG AIRFLOW_EXTRAS="aiobotocore,amazon,async,celery,cncf-kubernetes,common-io,docker,elasticsearch,fab,ftp,google,google-auth,graphviz,grpc,hashicorp,hbase,http,ldap,microsoft-azure,mysql,odbc,openlineage,pandas,postgres,redis,sendgrid,sftp,slack,snowflake,ssh,statsd,uv,virtualenv"
ARG ADDITIONAL_AIRFLOW_EXTRAS=""
ARG ADDITIONAL_PYTHON_DEPS=""

8 changes: 4 additions & 4 deletions Dockerfile.ci
@@ -1297,8 +1297,8 @@ ARG DEFAULT_CONSTRAINTS_BRANCH="constraints-main"
# It can also be overwritten manually by setting the AIRFLOW_CI_BUILD_EPOCH environment variable.
ARG AIRFLOW_CI_BUILD_EPOCH="10"
ARG AIRFLOW_PRE_CACHED_PIP_PACKAGES="true"
-ARG AIRFLOW_PIP_VERSION=24.2
-ARG AIRFLOW_UV_VERSION=0.4.1
+ARG AIRFLOW_PIP_VERSION=25.3
+ARG AIRFLOW_UV_VERSION=0.5.24
ARG AIRFLOW_USE_UV="true"
# Setup PIP
# By default PIP install run without cache to make image smaller
@@ -1321,8 +1321,8 @@ ARG AIRFLOW_VERSION=""
# Additional PIP flags passed to all pip install commands except reinstalling pip itself
ARG ADDITIONAL_PIP_INSTALL_FLAGS=""

-ARG AIRFLOW_PIP_VERSION=24.2
-ARG AIRFLOW_UV_VERSION=0.4.1
+ARG AIRFLOW_PIP_VERSION=25.3
+ARG AIRFLOW_UV_VERSION=0.5.24
ARG AIRFLOW_USE_UV="true"

ENV AIRFLOW_REPO=${AIRFLOW_REPO}\
31 changes: 31 additions & 0 deletions airflow/providers/hbase/CHANGELOG.rst
@@ -0,0 +1,31 @@
.. Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements. See the NOTICE file
   distributed with this work for additional information
   regarding copyright ownership. The ASF licenses this file
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License. You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing,
   software distributed under the License is distributed on an
   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   KIND, either express or implied. See the License for the
   specific language governing permissions and limitations
   under the License.

``apache-airflow-providers-hbase``

Changelog
---------

1.0.0
.....

Initial version of the provider.

Features
~~~~~~~~

* ``Add HBase provider with basic functionality``
18 changes: 18 additions & 0 deletions airflow/providers/hbase/__init__.py
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""HBase provider package."""
23 changes: 23 additions & 0 deletions airflow/providers/hbase/auth/__init__.py
@@ -0,0 +1,23 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""HBase authentication module."""

from airflow.providers.hbase.auth.authenticators import AuthenticatorFactory
from airflow.providers.hbase.auth.base import HBaseAuthenticator, KerberosAuthenticator, SimpleAuthenticator

__all__ = ["AuthenticatorFactory", "HBaseAuthenticator", "SimpleAuthenticator", "KerberosAuthenticator"]
62 changes: 62 additions & 0 deletions airflow/providers/hbase/auth/authenticators.py
@@ -0,0 +1,62 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""HBase authenticator factory."""

from __future__ import annotations

from typing import Type

from airflow.providers.hbase.auth.base import (
    HBaseAuthenticator,
    KerberosAuthenticator,
    SimpleAuthenticator,
)


class AuthenticatorFactory:
    """Factory for creating HBase authenticators."""

    _authenticators: dict[str, Type[HBaseAuthenticator]] = {
        "simple": SimpleAuthenticator,
        "kerberos": KerberosAuthenticator,
    }

    @classmethod
    def create(cls, auth_method: str) -> HBaseAuthenticator:
        """
        Create authenticator instance.

        :param auth_method: Authentication method name
        :return: Authenticator instance
        """
        if auth_method not in cls._authenticators:
            raise ValueError(
                f"Unknown authentication method: {auth_method}. "
                f"Supported methods: {', '.join(cls._authenticators.keys())}"
            )
        return cls._authenticators[auth_method]()

    @classmethod
    def register(cls, name: str, authenticator_class: Type[HBaseAuthenticator]) -> None:
        """
        Register custom authenticator.

        :param name: Authentication method name
        :param authenticator_class: Authenticator class
        """
        cls._authenticators[name] = authenticator_class
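Review note: the registry above can be exercised without an Airflow install. The sketch below mirrors the factory with stand-in classes so it runs on its own. `TokenAuthenticator` and the `token` config key are hypothetical and are not part of this PR; they only illustrate how `register` might be used to plug in another method.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any


class HBaseAuthenticator(ABC):
    """Stand-in with the same interface as the base class in this PR."""

    @abstractmethod
    def authenticate(self, config: dict[str, Any]) -> dict[str, Any]:
        """Return extra connection kwargs."""


class SimpleAuthenticator(HBaseAuthenticator):
    def authenticate(self, config: dict[str, Any]) -> dict[str, Any]:
        return {}


class AuthenticatorFactory:
    """Stand-in mirroring the factory in the diff (name -> class registry)."""

    _authenticators: dict[str, type[HBaseAuthenticator]] = {"simple": SimpleAuthenticator}

    @classmethod
    def create(cls, auth_method: str) -> HBaseAuthenticator:
        if auth_method not in cls._authenticators:
            raise ValueError(f"Unknown authentication method: {auth_method}")
        return cls._authenticators[auth_method]()

    @classmethod
    def register(cls, name: str, authenticator_class: type[HBaseAuthenticator]) -> None:
        cls._authenticators[name] = authenticator_class


class TokenAuthenticator(HBaseAuthenticator):
    """Hypothetical custom authenticator, for illustration only."""

    def authenticate(self, config: dict[str, Any]) -> dict[str, Any]:
        token = config.get("token")  # "token" is an assumed extras key
        if not token:
            raise ValueError("token is required when auth_method=token")
        return {"token": token}


# Register the custom method, then resolve it by name like the built-in ones.
AuthenticatorFactory.register("token", TokenAuthenticator)
kwargs = AuthenticatorFactory.create("token").authenticate({"token": "abc"})
```

Because `_authenticators` is a class-level dict, a registration is process-wide and a later `register` call with the same name silently replaces the earlier class.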
103 changes: 103 additions & 0 deletions airflow/providers/hbase/auth/base.py
@@ -0,0 +1,103 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""HBase authentication base classes."""

from __future__ import annotations

import base64
import os
import subprocess
import tempfile
from abc import ABC, abstractmethod
from typing import Any


class HBaseAuthenticator(ABC):
    """Base class for HBase authentication methods."""

    @abstractmethod
    def authenticate(self, config: dict[str, Any]) -> dict[str, Any]:
        """
        Perform authentication and return connection kwargs.

        :param config: Connection configuration from extras
        :return: Additional connection kwargs
        """
        pass


class SimpleAuthenticator(HBaseAuthenticator):
    """Simple authentication (no authentication)."""

    def authenticate(self, config: dict[str, Any]) -> dict[str, Any]:
        """No authentication needed."""
        return {}


class KerberosAuthenticator(HBaseAuthenticator):
    """Kerberos authentication using kinit."""

    def authenticate(self, config: dict[str, Any]) -> dict[str, Any]:
        """Perform Kerberos authentication via kinit."""
        principal = config.get("principal")
        if not principal:
            raise ValueError("Kerberos principal is required when auth_method=kerberos")

        # Get keytab from secrets backend or file
        keytab_secret_key = config.get("keytab_secret_key")
        keytab_path = config.get("keytab_path")

        if keytab_secret_key:
            # Get keytab from Airflow secrets backend
            keytab_content = self._get_secret(keytab_secret_key)
            if not keytab_content:
                raise ValueError(f"Keytab not found in secrets backend: {keytab_secret_key}")

            # Create temporary keytab file
            with tempfile.NamedTemporaryFile(delete=False, suffix='.keytab') as f:
                if isinstance(keytab_content, str):
                    # Assume base64 encoded
                    keytab_content = base64.b64decode(keytab_content)
                f.write(keytab_content)
                keytab_path = f.name

        if not keytab_path or not os.path.exists(keytab_path):
            raise ValueError(f"Keytab file not found: {keytab_path}")

        # Perform kinit
        try:
            cmd = ["kinit", "-kt", keytab_path, principal]
            # Output is captured rather than echoed; nothing is logged on success
            subprocess.run(cmd, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as e:
            # Chain the original error so the kinit exit status stays in the traceback
            raise RuntimeError(f"Kerberos authentication failed: {e.stderr}") from e
        finally:
            # Clean up temporary keytab file if created
            if keytab_secret_key and keytab_path and os.path.exists(keytab_path):
                os.unlink(keytab_path)

        return {}  # kinit handles authentication, use default transport

    def _get_secret(self, secret_key: str) -> str | None:
        """Get secret from Airflow secrets backend."""
        try:
            from airflow.models import Variable
            return Variable.get(secret_key, default_var=None)
        except Exception:
            # Fallback to environment variable; reached only when the Variable
            # lookup raises, not when the variable is simply missing
            return os.environ.get(secret_key)
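Review note: the temporary keytab handling above could be isolated in a context manager so that cleanup does not depend on reaching the `finally` around `kinit`. The sketch below is one possible shape under stated assumptions: `temporary_keytab` is a hypothetical helper, not part of this PR, and it uses only the standard library. It decodes the secret before creating the file, so a malformed base64 value leaves nothing on disk.

```python
import base64
import contextlib
import os
import tempfile
from typing import Iterator, Union


@contextlib.contextmanager
def temporary_keytab(content: Union[str, bytes]) -> Iterator[str]:
    """Write keytab bytes to a private temp file and remove it on exit.

    Hypothetical helper: str input is assumed to be base64, as in the diff.
    """
    # Decode first so a bad value fails before any file exists.
    data = base64.b64decode(content) if isinstance(content, str) else content
    # mkstemp creates the file readable and writable only by the current user.
    fd, path = tempfile.mkstemp(suffix=".keytab")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
        yield path
    finally:
        # Runs on normal exit and when the body raises.
        if os.path.exists(path):
            os.unlink(path)


# Usage with placeholder bytes (not a real keytab).
encoded = base64.b64encode(b"fake-keytab-bytes").decode()
with temporary_keytab(encoded) as keytab_path:
    existed_inside = os.path.exists(keytab_path)
    with open(keytab_path, "rb") as f:
        round_trip = f.read()
removed_after = not os.path.exists(keytab_path)
```

In `authenticate`, the `kinit` call would sit inside the `with` block, which keeps the file's lifetime limited to the one command that needs it.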
59 changes: 59 additions & 0 deletions airflow/providers/hbase/connection_pool.py
@@ -0,0 +1,59 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""HBase connection pool utilities."""

from __future__ import annotations

import threading
from typing import Dict, Any

import happybase

# Global pool storage
_pools: Dict[str, happybase.ConnectionPool] = {}
_pool_lock = threading.Lock()


def get_or_create_pool(conn_id: str, pool_size: int, **connection_args) -> happybase.ConnectionPool:
    """Get existing pool or create new one for connection ID.

    Args:
        conn_id: Connection ID
        pool_size: Pool size
        **connection_args: Arguments for happybase.Connection

    Returns:
        happybase.ConnectionPool instance
    """
    with _pool_lock:
        if conn_id not in _pools:
            _pools[conn_id] = happybase.ConnectionPool(pool_size, **connection_args)
        return _pools[conn_id]


def create_connection_pool(size: int, **connection_args) -> happybase.ConnectionPool:
    """Create HBase connection pool using happybase built-in pool.

    Args:
        size: Pool size
        **connection_args: Arguments for happybase.Connection

    Returns:
        happybase.ConnectionPool instance
    """
    return happybase.ConnectionPool(size, **connection_args)
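Review note: one consequence of keying the cache on `conn_id` alone is that `pool_size` and the connection arguments of a second call are ignored once a pool exists. The sketch below reproduces the same lock-guarded get-or-create logic with a stand-in class so it runs without `happybase` or an HBase server. `FakePool` and the `hbase_default` / `hbase-a` / `hbase-b` values are assumptions made for illustration.

```python
import threading
from typing import Any, Dict


class FakePool:
    """Stand-in for happybase.ConnectionPool; only records constructor args."""

    def __init__(self, size: int, **connection_args: Any) -> None:
        self.size = size
        self.connection_args = connection_args


_pools: Dict[str, FakePool] = {}
_pool_lock = threading.Lock()


def get_or_create_pool(conn_id: str, pool_size: int, **connection_args: Any) -> FakePool:
    """Same caching logic as in the diff, with FakePool substituted."""
    with _pool_lock:
        if conn_id not in _pools:
            _pools[conn_id] = FakePool(pool_size, **connection_args)
        return _pools[conn_id]


# The second call asks for a different size and host but gets the cached pool.
first = get_or_create_pool("hbase_default", 5, host="hbase-a")
second = get_or_create_pool("hbase_default", 10, host="hbase-b")
same_pool = first is second
```

If a connection's host or extras can change while a worker process is alive, it may be worth including those values in the cache key or documenting that a restart is needed for changes to take effect.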
18 changes: 18 additions & 0 deletions airflow/providers/hbase/datasets/__init__.py
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""HBase datasets."""