From 155f543db4ef72f54634b56cdf0fb2c887e982a2 Mon Sep 17 00:00:00 2001 From: Roberto Rosario Date: Fri, 2 Jan 2015 21:07:01 -0400 Subject: [PATCH] Avoid concurrent indexing of the same document --- mayan/apps/document_indexing/tasks.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/mayan/apps/document_indexing/tasks.py b/mayan/apps/document_indexing/tasks.py index 4fed73bb41..87d4448202 100644 --- a/mayan/apps/document_indexing/tasks.py +++ b/mayan/apps/document_indexing/tasks.py @@ -2,11 +2,13 @@ import logging from mayan.celery import app from documents.models import Document +from lock_manager import Lock, LockError from .api import update_indexes, delete_indexes from .tools import do_rebuild_all_indexes logger = logging.getLogger(__name__) +RETRY_DELAY = 20 # TODO: convert this into a config option @app.task(ignore_result=True) @@ -15,12 +17,24 @@ def task_delete_indexes(document_id): delete_indexes(document) -@app.task(ignore_result=True) -def task_update_indexes(document_id): - document = Document.objects.get(pk=document_id) - update_indexes(document) +@app.task(bind=True, ignore_result=True) +def task_update_indexes(self, document_id): + try: + lock = Lock.acquire_lock('document_indexing_task_update_index_document_%d' % document_id) + except LockError as exception: + # This document is being reindexed by another task, retry later + raise self.retry(exc=exception, countdown=RETRY_DELAY) + else: + try: + document = Document.objects.get(pk=document_id) + except Document.DoesNotExist: + # Document was deleted before we could execute, abort about updating + pass + else: + update_indexes(document) @app.task(ignore_result=True) def task_do_rebuild_all_indexes(): + # TODO: Find a way to rebuild after all pending updates are finished do_rebuild_all_indexes()