Add support for verifying and unverifying signatures after a key is added or deleted.

This commit is contained in:
Roberto Rosario
2016-03-26 04:23:16 -04:00
parent 1f0dedc9aa
commit 5de63c4477
14 changed files with 325 additions and 148 deletions

View File

@@ -1,7 +1,5 @@
from __future__ import absolute_import, unicode_literals
from datetime import datetime
from django.utils.translation import ugettext_lazy as _
from acls import ModelPermission

View File

@@ -1,6 +1,6 @@
__all__ = (
'GPGException', 'GPGVerificationError', 'GPGSigningError',
'GPGDecryptionError', 'KeyDeleteError', 'KeyGenerationError',
'GPGException', 'VerificationError', 'SigningError',
'DecryptionError', 'KeyDeleteError', 'KeyGenerationError',
'KeyFetchingError', 'KeyDoesNotExist', 'KeyImportError'
)
@@ -13,7 +13,7 @@ class VerificationError(GPGException):
pass
class GPGSigningError(GPGException):
class SigningError(GPGException):
pass

View File

@@ -33,7 +33,7 @@ class KeyManager(models.Manager):
# Preload keys
if all_keys:
logger.debug('preloading all keys')
for key in Key.objects.all():
for key in self.all():
gpg.import_keys(key_data=key.key_data)
elif key_fingerprint:
logger.debug('preloading key fingerprint: %s', key_fingerprint)
@@ -129,7 +129,7 @@ class KeyManager(models.Manager):
# Preload keys
if all_keys:
logger.debug('preloading all keys')
for key in Key.objects.all():
for key in self.all():
gpg.import_keys(key_data=key.key_data)
elif key_fingerprint:
logger.debug('preloading key fingerprint: %s', key_fingerprint)

View File

@@ -28,9 +28,9 @@ class KeyDeleteView(SingleObjectDeleteView):
def get_post_action_redirect(self):
if self.get_object().key_type == KEY_TYPE_PUBLIC:
post_action_redirect = reverse_lazy('django_gpg:key_public_list')
return reverse_lazy('django_gpg:key_public_list')
else:
post_action_redirect = reverse_lazy('django_gpg:key_private_list')
return reverse_lazy('django_gpg:key_private_list')
def get_extra_context(self):
return {

View File

@@ -21,4 +21,3 @@ class EmbeddedSignatureAdmin(admin.ModelAdmin):
'public_key_fingerprint'
)
list_display_links = ('document_version',)

View File

@@ -2,7 +2,10 @@ from __future__ import unicode_literals
import logging
from kombu import Exchange, Queue
from django.apps import apps
from django.db.models.signals import post_save, post_delete
from django.utils.translation import ugettext_lazy as _
from acls import ModelPermission
@@ -10,8 +13,10 @@ from common import (
MayanAppConfig, menu_facet, menu_object, menu_secondary, menu_sidebar
)
from common.widgets import two_state_template
from mayan.celery import app
from navigation import SourceColumn
from .handlers import unverify_signatures, verify_signatures
from .links import (
link_document_version_signature_delete,
link_document_version_signature_details,
@@ -49,6 +54,10 @@ class DocumentSignaturesApp(MayanAppConfig):
app_label='documents', model_name='DocumentVersion'
)
Key = apps.get_model(
app_label='django_gpg', model_name='Key'
)
DetachedSignature = self.get_model('DetachedSignature')
EmbeddedSignature = self.get_model('EmbeddedSignature')
@@ -56,7 +65,7 @@ class DocumentSignaturesApp(MayanAppConfig):
SignatureBaseModel = self.get_model('SignatureBaseModel')
DocumentVersion.register_post_save_hook(
order=1, func=EmbeddedSignature.objects.check_signature
order=1, func=EmbeddedSignature.objects.create
)
DocumentVersion.register_pre_open_hook(
order=1, func=EmbeddedSignature.objects.open_signed
@@ -83,7 +92,7 @@ class DocumentSignaturesApp(MayanAppConfig):
func=lambda context: context['object'].signature_id or _('None')
)
SourceColumn(
source=SignatureBaseModel, label=_('Public key ID'),
source=SignatureBaseModel, label=_('Public key fingerprint'),
func=lambda context: context['object'].public_key_fingerprint or _('None')
)
SourceColumn(
@@ -103,6 +112,23 @@ class DocumentSignaturesApp(MayanAppConfig):
)
)
app.conf.CELERY_QUEUES.append(
Queue(
'signatures', Exchange('signatures'), routing_key='signatures'
),
)
app.conf.CELERY_ROUTES.update(
{
'document_signatures.tasks.task_verify_signatures': {
'queue': 'signatures'
},
'document_signatures.tasks.task_unverify_signatures': {
'queue': 'signatures'
},
}
)
menu_object.bind_links(
links=(link_document_version_signature_list,),
sources=(DocumentVersion,)
@@ -120,3 +146,13 @@ class DocumentSignaturesApp(MayanAppConfig):
link_document_version_signature_verify,
), sources=(DocumentVersion,)
)
post_delete.connect(
unverify_signatures,
dispatch_uid='unverify_signatures',
sender=Key
)
post_save.connect(
verify_signatures,
dispatch_uid='verify_signatures',
sender=Key
)

View File

@@ -0,0 +1,15 @@
from __future__ import unicode_literals
from .tasks import task_unverify_signatures, task_verify_signatures
def unverify_signatures(sender, **kwargs):
task_unverify_signatures.apply_async(
kwargs=dict(key_id=kwargs['instance'].key_id)
)
def verify_signatures(sender, **kwargs):
task_verify_signatures.apply_async(
kwargs=dict(key_pk=kwargs['instance'].pk)
)

View File

@@ -10,45 +10,7 @@ from django_gpg.models import Key
logger = logging.getLogger(__name__)
class DetachedSignatureManager(models.Manager):
def upload_signature(self, document_version, signature_file):
with document_version.open() as file_object:
try:
verify_result = Key.objects.verify_file(
file_object=file_object, signature_file=signature_file
)
except VerificationError:
# Not signed
pass
else:
instance = self.create(
document_version=document_version,
date=verify_result.date,
key_id=verify_result.key_id,
signature_id=verify_result.signature_id,
public_key_fingerprint=verify_result.pubkey_fingerprint,
)
class EmbeddedSignatureManager(models.Manager):
def check_signature(self, document_version):
logger.debug('checking for embedded signature')
with document_version.open() as file_object:
try:
verify_result = Key.objects.verify_file(file_object=file_object)
except VerificationError:
# Not signed
pass
else:
instance = self.create(
document_version=document_version,
date=verify_result.date,
key_id=verify_result.key_id,
signature_id=verify_result.signature_id,
public_key_fingerprint=verify_result.pubkey_fingerprint,
)
def open_signed(self, file_object, document_version):
for signature in self.filter(document_version=document_version):
try:
@@ -62,26 +24,3 @@ class EmbeddedSignatureManager(models.Manager):
return file_object
else:
return file_object
"""
def verify_signature(self, document_version):
document_version_descriptor = document_version.open(raw=True)
detached_signature = None
if self.has_detached_signature(document_version=document_version):
logger.debug('has detached signature')
detached_signature = self.detached_signature(
document_version=document_version
)
args = (document_version_descriptor, detached_signature)
else:
args = (document_version_descriptor,)
try:
return Key.objects.verify_file(*args)
except VerificationError:
return None
finally:
document_version_descriptor.close()
if detached_signature:
detached_signature.close()
"""

View File

@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('document_signatures', '0005_auto_20160325_0748'),
]
operations = [
migrations.AlterField(
model_name='signaturebasemodel',
name='public_key_fingerprint',
field=models.CharField(verbose_name='Public key fingerprint', max_length=40, null=True, editable=False, blank=True),
),
]

View File

@@ -15,7 +15,7 @@ from django_gpg.exceptions import DecryptionError, VerificationError
from django_gpg.models import Key
from documents.models import DocumentVersion
from .managers import EmbeddedSignatureManager, DetachedSignatureManager
from .managers import EmbeddedSignatureManager
from .runtime import storage_backend
logger = logging.getLogger(__name__)
@@ -42,7 +42,7 @@ class SignatureBaseModel(models.Model):
verbose_name=_('Signature ID')
)
public_key_fingerprint = models.CharField(
blank=True, editable=False, null=True, max_length=40, unique=True,
blank=True, editable=False, null=True, max_length=40,
verbose_name=_('Public key fingerprint')
)
@@ -77,6 +77,30 @@ class EmbeddedSignature(SignatureBaseModel):
verbose_name = _('Document version embedded signature')
verbose_name_plural = _('Document version embedded signatures')
def save(self, *args, **kwargs):
logger.debug('checking for embedded signature')
if self.pk:
raw = True
else:
raw = False
with self.document_version.open(raw=raw) as file_object:
try:
verify_result = Key.objects.verify_file(file_object=file_object)
except VerificationError as exception:
# Not signed
logger.debug(
'embedded signature verification error; %s', exception
)
else:
self.date = verify_result.date
self.key_id = verify_result.key_id
self.signature_id = verify_result.signature_id
self.public_key_fingerprint = verify_result.pubkey_fingerprint
super(EmbeddedSignature, self).save(*args, **kwargs)
class DetachedSignature(SignatureBaseModel):
signature_file = models.FileField(
@@ -84,8 +108,6 @@ class DetachedSignature(SignatureBaseModel):
verbose_name=_('Signature file')
)
objects = DetachedSignatureManager()
class Meta:
verbose_name = _('Document version detached signature')
verbose_name_plural = _('Document version detached signatures')
@@ -93,3 +115,24 @@ class DetachedSignature(SignatureBaseModel):
def delete(self, *args, **kwargs):
self.signature_file.storage.delete(self.signature_file.name)
super(DetachedSignature, self).delete(*args, **kwargs)
def save(self, *args, **kwargs):
with self.document_version.open() as file_object:
try:
verify_result = Key.objects.verify_file(
file_object=file_object, signature_file=self.signature_file
)
except VerificationError:
# Not signed
logger.debug(
'detached signature verification error; %s', exception
)
else:
self.signature_file.seek(0)
self.date = verify_result.date
self.key_id = verify_result.key_id
self.signature_id = verify_result.signature_id
self.public_key_fingerprint = verify_result.pubkey_fingerprint
return super(DetachedSignature, self).save(*args, **kwargs)

View File

@@ -0,0 +1,50 @@
from __future__ import unicode_literals
import logging
from django.apps import apps
from mayan.celery import app
RETRY_DELAY = 10
logger = logging.getLogger(__name__)
@app.task(bind=True, ignore_result=True)
def task_unverify_signatures(self, key_id):
DetachedSignature = apps.get_model(
app_label='document_signatures', model_name='DetachedSignature'
)
EmbeddedSignature = apps.get_model(
app_label='document_signatures', model_name='EmbeddedSignature'
)
for signature in DetachedSignature.objects.filter(key_id__endswith=key_id).filter(signature_id__isnull=False):
signature.save()
for signature in EmbeddedSignature.objects.filter(key_id__endswith=key_id).filter(signature_id__isnull=False):
signature.save()
@app.task(bind=True, ignore_result=True)
def task_verify_signatures(self, key_pk):
Key = apps.get_model(
app_label='django_gpg', model_name='Key'
)
DetachedSignature = apps.get_model(
app_label='document_signatures', model_name='DetachedSignature'
)
EmbeddedSignature = apps.get_model(
app_label='document_signatures', model_name='EmbeddedSignature'
)
key = Key.objects.get(pk=key_pk)
for signature in DetachedSignature.objects.filter(key_id__endswith=key.key_id).filter(signature_id__isnull=True):
signature.save()
for signature in EmbeddedSignature.objects.filter(key_id__endswith=key.key_id).filter(signature_id__isnull=True):
signature.save()

View File

@@ -4,6 +4,7 @@ import os
import time
from django.conf import settings
from django.core.files import File
from django.test import TestCase, override_settings
from django_gpg.models import Key
@@ -23,10 +24,11 @@ TEST_KEY_FILE = os.path.join(
'key0x5F3F7F75D210724D.asc'
)
TEST_KEY_ID = '5F3F7F75D210724D'
TEST_SIGNATURE_ID = 'XVkoGKw35yU1iq11dZPiv7uAY7k'
@override_settings(OCR_AUTO_OCR=False)
class DocumentTestCase(TestCase):
class DocumentSignaturesTestCase(TestCase):
def setUp(self):
self.document_type = DocumentType.objects.create(
label=TEST_DOCUMENT_TYPE
@@ -35,7 +37,7 @@ class DocumentTestCase(TestCase):
def tearDown(self):
self.document_type.delete()
def test_embedded_signature(self):
def test_embedded_signature_no_key(self):
with open(TEST_SIGNED_DOCUMENT_PATH) as file_object:
signed_document = self.document_type.new_document(
file_object=file_object
@@ -49,6 +51,55 @@ class DocumentTestCase(TestCase):
signature.document_version, signed_document.latest_version
)
self.assertEqual(signature.key_id, TEST_KEY_ID)
self.assertEqual(signature.signature_id, None)
def test_embedded_signature_post_key_verify(self):
with open(TEST_SIGNED_DOCUMENT_PATH) as file_object:
signed_document = self.document_type.new_document(
file_object=file_object
)
self.assertEqual(EmbeddedSignature.objects.count(), 1)
signature = EmbeddedSignature.objects.first()
self.assertEqual(
signature.document_version, signed_document.latest_version
)
self.assertEqual(signature.key_id, TEST_KEY_ID)
self.assertEqual(signature.signature_id, None)
with open(TEST_KEY_FILE) as file_object:
key = Key.objects.create(key_data=file_object.read())
signature = EmbeddedSignature.objects.first()
self.assertEqual(signature.signature_id, TEST_SIGNATURE_ID)
def test_embedded_signature_post_no_key_verify(self):
with open(TEST_KEY_FILE) as file_object:
key = Key.objects.create(key_data=file_object.read())
with open(TEST_SIGNED_DOCUMENT_PATH) as file_object:
signed_document = self.document_type.new_document(
file_object=file_object
)
self.assertEqual(EmbeddedSignature.objects.count(), 1)
signature = EmbeddedSignature.objects.first()
self.assertEqual(
signature.document_version, signed_document.latest_version
)
self.assertEqual(signature.key_id, TEST_KEY_ID)
self.assertEqual(signature.signature_id, TEST_SIGNATURE_ID)
key.delete()
signature = EmbeddedSignature.objects.first()
self.assertEqual(signature.signature_id, None)
def test_embedded_signature_with_key(self):
with open(TEST_KEY_FILE) as file_object:
@@ -69,27 +120,106 @@ class DocumentTestCase(TestCase):
)
self.assertEqual(signature.key_id, TEST_KEY_ID)
self.assertEqual(signature.public_key_fingerprint, key.fingerprint)
self.assertEqual(signature.signature_id, TEST_SIGNATURE_ID)
def test_detached_signature(self):
def test_detached_signature_no_key(self):
with open(TEST_DOCUMENT_PATH) as file_object:
document = self.document_type.new_document(
file_object=file_object
)
with open(TEST_SIGNATURE_FILE_PATH) as file_object:
DetachedSignature.objects.upload_signature(
DetachedSignature.objects.create(
document_version=document.latest_version,
signature_file=file_object
signature_file=File(file_object)
)
self.assertEqual(DetachedSignature.objects.count(), 1)
self.assertEqual(
DetachedSignature.objects.first().document_version,
document.latest_version
)
self.assertEqual(DetachedSignature.objects.first().key_id, TEST_KEY_ID)
# TODO: test_verify_signature_after_new_key(self):
signature = DetachedSignature.objects.first()
self.assertEqual(signature.document_version, document.latest_version)
self.assertEqual(signature.key_id, TEST_KEY_ID)
self.assertEqual(signature.public_key_fingerprint, None)
def test_detached_signature_with_key(self):
with open(TEST_KEY_FILE) as file_object:
key = Key.objects.create(key_data=file_object.read())
with open(TEST_DOCUMENT_PATH) as file_object:
document = self.document_type.new_document(
file_object=file_object
)
with open(TEST_SIGNATURE_FILE_PATH) as file_object:
DetachedSignature.objects.create(
document_version=document.latest_version,
signature_file=File(file_object)
)
self.assertEqual(DetachedSignature.objects.count(), 1)
signature = DetachedSignature.objects.first()
self.assertEqual(signature.document_version, document.latest_version)
self.assertEqual(signature.key_id, TEST_KEY_ID)
self.assertEqual(signature.public_key_fingerprint, key.fingerprint)
def test_detached_signature_post_key_verify(self):
with open(TEST_DOCUMENT_PATH) as file_object:
document = self.document_type.new_document(
file_object=file_object
)
with open(TEST_SIGNATURE_FILE_PATH) as file_object:
DetachedSignature.objects.create(
document_version=document.latest_version,
signature_file=File(file_object)
)
self.assertEqual(DetachedSignature.objects.count(), 1)
signature = DetachedSignature.objects.first()
self.assertEqual(signature.document_version, document.latest_version)
self.assertEqual(signature.key_id, TEST_KEY_ID)
self.assertEqual(signature.public_key_fingerprint, None)
with open(TEST_KEY_FILE) as file_object:
key = Key.objects.create(key_data=file_object.read())
signature = DetachedSignature.objects.first()
self.assertEqual(signature.public_key_fingerprint, key.fingerprint)
def test_detached_signature_post_no_key_verify(self):
with open(TEST_KEY_FILE) as file_object:
key = Key.objects.create(key_data=file_object.read())
with open(TEST_DOCUMENT_PATH) as file_object:
document = self.document_type.new_document(
file_object=file_object
)
with open(TEST_SIGNATURE_FILE_PATH) as file_object:
DetachedSignature.objects.create(
document_version=document.latest_version,
signature_file=File(file_object)
)
self.assertEqual(DetachedSignature.objects.count(), 1)
signature = DetachedSignature.objects.first()
self.assertEqual(signature.document_version, document.latest_version)
self.assertEqual(signature.key_id, TEST_KEY_ID)
self.assertEqual(signature.public_key_fingerprint, key.fingerprint)
key.delete()
signature = DetachedSignature.objects.first()
self.assertEqual(signature.public_key_fingerprint, None)
def test_document_no_signature(self):
with open(TEST_DOCUMENT_PATH) as file_object:

View File

@@ -103,62 +103,6 @@ class DocumentVersionSignatureListView(SingleObjectListView):
return queryset
def document_verify(request, document_pk):
document = get_object_or_404(Document, pk=document_pk)
try:
Permission.check_permissions(
request.user, (permission_document_verify,)
)
except PermissionDenied:
AccessControlList.objects.check_access(
permission_document_verify, request.user, document
)
document.add_as_recent_document_for_user(request.user)
try:
signature = DocumentVersionSignature.objects.verify_signature(
document.latest_version
)
except AttributeError:
signature_state = SIGNATURE_STATES.get(SIGNATURE_STATE_NONE)
signature = None
else:
signature_state = SIGNATURE_STATES.get(
getattr(signature, 'status', None)
)
paragraphs = [_('Signature status: %s') % signature_state['text']]
try:
if DocumentVersionSignature.objects.has_embedded_signature(document.latest_version):
signature_type = _('Embedded')
else:
signature_type = _('Detached')
except ValueError:
signature_type = _('None')
if signature:
paragraphs.extend(
[
_('Signature ID: %s') % signature.signature_id,
_('Signature type: %s') % signature_type,
_('Key fingerprint: %s') % signature.fingerprint,
_('Timestamp: %s') % signature.date,
_('Signee: %s') % escape(signature.user_id),
]
)
return render_to_response('appearance/generic_template.html', {
'document': document,
'object': document,
'paragraphs': paragraphs,
'title': _('Signature properties for document: %s') % document,
}, context_instance=RequestContext(request))
def document_version_signature_upload(request, pk):
document_version = get_object_or_404(DocumentVersion, pk=pk)
@@ -181,7 +125,7 @@ def document_version_signature_upload(request, pk):
form = DetachedSignatureForm(request.POST, request.FILES)
if form.is_valid():
try:
DetachedSignature.objects.upload_signature(
DetachedSignature.objects.create(
document_version=document_version,
signature_file=request.FILES['file']
)

View File

@@ -401,7 +401,9 @@ class DocumentVersion(models.Model):
super(DocumentVersion, self).save(*args, **kwargs)
for key in sorted(DocumentVersion._post_save_hooks):
DocumentVersion._post_save_hooks[key](self)
DocumentVersion._post_save_hooks[key](
document_version=self
)
if new_document_version:
# Only do this for new documents
@@ -499,7 +501,9 @@ class DocumentVersion(models.Model):
else:
result = self.file.storage.open(self.file.name)
for key in sorted(DocumentVersion._pre_open_hooks):
result = DocumentVersion._pre_open_hooks[key](result, self)
result = DocumentVersion._pre_open_hooks[key](
file_object=result, document_version=self
)
return result