diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 83b318bf0..2bf13d3df 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1331,6 +1331,19 @@ definitions: $parameters: type: object additionalProperties: true + ResponseToFileExtractor: + title: CSV To File Extractor + description: A record extractor designed for handling large responses that may exceed memory limits (to prevent OOM issues). It downloads a CSV file to disk, reads the data from disk, and deletes the file once it has been fully processed. + type: object + required: + - type + properties: + type: + type: string + enum: [ResponseToFileExtractor] + $parameters: + type: object + additionalProperties: true ExponentialBackoffStrategy: title: Exponential Backoff description: Backoff strategy with an exponential backoff interval. The interval is defined as factor * 2^attempt_count. @@ -2680,6 +2693,12 @@ definitions: anyOf: - "$ref": "#/definitions/CustomRecordExtractor" - "$ref": "#/definitions/DpathExtractor" + download_extractor: + description: Responsible for fetching the records from provided urls. + anyOf: + - "$ref": "#/definitions/CustomRecordExtractor" + - "$ref": "#/definitions/DpathExtractor" + - "$ref": "#/definitions/ResponseToFileExtractor" creation_requester: description: Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job. anyOf: @@ -2734,6 +2753,16 @@ definitions: - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" - "$ref": "#/definitions/GzipJsonDecoder" + download_decoder: + title: Download Decoder + description: Component decoding the download response so records can be extracted. + anyOf: + - "$ref": "#/definitions/CustomDecoder" + - "$ref": "#/definitions/JsonDecoder" + - "$ref": "#/definitions/JsonlDecoder" + - "$ref": "#/definitions/IterableDecoder" + - "$ref": "#/definitions/XmlDecoder" + - "$ref": "#/definitions/GzipJsonDecoder" $parameters: type: object additionalProperties: true diff --git a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py index 44bf45d6a..0215ddb45 100644 --- a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py +++ b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py @@ -6,6 +6,7 @@ import uuid import zlib from contextlib import closing +from dataclasses import InitVar, dataclass from typing import Any, Dict, Iterable, Mapping, Optional, Tuple import pandas as pd @@ -19,6 +20,7 @@ DOWNLOAD_CHUNK_SIZE: int = 1024 * 10 +@dataclass class ResponseToFileExtractor(RecordExtractor): """ This class is used when having very big HTTP responses (usually streamed) which would require too much memory so we use disk space as @@ -28,7 +30,9 @@ class ResponseToFileExtractor(RecordExtractor): a first iteration so we will only support CSV parsing using pandas as salesforce and sendgrid were doing. """ - def __init__(self) -> None: + parameters: InitVar[Mapping[str, Any]] + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: self.logger = logging.getLogger("airbyte") def _get_response_encoding(self, headers: Dict[str, Any]) -> str: diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 3cfba631e..6b830949b 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -567,6 +567,11 @@ class DpathExtractor(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class ResponseToFileExtractor(BaseModel): + type: Literal["ResponseToFileExtractor"] + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class ExponentialBackoffStrategy(BaseModel): type: Literal["ExponentialBackoffStrategy"] factor: Optional[Union[float, str]] = Field( @@ -1798,6 +1803,9 @@ class AsyncRetriever(BaseModel): ..., description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.", ) + download_extractor: Optional[ + Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor] + ] = Field(None, description="Responsible for fetching the records from provided urls.") creation_requester: Union[CustomRequester, HttpRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", @@ -1848,6 +1856,20 @@ class AsyncRetriever(BaseModel): description="Component decoding the response so records can be extracted.", title="Decoder", ) + download_decoder: Optional[ + Union[ + CustomDecoder, + JsonDecoder, + JsonlDecoder, + IterableDecoder, + XmlDecoder, + GzipJsonDecoder, + ] + ] = Field( + None, + description="Component decoding the download response so records can be extracted.", + title="Download Decoder", + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index e8f7a9b74..089b2065a 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -270,6 +270,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( RequestPath as RequestPathModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + ResponseToFileExtractor as ResponseToFileExtractorModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( SelectiveAuthenticator as SelectiveAuthenticatorModel, ) @@ -427,6 +430,7 @@ def _init_mappings(self) -> None: DefaultErrorHandlerModel: self.create_default_error_handler, DefaultPaginatorModel: self.create_default_paginator, DpathExtractorModel: self.create_dpath_extractor, + ResponseToFileExtractorModel: self.create_response_to_file_extractor, ExponentialBackoffStrategyModel: self.create_exponential_backoff_strategy, SessionTokenAuthenticatorModel: self.create_session_token_authenticator, HttpRequesterModel: self.create_http_requester, @@ -1447,6 +1451,13 @@ def create_dpath_extractor( parameters=model.parameters or {}, ) + def create_response_to_file_extractor( + self, + model: ResponseToFileExtractorModel, + **kwargs: Any, + ) -> ResponseToFileExtractor: + return ResponseToFileExtractor(parameters=model.parameters or {}) + @staticmethod def create_exponential_backoff_strategy( model: ExponentialBackoffStrategyModel, config: Config @@ -2011,6 +2022,7 @@ def create_async_retriever( model=model.record_selector, config=config, decoder=decoder, + name=name, transformations=transformations, client_side_incremental_sync=client_side_incremental_sync, ) @@ -2028,16 +2040,36 @@ def create_async_retriever( name=f"job polling - {name}", ) job_download_components_name = f"job download - {name}" + download_decoder = ( + self._create_component_from_model(model=model.download_decoder, config=config) + if model.download_decoder + else JsonDecoder(parameters={}) + ) + download_extractor = ( + self._create_component_from_model( + model=model.download_extractor, + config=config, + decoder=download_decoder, + parameters=model.parameters, + ) + if model.download_extractor + else DpathExtractor( + [], + config=config, + decoder=download_decoder, + parameters=model.parameters or {}, + ) + ) download_requester = self._create_component_from_model( model=model.download_requester, - decoder=decoder, + decoder=download_decoder, config=config, name=job_download_components_name, ) download_retriever = SimpleRetriever( requester=download_requester, record_selector=RecordSelector( - extractor=ResponseToFileExtractor(), + extractor=download_extractor, name=name, record_filter=None, transformations=[], diff --git a/airbyte_cdk/sources/declarative/requesters/http_job_repository.py b/airbyte_cdk/sources/declarative/requesters/http_job_repository.py index 034be3771..1e7ddd594 100644 --- a/airbyte_cdk/sources/declarative/requesters/http_job_repository.py +++ b/airbyte_cdk/sources/declarative/requesters/http_job_repository.py @@ -42,7 +42,7 @@ class AsyncHttpJobRepository(AsyncJobRepository): job_timeout: Optional[timedelta] = None record_extractor: RecordExtractor = field( - init=False, repr=False, default_factory=lambda: ResponseToFileExtractor() + init=False, repr=False, default_factory=lambda: ResponseToFileExtractor({}) ) def __post_init__(self) -> None: diff --git a/unit_tests/sources/declarative/extractors/test_response_to_file_extractor.py b/unit_tests/sources/declarative/extractors/test_response_to_file_extractor.py index 3c1d5a0f4..eeefe0817 100644 --- a/unit_tests/sources/declarative/extractors/test_response_to_file_extractor.py +++ b/unit_tests/sources/declarative/extractors/test_response_to_file_extractor.py @@ -14,7 +14,7 @@ class ResponseToFileExtractorTest(TestCase): def setUp(self) -> None: - self._extractor = ResponseToFileExtractor() + self._extractor = ResponseToFileExtractor({}) self._http_mocker = requests_mock.Mocker() self._http_mocker.__enter__() @@ -76,7 +76,7 @@ def large_event_response_fixture(): @pytest.mark.limit_memory("20 MB") def test_response_to_file_extractor_memory_usage(requests_mock, large_events_response): lines_in_response, file_path = large_events_response - extractor = ResponseToFileExtractor() + extractor = ResponseToFileExtractor({}) url = "https://for-all-mankind.nasa.com/api/v1/users/users1" requests_mock.get(url, body=open(file_path, "rb")) diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 20445ac0f..e68e083e6 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -4,11 +4,12 @@ # mypy: ignore-errors import datetime -from typing import Any, Mapping +from typing import Any, Iterable, Mapping import freezegun import pendulum import pytest +import requests from airbyte_cdk import AirbyteTracedException from airbyte_cdk.models import FailureType, Level @@ -27,6 +28,7 @@ from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.decoders import JsonDecoder, PaginationDecoderDecorator from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordFilter, RecordSelector +from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor from airbyte_cdk.sources.declarative.extractors.record_filter import ( ClientSideIncrementalRecordFilterDecorator, ) @@ -47,6 +49,9 @@ from airbyte_cdk.sources.declarative.models import ( CustomPartitionRouter as CustomPartitionRouterModel, ) +from airbyte_cdk.sources.declarative.models import ( + CustomRecordExtractor as CustomRecordExtractorModel, +) from airbyte_cdk.sources.declarative.models import CustomSchemaLoader as CustomSchemaLoaderModel from airbyte_cdk.sources.declarative.models import DatetimeBasedCursor as DatetimeBasedCursorModel from airbyte_cdk.sources.declarative.models import DeclarativeStream as DeclarativeStreamModel @@ -3271,3 +3276,20 @@ def test_create_concurrent_cursor_uses_min_max_datetime_format_if_defined(): "state_type": "date-range", "legacy": {}, } + + +class CustomRecordExtractor(RecordExtractor): + def extract_records( + self, + response: requests.Response, + ) -> Iterable[Mapping[str, Any]]: + yield from response.json() + + +def test_create_custom_record_extractor(): + definition = { + "type": "CustomRecordExtractor", + "class_name": "unit_tests.sources.declarative.parsers.test_model_to_component_factory.CustomRecordExtractor", + } + component = factory.create_component(CustomRecordExtractorModel, definition, {}) + assert isinstance(component, CustomRecordExtractor) diff --git a/unit_tests/sources/declarative/requesters/test_http_job_repository.py b/unit_tests/sources/declarative/requesters/test_http_job_repository.py index 06d16b38f..346fec826 100644 --- a/unit_tests/sources/declarative/requesters/test_http_job_repository.py +++ b/unit_tests/sources/declarative/requesters/test_http_job_repository.py @@ -95,7 +95,7 @@ def setUp(self) -> None: stream_response=True, ), record_selector=RecordSelector( - extractor=ResponseToFileExtractor(), + extractor=ResponseToFileExtractor({}), record_filter=None, transformations=[], schema_normalization=TypeTransformer(TransformConfig.NoTransform),