From 8188cbf9e0ea849cf9ba403fd007d071e50e9410 Mon Sep 17 00:00:00 2001 From: Chris Bizon Date: Thu, 11 Apr 2024 18:16:18 -0400 Subject: [PATCH] pushed bypass_cache to strider --- src/service_aggregator.py | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/src/service_aggregator.py b/src/service_aggregator.py index 329153a..8241344 100644 --- a/src/service_aggregator.py +++ b/src/service_aggregator.py @@ -116,12 +116,17 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int): print(e) return None, 500 + + #we grab this stuff here so we can get it into lookup + override_cache = message.get("bypass_cache", False) + override_cache = override_cache if type(override_cache) is bool else False + # A map from operations advertised in our x-trapi to functions # This is to functions rather than e.g. service urls because we may combine multiple calls into one op. # e.g. our score operation will include both weighting and scoring for now. # Also gives us a place to handle function specific logic known_operations = { - "lookup": partial(lookup, caller=caller, infer=infer, answer_qnode=answer_qnode, question_qnode=question_qnode), + "lookup": partial(lookup, caller=caller, infer=infer, answer_qnode=answer_qnode, question_qnode=question_qnode, override_cache=override_cache), "enrich_results": partial(answercoalesce, coalesce_type=coalesce_type), "overlay_connect_knodes": omnicorp, "score": score, @@ -170,8 +175,6 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int): except KeyError: return f"No query graph", 422 results_cache = ResultsCache() - override_cache = message.get("bypass_cache", False) - override_cache = override_cache if type(override_cache) is bool else False results = None if infer: # We're going to cache infer queries, and we need to do that even if we're overriding the cache @@ -223,7 +226,7 @@ def is_end_message(message): return False -async def post_with_callback(host_url, query, guid, params={}): +async def post_with_callback(host_url, query, guid, params={}, override_cache=False): """ Post an asynchronous message. @@ -254,9 +257,11 @@ async def post_with_callback(host_url, query, guid, params={}): # make sure there is a place for the trapi log messages if "logs" not in query: query["logs"] = [] + query["bypass_cache"] = override_cache else: for qname, individual_query in query.items(): individual_query["callback"] = callback_url + individual_query["bypass_cache"] = override_cache if "logs" not in individual_query: individual_query["logs"] = [] num_queries = len(query) @@ -627,7 +632,7 @@ async def subservice_post(name, url, message, guid, asyncquery=False, params={}) return ret_val, status_code -async def lookup(message, params, guid, infer=False, caller="ARAGORN", answer_qnode=None, question_qnode=None) -> (dict, int): +async def lookup(message, params, guid, infer=False, caller="ARAGORN", answer_qnode=None, question_qnode=None, override_cache=False) -> (dict, int): """ Performs lookup, parameterized by ARAGORN/ROBOKOP and whether the query is an infer type query @@ -639,7 +644,7 @@ async def lookup(message, params, guid, infer=False, caller="ARAGORN", answer_qn """ if caller == "ARAGORN": - return await aragorn_lookup(message, params, guid, infer, answer_qnode) + return await aragorn_lookup(message, params, guid, infer, answer_qnode, override_cache) elif caller == "ROBOKOP": robo_results, robo_status = await robokop_lookup(message, params, guid, infer, question_qnode, answer_qnode) return await add_provenance(robo_results), robo_status @@ -674,12 +679,12 @@ async def de_noneify(message): await de_noneify(item) -async def aragorn_lookup(input_message, params, guid, infer, answer_qnode): +async def aragorn_lookup(input_message, params, guid, infer, answer_qnode, override_cache): timeout_seconds = (input_message.get("parameters") or {}).get("timeout_seconds") if timeout_seconds: params["timeout_seconds"] = timeout_seconds if type(timeout_seconds) is int else 3 * 60 if not infer: - return await strider(input_message, params, guid) + return await strider(input_message, params, guid, override_cache) # Now it's an infer query. messages = expand_query(input_message, params, guid) lookup_query_graph = messages[0]["message"]["query_graph"] @@ -696,7 +701,7 @@ async def aragorn_lookup(input_message, params, guid, infer, answer_qnode): num += 1 message[f"query_{num}"] = q logger.info(f"Sending {len(message)} messages to strider") - batch_result_messages = await multi_strider(message, params, guid) + batch_result_messages = await multi_strider(message, params, guid, override_cache) num_batches_returned += 1 logger.info(f"{guid}: {num_batches_returned} batches returned") for result in batch_result_messages: @@ -728,7 +733,7 @@ def merge_results_by_node_op(message, params, guid) -> (dict, int): return merged_results, 200 -async def strider(message, params, guid) -> (dict, int): +async def strider(message, params, guid, override_cache) -> (dict, int): # strider_url = os.environ.get("STRIDER_URL", "https://strider-dev.apps.renci.org/1.3/") strider_url = os.environ.get("STRIDER_URL", "https://strider.renci.org/1.5/") #strider_url = os.environ.get("STRIDER_URL", "https://strider.transltr.io/1.3/") @@ -741,6 +746,7 @@ async def strider(message, params, guid) -> (dict, int): strider_url += "asyncquery" asyncquery = True + message["bypass_cache"] = override_cache response = await subservice_post("strider", strider_url, message, guid, asyncquery=asyncquery, params=params) return response @@ -847,7 +853,7 @@ def expand_query(input_message, params, guid): -async def multi_strider(messages, params, guid): +async def multi_strider(messages, params, guid, override_cache): #strider_url = os.environ.get("STRIDER_URL", "https://strider-dev.apps.renci.org/1.3/") strider_url = os.environ.get("STRIDER_URL", "https://strider.renci.org/1.5/") @@ -855,7 +861,7 @@ async def multi_strider(messages, params, guid): #We don't want to do subservice_post, because that assumes TRAPI in and out. #it leads to confusion. #response, status_code = await subservice_post("strider", strider_url, messages, guid, asyncquery=True) - responses = await post_with_callback(strider_url,messages,guid,params) + responses = await post_with_callback(strider_url,messages,guid,params,override_cache) return responses