diff --git a/apps/job_processor/__init__.py b/apps/job_processor/__init__.py index a9b2542cb0..7ac46626af 100644 --- a/apps/job_processor/__init__.py +++ b/apps/job_processor/__init__.py @@ -15,11 +15,12 @@ from common.utils import encapsulate from clustering.models import Node from clustering.signals import node_died -from .models import JobQueue, JobProcessingConfig +from .models import JobQueue, JobProcessingConfig, JobQueueItem from .tasks import job_queue_poll from .links import (node_workers, job_queues, tool_link, job_queue_items_pending, job_queue_items_error, job_queue_items_active, - job_queue_config_edit, setup_link, job_queue_start, job_queue_stop) + job_queue_config_edit, setup_link, job_queue_start, job_queue_stop, + job_requeue, job_delete) logger = logging.getLogger(__name__) @@ -42,6 +43,7 @@ bind_links([JobQueue, 'job_queues'], [job_queues], menu_name='secondary_menu') bind_links([JobQueue], [job_queue_start, job_queue_stop, job_queue_items_pending, job_queue_items_active, job_queue_items_error]) bind_links([Node], [node_workers]) bind_links(['job_queue_config_edit'], [job_queue_config_edit], menu_name='secondary_menu') +bind_links([JobQueueItem], [job_requeue, job_delete]) Node.add_to_class('workers', lambda node: node.worker_set) diff --git a/apps/job_processor/links.py b/apps/job_processor/links.py index b17ee06312..05ce923f7f 100644 --- a/apps/job_processor/links.py +++ b/apps/job_processor/links.py @@ -7,25 +7,38 @@ from navigation.api import Link from clustering.permissions import PERMISSION_NODES_VIEW from .permissions import (PERMISSION_JOB_QUEUE_VIEW, - PERMISSION_JOB_PROCESSING_CONFIGURATION, PERMISSION_JOB_QUEUE_START_STOP) + PERMISSION_JOB_PROCESSING_CONFIGURATION, PERMISSION_JOB_QUEUE_START_STOP, + PERMISSION_JOB_REQUEUE, PERMISSION_JOB_DELETE) def is_running(context): return context['object'].is_running() + def is_not_running(context): return not context['object'].is_running() +def is_in_error_state(context): + return context['object'].is_in_error_state + + +def is_in_pending_state(context): + return context['object'].is_in_pending_state + + node_workers = Link(text=_(u'workers'), view='node_workers', args='object.pk', sprite='lorry_go', permissions=[PERMISSION_NODES_VIEW]) tool_link = Link(text=_(u'job queues'), view='job_queues', icon='hourglass.png', permissions=[PERMISSION_JOB_QUEUE_VIEW]) job_queues = Link(text=_(u'job queue list'), view='job_queues', sprite='hourglass', permissions=[PERMISSION_JOB_QUEUE_VIEW]) -job_queue_items_pending = Link(text=_(u'pending jobs'), view='job_queue_items_pending', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW]) -job_queue_items_error = Link(text=_(u'error jobs'), view='job_queue_items_error', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW]) -job_queue_items_active = Link(text=_(u'active jobs'), view='job_queue_items_active', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW]) +job_queue_items_pending = Link(text=_(u'pending jobs'), view='job_queue_items_pending', args='object.pk', sprite='cog', permissions=[PERMISSION_JOB_QUEUE_VIEW]) +job_queue_items_error = Link(text=_(u'error jobs'), view='job_queue_items_error', args='object.pk', sprite='cog_error', permissions=[PERMISSION_JOB_QUEUE_VIEW]) +job_queue_items_active = Link(text=_(u'active jobs'), view='job_queue_items_active', args='object.pk', sprite='cog', permissions=[PERMISSION_JOB_QUEUE_VIEW]) job_queue_start = Link(text=_(u'start'), view='job_queue_start', args='object.pk', sprite='control_play_blue', permissions=[PERMISSION_JOB_QUEUE_START_STOP], condition=is_not_running) job_queue_stop = Link(text=_(u'stop'), view='job_queue_stop', args='object.pk', sprite='control_stop_blue', permissions=[PERMISSION_JOB_QUEUE_START_STOP], condition=is_running) job_queue_config_edit = Link(text=_(u'edit job processing configuration'), view='job_queue_config_edit', sprite='hourglass', permissions=[PERMISSION_JOB_PROCESSING_CONFIGURATION]) setup_link = Link(text=_(u'job processing configuration'), view='job_queue_config_edit', icon='hourglass.png', permissions=[PERMISSION_JOB_PROCESSING_CONFIGURATION]) + +job_requeue = Link(text=_(u'requeue job'), view='job_requeue', args='object.pk', sprite='cog_add', permissions=[PERMISSION_JOB_REQUEUE], condition=is_in_error_state) +job_delete = Link(text=_(u'delete job'), view='job_delete', args='object.pk', sprite='cog_delete', permissions=[PERMISSION_JOB_DELETE], condition=is_in_pending_state) diff --git a/apps/job_processor/models.py b/apps/job_processor/models.py index b9a5b0acb8..d27e1629b4 100644 --- a/apps/job_processor/models.py +++ b/apps/job_processor/models.py @@ -180,12 +180,13 @@ class JobQueueItem(models.Model): return self.unique_id def save(self, *args, **kwargs): - self.creation_datetime = datetime.datetime.now() + if not self.pk: + self.creation_datetime = datetime.datetime.now() + if self.job_queue.unique_jobs: + self.unique_id = hashlib.sha256(u'%s-%s' % (self.job_type, self.kwargs)).hexdigest() + else: + self.unique_id = unicode(uuid.uuid4()) - if self.job_queue.unique_jobs: - self.unique_id = hashlib.sha256(u'%s-%s' % (self.job_type, self.kwargs)).hexdigest() - else: - self.unique_id = unicode(uuid.uuid4()) try: super(JobQueueItem, self).save(*args, **kwargs) except IntegrityError: @@ -206,6 +207,20 @@ class JobQueueItem(models.Model): except Worker.DoesNotExist: return None + @property + def is_in_error_state(self): + return self.state == JOB_STATE_ERROR + + @property + def is_in_pending_state(self): + return self.state == JOB_STATE_PENDING + + def requeue(self): + if self.is_in_error_state: + self.state = JOB_STATE_PENDING + self.creation_datetime = datetime.datetime.now() + self.save() + class Meta: ordering = ('creation_datetime',) verbose_name = _(u'job queue item') diff --git a/apps/job_processor/permissions.py b/apps/job_processor/permissions.py index 4ae086c630..bf1544a702 100644 --- a/apps/job_processor/permissions.py +++ b/apps/job_processor/permissions.py @@ -8,3 +8,5 @@ namespace = PermissionNamespace('job_processor', _(u'Job processor')) PERMISSION_JOB_QUEUE_VIEW = Permission.objects.register(namespace, 'job_queue_view', _(u'View the job queues in a Mayan cluster')) PERMISSION_JOB_PROCESSING_CONFIGURATION = Permission.objects.register(namespace, 'job_processing_edit', _(u'Edit the the job processing configuration in a Mayan cluster')) PERMISSION_JOB_QUEUE_START_STOP = Permission.objects.register(namespace, 'job_queue_start_stop', _(u'Can start and stop a job queue in a Mayan cluster')) +PERMISSION_JOB_REQUEUE = Permission.objects.register(namespace, 'job_requeue', _(u'Requeue a job in a Mayan cluster')) +PERMISSION_JOB_DELETE = Permission.objects.register(namespace, 'job_delete', _(u'Delete a pending job in a Mayan cluster')) diff --git a/apps/job_processor/static/images/icons/cog_add.png b/apps/job_processor/static/images/icons/cog_add.png new file mode 100644 index 0000000000..b79b69d051 Binary files /dev/null and b/apps/job_processor/static/images/icons/cog_add.png differ diff --git a/apps/job_processor/static/images/icons/cog_delete.png b/apps/job_processor/static/images/icons/cog_delete.png new file mode 100644 index 0000000000..924e04d69c Binary files /dev/null and b/apps/job_processor/static/images/icons/cog_delete.png differ diff --git a/apps/job_processor/urls.py b/apps/job_processor/urls.py index 75b38fe3e2..9ec28f627d 100644 --- a/apps/job_processor/urls.py +++ b/apps/job_processor/urls.py @@ -11,4 +11,7 @@ urlpatterns = patterns('job_processor.views', url(r'^queue/(?P\d+)/stop/$', 'job_queue_stop', (), 'job_queue_stop'), url(r'^config/edit/$', 'job_queue_config_edit', (), 'job_queue_config_edit'), + + url(r'^job/(?P\d+)/requeue/$', 'job_requeue', (), 'job_requeue'), + url(r'^job/(?P\d+)/delete/$', 'job_delete', (), 'job_delete'), ) diff --git a/apps/job_processor/views.py b/apps/job_processor/views.py index 5feb983311..4d802e9b93 100644 --- a/apps/job_processor/views.py +++ b/apps/job_processor/views.py @@ -15,9 +15,10 @@ from permissions.models import Permission from .exceptions import JobQueueAlreadyStopped, JobQueueAlreadyStarted from .forms import JobProcessingConfigForm -from .models import JobQueue, JobProcessingConfig +from .models import JobQueue, JobProcessingConfig, JobQueueItem from .permissions import (PERMISSION_JOB_QUEUE_VIEW, - PERMISSION_JOB_PROCESSING_CONFIGURATION, PERMISSION_JOB_QUEUE_START_STOP) + PERMISSION_JOB_PROCESSING_CONFIGURATION, PERMISSION_JOB_QUEUE_START_STOP, + PERMISSION_JOB_REQUEUE) def node_workers(request, node_pk): @@ -250,3 +251,65 @@ def job_queue_start(request, job_queue_pk): 'previous': previous, 'form_icon': u'control_play_blue.png', }, context_instance=RequestContext(request)) + + +def job_requeue(request, job_item_pk): + job = get_object_or_404(JobQueueItem, pk=job_item_pk) + + #try: + # Permission.objects.check_permissions(request.user, [PERMISSION_JOB_REQUEUE]) + #except PermissionDenied: + # AccessEntry.objects.check_access(PERMISSION_JOB_REQUEUE, request.user, job_queue) + + next = request.POST.get('next', request.GET.get('next', request.META.get('HTTP_REFERER', None))) + previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', None))) + + if request.method == 'POST': + #try: + job.requeue() + #except JobQueueAlreadyStarted: + # messages.warning(request, _(u'job ueue already started.')) + # return HttpResponseRedirect(previous) + #else: + messages.success(request, _(u'Job requeue successfully.')) + return HttpResponseRedirect(next) + + return render_to_response('generic_confirm.html', { + 'object': job, + 'object_name': _(u'job'), + 'title': _(u'Are you sure you wish to requeue job: %s?') % job, + 'next': next, + 'previous': previous, + 'form_icon': u'cog_add.png', + }, context_instance=RequestContext(request)) + + +def job_delete(request, job_item_pk): + job = get_object_or_404(JobQueueItem, pk=job_item_pk) + + #try: + # Permission.objects.check_permissions(request.user, [PERMISSION_JOB_REQUEUE]) + #except PermissionDenied: + # AccessEntry.objects.check_access(PERMISSION_JOB_REQUEUE, request.user, job_queue) + + next = request.POST.get('next', request.GET.get('next', request.META.get('HTTP_REFERER', None))) + previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', None))) + + if request.method == 'POST': + try: + job.delete() + except Exception, exc: + messages.warning(request, _(u'Error deleting job; %s.') % exc) + return HttpResponseRedirect(previous) + else: + messages.success(request, _(u'Job deleted successfully.')) + return HttpResponseRedirect(next) + + return render_to_response('generic_confirm.html', { + 'object': job, + 'object_name': _(u'job'), + 'title': _(u'Are you sure you wish to delete job: %s?') % job, + 'next': next, + 'previous': previous, + 'form_icon': u'cog_delete.png', + }, context_instance=RequestContext(request))