Source code for metsrw.mets

import logging
import os
import sys
from collections import namedtuple
from collections import OrderedDict
from datetime import datetime

from lxml import etree

from . import exceptions
from . import fsentry
from . import metadata
from . import utils


LOGGER = logging.getLogger(__name__)

AIP_ENTRY_TYPE = "archival information package"
FPtr = namedtuple(
    "FPtr",
    "file_uuid derived_from use path amdids dmdids checksum checksumtype fileid transform_files",
)
TRANSFORM_PREFIX = "TRANSFORM"
TRANSFORM_PREFIX_LEN = len(TRANSFORM_PREFIX)
DEFAULT_FILESEC_GROUPS_ORDER = [
    "original",
    "submissionDocumentation",
    "preservation",
    "service",
    "access",
    "license",
    "text/ocr",
    "metadata",
    "derivative",
]


class METSDocument:
    def __init__(self):
        # Stores the ElementTree if this was parsed from an existing file
        self.tree = None
        # Only root-level elements are stored, since the rest
        # can be inferred via their #children attribute
        self.createdate = None
        self.objid = None
        self.alternate_ids = []
        self._root_elements = []
        self._all_files = None
        self._iter = None
        self.dmdsecs = []
        self.amdsecs = []
        self.agents = []
        self._custom_structmaps = []

    @classmethod
    def read(cls, source):
        """Read ``source`` into a ``METSDocument`` instance.

        This is an alternative constructor. The ``source`` may be a path to a
        METS file, a file-like object, or a string of XML.
        """
        if hasattr(source, "read"):
            return cls.fromfile(source)
        if os.path.exists(source):
            return cls.fromfile(source)
        if isinstance(source, str):
            source = source.encode("utf8")
        return cls.fromstring(source)

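For illustration, a minimal sketch of the accepted source types; the path ``mets.xml`` is hypothetical:

    >>> from metsrw.mets import METSDocument
    >>> mets = METSDocument.read("mets.xml")    # path on disk
    >>> with open("mets.xml", "rb") as src:     # file-like object
    ...     mets = METSDocument.read(src)

Passing a string of raw XML is also supported; see the ``fromstring`` example further down.
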
    # FSENTRIES

    def _collect_all_files(self, files=None):
        """
        Collect all FSEntry instances into a set, including all descendants.

        :param list files: List of :class:`FSEntry` to traverse.
        :returns: Set of FSEntry
        """
        if files is None:
            files = self._root_elements
        collected = set()
        for entry in files:
            collected.add(entry)
            collected.update(self._collect_all_files(entry.children))
        return collected

    def all_files(self):
        """
        Return a set of all FSEntry instances in this METS document.

        :returns: Set containing all :class:`FSEntry` in this METS document,
            including descendants of ones explicitly added.
        """
        # FIXME cache this. Should not break when add_child is called on an
        # element already in the document.
        return self._collect_all_files(self._root_elements)

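Illustrative use, continuing from the ``read`` sketch above: list every entry the document knows about, including children of explicitly appended entries.

    >>> for entry in mets.all_files():
    ...     print(entry.path, entry.use)
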
    def get_subsections_counts(self):
        """
        Return a dictionary with the counts of the following subsections:
        dmdSec, amdSec, techMD, rightsMD, digiprovMD and sourceMD.

        :returns: Dict with subsection counts.
        """
        subsections = ("techMD", "rightsMD", "digiprovMD", "sourceMD")
        counts = {
            "dmdSec": 0,
            "amdSec": 0,
            "techMD": 0,
            "rightsMD": 0,
            "digiprovMD": 0,
            "sourceMD": 0,
        }
        for entry in self.all_files():
            counts["dmdSec"] += len(entry.dmdsecs)
            counts["amdSec"] += len(entry.amdsecs)
            for amdsec in entry.amdsecs:
                for subsection in amdsec.subsections:
                    if subsection.subsection in subsections:
                        counts[subsection.subsection] += 1
        return counts

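The returned mapping can serve as a quick sanity check after parsing; a small sketch:

    >>> counts = mets.get_subsections_counts()
    >>> total_md = counts["dmdSec"] + counts["amdSec"]
    >>> rights_count = counts["rightsMD"]
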
    def get_file(self, **kwargs):
        """
        Return the first FSEntry that matches the given parameters.

        :param str file_uuid: UUID of the target FSEntry.
        :param str label: structMap LABEL of the target FSEntry.
        :param str type: structMap TYPE of the target FSEntry.
        :returns: :class:`FSEntry` that matches the parameters, or None.
        """
        # TODO put this in a sqlite DB so it can be queried efficiently
        # TODO handle multiple matches (with DB?)
        # TODO check that kwargs are actual attrs
        for entry in self.all_files():
            if all(value == getattr(entry, key) for key, value in kwargs.items()):
                return entry
        return None

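A lookup sketch; the UUID below is a made-up placeholder, and any FSEntry attribute name can be used as a keyword:

    >>> entry = mets.get_file(file_uuid="740a66f1-2ba1-4666-8eef-17d49b1acbb0")
    >>> entry = mets.get_file(label="objects", type="Directory")
    >>> # ``None`` is returned when nothing matches
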
    def append_file(self, fs_entry):
        """
        Add an FSEntry object to this METS document's tree. Any children of
        the FSEntry will also be added to the document.

        A given FSEntry object can only be included in a document once; any
        attempt to add it a second time is ignored.

        :param metsrw.mets.FSEntry fs_entry: FSEntry to add to the METS document
        """
        if fs_entry in self._root_elements:
            return
        self._root_elements.append(fs_entry)
        # Reset the file list so it gets regenerated with the new file(s)
        self._all_files = None

    append = append_file

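Building a document from scratch might look like the following. The ``FSEntry`` constructor arguments shown (``path`` and ``file_uuid``) are assumptions based on metsrw's ``FSEntry`` class rather than anything defined in this module:

    >>> import uuid
    >>> from metsrw.mets import METSDocument
    >>> from metsrw.fsentry import FSEntry
    >>> mets = METSDocument()
    >>> entry = FSEntry(path="objects/report.pdf", file_uuid=str(uuid.uuid4()))
    >>> mets.append_file(entry)    # ``mets.append(entry)`` is equivalent
    >>> len(mets)
    1
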
    def remove_entry(self, fs_entry):
        """Remove an FSEntry object from this METS document.

        Any children of this FSEntry will also be removed, and the entry is
        removed as a child of its parent, if any.

        :param metsrw.mets.FSEntry fs_entry: FSEntry to remove from the METS
        """
        try:
            self._root_elements.remove(fs_entry)
        except ValueError:  # fs_entry may not be in the root elements
            pass
        if fs_entry.parent:
            fs_entry.parent.remove_child(fs_entry)
        # Reset the file list so it gets regenerated without the removed file(s)
        self._all_files = None

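And undoing the addition from the previous sketch (the ``remove`` alias defined below does the same):

    >>> mets.remove_entry(entry)
    >>> len(mets)
    0
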
    remove = remove_entry

    # The following methods allow us to iterate over the FSEntry instances of a
    # METSDocument---``for fsentry in mets: ...``---, count
    # them---``len(mets)``---, and fetch them by
    # index---``my_fsentry = mets[21]``.

    def __iter__(self):
        return self

    def __len__(self):
        return len(self.all_files())

    def _get_all_files_list(self):
        return sorted(self.all_files(), key=lambda fsentry: fsentry.path or "")

    def __getitem__(self, index):
        return self._get_all_files_list()[index]

    def __next__(self):
        # Py3-style iterator interface
        if self._iter is None:
            self._iter = iter(self._get_all_files_list())
        return next(self._iter)

    # SERIALIZE

    def _document_root(self, fully_qualified=True):
        """
        Return the mets Element for the document root.
        """
        nsmap = {"xsi": utils.NAMESPACES["xsi"], "xlink": utils.NAMESPACES["xlink"]}
        if fully_qualified:
            nsmap["mets"] = utils.NAMESPACES["mets"]
        else:
            nsmap[None] = utils.NAMESPACES["mets"]
        attrib = {
            "{}schemaLocation".format(utils.lxmlns("xsi")): utils.SCHEMA_LOCATIONS
        }
        if self.objid:
            attrib["OBJID"] = self.objid
        return etree.Element(utils.lxmlns("mets") + "mets", nsmap=nsmap, attrib=attrib)

    def _mets_header(self, now):
        """
        Return the metsHdr Element.
        """
        header_tag = etree.QName(utils.NAMESPACES["mets"], "metsHdr")
        header_attrs = {}
        if self.createdate is None:
            header_attrs["CREATEDATE"] = now
        else:
            header_attrs["CREATEDATE"] = self.createdate
            header_attrs["LASTMODDATE"] = now
        header_element = etree.Element(header_tag, **header_attrs)
        for agent in self.agents:
            header_element.append(agent.serialize())
        for alternate_id in self.alternate_ids:
            header_element.append(alternate_id.serialize())
        return header_element

    @staticmethod
    def _collect_mdsec_elements(files):
        """
        Return all dmdSec and amdSec classes associated with the files.

        Returns all dmdSecs, then all amdSecs, so they only need to be
        serialized before being appended to the METS document.

        :param List files: List of :class:`FSEntry` to collect MDSecs for.
        :returns: List of AMDSecs and SubSections
        """
        dmdsecs = []
        amdsecs = []
        for f in files:
            for d in f.dmdsecs:
                dmdsecs.append(d)
            for a in f.amdsecs:
                amdsecs.append(a)
        dmdsecs.sort(key=lambda x: x.id_string)
        amdsecs.sort(key=lambda x: x.id_string)
        return dmdsecs + amdsecs

    def _structmap(self):
        """
        Returns the structMap element for all files.
        """
        structmap = etree.Element(
            utils.lxmlns("mets") + "structMap",
            TYPE="physical",  # TODO Add ability for multiple structMaps
            ID="structMap_1",  # TODO don't hardcode this
            LABEL="Archivematica default",
        )
        for item in self._root_elements:
            child = item.serialize_structmap(recurse=True)
            if child is not None:
                structmap.append(child)
        return structmap

    def _normative_structmap(self):
        """Returns the normative structMap element for all files.

        This is a logical structMap that includes empty directories.
        """
        structmap = etree.Element(
            utils.lxmlns("mets") + "structMap",
            TYPE="logical",
            ID="structMap_2",
            LABEL="Normative Directory Structure",
        )
        for item in self._root_elements:
            child = item.serialize_structmap(recurse=True, normative=True)
            if child is not None:
                structmap.append(child)
        return structmap

    def _filesec(self, files=None):
        """
        Returns a fileSec Element containing all files grouped by use.
        """
        if files is None:
            files = self.all_files()

        filesec = etree.Element(utils.lxmlns("mets") + "fileSec")
        filegrps = {}
        for file_ in files:
            if file_.type.lower() not in ("item", AIP_ENTRY_TYPE):
                continue
            # Get fileGrp, or create if not exist
            filegrp = filegrps.get(file_.use)
            if filegrp is None:
                filegrp = etree.Element(utils.lxmlns("mets") + "fileGrp", USE=file_.use)
                filegrps[file_.use] = filegrp
            file_el = file_.serialize_filesec()
            if file_el is not None:
                filegrp.append(file_el)

        for filegrp in self._sort_filegrps(filegrps):
            filesec.append(filegrp)

        return filesec

    def _sort_filegrps(self, filegrps):
        result = []
        default_groups_count = len(DEFAULT_FILESEC_GROUPS_ORDER)
        for i, use in enumerate(filegrps.keys()):
            filegrp = filegrps[use]
            try:
                filegrp_position = DEFAULT_FILESEC_GROUPS_ORDER.index(use)
            except ValueError:
                filegrp_position = default_groups_count + i
            result.append((filegrp_position, filegrp))
        return [v for _, v in sorted(result, key=lambda i: i[0])]

    def serialize(self, fully_qualified=True, normative_structmap=True):
        """
        Returns this document serialized to an XML Element.

        :return: Element for this document
        """
        now = datetime.utcnow().replace(microsecond=0).isoformat("T")
        files = self.all_files()
        mdsecs = self._collect_mdsec_elements(files)
        root = self._document_root(fully_qualified=fully_qualified)
        root.append(self._mets_header(now=now))
        for section in mdsecs:
            root.append(section.serialize(now=now))
        root.append(self._filesec(files))
        root.append(self._structmap())
        if normative_structmap:
            root.append(self._normative_structmap())
        for struct_map in self._custom_structmaps:
            root.append(struct_map)
        return root

    def tostring(self, fully_qualified=True, pretty_print=True, encoding="UTF-8"):
        """
        Serialize and return a string of this METS document.

        To write to a file, see :meth:`write`.

        The default encoding is ``UTF-8``, which returns bytes. This method
        returns a unicode (``str``) string when ``encoding`` is set to
        ``unicode``.

        :return: String of this document
        """
        root = self.serialize(fully_qualified=fully_qualified)
        kwargs = {"pretty_print": pretty_print, "encoding": encoding}
        if encoding != "unicode":
            kwargs["xml_declaration"] = True
        return etree.tostring(root, **kwargs)

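For example, the same document as bytes or as text:

    >>> xml_bytes = mets.tostring()                   # UTF-8 bytes, with XML declaration
    >>> xml_text = mets.tostring(encoding="unicode")  # str, no XML declaration
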
    def write(
        self, filepath, fully_qualified=True, pretty_print=False, encoding="UTF-8"
    ):
        """Serialize and write this METS document to ``filepath``.

        The default encoding is ``UTF-8``. An XML declaration is included
        unless ``encoding`` is set to ``unicode``.

        :param str filepath: Path to write the METS document to
        """
        root = self.serialize(fully_qualified=fully_qualified)
        tree = root.getroottree()
        kwargs = {"pretty_print": pretty_print, "encoding": encoding}
        if encoding != "unicode":
            kwargs["xml_declaration"] = True
        tree.write(filepath, **kwargs)

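A write sketch; the output path is hypothetical:

    >>> mets.write("output/METS.xml", pretty_print=True)
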
    # PARSE HELPERS

    def _parse_tree_structmap(self, tree, parent_elem, normative_parent_elem=None):
        """Recursively parse all the children of parent_elem, including
        amdSecs and dmdSecs.

        :param lxml._ElementTree tree: encodes the entire METS file.
        :param lxml._Element parent_elem: the element whose children we are
            parsing.
        :param lxml._Element normative_parent_elem: the normative counterpart
            of ``parent_elem`` taken from the logical structMap labelled
            "Normative Directory Structure".
        """
        siblings = []
        el_to_normative = self._get_el_to_normative(parent_elem, normative_parent_elem)
        for elem, normative_elem in el_to_normative.items():
            if elem.tag != utils.lxmlns("mets") + "div":
                continue  # Only handle divs, not fptrs
            entry_type = elem.get("TYPE")
            label = elem.get("LABEL")
            fptr_elems = elem.findall("mets:fptr", namespaces=utils.NAMESPACES)
            # Directories are walked recursively. Additionally, they may
            # contain direct fptrs.
            if entry_type.lower() == "directory":
                children = self._parse_tree_structmap(
                    tree, elem, normative_parent_elem=normative_elem
                )
                fs_entry = fsentry.FSEntry.dir(label, children)
                self._add_dmdsecs_to_fs_entry(elem, fs_entry, tree)
                self._add_amdsecs_to_fs_entry(elem.get("ADMID"), fs_entry, tree)
                siblings.append(fs_entry)
                for fptr_elem in fptr_elems:
                    fptr = self._analyze_fptr(fptr_elem, tree, entry_type)
                    fs_entry = fsentry.FSEntry.from_fptr(
                        label=None, type_="Item", fptr=fptr
                    )
                    self._add_dmdsecs_to_fs_entry(elem, fs_entry, tree, fptr.dmdids)
                    self._add_amdsecs_to_fs_entry(fptr.amdids, fs_entry, tree)
                    siblings.append(fs_entry)
                continue
            # Other types, e.g.: items, aips...
            if not len(fptr_elems):
                continue
            fptr = self._analyze_fptr(fptr_elems[0], tree, entry_type)
            fs_entry = fsentry.FSEntry.from_fptr(label, entry_type, fptr)
            self._add_dmdsecs_to_fs_entry(elem, fs_entry, tree, fptr.dmdids)
            self._add_amdsecs_to_fs_entry(fptr.amdids, fs_entry, tree)
            siblings.append(fs_entry)
        return siblings

    @staticmethod
    def _get_el_to_normative(parent_elem, normative_parent_elem):
        """Return ordered dict ``el_to_normative``, which maps children of
        ``parent_elem`` to their normative counterparts in the children of
        ``normative_parent_elem``, or to ``None`` if there is no normative
        parent.

        If there is a normative div element with no non-normative counterpart,
        that element is treated as a key with value ``None``. This allows us
        to create ``FSEntry`` instances for empty directory div elements,
        which are only documented in a normative logical structmap.
        """
        el_to_normative = OrderedDict()
        if normative_parent_elem is None:
            for el in parent_elem:
                el_to_normative[el] = None
        else:
            for norm_el in normative_parent_elem:
                matches = [
                    el
                    for el in parent_elem
                    if el.get("TYPE") == norm_el.get("TYPE")
                    and el.get("LABEL") == norm_el.get("LABEL")
                ]
                if matches:
                    el_to_normative[matches[0]] = norm_el
                else:
                    el_to_normative[norm_el] = None
        return el_to_normative

    @staticmethod
    def _analyze_fptr(fptr_elem, tree, entry_type):
        file_uuid = derived_from = use = path = amdids = checksum = checksumtype = None
        file_id = fptr_elem.get("FILEID")
        file_elem = tree.find(
            'mets:fileSec//mets:file[@ID="' + file_id + '"]',
            namespaces=utils.NAMESPACES,
        )
        if file_elem is None:
            raise exceptions.ParseError(
                "%s exists in structMap but not fileSec" % file_id
            )
        use = file_elem.getparent().get("USE")
        path = file_elem.find("mets:FLocat", namespaces=utils.NAMESPACES).get(
            utils.lxmlns("xlink") + "href"
        )
        try:
            path = utils.urldecode(path)
        except ValueError:
            raise exceptions.ParseError(
                'Value "{}" (of attribute xlink:href) is not a valid'
                " URL.".format(path)
            )
        amdids = file_elem.get("ADMID")
        dmdids = file_elem.get("DMDID")
        checksum = file_elem.get("CHECKSUM")
        checksumtype = file_elem.get("CHECKSUMTYPE")
        file_id_prefix = utils.FILE_ID_PREFIX
        # If the file is an AIP, then its prefix is the name of the AIP,
        # plus `file-` on 1.10+. Therefore we need to get the extension-less
        # basename of the AIP's path and remove its UUID suffix to get
        # the prefix to remove from the FILEID attribute value.
        if entry_type.lower() in (
            "archival information package",
            "archival information collection",
        ):
            aip_name = os.path.splitext(os.path.basename(path))[0][:-36]
            if file_id.startswith(file_id_prefix):
                file_id_prefix = file_id_prefix + aip_name
            else:
                file_id_prefix = aip_name
        # If the file is part of a directory (with no intermediate item), then
        # its prefix *may not* be "file-" but the name of the file. This
        # pattern is found in old Archivematica METS files, e.g. see
        # ``fixtures/mets_dir_with_many_ptrs.xml``.
        elif entry_type.lower() == "directory" and file_id[:5] != "file-":
            file_id_prefix = os.path.basename(path) + "-"
        file_uuid = file_id.replace(file_id_prefix, "", 1)
        group_uuid = file_elem.get("GROUPID", "").replace(utils.GROUP_ID_PREFIX, "", 1)
        if group_uuid != file_uuid:
            derived_from = group_uuid  # Use group_uuid as placeholder
        transform_files = []
        for transform_file in file_elem.findall(
            "mets:transformFile", namespaces=utils.NAMESPACES
        ):
            transform_file_attributes = {}
            for attrib, value in transform_file.attrib.items():
                # FSEntry.__init__ will make this uppercase anyway
                key = attrib.upper()
                if key.startswith(TRANSFORM_PREFIX):
                    key = key[TRANSFORM_PREFIX_LEN:]
                transform_file_attributes[key] = value
            transform_files.append(transform_file_attributes)
        return FPtr(
            file_uuid,
            derived_from,
            use,
            path,
            amdids,
            dmdids,
            checksum,
            checksumtype,
            file_id,
            transform_files,
        )

    @staticmethod
    def _add_dmdsecs_to_fs_entry(elem, fs_entry, tree, dmdids=None):
        dmdids_to_add = elem.get("DMDID", "").split()
        if dmdids is not None:
            dmdids_to_add.extend(
                [dmdid for dmdid in dmdids.split() if dmdid not in dmdids_to_add]
            )
        for dmdid in dmdids_to_add:
            dmdsec_elem = tree.find(
                'mets:dmdSec[@ID="' + dmdid + '"]', namespaces=utils.NAMESPACES
            )
            dmdsec = metadata.SubSection.parse(dmdsec_elem)
            fs_entry.dmdsecs.append(dmdsec)
        # Order by creation date and generate mapping by mdtype_othermdtype
        fs_entry.dmdsecs.sort(key=lambda x: x.created)
        for dmdsec in fs_entry.dmdsecs:
            mdtype_key = utils.generate_mdtype_key(
                dmdsec.contents.mdtype, getattr(dmdsec.contents, "othermdtype", "")
            )
            fs_entry.dmdsecs_by_mdtype.setdefault(mdtype_key, []).append(dmdsec)

    @staticmethod
    def _add_amdsecs_to_fs_entry(amdids, fs_entry, tree):
        if amdids:
            amdids = amdids.split()
            for amdid in amdids:
                amdsec_elem = tree.find(
                    'mets:amdSec[@ID="' + amdid + '"]', namespaces=utils.NAMESPACES
                )
                amdsec = metadata.AMDSec.parse(amdsec_elem)
                fs_entry.amdsecs.append(amdsec)

    def _parse_tree(self, tree=None):
        if tree is None:
            tree = self.tree
        # self._validate()
        self._parse_header(tree)
        # Read root attributes
        root = tree
        if isinstance(tree, etree._ElementTree):
            root = tree.getroot()
        self.objid = root.get("OBJID", None)
        # Parse structMap
        structMap = tree.find(
            'mets:structMap[@TYPE="physical"]', namespaces=utils.NAMESPACES
        )
        if structMap is None:
            raise exceptions.ParseError("No physical structMap found.")
        normative_struct_map_label = "Normative Directory Structure"
        normative_struct_map = tree.find(
            f'mets:structMap[@TYPE="logical"][@LABEL="{normative_struct_map_label}"]',
            namespaces=utils.NAMESPACES,
        )
        self._root_elements = self._parse_tree_structmap(
            tree, structMap, normative_parent_elem=normative_struct_map
        )
        self._custom_structmaps = [
            e
            for e in tree.findall(
                'mets:structMap[@TYPE="logical"]',
                namespaces=utils.NAMESPACES,
            )
            if e.attrib.get("LABEL") != normative_struct_map_label
        ]
        # Associate derived files
        for entry in self.all_files():
            entry.derived_from = self.get_file(
                file_uuid=entry.derived_from, type="Item"
            )

    def _parse_header(self, tree):
        header = self.tree.find("mets:metsHdr", namespaces=utils.NAMESPACES)
        # Check CREATEDATE < now
        if header is not None:
            createdate = header.get("CREATEDATE")
        else:
            createdate = None
        now = datetime.utcnow().isoformat("T")
        if createdate and createdate > now:
            raise exceptions.ParseError(f"CREATEDATE more recent than now ({now})")
        self.createdate = createdate

        if header is not None:
            agent_elements = header.findall(
                metadata.Agent.AGENT_TAG, namespaces=utils.NAMESPACES
            )
            for agent_element in agent_elements:
                agent = metadata.Agent.parse(agent_element)
                self.agents.append(agent)
            alternate_ids = header.findall(
                metadata.AltRecordID.ALT_RECORD_ID_TAG, namespaces=utils.NAMESPACES
            )
            for alternate_id_element in alternate_ids:
                alternate_id = metadata.AltRecordID.parse(alternate_id_element)
                self.alternate_ids.append(alternate_id)

    def _validate(self):
        raise NotImplementedError()

    @classmethod
    def fromfile(cls, path):
        """
        Creates a METS by parsing a file.

        :param str path: Path to a METS document.
        """
        parser = etree.XMLParser(remove_blank_text=True)
        return cls.fromtree(etree.parse(path, parser=parser))

    @classmethod
    def fromstring(cls, string):
        """
        Create a METS by parsing a string.

        :param str string: String containing a METS document.
        """
        parser = etree.XMLParser(remove_blank_text=True)
        root = etree.fromstring(string, parser)
        tree = root.getroottree()
        return cls.fromtree(tree)

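A minimal parsing sketch: the XML below contains only the physical structMap that parsing requires, so it yields an empty document; real METS files are far richer.

    >>> xml = (
    ...     b'<mets:mets xmlns:mets="http://www.loc.gov/METS/">'
    ...     b'<mets:structMap TYPE="physical"/>'
    ...     b'</mets:mets>'
    ... )
    >>> mets = METSDocument.fromstring(xml)
    >>> len(mets)
    0
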
    @classmethod
    def fromtree(cls, tree):
        """
        Create a METS from an ElementTree or Element.

        :param ElementTree tree: ElementTree to build a METS document from.
        """
        mets = cls()
        mets.tree = tree
        mets._parse_tree(tree)
        return mets

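``fromtree`` is handy when the XML has already been parsed elsewhere; the path is hypothetical:

    >>> from lxml import etree
    >>> tree = etree.parse("mets.xml")
    >>> mets = METSDocument.fromtree(tree)
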
if __name__ == "__main__": mw = METSDocument() mw.fromfile(sys.argv[1]) mw.write(sys.argv[2], fully_qualified=True, pretty_print=True)