diff --git a/apps/clustering/__init__.py b/apps/clustering/__init__.py index aefed173b9..a387a3d700 100644 --- a/apps/clustering/__init__.py +++ b/apps/clustering/__init__.py @@ -1,3 +1,4 @@ +""" from __future__ import absolute_import from django.utils.translation import ugettext_lazy as _ @@ -5,14 +6,11 @@ from django.db import transaction, DatabaseError from scheduler.api import LocalScheduler from navigation.api import bind_links -from project_tools.api import register_tool -from project_setup.api import register_setup from .tasks import send_heartbeat, house_keeping -from .links import tool_link, node_list, clustering_config_edit, setup_link -from .models import Node, ClusteringConfig +from .links import tool_link, node_list, clustering_config_edit +from .models import Node -ClusteringConfig() @transaction.commit_on_success def add_clustering_jobs(): clustering_scheduler = LocalScheduler('clustering', _(u'Clustering')) @@ -29,3 +27,4 @@ register_tool(tool_link) #register_setup(setup_link) bind_links([Node, 'node_list'], [node_list], menu_name='secondary_menu') bind_links(['clustering_config_edit'], [clustering_config_edit], menu_name='secondary_menu') +""" diff --git a/apps/clustering/forms.py b/apps/clustering/forms.py deleted file mode 100644 index e0e38e4755..0000000000 --- a/apps/clustering/forms.py +++ /dev/null @@ -1,10 +0,0 @@ -from __future__ import absolute_import - -from django import forms - -from .models import ClusteringConfig - - -class ClusteringConfigForm(forms.ModelForm): - class Meta: - model = ClusteringConfig diff --git a/apps/clustering/links.py b/apps/clustering/links.py index 0c36ad59ec..93824731a7 100644 --- a/apps/clustering/links.py +++ b/apps/clustering/links.py @@ -9,5 +9,3 @@ from .icons import icon_tool_link, icon_node_link tool_link = Link(text=_(u'clustering'), view='node_list', icon=icon_tool_link, permissions=[PERMISSION_NODES_VIEW]) node_list = Link(text=_(u'node list'), view='node_list', icon=icon_node_link, permissions=[PERMISSION_NODES_VIEW]) -clustering_config_edit = Link(text=_(u'edit cluster configuration'), view='clustering_config_edit', permissions=[PERMISSION_EDIT_CLUSTER_CONFIGURATION]) -setup_link = Link(text=_(u'cluster configuration'), view='clustering_config_edit', permissions=[PERMISSION_EDIT_CLUSTER_CONFIGURATION]) diff --git a/apps/clustering/migrations/0008_auto__del_clusteringconfig.py b/apps/clustering/migrations/0008_auto__del_clusteringconfig.py new file mode 100644 index 0000000000..bda5613957 --- /dev/null +++ b/apps/clustering/migrations/0008_auto__del_clusteringconfig.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +import datetime +from south.db import db +from south.v2 import SchemaMigration +from django.db import models + + +class Migration(SchemaMigration): + + def forwards(self, orm): + # Deleting model 'ClusteringConfig' + db.delete_table('clustering_clusteringconfig') + + + def backwards(self, orm): + # Adding model 'ClusteringConfig' + db.create_table('clustering_clusteringconfig', ( + ('lock_id', self.gf('django.db.models.fields.CharField')(default=1, max_length=1, unique=True)), + ('node_heartbeat_interval', self.gf('django.db.models.fields.PositiveIntegerField')(default=10)), + ('dead_node_removal_interval', self.gf('django.db.models.fields.PositiveIntegerField')(default=10)), + ('node_heartbeat_timeout', self.gf('django.db.models.fields.PositiveIntegerField')(default=60)), + ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), + )) + db.send_create_signal('clustering', ['ClusteringConfig']) + + + models = { + 'clustering.node': { + 'Meta': {'object_name': 'Node'}, + 'cpuload': ('django.db.models.fields.FloatField', [], {'default': '0', 'blank': 'True'}), + 'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 9, 15, 0, 0)', 'blank': 'True'}), + 'hostname': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '255'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'memory_usage': ('django.db.models.fields.FloatField', [], {'default': '0', 'blank': 'True'}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'d'", 'max_length': '4'}) + } + } + + complete_apps = ['clustering'] \ No newline at end of file diff --git a/apps/clustering/models.py b/apps/clustering/models.py index 5f53b7c32e..2e34730398 100644 --- a/apps/clustering/models.py +++ b/apps/clustering/models.py @@ -30,6 +30,14 @@ class NodeManager(models.Manager): def live_nodes(self): return self.model.objects.filter(state=NODE_STATE_HEALTHY) + def dead_nodes(self): + from .literals import DEFAULT_NODE_HEARTBEAT_TIMEOUT + return self.model.objects.filter(state=NODE_STATE_HEALTHY).filter(heartbeat__lt=datetime.datetime.now() - datetime.timedelta(seconds=DEFAULT_NODE_HEARTBEAT_TIMEOUT)) + + def check_dead_nodes(self): + for node in self.dead_nodes(): + node.mark_as_dead() + class Node(models.Model): hostname = models.CharField(max_length=255, verbose_name=_(u'hostname'), unique=True) @@ -77,30 +85,3 @@ class Node(models.Model): class Meta: verbose_name = _(u'node') verbose_name_plural = _(u'nodes') - - -class ClusteringConfigManager(models.Manager): - def dead_nodes(self): - return Node.objects.filter(state=NODE_STATE_HEALTHY).filter(heartbeat__lt=datetime.datetime.now() - datetime.timedelta(seconds=self.model.get().node_heartbeat_timeout)) - - def check_dead_nodes(self): - for node in self.dead_nodes(): - node.mark_as_dead() - - -class ClusteringConfig(Singleton): - node_heartbeat_interval = models.PositiveIntegerField(verbose_name=(u'node heartbeat interval (in seconds)'), help_text=_(u'Interval of time for the node\'s heartbeat update to the cluster.'), default=DEFAULT_NODE_HEARTBEAT_INTERVAL) - node_heartbeat_timeout = models.PositiveIntegerField(verbose_name=(u'node heartbeat timeout (in seconds)'), help_text=_(u'After this amount of time a node without heartbeat updates is considered dead and removed from the cluster node list.'), default=DEFAULT_NODE_HEARTBEAT_TIMEOUT) - dead_node_removal_interval = models.PositiveIntegerField(verbose_name=(u'dead node check and removal interval (in seconds)'), help_text=_(u'Interval of time to check the cluster for unresponsive nodes and remove them from the cluster.'), default=DEFAULT_DEAD_NODE_REMOVAL_INTERVAL) - - cluster = ClusteringConfigManager() - - def __unicode__(self): - return ugettext('clustering config') - - #def clean(self): - # if self.node_heartbeat_interval > self.node_heartbeat_timeout: - # raise ValidationError(_(u'Heartbeat interval cannot be greater than heartbeat timeout or else nodes will always be rated as "dead"')) - - class Meta: - verbose_name = verbose_name_plural = _(u'clustering config') diff --git a/apps/clustering/post_init.py b/apps/clustering/post_init.py new file mode 100644 index 0000000000..ef9f67f291 --- /dev/null +++ b/apps/clustering/post_init.py @@ -0,0 +1,28 @@ +from __future__ import absolute_import + +from django.utils.translation import ugettext_lazy as _ +from django.db import transaction, DatabaseError + +from scheduler.api import LocalScheduler +from navigation.api import bind_links + +from .tasks import send_heartbeat, house_keeping +from .links import tool_link, node_list +from .models import Node +from .settings import NODE_HEARTBEAT_INTERVAL, DEAD_NODE_REMOVAL_INTERVAL + +@transaction.commit_on_success +def add_clustering_jobs(): + clustering_scheduler = LocalScheduler('clustering', _(u'Clustering')) + try: + # TODO: auto convert setting using JSON loads + clustering_scheduler.add_interval_job('send_heartbeat', _(u'Update a node\'s properties.'), send_heartbeat, seconds=int(NODE_HEARTBEAT_INTERVAL)) + clustering_scheduler.add_interval_job('house_keeping', _(u'Check for unresponsive nodes in the cluster list.'), house_keeping, seconds=int(DEAD_NODE_REMOVAL_INTERVAL)) + except DatabaseError: + transaction.rollback() + clustering_scheduler.start() + + +def init_clustering(): + add_clustering_jobs() + bind_links([Node, 'node_list'], [node_list], menu_name='secondary_menu') diff --git a/apps/clustering/registry.py b/apps/clustering/registry.py index ba080bd074..86d81986f4 100644 --- a/apps/clustering/registry.py +++ b/apps/clustering/registry.py @@ -7,11 +7,13 @@ from smart_settings import ClusterScope from .icons import icon_tool_link from .literals import (DEFAULT_NODE_HEARTBEAT_INTERVAL, DEFAULT_NODE_HEARTBEAT_TIMEOUT, DEFAULT_DEAD_NODE_REMOVAL_INTERVAL) +from .links import tool_link label = _(u'Clustering') description = _(u'Registers nodes into a Citadel (Mayan EDMS cluster).') dependencies = ['app_registry', 'icons', 'navigation', 'scheduler'] icon = icon_tool_link +tool_links = [tool_link] settings = [ { 'name': 'NODE_HEARTBEAT_INTERVAL', @@ -32,3 +34,8 @@ settings = [ 'scopes': [ClusterScope()] }, ] + +# TODO: implement settings post edit clean like method for sanity checks +#def clean(self): +# if self.node_heartbeat_interval > self.node_heartbeat_timeout: +# raise ValidationError(_(u'Heartbeat interval cannot be greater than heartbeat timeout or else nodes will always be rated as "dead"')) diff --git a/apps/clustering/tasks.py b/apps/clustering/tasks.py index 1de712fa7a..d9ced647ba 100644 --- a/apps/clustering/tasks.py +++ b/apps/clustering/tasks.py @@ -4,7 +4,7 @@ import logging from lock_manager.decorators import simple_locking -from .models import Node, ClusteringConfig +from .models import Node from .signals import node_heartbeat LOCK_EXPIRE = 10 @@ -23,5 +23,5 @@ def send_heartbeat(): @simple_locking('house_keeping', 10) def house_keeping(): logger.debug('starting') - ClusteringConfig.cluster.check_dead_nodes() + Node.objects.check_dead_nodes() diff --git a/apps/clustering/urls.py b/apps/clustering/urls.py index fbeeaa1984..e43cf0041d 100644 --- a/apps/clustering/urls.py +++ b/apps/clustering/urls.py @@ -3,5 +3,4 @@ from django.conf.urls.defaults import patterns, url urlpatterns = patterns('clustering.views', url(r'^node/list/$', 'node_list', (), 'node_list'), - url(r'^edit/$', 'clustering_config_edit', (), 'clustering_config_edit'), ) diff --git a/apps/clustering/views.py b/apps/clustering/views.py index a72bc91c58..182cdaa4af 100644 --- a/apps/clustering/views.py +++ b/apps/clustering/views.py @@ -13,8 +13,7 @@ from permissions.models import Permission from common.utils import encapsulate from acls.models import AccessEntry -from .forms import ClusteringConfigForm -from .models import Node, ClusteringConfig +from .models import Node from .permissions import PERMISSION_NODES_VIEW, PERMISSION_EDIT_CLUSTER_CONFIGURATION @@ -70,34 +69,3 @@ def node_workers(request, node_pk): return render_to_response('generic_list.html', context, context_instance=RequestContext(request)) - - -def clustering_config_edit(request): - Permission.objects.check_permissions(request.user, [PERMISSION_EDIT_CLUSTER_CONFIGURATION]) - - cluster_config = ClusteringConfig.get() - - post_action_redirect = None - - previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', '/'))) - next = request.POST.get('next', request.GET.get('next', post_action_redirect if post_action_redirect else request.META.get('HTTP_REFERER', '/'))) - - - if request.method == 'POST': - form = ClusteringConfigForm(data=request.POST) - if form.is_valid(): - try: - form.save() - except Exception, exc: - messages.error(request, _(u'Error trying to edit cluster configuration; %s') % exc) - else: - messages.success(request, _(u'Cluster configuration edited successfully.')) - return HttpResponseRedirect(next) - else: - form = ClusteringConfigForm(instance=cluster_config) - - return render_to_response('generic_form.html', { - 'form': form, - 'object': cluster_config, - 'title': _(u'Edit cluster configuration') - }, context_instance=RequestContext(request)) diff --git a/apps/job_processor/__init__.py b/apps/job_processor/__init__.py index ce20abd1dd..8b13789179 100644 --- a/apps/job_processor/__init__.py +++ b/apps/job_processor/__init__.py @@ -1,88 +1 @@ -from __future__ import absolute_import -import atexit -import logging -import psutil -from multiprocessing import active_children - -from django.db import transaction, DatabaseError -from django.dispatch import receiver -from django.utils.translation import ugettext_lazy as _ - -from scheduler.api import LocalScheduler -from navigation.api import bind_links, register_model_list_columns -from project_setup.api import register_setup -from common.utils import encapsulate - -from clustering.models import Node -from clustering.signals import node_died, node_heartbeat - -from .models import JobQueue, JobProcessingConfig, JobQueueItem, Worker -from .tasks import job_queue_poll, house_keeping -from .links import (node_workers, job_queues, tool_link, - job_queue_items_pending, job_queue_items_error, job_queue_items_active, - job_queue_config_edit, setup_link, job_queue_start, job_queue_stop, - job_requeue, job_delete, worker_terminate) - -logger = logging.getLogger(__name__) - - -@transaction.commit_on_success -def add_job_queue_jobs(): - job_processor_scheduler = LocalScheduler('job_processor', _(u'Job processor')) - try: - job_processor_scheduler.add_interval_job('job_queue_poll', _(u'Poll a job queue for pending jobs.'), job_queue_poll, seconds=JobProcessingConfig.get().job_queue_poll_interval) - job_processor_scheduler.add_interval_job('house_keeping', _(u'Poll a job queue for pending jobs.'), house_keeping, seconds=JobProcessingConfig.get().dead_job_removal_interval) - except DatabaseError: - transaction.rollback() - - job_processor_scheduler.start() - - -add_job_queue_jobs() -register_setup(setup_link) -bind_links([JobQueue, 'job_queues'], [job_queues], menu_name='secondary_menu') -bind_links([JobQueue], [job_queue_start, job_queue_stop, job_queue_items_pending, job_queue_items_active, job_queue_items_error]) -bind_links([Node], [node_workers]) -bind_links(['job_queue_config_edit'], [job_queue_config_edit], menu_name='secondary_menu') -bind_links([JobQueueItem], [job_requeue, job_delete]) -bind_links([Worker], [worker_terminate]) - -Node.add_to_class('workers', lambda node: node.worker_set) - -register_model_list_columns(Node, [ - { - 'name': _(u'active workers'), - 'attribute': encapsulate(lambda x: x.workers().all().count()) - }, -]) - - -@receiver(node_died, dispatch_uid='process_dead_workers') -def process_dead_workers(sender, node, **kwargs): - logger.debug('received signal') - for dead_worker in node.worker_set.all(): - if dead_worker.job_queue_item: - dead_worker.job_queue_item.requeue(force=True, at_top=True) - dead_worker.delete() - - -@receiver(node_heartbeat, dispatch_uid='node_processes') -def node_processes(sender, node, **kwargs): - logger.debug('update current node\'s processes') - all_active_pids = psutil.get_pid_list() - # Remove stale workers based on current child pids - for dead_worker in node.worker_set.exclude(pid__in=all_active_pids): - if dead_worker.job_queue_item: - dead_worker.job_queue_item.requeue(force=True, at_top=True) - dead_worker.delete() - - -def kill_all_node_processes(): - logger.debug('terminating this node\'s all processes') - for process in active_children(): - process.terminate() - process.join() - - -atexit.register(kill_all_node_processes) diff --git a/apps/job_processor/links.py b/apps/job_processor/links.py index a078a01586..42b9e0af15 100644 --- a/apps/job_processor/links.py +++ b/apps/job_processor/links.py @@ -45,9 +45,6 @@ job_queue_items_active = Link(text=_(u'active jobs'), view='job_queue_items_acti job_queue_start = Link(text=_(u'start'), view='job_queue_start', args='object.pk', icon=icon_job_queue_start, permissions=[PERMISSION_JOB_QUEUE_START_STOP], condition=is_not_running) job_queue_stop = Link(text=_(u'stop'), view='job_queue_stop', args='object.pk', icon=icon_job_queue_stop, permissions=[PERMISSION_JOB_QUEUE_START_STOP], condition=is_running) -job_queue_config_edit = Link(text=_(u'edit job processing configuration'), view='job_queue_config_edit', permissions=[PERMISSION_JOB_PROCESSING_CONFIGURATION]) -setup_link = Link(text=_(u'job processing configuration'), view='job_queue_config_edit', permissions=[PERMISSION_JOB_PROCESSING_CONFIGURATION]) - job_requeue = Link(text=_(u'requeue job'), view='job_requeue', args='object.pk', icon=icon_job_requeue, permissions=[PERMISSION_JOB_REQUEUE], condition=is_in_error_state) job_delete = Link(text=_(u'delete job'), view='job_delete', args='object.pk', icon=icon_job_delete, permissions=[PERMISSION_JOB_DELETE], condition=is_in_pending_state) diff --git a/apps/job_processor/migrations/0011_auto__del_jobprocessingconfig.py b/apps/job_processor/migrations/0011_auto__del_jobprocessingconfig.py new file mode 100644 index 0000000000..2def407b6a --- /dev/null +++ b/apps/job_processor/migrations/0011_auto__del_jobprocessingconfig.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +import datetime +from south.db import db +from south.v2 import SchemaMigration +from django.db import models + + +class Migration(SchemaMigration): + + def forwards(self, orm): + # Deleting model 'JobProcessingConfig' + db.delete_table('job_processor_jobprocessingconfig') + + + def backwards(self, orm): + # Adding model 'JobProcessingConfig' + db.create_table('job_processor_jobprocessingconfig', ( + ('lock_id', self.gf('django.db.models.fields.CharField')(default=1, max_length=1, unique=True)), + ('job_queue_poll_interval', self.gf('django.db.models.fields.PositiveIntegerField')(default=2)), + ('dead_job_removal_interval', self.gf('django.db.models.fields.PositiveIntegerField')(default=5)), + ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), + )) + db.send_create_signal('job_processor', ['JobProcessingConfig']) + + + models = { + 'clustering.node': { + 'Meta': {'object_name': 'Node'}, + 'cpuload': ('django.db.models.fields.FloatField', [], {'default': '0', 'blank': 'True'}), + 'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 9, 15, 0, 0)', 'blank': 'True'}), + 'hostname': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '255'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'memory_usage': ('django.db.models.fields.FloatField', [], {'default': '0', 'blank': 'True'}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'d'", 'max_length': '4'}) + }, + 'job_processor.jobqueue': { + 'Meta': {'ordering': "('priority',)", 'object_name': 'JobQueue'}, + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '32'}), + 'priority': ('django.db.models.fields.IntegerField', [], {'default': '0'}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'r'", 'max_length': '4'}), + 'unique_jobs': ('django.db.models.fields.BooleanField', [], {'default': 'True'}) + }, + 'job_processor.jobqueueitem': { + 'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'JobQueueItem'}, + 'creation_datetime': ('django.db.models.fields.DateTimeField', [], {}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job_queue': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueue']"}), + 'job_type': ('django.db.models.fields.CharField', [], {'max_length': '32'}), + 'kwargs': ('django.db.models.fields.TextField', [], {}), + 'result': ('django.db.models.fields.TextField', [], {'blank': 'True'}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'p'", 'max_length': '4'}), + 'unique_id': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64', 'blank': 'True'}) + }, + 'job_processor.worker': { + 'Meta': {'ordering': "('creation_datetime',)", 'unique_together': "(('node', 'pid'),)", 'object_name': 'Worker'}, + 'creation_datetime': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 9, 15, 0, 0)'}), + 'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 9, 15, 0, 0)', 'blank': 'True'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job_queue_item': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueueItem']", 'null': 'True', 'blank': 'True'}), + 'node': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['clustering.Node']"}), + 'pid': ('django.db.models.fields.PositiveIntegerField', [], {'max_length': '255'}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'r'", 'max_length': '4'}) + } + } + + complete_apps = ['job_processor'] \ No newline at end of file diff --git a/apps/job_processor/models.py b/apps/job_processor/models.py index eed9fe6694..c2412676d6 100644 --- a/apps/job_processor/models.py +++ b/apps/job_processor/models.py @@ -16,7 +16,6 @@ from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext from django.utils.simplejson import loads, dumps -from common.models import Singleton from clustering.models import Node from .literals import (JOB_STATE_CHOICES, JOB_STATE_PENDING, @@ -293,14 +292,3 @@ class Worker(models.Model): verbose_name = _(u'worker') verbose_name_plural = _(u'workers') unique_together = ('node', 'pid') - - -class JobProcessingConfig(Singleton): - job_queue_poll_interval = models.PositiveIntegerField(verbose_name=(u'job queue poll interval (in seconds)'), default=DEFAULT_JOB_QUEUE_POLL_INTERVAL) - dead_job_removal_interval = models.PositiveIntegerField(verbose_name=(u'dead job check and removal interval (in seconds)'), help_text=_(u'Interval of time to check the cluster for and remove unresponsive jobs.'), default=DEFAULT_DEAD_JOB_REMOVAL_INTERVAL) - - def __unicode__(self): - return ugettext(u'Job queues configuration') - - class Meta: - verbose_name = verbose_name_plural = _(u'job queues configuration') diff --git a/apps/job_processor/post_init.py b/apps/job_processor/post_init.py new file mode 100644 index 0000000000..99b73ea50e --- /dev/null +++ b/apps/job_processor/post_init.py @@ -0,0 +1,86 @@ +from __future__ import absolute_import + +import atexit +import logging +import psutil +from multiprocessing import active_children + +from django.db import transaction, DatabaseError +from django.dispatch import receiver +from django.utils.translation import ugettext_lazy as _ + +from scheduler.api import LocalScheduler +from navigation.api import bind_links, register_model_list_columns +from common.utils import encapsulate +from clustering.models import Node +from clustering.signals import node_died, node_heartbeat + +from .models import JobQueue, JobQueueItem, Worker +from .tasks import job_queue_poll, house_keeping +from .links import (node_workers, job_queues, tool_link, + job_queue_items_pending, job_queue_items_error, job_queue_items_active, + job_queue_start, job_queue_stop, job_requeue, job_delete, worker_terminate) +from .settings import QUEUE_POLL_INTERVAL, DEAD_JOB_REMOVAL_INTERVAL + +logger = logging.getLogger(__name__) + + +@transaction.commit_on_success +def add_job_queue_jobs(): + job_processor_scheduler = LocalScheduler('job_processor', _(u'Job processor')) + try: + # TODO: auto convert settings to int + job_processor_scheduler.add_interval_job('job_queue_poll', _(u'Poll a job queue for pending jobs.'), job_queue_poll, seconds=int(QUEUE_POLL_INTERVAL)) + job_processor_scheduler.add_interval_job('house_keeping', _(u'Poll a job queue for pending jobs.'), house_keeping, seconds=int(DEAD_JOB_REMOVAL_INTERVAL)) + except DatabaseError: + transaction.rollback() + + job_processor_scheduler.start() + + +@receiver(node_died, dispatch_uid='process_dead_workers') +def process_dead_workers(sender, node, **kwargs): + logger.debug('received signal') + for dead_worker in node.worker_set.all(): + if dead_worker.job_queue_item: + dead_worker.job_queue_item.requeue(force=True, at_top=True) + dead_worker.delete() + + +@receiver(node_heartbeat, dispatch_uid='node_processes') +def node_processes(sender, node, **kwargs): + logger.debug('update current node\'s processes') + all_active_pids = psutil.get_pid_list() + # Remove stale workers based on current child pids + for dead_worker in node.worker_set.exclude(pid__in=all_active_pids): + if dead_worker.job_queue_item: + dead_worker.job_queue_item.requeue(force=True, at_top=True) + dead_worker.delete() + + +def kill_all_node_processes(): + logger.debug('terminating this node\'s all processes') + for process in active_children(): + process.terminate() + process.join() + + +def init_job_processor(): + atexit.register(kill_all_node_processes) + + add_job_queue_jobs() + bind_links([JobQueue, 'job_queues'], [job_queues], menu_name='secondary_menu') + bind_links([JobQueue], [job_queue_start, job_queue_stop, job_queue_items_pending, job_queue_items_active, job_queue_items_error]) + bind_links([Node], [node_workers]) + bind_links([JobQueueItem], [job_requeue, job_delete]) + bind_links([Worker], [worker_terminate]) + + Node.add_to_class('workers', lambda node: node.worker_set) + + register_model_list_columns(Node, [ + { + 'name': _(u'active workers'), + 'attribute': encapsulate(lambda x: x.workers().all().count()) + }, + ]) + diff --git a/apps/job_processor/urls.py b/apps/job_processor/urls.py index 83cb4716dc..c9a2cc3762 100644 --- a/apps/job_processor/urls.py +++ b/apps/job_processor/urls.py @@ -9,8 +9,6 @@ urlpatterns = patterns('job_processor.views', url(r'^queue/(?P\d+)/items/active/$', 'job_queue_items', {'active_jobs' :True}, 'job_queue_items_active'), url(r'^queue/(?P\d+)/start/$', 'job_queue_start', (), 'job_queue_start'), url(r'^queue/(?P\d+)/stop/$', 'job_queue_stop', (), 'job_queue_stop'), - - url(r'^config/edit/$', 'job_queue_config_edit', (), 'job_queue_config_edit'), url(r'^job/(?P\d+)/requeue/$', 'job_requeue', (), 'job_requeue'), url(r'^job/(?P\d+)/delete/$', 'job_delete', (), 'job_delete'), diff --git a/apps/job_processor/views.py b/apps/job_processor/views.py index a6b202e1fb..5b7146d8d9 100644 --- a/apps/job_processor/views.py +++ b/apps/job_processor/views.py @@ -14,8 +14,7 @@ from clustering.models import Node from permissions.models import Permission from .exceptions import JobQueueAlreadyStopped, JobQueueAlreadyStarted -from .forms import JobProcessingConfigForm -from .models import JobQueue, JobProcessingConfig, JobQueueItem, Worker +from .models import JobQueue, JobQueueItem, Worker from .permissions import (PERMISSION_JOB_QUEUE_VIEW, PERMISSION_JOB_PROCESSING_CONFIGURATION, PERMISSION_JOB_QUEUE_START_STOP, PERMISSION_JOB_REQUEUE) @@ -163,38 +162,7 @@ def job_queue_items(request, job_queue_pk, pending_jobs=False, error_jobs=False, return render_to_response('generic_list.html', context, context_instance=RequestContext(request)) - -def job_queue_config_edit(request): - Permission.objects.check_permissions(request.user, [PERMISSION_JOB_PROCESSING_CONFIGURATION]) - - job_processing_config = JobProcessingConfig.get() - - post_action_redirect = None - - previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', '/'))) - next = request.POST.get('next', request.GET.get('next', post_action_redirect if post_action_redirect else request.META.get('HTTP_REFERER', '/'))) - - - if request.method == 'POST': - form = JobProcessingConfigForm(data=request.POST) - if form.is_valid(): - try: - form.save() - except Exception, exc: - messages.error(request, _(u'Error trying to edit job processing configuration; %s') % exc) - else: - messages.success(request, _(u'Job processing configuration edited successfully.')) - return HttpResponseRedirect(next) - else: - form = JobProcessingConfigForm(instance=job_processing_config) - - return render_to_response('generic_form.html', { - 'form': form, - 'object': job_processing_config, - 'title': _(u'Edit job processing configuration') - }, context_instance=RequestContext(request)) - - + def job_queue_stop(request, job_queue_pk): job_queue = get_object_or_404(JobQueue, pk=job_queue_pk)