"""A file manager is responsible for displaying and, possibly, editing a file.
"""
from __future__ import annotations
from os import makedirs
from os.path import join, exists, dirname, basename, normpath, isabs, isdir
from logging import getLogger
from re import compile as re_compile
from shutil import rmtree, copy
from time import time
from chameleon import PageTemplateFile
from pytomlpp import loads as toml_loads
from lxml.etree import ElementTree
from pyramid.asset import abspath_from_asset_spec
from pyramid.config import Configurator
from pyramid.request import Request
from chrysalio.lib.utils import copy_content_re
from chrysalio.lib.form import Form, get_action
from chrysalio.lib.log import log_error, log_warning
from chrysalio.lib.panel import Panel
from chrysalio.includes.themes import theme_static_prefix, theme_has_static
from ..lib.ciotype import CioType
from ..lib.ciopath import CioPath
from ..lib.utils import CACHE_REGION_USER
from ..lib.utils import CIOWAREHOUSE2_NS, cache_user_renderings
from ..lib.warehouse import Warehouse
from ..lib.i18n import _, translate
# Logger of this module.
LOG = getLogger(__name__)
# Names skipped by ``Manager.install()`` when copying files: any name
# containing ``__pycache__`` or ending with ``.py`` or ``.pyc``.
EXCLUDED_FILES_RE = re_compile('(__pycache__|\\.pyc?$)')
# Asset specifications (``package:path``) of two Chameleon templates,
# presumably the layouts wrapping a viewing and an editing -- they are not
# referenced in the part of the file visible here.
LAYOUT_VIEW_PT = 'ciowarehouse2:Templates/manager_layout_view.pt'
LAYOUT_EDIT_PT = 'ciowarehouse2:Templates/manager_layout_edit.pt'
# Paths of JavaScript files, presumably the scripts shared by every rendering
# -- not referenced in the part of the file visible here.
CHRYSALIO_JS = ('/js/js.cookie.js', '/js/jquery.js', '/js/chrysalio.js')
# =============================================================================
[docs]
class Manager():
"""Base class for file managers.

This object provides rendering for a particular type of file.

``viewings`` and ``editings`` attributes are lists of dictionaries with
the following keys:

* ``'name'``: identifier of the rendering; ``_available_renderings()``
  reads it to skip a hard coded rendering already supplied by a custom one
* ``'label'``: (required) label of the rendering
* ``'template'``: (required) Chameleon template to use
* ``'css'``: list of paths to CSS files
* ``'js'``: list of paths to JavaScript files
* ``'only4groups'``: set of groups of users allowed to use the rendering
  (checked for viewings as well as for editings)
"""
# Type of file handled by this manager. A value which is not a `CioType` is
# converted by the constructor with ``CioType.from_str()``; the resulting
# object is the key used by ``register()``.
ciotype = CioType(None)
# Identifier of the manager; when empty, the constructor uses
# ``self.ciotype.uid()``.
uid: str = ''
# Label of the manager; when ``None``, the constructor uses ``uid``.
label: str | None = None
# Pairs ``(source, target)`` copied by ``install()``: ``source`` is an asset
# specification, an absolute path or a path relative to ``_home``; ``target``
# is relative to the installation directory.
imports: tuple[tuple[str, str]] | None = None
# Hard coded renderings, see the class docstring for the expected keys.
viewings: tuple[dict] | tuple = ()
editings: tuple[dict] | tuple = ()
# Not used in the part of the file visible here -- presumably a side panel
# attached to the manager.
panel: Panel | None = None
# Original home directory: ``managers`` next to the parent directory of this
# module.
_home = normpath(join(dirname(__file__), '..', 'managers'))
# -------------------------------------------------------------------------
def __init__(self):
    """Constructor method.

    Complete the declarations made at class level: convert ``ciotype``
    into a `CioType`, derive ``uid`` from it and ``label`` from ``uid``
    when they are not provided, then reset the installation state.
    """
    declared = self.ciotype
    if not isinstance(declared, CioType):
        # The declared value is handed to ``CioType.from_str()``
        self.ciotype = CioType.from_str(declared)

    if not self.uid:
        self.uid = self.ciotype.uid()

    if self.label is None:
        self.label = self.uid

    # Until ``initialize()`` is called, the manager lives in its original
    # directory and is not in development mode
    self._develop = False
    self.home = self._home
# -------------------------------------------------------------------------
[docs]
@classmethod
def register(
        cls, environment: Configurator | dict, manager_class) -> Manager:
    """Instantiate a manager class and record it among the known managers.

    :type  environment: :class:`pyramid.config.Configurator` or
        :class:`dict`
    :param environment:
        Object used to do configuration declaration within the application
        or a ScriptRegistry to simulate the application registry.
    :param manager_class:
        Manager class, called without argument to create the manager.
    :rtype: .lib.manager.Manager
    :return:
        The new manager, stored under the ``'managers'`` key of the
        registry and indexed by its ``ciotype``.
    """
    manager = manager_class()

    # Server mode: the environment is a configurator carrying a registry.
    # Populate/backup/execute mode: the environment is the registry itself.
    registry = getattr(environment, 'registry', environment)

    if 'managers' not in registry:
        registry['managers'] = {}
    registry['managers'][manager.ciotype] = manager

    return manager
# -------------------------------------------------------------------------
[docs]
def initialize(self, config: dict):
    """Initialize the manager.

    :param dict config:
        Dictionary with keys ``'root'`` and ``'develop'`` where ``'root'``
        is the absolute path to the root directory for manager with
        imports. Development mode is on only when ``'develop'`` is the
        string ``'true'``.
    """
    develop = config.get('develop')
    self._develop = develop == 'true'

    # Only a manager with imports is moved to ``<root>/<uid>``
    root = config.get('root')
    if root and self.imports:
        self.home = normpath(join(root, self.uid))
# -------------------------------------------------------------------------
[docs]
def install(self, force: bool = False):
    """Install a manager in an alternative home directory with its
    imports.

    Nothing is done when the manager still uses its original directory,
    or when the alternative directory already exists and ``force`` is
    false. Otherwise the directory is (re)created, each import is copied
    into it and the original files of the manager are copied last.

    :param bool force:
        If ``True``, remove an existing installation and install again.
    """
    if self._home == self.home:
        return
    if exists(self.home):
        if not force:
            return
        rmtree(self.home)
    makedirs(self.home, exist_ok=True)

    # Copy imports
    home = normpath(self.home)
    # A trailing separator prevents a sibling such as ``<home>2`` from
    # being taken for a path located inside ``<home>``
    home_prefix = join(home, '')
    for import_ in self.imports or ():
        target = normpath(join(self.home, import_[1]))
        if target != home and not target.startswith(home_prefix):
            # The target escapes from the installation directory
            continue
        path = import_[0]
        if ':' in path:
            path = abspath_from_asset_spec(path)
        if not isabs(path):
            path = join(self._home, path)
        if isdir(path):
            copy_content_re(path, target, EXCLUDED_FILES_RE)
        elif EXCLUDED_FILES_RE.search(
                basename(path)) is None and exists(path):
            makedirs(dirname(target), exist_ok=True)
            copy(path, target)

    # Copy original
    copy_content_re(self._home, self.home, EXCLUDED_FILES_RE)
# -------------------------------------------------------------------------
[docs]
def match(self, ciotype: CioType) -> bool:
    """Tell whether this file manager handles the given `CioType`.

    The decision is delegated to the ``match()`` method of the `CioType`
    of the manager.

    :type  ciotype: .lib.ciotype.CioType
    :param ciotype:
        The `CioType` to compare with mine.
    :rtype: bool
    """
    mine = self.ciotype
    return mine.match(ciotype)
# -------------------------------------------------------------------------
[docs]
def current_rendering(
        self, request: Request, warehouse: Warehouse,
        rendering_type: str) -> dict | None:
    """Find the current rendering.

    The index of the rendering comes from the ``rendering`` parameter of
    the request when it is a decimal number, otherwise from the value
    remembered in the session for this manager and this type of
    rendering. An index out of range falls back to the first rendering.
    The retained index is stored in the session.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  warehouse: .lib.warehouse.Warehouse
    :param warehouse:
        Object describing the warehouse containing the file.
    :param str rendering_type: ('viewing' or 'editing')
        Type of rendering. Any value other than ``'editing'`` selects the
        viewings.
    :rtype: :class:`dict` or ``None``
    :return:
        The selected rendering or ``None`` if ``rendering_type`` is empty
        or if no rendering is available.
    """
    if not rendering_type:
        return None

    available = self._available_renderings(request, warehouse)
    renderings = available.get(
        'editings' if rendering_type == 'editing' else 'viewings')
    if not renderings:
        return None

    if 'managers' not in request.session:
        request.session['managers'] = {}
    if self.uid not in request.session['managers']:
        request.session['managers'][self.uid] = {}

    # ``isdecimal()`` and not ``isdigit()``: the latter accepts characters
    # such as a superscript two which ``int()`` refuses with a ValueError
    if 'rendering' in request.params \
            and request.params['rendering'].isdecimal():
        index = int(request.params['rendering'])
    else:
        index = request.session['managers'][self.uid].get(
            rendering_type, 0)
    if index >= len(renderings):
        index = 0

    request.session['managers'][self.uid][rendering_type] = index
    return renderings[index]
# -------------------------------------------------------------------------
[docs]
def view(
        self,
        request: Request,
        warehouse: Warehouse,
        ciopath: CioPath,
        ts_factory=None) -> str | None:
    """Return a string containing HTML to display the file.

    This base implementation produces nothing.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  warehouse: .lib.warehouse.Warehouse
    :param warehouse:
        Object describing the warehouse containing the file.
    :type  ciopath: .lib.ciopath.CioPath
    :param ciopath:
        `CioPath` of the current file.
    :param ts_factory: (optional)
        Translation String Factory function.
    :rtype: :class:`str` or ``None``
    """
    # pylint: disable = unused-argument
    return None
# -------------------------------------------------------------------------
[docs]
def can_edit(self) -> bool:
    """Tell whether this manager declares at least one editing.

    :rtype: bool
    :return:
        ``True`` if ``editings`` is not empty, i.e. if the manager can
        produce an editor.
    """
    return True if self.editings else False
# -------------------------------------------------------------------------
[docs]
def edit(
        self,
        request: Request,
        warehouse: Warehouse,
        ciopath: CioPath,
        ts_factory=None) -> str | None:
    """Return a string containing HTML to edit the file.

    This base implementation produces nothing.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  warehouse: .lib.warehouse.Warehouse
    :param warehouse:
        Object describing the warehouse containing the file.
    :type  ciopath: .lib.ciopath.CioPath
    :param ciopath:
        `CioPath` of the current file.
    :param ts_factory: (optional)
        Translation String Factory function.
    :rtype: :class:`str` or ``None``
    """
    # pylint: disable = unused-argument
    return None
# -------------------------------------------------------------------------
[docs]
def save(
        self,
        request: Request,
        warehouse: Warehouse,
        ciopath: CioPath,
        editing: dict,
        values: dict,
        go_on: bool,
        original: ElementTree | None = None) -> str | None:
    """Save the file.

    This base implementation saves nothing and always reports an error.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  warehouse: .lib.warehouse.Warehouse
    :param warehouse:
        Object describing the warehouse containing the file.
    :type  ciopath: .lib.ciopath.CioPath
    :param ciopath:
        `CioPath` of the current file.
    :param dict editing:
        Dictionary representing the editing.
    :param dict values:
        Modified values.
    :param bool go_on:
        ``True`` if the modification continues after saving.
    :type  original: :class:`lxml.etree._ElementTree`
    :param original: (optional)
        Initial content of the file.
    :rtype: :class:`str` or ``None``
    :return:
        An error message or ``None``.
    """
    # pylint: disable = too-many-arguments, too-many-positional-arguments
    # pylint: disable = unused-argument
    error = 'Not implemented!'
    return error
# -------------------------------------------------------------------------
[docs]
def edit_finalization(
        self,
        request: Request,
        warehouse: Warehouse,
        ciopath: CioPath,
        message: str | None = None):
    """Commit changes, unlock files and refresh warehouse.

    The commit only takes place when the version control system of the
    warehouse reports pending changes. The file is unlocked and the
    warehouse refreshed in every case. Nothing is returned.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  warehouse: .lib.warehouse.Warehouse
    :param warehouse:
        Object describing the warehouse containing the file.
    :type  ciopath: .lib.ciopath.CioPath
    :param ciopath:
        `CioPath` of the current file.
    :param str message: (optional)
        Commit message. By default, it is built from the label of the
        current editing.
    """
    if warehouse.vcs.is_dirty():
        if message is None:
            editing = self.current_rendering(request, warehouse, 'editing')
            message = _('Online editing') if editing is None else _(
                'Online editing in "${m}" mode',
                {'m': translate(editing['label'], request=request)})

        warehouse.vcs.add(ciopath)
        commit_text = translate(message, request=request)
        author = request.session['user']
        warehouse.vcs.commit(commit_text, author['name'], author['email'])

    self.file_unlock(request, warehouse, ciopath)
    warehouse.refresh(request, [ciopath])
# -------------------------------------------------------------------------
[docs]
@classmethod
def file_lock(
        cls, request: Request, warehouse: Warehouse,
        ciopath: CioPath) -> tuple[bool, str]:
    """Lock a file for myself for the purpose of editing it.

    The session remembers, for each file being edited, the time until
    which the lock is considered as mine. While this time is not reached,
    the lock is simply renewed.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  warehouse: .lib.warehouse.Warehouse
    :param warehouse:
        Object describing the warehouse containing the file.
    :type  ciopath: .lib.ciopath.CioPath
    :param ciopath:
        `CioPath` of the file to lock.
    :rtype: tuple
    :return:
        A tuple such as ``(locked, locker)``: either ``(True, my_name)``
        after a renewal or the answer of ``warehouse.lock()``.
    """
    session = request.session
    if 'ciowarehouse2' not in session:
        session['ciowarehouse2'] = {}
    if 'editing' not in session['ciowarehouse2']:
        session['ciowarehouse2']['editing'] = {}
    deadlines = session['ciowarehouse2']['editing']
    name = session['user']['name']

    # The lock is still mine: renew it
    if ciopath in deadlines and deadlines[ciopath] > time():
        warehouse.lock(name, ciopath, relock=True)
        deadlines[ciopath] = time() + warehouse.lock_ttl
        return True, name

    # Otherwise, try to acquire it
    locked, locker = warehouse.lock(name, ciopath)
    if locked:
        deadlines[ciopath] = time() + warehouse.lock_ttl
    else:
        deadlines.pop(ciopath, None)
    return locked, locker
# -------------------------------------------------------------------------
[docs]
@classmethod
def file_unlock(
        cls, request: Request, warehouse: Warehouse, ciopath: CioPath):
    """Unlock a file.

    The lock is released in the warehouse, then the file is removed from
    the files being edited which are remembered in the session.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  warehouse: .lib.warehouse.Warehouse
    :param warehouse:
        Object describing the warehouse containing the file.
    :type  ciopath: .lib.ciopath.CioPath
    :param ciopath:
        `CioPath` of the file to unlock.
    """
    warehouse.unlock(ciopath)

    session = request.session
    if 'ciowarehouse2' not in session:
        return
    if 'editing' not in session['ciowarehouse2']:
        return
    if ciopath in session['ciowarehouse2']['editing']:
        del session['ciowarehouse2']['editing'][ciopath]
# -------------------------------------------------------------------------
[docs]
def abspath_from_home(self, path: str) -> str:
    """Return an absolute path.

    :param str path:
        Asset specification, absolute path or relative path to the home
        directory of the manager.
    :rtype: str
    """
    # A colon denotes an asset specification such as ``package:path``
    if ':' in path:
        path = abspath_from_asset_spec(path)
    if isabs(path):
        return path
    return join(self.home, path)
# -------------------------------------------------------------------------
[docs]
@classmethod
def themed_urls(cls, request, rendering, name):
    """Return a list of URLs possibly completed with theme prefix.

    A URL beginning with ``http`` or ``/file`` is kept as is, any other
    one is prefixed with the static prefix of the current theme.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param dict rendering:
        Dictionary defining the rendering.
    :param str name:
        Name of the bunch of URLs ('css', 'js')
    :rtype: list
    """
    theme = theme_static_prefix(request)
    return [
        url if url.startswith(('http', '/file')) else f'{theme}{url}'
        for url in rendering.get(name) or ''
    ]
# -------------------------------------------------------------------------
def _chameleon_render(
        self, request: Request, warehouse: Warehouse, ciopath: CioPath,
        rendering: dict, ts_factory, values: dict) -> str | None:
    """Execute a Chameleon render and return the result.

    ``values`` is completed in place with the variables shared by every
    template (request, translation helpers, theme, routes, available
    renderings...) before being handed to the template.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  warehouse: .lib.warehouse.Warehouse
    :param warehouse:
        Object describing the warehouse containing the file.
    :type  ciopath: .lib.ciopath.CioPath
    :param ciopath:
        `CioPath` of the current file.
    :param dict rendering:
        Dictionary defining the rendering.
    :param ts_factory:
        Translation String Factory function.
    :param dict values:
        Dictionary of values for template variables.
    :rtype: :class:`str` or ``None``
    :return:
        The rendered HTML or ``None`` if the rendering fails; in that
        case the error is logged.
    """
    # pylint: disable = too-many-arguments, too-many-positional-arguments
    # The translation domain is the package part of the asset
    # specification of the template
    package, has_package, _asset = rendering['template'].partition(':')
    my_domain = package if has_package else 'ciowarehouse2'

    def _translate(
            msgid,
            domain=my_domain,
            mapping=None,
            default=None,
            context=None,
            target_language=None):
        """Translation for Chameleon."""
        # pylint: disable = unused-argument
        return request.localizer.translate(
            msgid, domain=domain or my_domain, mapping=mapping)

    can_edit = self.can_edit() and \
        request.registry['modules']['ciowarehouse2'].warehouse_file_writer(
            request, warehouse)

    global_class = \
        f" {request.GET['class']}" if request.GET.get('class') else ''
    local_class = f" {values['class']}" if values.get('class') else ''
    context = {
        'request': request,
        '_': ts_factory,
        'local_translate': request.localizer.translate,
        'global_class': global_class,
        'local_class': local_class,
        'action': get_action(request)[0],
        'title': request.registry['settings']['title'],
        'theme': theme_static_prefix(request),
        'theme_has': theme_has_static,
        'manager': self,
        'rendering': rendering,
        'ciopath': ciopath,
        'route_directory': request.route_path(
            'browse', ciopath=ciopath.parent().route()),
        'route_download': request.route_path(
            'file_download', ciopath=ciopath.route()),
        'route_close': self._route_close(request, ciopath.uid()),
    }
    values.update(context)

    around = self._routes_around(request, ciopath)
    values['route_previous'], values['route_next'] = around

    values.update(self._available_renderings(request, warehouse))
    values['can_edit'] = bool(can_edit and values['editings'])

    # Default values for what the caller did not provide
    if 'form' not in values:
        values['form'] = Form(request)
    values.setdefault('rendering_num', 0)
    values.setdefault('content', '')

    try:
        template = PageTemplateFile(
            abspath_from_asset_spec(rendering['template']),
            translate=_translate)
        return template.render(**values)
    except (SyntaxError, UnboundLocalError, AssertionError, TypeError,
            NameError, AttributeError,
            KeyError) as error:  # pragma: nocover
        self._log_error(str(error), request)
        return None
# -------------------------------------------------------------------------
@classmethod
def _route_close(cls, request: Request, uid: str) -> str:
    """Return the URL to close the view and return to the previous page.

    The URL is the path given by the breadcrumbs to go back, followed by
    a fragment made of the ID of the file and an underscore.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param str uid:
        ID of the current file.
    :rtype: str
    """
    back_path = request.breadcrumbs.back_path()
    return '{0}#{1}_'.format(back_path, uid)
# -------------------------------------------------------------------------
def _routes_around(self, request: Request,
                   ciopath: CioPath) -> tuple[str | None, str | None]:
    """Return a tuple of URL to go to the previous and the next file
    according to the file type.

    The files are those remembered in the session under the
    ``'current_files'`` key. Each item is used as a pair whose first
    element is compared with ``ciopath`` and whose second element gives
    the type of the file. Directories and files without manager are
    skipped.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  ciopath: .lib.ciopath.CioPath
    :param ciopath:
        `CioPath` of the current file.
    :rtype: tuple
    :return:
        A tuple such as ``(route_previous, route_next)``. A route is
        ``None`` when there is no such file; both are ``None`` when the
        current file is not in the list.
    """
    if 'current_files' not in request.session:
        return None, None
    files = request.session['current_files']

    # Find the current file in the list
    index = next(
        (k for k, item in enumerate(files) if item[0] == ciopath), -1)
    if index == -1:
        return None, None

    ciowarehouse2 = request.registry['modules']['ciowarehouse2']

    def _first_route(positions):
        """Return the route to the first viewable file met while walking
        through the positions or ``None``."""
        for k in positions:
            file_path, file_type = files[k][0], files[k][1]
            if file_type.is_directory():
                continue
            if ciowarehouse2.manager(request, file_type) is not None:
                return request.route_path(
                    'file_view',
                    ciotype=file_type.route(),
                    ciopath=file_path.route())
        return None

    # Backward from the current file, then forward
    route_previous = _first_route(range(index - 1, -1, -1))
    route_next = _first_route(range(index + 1, len(files)))
    return route_previous, route_next
# -------------------------------------------------------------------------
@cache_user_renderings(CIOWAREHOUSE2_NS, CACHE_REGION_USER)
def _available_renderings(self, request: Request, warehouse: Warehouse):
    """Return a dictionary of available renderings.

    Custom renderings are read from the file ``<uid>.toml`` of the
    rendering directory of the warehouse, if any. They come first and
    hide the hard coded renderings with the same name. Renderings
    reserved for groups the user does not belong to are left out.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  warehouse: .lib.warehouse.Warehouse
    :param warehouse:
        Object describing the warehouse containing the file.
    :rtype: dictionary
    :return:
        A dictionary with keys ``'viewings'`` and ``'editings'``, each
        one being a list of renderings, after a pass through
        ``_fix_renderings()``.
    """
    user_groups = set(request.session['user']['groups'])

    # Custom renderings of the warehouse
    toml = None
    if warehouse.rendering_dir:
        toml_file = join(warehouse.rendering_dir, f'{self.uid}.toml')
        if exists(toml_file):
            with open(toml_file, 'r', encoding='utf8') as hdl:
                toml = toml_loads(hdl.read())

    available: dict[str, list] = {}
    for kind in ('viewings', 'editings'):
        selected = []
        seen = set()

        # From a custom directory
        if toml is not None:
            defaults = toml.get(f'default_{kind}')
            for custom in toml.get(kind, ''):
                checked = self._check_rendering(
                    request, user_groups, defaults, custom)
                if checked is None:
                    continue
                selected.append(checked)
                seen.add(checked['name'])

        # Hard coded
        hard_coded = self.editings if kind == 'editings' else self.viewings
        for rendering in hard_coded:
            if rendering['name'] in seen:
                continue
            groups = rendering.get('only4groups')
            if groups is None or groups & user_groups:
                selected.append(rendering)

        available[kind] = selected

    return self._fix_renderings(request, available)
# -------------------------------------------------------------------------
def _check_rendering(
        self, request: Request, user_groups: set, defaults: dict | None,
        rendering: dict) -> dict | None:
    """Check and complete a rendering.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param set user_groups:
        Groups the user belongs to.
    :param dict defaults:
        Default values, overridden by those of the rendering.
    :param dict rendering:
        Rendering to process. It is not modified.
    :rtype: :class:`dict` or ``None``
    :return:
        The completed rendering or ``None`` if the user is not allowed to
        use it or if it is incorrect (no name, no template or no CSS); an
        incorrect rendering is logged.
    """
    if rendering.get('only4groups') and \
       not set(rendering['only4groups']) & user_groups:
        return None
    if 'name' not in rendering:
        self._log_error(
            _('Name is missing for a dynaming rendering'), request)
        return None

    # Default values
    checked = dict(defaults) if defaults else {}
    checked.update(rendering)

    # Manager UID
    checked.setdefault('manager_uid', self.uid)

    # Label: usually a table of texts by language, but a plain text is
    # accepted too instead of failing on a missing ``get`` method
    label = checked.get('label')
    if hasattr(label, 'get'):
        label = label.get(
            request.locale_name, label.get('en', checked['name']))
    elif not label:
        label = checked['name']
    checked['label'] = label

    # Check
    if 'template' not in checked or 'css' not in checked:
        self._log_error(
            _('Rendering "${n}" is incorrect', {'n': checked['name']}),
            request)
        return None

    return checked
# -------------------------------------------------------------------------
def _fix_renderings(self, request: Request, available: dict) -> dict:
    """Possibly fix URLs of the available renderings.

    This base implementation changes nothing and returns the very
    dictionary it receives.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param dict available:
        Available renderings.
    :rtype: dictionary
    """
    # pylint: disable = unused-argument
    fixed = available
    return fixed
# -------------------------------------------------------------------------
@classmethod
def _log_error(cls, error: str, request: Request | None = None):
    """Log an error message.

    With a request, the message is translated into English and logged
    through ``log_error()``; without, it is translated and sent to the
    logger of the module.

    :param str error:
        Error message.
    :type  request: pyramid.request.Request
    :param request: (optional)
        Current request or ``None`` if called by populate script.
    """
    if request is not None:
        log_error(request, translate(error, lang='en'))
        return
    LOG.error(translate(error))
# -------------------------------------------------------------------------
@classmethod
def _log_warning(cls, warning: str, request: Request | None = None):
    """Log a warning message.

    With a request, the message is translated into English and logged
    through ``log_warning()``; without, it is translated and sent to the
    logger of the module.

    :param str warning:
        Warning message.
    :type  request: pyramid.request.Request
    :param request: (optional)
        Current request or ``None`` if called by populate script.
    """
    if request is not None:
        log_warning(request, translate(warning, lang='en'))
        return
    LOG.warning(translate(warning))