diff --git a/mayan/apps/converter/migrations/0009_auto_20150714_2228.py b/mayan/apps/converter/migrations/0009_auto_20150714_2228.py new file mode 100644 index 0000000000..fa19203692 --- /dev/null +++ b/mayan/apps/converter/migrations/0009_auto_20150714_2228.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import models, migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('converter', '0008_auto_20150711_0723'), + ] + + operations = [ + migrations.AlterField( + model_name='transformation', + name='order', + field=models.PositiveIntegerField(default=0, help_text='Order in which the transformations will be executed. If left unchanged, an automatic order value will be assigned.', db_index=True, verbose_name='Order', blank=True), + preserve_default=True, + ), + ] diff --git a/mayan/apps/sources/apps.py b/mayan/apps/sources/apps.py index 7dca79bfbc..34ee8bc4e5 100644 --- a/mayan/apps/sources/apps.py +++ b/mayan/apps/sources/apps.py @@ -65,6 +65,9 @@ class SourcesApp(MayanAppConfig): 'sources.tasks.task_source_handle_upload': { 'queue': 'sources' }, + 'sources.tasks.task_upload_document': { + 'queue': 'sources' + }, } ) diff --git a/mayan/apps/sources/literals.py b/mayan/apps/sources/literals.py index ab43e0511d..0c81c2e66f 100644 --- a/mayan/apps/sources/literals.py +++ b/mayan/apps/sources/literals.py @@ -34,3 +34,4 @@ SOURCE_CHOICES = ( DEFAULT_INTERVAL = 600 DEFAULT_POP3_TIMEOUT = 60 DEFAULT_IMAP_MAILBOX = 'INBOX' +DEFAULT_SOURCE_TASK_RETRY_DELAY = 10 diff --git a/mayan/apps/sources/models.py b/mayan/apps/sources/models.py index ffcc4d2263..50811d4158 100644 --- a/mayan/apps/sources/models.py +++ b/mayan/apps/sources/models.py @@ -9,7 +9,7 @@ import os import poplib from django.core.files import File -from django.db import models +from django.db import models, transaction from django.utils.encoding import python_2_unicode_compatible from django.utils.translation import ugettext_lazy as _ @@ -35,6 +35,50 @@ from .literals import ( logger = logging.getLogger(__name__) +class UploadHandler(object): + def document_upload(self, file_object): + document_version = self.document_type.new_document(file_object=file_object, description=self.description, label=self.label, language=self.language, _user=self.user) + self.post_document_upload(document=document_version.document) + self.post_document_version_upload(document_version=document_version) + + def get_objects(self): + self.source = Source.objects.get_subclass(pk=self.kwargs['source'].pk) + self.file_object = self.kwargs['file_object'] + self.document_type = self.kwargs.get('document_type')#, self.source.document_type) + self.description = self.kwargs.get('description') + self.expand = self.kwargs.get('expand', False) + self.label = self.kwargs.get('label') + self.language = self.kwargs.get('language') + self.metadata_dict_list = self.kwargs.get('metadata_dict_list') + self.user = self.kwargs.get('user') + + def handle_upload(self, *args, **kwargs): + self.args = args + self.kwargs = kwargs + + self.get_objects() + + if self.expand: + try: + compressed_file = CompressedFile(self.file_object) + for compressed_file_child in compressed_file.children(): + self.document_upload(file_object=File(compressed_file_child)) + compressed_file_child.close() + + except NotACompressedFile: + logging.debug('Exception: NotACompressedFile') + self.document_upload(file_object=self.file_object) + else: + self.document_upload(file_object=self.file_object) + + def post_document_upload(self, document): + if self.metadata_dict_list: + save_metadata_list(self.metadata_dict_list, document, create=True) + + def post_document_version_upload(self, document_version): + Transformation.objects.copy(source=self.source, targets=document_version.pages.all()) + + @python_2_unicode_compatible class Source(models.Model): label = models.CharField(max_length=64, verbose_name=_('Label')) @@ -52,34 +96,9 @@ class Source(models.Model): def fullname(self): return ' '.join([self.class_fullname(), '"%s"' % self.label]) - def upload_document(self, document_type, file_object, label, language, user, description=None, metadata_dict_list=None): - document = document_type.new_document( - file_object=file_object, label=label, description=description, - language=language, _user=user - ) - - Transformation.objects.get_for_model(document).delete() - Transformation.objects.copy(source=Source.objects.get_subclass(pk=self.pk), targets=Document.objects.filter(pk=document.pk)) - - if metadata_dict_list: - save_metadata_list(metadata_dict_list, document, create=True) - - def handle_upload(self, file_object, label, description=None, document_type=None, expand=False, language=None, metadata_dict_list=None, user=None): - if not document_type: - document_type = self.document_type - - if expand: - try: - compressed_file = CompressedFile(file_object) - for compressed_file_child in compressed_file.children(): - self.upload_document(document_type=document_type, file_object=compressed_file_child, description=description, label=unicode(compressed_file_child), language=language or setting_language.value, metadata_dict_list=metadata_dict_list, user=user) - compressed_file_child.close() - - except NotACompressedFile: - logging.debug('Exception: NotACompressedFile') - self.upload_document(document_type=document_type, file_object=file_object, description=description, label=label, language=language or setting_language.value, metadata_dict_list=metadata_dict_list, user=user) - else: - self.upload_document(document_type=document_type, file_object=file_object, description=description, label=label, language=language or setting_language.value, metadata_dict_list=metadata_dict_list, user=user) + def handle_upload(self, *args, **kwargs): + handler = UploadHandler() + handler.handle_upload(*args, source=self, **kwargs) def get_upload_file_object(self, form_data): pass @@ -249,7 +268,7 @@ class EmailBaseModel(IntervalBaseModel): logger.debug('filename: %s', filename) file_object = Attachment(part, name=filename) - source.handle_upload(file_object=file_object, label=filename, expand=(source.uncompress == SOURCE_UNCOMPRESS_CHOICE_Y), document_type=source.document_type) + source.handle_upload(document_type=source.document_type, file_object=file_object, label=filename, expand=(source.uncompress == SOURCE_UNCOMPRESS_CHOICE_Y)) class Meta: verbose_name = _('Email source') @@ -348,7 +367,7 @@ class WatchFolderSource(IntervalBaseModel): full_path = os.path.join(self.folder_path, file_name) if os.path.isfile(full_path): with File(file=open(full_path, mode='rb')) as file_object: - self.handle_upload(file_object, label=file_name, expand=(self.uncompress == SOURCE_UNCOMPRESS_CHOICE_Y)) + self.handle_upload(file_object=file_object, expand=(self.uncompress == SOURCE_UNCOMPRESS_CHOICE_Y), label=file_name) os.unlink(full_path) class Meta: diff --git a/mayan/apps/sources/tasks.py b/mayan/apps/sources/tasks.py index ba64a3d5cd..92c5fcaf50 100644 --- a/mayan/apps/sources/tasks.py +++ b/mayan/apps/sources/tasks.py @@ -1,13 +1,17 @@ import logging from django.contrib.auth.models import User +from django.db import OperationalError from django.utils.translation import ugettext_lazy as _ from mayan.celery import app from common.models import SharedUploadedFile +from converter.models import Transformation from documents.models import DocumentType +from metadata.api import save_metadata_list +from .literals import DEFAULT_SOURCE_TASK_RETRY_DELAY from .models import Source logger = logging.getLogger(__name__) @@ -26,24 +30,80 @@ def task_check_interval_source(source_id): source.logs.all().delete() -@app.task(ignore_result=True) -def task_source_handle_upload(label, document_type_id, shared_uploaded_file_id, source_id, description=None, expand=False, language=None, metadata_dict_list=None, user_id=None): - shared_uploaded_file = SharedUploadedFile.objects.get(pk=shared_uploaded_file_id) - source = Source.objects.get_subclass(pk=source_id) - document_type = DocumentType.objects.get(pk=document_type_id) +@app.task(bind=True, default_retry_delay=DEFAULT_SOURCE_TASK_RETRY_DELAY, ignore_result=True) +def task_post_document_version_upload(self, source_id, document_version_id): + try: + source = Source.objects.get_subclass(pk=source_id) + document_version = DocumentVersion.objects.get(pk=document_version_id) - if user_id: - user = User.objects.get(pk=user_id) - else: - user = None + Transformation.objects.copy(source=Source.objects.get_subclass(pk=source_id), targets=document_version.pages.all()) + except OperationalError as exception: + logger.warning('Operational error during post source document upload processing: %s. Retrying.', exception) + raise self.retry(exc=exception) - if not label: - label = shared_uploaded_file.filename + +@app.task(bind=True, default_retry_delay=DEFAULT_SOURCE_TASK_RETRY_DELAY, ignore_result=True) +def task_upload_document(self, document_type_id, shared_uploaded_file_id, label=None, language=None, user_id=None, description=None, metadata_dict_list=None): + try: + shared_uploaded_file = SharedUploadedFile.objects.get(pk=shared_uploaded_file_id) + source = Source.objects.get_subclass(pk=source_id) + document_type = DocumentType.objects.get(pk=document_type_id) + + if user_id: + user = User.objects.get(pk=user_id) + else: + user = None + + if not label: + label = shared_uploaded_file.filename + + with transaction.atomic(): + document = DocumentVersion.objects.create(document_type=document_type) + + document_version = document.new_document( + file_object=file_object, label=label, description=description, + language=language, _user=user + ) + + if metadata_dict_list: + save_metadata_list(metadata_dict_list, document, create=True) + + task_post_source_document_version_upload.delay(source_id=source_id, document_version_id=document_version.pk, metadata_dict_list=metadata_dict_list) + except OperationalError as exception: + logger.warning('Operational error during attempt to handle source upload: %s. Retrying.', exception) + raise self.retry(exc=exception) + + try: + shared_uploaded_file.delete() + except OperationalError as exception: + logger.warning('Operational error during attempt to delete shared upload file: %s; %s. Retrying.', shared_uploaded_file, exception) + + +@app.task(bind=True, default_retry_delay=DEFAULT_SOURCE_TASK_RETRY_DELAY, ignore_result=True) +def task_source_handle_upload(self, label, document_type_id, shared_uploaded_file_id, source_id, description=None, expand=False, language=None, metadata_dict_list=None, user_id=None): + try: + shared_uploaded_file = SharedUploadedFile.objects.get(pk=shared_uploaded_file_id) + source = Source.objects.get_subclass(pk=source_id) + document_type = DocumentType.objects.get(pk=document_type_id) + + if user_id: + user = User.objects.get(pk=user_id) + else: + user = None + + if not label: + label = shared_uploaded_file.filename + except OperationalError as exception: + logger.warning('Operational error during attempt to load data to handle source upload: %s. Retrying.', exception) + raise self.retry(exc=exception) with shared_uploaded_file.open() as file_object: source.handle_upload(description=description, document_type=document_type, expand=expand, file_object=file_object, label=label, language=language, metadata_dict_list=metadata_dict_list, user=user) - shared_uploaded_file.delete() + try: + shared_uploaded_file.delete() + except OperationalError as exception: + logger.warning('Operational error during attempt to delete shared upload file: %s; %s. Retrying.', shared_uploaded_file, exception) # TODO: Report/record how was file uploaded # if result['is_compressed'] is None: