From f76b9e4f3f5423e871b81abd099d0b506a2c7357 Mon Sep 17 00:00:00 2001 From: Roberto Rosario Date: Sun, 12 May 2019 03:34:32 -0400 Subject: [PATCH] Move task registration to the CeleryQueue class Signed-off-by: Roberto Rosario --- HISTORY.rst | 3 + docs/releases/3.2.rst | 2 + mayan/apps/checkouts/apps.py | 33 --------- mayan/apps/checkouts/queues.py | 10 ++- mayan/apps/common/apps.py | 39 +--------- mayan/apps/common/queues.py | 13 +++- mayan/apps/document_indexing/apps.py | 25 ------- mayan/apps/document_indexing/queues.py | 8 +-- mayan/apps/document_parsing/apps.py | 15 ---- mayan/apps/document_parsing/queues.py | 2 +- mayan/apps/document_signatures/apps.py | 27 ------- mayan/apps/document_signatures/queues.py | 8 +-- mayan/apps/document_states/apps.py | 21 ------ mayan/apps/document_states/queues.py | 2 +- mayan/apps/documents/apps.py | 84 ---------------------- mayan/apps/documents/links.py | 30 +++----- mayan/apps/documents/queues.py | 47 +++++++++---- mayan/apps/mailer/apps.py | 16 ----- mayan/apps/mailer/queues.py | 2 +- mayan/apps/mayan_statistics/apps.py | 21 ------ mayan/apps/mayan_statistics/queues.py | 2 +- mayan/apps/metadata/apps.py | 19 ----- mayan/apps/metadata/queues.py | 4 +- mayan/apps/ocr/apps.py | 16 ----- mayan/apps/ocr/queues.py | 3 +- mayan/apps/sources/apps.py | 36 ---------- mayan/apps/sources/queues.py | 8 +-- mayan/apps/task_manager/apps.py | 2 + mayan/apps/task_manager/classes.py | 90 ++++++++++++++++++++---- 29 files changed, 168 insertions(+), 420 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index f56486ba16..c067f97feb 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -256,6 +256,9 @@ metadata. * Create intermedia file cache folder. Fixes preview errors when the first document uploaded is an office file. +* Move queue and task registration to the CeleryQueue class. + The .queues.py module is now loaded automatically. + 3.1.11 (2019-04-XX) =================== diff --git a/docs/releases/3.2.rst b/docs/releases/3.2.rst index 3cf18015fb..866b3c26dd 100644 --- a/docs/releases/3.2.rst +++ b/docs/releases/3.2.rst @@ -566,6 +566,8 @@ Other changes metadata. * Create intermedia file cache folder. Fixes preview errors when the first document uploaded is an office file. +* Move queue and task registration to the CeleryQueue class. + The .queues.py module is now loaded automatically. Removals diff --git a/mayan/apps/checkouts/apps.py b/mayan/apps/checkouts/apps.py index 8713f8101b..6f9e5b7c00 100644 --- a/mayan/apps/checkouts/apps.py +++ b/mayan/apps/checkouts/apps.py @@ -1,9 +1,5 @@ from __future__ import absolute_import, unicode_literals -from datetime import timedelta - -from kombu import Exchange, Queue - from django.apps import apps from django.db.models.signals import pre_save from django.utils.translation import ugettext_lazy as _ @@ -13,7 +9,6 @@ from mayan.apps.common.apps import MayanAppConfig from mayan.apps.common.menus import menu_facet, menu_main, menu_secondary from mayan.apps.dashboards.dashboards import dashboard_main from mayan.apps.events.classes import ModelEventType -from mayan.celery import app from .dashboard_widgets import DashboardWidgetTotalCheckouts from .events import ( @@ -25,7 +20,6 @@ from .links import ( link_check_in_document, link_check_out_document, link_check_out_info, link_check_out_list ) -from .literals import CHECK_EXPIRED_CHECK_OUTS_INTERVAL from .methods import ( method_check_in, method_get_check_out_info, method_get_check_out_state, method_is_checked_out @@ -34,7 +28,6 @@ from .permissions import ( permission_document_check_in, permission_document_check_in_override, permission_document_check_out, permission_document_check_out_detail_view ) -from .queues import * # NOQA from .tasks import task_check_expired_check_outs # NOQA # This import is required so that celerybeat can find the task @@ -84,32 +77,6 @@ class CheckoutsApp(MayanAppConfig): ) ) - app.conf.CELERYBEAT_SCHEDULE.update( - { - 'task_check_expired_check_outs': { - 'task': 'mayan.apps.checkouts.tasks.task_check_expired_check_outs', - 'schedule': timedelta( - seconds=CHECK_EXPIRED_CHECK_OUTS_INTERVAL - ), - }, - } - ) - - app.conf.CELERY_QUEUES.append( - Queue( - 'checkouts_periodic', Exchange('checkouts_periodic'), - routing_key='checkouts_periodic', delivery_mode=1 - ), - ) - - app.conf.CELERY_ROUTES.update( - { - 'mayan.apps.checkouts.tasks.task_check_expired_check_outs': { - 'queue': 'checkouts_periodic' - }, - } - ) - dashboard_main.add_widget( widget=DashboardWidgetTotalCheckouts, order=-1 ) diff --git a/mayan/apps/checkouts/queues.py b/mayan/apps/checkouts/queues.py index 9ef477bdc8..323f0d4d5f 100644 --- a/mayan/apps/checkouts/queues.py +++ b/mayan/apps/checkouts/queues.py @@ -1,13 +1,21 @@ from __future__ import absolute_import, unicode_literals +from datetime import timedelta + from django.utils.translation import ugettext_lazy as _ from mayan.apps.task_manager.classes import CeleryQueue +from .literals import CHECK_EXPIRED_CHECK_OUTS_INTERVAL + queue_checkouts_periodic = CeleryQueue( label=_('Checkouts periodic'), name='checkouts_periodic', transient=True ) queue_checkouts_periodic.add_task_type( label=_('Check expired checkouts'), - name='mayan.apps.task_check_expired_check_outs' + name='task_check_expired_check_outs', + dotted_path='mayan.apps.task_check_expired_check_outs', + schedule=timedelta( + seconds=CHECK_EXPIRED_CHECK_OUTS_INTERVAL + ), ) diff --git a/mayan/apps/common/apps.py b/mayan/apps/common/apps.py index 0e00767e07..2f72500fe3 100644 --- a/mayan/apps/common/apps.py +++ b/mayan/apps/common/apps.py @@ -1,14 +1,11 @@ from __future__ import absolute_import, unicode_literals -from datetime import timedelta import logging import os import sys import traceback import warnings -from kombu import Exchange, Queue - from django import apps from django.conf import settings from django.conf.urls import include, url @@ -17,8 +14,6 @@ from django.db.models.signals import post_save from django.utils.encoding import force_text from django.utils.translation import ugettext_lazy as _ -from mayan.celery import app - from .classes import Template from .dependencies import * # NOQA from .handlers import ( @@ -30,11 +25,10 @@ from .links import ( link_object_error_list_clear, link_setup, link_tools ) -from .literals import DELETE_STALE_UPLOADS_INTERVAL, MESSAGE_SQLITE_WARNING +from .literals import MESSAGE_SQLITE_WARNING from .menus import ( menu_about, menu_main, menu_secondary, menu_user ) -from .queues import * # NOQA - Force queues registration from .settings import ( setting_auto_logging, setting_production_error_log_path, setting_production_error_logging @@ -100,37 +94,6 @@ class CommonApp(MayanAppConfig): name='menu_main', template_name='appearance/main_menu.html' ) - app.conf.CELERYBEAT_SCHEDULE.update( - { - 'task_delete_stale_uploads': { - 'task': 'mayan.apps.common.tasks.task_delete_stale_uploads', - 'schedule': timedelta( - seconds=DELETE_STALE_UPLOADS_INTERVAL - ), - }, - } - ) - - app.conf.CELERY_QUEUES.extend( - ( - Queue('default', Exchange('default'), routing_key='default'), - Queue('tools', Exchange('tools'), routing_key='tools'), - Queue( - 'common_periodic', Exchange('common_periodic'), - routing_key='common_periodic', delivery_mode=1 - ), - ) - ) - - app.conf.CELERY_DEFAULT_QUEUE = 'default' - - app.conf.CELERY_ROUTES.update( - { - 'mayan.apps.common.tasks.task_delete_stale_uploads': { - 'queue': 'common_periodic' - }, - } - ) menu_user.bind_links( links=( link_current_user_locale_profile_edit, diff --git a/mayan/apps/common/queues.py b/mayan/apps/common/queues.py index 01569efe9e..9612710e2f 100644 --- a/mayan/apps/common/queues.py +++ b/mayan/apps/common/queues.py @@ -1,17 +1,24 @@ from __future__ import absolute_import, unicode_literals +from datetime import timedelta + from django.utils.translation import ugettext_lazy as _ from mayan.apps.task_manager.classes import CeleryQueue +from .literals import DELETE_STALE_UPLOADS_INTERVAL + queue_default = CeleryQueue( - is_default_queue=True, label=_('Default'), name='default' + default_queue=True, label=_('Default'), name='default' ) queue_tools = CeleryQueue(label=_('Tools'), name='tools') queue_common_periodic = CeleryQueue( label=_('Common periodic'), name='common_periodic', transient=True ) queue_common_periodic.add_task_type( - label=_('Delete stale uploads'), - name='mayan.apps.common.tasks.task_delete_stale_uploads' + dotted_path='mayan.apps.common.tasks.task_delete_stale_uploads', + label=_('Delete stale uploads'), name='task_delete_stale_uploads', + schedule=timedelta( + seconds=DELETE_STALE_UPLOADS_INTERVAL + ) ) diff --git a/mayan/apps/document_indexing/apps.py b/mayan/apps/document_indexing/apps.py index e1ae55c8ef..c9b9dc5cbd 100644 --- a/mayan/apps/document_indexing/apps.py +++ b/mayan/apps/document_indexing/apps.py @@ -1,7 +1,5 @@ from __future__ import absolute_import, unicode_literals -from kombu import Exchange, Queue - from django.apps import apps from django.db.models.signals import post_delete, post_save, pre_delete from django.utils.translation import ugettext_lazy as _ @@ -21,7 +19,6 @@ from mayan.apps.events.links import ( link_events_for_object, link_object_event_types_user_subcriptions_list ) from mayan.apps.navigation.classes import SourceColumn -from mayan.celery import app from .events import event_index_template_created, event_index_template_edited from .handlers import ( @@ -47,7 +44,6 @@ from .permissions import ( permission_document_indexing_instance_view, permission_document_indexing_rebuild, permission_document_indexing_view ) -from .queues import * # NOQA class DocumentIndexingApp(MayanAppConfig): @@ -178,27 +174,6 @@ class DocumentIndexingApp(MayanAppConfig): ), label=_('Documents'), source=DocumentIndexInstanceNode ) - app.conf.CELERY_QUEUES.append( - Queue('indexing', Exchange('indexing'), routing_key='indexing'), - ) - - app.conf.CELERY_ROUTES.update( - { - 'mayan.apps.document_indexing.tasks.task_delete_empty': { - 'queue': 'indexing' - }, - 'mayan.apps.document_indexing.tasks.task_remove_document': { - 'queue': 'indexing' - }, - 'mayan.apps.document_indexing.tasks.task_index_document': { - 'queue': 'indexing' - }, - 'mayan.apps.document_indexing.tasks.task_rebuild_index': { - 'queue': 'tools' - }, - } - ) - menu_facet.bind_links( links=(link_document_index_instance_list,), sources=(Document,) ) diff --git a/mayan/apps/document_indexing/queues.py b/mayan/apps/document_indexing/queues.py index f8749d9235..e000909136 100644 --- a/mayan/apps/document_indexing/queues.py +++ b/mayan/apps/document_indexing/queues.py @@ -9,17 +9,17 @@ queue_indexing = CeleryQueue(label=_('Indexing'), name='indexing') queue_indexing.add_task_type( label=_('Delete empty index nodes'), - name='mayan.apps.document_indexing.tasks.task_delete_empty' + dotted_path='mayan.apps.document_indexing.tasks.task_delete_empty' ) queue_indexing.add_task_type( label=_('Remove document'), - name='mayan.apps.document_indexing.tasks.task_remove_document' + dotted_path='mayan.apps.document_indexing.tasks.task_remove_document' ) queue_indexing.add_task_type( label=_('Index document'), - name='mayan.apps.document_indexing.tasks.task_index_document' + dotted_path='mayan.apps.document_indexing.tasks.task_index_document' ) queue_tools.add_task_type( label=_('Rebuild index'), - name='mayan.apps.document_indexing.tasks.task_rebuild_index' + dotted_path='mayan.apps.document_indexing.tasks.task_rebuild_index' ) diff --git a/mayan/apps/document_parsing/apps.py b/mayan/apps/document_parsing/apps.py index e5969f8c79..844198cd9d 100644 --- a/mayan/apps/document_parsing/apps.py +++ b/mayan/apps/document_parsing/apps.py @@ -2,8 +2,6 @@ from __future__ import unicode_literals import logging -from kombu import Exchange, Queue - from django.apps import apps from django.db.models.signals import post_save from django.utils.translation import ugettext_lazy as _ @@ -17,7 +15,6 @@ from mayan.apps.common.menus import ( from mayan.apps.documents.search import document_search, document_page_search from mayan.apps.documents.signals import post_version_upload from mayan.apps.navigation.classes import SourceColumn -from mayan.celery import app from .dependencies import * # NOQA from .handlers import ( @@ -120,18 +117,6 @@ class DocumentParsingApp(MayanAppConfig): attribute='result' ) - app.conf.CELERY_QUEUES.append( - Queue('parsing', Exchange('parsing'), routing_key='parsing'), - ) - - app.conf.CELERY_ROUTES.update( - { - 'mayan.apps.document_parsing.tasks.task_parse_document_version': { - 'queue': 'parsing' - }, - } - ) - document_search.add_model_field( field='versions__pages__content__content', label=_('Content') ) diff --git a/mayan/apps/document_parsing/queues.py b/mayan/apps/document_parsing/queues.py index 8fa72c2cc4..fed6d834a7 100644 --- a/mayan/apps/document_parsing/queues.py +++ b/mayan/apps/document_parsing/queues.py @@ -6,6 +6,6 @@ from mayan.apps.task_manager.classes import CeleryQueue queue_ocr = CeleryQueue(name='parsing', label=_('Parsing')) queue_ocr.add_task_type( - name='mayan.apps.document_parsing.tasks.task_parse_document_version', + dotted_path='mayan.apps.document_parsing.tasks.task_parse_document_version', label=_('Document version parsing') ) diff --git a/mayan/apps/document_signatures/apps.py b/mayan/apps/document_signatures/apps.py index b1035ca51e..12dfc3e18a 100644 --- a/mayan/apps/document_signatures/apps.py +++ b/mayan/apps/document_signatures/apps.py @@ -2,8 +2,6 @@ from __future__ import unicode_literals import logging -from kombu import Exchange, Queue - from django.apps import apps from django.db.models.signals import post_save, post_delete from django.utils.translation import ugettext_lazy as _ @@ -14,7 +12,6 @@ from mayan.apps.common.menus import ( menu_facet, menu_object, menu_secondary, menu_tools ) from mayan.apps.navigation.classes import SourceColumn -from mayan.celery import app from .handlers import ( handler_unverify_key_signatures, handler_verify_key_signatures @@ -38,7 +35,6 @@ from .permissions import ( permission_document_version_signature_upload, permission_document_version_signature_view, ) -from .queues import * # NOQA logger = logging.getLogger(__name__) @@ -112,29 +108,6 @@ class DocumentSignaturesApp(MayanAppConfig): ).get_signature_type_display() ) - app.conf.CELERY_QUEUES.append( - Queue( - 'signatures', Exchange('signatures'), routing_key='signatures' - ), - ) - - app.conf.CELERY_ROUTES.update( - { - 'mayan.apps.document_signatures.tasks.task_handler_verify_key_signatures': { - 'queue': 'signatures' - }, - 'mayan.apps.document_signatures.tasks.task_handler_unhandler_verify_key_signatures': { - 'queue': 'signatures' - }, - 'mayan.apps.document_signatures.tasks.task_verify_document_version': { - 'queue': 'signatures' - }, - 'mayan.apps.document_signatures.tasks.task_verify_missing_embedded_signature': { - 'queue': 'tools' - }, - } - ) - menu_facet.bind_links( links=(link_document_signature_list,), sources=(Document,) ) diff --git a/mayan/apps/document_signatures/queues.py b/mayan/apps/document_signatures/queues.py index a10d60caac..839c72f8a0 100644 --- a/mayan/apps/document_signatures/queues.py +++ b/mayan/apps/document_signatures/queues.py @@ -9,18 +9,18 @@ queue_signatures = CeleryQueue(label=_('Signatures'), name='signatures') queue_signatures.add_task_type( label=_('Verify key signatures'), - name='mayan.apps.document_signatures.tasks.task_verify_key_signatures' + dotted_path='mayan.apps.document_signatures.tasks.task_verify_key_signatures' ) queue_signatures.add_task_type( label=_('Unverify key signatures'), - name='mayan.apps.document_signatures.tasks.task_unverify_key_signatures' + dotted_path='mayan.apps.document_signatures.tasks.task_unverify_key_signatures' ) queue_signatures.add_task_type( label=_('Verify document version'), - name='mayan.apps.document_signatures.tasks.task_verify_document_version' + dotted_path='mayan.apps.document_signatures.tasks.task_verify_document_version' ) queue_tools.add_task_type( label=_('Verify missing embedded signature'), - name='mayan.apps.document_signatures.tasks.task_verify_missing_embedded_signature' + dotted_path='mayan.apps.document_signatures.tasks.task_verify_missing_embedded_signature' ) diff --git a/mayan/apps/document_states/apps.py b/mayan/apps/document_states/apps.py index f784c0af85..f49310b836 100644 --- a/mayan/apps/document_states/apps.py +++ b/mayan/apps/document_states/apps.py @@ -4,8 +4,6 @@ from django.apps import apps from django.db.models.signals import post_save from django.utils.translation import ugettext_lazy as _ -from kombu import Exchange, Queue - from mayan.apps.acls.classes import ModelPermission from mayan.apps.acls.links import link_acl_list from mayan.apps.common.apps import MayanAppConfig @@ -22,7 +20,6 @@ from mayan.apps.events.links import ( link_events_for_object, link_object_event_types_user_subcriptions_list ) from mayan.apps.navigation.classes import SourceColumn -from mayan.celery import app from .classes import DocumentStateHelper, WorkflowAction from .events import event_workflow_created, event_workflow_edited @@ -53,7 +50,6 @@ from .permissions import ( permission_workflow_delete, permission_workflow_edit, permission_workflow_transition, permission_workflow_view ) -from .queues import * # NOQA from .widgets import widget_transition_events @@ -260,23 +256,6 @@ class DocumentStatesApp(MayanAppConfig): ) ) - app.conf.CELERY_QUEUES.extend( - ( - Queue( - 'document_states', Exchange('document_states'), - routing_key='document_states' - ), - ) - ) - - app.conf.CELERY_ROUTES.update( - { - 'mayan.apps.document_states.tasks.task_launch_all_workflows': { - 'queue': 'document_states' - }, - } - ) - menu_facet.bind_links( links=(link_document_workflow_instance_list,), sources=(Document,) ) diff --git a/mayan/apps/document_states/queues.py b/mayan/apps/document_states/queues.py index b35f50b54e..4b077b703c 100644 --- a/mayan/apps/document_states/queues.py +++ b/mayan/apps/document_states/queues.py @@ -8,6 +8,6 @@ queue_document_states = CeleryQueue( name='document_states', label=_('Document states') ) queue_document_states.add_task_type( - name='mayan.apps.document_states.tasks.task_launch_all_workflows', + dotted_path='mayan.apps.document_states.tasks.task_launch_all_workflows', label=_('Launch all workflows') ) diff --git a/mayan/apps/documents/apps.py b/mayan/apps/documents/apps.py index d080f2e5b1..fd4c59da31 100644 --- a/mayan/apps/documents/apps.py +++ b/mayan/apps/documents/apps.py @@ -1,9 +1,5 @@ from __future__ import absolute_import, unicode_literals -from datetime import timedelta - -from kombu import Exchange, Queue - from django.db.models.signals import post_delete from django.utils.translation import ugettext_lazy as _ @@ -32,7 +28,6 @@ from mayan.apps.events.links import ( from mayan.apps.events.permissions import permission_events_view from mayan.apps.navigation.classes import SourceColumn from mayan.apps.rest_api.fields import DynamicSerializerField -from mayan.celery import app from .dashboard_widgets import ( DashboardWidgetDocumentPagesTotal, DashboardWidgetDocumentsInTrash, @@ -83,10 +78,6 @@ from .links import ( link_document_version_view, link_duplicated_document_list, link_duplicated_document_scan, link_trash_can_empty ) -from .literals import ( - CHECK_DELETE_PERIOD_INTERVAL, CHECK_TRASH_PERIOD_INTERVAL, - DELETE_STALE_STUBS_INTERVAL -) from .menus import menu_documents from .permissions import ( permission_document_create, permission_document_delete, @@ -98,7 +89,6 @@ from .permissions import ( permission_document_version_revert, permission_document_version_view, permission_document_view ) -from .queues import * # NOQA # Just import to initialize the search models from .search import document_search, document_page_search # NOQA from .signals import post_version_upload @@ -356,80 +346,6 @@ class DocumentsApp(MayanAppConfig): template_name='documents/invalid_document.html' ) - app.conf.CELERYBEAT_SCHEDULE.update( - { - 'task_check_delete_periods': { - 'task': 'mayan.apps.documents.tasks.task_check_delete_periods', - 'schedule': timedelta( - seconds=CHECK_DELETE_PERIOD_INTERVAL - ), - }, - 'task_check_trash_periods': { - 'task': 'mayan.apps.documents.tasks.task_check_trash_periods', - 'schedule': timedelta(seconds=CHECK_TRASH_PERIOD_INTERVAL), - }, - 'task_delete_stubs': { - 'task': 'mayan.apps.documents.tasks.task_delete_stubs', - 'schedule': timedelta(seconds=DELETE_STALE_STUBS_INTERVAL), - }, - } - ) - - app.conf.CELERY_QUEUES.extend( - ( - Queue( - 'converter', Exchange('converter'), - routing_key='converter', delivery_mode=1 - ), - Queue( - 'documents_periodic', Exchange('documents_periodic'), - routing_key='documents_periodic', delivery_mode=1 - ), - Queue('uploads', Exchange('uploads'), routing_key='uploads'), - Queue( - 'documents', Exchange('documents'), routing_key='documents' - ), - ) - ) - - app.conf.CELERY_ROUTES.update( - { - 'mayan.apps.documents.tasks.task_check_delete_periods': { - 'queue': 'documents_periodic' - }, - 'mayan.apps.documents.tasks.task_check_trash_periods': { - 'queue': 'documents_periodic' - }, - 'mayan.apps.documents.tasks.task_clean_empty_duplicate_lists': { - 'queue': 'documents' - }, - 'mayan.apps.documents.tasks.task_clear_image_cache': { - 'queue': 'tools' - }, - 'mayan.apps.documents.tasks.task_delete_document': { - 'queue': 'documents' - }, - 'mayan.apps.documents.tasks.task_delete_stubs': { - 'queue': 'documents_periodic' - }, - 'mayan.apps.documents.tasks.task_generate_document_page_image': { - 'queue': 'converter' - }, - 'mayan.apps.documents.tasks.task_scan_duplicates_all': { - 'queue': 'tools' - }, - 'mayan.apps.documents.tasks.task_scan_duplicates_for': { - 'queue': 'uploads' - }, - 'mayan.apps.documents.tasks.task_update_page_count': { - 'queue': 'uploads' - }, - 'mayan.apps.documents.tasks.task_upload_new_version': { - 'queue': 'uploads' - }, - } - ) - dashboard_main.add_widget( widget=DashboardWidgetDocumentsTotal, order=0 ) diff --git a/mayan/apps/documents/links.py b/mayan/apps/documents/links.py index 59bc340029..7bc3e9f47e 100644 --- a/mayan/apps/documents/links.py +++ b/mayan/apps/documents/links.py @@ -8,27 +8,15 @@ from mayan.apps.converter.permissions import ( from mayan.apps.navigation.classes import Link from .icons import ( - icon_clear_image_cache, - - icon_document_list_recent_access, - icon_recent_added_document_list, - icon_document_page_navigation_first, - icon_document_page_navigation_last, - icon_document_page_navigation_next, - icon_document_page_navigation_previous, - icon_document_page_return, - icon_document_page_rotate_left, - icon_document_page_rotate_right, - icon_document_page_zoom_in, - icon_document_page_zoom_out, - - icon_document_type_create, - icon_document_type_delete, - icon_document_type_edit, - icon_document_type_setup, - - icon_duplicated_document_list, - icon_duplicated_document_scan + icon_clear_image_cache, icon_document_list_recent_access, + icon_recent_added_document_list, icon_document_page_navigation_first, + icon_document_page_navigation_last, icon_document_page_navigation_next, + icon_document_page_navigation_previous, icon_document_page_return, + icon_document_page_rotate_left, icon_document_page_rotate_right, + icon_document_page_zoom_in, icon_document_page_zoom_out, + icon_document_type_create, icon_document_type_delete, + icon_document_type_edit, icon_document_type_setup, + icon_duplicated_document_list, icon_duplicated_document_scan ) from .permissions import ( permission_document_delete, permission_document_download, diff --git a/mayan/apps/documents/queues.py b/mayan/apps/documents/queues.py index c97f3dc61c..460b2cd0a2 100644 --- a/mayan/apps/documents/queues.py +++ b/mayan/apps/documents/queues.py @@ -1,10 +1,17 @@ from __future__ import absolute_import, unicode_literals +from datetime import timedelta + from django.utils.translation import ugettext_lazy as _ from mayan.apps.common.queues import queue_tools from mayan.apps.task_manager.classes import CeleryQueue +from .literals import ( + CHECK_DELETE_PERIOD_INTERVAL, CHECK_TRASH_PERIOD_INTERVAL, + DELETE_STALE_STUBS_INTERVAL +) + queue_converter = CeleryQueue( name='converter', label=_('Converter'), transient=True ) @@ -19,42 +26,58 @@ queue_documents = CeleryQueue( ) queue_converter.add_task_type( - name='mayan.apps.documents.tasks.task_generate_document_page_image', + dotted_path='mayan.apps.documents.tasks.task_generate_document_page_image', label=_('Generate document page image') ) queue_documents.add_task_type( - name='mayan.apps.documents.tasks.task_delete_document', + dotted_path='mayan.apps.documents.tasks.task_delete_document', label=_('Delete a document') ) queue_documents.add_task_type( - name='mayan.apps.documents.tasks.task_clean_empty_duplicate_lists', + dotted_path='mayan.apps.documents.tasks.task_clean_empty_duplicate_lists', label=_('Clean empty duplicate lists') ) queue_documents_periodic.add_task_type( - name='mayan.apps.documents.tasks.task_check_delete_periods', - label=_('Check document type delete periods') + dotted_path='mayan.apps.documents.tasks.task_check_delete_periods', + label=_('Check document type delete periods'), + name='task_check_delete_periods', + schedule=timedelta( + seconds=CHECK_DELETE_PERIOD_INTERVAL + ), ) queue_documents_periodic.add_task_type( - name='mayan.apps.documents.tasks.task_check_trash_periods', - label=_('Check document type trash periods') + dotted_path='mayan.apps.documents.tasks.task_check_trash_periods', + label=_('Check document type trash periods'), + name='task_check_trash_periods', + schedule=timedelta(seconds=CHECK_TRASH_PERIOD_INTERVAL), ) queue_documents_periodic.add_task_type( - name='mayan.apps.documents.tasks.task_delete_stubs', - label=_('Delete document stubs') + dotted_path='mayan.apps.documents.tasks.task_delete_stubs', + label=_('Delete document stubs'), + name='task_delete_stubs', + schedule=timedelta(seconds=DELETE_STALE_STUBS_INTERVAL), ) queue_tools.add_task_type( - name='mayan.apps.documents.tasks.task_clear_image_cache', + dotted_path='mayan.apps.documents.tasks.task_clear_image_cache', label=_('Clear image cache') ) +queue_tools.add_task_type( + dotted_path='mayan.apps.documents.tasks.task_scan_duplicates_all', + label=_('Duplicated document scan') +) queue_uploads.add_task_type( - name='mayan.apps.documents.tasks.task_update_page_count', + dotted_path='mayan.apps.documents.tasks.task_update_page_count', label=_('Update document page count') ) queue_uploads.add_task_type( - name='mayan.apps.documents.tasks.task_upload_new_version', + dotted_path='mayan.apps.documents.tasks.task_upload_new_version', label=_('Upload new document version') ) +queue_uploads.add_task_type( + dotted_path='mayan.apps.documents.tasks.task_scan_duplicates_for', + label=_('Scan document duplicates') +) diff --git a/mayan/apps/mailer/apps.py b/mayan/apps/mailer/apps.py index e193363acd..61d5860136 100644 --- a/mayan/apps/mailer/apps.py +++ b/mayan/apps/mailer/apps.py @@ -1,7 +1,5 @@ from __future__ import unicode_literals -from kombu import Exchange, Queue - from django.apps import apps from django.utils.translation import ugettext_lazy as _ @@ -19,7 +17,6 @@ from mayan.apps.events.links import ( link_events_for_object, link_object_event_types_user_subcriptions_list ) from mayan.apps.navigation.classes import SourceColumn -from mayan.celery import app from .classes import MailerBackend from .events import event_email_sent @@ -35,7 +32,6 @@ from .permissions import ( permission_user_mailer_delete, permission_user_mailer_edit, permission_user_mailer_use, permission_user_mailer_view, ) -from .queues import * # NOQA class MailerApp(MayanAppConfig): @@ -98,18 +94,6 @@ class MailerApp(MayanAppConfig): ) ) - app.conf.CELERY_QUEUES.append( - Queue('mailing', Exchange('mailing'), routing_key='mailing'), - ) - - app.conf.CELERY_ROUTES.update( - { - 'mayan.apps.mailer.tasks.task_send_document': { - 'queue': 'mailing' - }, - } - ) - menu_list_facet.bind_links( links=( link_acl_list, link_events_for_object, diff --git a/mayan/apps/mailer/queues.py b/mayan/apps/mailer/queues.py index b086590da5..23906b02cd 100644 --- a/mayan/apps/mailer/queues.py +++ b/mayan/apps/mailer/queues.py @@ -7,5 +7,5 @@ from mayan.apps.task_manager.classes import CeleryQueue queue_mailing = CeleryQueue(label=_('Mailing'), name='mailing') queue_mailing.add_task_type( label=_('Send document'), - name='mayan.apps.mailer.tasks.task_send_document' + dotted_path='mayan.apps.mailer.tasks.task_send_document' ) diff --git a/mayan/apps/mayan_statistics/apps.py b/mayan/apps/mayan_statistics/apps.py index de22da4c7c..78ebd94189 100644 --- a/mayan/apps/mayan_statistics/apps.py +++ b/mayan/apps/mayan_statistics/apps.py @@ -1,13 +1,10 @@ from __future__ import unicode_literals -from kombu import Exchange, Queue - from django.utils.translation import ugettext_lazy as _ from mayan.apps.common.apps import MayanAppConfig from mayan.apps.common.menus import menu_object, menu_secondary, menu_tools from mayan.apps.navigation.classes import SourceColumn -from mayan.celery import app from .classes import StatisticLineChart, StatisticNamespace from .dependencies import * # NOQA @@ -15,7 +12,6 @@ from .links import ( link_execute, link_namespace_details, link_namespace_list, link_statistics, link_view ) -from .queues import * # NOQA from .tasks import task_execute_statistic # NOQA - Force registration of task @@ -37,23 +33,6 @@ class StatisticsApp(MayanAppConfig): attribute='schedule', ) - app.conf.CELERY_QUEUES.extend( - ( - Queue( - 'statistics', Exchange('statistics'), - routing_key='statistics', delivery_mode=1 - ), - ) - ) - - app.conf.CELERY_ROUTES.update( - { - 'mayan.apps.mayan_statistics.tasks.task_execute_statistic': { - 'queue': 'statistics' - }, - } - ) - menu_object.bind_links( links=(link_execute, link_view), sources=(StatisticLineChart,) ) diff --git a/mayan/apps/mayan_statistics/queues.py b/mayan/apps/mayan_statistics/queues.py index 29a7881032..e529f59f24 100644 --- a/mayan/apps/mayan_statistics/queues.py +++ b/mayan/apps/mayan_statistics/queues.py @@ -10,5 +10,5 @@ queue_statistics = CeleryQueue( queue_statistics.add_task_type( label=_('Execute statistic'), - name='mayan.apps.mayan_statistics.tasks.task_execute_statistic' + dotted_path='mayan.apps.mayan_statistics.tasks.task_execute_statistic' ) diff --git a/mayan/apps/metadata/apps.py b/mayan/apps/metadata/apps.py index 9aae81f458..44fd6bb546 100644 --- a/mayan/apps/metadata/apps.py +++ b/mayan/apps/metadata/apps.py @@ -2,8 +2,6 @@ from __future__ import absolute_import, unicode_literals import logging -from kombu import Exchange, Queue - from django.apps import apps from django.db.models.signals import post_delete, post_save from django.utils.translation import ugettext_lazy as _ @@ -25,7 +23,6 @@ from mayan.apps.events.links import ( link_events_for_object, link_object_event_types_user_subcriptions_list, ) from mayan.apps.events.permissions import permission_events_view -from mayan.celery import app from mayan.apps.navigation.classes import SourceColumn from .classes import DocumentMetadataHelper @@ -55,7 +52,6 @@ from .permissions import ( permission_metadata_type_view ) -from .queues import * # NOQA from .search import metadata_type_search # NOQA from .widgets import get_metadata_string @@ -184,21 +180,6 @@ class MetadataApp(MayanAppConfig): ) SourceColumn(attribute='name', is_sortable=True, source=MetadataType) - app.conf.CELERY_QUEUES.append( - Queue('metadata', Exchange('metadata'), routing_key='metadata'), - ) - - app.conf.CELERY_ROUTES.update( - { - 'mayan.apps.metadata.tasks.task_remove_metadata_type': { - 'queue': 'metadata' - }, - 'mayan.apps.metadata.tasks.task_add_required_metadata_type': { - 'queue': 'metadata' - }, - } - ) - document_search.add_model_field( field='metadata__metadata_type__name', label=_('Metadata type') ) diff --git a/mayan/apps/metadata/queues.py b/mayan/apps/metadata/queues.py index c57d66cc4c..ba86bbf12d 100644 --- a/mayan/apps/metadata/queues.py +++ b/mayan/apps/metadata/queues.py @@ -9,9 +9,9 @@ queue_metadata = CeleryQueue( ) queue_metadata.add_task_type( label=_('Remove metadata type'), - name='mayan.apps.metadata.tasks.task_remove_metadata_type' + dotted_path='mayan.apps.metadata.tasks.task_remove_metadata_type' ) queue_metadata.add_task_type( label=_('Add required metadata type'), - name='mayan.apps.metadata.tasks.task_add_required_metadata_type' + dotted_path='mayan.apps.metadata.tasks.task_add_required_metadata_type' ) diff --git a/mayan/apps/ocr/apps.py b/mayan/apps/ocr/apps.py index d0f3dba0c4..93a7e34d70 100644 --- a/mayan/apps/ocr/apps.py +++ b/mayan/apps/ocr/apps.py @@ -2,8 +2,6 @@ from __future__ import unicode_literals import logging -from kombu import Exchange, Queue - from django.apps import apps from django.db.models.signals import post_save from django.utils.translation import ugettext_lazy as _ @@ -17,7 +15,6 @@ from mayan.apps.common.menus import ( from mayan.apps.documents.search import document_search, document_page_search from mayan.apps.documents.signals import post_version_upload from mayan.apps.navigation.classes import SourceColumn -from mayan.celery import app from .dependencies import * # NOQA from .handlers import ( @@ -38,7 +35,6 @@ from .permissions import ( permission_document_type_ocr_setup, permission_ocr_document, permission_ocr_content_view ) -from .queues import * # NOQA from .signals import post_document_version_ocr from .utils import get_document_ocr_content @@ -117,18 +113,6 @@ class OCRApp(MayanAppConfig): attribute='result' ) - app.conf.CELERY_QUEUES.append( - Queue('ocr', Exchange('ocr'), routing_key='ocr'), - ) - - app.conf.CELERY_ROUTES.update( - { - 'mayan.apps.ocr.tasks.task_do_ocr': { - 'queue': 'ocr' - }, - } - ) - document_search.add_model_field( field='versions__pages__ocr_content__content', label=_('OCR') ) diff --git a/mayan/apps/ocr/queues.py b/mayan/apps/ocr/queues.py index 537dae91c1..be89a399e1 100644 --- a/mayan/apps/ocr/queues.py +++ b/mayan/apps/ocr/queues.py @@ -6,5 +6,6 @@ from mayan.apps.task_manager.classes import CeleryQueue queue_ocr = CeleryQueue(name='ocr', label=_('OCR')) queue_ocr.add_task_type( - name='mayan.apps.ocr.tasks.task_do_ocr', label=_('Document version OCR') + dotted_path='mayan.apps.ocr.tasks.task_do_ocr', + label=_('Document version OCR') ) diff --git a/mayan/apps/sources/apps.py b/mayan/apps/sources/apps.py index 32de8a78fc..f74954b481 100644 --- a/mayan/apps/sources/apps.py +++ b/mayan/apps/sources/apps.py @@ -2,8 +2,6 @@ from __future__ import absolute_import, unicode_literals from django.utils.translation import ugettext_lazy as _ -from kombu import Exchange, Queue - from mayan.apps.common.apps import MayanAppConfig from mayan.apps.common.classes import MissingItem from mayan.apps.common.html_widgets import TwoStateWidget @@ -15,7 +13,6 @@ from mayan.apps.converter.links import link_transformation_list from mayan.apps.documents.menus import menu_documents from mayan.apps.documents.signals import post_version_upload from mayan.apps.navigation.classes import SourceColumn -from mayan.celery import app from .classes import StagingFile from .dependencies import * # NOQA @@ -32,7 +29,6 @@ from .links import ( link_setup_source_edit, link_setup_source_logs, link_staging_file_delete, link_document_version_upload ) -from .queues import * # NOQA from .widgets import StagingFileThumbnailWidget @@ -105,38 +101,6 @@ class SourcesApp(MayanAppConfig): func=lambda context: context['object'].message ) - app.conf.CELERY_QUEUES.extend( - ( - Queue( - 'sources', Exchange('sources'), routing_key='sources' - ), - Queue( - 'sources_fast', Exchange('sources_fast'), - routing_key='sources_fast', delivery_mode=1 - ), - Queue( - 'sources_periodic', Exchange('sources_periodic'), - routing_key='sources_periodic', delivery_mode=1 - ), - ) - ) - - app.conf.CELERY_ROUTES.update( - { - 'mayan.apps.sources.tasks.task_check_interval_source': { - 'queue': 'sources_periodic' - }, - 'mayan.apps.sources.tasks.task_generate_staging_file_image': { - 'queue': 'sources_fast' - }, - 'mayan.apps.sources.tasks.task_source_handle_upload': { - 'queue': 'sources' - }, - 'mayan.apps.sources.tasks.task_upload_document': { - 'queue': 'sources' - }, - } - ) menu_documents.bind_links(links=(link_document_create_multiple,)) menu_list_facet.bind_links( diff --git a/mayan/apps/sources/queues.py b/mayan/apps/sources/queues.py index 8793292a97..56eb0b36f0 100644 --- a/mayan/apps/sources/queues.py +++ b/mayan/apps/sources/queues.py @@ -16,17 +16,17 @@ queue_sources_fast = CeleryQueue( queue_sources_fast.add_task_type( label=_('Generate staging file image'), - name='mayan.apps.sources.tasks.task_generate_staging_file_image' + dotted_path='mayan.apps.sources.tasks.task_generate_staging_file_image' ) queue_sources_periodic.add_task_type( label=_('Check interval source'), - name='mayan.apps.sources.tasks.task_check_interval_source' + dotted_path='mayan.apps.sources.tasks.task_check_interval_source' ) queue_sources.add_task_type( label=_('Handle upload'), - name='mayan.apps.sources.tasks.task_source_handle_upload' + dotted_path='mayan.apps.sources.tasks.task_source_handle_upload' ) queue_sources.add_task_type( label=_('Upload document'), - name='mayan.apps.sources.tasks.task_upload_document' + dotted_path='mayan.apps.sources.tasks.task_upload_document' ) diff --git a/mayan/apps/task_manager/apps.py b/mayan/apps/task_manager/apps.py index b6c4fbb2d5..9eb2ab6a49 100644 --- a/mayan/apps/task_manager/apps.py +++ b/mayan/apps/task_manager/apps.py @@ -25,6 +25,8 @@ class TaskManagerApp(MayanAppConfig): def ready(self): super(TaskManagerApp, self).ready() + CeleryQueue.initialize() + SourceColumn( source=CeleryQueue, label=_('Label'), attribute='label' ) diff --git a/mayan/apps/task_manager/classes.py b/mayan/apps/task_manager/classes.py index 53bc727977..4b484c61fa 100644 --- a/mayan/apps/task_manager/classes.py +++ b/mayan/apps/task_manager/classes.py @@ -1,13 +1,23 @@ from __future__ import absolute_import, unicode_literals from datetime import timedelta +from importlib import import_module +import logging + +from kombu import Exchange, Queue from celery.five import monotonic from celery.task.control import inspect +from django.apps import apps + from django.utils.encoding import force_text, python_2_unicode_compatible from django.utils.timezone import now +from mayan.celery import app as celery_app + +logger = logging.getLogger(__name__) + @python_2_unicode_compatible class TaskType(object): @@ -21,9 +31,11 @@ class TaskType(object): def get(cls, name): return cls._registry[name] - def __init__(self, name, label): - self.name = name + def __init__(self, dotted_path, label, name=None, schedule=None): + self.name = name or dotted_path.split('.')[-1] self.label = label + self.dotted_path = dotted_path + self.schedule = schedule self.__class__._registry[name] = self def __str__(self): @@ -52,6 +64,20 @@ class CeleryQueue(object): _registry = {} _inspect_instance = inspect() + @staticmethod + def initialize(): + for app in apps.get_app_configs(): + try: + import_module('{}.queues'.format(app.name)) + except ImportError as exception: + if force_text(exception) not in ('No module named queues', 'No module named \'{}.queues\''.format(app.name)): + logger.error( + 'Error importing %s queues.py file; %s', app.name, + exception + ) + + CeleryQueue.update_celery() + @classmethod def all(cls): return sorted( @@ -62,17 +88,32 @@ class CeleryQueue(object): def get(cls, queue_name): return cls._registry[queue_name] - def __init__(self, name, label, is_default_queue=False, transient=False): + @classmethod + def update_celery(cls): + for instance in cls.all(): + instance._update_celery() + + def __init__(self, name, label, default_queue=False, transient=False): self.name = name self.label = label - self.is_default_queue = is_default_queue - self.is_transient = transient + self.default_queue = default_queue + self.transient = transient self.task_types = [] self.__class__._registry[name] = self def __str__(self): return force_text(self.label) + def _process_task_dictionary(self, task_dictionary): + result = [] + for worker, tasks in task_dictionary.items(): + for task in tasks: + if task['delivery_info']['routing_key'] == self.name: + task_type = TaskType.get(name=task['name']) + result.append(Task(task_type=task_type, kwargs=task)) + + return result + def add_task_type(self, *args, **kwargs): self.task_types.append(TaskType(*args, **kwargs)) @@ -91,12 +132,35 @@ class CeleryQueue(object): task_dictionary=self.__class__._inspect_instance.scheduled() ) - def _process_task_dictionary(self, task_dictionary): - result = [] - for worker, tasks in task_dictionary.items(): - for task in tasks: - if task['delivery_info']['routing_key'] == self.name: - task_type = TaskType.get(name=task['name']) - result.append(Task(task_type=task_type, kwargs=task)) + def _update_celery(self): + kwargs = { + 'name': self.name, 'exchange': Exchange(self.name), + 'routing_key': self.name + } - return result + if self.transient: + kwargs['delivery_mode'] = 1 + + celery_app.conf.CELERY_QUEUES.append(Queue(**kwargs)) + + if self.default_queue: + celery_app.conf.CELERY_DEFAULT_QUEUE = self.name + + for task_type in self.task_types: + celery_app.conf.CELERY_ROUTES.update( + { + task_type.dotted_path: { + 'queue': self.name + }, + } + ) + + if task_type.schedule: + celery_app.conf.CELERYBEAT_SCHEDULE.update( + { + task_type.name: { + 'task': task_type.dotted_path, + 'schedule': task_type.schedule + }, + } + )