diff --git a/airbyte_cdk/cli/source_declarative_manifest/_run.py b/airbyte_cdk/cli/source_declarative_manifest/_run.py index 232ac302f..5def00602 100644 --- a/airbyte_cdk/cli/source_declarative_manifest/_run.py +++ b/airbyte_cdk/cli/source_declarative_manifest/_run.py @@ -171,6 +171,12 @@ def create_declarative_source( "Invalid config: `__injected_declarative_manifest` should be provided at the root " f"of the config but config only has keys: {list(config.keys() if config else [])}" ) + if not isinstance(config["__injected_declarative_manifest"], dict): + raise ValueError( + "Invalid config: `__injected_declarative_manifest` should be a dictionary, " + f"but got type: {type(config['__injected_declarative_manifest'])}" + ) + return ConcurrentDeclarativeSource( config=config, catalog=catalog, diff --git a/airbyte_cdk/connector_builder/connector_builder_handler.py b/airbyte_cdk/connector_builder/connector_builder_handler.py index 2c241f6fb..b2a728570 100644 --- a/airbyte_cdk/connector_builder/connector_builder_handler.py +++ b/airbyte_cdk/connector_builder/connector_builder_handler.py @@ -52,6 +52,7 @@ def get_limits(config: Mapping[str, Any]) -> TestReadLimits: def create_source(config: Mapping[str, Any], limits: TestReadLimits) -> ManifestDeclarativeSource: manifest = config["__injected_declarative_manifest"] return ManifestDeclarativeSource( + config=config, emit_connector_builder_messages=True, source_config=manifest, component_factory=ModelToComponentFactory( diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 7de937782..5db0b0909 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -77,6 +77,7 @@ def __init__( super().__init__( source_config=source_config, + config=config, debug=debug, emit_connector_builder_messages=emit_connector_builder_messages, component_factory=component_factory, diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 4282f7fc7..deef5a3be 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -7,6 +7,7 @@ import pkgutil from copy import deepcopy from importlib import metadata +from types import ModuleType from typing import Any, Dict, Iterator, List, Mapping, Optional, Set import yaml @@ -32,6 +33,9 @@ DeclarativeStream as DeclarativeStreamModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel +from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import ( + get_registered_components_module, +) from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import ( ManifestComponentTransformer, ) @@ -59,22 +63,29 @@ class ManifestDeclarativeSource(DeclarativeSource): def __init__( self, source_config: ConnectionDefinition, + *, + config: Mapping[str, Any] | None = None, debug: bool = False, emit_connector_builder_messages: bool = False, component_factory: Optional[ModelToComponentFactory] = None, ): """ - :param source_config(Mapping[str, Any]): The manifest of low-code components that describe the source connector - :param debug(bool): True if debug mode is enabled - :param component_factory(ModelToComponentFactory): optional factory if ModelToComponentFactory's default behaviour needs to be tweaked + Args: + config: The provided config dict. + source_config: The manifest of low-code components that describe the source connector. + debug: True if debug mode is enabled. + emit_connector_builder_messages: True if messages should be emitted to the connector builder. + component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked. """ self.logger = logging.getLogger(f"airbyte.{self.name}") - # For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing manifest = dict(source_config) if "type" not in manifest: manifest["type"] = "DeclarativeSource" + # If custom components are needed, locate and/or register them. + self.components_module: ModuleType | None = get_registered_components_module(config=config) + resolved_source_config = ManifestReferenceResolver().preprocess_manifest(manifest) propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters( "", resolved_source_config, {} diff --git a/airbyte_cdk/sources/declarative/parsers/custom_code_compiler.py b/airbyte_cdk/sources/declarative/parsers/custom_code_compiler.py new file mode 100644 index 000000000..8a6638fad --- /dev/null +++ b/airbyte_cdk/sources/declarative/parsers/custom_code_compiler.py @@ -0,0 +1,143 @@ +"""Contains functions to compile custom code from text.""" + +import hashlib +import os +import sys +from collections.abc import Mapping +from types import ModuleType +from typing import Any, cast + +from typing_extensions import Literal + +ChecksumType = Literal["md5", "sha256"] +CHECKSUM_FUNCTIONS = { + "md5": hashlib.md5, + "sha256": hashlib.sha256, +} +COMPONENTS_MODULE_NAME = "components" +SDM_COMPONENTS_MODULE_NAME = "source_declarative_manifest.components" +INJECTED_MANIFEST = "__injected_declarative_manifest" +INJECTED_COMPONENTS_PY = "__injected_components_py" +INJECTED_COMPONENTS_PY_CHECKSUMS = "__injected_components_py_checksums" +ENV_VAR_ALLOW_CUSTOM_CODE = "AIRBYTE_ALLOW_CUSTOM_CODE" + + +class AirbyteCodeTamperedError(Exception): + """Raised when the connector's components module does not match its checksum. + + This is a fatal error, as it can be a sign of code tampering. + """ + + +class AirbyteCustomCodeNotPermittedError(Exception): + """Raised when custom code is attempted to be run in an environment that does not support it.""" + + def __init__(self) -> None: + super().__init__( + "Custom connector code is not permitted in this environment. " + "If you need to run custom code, please ask your administrator to set the `AIRBYTE_ALLOW_CUSTOM_CODE` " + "environment variable to 'true' in your Airbyte environment. " + "If you see this message in Airbyte Cloud, your workspace does not allow executing " + "custom connector code." + ) + + +def _hash_text(input_text: str, hash_type: str = "md5") -> str: + """Return the hash of the input text using the specified hash type.""" + if not input_text: + raise ValueError("Input text cannot be empty.") + + hash_object = CHECKSUM_FUNCTIONS[hash_type]() + hash_object.update(input_text.encode()) + return hash_object.hexdigest() + + +def custom_code_execution_permitted() -> bool: + """Return `True` if custom code execution is permitted, otherwise `False`. + + Custom code execution is permitted if the `AIRBYTE_ALLOW_CUSTOM_CODE` environment variable is set to 'true'. + """ + return os.environ.get(ENV_VAR_ALLOW_CUSTOM_CODE, "").lower() == "true" + + +def validate_python_code( + code_text: str, + checksums: dict[str, str] | None, +) -> None: + """Validate the provided Python code text against the provided checksums. + + Currently we fail if no checksums are provided, although this may change in the future. + """ + if not checksums: + raise ValueError(f"A checksum is required to validate the code. Received: {checksums}") + + for checksum_type, checksum in checksums.items(): + if checksum_type not in CHECKSUM_FUNCTIONS: + raise ValueError( + f"Unsupported checksum type: {checksum_type}. Supported checksum types are: {CHECKSUM_FUNCTIONS.keys()}" + ) + + if _hash_text(code_text, checksum_type) != checksum: + raise AirbyteCodeTamperedError(f"{checksum_type} checksum does not match.") + + +def get_registered_components_module( + config: Mapping[str, Any] | None, +) -> ModuleType | None: + """Get a components module object based on the provided config. + + If custom python components is provided, this will be loaded. Otherwise, we will + attempt to load from the `components` module already imported/registered in sys.modules. + + If custom `components.py` text is provided in config, it will be registered with sys.modules + so that it can be later imported by manifest declarations which reference the provided classes. + + Returns `None` if no components is provided and the `components` module is not found. + """ + if config and INJECTED_COMPONENTS_PY in config: + if not custom_code_execution_permitted(): + raise AirbyteCustomCodeNotPermittedError + + # Create a new module object and execute the provided Python code text within it + python_text: str = config[INJECTED_COMPONENTS_PY] + return register_components_module_from_string( + components_py_text=python_text, + checksums=config.get(INJECTED_COMPONENTS_PY_CHECKSUMS, None), + ) + + # Check for `components` or `source_declarative_manifest.components`. + if SDM_COMPONENTS_MODULE_NAME in sys.modules: + return cast(ModuleType, sys.modules.get(SDM_COMPONENTS_MODULE_NAME)) + + if COMPONENTS_MODULE_NAME in sys.modules: + return cast(ModuleType, sys.modules.get(COMPONENTS_MODULE_NAME)) + + # Could not find module 'components' in `sys.modules` + # and INJECTED_COMPONENTS_PY was not provided in config. + return None + + +def register_components_module_from_string( + components_py_text: str, + checksums: dict[str, Any] | None, +) -> ModuleType: + """Load and return the components module from a provided string containing the python code.""" + # First validate the code + validate_python_code( + code_text=components_py_text, + checksums=checksums, + ) + + # Create a new module object + components_module = ModuleType(name=COMPONENTS_MODULE_NAME) + + # Execute the module text in the module's namespace + exec(components_py_text, components_module.__dict__) + + # Register the module in `sys.modules`` so it can be imported as + # `source_declarative_manifest.components` and/or `components`. + sys.modules[SDM_COMPONENTS_MODULE_NAME] = components_module + sys.modules[COMPONENTS_MODULE_NAME] = components_module + + # Now you can import and use the module + return components_module diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 12a7ea2cf..3fcb0928e 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -5,9 +5,9 @@ from __future__ import annotations import datetime -import importlib import inspect import re +import sys from functools import partial from typing import ( Any, @@ -363,6 +363,10 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ZipfileDecoder as ZipfileDecoderModel, ) +from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import ( + COMPONENTS_MODULE_NAME, + SDM_COMPONENTS_MODULE_NAME, +) from airbyte_cdk.sources.declarative.partition_routers import ( CartesianProductStreamSlicer, ListPartitionRouter, @@ -1102,7 +1106,6 @@ def create_custom_component(self, model: Any, config: Config, **kwargs: Any) -> :param config: The custom defined connector config :return: The declarative component built from the Pydantic model to be used at runtime """ - custom_component_class = self._get_class_from_fully_qualified_class_name(model.class_name) component_fields = get_type_hints(custom_component_class) model_args = model.dict() @@ -1156,14 +1159,35 @@ def create_custom_component(self, model: Any, config: Config, **kwargs: Any) -> return custom_component_class(**kwargs) @staticmethod - def _get_class_from_fully_qualified_class_name(full_qualified_class_name: str) -> Any: + def _get_class_from_fully_qualified_class_name( + full_qualified_class_name: str, + ) -> Any: + """Get a class from its fully qualified name. + + If a custom components module is needed, we assume it is already registered - probably + as `source_declarative_manifest.components` or `components`. + + Args: + full_qualified_class_name (str): The fully qualified name of the class (e.g., "module.ClassName"). + + Returns: + Any: The class object. + + Raises: + ValueError: If the class cannot be loaded. + """ split = full_qualified_class_name.split(".") - module = ".".join(split[:-1]) + module_name_full = ".".join(split[:-1]) class_name = split[-1] + + if module_name_full == COMPONENTS_MODULE_NAME: + # Assume "components" on its own means "source_declarative_manifest.components" + module_name_full = SDM_COMPONENTS_MODULE_NAME + try: - return getattr(importlib.import_module(module), class_name) - except AttributeError: - raise ValueError(f"Could not load class {full_qualified_class_name}.") + return getattr(sys.modules[module_name_full], class_name) + except (AttributeError, ModuleNotFoundError) as e: + raise ValueError(f"Could not load class {full_qualified_class_name}.") from e @staticmethod def _derive_component_type_from_type_hints(field_type: Any) -> Optional[str]: diff --git a/airbyte_cdk/test/utils/manifest_only_fixtures.py b/airbyte_cdk/test/utils/manifest_only_fixtures.py index 47620e7c1..28015d05b 100644 --- a/airbyte_cdk/test/utils/manifest_only_fixtures.py +++ b/airbyte_cdk/test/utils/manifest_only_fixtures.py @@ -4,7 +4,6 @@ import importlib.util from pathlib import Path from types import ModuleType -from typing import Optional import pytest @@ -30,7 +29,7 @@ def connector_dir(request: pytest.FixtureRequest) -> Path: @pytest.fixture(scope="session") -def components_module(connector_dir: Path) -> Optional[ModuleType]: +def components_module(connector_dir: Path) -> ModuleType | None: """Load and return the components module from the connector directory. This assumes the components module is located at /components.py. diff --git a/pyproject.toml b/pyproject.toml index 0bba75f5a..d8e3034e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -125,6 +125,7 @@ select = ["I"] [tool.poe.tasks] # Installation install = { shell = "poetry install --all-extras" } +lock = { shell = "poetry lock --no-update" } # Build tasks assemble = {cmd = "bin/generate-component-manifest-dagger.sh", help = "Generate component manifest files."} diff --git a/unit_tests/connector_builder/test_connector_builder_handler.py b/unit_tests/connector_builder/test_connector_builder_handler.py index aac00a889..c00a7e2f1 100644 --- a/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/unit_tests/connector_builder/test_connector_builder_handler.py @@ -344,7 +344,7 @@ def test_resolve_manifest(valid_resolve_manifest_config_file): config = copy.deepcopy(RESOLVE_MANIFEST_CONFIG) command = "resolve_manifest" config["__command"] = command - source = ManifestDeclarativeSource(MANIFEST) + source = ManifestDeclarativeSource(source_config=MANIFEST) limits = TestReadLimits() resolved_manifest = handle_connector_builder_request( source, command, config, create_configured_catalog("dummy_stream"), _A_STATE, limits @@ -505,7 +505,7 @@ def resolved_manifest(self): def test_read(): config = TEST_READ_CONFIG - source = ManifestDeclarativeSource(MANIFEST) + source = ManifestDeclarativeSource(source_config=MANIFEST) real_record = AirbyteRecordMessage( data={"id": "1234", "key": "value"}, emitted_at=1, stream=_stream_name @@ -592,7 +592,7 @@ def test_config_update() -> None: "client_secret": "a client secret", "refresh_token": "a refresh token", } - source = ManifestDeclarativeSource(manifest) + source = ManifestDeclarativeSource(source_config=manifest) refresh_request_response = { "access_token": "an updated access token", diff --git a/unit_tests/source_declarative_manifest/resources/source_the_guardian_api/.gitignore b/unit_tests/source_declarative_manifest/resources/source_the_guardian_api/.gitignore new file mode 100644 index 000000000..c4ab49a30 --- /dev/null +++ b/unit_tests/source_declarative_manifest/resources/source_the_guardian_api/.gitignore @@ -0,0 +1 @@ +secrets* diff --git a/unit_tests/source_declarative_manifest/resources/source_the_guardian_api/README.md b/unit_tests/source_declarative_manifest/resources/source_the_guardian_api/README.md new file mode 100644 index 000000000..403a4baba --- /dev/null +++ b/unit_tests/source_declarative_manifest/resources/source_the_guardian_api/README.md @@ -0,0 +1,9 @@ +# The Guardian API Tests + +For these tests to work, you'll need to create a `secrets.yaml` file in this directory that looks like this: + +```yml +api_key: ****** +``` + +The `.gitignore` file in this directory should ensure your file is not committed to git, but it's a good practice to double-check. 👀 diff --git a/unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py b/unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py new file mode 100644 index 000000000..98a9f7ad5 --- /dev/null +++ b/unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py @@ -0,0 +1,61 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from dataclasses import InitVar, dataclass +from typing import Any, Mapping, Optional, Union + +import requests + +from airbyte_cdk.sources.declarative.interpolation import InterpolatedString +from airbyte_cdk.sources.declarative.requesters.paginators import PaginationStrategy +from airbyte_cdk.sources.declarative.types import Config, Record + + +@dataclass +class CustomPageIncrement(PaginationStrategy): + """ + Starts page from 1 instead of the default value that is 0. Stops Pagination when currentPage is equal to totalPages. + """ + + config: Config + page_size: Optional[Union[str, int]] + parameters: InitVar[Mapping[str, Any]] + start_from_page: int = 0 + inject_on_first_request: bool = False + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + if isinstance(self.page_size, int) or (self.page_size is None): + self._page_size = self.page_size + else: + page_size = InterpolatedString(self.page_size, parameters=parameters).eval(self.config) + if not isinstance(page_size, int): + raise Exception(f"{page_size} is of type {type(page_size)}. Expected {int}") + self._page_size = page_size + + @property + def initial_token(self) -> Optional[Any]: + if self.inject_on_first_request: + return self.start_from_page + return None + + def next_page_token( + self, + response: requests.Response, + last_page_size: int, + last_record: Optional[Record], + last_page_token_value: Optional[Any], + ) -> Optional[Any]: + res = response.json().get("response") + current_page = res.get("currentPage") + total_pages = res.get("pages") + + # The first request to the API does not include the page_token, so it comes in as None when determing whether to paginate + last_page_token_value = last_page_token_value or 0 + if current_page < total_pages: + return last_page_token_value + 1 + else: + return None + + def get_page_size(self) -> Optional[int]: + return self._page_size diff --git a/unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components_failing.py b/unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components_failing.py new file mode 100644 index 000000000..8655bdf2d --- /dev/null +++ b/unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components_failing.py @@ -0,0 +1,54 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from dataclasses import InitVar, dataclass +from typing import Any, Mapping, Optional, Union + +import requests + +from airbyte_cdk.sources.declarative.interpolation import InterpolatedString +from airbyte_cdk.sources.declarative.requesters.paginators import PaginationStrategy +from airbyte_cdk.sources.declarative.types import Config, Record + + +class IntentionalException(Exception): + """This exception is raised intentionally in order to test error handling.""" + + +@dataclass +class CustomPageIncrement(PaginationStrategy): + """ + Starts page from 1 instead of the default value that is 0. Stops Pagination when currentPage is equal to totalPages. + """ + + config: Config + page_size: Optional[Union[str, int]] + parameters: InitVar[Mapping[str, Any]] + start_from_page: int = 0 + inject_on_first_request: bool = False + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + if isinstance(self.page_size, int) or (self.page_size is None): + self._page_size = self.page_size + else: + page_size = InterpolatedString(self.page_size, parameters=parameters).eval(self.config) + if not isinstance(page_size, int): + raise Exception(f"{page_size} is of type {type(page_size)}. Expected {int}") + self._page_size = page_size + + @property + def initial_token(self) -> Optional[Any]: + raise IntentionalException() + + def next_page_token( + self, + response: requests.Response, + last_page_size: int, + last_record: Optional[Record], + last_page_token_value: Optional[Any], + ) -> Optional[Any]: + raise IntentionalException() + + def get_page_size(self) -> Optional[int]: + return self._page_size diff --git a/unit_tests/source_declarative_manifest/resources/source_the_guardian_api/manifest.yaml b/unit_tests/source_declarative_manifest/resources/source_the_guardian_api/manifest.yaml new file mode 100644 index 000000000..a42e0ebba --- /dev/null +++ b/unit_tests/source_declarative_manifest/resources/source_the_guardian_api/manifest.yaml @@ -0,0 +1,376 @@ +version: "4.3.2" +definitions: + selector: + extractor: + field_path: + - response + - results + requester: + url_base: "https://content.guardianapis.com" + http_method: "GET" + request_parameters: + api-key: "{{ config['api_key'] }}" + q: "{{ config['query'] }}" + tag: "{{ config['tag'] }}" + section: "{{ config['section'] }}" + order-by: "oldest" + incremental_sync: + type: DatetimeBasedCursor + start_datetime: + datetime: "{{ config['start_date'] }}" + datetime_format: "%Y-%m-%d" + end_datetime: + datetime: "{{ config['end_date'] or now_utc().strftime('%Y-%m-%d') }}" + datetime_format: "%Y-%m-%d" + step: "P7D" + datetime_format: "%Y-%m-%dT%H:%M:%SZ" + cursor_granularity: "PT1S" + cursor_field: "webPublicationDate" + start_time_option: + field_name: "from-date" + inject_into: "request_parameter" + end_time_option: + field_name: "to-date" + inject_into: "request_parameter" + retriever: + record_selector: + extractor: + field_path: + - response + - results + paginator: + type: DefaultPaginator + pagination_strategy: + type: CustomPaginationStrategy + class_name: "CustomPageIncrement" + page_size: 10 + page_token_option: + type: RequestOption + inject_into: "request_parameter" + field_name: "page" + page_size_option: + inject_into: "body_data" + field_name: "page_size" + requester: + url_base: "https://content.guardianapis.com" + http_method: "GET" + request_parameters: + api-key: "{{ config['api_key'] }}" + q: "{{ config['query'] }}" + tag: "{{ config['tag'] }}" + section: "{{ config['section'] }}" + order-by: "oldest" + base_stream: + incremental_sync: + type: DatetimeBasedCursor + start_datetime: + datetime: "{{ config['start_date'] }}" + datetime_format: "%Y-%m-%d" + end_datetime: + datetime: "{{ config['end_date'] or now_utc().strftime('%Y-%m-%d') }}" + datetime_format: "%Y-%m-%d" + step: "P7D" + datetime_format: "%Y-%m-%dT%H:%M:%SZ" + cursor_granularity: "PT1S" + cursor_field: "webPublicationDate" + start_time_option: + field_name: "from-date" + inject_into: "request_parameter" + end_time_option: + field_name: "to-date" + inject_into: "request_parameter" + retriever: + record_selector: + extractor: + field_path: + - response + - results + paginator: + type: DefaultPaginator + pagination_strategy: + type: CustomPaginationStrategy + class_name: "CustomPageIncrement" + page_size: 10 + page_token_option: + type: RequestOption + inject_into: "request_parameter" + field_name: "page" + page_size_option: + inject_into: "body_data" + field_name: "page_size" + requester: + url_base: "https://content.guardianapis.com" + http_method: "GET" + request_parameters: + api-key: "{{ config['api_key'] }}" + q: "{{ config['query'] }}" + tag: "{{ config['tag'] }}" + section: "{{ config['section'] }}" + order-by: "oldest" + content_stream: + incremental_sync: + type: DatetimeBasedCursor + start_datetime: + datetime: "{{ config['start_date'] }}" + datetime_format: "%Y-%m-%d" + end_datetime: + datetime: "{{ config['end_date'] or now_utc().strftime('%Y-%m-%d') }}" + datetime_format: "%Y-%m-%d" + step: "P7D" + datetime_format: "%Y-%m-%dT%H:%M:%SZ" + cursor_granularity: "PT1S" + cursor_field: "webPublicationDate" + start_time_option: + field_name: "from-date" + inject_into: "request_parameter" + end_time_option: + field_name: "to-date" + inject_into: "request_parameter" + retriever: + record_selector: + extractor: + field_path: + - response + - results + paginator: + type: "DefaultPaginator" + pagination_strategy: + type: CustomPaginationStrategy + class_name: "components.CustomPageIncrement" + page_size: 10 + page_token_option: + type: RequestOption + inject_into: "request_parameter" + field_name: "page" + page_size_option: + inject_into: "body_data" + field_name: "page_size" + requester: + url_base: "https://content.guardianapis.com" + http_method: "GET" + request_parameters: + api-key: "{{ config['api_key'] }}" + q: "{{ config['query'] }}" + tag: "{{ config['tag'] }}" + section: "{{ config['section'] }}" + order-by: "oldest" + schema_loader: + type: InlineSchemaLoader + schema: + $schema: http://json-schema.org/draft-04/schema# + type: object + properties: + id: + type: string + type: + type: string + sectionId: + type: string + sectionName: + type: string + webPublicationDate: + type: string + webTitle: + type: string + webUrl: + type: string + apiUrl: + type: string + isHosted: + type: boolean + pillarId: + type: string + pillarName: + type: string + required: + - id + - type + - sectionId + - sectionName + - webPublicationDate + - webTitle + - webUrl + - apiUrl + - isHosted + - pillarId + - pillarName +streams: + - incremental_sync: + type: DatetimeBasedCursor + start_datetime: + datetime: "{{ config['start_date'] }}" + datetime_format: "%Y-%m-%d" + type: MinMaxDatetime + end_datetime: + datetime: "{{ config['end_date'] or now_utc().strftime('%Y-%m-%d') }}" + datetime_format: "%Y-%m-%d" + type: MinMaxDatetime + step: "P7D" + datetime_format: "%Y-%m-%dT%H:%M:%SZ" + cursor_granularity: "PT1S" + cursor_field: "webPublicationDate" + start_time_option: + field_name: "from-date" + inject_into: "request_parameter" + type: RequestOption + end_time_option: + field_name: "to-date" + inject_into: "request_parameter" + type: RequestOption + retriever: + record_selector: + extractor: + field_path: + - response + - results + type: DpathExtractor + type: RecordSelector + paginator: + type: "DefaultPaginator" + pagination_strategy: + class_name: components.CustomPageIncrement + page_size: 10 + type: CustomPaginationStrategy + page_token_option: + type: RequestOption + inject_into: "request_parameter" + field_name: "page" + page_size_option: + inject_into: "body_data" + field_name: "page_size" + type: RequestOption + requester: + url_base: "https://content.guardianapis.com" + http_method: "GET" + request_parameters: + api-key: "{{ config['api_key'] }}" + q: "{{ config['query'] }}" + tag: "{{ config['tag'] }}" + section: "{{ config['section'] }}" + order-by: "oldest" + type: HttpRequester + path: "/search" + type: SimpleRetriever + schema_loader: + type: InlineSchemaLoader + schema: + $schema: http://json-schema.org/draft-04/schema# + type: object + properties: + id: + type: string + type: + type: string + sectionId: + type: string + sectionName: + type: string + webPublicationDate: + type: string + webTitle: + type: string + webUrl: + type: string + apiUrl: + type: string + isHosted: + type: boolean + pillarId: + type: string + pillarName: + type: string + required: + - id + - type + - sectionId + - sectionName + - webPublicationDate + - webTitle + - webUrl + - apiUrl + - isHosted + - pillarId + - pillarName + type: DeclarativeStream + name: "content" + primary_key: "id" +check: + stream_names: + - "content" + type: CheckStream +type: DeclarativeSource +spec: + type: Spec + documentation_url: https://docs.airbyte.com/integrations/sources/the-guardian-api + connection_specification: + $schema: http://json-schema.org/draft-07/schema# + title: The Guardian Api Spec + type: object + required: + - api_key + - start_date + additionalProperties: true + properties: + api_key: + title: API Key + type: string + description: + Your API Key. See here. + The key is case sensitive. + airbyte_secret: true + start_date: + title: Start Date + type: string + description: + Use this to set the minimum date (YYYY-MM-DD) of the results. + Results older than the start_date will not be shown. + pattern: ^([1-9][0-9]{3})\-(0?[1-9]|1[012])\-(0?[1-9]|[12][0-9]|3[01])$ + examples: + - YYYY-MM-DD + query: + title: Query + type: string + description: + (Optional) The query (q) parameter filters the results to only + those that include that search term. The q parameter supports AND, OR and + NOT operators. + examples: + - environment AND NOT water + - environment AND political + - amusement park + - political + tag: + title: Tag + type: string + description: + (Optional) A tag is a piece of data that is used by The Guardian + to categorise content. Use this parameter to filter results by showing only + the ones matching the entered tag. See here + for a list of all tags, and here + for the tags endpoint documentation. + examples: + - environment/recycling + - environment/plasticbags + - environment/energyefficiency + section: + title: Section + type: string + description: + (Optional) Use this to filter the results by a particular section. + See here + for a list of all sections, and here + for the sections endpoint documentation. + examples: + - media + - technology + - housing-network + end_date: + title: End Date + type: string + description: + (Optional) Use this to set the maximum date (YYYY-MM-DD) of the + results. Results newer than the end_date will not be shown. Default is set + to the current date (today) for incremental syncs. + pattern: ^([1-9][0-9]{3})\-(0?[1-9]|1[012])\-(0?[1-9]|[12][0-9]|3[01])$ + examples: + - YYYY-MM-DD diff --git a/unit_tests/source_declarative_manifest/resources/source_the_guardian_api/valid_config.yaml b/unit_tests/source_declarative_manifest/resources/source_the_guardian_api/valid_config.yaml new file mode 100644 index 000000000..b2f752ea1 --- /dev/null +++ b/unit_tests/source_declarative_manifest/resources/source_the_guardian_api/valid_config.yaml @@ -0,0 +1 @@ +{ "start_date": "2024-01-01" } diff --git a/unit_tests/source_declarative_manifest/test_source_declarative_w_custom_components.py b/unit_tests/source_declarative_manifest/test_source_declarative_w_custom_components.py new file mode 100644 index 000000000..d608e7620 --- /dev/null +++ b/unit_tests/source_declarative_manifest/test_source_declarative_w_custom_components.py @@ -0,0 +1,304 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +import datetime +import json +import logging +import os +import sys +import types +from collections.abc import Callable, Mapping +from pathlib import Path +from tempfile import NamedTemporaryFile +from typing import Any + +import pytest +import yaml +from airbyte_protocol_dataclasses.models.airbyte_protocol import AirbyteCatalog + +from airbyte_cdk.cli.source_declarative_manifest._run import ( + create_declarative_source, +) +from airbyte_cdk.models import ConfiguredAirbyteCatalog, ConfiguredAirbyteStream +from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource +from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import ( + ENV_VAR_ALLOW_CUSTOM_CODE, + INJECTED_COMPONENTS_PY, + INJECTED_COMPONENTS_PY_CHECKSUMS, + INJECTED_MANIFEST, + AirbyteCodeTamperedError, + AirbyteCustomCodeNotPermittedError, + _hash_text, + custom_code_execution_permitted, + register_components_module_from_string, +) + +SAMPLE_COMPONENTS_PY_TEXT = """ +def sample_function() -> str: + return "Hello, World!" + +class SimpleClass: + def sample_method(self) -> str: + return sample_function() +""" + + +def get_fixture_path(file_name) -> str: + return os.path.join(os.path.dirname(__file__), file_name) + + +def test_components_module_from_string() -> None: + # Call the function to get the module + components_module: types.ModuleType = register_components_module_from_string( + components_py_text=SAMPLE_COMPONENTS_PY_TEXT, + checksums={ + "md5": _hash_text(SAMPLE_COMPONENTS_PY_TEXT, "md5"), + }, + ) + + # Check that the module is created and is of the correct type + assert isinstance(components_module, types.ModuleType) + + # Check that the function is correctly defined in the module + assert hasattr(components_module, "sample_function") + + # Check that simple functions are callable + assert components_module.sample_function() == "Hello, World!" + + # Check class definitions work as expected + assert isinstance(components_module.SimpleClass, type) + obj = components_module.SimpleClass() + assert isinstance(obj, components_module.SimpleClass) + assert obj.sample_method() == "Hello, World!" + + # Check we can get the class definition from sys.modules + module_lookup = sys.modules[components_module.__name__] + class_lookup = getattr(sys.modules[components_module.__name__], "SimpleClass") + + assert module_lookup == components_module + assert class_lookup == components_module.SimpleClass + assert class_lookup().sample_method() == "Hello, World!" + + # Check we can import the module by name + from source_declarative_manifest.components import sample_function as imported_sample_function # type: ignore [import] # noqa: I001 + + assert imported_sample_function() == "Hello, World!" + + +def get_py_components_config_dict( + *, + failing_components: bool = False, + needs_secrets: bool = True, +) -> dict[str, Any]: + connector_dir = Path(get_fixture_path("resources/source_the_guardian_api")) + manifest_yml_path: Path = connector_dir / "manifest.yaml" + custom_py_code_path: Path = connector_dir / ( + "components.py" if not failing_components else "components_failing.py" + ) + config_yaml_path: Path = connector_dir / "valid_config.yaml" + secrets_yaml_path: Path = connector_dir / "secrets.yaml" + + manifest_dict = yaml.safe_load(manifest_yml_path.read_text()) + assert manifest_dict, "Failed to load the manifest file." + assert isinstance( + manifest_dict, Mapping + ), f"Manifest file is type {type(manifest_dict).__name__}, not a mapping: {manifest_dict}" + + custom_py_code = custom_py_code_path.read_text() + combined_config_dict = { + INJECTED_MANIFEST: manifest_dict, + INJECTED_COMPONENTS_PY: custom_py_code, + INJECTED_COMPONENTS_PY_CHECKSUMS: { + "md5": _hash_text(custom_py_code, "md5"), + "sha256": _hash_text(custom_py_code, "sha256"), + }, + } + combined_config_dict.update(yaml.safe_load(config_yaml_path.read_text())) + if needs_secrets: + combined_config_dict.update(yaml.safe_load(secrets_yaml_path.read_text())) + + return combined_config_dict + + +def test_missing_checksum_fails_to_run( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Assert that missing checksum in the config will raise an error.""" + monkeypatch.setenv(ENV_VAR_ALLOW_CUSTOM_CODE, "true") + + py_components_config_dict = get_py_components_config_dict( + needs_secrets=False, + ) + # Truncate the start_date to speed up tests + py_components_config_dict["start_date"] = ( + datetime.datetime.now() - datetime.timedelta(days=2) + ).strftime("%Y-%m-%d") + + py_components_config_dict.pop("__injected_components_py_checksums") + + with NamedTemporaryFile(delete=False, suffix=".json") as temp_config_file: + json_str = json.dumps(py_components_config_dict) + Path(temp_config_file.name).write_text(json_str) + temp_config_file.flush() + with pytest.raises(ValueError): + source = create_declarative_source( + ["check", "--config", temp_config_file.name], + ) + + +@pytest.mark.parametrize( + "hash_type", + [ + "md5", + "sha256", + ], +) +def test_invalid_checksum_fails_to_run( + hash_type: str, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Assert that an invalid checksum in the config will raise an error.""" + monkeypatch.setenv(ENV_VAR_ALLOW_CUSTOM_CODE, "true") + + py_components_config_dict = get_py_components_config_dict( + needs_secrets=False, + ) + # Truncate the start_date to speed up tests + py_components_config_dict["start_date"] = ( + datetime.datetime.now() - datetime.timedelta(days=2) + ).strftime("%Y-%m-%d") + + py_components_config_dict["__injected_components_py_checksums"][hash_type] = "invalid_checksum" + + with NamedTemporaryFile(delete=False, suffix=".json") as temp_config_file: + json_str = json.dumps(py_components_config_dict) + Path(temp_config_file.name).write_text(json_str) + temp_config_file.flush() + with pytest.raises(AirbyteCodeTamperedError): + source = create_declarative_source( + ["check", "--config", temp_config_file.name], + ) + + +@pytest.mark.parametrize( + "env_value, should_raise", + [ + ("true", False), + ("True", False), + ("TRUE", False), + ("1", True), # Not accepted as truthy as of now + ("false", True), + ("False", True), + ("", True), + ("0", True), + ], +) +def test_fail_unless_custom_code_enabled_explicitly( + env_value: str | None, + should_raise: bool, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Assert that we properly fail if the environment variable to allow custom code is not set. + + A missing value should fail. + Any value other than "true" (case insensitive) should fail. + """ + monkeypatch.delenv(ENV_VAR_ALLOW_CUSTOM_CODE, raising=False) + if env_value is not None: + monkeypatch.setenv(ENV_VAR_ALLOW_CUSTOM_CODE, env_value) + + assert custom_code_execution_permitted() == (not should_raise) + + py_components_config_dict = get_py_components_config_dict( + needs_secrets=False, + ) + # Truncate the start_date to speed up tests + py_components_config_dict["start_date"] = ( + datetime.datetime.now() - datetime.timedelta(days=2) + ).strftime("%Y-%m-%d") + + with NamedTemporaryFile(delete=False, suffix=".json") as temp_config_file: + json_str = json.dumps(py_components_config_dict) + Path(temp_config_file.name).write_text(json_str) + temp_config_file.flush() + fn: Callable = lambda: create_declarative_source( + ["check", "--config", temp_config_file.name], + ) + if should_raise: + with pytest.raises(AirbyteCustomCodeNotPermittedError): + fn() + + return # Success + + fn() + + +# TODO: Create a new test source that doesn't require credentials to run. +@pytest.mark.skipif( + condition=not Path(get_fixture_path("resources/source_the_guardian_api/secrets.yaml")).exists(), + reason="Skipped due to missing 'secrets.yaml'.", +) +@pytest.mark.parametrize( + "failing_components", + [ + pytest.param(False, marks=pytest.mark.slow), # Slow because we run a full sync + True, + ], +) +def test_sync_with_injected_py_components( + failing_components: bool, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv(ENV_VAR_ALLOW_CUSTOM_CODE, "true") + + py_components_config_dict = get_py_components_config_dict( + failing_components=failing_components, + ) + # Truncate the start_date to speed up tests + py_components_config_dict["start_date"] = ( + datetime.datetime.now() - datetime.timedelta(days=2) + ).strftime("%Y-%m-%d") + assert isinstance(py_components_config_dict, dict) + assert "__injected_declarative_manifest" in py_components_config_dict + assert "__injected_components_py" in py_components_config_dict + assert "__injected_components_py_checksums" in py_components_config_dict + + with NamedTemporaryFile(delete=False, suffix=".json") as temp_config_file: + json_str = json.dumps(py_components_config_dict) + Path(temp_config_file.name).write_text(json_str) + temp_config_file.flush() + source = create_declarative_source( + ["check", "--config", temp_config_file.name], + ) + assert isinstance(source, ManifestDeclarativeSource) + source.check(logger=logging.getLogger(), config=py_components_config_dict) + catalog: AirbyteCatalog = source.discover( + logger=logging.getLogger(), config=py_components_config_dict + ) + assert isinstance(catalog, AirbyteCatalog) + configured_catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=stream, + sync_mode="full_refresh", + destination_sync_mode="overwrite", + ) + for stream in catalog.streams + ] + ) + + msg_iterator = source.read( + logger=logging.getLogger(), + config=py_components_config_dict, + catalog=configured_catalog, + state=None, + ) + if failing_components: + with pytest.raises(Exception): + for msg in msg_iterator: + assert msg + return + + for msg in msg_iterator: + assert msg diff --git a/unit_tests/sources/declarative/test_manifest_declarative_source.py b/unit_tests/sources/declarative/test_manifest_declarative_source.py index b3c9ab4bb..b20adaa93 100644 --- a/unit_tests/sources/declarative/test_manifest_declarative_source.py +++ b/unit_tests/sources/declarative/test_manifest_declarative_source.py @@ -16,6 +16,7 @@ import yaml from jsonschema.exceptions import ValidationError +import unit_tests.sources.declarative.external_component # Needed for dynamic imports to work from airbyte_cdk.models import ( AirbyteLogMessage, AirbyteMessage, @@ -264,6 +265,11 @@ def test_valid_manifest(self): ], "check": {"type": "CheckStream", "stream_names": ["lists"]}, } + assert "unit_tests" in sys.modules + assert "unit_tests.sources" in sys.modules + assert "unit_tests.sources.declarative" in sys.modules + assert "unit_tests.sources.declarative.external_component" in sys.modules + source = ManifestDeclarativeSource(source_config=manifest) check_stream = source.connection_checker