Skip to content

Commit

Permalink
various bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
cbizon committed Jul 19, 2023
1 parent 0cbfd38 commit 60e4f82
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 18 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pytest==7.1.2
pytest-asyncio==0.15.1
pytest-dotenv==0.5.2
pyyaml==6.0
reasoner-pydantic==4.0.6
reasoner-pydantic==4.1.1
redis~=3.5.3
requests==2.28.1
uvicorn==0.17.6
Expand Down
1 change: 0 additions & 1 deletion src/aragorn_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ async def subservice_callback(response: PDResponse, guid: str) -> int:
ret_val: int = 200

logger.debug(f"{guid}: Receiving sub-service callback")
# logger.debug(f'{guid}: The sub-service response: {response.json()}')

try:
async with channel_pool.acquire() as channel:
Expand Down
3 changes: 3 additions & 0 deletions src/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ async def filter_kgraph_orphans(message,params,guid):
for auxgraph in auxgraphs:
aux_edges = message.get('message',{}).get('auxiliary_graphs',{}).get(auxgraph,{}).get('edges',[])
for aux_edge in aux_edges:
if aux_edge not in message["message"]["knowledge_graph"]["edges"]:
logger.warning(f"{guid}: aux_edge {aux_edge} not in knowledge_graph.edges")
continue
edges.add(aux_edge)
nodes.add(message["message"]["knowledge_graph"]["edges"][aux_edge]["subject"])
nodes.add(message["message"]["knowledge_graph"]["edges"][aux_edge]["object"])
Expand Down
64 changes: 48 additions & 16 deletions src/service_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from reasoner_pydantic import Response as PDResponse
import uuid

DUMPTRUCK = False

#from src.rules.rules import rules as AMIE_EXPANSIONS

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -310,7 +312,9 @@ def has_unique_nodes(result):
async def filter_repeated_nodes(response,guid):
"""We have some rules that include e.g. 2 chemicals. We don't want responses in which those two
are the same. If you have A-B-A-C then what shows up in the ui is B-A-C which makes no sense."""
original_result_count = len(response["message"]["results"])
original_result_count = len(response["message"].get("results",[]))
if original_result_count == 0:
return
results = list(filter( lambda x: has_unique_nodes(x), response["message"]["results"] ))
response["message"]["results"] = results
if len(results) != original_result_count:
Expand All @@ -337,23 +341,21 @@ async def check_for_messages(guid, num_queries, num_previously_received=0):
num_responses += 1
logger.info(f"{guid}: Strider returned {num_responses} out of {num_queries}.")
jr = process_message(message)
#with open(f"{guid}_{num_responses}.json","w") as outf:
# json.dump(jr,outf,indent=2)
if DUMPTRUCK:
with open(f"{guid}_{num_responses}.json","w") as outf:
json.dump(jr,outf,indent=2)
if is_end_message(jr):
logger.info(f"{guid}: Received complete message from multistrider")
complete = True
break

# it's a real message; update the kgraph and results
#await filter_repeated_nodes(jr,guid)
#query = Query.parse_obj(jr)
#pydantic_kgraph.update(query.message.knowledge_graph)
#if jr["message"]["results"] is None:
# jr["message"]["results"] = []
#accumulated_results += jr["message"]["results"]
responses.append(jr)
logger.info(f"{guid}: {len(jr['message']['results'])} results from {jr['message']['query_graph']}")
logger.info(f"{guid}: {len(jr['message']['auxiliary_graphs'])} auxgraphs")
await de_noneify(jr)
if "query_graph" not in jr["message"]:
logger.warning(f"{guid}: No query graph in message")
else:
logger.info(f"{guid}: {len(jr.get('message',{}).get('results',[]))} results from {jr['message']['query_graph']}")
logger.info(f"{guid}: {len(jr.get('message',{}).get('auxiliary_graphs',[]))} auxgraphs")
responses.append(jr)

# this is a little messy because this is trying to handle multiquery (returns an end message)
# and single query (no end message; single query)
Expand Down Expand Up @@ -467,8 +469,9 @@ async def subservice_post(name, url, message, guid, asyncquery=False, params=Non
try:
# if there is a response return it as a dict
if len(response.json()):
#pass it through pydantic for validation and cleaning
ret_val = await to_jsonable_dict(PDResponse.parse_obj(response.json()).dict(exclude_none = True))
result = response.json()
await de_noneify(result)
ret_val = await to_jsonable_dict(result)
except Exception as e:
status_code = 500
logger.exception(f"{guid}: ARAGORN Exception {e} translating json from post to {name}")
Expand Down Expand Up @@ -561,6 +564,21 @@ def chunk(input, n):
for i in range(0, len(input), n):
yield input[i : i + n]

async def de_noneify(message):
    """Remove every ``None``-valued key from a message, in place.

    Walks ``message`` through arbitrarily nested dicts and lists. Any dict
    entry whose value is ``None`` is deleted. ``None`` items that sit directly
    inside a list are left where they are, and values that are neither dicts
    nor lists are not inspected.

    The function stays a coroutine so existing ``await de_noneify(...)`` call
    sites keep working, but the traversal is a plain loop over an explicit
    stack: that avoids creating one coroutine per nested value and cannot hit
    the interpreter recursion limit on deeply nested input.

    :param message: the structure to clean; modified in place. Anything that
        is not a dict or a list is ignored.
    :return: None
    """
    pending = [message]
    while pending:
        current = pending.pop()
        if isinstance(current, dict):
            # Collect first: a dict must not change size while being iterated.
            none_keys = [key for key, value in current.items() if value is None]
            for key in none_keys:
                del current[key]
            children = current.values()
        elif isinstance(current, list):
            children = current
        else:
            continue
        # Only containers can hold further None-valued keys.
        pending.extend(
            child for child in children if isinstance(child, (dict, list))
        )


async def aragorn_lookup(input_message, params, guid, infer, answer_qnode):
if not infer:
Expand All @@ -585,7 +603,12 @@ async def aragorn_lookup(input_message, params, guid, infer, answer_qnode):
num_batches_returned += 1
logger.info(f"{guid}: {num_batches_returned} batches returned")
for result in batch_result_messages:
rmessage = await to_jsonable_dict(PDResponse.parse_obj(result).dict(exclude_none=True))
#this clean is dog slow with big messages
#rmessage = await to_jsonable_dict(PDResponse.parse_obj(result).dict(exclude_none=True))
await de_noneify(result)
rmessage = await to_jsonable_dict(result)
if "knowledge_graph" not in rmessage["message"] or "results" not in rmessage["message"]:
continue
await filter_repeated_nodes(rmessage, guid)
result_messages.append(rmessage)
logger.info(f"{guid}: strider complete")
Expand Down Expand Up @@ -1040,6 +1063,11 @@ async def omnicorp(message, params, guid) -> (dict, int):
:param guid:
:return:
"""

if DUMPTRUCK:
with open("to_omni.json","w") as outf:
json.dump(message, outf, indent=2)

url = f'{os.environ.get("RANKER_URL", "https://aragorn-ranker.renci.org/1.4/")}omnicorp_overlay'

rval, omni_status = await subservice_post("omnicorp", url, message, guid)
Expand All @@ -1059,6 +1087,10 @@ async def score(message, params, guid) -> (dict, int):
:return:
"""

if DUMPTRUCK:
with open("to_score.json","w") as outf:
json.dump(message, outf, indent=2)

ranker_url = os.environ.get("RANKER_URL", "https://aragorn-ranker.renci.org/1.4/")

score_url = f"{ranker_url}score"
Expand Down

0 comments on commit 60e4f82

Please sign in to comment.