Source code for ciowarehouse2.lib.utils

"""Some various utilities."""

from __future__ import annotations
from logging import getLogger
from os import walk, sep
from os.path import join, exists, basename, dirname, normpath, isfile, relpath
from os.path import commonpath, splitext
from datetime import datetime
from time import sleep
from collections import namedtuple
from unicodedata import normalize, combining
from re import MULTILINE, UNICODE, sub as re_sub
from zipfile import ZIP_DEFLATED, ZipFile, LargeZipFile
from tempfile import NamedTemporaryFile
from mimetypes import guess_type
from json import loads
from io import open as io_open

from transaction import manager
from sqlalchemy import or_

from pyramid.request import Request
from pyramid.response import FileResponse
from pyramid.httpexceptions import HTTPNotFound, HTTPForbidden

from chrysalio.lib.utils import EXCLUDED_FILES as CIOEXCLUDED_FILES
from chrysalio.lib.utils import make_id, age
from chrysalio.lib.log import log_error
from chrysalio.includes.cache import cache_namespace
from chrysalio.models import get_tm_dbsession
from ..models.dbwarehouse import DBWarehouse, DBWarehouseUser, DBWarehouseGroup
from .ciotype import CioType
from .ciopath import LOCAL_DIR, INFO_DIR, MYSELF, CioPath
from .i18n import _, translate

# Module-level logger.
LOG = getLogger(__name__)
# Short namespace identifier of the module.
# NOTE(review): its consumers are not visible in this file -- confirm usage.
CIOWAREHOUSE2_NS = 'ciowrh2'
# Names of the cache regions (per user / shared).
# NOTE(review): presumably passed as ``region`` to the cache decorators below.
CACHE_REGION_USER = 'ciowarehouse2_user'
CACHE_REGION_GLOBAL = 'ciowarehouse2_global'
# Delimiters marking an ignored item.
# NOTE(review): not used in this file -- confirm what they delimit.
IGNORED_PREFIX = '('
IGNORED_SUFFIX = ')'
# File and directory names never exported: Chrysalio exclusions completed
# with the warehouse technical directories. Used by ``files2response``.
EXCLUDED_FILES = CIOEXCLUDED_FILES + (LOCAL_DIR, INFO_DIR, '.infos')
# NOTE(review): presumably positions inside a rights tuple -- confirm.
FILE_RIGHTS_INDEX = 0
META_RIGHTS_INDEX = 1
# Absolute path to the "not found" image, resolved relatively to this file.
NOT_FOUND = normpath(
    join(dirname(__file__), '..', 'Static', 'Images', 'notfound.jpg'))
# URL path of the "not found" image, rooted at the top-level package name.
ROUTE_NOT_FOUND = \
    f"/{__package__.split('.', maxsplit=1)[0]}/images/notfound.jpg"
# Default number of trials of ``wait4thumbnails`` for each thumbnail.
THUMBNAIL_TRIALS = 4
# Search scopes: ``*_ENUM`` tuples list the values, the others pair each
# value with its translatable label. ``*_TOP`` is the subset which does not
# depend on a current warehouse (see ``warehouses_in_scope``).
SCOPES_ALL_ENUM = ('directory', 'warehouse', 'favorite', 'all')
SCOPES_ALL = (
    ('directory', _('in the directory')), ('warehouse', _('in the warehouse')),
    ('favorite', _('in the favorites')), ('all', _('everywhere')))
SCOPES_TOP = (('favorite', _('in the favorites')), ('all', _('everywhere')))
SCOPES_TOP_ENUM = ('favorite', 'all')
# NOTE(review): looks like a sentinel meaning "all groups" -- confirm.
ONLY4GROUPS_ALL = '__ALL__'


# =============================================================================
def files2response(
        request: Request,
        abs_paths: list[str],
        common_path: str | None = None,
        download_name: str | None = None) -> FileResponse | None:
    """Prepare files for download and return a Pyramid response.

    A single regular file is served as is. A directory or several files are
    packed into a temporary ZIP archive whose entries are named relatively to
    `common_path`.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param list abs_paths:
        List of absolute paths to files to download. Paths which do not exist
        or whose base name is in ``EXCLUDED_FILES`` are ignored.
    :param str common_path: (optional)
        Root of all files. By default, the common path of `abs_paths`.
    :param str download_name: (optional)
        Visible name during download.
    :rtype: pyramid.response.FileResponse
    :return:
        A FileResponse or ``None`` if there is nothing to download.
    :raise pyramid.httpexceptions.HTTPForbidden:
        If a file is too big for the archive.
    :raise pyramid.httpexceptions.HTTPNotFound:
        If a file inside a directory cannot be read.
    """
    # Check list
    abs_paths = [
        k for k in abs_paths
        if exists(k) and basename(k) not in EXCLUDED_FILES
    ]
    if not abs_paths:
        return None

    # Single file
    if len(abs_paths) == 1 and isfile(abs_paths[0]):
        return file2response(request, abs_paths[0], basename(abs_paths[0]))

    # Directory or several files
    if common_path is None:
        common_path = commonpath(abs_paths)
    # The ``with`` blocks close the archive then the temporary file, even when
    # an exception is raised: no explicit ``close()`` is needed.
    with NamedTemporaryFile(
            dir=request.registry.settings.get('temporary')) as tmp:
        with ZipFile(tmp, 'w', ZIP_DEFLATED) as zip_file:
            for filename in abs_paths:
                # File
                if isfile(filename):
                    try:
                        zip_file.write(
                            filename, relpath(filename, common_path))
                    except LargeZipFile:  # pragma: nocover
                        raise HTTPForbidden(comment=_('This file is too big!'))
                    continue

                # Directory
                for root, dirs, files in walk(filename):
                    # Prune in place so that ``walk`` does not descend into
                    # excluded directories. Removing items while iterating
                    # over ``dirs`` would skip the item following each
                    # removal.
                    dirs[:] = [k for k in dirs if k not in EXCLUDED_FILES]
                    for name in files:
                        if name in EXCLUDED_FILES:
                            continue
                        name = join(root, name)
                        try:
                            zip_file.write(name, relpath(name, common_path))
                        except LargeZipFile:  # pragma: nocover
                            raise HTTPForbidden(
                                comment=_('This file is too big!'))
                        except IOError as error:  # pragma: nocover
                            raise HTTPNotFound(comment=error)

        # The archive is now complete but the temporary file still exists
        if not download_name:
            if len(abs_paths) == 1 and basename(abs_paths[0]):
                stem = basename(abs_paths[0])
            else:
                stem = basename(normpath(common_path)) \
                    or request.registry.settings['site.uid']
            # Several sources: date the archive to tell downloads apart
            dated = f'-{datetime.now().date().isoformat()}' \
                if len(abs_paths) > 1 else ''
            download_name = f'{stem}{dated}.zip'
        response = file2response(
            request, tmp.name, download_name, 'application/zip')
    return response
# =============================================================================
def file2response(
        request: Request,
        abs_path: str,
        download_name: str | None = None,
        content_type: str | None = None) -> FileResponse:
    """Return a Pyramid FileResponse containing the given file.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param str abs_path:
        Absolute path to file to encapsulate.
    :param str download_name: (optional)
        Visible name during download. By default, the base name of
        `abs_path`.
    :param str content_type: (optional)
        Content type of the response. By default, it is guessed from
        `abs_path`.
    :rtype: pyramid.response.FileResponse
    """
    visible_name = basename(abs_path) \
        if download_name is None else download_name
    if content_type is None:
        content_type, _encoding = guess_type(abs_path, False)

    response = FileResponse(
        abs_path, request=request, content_type=content_type)

    # A name which cannot be encoded in latin-1 is converted into an ID
    try:
        visible_name.encode('latin-1')
    except UnicodeEncodeError:
        visible_name = make_id(visible_name, 'no_accent')
    response.headerlist.append(
        ('Content-Disposition', f'attachment; filename="{visible_name}"'))
    return response
# =============================================================================
def file_move_check(
        warehouse1_root: str,
        ciopath1: CioPath,
        warehouse2_root: str,
        ciopath2: CioPath,
        overwrite_ok: bool = False,
        mode: str = 'move') -> tuple[CioPath | None, str | None]:
    """Check files for a move or copy and return the final destination.

    :param str warehouse1_root:
        Root of the warehouse of the source.
    :type  ciopath1: .lib.ciopath.CioPath
    :param ciopath1:
        `CioPath` of the source file.
    :param str warehouse2_root:
        Root of the warehouse of the target.
    :type  ciopath2: .lib.ciopath.CioPath
    :param ciopath2:
        `CioPath` of the destination file or directory.
    :param bool overwrite_ok: (default=False)
        If ``True``, silently overwrite the destination file. Otherwise, an
        existing destination is renamed with a ``_<num>`` suffix.
    :param str mode: (``'move'``, ``'copy'``, ``'rename'``)
        The way the move must operate.
    :rtype: tuple
    :return:
        A tuple such as ``(ciopath2, error)``. Both are ``None`` when the
        source and the destination are the same file and overwriting is
        allowed.
    """
    # The source must be an existing file which is not a warehouse root
    source_abs = ciopath1.absolute_path(warehouse1_root)
    if ciopath1.is_root() or not source_abs or not exists(source_abs):
        return None, _('Incorrect file ${f}!', {'f': ciopath1})

    # A directory can only go to a directory
    if ciopath1.is_directory() and not ciopath2.is_directory():
        return None, _(  # yapf: disable
            '${f1} and ${f2} are not the same type!',
            {'f1': ciopath1, 'f2': ciopath2})

    # The source must neither contain nor be the destination
    if ciopath1.contains(ciopath2) or (
            ciopath1.is_directory() and ciopath1 == ciopath2):
        return None, _(  # yapf: disable
            '${f1} contains ${f2}!', {'f1': ciopath1, 'f2': ciopath2})

    # Except for a renaming, a directory as destination means "inside it"
    if mode != 'rename' and ciopath2.is_directory():
        ciopath2 = CioPath(  # yapf: disable
            ciopath2.wid,
            normpath(join(ciopath2.path, ciopath1.file_name() or '.')),
            ciopath1.is_directory())

    # Nothing to do if the file is moved onto itself
    if ciopath1 == ciopath2 and overwrite_ok:
        return None, None

    # The destination must be inside its warehouse
    target_abs = ciopath2.absolute_path(warehouse2_root)
    if not target_abs:
        return None, _(  # yapf: disable
            'File ${f} is outside the warehouse!', {'f': ciopath2})

    # Free or overwritable destination
    if overwrite_ok or not exists(target_abs):
        return ciopath2, None

    # Existing destination: look for the first free "<stem>_<num><ext>"
    stem, extension = splitext(ciopath2.path)
    num = 1
    while True:
        num += 1
        ciopath2 = CioPath(
            ciopath2.wid, f'{stem}_{num}{extension}',
            ciopath2.is_directory())
        target_abs = ciopath2.absolute_path(warehouse2_root)
        if not target_abs or not exists(target_abs):
            break
    return ciopath2, None
# =============================================================================
def normalize_filename(
        filename: str, mode: str = 'simple', is_dir: bool = False) -> str:
    """Normalize file name.

    In every mode, the characters ``* ? : < >`` are replaced by an
    underscore, runs of dots are collapsed into a single dot and, for a file,
    the extension is lowercased.

    In strict mode, each path component additionally gets its blanks and
    punctuation replaced by a single underscore and its accents removed.
    For a file, the last component is entirely lowercased.

    :param str filename:
        File name to normalize.
    :param str mode: (default='simple')
        Strategy to normalize file name (``'simple'`` or ``'strict'``).
    :param bool is_dir: (default=False)
        ``True`` if the file is a directory.
    :rtype: str
    """
    result = re_sub(r'\.+', '.', re_sub('[*?:<>]', '_', filename))
    if not is_dir:
        root, extension = splitext(result)
        result = f'{root}{extension.lower()}'
    if mode == 'simple':
        return result

    # Characters replaced by an underscore in strict mode. Non-ASCII ones
    # (no-break space, guillemets, en dash) are escaped so that they remain
    # visible and cannot be lost by an editor.
    forbidden = '[ \u00a0!;:,"\'/\u00ab\u00bb()\\[\\]\u2013&]'
    chunks = result.split(sep)
    for i, chunk in enumerate(chunks):
        chunk = re_sub('_+', '_', re_sub(forbidden, '_', chunk))
        # Decompose the accented characters then drop the combining marks
        chunk = normalize('NFKD', chunk)
        chunks[i] = ''.join(k for k in chunk if not combining(k))
    if not is_dir:
        chunks[-1] = chunks[-1].lower()
    return sep.join(chunks)
# =============================================================================
def ciopaths2absolute_paths(
        request: Request, ciopaths_str: list[str]) -> list[str]:
    """Convert a list of `CioPath` strings into a list of absolute paths.

    Entries whose warehouse root or absolute path cannot be resolved are
    silently dropped.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param list ciopaths_str:
        List of strings representing `CioPath`.
    :rtype: list
    """
    module = request.registry['modules']['ciowarehouse2']
    absolute_paths = []
    for text in ciopaths_str:
        ciopath = CioPath.from_str(text)
        root = module.warehouse_root(request, ciopath.wid)
        absolute = ciopath.absolute_path(root)
        if root is None or not absolute:
            continue
        absolute_paths.append(absolute)
    return absolute_paths
# =============================================================================
def sort_key(name: str) -> str:
    """Key for sorting according to a name.

    The name is lowercased and its accents are removed.

    :param str name:
        Name to convert.
    :rtype: str
    """
    if not name:
        return ''
    decomposed = normalize('NFKD', name.lower())
    return ''.join(char for char in decomposed if not combining(char))
# =============================================================================
def build_callback(
        registry,
        build_env: dict,
        result: dict,
        request: Request | None = None):
    """Function called after the build is completed.

    If the service of the job needs a write permission and the build produced
    an output in a warehouse (``output.home.id`` setting), the output is
    committed in the version control system of this warehouse and the
    warehouse is refreshed.

    :param registry:
        Application registry.
    :param dict build_env:
        Build environment. The keys ``'params'`` and ``'job'`` are read.
    :param dict result:
        Result of the processing. See: :class:`cioservice.lib.build.Build`.
        The keys ``'output'`` and ``'to_refresh'`` are read.
    :type  request: pyramid.request.Request
    :param request: (optional)
        Current request. If ``None``, a minimal substitute is created from
        the registry and the caller found in the parameters.
    """
    # NOTE(review): the nesting under ``with manager:`` was reconstructed
    # from a source whose indentation was lost -- confirm against upstream.

    # Something to do?
    params = build_env['params']
    service = registry['services'].get(build_env['job']['service_id'])
    if service is None \
       or not service.need_write_permission(build_env['job']['context']) \
       or not result.get('output') \
       or not params['settings'].get('output.home.id'):
        return

    # Get the warehouse
    ciowarehouse2 = registry['modules']['ciowarehouse2']
    if request is None:
        warehouse = None
        with manager:
            # Stand-in exposing only the four attributes listed below, with a
            # database session bound to the transaction manager
            request = namedtuple(  # type: ignore
                'Request', 'registry session dbsession has_permission')(
                    registry=registry, session={},
                    dbsession=get_tm_dbsession(
                        registry['dbsession_factory'], manager),
                    # Whatever the asked permission, it is granted only to a
                    # caller having the 'warehouse.creator' principal
                    has_permission=lambda x: bool(
                        'caller' in params and 'warehouse.creator' in params[
                            'caller']['principals']))
            if 'caller' in params:
                request.session['user'] = params['caller']  # type: ignore
            warehouse = ciowarehouse2.warehouse(
                request, params['settings']['output.home.id'])
    else:
        warehouse = ciowarehouse2.warehouse(
            request, params['settings']['output.home.id'])
    if warehouse is None:
        return

    # Commit
    generator = translate(
        build_env['job']['context'] or service.label, params.get('lang'))
    # The author of the commit is the caller of the job or, failing that, the
    # user of the session
    caller = params.get('caller') \
        or request.session.get('user')  # type: ignore
    warehouse.vcs.pull()
    warehouse.vcs.add()
    warehouse.vcs.commit(
        translate(
            _('Generated by ${g}', {'g': generator}), params.get('lang')),
        caller['name'] if caller is not None else 'anonymous',
        caller['email'] if caller is not None else '')

    # Refresh the warehouse
    # NOTE(review): the concatenation assumes ``result['to_refresh']`` is a
    # tuple -- confirm against the producer of ``result``.
    ciopaths = warehouse.to_refresh(
        build_env['params'].get('files', []),
        (result['output'], ) + result.get('to_refresh', ()))
    warehouse.refresh(request, ciopaths, now=True)
# =============================================================================
def cache_user_renderings(namespace_prefix: str, region: str | None = None):
    """A decorator to retrieve in the user cache the dictionary renderings.

    The decorated method is called as ``method(class_, request, warehouse,
    *args, **kwargs)`` and its result is cached per warehouse, under a key
    built from the ``uid`` of the class.

    :param str namespace_prefix:
        Prefix of the cache namespace.
    :param str region: (optional)
        Name of region.
    """
    def _decorated(method):
        """Decoration of the method `method`."""
        # A ``classmethod`` object is not callable by itself: use the
        # underlying function
        compute = method.__func__ \
            if isinstance(method, classmethod) else method

        def _wrapper(class_, request, warehouse, *args, **kwargs):
            """Use of user cache."""
            if not hasattr(class_, 'uid'):
                raise AttributeError('Class must have a "uid" attribute!')

            namespace = cache_namespace(namespace_prefix, warehouse.uid)
            key = f'renderings:{class_.uid}'
            cache = request.registry['cache_user']
            cached = cache.get(request, key, namespace)
            if cached is not None:
                return cached

            renderings = compute(class_, request, warehouse, *args, **kwargs)
            cache.set(request, key, renderings, namespace, region)
            return renderings

        return _wrapper

    return _decorated
# =============================================================================
def cache_user_seeds(namespace_prefix: str, region: str | None = None):
    """A decorator to retrieve in the user cache the dictionary of seeds.

    The decorated method is called as ``method(class_, request, *args,
    **kwargs)`` and its result is cached in a namespace built from the
    ``uid`` of the class.

    :param str namespace_prefix:
        Prefix of the cache namespace.
    :param str region: (optional)
        Name of region.
    """
    def _decorated(method):
        """Decoration of the method `method`."""
        # A ``classmethod`` object is not callable by itself: use the
        # underlying function
        compute = method.__func__ \
            if isinstance(method, classmethod) else method

        def _wrapper(class_, request, *args, **kwargs):
            """Use of user cache."""
            if not hasattr(class_, 'uid'):
                raise AttributeError('Class must have a "uid" attribute!')

            namespace = cache_namespace(namespace_prefix, class_.uid)
            key = 'seeds'
            cache = request.registry['cache_user']
            cached = cache.get(request, key, namespace)
            if cached is not None:
                return cached

            seeds = compute(class_, request, *args, **kwargs)
            cache.set(request, key, seeds, namespace, region)
            return seeds

        return _wrapper

    return _decorated
# =============================================================================
def isodt2str(isodt: str) -> str:
    """Return a string representing the given datetime in local time zone in
    an ISO format.

    :param str isodt:
        ISO datetime format.
    :rtype: str
    :return:
        The local date and time separated by a space or an empty string if
        `isodt` is not a valid ISO datetime.
    """
    try:
        parsed = datetime.fromisoformat(_isodt_cleaned(isodt))
        local_text = parsed.astimezone().isoformat(' ')
    except ValueError:
        return ''
    return local_text.partition('.')[0]
# =============================================================================
def isodt2age(request: Request, isodt: str) -> str:
    """Return a string representing the given datetime as an age.

    :type  request: pyramid.request.Request
    :param request:
        Current request. Its localizer translates the age.
    :param str isodt:
        ISO datetime format.
    :rtype: str
    :return:
        The translated age or an empty string if `isodt` is not a valid ISO
        datetime.
    """
    try:
        moment = datetime.fromisoformat(_isodt_cleaned(isodt))
        return request.localizer.translate(age(moment))
    except ValueError:
        return ''
# ============================================================================= def _isodt_cleaned(isodt: str) -> str: """Clean ISO date/time to be compatible with version before Python 3.11.""" return re_sub(r'\.\d+', '', isodt).replace('Z', '+00:00') # =============================================================================
def wait4thumbnails(
        request: Request,
        thumbnails: list[tuple[CioPath, str, str | None]],
        trials: int = THUMBNAIL_TRIALS):
    """Wait a while to be sure each thumbnail of the list is created.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param list thumbnails:
        A list of tuples such as ``(ciopath, size, thumbnail_ext)``. Items
        without thumbnail extension or without warehouse ID are skipped.
    :param int trials: (default=``THUMBNAIL_TRIALS``)
        Number of trials for each thumbnail. There is a pause of 0.2 second
        between two trials.
    """
    for ciopath, size, thumbnail_ext in thumbnails:
        if not (thumbnail_ext and ciopath.wid):
            continue

        home = join(
            request.registry['backend_homes']['thumbnails'], ciopath.wid)
        file_name = f'{size}.{thumbnail_ext}'
        # The thumbnail of a directory is stored in a dedicated
        # subdirectory of this directory
        if ciopath.is_directory():
            abs_thumbnail = join(home, ciopath.path, MYSELF, file_name)
        else:
            abs_thumbnail = join(home, ciopath.path, file_name)

        attempts_left = trials
        while not exists(abs_thumbnail) and attempts_left > 0:
            sleep(.2)
            attempts_left -= 1
# =============================================================================
def ciotype_thumbnails(
    request: Request, ciopaths: tuple[CioPath]
) -> dict[CioPath, tuple[CioType, str, str | None]]:
    """Retrieve `CioType`s and extensions of a given list of `CioPath`.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param tuple ciopaths:
        A tuple of `CioPath`.
    :rtype: dict
    :return:
        A dictionary such as ``{ciopath: (ciotype, icon, thumbnail_ext)}``
        where ``thumbnail_ext`` is ``None`` if there is no thumbnail. It is
        empty if the search failed.
    """
    backend = request.registry['modules']['ciowarehouse2'].backend
    quoted = [f'"{k!s}"' for k in ciopaths]
    reply, error = backend.search(query_in('ciopath', quoted), len(ciopaths))
    if reply is None or error is not None:
        log_error(request, error)
        return {}

    thumbnails = {}
    for hit in reply.hits:
        values = loads(hit.json)
        entry = (
            CioType.from_str(values['ciotype'][0]),
            values['icon'][0],
            values['thumbnail_ext'][0]
            if values.get('thumbnail_ext') else None)
        thumbnails[CioPath.from_str(values['ciopath'][0])] = entry
    return thumbnails
# =============================================================================
def thumbnail_absolute_path(
        request: Request,
        ciotype: CioType,
        ciopath: CioPath,
        icon: str,
        thumbnail_ext: str | None) -> str:
    """Return the absolute path to the thumbnail or to the icon.

    :type  request: pyramid.request.Request
    :param request:
        Current request. Its ``size`` GET parameter selects the size of the
        thumbnail: ``medium`` if it is ``'small'``, ``large`` otherwise.
    :type  ciotype: .lib.ciotype.CioType
    :param ciotype:
        `CioType` of the file.
    :type  ciopath: .lib.ciopath.CioPath
    :param ciopath:
        `CioPath` of the file.
    :param str icon:
        File name of the icon.
    :param str thumbnail_ext:
        Extension of the thumbnail or ``None`` if there is no thumbnail.
    :rtype: str
    """
    homes = request.registry['backend_homes']

    # No thumbnail: fall back to the medium icon
    if thumbnail_ext is None or ciopath.wid is None:
        return join(homes['icons'], 'medium', icon)

    size = 'medium' if request.GET.get('size') == 'small' else 'large'
    parts = [homes['thumbnails'], ciopath.wid, ciopath.path]
    # The thumbnail of a directory is stored in a dedicated subdirectory
    if ciotype.is_directory():
        parts.append(MYSELF)
    return join(*parts, f'{size}.{thumbnail_ext}')
# =============================================================================
def query_in(field: str, values: tuple | list) -> str:
    """Create an optimized query for a multivalued field.

    :param str field:
        Field name.
    :param tuple values:
        Possible values, used as they are (no quoting).
    :rtype: str
    :return:
        An empty string if there is no value, ``field:value`` for a single
        value and an ``IN`` clause otherwise.
    """
    if not values:
        return ''
    if len(values) > 1:
        joined = ' '.join(values)
        return f'{field}: IN [{joined}]'
    return f'{field}:{values[0]}'
# =============================================================================
def query_in_quoted(field: str, values: tuple | list) -> str:
    """Create an optimized query for a multivalued field with quoted values.

    :param str field:
        Field name.
    :param tuple values:
        Possible values. Each one is surrounded by double quotes.
    :rtype: str
    :return:
        An empty string if there is no value, ``field:"value"`` for a single
        value and an ``IN`` clause otherwise.
    """
    if not values:
        return ''
    if len(values) > 1:
        quoted = ' '.join(f'"{value}"' for value in values)
        return f'{field}: IN [{quoted}]'
    return f'{field}:"{values[0]}"'
# =============================================================================
def scope_query(
        request: Request,
        scope: str,
        directory_ciopath: CioPath,
        flat: bool = False) -> str | None:
    """Create a query to restrict the search in the current and authorized
    scope.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param str scope:
        Scope of the search: ``'all'``, ``'favorite'``, ``'warehouse'`` or
        ``'directory'``.
    :param directory_ciopath:
        CioPath of the current directory, possibly an empty one.
    :param bool flat:
        if ``True`` use ``facet`` field instead of ``directory`` field.
    :rtype: str
    :return:
        The query or ``None`` if there is no warehouse in the scope.
    """
    # Restriction on warehouse(s)
    warehouse_ids = warehouses_in_scope(request, scope, directory_ciopath.wid)
    if not warehouse_ids:
        return None
    query = query_in('warehouse', warehouse_ids)

    # Restriction on directory
    if scope != 'directory' or not directory_ciopath.path:
        return query
    if not flat:
        return f'{query} AND directory:"{directory_ciopath.path}"'
    if directory_ciopath.is_root():
        return query
    return f'{query} AND facet:"{directory_ciopath.facet(True)}"'
# =============================================================================
def warehouses_in_scope(
        request: Request, scope: str,
        warehouse_id: str | None) -> tuple[str, ...]:
    """Find ID of warehouses in the scope.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param str scope:
        Scope of the search: ``'all'``, ``'favorite'``, ``'warehouse'`` or
        ``'directory'``.
    :param str warehouse_id:
        Warehouse ID if exists.
    :rtype: tuple
    """
    # One warehouse
    if warehouse_id is not None and scope not in SCOPES_TOP_ENUM:
        return (warehouse_id, )

    # Favorite warehouses
    if scope != 'all':
        return warehouses_favorite(request)

    # All warehouses
    user_id = request.session['user']['user_id']
    dbquery = request.dbsession.query(DBWarehouse.warehouse_id)
    if not request.has_permission('warehouse-create'):
        # Without this permission, only the warehouses granted to the user
        # or to one of their groups and the open ones are kept
        groups = set(request.session['user']['groups'])
        conditions = or_(
            DBWarehouseUser.user_id == user_id,
            DBWarehouseGroup.group_id.in_(groups),
            DBWarehouse.access.in_(('free', 'readonly')))
        dbquery = dbquery.outerjoin(
            DBWarehouseUser, DBWarehouseGroup).filter(conditions)
    return tuple(str(row[0]) for row in dbquery)
# =============================================================================
def warehouses_favorite(request: Request) -> tuple[str, ...]:
    """Return the IDs of the favorite warehouses of the current user.

    The result is memorized in the session under the key ``'favorites'``.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :rtype: tuple
    :return:
        A tuple of warehouse IDs, empty if there is no user in the session.
    """
    session = request.session
    if 'favorites' in session:
        return session['favorites']
    if 'user' not in session:
        return ()

    user_id = session['user']['user_id']
    rows = request.dbsession.query(DBWarehouse.warehouse_id).outerjoin(
        DBWarehouseUser).filter(
            DBWarehouseUser.user_id == user_id, DBWarehouseUser.favorite)
    session['favorites'] = tuple(str(row[0]) for row in rows)
    return session['favorites']
# =============================================================================
def warehouses_in_menu(request: Request) -> dict:
    """Return a dictionary of warehouses in the menu.

    The result is memorized in the session under the key ``'in_menu'``.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :rtype: dict
    :return:
        A dictionary such as ``{warehouse_id: i18n_label}``, empty if there
        is no user in the session.
    """
    session = request.session
    if 'in_menu' in session:
        return session['in_menu']
    if 'user' not in session:
        return {}

    user_id = session['user']['user_id']
    dbquery = request.dbsession.query(DBWarehouse).join(
        DBWarehouseUser).filter(DBWarehouseUser.user_id == user_id).filter(
            DBWarehouseUser.in_menu).order_by('warehouse_id')
    session['in_menu'] = {
        str(dbwarehouse.warehouse_id): loads(dbwarehouse.i18n_label)
        for dbwarehouse in dbquery
    }
    return session['in_menu']
# =============================================================================
def apply_regex(request: Request, regex_file: str, content: str) -> str:
    """Apply a list of regular expressions from a file.

    Each useful line of the file looks like ``pattern = replacement``. Lines
    beginning with ``#`` or ``[Regex]`` are ignored. Pattern and replacement
    may be surrounded by quotes. A replacement beginning with ``lambda`` is
    evaluated as a Python expression.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param str regex_file:
        Absolute path to the file containing regular expressions.
    :param str content:
        Content to process.
    :rtype: str
    :return:
        The processed content or `content` itself if there is no usable
        file.
    """
    if not regex_file:
        return content
    if not exists(regex_file):
        log_error(request, translate(  # yapf: disable
            _('Regular expression file "${f}" does not exist.', {
                'f': regex_file}), lang='en'))
        return content

    quotes = '\'"'
    with io_open(regex_file, 'r', encoding='utf8') as lines:
        for line in lines:
            if not line or line.startswith(('#', '[Regex]')):
                continue

            pattern, _separator, replace = line.partition(' =')
            pattern = pattern.strip()
            if not pattern:
                continue
            if pattern[0] in quotes and pattern[-1] in quotes:
                pattern = pattern[1:-1]

            replace = replace.strip()
            if replace and replace[0] in quotes and replace[-1] in quotes:
                replace = replace[1:-1]
            # SECURITY: the file content is executed, the regular expression
            # file must come from a trusted source.
            # pylint: disable = eval-used
            if replace.startswith('lambda'):
                replace = eval(replace)  # nosec

            content = re_sub(
                pattern, replace, content, flags=MULTILINE | UNICODE)

    return content