Add view, task and post upgrade signal handler to verify all documents for embedded signatures.

This commit is contained in:
Roberto Rosario
2016-03-29 20:13:58 -04:00
parent d83a80c65b
commit 7da6cf1863
10 changed files with 287 additions and 27 deletions

View File

@@ -10,13 +10,18 @@ from django.utils.translation import ugettext_lazy as _
from acls import ModelPermission from acls import ModelPermission
from common import ( from common import (
MayanAppConfig, menu_object, menu_sidebar MayanAppConfig, menu_object, menu_sidebar, menu_tools
) )
from common.signals import post_upgrade
from mayan.celery import app from mayan.celery import app
from navigation import SourceColumn from navigation import SourceColumn
from .handlers import unverify_signatures, verify_signatures from .handlers import (
unverify_key_signatures, verify_key_signatures,
verify_missing_embedded_signature
)
from .links import ( from .links import (
link_all_document_version_signature_verify,
link_document_version_signature_delete, link_document_version_signature_delete,
link_document_version_signature_details, link_document_version_signature_details,
link_document_version_signature_download, link_document_version_signature_download,
@@ -101,12 +106,18 @@ class DocumentSignaturesApp(MayanAppConfig):
app.conf.CELERY_ROUTES.update( app.conf.CELERY_ROUTES.update(
{ {
'document_signatures.tasks.task_verify_signatures': { 'document_signatures.tasks.task_verify_key_signatures': {
'queue': 'signatures' 'queue': 'signatures'
}, },
'document_signatures.tasks.task_unverify_signatures': { 'document_signatures.tasks.task_unverify_key_signatures': {
'queue': 'signatures' 'queue': 'signatures'
}, },
'document_signatures.tasks.task_verify_document_version': {
'queue': 'signatures'
},
'document_signatures.tasks.task_verify_missing_embedded_signature': {
'queue': 'tools'
},
} }
) )
@@ -126,13 +137,21 @@ class DocumentSignaturesApp(MayanAppConfig):
link_document_version_signature_upload, link_document_version_signature_upload,
), sources=(DocumentVersion,) ), sources=(DocumentVersion,)
) )
menu_tools.bind_links(
links=(link_all_document_version_signature_verify,)
)
post_delete.connect( post_delete.connect(
unverify_signatures, unverify_key_signatures,
dispatch_uid='unverify_signatures', dispatch_uid='unverify_key_signatures',
sender=Key sender=Key
) )
post_upgrade.connect(
verify_missing_embedded_signature,
dispatch_uid='verify_missing_embedded_signature',
)
post_save.connect( post_save.connect(
verify_signatures, verify_key_signatures,
dispatch_uid='verify_signatures', dispatch_uid='verify_key_signatures',
sender=Key sender=Key
) )

View File

@@ -1,15 +1,22 @@
from __future__ import unicode_literals from __future__ import unicode_literals
from .tasks import task_unverify_signatures, task_verify_signatures from .tasks import (
task_unverify_key_signatures, task_verify_missing_embedded_signature,
task_verify_key_signatures
)
def unverify_signatures(sender, **kwargs): def unverify_key_signatures(sender, **kwargs):
task_unverify_signatures.apply_async( task_unverify_key_signatures.apply_async(
kwargs=dict(key_id=kwargs['instance'].key_id) kwargs=dict(key_id=kwargs['instance'].key_id)
) )
def verify_signatures(sender, **kwargs): def verify_key_signatures(sender, **kwargs):
task_verify_signatures.apply_async( task_verify_key_signatures.apply_async(
kwargs=dict(key_pk=kwargs['instance'].pk) kwargs=dict(key_pk=kwargs['instance'].pk)
) )
def verify_missing_embedded_signature(sender, **kwargs):
task_verify_missing_embedded_signature.delay()

View File

@@ -9,6 +9,7 @@ from .permissions import (
permission_document_version_signature_delete, permission_document_version_signature_delete,
permission_document_version_signature_download, permission_document_version_signature_download,
permission_document_version_signature_upload, permission_document_version_signature_upload,
permission_document_version_signature_verify,
permission_document_version_signature_view permission_document_version_signature_view
) )
@@ -23,6 +24,11 @@ def is_detached_signature(context):
).is_detached ).is_detached
link_all_document_version_signature_verify = Link(
permissions=(permission_document_version_signature_verify,),
text=_('Verify all documents'),
view='signatures:all_document_version_signature_verify',
)
link_document_version_signature_delete = Link( link_document_version_signature_delete = Link(
args='resolved_object.pk', condition=is_detached_signature, args='resolved_object.pk', condition=is_detached_signature,
permissions=(permission_document_version_signature_delete,), permissions=(permission_document_version_signature_delete,),

View File

@@ -6,6 +6,7 @@ from django.db import models
from django_gpg.exceptions import DecryptionError from django_gpg.exceptions import DecryptionError
from django_gpg.models import Key from django_gpg.models import Key
from documents.models import DocumentVersion
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -24,3 +25,8 @@ class EmbeddedSignatureManager(models.Manager):
return file_object return file_object
else: else:
return file_object return file_object
def unsigned_document_versions(self):
return DocumentVersion.objects.exclude(
pk__in=self.values('document_version')
)

View File

@@ -20,6 +20,10 @@ permission_document_version_signature_upload = namespace.add_permission(
name='document_version_signature_upload', name='document_version_signature_upload',
label=_('Upload detached document signatures') label=_('Upload detached document signatures')
) )
permission_document_version_signature_verify = namespace.add_permission(
name='document_version_signature_verify',
label=_('Verify document signatures')
)
permission_document_version_signature_view = namespace.add_permission( permission_document_version_signature_view = namespace.add_permission(
name='document_version_signature_view', name='document_version_signature_view',
label=_('View details of document signatures') label=_('View details of document signatures')

View File

@@ -11,7 +11,7 @@ logger = logging.getLogger(__name__)
@app.task(bind=True, ignore_result=True) @app.task(bind=True, ignore_result=True)
def task_unverify_signatures(self, key_id): def task_unverify_key_signatures(self, key_id):
DetachedSignature = apps.get_model( DetachedSignature = apps.get_model(
app_label='document_signatures', model_name='DetachedSignature' app_label='document_signatures', model_name='DetachedSignature'
) )
@@ -28,7 +28,7 @@ def task_unverify_signatures(self, key_id):
@app.task(bind=True, ignore_result=True) @app.task(bind=True, ignore_result=True)
def task_verify_signatures(self, key_pk): def task_verify_key_signatures(self, key_pk):
Key = apps.get_model( Key = apps.get_model(
app_label='django_gpg', model_name='Key' app_label='django_gpg', model_name='Key'
) )
@@ -48,3 +48,29 @@ def task_verify_signatures(self, key_pk):
for signature in EmbeddedSignature.objects.filter(key_id__endswith=key.key_id).filter(signature_id__isnull=True): for signature in EmbeddedSignature.objects.filter(key_id__endswith=key.key_id).filter(signature_id__isnull=True):
signature.save() signature.save()
@app.task(bind=True, ignore_result=True)
def task_verify_missing_embedded_signature(self):
EmbeddedSignature = apps.get_model(
app_label='document_signatures', model_name='EmbeddedSignature'
)
for document_version in EmbeddedSignature.objects.unsigned_document_versions():
task_verify_document_version.apply_async(
kwargs=dict(document_version_pk=document_version.pk)
)
@app.task(bind=True, ignore_result=True)
def task_verify_document_version(self, document_version_pk):
DocumentVersion = apps.get_model(
app_label='documents', model_name='DocumentVersion'
)
EmbeddedSignature = apps.get_model(
app_label='document_signatures', model_name='EmbeddedSignature'
)
document_version = DocumentVersion.objects.get(pk=document_version_pk)
EmbeddedSignature.objects.create(document_version=document_version)

View File

@@ -6,10 +6,11 @@ from django.core.files import File
from django.test import TestCase, override_settings from django.test import TestCase, override_settings
from django_gpg.models import Key from django_gpg.models import Key
from documents.models import DocumentType from documents.models import DocumentType, DocumentVersion
from documents.tests import TEST_DOCUMENT_PATH, TEST_DOCUMENT_TYPE from documents.tests import TEST_DOCUMENT_PATH, TEST_DOCUMENT_TYPE
from ..models import DetachedSignature, EmbeddedSignature from ..models import DetachedSignature, EmbeddedSignature
from ..tasks import task_verify_missing_embedded_signature
from .literals import ( from .literals import (
TEST_SIGNED_DOCUMENT_PATH, TEST_SIGNATURE_FILE_PATH, TEST_KEY_FILE, TEST_SIGNED_DOCUMENT_PATH, TEST_SIGNATURE_FILE_PATH, TEST_KEY_FILE,
@@ -241,3 +242,69 @@ class DocumentSignaturesTestCase(TestCase):
self.assertEqual(signature.document_version, signed_version) self.assertEqual(signature.document_version, signed_version)
self.assertEqual(signature.key_id, TEST_KEY_ID) self.assertEqual(signature.key_id, TEST_KEY_ID)
@override_settings(OCR_AUTO_OCR=False)
class EmbeddedSignaturesTestCase(TestCase):
def setUp(self):
self.document_type = DocumentType.objects.create(
label=TEST_DOCUMENT_TYPE
)
def tearDown(self):
self.document_type.delete()
def test_unsigned_document_version_method(self):
TEST_UNSIGNED_DOCUMENT_COUNT = 3
TEST_SIGNED_DOCUMENT_COUNT = 3
for count in range(TEST_UNSIGNED_DOCUMENT_COUNT):
with open(TEST_DOCUMENT_PATH) as file_object:
self.document_type.new_document(
file_object=file_object
)
for count in range(TEST_SIGNED_DOCUMENT_COUNT):
with open(TEST_SIGNED_DOCUMENT_PATH) as file_object:
self.document_type.new_document(
file_object=file_object
)
self.assertEqual(
EmbeddedSignature.objects.unsigned_document_versions().count(),
TEST_UNSIGNED_DOCUMENT_COUNT
)
def test_task_verify_missing_embedded_signature(self):
old_hooks = DocumentVersion._post_save_hooks
DocumentVersion._post_save_hooks = {}
TEST_UNSIGNED_DOCUMENT_COUNT = 4
TEST_SIGNED_DOCUMENT_COUNT = 2
for count in range(TEST_UNSIGNED_DOCUMENT_COUNT):
with open(TEST_DOCUMENT_PATH) as file_object:
self.document_type.new_document(
file_object=file_object
)
for count in range(TEST_SIGNED_DOCUMENT_COUNT):
with open(TEST_SIGNED_DOCUMENT_PATH) as file_object:
self.document_type.new_document(
file_object=file_object
)
self.assertEqual(
EmbeddedSignature.objects.unsigned_document_versions().count(),
TEST_UNSIGNED_DOCUMENT_COUNT + TEST_SIGNED_DOCUMENT_COUNT
)
DocumentVersion._post_save_hooks = old_hooks
task_verify_missing_embedded_signature.delay()
self.assertEqual(
EmbeddedSignature.objects.unsigned_document_versions().count(),
TEST_UNSIGNED_DOCUMENT_COUNT
)

View File

@@ -5,21 +5,29 @@ from django.core.files import File
from django_downloadview.test import assert_download_response from django_downloadview.test import assert_download_response
from django_gpg.models import Key from django_gpg.models import Key
from documents.models import Document, DocumentVersion
from documents.tests.literals import TEST_DOCUMENT_PATH from documents.tests.literals import TEST_DOCUMENT_PATH
from documents.tests.test_views import GenericDocumentViewTestCase from documents.tests.test_views import GenericDocumentViewTestCase
from user_management.tests import ( from user_management.tests import (
TEST_USER_USERNAME, TEST_USER_PASSWORD TEST_USER_USERNAME, TEST_USER_PASSWORD
) )
from ..models import DetachedSignature from ..models import DetachedSignature, EmbeddedSignature
from ..permissions import ( from ..permissions import (
permission_document_version_signature_view, permission_document_version_signature_view,
permission_document_version_signature_delete, permission_document_version_signature_delete,
permission_document_version_signature_download, permission_document_version_signature_download,
permission_document_version_signature_upload, permission_document_version_signature_upload,
permission_document_version_signature_verify,
permission_document_version_signature_view
) )
from .literals import TEST_SIGNATURE_FILE_PATH, TEST_KEY_FILE from .literals import (
TEST_SIGNATURE_FILE_PATH, TEST_SIGNED_DOCUMENT_PATH, TEST_KEY_FILE
)
TEST_UNSIGNED_DOCUMENT_COUNT = 4
TEST_SIGNED_DOCUMENT_COUNT = 2
class SignaturesViewTestCase(GenericDocumentViewTestCase): class SignaturesViewTestCase(GenericDocumentViewTestCase):
@@ -45,7 +53,7 @@ class SignaturesViewTestCase(GenericDocumentViewTestCase):
args=(document.latest_version.pk,) args=(document.latest_version.pk,)
) )
self.assertContains(response, 'Total: 0', status_code=200) self.assertEqual(response.status_code, 403)
def test_signature_list_view_with_permission(self): def test_signature_list_view_with_permission(self):
with open(TEST_KEY_FILE) as file_object: with open(TEST_KEY_FILE) as file_object:
@@ -232,6 +240,10 @@ class SignaturesViewTestCase(GenericDocumentViewTestCase):
self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD) self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD)
self.role.permissions.add(
permission_document_version_signature_view.stored_permission
)
response = self.post( response = self.post(
'signatures:document_version_signature_delete', 'signatures:document_version_signature_delete',
args=(signature.pk,) args=(signature.pk,)
@@ -260,6 +272,9 @@ class SignaturesViewTestCase(GenericDocumentViewTestCase):
self.role.permissions.add( self.role.permissions.add(
permission_document_version_signature_delete.stored_permission permission_document_version_signature_delete.stored_permission
) )
self.role.permissions.add(
permission_document_version_signature_view.stored_permission
)
response = self.post( response = self.post(
'signatures:document_version_signature_delete', 'signatures:document_version_signature_delete',
@@ -268,3 +283,87 @@ class SignaturesViewTestCase(GenericDocumentViewTestCase):
self.assertContains(response, 'deleted', status_code=200) self.assertContains(response, 'deleted', status_code=200)
self.assertEqual(DetachedSignature.objects.count(), 0) self.assertEqual(DetachedSignature.objects.count(), 0)
def test_missing_signature_verify_view_no_permission(self):
for document in self.document_type.documents.all():
document.delete(to_trash=False)
from documents.models import DocumentType
old_hooks = DocumentVersion._post_save_hooks
DocumentVersion._post_save_hooks = {}
for count in range(TEST_UNSIGNED_DOCUMENT_COUNT):
with open(TEST_DOCUMENT_PATH) as file_object:
self.document_type.new_document(
file_object=file_object
)
for count in range(TEST_SIGNED_DOCUMENT_COUNT):
with open(TEST_SIGNED_DOCUMENT_PATH) as file_object:
self.document_type.new_document(
file_object=file_object
)
self.assertEqual(
EmbeddedSignature.objects.unsigned_document_versions().count(),
TEST_UNSIGNED_DOCUMENT_COUNT + TEST_SIGNED_DOCUMENT_COUNT
)
DocumentVersion._post_save_hooks = old_hooks
self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD)
response = self.post(
'signatures:all_document_version_signature_verify', follow=True
)
self.assertEqual(response.status_code, 403)
self.assertEqual(
EmbeddedSignature.objects.unsigned_document_versions().count(),
TEST_UNSIGNED_DOCUMENT_COUNT + TEST_SIGNED_DOCUMENT_COUNT
)
def test_missing_signature_verify_view_with_permission(self):
for document in self.document_type.documents.all():
document.delete(to_trash=False)
from documents.models import DocumentType
old_hooks = DocumentVersion._post_save_hooks
DocumentVersion._post_save_hooks = {}
for count in range(TEST_UNSIGNED_DOCUMENT_COUNT):
with open(TEST_DOCUMENT_PATH) as file_object:
self.document_type.new_document(
file_object=file_object
)
for count in range(TEST_SIGNED_DOCUMENT_COUNT):
with open(TEST_SIGNED_DOCUMENT_PATH) as file_object:
self.document_type.new_document(
file_object=file_object
)
self.assertEqual(
EmbeddedSignature.objects.unsigned_document_versions().count(),
TEST_UNSIGNED_DOCUMENT_COUNT + TEST_SIGNED_DOCUMENT_COUNT
)
DocumentVersion._post_save_hooks = old_hooks
self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD)
self.role.permissions.add(
permission_document_version_signature_verify.stored_permission
)
response = self.post(
'signatures:all_document_version_signature_verify', follow=True
)
self.assertContains(response, 'queued', status_code=200)
self.assertEqual(
EmbeddedSignature.objects.unsigned_document_versions().count(),
TEST_UNSIGNED_DOCUMENT_COUNT
)

View File

@@ -3,9 +3,9 @@ from __future__ import unicode_literals
from django.conf.urls import patterns, url from django.conf.urls import patterns, url
from .views import ( from .views import (
DocumentVersionSignatureDeleteView, DocumentVersionSignatureDetailView, AllDocumentSignatureVerifyView, DocumentVersionSignatureDeleteView,
DocumentVersionSignatureDownloadView, DocumentVersionSignatureListView, DocumentVersionSignatureDetailView, DocumentVersionSignatureDownloadView,
DocumentVersionSignatureUploadView DocumentVersionSignatureListView, DocumentVersionSignatureUploadView
) )
urlpatterns = patterns( urlpatterns = patterns(
@@ -35,4 +35,9 @@ urlpatterns = patterns(
DocumentVersionSignatureDeleteView.as_view(), DocumentVersionSignatureDeleteView.as_view(),
name='document_version_signature_delete' name='document_version_signature_delete'
), ),
url(
r'^tools/all/document/version/signature/verify/$',
AllDocumentSignatureVerifyView.as_view(),
name='all_document_version_signature_verify'
),
) )

View File

@@ -2,6 +2,7 @@ from __future__ import absolute_import, unicode_literals
import logging import logging
from django.contrib import messages
from django.core.exceptions import PermissionDenied from django.core.exceptions import PermissionDenied
from django.core.urlresolvers import reverse from django.core.urlresolvers import reverse
from django.shortcuts import get_object_or_404 from django.shortcuts import get_object_or_404
@@ -9,8 +10,8 @@ from django.utils.translation import ugettext_lazy as _
from acls.models import AccessControlList from acls.models import AccessControlList
from common.generics import ( from common.generics import (
SingleObjectCreateView, SingleObjectDeleteView, SingleObjectDetailView, ConfirmView, SingleObjectCreateView, SingleObjectDeleteView,
SingleObjectDownloadView, SingleObjectListView SingleObjectDetailView, SingleObjectDownloadView, SingleObjectListView
) )
from documents.models import DocumentVersion from documents.models import DocumentVersion
from permissions import Permission from permissions import Permission
@@ -18,11 +19,13 @@ from permissions import Permission
from .forms import DocumentVersionSignatureDetailForm from .forms import DocumentVersionSignatureDetailForm
from .models import DetachedSignature, SignatureBaseModel from .models import DetachedSignature, SignatureBaseModel
from .permissions import ( from .permissions import (
permission_document_version_signature_view, permission_document_version_signature_delete,
permission_document_version_signature_upload,
permission_document_version_signature_download, permission_document_version_signature_download,
permission_document_version_signature_delete permission_document_version_signature_upload,
permission_document_version_signature_verify,
permission_document_version_signature_view,
) )
from .tasks import task_verify_missing_embedded_signature
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -160,3 +163,21 @@ class DocumentVersionSignatureUploadView(SingleObjectCreateView):
'signatures:document_version_signature_list', 'signatures:document_version_signature_list',
args=(self.get_document_version().pk,) args=(self.get_document_version().pk,)
) )
class AllDocumentSignatureVerifyView(ConfirmView):
extra_context = {
'message': _(
'On large databases this operation may take some time to execute.'
), 'title': _('Verify all document for signatures?'),
}
view_permission = permission_document_version_signature_verify
def get_post_action_redirect(self):
return reverse('common:tools_list')
def view_action(self):
task_verify_missing_embedded_signature.delay()
messages.success(
self.request, _('Signature verification queued successfully.')
)