Skip to content

Commit

Permalink
Merge pull request #206 from ranking-agent/qualbug
Browse files Browse the repository at this point in the history
Qualbug
  • Loading branch information
cbizon authored Jul 19, 2023
2 parents 9ff3e4f + 3238c0e commit ff2cfcd
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 19 deletions.
2 changes: 1 addition & 1 deletion openapi-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ servers:
# url: http://127.0.0.1:5000
termsOfService: http://robokop.renci.org:7055/tos?service_long=ARAGORN&provider_long=RENCI
title: ARAGORN
version: 2.4.15
version: 2.4.16
tags:
- name: translator
- name: ARA
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pytest==7.1.2
pytest-asyncio==0.15.1
pytest-dotenv==0.5.2
pyyaml==6.0
reasoner-pydantic==4.0.6
reasoner-pydantic==4.1.1
redis~=3.5.3
requests==2.28.1
uvicorn==0.17.6
Expand Down
1 change: 0 additions & 1 deletion src/aragorn_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ async def subservice_callback(response: PDResponse, guid: str) -> int:
ret_val: int = 200

logger.debug(f"{guid}: Receiving sub-service callback")
# logger.debug(f'{guid}: The sub-service response: {response.json()}')

try:
async with channel_pool.acquire() as channel:
Expand Down
3 changes: 3 additions & 0 deletions src/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ async def filter_kgraph_orphans(message,params,guid):
for auxgraph in auxgraphs:
aux_edges = message.get('message',{}).get('auxiliary_graphs',{}).get(auxgraph,{}).get('edges',[])
for aux_edge in aux_edges:
if aux_edge not in message["message"]["knowledge_graph"]["edges"]:
logger.warning(f"{guid}: aux_edge {aux_edge} not in knowledge_graph.edges")
continue
edges.add(aux_edge)
nodes.add(message["message"]["knowledge_graph"]["edges"][aux_edge]["subject"])
nodes.add(message["message"]["knowledge_graph"]["edges"][aux_edge]["object"])
Expand Down
70 changes: 54 additions & 16 deletions src/service_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from reasoner_pydantic import Response as PDResponse
import uuid

DUMPTRUCK = False

#from src.rules.rules import rules as AMIE_EXPANSIONS

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -310,7 +312,9 @@ def has_unique_nodes(result):
async def filter_repeated_nodes(response,guid):
"""We have some rules that include e.g. 2 chemicals. We don't want responses in which those two
are the same. If you have A-B-A-C then what shows up in the ui is B-A-C which makes no sense."""
original_result_count = len(response["message"]["results"])
original_result_count = len(response["message"].get("results",[]))
if original_result_count == 0:
return
results = list(filter( lambda x: has_unique_nodes(x), response["message"]["results"] ))
response["message"]["results"] = results
if len(results) != original_result_count:
Expand All @@ -337,23 +341,21 @@ async def check_for_messages(guid, num_queries, num_previously_received=0):
num_responses += 1
logger.info(f"{guid}: Strider returned {num_responses} out of {num_queries}.")
jr = process_message(message)
#with open(f"{guid}_{num_responses}.json","w") as outf:
# json.dump(jr,outf,indent=2)
if DUMPTRUCK:
with open(f"{guid}_{num_responses}.json","w") as outf:
json.dump(jr,outf,indent=2)
if is_end_message(jr):
logger.info(f"{guid}: Received complete message from multistrider")
complete = True
break

# it's a real message; update the kgraph and results
#await filter_repeated_nodes(jr,guid)
#query = Query.parse_obj(jr)
#pydantic_kgraph.update(query.message.knowledge_graph)
#if jr["message"]["results"] is None:
# jr["message"]["results"] = []
#accumulated_results += jr["message"]["results"]
responses.append(jr)
logger.info(f"{guid}: {len(jr['message']['results'])} results from {jr['message']['query_graph']}")
logger.info(f"{guid}: {len(jr['message']['auxiliary_graphs'])} auxgraphs")
await de_noneify(jr)
if "query_graph" not in jr["message"]:
logger.warning(f"{guid}: No query graph in message")
else:
logger.info(f"{guid}: {len(jr.get('message',{}).get('results',[]))} results from {jr['message']['query_graph']}")
logger.info(f"{guid}: {len(jr.get('message',{}).get('auxiliary_graphs',[]))} auxgraphs")
responses.append(jr)

# this is a little messy because this is trying to handle multiquery (returns an end message)
# and single query (no end message; single query)
Expand Down Expand Up @@ -467,8 +469,9 @@ async def subservice_post(name, url, message, guid, asyncquery=False, params=Non
try:
# if there is a response return it as a dict
if len(response.json()):
#pass it through pydantic for validation and cleaning
ret_val = await to_jsonable_dict(PDResponse.parse_obj(response.json()).dict(exclude_none = True))
result = response.json()
await de_noneify(result)
ret_val = await to_jsonable_dict(result)
except Exception as e:
status_code = 500
logger.exception(f"{guid}: ARAGORN Exception {e} translating json from post to {name}")
Expand Down Expand Up @@ -561,6 +564,21 @@ def chunk(input, n):
for i in range(0, len(input), n):
yield input[i : i + n]

async def de_noneify(message):
    """Recursively strip None-valued keys from a JSON-like structure.

    Mutates *message* in place: every dict at any nesting depth loses the
    entries whose value is None, and every list has each element cleaned
    the same way.  None elements *inside lists* are left alone (only dict
    entries are dropped).  Scalars are returned untouched.
    """
    if isinstance(message, dict):
        # Collect doomed keys first so we never mutate while iterating.
        doomed = [key for key, value in message.items() if value is None]
        for key in doomed:
            del message[key]
        # Recurse into whatever survived.
        for value in message.values():
            await de_noneify(value)
    elif isinstance(message, list):
        for element in message:
            await de_noneify(element)


async def aragorn_lookup(input_message, params, guid, infer, answer_qnode):
if not infer:
Expand All @@ -585,7 +603,12 @@ async def aragorn_lookup(input_message, params, guid, infer, answer_qnode):
num_batches_returned += 1
logger.info(f"{guid}: {num_batches_returned} batches returned")
for result in batch_result_messages:
rmessage = await to_jsonable_dict(PDResponse.parse_obj(result).dict(exclude_none=True))
#this clean is dog slow with big messages
#rmessage = await to_jsonable_dict(PDResponse.parse_obj(result).dict(exclude_none=True))
await de_noneify(result)
rmessage = await to_jsonable_dict(result)
if "knowledge_graph" not in rmessage["message"] or "results" not in rmessage["message"]:
continue
await filter_repeated_nodes(rmessage, guid)
result_messages.append(rmessage)
logger.info(f"{guid}: strider complete")
Expand Down Expand Up @@ -751,6 +774,10 @@ def add_knowledge_edge(result_message, aux_graph_ids, answer):
qnode_subject = answer
qnode_object = query_graph["nodes"][qnode_object_id]["ids"][0]
predicate = qedge["predicates"][0]
if "qualifier_constraints" in qedge and qedge["qualifier_constraints"] is not None and len(qedge["qualifier_constraints"]) > 0:
qualifiers = qedge["qualifier_constraints"][0]["qualifier_set"]
else:
qualifiers = None
# Create a new knowledge edge
new_edge_id = str(uuid.uuid4())
new_edge = {
Expand All @@ -766,6 +793,8 @@ def add_knowledge_edge(result_message, aux_graph_ids, answer):
# Aragorn is the primary ks because aragorn inferred the existence of this edge.
"sources": [{"resource_id":"infores:aragorn", "resource_role":"primary_knowledge_source"}]
}
if qualifiers is not None:
new_edge["qualifiers"] = qualifiers
result_message["message"]["knowledge_graph"]["edges"][new_edge_id] = new_edge
return new_edge_id

Expand Down Expand Up @@ -1034,6 +1063,11 @@ async def omnicorp(message, params, guid) -> (dict, int):
:param guid:
:return:
"""

if DUMPTRUCK:
with open("to_omni.json","w") as outf:
json.dump(message, outf, indent=2)

url = f'{os.environ.get("RANKER_URL", "https://aragorn-ranker.renci.org/1.4/")}omnicorp_overlay'

rval, omni_status = await subservice_post("omnicorp", url, message, guid)
Expand All @@ -1053,6 +1087,10 @@ async def score(message, params, guid) -> (dict, int):
:return:
"""

if DUMPTRUCK:
with open("to_score.json","w") as outf:
json.dump(message, outf, indent=2)

ranker_url = os.environ.get("RANKER_URL", "https://aragorn-ranker.renci.org/1.4/")

score_url = f"{ranker_url}score"
Expand Down
13 changes: 13 additions & 0 deletions tests/test_query_exam.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from reasoner_pydantic.results import Analysis, EdgeBinding, Result, NodeBinding
from reasoner_pydantic.auxgraphs import AuxiliaryGraph
from reasoner_pydantic.message import Response
import json

def create_result_graph():
"""Create a "treats" result graph with a query graph."""
Expand All @@ -19,6 +20,11 @@ def create_result(node_bindings: dict[str,str], edge_bindings: dict[str,str]) ->
result = Result(node_bindings = {k:[NodeBinding(id=v)] for k,v in node_bindings.items()}, analyses = set([analysis]))
return result

def create_pretend_knowledge_edge(subject, object, predicate, infores):
    """Build a minimal knowledge-graph edge dict for use in tests.

    The edge carries the given subject/object/predicate plus a single
    provenance entry naming *infores* as the primary knowledge source.
    """
    provenance = {"resource_id": infores, "resource_role": "primary_knowledge_source"}
    return {
        "subject": subject,
        "object": object,
        "predicate": predicate,
        "sources": [provenance],
    }

def test_merge_answer_creative_only():
"""Test that merge_answer() puts all the aux graphs in the right places."""
pydantic_result = create_result_graph()
Expand All @@ -28,6 +34,7 @@ def test_merge_answer_creative_only():
result1 = create_result({"input":"MONDO:1234", "output":answer, "node2": "curie:3"}, {"g":"KEDGE:1", "f":"KEDGE:2"}).to_dict()
result2 = create_result({"input":"MONDO:1234", "output":answer, "nodeX": "curie:8"}, {"q":"KEDGE:4", "z":"KEDGE:8"}).to_dict()
results = [result1, result2]

#In reality the results will be in the message and we want to be sure that they get cleared out.
result_message["message"]["results"] = results
merge_results_by_node(result_message,"output",[])
Expand Down Expand Up @@ -61,6 +68,9 @@ def test_merge_answer_lookup_only():
qnode_ids = ["input", "output"]
result1 = create_result({"input":"MONDO:1234", "output":answer}, {"e":"lookup:1"}).dict(exclude_none=True)
result2 = create_result({"input":"MONDO:1234", "output":answer}, {"e":"lookup:2"}).dict(exclude_none=True)
for n, ke_id in enumerate(["lookup:1", "lookup:2"]):
ke = create_pretend_knowledge_edge("MONDO:1234", answer, "biolink:treats", f"infores:i{n}")
result_message["message"]["knowledge_graph"]["edges"][ke_id] = ke
lookup_results = [result1, result2]
result_message["message"]["results"] = []
merge_results_by_node(result_message,"output",lookup_results)
Expand All @@ -84,6 +94,9 @@ def test_merge_answer_creative_and_lookup():
result2 = create_result({"input":"MONDO:1234", "output":answer, "nodeX": "curie:8"}, {"q":"KEDGE:4", "z":"KEDGE:8"}).to_dict()
results = [result1, result2]
lookup = [create_result({"input":"MONDO:1234", "output":answer}, {"e":"lookup:1"}).dict(exclude_none=True)]
for n, ke_id in enumerate(["lookup:1"]):
ke = create_pretend_knowledge_edge("MONDO:1234", answer, "biolink:treats", f"infores:i{n}")
result_message["message"]["knowledge_graph"]["edges"][ke_id] = ke
#In reality the results will be in the message and we want to be sure that they get cleared out.
result_message["message"]["results"] = results
merge_results_by_node(result_message,"output",lookup)
Expand Down

0 comments on commit ff2cfcd

Please sign in to comment.