From 87c958638df68fd6a45b59314bb10075d12275bd Mon Sep 17 00:00:00 2001 From: Roberto Rosario Date: Mon, 30 Jul 2012 23:51:05 -0400 Subject: [PATCH] Implement clustering housekeeping, deleting 'dead' nodes --- apps/clustering/__init__.py | 9 +++--- apps/clustering/models.py | 62 +++++++++++++++++++++++++++++++------ apps/clustering/tasks.py | 16 +++++++--- 3 files changed, 69 insertions(+), 18 deletions(-) diff --git a/apps/clustering/__init__.py b/apps/clustering/__init__.py index 4149c12347..f5b453f6d2 100644 --- a/apps/clustering/__init__.py +++ b/apps/clustering/__init__.py @@ -6,13 +6,12 @@ from scheduler.api import register_interval_job from navigation.api import bind_links from project_tools.api import register_tool -from .tasks import refresh_node +from .tasks import node_heartbeat, house_keeping from .links import tool_link, node_list -from .models import Node +from .models import Node, ClusteringConfig -NODE_REFRESH_INTERVAL = 1 - -register_interval_job('refresh_node', _(u'Update a node\'s properties.'), refresh_node, seconds=NODE_REFRESH_INTERVAL) +register_interval_job('node_heartbeat', _(u'Update a node\'s properties.'), node_heartbeat, seconds=ClusteringConfig.get().node_heartbeat_interval) +register_interval_job('house_keeping', _(u'Check for unresponsive nodes in the cluster list.'), house_keeping, seconds=1) register_tool(tool_link) bind_links([Node, 'node_list'], [node_list], menu_name='secondary_menu') diff --git a/apps/clustering/models.py b/apps/clustering/models.py index 67ded6cf04..764691aadc 100644 --- a/apps/clustering/models.py +++ b/apps/clustering/models.py @@ -11,30 +11,47 @@ from django.db import close_connection from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext +from common.models import Singleton + +DEFAULT_NODE_TTL = 5 +DEFAULT_NODE_HEARTBEAT_INTERVAL = 1 + class NodeManager(models.Manager): def myself(self): - node, created = self.model.objects.get_or_create(hostname=platform.node(), defaults={'memory_usage': 100}) + node, created = self.model.objects.get_or_create(hostname=platform.node()) node.refresh() + if created: + # Store the refresh data because is a new instance + node.save() return node class Node(models.Model): hostname = models.CharField(max_length=255, verbose_name=_(u'hostname')) - cpuload = models.PositiveIntegerField(blank=True, default=0, verbose_name=_(u'cpu load')) + cpuload = models.FloatField(blank=True, default=0.0, verbose_name=_(u'cpu load')) heartbeat = models.DateTimeField(blank=True, default=datetime.datetime.now(), verbose_name=_(u'last heartbeat check')) - memory_usage = models.FloatField(blank=True, verbose_name=_(u'memory usage')) - - objects = NodeManager() + memory_usage = models.FloatField(blank=True, default=0.0, verbose_name=_(u'memory usage')) + objects = NodeManager() + + @classmethod + def platform_info(cls): + return { + 'cpuload': psutil.cpu_percent(), + 'memory_usage': psutil.phymem_usage().percent + } + def __unicode__(self): return self.hostname def refresh(self): - self.cpuload = psutil.cpu_percent() - self.memory_usage = psutil.phymem_usage().percent - self.save() - + if self.hostname == platform.node(): + # Make we can only update ourselves + info = Node.platform_info() + self.cpuload = info['cpuload'] + self.memory_usage = info['memory_usage'] + def save(self, *args, **kwargs): self.heartbeat = datetime.datetime.now() return super(Node, self).save(*args, **kwargs) @@ -42,3 +59,30 @@ class Node(models.Model): class Meta: verbose_name = _(u'node') verbose_name_plural = _(u'nodes') + + +class ClusteringConfigManager(models.Manager): + def dead_nodes(self): + return Node.objects.filter(heartbeat__lt=datetime.datetime.now() - datetime.timedelta(seconds=self.model.get().node_time_to_live)) + + def delete_dead_nodes(self): + self.dead_nodes().delete() + + def zombiest_node(self): + try: + return self.dead_nodes().order_by('-heartbeat')[0] + except IndexError: + return None + + +class ClusteringConfig(Singleton): + node_time_to_live = models.PositiveIntegerField(verbose_name=(u'time to live (in seconds)'), default=DEFAULT_NODE_TTL) # After this time a worker is considered dead + node_heartbeat_interval = models.PositiveIntegerField(verbose_name=(u'heartbeat interval'), default=DEFAULT_NODE_HEARTBEAT_INTERVAL) + + objects = ClusteringConfigManager() + + def __unicode__(self): + return ugettext('clustering config') + + class Meta: + verbose_name = verbose_name_plural = _(u'clustering config') diff --git a/apps/clustering/tasks.py b/apps/clustering/tasks.py index aa9f01b53e..c5938047d9 100644 --- a/apps/clustering/tasks.py +++ b/apps/clustering/tasks.py @@ -4,14 +4,22 @@ import logging from lock_manager.decorators import simple_locking -from .models import Node +from .models import Node, ClusteringConfig LOCK_EXPIRE = 10 logger = logging.getLogger(__name__) -@simple_locking('refresh_node', 10) -def refresh_node(): +@simple_locking('node_heartbeat', 10) +def node_heartbeat(): logger.debug('starting') - node = Node.objects.myself() # Automatically calls the refresh() method too + node = Node.objects.myself() + node.save() + + +@simple_locking('house_keeping', 10) +def house_keeping(): + logger.debug('starting') + ClusteringConfig.objects.delete_dead_nodes() +