Skip to content

Commit

Permalink
#49971 Publish the response object in the transformation's context
Browse files Browse the repository at this point in the history
  • Loading branch information
rpopov committed Dec 26, 2024
1 parent 02cc6c4 commit 8bfa7ab
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions airbyte_cdk/sources/declarative/extractors/record_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import requests

from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
Expand All @@ -21,6 +22,7 @@
SchemaNormalization.Default: TransformConfig.DefaultSchemaNormalization,
}

STREAM_SLICE_RESPONSE_KEY = "response"

@dataclass
class RecordSelector(HttpSelector):
Expand Down Expand Up @@ -51,7 +53,8 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
if isinstance(self._name, str)
else self._name
)

self.response_root_extractor = DpathExtractor(field_path=[], config={}, parameters={})

@property # type: ignore
def name(self) -> str:
"""
Expand Down Expand Up @@ -86,9 +89,16 @@ def select_records(
:return: List of Records selected from the response
"""
all_data: Iterable[Mapping[str, Any]] = self.extractor.extract_records(response)
yield from self.filter_and_transform(
all_data, stream_state, records_schema, stream_slice, next_page_token
)

response_root_data = self.response_root_extractor.extract_records(response)
stream_state.update({STREAM_SLICE_RESPONSE_KEY: response_root_data})
try:
yield from self.filter_and_transform(
all_data, stream_state, records_schema, stream_slice, next_page_token
)
finally:
stream_state.pop(STREAM_SLICE_RESPONSE_KEY)


def filter_and_transform(
self,
Expand Down

0 comments on commit 8bfa7ab

Please sign in to comment.