Add writable ACLs API endpoints.

Signed-off-by: Roberto Rosario <roberto.rosario.gonzalez@gmail.com>
This commit is contained in:
Roberto Rosario
2017-03-13 21:04:12 -04:00
parent f6b58655e8
commit 858eb8b020
6 changed files with 219 additions and 83 deletions

View File

@@ -12,6 +12,7 @@ from .models import AccessControlList
from .permissions import permission_acl_edit, permission_acl_view
from .serializers import (
AccessControlListPermissionSerializer, AccessControlListSerializer,
WritableAccessControlListPermissionSerializer,
WritableAccessControlListSerializer
)
@@ -79,9 +80,16 @@ class APIObjectACLListView(generics.ListCreateAPIView):
return super(APIObjectACLListView, self).post(*args, **kwargs)
class APIObjectACLView(generics.RetrieveAPIView):
class APIObjectACLView(generics.RetrieveDestroyAPIView):
serializer_class = AccessControlListSerializer
def delete(self, *args, **kwargs):
"""
Delete the selected access control list.
"""
return super(APIObjectACLView, self).delete(*args, **kwargs)
def get(self, *args, **kwargs):
"""
Returns the details of the selected access control list.
@@ -119,9 +127,7 @@ class APIObjectACLView(generics.RetrieveAPIView):
return self.get_content_object().acls.all()
class APIObjectACLPermissionListView(generics.ListAPIView):
serializer_class = AccessControlListPermissionSerializer
class APIObjectACLPermissionListView(generics.ListCreateAPIView):
def get(self, *args, **kwargs):
"""
Returns the access control list permission list.
@@ -160,6 +166,12 @@ class APIObjectACLPermissionListView(generics.ListAPIView):
def get_queryset(self):
return self.get_acl().permissions.all()
def get_serializer_class(self):
if self.request.method == 'GET':
return AccessControlListPermissionSerializer
else:
return WritableAccessControlListPermissionSerializer
def get_serializer_context(self):
return {
'acl': self.get_acl(),
@@ -168,11 +180,29 @@ class APIObjectACLPermissionListView(generics.ListAPIView):
'view': self
}
def post(self, *args, **kwargs):
"""
Add a new permission to the selected access control list.
"""
class APIObjectACLPermissionView(generics.RetrieveAPIView):
return super(
APIObjectACLPermissionListView, self
).post(*args, **kwargs)
class APIObjectACLPermissionView(generics.RetrieveDestroyAPIView):
lookup_url_kwarg = 'permission_pk'
serializer_class = AccessControlListPermissionSerializer
def delete(self, *args, **kwargs):
"""
Remove the permission from the selected access control list.
"""
return super(
APIObjectACLPermissionView, self
).delete(*args, **kwargs)
def get(self, *args, **kwargs):
"""
Returns the details of the selected access control list permission.

View File

@@ -43,7 +43,9 @@ class ModelPermission(object):
if proxy:
permissions.extend(cls._registry.get(proxy))
pks = [permission.stored_permission.pk for permission in set(permissions)]
pks = [
permission.stored_permission.pk for permission in set(permissions)
]
return StoredPermission.objects.filter(pk__in=pks)
@classmethod

View File

@@ -45,7 +45,9 @@ class AccessControlList(models.Model):
verbose_name_plural = _('Access entries')
def __str__(self):
return _('Permissions "%(permissions)s" to role "%(role)s" for "%(object)s"') % {
return _(
'Permissions "%(permissions)s" to role "%(role)s" for "%(object)s"'
) % {
'permissions': self.get_permission_titles(),
'object': self.content_object,
'role': self.role

View File

@@ -11,7 +11,7 @@ from rest_framework.reverse import reverse
from common.serializers import ContentTypeSerializer
from permissions import Permission
from permissions.models import Role
from permissions.models import Role, StoredPermission
from permissions.serializers import PermissionSerializer, RoleSerializer
from .models import AccessControlList
@@ -59,15 +59,7 @@ class AccessControlListPermissionSerializer(PermissionSerializer):
'different than the canonical workflow URL.'
)
)
def __init__(self, *args, **kwargs):
super(
AccessControlListPermissionSerializer, self
).__init__(*args, **kwargs)
# Make all fields (inherited and local) read ony.
for field in self._readable_fields:
field.read_only = True
acl_url = serializers.SerializerMethodField()
def get_acl_permission_url(self, instance):
return reverse(
@@ -79,8 +71,56 @@ class AccessControlListPermissionSerializer(PermissionSerializer):
), request=self.context['request'], format=self.context['format']
)
def get_acl_url(self, instance):
return reverse(
'rest_api:accesscontrollist-detail', args=(
self.context['acl'].content_type.app_label,
self.context['acl'].content_type.model,
self.context['acl'].object_id, self.context['acl'].pk
), request=self.context['request'], format=self.context['format']
)
class WritableAccessControlListPermissionSerializer(AccessControlListPermissionSerializer):
permission_pk = serializers.CharField(
help_text=_(
'Primary key of the new permission to grant to the access control '
'list.'
), write_only=True
)
class Meta:
fields = ('namespace',)
read_only_fields = ('namespace',)
def create(self, validated_data):
for permission in validated_data['permissions']:
self.context['acl'].permissions.add(permission)
return validated_data['permissions'][0]
def validate(self, attrs):
permissions_pk_list = attrs.pop('permission_pk', None)
permissions_result = []
if permissions_pk_list:
for pk in permissions_pk_list.split(','):
try:
permission = Permission.get(get_dict={'pk': pk})
except KeyError:
raise ValidationError(_('No such permission: %s') % pk)
else:
# Accumulate valid stored permission pks
permissions_result.append(permission.pk)
attrs['permissions'] = StoredPermission.objects.filter(
pk__in=permissions_result
)
return attrs
class WritableAccessControlListSerializer(serializers.ModelSerializer):
content_type = ContentTypeSerializer(read_only=True)
permissions_pk_list = serializers.CharField(
help_text=_(
'Comma separated list of permission primary keys to grant to this '
@@ -97,7 +137,7 @@ class WritableAccessControlListSerializer(serializers.ModelSerializer):
help_text=_(
'Primary keys of the role to which this access control list '
'binds to.'
), required=False
), write_only=True
)
url = serializers.SerializerMethodField()
@@ -107,33 +147,7 @@ class WritableAccessControlListSerializer(serializers.ModelSerializer):
'permissions_url', 'role_pk', 'url'
)
model = AccessControlList
read_only_fields = ('content_type', 'object_id',)
def _add_permissions(self, instance):
for pk in self.permissions_pk_list.split(','):
try:
stored_permission = Permission.get(get_dict={'pk': pk})
instance.permissions.add(stored_permission)
instance.save()
except KeyError:
raise ValidationError(_('No such permission: %s') % pk)
def create(self, validated_data):
validated_data['content_type'] = ContentType.objects.get_for_model(self.context['content_object'])
validated_data['object_id'] = self.context['content_object'].pk
self.permissions_pk_list = validated_data.pop(
'permissions_pk_list', ''
)
instance = super(
WritableAccessControlListSerializer, self
).create(validated_data)
if self.permissions_pk_list:
self._add_permissions(instance=instance)
return instance
read_only_fields = ('content_type', 'object_id')
def get_permissions_url(self, instance):
return reverse(
@@ -151,44 +165,40 @@ class WritableAccessControlListSerializer(serializers.ModelSerializer):
), request=self.context['request'], format=self.context['format']
)
def update(self, instance, validated_data):
self.permissions_pk_list = validated_data.pop(
'permissions_pk_list', ''
)
instance = super(WritableAccessControlListSerializer, self).update(
instance, validated_data
)
if self.permissions_pk_list:
instance.permissions.clear()
self._add_permissions(instance=instance)
return instance
def validate(self, attrs):
attrs['content_type'] = ContentType.objects.get_for_model(self.context['content_object'])
attrs['content_type'] = ContentType.objects.get_for_model(
self.context['content_object']
)
attrs['object_id'] = self.context['content_object'].pk
role_pk = attrs.pop('role_pk', None)
if not role_pk:
raise ValidationError(
{
'role_pk':
_(
'This field cannot be null.'
)
}
)
try:
attrs['role'] = Role.objects.get(pk=role_pk)
attrs['role'] = Role.objects.get(pk=attrs.pop('role_pk'))
except Role.DoesNotExist as exception:
raise ValidationError(force_text(exception))
permissions_pk_list = attrs.pop('permissions_pk_list', None)
permissions_result = []
if permissions_pk_list:
for pk in permissions_pk_list.split(','):
try:
permission = Permission.get(get_dict={'pk': pk})
except KeyError:
raise ValidationError(_('No such permission: %s') % pk)
else:
# Accumulate valid stored permission pks
permissions_result.append(permission.pk)
instance = AccessControlList(**attrs)
try:
instance.full_clean()
except DjangoValidationError as exception:
raise ValidationError(exception)
# Add a queryset of valid stored permissions so that they get added
# after the ACL gets created.
attrs['permissions'] = StoredPermission.objects.filter(
pk__in=permissions_result
)
return attrs

View File

@@ -20,6 +20,7 @@ from user_management.tests.literals import (
)
from ..models import AccessControlList
from ..permissions import permission_acl_view
@override_settings(OCR_AUTO_OCR=False)
@@ -84,6 +85,23 @@ class ACLAPITestCase(APITestCase):
response.data['results'][0]['role']['label'], TEST_ROLE_LABEL
)
def test_object_acl_delete_view(self):
self._create_acl()
response = self.client.delete(
reverse(
'rest_api:accesscontrollist-detail',
args=(
self.document_content_type.app_label,
self.document_content_type.model,
self.document.pk, self.acl.pk
)
)
)
self.assertEqual(response.status_code, 204)
self.assertEqual(AccessControlList.objects.count(), 0)
def test_object_acl_detail_view(self):
self._create_acl()
@@ -97,7 +115,6 @@ class ACLAPITestCase(APITestCase):
)
)
)
self.assertEqual(
response.data['content_type']['app_label'],
self.document_content_type.app_label
@@ -106,24 +123,23 @@ class ACLAPITestCase(APITestCase):
response.data['role']['label'], TEST_ROLE_LABEL
)
def test_object_acl_permission_list_view(self):
def test_object_acl_permission_delete_view(self):
self._create_acl()
permission = self.acl.permissions.first()
response = self.client.get(
response = self.client.delete(
reverse(
'rest_api:accesscontrollist-permission-list',
'rest_api:accesscontrollist-permission-detail',
args=(
self.document_content_type.app_label,
self.document_content_type.model,
self.document.pk, self.acl.pk
self.document.pk, self.acl.pk,
permission.pk
)
)
)
self.assertEqual(
response.data['results'][0]['pk'],
permission_document_view.pk
)
self.assertEqual(response.status_code, 204)
self.assertEqual(self.acl.permissions.count(), 0)
def test_object_acl_permission_detail_view(self):
self._create_acl()
@@ -145,6 +161,47 @@ class ACLAPITestCase(APITestCase):
response.data['pk'], permission_document_view.pk
)
def test_object_acl_permission_list_view(self):
self._create_acl()
response = self.client.get(
reverse(
'rest_api:accesscontrollist-permission-list',
args=(
self.document_content_type.app_label,
self.document_content_type.model,
self.document.pk, self.acl.pk
)
)
)
self.assertEqual(
response.data['results'][0]['pk'],
permission_document_view.pk
)
def test_object_acl_permission_list_post_view(self):
self._create_acl()
response = self.client.post(
reverse(
'rest_api:accesscontrollist-permission-list',
args=(
self.document_content_type.app_label,
self.document_content_type.model,
self.document.pk, self.acl.pk
)
), data={'permission_pk': permission_acl_view.pk}
)
self.assertEqual(response.status_code, 201)
self.assertQuerysetEqual(
ordered=False, qs=self.acl.permissions.all(), values=(
repr(permission_document_view.stored_permission),
repr(permission_acl_view.stored_permission)
)
)
def test_object_acl_post_no_permissions_added_view(self):
response = self.client.post(
reverse(
@@ -158,6 +215,40 @@ class ACLAPITestCase(APITestCase):
)
self.assertEqual(response.status_code, 201)
self.assertEqual(
self.document.acls.first().role, self.role
)
self.assertEqual(
self.document.acls.first().content_object, self.document
)
self.assertEqual(
self.document.acls.first().permissions.count(), 0
)
def test_object_acl_post_with_permissions_added_view(self):
response = self.client.post(
reverse(
'rest_api:accesscontrollist-list',
args=(
self.document_content_type.app_label,
self.document_content_type.model,
self.document.pk
)
), data={
'role_pk': self.role.pk,
'permissions_pk_list': permission_acl_view.pk
}
)
self.assertEqual(response.status_code, 201)
self.assertEqual(
self.document.acls.first().content_object, self.document
)
self.assertEqual(
self.document.acls.first().role, self.role
)
self.assertEqual(
self.document.acls.first().permissions.first(),
permission_acl_view.stored_permission
)

View File

@@ -153,7 +153,8 @@ class ACLListView(SingleObjectListView):
def get_queryset(self):
return AccessControlList.objects.filter(
content_type=self.object_content_type, object_id=self.content_object.pk
content_type=self.object_content_type,
object_id=self.content_object.pk
)