Added detection and reset of orphaned ocr documents being left as 'processing' when celery dies
This commit is contained in:
@@ -7,6 +7,7 @@ from django.db.models import Q
|
||||
from django.utils.translation import ugettext as _
|
||||
|
||||
from celery.decorators import task, periodic_task
|
||||
from celery.task.control import inspect
|
||||
|
||||
from ocr.api import do_document_ocr
|
||||
from ocr.literals import QUEUEDOCUMENT_STATE_PENDING, \
|
||||
@@ -37,8 +38,31 @@ def task_process_queue_document(queue_document_id):
|
||||
queue_document.save()
|
||||
|
||||
|
||||
def reset_orphans():
|
||||
i = inspect()
|
||||
active_tasks = []
|
||||
|
||||
if i:
|
||||
for host, instances in i.active().items():
|
||||
for instance in instances:
|
||||
active_tasks.append(instance['id'])
|
||||
|
||||
for document_queue in DocumentQueue.objects.filter(state=DOCUMENTQUEUE_STATE_ACTIVE):
|
||||
orphans = document_queue.queuedocument_set.\
|
||||
filter(state=QUEUEDOCUMENT_STATE_PROCESSING).\
|
||||
exclude(result__in=active_tasks)
|
||||
|
||||
for orphan in orphans:
|
||||
orphan.result = _(u'Orphaned')
|
||||
orphan.state = QUEUEDOCUMENT_STATE_PENDING
|
||||
orphan.delay = False
|
||||
orphan.node_name = None
|
||||
orphan.save()
|
||||
|
||||
|
||||
@periodic_task(run_every=timedelta(seconds=QUEUE_PROCESSING_INTERVAL))
|
||||
def task_process_document_queues():
|
||||
reset_orphans()
|
||||
#Introduce random 0 < t < 1 second delay to further reduce the
|
||||
#chance of a race condition
|
||||
time.sleep(random.random())
|
||||
|
||||
Reference in New Issue
Block a user