diff --git a/apps/ocr/tasks.py b/apps/ocr/tasks.py index dee3d64da9..125d8fa2d0 100644 --- a/apps/ocr/tasks.py +++ b/apps/ocr/tasks.py @@ -4,9 +4,10 @@ from time import sleep from random import random from django.db.models import Q -from django.core.cache import get_cache from job_processor.api import process_job +from lock_manager.models import Lock +from lock_manager.exceptions import LockError from ocr.api import do_document_ocr from ocr.literals import QUEUEDOCUMENT_STATE_PENDING, \ @@ -21,36 +22,13 @@ from ocr.conf.settings import QUEUE_PROCESSING_INTERVAL LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes # TODO: Tie LOCK_EXPIRATION with hard task timeout -if CACHE_URI: - try: - cache_backend = get_cache(CACHE_URI) - except ImportError: - # TODO: display or log error - cache_backend = None -else: - cache_backend = None - - -def random_delay(): - sleep(random() * (QUEUE_PROCESSING_INTERVAL - 1)) - return True - - -if cache_backend: - acquire_lock = lambda lock_id: cache_backend.add(lock_id, u'true', LOCK_EXPIRE) - release_lock = lambda lock_id: cache_backend.delete(lock_id) -else: - acquire_lock = lambda lock_id: True - release_lock = lambda lock_id: True - - def task_process_queue_document(queue_document_id): lock_id = u'%s-lock-%d' % (u'task_process_queue_document', queue_document_id) - if acquire_lock(lock_id): + try: + lock = Lock.objects.acquire_lock(lock_id, LOCK_EXPIRE) queue_document = QueueDocument.objects.get(pk=queue_document_id) queue_document.state = QUEUEDOCUMENT_STATE_PROCESSING queue_document.node_name = platform.node() - #queue_document.result = task_process_queue_document.request.id queue_document.save() try: do_document_ocr(queue_document) @@ -59,7 +37,10 @@ def task_process_queue_document(queue_document_id): queue_document.state = QUEUEDOCUMENT_STATE_ERROR queue_document.result = e queue_document.save() - release_lock(lock_id) + + lock.release() + except LockError: + pass def reset_orphans(): @@ -86,11 +67,9 @@ def reset_orphans(): orphan.node_name = None orphan.save() ''' - + def task_process_document_queues(): - if not cache_backend: - random_delay() # reset_orphans() # Causes problems with big clusters increased latency # Disabled until better solution @@ -108,8 +87,7 @@ def task_process_document_queues(): if oldest_queued_document_qs: oldest_queued_document = oldest_queued_document_qs.order_by('datetime_submitted')[0] - #task_process_queue_document.delay(oldest_queued_document.pk) - #task_process_queue_document(oldest_queued_document.pk) process_job(task_process_queue_document, oldest_queued_document.pk) except Exception, e: - print 'DocumentQueueWatcher exception: %s' % e + pass + #print 'DocumentQueueWatcher exception: %s' % e