Skip to content

Commit

Permalink
added unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
darynaishchenko committed Dec 18, 2024
1 parent 18f948d commit fb21cd4
Showing 1 changed file with 216 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,131 @@
],
}


_MANIFEST_WITH_HTTP_COMPONENT_RESOLVER_WITH_RETRIEVER_WITH_PARENT_STREAM = {
"version": "6.7.0",
"type": "DeclarativeSource",
"check": {"type": "CheckStream", "stream_names": ["Rates"]},
"dynamic_streams": [
{
"type": "DynamicDeclarativeStream",
"stream_template": {
"type": "DeclarativeStream",
"name": "",
"primary_key": [],
"schema_loader": {
"type": "InlineSchemaLoader",
"schema": {
"$schema": "http://json-schema.org/schema#",
"properties": {
"ABC": {"type": "number"},
"AED": {"type": "number"},
},
"type": "object",
},
},
"retriever": {
"type": "SimpleRetriever",
"requester": {
"type": "HttpRequester",
"url_base": "https://api.test.com",
"path": "",
"http_method": "GET",
"authenticator": {
"type": "ApiKeyAuthenticator",
"header": "apikey",
"api_token": "{{ config['api_key'] }}",
},
},
"record_selector": {
"type": "RecordSelector",
"extractor": {"type": "DpathExtractor", "field_path": []},
},
"paginator": {"type": "NoPagination"},
},
},
"components_resolver": {
"type": "HttpComponentsResolver",
"retriever": {
"type": "SimpleRetriever",
"requester": {
"type": "HttpRequester",
"url_base": "https://api.test.com",
"path": "parent/{{ stream_partition.parent_id }}/items",
"http_method": "GET",
"authenticator": {
"type": "ApiKeyAuthenticator",
"header": "apikey",
"api_token": "{{ config['api_key'] }}",
},
},
"record_selector": {
"type": "RecordSelector",
"extractor": {"type": "DpathExtractor", "field_path": []},
},
"paginator": {"type": "NoPagination"},
"partition_router": {
"type": "SubstreamPartitionRouter",
"parent_stream_configs": [
{
"type": "ParentStreamConfig",
"parent_key": "id",
"partition_field": "parent_id",
"stream": {
"type": "DeclarativeStream",
"name": "parent",
"retriever": {
"type": "SimpleRetriever",
"requester": {
"type": "HttpRequester",
"url_base": "https://api.test.com",
"path": "/parents",
"http_method": "GET",
"authenticator": {
"type": "ApiKeyAuthenticator",
"header": "apikey",
"api_token": "{{ config['api_key'] }}",
},
},
"record_selector": {
"type": "RecordSelector",
"extractor": {"type": "DpathExtractor", "field_path": []},
},
},
"schema_loader": {
"type": "InlineSchemaLoader",
"schema": {
"$schema": "http://json-schema.org/schema#",
"properties": {
"id": {"type": "integer"}
},
"type": "object",
},
},
}
}
]
}
},
"components_mapping": [
{
"type": "ComponentMappingDefinition",
"field_path": ["name"],
"value": "parent_{{stream_slice['parent_id']}}_{{components_values['name']}}",
},
{
"type": "ComponentMappingDefinition",
"field_path": [
"retriever",
"requester",
"path",
],
"value": "{{ stream_slice['parent_id'] }}/{{ components_values['id'] }}",
},
],
},
}
],
}
@pytest.mark.parametrize(
"components_mapping, retriever_data, stream_template_config, expected_result",
[
Expand Down Expand Up @@ -234,6 +358,41 @@ def test_http_components_resolver(
result = list(resolver.resolve_components(stream_template_config=stream_template_config))
assert result == expected_result

@pytest.mark.parametrize(
"components_mapping, retriever_data, stream_template_config, expected_result",
[
(
[
ComponentMappingDefinition(
field_path=[InterpolatedString.create("path", parameters={})],
value="{{stream_slice['parent_id']}}/{{components_values['id']}}",
value_type=str,
parameters={},
)
],
[{"id": "1", "field1": "data1"}, {"id": "2", "field1": "data2"}],
{"path": None},
[{"path": "1/1"}, {"path": "1/2"}, {"path": "2/1"}, {"path": "2/2"}],
)
],
)
def test_http_components_resolver_with_stream_slices(
components_mapping, retriever_data, stream_template_config, expected_result
):
mock_retriever = MagicMock()
mock_retriever.read_records.return_value = retriever_data
mock_retriever.stream_slices.return_value = [{"parent_id": 1}, {"parent_id": 2}]
config = {}

resolver = HttpComponentsResolver(
retriever=mock_retriever,
config=config,
components_mapping=components_mapping,
parameters={},
)

result = list(resolver.resolve_components(stream_template_config=stream_template_config))
assert result == expected_result

def test_dynamic_streams_read_with_http_components_resolver():
expected_stream_names = ["item_1", "item_2"]
Expand Down Expand Up @@ -306,3 +465,59 @@ def test_duplicated_dynamic_streams_read_with_http_components_resolver():
str(exc_info.value)
== "Dynamic streams list contains a duplicate name: item_2. Please contact Airbyte Support."
)

def test_dynamic_streams_with_http_components_resolver_retriever_with_parent_stream():
expected_stream_names = [
"parent_1_item_1", "parent_1_item_2", "parent_2_item_1", "parent_2_item_2"
]
with HttpMocker() as http_mocker:
http_mocker.get(
HttpRequest(url="https://api.test.com/parents"),
HttpResponse(
body=json.dumps([{"id": 1}, {"id": 2}])
),
)
parent_ids = [1, 2]
for parent_id in parent_ids:
http_mocker.get(
HttpRequest(url=f"https://api.test.com/parent/{parent_id}/items"),
HttpResponse(
body=json.dumps(
[
{"id": 1, "name": "item_1"},
{"id": 2, "name": "item_2"},
]
)
),
)
dynamic_stream_paths = ["1/1", "2/1", "1/2", "2/2"]
for dynamic_stream_path in dynamic_stream_paths:
http_mocker.get(
HttpRequest(url=f"https://api.test.com/{dynamic_stream_path}"),
HttpResponse(
body=json.dumps([{"ABC": 1, "AED": 2}])
),
)

source = ConcurrentDeclarativeSource(
source_config=_MANIFEST_WITH_HTTP_COMPONENT_RESOLVER_WITH_RETRIEVER_WITH_PARENT_STREAM, config=_CONFIG, catalog=None, state=None
)

actual_catalog = source.discover(logger=source.logger, config=_CONFIG)

configured_streams = [
to_configured_stream(stream, primary_key=stream.source_defined_primary_key)
for stream in actual_catalog.streams
]
configured_catalog = to_configured_catalog(configured_streams)

records = [
message.record
for message in source.read(MagicMock(), _CONFIG, configured_catalog)
if message.type == Type.RECORD
]

assert len(actual_catalog.streams) == 4
assert [stream.name for stream in actual_catalog.streams] == expected_stream_names
assert len(records) == 4
assert [record.stream for record in records] == expected_stream_names

0 comments on commit fb21cd4

Please sign in to comment.