Add view, permission, links to edit global job processing config

This commit is contained in:
Roberto Rosario
2012-08-01 04:15:11 -04:00
parent 833e149921
commit f43f496241
9 changed files with 161 additions and 37 deletions

View File

@@ -1,31 +1,41 @@
from __future__ import absolute_import
from django.db import transaction, DatabaseError
from django.utils.translation import ugettext_lazy as _
from scheduler.api import LocalScheduler
from navigation.api import bind_links, register_model_list_columns
from project_tools.api import register_tool
from project_setup.api import register_setup
from common.utils import encapsulate
from clustering.models import Node
from .models import JobQueue
from .models import JobQueue, JobProcessingConfig
from .tasks import job_queue_poll
from .links import (node_workers, job_queues, tool_link,
job_queue_items_pending, job_queue_items_error, job_queue_items_active)
job_queue_items_pending, job_queue_items_error, job_queue_items_active,
job_queue_config_edit, setup_link)
#TODO: fix this, make it cluster wide
JOB_QUEUE_POLL_INTERVAL = 1
job_processor_scheduler = LocalScheduler('job_processor', _(u'Job processor'))
job_processor_scheduler.add_interval_job('job_queue_poll', _(u'Poll a job queue for pending jobs.'), job_queue_poll, seconds=JOB_QUEUE_POLL_INTERVAL)
job_processor_scheduler.start()
@transaction.commit_on_success
def add_job_queue_jobs():
job_processor_scheduler = LocalScheduler('job_processor', _(u'Job processor'))
try:
job_processor_scheduler.add_interval_job('job_queue_poll', _(u'Poll a job queue for pending jobs.'), job_queue_poll, seconds=JobProcessingConfig.get().job_queue_poll_interval)
except DatabaseError:
transaction.rollback()
job_processor_scheduler.start()
add_job_queue_jobs()
register_tool(tool_link)
register_setup(setup_link)
bind_links([JobQueue, 'job_queues'], [job_queues], menu_name='secondary_menu')
bind_links([JobQueue], [job_queue_items_pending, job_queue_items_active, job_queue_items_error])
bind_links([Node], [node_workers])
bind_links(['job_queue_config_edit'], [job_queue_config_edit], menu_name='secondary_menu')
Node.add_to_class('workers', lambda node: node.worker_set)

View File

@@ -0,0 +1,10 @@
from __future__ import absolute_import
from django import forms
from .models import JobProcessingConfig
class JobProcessingConfigForm(forms.ModelForm):
class Meta:
model = JobProcessingConfig

View File

@@ -6,13 +6,14 @@ from navigation.api import Link
from clustering.permissions import PERMISSION_NODES_VIEW
from .permissions import PERMISSION_JOB_QUEUE_VIEW
from .permissions import PERMISSION_JOB_QUEUE_VIEW, PERMISSION_JOB_PROCESSING_CONFIGURATION
node_workers = Link(text=_(u'workers'), view='node_workers', args='object.pk', sprite='lorry_go', permissions=[PERMISSION_NODES_VIEW])
tool_link = Link(text=_(u'job queues'), view='job_queues', icon='hourglass.png', permissions=[PERMISSION_JOB_QUEUE_VIEW])
job_queues = Link(text=_(u'job queue list'), view='job_queues', sprite='hourglass', permissions=[PERMISSION_JOB_QUEUE_VIEW])
job_queue_items_pending = Link(text=_(u'pending jobs'), view='job_queue_items_pending', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW])
job_queue_items_error = Link(text=_(u'error jobs'), view='job_queue_items_error', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW])
job_queue_items_active = Link(text=_(u'active jobs'), view='job_queue_items_active', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW])
job_queue_config_edit = Link(text=_(u'edit job processing configuration'), view='job_queue_config_edit', sprite='hourglass', permissions=[PERMISSION_JOB_PROCESSING_CONFIGURATION])
setup_link = Link(text=_(u'job processing configuration'), view='job_queue_config_edit', icon='hourglass.png', permissions=[PERMISSION_JOB_PROCESSING_CONFIGURATION])

View File

@@ -17,3 +17,5 @@ JOB_STATE_CHOICES = (
(JOB_STATE_PROCESSING, _(u'processing')),
(JOB_STATE_ERROR, _(u'error')),
)
DEFAULT_JOB_QUEUE_POLL_INTERVAL = 2

View File

@@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
import datetime
from south.db import db
from south.v2 import SchemaMigration
from django.db import models
class Migration(SchemaMigration):
def forwards(self, orm):
# Adding model 'JobProcessingConfig'
db.create_table('job_processor_jobprocessingconfig', (
('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
('lock_id', self.gf('django.db.models.fields.CharField')(default=1, unique=True, max_length=1)),
('job_queue_poll_interval', self.gf('django.db.models.fields.PositiveIntegerField')(default=2)),
))
db.send_create_signal('job_processor', ['JobProcessingConfig'])
def backwards(self, orm):
# Deleting model 'JobProcessingConfig'
db.delete_table('job_processor_jobprocessingconfig')
models = {
'clustering.node': {
'Meta': {'object_name': 'Node'},
'cpuload': ('django.db.models.fields.FloatField', [], {'default': '0.0', 'blank': 'True'}),
'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 8, 1, 0, 0)', 'blank': 'True'}),
'hostname': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'memory_usage': ('django.db.models.fields.FloatField', [], {'default': '0.0', 'blank': 'True'})
},
'job_processor.jobprocessingconfig': {
'Meta': {'object_name': 'JobProcessingConfig'},
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'job_queue_poll_interval': ('django.db.models.fields.PositiveIntegerField', [], {'default': '2'}),
'lock_id': ('django.db.models.fields.CharField', [], {'default': '1', 'unique': 'True', 'max_length': '1'})
},
'job_processor.jobqueue': {
'Meta': {'object_name': 'JobQueue'},
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '32'}),
'unique_jobs': ('django.db.models.fields.BooleanField', [], {'default': 'True'})
},
'job_processor.jobqueueitem': {
'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'JobQueueItem'},
'creation_datetime': ('django.db.models.fields.DateTimeField', [], {}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'job_queue': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueue']"}),
'job_type': ('django.db.models.fields.CharField', [], {'max_length': '32'}),
'kwargs': ('django.db.models.fields.TextField', [], {}),
'result': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
'state': ('django.db.models.fields.CharField', [], {'default': "'p'", 'max_length': '4'}),
'unique_id': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64', 'blank': 'True'})
},
'job_processor.worker': {
'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'Worker'},
'creation_datetime': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 8, 1, 0, 0)'}),
'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 8, 1, 0, 0)', 'blank': 'True'}),
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'job_queue_item': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueueItem']"}),
'node': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['clustering.Node']"}),
'pid': ('django.db.models.fields.PositiveIntegerField', [], {'max_length': '255'}),
'state': ('django.db.models.fields.CharField', [], {'default': "'r'", 'max_length': '4'})
}
}
complete_apps = ['job_processor']

View File

@@ -21,7 +21,7 @@ from clustering.models import Node
from .literals import (JOB_STATE_CHOICES, JOB_STATE_PENDING,
JOB_STATE_PROCESSING, JOB_STATE_ERROR, WORKER_STATE_CHOICES,
WORKER_STATE_RUNNING)
WORKER_STATE_RUNNING, DEFAULT_JOB_QUEUE_POLL_INTERVAL)
from .exceptions import JobQueuePushError, JobQueueNoPendingJobs
#from .exceptions import (WorkerAlreadyDisabled, WorkerAlreadyEnabled)
@@ -135,6 +135,23 @@ class JobQueue(models.Model):
if label:
job_queue_labels[self.name] = label
return super(JobQueue, self).save(*args, **kwargs)
#def disable(self):
# if self.state == WORKER_STATE_DISABLED:
# raise WorkerAlreadyDisabled
#
# self.state = WORKER_STATE_DISABLED
# self.save()
#
#def enable(self):
# if self.state == WORKER_STATE_ENABLED:
# raise WorkerAlreadyEnabled
#
# self.state = WORKER_STATE_ENABLED
# self.save()
#
#def is_enabled(self):
# return self.state == WORKER_STATE_ENABLED
# TODO: custom runtime methods
@@ -206,36 +223,17 @@ class Worker(models.Model):
def __unicode__(self):
return u'%s-%s' % (self.node.hostname, self.pid)
#def disable(self):
# if self.state == WORKER_STATE_DISABLED:
# raise WorkerAlreadyDisabled
#
# self.state = WORKER_STATE_DISABLED
# self.save()
#
#def enable(self):
# if self.state == WORKER_STATE_ENABLED:
# raise WorkerAlreadyEnabled
#
# self.state = WORKER_STATE_ENABLED
# self.save()
#
#def is_enabled(self):
# return self.state == WORKER_STATE_ENABLED
class Meta:
ordering = ('creation_datetime',)
verbose_name = _(u'worker')
verbose_name_plural = _(u'workers')
"""
class JobProcessingConfig(Singleton):
worker_time_to_live = models.PositiveInteger(verbose_name=(u'time to live (in seconds)') # After this time a worker is considered dead
worker_heartbeat_interval = models.PositiveInteger(verbose_name=(u'heartbeat interval')
job_queue_poll_interval = models.PositiveIntegerField(verbose_name=(u'job queue poll interval (in seconds)'), default=DEFAULT_JOB_QUEUE_POLL_INTERVAL)
def __unicode__(self):
return ugettext('Workers configuration')
return ugettext(u'Job queues configuration')
class Meta:
verbose_name = verbose_name_plural = _(u'Workers configuration')
"""
verbose_name = verbose_name_plural = _(u'job queues configuration')

View File

@@ -6,3 +6,4 @@ from permissions.models import PermissionNamespace, Permission
namespace = PermissionNamespace('job_processor', _(u'Job processor'))
PERMISSION_JOB_QUEUE_VIEW = Permission.objects.register(namespace, 'job_queue_view', _(u'View the job queues in a Mayan cluster'))
PERMISSION_JOB_PROCESSING_CONFIGURATION = Permission.objects.register(namespace, 'job_processing_edit', _(u'Edit the the job processing configuration in a Mayan cluster'))

View File

@@ -7,4 +7,5 @@ urlpatterns = patterns('job_processor.views',
url(r'^queue/(?P<job_queue_pk>\d+)/items/pending/$', 'job_queue_items', {'pending_jobs': True}, 'job_queue_items_pending'),
url(r'^queue/(?P<job_queue_pk>\d+)/items/error/$', 'job_queue_items', {'error_jobs' :True}, 'job_queue_items_error'),
url(r'^queue/(?P<job_queue_pk>\d+)/items/active/$', 'job_queue_items', {'active_jobs' :True}, 'job_queue_items_active'),
url(r'^config/edit/$', 'job_queue_config_edit', (), 'job_queue_config_edit'),
)

View File

@@ -14,8 +14,9 @@ from acls.models import AccessEntry
from clustering.permissions import PERMISSION_NODES_VIEW
from clustering.models import Node
from .models import JobQueue
from .permissions import PERMISSION_JOB_QUEUE_VIEW
from .forms import JobProcessingConfigForm
from .models import JobQueue, JobProcessingConfig
from .permissions import PERMISSION_JOB_QUEUE_VIEW, PERMISSION_JOB_PROCESSING_CONFIGURATION
def node_workers(request, node_pk):
@@ -151,3 +152,34 @@ def job_queue_items(request, job_queue_pk, pending_jobs=False, error_jobs=False,
return render_to_response('generic_list.html', context,
context_instance=RequestContext(request))
def job_queue_config_edit(request):
Permission.objects.check_permissions(request.user, [PERMISSION_JOB_PROCESSING_CONFIGURATION])
job_processing_config = JobProcessingConfig.get()
post_action_redirect = None
previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', '/')))
next = request.POST.get('next', request.GET.get('next', post_action_redirect if post_action_redirect else request.META.get('HTTP_REFERER', '/')))
if request.method == 'POST':
form = JobProcessingConfigForm(data=request.POST)
if form.is_valid():
try:
form.save()
except Exception, exc:
messages.error(request, _(u'Error trying to edit job processing configuration; %s') % exc)
else:
messages.success(request, _(u'Job processing configuration edited successfully.'))
return HttpResponseRedirect(next)
else:
form = JobProcessingConfigForm(instance=job_processing_config)
return render_to_response('generic_form.html', {
'form': form,
'object': job_processing_config,
'title': _(u'Edit job processing configuration')
}, context_instance=RequestContext(request))