Add support for starting and stopping a job queue
This commit is contained in:
@@ -15,7 +15,7 @@ from .models import JobQueue, JobProcessingConfig
|
||||
from .tasks import job_queue_poll
|
||||
from .links import (node_workers, job_queues, tool_link,
|
||||
job_queue_items_pending, job_queue_items_error, job_queue_items_active,
|
||||
job_queue_config_edit, setup_link)
|
||||
job_queue_config_edit, setup_link, job_queue_start, job_queue_stop)
|
||||
|
||||
|
||||
@transaction.commit_on_success
|
||||
@@ -33,7 +33,7 @@ add_job_queue_jobs()
|
||||
register_tool(tool_link)
|
||||
register_setup(setup_link)
|
||||
bind_links([JobQueue, 'job_queues'], [job_queues], menu_name='secondary_menu')
|
||||
bind_links([JobQueue], [job_queue_items_pending, job_queue_items_active, job_queue_items_error])
|
||||
bind_links([JobQueue], [job_queue_start, job_queue_stop, job_queue_items_pending, job_queue_items_active, job_queue_items_error])
|
||||
bind_links([Node], [node_workers])
|
||||
bind_links(['job_queue_config_edit'], [job_queue_config_edit], menu_name='secondary_menu')
|
||||
|
||||
|
||||
@@ -1,14 +1,14 @@
|
||||
#class WorkerAlreadyDisabled(Exception):
|
||||
# pass
|
||||
|
||||
|
||||
#class WorkerAlreadyEnabled(Exception):
|
||||
# pass
|
||||
|
||||
|
||||
class JobQueuePushError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class JobQueueNoPendingJobs(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class JobQueueAlreadyStarted(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class JobQueueAlreadyStopped(Exception):
|
||||
pass
|
||||
|
||||
@@ -6,7 +6,16 @@ from navigation.api import Link
|
||||
|
||||
from clustering.permissions import PERMISSION_NODES_VIEW
|
||||
|
||||
from .permissions import PERMISSION_JOB_QUEUE_VIEW, PERMISSION_JOB_PROCESSING_CONFIGURATION
|
||||
from .permissions import (PERMISSION_JOB_QUEUE_VIEW,
|
||||
PERMISSION_JOB_PROCESSING_CONFIGURATION, PERMISSION_JOB_QUEUE_START_STOP)
|
||||
|
||||
|
||||
def is_running(context):
|
||||
return context['object'].is_running()
|
||||
|
||||
def is_not_running(context):
|
||||
return not context['object'].is_running()
|
||||
|
||||
|
||||
node_workers = Link(text=_(u'workers'), view='node_workers', args='object.pk', sprite='lorry_go', permissions=[PERMISSION_NODES_VIEW])
|
||||
tool_link = Link(text=_(u'job queues'), view='job_queues', icon='hourglass.png', permissions=[PERMISSION_JOB_QUEUE_VIEW])
|
||||
@@ -15,5 +24,8 @@ job_queue_items_pending = Link(text=_(u'pending jobs'), view='job_queue_items_pe
|
||||
job_queue_items_error = Link(text=_(u'error jobs'), view='job_queue_items_error', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW])
|
||||
job_queue_items_active = Link(text=_(u'active jobs'), view='job_queue_items_active', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW])
|
||||
|
||||
job_queue_start = Link(text=_(u'start'), view='job_queue_start', args='object.pk', sprite='control_play_blue', permissions=[PERMISSION_JOB_QUEUE_START_STOP], condition=is_not_running)
|
||||
job_queue_stop = Link(text=_(u'stop'), view='job_queue_stop', args='object.pk', sprite='control_stop_blue', permissions=[PERMISSION_JOB_QUEUE_START_STOP], condition=is_running)
|
||||
|
||||
job_queue_config_edit = Link(text=_(u'edit job processing configuration'), view='job_queue_config_edit', sprite='hourglass', permissions=[PERMISSION_JOB_PROCESSING_CONFIGURATION])
|
||||
setup_link = Link(text=_(u'job processing configuration'), view='job_queue_config_edit', icon='hourglass.png', permissions=[PERMISSION_JOB_PROCESSING_CONFIGURATION])
|
||||
|
||||
@@ -18,4 +18,12 @@ JOB_STATE_CHOICES = (
|
||||
(JOB_STATE_ERROR, _(u'error')),
|
||||
)
|
||||
|
||||
JOB_QUEUE_STATE_STOPPED = 's'
|
||||
JOB_QUEUE_STATE_STARTED = 'r'
|
||||
|
||||
JOB_QUEUE_STATE_CHOICES = (
|
||||
(JOB_QUEUE_STATE_STOPPED, _(u'stopped')),
|
||||
(JOB_QUEUE_STATE_STARTED, _(u'started')),
|
||||
)
|
||||
|
||||
DEFAULT_JOB_QUEUE_POLL_INTERVAL = 2
|
||||
|
||||
@@ -0,0 +1,67 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import datetime
|
||||
from south.db import db
|
||||
from south.v2 import SchemaMigration
|
||||
from django.db import models
|
||||
|
||||
|
||||
class Migration(SchemaMigration):
|
||||
|
||||
def forwards(self, orm):
|
||||
# Adding field 'JobQueue.state'
|
||||
db.add_column('job_processor_jobqueue', 'state',
|
||||
self.gf('django.db.models.fields.CharField')(default='r', max_length=4),
|
||||
keep_default=False)
|
||||
|
||||
|
||||
def backwards(self, orm):
|
||||
# Deleting field 'JobQueue.state'
|
||||
db.delete_column('job_processor_jobqueue', 'state')
|
||||
|
||||
|
||||
models = {
|
||||
'clustering.node': {
|
||||
'Meta': {'object_name': 'Node'},
|
||||
'cpuload': ('django.db.models.fields.FloatField', [], {'default': '0.0', 'blank': 'True'}),
|
||||
'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 8, 1, 0, 0)', 'blank': 'True'}),
|
||||
'hostname': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'memory_usage': ('django.db.models.fields.FloatField', [], {'default': '0.0', 'blank': 'True'})
|
||||
},
|
||||
'job_processor.jobprocessingconfig': {
|
||||
'Meta': {'object_name': 'JobProcessingConfig'},
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'job_queue_poll_interval': ('django.db.models.fields.PositiveIntegerField', [], {'default': '2'}),
|
||||
'lock_id': ('django.db.models.fields.CharField', [], {'default': '1', 'unique': 'True', 'max_length': '1'})
|
||||
},
|
||||
'job_processor.jobqueue': {
|
||||
'Meta': {'object_name': 'JobQueue'},
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '32'}),
|
||||
'state': ('django.db.models.fields.CharField', [], {'default': "'r'", 'max_length': '4'}),
|
||||
'unique_jobs': ('django.db.models.fields.BooleanField', [], {'default': 'True'})
|
||||
},
|
||||
'job_processor.jobqueueitem': {
|
||||
'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'JobQueueItem'},
|
||||
'creation_datetime': ('django.db.models.fields.DateTimeField', [], {}),
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'job_queue': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueue']"}),
|
||||
'job_type': ('django.db.models.fields.CharField', [], {'max_length': '32'}),
|
||||
'kwargs': ('django.db.models.fields.TextField', [], {}),
|
||||
'result': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
|
||||
'state': ('django.db.models.fields.CharField', [], {'default': "'p'", 'max_length': '4'}),
|
||||
'unique_id': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64', 'blank': 'True'})
|
||||
},
|
||||
'job_processor.worker': {
|
||||
'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'Worker'},
|
||||
'creation_datetime': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 8, 1, 0, 0)'}),
|
||||
'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 8, 1, 0, 0)', 'blank': 'True'}),
|
||||
'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
|
||||
'job_queue_item': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueueItem']"}),
|
||||
'node': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['clustering.Node']"}),
|
||||
'pid': ('django.db.models.fields.PositiveIntegerField', [], {'max_length': '255'}),
|
||||
'state': ('django.db.models.fields.CharField', [], {'default': "'r'", 'max_length': '4'})
|
||||
}
|
||||
}
|
||||
|
||||
complete_apps = ['job_processor']
|
||||
@@ -21,9 +21,11 @@ from clustering.models import Node
|
||||
|
||||
from .literals import (JOB_STATE_CHOICES, JOB_STATE_PENDING,
|
||||
JOB_STATE_PROCESSING, JOB_STATE_ERROR, WORKER_STATE_CHOICES,
|
||||
WORKER_STATE_RUNNING, DEFAULT_JOB_QUEUE_POLL_INTERVAL)
|
||||
from .exceptions import JobQueuePushError, JobQueueNoPendingJobs
|
||||
#from .exceptions import (WorkerAlreadyDisabled, WorkerAlreadyEnabled)
|
||||
WORKER_STATE_RUNNING, DEFAULT_JOB_QUEUE_POLL_INTERVAL,
|
||||
JOB_QUEUE_STATE_STOPPED, JOB_QUEUE_STATE_STARTED,
|
||||
JOB_QUEUE_STATE_CHOICES)
|
||||
from .exceptions import (JobQueuePushError, JobQueueNoPendingJobs,
|
||||
JobQueueAlreadyStarted, JobQueueAlreadyStopped)
|
||||
|
||||
job_queue_labels = {}
|
||||
job_types_registry = {}
|
||||
@@ -83,7 +85,11 @@ class JobQueue(models.Model):
|
||||
# Internal name
|
||||
name = models.CharField(max_length=32, verbose_name=_(u'name'), unique=True)
|
||||
unique_jobs = models.BooleanField(verbose_name=_(u'unique jobs'), default=True)
|
||||
|
||||
state = models.CharField(max_length=4,
|
||||
choices=JOB_QUEUE_STATE_CHOICES,
|
||||
default=JOB_QUEUE_STATE_STARTED,
|
||||
verbose_name=_(u'state'))
|
||||
|
||||
objects = JobQueueManager()
|
||||
|
||||
def __unicode__(self):
|
||||
@@ -136,22 +142,22 @@ class JobQueue(models.Model):
|
||||
job_queue_labels[self.name] = label
|
||||
return super(JobQueue, self).save(*args, **kwargs)
|
||||
|
||||
#def disable(self):
|
||||
# if self.state == WORKER_STATE_DISABLED:
|
||||
# raise WorkerAlreadyDisabled
|
||||
#
|
||||
# self.state = WORKER_STATE_DISABLED
|
||||
# self.save()
|
||||
#
|
||||
#def enable(self):
|
||||
# if self.state == WORKER_STATE_ENABLED:
|
||||
# raise WorkerAlreadyEnabled
|
||||
#
|
||||
# self.state = WORKER_STATE_ENABLED
|
||||
# self.save()
|
||||
#
|
||||
#def is_enabled(self):
|
||||
# return self.state == WORKER_STATE_ENABLED
|
||||
def stop(self):
|
||||
if self.state == JOB_QUEUE_STATE_STOPPED:
|
||||
raise JobQueueAlreadyStopped
|
||||
|
||||
self.state = JOB_QUEUE_STATE_STOPPED
|
||||
self.save()
|
||||
|
||||
def start(self):
|
||||
if self.state == JOB_QUEUE_STATE_STARTED:
|
||||
raise JobQueueAlreadyStarted
|
||||
|
||||
self.state = JOB_QUEUE_STATE_STARTED
|
||||
self.save()
|
||||
|
||||
def is_running(self):
|
||||
return self.state == JOB_QUEUE_STATE_STARTED
|
||||
|
||||
# TODO: custom runtime methods
|
||||
|
||||
|
||||
@@ -7,3 +7,4 @@ from permissions.models import PermissionNamespace, Permission
|
||||
namespace = PermissionNamespace('job_processor', _(u'Job processor'))
|
||||
PERMISSION_JOB_QUEUE_VIEW = Permission.objects.register(namespace, 'job_queue_view', _(u'View the job queues in a Mayan cluster'))
|
||||
PERMISSION_JOB_PROCESSING_CONFIGURATION = Permission.objects.register(namespace, 'job_processing_edit', _(u'Edit the the job processing configuration in a Mayan cluster'))
|
||||
PERMISSION_JOB_QUEUE_START_STOP = Permission.objects.register(namespace, 'job_queue_start_stop', _(u'Can start and stop a job queue in a Mayan cluster'))
|
||||
|
||||
|
Before Width: | Height: | Size: 1.9 KiB After Width: | Height: | Size: 1.9 KiB |
|
Before Width: | Height: | Size: 1.9 KiB After Width: | Height: | Size: 1.9 KiB |
@@ -7,6 +7,7 @@ from clustering.models import Node
|
||||
|
||||
from .models import JobQueue
|
||||
from .exceptions import JobQueueNoPendingJobs
|
||||
from .literals import JOB_QUEUE_STATE_STARTED
|
||||
|
||||
LOCK_EXPIRE = 10
|
||||
MAX_CPU_LOAD = 90.0
|
||||
@@ -30,7 +31,7 @@ def job_queue_poll():
|
||||
lock.release()
|
||||
raise
|
||||
else:
|
||||
for job_queue in JobQueue.objects.all():
|
||||
for job_queue in JobQueue.objects.filter(state=JOB_QUEUE_STATE_STARTED):
|
||||
try:
|
||||
job_item = job_queue.get_oldest_pending_job()
|
||||
job_item.run()
|
||||
|
||||
@@ -7,5 +7,8 @@ urlpatterns = patterns('job_processor.views',
|
||||
url(r'^queue/(?P<job_queue_pk>\d+)/items/pending/$', 'job_queue_items', {'pending_jobs': True}, 'job_queue_items_pending'),
|
||||
url(r'^queue/(?P<job_queue_pk>\d+)/items/error/$', 'job_queue_items', {'error_jobs' :True}, 'job_queue_items_error'),
|
||||
url(r'^queue/(?P<job_queue_pk>\d+)/items/active/$', 'job_queue_items', {'active_jobs' :True}, 'job_queue_items_active'),
|
||||
url(r'^queue/(?P<job_queue_pk>\d+)/start/$', 'job_queue_start', (), 'job_queue_start'),
|
||||
url(r'^queue/(?P<job_queue_pk>\d+)/stop/$', 'job_queue_stop', (), 'job_queue_stop'),
|
||||
|
||||
url(r'^config/edit/$', 'job_queue_config_edit', (), 'job_queue_config_edit'),
|
||||
)
|
||||
|
||||
@@ -1,22 +1,23 @@
|
||||
from __future__ import absolute_import
|
||||
|
||||
from django.shortcuts import render_to_response
|
||||
from django.contrib import messages
|
||||
from django.core.exceptions import PermissionDenied
|
||||
from django.http import Http404, HttpResponseRedirect
|
||||
from django.shortcuts import render_to_response, get_object_or_404
|
||||
from django.template import RequestContext
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
from django.shortcuts import get_object_or_404
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.db.models.loading import get_model
|
||||
from django.http import Http404
|
||||
from django.core.exceptions import PermissionDenied
|
||||
from permissions.models import Permission
|
||||
from common.utils import encapsulate
|
||||
|
||||
from acls.models import AccessEntry
|
||||
from common.utils import encapsulate
|
||||
from clustering.permissions import PERMISSION_NODES_VIEW
|
||||
from clustering.models import Node
|
||||
from permissions.models import Permission
|
||||
|
||||
from .exceptions import JobQueueAlreadyStopped, JobQueueAlreadyStarted
|
||||
from .forms import JobProcessingConfigForm
|
||||
from .models import JobQueue, JobProcessingConfig
|
||||
from .permissions import PERMISSION_JOB_QUEUE_VIEW, PERMISSION_JOB_PROCESSING_CONFIGURATION
|
||||
from .permissions import (PERMISSION_JOB_QUEUE_VIEW,
|
||||
PERMISSION_JOB_PROCESSING_CONFIGURATION, PERMISSION_JOB_QUEUE_START_STOP)
|
||||
|
||||
|
||||
def node_workers(request, node_pk):
|
||||
@@ -73,6 +74,10 @@ def job_queues(request):
|
||||
'title': _(u'job queue'),
|
||||
'hide_link': True,
|
||||
'extra_columns': [
|
||||
{
|
||||
'name': _(u'state'),
|
||||
'attribute': 'get_state_display',
|
||||
},
|
||||
{
|
||||
'name': _(u'pending jobs'),
|
||||
'attribute': 'pending_jobs.count',
|
||||
@@ -182,4 +187,66 @@ def job_queue_config_edit(request):
|
||||
'form': form,
|
||||
'object': job_processing_config,
|
||||
'title': _(u'Edit job processing configuration')
|
||||
}, context_instance=RequestContext(request))
|
||||
}, context_instance=RequestContext(request))
|
||||
|
||||
|
||||
def job_queue_stop(request, job_queue_pk):
|
||||
job_queue = get_object_or_404(JobQueue, pk=job_queue_pk)
|
||||
|
||||
try:
|
||||
Permission.objects.check_permissions(request.user, [PERMISSION_JOB_QUEUE_START_STOP])
|
||||
except PermissionDenied:
|
||||
AccessEntry.objects.check_access(PERMISSION_JOB_QUEUE_START_STOP, request.user, job_queue)
|
||||
|
||||
next = request.POST.get('next', request.GET.get('next', request.META.get('HTTP_REFERER', None)))
|
||||
previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', None)))
|
||||
|
||||
if request.method == 'POST':
|
||||
try:
|
||||
job_queue.stop()
|
||||
except JobQueueAlreadyStopped:
|
||||
messages.warning(request, _(u'job queue already stopped.'))
|
||||
return HttpResponseRedirect(previous)
|
||||
else:
|
||||
messages.success(request, _(u'Job queue stopped successfully.'))
|
||||
return HttpResponseRedirect(next)
|
||||
|
||||
return render_to_response('generic_confirm.html', {
|
||||
'object': job_queue,
|
||||
'object_name': _(u'job queue'),
|
||||
'title': _(u'Are you sure you wish to stop job queue: %s?') % job_queue,
|
||||
'next': next,
|
||||
'previous': previous,
|
||||
'form_icon': u'control_stop_blue.png',
|
||||
}, context_instance=RequestContext(request))
|
||||
|
||||
|
||||
def job_queue_start(request, job_queue_pk):
|
||||
job_queue = get_object_or_404(JobQueue, pk=job_queue_pk)
|
||||
|
||||
try:
|
||||
Permission.objects.check_permissions(request.user, [PERMISSION_JOB_QUEUE_VIEW])
|
||||
except PermissionDenied:
|
||||
AccessEntry.objects.check_access(PERMISSION_JOB_QUEUE_VIEW, request.user, job_queue)
|
||||
|
||||
next = request.POST.get('next', request.GET.get('next', request.META.get('HTTP_REFERER', None)))
|
||||
previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', None)))
|
||||
|
||||
if request.method == 'POST':
|
||||
try:
|
||||
job_queue.start()
|
||||
except JobQueueAlreadyStarted:
|
||||
messages.warning(request, _(u'job queue already started.'))
|
||||
return HttpResponseRedirect(previous)
|
||||
else:
|
||||
messages.success(request, _(u'Job queue started successfully.'))
|
||||
return HttpResponseRedirect(next)
|
||||
|
||||
return render_to_response('generic_confirm.html', {
|
||||
'object': job_queue,
|
||||
'object_name': _(u'job queue'),
|
||||
'title': _(u'Are you sure you wish to start job queue: %s?') % job_queue,
|
||||
'next': next,
|
||||
'previous': previous,
|
||||
'form_icon': u'control_play_blue.png',
|
||||
}, context_instance=RequestContext(request))
|
||||
|
||||
Reference in New Issue
Block a user