diff --git a/apps/job_processor/__init__.py b/apps/job_processor/__init__.py index 5fd9ef110c..cb2fa4b4d6 100644 --- a/apps/job_processor/__init__.py +++ b/apps/job_processor/__init__.py @@ -23,7 +23,7 @@ from .tasks import job_queue_poll, house_keeping from .links import (node_workers, job_queues, tool_link, job_queue_items_pending, job_queue_items_error, job_queue_items_active, job_queue_config_edit, setup_link, job_queue_start, job_queue_stop, - job_requeue, job_delete) + job_requeue, job_delete, worker_terminate) logger = logging.getLogger(__name__) @@ -48,6 +48,7 @@ bind_links([JobQueue], [job_queue_start, job_queue_stop, job_queue_items_pending bind_links([Node], [node_workers]) bind_links(['job_queue_config_edit'], [job_queue_config_edit], menu_name='secondary_menu') bind_links([JobQueueItem], [job_requeue, job_delete]) +bind_links([Worker], [worker_terminate]) Node.add_to_class('workers', lambda node: node.worker_set) diff --git a/apps/job_processor/links.py b/apps/job_processor/links.py index 05ce923f7f..7c12e04de1 100644 --- a/apps/job_processor/links.py +++ b/apps/job_processor/links.py @@ -8,7 +8,7 @@ from clustering.permissions import PERMISSION_NODES_VIEW from .permissions import (PERMISSION_JOB_QUEUE_VIEW, PERMISSION_JOB_PROCESSING_CONFIGURATION, PERMISSION_JOB_QUEUE_START_STOP, - PERMISSION_JOB_REQUEUE, PERMISSION_JOB_DELETE) + PERMISSION_JOB_REQUEUE, PERMISSION_JOB_DELETE, PERMISSION_WORKER_TERMINATE) def is_running(context): @@ -42,3 +42,5 @@ setup_link = Link(text=_(u'job processing configuration'), view='job_queue_confi job_requeue = Link(text=_(u'requeue job'), view='job_requeue', args='object.pk', sprite='cog_add', permissions=[PERMISSION_JOB_REQUEUE], condition=is_in_error_state) job_delete = Link(text=_(u'delete job'), view='job_delete', args='object.pk', sprite='cog_delete', permissions=[PERMISSION_JOB_DELETE], condition=is_in_pending_state) + +worker_terminate = Link(text=_(u'terminate worker'), view='worker_terminate', args='object.pk', sprite='lorry_delete', permissions=[PERMISSION_WORKER_TERMINATE]) diff --git a/apps/job_processor/models.py b/apps/job_processor/models.py index 70e9986f6d..0bfdc069e3 100644 --- a/apps/job_processor/models.py +++ b/apps/job_processor/models.py @@ -262,6 +262,15 @@ class Worker(models.Model): def __unicode__(self): return u'%s-%s' % (self.node.hostname, self.pid) + def terminate(self): + try: + process = psutil.Process(int(self.pid)) + except psutil.error.NoSuchProcess: + # Process must have finished before reaching this point + return + else: + process.terminate() + class Meta: ordering = ('creation_datetime',) verbose_name = _(u'worker') diff --git a/apps/job_processor/permissions.py b/apps/job_processor/permissions.py index bf1544a702..e8e26d61d8 100644 --- a/apps/job_processor/permissions.py +++ b/apps/job_processor/permissions.py @@ -10,3 +10,4 @@ PERMISSION_JOB_PROCESSING_CONFIGURATION = Permission.objects.register(namespace, PERMISSION_JOB_QUEUE_START_STOP = Permission.objects.register(namespace, 'job_queue_start_stop', _(u'Can start and stop a job queue in a Mayan cluster')) PERMISSION_JOB_REQUEUE = Permission.objects.register(namespace, 'job_requeue', _(u'Requeue a job in a Mayan cluster')) PERMISSION_JOB_DELETE = Permission.objects.register(namespace, 'job_delete', _(u'Delete a pending job in a Mayan cluster')) +PERMISSION_WORKER_TERMINATE = Permission.objects.register(namespace, 'worker_terminate', _(u'Terminate a worker processing a job in a Mayan cluster')) diff --git a/apps/job_processor/tasks.py b/apps/job_processor/tasks.py index 954fb7b0cc..b3794d6970 100644 --- a/apps/job_processor/tasks.py +++ b/apps/job_processor/tasks.py @@ -13,6 +13,7 @@ from .literals import JOB_QUEUE_STATE_STARTED LOCK_EXPIRE = 10 MAX_CPU_LOAD = 90.0 MAX_MEMORY_USAGE = 90.0 +NODE_MAX_WORKERS = 1 logger = logging.getLogger(__name__) @@ -20,27 +21,28 @@ logger = logging.getLogger(__name__) def job_queue_poll(): logger.debug('starting') - node = Node.objects.myself() - if node.cpuload < MAX_CPU_LOAD and node.memory_usage < MAX_MEMORY_USAGE: - # Poll job queues if node is not overloaded - lock_id = u'job_queue_poll' - try: - lock = Lock.acquire_lock(lock_id, LOCK_EXPIRE) - except LockError: - pass - except Exception: - lock.release() - raise - else: + # Poll job queues if node is not overloaded + lock_id = u'job_queue_poll' + try: + lock = Lock.acquire_lock(lock_id, LOCK_EXPIRE) + except LockError: + pass + except Exception: + lock.release() + raise + else: + node = Node.objects.myself() + if node.cpuload < MAX_CPU_LOAD and node.memory_usage < MAX_MEMORY_USAGE and node.worker_set.count()<=NODE_MAX_WORKERS: for job_queue in JobQueue.objects.filter(state=JOB_QUEUE_STATE_STARTED): try: job_item = job_queue.get_oldest_pending_job() job_item.run() except JobQueueNoPendingJobs: logger.debug('no pending jobs for job queue: %s' % job_queue) - lock.release() - else: - logger.debug('CPU load or memory usage over limit') + else: + logger.debug('CPU load or memory usage over limit') + + lock.release() @simple_locking('house_keeping', 10) diff --git a/apps/job_processor/urls.py b/apps/job_processor/urls.py index 9ec28f627d..83cb4716dc 100644 --- a/apps/job_processor/urls.py +++ b/apps/job_processor/urls.py @@ -14,4 +14,6 @@ urlpatterns = patterns('job_processor.views', url(r'^job/(?P\d+)/requeue/$', 'job_requeue', (), 'job_requeue'), url(r'^job/(?P\d+)/delete/$', 'job_delete', (), 'job_delete'), + + url(r'^worker/(?P\d+)/terminate/$', 'worker_terminate', (), 'worker_terminate'), ) diff --git a/apps/job_processor/views.py b/apps/job_processor/views.py index 4d802e9b93..f3b0e5a8ce 100644 --- a/apps/job_processor/views.py +++ b/apps/job_processor/views.py @@ -15,7 +15,7 @@ from permissions.models import Permission from .exceptions import JobQueueAlreadyStopped, JobQueueAlreadyStarted from .forms import JobProcessingConfigForm -from .models import JobQueue, JobProcessingConfig, JobQueueItem +from .models import JobQueue, JobProcessingConfig, JobQueueItem, Worker from .permissions import (PERMISSION_JOB_QUEUE_VIEW, PERMISSION_JOB_PROCESSING_CONFIGURATION, PERMISSION_JOB_QUEUE_START_STOP, PERMISSION_JOB_REQUEUE) @@ -313,3 +313,34 @@ def job_delete(request, job_item_pk): 'previous': previous, 'form_icon': u'cog_delete.png', }, context_instance=RequestContext(request)) + + +def worker_terminate(request, worker_pk): + worker = get_object_or_404(Worker, pk=worker_pk) + + #try: + # Permission.objects.check_permissions(request.user, [PERMISSION_JOB_REQUEUE]) + #except PermissionDenied: + # AccessEntry.objects.check_access(PERMISSION_JOB_REQUEUE, request.user, job_queue) + + next = request.POST.get('next', request.GET.get('next', request.META.get('HTTP_REFERER', None))) + previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', None))) + + if request.method == 'POST': + try: + worker.terminate() + except Exception, exc: + messages.warning(request, _(u'Error terminating worker; %s.') % exc) + return HttpResponseRedirect(previous) + else: + messages.success(request, _(u'Worker terminated successfully.')) + return HttpResponseRedirect(next) + + return render_to_response('generic_confirm.html', { + 'object': worker, + 'object_name': _(u'worker'), + 'title': _(u'Are you sure you wish to terminate worker: %s?') % worker, + 'next': next, + 'previous': previous, + 'form_icon': u'lorry_delete.png', + }, context_instance=RequestContext(request))