Add job queue priority support
This commit is contained in:
@@ -28,3 +28,4 @@ JOB_QUEUE_STATE_CHOICES = (
|
|||||||
|
|
||||||
DEFAULT_JOB_QUEUE_POLL_INTERVAL = 2
|
DEFAULT_JOB_QUEUE_POLL_INTERVAL = 2
|
||||||
DEFAULT_DEAD_JOB_REMOVAL_INTERVAL = 5
|
DEFAULT_DEAD_JOB_REMOVAL_INTERVAL = 5
|
||||||
|
DEFAULT_JOB_QUEUE_PRIORITY = 0
|
||||||
|
|||||||
@@ -0,0 +1,70 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
import datetime
|
||||||
|
from south.db import db
|
||||||
|
from south.v2 import SchemaMigration
|
||||||
|
from django.db import models
|
||||||
|
|
||||||
|
|
||||||
|
class Migration(SchemaMigration):
|
||||||
|
|
||||||
|
def forwards(self, orm):
|
||||||
|
# Adding field 'JobQueue.priority'
|
||||||
|
db.add_column('job_processor_jobqueue', 'priority',
|
||||||
|
self.gf('django.db.models.fields.IntegerField')(default=0),
|
||||||
|
keep_default=False)
|
||||||
|
|
||||||
|
|
||||||
|
def backwards(self, orm):
|
||||||
|
# Deleting field 'JobQueue.priority'
|
||||||
|
db.delete_column('job_processor_jobqueue', 'priority')
|
||||||
|
|
||||||
|
|
||||||
|
models = {
|
||||||
|
'clustering.node': {
|
||||||
|
'Meta': {'object_name': 'Node'},
|
||||||
|
'cpuload': ('django.db.models.fields.FloatField', [], {'default': '0', 'blank': 'True'}),
|
||||||
|
'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 8, 5, 0, 0)', 'blank': 'True'}),
|
||||||
|
'hostname': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '255'}),
|
||||||
|
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||||
|
'memory_usage': ('django.db.models.fields.FloatField', [], {'default': '0', 'blank': 'True'}),
|
||||||
|
'state': ('django.db.models.fields.CharField', [], {'default': "'d'", 'max_length': '4'})
|
||||||
|
},
|
||||||
|
'job_processor.jobprocessingconfig': {
|
||||||
|
'Meta': {'object_name': 'JobProcessingConfig'},
|
||||||
|
'dead_job_removal_interval': ('django.db.models.fields.PositiveIntegerField', [], {'default': '5'}),
|
||||||
|
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||||
|
'job_queue_poll_interval': ('django.db.models.fields.PositiveIntegerField', [], {'default': '2'}),
|
||||||
|
'lock_id': ('django.db.models.fields.CharField', [], {'default': '1', 'unique': 'True', 'max_length': '1'})
|
||||||
|
},
|
||||||
|
'job_processor.jobqueue': {
|
||||||
|
'Meta': {'ordering': "('priority',)", 'object_name': 'JobQueue'},
|
||||||
|
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||||
|
'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '32'}),
|
||||||
|
'priority': ('django.db.models.fields.IntegerField', [], {'default': '0'}),
|
||||||
|
'state': ('django.db.models.fields.CharField', [], {'default': "'r'", 'max_length': '4'}),
|
||||||
|
'unique_jobs': ('django.db.models.fields.BooleanField', [], {'default': 'True'})
|
||||||
|
},
|
||||||
|
'job_processor.jobqueueitem': {
|
||||||
|
'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'JobQueueItem'},
|
||||||
|
'creation_datetime': ('django.db.models.fields.DateTimeField', [], {}),
|
||||||
|
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||||
|
'job_queue': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueue']"}),
|
||||||
|
'job_type': ('django.db.models.fields.CharField', [], {'max_length': '32'}),
|
||||||
|
'kwargs': ('django.db.models.fields.TextField', [], {}),
|
||||||
|
'result': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
|
||||||
|
'state': ('django.db.models.fields.CharField', [], {'default': "'p'", 'max_length': '4'}),
|
||||||
|
'unique_id': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64', 'blank': 'True'})
|
||||||
|
},
|
||||||
|
'job_processor.worker': {
|
||||||
|
'Meta': {'ordering': "('creation_datetime',)", 'unique_together': "(('node', 'pid'),)", 'object_name': 'Worker'},
|
||||||
|
'creation_datetime': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 8, 5, 0, 0)'}),
|
||||||
|
'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 8, 5, 0, 0)', 'blank': 'True'}),
|
||||||
|
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||||
|
'job_queue_item': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueueItem']", 'null': 'True', 'blank': 'True'}),
|
||||||
|
'node': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['clustering.Node']"}),
|
||||||
|
'pid': ('django.db.models.fields.PositiveIntegerField', [], {'max_length': '255'}),
|
||||||
|
'state': ('django.db.models.fields.CharField', [], {'default': "'r'", 'max_length': '4'})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
complete_apps = ['job_processor']
|
||||||
@@ -23,7 +23,8 @@ from .literals import (JOB_STATE_CHOICES, JOB_STATE_PENDING,
|
|||||||
JOB_STATE_PROCESSING, JOB_STATE_ERROR, WORKER_STATE_CHOICES,
|
JOB_STATE_PROCESSING, JOB_STATE_ERROR, WORKER_STATE_CHOICES,
|
||||||
WORKER_STATE_RUNNING, DEFAULT_JOB_QUEUE_POLL_INTERVAL,
|
WORKER_STATE_RUNNING, DEFAULT_JOB_QUEUE_POLL_INTERVAL,
|
||||||
JOB_QUEUE_STATE_STOPPED, JOB_QUEUE_STATE_STARTED,
|
JOB_QUEUE_STATE_STOPPED, JOB_QUEUE_STATE_STARTED,
|
||||||
JOB_QUEUE_STATE_CHOICES, DEFAULT_DEAD_JOB_REMOVAL_INTERVAL)
|
JOB_QUEUE_STATE_CHOICES, DEFAULT_DEAD_JOB_REMOVAL_INTERVAL,
|
||||||
|
DEFAULT_JOB_QUEUE_PRIORITY)
|
||||||
from .exceptions import (JobQueuePushError, JobQueueNoPendingJobs,
|
from .exceptions import (JobQueuePushError, JobQueueNoPendingJobs,
|
||||||
JobQueueAlreadyStarted, JobQueueAlreadyStopped)
|
JobQueueAlreadyStarted, JobQueueAlreadyStopped)
|
||||||
|
|
||||||
@@ -85,7 +86,6 @@ class JobQueueManager(models.Manager):
|
|||||||
|
|
||||||
|
|
||||||
class JobQueue(models.Model):
|
class JobQueue(models.Model):
|
||||||
# TODO: support for stopping and starting job queues
|
|
||||||
# Internal name
|
# Internal name
|
||||||
name = models.CharField(max_length=32, verbose_name=_(u'name'), unique=True)
|
name = models.CharField(max_length=32, verbose_name=_(u'name'), unique=True)
|
||||||
unique_jobs = models.BooleanField(verbose_name=_(u'unique jobs'), default=True)
|
unique_jobs = models.BooleanField(verbose_name=_(u'unique jobs'), default=True)
|
||||||
@@ -93,6 +93,7 @@ class JobQueue(models.Model):
|
|||||||
choices=JOB_QUEUE_STATE_CHOICES,
|
choices=JOB_QUEUE_STATE_CHOICES,
|
||||||
default=JOB_QUEUE_STATE_STARTED,
|
default=JOB_QUEUE_STATE_STARTED,
|
||||||
verbose_name=_(u'state'))
|
verbose_name=_(u'state'))
|
||||||
|
priority = models.IntegerField(default=DEFAULT_JOB_QUEUE_PRIORITY, verbose_name=_(u'priority'))
|
||||||
|
|
||||||
objects = JobQueueManager()
|
objects = JobQueueManager()
|
||||||
|
|
||||||
@@ -168,6 +169,7 @@ class JobQueue(models.Model):
|
|||||||
class Meta:
|
class Meta:
|
||||||
verbose_name = _(u'job queue')
|
verbose_name = _(u'job queue')
|
||||||
verbose_name_plural = _(u'job queues')
|
verbose_name_plural = _(u'job queues')
|
||||||
|
ordering = ('priority',)
|
||||||
|
|
||||||
|
|
||||||
class JobQueueItemManager(models.Manager):
|
class JobQueueItemManager(models.Manager):
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
from __future__ import absolute_import
|
from __future__ import absolute_import
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
import psutil
|
||||||
|
|
||||||
from lock_manager import Lock, LockError
|
from lock_manager import Lock, LockError
|
||||||
from lock_manager.decorators import simple_locking
|
from lock_manager.decorators import simple_locking
|
||||||
@@ -13,7 +14,7 @@ from .literals import JOB_QUEUE_STATE_STARTED
|
|||||||
LOCK_EXPIRE = 10
|
LOCK_EXPIRE = 10
|
||||||
MAX_CPU_LOAD = 90.0
|
MAX_CPU_LOAD = 90.0
|
||||||
MAX_MEMORY_USAGE = 90.0
|
MAX_MEMORY_USAGE = 90.0
|
||||||
NODE_MAX_WORKERS = 1
|
NODE_MAX_WORKERS = len(psutil.cpu_percent(interval=0.1, percpu=True)) # Get CPU/cores count
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -27,21 +28,19 @@ def job_queue_poll():
|
|||||||
lock = Lock.acquire_lock(lock_id, LOCK_EXPIRE)
|
lock = Lock.acquire_lock(lock_id, LOCK_EXPIRE)
|
||||||
except LockError:
|
except LockError:
|
||||||
pass
|
pass
|
||||||
except Exception:
|
|
||||||
lock.release()
|
|
||||||
raise
|
|
||||||
else:
|
else:
|
||||||
node = Node.objects.myself()
|
node = Node.objects.myself()
|
||||||
if node.cpuload < MAX_CPU_LOAD and node.memory_usage < MAX_MEMORY_USAGE and node.worker_set.count()<=NODE_MAX_WORKERS:
|
if node.cpuload < MAX_CPU_LOAD and node.memory_usage < MAX_MEMORY_USAGE and node.worker_set.count() < NODE_MAX_WORKERS:
|
||||||
for job_queue in JobQueue.objects.filter(state=JOB_QUEUE_STATE_STARTED):
|
for job_queue in JobQueue.objects.filter(state=JOB_QUEUE_STATE_STARTED):
|
||||||
try:
|
try:
|
||||||
job_item = job_queue.get_oldest_pending_job()
|
job_item = job_queue.get_oldest_pending_job()
|
||||||
job_item.run()
|
job_item.run()
|
||||||
|
break;
|
||||||
except JobQueueNoPendingJobs:
|
except JobQueueNoPendingJobs:
|
||||||
logger.debug('no pending jobs for job queue: %s' % job_queue)
|
logger.debug('no pending jobs for job queue: %s' % job_queue)
|
||||||
else:
|
else:
|
||||||
logger.debug('CPU load or memory usage over limit')
|
logger.debug('CPU load or memory usage over limit')
|
||||||
|
finally:
|
||||||
lock.release()
|
lock.release()
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -91,6 +91,10 @@ def job_queues(request):
|
|||||||
'name': _(u'error jobs'),
|
'name': _(u'error jobs'),
|
||||||
'attribute': 'error_jobs.count',
|
'attribute': 'error_jobs.count',
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
'name': _(u'priority'),
|
||||||
|
'attribute': 'priority',
|
||||||
|
},
|
||||||
],
|
],
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user