Improve job transaction handling
This commit is contained in:
@@ -45,11 +45,16 @@ class Job(object):
|
||||
#function(**loads(job_queue_item.kwargs))
|
||||
except Exception, exc:
|
||||
transaction.rollback()
|
||||
@transaction.commit_on_success
|
||||
def set_state_error():
|
||||
job_queue_item.result = exc
|
||||
job_queue_item.state = JOB_STATE_ERROR
|
||||
job_queue_item.save()
|
||||
transaction.commit_on_success(set_state_error)()
|
||||
try:
|
||||
job_queue_item.save()
|
||||
except:
|
||||
transaction.rollback()
|
||||
|
||||
set_state_error()
|
||||
else:
|
||||
job_queue_item.delete()
|
||||
finally:
|
||||
@@ -228,6 +233,11 @@ class JobQueueItem(models.Model):
|
||||
def is_in_pending_state(self):
|
||||
return self.state == JOB_STATE_PENDING
|
||||
|
||||
@property
|
||||
def is_in_processing_state(self):
|
||||
return self.state == JOB_STATE_PROCESSING
|
||||
|
||||
|
||||
def requeue(self, force=False, at_top=False):
|
||||
"""
|
||||
Requeue a job so that it is executed again
|
||||
@@ -263,6 +273,9 @@ class Worker(models.Model):
|
||||
return u'%s-%s' % (self.node.hostname, self.pid)
|
||||
|
||||
def terminate(self):
|
||||
if self.node != Node.objects.myself():
|
||||
raise Exception('Not local worker')
|
||||
#TODO: dispatch terminate request to remote nodes
|
||||
try:
|
||||
process = psutil.Process(int(self.pid))
|
||||
except psutil.error.NoSuchProcess:
|
||||
|
||||
Reference in New Issue
Block a user