Source code for djangordf.backends.fuseki

"""SPARQL 1.1 HTTP triple-store backend for Apache Jena Fuseki and similar."""
import re
from io import BytesIO
from typing import Optional

import requests
from rdflib import Graph
from rdflib.query import Result

from .base import TripleStoreBackend


[docs] class FusekiBackend(TripleStoreBackend): """Triple-store backend that talks SPARQL 1.1 HTTP to a remote endpoint. Compatible with Apache Jena Fuseki and any other store that implements the SPARQL 1.1 Protocol (Blazegraph, GraphDB, Stardog). """ _QUERY_FORMS = ("CONSTRUCT", "DESCRIBE", "SELECT", "ASK") def __init__( self, endpoint: str, user: Optional[str] = None, password: Optional[str] = None, ) -> None: self.endpoint = endpoint.rstrip("/") self.session = requests.Session() if user is not None and password is not None: self.session.auth = (user, password)
[docs] def query(self, sparql): form = self._detect_query_form(sparql) if form in ("CONSTRUCT", "DESCRIBE"): accept = "text/turtle" else: accept = "application/sparql-results+json" response = self._post( f"{self.endpoint}/query", sparql, content_type="application/sparql-query", accept=accept, ) if form in ("CONSTRUCT", "DESCRIBE"): graph = Graph() graph.parse(data=response.text, format="turtle") return graph return Result.parse(BytesIO(response.content), format="json")
[docs] def update(self, sparql): """POST ``sparql`` to the Fuseki update endpoint. Returns the underlying ``requests.Response`` so callers can distinguish 200 vs. 204 outcomes, inspect custom headers, or log the body — mirroring how :meth:`query` returns a graph or result for callers to consume. ``raise_for_status`` has already been called inside ``_post``, so any 4xx/5xx surface as ``requests.HTTPError`` before this returns. """ return self._post( f"{self.endpoint}/update", sparql, content_type="application/sparql-update", accept="*/*", )
[docs] def add(self, triples, graph=None): body = "\n".join(self._format_triple(t) for t in triples) if graph is not None: sparql = ( f"INSERT DATA {{ GRAPH <{graph}> {{ {body} }} }}" ) else: sparql = f"INSERT DATA {{ {body} }}" self.update(sparql)
[docs] def remove(self, pattern, graph=None): s, p, o = pattern triple = ( f"{self._term_or_var(s, '?s')} " f"{self._term_or_var(p, '?p')} " f"{self._term_or_var(o, '?o')} ." ) if graph is not None: sparql = ( f"DELETE WHERE {{ GRAPH <{graph}> {{ {triple} }} }}" ) else: sparql = f"DELETE WHERE {{ {triple} }}" self.update(sparql)
[docs] def clear(self, graph=None): if graph is None: self.update("CLEAR DEFAULT") else: self.update(f"CLEAR SILENT GRAPH <{graph}>")
@staticmethod def _format_triple(triple): s, p, o = triple return f"{s.n3()} {p.n3()} {o.n3()} ." @staticmethod def _term_or_var(term, variable): return term.n3() if term is not None else variable def _post(self, url, body, content_type, accept): response = self.session.post( url, data=body, headers={ "Content-Type": content_type, "Accept": accept, }, ) response.raise_for_status() return response @classmethod def _detect_query_form(cls, sparql): """Pick the first SPARQL query keyword that appears in the string, ignoring SPARQL comments.""" cleaned = re.sub(r"#[^\n]*", "", sparql).upper() for keyword in cls._QUERY_FORMS: if re.search(rf"\b{keyword}\b", cleaned): return keyword raise ValueError("Cannot determine SPARQL query form")