Source code for ciowarehouse2.inputs.stream_file
"""Class for file input stream."""
from __future__ import annotations
from os import walk
from os.path import join, relpath, dirname, normpath
from datetime import datetime
from cioservice.lib.build import Build
from ..lib.utils import EXCLUDED_FILES
from ..models.dbinput import DBInputStream
from . import InputStream
# =============================================================================
[docs]
class InputStreamFile(InputStream):
    """Class to manage file input stream.

    See: :class:`.inputs.InputStream`
    """

    # -------------------------------------------------------------------------
    def documents(self, build: Build, dbstream: DBInputStream) -> list | tuple:
        """Retrieve documents.

        Walk the stream's input directory inside the input warehouse, build
        one document descriptor per file found, archive the files and,
        unless the build runs in develop mode, empty the input directory.

        See: :meth:`.inputs.InputStream.documents`

        :param build:
            Current build. ``build.settings['develop'] == 'true'`` keeps the
            input directory untouched after archiving.
        :param dbstream:
            Stream description: ``host`` names the input warehouse, ``path``
            the sub-directory to scan (warehouse root when empty), and
            ``stream_id`` / ``stream_type`` are copied into each document.
        :return:
            A list of document dictionaries (empty if no file was found), or
            an empty tuple when the input warehouse or the archive location
            is unavailable, or when ``dbstream.path`` resolves outside the
            input warehouse root.
        """
        # Check input
        input_wrh = self._get_warehouse(build, dbstream.host)
        if input_wrh is None:
            return ()

        # Prepare archive
        archive_wrh, archive_ciopath = self.archive(build, dbstream)
        if archive_wrh is None or archive_ciopath is None:
            return ()
        self.archive_clean(build, dbstream, archive_wrh, archive_ciopath)

        # Resolve the input directory and refuse anything (e.g. "../x" or an
        # absolute path) that escapes the warehouse root
        input_absdir = normpath(join(input_wrh.root, dbstream.path or '.'))
        if not self._is_within(input_wrh.root, input_absdir):
            return ()
        archive_absdir = archive_ciopath.absolute_path(archive_wrh.root) \
            or archive_wrh.root

        # Loop over documents
        documents = []
        for path, dirs, files in walk(input_absdir):
            # Prune in place: os.walk (top-down) reads this very list to
            # decide where to descend. Calling dirs.remove() while iterating
            # over dirs would skip the entry following each removal.
            dirs[:] = [name for name in dirs if name not in EXCLUDED_FILES]
            for name in files:
                if name in EXCLUDED_FILES:
                    continue
                source = join(path, name)
                relative = relpath(source, input_absdir)
                documents.append({  # yapf: disable
                    'stream-id': dbstream.stream_id,
                    'stream-type': dbstream.stream_type,
                    'from': dbstream.host,
                    'to': dbstream.stream_id,
                    'datetime': datetime.now().isoformat(' '),
                    'filename': name,
                    'dirname': dirname(relative),
                    'path': relative,
                    'source': source})

        # Archive
        if documents:
            self.archive_files(
                build, dbstream, archive_wrh, archive_absdir, input_absdir)
            # Clean up input directory
            if build.settings.get('develop') != 'true':
                self.empty_input_directory(
                    build, dbstream, input_wrh, input_absdir)

        return documents

    # -------------------------------------------------------------------------
    @staticmethod
    def _is_within(root: str, path: str) -> bool:
        """Return ``True`` if ``path`` is ``root`` itself or lies below it.

        Both paths are normalized first. A plain ``startswith(root)`` would
        also accept sibling directories sharing a name prefix (``/data/wh2``
        for root ``/data/wh``). It would also reject ``root`` itself when
        ``root`` carries a trailing separator. So the prefix is compared
        with a separator appended, as ``join(root, '')`` does.

        :param root:
            Directory that must contain ``path``.
        :param path:
            Path to check.
        :rtype: bool
        """
        root = normpath(root)
        path = normpath(path)
        return path == root or path.startswith(join(root, ''))