Remove task inspection from task manager app

Signed-off-by: Roberto Rosario <roberto.rosario@mayan-edms.com>
This commit is contained in:
Roberto Rosario
2019-07-24 02:49:37 -04:00
parent fac77a2f73
commit 6f907d156a
8 changed files with 6 additions and 180 deletions

View File

@@ -6,9 +6,6 @@ import logging
from kombu import Exchange, Queue
from celery.five import monotonic
from celery.task.control import inspect
from django.apps import apps
from django.utils.encoding import force_text, python_2_unicode_compatible
from django.utils.module_loading import import_string
@@ -62,18 +59,10 @@ class Task(object):
def __str__(self):
return force_text(self.task_type)
def get_time_started(self):
time_start = self.kwargs.get('time_start')
if time_start:
return now() - timedelta(seconds=monotonic() - self.kwargs['time_start'])
else:
return None
@python_2_unicode_compatible
class CeleryQueue(object):
_registry = {}
_inspect_instance = inspect()
@staticmethod
def initialize():
@@ -132,21 +121,6 @@ class CeleryQueue(object):
self.task_types.append(task_type)
return task_type
def get_active_tasks(self):
return self._process_task_dictionary(
task_dictionary=self.__class__._inspect_instance.active()
)
def get_reserved_tasks(self):
return self._process_task_dictionary(
task_dictionary=self.__class__._inspect_instance.reserved()
)
def get_scheduled_tasks(self):
return self._process_task_dictionary(
task_dictionary=self.__class__._inspect_instance.scheduled()
)
def _update_celery(self):
kwargs = {
'name': self.name, 'exchange': Exchange(self.name),