Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code): add DpathFlattenFields #227

Merged
merged 5 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1316,6 +1316,7 @@ definitions:
- "$ref": "#/definitions/KeysToLower"
- "$ref": "#/definitions/KeysToSnakeCase"
- "$ref": "#/definitions/FlattenFields"
- "$ref": "#/definitions/DpathFlattenFields"
darynaishchenko marked this conversation as resolved.
Show resolved Hide resolved
- "$ref": "#/definitions/KeysReplace"
state_migrations:
title: State Migrations
Expand Down Expand Up @@ -1865,6 +1866,7 @@ definitions:
- "$ref": "#/definitions/KeysToLower"
- "$ref": "#/definitions/KeysToSnakeCase"
- "$ref": "#/definitions/FlattenFields"
- "$ref": "#/definitions/DpathFlattenFields"
- "$ref": "#/definitions/KeysReplace"
schema_type_identifier:
"$ref": "#/definitions/SchemaTypeIdentifier"
Expand Down Expand Up @@ -1969,6 +1971,33 @@ definitions:
$parameters:
type: object
additionalProperties: true
DpathFlattenFields:
title: Dpath Flatten Fields
description: A transformation that flatten field values to the to top of the record.
darynaishchenko marked this conversation as resolved.
Show resolved Hide resolved
type: object
required:
- type
- field_path
properties:
type:
type: string
enum: [DpathFlattenFields]
field_path:
title: Field Path
description: A path to field that needs to be flattened.
type: array
items:
- type: string
examples:
- ["data"]
- ["data", "*", "field"]
delete_origin_value:
title: Delete Origin Value
description: Whether to delete the origin value or keep it. Default is False.
type: boolean
$parameters:
type: object
additionalProperties: true
KeysReplace:
title: Keys Replace
description: A transformation that replaces symbols in keys.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,25 @@ class FlattenFields(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class DpathFlattenFields(BaseModel):
type: Literal["DpathFlattenFields"]
field_path: List[str] = Field(
...,
description="A path to field that needs to be flattened.",
examples=[
["data"],
["data", "*", "field"],
],
title="Field Path",
)
delete_origin_value: Optional[bool] = Field(
False,
description="Whether to delete the origin value or keep it. Default is False.",
title="Delete Origin Value",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class KeysReplace(BaseModel):
type: Literal["KeysReplace"]
old: str = Field(
Expand Down Expand Up @@ -1810,6 +1829,7 @@ class Config:
KeysToLower,
KeysToSnakeCase,
FlattenFields,
DpathFlattenFields,
KeysReplace,
]
]
Expand Down Expand Up @@ -1985,6 +2005,7 @@ class DynamicSchemaLoader(BaseModel):
KeysToLower,
KeysToSnakeCase,
FlattenFields,
DpathFlattenFields,
KeysReplace,
]
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DpathExtractor as DpathExtractorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DpathFlattenFields as DpathFlattenFieldsModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DynamicSchemaLoader as DynamicSchemaLoaderModel,
)
Expand Down Expand Up @@ -430,6 +433,9 @@
RemoveFields,
)
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import (
DpathFlattenFields,
)
from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
FlattenFields,
)
Expand Down Expand Up @@ -538,6 +544,7 @@ def _init_mappings(self) -> None:
KeysToSnakeCaseModel: self.create_keys_to_snake_transformation,
KeysReplaceModel: self.create_keys_replace_transformation,
FlattenFieldsModel: self.create_flatten_fields,
DpathFlattenFieldsModel: self.create_dpath_flatten_fields,
IterableDecoderModel: self.create_iterable_decoder,
XmlDecoderModel: self.create_xml_decoder,
JsonFileSchemaLoaderModel: self.create_json_file_schema_loader,
Expand Down Expand Up @@ -672,6 +679,19 @@ def create_flatten_fields(
flatten_lists=model.flatten_lists if model.flatten_lists is not None else True
)

def create_dpath_flatten_fields(
self, model: DpathFlattenFieldsModel, config: Config, **kwargs: Any
) -> DpathFlattenFields:
model_field_path: List[Union[InterpolatedString, str]] = [x for x in model.field_path]
return DpathFlattenFields(
config=config,
field_path=model_field_path,
delete_origin_value=model.delete_origin_value
if model.delete_origin_value is not None
else False,
parameters=model.parameters or {},
)

@staticmethod
def _json_schema_type_name_to_type(value_type: Optional[ValueType]) -> Optional[Type[Any]]:
if not value_type:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from dataclasses import InitVar, dataclass
from typing import Any, Dict, List, Mapping, Optional, Union

import dpath

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class DpathFlattenFields(RecordTransformation):
"""
Flatten fields only for provided path.

field_path: List[Union[InterpolatedString, str]] path to the field to flatten.
delete_origin_value: bool = False whether to delete origin field or keep it. Default is False.

"""

config: Config
field_path: List[Union[InterpolatedString, str]]
parameters: InitVar[Mapping[str, Any]]
delete_origin_value: bool = False

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._field_path = [
InterpolatedString.create(path, parameters=parameters) for path in self.field_path
]
for path_index in range(len(self.field_path)):
if isinstance(self.field_path[path_index], str):
self._field_path[path_index] = InterpolatedString.create(
self.field_path[path_index], parameters=parameters
)

def transform(
self,
record: Dict[str, Any],
config: Optional[Config] = None,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
) -> None:
path = [path.eval(self.config) for path in self._field_path]
if "*" in path:
matched = dpath.values(record, path)
extracted = matched[0] if matched else None
else:
extracted = dpath.get(record, path, default=[])

darynaishchenko marked this conversation as resolved.
Show resolved Hide resolved
if isinstance(extracted, dict):
conflicts = set(extracted.keys()) & set(record.keys())
if not conflicts:
if self.delete_origin_value:
dpath.delete(record, path)
record.update(extracted)
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import pytest

from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import DpathFlattenFields

_ANY_VALUE = -1
_DELETE_ORIGIN_VALUE = True
_DO_NOT_DELETE_ORIGIN_VALUE = False


@pytest.mark.parametrize(
[
"input_record",
"config",
"field_path",
"delete_origin_value",
"expected_record",
],
[
pytest.param(
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
{},
["field2"],
_DO_NOT_DELETE_ORIGIN_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
id="flatten by dpath, don't delete origin value",
),
pytest.param(
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
{},
["field2"],
_DELETE_ORIGIN_VALUE,
{"field1": _ANY_VALUE, "field3": _ANY_VALUE},
id="flatten by dpath, delete origin value",
),
pytest.param(
{
"field1": _ANY_VALUE,
"field2": {"field3": {"field4": {"field5": _ANY_VALUE}}},
},
{},
["field2", "*", "field4"],
_DO_NOT_DELETE_ORIGIN_VALUE,
{
"field1": _ANY_VALUE,
"field2": {"field3": {"field4": {"field5": _ANY_VALUE}}},
"field5": _ANY_VALUE,
},
id="flatten by dpath with *, don't delete origin value",
),
pytest.param(
{
"field1": _ANY_VALUE,
"field2": {"field3": {"field4": {"field5": _ANY_VALUE}}},
},
{},
["field2", "*", "field4"],
_DELETE_ORIGIN_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": {}}, "field5": _ANY_VALUE},
id="flatten by dpath with *, delete origin value",
),
pytest.param(
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
{"field_path": "field2"},
["{{ config['field_path'] }}"],
_DO_NOT_DELETE_ORIGIN_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
id="flatten by dpath from config, don't delete origin value",
),
pytest.param(
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
{},
["non-existing-field"],
_DO_NOT_DELETE_ORIGIN_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
id="flatten by non-existing dpath, don't delete origin value",
),
pytest.param(
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
{},
["*", "non-existing-field"],
_DO_NOT_DELETE_ORIGIN_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
id="flatten by non-existing dpath with *, don't delete origin value",
),
pytest.param(
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
{},
["field2"],
_DO_NOT_DELETE_ORIGIN_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
id="flatten by dpath, not to update when record has field conflicts, don't delete origin value",
),
pytest.param(
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
{},
["field2"],
_DO_NOT_DELETE_ORIGIN_VALUE,
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
id="flatten by dpath, not to update when record has field conflicts, delete origin value",
),
],
)
def test_dpath_flatten_lists(
input_record, config, field_path, delete_origin_value, expected_record
):
flattener = DpathFlattenFields(
field_path=field_path, parameters={}, config=config, delete_origin_value=delete_origin_value
)
flattener.transform(input_record)
assert input_record == expected_record
Loading