"""SQLAlchemy-powered model definitions for seeds."""
from __future__ import annotations
from os import makedirs
from os.path import join, exists
from shutil import copy2
from json import dumps
from sqlalchemy import Column, ForeignKey, String, Text
from sqlalchemy.orm import Session, relationship
from lxml import etree
from colander import SchemaNode, Mapping, String as CoString, Regex
from colander import Length, All
from pyramid.request import Request
from chrysalio.lib.i18n import view_i18n_labels, edit_i18n_labels
from chrysalio.lib.i18n import schema_i18n_labels, defaults_i18n_labels
from chrysalio.lib.i18n import record_format_i18n
from chrysalio.lib.utils import make_id
from chrysalio.lib.xml import i18n_xml_text, db2xml_i18n_labels
from chrysalio.lib.form import Form
from chrysalio.lib.attachment import attachment_url
from chrysalio.helpers.builder import Builder
from chrysalio.models import ID_LEN, LABEL_LEN, VALUE_LEN
from chrysalio.models import DBDeclarativeClass
from chrysalio.models.dbbase import DBBaseClass
from ..lib.i18n import _
from ..relaxng import RELAXNG_CIOWAREHOUSE2
# Maximum length of a seeder ID: it sizes the ``seeder`` column of the seed
# table and bounds the ``seeder`` field of the edition schema.
SEEDER_LEN = 128
# =============================================================================
[docs]
class DBSeed(DBDeclarativeClass, DBBaseClass):
"""SQLAlchemy-powered seed class.

A seed is stored in the ``wrh2_seeds`` table. It references a seeder by its
ID, may own attachment files (an icon and a model) and owns a list of
:class:`DBSeedValue` rows holding the values of its variables.
"""
# pylint: disable = too-many-instance-attributes
# NOTE(review): presumably the suffix of seed files -- confirm with callers
suffix = 'ciosed2'
# Name of the sub-directory of the attachments root which contains the
# attachment directories of the seeds
attachments_dir = 'Seeds'
# Labels of the settings tabs; ``tab4view`` and ``tab4edit`` handle the tab
# indexes 0 (information), 1 (model) and 2 (values)
_settings_tabs = (_('Information'), _('Model'), _('Values'))
__tablename__ = 'wrh2_seeds'
__table_args__ = {'mysql_engine': 'InnoDB'}
# ID of the seed (primary key)
seed_id = Column(String(ID_LEN), primary_key=True)
# Labels of the seed, stored as a JSON string
i18n_label = Column(Text(), nullable=False)
# Name of the directory, inside ``attachments_dir``, containing the files of
# this seed
attachments_key = Column(String(ID_LEN + 20))
# Names of the icon and model files inside the attachment directory
icon = Column(String(ID_LEN + 4))
model = Column(String(ID_LEN + 4))
# ID of the seeder, looked up in ``request.registry['seeders']``
seeder = Column(String(SEEDER_LEN), nullable=False)
# Values of the variables of the seed; they are deleted with the seed
values = relationship('DBSeedValue', cascade='all, delete')
# -------------------------------------------------------------------------
[docs]
@classmethod
def xml2db(
        cls,
        dbsession: Session,
        seed_elt: etree.Element,
        error_if_exists: bool = True,
        kwargs=None) -> str | None:
    """Create a seed, with its values, from a XML element.

    Nothing is created when the element has no ``id`` attribute or when a
    seed with the same ID is already stored.

    :type dbsession: sqlalchemy.orm.session.Session
    :param dbsession:
        SQLAlchemy session.
    :type seed_elt: lxml.etree.Element
    :param seed_elt:
        Seed XML element.
    :param bool error_if_exists: (default=True)
        If ``True``, an already existing seed is reported as an error,
        otherwise it is silently skipped.
    :param dict kwargs: (optional)
        Dictionary of keyword arguments (not used by this method).
    :rtype: :class:`pyramid.i18n.TranslationString` or ``None``
    :return:
        Error message or ``None``.
    """
    # pylint: disable = unused-argument
    raw_id = seed_elt.get('id')
    if raw_id is None:
        return None
    seed_id = make_id(raw_id, 'token', ID_LEN)

    # A seed which is already stored is never overwritten
    already_stored = dbsession.query(cls).filter_by(
        seed_id=seed_id).first() is not None
    if already_stored and error_if_exists:
        return _('Seed "${s}" already exists.', {'s': seed_id})
    if already_stored:
        return None

    # Create seed
    record = cls.record_from_xml(seed_id, seed_elt)
    error = cls.record_format(record)
    if error:
        return error
    dbseed = cls(**record)
    dbsession.add(dbseed)

    # Add values, truncated to the size of their columns
    namespace = RELAXNG_CIOWAREHOUSE2['namespace']
    value_elts = seed_elt.findall(
        f'{{{namespace}}}values/{{{namespace}}}value')
    for value_elt in value_elts:
        text = value_elt.text
        dbseed.values.append(
            DBSeedValue(
                variable=value_elt.get('variable')[:ID_LEN],
                value='' if text is None else text[:VALUE_LEN]))
    return None
# -------------------------------------------------------------------------
[docs]
@classmethod
def record_from_xml(cls, seed_id: str, seed_elt: etree.Element):
    """Build the dictionary of the fields of a seed from a XML element.

    The ``seeder`` child element is expected to be present and to contain
    text: its text is read without any check.

    :param str seed_id:
        Seed ID.
    :type seed_elt: lxml.etree.Element
    :param seed_elt:
        Seed XML element.
    :rtype: dict
    :return:
        A dictionary with the keys ``seed_id``, ``i18n_label``,
        ``attachments_key``, ``icon``, ``model`` and ``seeder``.
    """
    namespace = RELAXNG_CIOWAREHOUSE2['namespace']
    attachments_elt = seed_elt.find(f'{{{namespace}}}attachments')
    seeder_elt = seed_elt.find(f'{{{namespace}}}seeder')

    # Attachment fields are ``None`` when the <attachments> element or the
    # corresponding value is missing or empty
    attachments_key = icon = model = None
    if attachments_elt is not None:
        attachments_key = attachments_elt.get('key') or None
        icon = attachments_elt.findtext(f'{{{namespace}}}icon') or None
        model = attachments_elt.findtext(f'{{{namespace}}}model') or None

    # Labels are stored as a JSON string
    labels = i18n_xml_text(seed_elt, 'ns0:label', {'ns0': namespace})

    return {
        'seed_id': seed_id,
        'i18n_label': dumps(labels, ensure_ascii=False),
        'attachments_key': attachments_key,
        'icon': icon,
        'model': model,
        'seeder': seeder_elt.text.strip()
    }
# -------------------------------------------------------------------------
# -------------------------------------------------------------------------
[docs]
def db2xml(self, dbsession: Session | None = None) -> etree.Element:
    """Build the XML representation of the seed.

    The ``<attachments>`` element is only written when the seed has an
    attachment key and at least an icon or a model; the ``<values>``
    element is only written when the seed has values.

    :type dbsession: sqlalchemy.orm.session.Session
    :param dbsession: (optional)
        SQLAlchemy session (not used by this method).
    :rtype: lxml.etree.Element
    """
    # pylint: disable = unused-argument
    root = etree.Element('seed')
    root.set('id', self.seed_id)

    # Labels
    db2xml_i18n_labels(self, root, 3)

    # Attachments
    if self.attachments_key and (self.icon or self.model):
        attachments_elt = etree.SubElement(
            root, 'attachments', key=self.attachments_key)
        for tag in ('icon', 'model'):
            filename = getattr(self, tag)
            if filename:
                etree.SubElement(attachments_elt, tag).text = filename

    # Seeder
    etree.SubElement(root, 'seeder').text = self.seeder

    # Values
    if self.values:
        values_elt = etree.SubElement(root, 'values')
        for dbvalue in self.values:
            value_elt = etree.SubElement(
                values_elt, 'value', variable=dbvalue.variable)
            value_elt.text = dbvalue.value

    return root
# -------------------------------------------------------------------------
[docs]
def attachments2directory(self, attachments: str, directory: str):
    """Copy the attachment files of the seed into a directory.

    The icon and the model are read from
    ``<attachments>/<attachments_dir>/<attachments_key>`` and copied into
    ``<directory>/<attachments_dir>/<attachments_key>``. Only the files
    which exist are copied and the target directory is created only if
    there is at least one file to copy.

    See: :meth:`chrysalio.models.dbbase.DBBaseClass.attachments2directory`

    :param str attachments:
        Path to the attachments directory.
    :param str directory:
        Path to the directory which receives the copy.
    """
    if not self.attachments_key:
        return

    # Keep only the declared files which are actually on disk
    source = join(attachments, self.attachments_dir, self.attachments_key)
    files = [
        path for path in (
            join(source, name) for name in (self.icon, self.model) if name)
        if exists(path)]
    if not files:
        return

    # ``exist_ok`` avoids the race between a separate existence test and the
    # creation of the directory
    target = join(directory, self.attachments_dir, self.attachments_key)
    makedirs(target, exist_ok=True)
    for path in files:
        copy2(path, target)
# -------------------------------------------------------------------------
[docs]
def tab4view(self, request: Request, tab_index: int, form: Form) -> str:
    """Return the content of one of the tabs of the seed view.

    :type request: pyramid.request.Request
    :param request:
        Current request.
    :param int tab_index:
        Index of the tab: 0 for information, 1 for model and 2 for
        values. Any other index gives an empty string.
    :type form: .lib.form.Form
    :param form:
        Current form object.
    :rtype: chrysalio.helpers.literal.Literal
    """
    renderers = {
        0: self._tab4view_information,
        1: self._tab4view_model,
        2: self._tab4view_values,
    }
    render = renderers.get(tab_index)
    return '' if render is None else render(request, form)
# -------------------------------------------------------------------------
def _tab4view_information(self, request: Request, form: Form) -> str:
    """Return the content of the information tab of the seed view.

    It shows the ID of the seed, the label of its seeder (or a default
    text when the seeder is not registered) and the labels of the seed.

    :type request: pyramid.request.Request
    :param request:
        Current request.
    :type form: .lib.form.Form
    :param form:
        Current form object.
    :rtype: chrysalio.helpers.literal.Literal
    """
    translate = request.localizer.translate
    registry = request.registry
    seeder = None
    if 'seeders' in registry:
        seeder = registry['seeders'].get(self.seeder)
    seeder_label = _('Unknown seeder') if seeder is None else seeder.label

    identifier_row = form.grid_item(
        translate(_('Identifier:')), self.seed_id, clear=True)
    seeder_row = form.grid_item(
        translate(_('Seeder:')),
        translate(seeder_label),
        title=self.seeder,
        clear=True)
    labels = view_i18n_labels(request, form, self, with_description=False)
    return identifier_row + seeder_row + labels
# -------------------------------------------------------------------------
def _tab4view_model(self, request: Request, form: Form) -> str:
    """Return the content of the model tab of the seed view.

    A message is returned instead of a link when the seeder is unknown or
    does not need a model, or when the seed has no model URL.

    :type request: pyramid.request.Request
    :param request:
        Current request.
    :type form: .lib.form.Form
    :param form:
        Current form object.
    :rtype: chrysalio.helpers.literal.Literal
    """
    registry = request.registry
    seeder = None
    if 'seeders' in registry:
        seeder = registry['seeders'].get(self.seeder)
    if not (seeder is not None and seeder.need_model):
        return _('This seed does not define model.')

    model_url = attachment_url(
        request, self.attachments_dir, self.attachments_key, self.model)
    if model_url is None:
        return _('No model defined.')

    translate = request.localizer.translate
    link = Builder().a(
        href=model_url,
        title=translate(_('Open the model in new tab')),
        class_='cioIconButton cioButtonExternal',
        target='blank')
    return form.grid_item(translate(_('Model:')), link, clear=True)
# -------------------------------------------------------------------------
def _tab4view_values(self, request: Request, form: Form) -> str:
    """Return the content of the values tab of the seed view.

    The rendering is delegated to the seeder of the seed; a message is
    returned when the seeder is unknown or renders nothing.

    :type request: pyramid.request.Request
    :param request:
        Current request.
    :type form: .lib.form.Form
    :param form:
        Current form object.
    :rtype: chrysalio.helpers.literal.Literal
    """
    # Values of the seed indexed by variable name
    values = dict(
        (dbvalue.variable, dbvalue.value) for dbvalue in self.values)

    registry = request.registry
    seeder = registry['seeders'].get(self.seeder) \
        if 'seeders' in registry else None
    if seeder is not None:
        html = seeder.values_tabview(request, form, values)
        if html:
            return html
    return _('This seed does not define variables.')
# -------------------------------------------------------------------------
[docs]
@classmethod
def settings_schema(
        cls,
        request: Request,
        defaults: dict,
        dbseed: DBSeed | None = None) -> tuple[SchemaNode, dict]:
    """Return the Colander schema and the default values to edit a seed.

    For a creation (``dbseed`` is ``None``), the schema contains the ID
    and the seeder of the seed. For an existing seed, the seeder of the
    seed, if it is registered, completes the schema and the defaults.

    :type request: pyramid.request.Request
    :param request:
        Current request.
    :param dict defaults:
        Default values for the form; this dictionary is updated in place.
    :type dbseed: DBSeed
    :param dbseed: (optional)
        Current seed SqlAlchemy object.
    :rtype: tuple
    :return:
        A tuple such as ``(schema, defaults)``.
    """
    creating = dbseed is None

    # Schema
    schema = SchemaNode(Mapping())
    if creating:
        id_validator = All(
            Regex(r'^[a-z0-9_-]+$'), Length(min=2, max=ID_LEN))
        schema.add(
            SchemaNode(CoString(), name='seed_id', validator=id_validator))
        schema.add(
            SchemaNode(
                CoString(),
                name='seeder',
                validator=Length(max=SEEDER_LEN)))
    schema_i18n_labels(request, schema, LABEL_LEN)

    # Defaults
    if creating:
        defaults['seeder'] = 'file'
        return schema, defaults

    defaults.update(defaults_i18n_labels(dbseed, with_description=False))
    registry = request.registry
    seeder = None
    if 'seeders' in registry:
        seeder = registry['seeders'].get(dbseed.seeder)
    if seeder is not None:
        seeder.values_schema(schema, defaults, dbseed)
    return schema, defaults
# -------------------------------------------------------------------------
[docs]
@classmethod
def tab4edit(
        cls,
        request: Request,
        tab_index: int,
        form: Form,
        dbseed: DBSeed | None = None) -> str:
    """Return the content of one of the tabs of the seed edition.

    :type request: pyramid.request.Request
    :param request:
        Current request.
    :param int tab_index:
        Index of the tab: 0 for information, 1 for model and 2 for
        values. Any other index gives an empty string.
    :type form: .lib.form.Form
    :param form:
        Current form object.
    :type dbseed: DBSeed
    :param dbseed: (optional)
        Current seed SqlAlchemy object.
    :rtype: chrysalio.helpers.literal.Literal
    """
    renderers = {
        0: cls._tab4edit_information,
        1: cls._tab4edit_model,
        2: cls._tab4edit_values,
    }
    render = renderers.get(tab_index)
    return '' if render is None else render(request, form, dbseed)
# -------------------------------------------------------------------------
@classmethod
def _tab4edit_information(
        cls,
        request: Request,
        form: Form,
        dbseed: DBSeed | None = None) -> str:
    """Return the content of the information tab of the seed edition.

    For a creation (``dbseed`` is ``None``), the ID and the seeder are
    editable; for an existing seed, they are only displayed. In both
    cases, the labels are editable.

    :type request: pyramid.request.Request
    :param request:
        Current request.
    :type form: .lib.form.Form
    :param form:
        Current form object.
    :type dbseed: DBSeed
    :param dbseed: (optional)
        Current seed SqlAlchemy object.
    :rtype: chrysalio.helpers.literal.Literal
    """
    translate = request.localizer.translate
    registry = request.registry

    if dbseed is None:
        # Creation: the seeder is chosen among the registered ones
        seeder_labels = {
            seeder_id: registry['seeders'][seeder_id].label
            for seeder_id in registry.get('seeders', '')}
        options = [('', ' ')]
        options.extend(seeder_labels.items())
        html = form.grid_text(
            'seed_id',
            translate(_('Identifier:')),
            required=True,
            maxlength=ID_LEN,
            clear=True)
        html += form.grid_select(
            'seeder',
            translate(_('Seeder:')),
            options,
            required=True,
            clear=True)
    else:
        # Edition: the ID and the seeder cannot be changed
        seeder = None
        if 'seeders' in registry:
            seeder = registry['seeders'].get(dbseed.seeder)
        seeder_label = \
            _('Unknown seeder') if seeder is None else seeder.label
        html = form.grid_item(
            translate(_('Identifier:')), dbseed.seed_id, clear=True)
        html += form.grid_item(
            translate(_('Seeder:')),
            translate(seeder_label),
            title=dbseed.seeder,
            clear=True)

    html += edit_i18n_labels(request, form, LABEL_LEN)
    return html
# -------------------------------------------------------------------------
@classmethod
def _tab4edit_model(
        cls, request: Request, form: Form, dbseed: DBSeed | None) -> str:
    """Return the content of the model tab of the seed edition.

    A message is returned when the seed is not yet created or when its
    seeder is unknown or does not need a model. Otherwise, the tab
    contains an upload button, preceded by a link to the current model
    when there is one.

    :type request: pyramid.request.Request
    :param request:
        Current request.
    :type form: chrysalio.lib.form.Form
    :param form:
        Current form object.
    :type dbseed: DBSeed
    :param dbseed:
        Current seed SqlAlchemy object.
    :rtype: chrysalio.helpers.literal.Literal
    """
    if dbseed is None:
        return _('Create the seed before setting model.')

    registry = request.registry
    seeder = None
    if 'seeders' in registry:
        seeder = registry['seeders'].get(dbseed.seeder)
    if seeder is None or not seeder.need_model:
        return _('This seed does not define model.')

    translate = request.localizer.translate
    model_url = attachment_url(
        request, dbseed.attachments_dir, dbseed.attachments_key,
        dbseed.model)

    # Upload widgets: a hidden file input and its button
    upload_input = form.upload('model', class_='cioHidden')
    upload_button = form.submit(
        'mdl?.x',
        translate(_('Upload model')),
        class_='cioInputFile cioButton cioButtonMini')
    widgets = f'{upload_input} {upload_button}'

    # Link to the current model, if any
    if model_url is not None:
        link = Builder().a(
            href=model_url,
            title=translate(_('Open the model in new tab')),
            class_="cioIconButton cioButtonExternal",
            target='blank')
        widgets = f'{link} {widgets}'

    return form.grid_item(translate(_('Model:')), widgets, clear=True)
# -------------------------------------------------------------------------
@classmethod
def _tab4edit_values(
        cls, request: Request, form: Form, dbseed: DBSeed | None) -> str:
    """Return the content of the values tab of the seed edition.

    The rendering is delegated to the seeder of the seed. A message is
    returned when the seed is not yet created and an empty string when
    its seeder is not registered.

    :type request: pyramid.request.Request
    :param request:
        Current request.
    :type form: chrysalio.lib.form.Form
    :param form:
        Current form object.
    :type dbseed: DBSeed
    :param dbseed:
        Current seed SqlAlchemy object.
    :rtype: chrysalio.helpers.literal.Literal
    """
    if dbseed is None:
        return _('Create the seed before setting variables.')

    registry = request.registry
    seeder = None
    if 'seeders' in registry:
        seeder = registry['seeders'].get(dbseed.seeder)
    if seeder is None:
        return ''
    return seeder.values_tabedit(request, form)
# =============================================================================
[docs]
class DBSeedValue(DBDeclarativeClass):
"""Class to link seeds with their values (one-to-many).

Each row of the ``wrh2_seeds_values`` table holds the value of one variable
of one seed; the primary key is the pair ``(seed_id, variable)``.
"""
# pylint: disable = too-few-public-methods
__tablename__ = 'wrh2_seeds_values'
__table_args__ = {'mysql_engine': 'InnoDB'}
# ID of the owning seed; the foreign key is declared with ON DELETE CASCADE
seed_id = Column(
String(ID_LEN),
ForeignKey('wrh2_seeds.seed_id', ondelete='CASCADE'),
primary_key=True)
# Name of the variable
variable = Column(String(ID_LEN), primary_key=True)
# Value of the variable (at most VALUE_LEN characters)
value = Column(String(VALUE_LEN))