Ability to retrieve paginated analysis results (#198)
* Ability to retrieve paginated analysis results

* Review comments

Co-Authored-By: Mohsen Nosratinia <mohsen.nosratinia@vikinganalytics.se>

* Bump API version

---------

Co-authored-by: Mohsen Nosratinia <mohsen.nosratinia@vikinganalytics.se>
vnadhan and tuix authored Feb 10, 2023
1 parent 8c46cc5 commit 1b493a2
Showing 4 changed files with 151 additions and 7 deletions.
33 changes: 27 additions & 6 deletions mvg/mvg.py
@@ -20,7 +20,11 @@
 import semver

 from mvg.exceptions import MVGConnectionError
-from mvg.utils.response_processing import SortOrder, get_paginated_items
+from mvg.utils.response_processing import (
+    SortOrder,
+    get_paginated_analysis_results,
+    get_paginated_items,
+)
 from mvg.http_client import HTTPClient

 logger = logging.getLogger(__name__)
@@ -57,8 +61,8 @@ def __init__(self, endpoint: str, token: str):
         self.endpoint = endpoint
         self.token = token

-        self.mvg_version = self.parse_version("v0.14.3")
-        self.tested_api_version = self.parse_version("v0.5.2")
+        self.mvg_version = self.parse_version("v0.14.4")
+        self.tested_api_version = self.parse_version("v0.5.3")

         # Get API version
         try:
@@ -1040,14 +1044,25 @@ def get_analysis_status(self, request_id: str) -> str:

         return response.json()["request_status"]

-    def get_analysis_results(self, request_id: str) -> dict:
+    def get_analysis_results(
+        self,
+        request_id: str,
+        offset: int = None,
+        limit: int = None,
+    ) -> dict:
         """Retrieves an analysis with given request_id
         The format of the result structure depends on the feature.

         Parameters
         ----------
         request_id : str
             request_id (analysis identifier)
+        offset: int
+            zero-based index of an item in one of the dictionaries
+            in the data for "results" key [optional].
+        limit: int
+            maximum number of items to be returned from each
+            dictionary in the data for "results" key [optional].

         Returns
         -------
@@ -1058,9 +1073,15 @@ def get_analysis_results(self, request_id: str) -> dict:
         logger.info("endpoint %s", self.endpoint)
         logger.info("get analysis results with request_id=%s", request_id)

-        response = self._request("get", f"/analyses/requests/{request_id}/results")
+        url = f"/analyses/requests/{request_id}/results"
+        params = {}
+        if offset is not None:
+            params["offset"] = offset
+        if limit is not None:
+            params["limit"] = limit

-        return response.json()
+        response = get_paginated_analysis_results(self._request, url, params)
+        return response

     def delete_analysis(self, request_id: str):
         """Deletes an analysis.
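
For orientation, a minimal usage sketch of the new offset/limit parameters. The endpoint, token, and request ID below are placeholders, not values from this commit; the constructor and method signatures are taken from the diff above.

    from mvg import MVG

    # Placeholder credentials: substitute a real endpoint and token
    session = MVG(endpoint="https://example.invalid/v1", token="my-token")

    # No offset/limit: the client stitches all result pages together
    full = session.get_analysis_results(request_id="my-request-id")

    # Only the first 100 items of each paginated field under "results"
    head = session.get_analysis_results(request_id="my-request-id", limit=100)

    # Everything from item 100 onwards
    tail = session.get_analysis_results(request_id="my-request-id", offset=100)
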
53 changes: 53 additions & 0 deletions mvg/utils/response_processing.py
@@ -55,3 +55,56 @@ def get_paginated_items(request: Callable, url: str, params: Dict) -> Dict:
         response = request("get", url, params=params)
         all_items += response.json()["items"]
     return {"items": all_items, "total": num_items}
+
+
+def get_paginated_analysis_results(request: Callable, url: str, params: Dict) -> Dict:
+    """
+    Iteratively construct the results of an analysis, making use of the
+    pagination parameters provided in the params dictionary.
+
+    Parameters
+    ----------
+    request : Callable
+        the request object
+    url : str
+        the URL to retrieve the analysis results
+    params : Dict
+        parameters dict for the request object
+
+    Returns
+    -------
+    Dict
+        Analysis results
+    """
+    response = request("get", url, params=params).json()
+
+    if "limit" not in params and "offset" not in params:
+        # User has not requested a subset of results
+        results = response["results"]
+
+        # Pagination Model fields
+        paginator_model_fields = ["total", "limit", "offset"]
+
+        # Do the analysis results include pagination?
+        # We remove this check when the backend has enabled pagination for
+        # the results of all analyses
+        is_paginated = all(f in results for f in paginator_model_fields)
+        if is_paginated:
+            # Fields with paginated data
+            paginated_fields = ["timestamps", "labels", "uncertain", "mode_probability"]
+
+            num_items = results["total"]
+            limit = results["limit"]
+            num_reqs = (num_items - 1) // limit
+            for idx in range(1, num_reqs + 1):
+                offset = idx * limit
+                params["offset"] = offset
+                _results = request("get", url, params=params).json()["results"]
+                for key in _results:
+                    if key in paginated_fields:
+                        results[key] += _results[key]
+
+        # Construct the response to return
+        response["results"] = results
+
+    return response
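
To make the stitching loop above concrete, here is the offset arithmetic worked through with hypothetical numbers (250 items, pages of 100):

    # First response reports total=250 and limit=100
    total, limit = 250, 100
    num_reqs = (total - 1) // limit  # 2 follow-up requests after the initial one
    offsets = [idx * limit for idx in range(1, num_reqs + 1)]
    assert offsets == [100, 200]  # pages covering items 100-199 and 200-249
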
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -210,7 +210,7 @@ def fixture(session):
             source_id, meta={"type": "pump"}, channels=list(pattern.keys())
         )
         upload_measurements(session, source_id, data)
-        yield source_id, timestamps
+        yield source_id, {"timestamps": timestamps, "pattern": pattern}
     finally:
         session.delete_source(source_id)

70 changes: 70 additions & 0 deletions tests/test_api_analyses.py
@@ -189,3 +189,73 @@ def test_delete_analysis(session, waveform_source_multiaxial_001):
     with pytest.raises(MVGAPIError) as exc:
         session.get_analysis_results(request_id)
     assert f"Request with ID {request_id} does not exist" in str(exc)
+
+
+def test_get_analysis_results_modeid_paginated(
+    session: MVG, waveform_source_multiaxial_001
+):
+    source_id, source_info = waveform_source_multiaxial_001
+    timestamps = source_info["timestamps"]
+    pattern = source_info["pattern"]["acc_x"]
+
+    # ModeId
+    parameters = {"n_trials": 1}
+    request = session.request_analysis(source_id, "ModeId", parameters=parameters)
+    request_id = request["request_id"]
+    response = session.get_analysis_results(request_id)
+    assert response["results"] == {}
+    session.wait_for_analyses([request_id])
+
+    # Request for entire results
+    response = session.get_analysis_results(request_id)
+    assert timestamps == response["results"]["timestamps"]
+    assert pattern == response["results"]["labels"]
+
+    # Request for a subset of results with a limit
+    result_size = 100
+    response = session.get_analysis_results(request_id, limit=result_size)
+    assert timestamps[:result_size] == response["results"]["timestamps"]
+    assert pattern[:result_size] == response["results"]["labels"]
+
+    # Request for a subset of results with a limit and an offset
+    offset = 10
+    result_size = 100
+    response = session.get_analysis_results(
+        request_id, offset=offset, limit=result_size
+    )
+    assert timestamps[offset:result_size] == response["results"]["timestamps"]
+    assert pattern[offset:result_size] == response["results"]["labels"]
+
+    # Request for a subset of results with an offset
+    offset = 10
+    response = session.get_analysis_results(request_id, offset=offset)
+    assert timestamps[offset:] == response["results"]["timestamps"]
+    assert pattern[offset:] == response["results"]["labels"]
+
+    # Delete analysis
+    session.delete_analysis(request_id)
+
+
+def test_get_analysis_results_kpidemo_paginated(
+    session: MVG, waveform_source_multiaxial_001
+):
+    source_id, source_info = waveform_source_multiaxial_001
+    timestamps = source_info["timestamps"]
+
+    request = session.request_analysis(source_id, "KPIDemo")
+    request_id = request["request_id"]
+    response = session.get_analysis_results(request_id)
+    assert response["results"] == {}
+    session.wait_for_analyses([request_id])
+
+    # Request for entire results
+    response = session.get_analysis_results(request_id)
+    assert timestamps == response["results"]["timestamps"]
+
+    # Request for a subset of results, but still returns entire results
+    result_size = 10
+    response = session.get_analysis_results(request_id, limit=result_size)
+    assert timestamps == response["results"]["timestamps"]
+
+    # Delete analysis
+    session.delete_analysis(request_id)
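
As the KPIDemo test above notes, the backend does not yet paginate every analysis type, which is why the helper checks for the pagination marker fields before stitching. A hedged sketch of the same check from the caller's side; results_are_paginated is an illustrative name, not part of the library:

    def results_are_paginated(response: dict) -> bool:
        # The backend marks paginated results with these three fields
        pagination_fields = ("total", "limit", "offset")
        return all(f in response.get("results", {}) for f in pagination_fields)
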
