Source code for ciowarehouse2.inputs.stream_file

"""Class for file input stream."""

from __future__ import annotations
from os import walk
from os.path import join, relpath, dirname, normpath
from datetime import datetime

from cioservice.lib.build import Build
from ..lib.utils import EXCLUDED_FILES
from ..models.dbinput import DBInputStream
from . import InputStream


# =============================================================================
class InputStreamFile(InputStream):
    """Class to manage file input stream.

    See: :class:`.inputs.InputStream`
    """

    # -------------------------------------------------------------------------
    def documents(self, build: Build, dbstream: DBInputStream) -> list | tuple:
        """Retrieve documents.

        Walk the input directory of the stream, build one document
        description per file (skipping names listed in ``EXCLUDED_FILES``),
        archive the input files and, unless the build runs in develop mode,
        empty the input directory.

        See: :meth:`.inputs.InputStream.documents`

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build object; ``build.settings['develop']`` set to
            ``'true'`` keeps the input directory untouched.
        :type  dbstream: .models.dbinput.DBInputStream
        :param dbstream:
            SqlAlchemy object describing the input stream (host warehouse,
            path inside it, stream ID and type).
        :rtype: list | tuple
        :return:
            A list of document dictionaries, or an empty tuple if the input
            warehouse is not found, the archive cannot be prepared, or the
            requested path resolves outside the input warehouse root.
        """
        # Check input
        input_wrh = self._get_warehouse(build, dbstream.host)
        if input_wrh is None:
            return ()

        # Prepare archive
        archive_wrh, archive_ciopath = self.archive(build, dbstream)
        if archive_wrh is None or archive_ciopath is None:
            return ()
        self.archive_clean(build, dbstream, archive_wrh, archive_ciopath)

        # Resolve the input directory and refuse anything outside the
        # warehouse root. A bare ``startswith(root)`` would also accept a
        # sibling directory sharing the same prefix (e.g. ``<root>2``), so
        # require either the root itself or ``root + separator`` as prefix.
        # NOTE(review): dbstream.path is presumably relative to the
        # warehouse root; an absolute path makes join() discard the root
        # and is then rejected here.
        input_root = normpath(input_wrh.root)
        input_absdir = normpath(join(input_root, dbstream.path or '.'))
        if input_absdir != input_root \
                and not input_absdir.startswith(join(input_root, '')):
            return ()
        archive_absdir = archive_ciopath.absolute_path(archive_wrh.root) \
            or archive_wrh.root

        # Loop over documents
        documents = []
        for path, dirs, files in walk(input_absdir):
            # Prune excluded directories in place so walk() does not descend
            # into them. Removing items from ``dirs`` while iterating over it
            # would skip the entry following each removal.
            dirs[:] = [name for name in dirs if name not in EXCLUDED_FILES]
            for name in files:
                if name in EXCLUDED_FILES:
                    continue
                source = join(path, name)
                relative = relpath(source, input_absdir)
                documents.append({  # yapf: disable
                    'stream-id': dbstream.stream_id,
                    'stream-type': dbstream.stream_type,
                    'from': dbstream.host,
                    'to': dbstream.stream_id,
                    'datetime': datetime.now().isoformat(' '),
                    'filename': name,
                    'dirname': dirname(relative),
                    'path': relative,
                    'source': source})

        # Archive
        if documents:
            self.archive_files(
                build, dbstream, archive_wrh, archive_absdir, input_absdir)

        # Clean up input directory (kept intact in develop mode)
        if build.settings.get('develop') != 'true':
            self.empty_input_directory(
                build, dbstream, input_wrh, input_absdir)

        return documents