Improve removal of dead workers
This commit is contained in:
@@ -61,28 +61,23 @@ register_model_list_columns(Node, [
|
||||
|
||||
|
||||
@receiver(node_died, dispatch_uid='process_dead_workers')
|
||||
def process_dead_workers(sender, **kwargs):
|
||||
logger.debug('kwargs')
|
||||
logger.debug(kwargs)
|
||||
#TODO: delete all of the dying node workers and requeue their jobs
|
||||
def process_dead_workers(sender, node, **kwargs):
|
||||
logger.debug('received signal')
|
||||
for dead_worker in node.worker_set.all():
|
||||
if dead_worker.job_queue_item:
|
||||
dead_worker.job_queue_item.requeue(force=True, at_top=True)
|
||||
dead_worker.delete()
|
||||
|
||||
|
||||
@receiver(node_heartbeat, dispatch_uid='node_processes')
|
||||
def node_processes(sender, node, **kwargs):
|
||||
logger.debug('update current node\'s processes')
|
||||
pids = [process.pid for process in active_children()]
|
||||
logger.debug('pids: %s' % pids)
|
||||
# Create empty entry for a new unknown worker
|
||||
for process in active_children():
|
||||
worker = Worker.objects.get_or_create(
|
||||
node=node, pid=process.pid)
|
||||
|
||||
all_active_pids = psutil.get_pid_list()
|
||||
# Remove stale workers based on current child pids
|
||||
#for dead_worker in Worker.objects.filter(node=node).exclude(pid__in=all_active_pids):
|
||||
for dead_worker in Worker.objects.exclude(pid__in=all_active_pids):
|
||||
for dead_worker in node.worker_set.exclude(pid__in=all_active_pids):
|
||||
if dead_worker.job_queue_item:
|
||||
dead_worker.job_queue_item.requeue(force=True, at_top=True)
|
||||
dead_worker.delete()
|
||||
# TODO: requeue worker job or delete job?
|
||||
|
||||
|
||||
def kill_all_node_processes():
|
||||
|
||||
Reference in New Issue
Block a user