Remove all GPG interface module.

This commit is contained in:
Roberto Rosario
2016-04-04 14:55:37 -04:00
parent f042533576
commit be392823bb

View File

@@ -1,306 +0,0 @@
from __future__ import unicode_literals
import logging
import os
import tempfile
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
import gnupg
from django.utils.translation import ugettext_lazy as _
from .exceptions import * # NOQA
from .literals import KEY_TYPES
logger = logging.getLogger(__name__)
class KeyStub(object):
def __init__(self, raw):
self.key_id = raw['keyid']
self.key_type = raw['type']
self.date = raw['date']
self.expires = raw['expires']
self.length = raw['length']
self.uids = raw['uids']
class Key(object):
@staticmethod
def get_key_id(fingerprint):
return fingerprint[-16:]
@classmethod
def get_all(cls, gpg, secret=False, exclude=None):
result = []
keys = gpg.gpg.list_keys(secret=secret)
if exclude:
excluded_id = exclude.key_id
else:
excluded_id = ''
for key in keys:
if not key['keyid'] in excluded_id:
key_instance = Key(
fingerprint=key['fingerprint'],
uids=key['uids'],
type=key['type'],
data=gpg.gpg.export_keys([key['keyid']], secret=secret)
)
result.append(key_instance)
return result
@classmethod
def get(cls, gpg, key_id, secret=False, search_keyservers=False):
if len(key_id) > 16:
# key_id is a fingerprint
key_id = Key.get_key_id(key_id)
keys = gpg.gpg.list_keys(secret=secret)
key = next((key for key in keys if key['keyid'] == key_id), None)
if not key:
if search_keyservers and secret is False:
try:
gpg.receive_key(key_id)
return Key(gpg, key_id)
except KeyFetchingError:
raise KeyDoesNotExist
else:
raise KeyDoesNotExist
key_instance = Key(
fingerprint=key['fingerprint'],
uids=key['uids'],
type=key['type'],
data=gpg.gpg.export_keys([key['keyid']], secret=secret)
)
return key_instance
def __init__(self, fingerprint, uids, type, data):
self.fingerprint = fingerprint
self.uids = uids
self.type = type
self.data = data
@property
def key_id(self):
return Key.get_key_id(self.fingerprint)
@property
def user_ids(self):
return ', '.join(self.uids)
def __str__(self):
return '%s "%s" (%s)' % (
self.key_id, self.user_ids, KEY_TYPES.get(self.type, _('Unknown'))
)
def __unicode__(self):
return unicode(self.__str__())
def __repr__(self):
return self.__unicode__()
class GPG(object):
@staticmethod
def get_descriptor(file_input):
try:
# Is it a file like object?
file_input.seek(0)
except AttributeError:
# If not, try open it.
return open(file_input, 'rb')
else:
return file_input
def __init__(self, binary_path=None, home=None, keyring=None, keyservers=None):
kwargs = {}
if binary_path:
kwargs['gpgbinary'] = binary_path
if home:
kwargs['gnupghome'] = home
if keyring:
kwargs['keyring'] = keyring
self.keyservers = keyservers
try:
self.gpg = gnupg.GPG(**kwargs)
except OSError as exception:
raise GPGException(
'ERROR: GPG initialization error; Make sure the GPG binary is properly installed; %s' % exception
)
except Exception as exception:
raise GPGException(
'ERROR: GPG initialization error; %s' % exception
)
def verify_file(self, file_input, detached_signature=None, fetch_key=False):
"""
Verify the signature of a file.
"""
input_descriptor = GPG.get_descriptor(file_input)
if detached_signature:
# Save the original data and invert the argument order
# Signature first, file second
file_descriptor, filename = tempfile.mkstemp(prefix='django_gpg')
os.write(file_descriptor, input_descriptor.read())
os.close(file_descriptor)
detached_signature = GPG.get_descriptor(detached_signature)
signature_file = StringIO()
signature_file.write(detached_signature.read())
signature_file.seek(0)
verify = self.gpg.verify_file(
signature_file, data_filename=filename
)
signature_file.close()
else:
verify = self.gpg.verify_file(input_descriptor)
logger.debug('verify.status: %s', getattr(verify, 'status', None))
if verify:
logger.debug('verify ok')
return verify
elif getattr(verify, 'status', None) == 'no public key':
# Exception to the rule, to be able to query the keyservers
if fetch_key:
try:
self.receive_key(verify.key_id)
return self.verify_file(
input_descriptor, detached_signature, fetch_key=False
)
except KeyFetchingError:
return verify
else:
return verify
else:
logger.debug('No verify')
raise GPGVerificationError()
def verify(self, data):
# TODO: try to merge with verify_file
verify = self.gpg.verify(data)
if verify:
return verify
else:
raise GPGVerificationError(verify.status)
def sign_file(self, file_input, key=None, destination=None, key_id=None, passphrase=None, clearsign=False):
"""
Signs a filename, storing the signature and the original file
in the destination filename provided (the destination file is
overrided if it already exists), if no destination file name is
provided the signature is returned.
"""
kwargs = {}
kwargs['clearsign'] = clearsign
if key_id:
kwargs['keyid'] = key_id
if key:
kwargs['keyid'] = key.key_id
if passphrase:
kwargs['passphrase'] = passphrase
input_descriptor = GPG.get_descriptor(file_input)
if destination:
output_descriptor = open(destination, 'wb')
signed_data = self.gpg.sign_file(input_descriptor, **kwargs)
if not signed_data.fingerprint:
raise GPGSigningError('Unable to sign file')
if destination:
output_descriptor.write(signed_data.data)
input_descriptor.close()
if destination:
output_descriptor.close()
if not destination:
return signed_data
def has_embedded_signature(self, *args, **kwargs):
try:
self.decrypt_file(*args, **kwargs)
except GPGDecryptionError:
return False
else:
return True
def decrypt_file(self, file_input, close_descriptor=True):
input_descriptor = GPG.get_descriptor(file_input)
result = self.gpg.decrypt_file(input_descriptor)
if close_descriptor:
input_descriptor.close()
if not result.status or result.status == 'no data was provided':
raise GPGDecryptionError('Unable to decrypt file')
return result
def create_key(self, *args, **kwargs):
if kwargs.get('passphrase') == '':
kwargs.pop('passphrase')
input_data = self.gpg.gen_key_input(**kwargs)
key = self.gpg.gen_key(input_data)
if not key:
raise KeyGenerationError('Unable to generate key')
return Key.get(self, key.fingerprint)
def delete_key(self, key):
status = self.gpg.delete_keys(
key.fingerprint, key.type == 'sec'
).status
if status == 'Must delete secret key first':
self.delete_key(Key.get(self, key.fingerprint, secret=True))
self.delete_key(key)
elif status != 'ok':
raise KeyDeleteError('Unable to delete key')
def receive_key(self, key_id):
for keyserver in self.keyservers:
import_result = self.gpg.recv_keys(keyserver, key_id)
if import_result:
return Key.get(
self, import_result.fingerprints[0], secret=False
)
raise KeyFetchingError
def query(self, term):
results = {}
for keyserver in self.keyservers:
for key_data in self.gpg.search_keys(query=term, keyserver=keyserver):
results[key_data['keyid']] = KeyStub(raw=key_data)
return results.values()
def import_key(self, key_data):
import_result = self.gpg.import_keys(key_data)
logger.debug('import_result: %s', import_result)
if import_result:
return Key.get(self, import_result.fingerprints[0], secret=False)
raise KeyImportError(import_result.results)