From b812a6877791c261936bfa41d01c2b743704d9d2 Mon Sep 17 00:00:00 2001 From: Roberto Rosario Date: Fri, 3 Aug 2012 13:35:06 -0400 Subject: [PATCH] Add initial support for deleting pending jobs and requeuing failed jobs --- apps/job_processor/__init__.py | 6 +- apps/job_processor/links.py | 21 ++++-- apps/job_processor/models.py | 25 +++++-- apps/job_processor/permissions.py | 2 + .../static/images/icons/cog_add.png | Bin 0 -> 2266 bytes .../static/images/icons/cog_delete.png | Bin 0 -> 2273 bytes apps/job_processor/urls.py | 3 + apps/job_processor/views.py | 67 +++++++++++++++++- 8 files changed, 111 insertions(+), 13 deletions(-) create mode 100644 apps/job_processor/static/images/icons/cog_add.png create mode 100644 apps/job_processor/static/images/icons/cog_delete.png diff --git a/apps/job_processor/__init__.py b/apps/job_processor/__init__.py index a9b2542cb0..7ac46626af 100644 --- a/apps/job_processor/__init__.py +++ b/apps/job_processor/__init__.py @@ -15,11 +15,12 @@ from common.utils import encapsulate from clustering.models import Node from clustering.signals import node_died -from .models import JobQueue, JobProcessingConfig +from .models import JobQueue, JobProcessingConfig, JobQueueItem from .tasks import job_queue_poll from .links import (node_workers, job_queues, tool_link, job_queue_items_pending, job_queue_items_error, job_queue_items_active, - job_queue_config_edit, setup_link, job_queue_start, job_queue_stop) + job_queue_config_edit, setup_link, job_queue_start, job_queue_stop, + job_requeue, job_delete) logger = logging.getLogger(__name__) @@ -42,6 +43,7 @@ bind_links([JobQueue, 'job_queues'], [job_queues], menu_name='secondary_menu') bind_links([JobQueue], [job_queue_start, job_queue_stop, job_queue_items_pending, job_queue_items_active, job_queue_items_error]) bind_links([Node], [node_workers]) bind_links(['job_queue_config_edit'], [job_queue_config_edit], menu_name='secondary_menu') +bind_links([JobQueueItem], [job_requeue, job_delete]) Node.add_to_class('workers', lambda node: node.worker_set) diff --git a/apps/job_processor/links.py b/apps/job_processor/links.py index b17ee06312..05ce923f7f 100644 --- a/apps/job_processor/links.py +++ b/apps/job_processor/links.py @@ -7,25 +7,38 @@ from navigation.api import Link from clustering.permissions import PERMISSION_NODES_VIEW from .permissions import (PERMISSION_JOB_QUEUE_VIEW, - PERMISSION_JOB_PROCESSING_CONFIGURATION, PERMISSION_JOB_QUEUE_START_STOP) + PERMISSION_JOB_PROCESSING_CONFIGURATION, PERMISSION_JOB_QUEUE_START_STOP, + PERMISSION_JOB_REQUEUE, PERMISSION_JOB_DELETE) def is_running(context): return context['object'].is_running() + def is_not_running(context): return not context['object'].is_running() +def is_in_error_state(context): + return context['object'].is_in_error_state + + +def is_in_pending_state(context): + return context['object'].is_in_pending_state + + node_workers = Link(text=_(u'workers'), view='node_workers', args='object.pk', sprite='lorry_go', permissions=[PERMISSION_NODES_VIEW]) tool_link = Link(text=_(u'job queues'), view='job_queues', icon='hourglass.png', permissions=[PERMISSION_JOB_QUEUE_VIEW]) job_queues = Link(text=_(u'job queue list'), view='job_queues', sprite='hourglass', permissions=[PERMISSION_JOB_QUEUE_VIEW]) -job_queue_items_pending = Link(text=_(u'pending jobs'), view='job_queue_items_pending', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW]) -job_queue_items_error = Link(text=_(u'error jobs'), view='job_queue_items_error', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW]) -job_queue_items_active = Link(text=_(u'active jobs'), view='job_queue_items_active', args='object.pk', sprite='text_list_bullets', permissions=[PERMISSION_JOB_QUEUE_VIEW]) +job_queue_items_pending = Link(text=_(u'pending jobs'), view='job_queue_items_pending', args='object.pk', sprite='cog', permissions=[PERMISSION_JOB_QUEUE_VIEW]) +job_queue_items_error = Link(text=_(u'error jobs'), view='job_queue_items_error', args='object.pk', sprite='cog_error', permissions=[PERMISSION_JOB_QUEUE_VIEW]) +job_queue_items_active = Link(text=_(u'active jobs'), view='job_queue_items_active', args='object.pk', sprite='cog', permissions=[PERMISSION_JOB_QUEUE_VIEW]) job_queue_start = Link(text=_(u'start'), view='job_queue_start', args='object.pk', sprite='control_play_blue', permissions=[PERMISSION_JOB_QUEUE_START_STOP], condition=is_not_running) job_queue_stop = Link(text=_(u'stop'), view='job_queue_stop', args='object.pk', sprite='control_stop_blue', permissions=[PERMISSION_JOB_QUEUE_START_STOP], condition=is_running) job_queue_config_edit = Link(text=_(u'edit job processing configuration'), view='job_queue_config_edit', sprite='hourglass', permissions=[PERMISSION_JOB_PROCESSING_CONFIGURATION]) setup_link = Link(text=_(u'job processing configuration'), view='job_queue_config_edit', icon='hourglass.png', permissions=[PERMISSION_JOB_PROCESSING_CONFIGURATION]) + +job_requeue = Link(text=_(u'requeue job'), view='job_requeue', args='object.pk', sprite='cog_add', permissions=[PERMISSION_JOB_REQUEUE], condition=is_in_error_state) +job_delete = Link(text=_(u'delete job'), view='job_delete', args='object.pk', sprite='cog_delete', permissions=[PERMISSION_JOB_DELETE], condition=is_in_pending_state) diff --git a/apps/job_processor/models.py b/apps/job_processor/models.py index b9a5b0acb8..d27e1629b4 100644 --- a/apps/job_processor/models.py +++ b/apps/job_processor/models.py @@ -180,12 +180,13 @@ class JobQueueItem(models.Model): return self.unique_id def save(self, *args, **kwargs): - self.creation_datetime = datetime.datetime.now() + if not self.pk: + self.creation_datetime = datetime.datetime.now() + if self.job_queue.unique_jobs: + self.unique_id = hashlib.sha256(u'%s-%s' % (self.job_type, self.kwargs)).hexdigest() + else: + self.unique_id = unicode(uuid.uuid4()) - if self.job_queue.unique_jobs: - self.unique_id = hashlib.sha256(u'%s-%s' % (self.job_type, self.kwargs)).hexdigest() - else: - self.unique_id = unicode(uuid.uuid4()) try: super(JobQueueItem, self).save(*args, **kwargs) except IntegrityError: @@ -206,6 +207,20 @@ class JobQueueItem(models.Model): except Worker.DoesNotExist: return None + @property + def is_in_error_state(self): + return self.state == JOB_STATE_ERROR + + @property + def is_in_pending_state(self): + return self.state == JOB_STATE_PENDING + + def requeue(self): + if self.is_in_error_state: + self.state = JOB_STATE_PENDING + self.creation_datetime = datetime.datetime.now() + self.save() + class Meta: ordering = ('creation_datetime',) verbose_name = _(u'job queue item') diff --git a/apps/job_processor/permissions.py b/apps/job_processor/permissions.py index 4ae086c630..bf1544a702 100644 --- a/apps/job_processor/permissions.py +++ b/apps/job_processor/permissions.py @@ -8,3 +8,5 @@ namespace = PermissionNamespace('job_processor', _(u'Job processor')) PERMISSION_JOB_QUEUE_VIEW = Permission.objects.register(namespace, 'job_queue_view', _(u'View the job queues in a Mayan cluster')) PERMISSION_JOB_PROCESSING_CONFIGURATION = Permission.objects.register(namespace, 'job_processing_edit', _(u'Edit the the job processing configuration in a Mayan cluster')) PERMISSION_JOB_QUEUE_START_STOP = Permission.objects.register(namespace, 'job_queue_start_stop', _(u'Can start and stop a job queue in a Mayan cluster')) +PERMISSION_JOB_REQUEUE = Permission.objects.register(namespace, 'job_requeue', _(u'Requeue a job in a Mayan cluster')) +PERMISSION_JOB_DELETE = Permission.objects.register(namespace, 'job_delete', _(u'Delete a pending job in a Mayan cluster')) diff --git a/apps/job_processor/static/images/icons/cog_add.png b/apps/job_processor/static/images/icons/cog_add.png new file mode 100644 index 0000000000000000000000000000000000000000..b79b69d05123a5085d32bdab509eea9c8ef5c145 GIT binary patch literal 2266 zcmV<02qpK4P)RCwB~S9wrW_Z|Pdw{Mq)ML^^X ztT!N+1c}KcC`jXx1a0e>PWz8G_85D_2wIa`k2)$wL%gCnI@ZL|{*%U;Iz-!AR8+L7 zA_xi*lZnv6LeFLt-z@9#a-80`eD&AxcszclPN&2E1Md~`I?=%60-)?H-TgPx zZuZ(WMbY%#O=M)jLql$yIQdUVk_?~Ei-mK`z5xiv^XH7uz>)T0f82Jw(oj0pR5JT{wU4R9t-G&$F|$M8rEBF2BFY z&mt}+Po4;KLLy>fql7Sr!>(y63y_hK5eN+p#m65XL&?_7^XX_aQJ*H`p#WSfR~7#z zXGV5zqB#K;OEYOE3r$U?5TvK2z&c<>)zz!G+1QBY<`%?|*{4pOgaNCS0DFW*>~=e1 zVxr)3IuRWk_vO{9D;0eAw*{fHY4f(@lH@B?^Lz-)mM%qaZy!;w>SqFxnwo+O7cSz) zjT>#%SF85eY_QzEG2N0Y7Zc`IPc(-mjYP=CFImhTXCX>@=@5VZK=%orgCd+#jS+SdIkZ*2v5Nw)iPzaxvaARI{IF~v z+t8=p&vHi89<>znB5&4lI#Cqs=mPQ1rjuo%g+XsXQgTZCD8*zbEG*2(>mKsIo-C&j z2qrx#q?gop^1q5`^g<*-PyXgW^LjmSyFFC(EdGLHwGALGJq@?M_~LEaTs8uP68}U( zDX$L>+F?+F3AlkY)4^;uYu4~VFH`UE29FBBXCJgx?$}v6n>FA7T)zAjgpLV?-DVdC zv(cwHF_A%-i`T5Z<#f7!sVKTGO1p7EL573r#Gh!MgwFOpSO#iv_Rc$~ZM*Jw1G_WE z8$~qd`pBNZ@W$x%`bp0|mzzhaHI$l~_I&=F{Fe&~3PkyZcEUhWJT7u9?of5BuWx{h z%B;a)KxRe;CQqCKYo8l`sa=WM&N3{_`4Og1$c51mBF;P9gJ`h)2jz92pr_k&=*%Xp zAhyR}2Z%z6Oq^Ck<9@rc=*o%}zo48}TkS9inN$f035GGqsFq}6^18>V!$&pCVXP36d(tjB@T92dMY7=K;LrutxTMTnVF5dn7MznQw9y+yN1tA4kf&TAWfUV7<; zgd}RQeSQ6-22P*>^>PqycUR$H-7>6OvICzr9YFg)Gn^D9?<`c$zIH-F5UCkE#*EzY zMc7yNN3?!z%|E{jmBRyxP_TH->-Vy0g@_xC z$rLIoWH^;oqtPg8h0GC6#P52Y5!Lr#*|4o;@zvuz)_opSa*{g=Dks%NO70VO&r+UmETY=Z4&$D|F5qd2O>A>ngqgqV;pS?ky&3>15C9jKZdUE+k;BE*Jl-{h znlec^cdxc{6{lO0Q<67wEcgM?aWVpUTZfwpCIDt6;(L*wy(YhTB@;^<5PNsA2DNi3MYyhVL1jY&H5dR z1S1BfjcNW_ZBFaa8LIL06548HS4sGpdQP=aV0tx z=8k?O2b21gGzg*2Z#1Z8Y;U80uA@ojE>4~=#d^)Pb_tK?VMW4;5|ZX&*7#@eD?=Ej z``ZplarNW650Yfi4!>SE4oN2beX7o58zE7{s oeTIPhecDt!9MJyC^j`r600XXrydjdLmjD0&07*qoM6N<$g3mow8UO$Q literal 0 HcmV?d00001 diff --git a/apps/job_processor/static/images/icons/cog_delete.png b/apps/job_processor/static/images/icons/cog_delete.png new file mode 100644 index 0000000000000000000000000000000000000000..924e04d69ca011457c71bc0a76f992fbc0f1fda3 GIT binary patch literal 2273 zcmV<72p;!|P)scq%-Os9z0T|R{my}d|Njw>9DVaF0B4dU zDKNnAl47rS7Cx>1CvY4OvMj+9rXS52q*V-1RUA1siczY6sqBR<-xn2r zfidA=`RgCAT$%nvT3V{U{Ix&+7soVZFTT9Z?ePFcfrXtsr#Jnkahj)fw4V0SemaNF zyPE;?Cxrv$ul_A7BV$WZQ6a>~$AiHT6_J>?~N8k_>j6Eh9TOH>dW(`7;ERg$&?=!+x;*wW_SFtj$G* z>)}d6BRHKdShFS{Wb{(PaFgQE(NU({@4dt%CME`!E=>d;)57r_G&VLvNQf5J{$LGU ztgDCi_V$15+w<~M3#B6%10g&wtdnIKW-+9soDOepz6w!Mk*Ii#<((9B90#EH z%*qUli7~>b8C`{^87N9TSQNP|R~XsPoTK(W&mAox$fc@|5_BE@XNfsRSfYi zmnV>K%rnEqvSrDTxFi8$?$$;E`N1iK|)g2(NKn7BnfO^pq;boO_FP<&H6x;1FED-a1@y_|>OzXjI9k zrsng{E|jBV%KXT&8BG@n$4|d3Om*4H6ynpek!I(S}f3YLKF*l0Q*j%<9X(VTBU~6wDkA|$iy2F5uv4dALd`g zv{AkV%DeKNN5>@8D=$EX;v5!LxR`(@Nj{jJ^&#u^1qjDxpMlJ*Oz7(F{u3^)nrDO< zzj#Rn&D)#~;58cH@M=nEcv2#mUX$^o?oM zA8r#KxdN5APacEhb%h|bH-c~A7I2sv4nfW4sXpNKQu43>lGanR)#fTPorc6wm! zbZ|!$;&eJ%C?Io0M1@AD3u9jEkyf=@E#nF-uYRHL=%MG|7a=M(8hkgqfur^eYXLYg zpbncNb~Z+HzMEYT6|Dyi&lix+;C?`U_no&&w(t1GagS%V&}1^PD`>G;;M}=SL9dU1 zty{O^TaR_#=SKzcghoZ3De(QbkV63U)z|LYgiihBLE!ps&Rqp{umNZ`3{`&kzFC%} zqv&BF4YstjjGp+*pPzZ8c*DaEhjZU<%4tZ)Ega|#@e*P%P zm@<&(u^wTD*b&!ASY_w9etui-Q#u#!gZJOtsmy<)6dwa&gPKw|wPQTT07jJCAD0Lj zb{80ohyw(r>@Na$6n=kc7hw!b{j}HDLe4Jqf5Ds)E`=wYO40GU-R{c=4jd{V@0k*6 zOcAI?!{W}&p`Mk-MR9%_R%*+<0I;T&p5CER_-thI7&R%}J^T|BPF*;PabD!eV6Jji zia8@T`pJA`Mk)X@tAO%t*B32=CS=jB@#(5\d+)/stop/$', 'job_queue_stop', (), 'job_queue_stop'), url(r'^config/edit/$', 'job_queue_config_edit', (), 'job_queue_config_edit'), + + url(r'^job/(?P\d+)/requeue/$', 'job_requeue', (), 'job_requeue'), + url(r'^job/(?P\d+)/delete/$', 'job_delete', (), 'job_delete'), ) diff --git a/apps/job_processor/views.py b/apps/job_processor/views.py index 5feb983311..4d802e9b93 100644 --- a/apps/job_processor/views.py +++ b/apps/job_processor/views.py @@ -15,9 +15,10 @@ from permissions.models import Permission from .exceptions import JobQueueAlreadyStopped, JobQueueAlreadyStarted from .forms import JobProcessingConfigForm -from .models import JobQueue, JobProcessingConfig +from .models import JobQueue, JobProcessingConfig, JobQueueItem from .permissions import (PERMISSION_JOB_QUEUE_VIEW, - PERMISSION_JOB_PROCESSING_CONFIGURATION, PERMISSION_JOB_QUEUE_START_STOP) + PERMISSION_JOB_PROCESSING_CONFIGURATION, PERMISSION_JOB_QUEUE_START_STOP, + PERMISSION_JOB_REQUEUE) def node_workers(request, node_pk): @@ -250,3 +251,65 @@ def job_queue_start(request, job_queue_pk): 'previous': previous, 'form_icon': u'control_play_blue.png', }, context_instance=RequestContext(request)) + + +def job_requeue(request, job_item_pk): + job = get_object_or_404(JobQueueItem, pk=job_item_pk) + + #try: + # Permission.objects.check_permissions(request.user, [PERMISSION_JOB_REQUEUE]) + #except PermissionDenied: + # AccessEntry.objects.check_access(PERMISSION_JOB_REQUEUE, request.user, job_queue) + + next = request.POST.get('next', request.GET.get('next', request.META.get('HTTP_REFERER', None))) + previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', None))) + + if request.method == 'POST': + #try: + job.requeue() + #except JobQueueAlreadyStarted: + # messages.warning(request, _(u'job ueue already started.')) + # return HttpResponseRedirect(previous) + #else: + messages.success(request, _(u'Job requeue successfully.')) + return HttpResponseRedirect(next) + + return render_to_response('generic_confirm.html', { + 'object': job, + 'object_name': _(u'job'), + 'title': _(u'Are you sure you wish to requeue job: %s?') % job, + 'next': next, + 'previous': previous, + 'form_icon': u'cog_add.png', + }, context_instance=RequestContext(request)) + + +def job_delete(request, job_item_pk): + job = get_object_or_404(JobQueueItem, pk=job_item_pk) + + #try: + # Permission.objects.check_permissions(request.user, [PERMISSION_JOB_REQUEUE]) + #except PermissionDenied: + # AccessEntry.objects.check_access(PERMISSION_JOB_REQUEUE, request.user, job_queue) + + next = request.POST.get('next', request.GET.get('next', request.META.get('HTTP_REFERER', None))) + previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', None))) + + if request.method == 'POST': + try: + job.delete() + except Exception, exc: + messages.warning(request, _(u'Error deleting job; %s.') % exc) + return HttpResponseRedirect(previous) + else: + messages.success(request, _(u'Job deleted successfully.')) + return HttpResponseRedirect(next) + + return render_to_response('generic_confirm.html', { + 'object': job, + 'object_name': _(u'job'), + 'title': _(u'Are you sure you wish to delete job: %s?') % job, + 'next': next, + 'previous': previous, + 'form_icon': u'cog_delete.png', + }, context_instance=RequestContext(request))