feat(source-declarative-manifest): add support for custom Python components from dynamic text input #174

Merged
merged 43 commits into from
Jan 22, 2025
Changes from 35 commits
9973955
skeleton: components module from dynamic text input
aaronsteers Dec 13, 2024
8309f79
refactor / clean up
aaronsteers Dec 16, 2024
399dd7b
add test resource for py_components unit test
aaronsteers Dec 19, 2024
9115757
add fixture for custom py components scenario
aaronsteers Dec 19, 2024
5dc664c
add test
aaronsteers Dec 19, 2024
5be084f
chore: add missing guard statement
aaronsteers Jan 13, 2025
7379eea
chore: remove stale comment
aaronsteers Jan 13, 2025
51cbcbd
checkpoint: passing tests with pokeapi
aaronsteers Jan 13, 2025
aaef285
chore: add `poe lock` task definition
aaronsteers Jan 13, 2025
e7c3eae
add 'source_the_guardian_api' test resources
aaronsteers Jan 13, 2025
2300f7a
checkpoint: working `check`
aaronsteers Jan 13, 2025
4efcd40
checkpoint: working discover
aaronsteers Jan 13, 2025
cb6a4ab
checkpoint: working sync
aaronsteers Jan 13, 2025
051c57b
improve module name parsing
aaronsteers Jan 13, 2025
e511a2b
remove unused files
aaronsteers Jan 13, 2025
a19b5c1
tidy up
aaronsteers Jan 13, 2025
c837745
skip if no creds
aaronsteers Jan 13, 2025
c54a73d
cosmetic: cleaner diff
aaronsteers Jan 15, 2025
3f66c46
don't fail when custom components.py is already grafted into filesystem
aaronsteers Jan 15, 2025
75332e8
clean up import code
aaronsteers Jan 15, 2025
67b84a0
clean up imports, implement safety mechanisms and blocked-by-default …
aaronsteers Jan 15, 2025
5805649
fix mypy issues
aaronsteers Jan 15, 2025
3251e5c
Update unit_tests/source_declarative_manifest/test_source_declarative…
aaronsteers Jan 15, 2025
877d721
more clean up
aaronsteers Jan 15, 2025
7531ed0
fix ruff format issue
aaronsteers Jan 15, 2025
5e7e826
add intentionally failing use case
aaronsteers Jan 15, 2025
c654ef5
validate input text
aaronsteers Jan 15, 2025
6badf7e
clean up module name parsing
aaronsteers Jan 15, 2025
b81ca33
refactor and clean up interfaces
aaronsteers Jan 16, 2025
ceab6fd
use monkeypatch for setting env vars
aaronsteers Jan 16, 2025
714360c
full code review and cleanup
aaronsteers Jan 16, 2025
c8de81a
apply suggestion
aaronsteers Jan 16, 2025
a084e7a
apply suggestion
aaronsteers Jan 16, 2025
0491b99
apply suggestion
aaronsteers Jan 16, 2025
7134340
apply suggestion
aaronsteers Jan 16, 2025
bff4dc4
fix lint issues
aaronsteers Jan 21, 2025
15cd254
clean up tests
aaronsteers Jan 21, 2025
3921341
Merge branch 'main' into aj/feat/accept-components-text-input
aaronsteers Jan 21, 2025
6c4e01f
autofix lint issue
aaronsteers Jan 21, 2025
6c81115
fix tests
aaronsteers Jan 21, 2025
1c35577
fix another test
aaronsteers Jan 21, 2025
e6b28b6
fix failing test
aaronsteers Jan 22, 2025
f29f616
mark full sync as slow test (~60s)
aaronsteers Jan 22, 2025
6 changes: 6 additions & 0 deletions airbyte_cdk/cli/source_declarative_manifest/_run.py
@@ -171,6 +171,12 @@ def create_declarative_source(
                 "Invalid config: `__injected_declarative_manifest` should be provided at the root "
                 f"of the config but config only has keys: {list(config.keys() if config else [])}"
             )
+        if not isinstance(config["__injected_declarative_manifest"], dict):
+            raise ValueError(
+                "Invalid config: `__injected_declarative_manifest` should be a dictionary, "
+                f"but got type: {type(config['__injected_declarative_manifest'])}"
+            )
+
         return ConcurrentDeclarativeSource(
             config=config,
             catalog=catalog,
1 change: 1 addition & 0 deletions airbyte_cdk/connector_builder/connector_builder_handler.py
@@ -52,6 +52,7 @@ def get_limits(config: Mapping[str, Any]) -> TestReadLimits:
 def create_source(config: Mapping[str, Any], limits: TestReadLimits) -> ManifestDeclarativeSource:
     manifest = config["__injected_declarative_manifest"]
     return ManifestDeclarativeSource(
+        config=config,
         emit_connector_builder_messages=True,
         source_config=manifest,
         component_factory=ModelToComponentFactory(
@@ -81,6 +81,7 @@ def __init__(
         )

         super().__init__(
+            config=config,
             source_config=source_config,
             debug=debug,
             emit_connector_builder_messages=emit_connector_builder_messages,
18 changes: 14 additions & 4 deletions airbyte_cdk/sources/declarative/manifest_declarative_source.py
@@ -7,6 +7,7 @@
 import pkgutil
 from copy import deepcopy
 from importlib import metadata
+from types import ModuleType
 from typing import Any, Dict, Iterator, List, Mapping, Optional, Set

 import yaml
@@ -31,6 +32,9 @@
     DeclarativeStream as DeclarativeStreamModel,
 )
 from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
+from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
+    get_registered_components_module,
+)
 from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
     ManifestComponentTransformer,
 )
@@ -57,23 +61,29 @@ class ManifestDeclarativeSource(DeclarativeSource):

     def __init__(
         self,
+        config: dict[str, Any] | None,
         source_config: ConnectionDefinition,
         debug: bool = False,
         emit_connector_builder_messages: bool = False,
         component_factory: Optional[ModelToComponentFactory] = None,
     ):
         """
-        :param source_config(Mapping[str, Any]): The manifest of low-code components that describe the source connector
-        :param debug(bool): True if debug mode is enabled
-        :param component_factory(ModelToComponentFactory): optional factory if ModelToComponentFactory's default behaviour needs to be tweaked
+        Args:
+            config: The provided config dict.
+            source_config: The manifest of low-code components that describe the source connector.
+            debug: True if debug mode is enabled.
+            emit_connector_builder_messages: True if messages should be emitted to the connector builder.
+            component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
         """
         self.logger = logging.getLogger(f"airbyte.{self.name}")

         # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
         manifest = dict(source_config)
         if "type" not in manifest:
             manifest["type"] = "DeclarativeSource"

+        # If custom components are needed, locate and/or register them.
+        self.components_module: ModuleType | None = get_registered_components_module(config=config)
+
         resolved_source_config = ManifestReferenceResolver().preprocess_manifest(manifest)
         propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters(
             "", resolved_source_config, {}
142 changes: 142 additions & 0 deletions airbyte_cdk/sources/declarative/parsers/custom_code_compiler.py
@@ -0,0 +1,142 @@
"""Contains functions to compile custom code from text."""

import hashlib
import os
import sys
from types import ModuleType
from typing import Any, cast

from typing_extensions import Literal

ChecksumType = Literal["md5", "sha256"]
CHECKSUM_FUNCTIONS = {
    "md5": hashlib.md5,
    "sha256": hashlib.sha256,
}
COMPONENTS_MODULE_NAME = "components"
SDM_COMPONENTS_MODULE_NAME = "source_declarative_manifest.components"
INJECTED_MANIFEST = "__injected_declarative_manifest"
INJECTED_COMPONENTS_PY = "__injected_components_py"
INJECTED_COMPONENTS_PY_CHECKSUMS = "__injected_components_py_checksums"
ENV_VAR_ALLOW_CUSTOM_CODE = "AIRBYTE_ALLOW_CUSTOM_CODE"


class AirbyteCodeTamperedError(Exception):
    """Raised when the connector's components module does not match its checksum.

    This is a fatal error, as it can be a sign of code tampering.
    """


class AirbyteCustomCodeNotPermittedError(Exception):
    """Raised when custom code is attempted to be run in an environment that does not support it."""

    def __init__(self) -> None:
        super().__init__(
            "Custom connector code is not permitted in this environment. "
            "If you need to run custom code, please ask your administrator to set the `AIRBYTE_ALLOW_CUSTOM_CODE` "
            "environment variable to 'true' in your Airbyte environment. "
            "If you see this message in Airbyte Cloud, your workspace does not allow executing "
            "custom connector code."
        )


def _hash_text(input_text: str, hash_type: Literal["md5", "sha256"] = "md5") -> str:
    """Return the hash of the input text using the specified hash type."""
    if not input_text:
        raise ValueError("Input text cannot be empty.")

    hash_object = CHECKSUM_FUNCTIONS[hash_type]()
    hash_object.update(input_text.encode())
    return hash_object.hexdigest()


def custom_code_execution_permitted() -> bool:
    """Return `True` if custom code execution is permitted, otherwise `False`.

    Custom code execution is permitted if the `AIRBYTE_ALLOW_CUSTOM_CODE` environment variable is set to 'true'.
    """
    return os.environ.get(ENV_VAR_ALLOW_CUSTOM_CODE, "").lower() == "true"


def validate_python_code(
    code_text: str,
    checksums: dict[str, str] | None,
) -> None:
    """Validate the provided Python code text against the provided checksums.

    Currently we fail if no checksums are provided, although this may change in the future.
    """
    if not checksums:
        raise ValueError(f"A checksum is required to validate the code. Received: {checksums}")

    for checksum_type, checksum in checksums.items():
        if checksum_type not in CHECKSUM_FUNCTIONS:
            # List the supported names explicitly rather than printing a `dict_keys(...)` repr.
            raise ValueError(
                f"Unsupported checksum type: {checksum_type}. Supported checksum types are: {list(CHECKSUM_FUNCTIONS)}"
            )

        if _hash_text(code_text, checksum_type) != checksum:
            raise AirbyteCodeTamperedError(f"{checksum_type} checksum does not match.")


def get_registered_components_module(
    config: dict[str, Any] | None,
) -> ModuleType | None:
    """Get a components module object based on the provided config.

    If custom Python components are provided, they will be loaded. Otherwise, we will
    attempt to load from the `components` module already imported/registered in sys.modules.

    If custom `components.py` text is provided in config, it will be registered with sys.modules
    so that it can be later imported by manifest declarations which reference the provided classes.

    Returns `None` if no components are provided and the `components` module is not found.
    """
    # `config` may be `None` (see `ManifestDeclarativeSource.__init__`), so guard before the lookup.
    if config and INJECTED_COMPONENTS_PY in config:
        if not custom_code_execution_permitted():
            raise AirbyteCustomCodeNotPermittedError

        # Create a new module object and execute the provided Python code text within it
        python_text = config[INJECTED_COMPONENTS_PY]
        return register_components_module_from_string(
            components_py_text=python_text,
            checksums=config.get(INJECTED_COMPONENTS_PY_CHECKSUMS, None),
        )

    # Check for `components` or `source_declarative_manifest.components`.
    if SDM_COMPONENTS_MODULE_NAME in sys.modules:
        return cast(ModuleType, sys.modules.get(SDM_COMPONENTS_MODULE_NAME))

    if COMPONENTS_MODULE_NAME in sys.modules:
        return cast(ModuleType, sys.modules.get(COMPONENTS_MODULE_NAME))

    # Could not find module 'components' in `sys.modules`
    # and INJECTED_COMPONENTS_PY was not provided in config.
    return None


def register_components_module_from_string(
    components_py_text: str,
    checksums: dict[str, Any] | None,
) -> ModuleType:
    """Load and return the components module from a provided string containing the Python code."""
    # First validate the code
    validate_python_code(
        code_text=components_py_text,
        checksums=checksums,
    )

    # Create a new module object
    components_module = ModuleType(name=COMPONENTS_MODULE_NAME)

    # Execute the module text in the module's namespace
    exec(components_py_text, components_module.__dict__)

    # Register the module in `sys.modules` so it can be imported as
    # `source_declarative_manifest.components` and/or `components`.
    sys.modules[SDM_COMPONENTS_MODULE_NAME] = components_module
    sys.modules[COMPONENTS_MODULE_NAME] = components_module

    # Now you can import and use the module
    return components_module
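
A minimal sketch of how a caller might assemble a config for the code path above, and of what checksum validation plus registration amounts to. It uses only the standard library instead of importing `airbyte_cdk`, so it is an illustration rather than the CDK's implementation. The sample `components.py` text, the class `MyTransform`, and the helpers `build_config` and `register_from_text` are hypothetical names; the three `__injected_*` keys are the ones defined in the file above.

```python
import hashlib
import sys
from types import ModuleType

# Hypothetical components.py text; a manifest could reference `components.MyTransform`.
COMPONENTS_PY = "class MyTransform:\n    def apply(self, value):\n        return value.upper()\n"


def build_config(manifest: dict, components_py: str) -> dict:
    """Assemble a config dict using the injected keys defined in this PR."""
    encoded = components_py.encode()
    return {
        "__injected_declarative_manifest": manifest,
        "__injected_components_py": components_py,
        "__injected_components_py_checksums": {
            "md5": hashlib.md5(encoded).hexdigest(),
            "sha256": hashlib.sha256(encoded).hexdigest(),
        },
    }


def register_from_text(config: dict) -> ModuleType:
    """Verify every checksum, then exec the text into a module and register it."""
    text = config["__injected_components_py"]
    for kind, expected in config["__injected_components_py_checksums"].items():
        actual = hashlib.new(kind, text.encode()).hexdigest()
        if actual != expected:
            raise ValueError(f"{kind} checksum does not match.")

    module = ModuleType("components")
    exec(text, module.__dict__)  # runs the supplied code in the new module's namespace
    sys.modules["components"] = module
    sys.modules["source_declarative_manifest.components"] = module
    return module


config = build_config({"type": "DeclarativeSource"}, COMPONENTS_PY)
module = register_from_text(config)
print(module.MyTransform().apply("ok"))  # prints "OK"
```

In the PR itself the same flow is additionally gated on `AIRBYTE_ALLOW_CUSTOM_CODE` being set to `true`; the sketch leaves that check out to stay short.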
@@ -5,9 +5,9 @@
 from __future__ import annotations

 import datetime
-import importlib
 import inspect
 import re
+import sys
 from functools import partial
 from typing import (
     Any,
@@ -316,6 +316,10 @@
 from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
     XmlDecoder as XmlDecoderModel,
 )
+from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
+    COMPONENTS_MODULE_NAME,
+    SDM_COMPONENTS_MODULE_NAME,
+)
 from airbyte_cdk.sources.declarative.partition_routers import (
     CartesianProductStreamSlicer,
     ListPartitionRouter,
@@ -986,7 +990,6 @@ def create_custom_component(self, model: Any, config: Config, **kwargs: Any) ->
         :param config: The custom defined connector config
         :return: The declarative component built from the Pydantic model to be used at runtime
         """
-
         custom_component_class = self._get_class_from_fully_qualified_class_name(model.class_name)
         component_fields = get_type_hints(custom_component_class)
         model_args = model.dict()
@@ -1040,14 +1043,35 @@ def create_custom_component(self, model: Any, config: Config, **kwargs: Any) ->
         return custom_component_class(**kwargs)

     @staticmethod
-    def _get_class_from_fully_qualified_class_name(full_qualified_class_name: str) -> Any:
+    def _get_class_from_fully_qualified_class_name(
+        full_qualified_class_name: str,
+    ) -> Any:
+        """Get a class from its fully qualified name.
+
+        If a custom components module is needed, we assume it is already registered - probably
+        as `source_declarative_manifest.components` or `components`.
+
+        Args:
+            full_qualified_class_name (str): The fully qualified name of the class (e.g., "module.ClassName").
+
+        Returns:
+            Any: The class object.
+
+        Raises:
+            ValueError: If the class cannot be loaded.
+        """
         split = full_qualified_class_name.split(".")
-        module = ".".join(split[:-1])
+        module_name_full = ".".join(split[:-1])
         class_name = split[-1]
+
+        if module_name_full == COMPONENTS_MODULE_NAME:
+            # Assume "components" on its own means "source_declarative_manifest.components"
+            module_name_full = SDM_COMPONENTS_MODULE_NAME
+
         try:
-            return getattr(importlib.import_module(module), class_name)
-        except AttributeError:
-            raise ValueError(f"Could not load class {full_qualified_class_name}.")
+            return getattr(sys.modules[module_name_full], class_name)
+        except (AttributeError, ModuleNotFoundError) as e:
+            raise ValueError(f"Could not load class {full_qualified_class_name}.") from e

     @staticmethod
     def _derive_component_type_from_type_hints(field_type: Any) -> Optional[str]:
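
The lookup above can be sketched in isolation as follows. This is a standalone approximation, not the factory's code: `get_class` and the demo class `MyExtractor` are hypothetical, and the two module-name constants are copied from `custom_code_compiler.py`. One deliberate difference is that the sketch also catches `KeyError`, since indexing `sys.modules` with a name that is not registered raises `KeyError`.

```python
import sys
from types import ModuleType
from typing import Any

COMPONENTS_MODULE_NAME = "components"
SDM_COMPONENTS_MODULE_NAME = "source_declarative_manifest.components"


def get_class(full_qualified_class_name: str) -> Any:
    """Resolve "module.ClassName" against modules already present in sys.modules."""
    module_name, _, class_name = full_qualified_class_name.rpartition(".")
    if module_name == COMPONENTS_MODULE_NAME:
        # Treat bare "components" as an alias for the namespaced module.
        module_name = SDM_COMPONENTS_MODULE_NAME
    try:
        return getattr(sys.modules[module_name], class_name)
    except (AttributeError, KeyError) as e:
        raise ValueError(f"Could not load class {full_qualified_class_name}.") from e


# Register a throwaway module so the lookup has something to find.
demo = ModuleType(SDM_COMPONENTS_MODULE_NAME)
exec("class MyExtractor:\n    pass\n", demo.__dict__)
sys.modules[SDM_COMPONENTS_MODULE_NAME] = demo

print(get_class("components.MyExtractor").__name__)  # prints "MyExtractor"
```

Because nothing is imported from disk here, the class must have been registered beforehand, which is what `get_registered_components_module` is responsible for in this PR.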
3 changes: 1 addition & 2 deletions airbyte_cdk/test/utils/manifest_only_fixtures.py
@@ -4,7 +4,6 @@
 import importlib.util
 from pathlib import Path
 from types import ModuleType
-from typing import Optional

 import pytest

@@ -30,7 +29,7 @@ def connector_dir(request: pytest.FixtureRequest) -> Path:


 @pytest.fixture(scope="session")
-def components_module(connector_dir: Path) -> Optional[ModuleType]:
+def components_module(connector_dir: Path) -> ModuleType | None:
     """Load and return the components module from the connector directory.
     This assumes the components module is located at <connector_dir>/components.py.
1 change: 1 addition & 0 deletions pyproject.toml
@@ -126,6 +126,7 @@ select = ["I"]
 [tool.poe.tasks]
 # Installation
 install = { shell = "poetry install --all-extras" }
+lock = { shell = "poetry lock --no-update" }

 # Build tasks
 assemble = {cmd = "bin/generate-component-manifest-dagger.sh", help = "Generate component manifest files."}
@@ -0,0 +1 @@
secrets*
@@ -0,0 +1,9 @@
# The Guardian API Tests

For these tests to work, you'll need to create a `secrets.yaml` file in this directory that looks like this:

```yml
api_key: ******
```
The `.gitignore` file in this directory should ensure your file is not committed to git, but it's a good practice to double-check. 👀
@@ -0,0 +1,36 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from typing import Any, Mapping, Optional

import requests

from airbyte_cdk.sources.declarative.requesters.paginators.strategies.page_increment import (
    PageIncrement,
)


@dataclass
class CustomPageIncrement(PageIncrement):
    """
    Starts page from 1 instead of the default value that is 0. Stops Pagination when currentPage is equal to totalPages.
    """

    def next_page_token(self, response: requests.Response, *args) -> Optional[Any]:
        res = response.json().get("response")
        currPage = res.get("currentPage")
        totalPages = res.get("pages")
        if currPage < totalPages:
            self._page += 1
            return self._page
        else:
            return None

    def __post_init__(self, parameters: Mapping[str, Any]):
        super().__post_init__(parameters)
        self._page = 1

    def reset(self):
        self._page = 1
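
The stopping rule in `CustomPageIncrement` can be exercised without `requests` or `airbyte_cdk` using the dependency-free sketch below. `FakeResponse` and `PageCounter` are hypothetical stand-ins written for this illustration; the payload shape (`response.currentPage`, `response.pages`) is taken from the component above.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class FakeResponse:
    """Stand-in for `requests.Response`; only `.json()` is needed here."""

    payload: dict

    def json(self) -> dict:
        return self.payload


class PageCounter:
    """Hypothetical analogue of CustomPageIncrement: pages start at 1."""

    def __init__(self) -> None:
        self._page = 1

    def next_page_token(self, response: FakeResponse) -> Optional[Any]:
        res = response.json().get("response", {})
        # Keep paging only while the API reports more pages after the current one.
        if res.get("currentPage", 0) < res.get("pages", 0):
            self._page += 1
            return self._page
        return None


counter = PageCounter()
tokens = [
    counter.next_page_token(FakeResponse({"response": {"currentPage": current, "pages": 3}}))
    for current in (1, 2, 3)
]
print(tokens)  # prints [2, 3, None]
```

Unlike the component above, the sketch defaults missing fields to 0, so a payload without a `response` object ends pagination instead of raising.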