Add task manager app.

Signed-off-by: Roberto Rosario <roberto.rosario.gonzalez@gmail.com>
This commit is contained in:
Roberto Rosario
2017-06-21 02:25:22 -04:00
parent 638cbba237
commit 739596e4ee
30 changed files with 553 additions and 5 deletions

View File

@@ -25,6 +25,7 @@ from .permissions import (
permission_document_checkin, permission_document_checkin_override, permission_document_checkin, permission_document_checkin_override,
permission_document_checkout, permission_document_checkout_detail_view permission_document_checkout, permission_document_checkout_detail_view
) )
from .queues import * # NOQA
from .tasks import task_check_expired_check_outs # NOQA from .tasks import task_check_expired_check_outs # NOQA
# This import is required so that celerybeat can find the task # This import is required so that celerybeat can find the task

View File

@@ -0,0 +1,13 @@
from __future__ import absolute_import, unicode_literals
from django.utils.translation import ugettext_lazy as _
from mayan_task_manager.classes import CeleryQueue
queue_checkouts_periodic = CeleryQueue(
name='checkouts_periodic', label=_('Checkouts periodic'), transient=True
)
queue_checkouts_periodic.add_task_type(
name='task_check_expired_check_outs',
label=_('Check expired checkouts')
)

View File

@@ -28,6 +28,7 @@ from .links import (
from .literals import DELETE_STALE_UPLOADS_INTERVAL from .literals import DELETE_STALE_UPLOADS_INTERVAL
from .menus import menu_about, menu_main, menu_tools, menu_user from .menus import menu_about, menu_main, menu_tools, menu_user
from .licenses import * # NOQA from .licenses import * # NOQA
from .queues import * # NOQA - Force queues registration
from .settings import setting_auto_logging from .settings import setting_auto_logging
from .tasks import task_delete_stale_uploads # NOQA - Force task registration from .tasks import task_delete_stale_uploads # NOQA - Force task registration

View File

@@ -0,0 +1,17 @@
from __future__ import absolute_import, unicode_literals
from django.utils.translation import ugettext_lazy as _
from mayan_task_manager.classes import CeleryQueue
queue_default = CeleryQueue(
name='default', label=_('Default'), is_default_queue=True
)
queue_tools = CeleryQueue(name='tools', label=_('Tools'))
queue_common_periodic = CeleryQueue(
name='common_periodic', label=_('Common periodic'), transient=True
)
queue_common_periodic.add_task_type(
name='common.tasks.task_delete_stale_uploads',
label=_('Delete stale uploads')
)

View File

@@ -37,6 +37,7 @@ from .permissions import (
permission_document_indexing_create, permission_document_indexing_delete, permission_document_indexing_create, permission_document_indexing_delete,
permission_document_indexing_edit, permission_document_indexing_view permission_document_indexing_edit, permission_document_indexing_view
) )
from .queues import * # NOQA
from .widgets import get_instance_link, index_instance_item_link, node_level from .widgets import get_instance_link, index_instance_item_link, node_level
@@ -50,6 +51,8 @@ class DocumentIndexingApp(MayanAppConfig):
def ready(self): def ready(self):
super(DocumentIndexingApp, self).ready() super(DocumentIndexingApp, self).ready()
APIEndPoint(app=self, version_string='1')
Document = apps.get_model( Document = apps.get_model(
app_label='documents', model_name='Document' app_label='documents', model_name='Document'
) )
@@ -65,8 +68,6 @@ class DocumentIndexingApp(MayanAppConfig):
IndexInstanceNode = self.get_model('IndexInstanceNode') IndexInstanceNode = self.get_model('IndexInstanceNode')
IndexTemplateNode = self.get_model('IndexTemplateNode') IndexTemplateNode = self.get_model('IndexTemplateNode')
APIEndPoint(app=self, version_string='1')
ModelPermission.register( ModelPermission.register(
model=Index, permissions=( model=Index, permissions=(
permission_acl_edit, permission_acl_view, permission_acl_edit, permission_acl_view,

View File

@@ -0,0 +1,25 @@
from __future__ import absolute_import, unicode_literals
from django.utils.translation import ugettext_lazy as _
from common.queues import queue_tools
from mayan_task_manager.classes import CeleryQueue
queue_indexing = CeleryQueue(name='indexing', label=_('Indexing'))
queue_indexing.add_task_type(
name='document_indexing.tasks.task_delete_empty',
label=_('Delete empty index nodes')
)
queue_indexing.add_task_type(
name='document_indexing.tasks.task_remove_document',
label=_('Remove document')
)
queue_indexing.add_task_type(
name='document_indexing.tasks.task_index_document',
label=_('Index document')
)
queue_tools.add_task_type(
name='document_indexing.tasks.task_rebuild_index',
label=_('Rebuild index')
)

View File

@@ -39,6 +39,7 @@ from .permissions import (
permission_document_version_signature_upload, permission_document_version_signature_upload,
permission_document_version_signature_view, permission_document_version_signature_view,
) )
from .queues import * # NOQA
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@@ -0,0 +1,25 @@
from __future__ import absolute_import, unicode_literals
from django.utils.translation import ugettext_lazy as _
from common.queues import queue_tools
from mayan_task_manager.classes import CeleryQueue
queue_signatures = CeleryQueue(name='signatures', label=_('Signatures'))
queue_signatures.add_task_type(
name='document_signatures.tasks.task_verify_key_signatures',
label=_('Verify key signatures')
)
queue_signatures.add_task_type(
name='document_signatures.tasks.task_unverify_key_signatures',
label=_('Unverify key signatures')
)
queue_signatures.add_task_type(
name='document_signatures.tasks.task_verify_document_version',
label=_('Verify document version')
)
queue_tools.add_task_type(
name='document_signatures.tasks.task_verify_missing_embedded_signature',
label=_('Verify missing embedded signature')
)

View File

@@ -34,6 +34,7 @@ from .links import (
link_workflow_state_list link_workflow_state_list
) )
from .permissions import permission_workflow_transition from .permissions import permission_workflow_transition
from .queues import * # NOQA
class DocumentStatesApp(MayanAppConfig): class DocumentStatesApp(MayanAppConfig):
@@ -163,7 +164,7 @@ class DocumentStatesApp(MayanAppConfig):
( (
Queue( Queue(
'document_states', Exchange('document_states'), 'document_states', Exchange('document_states'),
routing_key='converter' routing_key='document_states'
), ),
) )
) )

View File

@@ -0,0 +1,13 @@
from __future__ import absolute_import, unicode_literals
from django.utils.translation import ugettext_lazy as _
from mayan_task_manager.classes import CeleryQueue
queue_document_states = CeleryQueue(
name='document_states', label=_('Document states')
)
queue_document_states.add_task_type(
name='document_states.tasks.task_launch_all_workflows',
label=_('Launch all workflows')
)

View File

@@ -70,6 +70,7 @@ from .permissions import (
permission_document_trash, permission_document_version_revert, permission_document_trash, permission_document_version_revert,
permission_document_view permission_document_view
) )
from .queues import * # NOQA
# Just import to initialize the search models # Just import to initialize the search models
from .search import document_search, document_page_search # NOQA from .search import document_search, document_page_search # NOQA
from .statistics import ( from .statistics import (

View File

@@ -0,0 +1,48 @@
from __future__ import absolute_import, unicode_literals
from django.utils.translation import ugettext_lazy as _
from common.queues import queue_tools
from mayan_task_manager.classes import CeleryQueue
queue_converter = CeleryQueue(
name='converter', label=_('Converter'), transient=True
)
queue_documents_periodic = CeleryQueue(
name='documents_periodic', label=_('Documents periodic'), transient=True
)
queue_uploads = CeleryQueue(
name='uploads', label=_('Uploads')
)
queue_documents_periodic.add_task_type(
name='documents.tasks.task_check_delete_periods',
label=_('Check document type delete periods')
)
queue_documents_periodic.add_task_type(
name='documents.tasks.task_check_trash_periods',
label=_('Check document type trash periods')
)
queue_documents_periodic.add_task_type(
name='documents.tasks.task_delete_stubs',
label=_('Delete document stubs')
)
queue_tools.add_task_type(
name='documents.tasks.task_clear_image_cache',
label=_('Clear image cache')
)
queue_converter.add_task_type(
name='documents.tasks.task_generate_document_page_image',
label=_('Generate document page image')
)
queue_uploads.add_task_type(
name='documents.tasks.task_update_page_count',
label=_('Update document page count')
)
queue_uploads.add_task_type(
name='documents.tasks.task_upload_new_version',
label=_('Upload new document version')
)

View File

@@ -18,6 +18,7 @@ from .links import (
from .permissions import ( from .permissions import (
permission_mailing_link, permission_mailing_send_document permission_mailing_link, permission_mailing_send_document
) )
from .queues import * # NOQA
class MailerApp(MayanAppConfig): class MailerApp(MayanAppConfig):

View File

@@ -0,0 +1,13 @@
from __future__ import absolute_import, unicode_literals
from django.utils.translation import ugettext_lazy as _
from mayan_task_manager.classes import CeleryQueue
queue_mailing = CeleryQueue(
name='mailing', label=_('Mailing')
)
queue_mailing.add_task_type(
name='mailer.tasks.task_send_document',
label=_('Send document')
)

View File

@@ -0,0 +1,3 @@
from __future__ import unicode_literals
default_app_config = 'mayan_task_manager.apps.TaskManagerApp'

View File

@@ -0,0 +1,87 @@
from __future__ import unicode_literals
from django.utils.translation import ugettext_lazy as _
from common import (
MayanAppConfig, menu_object, menu_secondary, menu_tools
)
from common.widgets import two_state_template
from navigation import SourceColumn
from .classes import CeleryQueue, Task
from .links import (
link_queue_list, link_queue_active_task_list,
link_queue_scheduled_task_list, link_queue_reserved_task_list,
link_task_manager
)
class TaskManagerApp(MayanAppConfig):
app_namespace = 'task_manager'
app_url = 'task_manager'
name = 'mayan_task_manager'
verbose_name = _('Task manager')
def ready(self):
super(TaskManagerApp, self).ready()
SourceColumn(
source=CeleryQueue, label=_('Label'), attribute='label'
)
SourceColumn(
source=CeleryQueue, label=_('Name'), attribute='name'
)
SourceColumn(
source=CeleryQueue, label=_('Default queue?'),
func=lambda context: two_state_template(
context['object'].is_default_queue
)
)
SourceColumn(
source=CeleryQueue, label=_('Is transient?'),
func=lambda context: two_state_template(
context['object'].is_transient
)
)
SourceColumn(
source=Task, label=_('Type'), attribute='task_type'
)
SourceColumn(
source=Task, label=_('Start time'), attribute='get_time_started'
)
SourceColumn(
source=Task, label=_('Host'),
func=lambda context: context['object'].kwargs['hostname']
)
SourceColumn(
source=Task, label=_('Acknowledged'),
func=lambda context: two_state_template(
context['object'].kwargs['acknowledged']
)
)
SourceColumn(
source=Task, label=_('Arguments'),
func=lambda context: context['object'].kwargs['args']
)
SourceColumn(
source=Task, label=_('Keyword arguments'),
func=lambda context: context['object'].kwargs['kwargs']
)
SourceColumn(
source=Task, label=_('Worker process ID'),
func=lambda context: context['object'].kwargs['worker_pid']
)
menu_object.bind_links(
links=(
link_queue_active_task_list, link_queue_scheduled_task_list,
link_queue_reserved_task_list,
), sources=(CeleryQueue,)
)
menu_secondary.bind_links(
links=(link_queue_list,),
sources=(CeleryQueue, Task, 'task_manager:queue_list')
)
menu_tools.bind_links(links=(link_task_manager,))

View File

@@ -0,0 +1,98 @@
from __future__ import absolute_import, unicode_literals
from datetime import timedelta
from celery.five import monotonic
from celery.task.control import inspect
from django.utils.encoding import force_text, python_2_unicode_compatible
from django.utils.timezone import now
@python_2_unicode_compatible
class TaskType(object):
_registry = {}
@classmethod
def all(cls):
return cls._registry.values()
@classmethod
def get(cls, name):
return cls._registry[name]
def __init__(self, name, label):
self.name = name
self.label = label
self.__class__._registry[name] = self
def __str__(self):
return force_text(self.label)
@python_2_unicode_compatible
class Task(object):
def __init__(self, task_type, kwargs):
self.task_type = task_type
self.kwargs = kwargs
def __str__(self):
return force_text(self.task_type)
def get_time_started(self):
return now() - timedelta(seconds=monotonic() - self.kwargs['time_start'])
@python_2_unicode_compatible
class CeleryQueue(object):
_registry = {}
_inspect_instance = inspect()
@classmethod
def all(cls):
return sorted(
cls._registry.values(), key=lambda instance: instance.label
)
@classmethod
def get(cls, queue_name):
return cls._registry[queue_name]
def __init__(self, name, label, is_default_queue=False, transient=False):
self.name = name
self.label = label
self.is_default_queue = is_default_queue
self.is_transient = transient
self.task_types = []
self.__class__._registry[name] = self
def __str__(self):
return force_text(self.label)
def add_task_type(self, *args, **kwargs):
self.task_types.append(TaskType(*args, **kwargs))
def get_active_tasks(self):
return self._process_task_dictionary(
task_dictionary=self.__class__._inspect_instance.active()
)
def get_reserved_tasks(self):
return self._process_task_dictionary(
task_dictionary=self.__class__._inspect_instance.reserved()
)
def get_scheduled_tasks(self):
return self._process_task_dictionary(
task_dictionary=self.__class__._inspect_instance.scheduled()
)
def _process_task_dictionary(self, task_dictionary):
result = []
for worker, tasks in task_dictionary.items():
for task in tasks:
if task['delivery_info']['routing_key'] == self.name:
task_type = TaskType.get(name=task['name'])
result.append(Task(task_type=task_type, kwargs=task))
return result

View File

@@ -0,0 +1,28 @@
from __future__ import unicode_literals
from django.utils.translation import ugettext_lazy as _
from navigation import Link
from .permissions import permission_task_view
link_task_manager = Link(
icon='fa fa-braille', permissions=(permission_task_view,),
text=_('Task manager'), view='task_manager:queue_list'
)
link_queue_list = Link(
icon='fa fa-braille', permissions=(permission_task_view,),
text=_('Backgroun task queues'), view='task_manager:queue_list'
)
link_queue_active_task_list = Link(
args='resolved_object.name', permissions=(permission_task_view,),
text=_('Active tasks'), view='task_manager:queue_active_task_list'
)
link_queue_reserved_task_list = Link(
args='resolved_object.name', permissions=(permission_task_view,),
text=_('Reserved tasks'), view='task_manager:queue_reserved_task_list'
)
link_queue_scheduled_task_list = Link(
args='resolved_object.name', permissions=(permission_task_view,),
text=_('Scheduled tasks'), view='task_manager:queue_scheduled_task_list'
)

View File

@@ -0,0 +1,11 @@
from __future__ import absolute_import, unicode_literals
from django.utils.translation import ugettext_lazy as _
from permissions import PermissionNamespace
namespace = PermissionNamespace('task_manager', _('Task manager'))
permission_task_view = namespace.add_permission(
name='task_view', label=_('View tasks')
)

View File

@@ -0,0 +1,28 @@
from __future__ import unicode_literals
from django.conf.urls import url
from .views import (
QueueListView, QueueActiveTaskListView, QueueScheduledTaskListView,
QueueReservedTaskListView
)
urlpatterns = [
url(
r'^queues/$', QueueListView.as_view(),
name='queue_list'
),
url(
r'^queues/(?P<queue_name>[-\w]+)/tasks/active/$',
QueueActiveTaskListView.as_view(), name='queue_active_task_list'
),
url(
r'^queues/(?P<queue_name>[-\w]+)/tasks/scheduled/$',
QueueScheduledTaskListView.as_view(), name='queue_scheduled_task_list'
),
url(
r'^queues/(?P<queue_name>[-\w]+)/tasks/reserved/$',
QueueReservedTaskListView.as_view(), name='queue_reserved_task_list'
),
]

View File

@@ -0,0 +1,60 @@
from __future__ import absolute_import, unicode_literals
from django.utils.translation import ugettext_lazy as _
from common.generics import SingleObjectListView
from .classes import CeleryQueue
from .permissions import permission_task_view
class QueueListView(SingleObjectListView):
extra_context = {
'hide_object': True,
'title': _('Background task queues'),
}
view_permission = permission_task_view
def get_queryset(self):
return CeleryQueue.all()
class QueueActiveTaskListView(SingleObjectListView):
view_permission = permission_task_view
def get_extra_context(self):
return {
'hide_object': True,
'object': self.get_object(),
'title': _('Active tasks in queue: %s') % self.get_object()
}
def get_object(self):
return CeleryQueue.get(queue_name=self.kwargs['queue_name'])
def get_queryset(self):
return self.get_object().get_active_tasks()
class QueueScheduledTaskListView(QueueActiveTaskListView):
def get_extra_context(self):
return {
'hide_object': True,
'object': self.get_object(),
'title': _('Scheduled tasks in queue: %s') % self.get_object()
}
def get_queryset(self):
return self.get_object().get_scheduled_tasks()
class QueueReservedTaskListView(QueueActiveTaskListView):
def get_extra_context(self):
return {
'hide_object': True,
'object': self.get_object(),
'title': _('Reserved tasks in queue: %s') % self.get_object()
}
def get_queryset(self):
return self.get_object().get_reserved_tasks()

View File

@@ -41,6 +41,7 @@ from .permissions import (
permission_metadata_document_add, permission_metadata_document_edit, permission_metadata_document_add, permission_metadata_document_edit,
permission_metadata_document_remove, permission_metadata_document_view permission_metadata_document_remove, permission_metadata_document_view
) )
from .queues import * # NOQA
from .search import metadata_type_search # NOQA from .search import metadata_type_search # NOQA
from .widgets import get_metadata_string from .widgets import get_metadata_string

View File

@@ -0,0 +1,17 @@
from __future__ import absolute_import, unicode_literals
from django.utils.translation import ugettext_lazy as _
from mayan_task_manager.classes import CeleryQueue
queue_metadata = CeleryQueue(
name='metadata', label=_('Metadata')
)
queue_metadata.add_task_type(
name='metadata.tasks.task_remove_metadata_type',
label=_('Remove metadata type')
)
queue_metadata.add_task_type(
name='metadata.tasks.task_add_required_metadata_type',
label=_('Add required metadata type')
)

View File

@@ -28,6 +28,7 @@ from .links import (
link_document_type_submit, link_entry_list link_document_type_submit, link_entry_list
) )
from .permissions import permission_ocr_document, permission_ocr_content_view from .permissions import permission_ocr_document, permission_ocr_content_view
from .queues import * # NOQA
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -55,6 +56,8 @@ class OCRApp(MayanAppConfig):
def ready(self): def ready(self):
super(OCRApp, self).ready() super(OCRApp, self).ready()
APIEndPoint(app=self, version_string='1')
Document = apps.get_model( Document = apps.get_model(
app_label='documents', model_name='Document' app_label='documents', model_name='Document'
) )
@@ -69,8 +72,6 @@ class OCRApp(MayanAppConfig):
DocumentVersionOCRError = self.get_model('DocumentVersionOCRError') DocumentVersionOCRError = self.get_model('DocumentVersionOCRError')
APIEndPoint(app=self, version_string='1')
Document.add_to_class('submit_for_ocr', document_ocr_submit) Document.add_to_class('submit_for_ocr', document_ocr_submit)
DocumentVersion.add_to_class( DocumentVersion.add_to_class(
'submit_for_ocr', document_version_ocr_submit 'submit_for_ocr', document_version_ocr_submit

10
mayan/apps/ocr/queues.py Normal file
View File

@@ -0,0 +1,10 @@
from __future__ import unicode_literals
from django.utils.translation import ugettext_lazy as _
from mayan_task_manager.classes import CeleryQueue
queue_ocr = CeleryQueue(name='ocr', label=_('OCR'))
queue_ocr.add_task_type(
name='ocr.tasks.task_do_ocr', label=_('Document version OCR')
)

View File

@@ -30,6 +30,7 @@ from .links import (
link_setup_source_edit, link_setup_source_logs, link_staging_file_delete, link_setup_source_edit, link_setup_source_logs, link_staging_file_delete,
link_upload_version link_upload_version
) )
from .queues import * # NOQA
from .widgets import StagingFileThumbnailWidget from .widgets import StagingFileThumbnailWidget

View File

@@ -0,0 +1,26 @@
from __future__ import unicode_literals
from django.utils.translation import ugettext_lazy as _
from mayan_task_manager.classes import CeleryQueue
queue_sources = CeleryQueue(
name='sources', label=_('Sources'), transient=True
)
queue_sources_periodic = CeleryQueue(
name='sources_periodic', label=_('Sources periodic')
)
queue_sources_periodic.add_task_type(
name='sources.tasks.task_check_interval_source',
label=_('Check interval source')
)
queue_sources.add_task_type(
name='sources.tasks.task_source_handle_upload',
label=_('Handle upload')
)
queue_sources.add_task_type(
name='sources.tasks.task_upload_document',
label=_('Upload document')
)

View File

@@ -15,6 +15,7 @@ from .links import (
link_statistics, link_view link_statistics, link_view
) )
from .licenses import * # NOQA from .licenses import * # NOQA
from .queues import * # NOQA
from .tasks import task_execute_statistic # NOQA - Force registration of task from .tasks import task_execute_statistic # NOQA - Force registration of task

View File

@@ -0,0 +1,14 @@
from __future__ import unicode_literals
from django.utils.translation import ugettext_lazy as _
from mayan_task_manager.classes import CeleryQueue
queue_statistics = CeleryQueue(
name='statistics', label=_('Statistics'), transient=True
)
queue_statistics.add_task_type(
name='statistics.tasks.task_execute_statistic',
label=_('Execute statistic')
)

View File

@@ -92,6 +92,7 @@ INSTALLED_APPS = (
# 'folders', # 'folders',
'linking', 'linking',
'mailer', 'mailer',
'mayan_task_manager',
'metadata', 'metadata',
'mirroring', 'mirroring',
'motd', 'motd',