From fb21cd407334c9e95fce843771690a46df4f9976 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Wed, 18 Dec 2024 16:22:53 +0200 Subject: [PATCH] added unit tests --- .../test_http_components_resolver.py | 217 +++++++++++++++++- 1 file changed, 216 insertions(+), 1 deletion(-) diff --git a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py index 0a9c12a5..5582d517 100644 --- a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py @@ -197,7 +197,131 @@ ], } - +_MANIFEST_WITH_HTTP_COMPONENT_RESOLVER_WITH_RETRIEVER_WITH_PARENT_STREAM = { + "version": "6.7.0", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["Rates"]}, + "dynamic_streams": [ + { + "type": "DynamicDeclarativeStream", + "stream_template": { + "type": "DeclarativeStream", + "name": "", + "primary_key": [], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "ABC": {"type": "number"}, + "AED": {"type": "number"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.test.com", + "path": "", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + }, + }, + "components_resolver": { + "type": "HttpComponentsResolver", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.test.com", + "path": "parent/{{ stream_partition.parent_id }}/items", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + "partition_router": { + "type": "SubstreamPartitionRouter", + "parent_stream_configs": [ + { + "type": "ParentStreamConfig", + "parent_key": "id", + "partition_field": "parent_id", + "stream": { + "type": "DeclarativeStream", + "name": "parent", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.test.com", + "path": "/parents", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "integer"} + }, + "type": "object", + }, + }, + } + } + ] + } + }, + "components_mapping": [ + { + "type": "ComponentMappingDefinition", + "field_path": ["name"], + "value": "parent_{{stream_slice['parent_id']}}_{{components_values['name']}}", + }, + { + "type": "ComponentMappingDefinition", + "field_path": [ + "retriever", + "requester", + "path", + ], + "value": "{{ stream_slice['parent_id'] }}/{{ components_values['id'] }}", + }, + ], + }, + } + ], +} @pytest.mark.parametrize( "components_mapping, retriever_data, stream_template_config, expected_result", [ @@ -234,6 +358,41 @@ def test_http_components_resolver( result = list(resolver.resolve_components(stream_template_config=stream_template_config)) assert result == expected_result +@pytest.mark.parametrize( + "components_mapping, retriever_data, stream_template_config, expected_result", + [ + ( + [ + ComponentMappingDefinition( + field_path=[InterpolatedString.create("path", parameters={})], + value="{{stream_slice['parent_id']}}/{{components_values['id']}}", + value_type=str, + parameters={}, + ) + ], + [{"id": "1", "field1": "data1"}, {"id": "2", "field1": "data2"}], + {"path": None}, + [{"path": "1/1"}, {"path": "1/2"}, {"path": "2/1"}, {"path": "2/2"}], + ) + ], +) +def test_http_components_resolver_with_stream_slices( + components_mapping, retriever_data, stream_template_config, expected_result +): + mock_retriever = MagicMock() + mock_retriever.read_records.return_value = retriever_data + mock_retriever.stream_slices.return_value = [{"parent_id": 1}, {"parent_id": 2}] + config = {} + + resolver = HttpComponentsResolver( + retriever=mock_retriever, + config=config, + components_mapping=components_mapping, + parameters={}, + ) + + result = list(resolver.resolve_components(stream_template_config=stream_template_config)) + assert result == expected_result def test_dynamic_streams_read_with_http_components_resolver(): expected_stream_names = ["item_1", "item_2"] @@ -306,3 +465,59 @@ def test_duplicated_dynamic_streams_read_with_http_components_resolver(): str(exc_info.value) == "Dynamic streams list contains a duplicate name: item_2. Please contact Airbyte Support." ) + +def test_dynamic_streams_with_http_components_resolver_retriever_with_parent_stream(): + expected_stream_names = [ + "parent_1_item_1", "parent_1_item_2", "parent_2_item_1", "parent_2_item_2" + ] + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/parents"), + HttpResponse( + body=json.dumps([{"id": 1}, {"id": 2}]) + ), + ) + parent_ids = [1, 2] + for parent_id in parent_ids: + http_mocker.get( + HttpRequest(url=f"https://api.test.com/parent/{parent_id}/items"), + HttpResponse( + body=json.dumps( + [ + {"id": 1, "name": "item_1"}, + {"id": 2, "name": "item_2"}, + ] + ) + ), + ) + dynamic_stream_paths = ["1/1", "2/1", "1/2", "2/2"] + for dynamic_stream_path in dynamic_stream_paths: + http_mocker.get( + HttpRequest(url=f"https://api.test.com/{dynamic_stream_path}"), + HttpResponse( + body=json.dumps([{"ABC": 1, "AED": 2}]) + ), + ) + + source = ConcurrentDeclarativeSource( + source_config=_MANIFEST_WITH_HTTP_COMPONENT_RESOLVER_WITH_RETRIEVER_WITH_PARENT_STREAM, config=_CONFIG, catalog=None, state=None + ) + + actual_catalog = source.discover(logger=source.logger, config=_CONFIG) + + configured_streams = [ + to_configured_stream(stream, primary_key=stream.source_defined_primary_key) + for stream in actual_catalog.streams + ] + configured_catalog = to_configured_catalog(configured_streams) + + records = [ + message.record + for message in source.read(MagicMock(), _CONFIG, configured_catalog) + if message.type == Type.RECORD + ] + + assert len(actual_catalog.streams) == 4 + assert [stream.name for stream in actual_catalog.streams] == expected_stream_names + assert len(records) == 4 + assert [record.stream for record in records] == expected_stream_names \ No newline at end of file