From 0cbfd38afee857b31deff6bf8292bb34bcffcaaf Mon Sep 17 00:00:00 2001
From: cbizon
Date: Thu, 13 Jul 2023 20:43:01 -0400
Subject: [PATCH 1/4] add qualifiers to creative edge

---
 openapi-config.yaml       | 2 +-
 src/service_aggregator.py | 6 ++++++
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/openapi-config.yaml b/openapi-config.yaml
index 779be6d..6e821b1 100644
--- a/openapi-config.yaml
+++ b/openapi-config.yaml
@@ -11,7 +11,7 @@ servers:
 # url: http://127.0.0.1:5000
 termsOfService: http://robokop.renci.org:7055/tos?service_long=ARAGORN&provider_long=RENCI
 title: ARAGORN
-version: 2.4.15
+version: 2.4.16
 tags:
 - name: translator
 - name: ARA
diff --git a/src/service_aggregator.py b/src/service_aggregator.py
index 3ddf9a3..572e348 100644
--- a/src/service_aggregator.py
+++ b/src/service_aggregator.py
@@ -751,6 +751,10 @@ def add_knowledge_edge(result_message, aux_graph_ids, answer):
     qnode_subject = answer
     qnode_object = query_graph["nodes"][qnode_object_id]["ids"][0]
     predicate = qedge["predicates"][0]
+    if "qualifier_constraints" in qedge:
+        qualifiers = qedge["qualifier_constraints"][0]["qualifier_set"]
+    else:
+        qualifiers = None
     # Create a new knowledge edge
     new_edge_id = str(uuid.uuid4())
     new_edge = {
@@ -766,6 +770,8 @@ def add_knowledge_edge(result_message, aux_graph_ids, answer):
         # Aragorn is the primary ks because aragorn inferred the existence of this edge.
         "sources": [{"resource_id":"infores:aragorn", "resource_role":"primary_knowledge_source"}]
     }
+    if qualifiers is not None:
+        new_edge["qualifiers"] = qualifiers
     result_message["message"]["knowledge_graph"]["edges"][new_edge_id] = new_edge
     return new_edge_id
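Note on [PATCH 1/4]: in TRAPI, a query edge carries qualifier constraints as a
list of {"qualifier_set": [...]} objects, while a knowledge edge carries a flat
"qualifiers" list. The new hunks lift the first qualifier_set off the query
edge and attach it to the inferred knowledge edge. A minimal standalone sketch
of that mapping, using a made-up query edge (the qualifier type ids are
standard Biolink ones, but the values and CURIEs here are illustrative only):

    # Hypothetical TRAPI query edge for a creative-mode query.
    qedge = {
        "subject": "input",
        "object": "output",
        "predicates": ["biolink:affects"],
        "qualifier_constraints": [
            {"qualifier_set": [
                {"qualifier_type_id": "biolink:object_aspect_qualifier", "qualifier_value": "activity"},
                {"qualifier_type_id": "biolink:object_direction_qualifier", "qualifier_value": "increased"},
            ]}
        ],
    }

    # Same logic as the patch: take the first constraint's qualifier_set...
    if "qualifier_constraints" in qedge:
        qualifiers = qedge["qualifier_constraints"][0]["qualifier_set"]
    else:
        qualifiers = None

    # ...and copy it onto the new knowledge edge as a plain "qualifiers" list.
    new_edge = {"subject": "CHEBI:0001", "object": "NCBIGene:23", "predicate": qedge["predicates"][0]}
    if qualifiers is not None:
        new_edge["qualifiers"] = qualifiers

Only the first entry of qualifier_constraints is consulted; presumably the
creative-mode query graphs handled here carry at most one constraint.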
From 60e4f82fe06516e098e2b868eb981519e7d45779 Mon Sep 17 00:00:00 2001
From: cbizon
Date: Wed, 19 Jul 2023 11:23:42 -0400
Subject: [PATCH 2/4] various bugfixes

---
 requirements.txt          |  2 +-
 src/aragorn_app.py        |  1 -
 src/operations.py         |  3 ++
 src/service_aggregator.py | 64 +++++++++++++++++++++++++++++----------
 4 files changed, 52 insertions(+), 18 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index 125e5cb..26792af 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -11,7 +11,7 @@ pytest==7.1.2
 pytest-asyncio==0.15.1
 pytest-dotenv==0.5.2
 pyyaml==6.0
-reasoner-pydantic==4.0.6
+reasoner-pydantic==4.1.1
 redis~=3.5.3
 requests==2.28.1
 uvicorn==0.17.6
diff --git a/src/aragorn_app.py b/src/aragorn_app.py
index ea940ec..92ea0d9 100644
--- a/src/aragorn_app.py
+++ b/src/aragorn_app.py
@@ -113,7 +113,6 @@ async def subservice_callback(response: PDResponse, guid: str) -> int:
     ret_val: int = 200

     logger.debug(f"{guid}: Receiving sub-service callback")
-    # logger.debug(f'{guid}: The sub-service response: {response.json()}')

     try:
         async with channel_pool.acquire() as channel:
diff --git a/src/operations.py b/src/operations.py
index 688414c..1c1dd59 100644
--- a/src/operations.py
+++ b/src/operations.py
@@ -74,6 +74,9 @@ async def filter_kgraph_orphans(message,params,guid):
     for auxgraph in auxgraphs:
         aux_edges = message.get('message',{}).get('auxiliary_graphs',{}).get(auxgraph,{}).get('edges',[])
         for aux_edge in aux_edges:
+            if aux_edge not in message["message"]["knowledge_graph"]["edges"]:
+                logger.warning(f"{guid}: aux_edge {aux_edge} not in knowledge_graph.edges")
+                continue
             edges.add(aux_edge)
             nodes.add(message["message"]["knowledge_graph"]["edges"][aux_edge]["subject"])
             nodes.add(message["message"]["knowledge_graph"]["edges"][aux_edge]["object"])
diff --git a/src/service_aggregator.py b/src/service_aggregator.py
index 572e348..092cb43 100644
--- a/src/service_aggregator.py
+++ b/src/service_aggregator.py
@@ -21,6 +21,8 @@ from reasoner_pydantic import Response as PDResponse

 import uuid

+DUMPTRUCK = False
+
 #from src.rules.rules import rules as AMIE_EXPANSIONS

 logger = logging.getLogger(__name__)
@@ -310,7 +312,9 @@ def has_unique_nodes(result):
 async def filter_repeated_nodes(response,guid):
     """We have some rules that include e.g. 2 chemicals.  We don't want responses in which those two
     are the same.  If you have A-B-A-C then what shows up in the ui is B-A-C which makes no sense."""
-    original_result_count = len(response["message"]["results"])
+    original_result_count = len(response["message"].get("results",[]))
+    if original_result_count == 0:
+        return
     results = list(filter( lambda x: has_unique_nodes(x), response["message"]["results"] ))
     response["message"]["results"] = results
     if len(results) != original_result_count:
@@ -337,23 +341,21 @@ async def check_for_messages(guid, num_queries, num_previously_received=0):
             num_responses += 1
             logger.info(f"{guid}: Strider returned {num_responses} out of {num_queries}.")
             jr = process_message(message)
-            #with open(f"{guid}_{num_responses}.json","w") as outf:
-            #    json.dump(jr,outf,indent=2)
+            if DUMPTRUCK:
+                with open(f"{guid}_{num_responses}.json","w") as outf:
+                    json.dump(jr,outf,indent=2)
             if is_end_message(jr):
                 logger.info(f"{guid}: Received complete message from multistrider")
                 complete = True
                 break

-            # it's a real message; update the kgraph and results
-            #await filter_repeated_nodes(jr,guid)
-            #query = Query.parse_obj(jr)
-            #pydantic_kgraph.update(query.message.knowledge_graph)
-            #if jr["message"]["results"] is None:
-            #    jr["message"]["results"] = []
-            #accumulated_results += jr["message"]["results"]
-            responses.append(jr)
-            logger.info(f"{guid}: {len(jr['message']['results'])} results from {jr['message']['query_graph']}")
-            logger.info(f"{guid}: {len(jr['message']['auxiliary_graphs'])} auxgraphs")
+            await de_noneify(jr)
+            if "query_graph" not in jr["message"]:
+                logger.warning(f"{guid}: No query graph in message")
+            else:
+                logger.info(f"{guid}: {len(jr.get('message',{}).get('results',[]))} results from {jr['message']['query_graph']}")
+                logger.info(f"{guid}: {len(jr.get('message',{}).get('auxiliary_graphs',[]))} auxgraphs")
+            responses.append(jr)

     # this is a little messy because this is trying to handle multiquery (returns an end message)
     # and single query (no end message; single query)
@@ -467,8 +469,9 @@ async def subservice_post(name, url, message, guid, asyncquery=False, params=Non
         try:
             # if there is a response return it as a dict
             if len(response.json()):
-                #pass it through pydantic for validation and cleaning
-                ret_val = await to_jsonable_dict(PDResponse.parse_obj(response.json()).dict(exclude_none = True))
+                result = response.json()
+                await de_noneify(result)
+                ret_val = await to_jsonable_dict(result)
         except Exception as e:
             status_code = 500
             logger.exception(f"{guid}: ARAGORN Exception {e} translating json from post to {name}")
@@ -561,6 +564,21 @@ def chunk(input, n):
     for i in range(0, len(input), n):
         yield input[i : i + n]

+
+async def de_noneify(message):
+    """Remove all the None values from a message"""
+    if isinstance(message, dict):
+        keys_to_remove = []
+        for key, value in message.items():
+            if value is None:
+                keys_to_remove.append(key)
+            else:
+                await de_noneify(value)
+        for key in keys_to_remove:
+            del message[key]
+    elif isinstance(message, list):
+        for item in message:
+            await de_noneify(item)

 async def aragorn_lookup(input_message, params, guid, infer, answer_qnode):
     if not infer:
@@ -585,7 +603,12 @@ async def aragorn_lookup(input_message, params, guid, infer, answer_qnode):
             num_batches_returned += 1
             logger.info(f"{guid}: {num_batches_returned} batches returned")
             for result in batch_result_messages:
-                rmessage = await to_jsonable_dict(PDResponse.parse_obj(result).dict(exclude_none=True))
+                #this clean is dog slow with big messages
+                #rmessage = await to_jsonable_dict(PDResponse.parse_obj(result).dict(exclude_none=True))
+                await de_noneify(result)
+                rmessage = await to_jsonable_dict(result)
+                if "knowledge_graph" not in rmessage["message"] or "results" not in rmessage["message"]:
+                    continue
                 await filter_repeated_nodes(rmessage, guid)
                 result_messages.append(rmessage)
     logger.info(f"{guid}: strider complete")
@@ -1040,6 +1063,11 @@ async def omnicorp(message, params, guid) -> (dict, int):
     :param guid:
     :return:
     """
+
+    if DUMPTRUCK:
+        with open("to_omni.json","w") as outf:
+            json.dump(message, outf, indent=2)
+
     url = f'{os.environ.get("RANKER_URL", "https://aragorn-ranker.renci.org/1.4/")}omnicorp_overlay'

     rval, omni_status = await subservice_post("omnicorp", url, message, guid)
@@ -1059,6 +1087,10 @@ async def score(message, params, guid) -> (dict, int):
     :return:
     """

+    if DUMPTRUCK:
+        with open("to_score.json","w") as outf:
+            json.dump(message, outf, indent=2)
+
     ranker_url = os.environ.get("RANKER_URL", "https://aragorn-ranker.renci.org/1.4/")

     score_url = f"{ranker_url}score"
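Note on [PATCH 2/4]: the recurring theme in this patch is replacing the
PDResponse.parse_obj(...).dict(exclude_none=True) round trip (which the new
comment in aragorn_lookup calls "dog slow with big messages") with de_noneify,
an in-place recursive strip of None values. A standalone sketch of its
behavior, copying the function from the patch and running it on a made-up
message:

    import asyncio

    async def de_noneify(message):
        """Remove all the None values from a message"""
        if isinstance(message, dict):
            keys_to_remove = []
            for key, value in message.items():
                if value is None:
                    keys_to_remove.append(key)
                else:
                    await de_noneify(value)
            for key in keys_to_remove:
                del message[key]
        elif isinstance(message, list):
            for item in message:
                await de_noneify(item)

    msg = {"message": {"results": None,
                       "knowledge_graph": {"edges": {"e0": {"subject": "MONDO:1234",
                                                            "qualifiers": None}}}}}
    asyncio.run(de_noneify(msg))
    # msg is now {'message': {'knowledge_graph': {'edges': {'e0': {'subject': 'MONDO:1234'}}}}}

This trades pydantic's validation for speed: messages are no longer
schema-checked on the way through, which is why downstream code now guards for
missing keys (the "query_graph" check in check_for_messages, the
"knowledge_graph"/"results" check in aragorn_lookup, and the aux_edge check in
filter_kgraph_orphans).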
From 5e5035f3f993ab7572094086df2cc0cee35e2049 Mon Sep 17 00:00:00 2001
From: cbizon
Date: Wed, 19 Jul 2023 13:32:28 -0400
Subject: [PATCH 3/4] fixed tests

---
 tests/test_query_exam.py | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/tests/test_query_exam.py b/tests/test_query_exam.py
index b4ae0fa..967e78e 100644
--- a/tests/test_query_exam.py
+++ b/tests/test_query_exam.py
@@ -3,6 +3,7 @@ from reasoner_pydantic.results import Analysis, EdgeBinding, Result, NodeBinding
 from reasoner_pydantic.auxgraphs import AuxiliaryGraph
 from reasoner_pydantic.message import Response
+import json

 def create_result_graph():
     """Create a "treats" result graph with a query graph."""
@@ -19,6 +20,11 @@ def create_result(node_bindings: dict[str,str], edge_bindings: dict[str,str]) ->
     result = Result(node_bindings = {k:[NodeBinding(id=v)] for k,v in node_bindings.items()}, analyses = set([analysis]))
     return result

+def create_pretend_knowledge_edge(subject, object, predicate, infores):
+    """Create a pretend knowledge edge."""
+    ke = {"subject":subject, "object":object, "predicate":predicate, "sources":[{"resource_id":infores, "resource_role": "primary_knowledge_source"}]}
+    return ke
+
 def test_merge_answer_creative_only():
     """Test that merge_answer() puts all the aux graphs in the right places."""
     pydantic_result = create_result_graph()
@@ -28,6 +34,7 @@ def test_merge_answer_creative_only():
     result1 = create_result({"input":"MONDO:1234", "output":answer, "node2": "curie:3"}, {"g":"KEDGE:1", "f":"KEDGE:2"}).to_dict()
     result2 = create_result({"input":"MONDO:1234", "output":answer, "nodeX": "curie:8"}, {"q":"KEDGE:4", "z":"KEDGE:8"}).to_dict()
     results = [result1, result2]
+    #In reality the results will be in the message and we want to be sure that they get cleared out.
     result_message["message"]["results"] = results
     merge_results_by_node(result_message,"output",[])
@@ -61,6 +68,9 @@ def test_merge_answer_lookup_only():
     qnode_ids = ["input", "output"]
     result1 = create_result({"input":"MONDO:1234", "output":answer}, {"e":"lookup:1"}).dict(exclude_none=True)
     result2 = create_result({"input":"MONDO:1234", "output":answer}, {"e":"lookup:2"}).dict(exclude_none=True)
+    for n, ke_id in enumerate(["lookup:1", "lookup:2"]):
+        ke = create_pretend_knowledge_edge("MONDO:1234", answer, "biolink:treats", f"infores:i{n}")
+        result_message["message"]["knowledge_graph"]["edges"][ke_id] = ke
     lookup_results = [result1, result2]
     result_message["message"]["results"] = []
     merge_results_by_node(result_message,"output",lookup_results)
@@ -84,6 +94,9 @@ def test_merge_answer_creative_and_lookup():
     result2 = create_result({"input":"MONDO:1234", "output":answer, "nodeX": "curie:8"}, {"q":"KEDGE:4", "z":"KEDGE:8"}).to_dict()
     results = [result1, result2]
     lookup = [create_result({"input":"MONDO:1234", "output":answer}, {"e":"lookup:1"}).dict(exclude_none=True)]
+    for n, ke_id in enumerate(["lookup:1"]):
+        ke = create_pretend_knowledge_edge("MONDO:1234", answer, "biolink:treats", f"infores:i{n}")
+        result_message["message"]["knowledge_graph"]["edges"][ke_id] = ke
     #In reality the results will be in the message and we want to be sure that they get cleared out.
     result_message["message"]["results"] = results
     merge_results_by_node(result_message,"output",lookup)
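Note on [PATCH 3/4]: the lookup tests previously bound edge ids ("lookup:1",
"lookup:2") that never existed in the test message's knowledge graph. The
tests now seed stub entries for those ids, presumably because the merge and
orphan-filtering paths exercised here read subject, object, and sources off
the bound edges. For one call of the new helper, inside the test where
`answer` is the output node's CURIE:

    create_pretend_knowledge_edge("MONDO:1234", answer, "biolink:treats", "infores:i0")
    # == {"subject": "MONDO:1234", "object": answer, "predicate": "biolink:treats",
    #     "sources": [{"resource_id": "infores:i0",
    #                  "resource_role": "primary_knowledge_source"}]}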
result_message["message"]["results"] = results merge_results_by_node(result_message,"output",[]) @@ -61,6 +68,9 @@ def test_merge_answer_lookup_only(): qnode_ids = ["input", "output"] result1 = create_result({"input":"MONDO:1234", "output":answer}, {"e":"lookup:1"}).dict(exclude_none=True) result2 = create_result({"input":"MONDO:1234", "output":answer}, {"e":"lookup:2"}).dict(exclude_none=True) + for n, ke_id in enumerate(["lookup:1", "lookup:2"]): + ke = create_pretend_knowledge_edge("MONDO:1234", answer, "biolink:treats", f"infores:i{n}") + result_message["message"]["knowledge_graph"]["edges"][ke_id] = ke lookup_results = [result1, result2] result_message["message"]["results"] = [] merge_results_by_node(result_message,"output",lookup_results) @@ -84,6 +94,9 @@ def test_merge_answer_creative_and_lookup(): result2 = create_result({"input":"MONDO:1234", "output":answer, "nodeX": "curie:8"}, {"q":"KEDGE:4", "z":"KEDGE:8"}).to_dict() results = [result1, result2] lookup = [create_result({"input":"MONDO:1234", "output":answer}, {"e":"lookup:1"}).dict(exclude_none=True)] + for n, ke_id in enumerate(["lookup:1"]): + ke = create_pretend_knowledge_edge("MONDO:1234", answer, "biolink:treats", f"infores:i{n}") + result_message["message"]["knowledge_graph"]["edges"][ke_id] = ke #In reality the results will be in the message and we want to be sure that they get cleared out. result_message["message"]["results"] = results merge_results_by_node(result_message,"output",lookup) From 3238c0e6f58f976f75e08293cfb6bd31e576da2c Mon Sep 17 00:00:00 2001 From: cbizon Date: Wed, 19 Jul 2023 13:37:58 -0400 Subject: [PATCH 4/4] gotta push em all --- src/service_aggregator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/service_aggregator.py b/src/service_aggregator.py index 092cb43..d2b25e4 100644 --- a/src/service_aggregator.py +++ b/src/service_aggregator.py @@ -774,7 +774,7 @@ def add_knowledge_edge(result_message, aux_graph_ids, answer): qnode_subject = answer qnode_object = query_graph["nodes"][qnode_object_id]["ids"][0] predicate = qedge["predicates"][0] - if "qualifier_constraints" in qedge: + if "qualifier_constraints" in qedge and qedge["qualifier_constraints"] is not None and len(qedge["qualifier_constraints"]) > 0: qualifiers = qedge["qualifier_constraints"][0]["qualifier_set"] else: qualifiers = None