feat: add CustomSchemaNormalization (#194)
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
artem1205 authored Jan 9, 2025
1 parent e78eaff commit 3c76ef3
Showing 6 changed files with 129 additions and 16 deletions.
airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -667,6 +667,28 @@ definitions:
$parameters:
type: object
additionalProperties: true
CustomSchemaNormalization:
title: Custom Schema Normalization
description: Schema normalization component whose behavior is derived from a custom code implementation of the connector.
type: object
additionalProperties: true
required:
- type
- class_name
properties:
type:
type: string
enum: [ CustomSchemaNormalization ]
class_name:
title: Class Name
description: Fully-qualified name of the class that will be implementing the custom normalization. The format is `source_<name>.<package>.<class_name>`.
type: string
additionalProperties: true
examples:
- "source_amazon_seller_partner.components.LedgerDetailedViewReportsTypeTransformer"
$parameters:
type: object
additionalProperties: true
CustomStateMigration:
title: Custom State Migration
description: Apply a custom transformation on the input state.
@@ -2600,7 +2622,11 @@ definitions:
- "$ref": "#/definitions/CustomRecordFilter"
- "$ref": "#/definitions/RecordFilter"
schema_normalization:
"$ref": "#/definitions/SchemaNormalization"
title: Schema Normalization
description: Responsible for normalization according to the schema.
anyOf:
- "$ref": "#/definitions/SchemaNormalization"
- "$ref": "#/definitions/CustomSchemaNormalization"
default: None
$parameters:
type: object
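Taken together, the schema changes above let a manifest's record selector point schema normalization at connector-supplied code instead of the built-in `SchemaNormalization` enum. A minimal sketch of the relevant manifest fragment, written here as a Python dict (the extractor `field_path` and the `class_name` are hypothetical examples):

```python
# Sketch of a record_selector entry that opts into the new component.
# The field_path and class_name values are hypothetical; class_name must
# follow the `source_<name>.<package>.<class_name>` format described above.
record_selector = {
    "type": "RecordSelector",
    "extractor": {"type": "DpathExtractor", "field_path": ["data"]},
    "schema_normalization": {
        "type": "CustomSchemaNormalization",
        "class_name": "source_example.components.ExampleTypeTransformer",
    },
}
```

When `schema_normalization` is one of the plain `SchemaNormalization` values, behavior is unchanged; the `anyOf` only adds the custom option on top of the existing default of `None`.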
2 changes: 2 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/__init__.py
@@ -9,8 +9,10 @@
from airbyte_cdk.sources.declarative.extractors.response_to_file_extractor import (
ResponseToFileExtractor,
)
from airbyte_cdk.sources.declarative.extractors.type_transformer import TypeTransformer

__all__ = [
"TypeTransformer",
"HttpSelector",
"DpathExtractor",
"RecordFilter",
12 changes: 5 additions & 7 deletions airbyte_cdk/sources/declarative/extractors/record_selector.py
@@ -10,16 +10,14 @@
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
from airbyte_cdk.sources.declarative.extractors.type_transformer import (
TypeTransformer as DeclarativeTypeTransformer,
)
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.models import SchemaNormalization
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

SCHEMA_TRANSFORMER_TYPE_MAPPING = {
SchemaNormalization.None_: TransformConfig.NoTransform,
SchemaNormalization.Default: TransformConfig.DefaultSchemaNormalization,
}
from airbyte_cdk.sources.utils.transform import TypeTransformer


@dataclass
@@ -38,7 +36,7 @@ class RecordSelector(HttpSelector):
extractor: RecordExtractor
config: Config
parameters: InitVar[Mapping[str, Any]]
schema_normalization: TypeTransformer
schema_normalization: Union[TypeTransformer, DeclarativeTypeTransformer]
name: str
_name: Union[InterpolatedString, str] = field(init=False, repr=False, default="")
record_filter: Optional[RecordFilter] = None
55 changes: 55 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/type_transformer.py
@@ -0,0 +1,55 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, Mapping


@dataclass
class TypeTransformer(ABC):
"""
Abstract base class for implementing type transformation logic.
This class provides a blueprint for defining custom transformations
on data records based on a provided schema. Implementing classes
must override the `transform` method to specify the transformation
logic.
Attributes:
None explicitly defined, as this is a dataclass intended to be
subclassed.
Methods:
transform(record: Dict[str, Any], schema: Mapping[str, Any]) -> None:
Abstract method that must be implemented by subclasses.
It performs a transformation on a given data record based
on the provided schema.
Usage:
To use this class, create a subclass that implements the
`transform` method with the desired transformation logic.
"""

@abstractmethod
def transform(
self,
record: Dict[str, Any],
schema: Mapping[str, Any],
) -> None:
"""
Perform a transformation on a data record based on a given schema.
Args:
record (Dict[str, Any]): The data record to be transformed.
schema (Mapping[str, Any]): The schema that dictates how
the record should be transformed.
Returns:
None
Raises:
NotImplementedError: If the method is not implemented
by a subclass.
"""
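A connector would supply its own subclass of this abstract `TypeTransformer` and implement `transform`, mutating the record in place. A rough sketch under that assumption (the id-coercion logic is invented for illustration; the import relies on the re-export added to `extractors/__init__.py` above):

```python
from typing import Any, Dict, Mapping

from airbyte_cdk.sources.declarative.extractors import TypeTransformer


class ExampleTypeTransformer(TypeTransformer):
    """Hypothetical transformer that coerces a string `id` field to an integer."""

    def transform(
        self,
        record: Dict[str, Any],
        schema: Mapping[str, Any],
    ) -> None:
        # Mutate the record in place; the return value is ignored.
        id_type = schema.get("properties", {}).get("id", {}).get("type")
        if id_type == "integer" and isinstance(record.get("id"), str):
            record["id"] = int(record["id"])
```

airbyte_cdk/sources/declarative/models/declarative_component_schema.py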
@@ -268,6 +268,22 @@ class Config:
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class CustomSchemaNormalization(BaseModel):
class Config:
extra = Extra.allow

type: Literal["CustomSchemaNormalization"]
class_name: str = Field(
...,
description="Fully-qualified name of the class that will be implementing the custom normalization. The format is `source_<name>.<package>.<class_name>`.",
examples=[
"source_amazon_seller_partner.components.LedgerDetailedViewReportsTypeTransformer"
],
title="Class Name",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class CustomStateMigration(BaseModel):
class Config:
extra = Extra.allow
@@ -1530,7 +1546,11 @@ class RecordSelector(BaseModel):
description="Responsible for filtering records to be emitted by the Source.",
title="Record Filter",
)
schema_normalization: Optional[SchemaNormalization] = SchemaNormalization.None_
schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field(
SchemaNormalization.None_,
description="Responsible for normalization according to the schema.",
title="Schema Normalization",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


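For reference, the generated Pydantic model mirrors the YAML definition: `type` and `class_name` are required, and extra keys are allowed. Constructing it directly might look like this (the `class_name` value is a hypothetical example):

```python
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
    CustomSchemaNormalization,
)

# class_name is hypothetical; extra keys would also be accepted because the
# model's Config sets extra = Extra.allow.
model = CustomSchemaNormalization(
    type="CustomSchemaNormalization",
    class_name="source_example.components.ExampleTypeTransformer",
)
```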
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
@@ -82,9 +82,6 @@
from airbyte_cdk.sources.declarative.extractors.record_filter import (
ClientSideIncrementalRecordFilterDecorator,
)
from airbyte_cdk.sources.declarative.extractors.record_selector import (
SCHEMA_TRANSFORMER_TYPE_MAPPING,
)
from airbyte_cdk.sources.declarative.incremental import (
ChildPartitionResumableFullRefreshCursor,
CursorFactory,
@@ -100,7 +97,9 @@
from airbyte_cdk.sources.declarative.migrations.legacy_to_per_partition_state_migration import (
LegacyToPerPartitionStateMigration,
)
from airbyte_cdk.sources.declarative.models import CustomStateMigration
from airbyte_cdk.sources.declarative.models import (
CustomStateMigration,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
AddedFieldDefinition as AddedFieldDefinitionModel,
)
@@ -185,6 +184,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CustomSchemaLoader as CustomSchemaLoader,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CustomSchemaNormalization as CustomSchemaNormalizationModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CustomTransformation as CustomTransformationModel,
)
@@ -311,6 +313,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ResponseToFileExtractor as ResponseToFileExtractorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
SchemaNormalization as SchemaNormalizationModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
SchemaTypeIdentifier as SchemaTypeIdentifierModel,
)
@@ -445,6 +450,11 @@

ComponentDefinition = Mapping[str, Any]

SCHEMA_TRANSFORMER_TYPE_MAPPING = {
SchemaNormalizationModel.None_: TransformConfig.NoTransform,
SchemaNormalizationModel.Default: TransformConfig.DefaultSchemaNormalization,
}


class ModelToComponentFactory:
EPOCH_DATETIME_FORMAT = "%s"
@@ -493,6 +503,7 @@ def _init_mappings(self) -> None:
CustomRequesterModel: self.create_custom_component,
CustomRetrieverModel: self.create_custom_component,
CustomSchemaLoader: self.create_custom_component,
CustomSchemaNormalizationModel: self.create_custom_component,
CustomStateMigration: self.create_custom_component,
CustomPaginationStrategyModel: self.create_custom_component,
CustomPartitionRouterModel: self.create_custom_component,
@@ -2000,7 +2011,6 @@ def create_record_selector(
client_side_incremental_sync: Dict[str, Any] | None = None,
**kwargs: Any,
) -> RecordSelector:
assert model.schema_normalization is not None # for mypy
extractor = self._create_component_from_model(
model=model.extractor, decoder=decoder, config=config
)
@@ -2018,8 +2028,10 @@
else None,
**client_side_incremental_sync,
)
schema_normalization = TypeTransformer(
SCHEMA_TRANSFORMER_TYPE_MAPPING[model.schema_normalization]
schema_normalization = (
TypeTransformer(SCHEMA_TRANSFORMER_TYPE_MAPPING[model.schema_normalization])
if isinstance(model.schema_normalization, SchemaNormalizationModel)
else self._create_component_from_model(model.schema_normalization, config=config) # type: ignore[arg-type] # custom normalization model expected here
)

return RecordSelector(
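The factory now resolves `schema_normalization` along two paths: the built-in `SchemaNormalization` enum values go through `SCHEMA_TRANSFORMER_TYPE_MAPPING` (relocated here from `record_selector.py`) into the legacy `TypeTransformer`, while a `CustomSchemaNormalization` model is handed to `create_custom_component`, which imports the class named by `class_name`. A rough sketch of the built-in path, assuming the legacy transformer coerces values toward the JSON schema types in place:

```python
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

# SchemaNormalization.Default maps to TransformConfig.DefaultSchemaNormalization.
normalizer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)

record = {"id": "42"}
schema = {"type": "object", "properties": {"id": {"type": "integer"}}}
normalizer.transform(record, schema)
# If the default normalization behaves as assumed, record is now {"id": 42}.
```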
