Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • djangoldp-packages/djangoldp
  • decentral1se/djangoldp
  • femmefaytale/djangoldp
  • jvtrudel/djangoldp
4 results
Show changes
Showing
with 1989 additions and 66 deletions
# -*- coding: utf-8 -*-
# Generated by Django 1.11.29 on 2020-06-17 18:17
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('djangoldp', '0011_auto_20200610_1323'),
]
operations = [
migrations.AlterField(
model_name='activity',
name='is_backlink',
field=models.BooleanField(default=False, help_text='set automatically to indicate the Model is a backlink'),
),
migrations.AlterField(
model_name='follower',
name='is_backlink',
field=models.BooleanField(default=False, help_text='set automatically to indicate the Model is a backlink'),
),
migrations.AlterField(
model_name='ldpsource',
name='is_backlink',
field=models.BooleanField(default=False, help_text='set automatically to indicate the Model is a backlink'),
),
]
# -*- coding: utf-8 -*-
# Generated by Django 1.11.29 on 2020-06-24 17:09
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('djangoldp', '0012_auto_20200617_1817'),
]
operations = [
migrations.AddField(
model_name='follower',
name='follower',
field=models.URLField(blank=True, help_text='(optional) the resource/actor following the object'),
),
migrations.AlterField(
model_name='follower',
name='inbox',
field=models.URLField(help_text='the inbox recipient of updates'),
),
migrations.AlterField(
model_name='follower',
name='object',
field=models.URLField(help_text='the object being followed'),
),
]
# Generated by Django 2.2 on 2020-09-09 22:06
from django.db import migrations, models
import django.db.models.deletion
import djangoldp.fields
class Migration(migrations.Migration):
dependencies = [
('djangoldp', '0013_auto_20200624_1709'),
]
operations = [
migrations.CreateModel(
name='ScheduledActivity',
fields=[
('activity_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='djangoldp.Activity')),
('failed_attempts', models.PositiveIntegerField(default=0, help_text='a log of how many failed retries have been made sending the activity')),
],
options={
'abstract': False,
'default_permissions': ('add', 'change', 'delete', 'view', 'control'),
},
bases=('djangoldp.activity',),
),
migrations.AlterField(
model_name='activity',
name='created_at',
field=models.DateTimeField(auto_now_add=True),
),
migrations.AlterField(
model_name='activity',
name='local_id',
field=djangoldp.fields.LDPUrlField(help_text='/inbox or /outbox url (local - this server)'),
),
migrations.AlterField(
model_name='activity',
name='payload',
field=models.BinaryField(),
),
migrations.AddField(
model_name='activity',
name='success',
field=models.BooleanField(default=False,
help_text='set to True when an Activity is successfully delivered'),
),
migrations.AddField(
model_name='activity',
name='type',
field=models.CharField(blank=True, help_text='the ActivityStreams type of the Activity', max_length=64,
null=True),
),
migrations.AddField(
model_name='activity',
name='is_finished',
field=models.BooleanField(default=True),
),
migrations.AddField(
model_name='activity',
name='response_code',
field=models.CharField(blank=True, help_text='Response code sent by receiver', max_length=8, null=True),
),
migrations.AddField(
model_name='activity',
name='response_location',
field=djangoldp.fields.LDPUrlField(blank=True, help_text='Location saved activity can be found', null=True),
),
migrations.AddField(
model_name='activity',
name='response_body',
field=models.BinaryField(null=True),
),
migrations.AddField(
model_name='activity',
name='external_id',
field=djangoldp.fields.LDPUrlField(help_text='the /inbox or /outbox url (from the sender or receiver)',
null=True),
),
migrations.RemoveField(
model_name='activity',
name='aid',
),
]
# Generated by Django 2.2.17 on 2021-01-25 18:47
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('djangoldp', '0014_auto_20200909_2206'),
]
operations = [
migrations.AlterModelOptions(
name='activity',
options={'default_permissions': ['add', 'change', 'delete', 'view', 'control']},
),
migrations.AlterModelOptions(
name='follower',
options={'default_permissions': ['add', 'change', 'delete', 'view', 'control']},
),
migrations.AlterModelOptions(
name='ldpsource',
options={'default_permissions': ['add', 'change', 'delete', 'view', 'control'], 'ordering': ('federation',)},
),
migrations.AlterModelOptions(
name='scheduledactivity',
options={'default_permissions': ['add', 'change', 'delete', 'view', 'control']},
),
]
# Generated by Django 4.2.3 on 2023-08-31 15:27
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('djangoldp', '0015_auto_20210125_1847'),
]
operations = [
migrations.AlterModelOptions(
name='activity',
options={'default_permissions': {'delete', 'change', 'view', 'add', 'control'}},
),
migrations.AlterModelOptions(
name='follower',
options={'default_permissions': {'delete', 'change', 'view', 'add', 'control'}},
),
migrations.AlterModelOptions(
name='ldpsource',
options={'default_permissions': {'delete', 'change', 'view', 'add', 'control'}, 'ordering': ('federation',)},
),
migrations.AlterModelOptions(
name='scheduledactivity',
options={'default_permissions': {'delete', 'change', 'view', 'add', 'control'}},
),
]
# Generated by Django 4.2.3 on 2023-09-03 20:26
from django.db import migrations
import djangoldp.fields
class Migration(migrations.Migration):
dependencies = [
('djangoldp', '0016_alter_activity_options_alter_follower_options_and_more'),
]
operations = [
migrations.AlterField(
model_name='activity',
name='urlid',
field=djangoldp.fields.LDPUrlField(blank=True, db_index=True, null=True, unique=True),
),
migrations.AlterField(
model_name='follower',
name='urlid',
field=djangoldp.fields.LDPUrlField(blank=True, db_index=True, null=True, unique=True),
),
migrations.AlterField(
model_name='ldpsource',
name='urlid',
field=djangoldp.fields.LDPUrlField(blank=True, db_index=True, null=True, unique=True),
),
]
# Generated by Django 4.2.3 on 2023-10-17 19:25
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('djangoldp', '0017_alter_activity_urlid_alter_follower_urlid_and_more'),
]
operations = [
migrations.AlterField(
model_name='activity',
name='payload',
field=models.TextField(),
),
migrations.AlterField(
model_name='activity',
name='response_body',
field=models.TextField(null=True),
),
]
import json
import logging
import uuid
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group
from django.core.exceptions import ObjectDoesNotExist, ValidationError, FieldDoesNotExist
from django.db import models
from django.db.models.base import ModelBase
from django.db.models.signals import post_save, pre_save, pre_delete, m2m_changed
from django.dispatch import receiver
from django.urls import get_resolver
from django.utils.datastructures import MultiValueDictKeyError
from django.utils.decorators import classonlymethod
from guardian.shortcuts import assign_perm
from rest_framework.utils import model_meta
from djangoldp.fields import LDPUrlField
from djangoldp.permissions import DEFAULT_DJANGOLDP_PERMISSIONS, OwnerPermissions, InheritPermissions, ReadOnly
logger = logging.getLogger('djangoldp')
Group._meta.serializer_fields = ['name', 'user_set']
Group._meta.rdf_type = 'foaf:Group'
# Group._meta.rdf_context = {'user_set': 'foaf:member'}
Group._meta.permission_classes = [(OwnerPermissions&ReadOnly)|InheritPermissions]
Group._meta.owner_field = 'user'
Group._meta.inherit_permissions = []
class LDPModelManager(models.Manager):
def local(self):
'''an alternative to all() which exlcudes external resources'''
queryset = super(LDPModelManager, self).all()
internal_ids = [x.pk for x in queryset if not Model.is_external(x)]
return queryset.filter(pk__in=internal_ids)
class Model(models.Model):
urlid = LDPUrlField(blank=True, null=True, unique=True, db_index=True)
is_backlink = models.BooleanField(default=False, help_text='set automatically to indicate the Model is a backlink')
allow_create_backlink = models.BooleanField(default=True,
help_text='set to False to disable backlink creation after Model save')
objects = LDPModelManager()
class LDPSource(models.Model):
container = models.URLField()
federation = models.CharField(max_length=255)
class Meta:
rdf_type = 'sib:source'
default_permissions = DEFAULT_DJANGOLDP_PERMISSIONS
abstract = True
depth = 0
def __init__(self, *args, **kwargs):
super(Model, self).__init__(*args, **kwargs)
@classmethod
def get_container_path(cls):
'''returns the url path which is used to access actions on this model (e.g. /users/)'''
path = getattr(cls._meta, 'container_path', getattr(cls.Meta, 'container_path', None))
if path is None:
path = "{}s".format(cls._meta.object_name.lower())
return cls.__clean_path(path)
def get_absolute_url(self):
return Model.absolute_url(self)
@classonlymethod
def absolute_url(cls, instance_or_model):
if isinstance(instance_or_model, ModelBase) or not instance_or_model.urlid:
return '{}{}'.format(settings.SITE_URL, Model.resource(instance_or_model))
else:
return instance_or_model.urlid
def get_container_id(self):
return Model.container_id(self)
@classonlymethod
def resource(cls, instance_or_model):
if isinstance(instance_or_model, ModelBase):
return cls.container_id(instance_or_model)
else:
return cls.resource_id(instance_or_model)
@classonlymethod
def resource_id(cls, instance):
r_id = "{}{}".format(cls.container_id(instance), getattr(instance, cls.slug_field(instance), ""))
return cls.__clean_path(r_id)
@classonlymethod
def slug_field(cls, instance_or_model):
if isinstance(instance_or_model, ModelBase):
model = instance_or_model
else:
model = type(instance_or_model)
# Use cached value if present
if hasattr(model, "_slug_field"):
return model._slug_field
object_name = model.__name__.lower()
view_name = '{}-detail'.format(object_name)
try:
slug_field = '/{}'.format(get_resolver().reverse_dict[view_name][0][0][1][0])
except MultiValueDictKeyError:
slug_field = getattr(model._meta, 'lookup_field', 'pk')
if slug_field.startswith('/'):
slug_field = slug_field[1:]
model._slug_field = slug_field
return slug_field
@classonlymethod
def container_id(cls, instance):
if isinstance(instance, cls):
path = instance.get_container_path()
else:
view_name = '{}-list'.format(instance._meta.object_name.lower())
path = get_resolver().reverse(view_name)
path = cls.__clean_path(path)
return path
@classonlymethod
def resolve_id(cls, id):
'''
Resolves the id of a given path (e.g. /container/1/)
Raises Resolver404 if the path cannot be found, ValidationError if the path is for a model base
and an ObjectDoesNotExist exception if the resource does not exist
'''
id = cls.__clean_path(id)
match = get_resolver().resolve(id)
kwargs = match.kwargs
view = match.func
if match.url_name.endswith('-list') or len(match.kwargs.keys()) == 0:
raise ValidationError('resolve_id received a path for a container or nested container')
return view.initkwargs['model'].objects.get(**kwargs)
@classonlymethod
def resolve_parent(cls, path):
split = path.strip('/').split('/')
parent_path = "/".join(split[:-1])
return Model.resolve_id(parent_path)
@classonlymethod
def resolve_container(cls, path):
'''retruns the model container of passed URL path'''
path = cls.__clean_path(path)
view, args, kwargs = get_resolver().resolve(path)
return view.initkwargs['model']
@classonlymethod
def resolve(cls, path):
'''
resolves the containing model and associated id in the path. If there is no id in the path returns None
:param path: a URL path to check
:return: the container model and resolved id in a tuple
'''
if path.startswith(settings.BASE_URL):
path = path.replace(settings.BASE_URL, '')
container = cls.resolve_container(path)
try:
resolve_id = cls.resolve_id(path)
except:
resolve_id = None
return container, resolve_id
@classonlymethod
def __clean_path(cls, path):
'''ensures path is Django-friendly'''
if not path.startswith("/"):
path = "/{}".format(path)
if not path.endswith("/"):
path = "{}/".format(path)
return path
@classonlymethod
def get_or_create(cls, model, urlid, update=False, **field_tuples):
'''
gets an object with the passed urlid if it exists, creates it if not
:param model: the model class which the object belongs to
:param update: if set to True the object will be updated with the passed field_tuples
:param field_tuples: kwargs for the model creation/updating
:return: the object, fetched or created
:raises Exception: if the object does not exist, but the data passed is invalid
'''
try:
rval = model.objects.get(urlid=urlid)
if update:
for field in field_tuples.keys():
setattr(rval, field, field_tuples[field])
rval.save()
return rval
except ObjectDoesNotExist:
if model is get_user_model():
field_tuples['username'] = str(uuid.uuid4())
return model.objects.create(urlid=urlid, is_backlink=True, **field_tuples)
@classonlymethod
def get_or_create_external(cls, model, urlid, **kwargs):
'''
checks that the parameterised urlid is external and then returns the result of Model.get_or_create
:raises ObjectDoesNotExist: if the urlid is not external and the object doesn't exist
'''
if not Model.is_external(urlid) and not model.objects.filter(urlid=urlid).exists():
raise ObjectDoesNotExist
return Model.get_or_create(model, urlid, **kwargs)
@classonlymethod
def get_subclass_with_rdf_type(cls, type):
#TODO: deprecate
'''returns Model subclass with Meta.rdf_type matching parameterised type, or None'''
if type == 'foaf:user':
return get_user_model()
def find_subclass_with_rdf_type(cls):
if getattr(cls._meta, "rdf_type", None) == type:
return cls
for subcls in cls.__subclasses__():
result = find_subclass_with_rdf_type(subcls)
if result:
return result
return None
for subcls in Model.__subclasses__():
result = find_subclass_with_rdf_type(subcls)
if result:
return result
return None
@classmethod
def is_external(cls, value):
'''
:param value: string urlid or an instance with urlid field
:return: True if the urlid is external to the server, False otherwise
'''
try:
if not value:
return False
if not isinstance(value, str):
value = value.urlid
# This expects all @ids to start with http which mlight not be universal. Maybe needs a fix.
return value.startswith('http') and not value.startswith(settings.SITE_URL)
except:
return False
#TODO: this breaks the serializer, which probably assumes that traditional models don't have a urlid.
# models.Model.urlid = property(lambda self: '{}{}'.format(settings.SITE_URL, Model.resource(self)))
class LDPSource(Model):
federation = models.CharField(max_length=255)
class Meta(Model.Meta):
rdf_type = 'sib:federatedContainer'
ordering = ('federation',)
permissions = (
('view_source', 'acl:Read'),
('control_source', 'acl:Control'),
)
container_path = 'sources'
lookup_field = 'federation'
def __str__(self):
return "{}: {}".format(self.federation, self.container)
return "{}: {}".format(self.federation, self.urlid)
class LDNotification(models.Model):
user = models.ForeignKey(settings.AUTH_USER_MODEL)
author = models.URLField()
object = models.URLField()
type = models.CharField(max_length=255)
summary = models.TextField()
class Meta:
permissions = (
('view_todo', 'Read'),
('control_todo', 'Control'),
)
class Activity(Model):
'''Models an ActivityStreams Activity'''
local_id = LDPUrlField(help_text='/inbox or /outbox url (local - this server)') # /inbox or /outbox full url
external_id = LDPUrlField(null=True, help_text='the /inbox or /outbox url (from the sender or receiver)')
payload = models.TextField()
response_location = LDPUrlField(null=True, blank=True, help_text='Location saved activity can be found')
response_code = models.CharField(null=True, blank=True, help_text='Response code sent by receiver', max_length=8)
response_body = models.TextField(null=True)
type = models.CharField(null=True, blank=True, help_text='the ActivityStreams type of the Activity',
max_length=64)
is_finished = models.BooleanField(default=True)
created_at = models.DateTimeField(auto_now_add=True)
success = models.BooleanField(default=False, help_text='set to True when an Activity is successfully delivered')
class Meta(Model.Meta):
container_path = "activities"
rdf_type = 'as:Activity'
disable_url = True
def to_activitystream(self):
return json.loads(self.payload)
def response_to_json(self):
return self.to_activitystream()
# temporary database-side storage used for scheduled tasks in the ActivityQueue
class ScheduledActivity(Activity):
failed_attempts = models.PositiveIntegerField(default=0,
help_text='a log of how many failed retries have been made sending the activity')
def save(self, *args, **kwargs):
self.is_finished = False
super(ScheduledActivity, self).save(*args, **kwargs)
class Meta(Model.Meta):
disable_url = True
class Follower(Model):
'''Models a subscription on a model. When the model is saved, an Update activity will be sent to the inbox'''
object = models.URLField(help_text='the object being followed')
inbox = models.URLField(help_text='the inbox recipient of updates')
follower = models.URLField(help_text='(optional) the resource/actor following the object', blank=True)
def __str__(self):
return 'Inbox ' + str(self.inbox) + ' on ' + str(self.object)
class Meta(Model.Meta):
disable_url = True
class DynamicNestedField:
'''
Used to define a method as a nested_field.
Usage:
LDPUser.circles = lambda self: Circle.objects.filter(members__user=self)
LDPUser.circles.field = DynamicNestedField(Circle, 'circles')
'''
related_query_name = None
one_to_many = False
many_to_many = True
many_to_one = False
one_to_one = False
read_only = True
name = ''
def __init__(self, model:models.Model|None, remote_name:str, name:str='', remote:object|None=None) -> None:
self.model = model
self.name = name
if remote:
self.remote_field = remote
else:
self.remote_field = DynamicNestedField(None, '', remote_name, self)
@receiver([post_save])
def auto_urlid(sender, instance, **kwargs):
if isinstance(instance, Model):
changed = False
if getattr(instance, Model.slug_field(instance), None) is None:
setattr(instance, Model.slug_field(instance), instance.pk)
changed = True
if (not instance.urlid or 'None' in instance.urlid):
instance.urlid = instance.get_absolute_url()
changed = True
if changed:
instance.save()
@receiver(post_save)
def create_role_groups(sender, instance, created, **kwargs):
if created:
for name, params in getattr(instance._meta, 'permission_roles', {}).items():
group, x = Group.objects.get_or_create(name=f'LDP_{instance._meta.model_name}_{name}_{instance.id}')
setattr(instance, name, group)
instance.save()
if params.get('add_author'):
assert hasattr(instance._meta, 'auto_author'), "add_author requires to also define auto_author"
author = getattr(instance, instance._meta.auto_author)
if author:
group.user_set.add(author)
for permission in params.get('perms', []):
assign_perm(f'{permission}_{instance._meta.model_name}', group, instance)
def invalidate_cache_if_has_entry(entry):
from djangoldp.serializers import GLOBAL_SERIALIZER_CACHE
if GLOBAL_SERIALIZER_CACHE.has(entry):
GLOBAL_SERIALIZER_CACHE.invalidate(entry)
def invalidate_model_cache_if_has_entry(model):
entry = getattr(model._meta, 'label', None)
invalidate_cache_if_has_entry(entry)
@receiver([pre_save, pre_delete])
def invalidate_caches(sender, instance, **kwargs):
invalidate_model_cache_if_has_entry(sender)
@receiver([m2m_changed])
def invalidate_caches_m2m(sender, instance, action, *args, **kwargs):
invalidate_model_cache_if_has_entry(kwargs['model'])
\ No newline at end of file
from rest_framework.pagination import LimitOffsetPagination
from rest_framework.pagination import PageNumberPagination
from rest_framework.response import Response
class LDPOffsetPagination(LimitOffsetPagination):
def get_paginated_response(self, data):
next_url = self.get_next_link()
previous_url = self.get_previous_link()
links = []
for url, label in ((previous_url, 'prev'), (next_url, 'next')):
if url is not None:
links.append('<{}>; rel="{}"'.format(url, label))
headers = {'Link': ', '.join(links)} if links else {}
return Response(data, headers=headers)
class LDPPagination(PageNumberPagination):
page_query_param = 'p'
page_size_query_param = 'limit'
def get_paginated_response(self, data):
next_url = self.get_next_link()
previous_url = self.get_previous_link()
links = []
for url, label in ((previous_url, 'prev'), (next_url, 'next')):
if url is not None:
links.append('<{}>; rel="{}"'.format(url, label))
headers = {'Link': ', '.join(links)} if links else {}
return Response(data, headers=headers)
from copy import copy
from django.conf import settings
from django.http import Http404
from rest_framework.permissions import BasePermission, DjangoObjectPermissions, OR, AND
from rest_framework.filters import BaseFilterBackend
from rest_framework_guardian.filters import ObjectPermissionsFilter
from djangoldp.filters import OwnerFilterBackend, NoFilterBackend, PublicFilterBackend, IPFilterBackend, ActiveFilterBackend
from djangoldp.utils import is_anonymous_user, is_authenticated_user, check_client_ip
DEFAULT_DJANGOLDP_PERMISSIONS = {'view', 'add', 'change', 'delete', 'control'}
DEFAULT_RESOURCE_PERMISSIONS = {'view', 'change', 'delete', 'control'}
DEFAULT_CONTAINER_PERMISSIONS = {'view', 'add'}
def join_filter_backends(*permissions_or_filters:BaseFilterBackend|BasePermission, model:object, union:bool=False) -> BaseFilterBackend:
'''Creates a new Filter backend by joining a list of existing backends.
It chains the filterings or joins them, depending on the argument union'''
backends = []
for permission_or_filter in permissions_or_filters:
if hasattr(permission_or_filter, 'get_filter_backend'):
backends.append(permission_or_filter.get_filter_backend(model))
elif isinstance(permission_or_filter, type) and issubclass(permission_or_filter, BaseFilterBackend):
backends.append(permission_or_filter)
class JointFilterBackend(BaseFilterBackend):
def __init__(self) -> None:
self.filters = []
for backend in backends:
if backend:
self.filters.append(backend())
def filter_queryset(self, request:object, queryset:object, view:object) -> object:
if union:
result = queryset.none() #starts with empty for union
else:
result = queryset
for filter in self.filters:
if union:
result = result | filter.filter_queryset(request, queryset, view)
else:
result = filter.filter_queryset(request, result, view)
return result
return JointFilterBackend
permission_map ={
'GET': ['%(app_label)s.view_%(model_name)s'],
'OPTIONS': [],
'HEAD': ['%(app_label)s.view_%(model_name)s'],
'POST': ['%(app_label)s.add_%(model_name)s'],
'PUT': ['%(app_label)s.change_%(model_name)s'],
'PATCH': ['%(app_label)s.change_%(model_name)s'],
'DELETE': ['%(app_label)s.delete_%(model_name)s'],
}
# Patch of OR and AND classes to enable chaining of LDPBasePermission
def OR_get_permissions(self, user, model, obj=None):
perms1 = self.op1.get_permissions(user, model, obj) if hasattr(self.op1, 'get_permissions') else set()
perms2 = self.op2.get_permissions(user, model, obj) if hasattr(self.op2, 'get_permissions') else set()
return set.union(perms1, perms2)
OR.get_permissions = OR_get_permissions
def OR_get_filter_backend(self, model):
return join_filter_backends(self.op1, self.op2, model=model, union=True)
OR.get_filter_backend = OR_get_filter_backend
OR.__repr__ = lambda self: f"{self.op1}|{self.op2}"
def AND_get_permissions(self, user, model, obj=None):
perms1 = self.op1.get_permissions(user, model, obj) if hasattr(self.op1, 'get_permissions') else set()
perms2 = self.op2.get_permissions(user, model, obj) if hasattr(self.op2, 'get_permissions') else set()
return set.intersection(perms1, perms2)
AND.get_permissions = AND_get_permissions
def AND_get_filter_backend(self, model):
return join_filter_backends(self.op1, self.op2, model=model, union=False)
AND.get_filter_backend = AND_get_filter_backend
AND.__repr__ = lambda self: f"{self.op1}&{self.op2}"
class LDPBasePermission(BasePermission):
"""
A base class from which all permission classes should inherit.
Extends the DRF permissions class to include the concept of model-permissions, separate from the view, and to
change to a system of outputting permissions sets for the serialization of WebACLs
"""
# filter backends associated with the permissions class. This will be used to filter queryset in the (auto-generated)
# view for a model, and in the serializing nested fields
filter_backend = ActiveFilterBackend
# by default, all permissions
permissions = getattr(settings, 'DJANGOLDP_PERMISSIONS', DEFAULT_DJANGOLDP_PERMISSIONS)
# perms_map defines the permissions required for different methods
perms_map = permission_map
@classmethod
def get_filter_backend(cls, model):
'''returns the Filter backend associated with this permission class'''
return cls.filter_backend
def check_all_permissions(self, required_permissions):
'''returns True if the all the permissions are included in the permissions of the class'''
return all([permission.split('.')[1].split('_')[0] in self.permissions for permission in required_permissions])
def get_allowed_methods(self):
'''returns the list of methods allowed for the permissions of the class, depending on the permission map'''
return [method for method, permissions in self.perms_map.items() if self.check_all_permissions(permissions)]
def has_permission(self, request, view):
'''checks if the request is allowed at all, based on its method and the permissions of the class'''
return request.method in self.get_allowed_methods()
def has_object_permission(self, request, view, obj=None):
'''checks if the access to the object is allowed,'''
return self.has_permission(request, view)
def get_permissions(self, user, model, obj=None):
'''returns the permissions the user has on a given model or on a given object'''
return self.permissions.intersection(DEFAULT_RESOURCE_PERMISSIONS if obj else DEFAULT_CONTAINER_PERMISSIONS)
class AnonymousReadOnly(LDPBasePermission):
"""Anonymous users can only view, no check for others"""
permissions = {'view'}
def has_permission(self, request, view):
return super().has_permission(request, view) or is_authenticated_user(request.user)
def get_permissions(self, user, model, obj=None):
if is_anonymous_user(user):
return self.permissions
else:
return super().permissions #all permissions
class AuthenticatedOnly(LDPBasePermission):
"""Only authenticated users have permissions"""
def has_permission(self, request, view):
return request.method=='OPTIONS' or is_authenticated_user(request.user)
class ReadOnly(LDPBasePermission):
"""Users can only view"""
permissions = {'view'}
class ReadAndCreate(LDPBasePermission):
"""Users can only view and create"""
permissions = {'view', 'add'}
class CreateOnly(LDPBasePermission):
"""Users can only view and create"""
permissions = {'add'}
class ACLPermissions(DjangoObjectPermissions, LDPBasePermission):
"""Permissions based on the rights given in db, on model for container requests or on object for resource requests"""
filter_backend = ObjectPermissionsFilter
perms_map = permission_map
def has_permission(self, request, view):
if view.action in ('list', 'create'): # The container permission only apply to containers requests
return super().has_permission(request, view)
return True
def get_permissions(self, user, model, obj=None):
model_name = model._meta.model_name
app_label = model._meta.app_label
if obj:
return {perm.replace('_'+model_name, '') for perm in user.get_all_permissions(obj)}
permissions = set(filter(lambda perm: perm.startswith(app_label) and perm.endswith(model_name), user.get_all_permissions()))
return {perm.replace(app_label+'.', '').replace('_'+model_name, '') for perm in permissions}
class OwnerPermissions(LDPBasePermission):
"""Gives all permissions to the owner of the object"""
filter_backend = OwnerFilterBackend
def check_permission(self, user, model, obj):
if user.is_superuser:
return True
if getattr(model._meta, 'owner_field', None):
field = model._meta.get_field(model._meta.owner_field)
if field.many_to_many or field.one_to_many:
return user in getattr(obj, field.get_accessor_name()).all()
else:
return user == getattr(obj, model._meta.owner_field)
if getattr(model._meta, 'owner_urlid_field', None) is not None:
return is_authenticated_user(user) and user.urlid == getattr(obj, model._meta.owner_urlid_field)
return True
def has_object_permission(self, request, view, obj=None):
return self.check_permission(request.user, view.model, obj)
def get_permissions(self, user, model, obj=None):
if not obj or self.check_permission(user, model, obj):
return self.permissions
return set()
class OwnerCreatePermission(LDPBasePermission):
'''only accepts the creation of new resources if the owner of the created resource is the user of the request'''
def check_patch(self, first, second, user):
diff = first - second
return diff == set() or diff == {user.urlid}
def has_permission(self, request:object, view:object) -> bool:
if request.method != 'POST':
return super().has_permission(request, view)
if is_anonymous_user(request.user):
return False
owner = None
if getattr(view.model._meta, 'owner_field', None):
field = view.model._meta.get_field(view.model._meta.owner_field)
if field.many_to_many or field.one_to_many:
owner = request.data[field.get_accessor_name()]
else:
owner = request.data[view.model._meta.owner_field]
if getattr(view.model._meta, 'owner_urlid_field', None):
owner = request.data[view.model._meta.owner_urlid_field]
return not owner or owner['@id'] == request.user.urlid
class PublicPermission(LDPBasePermission):
"""Gives read-only access to resources which have a public flag to True"""
filter_backend = PublicFilterBackend
permissions = {'view', 'add'}
def has_object_permission(self, request, view, obj=None):
assert hasattr(view.model._meta, 'public_field'), \
f'Model {view.model} has PublicPermission applied without "public_field" defined'
public_field = view.model._meta.public_field
if getattr(obj, public_field, False):
return super().has_object_permission(request, view, obj)
return False
class JoinMembersPermission(LDPBasePermission):
filter_backend = None
def has_permission(self, request:object, view:object) -> bool:
if is_anonymous_user(request.user):
return False
return request.method == 'PATCH'
def check_patch(self, first, second, user):
diff = first - second
return diff == set() or diff == {user.urlid}
def has_object_permission(self, request:object, view:object, obj:object) -> bool:
'''only accept patch request, only if the only difference on the user_set is the user'''
if not self.has_permission(request, view) or not obj or not 'user_set' in request.data:
return False
new_members = request.data['user_set']
if not isinstance(new_members, list):
new_members = [new_members]
new_ids = {user['@id'] for user in new_members}
old_ids = {user.urlid for user in obj.members.user_set.all()}
return self.check_patch(new_ids, old_ids, request.user) and self.check_patch(old_ids, new_ids, request.user)
def get_permissions(self, user, model, obj=None):
return set()
class IPOpenPermissions(LDPBasePermission):
filter_backend = IPFilterBackend
def has_permission(self, request, view):
return check_client_ip(request)
def has_object_permission(self, request, view, obj):
return check_client_ip(request)
def get_permissions(self, user, model, obj=None):
#Will always say there is no migrations, not taking the IP into accounts
return set()
class InheritPermissions(LDPBasePermission):
"""Gets the permissions from a related objects"""
@classmethod
def get_parent_fields(cls, model: object) -> list:
'''checks that the model is adequately configured and returns the associated model'''
assert hasattr(model._meta, 'inherit_permissions') and isinstance(model._meta.inherit_permissions, list), \
f'Model {model} has InheritPermissions applied without "inherit_permissions" defined as a list'
return model._meta.inherit_permissions
@classmethod
def get_parent_model(cls, model:object, field_name:str) -> object:
parent_model = model._meta.get_field(field_name).related_model
assert hasattr(parent_model._meta, 'permission_classes'), \
f'Related model {parent_model} has no "permission_classes" defined'
return parent_model
def get_parent_objects(self, obj:object, field_name:str) -> list:
'''gets the parent object'''
if obj is None:
return []
field = obj._meta.get_field(field_name)
if field.many_to_many or field.one_to_many:
return getattr(obj, field.get_accessor_name()).all()
parent = getattr(obj, field_name, None)
return [parent] if parent else []
@classmethod
def clone_with_model(self, request:object, view:object, model:object) -> tuple:
'''changes the model on the argument, so that they can be called on the parent model'''
# For some reason if we copy the request itself, we go into an infinite loop, so take the native request instead
_request = copy(request._request)
_request.model = model
_request.data = request.data #because the data is not present on the native request
_request._request = _request #so that it can be nested
_view = copy(view)
_view.queryset = None #to make sure the model is taken into account
_view.model = model
return _request, _view
@classmethod
def generate_filter_backend(cls, parent:object, field_name:str) -> BaseFilterBackend:
'''returns a new Filter backend that applies all filters of the parent model'''
filter_arg = f'{field_name}__in'
backends = {perm().get_filter_backend(parent) for perm in parent._meta.permission_classes}
class InheritFilterBackend(BaseFilterBackend):
def __init__(self) -> None:
self.filters = []
for backend in backends:
if backend:
self.filters.append(backend())
def filter_queryset(self, request:object, queryset:object, view:object) -> object:
request, view = InheritPermissions.clone_with_model(request, view, parent)
for filter in self.filters:
allowed_parents = filter.filter_queryset(request, parent.objects.all(), view)
queryset = queryset.filter(**{filter_arg: allowed_parents})
return queryset
return InheritFilterBackend
@classmethod
def generate_filter_backend_for_none(cls, fields) -> BaseFilterBackend:
'''returns a new Filter backend that checks that none of the parent fields are set'''
class InheritNoneFilterBackend(BaseFilterBackend):
def filter_queryset(self, request:object, queryset:object, view:object) -> object:
return queryset.filter(**{field: None for field in fields})
return InheritNoneFilterBackend
@classmethod
def get_filter_backend(cls, model:object) -> BaseFilterBackend:
'''Returns a union filter backend of all filter backends of parents'''
fields = cls.get_parent_fields(model)
backends = [cls.generate_filter_backend(cls.get_parent_model(model, field), field) for field in fields]
backend_none = cls.generate_filter_backend_for_none(fields)
return join_filter_backends(*backends, backend_none, model=model, union=True)
def has_permission(self, request:object, view:object) -> bool:
'''Returns True unless we're trying to create a resource with a link to a parent we're not allowed to change'''
if request.method == 'POST':
for field in InheritPermissions.get_parent_fields(view.model):
if field in request.data:
model = InheritPermissions.get_parent_model(view.model, field)
parent = model.objects.get(urlid=request.data[field]['@id'])
_request, _view = InheritPermissions.clone_with_model(request, view, model)
if not all([perm().has_object_permission(_request, _view, parent) for perm in model._meta.permission_classes]):
return False
return True
def has_object_permission(self, request:object, view:object, obj:object) -> bool:
'''Returns True if at least one inheriting object has permission'''
if not obj:
return super().has_object_permission(request, view, obj)
parents = []
for field in InheritPermissions.get_parent_fields(view.model):
model = InheritPermissions.get_parent_model(view.model, field)
parent_request, parent_view = InheritPermissions.clone_with_model(request, view, model)
for parent_object in self.get_parent_objects(obj, field):
parents.append(parent_object)
try:
if all([perm().has_object_permission(parent_request, parent_view, parent_object) for perm in model._meta.permission_classes]):
return True
except Http404:
#keep trying
pass
# return False if there were parent resources but none accepted
return False if parents else True
def get_permissions(self, user:object, model:object, obj:object=None) -> set:
'''returns a union of all inheriting linked permissions'''
perms = set()
parents = []
for field in InheritPermissions.get_parent_fields(model):
parent_model = InheritPermissions.get_parent_model(model, field)
for parent_object in self.get_parent_objects(obj, field):
parents.append(parent_object)
perms = perms.union(set.intersection(*[perm().get_permissions(user, parent_model, parent_object)
for perm in parent_model._meta.permission_classes]))
if parents:
return perms
return super().get_permissions(user, model, obj)
\ No newline at end of file
from rest_framework.utils import model_meta
def get_prefetch_fields(model, serializer, depth, prepend_string=''):
'''
This method should then be used with queryset.prefetch_related, to auto-fetch joined resources (to speed up nested serialization)
This can speed up ModelViewSet and LDPViewSet alike by as high a factor as 2
:param model: the model to be analysed
:param serializer: an LDPSerializer instance. Used to extract the fields for each nested model
:param depth: the depth at which to stop the recursion (should be set to the configured depth of the ViewSet)
:param prepend_string: should be set to the default. Used in recursive calls
:return: set of strings to prefetch for a given model. Including serialized nested fields and foreign keys recursively
called on many-to-many fields until configured depth reached
'''
# the objective is to build a list of fields and nested fields which should be prefetched for the optimisation
# of database queries
fields = set()
# get a list of all fields which would be serialized on this model
# TODO: dynamically generating serializer fields is necessary to retrieve many-to-many fields at depth > 0,
# but the _all_ default has issues detecting reverse many-to-many fields
# meta_args = {'model': model, 'depth': 0, 'fields': getattr(model._meta, 'serializer_fields', '__all__')}
# meta_class = type('Meta', (), meta_args)
# serializer = (type(LDPSerializer)('TestSerializer', (LDPSerializer,), {'Meta': meta_class}))()
serializer_fields = set([f for f in serializer.get_fields()])
empty_containers = getattr(model._meta, 'empty_containers', [])
# we are only interested in foreign keys (and many-to-many relationships)
model_relations = model_meta.get_field_info(model).relations
for field_name, relation_info in model_relations.items():
# foreign keys can be added without fuss
if not relation_info.to_many:
fields.add((prepend_string + field_name))
continue
# nested fields should be added if serialized
if field_name in serializer_fields and field_name not in empty_containers:
fields.add((prepend_string + field_name))
# and they should also have their immediate foreign keys prefetched if depth not reached
if depth >= 0:
new_prepend_str = prepend_string + field_name + '__'
fields = fields.union(get_prefetch_fields(relation_info.related_model, serializer, depth - 1, new_prepend_str))
return fields
This diff is collapsed.
<!DOCTYPE html>
<html>
<head>
<title>Swagger</title>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="stylesheet" type="text/css" href="https://unpkg.com/swagger-ui-dist@3/swagger-ui.css">
</head>
<body>
<div id="swagger-ui"></div>
<script src="https://unpkg.com/swagger-ui-dist@3/swagger-ui-bundle.js"></script>
<script>
const ui = SwaggerUIBundle({
url: "{% url 'schema' %}",
dom_id: '#swagger-ui',
presets: [
SwaggerUIBundle.presets.apis,
SwaggerUIBundle.SwaggerUIStandalonePreset
],
layout: "BaseLayout",
requestInterceptor: (request) => {
request.headers['X-CSRFToken'] = "{{ csrf_token }}"
return request;
}
})
</script>
</body>
</html>
\ No newline at end of file
"""
This module is meant to be used as a testing LDP package.
It contains configuration elements imported by a djangoldp-package
when the django server is setup.
"""
# define an extra variables
MYPACKAGEVAR = 'ok'
USE_I18N = False
# register an extra middleware
MIDDLEWARE = [
'djangoldp.tests.dummy.middleware.DummyMiddleware'
]
# register an extra installed app
INSTALLED_APPS = [
'djangoldp.tests.dummy.apps.DummyConfig'
]
SECRET_KEY = "$r&)p-4k@h5b!1yrft6&q%j)_p$lxqh6#)jeeu0z1iag&y&wdu"
from django.urls import path
from djangoldp.tests.models import Message, Conversation, Dummy, PermissionlessDummy, Task, DateModel, LDPDummy
from djangoldp.permissions import ACLPermissions
from djangoldp.views import LDPViewSet
urlpatterns = [
path('messages/', LDPViewSet.urls(model=Message, fields=["@id", "text", "conversation"], nested_fields=['conversation'])),
path('tasks/', LDPViewSet.urls(model=Task)),
path('conversations/', LDPViewSet.urls(model=Conversation, nested_fields=["message_set", "observers"])),
path('dummys/', LDPViewSet.urls(model=Dummy, lookup_field='slug',)),
path('permissionless-dummys/', LDPViewSet.urls(model=PermissionlessDummy, lookup_field='slug', permission_classes=[ACLPermissions])),
]
"""This module contains apps for testing."""
from django.apps import AppConfig
class DummyConfig(AppConfig):
# 'djangoldp.tests' is already registered as an installed app (it simulates a LDP package)
name = 'djangoldp.tests.dummy'
"""This module contains a dummy middleware for djangoldp testing."""
class DummyMiddleware(object):
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
return self.get_response(request)
This diff is collapsed.