Skip to content

Commit

Permalink
Add flatten fields
Browse files Browse the repository at this point in the history
  • Loading branch information
lazebnyi committed Dec 18, 2024
1 parent adef1e8 commit 9b364f3
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 0 deletions.
13 changes: 13 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1852,6 +1852,19 @@ definitions:
$parameters:
type: object
additionalProperties: true
FlattenFields:
title: Flatten Fields
description: A transformation that flatten record to single level format.
type: object
required:
- type
properties:
type:
type: string
enum: [FlattenFields]
$parameters:
type: object
additionalProperties: true
IterableDecoder:
title: Iterable Decoder
description: Use this if the response consists of strings separated by new lines (`\n`). The Decoder will wrap each row into a JSON object with the `record` key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,11 @@ class KeysToSnakeCase(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class FlattenFields(BaseModel):
type: Literal["FlattenFields"]
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class IterableDecoder(BaseModel):
type: Literal["IterableDecoder"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ExponentialBackoffStrategy as ExponentialBackoffStrategyModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
FlattenFields as FlattenFieldsModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
GzipJsonDecoder as GzipJsonDecoderModel,
)
Expand Down Expand Up @@ -387,6 +390,9 @@
RemoveFields,
)
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
FlattenFields,
)
from airbyte_cdk.sources.declarative.transformations.keys_to_lower_transformation import (
KeysToLowerTransformation,
)
Expand Down Expand Up @@ -472,6 +478,7 @@ def _init_mappings(self) -> None:
JsonlDecoderModel: self.create_jsonl_decoder,
GzipJsonDecoderModel: self.create_gzipjson_decoder,
KeysToLowerModel: self.create_keys_to_lower_transformation,
FlattenFieldsModel: self.create_flatten_fields,
IterableDecoderModel: self.create_iterable_decoder,
XmlDecoderModel: self.create_xml_decoder,
JsonFileSchemaLoaderModel: self.create_json_file_schema_loader,
Expand Down Expand Up @@ -587,6 +594,11 @@ def create_keys_to_lower_transformation(
) -> KeysToLowerTransformation:
return KeysToLowerTransformation()

def create_flatten_fields(
self, model: FlattenFieldsModel, config: Config, **kwargs: Any
) -> FlattenFields:
return FlattenFields()

@staticmethod
def _json_schema_type_name_to_type(value_type: Optional[ValueType]) -> Optional[Type[Any]]:
if not value_type:
Expand Down
50 changes: 50 additions & 0 deletions airbyte_cdk/sources/declarative/transformations/flatten_fields.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from typing import Any, Dict, Optional

from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class FlattenFields(RecordTransformation):
def transform(
self,
record: Dict[str, Any],
config: Optional[Config] = None,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
) -> None:
transformed_record = self.flatten_record(record)
record.clear()
record.update(transformed_record)

def flatten_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
stack = [(record, "_")]
transformed_record = {}
force_with_parent_name = False

while stack:
current_record, parent_key = stack.pop()

if isinstance(current_record, dict):
for current_key, value in current_record.items():
new_key = (
f"{parent_key}.{current_key}"
if (current_key in transformed_record or force_with_parent_name)
else current_key
)
stack.append((value, new_key))

elif isinstance(current_record, list):
for i, item in enumerate(current_record):
force_with_parent_name = True
stack.append((item, f"{parent_key}.{i}"))

else:
transformed_record[parent_key] = current_record

return transformed_record
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import pytest

from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
FlattenFields,
)


@pytest.mark.parametrize(
"input_record, expected_output",
[
({"FirstName": "John", "LastName": "Doe"}, {"FirstName": "John", "LastName": "Doe"}),
({"123Number": 123, "456Another123": 456}, {"123Number": 123, "456Another123": 456}),
(
{
"NestedRecord": {"FirstName": "John", "LastName": "Doe"},
"456Another123": 456,
},
{
"FirstName": "John",
"LastName": "Doe",
"456Another123": 456,
},
),
(
{"ListExample": [{"A": "a"}, {"A": "b"}]},
{"ListExample.0.A": "a", "ListExample.1.A": "b"},
),
(
{
"MixedCase123": {
"Nested": [{"Key": {"Value": "test1"}}, {"Key": {"Value": "test2"}}]
},
"SimpleKey": "SimpleValue",
},
{
"Nested.0.Key.Value": "test1",
"Nested.1.Key.Value": "test2",
"SimpleKey": "SimpleValue",
},
),
(
{"List": ["Item1", "Item2", "Item3"]},
{"List.0": "Item1", "List.1": "Item2", "List.2": "Item3"},
),
],
)
def test_flatten_fields(input_record, expected_output):
flattener = FlattenFields()
flattener.transform(input_record)
assert input_record == expected_output

0 comments on commit 9b364f3

Please sign in to comment.