Source code for ciowarehouse2.lib.manager

"""A file manager is responsible for displaying and, possibly, editing  a file.
"""

from __future__ import annotations
from os import makedirs
from os.path import join, exists, dirname, basename, normpath, isabs, isdir
from logging import getLogger
from re import compile as re_compile
from shutil import rmtree, copy
from time import time

from chameleon import PageTemplateFile
from pytomlpp import loads as toml_loads
from lxml.etree import ElementTree

from pyramid.asset import abspath_from_asset_spec
from pyramid.config import Configurator
from pyramid.request import Request

from chrysalio.lib.utils import copy_content_re
from chrysalio.lib.form import Form, get_action
from chrysalio.lib.log import log_error, log_warning
from chrysalio.lib.panel import Panel
from chrysalio.includes.themes import theme_static_prefix, theme_has_static
from ..lib.ciotype import CioType
from ..lib.ciopath import CioPath
from ..lib.utils import CACHE_REGION_USER
from ..lib.utils import CIOWAREHOUSE2_NS, cache_user_renderings
from ..lib.warehouse import Warehouse
from ..lib.i18n import _, translate

LOG = getLogger(__name__)
EXCLUDED_FILES_RE = re_compile('(__pycache__|\\.pyc?$)')
LAYOUT_VIEW_PT = 'ciowarehouse2:Templates/manager_layout_view.pt'
LAYOUT_EDIT_PT = 'ciowarehouse2:Templates/manager_layout_edit.pt'
CHRYSALIO_JS = ('/js/js.cookie.js', '/js/jquery.js', '/js/chrysalio.js')


# =============================================================================
[docs] class Manager(): """Base class for file managers. This object provides rendering for a particular type of file. ``viewings`` and ``editings`` attributes are lists of dictionaries with the following keys: * ``'label'``: (required) label of the rendering * ``'template'``: (required) Chameleon template to use * ``'css'``: list of paths to CSS file * ``'js'``: list of path to Javascript file * ``'only4groups'``: set of groups of authorized users for editing """ ciotype = CioType(None) uid: str = '' label: str | None = None imports: tuple[tuple[str, str]] | None = None viewings: tuple[dict] | tuple = () editings: tuple[dict] | tuple = () panel: Panel | None = None _home = normpath(join(dirname(__file__), '..', 'managers')) # ------------------------------------------------------------------------- def __init__(self): """Constructor method.""" if not isinstance(self.ciotype, CioType): self.ciotype = CioType.from_str(self.ciotype) if not self.uid: self.uid = self.ciotype.uid() if self.label is None: self.label = self.uid self.home = self._home self._develop = False # -------------------------------------------------------------------------
[docs] @classmethod def register( cls, environment: Configurator | dict, manager_class) -> Manager: """Method to register the manager. :type environment: :class:`pyramid.config.Configurator` or :class:`dict` :param environment: Object used to do configuration declaration within the application or a ScriptRegistry to simulate the application registry. :param manager_class: Manager class. :param dict kwargs: Keyworded arguments :rtype: .lib.manager.Manager """ # Server mode (environment == configurator) manager = manager_class() if hasattr(environment, 'registry'): if 'managers' not in environment.registry: environment.registry['managers'] = {} environment.registry['managers'][manager.ciotype] = manager # Populate/backup/execute mode (environment == ScriptRegistry) else: if 'managers' not in environment: environment['managers'] = {} environment['managers'][manager.ciotype] = manager return manager
# -------------------------------------------------------------------------
[docs] def initialize(self, config: dict): """Initialize the manager. :param dict config: Dictionary with keys ``'root'`` and ``'develop'`` where ``'root'`` is the absolute path to the root directory for manager with imports. """ self._develop = config.get('develop') == 'true' if config.get('root') and self.imports: self.home = normpath(join(config['root'], self.uid))
# -------------------------------------------------------------------------
[docs] def install(self, force: bool = False): """Install a manager in an alternative home directory with its imports. :param bool force: If ``True``, force installation. """ if self._home == self.home: return if exists(self.home): if not force: return rmtree(self.home) makedirs(self.home, exist_ok=True) # Copy imports for import_ in self.imports or (): target = normpath(join(self.home, import_[1])) if not target.startswith(self.home): continue path = import_[0] if ':' in path: path = abspath_from_asset_spec(path) if not isabs(path): path = join(self._home, path) if isdir(path): copy_content_re(path, target, EXCLUDED_FILES_RE) elif EXCLUDED_FILES_RE.search( basename(path)) is None and exists(path): makedirs(dirname(target), exist_ok=True) copy(path, target) # Copy original copy_content_re(self._home, self.home, EXCLUDED_FILES_RE)
# -------------------------------------------------------------------------
[docs] def match(self, ciotype: CioType) -> bool: """Check whether this file manager matches with the CioType. :type ciotype: .lib.ciotype.CioType :param ciotype: The `CioType` to compare with mine. :rtype: bool """ return self.ciotype.match(ciotype)
# -------------------------------------------------------------------------
[docs] def current_rendering( self, request: Request, warehouse: Warehouse, rendering_type: str) -> dict | None: """Find the current rendering. :type request: pyramid.request.Request :param request: Current request. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Object describing the warehouse containing the file. :param str rendering_type: ('viewing' or 'editing') Type of rendering. :rtype: dict """ if not rendering_type: return None renderings = self._available_renderings(request, warehouse) renderings = renderings.get('editings') \ if rendering_type == 'editing' else renderings.get('viewings') if not renderings: return None if 'managers' not in request.session: request.session['managers'] = {} if self.uid not in request.session['managers']: request.session['managers'][self.uid] = {} index = int(request.params['rendering']) \ if 'rendering' in request.params \ and request.params['rendering'].isdigit() \ else request.session['managers'][self.uid].get(rendering_type, 0) index = index if index < len(renderings) else 0 request.session['managers'][self.uid][rendering_type] = index return renderings[index]
# -------------------------------------------------------------------------
[docs] def view( self, request: Request, warehouse: Warehouse, ciopath: CioPath, ts_factory=None) -> str | None: """Return a string containing HTML to display the file. :type request: pyramid.request.Request :param request: Current request. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Object describing the warehouse containing the file. :type ciopath: .lib.ciopath.CioPath :param ciopath: `CioPath` of the current file. :param ts_factory: (optional) Translation String Factory fucntion. :rtype: :class:`str` or ``None`` """
# -------------------------------------------------------------------------
[docs] def can_edit(self) -> bool: """Return ``True`` if it can produce an editor. :rtype: bool """ return bool(self.editings)
# -------------------------------------------------------------------------
[docs] def edit( self, request: Request, warehouse: Warehouse, ciopath: CioPath, ts_factory=None) -> str | None: """Return a string containing HTML to edit the file. :type request: pyramid.request.Request :param request: Current request. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Object describing the warehouse containing the file. :type ciopath: .lib.ciopath.CioPath :param ciopath: `CioPath` of the current file. :param ts_factory: (optional) Translation String Factory fucntion. :rtype: :class:`str` or ``None`` """
# -------------------------------------------------------------------------
[docs] def save( self, request: Request, warehouse: Warehouse, ciopath: CioPath, editing: dict, values: dict, go_on: bool, original: ElementTree | None = None) -> str | None: """Save the file. :type request: pyramid.request.Request :param request: Current request. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Object describing the warehouse containing the file. :type ciopath: .lib.ciopath.CioPath :param ciopath: `CioPath` of the current file. :param dict editing: Dictionary representing the editing. :param dict values: Modified values. :param bool go_on: ``True`` if the modification continues after saving. :type original: :class:`lxml.etree._ElementTree` :param original: (optional) Initial content of the file. :rtype: :class:`str` or ``None`` :return: An error message or ``None``. """ # pylint: disable = too-many-arguments, too-many-positional-arguments # pylint: disable = unused-argument return 'Not implemented!'
# -------------------------------------------------------------------------
[docs] def edit_finalization( self, request: Request, warehouse: Warehouse, ciopath: CioPath, message: str | None = None): """Commit changes, unlock files and refresh warehouse. :type request: pyramid.request.Request :param request: Current request. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Object describing the warehouse containing the file. :type ciopath: .lib.ciopath.CioPath :param ciopath: `CioPath` of the current file. :param str message: (optional) Commit meesage. :rtype: bool """ if warehouse.vcs.is_dirty(): if message is None: editing = self.current_rendering(request, warehouse, 'editing') if editing is not None: message = _( 'Online editing in "${m}" mode', {'m': translate(editing['label'], request=request)}) else: message = _('Online editing') warehouse.vcs.add(ciopath) warehouse.vcs.commit( # yapf: disable translate(message, request=request), request.session['user']['name'], request.session['user']['email']) self.file_unlock(request, warehouse, ciopath) warehouse.refresh(request, [ciopath])
# -------------------------------------------------------------------------
[docs] @classmethod def file_lock( cls, request: Request, warehouse: Warehouse, ciopath: CioPath) -> tuple[bool, str]: """Lock a file for myself for the purpose of editing it. :type request: pyramid.request.Request :param request: Current request. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Object describing the warehouse containing the file. :type ciopath: .lib.ciopath.CioPath :param ciopath: `CioPath` of the file to lock. :rtype: bool """ if 'ciowarehouse2' not in request.session: request.session['ciowarehouse2'] = {} if 'editing' not in request.session['ciowarehouse2']: request.session['ciowarehouse2']['editing'] = {} name = request.session['user']['name'] if ciopath in request.session['ciowarehouse2']['editing'] and \ request.session['ciowarehouse2']['editing'][ciopath] > time(): warehouse.lock(name, ciopath, relock=True) request.session['ciowarehouse2']['editing'][ciopath] = \ time() + warehouse.lock_ttl return True, name locked, locker = warehouse.lock(name, ciopath) if locked: request.session['ciowarehouse2']['editing'][ciopath] = \ time() + warehouse.lock_ttl elif ciopath in request.session['ciowarehouse2']['editing']: del request.session['ciowarehouse2']['editing'][ciopath] return locked, locker
# -------------------------------------------------------------------------
[docs] @classmethod def file_unlock( cls, request: Request, warehouse: Warehouse, ciopath: CioPath): """Unlock a file. :type request: pyramid.request.Request :param request: Current request. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Object describing the warehouse containing the file. :type ciopath: .lib.ciopath.CioPath :param ciopath: `CioPath` of the file to unlock. """ warehouse.unlock(ciopath) if 'ciowarehouse2' in request.session and \ 'editing' in request.session['ciowarehouse2'] and \ ciopath in request.session['ciowarehouse2']['editing']: del request.session['ciowarehouse2']['editing'][ciopath]
# -------------------------------------------------------------------------
[docs] def abspath_from_home(self, path: str) -> str: """Return an absolute path. :param str path: Absolute path or relative path to the home directory of the manager. :rtype: str """ if ':' in path: path = abspath_from_asset_spec(path) return path if isabs(path) else join(self.home, path)
# -------------------------------------------------------------------------
[docs] @classmethod def themed_urls(cls, request, rendering, name): """Return a list of URLs possibly completed with theme prefix. :type request: pyramid.request.Request :param request: Current request. :param dict rendering: Dictionary defining the rendering. :param str name: Name of the bunch of URLs ('css', 'js') :rtype: list """ url_list = [] theme = theme_static_prefix(request) for url in rendering.get(name) or '': url_list.append( url if url.startswith('http') or url.startswith('/file') else '{0}{1}'.format(theme, url)) return url_list
# ------------------------------------------------------------------------- def _chameleon_render( self, request: Request, warehouse: Warehouse, ciopath: CioPath, rendering: dict, ts_factory, values: dict) -> str | None: """Execute a Chameleon render and return the result. :type request: pyramid.request.Request :param request: Current request. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Object describing the warehouse containing the file. :type ciopath: .lib.ciopath.CioPath :param ciopath: `CioPath` of the current file. :param dict rendering: Dictionary defining the rendering. :param ts_factory: Translation String Factory fucntion. :param dict values: Dictionary of values for template variables. :rtype: :class:`str` or ``None`` """ # pylint: disable = too-many-arguments, too-many-positional-arguments my_domain = rendering['template'].partition(':')[0] \ if ':' in rendering['template'] else 'ciowarehouse2' def _translate( msgid, domain=my_domain, mapping=None, default=None, context=None, target_language=None): """Translation for Chameleon.""" # pylint: disable = unused-argument return request.localizer.translate( msgid, domain=domain or my_domain, mapping=mapping) can_edit = self.can_edit() and \ request.registry['modules']['ciowarehouse2'].warehouse_file_writer( request, warehouse) values.update({ # yapf: disable 'request': request, '_': ts_factory, 'local_translate': request.localizer.translate, 'global_class': ' {0}'.format( request.GET['class']) if request.GET.get('class') else '', 'local_class': ' {0}'.format( values['class']) if values.get('class') else '', 'action': get_action(request)[0], 'title': request.registry['settings']['title'], 'theme': theme_static_prefix(request), 'theme_has': theme_has_static, 'manager': self, 'rendering': rendering, 'ciopath': ciopath, 'route_directory': request.route_path( 'browse', ciopath=ciopath.parent().route()), 'route_download': request.route_path( 'file_download', ciopath=ciopath.route()), 'route_close': self._route_close(request, ciopath.uid())}) values['route_previous'], values['route_next'] = self._routes_around( request, ciopath) values.update(self._available_renderings(request, warehouse)) values['can_edit'] = bool(can_edit and values['editings']) if 'form' not in values: values['form'] = Form(request) if 'rendering_num' not in values: values['rendering_num'] = 0 if 'content' not in values: values['content'] = '' try: return PageTemplateFile( abspath_from_asset_spec(rendering['template']), translate=_translate).render(**values) except (SyntaxError, UnboundLocalError, AssertionError, TypeError, NameError, AttributeError, KeyError) as error: # pragma: nocover self._log_error(str(error), request) return None # ------------------------------------------------------------------------- @classmethod def _route_close(cls, request: Request, uid: str) -> str: """Return the URL to close the view and return to the previous page. :type request: pyramid.request.Request :param request: Current request. :param str uid: ID of the current file. :rtype: str """ return f'{request.breadcrumbs.back_path()}#{uid}_' # ------------------------------------------------------------------------- def _routes_around(self, request: Request, ciopath: CioPath) -> tuple[str | None, str | None]: """Return a tuple of URL to go to the previous and the next file according to the file type. :type request: pyramid.request.Request :param request: Current request. :rtype: tuple :return: A tuple such as ``(route_previous, route_next)``. """ if 'current_files' not in request.session: return None, None files = request.session['current_files'] # Find the current file in the list index = -1 for k, item in enumerate(files): if item[0] == ciopath: index = k break if index == -1: return None, None # Find the previous one ciowarehouse2 = request.registry['modules']['ciowarehouse2'] route_previous = None for k in range(index - 1, -1, -1): if files[k][1].is_directory(): continue manager = ciowarehouse2.manager(request, files[k][1]) if manager is not None: route_previous = request.route_path( 'file_view', ciotype=files[k][1].route(), ciopath=files[k][0].route()) break # Find the next one route_next = None for k in range(index + 1, len(files)): if files[k][1].is_directory(): continue manager = ciowarehouse2.manager(request, files[k][1]) if manager is not None: route_next = request.route_path( 'file_view', ciotype=files[k][1].route(), ciopath=files[k][0].route()) break return route_previous, route_next # ------------------------------------------------------------------------- @cache_user_renderings(CIOWAREHOUSE2_NS, CACHE_REGION_USER) def _available_renderings(self, request: Request, warehouse: Warehouse): """Return a dictionary of available renderings. :type request: pyramid.request.Request :param request: Current request. :type warehouse: .lib.warehouse.Warehouse :param warehouse: Object describing the warehouse containing the file. :rtype: dictionary """ available: dict[str, list] = {} user_groups = set(request.session['user']['groups']) toml = None if warehouse.rendering_dir: warehouse_toml = join(warehouse.rendering_dir, f'{self.uid}.toml') if exists(warehouse_toml): with open(warehouse_toml, 'r', encoding='utf8') as hdl: toml = toml_loads(hdl.read()) else: toml = None for renderings in ('viewings', 'editings'): available[renderings] = [] done = set() # From a custom directory if toml is not None: for rendering in toml.get(renderings, ''): rendering = self._check_rendering( request, user_groups, toml.get(f'default_{renderings}'), rendering) if rendering is not None: available[renderings].append(rendering) done.add(rendering['name']) # Hard coded rendering_list = self.editings \ if renderings == 'editings' else self.viewings available[renderings] += [ k for k in rendering_list if k['name'] not in done and ( k.get('only4groups') is None or k['only4groups'] & user_groups) ] return self._fix_renderings(request, available) # ------------------------------------------------------------------------- def _check_rendering( self, request: Request, user_groups: set, defaults: dict | None, rendering: dict) -> dict | None: """Check and complete a rendering. :type request: pyramid.request.Request :param request: Current request. :param set user_groups: Groups the user belongs to. :param dict defaults: Default values. :param dict rendering: Rendering to process. :rtype: dictionary """ if rendering.get('only4groups') and \ not set(rendering['only4groups']) & user_groups: return None if 'name' not in rendering: self._log_error( _('Name is missing for a dynaming rendering'), request) return None # Default values checked = dict(defaults) if defaults else {} checked.update(rendering) # Manager UID checked['manager_uid'] = checked.get('manager_uid', self.uid) # Label checked['label'] = checked['label'].get( request.locale_name, checked['label'].get('en', checked['name'])) \ if 'label' in checked else checked['name'] # Check if 'template' not in checked or 'css' not in checked: self._log_error( _('Rendering "${n}" is incorrect', {'n': checked['name']}), request) return None return checked # ------------------------------------------------------------------------- def _fix_renderings(self, request: Request, available: dict) -> dict: """Possibly Fix URLs. :type request: pyramid.request.Request :param request: Current request. :param dict available: Available renderings. :rtype: dictionary """ # pylint: disable = unused-argument return available # ------------------------------------------------------------------------- @classmethod def _log_error(cls, error: str, request: Request | None = None): """Log an error message. :param str error: Error message. :type request: pyramid.request.Request :param request: (optional) Current request or ``None`` if called by populate script. """ if request is None: LOG.error(translate(error)) else: log_error(request, translate(error, lang='en')) # ------------------------------------------------------------------------- @classmethod def _log_warning(cls, warning: str, request: Request | None = None): """Log an warning message. :param str warning: Warning message. :type request: pyramid.request.Request :param request: (optional) Current request or ``None`` if called by populate script. """ if request is None: LOG.warning(translate(warning)) else: log_warning(request, translate(warning, lang='en'))