Skip to content

Commit

Permalink
feat(source-declarative-manifest): add support for custom Python comp…
Browse files Browse the repository at this point in the history
…onents from dynamic text input (#174)
  • Loading branch information
aaronsteers authored Jan 22, 2025
1 parent e78b272 commit 3a9ab87
Show file tree
Hide file tree
Showing 17 changed files with 1,014 additions and 16 deletions.
6 changes: 6 additions & 0 deletions airbyte_cdk/cli/source_declarative_manifest/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ def create_declarative_source(
"Invalid config: `__injected_declarative_manifest` should be provided at the root "
f"of the config but config only has keys: {list(config.keys() if config else [])}"
)
if not isinstance(config["__injected_declarative_manifest"], dict):
raise ValueError(
"Invalid config: `__injected_declarative_manifest` should be a dictionary, "
f"but got type: {type(config['__injected_declarative_manifest'])}"
)

return ConcurrentDeclarativeSource(
config=config,
catalog=catalog,
Expand Down
1 change: 1 addition & 0 deletions airbyte_cdk/connector_builder/connector_builder_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def get_limits(config: Mapping[str, Any]) -> TestReadLimits:
def create_source(config: Mapping[str, Any], limits: TestReadLimits) -> ManifestDeclarativeSource:
manifest = config["__injected_declarative_manifest"]
return ManifestDeclarativeSource(
config=config,
emit_connector_builder_messages=True,
source_config=manifest,
component_factory=ModelToComponentFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def __init__(

super().__init__(
source_config=source_config,
config=config,
debug=debug,
emit_connector_builder_messages=emit_connector_builder_messages,
component_factory=component_factory,
Expand Down
19 changes: 15 additions & 4 deletions airbyte_cdk/sources/declarative/manifest_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pkgutil
from copy import deepcopy
from importlib import metadata
from types import ModuleType
from typing import Any, Dict, Iterator, List, Mapping, Optional, Set

import yaml
Expand All @@ -32,6 +33,9 @@
DeclarativeStream as DeclarativeStreamModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
get_registered_components_module,
)
from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
ManifestComponentTransformer,
)
Expand Down Expand Up @@ -59,22 +63,29 @@ class ManifestDeclarativeSource(DeclarativeSource):
def __init__(
self,
source_config: ConnectionDefinition,
*,
config: Mapping[str, Any] | None = None,
debug: bool = False,
emit_connector_builder_messages: bool = False,
component_factory: Optional[ModelToComponentFactory] = None,
):
"""
:param source_config(Mapping[str, Any]): The manifest of low-code components that describe the source connector
:param debug(bool): True if debug mode is enabled
:param component_factory(ModelToComponentFactory): optional factory if ModelToComponentFactory's default behaviour needs to be tweaked
Args:
config: The provided config dict.
source_config: The manifest of low-code components that describe the source connector.
debug: True if debug mode is enabled.
emit_connector_builder_messages: True if messages should be emitted to the connector builder.
component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
"""
self.logger = logging.getLogger(f"airbyte.{self.name}")

# For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
manifest = dict(source_config)
if "type" not in manifest:
manifest["type"] = "DeclarativeSource"

# If custom components are needed, locate and/or register them.
self.components_module: ModuleType | None = get_registered_components_module(config=config)

resolved_source_config = ManifestReferenceResolver().preprocess_manifest(manifest)
propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters(
"", resolved_source_config, {}
Expand Down
143 changes: 143 additions & 0 deletions airbyte_cdk/sources/declarative/parsers/custom_code_compiler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
"""Contains functions to compile custom code from text."""

import hashlib
import os
import sys
from collections.abc import Mapping
from types import ModuleType
from typing import Any, cast

from typing_extensions import Literal

# Names of the checksum algorithms that may be used to validate injected code.
ChecksumType = Literal["md5", "sha256"]
# Maps each supported checksum name to the `hashlib` constructor that computes it.
CHECKSUM_FUNCTIONS = {
"md5": hashlib.md5,
"sha256": hashlib.sha256,
}
# Names under which the custom components module is registered in `sys.modules`.
COMPONENTS_MODULE_NAME = "components"
SDM_COMPONENTS_MODULE_NAME = "source_declarative_manifest.components"
# Config key for the injected declarative manifest.
INJECTED_MANIFEST = "__injected_declarative_manifest"
# Config key whose value is the text of a custom `components.py` file.
INJECTED_COMPONENTS_PY = "__injected_components_py"
# Config key whose value maps checksum type -> expected checksum of the injected code.
INJECTED_COMPONENTS_PY_CHECKSUMS = "__injected_components_py_checksums"
# Environment variable that must be set to 'true' (case-insensitive) to permit custom code.
ENV_VAR_ALLOW_CUSTOM_CODE = "AIRBYTE_ALLOW_CUSTOM_CODE"


class AirbyteCodeTamperedError(Exception):
    """Signal that the components module text does not match its expected checksum.

    A mismatch may indicate that the code was tampered with, so this error
    should be treated as fatal.
    """


class AirbyteCustomCodeNotPermittedError(Exception):
    """Signal that custom code was supplied but this environment forbids running it."""

    def __init__(self) -> None:
        # The message is fixed; callers raise this error without arguments.
        explanation = (
            "Custom connector code is not permitted in this environment. "
            "If you need to run custom code, please ask your administrator to set the `AIRBYTE_ALLOW_CUSTOM_CODE` "
            "environment variable to 'true' in your Airbyte environment. "
            "If you see this message in Airbyte Cloud, your workspace does not allow executing "
            "custom connector code."
        )
        super().__init__(explanation)


def _hash_text(input_text: str, hash_type: str = "md5") -> str:
"""Return the hash of the input text using the specified hash type."""
if not input_text:
raise ValueError("Input text cannot be empty.")

hash_object = CHECKSUM_FUNCTIONS[hash_type]()
hash_object.update(input_text.encode())
return hash_object.hexdigest()


def custom_code_execution_permitted() -> bool:
    """Report whether this environment allows executing custom code.

    Execution is allowed only when the `AIRBYTE_ALLOW_CUSTOM_CODE` environment
    variable is set to 'true' (compared case-insensitively). An unset variable
    counts as not permitted.
    """
    flag_value = os.environ.get(ENV_VAR_ALLOW_CUSTOM_CODE, "")
    return flag_value.lower() == "true"


def validate_python_code(
code_text: str,
checksums: dict[str, str] | None,
) -> None:
"""Validate the provided Python code text against the provided checksums.
Currently we fail if no checksums are provided, although this may change in the future.
"""
if not checksums:
raise ValueError(f"A checksum is required to validate the code. Received: {checksums}")

for checksum_type, checksum in checksums.items():
if checksum_type not in CHECKSUM_FUNCTIONS:
raise ValueError(
f"Unsupported checksum type: {checksum_type}. Supported checksum types are: {CHECKSUM_FUNCTIONS.keys()}"
)

if _hash_text(code_text, checksum_type) != checksum:
raise AirbyteCodeTamperedError(f"{checksum_type} checksum does not match.")


def get_registered_components_module(
    config: Mapping[str, Any] | None,
) -> ModuleType | None:
    """Resolve the components module to use for the given config.

    When the config carries custom `components.py` text, that text is validated,
    executed, and registered in `sys.modules` so that manifest declarations can
    later import the classes it defines. Otherwise, an already-registered
    `source_declarative_manifest.components` or `components` module is returned,
    in that order of preference.

    Returns `None` when no custom code is provided and neither module is
    present in `sys.modules`.

    Raises:
        AirbyteCustomCodeNotPermittedError: If custom code is provided but the
            environment does not permit executing it.
    """
    if config and INJECTED_COMPONENTS_PY in config:
        if custom_code_execution_permitted():
            # Build a fresh module from the injected source text and register it.
            return register_components_module_from_string(
                components_py_text=config[INJECTED_COMPONENTS_PY],
                checksums=config.get(INJECTED_COMPONENTS_PY_CHECKSUMS, None),
            )
        raise AirbyteCustomCodeNotPermittedError

    # No injected code: fall back to a module that is already registered,
    # preferring the fully qualified name over the bare `components`.
    for candidate_name in (SDM_COMPONENTS_MODULE_NAME, COMPONENTS_MODULE_NAME):
        if candidate_name in sys.modules:
            return cast(ModuleType, sys.modules.get(candidate_name))

    # Nothing was injected and no components module has been registered.
    return None


def register_components_module_from_string(
    components_py_text: str,
    checksums: dict[str, Any] | None,
) -> ModuleType:
    """Build, register, and return a components module from Python source text.

    The text is validated against `checksums` before anything is executed. The
    resulting module is registered in `sys.modules` under both
    `source_declarative_manifest.components` and `components`.
    """
    # Refuse to run anything whose checksum does not validate.
    validate_python_code(code_text=components_py_text, checksums=checksums)

    # Run the source text inside the namespace of a brand-new module object.
    # NOTE: this executes caller-supplied code; the checksum validation above
    # runs before it.
    module = ModuleType(name=COMPONENTS_MODULE_NAME)
    exec(components_py_text, module.__dict__)

    # Register in `sys.modules` so the module can be imported as
    # `source_declarative_manifest.components` and/or `components`.
    for registered_name in (SDM_COMPONENTS_MODULE_NAME, COMPONENTS_MODULE_NAME):
        sys.modules[registered_name] = module

    return module
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
from __future__ import annotations

import datetime
import importlib
import inspect
import re
import sys
from functools import partial
from typing import (
Any,
Expand Down Expand Up @@ -363,6 +363,10 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ZipfileDecoder as ZipfileDecoderModel,
)
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
COMPONENTS_MODULE_NAME,
SDM_COMPONENTS_MODULE_NAME,
)
from airbyte_cdk.sources.declarative.partition_routers import (
CartesianProductStreamSlicer,
ListPartitionRouter,
Expand Down Expand Up @@ -1102,7 +1106,6 @@ def create_custom_component(self, model: Any, config: Config, **kwargs: Any) ->
:param config: The custom defined connector config
:return: The declarative component built from the Pydantic model to be used at runtime
"""

custom_component_class = self._get_class_from_fully_qualified_class_name(model.class_name)
component_fields = get_type_hints(custom_component_class)
model_args = model.dict()
Expand Down Expand Up @@ -1156,14 +1159,35 @@ def create_custom_component(self, model: Any, config: Config, **kwargs: Any) ->
return custom_component_class(**kwargs)

@staticmethod
def _get_class_from_fully_qualified_class_name(full_qualified_class_name: str) -> Any:
def _get_class_from_fully_qualified_class_name(
full_qualified_class_name: str,
) -> Any:
"""Get a class from its fully qualified name.
If a custom components module is needed, we assume it is already registered - probably
as `source_declarative_manifest.components` or `components`.
Args:
full_qualified_class_name (str): The fully qualified name of the class (e.g., "module.ClassName").
Returns:
Any: The class object.
Raises:
ValueError: If the class cannot be loaded.
"""
split = full_qualified_class_name.split(".")
module = ".".join(split[:-1])
module_name_full = ".".join(split[:-1])
class_name = split[-1]

if module_name_full == COMPONENTS_MODULE_NAME:
# Assume "components" on its own means "source_declarative_manifest.components"
module_name_full = SDM_COMPONENTS_MODULE_NAME

try:
return getattr(importlib.import_module(module), class_name)
except AttributeError:
raise ValueError(f"Could not load class {full_qualified_class_name}.")
return getattr(sys.modules[module_name_full], class_name)
except (AttributeError, ModuleNotFoundError) as e:
raise ValueError(f"Could not load class {full_qualified_class_name}.") from e

@staticmethod
def _derive_component_type_from_type_hints(field_type: Any) -> Optional[str]:
Expand Down
3 changes: 1 addition & 2 deletions airbyte_cdk/test/utils/manifest_only_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import importlib.util
from pathlib import Path
from types import ModuleType
from typing import Optional

import pytest

Expand All @@ -30,7 +29,7 @@ def connector_dir(request: pytest.FixtureRequest) -> Path:


@pytest.fixture(scope="session")
def components_module(connector_dir: Path) -> Optional[ModuleType]:
def components_module(connector_dir: Path) -> ModuleType | None:
"""Load and return the components module from the connector directory.
This assumes the components module is located at <connector_dir>/components.py.
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ skip = ["__init__.py"] # TODO: Remove after this is fixed: https://github.com/a
[tool.poe.tasks]
# Installation
install = { shell = "poetry install --all-extras" }
lock = { shell = "poetry lock --no-update" }

# Build tasks
assemble = {cmd = "bin/generate-component-manifest-dagger.sh", help = "Generate component manifest files."}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ def test_resolve_manifest(valid_resolve_manifest_config_file):
config = copy.deepcopy(RESOLVE_MANIFEST_CONFIG)
command = "resolve_manifest"
config["__command"] = command
source = ManifestDeclarativeSource(MANIFEST)
source = ManifestDeclarativeSource(source_config=MANIFEST)
limits = TestReadLimits()
resolved_manifest = handle_connector_builder_request(
source, command, config, create_configured_catalog("dummy_stream"), _A_STATE, limits
Expand Down Expand Up @@ -505,7 +505,7 @@ def resolved_manifest(self):

def test_read():
config = TEST_READ_CONFIG
source = ManifestDeclarativeSource(MANIFEST)
source = ManifestDeclarativeSource(source_config=MANIFEST)

real_record = AirbyteRecordMessage(
data={"id": "1234", "key": "value"}, emitted_at=1, stream=_stream_name
Expand Down Expand Up @@ -592,7 +592,7 @@ def test_config_update() -> None:
"client_secret": "a client secret",
"refresh_token": "a refresh token",
}
source = ManifestDeclarativeSource(manifest)
source = ManifestDeclarativeSource(source_config=manifest)

refresh_request_response = {
"access_token": "an updated access token",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
secrets*
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# The Guardian API Tests

For these tests to work, you'll need to create a `secrets.yaml` file in this directory that looks like this:

```yml
api_key: ******
```
The `.gitignore` file in this directory should ensure your file is not committed to git, but it's good practice to double-check. 👀
Loading

0 comments on commit 3a9ab87

Please sign in to comment.