Cache: Beta stage
Add retry to race condition in intermediate image generation. Remove DB index from cache file size field. Signed-off-by: Roberto Rosario <roberto.rosario.gonzalez@gmail.com>
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Generated by Django 1.11.16 on 2018-12-02 08:03
|
||||
# Generated by Django 1.11.16 on 2018-12-03 08:12
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from django.db import migrations, models
|
||||
@@ -45,7 +45,7 @@ class Migration(migrations.Migration):
|
||||
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('datetime', models.DateTimeField(auto_now_add=True, db_index=True, verbose_name='Date time')),
|
||||
('filename', models.CharField(max_length=255, verbose_name='Filename')),
|
||||
('file_size', models.PositiveIntegerField(db_index=True, default=0, verbose_name='File size')),
|
||||
('file_size', models.PositiveIntegerField(default=0, verbose_name='File size')),
|
||||
('partition', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='files', to='common.CachePartition', verbose_name='Cache partition')),
|
||||
],
|
||||
options={
|
||||
@@ -10,7 +10,7 @@ from django.conf import settings
|
||||
from django.contrib.contenttypes.fields import GenericForeignKey
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.core.files.base import ContentFile
|
||||
from django.db import models, transaction, OperationalError
|
||||
from django.db import models, transaction
|
||||
from django.db.models import Sum
|
||||
from django.utils.encoding import force_text, python_2_unicode_compatible
|
||||
from django.utils.functional import cached_property
|
||||
@@ -92,13 +92,16 @@ class CachePartition(models.Model):
|
||||
lock_id = 'cache_partition-create_file-{}-{}'.format(self.pk, filename)
|
||||
try:
|
||||
logger.debug('trying to acquire lock: %s', lock_id)
|
||||
lock = locking_backend.acquire_lock(lock_id)#, LOCK_EXPIRE)
|
||||
lock = locking_backend.acquire_lock(lock_id)
|
||||
logger.debug('acquired lock: %s', lock_id)
|
||||
try:
|
||||
self.cache.prune()
|
||||
|
||||
# Since open "wb+" doesn't create files force the creation of an
|
||||
# empty file.
|
||||
self.cache.storage.delete(
|
||||
name=self.get_full_filename(filename=filename)
|
||||
)
|
||||
self.cache.storage.save(
|
||||
name=self.get_full_filename(filename=filename),
|
||||
content=ContentFile(content='')
|
||||
@@ -112,7 +115,7 @@ class CachePartition(models.Model):
|
||||
except Exception as exception:
|
||||
logger.error(
|
||||
'Unexpected exception while trying to save new '
|
||||
'cache file.'
|
||||
'cache file; %s', exception
|
||||
)
|
||||
self.cache.storage.delete(
|
||||
name=self.get_full_filename(filename=filename)
|
||||
@@ -150,7 +153,7 @@ class CachePartitionFile(models.Model):
|
||||
)
|
||||
filename = models.CharField(max_length=255, verbose_name=_('Filename'))
|
||||
file_size = models.PositiveIntegerField(
|
||||
db_index=True, default=0, verbose_name=_('File size')
|
||||
default=0, verbose_name=_('File size')
|
||||
)
|
||||
|
||||
class Meta:
|
||||
|
||||
@@ -38,7 +38,6 @@ from .serializers import (
|
||||
WritableDocumentTypeSerializer, WritableDocumentVersionSerializer
|
||||
)
|
||||
from .settings import settings_document_page_image_cache_time
|
||||
from .storages import storage_documentimagecache
|
||||
from .tasks import task_generate_document_page_image
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -234,7 +233,6 @@ class APIDocumentPageImageView(generics.RetrieveAPIView):
|
||||
)
|
||||
|
||||
cache_filename = task.get(timeout=DOCUMENT_IMAGE_TASK_TIMEOUT)
|
||||
#with storage_documentimagecache.open(cache_filename) as file_object:
|
||||
with self.get_object().cache_partition.get_file(filename=cache_filename).open() as file_object:
|
||||
response = HttpResponse(file_object.read(), content_type='image')
|
||||
if '_hash' in request.GET:
|
||||
|
||||
@@ -9,7 +9,7 @@ CHECK_TRASH_PERIOD_INTERVAL = 60
|
||||
DELETE_STALE_STUBS_INTERVAL = 60 * 10 # 10 minutes
|
||||
DEFAULT_DELETE_PERIOD = 30
|
||||
DEFAULT_DELETE_TIME_UNIT = TIME_DELTA_UNIT_DAYS
|
||||
DEFAULT_DOCUMENTS_CACHE_MAXIMUM_SIZE = 100 * 2 ** 20 # 100 Megabytes
|
||||
DEFAULT_DOCUMENTS_CACHE_MAXIMUM_SIZE = 500 * 2 ** 20 # 500 Megabytes
|
||||
DEFAULT_LANGUAGE = 'eng'
|
||||
DEFAULT_LANGUAGE_CODES = (
|
||||
'ilo', 'run', 'uig', 'hin', 'pan', 'pnb', 'wuu', 'msa', 'kxd', 'ind',
|
||||
@@ -33,6 +33,7 @@ DOCUMENT_IMAGE_TASK_TIMEOUT = 120
|
||||
DOCUMENT_IMAGES_CACHE_NAME = 'document_images'
|
||||
DOCUMENT_CACHE_STORAGE_INSTANCE_PATH = 'documents.storages.storage_documentimagecache'
|
||||
STUB_EXPIRATION_INTERVAL = 60 * 60 * 24 # 24 hours
|
||||
TASK_GENERATE_DODCUMENT_PAGE_IMAGE_RETRIES = 6
|
||||
UPDATE_PAGE_COUNT_RETRY_DELAY = 10
|
||||
UPLOAD_NEW_VERSION_RETRY_DELAY = 10
|
||||
|
||||
|
||||
@@ -10,7 +10,6 @@ from furl import furl
|
||||
from django.apps import apps
|
||||
from django.conf import settings
|
||||
from django.core.files import File
|
||||
from django.core.files.base import ContentFile
|
||||
from django.db import models, transaction
|
||||
from django.template import Template, Context
|
||||
from django.urls import reverse
|
||||
@@ -28,7 +27,6 @@ from converter import (
|
||||
from converter.exceptions import InvalidOfficeFormat, PageCountError
|
||||
from converter.literals import DEFAULT_ZOOM_LEVEL, DEFAULT_ROTATION
|
||||
from converter.models import Transformation
|
||||
from lock_manager import LockError
|
||||
from mimetype.api import get_mimetype
|
||||
|
||||
from .events import (
|
||||
@@ -524,8 +522,6 @@ class DocumentVersion(models.Model):
|
||||
return first_page.get_api_image_url(*args, **kwargs)
|
||||
|
||||
def get_intermidiate_file(self):
|
||||
import time
|
||||
|
||||
cache_file = self.cache_partition.get_file(filename='intermediate_file')
|
||||
if cache_file:
|
||||
logger.debug('Intermidiate file found.')
|
||||
@@ -537,13 +533,9 @@ class DocumentVersion(models.Model):
|
||||
converter = converter_class(file_object=self.open())
|
||||
pdf_file_object = converter.to_pdf()
|
||||
|
||||
try:
|
||||
with self.cache_partition.create_file(filename='intermediate_file') as file_object:
|
||||
for chunk in pdf_file_object:
|
||||
file_object.write(chunk)
|
||||
except LockError:
|
||||
time.sleep(0.1)
|
||||
return self.get_intermidiate_file()
|
||||
with self.cache_partition.create_file(filename='intermediate_file') as file_object:
|
||||
for chunk in pdf_file_object:
|
||||
file_object.write(chunk)
|
||||
|
||||
return self.cache_partition.get_file(filename='intermediate_file').open()
|
||||
except InvalidOfficeFormat:
|
||||
|
||||
@@ -7,10 +7,13 @@ from django.contrib.auth import get_user_model
|
||||
from django.db import OperationalError
|
||||
|
||||
from converter.transformations import BaseTransformation
|
||||
from lock_manager import LockError
|
||||
from lock_manager.decorators import retry_on_lock_error
|
||||
from mayan.celery import app
|
||||
|
||||
from .literals import (
|
||||
UPDATE_PAGE_COUNT_RETRY_DELAY, UPLOAD_NEW_VERSION_RETRY_DELAY
|
||||
TASK_GENERATE_DODCUMENT_PAGE_IMAGE_RETRIES, UPDATE_PAGE_COUNT_RETRY_DELAY,
|
||||
UPLOAD_NEW_VERSION_RETRY_DELAY
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -76,8 +79,8 @@ def task_delete_stubs():
|
||||
logger.info('Finshed')
|
||||
|
||||
|
||||
@app.task()
|
||||
def task_generate_document_page_image(document_page_id, transformation_list=None, *args, **kwargs):
|
||||
@app.task(bind=True, retry_backoff=True, max_retries=TASK_GENERATE_DODCUMENT_PAGE_IMAGE_RETRIES, retry_jitter=True)
|
||||
def task_generate_document_page_image(self, document_page_id, transformation_list=None, *args, **kwargs):
|
||||
"""
|
||||
Arguments:
|
||||
* transformation_list: List of dictionaties with keys: name and kwargs
|
||||
@@ -97,7 +100,27 @@ def task_generate_document_page_image(document_page_id, transformation_list=None
|
||||
)(**transformation.get('kwargs', {}))
|
||||
)
|
||||
|
||||
return document_page.generate_image(transformations=transformations, *args, **kwargs)
|
||||
def task_core_function():
|
||||
return document_page.generate_image(
|
||||
transformations=transformations, *args, **kwargs
|
||||
)
|
||||
|
||||
if self.request.is_eager:
|
||||
# Task is running on eager mode, probably in development mode, so
|
||||
# retry the task manually.
|
||||
@retry_on_lock_error(
|
||||
retries=TASK_GENERATE_DODCUMENT_PAGE_IMAGE_RETRIES
|
||||
)
|
||||
def retry_task():
|
||||
return task_core_function()
|
||||
|
||||
return retry_task()
|
||||
else:
|
||||
# Setup retrying the task via Celery
|
||||
try:
|
||||
return task_core_function()
|
||||
except LockError as exception:
|
||||
raise self.retry(exc=exception)
|
||||
|
||||
|
||||
@app.task(ignore_result=True)
|
||||
|
||||
27
mayan/apps/lock_manager/decorators.py
Normal file
27
mayan/apps/lock_manager/decorators.py
Normal file
@@ -0,0 +1,27 @@
|
||||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
import time
|
||||
import random
|
||||
|
||||
from .exceptions import LockError
|
||||
|
||||
|
||||
def retry_on_lock_error(retries):
|
||||
def decorator(function):
|
||||
def wrapper():
|
||||
retry_count = 0
|
||||
|
||||
while True:
|
||||
try:
|
||||
return function()
|
||||
except LockError:
|
||||
if retry_count == retries:
|
||||
raise
|
||||
else:
|
||||
retry_count = retry_count + 1
|
||||
timeout = 2 ** retry_count
|
||||
time.sleep(timeout)
|
||||
# Add random jitter
|
||||
time.sleep(random.uniform(0.0, 1.0))
|
||||
return wrapper
|
||||
return decorator
|
||||
Reference in New Issue
Block a user