Source code for prov.serializers.provjson

from collections import defaultdict
import datetime
import io
import json

from prov import Error
from prov.serializers import Serializer
from prov.constants import *
from prov.model import (
    Literal,
    Identifier,
    QualifiedName,
    Namespace,
    ProvDocument,
    ProvBundle,
    first,
    parse_xsd_datetime,
)

import logging

logger = logging.getLogger(__name__)

__author__ = "Trung Dong Huynh"
__email__ = "trungdong@donggiang.com"


[docs]class ProvJSONException(Error): pass
class AnonymousIDGenerator: def __init__(self): self._cache = {} self._count = 0 def get_anon_id(self, obj, local_prefix="id"): if obj not in self._cache: self._count += 1 self._cache[obj] = Identifier("_:%s%d" % (local_prefix, self._count)) return self._cache[obj] # Reverse map for prov.model.XSD_DATATYPE_PARSERS LITERAL_XSDTYPE_MAP = { float: "xsd:double", int: "xsd:int" # boolean, string values are supported natively by PROV-JSON # datetime values are converted separately }
[docs]class ProvJSONSerializer(Serializer): """ PROV-JSON serializer for :class:`~prov.model.ProvDocument` """
[docs] def serialize(self, stream, **kwargs): """ Serializes a :class:`~prov.model.ProvDocument` instance to `PROV-JSON <https://openprovenance.org/prov-json/>`_. :param stream: Where to save the output. """ buf = io.StringIO() try: json.dump(self.document, buf, cls=ProvJSONEncoder, **kwargs) buf.seek(0, 0) # Right now this is a bytestream. If the object to stream to is # a text object is must be decoded. We assume utf-8 here which # should be fine for almost every case. if isinstance(stream, io.TextIOBase): stream.write(buf.read()) else: stream.write(buf.read().encode("utf-8")) finally: buf.close()
[docs] def deserialize(self, stream, **kwargs): """ Deserialize from the `PROV JSON <https://openprovenance.org/prov-json/>`_ representation to a :class:`~prov.model.ProvDocument` instance. :param stream: Input data. """ if not isinstance(stream, io.TextIOBase): buf = io.StringIO(stream.read().decode("utf-8")) stream = buf return json.load(stream, cls=ProvJSONDecoder, **kwargs)
[docs]class ProvJSONEncoder(json.JSONEncoder):
[docs] def default(self, o): if isinstance(o, ProvDocument): return encode_json_document(o) else: return super(ProvJSONEncoder, self).encode(o)
[docs]class ProvJSONDecoder(json.JSONDecoder):
[docs] def decode(self, s, *args, **kwargs): container = super(ProvJSONDecoder, self).decode(s, *args, **kwargs) document = ProvDocument() decode_json_document(container, document) return document
# Encoding/decoding functions def valid_qualified_name(bundle, value): if value is None: return None qualified_name = bundle.valid_qualified_name(value) return qualified_name def encode_json_document(document): container = encode_json_container(document) for bundle in document.bundles: # encoding the sub-bundle bundle_json = encode_json_container(bundle) container["bundle"][str(bundle.identifier)] = bundle_json return container def encode_json_container(bundle): container = defaultdict(dict) prefixes = {} for namespace in bundle._namespaces.get_registered_namespaces(): prefixes[namespace.prefix] = namespace.uri if bundle._namespaces._default: prefixes["default"] = bundle._namespaces._default.uri if prefixes: container["prefix"] = prefixes id_generator = AnonymousIDGenerator() def real_or_anon_id(r): return r._identifier if r._identifier else id_generator.get_anon_id(r) for record in bundle._records: rec_type = record.get_type() rec_label = PROV_N_MAP[rec_type] identifier = str(real_or_anon_id(record)) record_json = {} if record._attributes: for (attr, values) in record._attributes.items(): if not values: continue attr_name = str(attr) if attr in PROV_ATTRIBUTE_QNAMES: # TODO: QName export record_json[attr_name] = str(first(values)) elif attr in PROV_ATTRIBUTE_LITERALS: record_json[attr_name] = first(values).isoformat() else: if len(values) == 1: # single value record_json[attr_name] = encode_json_representation( first(values) ) else: # multiple values record_json[attr_name] = list( encode_json_representation(value) for value in values ) # Check if the container already has the id of the record if identifier not in container[rec_label]: # this is the first instance, just put in the new record container[rec_label][identifier] = record_json else: # the container already has some record(s) of the same identifier # check if this is the second instance current_content = container[rec_label][identifier] if hasattr(current_content, "items"): # this is a dict, make it a singleton list container[rec_label][identifier] = [current_content] # now append the new record to the list container[rec_label][identifier].append(record_json) return container def decode_json_document(content, document): bundles = dict() if "bundle" in content: bundles = content["bundle"] del content["bundle"] decode_json_container(content, document) for bundle_id, bundle_content in bundles.items(): bundle = ProvBundle(document=document) decode_json_container(bundle_content, bundle) document.add_bundle(bundle, bundle.valid_qualified_name(bundle_id)) def decode_json_container(jc, bundle): if "prefix" in jc: prefixes = jc["prefix"] for prefix, uri in prefixes.items(): if prefix != "default": bundle.add_namespace(Namespace(prefix, uri)) else: bundle.set_default_namespace(uri) del jc["prefix"] for rec_type_str in jc: rec_type = PROV_RECORD_IDS_MAP[rec_type_str] for rec_id, content in jc[rec_type_str].items(): if hasattr(content, "items"): # it is a dict # There is only one element, create a singleton list elements = [content] else: # expect it to be a list of dictionaries elements = content for element in elements: attributes = dict() other_attributes = [] # this is for the multiple-entity membership hack to come membership_extra_members = None for attr_name, values in element.items(): attr = ( PROV_ATTRIBUTES_ID_MAP[attr_name] if attr_name in PROV_ATTRIBUTES_ID_MAP else valid_qualified_name(bundle, attr_name) ) if attr in PROV_ATTRIBUTES: if isinstance(values, list): # only one value is allowed if len(values) > 1: # unless it is the membership hack if ( rec_type == PROV_MEMBERSHIP and attr == PROV_ATTR_ENTITY ): # This is a membership relation with # multiple entities # HACK: create multiple membership # relations, one for each entity # Store all the extra entities membership_extra_members = values[1:] # Create the first membership relation as # normal for the first entity value = values[0] else: error_msg = ( "The prov package does not support PROV" " attributes having multiple values." ) logger.error(error_msg) raise ProvJSONException(error_msg) else: value = values[0] else: value = values value = ( valid_qualified_name(bundle, value) if attr in PROV_ATTRIBUTE_QNAMES else parse_xsd_datetime(value) ) attributes[attr] = value else: if isinstance(values, list): other_attributes.extend( (attr, decode_json_representation(value, bundle)) for value in values ) else: # single value other_attributes.append( (attr, decode_json_representation(values, bundle)) ) bundle.new_record(rec_type, rec_id, attributes, other_attributes) # HACK: creating extra (unidentified) membership relations if membership_extra_members: collection = attributes[PROV_ATTR_COLLECTION] for member in membership_extra_members: bundle.membership( collection, valid_qualified_name(bundle, member) ) def encode_json_representation(value): if isinstance(value, Literal): return literal_json_representation(value) elif isinstance(value, datetime.datetime): return {"$": value.isoformat(), "type": "xsd:dateTime"} elif isinstance(value, QualifiedName): # TODO Manage prefix in the whole structure consistently # TODO QName export return {"$": str(value), "type": PROV_QUALIFIEDNAME._str} elif isinstance(value, Identifier): return {"$": value.uri, "type": "xsd:anyURI"} elif type(value) in LITERAL_XSDTYPE_MAP: return {"$": value, "type": LITERAL_XSDTYPE_MAP[type(value)]} else: return value def decode_json_representation(literal, bundle): if isinstance(literal, dict): # complex type value = literal["$"] datatype = literal["type"] if "type" in literal else None datatype = valid_qualified_name(bundle, datatype) langtag = literal["lang"] if "lang" in literal else None if datatype == XSD_ANYURI: return Identifier(value) elif datatype == PROV_QUALIFIEDNAME: return valid_qualified_name(bundle, value) else: # The literal of standard Python types is not converted here # It will be automatically converted when added to a record by # _auto_literal_conversion() return Literal(value, datatype, langtag) else: # simple type, just return it return literal def literal_json_representation(literal): # TODO: QName export value, datatype, langtag = literal.value, literal.datatype, literal.langtag if langtag: return {"$": value, "lang": langtag} else: return {"$": value, "type": str(datatype)}