"""SQLAlchemy-powered model definitions for warehouses."""
# pylint: disable = too-many-lines
from __future__ import annotations
from os.path import expanduser, abspath
from json import dumps
from collections import OrderedDict
from urllib.parse import urlparse
from sqlalchemy import Column, ForeignKey, String, Enum, Integer, Boolean
from sqlalchemy import PickleType, Text
from sqlalchemy.orm import Session, relationship
from lxml import etree
import colander
from pyramid.request import Request
from chrysalio.lib.i18n import record_format_i18n, translate_field
from chrysalio.lib.i18n import schema_i18n_labels, defaults_i18n_labels
from chrysalio.lib.i18n import view_i18n_labels, edit_i18n_labels
from chrysalio.lib.utils import make_id, encrypt, size_label
from chrysalio.lib.utils import deltatime_label
from chrysalio.lib.xml import i18n_xml_text, db2xml_i18n_labels
from chrysalio.lib.form import Form
from chrysalio.lib.filter import Filter
from chrysalio.lib.paging import Paging
from chrysalio.helpers.builder import Builder
from chrysalio.helpers.literal import Literal
from chrysalio.models import ID_LEN, LABEL_LEN, DESCRIPTION_LEN
from chrysalio.models import DBDeclarativeClass
from chrysalio.models.dbbase import DBBaseClass
from chrysalio.models.dbuser import DBUser
from chrysalio.models.dbgroup import DBGroup
from cioservice.models.dbjob import DBJob
from ..relaxng import RELAXNG_CIOWAREHOUSE2
from ..lib.field import default_fields, available_fields, i18n_field_display
from ..lib.i18n import _
from . import URL_LEN, PATH_LEN, CLASSES_LEN
# File name normalization modes and their translatable labels.
WAREHOUSE_NORMALIZE = {
    'simple': _('simple'),
    'strict': _('strict'),
}

# Warehouse access modes and their translatable labels.
WAREHOUSE_ACCESSES = {
    'free': _('free'),
    'readonly': _('read only'),
    'restricted': _('restricted'),
    'restricted-readonly': _('restricted and read only'),
    'closed': _('closed'),
}

# Rights on files and on metadata, with their translatable labels. The
# insertion order is kept by the ordered dictionaries.
FILE_RIGHTS_LABELS = OrderedDict([
    ('reader', _('reading')),
    ('writer', _('writing')),
    ('writer-admin', _('administration')),
])
META_RIGHTS_LABELS = OrderedDict([
    ('reader', _('reading')),
    ('writer', _('writing')),
])

# Default values of the tunable settings and the hints describing them.
DOWNLOAD_MAX_SIZE = 10 * 1024 * 1024  # 10485760 bytes (10 MiB)
DOWNLOAD_MAX_SIZE_HINT = _('in bytes (ex. 10485760 = 10Mio, 0 = no limit)')
LOCK_TTL = 10 * 60  # 600 seconds
LOCK_TTL_HINT = _('in seconds, default: 600 (10 minutes)')
REFRESH_PERIOD = 6 * 60  # 360 minutes
REFRESH_PERIOD_HINT = _(
    'in minutes, default: 360 (6 hours), 0 (no automatic refresh)')

# Fields which can always be displayed, whatever their ``stored`` and
# ``hidden`` flags are.
# NOTE(review): values look like (position, CSS classes) -- confirm.
CORE_DISPLAY = {
    'file_name': (1, ''),
    'file_size': (2, 'cioInfo cioOptional'),
    'file_date': (3, 'cioInfo cioOptional'),
}
# =============================================================================
[docs]
class DBWarehouse(DBDeclarativeClass, DBBaseClass):
    """SQLAlchemy-powered warehouse class.

    Each instance is a row of the table ``wrh2_warehouses``. The related
    display fields, metadata fields, seeds, users and groups are deleted
    together with the warehouse (``cascade='all, delete'``).
    """
    # NOTE(review): ``suffix`` and ``attachments_dir`` are not used in the
    # visible code; presumably they are read by the base classes -- confirm.
    suffix = 'ciowrh2'
    attachments_dir = 'Warehouses'
    # Labels of the settings tabs, in the order of the tab indexes handled by
    # ``tab4view()`` and ``tab4edit()`` (0 = information ... 6 = groups).
    _settings_tabs = (
        _('Information'), _('Display'), _('Metadata'), _('Seeds'),
        _('Actions'), _('Authorized users'), _('Authorized groups'))
    __tablename__ = 'wrh2_warehouses'
    __table_args__ = {'mysql_engine': 'InnoDB'}
    # Identity, labels and attachments. ``record_from_xml()`` stores the
    # label as a JSON string and the description as a Python object.
    warehouse_id = Column(String(ID_LEN), primary_key=True)
    i18n_label = Column(Text(), nullable=False)
    i18n_description = Column(PickleType(1))
    attachments_key = Column(String(ID_LEN + 20))
    picture = Column(String(ID_LEN + 4))
    # Location of the files and version control settings. When ``vcs`` is
    # false, ``vcs_url`` is serialized as a ``no-vcs`` element by
    # ``_db2xml_files()``.
    location = Column(String(ID_LEN), nullable=False)
    vcs = Column(Boolean(name='vcs'), default=False)
    vcs_url = Column(String(URL_LEN))
    vcs_user = Column(String(ID_LEN))
    vcs_password = Column(String(128))
    # File name normalization (a key of WAREHOUSE_NORMALIZE) and access mode
    # (a key of WAREHOUSE_ACCESSES).
    normalize = Column(
        Enum(*WAREHOUSE_NORMALIZE.keys(), name='normalize_enum'),
        default='simple')
    access = Column(
        Enum(*WAREHOUSE_ACCESSES.keys(), name='wrh2_access_enum'),
        nullable=False)
    visible_url = Column(String(URL_LEN))
    # Tunable limits: bytes, seconds and minutes respectively (see the
    # matching ``*_HINT`` constants).
    download_max_size = Column(Integer, default=DOWNLOAD_MAX_SIZE)
    lock_ttl = Column(Integer, default=LOCK_TTL)
    refresh_period = Column(Integer, default=REFRESH_PERIOD)
    rendering_dir = Column(String(PATH_LEN))
    # Fields displayed in cards, in lists and in the metadata, ordered by
    # their position.
    cardfields = relationship(
        'DBWarehouseCardField',
        order_by='DBWarehouseCardField.position',
        cascade='all, delete')
    listfields = relationship(
        'DBWarehouseListField',
        order_by='DBWarehouseListField.position',
        cascade='all, delete')
    metafields = relationship(
        'DBWarehouseMetaField',
        order_by='DBWarehouseMetaField.position',
        cascade='all, delete')
    seeds = relationship('DBWarehouseSeed', cascade='all, delete')
    # Jobs are linked through the association table ``wrh2_warehouses_jobs``
    # and listed by decreasing priority.
    jobs = relationship(
        DBJob,
        secondary='wrh2_warehouses_jobs',
        order_by='desc(DBJob.priority)')
    # Authorized users and groups with their rights.
    users = relationship('DBWarehouseUser', cascade='all, delete')
    groups = relationship('DBWarehouseGroup', cascade='all, delete')
# -------------------------------------------------------------------------
[docs]
@classmethod
def xml2db(
        cls,
        dbsession: Session,
        warehouse_elt: etree.Element,
        error_if_exists: bool = True,
        kwargs: dict | None = None) -> str | None:
    """Create a warehouse in the database from a XML element.

    Nothing is created when a warehouse with the same ID already exists.

    :type  dbsession: sqlalchemy.orm.session.Session
    :param dbsession:
        SQLAlchemy session.
    :type  warehouse_elt: lxml.etree.Element
    :param warehouse_elt:
        Warehouse XML element.
    :param bool error_if_exists: (default=True)
        If ``True``, an error message is returned when the warehouse
        already exists; otherwise the existing warehouse is silently kept.
    :param dict kwargs: (optional)
        Dictionary of keyword arguments forwarded to ``xml2db_extra()``.
    :rtype: :class:`pyramid.i18n.TranslationString` or ``None``
    :return:
        Error message or ``None``.
    """
    warehouse_id = make_id(warehouse_elt.get('id'), 'standard', ID_LEN)

    # An existing warehouse is never overwritten
    existing = dbsession.query(cls).filter_by(
        warehouse_id=warehouse_id).first()
    if existing is not None:
        if not error_if_exists:
            return None
        return _('Warehouse "${w}" already exists.', {'w': warehouse_id})

    # Build, check and register the main record
    record = cls.record_from_xml(warehouse_id, warehouse_elt)
    error = cls.record_format(record)
    if error:
        return error
    dbwarehouse = cls(**record)
    dbsession.add(dbwarehouse)

    # Fill the related tables
    return dbwarehouse.xml2db_extra(dbsession, warehouse_elt, kwargs)
# -------------------------------------------------------------------------
# -------------------------------------------------------------------------
[docs]
def xml2db_extra_users_and_groups(
        self, dbsession: Session, warehouse_elt: etree.Element,
        kwargs: dict | None):
    """Load the authorized users and groups from a XML element.

    :type  dbsession: sqlalchemy.orm.session.Session
    :param dbsession:
        SQLAlchemy session.
    :type  warehouse_elt: lxml.etree.Element
    :param warehouse_elt:
        Warehouse XML element.
    :param dict kwargs:
        Dictionary of keyword arguments. The key ``'users'`` gives a
        mapping from the text of a ``user`` element to a user ID and the
        key ``'groups'`` a collection of known group IDs.
    """
    warehouse_id = self.warehouse_id
    namespaces = {'ns': RELAXNG_CIOWAREHOUSE2['namespace']}

    # Users: each known user is registered once, and only if the element
    # carries at least one right or flag
    known_users = {} if kwargs is None else kwargs.get('users', {})
    seen = set()
    for user_elt in warehouse_elt.xpath('ns:users/ns:user',
                                        namespaces=namespaces):
        user_id = known_users.get(user_elt.text)
        if user_id is None or user_id in seen:
            continue
        # pylint: disable = too-many-boolean-expressions
        if not (user_elt.get('file') or user_elt.get('meta')
                or user_elt.get('favorite') == 'true'
                or user_elt.get('in-menu') == 'true'):
            continue
        seen.add(user_id)

        # Writing metadata implies at least reading files
        file_rights = user_elt.get('file')
        if not file_rights:
            file_rights = \
                'reader' if user_elt.get('meta') == 'writer' else None
        # Administrating files implies writing metadata
        if file_rights == 'writer-admin':
            meta_rights = 'writer'
        else:
            meta_rights = user_elt.get(
                'meta', 'reader' if file_rights else None)

        dbsession.add(
            DBWarehouseUser(
                warehouse_id=warehouse_id,
                user_id=user_id,
                file_rights=file_rights,
                meta_rights=meta_rights,
                favorite=user_elt.get('favorite') == 'true' or None,
                in_menu=user_elt.get('in-menu') == 'true' or None))

    # Groups: each known group is registered once
    known_groups = () if kwargs is None else kwargs.get('groups', ())
    seen = set()
    for group_elt in warehouse_elt.xpath('ns:groups/ns:group',
                                         namespaces=namespaces):
        group_id = group_elt.text
        if not group_id or group_id not in known_groups \
                or group_id in seen:
            continue
        seen.add(group_id)
        if group_elt.get('file') == 'writer-admin':
            meta_rights = 'writer'
        else:
            meta_rights = group_elt.get('meta', 'reader')
        dbsession.add(
            DBWarehouseGroup(
                warehouse_id=warehouse_id,
                group_id=group_id,
                file_rights=group_elt.get('file', 'reader'),
                meta_rights=meta_rights))
# -------------------------------------------------------------------------
[docs]
@classmethod
def record_from_xml(
        cls, warehouse_id: str, warehouse_elt: etree.Element) -> dict:
    """Convert a warehouse XML element into a dictionary.

    A missing optional element gives ``None``. In particular,
    ``vcs_user`` and ``vcs_password`` are ``None`` (and no longer
    ``False``) when the warehouse has no ``vcs`` element. Missing numeric
    settings fall back to ``DOWNLOAD_MAX_SIZE``, ``LOCK_TTL`` and
    ``REFRESH_PERIOD``.

    :param str warehouse_id:
        Warehouse ID.
    :type  warehouse_elt: lxml.etree.Element
    :param warehouse_elt:
        Warehouse XML element.
    :rtype: dict
    :return:
        A record whose keys are the column names of the warehouse table.
    """
    namespace = RELAXNG_CIOWAREHOUSE2['namespace']

    def child_text(parent, tag):
        """Return the text of the child ``tag`` of ``parent`` or ``None``
        if ``parent`` or the child is missing."""
        if parent is None:
            return None
        return parent.findtext(f'{{{namespace}}}{tag}')

    attachments_elt = warehouse_elt.find(f'{{{namespace}}}attachments')
    vcs_elt = warehouse_elt.find(f'{{{namespace}}}vcs')

    attachments_key = None
    if attachments_elt is not None:
        attachments_key = attachments_elt.get('key') or None

    return {
        'warehouse_id': warehouse_id,
        'i18n_label': dumps(
            i18n_xml_text(warehouse_elt, 'ns:label', {'ns': namespace})),
        'i18n_description': i18n_xml_text(
            warehouse_elt, 'ns:description', {'ns': namespace}),
        'attachments_key': attachments_key,
        'picture': child_text(attachments_elt, 'picture') or None,
        'location': child_text(warehouse_elt, 'location'),
        'vcs': vcs_elt is not None,
        # Without VCS, the remote URL comes from the ``no-vcs`` element
        'vcs_url': child_text(vcs_elt, 'url')
        or child_text(warehouse_elt, 'no-vcs'),
        'vcs_user': child_text(vcs_elt, 'user'),
        'vcs_password': child_text(vcs_elt, 'password'),
        'normalize': child_text(warehouse_elt, 'normalize'),
        'access': child_text(warehouse_elt, 'access'),
        'visible_url': child_text(warehouse_elt, 'visible-url'),
        'download_max_size': int(
            child_text(warehouse_elt, 'download-max-size')
            or DOWNLOAD_MAX_SIZE),
        'lock_ttl': int(child_text(warehouse_elt, 'lock-ttl') or LOCK_TTL),
        'refresh_period': int(
            child_text(warehouse_elt, 'refresh-period') or REFRESH_PERIOD),
        'rendering_dir': child_text(warehouse_elt, 'rendering-dir')
    }
# -------------------------------------------------------------------------
# -------------------------------------------------------------------------
[docs]
def db2xml(self, dbsession: Session) -> etree.Element:
    """Serialize a warehouse to a XML representation.

    The lock TTL and the refresh period are only written when they are
    set and differ from their default value. An unset (``None``) value is
    skipped instead of being written as the text ``None``, which could not
    be read back as an integer.

    :type  dbsession: sqlalchemy.orm.session.Session
    :param dbsession:
        SQLAlchemy session.
    :rtype: lxml.etree.Element
    """
    warehouse_elt = etree.Element('warehouse')
    warehouse_elt.set('id', self.warehouse_id)

    # Labels and descriptions
    db2xml_i18n_labels(self, warehouse_elt, 6)

    # Attachment
    if self.attachments_key:
        elt = etree.SubElement(
            warehouse_elt, 'attachments', key=self.attachments_key)
        if self.picture:
            etree.SubElement(elt, 'picture').text = self.picture

    # Files
    self._db2xml_files(warehouse_elt)

    # Max size for a download
    if self.download_max_size is not None:
        etree.SubElement(warehouse_elt, 'download-max-size').text = \
            str(self.download_max_size)

    # Lock TTL
    if self.lock_ttl is not None and self.lock_ttl != LOCK_TTL:
        etree.SubElement(warehouse_elt, 'lock-ttl').text = \
            str(self.lock_ttl)

    # Refresh period
    if self.refresh_period is not None \
            and self.refresh_period != REFRESH_PERIOD:
        etree.SubElement(warehouse_elt, 'refresh-period').text = \
            str(self.refresh_period)

    # Rendering directory
    if self.rendering_dir:
        etree.SubElement(warehouse_elt, 'rendering-dir').text = \
            self.rendering_dir

    # Fields, seeds, jobs, users and groups
    self._db2xml_fields(warehouse_elt)
    self._db2xml_seeds(warehouse_elt)
    self._db2xml_jobs(warehouse_elt)
    self._db2xml_users_and_groups(dbsession, warehouse_elt)

    return warehouse_elt
# -------------------------------------------------------------------------
def _db2xml_files(self, warehouse_elt: etree.Element):
    """Append the file related settings to the warehouse serialization.

    :type  warehouse_elt: lxml.etree.Element
    :param warehouse_elt:
        XML warehouse to complete.
    """

    def append_text(parent, tag, text):
        """Append to ``parent`` a child ``tag`` containing ``text``."""
        etree.SubElement(parent, tag).text = text

    # Location and VCS
    append_text(warehouse_elt, 'location', self.location)
    if self.vcs:
        vcs_elt = etree.SubElement(warehouse_elt, 'vcs')
        vcs_settings = (
            ('url', self.vcs_url), ('user', self.vcs_user),
            ('password', self.vcs_password))
        for tag, value in vcs_settings:
            if value:
                append_text(vcs_elt, tag, value)
    elif self.vcs_url:
        append_text(warehouse_elt, 'no-vcs', self.vcs_url)

    # File name normalization
    if self.normalize:
        append_text(warehouse_elt, 'normalize', self.normalize)

    # File access
    append_text(warehouse_elt, 'access', self.access)

    # Visible URL
    if self.visible_url:
        append_text(warehouse_elt, 'visible-url', self.visible_url)
# -------------------------------------------------------------------------
def _db2xml_fields(self, warehouse_elt: etree.Element):
    """Append the card, list and metadata fields to the warehouse
    serialization.

    :type  warehouse_elt: lxml.etree.Element
    :param warehouse_elt:
        XML warehouse to complete.
    """
    # Fields for cards then fields for list: same structure, with optional
    # CSS classes
    display_groups = (
        ('card-fields', self.cardfields),
        ('list-fields', self.listfields))
    for group_tag, dbitems in display_groups:
        if not dbitems:
            continue
        group_elt = etree.SubElement(warehouse_elt, group_tag)
        for dbitem in dbitems:
            field_elt = etree.SubElement(group_elt, 'field')
            field_elt.text = dbitem.field_id
            if dbitem.classes:
                field_elt.set('classes', dbitem.classes)

    # Metadata fields
    if self.metafields:
        meta_elt = etree.SubElement(warehouse_elt, 'meta-fields')
        for dbitem in self.metafields:
            field_elt = etree.SubElement(meta_elt, 'field')
            field_elt.text = dbitem.field_id
# -------------------------------------------------------------------------
def _db2xml_seeds(self, warehouse_elt: etree.Element):
    """Append the seeds to the warehouse serialization.

    Nothing is appended when the warehouse has no seed.

    :type  warehouse_elt: lxml.etree.Element
    :param warehouse_elt:
        XML warehouse to complete.
    """
    if self.seeds:
        seeds_elt = etree.SubElement(warehouse_elt, 'seeds')
        for dbseed in self.seeds:
            seed_elt = etree.SubElement(seeds_elt, 'seed')
            seed_elt.text = dbseed.seed_id
# -------------------------------------------------------------------------
def _db2xml_jobs(self, warehouse_elt: etree.Element):
    """Append the jobs to the warehouse serialization.

    Nothing is appended when the warehouse has no job.

    :type  warehouse_elt: lxml.etree.Element
    :param warehouse_elt:
        XML warehouse to complete.
    """
    if self.jobs:
        jobs_elt = etree.SubElement(warehouse_elt, 'jobs')
        for dbjob in self.jobs:
            job_elt = etree.SubElement(jobs_elt, 'job')
            job_elt.text = dbjob.job_id
# -------------------------------------------------------------------------
def _db2xml_users_and_groups(
        self, dbsession: Session, warehouse_elt: etree.Element):
    """Append the authorized users and groups to the warehouse
    serialization.

    The logins of all the users are fetched with a single query. A user
    whose ID is no longer found in the user table is skipped instead of
    stopping the whole serialization.

    :type  dbsession: sqlalchemy.orm.session.Session
    :param dbsession:
        SQLAlchemy session.
    :type  warehouse_elt: lxml.etree.Element
    :param warehouse_elt:
        XML warehouse to complete.
    """
    if self.users:
        # One query for every login rather than one query per user
        user_ids = [dbuser.user_id for dbuser in self.users]
        logins = {
            user_id: login
            for user_id, login in dbsession.query(
                DBUser.user_id, DBUser.login).filter(
                    DBUser.user_id.in_(user_ids))
        }
        dbusers = [k for k in self.users if k.user_id in logins]
        if dbusers:
            users_elt = etree.SubElement(warehouse_elt, 'users')
        for dbuser in dbusers:
            subelt = etree.SubElement(users_elt, 'user')
            subelt.text = logins[dbuser.user_id]
            if dbuser.file_rights:
                subelt.set('file', str(dbuser.file_rights))
            # 'reader' is the default right on metadata: only 'writer'
            # is written
            if dbuser.meta_rights == 'writer':
                subelt.set('meta', str(dbuser.meta_rights))
            if dbuser.favorite:
                subelt.set('favorite', 'true')
            if dbuser.in_menu:
                subelt.set('in-menu', 'true')

    if self.groups:
        groups_elt = etree.SubElement(warehouse_elt, 'groups')
        for dbgroup in self.groups:
            subelt = etree.SubElement(groups_elt, 'group')
            subelt.text = dbgroup.group_id
            subelt.set('file', str(dbgroup.file_rights))
            # The right on metadata is implicit for 'reader' and for
            # the administrators of files
            if dbgroup.meta_rights != 'reader' and \
                    dbgroup.file_rights != 'writer-admin':
                subelt.set('meta', str(dbgroup.meta_rights))
# -------------------------------------------------------------------------
[docs]
def tab4view(
        self, request: Request, tab_index: int, form: Form,
        user_filter: Filter, user_paging: Paging) -> str:
    """Generate the content of one tab of a warehouse.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param int tab_index:
        Index of the tab, from 0 (information) to 6 (groups).
    :type  form: .lib.form.Form
    :param form:
        Current form object.
    :type  user_filter: chrysalio.lib.filter.Filter
    :param user_filter:
        Filter for users.
    :type  user_paging: chrysalio.lib.paging.Paging
    :param user_paging:
        Paging for warehouse users.
    :rtype: chrysalio.helpers.literal.Literal
    :return:
        Content of the tab or an empty string for an unknown index.
    """
    # The position of a renderer is the index of its tab; a renderer is
    # only called for the requested tab
    renderers = (
        lambda: self._tab4view_information(request, form),
        lambda: self._tab4view_displayfields(request),
        lambda: self._tab4view_metafields(request),
        lambda: self._tab4view_seeds(request),
        lambda: self._tab4view_jobs(request),
        lambda: self._tab4view_users(
            request, form, user_filter, user_paging),
        lambda: self._tab4view_groups(request),
    )
    for index, render in enumerate(renderers):
        if tab_index == index:
            return render()
    return ''
# -------------------------------------------------------------------------
def _tab4view_information(self, request: Request, form: Form) -> str:
    """Generate the information tab.

    The numeric settings are stored in nullable columns: an unset
    (``None``) value is not displayed instead of raising an error. The
    lock TTL and the refresh period are only displayed when they differ
    from their default value.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  form: .lib.form.Form
    :param form:
        Current form object.
    :rtype: chrysalio.helpers.literal.Literal
    """
    translate = request.localizer.translate
    html = form.grid_item(
        translate(_('Identifier:')), self.warehouse_id, clear=True)
    html += view_i18n_labels(request, form, self)
    html += form.grid_item(
        translate(_('Location:')), self.location, clear=True)

    # VCS settings are only shown for a versioned warehouse
    html += form.grid_item(
        translate(_('Versioning:')),
        translate(_('yes')) if self.vcs else '',
        clear=True)
    html += form.grid_item(
        translate(_('VCS Remote URL:')),
        self.vcs_url if self.vcs else '',
        clear=True)
    html += form.grid_item(
        translate(_('VCS User:')),
        self.vcs_user if self.vcs else '',
        clear=True)

    html += form.grid_item(
        translate(_('Normalization:')),
        translate(WAREHOUSE_NORMALIZE.get(str(self.normalize)))
        if self.normalize else None,
        clear=True)
    html += form.grid_item(
        translate(_('Access:')),
        translate(WAREHOUSE_ACCESSES.get(str(self.access))),
        clear=True)
    html += form.grid_item(
        translate(_('Visible URL:')), self.visible_url, clear=True)

    # 0 means "no limit" and None "not set": nothing to display
    if self.download_max_size is not None and self.download_max_size > 0:
        html += form.grid_item(
            translate(_('Maximum size of download:')),
            size_label(self.download_max_size),
            clear=True)
    if self.lock_ttl is not None and self.lock_ttl != LOCK_TTL:
        html += form.grid_item(
            translate(_('Lock TTL:')),
            deltatime_label(self.lock_ttl),
            clear=True)
    if self.refresh_period is not None \
            and self.refresh_period != REFRESH_PERIOD:
        # The period is stored in minutes
        html += form.grid_item(
            translate(_('Refreshment every:')),
            deltatime_label(60 * self.refresh_period),
            clear=True)
    if self.rendering_dir:
        html += form.grid_item(
            translate(_('Rendering directory:')),
            self.rendering_dir,
            clear=True)
    return html
# -------------------------------------------------------------------------
def _tab4view_displayfields(self, request: Request) -> str:
    """Generate the tab describing which fields are displayed in cards
    and in lists.

    For each kind of display, the fields of the warehouse are used if at
    least one of them can be displayed; otherwise the default fields are
    listed.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :rtype: chrysalio.helpers.literal.Literal
    """
    translate = request.localizer.translate
    fields = request.registry['fields']

    def displayable(dbitems):
        """Return the ``(field_id, classes)`` of the known fields which
        are stored and not hidden, or which are core fields."""
        selected = []
        for dbitem in dbitems:
            uid = dbitem.field_id
            if uid not in fields:
                continue
            if (fields[uid]['stored'] and not fields[uid]['hidden']) \
                    or uid in CORE_DISPLAY:
                selected.append((uid, dbitem.classes))
        return selected

    def table(title, selected, default_key):
        """Return the HTML table of the selected fields or of the
        default ones."""
        parts = [
            '<h3>{title}</h3>\n'
            '<table class="cioPagingList">\n<thead>\n'
            '<tr><th>{label}</th><th>{display}</th></tr>\n'
            '</thead>\n<tbody>\n'.format(
                title=title,
                label=translate(_('Label')),
                display=translate(_('Type')))
        ]
        for field in selected or default_fields(fields, default_key):
            uid = field[0]
            parts.append(
                '<tr><td><strong>{label}</strong></td>'
                '<td>{display}</td></tr>\n'.format(
                    label=translate_field(
                        request, fields[uid]['label'], uid),
                    display=i18n_field_display(request, fields[uid])))
        parts.append('</tbody>\n</table>\n')
        return ''.join(parts)

    cardfields = displayable(self.cardfields)
    listfields = displayable(self.listfields)
    cardshtml = table(
        translate(_('Display as cards')), cardfields, 'in_cards')
    listhtml = table(
        translate(_('Display as list')), listfields, 'in_list')

    return Literal(
        '<div class="cioFlexRowContainer">\n'
        f'<section class="cioFlexItem">{cardshtml}</section>\n'
        f'<section class="cioFlexItem">{listhtml}</section>\n'
        '</div>\n')
# -------------------------------------------------------------------------
def _tab4view_metafields(self, request: Request) -> str:
    """Generate the tab listing the metadata fields.

    The fields of the warehouse are used if at least one of them is
    known, stored and not hidden; otherwise the default metadata fields
    are listed.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :rtype: chrysalio.helpers.literal.Literal
    """
    translate = request.localizer.translate
    fields = request.registry['fields']

    metafields = [
        dbitem.field_id for dbitem in self.metafields
        if dbitem.field_id in fields
        and fields[dbitem.field_id]['stored']
        and not fields[dbitem.field_id]['hidden']
    ]

    parts = [
        '<table class="cioPagingList">\n<thead>\n'
        '<tr><th>{label}</th><th>{display}</th></tr>\n'
        '</thead>\n<tbody>\n'.format(
            label=translate(_('Label')),
            display=translate(_('Type')))
    ]
    for field_id in metafields or default_fields(fields, 'in_meta'):
        field = fields[field_id]
        parts.append(
            '<tr><td><strong>{label}</strong></td>'
            '<td>{display}</td></tr>\n'.format(
                label=translate_field(request, field['label'], field_id),
                display=i18n_field_display(request, field)))
    parts.append('</tbody>\n</table>\n')

    return Literal(''.join(parts))
# -------------------------------------------------------------------------
def _tab4view_seeds(self, request: Request) -> str:
    """Generate the seeds tab.

    Only the seeds which are registered, which are not core seeds and
    whose seeder is available are listed. When there is nothing to list,
    a translated message is returned, as the other tabs do.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :rtype: chrysalio.helpers.literal.Literal
    """
    seeders = request.registry['seeders']
    seeds = request.registry['seeds']

    html = ''
    for dbseed in self.seeds:
        seed = seeds.get(dbseed.seed_id)
        if seed is None or seed.get('core') \
                or seed['seeder'] not in seeders:
            continue
        html += Builder().div(
            '✔ {}'.format(
                translate_field(request, seed['label'], dbseed.seed_id)))

    return html if html else request.localizer.translate(_('No seed.'))
# -------------------------------------------------------------------------
def _tab4view_jobs(self, request: Request) -> str:
    """Generate the jobs tab.

    A translated message is returned when no service is registered or
    when the warehouse has no job.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :rtype: chrysalio.helpers.literal.Literal
    """
    translate = request.localizer.translate
    if not (request.registry.get('services') and self.jobs):
        return translate(_('No action.'))

    parts = [
        '<table>\n<thead>\n'
        '<tr><th>{label}</th><th>{description}</th></tr>\n'
        '</thead>\n<tbody>\n'.format(
            label=translate(_('Label')),
            description=translate(_('Description')))
    ]
    for dbjob in self.jobs:
        parts.append(
            '<tr><td><a href="{ref}"><strong>{label}</strong></a></td>'
            '<td>{description}</td></tr>\n'.format(
                ref=request.route_path('job_view', job_id=dbjob.job_id),
                label=dbjob.label(request),
                description=dbjob.description(request)))
    parts.append('</tbody>\n</table>\n')

    return Literal(''.join(parts))
# -------------------------------------------------------------------------
def _tab4view_users(
        self, request: Request, form: Form, user_filter: Filter,
        user_paging: Paging) -> str:
    """Generate the users tab.

    The login, first name and last name come from user records: they are
    HTML-escaped before being inserted into the markup, which is returned
    as a literal.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  form: .lib.form.Form
    :param form:
        Current form object.
    :type  user_filter: chrysalio.lib.filter.Filter
    :param user_filter:
        Filter for users.
    :type  user_paging: chrysalio.lib.paging.Paging
    :param user_paging:
        Paging for warehouse users.
    :rtype: chrysalio.helpers.literal.Literal
    """
    # pylint: disable = import-outside-toplevel
    from html import escape

    translate = request.localizer.translate
    if self.access == 'free':
        return translate(_('Access to this warehouse is not restricted.'))
    if self.access == 'closed':
        return translate(_('This warehouse is closed.'))
    if not self.users:
        return translate(_('No user is authorized.'))

    html = DBUser.paging_filter(request, form, user_filter, user_paging)
    html += self._user_thead(request, user_paging)
    rights = {
        k.user_id: (k.file_rights, k.meta_rights, k.favorite, k.in_menu)
        for k in self.users
    }
    # NOTE(review): every user of the paging is assumed to be a user of
    # this warehouse -- confirm against the caller.
    for dbuser in user_paging:
        file_rights, meta_rights, favorite, in_menu = \
            rights[dbuser.user_id]
        html += \
            '<tr><th><a href="{user_view}">{login}</a></th>'\
            '<td class="cioOptional">{fname}</td><td>{lname}</td>'\
            '<td class="cioSelect">{file_rights}</td>'\
            '<td class="cioSelect">{meta_rights}</td>'\
            '<td class="cioBoolean">{favorite}</td>'\
            '<td class="cioBoolean">{in_menu}</td></tr>\n'.format(
                user_view=request.route_path(
                    'user_view', user_id=dbuser.user_id),
                login=escape(str(dbuser.login)),
                fname=escape(dbuser.first_name or ''),
                lname=escape(str(dbuser.last_name)),
                file_rights=translate(
                    FILE_RIGHTS_LABELS[file_rights]
                    if file_rights else ''),
                meta_rights=translate(
                    META_RIGHTS_LABELS[meta_rights]
                    if meta_rights else ''),
                favorite='✔' if favorite else '',
                in_menu='✔' if in_menu else '')
    html += '</tbody>\n</table>\n'

    return Literal(html)
# -------------------------------------------------------------------------
def _tab4view_groups(self, request: Request) -> str:
    """Generate the groups tab.

    A translated message is returned when the access is free or closed,
    or when no group is authorized.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :rtype: chrysalio.helpers.literal.Literal
    """
    translate = request.localizer.translate
    if self.access == 'free':
        return translate(_('Access to this warehouse is not restricted.'))
    if self.access == 'closed':
        return translate(_('This warehouse is closed.'))
    if not self.groups:
        return translate(_('No group is authorized.'))

    # Rights of each authorized group and the matching group records
    rights = {
        k.group_id: (k.file_rights, k.meta_rights)
        for k in self.groups
    }
    group_ids = [k.group_id for k in self.groups]
    dbgroups = request.dbsession.query(DBGroup).filter(
        DBGroup.group_id.in_(group_ids)).order_by('group_id')

    parts = [
        '<table>\n<thead>\n'
        '<tr><th>{label}</th><th>{description}</th>'
        '<th class="cioSelect">{file_rights}</th>'
        '<th class="cioSelect">{meta_rights}</th></tr>\n'
        '</thead>\n<tbody>\n'.format(
            label=translate(_('Label')),
            description=translate(_('Description')),
            file_rights=translate(_('File rights')),
            meta_rights=translate(_('Metadata rights')))
    ]
    for dbgroup in dbgroups:
        file_rights, meta_rights = rights[dbgroup.group_id]
        parts.append(
            '<tr><th><a href="{group_view}">{label}</a></th>'
            '<td>{description}</td>'
            '<td class="cioSelect">{file_rights}</td>'
            '<td class="cioSelect">{meta_rights}</td></tr>\n'.format(
                group_view=request.route_path(
                    'group_view', group_id=dbgroup.group_id),
                label=dbgroup.label(request),
                description=dbgroup.description(request),
                file_rights=translate(FILE_RIGHTS_LABELS[file_rights]),
                meta_rights=translate(META_RIGHTS_LABELS[meta_rights])))
    parts.append('</tbody>\n</table>\n')

    return Literal(''.join(parts))
# -------------------------------------------------------------------------
[docs]
@classmethod
def settings_schema(
        cls,
        request: Request,
        defaults: dict,
        groups: dict,
        jobs: dict,
        dbwarehouse: DBWarehouse | None = None
) -> tuple[colander.SchemaNode, dict]:
    """Return a Colander schema to create or edit a warehouse.

    The nodes ``warehouse_id`` and ``location`` only exist for a creation
    i.e. when ``dbwarehouse`` is ``None``. ``defaults`` is completed in
    place.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param dict defaults:
        Default values for the form set by the user paging object.
    :param dict groups:
        A dictionary such as ``{group_id: (label, description),...}``.
    :param dict jobs:
        A dictionary such as ``{job_id: (label, description),...}``.
    :type  dbwarehouse: DBWarehouse
    :param dbwarehouse: (optional)
        Current warehouse SqlAlchemy object.
    :rtype: tuple
    :return:
        A tuple such as ``(schema, defaults)``.
    """
    # pylint: disable = too-many-branches
    creation = dbwarehouse is None
    schema = colander.SchemaNode(colander.Mapping())

    def add_node(node_type, name, **options):
        """Append to the schema a node named ``name``."""
        schema.add(colander.SchemaNode(node_type, name=name, **options))

    def add_flags(prefix, uids):
        """Append one optional boolean node ``prefix:uid`` per ID."""
        for uid in uids:
            add_node(colander.Boolean(), f'{prefix}:{uid}', missing=False)

    # Information
    if creation:
        add_node(
            colander.String(), 'warehouse_id',
            validator=colander.Length(min=2, max=ID_LEN))
    locations = request.registry['modules']['ciowarehouse2'].locations
    schema_i18n_labels(request, schema, LABEL_LEN, DESCRIPTION_LEN)
    if creation:
        add_node(
            colander.String(), 'location',
            validator=colander.OneOf(locations.keys()))
    add_node(colander.Boolean(), 'vcs', missing=False)
    add_node(
        colander.String(), 'vcs_url',
        validator=colander.Length(max=URL_LEN), missing=None)
    add_node(
        colander.String(), 'vcs_user',
        validator=colander.Length(max=ID_LEN), missing=None)
    add_node(
        colander.String(), 'vcs_password',
        validator=colander.Length(max=64), missing=None)
    add_node(
        colander.String(), 'normalize',
        validator=colander.OneOf(WAREHOUSE_NORMALIZE.keys()),
        missing=None)
    add_node(
        colander.String(), 'access',
        validator=colander.OneOf(WAREHOUSE_ACCESSES.keys()))
    add_node(
        colander.String(), 'visible_url',
        validator=colander.Length(max=URL_LEN), missing=None)
    add_node(
        colander.Integer(), 'download_max_size',
        validator=colander.Range(min=0), missing=DOWNLOAD_MAX_SIZE)
    add_node(
        colander.Integer(), 'lock_ttl',
        validator=colander.Range(min=0), missing=LOCK_TTL)
    add_node(
        colander.Integer(), 'refresh_period',
        validator=colander.Range(min=0), missing=REFRESH_PERIOD)
    add_node(
        colander.String(), 'rendering_dir',
        validator=colander.Regex(
            r'^(/?[a-zA-Z0-9_\-]+:)?[a-zA-Z0-9_.\-/]+$'), missing=None)

    # Fields, seeds, jobs and groups: one check box per item
    fields = request.registry['fields']
    add_flags('fcrd', available_fields(fields, 'in_cards'))
    add_flags('flst', available_fields(fields, 'in_list'))
    add_flags('fmta', available_fields(fields, 'in_meta'))
    add_flags('sed', request.registry['seeds'])
    add_flags('job', jobs)
    add_flags('grp', groups)

    # Defaults for a new warehouse
    if creation:
        defaults.update({
            'vcs': True,
            'access': 'free',
            'download_max_size': DOWNLOAD_MAX_SIZE,
            'lock_ttl': LOCK_TTL,
            'refresh_period': REFRESH_PERIOD})
        return schema, defaults

    # Defaults from the current warehouse
    defaults.update(defaults_i18n_labels(dbwarehouse))
    defaults.update(
        {f'fcrd:{k.field_id}': True for k in dbwarehouse.cardfields})
    defaults.update(
        {f'flst:{k.field_id}': True for k in dbwarehouse.listfields})
    defaults.update(
        {f'fmta:{k.field_id}': True for k in dbwarehouse.metafields})
    defaults.update({f'sed:{k.seed_id}': True for k in dbwarehouse.seeds})
    defaults.update({f'job:{k.job_id}': True for k in dbwarehouse.jobs})
    for dbuser in dbwarehouse.users:
        uid = dbuser.user_id
        defaults[f'usr:{uid}'] = dbuser.file_rights is not None
        defaults[f'ufil:{uid}'] = dbuser.file_rights
        defaults[f'umta:{uid}'] = dbuser.meta_rights
        defaults[f'fav:{uid}'] = dbuser.favorite
        defaults[f'mnu:{uid}'] = dbuser.in_menu
    for dbgroup in dbwarehouse.groups:
        uid = dbgroup.group_id
        defaults[f'grp:{uid}'] = True
        defaults[f'gfil:{uid}'] = dbgroup.file_rights
        defaults[f'gmta:{uid}'] = dbgroup.meta_rights

    return schema, defaults
# -------------------------------------------------------------------------
[docs]
@classmethod
def tab4edit(
        cls,
        request: Request,
        tab_index: int,
        form: Form,
        user_filter: Filter,
        user_paging: Paging,
        groups: dict,
        jobs: dict,
        dbwarehouse: DBWarehouse | None = None) -> str:
    """Render the content of one tab of the warehouse edition form.

    The tab is selected by its index: 0 information, 1 display fields,
    2 metadata fields, 3 seeds, 4 jobs, 5 users and 6 groups.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :param int tab_index:
        Index of the tab.
    :type  form: chrysalio.lib.form.Form
    :param form:
        Current form object.
    :type  user_filter: chrysalio.lib.filter.Filter
    :param user_filter:
        Filter for users.
    :type  user_paging: chrysalio.lib.paging.Paging
    :param user_paging:
        Paging for all users.
    :param dict groups:
        A dictionary such as ``{group_id: (label, description),...}``.
    :param dict jobs:
        A dictionary such as ``{job_id: (label, description),...}``.
    :type  dbwarehouse: DBWarehouse
    :param dbwarehouse: (optional)
        Current user warehouse SqlAlchemy object.
    :rtype: chrysalio.helpers.literal.Literal
    :return:
        HTML fragment of the tab or an empty string when ``tab_index``
        does not match any tab.
    """
    # pylint: disable = too-many-arguments, too-many-positional-arguments
    # Each entry is lazy: only the requested tab is actually rendered.
    renderers = {
        0: lambda: cls._tab4edit_information(request, form, dbwarehouse),
        1: lambda: cls._tab4edit_displayfields(request, form),
        2: lambda: cls._tab4edit_metafields(request, form),
        3: lambda: cls._tab4edit_seeds(request, form),
        4: lambda: cls._tab4edit_jobs(request, form, jobs),
        5: lambda: cls._tab4edit_users(
            request, form, user_filter, user_paging, dbwarehouse),
        6: lambda: cls._tab4edit_groups(
            request, form, groups, dbwarehouse),
    }
    renderer = renderers.get(tab_index)
    return '' if renderer is None else renderer()
# -------------------------------------------------------------------------
@classmethod
def _tab4edit_information(
        cls, request: Request, form: Form,
        dbwarehouse: DBWarehouse | None) -> str:
    """Build the "information" tab of the warehouse edition form.

    When ``dbwarehouse`` is ``None`` (creation), identifier, location,
    versioning flag and VCS remote URL are rendered as inputs;
    otherwise they are rendered as read-only items.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  form: chrysalio.lib.form.Form
    :param form:
        Current form object.
    :type  dbwarehouse: DBWarehouse
    :param dbwarehouse:
        Current user warehouse SqlAlchemy object.
    :rtype: chrysalio.helpers.literal.Literal
    """
    # yapf: disable
    translate = request.localizer.translate
    creating = dbwarehouse is None

    # Identifier followed by the internationalized labels
    if creating:
        html = form.grid_text(
            'warehouse_id', translate(_('Identifier:')), required=True,
            maxlength=ID_LEN, clear=True)
    else:
        html = form.grid_item(
            translate(_('Identifier:')), dbwarehouse.warehouse_id,
            clear=True)
    html += edit_i18n_labels(request, form, LABEL_LEN, DESCRIPTION_LEN)

    # Location and versioning: inputs on creation, read-only afterwards
    if creating:
        locations = request.registry['modules']['ciowarehouse2'].locations
        html += form.grid_select(
            'location', translate(_('Location:')),
            [('', ' ')] + list(locations.keys()), required=True,
            clear=True)
        html += form.grid_custom_checkbox(
            'vcs', translate(_('Versioning:')), clear=True)
        html += form.grid_text(
            'vcs_url', translate(_('VCS Remote URL:')), maxlength=URL_LEN,
            clear=True)
    else:
        versioned = dbwarehouse.vcs
        readonly_rows = (
            (_('Location:'), dbwarehouse.location),
            (_('Versioning:'), translate(_('yes')) if versioned else ''),
            (_('VCS Remote URL:'),
             dbwarehouse.vcs_url if versioned else ''))
        for label, value in readonly_rows:
            html += form.grid_item(translate(label), value, clear=True)

    # VCS credentials only make sense for a versioned warehouse
    if creating or dbwarehouse.vcs:
        html += form.grid_text(
            'vcs_user', translate(_('VCS User:')), maxlength=ID_LEN,
            clear=True)
        html += form.grid_password(
            'vcs_password', translate(_('VCS Password:')), maxlength=64,
            clear=True)

    # Normalization and access
    html += form.grid_select(
        'normalize', translate(_('Normalization:')),
        [('', ' ')] + list(WAREHOUSE_NORMALIZE.items()), clear=True)
    html += form.grid_select(
        'access', translate(_('Access:')),
        WAREHOUSE_ACCESSES.items(), required=True, clear=True)

    # Miscellaneous text settings
    html += form.grid_text(
        'visible_url', translate(_('Visible URL:')), maxlength=URL_LEN,
        clear=True)
    hinted_fields = (
        ('download_max_size', _('Maximum size of download:'),
         DOWNLOAD_MAX_SIZE_HINT),
        ('lock_ttl', _('Lock TTL:'), LOCK_TTL_HINT),
        ('refresh_period', _('Refreshment every:'), REFRESH_PERIOD_HINT))
    for name, label, hint in hinted_fields:
        html += form.grid_text(
            name, translate(label), maxlength=10, hint=translate(hint),
            clear=True)
    html += form.grid_text(
        'rendering_dir', translate(_('Rendering directory:')),
        maxlength=PATH_LEN, clear=True)
    # yapf: enable
    return html
# -------------------------------------------------------------------------
@classmethod
def _tab4edit_displayfields(cls, request: Request, form: Form) -> str:
    """Generate the tab to edit how to display files as cards, as list.

    Two tables are rendered side by side: one with a check box per field
    available for the card display (inputs named ``fcrd:<field_id>``) and
    one per field available for the list display (``flst:<field_id>``).
    A field whose own flag for the display mode is falsy is rendered with
    the ``cioInfo`` class.

    The list table now reads the ``in_list`` flag of each field to choose
    this class; it previously read ``in_cards``, so list rows were styled
    according to the card flag.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  form: chrysalio.lib.form.Form
    :param form:
        Current form object.
    :rtype: chrysalio.helpers.literal.Literal
    """
    translate = request.localizer.translate
    fields = request.registry['fields']

    def table(title: str, prefix: str, flag: str) -> str:
        """Return the titled HTML table of check boxes for one mode.

        :param str title:
            Translated title displayed above the table.
        :param str prefix:
            Prefix of the check box names (``'fcrd'`` or ``'flst'``).
        :param str flag:
            Field flag of the display mode (``'in_cards'`` or
            ``'in_list'``).
        :rtype: str
        """
        html = (
            '<h3>{title}</h3>\n'
            '<table class="cioPagingList">\n<thead>\n'
            '<tr><th></th><th>{label}</th>'
            '<th>{display}</th></tr>\n</thead>\n<tbody>\n').format(
                title=title,
                label=translate(_('Label')),
                display=translate(_('Type')))
        for field_id in available_fields(fields, flag):
            field = fields[field_id]
            html += (
                '<tr><td class="cioCheckbox">{selected}</td>'
                '<td><label for="{uid}"{style}>'
                '<strong>{label}</strong></label></td>'
                '<td{style}>{display}</td></tr>\n').format(
                    selected=form.custom_checkbox(f'{prefix}:{field_id}'),
                    uid=f'{prefix}{field_id}',
                    # The style follows the flag of THIS display mode; a
                    # missing flag is treated as falsy.
                    style='' if field.get(flag) else ' class="cioInfo"',
                    label=translate_field(
                        request, field['label'], field_id),
                    display=i18n_field_display(request, field))
        return html + '</tbody>\n</table>\n'

    cardshtml = table(
        translate(_('Display as cards')), 'fcrd', 'in_cards')
    listhtml = table(translate(_('Display as list')), 'flst', 'in_list')
    return Literal(
        '<div class="cioFlexRowContainer">\n'
        f'<section class="cioFlexItem">{cardshtml}</section>\n'
        f'<section class="cioFlexItem">{listhtml}</section>\n'
        '</div>\n')
# -------------------------------------------------------------------------
@classmethod
def _tab4edit_metafields(cls, request: Request, form: Form) -> str:
    """Build the tab where the metadata fields of a warehouse are chosen.

    One row with a check box named ``fmta:<field_id>`` is produced for
    each field returned by ``available_fields(fields, 'in_meta')``. A
    field whose ``in_meta`` flag is falsy gets the ``cioInfo`` class.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  form: chrysalio.lib.form.Form
    :param form:
        Current form object.
    :rtype: chrysalio.helpers.literal.Literal
    """
    translate = request.localizer.translate
    fields = request.registry['fields']

    # Table header
    label_title = translate(_('Label'))
    description_title = translate(_('Description'))
    chunks = [
        '<table class="cioPagingList">\n<thead>\n'
        f'<tr><th></th><th>{label_title}</th>'
        f'<th>{description_title}</th></tr>\n</thead>\n<tbody>\n']

    # One row per available field
    for field_id in available_fields(fields, 'in_meta'):
        checkbox = form.custom_checkbox(f'fmta:{field_id}')
        field = fields[field_id]
        style = '' if field['in_meta'] else ' class="cioInfo"'
        label = translate_field(request, field['label'], field_id)
        display = i18n_field_display(request, field)
        chunks.append(
            f'<tr><td class="cioCheckbox">{checkbox}</td>'
            f'<td><label for="fmta{field_id}"{style}>'
            f'<strong>{label}</strong></label></td>'
            f'<td{style}>{display}</td></tr>\n')

    chunks.append('</tbody>\n</table>\n')
    return Literal(''.join(chunks))
# -------------------------------------------------------------------------
@classmethod
def _tab4edit_seeds(cls, request: Request, form: Form) -> str:
    """Build the tab where the seeds of a warehouse are chosen.

    A check box named ``sed:<seed_id>`` is rendered for every seed of
    ``request.registry['seeds']`` which is not flagged ``core`` and whose
    seeder is present in ``request.registry['seeders']``.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  form: chrysalio.lib.form.Form
    :param form:
        Current form object.
    :rtype: chrysalio.helpers.literal.Literal
    :return:
        The concatenated ``<div>`` elements or an empty string if no
        seed is selectable.
    """
    registry = request.registry
    seeds = registry['seeds']
    seeders = registry['seeders']
    html = ''
    for seed_id, seed in seeds.items():
        if not seed.get('core') and seed['seeder'] in seeders:
            div = Builder().div
            checkbox = form.custom_checkbox(f'sed:{seed_id}')
            # The trailing ``{}`` is filled by ``format`` with the label.
            template = f' <label for="sed{seed_id}">' '{}</label>'
            label = Literal(
                template.format(
                    translate_field(request, seed['label'], seed_id)))
            html += div(checkbox + label)
    return html
# -------------------------------------------------------------------------
@classmethod
def _tab4edit_jobs(cls, request: Request, form: Form, jobs: dict) -> str:
    """Build the tab where the jobs of a warehouse are chosen.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  form: chrysalio.lib.form.Form
    :param form:
        Current form object.
    :param dict jobs:
        A dictionary such as ``{job_id: (label, description),...}``.
    :rtype: chrysalio.helpers.literal.Literal
    :return:
        A table with one check box ``job:<job_id>`` per job, or a
        translated message when the registry has no ``services`` or
        ``jobs`` is empty.
    """
    translate = request.localizer.translate
    has_services = bool(request.registry.get('services'))
    if not (has_services and jobs):
        return translate(_('No available action.'))

    # Table header
    label_title = translate(_('Label'))
    description_title = translate(_('Description'))
    chunks = [
        '<table class="cioPagingList">\n<thead>\n'
        f'<tr><th></th><th>{label_title}</th>'
        f'<th>{description_title}</th></tr>\n</thead>\n<tbody>\n']

    # One row per job
    for job_id, job in jobs.items():
        checkbox = form.custom_checkbox(f'job:{job_id}')
        label, description = job[0], job[1]
        chunks.append(
            f'<tr><td class="cioSelect">{checkbox}</td>'
            f'<td><label for="job{job_id}">'
            f'<strong>{label}</strong></label></td>'
            f'<td>{description}</td></tr>\n')

    chunks.append('</tbody>\n</table>\n')
    return Literal(''.join(chunks))
# -------------------------------------------------------------------------
@classmethod
def _tab4edit_users(
        cls, request: Request, form: Form, user_filter: Filter,
        user_paging: Paging, dbwarehouse: DBWarehouse | None) -> str:
    """Build the tab where the authorized users of a warehouse are set.

    Each user of the current page gets a row with a selection check box
    (``usr:<user_id>``), a hidden marker (``set:<user_id>``), selects for
    file and metadata rights (``ufil:``, ``umta:``) and check boxes for
    favorite (``fav:``) and menu (``mnu:``).

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  form: chrysalio.lib.form.Form
    :param form:
        Current form object.
    :type  user_filter: chrysalio.lib.filter.Filter
    :param user_filter:
        Filter for users.
    :type  user_paging: chrysalio.lib.paging.Paging
    :param user_paging:
        Paging for all users.
    :type  dbwarehouse: .lib.models.DBWarehouse
    :param dbwarehouse:
        Current user warehouse SqlAlchemy object.
    :rtype: chrysalio.helpers.literal.Literal
    :return:
        The filter, the paging and the user table, or a translated
        message when the access of the existing warehouse is not
        restricted.
    """
    translate = request.localizer.translate
    restricted_accesses = ('restricted', 'restricted-readonly')
    if dbwarehouse and dbwarehouse.access not in restricted_accesses:
        return translate(_('Access to this warehouse is not restricted.'))

    # Choices shared by all rows
    file_options = [('', ' ')] + list(FILE_RIGHTS_LABELS.items())
    meta_options = [('', ' ')] + list(META_RIGHTS_LABELS.items())

    # Filter, paging and table header with an extra "check all" column
    html = DBUser.paging_filter(request, form, user_filter, user_paging)
    thead = cls._user_thead(request, user_paging)
    html += thead.replace(
        '<tr>', '<tr><th class="cioCheckbox" id="check_all"></th>')

    # One row per user of the current page
    for dbuser in user_paging:
        user_id = dbuser.user_id
        check = form.custom_checkbox(f'usr:{user_id}')
        visible = form.hidden(f'set:{user_id}', '1')
        login = dbuser.login
        fname = dbuser.first_name or ''
        lname = dbuser.last_name
        file_rights = form.select(f'ufil:{user_id}', None, file_options)
        meta_rights = form.select(f'umta:{user_id}', None, meta_options)
        favorite = form.custom_checkbox(f'fav:{user_id}')
        in_menu = form.custom_checkbox(f'mnu:{user_id}')
        html += (
            f'<tr><td class="cioCheckbox cioSelect">{check}{visible}</td>'
            f'<td>{login}</td>'
            f'<td class="cioOptional">{fname}</td><td>{lname}</td>'
            f'<td class="cioSelect">{file_rights}</td>'
            f'<td class="cioSelect">{meta_rights}</td>'
            f'<td class="cioBoolean">{favorite}</td>'
            f'<td class="cioBoolean">{in_menu}</td></tr>\n')

    html += '</tbody>\n</table>\n'
    return Literal(html)
# -------------------------------------------------------------------------
@classmethod
def _tab4edit_groups(
        cls, request: Request, form: Form, groups: dict,
        dbwarehouse: DBWarehouse | None) -> str:
    """Build the tab where the authorized groups of a warehouse are set.

    Each group gets a row with a selection check box
    (``grp:<group_id>``) and selects for file and metadata rights
    (``gfil:<group_id>``, ``gmta:<group_id>``).

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  form: chrysalio.lib.form.Form
    :param form:
        Current form object.
    :param dict groups:
        A dictionary such as ``{group_id: (label, description),...}``.
    :type  dbwarehouse: DBWarehouse
    :param dbwarehouse:
        Current user warehouse SqlAlchemy object.
    :rtype: chrysalio.helpers.literal.Literal
    :return:
        The group table, or a translated message when the access of the
        existing warehouse is not restricted.
    """
    translate = request.localizer.translate
    restricted_accesses = ('restricted', 'restricted-readonly')
    if dbwarehouse and dbwarehouse.access not in restricted_accesses:
        return translate(_('Access to this warehouse is not restricted.'))

    # Table header
    label_title = translate(_('Label'))
    description_title = translate(_('Description'))
    file_title = translate(_('File rights'))
    meta_title = translate(_('Metadata rights'))
    chunks = [
        '<table class="cioPagingList">\n<thead>\n'
        f'<tr><th></th><th>{label_title}</th>'
        f'<th>{description_title}</th>'
        f'<th class="cioSelect">{file_title}</th>'
        f'<th class="cioSelect">{meta_title}</th></tr>\n'
        '</thead>\n<tbody>\n']

    # Choices shared by all rows
    file_options = [('', ' ')] + list(FILE_RIGHTS_LABELS.items())
    meta_options = [('', ' ')] + list(META_RIGHTS_LABELS.items())

    # One row per group
    for group_id, group in groups.items():
        checkbox = form.custom_checkbox(f'grp:{group_id}')
        label, description = group[0], group[1]
        file_rights = form.select(f'gfil:{group_id}', None, file_options)
        meta_rights = form.select(f'gmta:{group_id}', None, meta_options)
        chunks.append(
            f'<tr><td class="cioCheckbox">{checkbox}</td>'
            f'<th><label for="grp{group_id}">{label}</label></th>'
            f'<td>{description}</td>'
            f'<td class="cioSelect">{file_rights}</td>'
            f'<td class="cioSelect">{meta_rights}</td></tr>\n')

    chunks.append('</tbody>\n</table>\n')
    return Literal(''.join(chunks))
# -------------------------------------------------------------------------
@classmethod
def _user_thead(cls, request: Request, user_paging: Paging) -> str:
    """Return the opening of the user table up to ``<tbody>``.

    Login, first name and last name headers are made sortable through
    ``user_paging.sortable_column``; the other headers are plain
    translated labels.

    :type  request: pyramid.request.Request
    :param request:
        Current request.
    :type  user_paging: chrysalio.lib.paging.Paging
    :param user_paging:
        Paging for users.
    :rtype: str
    """
    translate = request.localizer.translate
    sortable = user_paging.sortable_column

    login = sortable(translate(_('Login')), 'login')
    fname = sortable(translate(_('First name')), 'first_name')
    lname = sortable(translate(_('Last name')), 'last_name')
    file_rights = translate(_('File rights'))
    meta_rights = translate(_('Metadata rights'))
    favorite = translate(_('Favorite'))
    in_menu = translate(_('In menu'))

    cells = (
        f'<th>{login}</th>',
        f'<th class="cioOptional">{fname}</th><th>{lname}</th>',
        f'<th class="cioSelect">{file_rights}</th>',
        f'<th class="cioSelect">{meta_rights}</th>',
        f'<th class="cioBoolean">{favorite}</th>',
        f'<th class="cioBoolean">{in_menu}</th>')
    return (
        '<table class="cioPagingList">\n<thead><tr>'
        + ''.join(cells)
        + '</tr></thead>\n<tbody>\n')
# =============================================================================
[docs]
class DBWarehouseCardField(DBDeclarativeClass):
    """Class to link warehouses with their card fields (many-to-many).

    Each row associates one warehouse with one field; the pair
    ``(warehouse_id, field_id)`` is the primary key of the table.
    """
    # pylint: disable = too-few-public-methods
    __tablename__ = 'wrh2_warehouses_cardfields'
    __table_args__ = {'mysql_engine': 'InnoDB'}
    # Warehouse side of the link; the row is deleted with its warehouse
    # (ON DELETE CASCADE).
    warehouse_id = Column(
        String(ID_LEN),
        ForeignKey('wrh2_warehouses.warehouse_id', ondelete='CASCADE'),
        primary_key=True)
    # Field side of the link; the row is deleted with its field
    # (ON DELETE CASCADE).
    field_id = Column(
        String(ID_LEN),
        ForeignKey('wrh2_fields.field_id', ondelete='CASCADE'),
        primary_key=True)
    # Integer defaulting to 0 -- presumably the rank of the field inside
    # a card; confirm against the code which reads it.
    position = Column(Integer, default=0)
    # String of at most CLASSES_LEN characters -- presumably CSS classes
    # applied to the field in a card; TODO confirm.
    classes = Column(String(CLASSES_LEN))
# =============================================================================
[docs]
class DBWarehouseListField(DBDeclarativeClass):
    """Class to link warehouses with their list fields (many-to-many).

    Each row associates one warehouse with one field; the pair
    ``(warehouse_id, field_id)`` is the primary key of the table.
    """
    # pylint: disable = too-few-public-methods
    __tablename__ = 'wrh2_warehouses_listfields'
    __table_args__ = {'mysql_engine': 'InnoDB'}
    # Warehouse side of the link; the row is deleted with its warehouse
    # (ON DELETE CASCADE).
    warehouse_id = Column(
        String(ID_LEN),
        ForeignKey('wrh2_warehouses.warehouse_id', ondelete='CASCADE'),
        primary_key=True)
    # Field side of the link; the row is deleted with its field
    # (ON DELETE CASCADE).
    field_id = Column(
        String(ID_LEN),
        ForeignKey('wrh2_fields.field_id', ondelete='CASCADE'),
        primary_key=True)
    # Integer defaulting to 0 -- presumably the rank of the field inside
    # a list; confirm against the code which reads it.
    position = Column(Integer, default=0)
    # String of at most CLASSES_LEN characters -- presumably CSS classes
    # applied to the field in a list; TODO confirm.
    classes = Column(String(CLASSES_LEN))
# =============================================================================
# =============================================================================
[docs]
class DBWarehouseSeed(DBDeclarativeClass):
    """Class to link warehouses with their seeds.

    The pair ``(warehouse_id, seed_id)`` is the primary key of the
    table.
    """
    # pylint: disable = too-few-public-methods
    __tablename__ = 'wrh2_warehouses_seeds'
    __table_args__ = {'mysql_engine': 'InnoDB'}
    # Owning warehouse; the row is deleted with its warehouse
    # (ON DELETE CASCADE).
    warehouse_id = Column(
        String(ID_LEN),
        ForeignKey('wrh2_warehouses.warehouse_id', ondelete='CASCADE'),
        primary_key=True,
        nullable=False)
    # Seed identifier. It carries no foreign key, so the database does
    # not constrain it; elsewhere in this file seed IDs are the keys of
    # ``request.registry['seeds']``.
    seed_id = Column(String(ID_LEN), primary_key=True)
# =============================================================================
[docs]
class DBWarehouseJob(DBDeclarativeClass):
    """Class to link warehouses with their jobs (one-to-many).

    Each row associates one warehouse with one job; the pair
    ``(warehouse_id, job_id)`` is the primary key of the table.
    """
    # NOTE(review): with a composite primary key and a foreign key on each
    # side, the schema itself allows a job to be linked to several
    # warehouses -- confirm whether "one-to-many" is still intended.
    # pylint: disable = too-few-public-methods
    __tablename__ = 'wrh2_warehouses_jobs'
    __table_args__ = {'mysql_engine': 'InnoDB'}
    # Warehouse side of the link; the row is deleted with its warehouse
    # (ON DELETE CASCADE).
    warehouse_id = Column(
        String(ID_LEN),
        ForeignKey('wrh2_warehouses.warehouse_id', ondelete='CASCADE'),
        primary_key=True)
    # Job side of the link, referencing the ``srv_jobs`` table; the row is
    # deleted with its job (ON DELETE CASCADE).
    job_id = Column(
        String(ID_LEN),
        ForeignKey('srv_jobs.job_id', ondelete='CASCADE'),
        primary_key=True)
# =============================================================================
[docs]
class DBWarehouseUser(DBDeclarativeClass):
    """Class to link warehouses with their authorized users (many-to-many).

    Each row associates one warehouse with one user and stores the
    rights and the preferences of this user for this warehouse. The pair
    ``(warehouse_id, user_id)`` is the primary key of the table.
    """
    # pylint: disable = too-few-public-methods
    __tablename__ = 'wrh2_warehouses_users'
    __table_args__ = {'mysql_engine': 'InnoDB'}
    # SQLAlchemy mapper option: no warning when a DELETE matches fewer
    # rows than expected -- presumably because rows may already have been
    # removed by the database-level cascades; confirm.
    __mapper_args__ = {'confirm_deleted_rows': False}
    # Warehouse side of the link; the row is deleted with its warehouse
    # (ON DELETE CASCADE).
    warehouse_id = Column(
        String(ID_LEN),
        ForeignKey('wrh2_warehouses.warehouse_id', ondelete='CASCADE'),
        primary_key=True)
    # User side of the link; the row is deleted with its user
    # (ON DELETE CASCADE).
    user_id = Column(
        Integer,
        ForeignKey('users.user_id', ondelete='CASCADE'),
        primary_key=True)
    # Right on files: one of the keys of FILE_RIGHTS_LABELS. There is no
    # default, so the value may be NULL; the edition form of this file
    # checks the user only when this value is not None.
    file_rights = Column(
        Enum(*FILE_RIGHTS_LABELS.keys(), name='file_rights_enum'))
    # Right on metadata: one of the keys of META_RIGHTS_LABELS, possibly
    # NULL (no default).
    meta_rights = Column(
        Enum(*META_RIGHTS_LABELS.keys(), name='meta_rights_enum'))
    # Boolean flags, False by default -- presumably "warehouse is a
    # favorite of the user" and "warehouse appears in the menu of the
    # user"; confirm against the code which reads them.
    favorite = Column(Boolean(name='favorite'), default=False)
    in_menu = Column(Boolean(name='in_menu'), default=False)
# =============================================================================
[docs]
class DBWarehouseGroup(DBDeclarativeClass):
    """Class to link warehouses with their authorized groups (many-to-many).

    Each row associates one warehouse with one group and stores the
    rights of this group on this warehouse. The pair
    ``(warehouse_id, group_id)`` is the primary key of the table.
    """
    # pylint: disable = too-few-public-methods
    __tablename__ = 'wrh2_warehouses_groups'
    __table_args__ = {'mysql_engine': 'InnoDB'}
    # SQLAlchemy mapper option: no warning when a DELETE matches fewer
    # rows than expected -- presumably because rows may already have been
    # removed by the database-level cascades; confirm.
    __mapper_args__ = {'confirm_deleted_rows': False}
    # Warehouse side of the link; the row is deleted with its warehouse
    # (ON DELETE CASCADE).
    warehouse_id = Column(
        String(ID_LEN),
        ForeignKey('wrh2_warehouses.warehouse_id', ondelete='CASCADE'),
        primary_key=True)
    # Group side of the link; the row is deleted with its group
    # (ON DELETE CASCADE).
    group_id = Column(
        String(ID_LEN),
        ForeignKey('groups.group_id', ondelete='CASCADE'),
        primary_key=True)
    # Right on files: one of the keys of FILE_RIGHTS_LABELS, 'reader' by
    # default.
    file_rights = Column(
        Enum(*FILE_RIGHTS_LABELS.keys(), name='file_rights_enum'),
        default='reader')
    # Right on metadata: one of the keys of META_RIGHTS_LABELS, 'reader'
    # by default.
    meta_rights = Column(
        Enum(*META_RIGHTS_LABELS.keys(), name='meta_rights_enum'),
        default='reader')