Made the OCR cache backend used for locking configurable, moved OCR locking from the periodic task to the queued document task, and re-added a random delay fallback for when no cache backend is configured

This commit is contained in:
Roberto Rosario
2011-05-06 15:31:49 -04:00
parent 472a55f5f3
commit 7469fe991f
4 changed files with 66 additions and 37 deletions
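
The heart of the change is a cache-based lock: on backends such as memcached, cache.add() is atomic and only sets a key that does not already exist, so the first caller gets True and every concurrent caller gets False until the key is deleted or expires. A minimal sketch of the pattern, assuming Django's default cache (the commit itself resolves the backend from OCR_CACHE_URI instead):

    from django.core.cache import cache

    LOCK_EXPIRE = 60 * 10  # seconds; should outlive the task's worst-case runtime

    def acquire_lock(lock_id):
        # True for the first caller only; False until the key expires or is deleted
        return cache.add(lock_id, u'true', LOCK_EXPIRE)

    def release_lock(lock_id):
        cache.delete(lock_id)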


@@ -101,6 +101,7 @@ def check_settings(request):
         {'name': 'OCR_REPLICATION_DELAY', 'value': ocr_settings.REPLICATION_DELAY},
         {'name': 'OCR_PDFTOTEXT_PATH', 'value': ocr_settings.PDFTOTEXT_PATH, 'exists': True},
         {'name': 'OCR_QUEUE_PROCESSING_INTERVAL', 'value': ocr_settings.QUEUE_PROCESSING_INTERVAL},
+        {'name': 'OCR_CACHE_URI', 'value': ocr_settings.CACHE_URI},
         # Search
         {'name': 'SEARCH_LIMIT', 'value': search_settings.LIMIT},


@@ -8,3 +8,4 @@ NODE_CONCURRENT_EXECUTION = getattr(settings, 'OCR_NODE_CONCURRENT_EXECUTION', 1
 AUTOMATIC_OCR = getattr(settings, 'OCR_AUTOMATIC_OCR', False)
 PDFTOTEXT_PATH = getattr(settings, 'OCR_PDFTOTEXT_PATH', u'/usr/bin/pdftotext')
 QUEUE_PROCESSING_INTERVAL = getattr(settings, 'OCR_QUEUE_PROCESSING_INTERVAL', 10) # In seconds
+CACHE_URI = getattr(settings, 'OCR_CACHE_URI', None)
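
With the new setting in place, a deployment points the OCR lock at a cache shared by all nodes from its settings.py. Hypothetical values for illustration; the single-host form comes from the comment in the last file of this commit, and the multi-host form is an assumption based on its "separated by a semicolon" note:

    # Hypothetical examples, not shipped defaults:
    OCR_CACHE_URI = u'memcached://127.0.0.1:11211/'
    # Assumed multi-host form, per the "separated by a semicolon" comment:
    # OCR_CACHE_URI = u'memcached://10.0.0.1:11211/;memcached://10.0.0.2:11211/'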


@@ -1,10 +1,12 @@
 from datetime import timedelta, datetime
 import platform
+from time import sleep
+from random import random
 from django.db.models import Q
 from django.utils.translation import ugettext as _
-from django.core.cache.backends.locmem import CacheClass
+from django.core.cache import get_cache
 from celery.decorators import task, periodic_task
 from celery.task.control import inspect
@@ -16,25 +18,51 @@ from ocr.models import QueueDocument, DocumentQueue
 from ocr.conf.settings import NODE_CONCURRENT_EXECUTION
 from ocr.conf.settings import REPLICATION_DELAY
 from ocr.conf.settings import QUEUE_PROCESSING_INTERVAL
+from ocr.conf.settings import CACHE_URI

-LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes
-local_cache = CacheClass([], {})
+LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes
+# TODO: Tie LOCK_EXPIRATION with hard task timeout
+
+if CACHE_URI:
+    try:
+        cache_backend = get_cache(CACHE_URI)
+    except ImportError:
+        # TODO: display or log error
+        cache_backend = None
+else:
+    cache_backend = None
+
+
+def random_delay():
+    sleep(random() * (QUEUE_PROCESSING_INTERVAL - 1))
+    return True
+
+
+if cache_backend:
+    acquire_lock = lambda lock_id: cache_backend.add(lock_id, u'true', LOCK_EXPIRE)
+    release_lock = lambda lock_id: cache_backend.delete(lock_id)
+else:
+    acquire_lock = lambda lock_id: True
+    release_lock = lambda lock_id: True


 @task
 def task_process_queue_document(queue_document_id):
-    queue_document = QueueDocument.objects.get(id=queue_document_id)
-    queue_document.state = QUEUEDOCUMENT_STATE_PROCESSING
-    queue_document.node_name = platform.node()
-    queue_document.result = task_process_queue_document.request.id
-    queue_document.save()
-    try:
-        do_document_ocr(queue_document.document)
-        queue_document.delete()
-    except Exception, e:
-        queue_document.state = QUEUEDOCUMENT_STATE_ERROR
-        queue_document.result = e
+    lock_id = u'%s-lock-%d' % (u'task_process_queue_document', queue_document_id)
+    if acquire_lock(lock_id):
+        queue_document = QueueDocument.objects.get(pk=queue_document_id)
+        queue_document.state = QUEUEDOCUMENT_STATE_PROCESSING
+        queue_document.node_name = platform.node()
+        queue_document.result = task_process_queue_document.request.id
+        queue_document.save()
+        try:
+            do_document_ocr(queue_document.document)
+            queue_document.delete()
+        except Exception, e:
+            queue_document.state = QUEUEDOCUMENT_STATE_ERROR
+            queue_document.result = e
+            queue_document.save()
+        release_lock(lock_id)


 def reset_orphans():
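
Note the lock granularity in the hunk above: the key embeds the queue document's id, so duplicate deliveries of the same document serialize on one key while different documents still run in parallel. A sketch of the flow for a hypothetical document id of 42, using the acquire_lock/release_lock helpers defined above:

    lock_id = u'%s-lock-%d' % (u'task_process_queue_document', 42)
    # lock_id == u'task_process_queue_document-lock-42'
    if acquire_lock(lock_id):  # first taker gets True; a concurrent duplicate gets False and skips
        try:
            pass  # do_document_ocr() runs here
        except Exception, e:
            pass  # the real task records the error on the queue document
        release_lock(lock_id)  # if the worker dies before this, the key expires after LOCK_EXPIRE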
@@ -62,26 +90,24 @@ def reset_orphans():
 @periodic_task(run_every=timedelta(seconds=QUEUE_PROCESSING_INTERVAL))
 def task_process_document_queues():
-    lock_id = '%s-lock-%s' % ('task_process_document_queues', platform.node())
-    acquire_lock = lambda: local_cache.add(lock_id, 'true', LOCK_EXPIRE)
-    release_lock = lambda: local_cache.delete(lock_id)
-    if acquire_lock():
-        reset_orphans()
-        q_pending = Q(state=QUEUEDOCUMENT_STATE_PENDING)
-        q_delayed = Q(delay=True)
-        q_delay_interval = Q(datetime_submitted__lt=datetime.now() - timedelta(seconds=REPLICATION_DELAY))
-        for document_queue in DocumentQueue.objects.filter(state=DOCUMENTQUEUE_STATE_ACTIVE):
-            if QueueDocument.objects.filter(
-                state=QUEUEDOCUMENT_STATE_PROCESSING).filter(
-                node_name=platform.node()).count() < NODE_CONCURRENT_EXECUTION:
-                try:
-                    oldest_queued_document_qs = document_queue.queuedocument_set.filter(
-                        (q_pending & ~q_delayed) | (q_pending & q_delayed & q_delay_interval))
-                    if oldest_queued_document_qs:
-                        oldest_queued_document = oldest_queued_document_qs.order_by('datetime_submitted')[0]
-                        task_process_queue_document.delay(oldest_queued_document.id)
-                except Exception, e:
-                    print 'DocumentQueueWatcher exception: %s' % e
-        release_lock()
+    if not cache_backend:
+        random_delay()
+    reset_orphans()
+    q_pending = Q(state=QUEUEDOCUMENT_STATE_PENDING)
+    q_delayed = Q(delay=True)
+    q_delay_interval = Q(datetime_submitted__lt=datetime.now() - timedelta(seconds=REPLICATION_DELAY))
+    for document_queue in DocumentQueue.objects.filter(state=DOCUMENTQUEUE_STATE_ACTIVE):
+        current_local_processing_count = QueueDocument.objects.filter(
+            state=QUEUEDOCUMENT_STATE_PROCESSING).filter(
+            node_name=platform.node()).count()
+        if current_local_processing_count < NODE_CONCURRENT_EXECUTION:
+            try:
+                oldest_queued_document_qs = document_queue.queuedocument_set.filter(
+                    (q_pending & ~q_delayed) | (q_pending & q_delayed & q_delay_interval))
+                if oldest_queued_document_qs:
+                    oldest_queued_document = oldest_queued_document_qs.order_by('datetime_submitted')[0]
+                    task_process_queue_document.delay(oldest_queued_document.pk)
+            except Exception, e:
+                print 'DocumentQueueWatcher exception: %s' % e
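
Without a configured cache backend, acquire_lock() always returns True, so the only remaining protection is desynchronization: each node sleeps a random fraction of the scan interval before querying the queue. With the default OCR_QUEUE_PROCESSING_INTERVAL of 10 seconds the sleep is uniform over [0, 9) seconds, which makes simultaneous scans unlikely but is best effort, not a real lock. The fallback in isolation:

    from random import random
    from time import sleep

    QUEUE_PROCESSING_INTERVAL = 10  # the shipped default, in seconds

    def random_delay():
        # random() is uniform over [0.0, 1.0), so this sleeps 0 to just under 9 seconds
        sleep(random() * (QUEUE_PROCESSING_INTERVAL - 1))
        return True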


@@ -234,6 +234,7 @@ TEMPLATE_CONTEXT_PROCESSORS = (
 #OCR_AUTOMATIC_OCR = False
 #OCR_PDFTOTEXT_PATH = u'/usr/bin/pdftotext'
 #OCR_QUEUE_PROCESSING_INTERVAL = 10 # In seconds
+#OCR_CACHE_URI = None # Can be a single host (u'memcached://127.0.0.1:11211/'), or multiple separated by a semicolon
 #------------ Permissions --------------
 #ROLES_DEFAULT_ROLES = []
 #------------ Searching --------------