diff --git a/apps/job_processor/__init__.py b/apps/job_processor/__init__.py index 66fb010bff..ecda0f6ff5 100644 --- a/apps/job_processor/__init__.py +++ b/apps/job_processor/__init__.py @@ -1,31 +1,41 @@ from __future__ import absolute_import +from django.db import transaction, DatabaseError from django.utils.translation import ugettext_lazy as _ from scheduler.api import LocalScheduler from navigation.api import bind_links, register_model_list_columns from project_tools.api import register_tool +from project_setup.api import register_setup from common.utils import encapsulate from clustering.models import Node -from .models import JobQueue +from .models import JobQueue, JobProcessingConfig from .tasks import job_queue_poll from .links import (node_workers, job_queues, tool_link, - job_queue_items_pending, job_queue_items_error, job_queue_items_active) + job_queue_items_pending, job_queue_items_error, job_queue_items_active, + job_queue_config_edit, setup_link) -#TODO: fix this, make it cluster wide -JOB_QUEUE_POLL_INTERVAL = 1 -job_processor_scheduler = LocalScheduler('job_processor', _(u'Job processor')) -job_processor_scheduler.add_interval_job('job_queue_poll', _(u'Poll a job queue for pending jobs.'), job_queue_poll, seconds=JOB_QUEUE_POLL_INTERVAL) -job_processor_scheduler.start() +@transaction.commit_on_success +def add_job_queue_jobs(): + job_processor_scheduler = LocalScheduler('job_processor', _(u'Job processor')) + try: + job_processor_scheduler.add_interval_job('job_queue_poll', _(u'Poll a job queue for pending jobs.'), job_queue_poll, seconds=JobProcessingConfig.get().job_queue_poll_interval) + except DatabaseError: + transaction.rollback() + job_processor_scheduler.start() + + +add_job_queue_jobs() register_tool(tool_link) +register_setup(setup_link) bind_links([JobQueue, 'job_queues'], [job_queues], menu_name='secondary_menu') bind_links([JobQueue], [job_queue_items_pending, job_queue_items_active, job_queue_items_error]) - bind_links([Node], [node_workers]) +bind_links(['job_queue_config_edit'], [job_queue_config_edit], menu_name='secondary_menu') Node.add_to_class('workers', lambda node: node.worker_set) diff --git a/apps/job_processor/forms.py b/apps/job_processor/forms.py new file mode 100644 index 0000000000..42170dfc84 --- /dev/null +++ b/apps/job_processor/forms.py @@ -0,0 +1,10 @@ +from __future__ import absolute_import + +from django import forms + +from .models import JobProcessingConfig + + +class JobProcessingConfigForm(forms.ModelForm): + class Meta: + model = JobProcessingConfig diff --git a/apps/job_processor/links.py b/apps/job_processor/links.py index a3fffee82b..30ce533b24 100644 --- a/apps/job_processor/links.py +++ b/apps/job_processor/links.py @@ -6,13 +6,14 @@ from navigation.api import Link from clustering.permissions import PERMISSION_NODES_VIEW -from .permissions import PERMISSION_JOB_QUEUE_VIEW - +from .permissions import PERMISSION_JOB_QUEUE_VIEW, PERMISSION_JOB_PROCESSING_CONFIGURATION node_workers = Link(text=_(u'workers'), view='node_workers', args='object.pk', sprite='lorry_go', permissions=[PERMISSION_NODES_VIEW]) - tool_link = Link(text=_(u'job queues'), view='job_queues', icon='hourglass.png', permissions=[PERMISSION_JOB_QUEUE_VIEW]) job_queues = Link(text=_(u'job queue list'), view='job_queues', sprite='hourglass', permissions=[PERMISSION_JOB_QUEUE_VIEW]) job_queue_items_pending = Link(text=_(u'pending jobs'), view='job_queue_items_pending', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW]) job_queue_items_error = Link(text=_(u'error jobs'), view='job_queue_items_error', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW]) job_queue_items_active = Link(text=_(u'active jobs'), view='job_queue_items_active', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW]) + +job_queue_config_edit = Link(text=_(u'edit job processing configuration'), view='job_queue_config_edit', sprite='hourglass', permissions=[PERMISSION_JOB_PROCESSING_CONFIGURATION]) +setup_link = Link(text=_(u'job processing configuration'), view='job_queue_config_edit', icon='hourglass.png', permissions=[PERMISSION_JOB_PROCESSING_CONFIGURATION]) diff --git a/apps/job_processor/literals.py b/apps/job_processor/literals.py index a8a1bdf7aa..7a0afb2535 100644 --- a/apps/job_processor/literals.py +++ b/apps/job_processor/literals.py @@ -17,3 +17,5 @@ JOB_STATE_CHOICES = ( (JOB_STATE_PROCESSING, _(u'processing')), (JOB_STATE_ERROR, _(u'error')), ) + +DEFAULT_JOB_QUEUE_POLL_INTERVAL = 2 diff --git a/apps/job_processor/migrations/0005_auto__add_jobprocessingconfig.py b/apps/job_processor/migrations/0005_auto__add_jobprocessingconfig.py new file mode 100644 index 0000000000..0af1e00544 --- /dev/null +++ b/apps/job_processor/migrations/0005_auto__add_jobprocessingconfig.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- +import datetime +from south.db import db +from south.v2 import SchemaMigration +from django.db import models + + +class Migration(SchemaMigration): + + def forwards(self, orm): + # Adding model 'JobProcessingConfig' + db.create_table('job_processor_jobprocessingconfig', ( + ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), + ('lock_id', self.gf('django.db.models.fields.CharField')(default=1, unique=True, max_length=1)), + ('job_queue_poll_interval', self.gf('django.db.models.fields.PositiveIntegerField')(default=2)), + )) + db.send_create_signal('job_processor', ['JobProcessingConfig']) + + + def backwards(self, orm): + # Deleting model 'JobProcessingConfig' + db.delete_table('job_processor_jobprocessingconfig') + + + models = { + 'clustering.node': { + 'Meta': {'object_name': 'Node'}, + 'cpuload': ('django.db.models.fields.FloatField', [], {'default': '0.0', 'blank': 'True'}), + 'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 8, 1, 0, 0)', 'blank': 'True'}), + 'hostname': ('django.db.models.fields.CharField', [], {'max_length': '255'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'memory_usage': ('django.db.models.fields.FloatField', [], {'default': '0.0', 'blank': 'True'}) + }, + 'job_processor.jobprocessingconfig': { + 'Meta': {'object_name': 'JobProcessingConfig'}, + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job_queue_poll_interval': ('django.db.models.fields.PositiveIntegerField', [], {'default': '2'}), + 'lock_id': ('django.db.models.fields.CharField', [], {'default': '1', 'unique': 'True', 'max_length': '1'}) + }, + 'job_processor.jobqueue': { + 'Meta': {'object_name': 'JobQueue'}, + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '32'}), + 'unique_jobs': ('django.db.models.fields.BooleanField', [], {'default': 'True'}) + }, + 'job_processor.jobqueueitem': { + 'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'JobQueueItem'}, + 'creation_datetime': ('django.db.models.fields.DateTimeField', [], {}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job_queue': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueue']"}), + 'job_type': ('django.db.models.fields.CharField', [], {'max_length': '32'}), + 'kwargs': ('django.db.models.fields.TextField', [], {}), + 'result': ('django.db.models.fields.TextField', [], {'blank': 'True'}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'p'", 'max_length': '4'}), + 'unique_id': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64', 'blank': 'True'}) + }, + 'job_processor.worker': { + 'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'Worker'}, + 'creation_datetime': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 8, 1, 0, 0)'}), + 'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 8, 1, 0, 0)', 'blank': 'True'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job_queue_item': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueueItem']"}), + 'node': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['clustering.Node']"}), + 'pid': ('django.db.models.fields.PositiveIntegerField', [], {'max_length': '255'}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'r'", 'max_length': '4'}) + } + } + + complete_apps = ['job_processor'] \ No newline at end of file diff --git a/apps/job_processor/models.py b/apps/job_processor/models.py index ed10c670bb..5ab68ba0c9 100644 --- a/apps/job_processor/models.py +++ b/apps/job_processor/models.py @@ -21,7 +21,7 @@ from clustering.models import Node from .literals import (JOB_STATE_CHOICES, JOB_STATE_PENDING, JOB_STATE_PROCESSING, JOB_STATE_ERROR, WORKER_STATE_CHOICES, - WORKER_STATE_RUNNING) + WORKER_STATE_RUNNING, DEFAULT_JOB_QUEUE_POLL_INTERVAL) from .exceptions import JobQueuePushError, JobQueueNoPendingJobs #from .exceptions import (WorkerAlreadyDisabled, WorkerAlreadyEnabled) @@ -135,6 +135,23 @@ class JobQueue(models.Model): if label: job_queue_labels[self.name] = label return super(JobQueue, self).save(*args, **kwargs) + + #def disable(self): + # if self.state == WORKER_STATE_DISABLED: + # raise WorkerAlreadyDisabled + # + # self.state = WORKER_STATE_DISABLED + # self.save() + # + #def enable(self): + # if self.state == WORKER_STATE_ENABLED: + # raise WorkerAlreadyEnabled + # + # self.state = WORKER_STATE_ENABLED + # self.save() + # + #def is_enabled(self): + # return self.state == WORKER_STATE_ENABLED # TODO: custom runtime methods @@ -206,36 +223,17 @@ class Worker(models.Model): def __unicode__(self): return u'%s-%s' % (self.node.hostname, self.pid) - #def disable(self): - # if self.state == WORKER_STATE_DISABLED: - # raise WorkerAlreadyDisabled - # - # self.state = WORKER_STATE_DISABLED - # self.save() - # - #def enable(self): - # if self.state == WORKER_STATE_ENABLED: - # raise WorkerAlreadyEnabled - # - # self.state = WORKER_STATE_ENABLED - # self.save() - # - #def is_enabled(self): - # return self.state == WORKER_STATE_ENABLED - class Meta: ordering = ('creation_datetime',) verbose_name = _(u'worker') verbose_name_plural = _(u'workers') -""" + class JobProcessingConfig(Singleton): - worker_time_to_live = models.PositiveInteger(verbose_name=(u'time to live (in seconds)') # After this time a worker is considered dead - worker_heartbeat_interval = models.PositiveInteger(verbose_name=(u'heartbeat interval') + job_queue_poll_interval = models.PositiveIntegerField(verbose_name=(u'job queue poll interval (in seconds)'), default=DEFAULT_JOB_QUEUE_POLL_INTERVAL) def __unicode__(self): - return ugettext('Workers configuration') + return ugettext(u'Job queues configuration') class Meta: - verbose_name = verbose_name_plural = _(u'Workers configuration') -""" + verbose_name = verbose_name_plural = _(u'job queues configuration') diff --git a/apps/job_processor/permissions.py b/apps/job_processor/permissions.py index 74c06b609a..fac0392d5c 100644 --- a/apps/job_processor/permissions.py +++ b/apps/job_processor/permissions.py @@ -6,3 +6,4 @@ from permissions.models import PermissionNamespace, Permission namespace = PermissionNamespace('job_processor', _(u'Job processor')) PERMISSION_JOB_QUEUE_VIEW = Permission.objects.register(namespace, 'job_queue_view', _(u'View the job queues in a Mayan cluster')) +PERMISSION_JOB_PROCESSING_CONFIGURATION = Permission.objects.register(namespace, 'job_processing_edit', _(u'Edit the the job processing configuration in a Mayan cluster')) diff --git a/apps/job_processor/urls.py b/apps/job_processor/urls.py index d932d04fdc..751408def3 100644 --- a/apps/job_processor/urls.py +++ b/apps/job_processor/urls.py @@ -7,4 +7,5 @@ urlpatterns = patterns('job_processor.views', url(r'^queue/(?P\d+)/items/pending/$', 'job_queue_items', {'pending_jobs': True}, 'job_queue_items_pending'), url(r'^queue/(?P\d+)/items/error/$', 'job_queue_items', {'error_jobs' :True}, 'job_queue_items_error'), url(r'^queue/(?P\d+)/items/active/$', 'job_queue_items', {'active_jobs' :True}, 'job_queue_items_active'), + url(r'^config/edit/$', 'job_queue_config_edit', (), 'job_queue_config_edit'), ) diff --git a/apps/job_processor/views.py b/apps/job_processor/views.py index 101697d46a..7c71c16ff6 100644 --- a/apps/job_processor/views.py +++ b/apps/job_processor/views.py @@ -14,8 +14,9 @@ from acls.models import AccessEntry from clustering.permissions import PERMISSION_NODES_VIEW from clustering.models import Node -from .models import JobQueue -from .permissions import PERMISSION_JOB_QUEUE_VIEW +from .forms import JobProcessingConfigForm +from .models import JobQueue, JobProcessingConfig +from .permissions import PERMISSION_JOB_QUEUE_VIEW, PERMISSION_JOB_PROCESSING_CONFIGURATION def node_workers(request, node_pk): @@ -151,3 +152,34 @@ def job_queue_items(request, job_queue_pk, pending_jobs=False, error_jobs=False, return render_to_response('generic_list.html', context, context_instance=RequestContext(request)) + + +def job_queue_config_edit(request): + Permission.objects.check_permissions(request.user, [PERMISSION_JOB_PROCESSING_CONFIGURATION]) + + job_processing_config = JobProcessingConfig.get() + + post_action_redirect = None + + previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', '/'))) + next = request.POST.get('next', request.GET.get('next', post_action_redirect if post_action_redirect else request.META.get('HTTP_REFERER', '/'))) + + + if request.method == 'POST': + form = JobProcessingConfigForm(data=request.POST) + if form.is_valid(): + try: + form.save() + except Exception, exc: + messages.error(request, _(u'Error trying to edit job processing configuration; %s') % exc) + else: + messages.success(request, _(u'Job processing configuration edited successfully.')) + return HttpResponseRedirect(next) + else: + form = JobProcessingConfigForm(instance=job_processing_config) + + return render_to_response('generic_form.html', { + 'form': form, + 'object': job_processing_config, + 'title': _(u'Edit job processing configuration') + }, context_instance=RequestContext(request))