From 3c76ef385e1920989ba193fbd107b8937be4e22f Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants <36314070+artem1205@users.noreply.github.com> Date: Thu, 9 Jan 2025 15:03:21 +0100 Subject: [PATCH] feat: add `CustomSchemaNormalization` (#194) Signed-off-by: Artem Inzhyyants --- .../declarative_component_schema.yaml | 28 +++++++++- .../declarative/extractors/__init__.py | 2 + .../declarative/extractors/record_selector.py | 12 ++-- .../extractors/type_transformer.py | 55 +++++++++++++++++++ .../models/declarative_component_schema.py | 22 +++++++- .../parsers/model_to_component_factory.py | 26 ++++++--- 6 files changed, 129 insertions(+), 16 deletions(-) create mode 100644 airbyte_cdk/sources/declarative/extractors/type_transformer.py diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index a9d4f255..144ba5e3 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -667,6 +667,28 @@ definitions: $parameters: type: object additionalProperties: true + CustomSchemaNormalization: + title: Custom Schema Normalization + description: Schema normalization component whose behavior is derived from a custom code implementation of the connector. + type: object + additionalProperties: true + required: + - type + - class_name + properties: + type: + type: string + enum: [ CustomSchemaNormalization ] + class_name: + title: Class Name + description: Fully-qualified name of the class that will be implementing the custom normalization. The format is `source_..`. + type: string + additionalProperties: true + examples: + - "source_amazon_seller_partner.components.LedgerDetailedViewReportsTypeTransformer" + $parameters: + type: object + additionalProperties: true CustomStateMigration: title: Custom State Migration description: Apply a custom transformation on the input state. @@ -2600,7 +2622,11 @@ definitions: - "$ref": "#/definitions/CustomRecordFilter" - "$ref": "#/definitions/RecordFilter" schema_normalization: - "$ref": "#/definitions/SchemaNormalization" + title: Schema Normalization + description: Responsible for normalization according to the schema. + anyOf: + - "$ref": "#/definitions/SchemaNormalization" + - "$ref": "#/definitions/CustomSchemaNormalization" default: None $parameters: type: object diff --git a/airbyte_cdk/sources/declarative/extractors/__init__.py b/airbyte_cdk/sources/declarative/extractors/__init__.py index aacac665..8f1d18d1 100644 --- a/airbyte_cdk/sources/declarative/extractors/__init__.py +++ b/airbyte_cdk/sources/declarative/extractors/__init__.py @@ -9,8 +9,10 @@ from airbyte_cdk.sources.declarative.extractors.response_to_file_extractor import ( ResponseToFileExtractor, ) +from airbyte_cdk.sources.declarative.extractors.type_transformer import TypeTransformer __all__ = [ + "TypeTransformer", "HttpSelector", "DpathExtractor", "RecordFilter", diff --git a/airbyte_cdk/sources/declarative/extractors/record_selector.py b/airbyte_cdk/sources/declarative/extractors/record_selector.py index b2eed93b..f29b8a75 100644 --- a/airbyte_cdk/sources/declarative/extractors/record_selector.py +++ b/airbyte_cdk/sources/declarative/extractors/record_selector.py @@ -10,16 +10,14 @@ from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter +from airbyte_cdk.sources.declarative.extractors.type_transformer import ( + TypeTransformer as DeclarativeTypeTransformer, +) from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.models import SchemaNormalization from airbyte_cdk.sources.declarative.transformations import RecordTransformation from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState -from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer - -SCHEMA_TRANSFORMER_TYPE_MAPPING = { - SchemaNormalization.None_: TransformConfig.NoTransform, - SchemaNormalization.Default: TransformConfig.DefaultSchemaNormalization, -} +from airbyte_cdk.sources.utils.transform import TypeTransformer @dataclass @@ -38,7 +36,7 @@ class RecordSelector(HttpSelector): extractor: RecordExtractor config: Config parameters: InitVar[Mapping[str, Any]] - schema_normalization: TypeTransformer + schema_normalization: Union[TypeTransformer, DeclarativeTypeTransformer] name: str _name: Union[InterpolatedString, str] = field(init=False, repr=False, default="") record_filter: Optional[RecordFilter] = None diff --git a/airbyte_cdk/sources/declarative/extractors/type_transformer.py b/airbyte_cdk/sources/declarative/extractors/type_transformer.py new file mode 100644 index 00000000..fe307684 --- /dev/null +++ b/airbyte_cdk/sources/declarative/extractors/type_transformer.py @@ -0,0 +1,55 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Any, Dict, Mapping + + +@dataclass +class TypeTransformer(ABC): + """ + Abstract base class for implementing type transformation logic. + + This class provides a blueprint for defining custom transformations + on data records based on a provided schema. Implementing classes + must override the `transform` method to specify the transformation + logic. + + Attributes: + None explicitly defined, as this is a dataclass intended to be + subclassed. + + Methods: + transform(record: Dict[str, Any], schema: Mapping[str, Any]) -> None: + Abstract method that must be implemented by subclasses. + It performs a transformation on a given data record based + on the provided schema. + + Usage: + To use this class, create a subclass that implements the + `transform` method with the desired transformation logic. + """ + + @abstractmethod + def transform( + self, + record: Dict[str, Any], + schema: Mapping[str, Any], + ) -> None: + """ + Perform a transformation on a data record based on a given schema. + + Args: + record (Dict[str, Any]): The data record to be transformed. + schema (Mapping[str, Any]): The schema that dictates how + the record should be transformed. + + Returns: + None + + Raises: + NotImplementedError: If the method is not implemented + by a subclass. + """ diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 6b70b3fd..66d4c0ad 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -268,6 +268,22 @@ class Config: parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class CustomSchemaNormalization(BaseModel): + class Config: + extra = Extra.allow + + type: Literal["CustomSchemaNormalization"] + class_name: str = Field( + ..., + description="Fully-qualified name of the class that will be implementing the custom normalization. The format is `source_..`.", + examples=[ + "source_amazon_seller_partner.components.LedgerDetailedViewReportsTypeTransformer" + ], + title="Class Name", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class CustomStateMigration(BaseModel): class Config: extra = Extra.allow @@ -1530,7 +1546,11 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[SchemaNormalization] = SchemaNormalization.None_ + schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( + SchemaNormalization.None_, + description="Responsible for normalization according to the schema.", + title="Schema Normalization", + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index bd164abc..39c07b04 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -82,9 +82,6 @@ from airbyte_cdk.sources.declarative.extractors.record_filter import ( ClientSideIncrementalRecordFilterDecorator, ) -from airbyte_cdk.sources.declarative.extractors.record_selector import ( - SCHEMA_TRANSFORMER_TYPE_MAPPING, -) from airbyte_cdk.sources.declarative.incremental import ( ChildPartitionResumableFullRefreshCursor, CursorFactory, @@ -100,7 +97,9 @@ from airbyte_cdk.sources.declarative.migrations.legacy_to_per_partition_state_migration import ( LegacyToPerPartitionStateMigration, ) -from airbyte_cdk.sources.declarative.models import CustomStateMigration +from airbyte_cdk.sources.declarative.models import ( + CustomStateMigration, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( AddedFieldDefinition as AddedFieldDefinitionModel, ) @@ -185,6 +184,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CustomSchemaLoader as CustomSchemaLoader, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + CustomSchemaNormalization as CustomSchemaNormalizationModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CustomTransformation as CustomTransformationModel, ) @@ -311,6 +313,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ResponseToFileExtractor as ResponseToFileExtractorModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + SchemaNormalization as SchemaNormalizationModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( SchemaTypeIdentifier as SchemaTypeIdentifierModel, ) @@ -445,6 +450,11 @@ ComponentDefinition = Mapping[str, Any] +SCHEMA_TRANSFORMER_TYPE_MAPPING = { + SchemaNormalizationModel.None_: TransformConfig.NoTransform, + SchemaNormalizationModel.Default: TransformConfig.DefaultSchemaNormalization, +} + class ModelToComponentFactory: EPOCH_DATETIME_FORMAT = "%s" @@ -493,6 +503,7 @@ def _init_mappings(self) -> None: CustomRequesterModel: self.create_custom_component, CustomRetrieverModel: self.create_custom_component, CustomSchemaLoader: self.create_custom_component, + CustomSchemaNormalizationModel: self.create_custom_component, CustomStateMigration: self.create_custom_component, CustomPaginationStrategyModel: self.create_custom_component, CustomPartitionRouterModel: self.create_custom_component, @@ -2000,7 +2011,6 @@ def create_record_selector( client_side_incremental_sync: Dict[str, Any] | None = None, **kwargs: Any, ) -> RecordSelector: - assert model.schema_normalization is not None # for mypy extractor = self._create_component_from_model( model=model.extractor, decoder=decoder, config=config ) @@ -2018,8 +2028,10 @@ def create_record_selector( else None, **client_side_incremental_sync, ) - schema_normalization = TypeTransformer( - SCHEMA_TRANSFORMER_TYPE_MAPPING[model.schema_normalization] + schema_normalization = ( + TypeTransformer(SCHEMA_TRANSFORMER_TYPE_MAPPING[model.schema_normalization]) + if isinstance(model.schema_normalization, SchemaNormalizationModel) + else self._create_component_from_model(model.schema_normalization, config=config) # type: ignore[arg-type] # custom normalization model expected here ) return RecordSelector(