Move task registration to the CeleryQueue class

Signed-off-by: Roberto Rosario <roberto.rosario.gonzalez@gmail.com>
This commit is contained in:
Roberto Rosario
2019-05-12 03:34:32 -04:00
parent 1acc352801
commit f76b9e4f3f
29 changed files with 168 additions and 420 deletions

View File

@@ -256,6 +256,9 @@
metadata.
* Create intermedia file cache folder. Fixes preview errors
when the first document uploaded is an office file.
* Move queue and task registration to the CeleryQueue class.
The .queues.py module is now loaded automatically.
3.1.11 (2019-04-XX)
===================

View File

@@ -566,6 +566,8 @@ Other changes
metadata.
* Create intermedia file cache folder. Fixes preview errors
when the first document uploaded is an office file.
* Move queue and task registration to the CeleryQueue class.
The .queues.py module is now loaded automatically.
Removals

View File

@@ -1,9 +1,5 @@
from __future__ import absolute_import, unicode_literals
from datetime import timedelta
from kombu import Exchange, Queue
from django.apps import apps
from django.db.models.signals import pre_save
from django.utils.translation import ugettext_lazy as _
@@ -13,7 +9,6 @@ from mayan.apps.common.apps import MayanAppConfig
from mayan.apps.common.menus import menu_facet, menu_main, menu_secondary
from mayan.apps.dashboards.dashboards import dashboard_main
from mayan.apps.events.classes import ModelEventType
from mayan.celery import app
from .dashboard_widgets import DashboardWidgetTotalCheckouts
from .events import (
@@ -25,7 +20,6 @@ from .links import (
link_check_in_document, link_check_out_document, link_check_out_info,
link_check_out_list
)
from .literals import CHECK_EXPIRED_CHECK_OUTS_INTERVAL
from .methods import (
method_check_in, method_get_check_out_info, method_get_check_out_state,
method_is_checked_out
@@ -34,7 +28,6 @@ from .permissions import (
permission_document_check_in, permission_document_check_in_override,
permission_document_check_out, permission_document_check_out_detail_view
)
from .queues import * # NOQA
from .tasks import task_check_expired_check_outs # NOQA
# This import is required so that celerybeat can find the task
@@ -84,32 +77,6 @@ class CheckoutsApp(MayanAppConfig):
)
)
app.conf.CELERYBEAT_SCHEDULE.update(
{
'task_check_expired_check_outs': {
'task': 'mayan.apps.checkouts.tasks.task_check_expired_check_outs',
'schedule': timedelta(
seconds=CHECK_EXPIRED_CHECK_OUTS_INTERVAL
),
},
}
)
app.conf.CELERY_QUEUES.append(
Queue(
'checkouts_periodic', Exchange('checkouts_periodic'),
routing_key='checkouts_periodic', delivery_mode=1
),
)
app.conf.CELERY_ROUTES.update(
{
'mayan.apps.checkouts.tasks.task_check_expired_check_outs': {
'queue': 'checkouts_periodic'
},
}
)
dashboard_main.add_widget(
widget=DashboardWidgetTotalCheckouts, order=-1
)

View File

@@ -1,13 +1,21 @@
from __future__ import absolute_import, unicode_literals
from datetime import timedelta
from django.utils.translation import ugettext_lazy as _
from mayan.apps.task_manager.classes import CeleryQueue
from .literals import CHECK_EXPIRED_CHECK_OUTS_INTERVAL
queue_checkouts_periodic = CeleryQueue(
label=_('Checkouts periodic'), name='checkouts_periodic', transient=True
)
queue_checkouts_periodic.add_task_type(
label=_('Check expired checkouts'),
name='mayan.apps.task_check_expired_check_outs'
name='task_check_expired_check_outs',
dotted_path='mayan.apps.task_check_expired_check_outs',
schedule=timedelta(
seconds=CHECK_EXPIRED_CHECK_OUTS_INTERVAL
),
)

View File

@@ -1,14 +1,11 @@
from __future__ import absolute_import, unicode_literals
from datetime import timedelta
import logging
import os
import sys
import traceback
import warnings
from kombu import Exchange, Queue
from django import apps
from django.conf import settings
from django.conf.urls import include, url
@@ -17,8 +14,6 @@ from django.db.models.signals import post_save
from django.utils.encoding import force_text
from django.utils.translation import ugettext_lazy as _
from mayan.celery import app
from .classes import Template
from .dependencies import * # NOQA
from .handlers import (
@@ -30,11 +25,10 @@ from .links import (
link_object_error_list_clear, link_setup, link_tools
)
from .literals import DELETE_STALE_UPLOADS_INTERVAL, MESSAGE_SQLITE_WARNING
from .literals import MESSAGE_SQLITE_WARNING
from .menus import (
menu_about, menu_main, menu_secondary, menu_user
)
from .queues import * # NOQA - Force queues registration
from .settings import (
setting_auto_logging, setting_production_error_log_path,
setting_production_error_logging
@@ -100,37 +94,6 @@ class CommonApp(MayanAppConfig):
name='menu_main', template_name='appearance/main_menu.html'
)
app.conf.CELERYBEAT_SCHEDULE.update(
{
'task_delete_stale_uploads': {
'task': 'mayan.apps.common.tasks.task_delete_stale_uploads',
'schedule': timedelta(
seconds=DELETE_STALE_UPLOADS_INTERVAL
),
},
}
)
app.conf.CELERY_QUEUES.extend(
(
Queue('default', Exchange('default'), routing_key='default'),
Queue('tools', Exchange('tools'), routing_key='tools'),
Queue(
'common_periodic', Exchange('common_periodic'),
routing_key='common_periodic', delivery_mode=1
),
)
)
app.conf.CELERY_DEFAULT_QUEUE = 'default'
app.conf.CELERY_ROUTES.update(
{
'mayan.apps.common.tasks.task_delete_stale_uploads': {
'queue': 'common_periodic'
},
}
)
menu_user.bind_links(
links=(
link_current_user_locale_profile_edit,

View File

@@ -1,17 +1,24 @@
from __future__ import absolute_import, unicode_literals
from datetime import timedelta
from django.utils.translation import ugettext_lazy as _
from mayan.apps.task_manager.classes import CeleryQueue
from .literals import DELETE_STALE_UPLOADS_INTERVAL
queue_default = CeleryQueue(
is_default_queue=True, label=_('Default'), name='default'
default_queue=True, label=_('Default'), name='default'
)
queue_tools = CeleryQueue(label=_('Tools'), name='tools')
queue_common_periodic = CeleryQueue(
label=_('Common periodic'), name='common_periodic', transient=True
)
queue_common_periodic.add_task_type(
label=_('Delete stale uploads'),
name='mayan.apps.common.tasks.task_delete_stale_uploads'
dotted_path='mayan.apps.common.tasks.task_delete_stale_uploads',
label=_('Delete stale uploads'), name='task_delete_stale_uploads',
schedule=timedelta(
seconds=DELETE_STALE_UPLOADS_INTERVAL
)
)

View File

@@ -1,7 +1,5 @@
from __future__ import absolute_import, unicode_literals
from kombu import Exchange, Queue
from django.apps import apps
from django.db.models.signals import post_delete, post_save, pre_delete
from django.utils.translation import ugettext_lazy as _
@@ -21,7 +19,6 @@ from mayan.apps.events.links import (
link_events_for_object, link_object_event_types_user_subcriptions_list
)
from mayan.apps.navigation.classes import SourceColumn
from mayan.celery import app
from .events import event_index_template_created, event_index_template_edited
from .handlers import (
@@ -47,7 +44,6 @@ from .permissions import (
permission_document_indexing_instance_view,
permission_document_indexing_rebuild, permission_document_indexing_view
)
from .queues import * # NOQA
class DocumentIndexingApp(MayanAppConfig):
@@ -178,27 +174,6 @@ class DocumentIndexingApp(MayanAppConfig):
), label=_('Documents'), source=DocumentIndexInstanceNode
)
app.conf.CELERY_QUEUES.append(
Queue('indexing', Exchange('indexing'), routing_key='indexing'),
)
app.conf.CELERY_ROUTES.update(
{
'mayan.apps.document_indexing.tasks.task_delete_empty': {
'queue': 'indexing'
},
'mayan.apps.document_indexing.tasks.task_remove_document': {
'queue': 'indexing'
},
'mayan.apps.document_indexing.tasks.task_index_document': {
'queue': 'indexing'
},
'mayan.apps.document_indexing.tasks.task_rebuild_index': {
'queue': 'tools'
},
}
)
menu_facet.bind_links(
links=(link_document_index_instance_list,), sources=(Document,)
)

View File

@@ -9,17 +9,17 @@ queue_indexing = CeleryQueue(label=_('Indexing'), name='indexing')
queue_indexing.add_task_type(
label=_('Delete empty index nodes'),
name='mayan.apps.document_indexing.tasks.task_delete_empty'
dotted_path='mayan.apps.document_indexing.tasks.task_delete_empty'
)
queue_indexing.add_task_type(
label=_('Remove document'),
name='mayan.apps.document_indexing.tasks.task_remove_document'
dotted_path='mayan.apps.document_indexing.tasks.task_remove_document'
)
queue_indexing.add_task_type(
label=_('Index document'),
name='mayan.apps.document_indexing.tasks.task_index_document'
dotted_path='mayan.apps.document_indexing.tasks.task_index_document'
)
queue_tools.add_task_type(
label=_('Rebuild index'),
name='mayan.apps.document_indexing.tasks.task_rebuild_index'
dotted_path='mayan.apps.document_indexing.tasks.task_rebuild_index'
)

View File

@@ -2,8 +2,6 @@ from __future__ import unicode_literals
import logging
from kombu import Exchange, Queue
from django.apps import apps
from django.db.models.signals import post_save
from django.utils.translation import ugettext_lazy as _
@@ -17,7 +15,6 @@ from mayan.apps.common.menus import (
from mayan.apps.documents.search import document_search, document_page_search
from mayan.apps.documents.signals import post_version_upload
from mayan.apps.navigation.classes import SourceColumn
from mayan.celery import app
from .dependencies import * # NOQA
from .handlers import (
@@ -120,18 +117,6 @@ class DocumentParsingApp(MayanAppConfig):
attribute='result'
)
app.conf.CELERY_QUEUES.append(
Queue('parsing', Exchange('parsing'), routing_key='parsing'),
)
app.conf.CELERY_ROUTES.update(
{
'mayan.apps.document_parsing.tasks.task_parse_document_version': {
'queue': 'parsing'
},
}
)
document_search.add_model_field(
field='versions__pages__content__content', label=_('Content')
)

View File

@@ -6,6 +6,6 @@ from mayan.apps.task_manager.classes import CeleryQueue
queue_ocr = CeleryQueue(name='parsing', label=_('Parsing'))
queue_ocr.add_task_type(
name='mayan.apps.document_parsing.tasks.task_parse_document_version',
dotted_path='mayan.apps.document_parsing.tasks.task_parse_document_version',
label=_('Document version parsing')
)

View File

@@ -2,8 +2,6 @@ from __future__ import unicode_literals
import logging
from kombu import Exchange, Queue
from django.apps import apps
from django.db.models.signals import post_save, post_delete
from django.utils.translation import ugettext_lazy as _
@@ -14,7 +12,6 @@ from mayan.apps.common.menus import (
menu_facet, menu_object, menu_secondary, menu_tools
)
from mayan.apps.navigation.classes import SourceColumn
from mayan.celery import app
from .handlers import (
handler_unverify_key_signatures, handler_verify_key_signatures
@@ -38,7 +35,6 @@ from .permissions import (
permission_document_version_signature_upload,
permission_document_version_signature_view,
)
from .queues import * # NOQA
logger = logging.getLogger(__name__)
@@ -112,29 +108,6 @@ class DocumentSignaturesApp(MayanAppConfig):
).get_signature_type_display()
)
app.conf.CELERY_QUEUES.append(
Queue(
'signatures', Exchange('signatures'), routing_key='signatures'
),
)
app.conf.CELERY_ROUTES.update(
{
'mayan.apps.document_signatures.tasks.task_handler_verify_key_signatures': {
'queue': 'signatures'
},
'mayan.apps.document_signatures.tasks.task_handler_unhandler_verify_key_signatures': {
'queue': 'signatures'
},
'mayan.apps.document_signatures.tasks.task_verify_document_version': {
'queue': 'signatures'
},
'mayan.apps.document_signatures.tasks.task_verify_missing_embedded_signature': {
'queue': 'tools'
},
}
)
menu_facet.bind_links(
links=(link_document_signature_list,), sources=(Document,)
)

View File

@@ -9,18 +9,18 @@ queue_signatures = CeleryQueue(label=_('Signatures'), name='signatures')
queue_signatures.add_task_type(
label=_('Verify key signatures'),
name='mayan.apps.document_signatures.tasks.task_verify_key_signatures'
dotted_path='mayan.apps.document_signatures.tasks.task_verify_key_signatures'
)
queue_signatures.add_task_type(
label=_('Unverify key signatures'),
name='mayan.apps.document_signatures.tasks.task_unverify_key_signatures'
dotted_path='mayan.apps.document_signatures.tasks.task_unverify_key_signatures'
)
queue_signatures.add_task_type(
label=_('Verify document version'),
name='mayan.apps.document_signatures.tasks.task_verify_document_version'
dotted_path='mayan.apps.document_signatures.tasks.task_verify_document_version'
)
queue_tools.add_task_type(
label=_('Verify missing embedded signature'),
name='mayan.apps.document_signatures.tasks.task_verify_missing_embedded_signature'
dotted_path='mayan.apps.document_signatures.tasks.task_verify_missing_embedded_signature'
)

View File

@@ -4,8 +4,6 @@ from django.apps import apps
from django.db.models.signals import post_save
from django.utils.translation import ugettext_lazy as _
from kombu import Exchange, Queue
from mayan.apps.acls.classes import ModelPermission
from mayan.apps.acls.links import link_acl_list
from mayan.apps.common.apps import MayanAppConfig
@@ -22,7 +20,6 @@ from mayan.apps.events.links import (
link_events_for_object, link_object_event_types_user_subcriptions_list
)
from mayan.apps.navigation.classes import SourceColumn
from mayan.celery import app
from .classes import DocumentStateHelper, WorkflowAction
from .events import event_workflow_created, event_workflow_edited
@@ -53,7 +50,6 @@ from .permissions import (
permission_workflow_delete, permission_workflow_edit,
permission_workflow_transition, permission_workflow_view
)
from .queues import * # NOQA
from .widgets import widget_transition_events
@@ -260,23 +256,6 @@ class DocumentStatesApp(MayanAppConfig):
)
)
app.conf.CELERY_QUEUES.extend(
(
Queue(
'document_states', Exchange('document_states'),
routing_key='document_states'
),
)
)
app.conf.CELERY_ROUTES.update(
{
'mayan.apps.document_states.tasks.task_launch_all_workflows': {
'queue': 'document_states'
},
}
)
menu_facet.bind_links(
links=(link_document_workflow_instance_list,), sources=(Document,)
)

View File

@@ -8,6 +8,6 @@ queue_document_states = CeleryQueue(
name='document_states', label=_('Document states')
)
queue_document_states.add_task_type(
name='mayan.apps.document_states.tasks.task_launch_all_workflows',
dotted_path='mayan.apps.document_states.tasks.task_launch_all_workflows',
label=_('Launch all workflows')
)

View File

@@ -1,9 +1,5 @@
from __future__ import absolute_import, unicode_literals
from datetime import timedelta
from kombu import Exchange, Queue
from django.db.models.signals import post_delete
from django.utils.translation import ugettext_lazy as _
@@ -32,7 +28,6 @@ from mayan.apps.events.links import (
from mayan.apps.events.permissions import permission_events_view
from mayan.apps.navigation.classes import SourceColumn
from mayan.apps.rest_api.fields import DynamicSerializerField
from mayan.celery import app
from .dashboard_widgets import (
DashboardWidgetDocumentPagesTotal, DashboardWidgetDocumentsInTrash,
@@ -83,10 +78,6 @@ from .links import (
link_document_version_view, link_duplicated_document_list,
link_duplicated_document_scan, link_trash_can_empty
)
from .literals import (
CHECK_DELETE_PERIOD_INTERVAL, CHECK_TRASH_PERIOD_INTERVAL,
DELETE_STALE_STUBS_INTERVAL
)
from .menus import menu_documents
from .permissions import (
permission_document_create, permission_document_delete,
@@ -98,7 +89,6 @@ from .permissions import (
permission_document_version_revert, permission_document_version_view,
permission_document_view
)
from .queues import * # NOQA
# Just import to initialize the search models
from .search import document_search, document_page_search # NOQA
from .signals import post_version_upload
@@ -356,80 +346,6 @@ class DocumentsApp(MayanAppConfig):
template_name='documents/invalid_document.html'
)
app.conf.CELERYBEAT_SCHEDULE.update(
{
'task_check_delete_periods': {
'task': 'mayan.apps.documents.tasks.task_check_delete_periods',
'schedule': timedelta(
seconds=CHECK_DELETE_PERIOD_INTERVAL
),
},
'task_check_trash_periods': {
'task': 'mayan.apps.documents.tasks.task_check_trash_periods',
'schedule': timedelta(seconds=CHECK_TRASH_PERIOD_INTERVAL),
},
'task_delete_stubs': {
'task': 'mayan.apps.documents.tasks.task_delete_stubs',
'schedule': timedelta(seconds=DELETE_STALE_STUBS_INTERVAL),
},
}
)
app.conf.CELERY_QUEUES.extend(
(
Queue(
'converter', Exchange('converter'),
routing_key='converter', delivery_mode=1
),
Queue(
'documents_periodic', Exchange('documents_periodic'),
routing_key='documents_periodic', delivery_mode=1
),
Queue('uploads', Exchange('uploads'), routing_key='uploads'),
Queue(
'documents', Exchange('documents'), routing_key='documents'
),
)
)
app.conf.CELERY_ROUTES.update(
{
'mayan.apps.documents.tasks.task_check_delete_periods': {
'queue': 'documents_periodic'
},
'mayan.apps.documents.tasks.task_check_trash_periods': {
'queue': 'documents_periodic'
},
'mayan.apps.documents.tasks.task_clean_empty_duplicate_lists': {
'queue': 'documents'
},
'mayan.apps.documents.tasks.task_clear_image_cache': {
'queue': 'tools'
},
'mayan.apps.documents.tasks.task_delete_document': {
'queue': 'documents'
},
'mayan.apps.documents.tasks.task_delete_stubs': {
'queue': 'documents_periodic'
},
'mayan.apps.documents.tasks.task_generate_document_page_image': {
'queue': 'converter'
},
'mayan.apps.documents.tasks.task_scan_duplicates_all': {
'queue': 'tools'
},
'mayan.apps.documents.tasks.task_scan_duplicates_for': {
'queue': 'uploads'
},
'mayan.apps.documents.tasks.task_update_page_count': {
'queue': 'uploads'
},
'mayan.apps.documents.tasks.task_upload_new_version': {
'queue': 'uploads'
},
}
)
dashboard_main.add_widget(
widget=DashboardWidgetDocumentsTotal, order=0
)

View File

@@ -8,27 +8,15 @@ from mayan.apps.converter.permissions import (
from mayan.apps.navigation.classes import Link
from .icons import (
icon_clear_image_cache,
icon_document_list_recent_access,
icon_recent_added_document_list,
icon_document_page_navigation_first,
icon_document_page_navigation_last,
icon_document_page_navigation_next,
icon_document_page_navigation_previous,
icon_document_page_return,
icon_document_page_rotate_left,
icon_document_page_rotate_right,
icon_document_page_zoom_in,
icon_document_page_zoom_out,
icon_document_type_create,
icon_document_type_delete,
icon_document_type_edit,
icon_document_type_setup,
icon_duplicated_document_list,
icon_duplicated_document_scan
icon_clear_image_cache, icon_document_list_recent_access,
icon_recent_added_document_list, icon_document_page_navigation_first,
icon_document_page_navigation_last, icon_document_page_navigation_next,
icon_document_page_navigation_previous, icon_document_page_return,
icon_document_page_rotate_left, icon_document_page_rotate_right,
icon_document_page_zoom_in, icon_document_page_zoom_out,
icon_document_type_create, icon_document_type_delete,
icon_document_type_edit, icon_document_type_setup,
icon_duplicated_document_list, icon_duplicated_document_scan
)
from .permissions import (
permission_document_delete, permission_document_download,

View File

@@ -1,10 +1,17 @@
from __future__ import absolute_import, unicode_literals
from datetime import timedelta
from django.utils.translation import ugettext_lazy as _
from mayan.apps.common.queues import queue_tools
from mayan.apps.task_manager.classes import CeleryQueue
from .literals import (
CHECK_DELETE_PERIOD_INTERVAL, CHECK_TRASH_PERIOD_INTERVAL,
DELETE_STALE_STUBS_INTERVAL
)
queue_converter = CeleryQueue(
name='converter', label=_('Converter'), transient=True
)
@@ -19,42 +26,58 @@ queue_documents = CeleryQueue(
)
queue_converter.add_task_type(
name='mayan.apps.documents.tasks.task_generate_document_page_image',
dotted_path='mayan.apps.documents.tasks.task_generate_document_page_image',
label=_('Generate document page image')
)
queue_documents.add_task_type(
name='mayan.apps.documents.tasks.task_delete_document',
dotted_path='mayan.apps.documents.tasks.task_delete_document',
label=_('Delete a document')
)
queue_documents.add_task_type(
name='mayan.apps.documents.tasks.task_clean_empty_duplicate_lists',
dotted_path='mayan.apps.documents.tasks.task_clean_empty_duplicate_lists',
label=_('Clean empty duplicate lists')
)
queue_documents_periodic.add_task_type(
name='mayan.apps.documents.tasks.task_check_delete_periods',
label=_('Check document type delete periods')
dotted_path='mayan.apps.documents.tasks.task_check_delete_periods',
label=_('Check document type delete periods'),
name='task_check_delete_periods',
schedule=timedelta(
seconds=CHECK_DELETE_PERIOD_INTERVAL
),
)
queue_documents_periodic.add_task_type(
name='mayan.apps.documents.tasks.task_check_trash_periods',
label=_('Check document type trash periods')
dotted_path='mayan.apps.documents.tasks.task_check_trash_periods',
label=_('Check document type trash periods'),
name='task_check_trash_periods',
schedule=timedelta(seconds=CHECK_TRASH_PERIOD_INTERVAL),
)
queue_documents_periodic.add_task_type(
name='mayan.apps.documents.tasks.task_delete_stubs',
label=_('Delete document stubs')
dotted_path='mayan.apps.documents.tasks.task_delete_stubs',
label=_('Delete document stubs'),
name='task_delete_stubs',
schedule=timedelta(seconds=DELETE_STALE_STUBS_INTERVAL),
)
queue_tools.add_task_type(
name='mayan.apps.documents.tasks.task_clear_image_cache',
dotted_path='mayan.apps.documents.tasks.task_clear_image_cache',
label=_('Clear image cache')
)
queue_tools.add_task_type(
dotted_path='mayan.apps.documents.tasks.task_scan_duplicates_all',
label=_('Duplicated document scan')
)
queue_uploads.add_task_type(
name='mayan.apps.documents.tasks.task_update_page_count',
dotted_path='mayan.apps.documents.tasks.task_update_page_count',
label=_('Update document page count')
)
queue_uploads.add_task_type(
name='mayan.apps.documents.tasks.task_upload_new_version',
dotted_path='mayan.apps.documents.tasks.task_upload_new_version',
label=_('Upload new document version')
)
queue_uploads.add_task_type(
dotted_path='mayan.apps.documents.tasks.task_scan_duplicates_for',
label=_('Scan document duplicates')
)

View File

@@ -1,7 +1,5 @@
from __future__ import unicode_literals
from kombu import Exchange, Queue
from django.apps import apps
from django.utils.translation import ugettext_lazy as _
@@ -19,7 +17,6 @@ from mayan.apps.events.links import (
link_events_for_object, link_object_event_types_user_subcriptions_list
)
from mayan.apps.navigation.classes import SourceColumn
from mayan.celery import app
from .classes import MailerBackend
from .events import event_email_sent
@@ -35,7 +32,6 @@ from .permissions import (
permission_user_mailer_delete, permission_user_mailer_edit,
permission_user_mailer_use, permission_user_mailer_view,
)
from .queues import * # NOQA
class MailerApp(MayanAppConfig):
@@ -98,18 +94,6 @@ class MailerApp(MayanAppConfig):
)
)
app.conf.CELERY_QUEUES.append(
Queue('mailing', Exchange('mailing'), routing_key='mailing'),
)
app.conf.CELERY_ROUTES.update(
{
'mayan.apps.mailer.tasks.task_send_document': {
'queue': 'mailing'
},
}
)
menu_list_facet.bind_links(
links=(
link_acl_list, link_events_for_object,

View File

@@ -7,5 +7,5 @@ from mayan.apps.task_manager.classes import CeleryQueue
queue_mailing = CeleryQueue(label=_('Mailing'), name='mailing')
queue_mailing.add_task_type(
label=_('Send document'),
name='mayan.apps.mailer.tasks.task_send_document'
dotted_path='mayan.apps.mailer.tasks.task_send_document'
)

View File

@@ -1,13 +1,10 @@
from __future__ import unicode_literals
from kombu import Exchange, Queue
from django.utils.translation import ugettext_lazy as _
from mayan.apps.common.apps import MayanAppConfig
from mayan.apps.common.menus import menu_object, menu_secondary, menu_tools
from mayan.apps.navigation.classes import SourceColumn
from mayan.celery import app
from .classes import StatisticLineChart, StatisticNamespace
from .dependencies import * # NOQA
@@ -15,7 +12,6 @@ from .links import (
link_execute, link_namespace_details, link_namespace_list,
link_statistics, link_view
)
from .queues import * # NOQA
from .tasks import task_execute_statistic # NOQA - Force registration of task
@@ -37,23 +33,6 @@ class StatisticsApp(MayanAppConfig):
attribute='schedule',
)
app.conf.CELERY_QUEUES.extend(
(
Queue(
'statistics', Exchange('statistics'),
routing_key='statistics', delivery_mode=1
),
)
)
app.conf.CELERY_ROUTES.update(
{
'mayan.apps.mayan_statistics.tasks.task_execute_statistic': {
'queue': 'statistics'
},
}
)
menu_object.bind_links(
links=(link_execute, link_view), sources=(StatisticLineChart,)
)

View File

@@ -10,5 +10,5 @@ queue_statistics = CeleryQueue(
queue_statistics.add_task_type(
label=_('Execute statistic'),
name='mayan.apps.mayan_statistics.tasks.task_execute_statistic'
dotted_path='mayan.apps.mayan_statistics.tasks.task_execute_statistic'
)

View File

@@ -2,8 +2,6 @@ from __future__ import absolute_import, unicode_literals
import logging
from kombu import Exchange, Queue
from django.apps import apps
from django.db.models.signals import post_delete, post_save
from django.utils.translation import ugettext_lazy as _
@@ -25,7 +23,6 @@ from mayan.apps.events.links import (
link_events_for_object, link_object_event_types_user_subcriptions_list,
)
from mayan.apps.events.permissions import permission_events_view
from mayan.celery import app
from mayan.apps.navigation.classes import SourceColumn
from .classes import DocumentMetadataHelper
@@ -55,7 +52,6 @@ from .permissions import (
permission_metadata_type_view
)
from .queues import * # NOQA
from .search import metadata_type_search # NOQA
from .widgets import get_metadata_string
@@ -184,21 +180,6 @@ class MetadataApp(MayanAppConfig):
)
SourceColumn(attribute='name', is_sortable=True, source=MetadataType)
app.conf.CELERY_QUEUES.append(
Queue('metadata', Exchange('metadata'), routing_key='metadata'),
)
app.conf.CELERY_ROUTES.update(
{
'mayan.apps.metadata.tasks.task_remove_metadata_type': {
'queue': 'metadata'
},
'mayan.apps.metadata.tasks.task_add_required_metadata_type': {
'queue': 'metadata'
},
}
)
document_search.add_model_field(
field='metadata__metadata_type__name', label=_('Metadata type')
)

View File

@@ -9,9 +9,9 @@ queue_metadata = CeleryQueue(
)
queue_metadata.add_task_type(
label=_('Remove metadata type'),
name='mayan.apps.metadata.tasks.task_remove_metadata_type'
dotted_path='mayan.apps.metadata.tasks.task_remove_metadata_type'
)
queue_metadata.add_task_type(
label=_('Add required metadata type'),
name='mayan.apps.metadata.tasks.task_add_required_metadata_type'
dotted_path='mayan.apps.metadata.tasks.task_add_required_metadata_type'
)

View File

@@ -2,8 +2,6 @@ from __future__ import unicode_literals
import logging
from kombu import Exchange, Queue
from django.apps import apps
from django.db.models.signals import post_save
from django.utils.translation import ugettext_lazy as _
@@ -17,7 +15,6 @@ from mayan.apps.common.menus import (
from mayan.apps.documents.search import document_search, document_page_search
from mayan.apps.documents.signals import post_version_upload
from mayan.apps.navigation.classes import SourceColumn
from mayan.celery import app
from .dependencies import * # NOQA
from .handlers import (
@@ -38,7 +35,6 @@ from .permissions import (
permission_document_type_ocr_setup, permission_ocr_document,
permission_ocr_content_view
)
from .queues import * # NOQA
from .signals import post_document_version_ocr
from .utils import get_document_ocr_content
@@ -117,18 +113,6 @@ class OCRApp(MayanAppConfig):
attribute='result'
)
app.conf.CELERY_QUEUES.append(
Queue('ocr', Exchange('ocr'), routing_key='ocr'),
)
app.conf.CELERY_ROUTES.update(
{
'mayan.apps.ocr.tasks.task_do_ocr': {
'queue': 'ocr'
},
}
)
document_search.add_model_field(
field='versions__pages__ocr_content__content', label=_('OCR')
)

View File

@@ -6,5 +6,6 @@ from mayan.apps.task_manager.classes import CeleryQueue
queue_ocr = CeleryQueue(name='ocr', label=_('OCR'))
queue_ocr.add_task_type(
name='mayan.apps.ocr.tasks.task_do_ocr', label=_('Document version OCR')
dotted_path='mayan.apps.ocr.tasks.task_do_ocr',
label=_('Document version OCR')
)

View File

@@ -2,8 +2,6 @@ from __future__ import absolute_import, unicode_literals
from django.utils.translation import ugettext_lazy as _
from kombu import Exchange, Queue
from mayan.apps.common.apps import MayanAppConfig
from mayan.apps.common.classes import MissingItem
from mayan.apps.common.html_widgets import TwoStateWidget
@@ -15,7 +13,6 @@ from mayan.apps.converter.links import link_transformation_list
from mayan.apps.documents.menus import menu_documents
from mayan.apps.documents.signals import post_version_upload
from mayan.apps.navigation.classes import SourceColumn
from mayan.celery import app
from .classes import StagingFile
from .dependencies import * # NOQA
@@ -32,7 +29,6 @@ from .links import (
link_setup_source_edit, link_setup_source_logs, link_staging_file_delete,
link_document_version_upload
)
from .queues import * # NOQA
from .widgets import StagingFileThumbnailWidget
@@ -105,38 +101,6 @@ class SourcesApp(MayanAppConfig):
func=lambda context: context['object'].message
)
app.conf.CELERY_QUEUES.extend(
(
Queue(
'sources', Exchange('sources'), routing_key='sources'
),
Queue(
'sources_fast', Exchange('sources_fast'),
routing_key='sources_fast', delivery_mode=1
),
Queue(
'sources_periodic', Exchange('sources_periodic'),
routing_key='sources_periodic', delivery_mode=1
),
)
)
app.conf.CELERY_ROUTES.update(
{
'mayan.apps.sources.tasks.task_check_interval_source': {
'queue': 'sources_periodic'
},
'mayan.apps.sources.tasks.task_generate_staging_file_image': {
'queue': 'sources_fast'
},
'mayan.apps.sources.tasks.task_source_handle_upload': {
'queue': 'sources'
},
'mayan.apps.sources.tasks.task_upload_document': {
'queue': 'sources'
},
}
)
menu_documents.bind_links(links=(link_document_create_multiple,))
menu_list_facet.bind_links(

View File

@@ -16,17 +16,17 @@ queue_sources_fast = CeleryQueue(
queue_sources_fast.add_task_type(
label=_('Generate staging file image'),
name='mayan.apps.sources.tasks.task_generate_staging_file_image'
dotted_path='mayan.apps.sources.tasks.task_generate_staging_file_image'
)
queue_sources_periodic.add_task_type(
label=_('Check interval source'),
name='mayan.apps.sources.tasks.task_check_interval_source'
dotted_path='mayan.apps.sources.tasks.task_check_interval_source'
)
queue_sources.add_task_type(
label=_('Handle upload'),
name='mayan.apps.sources.tasks.task_source_handle_upload'
dotted_path='mayan.apps.sources.tasks.task_source_handle_upload'
)
queue_sources.add_task_type(
label=_('Upload document'),
name='mayan.apps.sources.tasks.task_upload_document'
dotted_path='mayan.apps.sources.tasks.task_upload_document'
)

View File

@@ -25,6 +25,8 @@ class TaskManagerApp(MayanAppConfig):
def ready(self):
super(TaskManagerApp, self).ready()
CeleryQueue.initialize()
SourceColumn(
source=CeleryQueue, label=_('Label'), attribute='label'
)

View File

@@ -1,13 +1,23 @@
from __future__ import absolute_import, unicode_literals
from datetime import timedelta
from importlib import import_module
import logging
from kombu import Exchange, Queue
from celery.five import monotonic
from celery.task.control import inspect
from django.apps import apps
from django.utils.encoding import force_text, python_2_unicode_compatible
from django.utils.timezone import now
from mayan.celery import app as celery_app
logger = logging.getLogger(__name__)
@python_2_unicode_compatible
class TaskType(object):
@@ -21,9 +31,11 @@ class TaskType(object):
def get(cls, name):
return cls._registry[name]
def __init__(self, name, label):
self.name = name
def __init__(self, dotted_path, label, name=None, schedule=None):
self.name = name or dotted_path.split('.')[-1]
self.label = label
self.dotted_path = dotted_path
self.schedule = schedule
self.__class__._registry[name] = self
def __str__(self):
@@ -52,6 +64,20 @@ class CeleryQueue(object):
_registry = {}
_inspect_instance = inspect()
@staticmethod
def initialize():
for app in apps.get_app_configs():
try:
import_module('{}.queues'.format(app.name))
except ImportError as exception:
if force_text(exception) not in ('No module named queues', 'No module named \'{}.queues\''.format(app.name)):
logger.error(
'Error importing %s queues.py file; %s', app.name,
exception
)
CeleryQueue.update_celery()
@classmethod
def all(cls):
return sorted(
@@ -62,17 +88,32 @@ class CeleryQueue(object):
def get(cls, queue_name):
return cls._registry[queue_name]
def __init__(self, name, label, is_default_queue=False, transient=False):
@classmethod
def update_celery(cls):
for instance in cls.all():
instance._update_celery()
def __init__(self, name, label, default_queue=False, transient=False):
self.name = name
self.label = label
self.is_default_queue = is_default_queue
self.is_transient = transient
self.default_queue = default_queue
self.transient = transient
self.task_types = []
self.__class__._registry[name] = self
def __str__(self):
return force_text(self.label)
def _process_task_dictionary(self, task_dictionary):
result = []
for worker, tasks in task_dictionary.items():
for task in tasks:
if task['delivery_info']['routing_key'] == self.name:
task_type = TaskType.get(name=task['name'])
result.append(Task(task_type=task_type, kwargs=task))
return result
def add_task_type(self, *args, **kwargs):
self.task_types.append(TaskType(*args, **kwargs))
@@ -91,12 +132,35 @@ class CeleryQueue(object):
task_dictionary=self.__class__._inspect_instance.scheduled()
)
def _process_task_dictionary(self, task_dictionary):
result = []
for worker, tasks in task_dictionary.items():
for task in tasks:
if task['delivery_info']['routing_key'] == self.name:
task_type = TaskType.get(name=task['name'])
result.append(Task(task_type=task_type, kwargs=task))
def _update_celery(self):
kwargs = {
'name': self.name, 'exchange': Exchange(self.name),
'routing_key': self.name
}
return result
if self.transient:
kwargs['delivery_mode'] = 1
celery_app.conf.CELERY_QUEUES.append(Queue(**kwargs))
if self.default_queue:
celery_app.conf.CELERY_DEFAULT_QUEUE = self.name
for task_type in self.task_types:
celery_app.conf.CELERY_ROUTES.update(
{
task_type.dotted_path: {
'queue': self.name
},
}
)
if task_type.schedule:
celery_app.conf.CELERYBEAT_SCHEDULE.update(
{
task_type.name: {
'task': task_type.dotted_path,
'schedule': task_type.schedule
},
}
)