diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 694cb104..48be0820 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2254,7 +2254,7 @@ def create_async_retriever( extractor=download_extractor, name=name, record_filter=None, - transformations=[], + transformations=transformations, schema_normalization=TypeTransformer(TransformConfig.NoTransform), config=config, parameters={}, diff --git a/airbyte_cdk/sources/declarative/requesters/http_job_repository.py b/airbyte_cdk/sources/declarative/requesters/http_job_repository.py index 1e7ddd59..0015662b 100644 --- a/airbyte_cdk/sources/declarative/requesters/http_job_repository.py +++ b/airbyte_cdk/sources/declarative/requesters/http_job_repository.py @@ -189,7 +189,12 @@ def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]: for url in self.urls_extractor.extract_records( self._polling_job_response_by_id[job.api_job_id()] ): - stream_slice: StreamSlice = StreamSlice(partition={"url": url}, cursor_slice={}) + job_slice = job.job_parameters() + stream_slice = StreamSlice( + partition=job_slice.partition, + cursor_slice=job_slice.cursor_slice, + extra_fields={**job_slice.extra_fields, "url": url}, + ) for message in self.download_retriever.read_records({}, stream_slice): if isinstance(message, Record): yield message.data diff --git a/airbyte_cdk/sources/types.py b/airbyte_cdk/sources/types.py index 07cd44a3..3c466ccd 100644 --- a/airbyte_cdk/sources/types.py +++ b/airbyte_cdk/sources/types.py @@ -152,3 +152,6 @@ def __json_serializable__(self) -> Any: def __hash__(self) -> int: return hash(orjson.dumps(self._stream_slice, option=orjson.OPT_SORT_KEYS)) + + def __bool__(self) -> bool: + return bool(self._stream_slice) or bool(self._extra_fields) diff --git a/unit_tests/sources/declarative/requesters/test_http_job_repository.py b/unit_tests/sources/declarative/requesters/test_http_job_repository.py index 346fec82..cdc14e60 100644 --- a/unit_tests/sources/declarative/requesters/test_http_job_repository.py +++ b/unit_tests/sources/declarative/requesters/test_http_job_repository.py @@ -84,7 +84,7 @@ def setUp(self) -> None: requester=HttpRequester( name="stream : fetch_result", url_base="", - path="{{stream_slice['url']}}", + path="{{stream_slice.extra_fields['url']}}", error_handler=error_handler, http_method=HttpMethod.GET, config=_ANY_CONFIG,