Source code for ciowarehouse2.inputs.stream_email
"""Class for email input stream."""
from __future__ import annotations
from os import makedirs
from os.path import join, exists
from imaplib import IMAP4, IMAP4_SSL
from email import message_from_string
from email.utils import parsedate_tz, mktime_tz
from email.header import decode_header
from email.message import Message
from mimetypes import guess_extension
from datetime import datetime
from socket import gaierror
from chrysalio.lib.utils import decrypt
from cioservice.lib.build import Build
from ..lib.i18n import _, translate
from ..lib.utils import normalize_filename
from ..models.dbinput import DBInputStream
from . import InputStream
MAIL_DATETIME_FORMAT = '%y%m%d%H%M%S'
# =============================================================================
[docs]
class InputStreamEmail(InputStream):
"""Class to manage email input stream.
See: :class:`.inputs.InputStream`
"""
# -------------------------------------------------------------------------
[docs]
def documents(self, build: Build, dbstream: DBInputStream) -> list | tuple:
    """Retrieve documents from the IMAP mailbox described by the stream.

    Each message of the mailbox is fetched and archived with
    :meth:`_archive_message`.  Unless the ``develop`` setting of the build
    equals ``'true'``, the message is then flagged ``\\Deleted``.  Finally
    the mailbox is expunged and closed.  The IMAP connection is released
    in every case, including login failures and unexpected exceptions.

    See: :meth:`.inputs.InputStream.documents`

    :type  build: cioservice.lib.build.Build
    :param build:
        Current build object; errors are reported through it.
    :type  dbstream: .models.dbinput.DBInputStream
    :param dbstream:
        Stream definition (host, port, SSL flag, credentials, mailbox).
    :rtype: list or tuple
    :return:
        The list of archived documents, or an empty tuple when the
        archive, the connection or the mailbox is not usable or when
        the mailbox is empty.
    """
    # Prepare archive
    archive_wrh, archive_ciopath = self.archive(build, dbstream)
    if archive_wrh is None or archive_ciopath is None:
        return ()
    self.archive_clean(build, dbstream, archive_wrh, archive_ciopath)

    # Connect to the IMAP server
    try:
        imap = IMAP4_SSL(str(dbstream.host), int(dbstream.port or 993)) \
            if dbstream.ssl \
            else IMAP4(str(dbstream.host), int(dbstream.port or 143))
    except (gaierror, IMAP4.error, OSError) as error:
        build.error(error)
        return ()

    documents = []
    try:
        # Authenticate
        if dbstream.user:
            try:
                imap.login(
                    str(dbstream.user),
                    decrypt(dbstream.password, 'warehouse'))
            except (gaierror, IMAP4.error, OSError) as error:
                build.error(error)
                return ()

        # Select mailbox
        status, nums = imap.select(str(dbstream.path or 'INBOX'))
        if status != 'OK':
            build.error(translate(_(  # yapf: disable
                '${i}: unable to check mailbox "${m}"', {
                    'i': dbstream.stream_id,
                    'm': dbstream.path or 'INBOX'
                }), lang=build.lang))
            return ()
        if nums[0] is None or not int(nums[0]):
            return ()

        # Retrieve message numbers
        status, nums = imap.search(None, 'ALL')
        if status != 'OK' or nums[0] is None:  # pragma: nocover
            return ()

        # Loop over messages
        archive_absdir = archive_ciopath.absolute_path(archive_wrh.root) \
            or archive_wrh.root
        for message_num in nums[0].split():
            data = imap.fetch(message_num.decode('utf8'), '(RFC822)')[1][0]
            if data is None:  # pragma: nocover
                continue
            text = data[1].decode('utf8') \
                if isinstance(data[1], bytes) else str(data[1])
            document = self._archive_message(
                message_from_string(text), int(message_num),
                archive_absdir)
            # In develop mode, messages are kept on the server
            if build.settings.get('develop') != 'true':
                imap.store(
                    message_num.decode('utf8'), '+FLAGS', '\\Deleted')
            if document:
                document['stream-id'] = dbstream.stream_id
                document['stream-type'] = dbstream.stream_type
                documents.append(document)
        imap.expunge()
        imap.close()
    finally:
        # Always release the connection.  The logout is best effort: the
        # link may already be broken and must not hide the real outcome.
        try:
            imap.logout()
        except (IMAP4.error, OSError):
            pass

    # Archive
    if documents:
        self.archive_files(build, dbstream, archive_wrh, archive_absdir)
    return documents
# -------------------------------------------------------------------------
@classmethod
def _archive_message(
        cls, message: Message, message_num: int,
        archive_absdir: str) -> dict:
    """Archive a message and return the document describing it.

    Attachments (parts with a ``Content-Disposition`` header) are written
    in a sub-directory named after the message.  The body is written as a
    ``.html`` file if a ``text/html`` part exists, otherwise the first
    body part is written as a ``.txt`` file.

    :type  message: email.message.Message
    :param message:
        Current message.
    :param int message_num:
        Message number.
    :param str archive_absdir:
        Absolute path to the directory for these files in the archives.
    :rtype: dict
    :return:
        A dictionary with the keys ``from``, ``to``, ``title``,
        ``organization``, ``language``, ``datetime`` and ``dirname``,
        plus ``filename`` and ``path`` when a file has been written and
        ``source`` and ``body`` when the message has a body.
    """
    # Metadata
    date = cls._message_date(message)
    subject = cls._decode_value(message['Subject'])
    message_id = '{0}-{1}{2:0>3}'.format(
        normalize_filename(subject, mode='strict')[:32],
        date.strftime(MAIL_DATETIME_FORMAT), message_num)
    document = {
        'from': cls._decode_value(message['From']),
        'to': cls._decode_value(message['To']),
        'title': subject,
        'organization': cls._decode_value(message['Organization']),
        'language': message['Content-Language'],
        'datetime': date.isoformat(' '),
    }

    # Attachments
    bodies = []
    document['dirname'] = '.'
    for part_num, part in enumerate(message.walk()):
        if part.get_content_maintype() == 'multipart':
            continue
        if part.get('Content-Disposition') is None:
            bodies.append(part)
            continue
        if document['dirname'] == '.':
            document['dirname'] = message_id
            makedirs(
                join(archive_absdir, document['dirname']), exist_ok=True)
        # The name is chosen by the sender: keep only its last component
        # so that it cannot point outside the message directory.
        filename = (part.get_filename() or '').replace('\\', '/')
        filename = filename.rsplit('/', 1)[-1]
        if filename in ('', '.', '..'):  # pragma: nocover
            filename = '{0}-{1:0>3}{2}'.format(
                message_id, part_num,
                guess_extension(part.get_content_type()) or '.bin')
        document['filename'] = filename
        document['path'] = join(document['dirname'], filename)
        with open(join(archive_absdir, document['path']), 'wb') as hdl:
            # get_payload() returns None for container parts such as
            # message/rfc822
            hdl.write(part.get_payload(decode=True) or b'')

    # Body
    if not bodies:
        return document
    if not exists(archive_absdir):
        makedirs(archive_absdir, exist_ok=True)  # pragma: nocover
    # Prefer the HTML version, otherwise fall back on the first body part
    html_part = next(
        (part for part in bodies
         if part.get_content_type() == 'text/html'), None)
    if html_part is not None:
        body_part, extension = html_part, '.html'
    else:
        body_part, extension = bodies[0], '.txt'
    payload = body_part.get_payload(decode=True)
    document['filename'] = message_id + extension
    document['path'] = join(document['dirname'], document['filename'])
    with open(join(archive_absdir, document['path']), 'wb') as hdl:
        hdl.write(payload or b'')
    # Without attachment the source is the body file itself, otherwise
    # it is the directory containing the body and the attachments.
    document['source'] = join(archive_absdir, document['path']) \
        if document['dirname'] == '.' \
        else join(archive_absdir, document['dirname'])
    document['body'] = payload
    return document
# -------------------------------------------------------------------------
@classmethod
def _message_date(cls, message: Message) -> datetime:
"""Return the date of the message.
:type message: email.message.Message
:param message:
Current message
:rtype: datetime.datetime
"""
date = None
if message['Date']:
date_tz = parsedate_tz(message['Date'])
if date_tz:
date = datetime.fromtimestamp(mktime_tz(date_tz))
return date or datetime.now()
# -------------------------------------------------------------------------
@classmethod
def _decode_value(cls, encoded: str) -> str:
"""A decoded value.
:param str encoded:
Encoded value.
:rtype: str
"""
if not encoded:
return ''
decoded = decode_header(encoded)
if decoded[0][1]: # pragma: nocover
return decoded[0][0].decode(decoded[0][1])
return decoded[0][0]