Source code for djangordf.manager

"""Default object manager bound to each RDFModel subclass.

``RDFManager`` provides the Django-style ``Model.objects`` entry point:
``create``, ``get``, ``save``, ``delete``, ``all`` and ``filter``. Reads
hydrate instances by dispatching the CONSTRUCTed graph through each
property's ``from_rdf``. ``all`` and ``filter`` return a lazy
``RDFQuerySet`` that only hits the store on iteration or terminal
methods (``len``, ``count``, ``first``).
"""
from typing import List

from rdflib import BNode, Literal, URIRef

from .conf import get_backend


_FILTER_DUMMY_SUBJECT = URIRef("urn:_djangordf:_filter:dummy")


_KNOWN_LOOKUP_SUFFIXES = frozenset({
    "exact", "iexact",
    "contains", "icontains",
    "startswith", "istartswith",
    "endswith", "iendswith",
    "in",
    "gt", "gte", "lt", "lte",
    "regex", "iregex",
    "isnull",
    "year", "month", "day",
    "hour", "minute", "second",
})


def _peel_lookup_suffix(segments):
    """Return ``(path_segments, suffix)``. A suffix is peeled only when
    the key has at least two ``__``-separated segments, so a single
    attribute name that happens to collide with a suffix is still
    treated as the attribute name."""
    if len(segments) >= 2 and segments[-1] in _KNOWN_LOOKUP_SUFFIXES:
        return segments[:-1], segments[-1]
    return segments, "exact"


def _term(term):
    return term.n3()


def _format_triple(triple):
    s, p, o = triple
    return f"{_term(s)} {_term(p)} {_term(o)} ."


def _render_sparql_term(term) -> str:
    """Render a SPARQL term. ``?var`` strings pass through verbatim;
    anything else is rdflib-serialised via ``.n3()``."""
    if isinstance(term, str) and term.startswith("?"):
        return term
    return term.n3()


def _render_triple(s, p, o) -> str:
    """Render a triple pattern. Predicate is always rendered as an
    angle-bracketed IRI; subject and object go through
    :func:`_render_sparql_term`."""
    return (
        f"{_render_sparql_term(s)} "
        f"<{p}> "
        f"{_render_sparql_term(o)} ."
    )


[docs] class RDFManager: """Object manager attached to ``cls.objects`` by ``RDFModelMeta``.""" def __init__(self, model_class): self.model_class = model_class self._backend = None @property def backend(self): if self._backend is None: self._backend = get_backend() return self._backend # -- write side ---------------------------------------------------------
[docs] def create(self, **kwargs): instance = self.model_class(**kwargs) instance.save() return instance
[docs] def save(self, instance) -> None: from .signals import post_save, pre_save pre_save.send(sender=type(instance), instance=instance) graph_iri = instance._meta.graph_iri iri = instance.iri triples = list(instance._to_triples()) mirror_triples, inverse_predicates = self._mirror_writes(instance) triples.extend(mirror_triples) body = "\n".join(_format_triple(t) for t in triples) statements = [ f"WITH <{graph_iri}> " f"DELETE {{ <{iri}> ?p ?o }} WHERE {{ <{iri}> ?p ?o }}" ] for inv_pred in inverse_predicates: statements.append( f"WITH <{graph_iri}> " f"DELETE {{ ?s <{inv_pred}> <{iri}> }} " f"WHERE {{ ?s <{inv_pred}> <{iri}> }}" ) statements.append( f"INSERT DATA {{ GRAPH <{graph_iri}> {{ {body} }} }}" ) self.backend.update(" ;".join(statements)) post_save.send(sender=type(instance), instance=instance)
[docs] def delete(self, instance) -> None: from .signals import post_delete, pre_delete pre_delete.send(sender=type(instance), instance=instance) graph_iri = instance._meta.graph_iri iri = instance.iri statements = [ f"WITH <{graph_iri}> " f"DELETE {{ <{iri}> ?p ?o }} WHERE {{ <{iri}> ?p ?o }}" ] for inv_pred in self._inverse_predicates(instance): statements.append( f"WITH <{graph_iri}> " f"DELETE {{ ?s <{inv_pred}> <{iri}> }} " f"WHERE {{ ?s <{inv_pred}> <{iri}> }}" ) self.backend.update(" ;".join(statements)) post_delete.send(sender=type(instance), instance=instance)
# -- bulk operations ----------------------------------------------------
[docs] def bulk_create(self, instances): """Persist many instances in one SPARQL update. Mints IRIs for any instance whose ``iri`` is ``None``. Issues a single ``INSERT DATA`` block; **does not** fire signals and **does not** emit inverse-property mirror triples (see #60). Intended for new objects only — re-running it on existing IRIs would add duplicate triples. Use :meth:`bulk_update` when persisting changes to existing instances. """ import uuid instances = list(instances) if not instances: return instances all_triples = [] graph_iri = None for instance in instances: if instance.iri is None: instance.iri = URIRef( f"{instance._meta.namespace}{uuid.uuid4().hex}" ) if graph_iri is None: graph_iri = instance._meta.graph_iri all_triples.extend(instance._to_triples()) body = "\n".join(_format_triple(t) for t in all_triples) self.backend.update( f"INSERT DATA {{ GRAPH <{graph_iri}> {{ {body} }} }}" ) return instances
[docs] def bulk_update(self, instances): """Re-persist many existing instances in one SPARQL update. Each instance must already carry an ``iri``. Issues one multi-statement update with ``DELETE { <iri> ?p ?o } WHERE { <iri> ?p ?o } ; INSERT DATA { ... }`` per instance. **Does not** fire signals and **does not** emit inverse-property mirror triples (see #60). """ instances = list(instances) if not instances: return instances statements = [] graph_iri = None for instance in instances: if instance.iri is None: raise ValueError( "bulk_update requires each instance to carry an " "iri; use bulk_create for new instances" ) if graph_iri is None: graph_iri = instance._meta.graph_iri iri = instance.iri statements.append( f"WITH <{graph_iri}> " f"DELETE {{ <{iri}> ?p ?o }} WHERE {{ <{iri}> ?p ?o }}" ) triples = list(instance._to_triples()) body = "\n".join(_format_triple(t) for t in triples) statements.append( f"INSERT DATA {{ GRAPH <{graph_iri}> {{ {body} }} }}" ) self.backend.update(" ;".join(statements)) return instances
[docs] def bulk_delete(self, instances): """Strip many instances in one SPARQL update. Issues one multi-statement update with one ``DELETE WHERE`` per IRI. **Does not** fire signals and **does not** strip inverse-property mirror triples (see #60). """ instances = list(instances) if not instances: return 0 statements = [] for instance in instances: if instance.iri is None: raise ValueError( "bulk_delete requires each instance to carry an iri" ) graph_iri = instance._meta.graph_iri iri = instance.iri statements.append( f"WITH <{graph_iri}> " f"DELETE {{ <{iri}> ?p ?o }} WHERE {{ <{iri}> ?p ?o }}" ) self.backend.update(" ;".join(statements)) return len(instances)
def _inverse_properties(self, instance): """Yield ``(prop, inverse_predicate)`` for every ObjectProperty on this model that declares an ``inverse=``.""" from .properties import ObjectProperty properties = getattr(self.model_class, "_properties", None) or {} for prop in properties.values(): if not isinstance(prop, ObjectProperty) or prop.inverse is None: continue inv_pred = prop.inverse_predicate if inv_pred is None: continue yield prop, inv_pred def _inverse_predicates(self, instance): """Distinct inverse predicates declared on this model.""" seen = [] for _, inv_pred in self._inverse_properties(instance): if inv_pred not in seen: seen.append(inv_pred) return seen def _mirror_writes(self, instance): """Build the mirror triples and the inverse-predicate list for ``instance``. The triples flow into the INSERT DATA body; the predicates drive the extra DELETE statements that strip stale mirror triples on the (potentially different) target subjects.""" mirror_triples = [] inverse_predicates = [] for prop, inv_pred in self._inverse_properties(instance): if inv_pred not in inverse_predicates: inverse_predicates.append(inv_pred) value = getattr(instance, prop.attr_name, None) if value is None: continue if prop.many: targets = value else: targets = [value] for target in targets: target_iri = self._iri_of_value(target) mirror_triples.append((target_iri, inv_pred, instance.iri)) return mirror_triples, inverse_predicates @staticmethod def _iri_of_value(value): if isinstance(value, URIRef): return value return URIRef(value.iri) # -- read side ----------------------------------------------------------
[docs] def get(self, iri): iri = URIRef(iri) graph_iri = self.model_class._meta.graph_iri forward_sparql = ( f"CONSTRUCT {{ <{iri}> ?p ?o }} WHERE {{ " f"GRAPH <{graph_iri}> {{ <{iri}> ?p ?o }} }}" ) graph = self.backend.query(forward_sparql) if len(graph) == 0: raise self.model_class.DoesNotExist(str(iri)) if self._has_reverse_properties(): reverse_sparql = ( f"CONSTRUCT {{ ?s ?p <{iri}> }} WHERE {{ " f"GRAPH <{graph_iri}> {{ ?s ?p <{iri}> }} }}" ) reverse_graph = self.backend.query(reverse_sparql) for triple in reverse_graph: graph.add(triple) instance = self.model_class(iri=iri) self._hydrate(instance, graph, iri) return instance
def _has_reverse_properties(self) -> bool: from .properties import ObjectProperty return any( isinstance(p, ObjectProperty) and p.reverse for p in getattr(self.model_class, "_properties", {}).values() ) def _hydrate(self, instance, graph, subject) -> None: for attr, prop in self.model_class._properties.items(): if prop.predicate is None: continue setattr(instance, attr, prop.from_rdf(graph, subject))
[docs] def all(self) -> "RDFQuerySet": return RDFQuerySet(self)
[docs] def filter(self, *q_args, **kwargs) -> "RDFQuerySet": from .query import Q if not q_args and not kwargs: return RDFQuerySet(self) top = Q(*q_args, **kwargs) return RDFQuerySet(self, q=top)
def _emit_leaf(self, key, value, current_var, current_cls, counter): """Render one ``(key, value)`` filter leaf as a list of SPARQL pattern strings (one per triple / FILTER). ``counter`` is a single-element list so callers share the same monotonic variable-name source across the entire Q walk.""" raw_segments = key.split("__") path_segments, suffix = _peel_lookup_suffix(raw_segments) lines: List[str] = [] for i, segment in enumerate(path_segments): prop = self._resolve_segment(current_cls, segment) is_reverse = getattr(prop, "reverse", False) if i == len(path_segments) - 1: if suffix == "exact": obj_term = self._object_term(prop, value) if is_reverse: lines.append( _render_triple(obj_term, prop.predicate, current_var) ) else: lines.append( _render_triple(current_var, prop.predicate, obj_term) ) elif suffix == "isnull": counter[0] += 1 anon_var = f"?v{counter[0]}" if is_reverse: triple = _render_triple( anon_var, prop.predicate, current_var, ) else: triple = _render_triple( current_var, prop.predicate, anon_var, ) if bool(value): lines.append( f"FILTER NOT EXISTS {{ {triple} }}" ) else: lines.append(triple) else: counter[0] += 1 terminal_var = f"?v{counter[0]}" if is_reverse: lines.append( _render_triple(terminal_var, prop.predicate, current_var) ) else: lines.append( _render_triple(current_var, prop.predicate, terminal_var) ) lines.append( f"FILTER(" f"{self._build_filter_clause(terminal_var, suffix, value, prop)}" f")" ) break from .properties import ObjectProperty if not isinstance(prop, ObjectProperty): raise ValueError( f"non-terminal lookup segment {segment!r} on " f"{current_cls.__name__} is not an ObjectProperty; " f"cannot span" ) counter[0] += 1 next_var = f"?v{counter[0]}" if is_reverse: lines.append( _render_triple(next_var, prop.predicate, current_var) ) else: lines.append( _render_triple(current_var, prop.predicate, next_var) ) current_var = next_var current_cls = prop.target_class return lines def _emit_q(self, q, current_var, current_cls, counter) -> str: """Recursively render a ``Q`` tree as a SPARQL fragment string.""" from .query import Q child_fragments: List[str] = [] for child in q.children: if isinstance(child, Q): child_fragments.append( self._emit_q(child, current_var, current_cls, counter) ) else: key, value = child lines = self._emit_leaf( key, value, current_var, current_cls, counter ) child_fragments.append(" ".join(lines)) if q.connector == Q.OR: body = " UNION ".join( f"{{ {f} }}" for f in child_fragments if f ) else: body = " ".join(f for f in child_fragments if f) if q.negated: return f"FILTER NOT EXISTS {{ {body} }}" return body def _build_filter_clause(self, var, suffix, value, prop) -> str: """Render the SPARQL FILTER expression for a suffix lookup against ``var``. The expression is returned without its outer ``FILTER( … )`` wrapper — the queryset adds that on render.""" if suffix == "iexact": return ( f"LCASE(STR({var})) = LCASE({Literal(str(value)).n3()})" ) if suffix == "contains": return f"CONTAINS(STR({var}), {Literal(str(value)).n3()})" if suffix == "icontains": return ( f"CONTAINS(LCASE(STR({var})), " f"LCASE({Literal(str(value)).n3()}))" ) if suffix == "startswith": return f"STRSTARTS(STR({var}), {Literal(str(value)).n3()})" if suffix == "istartswith": return ( f"STRSTARTS(LCASE(STR({var})), " f"LCASE({Literal(str(value)).n3()}))" ) if suffix == "endswith": return f"STRENDS(STR({var}), {Literal(str(value)).n3()})" if suffix == "iendswith": return ( f"STRENDS(LCASE(STR({var})), " f"LCASE({Literal(str(value)).n3()}))" ) if suffix == "in": try: items = list(value) except TypeError as exc: raise TypeError( f"__in expects an iterable; got {type(value).__name__}" ) from exc rendered = ", ".join( self._object_term(prop, item).n3() for item in items ) return f"{var} IN ({rendered})" if suffix == "regex": return f"REGEX(STR({var}), {Literal(str(value)).n3()})" if suffix == "iregex": return f"REGEX(STR({var}), {Literal(str(value)).n3()}, \"i\")" if suffix in {"year", "month", "day", "hour", "minute", "second"}: fn = { "year": "YEAR", "month": "MONTH", "day": "DAY", "hour": "HOURS", "minute": "MINUTES", "second": "SECONDS", }[suffix] return f"{fn}({var}) = {Literal(int(value)).n3()}" op = {"gt": ">", "gte": ">=", "lt": "<", "lte": "<="}[suffix] return f"{var} {op} {self._object_term(prop, value).n3()}" @staticmethod def _resolve_segment(cls, segment): try: prop = cls._properties[segment] except KeyError as exc: raise ValueError( f"Unknown attribute {segment!r} on {cls.__name__}" ) from exc if prop.predicate is None: raise ValueError( f"Property {segment!r} on {cls.__name__} has no " f"predicate; cannot use it in filter()" ) return prop def _object_term(self, prop, value): if isinstance(value, (URIRef, Literal, BNode)): return value from .models import RDFModel from .namespaces import LangString if isinstance(value, RDFModel) and value.iri is not None: return URIRef(value.iri) if isinstance(value, LangString): return Literal(value.value, lang=value.lang) # ``prop.to_rdf`` of a ``many=True`` property iterates over # ``value`` — that's wrong for filter, where the user supplied a # single scalar to compare against. Build a single-value triple # by temporarily flipping the cardinality. original_many = getattr(prop, "many", False) prop.many = False try: triples = prop.to_rdf(_FILTER_DUMMY_SUBJECT, value) finally: prop.many = original_many if not triples: raise ValueError( f"Cannot serialise {value!r} for property {prop.attr_name!r}" ) return triples[0][2]
[docs] class RDFQuerySet: """Lazy queryset over an ``RDFManager``. Materialises on iteration / ``len`` / ``count`` / ``first`` / ``__getitem__(int)`` by issuing ``SELECT DISTINCT ?s`` to enumerate matching subjects, then calling ``manager.get(s)`` per subject. ``order_by`` and slicing are both chainable and lazy — they return new querysets carrying ``ORDER BY`` / ``LIMIT`` / ``OFFSET`` state that the SPARQL builder honours on next materialisation. """ def __init__( self, manager: RDFManager, q=None, order_by: tuple = (), limit=None, offset=None, ): self._manager = manager self._q = q self._order_by = tuple(order_by) self._limit = limit self._offset = offset self._results_cache = None def _clone(self, **overrides): defaults = dict( q=self._q, order_by=self._order_by, limit=self._limit, offset=self._offset, ) defaults.update(overrides) return RDFQuerySet(self._manager, **defaults) # -- chainable surface --------------------------------------------------
[docs] def order_by(self, *fields): return self._clone(order_by=tuple(fields))
# -- SPARQL construction ------------------------------------------------ def _build_subject_sparql(self) -> str: model = self._manager.model_class graph_iri = model._meta.graph_iri class_iri = model._meta.class_iri counter = [0] parts = [f"?s a <{class_iri}> ."] if self._q is not None: fragment = self._manager._emit_q( self._q, "?s", model, counter, ) if fragment: parts.append(fragment) select_vars = ["?s"] order_tokens = [] for field in self._order_by: descending = field.startswith("-") attr = field.lstrip("-") prop = self._manager._resolve_segment(model, attr) counter[0] += 1 ord_var = f"?ord_{counter[0]}" parts.append( _render_triple("?s", prop.predicate, ord_var) ) select_vars.append(ord_var) order_tokens.append( f"DESC({ord_var})" if descending else ord_var ) body = " ".join(parts) select = "SELECT DISTINCT " + " ".join(select_vars) sparql = ( f"{select} WHERE {{ GRAPH <{graph_iri}> {{ {body} }} }}" ) if order_tokens: sparql += " ORDER BY " + " ".join(order_tokens) if self._limit is not None: sparql += f" LIMIT {self._limit}" if self._offset is not None: sparql += f" OFFSET {self._offset}" return sparql # -- materialisation ---------------------------------------------------- def _fetch(self): if self._results_cache is not None: return self._results_cache sparql = self._build_subject_sparql() result = self._manager.backend.query(sparql) subjects = list(dict.fromkeys(row[0] for row in result)) self._results_cache = [self._manager.get(s) for s in subjects] return self._results_cache def __iter__(self): return iter(self._fetch()) def __len__(self) -> int: return len(self._fetch())
[docs] def count(self) -> int: return len(self)
[docs] def first(self): items = list(self._clone(limit=1)) return items[0] if items else None
# -- slicing / indexing ------------------------------------------------- def __getitem__(self, key): if isinstance(key, slice): if key.step is not None: raise TypeError( "RDFQuerySet does not support step in slicing" ) start = key.start or 0 stop = key.stop if start < 0 or (stop is not None and stop < 0): raise IndexError( "RDFQuerySet does not support negative indices" ) new_offset = (self._offset or 0) + start if stop is None: new_limit = self._limit else: window = stop - start if window < 0: window = 0 if self._limit is not None: remaining = max(self._limit - start, 0) new_limit = min(window, remaining) else: new_limit = window return self._clone( limit=new_limit, offset=new_offset if new_offset else None, ) if isinstance(key, int): if key < 0: raise IndexError( "RDFQuerySet does not support negative indices" ) items = list(self[key:key + 1]) if not items: raise IndexError(key) return items[0] raise TypeError( f"RDFQuerySet indices must be int or slice, not " f"{type(key).__name__}" )