diff --git a/mayan/apps/sources/tests/mocks.py b/mayan/apps/sources/tests/mocks.py index 10ee8f4a4e..b80fb664e0 100644 --- a/mayan/apps/sources/tests/mocks.py +++ b/mayan/apps/sources/tests/mocks.py @@ -1,64 +1,133 @@ from __future__ import unicode_literals +from django.utils.encoding import force_text + +from .literals import TEST_EMAIL_BASE64_FILENAME + class MockIMAPMessage(object): - flags = [] - uid = None + def __init__(self, uid): + self.flags = [] + self.mailbox = None + self.uid = uid - def __init__(self): - self.uid = '999' + def delete(self): + self.mailbox.messages.pop(self.uid) + + def flags_add(self, flags_string): + for flag in flags_string.split(): + if flag in self.flags: + self.flags.remove(flag) + + def flags_remove(self, flags_string): + for flag in flags_string.split(): + if flag not in self.flags: + self.flags.append(flag) + + def flags_set(self, flags_string): + self.flags = flags_string.split() def get_flags(self): return ' '.join(self.flags) + def get_number(self): + return list(self.mailbox.messages.values()).index(self) + class MockIMAPMailbox(object): - messages = [] + messages = {} def __init__(self, name='INBOX'): self.name = name + def get_message_by_number(self, message_number): + return list(self.messages.values())[message_number - 1] + + def get_message_by_uid(self, uid): + return self.messages[uid] + + def get_message_count(self): + return len(self.messages) + + def get_messages(self): + return list(self.messages.values()) + + def messages_add(self, uid): + self.messages[uid] = MockIMAPMessage(uid=uid) + self.messages[uid].mailbox = self + class MockIMAPServer(object): def __init__(self): self.mailboxes = { 'INBOX': MockIMAPMailbox(name='INBOX') } - self.mailboxes['INBOX'].messages.append(MockIMAPMessage()) + self.mailboxes['INBOX'].messages_add(uid='999') self.mailbox_selected = None + def _fetch(self, messages): + flag = '\\Seen' + flag_modified = [] + message_numbers = [] + results = [] + uids = [] + + for message in messages: + if flag not in message.flags: + message.flags_add(flag) + flag_modified.append(message) + + message_number = message.get_number() + message_numbers.append(force_text(message_number)) + uid = message.uid + uids.append(uid) + body = TEST_EMAIL_BASE64_FILENAME + + results.append( + ( + '{} (UID {} RFC822 {{{}}}'.format(message_number, uid, len(body)), + body, + ) + ) + + results.append( + ' FLAGS ({}))'.format(flag), + ) + results.append( + '{} (UID {} FLAGS ({}))'.format( + ' '.join(message_numbers), ' '.join(uids), flag + ) + ) + return results + def close(self): return ('OK', ['Returned to authenticated state. (Success)']) def expunge(self): result = [] - for message in self.mailbox_selected.messages: + for message in self.mailbox_selected.get_messages(): if '\\Deleted' in message.flags: result.append( force_text( - self.mailbox_selected.messages.index(message) + message.get_number() ) ) - self.mailbox_selected.messages.remove(message) + message.delete() return ('OK', ' '.join(result)) def fetch(self, message_set, message_parts): - results = [] - for message_number in message_set.split(): - message = self.mailbox_selected.messages[int(message_number) - 1] - if '\\Seen' not in message.flags: - message.flags.append('\\Seen') + messages = [] - results.append( - ( - '{} (RFC822 {{4800}}'.format(message_number), - TEST_EMAIL_BASE64_FILENAME, - ' FLAGS ({}))'.format(message.get_flags()) + for message_number in message_set.split(): + messages.append( + self.mailbox_selected.get_message_by_number( + message_number=int(message_number) ) ) - return ('OK', results) + + return ('OK', self._fetch(messages=messages)) def login(self, user, password): return ('OK', ['{} authenticated (Success)'.format(user)]) @@ -83,12 +152,8 @@ class MockIMAPServer(object): ] message_sequences = [] - for result in results: - message_sequences.append( - force_text( - self.mailbox_selected.messages.index(result) - ) - ) + for message in results: + message_sequences.append(force_text(message.get_number())) return ('OK', ' '.join(message_sequences)) @@ -97,7 +162,7 @@ class MockIMAPServer(object): return ( 'OK', [ - len(self.mailbox_selected.messages) + self.mailbox_selected.get_message_count() ] ) @@ -108,18 +173,46 @@ class MockIMAPServer(object): message = self.mailbox_selected.messages[int(message_number) - 1] if command == 'FLAGS': - message.flags = flags.split() + message.flags_set(flags_string=flags) elif command == '+FLAGS': - for flag in flags.split(): - if flag not in message.flags: - message.flags.append(flag) + message.flags_add(flags_string=flags) elif command == '-FLAGS': - for flag in flags.split(): - if flag in message.flags: - message.flags.remove(flag) + message.flags_remove(flags_string=flags) results.append( '{} (FLAGS ({}))'.format(message_number, message.get_flags()) ) return ('OK', results) + + def uid(self, command, *args): + if command == 'FETCH': + uid = args[0] + messages = [self.mailbox_selected.get_message_by_uid(uid=uid)] + return ('OK', self._fetch(messages=messages)) + elif command == 'STORE': + results = [] + uid = args[0] + subcommand = args[1] + flags = args[2] + message = self.mailbox_selected.get_message_by_uid(uid=uid) + + if subcommand == 'FLAGS': + message.flags_set(flags_string=flags) + elif subcommand == '+FLAGS': + message.flags_add(flags_string=flags) + elif subcommand == '-FLAGS': + message.flags_remove(flags_string=flags) + + results.append( + '{} (FLAGS ({}))'.format(uid, message.get_flags()) + ) + return ('OK', results) + elif command == 'SEARCH': + message_sequences = [ + self.mailbox_selected.get_message_by_number( + message_number=1 + ).uid + ] + + return ('OK', [' '.join(message_sequences)]) diff --git a/mayan/apps/sources/tests/test_models.py b/mayan/apps/sources/tests/test_models.py index 9540dacff6..6021ad8247 100644 --- a/mayan/apps/sources/tests/test_models.py +++ b/mayan/apps/sources/tests/test_models.py @@ -309,8 +309,8 @@ class POP3SourceTestCase(GenericDocumentTestCase): def test_download_document(self, mock_poplib): mock_poplib.return_value = POP3SourceTestCase.MockMailbox() self.source = POP3Email.objects.create( - document_type=self.test_document_type, label='', host='', password='', - username='' + document_type=self.test_document_type, label='', host='', + password='', username='' ) self.source.check_source()