Finish document upload task refactor. Increase failure tolerance to database Operational Errors.

This commit is contained in:
Roberto Rosario
2015-07-15 04:36:23 -04:00
parent 0b1230f214
commit 91a9b3b045
7 changed files with 141 additions and 113 deletions

View File

@@ -14,7 +14,7 @@ from .models import IndexInstanceNode
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@app.task(bind=True, default_retry_delay=RETRY_DELAY, ignore_result=True) @app.task(bind=True, default_retry_delay=RETRY_DELAY, max_retries=None, ignore_result=True)
def task_delete_empty_index_nodes(self): def task_delete_empty_index_nodes(self):
try: try:
rebuild_lock = Lock.acquire_lock('document_indexing_task_do_rebuild_all_indexes') rebuild_lock = Lock.acquire_lock('document_indexing_task_do_rebuild_all_indexes')
@@ -28,7 +28,7 @@ def task_delete_empty_index_nodes(self):
rebuild_lock.release() rebuild_lock.release()
@app.task(bind=True, default_retry_delay=RETRY_DELAY, ignore_result=True) @app.task(bind=True, default_retry_delay=RETRY_DELAY, max_retries=None, ignore_result=True)
def task_index_document(self, document_id): def task_index_document(self, document_id):
try: try:
rebuild_lock = Lock.acquire_lock('document_indexing_task_do_rebuild_all_indexes') rebuild_lock = Lock.acquire_lock('document_indexing_task_do_rebuild_all_indexes')

View File

@@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import models, migrations
class Migration(migrations.Migration):
dependencies = [
('documents', '0023_auto_20150715_0259'),
]
operations = [
migrations.AlterField(
model_name='documentversion',
name='comment',
field=models.TextField(default='', verbose_name='Comment', blank=True),
preserve_default=True,
),
]

View File

@@ -81,7 +81,8 @@ class DocumentType(models.Model):
document = self.documents.create(description=description or '', label=label or unicode(file_object), language=language or setting_language.value) document = self.documents.create(description=description or '', label=label or unicode(file_object), language=language or setting_language.value)
document.save(_user=_user) document.save(_user=_user)
return document.new_version(file_object=file_object, _user=_user) document.new_version(file_object=file_object, _user=_user)
return document
except Exception as exception: except Exception as exception:
logger.critical('Unexpected exception while trying to create new document "%s" from document type "%s"; %s', label or unicode(file_object), self, exception) logger.critical('Unexpected exception while trying to create new document "%s" from document type "%s"; %s', label or unicode(file_object), self, exception)
raise raise
@@ -174,7 +175,7 @@ class Document(models.Model):
def new_version(self, file_object, comment=None, _user=None): def new_version(self, file_object, comment=None, _user=None):
logger.info('Creating new document version for document: %s', self) logger.info('Creating new document version for document: %s', self)
document_version = DocumentVersion(document=self, comment=comment, file=File(file_object)) document_version = DocumentVersion(document=self, comment=comment or '', file=File(file_object))
document_version.save(_user=_user) document_version.save(_user=_user)
logger.info('New document version queued for document: %s', self) logger.info('New document version queued for document: %s', self)
@@ -267,7 +268,7 @@ class DocumentVersion(models.Model):
document = models.ForeignKey(Document, related_name='versions', verbose_name=_('Document')) document = models.ForeignKey(Document, related_name='versions', verbose_name=_('Document'))
timestamp = models.DateTimeField(auto_now_add=True, db_index=True, verbose_name=_('Timestamp')) timestamp = models.DateTimeField(auto_now_add=True, db_index=True, verbose_name=_('Timestamp'))
comment = models.TextField(blank=True, default='', null=True, verbose_name=_('Comment')) comment = models.TextField(blank=True, default='', verbose_name=_('Comment'))
# File related fields # File related fields
file = models.FileField(storage=storage_backend, upload_to=UUID_FUNCTION, verbose_name=_('File')) file = models.FileField(storage=storage_backend, upload_to=UUID_FUNCTION, verbose_name=_('File'))

View File

@@ -32,7 +32,7 @@ class DocumentTestCase(TestCase):
ocr_settings.save() ocr_settings.save()
with open(TEST_DOCUMENT_PATH) as file_object: with open(TEST_DOCUMENT_PATH) as file_object:
self.document = self.document_type.new_document(file_object=File(file_object), label='mayan_11_1.pdf').document self.document = self.document_type.new_document(file_object=File(file_object), label='mayan_11_1.pdf')
def tearDown(self): def tearDown(self):
self.document_type.delete() self.document_type.delete()

View File

@@ -35,7 +35,7 @@ class DocumentsViewsFunctionalTestCase(TestCase):
self.assertTrue(self.admin_user.is_authenticated()) self.assertTrue(self.admin_user.is_authenticated())
with open(TEST_SMALL_DOCUMENT_PATH) as file_object: with open(TEST_SMALL_DOCUMENT_PATH) as file_object:
self.document = self.document_type.new_document(file_object=File(file_object), label='mayan_11_1.pdf').document self.document = self.document_type.new_document(file_object=File(file_object), label='mayan_11_1.pdf')
def tearDown(self): def tearDown(self):
self.document_type.delete() self.document_type.delete()

View File

@@ -35,50 +35,6 @@ from .literals import (
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class UploadHandler(object):
def document_upload(self, file_object):
document_version = self.document_type.new_document(file_object=file_object, description=self.description, label=self.label, language=self.language, _user=self.user)
self.post_document_upload(document=document_version.document)
self.post_document_version_upload(document_version=document_version)
def get_objects(self):
self.source = Source.objects.get_subclass(pk=self.kwargs['source'].pk)
self.file_object = self.kwargs['file_object']
self.document_type = self.kwargs.get('document_type')#, self.source.document_type)
self.description = self.kwargs.get('description')
self.expand = self.kwargs.get('expand', False)
self.label = self.kwargs.get('label')
self.language = self.kwargs.get('language')
self.metadata_dict_list = self.kwargs.get('metadata_dict_list')
self.user = self.kwargs.get('user')
def handle_upload(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
self.get_objects()
if self.expand:
try:
compressed_file = CompressedFile(self.file_object)
for compressed_file_child in compressed_file.children():
self.document_upload(file_object=File(compressed_file_child))
compressed_file_child.close()
except NotACompressedFile:
logging.debug('Exception: NotACompressedFile')
self.document_upload(file_object=self.file_object)
else:
self.document_upload(file_object=self.file_object)
def post_document_upload(self, document):
if self.metadata_dict_list:
save_metadata_list(self.metadata_dict_list, document, create=True)
def post_document_version_upload(self, document_version):
Transformation.objects.copy(source=self.source, targets=document_version.pages.all())
@python_2_unicode_compatible @python_2_unicode_compatible
class Source(models.Model): class Source(models.Model):
label = models.CharField(max_length=64, verbose_name=_('Label')) label = models.CharField(max_length=64, verbose_name=_('Label'))
@@ -96,9 +52,46 @@ class Source(models.Model):
def fullname(self): def fullname(self):
return ' '.join([self.class_fullname(), '"%s"' % self.label]) return ' '.join([self.class_fullname(), '"%s"' % self.label])
def handle_upload(self, *args, **kwargs): def upload_document(self, file_object, document_type, description=None, label=None, language=None, metadata_dict_list=None, user=None):
handler = UploadHandler() try:
handler.handle_upload(*args, source=self, **kwargs) with transaction.atomic():
document = Document.objects.create(description=description or '', document_type=document_type, label=label or unicode(file_object), language=language or setting_language.value)
document.save(_user=user)
document_version = document.new_version(file_object=file_object, _user=user)
Transformation.objects.copy(source=self, targets=document_version.pages.all())
if metadata_dict_list:
save_metadata_list(metadata_dict_list, document, create=True)
except Exception as exception:
logger.critical('Unexpected exception while trying to create new document "%s" from source "%s"; %s', label or unicode(file_object), self, exception)
raise
def handle_upload(self, file_object, description=None, document_type=None, expand=False, label=None, language=None, metadata_dict_list=None, user=None):
if not document_type:
document_type = self.document_type
kwargs = {
'description': description, 'document_type': document_type,
'label': label, 'language': language,
'metadata_dict_list': metadata_dict_list, 'user': user
}
if expand:
try:
compressed_file = CompressedFile(file_object)
for compressed_file_child in compressed_file.children():
kwargs.update({'label': unicode(compressed_file_child)})
self.upload_document(file_object=File(compressed_file_child), **kwargs)
compressed_file_child.close()
except NotACompressedFile:
logging.debug('Exception: NotACompressedFile')
self.upload_document(file_object=file_object, **kwargs)
else:
self.upload_document(file_object=file_object, **kwargs)
def get_upload_file_object(self, form_data): def get_upload_file_object(self, form_data):
pass pass

View File

@@ -1,14 +1,17 @@
import logging import logging
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.db import OperationalError from django.core.files import File
from django.db import OperationalError, transaction
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from mayan.celery import app from mayan.celery import app
from common.compressed_files import CompressedFile, NotACompressedFile
from common.models import SharedUploadedFile from common.models import SharedUploadedFile
from converter.models import Transformation from converter.models import Transformation
from documents.models import DocumentType from documents.models import Document, DocumentType
from documents.settings import setting_language
from metadata.api import save_metadata_list from metadata.api import save_metadata_list
from .literals import DEFAULT_SOURCE_TASK_RETRY_DELAY from .literals import DEFAULT_SOURCE_TASK_RETRY_DELAY
@@ -31,23 +34,36 @@ def task_check_interval_source(source_id):
@app.task(bind=True, default_retry_delay=DEFAULT_SOURCE_TASK_RETRY_DELAY, ignore_result=True) @app.task(bind=True, default_retry_delay=DEFAULT_SOURCE_TASK_RETRY_DELAY, ignore_result=True)
def task_post_document_version_upload(self, source_id, document_version_id): def task_upload_document(self, source_id, document_type_id, shared_uploaded_file_id, description=None, label=None, language=None, metadata_dict_list=None, user_id=None):
try: try:
document_type = DocumentType.objects.get(pk=document_type_id)
source = Source.objects.get_subclass(pk=source_id) source = Source.objects.get_subclass(pk=source_id)
document_version = DocumentVersion.objects.get(pk=document_version_id) shared_upload = SharedUploadedFile.objects.get(pk=shared_uploaded_file_id)
if user_id:
user = User.objects.get(pk=user_id)
else:
user = None
with shared_upload.open() as file_object:
source.upload_document(file_object=file_object, document_type=document_type, description=description, label=label, language=language, metadata_dict_list=metadata_dict_list, user=user)
Transformation.objects.copy(source=Source.objects.get_subclass(pk=source_id), targets=document_version.pages.all())
except OperationalError as exception: except OperationalError as exception:
logger.warning('Operational error during post source document upload processing: %s. Retrying.', exception) logger.warning('Operational exception while trying to create new document "%s" from source id %d; %s. Retying.', label or shared_upload.filename, source_id, exception)
raise self.retry(exc=exception) raise self.retry(exc=exception)
else:
try:
shared_upload.delete()
except OperationalError as exception:
logger.warning('Operational error during attempt to delete shared upload file: %s; %s. Retrying.', shared_upload, exception)
@app.task(bind=True, default_retry_delay=DEFAULT_SOURCE_TASK_RETRY_DELAY, ignore_result=True) @app.task(bind=True, default_retry_delay=DEFAULT_SOURCE_TASK_RETRY_DELAY, ignore_result=True)
def task_upload_document(self, document_type_id, shared_uploaded_file_id, label=None, language=None, user_id=None, description=None, metadata_dict_list=None): def task_source_handle_upload(self, document_type_id, shared_uploaded_file_id, source_id, description=None, expand=False, label=None, language=None, metadata_dict_list=None, skip_list=None, user_id=None):
try: try:
shared_uploaded_file = SharedUploadedFile.objects.get(pk=shared_uploaded_file_id)
source = Source.objects.get_subclass(pk=source_id)
document_type = DocumentType.objects.get(pk=document_type_id) document_type = DocumentType.objects.get(pk=document_type_id)
source = Source.objects.get_subclass(pk=source_id)
shared_upload = SharedUploadedFile.objects.get(pk=shared_uploaded_file_id)
if user_id: if user_id:
user = User.objects.get(pk=user_id) user = User.objects.get(pk=user_id)
@@ -55,62 +71,60 @@ def task_upload_document(self, document_type_id, shared_uploaded_file_id, label=
user = None user = None
if not label: if not label:
label = shared_uploaded_file.filename label = shared_upload.filename
with transaction.atomic():
document = DocumentVersion.objects.create(document_type=document_type)
document_version = document.new_document(
file_object=file_object, label=label, description=description,
language=language, _user=user
)
if metadata_dict_list:
save_metadata_list(metadata_dict_list, document, create=True)
task_post_source_document_version_upload.delay(source_id=source_id, document_version_id=document_version.pk, metadata_dict_list=metadata_dict_list)
except OperationalError as exception:
logger.warning('Operational error during attempt to handle source upload: %s. Retrying.', exception)
raise self.retry(exc=exception)
try:
shared_uploaded_file.delete()
except OperationalError as exception:
logger.warning('Operational error during attempt to delete shared upload file: %s; %s. Retrying.', shared_uploaded_file, exception)
@app.task(bind=True, default_retry_delay=DEFAULT_SOURCE_TASK_RETRY_DELAY, ignore_result=True)
def task_source_handle_upload(self, label, document_type_id, shared_uploaded_file_id, source_id, description=None, expand=False, language=None, metadata_dict_list=None, user_id=None):
try:
shared_uploaded_file = SharedUploadedFile.objects.get(pk=shared_uploaded_file_id)
source = Source.objects.get_subclass(pk=source_id)
document_type = DocumentType.objects.get(pk=document_type_id)
if user_id:
user = User.objects.get(pk=user_id)
else:
user = None
if not label:
label = shared_uploaded_file.filename
except OperationalError as exception: except OperationalError as exception:
logger.warning('Operational error during attempt to load data to handle source upload: %s. Retrying.', exception) logger.warning('Operational error during attempt to load data to handle source upload: %s. Retrying.', exception)
raise self.retry(exc=exception) raise self.retry(exc=exception)
with shared_uploaded_file.open() as file_object: kwargs = {
source.handle_upload(description=description, document_type=document_type, expand=expand, file_object=file_object, label=label, language=language, metadata_dict_list=metadata_dict_list, user=user) 'description': description, 'document_type_id': document_type.pk,
'label': label, 'language': language,
'metadata_dict_list': metadata_dict_list,
'source_id': source_id, 'user_id': user_id
}
try: if not skip_list:
shared_uploaded_file.delete() skip_list = []
except OperationalError as exception:
logger.warning('Operational error during attempt to delete shared upload file: %s; %s. Retrying.', shared_uploaded_file, exception)
# TODO: Report/record how was file uploaded with shared_upload.open() as file_object:
# if result['is_compressed'] is None: if expand:
# messages.success(request, _('File uploaded successfully.')) try:
compressed_file = CompressedFile(file_object)
for compressed_file_child in compressed_file.children():
# TODO: find way to uniquely identify child files
# Use filename in the mean time.
if unicode(compressed_file_child) not in skip_list:
kwargs.update({'label': unicode(compressed_file_child)})
# if result['is_compressed'] is True: try:
# messages.success(request, _('File uncompressed successfully and uploaded as individual files.')) child_shared_uploaded_file = SharedUploadedFile.objects.create(file=File(compressed_file_child))
except OperationalError as exception:
logger.warning('Operational error while preparing to upload child document: %s. Rescheduling.', exception)
# if result['is_compressed'] is False: task_source_handle_upload.delay(
# messages.warning(request, _('File was not a compressed file, uploaded as it was.')) document_type_id=document_type_id,
shared_uploaded_file_id=shared_uploaded_file_id,
source_id=source_id, description=description,
expand=expand, label=label,
language=language,
metadata_dict_list=metadata_dict_list,
skip_list=skip_list, user_id=user_id
)
return
else:
skip_list.append(unicode(compressed_file_child))
task_upload_document.delay(shared_uploaded_file_id=child_shared_uploaded_file.pk, **kwargs)
finally:
compressed_file_child.close()
compressed_file_child.close()
try:
shared_upload.delete()
except OperationalError as exception:
logger.warning('Operational error during attempt to delete shared upload file: %s; %s. Retrying.', shared_uploaded_file, exception)
except NotACompressedFile:
logging.debug('Exception: NotACompressedFile')
task_upload_document.delay(shared_uploaded_file_id=shared_upload.pk, **kwargs)
else:
task_upload_document.delay(shared_uploaded_file_id=shared_upload.pk, **kwargs)