Changed the ocr queue processing task from a Class to a function
This commit is contained in:
@@ -5,8 +5,7 @@ import random
|
||||
|
||||
from django.db.models import Q
|
||||
|
||||
from celery.task import PeriodicTask
|
||||
from celery.decorators import task
|
||||
from celery.decorators import task, periodic_task
|
||||
|
||||
from ocr.api import do_document_ocr
|
||||
from ocr.literals import QUEUEDOCUMENT_STATE_PENDING, \
|
||||
@@ -35,31 +34,24 @@ def task_process_queue_document(queue_document_id):
|
||||
queue_document.save()
|
||||
|
||||
|
||||
class DocumentQueueWatcher(PeriodicTask):
|
||||
run_every = timedelta(seconds=5)
|
||||
@periodic_task(run_every=timedelta(seconds=15))
|
||||
def task_process_document_queues():
|
||||
#Introduce random 0 < t < 1 second delay to further reduce the
|
||||
#chance of a race condition
|
||||
time.sleep(random.random())
|
||||
q_pending = Q(state=QUEUEDOCUMENT_STATE_PENDING)
|
||||
q_delayed = Q(delay=True)
|
||||
q_delay_interval = Q(datetime_submitted__lt=datetime.now() - timedelta(seconds=REPLICATION_DELAY))
|
||||
for document_queue in DocumentQueue.objects.filter(state=DOCUMENTQUEUE_STATE_ACTIVE):
|
||||
if QueueDocument.objects.filter(
|
||||
state=QUEUEDOCUMENT_STATE_PROCESSING).filter(
|
||||
node_name=platform.node()).count() < NODE_CONCURRENT_EXECUTION:
|
||||
try:
|
||||
oldest_queued_document_qs = document_queue.queuedocument_set.filter(
|
||||
(q_pending & ~q_delayed) | (q_pending & q_delayed & q_delay_interval))
|
||||
|
||||
def run(self, **kwargs):
|
||||
#Introduce random 0 < t < 1 second delay to further reduce the
|
||||
#chance of a race condition
|
||||
time.sleep(random.random())
|
||||
logger = self.get_logger(**kwargs)
|
||||
logger.info('Running queue watcher.')
|
||||
logger.debug('Active queues: %s' % DocumentQueue.objects.filter(state=DOCUMENTQUEUE_STATE_ACTIVE))
|
||||
q_pending = Q(state=QUEUEDOCUMENT_STATE_PENDING)
|
||||
q_delayed = Q(delay=True)
|
||||
q_delay_interval = Q(datetime_submitted__lt=datetime.now() - timedelta(seconds=REPLICATION_DELAY))
|
||||
for document_queue in DocumentQueue.objects.filter(state=DOCUMENTQUEUE_STATE_ACTIVE):
|
||||
logger.debug('Analysing queue: %s' % document_queue)
|
||||
if QueueDocument.objects.filter(
|
||||
state=QUEUEDOCUMENT_STATE_PROCESSING).filter(
|
||||
node_name=platform.node()).count() < NODE_CONCURRENT_EXECUTION:
|
||||
try:
|
||||
oldest_queued_document_qs = document_queue.queuedocument_set.filter(
|
||||
(q_pending & ~q_delayed) | (q_pending & q_delayed & q_delay_interval))
|
||||
|
||||
if oldest_queued_document_qs:
|
||||
oldest_queued_document = oldest_queued_document_qs.order_by('datetime_submitted')[0]
|
||||
task_process_queue_document.delay(oldest_queued_document.id)
|
||||
except Exception, e:
|
||||
print 'DocumentQueueWatcher exception: %s' % e
|
||||
return True
|
||||
if oldest_queued_document_qs:
|
||||
oldest_queued_document = oldest_queued_document_qs.order_by('datetime_submitted')[0]
|
||||
task_process_queue_document.delay(oldest_queued_document.id)
|
||||
except Exception, e:
|
||||
print 'DocumentQueueWatcher exception: %s' % e
|
||||
|
||||
Reference in New Issue
Block a user