"""Some various utilities."""
from __future__ import annotations
from logging import getLogger
from os import walk, sep
from os.path import join, exists, basename, dirname, normpath, isfile, relpath
from os.path import commonpath, splitext
from datetime import datetime
from time import sleep
from collections import namedtuple
from unicodedata import normalize, combining
from re import MULTILINE, UNICODE, sub as re_sub
from zipfile import ZIP_DEFLATED, ZipFile, LargeZipFile
from tempfile import NamedTemporaryFile
from mimetypes import guess_type
from json import loads
from io import open as io_open
from transaction import manager
from sqlalchemy import or_
from pyramid.request import Request
from pyramid.response import FileResponse
from pyramid.httpexceptions import HTTPNotFound, HTTPForbidden
from chrysalio.lib.utils import EXCLUDED_FILES as CIOEXCLUDED_FILES
from chrysalio.lib.utils import make_id, age
from chrysalio.lib.log import log_error
from chrysalio.includes.cache import cache_namespace
from chrysalio.models import get_tm_dbsession
from ..models.dbwarehouse import DBWarehouse, DBWarehouseUser, DBWarehouseGroup
from .ciotype import CioType
from .ciopath import LOCAL_DIR, INFO_DIR, MYSELF, CioPath
from .i18n import _, translate
LOG = getLogger(__name__)
CIOWAREHOUSE2_NS = 'ciowrh2'
CACHE_REGION_USER = 'ciowarehouse2_user'
CACHE_REGION_GLOBAL = 'ciowarehouse2_global'
IGNORED_PREFIX = '('
IGNORED_SUFFIX = ')'
EXCLUDED_FILES = CIOEXCLUDED_FILES + (LOCAL_DIR, INFO_DIR, '.infos')
FILE_RIGHTS_INDEX = 0
META_RIGHTS_INDEX = 1
NOT_FOUND = normpath(
join(dirname(__file__), '..', 'Static', 'Images', 'notfound.jpg'))
ROUTE_NOT_FOUND = \
f"/{__package__.split('.', maxsplit=1)[0]}/images/notfound.jpg"
THUMBNAIL_TRIALS = 4
SCOPES_ALL_ENUM = ('directory', 'warehouse', 'favorite', 'all')
SCOPES_ALL = (
('directory', _('in the directory')), ('warehouse', _('in the warehouse')),
('favorite', _('in the favorites')), ('all', _('everywhere')))
SCOPES_TOP = (('favorite', _('in the favorites')), ('all', _('everywhere')))
SCOPES_TOP_ENUM = ('favorite', 'all')
ONLY4GROUPS_ALL = '__ALL__'
# =============================================================================
[docs]
def files2response(
request: Request,
abs_paths: list[str],
common_path: str | None = None,
download_name: str | None = None) -> FileResponse | None:
"""Prepare files for download and return a Pyramid response.
:type request: pyramid.request.Request
:param request:
Current request.
:param list abs_paths:
List of absolute paths to files to download.
:param str common_path: (optional)
Root of all files.
:param str download_name: (optional)
Visible name during download.
:rtype: pyramid.response.FileResponse
:return:
Return a FileResponse or raise a
:class:`pyramid.httpexceptions.HTTPNotFound` exception.
"""
# Check list
abs_paths = [
k for k in abs_paths if exists(k) and basename(k) not in EXCLUDED_FILES
]
if not abs_paths:
return None
# Single file
if len(abs_paths) == 1 and isfile(abs_paths[0]):
return file2response(request, abs_paths[0], basename(abs_paths[0]))
# Directory or several files
common_path = commonpath(abs_paths) if common_path is None \
else common_path
with NamedTemporaryFile(
dir=request.registry.settings.get('temporary')) as tmp:
with ZipFile(tmp, 'w', ZIP_DEFLATED) as zip_file:
for filename in abs_paths:
# File
if isfile(filename):
try:
zip_file.write(
filename, relpath(filename, common_path))
except LargeZipFile: # pragma: nocover
zip_file.close()
tmp.close()
raise HTTPForbidden(comment=_('This file is too big!'))
continue # pragma: nocover
# Directory
for root, dirs, files in walk(filename):
for name in dirs:
if name in EXCLUDED_FILES:
dirs.remove(name)
for name in files:
if name in EXCLUDED_FILES:
continue
name = join(root, name)
try:
zip_file.write(name, relpath(name, common_path))
except LargeZipFile: # pragma: nocover
zip_file.close()
raise HTTPForbidden(
comment=_('This file is too big!'))
except IOError as error: # pragma: nocover
zip_file.close()
tmp.close()
raise HTTPNotFound(comment=error)
download_name = download_name or '{0}{1}.zip'.format(
len(abs_paths) == 1 and basename(abs_paths[0])
or basename(normpath(common_path))
or request.registry.settings['site.uid'],
'-{0}'.format(datetime.now().isoformat(' ').partition(' ')[0])
if len(abs_paths) > 1 else '')
response = file2response(
request, tmp.name, download_name, 'application/zip')
return response
# =============================================================================
[docs]
def file2response(
request: Request,
abs_path: str,
download_name: str | None = None,
content_type: str | None = None) -> FileResponse:
"""Return a Pyramid FileResponse containing the given file.
:type request: pyramid.request.Request
:param request:
Current request.
:param str abs_path:
Absolute path to file to encapsulate.
:param str download_name:
Visible name during download.
:rtype: pyramid.response.FileResponse
"""
if download_name is None:
download_name = basename(abs_path)
if content_type is None:
content_type = guess_type(abs_path, False)[0]
response = FileResponse(
abs_path, request=request, content_type=content_type)
# yapf: disable
try:
download_name.encode('latin-1')
response.headerlist.append((
'Content-Disposition',
f'attachment; filename="{download_name}"'))
except UnicodeEncodeError:
download_name = make_id(download_name, 'no_accent')
response.headerlist.append((
'Content-Disposition',
f'attachment; filename="{download_name}"'))
# yapf: enable
return response
# =============================================================================
[docs]
def file_move_check(
warehouse1_root: str,
ciopath1: CioPath,
warehouse2_root: str,
ciopath2: CioPath,
overwrite_ok: bool = False,
mode: str = 'move') -> tuple[CioPath | None, str | None]:
"""Check files for a move or copy and return absolute paths.
:param str warehouse1_root:
Root of warehouse of the source.
:type ciopath1: .lib.ciopath.CioPath
:param ciopaht1:
`CioPath` of the first file.
:param str warehouse2_root:
Root of the warehouse of the target.
:type ciopath2: .lib.ciopath.CioPath
:param ciopath2:
`CioPath` of the srecond file.
:param bool overwrite_ok: (default=False)
If ``True``, silently overwrite the destination file.
:param str mode: (``'move'``, ``'copy'``, ``'rename'``)
The way the move must operate.
:rtype: tuple
:return:
A tuple such as ``(ciopath2, error)``.
"""
# Check source file
abs_path1 = ciopath1.absolute_path(warehouse1_root)
if ciopath1.is_root() or not abs_path1 or not exists(abs_path1):
return None, _('Incorrect file ${f}!', {'f': ciopath1})
# Source and destination are not the same type
if ciopath1.is_directory() and not ciopath2.is_directory():
return None, _( # yapf: disable
'${f1} and ${f2} are not the same type!',
{'f1': ciopath1, 'f2': ciopath2})
# Source contains or equals destination
if ciopath1.contains(ciopath2) or (ciopath1.is_directory()
and ciopath1 == ciopath2):
return None, _( # yapf: disable
'${f1} contains ${f2}!', {'f1': ciopath1, 'f2': ciopath2})
# Fixed destination
if mode != 'rename' and ciopath2.is_directory():
ciopath2 = CioPath( # yapf: disable
ciopath2.wid,
normpath(join(ciopath2.path, ciopath1.file_name() or '.')),
ciopath1.is_directory())
# Source and destination are the same file
if ciopath1 == ciopath2 and overwrite_ok:
return None, None
# Incorrect destination
abs_path2 = ciopath2.absolute_path(warehouse2_root)
if not abs_path2:
return None, _( # yapf: disable
'File ${f} is outside the warehouse!', {'f': ciopath2})
# Existing destination
if not overwrite_ok and exists(abs_path2):
num = 2
path = splitext(ciopath2.path)
ciopath2 = CioPath(
ciopath2.wid, f'{path[0]}_{num}{path[1]}', ciopath2.is_directory())
abs_path2 = ciopath2.absolute_path(warehouse2_root)
while abs_path2 and exists(abs_path2):
num += 1
ciopath2 = CioPath(
ciopath2.wid, f'{path[0]}_{num}{path[1]}',
ciopath2.is_directory())
abs_path2 = ciopath2.absolute_path(warehouse2_root)
return ciopath2, None
# =============================================================================
[docs]
def normalize_filename(
filename: str, mode: str = 'simple', is_dir: bool = False) -> str:
"""Normalize file name.
:param str filename:
File name to normalize.
:param str mode: (default='simple')
Strategy to normalize file name (``'simple'`` or ``'strict'``).
:param bool is_dir: (default=False)
``True`` if the file is a directory.
:rtype: str
"""
result = re_sub('\\.+', '.', re_sub('[*?:<>]', '_', filename))
if not is_dir:
result_tuple = splitext(result)
result = f'{result_tuple[0]}{result_tuple[1].lower()}'
if mode == 'simple':
return result
result_list = result.split(sep)
for i, chunk in enumerate(result_list):
chunk = re_sub(
'_+', '_', re_sub('[ !;:,"\'/«»()\\[\\]–&]', '_', chunk))
chunk = normalize('NFKD', chunk.encode('utf8').decode('utf8'))
result_list[i] = ''.join([k for k in chunk if not combining(k)])
if not is_dir:
result_list[-1] = result_list[-1].lower()
return sep.join(result_list)
# =============================================================================
[docs]
def ciopaths2absolute_paths(request: Request,
ciopaths_str: list[str]) -> list[str]:
"""Convert a list of `CioPath` into a list of absolute paths.
:type request: pyramid.request.Request
:param request:
Current request.
:param list ciopaths_str:
List of strings representing `CioPath`.
:rtype: list
"""
abs_names = []
ciowarehouse2 = request.registry['modules']['ciowarehouse2']
for ciopath_str in ciopaths_str:
ciopath = CioPath.from_str(ciopath_str)
root = ciowarehouse2.warehouse_root(request, ciopath.wid)
abs_path = ciopath.absolute_path(root)
if root is not None and abs_path:
abs_names.append(abs_path)
return abs_names
# =============================================================================
[docs]
def sort_key(name: str) -> str:
"""Key for sorting according to a name.
:param str name:
Name to convert.
:rtype: str
"""
if not name:
return ''
key = normalize('NFKD', name.lower())
return ''.join([k for k in key if not combining(k)])
# =============================================================================
[docs]
def build_callback(
registry,
build_env: dict,
result: dict,
request: Request | None = None):
"""Function called after the build is completed.
:param registry:
Application registry.
:param dict build_env:
Build environment.
:param dict result:
Result of the processing. See: :class:`cioservice.lib.build.Build`.
:type request: pyramid.request.Request
:param request: (optional)
Current request.
"""
# Something to do?
params = build_env['params']
service = registry['services'].get(build_env['job']['service_id'])
if service is None \
or not service.need_write_permission(build_env['job']['context']) \
or not result.get('output') \
or not params['settings'].get('output.home.id'):
return
# Get the warehouse
ciowarehouse2 = registry['modules']['ciowarehouse2']
if request is None:
warehouse = None
with manager:
request = namedtuple( # type: ignore
'Request', 'registry session dbsession has_permission')(
registry=registry,
session={},
dbsession=get_tm_dbsession(
registry['dbsession_factory'], manager),
has_permission=lambda x: bool(
'caller' in params and 'warehouse.creator' in params[
'caller']['principals']))
if 'caller' in params:
request.session['user'] = params['caller'] # type: ignore
warehouse = ciowarehouse2.warehouse(
request, params['settings']['output.home.id'])
else:
warehouse = ciowarehouse2.warehouse(
request, params['settings']['output.home.id'])
if warehouse is None:
return
# Commit
generator = translate(
build_env['job']['context'] or service.label, params.get('lang'))
caller = params.get('caller') \
or request.session.get('user') # type: ignore
warehouse.vcs.pull()
warehouse.vcs.add()
warehouse.vcs.commit(
translate(
_('Generated by ${g}', {'g': generator}), params.get('lang')),
caller['name'] if caller is not None else 'anonymous',
caller['email'] if caller is not None else '')
# Refresh the warehouse
ciopaths = warehouse.to_refresh(
build_env['params'].get('files', []),
(result['output'], ) + result.get('to_refresh', ()))
warehouse.refresh(request, ciopaths, now=True)
# =============================================================================
[docs]
def cache_user_renderings(namespace_prefix: str, region: str | None = None):
"""A decorator to retrieve in the user cache the dictionary renderings.
:param str namespace_prefix:
Prefix of the cache namespace.
:param str region: (optional)
Name of region.
"""
def _decorated(method):
"""Decoration of the method `method`."""
def _wrapper(class_, request, warehouse, *args, **kwargs):
"""Use of user cache."""
if not hasattr(class_, 'uid'):
raise AttributeError('Class must have a "uid" attribute!')
namespace = cache_namespace(namespace_prefix, warehouse.uid)
key = 'renderings:{}'.format(class_.uid)
renderings = request.registry['cache_user'].get(
request, key, namespace)
if renderings is not None:
return renderings
renderings_method = method.__func__ \
if isinstance(method, classmethod) else method
renderings = renderings_method(
class_, request, warehouse, *args, **kwargs)
request.registry['cache_user'].set(
request, key, renderings, namespace, region)
return renderings
return _wrapper
return _decorated
# =============================================================================
[docs]
def cache_user_seeds(namespace_prefix: str, region: str | None = None):
"""A decorator to retrieve in the user cache the dictionary of seeds.
:param str namespace_prefix:
Prefix of the cache namespace.
:param str region: (optional)
Name of region.
"""
def _decorated(method):
"""Decoration of the method `method`."""
def _wrapper(class_, request, *args, **kwargs):
"""Use of user cache."""
if not hasattr(class_, 'uid'):
raise AttributeError('Class must have a "uid" attribute!')
namespace = cache_namespace(namespace_prefix, class_.uid)
key = 'seeds'
seeds = request.registry['cache_user'].get(request, key, namespace)
if seeds is not None:
return seeds
seeds_method = method.__func__ \
if isinstance(method, classmethod) else method
seeds = seeds_method(class_, request, *args, **kwargs)
request.registry['cache_user'].set(
request, key, seeds, namespace, region)
return seeds
return _wrapper
return _decorated
# =============================================================================
[docs]
def isodt2str(isodt: str) -> str:
"""Return a string representing the given datetime in local time zone in
an ISO format.
:param str isodt:
ISO datetime format.
:rtype: str
"""
try:
return datetime.fromisoformat(_isodt_cleaned(
isodt)).astimezone().isoformat(' ').partition('.')[0]
except ValueError:
return ''
# =============================================================================
[docs]
def isodt2age(request: Request, isodt: str) -> str:
"""Return a string representing the given datetime as an age.
:type request: pyramid.request.Request
:param request:
Current request.
:param str isodt:
ISO datetime format.
:rtype: str
"""
try:
return request.localizer.translate(
age(datetime.fromisoformat(_isodt_cleaned(isodt))))
except ValueError:
return ''
# =============================================================================
def _isodt_cleaned(isodt: str) -> str:
"""Clean ISO date/time to be compatible with version before Python 3.11."""
return re_sub(r'\.\d+', '', isodt).replace('Z', '+00:00')
# =============================================================================
[docs]
def wait4thumbnails(
request: Request,
thumbnails: list[tuple[CioPath, str, str | None]],
trials: int = THUMBNAIL_TRIALS):
"""Wait a while to be sure each thumbnail of the list is created.
:type request: pyramid.request.Request
:param request:
Current request.
:param list thumbnails:
A list of tuples such as ``(ciopath, size, thumbnail_ext)``.
:param int trails: (default=``THUMBNAIL_TRIALS``)
Number of trials for each thumbnail.
"""
for ciopath, size, thumbnail_ext in thumbnails:
if not thumbnail_ext or not ciopath.wid:
continue
home = join(
request.registry['backend_homes']['thumbnails'], ciopath.wid)
abs_thumbnail = join(home, ciopath.path, f'{size}.{thumbnail_ext}') \
if not ciopath.is_directory() else \
join(home, ciopath.path, MYSELF, f'{size}.{thumbnail_ext}')
count = trials
while not exists(abs_thumbnail) and count > 0:
sleep(.2)
count -= 1
# =============================================================================
[docs]
def ciotype_thumbnails(
request: Request, ciopaths: tuple[CioPath]
) -> dict[CioPath, tuple[CioType, str, str | None]]:
"""Retrieve `CioType`s and extensions of a given list of `CioPath`.
:type request: pyramid.request.Request
:param request:
Current request.
:param tuple ciopaths:
A tuple of `CioPath`.
:rtype: dict
"""
backend = request.registry['modules']['ciowarehouse2'].backend
query = query_in('ciopath', [f'"{str(k)}"' for k in ciopaths])
reply, error = backend.search(query, len(ciopaths))
if reply is None or error is not None:
log_error(request, error)
return {}
result = {}
for hit in reply.hits:
values = loads(hit.json)
result[CioPath.from_str(values['ciopath'][0])] = (
CioType.from_str(values['ciotype'][0]), values['icon'][0],
values['thumbnail_ext'][0]
if values.get('thumbnail_ext') else None)
return result
# =============================================================================
[docs]
def thumbnail_absolute_path(
request: Request,
ciotype: CioType,
ciopath: CioPath,
icon: str,
thumbnail_ext: str | None) -> str:
"""Return the absolute path to the thumbnail or to the icon.
:type request: pyramid.request.Request
:param request:
Current request.
:type ciotype: .lib.ciotype.CioType
:param ciotype:
`CioType` of the file.
:path ciopath: .lib.ciopath.CioPath
:param ciopath:
`CioPath` of the file.
:param str icon:
File name of the icon.
:param str thumbnail_ext:
Extension of the thumbnail.
:rtype: str
"""
if thumbnail_ext is None or ciopath.wid is None:
home = join(request.registry['backend_homes']['icons'])
return join(home, 'medium', icon)
home = join(request.registry['backend_homes']['thumbnails'], ciopath.wid)
size = 'medium' if request.GET.get('size') == 'small' else 'large'
return join(home, ciopath.path, f'{size}.{thumbnail_ext}') \
if not ciotype.is_directory() else \
join(home, ciopath.path, MYSELF, f'{size}.{thumbnail_ext}')
# =============================================================================
[docs]
def query_in(field: str, values: tuple | list) -> str:
"""Create an optimized query for a multivalued field.
:param str field:
Field name.
:param tuple values:
Possible values.
:rtype: str
"""
if not values:
return ''
if len(values) == 1:
return f'{field}:{values[0]}'
return f"{field}: IN [{' '.join(values)}]"
# =============================================================================
[docs]
def query_in_quoted(field: str, values: tuple | list) -> str:
"""Create an optimized query for a multivalued field with quoted values.
:param str field:
Field name.
:param tuple values:
Possible values.
:rtype: str
"""
if not values:
return ''
if len(values) == 1:
return f'{field}:"{values[0]}"'
quoted = ' '.join(map(lambda x: f'"{x}"', values))
return f"{field}: IN [{quoted}]"
# =============================================================================
[docs]
def scope_query(
request: Request,
scope: str,
directory_ciopath: CioPath,
flat: bool = False) -> str | None:
"""Create a query to restrick the search in the current and authorized
scope.
:type request: pyramid.request.Request
:param request:
Current request.
:param str scope:
Scope of the search: ``'all'``, ``'favorite'``, ``'warehouse'`` or
``'directory'``.
:param directory_ciopath:
CioPath of the current directory, possibly an empty one.
:param bool flat:
if ``True`` use ``facet`` field instead of ``directory`` field.
:rtype: str
"""
# Create query on warehouse(s)
warehouse_ids = warehouses_in_scope(request, scope, directory_ciopath.wid)
if not warehouse_ids:
return None
query = query_in('warehouse', warehouse_ids)
# Create query on directory
if scope == 'directory' and directory_ciopath.path:
if not flat:
query = f'{query} AND directory:"{directory_ciopath.path}"'
elif not directory_ciopath.is_root():
query = f'{query} AND facet:"{directory_ciopath.facet(True)}"'
return query
# =============================================================================
[docs]
def warehouses_in_scope(
request: Request, scope: str,
warehouse_id: str | None) -> tuple[str, ...]:
"""Find ID of warehouses in the scope.
:type request: pyramid.request.Request
:param request:
Current request.
:param str scope:
Scope of the search: ``'all'``, ``'favorite'``, ``'warehouse'`` or
``'directory'``.
:param str warehouse_id:
Warehouse ID if exists.
:rtype: list
"""
# One warehouse
if warehouse_id is not None and scope not in SCOPES_TOP_ENUM:
return (warehouse_id, )
# All warehouses
if scope == 'all':
user_id = request.session['user']['user_id']
dbquery = request.dbsession.query(DBWarehouse.warehouse_id)
if not request.has_permission('warehouse-create'):
groups = set(request.session['user']['groups'])
dbquery = dbquery.outerjoin(
DBWarehouseUser, DBWarehouseGroup).filter(
or_(
DBWarehouseUser.user_id == user_id,
DBWarehouseGroup.group_id.in_(groups),
DBWarehouse.access.in_(('free', 'readonly'))))
return tuple(str(k[0]) for k in dbquery)
# Favorite warehouses
return warehouses_favorite(request)
# =============================================================================
[docs]
def warehouses_favorite(request: Request) -> tuple[str, ...]:
"""Return a dictionary of favorite warehouses.
:type request: pyramid.request.Request
:param request:
Current request.
:rtype: tuple
"""
if 'favorites' in request.session:
return request.session['favorites']
if 'user' not in request.session:
return ()
user_id = request.session['user']['user_id']
favorites = [
str(k[0])
for k in request.dbsession.query(DBWarehouse.warehouse_id).outerjoin(
DBWarehouseUser).filter(
DBWarehouseUser.user_id == user_id, DBWarehouseUser.favorite)
]
request.session['favorites'] = tuple(favorites)
return request.session['favorites']
# =============================================================================
# =============================================================================
[docs]
def apply_regex(request: Request, regex_file: str, content: str) -> str:
"""Apply a list of regular expressions from a file.
:type request: pyramid.request.Request
:param request:
Current request.
:param str regex_file:
Absolute path to the file containing regular expressions.
:param str content:
Content to process.
:rtype: str
"""
if not regex_file:
return content
if not exists(regex_file):
log_error(request, translate( # yapf: disable
_('Regular expression file "${f}" does not exist.', {
'f': regex_file}), lang='en'))
return content
with io_open(regex_file, 'r', encoding='utf8') as lines:
for line in lines:
if not line or line[0] == '#' or line[0:7] == '[Regex]':
continue
pattern, replace = line.partition(' =')[::2]
pattern = pattern.strip()
if not pattern:
continue
if pattern[0] in '\'"' and pattern[-1] in '\'"':
pattern = pattern[1:-1]
replace = replace.strip()
if replace and replace[0] in '\'"' and replace[-1] in '\'"':
replace = replace[1:-1]
# pylint: disable = eval-used
if replace.startswith('lambda'):
replace = eval(replace) # nosec
content = re_sub(
pattern, replace, content, flags=MULTILINE | UNICODE)
return content