From f2c9b0a5fe805ac1407ea30afc3628feffba44f5 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 23 Dec 2024 13:55:02 +0100 Subject: [PATCH] Composite Decoder: add to model factory Signed-off-by: Artem Inzhyyants --- .../parsers/model_to_component_factory.py | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 215d6fff..46ae171c 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -26,6 +26,12 @@ from isodate import parse_duration from pydantic.v1 import BaseModel +from sources.declarative.decoders.composite_decoder import ( + CompositeRawDecoder, + CsvParser, + GzipParser, + JsonLineParser, +) from airbyte_cdk.models import FailureType, Level from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager @@ -125,6 +131,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CompositeErrorHandler as CompositeErrorHandlerModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + CompositeRawDecoder as CompositeRawDecoderModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ConcurrencyLevel as ConcurrencyLevelModel, ) @@ -134,6 +143,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ConstantBackoffStrategy as ConstantBackoffStrategyModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + CsvParser as CsvParserModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CursorPagination as CursorPaginationModel, ) @@ -200,6 +212,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( GzipJsonDecoder as GzipJsonDecoderModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + GzipParser as GzipParserModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( HttpComponentsResolver as HttpComponentsResolverModel, ) @@ -224,6 +239,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( JsonlDecoder as JsonlDecoderModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + JsonLineParser as JsonLineParserModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( JwtAuthenticator as JwtAuthenticatorModel, ) @@ -470,7 +488,9 @@ def _init_mappings(self) -> None: InlineSchemaLoaderModel: self.create_inline_schema_loader, JsonDecoderModel: self.create_json_decoder, JsonlDecoderModel: self.create_jsonl_decoder, + JsonLineParserModel: self.create_jsonline_parser, GzipJsonDecoderModel: self.create_gzipjson_decoder, + GzipParserModel: self.create_gzip_parser, KeysToLowerModel: self.create_keys_to_lower_transformation, IterableDecoderModel: self.create_iterable_decoder, XmlDecoderModel: self.create_xml_decoder, @@ -1666,6 +1686,12 @@ def create_jsonl_decoder( ) -> JsonlDecoder: return JsonlDecoder(parameters={}) + @staticmethod + def create_jsonline_parser( + model: JsonLineParserModel, config: Config, **kwargs: Any + ) -> JsonLineParser: + return JsonLineParser(encoding=model.encoding) + @staticmethod def create_iterable_decoder( model: IterableDecoderModel, config: Config, **kwargs: Any @@ -1682,6 +1708,20 @@ def create_gzipjson_decoder( ) -> GzipJsonDecoder: return GzipJsonDecoder(parameters={}, encoding=model.encoding) + @staticmethod + def create_gzip_parser(model: GzipParserModel, config: Config, **kwargs: Any) -> GzipParser: + return GzipParser(inner_parser=model.inner_parser) + + @staticmethod + def create_csv_parser(model: CsvParserModel, config: Config, **kwargs: Any) -> CsvParser: + return CsvParser(encoding=model.encoding, delimiter=model.delimiter) + + @staticmethod + def create_composite_raw_decoder( + model: CompositeRawDecoderModel, config: Config, **kwargs: Any + ) -> CompositeRawDecoder: + return CompositeRawDecoder(parser=model.parser) + @staticmethod def create_json_file_schema_loader( model: JsonFileSchemaLoaderModel, config: Config, **kwargs: Any