# Source code for ciowarehouse2.inputs.rule_basic
"""Class for basic input rule."""
from __future__ import annotations
from os import makedirs
from os.path import join, normpath, exists, isfile, basename, dirname
from logging import getLogger
from shutil import copy
from re import compile as re_compile, error as re_error, sub as re_sub
from collections import OrderedDict
from functools import reduce
from chrysalio.lib.utils import copy_content
from chrysalio.scripts import ScriptRegistry
from cioservice.lib.build import Build
from ..lib.i18n import _, translate
from ..lib.utils import EXCLUDED_FILES
from ..lib.ciopath import CioPath
from ..lib.file_info import info_get_root, info_save, info_set_fields
from ..lib.warehouse import Warehouse
from ..models.dbinput import DBInputRule
LOG = getLogger(__name__)
# =============================================================================
# [docs] -- Sphinx "view source" link marker, not part of the module code.
class InputRuleBasic():
    """Class to manage basic input rule.

    :type  registry: chrysalio.scripts.ScriptRegistry
    :param registry:
        Application registry.
    :param get_warehouse:
        Function to retrieve a warehouse. It is called as
        ``get_warehouse(build, warehouse_id)`` and a ``None`` result makes
        the rule fail.
    :type  dbrule: .models.dbinput.DBInputRule
    :param dbrule:
        SqlAlchemy object to build this rule.

    ``self.variables`` is an ordered dictionary whose keys are variable names
    and whose values are tuples such as
    ``(key, value_regex, transform, args)`` where ``args`` is a list of
    ``[pattern, replacement]`` pairs used by the ``replace`` transformation.

    ``self.conditions`` is a list of conditions. Each condition is a tuple such
    as ``(key, value_regex)``.
    """

    # -------------------------------------------------------------------------
    def __init__(
            self, registry: ScriptRegistry, get_warehouse,
            dbrule: DBInputRule):
        """Constructor method."""
        self._get_warehouse = get_warehouse
        self._backend = registry['modules']['ciowarehouse2'].backend
        self._fields = registry['fields']
        self.exclusive = dbrule.exclusive

        # Variables: a variable with an invalid regular expression (either
        # its own pattern or one of its replace patterns) is logged and
        # dropped.
        variables = []
        for dbvariable in dbrule.variables:
            try:
                pattern = re_compile(dbvariable.pattern) \
                    if dbvariable.pattern else None
                args = self._parse_args(dbvariable.args)
            except re_error as err:
                LOG.error(err)
                continue
            variables.append((str(dbvariable.name), (
                str(dbvariable.key) if dbvariable.key else None,
                pattern, dbvariable.transform, args)))
        self.variables = OrderedDict(variables)

        # Conditions: a condition with an invalid pattern is logged and
        # dropped.
        self.conditions = []
        for dbcondition in dbrule.conditions:
            try:
                self.conditions.append((
                    str(dbcondition.key),
                    re_compile(dbcondition.pattern)
                    if dbcondition.pattern else None))
            except re_error as err:
                LOG.error(err)

        self.warehouse_id = dbrule.warehouse_id
        self.path = dbrule.path
        self.flat = dbrule.flat

    # -------------------------------------------------------------------------
    @staticmethod
    def _parse_args(args: str | None) -> list:
        """Parse the arguments of a transformation.

        :param str args:
            White space separated list of ``pattern=replacement`` items.
        :rtype: list
        :return:
            A list of ``[pattern, replacement]`` pairs.
        :raise re.error:
            If one of the patterns is not a valid regular expression.
        """
        pairs = []
        for item in (args or '').split():
            # Split on the first '=' only so the replacement may contain '='.
            pair = item.split('=', 1)
            if len(pair) != 2:
                LOG.warning(
                    'Ignoring malformed transform argument "%s".', item)
                continue
            # Validate now so a broken pattern does not fail during apply().
            re_compile(pair[0])
            pairs.append(pair)
        return pairs

    # -------------------------------------------------------------------------
    @staticmethod
    def _is_inside(root: str, path: str) -> bool:
        """Check if ``path`` is ``root`` itself or is located under it.

        The comparison is made on normalized paths and on a path separator
        boundary so that ``/data/wh-other`` is not seen as inside
        ``/data/wh``.

        :param str root:
            Path of the reference directory.
        :param str path:
            Path to check.
        :rtype: bool
        """
        root = normpath(root)
        path = normpath(path)
        # join(root, '') appends a trailing separator when needed.
        return path == root or path.startswith(join(root, ''))

    # -------------------------------------------------------------------------
    def apply(self, build: Build, document: dict) -> bool:
        """Check if the document matches with this rule and possibly send the
        document to its destination.

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build object.
        :param dict document:
            Current document.
        :rtype: bool
        :return:
            ``False`` if a condition fails, if the destination cannot be
            computed or if the warehouse is not found; otherwise the result
            of :meth:`document2destination`.
        """
        # Values: computed in order so that a variable can use a previous one
        # as its key
        values = document.copy()
        for name, variable in self.variables.items():
            key, pattern = variable[0], variable[1]
            if key is None or key not in values:
                continue
            if pattern is None:
                values[name] = self._transform(name, values[key])
                continue
            matches = pattern.search(values[key])
            if matches and matches.groups():
                # A group which did not participate counts as an empty string
                values[name] = self._transform(
                    name, ''.join(matches.groups('')))

        # Conditions: the key must exist and, according to the pattern, the
        # value must be non-empty or must match
        for key, pattern in self.conditions:
            if key not in values:
                return False
            if pattern is None:
                if not values[key]:
                    return False
            elif pattern.search(values[key]) is None:
                return False

        # Destination: an unknown variable or a malformed template makes the
        # rule fail
        try:
            warehouse_id = self.warehouse_id.format(**values)
            path = normpath((self.path or '.').format(**values))
        except (KeyError, IndexError, ValueError) as error:
            build.error(error)
            return False
        warehouse = self._get_warehouse(build, warehouse_id)
        if warehouse is None:
            return False

        return self.document2destination(
            build, values, document, warehouse, path)

    # -------------------------------------------------------------------------
    def document2destination(
            self, build: Build, values: dict, document: dict,
            warehouse: Warehouse, path: str) -> bool:
        """Copy the document into the destination warehouse.

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build object.
        :param dict values:
            Values of the variables.
        :param dict document:
            Current document.
        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Object for destination warehouse.
        :param str path:
            Path of the document inside the warehouse.
        :rtype: bool
        :return:
            ``True`` if the document has been imported.
        """
        # Check document
        source = document.get('source')
        if not source:
            return False
        source_is_file = isfile(source)
        if not source_is_file and not exists(source):
            # Nothing to copy: do not commit nor report an import
            build.error(translate(  # yapf: disable
                _('Source ${s} does not exist!', {'s': source}),
                lang=build.lang))
            return False

        # Check destination directory
        directory_ciopath = CioPath(
            warehouse.uid,
            join(path, '.' if self.flat else dirname(document['path'])), True)
        directory_abspath = directory_ciopath.absolute_path(warehouse.root)
        if directory_abspath is None or \
           not self._is_inside(warehouse.root, directory_abspath):
            build.error(translate(  # yapf: disable
                _('Directory ${d} is outside warehouse!', {'d': path}),
                lang=build.lang))
            return False

        # Import document
        makedirs(directory_abspath, exist_ok=True)
        if source_is_file:
            copy(source, directory_abspath)
            ciopath = CioPath(
                warehouse.uid,
                join(directory_ciopath.path, basename(source)))
        else:
            copy_content(source, directory_abspath, EXCLUDED_FILES)
            ciopath = CioPath(
                warehouse.uid,
                join(directory_ciopath.path, basename(source)),
                True)

        # Update information file
        self._update_info(warehouse, ciopath, values)

        # Commit and refresh the warehouse
        self._commit(build, document, warehouse, directory_ciopath)
        return True

    # -------------------------------------------------------------------------
    def _update_info(
            self, warehouse: Warehouse, ciopath: CioPath, values: dict):
        """Update information file.

        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Object for destination warehouse.
        :param ciopath:
            `CioPath` of the destination file.
        :param dict values:
            Values of the variables.
        """
        # Retrieve field values: only the values whose name is a metadata
        # field of the warehouse are kept
        fields = [
            (field_id, value, self._fields[field_id]['display'])
            for field_id, value in values.items()
            if field_id in warehouse.metafields]
        if not fields:
            return

        # Load or create information file
        root_elt = info_get_root(warehouse.root, ciopath)
        if root_elt is None:  # pragma: nocover
            LOG.error(_('Incorrect file ${f}!', {'f': ciopath}))
            return

        # Set and save
        info_set_fields(root_elt, fields)
        info_save(root_elt, warehouse.root, ciopath)

    # -------------------------------------------------------------------------
    def _commit(
            self, build: Build, document: dict, warehouse: Warehouse,
            directory_ciopath: CioPath):
        """Commit and refresh the warehouse.

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build object.
        :param dict document:
            Current document.
        :type  warehouse: .lib.warehouse.Warehouse
        :param warehouse:
            Object for destination warehouse.
        :param directory_ciopath:
            `CioPath` of the destination directory.
        """
        # The same origin is used for the commit and for the report; it may
        # be missing from the document
        origin = document.get('from', '')

        err = warehouse.vcs.add(directory_ciopath)
        if err is not None:  # pragma: nocover
            LOG.error(err)
        err = warehouse.vcs.commit(
            translate(_('Automatic import'), lang=build.lang), origin)
        if err:  # pragma: nocover
            build.error(err)
        else:
            build.info(translate(_(  # yapf: disable
                '"${s}" imported from ${f} into ${t}.', {
                    's': basename(document['source']),
                    'f': origin,
                    't': directory_ciopath
                }), lang=build.lang))

        err = self._backend.index(warehouse, [directory_ciopath.path])
        if err is not None:  # pragma: nocover
            LOG.error(err)

    # -------------------------------------------------------------------------
    def _transform(self, name: str, value: str) -> str:
        """Apply the transformation (capitalize, lower, upper or replace) of
        the variable named ``name`` to the ``value``.

        :param str name:
            Name of the variable.
        :param str value:
            Value to transform.
        :rtype: str
        :return:
            The transformed value or the value itself if there is no
            transformation, if it is unknown or if ``replace`` has no
            argument.
        """
        transform, args = self.variables[name][2], self.variables[name][3]
        if transform is None:
            return value
        if transform == 'capitalize':
            return value.capitalize()
        if transform == 'upper':
            return value.upper()
        if transform == 'lower':
            return value.lower()
        if transform == 'replace' and args:
            # Substitutions are chained in the order of the arguments
            return reduce(
                lambda text, pair: re_sub(pair[0], pair[1], text), args,
                value)
        return value  # pragma: nocover