Add support for terminating local workers

This commit is contained in:
Roberto Rosario
2012-08-03 17:38:38 -04:00
parent ac60654cc9
commit 384209d2aa
7 changed files with 66 additions and 18 deletions

View File

@@ -23,7 +23,7 @@ from .tasks import job_queue_poll, house_keeping
from .links import (node_workers, job_queues, tool_link,
job_queue_items_pending, job_queue_items_error, job_queue_items_active,
job_queue_config_edit, setup_link, job_queue_start, job_queue_stop,
job_requeue, job_delete)
job_requeue, job_delete, worker_terminate)
logger = logging.getLogger(__name__)
@@ -48,6 +48,7 @@ bind_links([JobQueue], [job_queue_start, job_queue_stop, job_queue_items_pending
bind_links([Node], [node_workers])
bind_links(['job_queue_config_edit'], [job_queue_config_edit], menu_name='secondary_menu')
bind_links([JobQueueItem], [job_requeue, job_delete])
bind_links([Worker], [worker_terminate])
Node.add_to_class('workers', lambda node: node.worker_set)

View File

@@ -8,7 +8,7 @@ from clustering.permissions import PERMISSION_NODES_VIEW
from .permissions import (PERMISSION_JOB_QUEUE_VIEW,
PERMISSION_JOB_PROCESSING_CONFIGURATION, PERMISSION_JOB_QUEUE_START_STOP,
PERMISSION_JOB_REQUEUE, PERMISSION_JOB_DELETE)
PERMISSION_JOB_REQUEUE, PERMISSION_JOB_DELETE, PERMISSION_WORKER_TERMINATE)
def is_running(context):
@@ -42,3 +42,5 @@ setup_link = Link(text=_(u'job processing configuration'), view='job_queue_confi
job_requeue = Link(text=_(u'requeue job'), view='job_requeue', args='object.pk', sprite='cog_add', permissions=[PERMISSION_JOB_REQUEUE], condition=is_in_error_state)
job_delete = Link(text=_(u'delete job'), view='job_delete', args='object.pk', sprite='cog_delete', permissions=[PERMISSION_JOB_DELETE], condition=is_in_pending_state)
worker_terminate = Link(text=_(u'terminate worker'), view='worker_terminate', args='object.pk', sprite='lorry_delete', permissions=[PERMISSION_WORKER_TERMINATE])

View File

@@ -262,6 +262,15 @@ class Worker(models.Model):
def __unicode__(self):
return u'%s-%s' % (self.node.hostname, self.pid)
def terminate(self):
try:
process = psutil.Process(int(self.pid))
except psutil.error.NoSuchProcess:
# Process must have finished before reaching this point
return
else:
process.terminate()
class Meta:
ordering = ('creation_datetime',)
verbose_name = _(u'worker')

View File

@@ -10,3 +10,4 @@ PERMISSION_JOB_PROCESSING_CONFIGURATION = Permission.objects.register(namespace,
PERMISSION_JOB_QUEUE_START_STOP = Permission.objects.register(namespace, 'job_queue_start_stop', _(u'Can start and stop a job queue in a Mayan cluster'))
PERMISSION_JOB_REQUEUE = Permission.objects.register(namespace, 'job_requeue', _(u'Requeue a job in a Mayan cluster'))
PERMISSION_JOB_DELETE = Permission.objects.register(namespace, 'job_delete', _(u'Delete a pending job in a Mayan cluster'))
PERMISSION_WORKER_TERMINATE = Permission.objects.register(namespace, 'worker_terminate', _(u'Terminate a worker processing a job in a Mayan cluster'))

View File

@@ -13,6 +13,7 @@ from .literals import JOB_QUEUE_STATE_STARTED
LOCK_EXPIRE = 10
MAX_CPU_LOAD = 90.0
MAX_MEMORY_USAGE = 90.0
NODE_MAX_WORKERS = 1
logger = logging.getLogger(__name__)
@@ -20,27 +21,28 @@ logger = logging.getLogger(__name__)
def job_queue_poll():
logger.debug('starting')
node = Node.objects.myself()
if node.cpuload < MAX_CPU_LOAD and node.memory_usage < MAX_MEMORY_USAGE:
# Poll job queues if node is not overloaded
lock_id = u'job_queue_poll'
try:
lock = Lock.acquire_lock(lock_id, LOCK_EXPIRE)
except LockError:
pass
except Exception:
lock.release()
raise
else:
# Poll job queues if node is not overloaded
lock_id = u'job_queue_poll'
try:
lock = Lock.acquire_lock(lock_id, LOCK_EXPIRE)
except LockError:
pass
except Exception:
lock.release()
raise
else:
node = Node.objects.myself()
if node.cpuload < MAX_CPU_LOAD and node.memory_usage < MAX_MEMORY_USAGE and node.worker_set.count()<=NODE_MAX_WORKERS:
for job_queue in JobQueue.objects.filter(state=JOB_QUEUE_STATE_STARTED):
try:
job_item = job_queue.get_oldest_pending_job()
job_item.run()
except JobQueueNoPendingJobs:
logger.debug('no pending jobs for job queue: %s' % job_queue)
lock.release()
else:
logger.debug('CPU load or memory usage over limit')
else:
logger.debug('CPU load or memory usage over limit')
lock.release()
@simple_locking('house_keeping', 10)

View File

@@ -14,4 +14,6 @@ urlpatterns = patterns('job_processor.views',
url(r'^job/(?P<job_item_pk>\d+)/requeue/$', 'job_requeue', (), 'job_requeue'),
url(r'^job/(?P<job_item_pk>\d+)/delete/$', 'job_delete', (), 'job_delete'),
url(r'^worker/(?P<worker_pk>\d+)/terminate/$', 'worker_terminate', (), 'worker_terminate'),
)

View File

@@ -15,7 +15,7 @@ from permissions.models import Permission
from .exceptions import JobQueueAlreadyStopped, JobQueueAlreadyStarted
from .forms import JobProcessingConfigForm
from .models import JobQueue, JobProcessingConfig, JobQueueItem
from .models import JobQueue, JobProcessingConfig, JobQueueItem, Worker
from .permissions import (PERMISSION_JOB_QUEUE_VIEW,
PERMISSION_JOB_PROCESSING_CONFIGURATION, PERMISSION_JOB_QUEUE_START_STOP,
PERMISSION_JOB_REQUEUE)
@@ -313,3 +313,34 @@ def job_delete(request, job_item_pk):
'previous': previous,
'form_icon': u'cog_delete.png',
}, context_instance=RequestContext(request))
def worker_terminate(request, worker_pk):
worker = get_object_or_404(Worker, pk=worker_pk)
#try:
# Permission.objects.check_permissions(request.user, [PERMISSION_JOB_REQUEUE])
#except PermissionDenied:
# AccessEntry.objects.check_access(PERMISSION_JOB_REQUEUE, request.user, job_queue)
next = request.POST.get('next', request.GET.get('next', request.META.get('HTTP_REFERER', None)))
previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', None)))
if request.method == 'POST':
try:
worker.terminate()
except Exception, exc:
messages.warning(request, _(u'Error terminating worker; %s.') % exc)
return HttpResponseRedirect(previous)
else:
messages.success(request, _(u'Worker terminated successfully.'))
return HttpResponseRedirect(next)
return render_to_response('generic_confirm.html', {
'object': worker,
'object_name': _(u'worker'),
'title': _(u'Are you sure you wish to terminate worker: %s?') % worker,
'next': next,
'previous': previous,
'form_icon': u'lorry_delete.png',
}, context_instance=RequestContext(request))