Updated the ocr app to use the lock manager

This commit is contained in:
Roberto Rosario
2011-11-22 15:07:29 -04:00
parent a6151fd9e5
commit c9e8f2fac0

View File

@@ -4,9 +4,10 @@ from time import sleep
from random import random from random import random
from django.db.models import Q from django.db.models import Q
from django.core.cache import get_cache
from job_processor.api import process_job from job_processor.api import process_job
from lock_manager.models import Lock
from lock_manager.exceptions import LockError
from ocr.api import do_document_ocr from ocr.api import do_document_ocr
from ocr.literals import QUEUEDOCUMENT_STATE_PENDING, \ from ocr.literals import QUEUEDOCUMENT_STATE_PENDING, \
@@ -21,36 +22,13 @@ from ocr.conf.settings import QUEUE_PROCESSING_INTERVAL
LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes
# TODO: Tie LOCK_EXPIRATION with hard task timeout # TODO: Tie LOCK_EXPIRATION with hard task timeout
if CACHE_URI:
try:
cache_backend = get_cache(CACHE_URI)
except ImportError:
# TODO: display or log error
cache_backend = None
else:
cache_backend = None
def random_delay():
sleep(random() * (QUEUE_PROCESSING_INTERVAL - 1))
return True
if cache_backend:
acquire_lock = lambda lock_id: cache_backend.add(lock_id, u'true', LOCK_EXPIRE)
release_lock = lambda lock_id: cache_backend.delete(lock_id)
else:
acquire_lock = lambda lock_id: True
release_lock = lambda lock_id: True
def task_process_queue_document(queue_document_id): def task_process_queue_document(queue_document_id):
lock_id = u'%s-lock-%d' % (u'task_process_queue_document', queue_document_id) lock_id = u'%s-lock-%d' % (u'task_process_queue_document', queue_document_id)
if acquire_lock(lock_id): try:
lock = Lock.objects.acquire_lock(lock_id, LOCK_EXPIRE)
queue_document = QueueDocument.objects.get(pk=queue_document_id) queue_document = QueueDocument.objects.get(pk=queue_document_id)
queue_document.state = QUEUEDOCUMENT_STATE_PROCESSING queue_document.state = QUEUEDOCUMENT_STATE_PROCESSING
queue_document.node_name = platform.node() queue_document.node_name = platform.node()
#queue_document.result = task_process_queue_document.request.id
queue_document.save() queue_document.save()
try: try:
do_document_ocr(queue_document) do_document_ocr(queue_document)
@@ -59,7 +37,10 @@ def task_process_queue_document(queue_document_id):
queue_document.state = QUEUEDOCUMENT_STATE_ERROR queue_document.state = QUEUEDOCUMENT_STATE_ERROR
queue_document.result = e queue_document.result = e
queue_document.save() queue_document.save()
release_lock(lock_id)
lock.release()
except LockError:
pass
def reset_orphans(): def reset_orphans():
@@ -89,8 +70,6 @@ def reset_orphans():
def task_process_document_queues(): def task_process_document_queues():
if not cache_backend:
random_delay()
# reset_orphans() # reset_orphans()
# Causes problems with big clusters increased latency # Causes problems with big clusters increased latency
# Disabled until better solution # Disabled until better solution
@@ -108,8 +87,7 @@ def task_process_document_queues():
if oldest_queued_document_qs: if oldest_queued_document_qs:
oldest_queued_document = oldest_queued_document_qs.order_by('datetime_submitted')[0] oldest_queued_document = oldest_queued_document_qs.order_by('datetime_submitted')[0]
#task_process_queue_document.delay(oldest_queued_document.pk)
#task_process_queue_document(oldest_queued_document.pk)
process_job(task_process_queue_document, oldest_queued_document.pk) process_job(task_process_queue_document, oldest_queued_document.pk)
except Exception, e: except Exception, e:
print 'DocumentQueueWatcher exception: %s' % e pass
#print 'DocumentQueueWatcher exception: %s' % e