Add locking and arbitration to indexing updates, remove filesystem index mirroring (issue #56)

This commit is contained in:
Roberto Rosario
2015-01-02 21:52:33 -04:00
parent 155f543db4
commit 018284eec1
9 changed files with 39 additions and 220 deletions

View File

@@ -18,23 +18,23 @@ from .links import (document_index_list, document_index_main_menu_link,
                    rebuild_index_instances, template_node_create,
                    template_node_delete, template_node_edit)
 from .models import Index, IndexTemplateNode, IndexInstanceNode
-from .tasks import task_delete_indexes, task_update_indexes
+from .tasks import task_delete_empty_index_nodes, task_update_indexes


-@receiver(pre_delete, dispatch_uid='document_index_delete', sender=Document)
+@receiver(post_delete, dispatch_uid='document_index_delete', sender=Document)
 def document_index_delete(sender, **kwargs):
-    task_delete_indexes.apply_async(kwargs=dict(document_id=kwargs['instance'].pk), queue='indexing')
+    task_delete_empty_index_nodes.apply_async(queue='indexing')


 @receiver(post_save, dispatch_uid='document_metadata_index_update', sender=DocumentMetadata)
 def document_metadata_index_update(sender, **kwargs):
-    task_delete_indexes.apply_async(kwargs=dict(document_id=kwargs['instance'].document.pk), queue='indexing')
     task_update_indexes.apply_async(kwargs=dict(document_id=kwargs['instance'].document.pk), queue='indexing')
+    task_delete_empty_index_nodes.apply_async(queue='indexing')


-@receiver(pre_delete, dispatch_uid='document_metadata_index_delete', sender=DocumentMetadata)
+@receiver(post_delete, dispatch_uid='document_metadata_index_delete', sender=DocumentMetadata)
 def document_metadata_index_delete(sender, **kwargs):
-    task_delete_indexes.apply_async(kwargs=dict(document_id=kwargs['instance'].document.pk), queue='indexing')
+    task_delete_empty_index_nodes.apply_async(kwargs=dict(document_id=kwargs['instance'].document.pk), queue='indexing')


 @receiver(post_delete, dispatch_uid='document_metadata_index_post_delete', sender=DocumentMetadata)
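The receivers above no longer pass per-document arguments to the cleanup task: deletions now fire on post_delete and queue a generic sweep for empty index nodes on the 'indexing' queue, so the cleanup no longer depends on a document row that may already be gone. A minimal sketch of that receiver-to-task handoff; only the names that appear in the hunks (task_delete_empty_index_nodes, the 'indexing' queue, the dispatch_uid) come from the commit, the surrounding module layout is illustrative.

# Sketch of the receiver-to-task handoff used above; the imports are
# standard Django, not lines from this commit's hunks.
from django.db.models.signals import post_delete
from django.dispatch import receiver

from documents.models import Document

from .tasks import task_delete_empty_index_nodes


@receiver(post_delete, dispatch_uid='document_index_delete', sender=Document)
def document_index_delete(sender, **kwargs):
    # The document row is already gone; no document_id is needed, a generic
    # sweep for now-empty index nodes is queued instead.
    task_delete_empty_index_nodes.apply_async(queue='indexing')

Tasks routed this way are only picked up by workers listening on that queue (for example, a Celery worker started with the -Q/--queues option).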

View File

@@ -4,8 +4,7 @@ from django.contrib import admin
 from mptt.admin import MPTTModelAdmin

-from .models import (DocumentRenameCount, Index, IndexInstanceNode,
-                     IndexTemplateNode)
+from .models import Index, IndexInstanceNode, IndexTemplateNode


 class IndexTemplateNodeAdmin(MPTTModelAdmin):
@@ -17,7 +16,6 @@ class IndexInstanceNodeAdmin(MPTTModelAdmin):
     list_display = ('value',)


-admin.site.register(DocumentRenameCount)
 admin.site.register(Index)
 admin.site.register(IndexTemplateNode, IndexTemplateNodeAdmin)
 admin.site.register(IndexInstanceNode, IndexInstanceNodeAdmin)

View File

@@ -3,24 +3,13 @@ from __future__ import absolute_import
 import logging

 from django.db.models import Q
-from django.template.defaultfilters import slugify
 from django.utils.translation import ugettext
 from django.utils.translation import ugettext_lazy as _

-from .exceptions import MaxSuffixCountReached
-from .filesystem import (assemble_suffixed_filename, fs_create_index_directory,
-                         fs_create_document_link, fs_delete_document_link,
-                         fs_delete_index_directory)
-from .models import Index, IndexInstanceNode, DocumentRenameCount
-from .settings import (AVAILABLE_INDEXING_FUNCTIONS, MAX_SUFFIX_COUNT,
-                       SLUGIFY_PATHS)
+from .models import Index, IndexInstanceNode
+from .settings import AVAILABLE_INDEXING_FUNCTIONS

 logger = logging.getLogger(__name__)

-if SLUGIFY_PATHS:
-    SLUGIFY_FUNCTION = slugify
-else:
-    # Do not slugify path or filenames and extensions
-    SLUGIFY_FUNCTION = lambda x: x

 # External functions
@@ -42,22 +31,6 @@ def update_indexes(document):
     return warnings


-def delete_indexes(document):
-    """
-    Delete all the index instances related to a document
-    """
-    # TODO: convert this fuction into a manager method
-    warnings = []
-
-    for index_node in document.node_instances.all():
-        index_warnings = cascade_document_remove(document, index_node)
-        warnings.extend(index_warnings)
-
-    return warnings


 # Internal functions
 def find_lowest_available_suffix(index_instance, document):
     index_instance_documents = DocumentRenameCount.objects.filter(index_instance_node=index_instance)
@@ -84,19 +57,13 @@ def cascade_eval(document, template_node, parent_index_instance=None):
     try:
         result = eval(template_node.expression, {'document': document}, AVAILABLE_INDEXING_FUNCTIONS)
     except Exception as exception:
-        error_message = _(u'Error in document indexing update expression: %(expression)s; %(exception)s') % {
-            'expression': template_node.expression, 'exception': exception}
+        error_message = _(u'Error indexing document: %(document)s; expression: %(expression)s; %(exception)s') % {
+            'document': document, 'expression': template_node.expression, 'exception': exception}
         warnings.append(error_message)
         logger.debug(error_message)
     else:
         if result:
             index_instance, created = IndexInstanceNode.objects.get_or_create(index_template_node=template_node, value=result, parent=parent_index_instance)
-            # if created:
-            try:
-                fs_create_index_directory(index_instance)
-            except Exception as exception:
-                warnings.append(_(u'Error updating document index, expression: %(expression)s; %(exception)s') % {
-                    'expression': template_node.expression, 'exception': exception})

             if template_node.link_documents:
                 suffix = find_lowest_available_suffix(index_instance, document)
@@ -106,15 +73,6 @@ def cascade_eval(document, template_node, parent_index_instance=None):
                     suffix=suffix
                 )
                 document_count.save()
-
-                try:
-                    fs_create_document_link(index_instance, document, suffix)
-                except Exception as exception:
-                    error_message = _(u'Error updating document index, expression: %(expression)s; %(exception)s') % {
-                        'expression': template_node.expression, 'exception': exception}
-                    warnings.append(error_message)
-                    logger.debug(error_message)

             index_instance.documents.add(document)

             for child in template_node.get_children():
@@ -128,32 +86,24 @@ def cascade_eval(document, template_node, parent_index_instance=None):
     return warnings


-def cascade_document_remove(document, index_instance):
+def delete_empty_index_nodes():
     """
-    Delete a documents reference from an index instance and call itself
-    recusively deleting documents and empty index instances up to the
-    root of the tree
+    Delete empty index instance nodes
     """
-    warnings = []
-    try:
-        document_rename_count = DocumentRenameCount.objects.get(index_instance_node=index_instance, document=document)
-        fs_delete_document_link(index_instance, document, document_rename_count.suffix)
-        document_rename_count.delete()
-        index_instance.documents.remove(document)
-        if index_instance.documents.count() == 0 and index_instance.get_children().count() == 0:
-            # if there are no more documents and no children, delete
-            # node and check parent for the same conditions
-            parent = index_instance.parent
-            fs_delete_index_directory(index_instance)
-            index_instance.delete()
-            parent_warnings = cascade_document_remove(
-                document, parent
-            )
-            warnings.extend(parent_warnings)
-    except DocumentRenameCount.DoesNotExist:
-        return warnings
-    except Exception as exception:
-        warnings.append(_(u'Unable to delete document indexing node; %s') % exception)
-
-    return warnings
+    for instance_node in IndexInstanceNode.objects.filter(documents__isnull=True):
+        task_delete_empty_index_nodes_recursive(instance_node)
+
+
+def task_delete_empty_index_nodes_recursive(instance_node):
+    """
+    Calls itself recursively deleting empty index instance nodes up to root
+    """
+    if instance_node.get_children().count() == 0:
+        # if there are no children, delete node and check parent for the
+        # same conditions
+        parent = instance_node.parent
+        instance_node.delete()
+        delete_empty_indexes(parent)
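The new cleanup pair replaces the per-document cascade: delete_empty_index_nodes() finds instance nodes with no linked documents and task_delete_empty_index_nodes_recursive() prunes childless ones upward until it reaches an ancestor that still carries content. The pruning is easiest to see outside Django; the sketch below uses plain Python objects (Node, documents, children are illustrative stand-ins for IndexInstanceNode, not project code) and mirrors the same walk-up-and-delete logic.

# Illustrative stand-in for the IndexInstanceNode tree: a node is "empty"
# when it holds no documents; pruning deletes childless empty nodes and then
# re-checks the parent, so whole empty branches collapse toward the root.
class Node(object):
    def __init__(self, value, parent=None):
        self.value = value
        self.documents = set()
        self.children = []
        self.parent = parent
        if parent is not None:
            parent.children.append(self)

    def delete(self):
        if self.parent is not None and self in self.parent.children:
            self.parent.children.remove(self)
        self.parent = None


def prune_empty(node):
    # Mirrors task_delete_empty_index_nodes_recursive: delete a childless,
    # empty node, then apply the same test to its former parent.
    if node.parent is not None and not node.children and not node.documents:
        parent = node.parent
        node.delete()
        prune_empty(parent)


def collect(node):
    # Post-order walk so children are visited before their parents.
    nodes = []
    for child in node.children:
        nodes.extend(collect(child))
        nodes.append(child)
    return nodes


def delete_empty_nodes(root):
    # Mirrors delete_empty_index_nodes: sweep every empty node once.
    for node in collect(root):
        if not node.documents:
            prune_empty(node)


root = Node('root')
year = Node('2015', parent=root)
january = Node('01', parent=year)          # empty branch, will be pruned
february = Node('02', parent=year)
february.documents.add('invoice.pdf')

delete_empty_nodes(root)
assert [child.value for child in year.children] == ['02']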

View File

@@ -1,6 +0,0 @@
-class MaxSuffixCountReached(Exception):
-    """
-    Raised when there are too many documents with the same filename in the
-    same node/directory
-    """
-    pass

View File

@@ -1,106 +0,0 @@
-from __future__ import absolute_import
-
-import errno
-import os
-
-from django.utils.translation import ugettext_lazy as _
-
-from .settings import (FILESYSTEM_SERVING, SUFFIX_SEPARATOR)
-
-
-def assemble_suffixed_filename(filename, suffix=0):
-    """
-    Split document filename, to attach suffix to the name part then
-    re attacht the extension
-    """
-    if suffix:
-        name, extension = os.path.splitext(filename)
-        return SUFFIX_SEPARATOR.join([name, unicode(suffix), os.extsep, extension])
-    else:
-        return filename
-
-
-def assemble_path_from_list(directory_list):
-    return os.path.normpath(os.sep.join(directory_list))
-
-
-def get_instance_path(index_instance):
-    """
-    Return a platform formated filesytem path corresponding to an
-    index instance
-    """
-    names = []
-    for ancestor in index_instance.get_ancestors():
-        names.append(ancestor.value)
-
-    names.append(index_instance.value)
-
-    return assemble_path_from_list(names)
-
-
-def fs_create_index_directory(index_instance):
-    if index_instance.index_template_node.index.name in FILESYSTEM_SERVING:
-        target_directory = assemble_path_from_list([FILESYSTEM_SERVING[index_instance.index_template_node.index.name], get_instance_path(index_instance)])
-        try:
-            os.mkdir(target_directory)
-        except OSError as exception:
-            if exception.errno == errno.EEXIST:
-                pass
-            else:
-                raise Exception(_(u'Unable to create indexing directory; %s') % exception)
-
-
-def fs_create_document_link(index_instance, document, suffix=0):
-    if index_instance.index_template_node.index.name in FILESYSTEM_SERVING:
-        filename = assemble_suffixed_filename(document.file_filename, suffix)
-        filepath = assemble_path_from_list([FILESYSTEM_SERVING[index_instance.index_template_node.index.name], get_instance_path(index_instance), filename])
-        try:
-            os.symlink(document.file.path, filepath)
-        except OSError as exception:
-            if exception.errno == errno.EEXIST:
-                # This link should not exist, try to delete it
-                try:
-                    os.unlink(filepath)
-                    # Try again
-                    os.symlink(document.file.path, filepath)
-                except Exception as exception:
-                    raise Exception(_(u'Unable to create symbolic link, file exists and could not be deleted: %(filepath)s; %(exception)s') % {'filepath': filepath, 'exception': exception})
-            else:
-                raise Exception(_(u'Unable to create symbolic link: %(filepath)s; %(exception)s') % {'filepath': filepath, 'exception': exception})
-
-
-def fs_delete_document_link(index_instance, document, suffix=0):
-    if index_instance.index_template_node.index.name in FILESYSTEM_SERVING:
-        filename = assemble_suffixed_filename(document.file_filename, suffix)
-        filepath = assemble_path_from_list([FILESYSTEM_SERVING[index_instance.index_template_node.index.name], get_instance_path(index_instance), filename])
-        try:
-            os.unlink(filepath)
-        except OSError as exception:
-            if exception.errno != errno.ENOENT:
-                # Raise when any error other than doesn't exits
-                raise Exception(_(u'Unable to delete document symbolic link; %s') % exception)
-
-
-def fs_delete_index_directory(index_instance):
-    if index_instance.index_template_node.index.name in FILESYSTEM_SERVING:
-        target_directory = assemble_path_from_list([FILESYSTEM_SERVING[index_instance.index_template_node.index.name], get_instance_path(index_instance)])
-        try:
-            os.removedirs(target_directory)
-        except OSError as exception:
-            if exception.errno == errno.EEXIST:
-                pass
-            else:
-                raise Exception(_(u'Unable to delete indexing directory; %s') % exception)
-
-
-def fs_delete_directory_recusive(index):
-    if index.name in FILESYSTEM_SERVING:
-        path = FILESYSTEM_SERVING[index.name]
-        for dirpath, dirnames, filenames in os.walk(path, topdown=False):
-            for filename in filenames:
-                os.unlink(os.path.join(dirpath, filename))
-            for dirname in dirnames:
-                os.rmdir(os.path.join(dirpath, dirname))

View File

@@ -98,16 +98,3 @@ class IndexInstanceNode(MPTTModel):
     class Meta:
         verbose_name = _(u'Index node instance')
         verbose_name_plural = _(u'Indexes node instances')
-
-
-class DocumentRenameCount(models.Model):
-    index_instance_node = models.ForeignKey(IndexInstanceNode, verbose_name=_(u'Index instance'))
-    document = models.ForeignKey(Document, verbose_name=_(u'Document'))
-    suffix = models.PositiveIntegerField(blank=True, verbose_name=(u'Suffix'))
-
-    def __unicode__(self):
-        return u'%s - %s - %s' % (self.index_instance_node, self.document, self.suffix or u'0')
-
-    class Meta:
-        verbose_name = _(u'Document rename count')
-        verbose_name_plural = _(u'Documents rename count')

View File

@@ -13,10 +13,5 @@ register_settings(
     settings=[
         # Definition
         {'name': u'AVAILABLE_INDEXING_FUNCTIONS', 'global_name': u'DOCUMENT_INDEXING_AVAILABLE_INDEXING_FUNCTIONS', 'default': available_indexing_functions},
-        {'name': u'SUFFIX_SEPARATOR', 'global_name': u'DOCUMENT_INDEXING_SUFFIX_SEPARATOR', 'default': u'_'},
-        # Filesystem serving
-        {'name': u'SLUGIFY_PATHS', 'global_name': u'DOCUMENT_INDEXING_FILESYSTEM_SLUGIFY_PATHS', 'default': False},
-        {'name': u'MAX_SUFFIX_COUNT', 'global_name': u'DOCUMENT_INDEXING_FILESYSTEM_MAX_SUFFIX_COUNT', 'default': 1000},
-        {'name': u'FILESYSTEM_SERVING', 'global_name': u'DOCUMENT_INDEXING_FILESYSTEM_SERVING', 'default': {}, 'description': _(u'A dictionary that maps the index name and where on the filesystem that index will be mirrored.')}
     ]
 )

View File

@@ -4,7 +4,7 @@ from mayan.celery import app
 from documents.models import Document

 from lock_manager import Lock, LockError
-from .api import update_indexes, delete_indexes
+from .api import update_indexes, delete_empty_index_nodes
 from .tools import do_rebuild_all_indexes

 logger = logging.getLogger(__name__)
@@ -12,13 +12,13 @@ RETRY_DELAY = 20 # TODO: convert this into a config option
 @app.task(ignore_result=True)
-def task_delete_indexes(document_id):
-    document = Document.objects.get(pk=document_id)
-    delete_indexes(document)
+def task_delete_empty_index_nodes():
+    delete_empty_index_nodes()


 @app.task(bind=True, ignore_result=True)
 def task_update_indexes(self, document_id):
+    # TODO: Add concurrent task control
     try:
         lock = Lock.acquire_lock('document_indexing_task_update_index_document_%d' % document_id)
     except LockError as exception:
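The arbitration in task_update_indexes hinges on a named, per-document lock: if Lock.acquire_lock() raises LockError, another worker already holds the update for that document and the task should back off and retry after RETRY_DELAY. The hunk stops at the except clause, so the rest of the body below is a sketch under assumptions: self.retry() is standard Celery (enabled by bind=True), and lock.release() is assumed to be the lock_manager counterpart of Lock.acquire_lock().

# Sketch of the lock-and-retry pattern; the `else` branch and lock.release()
# are assumptions, not lines from this commit's hunks.
from mayan.celery import app

from documents.models import Document
from lock_manager import Lock, LockError

from .api import update_indexes

RETRY_DELAY = 20  # seconds, as in the hunk header above


@app.task(bind=True, ignore_result=True)
def task_update_indexes(self, document_id):
    try:
        lock = Lock.acquire_lock('document_indexing_task_update_index_document_%d' % document_id)
    except LockError as exception:
        # Another worker is already indexing this document; re-queue and try
        # again shortly instead of running two updates concurrently.
        raise self.retry(exc=exception, countdown=RETRY_DELAY)
    else:
        try:
            update_indexes(Document.objects.get(pk=document_id))
        finally:
            lock.release()

Whatever the real body looks like, the important property is that the lock is always released; otherwise later updates for the same document would retry indefinitely.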

View File

@@ -3,15 +3,16 @@ from __future__ import absolute_import
 from documents.models import Document

 from .api import update_indexes
-from .filesystem import fs_delete_directory_recusive
-from .models import Index, IndexInstanceNode, DocumentRenameCount
+from .models import Index, IndexInstanceNode


 def do_rebuild_all_indexes():
-    for index in Index.objects.all():
-        fs_delete_directory_recusive(index)
-    IndexInstanceNode.objects.all().delete()
-    DocumentRenameCount.objects.all().delete()
+    for instance_node in IndexInstanceNode.objects.all():
+        instance_node.delete()
+
+    for index in Index.objects.all():
+        index.delete()

     for document in Document.objects.all():
+        # TODO: Launch all concurrently as background tasks
         update_indexes(document)
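The TODO in do_rebuild_all_indexes points at dispatching the per-document updates through Celery rather than updating synchronously. Below is a hypothetical sketch of that follow-up, reusing task_update_indexes and the 'indexing' queue from the earlier hunks; the helper name queue_rebuild_all_indexes is not part of the commit.

# Hypothetical follow-up to the TODO above: fan out one indexing task per
# document instead of calling update_indexes() inline.
from documents.models import Document

from .tasks import task_update_indexes


def queue_rebuild_all_indexes():
    for document_id in Document.objects.values_list('pk', flat=True):
        task_update_indexes.apply_async(
            kwargs=dict(document_id=document_id), queue='indexing'
        )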