diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 5db0b0909..be9b77517 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -20,6 +20,9 @@ ClientSideIncrementalRecordFilterDecorator, ) from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor +from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import ( + PerPartitionWithGlobalCursor, +) from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( @@ -32,7 +35,7 @@ ModelToComponentFactory, ) from airbyte_cdk.sources.declarative.requesters import HttpRequester -from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever +from airbyte_cdk.sources.declarative.retrievers import Retriever, SimpleRetriever from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import ( DeclarativePartitionFactory, StreamSlicerPartitionGenerator, @@ -231,21 +234,7 @@ def _group_streams( stream_state=stream_state, ) - retriever = declarative_stream.retriever - - # This is an optimization so that we don't invoke any cursor or state management flows within the - # low-code framework because state management is handled through the ConcurrentCursor. - if declarative_stream and isinstance(retriever, SimpleRetriever): - # Also a temporary hack. In the legacy Stream implementation, as part of the read, - # set_initial_state() is called to instantiate incoming state on the cursor. Although we no - # longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components - # like StopConditionPaginationStrategyDecorator and ClientSideIncrementalRecordFilterDecorator - # still rely on a DatetimeBasedCursor that is properly initialized with state. 
- if retriever.cursor: - retriever.cursor.set_initial_state(stream_state=stream_state) - # We zero it out here, but since this is a cursor reference, the state is still properly - # instantiated for the other components that reference it - retriever.cursor = None + retriever = self._get_retriever(declarative_stream, stream_state) partition_generator = StreamSlicerPartitionGenerator( DeclarativePartitionFactory( @@ -305,6 +294,60 @@ def _group_streams( cursor=final_state_cursor, ) ) + elif ( + incremental_sync_component_definition + and incremental_sync_component_definition.get("type", "") + == DatetimeBasedCursorModel.__name__ + and self._stream_supports_concurrent_partition_processing( + declarative_stream=declarative_stream + ) + and hasattr(declarative_stream.retriever, "stream_slicer") + and isinstance( + declarative_stream.retriever.stream_slicer, PerPartitionWithGlobalCursor + ) + ): + stream_state = state_manager.get_stream_state( + stream_name=declarative_stream.name, namespace=declarative_stream.namespace + ) + partition_router = declarative_stream.retriever.stream_slicer._partition_router + + perpartition_cursor = ( + self._constructor.create_concurrent_cursor_from_perpartition_cursor( + state_manager=state_manager, + model_type=DatetimeBasedCursorModel, + component_definition=incremental_sync_component_definition, + stream_name=declarative_stream.name, + stream_namespace=declarative_stream.namespace, + config=config or {}, + stream_state=stream_state, + partition_router=partition_router, + ) + ) + + retriever = self._get_retriever(declarative_stream, stream_state) + + partition_generator = StreamSlicerPartitionGenerator( + DeclarativePartitionFactory( + declarative_stream.name, + declarative_stream.get_json_schema(), + retriever, + self.message_repository, + ), + perpartition_cursor, + ) + + concurrent_streams.append( + DefaultStream( + partition_generator=partition_generator, + name=declarative_stream.name, + json_schema=declarative_stream.get_json_schema(), + availability_strategy=AlwaysAvailableAvailabilityStrategy(), + primary_key=get_primary_key_from_stream(declarative_stream.primary_key), + cursor_field=perpartition_cursor.cursor_field.cursor_field_key, + logger=self.logger, + cursor=perpartition_cursor, + ) + ) else: synchronous_streams.append(declarative_stream) else: @@ -395,6 +438,27 @@ def _stream_supports_concurrent_partition_processing( return False return True + def _get_retriever( + self, declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any] + ) -> Retriever: + retriever = declarative_stream.retriever + + # This is an optimization so that we don't invoke any cursor or state management flows within the + # low-code framework because state management is handled through the ConcurrentCursor. + if declarative_stream and isinstance(retriever, SimpleRetriever): + # Also a temporary hack. In the legacy Stream implementation, as part of the read, + # set_initial_state() is called to instantiate incoming state on the cursor. Although we no + # longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components + # like StopConditionPaginationStrategyDecorator and ClientSideIncrementalRecordFilterDecorator + # still rely on a DatetimeBasedCursor that is properly initialized with state. 
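# Editor's note (illustrative, not part of this patch): components such as
# ClientSideIncrementalRecordFilterDecorator hold a reference to the same DatetimeBasedCursor
# object, so initializing it here keeps them working even after the retriever's own pointer is
# cleared below. Roughly:
#
#     shared_cursor = retriever.cursor                        # also referenced by the record filter
#     shared_cursor.set_initial_state(stream_state=stream_state)
#     retriever.cursor = None                                 # retriever stops checkpointing,
#                                                             # but shared_cursor keeps its state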
+ if retriever.cursor: + retriever.cursor.set_initial_state(stream_state=stream_state) + # We zero it out here, but since this is a cursor reference, the state is still properly + # instantiated for the other components that reference it + retriever.cursor = None + + return retriever + @staticmethod def _select_streams( streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog diff --git a/airbyte_cdk/sources/declarative/extractors/record_filter.py b/airbyte_cdk/sources/declarative/extractors/record_filter.py index b744c9796..373669612 100644 --- a/airbyte_cdk/sources/declarative/extractors/record_filter.py +++ b/airbyte_cdk/sources/declarative/extractors/record_filter.py @@ -59,13 +59,11 @@ class ClientSideIncrementalRecordFilterDecorator(RecordFilter): def __init__( self, - date_time_based_cursor: DatetimeBasedCursor, - substream_cursor: Optional[Union[PerPartitionWithGlobalCursor, GlobalSubstreamCursor]], + cursor: Union[DatetimeBasedCursor, PerPartitionWithGlobalCursor, GlobalSubstreamCursor], **kwargs: Any, ): super().__init__(**kwargs) - self._date_time_based_cursor = date_time_based_cursor - self._substream_cursor = substream_cursor + self._cursor = cursor def filter_records( self, @@ -77,7 +75,7 @@ def filter_records( records = ( record for record in records - if (self._substream_cursor or self._date_time_based_cursor).should_be_synced( + if self._cursor.should_be_synced( # Record is created on the fly to align with cursors interface; stream name is ignored as we don't need it here # Record stream name is empty cause it is not used durig the filtering Record(data=record, associated_slice=stream_slice, stream_name="") diff --git a/airbyte_cdk/sources/declarative/incremental/__init__.py b/airbyte_cdk/sources/declarative/incremental/__init__.py index 7ce54a07a..395daca6d 100644 --- a/airbyte_cdk/sources/declarative/incremental/__init__.py +++ b/airbyte_cdk/sources/declarative/incremental/__init__.py @@ -2,6 +2,10 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import ( + ConcurrentCursorFactory, + ConcurrentPerPartitionCursor, +) from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import ( @@ -21,6 +25,8 @@ __all__ = [ "CursorFactory", + "ConcurrentCursorFactory", + "ConcurrentPerPartitionCursor", "DatetimeBasedCursor", "DeclarativeCursor", "GlobalSubstreamCursor", diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py new file mode 100644 index 000000000..b71890cce --- /dev/null +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -0,0 +1,334 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
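# Editor's note (illustrative only, not part of this patch): a minimal sketch of how the two
# classes defined below are wired together. In the CDK this wiring is performed by
# ModelToComponentFactory.create_concurrent_cursor_from_perpartition_cursor(); the variable names
# used here (factory, partition_router, state_manager, message_repository, stream_state,
# incremental_sync_definition) are assumptions for the sketch.
#
#     cursor_factory = ConcurrentCursorFactory(
#         partial(
#             factory.create_concurrent_cursor_from_datetime_based_cursor,
#             state_manager=state_manager,
#             model_type=DatetimeBasedCursorModel,
#             component_definition=incremental_sync_definition,
#             stream_name="post_comment_votes",
#             stream_namespace=None,
#             config=config,
#             message_repository=NoopMessageRepository(),
#         )
#     )
#     cursor = ConcurrentPerPartitionCursor(
#         cursor_factory=cursor_factory,
#         partition_router=partition_router,
#         stream_name="post_comment_votes",
#         stream_namespace=None,
#         stream_state=stream_state,
#         message_repository=message_repository,
#         connector_state_manager=state_manager,
#         cursor_field=CursorField("created_at"),
#     )
#     # stream_slices() then yields one slice per (partition, cursor window) pair, and
#     # close_partition() / ensure_at_least_one_state_emitted() drive state emission.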
+# + +import copy +import logging +import threading +from collections import OrderedDict +from copy import deepcopy +from datetime import timedelta +from typing import Any, Callable, Iterable, Mapping, MutableMapping, Optional + +from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager +from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import ( + Timer, + iterate_with_last_flag_and_state, +) +from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter +from airbyte_cdk.sources.message import MessageRepository +from airbyte_cdk.sources.streams.checkpoint.per_partition_key_serializer import ( + PerPartitionKeySerializer, +) +from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, Cursor, CursorField +from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition +from airbyte_cdk.sources.types import Record, StreamSlice, StreamState + +logger = logging.getLogger("airbyte") + + +class ConcurrentCursorFactory: + def __init__(self, create_function: Callable[..., ConcurrentCursor]): + self._create_function = create_function + + def create( + self, stream_state: Mapping[str, Any], runtime_lookback_window: Optional[timedelta] + ) -> ConcurrentCursor: + return self._create_function( + stream_state=stream_state, runtime_lookback_window=runtime_lookback_window + ) + + +class ConcurrentPerPartitionCursor(Cursor): + """ + Manages state per partition when a stream has many partitions, preventing data loss or duplication. + + Attributes: + DEFAULT_MAX_PARTITIONS_NUMBER (int): Maximum number of partitions to retain in memory (default is 10,000). + + - **Partition Limitation Logic** + Ensures the number of tracked partitions does not exceed the specified limit to prevent memory overuse. Oldest partitions are removed when the limit is reached. + + - **Global Cursor Fallback** + New partitions use global state as the initial state to progress the state for deleted or new partitions. The history data added after the initial sync will be missing. + + CurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format {cursor_field: cursor_value}. + """ + + DEFAULT_MAX_PARTITIONS_NUMBER = 10000 + _NO_STATE: Mapping[str, Any] = {} + _NO_CURSOR_STATE: Mapping[str, Any] = {} + _GLOBAL_STATE_KEY = "state" + _PERPARTITION_STATE_KEY = "states" + _KEY = 0 + _VALUE = 1 + + def __init__( + self, + cursor_factory: ConcurrentCursorFactory, + partition_router: PartitionRouter, + stream_name: str, + stream_namespace: Optional[str], + stream_state: Any, + message_repository: MessageRepository, + connector_state_manager: ConnectorStateManager, + cursor_field: CursorField, + ) -> None: + self._global_cursor: Optional[StreamState] = {} + self._stream_name = stream_name + self._stream_namespace = stream_namespace + self._message_repository = message_repository + self._connector_state_manager = connector_state_manager + self._cursor_field = cursor_field + + self._cursor_factory = cursor_factory + self._partition_router = partition_router + + # The dict is ordered to ensure that once the maximum number of partitions is reached, + # the oldest partitions can be efficiently removed, maintaining the most recent partitions. 
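# Editor's note (illustrative sketch, not part of this patch): OrderedDict preserves insertion
# order, so evicting the oldest tracked partition is O(1) via popitem(last=False) — this is what
# _ensure_partition_limit() below relies on:
#
#     from collections import OrderedDict
#     cursors = OrderedDict([("partition_a", "cursor_a"), ("partition_b", "cursor_b")])
#     oldest_key, _ = cursors.popitem(last=False)   # drops "partition_a", keeps "partition_b"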
+ self._cursor_per_partition: OrderedDict[str, ConcurrentCursor] = OrderedDict() + self._semaphore_per_partition: OrderedDict[str, threading.Semaphore] = OrderedDict() + self._finished_partitions: set[str] = set() + self._lock = threading.Lock() + self._timer = Timer() + self._new_global_cursor: Optional[StreamState] = None + self._lookback_window: int = 0 + self._parent_state: Optional[StreamState] = None + self._over_limit: int = 0 + self._partition_serializer = PerPartitionKeySerializer() + + self._set_initial_state(stream_state) + + @property + def cursor_field(self) -> CursorField: + return self._cursor_field + + @property + def state(self) -> MutableMapping[str, Any]: + states = [] + for partition_tuple, cursor in self._cursor_per_partition.items(): + if cursor.state: + states.append( + { + "partition": self._to_dict(partition_tuple), + "cursor": copy.deepcopy(cursor.state), + } + ) + state: dict[str, Any] = {self._PERPARTITION_STATE_KEY: states} + + if self._global_cursor: + state[self._GLOBAL_STATE_KEY] = self._global_cursor + if self._lookback_window is not None: + state["lookback_window"] = self._lookback_window + if self._parent_state is not None: + state["parent_state"] = self._parent_state + return state + + def close_partition(self, partition: Partition) -> None: + # Attempt to retrieve the stream slice + stream_slice: Optional[StreamSlice] = partition.to_slice() # type: ignore[assignment] + + # Ensure stream_slice is not None + if stream_slice is None: + raise ValueError("stream_slice cannot be None") + + partition_key = self._to_partition_key(stream_slice.partition) + self._cursor_per_partition[partition_key].close_partition(partition=partition) + with self._lock: + self._semaphore_per_partition[partition_key].acquire() + cursor = self._cursor_per_partition[partition_key] + if ( + partition_key in self._finished_partitions + and self._semaphore_per_partition[partition_key]._value == 0 + ): + if ( + self._new_global_cursor is None + or self._new_global_cursor[self.cursor_field.cursor_field_key] + < cursor.state[self.cursor_field.cursor_field_key] + ): + self._new_global_cursor = copy.deepcopy(cursor.state) + self._emit_state_message() + + def ensure_at_least_one_state_emitted(self) -> None: + """ + The platform expect to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be + called. 
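        (Illustrative note: "all work done" is detected by checking that every partition's
        semaphore has been fully drained, i.e. each slice released in stream_slices() was
        matched by a close_partition() call:

            all_slices_closed = not any(sem._value for _, sem in self._semaphore_per_partition.items())

        Only then are the new global cursor, lookback window and parent state promoted into
        the emitted state.)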
+ """ + if not any( + semaphore_item[1]._value for semaphore_item in self._semaphore_per_partition.items() + ): + self._global_cursor = self._new_global_cursor + self._lookback_window = self._timer.finish() + self._parent_state = self._partition_router.get_stream_state() + self._emit_state_message() + + def _emit_state_message(self) -> None: + self._connector_state_manager.update_state_for_stream( + self._stream_name, + self._stream_namespace, + self.state, + ) + state_message = self._connector_state_manager.create_state_message( + self._stream_name, self._stream_namespace + ) + self._message_repository.emit_message(state_message) + + def stream_slices(self) -> Iterable[StreamSlice]: + if self._timer.is_running(): + raise RuntimeError("stream_slices has been executed more than once.") + + slices = self._partition_router.stream_slices() + self._timer.start() + for partition in slices: + yield from self._generate_slices_from_partition(partition) + + def _generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]: + # Ensure the maximum number of partitions is not exceeded + self._ensure_partition_limit() + + cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition)) + if not cursor: + cursor = self._create_cursor( + self._global_cursor, + self._lookback_window if self._global_cursor else 0, + ) + self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor + self._semaphore_per_partition[self._to_partition_key(partition.partition)] = ( + threading.Semaphore(0) + ) + + for cursor_slice, is_last_slice, _ in iterate_with_last_flag_and_state( + cursor.stream_slices(), + lambda: None, + ): + self._semaphore_per_partition[self._to_partition_key(partition.partition)].release() + if is_last_slice: + self._finished_partitions.add(self._to_partition_key(partition.partition)) + yield StreamSlice( + partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields + ) + + def _ensure_partition_limit(self) -> None: + """ + Ensure the maximum number of partitions is not exceeded. If so, the oldest added partition will be dropped. + """ + while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1: + self._over_limit += 1 + oldest_partition = self._cursor_per_partition.popitem(last=False)[ + 0 + ] # Remove the oldest partition + logger.warning( + f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}." + ) + + def _set_initial_state(self, stream_state: StreamState) -> None: + """ + Initialize the cursor's state using the provided `stream_state`. + + This method supports global and per-partition state initialization. + + - **Global State**: If `states` is missing, the `state` is treated as global and applied to all partitions. + The `global state` holds a single cursor position representing the latest processed record across all partitions. + + - **Lookback Window**: Configured via `lookback_window`, it defines the period (in seconds) for reprocessing records. + This ensures robustness in case of upstream data delays or reordering. If not specified, it defaults to 0. + + - **Per-Partition State**: If `states` is present, each partition's cursor state is initialized separately. + + - **Parent State**: (if available) Used to initialize partition routers based on parent streams. + + Args: + stream_state (StreamState): The state of the streams to be set. 
The format of the stream state should be: + { + "states": [ + { + "partition": { + "partition_key": "value" + }, + "cursor": { + "last_updated": "2023-05-27T00:00:00Z" + } + } + ], + "state": { + "last_updated": "2023-05-27T00:00:00Z" + }, + lookback_window: 10, + "parent_state": { + "parent_stream_name": { + "last_updated": "2023-05-27T00:00:00Z" + } + } + } + """ + if not stream_state: + return + + if self._PERPARTITION_STATE_KEY not in stream_state: + # We assume that `stream_state` is in a global format that can be applied to all partitions. + # Example: {"global_state_format_key": "global_state_format_value"} + self._global_cursor = deepcopy(stream_state) + self._new_global_cursor = deepcopy(stream_state) + + else: + self._lookback_window = int(stream_state.get("lookback_window", 0)) + + for state in stream_state[self._PERPARTITION_STATE_KEY]: + self._cursor_per_partition[self._to_partition_key(state["partition"])] = ( + self._create_cursor(state["cursor"]) + ) + self._semaphore_per_partition[self._to_partition_key(state["partition"])] = ( + threading.Semaphore(0) + ) + + # set default state for missing partitions if it is per partition with fallback to global + if self._GLOBAL_STATE_KEY in stream_state: + self._global_cursor = deepcopy(stream_state[self._GLOBAL_STATE_KEY]) + self._new_global_cursor = deepcopy(stream_state[self._GLOBAL_STATE_KEY]) + + # Set initial parent state + if stream_state.get("parent_state"): + self._parent_state = stream_state["parent_state"] + + # Set parent state for partition routers based on parent streams + self._partition_router.set_initial_state(stream_state) + + def observe(self, record: Record) -> None: + if not record.associated_slice: + raise ValueError( + "Invalid state as stream slices that are emitted should refer to an existing cursor" + ) + self._cursor_per_partition[ + self._to_partition_key(record.associated_slice.partition) + ].observe(record) + + def _to_partition_key(self, partition: Mapping[str, Any]) -> str: + return self._partition_serializer.to_partition_key(partition) + + def _to_dict(self, partition_key: str) -> Mapping[str, Any]: + return self._partition_serializer.to_partition(partition_key) + + def _create_cursor( + self, cursor_state: Any, runtime_lookback_window: int = 0 + ) -> ConcurrentCursor: + cursor = self._cursor_factory.create( + stream_state=deepcopy(cursor_state), + runtime_lookback_window=timedelta(seconds=runtime_lookback_window), + ) + return cursor + + def should_be_synced(self, record: Record) -> bool: + return self._get_cursor(record).should_be_synced(record) + + def _get_cursor(self, record: Record) -> ConcurrentCursor: + if not record.associated_slice: + raise ValueError( + "Invalid state as stream slices that are emitted should refer to an existing cursor" + ) + partition_key = self._to_partition_key(record.associated_slice.partition) + if partition_key not in self._cursor_per_partition: + raise ValueError( + "Invalid state as stream slices that are emitted should refer to an existing cursor" + ) + cursor = self._cursor_per_partition[partition_key] + return cursor diff --git a/airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py b/airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py index 3b3636236..f5439b9ce 100644 --- a/airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py @@ -64,6 +64,9 @@ def finish(self) -> int: else: raise RuntimeError("Global substream cursor timer not 
started") + def is_running(self) -> bool: + return self._start is not None + class GlobalSubstreamCursor(DeclarativeCursor): """ diff --git a/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py index 8241f7761..4c1eacce5 100644 --- a/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py @@ -303,6 +303,21 @@ def get_request_body_json( raise ValueError("A partition needs to be provided in order to get request body json") def should_be_synced(self, record: Record) -> bool: + if ( + record.associated_slice + and self._to_partition_key(record.associated_slice.partition) + not in self._cursor_per_partition + ): + partition_state = ( + self._state_to_migrate_from + if self._state_to_migrate_from + else self._NO_CURSOR_STATE + ) + cursor = self._create_cursor(partition_state) + + self._cursor_per_partition[ + self._to_partition_key(record.associated_slice.partition) + ] = cursor return self._get_cursor(record).should_be_synced( self._convert_record_to_cursor_record(record) ) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 4fb38acd1..4a17a95c3 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -87,6 +87,8 @@ ) from airbyte_cdk.sources.declarative.incremental import ( ChildPartitionResumableFullRefreshCursor, + ConcurrentCursorFactory, + ConcurrentPerPartitionCursor, CursorFactory, DatetimeBasedCursor, DeclarativeCursor, @@ -461,6 +463,7 @@ InMemoryMessageRepository, LogAppenderMessageRepositoryDecorator, MessageRepository, + NoopMessageRepository, ) from airbyte_cdk.sources.streams.concurrent.clamping import ( ClampingEndProvider, @@ -926,6 +929,8 @@ def create_concurrent_cursor_from_datetime_based_cursor( stream_namespace: Optional[str], config: Config, stream_state: MutableMapping[str, Any], + message_repository: Optional[MessageRepository] = None, + runtime_lookback_window: Optional[datetime.timedelta] = None, **kwargs: Any, ) -> ConcurrentCursor: component_type = component_definition.get("type") @@ -987,10 +992,22 @@ def create_concurrent_cursor_from_datetime_based_cursor( connector_state_converter = CustomFormatConcurrentStreamStateConverter( datetime_format=datetime_format, input_datetime_formats=datetime_based_cursor_model.cursor_datetime_formats, - is_sequential_state=True, + is_sequential_state=True, # ConcurrentPerPartitionCursor only works with sequential state cursor_granularity=cursor_granularity, ) + # Adjusts the stream state by applying the runtime lookback window. + # This is used to ensure correct state handling in case of failed partitions. 
+ stream_state_value = stream_state.get(cursor_field.cursor_field_key) + if runtime_lookback_window and stream_state_value: + new_stream_state = ( + connector_state_converter.parse_timestamp(stream_state_value) + - runtime_lookback_window + ) + stream_state[cursor_field.cursor_field_key] = connector_state_converter.output_format( + new_stream_state + ) + start_date_runtime_value: Union[InterpolatedString, str, MinMaxDatetime] if isinstance(datetime_based_cursor_model.start_datetime, MinMaxDatetimeModel): start_date_runtime_value = self.create_min_max_datetime( @@ -1108,7 +1125,7 @@ def create_concurrent_cursor_from_datetime_based_cursor( stream_name=stream_name, stream_namespace=stream_namespace, stream_state=stream_state, - message_repository=self._message_repository, + message_repository=message_repository or self._message_repository, connector_state_manager=state_manager, connector_state_converter=connector_state_converter, cursor_field=cursor_field, @@ -1140,6 +1157,63 @@ def _assemble_weekday(self, weekday: str) -> Weekday: case _: raise ValueError(f"Unknown weekday {weekday}") + def create_concurrent_cursor_from_perpartition_cursor( + self, + state_manager: ConnectorStateManager, + model_type: Type[BaseModel], + component_definition: ComponentDefinition, + stream_name: str, + stream_namespace: Optional[str], + config: Config, + stream_state: MutableMapping[str, Any], + partition_router: PartitionRouter, + **kwargs: Any, + ) -> ConcurrentPerPartitionCursor: + component_type = component_definition.get("type") + if component_definition.get("type") != model_type.__name__: + raise ValueError( + f"Expected manifest component of type {model_type.__name__}, but received {component_type} instead" + ) + + datetime_based_cursor_model = model_type.parse_obj(component_definition) + + if not isinstance(datetime_based_cursor_model, DatetimeBasedCursorModel): + raise ValueError( + f"Expected {model_type.__name__} component, but received {datetime_based_cursor_model.__class__.__name__}" + ) + + interpolated_cursor_field = InterpolatedString.create( + datetime_based_cursor_model.cursor_field, + parameters=datetime_based_cursor_model.parameters or {}, + ) + cursor_field = CursorField(interpolated_cursor_field.eval(config=config)) + + # Create the cursor factory + cursor_factory = ConcurrentCursorFactory( + partial( + self.create_concurrent_cursor_from_datetime_based_cursor, + state_manager=state_manager, + model_type=model_type, + component_definition=component_definition, + stream_name=stream_name, + stream_namespace=stream_namespace, + config=config, + message_repository=NoopMessageRepository(), + ) + ) + + # Return the concurrent cursor and state converter + return ConcurrentPerPartitionCursor( + cursor_factory=cursor_factory, + partition_router=partition_router, + stream_name=stream_name, + stream_namespace=stream_namespace, + stream_state=stream_state, + message_repository=self._message_repository, # type: ignore + connector_state_manager=state_manager, + cursor_field=cursor_field, + ) + @staticmethod def create_constant_backoff_strategy( model: ConstantBackoffStrategyModel, config: Config, **kwargs: Any @@ -1445,18 +1519,15 @@ def create_declarative_stream( raise ValueError( "Unsupported Slicer is used. 
PerPartitionWithGlobalCursor should be used here instead" ) - client_side_incremental_sync = { - "date_time_based_cursor": self._create_component_from_model( - model=model.incremental_sync, config=config - ), - "substream_cursor": ( - combined_slicers - if isinstance( - combined_slicers, (PerPartitionWithGlobalCursor, GlobalSubstreamCursor) - ) - else None - ), - } + cursor = ( + combined_slicers + if isinstance( + combined_slicers, (PerPartitionWithGlobalCursor, GlobalSubstreamCursor) + ) + else self._create_component_from_model(model=model.incremental_sync, config=config) + ) + + client_side_incremental_sync = {"cursor": cursor} if model.incremental_sync and isinstance(model.incremental_sync, DatetimeBasedCursorModel): cursor_model = model.incremental_sync @@ -2303,7 +2374,7 @@ def create_simple_retriever( if ( not isinstance(stream_slicer, DatetimeBasedCursor) or type(stream_slicer) is not DatetimeBasedCursor - ): + ) and not isinstance(stream_slicer, PerPartitionWithGlobalCursor): # Many of the custom component implementations of DatetimeBasedCursor override get_request_params() (or other methods). # Because we're decoupling RequestOptionsProvider from the Cursor, custom components will eventually need to reimplement # their own RequestOptionsProvider. However, right now the existing StreamSlicer/Cursor still can act as the SimpleRetriever's diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index d167a84bc..45533ac4b 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -160,7 +160,7 @@ def _request_headers( stream_slice, next_page_token, self._paginator.get_request_headers, - self.stream_slicer.get_request_headers, + self.request_option_provider.get_request_headers, ) if isinstance(headers, str): raise ValueError("Request headers cannot be a string") diff --git a/airbyte_cdk/sources/streams/concurrent/cursor.py b/airbyte_cdk/sources/streams/concurrent/cursor.py index 73e45cdd1..88d15bc8a 100644 --- a/airbyte_cdk/sources/streams/concurrent/cursor.py +++ b/airbyte_cdk/sources/streams/concurrent/cursor.py @@ -169,7 +169,9 @@ def __init__( @property def state(self) -> MutableMapping[str, Any]: - return self._concurrent_state + return self._connector_state_converter.convert_to_state_message( + self.cursor_field, self._concurrent_state + ) @property def cursor_field(self) -> CursorField: @@ -214,10 +216,10 @@ def _extract_cursor_value(self, record: Record) -> Any: return self._connector_state_converter.parse_value(self._cursor_field.extract_value(record)) def close_partition(self, partition: Partition) -> None: - slice_count_before = len(self.state.get("slices", [])) + slice_count_before = len(self._concurrent_state.get("slices", [])) self._add_slice_to_state(partition) if slice_count_before < len( - self.state["slices"] + self._concurrent_state["slices"] ): # only emit if at least one slice has been processed self._merge_partitions() self._emit_state_message() @@ -229,11 +231,11 @@ def _add_slice_to_state(self, partition: Partition) -> None: ) if self._slice_boundary_fields: - if "slices" not in self.state: + if "slices" not in self._concurrent_state: raise RuntimeError( f"The state for stream {self._stream_name} should have at least one slice to delineate the sync start time, but no slices are present. This is unexpected. Please contact Support." 
) - self.state["slices"].append( + self._concurrent_state["slices"].append( { self._connector_state_converter.START_KEY: self._extract_from_slice( partition, self._slice_boundary_fields[self._START_BOUNDARY] @@ -261,7 +263,7 @@ def _add_slice_to_state(self, partition: Partition) -> None: "expected. Please contact the Airbyte team." ) - self.state["slices"].append( + self._concurrent_state["slices"].append( { self._connector_state_converter.START_KEY: self.start, self._connector_state_converter.END_KEY: most_recent_cursor_value, @@ -273,9 +275,7 @@ def _emit_state_message(self) -> None: self._connector_state_manager.update_state_for_stream( self._stream_name, self._stream_namespace, - self._connector_state_converter.convert_to_state_message( - self._cursor_field, self.state - ), + self.state, ) state_message = self._connector_state_manager.create_state_message( self._stream_name, self._stream_namespace @@ -283,7 +283,9 @@ def _emit_state_message(self) -> None: self._message_repository.emit_message(state_message) def _merge_partitions(self) -> None: - self.state["slices"] = self._connector_state_converter.merge_intervals(self.state["slices"]) + self._concurrent_state["slices"] = self._connector_state_converter.merge_intervals( + self._concurrent_state["slices"] + ) def _extract_from_slice(self, partition: Partition, key: str) -> CursorValueType: try: @@ -320,36 +322,42 @@ def stream_slices(self) -> Iterable[StreamSlice]: if self._start is not None and self._is_start_before_first_slice(): yield from self._split_per_slice_range( self._start, - self.state["slices"][0][self._connector_state_converter.START_KEY], + self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY], False, ) - if len(self.state["slices"]) == 1: + if len(self._concurrent_state["slices"]) == 1: yield from self._split_per_slice_range( self._calculate_lower_boundary_of_last_slice( - self.state["slices"][0][self._connector_state_converter.END_KEY] + self._concurrent_state["slices"][0][self._connector_state_converter.END_KEY] ), self._end_provider(), True, ) - elif len(self.state["slices"]) > 1: - for i in range(len(self.state["slices"]) - 1): + elif len(self._concurrent_state["slices"]) > 1: + for i in range(len(self._concurrent_state["slices"]) - 1): if self._cursor_granularity: yield from self._split_per_slice_range( - self.state["slices"][i][self._connector_state_converter.END_KEY] + self._concurrent_state["slices"][i][self._connector_state_converter.END_KEY] + self._cursor_granularity, - self.state["slices"][i + 1][self._connector_state_converter.START_KEY], + self._concurrent_state["slices"][i + 1][ + self._connector_state_converter.START_KEY + ], False, ) else: yield from self._split_per_slice_range( - self.state["slices"][i][self._connector_state_converter.END_KEY], - self.state["slices"][i + 1][self._connector_state_converter.START_KEY], + self._concurrent_state["slices"][i][ + self._connector_state_converter.END_KEY + ], + self._concurrent_state["slices"][i + 1][ + self._connector_state_converter.START_KEY + ], False, ) yield from self._split_per_slice_range( self._calculate_lower_boundary_of_last_slice( - self.state["slices"][-1][self._connector_state_converter.END_KEY] + self._concurrent_state["slices"][-1][self._connector_state_converter.END_KEY] ), self._end_provider(), True, @@ -360,7 +368,8 @@ def stream_slices(self) -> Iterable[StreamSlice]: def _is_start_before_first_slice(self) -> bool: return ( self._start is not None - and self._start < 
self.state["slices"][0][self._connector_state_converter.START_KEY] + and self._start + < self._concurrent_state["slices"][0][self._connector_state_converter.START_KEY] ) def _calculate_lower_boundary_of_last_slice( diff --git a/unit_tests/sources/declarative/extractors/test_record_filter.py b/unit_tests/sources/declarative/extractors/test_record_filter.py index 12f06a94e..5df391327 100644 --- a/unit_tests/sources/declarative/extractors/test_record_filter.py +++ b/unit_tests/sources/declarative/extractors/test_record_filter.py @@ -290,8 +290,7 @@ def test_client_side_record_filter_decorator_no_parent_stream( config={}, condition=record_filter_expression, parameters={}, - date_time_based_cursor=date_time_based_cursor, - substream_cursor=None, + cursor=date_time_based_cursor, ) filtered_records = list( @@ -429,8 +428,7 @@ def date_time_based_cursor_factory() -> DatetimeBasedCursor: record_filter_decorator = ClientSideIncrementalRecordFilterDecorator( config={}, parameters={}, - date_time_based_cursor=date_time_based_cursor, - substream_cursor=substream_cursor, + cursor=substream_cursor or date_time_based_cursor, ) # The partition we're testing diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py new file mode 100644 index 000000000..8862600d5 --- /dev/null +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -0,0 +1,2328 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. + +from copy import deepcopy +from datetime import datetime, timedelta +from typing import Any, List, Mapping, MutableMapping, Optional, Union +from urllib.parse import unquote + +import pytest +from orjson import orjson + +from airbyte_cdk.models import ( + AirbyteStateBlob, + AirbyteStateMessage, + AirbyteStateType, + AirbyteStreamState, + StreamDescriptor, +) +from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( + ConcurrentDeclarativeSource, +) +from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder +from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read + +SUBSTREAM_MANIFEST: MutableMapping[str, Any] = { + "version": "0.51.42", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["post_comment_votes"]}, + "definitions": { + "basic_authenticator": { + "type": "BasicHttpAuthenticator", + "username": "{{ config['credentials']['email'] + '/token' }}", + "password": "{{ config['credentials']['api_token'] }}", + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.example.com", + "http_method": "GET", + "authenticator": "#/definitions/basic_authenticator", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": { + "type": "DpathExtractor", + "field_path": ["{{ parameters.get('data_path') or parameters['name'] }}"], + }, + "schema_normalization": "Default", + }, + "paginator": { + "type": "DefaultPaginator", + "page_size_option": { + "type": "RequestOption", + "field_name": "per_page", + "inject_into": "request_parameter", + }, + "pagination_strategy": { + "type": "CursorPagination", + "page_size": 100, + "cursor_value": "{{ response.get('next_page', {}) }}", + "stop_condition": "{{ not response.get('next_page', {}) }}", + }, + "page_token_option": {"type": "RequestPath"}, + }, + }, + "cursor_incremental_sync": { + "type": "DatetimeBasedCursor", + "cursor_datetime_formats": 
["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"], + "datetime_format": "%Y-%m-%dT%H:%M:%SZ", + "cursor_field": "{{ parameters.get('cursor_field', 'updated_at') }}", + "start_datetime": {"datetime": "{{ config.get('start_date')}}"}, + "start_time_option": { + "inject_into": "request_parameter", + "field_name": "start_time", + "type": "RequestOption", + }, + }, + "posts_stream": { + "type": "DeclarativeStream", + "name": "posts", + "primary_key": ["id"], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "integer"}, + "updated_at": {"type": "string", "format": "date-time"}, + "title": {"type": "string"}, + "content": {"type": "string"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.example.com", + "path": "/community/posts", + "http_method": "GET", + "authenticator": "#/definitions/basic_authenticator", + }, + "record_selector": "#/definitions/retriever/record_selector", + "paginator": "#/definitions/retriever/paginator", + }, + "incremental_sync": "#/definitions/cursor_incremental_sync", + "$parameters": { + "name": "posts", + "path": "community/posts", + "data_path": "posts", + "cursor_field": "updated_at", + "primary_key": "id", + }, + }, + "post_comments_stream": { + "type": "DeclarativeStream", + "name": "post_comments", + "primary_key": ["id"], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "integer"}, + "updated_at": {"type": "string", "format": "date-time"}, + "post_id": {"type": "integer"}, + "comment": {"type": "string"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.example.com", + "path": "/community/posts/{{ stream_slice.id }}/comments", + "http_method": "GET", + "authenticator": "#/definitions/basic_authenticator", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": ["comments"]}, + "record_filter": { + "condition": "{{ record['updated_at'] >= stream_state.get('updated_at', config.get('start_date')) }}" + }, + }, + "paginator": "#/definitions/retriever/paginator", + "partition_router": { + "type": "SubstreamPartitionRouter", + "parent_stream_configs": [ + { + "stream": "#/definitions/posts_stream", + "parent_key": "id", + "partition_field": "id", + "incremental_dependency": True, + } + ], + }, + }, + "incremental_sync": { + "type": "DatetimeBasedCursor", + "cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"], + "datetime_format": "%Y-%m-%dT%H:%M:%SZ", + "cursor_field": "{{ parameters.get('cursor_field', 'updated_at') }}", + "start_datetime": {"datetime": "{{ config.get('start_date') }}"}, + }, + "$parameters": { + "name": "post_comments", + "path": "community/posts/{{ stream_slice.id }}/comments", + "data_path": "comments", + "cursor_field": "updated_at", + "primary_key": "id", + }, + }, + "post_comment_votes_stream": { + "type": "DeclarativeStream", + "name": "post_comment_votes", + "primary_key": ["id"], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "integer"}, + "created_at": {"type": "string", "format": "date-time"}, + "comment_id": {"type": "integer"}, + "vote": {"type": "number"}, + }, + 
"type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.example.com", + "path": "/community/posts/{{ stream_slice.parent_slice.id }}/comments/{{ stream_slice.id }}/votes", + "http_method": "GET", + "authenticator": "#/definitions/basic_authenticator", + }, + "record_selector": "#/definitions/retriever/record_selector", + "paginator": "#/definitions/retriever/paginator", + "partition_router": { + "type": "SubstreamPartitionRouter", + "parent_stream_configs": [ + { + "stream": "#/definitions/post_comments_stream", + "parent_key": "id", + "partition_field": "id", + "incremental_dependency": True, + "extra_fields": [["updated_at"]], + } + ], + }, + }, + "transformations": [ + { + "type": "AddFields", + "fields": [ + { + "path": ["comment_updated_at"], + "value_type": "string", + "value": "{{ stream_slice.extra_fields['updated_at'] }}", + }, + ], + }, + ], + "incremental_sync": "#/definitions/cursor_incremental_sync", + "$parameters": { + "name": "post_comment_votes", + "path": "community/posts/{{ stream_slice.parent_slice.id }}/comments/{{ stream_slice.id }}/votes", + "data_path": "votes", + "cursor_field": "created_at", + "primary_key": "id", + }, + }, + }, + "streams": [ + {"$ref": "#/definitions/posts_stream"}, + {"$ref": "#/definitions/post_comments_stream"}, + {"$ref": "#/definitions/post_comment_votes_stream"}, + ], + "concurrency_level": { + "type": "ConcurrencyLevel", + "default_concurrency": "{{ config['num_workers'] or 10 }}", + "max_concurrency": 25, + }, + "spec": { + "type": "Spec", + "documentation_url": "https://airbyte.com/#yaml-from-manifest", + "connection_specification": { + "title": "Test Spec", + "type": "object", + "required": ["credentials", "start_date"], + "additionalProperties": False, + "properties": { + "credentials": { + "type": "object", + "required": ["email", "api_token"], + "properties": { + "email": { + "type": "string", + "title": "Email", + "description": "The email for authentication.", + }, + "api_token": { + "type": "string", + "airbyte_secret": True, + "title": "API Token", + "description": "The API token for authentication.", + }, + }, + }, + "start_date": { + "type": "string", + "format": "date-time", + "title": "Start Date", + "description": "The date from which to start syncing data.", + }, + }, + }, + }, +} + +STREAM_NAME = "post_comment_votes" +CONFIG = { + "start_date": "2024-01-01T00:00:01Z", + "credentials": {"email": "email", "api_token": "api_token"}, +} + +SUBSTREAM_MANIFEST_NO_DEPENDENCY = deepcopy(SUBSTREAM_MANIFEST) +# Disable incremental_dependency +SUBSTREAM_MANIFEST_NO_DEPENDENCY["definitions"]["post_comments_stream"]["retriever"][ + "partition_router" +]["parent_stream_configs"][0]["incremental_dependency"] = False +SUBSTREAM_MANIFEST_NO_DEPENDENCY["definitions"]["post_comment_votes_stream"]["retriever"][ + "partition_router" +]["parent_stream_configs"][0]["incremental_dependency"] = False + +import orjson +import requests_mock + + +def run_mocked_test( + mock_requests, manifest, config, stream_name, initial_state, expected_records, expected_state +): + """ + Helper function to mock requests, run the test, and verify the results. + + Args: + mock_requests (list): List of tuples containing the URL and response data to mock. + manifest (dict): Manifest configuration for the source. + config (dict): Source configuration. + stream_name (str): Name of the stream being tested. + initial_state (dict): Initial state for the stream. 
+ expected_records (list): Expected records to be returned by the stream. + expected_state (dict): Expected state after processing the records. + + Raises: + AssertionError: If the test output does not match the expected records or state. + """ + with requests_mock.Mocker() as m: + for url, response in mock_requests: + if response is None: + m.get(url, status_code=404) + else: + m.get(url, json=response) + + initial_state = [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name=stream_name, namespace=None), + stream_state=AirbyteStateBlob(initial_state), + ), + ) + ] + output = _run_read(manifest, config, stream_name, initial_state) + + # Verify records + assert sorted([r.record.data for r in output.records], key=lambda x: x["id"]) == sorted( + expected_records, key=lambda x: x["id"] + ) + + # Verify state + final_state = output.state_messages[-1].state.stream.stream_state + assert final_state.__dict__ == expected_state + + # Verify that each request was made exactly once + for url, _ in mock_requests: + request_count = len( + [req for req in m.request_history if unquote(req.url) == unquote(url)] + ) + assert ( + request_count == 1 + ), f"URL {url} was called {request_count} times, expected exactly once." + + +def _run_read( + manifest: Mapping[str, Any], + config: Mapping[str, Any], + stream_name: str, + state: Optional[Union[List[AirbyteStateMessage], MutableMapping[str, Any]]] = None, +) -> EntrypointOutput: + source = ConcurrentDeclarativeSource( + source_config=manifest, config=config, catalog=None, state=state + ) + output = read( + source, + config, + CatalogBuilder() + .with_stream(ConfiguredAirbyteStreamBuilder().with_name(stream_name)) + .build(), + ) + return output + + +# Existing Constants for Dates +START_DATE = "2024-01-01T00:00:01Z" # Start of the sync +POST_1_UPDATED_AT = "2024-01-30T00:00:00Z" # Latest update date for post 1 +POST_2_UPDATED_AT = "2024-01-29T00:00:00Z" # Latest update date for post 2 +POST_3_UPDATED_AT = "2024-01-28T00:00:00Z" # Latest update date for post 3 + +COMMENT_9_OLDEST = "2023-01-01T00:00:00Z" # Comment in partition 1 - filtered out due to date +COMMENT_10_UPDATED_AT = "2024-01-25T00:00:00Z" # Latest comment in partition 1 +COMMENT_11_UPDATED_AT = "2024-01-24T00:00:00Z" # Comment in partition 1 +COMMENT_12_UPDATED_AT = "2024-01-23T00:00:00Z" # Comment in partition 1 +COMMENT_20_UPDATED_AT = "2024-01-22T00:00:00Z" # Latest comment in partition 2 +COMMENT_21_UPDATED_AT = "2024-01-21T00:00:00Z" # Comment in partition 2 +COMMENT_30_UPDATED_AT = "2024-01-09T00:00:00Z" # Latest comment in partition 3 +LOOKBACK_WINDOW_DAYS = 1 # Lookback window duration in days + +# Votes Date Constants +VOTE_100_CREATED_AT = "2024-01-15T00:00:00Z" # Latest vote in partition 10 +VOTE_101_CREATED_AT = "2024-01-14T00:00:00Z" # Second-latest vote in partition 10 +VOTE_111_CREATED_AT = "2024-01-13T00:00:00Z" # Latest vote in partition 11 +VOTE_200_CREATED_AT = "2024-01-12T00:00:00Z" # Latest vote in partition 20 +VOTE_210_CREATED_AT = "2024-01-12T00:00:15Z" # Latest vote in partition 21 +VOTE_300_CREATED_AT = "2024-01-10T00:00:00Z" # Latest vote in partition 30 + +# Initial State Constants +PARENT_COMMENT_CURSOR_PARTITION_1 = "2023-01-04T00:00:00Z" # Parent comment cursor (partition) +PARENT_POSTS_CURSOR = "2024-01-05T00:00:00Z" # Parent posts cursor (expected in state) + +INITIAL_STATE_PARTITION_10_CURSOR = "2024-01-02T00:00:01Z" +INITIAL_STATE_PARTITION_11_CURSOR = "2024-01-03T00:00:02Z" 
+INITIAL_GLOBAL_CURSOR = INITIAL_STATE_PARTITION_11_CURSOR +INITIAL_GLOBAL_CURSOR_DATE = datetime.fromisoformat( + INITIAL_STATE_PARTITION_11_CURSOR.replace("Z", "") +) +LOOKBACK_DATE = ( + INITIAL_GLOBAL_CURSOR_DATE - timedelta(days=LOOKBACK_WINDOW_DAYS) +).isoformat() + "Z" + +PARTITION_SYNC_START_TIME = "2024-01-02T00:00:00Z" + + +@pytest.mark.parametrize( + "test_name, manifest, mock_requests, expected_records, initial_state, expected_state", + [ + ( + "test_incremental_parent_state", + SUBSTREAM_MANIFEST_NO_DEPENDENCY, + [ + # Fetch the first page of posts + ( + f"https://api.example.com/community/posts?per_page=100&start_time={START_DATE}", + { + "posts": [ + {"id": 1, "updated_at": POST_1_UPDATED_AT}, + {"id": 2, "updated_at": POST_2_UPDATED_AT}, + ], + "next_page": f"https://api.example.com/community/posts?per_page=100&start_time={START_DATE}&page=2", + }, + ), + # Fetch the second page of posts + ( + f"https://api.example.com/community/posts?per_page=100&start_time={START_DATE}&page=2", + {"posts": [{"id": 3, "updated_at": POST_3_UPDATED_AT}]}, + ), + # Fetch the first page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100", + { + "comments": [ + { + "id": 9, + "post_id": 1, + "updated_at": COMMENT_9_OLDEST, # No requests for comment 9, filtered out due to the date + }, + { + "id": 10, + "post_id": 1, + "updated_at": COMMENT_10_UPDATED_AT, + }, + { + "id": 11, + "post_id": 1, + "updated_at": COMMENT_11_UPDATED_AT, + }, + ], + "next_page": "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + { + "comments": [ + { + "id": 12, + "post_id": 1, + "updated_at": COMMENT_12_UPDATED_AT, + } + ] + }, + ), + # Fetch the first page of votes for comment 10 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time={INITIAL_STATE_PARTITION_10_CURSOR}", + { + "votes": [ + { + "id": 100, + "comment_id": 10, + "created_at": VOTE_100_CREATED_AT, + } + ], + "next_page": f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time={INITIAL_STATE_PARTITION_10_CURSOR}", + }, + ), + # Fetch the second page of votes for comment 10 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&page=2&start_time={INITIAL_STATE_PARTITION_10_CURSOR}", + { + "votes": [ + { + "id": 101, + "comment_id": 10, + "created_at": VOTE_101_CREATED_AT, + } + ] + }, + ), + # Fetch the first page of votes for comment 11 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/11/votes?per_page=100&start_time={INITIAL_STATE_PARTITION_11_CURSOR}", + { + "votes": [ + { + "id": 111, + "comment_id": 11, + "created_at": VOTE_111_CREATED_AT, + } + ] + }, + ), + # Fetch the first page of votes for comment 12 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time={LOOKBACK_DATE}", + {"votes": []}, + ), + # Fetch the first page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100", + { + "comments": [ + { + "id": 20, + "post_id": 2, + "updated_at": COMMENT_20_UPDATED_AT, + } + ], + "next_page": "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + { + "comments": [ + { + 
"id": 21, + "post_id": 2, + "updated_at": COMMENT_21_UPDATED_AT, + } + ] + }, + ), + # Fetch the first page of votes for comment 20 of post 2 + ( + f"https://api.example.com/community/posts/2/comments/20/votes?per_page=100&start_time={LOOKBACK_DATE}", + { + "votes": [ + { + "id": 200, + "comment_id": 20, + "created_at": VOTE_200_CREATED_AT, + } + ] + }, + ), + # Fetch the first page of votes for comment 21 of post 2 + ( + f"https://api.example.com/community/posts/2/comments/21/votes?per_page=100&start_time={LOOKBACK_DATE}", + { + "votes": [ + { + "id": 210, + "comment_id": 21, + "created_at": VOTE_210_CREATED_AT, + } + ] + }, + ), + # Fetch the first page of comments for post 3 + ( + "https://api.example.com/community/posts/3/comments?per_page=100", + { + "comments": [ + { + "id": 30, + "post_id": 3, + "updated_at": COMMENT_30_UPDATED_AT, + } + ] + }, + ), + # Fetch the first page of votes for comment 30 of post 3 + ( + f"https://api.example.com/community/posts/3/comments/30/votes?per_page=100&start_time={LOOKBACK_DATE}", + { + "votes": [ + { + "id": 300, + "comment_id": 30, + "created_at": VOTE_300_CREATED_AT, + } + ] + }, + ), + ], + # Expected records + [ + { + "comment_id": 10, + "comment_updated_at": COMMENT_10_UPDATED_AT, + "created_at": VOTE_100_CREATED_AT, + "id": 100, + }, + { + "comment_id": 10, + "comment_updated_at": COMMENT_10_UPDATED_AT, + "created_at": VOTE_101_CREATED_AT, + "id": 101, + }, + { + "comment_id": 11, + "comment_updated_at": COMMENT_11_UPDATED_AT, + "created_at": VOTE_111_CREATED_AT, + "id": 111, + }, + { + "comment_id": 20, + "comment_updated_at": COMMENT_20_UPDATED_AT, + "created_at": VOTE_200_CREATED_AT, + "id": 200, + }, + { + "comment_id": 21, + "comment_updated_at": COMMENT_21_UPDATED_AT, + "created_at": VOTE_210_CREATED_AT, + "id": 210, + }, + { + "comment_id": 30, + "comment_updated_at": COMMENT_30_UPDATED_AT, + "created_at": VOTE_300_CREATED_AT, + "id": 300, + }, + ], + # Initial state + { + # This should not happen since parent state is disabled, but I've added this to validate that and + # incoming parent_state is ignored when the parent stream's incremental_dependency is disabled + "parent_state": { + "post_comments": { + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": PARENT_COMMENT_CURSOR_PARTITION_1}, + } + ], + "parent_state": {"posts": {"updated_at": PARENT_POSTS_CURSOR}}, + } + }, + "states": [ + { + "partition": { + "id": 10, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR}, + }, + { + "partition": { + "id": 11, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, + }, + ], + "state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, + "lookback_window": 86400, + }, + # Expected state + { + "states": [ + { + "partition": { + "id": 10, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": VOTE_100_CREATED_AT}, + }, + { + "partition": { + "id": 11, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": VOTE_111_CREATED_AT}, + }, + { + "partition": { + "id": 12, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": LOOKBACK_DATE}, + }, + { + "partition": { + "id": 20, + "parent_slice": {"id": 2, "parent_slice": {}}, + }, + "cursor": {"created_at": VOTE_200_CREATED_AT}, + }, + { + "partition": { + "id": 21, + "parent_slice": {"id": 2, "parent_slice": {}}, + }, + "cursor": {"created_at": 
VOTE_210_CREATED_AT}, + }, + { + "partition": { + "id": 30, + "parent_slice": {"id": 3, "parent_slice": {}}, + }, + "cursor": {"created_at": VOTE_300_CREATED_AT}, + }, + ], + "lookback_window": 1, + "parent_state": {}, + "state": {"created_at": VOTE_100_CREATED_AT}, + }, + ), + ], +) +def test_incremental_parent_state_no_incremental_dependency( + test_name, manifest, mock_requests, expected_records, initial_state, expected_state +): + """ + This is a pretty complicated test that syncs a low-code connector stream with three levels of substreams + - posts: (ids: 1, 2, 3) + - post comments: (parent post 1 with ids: 9, 10, 11, 12; parent post 2 with ids: 20, 21; parent post 3 with id: 30) + - post comment votes: (parent comment 10 with ids: 100, 101; parent comment 11 with id: 111; + parent comment 20 with id: 200; parent comment 21 with id: 210, parent comment 30 with id: 300) + + By setting incremental_dependency to false, parent streams will not use the incoming state and will not update state. + The post_comment_votes substream is incremental and will emit state messages We verify this by ensuring that mocked + parent stream requests use the incoming config as query parameters and the substream state messages does not + contain parent stream state. + """ + run_mocked_test( + mock_requests, + manifest, + CONFIG, + STREAM_NAME, + initial_state, + expected_records, + expected_state, + ) + + +def run_incremental_parent_state_test( + manifest, + mock_requests, + expected_records, + num_intermediate_states, + initial_state, + expected_states, +): + """ + Run an incremental parent state test for the specified stream. + + This function performs the following steps: + 1. Mocks the API requests as defined in mock_requests. + 2. Executes the read operation using the provided manifest and config. + 3. Asserts that the output records match the expected records. + 4. Collects intermediate states and records, performing additional reads as necessary. + 5. Compares the cumulative records from each state against the expected records. + 6. Asserts that the final state matches one of the expected states for each run. + + Args: + manifest (dict): The manifest configuration for the stream. + mock_requests (list): A list of tuples containing URL and response data for mocking API requests. + expected_records (list): The expected records to compare against the output. + num_intermediate_states (int): The number of intermediate states to expect. + initial_state (list): The initial state to start the read operation. + expected_states (list): A list of expected final states after the read operation. 
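    (Illustrative note: steps 4-5 amount to a resumability check — for every intermediate state
    the stream emitted, a fresh read started from that state, combined with the records seen
    before it, must reproduce the full expected record set, e.g.:

        resumed = records_before_state + records_from_state
        assert dedupe(resumed) == dedupe(expected_records)

    where dedupe() stands in for the orjson-based de-duplication used in the body below.)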
+ """ + initial_state = [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name=STREAM_NAME, namespace=None), + stream_state=AirbyteStateBlob(initial_state), + ), + ) + ] + + with requests_mock.Mocker() as m: + for url, response in mock_requests: + m.get(url, json=response) + + # Run the initial read + output = _run_read(manifest, CONFIG, STREAM_NAME, initial_state) + + # Assert that output_data equals expected_records + assert sorted([r.record.data for r in output.records], key=lambda x: x["id"]) == sorted( + expected_records, key=lambda x: x["id"] + ) + + # Collect the intermediate states and records produced before each state + cumulative_records = [] + intermediate_states = [] + final_states = [] # To store the final state after each read + + # Store the final state after the initial read + final_states.append(output.state_messages[-1].state.stream.stream_state.__dict__) + + for message in output.records_and_state_messages: + if message.type.value == "RECORD": + record_data = message.record.data + cumulative_records.append(record_data) + elif message.type.value == "STATE": + # Record the state and the records produced before this state + state = message.state + records_before_state = cumulative_records.copy() + intermediate_states.append((state, records_before_state)) + + # Assert that the number of intermediate states is as expected + assert len(intermediate_states) - 1 == num_intermediate_states + + # For each intermediate state, perform another read starting from that state + for state, records_before_state in intermediate_states[:-1]: + output_intermediate = _run_read(manifest, CONFIG, STREAM_NAME, [state]) + records_from_state = [r.record.data for r in output_intermediate.records] + + # Combine records produced before the state with records from the new read + cumulative_records_state = records_before_state + records_from_state + + # Duplicates may occur because the state matches the cursor of the last record, causing it to be re-emitted in the next sync. + cumulative_records_state_deduped = list( + {orjson.dumps(record): record for record in cumulative_records_state}.values() + ) + + # Compare the cumulative records with the expected records + expected_records_set = list( + {orjson.dumps(record): record for record in expected_records}.values() + ) + assert ( + sorted(cumulative_records_state_deduped, key=lambda x: x["id"]) + == sorted(expected_records_set, key=lambda x: x["id"]) + ), f"Records mismatch with intermediate state {state}. Expected {expected_records}, got {cumulative_records_state_deduped}" + + # Store the final state after each intermediate read + final_state_intermediate = [ + message.state.stream.stream_state.__dict__ + for message in output_intermediate.state_messages + ] + final_states.append(final_state_intermediate[-1]) + + # Assert that the final state matches the expected state for all runs + for i, final_state in enumerate(final_states): + assert ( + final_state in expected_states + ), f"Final state mismatch at run {i + 1}. 
Expected {expected_states}, got {final_state}" + + +@pytest.mark.parametrize( + "test_name, manifest, mock_requests, expected_records, num_intermediate_states, initial_state, expected_state", + [ + ( + "test_incremental_parent_state", + SUBSTREAM_MANIFEST, + [ + # Fetch the first page of posts + ( + f"https://api.example.com/community/posts?per_page=100&start_time={PARENT_POSTS_CURSOR}", + { + "posts": [ + {"id": 1, "updated_at": POST_1_UPDATED_AT}, + {"id": 2, "updated_at": POST_2_UPDATED_AT}, + ], + "next_page": ( + f"https://api.example.com/community/posts" + f"?per_page=100&start_time={PARENT_POSTS_CURSOR}&page=2" + ), + }, + ), + # Fetch the second page of posts + ( + f"https://api.example.com/community/posts?per_page=100&start_time={PARENT_POSTS_CURSOR}&page=2", + {"posts": [{"id": 3, "updated_at": POST_3_UPDATED_AT}]}, + ), + # Fetch the first page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100", + { + "comments": [ + { + "id": 9, + "post_id": 1, + "updated_at": COMMENT_9_OLDEST, + }, + { + "id": 10, + "post_id": 1, + "updated_at": COMMENT_10_UPDATED_AT, + }, + { + "id": 11, + "post_id": 1, + "updated_at": COMMENT_11_UPDATED_AT, + }, + ], + "next_page": "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + {"comments": [{"id": 12, "post_id": 1, "updated_at": COMMENT_12_UPDATED_AT}]}, + ), + # Fetch the first page of votes for comment 10 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time={INITIAL_STATE_PARTITION_10_CURSOR}", + { + "votes": [ + { + "id": 100, + "comment_id": 10, + "created_at": VOTE_100_CREATED_AT, + } + ], + "next_page": ( + f"https://api.example.com/community/posts/1/comments/10/votes" + f"?per_page=100&page=2&start_time={INITIAL_STATE_PARTITION_10_CURSOR}" + ), + }, + ), + # Fetch the second page of votes for comment 10 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/10/votes" + f"?per_page=100&page=2&start_time={INITIAL_STATE_PARTITION_10_CURSOR}", + {"votes": [{"id": 101, "comment_id": 10, "created_at": VOTE_101_CREATED_AT}]}, + ), + # Fetch the first page of votes for comment 11 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/11/votes" + f"?per_page=100&start_time={INITIAL_STATE_PARTITION_11_CURSOR}", + {"votes": [{"id": 111, "comment_id": 11, "created_at": VOTE_111_CREATED_AT}]}, + ), + # Fetch the first page of votes for comment 12 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time={LOOKBACK_DATE}", + {"votes": []}, + ), + # Fetch the first page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100", + { + "comments": [{"id": 20, "post_id": 2, "updated_at": COMMENT_20_UPDATED_AT}], + "next_page": "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + }, + ), + # Fetch the second page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + {"comments": [{"id": 21, "post_id": 2, "updated_at": COMMENT_21_UPDATED_AT}]}, + ), + # Fetch the first page of votes for comment 20 of post 2 + ( + f"https://api.example.com/community/posts/2/comments/20/votes?per_page=100&start_time={LOOKBACK_DATE}", + {"votes": [{"id": 200, "comment_id": 20, "created_at": VOTE_200_CREATED_AT}]}, + ), + # Fetch the first page of votes 
for comment 21 of post 2 + ( + f"https://api.example.com/community/posts/2/comments/21/votes?per_page=100&start_time={LOOKBACK_DATE}", + {"votes": [{"id": 210, "comment_id": 21, "created_at": VOTE_210_CREATED_AT}]}, + ), + # Fetch the first page of comments for post 3 + ( + "https://api.example.com/community/posts/3/comments?per_page=100", + {"comments": [{"id": 30, "post_id": 3, "updated_at": COMMENT_30_UPDATED_AT}]}, + ), + # Fetch the first page of votes for comment 30 of post 3 + ( + f"https://api.example.com/community/posts/3/comments/30/votes?per_page=100&start_time={LOOKBACK_DATE}", + {"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]}, + ), + # Requests with intermediate states + # Fetch votes for comment 10 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/10/votes?per_page=100&start_time={VOTE_100_CREATED_AT}", + { + "votes": [{"id": 100, "comment_id": 10, "created_at": VOTE_100_CREATED_AT}], + }, + ), + # Fetch votes for comment 11 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/11/votes?per_page=100&start_time={VOTE_111_CREATED_AT}", + { + "votes": [{"id": 111, "comment_id": 11, "created_at": VOTE_111_CREATED_AT}], + }, + ), + # Fetch votes for comment 12 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time={VOTE_111_CREATED_AT}", + { + "votes": [], + }, + ), + # Fetch votes for comment 20 of post 2 + ( + f"https://api.example.com/community/posts/2/comments/20/votes?per_page=100&start_time={VOTE_200_CREATED_AT}", + {"votes": [{"id": 200, "comment_id": 20, "created_at": VOTE_200_CREATED_AT}]}, + ), + # Fetch votes for comment 21 of post 2 + ( + f"https://api.example.com/community/posts/2/comments/21/votes?per_page=100&start_time={VOTE_210_CREATED_AT}", + {"votes": [{"id": 210, "comment_id": 21, "created_at": VOTE_210_CREATED_AT}]}, + ), + # Fetch votes for comment 30 of post 3 + ( + f"https://api.example.com/community/posts/3/comments/30/votes?per_page=100&start_time={VOTE_300_CREATED_AT}", + {"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]}, + ), + ], + # Expected records + [ + { + "comment_id": 10, + "comment_updated_at": COMMENT_10_UPDATED_AT, + "created_at": VOTE_100_CREATED_AT, + "id": 100, + }, + { + "comment_id": 10, + "comment_updated_at": COMMENT_10_UPDATED_AT, + "created_at": VOTE_101_CREATED_AT, + "id": 101, + }, + { + "comment_id": 11, + "comment_updated_at": COMMENT_11_UPDATED_AT, + "created_at": VOTE_111_CREATED_AT, + "id": 111, + }, + { + "comment_id": 20, + "comment_updated_at": COMMENT_20_UPDATED_AT, + "created_at": VOTE_200_CREATED_AT, + "id": 200, + }, + { + "comment_id": 21, + "comment_updated_at": COMMENT_21_UPDATED_AT, + "created_at": VOTE_210_CREATED_AT, + "id": 210, + }, + { + "comment_id": 30, + "comment_updated_at": COMMENT_30_UPDATED_AT, + "created_at": VOTE_300_CREATED_AT, + "id": 300, + }, + ], + # Number of intermediate states - 6 as number of parent partitions + 6, + # Initial state + { + "parent_state": { + "post_comments": { + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": PARENT_COMMENT_CURSOR_PARTITION_1}, + } + ], + "parent_state": {"posts": {"updated_at": PARENT_POSTS_CURSOR}}, + } + }, + "state": {"created_at": INITIAL_GLOBAL_CURSOR}, + "states": [ + { + "partition": { + "id": 10, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR}, + }, + { + "partition": { + "id": 11, + "parent_slice": {"id": 1, 
"parent_slice": {}}, + }, + "cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, + }, + ], + "lookback_window": 86400, + }, + # Expected state + { + "state": {"created_at": VOTE_100_CREATED_AT}, + "parent_state": { + "post_comments": { + "use_global_cursor": False, + "state": {"updated_at": COMMENT_10_UPDATED_AT}, # 10 is the "latest" + "parent_state": { + "posts": {"updated_at": POST_1_UPDATED_AT} + }, # post 1 is the latest + "lookback_window": 1, + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": COMMENT_10_UPDATED_AT}, + }, + { + "partition": {"id": 2, "parent_slice": {}}, + "cursor": {"updated_at": COMMENT_20_UPDATED_AT}, + }, + { + "partition": {"id": 3, "parent_slice": {}}, + "cursor": {"updated_at": COMMENT_30_UPDATED_AT}, + }, + ], + } + }, + "lookback_window": 1, + "states": [ + { + "partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_100_CREATED_AT}, + }, + { + "partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_111_CREATED_AT}, + }, + { + "partition": {"id": 12, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": LOOKBACK_DATE}, + }, + { + "partition": {"id": 20, "parent_slice": {"id": 2, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_200_CREATED_AT}, + }, + { + "partition": {"id": 21, "parent_slice": {"id": 2, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_210_CREATED_AT}, + }, + { + "partition": {"id": 30, "parent_slice": {"id": 3, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_300_CREATED_AT}, + }, + ], + }, + ), + ], +) +def test_incremental_parent_state( + test_name, + manifest, + mock_requests, + expected_records, + num_intermediate_states, + initial_state, + expected_state, +): + run_incremental_parent_state_test( + manifest, + mock_requests, + expected_records, + num_intermediate_states, + initial_state, + [expected_state], + ) + + +@pytest.mark.parametrize( + "test_name, manifest, mock_requests, expected_records, initial_state, expected_state", + [ + ( + "test_incremental_parent_state", + SUBSTREAM_MANIFEST, + [ + # Fetch the first page of posts + ( + f"https://api.example.com/community/posts?per_page=100&start_time={PARTITION_SYNC_START_TIME}", + { + "posts": [ + {"id": 1, "updated_at": POST_1_UPDATED_AT}, + {"id": 2, "updated_at": POST_2_UPDATED_AT}, + ], + "next_page": ( + f"https://api.example.com/community/posts?per_page=100" + f"&start_time={PARTITION_SYNC_START_TIME}&page=2" + ), + }, + ), + # Fetch the second page of posts + ( + f"https://api.example.com/community/posts?per_page=100" + f"&start_time={PARTITION_SYNC_START_TIME}&page=2", + {"posts": [{"id": 3, "updated_at": POST_3_UPDATED_AT}]}, + ), + # Fetch the first page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100", + { + "comments": [ + {"id": 9, "post_id": 1, "updated_at": COMMENT_9_OLDEST}, + {"id": 10, "post_id": 1, "updated_at": COMMENT_10_UPDATED_AT}, + {"id": 11, "post_id": 1, "updated_at": COMMENT_11_UPDATED_AT}, + ], + "next_page": ( + "https://api.example.com/community/posts/1/comments" + "?per_page=100&page=2" + ), + }, + ), + # Fetch the second page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + {"comments": [{"id": 12, "post_id": 1, "updated_at": COMMENT_12_UPDATED_AT}]}, + ), + # Fetch the first page of votes for comment 10 of post 1 + ( + 
f"https://api.example.com/community/posts/1/comments/10/votes" + f"?per_page=100&start_time={PARTITION_SYNC_START_TIME}", + { + "votes": [{"id": 100, "comment_id": 10, "created_at": VOTE_100_CREATED_AT}], + "next_page": ( + f"https://api.example.com/community/posts/1/comments/10/votes" + f"?per_page=100&page=2&start_time={PARTITION_SYNC_START_TIME}" + ), + }, + ), + # Fetch the second page of votes for comment 10 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/10/votes" + f"?per_page=100&page=2&start_time={PARTITION_SYNC_START_TIME}", + {"votes": [{"id": 101, "comment_id": 10, "created_at": VOTE_101_CREATED_AT}]}, + ), + # Fetch the first page of votes for comment 11 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/11/votes" + f"?per_page=100&start_time={PARTITION_SYNC_START_TIME}", + {"votes": [{"id": 111, "comment_id": 11, "created_at": VOTE_111_CREATED_AT}]}, + ), + # Fetch the first page of votes for comment 12 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/12/votes" + f"?per_page=100&start_time={PARTITION_SYNC_START_TIME}", + {"votes": []}, + ), + # Fetch the first page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100", + { + "comments": [{"id": 20, "post_id": 2, "updated_at": COMMENT_20_UPDATED_AT}], + "next_page": ( + "https://api.example.com/community/posts/2/comments" + "?per_page=100&page=2" + ), + }, + ), + # Fetch the second page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + {"comments": [{"id": 21, "post_id": 2, "updated_at": COMMENT_21_UPDATED_AT}]}, + ), + # Fetch the first page of votes for comment 20 of post 2 + ( + f"https://api.example.com/community/posts/2/comments/20/votes" + f"?per_page=100&start_time={PARTITION_SYNC_START_TIME}", + {"votes": [{"id": 200, "comment_id": 20, "created_at": VOTE_200_CREATED_AT}]}, + ), + # Fetch the first page of votes for comment 21 of post 2 + ( + f"https://api.example.com/community/posts/2/comments/21/votes" + f"?per_page=100&start_time={PARTITION_SYNC_START_TIME}", + {"votes": [{"id": 210, "comment_id": 21, "created_at": VOTE_210_CREATED_AT}]}, + ), + # Fetch the first page of comments for post 3 + ( + "https://api.example.com/community/posts/3/comments?per_page=100", + {"comments": [{"id": 30, "post_id": 3, "updated_at": COMMENT_30_UPDATED_AT}]}, + ), + # Fetch the first page of votes for comment 30 of post 3 + ( + f"https://api.example.com/community/posts/3/comments/30/votes" + f"?per_page=100&start_time={PARTITION_SYNC_START_TIME}", + {"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]}, + ), + ], + # Expected records + [ + { + "comment_id": 10, + "comment_updated_at": COMMENT_10_UPDATED_AT, + "created_at": VOTE_100_CREATED_AT, + "id": 100, + }, + { + "comment_id": 10, + "comment_updated_at": COMMENT_10_UPDATED_AT, + "created_at": VOTE_101_CREATED_AT, + "id": 101, + }, + { + "comment_id": 11, + "comment_updated_at": COMMENT_11_UPDATED_AT, + "created_at": VOTE_111_CREATED_AT, + "id": 111, + }, + { + "comment_id": 20, + "comment_updated_at": COMMENT_20_UPDATED_AT, + "created_at": VOTE_200_CREATED_AT, + "id": 200, + }, + { + "comment_id": 21, + "comment_updated_at": COMMENT_21_UPDATED_AT, + "created_at": VOTE_210_CREATED_AT, + "id": 210, + }, + { + "comment_id": 30, + "comment_updated_at": COMMENT_30_UPDATED_AT, + "created_at": VOTE_300_CREATED_AT, + "id": 300, + }, + ], + # Initial state + {"created_at": PARTITION_SYNC_START_TIME}, + # Expected 
state + { + "state": {"created_at": VOTE_100_CREATED_AT}, + "parent_state": { + "post_comments": { + "use_global_cursor": False, + "state": {"updated_at": COMMENT_10_UPDATED_AT}, + "parent_state": {"posts": {"updated_at": POST_1_UPDATED_AT}}, + "lookback_window": 1, + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": COMMENT_10_UPDATED_AT}, + }, + { + "partition": {"id": 2, "parent_slice": {}}, + "cursor": {"updated_at": COMMENT_20_UPDATED_AT}, + }, + { + "partition": {"id": 3, "parent_slice": {}}, + "cursor": {"updated_at": COMMENT_30_UPDATED_AT}, + }, + ], + } + }, + "lookback_window": 1, + "states": [ + { + "partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_100_CREATED_AT}, + }, + { + "partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_111_CREATED_AT}, + }, + { + "partition": {"id": 12, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": PARTITION_SYNC_START_TIME}, + }, + { + "partition": {"id": 20, "parent_slice": {"id": 2, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_200_CREATED_AT}, + }, + { + "partition": {"id": 21, "parent_slice": {"id": 2, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_210_CREATED_AT}, + }, + { + "partition": {"id": 30, "parent_slice": {"id": 3, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_300_CREATED_AT}, + }, + ], + }, + ), + ], +) +def test_incremental_parent_state_migration( + test_name, manifest, mock_requests, expected_records, initial_state, expected_state +): + """ + Test incremental partition router with parent state migration + """ + run_mocked_test( + mock_requests, + manifest, + CONFIG, + STREAM_NAME, + initial_state, + expected_records, + expected_state, + ) + + +@pytest.mark.parametrize( + "test_name, manifest, mock_requests, expected_records, initial_state, expected_state", + [ + ( + "test_incremental_parent_state", + SUBSTREAM_MANIFEST, + [ + # Fetch the first page of posts + ( + f"https://api.example.com/community/posts?per_page=100&start_time={PARENT_POSTS_CURSOR}", + { + "posts": [], + "next_page": ( + f"https://api.example.com/community/posts?per_page=100" + f"&start_time={PARENT_POSTS_CURSOR}&page=2" + ), + }, + ), + # Fetch the second page of posts + ( + f"https://api.example.com/community/posts?per_page=100" + f"&start_time={PARENT_POSTS_CURSOR}&page=2", + {"posts": []}, + ), + ], + # Expected records (empty) + [], + # Initial state + { + "parent_state": { + "post_comments": { + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": PARENT_COMMENT_CURSOR_PARTITION_1}, + } + ], + "parent_state": {"posts": {"updated_at": PARENT_POSTS_CURSOR}}, + } + }, + "states": [ + { + "partition": { + "id": 10, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR}, + }, + { + "partition": { + "id": 11, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, + }, + ], + "state": {"created_at": INITIAL_GLOBAL_CURSOR}, + "lookback_window": 1, + }, + # Expected state + { + "parent_state": { + "post_comments": { + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": PARENT_COMMENT_CURSOR_PARTITION_1}, + } + ], + "state": {}, + "use_global_cursor": False, + "parent_state": {"posts": {"updated_at": PARENT_POSTS_CURSOR}}, + } + }, + "states": [ + { + "partition": { + "id": 10, + 
"parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR}, + }, + { + "partition": { + "id": 11, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, + }, + ], + "state": {"created_at": INITIAL_GLOBAL_CURSOR}, + "lookback_window": 1, + }, + ), + ], +) +def test_incremental_parent_state_no_slices( + test_name, manifest, mock_requests, expected_records, initial_state, expected_state +): + """ + Test incremental partition router with no parent records + """ + run_mocked_test( + mock_requests, + manifest, + CONFIG, + STREAM_NAME, + initial_state, + expected_records, + expected_state, + ) + + +@pytest.mark.parametrize( + "test_name, manifest, mock_requests, expected_records, initial_state, expected_state", + [ + ( + "test_incremental_parent_state", + SUBSTREAM_MANIFEST, + [ + # Fetch the first page of posts + ( + f"https://api.example.com/community/posts?per_page=100&start_time={PARENT_POSTS_CURSOR}", + { + "posts": [ + {"id": 1, "updated_at": POST_1_UPDATED_AT}, + {"id": 2, "updated_at": POST_2_UPDATED_AT}, + ], + "next_page": ( + f"https://api.example.com/community/posts?per_page=100" + f"&start_time={PARENT_POSTS_CURSOR}&page=2" + ), + }, + ), + # Fetch the second page of posts + ( + f"https://api.example.com/community/posts?per_page=100" + f"&start_time={PARENT_POSTS_CURSOR}&page=2", + {"posts": [{"id": 3, "updated_at": POST_3_UPDATED_AT}]}, + ), + # Fetch the first page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100", + { + "comments": [ + {"id": 9, "post_id": 1, "updated_at": COMMENT_9_OLDEST}, + {"id": 10, "post_id": 1, "updated_at": COMMENT_10_UPDATED_AT}, + {"id": 11, "post_id": 1, "updated_at": COMMENT_11_UPDATED_AT}, + ], + "next_page": ( + "https://api.example.com/community/posts/1/comments?per_page=100&page=2" + ), + }, + ), + # Fetch the second page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + {"comments": [{"id": 12, "post_id": 1, "updated_at": COMMENT_12_UPDATED_AT}]}, + ), + # Fetch the first page of votes for comment 10 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/10/votes" + f"?per_page=100&start_time={INITIAL_STATE_PARTITION_10_CURSOR}", + { + "votes": [], + "next_page": ( + f"https://api.example.com/community/posts/1/comments/10/votes" + f"?per_page=100&page=2&start_time={INITIAL_STATE_PARTITION_10_CURSOR}" + ), + }, + ), + # Fetch the second page of votes for comment 10 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/10/votes" + f"?per_page=100&page=2&start_time={INITIAL_STATE_PARTITION_10_CURSOR}", + {"votes": []}, + ), + # Fetch the first page of votes for comment 11 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/11/votes" + f"?per_page=100&start_time={INITIAL_STATE_PARTITION_11_CURSOR}", + {"votes": []}, + ), + # Fetch the first page of votes for comment 12 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/12/votes" + f"?per_page=100&start_time={INITIAL_STATE_PARTITION_11_CURSOR}", + {"votes": []}, + ), + # Fetch the first page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100", + { + "comments": [{"id": 20, "post_id": 2, "updated_at": COMMENT_20_UPDATED_AT}], + "next_page": ( + "https://api.example.com/community/posts/2/comments?per_page=100&page=2" + ), + }, + ), + # Fetch the second page of comments for post 2 + ( + 
"https://api.example.com/community/posts/2/comments?per_page=100&page=2", + {"comments": [{"id": 21, "post_id": 2, "updated_at": COMMENT_21_UPDATED_AT}]}, + ), + # Fetch the first page of votes for comment 20 of post 2 + ( + f"https://api.example.com/community/posts/2/comments/20/votes" + f"?per_page=100&start_time={INITIAL_STATE_PARTITION_11_CURSOR}", + {"votes": []}, + ), + # Fetch the first page of votes for comment 21 of post 2 + ( + f"https://api.example.com/community/posts/2/comments/21/votes" + f"?per_page=100&start_time={INITIAL_STATE_PARTITION_11_CURSOR}", + {"votes": []}, + ), + # Fetch the first page of comments for post 3 + ( + "https://api.example.com/community/posts/3/comments?per_page=100", + {"comments": [{"id": 30, "post_id": 3, "updated_at": COMMENT_30_UPDATED_AT}]}, + ), + # Fetch the first page of votes for comment 30 of post 3 + ( + f"https://api.example.com/community/posts/3/comments/30/votes" + f"?per_page=100&start_time={INITIAL_STATE_PARTITION_11_CURSOR}", + {"votes": []}, + ), + ], + # Expected records + [], + # Initial state + { + "parent_state": { + "post_comments": { + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": PARENT_COMMENT_CURSOR_PARTITION_1}, + } + ], + "parent_state": {"posts": {"updated_at": PARENT_POSTS_CURSOR}}, + } + }, + "states": [ + { + "partition": { + "id": 10, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR}, + }, + { + "partition": { + "id": 11, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, + }, + ], + "use_global_cursor": True, + "state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, + "lookback_window": 0, + }, + # Expected state + { + "lookback_window": 1, + "state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, + "states": [ + { + "partition": { + "id": 10, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR}, + }, + { + "partition": { + "id": 11, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, + }, + { + "partition": {"id": 12, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, + }, + { + "partition": {"id": 20, "parent_slice": {"id": 2, "parent_slice": {}}}, + "cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, + }, + { + "partition": {"id": 21, "parent_slice": {"id": 2, "parent_slice": {}}}, + "cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, + }, + { + "partition": {"id": 30, "parent_slice": {"id": 3, "parent_slice": {}}}, + "cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, + }, + ], + "parent_state": { + "post_comments": { + "use_global_cursor": False, + "state": {"updated_at": COMMENT_10_UPDATED_AT}, + "parent_state": {"posts": {"updated_at": POST_1_UPDATED_AT}}, + "lookback_window": 1, + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": COMMENT_10_UPDATED_AT}, + }, + { + "partition": {"id": 2, "parent_slice": {}}, + "cursor": {"updated_at": COMMENT_20_UPDATED_AT}, + }, + { + "partition": {"id": 3, "parent_slice": {}}, + "cursor": {"updated_at": COMMENT_30_UPDATED_AT}, + }, + ], + } + }, + }, + ), + ], +) +def test_incremental_parent_state_no_records( + test_name, manifest, mock_requests, expected_records, initial_state, expected_state +): + """ + Test incremental partition router with no 
child records + """ + run_mocked_test( + mock_requests, + manifest, + CONFIG, + STREAM_NAME, + initial_state, + expected_records, + expected_state, + ) + + +@pytest.mark.parametrize( + "test_name, manifest, mock_requests, expected_records, initial_state, expected_state", + [ + ( + "test_incremental_parent_state", + SUBSTREAM_MANIFEST, + [ + # Fetch the first page of posts + ( + f"https://api.example.com/community/posts?per_page=100&start_time={PARENT_POSTS_CURSOR}", + { + "posts": [ + {"id": 1, "updated_at": POST_1_UPDATED_AT}, + {"id": 2, "updated_at": POST_2_UPDATED_AT}, + ], + "next_page": ( + f"https://api.example.com/community/posts?per_page=100&start_time={PARENT_POSTS_CURSOR}&page=2" + ), + }, + ), + # Fetch the second page of posts + ( + f"https://api.example.com/community/posts?per_page=100&start_time={PARENT_POSTS_CURSOR}&page=2", + {"posts": [{"id": 3, "updated_at": POST_3_UPDATED_AT}]}, + ), + # Fetch the first page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100", + { + "comments": [ + {"id": 9, "post_id": 1, "updated_at": COMMENT_9_OLDEST}, + {"id": 10, "post_id": 1, "updated_at": COMMENT_10_UPDATED_AT}, + {"id": 11, "post_id": 1, "updated_at": COMMENT_11_UPDATED_AT}, + ], + "next_page": ( + "https://api.example.com/community/posts/1/comments?per_page=100&page=2" + ), + }, + ), + # Fetch the second page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100&page=2", + {"comments": [{"id": 12, "post_id": 1, "updated_at": COMMENT_12_UPDATED_AT}]}, + ), + # Fetch the first page of votes for comment 10 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/10/votes" + f"?per_page=100&start_time={INITIAL_STATE_PARTITION_10_CURSOR}", + { + "votes": [{"id": 100, "comment_id": 10, "created_at": VOTE_100_CREATED_AT}], + "next_page": ( + f"https://api.example.com/community/posts/1/comments/10/votes" + f"?per_page=100&page=2&start_time={INITIAL_STATE_PARTITION_10_CURSOR}" + ), + }, + ), + # Fetch the second page of votes for comment 10 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/10/votes" + f"?per_page=100&page=2&start_time={INITIAL_STATE_PARTITION_10_CURSOR}", + {"votes": [{"id": 101, "comment_id": 10, "created_at": VOTE_101_CREATED_AT}]}, + ), + # Fetch the first page of votes for comment 11 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/11/votes" + f"?per_page=100&start_time={INITIAL_STATE_PARTITION_11_CURSOR}", + {"votes": [{"id": 111, "comment_id": 11, "created_at": VOTE_111_CREATED_AT}]}, + ), + # Fetch the first page of votes for comment 12 of post 1 + ( + f"https://api.example.com/community/posts/1/comments/12/votes?per_page=100&start_time={LOOKBACK_DATE}", + {"votes": []}, + ), + # Fetch the first page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100", + { + "comments": [{"id": 20, "post_id": 2, "updated_at": COMMENT_20_UPDATED_AT}], + "next_page": ( + "https://api.example.com/community/posts/2/comments?per_page=100&page=2" + ), + }, + ), + # Fetch the second page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100&page=2", + {"comments": [{"id": 21, "post_id": 2, "updated_at": COMMENT_21_UPDATED_AT}]}, + ), + # Fetch the first page of votes for comment 20 of post 2 - 404 error + ( + f"https://api.example.com/community/posts/2/comments/20/votes" + f"?per_page=100&start_time={LOOKBACK_DATE}", + None, + ), + # Fetch the first page of votes for comment 
21 of post 2 + ( + f"https://api.example.com/community/posts/2/comments/21/votes" + f"?per_page=100&start_time={LOOKBACK_DATE}", + {"votes": [{"id": 210, "comment_id": 21, "created_at": VOTE_210_CREATED_AT}]}, + ), + # Fetch the first page of comments for post 3 + ( + "https://api.example.com/community/posts/3/comments?per_page=100", + {"comments": [{"id": 30, "post_id": 3, "updated_at": COMMENT_30_UPDATED_AT}]}, + ), + # Fetch the first page of votes for comment 30 of post 3 + ( + f"https://api.example.com/community/posts/3/comments/30/votes" + f"?per_page=100&start_time={LOOKBACK_DATE}", + {"votes": [{"id": 300, "comment_id": 30, "created_at": VOTE_300_CREATED_AT}]}, + ), + ], + # Expected records + [ + { + "comment_id": 10, + "comment_updated_at": COMMENT_10_UPDATED_AT, + "created_at": VOTE_100_CREATED_AT, + "id": 100, + }, + { + "comment_id": 10, + "comment_updated_at": COMMENT_10_UPDATED_AT, + "created_at": VOTE_101_CREATED_AT, + "id": 101, + }, + { + "comment_id": 11, + "comment_updated_at": COMMENT_11_UPDATED_AT, + "created_at": VOTE_111_CREATED_AT, + "id": 111, + }, + { + "comment_id": 21, + "comment_updated_at": COMMENT_21_UPDATED_AT, + "created_at": VOTE_210_CREATED_AT, + "id": 210, + }, + { + "comment_id": 30, + "comment_updated_at": COMMENT_30_UPDATED_AT, + "created_at": VOTE_300_CREATED_AT, + "id": 300, + }, + ], + # Initial state + { + "parent_state": { + "post_comments": { + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": PARENT_COMMENT_CURSOR_PARTITION_1}, + } + ], + "parent_state": {"posts": {"updated_at": PARENT_POSTS_CURSOR}}, + } + }, + "state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, + "lookback_window": 86400, + "states": [ + { + "partition": { + "id": 10, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": INITIAL_STATE_PARTITION_10_CURSOR}, + }, + { + "partition": { + "id": 11, + "parent_slice": {"id": 1, "parent_slice": {}}, + }, + "cursor": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, + }, + ], + }, + # Expected state + { + # The global state, lookback window and the parent state are the same because sync failed for comment 20 + "parent_state": { + "post_comments": { + "states": [ + { + "partition": {"id": 1, "parent_slice": {}}, + "cursor": {"updated_at": PARENT_COMMENT_CURSOR_PARTITION_1}, + } + ], + "parent_state": {"posts": {"updated_at": PARENT_POSTS_CURSOR}}, + } + }, + "state": {"created_at": INITIAL_STATE_PARTITION_11_CURSOR}, + "lookback_window": 86400, + "states": [ + { + "partition": {"id": 10, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_100_CREATED_AT}, + }, + { + "partition": {"id": 11, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_111_CREATED_AT}, + }, + { + "partition": {"id": 12, "parent_slice": {"id": 1, "parent_slice": {}}}, + "cursor": {"created_at": LOOKBACK_DATE}, + }, + { + "partition": {"id": 20, "parent_slice": {"id": 2, "parent_slice": {}}}, + "cursor": {"created_at": LOOKBACK_DATE}, + }, + { + "partition": {"id": 21, "parent_slice": {"id": 2, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_210_CREATED_AT}, + }, + { + "partition": {"id": 30, "parent_slice": {"id": 3, "parent_slice": {}}}, + "cursor": {"created_at": VOTE_300_CREATED_AT}, + }, + ], + }, + ), + ], +) +def test_incremental_substream_error( + test_name, manifest, mock_requests, expected_records, initial_state, expected_state +): + run_mocked_test( + mock_requests, + manifest, + CONFIG, + STREAM_NAME, + initial_state, 
+ expected_records, + expected_state, + ) + + +LISTPARTITION_MANIFEST: MutableMapping[str, Any] = { + "version": "0.51.42", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["post_comments"]}, + "definitions": { + "basic_authenticator": { + "type": "BasicHttpAuthenticator", + "username": "{{ config['credentials']['email'] + '/token' }}", + "password": "{{ config['credentials']['api_token'] }}", + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.example.com", + "http_method": "GET", + "authenticator": "#/definitions/basic_authenticator", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": { + "type": "DpathExtractor", + "field_path": ["{{ parameters.get('data_path') or parameters['name'] }}"], + }, + "schema_normalization": "Default", + }, + "paginator": { + "type": "DefaultPaginator", + "page_size_option": { + "type": "RequestOption", + "field_name": "per_page", + "inject_into": "request_parameter", + }, + "pagination_strategy": { + "type": "CursorPagination", + "page_size": 100, + "cursor_value": "{{ response.get('next_page', {}) }}", + "stop_condition": "{{ not response.get('next_page', {}) }}", + }, + "page_token_option": {"type": "RequestPath"}, + }, + }, + "cursor_incremental_sync": { + "type": "DatetimeBasedCursor", + "cursor_datetime_formats": ["%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"], + "datetime_format": "%Y-%m-%dT%H:%M:%SZ", + "cursor_field": "{{ parameters.get('cursor_field', 'updated_at') }}", + "start_datetime": {"datetime": "{{ config.get('start_date')}}"}, + "start_time_option": { + "inject_into": "request_parameter", + "field_name": "start_time", + "type": "RequestOption", + }, + }, + "post_comments_stream": { + "type": "DeclarativeStream", + "name": "post_comments", + "primary_key": ["id"], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "integer"}, + "updated_at": {"type": "string", "format": "date-time"}, + "post_id": {"type": "integer"}, + "comment": {"type": "string"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.example.com", + "path": "/community/posts/{{ stream_slice.id }}/comments", + "http_method": "GET", + "authenticator": "#/definitions/basic_authenticator", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": { + "type": "DpathExtractor", + "field_path": ["{{ parameters.get('data_path') or parameters['name'] }}"], + }, + "schema_normalization": "Default", + }, + "paginator": "#/definitions/retriever/paginator", + "partition_router": { + "type": "ListPartitionRouter", + "cursor_field": "id", + "values": ["1", "2", "3"], + }, + }, + "incremental_sync": { + "$ref": "#/definitions/cursor_incremental_sync", + "is_client_side_incremental": True, + }, + "$parameters": { + "name": "post_comments", + "path": "community/posts/{{ stream_slice.id }}/comments", + "data_path": "comments", + "cursor_field": "updated_at", + "primary_key": "id", + }, + }, + }, + "streams": [ + {"$ref": "#/definitions/post_comments_stream"}, + ], + "concurrency_level": { + "type": "ConcurrencyLevel", + "default_concurrency": "{{ config['num_workers'] or 10 }}", + "max_concurrency": 25, + }, + "spec": { + "type": "Spec", + "documentation_url": "https://airbyte.com/#yaml-from-manifest", + "connection_specification": { + "title": "Test Spec", + "type": 
"object", + "required": ["credentials", "start_date"], + "additionalProperties": False, + "properties": { + "credentials": { + "type": "object", + "required": ["email", "api_token"], + "properties": { + "email": { + "type": "string", + "title": "Email", + "description": "The email for authentication.", + }, + "api_token": { + "type": "string", + "airbyte_secret": True, + "title": "API Token", + "description": "The API token for authentication.", + }, + }, + }, + "start_date": { + "type": "string", + "format": "date-time", + "title": "Start Date", + "description": "The date from which to start syncing data.", + }, + }, + }, + }, +} + + +@pytest.mark.parametrize( + "test_name, manifest, mock_requests, expected_records, initial_state, expected_state", + [ + ( + "test_incremental_parent_state", + LISTPARTITION_MANIFEST, + [ + # Fetch the first page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100&start_time=2024-01-24T00:00:00Z", + { + "comments": [ + {"id": 9, "post_id": 1, "updated_at": "2023-01-01T00:00:00Z"}, + {"id": 10, "post_id": 1, "updated_at": "2024-01-25T00:00:00Z"}, + {"id": 11, "post_id": 1, "updated_at": "2024-01-24T00:00:00Z"}, + ], + "next_page": "https://api.example.com/community/posts/1/comments?per_page=100&page=2&start_time=2024-01-24T00:00:00Z", + }, + ), + # Fetch the second page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100&page=2&start_time=2024-01-24T00:00:00Z", + {"comments": [{"id": 12, "post_id": 1, "updated_at": "2024-01-23T00:00:00Z"}]}, + ), + # Fetch the first page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100&start_time=2024-01-21T05:00:00Z", + { + "comments": [ + {"id": 20, "post_id": 2, "updated_at": "2024-01-22T00:00:00Z"} + ], + "next_page": "https://api.example.com/community/posts/2/comments?per_page=100&page=2&start_time=2024-01-21T05:00:00Z", + }, + ), + # Fetch the second page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100&page=2&start_time=2024-01-21T05:00:00Z", + {"comments": [{"id": 21, "post_id": 2, "updated_at": "2024-01-21T00:00:00Z"}]}, + ), + # Fetch the first page of comments for post 3 + ( + "https://api.example.com/community/posts/3/comments?per_page=100&start_time=2024-01-08T00:00:00Z", + {"comments": [{"id": 30, "post_id": 3, "updated_at": "2024-01-09T00:00:00Z"}]}, + ), + ], + # Expected records + [ + {"id": 10, "post_id": 1, "updated_at": "2024-01-25T00:00:00Z"}, + {"id": 11, "post_id": 1, "updated_at": "2024-01-24T00:00:00Z"}, + {"id": 20, "post_id": 2, "updated_at": "2024-01-22T00:00:00Z"}, + {"id": 30, "post_id": 3, "updated_at": "2024-01-09T00:00:00Z"}, + ], + # Initial state + { + "state": {"updated_at": "2024-01-08T00:00:00Z"}, + "states": [ + { + "cursor": {"updated_at": "2024-01-24T00:00:00Z"}, + "partition": {"id": "1"}, + }, + { + "cursor": {"updated_at": "2024-01-21T05:00:00Z"}, + "partition": {"id": "2"}, + }, + ], + "use_global_cursor": False, + }, + # Expected state + { + "lookback_window": 1, + "state": {"updated_at": "2024-01-25T00:00:00Z"}, + "states": [ + {"cursor": {"updated_at": "2024-01-25T00:00:00Z"}, "partition": {"id": "1"}}, + {"cursor": {"updated_at": "2024-01-22T00:00:00Z"}, "partition": {"id": "2"}}, + {"cursor": {"updated_at": "2024-01-09T00:00:00Z"}, "partition": {"id": "3"}}, + ], + }, + ), + ], +) +def test_incremental_list_partition_router( + test_name, manifest, mock_requests, expected_records, initial_state, 
expected_state +): + """ + Test ConcurrentPerPartitionCursor with ListPartitionRouter + """ + run_mocked_test( + mock_requests, + manifest, + CONFIG, + "post_comments", + initial_state, + expected_records, + expected_state, + ) + + +@pytest.mark.parametrize( + "test_name, manifest, mock_requests, expected_records, initial_state, expected_state", + [ + ( + "test_incremental_error_handling", + LISTPARTITION_MANIFEST, + [ + # Fetch the first page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100&start_time=2024-01-20T00:00:00Z", + { + "comments": [ + {"id": 9, "post_id": 1, "updated_at": "2023-01-01T00:00:00Z"}, + {"id": 10, "post_id": 1, "updated_at": "2024-01-25T00:00:00Z"}, + {"id": 11, "post_id": 1, "updated_at": "2024-01-24T00:00:00Z"}, + ], + "next_page": "https://api.example.com/community/posts/1/comments?per_page=100&page=2&start_time=2024-01-20T00:00:00Z", + }, + ), + # Error response for the second page of comments for post 1 + ( + "https://api.example.com/community/posts/1/comments?per_page=100&page=2&start_time=2024-01-20T00:00:00Z", + None, # Simulate a network error or an empty response + ), + # Fetch the first page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100&start_time=2024-01-21T05:00:00Z", + { + "comments": [ + {"id": 20, "post_id": 2, "updated_at": "2024-01-22T00:00:00Z"} + ], + "next_page": "https://api.example.com/community/posts/2/comments?per_page=100&page=2&start_time=2024-01-21T05:00:00Z", + }, + ), + # Fetch the second page of comments for post 2 + ( + "https://api.example.com/community/posts/2/comments?per_page=100&page=2&start_time=2024-01-21T05:00:00Z", + {"comments": [{"id": 21, "post_id": 2, "updated_at": "2024-01-21T00:00:00Z"}]}, + ), + # Fetch the first page of comments for post 3 + ( + "https://api.example.com/community/posts/3/comments?per_page=100&start_time=2024-01-08T00:00:00Z", + {"comments": [{"id": 30, "post_id": 3, "updated_at": "2024-01-09T00:00:00Z"}]}, + ), + ], + # Expected records + [ + {"id": 10, "post_id": 1, "updated_at": "2024-01-25T00:00:00Z"}, + {"id": 11, "post_id": 1, "updated_at": "2024-01-24T00:00:00Z"}, + {"id": 20, "post_id": 2, "updated_at": "2024-01-22T00:00:00Z"}, + {"id": 30, "post_id": 3, "updated_at": "2024-01-09T00:00:00Z"}, + ], + # Initial state + { + "state": {"updated_at": "2024-01-08T00:00:00Z"}, + "states": [ + { + "cursor": {"updated_at": "2024-01-20T00:00:00Z"}, + "partition": {"id": "1"}, + }, + { + "cursor": {"updated_at": "2024-01-21T05:00:00Z"}, + "partition": {"id": "2"}, + }, + ], + "use_global_cursor": False, + }, + # Expected state + { + "lookback_window": 0, + "state": {"updated_at": "2024-01-08T00:00:00Z"}, + "states": [ + {"cursor": {"updated_at": "2024-01-20T00:00:00Z"}, "partition": {"id": "1"}}, + {"cursor": {"updated_at": "2024-01-22T00:00:00Z"}, "partition": {"id": "2"}}, + {"cursor": {"updated_at": "2024-01-09T00:00:00Z"}, "partition": {"id": "3"}}, + ], + }, + ), + ], +) +def test_incremental_error( + test_name, manifest, mock_requests, expected_records, initial_state, expected_state +): + """ + Test with failed request. 
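    A response value of None in mock_requests marks that endpoint as failing (see the
    comment on the second comments page for post 1 above). As a rough standalone sketch,
    assuming requests_mock is used directly rather than through run_mocked_test, an
    equivalent failure can be simulated like this:

        import requests
        import requests_mock

        with requests_mock.Mocker() as m:
            # The second page of comments for post 1 returns a server error
            m.get(
                "https://api.example.com/community/posts/1/comments",
                status_code=500,
            )
            assert requests.get(
                "https://api.example.com/community/posts/1/comments"
            ).status_code == 500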
+ """ + run_mocked_test( + mock_requests, + manifest, + CONFIG, + "post_comments", + initial_state, + expected_records, + expected_state, + ) diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 7bfdc0379..12edb32b1 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -1263,7 +1263,7 @@ def test_client_side_incremental_with_partition_router(): stream.retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator ) assert isinstance( - stream.retriever.record_selector.record_filter._substream_cursor, + stream.retriever.record_selector.record_filter._cursor, PerPartitionWithGlobalCursor, ) diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index 5878c758f..b33febcaf 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -491,8 +491,8 @@ def test_get_request_headers(test_name, paginator_mapping, expected_mapping): paginator.get_request_headers.return_value = paginator_mapping requester = MagicMock(use_cache=False) - stream_slicer = MagicMock() - stream_slicer.get_request_headers.return_value = {"key": "value"} + request_option_provider = MagicMock() + request_option_provider.get_request_headers.return_value = {"key": "value"} record_selector = MagicMock() retriever = SimpleRetriever( @@ -500,7 +500,7 @@ def test_get_request_headers(test_name, paginator_mapping, expected_mapping): primary_key=primary_key, requester=requester, record_selector=record_selector, - stream_slicer=stream_slicer, + request_option_provider=request_option_provider, paginator=paginator, parameters={}, config={}, @@ -555,7 +555,7 @@ def test_get_request_headers(test_name, paginator_mapping, expected_mapping): ), ], ) -def test_ignore_stream_slicer_parameters_on_paginated_requests( +def test_ignore_request_option_provider_parameters_on_paginated_requests( test_name, paginator_mapping, ignore_stream_slicer_parameters_on_paginated_requests, @@ -567,8 +567,8 @@ def test_ignore_stream_slicer_parameters_on_paginated_requests( paginator.get_request_headers.return_value = paginator_mapping requester = MagicMock(use_cache=False) - stream_slicer = MagicMock() - stream_slicer.get_request_headers.return_value = {"key_from_slicer": "value"} + request_option_provider = MagicMock() + request_option_provider.get_request_headers.return_value = {"key_from_slicer": "value"} record_selector = MagicMock() retriever = SimpleRetriever( @@ -576,7 +576,7 @@ def test_ignore_stream_slicer_parameters_on_paginated_requests( primary_key=primary_key, requester=requester, record_selector=record_selector, - stream_slicer=stream_slicer, + request_option_provider=request_option_provider, paginator=paginator, ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests, parameters={}, diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 3b5dd50c9..ce88804c4 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -651,8 +651,8 @@ def test_group_streams(): 
concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG) # 1 full refresh stream, 2 incremental streams, 1 substream w/o incremental, 1 list based substream w/o incremental - # 1 async job stream - assert len(concurrent_streams) == 6 + # 1 async job stream, 1 substream w/ incremental + assert len(concurrent_streams) == 7 ( concurrent_stream_0, concurrent_stream_1, @@ -660,6 +660,7 @@ def test_group_streams(): concurrent_stream_3, concurrent_stream_4, concurrent_stream_5, + concurrent_stream_6, ) = concurrent_streams assert isinstance(concurrent_stream_0, DefaultStream) assert concurrent_stream_0.name == "party_members" @@ -672,12 +673,9 @@ def test_group_streams(): assert isinstance(concurrent_stream_4, DefaultStream) assert concurrent_stream_4.name == "arcana_personas" assert isinstance(concurrent_stream_5, DefaultStream) - assert concurrent_stream_5.name == "async_job_stream" - - # 1 substream w/ incremental, 1 stream with async retriever - assert len(synchronous_streams) == 1 - assert isinstance(synchronous_streams[0], DeclarativeStream) - assert synchronous_streams[0].name == "palace_enemies" + assert concurrent_stream_5.name == "palace_enemies" + assert isinstance(concurrent_stream_6, DefaultStream) + assert concurrent_stream_6.name == "async_job_stream" @freezegun.freeze_time(time_to_freeze=datetime(2024, 9, 1, 0, 0, 0, 0, tzinfo=timezone.utc)) @@ -740,7 +738,7 @@ def test_create_concurrent_cursor(): assert locations_cursor._slice_range == isodate.Duration(months=1) assert locations_cursor._lookback_window == timedelta(days=5) assert locations_cursor._cursor_granularity == timedelta(days=1) - assert locations_cursor.state == { + assert locations_cursor._concurrent_state == { "slices": [ { "start": datetime(2024, 7, 1, 0, 0, 0, 0, tzinfo=timezone.utc), @@ -1458,13 +1456,13 @@ def test_streams_with_stream_state_interpolation_should_be_synchronous(): ) concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG) - # 1 full refresh stream, 2 with parent stream without incremental dependency, 1 stream with async retriever - assert len(concurrent_streams) == 4 - # 2 incremental stream with interpolation on state (locations and party_members), 1 incremental with parent stream (palace_enemies) - assert len(synchronous_streams) == 3 + # 1 full refresh stream, 2 with parent stream without incremental dependency, 1 stream with async retriever, 1 incremental with parent stream (palace_enemies) + assert len(concurrent_streams) == 5 + # 2 incremental stream with interpolation on state (locations and party_members) + assert len(synchronous_streams) == 2 -def test_given_partition_routing_and_incremental_sync_then_stream_is_not_concurrent(): +def test_given_partition_routing_and_incremental_sync_then_stream_is_concurrent(): manifest = { "version": "5.0.0", "definitions": { @@ -1599,8 +1597,8 @@ def test_given_partition_routing_and_incremental_sync_then_stream_is_not_concurr ) concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG) - assert len(concurrent_streams) == 0 - assert len(synchronous_streams) == 1 + assert len(concurrent_streams) == 1 + assert len(synchronous_streams) == 0 def create_wrapped_stream(stream: DeclarativeStream) -> Stream:
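For context on the final hunk above, here is a minimal sketch of how the new grouping behaviour can be exercised, assuming the manifest dict defined in that test, the _CONFIG fixture, and the same constructor invocation used elsewhere in this test module; a declarative stream that combines a partition router with incremental sync is now expected in the concurrent group:

    from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
        ConcurrentDeclarativeSource,
    )

    source = ConcurrentDeclarativeSource(
        source_config=manifest, config=_CONFIG, catalog=None, state=None
    )
    concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG)
    assert len(concurrent_streams) == 1
    assert len(synchronous_streams) == 0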