Source code for ciowarehouse2.models.populate

"""Function to import and export database from and into XML files."""

from __future__ import annotations
from collections import OrderedDict

from lxml import etree
from sqlalchemy import desc
from sqlalchemy.orm.session import Session

from chrysalio.models.populate import element2db
from chrysalio.models.dbuser import DBUser
from chrysalio.models.dbgroup import DBGroup
from cioservice.models.dbjob import DBJob
from ..relaxng import RELAXNG_CIOWAREHOUSE2
from .dbfield import DBField
from .dbseed import DBSeed
from .dbwarehouse import DBWarehouse
from .dbsharing import DBSharing
from .dbinput import DBInputStream, DBInputRule


# =============================================================================
[docs] def xml2db( dbsession: Session, root_elt: etree.Element, only: str | None = None, error_if_exists: bool = True, modules: OrderedDict | None = None) -> list: """Load an XML configuration file for an included module. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: SQLAlchemy session. :type root_elt: lxml.etree.Element :param root_elt: XML element with the namespace of the module. :param str only: (optional) If not ``None``, only the items of type ``only`` are loaded. :param bool error_if_exists: (default=True) It returns an error if an item already exists. :type modules: collections.OrderedDict :param modules: (optional) Dictionary of modules to use to complete the loading. :rtype: list :return: A list of error messages. """ # pylint: disable = unused-argument # Fields errors = element2db( dbsession, root_elt, only, error_if_exists, { 'tag': 'field', 'class': DBField, 'relaxng': RELAXNG_CIOWAREHOUSE2 }) fields = [k[0] for k in dbsession.query(DBField.field_id)] # Seeds errors = element2db( dbsession, root_elt, only, error_if_exists, { 'tag': 'seed', 'class': DBSeed, 'relaxng': RELAXNG_CIOWAREHOUSE2 }) seeds = [k[0] for k in dbsession.query(DBSeed.seed_id)] # Warehouses users = dict(dbsession.query(DBUser.login, DBUser.user_id)) groups = [k[0] for k in dbsession.query(DBGroup.group_id)] jobs = [k[0] for k in dbsession.query(DBJob.job_id)] errors += element2db( dbsession, root_elt, only, error_if_exists, { 'tag': 'warehouse', 'class': DBWarehouse, 'relaxng': RELAXNG_CIOWAREHOUSE2, 'kwargs': { 'fields': fields, 'seeds': seeds, 'users': users, 'groups': groups, 'jobs': jobs } }) warehouse_ids = [k[0] for k in dbsession.query(DBWarehouse.warehouse_id)] # Inputs errors += element2db( dbsession, root_elt, only, error_if_exists, { 'tag': 'input-stream', 'class': DBInputStream, 'relaxng': RELAXNG_CIOWAREHOUSE2 }) errors += element2db( dbsession, root_elt, only, error_if_exists, { 'tag': 'input-rule', 'class': DBInputRule, 'relaxng': RELAXNG_CIOWAREHOUSE2 }) # Sharings errors += element2db( dbsession, root_elt, only, error_if_exists, { 'tag': 'sharing', 'class': DBSharing, 'relaxng': RELAXNG_CIOWAREHOUSE2, 'kwargs': { 'warehouse_ids': warehouse_ids } }) return [k for k in errors if k is not None]
# ============================================================================= # pylint: disable = unused-argument
[docs] def db2xml(dbsession: Session, root_elt: etree.Element): """Fill ``root_elt`` with the XML configuration of the module. :type dbsession: sqlalchemy.orm.session.Session :param dbsession: SQLAlchemy session. :type root_elt: lxml.etree.Element :param root_elt: XML element with the namespace of the module. """ # Fields dbfields = dbsession.query(DBField).order_by('position').all() if dbfields: root_elt.append(etree.Comment('{0:.^64}'.format('fields'))) group_elt = etree.SubElement(root_elt, 'fields') for dbitem in dbfields: group_elt.append(dbitem.db2xml()) # Seeds dbseeds = dbsession.query(DBSeed).all() if dbseeds: root_elt.append(etree.Comment('{0:.^64}'.format('seeds'))) group_elt = etree.SubElement(root_elt, 'seeds') for dbitem in dbseeds: group_elt.append(dbitem.db2xml()) # Warehouses dbwarehouses = dbsession.query(DBWarehouse).order_by('warehouse_id').all() if dbwarehouses: root_elt.append(etree.Comment('{0:.^64}'.format('warehouses'))) group_elt = etree.SubElement(root_elt, 'warehouses') for dbitem in dbwarehouses: group_elt.append(dbitem.db2xml(dbsession)) # Inputs dbinputs = dbsession.query(DBInputStream).order_by('stream_id').all() if dbinputs: root_elt.append(etree.Comment('{0:.^64}'.format('input-streams'))) group_elt = etree.SubElement(root_elt, 'input-streams') for dbitem in dbinputs: group_elt.append(dbitem.db2xml()) dbinputs = dbsession.query(DBInputRule).order_by(desc('priority')).all() if dbinputs: root_elt.append(etree.Comment('{0:.^64}'.format('input-rules'))) group_elt = etree.SubElement(root_elt, 'input-rules') for dbitem in dbinputs: group_elt.append(dbitem.db2xml()) # Sharings dbsharings = dbsession.query(DBSharing).order_by('sharing_id').all() if dbsharings: root_elt.append(etree.Comment('{0:.^64}'.format('sharings'))) group_elt = etree.SubElement(root_elt, 'sharings') for dbitem in dbsharings: group_elt.append(dbitem.db2xml())