Source code for ciowarehouse2.lib.file_info

"""File information class."""

from __future__ import annotations
from logging import getLogger
from os import remove, makedirs
from os.path import isfile, exists, dirname
from datetime import date, datetime

from lxml import etree

from pyramid.request import Request

from chrysalio.lib.xml import load_xml2, validate_xml, relaxng4validation
from chrysalio.lib.log import log_error
from chrysalio.lib.utils import convert_value, tostr
from ..relaxng import RELAXNG_CIOWAREHOUSE2
from .ciopath import CioPath
from .warehouse import Warehouse

# Module-level logger, named after this module.
LOG = getLogger(__name__)


# =============================================================================
def info_read(
        request: Request,
        warehouse: Warehouse,
        ciopath: CioPath,
        metafields: list | tuple | None = None
) -> tuple[dict, set[str], set[str] | None]:
    """Return fields, sharings and only4groups for the given path.

    :type  request: pyramid.request.Request
    :param request:
        Current request. ``request.registry['fields']`` is read
        unconditionally, so it must not be ``None``.
    :type  warehouse: .lib.warehouse.Warehouse
    :param warehouse:
        Current warehouse object.
    :type  ciopath: .lib.ciopath.CioPath
    :param ciopath:
        `CioPath` of the file.
    :param list metafields: (optional)
        List of metadata field IDs to retrieve. If not defined, return all
        metadata fields of the warehouse. IDs which are unknown to the
        warehouse or to the registry are silently ignored.
    :rtype: tuple
    :return:
        A tuple such as ``(metadata, sharings, only4groups)``. When no
        information root can be obtained, ``({}, set(), None)``.
    """
    root_elt = info_get_root(warehouse.root, ciopath)
    if root_elt is None:
        return {}, set(), None

    namespaces = {'ns': RELAXNG_CIOWAREHOUSE2['namespace']}
    fields = request.registry['fields']
    if metafields is None:
        metafields = warehouse.metafields

    metadata = {}
    for field_id in metafields:
        if field_id not in warehouse.metafields or field_id not in fields:
            continue
        # ``field_id`` has just been checked against the registry, so it is
        # a configured ID and not free-form input.
        elts = root_elt.xpath(
            f'ns:info/ns:fields/ns:field[@id="{field_id}"]',
            namespaces=namespaces)
        if not elts:
            continue
        display = fields[field_id]['display']
        # An empty XML element has ``text`` set to ``None``: treat it as an
        # empty string instead of failing on ``None.strip()``.
        if display == 'strings':
            metadata[field_id] = '\n'.join(
                (elt.text or '').strip() for elt in elts)
        else:
            metadata[field_id] = convert_value(
                display, (elts[0].text or '').strip())

    return \
        metadata, info_get_sharings(root_elt), info_get_only4groups(root_elt)
# =============================================================================
def info_get_root(
        warehouse_root: str,
        ciopath: CioPath,
        remove_fields: bool = False,
        remove_only4groups: bool = False) -> etree.Element | None:
    """Load or create the information XML structure of a given `CioPath`.

    :param warehouse_root:
        Absolute path to the warehouse root.
    :type  ciopath: .lib.ciopath.CioPath
    :param ciopath:
        The current `CioPath`.
    :param bool remove_fields:
        If ``True``, metadata fields are removed from a loaded document.
    :param bool remove_only4groups:
        If ``True``, authorized groups are removed from a loaded document.
    :rtype: lxml.etree._Element
    :return:
        Root element of the information XML file, or ``None`` if the
        `CioPath` has no information file path.
    """
    abs_info = ciopath.absolute_info(warehouse_root)
    if abs_info is None:
        return None
    namespace = RELAXNG_CIOWAREHOUSE2['namespace']

    # Try to load the existing file
    loaded = None
    if isfile(abs_info):
        tree, error = load_xml2(
            abs_info,
            relaxng4validation(RELAXNG_CIOWAREHOUSE2),
            parser=etree.XMLParser(remove_blank_text=True))
        if error is None:
            loaded = tree.getroot()

    # Nothing usable on disk: build a fresh, empty structure
    if loaded is None:
        fresh = etree.Element(
            RELAXNG_CIOWAREHOUSE2['root'],
            version=RELAXNG_CIOWAREHOUSE2['version'],
            nsmap={None: namespace})
        etree.SubElement(fresh, f'{{{namespace}}}info')
        return fresh

    # Strip the sections the caller does not want to keep
    unwanted = []
    if remove_fields:
        unwanted.append('ns:info/ns:fields')
    if remove_only4groups:
        unwanted.append('ns:info/ns:only4groups')
    for path in unwanted:
        found = loaded.xpath(path, namespaces={'ns': namespace})
        if found:
            found[0].getparent().remove(found[0])

    return loaded
# =============================================================================
def info_save(
        root_elt: etree.Element,
        warehouse_root: str,
        ciopath: CioPath,
        request: Request | None = None) -> bool:
    """Save an information XML file of a given `CioPath`.

    Empty ``fields``, ``sharings`` and ``only4groups`` sections are dropped
    from a copy of the document before saving. If nothing remains, an
    existing information file is deleted instead of being written.

    :type  root_elt: lxml.etree.Element
    :param root_elt:
        Root of the XML document.
    :param warehouse_root:
        Absolute path to the warehouse root.
    :type  ciopath: .lib.ciopath.CioPath
    :param ciopath:
        The current `CioPath`.
    :type  request: pyramid.request.Request
    :param request: (optional)
        Current request. When given, a validation error is logged with it
        and flashed in its session; otherwise it goes to the module logger.
    :rtype: bool
    :return:
        ``True`` if the file has been written or an obsolete file has been
        removed, ``False`` otherwise.
    """
    abs_info = ciopath.absolute_info(warehouse_root)
    if abs_info is None:
        return False

    # Clean up XML (on a re-parsed copy of the document)
    namespaces = {'ns': RELAXNG_CIOWAREHOUSE2['namespace']}
    serialized = etree.tostring(
        root_elt, pretty_print=True, encoding='utf-8', xml_declaration=True)
    tree = etree.ElementTree(etree.XML(serialized))
    for section in ('fields', 'sharings', 'only4groups'):
        found = tree.xpath(f'ns:info/ns:{section}', namespaces=namespaces)
        if found and not found[0].xpath('*'):
            found[0].getparent().remove(found[0])

    # Nothing left to store
    if not tree.xpath('ns:info/*', namespaces=namespaces):
        if not exists(abs_info):
            return False
        remove(abs_info)
        return True

    # Validate XML
    err = validate_xml(tree, relaxng4validation(RELAXNG_CIOWAREHOUSE2))
    if err is not None:  # pragma: nocover
        if request is None:
            LOG.error(err)
        else:
            log_error(request, err)
            request.session.flash(err, 'alert')
        return False

    # Save XML
    makedirs(dirname(abs_info), exist_ok=True)
    try:
        tree.write(
            abs_info,
            pretty_print=True,
            encoding='utf-8',
            xml_declaration=True)
    except IOError:  # pragma: nocover
        return False
    return True
# =============================================================================
def info_get_field(request: Request, root_elt: etree.Element, field_id: str):
    """Return the value of a field in its correct type or ``None``.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  root_elt: lxml.etree.Element
    :param root_elt:
        Root of the XML document.
    :param str field_id:
        ID of the metadata field to retrieve.
    :return:
        Converted value of the first matching field, or ``None`` if the
        field is unknown to the registry or absent from the document.
    """
    fields = request.registry['fields']
    if field_id not in fields:
        return None

    namespaces = {'ns': RELAXNG_CIOWAREHOUSE2['namespace']}
    # ``field_id`` has just been checked against the registry, so it is a
    # configured ID and not free-form input.
    field_elt = root_elt.xpath(
        f'ns:info/ns:fields/ns:field[@id="{field_id}"]',
        namespaces=namespaces)
    if not field_elt:
        return None

    # NOTE(review): the conversion type is read from 'field_type' here,
    # whereas info_read() uses 'display' -- confirm this is intended.
    # An empty XML element has ``text`` set to ``None``: treat it as an empty
    # string instead of failing on ``None.strip()``.
    return convert_value(
        fields[field_id]['field_type'], (field_elt[0].text or '').strip())
# =============================================================================
def info_set_fields(root_elt: etree.Element, fields: list):
    """Update metadata fields.

    :type  root_elt: lxml.etree.Element
    :param root_elt:
        Root of the XML document. Its first child receives the ``fields``
        container when it does not exist yet.
    :param list fields:
        List of metadata fields. Each item is a tuple such as
        ``(field_id, value, display)``. A ``None`` value removes the field.
    """
    namespace = RELAXNG_CIOWAREHOUSE2['namespace']
    namespaces = {'ns': namespace}
    field_tag = f'{{{namespace}}}field'

    # Find or create the <fields> container
    found = root_elt.xpath('ns:info/ns:fields', namespaces=namespaces)
    if found:
        fields_elt = found[0]
    else:
        fields_elt = etree.SubElement(root_elt[0], f'{{{namespace}}}fields')

    for item in fields:
        field_id = item[0]
        existing = fields_elt.xpath(
            f'ns:field[@id="{field_id}"]', namespaces=namespaces)
        value = item[1]

        # Deletion and "strings" fields both start from a clean slate
        if value is None or item[2] == 'strings':
            for stale in existing:
                stale.getparent().remove(stale)
            if value is None:
                continue
            # "strings" field: one element per line
            for line in value.replace('\r', '').split('\n'):
                line_elt = etree.SubElement(fields_elt, field_tag)
                line_elt.set('id', field_id)
                line_elt.text = line.strip()
            continue

        # Other fields: reuse the first element or create it
        if existing:
            target = existing[0]
        else:
            target = etree.SubElement(fields_elt, field_tag)
        target.set('id', field_id)
        target.text = value2str(value)
# =============================================================================
def info_get_sharings(root_elt: etree.Element | None) -> set[str]:
    """Return the set of sharing IDs.

    :type  root_elt: lxml.etree.Element
    :param root_elt:
        Root of the XML document or ``None``.
    :rtype: set
    :return:
        Stripped text of every ``sharing`` element; an empty set if
        ``root_elt`` is ``None``. Elements without text are ignored.
    """
    if root_elt is None:
        return set()

    namespaces = {'ns': RELAXNG_CIOWAREHOUSE2['namespace']}
    sharings = set()
    for elt in root_elt.xpath(
            'ns:info/ns:sharings/ns:sharing', namespaces=namespaces):
        # An empty XML element has ``text`` set to ``None``: skip it instead
        # of failing on ``None.strip()``.
        sharing_id = (elt.text or '').strip()
        if sharing_id:
            sharings.add(sharing_id)
    return sharings
# =============================================================================
def info_add_sharing(root_elt: etree.Element, sharing_id: str) -> bool:
    """Add a sharing ID.

    :type  root_elt: lxml.etree.Element
    :param root_elt:
        Root of the XML document. Its first child receives the ``sharings``
        container when it does not exist yet.
    :param str sharing_id:
        Sharing ID.
    :rtype: bool
    :return:
        ``True`` if the sharing has been added, ``False`` if it was already
        present.
    """
    namespace = RELAXNG_CIOWAREHOUSE2['namespace']
    namespaces = {'ns': namespace}

    # The ID is passed as an XPath variable rather than interpolated in the
    # expression, so that a quote in it cannot break or alter the query.
    already_there = root_elt.xpath(
        'ns:info/ns:sharings/ns:sharing[normalize-space()=$sharing_id]',
        namespaces=namespaces, sharing_id=sharing_id)
    if already_there:
        return False

    sharings_elt = root_elt.xpath('ns:info/ns:sharings', namespaces=namespaces)
    if not sharings_elt:
        sharings_elt = etree.SubElement(
            root_elt[0], f'{{{namespace}}}sharings')
    else:
        sharings_elt = sharings_elt[0]
    etree.SubElement(sharings_elt, f'{{{namespace}}}sharing').text = sharing_id
    return True
# =============================================================================
def info_remove_sharing(root_elt: etree.Element, sharing_id: str):
    """Remove a sharing ID.

    Only the first matching ``sharing`` element is removed; nothing happens
    if the ID is not found.

    :type  root_elt: lxml.etree.Element
    :param root_elt:
        Root of the XML document.
    :param str sharing_id:
        Sharing ID.
    """
    namespaces = {'ns': RELAXNG_CIOWAREHOUSE2['namespace']}
    # The ID is passed as an XPath variable rather than interpolated in the
    # expression, so that a quote in it cannot break or alter the query.
    sharing_elt = root_elt.xpath(
        'ns:info/ns:sharings/ns:sharing[normalize-space()=$sharing_id]',
        namespaces=namespaces, sharing_id=sharing_id)
    if sharing_elt:
        sharing_elt[0].getparent().remove(sharing_elt[0])
# =============================================================================
def info_get_only4groups(root_elt: etree.Element | None) -> set[str] | None:
    """Return the set of authorized groups or ``None`` if there is no
    restriction.

    :type  root_elt: lxml.etree.Element
    :param root_elt:
        Root of the XML document or ``None``.
    :rtype: set
    :return:
        ``None`` if ``root_elt`` is ``None`` or has no ``only4groups``
        element; otherwise the stripped text of every ``only4group``
        element. Elements without text are ignored.
    """
    if root_elt is None:
        return None

    namespaces = {'ns': RELAXNG_CIOWAREHOUSE2['namespace']}
    only4groups_elt = root_elt.xpath(
        'ns:info/ns:only4groups', namespaces=namespaces)
    if not only4groups_elt:
        return None

    only4groups = set()
    for elt in only4groups_elt[0].xpath(
            'ns:only4group', namespaces=namespaces):
        # An empty XML element has ``text`` set to ``None``: skip it instead
        # of failing on ``None.strip()``.
        group_id = (elt.text or '').strip()
        if group_id:
            only4groups.add(group_id)
    return only4groups
# =============================================================================
def value2str(value) -> str:
    """Convert a value to a string for XML.

    Booleans become ``'true'`` / ``'false'``, numbers use ``str()``, dates
    and datetimes use ISO format (datetimes lose their microseconds and
    time zone). Anything else goes through ``tostr`` with carriage returns
    removed.

    :param value:
        Value to convert.
    :rtype: str
    """
    # ``bool`` must be tested before ``int`` since it is a subclass of it
    if isinstance(value, bool):
        return ('false', 'true')[value]
    if isinstance(value, (int, float)):
        return str(value)
    # ``datetime`` is a subclass of ``date``: normalize it, then share the
    # ISO formatting
    if isinstance(value, date):
        if isinstance(value, datetime):
            value = value.replace(microsecond=0, tzinfo=None)
        return value.isoformat()
    return tostr(value).replace('\r', '')