Skip to content

Commit

Permalink
Add KeyToSnakeCase transformation
Browse files Browse the repository at this point in the history
  • Loading branch information
lazebnyi committed Dec 17, 2024
1 parent 216cd43 commit 453580d
Show file tree
Hide file tree
Showing 7 changed files with 530 additions and 384 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1234,6 +1234,7 @@ definitions:
- "$ref": "#/definitions/CustomTransformation"
- "$ref": "#/definitions/RemoveFields"
- "$ref": "#/definitions/KeysToLower"
- "$ref": "#/definitions/KeyToSnakeCase"
state_migrations:
title: State Migrations
description: Array of state migrations to be applied on the input state
Expand Down Expand Up @@ -1838,6 +1839,19 @@ definitions:
$parameters:
type: object
additionalProperties: true
# Schema for the KeyToSnakeCase transformation component.
KeyToSnakeCase:
  title: Key to Snake Case
  description: A transformation that renames all keys to snake case.
  type: object
  required:
    - type
  properties:
    # Discriminator: the single allowed value is KeyToSnakeCase.
    type:
      type: string
      enum: [KeyToSnakeCase]
    # Optional object; any additional properties are accepted.
    $parameters:
      type: object
      additionalProperties: true
IterableDecoder:
title: Iterable Decoder
description: Use this if the response consists of strings separated by new lines (`\n`). The Decoder will wrap each row into a JSON object with the `record` key.
Expand Down Expand Up @@ -2160,7 +2174,9 @@ definitions:
description: |-
The DeclarativeOAuth Specific optional headers to inject while exchanging the `auth_code` to `access_token` during `completeOAuthFlow` step.
examples:
- {"Authorization": "Basic {base64Encoder:{client_id}:{client_secret}}"}
- {
"Authorization": "Basic {base64Encoder:{client_id}:{client_secret}}",
}
access_token_params:
title: Access Token Query Params (Json Encoded)
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,11 @@ class KeysToLower(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class KeyToSnakeCase(BaseModel):
    # Model for the "KeyToSnakeCase" transformation component.
    # Comments are used instead of a docstring so the model's generated
    # schema is left untouched.

    # Discriminator: the only accepted value is the literal "KeyToSnakeCase".
    type: Literal["KeyToSnakeCase"]

    # Optional free-form mapping, read from / written to the "$parameters" alias.
    parameters: Optional[Dict[str, Any]] = Field(default=None, alias="$parameters")


class IterableDecoder(BaseModel):
type: Literal["IterableDecoder"]

Expand Down Expand Up @@ -1654,7 +1659,15 @@ class Config:
title="Schema Loader",
)
transformations: Optional[
List[Union[AddFields, CustomTransformation, RemoveFields, KeysToLower]]
List[
Union[
AddFields,
CustomTransformation,
RemoveFields,
KeysToLower,
KeyToSnakeCase,
]
]
] = Field(
None,
description="A list of transformations to be applied to each output record.",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import re
from dataclasses import dataclass
from typing import Any, Dict, Optional

import unidecode

from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState

# Tokenizer for record keys. Alternatives, tried in order at each position:
#   [A-Z]+[a-z]*   an uppercase run plus any lowercase letters that follow it
#   [a-z]+         a lowercase run
#   \d+            a digit run
#   NoToken        a run of anything else (spaces, punctuation, underscores...)
TOKEN_PATTERN = re.compile(r"[A-Z]+[a-z]*|[a-z]+|\d+|(?P<NoToken>[^a-zA-Z\d]+)")

# String placed between tokens when a key is reassembled.
DEFAULT_SEPARATOR = "_"


@dataclass
class KeyToSnakeCaseTransformation(RecordTransformation):
    """Rename the top-level keys of a record to snake_case, in place.

    Each key is transliterated with ``unidecode``, split into tokens with
    ``token_pattern``, and re-joined in lower case with ``DEFAULT_SEPARATOR``.
    Values are left untouched; nested mappings are not descended into.

    Attributes:
        token_pattern: Compiled regex used to split keys. It must define a
            named group ``NoToken`` that matches separator runs, because
            ``tokenize_key`` looks that group up on every match.
    """

    token_pattern: re.Pattern = TOKEN_PATTERN

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """Replace every key of ``record`` with its snake_case form.

        The record is mutated in place and nothing is returned. ``config``,
        ``stream_state`` and ``stream_slice`` are accepted but not used here.

        If two keys map to the same snake_case key, the value of the key that
        comes later in iteration order wins.
        """
        # Build the renamed mapping first so the record is never observed
        # half-converted, then swap the contents in place.
        renamed = {self.process_key(key): value for key, value in record.items()}
        record.clear()
        record.update(renamed)

    def process_key(self, key: str) -> str:
        """Return the snake_case form of a single key."""
        normalized = self.normalize_key(key)
        tokens = self.filter_tokens(self.tokenize_key(normalized))
        return self.tokens_to_snake_case(tokens)

    def normalize_key(self, key: str) -> str:
        """Transliterate ``key`` with ``unidecode`` before tokenizing."""
        return unidecode.unidecode(key)

    def tokenize_key(self, key: str) -> list:
        """Split ``key`` into tokens.

        Letter and digit runs are kept verbatim; every separator run (a
        ``NoToken`` match) is represented by an empty-string placeholder.
        """
        return [
            "" if match.group("NoToken") is not None else match.group(0)
            for match in self.token_pattern.finditer(key)
        ]

    def filter_tokens(self, tokens: list) -> list:
        """Return a cleaned copy of ``tokens``; the input list is not modified.

        * With three or more tokens, empty placeholders are dropped from the
          interior; the first and last tokens are always kept, so a leading or
          trailing separator still yields a leading or trailing separator in
          the final key.
        * If the first token is all digits, an empty token is prepended so
          the resulting key starts with the separator instead of a digit.
        """
        if len(tokens) >= 3:
            filtered = tokens[:1] + [t for t in tokens[1:-1] if t] + tokens[-1:]
        else:
            # Copy so the insert below never mutates the caller's list.
            filtered = list(tokens)
        if filtered and filtered[0].isdigit():
            filtered.insert(0, "")
        return filtered

    def tokens_to_snake_case(self, tokens: list) -> str:
        """Lower-case each token and join them with ``DEFAULT_SEPARATOR``."""
        return DEFAULT_SEPARATOR.join(token.lower() for token in tokens)
Loading

0 comments on commit 453580d

Please sign in to comment.