diff --git a/apps/job_processor/__init__.py b/apps/job_processor/__init__.py index cb2fa4b4d6..6dbd7dabbd 100644 --- a/apps/job_processor/__init__.py +++ b/apps/job_processor/__init__.py @@ -61,28 +61,23 @@ register_model_list_columns(Node, [ @receiver(node_died, dispatch_uid='process_dead_workers') -def process_dead_workers(sender, **kwargs): - logger.debug('kwargs') - logger.debug(kwargs) - #TODO: delete all of the dying node workers and requeue their jobs +def process_dead_workers(sender, node, **kwargs): + logger.debug('received signal') + for dead_worker in node.worker_set.all(): + if dead_worker.job_queue_item: + dead_worker.job_queue_item.requeue(force=True, at_top=True) + dead_worker.delete() @receiver(node_heartbeat, dispatch_uid='node_processes') def node_processes(sender, node, **kwargs): logger.debug('update current node\'s processes') - pids = [process.pid for process in active_children()] - logger.debug('pids: %s' % pids) - # Create empty entry for a new unknown worker - for process in active_children(): - worker = Worker.objects.get_or_create( - node=node, pid=process.pid) - all_active_pids = psutil.get_pid_list() # Remove stale workers based on current child pids - #for dead_worker in Worker.objects.filter(node=node).exclude(pid__in=all_active_pids): - for dead_worker in Worker.objects.exclude(pid__in=all_active_pids): + for dead_worker in node.worker_set.exclude(pid__in=all_active_pids): + if dead_worker.job_queue_item: + dead_worker.job_queue_item.requeue(force=True, at_top=True) dead_worker.delete() - # TODO: requeue worker job or delete job? def kill_all_node_processes():