diff --git a/HISTORY.rst b/HISTORY.rst index 2f1f50e6be..519934ee99 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -110,6 +110,8 @@ own module. - Update label and icon of the document sign form Label updated from "Save" to "Sign". +- Add list, create, detail and edit API views for + detached and embedded signatures. 3.2.9 (2019-11-03) ================== diff --git a/mayan/apps/django_gpg/tests/literals.py b/mayan/apps/django_gpg/tests/literals.py index c7dca32a6a..55782e1872 100644 --- a/mayan/apps/django_gpg/tests/literals.py +++ b/mayan/apps/django_gpg/tests/literals.py @@ -26,7 +26,7 @@ TEST_FILE = os.path.join( 'test_files', 'test_file.txt' ) -TEST_KEY_DATA = '''-----BEGIN PGP PRIVATE KEY BLOCK----- +TEST_KEY_PRIVATE_DATA = '''-----BEGIN PGP PRIVATE KEY BLOCK----- Version: GnuPG v1 lQO+BFbxfC8BCACnUZoD96W4+CSIaU9G8I08kXu2zJLzy2XgUtwLx8VQ8dOHr0E/ @@ -86,9 +86,9 @@ h4oCbUV5JHhOyB+89Y1w8haFU9LrgOER2kXff1xU6wMfLdcO5ApV/sRJcNdYL7Cg =JZ5G -----END PGP PRIVATE KEY BLOCK-----''' -TEST_KEY_ID = '4125E9C571F378AC' -TEST_KEY_FINGERPRINT = '6A24574E0A35004CDDFD22704125E9C571F378AC' -TEST_KEY_PASSPHRASE = 'testpassphrase' +TEST_KEY_PRIVATE_ID = '4125E9C571F378AC' +TEST_KEY_PRIVATE_FINGERPRINT = '6A24574E0A35004CDDFD22704125E9C571F378AC' +TEST_KEY_PRIVATE_PASSPHRASE = 'testpassphrase' TEST_KEYSERVERS = ['pool.sks-keyservers.net'] diff --git a/mayan/apps/django_gpg/tests/mixins.py b/mayan/apps/django_gpg/tests/mixins.py index 82bf007147..726a37eacd 100644 --- a/mayan/apps/django_gpg/tests/mixins.py +++ b/mayan/apps/django_gpg/tests/mixins.py @@ -2,9 +2,48 @@ from __future__ import unicode_literals from ..models import Key -from .literals import TEST_KEY_DATA +from .literals import TEST_KEY_PRIVATE_DATA + + +class KeyAPIViewTestMixin(object): + def _request_test_key_create_view(self): + return self.post( + viewname='rest_api:key-list', data={ + 'key_data': TEST_KEY_PRIVATE_DATA + } + ) + + def _request_test_key_delete_view(self): + return self.delete( + viewname='rest_api:key-detail', kwargs={ + 'pk': self.test_key_private.pk + } + ) + + def _request_test_key_detail_view(self): + return self.get( + viewname='rest_api:key-detail', kwargs={ + 'pk': self.test_key_private.pk + } + ) class KeyTestMixin(object): - def _create_test_key(self): - self.test_key = Key.objects.create(key_data=TEST_KEY_DATA) + def _create_test_key_private(self): + self.test_key_private = Key.objects.create( + key_data=TEST_KEY_PRIVATE_DATA + ) + + +class KeyViewTestMixin(object): + def _request_test_key_download_view(self): + return self.get( + viewname='django_gpg:key_download', kwargs={'pk': self.test_key_private.pk} + ) + + def _request_test_key_upload_view(self): + return self.post( + viewname='django_gpg:key_upload', data={ + 'key_data': TEST_KEY_PRIVATE_DATA + } + ) diff --git a/mayan/apps/django_gpg/tests/test_api.py b/mayan/apps/django_gpg/tests/test_api.py index 6e9ab76b8f..3a6b5fa849 100644 --- a/mayan/apps/django_gpg/tests/test_api.py +++ b/mayan/apps/django_gpg/tests/test_api.py @@ -9,27 +9,8 @@ from ..permissions import ( permission_key_delete, permission_key_upload, permission_key_view ) -from .literals import TEST_KEY_DATA, TEST_KEY_FINGERPRINT -from .mixins import KeyTestMixin - - -class KeyAPIViewTestMixin(object): - def _request_test_key_create_view(self): - return self.post( - viewname='rest_api:key-list', data={ - 'key_data': TEST_KEY_DATA - } - ) - - def _request_test_key_delete_view(self): - return self.delete( - viewname='rest_api:key-detail', kwargs={'pk': self.test_key.pk} - ) - - def _request_test_key_detail_view(self): - return self.get( - viewname='rest_api:key-detail', kwargs={'pk': self.test_key.pk} - ) +from .literals import TEST_KEY_PRIVATE_FINGERPRINT +from .mixins import KeyAPIViewTestMixin, KeyTestMixin class KeyAPITestCase(KeyTestMixin, KeyAPIViewTestMixin, BaseAPITestCase): @@ -44,14 +25,16 @@ class KeyAPITestCase(KeyTestMixin, KeyAPIViewTestMixin, BaseAPITestCase): response = self._request_test_key_create_view() self.assertEqual(response.status_code, status.HTTP_201_CREATED) - self.assertEqual(response.data['fingerprint'], TEST_KEY_FINGERPRINT) + self.assertEqual( + response.data['fingerprint'], TEST_KEY_PRIVATE_FINGERPRINT + ) key = Key.objects.first() self.assertEqual(Key.objects.count(), 1) - self.assertEqual(key.fingerprint, TEST_KEY_FINGERPRINT) + self.assertEqual(key.fingerprint, TEST_KEY_PRIVATE_FINGERPRINT) def test_key_delete_view_no_access(self): - self._create_test_key() + self._create_test_key_private() response = self._request_test_key_delete_view() self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) @@ -59,9 +42,9 @@ class KeyAPITestCase(KeyTestMixin, KeyAPIViewTestMixin, BaseAPITestCase): self.assertEqual(Key.objects.count(), 1) def test_key_delete_view_with_access(self): - self._create_test_key() + self._create_test_key_private() self.grant_access( - obj=self.test_key, permission=permission_key_delete + obj=self.test_key_private, permission=permission_key_delete ) response = self._request_test_key_delete_view() @@ -70,19 +53,19 @@ class KeyAPITestCase(KeyTestMixin, KeyAPIViewTestMixin, BaseAPITestCase): self.assertEqual(Key.objects.count(), 0) def test_key_detail_view_no_access(self): - self._create_test_key() + self._create_test_key_private() response = self._request_test_key_detail_view() self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) def test_key_detail_view_with_access(self): - self._create_test_key() + self._create_test_key_private() self.grant_access( - obj=self.test_key, permission=permission_key_view + obj=self.test_key_private, permission=permission_key_view ) response = self._request_test_key_detail_view() self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual( - response.data['fingerprint'], self.test_key.fingerprint + response.data['fingerprint'], self.test_key_private.fingerprint ) diff --git a/mayan/apps/django_gpg/tests/test_models.py b/mayan/apps/django_gpg/tests/test_models.py index b854e8a00f..cd08b4e31c 100644 --- a/mayan/apps/django_gpg/tests/test_models.py +++ b/mayan/apps/django_gpg/tests/test_models.py @@ -18,9 +18,9 @@ from ..models import Key from .literals import ( MOCK_SEARCH_KEYS_RESPONSE, TEST_DETACHED_SIGNATURE, TEST_FILE, - TEST_KEY_DATA, TEST_KEY_FINGERPRINT, TEST_KEY_PASSPHRASE, - TEST_SEARCH_FINGERPRINT, TEST_SEARCH_UID, TEST_SIGNED_FILE, - TEST_SIGNED_FILE_CONTENT + TEST_KEY_PRIVATE_DATA, TEST_KEY_PRIVATE_FINGERPRINT, + TEST_KEY_PRIVATE_PASSPHRASE, TEST_SEARCH_FINGERPRINT, TEST_SEARCH_UID, + TEST_SIGNED_FILE, TEST_SIGNED_FILE_CONTENT ) from .mocks import mock_recv_keys @@ -28,9 +28,9 @@ from .mocks import mock_recv_keys class KeyTestCase(BaseTestCase): def test_key_instance_creation(self): # Creating a Key instance is analogous to importing a key - key = Key.objects.create(key_data=TEST_KEY_DATA) + key = Key.objects.create(key_data=TEST_KEY_PRIVATE_DATA) - self.assertEqual(key.fingerprint, TEST_KEY_FINGERPRINT) + self.assertEqual(key.fingerprint, TEST_KEY_PRIVATE_FINGERPRINT) @mock.patch.object(gnupg.GPG, 'search_keys', autospec=True) def test_key_search(self, search_keys): @@ -69,36 +69,36 @@ class KeyTestCase(BaseTestCase): with open(TEST_SIGNED_FILE, mode='rb') as signed_file: result = Key.objects.verify_file(signed_file) - self.assertTrue(result.key_id in TEST_KEY_FINGERPRINT) + self.assertTrue(result.key_id in TEST_KEY_PRIVATE_FINGERPRINT) def test_embedded_verification_with_key(self): - Key.objects.create(key_data=TEST_KEY_DATA) + Key.objects.create(key_data=TEST_KEY_PRIVATE_DATA) with open(TEST_SIGNED_FILE, mode='rb') as signed_file: result = Key.objects.verify_file(signed_file) - self.assertEqual(result.fingerprint, TEST_KEY_FINGERPRINT) + self.assertEqual(result.fingerprint, TEST_KEY_PRIVATE_FINGERPRINT) def test_embedded_verification_with_correct_fingerprint(self): - Key.objects.create(key_data=TEST_KEY_DATA) + Key.objects.create(key_data=TEST_KEY_PRIVATE_DATA) with open(TEST_SIGNED_FILE, mode='rb') as signed_file: result = Key.objects.verify_file( - signed_file, key_fingerprint=TEST_KEY_FINGERPRINT + signed_file, key_fingerprint=TEST_KEY_PRIVATE_FINGERPRINT ) self.assertTrue(result.valid) - self.assertEqual(result.fingerprint, TEST_KEY_FINGERPRINT) + self.assertEqual(result.fingerprint, TEST_KEY_PRIVATE_FINGERPRINT) def test_embedded_verification_with_incorrect_fingerprint(self): - Key.objects.create(key_data=TEST_KEY_DATA) + Key.objects.create(key_data=TEST_KEY_PRIVATE_DATA) with open(TEST_SIGNED_FILE, mode='rb') as signed_file: with self.assertRaises(KeyDoesNotExist): Key.objects.verify_file(signed_file, key_fingerprint='999') def test_signed_file_decryption(self): - Key.objects.create(key_data=TEST_KEY_DATA) + Key.objects.create(key_data=TEST_KEY_PRIVATE_DATA) with open(TEST_SIGNED_FILE, mode='rb') as signed_file: result = Key.objects.decrypt_file(file_object=signed_file) @@ -122,10 +122,10 @@ class KeyTestCase(BaseTestCase): file_object=test_file, signature_file=signature_file ) - self.assertTrue(result.key_id in TEST_KEY_FINGERPRINT) + self.assertTrue(result.key_id in TEST_KEY_PRIVATE_FINGERPRINT) def test_detached_verification_with_key(self): - Key.objects.create(key_data=TEST_KEY_DATA) + Key.objects.create(key_data=TEST_KEY_PRIVATE_DATA) with open(TEST_DETACHED_SIGNATURE, mode='rb') as signature_file: with open(TEST_FILE, mode='rb') as test_file: @@ -134,10 +134,10 @@ class KeyTestCase(BaseTestCase): ) self.assertTrue(result) - self.assertEqual(result.fingerprint, TEST_KEY_FINGERPRINT) + self.assertEqual(result.fingerprint, TEST_KEY_PRIVATE_FINGERPRINT) def test_detached_signing_no_passphrase(self): - key = Key.objects.create(key_data=TEST_KEY_DATA) + key = Key.objects.create(key_data=TEST_KEY_PRIVATE_DATA) with self.assertRaises(NeedPassphrase): with open(TEST_FILE, mode='rb') as test_file: @@ -146,7 +146,7 @@ class KeyTestCase(BaseTestCase): ) def test_detached_signing_bad_passphrase(self): - key = Key.objects.create(key_data=TEST_KEY_DATA) + key = Key.objects.create(key_data=TEST_KEY_PRIVATE_DATA) with self.assertRaises(PassphraseError): with open(TEST_FILE, mode='rb') as test_file: @@ -156,12 +156,12 @@ class KeyTestCase(BaseTestCase): ) def test_detached_signing_with_passphrase(self): - key = Key.objects.create(key_data=TEST_KEY_DATA) + key = Key.objects.create(key_data=TEST_KEY_PRIVATE_DATA) with open(TEST_FILE, mode='rb') as test_file: detached_signature = key.sign_file( file_object=test_file, detached=True, - passphrase=TEST_KEY_PASSPHRASE + passphrase=TEST_KEY_PRIVATE_PASSPHRASE ) signature_file = io.BytesIO() @@ -175,4 +175,4 @@ class KeyTestCase(BaseTestCase): signature_file.close() self.assertTrue(result) - self.assertEqual(result.fingerprint, TEST_KEY_FINGERPRINT) + self.assertEqual(result.fingerprint, TEST_KEY_PRIVATE_FINGERPRINT) diff --git a/mayan/apps/django_gpg/tests/test_views.py b/mayan/apps/django_gpg/tests/test_views.py index b0f05d88cb..a7c4c8137f 100644 --- a/mayan/apps/django_gpg/tests/test_views.py +++ b/mayan/apps/django_gpg/tests/test_views.py @@ -7,25 +7,13 @@ from mayan.apps.common.tests.base import GenericViewTestCase from ..models import Key from ..permissions import permission_key_download, permission_key_upload -from .literals import TEST_KEY_DATA, TEST_KEY_FINGERPRINT -from .mixins import KeyTestMixin - - -class KeyViewTestMixin(object): - def _request_test_key_download_view(self): - return self.get( - viewname='django_gpg:key_download', kwargs={'pk': self.test_key.pk} - ) - - def _request_test_key_upload_view(self): - return self.post( - viewname='django_gpg:key_upload', data={'key_data': TEST_KEY_DATA} - ) +from .literals import TEST_KEY_PRIVATE_FINGERPRINT +from .mixins import KeyTestMixin, KeyViewTestMixin class KeyViewTestCase(KeyTestMixin, KeyViewTestMixin, GenericViewTestCase): def test_key_download_view_no_permission(self): - self._create_test_key() + self._create_test_key_private() response = self._request_test_key_download_view() self.assertEqual(response.status_code, 403) @@ -33,16 +21,16 @@ class KeyViewTestCase(KeyTestMixin, KeyViewTestMixin, GenericViewTestCase): def test_key_download_view_with_permission(self): self.expected_content_type = 'application/octet-stream; charset=utf-8' - self._create_test_key() + self._create_test_key_private() self.grant_access( - obj=self.test_key, permission=permission_key_download + obj=self.test_key_private, permission=permission_key_download ) response = self._request_test_key_download_view() assert_download_response( - self, response=response, content=self.test_key.key_data, - basename=self.test_key.key_id, + self, response=response, content=self.test_key_private.key_data, + basename=self.test_key_private.key_id, ) def test_key_upload_view_no_permission(self): @@ -59,5 +47,5 @@ class KeyViewTestCase(KeyTestMixin, KeyViewTestMixin, GenericViewTestCase): self.assertEqual(Key.objects.count(), 1) self.assertEqual( - Key.objects.first().fingerprint, TEST_KEY_FINGERPRINT + Key.objects.first().fingerprint, TEST_KEY_PRIVATE_FINGERPRINT ) diff --git a/mayan/apps/document_signatures/api_views.py b/mayan/apps/document_signatures/api_views.py new file mode 100644 index 0000000000..5cb7a2e90c --- /dev/null +++ b/mayan/apps/document_signatures/api_views.py @@ -0,0 +1,252 @@ +from __future__ import absolute_import, unicode_literals + +from django.shortcuts import get_object_or_404 + +from mayan.apps.acls.models import AccessControlList +from mayan.apps.documents.models import Document +from mayan.apps.rest_api import generics + +from .models import DetachedSignature, EmbeddedSignature +from .permissions import ( + permission_document_version_sign_detached, + permission_document_version_sign_embedded, + permission_document_version_signature_delete, + permission_document_version_signature_view +) +from .serializers import ( + DetachedSignatureSerializer, EmbeddedSignatureSerializer +) + + +class APIDocumentDetachedSignatureListView(generics.ListCreateAPIView): + """ + get: Returns a list of all the detached signatures of a document version. + post: Create an detached signature for a document version. + """ + serializer_class = DetachedSignatureSerializer + + def get_document(self): + return get_object_or_404( + klass=self.get_document_queryset(), pk=self.kwargs['document_id'] + ) + + def get_document_queryset(self): + if self.request.method == 'GET': + permission = permission_document_version_signature_view + elif self.request.method == 'POST': + permission = permission_document_version_sign_detached + + return AccessControlList.objects.restrict_queryset( + permission=permission, queryset=Document.objects.all(), + user=self.request.user + ) + + def get_document_version(self): + return get_object_or_404( + klass=self.get_document_version_queryset(), + pk=self.kwargs['document_version_id'] + ) + + def get_document_version_queryset(self): + return self.get_document().versions.all() + + def get_queryset(self): + return DetachedSignature.objects.filter( + document_version=self.get_document_version() + ) + + def get_serializer(self, *args, **kwargs): + if not self.request: + return None + + return super( + APIDocumentDetachedSignatureListView, self + ).get_serializer(*args, **kwargs) + + def get_serializer_context(self): + """ + Extra context provided to the serializer class. + """ + context = super( + APIDocumentDetachedSignatureListView, self + ).get_serializer_context() + if self.kwargs: + context.update( + { + 'document_version': self.get_document_version(), + } + ) + + return context + + +class APIDocumentDetachedSignatureView(generics.RetrieveDestroyAPIView): + """ + delete: Delete an detached signature of the selected document. + get: Returns the details of the selected detached signature. + """ + lookup_url_kwarg = 'detached_signature_id' + serializer_class = DetachedSignatureSerializer + + def get_document(self): + return get_object_or_404( + klass=self.get_document_queryset(), pk=self.kwargs['document_id'] + ) + + def get_document_queryset(self): + if self.request.method == 'GET': + permission = permission_document_version_signature_view + elif self.request.method == 'POST': + permission = permission_document_version_signature_view + elif self.request.method == 'DELETE': + permission = permission_document_version_signature_delete + + return AccessControlList.objects.restrict_queryset( + permission=permission, queryset=Document.objects.all(), + user=self.request.user + ) + + def get_document_version(self): + return get_object_or_404( + klass=self.get_document_version_queryset(), + pk=self.kwargs['document_version_id'] + ) + + def get_document_version_queryset(self): + return self.get_document().versions.all() + + def get_queryset(self): + return DetachedSignature.objects.filter( + document_version=self.get_document_version() + ) + + def get_serializer_context(self): + """ + Extra context provided to the serializer class. + """ + context = super(APIDocumentDetachedSignatureView, self).get_serializer_context() + if self.kwargs: + context.update( + { + 'document': self.get_document(), + } + ) + + return context + + +class APIDocumentEmbeddedSignatureListView(generics.ListCreateAPIView): + """ + get: Returns a list of all the embedded signatures of a document version. + post: Create an embedded signature for a document version. + """ + serializer_class = EmbeddedSignatureSerializer + + def get_document(self): + return get_object_or_404( + klass=self.get_document_queryset(), pk=self.kwargs['document_id'] + ) + + def get_document_queryset(self): + if self.request.method == 'GET': + permission = permission_document_version_signature_view + elif self.request.method == 'POST': + permission = permission_document_version_sign_embedded + + return AccessControlList.objects.restrict_queryset( + permission=permission, queryset=Document.objects.all(), + user=self.request.user + ) + + def get_document_version(self): + return get_object_or_404( + klass=self.get_document_version_queryset(), + pk=self.kwargs['document_version_id'] + ) + + def get_document_version_queryset(self): + return self.get_document().versions.all() + + def get_queryset(self): + return EmbeddedSignature.objects.filter( + document_version=self.get_document_version() + ) + + def get_serializer(self, *args, **kwargs): + if not self.request: + return None + + return super( + APIDocumentEmbeddedSignatureListView, self + ).get_serializer(*args, **kwargs) + + def get_serializer_context(self): + """ + Extra context provided to the serializer class. + """ + context = super( + APIDocumentEmbeddedSignatureListView, self + ).get_serializer_context() + if self.kwargs: + context.update( + { + 'document_version': self.get_document_version(), + } + ) + + return context + + +class APIDocumentEmbeddedSignatureView(generics.RetrieveDestroyAPIView): + """ + delete: Delete an embedded signature of the selected document. + get: Returns the details of the selected embedded signature. + """ + lookup_url_kwarg = 'embedded_signature_id' + serializer_class = EmbeddedSignatureSerializer + + def get_document(self): + return get_object_or_404( + klass=self.get_document_queryset(), pk=self.kwargs['document_id'] + ) + + def get_document_queryset(self): + if self.request.method == 'GET': + permission = permission_document_version_signature_view + elif self.request.method == 'POST': + permission = permission_document_version_signature_view + elif self.request.method == 'DELETE': + permission = permission_document_version_signature_delete + + return AccessControlList.objects.restrict_queryset( + permission=permission, queryset=Document.objects.all(), + user=self.request.user + ) + + def get_document_version(self): + return get_object_or_404( + klass=self.get_document_version_queryset(), + pk=self.kwargs['document_version_id'] + ) + + def get_document_version_queryset(self): + return self.get_document().versions.all() + + def get_queryset(self): + return EmbeddedSignature.objects.filter( + document_version=self.get_document_version() + ) + + def get_serializer_context(self): + """ + Extra context provided to the serializer class. + """ + context = super(APIDocumentEmbeddedSignatureView, self).get_serializer_context() + if self.kwargs: + context.update( + { + 'document': self.get_document(), + } + ) + + return context diff --git a/mayan/apps/document_signatures/apps.py b/mayan/apps/document_signatures/apps.py index 12dfc3e18a..9e522e878c 100644 --- a/mayan/apps/document_signatures/apps.py +++ b/mayan/apps/document_signatures/apps.py @@ -42,6 +42,7 @@ logger = logging.getLogger(__name__) class DocumentSignaturesApp(MayanAppConfig): app_namespace = 'signatures' app_url = 'signatures' + has_rest_api = True has_tests = True name = 'mayan.apps.document_signatures' verbose_name = _('Document signatures') diff --git a/mayan/apps/document_signatures/managers.py b/mayan/apps/document_signatures/managers.py index c8ace7c6c3..1718f27731 100644 --- a/mayan/apps/document_signatures/managers.py +++ b/mayan/apps/document_signatures/managers.py @@ -3,6 +3,7 @@ from __future__ import unicode_literals import logging import os +from django.core.files import File from django.db import models from mayan.apps.django_gpg.exceptions import DecryptionError @@ -13,6 +14,29 @@ from mayan.apps.storage.utils import mkstemp logger = logging.getLogger(__name__) +class DetachedSignatureManager(models.Manager): + def sign_document_version( + self, document_version, key, passphrase=None, user=None + ): + temporary_file_object, temporary_filename = mkstemp() + + try: + with document_version.open() as file_object: + key.sign_file( + binary=True, detached=True, file_object=file_object, + output=temporary_filename, passphrase=passphrase + ) + except Exception: + raise + else: + return self.create( + document_version=document_version, + signature_file=File(temporary_file_object) + ) + finally: + os.unlink(temporary_filename) + + class EmbeddedSignatureManager(models.Manager): def open_signed(self, file_object, document_version): for signature in self.filter(document_version=document_version): @@ -28,7 +52,9 @@ class EmbeddedSignatureManager(models.Manager): else: return file_object - def sign_document_version(self, document_version, key, passphrase=None, user=None): + def sign_document_version( + self, document_version, key, passphrase=None, user=None + ): temporary_file_object, temporary_filename = mkstemp() try: @@ -44,11 +70,13 @@ class EmbeddedSignatureManager(models.Manager): new_version = document_version.document.new_version( file_object=file_object, _user=user ) + # This is a potential race condition but we have not way + # to access the final signature at this point. + signature = self.filter(document_version=new_version).first() + return signature or self.none() finally: os.unlink(temporary_filename) - return new_version - def unsigned_document_versions(self): return DocumentVersion.objects.exclude( pk__in=self.values('document_version') diff --git a/mayan/apps/document_signatures/models.py b/mayan/apps/document_signatures/models.py index 524fb893ee..32ddcea334 100644 --- a/mayan/apps/document_signatures/models.py +++ b/mayan/apps/document_signatures/models.py @@ -14,7 +14,7 @@ from mayan.apps.django_gpg.exceptions import VerificationError from mayan.apps.django_gpg.models import Key from mayan.apps.documents.models import DocumentVersion -from .managers import EmbeddedSignatureManager +from .managers import DetachedSignatureManager, EmbeddedSignatureManager from .storages import storage_detachedsignature logger = logging.getLogger(__name__) @@ -57,6 +57,7 @@ class SignatureBaseModel(models.Model): objects = InheritanceManager() class Meta: + ordering = ('pk',) verbose_name = _('Document version signature') verbose_name_plural = _('Document version signatures') @@ -131,8 +132,7 @@ class DetachedSignature(SignatureBaseModel): upload_to=upload_to, verbose_name=_('Signature file') ) - # Don't inherit the SignatureBaseModel manager - objects = models.Manager() + objects = DetachedSignatureManager() class Meta: verbose_name = _('Document version detached signature') diff --git a/mayan/apps/document_signatures/serializers.py b/mayan/apps/document_signatures/serializers.py new file mode 100644 index 0000000000..dfc7b8cd88 --- /dev/null +++ b/mayan/apps/document_signatures/serializers.py @@ -0,0 +1,147 @@ +from __future__ import absolute_import, unicode_literals + +from rest_framework import serializers +from rest_framework.exceptions import ValidationError + +from mayan.apps.acls.models import AccessControlList +from mayan.apps.django_gpg.models import Key +from mayan.apps.django_gpg.permissions import permission_key_sign +from mayan.apps.rest_api.relations import MultiKwargHyperlinkedIdentityField + +from .models import DetachedSignature, EmbeddedSignature + + +class DetachedSignatureSerializer(serializers.HyperlinkedModelSerializer): + document_version_url = MultiKwargHyperlinkedIdentityField( + view_kwargs=( + { + 'lookup_field': 'document_version_id', + 'lookup_url_kwarg': 'version_pk', + }, + { + 'lookup_field': 'document_version.document.pk', + 'lookup_url_kwarg': 'pk', + } + ), + view_name='rest_api:documentversion-detail' + ) + + url = MultiKwargHyperlinkedIdentityField( + view_kwargs=( + { + 'lookup_field': 'document_version.document.pk', + 'lookup_url_kwarg': 'document_id', + }, + { + 'lookup_field': 'document_version_id', + 'lookup_url_kwarg': 'document_version_id', + }, + { + 'lookup_field': 'pk', + 'lookup_url_kwarg': 'detached_signature_id', + }, + ), + view_name='rest_api:detachedsignature-detail' + ) + passphrase = serializers.CharField(required=False, write_only=True) + + class Meta: + fields = ( + 'date', 'document_version_url', 'key_id', 'signature_id', + 'passphrase', 'public_key_fingerprint', 'url' + ) + model = DetachedSignature + + def create(self, validated_data): + key_id = validated_data.pop('key_id') + passphrase = validated_data.pop('passphrase', None) + + key_queryset = AccessControlList.objects.restrict_queryset( + permission=permission_key_sign, queryset=Key.objects.all(), + user=self.context['request'].user + ) + + try: + key = key_queryset.get(fingerprint__endswith=key_id) + except Key.DoesNotExist: + raise ValidationError( + { + 'key_id': [ + 'Key "{}" not found.'.format(key_id) + ] + }, code='invalid' + ) + + return DetachedSignature.objects.sign_document_version( + document_version=self.context['document_version'], key=key, + passphrase=passphrase, user=self.context['request'].user + ) + + +class EmbeddedSignatureSerializer(serializers.HyperlinkedModelSerializer): + document_version_url = MultiKwargHyperlinkedIdentityField( + view_kwargs=( + { + 'lookup_field': 'document_version_id', + 'lookup_url_kwarg': 'version_pk', + }, + { + 'lookup_field': 'document_version.document.pk', + 'lookup_url_kwarg': 'pk', + } + ), + view_name='rest_api:documentversion-detail' + ) + + url = MultiKwargHyperlinkedIdentityField( + view_kwargs=( + { + 'lookup_field': 'document_version.document.pk', + 'lookup_url_kwarg': 'document_id', + }, + { + 'lookup_field': 'document_version_id', + 'lookup_url_kwarg': 'document_version_id', + }, + { + 'lookup_field': 'pk', + 'lookup_url_kwarg': 'embedded_signature_id', + }, + ), + view_name='rest_api:embeddedsignature-detail' + ) + passphrase = serializers.CharField(required=False, write_only=True) + + class Meta: + fields = ( + 'date', 'document_version_url', 'key_id', 'signature_id', + 'passphrase', 'public_key_fingerprint', 'url' + ) + model = EmbeddedSignature + + def create(self, validated_data): + key_id = validated_data.pop('key_id') + passphrase = validated_data.pop('passphrase', None) + + key_queryset = AccessControlList.objects.restrict_queryset( + permission=permission_key_sign, queryset=Key.objects.all(), + user=self.context['request'].user + ) + + try: + key = key_queryset.get(fingerprint__endswith=key_id) + except Key.DoesNotExist: + raise ValidationError( + { + 'key_id': [ + 'Key "{}" not found.'.format(key_id) + ] + }, code='invalid' + ) + + signature = EmbeddedSignature.objects.sign_document_version( + document_version=self.context['document_version'], key=key, + passphrase=passphrase, user=self.context['request'].user + ) + + return signature diff --git a/mayan/apps/document_signatures/tests/literals.py b/mayan/apps/document_signatures/tests/literals.py index 040366be1e..49aebf355a 100644 --- a/mayan/apps/document_signatures/tests/literals.py +++ b/mayan/apps/document_signatures/tests/literals.py @@ -12,9 +12,9 @@ TEST_SIGNATURE_FILE_PATH = os.path.join( settings.BASE_DIR, 'apps', 'document_signatures', 'tests', 'contrib', 'sample_documents', 'mayan_11_1.pdf.sig' ) -TEST_KEY_FILE = os.path.join( +TEST_KEY_FILE_PATH = os.path.join( settings.BASE_DIR, 'apps', 'document_signatures', 'tests', 'contrib', 'sample_documents', 'key0x5F3F7F75D210724D.asc' ) -TEST_KEY_ID = '5F3F7F75D210724D' +TEST_KEY_PUBLIC_ID = '5F3F7F75D210724D' TEST_SIGNATURE_ID = 'XVkoGKw35yU1iq11dZPiv7uAY7k' diff --git a/mayan/apps/document_signatures/tests/mixins.py b/mayan/apps/document_signatures/tests/mixins.py index aafafa93f8..e0da4b49c7 100644 --- a/mayan/apps/document_signatures/tests/mixins.py +++ b/mayan/apps/document_signatures/tests/mixins.py @@ -3,13 +3,116 @@ from __future__ import absolute_import, unicode_literals from django.core.files import File from mayan.apps.django_gpg.models import Key +from mayan.apps.django_gpg.tests.literals import TEST_KEY_PRIVATE_PASSPHRASE from ..models import DetachedSignature -from .literals import TEST_KEY_FILE, TEST_SIGNATURE_FILE_PATH +from .literals import TEST_KEY_FILE_PATH, TEST_SIGNATURE_FILE_PATH -class SignaturesTestMixin(object): +class DetachedSignatureAPIViewTestMixin(object): + def _request_test_document_signature_detached_create_view(self): + return self.post( + viewname='rest_api:document-version-signature-detached-list', + kwargs={ + 'document_id': self.test_document.pk, + 'document_version_id': self.test_document_version.pk + }, data={ + 'key_id': self.test_key_private.key_id, + 'passphrase': TEST_KEY_PRIVATE_PASSPHRASE + } + ) + + def _request_test_document_signature_detached_delete_view(self): + return self.delete( + viewname='rest_api:detachedsignature-detail', + kwargs={ + 'document_id': self.test_document.pk, + 'document_version_id': self.test_document_version.pk, + 'detached_signature_id': self.test_document_version.signatures.first().pk + } + ) + + def _request_test_document_signature_detached_detail_view(self): + return self.get( + viewname='rest_api:detachedsignature-detail', + kwargs={ + 'document_id': self.test_document.pk, + 'document_version_id': self.test_document_version.pk, + 'detached_signature_id': self.test_document_version.signatures.first().pk + } + ) + + def _request_test_document_signature_detached_list_view(self): + return self.get( + viewname='rest_api:document-version-signature-detached-list', + kwargs={ + 'document_id': self.test_document.pk, + 'document_version_id': self.test_document_version.pk + } + ) + + +class DetachedSignatureViewTestMixin(object): + def _request_test_document_version_signature_download_view(self): + return self.get( + viewname='signatures:document_version_signature_download', + kwargs={'pk': self.test_signature.pk} + ) + + def _request_test_document_version_signature_upload_view(self): + with open(TEST_SIGNATURE_FILE_PATH, mode='rb') as file_object: + return self.post( + viewname='signatures:document_version_signature_upload', + kwargs={'pk': self.test_document.latest_version.pk}, + data={'signature_file': file_object} + ) + + +class EmbeddedSignatureAPIViewTestMixin(object): + def _request_test_document_signature_embedded_create_view(self): + return self.post( + viewname='rest_api:document-version-signature-embedded-list', + kwargs={ + 'document_id': self.test_document.pk, + 'document_version_id': self.test_document_version.pk + }, data={ + 'key_id': self.test_key_private.key_id, + 'passphrase': TEST_KEY_PRIVATE_PASSPHRASE + } + ) + + def _request_test_document_signature_embedded_delete_view(self): + return self.delete( + viewname='rest_api:embeddedsignature-detail', + kwargs={ + 'document_id': self.test_document.pk, + 'document_version_id': self.test_document_version.pk, + 'embedded_signature_id': self.test_document_version.signatures.first().pk + } + ) + + def _request_test_document_signature_embedded_detail_view(self): + return self.get( + viewname='rest_api:embeddedsignature-detail', + kwargs={ + 'document_id': self.test_document.pk, + 'document_version_id': self.test_document_version.pk, + 'embedded_signature_id': self.test_document_version.signatures.first().pk + } + ) + + def _request_test_document_signature_embedded_list_view(self): + return self.get( + viewname='rest_api:document-version-signature-embedded-list', + kwargs={ + 'document_id': self.test_document.pk, + 'document_version_id': self.test_document_version.pk + } + ) + + +class SignatureTestMixin(object): def _create_test_detached_signature(self): with open(TEST_SIGNATURE_FILE_PATH, mode='rb') as file_object: self.test_signature = DetachedSignature.objects.create( @@ -17,6 +120,33 @@ class SignaturesTestMixin(object): signature_file=File(file_object) ) - def _create_test_key(self): - with open(TEST_KEY_FILE, mode='rb') as file_object: - self.test_key = Key.objects.create(key_data=file_object.read()) + def _create_test_public_key(self): + with open(TEST_KEY_FILE_PATH, mode='rb') as file_object: + self.test_key_public = Key.objects.create( + key_data=file_object.read() + ) + + +class SignatureViewTestMixin(object): + def _request_test_document_version_signature_delete_view(self): + return self.post( + viewname='signatures:document_version_signature_delete', + kwargs={'pk': self.test_signature.pk} + ) + + def _request_test_document_version_signature_details_view(self): + return self.get( + viewname='signatures:document_version_signature_details', + kwargs={'pk': self.test_signature.pk} + ) + + def _request_test_document_version_signature_list_view(self, document): + return self.get( + viewname='signatures:document_version_signature_list', + kwargs={'pk': self.test_document.latest_version.pk} + ) + + def _request_all_test_document_version_signature_verify_view(self): + return self.post( + viewname='signatures:all_document_version_signature_verify' + ) diff --git a/mayan/apps/document_signatures/tests/test_api.py b/mayan/apps/document_signatures/tests/test_api.py new file mode 100644 index 0000000000..b5b4efb67d --- /dev/null +++ b/mayan/apps/document_signatures/tests/test_api.py @@ -0,0 +1,341 @@ +from __future__ import unicode_literals + +from rest_framework import status + +from mayan.apps.django_gpg.permissions import permission_key_sign +from mayan.apps.django_gpg.tests.mixins import KeyTestMixin +from mayan.apps.documents.tests.mixins import DocumentTestMixin +from mayan.apps.rest_api.tests.base import BaseAPITestCase + +from ..permissions import ( + permission_document_version_sign_detached, + permission_document_version_sign_embedded, + permission_document_version_signature_delete, + permission_document_version_signature_view +) + +from .literals import TEST_KEY_PUBLIC_ID, TEST_SIGNED_DOCUMENT_PATH +from .mixins import ( + DetachedSignatureAPIViewTestMixin, EmbeddedSignatureAPIViewTestMixin, + SignatureTestMixin +) + + +class DetachedSignatureDocumentAPIViewTestCase( + DocumentTestMixin, DetachedSignatureAPIViewTestMixin, + KeyTestMixin, SignatureTestMixin, BaseAPITestCase +): + auto_upload_document = False + + def test_document_signature_detached_delete_no_permission(self): + self.upload_document() + self._create_test_detached_signature() + + signatures = self.test_document.latest_version.signatures.count() + + response = self._request_test_document_signature_detached_delete_view() + self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) + + self.assertEqual( + self.test_document.latest_version.signatures.count(), + signatures + ) + + def test_document_signature_detached_delete_with_access(self): + self.upload_document() + self._create_test_detached_signature() + + signatures = self.test_document.latest_version.signatures.count() + + self.grant_access( + obj=self.test_document, + permission=permission_document_version_signature_delete + ) + + response = self._request_test_document_signature_detached_delete_view() + self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + + self.assertEqual( + self.test_document.latest_version.signatures.count(), + signatures - 1 + ) + + def test_document_signature_detached_detail_no_permission(self): + self.upload_document() + self._create_test_detached_signature() + + response = self._request_test_document_signature_detached_detail_view() + self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) + + def test_document_signature_detached_detail_with_access(self): + self.upload_document() + self._create_test_detached_signature() + + self.grant_access( + obj=self.test_document, + permission=permission_document_version_signature_view + ) + + response = self._request_test_document_signature_detached_detail_view() + self.assertEqual(response.status_code, status.HTTP_200_OK) + + self.assertEqual( + response.data['key_id'], TEST_KEY_PUBLIC_ID + ) + + def test_document_signature_detached_create_view_no_permission(self): + self.upload_document() + self._create_test_key_private() + + signatures = self.test_document.latest_version.signatures.count() + + response = self._request_test_document_signature_detached_create_view() + self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) + + self.assertEqual( + self.test_document.latest_version.signatures.count(), + signatures + ) + + def test_document_signature_detached_create_view_with_document_access(self): + self.upload_document() + self._create_test_key_private() + + signatures = self.test_document.latest_version.signatures.count() + + self.grant_access( + obj=self.test_document, + permission=permission_document_version_sign_detached + ) + + response = self._request_test_document_signature_detached_create_view() + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + + self.assertEqual( + self.test_document.latest_version.signatures.count(), + signatures + ) + + def test_document_signature_detached_create_view_with_key_access(self): + self.upload_document() + self._create_test_key_private() + + signatures = self.test_document.latest_version.signatures.count() + + self.grant_access( + obj=self.test_key_private, + permission=permission_key_sign + ) + + response = self._request_test_document_signature_detached_create_view() + self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) + + self.assertEqual( + self.test_document.latest_version.signatures.count(), + signatures + ) + + def test_document_signature_detached_create_view_with_full_access(self): + self.upload_document() + self._create_test_key_private() + + signatures = self.test_document.latest_version.signatures.count() + + self.grant_access( + obj=self.test_document, + permission=permission_document_version_sign_detached + ) + self.grant_access( + obj=self.test_key_private, + permission=permission_key_sign + ) + + response = self._request_test_document_signature_detached_create_view() + self.assertEqual(response.status_code, status.HTTP_201_CREATED) + + self.assertEqual( + self.test_document.latest_version.signatures.count(), + signatures + 1 + ) + + def test_document_signature_detached_list_view_no_permission(self): + self.upload_document() + self._create_test_detached_signature() + + response = self._request_test_document_signature_detached_list_view() + self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) + + def test_document_signature_detached_list_view_with_access(self): + self.upload_document() + self._create_test_detached_signature() + + self.grant_access( + obj=self.test_document, + permission=permission_document_version_signature_view + ) + + response = self._request_test_document_signature_detached_list_view() + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual( + response.data['results'][0]['key_id'], TEST_KEY_PUBLIC_ID + ) + + +class EmbeddedSignatureDocumentAPIViewTestCase( + DocumentTestMixin, EmbeddedSignatureAPIViewTestMixin, + KeyTestMixin, BaseAPITestCase +): + auto_upload_document = False + + def test_document_signature_embedded_delete_no_permission(self): + self.test_document_path = TEST_SIGNED_DOCUMENT_PATH + self.upload_document() + + signatures = self.test_document.latest_version.signatures.count() + + response = self._request_test_document_signature_embedded_delete_view() + self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) + + self.assertEqual( + self.test_document.latest_version.signatures.count(), + signatures + ) + + def test_document_signature_embedded_delete_with_access(self): + self.test_document_path = TEST_SIGNED_DOCUMENT_PATH + self.upload_document() + + signatures = self.test_document.latest_version.signatures.count() + + self.grant_access( + obj=self.test_document, + permission=permission_document_version_signature_delete + ) + + response = self._request_test_document_signature_embedded_delete_view() + self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + + self.assertEqual( + self.test_document.latest_version.signatures.count(), + signatures - 1 + ) + + def test_document_signature_embedded_detail_no_permission(self): + self.test_document_path = TEST_SIGNED_DOCUMENT_PATH + self.upload_document() + + response = self._request_test_document_signature_embedded_detail_view() + self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) + + def test_document_signature_embedded_detail_with_access(self): + self.test_document_path = TEST_SIGNED_DOCUMENT_PATH + self.upload_document() + + self.grant_access( + obj=self.test_document, + permission=permission_document_version_signature_view + ) + + response = self._request_test_document_signature_embedded_detail_view() + self.assertEqual(response.status_code, status.HTTP_200_OK) + + self.assertEqual( + response.data['key_id'], TEST_KEY_PUBLIC_ID + ) + + def test_document_signature_embedded_create_view_no_permission(self): + self.upload_document() + self._create_test_key_private() + + signatures = self.test_document.latest_version.signatures.count() + + response = self._request_test_document_signature_embedded_create_view() + self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) + + self.assertEqual( + self.test_document.latest_version.signatures.count(), + signatures + ) + + def test_document_signature_embedded_create_view_with_document_access(self): + self.upload_document() + self._create_test_key_private() + + signatures = self.test_document.latest_version.signatures.count() + + self.grant_access( + obj=self.test_document, + permission=permission_document_version_sign_embedded + ) + + response = self._request_test_document_signature_embedded_create_view() + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + + self.assertEqual( + self.test_document.latest_version.signatures.count(), + signatures + ) + + def test_document_signature_embedded_create_view_with_key_access(self): + self.upload_document() + self._create_test_key_private() + + signatures = self.test_document.latest_version.signatures.count() + + self.grant_access( + obj=self.test_key_private, + permission=permission_key_sign + ) + + response = self._request_test_document_signature_embedded_create_view() + self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) + + self.assertEqual( + self.test_document.latest_version.signatures.count(), + signatures + ) + + def test_document_signature_embedded_create_view_with_full_access(self): + self.upload_document() + self._create_test_key_private() + + signatures = self.test_document.latest_version.signatures.count() + + self.grant_access( + obj=self.test_document, + permission=permission_document_version_sign_embedded + ) + self.grant_access( + obj=self.test_key_private, + permission=permission_key_sign + ) + + response = self._request_test_document_signature_embedded_create_view() + self.assertEqual(response.status_code, status.HTTP_201_CREATED) + + self.assertEqual( + self.test_document.latest_version.signatures.count(), + signatures + 1 + ) + + def test_document_signature_embedded_list_view_no_permission(self): + self.test_document_path = TEST_SIGNED_DOCUMENT_PATH + self.upload_document() + + response = self._request_test_document_signature_embedded_list_view() + self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) + + def test_document_signature_embedded_list_view_with_access(self): + self.test_document_path = TEST_SIGNED_DOCUMENT_PATH + self.upload_document() + + self.grant_access( + obj=self.test_document, + permission=permission_document_version_signature_view + ) + + response = self._request_test_document_signature_embedded_list_view() + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual( + response.data['results'][0]['key_id'], TEST_KEY_PUBLIC_ID + ) diff --git a/mayan/apps/document_signatures/tests/test_links.py b/mayan/apps/document_signatures/tests/test_links.py index 8ad2d010c2..55f3bf703b 100644 --- a/mayan/apps/document_signatures/tests/test_links.py +++ b/mayan/apps/document_signatures/tests/test_links.py @@ -14,10 +14,12 @@ from ..permissions import ( permission_document_version_signature_view ) from .literals import TEST_SIGNED_DOCUMENT_PATH -from .mixins import SignaturesTestMixin +from .mixins import SignatureTestMixin -class DocumentSignatureLinksTestCase(SignaturesTestMixin, GenericDocumentViewTestCase): +class DocumentSignatureLinksTestCase( + SignatureTestMixin, GenericDocumentViewTestCase +): def test_document_version_signature_detail_link_no_permission(self): self.test_document_path = TEST_SIGNED_DOCUMENT_PATH self.upload_document() diff --git a/mayan/apps/document_signatures/tests/test_models.py b/mayan/apps/document_signatures/tests/test_models.py index 2ac5c03ab4..00e4cfd298 100644 --- a/mayan/apps/document_signatures/tests/test_models.py +++ b/mayan/apps/document_signatures/tests/test_models.py @@ -3,10 +3,8 @@ from __future__ import unicode_literals import hashlib import time -from mayan.apps.django_gpg.models import Key -from mayan.apps.django_gpg.tests.literals import ( - TEST_KEY_DATA, TEST_KEY_PASSPHRASE -) +from mayan.apps.django_gpg.tests.literals import TEST_KEY_PRIVATE_PASSPHRASE +from mayan.apps.django_gpg.tests.mixins import KeyTestMixin from mayan.apps.documents.models import DocumentVersion from mayan.apps.documents.tests.base import GenericDocumentTestCase from mayan.apps.documents.tests.literals import TEST_DOCUMENT_PATH @@ -14,11 +12,13 @@ from mayan.apps.documents.tests.literals import TEST_DOCUMENT_PATH from ..models import DetachedSignature, EmbeddedSignature from ..tasks import task_verify_missing_embedded_signature -from .literals import TEST_SIGNED_DOCUMENT_PATH, TEST_KEY_ID, TEST_SIGNATURE_ID -from .mixins import SignaturesTestMixin +from .literals import ( + TEST_SIGNED_DOCUMENT_PATH, TEST_KEY_PUBLIC_ID, TEST_SIGNATURE_ID +) +from .mixins import SignatureTestMixin -class DocumentSignaturesTestCase(SignaturesTestMixin, GenericDocumentTestCase): +class DocumentSignaturesTestCase(SignatureTestMixin, GenericDocumentTestCase): auto_upload_document = False def test_embedded_signature_no_key(self): @@ -31,7 +31,7 @@ class DocumentSignaturesTestCase(SignaturesTestMixin, GenericDocumentTestCase): self.assertEqual( signature.document_version, self.test_document.latest_version ) - self.assertEqual(signature.key_id, TEST_KEY_ID) + self.assertEqual(signature.key_id, TEST_KEY_PUBLIC_ID) self.assertEqual(signature.signature_id, None) def test_embedded_signature_post_key_verify(self): @@ -44,17 +44,17 @@ class DocumentSignaturesTestCase(SignaturesTestMixin, GenericDocumentTestCase): self.assertEqual( signature.document_version, self.test_document.latest_version ) - self.assertEqual(signature.key_id, TEST_KEY_ID) + self.assertEqual(signature.key_id, TEST_KEY_PUBLIC_ID) self.assertEqual(signature.signature_id, None) - self._create_test_key() + self._create_test_public_key() signature = EmbeddedSignature.objects.first() self.assertEqual(signature.signature_id, TEST_SIGNATURE_ID) def test_embedded_signature_post_no_key_verify(self): - self._create_test_key() + self._create_test_public_key() self.test_document_path = TEST_SIGNED_DOCUMENT_PATH self.upload_document() @@ -65,17 +65,17 @@ class DocumentSignaturesTestCase(SignaturesTestMixin, GenericDocumentTestCase): self.assertEqual( signature.document_version, self.test_document.latest_version ) - self.assertEqual(signature.key_id, TEST_KEY_ID) + self.assertEqual(signature.key_id, TEST_KEY_PUBLIC_ID) self.assertEqual(signature.signature_id, TEST_SIGNATURE_ID) - self.test_key.delete() + self.test_key_public.delete() signature = EmbeddedSignature.objects.first() self.assertEqual(signature.signature_id, None) def test_embedded_signature_with_key(self): - self._create_test_key() + self._create_test_public_key() self.test_document_path = TEST_SIGNED_DOCUMENT_PATH self.upload_document() @@ -87,9 +87,9 @@ class DocumentSignaturesTestCase(SignaturesTestMixin, GenericDocumentTestCase): signature.document_version, self.test_document.latest_version ) - self.assertEqual(signature.key_id, TEST_KEY_ID) + self.assertEqual(signature.key_id, TEST_KEY_PUBLIC_ID) self.assertEqual( - signature.public_key_fingerprint, self.test_key.fingerprint + signature.public_key_fingerprint, self.test_key_public.fingerprint ) self.assertEqual(signature.signature_id, TEST_SIGNATURE_ID) @@ -102,13 +102,14 @@ class DocumentSignaturesTestCase(SignaturesTestMixin, GenericDocumentTestCase): self.assertEqual(DetachedSignature.objects.count(), 1) self.assertEqual( - self.test_signature.document_version, self.test_document.latest_version + self.test_signature.document_version, + self.test_document.latest_version ) - self.assertEqual(self.test_signature.key_id, TEST_KEY_ID) + self.assertEqual(self.test_signature.key_id, TEST_KEY_PUBLIC_ID) self.assertEqual(self.test_signature.public_key_fingerprint, None) def test_detached_signature_with_key(self): - self._create_test_key() + self._create_test_public_key() self.test_document_path = TEST_DOCUMENT_PATH self.upload_document() @@ -117,12 +118,13 @@ class DocumentSignaturesTestCase(SignaturesTestMixin, GenericDocumentTestCase): self.assertEqual(DetachedSignature.objects.count(), 1) self.assertEqual( - self.test_signature.document_version, self.test_document.latest_version + self.test_signature.document_version, + self.test_document.latest_version ) - self.assertEqual(self.test_signature.key_id, TEST_KEY_ID) + self.assertEqual(self.test_signature.key_id, TEST_KEY_PUBLIC_ID) self.assertEqual( self.test_signature.public_key_fingerprint, - self.test_key.fingerprint + self.test_key_public.fingerprint ) def test_detached_signature_post_key_verify(self): @@ -137,19 +139,19 @@ class DocumentSignaturesTestCase(SignaturesTestMixin, GenericDocumentTestCase): self.test_signature.document_version, self.test_document.latest_version ) - self.assertEqual(self.test_signature.key_id, TEST_KEY_ID) + self.assertEqual(self.test_signature.key_id, TEST_KEY_PUBLIC_ID) self.assertEqual(self.test_signature.public_key_fingerprint, None) - self._create_test_key() + self._create_test_public_key() signature = DetachedSignature.objects.first() self.assertEqual( - signature.public_key_fingerprint, self.test_key.fingerprint + signature.public_key_fingerprint, self.test_key_public.fingerprint ) def test_detached_signature_post_no_key_verify(self): - self._create_test_key() + self._create_test_public_key() self.test_document_path = TEST_DOCUMENT_PATH self.upload_document() @@ -161,13 +163,13 @@ class DocumentSignaturesTestCase(SignaturesTestMixin, GenericDocumentTestCase): self.test_signature.document_version, self.test_document.latest_version ) - self.assertEqual(self.test_signature.key_id, TEST_KEY_ID) + self.assertEqual(self.test_signature.key_id, TEST_KEY_PUBLIC_ID) self.assertEqual( self.test_signature.public_key_fingerprint, - self.test_key.fingerprint + self.test_key_public.fingerprint ) - self.test_key.delete() + self.test_key_public.delete() signature = DetachedSignature.objects.first() @@ -198,10 +200,10 @@ class DocumentSignaturesTestCase(SignaturesTestMixin, GenericDocumentTestCase): signature = EmbeddedSignature.objects.first() self.assertEqual(signature.document_version, signed_version) - self.assertEqual(signature.key_id, TEST_KEY_ID) + self.assertEqual(signature.key_id, TEST_KEY_PUBLIC_ID) -class EmbeddedSignaturesTestCase(GenericDocumentTestCase): +class EmbeddedSignaturesTestCase(KeyTestMixin, GenericDocumentTestCase): auto_upload_document = False def test_unsigned_document_version_method(self): @@ -255,7 +257,7 @@ class EmbeddedSignaturesTestCase(GenericDocumentTestCase): ) def test_signing(self): - self.test_key = Key.objects.create(key_data=TEST_KEY_DATA) + self._create_test_key_private() self.test_document_path = TEST_DOCUMENT_PATH self.upload_document() @@ -266,15 +268,15 @@ class EmbeddedSignaturesTestCase(GenericDocumentTestCase): file_object.seek(0) original_hash = hashlib.sha256(file_object.read()).hexdigest() - new_version = EmbeddedSignature.objects.sign_document_version( + signature = EmbeddedSignature.objects.sign_document_version( document_version=self.test_document.latest_version, - key=self.test_key, - passphrase=TEST_KEY_PASSPHRASE + key=self.test_key_private, + passphrase=TEST_KEY_PRIVATE_PASSPHRASE ) self.assertEqual(EmbeddedSignature.objects.count(), 1) - with new_version.open() as file_object: + with signature.document_version.open() as file_object: file_object.seek(0, 2) new_size = file_object.tell() file_object.seek(0) diff --git a/mayan/apps/document_signatures/tests/test_views.py b/mayan/apps/document_signatures/tests/test_views.py index dda0ec0639..1c3ee0d01b 100644 --- a/mayan/apps/document_signatures/tests/test_views.py +++ b/mayan/apps/document_signatures/tests/test_views.py @@ -15,59 +15,23 @@ from ..permissions import ( permission_document_version_signature_view ) -from .literals import TEST_SIGNATURE_FILE_PATH, TEST_SIGNED_DOCUMENT_PATH -from .mixins import SignaturesTestMixin +from .literals import TEST_SIGNED_DOCUMENT_PATH +from .mixins import ( + DetachedSignatureViewTestMixin, SignatureTestMixin, + SignatureViewTestMixin +) TEST_UNSIGNED_DOCUMENT_COUNT = 4 TEST_SIGNED_DOCUMENT_COUNT = 2 -class SignaturesViewTestMixin(object): - def _request_test_document_version_signature_delete_view(self): - return self.post( - viewname='signatures:document_version_signature_delete', - kwargs={'pk': self.test_signature.pk} - ) - - def _request_test_document_version_signature_details_view(self): - return self.get( - viewname='signatures:document_version_signature_details', - kwargs={'pk': self.test_signature.pk} - ) - - def _request_test_document_version_signature_download_view(self): - return self.get( - viewname='signatures:document_version_signature_download', - kwargs={'pk': self.test_signature.pk} - ) - - def _request_test_document_version_signature_list_view(self, document): - return self.get( - viewname='signatures:document_version_signature_list', - kwargs={'pk': self.test_document.latest_version.pk} - ) - - def _request_test_document_version_signature_upload_view(self): - with open(TEST_SIGNATURE_FILE_PATH, mode='rb') as file_object: - return self.post( - viewname='signatures:document_version_signature_upload', - kwargs={'pk': self.test_document.latest_version.pk}, - data={'signature_file': file_object} - ) - - def _request_all_test_document_version_signature_verify_view(self): - return self.post( - viewname='signatures:all_document_version_signature_verify' - ) - - class SignaturesViewTestCase( - SignaturesTestMixin, SignaturesViewTestMixin, GenericDocumentViewTestCase + SignatureTestMixin, SignatureViewTestMixin, GenericDocumentViewTestCase ): auto_upload_document = False def test_signature_delete_view_no_permission(self): - self._create_test_key() + self._create_test_public_key() self.test_document_path = TEST_DOCUMENT_PATH self.upload_document() @@ -84,7 +48,7 @@ class SignaturesViewTestCase( self.assertEqual(DetachedSignature.objects.count(), 1) def test_signature_delete_view_with_access(self): - self._create_test_key() + self._create_test_public_key() self.test_document_path = TEST_DOCUMENT_PATH self.upload_document() @@ -105,7 +69,7 @@ class SignaturesViewTestCase( self.assertEqual(DetachedSignature.objects.count(), 0) def test_signature_detail_view_no_permission(self): - self._create_test_key() + self._create_test_public_key() self.test_document_path = TEST_DOCUMENT_PATH self.upload_document() @@ -116,7 +80,7 @@ class SignaturesViewTestCase( self.assertEqual(response.status_code, 404) def test_signature_detail_view_with_access(self): - self._create_test_key() + self._create_test_public_key() self.test_document_path = TEST_DOCUMENT_PATH self.upload_document() @@ -134,37 +98,8 @@ class SignaturesViewTestCase( status_code=200 ) - def test_signature_download_view_no_permission(self): - self.test_document_path = TEST_DOCUMENT_PATH - self.upload_document() - - self._create_test_detached_signature() - - response = self._request_test_document_version_signature_download_view() - self.assertEqual(response.status_code, 403) - - def test_signature_download_view_with_access(self): - self.test_document_path = TEST_DOCUMENT_PATH - self.upload_document() - - self._create_test_detached_signature() - - self.grant_access( - obj=self.test_document, - permission=permission_document_version_signature_download - ) - - self.expected_content_type = 'application/octet-stream; charset=utf-8' - - response = self._request_test_document_version_signature_download_view() - - with self.test_signature.signature_file as file_object: - assert_download_response( - self, response=response, content=file_object.read(), - ) - def test_signature_list_view_no_permission(self): - self._create_test_key() + self._create_test_public_key() self.test_document_path = TEST_DOCUMENT_PATH self.upload_document() @@ -174,10 +109,10 @@ class SignaturesViewTestCase( response = self._request_test_document_version_signature_list_view( document=self.test_document ) - self.assertEqual(response.status_code, 403) + self.assertEqual(response.status_code, 404) def test_signature_list_view_with_access(self): - self._create_test_key() + self._create_test_public_key() self.test_document_path = TEST_DOCUMENT_PATH self.upload_document() @@ -195,29 +130,6 @@ class SignaturesViewTestCase( self.assertEqual(response.status_code, 200) self.assertEqual(response.context['object_list'].count(), 1) - def test_signature_upload_view_no_permission(self): - self.test_document_path = TEST_DOCUMENT_PATH - self.upload_document() - - response = self._request_test_document_version_signature_upload_view() - self.assertEqual(response.status_code, 403) - - self.assertEqual(DetachedSignature.objects.count(), 0) - - def test_signature_upload_view_with_access(self): - self.test_document_path = TEST_DOCUMENT_PATH - self.upload_document() - - self.grant_access( - obj=self.test_document, - permission=permission_document_version_signature_upload - ) - - response = self._request_test_document_version_signature_upload_view() - self.assertEqual(response.status_code, 302) - - self.assertEqual(DetachedSignature.objects.count(), 1) - def test_missing_signature_verify_view_no_permission(self): # Silence converter logging self._silence_logger(name='mayan.apps.converter.backends') @@ -287,3 +199,62 @@ class SignaturesViewTestCase( EmbeddedSignature.objects.unsigned_document_versions().count(), TEST_UNSIGNED_DOCUMENT_COUNT ) + + +class DetachedSignaturesViewTestCase( + SignatureTestMixin, DetachedSignatureViewTestMixin, + GenericDocumentViewTestCase +): + auto_upload_document = False + + def test_signature_download_view_no_permission(self): + self.test_document_path = TEST_DOCUMENT_PATH + self.upload_document() + + self._create_test_detached_signature() + + response = self._request_test_document_version_signature_download_view() + self.assertEqual(response.status_code, 403) + + def test_signature_download_view_with_access(self): + self.test_document_path = TEST_DOCUMENT_PATH + self.upload_document() + + self._create_test_detached_signature() + + self.grant_access( + obj=self.test_document, + permission=permission_document_version_signature_download + ) + + self.expected_content_type = 'application/octet-stream; charset=utf-8' + + response = self._request_test_document_version_signature_download_view() + + with self.test_signature.signature_file as file_object: + assert_download_response( + self, response=response, content=file_object.read(), + ) + + def test_signature_upload_view_no_permission(self): + self.test_document_path = TEST_DOCUMENT_PATH + self.upload_document() + + response = self._request_test_document_version_signature_upload_view() + self.assertEqual(response.status_code, 403) + + self.assertEqual(DetachedSignature.objects.count(), 0) + + def test_signature_upload_view_with_access(self): + self.test_document_path = TEST_DOCUMENT_PATH + self.upload_document() + + self.grant_access( + obj=self.test_document, + permission=permission_document_version_signature_upload + ) + + response = self._request_test_document_version_signature_upload_view() + self.assertEqual(response.status_code, 302) + + self.assertEqual(DetachedSignature.objects.count(), 1) diff --git a/mayan/apps/document_signatures/urls.py b/mayan/apps/document_signatures/urls.py index cbc1896df5..aec4ed0ccf 100644 --- a/mayan/apps/document_signatures/urls.py +++ b/mayan/apps/document_signatures/urls.py @@ -2,6 +2,11 @@ from __future__ import unicode_literals from django.conf.urls import url +from .api_views import ( + APIDocumentDetachedSignatureListView, APIDocumentDetachedSignatureView, + APIDocumentEmbeddedSignatureListView, APIDocumentEmbeddedSignatureView +) + from .views import ( AllDocumentSignatureVerifyView, DocumentVersionDetachedSignatureCreateView, DocumentVersionEmbeddedSignatureCreateView, @@ -52,3 +57,26 @@ urlpatterns = [ name='all_document_version_signature_verify' ), ] + +api_urls = [ + url( + regex=r'^documents/(?P[0-9]+)/versions/(?P[0-9]+)/signatures/detached/$', + view=APIDocumentDetachedSignatureListView.as_view(), + name='document-version-signature-detached-list' + ), + url( + regex=r'^documents/(?P[0-9]+)/versions/(?P[0-9]+)/signatures/detached/(?P[0-9]+)/$', + view=APIDocumentDetachedSignatureView.as_view(), + name='detachedsignature-detail' + ), + url( + regex=r'^documents/(?P[0-9]+)/versions/(?P[0-9]+)/signatures/embedded/$', + view=APIDocumentEmbeddedSignatureListView.as_view(), + name='document-version-signature-embedded-list' + ), + url( + regex=r'^documents/(?P[0-9]+)/versions/(?P[0-9]+)/signatures/embedded/(?P[0-9]+)/$', + view=APIDocumentEmbeddedSignatureView.as_view(), + name='embeddedsignature-detail' + ), +] diff --git a/mayan/apps/document_signatures/views.py b/mayan/apps/document_signatures/views.py index c5a1d8535e..6e890104a4 100644 --- a/mayan/apps/document_signatures/views.py +++ b/mayan/apps/document_signatures/views.py @@ -3,7 +3,6 @@ from __future__ import absolute_import, unicode_literals import logging from django.contrib import messages -from django.core.files import File from django.http import HttpResponseRedirect from django.shortcuts import get_object_or_404 from django.template import RequestContext @@ -16,10 +15,10 @@ from mayan.apps.common.generics import ( ConfirmView, FormView, SingleObjectCreateView, SingleObjectDeleteView, SingleObjectDetailView, SingleObjectDownloadView, SingleObjectListView ) +from mayan.apps.common.mixins import ExternalObjectMixin from mayan.apps.django_gpg.exceptions import NeedPassphrase, PassphraseError from mayan.apps.django_gpg.permissions import permission_key_sign from mayan.apps.documents.models import DocumentVersion -from mayan.apps.storage.utils import TemporaryFile from .forms import ( DocumentVersionSignatureCreateForm, @@ -35,7 +34,7 @@ from .links import ( link_document_version_signature_embedded_create, link_document_version_signature_upload ) -from .models import DetachedSignature, SignatureBaseModel +from .models import DetachedSignature, EmbeddedSignature, SignatureBaseModel from .permissions import ( permission_document_version_sign_detached, permission_document_version_sign_embedded, @@ -62,11 +61,10 @@ class DocumentVersionDetachedSignatureCreateView(FormView): ) try: - with self.get_document_version().open() as file_object: - detached_signature = key.sign_file( - file_object=file_object, detached=True, - passphrase=passphrase - ) + DetachedSignature.objects.sign_document_version( + document_version=self.get_document_version(), + key=key, passphrase=passphrase, user=self.request.user + ) except NeedPassphrase: messages.error( message=_('Passphrase is needed to unlock this key.'), @@ -90,17 +88,6 @@ class DocumentVersionDetachedSignatureCreateView(FormView): ) ) else: - temporary_file_object = TemporaryFile() - temporary_file_object.write(detached_signature.data) - temporary_file_object.seek(0) - - DetachedSignature.objects.create( - document_version=self.get_document_version(), - signature_file=File(temporary_file_object) - ) - - temporary_file_object.close() - messages.success( message=_('Document version signed successfully.'), request=self.request @@ -156,10 +143,10 @@ class DocumentVersionEmbeddedSignatureCreateView(FormView): ) try: - with self.get_document_version().open() as file_object: - signature_result = key.sign_file( - binary=True, file_object=file_object, passphrase=passphrase - ) + new_version = EmbeddedSignature.objects.sign_document_version( + document_version=self.get_document_version(), + key=key, passphrase=passphrase, user=self.request.user + ) except NeedPassphrase: messages.error( message=_('Passphrase is needed to unlock this key.'), @@ -183,16 +170,6 @@ class DocumentVersionEmbeddedSignatureCreateView(FormView): ) ) else: - temporary_file_object = TemporaryFile() - temporary_file_object.write(signature_result.data) - temporary_file_object.seek(0) - - new_version = self.get_document_version().document.new_version( - file_object=temporary_file_object, _user=self.request.user - ) - - temporary_file_object.close() - messages.success( message=_('Document version signed successfully.'), request=self.request @@ -285,20 +262,11 @@ class DocumentVersionSignatureDownloadView(SingleObjectDownloadView): ) -class DocumentVersionSignatureListView(SingleObjectListView): - def dispatch(self, request, *args, **kwargs): - AccessControlList.objects.check_access( - obj=self.get_document_version(), - permissions=(permission_document_version_signature_view,), - user=request.user - ) - - return super( - DocumentVersionSignatureListView, self - ).dispatch(request, *args, **kwargs) - - def get_document_version(self): - return get_object_or_404(klass=DocumentVersion, pk=self.kwargs['pk']) +class DocumentVersionSignatureListView( + ExternalObjectMixin, SingleObjectListView +): + external_object_class = DocumentVersion + external_object_permission = permission_document_version_signature_view def get_extra_context(self): return { @@ -314,34 +282,34 @@ class DocumentVersionSignatureListView(SingleObjectListView): link_document_version_signature_detached_create.resolve( RequestContext( request=self.request, dict_={ - 'object': self.get_document_version() + 'object': self.external_object } ) ), link_document_version_signature_embedded_create.resolve( RequestContext( request=self.request, dict_={ - 'object': self.get_document_version() + 'object': self.external_object } ) ), link_document_version_signature_upload.resolve( RequestContext( request=self.request, dict_={ - 'object': self.get_document_version() + 'object': self.external_object } ) ), ], 'no_results_title': _('There are no signatures for this document.'), - 'object': self.get_document_version(), + 'object': self.external_object, 'title': _( 'Signatures for document version: %s' - ) % self.get_document_version(), + ) % self.external_object, } def get_source_queryset(self): - return self.get_document_version().signatures.all() + return self.external_object.signatures.all() class DocumentVersionSignatureUploadView(SingleObjectCreateView): diff --git a/mayan/apps/rest_api/relations.py b/mayan/apps/rest_api/relations.py new file mode 100644 index 0000000000..c1371638c4 --- /dev/null +++ b/mayan/apps/rest_api/relations.py @@ -0,0 +1,91 @@ +from __future__ import unicode_literals + +from django.apps import apps +from django.core.exceptions import ImproperlyConfigured +from django.db.models import Manager +from django.db.models.query import QuerySet + +from rest_framework import serializers +from rest_framework.relations import HyperlinkedIdentityField + +from mayan.apps.common.utils import resolve_attribute + + +class FilteredPrimaryKeyRelatedField(serializers.PrimaryKeyRelatedField): + def __init__(self, **kwargs): + self.source_model = kwargs.pop('source_model', None) + self.source_permission = kwargs.pop('source_permission', None) + self.source_queryset = kwargs.pop('source_queryset', None) + self.source_queryset_method = kwargs.pop('source_queryset_method', None) + super(FilteredPrimaryKeyRelatedField, self).__init__(**kwargs) + + def get_queryset(self): + AccessControlList = apps.get_model( + app_label='acls', model_name='AccessControlList' + ) + + if self.source_model: + queryset = self.source_model._meta.default_manager.all() + elif self.source_queryset: + queryset = self.source_queryset + if isinstance(queryset, (QuerySet, Manager)): + # Ensure queryset is re-evaluated whenever used. + queryset = queryset.all() + else: + method_name = self.source_queryset_method or 'get_{}_queryset'.format( + self.field_name + ) + try: + queryset = getattr(self.parent, method_name)() + except AttributeError: + raise ImproperlyConfigured( + 'Need to provide a source_model, a ' + 'source_queryset, a source_queryset_method, or ' + 'a method named "%s".' % method_name + ) + + assert 'request' in self.context, ( + "`%s` requires the request in the serializer" + " context. Add `context={'request': request}` when instantiating " + "the serializer." % self.__class__.__name__ + ) + + request = self.context['request'] + + if self.source_permission: + return AccessControlList.objects.restrict_queryset( + permission=self.source_permission, queryset=queryset, + user=request.user + ) + else: + return queryset + + +class MultiKwargHyperlinkedIdentityField(HyperlinkedIdentityField): + def __init__(self, *args, **kwargs): + self.view_kwargs = kwargs.pop('view_kwargs', []) + super(MultiKwargHyperlinkedIdentityField, self).__init__(*args, **kwargs) + + def get_url(self, obj, view_name, request, format): + """ + Extends HyperlinkedRelatedField to allow passing more than one view + keyword argument. + ---- + Given an object, return the URL that hyperlinks to the object. + + May raise a `NoReverseMatch` if the `view_name` and `lookup_field` + attributes are not configured to correctly match the URL conf. + """ + # Unsaved objects will not yet have a valid URL. + if hasattr(obj, 'pk') and obj.pk in (None, ''): + return None + + kwargs = {} + for entry in self.view_kwargs: + kwargs[entry['lookup_url_kwarg']] = resolve_attribute( + obj=obj, attribute=entry['lookup_field'] + ) + + return self.reverse( + viewname=view_name, kwargs=kwargs, request=request, format=format + )