# Source code for ciowarehouse2.inputs

"""A collection of input stream and input rules."""

from __future__ import annotations
from os import scandir
from os.path import join, exists, getmtime, normpath, relpath
from logging import getLogger
from datetime import datetime, timedelta
from time import time

from chrysalio.lib.utils import copy_content
from chrysalio.scripts import ScriptRegistry
from cioservice.lib.build import Build
from ..lib.i18n import _, translate
from ..lib.utils import EXCLUDED_FILES
from ..lib.ciopath import CioPath
from ..lib.warehouse import Warehouse
from ..models.dbinput import DBInputStream

LOG = getLogger(__name__)
ARCHIVE_DATETIME_FORMAT = '%y%m%d-%H%M%S'
ARCHIVE_CLEAN_FLAG = 'clean'
ARCHIVE_CLEAN_PERIOD = 3600 * 24


# =============================================================================
class InputStream:
    """Base class to manage input stream.

    :type  registry: chrysalio.scripts.ScriptRegistry
    :param registry:
        Application registry. The backend of its ``ciowarehouse2`` module is
        kept to index the warehouses modified by this stream.
    :param get_warehouse:
        Function to retrieve a warehouse. It is called with the current build
        and a warehouse ID and is expected to return the warehouse object or
        ``None`` if the warehouse is unknown.
    """

    # -------------------------------------------------------------------------
    def __init__(self, registry: ScriptRegistry, get_warehouse):
        """Constructor method."""
        self._get_warehouse = get_warehouse
        self._backend = registry['modules']['ciowarehouse2'].backend

    # -------------------------------------------------------------------------
    def documents(self, build: Build, dbstream: DBInputStream) -> list | tuple:
        """Retrieve documents.

        The base implementation finds nothing and returns an empty tuple.

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build object.
        :type  dbstream: .models.dbinput.DBInputStream
        :param dbstream:
            SqlAlchemy object to build this input stream.
        :rtype: list
        """
        # pylint: disable = unused-argument
        return ()

    # -------------------------------------------------------------------------
    def archive(
            self, build: Build, dbstream: DBInputStream
    ) -> tuple[Warehouse | None, CioPath | None]:
        """Return the warehouse object for archive and the `CioPath` to the
        archive directory.

        The archive directory is a sub-directory of ``dbstream.archive_path``
        (or of the warehouse root) named after the current date and time.

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build object.
        :type  dbstream: .models.dbinput.DBInputStream
        :param dbstream:
            SqlAlchemy object to build this input stream.
        :rtype: tuple
        :return:
            A tuple such as ``(archive_warehouse, archive_ciopath)`` or
            ``(None, None)`` if the archive warehouse is unknown.
        """
        archive_wrh = self._get_warehouse(build, dbstream.archive_id)
        if archive_wrh is None:
            build.error(
                _('Unknown warehouse "${w}"', {'w': dbstream.archive_id}))
            return None, None

        timestamp = datetime.now().strftime(ARCHIVE_DATETIME_FORMAT)
        archive_path = normpath(join(dbstream.archive_path or '.', timestamp))
        return archive_wrh, CioPath(archive_wrh.uid, archive_path, True)

    # -------------------------------------------------------------------------
    def archive_files(
            self,
            build: Build,
            dbstream: DBInputStream,
            archive_wrh: Warehouse,
            archive_absdir: str,
            input_absdir: str | None = None) -> None:
        """Save a copy of ``input_absdir`` into ``archive_absdir``, add
        ``archive_absdir`` to the repository, commit the action and refresh
        the archive warehouse.

        Errors are reported on the build; none of them stops the process.

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build object.
        :type  dbstream: .models.dbinput.DBInputStream
        :param dbstream:
            SqlAlchemy object to build this input stream.
        :type  archive_wrh: .lib.warehouse.Warehouse
        :param archive_wrh:
            Warehouse object for archive.
        :param str archive_absdir:
            Absolute path to the directory for these files in the archives.
        :param str input_absdir: (optional)
            Absolute path to the directory to copy.
        """
        # Possibly copy input directory into archive directory
        if input_absdir is not None and exists(input_absdir):
            copy_content(input_absdir, archive_absdir, exclude=EXCLUDED_FILES)

        # Commit archive
        archive_ciopath = CioPath(
            archive_wrh.uid, relpath(archive_absdir, archive_wrh.root), True)
        err = archive_wrh.vcs.add()
        if err is not None:  # pragma: nocover
            build.error(err)
        err = archive_wrh.vcs.commit(
            translate(_('Automatic archiving'), lang=build.lang),
            str(dbstream.stream_id))
        if err is not None:  # pragma: nocover
            build.error(err)
        else:
            build.info(translate(  # yapf: disable
                _('Archive "${a}" created.', {'a': str(archive_ciopath)}),
                lang=build.lang))

        # Refresh archive
        err = self._backend.index(archive_wrh, [archive_ciopath.path])
        if err is not None:  # pragma: nocover
            build.error(err)

    # -------------------------------------------------------------------------
    def archive_clean(
            self,
            build: Build,
            dbstream: DBInputStream,
            archive_wrh: Warehouse,
            archive_ciopath: CioPath) -> None:
        """Loop over archives and remove old ones.

        Nothing is done if the stream has no time to live for its archives,
        if the archive directory does not exist or if the last cleaning is
        too recent.

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build object.
        :type  dbstream: .models.dbinput.DBInputStream
        :param dbstream:
            SqlAlchemy object to build this input stream.
        :type  archive_wrh: .lib.warehouse.Warehouse
        :param archive_wrh:
            Warehouse object for archive.
        :type  archive_ciopath: .lib.ciopath.CioPath
        :param archive_ciopath:
            `CioPath` to the archive directory.
        """
        # No time to live: archives are kept forever
        if not dbstream.archive_ttl:
            return
        ttl_hours = int(dbstream.archive_ttl)

        archive_absdir = archive_ciopath.absolute_path(archive_wrh.root) \
            or archive_wrh.root
        if not exists(archive_absdir):
            return

        # The modification time of the flag file is the date of the last
        # cleaning: do not clean more often than once per period
        clean_flag = join(archive_absdir, ARCHIVE_CLEAN_FLAG)
        period = min(ARCHIVE_CLEAN_PERIOD, 3600 * ttl_hours)
        if exists(clean_flag) and getmtime(clean_flag) > time() - period:
            return

        # Archive directories are named with ARCHIVE_DATETIME_FORMAT so a
        # string comparison with the deadline selects the expired ones.
        deadline = (datetime.now() - timedelta(hours=ttl_hours)).strftime(
            ARCHIVE_DATETIME_FORMAT)
        # Take a snapshot so the directory is neither modified while it is
        # scanned nor left with an open iterator.
        with scandir(archive_absdir) as entries:
            expired = [
                entry.path for entry in entries
                if entry.name not in EXCLUDED_FILES and entry.is_dir()
                and entry.name < deadline]

        for path in expired:
            subdir_ciopath = CioPath(
                archive_wrh.uid, relpath(path, archive_wrh.root), True)
            err = archive_wrh.vcs.remove(subdir_ciopath)
            if err is not None:  # pragma: nocover
                build.error(err)
            else:
                build.info(translate(_(  # yapf: disable
                    'Archive "${a}" cleaned.',
                    {'a': str(subdir_ciopath)}), lang=build.lang))

        # Touch the flag to record the date of this cleaning
        with open(clean_flag, 'wb'):
            pass
        if not expired:
            return

        err = archive_wrh.vcs.add(archive_ciopath)
        if err is not None:  # pragma: nocover
            build.error(err)
        err = archive_wrh.vcs.commit(
            translate(_('Automatic archive cleaning up'), lang=build.lang),
            str(dbstream.stream_id))
        if err is not None:  # pragma: nocover
            build.error(err)
        err = self._backend.index(archive_wrh, [archive_ciopath.path])
        if err is not None:  # pragma: nocover
            build.error(err)

    # -------------------------------------------------------------------------
    def empty_input_directory(
            self,
            build: Build,
            dbstream: DBInputStream,
            input_wrh: Warehouse,
            input_absdir: str) -> None:
        """Empty an input directory.

        Every entry of the directory, except the excluded files, is removed
        then the change is committed and the directory is indexed again.
        Errors are reported on the build.

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build object.
        :type  dbstream: .models.dbinput.DBInputStream
        :param dbstream:
            SqlAlchemy object to build this input stream.
        :type  input_wrh: .lib.warehouse.Warehouse
        :param input_wrh:
            Object for the warehouse containing the directory.
        :param str input_absdir:
            Absolute path to the directory.
        """
        # Take a snapshot so the directory is neither modified while it is
        # scanned nor left with an open iterator.
        with scandir(input_absdir) as entries:
            ciopaths = [
                CioPath(
                    input_wrh.uid, relpath(entry.path, input_wrh.root),
                    entry.is_dir())
                for entry in entries if entry.name not in EXCLUDED_FILES]

        for ciopath in ciopaths:
            err = input_wrh.vcs.remove(ciopath)
            if err is not None:  # pragma: nocover
                build.error(err)

        input_ciopath = CioPath(
            input_wrh.uid, relpath(input_absdir, input_wrh.root), True)
        err = input_wrh.vcs.add(input_ciopath)
        if err is not None:  # pragma: nocover
            build.error(err)
        err = input_wrh.vcs.commit(
            translate(_('Automatic cleaning up'), lang=build.lang),
            str(dbstream.stream_id))
        if err is not None:  # pragma: nocover
            build.error(err)
        err = self._backend.index(input_wrh, [input_ciopath.path])
        if err is not None:  # pragma: nocover
            build.error(err)