Merge branch 'development' into feature/tornado

This commit is contained in:
Roberto Rosario
2016-04-21 17:02:08 -04:00
79 changed files with 3645 additions and 1084 deletions

View File

@@ -10,16 +10,16 @@ variables:
POSTGRES_PASSWORD: "postgres"
MYSQL_ALLOW_EMPTY_PASSWORD: "yes"
MYSQL_DATABASE: "mayan_edms"
test:mysql:
script:
- pip install -r requirements/testing.txt
- pip install -q mysql-python
- apt-get install -qq mysql-client
- mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD" -e "ALTER DATABASE $MYSQL_DATABASE CHARACTER SET utf8 COLLATE utf8_unicode_ci;"
- coverage run manage.py runtests --settings=mayan.settings.testing.gitlab-ci.db_mysql --nomigrations
- bash <(curl https://raw.githubusercontent.com/codecov/codecov-bash/master/codecov) -t $CODECOV_TOKEN
tags:
- mysql
#test:mysql:
# script:
# - pip install -r requirements/testing.txt
# - pip install -q mysql-python
# - apt-get install -qq mysql-client
# - mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD" -e "ALTER DATABASE $MYSQL_DATABASE CHARACTER SET utf8 COLLATE utf8_unicode_ci;"
# - coverage run manage.py runtests --settings=mayan.settings.testing.gitlab-ci.db_mysql --nomigrations
# - bash <(curl https://raw.githubusercontent.com/codecov/codecov-bash/master/codecov) -t $CODECOV_TOKEN
# tags:
# - mysql
test:postgres:
script:
- pip install -r requirements/testing.txt

View File

@@ -27,6 +27,10 @@
- More tests added.
- Handle unicode filenames in staging folders.
- Add staging file deletion permission.
- New document_signature_view permission.
- Add support for signing documents.
- Instead of multiple keyservers only one keyserver is now supported.
- Replace document type selection widget with an opened selection list.
2.0.2 (2016-02-09)
==================

View File

@@ -7,22 +7,125 @@ Released: April, 2016
What's new
==========
- Upgrade to use Django 1.8.11.
- Remove remaining references to Django's User model.
- Remove included login required middleware using django-stronghold instead (http://mikegrouchy.com/django-stronghold/).
- Improve generation of success and error messages for class based views.
- Remove ownership concept from folders.
- Replace strip_spaces middleware with the spaceless template tag.
- Deselect the update checkbox for optional metadata by default.
- Implement per document type document creation permission.
- Make document type delete time period optional.
- Fixed date locale handling in document properties, checkout and user detail views.
- Add HTML5 upload widget.
- Add Message of the Day app.
Upgrade to use Django 1.8.11
----------------------------
With the end of life support for Django 1.7, moving to the next Mayan EDMS
minor version was a target for this release. The Django minor release chosen was
1.8 as it is very compatible with 1.7 and required minimal changes. Django 1.8
is an LTS release (Long Term Support) meaning that is no new big feature of a
new Django version is required, the project can stay in Django 1.8 for a good
amount of time with no downsides.
Remove remaining references to Django's User model
--------------------------------------------------
The few remaining hard code references to Django's User model that were missed
in a previous release have been removed. Using a custom User model with Mayan
should present very little if any obstacles.
Remove included login required middleware
-----------------------------------------
The custom middleware include with Mayan EDMS that forces user to be
authenticated before being able to access any view has been removed in favor of
a dedicated 3rd party Django app for that purpose. The app chosen was
django-stronghold (http://mikegrouchy.com/django-stronghold/).
Improve generation of success and error messages for class based views
----------------------------------------------------------------------
In the past success messages for actions would show a generic mention to the
object being manipulated (document, folder, tag). Now the errors and success
messages with be more explcit in describing what the view has or was trying
to manipulate.
Remove ownership concept from folders
-------------------------------------
Currently Folders in Mayan EDMS have a field that stores a reference to the
user that has created that folders. One of the design decissions of Mayan EDMS
is that there should never be any explicit ownership of any object. Ownership
is relative and is defined by the Access Control List of an object. The
removal of the user field from the Folders model brings this app in line with
the defined behavior.
Replacement of strip_spaces middleware with the spaceless template tag
----------------------------------------------------------------------
As a size optimization technique HTML content was dynamically stripped of spaces
as it was being served. The techique used involved detecting the MIME type of
the content being served and if found to be of text/HTML type spaces between
tags were stripped. An edge case was found where this did not worked always.
The approached has been changed to use Django's official tag to strip spaces.
In addition to using an official approach, the removal of spaces only happens
when the template is compiled and not at each HTTP response. The optimization
is minimal but since it happened at every response a small increase in speed
is expected for all deployment scenarios.
Deselect the update checkbox for optional metadata by default
-------------------------------------------------------------
During the last releases the behavior of the of metadata edit checkbox has seen
several tune ups. Thanks to community feedback one small change has been
introduced. The edit checkbox will be deselected by default for all optional
document type metadata entries.
Implement per document type document creation permission
--------------------------------------------------------
If is now possible to grant the document creation permission to a role for a
document type. Previously document creation was a "blanket" permission. Having
the permission meant that user could create any type of document. With this
change it is now possible to restrict which types of document users of a
specific role can create.
Make document type delete time period optional
----------------------------------------------
The entries that defined after how long a document in the trash would be
permanently deleted have been made optional. This means that if a document
type has this option blank, the corresponding document of this type would never
be deleted from the trash can.
Fixed date locale handling in document properties, checkout and user detail views
---------------------------------------------------------------------------------
A few releases back the ability to for users to set their timezone was added.
This change also included a smart date rendering update to adjust the dates
and times fields to the user's timezone. Some users reported a few views where
this timezone adjustment was not happeding, this has been fully fixed.
HTML5 upload widget
-------------------
A common request is the ability to just drap and drop documents from other
windows into Mayan EDMS's document upload wizard. This release includes that
capability and will also show a completion bar for the upload. Document
uploading is sped up dramatically with this change.
Message of the Day app
----------------------
Administrators wanting to display announcements has no other way to do so
than to customize the login template. To avoid this a new app has been added
that allows for the creation of messages to be shown at the user login
screen. These messages can have an activation and an experiation date and
time. These messages are useful for display company access policies,
maintenance announcement, etc.
Document signing
----------------
The biggest change for this release if the addition of document signing from
within the UI. Enterprise users request this feature very often as in those
environments cryptographic signatures are a basic requirement. Previously
Mayan EDMS had the ability to automatically check if a document was signed and
if signed, verify the validity of the signature. However, to sign documents
user had to download the document, sign the document offline, and either
re-upload the signed document as a new version or upload a detached
signature for the existing document version. Aside from being now able to sign
documents from the web user iterface, the way keys are handled has been
rewritten from scratch to support distributed key storage. This means that
a key uploaded in one computer by one user can be used transparently by
other users in other computers to sign documents. The relevant access control
updates were added to the new document signing system. Users wanting to sign a
document need the singing permission for the document (or document type),
for the private key they intend to use, and the passphrase (if the key has one).
Finally documents are now checked just once for signatures and not every time
they are accessed, this provides a very sizable speed improvement in document
access and availability.
Other changes
=============
- Upgrade requirements.
- Upgrade Python requirements to recent versions.
- Rename 'Content' search box to 'OCR'.
- Silence all Django 1.8 model import warnings.
- Add icons to the document face menu links.
@@ -94,5 +197,4 @@ Bugs fixed or issues closed
* `GitLab issue #246 <https://gitlab.com/mayan-edms/mayan-edms/issues/246>`_ Upgrade to Django version 1.8 as Django 1.7 is end-of-life.
* `GitLab issue #255 <https://gitlab.com/mayan-edms/mayan-edms/issues/255>`_ UnicodeDecodeError in apps/common/middleware/strip_spaces_widdleware.py.
.. _PyPI: https://pypi.python.org/pypi/mayan-edms/

View File

@@ -8,6 +8,7 @@ from django.db import models
from django.db.models import Q
from django.utils.translation import ugettext
from common.utils import return_attrib
from permissions.models import StoredPermission
from .classes import ModelPermission
@@ -54,10 +55,11 @@ class AccessControlListManager(models.Manager):
permission.stored_permission for permission in permissions
]
except TypeError:
# Not a list of permissions, just one
stored_permissions = [permissions.stored_permission]
if related:
obj = getattr(obj, related)
obj = return_attrib(obj, related)
try:
parent_accessor = ModelPermission.get_inheritance(obj._meta.model)
@@ -79,8 +81,7 @@ class AccessControlListManager(models.Manager):
user_roles.append(role)
# TODO: possible .exists() optimization
if not self.filter(content_type=ContentType.objects.get_for_model(obj), object_id=obj.pk, permissions__in=stored_permissions, role__in=user_roles):
if not self.filter(content_type=ContentType.objects.get_for_model(obj), object_id=obj.pk, permissions__in=stored_permissions, role__in=user_roles).exists():
raise PermissionDenied(ugettext('Insufficient access.'))
def filter_by_access(self, permission, user, queryset):

View File

@@ -69,7 +69,7 @@
{{ option.render }}
</div>
{% endfor %}
{% elif field|widget_type == 'datetimeinput' %}
{% elif field|widget_type == 'datetimeinput' or field|widget_type == 'dateinput' %}
{% if read_only %}
{{ field.value }}
{% else %}

View File

@@ -45,5 +45,6 @@ link_checkout_info = Link(
icon='fa fa-shopping-cart', permissions=(
permission_document_checkin, permission_document_checkin_override,
permission_document_checkout
), text=_('Check in/out'), view='checkouts:checkout_info', args='object.pk'
), text=_('Check in/out'), view='checkouts:checkout_info',
args='resolved_object.pk'
)

View File

@@ -61,6 +61,8 @@ class MayanAppConfig(apps.AppConfig):
'App %s doesn\'t have URLs defined. Exception: %s', self.name,
exception
)
if 'No module named urls' not in unicode(exception):
raise exception
class CommonApp(MayanAppConfig):

View File

@@ -8,11 +8,15 @@ from django.utils.translation import ugettext_lazy as _
from django.views.generic import (
FormView as DjangoFormView, DetailView, TemplateView
)
from django.views.generic.base import ContextMixin
from django.views.generic.detail import SingleObjectMixin
from django.views.generic.edit import (
CreateView, DeleteView, ModelFormMixin, UpdateView
)
from django.views.generic.list import ListView
from django_downloadview import VirtualDownloadView
from django_downloadview import VirtualFile
from pure_pagination.mixins import PaginationMixin
from .forms import ChoiceForm
@@ -344,6 +348,10 @@ class SingleObjectDetailView(ViewPermissionCheckMixin, ObjectPermissionCheckMixi
return context
class SingleObjectDownloadView(ViewPermissionCheckMixin, ObjectPermissionCheckMixin, VirtualDownloadView, SingleObjectMixin):
VirtualFile = VirtualFile
class SingleObjectEditView(ObjectNameMixin, ViewPermissionCheckMixin, ObjectPermissionCheckMixin, ExtraContextMixin, RedirectionMixin, UpdateView):
template_name = 'appearance/generic_form.html'

View File

@@ -50,7 +50,9 @@ class GenericViewTestCase(TestCase):
def test_view(request):
template = Template('{{ object }}')
context = Context({'object': test_object})
context = Context(
{'object': test_object, 'resolved_object': test_object}
)
return HttpResponse(template.render(context=context))
urlpatterns.insert(0, url(TEST_VIEW_URL, test_view, name=TEST_VIEW_NAME))

View File

@@ -0,0 +1,15 @@
from __future__ import unicode_literals
from django.contrib import admin
from .models import Key
@admin.register(Key)
class KeyAdmin(admin.ModelAdmin):
list_display = (
'key_id', 'user_id', 'creation_date', 'expiration_date', 'key_type'
)
list_filter = ('key_type',)
readonly_fields = list_display + ('fingerprint', 'length', 'algorithm')
search_fields = ('key_id', 'user_id',)

View File

@@ -1,306 +0,0 @@
from __future__ import unicode_literals
import logging
import os
import tempfile
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
import gnupg
from django.utils.translation import ugettext_lazy as _
from .exceptions import * # NOQA
from .literals import KEY_TYPES
logger = logging.getLogger(__name__)
class KeyStub(object):
def __init__(self, raw):
self.key_id = raw['keyid']
self.key_type = raw['type']
self.date = raw['date']
self.expires = raw['expires']
self.length = raw['length']
self.uids = raw['uids']
class Key(object):
@staticmethod
def get_key_id(fingerprint):
return fingerprint[-16:]
@classmethod
def get_all(cls, gpg, secret=False, exclude=None):
result = []
keys = gpg.gpg.list_keys(secret=secret)
if exclude:
excluded_id = exclude.key_id
else:
excluded_id = ''
for key in keys:
if not key['keyid'] in excluded_id:
key_instance = Key(
fingerprint=key['fingerprint'],
uids=key['uids'],
type=key['type'],
data=gpg.gpg.export_keys([key['keyid']], secret=secret)
)
result.append(key_instance)
return result
@classmethod
def get(cls, gpg, key_id, secret=False, search_keyservers=False):
if len(key_id) > 16:
# key_id is a fingerprint
key_id = Key.get_key_id(key_id)
keys = gpg.gpg.list_keys(secret=secret)
key = next((key for key in keys if key['keyid'] == key_id), None)
if not key:
if search_keyservers and secret is False:
try:
gpg.receive_key(key_id)
return Key(gpg, key_id)
except KeyFetchingError:
raise KeyDoesNotExist
else:
raise KeyDoesNotExist
key_instance = Key(
fingerprint=key['fingerprint'],
uids=key['uids'],
type=key['type'],
data=gpg.gpg.export_keys([key['keyid']], secret=secret)
)
return key_instance
def __init__(self, fingerprint, uids, type, data):
self.fingerprint = fingerprint
self.uids = uids
self.type = type
self.data = data
@property
def key_id(self):
return Key.get_key_id(self.fingerprint)
@property
def user_ids(self):
return ', '.join(self.uids)
def __str__(self):
return '%s "%s" (%s)' % (
self.key_id, self.user_ids, KEY_TYPES.get(self.type, _('Unknown'))
)
def __unicode__(self):
return unicode(self.__str__())
def __repr__(self):
return self.__unicode__()
class GPG(object):
@staticmethod
def get_descriptor(file_input):
try:
# Is it a file like object?
file_input.seek(0)
except AttributeError:
# If not, try open it.
return open(file_input, 'rb')
else:
return file_input
def __init__(self, binary_path=None, home=None, keyring=None, keyservers=None):
kwargs = {}
if binary_path:
kwargs['gpgbinary'] = binary_path
if home:
kwargs['gnupghome'] = home
if keyring:
kwargs['keyring'] = keyring
self.keyservers = keyservers
try:
self.gpg = gnupg.GPG(**kwargs)
except OSError as exception:
raise GPGException(
'ERROR: GPG initialization error; Make sure the GPG binary is properly installed; %s' % exception
)
except Exception as exception:
raise GPGException(
'ERROR: GPG initialization error; %s' % exception
)
def verify_file(self, file_input, detached_signature=None, fetch_key=False):
"""
Verify the signature of a file.
"""
input_descriptor = GPG.get_descriptor(file_input)
if detached_signature:
# Save the original data and invert the argument order
# Signature first, file second
file_descriptor, filename = tempfile.mkstemp(prefix='django_gpg')
os.write(file_descriptor, input_descriptor.read())
os.close(file_descriptor)
detached_signature = GPG.get_descriptor(detached_signature)
signature_file = StringIO()
signature_file.write(detached_signature.read())
signature_file.seek(0)
verify = self.gpg.verify_file(
signature_file, data_filename=filename
)
signature_file.close()
else:
verify = self.gpg.verify_file(input_descriptor)
logger.debug('verify.status: %s', getattr(verify, 'status', None))
if verify:
logger.debug('verify ok')
return verify
elif getattr(verify, 'status', None) == 'no public key':
# Exception to the rule, to be able to query the keyservers
if fetch_key:
try:
self.receive_key(verify.key_id)
return self.verify_file(
input_descriptor, detached_signature, fetch_key=False
)
except KeyFetchingError:
return verify
else:
return verify
else:
logger.debug('No verify')
raise GPGVerificationError()
def verify(self, data):
# TODO: try to merge with verify_file
verify = self.gpg.verify(data)
if verify:
return verify
else:
raise GPGVerificationError(verify.status)
def sign_file(self, file_input, key=None, destination=None, key_id=None, passphrase=None, clearsign=False):
"""
Signs a filename, storing the signature and the original file
in the destination filename provided (the destination file is
overrided if it already exists), if no destination file name is
provided the signature is returned.
"""
kwargs = {}
kwargs['clearsign'] = clearsign
if key_id:
kwargs['keyid'] = key_id
if key:
kwargs['keyid'] = key.key_id
if passphrase:
kwargs['passphrase'] = passphrase
input_descriptor = GPG.get_descriptor(file_input)
if destination:
output_descriptor = open(destination, 'wb')
signed_data = self.gpg.sign_file(input_descriptor, **kwargs)
if not signed_data.fingerprint:
raise GPGSigningError('Unable to sign file')
if destination:
output_descriptor.write(signed_data.data)
input_descriptor.close()
if destination:
output_descriptor.close()
if not destination:
return signed_data
def has_embedded_signature(self, *args, **kwargs):
try:
self.decrypt_file(*args, **kwargs)
except GPGDecryptionError:
return False
else:
return True
def decrypt_file(self, file_input, close_descriptor=True):
input_descriptor = GPG.get_descriptor(file_input)
result = self.gpg.decrypt_file(input_descriptor)
if close_descriptor:
input_descriptor.close()
if not result.status or result.status == 'no data was provided':
raise GPGDecryptionError('Unable to decrypt file')
return result
def create_key(self, *args, **kwargs):
if kwargs.get('passphrase') == '':
kwargs.pop('passphrase')
input_data = self.gpg.gen_key_input(**kwargs)
key = self.gpg.gen_key(input_data)
if not key:
raise KeyGenerationError('Unable to generate key')
return Key.get(self, key.fingerprint)
def delete_key(self, key):
status = self.gpg.delete_keys(
key.fingerprint, key.type == 'sec'
).status
if status == 'Must delete secret key first':
self.delete_key(Key.get(self, key.fingerprint, secret=True))
self.delete_key(key)
elif status != 'ok':
raise KeyDeleteError('Unable to delete key')
def receive_key(self, key_id):
for keyserver in self.keyservers:
import_result = self.gpg.recv_keys(keyserver, key_id)
if import_result:
return Key.get(
self, import_result.fingerprints[0], secret=False
)
raise KeyFetchingError
def query(self, term):
results = {}
for keyserver in self.keyservers:
for key_data in self.gpg.search_keys(query=term, keyserver=keyserver):
results[key_data['keyid']] = KeyStub(raw=key_data)
return results.values()
def import_key(self, key_data):
import_result = self.gpg.import_keys(key_data)
logger.debug('import_result: %s', import_result)
if import_result:
return Key.get(self, import_result.fingerprints[0], secret=False)
raise KeyImportError(import_result.results)

View File

@@ -1,18 +1,26 @@
from __future__ import unicode_literals
from datetime import datetime
from __future__ import absolute_import, unicode_literals
from django.utils.translation import ugettext_lazy as _
from common import MayanAppConfig, menu_object, menu_setup, menu_sidebar
from acls import ModelPermission
from acls.links import link_acl_list
from acls.permissions import permission_acl_edit, permission_acl_view
from common import (
MayanAppConfig, menu_facet, menu_object, menu_setup, menu_sidebar
)
from common.classes import Package
from navigation import SourceColumn
from .api import Key, KeyStub
from .classes import KeyStub
from .links import (
link_key_delete, link_key_query, link_key_receive, link_key_setup,
link_key_delete, link_key_detail, link_key_download, link_key_query,
link_key_receive, link_key_setup, link_key_upload, link_private_keys,
link_public_keys
)
from .permissions import (
permission_key_delete, permission_key_download, permission_key_sign,
permission_key_view
)
class DjangoGPGApp(MayanAppConfig):
@@ -24,6 +32,16 @@ class DjangoGPGApp(MayanAppConfig):
def ready(self):
super(DjangoGPGApp, self).ready()
Key = self.get_model('Key')
ModelPermission.register(
model=Key, permissions=(
permission_acl_edit, permission_acl_view,
permission_key_delete, permission_key_download,
permission_key_sign, permission_key_view
)
)
Package(label='python-gnupg', license_text='''
Copyright (c) 2008-2014 by Vinay Sajip.
All rights reserved.
@@ -52,40 +70,45 @@ OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
''')
SourceColumn(source=Key, label=_('ID'), attribute='key_id')
SourceColumn(
source=Key, label=_('Owner'),
func=lambda context: ', '.join(context['object'].uids)
)
SourceColumn(source=Key, label=_('Key ID'), attribute='key_id')
SourceColumn(source=Key, label=_('User ID'), attribute='user_id')
SourceColumn(
source=KeyStub, label=_('ID'),
func=lambda context: '...{0}'.format(context['object'].key_id[-16:])
)
SourceColumn(source=KeyStub, label=_('Key ID'), attribute='key_id')
SourceColumn(source=KeyStub, label=_('Type'), attribute='key_type')
SourceColumn(
source=KeyStub, label=_('Creation date'),
func=lambda context: datetime.fromtimestamp(
int(context['object'].date)
)
source=KeyStub, label=_('Creation date'), attribute='date'
)
SourceColumn(
source=KeyStub, label=_('Expiration date'),
func=lambda context: datetime.fromtimestamp(int(context['object'].expires)) if context['object'].expires else _('No expiration')
func=lambda context: context['object'].expires or _('No expiration')
)
SourceColumn(source=KeyStub, label=_('Length'), attribute='length')
SourceColumn(
source=KeyStub, label=_('Identities'),
func=lambda context: ', '.join(context['object'].uids)
source=KeyStub, label=_('User ID'),
func=lambda context: ', '.join(context['object'].user_id)
)
menu_object.bind_links(links=(link_key_delete,), sources=(Key,))
menu_object.bind_links(links=(link_key_detail,), sources=(Key,))
menu_object.bind_links(links=(link_key_receive,), sources=(KeyStub,))
menu_object.bind_links(
links=(link_acl_list, link_key_delete, link_key_download,),
sources=(Key,)
)
menu_setup.bind_links(links=(link_key_setup,))
menu_sidebar.bind_links(
links=(link_public_keys, link_key_query),
menu_facet.bind_links(
links=(link_private_keys, link_public_keys),
sources=(
'django_gpg:key_delete', 'django_gpg:key_public_list',
'django_gpg:key_query', 'django_gpg:key_query_results',
'django_gpg:key_public_list', 'django_gpg:key_private_list',
'django_gpg:key_query', 'django_gpg:key_query_results', Key,
KeyStub
)
)
menu_sidebar.bind_links(
links=(link_key_query, link_key_upload),
sources=(
'django_gpg:key_public_list', 'django_gpg:key_private_list',
'django_gpg:key_query', 'django_gpg:key_query_results', Key,
KeyStub
)
)

View File

@@ -0,0 +1,39 @@
from __future__ import absolute_import, unicode_literals
from datetime import date
class KeyStub(object):
def __init__(self, raw):
self.fingerprint = raw['keyid']
self.key_type = raw['type']
self.date = date.fromtimestamp(int(raw['date']))
if raw['expires']:
self.expires = date.fromtimestamp(int(raw['expires']))
else:
self.expires = None
self.length = raw['length']
self.user_id = raw['uids']
@property
def key_id(self):
return self.fingerprint[-8:]
class SignatureVerification(object):
def __init__(self, raw):
self.user_id = raw['username']
self.status = raw['status']
self.key_id = raw['key_id']
self.pubkey_fingerprint = raw['pubkey_fingerprint']
self.date = date.fromtimestamp(int(raw['timestamp']))
if raw['expire_timestamp']:
self.expires = date.fromtimestamp(int(raw['expire_timestamp']))
else:
self.expires = None
self.trust_text = raw['trust_text']
self.valid = raw['valid']
self.stderr = raw['stderr']
self.fingerprint = raw['fingerprint']
self.signature_id = raw['signature_id']
self.trust_level = raw['trust_level']

View File

@@ -1,6 +1,6 @@
__all__ = (
'GPGException', 'GPGVerificationError', 'GPGSigningError',
'GPGDecryptionError', 'KeyDeleteError', 'KeyGenerationError',
'GPGException', 'VerificationError', 'SigningError',
'DecryptionError', 'KeyDeleteError', 'KeyGenerationError',
'KeyFetchingError', 'KeyDoesNotExist', 'KeyImportError'
)
@@ -9,15 +9,15 @@ class GPGException(Exception):
pass
class GPGVerificationError(GPGException):
class VerificationError(GPGException):
pass
class GPGSigningError(GPGException):
class SigningError(GPGException):
pass
class GPGDecryptionError(GPGException):
class DecryptionError(GPGException):
pass
@@ -30,7 +30,9 @@ class KeyGenerationError(GPGException):
class KeyFetchingError(GPGException):
pass
"""
Unable to receive key or key not found
"""
class KeyDoesNotExist(GPGException):
@@ -39,3 +41,15 @@ class KeyDoesNotExist(GPGException):
class KeyImportError(GPGException):
pass
class NeedPassphrase(GPGException):
"""
Passphrase is needed but none was provided
"""
class PassphraseError(GPGException):
"""
Passphrase provided is incorrect
"""

View File

@@ -1,8 +1,46 @@
from __future__ import unicode_literals
from django import forms
from django.utils.html import escape
from django.utils.translation import ugettext_lazy as _
from common.forms import DetailForm
from .models import Key
class KeyDetailForm(DetailForm):
def __init__(self, *args, **kwargs):
instance = kwargs['instance']
extra_fields = (
{'label': _('Key ID'), 'field': 'key_id'},
{
'label': _('User ID'),
'field': lambda x: escape(instance.user_id),
},
{
'label': _('Creation date'), 'field': 'creation_date',
'widget': forms.widgets.DateInput
},
{
'label': _('Expiration date'),
'field': lambda x: instance.expiration_date or _('None'),
'widget': forms.widgets.DateInput
},
{'label': _('Fingerprint'), 'field': 'fingerprint'},
{'label': _('Length'), 'field': 'length'},
{'label': _('Algorithm'), 'field': 'algorithm'},
{'label': _('Type'), 'field': lambda x: instance.get_key_type_display()},
)
kwargs['extra_fields'] = extra_fields
super(KeyDetailForm, self).__init__(*args, **kwargs)
class Meta:
fields = ()
model = Key
class KeySearchForm(forms.Form):
term = forms.CharField(

View File

@@ -5,23 +5,22 @@ from django.utils.translation import ugettext_lazy as _
from navigation import Link
from .permissions import (
permission_key_delete, permission_key_receive, permission_key_view,
permission_keyserver_query
permission_key_delete, permission_key_download, permission_key_receive,
permission_key_view, permission_key_upload, permission_keyserver_query
)
link_private_keys = Link(
icon='fa fa-key', permissions=(permission_key_view,),
text=_('Private keys'), view='django_gpg:key_private_list'
)
link_public_keys = Link(
icon='fa fa-key', permissions=(permission_key_view,),
text=_('Public keys'), view='django_gpg:key_public_list'
)
link_key_delete = Link(
permissions=(permission_key_delete,), tags='dangerous', text=_('Delete'),
view='django_gpg:key_delete', args=('object.fingerprint', 'object.type',)
view='django_gpg:key_delete', args=('resolved_object.pk',)
)
link_key_detail = Link(
permissions=(permission_key_view,), text=_('Details'),
view='django_gpg:key_detail', args=('resolved_object.pk',)
)
link_key_download = Link(
permissions=(permission_key_download,), text=_('Download'),
view='django_gpg:key_download', args=('resolved_object.pk',)
)
link_key_query = Link(
permissions=(permission_keyserver_query,), text=_('Query keyservers'),
view='django_gpg:key_query'
@@ -34,3 +33,15 @@ link_key_setup = Link(
icon='fa fa-key', permissions=(permission_key_view,),
text=_('Key management'), view='django_gpg:key_public_list'
)
link_key_upload = Link(
permissions=(permission_key_upload,), text=_('Upload key'),
view='django_gpg:key_upload'
)
link_private_keys = Link(
permissions=(permission_key_view,), text=_('Private keys'),
view='django_gpg:key_private_list'
)
link_public_keys = Link(
permissions=(permission_key_view,), text=_('Public keys'),
view='django_gpg:key_public_list'
)

View File

@@ -7,6 +7,14 @@ KEY_TYPES = {
'sec': _('Secret'),
}
KEY_TYPE_PUBLIC = 'pub'
KEY_TYPE_SECRET = 'sec'
KEY_TYPE_CHOICES = (
(KEY_TYPE_PUBLIC, _('Public')),
(KEY_TYPE_SECRET, _('Secret')),
)
KEY_CLASS_RSA = 'RSA'
KEY_CLASS_DSA = 'DSA'
KEY_CLASS_ELG = 'ELG-E'
@@ -53,3 +61,8 @@ SIGNATURE_STATES = {
'text': _('Document is signed with a valid signature.'),
},
}
ERROR_MSG_NEED_PASSPHRASE = 'NEED_PASSPHRASE'
ERROR_MSG_BAD_PASSPHRASE = 'BAD_PASSPHRASE'
ERROR_MSG_GOOD_PASSPHRASE = 'GOOD_PASSPHRASE'
OUTPUT_MESSAGE_CONTAINS_PRIVATE_KEY = 'Contains private key'

View File

@@ -0,0 +1,194 @@
from __future__ import absolute_import, unicode_literals
import io
import logging
import os
import shutil
import tempfile
import gnupg
from django.db import models
from .classes import KeyStub, SignatureVerification
from .exceptions import (
DecryptionError, KeyDoesNotExist, KeyFetchingError, VerificationError
)
from .literals import KEY_TYPE_PUBLIC, KEY_TYPE_SECRET
from .settings import setting_gpg_path, setting_keyserver
logger = logging.getLogger(__name__)
class KeyManager(models.Manager):
def decrypt_file(self, file_object, all_keys=False, key_fingerprint=None, key_id=None):
temporary_directory = tempfile.mkdtemp()
os.chmod(temporary_directory, 0x1C0)
gpg = gnupg.GPG(
gnupghome=temporary_directory, gpgbinary=setting_gpg_path.value
)
# Preload keys
if all_keys:
logger.debug('preloading all keys')
for key in self.all():
gpg.import_keys(key_data=key.key_data)
elif key_fingerprint:
logger.debug('preloading key fingerprint: %s', key_fingerprint)
try:
key = self.get(fingerprint=key_fingerprint)
except self.model.DoesNotExist:
logger.debug('key fingerprint %s not found', key_fingerprint)
shutil.rmtree(temporary_directory)
raise KeyDoesNotExist(
'Specified key for verification not found'
)
else:
gpg.import_keys(key_data=key.key_data)
elif key_id:
logger.debug('preloading key id: %s', key_id)
try:
key = self.get(fingerprint__endswith=key_id)
except self.model.DoesNotExist:
logger.debug('key id %s not found', key_id)
else:
gpg.import_keys(key_data=key.key_data)
logger.debug('key id %s impored', key_id)
decrypt_result = gpg.decrypt_file(file=file_object)
shutil.rmtree(temporary_directory)
logger.debug('decrypt_result.status: %s', decrypt_result.status)
if not decrypt_result.status or decrypt_result.status == 'no data was provided':
raise DecryptionError('Unable to decrypt file')
file_object.close()
return io.BytesIO(decrypt_result.data)
def receive_key(self, key_id):
temporary_directory = tempfile.mkdtemp()
os.chmod(temporary_directory, 0x1C0)
gpg = gnupg.GPG(
gnupghome=temporary_directory, gpgbinary=setting_gpg_path.value
)
import_results = gpg.recv_keys(setting_keyserver.value, key_id)
if not import_results.count:
shutil.rmtree(temporary_directory)
raise KeyFetchingError('No key found')
else:
key_data = gpg.export_keys(import_results.fingerprints[0])
shutil.rmtree(temporary_directory)
return self.create(key_data=key_data)
def search(self, query):
temporary_directory = tempfile.mkdtemp()
os.chmod(temporary_directory, 0x1C0)
gpg = gnupg.GPG(
gnupghome=temporary_directory, gpgbinary=setting_gpg_path.value
)
key_data_list = gpg.search_keys(
query=query, keyserver=setting_keyserver.value
)
shutil.rmtree(temporary_directory)
result = []
for key_data in key_data_list:
result.append(KeyStub(raw=key_data))
return result
def public_keys(self):
return self.filter(key_type=KEY_TYPE_PUBLIC)
def private_keys(self):
return self.filter(key_type=KEY_TYPE_SECRET)
def verify_file(self, file_object, signature_file=None, all_keys=False, key_fingerprint=None, key_id=None):
temporary_directory = tempfile.mkdtemp()
os.chmod(temporary_directory, 0x1C0)
gpg = gnupg.GPG(
gnupghome=temporary_directory, gpgbinary=setting_gpg_path.value
)
# Preload keys
if all_keys:
logger.debug('preloading all keys')
for key in self.all():
gpg.import_keys(key_data=key.key_data)
elif key_fingerprint:
logger.debug('preloading key fingerprint: %s', key_fingerprint)
try:
key = self.get(fingerprint=key_fingerprint)
except self.model.DoesNotExist:
logger.debug('key fingerprint %s not found', key_fingerprint)
shutil.rmtree(temporary_directory)
raise KeyDoesNotExist(
'Specified key for verification not found'
)
else:
gpg.import_keys(key_data=key.key_data)
elif key_id:
logger.debug('preloading key id: %s', key_id)
try:
key = self.get(fingerprint__endswith=key_id)
except self.model.DoesNotExist:
logger.debug('key id %s not found', key_id)
else:
gpg.import_keys(key_data=key.key_data)
logger.debug('key id %s impored', key_id)
if signature_file:
# Save the original data and invert the argument order
# Signature first, file second
temporary_file_object, temporary_filename = tempfile.mkstemp()
os.write(temporary_file_object, file_object.read())
os.close(temporary_file_object)
signature_file_buffer = io.BytesIO()
signature_file_buffer.write(signature_file.read())
signature_file_buffer.seek(0)
signature_file.seek(0)
verify_result = gpg.verify_file(
file=signature_file_buffer, data_filename=temporary_filename
)
signature_file_buffer.close()
os.unlink(temporary_filename)
else:
verify_result = gpg.verify_file(file=file_object)
logger.debug('verify_result.status: %s', verify_result.status)
shutil.rmtree(temporary_directory)
if verify_result:
# Signed and key present
logger.debug('signed and key present')
return SignatureVerification(verify_result.__dict__)
elif verify_result.status == 'no public key' and not (key_fingerprint or all_keys or key_id):
# Signed but key not present, retry with key fetch
logger.debug('no public key')
file_object.seek(0)
return self.verify_file(file_object=file_object, signature_file=signature_file, key_id=verify_result.key_id)
elif verify_result.key_id:
# Signed, retried and key still not found
logger.debug('signed, retried and key still not found')
return SignatureVerification(verify_result.__dict__)
else:
logger.debug('file not signed')
raise VerificationError('File not signed')

View File

@@ -0,0 +1,61 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
]
operations = [
migrations.CreateModel(
name='Key',
fields=[
(
'id', models.AutoField(
verbose_name='ID', serialize=False, auto_created=True,
primary_key=True
)
),
('data', models.TextField(verbose_name='Data')),
(
'key_id', models.CharField(
unique=True, max_length=16, verbose_name='Key ID'
)
),
(
'creation_date', models.DateField(
verbose_name='Creation date'
)
),
(
'expiration_date', models.DateField(
null=True, verbose_name='Expiration date', blank=True
)
),
(
'fingerprint', models.CharField(
unique=True, max_length=40, verbose_name='Fingerprint'
)
),
('length', models.PositiveIntegerField(verbose_name='Length')),
(
'algorithm', models.PositiveIntegerField(
verbose_name='Algorithm'
)
),
('user_id', models.TextField(verbose_name='User ID')),
(
'key_type', models.CharField(
max_length=3, verbose_name='Type'
)
),
],
options={
'verbose_name': 'Key',
'verbose_name_plural': 'Keys',
},
),
]

View File

@@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('django_gpg', '0001_initial'),
]
operations = [
migrations.RemoveField(
model_name='key',
name='data',
),
migrations.AddField(
model_name='key',
name='key_data',
field=models.TextField(default='', verbose_name='Key data'),
preserve_default=False,
),
]

View File

@@ -0,0 +1,76 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('django_gpg', '0002_auto_20160322_1756'),
]
operations = [
migrations.AlterField(
model_name='key',
name='algorithm',
field=models.PositiveIntegerField(
verbose_name='Algorithm', editable=False
),
),
migrations.AlterField(
model_name='key',
name='creation_date',
field=models.DateField(
verbose_name='Creation date', editable=False
),
),
migrations.AlterField(
model_name='key',
name='expiration_date',
field=models.DateField(
verbose_name='Expiration date', null=True, editable=False,
blank=True
),
),
migrations.AlterField(
model_name='key',
name='fingerprint',
field=models.CharField(
verbose_name='Fingerprint', unique=True, max_length=40,
editable=False
),
),
migrations.AlterField(
model_name='key',
name='key_data',
field=models.TextField(verbose_name='Key data', editable=False),
),
migrations.AlterField(
model_name='key',
name='key_id',
field=models.CharField(
verbose_name='Key ID', unique=True, max_length=16,
editable=False
),
),
migrations.AlterField(
model_name='key',
name='key_type',
field=models.CharField(
verbose_name='Type', max_length=3, editable=False
),
),
migrations.AlterField(
model_name='key',
name='length',
field=models.PositiveIntegerField(
verbose_name='Length', editable=False
),
),
migrations.AlterField(
model_name='key',
name='user_id',
field=models.TextField(verbose_name='User ID', editable=False),
),
]

View File

@@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('django_gpg', '0003_auto_20160322_1810'),
]
operations = [
migrations.AlterField(
model_name='key',
name='key_data',
field=models.TextField(verbose_name='Key data'),
),
migrations.AlterField(
model_name='key',
name='key_type',
field=models.CharField(
verbose_name='Type', max_length=3, editable=False,
choices=[('pub', 'Public'), ('sec', 'Secret')]
),
),
]

View File

@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('django_gpg', '0004_auto_20160322_2202'),
]
operations = [
migrations.RemoveField(
model_name='key',
name='key_id',
),
]

View File

@@ -0,0 +1,166 @@
from __future__ import absolute_import, unicode_literals
from datetime import date
import logging
import os
import shutil
import tempfile
import gnupg
from django.core.exceptions import ValidationError
from django.core.urlresolvers import reverse
from django.db import models
from django.utils.encoding import python_2_unicode_compatible
from django.utils.translation import ugettext_lazy as _
from .exceptions import NeedPassphrase, PassphraseError
from .literals import (
ERROR_MSG_NEED_PASSPHRASE, ERROR_MSG_BAD_PASSPHRASE,
ERROR_MSG_GOOD_PASSPHRASE, KEY_TYPE_CHOICES, KEY_TYPE_SECRET,
OUTPUT_MESSAGE_CONTAINS_PRIVATE_KEY
)
from .managers import KeyManager
from .settings import setting_gpg_path
logger = logging.getLogger(__name__)
def gpg_command(function):
temporary_directory = tempfile.mkdtemp()
os.chmod(temporary_directory, 0x1C0)
gpg = gnupg.GPG(
gnupghome=temporary_directory, gpgbinary=setting_gpg_path.value
)
result = function(gpg=gpg)
shutil.rmtree(temporary_directory)
return result
@python_2_unicode_compatible
class Key(models.Model):
key_data = models.TextField(
help_text=_('ASCII armored version of the key.'),
verbose_name=_('Key data')
)
creation_date = models.DateField(
editable=False, verbose_name=_('Creation date')
)
expiration_date = models.DateField(
blank=True, editable=False, null=True,
verbose_name=_('Expiration date')
)
fingerprint = models.CharField(
editable=False, max_length=40, unique=True,
verbose_name=_('Fingerprint')
)
length = models.PositiveIntegerField(
editable=False, verbose_name=_('Length')
)
algorithm = models.PositiveIntegerField(
editable=False, verbose_name=_('Algorithm')
)
user_id = models.TextField(editable=False, verbose_name=_('User ID'))
key_type = models.CharField(
choices=KEY_TYPE_CHOICES, editable=False, max_length=3,
verbose_name=_('Type')
)
objects = KeyManager()
class Meta:
verbose_name = _('Key')
verbose_name_plural = _('Keys')
def clean(self):
def import_key(gpg):
return gpg.import_keys(key_data=self.key_data)
import_results = gpg_command(function=import_key)
if not import_results.count:
raise ValidationError(_('Invalid key data'))
if Key.objects.filter(fingerprint=import_results.fingerprints[0]).exists():
raise ValidationError(_('Key already exists.'))
def get_absolute_url(self):
return reverse('django_gpg:key_detail', args=(self.pk,))
def save(self, *args, **kwargs):
temporary_directory = tempfile.mkdtemp()
os.chmod(temporary_directory, 0x1C0)
gpg = gnupg.GPG(
gnupghome=temporary_directory, gpgbinary=setting_gpg_path.value
)
import_results = gpg.import_keys(key_data=self.key_data)
key_info = gpg.list_keys(keys=import_results.fingerprints[0])[0]
logger.debug('key_info: %s', key_info)
shutil.rmtree(temporary_directory)
self.algorithm = key_info['algo']
self.creation_date = date.fromtimestamp(int(key_info['date']))
if key_info['expires']:
self.expiration_date = date.fromtimestamp(int(key_info['expires']))
self.fingerprint = key_info['fingerprint']
self.length = int(key_info['length'])
self.user_id = key_info['uids'][0]
if OUTPUT_MESSAGE_CONTAINS_PRIVATE_KEY in import_results.results[0]['text']:
self.key_type = KEY_TYPE_SECRET
else:
self.key_type = key_info['type']
super(Key, self).save(*args, **kwargs)
def __str__(self):
return '{} - {}'.format(self.key_id, self.user_id)
def sign_file(self, file_object, passphrase=None, clearsign=False, detached=False, binary=False, output=None):
# WARNING: using clearsign=True and subsequent decryption corrupts the
# file. Appears to be a problem in python-gnupg or gpg itself.
# https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=55647
# "The problems differ from run to run and file to
# file, and appear to be due to random data being inserted in the
# output data stream."
temporary_directory = tempfile.mkdtemp()
os.chmod(temporary_directory, 0x1C0)
gpg = gnupg.GPG(
gnupghome=temporary_directory, gpgbinary=setting_gpg_path.value
)
import_results = gpg.import_keys(key_data=self.key_data)
file_sign_results = gpg.sign_file(
file=file_object, keyid=import_results.fingerprints[0],
passphrase=passphrase, clearsign=clearsign, detach=detached,
binary=binary, output=output
)
shutil.rmtree(temporary_directory)
logger.debug('file_sign_results.stderr: %s', file_sign_results.stderr)
if ERROR_MSG_NEED_PASSPHRASE in file_sign_results.stderr:
if ERROR_MSG_BAD_PASSPHRASE in file_sign_results.stderr:
raise PassphraseError
elif ERROR_MSG_GOOD_PASSPHRASE not in file_sign_results.stderr:
raise NeedPassphrase
return file_sign_results
@property
def key_id(self):
return self.fingerprint[-8:]

View File

@@ -6,15 +6,24 @@ from permissions import PermissionNamespace
namespace = PermissionNamespace('django_gpg', _('Key management'))
permission_key_view = namespace.add_permission(
name='key_view', label=_('View keys')
)
permission_key_delete = namespace.add_permission(
name='key_delete', label=_('Delete keys')
)
permission_keyserver_query = namespace.add_permission(
name='keyserver_query', label=_('Query keyservers')
permission_key_download = namespace.add_permission(
name='key_download', label=_('Download keys')
)
permission_key_receive = namespace.add_permission(
name='key_receive', label=_('Import keys from keyservers')
)
permission_key_sign = namespace.add_permission(
name='key_sign', label=_('Use keys to sign content')
)
permission_key_upload = namespace.add_permission(
name='key_upload', label=_('Upload keys')
)
permission_key_view = namespace.add_permission(
name='key_view', label=_('View keys')
)
permission_keyserver_query = namespace.add_permission(
name='keyserver_query', label=_('Query keyservers')
)

View File

@@ -1,7 +0,0 @@
from .api import GPG
from .settings import setting_gpg_home, setting_gpg_path, setting_keyservers
gpg = GPG(
binary_path=setting_gpg_path.value, home=setting_gpg_home.value,
keyservers=setting_keyservers.value
)

View File

@@ -8,10 +8,6 @@ from django.utils.translation import ugettext_lazy as _
from smart_settings import Namespace
namespace = Namespace(name='django_gpg', label=_('Signatures'))
setting_keyservers = namespace.add_setting(
global_name='SIGNATURES_KEYSERVERS', default=['pool.sks-keyservers.net'],
help_text=_('List of keyservers to be queried for unknown keys.')
)
setting_gpg_home = namespace.add_setting(
global_name='SIGNATURES_GPG_HOME',
default=os.path.join(settings.MEDIA_ROOT, 'gpg_home'),
@@ -24,3 +20,7 @@ setting_gpg_path = namespace.add_setting(
global_name='SIGNATURES_GPG_PATH', default='/usr/bin/gpg',
help_text=_('Path to the GPG binary.'), is_path=True
)
setting_keyserver = namespace.add_setting(
global_name='SIGNATURES_KEYSERVER', default='pool.sks-keyservers.net',
help_text=_('Keyserver used to query for keys.')
)

View File

@@ -0,0 +1 @@
test_file content

View File

@@ -0,0 +1,11 @@
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1
iQEcBAABAgAGBQJW8z8CAAoJEEEl6cVx83isoK0IAKa7TGyBBhebx/EM8/tzKwgd
mKmXFtgpSoJhqoLZiBdFVP4gJtxfaDnqSFt7uQA69V0TMqI6cIuX0x7h0lRrNLea
+FQZGsa8HwnJzsTQXIRzszY/wvvFyHfk8jYjBQi7BRwg5kZdW5fUgprYsE8j08WA
cn/VAP5xigxKJfM0ny3pL3mbJj7rdoz+bEu4z7yMg5EsRLbF4MZDU9mUo5QjAvQg
oMMIjv6fFpuvBP9xY/D03IstyqoEEfwl/+36ZQwyGNsA3ZHWXgSb5fwcpXWPdqaD
PuX5l137B2VHz9YNPK9sJdnr5ZRyY0vebzkAvCFPfysAaGWH36iTE58V3uhLNMk=
=yL7D
-----END PGP SIGNATURE-----

View File

@@ -1,6 +1,90 @@
from __future__ import unicode_literals
TEST_GPG_HOME = '/tmp/test_gpg_home'
TEST_KEY_ID = '607138F1AECC5A5CA31CB7715F3F7F75D210724D'
import os
from django.conf import settings
TEST_DETACHED_SIGNATURE = os.path.join(
settings.BASE_DIR, 'mayan', 'apps', 'django_gpg', 'tests', 'contrib',
'test_files', 'test_file.txt.asc'
)
TEST_FILE = os.path.join(
settings.BASE_DIR, 'mayan', 'apps', 'django_gpg', 'tests', 'contrib',
'test_files', 'test_file.txt'
)
TEST_KEY_DATA = '''-----BEGIN PGP PRIVATE KEY BLOCK-----
Version: GnuPG v1
lQO+BFbxfC8BCACnUZoD96W4+CSIaU9G8I08kXu2zJLzy2XgUtwLx8VQ8dOHr0E/
UembHkjMeH6Gjxu2Yrrbl9/Anzd+lkP0L9BV7WqjXpHmPxuaRlsrNXuMyX0YWtjo
zvCo/mVBKEt1aEejuE6YbeZdBQraym32ew8hTXQhPwqbKPC9LTUa2tDjkJHs0DLU
5Hvg2/16IYd94ZHAH+wOa4WrR/6wU1VBfFCGBl+xbSvburLYDwhNZC9+sIu61BO8
fZh48IIQ89Hin7cS/ovHTBF2Sr3n5yRzatV2eXXmT5AQdpTEpD3HPF82HXNRrSUK
I+BIoIGXnPg3wotOyahFGrC8RluY7QhU/KBdABEBAAH+AwMCyBnD0YX+KwtgKrBg
Nxz+lWc6bWQ4CvdxW4rlLTujXBbTYQ0YUpZ44qLXhq9Yso7760LF/ZZK4I12AZ+J
PCxubmYCBKg7HIHG1/tT6ACJyoWhCaO2rNXx7zh3SnYFNjvEoCUXoEoupoZ/Hk6J
NGCdJPUZe4mTY9lVHTSnwPusyGeSu9i51J4kREb0E1sN9UgMHNoJawu5BJw0Yl97
wD0U1cP93BB9FA+3KHUZDcj0v5exSkvWO1HQKzkZAaWOPfHoGCVRRBe4fYhjgumv
cbu7p1ve4ysooOO28DD/bIgbLA9swQjJT9CgwTnudmrn+3PEY9ghPFm4pLjUMWBK
nkBsSGQ1y7rCeGNGg5lAAKQfzL7gseiS0f+lmfSXsl1VTFWI89cCwnP7rTYHjsyS
Fs1V5/HhwCUL3SVJL+p6VMtZ4VWVlZ+Hm27hD0VYnmvd/cO8h14NRF3R/If7Ut+8
nqDwwtxTUPcDLzs2gbjGt9XhpVXCvoUExxZuf/q91wTUJGQ96wjKOopyH67i22m/
Orr29VGdzaE9iLe+cicf4ZwwKLzLczTVSjk2KUpSFx5KaFMcekHaBo+h1ABYfYQd
DE+3zKnuVMgF3Z2VXdKj4meibByc0BvrILLhcZ08eqWAd+Duyo2eSZyWV+1FKbKw
qtzudRxKMtEh5h4y1vn4eRd1zEQPBG9m9CTLUeO0l60Q1/gy/VwmAsiJZkcI8KSS
9HVw672+Q3gAcblLyYJrIvKT2EyLD2rSijxgx61//s9UR9k0a9iFXB11FtQ6N3Ct
+msBMO3wFGviZ2iqWiMYiGDoIXMil6G1KtJLkDc5uDXFMc5see12vlsFmEDFScvj
Nnslh9ajbC+mfgRPZFtprtoaGFUd4VRDM7/rr7kuuCZFQ1QebEVjJjmQnfgpowa7
C7QhRXhhbXBsZSBhZG1pbiA8YWRtaW5AZXhhbXBsZS5jb20+iQE4BBMBAgAiAhsD
BgsJCAcDAgYVCAIJCgsEFgIDAQIeAQIXgAUCVvGFyAAKCRBBJenFcfN4rIwSB/4l
PbS0F8tGtetzPIgPYerI2OwZDbVyrTVGbrY0ZJJfWXR0vyTJ38s3dZNC22ct3+g1
t1RVxFGssSZYW0StlPyb2u+5VUI4LDWmbaDL3QbN5KTkyXtGLaWUwJ/TC4EkAKEa
8HKqRpZOfeUj4gTIm2uwpYwihyVY2M6EW6we5DUOScX1kIO/VTB+QWChFcLEjZb3
LdSSOEKI56QHV/Sxn38jp0tD7E/yBBVw+HhFamhqwrYTnxy2/W/xHBvWQk34ZnTf
o/mZyBlWz5h6JaKhHyw0akRQbBSfo3huW6+RKI3QHj82f85zv01Uzvjxvaz/N5SP
/MuHTPgG8g69Bg4Ik6jjnQO+BFbxfC8BCADQYEUpx79976Ut5ZtMj3CNpndUWHB1
l2wa/vd+Gb6Yzm+/hu3t5GG8uxzFk9TC33G7/Ugyob2V0eVXS8rqIbiqbRW7Nmb6
RF4xeEZkUlLTmzXu9vJLRCW0f0ui+YJz6Rdgn4BCRJ+/OkLIoB9axDxDL+961ftw
LqBTK3IpQc+VwjBLPTofApJGjM/pExJDskAi4IJpd8sz5Djc7MkF/tANSWVdvNOA
lTIWZkfSiY2cThmC1WgL1KfSSYcFH0Z6/8M2qzF/9+D//j7WSq2GPahqVueVIE4r
CIi3ffayXdPsiEzgkZqJxeZyt8ht74qTgZhAhmIxnobrLg1nbwOVGx0tABEBAAH+
AwMCyBnD0YX+Kwtgqas29fXB07iu+YJbSEXDsg56zrdDBToOFODrpRsqQtVofRyO
1GVDt1qE8jJF+zxnxSWawFLwR3mUs8/RKmdOm9cLnsadjCSWXWXPgb0w5mzcaVBa
tn9CtnF2G30D77LtBrkhnKtmjpW2Etudd7wkBYtSL4mqADX+8SgbFlR5jYtlFcUl
6HziXFzFSDEJ3YOE4LMm39pk+p7Kn/1GvxLleXu46uQZU3yEUxmnrHFSmolehWJk
1OR6CZ4SDmsKyFF9aNJPo+0ytU/VyOOuruaEQwp6r+zuM9sanrZJVGwlN5PRhfmr
+TrUwStsh2sdKrQ10xDxBBp7xThR3wz3+REO2c6uIEIkXhSAOARK1EQGXpAeK35x
uAUief4yMMiBKweKADT9ic36xxmc52Ov7Nrkwgj8PXma3gWiktTPhGWLZQ/YdXTW
fV+IwDShJEmTPOAAtxqPljj9isC1qPS2ylJXrHyws3jz0xIMYe8GbgK1UmURC7DI
CAXC4K6x5/3Uuz+kirbQRXVt1c8O8azy/Zc9a97qodWd7NBHTAr8xk2JlcesjHmk
rGSKsm53sGV0PTweoi4n1YiEE6yBpCEoobcAABWfojCYIe5W54PTf7nkc+Ayzd9t
7ipTELF8RKHHBU42penurBAX+U3aSe6rUfhlTuVs8KykzT/4pQeUzndNYQos6KLH
C50CHXQbeLchdvDAzO0j80j8YGciRv0U+juaZMct+NCi/SNU46RD7qs85M9rB77/
GzOyrpsfVA0lfS5Z/g25+TqxEBTypiGMSh5Exza1Nwc2tIRExoYThW22SAM2PWqg
zw+aeNyC4uJWc9Qzf9sVMC1vaUUkf7cRMl8Lh7fNkX/sBUB4X8E3IG2UpeHKiWxp
UjRRioHbL6k8qEviaSyJLIkBJQQYAQIADwUCVvF8LwIbDAUJAA0vAAAKCRBBJenF
cfN4rAkxB/9Xyvsny6iBY1aFrIr2roOyXg1rX+NjEfo+HZqUIjpESQcviIatQcGB
1MVnvABVKCQWzQyoIkOyAmTUHKb0aLDynDblIctMVOy80wEtWRHcMQo4PzGUPJn3
hZOukiotQTeawLvyeoBY1M4FJaCvPYvUNl+PEUVLi2h2VFkANrtzJMjZpmI5iR62
h4oCbUV5JHhOyB+89Y1w8haFU9LrgOER2kXff1xU6wMfLdcO5ApV/sRJcNdYL7Cg
7nJLpOu33rvGW97adFMStZxXz4k+VXLErvtkT72XZX9TjS8hmIRxHKZgpb12ZkUe
8aeg3z/W+YctdRt81bi5isgM+oML9LAQ
=JZ5G
-----END PGP PRIVATE KEY BLOCK-----'''
TEST_KEY_ID = '4125E9C571F378AC'
TEST_KEY_FINGERPRINT = '6A24574E0A35004CDDFD22704125E9C571F378AC'
TEST_KEY_PASSPHRASE = 'testpassphrase'
TEST_KEYSERVERS = ['pool.sks-keyservers.net']
TEST_UIDS = 'Roberto Rosario'
TEST_SEARCH_UID = 'Roberto Rosario'
TEST_SEARCH_FINGERPRINT = '607138F1AECC5A5CA31CB7715F3F7F75D210724D'
TEST_SIGNED_FILE = os.path.join(
settings.BASE_DIR, 'mayan', 'apps', 'django_gpg', 'tests', 'contrib',
'test_files', 'test_file.txt.gpg'
)
TEST_SIGNED_FILE_CONTENT = 'test_file.txt\n'

View File

@@ -1,45 +0,0 @@
from __future__ import unicode_literals
import shutil
from django.test import TestCase
from ..api import GPG, Key
from ..settings import setting_gpg_path
from .literals import TEST_GPG_HOME, TEST_KEY_ID, TEST_KEYSERVERS, TEST_UIDS
class DjangoGPGTestCase(TestCase):
def setUp(self):
try:
shutil.rmtree(TEST_GPG_HOME)
except OSError:
pass
self.gpg = GPG(
binary_path=setting_gpg_path.value, home=TEST_GPG_HOME,
keyservers=TEST_KEYSERVERS
)
def test_main(self):
# No private or public keys in the keyring
self.assertEqual(Key.get_all(self.gpg, secret=True), [])
self.assertEqual(Key.get_all(self.gpg), [])
# Test querying the keyservers
self.assertTrue(
TEST_KEY_ID in [
key_stub.key_id for key_stub in self.gpg.query(TEST_UIDS)
]
)
# Receive a public key from the keyserver
self.gpg.receive_key(key_id=TEST_KEY_ID[-8:])
# Check that the received key is indeed in the keyring
self.assertTrue(
TEST_KEY_ID[-16:] in [
key_stub.key_id for key_stub in Key.get_all(self.gpg)
]
)

View File

@@ -0,0 +1,165 @@
from __future__ import unicode_literals
import StringIO
import tempfile
from django.test import TestCase
from ..exceptions import (
DecryptionError, KeyDoesNotExist, NeedPassphrase, PassphraseError,
VerificationError
)
from ..models import Key
from .literals import (
TEST_DETACHED_SIGNATURE, TEST_FILE, TEST_KEY_DATA, TEST_KEY_FINGERPRINT,
TEST_KEY_PASSPHRASE, TEST_SEARCH_FINGERPRINT, TEST_SEARCH_UID,
TEST_SIGNED_FILE, TEST_SIGNED_FILE_CONTENT
)
class KeyTestCase(TestCase):
def test_key_instance_creation(self):
# Creating a Key instance is analogous to importing a key
key = Key.objects.create(key_data=TEST_KEY_DATA)
self.assertEqual(key.fingerprint, TEST_KEY_FINGERPRINT)
def test_key_search(self):
search_results = Key.objects.search(query=TEST_SEARCH_UID)
self.assertTrue(
TEST_SEARCH_FINGERPRINT in [
key_stub.fingerprint for key_stub in search_results
]
)
def test_key_receive(self):
Key.objects.receive_key(key_id=TEST_SEARCH_FINGERPRINT)
self.assertEqual(Key.objects.all().count(), 1)
self.assertEqual(
Key.objects.first().fingerprint, TEST_SEARCH_FINGERPRINT
)
def test_cleartext_file_verification(self):
cleartext_file = tempfile.TemporaryFile()
cleartext_file.write('test')
cleartext_file.seek(0)
with self.assertRaises(VerificationError):
Key.objects.verify_file(file_object=cleartext_file)
cleartext_file.close()
def test_embedded_verification_no_key(self):
with open(TEST_SIGNED_FILE) as signed_file:
result = Key.objects.verify_file(signed_file)
self.assertTrue(result.key_id in TEST_KEY_FINGERPRINT)
def test_embedded_verification_with_key(self):
Key.objects.create(key_data=TEST_KEY_DATA)
with open(TEST_SIGNED_FILE) as signed_file:
result = Key.objects.verify_file(signed_file)
self.assertEqual(result.fingerprint, TEST_KEY_FINGERPRINT)
def test_embedded_verification_with_correct_fingerprint(self):
Key.objects.create(key_data=TEST_KEY_DATA)
with open(TEST_SIGNED_FILE) as signed_file:
result = Key.objects.verify_file(
signed_file, key_fingerprint=TEST_KEY_FINGERPRINT
)
self.assertTrue(result.valid)
self.assertEqual(result.fingerprint, TEST_KEY_FINGERPRINT)
def test_embedded_verification_with_incorrect_fingerprint(self):
Key.objects.create(key_data=TEST_KEY_DATA)
with open(TEST_SIGNED_FILE) as signed_file:
with self.assertRaises(KeyDoesNotExist):
Key.objects.verify_file(signed_file, key_fingerprint='999')
def test_signed_file_decryption(self):
Key.objects.create(key_data=TEST_KEY_DATA)
with open(TEST_SIGNED_FILE) as signed_file:
result = Key.objects.decrypt_file(file_object=signed_file)
self.assertEqual(result.read(), TEST_SIGNED_FILE_CONTENT)
def test_cleartext_file_decryption(self):
cleartext_file = tempfile.TemporaryFile()
cleartext_file.write('test')
cleartext_file.seek(0)
with self.assertRaises(DecryptionError):
Key.objects.decrypt_file(file_object=cleartext_file)
cleartext_file.close()
def test_detached_verification_no_key(self):
with open(TEST_DETACHED_SIGNATURE) as signature_file:
with open(TEST_FILE) as test_file:
result = Key.objects.verify_file(
file_object=test_file, signature_file=signature_file
)
self.assertTrue(result.key_id in TEST_KEY_FINGERPRINT)
def test_detached_verification_with_key(self):
Key.objects.create(key_data=TEST_KEY_DATA)
with open(TEST_DETACHED_SIGNATURE) as signature_file:
with open(TEST_FILE) as test_file:
result = Key.objects.verify_file(
file_object=test_file, signature_file=signature_file
)
self.assertTrue(result)
self.assertEqual(result.fingerprint, TEST_KEY_FINGERPRINT)
def test_detached_signing_no_passphrase(self):
key = Key.objects.create(key_data=TEST_KEY_DATA)
with self.assertRaises(NeedPassphrase):
with open(TEST_FILE) as test_file:
key.sign_file(
file_object=test_file, detached=True,
)
def test_detached_signing_bad_passphrase(self):
key = Key.objects.create(key_data=TEST_KEY_DATA)
with self.assertRaises(PassphraseError):
with open(TEST_FILE) as test_file:
key.sign_file(
file_object=test_file, detached=True,
passphrase='bad passphrase'
)
def test_detached_signing_with_passphrase(self):
key = Key.objects.create(key_data=TEST_KEY_DATA)
with open(TEST_FILE) as test_file:
detached_signature = key.sign_file(
file_object=test_file, detached=True,
passphrase=TEST_KEY_PASSPHRASE
)
signature_file = StringIO.StringIO()
signature_file.write(detached_signature)
signature_file.seek(0)
with open(TEST_FILE) as test_file:
result = Key.objects.verify_file(
file_object=test_file, signature_file=signature_file
)
signature_file.close()
self.assertTrue(result)
self.assertEqual(result.fingerprint, TEST_KEY_FINGERPRINT)

View File

@@ -0,0 +1,67 @@
from __future__ import absolute_import, unicode_literals
from django_downloadview.test import assert_download_response
from common.tests.test_views import GenericViewTestCase
from user_management.tests import (
TEST_USER_USERNAME, TEST_USER_PASSWORD
)
from ..models import Key
from ..permissions import permission_key_download, permission_key_upload
from .literals import TEST_KEY_DATA, TEST_KEY_FINGERPRINT
class KeyViewTestCase(GenericViewTestCase):
def test_key_download_view_no_permission(self):
key = Key.objects.create(key_data=TEST_KEY_DATA)
self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD)
response = self.get(
viewname='django_gpg:key_download', args=(key.pk,)
)
self.assertEqual(response.status_code, 403)
def test_key_download_view_with_permission(self):
key = Key.objects.create(key_data=TEST_KEY_DATA)
self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD)
self.role.permissions.add(permission_key_download.stored_permission)
response = self.get(
viewname='django_gpg:key_download', args=(key.pk,)
)
assert_download_response(
self, response=response, content=key.key_data,
basename=key.key_id,
)
def test_key_upload_view_no_permission(self):
self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD)
response = self.post(
viewname='django_gpg:key_upload', data={'key_data': TEST_KEY_DATA}
)
self.assertEqual(response.status_code, 403)
self.assertEqual(Key.objects.count(), 0)
def test_key_upload_view_with_permission(self):
self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD)
self.role.permissions.add(permission_key_upload.stored_permission)
response = self.post(
viewname='django_gpg:key_upload', data={'key_data': TEST_KEY_DATA},
follow=True
)
self.assertContains(response, 'created', status_code=200)
self.assertEqual(Key.objects.count(), 1)
self.assertEqual(Key.objects.first().fingerprint, TEST_KEY_FINGERPRINT)

View File

@@ -3,14 +3,22 @@ from __future__ import unicode_literals
from django.conf.urls import patterns, url
from .views import (
KeyQueryView, KeyQueryResultView, PrivateKeyListView, PublicKeyListView
KeyDeleteView, KeyDetailView, KeyDownloadView, KeyQueryView,
KeyQueryResultView, KeyReceive, KeyUploadView, PrivateKeyListView,
PublicKeyListView
)
urlpatterns = patterns(
'django_gpg.views',
url(
r'^delete/(?P<fingerprint>.+)/(?P<key_type>\w+)/$', 'key_delete',
name='key_delete'
r'^(?P<pk>\d+)/$', KeyDetailView.as_view(), name='key_detail'
),
url(
r'^(?P<pk>\d+)/delete/$', KeyDeleteView.as_view(), name='key_delete'
),
url(
r'^(?P<pk>\d+)/download/$', KeyDownloadView.as_view(),
name='key_download'
),
url(
r'^list/private/$', PrivateKeyListView.as_view(),
@@ -19,10 +27,15 @@ urlpatterns = patterns(
url(
r'^list/public/$', PublicKeyListView.as_view(), name='key_public_list'
),
url(
r'^upload/$', KeyUploadView.as_view(), name='key_upload'
),
url(r'^query/$', KeyQueryView.as_view(), name='key_query'),
url(
r'^query/results/$', KeyQueryResultView.as_view(),
name='key_query_results'
),
url(r'^receive/(?P<key_id>.+)/$', 'key_receive', name='key_receive'),
url(
r'^receive/(?P<key_id>.+)/$', KeyReceive.as_view(), name='key_receive'
),
)

View File

@@ -2,117 +2,90 @@ from __future__ import absolute_import, unicode_literals
import logging
from django.conf import settings
from django.contrib import messages
from django.core.urlresolvers import reverse
from django.http import HttpResponseRedirect
from django.shortcuts import redirect, render_to_response
from django.template import RequestContext
from django.core.files.base import ContentFile
from django.core.urlresolvers import reverse, reverse_lazy
from django.utils.translation import ugettext_lazy as _
from common.generics import SimpleView, SingleObjectListView
from permissions import Permission
from .api import Key
from .forms import KeySearchForm
from .permissions import (
permission_key_delete, permission_key_receive, permission_key_view,
permission_keyserver_query
from common.generics import (
ConfirmView, SingleObjectCreateView, SingleObjectDeleteView,
SingleObjectDetailView, SingleObjectDownloadView, SingleObjectListView,
SimpleView
)
from .forms import KeyDetailForm, KeySearchForm
from .literals import KEY_TYPE_PUBLIC
from .models import Key
from .permissions import (
permission_key_delete, permission_key_download, permission_key_receive,
permission_key_upload, permission_key_view, permission_keyserver_query
)
from .runtime import gpg
logger = logging.getLogger(__name__)
def key_receive(request, key_id):
Permission.check_permissions(request.user, (permission_key_receive,))
class KeyDeleteView(SingleObjectDeleteView):
model = Key
object_permission = permission_key_delete
previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', reverse(settings.LOGIN_REDIRECT_URL))))
if request.method == 'POST':
try:
gpg.receive_key(key_id=key_id)
except Exception as exception:
messages.error(
request,
_('Unable to import key: %(key_id)s; %(error)s') %
{
'key_id': key_id,
'error': exception,
}
)
return HttpResponseRedirect(previous)
def get_post_action_redirect(self):
if self.get_object().key_type == KEY_TYPE_PUBLIC:
return reverse_lazy('django_gpg:key_public_list')
else:
messages.success(
request,
_('Successfully received key: %(key_id)s') %
{
'key_id': key_id,
}
)
return reverse_lazy('django_gpg:key_private_list')
return redirect('django_gpg:key_public_list')
return render_to_response('appearance/generic_confirm.html', {
'message': _('Import key ID: %s?') % key_id,
'previous': previous,
'title': _('Import key'),
}, context_instance=RequestContext(request))
def get_extra_context(self):
return {'title': _('Delete key: %s') % self.get_object()}
class PublicKeyListView(SingleObjectListView):
view_permission = permission_key_view
class KeyDetailView(SingleObjectDetailView):
form_class = KeyDetailForm
model = Key
object_permission = permission_key_view
def get_extra_context(self):
return {
'hide_object': True,
'title': self.get_title()
'title': _('Details for key: %s') % self.get_object(),
}
def get_queryset(self):
return Key.get_all(gpg)
def get_title(self):
return _('Public keys')
class KeyDownloadView(SingleObjectDownloadView):
model = Key
object_permission = permission_key_download
def get_file(self):
key = self.get_object()
return ContentFile(key.key_data, name=key.key_id)
class PrivateKeyListView(PublicKeyListView):
def get_title(self):
return _('Private keys')
class KeyReceive(ConfirmView):
post_action_redirect = reverse_lazy('django_gpg:key_public_list')
view_permission = permission_key_receive
def get_queryset(self):
return Key.get_all(gpg, secret=True)
def get_extra_context(self):
return {
'message': _('Import key ID: %s?') % self.kwargs['key_id'],
'title': _('Import key'),
}
def key_delete(request, fingerprint, key_type):
Permission.check_permissions(request.user, (permission_key_delete,))
secret = key_type == 'sec'
key = Key.get(gpg, fingerprint, secret=secret)
post_action_redirect = redirect('django_gpg:key_public_list')
previous = request.POST.get('previous', request.GET.get('previous', request.META.get('HTTP_REFERER', reverse(settings.LOGIN_REDIRECT_URL))))
next = request.POST.get('next', request.GET.get('next', post_action_redirect if post_action_redirect else request.META.get('HTTP_REFERER', reverse(settings.LOGIN_REDIRECT_URL))))
if request.method == 'POST':
def view_action(self):
try:
gpg.delete_key(key)
messages.success(request, _('Key: %s, deleted successfully.') % fingerprint)
return HttpResponseRedirect(next)
Key.objects.receive_key(key_id=self.kwargs['key_id'])
except Exception as exception:
messages.error(request, exception)
return HttpResponseRedirect(previous)
return render_to_response('appearance/generic_confirm.html', {
'title': _('Delete key'),
'delete_view': True,
'message': _(
'Delete key %s? If you delete a public key that is part of a '
'public/private pair the private key will be deleted as well.'
) % key,
'next': next,
'previous': previous,
}, context_instance=RequestContext(request))
messages.error(
self.request,
_('Unable to import key: %(key_id)s; %(error)s') % {
'key_id': self.kwargs['key_id'],
'error': exception,
}
)
else:
messages.success(
self.request, _('Successfully received key: %(key_id)s') % {
'key_id': self.kwargs['key_id'],
}
)
class KeyQueryView(SimpleView):
@@ -149,6 +122,40 @@ class KeyQueryResultView(SingleObjectListView):
def get_queryset(self):
term = self.request.GET.get('term')
if term:
return gpg.query(term)
return Key.objects.search(query=term)
else:
return ()
class KeyUploadView(SingleObjectCreateView):
fields = ('key_data',)
model = Key
post_action_redirect = reverse_lazy('django_gpg:key_public_list')
view_permission = permission_key_upload
def get_extra_context(self):
return {
'title': _('Upload new key'),
}
class PublicKeyListView(SingleObjectListView):
object_permission = permission_key_view
queryset = Key.objects.public_keys()
def get_extra_context(self):
return {
'hide_object': True,
'title': _('Public keys')
}
class PrivateKeyListView(SingleObjectListView):
object_permission = permission_key_view
queryset = Key.objects.private_keys()
def get_extra_context(self):
return {
'hide_object': True,
'title': _('Private keys')
}

View File

@@ -19,5 +19,6 @@ link_comment_delete = Link(
)
link_comments_for_document = Link(
icon='fa fa-comment', permissions=(permission_comment_view,),
text=_('Comments'), view='comments:comments_for_document', args='object.pk'
text=_('Comments'), view='comments:comments_for_document',
args='resolved_object.pk'
)

View File

@@ -16,7 +16,7 @@ def is_not_root_node(context):
link_document_index_list = Link(
icon='fa fa-list-ul', text=_('Indexes'),
view='indexing:document_index_list', args='object.pk'
view='indexing:document_index_list', args='resolved_object.pk'
)
link_index_main_menu = Link(
icon='fa fa-list-ul', text=_('Indexes'), view='indexing:index_list'

View File

@@ -2,21 +2,22 @@ from __future__ import unicode_literals
from django.contrib import admin
from .models import DocumentVersionSignature
from .models import DetachedSignature, EmbeddedSignature
@admin.register(DocumentVersionSignature)
class DocumentVersionSignatureAdmin(admin.ModelAdmin):
def document(self, instance):
return instance.document_version.document
def has_detached_signature(self, instance):
return True if instance.signature_file else False
has_detached_signature.boolean = True
@admin.register(DetachedSignature)
class DetachedSignatureAdmin(admin.ModelAdmin):
list_display = (
'document', 'document_version', 'has_embedded_signature',
'has_detached_signature'
'document_version', 'date', 'key_id', 'signature_id',
'public_key_fingerprint', 'signature_file'
)
list_display_links = ('document_version',)
@admin.register(EmbeddedSignature)
class EmbeddedSignatureAdmin(admin.ModelAdmin):
list_display = (
'document_version', 'date', 'key_id', 'signature_id',
'public_key_fingerprint'
)
list_display_links = ('document_version',)
search_fields = ('document_version__document__label',)

View File

@@ -2,20 +2,42 @@ from __future__ import unicode_literals
import logging
from kombu import Exchange, Queue
from django.apps import apps
from django.db.models.signals import post_save, post_delete
from django.utils.translation import ugettext_lazy as _
from acls import ModelPermission
from common import MayanAppConfig, menu_facet, menu_sidebar
from common import (
MayanAppConfig, menu_facet, menu_object, menu_sidebar, menu_tools
)
from common.signals import post_upgrade
from mayan.celery import app
from navigation import SourceColumn
from .hooks import document_pre_open_hook, document_version_post_save_hook
from .handlers import (
unverify_key_signatures, verify_key_signatures,
verify_missing_embedded_signature
)
from .links import (
link_document_signature_delete, link_document_signature_download,
link_document_signature_upload, link_document_verify
link_all_document_version_signature_verify,
link_document_signature_list,
link_document_version_signature_delete,
link_document_version_signature_detached_create,
link_document_version_signature_embedded_create,
link_document_version_signature_details,
link_document_version_signature_download,
link_document_version_signature_list,
link_document_version_signature_upload,
)
from .permissions import (
permission_document_verify, permission_signature_delete,
permission_signature_download, permission_signature_upload
permission_document_version_sign_detached,
permission_document_version_sign_embedded,
permission_document_version_signature_delete,
permission_document_version_signature_download,
permission_document_version_signature_upload,
permission_document_version_signature_view,
)
logger = logging.getLogger(__name__)
@@ -39,30 +61,110 @@ class DocumentSignaturesApp(MayanAppConfig):
app_label='documents', model_name='DocumentVersion'
)
DocumentVersion.register_post_save_hook(
1, document_version_post_save_hook
Key = apps.get_model(
app_label='django_gpg', model_name='Key'
)
EmbeddedSignature = self.get_model('EmbeddedSignature')
SignatureBaseModel = self.get_model('SignatureBaseModel')
DocumentVersion.register_post_save_hook(
order=1, func=EmbeddedSignature.objects.create
)
DocumentVersion.register_pre_open_hook(
order=1, func=EmbeddedSignature.objects.open_signed
)
DocumentVersion.register_pre_open_hook(1, document_pre_open_hook)
ModelPermission.register(
model=Document, permissions=(
permission_document_verify, permission_signature_delete,
permission_signature_download, permission_signature_upload,
permission_document_version_sign_detached,
permission_document_version_sign_embedded,
permission_document_version_signature_delete,
permission_document_version_signature_download,
permission_document_version_signature_view,
permission_document_version_signature_upload,
)
)
SourceColumn(
source=SignatureBaseModel, label=_('Date'), attribute='date'
)
SourceColumn(
source=SignatureBaseModel, label=_('Key ID'),
attribute='get_key_id'
)
SourceColumn(
source=SignatureBaseModel, label=_('Signature ID'),
func=lambda context: context['object'].signature_id or _('None')
)
SourceColumn(
source=SignatureBaseModel, label=_('Type'),
func=lambda context: SignatureBaseModel.objects.get_subclass(
pk=context['object'].pk
).get_signature_type_display()
)
app.conf.CELERY_QUEUES.append(
Queue(
'signatures', Exchange('signatures'), routing_key='signatures'
),
)
app.conf.CELERY_ROUTES.update(
{
'document_signatures.tasks.task_verify_key_signatures': {
'queue': 'signatures'
},
'document_signatures.tasks.task_unverify_key_signatures': {
'queue': 'signatures'
},
'document_signatures.tasks.task_verify_document_version': {
'queue': 'signatures'
},
'document_signatures.tasks.task_verify_missing_embedded_signature': {
'queue': 'tools'
},
}
)
menu_facet.bind_links(
links=(link_document_verify,), sources=(Document,)
links=(link_document_signature_list,), sources=(Document,)
)
menu_object.bind_links(
links=(
link_document_version_signature_list,
link_document_version_signature_detached_create,
link_document_version_signature_embedded_create
), sources=(DocumentVersion,)
)
menu_object.bind_links(
links=(
link_document_version_signature_details,
link_document_version_signature_download,
link_document_version_signature_delete,
), sources=(SignatureBaseModel,)
)
menu_sidebar.bind_links(
links=(
link_document_signature_upload,
link_document_signature_download,
link_document_signature_delete
), sources=(
'signatures:document_verify',
'signatures:document_signature_upload',
'signatures:document_signature_download',
'signatures:document_signature_delete'
)
link_document_version_signature_upload,
), sources=(DocumentVersion,)
)
menu_tools.bind_links(
links=(link_all_document_version_signature_verify,)
)
post_delete.connect(
unverify_key_signatures,
dispatch_uid='unverify_key_signatures',
sender=Key
)
post_upgrade.connect(
verify_missing_embedded_signature,
dispatch_uid='verify_missing_embedded_signature',
)
post_save.connect(
verify_key_signatures,
dispatch_uid='verify_key_signatures',
sender=Key
)

View File

@@ -1,10 +1,111 @@
from __future__ import unicode_literals
from __future__ import absolute_import, unicode_literals
import logging
from django import forms
from django.core.exceptions import PermissionDenied
from django.utils.translation import ugettext_lazy as _
from acls.models import AccessControlList
from permissions import Permission
class DetachedSignatureForm(forms.Form):
file = forms.FileField(
label=_('Signature file'),
from common.forms import DetailForm
from django_gpg.models import Key
from django_gpg.permissions import permission_key_sign
from .models import SignatureBaseModel
logger = logging.getLogger(__name__)
class DocumentVersionSignatureCreateForm(forms.Form):
key = forms.ModelChoiceField(
label=_('Key'), queryset=Key.objects.none()
)
passphrase = forms.CharField(
label=_('Passphrase'), required=False,
widget=forms.widgets.PasswordInput
)
def __init__(self, *args, **kwargs):
user = kwargs.pop('user', None)
logger.debug('user: %s', user)
super(
DocumentVersionSignatureCreateForm, self
).__init__(*args, **kwargs)
queryset = Key.objects.private_keys()
try:
Permission.check_permissions(user, (permission_key_sign,))
except PermissionDenied:
queryset = AccessControlList.objects.filter_by_access(
permission_key_sign, user, queryset
)
self.fields['key'].queryset = queryset
class DocumentVersionSignatureDetailForm(DetailForm):
def __init__(self, *args, **kwargs):
extra_fields = (
{'label': _('Signature is embedded?'), 'field': 'is_embedded'},
{
'label': _('Signature date'), 'field': 'date',
'widget': forms.widgets.DateInput
},
{'label': _('Signature key ID'), 'field': 'key_id'},
{
'label': _('Signature key present?'),
'field': lambda x: x.public_key_fingerprint is not None
},
)
if kwargs['instance'].public_key_fingerprint:
key = Key.objects.get(
fingerprint=kwargs['instance'].public_key_fingerprint
)
extra_fields += (
{'label': _('Signature ID'), 'field': 'signature_id'},
{
'label': _('Key fingerprint'),
'field': lambda x: key.fingerprint
},
{
'label': _('Key creation date'),
'field': lambda x: key.creation_date,
'widget': forms.widgets.DateInput
},
{
'label': _('Key expiration date'),
'field': lambda x: key.expiration_date or _('None'),
'widget': forms.widgets.DateInput
},
{
'label': _('Key length'),
'field': lambda x: key.length
},
{
'label': _('Key algorithm'),
'field': lambda x: key.algorithm
},
{
'label': _('Key user ID'),
'field': lambda x: key.user_id
},
{
'label': _('Key type'),
'field': lambda x: key.get_key_type_display()
},
)
kwargs['extra_fields'] = extra_fields
super(
DocumentVersionSignatureDetailForm, self
).__init__(*args, **kwargs)
class Meta:
fields = ()
model = SignatureBaseModel

View File

@@ -0,0 +1,22 @@
from __future__ import unicode_literals
from .tasks import (
task_unverify_key_signatures, task_verify_missing_embedded_signature,
task_verify_key_signatures
)
def unverify_key_signatures(sender, **kwargs):
task_unverify_key_signatures.apply_async(
kwargs=dict(key_id=kwargs['instance'].key_id)
)
def verify_key_signatures(sender, **kwargs):
task_verify_key_signatures.apply_async(
kwargs=dict(key_pk=kwargs['instance'].pk)
)
def verify_missing_embedded_signature(sender, **kwargs):
task_verify_missing_embedded_signature.delay()

View File

@@ -1,51 +0,0 @@
from __future__ import unicode_literals
import io
import logging
from django.apps import apps
from django_gpg.exceptions import GPGDecryptionError
from django_gpg.runtime import gpg
logger = logging.getLogger(__name__)
def document_pre_open_hook(descriptor, instance):
logger.debug('instance: %s', instance)
DocumentVersionSignature = apps.get_model(
app_label='document_signatures', model_name='DocumentVersionSignature'
)
if DocumentVersionSignature.objects.has_embedded_signature(document_version=instance):
# If it has an embedded signature, decrypt
try:
result = gpg.decrypt_file(descriptor, close_descriptor=False)
# gpg return a string, turn it into a file like object
except GPGDecryptionError:
# At least return the original raw content
descriptor.seek(0)
return descriptor
else:
descriptor.close()
return io.BytesIO(result.data)
else:
return descriptor
def document_version_post_save_hook(instance):
logger.debug('instance: %s', instance)
DocumentVersionSignature = apps.get_model(
app_label='document_signatures', model_name='DocumentVersionSignature'
)
try:
document_signature = DocumentVersionSignature.objects.get(
document_version=instance
)
except DocumentVersionSignature.DoesNotExist:
document_signature = DocumentVersionSignature.objects.create(
document_version=instance
)
document_signature.check_for_embedded_signature()

View File

@@ -6,50 +6,78 @@ from django.utils.translation import ugettext_lazy as _
from navigation import Link
from .permissions import (
permission_document_verify, permission_signature_delete,
permission_signature_download, permission_signature_upload,
permission_document_version_sign_detached,
permission_document_version_sign_embedded,
permission_document_version_signature_delete,
permission_document_version_signature_download,
permission_document_version_signature_upload,
permission_document_version_signature_verify,
permission_document_version_signature_view
)
def can_upload_detached_signature(context):
DocumentVersionSignature = apps.get_model(
app_label='document_signatures', model_name='DocumentVersionSignature'
def is_detached_signature(context):
SignatureBaseModel = apps.get_model(
app_label='document_signatures', model_name='SignatureBaseModel'
)
return not DocumentVersionSignature.objects.has_detached_signature(
context['object'].latest_version
) and not DocumentVersionSignature.objects.has_embedded_signature(
context['object'].latest_version
)
return SignatureBaseModel.objects.select_subclasses().get(
pk=context['object'].pk
).is_detached
def can_delete_detached_signature(context):
DocumentVersionSignature = apps.get_model(
app_label='document_signatures', model_name='DocumentVersionSignature'
)
return DocumentVersionSignature.objects.has_detached_signature(
context['object'].latest_version
)
link_document_signature_delete = Link(
condition=can_delete_detached_signature,
permissions=(permission_signature_delete,), tags='dangerous',
text=_('Delete signature'), view='signatures:document_signature_delete',
args='object.pk'
link_all_document_version_signature_verify = Link(
icon='fa fa-certificate',
permissions=(permission_document_version_signature_verify,),
text=_('Verify all documents'),
view='signatures:all_document_version_signature_verify',
)
link_document_signature_download = Link(
condition=can_delete_detached_signature, text=_('Download signature'),
view='signatures:document_signature_download', args='object.pk',
permissions=(permission_signature_download,)
link_document_signature_list = Link(
args='resolved_object.latest_version.pk',
icon='fa fa-certificate',
permissions=(permission_document_version_signature_view,),
text=_('Signatures'),
view='signatures:document_version_signature_list',
)
link_document_signature_upload = Link(
condition=can_upload_detached_signature,
permissions=(permission_signature_upload,), text=_('Upload signature'),
view='signatures:document_signature_upload', args='object.pk'
link_document_version_signature_delete = Link(
args='resolved_object.pk', condition=is_detached_signature,
permissions=(permission_document_version_signature_delete,),
permissions_related='document_version.document', tags='dangerous',
text=_('Delete'), view='signatures:document_version_signature_delete',
)
link_document_verify = Link(
icon='fa fa-certificate', permissions=(permission_document_verify,),
text=_('Signatures'), view='signatures:document_verify', args='object.pk'
link_document_version_signature_details = Link(
args='resolved_object.pk',
permissions=(permission_document_version_signature_view,),
permissions_related='document_version.document', text=_('Details'),
view='signatures:document_version_signature_details',
)
link_document_version_signature_list = Link(
args='resolved_object.pk',
permissions=(permission_document_version_signature_view,),
permissions_related='document', text=_('Signature list'),
view='signatures:document_version_signature_list',
)
link_document_version_signature_download = Link(
args='resolved_object.pk', condition=is_detached_signature,
permissions=(permission_document_version_signature_download,),
permissions_related='document_version.document', text=_('Download'),
view='signatures:document_version_signature_download',
)
link_document_version_signature_upload = Link(
args='resolved_object.pk',
permissions=(permission_document_version_signature_upload,),
permissions_related='document', text=_('Upload signature'),
view='signatures:document_version_signature_upload',
)
link_document_version_signature_detached_create = Link(
args='resolved_object.pk',
permissions=(permission_document_version_sign_detached,),
permissions_related='document', text=_('Sign detached'),
view='signatures:document_version_signature_detached_create',
)
link_document_version_signature_embedded_create = Link(
args='resolved_object.pk',
permissions=(permission_document_version_sign_embedded,),
permissions_related='document', text=_('Sign embedded'),
view='signatures:document_version_signature_embedded_create',
)

View File

@@ -1,104 +1,55 @@
from __future__ import unicode_literals
import logging
import os
import tempfile
from django.db import models
from django_gpg.exceptions import GPGVerificationError
from django_gpg.runtime import gpg
from django_gpg.exceptions import DecryptionError
from django_gpg.models import Key
from documents.models import DocumentVersion
logger = logging.getLogger(__name__)
class DocumentVersionSignatureManager(models.Manager):
def get_document_signature(self, document_version):
document_signature, created = self.model.objects.get_or_create(
document_version=document_version,
class EmbeddedSignatureManager(models.Manager):
def open_signed(self, file_object, document_version):
for signature in self.filter(document_version=document_version):
try:
return self.open_signed(
file_object=Key.objects.decrypt_file(
file_object=file_object
), document_version=document_version
)
except DecryptionError:
file_object.seek(0)
return file_object
else:
return file_object
def unsigned_document_versions(self):
return DocumentVersion.objects.exclude(
pk__in=self.values('document_version')
)
return document_signature
def add_detached_signature(self, document_version, detached_signature):
document_signature = self.get_document_signature(
document_version=document_version
)
if document_signature.has_embedded_signature:
raise Exception(
'Document version already has an embedded signature'
)
else:
if document_signature.signature_file:
logger.debug('Existing detached signature')
document_signature.delete_detached_signature_file()
document_signature.signature_file = None
document_signature.save()
document_signature.signature_file = detached_signature
document_signature.save()
def has_detached_signature(self, document_version):
try:
document_signature = self.get_document_signature(
document_version=document_version
)
except ValueError:
return False
else:
if document_signature.signature_file:
return True
else:
return False
def has_embedded_signature(self, document_version):
logger.debug('document_version: %s', document_version)
def sign_document_version(self, document_version, key, passphrase=None, user=None):
temporary_file_object, temporary_filename = tempfile.mkstemp()
try:
document_signature = self.get_document_signature(
document_version=document_version
)
except ValueError:
return False
with document_version.open() as file_object:
signature_result = key.sign_file(
binary=True, file_object=file_object,
output=temporary_filename, passphrase=passphrase
)
except Exception:
raise
else:
return document_signature.has_embedded_signature
def detached_signature(self, document_version):
document_signature = self.get_document_signature(
document_version=document_version
)
return document_signature.signature_file.storage.open(
document_signature.signature_file.name
)
def verify_signature(self, document_version):
document_version_descriptor = document_version.open(raw=True)
detached_signature = None
if self.has_detached_signature(document_version=document_version):
logger.debug('has detached signature')
detached_signature = self.detached_signature(
document_version=document_version
)
args = (document_version_descriptor, detached_signature)
else:
args = (document_version_descriptor,)
try:
return gpg.verify_file(*args, fetch_key=False)
except GPGVerificationError:
return None
with open(temporary_filename) as file_object:
new_version = document_version.document.new_version(
file_object=file_object, _user=user
)
finally:
document_version_descriptor.close()
if detached_signature:
detached_signature.close()
os.unlink(temporary_filename)
def clear_detached_signature(self, document_version):
document_signature = self.get_document_signature(
document_version=document_version
)
if not document_signature.signature_file:
raise Exception('document doesn\'t have a detached signature')
document_signature.delete_detached_signature_file()
document_signature.signature_file = None
document_signature.save()
return new_version

View File

@@ -0,0 +1,130 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
import storage.backends.filebasedstorage
import document_signatures.models
class Migration(migrations.Migration):
dependencies = [
('documents', '0033_auto_20160325_0052'),
('document_signatures', '0002_auto_20150608_1902'),
]
operations = [
migrations.CreateModel(
name='SignatureBaseModel',
fields=[
(
'id', models.AutoField(
verbose_name='ID', serialize=False, auto_created=True,
primary_key=True
)
),
(
'date', models.DateField(
null=True, verbose_name='Date signed', blank=True
)
),
(
'key_id', models.CharField(
max_length=40, verbose_name='Key ID'
)
),
(
'signature_id', models.CharField(
max_length=64, null=True, verbose_name='Signature ID',
blank=True
)
),
(
'public_key_fingerprint', models.CharField(
verbose_name='Public key fingerprint', unique=True,
max_length=40, editable=False
)
),
],
options={
'verbose_name': 'Document version signature',
'verbose_name_plural': 'Document version signatures',
},
),
migrations.RemoveField(
model_name='documentversionsignature',
name='has_embedded_signature',
),
migrations.AddField(
model_name='documentversionsignature',
name='date',
field=models.DateField(
null=True, verbose_name='Date signed', blank=True
),
),
migrations.AddField(
model_name='documentversionsignature',
name='signature_id',
field=models.CharField(
max_length=64, null=True, verbose_name='Signature ID',
blank=True
),
),
migrations.AlterField(
model_name='documentversionsignature',
name='document_version',
field=models.ForeignKey(
related_name='signature', editable=False,
to='documents.DocumentVersion', verbose_name='Document version'
),
),
migrations.CreateModel(
name='DetachedSignature',
fields=[
(
'signaturebasemodel_ptr', models.OneToOneField(
parent_link=True, auto_created=True, primary_key=True,
serialize=False,
to='document_signatures.SignatureBaseModel'
)
),
(
'signature_file', models.FileField(
storage=storage.backends.filebasedstorage.FileBasedStorage(),
upload_to=document_signatures.models.upload_to,
null=True, verbose_name='Signature file', blank=True
)
),
],
options={
'verbose_name': 'Document version detached signature',
'verbose_name_plural': 'Document version detached signatures',
},
bases=('document_signatures.signaturebasemodel',),
),
migrations.CreateModel(
name='EmbeddedSignature',
fields=[
(
'signaturebasemodel_ptr', models.OneToOneField(
parent_link=True, auto_created=True, primary_key=True,
serialize=False,
to='document_signatures.SignatureBaseModel'
)
),
],
options={
'verbose_name': 'Document version embedded signature',
'verbose_name_plural': 'Document version embedded signatures',
},
bases=('document_signatures.signaturebasemodel',),
),
migrations.AddField(
model_name='signaturebasemodel',
name='document_version',
field=models.ForeignKey(
related_name='signaturebasemodel', editable=False,
to='documents.DocumentVersion', verbose_name='Document version'
),
),
]

View File

@@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('document_signatures', '0003_auto_20160325_0052'),
]
operations = [
migrations.AlterField(
model_name='documentversionsignature',
name='document_version',
field=models.ForeignKey(
editable=False, to='documents.DocumentVersion',
verbose_name='Document version'
),
),
migrations.AlterField(
model_name='signaturebasemodel',
name='date',
field=models.DateField(
verbose_name='Date signed', null=True, editable=False,
blank=True
),
),
migrations.AlterField(
model_name='signaturebasemodel',
name='document_version',
field=models.ForeignKey(
related_name='signatures', editable=False,
to='documents.DocumentVersion', verbose_name='Document version'
),
),
migrations.AlterField(
model_name='signaturebasemodel',
name='public_key_fingerprint',
field=models.CharField(
null=True, editable=False, max_length=40, blank=True,
unique=True, verbose_name='Public key fingerprint'
),
),
migrations.AlterField(
model_name='signaturebasemodel',
name='signature_id',
field=models.CharField(
verbose_name='Signature ID', max_length=64, null=True,
editable=False, blank=True
),
),
]

View File

@@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('document_signatures', '0004_auto_20160325_0418'),
]
operations = [
migrations.RemoveField(
model_name='documentversionsignature',
name='document_version',
),
migrations.DeleteModel(
name='DocumentVersionSignature',
),
]

View File

@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('document_signatures', '0005_auto_20160325_0748'),
]
operations = [
migrations.AlterField(
model_name='signaturebasemodel',
name='public_key_fingerprint',
field=models.CharField(
verbose_name='Public key fingerprint', max_length=40,
null=True, editable=False, blank=True
),
),
]

View File

@@ -3,13 +3,18 @@ from __future__ import unicode_literals
import logging
import uuid
from django.core.urlresolvers import reverse
from django.db import models
from django.utils.encoding import python_2_unicode_compatible
from django.utils.translation import ugettext_lazy as _
from django_gpg.runtime import gpg
from model_utils.managers import InheritanceManager
from django_gpg.exceptions import VerificationError
from django_gpg.models import Key
from documents.models import DocumentVersion
from .managers import DocumentVersionSignatureManager
from .managers import EmbeddedSignatureManager
from .runtime import storage_backend
logger = logging.getLogger(__name__)
@@ -19,35 +24,133 @@ def upload_to(*args, **kwargs):
return unicode(uuid.uuid4())
class DocumentVersionSignature(models.Model):
"""
Model that describes a document version signature properties
"""
@python_2_unicode_compatible
class SignatureBaseModel(models.Model):
document_version = models.ForeignKey(
DocumentVersion, editable=False, verbose_name=_('Document version')
DocumentVersion, editable=False, related_name='signatures',
verbose_name=_('Document version')
)
signature_file = models.FileField(
blank=True, null=True, storage=storage_backend, upload_to=upload_to,
verbose_name=_('Signature file')
# Basic fields
date = models.DateField(
blank=True, editable=False, null=True, verbose_name=_('Date signed')
)
has_embedded_signature = models.BooleanField(
default=False, verbose_name=_('Has embedded signature')
key_id = models.CharField(max_length=40, verbose_name=_('Key ID'))
# With proper key
signature_id = models.CharField(
blank=True, editable=False, null=True, max_length=64,
verbose_name=_('Signature ID')
)
public_key_fingerprint = models.CharField(
blank=True, editable=False, null=True, max_length=40,
verbose_name=_('Public key fingerprint')
)
objects = DocumentVersionSignatureManager()
def check_for_embedded_signature(self):
logger.debug('checking for embedded signature')
with self.document_version.open(raw=True) as file_object:
self.has_embedded_signature = gpg.has_embedded_signature(
file_object
)
self.save()
def delete_detached_signature_file(self):
self.signature_file.storage.delete(self.signature_file.name)
objects = InheritanceManager()
class Meta:
verbose_name = _('Document version signature')
verbose_name_plural = _('Document version signatures')
def __str__(self):
return self.signature_id or '{} - {}'.format(self.date, self.key_id)
def get_absolute_url(self):
return reverse(
'document_signatures:document_version_signature_detail',
args=(self.pk,)
)
def get_key_id(self):
if self.public_key_fingerprint:
return self.public_key_fingerprint[-16:]
else:
return self.key_id
def get_signature_type_display(self):
if self.is_detached:
return _('Detached')
else:
return _('Embedded')
@property
def is_detached(self):
return hasattr(self, 'signature_file')
@property
def is_embedded(self):
return not hasattr(self, 'signature_file')
class EmbeddedSignature(SignatureBaseModel):
objects = EmbeddedSignatureManager()
class Meta:
verbose_name = _('Document version embedded signature')
verbose_name_plural = _('Document version embedded signatures')
def save(self, *args, **kwargs):
logger.debug('checking for embedded signature')
if self.pk:
raw = True
else:
raw = False
with self.document_version.open(raw=raw) as file_object:
try:
verify_result = Key.objects.verify_file(
file_object=file_object
)
except VerificationError as exception:
# Not signed
logger.debug(
'embedded signature verification error; %s', exception
)
else:
self.date = verify_result.date
self.key_id = verify_result.key_id
self.signature_id = verify_result.signature_id
self.public_key_fingerprint = verify_result.pubkey_fingerprint
super(EmbeddedSignature, self).save(*args, **kwargs)
@python_2_unicode_compatible
class DetachedSignature(SignatureBaseModel):
signature_file = models.FileField(
blank=True, null=True, storage=storage_backend, upload_to=upload_to,
verbose_name=_('Signature file')
)
class Meta:
verbose_name = _('Document version detached signature')
verbose_name_plural = _('Document version detached signatures')
def __str__(self):
return '{}-{}'.format(self.document_version, _('signature'))
def delete(self, *args, **kwargs):
if self.signature_file.name:
self.signature_file.storage.delete(name=self.signature_file.name)
super(DetachedSignature, self).delete(*args, **kwargs)
def save(self, *args, **kwargs):
with self.document_version.open() as file_object:
try:
verify_result = Key.objects.verify_file(
file_object=file_object, signature_file=self.signature_file
)
except VerificationError as exception:
# Not signed
logger.debug(
'detached signature verification error; %s', exception
)
else:
self.signature_file.seek(0)
self.date = verify_result.date
self.key_id = verify_result.key_id
self.signature_id = verify_result.signature_id
self.public_key_fingerprint = verify_result.pubkey_fingerprint
return super(DetachedSignature, self).save(*args, **kwargs)

View File

@@ -8,15 +8,31 @@ namespace = PermissionNamespace(
'document_signatures', _('Document signatures')
)
permission_document_verify = namespace.add_permission(
name='document_verify', label=_('Verify document signatures')
permission_document_version_sign_detached = namespace.add_permission(
name='document_version_sign_detached',
label=_('Sign documents with detached signatures')
)
permission_signature_delete = namespace.add_permission(
name='signature_delete', label=_('Delete detached signatures')
permission_document_version_sign_embedded = namespace.add_permission(
name='document_version_sign_embedded',
label=_('Sign documents with embedded signatures')
)
permission_signature_download = namespace.add_permission(
name='signature_download', label=_('Download detached signatures')
permission_document_version_signature_delete = namespace.add_permission(
name='document_version_signature_delete',
label=_('Delete detached signatures')
)
permission_signature_upload = namespace.add_permission(
name='signature_upload', label=_('Upload detached signatures')
permission_document_version_signature_download = namespace.add_permission(
name='document_version_signature_download',
label=_('Download detached document signatures')
)
permission_document_version_signature_upload = namespace.add_permission(
name='document_version_signature_upload',
label=_('Upload detached document signatures')
)
permission_document_version_signature_verify = namespace.add_permission(
name='document_version_signature_verify',
label=_('Verify document signatures')
)
permission_document_version_signature_view = namespace.add_permission(
name='document_version_signature_view',
label=_('View details of document signatures')
)

View File

@@ -0,0 +1,76 @@
from __future__ import unicode_literals
import logging
from django.apps import apps
from mayan.celery import app
RETRY_DELAY = 10
logger = logging.getLogger(__name__)
@app.task(bind=True, ignore_result=True)
def task_unverify_key_signatures(self, key_id):
DetachedSignature = apps.get_model(
app_label='document_signatures', model_name='DetachedSignature'
)
EmbeddedSignature = apps.get_model(
app_label='document_signatures', model_name='EmbeddedSignature'
)
for signature in DetachedSignature.objects.filter(key_id__endswith=key_id).filter(signature_id__isnull=False):
signature.save()
for signature in EmbeddedSignature.objects.filter(key_id__endswith=key_id).filter(signature_id__isnull=False):
signature.save()
@app.task(bind=True, ignore_result=True)
def task_verify_key_signatures(self, key_pk):
Key = apps.get_model(
app_label='django_gpg', model_name='Key'
)
DetachedSignature = apps.get_model(
app_label='document_signatures', model_name='DetachedSignature'
)
EmbeddedSignature = apps.get_model(
app_label='document_signatures', model_name='EmbeddedSignature'
)
key = Key.objects.get(pk=key_pk)
for signature in DetachedSignature.objects.filter(key_id__endswith=key.key_id).filter(signature_id__isnull=True):
signature.save()
for signature in EmbeddedSignature.objects.filter(key_id__endswith=key.key_id).filter(signature_id__isnull=True):
signature.save()
@app.task(bind=True, ignore_result=True)
def task_verify_missing_embedded_signature(self):
EmbeddedSignature = apps.get_model(
app_label='document_signatures', model_name='EmbeddedSignature'
)
for document_version in EmbeddedSignature.objects.unsigned_document_versions():
task_verify_document_version.apply_async(
kwargs=dict(document_version_pk=document_version.pk)
)
@app.task(bind=True, ignore_result=True)
def task_verify_document_version(self, document_version_pk):
DocumentVersion = apps.get_model(
app_label='documents', model_name='DocumentVersion'
)
EmbeddedSignature = apps.get_model(
app_label='document_signatures', model_name='EmbeddedSignature'
)
document_version = DocumentVersion.objects.get(pk=document_version_pk)
EmbeddedSignature.objects.create(document_version=document_version)

View File

@@ -0,0 +1,18 @@
from __future__ import unicode_literals
import os
from django.conf import settings
TEST_SIGNED_DOCUMENT_PATH = os.path.join(
settings.BASE_DIR, 'contrib', 'sample_documents', 'mayan_11_1.pdf.gpg'
)
TEST_SIGNATURE_FILE_PATH = os.path.join(
settings.BASE_DIR, 'contrib', 'sample_documents', 'mayan_11_1.pdf.sig'
)
TEST_KEY_FILE = os.path.join(
settings.BASE_DIR, 'contrib', 'sample_documents',
'key0x5F3F7F75D210724D.asc'
)
TEST_KEY_ID = '5F3F7F75D210724D'
TEST_SIGNATURE_ID = 'XVkoGKw35yU1iq11dZPiv7uAY7k'

View File

@@ -0,0 +1,129 @@
from __future__ import unicode_literals
from django.core.files import File
from django.core.urlresolvers import reverse
from documents.tests.literals import TEST_DOCUMENT_PATH
from documents.tests.test_views import GenericDocumentViewTestCase
from user_management.tests.literals import (
TEST_USER_PASSWORD, TEST_USER_USERNAME
)
from ..links import (
link_document_version_signature_delete,
link_document_version_signature_details,
)
from ..models import DetachedSignature
from ..permissions import (
permission_document_version_signature_delete,
permission_document_version_signature_view
)
from .literals import TEST_SIGNATURE_FILE_PATH, TEST_SIGNED_DOCUMENT_PATH
class DocumentSignatureLinksTestCase(GenericDocumentViewTestCase):
def test_document_version_signature_detail_link_no_permission(self):
with open(TEST_SIGNED_DOCUMENT_PATH) as file_object:
document = self.document_type.new_document(
file_object=file_object
)
self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD)
self.add_test_view(
test_object=document.latest_version.signatures.first()
)
context = self.get_test_view()
resolved_link = link_document_version_signature_details.resolve(
context=context
)
self.assertEqual(resolved_link, None)
def test_document_version_signature_detail_link_with_permission(self):
with open(TEST_SIGNED_DOCUMENT_PATH) as file_object:
document = self.document_type.new_document(
file_object=file_object
)
self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD)
self.role.permissions.add(
permission_document_version_signature_view.stored_permission
)
self.add_test_view(
test_object=document.latest_version.signatures.first()
)
context = self.get_test_view()
resolved_link = link_document_version_signature_details.resolve(
context=context
)
self.assertNotEqual(resolved_link, None)
self.assertEqual(
resolved_link.url,
reverse(
'signatures:document_version_signature_details',
args=(document.latest_version.signatures.first().pk,)
)
)
def test_document_version_signature_delete_link_no_permission(self):
with open(TEST_DOCUMENT_PATH) as file_object:
document = self.document_type.new_document(
file_object=file_object
)
with open(TEST_SIGNATURE_FILE_PATH) as file_object:
DetachedSignature.objects.create(
document_version=document.latest_version,
signature_file=File(file_object)
)
self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD)
self.add_test_view(
test_object=document.latest_version.signatures.first()
)
context = self.get_test_view()
resolved_link = link_document_version_signature_delete.resolve(
context=context
)
self.assertEqual(resolved_link, None)
def test_document_version_signature_delete_link_with_permission(self):
with open(TEST_DOCUMENT_PATH) as file_object:
document = self.document_type.new_document(
file_object=file_object
)
with open(TEST_SIGNATURE_FILE_PATH) as file_object:
DetachedSignature.objects.create(
document_version=document.latest_version,
signature_file=File(file_object)
)
self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD)
self.role.permissions.add(
permission_document_version_signature_delete.stored_permission
)
self.add_test_view(
test_object=document.latest_version.signatures.first()
)
context = self.get_test_view()
resolved_link = link_document_version_signature_delete.resolve(
context=context
)
self.assertNotEqual(resolved_link, None)
self.assertEqual(
resolved_link.url,
reverse(
'signatures:document_version_signature_delete',
args=(document.latest_version.signatures.first().pk,)
)
)

View File

@@ -1,101 +1,345 @@
from __future__ import unicode_literals
import os
import hashlib
import time
from django.conf import settings
from django.core.files.base import File
from django.core.files import File
from django.test import TestCase, override_settings
from documents.models import DocumentType
from django_gpg.models import Key
from django_gpg.tests.literals import TEST_KEY_DATA, TEST_KEY_PASSPHRASE
from documents.models import DocumentType, DocumentVersion
from documents.tests import TEST_DOCUMENT_PATH, TEST_DOCUMENT_TYPE
from django_gpg.literals import SIGNATURE_STATE_VALID
from django_gpg.runtime import gpg
from ..models import DocumentVersionSignature
from ..models import DetachedSignature, EmbeddedSignature
from ..tasks import task_verify_missing_embedded_signature
TEST_SIGNED_DOCUMENT_PATH = os.path.join(
settings.BASE_DIR, 'contrib', 'sample_documents', 'mayan_11_1.pdf.gpg'
)
TEST_SIGNATURE_FILE_PATH = os.path.join(
settings.BASE_DIR, 'contrib', 'sample_documents', 'mayan_11_1.pdf.sig'
)
TEST_KEY_FILE = os.path.join(
settings.BASE_DIR, 'contrib', 'sample_documents',
'key0x5F3F7F75D210724D.asc'
from .literals import (
TEST_SIGNED_DOCUMENT_PATH, TEST_SIGNATURE_FILE_PATH, TEST_KEY_FILE,
TEST_KEY_ID, TEST_SIGNATURE_ID
)
@override_settings(OCR_AUTO_OCR=False)
class DocumentTestCase(TestCase):
class DocumentSignaturesTestCase(TestCase):
def setUp(self):
self.document_type = DocumentType.objects.create(
label=TEST_DOCUMENT_TYPE
)
with open(TEST_DOCUMENT_PATH) as file_object:
self.document = self.document_type.new_document(
file_object=File(file_object), label='mayan_11_1.pdf'
)
with open(TEST_KEY_FILE) as file_object:
gpg.import_key(file_object.read())
def tearDown(self):
self.document_type.delete()
def test_document_no_signature(self):
self.assertEqual(
DocumentVersionSignature.objects.has_detached_signature(
self.document.latest_version
), False
)
def test_new_document_version_signed(self):
def test_embedded_signature_no_key(self):
with open(TEST_SIGNED_DOCUMENT_PATH) as file_object:
self.document.new_version(
file_object=File(file_object), comment='test comment 1'
signed_document = self.document_type.new_document(
file_object=file_object
)
self.assertEqual(EmbeddedSignature.objects.count(), 1)
signature = EmbeddedSignature.objects.first()
self.assertEqual(
signature.document_version, signed_document.latest_version
)
self.assertEqual(signature.key_id, TEST_KEY_ID)
self.assertEqual(signature.signature_id, None)
def test_embedded_signature_post_key_verify(self):
with open(TEST_SIGNED_DOCUMENT_PATH) as file_object:
signed_document = self.document_type.new_document(
file_object=file_object
)
self.assertEqual(EmbeddedSignature.objects.count(), 1)
signature = EmbeddedSignature.objects.first()
self.assertEqual(
signature.document_version, signed_document.latest_version
)
self.assertEqual(signature.key_id, TEST_KEY_ID)
self.assertEqual(signature.signature_id, None)
with open(TEST_KEY_FILE) as file_object:
Key.objects.create(key_data=file_object.read())
signature = EmbeddedSignature.objects.first()
self.assertEqual(signature.signature_id, TEST_SIGNATURE_ID)
def test_embedded_signature_post_no_key_verify(self):
with open(TEST_KEY_FILE) as file_object:
key = Key.objects.create(key_data=file_object.read())
with open(TEST_SIGNED_DOCUMENT_PATH) as file_object:
signed_document = self.document_type.new_document(
file_object=file_object
)
self.assertEqual(EmbeddedSignature.objects.count(), 1)
signature = EmbeddedSignature.objects.first()
self.assertEqual(
signature.document_version, signed_document.latest_version
)
self.assertEqual(signature.key_id, TEST_KEY_ID)
self.assertEqual(signature.signature_id, TEST_SIGNATURE_ID)
key.delete()
signature = EmbeddedSignature.objects.first()
self.assertEqual(signature.signature_id, None)
def test_embedded_signature_with_key(self):
with open(TEST_KEY_FILE) as file_object:
key = Key.objects.create(key_data=file_object.read())
with open(TEST_SIGNED_DOCUMENT_PATH) as file_object:
self.signed_document = self.document_type.new_document(
file_object=file_object
)
self.assertEqual(EmbeddedSignature.objects.count(), 1)
signature = EmbeddedSignature.objects.first()
self.assertEqual(
signature.document_version,
self.signed_document.latest_version
)
self.assertEqual(signature.key_id, TEST_KEY_ID)
self.assertEqual(signature.public_key_fingerprint, key.fingerprint)
self.assertEqual(signature.signature_id, TEST_SIGNATURE_ID)
def test_detached_signature_no_key(self):
with open(TEST_DOCUMENT_PATH) as file_object:
document = self.document_type.new_document(
file_object=file_object
)
with open(TEST_SIGNATURE_FILE_PATH) as file_object:
DetachedSignature.objects.create(
document_version=document.latest_version,
signature_file=File(file_object)
)
self.assertEqual(DetachedSignature.objects.count(), 1)
signature = DetachedSignature.objects.first()
self.assertEqual(signature.document_version, document.latest_version)
self.assertEqual(signature.key_id, TEST_KEY_ID)
self.assertEqual(signature.public_key_fingerprint, None)
def test_detached_signature_with_key(self):
with open(TEST_KEY_FILE) as file_object:
key = Key.objects.create(key_data=file_object.read())
with open(TEST_DOCUMENT_PATH) as file_object:
document = self.document_type.new_document(
file_object=file_object
)
with open(TEST_SIGNATURE_FILE_PATH) as file_object:
DetachedSignature.objects.create(
document_version=document.latest_version,
signature_file=File(file_object)
)
self.assertEqual(DetachedSignature.objects.count(), 1)
signature = DetachedSignature.objects.first()
self.assertEqual(signature.document_version, document.latest_version)
self.assertEqual(signature.key_id, TEST_KEY_ID)
self.assertEqual(signature.public_key_fingerprint, key.fingerprint)
def test_detached_signature_post_key_verify(self):
with open(TEST_DOCUMENT_PATH) as file_object:
document = self.document_type.new_document(
file_object=file_object
)
with open(TEST_SIGNATURE_FILE_PATH) as file_object:
DetachedSignature.objects.create(
document_version=document.latest_version,
signature_file=File(file_object)
)
self.assertEqual(DetachedSignature.objects.count(), 1)
signature = DetachedSignature.objects.first()
self.assertEqual(signature.document_version, document.latest_version)
self.assertEqual(signature.key_id, TEST_KEY_ID)
self.assertEqual(signature.public_key_fingerprint, None)
with open(TEST_KEY_FILE) as file_object:
key = Key.objects.create(key_data=file_object.read())
signature = DetachedSignature.objects.first()
self.assertEqual(signature.public_key_fingerprint, key.fingerprint)
def test_detached_signature_post_no_key_verify(self):
with open(TEST_KEY_FILE) as file_object:
key = Key.objects.create(key_data=file_object.read())
with open(TEST_DOCUMENT_PATH) as file_object:
document = self.document_type.new_document(
file_object=file_object
)
with open(TEST_SIGNATURE_FILE_PATH) as file_object:
DetachedSignature.objects.create(
document_version=document.latest_version,
signature_file=File(file_object)
)
self.assertEqual(DetachedSignature.objects.count(), 1)
signature = DetachedSignature.objects.first()
self.assertEqual(signature.document_version, document.latest_version)
self.assertEqual(signature.key_id, TEST_KEY_ID)
self.assertEqual(signature.public_key_fingerprint, key.fingerprint)
key.delete()
signature = DetachedSignature.objects.first()
self.assertEqual(signature.public_key_fingerprint, None)
def test_document_no_signature(self):
with open(TEST_DOCUMENT_PATH) as file_object:
self.document_type.new_document(
file_object=file_object
)
self.assertEqual(EmbeddedSignature.objects.count(), 0)
def test_new_signed_version(self):
with open(TEST_DOCUMENT_PATH) as file_object:
document = self.document_type.new_document(
file_object=file_object
)
with open(TEST_SIGNED_DOCUMENT_PATH) as file_object:
signed_version = document.new_version(
file_object=file_object, comment='test comment 1'
)
# Artifical delay since MySQL doesn't store microsecond data in
# timestamps. Version timestamp is used to determine which version
# is the latest.
time.sleep(1)
time.sleep(2)
self.assertEqual(
DocumentVersionSignature.objects.has_detached_signature(
self.document.latest_version
), False
)
self.assertEqual(
DocumentVersionSignature.objects.verify_signature(
self.document.latest_version
).status, SIGNATURE_STATE_VALID
self.assertEqual(EmbeddedSignature.objects.count(), 1)
signature = EmbeddedSignature.objects.first()
self.assertEqual(signature.document_version, signed_version)
self.assertEqual(signature.key_id, TEST_KEY_ID)
@override_settings(OCR_AUTO_OCR=False)
class EmbeddedSignaturesTestCase(TestCase):
def setUp(self):
self.document_type = DocumentType.objects.create(
label=TEST_DOCUMENT_TYPE
)
def test_detached_signatures(self):
def tearDown(self):
self.document_type.delete()
def test_unsigned_document_version_method(self):
TEST_UNSIGNED_DOCUMENT_COUNT = 3
TEST_SIGNED_DOCUMENT_COUNT = 3
for count in range(TEST_UNSIGNED_DOCUMENT_COUNT):
with open(TEST_DOCUMENT_PATH) as file_object:
self.document_type.new_document(
file_object=file_object
)
for count in range(TEST_SIGNED_DOCUMENT_COUNT):
with open(TEST_SIGNED_DOCUMENT_PATH) as file_object:
self.document_type.new_document(
file_object=file_object
)
self.assertEqual(
EmbeddedSignature.objects.unsigned_document_versions().count(),
TEST_UNSIGNED_DOCUMENT_COUNT
)
def test_task_verify_missing_embedded_signature(self):
old_hooks = DocumentVersion._post_save_hooks
DocumentVersion._post_save_hooks = {}
TEST_UNSIGNED_DOCUMENT_COUNT = 4
TEST_SIGNED_DOCUMENT_COUNT = 2
for count in range(TEST_UNSIGNED_DOCUMENT_COUNT):
with open(TEST_DOCUMENT_PATH) as file_object:
self.document_type.new_document(
file_object=file_object
)
for count in range(TEST_SIGNED_DOCUMENT_COUNT):
with open(TEST_SIGNED_DOCUMENT_PATH) as file_object:
self.document_type.new_document(
file_object=file_object
)
self.assertEqual(
EmbeddedSignature.objects.unsigned_document_versions().count(),
TEST_UNSIGNED_DOCUMENT_COUNT + TEST_SIGNED_DOCUMENT_COUNT
)
DocumentVersion._post_save_hooks = old_hooks
task_verify_missing_embedded_signature.delay()
self.assertEqual(
EmbeddedSignature.objects.unsigned_document_versions().count(),
TEST_UNSIGNED_DOCUMENT_COUNT
)
def test_signing(self):
key = Key.objects.create(key_data=TEST_KEY_DATA)
with open(TEST_DOCUMENT_PATH) as file_object:
self.document.new_version(
file_object=File(file_object), comment='test comment 2'
document = self.document_type.new_document(
file_object=file_object
)
# GPGVerificationError
self.assertEqual(DocumentVersionSignature.objects.verify_signature(
self.document.latest_version), None
with document.latest_version.open() as file_object:
file_object.seek(0, 2)
original_size = file_object.tell()
file_object.seek(0)
original_hash = hashlib.sha256(file_object.read()).hexdigest()
new_version = EmbeddedSignature.objects.sign_document_version(
document_version=document.latest_version, key=key,
passphrase=TEST_KEY_PASSPHRASE
)
with open(TEST_SIGNATURE_FILE_PATH, 'rb') as file_object:
DocumentVersionSignature.objects.add_detached_signature(
self.document.latest_version, File(file_object)
)
self.assertEqual(EmbeddedSignature.objects.count(), 1)
self.assertEqual(
DocumentVersionSignature.objects.has_detached_signature(
self.document.latest_version
), True
)
self.assertEqual(
DocumentVersionSignature.objects.verify_signature(
self.document.latest_version
).status, SIGNATURE_STATE_VALID
)
with new_version.open() as file_object:
document_content_hash = hashlib.sha256(file_object.read()).hexdigest()
with new_version.open() as file_object:
file_object.seek(0, 2)
new_size = file_object.tell()
file_object.seek(0)
new_hash = hashlib.sha256(file_object.read()).hexdigest()
self.assertEqual(original_size, new_size)
self.assertEqual(original_hash, new_hash)

View File

@@ -0,0 +1,366 @@
from __future__ import absolute_import, unicode_literals
from django.core.files import File
from django_downloadview.test import assert_download_response
from django_gpg.models import Key
from documents.models import Document, DocumentVersion
from documents.tests.literals import TEST_DOCUMENT_PATH
from documents.tests.test_views import GenericDocumentViewTestCase
from user_management.tests import (
TEST_USER_USERNAME, TEST_USER_PASSWORD
)
from ..models import DetachedSignature, EmbeddedSignature
from ..permissions import (
permission_document_version_signature_delete,
permission_document_version_signature_download,
permission_document_version_signature_upload,
permission_document_version_signature_verify,
permission_document_version_signature_view
)
from .literals import (
TEST_SIGNATURE_FILE_PATH, TEST_SIGNED_DOCUMENT_PATH, TEST_KEY_FILE
)
TEST_UNSIGNED_DOCUMENT_COUNT = 4
TEST_SIGNED_DOCUMENT_COUNT = 2
class SignaturesViewTestCase(GenericDocumentViewTestCase):
def test_signature_list_view_no_permission(self):
with open(TEST_KEY_FILE) as file_object:
Key.objects.create(key_data=file_object.read())
with open(TEST_DOCUMENT_PATH) as file_object:
document = self.document_type.new_document(
file_object=file_object
)
with open(TEST_SIGNATURE_FILE_PATH) as file_object:
DetachedSignature.objects.create(
document_version=document.latest_version,
signature_file=File(file_object)
)
self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD)
response = self.get(
'signatures:document_version_signature_list',
args=(document.latest_version.pk,)
)
self.assertEqual(response.status_code, 403)
def test_signature_list_view_with_permission(self):
with open(TEST_KEY_FILE) as file_object:
Key.objects.create(key_data=file_object.read())
with open(TEST_DOCUMENT_PATH) as file_object:
document = self.document_type.new_document(
file_object=file_object
)
with open(TEST_SIGNATURE_FILE_PATH) as file_object:
DetachedSignature.objects.create(
document_version=document.latest_version,
signature_file=File(file_object)
)
self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD)
self.role.permissions.add(
permission_document_version_signature_view.stored_permission
)
response = self.get(
'signatures:document_version_signature_list',
args=(document.latest_version.pk,)
)
self.assertContains(response, 'Total: 1', status_code=200)
def test_signature_detail_view_no_permission(self):
with open(TEST_KEY_FILE) as file_object:
Key.objects.create(key_data=file_object.read())
with open(TEST_DOCUMENT_PATH) as file_object:
document = self.document_type.new_document(
file_object=file_object
)
with open(TEST_SIGNATURE_FILE_PATH) as file_object:
signature = DetachedSignature.objects.create(
document_version=document.latest_version,
signature_file=File(file_object)
)
self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD)
response = self.get(
'signatures:document_version_signature_details',
args=(signature.pk,)
)
self.assertEqual(response.status_code, 403)
def test_signature_detail_view_with_permission(self):
with open(TEST_KEY_FILE) as file_object:
Key.objects.create(key_data=file_object.read())
with open(TEST_DOCUMENT_PATH) as file_object:
document = self.document_type.new_document(
file_object=file_object
)
with open(TEST_SIGNATURE_FILE_PATH) as file_object:
signature = DetachedSignature.objects.create(
document_version=document.latest_version,
signature_file=File(file_object)
)
self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD)
self.role.permissions.add(
permission_document_version_signature_view.stored_permission
)
response = self.get(
'signatures:document_version_signature_details',
args=(signature.pk,)
)
self.assertContains(response, signature.signature_id, status_code=200)
def test_signature_upload_view_no_permission(self):
with open(TEST_DOCUMENT_PATH) as file_object:
document = self.document_type.new_document(
file_object=file_object
)
self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD)
with open(TEST_SIGNATURE_FILE_PATH) as file_object:
response = self.post(
'signatures:document_version_signature_upload',
args=(document.latest_version.pk,),
data={'signature_file': file_object}
)
self.assertEqual(response.status_code, 403)
self.assertEqual(DetachedSignature.objects.count(), 0)
def test_signature_upload_view_with_permission(self):
with open(TEST_DOCUMENT_PATH) as file_object:
document = self.document_type.new_document(
file_object=file_object
)
self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD)
self.role.permissions.add(
permission_document_version_signature_upload.stored_permission
)
with open(TEST_SIGNATURE_FILE_PATH) as file_object:
response = self.post(
'signatures:document_version_signature_upload',
args=(document.latest_version.pk,),
data={'signature_file': file_object}
)
self.assertEqual(response.status_code, 302)
self.assertEqual(DetachedSignature.objects.count(), 1)
def test_signature_download_view_no_permission(self):
with open(TEST_DOCUMENT_PATH) as file_object:
document = self.document_type.new_document(
file_object=file_object
)
with open(TEST_SIGNATURE_FILE_PATH) as file_object:
signature = DetachedSignature.objects.create(
document_version=document.latest_version,
signature_file=File(file_object)
)
self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD)
response = self.get(
'signatures:document_version_signature_download',
args=(signature.pk,),
)
self.assertEqual(response.status_code, 403)
def test_signature_download_view_with_permission(self):
with open(TEST_DOCUMENT_PATH) as file_object:
document = self.document_type.new_document(
file_object=file_object
)
with open(TEST_SIGNATURE_FILE_PATH) as file_object:
signature = DetachedSignature.objects.create(
document_version=document.latest_version,
signature_file=File(file_object)
)
self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD)
self.role.permissions.add(
permission_document_version_signature_download.stored_permission
)
response = self.get(
'signatures:document_version_signature_download',
args=(signature.pk,),
)
assert_download_response(
self, response=response, content=signature.signature_file.read(),
)
def test_signature_delete_view_no_permission(self):
with open(TEST_KEY_FILE) as file_object:
Key.objects.create(key_data=file_object.read())
with open(TEST_DOCUMENT_PATH) as file_object:
document = self.document_type.new_document(
file_object=file_object
)
with open(TEST_SIGNATURE_FILE_PATH) as file_object:
signature = DetachedSignature.objects.create(
document_version=document.latest_version,
signature_file=File(file_object)
)
self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD)
self.role.permissions.add(
permission_document_version_signature_view.stored_permission
)
response = self.post(
'signatures:document_version_signature_delete',
args=(signature.pk,)
)
self.assertEqual(response.status_code, 403)
self.assertEqual(DetachedSignature.objects.count(), 1)
def test_signature_delete_view_with_permission(self):
with open(TEST_KEY_FILE) as file_object:
Key.objects.create(key_data=file_object.read())
with open(TEST_DOCUMENT_PATH) as file_object:
document = self.document_type.new_document(
file_object=file_object
)
with open(TEST_SIGNATURE_FILE_PATH) as file_object:
signature = DetachedSignature.objects.create(
document_version=document.latest_version,
signature_file=File(file_object)
)
self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD)
self.role.permissions.add(
permission_document_version_signature_delete.stored_permission
)
self.role.permissions.add(
permission_document_version_signature_view.stored_permission
)
response = self.post(
'signatures:document_version_signature_delete',
args=(signature.pk,), follow=True
)
self.assertContains(response, 'deleted', status_code=200)
self.assertEqual(DetachedSignature.objects.count(), 0)
def test_missing_signature_verify_view_no_permission(self):
for document in self.document_type.documents.all():
document.delete(to_trash=False)
old_hooks = DocumentVersion._post_save_hooks
DocumentVersion._post_save_hooks = {}
for count in range(TEST_UNSIGNED_DOCUMENT_COUNT):
with open(TEST_DOCUMENT_PATH) as file_object:
self.document_type.new_document(
file_object=file_object
)
for count in range(TEST_SIGNED_DOCUMENT_COUNT):
with open(TEST_SIGNED_DOCUMENT_PATH) as file_object:
self.document_type.new_document(
file_object=file_object
)
self.assertEqual(
EmbeddedSignature.objects.unsigned_document_versions().count(),
TEST_UNSIGNED_DOCUMENT_COUNT + TEST_SIGNED_DOCUMENT_COUNT
)
DocumentVersion._post_save_hooks = old_hooks
self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD)
response = self.post(
'signatures:all_document_version_signature_verify', follow=True
)
self.assertEqual(response.status_code, 403)
self.assertEqual(
EmbeddedSignature.objects.unsigned_document_versions().count(),
TEST_UNSIGNED_DOCUMENT_COUNT + TEST_SIGNED_DOCUMENT_COUNT
)
def test_missing_signature_verify_view_with_permission(self):
for document in self.document_type.documents.all():
document.delete(to_trash=False)
from documents.models import DocumentType
old_hooks = DocumentVersion._post_save_hooks
DocumentVersion._post_save_hooks = {}
for count in range(TEST_UNSIGNED_DOCUMENT_COUNT):
with open(TEST_DOCUMENT_PATH) as file_object:
self.document_type.new_document(
file_object=file_object
)
for count in range(TEST_SIGNED_DOCUMENT_COUNT):
with open(TEST_SIGNED_DOCUMENT_PATH) as file_object:
self.document_type.new_document(
file_object=file_object
)
self.assertEqual(
EmbeddedSignature.objects.unsigned_document_versions().count(),
TEST_UNSIGNED_DOCUMENT_COUNT + TEST_SIGNED_DOCUMENT_COUNT
)
DocumentVersion._post_save_hooks = old_hooks
self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD)
self.role.permissions.add(
permission_document_version_signature_verify.stored_permission
)
response = self.post(
'signatures:all_document_version_signature_verify', follow=True
)
self.assertContains(response, 'queued', status_code=200)
self.assertEqual(
EmbeddedSignature.objects.unsigned_document_versions().count(),
TEST_UNSIGNED_DOCUMENT_COUNT
)

View File

@@ -2,22 +2,54 @@ from __future__ import unicode_literals
from django.conf.urls import patterns, url
from .views import (
AllDocumentSignatureVerifyView, DocumentVersionDetachedSignatureCreateView,
DocumentVersionEmbeddedSignatureCreateView,
DocumentVersionSignatureDeleteView, DocumentVersionSignatureDetailView,
DocumentVersionSignatureDownloadView, DocumentVersionSignatureListView,
DocumentVersionSignatureUploadView
)
urlpatterns = patterns(
'document_signatures.views',
'',
url(
r'^verify/(?P<document_pk>\d+)/$', 'document_verify',
name='document_verify'
r'^(?P<pk>\d+)/details/$',
DocumentVersionSignatureDetailView.as_view(),
name='document_version_signature_details'
),
url(
r'^upload/signature/(?P<document_pk>\d+)/$',
'document_signature_upload', name='document_signature_upload'
r'^signature/(?P<pk>\d+)/download/$',
DocumentVersionSignatureDownloadView.as_view(),
name='document_version_signature_download'
),
url(
r'^download/signature/(?P<document_pk>\d+)/$',
'document_signature_download', name='document_signature_download'
r'^document/version/(?P<pk>\d+)/signatures/list/$',
DocumentVersionSignatureListView.as_view(),
name='document_version_signature_list'
),
url(
r'^document/(?P<document_pk>\d+)/signature/delete/$',
'document_signature_delete', name='document_signature_delete'
r'^documents/version/(?P<pk>\d+)/signature/detached/upload/$',
DocumentVersionSignatureUploadView.as_view(),
name='document_version_signature_upload'
),
url(
r'^documents/version/(?P<pk>\d+)/signature/detached/create/$',
DocumentVersionDetachedSignatureCreateView.as_view(),
name='document_version_signature_detached_create'
),
url(
r'^documents/version/(?P<pk>\d+)/signature/embedded/create/$',
DocumentVersionEmbeddedSignatureCreateView.as_view(),
name='document_version_signature_embedded_create'
),
url(
r'^signature/(?P<pk>\d+)/delete/$',
DocumentVersionSignatureDeleteView.as_view(),
name='document_version_signature_delete'
),
url(
r'^tools/all/document/version/signature/verify/$',
AllDocumentSignatureVerifyView.as_view(),
name='all_document_version_signature_verify'
),
)

File diff suppressed because it is too large Load Diff

View File

@@ -13,7 +13,8 @@ from .permissions import (
link_document_workflow_instance_list = Link(
icon='fa fa-sitemap', permissions=(permission_workflow_view,),
text=_('Workflows'),
view='document_states:document_workflow_instance_list', args='object.pk'
view='document_states:document_workflow_instance_list',
args='resolved_object.pk'
)
link_setup_workflow_create = Link(
permissions=(permission_workflow_create,), text=_('Create workflow'),

View File

@@ -7,6 +7,10 @@ from events.classes import Event
event_document_create = Event(
name='documents_document_create', label=_('Document created')
)
event_document_download = Event(
name='documents_document_download',
label=_('Document downloaded')
)
event_document_properties_edit = Event(
name='documents_document_edit', label=_('Document properties edited')
)
@@ -20,3 +24,7 @@ event_document_version_revert = Event(
name='documents_document_version_revert',
label=_('Document version reverted')
)
event_document_view = Event(
name='documents_document_view',
label=_('Document viewed')
)

View File

@@ -169,8 +169,8 @@ class DocumentTypeSelectForm(forms.Form):
)
self.fields['document_type'] = forms.ModelChoiceField(
queryset=queryset,
label=_('Document type')
empty_label=None, label=_('Document type'), queryset=queryset,
required=True, widget=forms.widgets.Select(attrs={'size': 10})
)

View File

@@ -18,7 +18,7 @@ from .settings import setting_zoom_max_level, setting_zoom_min_level
def is_not_current_version(context):
return context['object'].document.latest_version.timestamp != context['object'].timestamp
return context['resolved_object'].document.latest_version.timestamp != context['resolved_object'].timestamp
def is_first_page(context):
@@ -40,17 +40,18 @@ def is_min_zoom(context):
# Facet
link_document_preview = Link(
icon='fa fa-eye', permissions=(permission_document_view,),
text=_('Preview'), view='documents:document_preview', args='object.id'
text=_('Preview'), view='documents:document_preview',
args='resolved_object.id'
)
link_document_properties = Link(
icon='fa fa-info', permissions=(permission_document_view,),
text=_('Properties'), view='documents:document_properties',
args='object.id'
args='resolved_object.id'
)
link_document_version_list = Link(
icon='fa fa-code-fork', permissions=(permission_document_view,),
text=_('Versions'), view='documents:document_version_list',
args='object.pk'
args='resolved_object.pk'
)
link_document_pages = Link(
icon='fa fa-files-o', permissions=(permission_document_view,),
@@ -61,36 +62,39 @@ link_document_pages = Link(
link_document_clear_transformations = Link(
permissions=(permission_transformation_delete,),
text=_('Clear transformations'),
view='documents:document_clear_transformations', args='object.id'
view='documents:document_clear_transformations', args='resolved_object.id'
)
link_document_delete = Link(
permissions=(permission_document_delete,), tags='dangerous',
text=_('Delete'), view='documents:document_delete', args='object.id'
text=_('Delete'), view='documents:document_delete',
args='resolved_object.id'
)
link_document_trash = Link(
permissions=(permission_document_trash,), tags='dangerous',
text=_('Move to trash'), view='documents:document_trash', args='object.id'
text=_('Move to trash'), view='documents:document_trash',
args='resolved_object.id'
)
link_document_edit = Link(
permissions=(permission_document_properties_edit,),
text=_('Edit properties'), view='documents:document_edit',
args='object.id'
args='resolved_object.id'
)
link_document_document_type_edit = Link(
permissions=(permission_document_properties_edit,), text=_('Change type'),
view='documents:document_document_type_edit', args='object.id'
view='documents:document_document_type_edit', args='resolved_object.id'
)
link_document_download = Link(
permissions=(permission_document_download,), text=_('Download'),
view='documents:document_download', args='object.id'
view='documents:document_download', args='resolved_object.id'
)
link_document_print = Link(
permissions=(permission_document_print,), text=_('Print'),
view='documents:document_print', args='object.id'
view='documents:document_print', args='resolved_object.id'
)
link_document_update_page_count = Link(
permissions=(permission_document_tools,), text=_('Recalculate page count'),
view='documents:document_update_page_count', args='object.pk'
args='resolved_object.pk', permissions=(permission_document_tools,),
text=_('Recalculate page count'),
view='documents:document_update_page_count'
)
link_document_restore = Link(
permissions=(permission_document_restore,), text=_('Restore'),
@@ -124,8 +128,8 @@ link_document_multiple_restore = Link(
text=_('Restore'), view='documents:document_multiple_restore'
)
link_document_version_download = Link(
args='object.pk', permissions=(permission_document_download,),
text=_('Download'), view='documents:document_version_download'
args='resolved_object.pk', permissions=(permission_document_download,),
text=_('Download version'), view='documents:document_version_download'
)
# Views

View File

@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('documents', '0032_auto_20160315_0537'),
]
operations = [
migrations.AlterModelOptions(
name='documenttypefilename',
options={'ordering': ('filename',), 'verbose_name': 'Quick label', 'verbose_name_plural': 'Quick labels'},
),
]

View File

@@ -127,7 +127,7 @@ class DocumentType(models.Model):
with transaction.atomic():
document = self.documents.create(
description=description or '',
label=label or unicode(file_object),
label=label or file_object.name,
language=language or setting_language.value
)
document.save(_user=_user)
@@ -138,7 +138,7 @@ class DocumentType(models.Model):
logger.critical(
'Unexpected exception while trying to create new document '
'"%s" from document type "%s"; %s',
label or unicode(file_object), self, exception
label or file_object.name, self, exception
)
raise
@@ -322,10 +322,6 @@ class Document(models.Model):
# Document has no version yet
return 0
@property
def signature_state(self):
return self.latest_version.signature_state
class DeletedDocument(Document):
objects = TrashCanManager()
@@ -405,7 +401,9 @@ class DocumentVersion(models.Model):
super(DocumentVersion, self).save(*args, **kwargs)
for key in sorted(DocumentVersion._post_save_hooks):
DocumentVersion._post_save_hooks[key](self)
DocumentVersion._post_save_hooks[key](
document_version=self
)
if new_document_version:
# Only do this for new documents
@@ -503,7 +501,9 @@ class DocumentVersion(models.Model):
else:
result = self.file.storage.open(self.file.name)
for key in sorted(DocumentVersion._pre_open_hooks):
result = DocumentVersion._pre_open_hooks[key](result, self)
result = DocumentVersion._pre_open_hooks[key](
file_object=result, document_version=self
)
return result

View File

@@ -0,0 +1,114 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.contrib.contenttypes.models import ContentType
from django.test import override_settings
from django.utils.six import BytesIO
from actstream.models import Action
from common.tests.test_views import GenericViewTestCase
from converter.models import Transformation
from converter.permissions import permission_transformation_delete
from user_management.tests.literals import (
TEST_USER_PASSWORD, TEST_USER_USERNAME
)
from ..events import event_document_download, event_document_view
from ..literals import DEFAULT_DELETE_PERIOD, DEFAULT_DELETE_TIME_UNIT
from ..models import (
DeletedDocument, Document, DocumentType, HASH_FUNCTION
)
from ..permissions import (
permission_document_create, permission_document_delete,
permission_document_download, permission_document_properties_edit,
permission_document_restore, permission_document_tools,
permission_document_trash, permission_document_type_create,
permission_document_type_delete, permission_document_type_edit,
permission_document_type_view, permission_document_version_revert,
permission_document_view, permission_empty_trash
)
from .literals import (
TEST_DOCUMENT_TYPE, TEST_DOCUMENT_TYPE_QUICK_LABEL,
TEST_SMALL_DOCUMENT_CHECKSUM, TEST_SMALL_DOCUMENT_PATH
)
from .test_views import GenericDocumentViewTestCase
TEST_DOCUMENT_TYPE_EDITED_LABEL = 'test document type edited label'
TEST_DOCUMENT_TYPE_2_LABEL = 'test document type 2 label'
TEST_TRANSFORMATION_NAME = 'rotate'
TEST_TRANSFORMATION_ARGUMENT = 'degrees: 180'
class DocumentEventsTestCase(GenericDocumentViewTestCase):
def test_document_download_event_no_permissions(self):
self.login(
username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD
)
Action.objects.all().delete()
response = self.post(
'documents:document_download', args=(self.document.pk,)
)
self.assertEqual(response.status_code, 302)
self.assertEqual(list(Action.objects.any(obj=self.document)), [])
def test_document_download_event_with_permissions(self):
self.login(
username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD
)
Action.objects.all().delete()
self.role.permissions.add(
permission_document_download.stored_permission
)
response = self.post(
'documents:document_download', args=(self.document.pk,),
)
event = Action.objects.any(obj=self.document).first()
self.assertEqual(event.verb, event_document_download.name)
self.assertEqual(event.target, self.document)
self.assertEqual(event.actor, self.user)
def test_document_view_event_no_permissions(self):
self.login(
username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD
)
Action.objects.all().delete()
response = self.get(
'documents:document_preview', args=(self.document.pk,)
)
self.assertEqual(response.status_code, 403)
self.assertEqual(list(Action.objects.any(obj=self.document)), [])
def test_document_view_event_with_permissions(self):
self.login(
username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD
)
Action.objects.all().delete()
self.role.permissions.add(
permission_document_view.stored_permission
)
response = self.get(
'documents:document_preview', args=(self.document.pk,),
)
event = Action.objects.any(obj=self.document).first()
self.assertEqual(event.verb, event_document_view.name)
self.assertEqual(event.target, self.document)
self.assertEqual(event.actor, self.user)

View File

@@ -49,7 +49,7 @@ class GenericDocumentViewTestCase(GenericViewTestCase):
with open(TEST_SMALL_DOCUMENT_PATH) as file_object:
self.document = self.document_type.new_document(
file_object=file_object, label='mayan_11_1.pdf'
file_object=file_object
)
def tearDown(self):

View File

@@ -30,6 +30,7 @@ from converter.permissions import permission_transformation_delete
from filetransfers.api import serve_file
from permissions import Permission
from .events import event_document_download, event_document_view
from .forms import (
DocumentDownloadForm, DocumentForm, DocumentPageForm, DocumentPreviewForm,
DocumentPropertiesForm, DocumentTypeSelectForm,
@@ -316,6 +317,10 @@ class DocumentPreviewView(SingleObjectDetailView):
DocumentPreviewView, self
).dispatch(request, *args, **kwargs)
self.get_object().add_as_recent_document_for_user(request.user)
event_document_view.commit(
actor=request.user, target=self.get_object()
)
return result
def get_extra_context(self):
@@ -841,6 +846,10 @@ def document_download(request, document_id=None, document_id_list=None, document
arcname=document_version.document.label
)
descriptor.close()
event_document_download.commit(
actor=request.user,
target=document_version.document
)
compressed_file.close()
@@ -865,6 +874,9 @@ def document_download(request, document_id=None, document_id_list=None, document
# Test permissions and trigger exception
fd = queryset.first().open()
fd.close()
event_document_download.commit(
actor=request.user, target=queryset.first().document
)
return serve_file(
request,
queryset.first().file,

View File

@@ -13,7 +13,8 @@ from .permissions import (
link_document_folder_list = Link(
icon='fa fa-folder', permissions=(permission_document_view,),
text=_('Folders'), view='folders:document_folder_list', args='object.pk'
text=_('Folders'), view='folders:document_folder_list',
args='resolved_object.pk'
)
link_folder_add_document = Link(
permissions=(permission_folder_add_document,), text=_('Add to a folder'),

View File

@@ -52,7 +52,7 @@ link_smart_link_instance_view = Link(
link_smart_link_instances_for_document = Link(
icon='fa fa-link', permissions=(permission_document_view,),
text=_('Smart links'), view='linking:smart_link_instances_for_document',
args='object.pk'
args='resolved_object.pk'
)
link_smart_link_list = Link(
permissions=(permission_smart_link_create,), text=_('Smart links'),

View File

@@ -10,12 +10,12 @@ from .permissions import (
)
link_send_document = Link(
permissions=(permission_mailing_send_document,), text=_('Email document'),
view='mailer:send_document', args='object.pk'
args='resolved_object.pk', permissions=(permission_mailing_send_document,),
text=_('Email document'), view='mailer:send_document'
)
link_send_document_link = Link(
permissions=(permission_mailing_link,), text=_('Email link'),
view='mailer:send_document_link', args='object.pk'
args='resolved_object.pk', permissions=(permission_mailing_link,),
text=_('Email link'), view='mailer:send_document_link'
)
link_document_mailing_error_log = Link(
icon='fa fa-envelope', permissions=(permission_view_error_log,),

View File

@@ -36,7 +36,7 @@ link_metadata_remove = Link(
)
link_metadata_view = Link(
icon='fa fa-pencil', permissions=(permission_metadata_document_view,),
text=_('Metadata'), view='metadata:metadata_view', args='object.pk'
text=_('Metadata'), view='metadata:metadata_view', args='resolved_object.pk'
)
link_setup_document_type_metadata = Link(
permissions=(permission_document_type_edit,), text=_('Optional metadata'),

View File

@@ -202,7 +202,7 @@ class Link(object):
def __init__(self, text, view, args=None, condition=None,
conditional_disable=None, description=None, icon=None,
keep_query=False, kwargs=None, permissions=None,
remove_from_query=None, tags=None):
permissions_related=None, remove_from_query=None, tags=None):
self.args = args or []
self.condition = condition
@@ -245,7 +245,8 @@ class Link(object):
if resolved_object:
try:
AccessControlList.objects.check_access(
self.permissions, request.user, resolved_object
self.permissions, request.user, resolved_object,
related=getattr(self, 'permissions_related', None)
)
except PermissionDenied:
return None

View File

@@ -14,8 +14,8 @@ link_document_content = Link(
text=_('OCR'), view='ocr:document_content', args='resolved_object.id'
)
link_document_submit = Link(
permissions=(permission_ocr_document,), text=_('Submit for OCR'),
view='ocr:document_submit', args='object.id'
args='resolved_object.id', permissions=(permission_ocr_document,),
text=_('Submit for OCR'), view='ocr:document_submit'
)
link_document_submit_all = Link(
icon='fa fa-font', permissions=(permission_ocr_document,),

View File

@@ -75,10 +75,9 @@ link_staging_file_delete = Link(
args=('source.pk', 'object.encoded_filename',)
)
link_upload_version = Link(
condition=document_new_version_not_blocked,
args='resolved_object.pk', condition=document_new_version_not_blocked,
permissions=(permission_document_new_version,),
text=_('Upload new version'), view='sources:upload_version',
args='object.pk'
)
link_setup_source_logs = Link(
text=_('Logs'), view='sources:setup_source_logs',

View File

@@ -65,7 +65,7 @@ class Source(models.Model):
with transaction.atomic():
document = Document.objects.create(
description=description or '', document_type=document_type,
label=label or unicode(file_object),
label=label or file_object.name,
language=language or setting_language.value
)
document.save(_user=user)
@@ -100,7 +100,7 @@ class Source(models.Model):
logger.critical(
'Unexpected exception while trying to create new document '
'"%s" from source "%s"; %s',
label or unicode(file_object), self, exception
label or file_object.name, self, exception
)
raise

View File

@@ -38,7 +38,7 @@ link_tag_edit = Link(
)
link_tag_document_list = Link(
icon='fa fa-tag', permissions=(permission_tag_view,), text=_('Tags'),
view='tags:document_tags', args='object.pk'
view='tags:document_tags', args='resolved_object.pk'
)
link_tag_list = Link(icon='fa fa-tag', text=_('Tags'), view='tags:tag_list')
link_tag_multiple_delete = Link(

View File

@@ -10,6 +10,7 @@ django-celery==3.1.17
django-colorful==1.1.0
django-compressor==2.0
django-cors-headers==1.1.0
django-downloadview==1.9
django-filetransfers==0.1.0
django-formtools==1.0
django-pure-pagination==0.3.0

View File

@@ -1,7 +1,7 @@
[tox]
envlist =
coverage-clean
py{27,33,34,35}-django{1.7,1.8}
py{27,33,34,35}-django{1.8,1.9}
coverage-report
[testenv]
@@ -16,8 +16,8 @@ commands=
deps =
-rrequirements/testing-no-django.txt
django1.7: django>=1.7,<1.8
django1.8: django>=1.8,<1.9
django1.9: django>=1.9,<1.10
setenv=
COVERAGE_FILE=.coverage.tox.{envname}