Add plugable locking backend support. Add threadsafe file lock backend.
This commit is contained in:
@@ -46,7 +46,7 @@ class CheckoutsApp(MayanAppConfig):
|
||||
DashboardWidget(
|
||||
icon='fa fa-shopping-cart',
|
||||
queryset=DocumentCheckout.objects.all(),
|
||||
label=_('Checkout documents'),
|
||||
label=_('Checkedout documents'),
|
||||
link=reverse_lazy('checkouts:checkout_list')
|
||||
)
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import logging
|
||||
from django.apps import apps
|
||||
|
||||
from lock_manager import LockError
|
||||
from lock_manager.runtime import locking_backend
|
||||
from mayan.celery import app
|
||||
|
||||
from .literals import CHECKOUT_EXPIRATION_LOCK_EXPIRE
|
||||
@@ -17,15 +18,12 @@ def task_check_expired_check_outs():
|
||||
DocumentCheckout = apps.get_model(
|
||||
app_label='checkouts', model_name='DocumentCheckout'
|
||||
)
|
||||
Lock = apps.get_model(
|
||||
app_label='lock_manager', model_name='Lock'
|
||||
)
|
||||
|
||||
logger.debug('executing...')
|
||||
lock_id = 'task_expired_check_outs'
|
||||
try:
|
||||
logger.debug('trying to acquire lock: %s', lock_id)
|
||||
lock = Lock.objects.acquire_lock(
|
||||
lock = locking_backend.acquire_lock(
|
||||
name=lock_id, timeout=CHECKOUT_EXPIRATION_LOCK_EXPIRE
|
||||
)
|
||||
logger.debug('acquired lock: %s', lock_id)
|
||||
|
||||
@@ -7,6 +7,7 @@ from django.db import OperationalError
|
||||
|
||||
from mayan.celery import app
|
||||
from lock_manager import LockError
|
||||
from lock_manager.runtime import locking_backend
|
||||
|
||||
from .literals import RETRY_DELAY
|
||||
|
||||
@@ -18,12 +19,9 @@ def task_delete_empty_index_nodes(self):
|
||||
IndexInstanceNode = apps.get_model(
|
||||
app_label='document_indexing', model_name='IndexInstanceNode'
|
||||
)
|
||||
Lock = apps.get_model(
|
||||
app_label='lock_manager', model_name='Lock'
|
||||
)
|
||||
|
||||
try:
|
||||
rebuild_lock = Lock.objects.acquire_lock(
|
||||
rebuild_lock = locking_backend.acquire_lock(
|
||||
'document_indexing_task_do_rebuild_all_indexes'
|
||||
)
|
||||
except LockError as exception:
|
||||
@@ -46,12 +44,8 @@ def task_index_document(self, document_id):
|
||||
app_label='document_indexing', model_name='IndexInstanceNode'
|
||||
)
|
||||
|
||||
Lock = apps.get_model(
|
||||
app_label='lock_manager', model_name='Lock'
|
||||
)
|
||||
|
||||
try:
|
||||
rebuild_lock = Lock.objects.acquire_lock(
|
||||
rebuild_lock = locking_backend.acquire_lock(
|
||||
'document_indexing_task_do_rebuild_all_indexes'
|
||||
)
|
||||
except LockError as exception:
|
||||
@@ -59,7 +53,7 @@ def task_index_document(self, document_id):
|
||||
raise self.retry(exc=exception)
|
||||
else:
|
||||
try:
|
||||
lock = Lock.objects.acquire_lock(
|
||||
lock = locking_backend.acquire_lock(
|
||||
'document_indexing_task_update_index_document_%d' % document_id
|
||||
)
|
||||
except LockError as exception:
|
||||
@@ -96,16 +90,8 @@ def task_do_rebuild_all_indexes(self):
|
||||
app_label='document_indexing', model_name='IndexInstanceNode'
|
||||
)
|
||||
|
||||
Lock = apps.get_model(
|
||||
app_label='lock_manager', model_name='Lock'
|
||||
)
|
||||
|
||||
if Lock.objects.check_existing(name__startswith='document_indexing_task_update_index_document'):
|
||||
# A document index update is happening, wait
|
||||
raise self.retry()
|
||||
|
||||
try:
|
||||
lock = Lock.objects.acquire_lock(
|
||||
lock = locking_backend.acquire_lock(
|
||||
'document_indexing_task_do_rebuild_all_indexes'
|
||||
)
|
||||
except LockError as exception:
|
||||
|
||||
0
mayan/apps/lock_manager/backends/__init__.py
Normal file
0
mayan/apps/lock_manager/backends/__init__.py
Normal file
92
mayan/apps/lock_manager/backends/file_lock.py
Normal file
92
mayan/apps/lock_manager/backends/file_lock.py
Normal file
@@ -0,0 +1,92 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import cPickle as pickle
|
||||
import datetime
|
||||
import logging
|
||||
import threading
|
||||
import uuid
|
||||
|
||||
from django.core.files import locks
|
||||
|
||||
from common.utils import mkstemp
|
||||
|
||||
from ..exceptions import LockError
|
||||
|
||||
lock = threading.Lock()
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
temporary_file = mkstemp()[1]
|
||||
logger.debug('temporary_file: %s', temporary_file)
|
||||
|
||||
|
||||
class FileLock(object):
|
||||
@classmethod
|
||||
def acquire_lock(cls, name, timeout=None):
|
||||
instance = FileLock(name=name, timeout=timeout)
|
||||
return instance
|
||||
|
||||
def _get_lock_dictionary(self):
|
||||
if self.timeout:
|
||||
result = {
|
||||
'expiration': datetime.datetime.now() + datetime.timedelta(seconds=self.timeout),
|
||||
'uuid': self.uuid
|
||||
}
|
||||
else:
|
||||
result = {
|
||||
'expiration': 0,
|
||||
'uuid': self.uuid
|
||||
}
|
||||
|
||||
return result
|
||||
|
||||
def __init__(self, name, timeout):
|
||||
self.name = name
|
||||
self.timeout = timeout or 0
|
||||
self.uuid = uuid.uuid4().get_hex()
|
||||
|
||||
lock.acquire()
|
||||
with open(temporary_file, 'r+') as file_object:
|
||||
locks.lock(f=file_object, flags=locks.LOCK_EX)
|
||||
|
||||
try:
|
||||
file_locks = pickle.loads(file_object.read())
|
||||
except EOFError:
|
||||
file_locks = {}
|
||||
|
||||
if name in file_locks:
|
||||
# Someone already got this lock, check to see if it is expired
|
||||
if file_locks[name]['expiration'] and datetime.datetime.now() > file_locks[name]['expiration']:
|
||||
# It expires and has expired, we re-acquired it
|
||||
file_locks[name] = self._get_lock_dictionary()
|
||||
else:
|
||||
lock.release()
|
||||
raise LockError
|
||||
else:
|
||||
file_locks[name] = self._get_lock_dictionary()
|
||||
|
||||
file_object.seek(0)
|
||||
file_object.write(pickle.dumps(file_locks))
|
||||
lock.release()
|
||||
|
||||
def release(self):
|
||||
lock.acquire()
|
||||
with open(temporary_file, 'r+') as file_object:
|
||||
locks.lock(f=file_object, flags=locks.LOCK_EX)
|
||||
try:
|
||||
file_locks = pickle.loads(file_object.read())
|
||||
except EOFError:
|
||||
file_locks = {}
|
||||
|
||||
if self.name in file_locks:
|
||||
if file_locks[self.name]['uuid'] == self.uuid:
|
||||
file_locks.pop(self.name)
|
||||
else:
|
||||
# Lock expired and someone else acquired it
|
||||
pass
|
||||
else:
|
||||
# Lock expired and someone else released it
|
||||
pass
|
||||
|
||||
file_object.seek(0)
|
||||
file_object.write(pickle.dumps(file_locks))
|
||||
lock.release()
|
||||
10
mayan/apps/lock_manager/backends/model_lock.py
Normal file
10
mayan/apps/lock_manager/backends/model_lock.py
Normal file
@@ -0,0 +1,10 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from django.apps import apps
|
||||
|
||||
|
||||
class ModelLock(object):
|
||||
@classmethod
|
||||
def acquire_lock(cls, name, timeout=None):
|
||||
Lock = apps.get_model(app_label='lock_manager', model_name='Lock')
|
||||
return Lock.objects.acquire_lock(name=name, timeout=timeout)
|
||||
@@ -49,21 +49,3 @@ class LockManager(models.Manager):
|
||||
else:
|
||||
logger.debug('acquired lock: %s', name)
|
||||
return lock
|
||||
|
||||
def check_existing(self, **kwargs):
|
||||
try:
|
||||
existing_lock = self.get(**kwargs)
|
||||
except self.model.DoesNotExist:
|
||||
return False
|
||||
else:
|
||||
# Lock exists, try to re-acquire it in case it is a stale lock
|
||||
try:
|
||||
lock = self.acquire_lock(existing_lock.name)
|
||||
except LockError:
|
||||
# This is expected, try to acquire it to force it to
|
||||
# timeout in case it is a stale lock.
|
||||
return True
|
||||
else:
|
||||
# Able to re-acquire anothers lock, so we release it now
|
||||
lock.release()
|
||||
return False
|
||||
|
||||
@@ -5,7 +5,7 @@ from django.utils.encoding import python_2_unicode_compatible
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from .managers import LockManager
|
||||
from .settings import DEFAULT_LOCK_TIMEOUT
|
||||
from .settings import setting_default_lock_timeout
|
||||
|
||||
|
||||
@python_2_unicode_compatible
|
||||
@@ -14,7 +14,7 @@ class Lock(models.Model):
|
||||
auto_now_add=True, verbose_name=_('Creation datetime')
|
||||
)
|
||||
timeout = models.IntegerField(
|
||||
default=DEFAULT_LOCK_TIMEOUT, verbose_name=_('Timeout')
|
||||
default=setting_default_lock_timeout.value, verbose_name=_('Timeout')
|
||||
)
|
||||
name = models.CharField(
|
||||
max_length=64, unique=True, verbose_name=_('Name')
|
||||
@@ -27,7 +27,7 @@ class Lock(models.Model):
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
if not self.timeout and not kwargs.get('timeout'):
|
||||
self.timeout = DEFAULT_LOCK_TIMEOUT
|
||||
self.timeout = setting_default_lock_timeout.value
|
||||
|
||||
super(Lock, self).save(*args, **kwargs)
|
||||
|
||||
|
||||
5
mayan/apps/lock_manager/runtime.py
Normal file
5
mayan/apps/lock_manager/runtime.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from django.utils.module_loading import import_string
|
||||
|
||||
from .settings import setting_backend
|
||||
|
||||
locking_backend = import_string(setting_backend.value)
|
||||
@@ -1,9 +1,20 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from django.conf import settings
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from smart_settings import Namespace
|
||||
|
||||
DEFAULT_BACKEND = 'lock_manager.backends.model_lock.ModelLock'
|
||||
DEFAULT_LOCK_TIMEOUT_VALUE = 30
|
||||
|
||||
DEFAULT_LOCK_TIMEOUT = getattr(
|
||||
settings, 'LOCK_MANAGER_DEFAULT_LOCK_TIMEOUT', DEFAULT_LOCK_TIMEOUT_VALUE
|
||||
namespace = Namespace(name='lock_manager', label=_('Lock manager'))
|
||||
|
||||
setting_backend = namespace.add_setting(
|
||||
default=DEFAULT_BACKEND,
|
||||
global_name='LOCK_MANAGER_DEFAULT_BACKEND',
|
||||
)
|
||||
|
||||
setting_default_lock_timeout = namespace.add_setting(
|
||||
default=DEFAULT_LOCK_TIMEOUT_VALUE,
|
||||
global_name='LOCK_MANAGER_DEFAULT_LOCK_TIMEOUT',
|
||||
)
|
||||
|
||||
@@ -3,48 +3,53 @@ from __future__ import unicode_literals
|
||||
import time
|
||||
|
||||
from django.test import TestCase
|
||||
from django.utils.module_loading import import_string
|
||||
|
||||
from ..exceptions import LockError
|
||||
from ..models import Lock
|
||||
|
||||
|
||||
class LockTestCase(TestCase):
|
||||
class FileLockTestCase(TestCase):
|
||||
backend_string = 'lock_manager.backends.file_lock.FileLock'
|
||||
|
||||
def setUp(self):
|
||||
self.locking_backend = import_string(self.backend_string)
|
||||
|
||||
def test_exclusive(self):
|
||||
lock_1 = Lock.objects.acquire_lock(name='test_lock_1')
|
||||
lock_1 = self.locking_backend.acquire_lock(name='test_lock_1')
|
||||
with self.assertRaises(LockError):
|
||||
Lock.objects.acquire_lock(name='test_lock_1')
|
||||
self.locking_backend.acquire_lock(name='test_lock_1')
|
||||
|
||||
# Cleanup
|
||||
lock_1.release()
|
||||
|
||||
def test_release(self):
|
||||
lock_1 = Lock.objects.acquire_lock(name='test_lock_1')
|
||||
lock_1 = self.locking_backend.acquire_lock(name='test_lock_1')
|
||||
lock_1.release()
|
||||
lock_2 = Lock.objects.acquire_lock(name='test_lock_1')
|
||||
lock_2 = self.locking_backend.acquire_lock(name='test_lock_1')
|
||||
|
||||
# Cleanup
|
||||
lock_2.release()
|
||||
|
||||
def test_timeout_expired(self):
|
||||
Lock.objects.acquire_lock(name='test_lock_1', timeout=1)
|
||||
self.locking_backend.acquire_lock(name='test_lock_1', timeout=1)
|
||||
|
||||
# lock_1 not release and not expired, should raise LockError
|
||||
with self.assertRaises(LockError):
|
||||
Lock.objects.acquire_lock(name='test_lock_1')
|
||||
self.locking_backend.acquire_lock(name='test_lock_1')
|
||||
|
||||
time.sleep(2)
|
||||
# lock_1 not release but has expired, should not raise LockError
|
||||
lock_2 = Lock.objects.acquire_lock(name='test_lock_1')
|
||||
lock_2 = self.locking_backend.acquire_lock(name='test_lock_1')
|
||||
|
||||
# Cleanup
|
||||
lock_2.release()
|
||||
|
||||
def test_double_release(self):
|
||||
lock_1 = Lock.objects.acquire_lock(name='test_lock_1')
|
||||
lock_1 = self.locking_backend.acquire_lock(name='test_lock_1')
|
||||
lock_1.release()
|
||||
|
||||
def test_release_expired(self):
|
||||
lock_1 = Lock.objects.acquire_lock(name='test_lock_1', timeout=1)
|
||||
lock_1 = self.locking_backend.acquire_lock(name='test_lock_1', timeout=1)
|
||||
time.sleep(2)
|
||||
lock_1.release()
|
||||
# No exception is raised even though the lock has expired.
|
||||
@@ -54,7 +59,11 @@ class LockTestCase(TestCase):
|
||||
|
||||
def test_release_expired_reaquired(self):
|
||||
time.sleep(2)
|
||||
lock_2 = Lock.objects.acquire_lock(name='test_lock_1', timeout=1)
|
||||
lock_2 = self.locking_backend.acquire_lock(name='test_lock_1', timeout=1)
|
||||
|
||||
# Cleanup
|
||||
lock_2.release()
|
||||
|
||||
|
||||
class ModelLockTestCase(FileLockTestCase):
|
||||
backend_string = 'lock_manager.backends.model_lock.ModelLock'
|
||||
@@ -10,6 +10,7 @@ from django.db import OperationalError
|
||||
|
||||
from documents.models import DocumentVersion
|
||||
from lock_manager import LockError
|
||||
from lock_manager.runtime import locking_backend
|
||||
from mayan.celery import app
|
||||
|
||||
from .classes import TextExtractor
|
||||
@@ -22,16 +23,12 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
@app.task(bind=True, default_retry_delay=DO_OCR_RETRY_DELAY, ignore_result=True)
|
||||
def task_do_ocr(self, document_version_pk):
|
||||
Lock = apps.get_model(
|
||||
app_label='lock_manager', model_name='Lock'
|
||||
)
|
||||
|
||||
lock_id = 'task_do_ocr_doc_version-%d' % document_version_pk
|
||||
try:
|
||||
logger.debug('trying to acquire lock: %s', lock_id)
|
||||
# Acquire lock to avoid doing OCR on the same document version more than
|
||||
# once concurrently
|
||||
lock = Lock.objects.acquire_lock(lock_id, LOCK_EXPIRE)
|
||||
lock = locking_backend.acquire_lock(lock_id, LOCK_EXPIRE)
|
||||
logger.debug('acquired lock: %s', lock_id)
|
||||
document_version = None
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user