diff --git a/apps/ocr/tasks.py b/apps/ocr/tasks.py index e69d816277..66289070de 100644 --- a/apps/ocr/tasks.py +++ b/apps/ocr/tasks.py @@ -7,6 +7,7 @@ from django.db.models import Q from django.utils.translation import ugettext as _ from celery.decorators import task, periodic_task +from celery.task.control import inspect from ocr.api import do_document_ocr from ocr.literals import QUEUEDOCUMENT_STATE_PENDING, \ @@ -37,8 +38,31 @@ def task_process_queue_document(queue_document_id): queue_document.save() +def reset_orphans(): + i = inspect() + active_tasks = [] + + if i: + for host, instances in i.active().items(): + for instance in instances: + active_tasks.append(instance['id']) + + for document_queue in DocumentQueue.objects.filter(state=DOCUMENTQUEUE_STATE_ACTIVE): + orphans = document_queue.queuedocument_set.\ + filter(state=QUEUEDOCUMENT_STATE_PROCESSING).\ + exclude(result__in=active_tasks) + + for orphan in orphans: + orphan.result = _(u'Orphaned') + orphan.state = QUEUEDOCUMENT_STATE_PENDING + orphan.delay = False + orphan.node_name = None + orphan.save() + + @periodic_task(run_every=timedelta(seconds=QUEUE_PROCESSING_INTERVAL)) def task_process_document_queues(): + reset_orphans() #Introduce random 0 < t < 1 second delay to further reduce the #chance of a race condition time.sleep(random.random())