diff --git a/apps/job_processor/__init__.py b/apps/job_processor/__init__.py index ecda0f6ff5..1568baa723 100644 --- a/apps/job_processor/__init__.py +++ b/apps/job_processor/__init__.py @@ -15,7 +15,7 @@ from .models import JobQueue, JobProcessingConfig from .tasks import job_queue_poll from .links import (node_workers, job_queues, tool_link, job_queue_items_pending, job_queue_items_error, job_queue_items_active, - job_queue_config_edit, setup_link) + job_queue_config_edit, setup_link, job_queue_start, job_queue_stop) @transaction.commit_on_success @@ -33,7 +33,7 @@ add_job_queue_jobs() register_tool(tool_link) register_setup(setup_link) bind_links([JobQueue, 'job_queues'], [job_queues], menu_name='secondary_menu') -bind_links([JobQueue], [job_queue_items_pending, job_queue_items_active, job_queue_items_error]) +bind_links([JobQueue], [job_queue_start, job_queue_stop, job_queue_items_pending, job_queue_items_active, job_queue_items_error]) bind_links([Node], [node_workers]) bind_links(['job_queue_config_edit'], [job_queue_config_edit], menu_name='secondary_menu') diff --git a/apps/job_processor/exceptions.py b/apps/job_processor/exceptions.py index bac36e7b37..dae8150925 100644 --- a/apps/job_processor/exceptions.py +++ b/apps/job_processor/exceptions.py @@ -1,14 +1,14 @@ -#class WorkerAlreadyDisabled(Exception): -# pass - - -#class WorkerAlreadyEnabled(Exception): -# pass - - class JobQueuePushError(Exception): pass class JobQueueNoPendingJobs(Exception): pass + + +class JobQueueAlreadyStarted(Exception): + pass + + +class JobQueueAlreadyStopped(Exception): + pass diff --git a/apps/job_processor/links.py b/apps/job_processor/links.py index 30ce533b24..b17ee06312 100644 --- a/apps/job_processor/links.py +++ b/apps/job_processor/links.py @@ -6,7 +6,16 @@ from navigation.api import Link from clustering.permissions import PERMISSION_NODES_VIEW -from .permissions import PERMISSION_JOB_QUEUE_VIEW, PERMISSION_JOB_PROCESSING_CONFIGURATION +from .permissions import (PERMISSION_JOB_QUEUE_VIEW, + PERMISSION_JOB_PROCESSING_CONFIGURATION, PERMISSION_JOB_QUEUE_START_STOP) + + +def is_running(context): + return context['object'].is_running() + +def is_not_running(context): + return not context['object'].is_running() + node_workers = Link(text=_(u'workers'), view='node_workers', args='object.pk', sprite='lorry_go', permissions=[PERMISSION_NODES_VIEW]) tool_link = Link(text=_(u'job queues'), view='job_queues', icon='hourglass.png', permissions=[PERMISSION_JOB_QUEUE_VIEW]) @@ -15,5 +24,8 @@ job_queue_items_pending = Link(text=_(u'pending jobs'), view='job_queue_items_pe job_queue_items_error = Link(text=_(u'error jobs'), view='job_queue_items_error', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW]) job_queue_items_active = Link(text=_(u'active jobs'), view='job_queue_items_active', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW]) +job_queue_start = Link(text=_(u'start'), view='job_queue_start', args='object.pk', sprite='control_play_blue', permissions=[PERMISSION_JOB_QUEUE_START_STOP], condition=is_not_running) +job_queue_stop = Link(text=_(u'stop'), view='job_queue_stop', args='object.pk', sprite='control_stop_blue', permissions=[PERMISSION_JOB_QUEUE_START_STOP], condition=is_running) + job_queue_config_edit = Link(text=_(u'edit job processing configuration'), view='job_queue_config_edit', sprite='hourglass', permissions=[PERMISSION_JOB_PROCESSING_CONFIGURATION]) setup_link = Link(text=_(u'job processing configuration'), view='job_queue_config_edit', icon='hourglass.png', permissions=[PERMISSION_JOB_PROCESSING_CONFIGURATION]) diff --git a/apps/job_processor/literals.py b/apps/job_processor/literals.py index 7a0afb2535..73a05ca00a 100644 --- a/apps/job_processor/literals.py +++ b/apps/job_processor/literals.py @@ -18,4 +18,12 @@ JOB_STATE_CHOICES = ( (JOB_STATE_ERROR, _(u'error')), ) +JOB_QUEUE_STATE_STOPPED = 's' +JOB_QUEUE_STATE_STARTED = 'r' + +JOB_QUEUE_STATE_CHOICES = ( + (JOB_QUEUE_STATE_STOPPED, _(u'stopped')), + (JOB_QUEUE_STATE_STARTED, _(u'started')), +) + DEFAULT_JOB_QUEUE_POLL_INTERVAL = 2 diff --git a/apps/job_processor/migrations/0006_auto__add_field_jobqueue_state.py b/apps/job_processor/migrations/0006_auto__add_field_jobqueue_state.py new file mode 100644 index 0000000000..e35fa3df06 --- /dev/null +++ b/apps/job_processor/migrations/0006_auto__add_field_jobqueue_state.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +import datetime +from south.db import db +from south.v2 import SchemaMigration +from django.db import models + + +class Migration(SchemaMigration): + + def forwards(self, orm): + # Adding field 'JobQueue.state' + db.add_column('job_processor_jobqueue', 'state', + self.gf('django.db.models.fields.CharField')(default='r', max_length=4), + keep_default=False) + + + def backwards(self, orm): + # Deleting field 'JobQueue.state' + db.delete_column('job_processor_jobqueue', 'state') + + + models = { + 'clustering.node': { + 'Meta': {'object_name': 'Node'}, + 'cpuload': ('django.db.models.fields.FloatField', [], {'default': '0.0', 'blank': 'True'}), + 'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 8, 1, 0, 0)', 'blank': 'True'}), + 'hostname': ('django.db.models.fields.CharField', [], {'max_length': '255'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'memory_usage': ('django.db.models.fields.FloatField', [], {'default': '0.0', 'blank': 'True'}) + }, + 'job_processor.jobprocessingconfig': { + 'Meta': {'object_name': 'JobProcessingConfig'}, + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job_queue_poll_interval': ('django.db.models.fields.PositiveIntegerField', [], {'default': '2'}), + 'lock_id': ('django.db.models.fields.CharField', [], {'default': '1', 'unique': 'True', 'max_length': '1'}) + }, + 'job_processor.jobqueue': { + 'Meta': {'object_name': 'JobQueue'}, + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '32'}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'r'", 'max_length': '4'}), + 'unique_jobs': ('django.db.models.fields.BooleanField', [], {'default': 'True'}) + }, + 'job_processor.jobqueueitem': { + 'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'JobQueueItem'}, + 'creation_datetime': ('django.db.models.fields.DateTimeField', [], {}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job_queue': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueue']"}), + 'job_type': ('django.db.models.fields.CharField', [], {'max_length': '32'}), + 'kwargs': ('django.db.models.fields.TextField', [], {}), + 'result': ('django.db.models.fields.TextField', [], {'blank': 'True'}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'p'", 'max_length': '4'}), + 'unique_id': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64', 'blank': 'True'}) + }, + 'job_processor.worker': { + 'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'Worker'}, + 'creation_datetime': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 8, 1, 0, 0)'}), + 'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 8, 1, 0, 0)', 'blank': 'True'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job_queue_item': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueueItem']"}), + 'node': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['clustering.Node']"}), + 'pid': ('django.db.models.fields.PositiveIntegerField', [], {'max_length': '255'}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'r'", 'max_length': '4'}) + } + } + + complete_apps = ['job_processor'] \ No newline at end of file diff --git a/apps/job_processor/models.py b/apps/job_processor/models.py index 5ab68ba0c9..6db0ae34bd 100644 --- a/apps/job_processor/models.py +++ b/apps/job_processor/models.py @@ -21,9 +21,11 @@ from clustering.models import Node from .literals import (JOB_STATE_CHOICES, JOB_STATE_PENDING, JOB_STATE_PROCESSING, JOB_STATE_ERROR, WORKER_STATE_CHOICES, - WORKER_STATE_RUNNING, DEFAULT_JOB_QUEUE_POLL_INTERVAL) -from .exceptions import JobQueuePushError, JobQueueNoPendingJobs -#from .exceptions import (WorkerAlreadyDisabled, WorkerAlreadyEnabled) + WORKER_STATE_RUNNING, DEFAULT_JOB_QUEUE_POLL_INTERVAL, + JOB_QUEUE_STATE_STOPPED, JOB_QUEUE_STATE_STARTED, + JOB_QUEUE_STATE_CHOICES) +from .exceptions import (JobQueuePushError, JobQueueNoPendingJobs, + JobQueueAlreadyStarted, JobQueueAlreadyStopped) job_queue_labels = {} job_types_registry = {} @@ -83,7 +85,11 @@ class JobQueue(models.Model): # Internal name name = models.CharField(max_length=32, verbose_name=_(u'name'), unique=True) unique_jobs = models.BooleanField(verbose_name=_(u'unique jobs'), default=True) - + state = models.CharField(max_length=4, + choices=JOB_QUEUE_STATE_CHOICES, + default=JOB_QUEUE_STATE_STARTED, + verbose_name=_(u'state')) + objects = JobQueueManager() def __unicode__(self): @@ -136,22 +142,22 @@ class JobQueue(models.Model): job_queue_labels[self.name] = label return super(JobQueue, self).save(*args, **kwargs) - #def disable(self): - # if self.state == WORKER_STATE_DISABLED: - # raise WorkerAlreadyDisabled - # - # self.state = WORKER_STATE_DISABLED - # self.save() - # - #def enable(self): - # if self.state == WORKER_STATE_ENABLED: - # raise WorkerAlreadyEnabled - # - # self.state = WORKER_STATE_ENABLED - # self.save() - # - #def is_enabled(self): - # return self.state == WORKER_STATE_ENABLED + def stop(self): + if self.state == JOB_QUEUE_STATE_STOPPED: + raise JobQueueAlreadyStopped + + self.state = JOB_QUEUE_STATE_STOPPED + self.save() + + def start(self): + if self.state == JOB_QUEUE_STATE_STARTED: + raise JobQueueAlreadyStarted + + self.state = JOB_QUEUE_STATE_STARTED + self.save() + + def is_running(self): + return self.state == JOB_QUEUE_STATE_STARTED # TODO: custom runtime methods diff --git a/apps/job_processor/permissions.py b/apps/job_processor/permissions.py index fac0392d5c..4ae086c630 100644 --- a/apps/job_processor/permissions.py +++ b/apps/job_processor/permissions.py @@ -7,3 +7,4 @@ from permissions.models import PermissionNamespace, Permission namespace = PermissionNamespace('job_processor', _(u'Job processor')) PERMISSION_JOB_QUEUE_VIEW = Permission.objects.register(namespace, 'job_queue_view', _(u'View the job queues in a Mayan cluster')) PERMISSION_JOB_PROCESSING_CONFIGURATION = Permission.objects.register(namespace, 'job_processing_edit', _(u'Edit the the job processing configuration in a Mayan cluster')) +PERMISSION_JOB_QUEUE_START_STOP = Permission.objects.register(namespace, 'job_queue_start_stop', _(u'Can start and stop a job queue in a Mayan cluster')) diff --git a/apps/main/static/images/icons/control_play_blue.png b/apps/job_processor/static/images/icons/control_play_blue.png similarity index 100% rename from apps/main/static/images/icons/control_play_blue.png rename to apps/job_processor/static/images/icons/control_play_blue.png diff --git a/apps/main/static/images/icons/control_stop_blue.png b/apps/job_processor/static/images/icons/control_stop_blue.png similarity index 100% rename from apps/main/static/images/icons/control_stop_blue.png rename to apps/job_processor/static/images/icons/control_stop_blue.png diff --git a/apps/job_processor/tasks.py b/apps/job_processor/tasks.py index fac91c03d3..b27d6dc223 100644 --- a/apps/job_processor/tasks.py +++ b/apps/job_processor/tasks.py @@ -7,6 +7,7 @@ from clustering.models import Node from .models import JobQueue from .exceptions import JobQueueNoPendingJobs +from .literals import JOB_QUEUE_STATE_STARTED LOCK_EXPIRE = 10 MAX_CPU_LOAD = 90.0 @@ -30,7 +31,7 @@ def job_queue_poll(): lock.release() raise else: - for job_queue in JobQueue.objects.all(): + for job_queue in JobQueue.objects.filter(state=JOB_QUEUE_STATE_STARTED): try: job_item = job_queue.get_oldest_pending_job() job_item.run() diff --git a/apps/job_processor/urls.py b/apps/job_processor/urls.py index 751408def3..75b38fe3e2 100644 --- a/apps/job_processor/urls.py +++ b/apps/job_processor/urls.py @@ -7,5 +7,8 @@ urlpatterns = patterns('job_processor.views', url(r'^queue/(?P\d+)/items/pending/$', 'job_queue_items', {'pending_jobs': True}, 'job_queue_items_pending'), url(r'^queue/(?P\d+)/items/error/$', 'job_queue_items', {'error_jobs' :True}, 'job_queue_items_error'), url(r'^queue/(?P\d+)/items/active/$', 'job_queue_items', {'active_jobs' :True}, 'job_queue_items_active'), + url(r'^queue/(?P\d+)/start/$', 'job_queue_start', (), 'job_queue_start'), + url(r'^queue/(?P\d+)/stop/$', 'job_queue_stop', (), 'job_queue_stop'), + url(r'^config/edit/$', 'job_queue_config_edit', (), 'job_queue_config_edit'), ) diff --git a/apps/job_processor/views.py b/apps/job_processor/views.py index 7c71c16ff6..5feb983311 100644 --- a/apps/job_processor/views.py +++ b/apps/job_processor/views.py @@ -1,22 +1,23 @@ from __future__ import absolute_import -from django.shortcuts import render_to_response +from django.contrib import messages +from django.core.exceptions import PermissionDenied +from django.http import Http404, HttpResponseRedirect +from django.shortcuts import render_to_response, get_object_or_404 from django.template import RequestContext from django.utils.translation import ugettext_lazy as _ -from django.shortcuts import get_object_or_404 -from django.contrib.contenttypes.models import ContentType -from django.db.models.loading import get_model -from django.http import Http404 -from django.core.exceptions import PermissionDenied -from permissions.models import Permission -from common.utils import encapsulate + from acls.models import AccessEntry +from common.utils import encapsulate from clustering.permissions import PERMISSION_NODES_VIEW from clustering.models import Node +from permissions.models import Permission +from .exceptions import JobQueueAlreadyStopped, JobQueueAlreadyStarted from .forms import JobProcessingConfigForm from .models import JobQueue, JobProcessingConfig -from .permissions import PERMISSION_JOB_QUEUE_VIEW, PERMISSION_JOB_PROCESSING_CONFIGURATION +from .permissions import (PERMISSION_JOB_QUEUE_VIEW, + PERMISSION_JOB_PROCESSING_CONFIGURATION, PERMISSION_JOB_QUEUE_START_STOP) def node_workers(request, node_pk): @@ -73,6 +74,10 @@ def job_queues(request): 'title': _(u'job queue'), 'hide_link': True, 'extra_columns': [ + { + 'name': _(u'state'), + 'attribute': 'get_state_display', + }, { 'name': _(u'pending jobs'), 'attribute': 'pending_jobs.count', @@ -182,4 +187,66 @@ def job_queue_config_edit(request): 'form': form, 'object': job_processing_config, 'title': _(u'Edit job processing configuration') - }, context_instance=RequestContext(request)) + }, context_instance=RequestContext(request)) + + +def job_queue_stop(request, job_queue_pk): + job_queue = get_object_or_404(JobQueue, pk=job_queue_pk) + + try: + Permission.objects.check_permissions(request.user, [PERMISSION_JOB_QUEUE_START_STOP]) + except PermissionDenied: + AccessEntry.objects.check_access(PERMISSION_JOB_QUEUE_START_STOP, request.user, job_queue) + + next = request.POST.get('next', request.GET.get('next', request.META.get('HTTP_REFERER', None))) + previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', None))) + + if request.method == 'POST': + try: + job_queue.stop() + except JobQueueAlreadyStopped: + messages.warning(request, _(u'job queue already stopped.')) + return HttpResponseRedirect(previous) + else: + messages.success(request, _(u'Job queue stopped successfully.')) + return HttpResponseRedirect(next) + + return render_to_response('generic_confirm.html', { + 'object': job_queue, + 'object_name': _(u'job queue'), + 'title': _(u'Are you sure you wish to stop job queue: %s?') % job_queue, + 'next': next, + 'previous': previous, + 'form_icon': u'control_stop_blue.png', + }, context_instance=RequestContext(request)) + + +def job_queue_start(request, job_queue_pk): + job_queue = get_object_or_404(JobQueue, pk=job_queue_pk) + + try: + Permission.objects.check_permissions(request.user, [PERMISSION_JOB_QUEUE_VIEW]) + except PermissionDenied: + AccessEntry.objects.check_access(PERMISSION_JOB_QUEUE_VIEW, request.user, job_queue) + + next = request.POST.get('next', request.GET.get('next', request.META.get('HTTP_REFERER', None))) + previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', None))) + + if request.method == 'POST': + try: + job_queue.start() + except JobQueueAlreadyStarted: + messages.warning(request, _(u'job queue already started.')) + return HttpResponseRedirect(previous) + else: + messages.success(request, _(u'Job queue started successfully.')) + return HttpResponseRedirect(next) + + return render_to_response('generic_confirm.html', { + 'object': job_queue, + 'object_name': _(u'job queue'), + 'title': _(u'Are you sure you wish to start job queue: %s?') % job_queue, + 'next': next, + 'previous': previous, + 'form_icon': u'control_play_blue.png', + }, context_instance=RequestContext(request))