Refactor the access control computation

Rewrite the ACL queryset filtering to move most of the
computation to the database manager view the ORM.

Add support for cascading access control checking.

Update the .check_access() method to work as a front
end of the new .restrict_queryset method. The workflow
for access control now follow Django convention of
first generating a queryset and then attempt to .get()
the desired element of the queryset.

This update also allows restricting a queryset by related
fields which can be Generic Foreign Keys.

Signed-off-by: Roberto Rosario <Roberto.Rosario@mayan-edms.com>
This commit is contained in:
Roberto Rosario
2019-01-19 00:05:21 -04:00
parent b53c026877
commit 5d7f810477
3 changed files with 183 additions and 156 deletions

View File

@@ -217,6 +217,11 @@
- The tags app permission workflow is now reciprocal. In order
to attach a tag, the user's role will need the tag attach
permissions for both, the document and the tag.
- Refactor and optimize the access control computation. Most of
the computation has been moved to the database instead of doing
filtering in Python. The refactor added cascading access checking
in preparation for nested cabinet access control and the removal
of the permission proxy support which is now redundant.
3.1.9 (2018-11-01)
==================

View File

@@ -5,6 +5,7 @@ from django.utils.translation import ugettext_lazy as _
from mayan.apps.common import MayanAppConfig, menu_object, menu_sidebar
from mayan.apps.navigation import SourceColumn
from .classes import ModelPermission
from .links import link_acl_create, link_acl_delete, link_acl_permissions
@@ -21,6 +22,9 @@ class ACLsApp(MayanAppConfig):
AccessControlList = self.get_model(model_name='AccessControlList')
ModelPermission.register_inheritance(
model=AccessControlList, related='content_object',
)
SourceColumn(
attribute='role', is_identifier=True, is_sortable=True,
source=AccessControlList

View File

@@ -1,15 +1,20 @@
from __future__ import absolute_import, unicode_literals
import logging
import warnings
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import PermissionDenied
from django.db import models
from django.db.models import Q
from django.utils.translation import ugettext
from django.utils.translation import ugettext_lazy as _
from django.db.models import CharField, Value as V, Q
from django.db.models.functions import Concat
from django.http import Http404
from mayan.apps.common.utils import resolve_attribute, return_related
from mayan.apps.common.utils import (
get_related_field, resolve_attribute, return_related
)
from mayan.apps.common.warnings import InterfaceWarning
from mayan.apps.permissions import Permission
from mayan.apps.permissions.models import StoredPermission
@@ -24,173 +29,146 @@ class AccessControlListManager(models.Manager):
Implement a 3 tier permission system, involving a permissions, an actor
and an object
"""
def check_access(self, permissions, user, obj, related=None):
if user.is_superuser or user.is_staff:
logger.debug(
'Permissions "%s" on "%s" granted to user "%s" as superuser '
'or staff', permissions, obj, user
def _get_acl_filters(self, queryset, stored_permission, user, related_field_name=None):
"""
This method does the bulk of the work. It generates filters for the
AccessControlList model to determine if there are ACL entries for the
members of the queryset's model provided.
"""
# Determine which of the cases we need to address
# 1: No related field
# 2: Related field
# 3: Related field that is Generic Foreign Key
# 4: No related field, but has an inherited related field, solved by
# 5: Inherited field of a related field
# recursion, branches to #2 or #3.
# Not addressed yet
# 6: Inherited field of a related field that is Generic Foreign Key
result = []
if related_field_name:
related_field = get_related_field(
model=queryset.model, related_field_name=related_field_name
)
return True
try:
return Permission.check_permissions(
requester=user, permissions=permissions
)
except PermissionDenied:
try:
stored_permissions = [
permission.stored_permission for permission in permissions
]
except TypeError:
# Not a list of permissions, just one
stored_permissions = (permissions.stored_permission,)
if isinstance(related_field, GenericForeignKey):
# Case 3: Generic Foreign Key, multiple ContentTypes + object
# id combinations
content_type_object_id_queryset = queryset.annotate(
ct_fk_combination=Concat(
related_field.ct_field, V('-'), related_field.fk_field,
output_field=CharField()
)
).values('ct_fk_combination')
if related:
obj = resolve_attribute(obj=obj, attribute=related)
field_lookup = 'pk__in'
try:
parent_accessor = ModelPermission.get_inheritance(
model=obj._meta.model
acl_filter = self.annotate(
ct_fk_combination=Concat(
'content_type', V('-'), 'object_id', output_field=CharField()
)
).filter(
permissions=stored_permission, role__groups__user=user,
ct_fk_combination__in=content_type_object_id_queryset
).values('object_id')
result.append(Q(**{field_lookup: acl_filter}))
else:
# Case 2: Related field of a single type, single ContentType,
# multiple object id
related_field = get_related_field(
model=queryset.model, related_field_name=related_field_name
)
except AttributeError:
# AttributeError means non model objects: ie Statistics
# These can't have ACLs so we raise PermissionDenied
raise PermissionDenied(
_('Insufficient access for: %(object)s') % {'object': obj}
content_type = ContentType.objects.get_for_model(
model=related_field.related_model
)
field_lookup = '{}_id__in'.format(related_field_name)
acl_filter = self.filter(
content_type=content_type, permissions=stored_permission,
role__groups__user=user
).values('object_id')
result.append(Q(**{field_lookup: acl_filter}))
# Case 5: Related field, has an inherited related field itself
# Bubble up permssion check
# TODO: Add relationship support: OR or AND
# TODO: OR for document pages, version, doc, and types
# TODO: AND for new cabinet levels ACLs
try:
related_field_model_related_field_name = ModelPermission.get_inheritance(
model=related_field.related_model
)
except KeyError:
pass
else:
related_field_name = '{}__{}'.format(related_field_name, related_field_model_related_field_name)
related_field_inherited_acl_queries = self._get_acl_filters(
queryset=queryset, stored_permission=stored_permission,
user=user, related_field_name=related_field_name
)
result.extend(related_field_inherited_acl_queries)
else:
# Case 1: Original model, single ContentType, multiple object id
content_type = ContentType.objects.get_for_model(model=queryset.model)
field_lookup = 'id__in'
acl_filter = self.filter(
permissions=stored_permission, role__groups__user=user
).values('id')
result.append(Q(**{field_lookup: acl_filter}))
# Case 4: Original model, has an inherited related field
try:
related_field_name = ModelPermission.get_inheritance(
model=queryset.model
)
except KeyError:
pass
else:
try:
return self.check_access(
obj=getattr(obj, parent_accessor),
permissions=permissions, user=user
)
except AttributeError:
# Has no such attribute, try it as a related field
try:
return self.check_access(
obj=return_related(
instance=obj, related_field=parent_accessor
), permissions=permissions, user=user
)
except PermissionDenied:
pass
except PermissionDenied:
pass
user_roles = []
for group in user.groups.all():
for role in group.roles.all():
if set(stored_permissions).intersection(set(self.get_inherited_permissions(role=role, obj=obj))):
logger.debug(
'Permissions "%s" on "%s" granted to user "%s" through role "%s" via inherited ACL',
permissions, obj, user, role
)
return True
user_roles.append(role)
if not self.filter(content_type=ContentType.objects.get_for_model(model=obj), object_id=obj.pk, permissions__in=stored_permissions, role__in=user_roles).exists():
logger.debug(
'Permissions "%s" on "%s" denied for user "%s"',
permissions, obj, user
inherited_acl_queries = self._get_acl_filters(
queryset=queryset, stored_permission=stored_permission,
user=user, related_field_name=related_field_name
)
raise PermissionDenied(ugettext('Insufficient access for: %s') % obj)
result.extend(inherited_acl_queries)
logger.debug(
'Permissions "%s" on "%s" granted to user "%s" through roles "%s" by direct ACL',
permissions, obj, user, user_roles
)
return result
def filter_by_access(self, permission, user, queryset):
if user.is_superuser or user.is_staff:
logger.debug(
'Unfiltered queryset returned to user "%s" as superuser '
'or staff', user
)
return queryset
def check_access(self, permissions, user, obj, related=None, raise_404=False):
warnings.warn(
'check_access() is deprecated, use restrict_queryset() to '
'produce a queryset from which to .get() the corresponding '
'object in the local code.', InterfaceWarning
)
try:
Permission.check_permissions(
requester=user, permissions=(permission,)
)
except PermissionDenied:
user_roles = []
for group in user.groups.all():
for role in group.roles.all():
user_roles.append(role)
try:
parent_accessor = ModelPermission.get_inheritance(
model=queryset.model
)
except KeyError:
parent_acl_query = Q()
else:
instance = queryset.first()
if instance:
parent_object = return_related(
instance=instance, related_field=parent_accessor
)
try:
# Try to see if parent_object is a function
parent_object()
except TypeError:
# Is not a function, try it as a field
parent_content_type = ContentType.objects.get_for_model(
model=parent_object
)
parent_queryset = self.filter(
content_type=parent_content_type,
role__in=user_roles,
permissions=permission.stored_permission
)
parent_acl_query = Q(
**{
'{}__pk__in'.format(
parent_accessor
): parent_queryset.values_list(
'object_id', flat=True
)
}
)
else:
# Is a function. Can't perform Q object filtering.
# Perform iterative filtering.
result = []
for entry in queryset:
try:
self.check_access(
obj=entry, permissions=permission,
user=user
)
except PermissionDenied:
pass
else:
result.append(entry.pk)
return queryset.filter(pk__in=result)
else:
parent_acl_query = Q()
# Directly granted access
content_type = ContentType.objects.get_for_model(
model=queryset.model
)
acl_query = Q(pk__in=self.filter(
content_type=content_type, role__in=user_roles,
permissions=permission.stored_permission
).values_list('object_id', flat=True))
logger.debug(
'Filtered queryset returned to user "%s" based on roles "%s"',
user, user_roles
)
return queryset.filter(parent_acl_query | acl_query)
# permissions can be a single permission or a list of permissions
permission = permissions[0]
except TypeError:
permission = permissions
else:
return queryset
warnings.warn(
'Passing multiple permissions via the `permissions` argument '
'is deprecated. Pass a single permission. Use multiple call '
'to check against multiple permissions.', InterfaceWarning
)
if related:
warnings.warn(
'Passing a related field name to check_access() is '
'deprecated. Register the related field using '
'common.classes.ModelPermission.', InterfaceWarning
)
queryset = self.restrict_queryset(
permission=permission, queryset=obj._meta.default_manager.all(),
user=user, related_field_name=related
)
if queryset.filter(pk=obj.pk).exists():
return True
else:
if raise_404:
raise Http404
else:
return PermissionDenied
def get_inherited_permissions(self, role, obj):
try:
@@ -242,6 +220,46 @@ class AccessControlListManager(models.Manager):
return acl
def filter_by_access(self, permission, queryset, user):
warnings.warn(
'filter_by_access() is deprecated, use restrict_queryset().',
InterfaceWarning
)
return self.restrict_queryset(
permission=permission, queryset=queryset, user=user
)
def restrict_queryset(self, permission, queryset, user, related_field_name=None):
# `related_field_name` is left only for compatibility with check_access
# once check_access() is removed the `related_field_name` argument
# will be removed too.
# Check directly granted permission via a role
try:
Permission.check_user_permission(permission=permission, user=user)
Permission.check_permissions(
requester=user, permissions=(permission,)
)
except PermissionDenied:
acl_filters = self._get_acl_filters(
queryset=queryset, related_field_name=related_field_name,
stored_permission=permission.stored_permission, user=user
)
final_query = None
for acl_filter in acl_filters:
if final_query is None:
final_query = acl_filter
else:
final_query = final_query | acl_filter
return queryset.filter(final_query)
else:
# User has direct permission assignment via a role, is superuser or
# is staff. Return the entire queryset.
return queryset
def revoke(self, permission, role, obj):
content_type = ContentType.objects.get_for_model(model=obj)
acl, created = self.get_or_create(