Source code for ciowarehouse2.inputs.stream_ftp

"""Class for FTP input stream."""

from __future__ import annotations
from os import walk
from os.path import join, dirname, exists, relpath
from shutil import move
from datetime import datetime

from chrysalio.lib.utils import decrypt
from chrysalio.lib.ftp import Ftp
from cioservice.lib.build import Build
from ..models.dbinput import DBInputStream
from . import EXCLUDED_FILES, InputStream

NEW_DIR = 'New'


# =============================================================================
class InputStreamFtp(InputStream):
    """Input stream which fetches its documents from an FTP server.

    See: :class:`.inputs.InputStream`
    """

    # -------------------------------------------------------------------------
    def documents(self, build: Build, dbstream: DBInputStream) -> list | tuple:
        """Download the content of the FTP stream and describe each file.

        The method proceeds in five steps: prepare the archive, connect to
        the FTP server, download the remote content and move it to the
        archive location, remove the remote content (unless the ``develop``
        build setting is ``'true'``) and finally build one dictionary per
        file found under the archive directory.

        See: :meth:`.inputs.InputStream.documents`

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build; its ``error`` attribute is handed to the FTP
            client and its ``settings`` are read for the ``develop`` flag.
        :type  dbstream: .models.dbinput.DBInputStream
        :param dbstream:
            Stream definition providing host, path, port, SSL flag, user,
            encrypted password, stream ID and stream type.
        :rtype: :class:`list` or :class:`tuple`
        :return:
            A list of document dictionaries (possibly empty), or an empty
            tuple if the archive is unavailable, the connection or the
            download fails, or a remote deletion fails.
        """
        # Prepare archive
        archive_wrh, archive_ciopath = self.archive(build, dbstream)
        if archive_wrh is None or archive_ciopath is None:
            return ()
        self.archive_clean(build, dbstream, archive_wrh, archive_ciopath)

        # Ftp connection
        ftp = Ftp(build.error)
        connection = {
            'ftp_host': dbstream.host,
            'ftp_path': dbstream.path,
            # Fall back to the standard FTP port when none is configured
            'ftp_port': dbstream.port or 21,
            'ftp_ssl': dbstream.ssl,
            'ftp_user': dbstream.user,
            'ftp_password': decrypt(dbstream.password, 'warehouse'),
            'ftp_pasv': True,
        }
        if not ftp.connect(connection):
            return ()

        # Retrieve files: download beside the archive directory, then move
        archive_absdir = archive_ciopath.absolute_path(archive_wrh.root)
        if not archive_absdir:
            archive_absdir = archive_wrh.root
        incoming = join(dirname(archive_absdir), NEW_DIR)
        if not (ftp.download(incoming, EXCLUDED_FILES) and exists(incoming)):
            return ()
        # NOTE(review): shutil.move() nests the source inside the destination
        # when the destination is an existing directory -- confirm that
        # archive_clean() leaves the expected state here.
        move(incoming, archive_absdir)

        # Clean up the remote side, directories first then files; the
        # listing is requested even in develop mode, as before
        remote_dirs, remote_files = ftp.list_directory()
        if build.settings.get('develop') != 'true':
            if not all(  # pragma: nocover
                    ftp.rmtree(name) for name in remote_dirs):
                return ()
            if not all(  # pragma: nocover
                    ftp.delete(name) for name in remote_files):
                return ()

        # Register documents
        sources = (
            join(folder, filename)
            for folder, _subdirs, filenames in walk(archive_absdir)
            for filename in filenames)
        documents = []
        for source in sources:
            relative = relpath(source, archive_absdir)
            documents.append({  # yapf: disable
                'stream-id': dbstream.stream_id,
                'stream-type': dbstream.stream_type,
                'from': dbstream.host,
                'to': dbstream.stream_id,
                'datetime': datetime.now().isoformat(' '),
                'filename': source[len(dirname(source)):].lstrip('/\\')
                if False else relative[len(dirname(relative)):].lstrip('/\\')
                if False else _basename_of(source),
                'dirname': dirname(relative),
                'path': relative,
                'source': source})

        # Archive
        if documents:
            self.archive_files(build, dbstream, archive_wrh, archive_absdir)
        return documents


def _basename_of(path: str) -> str:
    """Return the last component of ``path`` (the file name)."""
    # ``path`` is always built with join(folder, filename), so splitting on
    # the directory part gives back exactly the original file name.
    folder = dirname(path)
    return relpath(path, folder) if folder else path