"""Function to import and export database from and into XML files."""
from __future__ import annotations
from collections import OrderedDict
from lxml import etree
from sqlalchemy import desc
from sqlalchemy.orm.session import Session
from chrysalio.models.populate import element2db
from chrysalio.models.dbuser import DBUser
from chrysalio.models.dbgroup import DBGroup
from cioservice.models.dbjob import DBJob
from ..relaxng import RELAXNG_CIOWAREHOUSE2
from .dbfield import DBField
from .dbseed import DBSeed
from .dbwarehouse import DBWarehouse
from .dbsharing import DBSharing
from .dbinput import DBInputStream, DBInputRule
# =============================================================================
[docs]
def xml2db(
dbsession: Session,
root_elt: etree.Element,
only: str | None = None,
error_if_exists: bool = True,
modules: OrderedDict | None = None) -> list:
"""Load an XML configuration file for an included module.
:type dbsession: sqlalchemy.orm.session.Session
:param dbsession:
SQLAlchemy session.
:type root_elt: lxml.etree.Element
:param root_elt:
XML element with the namespace of the module.
:param str only: (optional)
If not ``None``, only the items of type ``only`` are loaded.
:param bool error_if_exists: (default=True)
It returns an error if an item already exists.
:type modules: collections.OrderedDict
:param modules: (optional)
Dictionary of modules to use to complete the loading.
:rtype: list
:return:
A list of error messages.
"""
# pylint: disable = unused-argument
# Fields
errors = element2db(
dbsession, root_elt, only, error_if_exists, {
'tag': 'field',
'class': DBField,
'relaxng': RELAXNG_CIOWAREHOUSE2
})
fields = [k[0] for k in dbsession.query(DBField.field_id)]
# Seeds
errors = element2db(
dbsession, root_elt, only, error_if_exists, {
'tag': 'seed',
'class': DBSeed,
'relaxng': RELAXNG_CIOWAREHOUSE2
})
seeds = [k[0] for k in dbsession.query(DBSeed.seed_id)]
# Warehouses
users = dict(dbsession.query(DBUser.login, DBUser.user_id))
groups = [k[0] for k in dbsession.query(DBGroup.group_id)]
jobs = [k[0] for k in dbsession.query(DBJob.job_id)]
errors += element2db(
dbsession, root_elt, only, error_if_exists, {
'tag': 'warehouse',
'class': DBWarehouse,
'relaxng': RELAXNG_CIOWAREHOUSE2,
'kwargs': {
'fields': fields,
'seeds': seeds,
'users': users,
'groups': groups,
'jobs': jobs
}
})
warehouse_ids = [k[0] for k in dbsession.query(DBWarehouse.warehouse_id)]
# Inputs
errors += element2db(
dbsession, root_elt, only, error_if_exists, {
'tag': 'input-stream',
'class': DBInputStream,
'relaxng': RELAXNG_CIOWAREHOUSE2
})
errors += element2db(
dbsession, root_elt, only, error_if_exists, {
'tag': 'input-rule',
'class': DBInputRule,
'relaxng': RELAXNG_CIOWAREHOUSE2
})
# Sharings
errors += element2db(
dbsession, root_elt, only, error_if_exists, {
'tag': 'sharing',
'class': DBSharing,
'relaxng': RELAXNG_CIOWAREHOUSE2,
'kwargs': {
'warehouse_ids': warehouse_ids
}
})
return [k for k in errors if k is not None]
# =============================================================================
# pylint: disable = unused-argument
[docs]
def db2xml(dbsession: Session, root_elt: etree.Element):
"""Fill ``root_elt`` with the XML configuration of the module.
:type dbsession: sqlalchemy.orm.session.Session
:param dbsession:
SQLAlchemy session.
:type root_elt: lxml.etree.Element
:param root_elt:
XML element with the namespace of the module.
"""
# Fields
dbfields = dbsession.query(DBField).order_by('position').all()
if dbfields:
root_elt.append(etree.Comment('{0:.^64}'.format('fields')))
group_elt = etree.SubElement(root_elt, 'fields')
for dbitem in dbfields:
group_elt.append(dbitem.db2xml())
# Seeds
dbseeds = dbsession.query(DBSeed).all()
if dbseeds:
root_elt.append(etree.Comment('{0:.^64}'.format('seeds')))
group_elt = etree.SubElement(root_elt, 'seeds')
for dbitem in dbseeds:
group_elt.append(dbitem.db2xml())
# Warehouses
dbwarehouses = dbsession.query(DBWarehouse).order_by('warehouse_id').all()
if dbwarehouses:
root_elt.append(etree.Comment('{0:.^64}'.format('warehouses')))
group_elt = etree.SubElement(root_elt, 'warehouses')
for dbitem in dbwarehouses:
group_elt.append(dbitem.db2xml(dbsession))
# Inputs
dbinputs = dbsession.query(DBInputStream).order_by('stream_id').all()
if dbinputs:
root_elt.append(etree.Comment('{0:.^64}'.format('input-streams')))
group_elt = etree.SubElement(root_elt, 'input-streams')
for dbitem in dbinputs:
group_elt.append(dbitem.db2xml())
dbinputs = dbsession.query(DBInputRule).order_by(desc('priority')).all()
if dbinputs:
root_elt.append(etree.Comment('{0:.^64}'.format('input-rules')))
group_elt = etree.SubElement(root_elt, 'input-rules')
for dbitem in dbinputs:
group_elt.append(dbitem.db2xml())
# Sharings
dbsharings = dbsession.query(DBSharing).order_by('sharing_id').all()
if dbsharings:
root_elt.append(etree.Comment('{0:.^64}'.format('sharings')))
group_elt = etree.SubElement(root_elt, 'sharings')
for dbitem in dbsharings:
group_elt.append(dbitem.db2xml())