Implement clustering housekeeping, deleting 'dead' nodes
This commit is contained in:
@@ -6,13 +6,12 @@ from scheduler.api import register_interval_job
|
||||
from navigation.api import bind_links
|
||||
from project_tools.api import register_tool
|
||||
|
||||
from .tasks import refresh_node
|
||||
from .tasks import node_heartbeat, house_keeping
|
||||
from .links import tool_link, node_list
|
||||
from .models import Node
|
||||
from .models import Node, ClusteringConfig
|
||||
|
||||
NODE_REFRESH_INTERVAL = 1
|
||||
|
||||
register_interval_job('refresh_node', _(u'Update a node\'s properties.'), refresh_node, seconds=NODE_REFRESH_INTERVAL)
|
||||
register_interval_job('node_heartbeat', _(u'Update a node\'s properties.'), node_heartbeat, seconds=ClusteringConfig.get().node_heartbeat_interval)
|
||||
register_interval_job('house_keeping', _(u'Check for unresponsive nodes in the cluster list.'), house_keeping, seconds=1)
|
||||
|
||||
register_tool(tool_link)
|
||||
bind_links([Node, 'node_list'], [node_list], menu_name='secondary_menu')
|
||||
|
||||
@@ -11,29 +11,46 @@ from django.db import close_connection
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
from django.utils.translation import ugettext
|
||||
|
||||
from common.models import Singleton
|
||||
|
||||
DEFAULT_NODE_TTL = 5
|
||||
DEFAULT_NODE_HEARTBEAT_INTERVAL = 1
|
||||
|
||||
|
||||
class NodeManager(models.Manager):
|
||||
def myself(self):
|
||||
node, created = self.model.objects.get_or_create(hostname=platform.node(), defaults={'memory_usage': 100})
|
||||
node, created = self.model.objects.get_or_create(hostname=platform.node())
|
||||
node.refresh()
|
||||
if created:
|
||||
# Store the refresh data because is a new instance
|
||||
node.save()
|
||||
return node
|
||||
|
||||
|
||||
class Node(models.Model):
|
||||
hostname = models.CharField(max_length=255, verbose_name=_(u'hostname'))
|
||||
cpuload = models.PositiveIntegerField(blank=True, default=0, verbose_name=_(u'cpu load'))
|
||||
cpuload = models.FloatField(blank=True, default=0.0, verbose_name=_(u'cpu load'))
|
||||
heartbeat = models.DateTimeField(blank=True, default=datetime.datetime.now(), verbose_name=_(u'last heartbeat check'))
|
||||
memory_usage = models.FloatField(blank=True, verbose_name=_(u'memory usage'))
|
||||
memory_usage = models.FloatField(blank=True, default=0.0, verbose_name=_(u'memory usage'))
|
||||
|
||||
objects = NodeManager()
|
||||
|
||||
@classmethod
|
||||
def platform_info(cls):
|
||||
return {
|
||||
'cpuload': psutil.cpu_percent(),
|
||||
'memory_usage': psutil.phymem_usage().percent
|
||||
}
|
||||
|
||||
def __unicode__(self):
|
||||
return self.hostname
|
||||
|
||||
def refresh(self):
|
||||
self.cpuload = psutil.cpu_percent()
|
||||
self.memory_usage = psutil.phymem_usage().percent
|
||||
self.save()
|
||||
if self.hostname == platform.node():
|
||||
# Make we can only update ourselves
|
||||
info = Node.platform_info()
|
||||
self.cpuload = info['cpuload']
|
||||
self.memory_usage = info['memory_usage']
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
self.heartbeat = datetime.datetime.now()
|
||||
@@ -42,3 +59,30 @@ class Node(models.Model):
|
||||
class Meta:
|
||||
verbose_name = _(u'node')
|
||||
verbose_name_plural = _(u'nodes')
|
||||
|
||||
|
||||
class ClusteringConfigManager(models.Manager):
|
||||
def dead_nodes(self):
|
||||
return Node.objects.filter(heartbeat__lt=datetime.datetime.now() - datetime.timedelta(seconds=self.model.get().node_time_to_live))
|
||||
|
||||
def delete_dead_nodes(self):
|
||||
self.dead_nodes().delete()
|
||||
|
||||
def zombiest_node(self):
|
||||
try:
|
||||
return self.dead_nodes().order_by('-heartbeat')[0]
|
||||
except IndexError:
|
||||
return None
|
||||
|
||||
|
||||
class ClusteringConfig(Singleton):
|
||||
node_time_to_live = models.PositiveIntegerField(verbose_name=(u'time to live (in seconds)'), default=DEFAULT_NODE_TTL) # After this time a worker is considered dead
|
||||
node_heartbeat_interval = models.PositiveIntegerField(verbose_name=(u'heartbeat interval'), default=DEFAULT_NODE_HEARTBEAT_INTERVAL)
|
||||
|
||||
objects = ClusteringConfigManager()
|
||||
|
||||
def __unicode__(self):
|
||||
return ugettext('clustering config')
|
||||
|
||||
class Meta:
|
||||
verbose_name = verbose_name_plural = _(u'clustering config')
|
||||
|
||||
@@ -4,14 +4,22 @@ import logging
|
||||
|
||||
from lock_manager.decorators import simple_locking
|
||||
|
||||
from .models import Node
|
||||
from .models import Node, ClusteringConfig
|
||||
|
||||
LOCK_EXPIRE = 10
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@simple_locking('refresh_node', 10)
|
||||
def refresh_node():
|
||||
@simple_locking('node_heartbeat', 10)
|
||||
def node_heartbeat():
|
||||
logger.debug('starting')
|
||||
node = Node.objects.myself() # Automatically calls the refresh() method too
|
||||
node = Node.objects.myself()
|
||||
node.save()
|
||||
|
||||
|
||||
@simple_locking('house_keeping', 10)
|
||||
def house_keeping():
|
||||
logger.debug('starting')
|
||||
ClusteringConfig.objects.delete_dead_nodes()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user