Update the clustering and job processor apps to use ClusterScope settings and not their own settings singletons

This commit is contained in:
Roberto Rosario
2012-09-15 04:56:58 -04:00
parent fa2795dd96
commit a4bbc65508
17 changed files with 244 additions and 218 deletions
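In short, the commit replaces the database-backed Singleton configuration models (ClusteringConfig, JobProcessingConfig) with settings registered through smart_settings and scoped with ClusterScope(); callers stop reading values like ClusteringConfig.get().node_heartbeat_interval and instead import module-level constants such as NODE_HEARTBEAT_INTERVAL from each app's settings module. A minimal, self-contained sketch of the scope-resolution idea (toy code, not the real smart_settings API):

DEFAULT_NODE_HEARTBEAT_INTERVAL = 10

class ClusterScope(object):
    """Stand-in for a scope that looks a setting up cluster-wide."""
    def __init__(self, store=None):
        self.store = store or {}

    def get(self, name):
        return self.store.get(name)

def resolve_setting(name, default, scopes):
    """Return the value from the first scope that defines ``name``, else the default."""
    for scope in scopes:
        value = scope.get(name)
        if value is not None:
            return value
    return default

# The app then exposes the resolved value as a plain module-level constant:
NODE_HEARTBEAT_INTERVAL = resolve_setting(
    'NODE_HEARTBEAT_INTERVAL', DEFAULT_NODE_HEARTBEAT_INTERVAL, scopes=[ClusterScope()])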

View File

@@ -1,3 +1,4 @@
"""
from __future__ import absolute_import from __future__ import absolute_import
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
@@ -5,14 +6,11 @@ from django.db import transaction, DatabaseError
from scheduler.api import LocalScheduler
from navigation.api import bind_links
-from project_tools.api import register_tool
-from project_setup.api import register_setup
from .tasks import send_heartbeat, house_keeping
-from .links import tool_link, node_list, clustering_config_edit, setup_link
+from .links import tool_link, node_list, clustering_config_edit
-from .models import Node, ClusteringConfig
+from .models import Node
-ClusteringConfig()

@transaction.commit_on_success
def add_clustering_jobs():
    clustering_scheduler = LocalScheduler('clustering', _(u'Clustering'))
@@ -29,3 +27,4 @@ register_tool(tool_link)
#register_setup(setup_link)
bind_links([Node, 'node_list'], [node_list], menu_name='secondary_menu')
bind_links(['clustering_config_edit'], [clustering_config_edit], menu_name='secondary_menu')
+"""

View File

@@ -1,10 +0,0 @@
from __future__ import absolute_import

from django import forms

from .models import ClusteringConfig


class ClusteringConfigForm(forms.ModelForm):
    class Meta:
        model = ClusteringConfig

View File

@@ -9,5 +9,3 @@ from .icons import icon_tool_link, icon_node_link
tool_link = Link(text=_(u'clustering'), view='node_list', icon=icon_tool_link, permissions=[PERMISSION_NODES_VIEW])
node_list = Link(text=_(u'node list'), view='node_list', icon=icon_node_link, permissions=[PERMISSION_NODES_VIEW])
-clustering_config_edit = Link(text=_(u'edit cluster configuration'), view='clustering_config_edit', permissions=[PERMISSION_EDIT_CLUSTER_CONFIGURATION])
-setup_link = Link(text=_(u'cluster configuration'), view='clustering_config_edit', permissions=[PERMISSION_EDIT_CLUSTER_CONFIGURATION])

View File

@@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
import datetime
from south.db import db
from south.v2 import SchemaMigration
from django.db import models


class Migration(SchemaMigration):

    def forwards(self, orm):
        # Deleting model 'ClusteringConfig'
        db.delete_table('clustering_clusteringconfig')

    def backwards(self, orm):
        # Adding model 'ClusteringConfig'
        db.create_table('clustering_clusteringconfig', (
            ('lock_id', self.gf('django.db.models.fields.CharField')(default=1, max_length=1, unique=True)),
            ('node_heartbeat_interval', self.gf('django.db.models.fields.PositiveIntegerField')(default=10)),
            ('dead_node_removal_interval', self.gf('django.db.models.fields.PositiveIntegerField')(default=10)),
            ('node_heartbeat_timeout', self.gf('django.db.models.fields.PositiveIntegerField')(default=60)),
            ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
        ))
        db.send_create_signal('clustering', ['ClusteringConfig'])

    models = {
        'clustering.node': {
            'Meta': {'object_name': 'Node'},
            'cpuload': ('django.db.models.fields.FloatField', [], {'default': '0', 'blank': 'True'}),
            'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 9, 15, 0, 0)', 'blank': 'True'}),
            'hostname': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '255'}),
            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
            'memory_usage': ('django.db.models.fields.FloatField', [], {'default': '0', 'blank': 'True'}),
            'state': ('django.db.models.fields.CharField', [], {'default': "'d'", 'max_length': '4'})
        }
    }

    complete_apps = ['clustering']
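This South schema migration (and its job_processor counterpart further down) only drops the singleton table; presumably it is applied with the usual South command, ./manage.py migrate clustering.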

View File

@@ -30,6 +30,14 @@ class NodeManager(models.Manager):
    def live_nodes(self):
        return self.model.objects.filter(state=NODE_STATE_HEALTHY)

+    def dead_nodes(self):
+        from .literals import DEFAULT_NODE_HEARTBEAT_TIMEOUT
+        return self.model.objects.filter(state=NODE_STATE_HEALTHY).filter(heartbeat__lt=datetime.datetime.now() - datetime.timedelta(seconds=DEFAULT_NODE_HEARTBEAT_TIMEOUT))
+
+    def check_dead_nodes(self):
+        for node in self.dead_nodes():
+            node.mark_as_dead()

class Node(models.Model):
    hostname = models.CharField(max_length=255, verbose_name=_(u'hostname'), unique=True)
@@ -77,30 +85,3 @@ class Node(models.Model):
    class Meta:
        verbose_name = _(u'node')
        verbose_name_plural = _(u'nodes')

-class ClusteringConfigManager(models.Manager):
-    def dead_nodes(self):
-        return Node.objects.filter(state=NODE_STATE_HEALTHY).filter(heartbeat__lt=datetime.datetime.now() - datetime.timedelta(seconds=self.model.get().node_heartbeat_timeout))
-
-    def check_dead_nodes(self):
-        for node in self.dead_nodes():
-            node.mark_as_dead()
-
-class ClusteringConfig(Singleton):
-    node_heartbeat_interval = models.PositiveIntegerField(verbose_name=(u'node heartbeat interval (in seconds)'), help_text=_(u'Interval of time for the node\'s heartbeat update to the cluster.'), default=DEFAULT_NODE_HEARTBEAT_INTERVAL)
-    node_heartbeat_timeout = models.PositiveIntegerField(verbose_name=(u'node heartbeat timeout (in seconds)'), help_text=_(u'After this amount of time a node without heartbeat updates is considered dead and removed from the cluster node list.'), default=DEFAULT_NODE_HEARTBEAT_TIMEOUT)
-    dead_node_removal_interval = models.PositiveIntegerField(verbose_name=(u'dead node check and removal interval (in seconds)'), help_text=_(u'Interval of time to check the cluster for unresponsive nodes and remove them from the cluster.'), default=DEFAULT_DEAD_NODE_REMOVAL_INTERVAL)
-
-    cluster = ClusteringConfigManager()
-
-    def __unicode__(self):
-        return ugettext('clustering config')
-
-    #def clean(self):
-    #    if self.node_heartbeat_interval > self.node_heartbeat_timeout:
-    #        raise ValidationError(_(u'Heartbeat interval cannot be greater than heartbeat timeout or else nodes will always be rated as "dead"'))
-
-    class Meta:
-        verbose_name = verbose_name_plural = _(u'clustering config')

View File

@@ -0,0 +1,28 @@
from __future__ import absolute_import

from django.utils.translation import ugettext_lazy as _
from django.db import transaction, DatabaseError

from scheduler.api import LocalScheduler
from navigation.api import bind_links

from .tasks import send_heartbeat, house_keeping
from .links import tool_link, node_list
from .models import Node
from .settings import NODE_HEARTBEAT_INTERVAL, DEAD_NODE_REMOVAL_INTERVAL


@transaction.commit_on_success
def add_clustering_jobs():
    clustering_scheduler = LocalScheduler('clustering', _(u'Clustering'))
    try:
        # TODO: auto convert setting using JSON loads
        clustering_scheduler.add_interval_job('send_heartbeat', _(u'Update a node\'s properties.'), send_heartbeat, seconds=int(NODE_HEARTBEAT_INTERVAL))
        clustering_scheduler.add_interval_job('house_keeping', _(u'Check for unresponsive nodes in the cluster list.'), house_keeping, seconds=int(DEAD_NODE_REMOVAL_INTERVAL))
    except DatabaseError:
        transaction.rollback()

    clustering_scheduler.start()


def init_clustering():
    add_clustering_jobs()
    bind_links([Node, 'node_list'], [node_list], menu_name='secondary_menu')
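The TODO about auto-converting settings via JSON could be satisfied with a small helper along these lines (a hypothetical load_setting(), not something this commit adds); json.loads turns the stored string '10' into the integer 10, making the explicit int() casts above unnecessary:

import json

def load_setting(raw_value, default=None):
    """Deserialize a stored setting string, falling back to a default."""
    if raw_value is None:
        return default
    try:
        return json.loads(raw_value)
    except (TypeError, ValueError):
        # Not valid JSON; return the raw value unchanged.
        return raw_value

assert load_setting('10') == 10
assert load_setting(None, default=60) == 60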

View File

@@ -7,11 +7,13 @@ from smart_settings import ClusterScope
from .icons import icon_tool_link
from .literals import (DEFAULT_NODE_HEARTBEAT_INTERVAL, DEFAULT_NODE_HEARTBEAT_TIMEOUT,
    DEFAULT_DEAD_NODE_REMOVAL_INTERVAL)
+from .links import tool_link

label = _(u'Clustering')
description = _(u'Registers nodes into a Citadel (Mayan EDMS cluster).')
dependencies = ['app_registry', 'icons', 'navigation', 'scheduler']
icon = icon_tool_link
+tool_links = [tool_link]
settings = [
    {
        'name': 'NODE_HEARTBEAT_INTERVAL',
@@ -32,3 +34,8 @@ settings = [
        'scopes': [ClusterScope()]
    },
]

+# TODO: implement settings post edit clean like method for sanity checks
+#def clean(self):
+#    if self.node_heartbeat_interval > self.node_heartbeat_timeout:
+#        raise ValidationError(_(u'Heartbeat interval cannot be greater than heartbeat timeout or else nodes will always be rated as "dead"'))
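The commented-out clean() carried over from the old model hints at what that TODO wants; one possible shape for a post-edit sanity check, assuming a hypothetical hook that receives the edited values as a dict (the actual smart_settings API may differ):

def validate_clustering_settings(values):
    """Reject interval/timeout combinations that would mark every node dead."""
    if values['NODE_HEARTBEAT_INTERVAL'] > values['NODE_HEARTBEAT_TIMEOUT']:
        raise ValueError(
            u'Heartbeat interval cannot be greater than heartbeat timeout '
            u'or else nodes will always be rated as "dead"')

validate_clustering_settings({
    'NODE_HEARTBEAT_INTERVAL': 10,
    'NODE_HEARTBEAT_TIMEOUT': 60,
})  # Passes; swapping the two values would raise ValueError.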

View File

@@ -4,7 +4,7 @@ import logging
from lock_manager.decorators import simple_locking

-from .models import Node, ClusteringConfig
+from .models import Node
from .signals import node_heartbeat

LOCK_EXPIRE = 10
@@ -23,5 +23,5 @@ def send_heartbeat():
@simple_locking('house_keeping', 10)
def house_keeping():
    logger.debug('starting')
-    ClusteringConfig.cluster.check_dead_nodes()
+    Node.objects.check_dead_nodes()

View File

@@ -3,5 +3,4 @@ from django.conf.urls.defaults import patterns, url
urlpatterns = patterns('clustering.views',
    url(r'^node/list/$', 'node_list', (), 'node_list'),
-    url(r'^edit/$', 'clustering_config_edit', (), 'clustering_config_edit'),
)

View File

@@ -13,8 +13,7 @@ from permissions.models import Permission
from common.utils import encapsulate
from acls.models import AccessEntry

-from .forms import ClusteringConfigForm
-from .models import Node, ClusteringConfig
+from .models import Node
from .permissions import PERMISSION_NODES_VIEW, PERMISSION_EDIT_CLUSTER_CONFIGURATION
@@ -70,34 +69,3 @@ def node_workers(request, node_pk):
    return render_to_response('generic_list.html', context,
        context_instance=RequestContext(request))

-def clustering_config_edit(request):
-    Permission.objects.check_permissions(request.user, [PERMISSION_EDIT_CLUSTER_CONFIGURATION])
-    cluster_config = ClusteringConfig.get()
-    post_action_redirect = None
-
-    previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', '/')))
-    next = request.POST.get('next', request.GET.get('next', post_action_redirect if post_action_redirect else request.META.get('HTTP_REFERER', '/')))
-
-    if request.method == 'POST':
-        form = ClusteringConfigForm(data=request.POST)
-        if form.is_valid():
-            try:
-                form.save()
-            except Exception, exc:
-                messages.error(request, _(u'Error trying to edit cluster configuration; %s') % exc)
-            else:
-                messages.success(request, _(u'Cluster configuration edited successfully.'))
-                return HttpResponseRedirect(next)
-    else:
-        form = ClusteringConfigForm(instance=cluster_config)
-
-    return render_to_response('generic_form.html', {
-        'form': form,
-        'object': cluster_config,
-        'title': _(u'Edit cluster configuration')
-    }, context_instance=RequestContext(request))

View File

@@ -1,88 +1 @@
from __future__ import absolute_import

import atexit
import logging

import psutil
from multiprocessing import active_children

from django.db import transaction, DatabaseError
from django.dispatch import receiver
from django.utils.translation import ugettext_lazy as _

from scheduler.api import LocalScheduler
from navigation.api import bind_links, register_model_list_columns
from project_setup.api import register_setup
from common.utils import encapsulate
from clustering.models import Node
from clustering.signals import node_died, node_heartbeat

from .models import JobQueue, JobProcessingConfig, JobQueueItem, Worker
from .tasks import job_queue_poll, house_keeping
from .links import (node_workers, job_queues, tool_link,
    job_queue_items_pending, job_queue_items_error, job_queue_items_active,
    job_queue_config_edit, setup_link, job_queue_start, job_queue_stop,
    job_requeue, job_delete, worker_terminate)

logger = logging.getLogger(__name__)


@transaction.commit_on_success
def add_job_queue_jobs():
    job_processor_scheduler = LocalScheduler('job_processor', _(u'Job processor'))
    try:
        job_processor_scheduler.add_interval_job('job_queue_poll', _(u'Poll a job queue for pending jobs.'), job_queue_poll, seconds=JobProcessingConfig.get().job_queue_poll_interval)
        job_processor_scheduler.add_interval_job('house_keeping', _(u'Poll a job queue for pending jobs.'), house_keeping, seconds=JobProcessingConfig.get().dead_job_removal_interval)
    except DatabaseError:
        transaction.rollback()

    job_processor_scheduler.start()

add_job_queue_jobs()
register_setup(setup_link)

bind_links([JobQueue, 'job_queues'], [job_queues], menu_name='secondary_menu')
bind_links([JobQueue], [job_queue_start, job_queue_stop, job_queue_items_pending, job_queue_items_active, job_queue_items_error])
bind_links([Node], [node_workers])
bind_links(['job_queue_config_edit'], [job_queue_config_edit], menu_name='secondary_menu')
bind_links([JobQueueItem], [job_requeue, job_delete])
bind_links([Worker], [worker_terminate])

Node.add_to_class('workers', lambda node: node.worker_set)

register_model_list_columns(Node, [
    {
        'name': _(u'active workers'),
        'attribute': encapsulate(lambda x: x.workers().all().count())
    },
])


@receiver(node_died, dispatch_uid='process_dead_workers')
def process_dead_workers(sender, node, **kwargs):
    logger.debug('received signal')
    for dead_worker in node.worker_set.all():
        if dead_worker.job_queue_item:
            dead_worker.job_queue_item.requeue(force=True, at_top=True)
        dead_worker.delete()


@receiver(node_heartbeat, dispatch_uid='node_processes')
def node_processes(sender, node, **kwargs):
    logger.debug('update current node\'s processes')
    all_active_pids = psutil.get_pid_list()
    # Remove stale workers based on current child pids
    for dead_worker in node.worker_set.exclude(pid__in=all_active_pids):
        if dead_worker.job_queue_item:
            dead_worker.job_queue_item.requeue(force=True, at_top=True)
        dead_worker.delete()


def kill_all_node_processes():
    logger.debug('terminating this node\'s all processes')
    for process in active_children():
        process.terminate()
        process.join()

atexit.register(kill_all_node_processes)

View File

@@ -45,9 +45,6 @@ job_queue_items_active = Link(text=_(u'active jobs'), view='job_queue_items_acti
job_queue_start = Link(text=_(u'start'), view='job_queue_start', args='object.pk', icon=icon_job_queue_start, permissions=[PERMISSION_JOB_QUEUE_START_STOP], condition=is_not_running)
job_queue_stop = Link(text=_(u'stop'), view='job_queue_stop', args='object.pk', icon=icon_job_queue_stop, permissions=[PERMISSION_JOB_QUEUE_START_STOP], condition=is_running)
-job_queue_config_edit = Link(text=_(u'edit job processing configuration'), view='job_queue_config_edit', permissions=[PERMISSION_JOB_PROCESSING_CONFIGURATION])
-setup_link = Link(text=_(u'job processing configuration'), view='job_queue_config_edit', permissions=[PERMISSION_JOB_PROCESSING_CONFIGURATION])
job_requeue = Link(text=_(u'requeue job'), view='job_requeue', args='object.pk', icon=icon_job_requeue, permissions=[PERMISSION_JOB_REQUEUE], condition=is_in_error_state)
job_delete = Link(text=_(u'delete job'), view='job_delete', args='object.pk', icon=icon_job_delete, permissions=[PERMISSION_JOB_DELETE], condition=is_in_pending_state)

View File

@@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
import datetime
from south.db import db
from south.v2 import SchemaMigration
from django.db import models


class Migration(SchemaMigration):

    def forwards(self, orm):
        # Deleting model 'JobProcessingConfig'
        db.delete_table('job_processor_jobprocessingconfig')

    def backwards(self, orm):
        # Adding model 'JobProcessingConfig'
        db.create_table('job_processor_jobprocessingconfig', (
            ('lock_id', self.gf('django.db.models.fields.CharField')(default=1, max_length=1, unique=True)),
            ('job_queue_poll_interval', self.gf('django.db.models.fields.PositiveIntegerField')(default=2)),
            ('dead_job_removal_interval', self.gf('django.db.models.fields.PositiveIntegerField')(default=5)),
            ('id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
        ))
        db.send_create_signal('job_processor', ['JobProcessingConfig'])

    models = {
        'clustering.node': {
            'Meta': {'object_name': 'Node'},
            'cpuload': ('django.db.models.fields.FloatField', [], {'default': '0', 'blank': 'True'}),
            'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 9, 15, 0, 0)', 'blank': 'True'}),
            'hostname': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '255'}),
            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
            'memory_usage': ('django.db.models.fields.FloatField', [], {'default': '0', 'blank': 'True'}),
            'state': ('django.db.models.fields.CharField', [], {'default': "'d'", 'max_length': '4'})
        },
        'job_processor.jobqueue': {
            'Meta': {'ordering': "('priority',)", 'object_name': 'JobQueue'},
            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
            'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '32'}),
            'priority': ('django.db.models.fields.IntegerField', [], {'default': '0'}),
            'state': ('django.db.models.fields.CharField', [], {'default': "'r'", 'max_length': '4'}),
            'unique_jobs': ('django.db.models.fields.BooleanField', [], {'default': 'True'})
        },
        'job_processor.jobqueueitem': {
            'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'JobQueueItem'},
            'creation_datetime': ('django.db.models.fields.DateTimeField', [], {}),
            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
            'job_queue': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueue']"}),
            'job_type': ('django.db.models.fields.CharField', [], {'max_length': '32'}),
            'kwargs': ('django.db.models.fields.TextField', [], {}),
            'result': ('django.db.models.fields.TextField', [], {'blank': 'True'}),
            'state': ('django.db.models.fields.CharField', [], {'default': "'p'", 'max_length': '4'}),
            'unique_id': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64', 'blank': 'True'})
        },
        'job_processor.worker': {
            'Meta': {'ordering': "('creation_datetime',)", 'unique_together': "(('node', 'pid'),)", 'object_name': 'Worker'},
            'creation_datetime': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 9, 15, 0, 0)'}),
            'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 9, 15, 0, 0)', 'blank': 'True'}),
            'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
            'job_queue_item': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueueItem']", 'null': 'True', 'blank': 'True'}),
            'node': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['clustering.Node']"}),
            'pid': ('django.db.models.fields.PositiveIntegerField', [], {'max_length': '255'}),
            'state': ('django.db.models.fields.CharField', [], {'default': "'r'", 'max_length': '4'})
        }
    }

    complete_apps = ['job_processor']

View File

@@ -16,7 +16,6 @@ from django.utils.translation import ugettext_lazy as _
from django.utils.translation import ugettext
from django.utils.simplejson import loads, dumps

-from common.models import Singleton
from clustering.models import Node

from .literals import (JOB_STATE_CHOICES, JOB_STATE_PENDING,
@@ -293,14 +292,3 @@ class Worker(models.Model):
        verbose_name = _(u'worker')
        verbose_name_plural = _(u'workers')
        unique_together = ('node', 'pid')

-class JobProcessingConfig(Singleton):
-    job_queue_poll_interval = models.PositiveIntegerField(verbose_name=(u'job queue poll interval (in seconds)'), default=DEFAULT_JOB_QUEUE_POLL_INTERVAL)
-    dead_job_removal_interval = models.PositiveIntegerField(verbose_name=(u'dead job check and removal interval (in seconds)'), help_text=_(u'Interval of time to check the cluster for and remove unresponsive jobs.'), default=DEFAULT_DEAD_JOB_REMOVAL_INTERVAL)
-
-    def __unicode__(self):
-        return ugettext(u'Job queues configuration')
-
-    class Meta:
-        verbose_name = verbose_name_plural = _(u'job queues configuration')

View File

@@ -0,0 +1,86 @@
from __future__ import absolute_import

import atexit
import logging

import psutil
from multiprocessing import active_children

from django.db import transaction, DatabaseError
from django.dispatch import receiver
from django.utils.translation import ugettext_lazy as _

from scheduler.api import LocalScheduler
from navigation.api import bind_links, register_model_list_columns
from common.utils import encapsulate
from clustering.models import Node
from clustering.signals import node_died, node_heartbeat

from .models import JobQueue, JobQueueItem, Worker
from .tasks import job_queue_poll, house_keeping
from .links import (node_workers, job_queues, tool_link,
    job_queue_items_pending, job_queue_items_error, job_queue_items_active,
    job_queue_start, job_queue_stop, job_requeue, job_delete, worker_terminate)
from .settings import QUEUE_POLL_INTERVAL, DEAD_JOB_REMOVAL_INTERVAL

logger = logging.getLogger(__name__)


@transaction.commit_on_success
def add_job_queue_jobs():
    job_processor_scheduler = LocalScheduler('job_processor', _(u'Job processor'))
    try:
        # TODO: auto convert settings to int
        job_processor_scheduler.add_interval_job('job_queue_poll', _(u'Poll a job queue for pending jobs.'), job_queue_poll, seconds=int(QUEUE_POLL_INTERVAL))
        job_processor_scheduler.add_interval_job('house_keeping', _(u'Poll a job queue for pending jobs.'), house_keeping, seconds=int(DEAD_JOB_REMOVAL_INTERVAL))
    except DatabaseError:
        transaction.rollback()

    job_processor_scheduler.start()


@receiver(node_died, dispatch_uid='process_dead_workers')
def process_dead_workers(sender, node, **kwargs):
    logger.debug('received signal')
    for dead_worker in node.worker_set.all():
        if dead_worker.job_queue_item:
            dead_worker.job_queue_item.requeue(force=True, at_top=True)
        dead_worker.delete()


@receiver(node_heartbeat, dispatch_uid='node_processes')
def node_processes(sender, node, **kwargs):
    logger.debug('update current node\'s processes')
    all_active_pids = psutil.get_pid_list()
    # Remove stale workers based on current child pids
    for dead_worker in node.worker_set.exclude(pid__in=all_active_pids):
        if dead_worker.job_queue_item:
            dead_worker.job_queue_item.requeue(force=True, at_top=True)
        dead_worker.delete()


def kill_all_node_processes():
    logger.debug('terminating this node\'s all processes')
    for process in active_children():
        process.terminate()
        process.join()


def init_job_processor():
    atexit.register(kill_all_node_processes)
    add_job_queue_jobs()

    bind_links([JobQueue, 'job_queues'], [job_queues], menu_name='secondary_menu')
    bind_links([JobQueue], [job_queue_start, job_queue_stop, job_queue_items_pending, job_queue_items_active, job_queue_items_error])
    bind_links([Node], [node_workers])
    bind_links([JobQueueItem], [job_requeue, job_delete])
    bind_links([Worker], [worker_terminate])

    Node.add_to_class('workers', lambda node: node.worker_set)

    register_model_list_columns(Node, [
        {
            'name': _(u'active workers'),
            'attribute': encapsulate(lambda x: x.workers().all().count())
        },
    ])
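Both apps now expose an explicit entry point (init_clustering(), init_job_processor()) instead of running scheduler registration and link binding as import-time side effects; presumably the app_registry dependency calls these during startup. A toy illustration of why the deferral matters (not project code):

started = []

def init_clustering():
    started.append('clustering')

def init_job_processor():
    started.append('job_processor')

# The caller can now guarantee clustering initializes before the job
# processor that depends on its Node model, instead of relying on
# import order.
for init_function in (init_clustering, init_job_processor):
    init_function()

assert started == ['clustering', 'job_processor']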

View File

@@ -9,8 +9,6 @@ urlpatterns = patterns('job_processor.views',
    url(r'^queue/(?P<job_queue_pk>\d+)/items/active/$', 'job_queue_items', {'active_jobs' :True}, 'job_queue_items_active'),
    url(r'^queue/(?P<job_queue_pk>\d+)/start/$', 'job_queue_start', (), 'job_queue_start'),
    url(r'^queue/(?P<job_queue_pk>\d+)/stop/$', 'job_queue_stop', (), 'job_queue_stop'),
-    url(r'^config/edit/$', 'job_queue_config_edit', (), 'job_queue_config_edit'),
    url(r'^job/(?P<job_item_pk>\d+)/requeue/$', 'job_requeue', (), 'job_requeue'),
    url(r'^job/(?P<job_item_pk>\d+)/delete/$', 'job_delete', (), 'job_delete'),

View File

@@ -14,8 +14,7 @@ from clustering.models import Node
from permissions.models import Permission

from .exceptions import JobQueueAlreadyStopped, JobQueueAlreadyStarted
-from .forms import JobProcessingConfigForm
-from .models import JobQueue, JobProcessingConfig, JobQueueItem, Worker
+from .models import JobQueue, JobQueueItem, Worker
from .permissions import (PERMISSION_JOB_QUEUE_VIEW,
    PERMISSION_JOB_PROCESSING_CONFIGURATION, PERMISSION_JOB_QUEUE_START_STOP,
    PERMISSION_JOB_REQUEUE)
@@ -163,38 +162,7 @@ def job_queue_items(request, job_queue_pk, pending_jobs=False, error_jobs=False,
    return render_to_response('generic_list.html', context,
        context_instance=RequestContext(request))

-def job_queue_config_edit(request):
-    Permission.objects.check_permissions(request.user, [PERMISSION_JOB_PROCESSING_CONFIGURATION])
-    job_processing_config = JobProcessingConfig.get()
-    post_action_redirect = None
-
-    previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', '/')))
-    next = request.POST.get('next', request.GET.get('next', post_action_redirect if post_action_redirect else request.META.get('HTTP_REFERER', '/')))
-
-    if request.method == 'POST':
-        form = JobProcessingConfigForm(data=request.POST)
-        if form.is_valid():
-            try:
-                form.save()
-            except Exception, exc:
-                messages.error(request, _(u'Error trying to edit job processing configuration; %s') % exc)
-            else:
-                messages.success(request, _(u'Job processing configuration edited successfully.'))
-                return HttpResponseRedirect(next)
-    else:
-        form = JobProcessingConfigForm(instance=job_processing_config)
-
-    return render_to_response('generic_form.html', {
-        'form': form,
-        'object': job_processing_config,
-        'title': _(u'Edit job processing configuration')
-    }, context_instance=RequestContext(request))

def job_queue_stop(request, job_queue_pk):
    job_queue = get_object_or_404(JobQueue, pk=job_queue_pk)