"""SPARQL 1.1 HTTP triple-store backend for Apache Jena Fuseki and similar."""
import re
from io import BytesIO
from typing import Optional
import requests
from rdflib import Graph
from rdflib.query import Result
from .base import TripleStoreBackend
[docs]
class FusekiBackend(TripleStoreBackend):
    """Triple-store backend that talks SPARQL 1.1 HTTP to a remote endpoint.

    Compatible with Apache Jena Fuseki and any other store that implements
    the SPARQL 1.1 Protocol (Blazegraph, GraphDB, Stardog).

    Queries are POSTed to ``<endpoint>/query`` and updates to
    ``<endpoint>/update``.
    """

    # Query forms recognised by ``_detect_query_form``.
    _QUERY_FORMS = ("CONSTRUCT", "DESCRIBE", "SELECT", "ASK")

    def __init__(
        self,
        endpoint: str,
        user: Optional[str] = None,
        password: Optional[str] = None,
    ) -> None:
        """Create a backend bound to ``endpoint``.

        Args:
            endpoint: Base URL of the dataset; trailing slashes are removed.
            user: Optional user name for HTTP authentication.
            password: Optional password for HTTP authentication.

        Authentication is configured on the session only when *both*
        ``user`` and ``password`` are given.
        """
        self.endpoint = endpoint.rstrip("/")
        self.session = requests.Session()
        if user is not None and password is not None:
            self.session.auth = (user, password)

    def query(self, sparql):
        """Run a SPARQL query and return the parsed result.

        Args:
            sparql: The query text.

        Returns:
            An ``rdflib.Graph`` for CONSTRUCT/DESCRIBE queries (requested
            as Turtle), otherwise an ``rdflib.query.Result`` parsed from
            the SPARQL JSON results format.

        Raises:
            ValueError: If the query form cannot be determined.
            requests.HTTPError: If the endpoint answers with 4xx/5xx.
        """
        form = self._detect_query_form(sparql)
        returns_graph = form in ("CONSTRUCT", "DESCRIBE")
        response = self._post(
            f"{self.endpoint}/query",
            sparql,
            content_type="application/sparql-query",
            accept=(
                "text/turtle"
                if returns_graph
                else "application/sparql-results+json"
            ),
        )
        if returns_graph:
            graph = Graph()
            # Hand the raw bytes to the parser: Turtle is always UTF-8,
            # whereas ``response.text`` falls back to ISO-8859-1 when a
            # ``text/*`` response carries no charset, mangling non-ASCII
            # literals.
            graph.parse(data=response.content, format="turtle")
            return graph
        return Result.parse(BytesIO(response.content), format="json")

    def update(self, sparql):
        """POST ``sparql`` to the update endpoint.

        Returns the underlying ``requests.Response`` so callers can
        distinguish 200 vs. 204 outcomes, inspect custom headers, or
        log the body. ``raise_for_status`` has already been called
        inside ``_post``, so any 4xx/5xx surfaces as
        ``requests.HTTPError`` before this returns.
        """
        return self._post(
            f"{self.endpoint}/update",
            sparql,
            content_type="application/sparql-update",
            accept="*/*",
        )

    def add(self, triples, graph=None):
        """Insert ``triples`` with a single ``INSERT DATA`` update.

        Args:
            triples: Iterable of ``(s, p, o)`` tuples whose members
                expose ``n3()``.
            graph: Optional IRI of the target named graph; when ``None``
                no ``GRAPH`` clause is emitted.
        """
        body = "\n".join(self._format_triple(t) for t in triples)
        if graph is not None:
            sparql = f"INSERT DATA {{ GRAPH <{graph}> {{ {body} }} }}"
        else:
            sparql = f"INSERT DATA {{ {body} }}"
        self.update(sparql)

    def remove(self, pattern, graph=None):
        """Delete every triple matching ``pattern`` via ``DELETE WHERE``.

        Args:
            pattern: ``(s, p, o)`` tuple; a ``None`` member acts as a
                wildcard and is replaced by a SPARQL variable. A pattern
                of three ``None`` values therefore matches every triple
                in the addressed graph.
            graph: Optional IRI of the named graph to delete from; when
                ``None`` no ``GRAPH`` clause is emitted.
        """
        s, p, o = pattern
        triple = (
            f"{self._term_or_var(s, '?s')} "
            f"{self._term_or_var(p, '?p')} "
            f"{self._term_or_var(o, '?o')} ."
        )
        if graph is not None:
            sparql = f"DELETE WHERE {{ GRAPH <{graph}> {{ {triple} }} }}"
        else:
            sparql = f"DELETE WHERE {{ {triple} }}"
        self.update(sparql)

    def clear(self, graph=None):
        """Clear the default graph, or the named ``graph`` if given.

        The named-graph form uses ``CLEAR SILENT``; the default-graph
        form uses plain ``CLEAR DEFAULT``.
        """
        if graph is None:
            self.update("CLEAR DEFAULT")
        else:
            self.update(f"CLEAR SILENT GRAPH <{graph}>")

    @staticmethod
    def _format_triple(triple):
        """Serialise one ``(s, p, o)`` tuple as an N3 statement."""
        s, p, o = triple
        return f"{s.n3()} {p.n3()} {o.n3()} ."

    @staticmethod
    def _term_or_var(term, variable):
        """Return ``term.n3()``, or ``variable`` when ``term`` is ``None``."""
        return term.n3() if term is not None else variable

    def _post(self, url, body, content_type, accept):
        """POST ``body`` to ``url`` and return the successful response.

        Raises:
            requests.HTTPError: If the response status is 4xx/5xx.
        """
        # Encode explicitly: the SPARQL protocol expects UTF-8, but a
        # ``str`` body may be encoded as ISO-8859-1 further down the
        # HTTP stack, corrupting or rejecting non-Latin-1 characters.
        payload = body.encode("utf-8") if isinstance(body, str) else body
        response = self.session.post(
            url,
            data=payload,
            headers={
                "Content-Type": content_type,
                "Accept": accept,
            },
        )
        response.raise_for_status()
        return response

    @classmethod
    def _detect_query_form(cls, sparql):
        """Return the first query-form keyword that appears in ``sparql``.

        The text is scanned left to right. IRI references and ``#``
        comments are skipped as whole tokens, so a ``#`` inside an IRI
        is not mistaken for a comment and keywords inside comments are
        ignored. Variables (``?select``) and prefixed names
        (``ask:``, ``ex:describe``) do not count as keywords.

        Returns:
            The upper-cased keyword, one of ``_QUERY_FORMS``.

        Raises:
            ValueError: If no query-form keyword is found.
        """
        alternatives = "|".join(re.escape(form) for form in cls._QUERY_FORMS)
        scanner = re.compile(
            r"<[^<>\"{}|^`\\\s]*>"  # IRI reference, may contain '#'
            r"|#[^\n]*"  # comment running to end of line
            # Bare keyword: not part of a variable, prefix or local name.
            rf"|(?<![\w:?$])(?P<form>{alternatives})(?![\w:])",
            re.IGNORECASE,
        )
        for token in scanner.finditer(sparql):
            keyword = token.group("form")
            if keyword:
                return keyword.upper()
        raise ValueError("Cannot determine SPARQL query form")