Instead of inserting the path of the apps into the Python app, the apps are now referenced by their full import path. This solves name clashes with external or native Python libraries. Example: Mayan statistics app vs. Python new statistics library. Every app reference is now prepended with 'mayan.apps'. Existing config.yml files need to be updated manually. Signed-off-by: Roberto Rosario <roberto.rosario.gonzalez@gmail.com>
138 lines
4.8 KiB
Python
138 lines
4.8 KiB
Python
from __future__ import absolute_import, unicode_literals
|
|
|
|
import io
|
|
import logging
|
|
import os
|
|
|
|
from django.db import models
|
|
|
|
from mayan.apps.common.utils import mkstemp
|
|
|
|
from .classes import KeyStub, SignatureVerification
|
|
from .exceptions import (
|
|
DecryptionError, KeyDoesNotExist, KeyFetchingError, VerificationError
|
|
)
|
|
from .literals import KEY_TYPE_PUBLIC, KEY_TYPE_SECRET
|
|
from .runtime import gpg_backend
|
|
from .settings import setting_keyserver
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class KeyManager(models.Manager):
|
|
def _preload_keys(self, all_keys=False, key_fingerprint=None, key_id=None):
|
|
# Preload keys
|
|
if all_keys:
|
|
logger.debug('preloading all keys')
|
|
keys = self.values()
|
|
elif key_fingerprint:
|
|
logger.debug('preloading key fingerprint: %s', key_fingerprint)
|
|
keys = self.filter(fingerprint=key_fingerprint).values()
|
|
if not keys:
|
|
logger.debug('key fingerprint %s not found', key_fingerprint)
|
|
raise KeyDoesNotExist(
|
|
'Specified key for verification not found'
|
|
)
|
|
elif key_id:
|
|
logger.debug('preloading key id: %s', key_id)
|
|
keys = self.filter(fingerprint__endswith=key_id).values()
|
|
if keys:
|
|
logger.debug('key id %s impored', key_id)
|
|
else:
|
|
logger.debug('key id %s not found', key_id)
|
|
else:
|
|
keys = ()
|
|
|
|
return keys
|
|
|
|
def decrypt_file(self, file_object, all_keys=False, key_fingerprint=None, key_id=None):
|
|
keys = self._preload_keys(
|
|
all_keys=all_keys, key_fingerprint=key_fingerprint, key_id=key_id
|
|
)
|
|
|
|
decrypt_result = gpg_backend.decrypt_file(
|
|
file_object=file_object, keys=keys
|
|
)
|
|
|
|
logger.debug('decrypt_result.status: %s', decrypt_result.status)
|
|
|
|
if not decrypt_result.status or decrypt_result.status == 'no data was provided':
|
|
raise DecryptionError('Unable to decrypt file')
|
|
|
|
file_object.close()
|
|
|
|
return io.BytesIO(decrypt_result.data)
|
|
|
|
def private_keys(self):
|
|
return self.filter(key_type=KEY_TYPE_SECRET)
|
|
|
|
def public_keys(self):
|
|
return self.filter(key_type=KEY_TYPE_PUBLIC)
|
|
|
|
def receive_key(self, key_id):
|
|
key_data = gpg_backend.recv_keys(
|
|
keyserver=setting_keyserver.value, key_id=key_id
|
|
)
|
|
|
|
if not key_data:
|
|
raise KeyFetchingError('No key found')
|
|
else:
|
|
return self.create(key_data=key_data)
|
|
|
|
def search(self, query):
|
|
key_data_list = gpg_backend.search_keys(
|
|
keyserver=setting_keyserver.value, query=query
|
|
)
|
|
|
|
result = []
|
|
for key_data in key_data_list:
|
|
result.append(KeyStub(raw=key_data))
|
|
|
|
return result
|
|
|
|
def verify_file(self, file_object, signature_file=None, all_keys=False, key_fingerprint=None, key_id=None):
|
|
keys = self._preload_keys(
|
|
all_keys=all_keys, key_fingerprint=key_fingerprint, key_id=key_id
|
|
)
|
|
|
|
if signature_file:
|
|
# Save the original data and invert the argument order
|
|
# Signature first, file second
|
|
temporary_file_object, temporary_filename = mkstemp()
|
|
os.write(temporary_file_object, file_object.read())
|
|
os.close(temporary_file_object)
|
|
|
|
signature_file_buffer = io.BytesIO()
|
|
signature_file_buffer.write(signature_file.read())
|
|
signature_file_buffer.seek(0)
|
|
signature_file.seek(0)
|
|
verify_result = gpg_backend.verify_file(
|
|
file_object=signature_file_buffer,
|
|
data_filename=temporary_filename, keys=keys
|
|
)
|
|
signature_file_buffer.close()
|
|
os.unlink(temporary_filename)
|
|
else:
|
|
verify_result = gpg_backend.verify_file(
|
|
file_object=file_object, keys=keys
|
|
)
|
|
|
|
logger.debug('verify_result.status: %s', verify_result.status)
|
|
|
|
if verify_result:
|
|
# Signed and key present
|
|
logger.debug('signed and key present')
|
|
return SignatureVerification(verify_result.__dict__)
|
|
elif verify_result.status == 'no public key' and not (key_fingerprint or all_keys or key_id):
|
|
# Signed but key not present, retry with key fetch
|
|
logger.debug('no public key')
|
|
file_object.seek(0)
|
|
return self.verify_file(file_object=file_object, signature_file=signature_file, key_id=verify_result.key_id)
|
|
elif verify_result.key_id:
|
|
# Signed, retried and key still not found
|
|
logger.debug('signed, retried and key still not found')
|
|
return SignatureVerification(verify_result.__dict__)
|
|
else:
|
|
logger.debug('file not signed')
|
|
raise VerificationError('File not signed')
|