Made the concurrent OCR code more granular: concurrency is now tracked per node, so every node can handle a different number of concurrent OCR tasks
@@ -1,4 +1,5 @@
from datetime import date, timedelta, datetime
import platform

from django.db.models import Q
@@ -12,7 +13,7 @@ from literals import QUEUEDOCUMENT_STATE_PENDING, \
    QUEUEDOCUMENT_STATE_PROCESSING, DOCUMENTQUEUE_STATE_ACTIVE, \
    QUEUEDOCUMENT_STATE_ERROR
from models import QueueDocument, DocumentQueue
from ocr.conf.settings import MAX_CONCURRENT_EXECUTION
from ocr.conf.settings import NODE_CONCURRENT_EXECUTION
from ocr.conf.settings import REPLICATION_DELAY
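NODE_CONCURRENT_EXECUTION is the new knob introduced by this commit: MAX_CONCURRENT_EXECUTION caps OCR tasks across the whole cluster, while NODE_CONCURRENT_EXECUTION caps them on each worker node. As a rough illustration only, a module like ocr/conf/settings.py is often just a thin wrapper over Django settings with defaults; the OCR_ prefix and the default values below are assumptions, not taken from this commit.

# Illustrative sketch only -- the OCR_ setting names and defaults are assumed,
# not read from this repository.
from django.conf import settings

# Cluster-wide ceiling on simultaneously running OCR tasks.
MAX_CONCURRENT_EXECUTION = getattr(settings, 'OCR_MAX_CONCURRENT_EXECUTION', 2)
# Per-node ceiling added by this change; each node can be configured differently.
NODE_CONCURRENT_EXECUTION = getattr(settings, 'OCR_NODE_CONCURRENT_EXECUTION', 1)
# Seconds to wait after submission so database/storage replication can catch up.
REPLICATION_DELAY = getattr(settings, 'OCR_REPLICATION_DELAY', 0)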
@@ -20,6 +21,7 @@ from ocr.conf.settings import REPLICATION_DELAY
def task_process_queue_document(queue_document_id):
    queue_document = QueueDocument.objects.get(id=queue_document_id)
    queue_document.state = QUEUEDOCUMENT_STATE_PROCESSING
    queue_document.node_name = platform.node()
    queue_document.save()
    try:
        do_document_ocr(queue_document.document)
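The single line this hunk adds is queue_document.node_name = platform.node(): every worker stamps the queue entry with its own hostname, which is what later makes a per-node count possible. The diff is truncated after do_document_ocr(); the completion below is only a plausible sketch, with the success and error handling assumed rather than copied from the commit.

def task_process_queue_document(queue_document_id):
    queue_document = QueueDocument.objects.get(id=queue_document_id)
    queue_document.state = QUEUEDOCUMENT_STATE_PROCESSING
    queue_document.node_name = platform.node()  # record which node claimed the work
    queue_document.save()
    try:
        do_document_ocr(queue_document.document)
    except Exception:
        # Assumed: failed entries are flagged so they can be inspected or retried.
        queue_document.state = QUEUEDOCUMENT_STATE_ERROR
        queue_document.save()
    else:
        # Assumed: successfully processed entries leave the queue.
        queue_document.delete()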
@@ -42,8 +44,9 @@ class DocumentQueueWatcher(PeriodicTask):
        q_delay_interval = Q(datetime_submitted__lt=datetime.now()-timedelta(seconds=REPLICATION_DELAY))
        for document_queue in DocumentQueue.objects.filter(state=DOCUMENTQUEUE_STATE_ACTIVE):
            logger.debug('Analysing queue: %s' % document_queue)
            current_running_queues = QueueDocument.objects.filter(state=QUEUEDOCUMENT_STATE_PROCESSING).count()
            if current_running_queues < MAX_CONCURRENT_EXECUTION:
                if QueueDocument.objects.filter(
                        state=QUEUEDOCUMENT_STATE_PROCESSING).filter(
                        node_name=platform.node()).count() < NODE_CONCURRENT_EXECUTION:
                    try:
                        oldest_queued_document_qs = document_queue.queuedocument_set.filter(
                            (q_pending & ~q_delayed) | (q_pending & q_delayed & q_delay_interval))
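Putting the two limits together, the watcher only dispatches another OCR task when both the cluster-wide ceiling and the local ceiling leave room. The helper below is a hypothetical restatement of that gate, not code from the commit: the function name is illustrative, and it assumes the module-level imports already shown in the diff (QueueDocument, platform, and the two settings).

def node_has_capacity():
    # Hypothetical helper: True when this node may start another OCR task
    # under both the global and the per-node limits checked in the watcher.
    running_total = QueueDocument.objects.filter(
        state=QUEUEDOCUMENT_STATE_PROCESSING).count()
    running_here = QueueDocument.objects.filter(
        state=QUEUEDOCUMENT_STATE_PROCESSING,
        node_name=platform.node()).count()
    return (running_total < MAX_CONCURRENT_EXECUTION and
            running_here < NODE_CONCURRENT_EXECUTION)

Because each node evaluates NODE_CONCURRENT_EXECUTION against documents stamped with its own platform.node(), heterogeneous machines can be given different ceilings simply by configuring a different value on each one.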