Rewrite the document indexer to allow single index rebuilds,

less locks and improve performance.

Signed-off-by: Roberto Rosario <roberto.rosario.gonzalez@gmail.com>
This commit is contained in:
Roberto Rosario
2017-05-28 15:22:13 -04:00
parent 3e3e247997
commit 88151df2bc
14 changed files with 306 additions and 208 deletions

View File

@@ -6,6 +6,7 @@
FUSE index mirror. GitLab issue #385
- Add support for check for the latest released version of Mayan from the
About menu.
- Rewrite document indexing code to be faster and use less locking.
2.2 (2017-04-26)
================

View File

@@ -20,7 +20,9 @@ Changes
to the index mirroring management command.
- Add support for check for the latest released version of Mayan from the
About menu.
- Add support for rebuilding specific indexes instead of only being able to
rebuild all index.
- Rewrite document indexing code to be faster and use less locking.
Removals
--------

View File

@@ -175,4 +175,4 @@ class APIDocumentIndexListView(generics.ListAPIView):
obj=document
)
return document.node_instances.all()
return document.index_instance_nodes.all()

View File

@@ -3,7 +3,7 @@ from __future__ import absolute_import, unicode_literals
from kombu import Exchange, Queue
from django.apps import apps
from django.db.models.signals import post_save, post_delete
from django.db.models.signals import post_save, post_delete, pre_delete
from django.utils.translation import ugettext_lazy as _
from acls import ModelPermission
@@ -21,9 +21,9 @@ from navigation import SourceColumn
from rest_api.classes import APIEndPoint
from .handlers import (
document_created_index_update, create_default_document_index,
document_index_delete, document_metadata_index_update,
document_metadata_index_post_delete
create_default_document_index, document_metadata_index_update,
document_metadata_index_post_delete, handler_delete_empty,
handler_index_document, handler_remove_document
)
from .links import (
link_document_index_list, link_index_main_menu, link_index_setup,
@@ -145,13 +145,16 @@ class DocumentIndexingApp(MayanAppConfig):
app.conf.CELERY_ROUTES.update(
{
'document_indexing.tasks.task_delete_empty_index_nodes': {
'document_indexing.tasks.task_delete_empty': {
'queue': 'indexing'
},
'document_indexing.tasks.task_remove_document': {
'queue': 'indexing'
},
'document_indexing.tasks.task_index_document': {
'queue': 'indexing'
},
'document_indexing.tasks.task_do_rebuild_all_indexes': {
'document_indexing.tasks.task_rebuild_index': {
'queue': 'tools'
},
}
@@ -185,7 +188,11 @@ class DocumentIndexingApp(MayanAppConfig):
menu_tools.bind_links(links=(link_rebuild_index_instances,))
post_delete.connect(
document_index_delete, dispatch_uid='document_index_delete',
handler_delete_empty, dispatch_uid='handler_delete_empty',
sender=Document
)
pre_delete.connect(
handler_remove_document, dispatch_uid='handler_remove_document',
sender=Document
)
post_delete.connect(
@@ -194,8 +201,8 @@ class DocumentIndexingApp(MayanAppConfig):
sender=DocumentMetadata
)
post_document_created.connect(
document_created_index_update,
dispatch_uid='document_created_index_update', sender=Document
handler_index_document,
dispatch_uid='handler_index_document', sender=Document
)
post_initial_document_type.connect(
create_default_document_index,

View File

@@ -1,11 +1,20 @@
from __future__ import unicode_literals
from __future__ import absolute_import, unicode_literals
from django import forms
from django.utils.translation import ugettext_lazy as _
from common.classes import ModelAttribute
from documents.models import Document
from .models import IndexTemplateNode
from .models import Index, IndexTemplateNode
class IndexListForm(forms.Form):
indexes = forms.ModelMultipleChoiceField(
help_text=_('Indexes to be queued for rebuilding.'),
label=_('Indexes'), queryset=Index.objects.filter(enabled=True),
required=False, widget=forms.widgets.CheckboxSelectMultiple()
)
class IndexTemplateNodeForm(forms.ModelForm):

View File

@@ -3,7 +3,9 @@ from __future__ import unicode_literals
from django.apps import apps
from django.utils.translation import ugettext_lazy as _
from .tasks import task_delete_empty_index_nodes, task_index_document
from .tasks import (
task_delete_empty, task_index_document, task_remove_document
)
def create_default_document_index(sender, **kwargs):
@@ -31,14 +33,20 @@ def create_default_document_index(sender, **kwargs):
)
def document_created_index_update(sender, **kwargs):
def handler_delete_empty(sender, **kwargs):
task_delete_empty.apply_async()
def handler_index_document(sender, **kwargs):
task_index_document.apply_async(
kwargs=dict(document_id=kwargs['instance'].pk)
)
def document_index_delete(sender, **kwargs):
task_delete_empty_index_nodes.apply_async()
def handler_remove_document(sender, **kwargs):
task_remove_document.apply_async(
kwargs=dict(document_id=kwargs['instance'].pk)
)
def document_metadata_index_update(sender, **kwargs):

View File

@@ -1,14 +1,6 @@
from __future__ import unicode_literals
import logging
from django.db import models, transaction
from django.template import Context, Template
from django.utils.translation import ugettext_lazy as _
from documents.models import Document
logger = logging.getLogger(__name__)
from django.db import models
class DocumentIndexInstanceNodeManager(models.Manager):
@@ -17,104 +9,25 @@ class DocumentIndexInstanceNodeManager(models.Manager):
class IndexManager(models.Manager):
def index_document(self, document):
for index in self.filter(enabled=True, document_types=document.document_type):
index.index_document(document=document)
def get_by_natural_key(self, name):
return self.get(name=name)
def rebuild(self):
for index in self.all():
index.rebuild()
class IndexInstanceNodeManager(models.Manager):
@staticmethod
def delete_empty_index_nodes_recursive(instance_node):
"""
Calls itself recursively deleting empty index instance nodes up to
root
"""
if instance_node.get_children().count() == 0:
# if there are no children, delete node and check parent for the
# same conditions
parent = instance_node.parent
if parent:
instance_node.delete()
IndexInstanceNodeManager.delete_empty_index_nodes_recursive(
parent
)
def cascade_eval(self, document, template_node, parent_index_instance=None):
"""
Evaluate an enabled index expression and update or create all the
related index instances also recursively calling itself to evaluate
all the index's children
"""
if template_node.enabled:
try:
template = Template(template_node.expression)
context = Context({'document': document})
result = template.render(context=context)
except Exception as exception:
error_message = _(
'Error indexing document: %(document)s; expression: '
'%(expression)s; %(exception)s'
) % {
'document': document,
'expression': template_node.expression,
'exception': exception
}
logger.debug(error_message)
else:
if result:
index_instance, created = self.get_or_create(
index_template_node=template_node, value=result,
parent=parent_index_instance
)
if template_node.link_documents:
index_instance.documents.add(document)
for child in template_node.get_children():
self.cascade_eval(
document=document,
template_node=child,
parent_index_instance=index_instance
)
def delete_empty_index_nodes(self):
"""
Delete empty index instance nodes
"""
for instance_node in self.filter(documents__isnull=True, parent__isnull=False):
IndexInstanceNodeManager.delete_empty_index_nodes_recursive(
instance_node
)
def index_document(self, document):
"""
Update or create all the index instances related to a document
"""
from .models import Index
with transaction.atomic():
self.remove_document(document)
# Only update indexes where the document type is found
for index in Index.objects.filter(enabled=True, document_types=document.document_type):
root_instance, created = self.get_or_create(
index_template_node=index.template_root, parent=None
)
for template_node in index.template_root.get_children():
self.cascade_eval(document, template_node, root_instance)
def delete_empty(self):
# Select leaf nodes only because .delete_empty() bubbles up
for root_nodes in self.filter(parent=None):
for index_instance_node in root_nodes.get_leafnodes():
index_instance_node.delete_empty()
def remove_document(self, document):
for index_node in self.filter(documents=document):
index_node.documents.remove(document)
self.delete_empty_index_nodes()
def rebuild_all_indexes(self):
for instance_node in self.all():
instance_node.delete()
for document in Document.objects.all():
self.index_document(document)
for index_instance_node in self.filter(documents=document):
index_instance_node.remove_document(document=document)

View File

@@ -15,6 +15,11 @@ class Migration(migrations.Migration):
migrations.AlterField(
model_name='indextemplatenode',
name='expression',
field=models.TextField(help_text="Enter a template to render. Use Django's default templating language (https://docs.djangoproject.com/en/1.7/ref/templates/builtins/)", verbose_name='Indexing expression'),
field=models.TextField(
help_text="Enter a template to render. Use Django's default "
"templating language (https://docs.djangoproject.com/en/1.7/"
"ref/templates/builtins/)",
verbose_name='Indexing expression'
),
),
]

View File

@@ -1,7 +1,10 @@
from __future__ import absolute_import, unicode_literals
import logging
from django.core.urlresolvers import reverse
from django.db import models
from django.db import models, transaction
from django.template import Context, Template
from django.utils.encoding import python_2_unicode_compatible
from django.utils.translation import ugettext, ugettext_lazy as _
@@ -11,11 +14,14 @@ from mptt.models import MPTTModel
from acls.models import AccessControlList
from documents.models import Document, DocumentType
from documents.permissions import permission_document_view
from lock_manager.runtime import locking_backend
from .managers import (
DocumentIndexInstanceNodeManager, IndexManager, IndexInstanceNodeManager
)
logger = logging.getLogger(__name__)
@python_2_unicode_compatible
class Index(models.Model):
@@ -45,14 +51,6 @@ class Index(models.Model):
objects = IndexManager()
@property
def template_root(self):
return self.node_templates.get(parent=None)
@property
def instance_root(self):
return self.template_root.node_instance.get()
def __str__(self):
return self.label
@@ -72,6 +70,14 @@ class Index(models.Model):
super(Index, self).save(*args, **kwargs)
IndexTemplateNode.objects.get_or_create(parent=None, index=self)
@property
def instance_root(self):
return self.template_root.index_instance_nodes.get()
@property
def template_root(self):
return self.node_templates.get(parent=None)
def get_document_types_names(self):
return ', '.join(
[
@@ -79,6 +85,34 @@ class Index(models.Model):
] or ['None']
)
def index_document(self, document):
logger.debug('Index; Indexing document: %s', document)
self.template_root.index_document(document=document)
def rebuild(self):
"""
Delete and reconstruct the index by deleting of all its instance nodes
and recreating them for the documents whose types are associated with
this index
"""
# Delete all index instance nodes by deleting the root index
# instance node. All child index instance nodes will be cascade
# deleted.
try:
self.instance_root.delete()
except IndexInstanceNode.DoesNotExist:
# Empty index, ignore this exception
pass
# Create the new root index instance node
self.template_root.index_instance_nodes.create()
# Re-index each document with a type associated with this index
for document in Document.objects.filter(document_type__in=self.document_types.all()):
# Evaluate each index template node for each document
# associated with this index.
self.index_document(document=document)
class Meta:
verbose_name = _('Index')
verbose_name_plural = _('Indexes')
@@ -145,6 +179,84 @@ class IndexTemplateNode(MPTTModel):
else:
return self.expression
def index_document(self, document, acquire_lock=True):
# Avoid another process to index this same document for the same
# template node. This prevents this template node's index instance
# nodes from being deleted while the template is evaluated and
# documents added to it.
if acquire_lock:
lock = locking_backend.acquire_lock(
'indexing:indexing_template_node_{}'.format(self.pk)
)
# Start transaction after the lock in case the locking backend uses
# the database.
with transaction.atomic():
logger.debug('IndexTemplateNode; Indexing document: %s', document)
logger.debug(
'Removing document "%s" from all index instance nodes',
document
)
for index_template_node in self.index_instance_nodes.all():
index_template_node.remove_document(
document=document, acquire_lock=False
)
if not self.parent:
logger.debug(
'IndexTemplateNode; parent: creating empty root index '
'instance node'
)
index_instance_node, created = self.index_instance_nodes.get_or_create()
for child in self.get_children():
child.index_document(document=document, acquire_lock=False)
if acquire_lock:
lock.release()
elif self.enabled:
logger.debug('IndexTemplateNode; non parent: evaluating')
logger.debug('My parent is: %s', self.parent)
logger.debug(
'My parent nodes: %s', self.parent.index_instance_nodes.all()
)
logger.debug(
'IndexTemplateNode; Evaluating template: %s', self.expression
)
try:
context = Context({'document': document})
template = Template(self.expression)
result = template.render(context=context)
except Exception as exception:
logger.debug('Evaluating error: %s', exception)
error_message = _(
'Error indexing document: %(document)s; expression: '
'%(expression)s; %(exception)s'
) % {
'document': document,
'expression': self.expression,
'exception': exception
}
logger.debug(error_message)
else:
logger.debug('Evaluation result: %s', result)
if result:
index_instance_node, created = self.index_instance_nodes.get_or_create(
parent=self.parent.index_instance_nodes.get(),
value=result
)
if self.link_documents:
index_instance_node.documents.add(document)
for child in self.get_children():
child.index_document(document=document, acquire_lock=False)
finally:
if acquire_lock:
lock.release()
class Meta:
verbose_name = _('Index node template')
verbose_name_plural = _('Indexes node template')
@@ -154,14 +266,15 @@ class IndexTemplateNode(MPTTModel):
class IndexInstanceNode(MPTTModel):
parent = TreeForeignKey('self', null=True, blank=True)
index_template_node = models.ForeignKey(
IndexTemplateNode, related_name='node_instance',
IndexTemplateNode, related_name='index_instance_nodes',
verbose_name=_('Index template node')
)
value = models.CharField(
blank=True, db_index=True, max_length=128, verbose_name=_('Value')
)
documents = models.ManyToManyField(
Document, related_name='node_instances', verbose_name=_('Documents')
Document, related_name='index_instance_nodes',
verbose_name=_('Documents')
)
objects = IndexInstanceNodeManager()
@@ -200,6 +313,52 @@ class IndexInstanceNode(MPTTModel):
return ' / '.join(result)
def delete_empty(self, acquire_lock=True):
"""
The argument `acquire_lock` controls whether or not this method
acquires or lock. The case for this is to acquire when called directly
or not to acquire when called as part of a larger index process
that already has a lock
"""
# Prevent another process to work on this node. We use the node's
# parent template node for the lock
if acquire_lock:
lock = locking_backend.acquire_lock(
'indexing:indexing_template_node_{}'.format(
self.index_template_node.pk
)
)
# Start transaction after the lock in case the locking backend uses
# the database.
with transaction.atomic():
if self.documents.count() == 0 and self.get_children().count() == 0:
if self.parent:
self.delete()
self.parent.delete_empty(acquire_lock=False)
if acquire_lock:
lock.release()
def remove_document(self, document, acquire_lock=True):
"""
The argument `acquire_lock` controls whether or not this method
acquires or lock. The case for this is to acquire when called directly
or not to acquire when called as part of a larger index process
that already has a lock
"""
# Prevent another process to work on this node. We use the node's
# parent template node for the lock
if acquire_lock:
lock = locking_backend.acquire_lock(
'indexing:indexing_template_node_{}'.format(
self.index_template_node.pk
)
)
self.documents.remove(document)
self.delete_empty(acquire_lock=False)
if acquire_lock:
lock.release()
class Meta:
verbose_name = _('Index node instance')
verbose_name_plural = _('Indexes node instances')

View File

@@ -7,7 +7,6 @@ from django.db import OperationalError
from mayan.celery import app
from lock_manager import LockError
from lock_manager.runtime import locking_backend
from .literals import RETRY_DELAY
@@ -15,23 +14,36 @@ logger = logging.getLogger(__name__)
@app.task(bind=True, default_retry_delay=RETRY_DELAY, max_retries=None, ignore_result=True)
def task_delete_empty_index_nodes(self):
def task_delete_empty(self):
IndexInstanceNode = apps.get_model(
app_label='document_indexing', model_name='IndexInstanceNode'
)
try:
rebuild_lock = locking_backend.acquire_lock(
'document_indexing_task_do_rebuild_all_indexes'
)
IndexInstanceNode.objects.delete_empty()
except LockError as exception:
# A rebuild is happening, retry later
raise self.retry(exc=exception)
@app.task(bind=True, default_retry_delay=RETRY_DELAY, max_retries=None, ignore_result=True)
def task_remove_document(self, document_id):
Document = apps.get_model(
app_label='documents', model_name='Document'
)
IndexInstanceNode = apps.get_model(
app_label='document_indexing', model_name='IndexInstanceNode'
)
try:
document = Document.objects.get(pk=document_id)
except Document.DoesNotExist:
# Document was deleted before we could execute, abort
pass
else:
try:
IndexInstanceNode.objects.delete_empty_index_nodes()
finally:
rebuild_lock.release()
IndexInstanceNode.objects.remove_document(document=document)
except LockError as exception:
raise self.retry(exc=exception)
@app.task(bind=True, default_retry_delay=RETRY_DELAY, max_retries=None, ignore_result=True)
@@ -39,27 +51,10 @@ def task_index_document(self, document_id):
Document = apps.get_model(
app_label='documents', model_name='Document'
)
IndexInstanceNode = apps.get_model(
app_label='document_indexing', model_name='IndexInstanceNode'
Index = apps.get_model(
app_label='document_indexing', model_name='Index'
)
try:
rebuild_lock = locking_backend.acquire_lock(
'document_indexing_task_do_rebuild_all_indexes'
)
except LockError as exception:
# A rebuild is happening, retry later
raise self.retry(exc=exception)
else:
try:
lock = locking_backend.acquire_lock(
'document_indexing_task_update_index_document_%d' % document_id
)
except LockError as exception:
# This document is being reindexed by another task, retry later
raise self.retry(exc=exception)
else:
try:
document = Document.objects.get(pk=document_id)
except Document.DoesNotExist:
@@ -68,37 +63,30 @@ def task_index_document(self, document_id):
pass
else:
try:
IndexInstanceNode.objects.index_document(document)
Index.objects.index_document(document=document)
except OperationalError as exception:
logger.warning(
'Operational error while trying to index document: '
'%s; %s', document, exception
)
lock.release()
raise self.retry(exc=exception)
else:
lock.release()
finally:
lock.release()
finally:
rebuild_lock.release()
except LockError as exception:
logger.warning(
'Unable to acquire lock for document %s; %s ',
document, exception
)
raise self.retry(exc=exception)
@app.task(bind=True, default_retry_delay=RETRY_DELAY, ignore_result=True)
def task_do_rebuild_all_indexes(self):
IndexInstanceNode = apps.get_model(
app_label='document_indexing', model_name='IndexInstanceNode'
def task_rebuild_index(self, index_id):
Index = apps.get_model(
app_label='document_indexing', model_name='Index'
)
try:
lock = locking_backend.acquire_lock(
'document_indexing_task_do_rebuild_all_indexes'
)
index = Index.objects.get(pk=index_id)
index.rebuild()
except LockError as exception:
# Another rebuild is happening, retry later
# This index is being rebuilt by another task, retry later
raise self.retry(exc=exception)
else:
try:
IndexInstanceNode.objects.rebuild_all_indexes()
finally:
lock.release()

View File

@@ -116,7 +116,7 @@ class IndexTestCase(BaseTestCase):
), ['', '0003']
)
# Document deleted from, index structure should update
# Document deleted, index structure should update
self.document.delete()
self.assertEqual(
list(
@@ -162,7 +162,7 @@ class IndexTestCase(BaseTestCase):
self.assertEqual(list(IndexInstanceNode.objects.all()), [])
# Rebuild all indexes
IndexInstanceNode.objects.rebuild_all_indexes()
Index.objects.rebuild()
# Check that document is in instance node
instance_node = IndexInstanceNode.objects.get(value='0001')

View File

@@ -5,7 +5,7 @@ from user_management.tests import (
TEST_USER_USERNAME, TEST_USER_PASSWORD
)
from ..models import Index, IndexInstanceNode
from ..models import Index
from ..permissions import (
permission_document_indexing_create, permission_document_indexing_delete,
permission_document_indexing_edit, permission_document_indexing_view
@@ -128,7 +128,7 @@ class IndexViewTestCase(GenericDocumentViewTestCase):
)
# Rebuild indexes
IndexInstanceNode.objects.rebuild_all_indexes()
Index.objects.rebuild()
return index

View File

@@ -9,9 +9,9 @@ from .api_views import (
)
from .views import (
DocumentIndexNodeListView, IndexInstanceNodeView, IndexListView,
RebuildIndexesConfirmView, SetupIndexDocumentTypesView,
SetupIndexCreateView, SetupIndexDeleteView, SetupIndexEditView,
SetupIndexListView, SetupIndexTreeTemplateListView, TemplateNodeCreateView,
RebuildIndexesView, SetupIndexDocumentTypesView, SetupIndexCreateView,
SetupIndexDeleteView, SetupIndexEditView, SetupIndexListView,
SetupIndexTreeTemplateListView, TemplateNodeCreateView,
TemplateNodeDeleteView, TemplateNodeEditView
)
@@ -61,7 +61,7 @@ urlpatterns = [
),
url(
r'^rebuild/all/$', RebuildIndexesConfirmView.as_view(),
r'^rebuild/all/$', RebuildIndexesView.as_view(),
name='rebuild_index_instances'
),
url(

View File

@@ -8,14 +8,14 @@ from django.utils.translation import ugettext_lazy as _
from acls.models import AccessControlList
from common.views import (
AssignRemoveView, ConfirmView, SingleObjectCreateView,
AssignRemoveView, FormView, SingleObjectCreateView,
SingleObjectDeleteView, SingleObjectEditView, SingleObjectListView
)
from documents.models import Document, DocumentType
from documents.permissions import permission_document_view
from documents.views import DocumentListView
from .forms import IndexTemplateNodeForm
from .forms import IndexListForm, IndexTemplateNodeForm
from .models import (
DocumentIndexInstanceNode, Index, IndexInstance, IndexInstanceNode,
IndexTemplateNode
@@ -25,7 +25,7 @@ from .permissions import (
permission_document_indexing_edit, permission_document_indexing_rebuild,
permission_document_indexing_view
)
from .tasks import task_do_rebuild_all_indexes
from .tasks import task_rebuild_index
from .widgets import node_tree
@@ -248,7 +248,9 @@ class IndexInstanceNodeView(DocumentListView):
return DocumentListView.get_queryset(self)
else:
self.object_permission = None
return self.index_instance_node.get_children().order_by('value')
return self.index_instance_node.get_children().order_by(
'value'
)
else:
self.object_permission = None
return IndexInstanceNode.objects.none()
@@ -312,16 +314,20 @@ class DocumentIndexNodeListView(SingleObjectListView):
return DocumentIndexInstanceNode.objects.get_for(self.get_document())
class RebuildIndexesConfirmView(ConfirmView):
class RebuildIndexesView(FormView):
extra_context = {
'message': _('On large databases this operation may take some time to execute.'),
'title': _('Rebuild all indexes?'),
'title': _('Rebuild indexes'),
}
form_class = IndexListForm
view_permission = permission_document_indexing_rebuild
def form_valid(self, form):
for index in form.cleaned_data['indexes']:
task_rebuild_index(index_id=index.pk)
messages.success(self.request, _('Index rebuild queued successfully.'))
return super(RebuildIndexesView, self).form_valid(form=form)
def get_post_action_redirect(self):
return reverse('common:tools_list')
def view_action(self):
task_do_rebuild_all_indexes.apply_async()
messages.success(self.request, _('Index rebuild queued successfully.'))