Merge branch 'feature/transition_acls' into feature/hotfix

This commit is contained in:
Roberto Rosario
2017-03-13 23:25:42 -04:00
16 changed files with 537 additions and 134 deletions

View File

@@ -48,6 +48,10 @@ class AccessControlListManager(models.Manager):
def check_access(self, permissions, user, obj, related=None):
if user.is_superuser or user.is_staff:
logger.debug('Permissions "%s" on "%s" granted to user "%s" as superuser or staff',
permissions,
obj,
user)
return True
try:
@@ -77,15 +81,31 @@ class AccessControlListManager(models.Manager):
for group in user.groups.all():
for role in group.roles.all():
if set(stored_permissions).intersection(set(self.get_inherited_permissions(role=role, obj=obj))):
logger.debug('Permissions "%s" on "%s" granted to user "%s" through role "%s" via inherited ACL',
permissions,
obj,
user,
role)
return True
user_roles.append(role)
if not self.filter(content_type=ContentType.objects.get_for_model(obj), object_id=obj.pk, permissions__in=stored_permissions, role__in=user_roles).exists():
logger.debug('Permissions "%s" on "%s" denied for user "%s"',
permissions,
obj,
user)
raise PermissionDenied(ugettext('Insufficient access.'))
logger.debug('Permissions "%s" on "%s" granted to user "%s" through roles "%s" by direct ACL',
permissions,
obj,
user,
user_roles)
def filter_by_access(self, permission, user, queryset):
if user.is_superuser or user.is_staff:
logger.debug('Unfiltered queryset returned to user "%s" as superuser or staff',
user)
return queryset
user_roles = []
@@ -124,5 +144,8 @@ class AccessControlListManager(models.Manager):
content_type=content_type, role__in=user_roles,
permissions=permission.stored_permission
).values_list('object_id', flat=True))
logger.debug('Filtered queryset returned to user "%s" based on roles "%s"',
user,
user_roles)
return queryset.filter(parent_acl_query | acl_query)

View File

@@ -177,7 +177,7 @@ class ConfirmView(ObjectListPermissionFilterMixin, ObjectPermissionCheckMixin, V
return HttpResponseRedirect(self.get_success_url())
class FormView(ViewPermissionCheckMixin, ExtraContextMixin, RedirectionMixin, DjangoFormView):
class FormView(FormExtraKwargsMixin, ViewPermissionCheckMixin, ExtraContextMixin, RedirectionMixin, DjangoFormView):
template_name = 'appearance/generic_form.html'

View File

@@ -12,8 +12,8 @@ from permissions import Permission
__all__ = (
'DeleteExtraDataMixin', 'ExtraContextMixin',
'ObjectListPermissionFilterMixin', 'ObjectNameMixin',
'ObjectPermissionCheckMixin', 'RedirectionMixin',
'FormExtraKwargsMixin', 'ObjectListPermissionFilterMixin',
'ObjectNameMixin', 'ObjectPermissionCheckMixin', 'RedirectionMixin',
'ViewPermissionCheckMixin'
)
@@ -42,6 +42,22 @@ class ExtraContextMixin(object):
return context
class FormExtraKwargsMixin(object):
"""
Mixin that allows a view to pass extra keyword arguments to forms
"""
form_extra_kwargs = {}
def get_form_extra_kwargs(self):
return self.form_extra_kwargs
def get_form_kwargs(self):
result = super(FormExtraKwargsMixin, self).get_form_kwargs()
result.update(self.get_form_extra_kwargs())
return result
class MultipleInstanceActionMixin(object):
model = None
success_message = 'Operation performed on %(count)d object'

View File

@@ -76,6 +76,11 @@ class GenericViewTestCase(BaseTestCase):
data=data, follow=follow
)
def grant(self, permission):
self.role.permissions.add(
permission.stored_permission
)
def login(self, username, password):
logged_in = self.client.login(username=username, password=password)
@@ -84,6 +89,15 @@ class GenericViewTestCase(BaseTestCase):
self.assertTrue(logged_in)
self.assertTrue(user.is_authenticated())
def login_user(self):
self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD)
def login_admin_user(self):
self.login(username=TEST_ADMIN_USERNAME, password=TEST_ADMIN_PASSWORD)
def logout(self):
self.client.logout()
def post(self, viewname, *args, **kwargs):
data = kwargs.pop('data', {})
follow = kwargs.pop('follow', False)

View File

@@ -15,8 +15,7 @@ from rest_api.permissions import MayanPermission
from .models import Workflow
from .permissions import (
permission_workflow_create, permission_workflow_delete,
permission_workflow_edit, permission_workflow_transition,
permission_workflow_view
permission_workflow_edit, permission_workflow_view
)
from .serializers import (
NewWorkflowDocumentTypeSerializer, WorkflowDocumentTypeSerializer,
@@ -590,21 +589,27 @@ class APIWorkflowInstanceLogEntryListView(generics.ListCreateAPIView):
)
def get_document(self):
if self.request.method == 'GET':
permission_required = permission_workflow_view
else:
permission_required = permission_workflow_transition
document = get_object_or_404(Document, pk=self.kwargs['pk'])
try:
Permission.check_permissions(
self.request.user, (permission_required,)
)
except PermissionDenied:
AccessControlList.objects.check_access(
permission_required, self.request.user, document
)
if self.request.method == 'GET':
"""
Only test for permission if reading. If writing, the permission
will be checked in the serializer
IMPROVEMENT:
When writing, add check for permission or ACL for the workflow.
Failing that, check for ACLs for any of the workflow's transitions.
Failing that, then raise PermissionDenied
"""
try:
Permission.check_permissions(
self.request.user, (permission_workflow_view,)
)
except PermissionDenied:
AccessControlList.objects.check_access(
permission_workflow_view, self.request.user, document
)
return document

View File

@@ -4,6 +4,8 @@ from django.apps import apps
from django.db.models.signals import post_save
from django.utils.translation import ugettext_lazy as _
from acls import ModelPermission
from acls.links import link_acl_list
from common import (
MayanAppConfig, menu_facet, menu_object, menu_secondary, menu_setup,
menu_sidebar
@@ -23,6 +25,7 @@ from .links import (
link_setup_workflow_transition_delete, link_setup_workflow_transition_edit,
link_workflow_instance_detail, link_workflow_instance_transition
)
from .permissions import permission_workflow_transition
class DocumentStatesApp(MayanAppConfig):
@@ -46,6 +49,15 @@ class DocumentStatesApp(MayanAppConfig):
WorkflowState = self.get_model('WorkflowState')
WorkflowTransition = self.get_model('WorkflowTransition')
ModelPermission.register(
model=Workflow, permissions=(permission_workflow_transition,)
)
ModelPermission.register(
model=WorkflowTransition,
permissions=(permission_workflow_transition,)
)
SourceColumn(
source=Workflow, label=_('Initial state'),
func=lambda context: context['object'].get_initial_state() or _('None')
@@ -118,7 +130,7 @@ class DocumentStatesApp(MayanAppConfig):
links=(
link_setup_workflow_states, link_setup_workflow_transitions,
link_setup_workflow_document_types, link_setup_workflow_edit,
link_setup_workflow_delete
link_acl_list, link_setup_workflow_delete
), sources=(Workflow,)
)
menu_object.bind_links(
@@ -129,7 +141,7 @@ class DocumentStatesApp(MayanAppConfig):
)
menu_object.bind_links(
links=(
link_setup_workflow_transition_edit,
link_setup_workflow_transition_edit, link_acl_list,
link_setup_workflow_transition_delete
), sources=(WorkflowTransition,)
)

View File

@@ -1,4 +1,4 @@
from __future__ import unicode_literals
from __future__ import absolute_import, unicode_literals
from django import forms
from django.utils.translation import ugettext_lazy as _
@@ -32,11 +32,16 @@ class WorkflowTransitionForm(forms.ModelForm):
class WorkflowInstanceTransitionForm(forms.Form):
def __init__(self, *args, **kwargs):
workflow = kwargs.pop('workflow')
user = kwargs.pop('user')
workflow_instance = kwargs.pop('workflow_instance')
super(WorkflowInstanceTransitionForm, self).__init__(*args, **kwargs)
self.fields['transition'].choices = workflow.get_transition_choices().values_list('pk', 'label')
self.fields[
'transition'
].queryset = workflow_instance.get_transition_choices(_user=user)
transition = forms.ChoiceField(label=_('Transition'))
transition = forms.ModelChoiceField(
label=_('Transition'), queryset=WorkflowTransition.objects.none()
)
comment = forms.CharField(
label=_('Comment'), required=False, widget=forms.widgets.Textarea()
)

View File

@@ -6,8 +6,7 @@ from navigation import Link
from .permissions import (
permission_workflow_create, permission_workflow_delete,
permission_workflow_edit, permission_workflow_transition,
permission_workflow_view,
permission_workflow_edit, permission_workflow_view,
)
link_document_workflow_instance_list = Link(
@@ -76,7 +75,7 @@ link_workflow_instance_detail = Link(
view='document_states:workflow_instance_detail', args='resolved_object.pk'
)
link_workflow_instance_transition = Link(
permissions=(permission_workflow_transition,), text=_('Transition'),
text=_('Transition'),
view='document_states:workflow_instance_transition',
args='resolved_object.pk'
)

View File

@@ -1,17 +1,20 @@
from __future__ import unicode_literals
from __future__ import absolute_import, unicode_literals
import logging
from django.conf import settings
from django.core.exceptions import ValidationError
from django.core.exceptions import PermissionDenied, ValidationError
from django.core.urlresolvers import reverse
from django.db import IntegrityError, models
from django.utils.encoding import python_2_unicode_compatible
from django.utils.translation import ugettext_lazy as _
from acls.models import AccessControlList
from documents.models import Document, DocumentType
from permissions import Permission
from .managers import WorkflowManager
from .permissions import permission_workflow_transition
logger = logging.getLogger(__name__)
@@ -166,11 +169,41 @@ class WorkflowInstance(models.Model):
except AttributeError:
return None
def get_transition_choices(self):
def get_transition_choices(self, _user=None):
current_state = self.get_current_state()
if current_state:
return current_state.origin_transitions.all()
queryset = current_state.origin_transitions.all()
if _user:
try:
Permission.check_permissions(
requester=_user, permissions=(
permission_workflow_transition,
)
)
except PermissionDenied:
try:
"""
Check for ACL access to the workflow, if true, allow
all transition options.
"""
AccessControlList.objects.check_access(
permissions=permission_workflow_transition,
user=_user, obj=self.workflow
)
except PermissionDenied:
"""
If not ACL access to the workflow, filter transition
options by each transition ACL access
"""
queryset = AccessControlList.objects.filter_by_access(
permission=permission_workflow_transition,
user=_user, queryset=queryset
)
return queryset
else:
"""
This happens when a workflow has no initial state and a document
@@ -209,5 +242,5 @@ class WorkflowInstanceLogEntry(models.Model):
verbose_name_plural = _('Workflow instance log entries')
def clean(self):
if self.transition not in self.workflow_instance.get_transition_choices():
if self.transition not in self.workflow_instance.get_transition_choices(_user=self.user):
raise ValidationError(_('Not a valid transition choice.'))

View File

@@ -1,5 +1,6 @@
from __future__ import unicode_literals
from django.core.exceptions import ValidationError as DjangoValidationError
from django.utils.translation import ugettext_lazy as _
from rest_framework import serializers
@@ -328,24 +329,6 @@ class WritableWorkflowInstanceLogEntrySerializer(serializers.ModelSerializer):
)
model = WorkflowInstanceLogEntry
def create(self, validated_data):
validated_data['transition'] = WorkflowTransition.objects.get(
pk=validated_data.pop('transition_pk')
)
validated_data['user'] = self.context['request'].user
validated_data['workflow_instance'] = self.context['workflow_instance']
if validated_data['transition'] not in validated_data['workflow_instance'].get_transition_choices():
raise ValidationError(
{
'transition_pk': _('Not a valid transition choice.')
}
)
return super(WritableWorkflowInstanceLogEntrySerializer, self).create(
validated_data
)
def get_document_workflow_url(self, instance):
return reverse(
'rest_api:workflowinstance-detail', args=(
@@ -353,3 +336,19 @@ class WritableWorkflowInstanceLogEntrySerializer(serializers.ModelSerializer):
instance.workflow_instance.pk,
), request=self.context['request'], format=self.context['format']
)
def validate(self, attrs):
attrs['user'] = self.context['request'].user
attrs['workflow_instance'] = self.context['workflow_instance']
attrs['transition'] = WorkflowTransition.objects.get(
pk=attrs.pop('transition_pk')
)
instance = WorkflowInstanceLogEntry(**attrs)
try:
instance.full_clean()
except DjangoValidationError as exception:
raise ValidationError(exception)
return attrs

View File

@@ -8,5 +8,6 @@ TEST_WORKFLOW_INSTANCE_LOG_ENTRY_COMMENT = 'test workflow instance log entry com
TEST_WORKFLOW_STATE_LABEL = 'test state label'
TEST_WORKFLOW_STATE_LABEL_EDITED = 'test state label edited'
TEST_WORKFLOW_STATE_COMPLETION = 66
TEST_WORKFLOW_TRANSITION_LABEL = 'test transtition label'
TEST_WORKFLOW_TRANSITION_LABEL_EDITED = 'test transtition label edited'
TEST_WORKFLOW_TRANSITION_LABEL = 'test transition label'
TEST_WORKFLOW_TRANSITION_LABEL_2 = 'test transition label 2'
TEST_WORKFLOW_TRANSITION_LABEL_EDITED = 'test transition label edited'

View File

@@ -1,20 +1,27 @@
from __future__ import unicode_literals
from __future__ import absolute_import, unicode_literals
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group
from django.core.urlresolvers import reverse
from django.test import override_settings
from rest_framework.test import APITestCase
from acls.models import AccessControlList
from documents.models import DocumentType
from documents.tests.literals import (
TEST_DOCUMENT_TYPE, TEST_SMALL_DOCUMENT_PATH
)
from user_management.tests.literals import (
TEST_ADMIN_EMAIL, TEST_ADMIN_PASSWORD, TEST_ADMIN_USERNAME
from permissions import Permission
from permissions.models import Role
from permissions.tests.literals import TEST_ROLE_LABEL
from user_management.tests import (
TEST_ADMIN_EMAIL, TEST_ADMIN_PASSWORD, TEST_ADMIN_USERNAME, TEST_GROUP,
TEST_USER_EMAIL, TEST_USER_USERNAME, TEST_USER_PASSWORD
)
from ..models import Workflow
from ..permissions import permission_workflow_transition
from .literals import (
TEST_WORKFLOW_LABEL, TEST_WORKFLOW_LABEL_EDITED,
@@ -624,3 +631,154 @@ class DocumentWorkflowsAPITestCase(APITestCase):
response.data['results'][0]['transition']['label'],
TEST_WORKFLOW_TRANSITION_LABEL
)
@override_settings(OCR_AUTO_OCR=False)
class DocumentWorkflowsTransitionACLsAPITestCase(APITestCase):
def setUp(self):
self.user = get_user_model().objects.create_user(
username=TEST_USER_USERNAME, email=TEST_USER_EMAIL,
password=TEST_USER_PASSWORD
)
self.client.login(
username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD
)
self.document_type = DocumentType.objects.create(
label=TEST_DOCUMENT_TYPE
)
self.group = Group.objects.create(name=TEST_GROUP)
self.role = Role.objects.create(label=TEST_ROLE_LABEL)
self.group.user_set.add(self.user)
self.role.groups.add(self.group)
Permission.invalidate_cache()
def tearDown(self):
if hasattr(self, 'document_type'):
self.document_type.delete()
def _create_document(self):
with open(TEST_SMALL_DOCUMENT_PATH) as file_object:
self.document = self.document_type.new_document(
file_object=file_object
)
def _create_workflow(self):
self.workflow = Workflow.objects.create(label=TEST_WORKFLOW_LABEL)
self.workflow.document_types.add(self.document_type)
def _create_workflow_states(self):
self._create_workflow()
self.workflow_state_1 = self.workflow.states.create(
completion=TEST_WORKFLOW_INITIAL_STATE_COMPLETION,
initial=True, label=TEST_WORKFLOW_INITIAL_STATE_LABEL
)
self.workflow_state_2 = self.workflow.states.create(
completion=TEST_WORKFLOW_STATE_COMPLETION,
label=TEST_WORKFLOW_STATE_LABEL
)
def _create_workflow_transition(self):
self._create_workflow_states()
self.workflow_transition = self.workflow.transitions.create(
label=TEST_WORKFLOW_TRANSITION_LABEL,
origin_state=self.workflow_state_1,
destination_state=self.workflow_state_2,
)
def test_workflow_transition_view_no_permission(self):
self._create_workflow_transition()
self._create_document()
workflow_instance = self.document.workflows.first()
self.client.post(
reverse(
'rest_api:workflowinstancelogentry-list', args=(
self.document.pk, workflow_instance.pk
),
), data={'transition_pk': self.workflow_transition.pk}
)
workflow_instance.refresh_from_db()
self.assertEqual(workflow_instance.log_entries.count(), 0)
def test_workflow_transition_view_with_permission(self):
self._create_workflow_transition()
self._create_document()
workflow_instance = self.document.workflows.first()
self.role.permissions.add(
permission_workflow_transition.stored_permission
)
self.client.post(
reverse(
'rest_api:workflowinstancelogentry-list', args=(
self.document.pk, workflow_instance.pk
),
), data={'transition_pk': self.workflow_transition.pk}
)
workflow_instance.refresh_from_db()
self.assertEqual(
workflow_instance.log_entries.first().transition.label,
TEST_WORKFLOW_TRANSITION_LABEL
)
def test_workflow_transition_view_with_workflow_acl(self):
self._create_workflow_transition()
self._create_document()
workflow_instance = self.document.workflows.first()
acl = AccessControlList.objects.create(
content_object=self.workflow, role=self.role
)
acl.permissions.add(permission_workflow_transition.stored_permission)
self.client.post(
reverse(
'rest_api:workflowinstancelogentry-list', args=(
self.document.pk, workflow_instance.pk
),
), data={'transition_pk': self.workflow_transition.pk}
)
workflow_instance.refresh_from_db()
self.assertEqual(
workflow_instance.log_entries.first().transition.label,
TEST_WORKFLOW_TRANSITION_LABEL
)
def test_workflow_transition_view_transition_acl(self):
self._create_workflow_transition()
self._create_document()
workflow_instance = self.document.workflows.first()
acl = AccessControlList.objects.create(
content_object=self.workflow_transition, role=self.role
)
acl.permissions.add(permission_workflow_transition.stored_permission)
self.client.post(
reverse(
'rest_api:workflowinstancelogentry-list', args=(
self.document.pk, workflow_instance.pk
),
), data={'transition_pk': self.workflow_transition.pk}
)
workflow_instance.refresh_from_db()
self.assertEqual(
workflow_instance.log_entries.first().transition.label,
TEST_WORKFLOW_TRANSITION_LABEL
)

View File

@@ -5,20 +5,24 @@ from django.core.urlresolvers import reverse
from django.test.client import Client
from django.test import TestCase
from acls.models import AccessControlList
from documents.models import DocumentType
from documents.tests.literals import (
TEST_DOCUMENT_TYPE, TEST_SMALL_DOCUMENT_PATH
)
from documents.tests.test_views import GenericDocumentViewTestCase
from user_management.tests import (
TEST_ADMIN_PASSWORD, TEST_ADMIN_USERNAME, TEST_ADMIN_EMAIL
)
from ..models import Workflow, WorkflowState, WorkflowTransition
from ..permissions import permission_workflow_transition
from .literals import (
TEST_WORKFLOW_LABEL, TEST_WORKFLOW_INITIAL_STATE_LABEL,
TEST_WORKFLOW_INITIAL_STATE_COMPLETION, TEST_WORKFLOW_STATE_LABEL,
TEST_WORKFLOW_STATE_COMPLETION, TEST_WORKFLOW_TRANSITION_LABEL
TEST_WORKFLOW_STATE_COMPLETION, TEST_WORKFLOW_TRANSITION_LABEL,
TEST_WORKFLOW_TRANSITION_LABEL_2
)
@@ -48,6 +52,26 @@ class DocumentStateViewTestCase(TestCase):
def tearDown(self):
self.document_type.delete()
def _create_workflow(self):
self.workflow = Workflow.objects.create(label=TEST_WORKFLOW_LABEL)
def _create_workflow_states(self):
self.workflow_initial_state = WorkflowState.objects.create(
workflow=self.workflow, label=TEST_WORKFLOW_INITIAL_STATE_LABEL,
completion=TEST_WORKFLOW_INITIAL_STATE_COMPLETION, initial=True
)
self.workflow_state = WorkflowState.objects.create(
workflow=self.workflow, label=TEST_WORKFLOW_STATE_LABEL,
completion=TEST_WORKFLOW_STATE_COMPLETION
)
def _create_workflow_transition(self):
self.workflow_transition = WorkflowTransition.objects.create(
workflow=self.workflow, label=TEST_WORKFLOW_TRANSITION_LABEL,
origin_state=self.workflow_initial_state,
destination_state=self.workflow_state
)
def test_creating_workflow(self):
response = self.client.post(
reverse(
@@ -63,14 +87,12 @@ class DocumentStateViewTestCase(TestCase):
self.assertEquals(Workflow.objects.all()[0].label, TEST_WORKFLOW_LABEL)
def test_delete_workflow(self):
workflow = Workflow.objects.create(label=TEST_WORKFLOW_LABEL)
self.assertEquals(Workflow.objects.count(), 1)
self.assertEquals(Workflow.objects.all()[0].label, TEST_WORKFLOW_LABEL)
self._create_workflow()
response = self.client.post(
reverse(
'document_states:setup_workflow_delete', args=(workflow.pk,)
'document_states:setup_workflow_delete',
args=(self.workflow.pk,)
), follow=True
)
@@ -79,12 +101,12 @@ class DocumentStateViewTestCase(TestCase):
self.assertEquals(Workflow.objects.count(), 0)
def test_create_workflow_state(self):
workflow = Workflow.objects.create(label=TEST_WORKFLOW_LABEL)
self._create_workflow()
response = self.client.post(
reverse(
'document_states:setup_workflow_state_create',
args=(workflow.pk,)
args=(self.workflow.pk,)
), data={
'label': TEST_WORKFLOW_STATE_LABEL,
'completion': TEST_WORKFLOW_STATE_COMPLETION,
@@ -103,43 +125,33 @@ class DocumentStateViewTestCase(TestCase):
)
def test_delete_workflow_state(self):
workflow = Workflow.objects.create(label=TEST_WORKFLOW_LABEL)
workflow_state = WorkflowState.objects.create(
workflow=workflow, label=TEST_WORKFLOW_STATE_LABEL,
completion=TEST_WORKFLOW_STATE_COMPLETION
)
self._create_workflow()
self._create_workflow_states()
response = self.client.post(
reverse(
'document_states:setup_workflow_state_delete',
args=(workflow_state.pk,)
args=(self.workflow_state.pk,)
), follow=True
)
self.assertEquals(response.status_code, 200)
self.assertEquals(WorkflowState.objects.count(), 0)
self.assertEquals(WorkflowState.objects.count(), 1)
self.assertEquals(Workflow.objects.count(), 1)
def test_create_workflow_transition(self):
workflow = Workflow.objects.create(label=TEST_WORKFLOW_LABEL)
workflow_initial_state = WorkflowState.objects.create(
workflow=workflow, label=TEST_WORKFLOW_INITIAL_STATE_LABEL,
completion=TEST_WORKFLOW_INITIAL_STATE_COMPLETION, initial=True
)
workflow_state = WorkflowState.objects.create(
workflow=workflow, label=TEST_WORKFLOW_STATE_LABEL,
completion=TEST_WORKFLOW_STATE_COMPLETION
)
self._create_workflow()
self._create_workflow_states()
response = self.client.post(
reverse(
'document_states:setup_workflow_transition_create',
args=(workflow.pk,)
args=(self.workflow.pk,)
), data={
'label': TEST_WORKFLOW_TRANSITION_LABEL,
'origin_state': workflow_initial_state.pk,
'destination_state': workflow_state.pk,
'origin_state': self.workflow_initial_state.pk,
'destination_state': self.workflow_state.pk,
}, follow=True
)
@@ -152,35 +164,22 @@ class DocumentStateViewTestCase(TestCase):
)
self.assertEquals(
WorkflowTransition.objects.all()[0].origin_state,
workflow_initial_state
self.workflow_initial_state
)
self.assertEquals(
WorkflowTransition.objects.all()[0].destination_state,
workflow_state
self.workflow_state
)
def test_delete_workflow_transition(self):
workflow = Workflow.objects.create(label=TEST_WORKFLOW_LABEL)
workflow_initial_state = WorkflowState.objects.create(
workflow=workflow, label=TEST_WORKFLOW_INITIAL_STATE_LABEL,
completion=TEST_WORKFLOW_INITIAL_STATE_COMPLETION, initial=True
)
workflow_state = WorkflowState.objects.create(
workflow=workflow, label=TEST_WORKFLOW_STATE_LABEL,
completion=TEST_WORKFLOW_STATE_COMPLETION
)
workflow_transition = WorkflowTransition.objects.create(
workflow=workflow, label=TEST_WORKFLOW_TRANSITION_LABEL,
origin_state=workflow_initial_state,
destination_state=workflow_state
)
self.assertEquals(WorkflowTransition.objects.count(), 1)
self._create_workflow()
self._create_workflow_states()
self._create_workflow_transition()
response = self.client.post(
reverse(
'document_states:setup_workflow_transition_delete',
args=(workflow_transition.pk,)
args=(self.workflow_transition.pk,)
), follow=True
)
@@ -189,3 +188,152 @@ class DocumentStateViewTestCase(TestCase):
self.assertEquals(WorkflowState.objects.count(), 2)
self.assertEquals(Workflow.objects.count(), 1)
self.assertEquals(WorkflowTransition.objects.count(), 0)
class DocumentStateTransitionViewTestCase(GenericDocumentViewTestCase):
def _create_workflow(self):
self.workflow = Workflow.objects.create(label=TEST_WORKFLOW_LABEL)
self.workflow.document_types.add(self.document_type)
def _create_workflow_states(self):
self.workflow_initial_state = WorkflowState.objects.create(
workflow=self.workflow, label=TEST_WORKFLOW_INITIAL_STATE_LABEL,
completion=TEST_WORKFLOW_INITIAL_STATE_COMPLETION, initial=True
)
self.workflow_state = WorkflowState.objects.create(
workflow=self.workflow, label=TEST_WORKFLOW_STATE_LABEL,
completion=TEST_WORKFLOW_STATE_COMPLETION
)
def _create_workflow_transitions(self):
self.workflow_transition = WorkflowTransition.objects.create(
workflow=self.workflow, label=TEST_WORKFLOW_TRANSITION_LABEL,
origin_state=self.workflow_initial_state,
destination_state=self.workflow_state
)
self.workflow_transition_2 = WorkflowTransition.objects.create(
workflow=self.workflow, label=TEST_WORKFLOW_TRANSITION_LABEL_2,
origin_state=self.workflow_initial_state,
destination_state=self.workflow_state
)
def _create_document(self):
with open(TEST_SMALL_DOCUMENT_PATH) as file_object:
self.document_2 = self.document_type.new_document(
file_object=file_object
)
def _request_workflow_transition(self, workflow_instance):
return self.post(
'document_states:workflow_instance_transition',
args=(workflow_instance.pk,), data={
'transition': self.workflow_transition.pk,
}
)
def test_transition_workflow_no_permission(self):
self.login_user()
self._create_workflow()
self._create_workflow_states()
self._create_workflow_transitions()
self._create_document()
workflow_instance = self.document_2.workflows.first()
response = self._request_workflow_transition(
workflow_instance=workflow_instance
)
self.assertEqual(response.status_code, 200)
# Workflow should remain in the same initial state
self.assertEqual(
workflow_instance.get_current_state(), self.workflow_initial_state
)
def test_transition_workflow_with_permission(self):
"""
Test transitioning a workflow by granting the transition workflow
permission to the role.
"""
self.login_user()
self._create_workflow()
self._create_workflow_states()
self._create_workflow_transitions()
self._create_document()
workflow_instance = self.document_2.workflows.first()
self.grant(permission_workflow_transition)
response = self._request_workflow_transition(
workflow_instance=workflow_instance
)
self.assertEqual(response.status_code, 302)
# Workflow should remain in the same initial state
self.assertEqual(
workflow_instance.get_current_state(), self.workflow_state
)
def test_transition_workflow_with_workflow_acl(self):
"""
Test transitioning a workflow by granting the transition workflow
permission to the workflow itself via ACL.
"""
self.login_user()
self._create_workflow()
self._create_workflow_states()
self._create_workflow_transitions()
self._create_document()
workflow_instance = self.document_2.workflows.first()
acl = AccessControlList.objects.create(
content_object=self.workflow, role=self.role
)
acl.permissions.add(permission_workflow_transition.stored_permission)
response = self._request_workflow_transition(
workflow_instance=workflow_instance
)
self.assertEqual(response.status_code, 302)
# Workflow should remain in the same initial state
self.assertEqual(
workflow_instance.get_current_state(), self.workflow_state
)
def test_transition_workflow_with_transition_acl(self):
"""
Test transitioning a workflow by granting the transition workflow
permission to the transition via ACL.
"""
self.login_user()
self._create_workflow()
self._create_workflow_states()
self._create_workflow_transitions()
self._create_document()
workflow_instance = self.document_2.workflows.first()
acl = AccessControlList.objects.create(
content_object=self.workflow_transition, role=self.role
)
acl.permissions.add(permission_workflow_transition.stored_permission)
response = self._request_workflow_transition(
workflow_instance=workflow_instance
)
self.assertEqual(response.status_code, 302)
# Workflow should remain in the same initial state
self.assertEqual(
workflow_instance.get_current_state(), self.workflow_state
)

View File

@@ -7,12 +7,11 @@ from django.db.utils import IntegrityError
from django.http import HttpResponseRedirect
from django.shortcuts import get_object_or_404
from django.utils.translation import ugettext_lazy as _
from django.views.generic import FormView
from acls.models import AccessControlList
from common.views import (
AssignRemoveView, SingleObjectCreateView, SingleObjectDeleteView,
SingleObjectEditView, SingleObjectListView
AssignRemoveView, FormView, SingleObjectCreateView,
SingleObjectDeleteView, SingleObjectEditView, SingleObjectListView
)
from documents.models import Document
from documents.views import DocumentListView
@@ -25,8 +24,7 @@ from .forms import (
from .models import Workflow, WorkflowInstance, WorkflowState, WorkflowTransition
from .permissions import (
permission_workflow_create, permission_workflow_delete,
permission_workflow_edit, permission_workflow_transition,
permission_workflow_view,
permission_workflow_edit, permission_workflow_view,
)
@@ -130,28 +128,10 @@ class WorkflowInstanceTransitionView(FormView):
form_class = WorkflowInstanceTransitionForm
template_name = 'appearance/generic_form.html'
def dispatch(self, request, *args, **kwargs):
try:
Permission.check_permissions(
request.user, (permission_workflow_transition,)
)
except PermissionDenied:
AccessControlList.objects.check_access(
permission_workflow_transition, request.user,
self.get_workflow_instance().document
)
return super(
WorkflowInstanceTransitionView, self
).dispatch(request, *args, **kwargs)
def form_valid(self, form):
transition = self.get_workflow_instance().workflow.transitions.get(
pk=form.cleaned_data['transition']
)
self.get_workflow_instance().do_transition(
comment=form.cleaned_data['comment'], transition=transition,
user=self.request.user
comment=form.cleaned_data['comment'],
transition=form.cleaned_data['transition'], user=self.request.user
)
return HttpResponseRedirect(self.get_success_url())
@@ -166,10 +146,11 @@ class WorkflowInstanceTransitionView(FormView):
'workflow_instance': self.get_workflow_instance(),
}
def get_form_kwargs(self):
kwargs = super(WorkflowInstanceTransitionView, self).get_form_kwargs()
kwargs['workflow'] = self.get_workflow_instance()
return kwargs
def get_form_extra_kwargs(self):
return {
'user': self.request.user,
'workflow_instance': self.get_workflow_instance()
}
def get_success_url(self):
return self.get_workflow_instance().get_absolute_url()

View File

@@ -61,8 +61,9 @@ class Permission(object):
if permission.stored_permission.requester_has_this(requester):
return True
logger.debug('no permission')
logger.debug('User "%s" does not have permissions "%s"',
requester,
permissions)
raise PermissionDenied(_('Insufficient permissions.'))
@classmethod

View File

@@ -46,17 +46,25 @@ class StoredPermission(models.Model):
verbose_name_plural = _('Permissions')
def requester_has_this(self, user):
logger.debug('user: %s', user)
if user.is_superuser or user.is_staff:
logger.debug('Permission "%s" granted to user "%s" as superuser or staff',
self,
user)
return True
# Request is one of the permission's holders?
for group in user.groups.all():
for role in group.roles.all():
if self in role.permissions.all():
logger.debug('Permission "%s" granted to user "%s" through role "%s"',
self,
user,
role)
return True
logger.debug('Fallthru')
logger.debug('Fallthru: Permission "%s" not granted to user "%s"',
self,
user)
return False