Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lawson/sparql pyoxigraph #186

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ To run the development server (with auto-reload on code changes):
poetry run python main.py
```

In order for Prez to work, a triple store such as [Apache Jena](https://jena.apache.org/) or [GraphDB](https://www.ontotext.com/products/graphdb/) must be available for Prez to interface with.
Connection to the triple store is done via [environment variables](#environment-variables).


Prez runs as a standard FastAPI application, so for all the normal HOW TO running questions, see FastAPI's documentation:
Expand All @@ -83,7 +85,8 @@ Prez runs as a standard FastAPI application, so for all the normal HOW TO runnin

### Environment Variables

You need to configure at least 1 environment variable for Prez to run. The full set of available variables are found in `prez/config.py`.
You need to configure at least one environment variable for Prez to run.
The full set of available variables is found in `prez/config.py`.

#### Minimal

Expand Down
59 changes: 0 additions & 59 deletions dev/prez demo instance.run.xml

This file was deleted.

28 changes: 21 additions & 7 deletions prez/routers/sparql.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import io
import logging
from pprint import pprint

import httpx
from fastapi import APIRouter, Depends
from fastapi.responses import JSONResponse, Response
from rdflib import Namespace, Graph
from starlette.background import BackgroundTask
from starlette.datastructures import Headers
Expand All @@ -16,6 +20,7 @@

router = APIRouter(tags=["SPARQL"])

log = logging.getLogger(__name__)

@router.api_route("/sparql", methods=["GET"])
async def sparql_endpoint(
Expand Down Expand Up @@ -48,10 +53,19 @@ async def sparql_endpoint(
headers=prof_and_mt_info.profile_headers,
)
else:
response = await repo.sparql(request)
return StreamingResponse(
response.aiter_raw(),
status_code=response.status_code,
headers=dict(response.headers),
background=BackgroundTask(response.aclose),
)
query_result = await repo.sparql(request)
if isinstance(query_result, dict):
return JSONResponse(content=query_result)
elif isinstance(query_result, Graph):
return Response(
content=query_result.serialize(format="text/turtle"),
status_code=200
)
else:
return StreamingResponse(
query_result.aiter_raw(),
status_code=query_result.status_code,
headers=dict(query_result.headers),
background=BackgroundTask(query_result.aclose),
)

73 changes: 56 additions & 17 deletions prez/sparql/methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async def tabular_query_to_table(self, query: str, context: URIRef = None):
pass

async def send_queries(
self, rdf_queries: List[str], tabular_queries: List[Tuple[URIRef, str]] = None
self, rdf_queries: List[str], tabular_queries: List[Tuple[URIRef, str]] = None
):
# Common logic to send both query types in parallel
results = await asyncio.gather(
Expand All @@ -47,6 +47,10 @@ async def send_queries(
tabular_results.append(result)
return g, tabular_results

@abstractmethod
def sparql(self, request):
    """Execute the SPARQL query carried by *request* against the backing store.

    Concrete Repo subclasses implement this; depending on the backend it may
    return parsed results (dict / Graph) or a raw HTTP response object —
    see the router, which handles all three cases.
    """
    pass


class RemoteSparqlRepo(Repo):
def __init__(self, async_client: httpx.AsyncClient):
Expand Down Expand Up @@ -108,40 +112,75 @@ class PyoxigraphRepo(Repo):
def __init__(self, pyoxi_store: pyoxigraph.Store):
    # In-process pyoxigraph store that queries run against directly.
    self.pyoxi_store = pyoxi_store

def _sync_rdf_query_to_graph(self, query: str) -> Graph:
results = self.pyoxi_store.query(query)
ntriples = " .\n".join([str(r) for r in list(results)]) + " ."
g = Graph()
g.bind("prez", URIRef("https://prez.dev/"))
if ntriples == " .":
return g
return g.parse(data=ntriples, format="ntriples")

def _sync_tabular_query_to_table(self, query: str, context: URIRef = None):
results = self.pyoxi_store.query(query)
def _handle_query_solution_results(self, results: pyoxigraph.QuerySolutions) -> dict:
    """Organise SELECT query results into the SPARQL 1.1 JSON results format.

    Returns a dict serializable by FastAPI's JSONResponse: a "head" entry
    listing the query variables and a "results"/"bindings" entry with one
    dict per solution row.
    """
    variables = results.variables
    results_dict = {"head": {"vars": [v.value for v in variables]}}
    results_list = []
    for result in results:
        # One binding dict per solution row; unbound variables are simply
        # omitted, as the SPARQL JSON results format requires.
        result_dict = {}
        for var in variables:
            binding = result[var]
            if binding:
                binding_type = self._pyoxi_result_type(binding)
                # str(var) renders as "?name"; strip the leading "?" for the key.
                result_dict[str(var)[1:]] = {
                    "type": binding_type,
                    "value": binding.value,
                }
        results_list.append(result_dict)
    results_dict["results"] = {"bindings": results_list}
    return results_dict

@staticmethod
def _handle_query_triples_results(results: pyoxigraph.QueryTriples) -> Graph:
    """Parse CONSTRUCT/DESCRIBE query results into an rdflib Graph."""
    g = Graph()
    g.bind("prez", URIRef("https://prez.dev/"))
    # Serialise each pyoxigraph triple, then parse the lot as N-Triples.
    triples = [str(r) for r in results]
    if not triples:
        # No results: return the empty (but prefix-bound) graph rather than
        # round-tripping a sentinel string through the parser.
        return g
    return g.parse(data=" .\n".join(triples) + " .", format="ntriples")

def _sync_rdf_query_to_graph(self, query: str) -> Graph:
    """Run a graph-producing (CONSTRUCT/DESCRIBE) query and return a Graph."""
    query_results = self.pyoxi_store.query(query)
    return self._handle_query_triples_results(query_results)

def _sync_tabular_query_to_table(self, query: str, context: URIRef = None) -> tuple:
    """Run a SELECT query and return a ``(context, bindings)`` pair."""
    solutions = self.pyoxi_store.query(query)
    parsed = self._handle_query_solution_results(solutions)
    # Tabular consumers only need the bindings, not the full SPARQL-JSON envelope.
    return context, parsed["results"]["bindings"]

def _sparql(self, request: Request) -> dict | Graph:
    """Execute the SPARQL query from the request's query parameters.

    Returns:
        A SPARQL-JSON dict for SELECT queries, an rdflib Graph for
        CONSTRUCT/DESCRIBE queries, or a SPARQL-JSON boolean dict for ASK.

    Raises:
        KeyError: if no "query" parameter is present on the request.
        TypeError: if pyoxigraph returns an unrecognised result class.
    """
    try:
        query = request.query_params["query"]
    except KeyError:
        # Re-raise with a clearer message; suppress the redundant chained
        # context (the original f-string had no placeholders either).
        raise KeyError("No query was provided in the request parameters.") from None
    results = self.pyoxi_store.query(query)
    if isinstance(results, pyoxigraph.QuerySolutions):  # a SELECT query result
        return self._handle_query_solution_results(results)
    elif isinstance(results, pyoxigraph.QueryTriples):  # a CONSTRUCT query result
        return self._handle_query_triples_results(results)
    elif isinstance(results, bool):  # an ASK query result
        return {"head": {}, "boolean": results}
    else:
        raise TypeError(f"Unexpected result class {type(results)}")

async def rdf_query_to_graph(self, query: str) -> Graph:
    # Run the blocking pyoxigraph query off the event loop.
    return await run_in_threadpool(self._sync_rdf_query_to_graph, query)

async def tabular_query_to_table(self, query: str, context: URIRef = None) -> tuple:
    """Run a SELECT query in a threadpool; returns ``(context, bindings)``.

    (Annotation corrected from ``list``: the sync helper returns a tuple.)
    """
    return await run_in_threadpool(
        self._sync_tabular_query_to_table, query, context
    )

async def sparql(self, request: Request) -> dict | Graph:
    """Execute a SPARQL query from the request against the local pyoxigraph store.

    Runs the blocking query in a threadpool — consistent with the other async
    wrappers on this class — so it does not stall the event loop. The return
    annotation is corrected to ``dict | Graph`` to match ``_sparql``.
    """
    return await run_in_threadpool(self._sparql, request)

@staticmethod
def _pyoxi_result_type(term) -> str:
if isinstance(term, pyoxigraph.Literal):
Expand Down
2 changes: 2 additions & 0 deletions tests/test_sparql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

# TODO: call the sparql endpoint with a sparql query in the query params and check for response 200