Add locking to avoid index rebuild clashing with index updates or index node deletion
This commit is contained in:
@@ -11,32 +11,60 @@ logger = logging.getLogger(__name__)
|
|||||||
RETRY_DELAY = 20 # TODO: convert this into a config option
|
RETRY_DELAY = 20 # TODO: convert this into a config option
|
||||||
|
|
||||||
|
|
||||||
@app.task(ignore_result=True)
|
@app.task(bind=True, ignore_result=True)
|
||||||
def task_delete_empty_index_nodes():
|
def task_delete_empty_index_nodes(self):
|
||||||
delete_empty_index_nodes()
|
try:
|
||||||
|
rebuild_lock = Lock.acquire_lock('document_indexing_task_do_rebuild_all_indexes')
|
||||||
|
except LockError as exception:
|
||||||
|
# A rebuild is happening, retry later
|
||||||
|
raise self.retry(exc=exception, countdown=RETRY_DELAY)
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
delete_empty_index_nodes()
|
||||||
|
finally:
|
||||||
|
rebuild_lock.release()
|
||||||
|
|
||||||
|
|
||||||
@app.task(bind=True, ignore_result=True)
|
@app.task(bind=True, ignore_result=True)
|
||||||
def task_index_document(self, document_id):
|
def task_index_document(self, document_id):
|
||||||
# TODO: Add concurrent task control
|
|
||||||
try:
|
try:
|
||||||
lock = Lock.acquire_lock('document_indexing_task_update_index_document_%d' % document_id)
|
rebuild_lock = Lock.acquire_lock('document_indexing_task_do_rebuild_all_indexes')
|
||||||
except LockError as exception:
|
except LockError as exception:
|
||||||
# This document is being reindexed by another task, retry later
|
# A rebuild is happening, retry later
|
||||||
raise self.retry(exc=exception, countdown=RETRY_DELAY)
|
raise self.retry(exc=exception, countdown=RETRY_DELAY)
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
document = Document.objects.get(pk=document_id)
|
lock = Lock.acquire_lock('document_indexing_task_update_index_document_%d' % document_id)
|
||||||
except Document.DoesNotExist:
|
except LockError as exception:
|
||||||
# Document was deleted before we could execute, abort about updating
|
# This document is being reindexed by another task, retry later
|
||||||
pass
|
raise self.retry(exc=exception, countdown=RETRY_DELAY)
|
||||||
else:
|
else:
|
||||||
index_document(document)
|
try:
|
||||||
|
document = Document.objects.get(pk=document_id)
|
||||||
|
except Document.DoesNotExist:
|
||||||
|
# Document was deleted before we could execute, abort about updating
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
index_document(document)
|
||||||
|
finally:
|
||||||
|
lock.release()
|
||||||
|
finally:
|
||||||
|
rebuild_lock.release()
|
||||||
|
|
||||||
|
|
||||||
|
@app.task(bind=True, ignore_result=True)
|
||||||
|
def task_do_rebuild_all_indexes(self):
|
||||||
|
if Lock.filter(name__startswith='document_indexing_task_update_index_document'):
|
||||||
|
# A document index update is happening, wait
|
||||||
|
raise self.retry(countdown=RETRY_DELAY)
|
||||||
|
|
||||||
|
try:
|
||||||
|
lock = Lock.acquire_lock('document_indexing_task_do_rebuild_all_indexes')
|
||||||
|
except LockError as exception:
|
||||||
|
# Another rebuild is happening, retry later
|
||||||
|
raise self.retry(exc=exception, countdown=RETRY_DELAY)
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
do_rebuild_all_indexes()
|
||||||
finally:
|
finally:
|
||||||
lock.release()
|
lock.release()
|
||||||
|
|
||||||
|
|
||||||
@app.task(ignore_result=True)
|
|
||||||
def task_do_rebuild_all_indexes():
|
|
||||||
# TODO: Find a way to rebuild after all pending updates are finished
|
|
||||||
do_rebuild_all_indexes()
|
|
||||||
|
|||||||
Reference in New Issue
Block a user