Update the OCR app to use Celery, remove OCR config options OCR_REPLICATION_DELAY, OCR_NODE_CONCURRENT_EXECUTION, OCR_QUEUE_PROCESSING_INTERVAL
This commit is contained in:
@@ -4,15 +4,12 @@ OCR
|
||||
To use: |Tools tab| |Right arrow| |OCR button|
|
||||
|
||||
Because OCR is an intensive operation, documents are queued for OCR for
|
||||
later handling, the amount of documents processed in parallel is
|
||||
controlled by the :setting:`OCR_NODE_CONCURRENT_EXECUTION` configuration
|
||||
option. Ideally the machine serving **Mayan EDMS** should disable OCR
|
||||
processing by settings this options to 0, with other machines or cloud
|
||||
instances then connected to the same database doing the OCR processing.
|
||||
The document is checked to see if there are text parsers available, is
|
||||
no parser is available for that file type then the document is passed
|
||||
later handling. Ideally the machine serving **Mayan EDMS** should not do OCR
|
||||
processing, with other machines or cloud instances then connected to the same
|
||||
database doing the OCR processing. The document is checked to see if there are
|
||||
text parsers available, is no parser is available for that file type then the document is passed
|
||||
to Tesseract_ page by page and the results stored per page, this is to
|
||||
keep the page image in sync with the transcribed text. However when
|
||||
keep the page image in sync with the transcribed text. However when
|
||||
viewing the document in the details tab all the pages text are
|
||||
concatenated and shown to the user. All newly uploaded documents will be
|
||||
queued automatically for OCR, if this is not desired setting the :setting:`OCR_AUTOMATIC_OCR`
|
||||
|
||||
@@ -288,25 +288,6 @@ Default: ``eng``
|
||||
Language code passed to the ``tesseract`` executable.
|
||||
|
||||
|
||||
.. setting:: OCR_REPLICATION_DELAY
|
||||
|
||||
**OCR_REPLICATION_DELAY**
|
||||
|
||||
Default: ``0``
|
||||
|
||||
Amount of seconds to delay OCR of documents to allow for the node's
|
||||
storage replication overhead.
|
||||
|
||||
|
||||
.. setting:: OCR_NODE_CONCURRENT_EXECUTION
|
||||
|
||||
**OCR_NODE_CONCURRENT_EXECUTION**
|
||||
|
||||
Default: ``1``
|
||||
|
||||
Maximum amount of concurrent document OCRs a node can perform.
|
||||
|
||||
|
||||
.. setting:: OCR_AUTOMATIC_OCR
|
||||
|
||||
**OCR_AUTOMATIC_OCR**
|
||||
@@ -317,13 +298,6 @@ Automatically queue newly created documents or newly uploaded versions
|
||||
of existing documents for OCR.
|
||||
|
||||
|
||||
.. setting:: OCR_QUEUE_PROCESSING_INTERVAL
|
||||
|
||||
**OCR_QUEUE_PROCESSING_INTERVAL**
|
||||
|
||||
Default: ``10``
|
||||
|
||||
|
||||
.. setting:: OCR_UNPAPER_PATH
|
||||
|
||||
**OCR_UNPAPER_PATH**
|
||||
|
||||
@@ -7,6 +7,8 @@ from django.db.models.signals import post_save
|
||||
from django.dispatch import receiver
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from south.signals import post_migrate
|
||||
|
||||
from acls.api import class_permissions
|
||||
from documents.models import Document, DocumentVersion
|
||||
from main.api import register_maintenance_links
|
||||
@@ -15,32 +17,22 @@ from project_tools.api import register_tool
|
||||
from scheduler.api import register_interval_job
|
||||
from statistics.classes import StatisticNamespace
|
||||
|
||||
from south.signals import post_migrate
|
||||
|
||||
from .exceptions import AlreadyQueued
|
||||
from .links import (all_document_ocr_cleanup, document_queue_disable,
|
||||
document_queue_enable, ocr_tool_link, queue_document_list,
|
||||
queue_document_multiple_delete, re_queue_multiple_document,
|
||||
submit_document, submit_document_multiple)
|
||||
from .literals import (QUEUEDOCUMENT_STATE_PENDING,
|
||||
QUEUEDOCUMENT_STATE_PROCESSING)
|
||||
from .links import (all_document_ocr_cleanup, ocr_tool_link,
|
||||
queue_document_list, queue_document_multiple_delete,
|
||||
re_queue_multiple_document, submit_document,
|
||||
submit_document_multiple)
|
||||
from .models import DocumentQueue
|
||||
from .permissions import PERMISSION_OCR_DOCUMENT
|
||||
from .settings import AUTOMATIC_OCR, QUEUE_PROCESSING_INTERVAL
|
||||
from .settings import AUTOMATIC_OCR
|
||||
from .statistics import OCRStatistics
|
||||
from .tasks import task_process_document_queues
|
||||
from .tasks import task_do_ocr
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
register_links(Document, [submit_document])
|
||||
register_multi_item_links(['documents:document_find_duplicates', 'folders:folder_view', 'indexing:index_instance_node_view', 'documents:document_type_document_list', 'search:search', 'search:results', 'linking:document_group_view', 'documents:document_list', 'document:document_list_recent', 'tags:tag_tagged_item_list'], [submit_document_multiple])
|
||||
|
||||
register_links(DocumentQueue, [document_queue_disable, document_queue_enable])
|
||||
|
||||
register_multi_item_links(['ocr:queue_document_list'], [re_queue_multiple_document, queue_document_multiple_delete])
|
||||
|
||||
register_links(['ocr:document_queue_disable', 'ocr:document_queue_enable', 'ocr:queue_document_list'], [queue_document_list], menu_name='secondary_menu')
|
||||
register_links(['ocr:queue_document_list'], [queue_document_list], menu_name='secondary_menu')
|
||||
|
||||
register_maintenance_links([all_document_ocr_cleanup], namespace='ocr', title=_(u'OCR'))
|
||||
|
||||
@@ -51,10 +43,7 @@ def document_post_save(sender, instance, **kwargs):
|
||||
logger.debug('instance: %s' % instance)
|
||||
if kwargs.get('created', False):
|
||||
if AUTOMATIC_OCR:
|
||||
try:
|
||||
DocumentQueue.objects.queue_document(instance.document)
|
||||
except AlreadyQueued:
|
||||
pass
|
||||
task_do_ocr(instance.document.pk)
|
||||
|
||||
|
||||
@receiver(post_migrate, dispatch_uid='create_default_queue')
|
||||
@@ -63,24 +52,9 @@ def create_default_queue_signal_handler(sender, **kwargs):
|
||||
DocumentQueue.objects.get_or_create(name='default')
|
||||
|
||||
|
||||
def reset_queue_documents():
|
||||
try:
|
||||
default_queue = DocumentQueue.objects.get(name='default')
|
||||
except (DatabaseError, DocumentQueue.DoesNotExist):
|
||||
pass
|
||||
else:
|
||||
default_queue.queuedocument_set.filter(state=QUEUEDOCUMENT_STATE_PROCESSING).update(state=QUEUEDOCUMENT_STATE_PENDING)
|
||||
|
||||
|
||||
register_interval_job('task_process_document_queues', _(u'Checks the OCR queue for pending documents.'), task_process_document_queues, seconds=QUEUE_PROCESSING_INTERVAL)
|
||||
|
||||
register_tool(ocr_tool_link)
|
||||
|
||||
class_permissions(Document, [
|
||||
PERMISSION_OCR_DOCUMENT,
|
||||
])
|
||||
|
||||
reset_queue_documents()
|
||||
class_permissions(Document, [PERMISSION_OCR_DOCUMENT])
|
||||
|
||||
namespace = StatisticNamespace(name='ocr', label=_(u'OCR'))
|
||||
namespace.add_statistic(OCRStatistics(name='ocr_stats', label=_(u'OCR queue statistics')))
|
||||
|
||||
@@ -14,7 +14,7 @@ class QueueDocumentInline(admin.StackedInline):
|
||||
|
||||
class DocumentQueueAdmin(admin.ModelAdmin):
|
||||
inlines = [QueueDocumentInline]
|
||||
list_display = ('name', 'label', 'state')
|
||||
list_display = ('name', 'label')
|
||||
|
||||
|
||||
admin.site.register(DocumentQueue, DocumentQueueAdmin)
|
||||
|
||||
@@ -30,14 +30,14 @@ except sh.CommandNotFound:
|
||||
UNPAPER = None
|
||||
|
||||
|
||||
def do_document_ocr(queue_document):
|
||||
def do_document_ocr(document):
|
||||
"""
|
||||
Try first to extract text from document pages using the registered
|
||||
parser, if the parser fails or if there is no parser registered for
|
||||
the document mimetype do a visual OCR by calling the corresponding
|
||||
OCR backend
|
||||
"""
|
||||
for document_page in queue_document.document.pages.all():
|
||||
for document_page in document.pages.all():
|
||||
try:
|
||||
# Try to extract text by means of a parser
|
||||
parse_document_page(document_page)
|
||||
@@ -73,9 +73,6 @@ def do_document_ocr(queue_document):
|
||||
document_page.content = ocr_cleanup(ocr_text)
|
||||
document_page.page_label = _(u'Text from OCR')
|
||||
document_page.save()
|
||||
except Exception as e:
|
||||
logger.debug('missing ocr backend: %s' % ocr_backend)
|
||||
logger.debug('I/O error({0}): {1}'.format(e.errno, e.strerror))
|
||||
finally:
|
||||
fs_cleanup(pre_ocr_filepath_w_ext)
|
||||
fs_cleanup(unpaper_input)
|
||||
|
||||
@@ -1,10 +1,3 @@
|
||||
class AlreadyQueued(Exception):
|
||||
"""
|
||||
Raised when a trying to queue document already in the queue
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class OCRError(Exception):
|
||||
"""
|
||||
Raised by the OCR backend
|
||||
|
||||
@@ -4,8 +4,7 @@ from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from .permissions import (PERMISSION_OCR_CLEAN_ALL_PAGES,
|
||||
PERMISSION_OCR_DOCUMENT,
|
||||
PERMISSION_OCR_DOCUMENT_DELETE,
|
||||
PERMISSION_OCR_QUEUE_ENABLE_DISABLE)
|
||||
PERMISSION_OCR_DOCUMENT_DELETE)
|
||||
|
||||
submit_document = {'text': _('Submit to OCR queue'), 'view': 'ocr:submit_document', 'args': 'object.id', 'famfam': 'hourglass_add', 'permissions': [PERMISSION_OCR_DOCUMENT]}
|
||||
submit_document_multiple = {'text': _('Submit to OCR queue'), 'view': 'ocr:submit_document_multiple', 'famfam': 'hourglass_add', 'permissions': [PERMISSION_OCR_DOCUMENT]}
|
||||
@@ -14,9 +13,6 @@ re_queue_multiple_document = {'text': _('Re-queue'), 'view': 'ocr:re_queue_multi
|
||||
queue_document_delete = {'text': _(u'Delete'), 'view': 'ocr:queue_document_delete', 'args': 'object.id', 'famfam': 'hourglass_delete', 'permissions': [PERMISSION_OCR_DOCUMENT_DELETE]}
|
||||
queue_document_multiple_delete = {'text': _(u'Delete'), 'view': 'ocr:queue_document_multiple_delete', 'famfam': 'hourglass_delete', 'permissions': [PERMISSION_OCR_DOCUMENT_DELETE]}
|
||||
|
||||
document_queue_disable = {'text': _(u'Stop queue'), 'view': 'ocr:document_queue_disable', 'args': 'queue.id', 'famfam': 'control_stop_blue', 'permissions': [PERMISSION_OCR_QUEUE_ENABLE_DISABLE]}
|
||||
document_queue_enable = {'text': _(u'Activate queue'), 'view': 'ocr:document_queue_enable', 'args': 'queue.id', 'famfam': 'control_play_blue', 'permissions': [PERMISSION_OCR_QUEUE_ENABLE_DISABLE]}
|
||||
|
||||
all_document_ocr_cleanup = {'text': _(u'Clean up pages content'), 'view': 'ocr:all_document_ocr_cleanup', 'famfam': 'text_strikethrough', 'permissions': [PERMISSION_OCR_CLEAN_ALL_PAGES], 'description': _(u'Runs a language filter to remove common OCR mistakes from document pages content.')}
|
||||
|
||||
queue_document_list = {'text': _(u'Queue document list'), 'view': 'ocr:queue_document_list', 'famfam': 'hourglass', 'permissions': [PERMISSION_OCR_DOCUMENT]}
|
||||
|
||||
@@ -1,23 +1,5 @@
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
DOCUMENTQUEUE_STATE_STOPPED = 's'
|
||||
DOCUMENTQUEUE_STATE_ACTIVE = 'a'
|
||||
|
||||
DOCUMENTQUEUE_STATE_CHOICES = (
|
||||
(DOCUMENTQUEUE_STATE_STOPPED, _(u'stopped')),
|
||||
(DOCUMENTQUEUE_STATE_ACTIVE, _(u'active')),
|
||||
)
|
||||
|
||||
QUEUEDOCUMENT_STATE_PENDING = 'p'
|
||||
QUEUEDOCUMENT_STATE_PROCESSING = 'i'
|
||||
QUEUEDOCUMENT_STATE_ERROR = 'e'
|
||||
|
||||
QUEUEDOCUMENT_STATE_CHOICES = (
|
||||
(QUEUEDOCUMENT_STATE_PENDING, _(u'pending')),
|
||||
(QUEUEDOCUMENT_STATE_PROCESSING, _(u'processing')),
|
||||
(QUEUEDOCUMENT_STATE_ERROR, _(u'error')),
|
||||
)
|
||||
|
||||
DEFAULT_OCR_FILE_FORMAT = u'tiff'
|
||||
DEFAULT_OCR_FILE_EXTENSION = u'tif'
|
||||
UNPAPER_FILE_FORMAT = u'ppm'
|
||||
|
||||
@@ -1,21 +0,0 @@
|
||||
from __future__ import absolute_import
|
||||
|
||||
from django.db import models
|
||||
|
||||
from .exceptions import AlreadyQueued
|
||||
|
||||
|
||||
class DocumentQueueManager(models.Manager):
|
||||
"""
|
||||
Module manager class to handle adding documents to an OCR document
|
||||
queue
|
||||
"""
|
||||
|
||||
def queue_document(self, document, queue_name='default'):
|
||||
document_queue = self.model.objects.get(name=queue_name)
|
||||
if document_queue.queuedocument_set.filter(document=document):
|
||||
raise AlreadyQueued
|
||||
|
||||
document_queue.queuedocument_set.create(document=document, delay=True)
|
||||
|
||||
return document_queue
|
||||
@@ -0,0 +1,69 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from south.utils import datetime_utils as datetime
|
||||
from south.db import db
|
||||
from south.v2 import SchemaMigration
|
||||
from django.db import models
|
||||
|
||||
|
||||
class Migration(SchemaMigration):
|
||||
|
||||
def forwards(self, orm):
|
||||
# Deleting field 'DocumentQueue.state'
|
||||
db.delete_column(u'ocr_documentqueue', 'state')
|
||||
|
||||
# Deleting field 'QueueDocument.delay'
|
||||
db.delete_column(u'ocr_queuedocument', 'delay')
|
||||
|
||||
# Deleting field 'QueueDocument.state'
|
||||
db.delete_column(u'ocr_queuedocument', 'state')
|
||||
|
||||
|
||||
def backwards(self, orm):
|
||||
# Adding field 'DocumentQueue.state'
|
||||
db.add_column(u'ocr_documentqueue', 'state',
|
||||
self.gf('django.db.models.fields.CharField')(default='a', max_length=4),
|
||||
keep_default=False)
|
||||
|
||||
# Adding field 'QueueDocument.delay'
|
||||
db.add_column(u'ocr_queuedocument', 'delay',
|
||||
self.gf('django.db.models.fields.BooleanField')(default=False),
|
||||
keep_default=False)
|
||||
|
||||
# Adding field 'QueueDocument.state'
|
||||
db.add_column(u'ocr_queuedocument', 'state',
|
||||
self.gf('django.db.models.fields.CharField')(default='p', max_length=4),
|
||||
keep_default=False)
|
||||
|
||||
|
||||
models = {
|
||||
u'documents.document': {
|
||||
'Meta': {'ordering': "['-date_added']", 'object_name': 'Document'},
|
||||
'date_added': ('django.db.models.fields.DateTimeField', [], {'db_index': 'True'}),
|
||||
'description': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
|
||||
'document_type': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['documents.DocumentType']", 'null': 'True', 'blank': 'True'}),
|
||||
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'uuid': ('django.db.models.fields.CharField', [], {'max_length': '48', 'blank': 'True'})
|
||||
},
|
||||
u'documents.documenttype': {
|
||||
'Meta': {'ordering': "['name']", 'object_name': 'DocumentType'},
|
||||
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '32'})
|
||||
},
|
||||
u'ocr.documentqueue': {
|
||||
'Meta': {'object_name': 'DocumentQueue'},
|
||||
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'label': ('django.db.models.fields.CharField', [], {'max_length': '64'}),
|
||||
'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64'})
|
||||
},
|
||||
u'ocr.queuedocument': {
|
||||
'Meta': {'ordering': "('datetime_submitted',)", 'object_name': 'QueueDocument'},
|
||||
'datetime_submitted': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'db_index': 'True', 'blank': 'True'}),
|
||||
'document': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['documents.Document']"}),
|
||||
'document_queue': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'documents'", 'to': u"orm['ocr.DocumentQueue']"}),
|
||||
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'node_name': ('django.db.models.fields.CharField', [], {'max_length': '32', 'null': 'True', 'blank': 'True'}),
|
||||
'result': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'})
|
||||
}
|
||||
}
|
||||
|
||||
complete_apps = ['ocr']
|
||||
@@ -9,21 +9,11 @@ from django.utils.translation import ugettext_lazy as _
|
||||
from documents.models import Document
|
||||
|
||||
from .exceptions import ReQueueError
|
||||
from .literals import (DOCUMENTQUEUE_STATE_ACTIVE, DOCUMENTQUEUE_STATE_CHOICES,
|
||||
QUEUEDOCUMENT_STATE_CHOICES, QUEUEDOCUMENT_STATE_PENDING,
|
||||
QUEUEDOCUMENT_STATE_PROCESSING)
|
||||
from .managers import DocumentQueueManager
|
||||
|
||||
|
||||
class DocumentQueue(models.Model):
|
||||
name = models.CharField(max_length=64, unique=True, verbose_name=_(u'Name'))
|
||||
label = models.CharField(max_length=64, verbose_name=_(u'Label'))
|
||||
state = models.CharField(max_length=4,
|
||||
choices=DOCUMENTQUEUE_STATE_CHOICES,
|
||||
default=DOCUMENTQUEUE_STATE_ACTIVE,
|
||||
verbose_name=_(u'State'))
|
||||
|
||||
objects = DocumentQueueManager()
|
||||
|
||||
class Meta:
|
||||
verbose_name = _(u'Document queue')
|
||||
@@ -34,14 +24,9 @@ class DocumentQueue(models.Model):
|
||||
|
||||
|
||||
class QueueDocument(models.Model):
|
||||
document_queue = models.ForeignKey(DocumentQueue, verbose_name=_(u'Document queue'))
|
||||
document_queue = models.ForeignKey(DocumentQueue, related_name='documents', verbose_name=_(u'Document queue'))
|
||||
document = models.ForeignKey(Document, verbose_name=_(u'Document'))
|
||||
datetime_submitted = models.DateTimeField(verbose_name=_(u'Date time submitted'), auto_now_add=True, db_index=True)
|
||||
delay = models.BooleanField(verbose_name=_(u'Delay OCR'), default=False)
|
||||
state = models.CharField(max_length=4,
|
||||
choices=QUEUEDOCUMENT_STATE_CHOICES,
|
||||
default=QUEUEDOCUMENT_STATE_PENDING,
|
||||
verbose_name=_(u'State'))
|
||||
datetime_submitted = models.DateTimeField(verbose_name=_(u'Date time submitted'), auto_now=True, db_index=True)
|
||||
result = models.TextField(blank=True, null=True, verbose_name=_(u'Result'))
|
||||
node_name = models.CharField(max_length=32, verbose_name=_(u'Node name'), blank=True, null=True)
|
||||
|
||||
|
||||
@@ -7,6 +7,5 @@ from permissions.models import Permission, PermissionNamespace
|
||||
ocr_namespace = PermissionNamespace('ocr', _(u'OCR'))
|
||||
PERMISSION_OCR_DOCUMENT = Permission.objects.register(ocr_namespace, 'ocr_document', _(u'Submit documents for OCR'))
|
||||
PERMISSION_OCR_DOCUMENT_DELETE = Permission.objects.register(ocr_namespace, 'ocr_document_delete', _(u'Delete documents from OCR queue'))
|
||||
PERMISSION_OCR_QUEUE_ENABLE_DISABLE = Permission.objects.register(ocr_namespace, 'ocr_queue_enable_disable', _(u'Can enable/disable the OCR queue'))
|
||||
PERMISSION_OCR_CLEAN_ALL_PAGES = Permission.objects.register(ocr_namespace, 'ocr_clean_all_pages', _(u'Can execute the OCR clean up on all document pages'))
|
||||
PERMISSION_OCR_QUEUE_EDIT = Permission.objects.register(ocr_namespace, 'ocr_queue_edit', _(u'Can edit an OCR queue properties'))
|
||||
|
||||
@@ -10,10 +10,7 @@ register_settings(
|
||||
settings=[
|
||||
{'name': u'TESSERACT_PATH', 'global_name': u'OCR_TESSERACT_PATH', 'default': u'/usr/bin/tesseract', 'exists': True},
|
||||
{'name': u'LANGUAGE', 'global_name': u'OCR_LANGUAGE', 'default': u'eng'},
|
||||
{'name': u'REPLICATION_DELAY', 'global_name': u'OCR_REPLICATION_DELAY', 'default': 0, 'description': _(u'Amount of seconds to delay OCR of documents to allow for the node\'s storage replication overhead.')},
|
||||
{'name': u'NODE_CONCURRENT_EXECUTION', 'global_name': u'OCR_NODE_CONCURRENT_EXECUTION', 'default': 1, 'description': _(u'Maximum amount of concurrent document OCRs a node can perform.')},
|
||||
{'name': u'AUTOMATIC_OCR', 'global_name': u'OCR_AUTOMATIC_OCR', 'default': True, 'description': _(u'Automatically queue newly created documents for OCR.')},
|
||||
{'name': u'QUEUE_PROCESSING_INTERVAL', 'global_name': u'OCR_QUEUE_PROCESSING_INTERVAL', 'default': 10},
|
||||
{'name': u'UNPAPER_PATH', 'global_name': u'OCR_UNPAPER_PATH', 'default': u'/usr/bin/unpaper', 'description': _(u'File path to unpaper program.'), 'exists': True},
|
||||
{'name': u'PDFTOTEXT_PATH', 'global_name': u'OCR_PDFTOTEXT_PATH', 'default': u'/usr/bin/pdftotext', 'description': _(u'File path to poppler\'s pdftotext program used to extract text from PDF files.'), 'exists': True},
|
||||
{'name': u'BACKEND', 'global_name': u'OCR_BACKEND', 'default': u'ocr.backends.tesseract.Tesseract', 'description': _(u'Full path to the backend to be used to do OCR.')},
|
||||
|
||||
@@ -10,37 +10,34 @@ from django.conf import settings
|
||||
from django.db.models import Q
|
||||
from django.utils.timezone import now
|
||||
|
||||
from documents.models import Document
|
||||
from job_processor.api import process_job
|
||||
from lock_manager import Lock, LockError
|
||||
from mayan.celery import app
|
||||
|
||||
from .api import do_document_ocr
|
||||
from .literals import (DOCUMENTQUEUE_STATE_ACTIVE, QUEUEDOCUMENT_STATE_ERROR,
|
||||
QUEUEDOCUMENT_STATE_PENDING,
|
||||
QUEUEDOCUMENT_STATE_PROCESSING)
|
||||
from .models import DocumentQueue, QueueDocument
|
||||
from .settings import NODE_CONCURRENT_EXECUTION, REPLICATION_DELAY
|
||||
|
||||
LOCK_EXPIRE = 60 * 2 # Lock expires in 2 minutes
|
||||
# TODO: Tie LOCK_EXPIRATION with hard task timeout
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
LOCK_EXPIRE = 60 * 10 # Adjust to worst case scenario
|
||||
|
||||
|
||||
def task_process_queue_document(queue_document_id):
|
||||
lock_id = u'task_proc_queue_doc-%d' % queue_document_id
|
||||
@app.task
|
||||
def task_do_ocr(document_pk):
|
||||
lock_id = u'task_do_ocr_doc-%d' % document_pk
|
||||
try:
|
||||
logger.debug('trying to acquire lock: %s' % lock_id)
|
||||
lock = Lock.acquire_lock(lock_id, LOCK_EXPIRE)
|
||||
logger.debug('acquired lock: %s' % lock_id)
|
||||
queue_document = QueueDocument.objects.get(pk=queue_document_id)
|
||||
queue_document.state = QUEUEDOCUMENT_STATE_PROCESSING
|
||||
queue_document.node_name = platform.node()
|
||||
queue_document.save()
|
||||
try:
|
||||
do_document_ocr(queue_document)
|
||||
queue_document.delete()
|
||||
logger.info('Starting document OCR for document: %d' % document_pk)
|
||||
document = Document.objects.get(pk=document_pk)
|
||||
do_document_ocr(document)
|
||||
except Exception as exception:
|
||||
queue_document.state = QUEUEDOCUMENT_STATE_ERROR
|
||||
logger.error('OCR error for document: %d; %s' % (document_pk, exception))
|
||||
document_queue = DocumentQueue.objects.get(name='default')
|
||||
queue_document, created = document_queue.documents.get_or_create(document=document)
|
||||
queue_document.node_name = platform.node()
|
||||
|
||||
if settings.DEBUG:
|
||||
result = []
|
||||
@@ -52,37 +49,17 @@ def task_process_queue_document(queue_document_id):
|
||||
queue_document.result = exception
|
||||
|
||||
queue_document.save()
|
||||
|
||||
lock.release()
|
||||
else:
|
||||
logger.info('OCR for document: %d ended' % document_pk)
|
||||
document_queue = DocumentQueue.objects.get(name='default')
|
||||
try:
|
||||
queue_document = document_queue.documents.get(document=document)
|
||||
except QueueDocument.DoesNotExist:
|
||||
pass
|
||||
else:
|
||||
queue_document.delete()
|
||||
finally:
|
||||
lock.release()
|
||||
except LockError:
|
||||
logger.debug('unable to obtain lock')
|
||||
pass
|
||||
|
||||
|
||||
def task_process_document_queues():
|
||||
logger.debug('executed')
|
||||
# TODO: reset_orphans()
|
||||
q_pending = Q(state=QUEUEDOCUMENT_STATE_PENDING)
|
||||
q_delayed = Q(delay=True)
|
||||
q_delay_interval = Q(datetime_submitted__lt=now() - timedelta(seconds=REPLICATION_DELAY))
|
||||
for document_queue in DocumentQueue.objects.filter(state=DOCUMENTQUEUE_STATE_ACTIVE):
|
||||
current_local_processing_count = QueueDocument.objects.filter(
|
||||
state=QUEUEDOCUMENT_STATE_PROCESSING).filter(
|
||||
node_name=platform.node()).count()
|
||||
if current_local_processing_count < NODE_CONCURRENT_EXECUTION:
|
||||
try:
|
||||
oldest_queued_document_qs = document_queue.queuedocument_set.filter(
|
||||
(q_pending & ~q_delayed) | (q_pending & q_delayed & q_delay_interval))
|
||||
|
||||
if oldest_queued_document_qs:
|
||||
oldest_queued_document = oldest_queued_document_qs.order_by('datetime_submitted')[0]
|
||||
process_job(task_process_queue_document, oldest_queued_document.pk)
|
||||
except Exception as exception:
|
||||
logger.error('unhandled exception: %s' % exception)
|
||||
finally:
|
||||
# Don't process anymore from this queryset, might be stale
|
||||
break
|
||||
else:
|
||||
logger.debug('already processing maximum')
|
||||
else:
|
||||
logger.debug('nothing to process')
|
||||
|
||||
@@ -9,7 +9,4 @@ urlpatterns = patterns('ocr.views',
|
||||
url(r'^queue/document/(?P<queue_document_id>\d+)/re-queue/$', 're_queue_document', (), 're_queue_document'),
|
||||
url(r'^queue/document/multiple/re-queue/$', 're_queue_multiple_document', (), 're_queue_multiple_document'),
|
||||
|
||||
url(r'^queue/(?P<document_queue_id>\d+)/enable/$', 'document_queue_enable', (), 'document_queue_enable'),
|
||||
url(r'^queue/(?P<document_queue_id>\d+)/disable/$', 'document_queue_disable', (), 'document_queue_disable'),
|
||||
|
||||
url(r'^document/all/clean_up/$', 'all_document_ocr_cleanup', (), 'all_document_ocr_cleanup'),)
|
||||
|
||||
@@ -14,15 +14,12 @@ from documents.widgets import document_link, document_thumbnail
|
||||
from permissions.models import Permission
|
||||
|
||||
from .api import clean_pages
|
||||
from .exceptions import AlreadyQueued, ReQueueError
|
||||
from .literals import (DOCUMENTQUEUE_STATE_ACTIVE,
|
||||
DOCUMENTQUEUE_STATE_STOPPED,
|
||||
QUEUEDOCUMENT_STATE_PROCESSING)
|
||||
from .exceptions import ReQueueError
|
||||
from .models import DocumentQueue, QueueDocument
|
||||
from .permissions import (PERMISSION_OCR_CLEAN_ALL_PAGES,
|
||||
PERMISSION_OCR_DOCUMENT,
|
||||
PERMISSION_OCR_DOCUMENT_DELETE,
|
||||
PERMISSION_OCR_QUEUE_ENABLE_DISABLE)
|
||||
PERMISSION_OCR_DOCUMENT_DELETE)
|
||||
from .tasks import task_do_ocr
|
||||
|
||||
|
||||
def queue_document_list(request, queue_name='default'):
|
||||
@@ -31,7 +28,7 @@ def queue_document_list(request, queue_name='default'):
|
||||
document_queue = get_object_or_404(DocumentQueue, name=queue_name)
|
||||
|
||||
context = {
|
||||
'object_list': document_queue.queuedocument_set.all(),
|
||||
'object_list': document_queue.documents.all(),
|
||||
'title': _(u'Documents in queue: %s') % document_queue,
|
||||
'hide_object': True,
|
||||
'queue': document_queue,
|
||||
@@ -39,25 +36,13 @@ def queue_document_list(request, queue_name='default'):
|
||||
'navigation_object_name': 'queue',
|
||||
'list_object_variable_name': 'queue_document',
|
||||
'extra_columns': [
|
||||
{'name': 'document', 'attribute': encapsulate(lambda x: document_link(x.document) if hasattr(x, 'document') else _(u'Missing document.'))},
|
||||
{'name': _(u'thumbnail'), 'attribute': encapsulate(lambda x: document_thumbnail(x.document))},
|
||||
{'name': 'submitted', 'attribute': encapsulate(lambda x: unicode(x.datetime_submitted).split('.')[0]), 'keep_together':True},
|
||||
{'name': 'delay', 'attribute': 'delay'},
|
||||
{'name': 'state', 'attribute': encapsulate(lambda x: x.get_state_display())},
|
||||
{'name': 'node', 'attribute': 'node_name'},
|
||||
{'name': 'result', 'attribute': 'result'},
|
||||
{'name': _('Document'), 'attribute': encapsulate(lambda x: document_link(x.document) if hasattr(x, 'document') else _(u'Missing document.'))},
|
||||
{'name': _(u'Thumbnail'), 'attribute': encapsulate(lambda x: document_thumbnail(x.document))},
|
||||
{'name': _('Added'), 'attribute': encapsulate(lambda x: unicode(x.datetime_submitted).split('.')[0]), 'keep_together':True},
|
||||
{'name': _('Node'), 'attribute': 'node_name'},
|
||||
{'name': _('Result'), 'attribute': 'result'},
|
||||
],
|
||||
'multi_select_as_buttons': True,
|
||||
'sidebar_subtemplates_list': [
|
||||
{
|
||||
'name': 'main/generic_subtemplate.html',
|
||||
'context': {
|
||||
'side_bar': True,
|
||||
'title': _(u'Document queue properties'),
|
||||
'content': _(u'Current state: %s') % document_queue.get_state_display(),
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
return render_to_response('main/generic_list.html', context,
|
||||
@@ -81,12 +66,9 @@ def queue_document_delete(request, queue_document_id=None, queue_document_id_lis
|
||||
if request.method == 'POST':
|
||||
for queue_document in queue_documents:
|
||||
try:
|
||||
if queue_document.state == QUEUEDOCUMENT_STATE_PROCESSING:
|
||||
messages.error(request, _(u'Document: %s is being processed and can\'t be deleted.') % queue_document)
|
||||
else:
|
||||
queue_document.delete()
|
||||
messages.success(request, _(u'Queue document: %(document)s deleted successfully.') % {
|
||||
'document': queue_document.document})
|
||||
queue_document.delete()
|
||||
messages.success(request, _(u'Queue document: %(document)s deleted successfully.') % {
|
||||
'document': queue_document.document})
|
||||
|
||||
except Exception as exception:
|
||||
messages.error(request, _(u'Error deleting document: %(document)s; %(error)s') % {
|
||||
@@ -138,17 +120,10 @@ def submit_document_to_queue(request, document, post_submit_redirect=None):
|
||||
This view is meant to be reusable
|
||||
"""
|
||||
|
||||
try:
|
||||
document_queue = DocumentQueue.objects.queue_document(document)
|
||||
messages.success(request, _(u'Document: %(document)s was added to the OCR queue: %(queue)s.') % {
|
||||
'document': document, 'queue': document_queue.label}
|
||||
)
|
||||
except AlreadyQueued:
|
||||
messages.warning(request, _(u'Document: %(document)s is already queued.') % {
|
||||
'document': document}
|
||||
)
|
||||
except Exception as exception:
|
||||
messages.error(request, exception)
|
||||
task_do_ocr.delay(document.pk)
|
||||
messages.success(request, _(u'Document: %(document)s was added to the OCR queue.') % {
|
||||
'document': document}
|
||||
)
|
||||
|
||||
if post_submit_redirect:
|
||||
return HttpResponseRedirect(post_submit_redirect)
|
||||
@@ -208,60 +183,6 @@ def re_queue_multiple_document(request):
|
||||
return re_queue_document(request, queue_document_id_list=request.GET.get('id_list', []))
|
||||
|
||||
|
||||
def document_queue_disable(request, document_queue_id):
|
||||
Permission.objects.check_permissions(request.user, [PERMISSION_OCR_QUEUE_ENABLE_DISABLE])
|
||||
|
||||
next = request.POST.get('next', request.GET.get('next', request.META.get('HTTP_REFERER', None)))
|
||||
previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', None)))
|
||||
document_queue = get_object_or_404(DocumentQueue, pk=document_queue_id)
|
||||
|
||||
if document_queue.state == DOCUMENTQUEUE_STATE_STOPPED:
|
||||
messages.warning(request, _(u'Document queue: %s, already stopped.') % document_queue)
|
||||
return HttpResponseRedirect(previous)
|
||||
|
||||
if request.method == 'POST':
|
||||
document_queue.state = DOCUMENTQUEUE_STATE_STOPPED
|
||||
document_queue.save()
|
||||
messages.success(request, _(u'Document queue: %s, stopped successfully.') % document_queue)
|
||||
return HttpResponseRedirect(next)
|
||||
|
||||
return render_to_response('main/generic_confirm.html', {
|
||||
'queue': document_queue,
|
||||
'navigation_object_name': 'queue',
|
||||
'title': _(u'Are you sure you wish to disable document queue: %s') % document_queue,
|
||||
'next': next,
|
||||
'previous': previous,
|
||||
'form_icon': u'control_stop_blue.png',
|
||||
}, context_instance=RequestContext(request))
|
||||
|
||||
|
||||
def document_queue_enable(request, document_queue_id):
|
||||
Permission.objects.check_permissions(request.user, [PERMISSION_OCR_QUEUE_ENABLE_DISABLE])
|
||||
|
||||
next = request.POST.get('next', request.GET.get('next', request.META.get('HTTP_REFERER', None)))
|
||||
previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', None)))
|
||||
document_queue = get_object_or_404(DocumentQueue, pk=document_queue_id)
|
||||
|
||||
if document_queue.state == DOCUMENTQUEUE_STATE_ACTIVE:
|
||||
messages.warning(request, _(u'Document queue: %s, already active.') % document_queue)
|
||||
return HttpResponseRedirect(previous)
|
||||
|
||||
if request.method == 'POST':
|
||||
document_queue.state = DOCUMENTQUEUE_STATE_ACTIVE
|
||||
document_queue.save()
|
||||
messages.success(request, _(u'Document queue: %s, activated successfully.') % document_queue)
|
||||
return HttpResponseRedirect(next)
|
||||
|
||||
return render_to_response('main/generic_confirm.html', {
|
||||
'queue': document_queue,
|
||||
'navigation_object_name': 'queue',
|
||||
'title': _(u'Are you sure you wish to activate document queue: %s') % document_queue,
|
||||
'next': next,
|
||||
'previous': previous,
|
||||
'form_icon': u'control_play_blue.png',
|
||||
}, context_instance=RequestContext(request))
|
||||
|
||||
|
||||
def all_document_ocr_cleanup(request):
|
||||
Permission.objects.check_permissions(request.user, [PERMISSION_OCR_CLEAN_ALL_PAGES])
|
||||
|
||||
|
||||
14
mayan/celery.py
Normal file
14
mayan/celery.py
Normal file
@@ -0,0 +1,14 @@
|
||||
from __future__ import absolute_import
|
||||
|
||||
import os
|
||||
import django
|
||||
|
||||
from celery import Celery
|
||||
from django.conf import settings
|
||||
|
||||
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mayan.settings')
|
||||
|
||||
app = Celery('mayan')
|
||||
|
||||
app.config_from_object('django.conf:settings')
|
||||
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
|
||||
@@ -46,6 +46,8 @@ INSTALLED_APPS = (
|
||||
'django.contrib.staticfiles',
|
||||
# 3rd party
|
||||
'south',
|
||||
'kombu.transport.django',
|
||||
'djcelery',
|
||||
'rest_framework_swagger',
|
||||
'filetransfers',
|
||||
'taggit',
|
||||
@@ -258,3 +260,6 @@ REST_FRAMEWORK = {
|
||||
'rest_framework.authentication.SessionAuthentication',
|
||||
)
|
||||
}
|
||||
# ----------- Celery ----------
|
||||
BROKER_URL = 'django://'
|
||||
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
APScheduler==2.1.2
|
||||
|
||||
celery==3.1.15
|
||||
cssmin==0.2.0
|
||||
|
||||
Django==1.6.5
|
||||
django-celery==3.1.16
|
||||
django-filetransfers==0.1.0
|
||||
django-pagination==1.0.7
|
||||
django-compressor==1.4
|
||||
|
||||
Reference in New Issue
Block a user