Files
mayan-edms/mayan/apps/common/tests/mixins.py
Roberto Rosario a7b31fc171 Refactor and implement download code natively
- Use modified port of Django 2.2 FileResponse.
- Remove Django DownloadView library.

Signed-off-by: Roberto Rosario <roberto.rosario@mayan-edms.com>
2019-12-12 19:39:44 -04:00

553 lines
19 KiB
Python

from __future__ import unicode_literals
import glob
import importlib
import logging
import os
import random
from furl import furl
from django.apps import apps
from django.conf import settings
from django.conf.urls import url
from django.contrib.contenttypes.models import ContentType
from django.db import connection, connections, models
from django.db.models.signals import post_save, pre_save
from django.http import HttpResponse
from django.template import Context, Template
from django.test.utils import ContextList
from django.urls import clear_url_caches, reverse
from django.utils.encoding import (
DjangoUnicodeDecodeError, force_bytes, force_text
)
from django.utils.six import PY3
from mayan.apps.acls.classes import ModelPermission
from mayan.apps.storage.settings import setting_temporary_directory
from ..compat import FileResponse
from .literals import (
TEST_SERVER_HOST, TEST_SERVER_SCHEME, TEST_VIEW_NAME, TEST_VIEW_URL
)
if getattr(settings, 'COMMON_TEST_FILE_HANDLES', False):
import psutil
class ClientMethodsTestCaseMixin(object):
def _build_verb_kwargs(self, viewname=None, path=None, *args, **kwargs):
data = kwargs.pop('data', {})
follow = kwargs.pop('follow', False)
query = kwargs.pop('query', {})
headers = kwargs.pop('headers', {})
if viewname:
path = reverse(viewname=viewname, *args, **kwargs)
path = furl(url=path)
path.args.update(query)
result = {'follow': follow, 'data': data, 'path': path.tostr()}
result.update(headers)
return result
def delete(self, viewname=None, path=None, *args, **kwargs):
return self.client.delete(
**self._build_verb_kwargs(
path=path, viewname=viewname, *args, **kwargs
)
)
def generic(self, method, viewname=None, path=None, *args, **kwargs):
return self.client.generic(
method=method, **self._build_verb_kwargs(
path=path, viewname=viewname, *args, **kwargs
)
)
def get(self, viewname=None, path=None, *args, **kwargs):
return self.client.get(
**self._build_verb_kwargs(
path=path, viewname=viewname, *args, **kwargs
)
)
def patch(self, viewname=None, path=None, *args, **kwargs):
return self.client.patch(
**self._build_verb_kwargs(
path=path, viewname=viewname, *args, **kwargs
)
)
def post(self, viewname=None, path=None, *args, **kwargs):
return self.client.post(
**self._build_verb_kwargs(
path=path, viewname=viewname, *args, **kwargs
)
)
def put(self, viewname=None, path=None, *args, **kwargs):
return self.client.put(
**self._build_verb_kwargs(
path=path, viewname=viewname, *args, **kwargs
)
)
class ConnectionsCheckTestCaseMixin(object):
_open_connections_check_enable = True
def _get_open_connections_count(self):
return len(connections.all())
def setUp(self):
super(ConnectionsCheckTestCaseMixin, self).setUp()
self._connections_count = self._get_open_connections_count()
def tearDown(self):
if self._open_connections_check_enable:
self.assertEqual(
self._connections_count, self._get_open_connections_count(),
msg='Database connection leak. The number of database '
'connections at the start and at the end of the test are not '
'the same.'
)
super(ConnectionsCheckTestCaseMixin, self).tearDown()
class ContentTypeCheckTestCaseMixin(object):
expected_content_types = ('text/html', 'text/html; charset=utf-8')
def _pre_setup(self):
super(ContentTypeCheckTestCaseMixin, self)._pre_setup()
test_instance = self
class CustomClient(self.client_class):
def request(self, *args, **kwargs):
response = super(CustomClient, self).request(*args, **kwargs)
content_type = response._headers.get('content-type', [None, ''])[1]
if test_instance.expected_content_types:
test_instance.assertTrue(
content_type in test_instance.expected_content_types,
msg='Unexpected response content type: {}, expected: {}.'.format(
content_type, ' or '.join(test_instance.expected_content_types)
)
)
return response
self.client = CustomClient()
class DownloadTestCaseMixin(object):
def assert_download_response(
self, response, content=None, filename=None, is_attachment=None,
mime_type=None
):
self.assertTrue(isinstance(response, FileResponse))
if filename:
self.assertEqual(
response[
'Content-Disposition'
].split('filename="')[1].split('"')[0], filename
)
if content:
response_content = b''.join(list(response))
try:
response_content = force_text(response_content)
except DjangoUnicodeDecodeError:
"""Leave as bytes"""
self.assertEqual(response_content, content)
if is_attachment is not None:
self.assertEqual(response['Content-Disposition'], 'attachment')
if mime_type:
self.assertTrue(response['Content-Type'].startswith(mime_type))
class EnvironmentTestCaseMixin(object):
def setUp(self):
super(EnvironmentTestCaseMixin, self).setUp()
self._test_environment_variables = []
def tearDown(self):
for name in self._test_environment_variables:
os.environ.pop(name)
super(EnvironmentTestCaseMixin, self).tearDown()
def _set_environment_variable(self, name, value):
self._test_environment_variables.append(name)
os.environ[name] = value
class ModelTestCaseMixin(object):
def _model_instance_to_dictionary(self, instance):
return instance._meta.model._default_manager.filter(
pk=instance.pk
).values()[0]
class OpenFileCheckTestCaseMixin(object):
def _get_descriptor_count(self):
process = psutil.Process()
return process.num_fds()
def _get_open_files(self):
process = psutil.Process()
return process.open_files()
def setUp(self):
super(OpenFileCheckTestCaseMixin, self).setUp()
if getattr(settings, 'COMMON_TEST_FILE_HANDLES', False):
self._open_files = self._get_open_files()
def tearDown(self):
if getattr(settings, 'COMMON_TEST_FILE_HANDLES', False) and not getattr(self, '_skip_file_descriptor_test', False):
for new_open_file in self._get_open_files():
self.assertFalse(
new_open_file not in self._open_files,
msg='File descriptor leak. The number of file descriptors '
'at the start and at the end of the test are not the same.'
)
self._skip_file_descriptor_test = False
super(OpenFileCheckTestCaseMixin, self).tearDown()
class RandomPrimaryKeyModelMonkeyPatchMixin(object):
random_primary_key_random_floor = 100
random_primary_key_random_ceiling = 10000
random_primary_key_maximum_attempts = 100
random_primary_key_enable = True
@staticmethod
def get_unique_primary_key(model):
pk_list = model._meta.default_manager.values_list('pk', flat=True)
attempts = 0
while True:
primary_key = random.randint(
RandomPrimaryKeyModelMonkeyPatchMixin.random_primary_key_random_floor,
RandomPrimaryKeyModelMonkeyPatchMixin.random_primary_key_random_ceiling
)
if primary_key not in pk_list:
break
attempts = attempts + 1
if attempts > RandomPrimaryKeyModelMonkeyPatchMixin.random_primary_key_maximum_attempts:
raise Exception(
'Maximum number of retries for an unique random primary '
'key reached.'
)
return primary_key
def setUp(self):
if self.random_primary_key_enable:
self.method_save_original = models.Model.save
def method_save_new(instance, *args, **kwargs):
if instance.pk:
return self.method_save_original(instance, *args, **kwargs)
else:
# Set meta.auto_created to True to have the original save_base
# not send the pre_save signal which would normally send
# the instance without a primary key. Since we assign a random
# primary key any pre_save signal handler that relies on an
# empty primary key will fail.
# The meta.auto_created and manual pre_save sending emulates
# the original behavior. Since meta.auto_created also disables
# the post_save signal we must also send it ourselves.
# This hack work with Django 1.11 .save_base() but can break
# in future versions if that method is updated.
pre_save.send(
sender=instance.__class__, instance=instance, raw=False,
update_fields=None,
)
instance._meta.auto_created = True
instance.pk = RandomPrimaryKeyModelMonkeyPatchMixin.get_unique_primary_key(
model=instance._meta.model
)
instance.id = instance.pk
result = instance.save_base(force_insert=True)
instance._meta.auto_created = False
post_save.send(
sender=instance.__class__, instance=instance, created=True,
update_fields=None, raw=False
)
return result
setattr(models.Model, 'save', method_save_new)
super(RandomPrimaryKeyModelMonkeyPatchMixin, self).setUp()
def tearDown(self):
if self.random_primary_key_enable:
models.Model.save = self.method_save_original
super(RandomPrimaryKeyModelMonkeyPatchMixin, self).tearDown()
class SilenceLoggerTestCaseMixin(object):
"""
Changes the log level of a specific logger for the duration of a test.
The default level for silenced loggers is CRITICAL.
Example: self._silence_logger(name='mayan.apps.converter.managers')
"""
test_case_silenced_logger = None
test_case_silenced_logger_new_level = logging.CRITICAL
def tearDown(self):
if self.test_case_silenced_logger:
self.test_case_silenced_logger.setLevel(
level=self.test_case_silenced_logger_level
)
super(SilenceLoggerTestCaseMixin, self).tearDown()
def _silence_logger(self, name):
self.test_case_silenced_logger = logging.getLogger(name=name)
self.test_case_silenced_logger_level = self.test_case_silenced_logger.level
self.test_case_silenced_logger.setLevel(
level=self.test_case_silenced_logger_new_level
)
class TempfileCheckTestCasekMixin(object):
# Ignore the jvmstat instrumentation and GitLab's CI .config files
# Ignore LibreOffice fontconfig cache dir
ignore_globs = ('hsperfdata_*', '.config', '.cache')
def _get_temporary_entries(self):
ignored_result = []
# Expand globs by joining the temporary directory and then flattening
# the list of lists into a single list
for item in self.ignore_globs:
ignored_result.extend(
glob.glob(
os.path.join(setting_temporary_directory.value, item)
)
)
# Remove the path and leave only the expanded filename
ignored_result = map(lambda x: os.path.split(x)[-1], ignored_result)
return set(
os.listdir(setting_temporary_directory.value)
) - set(ignored_result)
def setUp(self):
super(TempfileCheckTestCasekMixin, self).setUp()
if getattr(settings, 'COMMON_TEST_TEMP_FILES', False):
self._temporary_items = self._get_temporary_entries()
def tearDown(self):
if getattr(settings, 'COMMON_TEST_TEMP_FILES', False):
final_temporary_items = self._get_temporary_entries()
self.assertEqual(
self._temporary_items, final_temporary_items,
msg='Orphan temporary file. The number of temporary files and/or '
'directories at the start and at the end of the test are not the '
'same. Orphan entries: {}'.format(
','.join(final_temporary_items - self._temporary_items)
)
)
super(TempfileCheckTestCasekMixin, self).tearDown()
class TestModelTestMixin(object):
_test_models = []
def tearDown(self):
# Delete the test models' content type entries and deregister the
# permissions, this avoids their Content Type from being looked up
# in subsequent tests where they don't exists due to the database
# transaction rollback.
for model in self._test_models:
content_type = ContentType.objects.get_for_model(model=model)
if content_type.pk:
content_type.delete()
ModelPermission.deregister(model=model)
super(TestModelTestMixin, self).tearDown()
def _get_test_model_meta(self):
self.db_table = '{}_{}'.format(
self.app_config.label, self.model_name.lower()
)
class Meta(object):
app_label = self.app_config.label
db_table = self.db_table
verbose_name = self.model_name
if self.options:
for key, value in self.options.items():
setattr(Meta, key, value)
return Meta
def _get_test_model_save_method(self):
def save(instance, *args, **kwargs):
# Custom .save() method to use random primary key values.
if instance.pk:
return models.Model.self(instance, *args, **kwargs)
else:
instance.pk = RandomPrimaryKeyModelMonkeyPatchMixin.get_unique_primary_key(
model=instance._meta.model
)
instance.id = instance.pk
return instance.save_base(force_insert=True)
return save
def _create_test_model(
self, base_class=models.Model, fields=None, model_name=None,
options=None
):
self.model_name = model_name or 'TestModel'
self.options = options
# Obtain the app_config and app_label from the test's module path
self.app_config = apps.get_containing_app_config(
object_name=self.__class__.__module__
)
if connection.vendor == 'mysql':
self.skipTest(
reason='MySQL doesn\'t support schema changes inside an '
'atomic block.'
)
attrs = {
'__module__': self.__class__.__module__,
'save': self._get_test_model_save_method(),
'Meta': self._get_test_model_meta(),
}
if fields:
attrs.update(fields)
# Clear previous model registration before re-registering it again to
# avoid conflict with test models with the same name, in the same app
# but from another test module.
apps.all_models[self.app_config.label].pop(self.model_name.lower(), None)
if PY3:
model = type(
self.model_name, (base_class,), attrs
)
else:
model = type(
force_bytes(self.model_name), (base_class,), attrs
)
if not model._meta.proxy:
with connection.schema_editor() as schema_editor:
schema_editor.create_model(model=model)
self._test_models.append(model)
ContentType.objects.clear_cache()
return model
class TestServerTestCaseMixin(object):
def setUp(self):
super(TestServerTestCaseMixin, self).setUp()
self.testserver_prefix = self.get_testserver_prefix()
self.testserver_url = self.get_testserver_url()
self.test_view_request = None
def _test_view_factory(self, test_object=None):
def test_view(request):
self.test_view_request = request
return HttpResponse()
return test_view
def get_testserver_prefix(self):
return furl(
scheme=TEST_SERVER_SCHEME, host=TEST_SERVER_HOST,
).tostr()
def get_testserver_url(self):
return furl(
scheme=TEST_SERVER_SCHEME, host=TEST_SERVER_HOST,
path=self.test_view_url
).tostr()
class TestViewTestCaseMixin(object):
auto_add_test_view = False
has_test_view = False
test_view_object = None
test_view_name = TEST_VIEW_NAME
test_view_url = TEST_VIEW_URL
def setUp(self):
super(TestViewTestCaseMixin, self).setUp()
if self.auto_add_test_view:
self.add_test_view(test_object=self.test_view_object)
def tearDown(self):
urlconf = importlib.import_module(settings.ROOT_URLCONF)
self.client.logout()
if self.has_test_view:
urlconf.urlpatterns.pop(0)
super(TestViewTestCaseMixin, self).tearDown()
def _test_view_factory(self, test_object=None):
def test_view(request):
template = Template('{{ object }}')
context = Context(
{'object': test_object, 'resolved_object': test_object}
)
return HttpResponse(template.render(context=context))
return test_view
def add_test_view(self, test_object=None):
urlconf = importlib.import_module(settings.ROOT_URLCONF)
urlconf.urlpatterns.insert(
0, url(
regex=self.test_view_url, view=self._test_view_factory(
test_object=test_object
), name=self.test_view_name
)
)
clear_url_caches()
self.has_test_view = True
def get_test_view(self):
response = self.get(viewname=self.test_view_name)
if isinstance(response.context, ContextList):
# template widget rendering causes test client response to be
# ContextList rather than RequestContext. Typecast to dictionary
# before updating.
result = dict(response.context).copy()
result.update({'request': response.wsgi_request})
return Context(result)
else:
result = response.context or {}
result.update({'request': response.wsgi_request})
return Context(result)