Files
mayan-edms/mayan/apps/django_gpg/managers.py

155 lines
5.1 KiB
Python

from __future__ import absolute_import, unicode_literals
import io
import logging
import os
import shutil
import tempfile
import gnupg
from django.db import models
from .classes import KeyStub, SignatureVerification
from .exceptions import (
DecryptionError, KeyDoesNotExist, KeyFetchingError, VerificationError
)
from .literals import KEY_TYPE_PUBLIC, KEY_TYPE_SECRET
from .settings import setting_gpg_path, setting_keyserver
logger = logging.getLogger(__name__)
class KeyManager(models.Manager):
def decrypt_file(self, file_object):
temporary_directory = tempfile.mkdtemp()
os.chmod(temporary_directory, 0x1C0)
gpg = gnupg.GPG(
gnupghome=temporary_directory, gpgbinary=setting_gpg_path.value
)
decrypt_result = gpg.decrypt_file(file=file_object)
shutil.rmtree(temporary_directory)
logger.debug('decrypt_result.status: %s', decrypt_result.status)
if not decrypt_result.status or decrypt_result.status == 'no data was provided':
raise DecryptionError('Unable to decrypt file')
file_object.close()
return io.BytesIO(str(decrypt_result))
def receive_key(self, key_id):
temporary_directory = tempfile.mkdtemp()
os.chmod(temporary_directory, 0x1C0)
gpg = gnupg.GPG(
gnupghome=temporary_directory, gpgbinary=setting_gpg_path.value
)
import_results = gpg.recv_keys(setting_keyserver.value, key_id)
if not import_results.count:
shutil.rmtree(temporary_directory)
raise KeyFetchingError('No key found')
else:
key_data = gpg.export_keys(import_results.fingerprints[0])
shutil.rmtree(temporary_directory)
return self.create(key_data=key_data)
def search(self, query):
temporary_directory = tempfile.mkdtemp()
gpg = gnupg.GPG(
gnupghome=temporary_directory, gpgbinary=setting_gpg_path.value
)
key_data_list = gpg.search_keys(
query=query, keyserver=setting_keyserver.value
)
shutil.rmtree(temporary_directory)
result = []
for key_data in key_data_list:
result.append(KeyStub(raw=key_data))
return result
def public_keys(self):
return self.filter(key_type=KEY_TYPE_PUBLIC)
def private_keys(self):
return self.filter(key_type=KEY_TYPE_SECRET)
def verify_file(self, file_object, signature_file=None, key_id=None, key_fingerprint=None, all_keys=False):
temporary_directory = tempfile.mkdtemp()
gpg = gnupg.GPG(
gnupghome=temporary_directory, gpgbinary=setting_gpg_path.value
)
# Preload keys
if all_keys:
for key in Key.objects.all():
gpg.import_keys(key_data=key.key_data)
elif key_fingerprint:
try:
key = self.get(fingerprint=key_fingerprint)
except self.model.DoesNotExist:
shutil.rmtree(temporary_directory)
raise KeyDoesNotExist('Specified key for verification not found in keyring')
else:
gpg.import_keys(key_data=key.key_data)
elif key_id:
try:
key = self.get(fingerprint__endswith=key_id)
except self.model.DoesNotExist:
pass
#shutil.rmtree(temporary_directory)
#raise KeyDoesNotExist('Specified key for verification not found in keyring')
else:
result = gpg.import_keys(key_data=key.key_data)
if signature_file:
# Save the original data and invert the argument order
# Signature first, file second
temporary_file_object, temporary_filename = tempfile.mkstemp()
os.write(temporary_file_object, file_object.read())
os.close(temporary_file_object)
signature_file_buffer = io.BytesIO()
signature_file_buffer.write(signature_file.read())
signature_file_buffer.seek(0)
signature_file.seek(0)
verify_result = gpg.verify_file(
file=signature_file_buffer, data_filename=temporary_filename
)
signature_file_buffer.close()
os.unlink(temporary_filename)
else:
verify_result = gpg.verify_file(file=file_object)
logger.debug('verify_result.status: %s', verify_result.status)
shutil.rmtree(temporary_directory)
if verify_result:
# Signed and key present
return SignatureVerification(verify_result.__dict__)
elif verify_result.status == 'no public key' and not (key_fingerprint or all_keys or key_id):
# Signed but key not present, retry with key fetch
file_object.seek(0)
return self.verify_file(file_object=file_object, signature_file=signature_file, key_id=verify_result.key_id)
elif verify_result.key_id:
# Signed, retried and key still not found
return SignatureVerification(verify_result.__dict__)
else:
raise VerificationError('File not signed')