Skip to content

Commit

Permalink
[realtime-cards] backend API Methods
Browse files Browse the repository at this point in the history
- also ran black on some files.
  • Loading branch information
valayDave committed Jan 8, 2024
1 parent 657e526 commit cea1296
Show file tree
Hide file tree
Showing 3 changed files with 222 additions and 18 deletions.
150 changes: 133 additions & 17 deletions services/ui_backend_service/api/card.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,22 @@
from services.data.db_utils import translate_run_key, translate_task_key
from services.ui_backend_service.data import unpack_processed_value
from services.utils import handle_exceptions
from .utils import format_response_list, get_pathspec_from_request, query_param_enabled, web_response, DBPagination, DBResponse
from .utils import (
format_response_list,
get_pathspec_from_request,
query_param_enabled,
web_response,
DBPagination,
DBResponse,
)
from aiohttp import web


class CardsApi(object):
def __init__(self, app, db, cache=None):
self.db = db
self.cache = getattr(cache, "artifact_cache", None)

app.router.add_route(
"GET",
"/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/cards",
Expand All @@ -20,10 +28,14 @@ def __init__(self, app, db, cache=None):
"/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/cards/{hash}",
self.get_card_content_by_hash,
)
app.router.add_route(
"GET",
"/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/cards/{hash}/data",
self.get_card_data_by_hash,
)

async def get_task_by_request(self, request):
flow_id, run_number, step_name, task_id, _ = \
get_pathspec_from_request(request)
async def get_task_by_request(self, request, with_status=False):
flow_id, run_number, step_name, task_id, _ = get_pathspec_from_request(request)

run_id_key, run_id_value = translate_run_key(run_number)
task_id_key, task_id_value = translate_task_key(task_id)
Expand All @@ -32,14 +44,15 @@ async def get_task_by_request(self, request):
"flow_id = %s",
"{run_id_key} = %s".format(run_id_key=run_id_key),
"step_name = %s",
"{task_id_key} = %s".format(task_id_key=task_id_key)
"{task_id_key} = %s".format(task_id_key=task_id_key),
]
values = [flow_id, run_id_value, step_name, task_id_value]
db_response, *_ = await self.db.task_table_postgres.find_records(
fetch_single=True,
conditions=conditions,
values=values,
expanded=True
expanded=True,
enable_joins=with_status,
)
if db_response.response_code == 200:
return db_response.body
Expand Down Expand Up @@ -77,7 +90,7 @@ async def get_cards_list_for_task(self, request):

task = await self.get_task_by_request(request)
if not task:
return web_response(404, {'data': []})
return web_response(404, {"data": []})

invalidate_cache = query_param_enabled(request, "invalidate")
cards = await get_cards_for_task(self.cache, task, invalidate_cache)
Expand All @@ -86,7 +99,7 @@ async def get_cards_list_for_task(self, request):
# Handle edge: Cache failed to return anything, even errors.
# NOTE: choice of status 200 here is quite arbitrary, as the cache returning None is usually
# caused by a premature request, and cards are not permanently missing.
return web_response(200, {'data': []})
return web_response(200, {"data": []})

card_hashes = [
{"id": data["id"], "hash": hash, "type": data["type"]}
Expand Down Expand Up @@ -131,22 +144,124 @@ async def get_card_content_by_hash(self, request):
"""

hash = request.match_info.get("hash")
task = await self.get_task_by_request(request)
task = await self.get_task_by_request(request, with_status=True)
if not task:
return web.Response(content_type="text/html", status=404, body="Task not found.")
return web.Response(
content_type="text/html", status=404, body="Task not found."
)
# Invalidate cache until the task completes because the card
# itself may keep getting updated
task_status = task.get("status")
task_has_not_finished = task_status in ["pending", "running", "unknown"]

cards = await get_cards_for_task(self.cache, task)
cards = await get_cards_for_task(
self.cache, task, invalidate_cache=task_has_not_finished
)

if cards is None:
return web.Response(content_type="text/html", status=404, body="Card not found for task. Possibly still being processed.")
return web.Response(
content_type="text/html",
status=404,
body="Card not found for task. Possibly still being processed.",
)

if cards and hash in cards:
return web.Response(content_type="text/html", body=cards[hash]["html"])
else:
return web.Response(content_type="text/html", status=404, body="Card not found for task.")
return web.Response(
content_type="text/html", status=404, body="Card not found for task."
)

@handle_exceptions
async def get_card_data_by_hash(self, request):
"""
---
description: Get the data of a card created for a task. Contains any additional updates needed by the card.
tags:
- Card
parameters:
- $ref: '#/definitions/Params/Path/flow_id'
- $ref: '#/definitions/Params/Path/run_number'
- $ref: '#/definitions/Params/Path/step_name'
- $ref: '#/definitions/Params/Path/task_id'
- $ref: '#/definitions/Params/Path/hash'
produces:
- application/json
responses:
"200":
description: Returns the data object created by the realtime card with the specific hash
"404":
description: Card data was not found.
"405":
description: invalid HTTP Method
schema:
$ref: '#/definitions/ResponsesError405'
"""

_hash = request.match_info.get("hash")
task = await self.get_task_by_request(request, with_status=True)
if not task:
return web.Response(
content_type="text/html", status=404, body="Task not found."
)
data = await get_card_data_for_task(self.cache, task, _hash)

if data is None:
return web_response(404, {"error": "Card data not found for task"})
else:
return web_response(200, data)


async def get_card_data_for_task(
cache_client, task, card_hash
) -> Optional[Dict[str, Dict]]:
"""
Return the card-data from the cache, or nothing.
async def get_cards_for_task(cache_client, task, invalidate_cache=False) -> Optional[Dict[str, Dict]]:
Example:
--------
{
"id": 1,
"hash": "abc123",
"data": {}
}
"""
pathspec = "{flow_id}/{run_id}/{step_name}/{task_id}".format(
flow_id=task.get("flow_id"),
run_id=task.get("run_id") or task.get("run_number"),
step_name=task.get("step_name"),
task_id=task.get("task_name") or task.get("task_id"),
)
pathspec_with_hash = "{pathspec}/{card_hash}".format(
pathspec=pathspec, card_hash=card_hash
)
# Invalidate cache until the task completes because the card data may keep getting updated
task_status = task.get("status")
task_has_not_finished = task_status in ["pending", "running", "unknown"]
res = await cache_client.cache.GetCardData(
pathspec, card_hash, task_has_not_finished
)

if res.has_pending_request():
async for event in res.stream():
if event["type"] == "error":
# raise error, there was an exception during fetching.
raise CardException(event["message"], event["id"], event["traceback"])
await res.wait() # wait until results are ready
data = res.get()
if data and pathspec_with_hash in data:
success, value, detail, trace = unpack_processed_value(data[pathspec_with_hash])
if success:
return {**value, "is_complete": not task_has_not_finished}
else:
raise CardException(detail, value, trace)
return None


async def get_cards_for_task(
cache_client, task, invalidate_cache=False
) -> Optional[Dict[str, Dict]]:
"""
Return a dictionary of cards from the cache, or nothing.
Expand Down Expand Up @@ -186,8 +301,7 @@ async def get_cards_for_task(cache_client, task, invalidate_cache=False) -> Opti


def get_pagination_params(request):
"""extract pagination params from request
"""
"""extract pagination params from request"""
# Page
page = max(int(request.query.get("_page", 1)), 1)

Expand All @@ -202,7 +316,9 @@ def get_pagination_params(request):


class CardException(Exception):
def __init__(self, msg='Failed to read card contents', id='card-error', traceback_str=None):
def __init__(
self, msg="Failed to read card contents", id="card-error", traceback_str=None
):
self.message = msg
self.id = id
self.traceback_str = traceback_str
Expand Down
87 changes: 87 additions & 0 deletions services/ui_backend_service/data/cache/get_card_data_action.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from typing import Callable, List

from services.ui_backend_service.data.cache.utils import streamed_errors

from .get_data_action import GetData

from metaflow import Task
from metaflow.cards import get_cards
import os


class GetCardData(GetData):
@classmethod
def format_request(cls, pathspec: str, card_hash: str, invalidate_cache=False):
"""
Cache Action to fetch Cards HTML content for a pathspec
Parameters
----------
pathspec : str
Task pathspec:
["FlowId/RunNumber/StepName/TaskId"]
card_hash : str
Card hash
"""
targets = "{}/{}".format(pathspec, card_hash)
# We set the targets to have the card hash so that we can find the card data for a particular hash
return super().format_request(
targets=[targets], invalidate_cache=invalidate_cache
)

@classmethod
def fetch_data(
cls, pathspec_with_card_hash: str, stream_output: Callable[[str], None]
):
"""
Fetch data using Metaflow Client.
Parameters
----------
pathspec_with_card_hash : str
Task pathspec with card hash
"FlowId/RunNumber/StepName/TaskId/CardHash"
stream_output : Callable[[object], None]
Stream output callable from execute() that accepts a JSON serializable object.
Used for generic messaging.
Errors can be streamed to cache client using `stream_output` in combination with
the error_event_msg helper. This way failures won't be cached for individual artifacts,
thus making it necessary to retry fetching during next attempt.
(Will add significant overhead/delay).
Stream error example:
stream_output(error_event_msg(str(ex), "s3-not-found", get_traceback_str()))
"""

def _card_item(card):
card_load_policy = os.environ.get("MF_CARD_LOAD_POLICY", "full")
card_data = None
if card_load_policy == "full":
card_data = card.get_data()
if card_data is None:
return None
return {"id": card.id, "data": card_data, "hash": card.hash}

try:
pathspec = "/".join(pathspec_with_card_hash.split("/")[:-1])
card_hash = pathspec_with_card_hash.split("/")[-1]
with streamed_errors(stream_output):
task = Task("{}".format(pathspec))
cards_datas = [
_card_item(card)
for card in get_cards(task)
if card.hash == card_hash
]
if len(cards_datas) == 0:
# This means there is no card with the given hash
return [True, None]
elif cards_datas[0] is None:
# This means the card data is not available
return [True, None]
except Exception:
# NOTE: return false in order not to cache this
# since parameters might be available later
return False

return [True, cards_datas[0]]
3 changes: 2 additions & 1 deletion services/ui_backend_service/data/cache/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from .get_parameters_action import GetParameters
from .get_task_action import GetTask
from .get_cards_action import GetCards
from .get_card_data_action import GetCardData

# Tagged logger
logger = logging.getLogger("CacheStore")
Expand Down Expand Up @@ -145,7 +146,7 @@ async def restart_requested(self):

async def start_cache(self):
"Initialize the CacheAsyncClient for artifact caching"
actions = [GetData, SearchArtifacts, GetTask, GetArtifacts, GetParameters, GetCards]
actions = [GetData, SearchArtifacts, GetTask, GetArtifacts, GetParameters, GetCards, GetCardData]
self.cache = CacheAsyncClient('cache_data/artifact_search',
actions,
max_size=CACHE_ARTIFACT_STORAGE_LIMIT,
Expand Down

0 comments on commit cea1296

Please sign in to comment.