# Source code for ciowarehouse2.inputs
"""A collection of input stream and input rules."""
from __future__ import annotations
from os import scandir
from os.path import join, exists, getmtime, normpath, relpath
from logging import getLogger
from datetime import datetime, timedelta
from time import time
from chrysalio.lib.utils import copy_content
from chrysalio.scripts import ScriptRegistry
from cioservice.lib.build import Build
from ..lib.i18n import _, translate
from ..lib.utils import EXCLUDED_FILES
from ..lib.ciopath import CioPath
from ..lib.warehouse import Warehouse
from ..models.dbinput import DBInputStream
# Logger dedicated to this module.
LOG = getLogger(__name__)

# ``strftime`` pattern used by ``InputStream`` both to name a new archive
# sub-directory and to format the deadline that directory names are compared
# against during clean-up.
ARCHIVE_DATETIME_FORMAT = '%y%m%d-%H%M%S'

# Name of the flag file written in the archive directory after a clean-up
# pass; its modification time is read to throttle the next pass.
ARCHIVE_CLEAN_FLAG = 'clean'

# Upper bound, in seconds (24 hours), of the throttling interval: a clean-up
# pass is skipped while the flag file is younger than the smaller of this
# value and the stream's archive time-to-live.
ARCHIVE_CLEAN_PERIOD = 24 * 3600
# =============================================================================
class InputStream:
    """Base class to manage input stream.

    It provides the archiving and cleaning helpers shared by input streams:
    computing the archive location, archiving files, removing expired
    archives and emptying an input directory.

    :type  registry: chrysalio.scripts.ScriptRegistry
    :param registry:
        Application registry. ``registry['modules']['ciowarehouse2']`` must
        expose a ``backend`` attribute, which is used to index warehouses.
    :param get_warehouse:
        Function to retrieve a warehouse. It is called as
        ``get_warehouse(build, warehouse_id)`` and may return ``None``.
    """

    # -------------------------------------------------------------------------
    def __init__(self, registry: ScriptRegistry, get_warehouse):
        """Constructor method."""
        self._get_warehouse = get_warehouse
        self._backend = registry['modules']['ciowarehouse2'].backend

    # -------------------------------------------------------------------------
    def documents(self, build: Build, dbstream: DBInputStream) -> list | tuple:
        """Retrieve documents.

        This base implementation ignores its arguments and returns an empty
        tuple.

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build object.
        :type  dbstream: .models.dbinput.DBInputStream
        :param dbstream:
            SqlAlchemy object to build this input stream.
        :rtype: list
        """
        # pylint: disable = unused-argument
        return ()

    # -------------------------------------------------------------------------
    def archive(
        self, build: Build, dbstream: DBInputStream
    ) -> tuple[Warehouse | None, CioPath | None]:
        """Return the warehouse object for archive and the `CioPath` to the
        archive directory.

        The archive directory is a sub-directory of ``dbstream.archive_path``
        (or of the warehouse root when it is empty) named after the current
        local date and time formatted with ``ARCHIVE_DATETIME_FORMAT``.

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build object.
        :type  dbstream: .models.dbinput.DBInputStream
        :param dbstream:
            SqlAlchemy object to build this input stream.
        :rtype: tuple
        :return:
            A tuple such as ``(archive_warehouse, archive_ciopath)``, or
            ``(None, None)`` after reporting an error on ``build`` when the
            archive warehouse cannot be retrieved.
        """
        archive_wrh = self._get_warehouse(build, dbstream.archive_id)
        if archive_wrh is None:
            build.error(
                _('Unknown warehouse "${w}"', {'w': dbstream.archive_id}))
            return None, None

        archive_ciopath = CioPath(  # yapf: disable
            archive_wrh.uid,
            normpath(join(
                dbstream.archive_path or '.',
                datetime.now().strftime(ARCHIVE_DATETIME_FORMAT))),
            True)
        return archive_wrh, archive_ciopath

    # -------------------------------------------------------------------------
    def archive_files(
            self,
            build: Build,
            dbstream: DBInputStream,
            archive_wrh: Warehouse,
            archive_absdir: str,
            input_absdir: str | None = None):
        """Save a copy of ``input_absdir`` into ``archive_absdir``, add
        ``archive_absdir`` to the repository, commit the action and refresh
        the archive warehouse.

        Errors returned by the version control system or by the indexing
        backend are reported on ``build``; they do not interrupt the
        following steps.

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build object.
        :type  dbstream: .models.dbinput.DBInputStream
        :param dbstream:
            SqlAlchemy object to build this input stream.
        :type  archive_wrh: .lib.warehouse.Warehouse
        :param archive_wrh:
            Warehouse object for archive.
        :param str archive_absdir:
            Absolute path to the directory for these files in the archives.
        :param str input_absdir: (optional)
            Absolute path to the directory to copy. Nothing is copied when it
            is ``None`` or does not exist.
        """
        # Possibly copy input directory into archive directory
        if input_absdir is not None and exists(input_absdir):
            copy_content(input_absdir, archive_absdir, exclude=EXCLUDED_FILES)

        # Commit archive
        archive_ciopath = CioPath(
            archive_wrh.uid, relpath(archive_absdir, archive_wrh.root), True)
        # The result of ``add`` is checked like every other VCS call of this
        # class instead of being silently discarded.
        err = archive_wrh.vcs.add()
        if err is not None:  # pragma: nocover
            build.error(err)
        err = archive_wrh.vcs.commit(
            translate(_('Automatic archiving'), lang=build.lang),
            str(dbstream.stream_id))
        if err is not None:  # pragma: nocover
            build.error(err)
        else:
            build.info(translate(  # yapf: disable
                _('Archive "${a}" created.', {'a': str(archive_ciopath)}),
                lang=build.lang))

        # Refresh archive
        err = self._backend.index(archive_wrh, [archive_ciopath.path])
        if err is not None:  # pragma: nocover
            build.error(err)

    # -------------------------------------------------------------------------
    def archive_clean(
            self, build: Build, dbstream: DBInputStream,
            archive_wrh: Warehouse, archive_ciopath: CioPath):
        """Loop over archives and remove old ones.

        Nothing is done when the stream has no archive time-to-live, when the
        archive directory does not exist or when the ``ARCHIVE_CLEAN_FLAG``
        file of this directory is recent enough. Otherwise, each
        sub-directory whose name sorts, as a string, before the deadline
        formatted with ``ARCHIVE_DATETIME_FORMAT`` is removed, the flag file
        is refreshed and, if something was found, the change is committed
        and the archive re-indexed.

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build object.
        :type  dbstream: .models.dbinput.DBInputStream
        :param dbstream:
            SqlAlchemy object to build this input stream. Its ``archive_ttl``
            is used as a number of hours.
        :type  archive_wrh: .lib.warehouse.Warehouse
        :param archive_wrh:
            Warehouse object for archive.
        :type  archive_ciopath: .lib.ciopath.CioPath
        :param archive_ciopath:
            `CioPath` to the archive directory.
        """
        # Without a time-to-live, archives are kept forever
        if not dbstream.archive_ttl:
            return
        ttl_hours = int(dbstream.archive_ttl)

        archive_absdir = archive_ciopath.absolute_path(archive_wrh.root) \
            or archive_wrh.root
        if not exists(archive_absdir):
            return

        # Throttle: skip if the last clean-up is more recent than the period
        clean_flag = join(archive_absdir, ARCHIVE_CLEAN_FLAG)
        period = min(ARCHIVE_CLEAN_PERIOD, 3600 * ttl_hours)
        if exists(clean_flag) and getmtime(clean_flag) > time() - period:
            return

        deadline = (
            datetime.now() - timedelta(hours=ttl_hours)
        ).strftime(ARCHIVE_DATETIME_FORMAT)

        # Snapshot the expired directories before removing anything: the
        # content of a ``scandir`` iterator is unspecified when the directory
        # is modified during the iteration.
        with scandir(archive_absdir) as entries:
            expired = [
                entry.path for entry in entries
                if entry.name not in EXCLUDED_FILES and entry.is_dir()
                and entry.name < deadline]

        for subdir_abspath in expired:
            subdir_ciopath = CioPath(
                archive_wrh.uid, relpath(subdir_abspath, archive_wrh.root),
                True)
            err = archive_wrh.vcs.remove(subdir_ciopath)
            if err is not None:  # pragma: nocover
                build.error(err)
            else:
                build.info(translate(_(  # yapf: disable
                    'Archive "${a}" cleaned.',
                    {'a': str(subdir_ciopath)}), lang=build.lang))

        # Record the date of this clean-up
        with open(clean_flag, 'wb'):
            pass

        # A commit is attempted as soon as one removal was attempted
        if not expired:
            return

        err = archive_wrh.vcs.add(archive_ciopath)
        if err is not None:  # pragma: nocover
            build.error(err)
        err = archive_wrh.vcs.commit(
            translate(_('Automatic archive cleaning up'), lang=build.lang),
            str(dbstream.stream_id))
        if err is not None:  # pragma: nocover
            build.error(err)
        err = self._backend.index(archive_wrh, [archive_ciopath.path])
        if err is not None:  # pragma: nocover
            build.error(err)

    # -------------------------------------------------------------------------
    def empty_input_directory(
            self, build: Build, dbstream: DBInputStream, input_wrh: Warehouse,
            input_absdir: str):
        """Empty an input directory.

        Every entry of the directory whose name is not in ``EXCLUDED_FILES``
        is removed through the version control system of the warehouse, then
        the directory is added, the change committed and the directory
        re-indexed. Each error is reported on ``build``.

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build object.
        :type  dbstream: .models.dbinput.DBInputStream
        :param dbstream:
            SqlAlchemy object to build this input stream.
        :type  input_wrh: .lib.warehouse.Warehouse
        :param input_wrh:
            Object for the warehouse containing the directory.
        :param str input_absdir:
            Absolute path to the directory.
        """
        # Snapshot the listing first: removing entries while iterating over
        # ``scandir`` gives an unspecified result.
        with scandir(input_absdir) as entries:
            ciopaths = [
                CioPath(
                    input_wrh.uid, relpath(entry.path, input_wrh.root),
                    entry.is_dir())
                for entry in entries if entry.name not in EXCLUDED_FILES]

        for ciopath in ciopaths:
            # Report the failure instead of silently dropping it
            err = input_wrh.vcs.remove(ciopath)
            if err is not None:  # pragma: nocover
                build.error(err)

        input_ciopath = CioPath(
            input_wrh.uid, relpath(input_absdir, input_wrh.root), True)
        err = input_wrh.vcs.add(input_ciopath)
        if err is not None:  # pragma: nocover
            build.error(err)
        err = input_wrh.vcs.commit(
            translate(_('Automatic cleaning up'), lang=build.lang),
            str(dbstream.stream_id))
        if err is not None:  # pragma: nocover
            build.error(err)
        err = self._backend.index(input_wrh, [input_ciopath.path])
        if err is not None:  # pragma: nocover
            build.error(err)