diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py
index 40fd23ed..85bce965 100644
--- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py
+++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py
@@ -200,7 +200,11 @@ def _group_streams(
             # Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
             # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
             # so we need to treat them as synchronous
-            if isinstance(declarative_stream, DeclarativeStream):
+            if (
+                isinstance(declarative_stream, DeclarativeStream)
+                and name_to_stream_mapping[declarative_stream.name].get("retriever")["type"]
+                == "SimpleRetriever"
+            ):
                 incremental_sync_component_definition = name_to_stream_mapping[
                     declarative_stream.name
                 ].get("incremental_sync")
@@ -210,36 +214,30 @@
                     .get("retriever")
                     .get("partition_router")
                 )
+                is_without_partition_router_or_cursor = not bool(
+                    incremental_sync_component_definition
+                ) and not bool(partition_router_component_definition)
                 is_substream_without_incremental = (
                     partition_router_component_definition
                     and not incremental_sync_component_definition
                 )
 
-                if (
-                    incremental_sync_component_definition
-                    and incremental_sync_component_definition.get("type", "")
-                    == DatetimeBasedCursorModel.__name__
-                    and self._stream_supports_concurrent_partition_processing(
-                        declarative_stream=declarative_stream
-                    )
-                    and hasattr(declarative_stream.retriever, "stream_slicer")
-                    and isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
+                if self._is_datetime_incremental_without_partition_routing(
+                    declarative_stream, incremental_sync_component_definition
                 ):
                     stream_state = state_manager.get_stream_state(
                         stream_name=declarative_stream.name, namespace=declarative_stream.namespace
                     )
 
-                    cursor, connector_state_converter = (
-                        self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
-                            state_manager=state_manager,
-                            model_type=DatetimeBasedCursorModel,
-                            component_definition=incremental_sync_component_definition,
-                            stream_name=declarative_stream.name,
-                            stream_namespace=declarative_stream.namespace,
-                            config=config or {},
-                            stream_state=stream_state,
-                        )
+                    cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
+                        state_manager=state_manager,
+                        model_type=DatetimeBasedCursorModel,
+                        component_definition=incremental_sync_component_definition,
+                        stream_name=declarative_stream.name,
+                        stream_namespace=declarative_stream.namespace,
+                        config=config or {},
+                        stream_state=stream_state,
                     )
 
                     partition_generator = StreamSlicerPartitionGenerator(
@@ -263,14 +261,19 @@
                             json_schema=declarative_stream.get_json_schema(),
                             availability_strategy=AlwaysAvailableAvailabilityStrategy(),
                             primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
-                            cursor_field=cursor.cursor_field.cursor_field_key,
+                            cursor_field=cursor.cursor_field.cursor_field_key
+                            if hasattr(cursor, "cursor_field")
+                            and hasattr(
+                                cursor.cursor_field, "cursor_field_key"
+                            )  # FIXME this will need to be updated once we do the per partition
+                            else None,
                             logger=self.logger,
                             cursor=cursor,
                         )
                     )
-                elif is_substream_without_incremental and hasattr(
-                    declarative_stream.retriever, "stream_slicer"
-                ):
+                elif (
+                    is_substream_without_incremental or is_without_partition_router_or_cursor
+                ) and hasattr(declarative_stream.retriever, "stream_slicer"):
                     partition_generator = StreamSlicerPartitionGenerator(
                         DeclarativePartitionFactory(
                             declarative_stream.name,
@@ -310,6 +313,22 @@
 
         return concurrent_streams, synchronous_streams
 
+    def _is_datetime_incremental_without_partition_routing(
+        self,
+        declarative_stream: DeclarativeStream,
+        incremental_sync_component_definition: Mapping[str, Any],
+    ) -> bool:
+        return (
+            bool(incremental_sync_component_definition)
+            and incremental_sync_component_definition.get("type", "")
+            == DatetimeBasedCursorModel.__name__
+            and self._stream_supports_concurrent_partition_processing(
+                declarative_stream=declarative_stream
+            )
+            and hasattr(declarative_stream.retriever, "stream_slicer")
+            and isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
+        )
+
     def _stream_supports_concurrent_partition_processing(
         self, declarative_stream: DeclarativeStream
     ) -> bool:
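Note: the grouping rules above are easier to read outside diff context. The sketch below is illustrative only — the helper name is hypothetical and it operates on raw manifest definitions. It approximates `_is_datetime_incremental_without_partition_routing` (which additionally calls `_stream_supports_concurrent_partition_processing` and checks that `retriever.stream_slicer` is a `DatetimeBasedCursor`) by simply testing for the absence of a `partition_router`.

from typing import Any, Mapping


def sketch_is_concurrent_candidate(stream_definition: Mapping[str, Any]) -> bool:
    # Gate added by this diff: only SimpleRetriever-based declarative streams are
    # considered for the concurrent path; AsyncRetriever streams stay synchronous.
    retriever = stream_definition.get("retriever") or {}
    if retriever.get("type") != "SimpleRetriever":
        return False

    incremental = stream_definition.get("incremental_sync")
    partition_router = retriever.get("partition_router")

    # First branch: datetime-based incremental without partition routing.
    is_datetime_incremental = (
        bool(incremental) and incremental.get("type") == "DatetimeBasedCursor"
    )
    # Widened elif branch: substream without incremental, or neither cursor
    # nor partition router (plain full refresh).
    is_substream_without_incremental = bool(partition_router) and not incremental
    is_without_partition_router_or_cursor = not incremental and not partition_router

    return (
        (is_datetime_incremental and not partition_router)
        or is_substream_without_incremental
        or is_without_partition_router_or_cursor
    )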
"stream_slicer"): partition_generator = StreamSlicerPartitionGenerator( DeclarativePartitionFactory( declarative_stream.name, @@ -310,6 +313,22 @@ def _group_streams( return concurrent_streams, synchronous_streams + def _is_datetime_incremental_without_partition_routing( + self, + declarative_stream: DeclarativeStream, + incremental_sync_component_definition: Mapping[str, Any], + ) -> bool: + return ( + bool(incremental_sync_component_definition) + and incremental_sync_component_definition.get("type", "") + == DatetimeBasedCursorModel.__name__ + and self._stream_supports_concurrent_partition_processing( + declarative_stream=declarative_stream + ) + and hasattr(declarative_stream.retriever, "stream_slicer") + and isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor) + ) + def _stream_supports_concurrent_partition_processing( self, declarative_stream: DeclarativeStream ) -> bool: diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 49e047d3..1712cb67 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -17,7 +17,6 @@ Mapping, MutableMapping, Optional, - Tuple, Type, Union, get_args, @@ -760,7 +759,7 @@ def create_concurrent_cursor_from_datetime_based_cursor( config: Config, stream_state: MutableMapping[str, Any], **kwargs: Any, - ) -> Tuple[ConcurrentCursor, DateTimeStreamStateConverter]: + ) -> ConcurrentCursor: component_type = component_definition.get("type") if component_definition.get("type") != model_type.__name__: raise ValueError( @@ -891,23 +890,20 @@ def create_concurrent_cursor_from_datetime_based_cursor( if evaluated_step: step_length = parse_duration(evaluated_step) - return ( - ConcurrentCursor( - stream_name=stream_name, - stream_namespace=stream_namespace, - stream_state=stream_state, - message_repository=self._message_repository, # type: ignore # message_repository is always instantiated with a value by factory - connector_state_manager=state_manager, - connector_state_converter=connector_state_converter, - cursor_field=cursor_field, - slice_boundary_fields=slice_boundary_fields, - start=start_date, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice - end_provider=end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice - lookback_window=lookback_window, - slice_range=step_length, - cursor_granularity=cursor_granularity, - ), - connector_state_converter, + return ConcurrentCursor( + stream_name=stream_name, + stream_namespace=stream_namespace, + stream_state=stream_state, + message_repository=self._message_repository, # type: ignore # message_repository is always instantiated with a value by factory + connector_state_manager=state_manager, + connector_state_converter=connector_state_converter, + cursor_field=cursor_field, + slice_boundary_fields=slice_boundary_fields, + start=start_date, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice + end_provider=end_date_provider, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. 
diff --git a/airbyte_cdk/sources/streams/concurrent/helpers.py b/airbyte_cdk/sources/streams/concurrent/helpers.py
index d839068a..5e2edf05 100644
--- a/airbyte_cdk/sources/streams/concurrent/helpers.py
+++ b/airbyte_cdk/sources/streams/concurrent/helpers.py
@@ -13,8 +13,15 @@ def get_primary_key_from_stream(
     elif isinstance(stream_primary_key, str):
         return [stream_primary_key]
     elif isinstance(stream_primary_key, list):
-        if len(stream_primary_key) > 0 and all(isinstance(k, str) for k in stream_primary_key):
+        are_all_elements_str = all(isinstance(k, str) for k in stream_primary_key)
+        are_all_elements_list_of_size_one = all(
+            isinstance(k, list) and len(k) == 1 for k in stream_primary_key
+        )
+
+        if are_all_elements_str:
             return stream_primary_key  # type: ignore # We verified all items in the list are strings
+        elif are_all_elements_list_of_size_one:
+            return list(map(lambda x: x[0], stream_primary_key))
         else:
             raise ValueError(f"Nested primary keys are not supported. Found {stream_primary_key}")
     else:
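The resulting behavior of `get_primary_key_from_stream`, mirroring the new unit tests at the end of this diff (the single-string and list-of-strings cases are unchanged, per the context lines above). One side effect worth noting: with the `len(stream_primary_key) > 0` guard gone, an empty list now satisfies the all-strings check and is returned as `[]` rather than raising `ValueError`.

from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream

get_primary_key_from_stream("id")                   # ["id"]
get_primary_key_from_stream(["id_1", "id_2"])       # ["id_1", "id_2"] (composite key, returned as-is)
get_primary_key_from_stream([["id_1"], ["id_2"]])   # ["id_1", "id_2"] (new: flattened)
get_primary_key_from_stream([["id_1", "id_2"]])     # raises ValueError (nested keys still unsupported)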
"async_job_stream", + "primary_key": "id", + "url_base": "https://persona.metaverse.com", + }, + "retriever": { + "type": "AsyncRetriever", + "status_mapping": { + "failed": ["failed"], + "running": ["pending"], + "timeout": ["timeout"], + "completed": ["ready"], + }, + "urls_extractor": {"type": "DpathExtractor", "field_path": ["urls"]}, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "status_extractor": {"type": "DpathExtractor", "field_path": ["status"]}, + "polling_requester": { + "type": "HttpRequester", + "path": "/async_job/{{stream_slice['create_job_response'].json()['id'] }}", + "http_method": "GET", + "authenticator": { + "type": "BearerAuthenticator", + "api_token": "{{ config['api_key'] }}", + }, + }, + "creation_requester": { + "type": "HttpRequester", + "path": "async_job", + "http_method": "POST", + "authenticator": { + "type": "BearerAuthenticator", + "api_token": "{{ config['api_key'] }}", + }, + }, + "download_requester": { + "type": "HttpRequester", + "path": "{{stream_slice['url']}}", + "http_method": "GET", + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "https://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "description": "The identifier", + "type": ["null", "string"], + }, + "name": { + "description": "The name of the metaverse palace", + "type": ["null", "string"], + }, + }, + }, + }, + }, "locations_stream": { "$ref": "#/definitions/base_incremental_stream", "retriever": { @@ -463,6 +526,7 @@ "#/definitions/party_members_skills_stream", "#/definitions/arcana_personas_stream", "#/definitions/palace_enemies_stream", + "#/definitions/async_job_stream", ], "check": {"stream_names": ["party_members", "locations"]}, "concurrency_level": { @@ -586,27 +650,32 @@ def test_group_streams(): concurrent_streams = source._concurrent_streams synchronous_streams = source._synchronous_streams - # 2 incremental streams, 1 substream w/o incremental, 1 list based substream w/o incremental - assert len(concurrent_streams) == 4 - concurrent_stream_0, concurrent_stream_1, concurrent_stream_2, concurrent_stream_3 = ( - concurrent_streams - ) + # 1 full refresh stream, 2 incremental streams, 1 substream w/o incremental, 1 list based substream w/o incremental + assert len(concurrent_streams) == 5 + ( + concurrent_stream_0, + concurrent_stream_1, + concurrent_stream_2, + concurrent_stream_3, + concurrent_stream_4, + ) = concurrent_streams assert isinstance(concurrent_stream_0, DefaultStream) assert concurrent_stream_0.name == "party_members" assert isinstance(concurrent_stream_1, DefaultStream) - assert concurrent_stream_1.name == "locations" + assert concurrent_stream_1.name == "palaces" assert isinstance(concurrent_stream_2, DefaultStream) - assert concurrent_stream_2.name == "party_members_skills" + assert concurrent_stream_2.name == "locations" assert isinstance(concurrent_stream_3, DefaultStream) - assert concurrent_stream_3.name == "arcana_personas" + assert concurrent_stream_3.name == "party_members_skills" + assert isinstance(concurrent_stream_4, DefaultStream) + assert concurrent_stream_4.name == "arcana_personas" - # 1 full refresh stream, 1 substream w/ incremental + # 1 substream w/ incremental, 1 stream with async retriever assert len(synchronous_streams) == 2 - synchronous_stream_0, synchronous_stream_1 = synchronous_streams - assert isinstance(synchronous_stream_0, DeclarativeStream) - assert synchronous_stream_0.name == 
"palaces" - assert isinstance(synchronous_stream_1, DeclarativeStream) - assert synchronous_stream_1.name == "palace_enemies" + assert isinstance(synchronous_streams[0], DeclarativeStream) + assert synchronous_streams[0].name == "palace_enemies" + assert isinstance(synchronous_streams[1], DeclarativeStream) + assert synchronous_streams[1].name == "async_job_stream" @freezegun.freeze_time(time_to_freeze=datetime(2024, 9, 1, 0, 0, 0, 0, tzinfo=timezone.utc)) @@ -653,7 +722,7 @@ def test_create_concurrent_cursor(): assert party_members_cursor._lookback_window == timedelta(days=5) assert party_members_cursor._cursor_granularity == timedelta(days=1) - locations_stream = source._concurrent_streams[1] + locations_stream = source._concurrent_streams[2] assert isinstance(locations_stream, DefaultStream) locations_cursor = locations_stream.cursor @@ -722,14 +791,15 @@ def test_discover(): """ Verifies that the ConcurrentDeclarativeSource discover command returns concurrent and synchronous catalog definitions """ - expected_stream_names = [ + expected_stream_names = { "party_members", "palaces", "locations", "party_members_skills", "arcana_personas", "palace_enemies", - ] + "async_job_stream", + } source = ConcurrentDeclarativeSource( source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None @@ -737,13 +807,7 @@ def test_discover(): actual_catalog = source.discover(logger=source.logger, config=_CONFIG) - assert len(actual_catalog.streams) == 6 - assert actual_catalog.streams[0].name in expected_stream_names - assert actual_catalog.streams[1].name in expected_stream_names - assert actual_catalog.streams[2].name in expected_stream_names - assert actual_catalog.streams[3].name in expected_stream_names - assert actual_catalog.streams[4].name in expected_stream_names - assert actual_catalog.streams[5].name in expected_stream_names + assert set(map(lambda stream: stream.name, actual_catalog.streams)) == expected_stream_names def _mock_requests( @@ -1366,7 +1430,9 @@ def test_streams_with_stream_state_interpolation_should_be_synchronous(): state=None, ) - assert len(source._concurrent_streams) == 2 + # 1 full refresh stream, 2 with parent stream without incremental dependency + assert len(source._concurrent_streams) == 3 + # 2 incremental stream with interpolation on state (locations and party_members), 1 incremental with parent stream (palace_enemies), 1 stream with async retriever assert len(source._synchronous_streams) == 4 @@ -1662,6 +1728,6 @@ def get_states_for_stream( def disable_emitting_sequential_state_messages(source: ConcurrentDeclarativeSource) -> None: - for concurrent_streams in source._concurrent_streams: # type: ignore # This is the easiest way to disable behavior from the test - if isinstance(concurrent_streams.cursor, ConcurrentCursor): - concurrent_streams.cursor._connector_state_converter._is_sequential_state = False # type: ignore # see above + for concurrent_stream in source._concurrent_streams: # type: ignore # This is the easiest way to disable behavior from the test + if isinstance(concurrent_stream.cursor, ConcurrentCursor): + concurrent_stream.cursor._connector_state_converter._is_sequential_state = False # type: ignore # see above diff --git a/unit_tests/sources/streams/concurrent/test_helpers.py b/unit_tests/sources/streams/concurrent/test_helpers.py new file mode 100644 index 00000000..8f067eda --- /dev/null +++ b/unit_tests/sources/streams/concurrent/test_helpers.py @@ -0,0 +1,18 @@ +import pytest + +from airbyte_cdk.sources.streams.concurrent.helpers import 
get_primary_key_from_stream + + +def test_given_primary_key_is_list_of_strings_when_get_primary_key_from_stream_then_assume_it_is_composite_key_and_return_as_is(): + result = get_primary_key_from_stream(["composite_id_1", "composite_id_2"]) + assert result == ["composite_id_1", "composite_id_2"] + + +def test_given_primary_key_is_composite_in_nested_lists_when_get_primary_key_from_stream_then_flatten_lists(): + result = get_primary_key_from_stream([["composite_id_1"], ["composite_id_2"]]) + assert result == ["composite_id_1", "composite_id_2"] + + +def test_given_nested_key_when_get_primary_key_from_stream_then_raise_error(): + with pytest.raises(ValueError): + get_primary_key_from_stream([["composite_id_1", "composite_id_2"]])