Source code for prov.serializers.provrdf

"""PROV-RDF serializers for ProvDocument
"""
import base64
from collections import OrderedDict
import datetime
import io

import dateutil.parser

from rdflib.term import URIRef, BNode
from rdflib.term import Literal as RDFLiteral
from rdflib.graph import ConjunctiveGraph
from rdflib.namespace import RDF, RDFS, XSD

from prov import Error
import prov.model as pm
from prov.constants import (
    PROV,
    PROV_ID_ATTRIBUTES_MAP,
    PROV_N_MAP,
    PROV_BASE_CLS,
    XSD_QNAME,
    PROV_END,
    PROV_START,
    PROV_USAGE,
    PROV_GENERATION,
    PROV_DERIVATION,
    PROV_INVALIDATION,
    PROV_ALTERNATE,
    PROV_MENTION,
    PROV_DELEGATION,
    PROV_ACTIVITY,
    PROV_ATTR_STARTTIME,
    PROV_ATTR_ENDTIME,
    PROV_LOCATION,
    PROV_ATTR_TIME,
    PROV_ROLE,
    PROV_COMMUNICATION,
    PROV_ATTR_INFORMANT,
    PROV_ATTR_RESPONSIBLE,
    PROV_ATTR_TRIGGER,
    PROV_ATTR_ENDER,
    PROV_ATTR_STARTER,
    PROV_ATTR_USED_ENTITY,
    PROV_ASSOCIATION,
)
from prov.serializers import Serializer


__author__ = "Satrajit S. Ghosh"
__email__ = "satra@mit.edu"


class ProvRDFException(Error):
    """Error type for the PROV-RDF serializer, derived from :class:`prov.Error`."""
class AnonymousIDGenerator:
    """Hand out stable anonymous (``_:``-prefixed) identifiers for objects.

    The same object always receives the same identifier from a given
    generator instance; a new object receives the next number in sequence.
    """

    def __init__(self):
        # obj -> previously generated identifier URI
        self._cache = {}
        # number of identifiers generated so far
        self._count = 0

    def get_anon_id(self, obj, local_prefix="id"):
        """Return the anonymous identifier URI for *obj*, creating it if needed.

        :param obj: The (hashable) object to identify.
        :param local_prefix: Text placed between ``_:`` and the running number.
        :return: The ``uri`` of a :class:`prov.model.Identifier` built from
            ``_:<local_prefix><count>``.
        """
        if obj not in self._cache:
            self._count += 1
            self._cache[obj] = pm.Identifier(
                "_:%s%d" % (local_prefix, self._count)
            ).uri
        return self._cache[obj]


# Reverse map for prov.model.XSD_DATATYPE_PARSERS
LITERAL_XSDTYPE_MAP = {
    float: XSD["double"],
    int: XSD["int"],
    str: XSD["string"],
    # boolean, string values are supported natively by PROV-RDF
    # datetime values are converted separately
}

# PROV-O relation predicate -> name of the ProvBundle method that creates
# the corresponding relation (looked up with getattr() when decoding).
relation_mapper = {
    URIRef(PROV["alternateOf"].uri): "alternate",
    URIRef(PROV["actedOnBehalfOf"].uri): "delegation",
    URIRef(PROV["specializationOf"].uri): "specialization",
    URIRef(PROV["mentionOf"].uri): "mention",
    URIRef(PROV["wasAssociatedWith"].uri): "association",
    URIRef(PROV["wasDerivedFrom"].uri): "derivation",
    URIRef(PROV["wasAttributedTo"].uri): "attribution",
    URIRef(PROV["wasInformedBy"].uri): "communication",
    URIRef(PROV["wasGeneratedBy"].uri): "generation",
    URIRef(PROV["wasInfluencedBy"].uri): "influence",
    URIRef(PROV["wasInvalidatedBy"].uri): "invalidation",
    URIRef(PROV["wasEndedBy"].uri): "end",
    URIRef(PROV["wasStartedBy"].uri): "start",
    URIRef(PROV["hadMember"].uri): "membership",
    URIRef(PROV["used"].uri): "usage",
}

# RDF predicate -> PROV attribute it is translated to when decoding.
predicate_mapper = {
    RDFS.label: pm.PROV["label"],
    URIRef(PROV["atLocation"].uri): PROV_LOCATION,
    URIRef(PROV["startedAtTime"].uri): PROV_ATTR_STARTTIME,
    URIRef(PROV["endedAtTime"].uri): PROV_ATTR_ENDTIME,
    URIRef(PROV["atTime"].uri): PROV_ATTR_TIME,
    URIRef(PROV["hadRole"].uri): PROV_ROLE,
    URIRef(PROV["hadPlan"].uri): pm.PROV_ATTR_PLAN,
    URIRef(PROV["hadUsage"].uri): pm.PROV_ATTR_USAGE,
    URIRef(PROV["hadGeneration"].uri): pm.PROV_ATTR_GENERATION,
    URIRef(PROV["hadActivity"].uri): pm.PROV_ATTR_ACTIVITY,
}


def attr2rdf(attr):
    """Return the PROV-namespace ``URIRef`` for a formal attribute.

    The local name is whatever follows ``"prov:"`` in
    ``PROV_ID_ATTRIBUTES_MAP[attr]``.

    :param attr: A key of ``PROV_ID_ATTRIBUTES_MAP``.
    :raises KeyError: If *attr* is not in ``PROV_ID_ATTRIBUTES_MAP``.
    """
    local_name = PROV_ID_ATTRIBUTES_MAP[attr].split("prov:")[1]
    return URIRef(PROV[local_name].uri)


def valid_qualified_name(bundle, value, xsd_qname=False):
    """Resolve *value* to a qualified name through *bundle*.

    :param bundle: Object providing ``valid_qualified_name(value)``.
    :param value: The value to resolve; ``None`` is passed straight through.
    :param xsd_qname: When true, wrap the result with ``XSD_QNAME``.
    :return: ``None`` if *value* is ``None``; otherwise the result of
        ``bundle.valid_qualified_name(value)``, optionally wrapped.
    """
    if value is None:
        return None
    qualified_name = bundle.valid_qualified_name(value)
    return qualified_name if not xsd_qname else XSD_QNAME(qualified_name)
class ProvRDFSerializer(Serializer):
    """
    PROV-O serializer for :class:`~prov.model.ProvDocument`
    """

    def serialize(
        self, stream=None, rdf_format="trig", PROV_N_MAP=PROV_N_MAP, **kwargs
    ):
        """
        Serializes a :class:`~prov.model.ProvDocument` instance to
        `PROV-O <https://www.w3.org/TR/prov-o/>`_.

        :param stream: Where to save the output. Despite the ``None``
            default, a writable object is required: ``stream.write`` is
            called unconditionally.
        :param rdf_format: The RDF format of the output, default to TRiG.
        :param PROV_N_MAP: Mapping from record type to the PROV local name
            used as the relation predicate.
        :param kwargs: Extra keyword arguments forwarded to the graph's
            ``serialize`` call.
        """
        container = self.encode_document(self.document, PROV_N_MAP=PROV_N_MAP)
        newargs = kwargs.copy()
        newargs["format"] = rdf_format

        buf = io.BytesIO()
        try:
            container.serialize(buf, **newargs)
            buf.seek(0, 0)
            # Right now this is a bytestream. If the object to stream to is
            # a text object it must be decoded. We assume utf-8 here which
            # should be fine for almost every case.
            if isinstance(stream, io.TextIOBase):
                stream.write(buf.read().decode("utf-8"))
            else:
                stream.write(buf.read())
        finally:
            buf.close()

    def deserialize(
        self,
        stream,
        rdf_format="trig",
        relation_mapper=relation_mapper,
        predicate_mapper=predicate_mapper,
        **kwargs,
    ):
        """
        Deserialize from the `PROV-O <https://www.w3.org/TR/prov-o/>`_
        representation to a :class:`~prov.model.ProvDocument` instance.

        :param stream: Input data.
        :param rdf_format: The RDF format of the input data, default: TRiG.
        :param relation_mapper: Mapping from relation predicate to the name
            of the bundle method that creates the relation.
        :param predicate_mapper: Mapping from RDF predicate to PROV attribute.
        :param kwargs: Extra keyword arguments forwarded to the graph's
            ``parse`` call.
        :return: The newly created document (also stored on
            ``self.document``).
        """
        newargs = kwargs.copy()
        newargs["format"] = rdf_format
        container = ConjunctiveGraph()
        container.parse(stream, **newargs)
        document = pm.ProvDocument()
        # decode_* helpers resolve identifiers through self.document
        self.document = document
        self.decode_document(
            container,
            document,
            relation_mapper=relation_mapper,
            predicate_mapper=predicate_mapper,
        )
        return document

    def valid_identifier(self, value):
        """Resolve *value* to a qualified name via ``self.document``."""
        return self.document.valid_qualified_name(value)

    def encode_rdf_representation(self, value):
        """Convert a PROV/Python value into an rdflib term.

        The checks are ordered; the first matching one wins.

        :param value: ``URIRef``, ``prov.model.Literal``, ``datetime``,
            ``QualifiedName``, ``Identifier`` or a plain Python value.
        :return: A ``URIRef`` or an rdflib ``Literal``.
        """
        if isinstance(value, URIRef):
            return value
        elif isinstance(value, pm.Literal):
            return literal_rdf_representation(value)
        elif isinstance(value, datetime.datetime):
            return RDFLiteral(value.isoformat(), datatype=XSD["dateTime"])
        elif isinstance(value, pm.QualifiedName):
            return URIRef(value.uri)
        elif isinstance(value, pm.Identifier):
            return RDFLiteral(value.uri, datatype=XSD["anyURI"])
        elif type(value) in LITERAL_XSDTYPE_MAP:
            # Exact-type lookup on purpose: subclasses (e.g. bool, a
            # subclass of int) are not in the map and fall through.
            return RDFLiteral(value, datatype=LITERAL_XSDTYPE_MAP[type(value)])
        else:
            return RDFLiteral(value)

    def decode_rdf_representation(self, literal, graph):
        """Convert an rdflib term back into a PROV/Python value.

        :param literal: An rdflib ``Literal``, a ``URIRef`` or any other
            value (returned unchanged).
        :param graph: Graph whose namespace manager is used to derive a
            prefix for URIs that are not yet resolvable in the document.
        :return: A ``prov.model.Literal``, a ``datetime`` (for
            ``xsd:dateTime``), a qualified name (for ``URIRef``) or the
            input itself.
        """
        if isinstance(literal, RDFLiteral):
            value = literal.value if literal.value is not None else literal
            datatype = literal.datatype if hasattr(literal, "datatype") else None
            langtag = literal.language if hasattr(literal, "language") else None
            if datatype and "XMLLiteral" in datatype:
                value = literal
            if datatype and "base64Binary" in datatype:
                value = base64.standard_b64encode(value)
            if datatype == XSD["QName"]:
                return pm.Literal(literal, datatype=XSD_QNAME)
            if datatype == XSD["dateTime"]:
                return dateutil.parser.parse(literal)
            if datatype == XSD["gYear"]:
                return pm.Literal(
                    dateutil.parser.parse(literal).year,
                    datatype=self.valid_identifier(datatype),
                )
            if datatype == XSD["gYearMonth"]:
                parsed_info = dateutil.parser.parse(literal)
                return pm.Literal(
                    "{0}-{1:02d}".format(parsed_info.year, parsed_info.month),
                    datatype=self.valid_identifier(datatype),
                )
            else:
                # The literal of standard Python types is not converted here
                # It will be automatically converted when added to a record by
                # _auto_literal_conversion()
                return pm.Literal(value, self.valid_identifier(datatype), langtag)
        elif isinstance(literal, URIRef):
            rval = self.valid_identifier(literal)
            if rval is None:
                # No namespace registered for this URI yet: let rdflib
                # compute a prefix and register it on the document.
                prefix, iri, _ = graph.namespace_manager.compute_qname(literal)
                ns = self.document.add_namespace(prefix, iri)
                rval = pm.QualifiedName(ns, literal.replace(ns.uri, ""))
            return rval
        else:
            # simple type, just return it
            return literal

    def encode_document(self, document, PROV_N_MAP=PROV_N_MAP):
        """Encode *document* and all of its bundles into one graph.

        :param document: The document to encode; its ``bundles`` are
            encoded as separate graphs named by the bundle identifier URI
            and merged in as quads.
        :param PROV_N_MAP: Mapping from record type to PROV local name,
            applied to the top-level records and to every bundle.
        :return: The populated ``ConjunctiveGraph``.
        """
        # Pass the caller's map for the top-level records too, not only for
        # the sub-bundles.
        container = self.encode_container(document, PROV_N_MAP=PROV_N_MAP)
        for item in document.bundles:
            # encoding the sub-bundle
            bundle = self.encode_container(
                item, identifier=item.identifier.uri, PROV_N_MAP=PROV_N_MAP
            )
            container.addN(bundle.quads())
        return container

    def encode_container(
        self, bundle, PROV_N_MAP=PROV_N_MAP, container=None, identifier=None
    ):
        """Encode the records of *bundle* as RDF triples.

        :param bundle: Object exposing ``namespaces`` and ``_records``.
        :param PROV_N_MAP: Mapping from record type to the PROV local name
            used as the direct relation predicate.
        :param container: Graph to add to; a new ``ConjunctiveGraph`` (with
            the ``prov`` prefix bound) is created when ``None``.
        :param identifier: Identifier for a newly created graph. NOTE: the
            name is re-used below as the per-record subject.
        :return: The graph that was written to.
        """
        if container is None:
            container = ConjunctiveGraph(identifier=identifier)
            nm = container.namespace_manager
            nm.bind("prov", PROV.uri)

        for namespace in bundle.namespaces:
            container.bind(namespace.prefix, namespace.uri)

        id_generator = AnonymousIDGenerator()
        real_or_anon_id = (
            lambda record: record._identifier.uri
            if record._identifier
            else id_generator.get_anon_id(record)
        )

        for record in bundle._records:
            rec_type = record.get_type()
            if hasattr(record, "identifier") and record.identifier:
                identifier = URIRef(str(real_or_anon_id(record)))
                container.add((identifier, RDF.type, URIRef(rec_type.uri)))
            else:
                identifier = None
            if record.attributes:
                # Set once a blank node has been minted for a qualified
                # relation of this record.
                bnode = None
                formal_objects = []
                used_objects = []
                all_attributes = list(record.formal_attributes) + list(
                    record.attributes
                )
                # A relation needs the qualified form when it is identified
                # and has any formal value, or when it is anonymous and has
                # a formal value beyond the first two positions.
                formal_qualifiers = False
                for attrid, (attr, value) in enumerate(list(record.formal_attributes)):
                    if (identifier is not None and value is not None) or (
                        identifier is None and value is not None and attrid > 1
                    ):
                        formal_qualifiers = True
                has_qualifiers = len(record.extra_attributes) > 0 or formal_qualifiers
                for attr, value in all_attributes:
                    if record.is_relation():
                        pred = URIRef(PROV[PROV_N_MAP[rec_type]].uri)
                        # create bnode relation
                        if bnode is None:
                            valid_formal_indices = set()
                            for formal_idx, (key, val) in enumerate(
                                record.formal_attributes
                            ):
                                formal_objects.append(key)
                                if val:
                                    valid_formal_indices.add(formal_idx)
                            used_objects = [record.formal_attributes[0][0]]
                            subj = None
                            if record.formal_attributes[0][1]:
                                subj = URIRef(record.formal_attributes[0][1].uri)
                            if identifier is None and subj is not None:
                                try:
                                    obj_val = record.formal_attributes[1][1]
                                except IndexError:
                                    obj_val = None
                                # Emit the direct (unqualified) triple either
                                # for relation types outside the listed set,
                                # or when only the first two formal values
                                # are set and there are no extra attributes.
                                if obj_val and (
                                    rec_type
                                    not in {
                                        PROV_END,
                                        PROV_START,
                                        PROV_USAGE,
                                        PROV_GENERATION,
                                        PROV_DERIVATION,
                                        PROV_ASSOCIATION,
                                        PROV_INVALIDATION,
                                    }
                                    or (
                                        valid_formal_indices == {0, 1}
                                        and len(record.extra_attributes) == 0
                                    )
                                ):
                                    used_objects.append(record.formal_attributes[1][0])
                                    obj_val = self.encode_rdf_representation(obj_val)
                                    if rec_type == PROV_ALTERNATE:
                                        subj, obj_val = obj_val, subj
                                    container.add((subj, pred, obj_val))
                                    if rec_type == PROV_MENTION:
                                        if record.formal_attributes[2][1]:
                                            used_objects.append(
                                                record.formal_attributes[2][0]
                                            )
                                            obj_val = self.encode_rdf_representation(
                                                record.formal_attributes[2][1]
                                            )
                                            container.add(
                                                (
                                                    subj,
                                                    URIRef(PROV["asInBundle"].uri),
                                                    obj_val,
                                                )
                                            )
                                        has_qualifiers = False
                            if rec_type in [PROV_ALTERNATE]:
                                continue
                            if subj and (has_qualifiers or identifier):
                                qualifier = rec_type._localpart
                                rec_uri = rec_type.uri
                                for attr_name, val in record.extra_attributes:
                                    if attr_name == PROV["type"]:
                                        if (
                                            PROV["Revision"] == val
                                            or PROV["Quotation"] == val
                                            or PROV["PrimarySource"] == val
                                        ):
                                            # Derivation subtypes get their
                                            # own qualified property and
                                            # replace the generic type.
                                            qualifier = val._localpart
                                            rec_uri = val.uri
                                            if identifier is not None:
                                                container.remove(
                                                    (
                                                        identifier,
                                                        RDF.type,
                                                        URIRef(rec_type.uri),
                                                    )
                                                )
                                QRole = URIRef(PROV["qualified" + qualifier].uri)
                                if identifier is not None:
                                    container.add((subj, QRole, identifier))
                                else:
                                    # reset identifier to BNode
                                    bnode = identifier = BNode()
                                    container.add((subj, QRole, identifier))
                                    container.add(
                                        (identifier, RDF.type, URIRef(rec_uri))
                                    )
                        if value is not None and attr not in used_objects:
                            if attr in formal_objects:
                                pred = attr2rdf(attr)
                            elif attr == PROV["role"]:
                                pred = URIRef(PROV["hadRole"].uri)
                            elif attr == PROV["plan"]:
                                pred = URIRef(PROV["hadPlan"].uri)
                            elif attr == PROV["type"]:
                                pred = RDF.type
                            elif attr == PROV["label"]:
                                pred = RDFS.label
                            elif isinstance(attr, pm.QualifiedName):
                                pred = URIRef(attr.uri)
                            else:
                                pred = self.encode_rdf_representation(attr)
                            # The rewrites below are order dependent: each
                            # one tests the predicate produced so far.
                            if PROV["plan"].uri in pred:
                                pred = URIRef(PROV["hadPlan"].uri)
                            if PROV["informant"].uri in pred:
                                pred = URIRef(PROV["activity"].uri)
                            if PROV["responsible"].uri in pred:
                                pred = URIRef(PROV["agent"].uri)
                            if (
                                rec_type == PROV_DELEGATION
                                and PROV["activity"].uri in pred
                            ):
                                pred = URIRef(PROV["hadActivity"].uri)
                            if (
                                rec_type in [PROV_END, PROV_START]
                                and PROV["trigger"].uri in pred
                            ) or (
                                rec_type in [PROV_USAGE] and PROV["used"].uri in pred
                            ):
                                pred = URIRef(PROV["entity"].uri)
                            if rec_type in [
                                PROV_GENERATION,
                                PROV_END,
                                PROV_START,
                                PROV_USAGE,
                                PROV_INVALIDATION,
                            ]:
                                if PROV["time"].uri in pred:
                                    pred = URIRef(PROV["atTime"].uri)
                                if PROV["ender"].uri in pred:
                                    pred = URIRef(PROV["hadActivity"].uri)
                                if PROV["starter"].uri in pred:
                                    pred = URIRef(PROV["hadActivity"].uri)
                                if PROV["location"].uri in pred:
                                    pred = URIRef(PROV["atLocation"].uri)
                            if rec_type in [PROV_ACTIVITY]:
                                if PROV_ATTR_STARTTIME in pred:
                                    pred = URIRef(PROV["startedAtTime"].uri)
                                if PROV_ATTR_ENDTIME in pred:
                                    pred = URIRef(PROV["endedAtTime"].uri)
                            if rec_type == PROV_DERIVATION:
                                if PROV["activity"].uri in pred:
                                    pred = URIRef(PROV["hadActivity"].uri)
                                if PROV["generation"].uri in pred:
                                    pred = URIRef(PROV["hadGeneration"].uri)
                                if PROV["usage"].uri in pred:
                                    pred = URIRef(PROV["hadUsage"].uri)
                                if PROV["usedEntity"].uri in pred:
                                    pred = URIRef(PROV["entity"].uri)
                            container.add(
                                (
                                    identifier,
                                    pred,
                                    self.encode_rdf_representation(value),
                                )
                            )
                        continue
                    # --- element (non-relation) records from here on ---
                    if value is None:
                        continue
                    if isinstance(value, pm.ProvRecord):
                        obj = URIRef(str(real_or_anon_id(value)))
                    else:
                        # Assuming this is a datetime value
                        obj = self.encode_rdf_representation(value)
                    if attr == PROV["location"]:
                        pred = URIRef(PROV["atLocation"].uri)
                        # obj is deliberately passed through the encoder a
                        # second time, as the original code did.
                        container.add(
                            (identifier, pred, self.encode_rdf_representation(obj))
                        )
                        continue
                    if attr == PROV["type"]:
                        pred = RDF.type
                    elif attr == PROV["label"]:
                        pred = RDFS.label
                    elif attr == PROV_ATTR_STARTTIME:
                        pred = URIRef(PROV["startedAtTime"].uri)
                    elif attr == PROV_ATTR_ENDTIME:
                        pred = URIRef(PROV["endedAtTime"].uri)
                    else:
                        pred = self.encode_rdf_representation(attr)
                    container.add((identifier, pred, obj))
        return container

    def decode_document(
        self,
        content,
        document,
        relation_mapper=relation_mapper,
        predicate_mapper=predicate_mapper,
    ):
        """Populate *document* from the parsed graph *content*.

        Namespaces of *content* are registered on *document*. If *content*
        has ``contexts``, each context graph identified by a blank node is
        decoded into *document* itself; every other context becomes a
        bundle named after the graph identifier. Without ``contexts`` the
        whole graph is decoded into *document*.

        :param content: The parsed RDF graph.
        :param document: The document to fill.
        :param relation_mapper: See :meth:`decode_container`.
        :param predicate_mapper: See :meth:`decode_container`.
        """
        for prefix, url in content.namespaces():
            document.add_namespace(prefix, str(url))
        if hasattr(content, "contexts"):
            for graph in content.contexts():
                if isinstance(graph.identifier, BNode):
                    self.decode_container(
                        graph,
                        document,
                        relation_mapper=relation_mapper,
                        predicate_mapper=predicate_mapper,
                    )
                else:
                    bundle_id = str(graph.identifier)
                    bundle = document.bundle(bundle_id)
                    self.decode_container(
                        graph,
                        bundle,
                        relation_mapper=relation_mapper,
                        predicate_mapper=predicate_mapper,
                    )
        else:
            self.decode_container(
                content,
                document,
                relation_mapper=relation_mapper,
                predicate_mapper=predicate_mapper,
            )

    def decode_container(
        self,
        graph,
        bundle,
        relation_mapper=relation_mapper,
        predicate_mapper=predicate_mapper,
    ):
        """Create PROV records in *bundle* from the triples of *graph*.

        Works in three passes: (1) scan ``rdf:type`` triples to find the
        PROV records and their extra types, (2) walk all triples to create
        unqualified relations directly and to collect formal/other
        attributes of known records, (3) create one record per collected id.

        :param graph: The RDF graph to read.
        :param bundle: Document or bundle the records are added to.
        :param relation_mapper: Mapping from relation predicate to the name
            of the *bundle* method that creates the relation.
        :param predicate_mapper: Mapping from RDF predicate to PROV attribute.
        :raises ValueError: If a non-``None`` object decodes to ``None``.
        """
        ids = {}
        formal_attributes = {}
        unique_sets = {}
        # class URI (str) -> PROV base class
        PROV_CLS_MAP = {key.uri: val for key, val in PROV_BASE_CLS.items()}
        other_attributes = {}
        # Pass 1: type statements.
        for stmt in graph.triples((None, RDF.type, None)):
            id = str(stmt[0])
            obj = str(stmt[2])
            if obj in PROV_CLS_MAP:
                if not isinstance(stmt[0], BNode) and self.valid_identifier(id) is None:
                    prefix, iri, _ = graph.namespace_manager.compute_qname(id)
                    self.document.add_namespace(prefix, iri)
                prov_obj = PROV_CLS_MAP[obj]
                add_attr = True
                isderivation = (
                    pm.PROV["Revision"].uri in stmt[2]
                    or pm.PROV["Quotation"].uri in stmt[2]
                    or pm.PROV["PrimarySource"].uri in stmt[2]
                )
                if (
                    id not in ids
                    and prov_obj
                    and (
                        prov_obj.uri == obj
                        or isderivation
                        or isinstance(stmt[0], BNode)
                    )
                ):
                    ids[id] = prov_obj
                    klass = pm.PROV_REC_CLS[prov_obj]
                    formal_attributes[id] = OrderedDict(
                        [(key, None) for key in klass.FORMAL_ATTRIBUTES]
                    )
                    unique_sets[id] = OrderedDict(
                        [(key, []) for key in klass.FORMAL_ATTRIBUTES]
                    )
                    # Keep the type as an extra attribute only when it is
                    # more specific than the base class it maps to.
                    add_attr = (
                        isinstance(stmt[0], BNode) or isderivation
                    ) and prov_obj.uri != obj
                if add_attr:
                    if id not in other_attributes:
                        other_attributes[id] = []
                    obj_formatted = self.decode_rdf_representation(stmt[2], graph)
                    other_attributes[id].append((pm.PROV["type"], obj_formatted))
            else:
                if id not in other_attributes:
                    other_attributes[id] = []
                obj = self.decode_rdf_representation(stmt[2], graph)
                other_attributes[id].append((pm.PROV["type"], obj))
        # Pass 2: all remaining statements.
        for id, pred, obj in graph:
            id = str(id)
            if id not in other_attributes:
                other_attributes[id] = []
            if pred == RDF.type:
                continue
            if pred in relation_mapper:
                if "alternateOf" in pred:
                    # argument order is swapped for alternateOf
                    getattr(bundle, relation_mapper[pred])(obj, id)
                elif "mentionOf" in pred:
                    mentionBundle = None
                    for stmt in graph.triples(
                        (URIRef(id), URIRef(pm.PROV["asInBundle"].uri), None)
                    ):
                        mentionBundle = stmt[2]
                    getattr(bundle, relation_mapper[pred])(id, str(obj), mentionBundle)
                elif "actedOnBehalfOf" in pred or "wasAssociatedWith" in pred:
                    qualifier = (
                        "qualified"
                        + relation_mapper[pred].upper()[0]
                        + relation_mapper[pred][1:]
                    )
                    qualifier_bnode = None
                    for stmt in graph.triples(
                        (URIRef(id), URIRef(pm.PROV[qualifier].uri), None)
                    ):
                        qualifier_bnode = stmt[2]
                    if qualifier_bnode is None:
                        getattr(bundle, relation_mapper[pred])(id, str(obj))
                    else:
                        # A qualified node exists: record the two ends on
                        # it instead of creating a second relation.
                        fakeys = list(formal_attributes[str(qualifier_bnode)].keys())
                        formal_attributes[str(qualifier_bnode)][fakeys[0]] = id
                        formal_attributes[str(qualifier_bnode)][fakeys[1]] = str(obj)
                else:
                    getattr(bundle, relation_mapper[pred])(id, str(obj))
            elif id in ids:
                obj1 = self.decode_rdf_representation(obj, graph)
                if obj is not None and obj1 is None:
                    raise ValueError(("Error transforming", obj))
                pred_new = pred
                if pred in predicate_mapper:
                    pred_new = predicate_mapper[pred]
                # Generic prov:activity / prov:agent / prov:entity map to a
                # different formal attribute depending on the record type.
                if ids[id] == PROV_COMMUNICATION and "activity" in str(pred_new):
                    pred_new = PROV_ATTR_INFORMANT
                if ids[id] == PROV_DELEGATION and "agent" in str(pred_new):
                    pred_new = PROV_ATTR_RESPONSIBLE
                if ids[id] in [PROV_END, PROV_START] and "entity" in str(pred_new):
                    pred_new = PROV_ATTR_TRIGGER
                if ids[id] in [PROV_END] and "activity" in str(pred_new):
                    pred_new = PROV_ATTR_ENDER
                if ids[id] in [PROV_START] and "activity" in str(pred_new):
                    pred_new = PROV_ATTR_STARTER
                if ids[id] == PROV_DERIVATION and "entity" in str(pred_new):
                    pred_new = PROV_ATTR_USED_ENTITY
                if str(pred_new) in [val.uri for val in formal_attributes[id]]:
                    qname_key = self.valid_identifier(pred_new)
                    formal_attributes[id][qname_key] = obj1
                    unique_sets[id][qname_key].append(obj1)
                    if len(unique_sets[id][qname_key]) > 1:
                        # Several values for one formal attribute: resolved
                        # in pass 3 via walk().
                        formal_attributes[id][qname_key] = None
                else:
                    if "qualified" not in str(pred_new) and "asInBundle" not in str(
                        pred_new
                    ):
                        other_attributes[id].append((str(pred_new), obj1))
            local_key = str(obj)
            if local_key in ids:
                if "qualified" in pred:
                    # subject of a prov:qualified* triple is the first
                    # formal attribute of the qualified record
                    formal_attributes[local_key][
                        list(formal_attributes[local_key].keys())[0]
                    ] = id
        # Pass 3: create the records.
        for id in ids:
            attrs = None
            if id in other_attributes:
                attrs = other_attributes[id]
            items_to_walk = []
            for qname, values in unique_sets[id].items():
                if values and len(values) > 1:
                    items_to_walk.append((qname, values))
            if items_to_walk:
                # One record per combination of the multi-valued attributes.
                for subset in list(walk(items_to_walk)):
                    for key, value in subset.items():
                        formal_attributes[id][key] = value
                    bundle.new_record(ids[id], id, formal_attributes[id], attrs)
            else:
                bundle.new_record(ids[id], id, formal_attributes[id], attrs)
            ids[id] = None
            if attrs is not None:
                other_attributes[id] = []
        # NOTE(review): every value in ``ids`` is None at this point, and
        # entries of ``other_attributes`` for known ids were just emptied,
        # so this loop can only do nothing or fail (KeyError for a subject
        # that has a type but is not a known PROV record). Intended
        # behaviour is unclear; left unchanged - TODO confirm.
        for key, val in other_attributes.items():
            if val:
                ids[key].add_attributes(val)
def walk(children, level=0, path=None, usename=True):
    """Generate all the full paths in a tree, as a dict.

    *children* is a sequence of ``(name, values)`` pairs, where ``values``
    is an iterable. One dict is yielded for every combination that takes
    one value from each pair, the last pair varying fastest.

    :param children: Sequence of ``(name, iterable)`` pairs.
    :param level: Current recursion depth; callers should leave it at 0.
    :param path: Dict being filled during recursion; ignored (reset) when
        ``level`` is 0.
    :param usename: Key the result by ``name`` if true, otherwise by the
        position (level) of the pair.
    :return: Generator of dicts (each one a fresh copy).

    :Example:

    >>> from prov.serializers.provrdf import walk
    >>> iterables = [('a', [1, 2]), ('b', [3, 4])]
    >>> [val['a'] for val in walk(iterables)]
    [1, 1, 2, 2]
    >>> [val['b'] for val in walk(iterables)]
    [3, 4, 3, 4]
    """
    # Entry point
    if level == 0:
        path = {}
    # Exit condition
    if not children:
        yield path.copy()
        return
    # Tree recursion
    head, tail = children[0], children[1:]
    name, values = head
    for child in values:
        # We can use the arg name or the tree level as a key
        if usename:
            path[name] = child
        else:
            path[level] = child
        # Recurse into the next level
        yield from walk(tail, level + 1, path, usename)
def literal_rdf_representation(literal):
    """Convert a PROV literal into an rdflib ``Literal``.

    :param literal: Object exposing ``value``, ``langtag`` and ``datatype``
        (presumably a :class:`prov.model.Literal`; the only caller visible
        here passes one).
    :return: A language-tagged rdflib literal when ``langtag`` is set,
        otherwise a literal typed with ``literal.datatype.uri``.
    """
    # Test against None rather than truthiness so that an empty (or
    # otherwise falsy) value is still serialised as itself instead of
    # falling back to the whole literal object.
    value = str(literal.value) if literal.value is not None else literal
    if literal.langtag:
        # a language tag can only go with prov:InternationalizedString
        return RDFLiteral(value, lang=str(literal.langtag))
    else:
        datatype = literal.datatype
        if "base64Binary" in datatype.uri:
            # hand rdflib the raw bytes for binary literals
            value = literal.value.encode()
        return RDFLiteral(value, datatype=datatype.uri)