"""Warehouse class."""
from __future__ import annotations
from logging import getLogger
from os import sep, scandir, makedirs, rmdir, remove
from os.path import exists, join, abspath, isdir, dirname, basename
from os.path import getmtime, splitext, normpath, relpath, commonpath
from shutil import rmtree, copy
from json import loads
from time import time
from threading import Thread
from configparser import ConfigParser
from sqlalchemy import Column
from pyramid.request import Request
from chrysalio.lib.utils import decrypt
from chrysalio.lib.utils import copy_content
from chrysalio.lib.i18n import translate_field
from chrysalio.lib.log import log_error, log_warning
from chrysalio.helpers.builder import Builder
from chrysalio.lib.attachment import attachment_url
from cioservice.models.dbjob import DBJob
from ..models.dbwarehouse import CORE_DISPLAY, DBWarehouse
from ..models.dbwarehouse import default_fields
from .utils import CIOWAREHOUSE2_NS, CACHE_REGION_USER, EXCLUDED_FILES
from .utils import normalize_filename, file_move_check, cache_user_seeds
from .seeder import Seeder
from .ciopath import LOCAL_DIR, LOCKS_DIR, CioPath
from .vcs_none import VcsNone
from .vcs_git import VcsGit
from .i18n import _, translate
# Logger of this module
LOG = getLogger(name=__name__)
# Path, relative to a warehouse root, of the marker file whose modification
# time records the last refresh of the warehouse
REFRESHED_FILE = join(LOCAL_DIR, 'refreshed')
# =============================================================================
class Warehouse():
    """Class to manage a warehouse.

    :param dict registry:
        Application registry.
    :type dbwarehouse: .models.dbwarehouse.DBWarehouse
    :param dbwarehouse:
        SQLAlchemy object representing the warehouse.
    :param dict locations:
        Dictionary of locations. The warehouse root is the directory named
        after the warehouse ID inside ``locations[dbwarehouse.location]``.
    """
    # pylint: disable = too-many-instance-attributes

    # -------------------------------------------------------------------------
    def __init__(self, registry, dbwarehouse: DBWarehouse, locations: dict):
        """Constructor method."""
        self.uid = str(dbwarehouse.warehouse_id)
        self.created = time()
        self.root = abspath(join(locations[dbwarehouse.location], self.uid))
        self._i18n_label = loads(str(dbwarehouse.i18n_label))
        # A missing description becomes an empty dictionary, which
        # ``description()`` already handles, instead of crashing here
        self._i18n_description = dict(dbwarehouse.i18n_description or {})

        # Vcs
        if dbwarehouse.vcs:
            password = decrypt(dbwarehouse.vcs_password, 'warehouse')
            # A stored password that cannot be decrypted is replaced by a
            # placeholder (presumably so it is not taken for "no password")
            password = '-' if dbwarehouse.vcs_password and not password \
                else password
            self.vcs: VcsGit | VcsNone = VcsGit(
                self.uid, self.root, dbwarehouse, password)
        else:
            self.vcs = VcsNone(self.uid, self.root, dbwarehouse)

        # File name normalization mode: ``str(None)`` would give the truthy
        # string 'None', so an unset mode is mapped to '' (no normalization)
        self._normalize = str(dbwarehouse.normalize or '')
        self.access = dbwarehouse.access

        # Max size of a download
        self.download_max_size = dbwarehouse.download_max_size

        # Lock and refresh
        self.lock_ttl = dbwarehouse.lock_ttl
        refreshed_file = join(self.root, REFRESHED_FILE)
        self._refresh_period = dbwarehouse.refresh_period
        # The modification time of the marker file is the last refresh time
        self._refreshed = getmtime(refreshed_file) \
            if exists(refreshed_file) else 0

        # Rendering directory
        self.rendering_dir = self._dir2abs_path(
            registry, dbwarehouse.rendering_dir)

        # Card and list fields
        known = registry['fields']
        self.cardfields = self._display_fields(
            known, dbwarehouse.cardfields, 'in_cards')
        self.listfields = self._display_fields(
            known, dbwarehouse.listfields, 'in_list')

        # Metadata fields
        fields = [
            dbitem.field_id for dbitem in dbwarehouse.metafields
            if dbitem.field_id in known
            and 0 <= known[dbitem.field_id]['in_meta']
            and not known[dbitem.field_id]['core']]
        self.metafields = tuple(fields) \
            if fields else default_fields(known, 'in_meta')

        # Seeds
        self._seed_ids = \
            ['directory'] + [k.seed_id for k in dbwarehouse.seeds]

        # Jobs
        self._job_ids = [k.job_id for k in dbwarehouse.jobs]
        self._jobs: dict[str, dict] = {}

    # -------------------------------------------------------------------------
    @staticmethod
    def _display_fields(known: dict, dbitems, key: str) -> tuple:
        """Return the ``(field_id, classes)`` tuples to display.

        A field of ``dbitems`` is kept if it is known and its ``key`` value
        is not negative. It must also be either a core display field or a
        stored, visible, non-core field. If no field is kept, the default
        fields for ``key`` are returned.

        :param dict known:
            Registered fields, indexed by field ID.
        :param dbitems:
            Database items with ``field_id`` and ``classes`` attributes.
        :param str key:
            Display criterion (``'in_cards'`` or ``'in_list'``).
        :rtype: tuple
        """
        fields = []
        for dbitem in dbitems:
            if dbitem.field_id not in known:
                continue
            field = known[dbitem.field_id]
            if field[key] < 0:
                continue
            if dbitem.field_id in CORE_DISPLAY or (
                    field['stored'] and not field['hidden']
                    and not field['core']):
                fields.append((dbitem.field_id, dbitem.classes))
        return tuple(fields) if fields else default_fields(known, key)

    # -------------------------------------------------------------------------
    def label(self, request: Request) -> str:
        """Return a translated label, defaulting to the warehouse ID.

        :type request: pyramid.request.Request
        :param request:
            Current request.
        :rtype: str
        """
        return translate_field(request, self._i18n_label, self.uid)

    # -------------------------------------------------------------------------
    def description(self, request: Request) -> str:
        """Return a translated description, or an empty string if none.

        :type request: pyramid.request.Request
        :param request:
            Current request.
        :rtype: str
        """
        if not self._i18n_description:
            return ''
        return translate_field(request, self._i18n_description)

    # -------------------------------------------------------------------------
    def must_refresh(self, now: bool = False, force: bool = False) -> bool:
        """Check if the warehouse must be refreshed.

        :param bool now: (default=False)
            If ``True``, a refresh is always required.
        :param bool force: (default=False)
            If ``True``, a refresh is always required.
        :rtype: bool
        :return:
            ``True`` if ``now`` or ``force`` is set. Otherwise, ``True`` if a
            refresh period (in minutes) is defined and has elapsed since the
            last refresh.
        """
        return now or force or bool(
            self._refresh_period
            and self._refreshed + 60 * self._refresh_period < time())

    # -------------------------------------------------------------------------
    def refresh(
            self,
            request: Request,
            ciopaths: list[CioPath] | set[CioPath] | None = None,
            now: bool = False,
            force: bool = False,
            in_thread: bool = False) -> str | None:
        """Call the backend for refreshing the index.

        If ``ciopaths`` is ``None``, do a full refresh. An empty or ``None``
        ``ciopaths`` is only processed if :meth:`must_refresh` returns
        ``True``, and it records the refresh time.

        :type request: pyramid.request.Request
        :param request:
            Current request.
        :param list ciopaths: (optional)
            List of `CioPath` to refresh.
        :param bool now: (default=False)
            If ``True``, force refreshing even if the deadline is not reached.
        :param bool force: (default=False)
            If ``True``, force refreshing even if the source is older than the
            index.
        :param bool in_thread: (default=False)
            Launch the refresh in a thread.
        :rtype: str
        :return:
            The error returned by the backend. ``None`` on success, when
            there is nothing to do, or when the refresh runs in a thread.
        """
        # Nothing to do
        if not ciopaths and not self.must_refresh(now, force):
            return None

        # Record the time of last refresh
        if not ciopaths:
            self.refreshed()

        # Call directly...
        if not in_thread:
            return self._refresh(request, ciopaths, force)

        # ...or in a thread
        thread = Thread(
            target=self._refresh,
            name=f'{self.uid}:refresh',
            args=(request, ciopaths, force))
        thread.start()
        return None

    # -------------------------------------------------------------------------
    def _refresh(
            self, request: Request,
            ciopaths: list[CioPath] | set[CioPath] | None,
            force: bool) -> str | None:
        """Private function to call the backend for refreshing the index.

        For a full refresh (``ciopaths`` is ``None``), the VCS is pulled
        first. Then, if the working copy is dirty and a user is logged in,
        the changes are added and committed as "Maintenance".

        :type request: pyramid.request.Request
        :param request:
            Current request.
        :param list ciopaths:
            List of `CioPath` to refresh or ``None`` for a full refresh.
        :param bool force:
            If ``True``, force refreshing even if the source is older than the
            index.
        :rtype: str
        :return:
            The error returned by the backend or ``None``.
        """
        if ciopaths is None:
            self.vcs.pull()
            if self.vcs.is_dirty() and 'user' in request.session:
                self.vcs.add()
                self.vcs.commit(
                    request.localizer.translate(_('Maintenance')),
                    request.session['user']['name'],
                    request.session['user']['email'])

        paths = [k.path for k in ciopaths] if ciopaths is not None else None
        err = request.registry['modules']['ciowarehouse2'].backend.index(
            self, paths=paths, force=force)
        if err is not None:  # pragma: nocover
            self.error(err, request)
            if ciopaths:
                # The partial refresh failed: remove the marker file of the
                # last refresh (presumably so the index is seen as stale)
                try:
                    remove(join(self.root, REFRESHED_FILE))
                except OSError:  # pragma: nocover
                    pass
            return err
        return None

    # -------------------------------------------------------------------------
    def refreshed(self):
        """Set the refreshed flag.

        Record the current time and touch the marker file. The constructor
        reads that file's modification time back.
        """
        self._refreshed = time()
        refreshed_file = join(self.root, REFRESHED_FILE)
        makedirs(dirname(refreshed_file), exist_ok=True)
        with open(refreshed_file, 'wb'):
            pass

    # -------------------------------------------------------------------------
    def to_refresh(self, inputs: list[str], paths: list[str]) -> list[CioPath]:
        """Compute an optimized list of `CioPath` to refresh.

        Without inputs, only the first component of each path is kept.
        Otherwise, each path is reduced to its first component below the
        directory shared by the inputs. If the path climbs out of that
        directory, it is reduced to the ancestor it reaches.

        :param list inputs:
            List of absolute paths to the input files.
        :param list paths:
            List of relative paths to the output files.
        :rtype: list
        """
        if not inputs:
            return CioPath.from_paths(
                self.uid, self.root, [k.partition(sep)[0] for k in paths])

        root = relpath(
            commonpath(inputs) if len(inputs) > 1 else dirname(inputs[0]),
            self.root)
        path_set = set()
        for path in paths:
            chunks = relpath(path, root).split(sep)
            depth = [k for k in chunks if k == '..']
            # First chunk after the '..' ones, or the last chunk if none
            index = min(len(depth), len(chunks) - 1)
            path_set.add(normpath(join(root, sep.join(depth), chunks[index])))
        return CioPath.from_paths(self.uid, self.root, path_set)

    # -------------------------------------------------------------------------
    def commit_and_refresh(
            self,
            request: Request,
            ciopaths: list[CioPath] | set[CioPath],
            message: str,
            force: bool = False,
            in_thread: bool = False):
        """Commit new files and refresh the warehouse.

        A commit error is flashed to the user but does not prevent the
        refresh.

        :type request: pyramid.request.Request
        :param request:
            Current request.
        :param list ciopaths:
            List of `CioPath`.
        :type message: pyramid.i18n.TranslationString
        :param message:
            Commit message.
        :param bool force: (default=False)
            If ``True``, force refreshing even if the source is older than the
            index.
        :param bool in_thread: (default=False)
            Launch the refresh in a thread.
        """
        # VCS
        err = self.vcs.commit(
            request.localizer.translate(message),
            request.session['user']['name'], request.session['user']['email'])
        if err:  # pragma: nocover
            request.session.flash(err, 'alert')

        # Refresh
        self.refresh(
            request, ciopaths, now=True, force=force, in_thread=in_thread)

    # -------------------------------------------------------------------------
    def file_trail(self, request: Request, ciopath: CioPath) -> str:
        """Return a HTML trail of the given `CioPath`.

        :type request: pyramid.request.Request
        :param request:
            Current request.
        :type ciopath: .lib.ciopath.CioPath
        :param ciopath:
            `CioPath` of the current directory.
        :rtype: str
        :return:
            An empty string if the `CioPath` belongs to another warehouse.
        """
        if ciopath.wid != self.uid:
            return ''
        root_label = request.localizer.translate(_('Root of the warehouse'))
        if not ciopath or ciopath.is_root():
            return Builder().span(
                ' ',
                title=root_label,
                class_='cioIconButton cioButtonWarehouse cioOff')

        html_path = Builder().a(
            Builder().span(' ', class_='cioIconButton cioButtonWarehouse'),
            href=request.route_path('browse', ciopath=self.uid),
            title=root_label,
            class_='cioTrailChunk',
            data_ciopath=CioPath(self.uid))
        list_path = ciopath.path.split(sep)
        # Every ancestor is a link, the last chunk is plain text
        for index, chunk in enumerate(list_path[:-1]):
            path = '/'.join(list_path[:index + 1])
            html_path += ' / ' + Builder().a(
                chunk,
                href=request.route_path(
                    'browse', ciopath=f'{self.uid}/{path}'),
                data_ciopath=CioPath(self.uid, path, True),
                class_='cioTrailChunk')
        html_path += ' / ' + Builder().span(list_path[-1])
        return html_path

    # -------------------------------------------------------------------------
    def file_normalize(
            self, file_name: str, is_dir: bool = False) -> str | None:
        """Return a normalized file name.

        Any leading path (``/`` or ``\\`` separated) is stripped first.

        :param str file_name:
            Name to normalize.
        :param bool is_dir: (default=False)
            ``True`` if the file is a directory.
        :rtype: str
        :return:
            The normalized name, or ``None`` if the file is in the excluded
            file list.
        """
        if '\\' in file_name:
            file_name = file_name.split('\\')[-1]
        file_name = basename(file_name)
        if file_name in EXCLUDED_FILES:
            return None
        if self._normalize:
            file_name = normalize_filename(file_name, self._normalize, is_dir)
        return file_name

    # -------------------------------------------------------------------------
    def move_from(
            self,
            from_warehouse: Warehouse,
            from_ciopath: CioPath,
            to_ciopath: CioPath,
            overwrite_ok: bool = False,
            mode: str = 'move') -> tuple[CioPath | None, str | None]:
        """Move or copy a file from another warehouse.

        The file (or directory content) is copied together with its
        information file(s). It is then added to this warehouse's VCS and,
        in ``'move'`` mode, removed from the source warehouse's VCS.

        :type from_warehouse: .lib.warehouse.Warehouse
        :param from_warehouse:
            Source warehouse object.
        :type from_ciopath: .lib.ciopath.CioPath
        :param from_ciopath:
            `CioPath` of the source file.
        :type to_ciopath: .lib.ciopath.CioPath
        :param to_ciopath:
            `CioPath` of the destination file.
        :param bool overwrite_ok: (default=False)
            If ``True``, silently overwrite the destination file.
        :param str mode: (``'move'``, ``'copy'``, ``'rename'``)
            The way the move must operate.
        :rtype: tuple
        :return:
            A tuple such as ``(final_to_ciopath, error)``.
        """
        # Prepare files
        if from_warehouse.uid != from_ciopath.wid \
                or self.uid != to_ciopath.wid:
            return None, _('File outside the warehouse')
        final_ciopath, err = file_move_check(
            from_warehouse.root, from_ciopath, self.root, to_ciopath,
            overwrite_ok, mode)
        if final_ciopath is None:
            return final_ciopath, err

        # Copy file
        action = copy_content if from_ciopath.is_directory() else copy
        abs_path = final_ciopath.absolute_path(self.root)
        if abs_path:
            makedirs(dirname(abs_path), exist_ok=True)
            action(
                from_ciopath.absolute_path(from_warehouse.root) or '.',
                abs_path)

        # Copy information file
        from_info = from_ciopath.absolute_info(from_warehouse.root)
        to_info = final_ciopath.absolute_info(self.root)
        if from_info and to_info and exists(from_info):
            makedirs(dirname(to_info), exist_ok=True)
            copy(from_info, to_info)
        # For a directory, the information files of its content live in a
        # directory named like its own information file without extension
        if from_info and from_ciopath.is_directory():
            from_info = splitext(from_info)[0]
            if isdir(from_info) and to_info:
                to_info = splitext(to_info)[0]
                copy_content(from_info, to_info)

        # Add/remove
        self.vcs.add(final_ciopath)
        if mode == 'move':
            from_warehouse.vcs.remove(from_ciopath)
        return final_ciopath, None

    # -------------------------------------------------------------------------
    def lock(
            self,
            name: str,
            ciopath: CioPath | None = None,
            relock: bool = False) -> tuple[bool, str]:
        """Lock a file, a directory or the whole warehouse with a name.

        A lock is a file containing the name of its owner. It is alive while
        its modification time is less than ``lock_ttl`` seconds old.

        :param str name:
            Name of the responsible of the lock.
        :type ciopath: .lib.ciopath.CioPath
        :param ciopath: (optional)
            `CioPath` of the file to lock. By default, the whole warehouse.
        :param bool relock: (default=False)
            If ``True`` update the date/time of the lock.
        :rtype: tuple
        :return:
            A tuple ``(locked, locker)``, which is one of:
            ``(True, name)`` on success;
            ``(False, <current owner>)`` if a live lock already exists;
            ``(False, 'error')`` if the `CioPath` is not valid for this
            warehouse.
        """
        if ciopath is None:
            ciopath = CioPath(self.uid)
        lock_file = ciopath.absolute_lock(self.root)
        lock_warehouse = CioPath(self.uid).absolute_lock(self.root)
        if not lock_file or ciopath.wid != self.uid or not lock_warehouse:
            return False, 'error'

        # A live lock on the whole warehouse blocks any other lock
        if not ciopath.is_root():
            locker = self._active_locker(lock_warehouse)
            if locker is not None:
                return False, locker

        # A live lock on the same file blocks, unless it is being renewed
        if not relock:
            locker = self._active_locker(lock_file)
            if locker is not None:
                return False, locker

        try:
            makedirs(dirname(lock_file), exist_ok=True)
            with open(lock_file, 'w', encoding='utf8') as hdl:
                hdl.write(name)
        except OSError as exc:  # pragma: nocover
            # Best effort: the lock is granted even if the file cannot be
            # written, but the failure is reported
            LOG.warning('Cannot write lock file "%s": %s', lock_file, exc)
        return True, name

    # -------------------------------------------------------------------------
    def _active_locker(self, lock_file: str) -> str | None:
        """Return the owner of a lock if this lock is still alive.

        :param str lock_file:
            Absolute path to the lock file.
        :rtype: str
        :return:
            The content of ``lock_file`` if it was modified less than
            ``lock_ttl`` seconds ago, ``None`` otherwise.
        """
        try:
            if getmtime(lock_file) + self.lock_ttl <= time():
                return None
            with open(lock_file, 'r', encoding='utf8') as hdl:
                return hdl.read()
        except OSError:
            # No lock file, or it vanished in the meantime
            return None

    # -------------------------------------------------------------------------
    def unlock(self, ciopath: CioPath | None = None):
        """Unlock a file, a directory or the whole warehouse.

        The lock file is removed. Then the directories left empty are removed
        as well, walking up toward the warehouse root.

        :type ciopath: .lib.ciopath.CioPath
        :param ciopath: (optional)
            `CioPath` of the file to unlock. By default, the whole warehouse.
        """
        if ciopath is None:
            ciopath = CioPath(self.uid)
        lock_file = ciopath.absolute_lock(self.root)
        if not lock_file or ciopath.wid != self.uid:
            return
        if exists(lock_file):
            try:
                remove(lock_file)
            except OSError:  # pragma: nocover
                pass

        lock_dir = dirname(lock_file)
        if not exists(lock_dir):
            return
        # Never walk above the warehouse root
        while lock_dir != self.root and self._is_inside(lock_dir, self.root):
            try:
                with scandir(lock_dir) as entries:
                    if next(entries, None) is not None:
                        break
                rmdir(lock_dir)
            except OSError:  # pragma: nocover
                break
            lock_dir = dirname(lock_dir)

    # -------------------------------------------------------------------------
    def unlock_all(self):
        """Remove lock directory."""
        lock_dir = join(self.root, LOCKS_DIR)
        if exists(lock_dir):
            rmtree(lock_dir)

    # -------------------------------------------------------------------------
    @cache_user_seeds(CIOWAREHOUSE2_NS, CACHE_REGION_USER)
    def seeds(self, request: Request) -> tuple:
        """Return a tuple of available seeds for this warehouse.

        :type request: pyramid.request.Request
        :param request:
            Current request.
        :rtype: tuple
        :return:
            A tuple of tuples such as
            ``(seed_id, icon, i18n_label, seeder_id)``.
        """
        i18n_seeds = (
            Seeder.i18n_seed(request, seed_id) for seed_id in self._seed_ids)
        return tuple(k for k in i18n_seeds if k is not None)

    # -------------------------------------------------------------------------
    def jobs(self, request: Request) -> dict:
        """Return a dictionary of available jobs for this warehouse.

        The configuration file is only read if at least one job of this
        warehouse is not yet cached.

        :type request: pyramid.request.Request
        :param request:
            Current request.
        :rtype: dict
        """
        missing = [k for k in self._job_ids if k not in self._jobs]
        if missing:
            config = self._read_config(request)
            for job_id in missing:
                self.job(request, job_id, config)
        return self._jobs

    # -------------------------------------------------------------------------
    def job(
            self,
            request: Request,
            job_id: str,
            config: ConfigParser | None = None) -> dict | None:
        """Return a dictionary representing the job (cached on success).

        :type request: pyramid.request.Request
        :param request:
            Current request.
        :param str job_id:
            Job ID.
        :type config: configparser.ConfigParser
        :param config: (optional)
            Configuration parser based on application configuration file.
        :rtype: dict
        :return:
            A dictionary with keys ``'job_id'``, ``'i18n_label'``,
            ``'i18n_description'``, ``'icon'``, ``'service_id'``,
            ``'context'``, ``'access'``, ``'threaded'``, ``'ttl'``,
            ``'priority'``, ``'users'``, ``'groups'`` and ``'settings'``.
            If a problem occurs, it returns ``None``.
        """
        if job_id in self._jobs:
            return self._jobs[job_id]
        dbjob = request.dbsession.query(DBJob).filter_by(
            job_id=job_id).first()
        if dbjob is None:
            return None
        service = request.registry['services'].get(dbjob.service) \
            if 'services' in request.registry else None
        if service is None:
            log_error(request, request.localizer.translate(  # yapf: disable
                _('Service "${s}" is not available.', {'s': dbjob.service})))
            return None

        if config is None:
            config = self._read_config(request)
        section = f'Job:{job_id}'
        if config.has_section(section):
            settings = dict(config.items(section))
        else:
            # Copy so the settings do not alias the parser's own defaults
            settings = dict(config.defaults())

        self._jobs[job_id] = {  # yapf: disable
            'job_id': job_id,
            'i18n_label': loads(dbjob.i18n_label),
            'i18n_description': dbjob.i18n_description,
            'icon': attachment_url(
                request, dbjob.attachments_dir, dbjob.attachments_key,
                dbjob.icon),
            'service_id': service.uid,
            'context': dbjob.context,
            'access': dbjob.access,
            'threaded': dbjob.threaded,
            'ttl': dbjob.ttl,
            'priority': dbjob.priority,
            'users': tuple(k.user_id for k in dbjob.users),
            'groups': {k.group_id for k in dbjob.groups},
            'settings': settings
        }
        return self._jobs[job_id]

    # -------------------------------------------------------------------------
    @staticmethod
    def _read_config(request: Request) -> ConfigParser:
        """Return a parser loaded with the application configuration file.

        The parser has a ``here`` default set to the directory of this file.

        :type request: pyramid.request.Request
        :param request:
            Current request.
        :rtype: configparser.ConfigParser
        """
        config_file = request.registry.settings['__file__']
        config = ConfigParser({'here': dirname(config_file)})
        config.read(config_file, encoding='utf8')
        return config

    # -------------------------------------------------------------------------
    @staticmethod
    def _is_inside(path: str, root: str) -> bool:
        """Return ``True`` if ``path`` is ``root`` or lies under it.

        Unlike a string prefix test, ``/a/b12`` is not inside ``/a/b1``.

        :param str path:
            Path to check.
        :param str root:
            Normalized root path.
        :rtype: bool
        """
        try:
            return commonpath((path, root)) == root
        except ValueError:
            # Mix of absolute and relative paths (or different drives)
            return False

    # -------------------------------------------------------------------------
    def _dir2abs_path(self, registry, directory: Column) -> str | None:
        """Return an absolute path to the directory if it exists.

        ``directory`` is either a `CioPath` as a string or a path relative to
        this warehouse root. A `CioPath` is resolved in the first location
        containing its warehouse. In both cases, the result must stay inside
        the corresponding warehouse root.

        :param dict registry:
            Application registry.
        :param directory:
            Relative path to a local directory or a CioPath as a string.
        :rtype: str
        :return:
            The path, or ``None`` if it does not exist or escapes its
            warehouse.
        """
        if not directory:
            return None

        # Local directory
        ciopath = CioPath.from_str(str(directory))
        if not ciopath:
            abs_path = normpath(join(self.root, str(directory)))
            return abs_path \
                if self._is_inside(abs_path, self.root) and isdir(abs_path) \
                else None

        # CioPath
        locations = registry['modules']['ciowarehouse2'].locations
        for location in locations.values():
            root = normpath(join(location, ciopath.wid))  # type: ignore
            if isdir(root):
                abs_path = ciopath.absolute_path(root)  # type: ignore
                return abs_path \
                    if abs_path is not None \
                    and self._is_inside(normpath(abs_path), root) \
                    and isdir(abs_path) else None
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def error(cls, message: str, request: Request | None = None):
        """Log an error message.

        :param str message:
            Error message.
        :type request: pyramid.request.Request
        :param request: (optional)
            Current request or ``None`` if called by populate script.
        """
        if request is None:
            LOG.error(translate(message))
        else:
            log_error(request, translate(message, lang='en'))

    # -------------------------------------------------------------------------
    @classmethod
    def warning(cls, message: str, request: Request | None = None):
        """Log a warning message.

        :param str message:
            Warning message.
        :type request: pyramid.request.Request
        :param request: (optional)
            Current request or ``None`` if called by populate script.
        """
        if request is None:
            LOG.warning(translate(message))
        else:
            log_warning(request, translate(message, lang='en'))