From 366567da572db5015053c017b591f8b8abbfdec6 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 30 Dec 2024 11:38:08 +0100 Subject: [PATCH 1/8] Async Retriever: pass url as extra_field to ignore it in state manager Signed-off-by: Artem Inzhyyants --- .../sources/declarative/requesters/http_job_repository.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/requesters/http_job_repository.py b/airbyte_cdk/sources/declarative/requesters/http_job_repository.py index 1e7ddd59..d53671e6 100644 --- a/airbyte_cdk/sources/declarative/requesters/http_job_repository.py +++ b/airbyte_cdk/sources/declarative/requesters/http_job_repository.py @@ -189,7 +189,8 @@ def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]: for url in self.urls_extractor.extract_records( self._polling_job_response_by_id[job.api_job_id()] ): - stream_slice: StreamSlice = StreamSlice(partition={"url": url}, cursor_slice={}) + stream_slice = job.job_parameters() + stream_slice.extra_fields.update({"url": url}) for message in self.download_retriever.read_records({}, stream_slice): if isinstance(message, Record): yield message.data From 478badf08c42cb9e1b953d4ca466697748df46e8 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 30 Dec 2024 11:40:10 +0100 Subject: [PATCH 2/8] Async Retriever: add transformations to download retriever Signed-off-by: Artem Inzhyyants --- .../sources/declarative/parsers/model_to_component_factory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 694cb104..48be0820 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2254,7 +2254,7 @@ def create_async_retriever( extractor=download_extractor, name=name, record_filter=None, - transformations=[], + transformations=transformations, schema_normalization=TypeTransformer(TransformConfig.NoTransform), config=config, parameters={}, From 4e147bbbacb2eae69ed8d6d2c3787e35911eff4f Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 30 Dec 2024 11:57:18 +0100 Subject: [PATCH 3/8] Async Retriever: update types Signed-off-by: Artem Inzhyyants --- airbyte_cdk/sources/types.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/types.py b/airbyte_cdk/sources/types.py index 07cd44a3..a922d271 100644 --- a/airbyte_cdk/sources/types.py +++ b/airbyte_cdk/sources/types.py @@ -4,7 +4,7 @@ from __future__ import annotations -from typing import Any, ItemsView, Iterator, KeysView, List, Mapping, Optional, ValuesView +from typing import Any, ItemsView, Iterator, KeysView, List, Mapping, Optional, ValuesView, MutableMapping import orjson @@ -102,7 +102,7 @@ def cursor_slice(self) -> Mapping[str, Any]: return c @property - def extra_fields(self) -> Mapping[str, Any]: + def extra_fields(self) -> MutableMapping[str, Any]: """Returns the extra fields that are not part of the partition.""" return self._extra_fields From ec8a1a110bfc19daa6da08a898ec7597a28bd54b Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 30 Dec 2024 12:14:14 +0100 Subject: [PATCH 4/8] Async Retriever: ref test Signed-off-by: Artem Inzhyyants --- .../sources/declarative/requesters/test_http_job_repository.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unit_tests/sources/declarative/requesters/test_http_job_repository.py b/unit_tests/sources/declarative/requesters/test_http_job_repository.py index 346fec82..cdc14e60 100644 --- a/unit_tests/sources/declarative/requesters/test_http_job_repository.py +++ b/unit_tests/sources/declarative/requesters/test_http_job_repository.py @@ -84,7 +84,7 @@ def setUp(self) -> None: requester=HttpRequester( name="stream : fetch_result", url_base="", - path="{{stream_slice['url']}}", + path="{{stream_slice.extra_fields['url']}}", error_handler=error_handler, http_method=HttpMethod.GET, config=_ANY_CONFIG, From 8539f82fb72d1b254d67f87c33cfaf996d1ce272 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 30 Dec 2024 12:14:36 +0100 Subject: [PATCH 5/8] Async Retriever: add bool for StreamSlice Signed-off-by: Artem Inzhyyants --- airbyte_cdk/sources/types.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/airbyte_cdk/sources/types.py b/airbyte_cdk/sources/types.py index a922d271..c0640d43 100644 --- a/airbyte_cdk/sources/types.py +++ b/airbyte_cdk/sources/types.py @@ -152,3 +152,6 @@ def __json_serializable__(self) -> Any: def __hash__(self) -> int: return hash(orjson.dumps(self._stream_slice, option=orjson.OPT_SORT_KEYS)) + + def __bool__(self): + return bool(self._stream_slice) or bool(self._extra_fields) From f68f29a0f4a451ece5e28b632d8de5ad7686d318 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 30 Dec 2024 12:22:00 +0100 Subject: [PATCH 6/8] Async Retriever: fmt Signed-off-by: Artem Inzhyyants --- airbyte_cdk/sources/types.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/types.py b/airbyte_cdk/sources/types.py index c0640d43..13e9145c 100644 --- a/airbyte_cdk/sources/types.py +++ b/airbyte_cdk/sources/types.py @@ -4,7 +4,17 @@ from __future__ import annotations -from typing import Any, ItemsView, Iterator, KeysView, List, Mapping, Optional, ValuesView, MutableMapping +from typing import ( + Any, + ItemsView, + Iterator, + KeysView, + List, + Mapping, + MutableMapping, + Optional, + ValuesView, +) import orjson @@ -68,7 +78,7 @@ def __init__( *, partition: Mapping[str, Any], cursor_slice: Mapping[str, Any], - extra_fields: Optional[Mapping[str, Any]] = None, + extra_fields: Optional[MutableMapping[str, Any]] = None, ) -> None: """ :param partition: The partition keys representing a unique partition in the stream. @@ -153,5 +163,5 @@ def __json_serializable__(self) -> Any: def __hash__(self) -> int: return hash(orjson.dumps(self._stream_slice, option=orjson.OPT_SORT_KEYS)) - def __bool__(self): + def __bool__(self) -> bool: return bool(self._stream_slice) or bool(self._extra_fields) From 31c2b1a21327d5e45f978a2698d9ae105acacae2 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Mon, 30 Dec 2024 12:29:42 +0100 Subject: [PATCH 7/8] Async Retriever: fmt Signed-off-by: Artem Inzhyyants --- .../partition_routers/substream_partition_router.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py index 1c7bb696..82edf9fe 100644 --- a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py +++ b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py @@ -4,7 +4,7 @@ import copy import logging from dataclasses import InitVar, dataclass -from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, Optional, Union +from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional, Union import dpath @@ -215,7 +215,7 @@ def _extract_extra_fields( self, parent_record: Mapping[str, Any] | AirbyteMessage, extra_fields: Optional[List[List[str]]] = None, - ) -> Mapping[str, Any]: + ) -> MutableMapping[str, Any]: """ Extracts additional fields specified by their paths from the parent record. From 9d33042dc42708ea65ea077139e87744f3801c01 Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants Date: Fri, 3 Jan 2025 12:11:28 +0100 Subject: [PATCH 8/8] CDK: rev mutableMapping -> Mapping Signed-off-by: Artem Inzhyyants --- .../substream_partition_router.py | 4 ++-- .../requesters/http_job_repository.py | 8 ++++++-- airbyte_cdk/sources/types.py | 16 +++------------- 3 files changed, 11 insertions(+), 17 deletions(-) diff --git a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py index 82edf9fe..1c7bb696 100644 --- a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py +++ b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py @@ -4,7 +4,7 @@ import copy import logging from dataclasses import InitVar, dataclass -from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional, Union +from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, Optional, Union import dpath @@ -215,7 +215,7 @@ def _extract_extra_fields( self, parent_record: Mapping[str, Any] | AirbyteMessage, extra_fields: Optional[List[List[str]]] = None, - ) -> MutableMapping[str, Any]: + ) -> Mapping[str, Any]: """ Extracts additional fields specified by their paths from the parent record. diff --git a/airbyte_cdk/sources/declarative/requesters/http_job_repository.py b/airbyte_cdk/sources/declarative/requesters/http_job_repository.py index d53671e6..0015662b 100644 --- a/airbyte_cdk/sources/declarative/requesters/http_job_repository.py +++ b/airbyte_cdk/sources/declarative/requesters/http_job_repository.py @@ -189,8 +189,12 @@ def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]: for url in self.urls_extractor.extract_records( self._polling_job_response_by_id[job.api_job_id()] ): - stream_slice = job.job_parameters() - stream_slice.extra_fields.update({"url": url}) + job_slice = job.job_parameters() + stream_slice = StreamSlice( + partition=job_slice.partition, + cursor_slice=job_slice.cursor_slice, + extra_fields={**job_slice.extra_fields, "url": url}, + ) for message in self.download_retriever.read_records({}, stream_slice): if isinstance(message, Record): yield message.data diff --git a/airbyte_cdk/sources/types.py b/airbyte_cdk/sources/types.py index 13e9145c..3c466ccd 100644 --- a/airbyte_cdk/sources/types.py +++ b/airbyte_cdk/sources/types.py @@ -4,17 +4,7 @@ from __future__ import annotations -from typing import ( - Any, - ItemsView, - Iterator, - KeysView, - List, - Mapping, - MutableMapping, - Optional, - ValuesView, -) +from typing import Any, ItemsView, Iterator, KeysView, List, Mapping, Optional, ValuesView import orjson @@ -78,7 +68,7 @@ def __init__( *, partition: Mapping[str, Any], cursor_slice: Mapping[str, Any], - extra_fields: Optional[MutableMapping[str, Any]] = None, + extra_fields: Optional[Mapping[str, Any]] = None, ) -> None: """ :param partition: The partition keys representing a unique partition in the stream. @@ -112,7 +102,7 @@ def cursor_slice(self) -> Mapping[str, Any]: return c @property - def extra_fields(self) -> MutableMapping[str, Any]: + def extra_fields(self) -> Mapping[str, Any]: """Returns the extra fields that are not part of the partition.""" return self._extra_fields