Source code for ciowarehouse2.models.dbfield

"""SQLAlchemy-powered model definitions for fields."""

from __future__ import annotations
from json import dumps, loads

from sqlalchemy import Column, ForeignKey, String, Text, Enum, Integer
from sqlalchemy.orm import Session, relationship
from lxml import etree

from pyramid.request import Request

from chrysalio.lib.i18n import translate_field, record_format_i18n
from chrysalio.lib.utils import make_id
from chrysalio.lib.xml import XML_NS, i18n_xml_text, db2xml_i18n_labels
from chrysalio.models import DBDeclarativeClass, ID_LEN, VALUE_LEN
from chrysalio.models.dbbase import DBBaseClass
from ..relaxng import RELAXNG_CIOWAREHOUSE2
from ..lib.field import DISPLAYS
from ..lib.i18n import _
from . import CLASSES_LEN


# =============================================================================
[docs] class DBField(DBDeclarativeClass, DBBaseClass): """SQLAlchemy-powered field class.""" suffix = 'ciofld2' __tablename__ = 'wrh2_fields' __table_args__ = {'mysql_engine': 'InnoDB'} __mapper_args__ = {'confirm_deleted_rows': False} field_id = Column(String(ID_LEN), primary_key=True) i18n_label = Column(Text(), nullable=False) display = Column(Enum(*DISPLAYS.keys(), name='display'), nullable=False) position = Column(Integer(), default=0) in_cards = Column(Integer(), default=0) in_list = Column(Integer(), default=0) in_snippets = Column(Integer(), default=0) in_filter = Column(Integer(), default=0) in_meta = Column(Integer(), default=0) classes = Column(String(CLASSES_LEN)) choices = relationship('DBFieldChoice', cascade='all, delete') # -------------------------------------------------------------------------
[docs] @classmethod def xml2db( cls, dbsession: Session, field_elt: etree.Element, error_if_exists: bool = True, kwargs: dict | None = None) -> str | None: """Load a metadata field from a XML element. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: SQLAlchemy session. :type field_elt: lxml.etree.Element :param field_elt: Metadata field XML element. :param bool error_if_exists: (default=True) It returns an error if metadata field already exists. :param dict kwargs: (optional) Dictionary of keyword arguments. :rtype: :class:`pyramid.i18n.TranslationString` or ``None`` :return: Error message or ``None``. """ # pylint: disable = unused-argument # Check if already exists if not field_elt.get('id'): return None field_id = make_id(field_elt.get('id'), 'token', ID_LEN) dbfield = dbsession.query(cls).filter_by(field_id=field_id).first() if dbfield is not None: if error_if_exists: return _('Field "${f}" already exists.', {'f': field_id}) return None # Create field record = cls.record_from_xml(field_id, field_elt) error = cls.record_format(record) if error: return error dbfield = cls(**record) dbsession.add(dbfield) # Add choices dbsession.flush() namespace = RELAXNG_CIOWAREHOUSE2['namespace'] for elt in field_elt.xpath('ns:choice', namespaces={'ns': namespace}): dbfield.choices.append( DBFieldChoice( value=elt.get('value'), i18n_label=dumps( i18n_xml_text(elt, 'ns:label', {'ns': namespace})))) return None
# -------------------------------------------------------------------------
[docs] @classmethod def record_from_xml(cls, field_id: str, field_elt: etree.Element): """Convert a field XML element into a dictionary. :param str field_id: Field ID. :type field_elt: lxml.etree.Element :param field_elt: Field XML element. :rtype: dict """ namespace = RELAXNG_CIOWAREHOUSE2['namespace'] return { # yapf: disable 'field_id': field_id, 'i18n_label': dumps( i18n_xml_text(field_elt, 'ns:label', {'ns': namespace})), 'display': field_elt.get('display'), 'position': int(field_elt.xpath('count(preceding-sibling::*)')), 'in_cards': int(field_elt.get('in-cards', '0')), 'in_list': int(field_elt.get('in-list', '0')), 'in_snippets': int(field_elt.get('in-snippets', '0')), 'in_filter': int(field_elt.get('in-filter', '0')), 'in_meta': int(field_elt.get('in-meta', '0')), 'classes': field_elt.get('classes') }
# -------------------------------------------------------------------------
[docs] @classmethod def record_format(cls, record: dict): """Check and possibly correct a record before inserting it in the database. :param dict record: Dictionary of values to check. :rtype: ``None`` or :class:`pyramid.i18n.TranslationString` :return: ``None`` or error message. """ for k in [i for i in record if record[i] is None]: del record[k] # Meta field ID if not record.get('field_id'): return _('Field without ID.') record['field_id'] = make_id(record['field_id'], 'token', ID_LEN) # Labels if not record_format_i18n(record): return _('Field without label.') # Display mode if not record.get('display'): return _('Field without display mode.') return None
# -------------------------------------------------------------------------
[docs] def db2xml(self, dbsession: Session | None = None) -> etree.Element: """Serialize a metadata field to a XML representation. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: (optional) SQLAlchemy session. :rtype: lxml.etree.Element """ # pylint: disable = unused-argument field_elt = etree.Element('field') field_elt.set('id', self.field_id) field_elt.set('display', self.display) if self.in_cards: field_elt.set('in-cards', str(self.in_cards)) if self.in_list: field_elt.set('in-list', str(self.in_list)) if self.in_snippets: field_elt.set('in-snippets', str(self.in_snippets)) if self.in_filter: field_elt.set('in-filter', str(self.in_filter)) if self.in_meta: field_elt.set('in-meta', str(self.in_meta)) if self.classes: field_elt.set('classes', str(self.classes)) db2xml_i18n_labels(self, field_elt, 6) if self.display in ('list', 'color'): for choice in self.choices: choice_elt = etree.SubElement( field_elt, 'choice', value=choice.value) i18n = loads(choice.i18n_label) for lang in sorted(i18n): elt = etree.SubElement(choice_elt, 'label') elt.set('{0}lang'.format(XML_NS), lang) elt.text = i18n[lang] return field_elt
# =============================================================================
[docs] class DBFieldChoice(DBDeclarativeClass): """SQLAlchemy-powered field choices class (one-to-many).""" __tablename__ = 'wrh2_fields_choices' __table_args__ = {'mysql_engine': 'InnoDB'} field_id = Column( String(ID_LEN), ForeignKey('wrh2_fields.field_id', ondelete='CASCADE'), primary_key=True) value = Column(String(VALUE_LEN), primary_key=True, nullable=False) i18n_label = Column(Text(), nullable=False) # -------------------------------------------------------------------------
[docs] def label(self, request: Request): """Return a translated label. :type request: pyramid.request.Request :param request: Current request. :rtype: str """ return translate_field(request, loads(self.i18n_label)) # type: ignore