Source code for ciowarehouse2.services.inputs

"""A service to periodically retrieve inputs (file, FTP, email), apply rules
and possibly import new documents."""

from __future__ import annotations
from os.path import exists

from transaction import manager
from sqlalchemy import desc

from pyramid.config import Configurator

from chrysalio.models import get_tm_dbsession
from chrysalio.scripts import ScriptRegistry
from cioservice.lib.service import Service
from cioservice.lib.build import Build
from ..lib.i18n import _
from ..lib.warehouse import Warehouse
from ..inputs.stream_file import InputStreamFile
from ..inputs.stream_ftp import InputStreamFtp
from ..inputs.stream_email import InputStreamEmail
from ..inputs.rule_basic import InputRuleBasic
from ..models.dbwarehouse import DBWarehouse
from ..models.dbinput import DBInputStream, DBInputRule


# =============================================================================
def includeme(configurator: Configurator):
    """Pyramid include hook which registers the `inputs` service.

    :type  configurator: pyramid.config.Configurator
    :param configurator:
        Configuration object of the application being set up. It is handed
        to ``Service.register`` together with :class:`ServiceInputs`.
    """
    Service.register(configurator, ServiceInputs)
# =============================================================================
class ServiceInputs(Service):
    """Service collecting documents from input streams and routing them.

    See: :class:`chrysalio.lib.service.Service`

    Every input stream stored in the database is asked for its documents.
    Each document is then handed to the input rules, taken by decreasing
    priority, until one of them is applied and is flagged as exclusive.

    Whatever its origin, a document is described by a dictionary with the
    following keys:

    * ``'stream-id'``
    * ``'stream-type'``
    * ``'from'``
    * ``'to'``
    * ``'title'``
    * ``'organization'``
    * ``'language'``
    * ``'datetime'``
    * ``'path'``
    * ``'filename'``
    * ``'dirname'``
    * ``'source'``
    * ``'body'``
    """

    label = _('Inputs service')
    # Not used by the methods of this class.
    archives: dict = {}
    # Cache of Warehouse objects indexed by warehouse ID. It is a class
    # attribute which is only mutated in place, hence shared by instances.
    _warehouses: dict = {}

    # -------------------------------------------------------------------------
    def __init__(self, registry: ScriptRegistry):
        """Register the built-in stream handlers and rule classes.

        :type  registry: chrysalio.scripts.ScriptRegistry
        :param registry:
            Application registry. The ``'input_streams'`` and
            ``'input_rules'`` entries are created when missing, then the
            built-in ``'file'``, ``'ftp'``, ``'email'`` streams and the
            ``'basic'`` rule are (re)installed in them.
        """
        super().__init__(registry)

        if 'input_streams' not in self._registry:
            self._registry['input_streams'] = {}
        builtin_streams = (
            ('file', InputStreamFile),
            ('ftp', InputStreamFtp),
            ('email', InputStreamEmail),
        )
        for stream_type, stream_class in builtin_streams:
            # Streams are stored as instances bound to ``get_warehouse``.
            self._registry['input_streams'][stream_type] = stream_class(
                registry, self.get_warehouse)

        if 'input_rules' not in self._registry:
            self._registry['input_rules'] = {}
        # Rules are stored as classes: they are instantiated for each
        # database rule when the streams are browsed.
        self._registry['input_rules']['basic'] = InputRuleBasic

    # -------------------------------------------------------------------------
    def get_warehouse(
            self, build: Build, warehouse_id: str) -> Warehouse | None:
        """Return the warehouse with ID ``warehouse_id`` or ``None``.

        A warehouse which has been successfully loaded is cached and
        returned directly on the following calls.

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build object; its ``dbsession`` is used for the lookup.
        :param str warehouse_id:
            ID of the warehouse to return.
        :rtype: :class:`.lib.warehouse.Warehouse` or ``None``
        :return:
            ``None`` if the ID is empty, if the warehouse is not in the
            database, if its location is not a known one or if its root
            does not exist on the file system.
        """
        if not warehouse_id:
            return None

        try:
            return self._warehouses[warehouse_id]
        except KeyError:
            pass  # not loaded yet

        locations = self._registry['modules']['ciowarehouse2'].locations
        query = build.dbsession.query(DBWarehouse)
        dbwarehouse = query.filter_by(warehouse_id=warehouse_id).first()
        if dbwarehouse is None:
            return None
        if dbwarehouse.location not in locations:
            return None

        warehouse = Warehouse(self._registry, dbwarehouse, locations)
        if not exists(warehouse.root):
            return None

        self._warehouses[warehouse_id] = warehouse
        return warehouse

    # -------------------------------------------------------------------------
    def _run(self, build: Build):
        """Execute the service on the build ``build``.

        See: :meth:`chrysalio.lib.service.Service._run`

        The DB session of the build is used when there is one. Otherwise, a
        session is created from the ``'dbsession_factory'`` entry of the
        registry. Without any of them, nothing is done.

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build object.
        """
        # The build already owns a DB session
        if build.dbsession is not None:
            self._browse_input_streams(build)
            self.write_traces(build)
            return

        # Otherwise, a session factory is required
        dbsession_factory = self._registry.get('dbsession_factory')
        if not dbsession_factory:
            return

        with manager:
            build.dbsession = get_tm_dbsession(dbsession_factory, manager)
            self._browse_input_streams(build)
            # NOTE(review): confirm that the traces are meant to be written
            # inside the transaction block.
            self.write_traces(build)

    # -------------------------------------------------------------------------
    def _browse_input_streams(self, build: Build):
        """Load rules and browse all input streams.

        Rules are instantiated by decreasing priority. A rule or a stream
        with an unknown type is reported with ``build.error`` and skipped.
        If no rule can be instantiated, the streams are not browsed.

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build object.
        """
        # Load rules
        rules = []
        dbrules = build.dbsession.query(DBInputRule).order_by(
            desc('priority'))
        for dbrule in dbrules:
            rule_type = dbrule.rule_type
            if rule_type not in self._registry['input_rules']:
                build.error(
                    _('Unknown type of rule "${r}"', {'r': rule_type}))
                continue
            rule_class = self._registry['input_rules'][rule_type]
            rules.append(
                rule_class(self._registry, self.get_warehouse, dbrule))
        if not rules:
            return

        # Loop over input streams
        dbstreams = build.dbsession.query(DBInputStream).order_by(
            'stream_id')
        for dbstream in dbstreams:
            stream_type = dbstream.stream_type
            if stream_type not in self._registry['input_streams']:
                build.error(
                    _('Unknown type of stream "${s}".', {'s': stream_type}))
                continue

            stream = self._registry['input_streams'][stream_type]
            for document in stream.documents(build, dbstream):
                for rule in rules:
                    # An applied exclusive rule ends the processing of the
                    # document; otherwise the next rules are tried as well.
                    if rule.apply(build, document) and rule.exclusive:
                        break