Improve IMAPMockServer

Fix failing IMAP source tests.

Signed-off-by: Roberto Rosario <roberto.rosario@mayan-edms.com>
This commit is contained in:
Roberto Rosario
2019-10-24 19:29:36 -04:00
parent 49b04acda0
commit cebd43d8c7
2 changed files with 129 additions and 36 deletions

View File

@@ -1,64 +1,133 @@
from __future__ import unicode_literals
from django.utils.encoding import force_text
from .literals import TEST_EMAIL_BASE64_FILENAME
class MockIMAPMessage(object):
flags = []
uid = None
def __init__(self, uid):
self.flags = []
self.mailbox = None
self.uid = uid
def __init__(self):
self.uid = '999'
def delete(self):
self.mailbox.messages.pop(self.uid)
def flags_add(self, flags_string):
for flag in flags_string.split():
if flag in self.flags:
self.flags.remove(flag)
def flags_remove(self, flags_string):
for flag in flags_string.split():
if flag not in self.flags:
self.flags.append(flag)
def flags_set(self, flags_string):
self.flags = flags_string.split()
def get_flags(self):
return ' '.join(self.flags)
def get_number(self):
return list(self.mailbox.messages.values()).index(self)
class MockIMAPMailbox(object):
messages = []
messages = {}
def __init__(self, name='INBOX'):
self.name = name
def get_message_by_number(self, message_number):
return list(self.messages.values())[message_number - 1]
def get_message_by_uid(self, uid):
return self.messages[uid]
def get_message_count(self):
return len(self.messages)
def get_messages(self):
return list(self.messages.values())
def messages_add(self, uid):
self.messages[uid] = MockIMAPMessage(uid=uid)
self.messages[uid].mailbox = self
class MockIMAPServer(object):
def __init__(self):
self.mailboxes = {
'INBOX': MockIMAPMailbox(name='INBOX')
}
self.mailboxes['INBOX'].messages.append(MockIMAPMessage())
self.mailboxes['INBOX'].messages_add(uid='999')
self.mailbox_selected = None
def _fetch(self, messages):
flag = '\\Seen'
flag_modified = []
message_numbers = []
results = []
uids = []
for message in messages:
if flag not in message.flags:
message.flags_add(flag)
flag_modified.append(message)
message_number = message.get_number()
message_numbers.append(force_text(message_number))
uid = message.uid
uids.append(uid)
body = TEST_EMAIL_BASE64_FILENAME
results.append(
(
'{} (UID {} RFC822 {{{}}}'.format(message_number, uid, len(body)),
body,
)
)
results.append(
' FLAGS ({}))'.format(flag),
)
results.append(
'{} (UID {} FLAGS ({}))'.format(
' '.join(message_numbers), ' '.join(uids), flag
)
)
return results
def close(self):
return ('OK', ['Returned to authenticated state. (Success)'])
def expunge(self):
result = []
for message in self.mailbox_selected.messages:
for message in self.mailbox_selected.get_messages():
if '\\Deleted' in message.flags:
result.append(
force_text(
self.mailbox_selected.messages.index(message)
message.get_number()
)
)
self.mailbox_selected.messages.remove(message)
message.delete()
return ('OK', ' '.join(result))
def fetch(self, message_set, message_parts):
results = []
for message_number in message_set.split():
message = self.mailbox_selected.messages[int(message_number) - 1]
if '\\Seen' not in message.flags:
message.flags.append('\\Seen')
messages = []
results.append(
(
'{} (RFC822 {{4800}}'.format(message_number),
TEST_EMAIL_BASE64_FILENAME,
' FLAGS ({}))'.format(message.get_flags())
for message_number in message_set.split():
messages.append(
self.mailbox_selected.get_message_by_number(
message_number=int(message_number)
)
)
return ('OK', results)
return ('OK', self._fetch(messages=messages))
def login(self, user, password):
return ('OK', ['{} authenticated (Success)'.format(user)])
@@ -83,12 +152,8 @@ class MockIMAPServer(object):
]
message_sequences = []
for result in results:
message_sequences.append(
force_text(
self.mailbox_selected.messages.index(result)
)
)
for message in results:
message_sequences.append(force_text(message.get_number()))
return ('OK', ' '.join(message_sequences))
@@ -97,7 +162,7 @@ class MockIMAPServer(object):
return (
'OK', [
len(self.mailbox_selected.messages)
self.mailbox_selected.get_message_count()
]
)
@@ -108,18 +173,46 @@ class MockIMAPServer(object):
message = self.mailbox_selected.messages[int(message_number) - 1]
if command == 'FLAGS':
message.flags = flags.split()
message.flags_set(flags_string=flags)
elif command == '+FLAGS':
for flag in flags.split():
if flag not in message.flags:
message.flags.append(flag)
message.flags_add(flags_string=flags)
elif command == '-FLAGS':
for flag in flags.split():
if flag in message.flags:
message.flags.remove(flag)
message.flags_remove(flags_string=flags)
results.append(
'{} (FLAGS ({}))'.format(message_number, message.get_flags())
)
return ('OK', results)
def uid(self, command, *args):
if command == 'FETCH':
uid = args[0]
messages = [self.mailbox_selected.get_message_by_uid(uid=uid)]
return ('OK', self._fetch(messages=messages))
elif command == 'STORE':
results = []
uid = args[0]
subcommand = args[1]
flags = args[2]
message = self.mailbox_selected.get_message_by_uid(uid=uid)
if subcommand == 'FLAGS':
message.flags_set(flags_string=flags)
elif subcommand == '+FLAGS':
message.flags_add(flags_string=flags)
elif subcommand == '-FLAGS':
message.flags_remove(flags_string=flags)
results.append(
'{} (FLAGS ({}))'.format(uid, message.get_flags())
)
return ('OK', results)
elif command == 'SEARCH':
message_sequences = [
self.mailbox_selected.get_message_by_number(
message_number=1
).uid
]
return ('OK', [' '.join(message_sequences)])

View File

@@ -309,8 +309,8 @@ class POP3SourceTestCase(GenericDocumentTestCase):
def test_download_document(self, mock_poplib):
mock_poplib.return_value = POP3SourceTestCase.MockMailbox()
self.source = POP3Email.objects.create(
document_type=self.test_document_type, label='', host='', password='',
username=''
document_type=self.test_document_type, label='', host='',
password='', username=''
)
self.source.check_source()