document_indexing: Make sure locks are release on exceptions. Remove document from nodes and delete empty nodes only at the start of the indexing process. When the indexing starts at the template node, make sure it starts with the root node.
Signed-off-by: Roberto Rosario <roberto.rosario.gonzalez@gmail.com>
This commit is contained in:
@@ -14,6 +14,7 @@ from mptt.models import MPTTModel
|
|||||||
from acls.models import AccessControlList
|
from acls.models import AccessControlList
|
||||||
from documents.models import Document, DocumentType
|
from documents.models import Document, DocumentType
|
||||||
from documents.permissions import permission_document_view
|
from documents.permissions import permission_document_view
|
||||||
|
from lock_manager import LockError
|
||||||
from lock_manager.runtime import locking_backend
|
from lock_manager.runtime import locking_backend
|
||||||
|
|
||||||
from .managers import (
|
from .managers import (
|
||||||
@@ -77,11 +78,22 @@ class Index(models.Model):
|
|||||||
|
|
||||||
def index_document(self, document):
|
def index_document(self, document):
|
||||||
logger.debug('Index; Indexing document: %s', document)
|
logger.debug('Index; Indexing document: %s', document)
|
||||||
|
|
||||||
|
with transaction.atomic():
|
||||||
|
# Remove the document from all instance nodes from
|
||||||
|
# this index
|
||||||
|
for index_instance_node in IndexInstanceNode.objects.filter(index_template_node__index=self, documents=document):
|
||||||
|
index_instance_node.remove_document(document=document)
|
||||||
|
|
||||||
|
# Delete all empty nodes. Starting from the bottom up
|
||||||
|
for index_instance_node in self.instance_root.get_leafnodes():
|
||||||
|
index_instance_node.delete_empty()
|
||||||
|
|
||||||
self.template_root.index_document(document=document)
|
self.template_root.index_document(document=document)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def instance_root(self):
|
def instance_root(self):
|
||||||
return self.template_root.index_instance_nodes.get()
|
return self.template_root.get_instance_root_node()
|
||||||
|
|
||||||
def natural_key(self):
|
def natural_key(self):
|
||||||
return (self.slug,)
|
return (self.slug,)
|
||||||
@@ -193,45 +205,38 @@ class IndexTemplateNode(MPTTModel):
|
|||||||
def get_lock_string(self):
|
def get_lock_string(self):
|
||||||
return 'indexing:indexing_template_node_{}'.format(self.pk)
|
return 'indexing:indexing_template_node_{}'.format(self.pk)
|
||||||
|
|
||||||
def index_document(self, document, acquire_lock=True, index_instance_node_parent=None):
|
def get_instance_root_node(self):
|
||||||
# Avoid another process indexing this same document for the same
|
index_instance_root_node, create = self.index_instance_nodes.get_or_create(parent=None)
|
||||||
# template node. This prevents this template node's index instance
|
return index_instance_root_node
|
||||||
# nodes from being deleted while the template is evaluated and
|
|
||||||
# documents added to it.
|
|
||||||
if acquire_lock:
|
|
||||||
lock = locking_backend.acquire_lock(self.get_lock_string())
|
|
||||||
|
|
||||||
|
def index_document(self, document, acquire_lock=True, index_instance_node_parent=None):
|
||||||
# Start transaction after the lock in case the locking backend uses
|
# Start transaction after the lock in case the locking backend uses
|
||||||
# the database.
|
# the database.
|
||||||
try:
|
try:
|
||||||
with transaction.atomic():
|
if acquire_lock:
|
||||||
|
lock = locking_backend.acquire_lock(
|
||||||
|
self.get_lock_string()
|
||||||
|
)
|
||||||
|
except LockError:
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
try:
|
||||||
logger.debug(
|
logger.debug(
|
||||||
'IndexTemplateNode; Indexing document: %s', document
|
'IndexTemplateNode; Indexing document: %s', document
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.debug(
|
if not index_instance_node_parent:
|
||||||
'Removing document "%s" from all index instance nodes',
|
# I'm the root
|
||||||
document
|
|
||||||
)
|
|
||||||
for index_instance_node in self.index_instance_nodes.all():
|
|
||||||
index_instance_node.remove_document(
|
|
||||||
document=document, acquire_lock=False
|
|
||||||
)
|
|
||||||
|
|
||||||
with transaction.atomic():
|
with transaction.atomic():
|
||||||
if not self.parent:
|
index_instance_root_node = self.get_instance_root_node()
|
||||||
logger.debug(
|
|
||||||
'IndexTemplateNode; parent: creating empty root index '
|
|
||||||
'instance node'
|
|
||||||
)
|
|
||||||
index_instance_node, created = self.index_instance_nodes.get_or_create()
|
|
||||||
|
|
||||||
for child in self.get_children():
|
for child in self.get_children():
|
||||||
child.index_document(
|
child.index_document(
|
||||||
document=document, acquire_lock=False,
|
document=document, acquire_lock=False,
|
||||||
index_instance_node_parent=index_instance_node
|
index_instance_node_parent=index_instance_root_node
|
||||||
)
|
)
|
||||||
elif self.enabled:
|
elif self.enabled:
|
||||||
|
with transaction.atomic():
|
||||||
logger.debug('IndexTemplateNode; non parent: evaluating')
|
logger.debug('IndexTemplateNode; non parent: evaluating')
|
||||||
logger.debug('My parent template is: %s', self.parent)
|
logger.debug('My parent template is: %s', self.parent)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
@@ -305,30 +310,25 @@ class IndexInstanceNode(MPTTModel):
|
|||||||
def __str__(self):
|
def __str__(self):
|
||||||
return self.value
|
return self.value
|
||||||
|
|
||||||
def delete_empty(self, acquire_lock=True):
|
def delete_empty(self):
|
||||||
"""
|
# Prevent another process to delete this node.
|
||||||
The argument `acquire_lock` controls whether or not this method
|
try:
|
||||||
acquires or lock. The case for this is to acquire when called directly
|
|
||||||
or not to acquire when called as part of a larger index process
|
|
||||||
that already has a lock
|
|
||||||
"""
|
|
||||||
# Prevent another process to work on this node. We use the node's
|
|
||||||
# parent template node for the lock
|
|
||||||
if acquire_lock:
|
|
||||||
lock = locking_backend.acquire_lock(
|
lock = locking_backend.acquire_lock(
|
||||||
self.index_template_node.get_lock_string()
|
self.index_template_node.get_lock_string()
|
||||||
)
|
)
|
||||||
# Start transaction after the lock in case the locking backend uses
|
except LockError:
|
||||||
# the database.
|
raise
|
||||||
with transaction.atomic():
|
else:
|
||||||
|
try:
|
||||||
if self.documents.count() == 0 and self.get_children().count() == 0:
|
if self.documents.count() == 0 and self.get_children().count() == 0:
|
||||||
if self.parent:
|
if not self.is_root_node():
|
||||||
self.parent.delete_empty(acquire_lock=False)
|
# I'm not a root node, I can be deleted
|
||||||
# Delete ourselves after the parent is deleted
|
|
||||||
# Sound counterintuitive but otherwise the tree becomes
|
|
||||||
# corrupted. Reference: test_models.test_date_based_index
|
|
||||||
self.delete()
|
self.delete()
|
||||||
if acquire_lock:
|
|
||||||
|
if self.parent.is_root_node():
|
||||||
|
# My parent is not a root node, it can be deleted
|
||||||
|
self.parent.delete_empty()
|
||||||
|
finally:
|
||||||
lock.release()
|
lock.release()
|
||||||
|
|
||||||
def get_absolute_url(self):
|
def get_absolute_url(self):
|
||||||
@@ -370,10 +370,13 @@ class IndexInstanceNode(MPTTModel):
|
|||||||
else:
|
else:
|
||||||
return self.get_children().count()
|
return self.get_children().count()
|
||||||
|
|
||||||
|
def get_lock_string(self):
|
||||||
|
return 'indexing:index_instance_node_{}'.format(self.pk)
|
||||||
|
|
||||||
def index(self):
|
def index(self):
|
||||||
return IndexInstance.objects.get(pk=self.index_template_node.index.pk)
|
return IndexInstance.objects.get(pk=self.index_template_node.index.pk)
|
||||||
|
|
||||||
def remove_document(self, document, acquire_lock=True):
|
def remove_document(self, document):
|
||||||
"""
|
"""
|
||||||
The argument `acquire_lock` controls whether or not this method
|
The argument `acquire_lock` controls whether or not this method
|
||||||
acquires or lock. The case for this is to acquire when called directly
|
acquires or lock. The case for this is to acquire when called directly
|
||||||
@@ -382,15 +385,16 @@ class IndexInstanceNode(MPTTModel):
|
|||||||
"""
|
"""
|
||||||
# Prevent another process to work on this node. We use the node's
|
# Prevent another process to work on this node. We use the node's
|
||||||
# parent template node for the lock
|
# parent template node for the lock
|
||||||
if acquire_lock:
|
try:
|
||||||
lock = locking_backend.acquire_lock(
|
lock = locking_backend.acquire_lock(
|
||||||
self.index_template_node.get_lock_string()
|
self.index_template_node.get_lock_string()
|
||||||
)
|
)
|
||||||
|
except LockError:
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
try:
|
||||||
self.documents.remove(document)
|
self.documents.remove(document)
|
||||||
self.delete_empty(acquire_lock=False)
|
finally:
|
||||||
|
|
||||||
if acquire_lock:
|
|
||||||
lock.release()
|
lock.release()
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user