Assign tasks to specific queues. Add support for transient queues. gh-issue #222, gh-issue #230.

This commit is contained in:
Roberto Rosario
2015-07-10 21:25:20 -04:00
parent e891fe9525
commit 133fcdc53c
18 changed files with 202 additions and 74 deletions

View File

@@ -2,6 +2,8 @@ from __future__ import absolute_import, unicode_literals
import logging
from kombu import Exchange, Queue
from django import apps
from django.conf import settings
from django.conf.urls import include, url
@@ -9,6 +11,8 @@ from django.contrib.auth.signals import user_logged_in
from django.db.models.signals import post_save
from django.utils.translation import ugettext_lazy as _
from mayan.celery import app
from .handlers import (
user_locale_profile_session_config, user_locale_profile_create
)
@@ -47,6 +51,10 @@ class CommonApp(MayanAppConfig):
def ready(self):
super(CommonApp, self).ready()
app.conf.CELERY_QUEUES.append(
Queue('tools', Exchange('tools'), routing_key='tools'),
)
menu_facet.bind_links(links=[link_current_user_details, link_current_user_locale_profile_details, link_tools, link_setup], sources=['common:current_user_details', 'common:current_user_edit', 'common:current_user_locale_profile_details', 'common:current_user_locale_profile_edit', 'authentication:password_change_view', 'common:setup_list', 'common:tools_list'])
menu_main.bind_links(links=[link_about], position=-1)
menu_secondary.bind_links(

View File

@@ -1,5 +1,7 @@
from __future__ import absolute_import, unicode_literals
from kombu import Exchange, Queue
from django.db.models.signals import post_save, post_delete
from django.utils.translation import ugettext_lazy as _
@@ -9,6 +11,7 @@ from common import (
)
from documents.models import Document
from documents.signals import post_document_created
from mayan.celery import app
from metadata.models import DocumentMetadata
from rest_api.classes import APIEndPoint
@@ -38,6 +41,24 @@ class DocumentIndexingApp(MayanAppConfig):
APIEndPoint('indexes', app_name='document_indexing')
app.conf.CELERY_QUEUES.append(
Queue('indexing', Exchange('indexing'), routing_key='indexing'),
)
app.conf.CELERY_ROUTES.update(
{
'document_indexing.tasks.task_delete_empty_index_nodes': {
'queue': 'indexing'
},
'document_indexing.tasks.task_index_document': {
'queue': 'indexing'
},
'document_indexing.tasks.task_do_rebuild_all_indexes': {
'queue': 'tools'
},
}
)
menu_facet.bind_links(links=[link_document_index_list], sources=[Document])
menu_object.bind_links(links=[link_index_setup_edit, link_index_setup_view, link_index_setup_document_types, link_index_setup_delete], sources=[Index])
menu_object.bind_links(links=[link_template_node_create, link_template_node_edit, link_template_node_delete], sources=[IndexTemplateNode])

View File

@@ -4,16 +4,16 @@ from .tasks import task_delete_empty_index_nodes, task_index_document
def document_created_index_update(sender, **kwargs):
task_index_document.apply_async(kwargs=dict(document_id=kwargs['instance'].pk), queue='indexing')
task_index_document.apply_async(kwargs=dict(document_id=kwargs['instance'].pk))
def document_index_delete(sender, **kwargs):
task_delete_empty_index_nodes.apply_async(queue='indexing')
task_delete_empty_index_nodes.apply_async()
def document_metadata_index_update(sender, **kwargs):
task_index_document.apply_async(kwargs=dict(document_id=kwargs['instance'].document.pk), queue='indexing')
task_index_document.apply_async(kwargs=dict(document_id=kwargs['instance'].document.pk))
def document_metadata_index_post_delete(sender, **kwargs):
task_index_document.apply_async(kwargs=dict(document_id=kwargs['instance'].document.pk), queue='indexing')
task_index_document.apply_async(kwargs=dict(document_id=kwargs['instance'].document.pk))

View File

@@ -344,7 +344,7 @@ def rebuild_index_instances(request):
'message': _('On large databases this operation may take some time to execute.'),
}, context_instance=RequestContext(request))
else:
task_do_rebuild_all_indexes.apply_async(queue='tools')
task_do_rebuild_all_indexes.apply_async()
messages.success(request, _('Index rebuild queued successfully.'))
return HttpResponseRedirect(next)

View File

@@ -194,7 +194,7 @@ class APIDocumentImageView(generics.GenericAPIView):
document_page = document.pages.get(page_number=page)
try:
task = task_get_document_page_image.apply_async(kwargs=dict(document_page_id=document_page.pk, size=size, zoom=zoom, rotation=rotation, as_base64=True, version=version), queue='converter')
task = task_get_document_page_image.apply_async(kwargs=dict(document_page_id=document_page.pk, size=size, zoom=zoom, rotation=rotation, as_base64=True, version=version))
# TODO: prepend 'data:%s;base64,%s' based on format specified in
# async call
return Response({

View File

@@ -2,6 +2,8 @@ from __future__ import absolute_import, unicode_literals
from datetime import timedelta
from kombu import Exchange, Queue
from django.utils.translation import ugettext_lazy as _
from actstream import registry
@@ -111,21 +113,49 @@ class DocumentsApp(MayanAppConfig):
SourceColumn(source=DeletedDocument, label=_('Type'), attribute='document_type')
SourceColumn(source=DeletedDocument, label=_('Date time trashed'), attribute='deleted_date_time')
app.conf.CELERYBEAT_SCHEDULE.update({
'task_check_trash_periods': {
'task': 'documents.tasks.task_check_trash_periods',
'schedule': timedelta(seconds=CHECK_TRASH_PERIOD_INTERVAL),
'options': {'queue': 'documents'}
},
})
app.conf.CELERYBEAT_SCHEDULE.update({
app.conf.CELERYBEAT_SCHEDULE.update(
{
'task_check_delete_periods': {
'task': 'documents.tasks.task_check_delete_periods',
'schedule': timedelta(seconds=CHECK_DELETE_PERIOD_INTERVAL),
'options': {'queue': 'documents'}
},
})
'task_check_trash_periods': {
'task': 'documents.tasks.task_check_trash_periods',
'schedule': timedelta(seconds=CHECK_TRASH_PERIOD_INTERVAL),
},
}
)
app.conf.CELERY_QUEUES.extend(
(
Queue('converter', Exchange('converter'), routing_key='converter', delivery_mode=1),
Queue('documents_periodic', Exchange('documents_periodic'), routing_key='documents_periodic', delivery_mode=1),
Queue('uploads', Exchange('uploads'), routing_key='uploads'),
)
)
app.conf.CELERY_ROUTES.update(
{
'documents.tasks.task_check_delete_periods': {
'queue': 'documents'
},
'documents.tasks.task_check_trash_periods': {
'queue': 'documents'
},
'documents.tasks.task_clear_image_cache': {
'queue': 'tools'
},
'documents.tasks.task_get_document_page_image': {
'queue': 'converter'
},
'documents.tasks.task_update_page_count': {
'queue': 'tools'
},
'documents.tasks.task_upload_new_version': {
'queue': 'uploads'
},
}
)
menu_front_page.bind_links(links=[link_document_list_recent, link_document_list, link_document_list_deleted])
menu_setup.bind_links(links=[link_document_type_setup])

View File

@@ -189,7 +189,7 @@ class Document(models.Model):
task_upload_new_version.apply_async(kwargs=dict(
shared_uploaded_file_id=shared_uploaded_file.pk,
document_id=self.pk, user_id=user_id,
), queue='uploads')
))
logger.info('New document version queued for document: %s', self)

View File

@@ -17,10 +17,42 @@ from .models import (
logger = logging.getLogger(__name__)
@app.task(compression='zlib')
def task_get_document_page_image(document_page_id, *args, **kwargs):
    """Render and return the image for a single document page.

    All extra positional/keyword arguments (size, zoom, rotation,
    as_base64, version, ...) are forwarded untouched to
    ``DocumentPage.get_image()``.  Results are zlib-compressed on the
    wire because rendered page images can be large.
    """
    page = DocumentPage.objects.get(pk=document_page_id)
    return page.get_image(*args, **kwargs)
@app.task(ignore_result=True)
def task_check_delete_periods():
    """Periodic task: permanently delete trashed documents whose document
    type's deletion period has elapsed.

    For every document type with both a ``delete_time_period`` and a
    ``delete_time_unit`` set, any ``DeletedDocument`` trashed longer ago
    than that period is deleted for good.
    """
    logger.info('Executing')
    for document_type in DocumentType.objects.all():
        logger.info('Checking deletion period of document type: %s', document_type)
        if document_type.delete_time_period and document_type.delete_time_unit:
            # Unit name maps directly to a timedelta kwarg, e.g. {'days': 30}.
            delta = timedelta(**{document_type.delete_time_unit: document_type.delete_time_period})
            logger.info('Document type: %s, has a deletion period delta of: %s', document_type, delta)
            for document in DeletedDocument.objects.filter(document_type=document_type):
                if now() > document.deleted_date_time + delta:
                    logger.info('Document "%s" with id: %d, trashed on: %s, exceeded delete period', document, document.pk, document.deleted_date_time)
                    document.delete()
        else:
            logger.info('Document type: %s, has no retention delta', document_type)
    logger.info('Finished')
@app.task(ignore_result=True)
def task_check_trash_periods():
    """Periodic task: move documents to the trash once their document
    type's trash period has elapsed.

    For every document type with both a ``trash_time_period`` and a
    ``trash_time_unit`` set, any ``Document`` added longer ago than that
    period is trashed (``Document.delete()`` moves it to the trash).
    """
    logger.info('Executing')
    for document_type in DocumentType.objects.all():
        logger.info('Checking trash period of document type: %s', document_type)
        if document_type.trash_time_period and document_type.trash_time_unit:
            # Unit name maps directly to a timedelta kwarg, e.g. {'days': 30}.
            delta = timedelta(**{document_type.trash_time_unit: document_type.trash_time_period})
            logger.info('Document type: %s, has a trash period delta of: %s', document_type, delta)
            for document in Document.objects.filter(document_type=document_type):
                if now() > document.date_added + delta:
                    logger.info('Document "%s" with id: %d, added on: %s, exceeded trash period', document, document.pk, document.date_added)
                    document.delete()
        else:
            logger.info('Document type: %s, has no retention delta', document_type)
    logger.info('Finished')
@app.task(ignore_result=True)
@@ -31,6 +63,12 @@ def task_clear_image_cache():
logger.info('Finished document cache invalidation')
@app.task(compression='zlib')
def task_get_document_page_image(document_page_id, *args, **kwargs):
document_page = DocumentPage.objects.get(pk=document_page_id)
return document_page.get_image(*args, **kwargs)
@app.task(ignore_result=True)
def task_update_page_count(version_id):
document_version = DocumentVersion.objects.get(pk=version_id)
@@ -56,41 +94,3 @@ def task_upload_new_version(document_id, shared_uploaded_file_id, user_id, comme
logger.info('Warning during attempt to create new document version for document: %s ; %s', document, warning)
finally:
shared_file.delete()
@app.task(ignore_result=True)
def task_check_trash_periods():
logger.info('Executing')
for document_type in DocumentType.objects.all():
logger.info('Checking trash period of document type: %s', document_type)
if document_type.trash_time_period and document_type.trash_time_unit:
delta = timedelta(**{document_type.trash_time_unit: document_type.trash_time_period})
logger.info('Document type: %s, has a trash period delta of: %s', document_type, delta)
for document in Document.objects.filter(document_type=document_type):
if now() > document.date_added + delta:
logger.info('Document "%s" with id: %d, added on: %s, exceded trash period', document, document.pk, document.date_added)
document.delete()
else:
logger.info('Document type: %s, has a no retention delta', document_type)
logger.info('Finshed')
@app.task(ignore_result=True)
def task_check_delete_periods():
logger.info('Executing')
for document_type in DocumentType.objects.all():
logger.info('Checking deletion period of document type: %s', document_type)
if document_type.delete_time_period and document_type.delete_time_unit:
delta = timedelta(**{document_type.delete_time_unit: document_type.delete_time_period})
logger.info('Document type: %s, has a deletion period delta of: %s', document_type, delta)
for document in DeletedDocument.objects.filter(document_type=document_type):
if now() > document.deleted_date_time + delta:
logger.info('Document "%s" with id: %d, trashed on: %s, exceded delete period', document, document.pk, document.deleted_date_time)
document.delete()
else:
logger.info('Document type: %s, has a no retention delta', document_type)
logger.info('Finshed')

View File

@@ -455,7 +455,7 @@ def get_document_image(request, document_id, size=setting_preview_size.value):
document_page = document.pages.get(page_number=page)
task = task_get_document_page_image.apply_async(kwargs=dict(document_page_id=document_page.pk, size=size, zoom=zoom, rotation=rotation, as_base64=True, version=version), queue='converter')
task = task_get_document_page_image.apply_async(kwargs=dict(document_page_id=document_page.pk, size=size, zoom=zoom, rotation=rotation, as_base64=True, version=version))
data = task.get(timeout=DOCUMENT_IMAGE_TASK_TIMEOUT)
return HttpResponse(base64.b64decode(data[21:]), content_type='image')
@@ -587,7 +587,7 @@ def document_update_page_count(request, document_id=None, document_id_list=None)
if request.method == 'POST':
for document in documents:
task_update_page_count.apply_async(kwargs={'version_id': document.latest_version.pk}, queue='tools')
task_update_page_count.apply_async(kwargs={'version_id': document.latest_version.pk})
messages.success(
request,
@@ -1102,7 +1102,7 @@ def document_clear_image_cache(request):
previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', reverse(settings.LOGIN_REDIRECT_URL))))
if request.method == 'POST':
task_clear_image_cache.apply_async(queue='tools')
task_clear_image_cache.apply_async()
messages.success(request, _('Document image cache clearing queued successfully.'))
return HttpResponseRedirect(previous)

View File

@@ -1,10 +1,13 @@
from __future__ import unicode_literals
from kombu import Exchange, Queue
from django.utils.translation import ugettext_lazy as _
from acls import ModelPermission
from common import MayanAppConfig, menu_object
from documents.models import Document
from mayan.celery import app
from .links import link_send_document_link, link_send_document
from .permissions import (
@@ -25,4 +28,16 @@ class MailerApp(MayanAppConfig):
)
)
app.conf.CELERY_QUEUES.append(
Queue('mailing', Exchange('mailing'), routing_key='mailing'),
)
app.conf.CELERY_ROUTES.update(
{
'mailer.tasks.task_send_document': {
'queue': 'mailing'
},
}
)
menu_object.bind_links(links=[link_send_document_link, link_send_document], sources=[Document])

View File

@@ -65,7 +65,7 @@ def send_document_link(request, document_id=None, document_id_list=None, as_atta
subject_template = Template(form.cleaned_data['subject'])
subject_text = strip_tags(subject_template.render(context))
task_send_document.apply_async(args=(subject_text, body_text_content, request.user.email, form.cleaned_data['email']), kwargs={'document_id': document.pk, 'as_attachment': as_attachment}, queue='mailing')
task_send_document.apply_async(args=(subject_text, body_text_content, request.user.email, form.cleaned_data['email']), kwargs={'document_id': document.pk, 'as_attachment': as_attachment})
# TODO: Pluralize
messages.success(request, _('Successfully queued for delivery via email.'))

View File

@@ -2,6 +2,8 @@ from __future__ import unicode_literals
import logging
from kombu import Exchange, Queue
from django.db.models.signals import post_delete, post_save
from django.utils.translation import ugettext_lazy as _
@@ -15,6 +17,7 @@ from common.utils import encapsulate
from documents.models import Document, DocumentType
from documents.search import document_search
from documents.signals import post_document_type_change
from mayan.celery import app
from navigation import SourceColumn
from rest_api.classes import APIEndPoint
@@ -67,6 +70,21 @@ class MetadataApp(MayanAppConfig):
SourceColumn(source=Document, label=_('Metadata'), attribute=encapsulate(lambda document: get_metadata_string(document)))
app.conf.CELERY_QUEUES.append(
Queue('metadata', Exchange('metadata'), routing_key='metadata'),
)
app.conf.CELERY_ROUTES.update(
{
'metadata.tasks.task_remove_metadata_type': {
'queue': 'metadata'
},
'metadata.tasks.task_add_required_metadata_type': {
'queue': 'metadata'
},
}
)
document_search.add_model_field(field='metadata__metadata_type__name', label=_('Metadata type'))
document_search.add_model_field(field='metadata__value', label=_('Metadata value'))

View File

@@ -12,12 +12,12 @@ def post_document_type_metadata_type_add(sender, instance, created, **kwargs):
logger.debug('instance: %s', instance)
if created and instance.required:
task_add_required_metadata_type.apply_async(kwargs={'document_type_id': instance.document_type.pk, 'metadata_type_id': instance.metadata_type.pk}, queue='metadata')
task_add_required_metadata_type.apply_async(kwargs={'document_type_id': instance.document_type.pk, 'metadata_type_id': instance.metadata_type.pk})
def post_document_type_metadata_type_delete(sender, instance, **kwargs):
logger.debug('instance: %s', instance)
task_remove_metadata_type.apply_async(kwargs={'document_type_id': instance.document_type.pk, 'metadata_type_id': instance.metadata_type.pk}, queue='metadata')
task_remove_metadata_type.apply_async(kwargs={'document_type_id': instance.document_type.pk, 'metadata_type_id': instance.metadata_type.pk})
def post_post_document_type_change_metadata(sender, instance, **kwargs):

View File

@@ -2,6 +2,7 @@ from __future__ import unicode_literals
import logging
from kombu import Exchange, Queue
import sh
from django.db.models.signals import post_save
@@ -18,6 +19,7 @@ from documents.search import document_search
from documents.signals import post_version_upload
from documents.widgets import document_link
from installation import PropertyNamespace
from mayan.celery import app
from navigation import SourceColumn
from rest_api.classes import APIEndPoint
@@ -37,11 +39,11 @@ logger = logging.getLogger(__name__)
def document_ocr_submit(self):
task_do_ocr.apply_async(args=[self.latest_version.pk], queue='ocr')
task_do_ocr.apply_async(args=[self.latest_version.pk])
def document_version_ocr_submit(self):
task_do_ocr.apply_async(args=[self.pk], queue='ocr')
task_do_ocr.apply_async(args=[self.pk])
class OCRApp(MayanAppConfig):
@@ -66,6 +68,18 @@ class OCRApp(MayanAppConfig):
SourceColumn(source=DocumentVersionOCRError, label=_('Added'), attribute='datetime_submitted')
SourceColumn(source=DocumentVersionOCRError, label=_('Result'), attribute='result')
app.conf.CELERY_QUEUES.append(
Queue('ocr', Exchange('ocr'), routing_key='ocr'),
)
app.conf.CELERY_ROUTES.update(
{
'ocr.tasks.task_do_ocr': {
'queue': 'ocr'
},
}
)
document_search.add_model_field(field='versions__pages__ocr_content__content', label=_('Content'))
menu_facet.bind_links(links=[link_document_content], sources=[Document])

View File

@@ -2,6 +2,8 @@ from __future__ import absolute_import, unicode_literals
from django.utils.translation import ugettext_lazy as _
from kombu import Exchange, Queue
from common import (
MayanAppConfig, MissingItem, menu_front_page, menu_object, menu_secondary,
menu_sidebar, menu_setup
@@ -11,6 +13,7 @@ from common.utils import encapsulate
from converter.links import link_transformation_list
from documents.models import Document
from documents.signals import post_version_upload
from mayan.celery import app
from navigation import SourceColumn
from rest_api.classes import APIEndPoint
@@ -44,6 +47,24 @@ class SourcesApp(MayanAppConfig):
SourceColumn(source=StagingFile, label=_('Thumbnail'), attribute=encapsulate(lambda staging_file: staging_file_thumbnail(staging_file, gallery_name='sources:staging_list', title=staging_file.filename, size='100')))
app.conf.CELERY_QUEUES.extend(
(
Queue('sources', Exchange('sources'), routing_key='sources'),
Queue('sources_periodic', Exchange('sources_periodic'), routing_key='sources_periodic', delivery_mode=1),
)
)
app.conf.CELERY_ROUTES.update(
{
'sources.tasks.task_check_interval_source': {
'queue': 'sources_periodic'
},
'sources.tasks.task_source_upload_document': {
'queue': 'sources'
},
}
)
menu_front_page.bind_links(links=[link_document_create_multiple])
menu_object.bind_links(links=[link_document_create_siblings], sources=[Document])
menu_object.bind_links(links=[link_setup_source_edit, link_setup_source_delete, link_transformation_list, link_setup_source_logs], sources=[Source])

View File

@@ -206,7 +206,6 @@ class IntervalBaseModel(OutOfProcessSource):
name=self._get_periodic_task_name(),
interval=interval_instance,
task='sources.tasks.task_check_interval_source',
queue='uploads',
kwargs=json.dumps({'source_id': self.pk})
)

View File

@@ -245,7 +245,7 @@ class UploadInteractiveView(UploadBaseView):
shared_uploaded_file_id=shared_uploaded_file.pk,
source_id=self.source.pk,
user_id=user_id,
), queue='uploads')
))
messages.success(self.request, _('New document queued for uploaded and will be available shortly.'))
return HttpResponseRedirect(self.request.get_full_path())
@@ -310,7 +310,7 @@ class UploadInteractiveVersionView(UploadBaseView):
document_id=self.document.pk,
user_id=user_id,
comment=forms['document_form'].cleaned_data.get('comment')
), queue='uploads')
))
messages.success(self.request, _('New document version queued for uploaded and will be available shortly.'))
return HttpResponseRedirect(reverse('documents:document_version_list', args=[self.document.pk]))

View File

@@ -265,6 +265,8 @@ CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'
# Start empty; each MayanAppConfig.ready() registers its own queues via
# app.conf.CELERY_QUEUES.append()/extend() (tools, indexing, converter, ...).
CELERY_QUEUES = []
# Task-name -> queue routing map; populated per app through
# app.conf.CELERY_ROUTES.update() calls in each AppConfig.ready().
CELERY_ROUTES = {}
# ------------ CORS ------------
CORS_ORIGIN_ALLOW_ALL = True
# ------ Django REST Swagger -----