@periodic_task(run_every=timedelta(seconds=15))
def task_process_document_queues():
    """Dispatch the oldest eligible queued document of every active queue.

    Runs as a periodic task every 15 seconds.  For each DocumentQueue in
    the active state, and only while this node (``platform.node()``) has
    fewer than NODE_CONCURRENT_EXECUTION documents in the processing
    state, the oldest eligible pending document is handed to
    ``task_process_queue_document``.

    A pending document is eligible when it is not flagged ``delay``, or
    when it is flagged ``delay`` and was submitted more than
    REPLICATION_DELAY seconds ago.

    Errors raised while selecting or dispatching a document are printed
    and do not prevent the remaining queues from being examined.
    """
    # Introduce a random 0 <= t < 1 second delay to further reduce the
    # chance of a race condition
    time.sleep(random.random())

    q_pending = Q(state=QUEUEDOCUMENT_STATE_PENDING)
    q_delayed = Q(delay=True)
    q_delay_interval = Q(
        datetime_submitted__lt=datetime.now() - timedelta(seconds=REPLICATION_DELAY))
    q_eligible = (q_pending & ~q_delayed) | (q_pending & q_delayed & q_delay_interval)

    for document_queue in DocumentQueue.objects.filter(state=DOCUMENTQUEUE_STATE_ACTIVE):
        # Re-counted for every queue: a dispatch made for an earlier queue
        # may already have moved a document into the processing state.
        processing_on_node = QueueDocument.objects.filter(
            state=QUEUEDOCUMENT_STATE_PROCESSING).filter(
            node_name=platform.node()).count()
        if processing_on_node >= NODE_CONCURRENT_EXECUTION:
            continue

        try:
            oldest_first = document_queue.queuedocument_set.filter(
                q_eligible).order_by('datetime_submitted')
            # Slicing to [:1] fetches at most one row in a single query,
            # instead of loading the whole queryset to test for emptiness
            # and then querying again for the first element.
            for oldest_queued_document in oldest_first[:1]:
                task_process_queue_document.delay(oldest_queued_document.id)
        except Exception as e:
            # Best-effort: report and carry on with the next queue.  The
            # message text keeps the legacy "DocumentQueueWatcher" name.
            print('DocumentQueueWatcher exception: %s' % e)