diff --git a/apps/job_processor/__init__.py b/apps/job_processor/__init__.py index eaa6339041..b66ada46cf 100644 --- a/apps/job_processor/__init__.py +++ b/apps/job_processor/__init__.py @@ -7,16 +7,21 @@ from navigation.api import bind_links, register_model_list_columns from project_tools.api import register_tool from common.utils import encapsulate -from .tasks import job_queue_poll -from .links import node_workers from clustering.models import Node +from .models import JobQueue +from .tasks import job_queue_poll +from .links import (node_workers, job_queues, tool_link, + job_queue_items_pending, job_queue_items_error, job_queue_items_active) + JOB_QUEUE_POLL_INTERVAL = 1 register_interval_job('job_queue_poll', _(u'Poll a job queue for pending jobs.'), job_queue_poll, seconds=JOB_QUEUE_POLL_INTERVAL) -#register_tool(tool_link) -#bind_links([Node, 'node_list'], [node_list], menu_name='secondary_menu') +register_tool(tool_link) +bind_links([JobQueue, 'job_queues'], [job_queues], menu_name='secondary_menu') +bind_links([JobQueue], [job_queue_items_pending, job_queue_items_active, job_queue_items_error]) + bind_links([Node], [node_workers]) Node.add_to_class('workers', lambda node: node.worker_set) diff --git a/apps/job_processor/links.py b/apps/job_processor/links.py index 5d6dbb44cc..222c95d04c 100644 --- a/apps/job_processor/links.py +++ b/apps/job_processor/links.py @@ -4,12 +4,15 @@ from django.utils.translation import ugettext_lazy as _ from navigation.api import Link -from clustering.permissions import (PERMISSION_NODES_VIEW) +from clustering.permissions import PERMISSION_NODES_VIEW + +from .permissions import PERMISSION_JOB_QUEUE_VIEW node_workers = Link(text=_(u'workers'), view='node_workers', args='object.pk', sprite='lorry_go', permissions=[PERMISSION_NODES_VIEW]) -#index_setup_create = Link(text=_(u'create index'), view='index_setup_create', sprite='tab_add', permissions=[PERMISSION_DOCUMENT_INDEXING_CREATE]) -#index_setup_edit = Link(text=_(u'edit'), view='index_setup_edit', args='index.pk', sprite='tab_edit', permissions=[PERMISSION_DOCUMENT_INDEXING_EDIT]) -#index_setup_delete = Link(text=_(u'delete'), view='index_setup_delete', args='index.pk', sprite='tab_delete', permissions=[PERMISSION_DOCUMENT_INDEXING_DELETE]) -#index_setup_view = Link(text=_(u'tree template'), view='index_setup_view', args='index.pk', sprite='textfield', permissions=[PERMISSION_DOCUMENT_INDEXING_SETUP]) -#index_setup_document_types = Link(text=_(u'document types'), view='index_setup_document_types', args='index.pk', sprite='layout', permissions=[PERMISSION_DOCUMENT_INDEXING_EDIT]) # children_view_regex=[r'^index_setup', r'^template_node']) + +tool_link = Link(text=_(u'job queues'), view='job_queues', icon='hourglass.png', permissions=[PERMISSION_JOB_QUEUE_VIEW]) +job_queues = Link(text=_(u'job queues list'), view='job_queues', sprite='hourglass', permissions=[PERMISSION_JOB_QUEUE_VIEW]) +job_queue_items_pending = Link(text=_(u'pending jobs'), view='job_queue_items_pending', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW]) +job_queue_items_error = Link(text=_(u'error jobs'), view='job_queue_items_error', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW]) +job_queue_items_active = Link(text=_(u'active jobs'), view='job_queue_items_active', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW]) diff --git a/apps/job_processor/migrations/0004_auto__del_field_worker_name__add_field_worker_pid.py b/apps/job_processor/migrations/0004_auto__del_field_worker_name__add_field_worker_pid.py new file mode 100644 index 0000000000..77c4260b45 --- /dev/null +++ b/apps/job_processor/migrations/0004_auto__del_field_worker_name__add_field_worker_pid.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- +import datetime +from south.db import db +from south.v2 import SchemaMigration +from django.db import models + + +class Migration(SchemaMigration): + + def forwards(self, orm): + # Deleting field 'Worker.name' + db.delete_column('job_processor_worker', 'name') + + # Adding field 'Worker.pid' + db.add_column('job_processor_worker', 'pid', + self.gf('django.db.models.fields.PositiveIntegerField')(default=1, max_length=255), + keep_default=False) + + + def backwards(self, orm): + + # User chose to not deal with backwards NULL issues for 'Worker.name' + raise RuntimeError("Cannot reverse this migration. 'Worker.name' and its values cannot be restored.") + # Deleting field 'Worker.pid' + db.delete_column('job_processor_worker', 'pid') + + + models = { + 'clustering.node': { + 'Meta': {'object_name': 'Node'}, + 'cpuload': ('django.db.models.fields.FloatField', [], {'default': '0.0', 'blank': 'True'}), + 'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}), + 'hostname': ('django.db.models.fields.CharField', [], {'max_length': '255'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'memory_usage': ('django.db.models.fields.FloatField', [], {'default': '0.0', 'blank': 'True'}) + }, + 'job_processor.jobqueue': { + 'Meta': {'object_name': 'JobQueue'}, + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'name': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '32'}), + 'unique_jobs': ('django.db.models.fields.BooleanField', [], {'default': 'True'}) + }, + 'job_processor.jobqueueitem': { + 'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'JobQueueItem'}, + 'creation_datetime': ('django.db.models.fields.DateTimeField', [], {}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job_queue': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueue']"}), + 'job_type': ('django.db.models.fields.CharField', [], {'max_length': '32'}), + 'kwargs': ('django.db.models.fields.TextField', [], {}), + 'result': ('django.db.models.fields.TextField', [], {'blank': 'True'}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'p'", 'max_length': '4'}), + 'unique_id': ('django.db.models.fields.CharField', [], {'unique': 'True', 'max_length': '64', 'blank': 'True'}) + }, + 'job_processor.worker': { + 'Meta': {'ordering': "('creation_datetime',)", 'object_name': 'Worker'}, + 'creation_datetime': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)'}), + 'heartbeat': ('django.db.models.fields.DateTimeField', [], {'default': 'datetime.datetime(2012, 7, 30, 0, 0)', 'blank': 'True'}), + 'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), + 'job_queue_item': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['job_processor.JobQueueItem']"}), + 'node': ('django.db.models.fields.related.ForeignKey', [], {'to': "orm['clustering.Node']"}), + 'pid': ('django.db.models.fields.PositiveIntegerField', [], {'max_length': '255'}), + 'state': ('django.db.models.fields.CharField', [], {'default': "'r'", 'max_length': '4'}) + } + } + + complete_apps = ['job_processor'] \ No newline at end of file diff --git a/apps/job_processor/models.py b/apps/job_processor/models.py index 72a9142c51..ed10c670bb 100644 --- a/apps/job_processor/models.py +++ b/apps/job_processor/models.py @@ -35,7 +35,7 @@ class Job(object): # Run sync or launch async subprocess # OR launch 2 processes: monitor & actual process node = Node.objects.myself() - worker = Worker.objects.create(node=node, name=os.getpid(), job_queue_item=job_queue_item) + worker = Worker.objects.create(node=node, pid=os.getpid(), job_queue_item=job_queue_item) try: close_connection() transaction.commit_on_success(function)(**loads(job_queue_item.kwargs)) @@ -62,6 +62,9 @@ class JobType(object): self.function = function job_types_registry[self.name] = self + def __unicode__(self): + return unicode(self.label) + def run(self, job_queue_item, **kwargs): job_queue_item.state = JOB_STATE_PROCESSING job_queue_item.save() @@ -111,6 +114,14 @@ class JobQueue(models.Model): @property def pending_jobs(self): return self.items.filter(state=JOB_STATE_PENDING) + + @property + def error_jobs(self): + return self.items.filter(state=JOB_STATE_ERROR) + + @property + def active_jobs(self): + return self.items.filter(state=JOB_STATE_PROCESSING) @property def items(self): @@ -161,9 +172,19 @@ class JobQueueItem(models.Model): # TODO: Maybe replace instead of rasining exception w/ replace flag raise JobQueuePushError + def get_job_type(self): + return job_types_registry.get(self.job_type) + def run(self): - job_type_instance = job_types_registry.get(self.job_type) + job_type_instance = self.get_job_type() job_type_instance.run(self) + + @property + def worker(self): + try: + return self.worker_set.get() + except Worker.DoesNotExist: + return None class Meta: ordering = ('creation_datetime',) @@ -173,7 +194,7 @@ class JobQueueItem(models.Model): class Worker(models.Model): node = models.ForeignKey(Node, verbose_name=_(u'node')) - name = models.CharField(max_length=255, verbose_name=_(u'name')) + pid = models.PositiveIntegerField(max_length=255, verbose_name=_(u'name')) creation_datetime = models.DateTimeField(verbose_name=_(u'creation datetime'), default=lambda: datetime.datetime.now(), editable=False) heartbeat = models.DateTimeField(blank=True, default=datetime.datetime.now(), verbose_name=_(u'heartbeat check')) state = models.CharField(max_length=4, @@ -183,7 +204,7 @@ class Worker(models.Model): job_queue_item = models.ForeignKey(JobQueueItem, verbose_name=_(u'job queue item')) def __unicode__(self): - return u'%s-%s' % (self.node.hostname, self.name) + return u'%s-%s' % (self.node.hostname, self.pid) #def disable(self): # if self.state == WORKER_STATE_DISABLED: @@ -211,7 +232,6 @@ class Worker(models.Model): class JobProcessingConfig(Singleton): worker_time_to_live = models.PositiveInteger(verbose_name=(u'time to live (in seconds)') # After this time a worker is considered dead worker_heartbeat_interval = models.PositiveInteger(verbose_name=(u'heartbeat interval') - node_heartbeat_interval = models.PositiveInteger(verbose_name=(u'heartbeat interval') def __unicode__(self): return ugettext('Workers configuration') diff --git a/apps/job_processor/permissions.py b/apps/job_processor/permissions.py index 4b5988c48d..74c06b609a 100644 --- a/apps/job_processor/permissions.py +++ b/apps/job_processor/permissions.py @@ -5,4 +5,4 @@ from django.utils.translation import ugettext_lazy as _ from permissions.models import PermissionNamespace, Permission namespace = PermissionNamespace('job_processor', _(u'Job processor')) -#PERMISSION_NODES_VIEW = Permission.objects.register(namespace, 'nodes_view', _(u'View the registeres nodes in a Mayan cluster')) +PERMISSION_JOB_QUEUE_VIEW = Permission.objects.register(namespace, 'job_queue_view', _(u'View the job queues in a Mayan cluster')) diff --git a/apps/job_processor/static/images/icons/hourglass.png b/apps/job_processor/static/images/icons/hourglass.png new file mode 100755 index 0000000000..cd14446b9f Binary files /dev/null and b/apps/job_processor/static/images/icons/hourglass.png differ diff --git a/apps/job_processor/tasks.py b/apps/job_processor/tasks.py index 32280c80a9..fac91c03d3 100644 --- a/apps/job_processor/tasks.py +++ b/apps/job_processor/tasks.py @@ -18,7 +18,7 @@ logger = logging.getLogger(__name__) def job_queue_poll(): logger.debug('starting') - node = Node.objects.myself() # Automatically calls the refresh() method too + node = Node.objects.myself() if node.cpuload < MAX_CPU_LOAD and node.memory_usage < MAX_MEMORY_USAGE: # Poll job queues if node is not overloaded lock_id = u'job_queue_poll' @@ -37,4 +37,7 @@ def job_queue_poll(): except JobQueueNoPendingJobs: logger.debug('no pending jobs for job queue: %s' % job_queue) lock.release() + else: + logger.debug('CPU load or memory usage over limit') + diff --git a/apps/job_processor/urls.py b/apps/job_processor/urls.py index 572d8ce103..d932d04fdc 100644 --- a/apps/job_processor/urls.py +++ b/apps/job_processor/urls.py @@ -2,14 +2,9 @@ from django.conf.urls.defaults import patterns, url urlpatterns = patterns('job_processor.views', - #url(r'^node/list/$', 'node_list', (), 'node_list'), url(r'^node/(?P\d+)/workers/$', 'node_workers', (), 'node_workers'), - #url(r'^create/$', 'folder_create', (), 'folder_create'), - #url(r'^(?P\d+)/edit/$', 'folder_edit', (), 'folder_edit'), - #url(r'^(?P\d+)/delete/$', 'folder_delete', (), 'folder_delete'), - #url(r'^(?P\d+)/$', 'folder_view', (), 'folder_view'), - #url(r'^(?P\d+)/remove/document/multiple/$', 'folder_document_multiple_remove', (), 'folder_document_multiple_remove'), - #url(r'^document/(?P\d+)/folder/add/$', 'folder_add_document', (), 'folder_add_document'), - #url(r'^document/(?P\d+)/folder/list/$', 'document_folder_list', (), 'document_folder_list'), - #url(r'^(?P\d+)/acl/list/$', 'folder_acl_list', (), 'folder_acl_list'), + url(r'^queue/list/$', 'job_queues', (), 'job_queues'), + url(r'^queue/(?P\d+)/items/pending/$', 'job_queue_items', {'pending_jobs': True}, 'job_queue_items_pending'), + url(r'^queue/(?P\d+)/items/error/$', 'job_queue_items', {'error_jobs' :True}, 'job_queue_items_error'), + url(r'^queue/(?P\d+)/items/active/$', 'job_queue_items', {'active_jobs' :True}, 'job_queue_items_active'), ) diff --git a/apps/job_processor/views.py b/apps/job_processor/views.py index b1b5461c0b..101697d46a 100644 --- a/apps/job_processor/views.py +++ b/apps/job_processor/views.py @@ -8,13 +8,15 @@ from django.contrib.contenttypes.models import ContentType from django.db.models.loading import get_model from django.http import Http404 from django.core.exceptions import PermissionDenied - from permissions.models import Permission from common.utils import encapsulate from acls.models import AccessEntry from clustering.permissions import PERMISSION_NODES_VIEW from clustering.models import Node +from .models import JobQueue +from .permissions import PERMISSION_JOB_QUEUE_VIEW + def node_workers(request, node_pk): node = get_object_or_404(Node, pk=node_pk) @@ -28,7 +30,7 @@ def node_workers(request, node_pk): 'object_list': node.workers().all(), 'title': _(u'workers for node: %s') % node, 'object': node, - 'hide_links': True, + 'hide_link': True, 'extra_columns': [ { 'name': _(u'created'), @@ -48,7 +50,11 @@ def node_workers(request, node_pk): }, { 'name': _(u'job type'), - 'attribute': 'job_queue_item.job_type', + 'attribute': 'job_queue_item.get_job_type', + }, + { + 'name': _(u'job queue'), + 'attribute': 'job_queue_item.job_queue', }, ], } @@ -57,11 +63,91 @@ def node_workers(request, node_pk): context_instance=RequestContext(request)) - node = models.ForeignKey(Node, verbose_name=_(u'node')) - name = models.CharField(max_length=255, verbose_name=_(u'name')) - creation_datetime = models.DateTimeField(verbose_name=_(u'creation datetime'), default=lambda: datetime.datetime.now(), editable=False) - heartbeat = models.DateTimeField(blank=True, default=datetime.datetime.now(), verbose_name=_(u'heartbeat check')) - stat#e = models.CharField(max_length=4, - #choices=WORKER_STATE_CHOICES, - #default=WORKER_STATE_RUNNING, - #verbose_name=_(u'state')) +def job_queues(request): + # TODO: permissiong list filtering + Permission.objects.check_permissions(request.user, [PERMISSION_JOB_QUEUE_VIEW]) + + context = { + 'object_list': JobQueue.objects.all(), + 'title': _(u'job queue'), + 'hide_link': True, + 'extra_columns': [ + { + 'name': _(u'pending jobs'), + 'attribute': 'pending_jobs.count', + }, + { + 'name': _(u'active jobs'), + 'attribute': 'active_jobs.count', + }, + { + 'name': _(u'error jobs'), + 'attribute': 'error_jobs.count', + }, + ], + } + + return render_to_response('generic_list.html', context, + context_instance=RequestContext(request)) + + +def job_queue_items(request, job_queue_pk, pending_jobs=False, error_jobs=False, active_jobs=False): + job_queue = get_object_or_404(JobQueue, pk=job_queue_pk) + + try: + Permission.objects.check_permissions(request.user, [PERMISSION_JOB_QUEUE_VIEW]) + except PermissionDenied: + AccessEntry.objects.check_access(PERMISSION_JOB_QUEUE_VIEW, request.user, job_queue) + + jobs = set() + if pending_jobs: + jobs = job_queue.pending_jobs.all() + title = _(u'pending jobs for queue: %s') % job_queue + + if error_jobs: + jobs = job_queue.error_jobs.all() + title = _(u'error jobs for queue: %s') % job_queue + + if active_jobs: + jobs = job_queue.active_jobs.all() + title = _(u'active jobs for queue: %s') % job_queue + + context = { + 'object_list': jobs, + 'title': title, + 'object': job_queue, + 'hide_link': True, + 'extra_columns': [ + { + 'name': _(u'created'), + 'attribute': 'creation_datetime', + }, + { + 'name': _(u'job type'), + 'attribute': 'get_job_type', + }, + { + 'name': _(u'arguments'), + 'attribute': 'kwargs', + }, + ], + } + + if active_jobs: + context['extra_columns'].append( + { + 'name': _(u'worker'), + 'attribute': encapsulate(lambda x: x.worker or _(u'Unknown')), + } + ) + + if error_jobs: + context['extra_columns'].append( + { + 'name': _(u'result'), + 'attribute': 'result', + } + ) + + return render_to_response('generic_list.html', context, + context_instance=RequestContext(request))