# Source code for ciowarehouse2.inputs.stream_email

"""Class for email input stream."""

from __future__ import annotations
from os import makedirs
from os.path import join, exists
from imaplib import IMAP4, IMAP4_SSL
from email import message_from_string
from email.utils import parsedate_tz, mktime_tz
from email.header import decode_header
from email.message import Message
from mimetypes import guess_extension
from datetime import datetime
from socket import gaierror

from chrysalio.lib.utils import decrypt
from cioservice.lib.build import Build
from ..lib.i18n import _, translate
from ..lib.utils import normalize_filename
from ..models.dbinput import DBInputStream
from . import InputStream

# ``strftime`` pattern (two-digit year, month, day, hour, minute, second)
# applied to the message date when building the identifier under which a
# message is archived.
MAIL_DATETIME_FORMAT = '%y%m%d%H%M%S'


# =============================================================================
class InputStreamEmail(InputStream):
    """Class to manage email input stream.

    Messages are fetched from an IMAP mailbox; each message body and its
    attachments are written into the archive directory of the stream.

    See: :class:`.inputs.InputStream`
    """

    # -------------------------------------------------------------------------
    def documents(self, build: Build, dbstream: DBInputStream) -> list | tuple:
        """Retrieve documents.

        Every message of the mailbox is archived on disk, flagged as deleted
        (unless the ``develop`` setting is ``'true'``) and described by a
        dictionary.

        See: :meth:`.inputs.InputStream.documents`

        :type  build: cioservice.lib.build.Build
        :param build:
            Current build, used for settings, language and error reporting.
        :type  dbstream: .models.dbinput.DBInputStream
        :param dbstream:
            Description of the stream (host, port, ssl, user, password,
            path).
        :rtype: list or tuple
        :return:
            List of document dictionaries or an empty tuple when nothing
            could be retrieved.
        """
        # Prepare archive
        archive_wrh, archive_ciopath = self.archive(build, dbstream)
        if archive_wrh is None or archive_ciopath is None:
            return ()
        self.archive_clean(build, dbstream, archive_wrh, archive_ciopath)

        # Connect to the IMAP server
        imap = None
        try:
            imap = IMAP4_SSL(str(dbstream.host), int(dbstream.port or 993)) \
                if dbstream.ssl \
                else IMAP4(str(dbstream.host), int(dbstream.port or 143))
            if dbstream.user:
                imap.login(
                    str(dbstream.user),
                    decrypt(dbstream.password, 'warehouse'))
        except (gaierror, IMAP4.error, OSError) as error:
            if imap is not None:
                # The connection was opened but the login failed: release
                # the socket. The original error is the one worth reporting,
                # so a failure of this best-effort logout is ignored.
                try:
                    imap.logout()
                except (IMAP4.error, OSError):
                    pass
            build.error(error)
            return ()

        documents = []
        try:
            # Select mailbox
            status, nums = imap.select(str(dbstream.path or 'INBOX'))
            if status != 'OK':
                build.error(translate(_(  # yapf: disable
                    '${i}: unable to check mailbox "${m}"', {
                        'i': dbstream.stream_id,
                        'm': dbstream.path or 'INBOX'
                    }), lang=build.lang))
                return ()
            if nums[0] is None or not int(nums[0]):
                return ()

            # Retrieve message numbers
            status, nums = imap.search(None, 'ALL')
            if status != 'OK' or nums[0] is None:  # pragma: nocover
                return ()

            # Loop over messages
            archive_absdir = archive_ciopath.absolute_path(archive_wrh.root) \
                or archive_wrh.root
            for message_num in nums[0].split():
                number = message_num.decode('utf8')
                data = imap.fetch(number, '(RFC822)')[1][0]
                if data is None:  # pragma: nocover
                    continue
                # NOTE(review): a message containing bytes which are not
                # valid UTF-8 raises UnicodeDecodeError here and stops the
                # retrieval; parsing the raw bytes would avoid it but alters
                # the archived payload, so it is left to a dedicated change.
                text = data[1].decode('utf8') \
                    if isinstance(data[1], bytes) else str(data[1])
                document = self._archive_message(
                    message_from_string(text), int(message_num),
                    archive_absdir)
                if build.settings.get('develop') != 'true':
                    imap.store(number, '+FLAGS', '\\Deleted')
                if document:
                    document['stream-id'] = dbstream.stream_id
                    document['stream-type'] = dbstream.stream_type
                    documents.append(document)
            imap.expunge()
            imap.close()
        finally:
            # Always leave the server, including on early return or when an
            # exception escapes the loop, so that the connection never leaks.
            imap.logout()

        # Archive
        if documents:
            self.archive_files(build, dbstream, archive_wrh, archive_absdir)
        return documents

    # -------------------------------------------------------------------------
    @classmethod
    def _archive_message(
            cls, message: Message, message_num: int,
            archive_absdir: str) -> dict:
        """Archive a message.

        Attachments (parts with a ``Content-Disposition`` header) are saved
        in a sub-directory named after the message; the body (HTML part if
        any, otherwise the first part without disposition) is saved as
        ``<message_id>.html`` or ``<message_id>.txt``.

        :type  message: email.message.Message
        :param message:
            Current message
        :param int message_num:
            Message number.
        :param str archive_absdir:
            Absolute path to the directory for these files in the archives.
        :rtype: dict
        :return:
            Metadata of the message (``from``, ``to``, ``title``,
            ``organization``, ``language``, ``datetime``, ``dirname``) plus
            ``filename``, ``path``, ``source`` and ``body`` when files have
            been written.
        """
        # Metadata
        date = cls._message_date(message)
        subject = cls._decode_value(message['Subject'])
        message_id = '{0}-{1}{2:0>3}'.format(
            normalize_filename(subject, mode='strict')[:32],
            date.strftime(MAIL_DATETIME_FORMAT), message_num)
        document = {
            'from': cls._decode_value(message['From']),
            'to': cls._decode_value(message['To']),
            'title': subject,
            'organization': cls._decode_value(message['Organization']),
            'language': message['Content-Language'],
            'datetime': date.isoformat(' '),
        }

        # Attachments
        bodies = []
        document['dirname'] = '.'
        for part_num, part in enumerate(message.walk()):
            # Containers (multipart/*, message/rfc822...) carry no payload
            # of their own: get_payload(decode=True) returns None for them.
            # Their leaves are visited later by walk().
            if part.get_content_maintype() == 'multipart' \
                    or part.is_multipart():
                continue
            if part.get('Content-Disposition') is None:
                bodies.append(part)
                continue
            if document['dirname'] == '.':
                document['dirname'] = message_id
                makedirs(
                    join(archive_absdir, document['dirname']), exist_ok=True)
            document['filename'] = cls._safe_filename(part.get_filename())
            if not document['filename']:  # pragma: nocover
                document['filename'] = '{0}-{1:0>3}{2}'.format(
                    message_id, part_num,
                    guess_extension(part.get_content_type()) or '.bin')
            document['path'] = join(document['dirname'], document['filename'])
            with open(join(archive_absdir, document['path']), 'wb') as hdl:
                hdl.write(bytes(part.get_payload(decode=True)))

        # Body: the first HTML part wins, otherwise the first body part
        if bodies and not exists(archive_absdir):
            makedirs(archive_absdir, exist_ok=True)  # pragma: nocover
        body = next(
            (part for part in bodies
             if part.get_content_type() == 'text/html'), None)
        extension = 'html'
        if body is None and bodies:
            body, extension = bodies[0], 'txt'
        if body is not None:
            payload = body.get_payload(decode=True)
            document['filename'] = '{0}.{1}'.format(message_id, extension)
            document['path'] = join(document['dirname'], document['filename'])
            with open(join(archive_absdir, document['path']), 'wb') as hdl:
                hdl.write(bytes(payload))
            # With attachments, the source is the whole message directory
            document['source'] = join(archive_absdir, document['path']) \
                if document['dirname'] == '.' \
                else join(archive_absdir, document['dirname'])
            document['body'] = payload

        return document

    # -------------------------------------------------------------------------
    @staticmethod
    def _safe_filename(filename: str | None) -> str:
        """Return an attachment file name without any directory component.

        The name comes from the e-mail itself and must never be able to
        point outside the archive directory.

        :param str filename:
            File name announced by the message part (possibly ``None``).
        :rtype: str
        :return:
            Last path component of the name or an empty string if nothing
            usable remains.
        """
        if not filename:
            return ''
        name = filename.replace('\x00', '').replace('\\', '/')
        name = name.rsplit('/', 1)[-1]
        return '' if name in ('.', '..') else name

    # -------------------------------------------------------------------------
    @classmethod
    def _message_date(cls, message: Message) -> datetime:
        """Return the date of the message.

        :type  message: email.message.Message
        :param message:
            Current message
        :rtype: datetime.datetime
        :return:
            Date read from the ``Date`` header as a naive local date or the
            current date when the header is missing, unreadable or out of
            range.
        """
        date = None
        if message['Date']:
            date_tz = parsedate_tz(str(message['Date']))
            if date_tz:
                try:
                    date = datetime.fromtimestamp(mktime_tz(date_tz))
                except (ValueError, OverflowError, OSError):
                    # Syntactically valid but out of range date
                    date = None
        return date or datetime.now()

    # -------------------------------------------------------------------------
    @classmethod
    def _decode_value(cls, encoded: str | None) -> str:
        """A decoded value.

        Every fragment returned by :func:`email.header.decode_header` is
        decoded and the fragments are concatenated.

        :param str encoded:
            Encoded value (possibly ``None``).
        :rtype: str
        """
        if not encoded:
            return ''
        fragments = []
        for value, charset in decode_header(encoded):
            if isinstance(value, str):
                fragments.append(value)
                continue
            try:
                # Fragments without charset are the unencoded pieces of the
                # header; latin-1 maps each byte to one character and thus
                # never fails.
                fragments.append(
                    value.decode(charset or 'latin-1', errors='replace'))
            except LookupError:
                # Charset unknown to Python
                fragments.append(value.decode('utf-8', errors='replace'))
        return ''.join(fragments)