# Source code for ciowarehouse2.inputs.stream_ftp
"""Class for FTP input stream."""
from __future__ import annotations
from os import walk
from os.path import join, dirname, exists, relpath
from shutil import move
from datetime import datetime
from chrysalio.lib.utils import decrypt
from chrysalio.lib.ftp import Ftp
from cioservice.lib.build import Build
from ..models.dbinput import DBInputStream
from . import EXCLUDED_FILES, InputStream
# Name of the staging directory, created next to the archive directory, that
# receives the FTP download before it is moved onto the archive directory.
NEW_DIR = 'New'
# =============================================================================
class InputStreamFtp(InputStream):
    """Input stream that collects its documents from an FTP server.

    See: :class:`.inputs.InputStream`
    """

    # -------------------------------------------------------------------------
    def documents(self, build: Build, dbstream: DBInputStream) -> list | tuple:
        """Retrieve documents.

        The remote content is downloaded into a ``NEW_DIR`` directory located
        next to the archive directory, then moved onto the archive directory.
        Unless the ``develop`` setting is ``'true'``, every entry listed on
        the server is then removed with ``rmtree``/``delete``. Each file found
        below the archive directory becomes one document.

        See: :meth:`.inputs.InputStream.documents`

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build; provides the error callback and the settings.
        :type  dbstream: .models.dbinput.DBInputStream
        :param dbstream:
            Stream record holding the FTP connection parameters.
        :rtype: list | tuple
        :return:
            A list of document dictionaries (possibly empty), or an empty
            tuple when the archive, the connection, the download or the
            remote clean-up fails.
        """
        # Locate and tidy up the archive
        wrh, ciopath = self.archive(build, dbstream)
        if wrh is None or ciopath is None:
            return ()
        self.archive_clean(build, dbstream, wrh, ciopath)

        # Open the FTP session (passive mode, port 21 unless configured)
        # NOTE(review): the session is never closed explicitly in this method;
        # confirm whether ``Ftp`` offers a quit/close call that should be used.
        ftp = Ftp(build.error)
        connection = {
            'ftp_host': dbstream.host,
            'ftp_path': dbstream.path,
            'ftp_port': dbstream.port or 21,
            'ftp_ssl': dbstream.ssl,
            'ftp_user': dbstream.user,
            'ftp_password': decrypt(dbstream.password, 'warehouse'),
            'ftp_pasv': True}
        if not ftp.connect(connection):
            return ()

        # Fetch the remote tree into the staging directory, then promote it
        target = ciopath.absolute_path(wrh.root) or wrh.root
        staging = join(dirname(target), NEW_DIR)
        if not (ftp.download(staging, EXCLUDED_FILES) and exists(staging)):
            return ()
        # NOTE(review): ``shutil.move`` puts ``staging`` *inside* ``target``
        # when ``target`` already exists; presumably ``archive_clean`` makes
        # sure it does not — verify.
        move(staging, target)

        # Empty the remote directory, except in development mode; the first
        # failing removal aborts the whole retrieval
        subdirs, filenames = ftp.list_directory()
        if build.settings.get('develop') != 'true':
            purged = all(ftp.rmtree(entry) for entry in subdirs) \
                and all(ftp.delete(entry) for entry in filenames)
            if not purged:  # pragma: nocover
                return ()

        # One document per file found below the archive directory
        documents = []
        for folder, _subfolders, names in walk(target):
            for name in names:
                source = join(folder, name)
                path = relpath(source, target)
                documents.append({  # yapf: disable
                    'stream-id': dbstream.stream_id,
                    'stream-type': dbstream.stream_type,
                    'from': dbstream.host,
                    'to': dbstream.stream_id,
                    'datetime': datetime.now().isoformat(' '),
                    'filename': name,
                    'dirname': dirname(path),
                    'path': path,
                    'source': source})

        # Archive only when something was actually retrieved
        if not documents:
            return documents
        self.archive_files(build, dbstream, wrh, target)
        return documents