diff --git a/apps/job_processor/models.py b/apps/job_processor/models.py index 0bfdc069e3..4897bd403e 100644 --- a/apps/job_processor/models.py +++ b/apps/job_processor/models.py @@ -45,11 +45,16 @@ class Job(object): #function(**loads(job_queue_item.kwargs)) except Exception, exc: transaction.rollback() + @transaction.commit_on_success def set_state_error(): job_queue_item.result = exc job_queue_item.state = JOB_STATE_ERROR - job_queue_item.save() - transaction.commit_on_success(set_state_error)() + try: + job_queue_item.save() + except: + transaction.rollback() + + set_state_error() else: job_queue_item.delete() finally: @@ -228,6 +233,11 @@ class JobQueueItem(models.Model): def is_in_pending_state(self): return self.state == JOB_STATE_PENDING + @property + def is_in_processing_state(self): + return self.state == JOB_STATE_PROCESSING + + def requeue(self, force=False, at_top=False): """ Requeue a job so that it is executed again @@ -263,6 +273,9 @@ class Worker(models.Model): return u'%s-%s' % (self.node.hostname, self.pid) def terminate(self): + if self.node != Node.objects.myself(): + raise Exception('Not local worker') + #TODO: dispatch terminate request to remote nodes try: process = psutil.Process(int(self.pid)) except psutil.error.NoSuchProcess: