diff --git a/apps/job_processor/literals.py b/apps/job_processor/literals.py index 0101551ebf..b8a85fec51 100644 --- a/apps/job_processor/literals.py +++ b/apps/job_processor/literals.py @@ -28,3 +28,4 @@ JOB_QUEUE_STATE_CHOICES = ( DEFAULT_JOB_QUEUE_POLL_INTERVAL = 2 DEFAULT_DEAD_JOB_REMOVAL_INTERVAL = 5 +DEFAULT_JOB_QUEUE_PRIORITY = 0 diff --git a/apps/job_processor/migrations/0010_auto__add_field_jobqueue_priority.py b/apps/job_processor/migrations/0010_auto__add_field_jobqueue_priority.py new file mode 100644 index 0000000000..4c9f3fc65a --- /dev/null +++ b/apps/job_processor/migrations/0010_auto__add_field_jobqueue_priority.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +import datetime +from south.db import db +from south.v2 import SchemaMigration +from django.db import models + + +class Migration(SchemaMigration): + + def forwards(self, orm): + # Adding field 'JobQueue.priority' + db.add_column('job_processor_jobqueue', 'priority', + self.gf('django.db.models.fields.IntegerField')(default=0), + keep_default=False) + + + def backwards(self, orm): + # Deleting field 'JobQueue.priority' + db.delete_column('job_processor_jobqueue', 'priority') + + + models = { + 'clustering.node': { + 'Meta': {'object_name': 'Node'}, + 'cpuload': ('django.db.models.fields.FloatField', [], {'default': '0', 'blank': 'True'}), + 'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 8, 5, 0, 0)', 'blank': 'True'}), + 'hostname': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '255'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'memory_usage': ('django.db.models.fields.FloatField', [], {'default': '0', 'blank': 'True'}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'d'", 'max_length': '4'}) + }, + 'job_processor.jobprocessingconfig': { + 'Meta': {'object_name': 'JobProcessingConfig'}, + 'dead_job_removal_interval': ('django.db.models.fields.PositiveIntegerField', [], {'default': '5'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job_queue_poll_interval': ('django.db.models.fields.PositiveIntegerField', [], {'default': '2'}), + 'lock_id': ('django.db.models.fields.CharField', [], {'default': '1', 'unique': 'True', 'max_length': '1'}) + }, + 'job_processor.jobqueue': { + 'Meta': {'ordering': "('priority',)", 'object_name': 'JobQueue'}, + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '32'}), + 'priority': ('django.db.models.fields.IntegerField', [], {'default': '0'}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'r'", 'max_length': '4'}), + 'unique_jobs': ('django.db.models.fields.BooleanField', [], {'default': 'True'}) + }, + 'job_processor.jobqueueitem': { + 'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'JobQueueItem'}, + 'creation_datetime': ('django.db.models.fields.DateTimeField', [], {}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job_queue': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueue']"}), + 'job_type': ('django.db.models.fields.CharField', [], {'max_length': '32'}), + 'kwargs': ('django.db.models.fields.TextField', [], {}), + 'result': ('django.db.models.fields.TextField', [], {'blank': 'True'}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'p'", 'max_length': '4'}), + 'unique_id': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64', 'blank': 'True'}) + }, + 'job_processor.worker': { + 'Meta': {'ordering': "('creation_datetime',)", 'unique_together': "(('node', 'pid'),)", 'object_name': 'Worker'}, + 'creation_datetime': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 8, 5, 0, 0)'}), + 'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 8, 5, 0, 0)', 'blank': 'True'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job_queue_item': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueueItem']", 'null': 'True', 'blank': 'True'}), + 'node': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['clustering.Node']"}), + 'pid': ('django.db.models.fields.PositiveIntegerField', [], {'max_length': '255'}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'r'", 'max_length': '4'}) + } + } + + complete_apps = ['job_processor'] \ No newline at end of file diff --git a/apps/job_processor/models.py b/apps/job_processor/models.py index 4897bd403e..77a98a7e81 100644 --- a/apps/job_processor/models.py +++ b/apps/job_processor/models.py @@ -23,7 +23,8 @@ from .literals import (JOB_STATE_CHOICES, JOB_STATE_PENDING, JOB_STATE_PROCESSING, JOB_STATE_ERROR, WORKER_STATE_CHOICES, WORKER_STATE_RUNNING, DEFAULT_JOB_QUEUE_POLL_INTERVAL, JOB_QUEUE_STATE_STOPPED, JOB_QUEUE_STATE_STARTED, - JOB_QUEUE_STATE_CHOICES, DEFAULT_DEAD_JOB_REMOVAL_INTERVAL) + JOB_QUEUE_STATE_CHOICES, DEFAULT_DEAD_JOB_REMOVAL_INTERVAL, + DEFAULT_JOB_QUEUE_PRIORITY) from .exceptions import (JobQueuePushError, JobQueueNoPendingJobs, JobQueueAlreadyStarted, JobQueueAlreadyStopped) @@ -85,7 +86,6 @@ class JobQueueManager(models.Manager): class JobQueue(models.Model): - # TODO: support for stopping and starting job queues # Internal name name = models.CharField(max_length=32, verbose_name=_(u'name'), unique=True) unique_jobs = models.BooleanField(verbose_name=_(u'unique jobs'), default=True) @@ -93,6 +93,7 @@ class JobQueue(models.Model): choices=JOB_QUEUE_STATE_CHOICES, default=JOB_QUEUE_STATE_STARTED, verbose_name=_(u'state')) + priority = models.IntegerField(default=DEFAULT_JOB_QUEUE_PRIORITY, verbose_name=_(u'priority')) objects = JobQueueManager() @@ -168,6 +169,7 @@ class JobQueue(models.Model): class Meta: verbose_name = _(u'job queue') verbose_name_plural = _(u'job queues') + ordering = ('priority',) class JobQueueItemManager(models.Manager): diff --git a/apps/job_processor/tasks.py b/apps/job_processor/tasks.py index b3794d6970..1388a6dd76 100644 --- a/apps/job_processor/tasks.py +++ b/apps/job_processor/tasks.py @@ -1,6 +1,7 @@ from __future__ import absolute_import import logging +import psutil from lock_manager import Lock, LockError from lock_manager.decorators import simple_locking @@ -13,7 +14,7 @@ from .literals import JOB_QUEUE_STATE_STARTED LOCK_EXPIRE = 10 MAX_CPU_LOAD = 90.0 MAX_MEMORY_USAGE = 90.0 -NODE_MAX_WORKERS = 1 +NODE_MAX_WORKERS = len(psutil.cpu_percent(interval=0.1, percpu=True)) # Get CPU/cores count logger = logging.getLogger(__name__) @@ -27,21 +28,19 @@ def job_queue_poll(): lock = Lock.acquire_lock(lock_id, LOCK_EXPIRE) except LockError: pass - except Exception: - lock.release() - raise else: node = Node.objects.myself() - if node.cpuload < MAX_CPU_LOAD and node.memory_usage < MAX_MEMORY_USAGE and node.worker_set.count()<=NODE_MAX_WORKERS: + if node.cpuload < MAX_CPU_LOAD and node.memory_usage < MAX_MEMORY_USAGE and node.worker_set.count() < NODE_MAX_WORKERS: for job_queue in JobQueue.objects.filter(state=JOB_QUEUE_STATE_STARTED): try: job_item = job_queue.get_oldest_pending_job() job_item.run() + break; except JobQueueNoPendingJobs: logger.debug('no pending jobs for job queue: %s' % job_queue) else: logger.debug('CPU load or memory usage over limit') - + finally: lock.release() diff --git a/apps/job_processor/views.py b/apps/job_processor/views.py index f3b0e5a8ce..accce6db77 100644 --- a/apps/job_processor/views.py +++ b/apps/job_processor/views.py @@ -91,6 +91,10 @@ def job_queues(request): 'name': _(u'error jobs'), 'attribute': 'error_jobs.count', }, + { + 'name': _(u'priority'), + 'attribute': 'priority', + }, ], }