Implement local task locking using Django locmem cache backend

This commit is contained in:
Roberto Rosario
2011-04-25 16:41:42 -04:00
parent cd77a8ffdf
commit df60924ebb

View File

@@ -1,10 +1,9 @@
from datetime import timedelta, datetime
import platform
import time
import random
from django.db.models import Q
from django.utils.translation import ugettext as _
from django.core.cache.backends.locmem import CacheClass
from celery.decorators import task, periodic_task
from celery.task.control import inspect
@@ -18,13 +17,12 @@ from ocr.conf.settings import NODE_CONCURRENT_EXECUTION
from ocr.conf.settings import REPLICATION_DELAY
from ocr.conf.settings import QUEUE_PROCESSING_INTERVAL
LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes
local_cache = CacheClass([], {})
@task
def task_process_queue_document(queue_document_id):
queue_document = QueueDocument.objects.get(id=queue_document_id)
if queue_document.state == QUEUEDOCUMENT_STATE_PROCESSING:
#Recheck to avoid race condition
return
queue_document.state = QUEUEDOCUMENT_STATE_PROCESSING
queue_document.node_name = platform.node()
queue_document.result = task_process_queue_document.request.id
@@ -63,23 +61,26 @@ def reset_orphans():
@periodic_task(run_every=timedelta(seconds=QUEUE_PROCESSING_INTERVAL))
def task_process_document_queues():
reset_orphans()
#Introduce random 0 < t < 1 second delay to further reduce the
#chance of a race condition
time.sleep(random.random())
q_pending = Q(state=QUEUEDOCUMENT_STATE_PENDING)
q_delayed = Q(delay=True)
q_delay_interval = Q(datetime_submitted__lt=datetime.now() - timedelta(seconds=REPLICATION_DELAY))
for document_queue in DocumentQueue.objects.filter(state=DOCUMENTQUEUE_STATE_ACTIVE):
if QueueDocument.objects.filter(
state=QUEUEDOCUMENT_STATE_PROCESSING).filter(
node_name=platform.node()).count() < NODE_CONCURRENT_EXECUTION:
try:
oldest_queued_document_qs = document_queue.queuedocument_set.filter(
(q_pending & ~q_delayed) | (q_pending & q_delayed & q_delay_interval))
lock_id = '%s-lock-%s' % ('task_process_document_queues', platform.node())
acquire_lock = lambda: local_cache.add(lock_id, 'true', LOCK_EXPIRE)
release_lock = lambda: local_cache.delete(lock_id)
if oldest_queued_document_qs:
oldest_queued_document = oldest_queued_document_qs.order_by('datetime_submitted')[0]
task_process_queue_document.delay(oldest_queued_document.id)
except Exception, e:
print 'DocumentQueueWatcher exception: %s' % e
if acquire_lock():
reset_orphans()
q_pending = Q(state=QUEUEDOCUMENT_STATE_PENDING)
q_delayed = Q(delay=True)
q_delay_interval = Q(datetime_submitted__lt=datetime.now() - timedelta(seconds=REPLICATION_DELAY))
for document_queue in DocumentQueue.objects.filter(state=DOCUMENTQUEUE_STATE_ACTIVE):
if QueueDocument.objects.filter(
state=QUEUEDOCUMENT_STATE_PROCESSING).filter(
node_name=platform.node()).count() < NODE_CONCURRENT_EXECUTION:
try:
oldest_queued_document_qs = document_queue.queuedocument_set.filter(
(q_pending & ~q_delayed) | (q_pending & q_delayed & q_delay_interval))
if oldest_queued_document_qs:
oldest_queued_document = oldest_queued_document_qs.order_by('datetime_submitted')[0]
task_process_queue_document.delay(oldest_queued_document.id)
except Exception, e:
print 'DocumentQueueWatcher exception: %s' % e
release_lock()