From 6d8dc886ca6b196ec97b3d9b48fec337291a5abc Mon Sep 17 00:00:00 2001 From: Roberto Rosario Date: Sat, 4 Aug 2012 00:55:47 -0400 Subject: [PATCH] Improve job transaction handling --- apps/job_processor/models.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/apps/job_processor/models.py b/apps/job_processor/models.py index 0bfdc069e3..4897bd403e 100644 --- a/apps/job_processor/models.py +++ b/apps/job_processor/models.py @@ -45,11 +45,16 @@ class Job(object): #function(**loads(job_queue_item.kwargs)) except Exception, exc: transaction.rollback() + @transaction.commit_on_success def set_state_error(): job_queue_item.result = exc job_queue_item.state = JOB_STATE_ERROR - job_queue_item.save() - transaction.commit_on_success(set_state_error)() + try: + job_queue_item.save() + except: + transaction.rollback() + + set_state_error() else: job_queue_item.delete() finally: @@ -228,6 +233,11 @@ class JobQueueItem(models.Model): def is_in_pending_state(self): return self.state == JOB_STATE_PENDING + @property + def is_in_processing_state(self): + return self.state == JOB_STATE_PROCESSING + + def requeue(self, force=False, at_top=False): """ Requeue a job so that it is executed again @@ -263,6 +273,9 @@ class Worker(models.Model): return u'%s-%s' % (self.node.hostname, self.pid) def terminate(self): + if self.node != Node.objects.myself(): + raise Exception('Not local worker') + #TODO: dispatch terminate request to remote nodes try: process = psutil.Process(int(self.pid)) except psutil.error.NoSuchProcess: