Add support for verification of detached signatures.
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
@@ -10,7 +11,9 @@ import gnupg
|
||||
from django.db import models
|
||||
|
||||
from .classes import KeyStub, SignatureVerification
|
||||
from .exceptions import DecryptionError, KeyDoesNotExist, KeyFetchingError
|
||||
from .exceptions import (
|
||||
DecryptionError, KeyDoesNotExist, KeyFetchingError, VerificationError
|
||||
)
|
||||
from .literals import KEY_TYPE_PUBLIC, KEY_TYPE_SECRET
|
||||
from .settings import setting_gpg_path, setting_keyserver
|
||||
|
||||
@@ -31,7 +34,7 @@ class KeyManager(models.Manager):
|
||||
|
||||
shutil.rmtree(temporary_directory)
|
||||
|
||||
logger.debug('decrypt_result.__dict__: %s', decrypt_result.__dict__)
|
||||
logger.debug('decrypt_result.status: %s', decrypt_result.status)
|
||||
|
||||
if not decrypt_result.status or decrypt_result.status == 'no data was provided':
|
||||
raise DecryptionError('Unable to decrypt file')
|
||||
@@ -83,7 +86,7 @@ class KeyManager(models.Manager):
|
||||
def private_keys(self):
|
||||
return self.filter(key_type=KEY_TYPE_SECRET)
|
||||
|
||||
def verify_file(self, file_object, signature_file=None, key_fingerprint=None, all_keys=False):
|
||||
def verify_file(self, file_object, signature_file=None, key_id=None, key_fingerprint=None, all_keys=False):
|
||||
temporary_directory = tempfile.mkdtemp()
|
||||
|
||||
gpg = gnupg.GPG(
|
||||
@@ -102,23 +105,50 @@ class KeyManager(models.Manager):
|
||||
raise KeyDoesNotExist('Specified key for verification not found in keyring')
|
||||
else:
|
||||
gpg.import_keys(key_data=key.key_data)
|
||||
elif key_id:
|
||||
try:
|
||||
key = self.get(fingerprint__endswith=key_id)
|
||||
except self.model.DoesNotExist:
|
||||
shutil.rmtree(temporary_directory)
|
||||
raise KeyDoesNotExist('Specified key for verification not found in keyring')
|
||||
else:
|
||||
gpg.import_keys(key_data=key.key_data)
|
||||
|
||||
verify_result = gpg.verify_file(file=file_object)
|
||||
if signature_file:
|
||||
# Save the original data and invert the argument order
|
||||
# Signature first, file second
|
||||
temporary_file_object, temporary_filename = tempfile.mkstemp()
|
||||
os.write(temporary_file_object, file_object.read())
|
||||
os.close(temporary_file_object)
|
||||
|
||||
signature_file_buffer = io.BytesIO()
|
||||
signature_file_buffer.write(signature_file.read())
|
||||
signature_file_buffer.seek(0)
|
||||
verify_result = gpg.verify_file(
|
||||
file=signature_file_buffer, data_filename=temporary_filename
|
||||
)
|
||||
signature_file_buffer.close()
|
||||
# TODO: delete file
|
||||
else:
|
||||
verify_result = gpg.verify_file(file=file_object)
|
||||
|
||||
logger.debug('verify_result.status: %s', verify_result.status)
|
||||
|
||||
if 'no public key' in verify_result.status and not key_fingerprint and not all_keys:
|
||||
if verify_result:
|
||||
shutil.rmtree(temporary_directory)
|
||||
SignatureVerification(verify_result.__dict__)
|
||||
elif verify_result.status == 'no public key' and not (key_fingerprint or all_keys):
|
||||
# File is signed but we need the key for full verification
|
||||
try:
|
||||
key = self.get(fingerprint__endswith=verify_result.key_id)
|
||||
except self.model.DoesNotExist:
|
||||
shutil.rmtree(temporary_directory)
|
||||
raise KeyDoesNotExist('Signature key is not found in keyring')
|
||||
else:
|
||||
gpg.import_keys(key_data=key.key_data)
|
||||
file_object.seek(0)
|
||||
verify_result = gpg.verify_file(file=file_object)
|
||||
|
||||
shutil.rmtree(temporary_directory)
|
||||
|
||||
return SignatureVerification(verify_result.__dict__)
|
||||
#try:
|
||||
# key = self.get(fingerprint__endswith=verify_result.key_id)
|
||||
#except self.model.DoesNotExist:
|
||||
# shutil.rmtree(temporary_directory)
|
||||
# raise KeyDoesNotExist('Signature key is not found in keyring')
|
||||
#else:
|
||||
# gpg.import_keys(key_data=key.key_data)
|
||||
file_object.seek(0)
|
||||
return self.verify_file(file_object=file_object, signature_file=signature_file, key_id=verify_result.key_id, key_fingerprint=key_fingerprint, all_keys=all_keys)
|
||||
# verify_result = gpg.verify_file(file=file_object)
|
||||
else:
|
||||
shutil.rmtree(temporary_directory)
|
||||
raise VerificationError('File not signed')
|
||||
|
||||
@@ -4,12 +4,13 @@ import tempfile
|
||||
|
||||
from django.test import TestCase
|
||||
|
||||
from ..exceptions import DecryptionError, KeyDoesNotExist
|
||||
from ..exceptions import DecryptionError, KeyDoesNotExist, VerificationError
|
||||
from ..models import Key
|
||||
|
||||
from .literals import (
|
||||
TEST_KEY_DATA, TEST_KEY_FINGERPRINT, TEST_SEARCH_FINGERPRINT,
|
||||
TEST_SEARCH_UID, TEST_SIGNED_FILE, TEST_SIGNED_FILE_CONTENT
|
||||
TEST_DETACHED_SIGNATURE, TEST_FILE, TEST_KEY_DATA, TEST_KEY_FINGERPRINT,
|
||||
TEST_SEARCH_FINGERPRINT, TEST_SEARCH_UID, TEST_SIGNED_FILE,
|
||||
TEST_SIGNED_FILE_CONTENT
|
||||
)
|
||||
|
||||
|
||||
@@ -82,3 +83,19 @@ class KeyTestCase(TestCase):
|
||||
Key.objects.decrypt_file(file_object=cleartext_file)
|
||||
|
||||
cleartext_file.close()
|
||||
|
||||
def test_detached_verification_no_key(self):
|
||||
with open(TEST_DETACHED_SIGNATURE) as signature_file:
|
||||
with open(TEST_FILE) as test_file:
|
||||
with self.assertRaises(VerificationError):
|
||||
Key.objects.verify_file(file_object=test_file, signature_file=signature_file)
|
||||
|
||||
def test_detached_verification_with_key(self):
|
||||
Key.objects.create(key_data=TEST_KEY_DATA)
|
||||
|
||||
with open(TEST_DETACHED_SIGNATURE) as signature_file:
|
||||
with open(TEST_FILE) as test_file:
|
||||
result = Key.objects.verify_file(file_object=test_file, signature_file=signature_file)
|
||||
|
||||
self.assertTrue(result)
|
||||
self.assertEqual(result.fingerprint, TEST_KEY_FINGERPRINT)
|
||||
|
||||
@@ -32,7 +32,6 @@ def document_pre_open_hook(file_object, instance):
|
||||
else:
|
||||
file_object.close()
|
||||
return io.BytesIO(result)
|
||||
#return result
|
||||
else:
|
||||
return file_object
|
||||
|
||||
|
||||
@@ -7,10 +7,9 @@ from django.conf import settings
|
||||
from django.core.files.base import File
|
||||
from django.test import TestCase, override_settings
|
||||
|
||||
from django_gpg.models import Key
|
||||
from documents.models import DocumentType
|
||||
from documents.tests import TEST_DOCUMENT_PATH, TEST_DOCUMENT_TYPE
|
||||
from django_gpg.literals import SIGNATURE_STATE_VALID
|
||||
from django_gpg.runtime import gpg
|
||||
|
||||
from ..models import DocumentVersionSignature
|
||||
|
||||
@@ -39,7 +38,7 @@ class DocumentTestCase(TestCase):
|
||||
)
|
||||
|
||||
with open(TEST_KEY_FILE) as file_object:
|
||||
gpg.import_key(file_object.read())
|
||||
Key.objects.create(key_data=file_object.read())
|
||||
|
||||
def tearDown(self):
|
||||
self.document_type.delete()
|
||||
@@ -60,7 +59,7 @@ class DocumentTestCase(TestCase):
|
||||
# Artifical delay since MySQL doesn't store microsecond data in
|
||||
# timestamps. Version timestamp is used to determine which version
|
||||
# is the latest.
|
||||
time.sleep(1)
|
||||
time.sleep(2)
|
||||
|
||||
self.assertEqual(
|
||||
DocumentVersionSignature.objects.has_detached_signature(
|
||||
|
||||
Reference in New Issue
Block a user