Cleanup document version argument handling by making sure it is by document version and not by document instance
This commit is contained in:
@@ -30,8 +30,8 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def document_pre_open_hook(descriptor, instance):
|
||||
if DocumentVersionSignature.objects.has_embedded_signature(instance.document):
|
||||
# If it has an embedded signature decrypt
|
||||
if DocumentVersionSignature.objects.has_embedded_signature(document_version=instance):
|
||||
# If it has an embedded signature, decrypt
|
||||
try:
|
||||
result = gpg.decrypt_file(descriptor, close_descriptor=False)
|
||||
# gpg return a string, turn it into a file like object
|
||||
@@ -46,11 +46,14 @@ def document_pre_open_hook(descriptor, instance):
|
||||
return descriptor
|
||||
|
||||
|
||||
def document_post_save_hook(instance):
|
||||
if not instance.pk:
|
||||
document_signature, created = DocumentVersionSignature.objects.get_or_create(
|
||||
document_version=instance.latest_version,
|
||||
)
|
||||
def document_version_post_save_hook(instance):
|
||||
logger.debug('instance: %s', instance)
|
||||
|
||||
try:
|
||||
document_signature = DocumentVersionSignature.objects.get(document_version=instance)
|
||||
except DocumentVersionSignature.DoesNotExist:
|
||||
document_signature = DocumentVersionSignature.objects.create(document_version=instance)
|
||||
document_signature.check_for_embedded_signature()
|
||||
|
||||
|
||||
class DocumentSignaturesApp(apps.AppConfig):
|
||||
@@ -58,7 +61,7 @@ class DocumentSignaturesApp(apps.AppConfig):
|
||||
verbose_name = _('Document signatures')
|
||||
|
||||
def ready(self):
|
||||
DocumentVersion.register_post_save_hook(1, document_post_save_hook)
|
||||
DocumentVersion.register_post_save_hook(1, document_version_post_save_hook)
|
||||
DocumentVersion.register_pre_open_hook(1, document_pre_open_hook)
|
||||
|
||||
class_permissions(Document, [
|
||||
|
||||
@@ -11,18 +11,20 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DocumentVersionSignatureManager(models.Manager):
|
||||
def get_document_signature(self, document):
|
||||
def get_document_signature(self, document_version):
|
||||
document_signature, created = self.model.objects.get_or_create(
|
||||
document_version=document.latest_version,
|
||||
document_version=document_version,
|
||||
)
|
||||
|
||||
return document_signature
|
||||
|
||||
def add_detached_signature(self, document, detached_signature):
|
||||
document_signature = self.get_document_signature(document)
|
||||
def add_detached_signature(self, document_version, detached_signature):
|
||||
document_signature = self.get_document_signature(
|
||||
document_version=document_version
|
||||
)
|
||||
|
||||
if document_signature.has_embedded_signature:
|
||||
raise Exception('document already has an embedded signature')
|
||||
raise Exception('Document version already has an embedded signature')
|
||||
else:
|
||||
if document_signature.signature_file:
|
||||
logger.debug('Existing detached signature')
|
||||
@@ -33,9 +35,9 @@ class DocumentVersionSignatureManager(models.Manager):
|
||||
document_signature.signature_file = detached_signature
|
||||
document_signature.save()
|
||||
|
||||
def has_detached_signature(self, document):
|
||||
def has_detached_signature(self, document_version):
|
||||
try:
|
||||
document_signature = self.get_document_signature(document)
|
||||
document_signature = self.get_document_signature(document_version=document_version)
|
||||
except ValueError:
|
||||
return False
|
||||
else:
|
||||
@@ -44,42 +46,42 @@ class DocumentVersionSignatureManager(models.Manager):
|
||||
else:
|
||||
return False
|
||||
|
||||
def has_embedded_signature(self, document):
|
||||
logger.debug('document: %s', document)
|
||||
def has_embedded_signature(self, document_version):
|
||||
logger.debug('document_version: %s', document_version)
|
||||
|
||||
try:
|
||||
document_signature = self.get_document_signature(document)
|
||||
document_signature = self.get_document_signature(document_version=document_version)
|
||||
except ValueError:
|
||||
return False
|
||||
else:
|
||||
return document_signature.has_embedded_signature
|
||||
|
||||
def detached_signature(self, document):
|
||||
document_signature = self.get_document_signature(document)
|
||||
def detached_signature(self, document_version):
|
||||
document_signature = self.get_document_signature(document_version=document_version)
|
||||
|
||||
return document_signature.signature_file.storage.open(document_signature.signature_file.path)
|
||||
|
||||
def verify_signature(self, document):
|
||||
document_descriptor = document.open(raw=True)
|
||||
def verify_signature(self, document_version):
|
||||
document_version_descriptor = document_version.open(raw=True)
|
||||
detached_signature = None
|
||||
if self.has_detached_signature(document):
|
||||
if self.has_detached_signature(document_version=document_version):
|
||||
logger.debug('has detached signature')
|
||||
detached_signature = self.detached_signature(document)
|
||||
args = (document_descriptor, detached_signature)
|
||||
detached_signature = self.detached_signature(document_version=document_version)
|
||||
args = (document_version_descriptor, detached_signature)
|
||||
else:
|
||||
args = (document_descriptor,)
|
||||
args = (document_version_descriptor,)
|
||||
|
||||
try:
|
||||
return gpg.verify_file(*args, fetch_key=False)
|
||||
except GPGVerificationError:
|
||||
return None
|
||||
finally:
|
||||
document_descriptor.close()
|
||||
document_version_descriptor.close()
|
||||
if detached_signature:
|
||||
detached_signature.close()
|
||||
|
||||
def clear_detached_signature(self, document):
|
||||
document_signature = self.get_document_signature(document)
|
||||
def clear_detached_signature(self, document_version):
|
||||
document_signature = self.get_document_signature(document_version=document_version)
|
||||
if not document_signature.signature_file:
|
||||
raise Exception('document doesn\'t have a detached signature')
|
||||
|
||||
|
||||
@@ -36,7 +36,7 @@ class DocumentTestCase(TestCase):
|
||||
gpg.import_key(file_object.read())
|
||||
|
||||
def test_document_no_signature(self):
|
||||
self.failUnlessEqual(DocumentVersionSignature.objects.has_detached_signature(self.document), False)
|
||||
self.failUnlessEqual(DocumentVersionSignature.objects.has_detached_signature(self.document.latest_version), False)
|
||||
|
||||
def test_new_document_version_signed(self):
|
||||
with open(TEST_SIGNED_DOCUMENT_PATH) as file_object:
|
||||
@@ -46,8 +46,8 @@ class DocumentTestCase(TestCase):
|
||||
|
||||
self.document.new_version(file_object=File(file_object, name='mayan_11_1.pdf.gpg'), **new_version_data)
|
||||
|
||||
self.failUnlessEqual(DocumentVersionSignature.objects.has_detached_signature(self.document), False)
|
||||
self.failUnlessEqual(DocumentVersionSignature.objects.verify_signature(self.document).status, SIGNATURE_STATE_VALID)
|
||||
self.failUnlessEqual(DocumentVersionSignature.objects.has_detached_signature(self.document.latest_version), False)
|
||||
self.failUnlessEqual(DocumentVersionSignature.objects.verify_signature(self.document.latest_version).status, SIGNATURE_STATE_VALID)
|
||||
|
||||
def test_detached_signatures(self):
|
||||
new_version_data = {
|
||||
@@ -57,13 +57,13 @@ class DocumentTestCase(TestCase):
|
||||
self.document.new_version(file_object=File(file_object), **new_version_data)
|
||||
|
||||
# GPGVerificationError
|
||||
self.failUnlessEqual(DocumentVersionSignature.objects.verify_signature(self.document), None)
|
||||
self.failUnlessEqual(DocumentVersionSignature.objects.verify_signature(self.document.latest_version), None)
|
||||
|
||||
with open(TEST_SIGNATURE_FILE_PATH, 'rb') as file_object:
|
||||
DocumentVersionSignature.objects.add_detached_signature(self.document, File(file_object))
|
||||
DocumentVersionSignature.objects.add_detached_signature(self.document.latest_version, File(file_object))
|
||||
|
||||
self.failUnlessEqual(DocumentVersionSignature.objects.has_detached_signature(self.document), True)
|
||||
self.failUnlessEqual(DocumentVersionSignature.objects.verify_signature(self.document).status, SIGNATURE_STATE_VALID)
|
||||
self.failUnlessEqual(DocumentVersionSignature.objects.has_detached_signature(self.document.latest_version), True)
|
||||
self.failUnlessEqual(DocumentVersionSignature.objects.verify_signature(self.document.latest_version).status, SIGNATURE_STATE_VALID)
|
||||
|
||||
def tearDown(self):
|
||||
self.document.delete()
|
||||
|
||||
@@ -40,7 +40,7 @@ def document_verify(request, document_pk):
|
||||
document.add_as_recent_document_for_user(request.user)
|
||||
|
||||
try:
|
||||
signature = DocumentVersionSignature.objects.verify_signature(document)
|
||||
signature = DocumentVersionSignature.objects.verify_signature(document.latest_version)
|
||||
except AttributeError:
|
||||
signature_state = SIGNATURE_STATES.get(SIGNATURE_STATE_NONE)
|
||||
signature = None
|
||||
@@ -50,7 +50,7 @@ def document_verify(request, document_pk):
|
||||
paragraphs = [_('Signature status: %s') % signature_state['text']]
|
||||
|
||||
try:
|
||||
if DocumentVersionSignature.objects.has_embedded_signature(document):
|
||||
if DocumentVersionSignature.objects.has_embedded_signature(document.latest_version):
|
||||
signature_type = _('Embedded')
|
||||
else:
|
||||
signature_type = _('Detached')
|
||||
@@ -94,7 +94,7 @@ def document_signature_upload(request, document_pk):
|
||||
form = DetachedSignatureForm(request.POST, request.FILES)
|
||||
if form.is_valid():
|
||||
try:
|
||||
DocumentVersionSignature.objects.add_detached_signature(document, request.FILES['file'])
|
||||
DocumentVersionSignature.objects.add_detached_signature(document.latest_version, request.FILES['file'])
|
||||
messages.success(request, _('Detached signature uploaded successfully.'))
|
||||
return HttpResponseRedirect(next)
|
||||
except Exception as exception:
|
||||
@@ -121,8 +121,8 @@ def document_signature_download(request, document_pk):
|
||||
AccessEntry.objects.check_access(PERMISSION_SIGNATURE_DOWNLOAD, request.user, document)
|
||||
|
||||
try:
|
||||
if DocumentVersionSignature.objects.has_detached_signature(document):
|
||||
signature = DocumentVersionSignature.objects.detached_signature(document)
|
||||
if DocumentVersionSignature.objects.has_detached_signature(document.latest_version):
|
||||
signature = DocumentVersionSignature.objects.detached_signature(document.latest_version)
|
||||
return serve_file(
|
||||
request,
|
||||
signature,
|
||||
@@ -152,7 +152,7 @@ def document_signature_delete(request, document_pk):
|
||||
|
||||
if request.method == 'POST':
|
||||
try:
|
||||
DocumentVersionSignature.objects.clear_detached_signature(document)
|
||||
DocumentVersionSignature.objects.clear_detached_signature(document.latest_version)
|
||||
messages.success(request, _('Detached signature deleted successfully.'))
|
||||
return HttpResponseRedirect(next)
|
||||
except Exception as exception:
|
||||
|
||||
Reference in New Issue
Block a user