diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 980b601a2..90b6875cc 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1316,6 +1316,7 @@ definitions: - "$ref": "#/definitions/KeysToLower" - "$ref": "#/definitions/KeysToSnakeCase" - "$ref": "#/definitions/FlattenFields" + - "$ref": "#/definitions/DpathFlattenFields" - "$ref": "#/definitions/KeysReplace" state_migrations: title: State Migrations @@ -1865,6 +1866,7 @@ definitions: - "$ref": "#/definitions/KeysToLower" - "$ref": "#/definitions/KeysToSnakeCase" - "$ref": "#/definitions/FlattenFields" + - "$ref": "#/definitions/DpathFlattenFields" - "$ref": "#/definitions/KeysReplace" schema_type_identifier: "$ref": "#/definitions/SchemaTypeIdentifier" @@ -1969,6 +1971,33 @@ definitions: $parameters: type: object additionalProperties: true + DpathFlattenFields: + title: Dpath Flatten Fields + description: A transformation that flattens field values to the top of the record. + type: object + required: + - type + - field_path + properties: + type: + type: string + enum: [DpathFlattenFields] + field_path: + title: Field Path + description: A path to field that needs to be flattened. + type: array + items: + - type: string + examples: + - ["data"] + - ["data", "*", "field"] + delete_origin_value: + title: Delete Origin Value + description: Whether to delete the origin value or keep it. Default is False. + type: boolean + $parameters: + type: object + additionalProperties: true KeysReplace: title: Keys Replace description: A transformation that replaces symbols in keys. 
# Declarative model for the `DpathFlattenFields` transformation. Its fields mirror the
# `DpathFlattenFields` definition in declarative_component_schema.yaml (type, field_path,
# delete_origin_value, $parameters).
class DpathFlattenFields(BaseModel):
    # Discriminator: only the literal component name is accepted.
    type: Literal["DpathFlattenFields"]
    # Required path (list of string segments) to the field to flatten; the examples show
    # that a "*" segment may appear in the path.
    field_path: List[str] = Field(
        ...,
        description="A path to field that needs to be flattened.",
        examples=[
            ["data"],
            ["data", "*", "field"],
        ],
        title="Field Path",
    )
    # Optional flag, False when omitted: the origin value is kept unless set to True.
    delete_origin_value: Optional[bool] = Field(
        False,
        description="Whether to delete the origin value or keep it. Default is False.",
        title="Delete Origin Value",
    )
    # Free-form component parameters, populated from the "$parameters" key.
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
def create_dpath_flatten_fields(
    self, model: DpathFlattenFieldsModel, config: Config, **kwargs: Any
) -> DpathFlattenFields:
    """
    Build a `DpathFlattenFields` transformation from its declarative model.

    :param model: parsed `DpathFlattenFields` model holding the field path, the optional
        delete flag and the optional `$parameters`
    :param config: connector configuration handed to the transformation
    :param kwargs: accepted for signature compatibility with the other factory methods; unused here
    :return: the configured transformation component
    """
    # Copy the path into a new list annotated with the wider element type the component declares.
    path_segments: List[Union[InterpolatedString, str]] = list(model.field_path)
    # An unset flag (None) means "keep the origin value".
    should_delete = False if model.delete_origin_value is None else model.delete_origin_value
    return DpathFlattenFields(
        config=config,
        field_path=path_segments,
        delete_origin_value=should_delete,
        parameters=model.parameters or {},
    )
@dataclass
class DpathFlattenFields(RecordTransformation):
    """
    Flatten the dictionary found at one dpath location into the top level of the record.

    The keys of the dictionary located at ``field_path`` are copied onto the record itself.
    The record is left untouched when the path does not resolve to a dictionary, or when any
    of the nested keys already exists at the top level of the record.

    Attributes:
        config: connector configuration used to evaluate the path segments.
        field_path: path to the field to flatten; every segment is wrapped with
            ``InterpolatedString.create`` and may evaluate to the ``*`` wildcard.
        parameters: init-only mapping forwarded to ``InterpolatedString.create``.
        delete_origin_value: whether to delete the origin field once it has been flattened.
            Default is False.
    """

    config: Config
    field_path: List[Union[InterpolatedString, str]]
    parameters: InitVar[Mapping[str, Any]]
    delete_origin_value: bool = False

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Wrap every path segment so it can be evaluated against the config at transform time."""
        # One pass over the segments is sufficient; there is no need to revisit `str` entries.
        self._field_path = [
            InterpolatedString.create(segment, parameters=parameters)
            for segment in self.field_path
        ]

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        Flatten, in place, the dictionary located at the configured path of ``record``.

        :param record: record to mutate
        :param config: unused; path segments are evaluated against ``self.config``
        :param stream_state: unused
        :param stream_slice: unused
        """
        resolved_path = [segment.eval(self.config) for segment in self._field_path]

        if "*" in resolved_path:
            # A wildcard can match several locations; only the first match is flattened.
            matches = dpath.values(record, resolved_path)
            extracted = matches[0] if matches else None
        else:
            # The list default is simply a non-dict sentinel for "path not found".
            extracted = dpath.get(record, resolved_path, default=[])

        if not isinstance(extracted, dict):
            return
        # Never overwrite existing top-level fields: on any key clash the record stays as is.
        if extracted.keys() & record.keys():
            return

        if self.delete_origin_value:
            # NOTE(review): with a wildcard path the full glob is handed to dpath.delete, so it
            # may remove more than the single match that was flattened above - confirm intended.
            dpath.delete(record, resolved_path)
        record.update(extracted)
# Placeholder for field values whose content is irrelevant to the assertions.
_ANY_VALUE = -1
_DELETE_ORIGIN_VALUE = True
_DO_NOT_DELETE_ORIGIN_VALUE = False


@pytest.mark.parametrize(
    [
        "input_record",
        "config",
        "field_path",
        "delete_origin_value",
        "expected_record",
    ],
    [
        pytest.param(
            {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
            {},
            ["field2"],
            _DO_NOT_DELETE_ORIGIN_VALUE,
            {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
            id="flatten by dpath, don't delete origin value",
        ),
        pytest.param(
            {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
            {},
            ["field2"],
            _DELETE_ORIGIN_VALUE,
            {"field1": _ANY_VALUE, "field3": _ANY_VALUE},
            id="flatten by dpath, delete origin value",
        ),
        pytest.param(
            {
                "field1": _ANY_VALUE,
                "field2": {"field3": {"field4": {"field5": _ANY_VALUE}}},
            },
            {},
            ["field2", "*", "field4"],
            _DO_NOT_DELETE_ORIGIN_VALUE,
            {
                "field1": _ANY_VALUE,
                "field2": {"field3": {"field4": {"field5": _ANY_VALUE}}},
                "field5": _ANY_VALUE,
            },
            id="flatten by dpath with *, don't delete origin value",
        ),
        pytest.param(
            {
                "field1": _ANY_VALUE,
                "field2": {"field3": {"field4": {"field5": _ANY_VALUE}}},
            },
            {},
            ["field2", "*", "field4"],
            _DELETE_ORIGIN_VALUE,
            {"field1": _ANY_VALUE, "field2": {"field3": {}}, "field5": _ANY_VALUE},
            id="flatten by dpath with *, delete origin value",
        ),
        pytest.param(
            {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
            {"field_path": "field2"},
            ["{{ config['field_path'] }}"],
            _DO_NOT_DELETE_ORIGIN_VALUE,
            {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
            id="flatten by dpath from config, don't delete origin value",
        ),
        pytest.param(
            {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
            {},
            ["non-existing-field"],
            _DO_NOT_DELETE_ORIGIN_VALUE,
            {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
            id="flatten by non-existing dpath, don't delete origin value",
        ),
        pytest.param(
            {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
            {},
            ["*", "non-existing-field"],
            _DO_NOT_DELETE_ORIGIN_VALUE,
            {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
            id="flatten by non-existing dpath with *, don't delete origin value",
        ),
        pytest.param(
            {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
            {},
            ["field2"],
            _DO_NOT_DELETE_ORIGIN_VALUE,
            {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
            id="flatten by dpath, not to update when record has field conflicts, don't delete origin value",
        ),
        pytest.param(
            {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
            {},
            ["field2"],
            # Must request deletion so this case differs from the previous one: on a key
            # conflict the record is expected to stay untouched, origin value included.
            _DELETE_ORIGIN_VALUE,
            {"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
            id="flatten by dpath, not to update when record has field conflicts, delete origin value",
        ),
    ],
)
def test_dpath_flatten_lists(
    input_record, config, field_path, delete_origin_value, expected_record
):
    """Check that `DpathFlattenFields.transform` mutates the record in place as expected."""
    flattener = DpathFlattenFields(
        field_path=field_path, parameters={}, config=config, delete_origin_value=delete_origin_value
    )
    flattener.transform(input_record)
    assert input_record == expected_record