Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shadowfax #279

Merged
merged 14 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions documentation/pathfinder_example.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
{
"message": {
"query_graph": {
"nodes": {
"n0": {
"ids": [
"PUBCHEM.COMPOUND:5291"
],
"name": "imatinib"
},
"n1": {
"ids": [
"MONDO:0004979"
],
"name": "asthma"
},
"un": {
"categories": [
"biolink:NamedThing"
]
}
},
"edges": {
"e0": {
"subject": "n0",
"object": "n1",
"predicates": [
"biolink:related_to"
],
"knowledge_type": "inferred"
},
"e1": {
"subject": "n0",
"object": "un",
"predicates": [
"biolink:related_to"
],
"knowledge_type": "inferred"
},
"e2": {
"subject": "n1",
"object": "un",
"predicates": [
"biolink:related_to"
],
"knowledge_type": "inferred"
}
}
},
"knowledge_graph": {
"nodes": {},
"edges": {}
},
"results": []
}
}
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ opentelemetry-instrumentation-fastapi==0.37b0
opentelemetry-exporter-jaeger==1.16.0
opentelemetry-instrumentation-httpx==0.37b0
fakeredis<=2.10.2
networkx==3.3
46 changes: 46 additions & 0 deletions src/pathfinder/get_cooccurrence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""Get more than pairwise literature cooccurence for a given list of curies."""
import json
import gzip
import os
import redis
import time
from typing import List

# Redis connection settings, overridable via environment variables.
REDIS_HOST = os.environ.get("REDIS_HOST", "localhost")
# Environment variables are always strings; coerce the numeric settings to
# int so redis.Redis receives consistent types whether or not the env is set.
REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379))
PMIDS_DB = int(os.environ.get("PMIDS_DB", 1))    # db mapping curie -> gzipped JSON list of PMIDs
CURIES_DB = int(os.environ.get("CURIES_DB", 2))  # db mapping PMID -> gzipped JSON list of curies
REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD")  # None means no auth

def get_the_pmids(curies: List[str]) -> List[str]:
    """Return the PMIDs in which every curie in ``curies`` co-occurs.

    Each curie maps in Redis to a gzipped JSON list of PMIDs; the answer is
    the intersection of those per-curie lists.

    :param curies: list of curie identifiers to intersect over
    :return: list of PMIDs shared by all input curies (empty if none)
    """
    # Guard the empty input: set.intersection(*[]) raises TypeError,
    # and "PMIDs shared by zero curies" is sensibly the empty list.
    if not curies:
        return []
    r = redis.Redis(
        host=REDIS_HOST,
        port=REDIS_PORT,
        db=PMIDS_DB,
        password=REDIS_PASSWORD
    )
    pmid_sets = []
    for curie in curies:
        raw = r.get(curie)
        if raw is None:
            # A curie with no literature support empties the intersection,
            # so we can stop fetching immediately.
            return []
        # Values are stored as gzip-compressed JSON arrays of PMIDs.
        pmid_sets.append(set(json.loads(gzip.decompress(raw))))
    return list(set.intersection(*pmid_sets))

def get_the_curies(pmid: str):
    """Return the list of curies mentioned in a single PMID.

    The curies database maps each PMID to a gzip-compressed JSON list of
    curie identifiers; a PMID with no entry yields an empty list.

    :param pmid: the PubMed identifier to look up
    :return: list of curies associated with that PMID
    """
    connection = redis.Redis(
        host=REDIS_HOST,
        port=REDIS_PORT,
        db=CURIES_DB,
        password=REDIS_PASSWORD
    )
    raw = connection.get(pmid)
    # Guard clause: unknown PMIDs simply have no associated curies.
    if raw is None:
        return []
    return list(json.loads(gzip.decompress(raw)))
34 changes: 24 additions & 10 deletions src/service_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from asyncio.exceptions import TimeoutError
from reasoner_pydantic import Query, KnowledgeGraph, QueryGraph
from reasoner_pydantic import Response as PDResponse
from src.shadowfax import shadowfax
import uuid

DUMPTRUCK = False
Expand All @@ -47,6 +48,8 @@ def examine_query(message):
# queries that are any shape with all lookup edges
# OR
# A 1-hop infer query.
# OR
# Pathfinder query
try:
# this can still fail if the input looks like e.g.:
# "query_graph": None
Expand All @@ -57,13 +60,14 @@ def examine_query(message):
for edge_id in qedges:
if qedges.get(edge_id, {}).get("knowledge_type", "lookup") == "inferred":
n_infer_edges += 1
if n_infer_edges > 1:
pathfinder = n_infer_edges == 3
if n_infer_edges > 1 and n_infer_edges and not pathfinder:
raise Exception("Only a single infer edge is supported", 400)
if (n_infer_edges > 0) and (n_infer_edges < len(qedges)):
raise Exception("Mixed infer and lookup queries not supported", 400)
infer = n_infer_edges == 1
if not infer:
return infer, None, None
return infer, None, None, pathfinder
qnodes = message.get("message", {}).get("query_graph", {}).get("nodes", {})
question_node = None
answer_node = None
Expand All @@ -76,7 +80,7 @@ def examine_query(message):
raise Exception("Both nodes of creative edge pinned", 400)
if question_node is None:
raise Exception("No nodes of creative edge pinned", 400)
return infer, question_node, answer_node
return infer, question_node, answer_node, pathfinder


def match_results_to_query(results, query_message, query_source, query_target, query_qedge_id):
Expand Down Expand Up @@ -111,7 +115,7 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int):
"""

try:
infer, question_qnode, answer_qnode = examine_query(message)
infer, question_qnode, answer_qnode, pathfinder = examine_query(message)
except Exception as e:
print(e)
return None, 500
Expand All @@ -127,7 +131,7 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int):
# e.g. our score operation will include both weighting and scoring for now.
# Also gives us a place to handle function specific logic
known_operations = {
"lookup": partial(lookup, caller=caller, infer=infer, answer_qnode=answer_qnode, question_qnode=question_qnode, bypass_cache=bypass_cache),
"lookup": partial(lookup, caller=caller, infer=infer, pathfinder=pathfinder, answer_qnode=answer_qnode, question_qnode=question_qnode, bypass_cache=bypass_cache),
"enrich_results": partial(answercoalesce, coalesce_type=coalesce_type),
"overlay_connect_knodes": omnicorp,
"score": score,
Expand All @@ -154,6 +158,13 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int):
{"id": "score"},
{"id": "filter_message_top_n", "parameters": {"max_results": 500}},
]
elif pathfinder:
workflow_def = [
{"id": "lookup"},
{"id": "overlay_connect_knodes"},
{"id": "score"},
{"id": "filter_message_top_n", "parameters": {"max_results": 500}}
]
else:
# TODO: if this is robokop, need to normalize.
workflow_def = [
Expand All @@ -171,7 +182,7 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int):
# We told the world what we can do!
# Workflow will be a list of the functions, and the parameters if there are any

read_from_cache = not (bypass_cache or overwrite_cache)
read_from_cache = not (bypass_cache or overwrite_cache) and not pathfinder

try:
query_graph = message["message"]["query_graph"]
Expand Down Expand Up @@ -221,7 +232,8 @@ async def entry(message, guid, coalesce_type, caller) -> (dict, int):
if overwrite_cache or (not bypass_cache):
if infer:
results_cache.set_result(input_id, predicate, qualifiers, source_input, caller, workflow_def, mcq, member_ids, final_answer)
elif {"id": "lookup"} in workflow_def:
elif {"id": "lookup"} in workflow_def and not pathfinder:
# We won't cache pathfinder results for now
results_cache.set_lookup_result(workflow_def, query_graph, final_answer)

# return the answer
Expand Down Expand Up @@ -708,7 +720,7 @@ async def subservice_post(name, url, message, guid, asyncquery=False, params={})
return ret_val, status_code


async def lookup(message, params, guid, infer=False, caller="ARAGORN", answer_qnode=None, question_qnode=None, bypass_cache=False) -> (dict, int):
async def lookup(message, params, guid, infer=False, pathfinder=False, caller="ARAGORN", answer_qnode=None, question_qnode=None, bypass_cache=False) -> (dict, int):
"""
Performs lookup, parameterized by ARAGORN/ROBOKOP and whether the query is an infer type query

Expand All @@ -720,7 +732,7 @@ async def lookup(message, params, guid, infer=False, caller="ARAGORN", answer_qn
"""
message = await normalize_qgraph_ids(message)
if caller == "ARAGORN":
return await aragorn_lookup(message, params, guid, infer, answer_qnode, bypass_cache)
return await aragorn_lookup(message, params, guid, infer, pathfinder, answer_qnode, bypass_cache)
elif caller == "ROBOKOP":
robo_results, robo_status = await robokop_lookup(message, params, guid, infer, question_qnode, answer_qnode)
return await add_provenance(robo_results), robo_status
Expand Down Expand Up @@ -755,10 +767,12 @@ async def de_noneify(message):
await de_noneify(item)


async def aragorn_lookup(input_message, params, guid, infer, answer_qnode, bypass_cache):
async def aragorn_lookup(input_message, params, guid, infer, pathfinder, answer_qnode, bypass_cache):
timeout_seconds = (input_message.get("parameters") or {}).get("timeout_seconds")
if timeout_seconds:
params["timeout_seconds"] = timeout_seconds if type(timeout_seconds) is int else 3 * 60
if pathfinder:
return await shadowfax(input_message, guid, logger)
if not infer:
return await strider(input_message, params, guid, bypass_cache)
# Now it's an infer query.
Expand Down
Loading
Loading