diff --git a/services/ui_backend_service/api/card.py b/services/ui_backend_service/api/card.py index 120652c8..3a056ee4 100644 --- a/services/ui_backend_service/api/card.py +++ b/services/ui_backend_service/api/card.py @@ -2,7 +2,14 @@ from services.data.db_utils import translate_run_key, translate_task_key from services.ui_backend_service.data import unpack_processed_value from services.utils import handle_exceptions -from .utils import format_response_list, get_pathspec_from_request, query_param_enabled, web_response, DBPagination, DBResponse +from .utils import ( + format_response_list, + get_pathspec_from_request, + query_param_enabled, + web_response, + DBPagination, + DBResponse, +) from aiohttp import web @@ -10,6 +17,7 @@ class CardsApi(object): def __init__(self, app, db, cache=None): self.db = db self.cache = getattr(cache, "artifact_cache", None) + app.router.add_route( "GET", "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/cards", @@ -20,10 +28,14 @@ def __init__(self, app, db, cache=None): "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/cards/{hash}", self.get_card_content_by_hash, ) + app.router.add_route( + "GET", + "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/cards/{hash}/data", + self.get_card_data_by_hash, + ) - async def get_task_by_request(self, request): - flow_id, run_number, step_name, task_id, _ = \ - get_pathspec_from_request(request) + async def get_task_by_request(self, request, with_status=False): + flow_id, run_number, step_name, task_id, _ = get_pathspec_from_request(request) run_id_key, run_id_value = translate_run_key(run_number) task_id_key, task_id_value = translate_task_key(task_id) @@ -32,14 +44,15 @@ async def get_task_by_request(self, request): "flow_id = %s", "{run_id_key} = %s".format(run_id_key=run_id_key), "step_name = %s", - "{task_id_key} = %s".format(task_id_key=task_id_key) + "{task_id_key} = %s".format(task_id_key=task_id_key), ] values = [flow_id, run_id_value, step_name, task_id_value] db_response, *_ = await self.db.task_table_postgres.find_records( fetch_single=True, conditions=conditions, values=values, - expanded=True + expanded=True, + enable_joins=with_status, ) if db_response.response_code == 200: return db_response.body @@ -77,7 +90,7 @@ async def get_cards_list_for_task(self, request): task = await self.get_task_by_request(request) if not task: - return web_response(404, {'data': []}) + return web_response(404, {"data": []}) invalidate_cache = query_param_enabled(request, "invalidate") cards = await get_cards_for_task(self.cache, task, invalidate_cache) @@ -86,7 +99,7 @@ async def get_cards_list_for_task(self, request): # Handle edge: Cache failed to return anything, even errors. # NOTE: choice of status 200 here is quite arbitrary, as the cache returning None is usually # caused by a premature request, and cards are not permanently missing. - return web_response(200, {'data': []}) + return web_response(200, {"data": []}) card_hashes = [ {"id": data["id"], "hash": hash, "type": data["type"]} @@ -131,22 +144,125 @@ async def get_card_content_by_hash(self, request): """ hash = request.match_info.get("hash") - task = await self.get_task_by_request(request) + task = await self.get_task_by_request(request, with_status=True) if not task: - return web.Response(content_type="text/html", status=404, body="Task not found.") + return web.Response( + content_type="text/html", status=404, body="Task not found." + ) + invalidate_cache = query_param_enabled(request, "invalidate") + # Invalidate cache until the task completes because the card + # itself may keep getting updated + task_status = task.get("status") + task_has_not_finished = task_status in ["pending", "running", "unknown"] - cards = await get_cards_for_task(self.cache, task) + cards = await get_cards_for_task( + self.cache, task, invalidate_cache=task_has_not_finished or invalidate_cache + ) if cards is None: - return web.Response(content_type="text/html", status=404, body="Card not found for task. Possibly still being processed.") + return web.Response( + content_type="text/html", + status=404, + body="Card not found for task. Possibly still being processed.", + ) if cards and hash in cards: return web.Response(content_type="text/html", body=cards[hash]["html"]) else: - return web.Response(content_type="text/html", status=404, body="Card not found for task.") + return web.Response( + content_type="text/html", status=404, body="Card not found for task." + ) + + @handle_exceptions + async def get_card_data_by_hash(self, request): + """ + --- + description: Get the data of a card created for a task. Contains any additional updates needed by the card. + tags: + - Card + parameters: + - $ref: '#/definitions/Params/Path/flow_id' + - $ref: '#/definitions/Params/Path/run_number' + - $ref: '#/definitions/Params/Path/step_name' + - $ref: '#/definitions/Params/Path/task_id' + - $ref: '#/definitions/Params/Path/hash' + + produces: + - application/json + responses: + "200": + description: Returns the data object created by the realtime card with the specific hash + "404": + description: Card data was not found. + "405": + description: invalid HTTP Method + schema: + $ref: '#/definitions/ResponsesError405' + """ + + _hash = request.match_info.get("hash") + task = await self.get_task_by_request(request, with_status=True) + if not task: + return web.Response( + content_type="text/html", status=404, body="Task not found." + ) + data = await get_card_data_for_task(self.cache, task, _hash) + if data is None: + return web_response(404, {"error": "Card data not found for task"}) + else: + return web_response(200, data) -async def get_cards_for_task(cache_client, task, invalidate_cache=False) -> Optional[Dict[str, Dict]]: + +async def get_card_data_for_task( + cache_client, task, card_hash +) -> Optional[Dict[str, Dict]]: + """ + Return the card-data from the cache, or nothing. + + Example: + -------- + { + "id": 1, + "hash": "abc123", + "data": {} + } + """ + pathspec = "{flow_id}/{run_id}/{step_name}/{task_id}".format( + flow_id=task.get("flow_id"), + run_id=task.get("run_id") or task.get("run_number"), + step_name=task.get("step_name"), + task_id=task.get("task_name") or task.get("task_id"), + ) + pathspec_with_hash = "{pathspec}/{card_hash}".format( + pathspec=pathspec, card_hash=card_hash + ) + # Invalidate cache until the task completes because the card data may keep getting updated + task_status = task.get("status") + task_has_not_finished = task_status in ["pending", "running", "unknown"] + res = await cache_client.cache.GetCardData( + pathspec, card_hash, task_has_not_finished + ) + + if res.has_pending_request(): + async for event in res.stream(): + if event["type"] == "error": + # raise error, there was an exception during fetching. + raise CardException(event["message"], event["id"], event["traceback"]) + await res.wait() # wait until results are ready + data = res.get() + if data and pathspec_with_hash in data: + success, value, detail, trace = unpack_processed_value(data[pathspec_with_hash]) + if success: + return {**value, "is_complete": not task_has_not_finished} + else: + raise CardException(detail, value, trace) + return None + + +async def get_cards_for_task( + cache_client, task, invalidate_cache=False +) -> Optional[Dict[str, Dict]]: """ Return a dictionary of cards from the cache, or nothing. @@ -186,8 +302,7 @@ async def get_cards_for_task(cache_client, task, invalidate_cache=False) -> Opti def get_pagination_params(request): - """extract pagination params from request - """ + """extract pagination params from request""" # Page page = max(int(request.query.get("_page", 1)), 1) @@ -202,7 +317,9 @@ def get_pagination_params(request): class CardException(Exception): - def __init__(self, msg='Failed to read card contents', id='card-error', traceback_str=None): + def __init__( + self, msg="Failed to read card contents", id="card-error", traceback_str=None + ): self.message = msg self.id = id self.traceback_str = traceback_str diff --git a/services/ui_backend_service/data/cache/get_card_data_action.py b/services/ui_backend_service/data/cache/get_card_data_action.py new file mode 100644 index 00000000..8044cae5 --- /dev/null +++ b/services/ui_backend_service/data/cache/get_card_data_action.py @@ -0,0 +1,87 @@ +from typing import Callable, List + +from services.ui_backend_service.data.cache.utils import streamed_errors + +from .get_data_action import GetData + +from metaflow import Task +from metaflow.cards import get_cards +import os + + +class GetCardData(GetData): + @classmethod + def format_request(cls, pathspec: str, card_hash: str, invalidate_cache=False): + """ + Cache Action to fetch Cards HTML content for a pathspec + + Parameters + ---------- + pathspec : str + Task pathspec: + ["FlowId/RunNumber/StepName/TaskId"] + card_hash : str + Card hash + """ + targets = "{}/{}".format(pathspec, card_hash) + # We set the targets to have the card hash so that we can find the card data for a particular hash + return super().format_request( + targets=[targets], invalidate_cache=invalidate_cache + ) + + @classmethod + def fetch_data( + cls, pathspec_with_card_hash: str, stream_output: Callable[[str], None] + ): + """ + Fetch data using Metaflow Client. + + Parameters + ---------- + pathspec_with_card_hash : str + Task pathspec with card hash + "FlowId/RunNumber/StepName/TaskId/CardHash" + stream_output : Callable[[object], None] + Stream output callable from execute() that accepts a JSON serializable object. + Used for generic messaging. + + Errors can be streamed to cache client using `stream_output` in combination with + the error_event_msg helper. This way failures won't be cached for individual artifacts, + thus making it necessary to retry fetching during next attempt. + (Will add significant overhead/delay). + + Stream error example: + stream_output(error_event_msg(str(ex), "s3-not-found", get_traceback_str())) + """ + + def _card_item(card): + card_load_policy = os.environ.get("MF_CARD_LOAD_POLICY", "full") + card_data = None + if card_load_policy == "full": + card_data = card.get_data() + if card_data is None: + return None + return {"id": card.id, "data": card_data, "hash": card.hash} + + try: + pathspec = "/".join(pathspec_with_card_hash.split("/")[:-1]) + card_hash = pathspec_with_card_hash.split("/")[-1] + with streamed_errors(stream_output): + task = Task("{}".format(pathspec)) + cards_datas = [ + _card_item(card) + for card in get_cards(task) + if card.hash == card_hash + ] + if len(cards_datas) == 0: + # This means there is no card with the given hash + return [True, None] + elif cards_datas[0] is None: + # This means the card data is not available + return [True, None] + except Exception: + # NOTE: return false in order not to cache this + # since parameters might be available later + return False + + return [True, cards_datas[0]] diff --git a/services/ui_backend_service/data/cache/store.py b/services/ui_backend_service/data/cache/store.py index 722f7162..cc201d10 100644 --- a/services/ui_backend_service/data/cache/store.py +++ b/services/ui_backend_service/data/cache/store.py @@ -21,6 +21,7 @@ from .get_parameters_action import GetParameters from .get_task_action import GetTask from .get_cards_action import GetCards +from .get_card_data_action import GetCardData # Tagged logger logger = logging.getLogger("CacheStore") @@ -145,7 +146,7 @@ async def restart_requested(self): async def start_cache(self): "Initialize the CacheAsyncClient for artifact caching" - actions = [GetData, SearchArtifacts, GetTask, GetArtifacts, GetParameters, GetCards] + actions = [GetData, SearchArtifacts, GetTask, GetArtifacts, GetParameters, GetCards, GetCardData] self.cache = CacheAsyncClient('cache_data/artifact_search', actions, max_size=CACHE_ARTIFACT_STORAGE_LIMIT, diff --git a/services/ui_backend_service/tests/integration_tests/card_test.py b/services/ui_backend_service/tests/integration_tests/card_test.py index 9534b302..57116bdb 100644 --- a/services/ui_backend_service/tests/integration_tests/card_test.py +++ b/services/ui_backend_service/tests/integration_tests/card_test.py @@ -30,7 +30,6 @@ async def test_card_not_returned(cli, db): run_id=_step.get("run_id"))).body async def get_cards_for_task(cache_client, task, invalidate_cache=False): - assert invalidate_cache is False return None with mock.patch("services.ui_backend_service.api.card.get_cards_for_task", new=get_cards_for_task):