diff --git a/requirements.txt b/requirements.txt index 125e5cb..26792af 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,7 +11,7 @@ pytest==7.1.2 pytest-asyncio==0.15.1 pytest-dotenv==0.5.2 pyyaml==6.0 -reasoner-pydantic==4.0.6 +reasoner-pydantic==4.1.1 redis~=3.5.3 requests==2.28.1 uvicorn==0.17.6 diff --git a/src/aragorn_app.py b/src/aragorn_app.py index ea940ec..92ea0d9 100644 --- a/src/aragorn_app.py +++ b/src/aragorn_app.py @@ -113,7 +113,6 @@ async def subservice_callback(response: PDResponse, guid: str) -> int: ret_val: int = 200 logger.debug(f"{guid}: Receiving sub-service callback") - # logger.debug(f'{guid}: The sub-service response: {response.json()}') try: async with channel_pool.acquire() as channel: diff --git a/src/operations.py b/src/operations.py index 688414c..1c1dd59 100644 --- a/src/operations.py +++ b/src/operations.py @@ -74,6 +74,9 @@ async def filter_kgraph_orphans(message,params,guid): for auxgraph in auxgraphs: aux_edges = message.get('message',{}).get('auxiliary_graphs',{}).get(auxgraph,{}).get('edges',[]) for aux_edge in aux_edges: + if aux_edge not in message["message"]["knowledge_graph"]["edges"]: + logger.warning(f"{guid}: aux_edge {aux_edge} not in knowledge_graph.edges") + continue edges.add(aux_edge) nodes.add(message["message"]["knowledge_graph"]["edges"][aux_edge]["subject"]) nodes.add(message["message"]["knowledge_graph"]["edges"][aux_edge]["object"]) diff --git a/src/service_aggregator.py b/src/service_aggregator.py index 572e348..092cb43 100644 --- a/src/service_aggregator.py +++ b/src/service_aggregator.py @@ -21,6 +21,8 @@ from reasoner_pydantic import Response as PDResponse import uuid +DUMPTRUCK = False + #from src.rules.rules import rules as AMIE_EXPANSIONS logger = logging.getLogger(__name__) @@ -310,7 +312,9 @@ def has_unique_nodes(result): async def filter_repeated_nodes(response,guid): """We have some rules that include e.g. 2 chemicals. We don't want responses in which those two are the same. If you have A-B-A-C then what shows up in the ui is B-A-C which makes no sense.""" - original_result_count = len(response["message"]["results"]) + original_result_count = len(response["message"].get("results",[])) + if original_result_count == 0: + return results = list(filter( lambda x: has_unique_nodes(x), response["message"]["results"] )) response["message"]["results"] = results if len(results) != original_result_count: @@ -337,23 +341,21 @@ async def check_for_messages(guid, num_queries, num_previously_received=0): num_responses += 1 logger.info(f"{guid}: Strider returned {num_responses} out of {num_queries}.") jr = process_message(message) - #with open(f"{guid}_{num_responses}.json","w") as outf: - # json.dump(jr,outf,indent=2) + if DUMPTRUCK: + with open(f"{guid}_{num_responses}.json","w") as outf: + json.dump(jr,outf,indent=2) if is_end_message(jr): logger.info(f"{guid}: Received complete message from multistrider") complete = True break - # it's a real message; update the kgraph and results - #await filter_repeated_nodes(jr,guid) - #query = Query.parse_obj(jr) - #pydantic_kgraph.update(query.message.knowledge_graph) - #if jr["message"]["results"] is None: - # jr["message"]["results"] = [] - #accumulated_results += jr["message"]["results"] - responses.append(jr) - logger.info(f"{guid}: {len(jr['message']['results'])} results from {jr['message']['query_graph']}") - logger.info(f"{guid}: {len(jr['message']['auxiliary_graphs'])} auxgraphs") + await de_noneify(jr) + if "query_graph" not in jr["message"]: + logger.warning(f"{guid}: No query graph in message") + else: + logger.info(f"{guid}: {len(jr.get('message',{}).get('results',[]))} results from {jr['message']['query_graph']}") + logger.info(f"{guid}: {len(jr.get('message',{}).get('auxiliary_graphs',[]))} auxgraphs") + responses.append(jr) # this is a little messy because this is trying to handle multiquery (returns an end message) # and single query (no end message; single query) @@ -467,8 +469,9 @@ async def subservice_post(name, url, message, guid, asyncquery=False, params=Non try: # if there is a response return it as a dict if len(response.json()): - #pass it through pydantic for validation and cleaning - ret_val = await to_jsonable_dict(PDResponse.parse_obj(response.json()).dict(exclude_none = True)) + result = response.json() + await de_noneify(result) + ret_val = await to_jsonable_dict(result) except Exception as e: status_code = 500 logger.exception(f"{guid}: ARAGORN Exception {e} translating json from post to {name}") @@ -561,6 +564,21 @@ def chunk(input, n): for i in range(0, len(input), n): yield input[i : i + n] +async def de_noneify(message): + """Remove all the None values from a message""" + if isinstance(message, dict): + keys_to_remove = [] + for key, value in message.items(): + if value is None: + keys_to_remove.append(key) + else: + await de_noneify(value) + for key in keys_to_remove: + del message[key] + elif isinstance(message, list): + for item in message: + await de_noneify(item) + async def aragorn_lookup(input_message, params, guid, infer, answer_qnode): if not infer: @@ -585,7 +603,12 @@ async def aragorn_lookup(input_message, params, guid, infer, answer_qnode): num_batches_returned += 1 logger.info(f"{guid}: {num_batches_returned} batches returned") for result in batch_result_messages: - rmessage = await to_jsonable_dict(PDResponse.parse_obj(result).dict(exclude_none=True)) + #this clean is dog slow with big messages + #rmessage = await to_jsonable_dict(PDResponse.parse_obj(result).dict(exclude_none=True)) + await de_noneify(result) + rmessage = await to_jsonable_dict(result) + if "knowledge_graph" not in rmessage["message"] or "results" not in rmessage["message"]: + continue await filter_repeated_nodes(rmessage, guid) result_messages.append(rmessage) logger.info(f"{guid}: strider complete") @@ -1040,6 +1063,11 @@ async def omnicorp(message, params, guid) -> (dict, int): :param guid: :return: """ + + if DUMPTRUCK: + with open("to_omni.json","w") as outf: + json.dump(message, outf, indent=2) + url = f'{os.environ.get("RANKER_URL", "https://aragorn-ranker.renci.org/1.4/")}omnicorp_overlay' rval, omni_status = await subservice_post("omnicorp", url, message, guid) @@ -1059,6 +1087,10 @@ async def score(message, params, guid) -> (dict, int): :return: """ + if DUMPTRUCK: + with open("to_score.json","w") as outf: + json.dump(message, outf, indent=2) + ranker_url = os.environ.get("RANKER_URL", "https://aragorn-ranker.renci.org/1.4/") score_url = f"{ranker_url}score"