Job processing app updates
This commit is contained in:
@@ -3,11 +3,27 @@ from __future__ import absolute_import
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from scheduler.api import register_interval_job
|
||||
from navigation.api import bind_links, register_model_list_columns
|
||||
from project_tools.api import register_tool
|
||||
from common.utils import encapsulate
|
||||
|
||||
from .tasks import refresh_node, job_queue_poll
|
||||
from .tasks import job_queue_poll
|
||||
from .links import node_workers
|
||||
from clustering.models import Node
|
||||
|
||||
NODE_REFRESH_INTERVAL = 1
|
||||
JOB_QUEUE_POLL_INTERVAL = 1
|
||||
|
||||
register_interval_job('refresh_node', _(u'Update a node\'s properties.'), refresh_node, seconds=NODE_REFRESH_INTERVAL)
|
||||
register_interval_job('job_queue_poll', _(u'Poll a job queue for pending jobs.'), job_queue_poll, seconds=JOB_QUEUE_POLL_INTERVAL)
|
||||
|
||||
#register_tool(tool_link)
|
||||
#bind_links([Node, 'node_list'], [node_list], menu_name='secondary_menu')
|
||||
bind_links([Node], [node_workers])
|
||||
|
||||
Node.add_to_class('workers', lambda node: node.worker_set)
|
||||
|
||||
register_model_list_columns(Node, [
|
||||
{
|
||||
'name': _(u'total workers'),
|
||||
'attribute': encapsulate(lambda x: x.workers().all().count())
|
||||
},
|
||||
])
|
||||
|
||||
@@ -3,17 +3,7 @@ from __future__ import absolute_import
|
||||
from django.contrib import admin
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from .models import Node, JobQueue, JobQueueItem, Worker
|
||||
|
||||
|
||||
class WorkerInline(admin.StackedInline):
|
||||
list_display = ('name', 'creation_datetime', 'state')
|
||||
model = Worker
|
||||
|
||||
|
||||
class NodeAdmin(admin.ModelAdmin):
|
||||
list_display = ('hostname', 'cpuload', 'heartbeat', 'memory_usage')
|
||||
inlines = [WorkerInline]
|
||||
from .models import JobQueue, JobQueueItem
|
||||
|
||||
|
||||
class JobQueueItemInline(admin.StackedInline):
|
||||
@@ -30,5 +20,4 @@ class JobQueueAdmin(admin.ModelAdmin):
|
||||
total_items.short_description = _(u'total items')
|
||||
|
||||
|
||||
admin.site.register(Node, NodeAdmin)
|
||||
admin.site.register(JobQueue, JobQueueAdmin)
|
||||
|
||||
15
apps/job_processor/links.py
Normal file
15
apps/job_processor/links.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from __future__ import absolute_import
|
||||
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from navigation.api import Link
|
||||
|
||||
from clustering.permissions import (PERMISSION_NODES_VIEW)
|
||||
|
||||
|
||||
node_workers = Link(text=_(u'workers'), view='node_workers', args='object.pk', sprite='lorry_go', permissions=[PERMISSION_NODES_VIEW])
|
||||
#index_setup_create = Link(text=_(u'create index'), view='index_setup_create', sprite='tab_add', permissions=[PERMISSION_DOCUMENT_INDEXING_CREATE])
|
||||
#index_setup_edit = Link(text=_(u'edit'), view='index_setup_edit', args='index.pk', sprite='tab_edit', permissions=[PERMISSION_DOCUMENT_INDEXING_EDIT])
|
||||
#index_setup_delete = Link(text=_(u'delete'), view='index_setup_delete', args='index.pk', sprite='tab_delete', permissions=[PERMISSION_DOCUMENT_INDEXING_DELETE])
|
||||
#index_setup_view = Link(text=_(u'tree template'), view='index_setup_view', args='index.pk', sprite='textfield', permissions=[PERMISSION_DOCUMENT_INDEXING_SETUP])
|
||||
#index_setup_document_types = Link(text=_(u'document types'), view='index_setup_document_types', args='index.pk', sprite='layout', permissions=[PERMISSION_DOCUMENT_INDEXING_EDIT]) # children_view_regex=[r'^index_setup', r'^template_node'])
|
||||
@@ -17,6 +17,8 @@ from django.utils.translation import ugettext
|
||||
from django.utils.simplejson import loads, dumps
|
||||
|
||||
from common.models import Singleton
|
||||
from clustering.models import Node
|
||||
|
||||
from .literals import (JOB_STATE_CHOICES, JOB_STATE_PENDING,
|
||||
JOB_STATE_PROCESSING, JOB_STATE_ERROR, WORKER_STATE_CHOICES,
|
||||
WORKER_STATE_RUNNING)
|
||||
@@ -67,38 +69,6 @@ class JobType(object):
|
||||
p.start()
|
||||
|
||||
|
||||
class NodeManager(models.Manager):
|
||||
def myself(self):
|
||||
node, created = self.model.objects.get_or_create(hostname=platform.node(), defaults={'memory_usage': 100})
|
||||
node.refresh()
|
||||
return node
|
||||
|
||||
|
||||
class Node(models.Model):
|
||||
hostname = models.CharField(max_length=255, verbose_name=_(u'hostname'))
|
||||
cpuload = models.PositiveIntegerField(blank=True, default=0, verbose_name=_(u'cpu load'))
|
||||
heartbeat = models.DateTimeField(blank=True, default=datetime.datetime.now(), verbose_name=_(u'last heartbeat check'))
|
||||
memory_usage = models.FloatField(blank=True, verbose_name=_(u'memory usage'))
|
||||
|
||||
objects = NodeManager()
|
||||
|
||||
def __unicode__(self):
|
||||
return self.hostname
|
||||
|
||||
def refresh(self):
|
||||
self.cpuload = psutil.cpu_percent()
|
||||
self.memory_usage = psutil.phymem_usage().percent
|
||||
self.save()
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
self.heartbeat = datetime.datetime.now()
|
||||
return super(Node, self).save(*args, **kwargs)
|
||||
|
||||
class Meta:
|
||||
verbose_name = _(u'node')
|
||||
verbose_name_plural = _(u'nodes')
|
||||
|
||||
|
||||
class JobQueueManager(models.Manager):
|
||||
def get_or_create(self, *args, **kwargs):
|
||||
job_queue_labels[kwargs.get('name')] = kwargs.get('defaults', {}).get('label')
|
||||
|
||||
8
apps/job_processor/permissions.py
Normal file
8
apps/job_processor/permissions.py
Normal file
@@ -0,0 +1,8 @@
|
||||
from __future__ import absolute_import
|
||||
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from permissions.models import PermissionNamespace, Permission
|
||||
|
||||
namespace = PermissionNamespace('job_processor', _(u'Job processor'))
|
||||
#PERMISSION_NODES_VIEW = Permission.objects.register(namespace, 'nodes_view', _(u'View the registeres nodes in a Mayan cluster'))
|
||||
@@ -3,31 +3,24 @@ from __future__ import absolute_import
|
||||
import logging
|
||||
|
||||
from lock_manager import Lock, LockError
|
||||
from lock_manager.decorators import simple_locking
|
||||
from clustering.models import Node
|
||||
|
||||
from .models import Node, JobQueue
|
||||
from .models import JobQueue
|
||||
from .exceptions import JobQueueNoPendingJobs
|
||||
|
||||
LOCK_EXPIRE = 10
|
||||
# TODO: Tie LOCK_EXPIRATION with hard task timeout
|
||||
MAX_CPU_LOAD = 90
|
||||
MAX_MEMORY_USAGE = 90
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@simple_locking('refresh_node', 10)
|
||||
def refresh_node():
|
||||
logger.debug('starting')
|
||||
node = Node.objects.myself() # Automatically calls the refresh() method too
|
||||
|
||||
|
||||
def job_queue_poll():
|
||||
logger.debug('starting')
|
||||
|
||||
node = Node.objects.myself() # Automatically calls the refresh() method too
|
||||
if node.cpuload < MAX_CPU_LOAD and node.memory_usage < MAX_MEMORY_USAGE:
|
||||
# Poll job queues is node is not overloaded
|
||||
# Poll job queues if node is not overloaded
|
||||
lock_id = u'job_queue_poll'
|
||||
try:
|
||||
lock = Lock.acquire_lock(lock_id, LOCK_EXPIRE)
|
||||
|
||||
15
apps/job_processor/urls.py
Normal file
15
apps/job_processor/urls.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from django.conf.urls.defaults import patterns, url
|
||||
|
||||
|
||||
urlpatterns = patterns('job_processor.views',
|
||||
#url(r'^node/list/$', 'node_list', (), 'node_list'),
|
||||
url(r'^node/(?P<node_pk>\d+)/workers/$', 'node_workers', (), 'node_workers'),
|
||||
#url(r'^create/$', 'folder_create', (), 'folder_create'),
|
||||
#url(r'^(?P<folder_id>\d+)/edit/$', 'folder_edit', (), 'folder_edit'),
|
||||
#url(r'^(?P<folder_id>\d+)/delete/$', 'folder_delete', (), 'folder_delete'),
|
||||
#url(r'^(?P<folder_id>\d+)/$', 'folder_view', (), 'folder_view'),
|
||||
#url(r'^(?P<folder_id>\d+)/remove/document/multiple/$', 'folder_document_multiple_remove', (), 'folder_document_multiple_remove'),
|
||||
#url(r'^document/(?P<document_id>\d+)/folder/add/$', 'folder_add_document', (), 'folder_add_document'),
|
||||
#url(r'^document/(?P<document_id>\d+)/folder/list/$', 'document_folder_list', (), 'document_folder_list'),
|
||||
#url(r'^(?P<folder_pk>\d+)/acl/list/$', 'folder_acl_list', (), 'folder_acl_list'),
|
||||
)
|
||||
@@ -1 +1,35 @@
|
||||
# Create your views here.
|
||||
from __future__ import absolute_import
|
||||
|
||||
from django.shortcuts import render_to_response
|
||||
from django.template import RequestContext
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
from django.shortcuts import get_object_or_404
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.db.models.loading import get_model
|
||||
from django.http import Http404
|
||||
from django.core.exceptions import PermissionDenied
|
||||
|
||||
from permissions.models import Permission
|
||||
from common.utils import encapsulate
|
||||
from acls.models import AccessEntry
|
||||
from clustering.permissions import PERMISSION_NODES_VIEW
|
||||
from clustering.models import Node
|
||||
|
||||
|
||||
def node_workers(request, node_pk):
|
||||
node = get_object_or_404(Node, pk=node_pk)
|
||||
|
||||
try:
|
||||
Permission.objects.check_permissions(request.user, [PERMISSION_NODES_VIEW])
|
||||
except PermissionDenied:
|
||||
AccessEntry.objects.check_access(PERMISSION_NODES_VIEW, request.user, node)
|
||||
|
||||
context = {
|
||||
'object_list': node.workers().all(),
|
||||
'title': _(u'workers for node: %s') % node,
|
||||
'object': node,
|
||||
'hide_object': True,
|
||||
}
|
||||
|
||||
return render_to_response('generic_list.html', context,
|
||||
context_instance=RequestContext(request))
|
||||
|
||||
Reference in New Issue
Block a user