Skip to content

Commit 8ff298e

Browse files
author
Rusi Popov
committed
Merge branch 'main' into 50395-2-2
* main: feat(low-code): add DpathFlattenFields (airbytehq#227)
2 parents 71e3c70 + 17dd71f commit 8ff298e

File tree

5 files changed

+235
-0
lines changed

5 files changed

+235
-0
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1316,6 +1316,7 @@ definitions:
13161316
- "$ref": "#/definitions/KeysToLower"
13171317
- "$ref": "#/definitions/KeysToSnakeCase"
13181318
- "$ref": "#/definitions/FlattenFields"
1319+
- "$ref": "#/definitions/DpathFlattenFields"
13191320
- "$ref": "#/definitions/KeysReplace"
13201321
state_migrations:
13211322
title: State Migrations
@@ -1866,6 +1867,7 @@ definitions:
18661867
- "$ref": "#/definitions/KeysToLower"
18671868
- "$ref": "#/definitions/KeysToSnakeCase"
18681869
- "$ref": "#/definitions/FlattenFields"
1870+
- "$ref": "#/definitions/DpathFlattenFields"
18691871
- "$ref": "#/definitions/KeysReplace"
18701872
schema_type_identifier:
18711873
"$ref": "#/definitions/SchemaTypeIdentifier"
@@ -1970,6 +1972,33 @@ definitions:
19701972
$parameters:
19711973
type: object
19721974
additionalProperties: true
1975+
DpathFlattenFields:
1976+
title: Dpath Flatten Fields
1977+
description: A transformation that flatten field values to the to top of the record.
1978+
type: object
1979+
required:
1980+
- type
1981+
- field_path
1982+
properties:
1983+
type:
1984+
type: string
1985+
enum: [DpathFlattenFields]
1986+
field_path:
1987+
title: Field Path
1988+
description: A path to field that needs to be flattened.
1989+
type: array
1990+
items:
1991+
- type: string
1992+
examples:
1993+
- ["data"]
1994+
- ["data", "*", "field"]
1995+
delete_origin_value:
1996+
title: Delete Origin Value
1997+
description: Whether to delete the origin value or keep it. Default is False.
1998+
type: boolean
1999+
$parameters:
2000+
type: object
2001+
additionalProperties: true
19732002
KeysReplace:
19742003
title: Keys Replace
19752004
description: A transformation that replaces symbols in keys.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -792,6 +792,25 @@ class FlattenFields(BaseModel):
792792
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
793793

794794

795+
class DpathFlattenFields(BaseModel):
796+
type: Literal["DpathFlattenFields"]
797+
field_path: List[str] = Field(
798+
...,
799+
description="A path to field that needs to be flattened.",
800+
examples=[
801+
["data"],
802+
["data", "*", "field"],
803+
],
804+
title="Field Path",
805+
)
806+
delete_origin_value: Optional[bool] = Field(
807+
False,
808+
description="Whether to delete the origin value or keep it. Default is False.",
809+
title="Delete Origin Value",
810+
)
811+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
812+
813+
795814
class KeysReplace(BaseModel):
796815
type: Literal["KeysReplace"]
797816
old: str = Field(
@@ -1819,6 +1838,7 @@ class Config:
18191838
KeysToLower,
18201839
KeysToSnakeCase,
18211840
FlattenFields,
1841+
DpathFlattenFields,
18221842
KeysReplace,
18231843
]
18241844
]
@@ -1994,6 +2014,7 @@ class DynamicSchemaLoader(BaseModel):
19942014
KeysToLower,
19952015
KeysToSnakeCase,
19962016
FlattenFields,
2017+
DpathFlattenFields,
19972018
KeysReplace,
19982019
]
19992020
]

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,9 @@
211211
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
212212
DpathExtractor as DpathExtractorModel,
213213
)
214+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
215+
DpathFlattenFields as DpathFlattenFieldsModel,
216+
)
214217
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
215218
DynamicSchemaLoader as DynamicSchemaLoaderModel,
216219
)
@@ -434,6 +437,9 @@
434437
RemoveFields,
435438
)
436439
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
440+
from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import (
441+
DpathFlattenFields,
442+
)
437443
from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
438444
FlattenFields,
439445
)
@@ -542,6 +548,7 @@ def _init_mappings(self) -> None:
542548
KeysToSnakeCaseModel: self.create_keys_to_snake_transformation,
543549
KeysReplaceModel: self.create_keys_replace_transformation,
544550
FlattenFieldsModel: self.create_flatten_fields,
551+
DpathFlattenFieldsModel: self.create_dpath_flatten_fields,
545552
IterableDecoderModel: self.create_iterable_decoder,
546553
XmlDecoderModel: self.create_xml_decoder,
547554
JsonFileSchemaLoaderModel: self.create_json_file_schema_loader,
@@ -677,6 +684,19 @@ def create_flatten_fields(
677684
flatten_lists=model.flatten_lists if model.flatten_lists is not None else True
678685
)
679686

687+
def create_dpath_flatten_fields(
688+
self, model: DpathFlattenFieldsModel, config: Config, **kwargs: Any
689+
) -> DpathFlattenFields:
690+
model_field_path: List[Union[InterpolatedString, str]] = [x for x in model.field_path]
691+
return DpathFlattenFields(
692+
config=config,
693+
field_path=model_field_path,
694+
delete_origin_value=model.delete_origin_value
695+
if model.delete_origin_value is not None
696+
else False,
697+
parameters=model.parameters or {},
698+
)
699+
680700
@staticmethod
681701
def _json_schema_type_name_to_type(value_type: Optional[ValueType]) -> Optional[Type[Any]]:
682702
if not value_type:
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
from dataclasses import InitVar, dataclass
2+
from typing import Any, Dict, List, Mapping, Optional, Union
3+
4+
import dpath
5+
6+
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
7+
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
8+
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
9+
10+
11+
@dataclass
12+
class DpathFlattenFields(RecordTransformation):
13+
"""
14+
Flatten fields only for provided path.
15+
16+
field_path: List[Union[InterpolatedString, str]] path to the field to flatten.
17+
delete_origin_value: bool = False whether to delete origin field or keep it. Default is False.
18+
19+
"""
20+
21+
config: Config
22+
field_path: List[Union[InterpolatedString, str]]
23+
parameters: InitVar[Mapping[str, Any]]
24+
delete_origin_value: bool = False
25+
26+
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
27+
self._field_path = [
28+
InterpolatedString.create(path, parameters=parameters) for path in self.field_path
29+
]
30+
for path_index in range(len(self.field_path)):
31+
if isinstance(self.field_path[path_index], str):
32+
self._field_path[path_index] = InterpolatedString.create(
33+
self.field_path[path_index], parameters=parameters
34+
)
35+
36+
def transform(
37+
self,
38+
record: Dict[str, Any],
39+
config: Optional[Config] = None,
40+
stream_state: Optional[StreamState] = None,
41+
stream_slice: Optional[StreamSlice] = None,
42+
) -> None:
43+
path = [path.eval(self.config) for path in self._field_path]
44+
if "*" in path:
45+
matched = dpath.values(record, path)
46+
extracted = matched[0] if matched else None
47+
else:
48+
extracted = dpath.get(record, path, default=[])
49+
50+
if isinstance(extracted, dict):
51+
conflicts = set(extracted.keys()) & set(record.keys())
52+
if not conflicts:
53+
if self.delete_origin_value:
54+
dpath.delete(record, path)
55+
record.update(extracted)
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
import pytest
2+
3+
from airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields import DpathFlattenFields
4+
5+
_ANY_VALUE = -1
6+
_DELETE_ORIGIN_VALUE = True
7+
_DO_NOT_DELETE_ORIGIN_VALUE = False
8+
9+
10+
@pytest.mark.parametrize(
11+
[
12+
"input_record",
13+
"config",
14+
"field_path",
15+
"delete_origin_value",
16+
"expected_record",
17+
],
18+
[
19+
pytest.param(
20+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
21+
{},
22+
["field2"],
23+
_DO_NOT_DELETE_ORIGIN_VALUE,
24+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
25+
id="flatten by dpath, don't delete origin value",
26+
),
27+
pytest.param(
28+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
29+
{},
30+
["field2"],
31+
_DELETE_ORIGIN_VALUE,
32+
{"field1": _ANY_VALUE, "field3": _ANY_VALUE},
33+
id="flatten by dpath, delete origin value",
34+
),
35+
pytest.param(
36+
{
37+
"field1": _ANY_VALUE,
38+
"field2": {"field3": {"field4": {"field5": _ANY_VALUE}}},
39+
},
40+
{},
41+
["field2", "*", "field4"],
42+
_DO_NOT_DELETE_ORIGIN_VALUE,
43+
{
44+
"field1": _ANY_VALUE,
45+
"field2": {"field3": {"field4": {"field5": _ANY_VALUE}}},
46+
"field5": _ANY_VALUE,
47+
},
48+
id="flatten by dpath with *, don't delete origin value",
49+
),
50+
pytest.param(
51+
{
52+
"field1": _ANY_VALUE,
53+
"field2": {"field3": {"field4": {"field5": _ANY_VALUE}}},
54+
},
55+
{},
56+
["field2", "*", "field4"],
57+
_DELETE_ORIGIN_VALUE,
58+
{"field1": _ANY_VALUE, "field2": {"field3": {}}, "field5": _ANY_VALUE},
59+
id="flatten by dpath with *, delete origin value",
60+
),
61+
pytest.param(
62+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
63+
{"field_path": "field2"},
64+
["{{ config['field_path'] }}"],
65+
_DO_NOT_DELETE_ORIGIN_VALUE,
66+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
67+
id="flatten by dpath from config, don't delete origin value",
68+
),
69+
pytest.param(
70+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
71+
{},
72+
["non-existing-field"],
73+
_DO_NOT_DELETE_ORIGIN_VALUE,
74+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
75+
id="flatten by non-existing dpath, don't delete origin value",
76+
),
77+
pytest.param(
78+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
79+
{},
80+
["*", "non-existing-field"],
81+
_DO_NOT_DELETE_ORIGIN_VALUE,
82+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}},
83+
id="flatten by non-existing dpath with *, don't delete origin value",
84+
),
85+
pytest.param(
86+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
87+
{},
88+
["field2"],
89+
_DO_NOT_DELETE_ORIGIN_VALUE,
90+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
91+
id="flatten by dpath, not to update when record has field conflicts, don't delete origin value",
92+
),
93+
pytest.param(
94+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
95+
{},
96+
["field2"],
97+
_DO_NOT_DELETE_ORIGIN_VALUE,
98+
{"field1": _ANY_VALUE, "field2": {"field3": _ANY_VALUE}, "field3": _ANY_VALUE},
99+
id="flatten by dpath, not to update when record has field conflicts, delete origin value",
100+
),
101+
],
102+
)
103+
def test_dpath_flatten_lists(
104+
input_record, config, field_path, delete_origin_value, expected_record
105+
):
106+
flattener = DpathFlattenFields(
107+
field_path=field_path, parameters={}, config=config, delete_origin_value=delete_origin_value
108+
)
109+
flattener.transform(input_record)
110+
assert input_record == expected_record

0 commit comments

Comments
 (0)