Add initial support for deleting pending jobs and requeuing failed jobs

This commit is contained in:
Roberto Rosario
2012-08-03 13:35:06 -04:00
parent d7ddbc2c62
commit b812a68777
8 changed files with 111 additions and 13 deletions

View File

@@ -15,11 +15,12 @@ from common.utils import encapsulate
from clustering.models import Node
from clustering.signals import node_died
from .models import JobQueue, JobProcessingConfig
from .models import JobQueue, JobProcessingConfig, JobQueueItem
from .tasks import job_queue_poll
from .links import (node_workers, job_queues, tool_link,
job_queue_items_pending, job_queue_items_error, job_queue_items_active,
job_queue_config_edit, setup_link, job_queue_start, job_queue_stop)
job_queue_config_edit, setup_link, job_queue_start, job_queue_stop,
job_requeue, job_delete)
logger = logging.getLogger(__name__)
@@ -42,6 +43,7 @@ bind_links([JobQueue, 'job_queues'], [job_queues], menu_name='secondary_menu')
bind_links([JobQueue], [job_queue_start, job_queue_stop, job_queue_items_pending, job_queue_items_active, job_queue_items_error])
bind_links([Node], [node_workers])
bind_links(['job_queue_config_edit'], [job_queue_config_edit], menu_name='secondary_menu')
bind_links([JobQueueItem], [job_requeue, job_delete])
Node.add_to_class('workers', lambda node: node.worker_set)

View File

@@ -7,25 +7,38 @@ from navigation.api import Link
from clustering.permissions import PERMISSION_NODES_VIEW
from .permissions import (PERMISSION_JOB_QUEUE_VIEW,
PERMISSION_JOB_PROCESSING_CONFIGURATION, PERMISSION_JOB_QUEUE_START_STOP)
PERMISSION_JOB_PROCESSING_CONFIGURATION, PERMISSION_JOB_QUEUE_START_STOP,
PERMISSION_JOB_REQUEUE, PERMISSION_JOB_DELETE)
def is_running(context):
return context['object'].is_running()
def is_not_running(context):
return not context['object'].is_running()
def is_in_error_state(context):
return context['object'].is_in_error_state
def is_in_pending_state(context):
return context['object'].is_in_pending_state
node_workers = Link(text=_(u'workers'), view='node_workers', args='object.pk', sprite='lorry_go', permissions=[PERMISSION_NODES_VIEW])
tool_link = Link(text=_(u'job queues'), view='job_queues', icon='hourglass.png', permissions=[PERMISSION_JOB_QUEUE_VIEW])
job_queues = Link(text=_(u'job queue list'), view='job_queues', sprite='hourglass', permissions=[PERMISSION_JOB_QUEUE_VIEW])
job_queue_items_pending = Link(text=_(u'pending jobs'), view='job_queue_items_pending', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW])
job_queue_items_error = Link(text=_(u'error jobs'), view='job_queue_items_error', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW])
job_queue_items_active = Link(text=_(u'active jobs'), view='job_queue_items_active', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW])
job_queue_items_pending = Link(text=_(u'pending jobs'), view='job_queue_items_pending', args='object.pk', sprite='cog', permissions=[PERMISSION_JOB_QUEUE_VIEW])
job_queue_items_error = Link(text=_(u'error jobs'), view='job_queue_items_error', args='object.pk', sprite='cog_error', permissions=[PERMISSION_JOB_QUEUE_VIEW])
job_queue_items_active = Link(text=_(u'active jobs'), view='job_queue_items_active', args='object.pk', sprite='cog', permissions=[PERMISSION_JOB_QUEUE_VIEW])
job_queue_start = Link(text=_(u'start'), view='job_queue_start', args='object.pk', sprite='control_play_blue', permissions=[PERMISSION_JOB_QUEUE_START_STOP], condition=is_not_running)
job_queue_stop = Link(text=_(u'stop'), view='job_queue_stop', args='object.pk', sprite='control_stop_blue', permissions=[PERMISSION_JOB_QUEUE_START_STOP], condition=is_running)
job_queue_config_edit = Link(text=_(u'edit job processing configuration'), view='job_queue_config_edit', sprite='hourglass', permissions=[PERMISSION_JOB_PROCESSING_CONFIGURATION])
setup_link = Link(text=_(u'job processing configuration'), view='job_queue_config_edit', icon='hourglass.png', permissions=[PERMISSION_JOB_PROCESSING_CONFIGURATION])
job_requeue = Link(text=_(u'requeue job'), view='job_requeue', args='object.pk', sprite='cog_add', permissions=[PERMISSION_JOB_REQUEUE], condition=is_in_error_state)
job_delete = Link(text=_(u'delete job'), view='job_delete', args='object.pk', sprite='cog_delete', permissions=[PERMISSION_JOB_DELETE], condition=is_in_pending_state)

View File

@@ -180,12 +180,13 @@ class JobQueueItem(models.Model):
return self.unique_id
def save(self, *args, **kwargs):
self.creation_datetime = datetime.datetime.now()
if not self.pk:
self.creation_datetime = datetime.datetime.now()
if self.job_queue.unique_jobs:
self.unique_id = hashlib.sha256(u'%s-%s' % (self.job_type, self.kwargs)).hexdigest()
else:
self.unique_id = unicode(uuid.uuid4())
if self.job_queue.unique_jobs:
self.unique_id = hashlib.sha256(u'%s-%s' % (self.job_type, self.kwargs)).hexdigest()
else:
self.unique_id = unicode(uuid.uuid4())
try:
super(JobQueueItem, self).save(*args, **kwargs)
except IntegrityError:
@@ -206,6 +207,20 @@ class JobQueueItem(models.Model):
except Worker.DoesNotExist:
return None
@property
def is_in_error_state(self):
return self.state == JOB_STATE_ERROR
@property
def is_in_pending_state(self):
return self.state == JOB_STATE_PENDING
def requeue(self):
if self.is_in_error_state:
self.state = JOB_STATE_PENDING
self.creation_datetime = datetime.datetime.now()
self.save()
class Meta:
ordering = ('creation_datetime',)
verbose_name = _(u'job queue item')

View File

@@ -8,3 +8,5 @@ namespace = PermissionNamespace('job_processor', _(u'Job processor'))
PERMISSION_JOB_QUEUE_VIEW = Permission.objects.register(namespace, 'job_queue_view', _(u'View the job queues in a Mayan cluster'))
PERMISSION_JOB_PROCESSING_CONFIGURATION = Permission.objects.register(namespace, 'job_processing_edit', _(u'Edit the the job processing configuration in a Mayan cluster'))
PERMISSION_JOB_QUEUE_START_STOP = Permission.objects.register(namespace, 'job_queue_start_stop', _(u'Can start and stop a job queue in a Mayan cluster'))
PERMISSION_JOB_REQUEUE = Permission.objects.register(namespace, 'job_requeue', _(u'Requeue a job in a Mayan cluster'))
PERMISSION_JOB_DELETE = Permission.objects.register(namespace, 'job_delete', _(u'Delete a pending job in a Mayan cluster'))

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.2 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.2 KiB

View File

@@ -11,4 +11,7 @@ urlpatterns = patterns('job_processor.views',
url(r'^queue/(?P<job_queue_pk>\d+)/stop/$', 'job_queue_stop', (), 'job_queue_stop'),
url(r'^config/edit/$', 'job_queue_config_edit', (), 'job_queue_config_edit'),
url(r'^job/(?P<job_item_pk>\d+)/requeue/$', 'job_requeue', (), 'job_requeue'),
url(r'^job/(?P<job_item_pk>\d+)/delete/$', 'job_delete', (), 'job_delete'),
)

View File

@@ -15,9 +15,10 @@ from permissions.models import Permission
from .exceptions import JobQueueAlreadyStopped, JobQueueAlreadyStarted
from .forms import JobProcessingConfigForm
from .models import JobQueue, JobProcessingConfig
from .models import JobQueue, JobProcessingConfig, JobQueueItem
from .permissions import (PERMISSION_JOB_QUEUE_VIEW,
PERMISSION_JOB_PROCESSING_CONFIGURATION, PERMISSION_JOB_QUEUE_START_STOP)
PERMISSION_JOB_PROCESSING_CONFIGURATION, PERMISSION_JOB_QUEUE_START_STOP,
PERMISSION_JOB_REQUEUE)
def node_workers(request, node_pk):
@@ -250,3 +251,65 @@ def job_queue_start(request, job_queue_pk):
'previous': previous,
'form_icon': u'control_play_blue.png',
}, context_instance=RequestContext(request))
def job_requeue(request, job_item_pk):
job = get_object_or_404(JobQueueItem, pk=job_item_pk)
#try:
# Permission.objects.check_permissions(request.user, [PERMISSION_JOB_REQUEUE])
#except PermissionDenied:
# AccessEntry.objects.check_access(PERMISSION_JOB_REQUEUE, request.user, job_queue)
next = request.POST.get('next', request.GET.get('next', request.META.get('HTTP_REFERER', None)))
previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', None)))
if request.method == 'POST':
#try:
job.requeue()
#except JobQueueAlreadyStarted:
# messages.warning(request, _(u'job ueue already started.'))
# return HttpResponseRedirect(previous)
#else:
messages.success(request, _(u'Job requeue successfully.'))
return HttpResponseRedirect(next)
return render_to_response('generic_confirm.html', {
'object': job,
'object_name': _(u'job'),
'title': _(u'Are you sure you wish to requeue job: %s?') % job,
'next': next,
'previous': previous,
'form_icon': u'cog_add.png',
}, context_instance=RequestContext(request))
def job_delete(request, job_item_pk):
job = get_object_or_404(JobQueueItem, pk=job_item_pk)
#try:
# Permission.objects.check_permissions(request.user, [PERMISSION_JOB_REQUEUE])
#except PermissionDenied:
# AccessEntry.objects.check_access(PERMISSION_JOB_REQUEUE, request.user, job_queue)
next = request.POST.get('next', request.GET.get('next', request.META.get('HTTP_REFERER', None)))
previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', None)))
if request.method == 'POST':
try:
job.delete()
except Exception, exc:
messages.warning(request, _(u'Error deleting job; %s.') % exc)
return HttpResponseRedirect(previous)
else:
messages.success(request, _(u'Job deleted successfully.'))
return HttpResponseRedirect(next)
return render_to_response('generic_confirm.html', {
'object': job,
'object_name': _(u'job'),
'title': _(u'Are you sure you wish to delete job: %s?') % job,
'next': next,
'previous': previous,
'form_icon': u'cog_delete.png',
}, context_instance=RequestContext(request))