# Source code for ciowarehouse2

"""CioWarehouse2 module: a Chrysalio add-on to manage digital warehouses."""
# pylint: disable = too-many-lines

from __future__ import annotations
from sys import exit as sys_exit
from logging import getLogger
from os import makedirs, scandir
from os.path import exists, join, dirname
from shutil import rmtree
from json import loads
from argparse import Namespace

from sqlalchemy.orm.session import Session

from pyramid.config import Configurator
from pyramid.request import Request
from pyramid.registry import Registry

from chrysalio.initialize import Initialize
from chrysalio.lib.config import settings_get_list, settings_get_namespace
from chrysalio.includes.modules.models import DBModule
from chrysalio.includes.cache import cache_user_access, cache_global_item
from chrysalio.includes.cache import cache_namespace
from chrysalio.modules import Module
from chrysalio.scripts import ScriptRegistry
from chrysalio.lib.navigation import CURRENT_CLASS, NavEntry, NavSubentry
from chrysalio.lib.navigation import entry_current_class
from chrysalio.lib.i18n import translate_field
from .relaxng import RELAXNG_CIOWAREHOUSE2
from .security import PRINCIPALS_CIOWAREHOUSE2
from .lib.utils import CIOWAREHOUSE2_NS, CACHE_REGION_GLOBAL, CACHE_REGION_USER
from .lib.utils import FILE_RIGHTS_INDEX, META_RIGHTS_INDEX, build_callback
from .lib.utils import warehouses_in_menu
from .lib.backend import CioBackend
from .lib.warehouse import Warehouse
from .lib.file_panel import FilePanel
from .lib.ciotype import CioType
from .lib.manager import Manager
from .lib.i18n import _, translate
from .managers.html import ManagerHtml
from .managers.pdf import ManagerPdf
from .managers.image import ManagerImage
from .managers.audio import ManagerAudio
from .managers.video import ManagerVideo
from .managers.config import ManagerConfig
from .managers.code import ManagerCode
from .managers.text import ManagerText
from .managers.grammar import ManagerGrammar
from .managers.directory import ManagerDirectory
from .seeders.directory import SeederDirectory
from .seeders.file import SeederFile
from .models.populate import xml2db as _xml2db, db2xml as _db2xml
from .models.dbfield import DBField
from .models.dbseed import DBSeed
from .models.dbwarehouse import CORE_DISPLAY, DBWarehouse
from .models.dbwarehouse import DBWarehouseUser, DBWarehouseGroup
from .routes import HOME_PAGES, ADVANCED_SEARCH_ROUTE
# pylint: disable = no-name-in-module
from .proto.ciotantivy_pb2 import InspectField
# pylint: enable = no-name-in-module

# Module-level logger.
LOG = getLogger(__name__)
# Relative strength of each right; ``warehouse_access`` compares these values
# to keep the strongest right found among the user and group entries.
RIGHT_STRENGTH = {None: 0, 'reader': 1, 'writer': 2, 'writer-admin': 3}
# Manager classes instantiated by ``_register_managers`` for every CioType
# which has no manager in the registry yet.
CORE_MANAGERS = (
    ManagerDirectory, ManagerText, ManagerConfig, ManagerCode, ManagerVideo,
    ManagerAudio, ManagerImage, ManagerHtml, ManagerPdf, ManagerGrammar)
# Seeder classes instantiated by ``register_seeds`` for every seeder UID which
# is not in the registry yet.
CORE_SEEDERS = (SeederDirectory, SeederFile)


# =============================================================================
def includeme(configurator: Configurator | ScriptRegistry):
    """Include the CioWarehouse2 module.

    The module class is always registered. Everything else (cache check,
    permissions, routes, translations, static views and view scan) is only
    done when ``configurator`` is a Pyramid configurator.

    :type  configurator: pyramid.config.Configurator
    :param configurator:
        Object used to do configuration declaration within the application.
    """
    Module.register(configurator, ModuleCioWarehouse2)
    if not isinstance(configurator, Configurator):
        return

    # Both cache managers are mandatory
    if not ('cache_user' in configurator.registry.keys()
            and 'cache_global' in configurator.registry.keys()):
        sys_exit(translate(_('*** You must register a cache manager.')))

    # Permissions, then routes
    for included in ('ciowarehouse2.security', 'ciowarehouse2.routes'):
        configurator.include(included)

    # Translations
    package_dir = dirname(__file__)
    configurator.add_translation_dirs(join(package_dir, 'Locale'))

    # Static views, then view scan
    static_root = join(package_dir, 'Static')
    static_views = tuple(
        (name, join(static_root, folder))
        for name, folder in (
            ('css', 'Css'), ('js', 'Js'), ('images', 'Images'),
            ('pdfjs', 'Pdfjs')))
    Initialize(configurator).add_static_views(__package__, static_views)
    configurator.scan('ciowarehouse2.views')
# ============================================================================= # ============================================================================= # =============================================================================
class ModuleCioWarehouse2(Module):
    """Class for CioWarehouse2 module.

    :param str config_ini:
        Absolute path to the configuration file (e.g. development.ini).

    The constructor sets the following public attributes:

    * ``backend``: object to communicate with CioTantivy backend
    * ``locations``: a dictionary of paths of warehouse locations
    * ``restful``: a dictionary defining the RESTful parameters
    * ``search``: a dictionary defining the search parameters
    """

    name = _('Warehouse2')
    implements = ('warehouse', )
    dependencies = ('cioservice', )
    relaxng = RELAXNG_CIOWAREHOUSE2
    xml2db = (_xml2db, )
    db2xml = (_db2xml, )
    areas = {'ciowarehouse2.browse': _('Warehouse browsing')}
    _DBModule = DBModule

    # -------------------------------------------------------------------------
    def __init__(self, config_ini: str):
        """Constructor method.

        Read the module settings, create the backend object and the location
        directories, and prepare the navigation entries.  The process exits
        when ``locations`` is missing or malformed, or when a location
        directory cannot be created.

        :param str config_ini:
            Absolute path to the configuration file.
        """
        super().__init__(config_ini)

        # Get site UID
        settings = self._settings(config_ini, 'app:main')
        uid = settings.get('site.uid')

        # Get backend
        settings = self._settings(config_ini)
        self.backend = CioBackend(settings, uid)

        # Read locations
        location_items = settings_get_list(settings, 'locations')
        if not location_items:
            sys_exit(translate(_(  # yapf: disable
                '*** ${n}: "locations" is missing.', {'n': self.uid})))
        # Split on the FIRST colon only so that a path which itself contains
        # a colon is kept whole
        self.locations = {}
        for item in location_items:
            location_id, colon, path = item.partition(':')
            if colon:
                self.locations[location_id] = path
        if not self.locations:
            sys_exit(translate(_(  # yapf: disable
                '*** ${n}: "locations" must be a list of WAREHOUSE_ID:PATH.',
                {'n': self.uid})))

        # Create location directories
        for path in self.locations.values():
            try:
                makedirs(path, exist_ok=True)
            except OSError:
                sys_exit(translate(_(  # yapf: disable
                    '*** ${n}: Unable to create location "${l}".',
                    {'n': self.uid, 'l': path})))

        # Navigation
        self._nav_entries = (NavWarehouses(), )
        self._nav_admin_subentries = (SubnavSeeds(), )

        # Retrieve RESTful parameters
        self.restful = settings_get_namespace(settings, 'restful')

        # Retrieve manager parameters; in develop mode, the manager root is
        # erased.  ``root`` is optional, as in ``populate``.
        self._managers_config: dict = settings_get_namespace(
            settings, 'managers')
        managers_root = self._managers_config.get('root')
        if self._managers_config.get('develop') == 'true' \
                and managers_root and exists(managers_root):
            rmtree(managers_root)

        # Search
        self.search: dict = settings_get_namespace(settings, 'search')

        # Warehouses
        self._warehouses: dict[str, Warehouse] = {}
def populate(
        self, args: Namespace, registry: Registry, dbsession: Session):
    """Complete the operation of the populate script.

    Erase the manager installation, check the backend, optionally reset the
    indexes and prepare each warehouse found in the database.

    See: :meth:`chrysalio.modules.Module.populate`

    :return:
        An error message if at least one warehouse failed, ``None``
        otherwise.
    """
    # Clean up manager installation
    root = self._managers_config.get('root')
    if root and exists(root):
        rmtree(root)

    # Check Chrysalio backend
    error = self.backend.ping()
    if error is not None:  # pragma: nocover
        sys_exit(f'*** {translate(error)}')

    # Reset index and remove thumbnails
    must_reset = '--reset' in args.extra or (
        hasattr(args, 'reset') and args.reset)
    if must_reset:
        LOG.info(translate(_('Erasing indexes and thumbnails')))
        error = self.backend.reset()
        if error is not None:  # pragma: nocover
            sys_exit(f'*** {translate(error)}')

    # Populate warehouses: every warehouse is processed, even after a failure
    LOG.info(translate(_('====== Updating warehouses')))
    all_ok = True
    for dbwarehouse in dbsession.query(DBWarehouse):
        LOG.info('......{0:.<32}'.format(dbwarehouse.warehouse_id))
        prepared = self._prepare_warehouse(args, registry, dbwarehouse)
        all_ok = all_ok & prepared

    if all_ok:
        return None
    return translate(_('Warehouses have errors'))
# -------------------------------------------------------------------------
def activate(self, registry: Registry, dbsession: Session):
    """Activate the module.

    Check the backend, then register principals, navigation entries, home
    pages, backend homes and fields, managers, seeds, the file panel, the
    advanced search route and the build callback.

    :type  registry: pyramid.registry.Registry
    :param registry:
        Application registry.
    :type  dbsession: sqlalchemy.orm.session.Session
    :param dbsession:
        SQLAlchemy session.
    """
    # Check Chrysalio backend
    ping_error = self.backend.ping()
    if ping_error is not None:  # pragma: nocover
        sys_exit(f'*** {translate(ping_error)}')

    # Security
    known_principals = registry['principals']
    if PRINCIPALS_CIOWAREHOUSE2[0] not in known_principals:
        for principal in PRINCIPALS_CIOWAREHOUSE2:
            known_principals.append(principal)

    # Navigation: entries go right after the first main entry, admin
    # subentries go before the last three subentries of the last main entry
    has_main_menu = 'navigation' in registry \
        and 'main' in registry['navigation']
    if has_main_menu \
            and self._nav_entries[0] not in registry['navigation']['main']:
        main_menu = registry['navigation']['main']
        for entry in self._nav_entries:
            main_menu.insert(1, entry)
        last_entry = main_menu[-1]
        current = last_entry.subentries
        last_entry.subentries = \
            current[:-3] + self._nav_admin_subentries + current[-3:]

    # Home pages
    if 'homes' not in registry:
        registry['homes'] = []
    registry['homes'] += HOME_PAGES

    # Backend homes and fields, managers, seeders
    self._register_backend_homes_and_fields(registry, dbsession)
    self._register_managers(registry)
    self.register_seeds(registry, dbsession)

    # FileInfo registration
    FilePanel.register(registry, FilePanel)

    # Advanced search
    wants_advanced = self.search.get('advanced') == 'true'
    if wants_advanced and 'search' not in registry:
        registry['search'] = ADVANCED_SEARCH_ROUTE

    # Callback
    build_manager = registry['modules']['cioservice'].build_manager
    build_manager.add_callback('ciowarehouse2', build_callback)
# -------------------------------------------------------------------------
def deactivate(self, registry: Registry, dbsession: Session):
    """Deactivate the module.

    Undo what :meth:`activate` registered: principals, navigation entries,
    home pages, registry entries, panels, advanced search route and build
    callback, then clear the warehouse caches.

    :type  registry: pyramid.registry.Registry
    :param registry:
        Application registry.
    :type  dbsession: sqlalchemy.orm.session.Session
    :param dbsession:
        SQLAlchemy session.
    """
    # pylint: disable = unused-argument, too-many-branches
    # Security
    known_principals = registry['principals']
    if PRINCIPALS_CIOWAREHOUSE2[0] in known_principals:
        for principal in PRINCIPALS_CIOWAREHOUSE2:
            known_principals.remove(principal)

    # Navigation
    has_main_menu = 'navigation' in registry \
        and 'main' in registry['navigation']
    if has_main_menu \
            and self._nav_entries[0] in registry['navigation']['main']:
        main_menu = registry['navigation']['main']
        for entry in self._nav_entries:
            main_menu.remove(entry)
        remaining = list(main_menu[-1].subentries)
        for subentry in self._nav_admin_subentries:
            remaining.remove(subentry)
        main_menu[-1].subentries = tuple(remaining)

    # Home pages
    for home in HOME_PAGES:
        if home in registry['homes']:
            registry['homes'].remove(home)

    # Registry entries
    owned_entries = (
        'backend_homes', 'tokens', 'fields', 'seeders', 'seeds', 'managers')
    for entry in owned_entries:
        if entry in registry:
            del registry[entry]

    # Panels: the file panel and every panel bound to a manager
    panels = registry.get('panels', '')
    doomed = [
        panel_id for panel_id in panels
        if panel_id == 'filepanel' or hasattr(panels[panel_id], 'manager')]
    for panel_id in doomed:
        del registry['panels'][panel_id]

    # Advanced search
    if 'search' in registry and registry['search'] == ADVANCED_SEARCH_ROUTE:
        del registry['search']

    # Callback
    build_manager = registry['modules']['cioservice'].build_manager
    build_manager.remove_callback('ciowarehouse2')

    # Warehouses
    global_cache = registry['cache_global']
    global_cache.clear(namespace=CIOWAREHOUSE2_NS)
    for warehouse_id in self._warehouses:
        global_cache.clear(
            namespace=cache_namespace(CIOWAREHOUSE2_NS, warehouse_id))
    self._warehouses = {}
# -------------------------------------------------------------------------
def configuration_route(self, request: Request) -> str:
    """Return the route to configure this module.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :rtype: str
    :return:
        Path of the route named ``ciowarehouse2_view``.
    """
    route_name = 'ciowarehouse2_view'
    return request.route_path(route_name)
# -------------------------------------------------------------------------
@cache_user_access(CIOWAREHOUSE2_NS, CACHE_REGION_USER)
@classmethod
def warehouse_access(cls, request: Request, warehouse: Warehouse) -> tuple:
    """Return a tuple defining the authorization of the user on the
    warehouse.

    The strongest rights found among the user entry and the entries of the
    user's groups are computed first, then adjusted according to the access
    mode of the warehouse and to the ``warehouse-create`` permission.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  warehouse: .lib.warehouse.Warehouse
    :param warehouse:
        Current warehouse object.
    :rtype: tuple
    :return:
        A tuple with index ``FILE_RIGHTS_INDEX`` and ``META_RIGHTS_INDEX``.
    """
    if warehouse is None:
        return (None, 'reader')

    # Who is asking
    logged = 'user' in request.session
    user_id = request.session['user']['user_id'] if logged else ''
    groups = request.session['user']['groups'] if logged else ()

    # Rights granted to the user himself, then to his groups
    user_entry = request.dbsession.query(DBWarehouseUser).filter_by(
        warehouse_id=warehouse.uid, user_id=user_id).first()
    granted = []
    if user_entry:
        granted.append((user_entry.file_rights, user_entry.meta_rights))
    group_query = request.dbsession.query(
        DBWarehouseGroup.file_rights, DBWarehouseGroup.meta_rights)
    group_query = group_query.filter_by(warehouse_id=warehouse.uid)
    group_query = group_query.filter(DBWarehouseGroup.group_id.in_(groups))
    granted += group_query.all()

    # Keep the strongest right of each kind
    file_right, meta_right = None, 'reader'
    for candidate in granted:
        if RIGHT_STRENGTH[candidate[0]] > RIGHT_STRENGTH[file_right]:
            file_right = candidate[0]
        if RIGHT_STRENGTH[candidate[1]] > RIGHT_STRENGTH[meta_right]:
            meta_right = candidate[1]
    is_creator = request.has_permission('warehouse-create')

    # Adjust according to the access mode of the warehouse
    mode = warehouse.access
    if mode == 'closed':
        return ('reader-admin' if is_creator else None, 'reader')
    if mode == 'readonly':
        return ('reader-admin' if is_creator else 'reader', 'reader')
    if mode == 'restricted-readonly':
        if is_creator:
            return ('reader-admin', 'reader')
        return ('reader' if file_right is not None else None, 'reader')
    if mode == 'free' or is_creator:
        return ('writer-admin' if is_creator else 'writer', 'writer')
    return (file_right, meta_right)


# -------------------------------------------------------------------------
@cache_global_item(CIOWAREHOUSE2_NS, CACHE_REGION_GLOBAL, warehouse_access)
def warehouse(
        self, request: Request,
        warehouse_id: str | None) -> Warehouse | None:
    """Return the warehouse with ID ``warehouse_id`` or ``None``.

    ``None`` is returned when the ID is empty, when the warehouse is not in
    the database, when its location is not a known location or when its root
    directory does not exist.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param str warehouse_id:
        ID of the warehouse to return.
    :rtype: :class:`.lib.warehouse.Warehouse` or ``None``

    Thanks to the decorator, the global cache ``ciowrh-<warehouse_id>``
    stores file lists. The user cache ``ciowrh-<warehouse_id>`` is a tuple
    with following index:

    * ``FILE_RIGHTS_INDEX``: ``'reader'``, ``'reader-admin'``,
      ``'writer'``, ``'writer-admin'`` or ``None`` (not authorized)
    * ``META_RIGHTS_INDEX``: ``'reader'`` or ``'writer'``
    """
    if not warehouse_id:
        return None

    query = request.dbsession.query(DBWarehouse)
    dbwarehouse = query.filter_by(warehouse_id=warehouse_id).first()
    if dbwarehouse is None or dbwarehouse.location not in self.locations:
        return None

    found = Warehouse(request.registry, dbwarehouse, self.locations)
    return found if exists(found.root) else None
def warehouse_admin(
        self,
        request: Request,
        warehouse: Warehouse,
        access: tuple | None = None) -> bool:
    """Return ``True`` if the user administrates the warehouse.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  warehouse: lib.warehouse.Warehouse
    :param warehouse:
        Current warehouse object.
    :param tuple access: (optional)
        Already retrieved access tuple; computed with
        :meth:`warehouse_access` when missing.
    :rtype: bool
    """
    if access is None:
        access = self.warehouse_access(request, warehouse)
    file_right = access[FILE_RIGHTS_INDEX] or ''
    # Administrator rights are those ending with "admin"
    return file_right.endswith('admin')
# -------------------------------------------------------------------------
def warehouse_file_writer(
        self,
        request: Request,
        warehouse: Warehouse,
        access: tuple | None = None) -> bool:
    """Return ``True`` if the user can write files in this warehouse.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  warehouse: lib.warehouse.Warehouse
    :param warehouse:
        Current warehouse object.
    :param tuple access: (optional)
        Already retrieved access tuple; computed with
        :meth:`warehouse_access` when missing.
    :rtype: bool
    """
    if access is None:
        access = self.warehouse_access(request, warehouse)
    file_right = access[FILE_RIGHTS_INDEX] or ''
    # Writer rights are those starting with "writer"
    return file_right.startswith('writer')
# -------------------------------------------------------------------------
def warehouse_meta_writer(
        self,
        request: Request,
        warehouse: Warehouse,
        access: tuple | None = None) -> bool:
    """Return ``True`` if the user can write metadata in this warehouse.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  warehouse: lib.warehouse.Warehouse
    :param warehouse:
        Current warehouse object.
    :param tuple access: (optional)
        Already retrieved access tuple; computed with
        :meth:`warehouse_access` when missing.
    :rtype: bool
    """
    rights = access
    if rights is None:
        rights = self.warehouse_access(request, warehouse)
    meta_right = rights[META_RIGHTS_INDEX]
    return meta_right == 'writer'
# -------------------------------------------------------------------------
def warehouse_root(
        self, request: Request, warehouse_id: str | None) -> str | None:
    """Return the root directory of the warehouse with ID ``warehouse_id``
    or ``None``.

    The warehouse object is created on first use and kept in the internal
    dictionary of warehouses.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param str warehouse_id:
        ID of the warehouse to return.
    :rtype: str
    """
    if not warehouse_id:
        return None

    known = self._warehouses
    if warehouse_id not in known:
        query = request.dbsession.query(DBWarehouse)
        dbwarehouse = query.filter_by(warehouse_id=warehouse_id).first()
        if dbwarehouse is None:
            return None
        known[warehouse_id] = Warehouse(
            request.registry, dbwarehouse, self.locations)

    return known[warehouse_id].root
# -------------------------------------------------------------------------
def warehouse_forget(
        self, request: Request, warehouse_id: str | None = None):
    """Remove one warehouse, or all of them, from the list of known
    warehouses and clear the related caches.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param str warehouse_id: (optional)
        ID of the warehouse to forget. Without it, every known warehouse is
        forgotten.
    """
    # A single warehouse
    if warehouse_id is not None:
        self.cache_clear(request, warehouse_id)
        self._warehouses.pop(warehouse_id, None)
        return

    # All known warehouses
    registry = request.registry
    registry['cache_global'].clear(namespace=CIOWAREHOUSE2_NS)
    for known_id in self._warehouses:
        namespace = cache_namespace(CIOWAREHOUSE2_NS, known_id)
        registry['cache_global'].clear(namespace=namespace)
        registry['cache_user'].clear(request, namespace=namespace)
    self._warehouses = {}
# -------------------------------------------------------------------------
@classmethod
def cache_clear(cls, request: Request, warehouse_id: str):
    """Clear file and metadata cache for a warehouse.

    The global cache is cleared for the module namespace and for the
    namespace of the warehouse; the user cache is cleared for the namespace
    of the warehouse only.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param str warehouse_id:
        ID of the warehouse.
    """
    registry = request.registry
    warehouse_namespace = cache_namespace(CIOWAREHOUSE2_NS, warehouse_id)
    for namespace in (CIOWAREHOUSE2_NS, warehouse_namespace):
        registry['cache_global'].clear(namespace=namespace)
    registry['cache_user'].clear(request, namespace=warehouse_namespace)
# -------------------------------------------------------------------------
@classmethod
def manager(cls, request: Request, ciotype: CioType) -> Manager | None:
    """Return the best file manager for the given `CioType`.

    Managers are tried in this order: the one registered for the exact
    `CioType`, the one registered for the subtype without its ``+`` suffix,
    then the one registered for the type alone. The last two are only
    selected if they match the `CioType`.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  ciotype: .lib.ciotype.CioType
    :param ciotype:
        `CioType` of the file.
    :rtype: .lib.manager.Manager
    """
    registered = request.registry.get('managers')
    if not registered:
        return None

    # Exact match
    if ciotype in registered:
        return registered[ciotype]
    subtype = ciotype.subtype
    if subtype is None:
        return None

    # Without suffix
    if '+' in subtype:
        base, _plus, _suffix = subtype.partition('+')
        stripped = CioType(ciotype.type_, base)
        if stripped in registered and registered[stripped].match(ciotype):
            return registered[stripped]

    # Generic manager
    generic = registered.get(CioType(ciotype.type_))
    if generic is None or not generic.match(ciotype):
        return None
    return generic
# -------------------------------------------------------------------------
def _prepare_warehouse(self, args, registry, dbwarehouse) -> bool:
    """Prepare a warehouse according to command line arguments.

    The warehouse is cloned if its root is missing or empty and pulled
    otherwise, pending changes are committed, locks are optionally removed
    and, unless refresh is skipped, the warehouse is indexed when needed.

    :type  args: argparse.Namespace
    :param args:
        Command line arguments.
    :param dict registry:
        Dictionary registry.
    :type  dbwarehouse: .models.dbwarehouse.DBWarehouse
    :param dbwarehouse:
        SQLAlchemy-powered warehouse class.
    :rtype: bool
    :return:
        ``False`` if the location is unknown or if clone or pull failed.
    """
    def has_option(name: str) -> bool:
        """Tell if ``--<name>`` is among the extra arguments or is set as
        an attribute of ``args``."""
        return f'--{name}' in args.extra or bool(
            getattr(args, name.replace('-', '_'), False))

    # Check location
    if dbwarehouse.location not in self.locations:
        LOG.error(translate(_(  # yapf: disable
            'Location ${l} for warehouse ${w} does not exist.',
            {'l': dbwarehouse.location, 'w': dbwarehouse.warehouse_id})))
        return False

    # Warehouse object creation. ``scandir`` returns an iterator which is
    # always true, so emptiness is checked by looking for a first entry.
    warehouse = Warehouse(registry, dbwarehouse, self.locations)
    must_clone = True
    if exists(warehouse.root):
        with scandir(warehouse.root) as entries:
            must_clone = next(entries, None) is None
    err = warehouse.vcs.clone() if must_clone else warehouse.vcs.pull()
    if err:
        LOG.error(err)
        return False

    # Clean up dirty warehouse
    if warehouse.vcs.is_dirty():
        warehouse.vcs.add()
        warehouse.vcs.commit(
            translate(_('Automatic integration.')),
            registry.settings['site.uid'])

    # Remove locks
    if has_option('remove-locks'):
        warehouse.unlock_all()

    # Skip refresh
    if has_option('skip-refresh'):
        return True

    # Index and create thumbnails
    now = has_option('now')
    force = has_option('force')
    no_thumbs = has_option('no-thumbs')
    if warehouse.must_refresh(now=now, force=force):
        self.backend.index(warehouse, force=force, no_thumbs=no_thumbs)
        warehouse.refreshed()
        LOG.info(translate(_('updated index')))

    return True


# -------------------------------------------------------------------------
def _register_backend_homes_and_fields(
        self, registry: Registry, dbsession: Session):
    """Fill the registry with:

    - a dictionary defining homes with keys ``'thumbnails'`` and
      ``'icons'``
    - a dictionary where each key is a field ID and each entry a dictionary
      describing the field (see :meth:`_register_fields`)

    If the backend cannot be inspected, the error is logged and default
    values are registered: ``'.'`` for both homes and no field.

    :type  registry: pyramid.registry.Registry
    :param registry:
        Application registry.
    :type  dbsession: sqlalchemy.orm.session.Session
    :param dbsession:
        SQLAlchemy session.
    """
    reply, error = self.backend.inspect()
    if error is not None:  # pragma: nocover
        registry['backend_homes'] = {'thumbnails': '.', 'icons': '.'}
        registry['fields'] = {}
        LOG.error(translate(error))
        return

    self._register_backend_homes(registry, reply)
    self._register_fields(registry, dbsession, reply)


# -------------------------------------------------------------------------
@classmethod
def _register_backend_homes(cls, registry: Registry, reply):
    """Fill dictionary defining homes with keys ``'thumbnails'`` and
    ``'icons'`` and add a static route to the icons directory.

    The static view is only added if the registry has a ``'configurator'``
    entry and if the view is not already declared.

    :type  registry: pyramid.registry.Registry
    :param registry:
        Application registry.
    :param reply:
        Backend reply for an inspect request.
    """
    registry['backend_homes'] = {
        'thumbnails': reply.homes.thumbnails,
        'icons': reply.homes.icons
    }
    url = f"/{__package__.split('.', maxsplit=1)[0]}/home/icons"
    if 'configurator' in registry and \
            registry['configurator'].introspector.get(
                'static views', url) is None:
        registry['configurator'].add_static_view(
            url, registry['backend_homes']['icons'])


# -------------------------------------------------------------------------
def _register_fields(self, registry: Registry, dbsession: Session, reply):
    """Fill ``registry['fields']`` with a dictionary where each key is a
    field ID and each entry a dictionary with keys ``'display'``,
    ``'label'``, ``'position'``, ``'in_cards'``, ``'in_list'``,
    ``'in_snippets'``, ``'in_filter'``, ``'in_meta'``, ``'classes'``,
    ``'core'``, ``'indexed'``, ``'default'``, ``'suggested'``,
    ``'stored'``, ``'hidden'``, ``'sortable'`` and, possibly,
    ``'field_type'`` and ``'choices'``.

    Fields come from the database first, then they are completed or created
    from the fields of the backend reply.

    :type  registry: pyramid.registry.Registry
    :param registry:
        Application registry.
    :type  dbsession: sqlalchemy.orm.session.Session
    :param dbsession:
        SQLAlchemy session.
    :param reply:
        Backend reply for an inspect request.
    """
    # Labels, locations and choices for fields
    fields = {}
    for dbitem in dbsession.query(DBField):
        fields[dbitem.field_id] = {
            'display': dbitem.display,
            'label': loads(str(dbitem.i18n_label)),
            'position': dbitem.position + 1000,
            'in_cards': dbitem.in_cards,
            'in_list': dbitem.in_list,
            'in_snippets': dbitem.in_snippets,
            'in_filter': dbitem.in_filter,
            'in_meta': dbitem.in_meta,
            'classes': dbitem.classes or '',
            'core': False,
            'indexed': False,
            'default': False,
            'suggested': False,
            'stored': False,
            'hidden': False,
            'sortable': False,
        }
        if dbitem.display in ['list', 'color']:
            choices = [  # yapf: disable
                (k.value, loads(k.i18n_label)) for k in dbitem.choices]
            if choices:
                fields[dbitem.field_id]['choices'] = dict(choices)

    # Fields from backend
    langs = settings_get_list(registry.settings, 'languages', ('en', ))
    for field in reply.fields:
        if field.name in fields:
            fields[field.name].update({  # yapf: disable
                'field_type': field.field_type,
                'display': self._fix_display(
                    fields[field.name]['display'], field),
                'core': field.core,
                'indexed': field.indexed,
                'default': field.default,
                'suggested': field.suggested,
                'stored': field.stored,
                'sortable': field.sortable,
            })
        else:
            label = {  # yapf: disable
                k: translate(
                    _(field.name.capitalize().replace('_', ' ')), k)
                for k in langs}
            fields[field.name] = {  # yapf: disable
                'field_type': field.field_type,
                'display': self._reply_field2display(field),
                'label': label,
                'position': 2000,
                'in_cards': 1 if field.name == 'file_name' else -1,
                'in_list': CORE_DISPLAY.get(field.name, (-1, ''))[0],
                'in_snippets': 1 if field.name == 'file_name' else -1,
                'in_filter': 1 if field.name == 'file_name' else -1,
                'in_meta': -1,
                'classes': CORE_DISPLAY.get(field.name, (-1, ''))[1],
                'core': field.core,
                'indexed': field.indexed,
                'default': field.default,
                'suggested': field.suggested,
                'stored': field.stored,
                'hidden': field.hidden,
                'sortable': field.sortable,
            }

    # Fix in_cards, in_list and in_meta
    # NOTE(review): the source indentation was lost; the ``in_meta`` reset is
    # assumed to apply to every core field -- confirm against the repository.
    for field_id, field in fields.items():
        if field_id not in CORE_DISPLAY and (  # yapf: disable
                field['core'] or not field['stored'] or field['hidden']):
            field['in_cards'] = -1
            field['in_list'] = -1
        if field['core']:
            field['in_meta'] = -1

    registry['fields'] = fields


# -------------------------------------------------------------------------
def _register_managers(self, registry: Registry):
    """Fill the registry with a dictionary defining managers.

    A core manager is only created if no manager is registered yet for its
    `CioType`. Every registered manager is then initialized and, if it has a
    panel, the panel is registered and bound to the manager.

    :type  registry: pyramid.registry.Registry
    :param registry:
        Application registry.
    """
    # Dictionary of managers
    if 'managers' not in registry:
        registry['managers'] = {}
    for manager_class in CORE_MANAGERS:
        if manager_class.ciotype not in registry['managers']:
            registry['managers'][manager_class.ciotype] = manager_class()

    # Manager initialization and panel registration
    for manager in registry['managers'].values():
        manager.initialize(self._managers_config)
        if manager.panel is not None:
            panel = manager.panel.register(registry, manager.panel)
            panel.manager = manager
def register_seeds(self, registry: Registry, dbsession: Session):
    """Fill the registry with a dictionary defining seeders and another
    defining seeds.

    A core seeder is only created if no seeder is registered yet under its
    UID. Seeds are rebuilt from the database and a default ``directory``
    seed is added if the database does not define one.

    :type  registry: pyramid.registry.Registry
    :param registry:
        Application registry.
    :type  dbsession: sqlalchemy.orm.session.Session
    :param dbsession:
        SQLAlchemy session.
    """
    # Dictionary of seeders
    if 'seeders' not in registry:
        registry['seeders'] = {}
    seeders = registry['seeders']
    for seeder_class in CORE_SEEDERS:
        if seeder_class.uid not in seeders:
            seeders[seeder_class.uid] = seeder_class()

    # Dictionary of seeds
    seeds = registry['seeds'] = {}
    for dbseed in dbsession.query(DBSeed):
        icon = (str(dbseed.attachments_key), str(dbseed.icon))
        seeds[str(dbseed.seed_id)] = {
            'icon': icon,
            'label': loads(str(dbseed.i18n_label)),
            'seeder': str(dbseed.seeder)
        }
    if 'directory' in seeds:
        return
    seeds['directory'] = {
        'core': True,
        'icon': '/ciowarehouse2/images/seed_directory.svg',
        'label': _('Directory'),
        'seeder': 'directory'
    }
# -------------------------------------------------------------------------
@classmethod
def _reply_field2display(cls, field: InspectField) -> str:
    """Compute the best display mode for a Tantivy field.

    The name of the field is considered first, then its type. ``'string'``
    is the display of any type which is not listed.

    :type  field: .proto.ciotantivy_pb2.InspectField
    :param field:
        Tantivy field.
    :rtype: str
    """
    by_name = {
        'file_name': 'file_name',
        'file_size': 'file_size',
        'file_date': 'age'
    }
    display = by_name.get(field.name)
    if display is not None:
        return display

    by_type = {
        'U64': 'integer',
        'I64': 'integer',
        'F64': 'decimal',
        'Bool': 'boolean',
        'Date': 'date',
        'DateTime': 'datetime',
        'Bytes': 'text',
        'Facet': 'facet',
        'Json': 'text',
        'IP': 'ip',
        'Strings': 'strings',
        'Color': 'color',
    }
    return by_type.get(field.field_type, 'string')


# -------------------------------------------------------------------------
@classmethod
def _fix_display(cls, display: str, field: InspectField) -> str:
    """Fix the display mode for a Tantivy field.

    Some field names and some field types impose their display. For the
    other known types, ``display`` is kept if it is compatible with the
    type; otherwise a warning is logged and the default display of the type
    is returned. ``display`` is returned as is for an unknown type.

    :param str display:
        Original display to fix.
    :type  field: .proto.ciotantivy_pb2.InspectField
    :param field:
        Tantivy field.
    :rtype: str
    """
    # Displays imposed by the name of the field
    imposed = {
        'file_name': 'file_name',
        'file_size': 'file_size',
        'file_date': 'age'
    }.get(field.name)
    if imposed is not None:
        return imposed

    # Displays imposed by the type of the field
    imposed = {
        'Bool': 'boolean',
        'Bytes': 'text',
        'Json': 'text',
    }.get(field.field_type)
    if imposed is not None:
        return imposed

    # Field type -> (default display, compatible displays)
    integer_rule = (
        'integer', ('integer', 'decimal', 'file_size', 'list'))
    rules = {
        'Text': (
            'string', ('string', 'text', 'file_name', 'title', 'list')),
        'U64': integer_rule,
        'I64': integer_rule,
        'F64': ('decimal', ('decimal', 'list')),
        'Date': ('date', ('date', 'age')),
        'DateTime': ('datetime', ('datetime', 'date', 'age')),
        'Facet': ('facet', ('facet', 'list')),
        'IP': ('ip', ('ip', 'list')),
        'Color': ('color', ('color', 'list')),
        'Strings': ('strings', ('strings', 'list')),
    }
    rule = rules.get(field.field_type)
    if rule is None or display in rule[1]:
        return display

    LOG.warning(translate(_('Field "${f}" fixed', {'f': field.name})))
    return rule[0]