Source code for ciowarehouse2.models.dbinput
"""SQLAlchemy-powered model definitions for input streams (file, FTP or email)
and input rules."""
from __future__ import annotations
from sqlalchemy import Column, ForeignKey, Enum, String, Integer, Boolean, Text
from sqlalchemy.orm import Session, relationship
from lxml import etree
from chrysalio.lib.utils import make_id, tostr, encrypt
from chrysalio.models import ID_LEN, EMAIL_LEN, VALUE_LEN, DBDeclarativeClass
from . import URL_LEN, PATH_LEN
from ..relaxng import RELAXNG_CIOWAREHOUSE2
from ..lib.i18n import _
# Translatable labels of the transformations available for a rule variable.
# The keys of this dictionary are also the values accepted by the
# ``transform`` Enum column of ``DBInputRuleVariable``.
VARIABLE_TRANSFORM_LABELS = {
'capitalize': _('Capitalize'),
'lower': _('Lower'),
'upper': _('Upper'),
'replace': _('Replace')
}
# Archive time-to-live applied by ``DBInputStream.record_format`` when the
# record does not carry an ``archive_ttl`` value.
DEFAULT_ARCHIVE_TTL = 720 # in hours = 30 days
# =============================================================================
[docs]
class DBInputStream(DBDeclarativeClass):
"""SQLAlchemy-powered stream class.

Each instance is one row of the ``wrh2_input_streams`` table. Rows are
created from XML by :meth:`xml2db` and serialized back by :meth:`db2xml`.
"""
suffix = 'cioinp2'
__tablename__ = 'wrh2_input_streams'
__table_args__ = {'mysql_engine': 'InnoDB'}
# Primary key; ``xml2db`` builds it from the ``id`` XML attribute with
# ``make_id(..., 'token', EMAIL_LEN)``.
stream_id = Column(String(EMAIL_LEN), primary_key=True)
# Mandatory; read from the ``type`` XML attribute.
stream_type = Column(String(ID_LEN), nullable=False)
# Values of the ``host``, ``path``, ``port`` and ``ssl`` XML child elements
# (``ssl`` is true only when the element text is exactly ``true``).
host = Column(String(URL_LEN), nullable=False)
path = Column(String(PATH_LEN))
port = Column(Integer)
ssl = Column(Boolean(name='ssl'), default=False)
# Credentials. ``record_format`` drops the password when there is no user
# and otherwise stores the output of ``encrypt(password, 'warehouse')``
# unless the value ends with '=' and is at least 32 characters long.
user = Column(String(ID_LEN))
password = Column(String(128))
# Archive settings read from the ``archive`` XML element: ``warehouse``
# attribute, element text and ``ttl`` attribute respectively. The TTL falls
# back to ``DEFAULT_ARCHIVE_TTL`` in ``record_format``.
archive_id = Column(String(ID_LEN), nullable=False)
archive_path = Column(String(255))
archive_ttl = Column(Integer)
# -------------------------------------------------------------------------
[docs]
@classmethod
def xml2db(
        cls,
        dbsession: Session,
        stream_elt: etree.Element,
        error_if_exists: bool = True,
        kwargs=None) -> str | None:
    """Load a stream description from an XML element.

    The stream is added to the session only if no stream with the same ID
    is already stored and if the record passes :meth:`record_format`.

    :type  dbsession: sqlalchemy.orm.session.Session
    :param dbsession:
        SQLAlchemy session.
    :type  stream_elt: lxml.etree.Element
    :param stream_elt:
        Stream XML element.
    :param bool error_if_exists: (default=True)
        If ``True``, an already existing stream is reported as an error;
        otherwise it is silently left untouched.
    :param dict kwargs: (optional)
        Dictionary of keyword arguments (unused here).
    :rtype: :class:`pyramid.i18n.TranslationString` or ``None``
    :return:
        Error message or ``None``.
    """
    # pylint: disable = unused-argument
    stream_id = make_id(stream_elt.get('id'), 'token', EMAIL_LEN)

    # An already stored stream is never overwritten
    known = dbsession.query(cls).filter_by(stream_id=stream_id).first()
    if known is not None:
        return _('Stream "${s}" already exists.', {'s': stream_id}) \
            if error_if_exists else None

    # Build and validate the record, then register the new stream
    values = cls.record_from_xml(stream_id, stream_elt)
    problem = cls.record_format(values)
    if not problem:
        dbsession.add(cls(**values))
        return None
    return problem
# -------------------------------------------------------------------------
[docs]
@classmethod
def record_from_xml(
        cls, stream_id: str, stream_elt: etree.Element) -> dict:
    """Convert a stream XML element into a dictionary.

    Child elements are looked up in the ``RELAXNG_CIOWAREHOUSE2``
    namespace. Values are returned as found in the XML (``port`` and
    ``archive_ttl`` are still strings); a missing element or attribute
    yields ``None``. Only ``ssl`` is converted, to a boolean.

    :param str stream_id:
        Stream ID (identifier or mail address).
    :type  stream_elt: lxml.etree.Element
    :param stream_elt:
        Stream XML element.
    :rtype: dict
    :return:
        Dictionary whose keys are the column names of the stream.
    """
    namespace = RELAXNG_CIOWAREHOUSE2['namespace']

    def child_text(tag):
        """Return the text of the child ``tag`` or ``None``."""
        return stream_elt.findtext(f'{{{namespace}}}{tag}')

    archive_elt = stream_elt.find(f'{{{namespace}}}archive')
    has_archive = archive_elt is not None
    # A missing <archive> must produce ``None`` (not ``False``) so that
    # ``record_format`` can drop the keys like any other absent value.
    return {
        'stream_id': stream_id,
        'stream_type': stream_elt.get('type'),
        'host': child_text('host'),
        'path': child_text('path'),
        'port': child_text('port'),
        'ssl': child_text('ssl') == 'true',
        'user': child_text('user'),
        'password': child_text('password'),
        'archive_id':
            archive_elt.get('warehouse') if has_archive else None,
        'archive_path':
            archive_elt.text if has_archive else None,
        'archive_ttl':
            archive_elt.get('ttl') if has_archive else None
    }
# -------------------------------------------------------------------------
[docs]
@classmethod
def record_format(cls, record: dict) -> str | None:
    """Check and possibly correct a record before inserting it in the
    database.

    The dictionary is modified in place: ``None`` values are removed,
    the path is stripped, the port and the archive TTL are converted to
    integers and the password is encrypted when needed.

    :param dict record:
        Dictionary of values to check.
    :rtype: ``None`` or :class:`pyramid.i18n.TranslationString`
    :return:
        ``None`` or error message.
    """
    # Remove every key whose value is None
    for name in [key for key, value in record.items() if value is None]:
        del record[name]

    # Mandatory identification
    if not record.get('stream_id'):
        return _('Stream ID is missing.')
    if not record.get('stream_type'):
        return _('Stream type is missing.')

    # Path and port
    path = record.get('path')
    if path:
        record['path'] = path.strip()
    port = record.get('port')
    if port:
        record['port'] = int(port)

    # Password: useless without a user; a value ending with '=' and at
    # least 32 characters long is kept as is (presumably it is already
    # encrypted -- to be confirmed against ``encrypt``)
    secret = record.get('password')
    if secret:
        if not record.get('user'):
            del record['password']
        elif not (tostr(secret).endswith('=') and len(secret) >= 32):
            record['password'] = encrypt(secret, 'warehouse')

    # Archive
    if not record.get('archive_id'):
        return _('Archive is missing.')
    if 'archive_ttl' in record:
        record['archive_ttl'] = int(record['archive_ttl'])
    else:
        record['archive_ttl'] = DEFAULT_ARCHIVE_TTL
    return None
# -------------------------------------------------------------------------
[docs]
def db2xml(self, dbsession: Session | None = None) -> etree.Element:
    """Serialize a stream to an XML representation.

    Optional values are written only when they are set, and the archive
    TTL only when it differs from ``DEFAULT_ARCHIVE_TTL``.

    :type  dbsession: sqlalchemy.orm.session.Session
    :param dbsession: (optional)
        SQLAlchemy session (unused here).
    :rtype: lxml.etree.Element
    :return:
        An ``input-stream`` element.
    """
    # pylint: disable = unused-argument
    root = etree.Element('input-stream')
    root.set('id', self.stream_id)
    root.set('type', self.stream_type)
    etree.SubElement(root, 'host').text = self.host

    # Optional children, in document order
    optional = (
        ('path', self.path),
        ('port', str(self.port) if self.port else None),
        ('ssl', 'true' if self.ssl else None),
        ('user', self.user),
        ('password', self.password),
    )
    for tag, text in optional:
        if text:
            etree.SubElement(root, tag).text = text

    # Archive
    archive = etree.SubElement(root, 'archive', warehouse=self.archive_id)
    ttl = self.archive_ttl
    if ttl is not None and ttl != DEFAULT_ARCHIVE_TTL:
        archive.set('ttl', str(ttl))
    archive.text = self.archive_path or ''
    return root
# -------------------------------------------------------------------------
[docs]
def attachments2directory(self, attachments, directory):
"""Do nothing: the method body is empty, so no file is copied.

NOTE(review): presumably kept so that streams expose the same backup
interface as models that do own attachments -- confirm with callers.

:param str attachments:
Absolute path to the attachments directory.
:param str directory:
The backup directory.
"""
# =============================================================================
[docs]
class DBInputRule(DBDeclarativeClass):
"""SQLAlchemy-powered input rule class.

Each instance is one row of the ``wrh2_input_rules`` table, together with
its ordered variables and conditions.
"""
suffix = 'cioinp'
__tablename__ = 'wrh2_input_rules'
__table_args__ = {'mysql_engine': 'InnoDB'}
# Primary key; ``xml2db`` builds it from the ``id`` XML attribute with
# ``make_id(..., 'token', ID_LEN)``.
rule_id = Column(String(ID_LEN), primary_key=True)
# ``record_format`` also falls back to 'basic' when the type is empty.
rule_type = Column(String(ID_LEN), nullable=False, default='basic')
# Read from the ``priority`` and ``exclusive`` XML attributes.
priority = Column(Integer, default=0, index=True)
exclusive = Column(Boolean(name='exclusive'), default=False)
# Destination: ``warehouse`` attribute, text and ``flat`` attribute of the
# ``destination`` XML element.
warehouse_id = Column(String(2 * ID_LEN), nullable=False)
path = Column(String(PATH_LEN))
flat = Column(Boolean(name='flat'), default=False)
# Child rows, loaded in the order of their ``order`` column.
variables = relationship(
'DBInputRuleVariable',
order_by='DBInputRuleVariable.order',
cascade='all, delete')
# Child rows, loaded in the order of their ``condition_id`` column.
conditions = relationship(
'DBInputRuleCondition',
order_by='DBInputRuleCondition.condition_id',
cascade='all, delete')
# -------------------------------------------------------------------------
[docs]
@classmethod
def xml2db(
        cls,
        dbsession: Session,
        rule_elt: etree.Element,
        error_if_exists: bool = True,
        kwargs=None) -> str | None:
    """Load an input rule from an XML element.

    The rule is created with its variables and conditions, numbered from
    1 in document order. Nothing is created when a rule with the same ID
    already exists or when the record does not pass
    :meth:`record_format`.

    :type  dbsession: sqlalchemy.orm.session.Session
    :param dbsession:
        SQLAlchemy session.
    :type  rule_elt: lxml.etree.Element
    :param rule_elt:
        Rule XML element.
    :param bool error_if_exists: (default=True)
        If ``True``, an already existing rule is reported as an error;
        otherwise it is silently left untouched.
    :param dict kwargs: (optional)
        Dictionary of keyword arguments (unused here).
    :rtype: :class:`pyramid.i18n.TranslationString` or ``None``
    :return:
        Error message or ``None``.
    """
    # pylint: disable = unused-argument
    # Check if already exists
    rule_id = make_id(rule_elt.get('id'), 'token', ID_LEN)
    dbrule = dbsession.query(cls).filter_by(rule_id=rule_id).first()
    if dbrule is not None:
        if error_if_exists:
            # The mapping key must match the ``${r}`` placeholder,
            # otherwise the rule ID never appears in the message
            return _('Rule "${r}" already exists.', {'r': rule_id})
        return None

    # Create rule
    record = cls.record_from_xml(rule_id, rule_elt)
    error = cls.record_format(record)
    if error:
        return error
    dbrule = cls(**record)
    dbsession.add(dbrule)

    namespace = RELAXNG_CIOWAREHOUSE2['namespace']

    # Add variables (the pending rule is flushed first)
    dbsession.flush()
    for number, elt in enumerate(
            rule_elt.findall(f'{{{namespace}}}variable'), 1):
        dbrule.variables.append(
            DBInputRuleVariable(
                name=elt.get('name'),
                order=number,
                key=elt.get('key'),
                pattern=elt.text.strip() if elt.text is not None else None,
                transform=elt.get('transform'),
                args=elt.get('args')))

    # Add conditions
    dbsession.flush()
    for number, elt in enumerate(
            rule_elt.findall(f'{{{namespace}}}condition'), 1):
        dbrule.conditions.append(
            DBInputRuleCondition(
                condition_id=number,
                key=elt.get('key'),
                pattern=elt.text.strip() if elt.text else None))

    return None
# -------------------------------------------------------------------------
[docs]
@classmethod
def record_from_xml(cls, rule_id: str, rule_elt: etree.Element) -> dict:
    """Convert a rule XML element into a dictionary.

    The destination element is looked up in the ``RELAXNG_CIOWAREHOUSE2``
    namespace. The warehouse and the ``flat`` flag are read only when the
    destination has a non-empty path.

    :param str rule_id:
        Rule ID.
    :type  rule_elt: lxml.etree.Element
    :param rule_elt:
        Rule XML element.
    :rtype: dict
    :return:
        Dictionary whose keys are the column names of the rule.
    """
    namespace = RELAXNG_CIOWAREHOUSE2['namespace']
    destination_elt = rule_elt.find(f'{{{namespace}}}destination')
    path = None
    if destination_elt is not None:
        # ``text`` is None for an element without content: handle it
        # like an empty path instead of failing on ``None.strip()``
        path = (destination_elt.text or '').strip()
    return {
        'rule_id': rule_id,
        'rule_type': rule_elt.get('type'),
        'priority': int(rule_elt.get('priority', '0')),
        'exclusive': rule_elt.get('exclusive') == 'true',
        'warehouse_id': destination_elt.get('warehouse') if path else None,
        'path': path,
        'flat': destination_elt.get('flat') == 'true' if path else False
    }
# -------------------------------------------------------------------------
[docs]
@classmethod
def record_format(cls, record: dict) -> str | None:
    """Check and possibly correct a record before inserting it in the
    database.

    The dictionary is modified in place: ``None`` values are removed and
    an empty rule type is replaced by ``'basic'``.

    :param dict record:
        Dictionary of values to check.
    :rtype: ``None`` or :class:`pyramid.i18n.TranslationString`
    :return:
        ``None`` or error message.
    """
    # Remove every key whose value is None
    for name in [key for key, value in record.items() if value is None]:
        del record[name]

    # The rule ID is mandatory
    if not record.get('rule_id'):
        return _('Rule ID is missing.')

    # The rule type defaults to 'basic'
    record['rule_type'] = record.get('rule_type') or 'basic'

    # A destination warehouse is mandatory
    if record.get('warehouse_id'):
        return None
    return _('Rule without warehouse.')
# -------------------------------------------------------------------------
[docs]
def db2xml(self, dbsession: Session | None = None) -> etree.Element:
    """Serialize a rule to an XML representation.

    The type is written only when it is not ``'basic'``; priority,
    ``exclusive`` and ``flat`` only when they are set. Variables and
    conditions follow the order of their relationships.

    :type  dbsession: sqlalchemy.orm.session.Session
    :param dbsession: (optional)
        SQLAlchemy session (unused here).
    :rtype: lxml.etree.Element
    :return:
        An ``input-rule`` element.
    """
    # pylint: disable = unused-argument
    root = etree.Element('input-rule')
    root.set('id', self.rule_id)
    if self.rule_type != 'basic':
        root.set('type', self.rule_type)
    if self.priority:
        root.set('priority', str(self.priority))
    if self.exclusive:
        root.set('exclusive', 'true')

    # Variables
    for variable in self.variables:
        node = etree.SubElement(root, 'variable')
        node.set('name', variable.name)
        node.set('key', variable.key)
        for attribute in ('transform', 'args'):
            value = getattr(variable, attribute)
            if value:
                node.set(attribute, value)
        if variable.pattern is not None:
            node.text = variable.pattern

    # Conditions
    for condition in self.conditions:
        node = etree.SubElement(root, 'condition')
        node.set('key', condition.key)
        node.text = condition.pattern

    # Destination
    destination = etree.SubElement(
        root, 'destination', warehouse=self.warehouse_id)
    if self.flat:
        destination.set('flat', 'true')
    destination.text = self.path
    return root
# -------------------------------------------------------------------------
[docs]
def attachments2directory(self, attachments, directory):
"""Do nothing: the method body is empty, so no file is copied.

NOTE(review): presumably kept so that rules expose the same backup
interface as models that do own attachments -- confirm with callers.

:param str attachments:
Absolute path to the attachments directory.
:param str directory:
The backup directory.
"""
# =============================================================================
[docs]
class DBInputRuleVariable(DBDeclarativeClass):
"""SQLAlchemy-powered input rule variable class.

Each instance is one row of ``wrh2_input_rules_variables``, identified by
the pair (``rule_id``, ``name``).
"""
# pylint: disable = too-few-public-methods
__tablename__ = 'wrh2_input_rules_variables'
__table_args__ = {'mysql_engine': 'InnoDB'}
# Owning rule; the row is deleted with the rule (ON DELETE CASCADE).
rule_id = Column(
String(ID_LEN),
ForeignKey('wrh2_input_rules.rule_id', ondelete='CASCADE'),
primary_key=True)
name = Column(String(ID_LEN), primary_key=True)
# ``DBInputRule.xml2db`` stores here the 1-based position of the variable
# in the XML; ``DBInputRule.variables`` is sorted on this column.
order = Column(Integer, default=0)
key = Column(String(ID_LEN), nullable=False)
# Stripped text of the ``variable`` XML element, if any.
pattern = Column(String(VALUE_LEN))
# One of the keys of ``VARIABLE_TRANSFORM_LABELS``.
transform = Column(
Enum(*VARIABLE_TRANSFORM_LABELS.keys(), name='rule_transform'))
# Value of the ``args`` XML attribute.
args = Column(Text)
# =============================================================================
[docs]
class DBInputRuleCondition(DBDeclarativeClass):
"""SQLAlchemy-powered input rule condition class.

Each instance is one row of ``wrh2_input_rules_conditions``, identified by
the pair (``rule_id``, ``condition_id``).
"""
# pylint: disable = too-few-public-methods
__tablename__ = 'wrh2_input_rules_conditions'
__table_args__ = {'mysql_engine': 'InnoDB'}
# Owning rule; the row is deleted with the rule (ON DELETE CASCADE).
rule_id = Column(
String(ID_LEN),
ForeignKey('wrh2_input_rules.rule_id', ondelete='CASCADE'),
primary_key=True)
# ``DBInputRule.xml2db`` stores here the 1-based position of the condition
# in the XML; ``DBInputRule.conditions`` is sorted on this column.
condition_id = Column(Integer, primary_key=True)
key = Column(String(VALUE_LEN), nullable=False)
# Stripped text of the ``condition`` XML element, if any.
pattern = Column(String(VALUE_LEN))