Finish refactoring access control list's queryset filtering by access level code.
This commit is contained in:
@@ -21,8 +21,8 @@ class AccessControlListManager(models.Manager):
|
||||
and an object
|
||||
"""
|
||||
|
||||
def check_access(self, permission, actor, obj):
|
||||
if actor.is_superuser or actor.is_staff:
|
||||
def check_access(self, permissions, user, obj):
|
||||
if user.is_superuser or user.is_staff:
|
||||
return True
|
||||
|
||||
user_roles = []
|
||||
@@ -30,243 +30,30 @@ class AccessControlListManager(models.Manager):
|
||||
for role in group.roles.all():
|
||||
user_roles.append(role)
|
||||
|
||||
if not self.filter(content_object=obj, permissions=permission, role__in=user_roles):
|
||||
raise PermissionDenied(ugettext('Insufficient access.'))
|
||||
|
||||
# TODO: add filter_objects_by_access
|
||||
|
||||
|
||||
class AccessEntryManager(models.Manager):
|
||||
"""
|
||||
Implement a 3 tier permission system, involving a permissions, an actor
|
||||
and an object
|
||||
"""
|
||||
def grant(self, permission, actor, obj):
|
||||
"""
|
||||
Grant a permission (what), (to) an actor, (on) a specific object
|
||||
"""
|
||||
obj = get_source_object(obj)
|
||||
actor = get_source_object(actor)
|
||||
|
||||
access_entry, created = self.model.objects.get_or_create(
|
||||
permission=permission,
|
||||
holder_type=ContentType.objects.get_for_model(actor),
|
||||
holder_id=actor.pk,
|
||||
content_type=ContentType.objects.get_for_model(obj),
|
||||
object_id=obj.pk
|
||||
)
|
||||
return created
|
||||
|
||||
def revoke(self, permission, actor, obj):
|
||||
"""
|
||||
Revoke a permission (what), (from) an actor, (on) a specific object
|
||||
"""
|
||||
obj = get_source_object(obj)
|
||||
actor = get_source_object(actor)
|
||||
|
||||
try:
|
||||
access_entry = self.model.objects.get(
|
||||
permission=permission,
|
||||
holder_type=ContentType.objects.get_for_model(actor),
|
||||
holder_id=actor.pk,
|
||||
content_type=ContentType.objects.get_for_model(obj),
|
||||
object_id=obj.pk
|
||||
)
|
||||
except self.model.DoesNotExist:
|
||||
return False
|
||||
else:
|
||||
access_entry.delete()
|
||||
return True
|
||||
|
||||
def has_access(self, permission, actor, obj, db_only=False):
|
||||
"""
|
||||
Returns whether an actor has a specific permission for an object
|
||||
"""
|
||||
obj = get_source_object(obj)
|
||||
actor = get_source_object(actor)
|
||||
|
||||
if isinstance(actor, User) and not db_only:
|
||||
# db_only causes the return of only the stored permissions
|
||||
# and not the perceived permissions for an actor
|
||||
if actor.is_superuser or actor.is_staff:
|
||||
return True
|
||||
|
||||
actor = AnonymousUserSingleton.objects.passthru_check(actor)
|
||||
try:
|
||||
content_type = ContentType.objects.get_for_model(obj)
|
||||
except AttributeError:
|
||||
# Object doesn't have a content type, therefore allow access
|
||||
return True
|
||||
|
||||
try:
|
||||
self.model.objects.get(
|
||||
permission=permission.stored_permission,
|
||||
holder_type=ContentType.objects.get_for_model(actor),
|
||||
holder_id=actor.pk,
|
||||
content_type=content_type,
|
||||
object_id=obj.pk
|
||||
)
|
||||
except self.model.DoesNotExist:
|
||||
# If not check if the actor's memberships is one of
|
||||
# the access's holder?
|
||||
roles = RoleMember.objects.get_roles_for_member(actor)
|
||||
|
||||
if isinstance(actor, User):
|
||||
groups = actor.groups.all()
|
||||
else:
|
||||
groups = []
|
||||
|
||||
for membership in list(set(roles) | set(groups)):
|
||||
if self.has_access(permission, membership, obj, db_only):
|
||||
return True
|
||||
|
||||
logger.debug('Fallthru')
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def check_access(self, permission, actor, obj):
|
||||
# TODO: Merge with has_access
|
||||
obj = get_source_object(obj)
|
||||
actor = get_source_object(actor)
|
||||
|
||||
if self.has_access(permission, actor, obj):
|
||||
return True
|
||||
else:
|
||||
raise PermissionDenied(ugettext('Insufficient access.'))
|
||||
|
||||
def check_accesses(self, permission_list, actor, obj):
|
||||
"""
|
||||
Returns whether an actor has at least one of a list of permissions for an object
|
||||
"""
|
||||
obj = get_source_object(obj)
|
||||
actor = get_source_object(actor)
|
||||
for permission in permission_list:
|
||||
if self.has_access(permission, actor, obj):
|
||||
return True
|
||||
|
||||
raise PermissionDenied(ugettext('Insufficient access.'))
|
||||
|
||||
def get_allowed_class_objects(self, permission, actor, cls, related=None):
|
||||
logger.debug('related: %s', related)
|
||||
|
||||
actor = AnonymousUserSingleton.objects.passthru_check(actor)
|
||||
actor_type = ContentType.objects.get_for_model(actor)
|
||||
content_type = ContentType.objects.get_for_model(cls)
|
||||
|
||||
# Calculate actor role membership ACL query
|
||||
total_queries = Q()
|
||||
for role in RoleMember.objects.get_roles_for_member(actor):
|
||||
role_type = ContentType.objects.get_for_model(role)
|
||||
if related:
|
||||
query = Q(holder_type=role_type, holder_id=role.pk, permission=permission.get_stored_permission)
|
||||
else:
|
||||
query = Q(holder_type=role_type, holder_id=role.pk, content_type=content_type, permission=permission.get_stored_permission)
|
||||
if not total_queries:
|
||||
total_queries = query
|
||||
else:
|
||||
total_queries = total_queries | query
|
||||
|
||||
# Calculate actor group membership ACL query
|
||||
if isinstance(actor, User):
|
||||
groups = actor.groups.all()
|
||||
else:
|
||||
groups = []
|
||||
|
||||
for group in groups:
|
||||
group_type = ContentType.objects.get_for_model(group)
|
||||
if related:
|
||||
query = Q(holder_type=group_type, holder_id=group.pk, permission=permission.get_stored_permission)
|
||||
else:
|
||||
query = Q(holder_type=group_type, holder_id=group.pk, content_type=content_type, permission=permission.get_stored_permission)
|
||||
if not total_queries:
|
||||
total_queries = query
|
||||
else:
|
||||
total_queries = total_queries | query
|
||||
|
||||
if related:
|
||||
actor_query = Q(holder_type=actor_type, holder_id=actor.pk, permission=permission.get_stored_permission)
|
||||
master_list = [obj.content_object for obj in self.model.objects.select_related().filter(actor_query | total_queries)]
|
||||
logger.debug('master_list: %s', master_list)
|
||||
# TODO: update to use Q objects and check performance diff
|
||||
# kwargs = {'%s__in' % related: master_list}
|
||||
# Q(**kwargs)
|
||||
return (obj for obj in cls.objects.all() if getattr(obj, related) in master_list)
|
||||
else:
|
||||
actor_query = Q(holder_type=actor_type, holder_id=actor.pk, content_type=content_type, permission=permission.get_stored_permission)
|
||||
return (obj.content_object for obj in self.model.objects.filter(actor_query | total_queries))
|
||||
|
||||
def get_acl_url(self, obj):
|
||||
content_type = ContentType.objects.get_for_model(obj)
|
||||
return reverse('acl_list', args=[content_type.app_label, content_type.model, obj.pk])
|
||||
|
||||
def get_new_holder_url(self, obj):
|
||||
content_type = ContentType.objects.get_for_model(obj)
|
||||
return reverse('acl_new_holder_for', args=[content_type.app_label, content_type.model, obj.pk])
|
||||
|
||||
def get_holders_for(self, obj):
|
||||
content_type = ContentType.objects.get_for_model(obj)
|
||||
holder_list = []
|
||||
for access_entry in self.model.objects.filter(content_type=content_type, object_id=obj.pk):
|
||||
if access_entry.holder_object:
|
||||
# Don't add references to non existant content type objects
|
||||
entry = AccessHolder.encapsulate(access_entry.holder_object)
|
||||
|
||||
if entry not in holder_list:
|
||||
holder_list.append(entry)
|
||||
|
||||
return holder_list
|
||||
|
||||
def get_holder_permissions_for(self, obj, actor, db_only=False):
|
||||
"""
|
||||
Returns a list of actors that hold at least one permission for
|
||||
a specific object
|
||||
"""
|
||||
logger.debug('obj: %s', obj)
|
||||
logger.debug('actor: %s', actor)
|
||||
|
||||
if isinstance(actor, User) and not db_only:
|
||||
if actor.is_superuser or actor.is_staff:
|
||||
return Permission.all()
|
||||
|
||||
actor_type = ContentType.objects.get_for_model(actor)
|
||||
content_type = ContentType.objects.get_for_model(obj)
|
||||
return (access.permission for access in self.model.objects.filter(content_type=content_type, object_id=obj.pk, holder_type=actor_type, holder_id=actor.pk))
|
||||
|
||||
def filter_objects_by_access(self, permission, actor, object_list, exception_on_empty=False, related=None):
|
||||
"""
|
||||
Filter a list of objects or a QuerySet elements depending on
|
||||
whether the actor holds the specified permission
|
||||
"""
|
||||
logger.debug('exception_on_empty: %s', exception_on_empty)
|
||||
logger.debug('object_list: %s', object_list)
|
||||
|
||||
if isinstance(actor, User):
|
||||
if actor.is_superuser or actor.is_staff:
|
||||
return object_list
|
||||
|
||||
try:
|
||||
if object_list.count() == 0:
|
||||
return object_list
|
||||
stored_permissions = [permission.stored_permission for permission in permissions]
|
||||
except TypeError:
|
||||
# object_list is not a queryset
|
||||
if len(object_list) == 0:
|
||||
return object_list
|
||||
stored_permissions = [permissions.stored_permission]
|
||||
|
||||
try:
|
||||
# Try to process as a QuerySet
|
||||
qs = object_list.filter(pk__in=[obj.pk for obj in self.get_allowed_class_objects(permission, actor, object_list[0].__class__, related)])
|
||||
logger.debug('qs: %s', qs)
|
||||
if not self.filter(content_type=ContentType.objects.get_for_model(obj), object_id=obj.pk, permissions__in=stored_permissions, role__in=user_roles):
|
||||
raise PermissionDenied(ugettext('Insufficient access.'))
|
||||
|
||||
if qs.count() == 0 and exception_on_empty:
|
||||
raise PermissionDenied
|
||||
def filter_by_access(self, permission, user, queryset, exception_on_empty=False, related=None):
|
||||
if user.is_superuser or user.is_staff:
|
||||
return queryset
|
||||
|
||||
return qs
|
||||
except AttributeError:
|
||||
# Fallback to a filtered list
|
||||
object_list = list(set(object_list) & set(self.get_allowed_class_objects(permission, actor, object_list[0].__class__, related)))
|
||||
logger.debug('object_list: %s', object_list)
|
||||
if len(object_list) == 0 and exception_on_empty:
|
||||
raise PermissionDenied
|
||||
user_roles = []
|
||||
for group in user.groups.all():
|
||||
for role in group.roles.all():
|
||||
user_roles.append(role)
|
||||
|
||||
return object_list
|
||||
content_type = ContentType.objects.get_for_model(queryset.model)
|
||||
|
||||
acls = self.filter(content_type=content_type, role__in=user_roles, permissions=permission.stored_permission).values_list('object_id', flat=True)
|
||||
|
||||
new_queryset = queryset.filter(pk__in=acls)
|
||||
|
||||
if new_queryset.count() == 0 and exception_on_empty:
|
||||
raise PermissionDenied
|
||||
|
||||
return new_queryset
|
||||
|
||||
@@ -13,7 +13,7 @@ class Migration(migrations.Migration):
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='AccessEntry',
|
||||
name='AccessControlList',
|
||||
fields=[
|
||||
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
|
||||
('holder_id', models.PositiveIntegerField()),
|
||||
@@ -40,7 +40,7 @@ class Migration(migrations.Migration):
|
||||
bases=(models.Model,),
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name='DefaultAccessEntry',
|
||||
name='DefaultAccessControlList',
|
||||
fields=[
|
||||
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
|
||||
('holder_id', models.PositiveIntegerField()),
|
||||
|
||||
@@ -28,7 +28,7 @@ class Migration(migrations.Migration):
|
||||
name='permission',
|
||||
),
|
||||
migrations.DeleteModel(
|
||||
name='DefaultAccessEntry',
|
||||
name='DefaultAccessControlList',
|
||||
),
|
||||
migrations.RemoveField(
|
||||
model_name='accessentry',
|
||||
|
||||
@@ -10,7 +10,7 @@ from django.utils.translation import ugettext_lazy as _
|
||||
|
||||
from permissions.models import Role, StoredPermission
|
||||
|
||||
from .managers import AccessControlListManager, AccessEntryManager
|
||||
from .managers import AccessControlListManager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -28,7 +28,7 @@ class AccessControlList(models.Model):
|
||||
object_id = models.PositiveIntegerField()
|
||||
content_object = generic.GenericForeignKey(
|
||||
ct_field='content_type',
|
||||
fk_field='object_id'
|
||||
fk_field='object_id',
|
||||
)
|
||||
# TODO: limit choices to the permissions valid for the content_object
|
||||
permissions = models.ManyToManyField(StoredPermission, blank=True, related_name='acls', verbose_name=_('Permissions'))
|
||||
@@ -44,10 +44,10 @@ class AccessControlList(models.Model):
|
||||
def __str__(self):
|
||||
return '{} <=> {}'.format(self.content_object, self.role)
|
||||
|
||||
|
||||
'''
|
||||
# TODO: remove
|
||||
@python_2_unicode_compatible
|
||||
class AccessEntry(models.Model):
|
||||
class AccessControlList(models.Model):
|
||||
"""
|
||||
Model that hold the permission, object, actor relationship
|
||||
"""
|
||||
@@ -63,7 +63,7 @@ class AccessEntry(models.Model):
|
||||
fk_field='object_id'
|
||||
)
|
||||
|
||||
objects = AccessEntryManager()
|
||||
objects = AccessControlListManager()
|
||||
|
||||
class Meta:
|
||||
verbose_name = _('Access entry')
|
||||
@@ -71,3 +71,4 @@ class AccessEntry(models.Model):
|
||||
|
||||
def __str__(self):
|
||||
return '%s: %s' % (self.content_type, self.content_object)
|
||||
'''
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from collections import namedtuple
|
||||
|
||||
ResolvedSmartLink = namedtuple('ResolvedSmartLink', ['smart_link', 'queryset'])
|
||||
@@ -1,8 +0,0 @@
|
||||
from django.db import models
|
||||
|
||||
from .classes import ResolvedSmartLink
|
||||
|
||||
|
||||
class SmartLinkManager(models.Manager):
|
||||
def get_for(self, document):
|
||||
return [ResolvedSmartLink(smart_link=smart_link, queryset=smart_link.get_linked_document_for(document)) for smart_link in self.filter(enabled=True).filter(document_types=document.document_type)]
|
||||
Reference in New Issue
Block a user