Fix document download views. Fixes GL issue #229. Thanks to @ammaranjith for the find and fix idea.

This commit is contained in:
Roberto Rosario
2015-10-21 03:38:56 -04:00
parent 58f73f0b1e
commit f739dd54ac
3 changed files with 185 additions and 29 deletions

View File

@@ -137,9 +137,9 @@ class DocumentDownloadForm(forms.Form):
)
def __init__(self, *args, **kwargs):
self.document_versions = kwargs.pop('document_versions', None)
self.queryset = kwargs.pop('queryset', None)
super(DocumentDownloadForm, self).__init__(*args, **kwargs)
if len(self.document_versions) > 1:
if self.queryset.count() > 1:
self.fields['compressed'].initial = True
self.fields['compressed'].widget.attrs.update({'disabled': True})

View File

@@ -8,6 +8,7 @@ from django.core.files import File
from django.core.urlresolvers import reverse
from django.test.client import Client
from django.test import TestCase, override_settings
from django.utils.six import BytesIO
from permissions.classes import Permission
from permissions.models import Role
@@ -18,10 +19,14 @@ from user_management.tests.literals import (
)
from ..literals import DEFAULT_DELETE_PERIOD, DEFAULT_DELETE_TIME_UNIT
from ..models import DeletedDocument, Document, DocumentType
from ..permissions import permission_document_properties_edit
from ..models import DeletedDocument, Document, DocumentType, HASH_FUNCTION
from ..permissions import (
permission_document_download, permission_document_properties_edit
)
from .literals import TEST_SMALL_DOCUMENT_PATH, TEST_DOCUMENT_TYPE
from .literals import (
TEST_DOCUMENT_TYPE, TEST_SMALL_DOCUMENT_CHECKSUM, TEST_SMALL_DOCUMENT_PATH
)
TEST_DOCUMENT_TYPE_EDITED_LABEL = 'test document type edited label'
TEST_DOCUMENT_TYPE_2_LABEL = 'test document type 2 label'
@@ -191,7 +196,6 @@ class DocumentsViewsTestCase(TestCase):
)
def test_document_multiple_document_type_change_user_view(self):
self.client.logout()
logged_in = self.client.login(
username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD
)
@@ -242,6 +246,122 @@ class DocumentsViewsTestCase(TestCase):
Document.objects.first().document_type, document_type
)
def test_document_download_user_view(self):
logged_in = self.client.login(
username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD
)
self.assertTrue(logged_in)
self.assertTrue(self.user.is_authenticated())
self.assertEqual(Document.objects.count(), 1)
response = self.client.post(
reverse(
'documents:document_download', args=(self.document.pk,)
)
)
self.assertEqual(response.status_code, 302)
self.role.permissions.add(
permission_document_download.stored_permission
)
response = self.client.post(
reverse(
'documents:document_download', args=(self.document.pk,)
)
)
self.assertEqual(response.status_code, 200)
buf = BytesIO()
buf.write(response.content)
self.assertEqual(
HASH_FUNCTION(buf.getvalue()), TEST_SMALL_DOCUMENT_CHECKSUM
)
del(buf)
def test_document_multiple_download_user_view(self):
logged_in = self.client.login(
username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD
)
self.assertTrue(logged_in)
self.assertTrue(self.user.is_authenticated())
self.assertEqual(Document.objects.count(), 1)
response = self.client.post(
reverse('documents:document_multiple_download'),
data={'id_list': self.document.pk}
)
self.assertEqual(response.status_code, 302)
self.role.permissions.add(
permission_document_download.stored_permission
)
response = self.client.post(
reverse('documents:document_multiple_download'),
data={'id_list': self.document.pk}
)
self.assertEqual(response.status_code, 200)
buf = BytesIO()
buf.write(response.content)
self.assertEqual(
HASH_FUNCTION(buf.getvalue()), TEST_SMALL_DOCUMENT_CHECKSUM
)
del(buf)
def test_document_version_download_user_view(self):
logged_in = self.client.login(
username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD
)
self.assertTrue(logged_in)
self.assertTrue(self.user.is_authenticated())
self.assertEqual(Document.objects.count(), 1)
response = self.client.post(
reverse(
'documents:document_version_download', args=(
self.document.latest_version.pk,
)
)
)
self.assertEqual(response.status_code, 302)
self.role.permissions.add(
permission_document_download.stored_permission
)
response = self.client.post(
reverse(
'documents:document_version_download', args=(
self.document.latest_version.pk,
)
)
)
self.assertEqual(response.status_code, 200)
buf = BytesIO()
buf.write(response.content)
self.assertEqual(
HASH_FUNCTION(buf.getvalue()), TEST_SMALL_DOCUMENT_CHECKSUM
)
del(buf)
@override_settings(OCR_AUTO_OCR=False)
class DocumentTypeViewsTestCase(TestCase):

View File

@@ -731,16 +731,41 @@ def document_download(request, document_id=None, document_id_list=None, document
previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', reverse(settings.LOGIN_REDIRECT_URL))))
if document_id:
document_versions = [get_object_or_404(Document, pk=document_id).latest_version]
documents = Document.objects.filter(pk=document_id)
elif document_id_list:
document_versions = [get_object_or_404(Document, pk=document_id).latest_version for document_id in document_id_list.split(',')]
documents = Document.objects.filter(pk__in=document_id_list)
elif document_version_pk:
document_versions = [get_object_or_404(DocumentVersion, pk=document_version_pk)]
documents = Document.objects.filter(
pk=get_object_or_404(
DocumentVersion, pk=document_version_pk
).document.pk
)
try:
Permission.check_permissions(request.user, (permission_document_download,))
Permission.check_permissions(
request.user, (permission_document_download,)
)
except PermissionDenied:
document_versions = AccessControlList.objects.filter_by_access(permission_document_download, request.user, document_versions, related='document')
documents = AccessControlList.objects.filter_by_access(
permission_document_download, request.user, documents
)
if not documents:
messages.error(
request, _('Must provide at least one document or version.')
)
return HttpResponseRedirect(
request.META.get(
'HTTP_REFERER', reverse(settings.LOGIN_REDIRECT_URL)
)
)
if document_version_pk:
queryset = DocumentVersion.objects.filter(pk=document_version_pk)
else:
queryset = DocumentVersion.objects.filter(
pk__in=[document.latest_version.pk for document in documents]
)
subtemplates_list = []
subtemplates_list.append(
@@ -748,7 +773,7 @@ def document_download(request, document_id=None, document_id_list=None, document
'name': 'appearance/generic_list_subtemplate.html',
'context': {
'title': _('Documents to be downloaded'),
'object_list': document_versions,
'object_list': queryset,
'hide_link': True,
'hide_object': True,
'hide_links': True,
@@ -765,21 +790,26 @@ def document_download(request, document_id=None, document_id_list=None, document
)
if request.method == 'POST':
form = DocumentDownloadForm(request.POST, document_versions=document_versions)
form = DocumentDownloadForm(request.POST, queryset=queryset)
if form.is_valid():
if form.cleaned_data['compressed'] or len(document_versions) > 1:
if form.cleaned_data['compressed'] or queryset.count() > 1:
try:
compressed_file = CompressedFile()
for document_version in document_versions:
for document_version in queryset:
descriptor = document_version.open()
compressed_file.add_file(descriptor, arcname=document_version.document.label)
compressed_file.add_file(
descriptor,
arcname=document_version.document.label
)
descriptor.close()
compressed_file.close()
return serve_file(
request,
compressed_file.as_file(form.cleaned_data['zip_filename']),
compressed_file.as_file(
form.cleaned_data['zip_filename']
),
save_as='"%s"' % form.cleaned_data['zip_filename'],
content_type='application/zip'
)
@@ -788,38 +818,42 @@ def document_download(request, document_id=None, document_id_list=None, document
raise
else:
messages.error(request, exception)
return HttpResponseRedirect(request.META['HTTP_REFERER'])
return HttpResponseRedirect(
request.META['HTTP_REFERER']
)
else:
try:
# Test permissions and trigger exception
fd = document_versions[0].open()
fd = queryset.first().open()
fd.close()
return serve_file(
request,
document_versions[0].file,
save_as='"%s"' % document_versions[0].document.label,
content_type=document_versions[0].mimetype if document_versions[0].mimetype else 'application/octet-stream'
queryset.first().file,
save_as='"%s"' % queryset.first().document.label,
content_type=queryset.first().mimetype if queryset.first().mimetype else 'application/octet-stream'
)
except Exception as exception:
if settings.DEBUG:
raise
else:
messages.error(request, exception)
return HttpResponseRedirect(request.META['HTTP_REFERER'])
return HttpResponseRedirect(
request.META['HTTP_REFERER']
)
else:
form = DocumentDownloadForm(document_versions=document_versions)
form = DocumentDownloadForm(queryset=queryset)
context = {
'form': form,
'previous': previous,
'submit_label': _('Download'),
'subtemplates_list': subtemplates_list,
'title': _('Download documents'),
'submit_label': _('Download'),
'previous': previous,
}
if len(document_versions) == 1:
context['object'] = document_versions[0].document
if queryset.count() == 1:
context['object'] = queryset.first().document
return render_to_response(
'appearance/generic_form.html',
@@ -830,7 +864,9 @@ def document_download(request, document_id=None, document_id_list=None, document
def document_multiple_download(request):
return document_download(
request, document_id_list=request.GET.get('id_list', [])
request, document_id_list=request.GET.get(
'id_list', request.POST.get('id_list', '')
).split(',')
)