"""File information class."""
from __future__ import annotations
from logging import getLogger
from os import remove, makedirs
from os.path import isfile, exists, dirname
from datetime import date, datetime
from lxml import etree
from pyramid.request import Request
from chrysalio.lib.xml import load_xml2, validate_xml, relaxng4validation
from chrysalio.lib.log import log_error
from chrysalio.lib.utils import convert_value, tostr
from ..relaxng import RELAXNG_CIOWAREHOUSE2
from .ciopath import CioPath
from .warehouse import Warehouse
LOG = getLogger(__name__)
# =============================================================================
[docs]
def info_read(
request: Request,
warehouse: Warehouse,
ciopath: CioPath,
metafields: list | tuple | None = None
) -> tuple[dict, set[str], set[str] | None]:
"""Return fields, sharings and only4groups for the given path.
:type request: pyramid.request.Request
:param request:
Current request or ``None`` if called by a script.
:type warehouse: .lib.warehouse.Warehouse
:param warehouse:
Current warehouse object.
:type CioPath: .lib.ciopath.CioPath
:param CioPath:
`CioPath` of the file.
:param list metafields: (optional)
List of metadata field IDS to retrieve. If not defined, return all
metadata fields of the warehouse.
:rtype: tuple
:return:
A tuple such as ``(metadata, sharings, only4groups)``.
"""
root_elt = info_get_root(warehouse.root, ciopath)
if root_elt is None:
return {}, set(), None
namespaces = {'ns': RELAXNG_CIOWAREHOUSE2['namespace']}
fields = request.registry['fields']
if metafields is None:
metafields = warehouse.metafields
metadata = {}
for field_id in metafields:
if field_id not in warehouse.metafields or field_id not in fields:
continue
elts = root_elt.xpath(
f'ns:info/ns:fields/ns:field[@id="{field_id}"]',
namespaces=namespaces)
if elts:
if fields[field_id]['display'] == 'strings':
metadata[field_id] = '\n'.join([k.text.strip() for k in elts])
else:
metadata[field_id] = convert_value(
fields[field_id]['display'], elts[0].text.strip())
return \
metadata, info_get_sharings(root_elt), info_get_only4groups(root_elt)
# =============================================================================
[docs]
def info_get_root(
warehouse_root: str,
ciopath: CioPath,
remove_fields: bool = False,
remove_only4groups: bool = False) -> etree.Element | None:
"""Load or create the information XML structure of a given `CioPath`.
:param warehouse_root:
Absolute path to the warehouse root.
:type ciopath: .lib.ciopaht.CioPath
:param ciopath:
The current `CioPath`.
:param bool remove_fields:
if ``True`` metadata fields are removed.
:param bool remove_only4groups:
if ``True`` authorized groups are removed.
:rtype: lxml.etree._Element
:return:
Root element of the information XML file.
"""
abs_info = ciopath.absolute_info(warehouse_root)
if abs_info is None:
return None
namespace = RELAXNG_CIOWAREHOUSE2['namespace']
root_elt = None
if isfile(abs_info):
root_elt, err = load_xml2(
abs_info,
relaxng4validation(RELAXNG_CIOWAREHOUSE2),
parser=etree.XMLParser(remove_blank_text=True))
root_elt = root_elt.getroot() if err is None else None
if root_elt is not None and remove_fields:
elt = root_elt.xpath('ns:info/ns:fields', namespaces={'ns': namespace})
if elt:
elt[0].getparent().remove(elt[0])
if root_elt is not None and remove_only4groups:
elt = root_elt.xpath(
'ns:info/ns:only4groups', namespaces={'ns': namespace})
if elt:
elt[0].getparent().remove(elt[0])
if root_elt is None:
root_elt = etree.Element(
RELAXNG_CIOWAREHOUSE2['root'],
version=RELAXNG_CIOWAREHOUSE2['version'],
nsmap={None: namespace})
etree.SubElement(root_elt, f'{{{namespace}}}info')
return root_elt
# =============================================================================
[docs]
def info_save(
root_elt: etree.Element,
warehouse_root: str,
ciopath: CioPath,
request: Request | None = None) -> bool:
"""Save an information XML file of a given `CioPath`.
:type root_elt: lxml.etree.Element
:param root_elt:
Root of the XML document.
:param warehouse_root:
Absolute path to the warehouse root.
:type ciopath: .lib.ciopaht.CioPath
:param ciopath:
The current `CioPath`.
:type request: pyramid.request.Request
:param request: (optional)
Current request.
:rtype: bool
"""
abs_info = ciopath.absolute_info(warehouse_root)
if abs_info is None:
return False
# Clean up XML
namespaces = {'ns': RELAXNG_CIOWAREHOUSE2['namespace']}
tree = etree.ElementTree(etree.XML(etree.tostring( # yapf: disable
root_elt, pretty_print=True, encoding='utf-8',
xml_declaration=True)))
elt = tree.xpath('ns:info/ns:fields', namespaces=namespaces)
if elt and not elt[0].xpath('*'):
elt[0].getparent().remove(elt[0])
elt = tree.xpath('ns:info/ns:sharings', namespaces=namespaces)
if elt and not elt[0].xpath('*'):
elt[0].getparent().remove(elt[0])
elt = tree.xpath('ns:info/ns:only4groups', namespaces=namespaces)
if elt and not elt[0].xpath('*'):
elt[0].getparent().remove(elt[0])
if not tree.xpath('ns:info/*', namespaces=namespaces):
if exists(abs_info):
remove(abs_info)
return True
return False
# Validate XML
err = validate_xml(tree, relaxng4validation(RELAXNG_CIOWAREHOUSE2))
if err is not None: # pragma: nocover
if request is not None:
log_error(request, err)
request.session.flash(err, 'alert')
else:
LOG.error(err)
return False
# Save XML
makedirs(dirname(abs_info), exist_ok=True)
try:
tree.write( # yapf: disable
abs_info, pretty_print=True, encoding='utf-8',
xml_declaration=True)
except IOError: # pragma: nocover
return False
return True
# =============================================================================
[docs]
def info_get_field(request: Request, root_elt: etree.Element, field_id: str):
"""Return the value of a field in its correct type or ``None``.
:type request: pyramid.request.Request
:param request:
Current request.
:type root_elt: lxml.etree.Element
:param root_elt:
Root of the XML document.
:param str meta_id:
ID of the metadata to retrieve.
:return:
Value of field or ``None``.
"""
if field_id not in request.registry['fields']:
return None
namespaces = {'ns': RELAXNG_CIOWAREHOUSE2['namespace']}
field_elt = root_elt.xpath(
f'ns:info/ns:fields/ns:field[@id="{field_id}"]', namespaces=namespaces)
if not field_elt:
return None
return convert_value(
request.registry['fields'][field_id]['field_type'],
field_elt[0].text.strip())
# =============================================================================
[docs]
def info_set_fields(root_elt: etree.Element, fields: list):
"""Update metadata fields.
:type root_elt: lxml.etree.Element
:param root_elt:
Root of the XML document.
:param list fields:
List of metadata fields. Each item is a tuple such as
``(field_id, value, display)``.
"""
namespace = RELAXNG_CIOWAREHOUSE2['namespace']
namespaces = {'ns': namespace}
fields_elt = root_elt.xpath('ns:info/ns:fields', namespaces=namespaces)
if not fields_elt:
fields_elt = etree.SubElement(root_elt[0], f'{{{namespace}}}fields')
else:
fields_elt = fields_elt[0]
for field in fields:
elts = fields_elt.xpath(
f'ns:field[@id="{field[0]}"]', namespaces=namespaces)
if field[1] is None:
for elt in elts:
elt.getparent().remove(elt)
continue
# "strings" field
if field[2] == 'strings':
for elt in elts:
elt.getparent().remove(elt)
for value in field[1].replace('\r', '').split('\n'):
elt = etree.SubElement(fields_elt, f'{{{namespace}}}field')
elt.set('id', field[0])
elt.text = value.strip()
continue
# other fields
if not elts:
elts = (etree.SubElement(fields_elt, f'{{{namespace}}}field'), )
elts[0].set('id', field[0])
elts[0].text = value2str(field[1])
# =============================================================================
[docs]
def info_get_sharings(root_elt: etree.Element | None) -> set[str]:
"""Return a list of sharing IDs.
:type root_elt: lxml.etree.Element
:param root_elt:
Root of the XML document.
:rtype: set
"""
if root_elt is None:
return set()
sharings = set()
namespaces = {'ns': RELAXNG_CIOWAREHOUSE2['namespace']}
for elt in root_elt.xpath('ns:info/ns:sharings/ns:sharing',
namespaces=namespaces):
sharings.add(elt.text.strip())
return sharings
# =============================================================================
[docs]
def info_add_sharing(root_elt: etree.Element, sharing_id: str) -> bool:
"""Add a sharing ID.
:type root_elt: lxml.etree.Element
:param root_elt:
Root of the XML document.
:param str sharing_id:
Sharing ID.
:rtype: bool
"""
namespace = RELAXNG_CIOWAREHOUSE2['namespace']
namespaces = {'ns': namespace}
sharing_elt = root_elt.xpath(
f'ns:info/ns:sharings/ns:sharing[normalize-space()="{sharing_id}"]',
namespaces=namespaces)
if sharing_elt:
return False
sharings_elt = root_elt.xpath('ns:info/ns:sharings', namespaces=namespaces)
if not sharings_elt:
sharings_elt = etree.SubElement(
root_elt[0], f'{{{namespace}}}sharings')
else:
sharings_elt = sharings_elt[0]
etree.SubElement(sharings_elt, f'{{{namespace}}}sharing').text = sharing_id
return True
# =============================================================================
[docs]
def info_remove_sharing(root_elt: etree.Element, sharing_id: str):
"""Remove a sharing ID.
:type root_elt: lxml.etree.Element
:param root_elt:
Root of the XML document.
:param str sharing_id:
Sharing ID.
"""
namespaces = {'ns': RELAXNG_CIOWAREHOUSE2['namespace']}
sharing_elt = root_elt.xpath(
f'ns:info/ns:sharings/ns:sharing[normalize-space()="{sharing_id}"]',
namespaces=namespaces)
if sharing_elt:
sharing_elt[0].getparent().remove(sharing_elt[0])
# =============================================================================
[docs]
def info_get_only4groups(root_elt: etree.Element | None) -> set[str] | None:
"""Return a list of authorized groups or ``None`` if there is no
restriction.
:type root_elt: lxml.etree.Element
:param root_elt:
Root of the XML document.
:rtype: set
"""
if root_elt is None:
return None
namespaces = {'ns': RELAXNG_CIOWAREHOUSE2['namespace']}
only4groups_elt = root_elt.xpath(
'ns:info/ns:only4groups', namespaces=namespaces)
if not only4groups_elt:
return None
only4groups_elt = only4groups_elt[0]
only4groups = set()
for elt in only4groups_elt.xpath('ns:only4group', namespaces=namespaces):
only4groups.add(elt.text.strip())
return only4groups
# =============================================================================
[docs]
def value2str(value) -> str:
"""Convert a value to a string for XML.
:param value:
Value to convert.
:rtype: str
"""
if isinstance(value, bool):
return 'true' if value else 'false'
if isinstance(value, (int, float)):
return str(value)
if isinstance(value, datetime):
return value.replace(microsecond=0, tzinfo=None).isoformat()
if isinstance(value, date):
return value.isoformat()
return tostr(value).replace('\r', '')