Encapsulate python-gnupg code into its own backend class.

This commit is contained in:
Roberto Rosario
2016-10-24 18:48:26 -04:00
parent 665b814641
commit d3e6b21146
5 changed files with 186 additions and 148 deletions

View File

@@ -1,6 +1,136 @@
from __future__ import absolute_import, unicode_literals from __future__ import absolute_import, unicode_literals
from datetime import date from datetime import date
import os
import shutil
import gnupg
from common.utils import mkdtemp
class GPGBackend(object):
def __init__(self, **kwargs):
self.kwargs = kwargs
class PythonGNUPGBackend(GPGBackend):
@staticmethod
def _import_key(gpg, **kwargs):
return gpg.import_keys(**kwargs)
@staticmethod
def _list_keys(gpg, **kwargs):
return gpg.list_keys(**kwargs)
@staticmethod
def _import_and_list_keys(gpg, **kwargs):
import_results = gpg.import_keys(**kwargs)
return import_results, gpg.list_keys(
keys=import_results.fingerprints[0]
)[0]
@staticmethod
def _sign_file(gpg, file_object, key_data, passphrase, clearsign, detached, binary, output):
import_results = gpg.import_keys(key_data=key_data)
return gpg.sign_file(
file=file_object, keyid=import_results.fingerprints[0],
passphrase=passphrase, clearsign=clearsign, detach=detached,
binary=binary, output=output
)
@staticmethod
def _decrypt_file(gpg, file_object, keys):
for key in keys:
gpg.import_keys(key_data=key['key_data'])
return gpg.decrypt_file(file=file_object)
@staticmethod
def _verify_file(gpg, file_object, keys, data_filename=None):
for key in keys:
gpg.import_keys(key_data=key['key_data'])
return gpg.verify_file(
file=file_object, data_filename=data_filename
)
@staticmethod
def _recv_keys(gpg, keyserver, key_id):
import_results = gpg.recv_keys(keyserver, key_id)
if import_results.count:
key_data = gpg.export_keys(import_results.fingerprints[0])
else:
key_data = None
return key_data
@staticmethod
def _search_keys(gpg, keyserver, query):
return gpg.search_keys(
keyserver=keyserver, query=query
)
def gpg_command(self, function, **kwargs):
temporary_directory = mkdtemp()
os.chmod(temporary_directory, 0x1C0)
gpg = gnupg.GPG(
gnupghome=temporary_directory, gpgbinary=self.kwargs['binary_path']
)
result = function(gpg=gpg, **kwargs)
shutil.rmtree(temporary_directory)
return result
def import_key(self, key_data):
return self.gpg_command(
function=PythonGNUPGBackend._import_key, key_data=key_data
)
def list_keys(self, keys):
return self.gpg_command(
function=PythonGNUPGBackend._list_keys, keys=keys
)
def import_and_list_keys(self, key_data):
return self.gpg_command(
function=PythonGNUPGBackend._import_and_list_keys,
key_data=key_data
)
def sign_file(self, file_object, key_data, passphrase, clearsign, detached, binary, output):
return self.gpg_command(
function=PythonGNUPGBackend._sign_file, file_object=file_object,
key_data=key_data, passphrase=passphrase, clearsign=clearsign,
detached=detached, binary=binary, output=output
)
def decrypt_file(self, file_object, keys):
return self.gpg_command(
function=PythonGNUPGBackend._decrypt_file, file_object=file_object,
keys=keys
)
def verify_file(self, file_object, keys, data_filename=None):
return self.gpg_command(
function=PythonGNUPGBackend._verify_file, file_object=file_object,
keys=keys, data_filename=data_filename
)
def recv_keys(self, keyserver, key_id):
return self.gpg_command(
function=PythonGNUPGBackend._recv_keys, keyserver=keyserver,
key_id=key_id
)
def search_keys(self, keyserver, query):
return self.gpg_command(
function=PythonGNUPGBackend._search_keys, keyserver=keyserver,
query=query
)
class KeyStub(object): class KeyStub(object):

View File

@@ -3,64 +3,56 @@ from __future__ import absolute_import, unicode_literals
import io import io
import logging import logging
import os import os
import shutil
import gnupg
from django.db import models from django.db import models
from common.utils import mkdtemp, mkstemp from common.utils import mkstemp
from .classes import KeyStub, SignatureVerification from .classes import KeyStub, SignatureVerification
from .exceptions import ( from .exceptions import (
DecryptionError, KeyDoesNotExist, KeyFetchingError, VerificationError DecryptionError, KeyDoesNotExist, KeyFetchingError, VerificationError
) )
from .literals import KEY_TYPE_PUBLIC, KEY_TYPE_SECRET from .literals import KEY_TYPE_PUBLIC, KEY_TYPE_SECRET
from .settings import setting_gpg_path, setting_keyserver from .runtime import gpg_backend
from .settings import setting_keyserver
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class KeyManager(models.Manager): class KeyManager(models.Manager):
def decrypt_file(self, file_object, all_keys=False, key_fingerprint=None, key_id=None): def _preload_keys(self, all_keys=False, key_fingerprint=None, key_id=None):
temporary_directory = mkdtemp()
os.chmod(temporary_directory, 0x1C0)
gpg = gnupg.GPG(
gnupghome=temporary_directory, gpgbinary=setting_gpg_path.value
)
# Preload keys # Preload keys
if all_keys: if all_keys:
logger.debug('preloading all keys') logger.debug('preloading all keys')
for key in self.all(): keys = self.values()
gpg.import_keys(key_data=key.key_data)
elif key_fingerprint: elif key_fingerprint:
logger.debug('preloading key fingerprint: %s', key_fingerprint) logger.debug('preloading key fingerprint: %s', key_fingerprint)
try: keys = self.filter(fingerprint=key_fingerprint).values()
key = self.get(fingerprint=key_fingerprint) if not keys:
except self.model.DoesNotExist:
logger.debug('key fingerprint %s not found', key_fingerprint) logger.debug('key fingerprint %s not found', key_fingerprint)
shutil.rmtree(temporary_directory)
raise KeyDoesNotExist( raise KeyDoesNotExist(
'Specified key for verification not found' 'Specified key for verification not found'
) )
else:
gpg.import_keys(key_data=key.key_data)
elif key_id: elif key_id:
logger.debug('preloading key id: %s', key_id) logger.debug('preloading key id: %s', key_id)
try: keys = self.filter(fingerprint__endswith=key_id).values()
key = self.get(fingerprint__endswith=key_id) if keys:
except self.model.DoesNotExist:
logger.debug('key id %s not found', key_id)
else:
gpg.import_keys(key_data=key.key_data)
logger.debug('key id %s impored', key_id) logger.debug('key id %s impored', key_id)
else:
logger.debug('key id %s not found', key_id)
else:
keys = ()
decrypt_result = gpg.decrypt_file(file=file_object) return keys
shutil.rmtree(temporary_directory) def decrypt_file(self, file_object, all_keys=False, key_fingerprint=None, key_id=None):
keys = self._preload_keys(
all_keys=all_keys, key_fingerprint=key_fingerprint, key_id=key_id
)
decrypt_result = gpg_backend.decrypt_file(
file_object=file_object, keys=keys
)
logger.debug('decrypt_result.status: %s', decrypt_result.status) logger.debug('decrypt_result.status: %s', decrypt_result.status)
@@ -72,40 +64,20 @@ class KeyManager(models.Manager):
return io.BytesIO(decrypt_result.data) return io.BytesIO(decrypt_result.data)
def receive_key(self, key_id): def receive_key(self, key_id):
temporary_directory = mkdtemp() key_data = gpg_backend.recv_keys(
keyserver=setting_keyserver.value, key_id=key_id
os.chmod(temporary_directory, 0x1C0)
gpg = gnupg.GPG(
gnupghome=temporary_directory, gpgbinary=setting_gpg_path.value
) )
import_results = gpg.recv_keys(setting_keyserver.value, key_id) if not key_data:
if not import_results.count:
shutil.rmtree(temporary_directory)
raise KeyFetchingError('No key found') raise KeyFetchingError('No key found')
else: else:
key_data = gpg.export_keys(import_results.fingerprints[0])
shutil.rmtree(temporary_directory)
return self.create(key_data=key_data) return self.create(key_data=key_data)
def search(self, query): def search(self, query):
temporary_directory = mkdtemp() key_data_list = gpg_backend.search_keys(
keyserver=setting_keyserver.value, query=query
os.chmod(temporary_directory, 0x1C0)
gpg = gnupg.GPG(
gnupghome=temporary_directory, gpgbinary=setting_gpg_path.value
) )
key_data_list = gpg.search_keys(
query=query, keyserver=setting_keyserver.value
)
shutil.rmtree(temporary_directory)
result = [] result = []
for key_data in key_data_list: for key_data in key_data_list:
result.append(KeyStub(raw=key_data)) result.append(KeyStub(raw=key_data))
@@ -119,41 +91,10 @@ class KeyManager(models.Manager):
return self.filter(key_type=KEY_TYPE_SECRET) return self.filter(key_type=KEY_TYPE_SECRET)
def verify_file(self, file_object, signature_file=None, all_keys=False, key_fingerprint=None, key_id=None): def verify_file(self, file_object, signature_file=None, all_keys=False, key_fingerprint=None, key_id=None):
temporary_directory = mkdtemp() keys = self._preload_keys(
all_keys=all_keys, key_fingerprint=key_fingerprint, key_id=key_id
os.chmod(temporary_directory, 0x1C0)
gpg = gnupg.GPG(
gnupghome=temporary_directory, gpgbinary=setting_gpg_path.value
) )
# Preload keys
if all_keys:
logger.debug('preloading all keys')
for key in self.all():
gpg.import_keys(key_data=key.key_data)
elif key_fingerprint:
logger.debug('preloading key fingerprint: %s', key_fingerprint)
try:
key = self.get(fingerprint=key_fingerprint)
except self.model.DoesNotExist:
logger.debug('key fingerprint %s not found', key_fingerprint)
shutil.rmtree(temporary_directory)
raise KeyDoesNotExist(
'Specified key for verification not found'
)
else:
gpg.import_keys(key_data=key.key_data)
elif key_id:
logger.debug('preloading key id: %s', key_id)
try:
key = self.get(fingerprint__endswith=key_id)
except self.model.DoesNotExist:
logger.debug('key id %s not found', key_id)
else:
gpg.import_keys(key_data=key.key_data)
logger.debug('key id %s impored', key_id)
if signature_file: if signature_file:
# Save the original data and invert the argument order # Save the original data and invert the argument order
# Signature first, file second # Signature first, file second
@@ -165,18 +106,19 @@ class KeyManager(models.Manager):
signature_file_buffer.write(signature_file.read()) signature_file_buffer.write(signature_file.read())
signature_file_buffer.seek(0) signature_file_buffer.seek(0)
signature_file.seek(0) signature_file.seek(0)
verify_result = gpg.verify_file( verify_result = gpg_backend.verify_file(
file=signature_file_buffer, data_filename=temporary_filename file_object=signature_file_buffer,
data_filename=temporary_filename, keys=keys
) )
signature_file_buffer.close() signature_file_buffer.close()
os.unlink(temporary_filename) os.unlink(temporary_filename)
else: else:
verify_result = gpg.verify_file(file=file_object) verify_result = gpg_backend.verify_file(
file_object=file_object, keys=keys
)
logger.debug('verify_result.status: %s', verify_result.status) logger.debug('verify_result.status: %s', verify_result.status)
shutil.rmtree(temporary_directory)
if verify_result: if verify_result:
# Signed and key present # Signed and key present
logger.debug('signed and key present') logger.debug('signed and key present')

View File

@@ -14,6 +14,9 @@ class Migration(migrations.Migration):
migrations.AlterField( migrations.AlterField(
model_name='key', model_name='key',
name='key_data', name='key_data',
field=models.TextField(help_text='ASCII armored version of the key.', verbose_name='Key data'), field=models.TextField(
help_text='ASCII armored version of the key.',
verbose_name='Key data'
),
), ),
] ]

View File

@@ -2,10 +2,6 @@ from __future__ import absolute_import, unicode_literals
from datetime import date from datetime import date
import logging import logging
import os
import shutil
import gnupg
from django.core.exceptions import ValidationError from django.core.exceptions import ValidationError
from django.core.urlresolvers import reverse from django.core.urlresolvers import reverse
@@ -13,8 +9,6 @@ from django.db import models
from django.utils.encoding import python_2_unicode_compatible from django.utils.encoding import python_2_unicode_compatible
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from common.utils import mkdtemp
from .exceptions import NeedPassphrase, PassphraseError from .exceptions import NeedPassphrase, PassphraseError
from .literals import ( from .literals import (
ERROR_MSG_NEED_PASSPHRASE, ERROR_MSG_BAD_PASSPHRASE, ERROR_MSG_NEED_PASSPHRASE, ERROR_MSG_BAD_PASSPHRASE,
@@ -22,26 +16,11 @@ from .literals import (
OUTPUT_MESSAGE_CONTAINS_PRIVATE_KEY OUTPUT_MESSAGE_CONTAINS_PRIVATE_KEY
) )
from .managers import KeyManager from .managers import KeyManager
from .settings import setting_gpg_path from .runtime import gpg_backend
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def gpg_command(function):
temporary_directory = mkdtemp()
os.chmod(temporary_directory, 0x1C0)
gpg = gnupg.GPG(
gnupghome=temporary_directory, gpgbinary=setting_gpg_path.value
)
result = function(gpg=gpg)
shutil.rmtree(temporary_directory)
return result
@python_2_unicode_compatible @python_2_unicode_compatible
class Key(models.Model): class Key(models.Model):
key_data = models.TextField( key_data = models.TextField(
@@ -78,10 +57,7 @@ class Key(models.Model):
verbose_name_plural = _('Keys') verbose_name_plural = _('Keys')
def clean(self): def clean(self):
def import_key(gpg): import_results = gpg_backend.import_key(key_data=self.key_data)
return gpg.import_keys(key_data=self.key_data)
import_results = gpg_command(function=import_key)
if not import_results.count: if not import_results.count:
raise ValidationError(_('Invalid key data')) raise ValidationError(_('Invalid key data'))
@@ -93,22 +69,11 @@ class Key(models.Model):
return reverse('django_gpg:key_detail', args=(self.pk,)) return reverse('django_gpg:key_detail', args=(self.pk,))
def save(self, *args, **kwargs): def save(self, *args, **kwargs):
temporary_directory = mkdtemp() import_results, key_info = gpg_backend.import_and_list_keys(
key_data=self.key_data
os.chmod(temporary_directory, 0x1C0)
gpg = gnupg.GPG(
gnupghome=temporary_directory, gpgbinary=setting_gpg_path.value
) )
import_results = gpg.import_keys(key_data=self.key_data)
key_info = gpg.list_keys(keys=import_results.fingerprints[0])[0]
logger.debug('key_info: %s', key_info) logger.debug('key_info: %s', key_info)
shutil.rmtree(temporary_directory)
self.algorithm = key_info['algo'] self.algorithm = key_info['algo']
self.creation_date = date.fromtimestamp(int(key_info['date'])) self.creation_date = date.fromtimestamp(int(key_info['date']))
if key_info['expires']: if key_info['expires']:
@@ -134,24 +99,12 @@ class Key(models.Model):
# file, and appear to be due to random data being inserted in the # file, and appear to be due to random data being inserted in the
# output data stream." # output data stream."
temporary_directory = mkdtemp() file_sign_results = gpg_backend.sign_file(
file_object=file_object, key_data=self.key_data,
os.chmod(temporary_directory, 0x1C0) passphrase=passphrase, clearsign=clearsign, detached=detached,
gpg = gnupg.GPG(
gnupghome=temporary_directory, gpgbinary=setting_gpg_path.value
)
import_results = gpg.import_keys(key_data=self.key_data)
file_sign_results = gpg.sign_file(
file=file_object, keyid=import_results.fingerprints[0],
passphrase=passphrase, clearsign=clearsign, detach=detached,
binary=binary, output=output binary=binary, output=output
) )
shutil.rmtree(temporary_directory)
logger.debug('file_sign_results.stderr: %s', file_sign_results.stderr) logger.debug('file_sign_results.stderr: %s', file_sign_results.stderr)
if ERROR_MSG_NEED_PASSPHRASE in file_sign_results.stderr: if ERROR_MSG_NEED_PASSPHRASE in file_sign_results.stderr:

View File

@@ -0,0 +1,10 @@
from django.utils.module_loading import import_string
from .settings import setting_gpg_path
# TODO: This will become an setting option in 2.2
SETTING_GPG_BACKEND = 'django_gpg.classes.PythonGNUPGBackend'
gpg_backend = import_string(SETTING_GPG_BACKEND)(
binary_path=setting_gpg_path.value
)