Source code for ciowarehouse2.models.dbinput

"""SQLAlchemy-powered model definitions for input streams (file, FTP or email)
and input rules."""

from __future__ import annotations

from sqlalchemy import Column, ForeignKey, Enum, String, Integer, Boolean, Text
from sqlalchemy.orm import Session, relationship
from lxml import etree

from chrysalio.lib.utils import make_id, tostr, encrypt
from chrysalio.models import ID_LEN, EMAIL_LEN, VALUE_LEN, DBDeclarativeClass
from . import URL_LEN, PATH_LEN
from ..relaxng import RELAXNG_CIOWAREHOUSE2
from ..lib.i18n import _

VARIABLE_TRANSFORM_LABELS = {
    'capitalize': _('Capitalize'),
    'lower': _('Lower'),
    'upper': _('Upper'),
    'replace': _('Replace')
}
DEFAULT_ARCHIVE_TTL = 720  # in hours = 30 days


# =============================================================================
[docs] class DBInputStream(DBDeclarativeClass): """SQLAlchemy-powered stream class.""" suffix = 'cioinp2' __tablename__ = 'wrh2_input_streams' __table_args__ = {'mysql_engine': 'InnoDB'} stream_id = Column(String(EMAIL_LEN), primary_key=True) stream_type = Column(String(ID_LEN), nullable=False) host = Column(String(URL_LEN), nullable=False) path = Column(String(PATH_LEN)) port = Column(Integer) ssl = Column(Boolean(name='ssl'), default=False) user = Column(String(ID_LEN)) password = Column(String(128)) archive_id = Column(String(ID_LEN), nullable=False) archive_path = Column(String(255)) archive_ttl = Column(Integer) # -------------------------------------------------------------------------
[docs] @classmethod def xml2db( cls, dbsession: Session, stream_elt: etree.Element, error_if_exists: bool = True, kwargs=None) -> str | None: """Load an stream description from a XML element. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: SQLAlchemy session. :type stream_elt: lxml.etree.Element :param stream_elt: Stream XML element. :param bool error_if_exists: (default=True) It returns an error if stream already exists. :param dict kwargs: (optional) Dictionary of keyword arguments. :rtype: :class:`pyramid.i18n.TranslationString` or ``None`` :return: Error message or ``None``. """ # pylint: disable = unused-argument # Check if already exists stream_id = make_id(stream_elt.get('id'), 'token', EMAIL_LEN) dbstream = dbsession.query(cls).filter_by(stream_id=stream_id).first() if dbstream is not None: if error_if_exists: return _('Stream "${s}" already exists.', {'s': stream_id}) return None # Create stream record = cls.record_from_xml(stream_id, stream_elt) error = cls.record_format(record) if error: return error dbsession.add(cls(**record)) return None
# -------------------------------------------------------------------------
[docs] @classmethod def record_from_xml( cls, stream_id: str, stream_elt: etree.Element) -> dict: """Convert a stream XML element into a dictionary. :param str stream_id: Stream ID (identifier or mail address). :type stream_elt: lxml.etree.Element Stream XML element. :rtype: dict """ namespace = RELAXNG_CIOWAREHOUSE2['namespace'] archive_elt = stream_elt.find('{{{0}}}archive'.format(namespace)) return { 'stream_id': stream_id, 'stream_type': stream_elt.get('type'), 'host': stream_elt.findtext('{{{0}}}host'.format(namespace)), 'path': stream_elt.findtext('{{{0}}}path'.format(namespace)), 'port': stream_elt.findtext('{{{0}}}port'.format(namespace)), 'ssl': stream_elt.findtext('{{{0}}}ssl'.format(namespace)) == 'true', 'user': stream_elt.findtext('{{{0}}}user'.format(namespace)), 'password': stream_elt.findtext('{{{0}}}password'.format(namespace)), 'archive_id': archive_elt is not None and (archive_elt.get('warehouse')), 'archive_path': archive_elt is not None and archive_elt.text, 'archive_ttl': archive_elt is not None and archive_elt.get('ttl') }
# -------------------------------------------------------------------------
[docs] @classmethod def record_format(cls, record: dict) -> str | None: """Check and possibly correct a record before inserting it in the database. :param dict record: Dictionary of values to check. :rtype: ``None`` or :class:`pyramid.i18n.TranslationString` :return: ``None`` or error message. """ for k in [i for i in record if record[i] is None]: del record[k] # Stream ID if not record.get('stream_id'): return _('Stream ID is missing.') # Stream type if not record.get('stream_type'): return _('Stream type is missing.') # Path if record.get('path'): record['path'] = record['path'].strip() # Port if record.get('port'): record['port'] = int(record['port']) # Password password = record.get('password') if password and not record.get('user'): del record['password'] elif password and (not tostr(password).endswith('=') or len(password) < 32): record['password'] = encrypt(password, 'warehouse') # Archive if not record.get('archive_id'): return _('Archive is missing.') record['archive_ttl'] = int(record['archive_ttl']) \ if 'archive_ttl' in record else DEFAULT_ARCHIVE_TTL return None
# -------------------------------------------------------------------------
[docs] def db2xml(self, dbsession: Session | None = None) -> etree.Element: """Serialize a file stream to a XML representation. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: (optional) SQLAlchemy session. :rtype: lxml.etree.Element """ # pylint: disable = unused-argument stream_elt = etree.Element('input-stream') stream_elt.set('id', self.stream_id) stream_elt.set('type', self.stream_type) etree.SubElement(stream_elt, 'host').text = self.host if self.path: etree.SubElement(stream_elt, 'path').text = self.path if self.port: etree.SubElement(stream_elt, 'port').text = str(self.port) if self.ssl: etree.SubElement(stream_elt, 'ssl').text = 'true' if self.user: etree.SubElement(stream_elt, 'user').text = self.user if self.password: etree.SubElement(stream_elt, 'password').text = self.password elt = etree.SubElement( stream_elt, 'archive', warehouse=self.archive_id) if self.archive_ttl is not None \ and self.archive_ttl != DEFAULT_ARCHIVE_TTL: elt.set('ttl', str(self.archive_ttl)) elt.text = self.archive_path or '' return stream_elt
# -------------------------------------------------------------------------
[docs] def attachments2directory(self, attachments, directory): """Copy from attachments directory the file corresponding to the user. :param str attachments: Absolute path to the attachments directory. :param str directory: The backup directory. """
# =============================================================================
[docs] class DBInputRule(DBDeclarativeClass): """SQLAlchemy-powered input rule class.""" suffix = 'cioinp' __tablename__ = 'wrh2_input_rules' __table_args__ = {'mysql_engine': 'InnoDB'} rule_id = Column(String(ID_LEN), primary_key=True) rule_type = Column(String(ID_LEN), nullable=False, default='basic') priority = Column(Integer, default=0, index=True) exclusive = Column(Boolean(name='exclusive'), default=False) warehouse_id = Column(String(2 * ID_LEN), nullable=False) path = Column(String(PATH_LEN)) flat = Column(Boolean(name='flat'), default=False) variables = relationship( 'DBInputRuleVariable', order_by='DBInputRuleVariable.order', cascade='all, delete') conditions = relationship( 'DBInputRuleCondition', order_by='DBInputRuleCondition.condition_id', cascade='all, delete') # -------------------------------------------------------------------------
[docs] @classmethod def xml2db( cls, dbsession: Session, rule_elt: etree.Element, error_if_exists: bool = True, kwargs=None) -> str | None: """Load a user rule from a XML element. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: SQLAlchemy session. :type rule_elt: lxml.etree.Element :param rule_elt: User rule XML element. :param bool error_if_exists: (default=True) It returns an error if user rule already exists. :param dict kwargs: (optional) Dictionary of keyword arguments. :rtype: :class:`pyramid.i18n.TranslationString` or ``None`` :return: Error message or ``None``. """ # pylint: disable = unused-argument # Check if already exists rule_id = make_id(rule_elt.get('id'), 'token', ID_LEN) dbrule = dbsession.query(cls).filter_by(rule_id=rule_id).first() if dbrule is not None: if error_if_exists: return _('Rule "${r}" already exists.', {'s': rule_id}) return None # Create rule record = cls.record_from_xml(rule_id, rule_elt) error = cls.record_format(record) if error: return error dbrule = cls(**record) dbsession.add(dbrule) # Add variables dbsession.flush() for number, elt in enumerate(rule_elt.findall( f'{{{RELAXNG_CIOWAREHOUSE2["namespace"]}}}variable')): dbrule.variables.append( DBInputRuleVariable( name=elt.get('name'), order=number + 1, key=elt.get('key'), pattern=elt.text.strip() if elt.text is not None else None, transform=elt.get('transform'), args=elt.get('args'))) # Add conditions dbsession.flush() for number, elt in enumerate(rule_elt.findall( f'{{{RELAXNG_CIOWAREHOUSE2["namespace"]}}}condition')): dbrule.conditions.append( DBInputRuleCondition( condition_id=number + 1, key=elt.get('key'), pattern=elt.text.strip() if elt.text else None)) return None
# -------------------------------------------------------------------------
[docs] @classmethod def record_from_xml(cls, rule_id: str, rule_elt: etree.Element) -> dict: """Convert an user rule XML element into a dictionary. :param str rule_id: Rule ID. :type rule_elt: lxml.etree.Element :param rule_elt: Rule XML element. :rtype: dict """ namespace = RELAXNG_CIOWAREHOUSE2['namespace'] destination_elt = rule_elt.find('{{{0}}}destination'.format(namespace)) path = destination_elt.text.strip() \ if destination_elt is not None else None return { 'rule_id': rule_id, 'rule_type': rule_elt.get('type'), 'priority': int(rule_elt.get('priority', '0')), 'exclusive': rule_elt.get('exclusive') == 'true', 'warehouse_id': destination_elt.get('warehouse') if path else None, 'path': path, 'flat': destination_elt.get('flat') == 'true' if path else False }
# -------------------------------------------------------------------------
[docs] @classmethod def record_format(cls, record: dict) -> str | None: """Check and possibly correct a record before inserting it in the database. :param dict record: Dictionary of values to check. :rtype: ``None`` or :class:`pyramid.i18n.TranslationString` :return: ``None`` or error message. """ for k in [i for i in record if record[i] is None]: del record[k] # Rule ID if not record.get('rule_id'): return _('Rule ID is missing.') # Rule type if not record.get('rule_type'): record['rule_type'] = 'basic' # Destination if not record.get('warehouse_id'): return _('Rule without warehouse.') return None
# -------------------------------------------------------------------------
[docs] def db2xml(self, dbsession: Session | None = None) -> etree.Element: """Serialize an user rule to a XML representation. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: (optional) SQLAlchemy session. :rtype: lxml.etree.Element """ # pylint: disable = unused-argument rule_elt = etree.Element('input-rule') rule_elt.set('id', self.rule_id) if self.rule_type != 'basic': rule_elt.set('type', self.rule_type) if self.priority: rule_elt.set('priority', str(self.priority)) if self.exclusive: rule_elt.set('exclusive', 'true') # Variables for dbvariable in self.variables: elt = etree.SubElement(rule_elt, 'variable') elt.set('name', dbvariable.name) elt.set('key', dbvariable.key) if dbvariable.transform: elt.set('transform', dbvariable.transform) if dbvariable.args: elt.set('args', dbvariable.args) if dbvariable.pattern is not None: elt.text = dbvariable.pattern # Conditions for dbcondition in self.conditions: elt = etree.SubElement(rule_elt, 'condition') elt.set('key', dbcondition.key) elt.text = dbcondition.pattern # Destination elt = etree.SubElement( rule_elt, 'destination', warehouse=self.warehouse_id) elt.text = self.path if self.flat: elt.set('flat', 'true') return rule_elt
# -------------------------------------------------------------------------
[docs] def attachments2directory(self, attachments, directory): """Copy from attachments directory the file corresponding to the user. :param str attachments: Absolute path to the attachments directory. :param str directory: The backup directory. """
# =============================================================================
[docs] class DBInputRuleVariable(DBDeclarativeClass): """SQLAlchemy-powered input rule variable class.""" # pylint: disable = too-few-public-methods __tablename__ = 'wrh2_input_rules_variables' __table_args__ = {'mysql_engine': 'InnoDB'} rule_id = Column( String(ID_LEN), ForeignKey('wrh2_input_rules.rule_id', ondelete='CASCADE'), primary_key=True) name = Column(String(ID_LEN), primary_key=True) order = Column(Integer, default=0) key = Column(String(ID_LEN), nullable=False) pattern = Column(String(VALUE_LEN)) transform = Column( Enum(*VARIABLE_TRANSFORM_LABELS.keys(), name='rule_transform')) args = Column(Text)
# =============================================================================
[docs] class DBInputRuleCondition(DBDeclarativeClass): """SQLAlchemy-powered input rule condition class.""" # pylint: disable = too-few-public-methods __tablename__ = 'wrh2_input_rules_conditions' __table_args__ = {'mysql_engine': 'InnoDB'} rule_id = Column( String(ID_LEN), ForeignKey('wrh2_input_rules.rule_id', ondelete='CASCADE'), primary_key=True) condition_id = Column(Integer, primary_key=True) key = Column(String(VALUE_LEN), nullable=False) pattern = Column(String(VALUE_LEN))