Add support for retrying upload queue and ocr queue tasks in the event of Database locking errors.
This commit is contained in:
@@ -6,3 +6,5 @@ DEFAULT_DELETE_PERIOD = 30
|
||||
DEFAULT_DELETE_TIME_UNIT = 'days'
|
||||
DEFAULT_ZIP_FILENAME = 'document_bundle.zip'
|
||||
DOCUMENT_IMAGE_TASK_TIMEOUT = 20
|
||||
UPDATE_PAGE_COUNT_RETRY_DELAY = 10
|
||||
UPLOAD_NEW_VERSION_RETRY_DELAY = 10
|
||||
|
||||
@@ -4,12 +4,16 @@ from datetime import timedelta
|
||||
import logging
|
||||
|
||||
from django.contrib.auth.models import User
|
||||
from django.db import OperationalError
|
||||
from django.utils.timezone import now
|
||||
|
||||
from mayan.celery import app
|
||||
|
||||
from common.models import SharedUploadedFile
|
||||
|
||||
from .literals import (
|
||||
UPDATE_PAGE_COUNT_RETRY_DELAY, UPLOAD_NEW_VERSION_RETRY_DELAY
|
||||
)
|
||||
from .models import (
|
||||
DeletedDocument, Document, DocumentPage, DocumentType, DocumentVersion
|
||||
)
|
||||
@@ -69,14 +73,18 @@ def task_get_document_page_image(document_page_id, *args, **kwargs):
|
||||
return document_page.get_image(*args, **kwargs)
|
||||
|
||||
|
||||
@app.task(ignore_result=True)
|
||||
def task_update_page_count(version_id):
|
||||
@app.task(bind=True, default_retry_delay=UPDATE_PAGE_COUNT_RETRY_DELAY, ignore_result=True)
|
||||
def task_update_page_count(self, version_id):
|
||||
document_version = DocumentVersion.objects.get(pk=version_id)
|
||||
document_version.update_page_count()
|
||||
try:
|
||||
document_version.update_page_count()
|
||||
except OperationalError as exception:
|
||||
logger.warning('Operational error during attempt to update page count for document version: %s; %s. Retrying.', document, exception)
|
||||
raise self.retry(exc=exception)
|
||||
|
||||
|
||||
@app.task(ignore_result=True)
|
||||
def task_upload_new_version(document_id, shared_uploaded_file_id, user_id, comment=None):
|
||||
@app.task(bind=True, default_retry_delay=UPLOAD_NEW_VERSION_RETRY_DELAY, ignore_result=True)
|
||||
def task_upload_new_version(self, document_id, shared_uploaded_file_id, user_id, comment=None):
|
||||
document = Document.objects.get(pk=document_id)
|
||||
|
||||
shared_file = SharedUploadedFile.objects.get(pk=shared_uploaded_file_id)
|
||||
@@ -87,10 +95,20 @@ def task_upload_new_version(document_id, shared_uploaded_file_id, user_id, comme
|
||||
user = None
|
||||
|
||||
with shared_file.open() as file_object:
|
||||
document_version = DocumentVersion(document=document, comment=comment or '', file=file_object)
|
||||
try:
|
||||
document_version = DocumentVersion(document=document, comment=comment or '', file=file_object)
|
||||
document_version.save(_user=user)
|
||||
except Warning as warning:
|
||||
logger.info('Warning during attempt to create new document version for document: %s ; %s', document, warning)
|
||||
finally:
|
||||
# New document version are blocked
|
||||
logger.info('Warning during attempt to create new document version for document: %s; %s', document, warning)
|
||||
shared_file.delete()
|
||||
except OperationalError as exception:
|
||||
# Database is locked for example
|
||||
logger.warning('Operational error during attempt to create new document version for document: %s; %s. Retrying.', document, exception)
|
||||
raise self.retry(exc=exception)
|
||||
except Exception as exception:
|
||||
# This except and else block emulate a finally:
|
||||
logger.error('Unexpected error during attempt to create new document version for document: %s; %s', document, warning)
|
||||
shared_file.delete()
|
||||
else:
|
||||
shared_file.delete()
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
DO_OCR_RETRY_DELAY = 10
|
||||
LOCK_EXPIRE = 60 * 10 # Adjust to worst case scenario
|
||||
|
||||
@@ -5,20 +5,21 @@ import sys
|
||||
import traceback
|
||||
|
||||
from django.conf import settings
|
||||
from django.db import OperationalError
|
||||
|
||||
from documents.models import DocumentVersion
|
||||
from lock_manager import Lock, LockError
|
||||
from mayan.celery import app
|
||||
|
||||
from .runtime import ocr_backend_class
|
||||
from .literals import LOCK_EXPIRE
|
||||
from .literals import DO_OCR_RETRY_DELAY, LOCK_EXPIRE
|
||||
from .models import DocumentVersionOCRError
|
||||
from .signals import post_document_version_ocr
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@app.task(bind=True, ignore_result=True)
|
||||
@app.task(bind=True, default_retry_delay=DO_OCR_RETRY_DELAY, ignore_result=True)
|
||||
def task_do_ocr(self, document_version_pk):
|
||||
lock_id = 'task_do_ocr_doc_version-%d' % document_version_pk
|
||||
try:
|
||||
@@ -33,6 +34,9 @@ def task_do_ocr(self, document_version_pk):
|
||||
logger.info('Starting document OCR for document version: %s', document_version)
|
||||
backend = ocr_backend_class()
|
||||
backend.process_document_version(document_version)
|
||||
except OperationalError as exception:
|
||||
logger.error('OCR error for document version: %s; %s. Retrying.', document_version, exception)
|
||||
raise self.retry(exc=exception)
|
||||
except Exception as exception:
|
||||
logger.error('OCR error for document version: %s; %s', document_version, exception)
|
||||
if document_version:
|
||||
|
||||
Reference in New Issue
Block a user