Qualbug #206

Merged: 4 commits, Jul 19, 2023

2 changes: 1 addition & 1 deletion openapi-config.yaml
@@ -11,7 +11,7 @@ servers:
 # url: http://127.0.0.1:5000
 termsOfService: http://robokop.renci.org:7055/tos?service_long=ARAGORN&provider_long=RENCI
 title: ARAGORN
-version: 2.4.15
+version: 2.4.16
 tags:
 - name: translator
 - name: ARA
2 changes: 1 addition & 1 deletion requirements.txt
@@ -11,7 +11,7 @@ pytest==7.1.2
 pytest-asyncio==0.15.1
 pytest-dotenv==0.5.2
 pyyaml==6.0
-reasoner-pydantic==4.0.6
+reasoner-pydantic==4.1.1
 redis~=3.5.3
 requests==2.28.1
 uvicorn==0.17.6
1 change: 0 additions & 1 deletion src/aragorn_app.py
@@ -113,7 +113,6 @@ async def subservice_callback(response: PDResponse, guid: str) -> int:
     ret_val: int = 200

     logger.debug(f"{guid}: Receiving sub-service callback")
-    # logger.debug(f'{guid}: The sub-service response: {response.json()}')

     try:
         async with channel_pool.acquire() as channel:
3 changes: 3 additions & 0 deletions src/operations.py
@@ -74,6 +74,9 @@ async def filter_kgraph_orphans(message,params,guid):
     for auxgraph in auxgraphs:
         aux_edges = message.get('message',{}).get('auxiliary_graphs',{}).get(auxgraph,{}).get('edges',[])
         for aux_edge in aux_edges:
+            if aux_edge not in message["message"]["knowledge_graph"]["edges"]:
+                logger.warning(f"{guid}: aux_edge {aux_edge} not in knowledge_graph.edges")
+                continue
             edges.add(aux_edge)
             nodes.add(message["message"]["knowledge_graph"]["edges"][aux_edge]["subject"])
             nodes.add(message["message"]["knowledge_graph"]["edges"][aux_edge]["object"])
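The new guard matters because an auxiliary graph can still reference a knowledge-graph edge that earlier pruning removed; without the check, the `edges[aux_edge]` lookups that follow raise KeyError. A minimal sketch of the failure case (hypothetical message, TRAPI-style shapes):

```python
message = {"message": {
    "knowledge_graph": {"edges": {"e1": {"subject": "CHEBI:1", "object": "MONDO:2"}}},
    "auxiliary_graphs": {"a0": {"edges": ["e1", "e_gone"]}},  # "e_gone" was already pruned
}}
# Before this PR: message["message"]["knowledge_graph"]["edges"]["e_gone"] -> KeyError
# After: "e_gone" is logged as a warning and skipped; "e1" is still processed
```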
70 changes: 54 additions & 16 deletions src/service_aggregator.py
@@ -21,6 +21,8 @@
 from reasoner_pydantic import Response as PDResponse
 import uuid

+DUMPTRUCK = False
+
 #from src.rules.rules import rules as AMIE_EXPANSIONS

 logger = logging.getLogger(__name__)
@@ -310,7 +312,9 @@ def has_unique_nodes(result):
 async def filter_repeated_nodes(response,guid):
     """We have some rules that include e.g. 2 chemicals. We don't want responses in which those two
     are the same. If you have A-B-A-C then what shows up in the ui is B-A-C which makes no sense."""
-    original_result_count = len(response["message"]["results"])
+    original_result_count = len(response["message"].get("results",[]))
+    if original_result_count == 0:
+        return
     results = list(filter( lambda x: has_unique_nodes(x), response["message"]["results"] ))
     response["message"]["results"] = results
     if len(results) != original_result_count:
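`has_unique_nodes` itself is outside this diff; a plausible sketch of the check it performs, assuming TRAPI-style results where `node_bindings` maps query-node ids to lists of bindings:

```python
def has_unique_nodes(result):
    """Keep a result only if no CURIE is bound to more than one query node (rejects A-B-A paths)."""
    ids = [b["id"] for bindings in result["node_bindings"].values() for b in bindings]
    return len(ids) == len(set(ids))
```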
@@ -337,23 +341,21 @@ async def check_for_messages(guid, num_queries, num_previously_received=0):
             num_responses += 1
             logger.info(f"{guid}: Strider returned {num_responses} out of {num_queries}.")
             jr = process_message(message)
-            #with open(f"{guid}_{num_responses}.json","w") as outf:
-            #    json.dump(jr,outf,indent=2)
+            if DUMPTRUCK:
+                with open(f"{guid}_{num_responses}.json","w") as outf:
+                    json.dump(jr,outf,indent=2)
             if is_end_message(jr):
                 logger.info(f"{guid}: Received complete message from multistrider")
                 complete = True
                 break

             # it's a real message; update the kgraph and results
-            #await filter_repeated_nodes(jr,guid)
-            #query = Query.parse_obj(jr)
-            #pydantic_kgraph.update(query.message.knowledge_graph)
-            #if jr["message"]["results"] is None:
-            #    jr["message"]["results"] = []
-            #accumulated_results += jr["message"]["results"]
-            responses.append(jr)
-            logger.info(f"{guid}: {len(jr['message']['results'])} results from {jr['message']['query_graph']}")
-            logger.info(f"{guid}: {len(jr['message']['auxiliary_graphs'])} auxgraphs")
+            await de_noneify(jr)
+            if "query_graph" not in jr["message"]:
+                logger.warning(f"{guid}: No query graph in message")
+            else:
+                logger.info(f"{guid}: {len(jr.get('message',{}).get('results',[]))} results from {jr['message']['query_graph']}")
+                logger.info(f"{guid}: {len(jr.get('message',{}).get('auxiliary_graphs',[]))} auxgraphs")
+            responses.append(jr)

         # this is a little messy because this is trying to handle multiquery (returns an end message)
         # and single query (no end message; single query)
@@ -467,8 +469,9 @@ async def subservice_post(name, url, message, guid, asyncquery=False, params=None):
     try:
         # if there is a response return it as a dict
         if len(response.json()):
-            #pass it through pydantic for validation and cleaning
-            ret_val = await to_jsonable_dict(PDResponse.parse_obj(response.json()).dict(exclude_none = True))
+            result = response.json()
+            await de_noneify(result)
+            ret_val = await to_jsonable_dict(result)
     except Exception as e:
         status_code = 500
         logger.exception(f"{guid}: ARAGORN Exception {e} translating json from post to {name}")
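Dropping the `PDResponse.parse_obj` round trip here is a speed/safety trade: the old path validated and cleaned the payload through pydantic, which (as the comment in `aragorn_lookup` below puts it) is dog slow with big messages. `de_noneify` plus `to_jsonable_dict` reproduces the `exclude_none` cleanup without a full parse, at the cost of no longer validating the subservice response.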
@@ -561,6 +564,21 @@ def chunk(input, n):
     for i in range(0, len(input), n):
         yield input[i : i + n]

+async def de_noneify(message):
+    """Remove all the None values from a message"""
+    if isinstance(message, dict):
+        keys_to_remove = []
+        for key, value in message.items():
+            if value is None:
+                keys_to_remove.append(key)
+            else:
+                await de_noneify(value)
+        for key in keys_to_remove:
+            del message[key]
+    elif isinstance(message, list):
+        for item in message:
+            await de_noneify(item)
+

 async def aragorn_lookup(input_message, params, guid, infer, answer_qnode):
     if not infer:
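Note that `de_noneify` mutates its argument in place and recurses through both dicts and lists; it is `async` only so it can be awaited from the surrounding coroutines (it never actually awaits I/O). A quick usage sketch with made-up data:

```python
import asyncio

msg = {"message": {"results": None,
                   "knowledge_graph": {"edges": {"e0": {"subject": "CHEBI:1", "qualifiers": None}}}}}
asyncio.run(de_noneify(msg))
print(msg)  # {'message': {'knowledge_graph': {'edges': {'e0': {'subject': 'CHEBI:1'}}}}}
```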
@@ -585,7 +603,12 @@ async def aragorn_lookup(input_message, params, guid, infer, answer_qnode):
         num_batches_returned += 1
         logger.info(f"{guid}: {num_batches_returned} batches returned")
         for result in batch_result_messages:
-            rmessage = await to_jsonable_dict(PDResponse.parse_obj(result).dict(exclude_none=True))
+            #this clean is dog slow with big messages
+            #rmessage = await to_jsonable_dict(PDResponse.parse_obj(result).dict(exclude_none=True))
+            await de_noneify(result)
+            rmessage = await to_jsonable_dict(result)
+            if "knowledge_graph" not in rmessage["message"] or "results" not in rmessage["message"]:
+                continue
             await filter_repeated_nodes(rmessage, guid)
             result_messages.append(rmessage)
     logger.info(f"{guid}: strider complete")
@@ -751,6 +774,10 @@ def add_knowledge_edge(result_message, aux_graph_ids, answer):
     qnode_subject = answer
     qnode_object = query_graph["nodes"][qnode_object_id]["ids"][0]
     predicate = qedge["predicates"][0]
+    if "qualifier_constraints" in qedge and qedge["qualifier_constraints"] is not None and len(qedge["qualifier_constraints"]) > 0:
+        qualifiers = qedge["qualifier_constraints"][0]["qualifier_set"]
+    else:
+        qualifiers = None
     # Create a new knowledge edge
     new_edge_id = str(uuid.uuid4())
     new_edge = {
@@ -766,6 +793,8 @@
         # Aragorn is the primary ks because aragorn inferred the existence of this edge.
         "sources": [{"resource_id":"infores:aragorn", "resource_role":"primary_knowledge_source"}]
     }
+    if qualifiers is not None:
+        new_edge["qualifiers"] = qualifiers
     result_message["message"]["knowledge_graph"]["edges"][new_edge_id] = new_edge
     return new_edge_id
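This appears to be the qualifier bug the PR title refers to: previously the inferred edge was written without qualifiers, so a creative-mode query about, say, increased activity would get an unqualified edge. A hypothetical query edge of the shape this code expects (structure per TRAPI; values invented):

```python
qedge = {
    "subject": "n0",
    "object": "n1",
    "predicates": ["biolink:affects"],
    "qualifier_constraints": [{
        "qualifier_set": [
            {"qualifier_type_id": "biolink:object_aspect_qualifier", "qualifier_value": "activity"},
            {"qualifier_type_id": "biolink:object_direction_qualifier", "qualifier_value": "increased"},
        ]
    }],
}
# With the change above, the new knowledge edge now carries
# new_edge["qualifiers"] == qedge["qualifier_constraints"][0]["qualifier_set"]
```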

@@ -1034,6 +1063,11 @@ async def omnicorp(message, params, guid) -> (dict, int):
     :param guid:
     :return:
     """
+
+    if DUMPTRUCK:
+        with open("to_omni.json","w") as outf:
+            json.dump(message, outf, indent=2)
+
     url = f'{os.environ.get("RANKER_URL", "https://aragorn-ranker.renci.org/1.4/")}omnicorp_overlay'

     rval, omni_status = await subservice_post("omnicorp", url, message, guid)
@@ -1053,6 +1087,10 @@ async def score(message, params, guid) -> (dict, int):
     :return:
     """

+    if DUMPTRUCK:
+        with open("to_score.json","w") as outf:
+            json.dump(message, outf, indent=2)
+
     ranker_url = os.environ.get("RANKER_URL", "https://aragorn-ranker.renci.org/1.4/")

     score_url = f"{ranker_url}score"
13 changes: 13 additions & 0 deletions tests/test_query_exam.py
@@ -3,6 +3,7 @@
 from reasoner_pydantic.results import Analysis, EdgeBinding, Result, NodeBinding
 from reasoner_pydantic.auxgraphs import AuxiliaryGraph
 from reasoner_pydantic.message import Response
+import json

 def create_result_graph():
     """Create a "treats" result graph with a query graph."""
@@ -19,6 +20,11 @@ def create_result(node_bindings: dict[str,str], edge_bindings: dict[str,str]) -> Result:
     result = Result(node_bindings = {k:[NodeBinding(id=v)] for k,v in node_bindings.items()}, analyses = set([analysis]))
     return result

+def create_pretend_knowledge_edge(subject, object, predicate, infores):
+    """Create a pretend knowledge edge."""
+    ke = {"subject":subject, "object":object, "predicate":predicate, "sources":[{"resource_id":infores, "resource_role": "primary_knowledge_source"}]}
+    return ke
+
 def test_merge_answer_creative_only():
     """Test that merge_answer() puts all the aux graphs in the right places."""
     pydantic_result = create_result_graph()
@@ -28,6 +34,7 @@ def test_merge_answer_creative_only():
     result1 = create_result({"input":"MONDO:1234", "output":answer, "node2": "curie:3"}, {"g":"KEDGE:1", "f":"KEDGE:2"}).to_dict()
     result2 = create_result({"input":"MONDO:1234", "output":answer, "nodeX": "curie:8"}, {"q":"KEDGE:4", "z":"KEDGE:8"}).to_dict()
     results = [result1, result2]
+
     #In reality the results will be in the message and we want to be sure that they get cleared out.
     result_message["message"]["results"] = results
     merge_results_by_node(result_message,"output",[])
@@ -61,6 +68,9 @@ def test_merge_answer_lookup_only():
     qnode_ids = ["input", "output"]
     result1 = create_result({"input":"MONDO:1234", "output":answer}, {"e":"lookup:1"}).dict(exclude_none=True)
     result2 = create_result({"input":"MONDO:1234", "output":answer}, {"e":"lookup:2"}).dict(exclude_none=True)
+    for n, ke_id in enumerate(["lookup:1", "lookup:2"]):
+        ke = create_pretend_knowledge_edge("MONDO:1234", answer, "biolink:treats", f"infores:i{n}")
+        result_message["message"]["knowledge_graph"]["edges"][ke_id] = ke
     lookup_results = [result1, result2]
     result_message["message"]["results"] = []
     merge_results_by_node(result_message,"output",lookup_results)
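The pretend knowledge edges are new in these tests, presumably because the merge path now dereferences each bound lookup edge in the knowledge graph (for example, when assembling the inferred edge above); without entries for "lookup:1" and "lookup:2", those edge bindings would be dangling references.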
@@ -84,6 +94,9 @@ def test_merge_answer_creative_and_lookup():
     result2 = create_result({"input":"MONDO:1234", "output":answer, "nodeX": "curie:8"}, {"q":"KEDGE:4", "z":"KEDGE:8"}).to_dict()
     results = [result1, result2]
     lookup = [create_result({"input":"MONDO:1234", "output":answer}, {"e":"lookup:1"}).dict(exclude_none=True)]
+    for n, ke_id in enumerate(["lookup:1"]):
+        ke = create_pretend_knowledge_edge("MONDO:1234", answer, "biolink:treats", f"infores:i{n}")
+        result_message["message"]["knowledge_graph"]["edges"][ke_id] = ke
     #In reality the results will be in the message and we want to be sure that they get cleared out.
     result_message["message"]["results"] = results
     merge_results_by_node(result_message,"output",lookup)