Source code for ciowarehouse2.lib.warehouse

"""Warehouse class."""

from __future__ import annotations
from logging import getLogger
from os import sep, scandir, makedirs, rmdir, remove
from os.path import exists, join, abspath, isdir, dirname, basename
from os.path import getmtime, splitext, normpath, relpath, commonpath
from shutil import rmtree, copy
from json import loads
from time import time
from threading import Thread
from configparser import ConfigParser

from sqlalchemy import Column

from pyramid.request import Request

from chrysalio.lib.utils import decrypt
from chrysalio.lib.utils import copy_content
from chrysalio.lib.i18n import translate_field
from chrysalio.lib.log import log_error, log_warning
from chrysalio.helpers.builder import Builder
from chrysalio.lib.attachment import attachment_url
from cioservice.models.dbjob import DBJob
from ..models.dbwarehouse import CORE_DISPLAY, DBWarehouse
from ..models.dbwarehouse import default_fields
from .utils import CIOWAREHOUSE2_NS, CACHE_REGION_USER, EXCLUDED_FILES
from .utils import normalize_filename, file_move_check, cache_user_seeds
from .seeder import Seeder
from .ciopath import LOCAL_DIR, LOCKS_DIR, CioPath
from .vcs_none import VcsNone
from .vcs_git import VcsGit
from .i18n import _, translate

# Module logger, used by Warehouse.error() and Warehouse.warning() when no
# request is available
LOG = getLogger(__name__)
# Path, relative to a warehouse root, of the marker file whose modification
# time records the last full refresh of the warehouse index
REFRESHED_FILE = join(LOCAL_DIR, 'refreshed')


# =============================================================================
class Warehouse():
    """Class to manage a warehouse.

    :param dict registry:
        Application registry.
    :type dbwarehouse: .models.dbwarehouse.DBWarehouse
    :param dbwarehouse:
        SQLAlchemy object representing the warehouse
    :param dict locations:
        Dictionary of locations.
    """
    # pylint: disable = too-many-instance-attributes

    # -------------------------------------------------------------------------
    def __init__(self, registry, dbwarehouse: DBWarehouse, locations: dict):
        """Constructor method."""
        self.uid = str(dbwarehouse.warehouse_id)
        self.created = time()
        self.root = abspath(join(locations[dbwarehouse.location], self.uid))
        self._i18n_label = loads(str(dbwarehouse.i18n_label))
        # A warehouse without description must not make dict() fail on None
        self._i18n_description = dict(dbwarehouse.i18n_description or {})

        # Vcs
        if dbwarehouse.vcs:
            password = decrypt(dbwarehouse.vcs_password, 'warehouse')
            # A password is stored but decrypt() gave nothing: use a dummy one
            password = '-' if dbwarehouse.vcs_password and not password \
                else password
            self.vcs: VcsGit | VcsNone = VcsGit(
                self.uid, self.root, dbwarehouse, password)
        else:
            self.vcs = VcsNone(self.uid, self.root, dbwarehouse)

        # File name normalization mode: str(None) would give the truthy
        # string 'None' and defeat the "if self._normalize" test of
        # file_normalize()
        self._normalize = str(dbwarehouse.normalize) \
            if dbwarehouse.normalize else ''
        self.access = dbwarehouse.access

        # Max size of a download
        self.download_max_size = dbwarehouse.download_max_size

        # Lock and refresh
        self.lock_ttl = dbwarehouse.lock_ttl
        refreshed_file = join(self.root, REFRESHED_FILE)
        self._refresh_period = dbwarehouse.refresh_period
        self._refreshed = getmtime(refreshed_file) \
            if exists(refreshed_file) else 0

        # Rendering directory
        self.rendering_dir = self._dir2abs_path(
            registry, dbwarehouse.rendering_dir)

        # Card and list fields
        known = registry['fields']
        self.cardfields = self._display_fields(
            known, dbwarehouse.cardfields, 'in_cards')
        self.listfields = self._display_fields(
            known, dbwarehouse.listfields, 'in_list')

        # Metadata fields
        fields = [
            dbitem.field_id for dbitem in dbwarehouse.metafields
            if dbitem.field_id in known
            and 0 <= known[dbitem.field_id]['in_meta']
            and not known[dbitem.field_id]['core']]
        self.metafields = tuple(fields) \
            if fields else default_fields(known, 'in_meta')

        # Seeds
        self._seed_ids = \
            ['directory'] + [k.seed_id for k in dbwarehouse.seeds]

        # Jobs
        self._job_ids = [k.job_id for k in dbwarehouse.jobs]
        self._jobs: dict[str, dict] = {}

    # -------------------------------------------------------------------------
    @staticmethod
    def _display_fields(known: dict, dbitems, position: str) -> tuple:
        """Return the fields to display in cards or in list.

        A field is kept if it is known, has a non-negative ``position`` value
        and is either in ``CORE_DISPLAY`` or stored, not hidden and not a
        core field.

        :param dict known:
            Dictionary of known fields (``registry['fields']``).
        :param dbitems:
            Field records of the warehouse, with ``field_id`` and ``classes``
            attributes.
        :param str position:
            ``'in_cards'`` or ``'in_list'``.
        :rtype: tuple
        :return:
            A tuple of ``(field_id, classes)`` or, if no field is selected,
            the default fields for ``position``.
        """
        # pylint: disable = too-many-boolean-expressions
        fields = []
        for dbitem in dbitems:
            if dbitem.field_id not in known:
                continue
            field = known[dbitem.field_id]
            if 0 <= field[position] and (
                    dbitem.field_id in CORE_DISPLAY or (
                        field['stored'] and not field['hidden']
                        and not field['core'])):
                fields.append((dbitem.field_id, dbitem.classes))
        return tuple(fields) if fields else default_fields(known, position)

    # -------------------------------------------------------------------------
    def label(self, request: Request) -> str:
        """Return a translated label.

        :type request: pyramid.request.Request
        :param request:
            Current request.
        :rtype: str
        :return:
            The label translated by ``translate_field``. The warehouse ID is
            passed as last argument (presumably a fallback value).
        """
        return translate_field(request, self._i18n_label, self.uid)

    # -------------------------------------------------------------------------
    def description(self, request: Request) -> str:
        """Return a translated description.

        :type request: pyramid.request.Request
        :param request:
            Current request.
        :rtype: str
        :return:
            The translated description or an empty string if the warehouse
            has no description.
        """
        if not self._i18n_description:
            return ''
        return translate_field(request, self._i18n_description)

    # -------------------------------------------------------------------------
    def must_refresh(self, now: bool = False, force: bool = False) -> bool:
        """Check if the warehouse must be refreshed.

        :param bool now: (default=False)
            If ``True``, the answer is always ``True``.
        :param bool force: (default=False)
            If ``True``, the answer is always ``True``.
        :rtype: bool
        :return:
            ``True`` if forced or if a refresh period (in minutes) is set and
            has elapsed since the last recorded refresh.
        """
        return now or force or bool(
            self._refresh_period
            and self._refreshed + 60 * self._refresh_period < time())

    # -------------------------------------------------------------------------
    def refresh(
            self,
            request: Request,
            ciopaths: list[CioPath] | set[CioPath] | None = None,
            now: bool = False,
            force: bool = False,
            in_thread: bool = False) -> str | None:
        """Call the backend for refreshing the index.

        If ``ciopaths`` is empty or ``None``, do a full refresh, but only if
        it is due (see :meth:`must_refresh`), and record its time.

        :type request: pyramid.request.Request
        :param request:
            Current request.
        :param list ciopaths: (optional)
            List of `CioPath` to refresh.
        :param bool now: (default=False)
            If ``True``, force refreshing even if the deadline is not reached.
        :param bool force: (default=False)
            If ``True``, force refreshing even if the source is older than the
            index.
        :param bool in_thread: (default=False)
            Launch the refresh in a thread.
        :rtype: str
        :return:
            The error of the backend or ``None``. Always ``None`` when nothing
            is done or when the refresh runs in a thread.
        """
        # Nothing to do
        if not ciopaths and not self.must_refresh(now, force):
            return None

        # Record the time of last refresh
        if not ciopaths:
            self.refreshed()

        # Call directly...
        if not in_thread:
            return self._refresh(request, ciopaths, force)

        # ...or in a thread
        # NOTE(review): the thread keeps using ``request`` after this method
        # returns; confirm it stays usable once the response is sent.
        thread = Thread(
            target=self._refresh,
            name=f'{self.uid}:refresh',
            args=(request, ciopaths, force))
        thread.start()
        return None

    # -------------------------------------------------------------------------
    def _refresh(
            self,
            request: Request,
            ciopaths: list[CioPath] | set[CioPath] | None,
            force: bool) -> str | None:
        """Private function to call the backend for refreshing the index.

        For a full refresh (``ciopaths`` is ``None``), the VCS is pulled
        first. If the working copy is then dirty and a user is in session,
        the changes are committed with a "Maintenance" message.

        :type request: pyramid.request.Request
        :param request:
            Current request.
        :param list ciopaths:
            List of `CioPath` to refresh or ``None`` for a full refresh.
        :param bool force:
            If ``True``, force refreshing even if the source is older than the
            index.
        :rtype: str
        :return:
            The error returned by the backend or ``None``.
        """
        if ciopaths is None:
            self.vcs.pull()
            if self.vcs.is_dirty() and 'user' in request.session:
                self.vcs.add()
                self.vcs.commit(
                    request.localizer.translate(_('Maintenance')),
                    request.session['user']['name'],
                    request.session['user']['email'])

        paths = [k.path for k in ciopaths] if ciopaths is not None else None
        err = request.registry['modules']['ciowarehouse2'].backend.index(
            self, paths=paths, force=force)
        if err is not None:  # pragma: nocover
            self.error(err, request)
            # After a failed partial refresh, remove the marker file: a
            # warehouse object created later starts with _refreshed = 0 and
            # is due for a full refresh as soon as a refresh period is set
            if ciopaths:
                try:
                    remove(join(self.root, REFRESHED_FILE))
                except OSError:  # pragma: nocover
                    pass
            return err
        return None

    # -------------------------------------------------------------------------
    def refreshed(self):
        """Set the refreshed flag.

        Update the in-memory time of last refresh and (re)create the marker
        file whose modification time is read back by the constructor.
        """
        self._refreshed = time()
        refreshed_file = join(self.root, REFRESHED_FILE)
        makedirs(dirname(refreshed_file), exist_ok=True)
        with open(refreshed_file, 'wb'):
            pass

    # -------------------------------------------------------------------------
    def to_refresh(self, inputs: list[str], paths: list[str]) -> list[CioPath]:
        """Compute an optimized list of `CioPath` to refresh.

        Without inputs, the first component of each output path is refreshed.
        Otherwise, each output path is reduced to its first component below
        the common directory of the inputs, going up with ``..`` if the output
        is outside this directory.

        :param list inputs:
            List of absolute paths to the input files.
        :param list paths:
            List of relative paths to the output files (presumably relative
            to the warehouse root).
        :rtype: list
        """
        if not inputs:
            return CioPath.from_paths(
                self.uid, self.root, [k.partition(sep)[0] for k in paths])

        root = relpath(
            commonpath(inputs) if len(inputs) > 1 else dirname(inputs[0]),
            self.root)
        path_set = set()
        for path in paths:
            chunks = relpath(path, root).split(sep)
            depth = [k for k in chunks if k == '..']
            # First chunk after the '..' ones, or the last chunk if the path
            # is only made of '..'
            index = min(len(depth), len(chunks) - 1)
            path_set.add(normpath(join(root, sep.join(depth), chunks[index])))
        return CioPath.from_paths(self.uid, self.root, path_set)

    # -------------------------------------------------------------------------
    def commit_and_refresh(
            self,
            request: Request,
            ciopaths: list[CioPath] | set[CioPath],
            message: str,
            force: bool = False,
            in_thread: bool = False):
        """Commit new files and refresh the warehouse.

        A commit error is flashed in the session as an alert; the refresh is
        done anyway, immediately (``now=True``).

        :type request: pyramid.request.Request
        :param request:
            Current request.
        :param list ciopaths:
            List of `CioPath`.
        :type message: pyramid.i18n.TranslationString
        :param message:
            Commit message.
        :param bool force: (default=False)
            If ``True``, force refreshing even if the source is older than the
            index.
        :param bool in_thread: (default=False)
            Launch the refresh in a thread.
        """
        # VCS
        err = self.vcs.commit(
            request.localizer.translate(message),
            request.session['user']['name'],
            request.session['user']['email'])
        if err:  # pragma: nocover
            request.session.flash(err, 'alert')

        # Refresh
        self.refresh(
            request, ciopaths, now=True, force=force, in_thread=in_thread)

    # -------------------------------------------------------------------------
    def file_trail(self, request: Request, ciopath: CioPath) -> str:
        """Return a HTML trail of the given `CioPath`.

        :type request: pyramid.request.Request
        :param request:
            Current request.
        :type ciopath: .lib.ciopath.CioPath
        :param ciopath:
            `CioPath` of the current directory.
        :rtype: str
        :return:
            An empty string if ``ciopath`` belongs to another warehouse.
        """
        if ciopath.wid != self.uid:
            return ''
        root_label = request.localizer.translate(_('Root of the warehouse'))
        if not ciopath or ciopath.is_root():
            return Builder().span(
                ' ', title=root_label,
                class_='cioIconButton cioButtonWarehouse cioOff')

        html_path = Builder().a(
            Builder().span(' ', class_='cioIconButton cioButtonWarehouse'),
            href=request.route_path('browse', ciopath=self.uid),
            title=root_label,
            class_='cioTrailChunk',
            data_ciopath=CioPath(self.uid))
        list_path = ciopath.path.split(sep)
        # One link per ancestor directory; the last chunk is plain text
        for index, chunk in enumerate(list_path[:-1]):
            path = '/'.join(list_path[0:index + 1])
            html_path += ' / ' + Builder().a(
                chunk,
                href=request.route_path(
                    'browse', ciopath=f'{self.uid}/{path}'),
                data_ciopath=CioPath(self.uid, path, True),
                class_='cioTrailChunk')
        html_path += ' / ' + Builder().span(list_path[-1])
        return html_path

    # -------------------------------------------------------------------------
    def file_normalize(
            self, file_name: str, is_dir: bool = False) -> str | None:
        """Return a normalized file name or ``None`` if the file is in the
        excluded file list.

        Only the last component of ``file_name`` is kept (with ``/`` or
        ``\\`` as separator).

        :param str file_name:
            Name to normalize.
        :param bool is_dir: (default=False)
            ``True`` if the file is a directory.
        :rtype: str
        """
        if '\\' in file_name:
            file_name = file_name.split('\\')[-1]
        file_name = basename(file_name)
        if file_name in EXCLUDED_FILES:
            return None
        if self._normalize:
            file_name = normalize_filename(file_name, self._normalize, is_dir)
        return file_name

    # -------------------------------------------------------------------------
    def move_from(
            self,
            from_warehouse: Warehouse,
            from_ciopath: CioPath,
            to_ciopath: CioPath,
            overwrite_ok: bool = False,
            mode: str = 'move') -> tuple[CioPath | None, str | None]:
        """Move or copy a file from an another warehouse.

        The file (or directory) and its information file are copied into this
        warehouse and added to its VCS. In ``'move'`` mode, ``vcs.remove()``
        of the source warehouse is then called on the source.

        :type from_warehouse: .lib.warehouse.Warehouse
        :param from_warehouse:
            Source warehouse object.
        :type from_ciopath: .lib.ciopath.CioPath
        :param from_ciopath:
            `CioPath` of the source file.
        :type to_ciopath: .lib.ciopath.CioPath
        :param to_ciopath:
            `CioPath` of the destination file.
        :param bool overwrite_ok: (default=False)
            If ``True``, silently overwrite the destination file.
        :param str mode: (``'move'``, ``'copy'``, ``'rename'``)
            The way the move must operate.
        :rtype: tuple
        :return:
            A tuple such as ``(final_to_ciopath, error)``.
        """
        # Prepare files
        if from_warehouse.uid != from_ciopath.wid \
                or self.uid != to_ciopath.wid:
            return None, _('File outside the warehouse')
        final_ciopath, err = file_move_check(
            from_warehouse.root, from_ciopath, self.root, to_ciopath,
            overwrite_ok, mode)
        if final_ciopath is None:
            return final_ciopath, err

        # Copy file
        action = copy_content if from_ciopath.is_directory() else copy
        abs_path = final_ciopath.absolute_path(self.root)
        if abs_path:
            makedirs(dirname(abs_path), exist_ok=True)
            # NOTE(review): if the source has no absolute path, '.' (the
            # working directory of the process) becomes the source; presumably
            # unreachable after file_move_check() - confirm
            action(
                from_ciopath.absolute_path(from_warehouse.root) or '.',
                abs_path)

        # Copy information file and, for a directory, information directory
        from_info = from_ciopath.absolute_info(from_warehouse.root)
        to_info = final_ciopath.absolute_info(self.root)
        if from_info and to_info and exists(from_info):
            makedirs(dirname(to_info), exist_ok=True)
            copy(from_info, to_info)
        if from_info and from_ciopath.is_directory():
            from_info = splitext(from_info)[0]
            if isdir(from_info) and to_info:
                to_info = splitext(to_info)[0]
                copy_content(from_info, to_info)

        # Add/remove
        self.vcs.add(final_ciopath)
        if mode == 'move':
            from_warehouse.vcs.remove(from_ciopath)
        return final_ciopath, None

    # -------------------------------------------------------------------------
    def lock(
            self,
            name: str,
            ciopath: CioPath | None = None,
            relock: bool = False) -> tuple[bool, str]:
        """Lock a file, a directory or the whole warehouse with a name.

        :param str name:
            Name of the responsible of the lock.
        :type ciopath: .lib.ciopath.CioPath
        :param ciopath: (optional)
            `CioPath` of the file to lock. By default, the whole warehouse.
        :param bool relock: (default=False)
            If ``True`` update the date/time of the lock.
        :rtype: tuple
        :return:
            A tuple ``(success, name)``. On failure, ``name`` is the content
            of the active lock file or ``'error'`` if the path is invalid.
        """
        if ciopath is None:
            ciopath = CioPath(self.uid)
        lock_file = ciopath.absolute_lock(self.root)
        lock_warehouse = CioPath(self.uid).absolute_lock(self.root)
        if not lock_file or ciopath.wid != self.uid or not lock_warehouse:
            return False, 'error'

        # The whole warehouse is locked and the lock has not expired
        if not ciopath.is_root() and exists(lock_warehouse) and \
                getmtime(lock_warehouse) + self.lock_ttl > time():
            with open(lock_warehouse, 'r', encoding='utf8') as hdl:
                locker = hdl.read()
            return False, locker

        # The file is already locked and the lock has not expired
        if not relock and exists(lock_file) and \
                getmtime(lock_file) + self.lock_ttl > time():
            with open(lock_file, 'r', encoding='utf8') as hdl:
                locker = hdl.read()
            return False, locker

        try:
            makedirs(dirname(lock_file), exist_ok=True)
            with open(lock_file, 'w', encoding='utf8') as hdl:
                hdl.write(name)
        except OSError:  # pragma: nocover
            # NOTE(review): a lock that cannot be written is still reported
            # as acquired (behavior kept) - confirm it is intended
            pass
        return True, name

    # -------------------------------------------------------------------------
    def unlock(self, ciopath: CioPath | None = None):
        """Unlock a file, a directory or the whole warehouse.

        The lock file is removed, then its parent directories that became
        empty, up to (and excluding) the warehouse root.

        :type ciopath: .lib.ciopath.CioPath
        :param ciopath: (optional)
            `CioPath` of the file to unlock. By default, the whole warehouse.
        """
        if ciopath is None:
            ciopath = CioPath(self.uid)
        lock_file = ciopath.absolute_lock(self.root)
        if not lock_file or ciopath.wid != self.uid:
            return

        # Remove the lock file (it may not exist)
        try:
            remove(lock_file)
        except OSError:
            pass

        # Remove the parent directories which became empty
        lock_dir = dirname(lock_file)
        if not exists(lock_dir):
            return
        while lock_dir != self.root:
            try:
                # Only the first entry is needed to know the directory is not
                # empty: do not list it entirely
                with scandir(lock_dir) as entries:
                    if next(entries, None) is not None:
                        break
                rmdir(lock_dir)
            except OSError:  # pragma: nocover
                break
            lock_dir = dirname(lock_dir)

    # -------------------------------------------------------------------------
    def unlock_all(self):
        """Remove lock directory."""
        lock_dir = join(self.root, LOCKS_DIR)
        if exists(lock_dir):
            rmtree(lock_dir)

    # -------------------------------------------------------------------------
    @cache_user_seeds(CIOWAREHOUSE2_NS, CACHE_REGION_USER)
    def seeds(self, request: Request) -> tuple:
        """Return a tuple of available seeds for this warehouse.

        Seeds for which ``Seeder.i18n_seed`` returns ``None`` are skipped.

        :type request: pyramid.request.Request
        :param request:
            Current request.
        :rtype: tuple
        :return:
            A tuple of tuples such as
            ``(seed_id, icon, i18n_label, seeder_id)``.
        """
        seeds = []
        for seed_id in self._seed_ids:
            i18n_seed = Seeder.i18n_seed(request, seed_id)
            if i18n_seed is not None:
                seeds.append(i18n_seed)
        return tuple(seeds)

    # -------------------------------------------------------------------------
    def jobs(self, request: Request) -> dict:
        """Return a dictionary of available jobs for this warehouse.

        The application configuration file is read only if at least one job
        of the warehouse is not cached yet.

        :type request: pyramid.request.Request
        :param request:
            Current request.
        :rtype: dict
        """
        if any(job_id not in self._jobs for job_id in self._job_ids):
            config_file = request.registry.settings['__file__']
            config = ConfigParser({'here': dirname(config_file)})
            config.read(config_file, encoding='utf8')
            for job_id in self._job_ids:
                self.job(request, job_id, config)
        return self._jobs

    # -------------------------------------------------------------------------
    def job(
            self,
            request: Request,
            job_id: str,
            config: ConfigParser | None = None) -> dict | None:
        """Return a dictionary representing the job.

        The result is cached in the warehouse object.

        :type request: pyramid.request.Request
        :param request:
            Current request.
        :param str job_id:
            Job ID.
        :type config: configparser.ConfigParser
        :param config: (optional)
            Configuration parser based on application configuration file.
        :rtype: dict
        :return:
            A dictionary with keys ``'job_id'``, ``'i18n_label'``,
            ``'i18n_description'``, ``'icon'``, ``'service_id'``,
            ``'context'``, ``'access'``, ``'threaded'``, ``'ttl'``,
            ``'priority'``, ``'users'``, ``'groups'`` and ``'settings'``. If
            the job does not exist or its service is not available, it
            returns ``None``.
        """
        if job_id in self._jobs:
            return self._jobs[job_id]

        # Job and its service
        dbjob = request.dbsession.query(DBJob).filter_by(job_id=job_id).first()
        if dbjob is None:
            return None
        service = request.registry['services'].get(dbjob.service) \
            if 'services' in request.registry else None
        if service is None:
            log_error(request, request.localizer.translate(  # yapf: disable
                _('Service "${s}" is not available.', {'s': dbjob.service})))
            return None

        # Settings
        if config is None:
            config_file = request.registry.settings['__file__']
            config = ConfigParser({'here': dirname(config_file)})
            config.read(config_file, encoding='utf8')
        section = f'Job:{job_id}'
        # config.defaults() returns the parser's own mapping, shared by all
        # the jobs built with the same parser (see jobs()): copy it
        settings = dict(config.items(section)) \
            if config.has_section(section) else dict(config.defaults())

        self._jobs[job_id] = {  # yapf: disable
            'job_id': job_id,
            'i18n_label': loads(dbjob.i18n_label),
            'i18n_description': dbjob.i18n_description,
            'icon': attachment_url(
                request, dbjob.attachments_dir, dbjob.attachments_key,
                dbjob.icon),
            'service_id': service.uid,
            'context': dbjob.context,
            'access': dbjob.access,
            'threaded': dbjob.threaded,
            'ttl': dbjob.ttl,
            'priority': dbjob.priority,
            'users': tuple(k.user_id for k in dbjob.users),
            'groups': {k.group_id for k in dbjob.groups},
            'settings': settings
        }
        return self._jobs[job_id]

    # -------------------------------------------------------------------------
    def _dir2abs_path(self, registry, directory: Column) -> str | None:
        """Return an absolute path to the directory if exists.

        ``directory`` is either a path relative to the warehouse root or a
        `CioPath` as a string. The result must stay inside the warehouse root
        (respectively the root of the warehouse of the `CioPath`).

        :param dict registry:
            Application registry.
        :param directory:
            Relative path to a local directory or a CioPath as a string.
        :rtype: str
        :return:
            The absolute path or ``None`` if the directory does not exist or
            is outside its warehouse.
        """
        if not directory:
            return None

        # Local directory
        ciopath = CioPath.from_str(str(directory))
        if not ciopath:
            abs_path = normpath(join(self.root, directory))
            if self._is_inside(abs_path, self.root) and isdir(abs_path):
                return abs_path
            return None

        # CioPath
        locations = registry['modules']['ciowarehouse2'].locations
        for location in locations.values():
            root = normpath(join(location, ciopath.wid))  # type: ignore
            if isdir(root):
                abs_path = ciopath.absolute_path(root)  # type: ignore
                if abs_path is not None and self._is_inside(abs_path, root) \
                        and isdir(abs_path):
                    return abs_path
                return None
        return None

    # -------------------------------------------------------------------------
    @staticmethod
    def _is_inside(path: str, root: str) -> bool:
        """Return ``True`` if ``path`` is ``root`` or is located under it.

        A plain string prefix test would wrongly accept ``/data/10/x`` as
        being inside ``/data/1``.

        :param str path:
            Path to check.
        :param str root:
            Root directory.
        :rtype: bool
        """
        root = normpath(root)
        try:
            return commonpath((root, normpath(path))) == root
        except ValueError:
            # Absolute and relative paths mixed (or different drives)
            return False

    # -------------------------------------------------------------------------
    @classmethod
    def error(cls, message: str, request: Request | None = None):
        """Log an error message.

        :param str message:
            Error message.
        :type request: pyramid.request.Request
        :param request: (optional)
            Current request or ``None`` if called by populate script.
        """
        if request is None:
            LOG.error(translate(message))
        else:
            log_error(request, translate(message, lang='en'))

    # -------------------------------------------------------------------------
    @classmethod
    def warning(cls, message: str, request: Request | None = None):
        """Log a warning message.

        :param str message:
            Warning message.
        :type request: pyramid.request.Request
        :param request: (optional)
            Current request or ``None`` if called by populate script.
        """
        if request is None:
            LOG.warning(translate(message))
        else:
            log_warning(request, translate(message, lang='en'))