Skip to content

Commit

Permalink
pushed bypass_cache to strider
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Bizon authored and Chris Bizon committed Apr 11, 2024
1 parent 45d4c1b commit 8188cbf
Showing 1 changed file with 18 additions and 12 deletions.
30 changes: 18 additions & 12 deletions src/service_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,17 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int):
print(e)
return None, 500


#we grab this stuff here so we can get it into lookup
override_cache = message.get("bypass_cache", False)
override_cache = override_cache if type(override_cache) is bool else False

# A map from operations advertised in our x-trapi to functions
# This is to functions rather than e.g. service urls because we may combine multiple calls into one op.
# e.g. our score operation will include both weighting and scoring for now.
# Also gives us a place to handle function specific logic
known_operations = {
"lookup": partial(lookup, caller=caller, infer=infer, answer_qnode=answer_qnode, question_qnode=question_qnode),
"lookup": partial(lookup, caller=caller, infer=infer, answer_qnode=answer_qnode, question_qnode=question_qnode, override_cache=override_cache),
"enrich_results": partial(answercoalesce, coalesce_type=coalesce_type),
"overlay_connect_knodes": omnicorp,
"score": score,
Expand Down Expand Up @@ -170,8 +175,6 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int):
except KeyError:
return f"No query graph", 422
results_cache = ResultsCache()
override_cache = message.get("bypass_cache", False)
override_cache = override_cache if type(override_cache) is bool else False
results = None
if infer:
# We're going to cache infer queries, and we need to do that even if we're overriding the cache
Expand Down Expand Up @@ -223,7 +226,7 @@ def is_end_message(message):
return False


async def post_with_callback(host_url, query, guid, params={}):
async def post_with_callback(host_url, query, guid, params={}, override_cache=False):
"""
Post an asynchronous message.
Expand Down Expand Up @@ -254,9 +257,11 @@ async def post_with_callback(host_url, query, guid, params={}):
# make sure there is a place for the trapi log messages
if "logs" not in query:
query["logs"] = []
query["bypass_cache"] = override_cache
else:
for qname, individual_query in query.items():
individual_query["callback"] = callback_url
individual_query["bypass_cache"] = override_cache
if "logs" not in individual_query:
individual_query["logs"] = []
num_queries = len(query)
Expand Down Expand Up @@ -627,7 +632,7 @@ async def subservice_post(name, url, message, guid, asyncquery=False, params={})
return ret_val, status_code


async def lookup(message, params, guid, infer=False, caller="ARAGORN", answer_qnode=None, question_qnode=None) -> (dict, int):
async def lookup(message, params, guid, infer=False, caller="ARAGORN", answer_qnode=None, question_qnode=None, override_cache=False) -> (dict, int):
"""
Performs lookup, parameterized by ARAGORN/ROBOKOP and whether the query is an infer type query
Expand All @@ -639,7 +644,7 @@ async def lookup(message, params, guid, infer=False, caller="ARAGORN", answer_qn
"""

if caller == "ARAGORN":
return await aragorn_lookup(message, params, guid, infer, answer_qnode)
return await aragorn_lookup(message, params, guid, infer, answer_qnode, override_cache)
elif caller == "ROBOKOP":
robo_results, robo_status = await robokop_lookup(message, params, guid, infer, question_qnode, answer_qnode)
return await add_provenance(robo_results), robo_status
Expand Down Expand Up @@ -674,12 +679,12 @@ async def de_noneify(message):
await de_noneify(item)


async def aragorn_lookup(input_message, params, guid, infer, answer_qnode):
async def aragorn_lookup(input_message, params, guid, infer, answer_qnode, override_cache):
timeout_seconds = (input_message.get("parameters") or {}).get("timeout_seconds")
if timeout_seconds:
params["timeout_seconds"] = timeout_seconds if type(timeout_seconds) is int else 3 * 60
if not infer:
return await strider(input_message, params, guid)
return await strider(input_message, params, guid, override_cache)
# Now it's an infer query.
messages = expand_query(input_message, params, guid)
lookup_query_graph = messages[0]["message"]["query_graph"]
Expand All @@ -696,7 +701,7 @@ async def aragorn_lookup(input_message, params, guid, infer, answer_qnode):
num += 1
message[f"query_{num}"] = q
logger.info(f"Sending {len(message)} messages to strider")
batch_result_messages = await multi_strider(message, params, guid)
batch_result_messages = await multi_strider(message, params, guid, override_cache)
num_batches_returned += 1
logger.info(f"{guid}: {num_batches_returned} batches returned")
for result in batch_result_messages:
Expand Down Expand Up @@ -728,7 +733,7 @@ def merge_results_by_node_op(message, params, guid) -> (dict, int):
return merged_results, 200


async def strider(message, params, guid) -> (dict, int):
async def strider(message, params, guid, override_cache) -> (dict, int):
# strider_url = os.environ.get("STRIDER_URL", "https://strider-dev.apps.renci.org/1.3/")
strider_url = os.environ.get("STRIDER_URL", "https://strider.renci.org/1.5/")
#strider_url = os.environ.get("STRIDER_URL", "https://strider.transltr.io/1.3/")
Expand All @@ -741,6 +746,7 @@ async def strider(message, params, guid) -> (dict, int):
strider_url += "asyncquery"
asyncquery = True

message["bypass_cache"] = override_cache
response = await subservice_post("strider", strider_url, message, guid, asyncquery=asyncquery, params=params)

return response
Expand Down Expand Up @@ -847,15 +853,15 @@ def expand_query(input_message, params, guid):



async def multi_strider(messages, params, guid):
async def multi_strider(messages, params, guid, override_cache):
#strider_url = os.environ.get("STRIDER_URL", "https://strider-dev.apps.renci.org/1.3/")
strider_url = os.environ.get("STRIDER_URL", "https://strider.renci.org/1.5/")

strider_url += "multiquery"
#We don't want to do subservice_post, because that assumes TRAPI in and out.
#it leads to confusion.
#response, status_code = await subservice_post("strider", strider_url, messages, guid, asyncquery=True)
responses = await post_with_callback(strider_url,messages,guid,params)
responses = await post_with_callback(strider_url,messages,guid,params,override_cache)

return responses

Expand Down

0 comments on commit 8188cbf

Please sign in to comment.