diff --git a/apps/main/views.py b/apps/main/views.py index cc1c40bebc..f2ff510bb7 100644 --- a/apps/main/views.py +++ b/apps/main/views.py @@ -101,6 +101,7 @@ def check_settings(request): {'name': 'OCR_REPLICATION_DELAY', 'value': ocr_settings.REPLICATION_DELAY}, {'name': 'OCR_PDFTOTEXT_PATH', 'value': ocr_settings.PDFTOTEXT_PATH, 'exists': True}, {'name': 'OCR_QUEUE_PROCESSING_INTERVAL', 'value': ocr_settings.QUEUE_PROCESSING_INTERVAL}, + {'name': 'OCR_CACHE_URI', 'value': ocr_settings.CACHE_URI}, # Search {'name': 'SEARCH_LIMIT', 'value': search_settings.LIMIT}, diff --git a/apps/ocr/conf/settings.py b/apps/ocr/conf/settings.py index 3b421f1e62..c1a745443d 100644 --- a/apps/ocr/conf/settings.py +++ b/apps/ocr/conf/settings.py @@ -8,3 +8,4 @@ NODE_CONCURRENT_EXECUTION = getattr(settings, 'OCR_NODE_CONCURRENT_EXECUTION', 1 AUTOMATIC_OCR = getattr(settings, 'OCR_AUTOMATIC_OCR', False) PDFTOTEXT_PATH = getattr(settings, 'OCR_PDFTOTEXT_PATH', u'/usr/bin/pdftotext') QUEUE_PROCESSING_INTERVAL = getattr(settings, 'OCR_QUEUE_PROCESSING_INTERVAL', 10) # In seconds +CACHE_URI = getattr(settings, 'OCR_CACHE_URI', None) diff --git a/apps/ocr/tasks.py b/apps/ocr/tasks.py index 7449cec525..20a726ad29 100644 --- a/apps/ocr/tasks.py +++ b/apps/ocr/tasks.py @@ -1,10 +1,12 @@ from datetime import timedelta, datetime import platform +from time import sleep +from random import random from django.db.models import Q from django.utils.translation import ugettext as _ -from django.core.cache.backends.locmem import CacheClass - +from django.core.cache import get_cache + from celery.decorators import task, periodic_task from celery.task.control import inspect @@ -16,25 +18,51 @@ from ocr.models import QueueDocument, DocumentQueue from ocr.conf.settings import NODE_CONCURRENT_EXECUTION from ocr.conf.settings import REPLICATION_DELAY from ocr.conf.settings import QUEUE_PROCESSING_INTERVAL +from ocr.conf.settings import CACHE_URI -LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes -local_cache = CacheClass([], {}) +LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes +# TODO: Tie LOCK_EXPIRATION with hard task timeout + +if CACHE_URI: + try: + cache_backend = get_cache(CACHE_URI) + except ImportError: + # TODO: display or log error + cache_backend = None +else: + cache_backend = None + + +def random_delay(): + sleep(random() * (QUEUE_PROCESSING_INTERVAL - 1)) + return True + + +if cache_backend: + acquire_lock = lambda lock_id: cache_backend.add(lock_id, u'true', LOCK_EXPIRE) + release_lock = lambda lock_id: cache_backend.delete(lock_id) +else: + acquire_lock = lambda lock_id: True + release_lock = lambda lock_id: True @task def task_process_queue_document(queue_document_id): - queue_document = QueueDocument.objects.get(id=queue_document_id) - queue_document.state = QUEUEDOCUMENT_STATE_PROCESSING - queue_document.node_name = platform.node() - queue_document.result = task_process_queue_document.request.id - queue_document.save() - try: - do_document_ocr(queue_document.document) - queue_document.delete() - except Exception, e: - queue_document.state = QUEUEDOCUMENT_STATE_ERROR - queue_document.result = e + lock_id = u'%s-lock-%d' % (u'task_process_queue_document', queue_document_id) + if acquire_lock(lock_id): + queue_document = QueueDocument.objects.get(pk=queue_document_id) + queue_document.state = QUEUEDOCUMENT_STATE_PROCESSING + queue_document.node_name = platform.node() + queue_document.result = task_process_queue_document.request.id queue_document.save() + try: + do_document_ocr(queue_document.document) + queue_document.delete() + except Exception, e: + queue_document.state = QUEUEDOCUMENT_STATE_ERROR + queue_document.result = e + queue_document.save() + release_lock(lock_id) def reset_orphans(): @@ -62,26 +90,24 @@ def reset_orphans(): @periodic_task(run_every=timedelta(seconds=QUEUE_PROCESSING_INTERVAL)) def task_process_document_queues(): - lock_id = '%s-lock-%s' % ('task_process_document_queues', platform.node()) - acquire_lock = lambda: local_cache.add(lock_id, 'true', LOCK_EXPIRE) - release_lock = lambda: local_cache.delete(lock_id) + if not cache_backend: + random_delay() + + reset_orphans() + q_pending = Q(state=QUEUEDOCUMENT_STATE_PENDING) + q_delayed = Q(delay=True) + q_delay_interval = Q(datetime_submitted__lt=datetime.now() - timedelta(seconds=REPLICATION_DELAY)) + for document_queue in DocumentQueue.objects.filter(state=DOCUMENTQUEUE_STATE_ACTIVE): + current_local_processing_count = QueueDocument.objects.filter( + state=QUEUEDOCUMENT_STATE_PROCESSING).filter( + node_name=platform.node()).count() + if current_local_processing_count < NODE_CONCURRENT_EXECUTION: + try: + oldest_queued_document_qs = document_queue.queuedocument_set.filter( + (q_pending & ~q_delayed) | (q_pending & q_delayed & q_delay_interval)) - if acquire_lock(): - reset_orphans() - q_pending = Q(state=QUEUEDOCUMENT_STATE_PENDING) - q_delayed = Q(delay=True) - q_delay_interval = Q(datetime_submitted__lt=datetime.now() - timedelta(seconds=REPLICATION_DELAY)) - for document_queue in DocumentQueue.objects.filter(state=DOCUMENTQUEUE_STATE_ACTIVE): - if QueueDocument.objects.filter( - state=QUEUEDOCUMENT_STATE_PROCESSING).filter( - node_name=platform.node()).count() < NODE_CONCURRENT_EXECUTION: - try: - oldest_queued_document_qs = document_queue.queuedocument_set.filter( - (q_pending & ~q_delayed) | (q_pending & q_delayed & q_delay_interval)) - - if oldest_queued_document_qs: - oldest_queued_document = oldest_queued_document_qs.order_by('datetime_submitted')[0] - task_process_queue_document.delay(oldest_queued_document.id) - except Exception, e: - print 'DocumentQueueWatcher exception: %s' % e - release_lock() + if oldest_queued_document_qs: + oldest_queued_document = oldest_queued_document_qs.order_by('datetime_submitted')[0] + task_process_queue_document.delay(oldest_queued_document.pk) + except Exception, e: + print 'DocumentQueueWatcher exception: %s' % e diff --git a/settings.py b/settings.py index 808666fbc5..77f94d3d46 100644 --- a/settings.py +++ b/settings.py @@ -234,6 +234,7 @@ TEMPLATE_CONTEXT_PROCESSORS = ( #OCR_AUTOMATIC_OCR = False #OCR_PDFTOTEXT_PATH = u'/usr/bin/pdftotext' #OCR_QUEUE_PROCESSING_INTERVAL = 10 # In seconds +#OCR_CACHE_URI = None # Can be a single host (u'memcached://127.0.0.1:11211/'), or multiple separated by a semicolon #------------ Permissions -------------- #ROLES_DEFAULT_ROLES = [] #------------ Searching --------------