Source code for ciowarehouse2.inputs.rule_basic

"""Class for basic input rule."""

from __future__ import annotations
from os import makedirs
from os.path import join, normpath, exists, isfile, basename, dirname
from logging import getLogger
from shutil import copy
from re import compile as re_compile, error as re_error, sub as re_sub
from collections import OrderedDict
from functools import reduce

from chrysalio.lib.utils import copy_content
from chrysalio.scripts import ScriptRegistry
from cioservice.lib.build import Build
from ..lib.i18n import _, translate
from ..lib.utils import EXCLUDED_FILES
from ..lib.ciopath import CioPath
from ..lib.file_info import info_get_root, info_save, info_set_fields
from ..lib.warehouse import Warehouse
from ..models.dbinput import DBInputRule

LOG = getLogger(__name__)


# =============================================================================
[docs] class InputRuleBasic(): """Class to manage basic input rule. :type registry: chrysalio.scripts.ScriptRegistry :param registry: Application registry. :param get_warehouse: Function to retrieve a warehouse. :type dbrule: .models.dbinput.DBInputRule :param dbrule: SqlAlchemy object to build this rule. ``self.conditions`` is a list of conditions. Each condition is a tuple such as ``(key, value_regex)``. """ # ------------------------------------------------------------------------- def __init__( self, registry: ScriptRegistry, get_warehouse, dbrule: DBInputRule): """Constructor method.""" self._get_warehouse = get_warehouse self._backend = registry['modules']['ciowarehouse2'].backend self._fields = registry['fields'] self.exclusive = dbrule.exclusive variables = [] for dbvariable in dbrule.variables: try: variables.append((str(dbvariable.name), ( # yapf: disable str(dbvariable.key) if dbvariable.key else None, re_compile(dbvariable.pattern) if dbvariable.pattern else None, dbvariable.transform, [ k.split('=') for k in (dbvariable.args or '').split() ]))) except re_error as err: LOG.error(err) self.variables = OrderedDict(variables) self.conditions = [] for dbcondition in dbrule.conditions: try: self.conditions.append(( # yapf: disable str(dbcondition.key), re_compile(dbcondition.pattern) if dbcondition.pattern else None)) except re_error as err: LOG.error(err) self.warehouse_id = dbrule.warehouse_id self.path = dbrule.path self.flat = dbrule.flat # -------------------------------------------------------------------------
[docs] def apply(self, build: Build, document: dict) -> bool: """Check if the document matches with this rule and possibly send the document to its destination. :type build: cioservice.lib.build.Build :param build: Current build object. :param dict document: Current document. :rtype: bool """ # Values values = document.copy() for name, variable in self.variables.items(): key = variable[0] if key is None: # pragma: nocover continue if variable[1] is not None: matches = variable[1].search(values[key]) if matches and matches.groups(): values[name] = self._transform( name, ''.join(matches.groups())) else: values[name] = self._transform( name, values[self.variables[name][0]]) # Conditions for condition in self.conditions: if condition[0] not in values \ or (condition[1] is None and not values[condition[0]]) \ or (condition[1] is not None and condition[1].search( values[condition[0]])) is None: return False # Destination try: warehouse_id = self.warehouse_id.format(**values) path = normpath((self.path or '.').format(**values)) except KeyError as error: build.error(error) return False warehouse = self._get_warehouse(build, warehouse_id) if warehouse is None: return False return self.document2destination( build, values, document, warehouse, path)
# -------------------------------------------------------------------------
[docs] def document2destination( self, build: Build, values: dict, document: dict, warehouse: Warehouse, path: str) -> bool: """Copy the document into the destination warehouse. :type build: cioservice.lib.build.Build :param build: Current build object. :param dict values: Values of the variables. :param dict document: Current document. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Object for destination warehouse. :param str path: Path of the document inside the warehouse. :rtype: bool """ # Check document and destination directory if not document.get('source'): return False directory_ciopath = CioPath( warehouse.uid, join(path, '.' if self.flat else dirname(document['path'])), True) directory_abspath = directory_ciopath.absolute_path(warehouse.root) if directory_abspath is None or \ not normpath(directory_abspath).startswith(warehouse.root): build.error(translate( # yapf: disable _('Directory ${d} is outside warehouse!', {'d': path}), lang=build.lang)) return False # Import document makedirs(directory_abspath, exist_ok=True) ciopath = CioPath(None) if isfile(document['source']): copy(document['source'], directory_abspath) ciopath = CioPath( warehouse.uid, join(directory_ciopath.path, basename(document['source']))) elif exists(document['source']): copy_content(document['source'], directory_abspath, EXCLUDED_FILES) ciopath = CioPath( warehouse.uid, join(directory_ciopath.path, basename(document['source'])), True) # Update information file self._update_info(warehouse, ciopath, values) # Commit and refresh the warehouse self._commit(build, document, warehouse, directory_ciopath) return True
# ------------------------------------------------------------------------- def _update_info( self, warehouse: Warehouse, ciopath: CioPath, values: dict): """Update information file. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Object for destination warehouse. :param ciopath: `CioPath` of the destination file. :param dict values: Values of the variables. """ # Retrieve field values fields = [] for field_id, value in values.items(): if field_id in warehouse.metafields: fields.append( (field_id, value, self._fields[field_id]['display'])) if not fields: return # Load or create information file root_elt = info_get_root(warehouse.root, ciopath) if root_elt is None: # pragma: nocover LOG.error(_('Incorrect file ${f}!', {'f': ciopath})) return # Set and save info_set_fields(root_elt, fields) info_save(root_elt, warehouse.root, ciopath) # ------------------------------------------------------------------------- def _commit( self, build: Build, document: dict, warehouse: Warehouse, directory_ciopath: CioPath): """Commit and refresh the warehouse. :type build: cioservice.lib.build.Build :param build: Current build object. :param dict document: Current document. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Object for destination warehouse. :param directory_ciopath: `CioPath` of the destination directory. """ err = warehouse.vcs.add(directory_ciopath) if err is not None: # pragma: nocover LOG.error(err) err = warehouse.vcs.commit( translate(_('Automatic import'), lang=build.lang), document.get('from', '')) if err: # pragma: nocover build.error(err) else: build.info(translate(_( # yapf: disable '"${s}" imported from ${f} into ${t}.', { 's': basename(document['source']), 'f': document['from'], 't': directory_ciopath }), lang=build.lang)) err = self._backend.index(warehouse, [directory_ciopath.path]) if err is not None: # pragma: nocover LOG.error(err) # ------------------------------------------------------------------------- def _transform(self, name: str, value: str) -> str: """Apply the transformation (capitalize, lower or upper) of the variable named ``name`` to the ``value``. :param str name: Name of the variable. :param str value: Value to transform. :rtype: str """ if self.variables[name][2] is None: return value if self.variables[name][2] == 'capitalize': return value.capitalize() if self.variables[name][2] == 'upper': return value.upper() if self.variables[name][2] == 'lower': return value.lower() if self.variables[name][2] == 'replace' and self.variables[name][3]: return reduce( lambda a, kv: re_sub(kv[0], kv[1], a), self.variables[name][3], value) return value # pragma: nocover