Source code for ciowarehouse2.models.dbseed

"""SQLAlchemy-powered model definitions for seeds."""

from __future__ import annotations
from os import makedirs
from os.path import join, exists
from shutil import copy2
from json import dumps

from sqlalchemy import Column, ForeignKey, String, Text
from sqlalchemy.orm import Session, relationship
from lxml import etree
from colander import SchemaNode, Mapping, String as CoString, Regex
from colander import Length, All

from pyramid.request import Request

from chrysalio.lib.i18n import view_i18n_labels, edit_i18n_labels
from chrysalio.lib.i18n import schema_i18n_labels, defaults_i18n_labels
from chrysalio.lib.i18n import record_format_i18n
from chrysalio.lib.utils import make_id
from chrysalio.lib.xml import i18n_xml_text, db2xml_i18n_labels
from chrysalio.lib.form import Form
from chrysalio.lib.attachment import attachment_url
from chrysalio.helpers.builder import Builder
from chrysalio.models import ID_LEN, LABEL_LEN, VALUE_LEN
from chrysalio.models import DBDeclarativeClass
from chrysalio.models.dbbase import DBBaseClass
from ..lib.i18n import _
from ..relaxng import RELAXNG_CIOWAREHOUSE2

SEEDER_LEN = 128


# =============================================================================
class DBSeed(DBDeclarativeClass, DBBaseClass):
    """SQLAlchemy-powered seed class.

    A seed is stored in the ``wrh2_seeds`` table. It references a seeder by
    its ID (looked up at run time in ``request.registry['seeders']``), may
    own two attachments (an icon and a model) and holds a list of
    :class:`DBSeedValue` rows.
    """

    # pylint: disable = too-many-instance-attributes

    suffix = 'ciosed2'
    # Sub-directory of the attachments directory used for seed files.
    attachments_dir = 'Seeds'
    # Labels of the tabs, in the order of the ``tab_index`` argument of
    # tab4view() and tab4edit().
    _settings_tabs = (_('Information'), _('Model'), _('Values'))

    __tablename__ = 'wrh2_seeds'
    __table_args__ = {'mysql_engine': 'InnoDB'}

    seed_id = Column(String(ID_LEN), primary_key=True)
    # JSON dump of the labels (see record_from_xml()).
    i18n_label = Column(Text(), nullable=False)
    attachments_key = Column(String(ID_LEN + 20))
    icon = Column(String(ID_LEN + 4))
    model = Column(String(ID_LEN + 4))
    # ID of the seeder in ``request.registry['seeders']``.
    seeder = Column(String(SEEDER_LEN), nullable=False)

    values = relationship('DBSeedValue', cascade='all, delete')

    # -------------------------------------------------------------------------
    @staticmethod
    def _registered_seeder(request: Request, seeder_id: str | None):
        """Return the seeder registered under ``seeder_id``.

        :type  request: pyramid.request.Request
        :param request:
            Current request.
        :param str seeder_id:
            ID of the seeder to look for.
        :return:
            The object found in ``request.registry['seeders']`` or ``None``
            if the registry has no ``'seeders'`` entry or if the ID is
            unknown.
        """
        if 'seeders' not in request.registry:
            return None
        return request.registry['seeders'].get(seeder_id)

    # -------------------------------------------------------------------------
    @classmethod
    def xml2db(
            cls,
            dbsession: Session,
            seed_elt: etree.Element,
            error_if_exists: bool = True,
            kwargs=None) -> str | None:
        """Load a seed from a XML element.

        :type  dbsession: sqlalchemy.orm.session.Session
        :param dbsession:
            SQLAlchemy session.
        :type  seed_elt: lxml.etree.Element
        :param seed_elt:
            Seed XML element.
        :param bool error_if_exists: (default=True)
            It returns an error if seed already exists.
        :param dict kwargs: (optional)
            Dictionary of keyword arguments. It is not used here.
        :rtype: :class:`pyramid.i18n.TranslationString` or ``None``
        :return:
            Error message or ``None``.
        """
        # pylint: disable = unused-argument
        seed_id = seed_elt.get('id')
        if seed_id is None:
            return None
        seed_id = make_id(seed_id, 'token', ID_LEN)

        # An existing seed is never overwritten.
        dbseed = dbsession.query(cls).filter_by(seed_id=seed_id).first()
        if dbseed is not None:
            if error_if_exists:
                return _('Seed "${s}" already exists.', {'s': seed_id})
            return None

        # Create seed
        record = cls.record_from_xml(seed_id, seed_elt)
        error = cls.record_format(record)
        if error:
            return error
        dbseed = cls(**record)
        dbsession.add(dbseed)

        # Add values
        namespace = RELAXNG_CIOWAREHOUSE2['namespace']
        for elt in seed_elt.findall(
                f'{{{namespace}}}values/{{{namespace}}}value'):
            variable = elt.get('variable')
            if variable is None:
                # A value without a variable name cannot be stored:
                # ``variable`` is part of the primary key.
                continue
            dbseed.values.append(
                DBSeedValue(
                    variable=variable[:ID_LEN],
                    value='' if elt.text is None else elt.text[:VALUE_LEN]))

        return None

    # -------------------------------------------------------------------------
    @classmethod
    def record_from_xml(cls, seed_id: str, seed_elt: etree.Element):
        """Convert a seed XML element into a dictionary.

        :param str seed_id:
            Seed ID.
        :type  seed_elt: lxml.etree.Element
        :param seed_elt:
            Seed XML element.
        :rtype: dict
        :return:
            A dictionary with the keys ``seed_id``, ``i18n_label``,
            ``attachments_key``, ``icon``, ``model`` and ``seeder``. A
            missing piece of information is set to ``None``.
        """
        namespace = RELAXNG_CIOWAREHOUSE2['namespace']

        # Attachments: an empty key or an empty file name becomes None.
        attachments_elt = seed_elt.find(f'{{{namespace}}}attachments')
        if attachments_elt is None:
            attachments_key = icon = model = None
        else:
            attachments_key = attachments_elt.get('key') or None
            icon = attachments_elt.findtext(f'{{{namespace}}}icon') or None
            model = attachments_elt.findtext(f'{{{namespace}}}model') or None

        # Seeder: findtext() returns None when the element is missing so
        # that record_format() can report the problem instead of crashing
        # here.
        seeder = seed_elt.findtext(f'{{{namespace}}}seeder')

        return {  # yapf: disable
            'seed_id': seed_id,
            'i18n_label': dumps(
                i18n_xml_text(seed_elt, 'ns0:label', {'ns0': namespace}),
                ensure_ascii=False),
            'attachments_key': attachments_key,
            'icon': icon,
            'model': model,
            'seeder': None if seeder is None else seeder.strip()
        }

    # -------------------------------------------------------------------------
    @classmethod
    def record_format(cls, record: dict) -> str | None:
        """Check and possibly correct a record before inserting it in the
        database.

        The record is modified in place: entries whose value is ``None`` are
        removed and the seed ID is normalized.

        :param dict record:
            Dictionary of values to check.
        :rtype: ``None`` or :class:`pyramid.i18n.TranslationString`
        :return:
            ``None`` or error message.
        """
        # The list of keys is built first because the dictionary cannot be
        # modified while iterating over it.
        for k in [i for i in record if record[i] is None]:
            del record[k]

        # Seed ID
        if not record.get('seed_id'):
            return _('Seed without ID.')
        record['seed_id'] = make_id(record['seed_id'], 'token', ID_LEN)

        # Labels
        if not record_format_i18n(record):
            return _('Seed without label.')

        # Seeder
        if not record.get('seeder'):
            return _('Seed without seeder.')

        return None

    # -------------------------------------------------------------------------
    def db2xml(self, dbsession: Session | None = None) -> etree.Element:
        """Serialize a seed to a XML representation.

        :type  dbsession: sqlalchemy.orm.session.Session
        :param dbsession: (optional)
            SQLAlchemy session. It is not used here.
        :rtype: lxml.etree.Element
        """
        # pylint: disable = unused-argument
        seed_elt = etree.Element('seed')
        seed_elt.set('id', self.seed_id)

        # Labels
        db2xml_i18n_labels(self, seed_elt, 3)

        # Attachments: only written if there is at least one file
        if self.attachments_key and (self.icon or self.model):
            elt = etree.SubElement(
                seed_elt, 'attachments', key=self.attachments_key)
            if self.icon:
                etree.SubElement(elt, 'icon').text = self.icon
            if self.model:
                etree.SubElement(elt, 'model').text = self.model

        # Seeder
        elt = etree.SubElement(seed_elt, 'seeder')
        elt.text = self.seeder

        # Values
        if self.values:
            elt = etree.SubElement(seed_elt, 'values')
            for dbitem in self.values:
                etree.SubElement(
                    elt, 'value', variable=dbitem.variable).text = dbitem.value

        return seed_elt

    # -------------------------------------------------------------------------
    def attachments2directory(self, attachments: str, directory: str):
        """Copy from attachments directory the files corresponding to the
        seed.

        See: :meth:`chrysalio.models.dbbase.DBBaseClass.attachments2directory`

        :param str attachments:
            Absolute path to the attachments directory.
        :param str directory:
            Path to the directory which receives the copy. The files are
            copied into ``<directory>/<attachments_dir>/<attachments_key>``.
        """
        if not self.attachments_key or (not self.icon and not self.model):
            return

        # Keep only the declared files which really exist on disk.
        source = join(attachments, self.attachments_dir, self.attachments_key)
        files = [join(source, k) for k in (self.icon, self.model) if k]
        files = [k for k in files if exists(k)]
        if not files:
            return

        target = join(directory, self.attachments_dir, self.attachments_key)
        # exist_ok avoids the race between a test of existence and the
        # creation of the directory.
        makedirs(target, exist_ok=True)
        for filename in files:
            copy2(filename, target)

    # -------------------------------------------------------------------------
    def tab4view(self, request: Request, tab_index: int, form: Form) -> str:
        """Generate the tab content of a seed.

        :type  request: pyramid.request.Request
        :param request:
            Current request.
        :param int tab_index:
            Index of the tab: 0 for information, 1 for model and 2 for
            values.
        :type  form: .lib.form.Form
        :param form:
            Current form object.
        :rtype: chrysalio.helpers.literal.Literal
        :return:
            Content of the tab or an empty string for an unknown index.
        """
        if tab_index == 0:
            return self._tab4view_information(request, form)
        if tab_index == 1:
            return self._tab4view_model(request, form)
        if tab_index == 2:
            return self._tab4view_values(request, form)
        return ''

    # -------------------------------------------------------------------------
    def _tab4view_information(self, request: Request, form: Form) -> str:
        """Generate the information tab.

        :type  request: pyramid.request.Request
        :param request:
            Current request.
        :type  form: .lib.form.Form
        :param form:
            Current form object.
        :rtype: chrysalio.helpers.literal.Literal
        """
        translate = request.localizer.translate
        seeder = self._registered_seeder(request, self.seeder)
        label = seeder.label if seeder is not None else _('Unknown seeder')

        html = form.grid_item(
            translate(_('Identifier:')), self.seed_id, clear=True)
        html += form.grid_item(
            translate(_('Seeder:')),
            translate(label),
            title=self.seeder,
            clear=True)
        html += view_i18n_labels(request, form, self, with_description=False)
        return html

    # -------------------------------------------------------------------------
    def _tab4view_model(self, request: Request, form: Form) -> str:
        """Generate the model tab.

        :type  request: pyramid.request.Request
        :param request:
            Current request.
        :type  form: .lib.form.Form
        :param form:
            Current form object.
        :rtype: chrysalio.helpers.literal.Literal
        :return:
            A link to the model or a message if the seeder is unknown, if
            it does not need a model or if no model URL is available.
        """
        seeder = self._registered_seeder(request, self.seeder)
        if seeder is None or not seeder.need_model:
            return _('This seed does not define model.')

        model_url = attachment_url(
            request, self.attachments_dir, self.attachments_key, self.model)
        if model_url is None:
            return _('No model defined.')

        translate = request.localizer.translate
        builder = Builder()
        download = builder.a(
            href=model_url,
            title=translate(_('Open the model in new tab')),
            class_='cioIconButton cioButtonExternal',
            target='blank')
        return form.grid_item(translate(_('Model:')), download, clear=True)

    # -------------------------------------------------------------------------
    def _tab4view_values(self, request: Request, form: Form) -> str:
        """Generate the values tab.

        :type  request: pyramid.request.Request
        :param request:
            Current request.
        :type  form: .lib.form.Form
        :param form:
            Current form object.
        :rtype: chrysalio.helpers.literal.Literal
        :return:
            What the seeder renders for the values or a message if it
            renders nothing or is unknown.
        """
        values = {k.variable: k.value for k in self.values}
        seeder = self._registered_seeder(request, self.seeder)
        html = '' if seeder is None else seeder.values_tabview(
            request, form, values)
        if not html:
            html = _('This seed does not define variables.')
        return html

    # -------------------------------------------------------------------------
    @classmethod
    def settings_schema(
            cls,
            request: Request,
            defaults: dict,
            dbseed: DBSeed | None = None) -> tuple[SchemaNode, dict]:
        """Return a Colander schema to edit a seed.

        :type  request: pyramid.request.Request
        :param request:
            Current request.
        :param dict defaults:
            Default values for the form. It is updated in place.
        :type  dbseed: DBSeed
        :param dbseed: (optional)
            Current seed SqlAlchemy object, ``None`` for a creation.
        :rtype: tuple
        :return:
            A tuple such as ``(schema, defaults)``.
        """
        # Information
        schema = SchemaNode(Mapping())
        if dbseed is None:
            # ID and seeder are only editable during the creation (see
            # _tab4edit_information()).
            schema.add(SchemaNode(  # yapf: disable
                CoString(), name='seed_id', validator=All(
                    Regex(r'^[a-z0-9_-]+$'), Length(min=2, max=ID_LEN))))
            schema.add(SchemaNode(  # yapf: disable
                CoString(), name='seeder',
                validator=Length(max=SEEDER_LEN)))
        schema_i18n_labels(request, schema, LABEL_LEN)

        # Defaults
        if dbseed is None:
            defaults.update({'seeder': 'file'})
        else:
            defaults.update(
                defaults_i18n_labels(dbseed, with_description=False))
            # The seeder completes the schema and the defaults with its own
            # values. This needs an existing seed: ``dbseed`` is never
            # dereferenced when it is None.
            seeder = cls._registered_seeder(request, dbseed.seeder)
            if seeder is not None:
                seeder.values_schema(schema, defaults, dbseed)

        return schema, defaults

    # -------------------------------------------------------------------------
    @classmethod
    def tab4edit(
            cls,
            request: Request,
            tab_index: int,
            form: Form,
            dbseed: DBSeed | None = None) -> str:
        """Generate the tab content of a seed for edition.

        :type  request: pyramid.request.Request
        :param request:
            Current request.
        :param int tab_index:
            Index of the tab: 0 for information, 1 for model and 2 for
            values.
        :type  form: .lib.form.Form
        :param form:
            Current form object.
        :type  dbseed: DBSeed
        :param dbseed: (optional)
            Current seed SqlAlchemy object, ``None`` for a creation.
        :rtype: chrysalio.helpers.literal.Literal
        :return:
            Content of the tab or an empty string for an unknown index.
        """
        if tab_index == 0:
            return cls._tab4edit_information(request, form, dbseed)
        if tab_index == 1:
            return cls._tab4edit_model(request, form, dbseed)
        if tab_index == 2:
            return cls._tab4edit_values(request, form, dbseed)
        return ''

    # -------------------------------------------------------------------------
    @classmethod
    def _tab4edit_information(
            cls,
            request: Request,
            form: Form,
            dbseed: DBSeed | None = None) -> str:
        """Generate the information tab for edition.

        During a creation, the ID and the seeder are editable. For an
        existing seed, they are only displayed.

        :type  request: pyramid.request.Request
        :param request:
            Current request.
        :type  form: .lib.form.Form
        :param form:
            Current form object.
        :type  dbseed: DBSeed
        :param dbseed:
            Current seed SqlAlchemy object, ``None`` for a creation.
        :rtype: chrysalio.helpers.literal.Literal
        """
        translate = request.localizer.translate

        if dbseed is None:
            seeders = request.registry.get('seeders', {})
            seeder_options = [(k, seeders[k].label) for k in seeders]
            html = form.grid_text(
                'seed_id',
                translate(_('Identifier:')),
                required=True,
                maxlength=ID_LEN,
                clear=True)
            html += form.grid_select(
                'seeder',
                translate(_('Seeder:')),
                [('', ' ')] + seeder_options,
                required=True,
                clear=True)
        else:
            seeder = cls._registered_seeder(request, dbseed.seeder)
            html = form.grid_item(
                translate(_('Identifier:')), dbseed.seed_id, clear=True)
            html += form.grid_item(
                translate(_('Seeder:')),
                translate(
                    seeder.label
                    if seeder is not None else _('Unknown seeder')),
                title=dbseed.seeder,
                clear=True)

        html += edit_i18n_labels(request, form, LABEL_LEN)
        return html

    # -------------------------------------------------------------------------
    @classmethod
    def _tab4edit_model(
            cls, request: Request, form: Form, dbseed: DBSeed | None) -> str:
        """Generate the model tab for edition.

        :type  request: pyramid.request.Request
        :param request:
            Current request.
        :type  form: chrysalio.lib.form.Form
        :param form:
            Current form object.
        :type  dbseed: DBSeed
        :param dbseed:
            Current seed SqlAlchemy object, ``None`` for a creation.
        :rtype: chrysalio.helpers.literal.Literal
        :return:
            An upload button, preceded by a link to the current model if
            any, or a message if no model can be set.
        """
        if dbseed is None:
            return _('Create the seed before setting model.')

        seeder = cls._registered_seeder(request, dbseed.seeder)
        if seeder is None or not seeder.need_model:
            return _('This seed does not define model.')

        translate = request.localizer.translate
        model_url = attachment_url(
            request, dbseed.attachments_dir, dbseed.attachments_key,
            dbseed.model)

        # Hidden file input followed by the button which submits it
        upload_input = form.upload('model', class_='cioHidden')
        upload_button = form.submit(
            'mdl?.x',
            translate(_('Upload model')),
            class_='cioInputFile cioButton cioButtonMini')
        upload = f'{upload_input} {upload_button}'
        if model_url is None:
            return form.grid_item(translate(_('Model:')), upload, clear=True)

        builder = Builder()
        download = builder.a(
            href=model_url,
            title=translate(_('Open the model in new tab')),
            class_="cioIconButton cioButtonExternal",
            target='blank')
        return form.grid_item(
            translate(_('Model:')), f'{download} {upload}', clear=True)

    # -------------------------------------------------------------------------
    @classmethod
    def _tab4edit_values(
            cls, request: Request, form: Form, dbseed: DBSeed | None) -> str:
        """Generate the values tab for edition.

        :type  request: pyramid.request.Request
        :param request:
            Current request.
        :type  form: chrysalio.lib.form.Form
        :param form:
            Current form object.
        :type  dbseed: DBSeed
        :param dbseed:
            Current seed SqlAlchemy object, ``None`` for a creation.
        :rtype: chrysalio.helpers.literal.Literal
        :return:
            What the seeder renders to edit the values, an empty string if
            the seeder is unknown or a message during a creation.
        """
        if dbseed is None:
            return _('Create the seed before setting variables.')

        seeder = cls._registered_seeder(request, dbseed.seeder)
        html = '' if seeder is None else seeder.values_tabedit(request, form)
        return html
# =============================================================================
class DBSeedValue(DBDeclarativeClass):
    """Value of one variable of a seed (one-to-many link with seeds).

    Rows are stored in the ``wrh2_seeds_values`` table. The primary key is
    made of the couple ``(seed_id, variable)``.
    """

    # pylint: disable = too-few-public-methods

    __tablename__ = 'wrh2_seeds_values'
    __table_args__ = {'mysql_engine': 'InnoDB'}

    # Owner of the value: reference to ``wrh2_seeds.seed_id`` declared with
    # ON DELETE CASCADE.
    seed_id = Column(
        String(ID_LEN),
        ForeignKey('wrh2_seeds.seed_id', ondelete='CASCADE'),
        primary_key=True)
    # Name of the variable
    variable = Column(String(ID_LEN), primary_key=True)
    # Value of the variable (at most VALUE_LEN characters)
    value = Column(String(VALUE_LEN))