"""CioWarehouse2 module: a Chrysalio add-on to manage digital warehouses."""
# pylint: disable = too-many-lines
from __future__ import annotations
from sys import exit as sys_exit
from logging import getLogger
from os import makedirs, scandir
from os.path import exists, join, dirname
from shutil import rmtree
from json import loads
from argparse import Namespace
from sqlalchemy.orm.session import Session
from pyramid.config import Configurator
from pyramid.request import Request
from pyramid.registry import Registry
from chrysalio.initialize import Initialize
from chrysalio.lib.config import settings_get_list, settings_get_namespace
from chrysalio.includes.modules.models import DBModule
from chrysalio.includes.cache import cache_user_access, cache_global_item
from chrysalio.includes.cache import cache_namespace
from chrysalio.modules import Module
from chrysalio.scripts import ScriptRegistry
from chrysalio.lib.navigation import CURRENT_CLASS, NavEntry, NavSubentry
from chrysalio.lib.navigation import entry_current_class
from chrysalio.lib.i18n import translate_field
from .relaxng import RELAXNG_CIOWAREHOUSE2
from .security import PRINCIPALS_CIOWAREHOUSE2
from .lib.utils import CIOWAREHOUSE2_NS, CACHE_REGION_GLOBAL, CACHE_REGION_USER
from .lib.utils import FILE_RIGHTS_INDEX, META_RIGHTS_INDEX, build_callback
from .lib.utils import warehouses_in_menu
from .lib.backend import CioBackend
from .lib.warehouse import Warehouse
from .lib.file_panel import FilePanel
from .lib.ciotype import CioType
from .lib.manager import Manager
from .lib.i18n import _, translate
from .managers.html import ManagerHtml
from .managers.pdf import ManagerPdf
from .managers.image import ManagerImage
from .managers.audio import ManagerAudio
from .managers.video import ManagerVideo
from .managers.config import ManagerConfig
from .managers.code import ManagerCode
from .managers.text import ManagerText
from .managers.grammar import ManagerGrammar
from .managers.directory import ManagerDirectory
from .seeders.directory import SeederDirectory
from .seeders.file import SeederFile
from .models.populate import xml2db as _xml2db, db2xml as _db2xml
from .models.dbfield import DBField
from .models.dbseed import DBSeed
from .models.dbwarehouse import CORE_DISPLAY, DBWarehouse
from .models.dbwarehouse import DBWarehouseUser, DBWarehouseGroup
from .routes import HOME_PAGES, ADVANCED_SEARCH_ROUTE
# pylint: disable = no-name-in-module
from .proto.ciotantivy_pb2 import InspectField
# pylint: enable = no-name-in-module
LOG = getLogger(__name__)
RIGHT_STRENGTH = {None: 0, 'reader': 1, 'writer': 2, 'writer-admin': 3}
CORE_MANAGERS = (
ManagerDirectory, ManagerText, ManagerConfig, ManagerCode, ManagerVideo,
ManagerAudio, ManagerImage, ManagerHtml, ManagerPdf, ManagerGrammar)
CORE_SEEDERS = (SeederDirectory, SeederFile)
# =============================================================================
[docs]
def includeme(configurator: Configurator | ScriptRegistry):
"""Function to include CioWarehouse module.
:type configurator: pyramid.config.Configurator
:param configurator:
Object used to do configuration declaration within the application.
"""
# Registration
Module.register(configurator, ModuleCioWarehouse2)
if not isinstance(configurator, Configurator):
return
# Cache
if 'cache_user' not in configurator.registry.keys() or \
'cache_global' not in configurator.registry.keys():
sys_exit(translate(_('*** You must register a cache manager.')))
# Permissions
configurator.include('ciowarehouse2.security')
# Routes
configurator.include('ciowarehouse2.routes')
# Translations
configurator.add_translation_dirs(join(dirname(__file__), 'Locale'))
# Views
static_dir = join(dirname(__file__), 'Static')
Initialize(configurator).add_static_views(
__package__, ( # yapf: disable
('css', join(static_dir, 'Css')),
('js', join(static_dir, 'Js')),
('images', join(static_dir, 'Images')),
('pdfjs', join(static_dir, 'Pdfjs'))))
configurator.scan('ciowarehouse2.views')
# =============================================================================
[docs]
class SubnavWarehouses(NavSubentry):
"""Sub-entry to display the index of warehouses."""
uid = 'warehouses'
label = _('Index of warehouses')
route = 'warehouse_index'
permission = 'warehouse-view'
[docs]
class NavWarehouses(NavEntry):
"""Entry for warehouses."""
uid = 'warehouses'
label = _('Warehouses')
auto_open = False
permission = 'warehouse-view'
subentries = (SubnavWarehouses(), )
# -------------------------------------------------------------------------
# =============================================================================
[docs]
class SubnavSeeds(NavSubentry):
"""Entry for seeds."""
uid = 'seeds'
label = _('Seeds')
route = 'seed_index'
permission = 'seed-view'
# =============================================================================
[docs]
class ModuleCioWarehouse2(Module):
"""Class for CioWarehouse2 module.
:param str config_ini:
Absolute path to the configuration file (e.g. development.ini).
This module has the following attributes:
* ``backend``: object to communicate with CioTantivy backend
* ``locations``: a dictionary of absolute paths of warehouse locations
* ``homes``: a dictionary of absolute paths of homes (icons, thumbnails)
* ``restful``: a dictionary defining the RESTful parameters
"""
name = _('Warehouse2')
implements = ('warehouse', )
dependencies = ('cioservice', )
relaxng = RELAXNG_CIOWAREHOUSE2
xml2db = (_xml2db, )
db2xml = (_db2xml, )
areas = {'ciowarehouse2.browse': _('Warehouse browsing')}
_DBModule = DBModule
# -------------------------------------------------------------------------
def __init__(self, config_ini: str):
"""Constructor method."""
super().__init__(config_ini)
# Get site UID
settings = self._settings(config_ini, 'app:main')
uid = settings.get('site.uid')
# Get backend
settings = self._settings(config_ini)
self.backend = CioBackend(settings, uid)
# Read locations
self.locations = settings_get_list(settings, 'locations')
if not self.locations:
sys_exit(translate(_( # yapf: disable
'*** ${n}: "locations" is missing.', {'n': self.uid})))
self.locations = {
k.split(':')[0]: k.split(':')[1]
for k in self.locations if ':' in k
}
if not self.locations:
sys_exit(translate(_( # yapf: disable
'*** ${n}: "locations" must be a list of WAREHOUSE_ID:PATH.',
{'n': self.uid})))
# Create location directories
for location in self.locations:
try:
makedirs(self.locations[location], exist_ok=True)
except OSError:
sys_exit(translate(_( # yapf: disable
'*** ${n}: Unable to create location "${l}".',
{'n': self.uid, 'l': self.locations[location]})))
# Navigation
self._nav_entries = (NavWarehouses(), )
self._nav_admin_subentries = (SubnavSeeds(), )
# Retrieve RESTful parameters
self.restful = settings_get_namespace(settings, 'restful')
# Retrieve manager parameters
self._managers_config: dict = settings_get_namespace(
settings, 'managers')
if self._managers_config.get('develop') == 'true' \
and exists(self._managers_config['root']):
rmtree(self._managers_config['root'])
# Search
self.search: dict = settings_get_namespace(settings, 'search')
# Warehouses
self._warehouses: dict[str, Warehouse] = {}
# -------------------------------------------------------------------------
[docs]
def populate(
self, args: Namespace, registry: Registry, dbsession: Session):
"""Method called by populate script to complete the operation.
See: :meth:`chrysalio.modules.Module.populate`
"""
# Clean up manager installation
managers_root = self._managers_config.get('root')
if managers_root and exists(managers_root):
rmtree(managers_root)
# Check Chrysalio backend
err = self.backend.ping()
if err is not None: # pragma: nocover
sys_exit(f'*** {translate(err)}')
# Reset index and remove thumbnails
if '--reset' in args.extra or (hasattr(args, 'reset') and args.reset):
LOG.info(translate(_('Erasing indexes and thumbnails')))
err = self.backend.reset()
if err is not None: # pragma: nocover
sys_exit(f'*** {translate(err)}')
# Populate warehouses
LOG.info(translate(_('====== Updating warehouses')))
is_ok = True
for dbwarehouse in dbsession.query(DBWarehouse):
LOG.info('......{0:.<32}'.format(dbwarehouse.warehouse_id))
is_ok &= self._prepare_warehouse(args, registry, dbwarehouse)
return translate(_('Warehouses have errors')) if not is_ok else None
# -------------------------------------------------------------------------
[docs]
def activate(self, registry: Registry, dbsession: Session):
"""Method to activate the module.
:type registry: pyramid.registry.Registry
:param registry:
Application registry.
:type dbsession: sqlalchemy.orm.session.Session
:param dbsession:
SQLAlchemy session.
"""
# Check Chrysalio backend
error = self.backend.ping()
if error is not None: # pragma: nocover
sys_exit(f'*** {translate(error)}')
# Security
if PRINCIPALS_CIOWAREHOUSE2[0] not in registry['principals']:
for principals in PRINCIPALS_CIOWAREHOUSE2:
registry['principals'].append(principals)
# Navigation
if 'navigation' in registry and 'main' in registry['navigation'] \
and self._nav_entries[0] not in registry['navigation']['main']:
for nav_entry in self._nav_entries:
registry['navigation']['main'].insert(1, nav_entry)
registry['navigation']['main'][-1].subentries = \
registry['navigation']['main'][-1].subentries[:-3] + \
self._nav_admin_subentries + \
registry['navigation']['main'][-1].subentries[-3:]
# Home pages
if 'homes' not in registry:
registry['homes'] = []
registry['homes'] += HOME_PAGES
# Backend homes and fields
self._register_backend_homes_and_fields(registry, dbsession)
# Managers
self._register_managers(registry)
# Seeders
self.register_seeds(registry, dbsession)
# FileInfo registration
FilePanel.register(registry, FilePanel)
# Advanced search
if self.search.get('advanced') == 'true' and 'search' not in registry:
registry['search'] = ADVANCED_SEARCH_ROUTE
# Callback
registry['modules']['cioservice'].build_manager.add_callback(
'ciowarehouse2', build_callback)
# -------------------------------------------------------------------------
[docs]
def deactivate(self, registry: Registry, dbsession: Session):
"""Method to deactivate the module.
:type registry: pyramid.registry.Registry
:param registry:
Application registry.
:type dbsession: sqlalchemy.orm.session.Session
:param dbsession:
SQLAlchemy session.
"""
# pylint: disable = unused-argument, too-many-branches
# Security
if PRINCIPALS_CIOWAREHOUSE2[0] in registry['principals']:
for principals in PRINCIPALS_CIOWAREHOUSE2:
registry['principals'].remove(principals)
# Navigation
if 'navigation' in registry and 'main' in registry['navigation'] \
and self._nav_entries[0] in registry['navigation']['main']:
for nav_entry in self._nav_entries:
registry['navigation']['main'].remove(nav_entry)
admin_subentries = list(
registry['navigation']['main'][-1].subentries)
for nav_entry in self._nav_admin_subentries:
admin_subentries.remove(nav_entry)
registry['navigation']['main'][-1].subentries = tuple(
admin_subentries)
# Home pages
for home in HOME_PAGES:
if home in registry['homes']:
registry['homes'].remove(home)
# Registry entries
for entry in ('backend_homes', 'tokens', 'fields', 'seeders', 'seeds',
'managers'):
if entry in registry:
del registry[entry]
# Panels
panels = registry.get('panels', '')
for panel_id in tuple(panels):
if panel_id == 'filepanel' or hasattr(panels[panel_id], 'manager'):
del registry['panels'][panel_id]
# Advanced search
if 'search' in registry \
and registry['search'] == ADVANCED_SEARCH_ROUTE:
del registry['search']
# Callback
registry['modules']['cioservice'].build_manager.remove_callback(
'ciowarehouse2')
# Warehouses
registry['cache_global'].clear(namespace=CIOWAREHOUSE2_NS)
for warehouse_id in self._warehouses:
registry['cache_global'].clear(
namespace=cache_namespace(CIOWAREHOUSE2_NS, warehouse_id))
self._warehouses = {}
# -------------------------------------------------------------------------
[docs]
def configuration_route(self, request: Request) -> str:
"""Return the route to configure this module.
:type request: pyramid.request.Request
:param request:
Current request.
:rtype: str
"""
return request.route_path('ciowarehouse2_view')
# -------------------------------------------------------------------------
@cache_user_access(CIOWAREHOUSE2_NS, CACHE_REGION_USER)
@classmethod
def warehouse_access(cls, request: Request, warehouse: Warehouse) -> tuple:
"""Return a tuple defining the authorization of the user on the
warehouse.
:type request: pyramid.request.Request
:param request:
Current request.
:type warehouse: .lib.warehouse.Warehouse
:param warehouse:
Current warehouse object.
:rtype: tuple
:return:
A tuple with index ``FILE_RIGHTS_INDEX`` and ``META_RIGHTS_INDEX``.
"""
if warehouse is None:
return (None, 'reader')
# Compute rights
user_id = request.session['user']['user_id'] \
if 'user' in request.session else ''
groups = request.session['user']['groups'] \
if 'user' in request.session else ()
dbwrh2_user = request.dbsession.query(DBWarehouseUser)\
.filter_by(warehouse_id=warehouse.uid, user_id=user_id).first()
right_list = [(dbwrh2_user.file_rights, dbwrh2_user.meta_rights)] \
if dbwrh2_user else []
right_list += request.dbsession.query(
DBWarehouseGroup.file_rights, DBWarehouseGroup.meta_rights)\
.filter_by(warehouse_id=warehouse.uid)\
.filter(DBWarehouseGroup.group_id.in_(groups)).all()
rights = [None, 'reader']
for item in right_list:
if RIGHT_STRENGTH[item[0]] > RIGHT_STRENGTH[rights[0]]:
rights[0] = item[0]
if RIGHT_STRENGTH[item[1]] > RIGHT_STRENGTH[rights[1]]:
rights[1] = item[1]
i_creator = request.has_permission('warehouse-create')
# Compute access
access = tuple(rights)
if warehouse.access == 'closed':
access = ('reader-admin' if i_creator else None, 'reader')
elif warehouse.access == 'readonly':
access = ('reader-admin' if i_creator else 'reader', 'reader')
elif warehouse.access == 'restricted-readonly':
access = (
'reader-admin' if i_creator else
('reader' if rights[0] is not None else None), 'reader')
elif warehouse.access == 'free' or i_creator:
access = ('writer-admin' if i_creator else 'writer', 'writer')
return access
# -------------------------------------------------------------------------
@cache_global_item(CIOWAREHOUSE2_NS, CACHE_REGION_GLOBAL, warehouse_access)
def warehouse(
self, request: Request,
warehouse_id: str | None) -> Warehouse | None:
"""Return the warehouse with ID ``warehouse_id`` or ``None``.
:type request: pyramid.request.Request
:param request:
Current request.
:param str warehouse_id:
ID of the warehouse to return.
:rtype: :class:`.lib.warehouse.Warehouse` or ``None``
Thanks to the decorator, the global cache ``ciowrh-<warehouse_id>``
stores file lists.
The user cache ``ciowrh-<warehouse_id>`` is a tuple with
following index:
* ``FILE_RIGTS_INDEX``: ``'reader'``, ``'reader-admin'``,
``'writer'``, ``'writer-admin'`` or ``None`` (not authorized)
* ``META_RIGHTS_INDEX``: ``'reader'`` or ``'writer'``
"""
if not warehouse_id:
return None
dbwarehouse = request.dbsession.query(DBWarehouse).filter_by(
warehouse_id=warehouse_id).first()
if dbwarehouse is None or dbwarehouse.location not in self.locations:
return None
warehouse = Warehouse(request.registry, dbwarehouse, self.locations)
if not exists(warehouse.root):
return None
return warehouse
# -------------------------------------------------------------------------
[docs]
def warehouse_admin(
self,
request: Request,
warehouse: Warehouse,
access: tuple | None = None) -> bool:
"""Return ``True`` if the user administrates the warehouse.
:type request: pyramid.request.Request
:param request:
Current request.
:type warehouse: lib.warehouse.Warehouse
:param warehouse:
Current warehouse object..
:param tuple access: (optional)
Already retrieved access tuple.
:rtype: bool
"""
if access is None:
access = self.warehouse_access(request, warehouse)
return (access[FILE_RIGHTS_INDEX] or '')[-5:] == 'admin'
# -------------------------------------------------------------------------
[docs]
def warehouse_file_writer(
self,
request: Request,
warehouse: Warehouse,
access: tuple | None = None) -> bool:
"""Return ``True`` if the user can write files in this warehouse.
:type request: pyramid.request.Request
:param request:
Current request.
:type warehouse: lib.warehouse.Warehouse
:param warehouse:
Current warehouse object.
:param tuple access: (optional)
Already retrieved access tuple.
:rtype: bool
"""
if access is None:
access = self.warehouse_access(request, warehouse)
return (access[FILE_RIGHTS_INDEX] or '')[:6] == 'writer'
# -------------------------------------------------------------------------
# -------------------------------------------------------------------------
[docs]
def warehouse_root(
self, request: Request, warehouse_id: str | None) -> str | None:
"""Return the root directory of the warehouse with ID ``warehouse_id``
or ``None``.
:type request: pyramid.request.Request
:param request:
Current request.
:param str warehouse_id:
ID of the warehouse to return.
:rtype: str
"""
if not warehouse_id:
return None
if warehouse_id not in self._warehouses:
dbwarehouse = request.dbsession.query(DBWarehouse).filter_by(
warehouse_id=warehouse_id).first()
if dbwarehouse is None:
return None
self._warehouses[warehouse_id] = Warehouse(
request.registry, dbwarehouse, self.locations)
return self._warehouses[warehouse_id].root
# -------------------------------------------------------------------------
[docs]
def warehouse_forget(
self, request: Request, warehouse_id: str | None = None):
"""Remove warehouse from list.
:type request: pyramid.request.Request
:param request:
Current request.
:param str warehouse_id: (optional)
ID of the warehouse to forget.
"""
if warehouse_id is None:
request.registry['cache_global'].clear(namespace=CIOWAREHOUSE2_NS)
for uid in self._warehouses:
namespace = cache_namespace(CIOWAREHOUSE2_NS, uid)
request.registry['cache_global'].clear(namespace=namespace)
request.registry['cache_user'].clear(
request, namespace=namespace)
self._warehouses = {}
else:
self.cache_clear(request, warehouse_id)
if warehouse_id in self._warehouses:
del self._warehouses[warehouse_id]
# -------------------------------------------------------------------------
[docs]
@classmethod
def cache_clear(cls, request: Request, warehouse_id: str):
"""Clear file and metadata cache for a warehouse.
:type request: pyramid.request.Request
:param request:
Current request.
:param str warehouse_id:
ID of the warehouse.
"""
namespace = cache_namespace(CIOWAREHOUSE2_NS, warehouse_id)
request.registry['cache_global'].clear(namespace=CIOWAREHOUSE2_NS)
request.registry['cache_global'].clear(namespace=namespace)
request.registry['cache_user'].clear(request, namespace=namespace)
# -------------------------------------------------------------------------
[docs]
@classmethod
def manager(cls, request: Request, ciotype: CioType) -> Manager | None:
"""Return the best file manager for the given `CioType`.
:type request: pyramid.request.Request
:param request:
Current request.
:type ciotype: .lib.ciotype.CioType
:param ciotype:
`CioType` of the file.
:rtype: .lib.manager.Manager
"""
managers = request.registry.get('managers')
if not managers:
return None
# Exact match
if ciotype in managers:
return managers[ciotype]
if ciotype.subtype is None:
return None
# Without suffix
if '+' in ciotype.subtype:
candidate = CioType(
ciotype.type_,
ciotype.subtype.partition('+')[0])
if candidate in managers and managers[candidate].match(ciotype):
return managers[candidate]
# Generic manager
manager = managers.get(CioType(ciotype.type_))
if manager is not None and manager.match(ciotype):
return manager
return None
# -------------------------------------------------------------------------
def _prepare_warehouse(self, args, registry, dbwarehouse) -> bool:
"""Prepare a warehouse according to command line arguments.
:type args: argparse.Namespace
:param args:
Command line arguments.
:param dict registry:
Dictionary registry.
:type dbwarehouse: .models.dbwarehouse.DBWarehouse
:param dbwarehouse:
SQLAlchemy-powered warehouse class.
:rtype: bool
"""
# Check location
if dbwarehouse.location not in self.locations:
LOG.error(translate(_( # yapf: disable
'Location ${l} for warehouse ${w} does not exist.',
{'l': dbwarehouse.location, 'w': dbwarehouse.warehouse_id})))
return False
# Warehouse object creation
warehouse = Warehouse(registry, dbwarehouse, self.locations)
if not exists(warehouse.root) or not scandir(warehouse.root):
err = warehouse.vcs.clone()
if err:
LOG.error(err)
return False
else:
err = warehouse.vcs.pull()
if err:
LOG.error(err)
return False
# Clean up dirty warehouse
if warehouse.vcs.is_dirty():
warehouse.vcs.add()
warehouse.vcs.commit(
translate(_('Automatic integration.')),
registry.settings['site.uid'])
# Remove locks
if '--remove-locks' in args.extra or ( # yapf: disable
hasattr(args, 'remove_locks') and args.remove_locks):
warehouse.unlock_all()
# Skip refresh
if '--skip-refresh' in args.extra or ( # yapf: disable
hasattr(args, 'skip_refresh') and args.skip_refresh):
return True
# Index and create thumbnails
now = '--now' in args.extra or (hasattr(args, 'now') and args.force)
force = '--force' in args.extra or (
hasattr(args, 'force') and args.force)
no_thumbs = '--no-thumbs' in args.extra or (
hasattr(args, 'no_thumbs') and args.no_thumbs)
if warehouse.must_refresh(now=now, force=force):
self.backend.index(warehouse, force=force, no_thumbs=no_thumbs)
warehouse.refreshed()
LOG.info(translate(_('updated index')))
return True
# -------------------------------------------------------------------------
def _register_backend_homes_and_fields(
self, registry: Registry, dbsession: Session):
"""Fill the registry with:
- a dictionary defining homes with keys ``'thumbnails'`` and
``'icons'``
- a dictionary where each key is a field ID and each entry a
dictionary with keys ``'display'``, ``'indexed'``, ``'stored'``,
``'hidden'``, ``'label'``, ``'in_cards'``, ``'in_list'``,
``'in_snippets'``, ``'in_meta'``, ``'classes'`` and, possibly,
``'choices'``
- a list of metadata fields
:type registry: pyramid.registry.Registry
:param registry:
Application registry.
:type dbsession: sqlalchemy.orm.session.Session
:param dbsession:
SQLAlchemy session.
"""
reply, error = self.backend.inspect()
if error is not None: # pragma: nocover
registry['backend_homes'] = {'thumbnails': '.', 'icons': '.'}
registry['fields'] = {}
LOG.error(translate(error))
return
self._register_backend_homes(registry, reply)
self._register_fields(registry, dbsession, reply)
# -------------------------------------------------------------------------
@classmethod
def _register_backend_homes(cls, registry: Registry, reply):
"""Fill dictionary defining homes with keys ``'thumbnails'`` and
``'icons'`` and add a static route to the icons directory.
:type registry: pyramid.registry.Registry
:param registry:
Application registry.
:type reply: sqlalchemy.orm.session.Session
:param reply:
Backend reply for an inspect request.
"""
registry['backend_homes'] = {
'thumbnails': reply.homes.thumbnails,
'icons': reply.homes.icons
}
url = f"/{__package__.split('.', maxsplit=1)[0]}/home/icons"
if 'configurator' in registry and \
registry['configurator'].introspector.get(
'static views', url) is None:
registry['configurator'].add_static_view(
url, registry['backend_homes']['icons'])
# -------------------------------------------------------------------------
def _register_fields(self, registry: Registry, dbsession: Session, reply):
"""Fill:
- a dictionary defining homes with keys ``'thumbnails'`` and
``'icons'``
- a dictionary where each key is a field ID and each entry a
dictionary with keys ``'display'``, ``'indexed'``, ``'stored'``,
``'hidden'``, ``'label'``, ``'in_cards'``, ``'in_list'``,
``'in_snippets'``, ``'in_meta'``, ``'classes'`` and, possibly,
``'choices'``
- a list of metadata fields
:type registry: pyramid.registry.Registry
:param registry:
Application registry.
:type dbsession: sqlalchemy.orm.session.Session
:param dbsession:
SQLAlchemy session.
"""
# Labels, locations and choices for fields
fields = {}
for dbitem in dbsession.query(DBField):
fields[dbitem.field_id] = {
'display': dbitem.display,
'label': loads(str(dbitem.i18n_label)),
'position': dbitem.position + 1000,
'in_cards': dbitem.in_cards,
'in_list': dbitem.in_list,
'in_snippets': dbitem.in_snippets,
'in_filter': dbitem.in_filter,
'in_meta': dbitem.in_meta,
'classes': dbitem.classes or '',
'core': False,
'indexed': False,
'default': False,
'suggested': False,
'stored': False,
'hidden': False,
'sortable': False,
}
if dbitem.display in ['list', 'color']:
choices = [ # yapf: disable
(k.value, loads(k.i18n_label)) for k in dbitem.choices]
if choices:
fields[dbitem.field_id]['choices'] = dict(choices)
# Fields from backend
langs = settings_get_list(registry.settings, 'languages', ('en', ))
for field in reply.fields:
if field.name in fields:
fields[field.name].update({ # yapf: disable
'field_type': field.field_type,
'display': self._fix_display(
fields[field.name]['display'], field),
'core': field.core,
'indexed': field.indexed,
'default': field.default,
'suggested': field.suggested,
'stored': field.stored,
'sortable': field.sortable,
})
else:
label = { # yapf: disable
k: translate(
_(field.name.capitalize().replace('_', ' ')), k)
for k in langs}
fields[field.name] = { # yapf: disable
'field_type': field.field_type,
'display': self._reply_field2display(field),
'label': label,
'position': 2000,
'in_cards': 1 if field.name == 'file_name' else -1,
'in_list': CORE_DISPLAY.get(field.name, (-1, ''))[0],
'in_snippets': 1 if field.name == 'file_name' else -1,
'in_filter': 1 if field.name == 'file_name' else -1,
'in_meta': -1,
'classes': CORE_DISPLAY.get(field.name, (-1, ''))[1],
'core': field.core,
'indexed': field.indexed,
'default': field.default,
'suggested': field.suggested,
'stored': field.stored,
'hidden': field.hidden,
'sortable': field.sortable,
}
# Fix in_cards, in_list, in_snippets and in_meta
for field_id, field in fields.items():
if field_id not in CORE_DISPLAY and ( # yapf: disable
field['core'] or not field['stored'] or field['hidden']):
field['in_cards'] = -1
field['in_list'] = -1
if field['core']:
field['in_meta'] = -1
registry['fields'] = fields
# -------------------------------------------------------------------------
def _register_managers(self, registry: Registry):
"""Fill the registry with a dictionary defining managers.
:type registry: pyramid.registry.Registry
:param registry:
Application registry.
"""
# Dictionary of managers
if 'managers' not in registry:
registry['managers'] = {}
for manager_class in CORE_MANAGERS:
if manager_class.ciotype not in registry['managers']:
registry['managers'][manager_class.ciotype] = manager_class()
# Manager initialization and panel registration
for manager in registry['managers'].values():
manager.initialize(self._managers_config)
if manager.panel is not None:
panel = manager.panel.register(registry, manager.panel)
panel.manager = manager
# -------------------------------------------------------------------------
[docs]
def register_seeds(self, registry: Registry, dbsession: Session):
"""Fill the registry with a dictionary defining seeders and another
defining seeds.
:type registry: pyramid.registry.Registry
:param registry:
Application registry.
:type dbsession: sqlalchemy.orm.session.Session
:param dbsession:
"""
# Dictionary of seeders
if 'seeders' not in registry:
registry['seeders'] = {}
for seeder_class in CORE_SEEDERS:
if seeder_class.uid not in registry['seeders']:
registry['seeders'][seeder_class.uid] = seeder_class()
# Dictionary of seeds
registry['seeds'] = {}
for dbseed in dbsession.query(DBSeed):
registry['seeds'][str(dbseed.seed_id)] = {
'icon': (str(dbseed.attachments_key), str(dbseed.icon)),
'label': loads(str(dbseed.i18n_label)),
'seeder': str(dbseed.seeder)
}
if 'directory' not in registry['seeds']:
registry['seeds']['directory'] = {
'core': True,
'icon': '/ciowarehouse2/images/seed_directory.svg',
'label': _('Directory'),
'seeder': 'directory'
}
# -------------------------------------------------------------------------
@classmethod
def _reply_field2display(cls, field: InspectField) -> str:
"""Compute the best display mode for a Tantivy field.
:type field: .proto.ciotantivy_pb2.InspectField
:param field:
Tantivy field.
:rtype: str
"""
display = {
'file_name': 'file_name',
'file_size': 'file_size',
'file_date': 'age'
}.get(field.name)
if display is not None:
return display
return {
'U64': 'integer',
'I64': 'integer',
'F64': 'decimal',
'Bool': 'boolean',
'Date': 'date',
'DateTime': 'datetime',
'Bytes': 'text',
'Facet': 'facet',
'Json': 'text',
'IP': 'ip',
'Strings': 'strings',
'Color': 'color',
}.get(field.field_type, 'string')
# -------------------------------------------------------------------------
@classmethod
def _fix_display(cls, display: str, field: InspectField) -> str:
"""Fix the display mode for a Tantivy field:
:param str display:
Original display to fix.
:type field: .proto.ciotantivy_pb2.InspectField
:param field:
Tantivy field.
:rtype: str
"""
# pylint: disable = too-many-return-statements
fixed = {
'file_name': 'file_name',
'file_size': 'file_size',
'file_date': 'age'
}.get(field.name)
if fixed is not None:
return fixed
fixed = {
'Bool': 'boolean',
'Bytes': 'text',
'Json': 'text',
}.get(field.field_type)
if fixed is not None:
return fixed
if field.field_type == 'Text' and display not in ( # yapf: disable
'string', 'text', 'file_name', 'title', 'list'):
LOG.warning(translate(_('Field "${f}" fixed', {'f': field.name})))
return 'string'
if field.field_type in ('U64', 'I64') and display not in (
'integer', 'decimal', 'file_size', 'list'):
LOG.warning(translate(_('Field "${f}" fixed', {'f': field.name})))
return 'integer'
if field.field_type == 'F64' and display not in ('decimal', 'list'):
LOG.warning(translate(_('Field "${f}" fixed', {'f': field.name})))
return 'decimal'
if field.field_type == 'Date' and display not in ('date', 'age'):
LOG.warning(translate(_('Field "${f}" fixed', {'f': field.name})))
return 'date'
if field.field_type == 'DateTime' and display not in ( # yapf: disable
'datetime', 'date', 'age'):
LOG.warning(translate(_('Field "${f}" fixed', {'f': field.name})))
return 'datetime'
if field.field_type == 'Facet' and display not in ('facet', 'list'):
LOG.warning(translate(_('Field "${f}" fixed', {'f': field.name})))
return 'facet'
if field.field_type == 'IP' and display not in ('ip', 'list'):
LOG.warning(translate(_('Field "${f}" fixed', {'f': field.name})))
return 'ip'
if field.field_type == 'Color' and display not in ('color', 'list'):
LOG.warning(translate(_('Field "${f}" fixed', {'f': field.name})))
return 'color'
if field.field_type == 'Strings' and display not in ( # yapf: disable
'strings', 'list'):
LOG.warning(translate(_('Field "${f}" fixed', {'f': field.name})))
return 'strings'
return display