Use the new simple lock decorator to simplify a job processor task

This commit is contained in:
Roberto Rosario
2012-07-30 10:56:12 -04:00
parent 96a1b68221
commit 198538df4f

View File

@@ -1,55 +1,47 @@
from __future__ import absolute_import
from datetime import timedelta, datetime
import platform
import logging
import psutil
from lock_manager import Lock, LockError
from lock_manager.decorators import simple_locking
from .models import Node, JobQueue
from .exceptions import JobQueueNoPendingJobs
LOCK_EXPIRE = 10
# TODO: Tie LOCK_EXPIRATION with hard task timeout
MAX_CPU_LOAD = 90
MAX_MEMORY_USAGE = 90
logger = logging.getLogger(__name__)
@simple_locking('refresh_node', 10)
def refresh_node():
logger.debug('starting')
lock_id = u'refresh_node'
try:
logger.debug('trying to acquire lock: %s' % lock_id)
lock = Lock.acquire_lock(lock_id, LOCK_EXPIRE)
logger.debug('acquired lock: %s' % lock_id)
node = Node.objects.myself() # Automatically calls the refresh() method too
lock.release()
except LockError:
logger.debug('unable to obtain lock')
except Exception:
lock.release()
raise
node = Node.objects.myself() # Automatically calls the refresh() method too
def job_queue_poll():
logger.debug('starting')
lock_id = u'job_queue_poll'
try:
logger.debug('trying to acquire lock: %s' % lock_id)
lock = Lock.acquire_lock(lock_id, LOCK_EXPIRE)
logger.debug('acquired lock: %s' % lock_id)
for job_queue in JobQueue.objects.all():
try:
job_item = job_queue.get_oldest_pending_job()
job_item.run()
except JobQueueNoPendingJobs:
logger.debug('no pending jobs for job queue: %s' % job_queue)
lock.release()
except LockError:
logger.debug('unable to obtain lock')
except Exception:
lock.release()
raise
node = Node.objects.myself() # Automatically calls the refresh() method too
if node.cpuload < MAX_CPU_LOAD and node.memory_usage < MAX_MEMORY_USAGE:
# Poll job queues is node is not overloaded
lock_id = u'job_queue_poll'
try:
lock = Lock.acquire_lock(lock_id, LOCK_EXPIRE)
except LockError:
pass
except Exception:
lock.release()
raise
else:
for job_queue in JobQueue.objects.all():
try:
job_item = job_queue.get_oldest_pending_job()
job_item.run()
except JobQueueNoPendingJobs:
logger.debug('no pending jobs for job queue: %s' % job_queue)
lock.release()