Add first set of document signatures API views

Add list, create, detail and edit API views for detached and embedded
signatures.

Signed-off-by: Roberto Rosario <roberto.rosario@mayan-edms.com>
This commit is contained in:
Roberto Rosario
2019-11-04 19:46:32 -04:00
parent d12fa430dc
commit 0167ff24b8
20 changed files with 1256 additions and 283 deletions

View File

@@ -110,6 +110,8 @@
own module.
- Update label and icon of the document sign form
Label updated from "Save" to "Sign".
- Add list, create, detail and edit API views for
detached and embedded signatures.
3.2.9 (2019-11-03)
==================

View File

@@ -26,7 +26,7 @@ TEST_FILE = os.path.join(
'test_files', 'test_file.txt'
)
TEST_KEY_DATA = '''-----BEGIN PGP PRIVATE KEY BLOCK-----
TEST_KEY_PRIVATE_DATA = '''-----BEGIN PGP PRIVATE KEY BLOCK-----
Version: GnuPG v1
lQO+BFbxfC8BCACnUZoD96W4+CSIaU9G8I08kXu2zJLzy2XgUtwLx8VQ8dOHr0E/
@@ -86,9 +86,9 @@ h4oCbUV5JHhOyB+89Y1w8haFU9LrgOER2kXff1xU6wMfLdcO5ApV/sRJcNdYL7Cg
=JZ5G
-----END PGP PRIVATE KEY BLOCK-----'''
TEST_KEY_ID = '4125E9C571F378AC'
TEST_KEY_FINGERPRINT = '6A24574E0A35004CDDFD22704125E9C571F378AC'
TEST_KEY_PASSPHRASE = 'testpassphrase'
TEST_KEY_PRIVATE_ID = '4125E9C571F378AC'
TEST_KEY_PRIVATE_FINGERPRINT = '6A24574E0A35004CDDFD22704125E9C571F378AC'
TEST_KEY_PRIVATE_PASSPHRASE = 'testpassphrase'
TEST_KEYSERVERS = ['pool.sks-keyservers.net']

View File

@@ -2,9 +2,48 @@ from __future__ import unicode_literals
from ..models import Key
from .literals import TEST_KEY_DATA
from .literals import TEST_KEY_PRIVATE_DATA
class KeyAPIViewTestMixin(object):
def _request_test_key_create_view(self):
return self.post(
viewname='rest_api:key-list', data={
'key_data': TEST_KEY_PRIVATE_DATA
}
)
def _request_test_key_delete_view(self):
return self.delete(
viewname='rest_api:key-detail', kwargs={
'pk': self.test_key_private.pk
}
)
def _request_test_key_detail_view(self):
return self.get(
viewname='rest_api:key-detail', kwargs={
'pk': self.test_key_private.pk
}
)
class KeyTestMixin(object):
def _create_test_key(self):
self.test_key = Key.objects.create(key_data=TEST_KEY_DATA)
def _create_test_key_private(self):
self.test_key_private = Key.objects.create(
key_data=TEST_KEY_PRIVATE_DATA
)
class KeyViewTestMixin(object):
def _request_test_key_download_view(self):
return self.get(
viewname='django_gpg:key_download', kwargs={'pk': self.test_key_private.pk}
)
def _request_test_key_upload_view(self):
return self.post(
viewname='django_gpg:key_upload', data={
'key_data': TEST_KEY_PRIVATE_DATA
}
)

View File

@@ -9,27 +9,8 @@ from ..permissions import (
permission_key_delete, permission_key_upload, permission_key_view
)
from .literals import TEST_KEY_DATA, TEST_KEY_FINGERPRINT
from .mixins import KeyTestMixin
class KeyAPIViewTestMixin(object):
def _request_test_key_create_view(self):
return self.post(
viewname='rest_api:key-list', data={
'key_data': TEST_KEY_DATA
}
)
def _request_test_key_delete_view(self):
return self.delete(
viewname='rest_api:key-detail', kwargs={'pk': self.test_key.pk}
)
def _request_test_key_detail_view(self):
return self.get(
viewname='rest_api:key-detail', kwargs={'pk': self.test_key.pk}
)
from .literals import TEST_KEY_PRIVATE_FINGERPRINT
from .mixins import KeyAPIViewTestMixin, KeyTestMixin
class KeyAPITestCase(KeyTestMixin, KeyAPIViewTestMixin, BaseAPITestCase):
@@ -44,14 +25,16 @@ class KeyAPITestCase(KeyTestMixin, KeyAPIViewTestMixin, BaseAPITestCase):
response = self._request_test_key_create_view()
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
self.assertEqual(response.data['fingerprint'], TEST_KEY_FINGERPRINT)
self.assertEqual(
response.data['fingerprint'], TEST_KEY_PRIVATE_FINGERPRINT
)
key = Key.objects.first()
self.assertEqual(Key.objects.count(), 1)
self.assertEqual(key.fingerprint, TEST_KEY_FINGERPRINT)
self.assertEqual(key.fingerprint, TEST_KEY_PRIVATE_FINGERPRINT)
def test_key_delete_view_no_access(self):
self._create_test_key()
self._create_test_key_private()
response = self._request_test_key_delete_view()
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
@@ -59,9 +42,9 @@ class KeyAPITestCase(KeyTestMixin, KeyAPIViewTestMixin, BaseAPITestCase):
self.assertEqual(Key.objects.count(), 1)
def test_key_delete_view_with_access(self):
self._create_test_key()
self._create_test_key_private()
self.grant_access(
obj=self.test_key, permission=permission_key_delete
obj=self.test_key_private, permission=permission_key_delete
)
response = self._request_test_key_delete_view()
@@ -70,19 +53,19 @@ class KeyAPITestCase(KeyTestMixin, KeyAPIViewTestMixin, BaseAPITestCase):
self.assertEqual(Key.objects.count(), 0)
def test_key_detail_view_no_access(self):
self._create_test_key()
self._create_test_key_private()
response = self._request_test_key_detail_view()
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
def test_key_detail_view_with_access(self):
self._create_test_key()
self._create_test_key_private()
self.grant_access(
obj=self.test_key, permission=permission_key_view
obj=self.test_key_private, permission=permission_key_view
)
response = self._request_test_key_detail_view()
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(
response.data['fingerprint'], self.test_key.fingerprint
response.data['fingerprint'], self.test_key_private.fingerprint
)

View File

@@ -18,9 +18,9 @@ from ..models import Key
from .literals import (
MOCK_SEARCH_KEYS_RESPONSE, TEST_DETACHED_SIGNATURE, TEST_FILE,
TEST_KEY_DATA, TEST_KEY_FINGERPRINT, TEST_KEY_PASSPHRASE,
TEST_SEARCH_FINGERPRINT, TEST_SEARCH_UID, TEST_SIGNED_FILE,
TEST_SIGNED_FILE_CONTENT
TEST_KEY_PRIVATE_DATA, TEST_KEY_PRIVATE_FINGERPRINT,
TEST_KEY_PRIVATE_PASSPHRASE, TEST_SEARCH_FINGERPRINT, TEST_SEARCH_UID,
TEST_SIGNED_FILE, TEST_SIGNED_FILE_CONTENT
)
from .mocks import mock_recv_keys
@@ -28,9 +28,9 @@ from .mocks import mock_recv_keys
class KeyTestCase(BaseTestCase):
def test_key_instance_creation(self):
# Creating a Key instance is analogous to importing a key
key = Key.objects.create(key_data=TEST_KEY_DATA)
key = Key.objects.create(key_data=TEST_KEY_PRIVATE_DATA)
self.assertEqual(key.fingerprint, TEST_KEY_FINGERPRINT)
self.assertEqual(key.fingerprint, TEST_KEY_PRIVATE_FINGERPRINT)
@mock.patch.object(gnupg.GPG, 'search_keys', autospec=True)
def test_key_search(self, search_keys):
@@ -69,36 +69,36 @@ class KeyTestCase(BaseTestCase):
with open(TEST_SIGNED_FILE, mode='rb') as signed_file:
result = Key.objects.verify_file(signed_file)
self.assertTrue(result.key_id in TEST_KEY_FINGERPRINT)
self.assertTrue(result.key_id in TEST_KEY_PRIVATE_FINGERPRINT)
def test_embedded_verification_with_key(self):
Key.objects.create(key_data=TEST_KEY_DATA)
Key.objects.create(key_data=TEST_KEY_PRIVATE_DATA)
with open(TEST_SIGNED_FILE, mode='rb') as signed_file:
result = Key.objects.verify_file(signed_file)
self.assertEqual(result.fingerprint, TEST_KEY_FINGERPRINT)
self.assertEqual(result.fingerprint, TEST_KEY_PRIVATE_FINGERPRINT)
def test_embedded_verification_with_correct_fingerprint(self):
Key.objects.create(key_data=TEST_KEY_DATA)
Key.objects.create(key_data=TEST_KEY_PRIVATE_DATA)
with open(TEST_SIGNED_FILE, mode='rb') as signed_file:
result = Key.objects.verify_file(
signed_file, key_fingerprint=TEST_KEY_FINGERPRINT
signed_file, key_fingerprint=TEST_KEY_PRIVATE_FINGERPRINT
)
self.assertTrue(result.valid)
self.assertEqual(result.fingerprint, TEST_KEY_FINGERPRINT)
self.assertEqual(result.fingerprint, TEST_KEY_PRIVATE_FINGERPRINT)
def test_embedded_verification_with_incorrect_fingerprint(self):
Key.objects.create(key_data=TEST_KEY_DATA)
Key.objects.create(key_data=TEST_KEY_PRIVATE_DATA)
with open(TEST_SIGNED_FILE, mode='rb') as signed_file:
with self.assertRaises(KeyDoesNotExist):
Key.objects.verify_file(signed_file, key_fingerprint='999')
def test_signed_file_decryption(self):
Key.objects.create(key_data=TEST_KEY_DATA)
Key.objects.create(key_data=TEST_KEY_PRIVATE_DATA)
with open(TEST_SIGNED_FILE, mode='rb') as signed_file:
result = Key.objects.decrypt_file(file_object=signed_file)
@@ -122,10 +122,10 @@ class KeyTestCase(BaseTestCase):
file_object=test_file, signature_file=signature_file
)
self.assertTrue(result.key_id in TEST_KEY_FINGERPRINT)
self.assertTrue(result.key_id in TEST_KEY_PRIVATE_FINGERPRINT)
def test_detached_verification_with_key(self):
Key.objects.create(key_data=TEST_KEY_DATA)
Key.objects.create(key_data=TEST_KEY_PRIVATE_DATA)
with open(TEST_DETACHED_SIGNATURE, mode='rb') as signature_file:
with open(TEST_FILE, mode='rb') as test_file:
@@ -134,10 +134,10 @@ class KeyTestCase(BaseTestCase):
)
self.assertTrue(result)
self.assertEqual(result.fingerprint, TEST_KEY_FINGERPRINT)
self.assertEqual(result.fingerprint, TEST_KEY_PRIVATE_FINGERPRINT)
def test_detached_signing_no_passphrase(self):
key = Key.objects.create(key_data=TEST_KEY_DATA)
key = Key.objects.create(key_data=TEST_KEY_PRIVATE_DATA)
with self.assertRaises(NeedPassphrase):
with open(TEST_FILE, mode='rb') as test_file:
@@ -146,7 +146,7 @@ class KeyTestCase(BaseTestCase):
)
def test_detached_signing_bad_passphrase(self):
key = Key.objects.create(key_data=TEST_KEY_DATA)
key = Key.objects.create(key_data=TEST_KEY_PRIVATE_DATA)
with self.assertRaises(PassphraseError):
with open(TEST_FILE, mode='rb') as test_file:
@@ -156,12 +156,12 @@ class KeyTestCase(BaseTestCase):
)
def test_detached_signing_with_passphrase(self):
key = Key.objects.create(key_data=TEST_KEY_DATA)
key = Key.objects.create(key_data=TEST_KEY_PRIVATE_DATA)
with open(TEST_FILE, mode='rb') as test_file:
detached_signature = key.sign_file(
file_object=test_file, detached=True,
passphrase=TEST_KEY_PASSPHRASE
passphrase=TEST_KEY_PRIVATE_PASSPHRASE
)
signature_file = io.BytesIO()
@@ -175,4 +175,4 @@ class KeyTestCase(BaseTestCase):
signature_file.close()
self.assertTrue(result)
self.assertEqual(result.fingerprint, TEST_KEY_FINGERPRINT)
self.assertEqual(result.fingerprint, TEST_KEY_PRIVATE_FINGERPRINT)

View File

@@ -7,25 +7,13 @@ from mayan.apps.common.tests.base import GenericViewTestCase
from ..models import Key
from ..permissions import permission_key_download, permission_key_upload
from .literals import TEST_KEY_DATA, TEST_KEY_FINGERPRINT
from .mixins import KeyTestMixin
class KeyViewTestMixin(object):
def _request_test_key_download_view(self):
return self.get(
viewname='django_gpg:key_download', kwargs={'pk': self.test_key.pk}
)
def _request_test_key_upload_view(self):
return self.post(
viewname='django_gpg:key_upload', data={'key_data': TEST_KEY_DATA}
)
from .literals import TEST_KEY_PRIVATE_FINGERPRINT
from .mixins import KeyTestMixin, KeyViewTestMixin
class KeyViewTestCase(KeyTestMixin, KeyViewTestMixin, GenericViewTestCase):
def test_key_download_view_no_permission(self):
self._create_test_key()
self._create_test_key_private()
response = self._request_test_key_download_view()
self.assertEqual(response.status_code, 403)
@@ -33,16 +21,16 @@ class KeyViewTestCase(KeyTestMixin, KeyViewTestMixin, GenericViewTestCase):
def test_key_download_view_with_permission(self):
self.expected_content_type = 'application/octet-stream; charset=utf-8'
self._create_test_key()
self._create_test_key_private()
self.grant_access(
obj=self.test_key, permission=permission_key_download
obj=self.test_key_private, permission=permission_key_download
)
response = self._request_test_key_download_view()
assert_download_response(
self, response=response, content=self.test_key.key_data,
basename=self.test_key.key_id,
self, response=response, content=self.test_key_private.key_data,
basename=self.test_key_private.key_id,
)
def test_key_upload_view_no_permission(self):
@@ -59,5 +47,5 @@ class KeyViewTestCase(KeyTestMixin, KeyViewTestMixin, GenericViewTestCase):
self.assertEqual(Key.objects.count(), 1)
self.assertEqual(
Key.objects.first().fingerprint, TEST_KEY_FINGERPRINT
Key.objects.first().fingerprint, TEST_KEY_PRIVATE_FINGERPRINT
)

View File

@@ -0,0 +1,252 @@
from __future__ import absolute_import, unicode_literals
from django.shortcuts import get_object_or_404
from mayan.apps.acls.models import AccessControlList
from mayan.apps.documents.models import Document
from mayan.apps.rest_api import generics
from .models import DetachedSignature, EmbeddedSignature
from .permissions import (
permission_document_version_sign_detached,
permission_document_version_sign_embedded,
permission_document_version_signature_delete,
permission_document_version_signature_view
)
from .serializers import (
DetachedSignatureSerializer, EmbeddedSignatureSerializer
)
class APIDocumentDetachedSignatureListView(generics.ListCreateAPIView):
"""
get: Returns a list of all the detached signatures of a document version.
post: Create an detached signature for a document version.
"""
serializer_class = DetachedSignatureSerializer
def get_document(self):
return get_object_or_404(
klass=self.get_document_queryset(), pk=self.kwargs['document_id']
)
def get_document_queryset(self):
if self.request.method == 'GET':
permission = permission_document_version_signature_view
elif self.request.method == 'POST':
permission = permission_document_version_sign_detached
return AccessControlList.objects.restrict_queryset(
permission=permission, queryset=Document.objects.all(),
user=self.request.user
)
def get_document_version(self):
return get_object_or_404(
klass=self.get_document_version_queryset(),
pk=self.kwargs['document_version_id']
)
def get_document_version_queryset(self):
return self.get_document().versions.all()
def get_queryset(self):
return DetachedSignature.objects.filter(
document_version=self.get_document_version()
)
def get_serializer(self, *args, **kwargs):
if not self.request:
return None
return super(
APIDocumentDetachedSignatureListView, self
).get_serializer(*args, **kwargs)
def get_serializer_context(self):
"""
Extra context provided to the serializer class.
"""
context = super(
APIDocumentDetachedSignatureListView, self
).get_serializer_context()
if self.kwargs:
context.update(
{
'document_version': self.get_document_version(),
}
)
return context
class APIDocumentDetachedSignatureView(generics.RetrieveDestroyAPIView):
"""
delete: Delete an detached signature of the selected document.
get: Returns the details of the selected detached signature.
"""
lookup_url_kwarg = 'detached_signature_id'
serializer_class = DetachedSignatureSerializer
def get_document(self):
return get_object_or_404(
klass=self.get_document_queryset(), pk=self.kwargs['document_id']
)
def get_document_queryset(self):
if self.request.method == 'GET':
permission = permission_document_version_signature_view
elif self.request.method == 'POST':
permission = permission_document_version_signature_view
elif self.request.method == 'DELETE':
permission = permission_document_version_signature_delete
return AccessControlList.objects.restrict_queryset(
permission=permission, queryset=Document.objects.all(),
user=self.request.user
)
def get_document_version(self):
return get_object_or_404(
klass=self.get_document_version_queryset(),
pk=self.kwargs['document_version_id']
)
def get_document_version_queryset(self):
return self.get_document().versions.all()
def get_queryset(self):
return DetachedSignature.objects.filter(
document_version=self.get_document_version()
)
def get_serializer_context(self):
"""
Extra context provided to the serializer class.
"""
context = super(APIDocumentDetachedSignatureView, self).get_serializer_context()
if self.kwargs:
context.update(
{
'document': self.get_document(),
}
)
return context
class APIDocumentEmbeddedSignatureListView(generics.ListCreateAPIView):
"""
get: Returns a list of all the embedded signatures of a document version.
post: Create an embedded signature for a document version.
"""
serializer_class = EmbeddedSignatureSerializer
def get_document(self):
return get_object_or_404(
klass=self.get_document_queryset(), pk=self.kwargs['document_id']
)
def get_document_queryset(self):
if self.request.method == 'GET':
permission = permission_document_version_signature_view
elif self.request.method == 'POST':
permission = permission_document_version_sign_embedded
return AccessControlList.objects.restrict_queryset(
permission=permission, queryset=Document.objects.all(),
user=self.request.user
)
def get_document_version(self):
return get_object_or_404(
klass=self.get_document_version_queryset(),
pk=self.kwargs['document_version_id']
)
def get_document_version_queryset(self):
return self.get_document().versions.all()
def get_queryset(self):
return EmbeddedSignature.objects.filter(
document_version=self.get_document_version()
)
def get_serializer(self, *args, **kwargs):
if not self.request:
return None
return super(
APIDocumentEmbeddedSignatureListView, self
).get_serializer(*args, **kwargs)
def get_serializer_context(self):
"""
Extra context provided to the serializer class.
"""
context = super(
APIDocumentEmbeddedSignatureListView, self
).get_serializer_context()
if self.kwargs:
context.update(
{
'document_version': self.get_document_version(),
}
)
return context
class APIDocumentEmbeddedSignatureView(generics.RetrieveDestroyAPIView):
"""
delete: Delete an embedded signature of the selected document.
get: Returns the details of the selected embedded signature.
"""
lookup_url_kwarg = 'embedded_signature_id'
serializer_class = EmbeddedSignatureSerializer
def get_document(self):
return get_object_or_404(
klass=self.get_document_queryset(), pk=self.kwargs['document_id']
)
def get_document_queryset(self):
if self.request.method == 'GET':
permission = permission_document_version_signature_view
elif self.request.method == 'POST':
permission = permission_document_version_signature_view
elif self.request.method == 'DELETE':
permission = permission_document_version_signature_delete
return AccessControlList.objects.restrict_queryset(
permission=permission, queryset=Document.objects.all(),
user=self.request.user
)
def get_document_version(self):
return get_object_or_404(
klass=self.get_document_version_queryset(),
pk=self.kwargs['document_version_id']
)
def get_document_version_queryset(self):
return self.get_document().versions.all()
def get_queryset(self):
return EmbeddedSignature.objects.filter(
document_version=self.get_document_version()
)
def get_serializer_context(self):
"""
Extra context provided to the serializer class.
"""
context = super(APIDocumentEmbeddedSignatureView, self).get_serializer_context()
if self.kwargs:
context.update(
{
'document': self.get_document(),
}
)
return context

View File

@@ -42,6 +42,7 @@ logger = logging.getLogger(__name__)
class DocumentSignaturesApp(MayanAppConfig):
app_namespace = 'signatures'
app_url = 'signatures'
has_rest_api = True
has_tests = True
name = 'mayan.apps.document_signatures'
verbose_name = _('Document signatures')

View File

@@ -3,6 +3,7 @@ from __future__ import unicode_literals
import logging
import os
from django.core.files import File
from django.db import models
from mayan.apps.django_gpg.exceptions import DecryptionError
@@ -13,6 +14,29 @@ from mayan.apps.storage.utils import mkstemp
logger = logging.getLogger(__name__)
class DetachedSignatureManager(models.Manager):
def sign_document_version(
self, document_version, key, passphrase=None, user=None
):
temporary_file_object, temporary_filename = mkstemp()
try:
with document_version.open() as file_object:
key.sign_file(
binary=True, detached=True, file_object=file_object,
output=temporary_filename, passphrase=passphrase
)
except Exception:
raise
else:
return self.create(
document_version=document_version,
signature_file=File(temporary_file_object)
)
finally:
os.unlink(temporary_filename)
class EmbeddedSignatureManager(models.Manager):
def open_signed(self, file_object, document_version):
for signature in self.filter(document_version=document_version):
@@ -28,7 +52,9 @@ class EmbeddedSignatureManager(models.Manager):
else:
return file_object
def sign_document_version(self, document_version, key, passphrase=None, user=None):
def sign_document_version(
self, document_version, key, passphrase=None, user=None
):
temporary_file_object, temporary_filename = mkstemp()
try:
@@ -44,11 +70,13 @@ class EmbeddedSignatureManager(models.Manager):
new_version = document_version.document.new_version(
file_object=file_object, _user=user
)
# This is a potential race condition but we have not way
# to access the final signature at this point.
signature = self.filter(document_version=new_version).first()
return signature or self.none()
finally:
os.unlink(temporary_filename)
return new_version
def unsigned_document_versions(self):
return DocumentVersion.objects.exclude(
pk__in=self.values('document_version')

View File

@@ -14,7 +14,7 @@ from mayan.apps.django_gpg.exceptions import VerificationError
from mayan.apps.django_gpg.models import Key
from mayan.apps.documents.models import DocumentVersion
from .managers import EmbeddedSignatureManager
from .managers import DetachedSignatureManager, EmbeddedSignatureManager
from .storages import storage_detachedsignature
logger = logging.getLogger(__name__)
@@ -57,6 +57,7 @@ class SignatureBaseModel(models.Model):
objects = InheritanceManager()
class Meta:
ordering = ('pk',)
verbose_name = _('Document version signature')
verbose_name_plural = _('Document version signatures')
@@ -131,8 +132,7 @@ class DetachedSignature(SignatureBaseModel):
upload_to=upload_to, verbose_name=_('Signature file')
)
# Don't inherit the SignatureBaseModel manager
objects = models.Manager()
objects = DetachedSignatureManager()
class Meta:
verbose_name = _('Document version detached signature')

View File

@@ -0,0 +1,147 @@
from __future__ import absolute_import, unicode_literals
from rest_framework import serializers
from rest_framework.exceptions import ValidationError
from mayan.apps.acls.models import AccessControlList
from mayan.apps.django_gpg.models import Key
from mayan.apps.django_gpg.permissions import permission_key_sign
from mayan.apps.rest_api.relations import MultiKwargHyperlinkedIdentityField
from .models import DetachedSignature, EmbeddedSignature
class DetachedSignatureSerializer(serializers.HyperlinkedModelSerializer):
document_version_url = MultiKwargHyperlinkedIdentityField(
view_kwargs=(
{
'lookup_field': 'document_version_id',
'lookup_url_kwarg': 'version_pk',
},
{
'lookup_field': 'document_version.document.pk',
'lookup_url_kwarg': 'pk',
}
),
view_name='rest_api:documentversion-detail'
)
url = MultiKwargHyperlinkedIdentityField(
view_kwargs=(
{
'lookup_field': 'document_version.document.pk',
'lookup_url_kwarg': 'document_id',
},
{
'lookup_field': 'document_version_id',
'lookup_url_kwarg': 'document_version_id',
},
{
'lookup_field': 'pk',
'lookup_url_kwarg': 'detached_signature_id',
},
),
view_name='rest_api:detachedsignature-detail'
)
passphrase = serializers.CharField(required=False, write_only=True)
class Meta:
fields = (
'date', 'document_version_url', 'key_id', 'signature_id',
'passphrase', 'public_key_fingerprint', 'url'
)
model = DetachedSignature
def create(self, validated_data):
key_id = validated_data.pop('key_id')
passphrase = validated_data.pop('passphrase', None)
key_queryset = AccessControlList.objects.restrict_queryset(
permission=permission_key_sign, queryset=Key.objects.all(),
user=self.context['request'].user
)
try:
key = key_queryset.get(fingerprint__endswith=key_id)
except Key.DoesNotExist:
raise ValidationError(
{
'key_id': [
'Key "{}" not found.'.format(key_id)
]
}, code='invalid'
)
return DetachedSignature.objects.sign_document_version(
document_version=self.context['document_version'], key=key,
passphrase=passphrase, user=self.context['request'].user
)
class EmbeddedSignatureSerializer(serializers.HyperlinkedModelSerializer):
document_version_url = MultiKwargHyperlinkedIdentityField(
view_kwargs=(
{
'lookup_field': 'document_version_id',
'lookup_url_kwarg': 'version_pk',
},
{
'lookup_field': 'document_version.document.pk',
'lookup_url_kwarg': 'pk',
}
),
view_name='rest_api:documentversion-detail'
)
url = MultiKwargHyperlinkedIdentityField(
view_kwargs=(
{
'lookup_field': 'document_version.document.pk',
'lookup_url_kwarg': 'document_id',
},
{
'lookup_field': 'document_version_id',
'lookup_url_kwarg': 'document_version_id',
},
{
'lookup_field': 'pk',
'lookup_url_kwarg': 'embedded_signature_id',
},
),
view_name='rest_api:embeddedsignature-detail'
)
passphrase = serializers.CharField(required=False, write_only=True)
class Meta:
fields = (
'date', 'document_version_url', 'key_id', 'signature_id',
'passphrase', 'public_key_fingerprint', 'url'
)
model = EmbeddedSignature
def create(self, validated_data):
key_id = validated_data.pop('key_id')
passphrase = validated_data.pop('passphrase', None)
key_queryset = AccessControlList.objects.restrict_queryset(
permission=permission_key_sign, queryset=Key.objects.all(),
user=self.context['request'].user
)
try:
key = key_queryset.get(fingerprint__endswith=key_id)
except Key.DoesNotExist:
raise ValidationError(
{
'key_id': [
'Key "{}" not found.'.format(key_id)
]
}, code='invalid'
)
signature = EmbeddedSignature.objects.sign_document_version(
document_version=self.context['document_version'], key=key,
passphrase=passphrase, user=self.context['request'].user
)
return signature

View File

@@ -12,9 +12,9 @@ TEST_SIGNATURE_FILE_PATH = os.path.join(
settings.BASE_DIR, 'apps', 'document_signatures', 'tests', 'contrib',
'sample_documents', 'mayan_11_1.pdf.sig'
)
TEST_KEY_FILE = os.path.join(
TEST_KEY_FILE_PATH = os.path.join(
settings.BASE_DIR, 'apps', 'document_signatures', 'tests', 'contrib',
'sample_documents', 'key0x5F3F7F75D210724D.asc'
)
TEST_KEY_ID = '5F3F7F75D210724D'
TEST_KEY_PUBLIC_ID = '5F3F7F75D210724D'
TEST_SIGNATURE_ID = 'XVkoGKw35yU1iq11dZPiv7uAY7k'

View File

@@ -3,13 +3,116 @@ from __future__ import absolute_import, unicode_literals
from django.core.files import File
from mayan.apps.django_gpg.models import Key
from mayan.apps.django_gpg.tests.literals import TEST_KEY_PRIVATE_PASSPHRASE
from ..models import DetachedSignature
from .literals import TEST_KEY_FILE, TEST_SIGNATURE_FILE_PATH
from .literals import TEST_KEY_FILE_PATH, TEST_SIGNATURE_FILE_PATH
class SignaturesTestMixin(object):
class DetachedSignatureAPIViewTestMixin(object):
def _request_test_document_signature_detached_create_view(self):
return self.post(
viewname='rest_api:document-version-signature-detached-list',
kwargs={
'document_id': self.test_document.pk,
'document_version_id': self.test_document_version.pk
}, data={
'key_id': self.test_key_private.key_id,
'passphrase': TEST_KEY_PRIVATE_PASSPHRASE
}
)
def _request_test_document_signature_detached_delete_view(self):
return self.delete(
viewname='rest_api:detachedsignature-detail',
kwargs={
'document_id': self.test_document.pk,
'document_version_id': self.test_document_version.pk,
'detached_signature_id': self.test_document_version.signatures.first().pk
}
)
def _request_test_document_signature_detached_detail_view(self):
return self.get(
viewname='rest_api:detachedsignature-detail',
kwargs={
'document_id': self.test_document.pk,
'document_version_id': self.test_document_version.pk,
'detached_signature_id': self.test_document_version.signatures.first().pk
}
)
def _request_test_document_signature_detached_list_view(self):
return self.get(
viewname='rest_api:document-version-signature-detached-list',
kwargs={
'document_id': self.test_document.pk,
'document_version_id': self.test_document_version.pk
}
)
class DetachedSignatureViewTestMixin(object):
def _request_test_document_version_signature_download_view(self):
return self.get(
viewname='signatures:document_version_signature_download',
kwargs={'pk': self.test_signature.pk}
)
def _request_test_document_version_signature_upload_view(self):
with open(TEST_SIGNATURE_FILE_PATH, mode='rb') as file_object:
return self.post(
viewname='signatures:document_version_signature_upload',
kwargs={'pk': self.test_document.latest_version.pk},
data={'signature_file': file_object}
)
class EmbeddedSignatureAPIViewTestMixin(object):
def _request_test_document_signature_embedded_create_view(self):
return self.post(
viewname='rest_api:document-version-signature-embedded-list',
kwargs={
'document_id': self.test_document.pk,
'document_version_id': self.test_document_version.pk
}, data={
'key_id': self.test_key_private.key_id,
'passphrase': TEST_KEY_PRIVATE_PASSPHRASE
}
)
def _request_test_document_signature_embedded_delete_view(self):
return self.delete(
viewname='rest_api:embeddedsignature-detail',
kwargs={
'document_id': self.test_document.pk,
'document_version_id': self.test_document_version.pk,
'embedded_signature_id': self.test_document_version.signatures.first().pk
}
)
def _request_test_document_signature_embedded_detail_view(self):
return self.get(
viewname='rest_api:embeddedsignature-detail',
kwargs={
'document_id': self.test_document.pk,
'document_version_id': self.test_document_version.pk,
'embedded_signature_id': self.test_document_version.signatures.first().pk
}
)
def _request_test_document_signature_embedded_list_view(self):
return self.get(
viewname='rest_api:document-version-signature-embedded-list',
kwargs={
'document_id': self.test_document.pk,
'document_version_id': self.test_document_version.pk
}
)
class SignatureTestMixin(object):
def _create_test_detached_signature(self):
with open(TEST_SIGNATURE_FILE_PATH, mode='rb') as file_object:
self.test_signature = DetachedSignature.objects.create(
@@ -17,6 +120,33 @@ class SignaturesTestMixin(object):
signature_file=File(file_object)
)
def _create_test_key(self):
with open(TEST_KEY_FILE, mode='rb') as file_object:
self.test_key = Key.objects.create(key_data=file_object.read())
def _create_test_public_key(self):
with open(TEST_KEY_FILE_PATH, mode='rb') as file_object:
self.test_key_public = Key.objects.create(
key_data=file_object.read()
)
class SignatureViewTestMixin(object):
def _request_test_document_version_signature_delete_view(self):
return self.post(
viewname='signatures:document_version_signature_delete',
kwargs={'pk': self.test_signature.pk}
)
def _request_test_document_version_signature_details_view(self):
return self.get(
viewname='signatures:document_version_signature_details',
kwargs={'pk': self.test_signature.pk}
)
def _request_test_document_version_signature_list_view(self, document):
return self.get(
viewname='signatures:document_version_signature_list',
kwargs={'pk': self.test_document.latest_version.pk}
)
def _request_all_test_document_version_signature_verify_view(self):
return self.post(
viewname='signatures:all_document_version_signature_verify'
)

View File

@@ -0,0 +1,341 @@
from __future__ import unicode_literals
from rest_framework import status
from mayan.apps.django_gpg.permissions import permission_key_sign
from mayan.apps.django_gpg.tests.mixins import KeyTestMixin
from mayan.apps.documents.tests.mixins import DocumentTestMixin
from mayan.apps.rest_api.tests.base import BaseAPITestCase
from ..permissions import (
permission_document_version_sign_detached,
permission_document_version_sign_embedded,
permission_document_version_signature_delete,
permission_document_version_signature_view
)
from .literals import TEST_KEY_PUBLIC_ID, TEST_SIGNED_DOCUMENT_PATH
from .mixins import (
DetachedSignatureAPIViewTestMixin, EmbeddedSignatureAPIViewTestMixin,
SignatureTestMixin
)
class DetachedSignatureDocumentAPIViewTestCase(
DocumentTestMixin, DetachedSignatureAPIViewTestMixin,
KeyTestMixin, SignatureTestMixin, BaseAPITestCase
):
auto_upload_document = False
def test_document_signature_detached_delete_no_permission(self):
self.upload_document()
self._create_test_detached_signature()
signatures = self.test_document.latest_version.signatures.count()
response = self._request_test_document_signature_detached_delete_view()
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
self.assertEqual(
self.test_document.latest_version.signatures.count(),
signatures
)
def test_document_signature_detached_delete_with_access(self):
self.upload_document()
self._create_test_detached_signature()
signatures = self.test_document.latest_version.signatures.count()
self.grant_access(
obj=self.test_document,
permission=permission_document_version_signature_delete
)
response = self._request_test_document_signature_detached_delete_view()
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
self.assertEqual(
self.test_document.latest_version.signatures.count(),
signatures - 1
)
def test_document_signature_detached_detail_no_permission(self):
self.upload_document()
self._create_test_detached_signature()
response = self._request_test_document_signature_detached_detail_view()
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
def test_document_signature_detached_detail_with_access(self):
self.upload_document()
self._create_test_detached_signature()
self.grant_access(
obj=self.test_document,
permission=permission_document_version_signature_view
)
response = self._request_test_document_signature_detached_detail_view()
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(
response.data['key_id'], TEST_KEY_PUBLIC_ID
)
def test_document_signature_detached_create_view_no_permission(self):
self.upload_document()
self._create_test_key_private()
signatures = self.test_document.latest_version.signatures.count()
response = self._request_test_document_signature_detached_create_view()
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
self.assertEqual(
self.test_document.latest_version.signatures.count(),
signatures
)
def test_document_signature_detached_create_view_with_document_access(self):
self.upload_document()
self._create_test_key_private()
signatures = self.test_document.latest_version.signatures.count()
self.grant_access(
obj=self.test_document,
permission=permission_document_version_sign_detached
)
response = self._request_test_document_signature_detached_create_view()
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertEqual(
self.test_document.latest_version.signatures.count(),
signatures
)
def test_document_signature_detached_create_view_with_key_access(self):
self.upload_document()
self._create_test_key_private()
signatures = self.test_document.latest_version.signatures.count()
self.grant_access(
obj=self.test_key_private,
permission=permission_key_sign
)
response = self._request_test_document_signature_detached_create_view()
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
self.assertEqual(
self.test_document.latest_version.signatures.count(),
signatures
)
def test_document_signature_detached_create_view_with_full_access(self):
self.upload_document()
self._create_test_key_private()
signatures = self.test_document.latest_version.signatures.count()
self.grant_access(
obj=self.test_document,
permission=permission_document_version_sign_detached
)
self.grant_access(
obj=self.test_key_private,
permission=permission_key_sign
)
response = self._request_test_document_signature_detached_create_view()
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
self.assertEqual(
self.test_document.latest_version.signatures.count(),
signatures + 1
)
def test_document_signature_detached_list_view_no_permission(self):
self.upload_document()
self._create_test_detached_signature()
response = self._request_test_document_signature_detached_list_view()
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
def test_document_signature_detached_list_view_with_access(self):
self.upload_document()
self._create_test_detached_signature()
self.grant_access(
obj=self.test_document,
permission=permission_document_version_signature_view
)
response = self._request_test_document_signature_detached_list_view()
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(
response.data['results'][0]['key_id'], TEST_KEY_PUBLIC_ID
)
class EmbeddedSignatureDocumentAPIViewTestCase(
DocumentTestMixin, EmbeddedSignatureAPIViewTestMixin,
KeyTestMixin, BaseAPITestCase
):
auto_upload_document = False
def test_document_signature_embedded_delete_no_permission(self):
self.test_document_path = TEST_SIGNED_DOCUMENT_PATH
self.upload_document()
signatures = self.test_document.latest_version.signatures.count()
response = self._request_test_document_signature_embedded_delete_view()
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
self.assertEqual(
self.test_document.latest_version.signatures.count(),
signatures
)
def test_document_signature_embedded_delete_with_access(self):
self.test_document_path = TEST_SIGNED_DOCUMENT_PATH
self.upload_document()
signatures = self.test_document.latest_version.signatures.count()
self.grant_access(
obj=self.test_document,
permission=permission_document_version_signature_delete
)
response = self._request_test_document_signature_embedded_delete_view()
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
self.assertEqual(
self.test_document.latest_version.signatures.count(),
signatures - 1
)
def test_document_signature_embedded_detail_no_permission(self):
self.test_document_path = TEST_SIGNED_DOCUMENT_PATH
self.upload_document()
response = self._request_test_document_signature_embedded_detail_view()
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
def test_document_signature_embedded_detail_with_access(self):
self.test_document_path = TEST_SIGNED_DOCUMENT_PATH
self.upload_document()
self.grant_access(
obj=self.test_document,
permission=permission_document_version_signature_view
)
response = self._request_test_document_signature_embedded_detail_view()
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(
response.data['key_id'], TEST_KEY_PUBLIC_ID
)
def test_document_signature_embedded_create_view_no_permission(self):
self.upload_document()
self._create_test_key_private()
signatures = self.test_document.latest_version.signatures.count()
response = self._request_test_document_signature_embedded_create_view()
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
self.assertEqual(
self.test_document.latest_version.signatures.count(),
signatures
)
def test_document_signature_embedded_create_view_with_document_access(self):
self.upload_document()
self._create_test_key_private()
signatures = self.test_document.latest_version.signatures.count()
self.grant_access(
obj=self.test_document,
permission=permission_document_version_sign_embedded
)
response = self._request_test_document_signature_embedded_create_view()
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertEqual(
self.test_document.latest_version.signatures.count(),
signatures
)
def test_document_signature_embedded_create_view_with_key_access(self):
self.upload_document()
self._create_test_key_private()
signatures = self.test_document.latest_version.signatures.count()
self.grant_access(
obj=self.test_key_private,
permission=permission_key_sign
)
response = self._request_test_document_signature_embedded_create_view()
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
self.assertEqual(
self.test_document.latest_version.signatures.count(),
signatures
)
def test_document_signature_embedded_create_view_with_full_access(self):
self.upload_document()
self._create_test_key_private()
signatures = self.test_document.latest_version.signatures.count()
self.grant_access(
obj=self.test_document,
permission=permission_document_version_sign_embedded
)
self.grant_access(
obj=self.test_key_private,
permission=permission_key_sign
)
response = self._request_test_document_signature_embedded_create_view()
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
self.assertEqual(
self.test_document.latest_version.signatures.count(),
signatures + 1
)
def test_document_signature_embedded_list_view_no_permission(self):
self.test_document_path = TEST_SIGNED_DOCUMENT_PATH
self.upload_document()
response = self._request_test_document_signature_embedded_list_view()
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
def test_document_signature_embedded_list_view_with_access(self):
self.test_document_path = TEST_SIGNED_DOCUMENT_PATH
self.upload_document()
self.grant_access(
obj=self.test_document,
permission=permission_document_version_signature_view
)
response = self._request_test_document_signature_embedded_list_view()
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(
response.data['results'][0]['key_id'], TEST_KEY_PUBLIC_ID
)

View File

@@ -14,10 +14,12 @@ from ..permissions import (
permission_document_version_signature_view
)
from .literals import TEST_SIGNED_DOCUMENT_PATH
from .mixins import SignaturesTestMixin
from .mixins import SignatureTestMixin
class DocumentSignatureLinksTestCase(SignaturesTestMixin, GenericDocumentViewTestCase):
class DocumentSignatureLinksTestCase(
SignatureTestMixin, GenericDocumentViewTestCase
):
def test_document_version_signature_detail_link_no_permission(self):
self.test_document_path = TEST_SIGNED_DOCUMENT_PATH
self.upload_document()

View File

@@ -3,10 +3,8 @@ from __future__ import unicode_literals
import hashlib
import time
from mayan.apps.django_gpg.models import Key
from mayan.apps.django_gpg.tests.literals import (
TEST_KEY_DATA, TEST_KEY_PASSPHRASE
)
from mayan.apps.django_gpg.tests.literals import TEST_KEY_PRIVATE_PASSPHRASE
from mayan.apps.django_gpg.tests.mixins import KeyTestMixin
from mayan.apps.documents.models import DocumentVersion
from mayan.apps.documents.tests.base import GenericDocumentTestCase
from mayan.apps.documents.tests.literals import TEST_DOCUMENT_PATH
@@ -14,11 +12,13 @@ from mayan.apps.documents.tests.literals import TEST_DOCUMENT_PATH
from ..models import DetachedSignature, EmbeddedSignature
from ..tasks import task_verify_missing_embedded_signature
from .literals import TEST_SIGNED_DOCUMENT_PATH, TEST_KEY_ID, TEST_SIGNATURE_ID
from .mixins import SignaturesTestMixin
from .literals import (
TEST_SIGNED_DOCUMENT_PATH, TEST_KEY_PUBLIC_ID, TEST_SIGNATURE_ID
)
from .mixins import SignatureTestMixin
class DocumentSignaturesTestCase(SignaturesTestMixin, GenericDocumentTestCase):
class DocumentSignaturesTestCase(SignatureTestMixin, GenericDocumentTestCase):
auto_upload_document = False
def test_embedded_signature_no_key(self):
@@ -31,7 +31,7 @@ class DocumentSignaturesTestCase(SignaturesTestMixin, GenericDocumentTestCase):
self.assertEqual(
signature.document_version, self.test_document.latest_version
)
self.assertEqual(signature.key_id, TEST_KEY_ID)
self.assertEqual(signature.key_id, TEST_KEY_PUBLIC_ID)
self.assertEqual(signature.signature_id, None)
def test_embedded_signature_post_key_verify(self):
@@ -44,17 +44,17 @@ class DocumentSignaturesTestCase(SignaturesTestMixin, GenericDocumentTestCase):
self.assertEqual(
signature.document_version, self.test_document.latest_version
)
self.assertEqual(signature.key_id, TEST_KEY_ID)
self.assertEqual(signature.key_id, TEST_KEY_PUBLIC_ID)
self.assertEqual(signature.signature_id, None)
self._create_test_key()
self._create_test_public_key()
signature = EmbeddedSignature.objects.first()
self.assertEqual(signature.signature_id, TEST_SIGNATURE_ID)
def test_embedded_signature_post_no_key_verify(self):
self._create_test_key()
self._create_test_public_key()
self.test_document_path = TEST_SIGNED_DOCUMENT_PATH
self.upload_document()
@@ -65,17 +65,17 @@ class DocumentSignaturesTestCase(SignaturesTestMixin, GenericDocumentTestCase):
self.assertEqual(
signature.document_version, self.test_document.latest_version
)
self.assertEqual(signature.key_id, TEST_KEY_ID)
self.assertEqual(signature.key_id, TEST_KEY_PUBLIC_ID)
self.assertEqual(signature.signature_id, TEST_SIGNATURE_ID)
self.test_key.delete()
self.test_key_public.delete()
signature = EmbeddedSignature.objects.first()
self.assertEqual(signature.signature_id, None)
def test_embedded_signature_with_key(self):
self._create_test_key()
self._create_test_public_key()
self.test_document_path = TEST_SIGNED_DOCUMENT_PATH
self.upload_document()
@@ -87,9 +87,9 @@ class DocumentSignaturesTestCase(SignaturesTestMixin, GenericDocumentTestCase):
signature.document_version,
self.test_document.latest_version
)
self.assertEqual(signature.key_id, TEST_KEY_ID)
self.assertEqual(signature.key_id, TEST_KEY_PUBLIC_ID)
self.assertEqual(
signature.public_key_fingerprint, self.test_key.fingerprint
signature.public_key_fingerprint, self.test_key_public.fingerprint
)
self.assertEqual(signature.signature_id, TEST_SIGNATURE_ID)
@@ -102,13 +102,14 @@ class DocumentSignaturesTestCase(SignaturesTestMixin, GenericDocumentTestCase):
self.assertEqual(DetachedSignature.objects.count(), 1)
self.assertEqual(
self.test_signature.document_version, self.test_document.latest_version
self.test_signature.document_version,
self.test_document.latest_version
)
self.assertEqual(self.test_signature.key_id, TEST_KEY_ID)
self.assertEqual(self.test_signature.key_id, TEST_KEY_PUBLIC_ID)
self.assertEqual(self.test_signature.public_key_fingerprint, None)
def test_detached_signature_with_key(self):
self._create_test_key()
self._create_test_public_key()
self.test_document_path = TEST_DOCUMENT_PATH
self.upload_document()
@@ -117,12 +118,13 @@ class DocumentSignaturesTestCase(SignaturesTestMixin, GenericDocumentTestCase):
self.assertEqual(DetachedSignature.objects.count(), 1)
self.assertEqual(
self.test_signature.document_version, self.test_document.latest_version
self.test_signature.document_version,
self.test_document.latest_version
)
self.assertEqual(self.test_signature.key_id, TEST_KEY_ID)
self.assertEqual(self.test_signature.key_id, TEST_KEY_PUBLIC_ID)
self.assertEqual(
self.test_signature.public_key_fingerprint,
self.test_key.fingerprint
self.test_key_public.fingerprint
)
def test_detached_signature_post_key_verify(self):
@@ -137,19 +139,19 @@ class DocumentSignaturesTestCase(SignaturesTestMixin, GenericDocumentTestCase):
self.test_signature.document_version,
self.test_document.latest_version
)
self.assertEqual(self.test_signature.key_id, TEST_KEY_ID)
self.assertEqual(self.test_signature.key_id, TEST_KEY_PUBLIC_ID)
self.assertEqual(self.test_signature.public_key_fingerprint, None)
self._create_test_key()
self._create_test_public_key()
signature = DetachedSignature.objects.first()
self.assertEqual(
signature.public_key_fingerprint, self.test_key.fingerprint
signature.public_key_fingerprint, self.test_key_public.fingerprint
)
def test_detached_signature_post_no_key_verify(self):
self._create_test_key()
self._create_test_public_key()
self.test_document_path = TEST_DOCUMENT_PATH
self.upload_document()
@@ -161,13 +163,13 @@ class DocumentSignaturesTestCase(SignaturesTestMixin, GenericDocumentTestCase):
self.test_signature.document_version,
self.test_document.latest_version
)
self.assertEqual(self.test_signature.key_id, TEST_KEY_ID)
self.assertEqual(self.test_signature.key_id, TEST_KEY_PUBLIC_ID)
self.assertEqual(
self.test_signature.public_key_fingerprint,
self.test_key.fingerprint
self.test_key_public.fingerprint
)
self.test_key.delete()
self.test_key_public.delete()
signature = DetachedSignature.objects.first()
@@ -198,10 +200,10 @@ class DocumentSignaturesTestCase(SignaturesTestMixin, GenericDocumentTestCase):
signature = EmbeddedSignature.objects.first()
self.assertEqual(signature.document_version, signed_version)
self.assertEqual(signature.key_id, TEST_KEY_ID)
self.assertEqual(signature.key_id, TEST_KEY_PUBLIC_ID)
class EmbeddedSignaturesTestCase(GenericDocumentTestCase):
class EmbeddedSignaturesTestCase(KeyTestMixin, GenericDocumentTestCase):
auto_upload_document = False
def test_unsigned_document_version_method(self):
@@ -255,7 +257,7 @@ class EmbeddedSignaturesTestCase(GenericDocumentTestCase):
)
def test_signing(self):
self.test_key = Key.objects.create(key_data=TEST_KEY_DATA)
self._create_test_key_private()
self.test_document_path = TEST_DOCUMENT_PATH
self.upload_document()
@@ -266,15 +268,15 @@ class EmbeddedSignaturesTestCase(GenericDocumentTestCase):
file_object.seek(0)
original_hash = hashlib.sha256(file_object.read()).hexdigest()
new_version = EmbeddedSignature.objects.sign_document_version(
signature = EmbeddedSignature.objects.sign_document_version(
document_version=self.test_document.latest_version,
key=self.test_key,
passphrase=TEST_KEY_PASSPHRASE
key=self.test_key_private,
passphrase=TEST_KEY_PRIVATE_PASSPHRASE
)
self.assertEqual(EmbeddedSignature.objects.count(), 1)
with new_version.open() as file_object:
with signature.document_version.open() as file_object:
file_object.seek(0, 2)
new_size = file_object.tell()
file_object.seek(0)

View File

@@ -15,59 +15,23 @@ from ..permissions import (
permission_document_version_signature_view
)
from .literals import TEST_SIGNATURE_FILE_PATH, TEST_SIGNED_DOCUMENT_PATH
from .mixins import SignaturesTestMixin
from .literals import TEST_SIGNED_DOCUMENT_PATH
from .mixins import (
DetachedSignatureViewTestMixin, SignatureTestMixin,
SignatureViewTestMixin
)
TEST_UNSIGNED_DOCUMENT_COUNT = 4
TEST_SIGNED_DOCUMENT_COUNT = 2
class SignaturesViewTestMixin(object):
def _request_test_document_version_signature_delete_view(self):
return self.post(
viewname='signatures:document_version_signature_delete',
kwargs={'pk': self.test_signature.pk}
)
def _request_test_document_version_signature_details_view(self):
return self.get(
viewname='signatures:document_version_signature_details',
kwargs={'pk': self.test_signature.pk}
)
def _request_test_document_version_signature_download_view(self):
return self.get(
viewname='signatures:document_version_signature_download',
kwargs={'pk': self.test_signature.pk}
)
def _request_test_document_version_signature_list_view(self, document):
return self.get(
viewname='signatures:document_version_signature_list',
kwargs={'pk': self.test_document.latest_version.pk}
)
def _request_test_document_version_signature_upload_view(self):
with open(TEST_SIGNATURE_FILE_PATH, mode='rb') as file_object:
return self.post(
viewname='signatures:document_version_signature_upload',
kwargs={'pk': self.test_document.latest_version.pk},
data={'signature_file': file_object}
)
def _request_all_test_document_version_signature_verify_view(self):
return self.post(
viewname='signatures:all_document_version_signature_verify'
)
class SignaturesViewTestCase(
SignaturesTestMixin, SignaturesViewTestMixin, GenericDocumentViewTestCase
SignatureTestMixin, SignatureViewTestMixin, GenericDocumentViewTestCase
):
auto_upload_document = False
def test_signature_delete_view_no_permission(self):
self._create_test_key()
self._create_test_public_key()
self.test_document_path = TEST_DOCUMENT_PATH
self.upload_document()
@@ -84,7 +48,7 @@ class SignaturesViewTestCase(
self.assertEqual(DetachedSignature.objects.count(), 1)
def test_signature_delete_view_with_access(self):
self._create_test_key()
self._create_test_public_key()
self.test_document_path = TEST_DOCUMENT_PATH
self.upload_document()
@@ -105,7 +69,7 @@ class SignaturesViewTestCase(
self.assertEqual(DetachedSignature.objects.count(), 0)
def test_signature_detail_view_no_permission(self):
self._create_test_key()
self._create_test_public_key()
self.test_document_path = TEST_DOCUMENT_PATH
self.upload_document()
@@ -116,7 +80,7 @@ class SignaturesViewTestCase(
self.assertEqual(response.status_code, 404)
def test_signature_detail_view_with_access(self):
self._create_test_key()
self._create_test_public_key()
self.test_document_path = TEST_DOCUMENT_PATH
self.upload_document()
@@ -134,37 +98,8 @@ class SignaturesViewTestCase(
status_code=200
)
def test_signature_download_view_no_permission(self):
self.test_document_path = TEST_DOCUMENT_PATH
self.upload_document()
self._create_test_detached_signature()
response = self._request_test_document_version_signature_download_view()
self.assertEqual(response.status_code, 403)
def test_signature_download_view_with_access(self):
self.test_document_path = TEST_DOCUMENT_PATH
self.upload_document()
self._create_test_detached_signature()
self.grant_access(
obj=self.test_document,
permission=permission_document_version_signature_download
)
self.expected_content_type = 'application/octet-stream; charset=utf-8'
response = self._request_test_document_version_signature_download_view()
with self.test_signature.signature_file as file_object:
assert_download_response(
self, response=response, content=file_object.read(),
)
def test_signature_list_view_no_permission(self):
self._create_test_key()
self._create_test_public_key()
self.test_document_path = TEST_DOCUMENT_PATH
self.upload_document()
@@ -174,10 +109,10 @@ class SignaturesViewTestCase(
response = self._request_test_document_version_signature_list_view(
document=self.test_document
)
self.assertEqual(response.status_code, 403)
self.assertEqual(response.status_code, 404)
def test_signature_list_view_with_access(self):
self._create_test_key()
self._create_test_public_key()
self.test_document_path = TEST_DOCUMENT_PATH
self.upload_document()
@@ -195,29 +130,6 @@ class SignaturesViewTestCase(
self.assertEqual(response.status_code, 200)
self.assertEqual(response.context['object_list'].count(), 1)
def test_signature_upload_view_no_permission(self):
self.test_document_path = TEST_DOCUMENT_PATH
self.upload_document()
response = self._request_test_document_version_signature_upload_view()
self.assertEqual(response.status_code, 403)
self.assertEqual(DetachedSignature.objects.count(), 0)
def test_signature_upload_view_with_access(self):
self.test_document_path = TEST_DOCUMENT_PATH
self.upload_document()
self.grant_access(
obj=self.test_document,
permission=permission_document_version_signature_upload
)
response = self._request_test_document_version_signature_upload_view()
self.assertEqual(response.status_code, 302)
self.assertEqual(DetachedSignature.objects.count(), 1)
def test_missing_signature_verify_view_no_permission(self):
# Silence converter logging
self._silence_logger(name='mayan.apps.converter.backends')
@@ -287,3 +199,62 @@ class SignaturesViewTestCase(
EmbeddedSignature.objects.unsigned_document_versions().count(),
TEST_UNSIGNED_DOCUMENT_COUNT
)
class DetachedSignaturesViewTestCase(
SignatureTestMixin, DetachedSignatureViewTestMixin,
GenericDocumentViewTestCase
):
auto_upload_document = False
def test_signature_download_view_no_permission(self):
self.test_document_path = TEST_DOCUMENT_PATH
self.upload_document()
self._create_test_detached_signature()
response = self._request_test_document_version_signature_download_view()
self.assertEqual(response.status_code, 403)
def test_signature_download_view_with_access(self):
self.test_document_path = TEST_DOCUMENT_PATH
self.upload_document()
self._create_test_detached_signature()
self.grant_access(
obj=self.test_document,
permission=permission_document_version_signature_download
)
self.expected_content_type = 'application/octet-stream; charset=utf-8'
response = self._request_test_document_version_signature_download_view()
with self.test_signature.signature_file as file_object:
assert_download_response(
self, response=response, content=file_object.read(),
)
def test_signature_upload_view_no_permission(self):
self.test_document_path = TEST_DOCUMENT_PATH
self.upload_document()
response = self._request_test_document_version_signature_upload_view()
self.assertEqual(response.status_code, 403)
self.assertEqual(DetachedSignature.objects.count(), 0)
def test_signature_upload_view_with_access(self):
self.test_document_path = TEST_DOCUMENT_PATH
self.upload_document()
self.grant_access(
obj=self.test_document,
permission=permission_document_version_signature_upload
)
response = self._request_test_document_version_signature_upload_view()
self.assertEqual(response.status_code, 302)
self.assertEqual(DetachedSignature.objects.count(), 1)

View File

@@ -2,6 +2,11 @@ from __future__ import unicode_literals
from django.conf.urls import url
from .api_views import (
APIDocumentDetachedSignatureListView, APIDocumentDetachedSignatureView,
APIDocumentEmbeddedSignatureListView, APIDocumentEmbeddedSignatureView
)
from .views import (
AllDocumentSignatureVerifyView, DocumentVersionDetachedSignatureCreateView,
DocumentVersionEmbeddedSignatureCreateView,
@@ -52,3 +57,26 @@ urlpatterns = [
name='all_document_version_signature_verify'
),
]
api_urls = [
url(
regex=r'^documents/(?P<document_id>[0-9]+)/versions/(?P<document_version_id>[0-9]+)/signatures/detached/$',
view=APIDocumentDetachedSignatureListView.as_view(),
name='document-version-signature-detached-list'
),
url(
regex=r'^documents/(?P<document_id>[0-9]+)/versions/(?P<document_version_id>[0-9]+)/signatures/detached/(?P<detached_signature_id>[0-9]+)/$',
view=APIDocumentDetachedSignatureView.as_view(),
name='detachedsignature-detail'
),
url(
regex=r'^documents/(?P<document_id>[0-9]+)/versions/(?P<document_version_id>[0-9]+)/signatures/embedded/$',
view=APIDocumentEmbeddedSignatureListView.as_view(),
name='document-version-signature-embedded-list'
),
url(
regex=r'^documents/(?P<document_id>[0-9]+)/versions/(?P<document_version_id>[0-9]+)/signatures/embedded/(?P<embedded_signature_id>[0-9]+)/$',
view=APIDocumentEmbeddedSignatureView.as_view(),
name='embeddedsignature-detail'
),
]

View File

@@ -3,7 +3,6 @@ from __future__ import absolute_import, unicode_literals
import logging
from django.contrib import messages
from django.core.files import File
from django.http import HttpResponseRedirect
from django.shortcuts import get_object_or_404
from django.template import RequestContext
@@ -16,10 +15,10 @@ from mayan.apps.common.generics import (
ConfirmView, FormView, SingleObjectCreateView, SingleObjectDeleteView,
SingleObjectDetailView, SingleObjectDownloadView, SingleObjectListView
)
from mayan.apps.common.mixins import ExternalObjectMixin
from mayan.apps.django_gpg.exceptions import NeedPassphrase, PassphraseError
from mayan.apps.django_gpg.permissions import permission_key_sign
from mayan.apps.documents.models import DocumentVersion
from mayan.apps.storage.utils import TemporaryFile
from .forms import (
DocumentVersionSignatureCreateForm,
@@ -35,7 +34,7 @@ from .links import (
link_document_version_signature_embedded_create,
link_document_version_signature_upload
)
from .models import DetachedSignature, SignatureBaseModel
from .models import DetachedSignature, EmbeddedSignature, SignatureBaseModel
from .permissions import (
permission_document_version_sign_detached,
permission_document_version_sign_embedded,
@@ -62,11 +61,10 @@ class DocumentVersionDetachedSignatureCreateView(FormView):
)
try:
with self.get_document_version().open() as file_object:
detached_signature = key.sign_file(
file_object=file_object, detached=True,
passphrase=passphrase
)
DetachedSignature.objects.sign_document_version(
document_version=self.get_document_version(),
key=key, passphrase=passphrase, user=self.request.user
)
except NeedPassphrase:
messages.error(
message=_('Passphrase is needed to unlock this key.'),
@@ -90,17 +88,6 @@ class DocumentVersionDetachedSignatureCreateView(FormView):
)
)
else:
temporary_file_object = TemporaryFile()
temporary_file_object.write(detached_signature.data)
temporary_file_object.seek(0)
DetachedSignature.objects.create(
document_version=self.get_document_version(),
signature_file=File(temporary_file_object)
)
temporary_file_object.close()
messages.success(
message=_('Document version signed successfully.'),
request=self.request
@@ -156,10 +143,10 @@ class DocumentVersionEmbeddedSignatureCreateView(FormView):
)
try:
with self.get_document_version().open() as file_object:
signature_result = key.sign_file(
binary=True, file_object=file_object, passphrase=passphrase
)
new_version = EmbeddedSignature.objects.sign_document_version(
document_version=self.get_document_version(),
key=key, passphrase=passphrase, user=self.request.user
)
except NeedPassphrase:
messages.error(
message=_('Passphrase is needed to unlock this key.'),
@@ -183,16 +170,6 @@ class DocumentVersionEmbeddedSignatureCreateView(FormView):
)
)
else:
temporary_file_object = TemporaryFile()
temporary_file_object.write(signature_result.data)
temporary_file_object.seek(0)
new_version = self.get_document_version().document.new_version(
file_object=temporary_file_object, _user=self.request.user
)
temporary_file_object.close()
messages.success(
message=_('Document version signed successfully.'),
request=self.request
@@ -285,20 +262,11 @@ class DocumentVersionSignatureDownloadView(SingleObjectDownloadView):
)
class DocumentVersionSignatureListView(SingleObjectListView):
def dispatch(self, request, *args, **kwargs):
AccessControlList.objects.check_access(
obj=self.get_document_version(),
permissions=(permission_document_version_signature_view,),
user=request.user
)
return super(
DocumentVersionSignatureListView, self
).dispatch(request, *args, **kwargs)
def get_document_version(self):
return get_object_or_404(klass=DocumentVersion, pk=self.kwargs['pk'])
class DocumentVersionSignatureListView(
ExternalObjectMixin, SingleObjectListView
):
external_object_class = DocumentVersion
external_object_permission = permission_document_version_signature_view
def get_extra_context(self):
return {
@@ -314,34 +282,34 @@ class DocumentVersionSignatureListView(SingleObjectListView):
link_document_version_signature_detached_create.resolve(
RequestContext(
request=self.request, dict_={
'object': self.get_document_version()
'object': self.external_object
}
)
),
link_document_version_signature_embedded_create.resolve(
RequestContext(
request=self.request, dict_={
'object': self.get_document_version()
'object': self.external_object
}
)
),
link_document_version_signature_upload.resolve(
RequestContext(
request=self.request, dict_={
'object': self.get_document_version()
'object': self.external_object
}
)
),
],
'no_results_title': _('There are no signatures for this document.'),
'object': self.get_document_version(),
'object': self.external_object,
'title': _(
'Signatures for document version: %s'
) % self.get_document_version(),
) % self.external_object,
}
def get_source_queryset(self):
return self.get_document_version().signatures.all()
return self.external_object.signatures.all()
class DocumentVersionSignatureUploadView(SingleObjectCreateView):

View File

@@ -0,0 +1,91 @@
from __future__ import unicode_literals
from django.apps import apps
from django.core.exceptions import ImproperlyConfigured
from django.db.models import Manager
from django.db.models.query import QuerySet
from rest_framework import serializers
from rest_framework.relations import HyperlinkedIdentityField
from mayan.apps.common.utils import resolve_attribute
class FilteredPrimaryKeyRelatedField(serializers.PrimaryKeyRelatedField):
def __init__(self, **kwargs):
self.source_model = kwargs.pop('source_model', None)
self.source_permission = kwargs.pop('source_permission', None)
self.source_queryset = kwargs.pop('source_queryset', None)
self.source_queryset_method = kwargs.pop('source_queryset_method', None)
super(FilteredPrimaryKeyRelatedField, self).__init__(**kwargs)
def get_queryset(self):
AccessControlList = apps.get_model(
app_label='acls', model_name='AccessControlList'
)
if self.source_model:
queryset = self.source_model._meta.default_manager.all()
elif self.source_queryset:
queryset = self.source_queryset
if isinstance(queryset, (QuerySet, Manager)):
# Ensure queryset is re-evaluated whenever used.
queryset = queryset.all()
else:
method_name = self.source_queryset_method or 'get_{}_queryset'.format(
self.field_name
)
try:
queryset = getattr(self.parent, method_name)()
except AttributeError:
raise ImproperlyConfigured(
'Need to provide a source_model, a '
'source_queryset, a source_queryset_method, or '
'a method named "%s".' % method_name
)
assert 'request' in self.context, (
"`%s` requires the request in the serializer"
" context. Add `context={'request': request}` when instantiating "
"the serializer." % self.__class__.__name__
)
request = self.context['request']
if self.source_permission:
return AccessControlList.objects.restrict_queryset(
permission=self.source_permission, queryset=queryset,
user=request.user
)
else:
return queryset
class MultiKwargHyperlinkedIdentityField(HyperlinkedIdentityField):
def __init__(self, *args, **kwargs):
self.view_kwargs = kwargs.pop('view_kwargs', [])
super(MultiKwargHyperlinkedIdentityField, self).__init__(*args, **kwargs)
def get_url(self, obj, view_name, request, format):
"""
Extends HyperlinkedRelatedField to allow passing more than one view
keyword argument.
----
Given an object, return the URL that hyperlinks to the object.
May raise a `NoReverseMatch` if the `view_name` and `lookup_field`
attributes are not configured to correctly match the URL conf.
"""
# Unsaved objects will not yet have a valid URL.
if hasattr(obj, 'pk') and obj.pk in (None, ''):
return None
kwargs = {}
for entry in self.view_kwargs:
kwargs[entry['lookup_url_kwarg']] = resolve_attribute(
obj=obj, attribute=entry['lookup_field']
)
return self.reverse(
viewname=view_name, kwargs=kwargs, request=request, format=format
)