From be392823bbba16bdbd79a9f93ed333dc1b442b32 Mon Sep 17 00:00:00 2001 From: Roberto Rosario Date: Mon, 4 Apr 2016 14:55:37 -0400 Subject: [PATCH] Remove all GPG interface module. --- mayan/apps/django_gpg/api.py | 306 ----------------------------------- 1 file changed, 306 deletions(-) delete mode 100644 mayan/apps/django_gpg/api.py diff --git a/mayan/apps/django_gpg/api.py b/mayan/apps/django_gpg/api.py deleted file mode 100644 index aadc50bc37..0000000000 --- a/mayan/apps/django_gpg/api.py +++ /dev/null @@ -1,306 +0,0 @@ -from __future__ import unicode_literals - -import logging -import os -import tempfile - -try: - from cStringIO import StringIO -except ImportError: - from StringIO import StringIO - -import gnupg - -from django.utils.translation import ugettext_lazy as _ - -from .exceptions import * # NOQA -from .literals import KEY_TYPES - -logger = logging.getLogger(__name__) - - -class KeyStub(object): - def __init__(self, raw): - self.key_id = raw['keyid'] - self.key_type = raw['type'] - self.date = raw['date'] - self.expires = raw['expires'] - self.length = raw['length'] - self.uids = raw['uids'] - - -class Key(object): - @staticmethod - def get_key_id(fingerprint): - return fingerprint[-16:] - - @classmethod - def get_all(cls, gpg, secret=False, exclude=None): - result = [] - keys = gpg.gpg.list_keys(secret=secret) - if exclude: - excluded_id = exclude.key_id - else: - excluded_id = '' - for key in keys: - if not key['keyid'] in excluded_id: - key_instance = Key( - fingerprint=key['fingerprint'], - uids=key['uids'], - type=key['type'], - data=gpg.gpg.export_keys([key['keyid']], secret=secret) - ) - result.append(key_instance) - - return result - - @classmethod - def get(cls, gpg, key_id, secret=False, search_keyservers=False): - if len(key_id) > 16: - # key_id is a fingerprint - key_id = Key.get_key_id(key_id) - - keys = gpg.gpg.list_keys(secret=secret) - key = next((key for key in keys if key['keyid'] == key_id), None) - if not key: - if search_keyservers and secret is False: - try: - gpg.receive_key(key_id) - return Key(gpg, key_id) - except KeyFetchingError: - raise KeyDoesNotExist - else: - raise KeyDoesNotExist - - key_instance = Key( - fingerprint=key['fingerprint'], - uids=key['uids'], - type=key['type'], - data=gpg.gpg.export_keys([key['keyid']], secret=secret) - ) - - return key_instance - - def __init__(self, fingerprint, uids, type, data): - self.fingerprint = fingerprint - self.uids = uids - self.type = type - self.data = data - - @property - def key_id(self): - return Key.get_key_id(self.fingerprint) - - @property - def user_ids(self): - return ', '.join(self.uids) - - def __str__(self): - return '%s "%s" (%s)' % ( - self.key_id, self.user_ids, KEY_TYPES.get(self.type, _('Unknown')) - ) - - def __unicode__(self): - return unicode(self.__str__()) - - def __repr__(self): - return self.__unicode__() - - -class GPG(object): - @staticmethod - def get_descriptor(file_input): - try: - # Is it a file like object? - file_input.seek(0) - except AttributeError: - # If not, try open it. - return open(file_input, 'rb') - else: - return file_input - - def __init__(self, binary_path=None, home=None, keyring=None, keyservers=None): - kwargs = {} - if binary_path: - kwargs['gpgbinary'] = binary_path - - if home: - kwargs['gnupghome'] = home - - if keyring: - kwargs['keyring'] = keyring - - self.keyservers = keyservers - - try: - self.gpg = gnupg.GPG(**kwargs) - except OSError as exception: - raise GPGException( - 'ERROR: GPG initialization error; Make sure the GPG binary is properly installed; %s' % exception - ) - except Exception as exception: - raise GPGException( - 'ERROR: GPG initialization error; %s' % exception - ) - - def verify_file(self, file_input, detached_signature=None, fetch_key=False): - """ - Verify the signature of a file. - """ - - input_descriptor = GPG.get_descriptor(file_input) - - if detached_signature: - # Save the original data and invert the argument order - # Signature first, file second - file_descriptor, filename = tempfile.mkstemp(prefix='django_gpg') - os.write(file_descriptor, input_descriptor.read()) - os.close(file_descriptor) - - detached_signature = GPG.get_descriptor(detached_signature) - signature_file = StringIO() - signature_file.write(detached_signature.read()) - signature_file.seek(0) - verify = self.gpg.verify_file( - signature_file, data_filename=filename - ) - signature_file.close() - else: - verify = self.gpg.verify_file(input_descriptor) - - logger.debug('verify.status: %s', getattr(verify, 'status', None)) - if verify: - logger.debug('verify ok') - return verify - elif getattr(verify, 'status', None) == 'no public key': - # Exception to the rule, to be able to query the keyservers - if fetch_key: - try: - self.receive_key(verify.key_id) - return self.verify_file( - input_descriptor, detached_signature, fetch_key=False - ) - except KeyFetchingError: - return verify - else: - return verify - else: - logger.debug('No verify') - raise GPGVerificationError() - - def verify(self, data): - # TODO: try to merge with verify_file - verify = self.gpg.verify(data) - - if verify: - return verify - else: - raise GPGVerificationError(verify.status) - - def sign_file(self, file_input, key=None, destination=None, key_id=None, passphrase=None, clearsign=False): - """ - Signs a filename, storing the signature and the original file - in the destination filename provided (the destination file is - overrided if it already exists), if no destination file name is - provided the signature is returned. - """ - - kwargs = {} - kwargs['clearsign'] = clearsign - - if key_id: - kwargs['keyid'] = key_id - - if key: - kwargs['keyid'] = key.key_id - - if passphrase: - kwargs['passphrase'] = passphrase - - input_descriptor = GPG.get_descriptor(file_input) - - if destination: - output_descriptor = open(destination, 'wb') - - signed_data = self.gpg.sign_file(input_descriptor, **kwargs) - if not signed_data.fingerprint: - raise GPGSigningError('Unable to sign file') - - if destination: - output_descriptor.write(signed_data.data) - - input_descriptor.close() - - if destination: - output_descriptor.close() - - if not destination: - return signed_data - - def has_embedded_signature(self, *args, **kwargs): - try: - self.decrypt_file(*args, **kwargs) - except GPGDecryptionError: - return False - else: - return True - - def decrypt_file(self, file_input, close_descriptor=True): - input_descriptor = GPG.get_descriptor(file_input) - - result = self.gpg.decrypt_file(input_descriptor) - if close_descriptor: - input_descriptor.close() - - if not result.status or result.status == 'no data was provided': - raise GPGDecryptionError('Unable to decrypt file') - - return result - - def create_key(self, *args, **kwargs): - if kwargs.get('passphrase') == '': - kwargs.pop('passphrase') - - input_data = self.gpg.gen_key_input(**kwargs) - key = self.gpg.gen_key(input_data) - if not key: - raise KeyGenerationError('Unable to generate key') - - return Key.get(self, key.fingerprint) - - def delete_key(self, key): - status = self.gpg.delete_keys( - key.fingerprint, key.type == 'sec' - ).status - if status == 'Must delete secret key first': - self.delete_key(Key.get(self, key.fingerprint, secret=True)) - self.delete_key(key) - elif status != 'ok': - raise KeyDeleteError('Unable to delete key') - - def receive_key(self, key_id): - for keyserver in self.keyservers: - import_result = self.gpg.recv_keys(keyserver, key_id) - if import_result: - return Key.get( - self, import_result.fingerprints[0], secret=False - ) - - raise KeyFetchingError - - def query(self, term): - results = {} - for keyserver in self.keyservers: - for key_data in self.gpg.search_keys(query=term, keyserver=keyserver): - results[key_data['keyid']] = KeyStub(raw=key_data) - - return results.values() - - def import_key(self, key_data): - import_result = self.gpg.import_keys(key_data) - logger.debug('import_result: %s', import_result) - - if import_result: - return Key.get(self, import_result.fingerprints[0], secret=False) - - raise KeyImportError(import_result.results)