Add SignatureVerification class to return verification results. Add support for specifing against which key to verify a signature. Add support to preload all keys before verifing a signature. All test for specific key verificatio and all key preloading.
This commit is contained in:
@@ -73,19 +73,16 @@ ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|||||||
SourceColumn(source=KeyStub, label=_('Key ID'), attribute='key_id')
|
SourceColumn(source=KeyStub, label=_('Key ID'), attribute='key_id')
|
||||||
SourceColumn(source=KeyStub, label=_('Type'), attribute='key_type')
|
SourceColumn(source=KeyStub, label=_('Type'), attribute='key_type')
|
||||||
SourceColumn(
|
SourceColumn(
|
||||||
source=KeyStub, label=_('Creation date'),
|
source=KeyStub, label=_('Creation date'), attribute='date'
|
||||||
func=lambda context: datetime.fromtimestamp(
|
|
||||||
int(context['object'].date)
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
SourceColumn(
|
SourceColumn(
|
||||||
source=KeyStub, label=_('Expiration date'),
|
source=KeyStub, label=_('Expiration date'),
|
||||||
func=lambda context: datetime.fromtimestamp(int(context['object'].expires)) if context['object'].expires else _('No expiration')
|
func=lambda context: context['object'].expires or _('No expiration')
|
||||||
)
|
)
|
||||||
SourceColumn(source=KeyStub, label=_('Length'), attribute='length')
|
SourceColumn(source=KeyStub, label=_('Length'), attribute='length')
|
||||||
SourceColumn(
|
SourceColumn(
|
||||||
source=KeyStub, label=_('User ID'),
|
source=KeyStub, label=_('User ID'),
|
||||||
func=lambda context: ', '.join(context['object'].uids)
|
func=lambda context: ', '.join(context['object'].user_id)
|
||||||
)
|
)
|
||||||
|
|
||||||
menu_object.bind_links(links=(link_key_detail,), sources=(Key,))
|
menu_object.bind_links(links=(link_key_detail,), sources=(Key,))
|
||||||
|
|||||||
@@ -1,15 +1,38 @@
|
|||||||
from __future__ import absolute_import, unicode_literals
|
from __future__ import absolute_import, unicode_literals
|
||||||
|
|
||||||
|
from datetime import date
|
||||||
|
|
||||||
|
|
||||||
class KeyStub(object):
|
class KeyStub(object):
|
||||||
def __init__(self, raw):
|
def __init__(self, raw):
|
||||||
self.fingerprint = raw['keyid']
|
self.fingerprint = raw['keyid']
|
||||||
self.key_type = raw['type']
|
self.key_type = raw['type']
|
||||||
self.date = raw['date']
|
self.date = date.fromtimestamp(int(raw['date']))
|
||||||
self.expires = raw['expires']
|
if raw['expires']:
|
||||||
|
self.expires = date.fromtimestamp(int(raw['expires']))
|
||||||
|
else:
|
||||||
|
self.expires = None
|
||||||
self.length = raw['length']
|
self.length = raw['length']
|
||||||
self.uids = raw['uids']
|
self.user_id = raw['uids']
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def key_id(self):
|
def key_id(self):
|
||||||
return self.fingerprint[-8:]
|
return self.fingerprint[-8:]
|
||||||
|
|
||||||
|
|
||||||
|
class SignatureVerification(object):
|
||||||
|
def __init__(self, raw):
|
||||||
|
self.user_id = raw['username']
|
||||||
|
self.status = raw['status']
|
||||||
|
self.pubkey_fingerprint = raw['pubkey_fingerprint']
|
||||||
|
self.date = date.fromtimestamp(int(raw['sig_timestamp']))
|
||||||
|
if raw['expire_timestamp']:
|
||||||
|
self.expires = date.fromtimestamp(int(raw['expire_timestamp']))
|
||||||
|
else:
|
||||||
|
self.expires = None
|
||||||
|
self.trust_text = raw['trust_text']
|
||||||
|
self.valid = raw['valid']
|
||||||
|
self.stderr = raw['stderr']
|
||||||
|
self.fingerprint = raw['fingerprint']
|
||||||
|
self.signature_id = raw['signature_id']
|
||||||
|
self.trust_level = raw['trust_level']
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ import gnupg
|
|||||||
|
|
||||||
from django.db import models
|
from django.db import models
|
||||||
|
|
||||||
from .classes import KeyStub
|
from .classes import KeyStub, SignatureVerification
|
||||||
from .exceptions import KeyDoesNotExist, KeyFetchingError
|
from .exceptions import KeyDoesNotExist, KeyFetchingError
|
||||||
from .literals import KEY_TYPE_PUBLIC, KEY_TYPE_SECRET
|
from .literals import KEY_TYPE_PUBLIC, KEY_TYPE_SECRET
|
||||||
from .settings import setting_gpg_path, setting_keyserver
|
from .settings import setting_gpg_path, setting_keyserver
|
||||||
@@ -63,20 +63,36 @@ class KeyManager(models.Manager):
|
|||||||
def private_keys(self):
|
def private_keys(self):
|
||||||
return self.filter(key_type=KEY_TYPE_SECRET)
|
return self.filter(key_type=KEY_TYPE_SECRET)
|
||||||
|
|
||||||
def verify_file(self, file_object, signature_file=None):
|
def verify_file(self, file_object, signature_file=None, key_fingerprint=None, all_keys=False):
|
||||||
temporary_directory = tempfile.mkdtemp()
|
temporary_directory = tempfile.mkdtemp()
|
||||||
|
|
||||||
gpg = gnupg.GPG(
|
gpg = gnupg.GPG(
|
||||||
gnupghome=temporary_directory, gpgbinary=setting_gpg_path.value
|
gnupghome=temporary_directory, gpgbinary=setting_gpg_path.value
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Preload keys
|
||||||
|
if all_keys:
|
||||||
|
for key in Key.objects.all():
|
||||||
|
gpg.import_keys(key_data=key.key_data)
|
||||||
|
elif key_fingerprint:
|
||||||
|
try:
|
||||||
|
key = self.get(fingerprint=key_fingerprint)
|
||||||
|
except self.model.DoesNotExist:
|
||||||
|
shutil.rmtree(temporary_directory)
|
||||||
|
raise KeyDoesNotExist('Specified key for verification not found in keyring')
|
||||||
|
else:
|
||||||
|
gpg.import_keys(key_data=key.key_data)
|
||||||
|
|
||||||
verify_result = gpg.verify_file(file=file_object)
|
verify_result = gpg.verify_file(file=file_object)
|
||||||
|
|
||||||
if 'no public key' in verify_result.status:
|
logger.debug('verify_result.status: %s', verify_result.status)
|
||||||
|
|
||||||
|
if 'no public key' in verify_result.status and not key_fingerprint and not all_keys:
|
||||||
# File is signed but we need the key for full verification
|
# File is signed but we need the key for full verification
|
||||||
try:
|
try:
|
||||||
key = self.get(fingerprint__endswith=verify_result.key_id)
|
key = self.get(fingerprint__endswith=verify_result.key_id)
|
||||||
except self.model.DoesNotExist:
|
except self.model.DoesNotExist:
|
||||||
|
shutil.rmtree(temporary_directory)
|
||||||
raise KeyDoesNotExist('Signature key is not found in keyring')
|
raise KeyDoesNotExist('Signature key is not found in keyring')
|
||||||
else:
|
else:
|
||||||
gpg.import_keys(key_data=key.key_data)
|
gpg.import_keys(key_data=key.key_data)
|
||||||
@@ -85,4 +101,4 @@ class KeyManager(models.Manager):
|
|||||||
|
|
||||||
shutil.rmtree(temporary_directory)
|
shutil.rmtree(temporary_directory)
|
||||||
|
|
||||||
return verify_result
|
return SignatureVerification(verify_result.__dict__)
|
||||||
|
|||||||
@@ -46,3 +46,19 @@ class KeyTestCase(TestCase):
|
|||||||
|
|
||||||
self.assertTrue(result)
|
self.assertTrue(result)
|
||||||
self.assertEqual(result.fingerprint, TEST_KEY_FINGERPRINT)
|
self.assertEqual(result.fingerprint, TEST_KEY_FINGERPRINT)
|
||||||
|
|
||||||
|
def test_embedded_verification_with_correct_fingerprint(self):
|
||||||
|
Key.objects.create(key_data=TEST_KEY_DATA)
|
||||||
|
|
||||||
|
with open(TEST_SIGNED_FILE) as signed_file:
|
||||||
|
result = Key.objects.verify_file(signed_file, key_fingerprint=TEST_KEY_FINGERPRINT)
|
||||||
|
|
||||||
|
self.assertTrue(result)
|
||||||
|
self.assertEqual(result.fingerprint, TEST_KEY_FINGERPRINT)
|
||||||
|
|
||||||
|
def test_embedded_verification_with_incorrect_fingerprint(self):
|
||||||
|
Key.objects.create(key_data=TEST_KEY_DATA)
|
||||||
|
|
||||||
|
with open(TEST_SIGNED_FILE) as signed_file:
|
||||||
|
with self.assertRaises(KeyDoesNotExist):
|
||||||
|
Key.objects.verify_file(signed_file, key_fingerprint='999')
|
||||||
|
|||||||
Reference in New Issue
Block a user