diff --git a/mayan/apps/acls/managers.py b/mayan/apps/acls/managers.py index e9bbb88b5b..20c00391e9 100644 --- a/mayan/apps/acls/managers.py +++ b/mayan/apps/acls/managers.py @@ -48,6 +48,10 @@ class AccessControlListManager(models.Manager): def check_access(self, permissions, user, obj, related=None): if user.is_superuser or user.is_staff: + logger.debug('Permissions "%s" on "%s" granted to user "%s" as superuser or staff', + permissions, + obj, + user) return True try: @@ -77,15 +81,31 @@ class AccessControlListManager(models.Manager): for group in user.groups.all(): for role in group.roles.all(): if set(stored_permissions).intersection(set(self.get_inherited_permissions(role=role, obj=obj))): + logger.debug('Permissions "%s" on "%s" granted to user "%s" through role "%s" via inherited ACL', + permissions, + obj, + user, + role) return True user_roles.append(role) if not self.filter(content_type=ContentType.objects.get_for_model(obj), object_id=obj.pk, permissions__in=stored_permissions, role__in=user_roles).exists(): + logger.debug('Permissions "%s" on "%s" denied for user "%s"', + permissions, + obj, + user) raise PermissionDenied(ugettext('Insufficient access.')) + logger.debug('Permissions "%s" on "%s" granted to user "%s" through roles "%s" by direct ACL', + permissions, + obj, + user, + user_roles) def filter_by_access(self, permission, user, queryset): if user.is_superuser or user.is_staff: + logger.debug('Unfiltered queryset returned to user "%s" as superuser or staff', + user) return queryset user_roles = [] @@ -124,5 +144,8 @@ class AccessControlListManager(models.Manager): content_type=content_type, role__in=user_roles, permissions=permission.stored_permission ).values_list('object_id', flat=True)) + logger.debug('Filtered queryset returned to user "%s" based on roles "%s"', + user, + user_roles) return queryset.filter(parent_acl_query | acl_query) diff --git a/mayan/apps/common/generics.py b/mayan/apps/common/generics.py index d79cfb145d..4efc317922 100644 --- a/mayan/apps/common/generics.py +++ b/mayan/apps/common/generics.py @@ -177,7 +177,7 @@ class ConfirmView(ObjectListPermissionFilterMixin, ObjectPermissionCheckMixin, V return HttpResponseRedirect(self.get_success_url()) -class FormView(ViewPermissionCheckMixin, ExtraContextMixin, RedirectionMixin, DjangoFormView): +class FormView(FormExtraKwargsMixin, ViewPermissionCheckMixin, ExtraContextMixin, RedirectionMixin, DjangoFormView): template_name = 'appearance/generic_form.html' diff --git a/mayan/apps/common/mixins.py b/mayan/apps/common/mixins.py index 34b89ff736..aefabb9d57 100644 --- a/mayan/apps/common/mixins.py +++ b/mayan/apps/common/mixins.py @@ -12,8 +12,8 @@ from permissions import Permission __all__ = ( 'DeleteExtraDataMixin', 'ExtraContextMixin', - 'ObjectListPermissionFilterMixin', 'ObjectNameMixin', - 'ObjectPermissionCheckMixin', 'RedirectionMixin', + 'FormExtraKwargsMixin', 'ObjectListPermissionFilterMixin', + 'ObjectNameMixin', 'ObjectPermissionCheckMixin', 'RedirectionMixin', 'ViewPermissionCheckMixin' ) @@ -42,6 +42,22 @@ class ExtraContextMixin(object): return context +class FormExtraKwargsMixin(object): + """ + Mixin that allows a view to pass extra keyword arguments to forms + """ + + form_extra_kwargs = {} + + def get_form_extra_kwargs(self): + return self.form_extra_kwargs + + def get_form_kwargs(self): + result = super(FormExtraKwargsMixin, self).get_form_kwargs() + result.update(self.get_form_extra_kwargs()) + return result + + class MultipleInstanceActionMixin(object): model = None success_message = 'Operation performed on %(count)d object' diff --git a/mayan/apps/common/tests/test_views.py b/mayan/apps/common/tests/test_views.py index 6fa95a8721..ec56a1b9ea 100644 --- a/mayan/apps/common/tests/test_views.py +++ b/mayan/apps/common/tests/test_views.py @@ -76,6 +76,11 @@ class GenericViewTestCase(BaseTestCase): data=data, follow=follow ) + def grant(self, permission): + self.role.permissions.add( + permission.stored_permission + ) + def login(self, username, password): logged_in = self.client.login(username=username, password=password) @@ -84,6 +89,15 @@ class GenericViewTestCase(BaseTestCase): self.assertTrue(logged_in) self.assertTrue(user.is_authenticated()) + def login_user(self): + self.login(username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD) + + def login_admin_user(self): + self.login(username=TEST_ADMIN_USERNAME, password=TEST_ADMIN_PASSWORD) + + def logout(self): + self.client.logout() + def post(self, viewname, *args, **kwargs): data = kwargs.pop('data', {}) follow = kwargs.pop('follow', False) diff --git a/mayan/apps/document_states/api_views.py b/mayan/apps/document_states/api_views.py index 08a1e55d43..ea99a0eb74 100644 --- a/mayan/apps/document_states/api_views.py +++ b/mayan/apps/document_states/api_views.py @@ -15,8 +15,7 @@ from rest_api.permissions import MayanPermission from .models import Workflow from .permissions import ( permission_workflow_create, permission_workflow_delete, - permission_workflow_edit, permission_workflow_transition, - permission_workflow_view + permission_workflow_edit, permission_workflow_view ) from .serializers import ( NewWorkflowDocumentTypeSerializer, WorkflowDocumentTypeSerializer, @@ -590,21 +589,27 @@ class APIWorkflowInstanceLogEntryListView(generics.ListCreateAPIView): ) def get_document(self): - if self.request.method == 'GET': - permission_required = permission_workflow_view - else: - permission_required = permission_workflow_transition - document = get_object_or_404(Document, pk=self.kwargs['pk']) - try: - Permission.check_permissions( - self.request.user, (permission_required,) - ) - except PermissionDenied: - AccessControlList.objects.check_access( - permission_required, self.request.user, document - ) + if self.request.method == 'GET': + """ + Only test for permission if reading. If writing, the permission + will be checked in the serializer + + IMPROVEMENT: + When writing, add check for permission or ACL for the workflow. + Failing that, check for ACLs for any of the workflow's transitions. + Failing that, then raise PermissionDenied + """ + + try: + Permission.check_permissions( + self.request.user, (permission_workflow_view,) + ) + except PermissionDenied: + AccessControlList.objects.check_access( + permission_workflow_view, self.request.user, document + ) return document diff --git a/mayan/apps/document_states/apps.py b/mayan/apps/document_states/apps.py index 706d7435d3..76ffb2673d 100644 --- a/mayan/apps/document_states/apps.py +++ b/mayan/apps/document_states/apps.py @@ -4,6 +4,8 @@ from django.apps import apps from django.db.models.signals import post_save from django.utils.translation import ugettext_lazy as _ +from acls import ModelPermission +from acls.links import link_acl_list from common import ( MayanAppConfig, menu_facet, menu_object, menu_secondary, menu_setup, menu_sidebar @@ -23,6 +25,7 @@ from .links import ( link_setup_workflow_transition_delete, link_setup_workflow_transition_edit, link_workflow_instance_detail, link_workflow_instance_transition ) +from .permissions import permission_workflow_transition class DocumentStatesApp(MayanAppConfig): @@ -46,6 +49,15 @@ class DocumentStatesApp(MayanAppConfig): WorkflowState = self.get_model('WorkflowState') WorkflowTransition = self.get_model('WorkflowTransition') + ModelPermission.register( + model=Workflow, permissions=(permission_workflow_transition,) + ) + + ModelPermission.register( + model=WorkflowTransition, + permissions=(permission_workflow_transition,) + ) + SourceColumn( source=Workflow, label=_('Initial state'), func=lambda context: context['object'].get_initial_state() or _('None') @@ -118,7 +130,7 @@ class DocumentStatesApp(MayanAppConfig): links=( link_setup_workflow_states, link_setup_workflow_transitions, link_setup_workflow_document_types, link_setup_workflow_edit, - link_setup_workflow_delete + link_acl_list, link_setup_workflow_delete ), sources=(Workflow,) ) menu_object.bind_links( @@ -129,7 +141,7 @@ class DocumentStatesApp(MayanAppConfig): ) menu_object.bind_links( links=( - link_setup_workflow_transition_edit, + link_setup_workflow_transition_edit, link_acl_list, link_setup_workflow_transition_delete ), sources=(WorkflowTransition,) ) diff --git a/mayan/apps/document_states/forms.py b/mayan/apps/document_states/forms.py index 98ead3a160..df288b255b 100644 --- a/mayan/apps/document_states/forms.py +++ b/mayan/apps/document_states/forms.py @@ -1,4 +1,4 @@ -from __future__ import unicode_literals +from __future__ import absolute_import, unicode_literals from django import forms from django.utils.translation import ugettext_lazy as _ @@ -32,11 +32,16 @@ class WorkflowTransitionForm(forms.ModelForm): class WorkflowInstanceTransitionForm(forms.Form): def __init__(self, *args, **kwargs): - workflow = kwargs.pop('workflow') + user = kwargs.pop('user') + workflow_instance = kwargs.pop('workflow_instance') super(WorkflowInstanceTransitionForm, self).__init__(*args, **kwargs) - self.fields['transition'].choices = workflow.get_transition_choices().values_list('pk', 'label') + self.fields[ + 'transition' + ].queryset = workflow_instance.get_transition_choices(_user=user) - transition = forms.ChoiceField(label=_('Transition')) + transition = forms.ModelChoiceField( + label=_('Transition'), queryset=WorkflowTransition.objects.none() + ) comment = forms.CharField( label=_('Comment'), required=False, widget=forms.widgets.Textarea() ) diff --git a/mayan/apps/document_states/links.py b/mayan/apps/document_states/links.py index f9a0e3bba3..c0ea98a89b 100644 --- a/mayan/apps/document_states/links.py +++ b/mayan/apps/document_states/links.py @@ -6,8 +6,7 @@ from navigation import Link from .permissions import ( permission_workflow_create, permission_workflow_delete, - permission_workflow_edit, permission_workflow_transition, - permission_workflow_view, + permission_workflow_edit, permission_workflow_view, ) link_document_workflow_instance_list = Link( @@ -76,7 +75,7 @@ link_workflow_instance_detail = Link( view='document_states:workflow_instance_detail', args='resolved_object.pk' ) link_workflow_instance_transition = Link( - permissions=(permission_workflow_transition,), text=_('Transition'), + text=_('Transition'), view='document_states:workflow_instance_transition', args='resolved_object.pk' ) diff --git a/mayan/apps/document_states/models.py b/mayan/apps/document_states/models.py index 203b29574f..6963d07751 100644 --- a/mayan/apps/document_states/models.py +++ b/mayan/apps/document_states/models.py @@ -1,17 +1,20 @@ -from __future__ import unicode_literals +from __future__ import absolute_import, unicode_literals import logging from django.conf import settings -from django.core.exceptions import ValidationError +from django.core.exceptions import PermissionDenied, ValidationError from django.core.urlresolvers import reverse from django.db import IntegrityError, models from django.utils.encoding import python_2_unicode_compatible from django.utils.translation import ugettext_lazy as _ +from acls.models import AccessControlList from documents.models import Document, DocumentType +from permissions import Permission from .managers import WorkflowManager +from .permissions import permission_workflow_transition logger = logging.getLogger(__name__) @@ -166,11 +169,41 @@ class WorkflowInstance(models.Model): except AttributeError: return None - def get_transition_choices(self): + def get_transition_choices(self, _user=None): current_state = self.get_current_state() if current_state: - return current_state.origin_transitions.all() + queryset = current_state.origin_transitions.all() + + if _user: + try: + Permission.check_permissions( + requester=_user, permissions=( + permission_workflow_transition, + ) + ) + except PermissionDenied: + try: + """ + Check for ACL access to the workflow, if true, allow + all transition options. + """ + + AccessControlList.objects.check_access( + permissions=permission_workflow_transition, + user=_user, obj=self.workflow + ) + except PermissionDenied: + """ + If not ACL access to the workflow, filter transition + options by each transition ACL access + """ + + queryset = AccessControlList.objects.filter_by_access( + permission=permission_workflow_transition, + user=_user, queryset=queryset + ) + return queryset else: """ This happens when a workflow has no initial state and a document @@ -209,5 +242,5 @@ class WorkflowInstanceLogEntry(models.Model): verbose_name_plural = _('Workflow instance log entries') def clean(self): - if self.transition not in self.workflow_instance.get_transition_choices(): + if self.transition not in self.workflow_instance.get_transition_choices(_user=self.user): raise ValidationError(_('Not a valid transition choice.')) diff --git a/mayan/apps/document_states/serializers.py b/mayan/apps/document_states/serializers.py index 4193a46104..3a0daf4ab2 100644 --- a/mayan/apps/document_states/serializers.py +++ b/mayan/apps/document_states/serializers.py @@ -1,5 +1,6 @@ from __future__ import unicode_literals +from django.core.exceptions import ValidationError as DjangoValidationError from django.utils.translation import ugettext_lazy as _ from rest_framework import serializers @@ -328,24 +329,6 @@ class WritableWorkflowInstanceLogEntrySerializer(serializers.ModelSerializer): ) model = WorkflowInstanceLogEntry - def create(self, validated_data): - validated_data['transition'] = WorkflowTransition.objects.get( - pk=validated_data.pop('transition_pk') - ) - validated_data['user'] = self.context['request'].user - validated_data['workflow_instance'] = self.context['workflow_instance'] - - if validated_data['transition'] not in validated_data['workflow_instance'].get_transition_choices(): - raise ValidationError( - { - 'transition_pk': _('Not a valid transition choice.') - } - ) - - return super(WritableWorkflowInstanceLogEntrySerializer, self).create( - validated_data - ) - def get_document_workflow_url(self, instance): return reverse( 'rest_api:workflowinstance-detail', args=( @@ -353,3 +336,19 @@ class WritableWorkflowInstanceLogEntrySerializer(serializers.ModelSerializer): instance.workflow_instance.pk, ), request=self.context['request'], format=self.context['format'] ) + + def validate(self, attrs): + attrs['user'] = self.context['request'].user + attrs['workflow_instance'] = self.context['workflow_instance'] + attrs['transition'] = WorkflowTransition.objects.get( + pk=attrs.pop('transition_pk') + ) + + instance = WorkflowInstanceLogEntry(**attrs) + + try: + instance.full_clean() + except DjangoValidationError as exception: + raise ValidationError(exception) + + return attrs diff --git a/mayan/apps/document_states/tests/literals.py b/mayan/apps/document_states/tests/literals.py index fac41db489..004d17de8d 100644 --- a/mayan/apps/document_states/tests/literals.py +++ b/mayan/apps/document_states/tests/literals.py @@ -8,5 +8,6 @@ TEST_WORKFLOW_INSTANCE_LOG_ENTRY_COMMENT = 'test workflow instance log entry com TEST_WORKFLOW_STATE_LABEL = 'test state label' TEST_WORKFLOW_STATE_LABEL_EDITED = 'test state label edited' TEST_WORKFLOW_STATE_COMPLETION = 66 -TEST_WORKFLOW_TRANSITION_LABEL = 'test transtition label' -TEST_WORKFLOW_TRANSITION_LABEL_EDITED = 'test transtition label edited' +TEST_WORKFLOW_TRANSITION_LABEL = 'test transition label' +TEST_WORKFLOW_TRANSITION_LABEL_2 = 'test transition label 2' +TEST_WORKFLOW_TRANSITION_LABEL_EDITED = 'test transition label edited' diff --git a/mayan/apps/document_states/tests/test_api.py b/mayan/apps/document_states/tests/test_api.py index 9ee947abe2..ffc9366894 100644 --- a/mayan/apps/document_states/tests/test_api.py +++ b/mayan/apps/document_states/tests/test_api.py @@ -1,20 +1,27 @@ -from __future__ import unicode_literals +from __future__ import absolute_import, unicode_literals from django.contrib.auth import get_user_model +from django.contrib.auth.models import Group from django.core.urlresolvers import reverse from django.test import override_settings from rest_framework.test import APITestCase +from acls.models import AccessControlList from documents.models import DocumentType from documents.tests.literals import ( TEST_DOCUMENT_TYPE, TEST_SMALL_DOCUMENT_PATH ) -from user_management.tests.literals import ( - TEST_ADMIN_EMAIL, TEST_ADMIN_PASSWORD, TEST_ADMIN_USERNAME +from permissions import Permission +from permissions.models import Role +from permissions.tests.literals import TEST_ROLE_LABEL +from user_management.tests import ( + TEST_ADMIN_EMAIL, TEST_ADMIN_PASSWORD, TEST_ADMIN_USERNAME, TEST_GROUP, + TEST_USER_EMAIL, TEST_USER_USERNAME, TEST_USER_PASSWORD ) from ..models import Workflow +from ..permissions import permission_workflow_transition from .literals import ( TEST_WORKFLOW_LABEL, TEST_WORKFLOW_LABEL_EDITED, @@ -624,3 +631,154 @@ class DocumentWorkflowsAPITestCase(APITestCase): response.data['results'][0]['transition']['label'], TEST_WORKFLOW_TRANSITION_LABEL ) + + +@override_settings(OCR_AUTO_OCR=False) +class DocumentWorkflowsTransitionACLsAPITestCase(APITestCase): + def setUp(self): + self.user = get_user_model().objects.create_user( + username=TEST_USER_USERNAME, email=TEST_USER_EMAIL, + password=TEST_USER_PASSWORD + ) + + self.client.login( + username=TEST_USER_USERNAME, password=TEST_USER_PASSWORD + ) + + self.document_type = DocumentType.objects.create( + label=TEST_DOCUMENT_TYPE + ) + + self.group = Group.objects.create(name=TEST_GROUP) + self.role = Role.objects.create(label=TEST_ROLE_LABEL) + self.group.user_set.add(self.user) + self.role.groups.add(self.group) + Permission.invalidate_cache() + + def tearDown(self): + if hasattr(self, 'document_type'): + self.document_type.delete() + + def _create_document(self): + with open(TEST_SMALL_DOCUMENT_PATH) as file_object: + self.document = self.document_type.new_document( + file_object=file_object + ) + + def _create_workflow(self): + self.workflow = Workflow.objects.create(label=TEST_WORKFLOW_LABEL) + self.workflow.document_types.add(self.document_type) + + def _create_workflow_states(self): + self._create_workflow() + self.workflow_state_1 = self.workflow.states.create( + completion=TEST_WORKFLOW_INITIAL_STATE_COMPLETION, + initial=True, label=TEST_WORKFLOW_INITIAL_STATE_LABEL + ) + self.workflow_state_2 = self.workflow.states.create( + completion=TEST_WORKFLOW_STATE_COMPLETION, + label=TEST_WORKFLOW_STATE_LABEL + ) + + def _create_workflow_transition(self): + self._create_workflow_states() + self.workflow_transition = self.workflow.transitions.create( + label=TEST_WORKFLOW_TRANSITION_LABEL, + origin_state=self.workflow_state_1, + destination_state=self.workflow_state_2, + ) + + def test_workflow_transition_view_no_permission(self): + self._create_workflow_transition() + self._create_document() + + workflow_instance = self.document.workflows.first() + + self.client.post( + reverse( + 'rest_api:workflowinstancelogentry-list', args=( + self.document.pk, workflow_instance.pk + ), + ), data={'transition_pk': self.workflow_transition.pk} + ) + + workflow_instance.refresh_from_db() + + self.assertEqual(workflow_instance.log_entries.count(), 0) + + def test_workflow_transition_view_with_permission(self): + self._create_workflow_transition() + self._create_document() + + workflow_instance = self.document.workflows.first() + + self.role.permissions.add( + permission_workflow_transition.stored_permission + ) + + self.client.post( + reverse( + 'rest_api:workflowinstancelogentry-list', args=( + self.document.pk, workflow_instance.pk + ), + ), data={'transition_pk': self.workflow_transition.pk} + ) + + workflow_instance.refresh_from_db() + + self.assertEqual( + workflow_instance.log_entries.first().transition.label, + TEST_WORKFLOW_TRANSITION_LABEL + ) + + def test_workflow_transition_view_with_workflow_acl(self): + self._create_workflow_transition() + self._create_document() + + workflow_instance = self.document.workflows.first() + + acl = AccessControlList.objects.create( + content_object=self.workflow, role=self.role + ) + acl.permissions.add(permission_workflow_transition.stored_permission) + + self.client.post( + reverse( + 'rest_api:workflowinstancelogentry-list', args=( + self.document.pk, workflow_instance.pk + ), + ), data={'transition_pk': self.workflow_transition.pk} + ) + + workflow_instance.refresh_from_db() + + self.assertEqual( + workflow_instance.log_entries.first().transition.label, + TEST_WORKFLOW_TRANSITION_LABEL + ) + + def test_workflow_transition_view_transition_acl(self): + self._create_workflow_transition() + self._create_document() + + workflow_instance = self.document.workflows.first() + + acl = AccessControlList.objects.create( + content_object=self.workflow_transition, role=self.role + ) + acl.permissions.add(permission_workflow_transition.stored_permission) + + self.client.post( + reverse( + 'rest_api:workflowinstancelogentry-list', args=( + self.document.pk, workflow_instance.pk + ), + ), data={'transition_pk': self.workflow_transition.pk} + ) + + workflow_instance.refresh_from_db() + + self.assertEqual( + workflow_instance.log_entries.first().transition.label, + TEST_WORKFLOW_TRANSITION_LABEL + ) diff --git a/mayan/apps/document_states/tests/test_views.py b/mayan/apps/document_states/tests/test_views.py index 028818247d..5f37b0bacf 100644 --- a/mayan/apps/document_states/tests/test_views.py +++ b/mayan/apps/document_states/tests/test_views.py @@ -5,20 +5,24 @@ from django.core.urlresolvers import reverse from django.test.client import Client from django.test import TestCase +from acls.models import AccessControlList from documents.models import DocumentType from documents.tests.literals import ( TEST_DOCUMENT_TYPE, TEST_SMALL_DOCUMENT_PATH ) +from documents.tests.test_views import GenericDocumentViewTestCase from user_management.tests import ( TEST_ADMIN_PASSWORD, TEST_ADMIN_USERNAME, TEST_ADMIN_EMAIL ) from ..models import Workflow, WorkflowState, WorkflowTransition +from ..permissions import permission_workflow_transition from .literals import ( TEST_WORKFLOW_LABEL, TEST_WORKFLOW_INITIAL_STATE_LABEL, TEST_WORKFLOW_INITIAL_STATE_COMPLETION, TEST_WORKFLOW_STATE_LABEL, - TEST_WORKFLOW_STATE_COMPLETION, TEST_WORKFLOW_TRANSITION_LABEL + TEST_WORKFLOW_STATE_COMPLETION, TEST_WORKFLOW_TRANSITION_LABEL, + TEST_WORKFLOW_TRANSITION_LABEL_2 ) @@ -48,6 +52,26 @@ class DocumentStateViewTestCase(TestCase): def tearDown(self): self.document_type.delete() + def _create_workflow(self): + self.workflow = Workflow.objects.create(label=TEST_WORKFLOW_LABEL) + + def _create_workflow_states(self): + self.workflow_initial_state = WorkflowState.objects.create( + workflow=self.workflow, label=TEST_WORKFLOW_INITIAL_STATE_LABEL, + completion=TEST_WORKFLOW_INITIAL_STATE_COMPLETION, initial=True + ) + self.workflow_state = WorkflowState.objects.create( + workflow=self.workflow, label=TEST_WORKFLOW_STATE_LABEL, + completion=TEST_WORKFLOW_STATE_COMPLETION + ) + + def _create_workflow_transition(self): + self.workflow_transition = WorkflowTransition.objects.create( + workflow=self.workflow, label=TEST_WORKFLOW_TRANSITION_LABEL, + origin_state=self.workflow_initial_state, + destination_state=self.workflow_state + ) + def test_creating_workflow(self): response = self.client.post( reverse( @@ -63,14 +87,12 @@ class DocumentStateViewTestCase(TestCase): self.assertEquals(Workflow.objects.all()[0].label, TEST_WORKFLOW_LABEL) def test_delete_workflow(self): - workflow = Workflow.objects.create(label=TEST_WORKFLOW_LABEL) - - self.assertEquals(Workflow.objects.count(), 1) - self.assertEquals(Workflow.objects.all()[0].label, TEST_WORKFLOW_LABEL) + self._create_workflow() response = self.client.post( reverse( - 'document_states:setup_workflow_delete', args=(workflow.pk,) + 'document_states:setup_workflow_delete', + args=(self.workflow.pk,) ), follow=True ) @@ -79,12 +101,12 @@ class DocumentStateViewTestCase(TestCase): self.assertEquals(Workflow.objects.count(), 0) def test_create_workflow_state(self): - workflow = Workflow.objects.create(label=TEST_WORKFLOW_LABEL) + self._create_workflow() response = self.client.post( reverse( 'document_states:setup_workflow_state_create', - args=(workflow.pk,) + args=(self.workflow.pk,) ), data={ 'label': TEST_WORKFLOW_STATE_LABEL, 'completion': TEST_WORKFLOW_STATE_COMPLETION, @@ -103,43 +125,33 @@ class DocumentStateViewTestCase(TestCase): ) def test_delete_workflow_state(self): - workflow = Workflow.objects.create(label=TEST_WORKFLOW_LABEL) - workflow_state = WorkflowState.objects.create( - workflow=workflow, label=TEST_WORKFLOW_STATE_LABEL, - completion=TEST_WORKFLOW_STATE_COMPLETION - ) + self._create_workflow() + self._create_workflow_states() response = self.client.post( reverse( 'document_states:setup_workflow_state_delete', - args=(workflow_state.pk,) + args=(self.workflow_state.pk,) ), follow=True ) self.assertEquals(response.status_code, 200) - self.assertEquals(WorkflowState.objects.count(), 0) + self.assertEquals(WorkflowState.objects.count(), 1) self.assertEquals(Workflow.objects.count(), 1) def test_create_workflow_transition(self): - workflow = Workflow.objects.create(label=TEST_WORKFLOW_LABEL) - workflow_initial_state = WorkflowState.objects.create( - workflow=workflow, label=TEST_WORKFLOW_INITIAL_STATE_LABEL, - completion=TEST_WORKFLOW_INITIAL_STATE_COMPLETION, initial=True - ) - workflow_state = WorkflowState.objects.create( - workflow=workflow, label=TEST_WORKFLOW_STATE_LABEL, - completion=TEST_WORKFLOW_STATE_COMPLETION - ) + self._create_workflow() + self._create_workflow_states() response = self.client.post( reverse( 'document_states:setup_workflow_transition_create', - args=(workflow.pk,) + args=(self.workflow.pk,) ), data={ 'label': TEST_WORKFLOW_TRANSITION_LABEL, - 'origin_state': workflow_initial_state.pk, - 'destination_state': workflow_state.pk, + 'origin_state': self.workflow_initial_state.pk, + 'destination_state': self.workflow_state.pk, }, follow=True ) @@ -152,35 +164,22 @@ class DocumentStateViewTestCase(TestCase): ) self.assertEquals( WorkflowTransition.objects.all()[0].origin_state, - workflow_initial_state + self.workflow_initial_state ) self.assertEquals( WorkflowTransition.objects.all()[0].destination_state, - workflow_state + self.workflow_state ) def test_delete_workflow_transition(self): - workflow = Workflow.objects.create(label=TEST_WORKFLOW_LABEL) - workflow_initial_state = WorkflowState.objects.create( - workflow=workflow, label=TEST_WORKFLOW_INITIAL_STATE_LABEL, - completion=TEST_WORKFLOW_INITIAL_STATE_COMPLETION, initial=True - ) - workflow_state = WorkflowState.objects.create( - workflow=workflow, label=TEST_WORKFLOW_STATE_LABEL, - completion=TEST_WORKFLOW_STATE_COMPLETION - ) - workflow_transition = WorkflowTransition.objects.create( - workflow=workflow, label=TEST_WORKFLOW_TRANSITION_LABEL, - origin_state=workflow_initial_state, - destination_state=workflow_state - ) - - self.assertEquals(WorkflowTransition.objects.count(), 1) + self._create_workflow() + self._create_workflow_states() + self._create_workflow_transition() response = self.client.post( reverse( 'document_states:setup_workflow_transition_delete', - args=(workflow_transition.pk,) + args=(self.workflow_transition.pk,) ), follow=True ) @@ -189,3 +188,152 @@ class DocumentStateViewTestCase(TestCase): self.assertEquals(WorkflowState.objects.count(), 2) self.assertEquals(Workflow.objects.count(), 1) self.assertEquals(WorkflowTransition.objects.count(), 0) + + +class DocumentStateTransitionViewTestCase(GenericDocumentViewTestCase): + def _create_workflow(self): + self.workflow = Workflow.objects.create(label=TEST_WORKFLOW_LABEL) + self.workflow.document_types.add(self.document_type) + + def _create_workflow_states(self): + self.workflow_initial_state = WorkflowState.objects.create( + workflow=self.workflow, label=TEST_WORKFLOW_INITIAL_STATE_LABEL, + completion=TEST_WORKFLOW_INITIAL_STATE_COMPLETION, initial=True + ) + self.workflow_state = WorkflowState.objects.create( + workflow=self.workflow, label=TEST_WORKFLOW_STATE_LABEL, + completion=TEST_WORKFLOW_STATE_COMPLETION + ) + + def _create_workflow_transitions(self): + self.workflow_transition = WorkflowTransition.objects.create( + workflow=self.workflow, label=TEST_WORKFLOW_TRANSITION_LABEL, + origin_state=self.workflow_initial_state, + destination_state=self.workflow_state + ) + + self.workflow_transition_2 = WorkflowTransition.objects.create( + workflow=self.workflow, label=TEST_WORKFLOW_TRANSITION_LABEL_2, + origin_state=self.workflow_initial_state, + destination_state=self.workflow_state + ) + + def _create_document(self): + with open(TEST_SMALL_DOCUMENT_PATH) as file_object: + self.document_2 = self.document_type.new_document( + file_object=file_object + ) + + def _request_workflow_transition(self, workflow_instance): + return self.post( + 'document_states:workflow_instance_transition', + args=(workflow_instance.pk,), data={ + 'transition': self.workflow_transition.pk, + } + ) + + def test_transition_workflow_no_permission(self): + self.login_user() + self._create_workflow() + self._create_workflow_states() + self._create_workflow_transitions() + self._create_document() + + workflow_instance = self.document_2.workflows.first() + + response = self._request_workflow_transition( + workflow_instance=workflow_instance + ) + + self.assertEqual(response.status_code, 200) + + # Workflow should remain in the same initial state + self.assertEqual( + workflow_instance.get_current_state(), self.workflow_initial_state + ) + + def test_transition_workflow_with_permission(self): + """ + Test transitioning a workflow by granting the transition workflow + permission to the role. + """ + + self.login_user() + self._create_workflow() + self._create_workflow_states() + self._create_workflow_transitions() + self._create_document() + + workflow_instance = self.document_2.workflows.first() + + self.grant(permission_workflow_transition) + response = self._request_workflow_transition( + workflow_instance=workflow_instance + ) + + self.assertEqual(response.status_code, 302) + + # Workflow should remain in the same initial state + self.assertEqual( + workflow_instance.get_current_state(), self.workflow_state + ) + + def test_transition_workflow_with_workflow_acl(self): + """ + Test transitioning a workflow by granting the transition workflow + permission to the workflow itself via ACL. + """ + + self.login_user() + self._create_workflow() + self._create_workflow_states() + self._create_workflow_transitions() + self._create_document() + + workflow_instance = self.document_2.workflows.first() + + acl = AccessControlList.objects.create( + content_object=self.workflow, role=self.role + ) + acl.permissions.add(permission_workflow_transition.stored_permission) + + response = self._request_workflow_transition( + workflow_instance=workflow_instance + ) + + self.assertEqual(response.status_code, 302) + + # Workflow should remain in the same initial state + self.assertEqual( + workflow_instance.get_current_state(), self.workflow_state + ) + + def test_transition_workflow_with_transition_acl(self): + """ + Test transitioning a workflow by granting the transition workflow + permission to the transition via ACL. + """ + + self.login_user() + self._create_workflow() + self._create_workflow_states() + self._create_workflow_transitions() + self._create_document() + + workflow_instance = self.document_2.workflows.first() + + acl = AccessControlList.objects.create( + content_object=self.workflow_transition, role=self.role + ) + acl.permissions.add(permission_workflow_transition.stored_permission) + + response = self._request_workflow_transition( + workflow_instance=workflow_instance + ) + + self.assertEqual(response.status_code, 302) + + # Workflow should remain in the same initial state + self.assertEqual( + workflow_instance.get_current_state(), self.workflow_state + ) diff --git a/mayan/apps/document_states/views.py b/mayan/apps/document_states/views.py index 132212c8f0..d8cb69611b 100644 --- a/mayan/apps/document_states/views.py +++ b/mayan/apps/document_states/views.py @@ -7,12 +7,11 @@ from django.db.utils import IntegrityError from django.http import HttpResponseRedirect from django.shortcuts import get_object_or_404 from django.utils.translation import ugettext_lazy as _ -from django.views.generic import FormView from acls.models import AccessControlList from common.views import ( - AssignRemoveView, SingleObjectCreateView, SingleObjectDeleteView, - SingleObjectEditView, SingleObjectListView + AssignRemoveView, FormView, SingleObjectCreateView, + SingleObjectDeleteView, SingleObjectEditView, SingleObjectListView ) from documents.models import Document from documents.views import DocumentListView @@ -25,8 +24,7 @@ from .forms import ( from .models import Workflow, WorkflowInstance, WorkflowState, WorkflowTransition from .permissions import ( permission_workflow_create, permission_workflow_delete, - permission_workflow_edit, permission_workflow_transition, - permission_workflow_view, + permission_workflow_edit, permission_workflow_view, ) @@ -130,28 +128,10 @@ class WorkflowInstanceTransitionView(FormView): form_class = WorkflowInstanceTransitionForm template_name = 'appearance/generic_form.html' - def dispatch(self, request, *args, **kwargs): - try: - Permission.check_permissions( - request.user, (permission_workflow_transition,) - ) - except PermissionDenied: - AccessControlList.objects.check_access( - permission_workflow_transition, request.user, - self.get_workflow_instance().document - ) - - return super( - WorkflowInstanceTransitionView, self - ).dispatch(request, *args, **kwargs) - def form_valid(self, form): - transition = self.get_workflow_instance().workflow.transitions.get( - pk=form.cleaned_data['transition'] - ) self.get_workflow_instance().do_transition( - comment=form.cleaned_data['comment'], transition=transition, - user=self.request.user + comment=form.cleaned_data['comment'], + transition=form.cleaned_data['transition'], user=self.request.user ) return HttpResponseRedirect(self.get_success_url()) @@ -166,10 +146,11 @@ class WorkflowInstanceTransitionView(FormView): 'workflow_instance': self.get_workflow_instance(), } - def get_form_kwargs(self): - kwargs = super(WorkflowInstanceTransitionView, self).get_form_kwargs() - kwargs['workflow'] = self.get_workflow_instance() - return kwargs + def get_form_extra_kwargs(self): + return { + 'user': self.request.user, + 'workflow_instance': self.get_workflow_instance() + } def get_success_url(self): return self.get_workflow_instance().get_absolute_url() diff --git a/mayan/apps/permissions/classes.py b/mayan/apps/permissions/classes.py index 0de231d8b0..e1655484a5 100644 --- a/mayan/apps/permissions/classes.py +++ b/mayan/apps/permissions/classes.py @@ -61,8 +61,9 @@ class Permission(object): if permission.stored_permission.requester_has_this(requester): return True - logger.debug('no permission') - + logger.debug('User "%s" does not have permissions "%s"', + requester, + permissions) raise PermissionDenied(_('Insufficient permissions.')) @classmethod diff --git a/mayan/apps/permissions/models.py b/mayan/apps/permissions/models.py index af35e599ea..9182e57094 100644 --- a/mayan/apps/permissions/models.py +++ b/mayan/apps/permissions/models.py @@ -46,17 +46,25 @@ class StoredPermission(models.Model): verbose_name_plural = _('Permissions') def requester_has_this(self, user): - logger.debug('user: %s', user) if user.is_superuser or user.is_staff: + logger.debug('Permission "%s" granted to user "%s" as superuser or staff', + self, + user) return True # Request is one of the permission's holders? for group in user.groups.all(): for role in group.roles.all(): if self in role.permissions.all(): + logger.debug('Permission "%s" granted to user "%s" through role "%s"', + self, + user, + role) return True - logger.debug('Fallthru') + logger.debug('Fallthru: Permission "%s" not granted to user "%s"', + self, + user) return False