diff --git a/apps/ocr/models.py b/apps/ocr/models.py index c92251970e..1b8276bdb4 100644 --- a/apps/ocr/models.py +++ b/apps/ocr/models.py @@ -17,7 +17,7 @@ class DocumentQueueManager(models.Manager): if QueueDocument.objects.filter(document_queue=document_queue, document=document).count(): raise AlreadyQueued - queue_document = QueueDocument(document_queue=document_queue, document=document) + queue_document = QueueDocument(document_queue=document_queue, document=document, delay=True) queue_document.save() return document_queue @@ -45,6 +45,7 @@ class QueueDocument(models.Model): document_queue = models.ForeignKey(DocumentQueue, verbose_name=_(u'document queue')) document = models.ForeignKey(Document, verbose_name=_(u'document')) datetime_submitted = models.DateTimeField(verbose_name=_(u'date time submitted'), auto_now_add=True, db_index=True) + delay = models.BooleanField(verbose_name=_(u'delay ocr'), default=False) state = models.CharField(max_length=4, choices=QUEUEDOCUMENT_STATE_CHOICES, default=QUEUEDOCUMENT_STATE_PENDING, diff --git a/apps/ocr/tasks.py b/apps/ocr/tasks.py index c351164042..1da083c154 100644 --- a/apps/ocr/tasks.py +++ b/apps/ocr/tasks.py @@ -1,4 +1,7 @@ -from datetime import date, timedelta +from datetime import date, timedelta, datetime + +from django.db.models import Q + from celery.task import Task, PeriodicTask from celery.decorators import task @@ -34,16 +37,20 @@ class DocumentQueueWatcher(PeriodicTask): logger = self.get_logger(**kwargs) logger.info('Running queue watcher.') logger.debug('Active queues: %s' % DocumentQueue.objects.filter(state=DOCUMENTQUEUE_STATE_ACTIVE)) + q_pending = Q(state=QUEUEDOCUMENT_STATE_PENDING) + q_delayed = Q(delay=True) + q_delay_interval = Q(datetime_submitted__lt=datetime.now()-timedelta(seconds=REPLICATION_DELAY)) for document_queue in DocumentQueue.objects.filter(state=DOCUMENTQUEUE_STATE_ACTIVE): logger.debug('Analysing queue: %s' % document_queue) current_running_queues = QueueDocument.objects.filter(state=QUEUEDOCUMENT_STATE_PROCESSING).count() if current_running_queues < MAX_CONCURRENT_EXECUTION: try: - oldest_queued_document = document_queue.queuedocument_set.filter( - state=QUEUEDOCUMENT_STATE_PENDING).filter(datetime_submitted__lt=datetime.datetime.now()-datetime.timedelta(seconds=REPLICATION_DELAY)).order_by('datetime_submitted')[0] - - task_process_queue_document(oldest_queued_document.id).delay() - except: - #No Documents in queue - pass + oldest_queued_document_qs = document_queue.queuedocument_set.filter( + (q_pending & ~q_delayed) | (q_pending & q_delayed & q_delay_interval)) + + if oldest_queued_document_qs: + oldest_queued_document = oldest_queued_document_qs.order_by('datetime_submitted')[0] + task_process_queue_document.delay(oldest_queued_document.id) + except Exception, e: + print 'DocumentQueueWatcher exception: %s' % e return True diff --git a/apps/ocr/views.py b/apps/ocr/views.py index 3dc776213a..da8690a53c 100644 --- a/apps/ocr/views.py +++ b/apps/ocr/views.py @@ -49,6 +49,7 @@ def queue_document_list(request, queue_name='default'): {'name':'document', 'attribute': lambda x: '%s' % (x.document.get_absolute_url(), x.document) if hasattr(x, 'document') else _(u'Missing document.')}, {'name':_(u'thumbnail'), 'attribute': lambda x: _display_thumbnail(x) }, {'name':'submitted', 'attribute': lambda x: unicode(x.datetime_submitted).split('.')[0], 'keep_together':True}, + {'name':'delay', 'attribute':'delay'}, {'name':'state', 'attribute': lambda x: x.get_state_display()}, {'name':'result', 'attribute':'result'}, ], @@ -179,6 +180,7 @@ def re_queue_document(request, queue_document_id=None, queue_document_id_list=[] if queue_document.state == QUEUEDOCUMENT_STATE_ERROR: queue_document.datetime_submitted = datetime.datetime.now() queue_document.state = QUEUEDOCUMENT_STATE_PENDING + queue_document.delay = False queue_document.save() messages.success(request, _(u'Document: %(document)s was re-queued to the OCR queue: %(queue)s') % { 'document':queue_document.document, 'queue':queue_document.document_queue.label})