from __future__ import unicode_literals from email.Utils import collapse_rfc2231_value from email import message_from_string import json import imaplib import logging import os import poplib import yaml from django.core.files import File from django.db import models, transaction from django.utils.encoding import python_2_unicode_compatible from django.utils.translation import ugettext_lazy as _ from model_utils.managers import InheritanceManager from common.compressed_files import CompressedFile, NotACompressedFile from converter.literals import DIMENSION_SEPARATOR from converter.models import Transformation from djcelery.models import PeriodicTask, IntervalSchedule from documents.models import Document, DocumentType from documents.settings import setting_language from metadata.api import save_metadata_list, set_bulk_metadata from .classes import Attachment, SourceUploadedFile, StagingFile from .literals import ( DEFAULT_INTERVAL, DEFAULT_POP3_TIMEOUT, DEFAULT_IMAP_MAILBOX, DEFAULT_METADATA_ATTACHMENT_NAME, SOURCE_CHOICES, SOURCE_CHOICE_STAGING, SOURCE_CHOICE_WATCH, SOURCE_CHOICE_WEB_FORM, SOURCE_INTERACTIVE_UNCOMPRESS_CHOICES, SOURCE_UNCOMPRESS_CHOICES, SOURCE_UNCOMPRESS_CHOICE_Y, SOURCE_CHOICE_EMAIL_IMAP, SOURCE_CHOICE_EMAIL_POP3 ) logger = logging.getLogger(__name__) @python_2_unicode_compatible class Source(models.Model): label = models.CharField(max_length=64, verbose_name=_('Label')) enabled = models.BooleanField(default=True, verbose_name=_('Enabled')) objects = InheritanceManager() @classmethod def class_fullname(cls): return unicode(dict(SOURCE_CHOICES).get(cls.source_type)) def __str__(self): return '%s' % self.label def fullname(self): return ' '.join([self.class_fullname(), '"%s"' % self.label]) def upload_document(self, file_object, document_type, description=None, label=None, language=None, metadata_dict_list=None, metadata_dictionary=None, user=None): try: with transaction.atomic(): document = Document.objects.create( description=description or '', document_type=document_type, label=label or unicode(file_object), language=language or setting_language.value ) document.save(_user=user) document_version = document.new_version( file_object=file_object, _user=user ) Transformation.objects.copy( source=self, targets=document_version.pages.all() ) if metadata_dict_list: save_metadata_list( metadata_dict_list, document, create=True ) if metadata_dictionary: set_bulk_metadata( document=document, metadata_dictionary=metadata_dictionary ) except Exception as exception: logger.critical( 'Unexpected exception while trying to create new document ' '"%s" from source "%s"; %s', label or unicode(file_object), self, exception ) raise def handle_upload(self, file_object, description=None, document_type=None, expand=False, label=None, language=None, metadata_dict_list=None, metadata_dictionary=None, user=None): if not document_type: document_type = self.document_type kwargs = { 'description': description, 'document_type': document_type, 'label': label, 'language': language, 'metadata_dict_list': metadata_dict_list, 'metadata_dictionary': metadata_dictionary, 'user': user } if expand: try: compressed_file = CompressedFile(file_object) for compressed_file_child in compressed_file.children(): kwargs.update({'label': unicode(compressed_file_child)}) self.upload_document( file_object=File(compressed_file_child), **kwargs ) compressed_file_child.close() except NotACompressedFile: logging.debug('Exception: NotACompressedFile') self.upload_document(file_object=file_object, **kwargs) else: self.upload_document(file_object=file_object, **kwargs) def get_upload_file_object(self, form_data): pass # TODO: Should raise NotImplementedError()? def clean_up_upload_file(self, upload_file_object): pass # TODO: Should raise NotImplementedError()? class Meta: ordering = ('label',) verbose_name = _('Source') verbose_name_plural = _('Sources') class InteractiveSource(Source): objects = InheritanceManager() class Meta: verbose_name = _('Interactive source') verbose_name_plural = _('Interactive sources') class StagingFolderSource(InteractiveSource): is_interactive = True source_type = SOURCE_CHOICE_STAGING folder_path = models.CharField( max_length=255, help_text=_('Server side filesystem path.'), verbose_name=_('Folder path') ) preview_width = models.IntegerField( help_text=_('Width value to be passed to the converter backend.'), verbose_name=_('Preview width') ) preview_height = models.IntegerField( blank=True, null=True, help_text=_('Height value to be passed to the converter backend.'), verbose_name=_('Preview height') ) uncompress = models.CharField( choices=SOURCE_INTERACTIVE_UNCOMPRESS_CHOICES, max_length=1, help_text=_('Whether to expand or not compressed archives.'), verbose_name=_('Uncompress') ) delete_after_upload = models.BooleanField( default=True, help_text=_( 'Delete the file after is has been successfully uploaded.' ), verbose_name=_('Delete after upload') ) def get_preview_size(self): dimensions = [] dimensions.append(unicode(self.preview_width)) if self.preview_height: dimensions.append(unicode(self.preview_height)) return DIMENSION_SEPARATOR.join(dimensions) def get_file(self, *args, **kwargs): return StagingFile(staging_folder=self, *args, **kwargs) def get_files(self): try: for entry in sorted([os.path.normcase(f) for f in os.listdir(self.folder_path) if os.path.isfile(os.path.join(self.folder_path, f))]): yield self.get_file(filename=entry) except OSError as exception: logger.error( 'Unable get list of staging files from source: %s; %s', self, exception ) raise Exception( _('Unable get list of staging files: %s') % exception ) def get_upload_file_object(self, form_data): staging_file = self.get_file( encoded_filename=form_data['staging_file_id'] ) return SourceUploadedFile( source=self, file=staging_file.as_file(), extra_data=staging_file ) def clean_up_upload_file(self, upload_file_object): if self.delete_after_upload: try: upload_file_object.extra_data.delete() except Exception as exception: logger.error( 'Error deleting staging file: %s; %s', upload_file_object, exception ) raise Exception( _('Error deleting staging file; %s') % exception ) class Meta: verbose_name = _('Staging folder') verbose_name_plural = _('Staging folders') class WebFormSource(InteractiveSource): is_interactive = True source_type = SOURCE_CHOICE_WEB_FORM # TODO: unify uncompress as an InteractiveSource field uncompress = models.CharField( choices=SOURCE_INTERACTIVE_UNCOMPRESS_CHOICES, help_text=_('Whether to expand or not compressed archives.'), max_length=1, verbose_name=_('Uncompress') ) # Default path def get_upload_file_object(self, form_data): return SourceUploadedFile(source=self, file=form_data['file']) class Meta: verbose_name = _('Web form') verbose_name_plural = _('Web forms') class OutOfProcessSource(Source): is_interactive = False class Meta: verbose_name = _('Out of process') verbose_name_plural = _('Out of process') class IntervalBaseModel(OutOfProcessSource): interval = models.PositiveIntegerField( default=DEFAULT_INTERVAL, help_text=_('Interval in seconds between checks for new documents.'), verbose_name=_('Interval') ) document_type = models.ForeignKey( DocumentType, help_text=_( 'Assign a document type to documents uploaded from this source.' ), verbose_name=_('Document type') ) uncompress = models.CharField( choices=SOURCE_UNCOMPRESS_CHOICES, help_text=_('Whether to expand or not, compressed archives.'), max_length=1, verbose_name=_('Uncompress') ) def _get_periodic_task_name(self, pk=None): return 'check_interval_source-%i' % (pk or self.pk) def _delete_periodic_task(self, pk=None): try: periodic_task = PeriodicTask.objects.get( name=self._get_periodic_task_name(pk) ) interval_instance = periodic_task.interval if tuple(interval_instance.periodictask_set.values_list('id', flat=True)) == (periodic_task.pk,): # Only delete the interval if nobody else is using it interval_instance.delete() else: periodic_task.delete() except PeriodicTask.DoesNotExist: logger.warning( 'Tried to delete non existant periodic task "%s"', self._get_periodic_task_name(pk) ) def save(self, *args, **kwargs): new_source = not self.pk super(IntervalBaseModel, self).save(*args, **kwargs) if not new_source: self._delete_periodic_task() interval_instance, created = IntervalSchedule.objects.get_or_create( every=self.interval, period='seconds' ) # Create a new interval or reuse someone else's PeriodicTask.objects.create( name=self._get_periodic_task_name(), interval=interval_instance, task='sources.tasks.task_check_interval_source', kwargs=json.dumps({'source_id': self.pk}) ) def delete(self, *args, **kwargs): pk = self.pk super(IntervalBaseModel, self).delete(*args, **kwargs) self._delete_periodic_task(pk) class Meta: verbose_name = _('Interval source') verbose_name_plural = _('Interval sources') class EmailBaseModel(IntervalBaseModel): host = models.CharField(max_length=128, verbose_name=_('Host')) ssl = models.BooleanField(default=True, verbose_name=_('SSL')) port = models.PositiveIntegerField(blank=True, null=True, help_text=_( 'Typical choices are 110 for POP3, 995 for POP3 over SSL, 143 for ' 'IMAP, 993 for IMAP over SSL.'), verbose_name=_('Port') ) username = models.CharField(max_length=96, verbose_name=_('Username')) password = models.CharField(max_length=96, verbose_name=_('Password')) metadata_attachment_name = models.CharField( default=DEFAULT_METADATA_ATTACHMENT_NAME, help_text=_( 'Name of the attachment that will contains the metadata type names ' 'and value pairs to be assigned to the rest of the downloaded ' 'attachments. Note: This attachment has to be the first attachment.' ), max_length=128, verbose_name=_('Metadata attachment name') ) # From: http://bookmarks.honewatson.com/2009/08/11/ # python-gmail-imaplib-search-subject-get-attachments/ # TODO: Add lock to avoid running more than once concurrent same document # download # TODO: Use message ID for lock @staticmethod def process_message(source, message): counter = 1 email = message_from_string(message) metadata_dictionary = None for part in email.walk(): disposition = part.get('Content-Disposition', 'none') logger.debug('Disposition: %s', disposition) if disposition.startswith('attachment'): raw_filename = part.get_filename() if raw_filename: filename = collapse_rfc2231_value(raw_filename) else: filename = _('attachment-%i') % counter counter += 1 logger.debug('filename: %s', filename) with Attachment(part, name=filename) as file_object: if filename == source.metadata_attachment_name: metadata_dictionary = yaml.safe_load( file_object.read() ) logger.debug( 'Got metadata dictionary: %s', metadata_dictionary ) else: source.handle_upload( document_type=source.document_type, file_object=file_object, label=filename, expand=( source.uncompress == SOURCE_UNCOMPRESS_CHOICE_Y ), metadata_dictionary=metadata_dictionary ) class Meta: verbose_name = _('Email source') verbose_name_plural = _('Email sources') class POP3Email(EmailBaseModel): source_type = SOURCE_CHOICE_EMAIL_POP3 timeout = models.PositiveIntegerField( default=DEFAULT_POP3_TIMEOUT, verbose_name=_('Timeout') ) def check_source(self): logger.debug('Starting POP3 email fetch') logger.debug('host: %s', self.host) logger.debug('ssl: %s', self.ssl) if self.ssl: mailbox = poplib.POP3_SSL(self.host, self.port) else: mailbox = poplib.POP3(self.host, self.port, timeout=self.timeout) mailbox.getwelcome() mailbox.user(self.username) mailbox.pass_(self.password) messages_info = mailbox.list() logger.debug('messages_info:') logger.debug(messages_info) logger.debug('messages count: %s', len(messages_info[1])) for message_info in messages_info[1]: message_number, message_size = message_info.split() logger.debug('message_number: %s', message_number) logger.debug('message_size: %s', message_size) complete_message = '\n'.join(mailbox.retr(message_number)[1]) EmailBaseModel.process_message( source=self, message=complete_message ) mailbox.dele(message_number) mailbox.quit() class Meta: verbose_name = _('POP email') verbose_name_plural = _('POP email') class IMAPEmail(EmailBaseModel): source_type = SOURCE_CHOICE_EMAIL_IMAP mailbox = models.CharField( default=DEFAULT_IMAP_MAILBOX, help_text=_( 'Mail from which to check for messages with attached documents.' ), max_length=64, verbose_name=_('Mailbox') ) # http://www.doughellmann.com/PyMOTW/imaplib/ def check_source(self): logger.debug('Starting IMAP email fetch') logger.debug('host: %s', self.host) logger.debug('ssl: %s', self.ssl) if self.ssl: mailbox = imaplib.IMAP4_SSL(self.host, self.port) else: mailbox = imaplib.IMAP4(self.host, self.port) mailbox.login(self.username, self.password) mailbox.select(self.mailbox) status, data = mailbox.search(None, 'NOT', 'DELETED') if data: messages_info = data[0].split() logger.debug('messages count: %s', len(messages_info)) for message_number in messages_info: logger.debug('message_number: %s', message_number) status, data = mailbox.fetch(message_number, '(RFC822)') EmailBaseModel.process_message( source=self, message=data[0][1] ) mailbox.store(message_number, '+FLAGS', '\\Deleted') mailbox.expunge() mailbox.close() mailbox.logout() class Meta: verbose_name = _('IMAP email') verbose_name_plural = _('IMAP email') class WatchFolderSource(IntervalBaseModel): source_type = SOURCE_CHOICE_WATCH folder_path = models.CharField( help_text=_('Server side filesystem path.'), max_length=255, verbose_name=_('Folder path') ) def check_source(self): # Force self.folder_path to unicode to avoid os.listdir returning # str for non-latin filenames, gh-issue #163 for file_name in os.listdir(unicode(self.folder_path)): full_path = os.path.join(self.folder_path, file_name) if os.path.isfile(full_path): with File(file=open(full_path, mode='rb')) as file_object: self.handle_upload( file_object=file_object, expand=(self.uncompress == SOURCE_UNCOMPRESS_CHOICE_Y), label=file_name ) os.unlink(full_path) class Meta: verbose_name = _('Watch folder') verbose_name_plural = _('Watch folders') class SourceLog(models.Model): source = models.ForeignKey( Source, related_name='logs', verbose_name=_('Source') ) datetime = models.DateTimeField( auto_now_add=True, editable=False, verbose_name=_('Date time') ) message = models.TextField( blank=True, editable=False, verbose_name=_('Message') ) class Meta: get_latest_by = 'datetime' ordering = ('-datetime',) verbose_name = _('Log entry') verbose_name_plural = _('Log entries')