diff --git a/.gitignore b/.gitignore index 4450e25a9279ad60226f6701474f18e72864ed74..cde3d5b2fd5c9c40159ab399b341e397c9a1e144 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,5 @@ build *~ *.swp djangoldp/tests/tests_temp.py +*/.idea/* +.DS_STORE diff --git a/djangoldp/__init__.py b/djangoldp/__init__.py index cffed812699f3ca38c55de9ec24a11b1fc3228dc..960dbcfacc5e7c2d623999303a59e81c01884687 100644 --- a/djangoldp/__init__.py +++ b/djangoldp/__init__.py @@ -5,5 +5,6 @@ __version__ = '0.0.0' options.DEFAULT_NAMES += ( 'lookup_field', 'rdf_type', 'rdf_context', 'auto_author', 'auto_author_field', 'owner_field', 'view_set', 'container_path', 'permission_classes', 'serializer_fields', 'serializer_fields_exclude', 'empty_containers', - 'nested_fields', 'nested_fields_exclude', 'depth', 'anonymous_perms', 'authenticated_perms', 'owner_perms') + 'nested_fields', 'nested_fields_exclude', 'depth', 'anonymous_perms', 'authenticated_perms', 'owner_perms', + 'superuser_perms') default_app_config = 'djangoldp.apps.DjangoldpConfig' diff --git a/djangoldp/filters.py b/djangoldp/filters.py index 4fc943c43f5d8ea9a24a4d2ef6a8fd6b1048a619..8e1b441fba895269c9d4c91c7ed07cadfd659dfd 100644 --- a/djangoldp/filters.py +++ b/djangoldp/filters.py @@ -1,7 +1,6 @@ -from django.conf import settings -from guardian.utils import get_anonymous_user from rest_framework.filters import BaseFilterBackend from rest_framework_guardian.filters import ObjectPermissionsFilter +from djangoldp.utils import is_anonymous_user class LDPPermissionsFilterBackend(ObjectPermissionsFilter): @@ -9,16 +8,37 @@ class LDPPermissionsFilterBackend(ObjectPermissionsFilter): Default FilterBackend for LDPPermissions. If user does not have model-level permissions, filters by Django-Guardian's get_objects_for_user """ + + shortcut_kwargs = { + 'accept_global_perms': False, + 'with_superuser': True + } + def filter_queryset(self, request, queryset, view): - from djangoldp.permissions import LDPPermissions + from djangoldp.models import Model + from djangoldp.permissions import LDPPermissions, ModelConfiguredPermissions # compares the requirement for GET, with what the user has on the MODEL - if LDPPermissions.has_model_view_permission(request, view.model): + ldp_permissions = LDPPermissions() + if ldp_permissions.has_container_permission(request, view): return queryset - if not request.user.is_anonymous or ( - getattr(settings, 'ANONYMOUS_USER_NAME', True) is not None and - request.user != get_anonymous_user()): - return super().filter_queryset(request, queryset, view) + + if not is_anonymous_user(request.user): + # those objects I have by grace of group or object + # first figure out if the superuser has special permissions (important to the implementation in superclass) + perms_class = ModelConfiguredPermissions() + anon_perms, auth_perms, owner_perms, superuser_perms = perms_class.get_permission_settings(view.model) + self.shortcut_kwargs['with_superuser'] = 'view' in superuser_perms + + object_perms = super().filter_queryset(request, queryset, view) + + # those objects I have by grace of being owner + if Model.get_meta(view.model, 'owner_field', None) is not None: + if 'view' in owner_perms: + owned_objects = [q.pk for q in queryset if Model.is_owner(view.model, request.user, q)] + return object_perms | queryset.filter(pk__in=owned_objects) + return object_perms + # user is anonymous without anonymous permissions return view.model.objects.none() diff --git a/djangoldp/migrations/0015_auto_20210125_1847.py b/djangoldp/migrations/0015_auto_20210125_1847.py new file mode 100644 index 0000000000000000000000000000000000000000..a4050aeabf2b582cb9da54c112dfcad0d36f1026 --- /dev/null +++ b/djangoldp/migrations/0015_auto_20210125_1847.py @@ -0,0 +1,29 @@ +# Generated by Django 2.2.17 on 2021-01-25 18:47 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('djangoldp', '0014_auto_20200909_2206'), + ] + + operations = [ + migrations.AlterModelOptions( + name='activity', + options={'default_permissions': ['add', 'change', 'delete', 'view', 'control']}, + ), + migrations.AlterModelOptions( + name='follower', + options={'default_permissions': ['add', 'change', 'delete', 'view', 'control']}, + ), + migrations.AlterModelOptions( + name='ldpsource', + options={'default_permissions': ['add', 'change', 'delete', 'view', 'control'], 'ordering': ('federation',)}, + ), + migrations.AlterModelOptions( + name='scheduledactivity', + options={'default_permissions': ['add', 'change', 'delete', 'view', 'control']}, + ), + ] diff --git a/djangoldp/models.py b/djangoldp/models.py index 332721bb2dd240ce879b2e7c3f3ad1f13cb9bcde..39df689de25095bf6f473868b22fa1bd1e54c4c4 100644 --- a/djangoldp/models.py +++ b/djangoldp/models.py @@ -1,6 +1,7 @@ import json import logging import uuid +import copy from urllib.parse import urlparse from django.conf import settings @@ -18,7 +19,7 @@ from django.utils.decorators import classonlymethod from rest_framework.utils import model_meta from djangoldp.fields import LDPUrlField -from djangoldp.permissions import LDPPermissions +from djangoldp.permissions import LDPPermissions, DEFAULT_DJANGOLDP_PERMISSIONS logger = logging.getLogger('djangoldp') @@ -156,7 +157,7 @@ class Model(models.Model): return path class Meta: - default_permissions = ('add', 'change', 'delete', 'view', 'control') + default_permissions = DEFAULT_DJANGOLDP_PERMISSIONS abstract = True depth = 0 @@ -266,7 +267,7 @@ class Model(models.Model): return None @classonlymethod - def get_permission_classes(cls, related_model, default_permissions_classes) -> LDPPermissions: + def get_permission_classes(cls, related_model, default_permissions_classes): '''returns the permission_classes set in the models Meta class''' return cls.get_meta(related_model, 'permission_classes', default_permissions_classes) @@ -281,12 +282,49 @@ class Model(models.Model): return default return getattr(model_class._meta, meta_name, meta) - @staticmethod - def get_permissions(obj_or_model, context, filter): - permissions = filter - for permission_class in Model.get_permission_classes(obj_or_model, [LDPPermissions]): - permissions = permission_class().filter_user_perms(context, obj_or_model, permissions) - return [{'mode': {'@type': name.split('_')[0]}} for name in permissions] + @classmethod + def get_model_class(cls): + return cls + + @classonlymethod + def get_container_permissions(cls, model_class, request, view, obj=None): + '''outputs the permissions given by all permissions_classes on the model_class on the model-level''' + perms = set() + view = copy.copy(view) + view.model = model_class + for permission_class in Model.get_permission_classes(model_class, [LDPPermissions]): + if hasattr(permission_class, 'get_container_permissions'): + perms = perms.union(permission_class().get_container_permissions(request, view, obj)) + return perms + + @classonlymethod + def get_object_permissions(cls, model_class, request, view, obj): + '''outputs the permissions given by all permissions_classes on the model_class on the object-level''' + perms = set() + for permission_class in Model.get_permission_classes(model_class, [LDPPermissions]): + if hasattr(permission_class, 'get_object_permissions'): + perms = perms.union(permission_class().get_object_permissions(request, view, obj)) + return perms + + @classonlymethod + def get_permissions(cls, model_class, request, view, obj=None): + '''outputs the permissions given by all permissions_classes on the model_class on both the model and the object level''' + perms = Model.get_container_permissions(model_class, request, view, obj) + if obj is not None: + perms = perms.union(Model.get_object_permissions(model_class, request, view, obj)) + return perms + + @classmethod + def is_owner(cls, model_class, user, obj): + '''returns True if I given user is the owner of given object instance, otherwise False''' + owner_field = Model.get_meta(model_class, 'owner_field') + + if owner_field is None: + return False + + return (getattr(obj, owner_field) == user + or (hasattr(user, 'urlid') and getattr(obj, owner_field) == user.urlid) + or getattr(obj, owner_field) == user.id) @classmethod def is_external(cls, value): @@ -405,7 +443,6 @@ def auto_urlid(sender, instance, **kwargs): @receiver([pre_save, pre_delete, m2m_changed]) def invalidate_caches(instance, **kwargs): from djangoldp.serializers import LDListMixin, LDPSerializer - LDPPermissions.invalidate_cache() LDListMixin.to_representation_cache.reset() if hasattr(instance, 'urlid'): diff --git a/djangoldp/permissions.py b/djangoldp/permissions.py index 6be469add913d1cebf85db07fd94377c73989e12..2abdd769b4b3aaab6acc47586a7ac9fb6016a284 100644 --- a/djangoldp/permissions.py +++ b/djangoldp/permissions.py @@ -1,113 +1,23 @@ -import time from django.conf import settings from django.contrib.auth.models import _user_get_all_permissions -from django.core.exceptions import PermissionDenied -from django.db.models.base import ModelBase +from django.contrib.auth import _get_backends from rest_framework.permissions import DjangoObjectPermissions +from djangoldp.utils import is_anonymous_user from djangoldp.filters import LDPPermissionsFilterBackend -class LDPPermissions(DjangoObjectPermissions): - # *DEFAULT* permissions for anon, auth and owner statuses - anonymous_perms = ['view'] - authenticated_perms = ['inherit'] - owner_perms = ['inherit'] - # filter backends associated with the permissions class. This will be used to filter queryset in the (auto-generated) - # view for a model, and in the serializing nested fields - filter_backends = [LDPPermissionsFilterBackend] - - perms_cache = { - 'time': time.time() - } - with_cache = getattr(settings, 'PERMISSIONS_CACHE', True) - - @classmethod - def invalidate_cache(cls): - cls.perms_cache = { - 'time': time.time() - } - - @classmethod - def refresh_cache(cls): - if (time.time() - cls.perms_cache['time']) > 5: - cls.invalidate_cache() - - @classmethod - def is_owner(cls, user, model, obj): - return obj and hasattr(model._meta, 'owner_field') and ( - getattr(obj, getattr(model._meta, 'owner_field')) == user - or (hasattr(user, 'urlid') and getattr(obj, getattr(model._meta, 'owner_field')) == user.urlid) - or getattr(obj, getattr(model._meta, 'owner_field')) == user.id) - - def _get_cache_key(self, model_name, user, obj): - user_key = 'None' if user is None else user.id - obj_key = 'None' if obj is None else obj.id - return 'User{}{}{}'.format(user_key, model_name, obj_key) - - @classmethod - def get_model_level_perms(cls, model, user, obj=None): - '''Auxiliary function returns the model-level anon-auth-owner permissions for a given, model, user and object''' - anonymous_perms = getattr(model._meta, 'anonymous_perms', cls.anonymous_perms) - authenticated_perms = getattr(model._meta, 'authenticated_perms', cls.authenticated_perms) - owner_perms = getattr(model._meta, 'owner_perms', cls.owner_perms) - - # 'inherit' permissions means inherit the permissions from the next level 'down' - if 'inherit' in authenticated_perms: - authenticated_perms = authenticated_perms + list(set(anonymous_perms) - set(authenticated_perms)) - if 'inherit' in owner_perms: - owner_perms = owner_perms + list(set(authenticated_perms) - set(owner_perms)) - - # apply user permissions and return - perms = set() - if user.is_anonymous: - perms = perms.union(set(anonymous_perms)) - else: - if cls.is_owner(user, model, obj): - perms = perms.union(set(owner_perms)) - else: - perms = perms.union(set(authenticated_perms)) - return perms - - def user_permissions(self, user, obj_or_model, obj=None): - """ - Filter user permissions for a model class - """ - self.refresh_cache() - # this may be a permission for the model class, or an instance - if isinstance(obj_or_model, ModelBase): - model = obj_or_model - else: - obj = obj_or_model - model = obj_or_model.__class__ - model_name = model._meta.model_name - - perms_cache_key = self.cache_key(model, obj, user) - if self.with_cache and perms_cache_key in self.perms_cache: - return self.perms_cache[perms_cache_key] - - # return permissions - using set to avoid duplicates - perms = self.get_model_level_perms(model, user, obj) - - if obj is not None and not user.is_anonymous: - # get permissions from all backends and then remove model name from the permissions - forbidden_string = "_" + model_name - perms = perms.union(set([p.replace(forbidden_string, '') for p in _user_get_all_permissions(user, obj)])) - - self.perms_cache[perms_cache_key] = list(perms) +DEFAULT_DJANGOLDP_PERMISSIONS = ['add', 'change', 'delete', 'view', 'control'] - return self.perms_cache[perms_cache_key] - - def cache_key(self, model, obj, user): - model_name = model._meta.model_name - user_key = 'None' if user is None else user.id - obj_key = 'None' if obj is None else obj.id - perms_cache_key = 'User{}{}{}'.format(user_key, model_name, obj_key) - return perms_cache_key - - def filter_user_perms(self, context, obj_or_model, permissions): - # Only used on Model.get_permissions to translate permissions to LDP - return [perm for perm in permissions if perm in self.user_permissions(context['request'].user, obj_or_model)] +class LDPBasePermission(DjangoObjectPermissions): + """ + A base class from which all permission classes should inherit. + Extends the DRF permissions class to include the concept of model-permissions, separate from the view, and to + change to a system of outputting permissions sets for the serialization of WebACLs + """ + # filter backends associated with the permissions class. This will be used to filter queryset in the (auto-generated) + # view for a model, and in the serializing nested fields + filter_backends = [] # perms_map defines the permissions required for different methods perms_map = { 'GET': ['%(app_label)s.view_%(model_name)s'], @@ -119,83 +29,171 @@ class LDPPermissions(DjangoObjectPermissions): 'DELETE': ['%(app_label)s.delete_%(model_name)s'], } - @classmethod - def get_permissions(cls, method, obj): + def get_container_permissions(self, request, view, obj=None): """ - Translate perms_map to request + outputs a set of permissions of a given container. Used in the generation of WebACLs in LDPSerializer """ - kwargs = { - 'app_label': obj._meta.app_label, - 'model_name': obj._meta.model_name - } + return set() - # Only allows methods that are on perms_map - if method not in cls.perms_map: - raise PermissionDenied + def get_object_permissions(self, request, view, obj): + """ + outputs the permissions of a given object instance. Used in the generation of WebACLs in LDPSerializer + """ + return set() - return [perm % kwargs for perm in cls.perms_map[method]] + def get_user_permissions(self, request, view, obj): + ''' + returns a set of all model permissions and object permissions for given parameters + You shouldn't override this function + ''' + perms = self.get_container_permissions(request, view, obj) + if obj is not None: + return perms.union(self.get_object_permissions(request, view, obj)) + return perms def has_permission(self, request, view): + """concerned with the permissions to access the _view_""" + return True + + def has_container_permission(self, request, view): """ - Access to containers + concerned with the permissions to access the _model_ + in most situations you won't need to override this. It is primarily called by has_object_permission + checked when POSTing to LDPViewSet """ + required_perms = self.get_required_permissions(request.method, view.model) + return self.compare_permissions(required_perms, self.get_container_permissions(request, view)) + + def has_object_permission(self, request, view, obj): + """concerned with the permissions to access the _object_""" + required_perms = self.get_required_permissions(request.method, view.model) + return self.compare_permissions(required_perms, self.get_user_permissions(request, view, obj)) + + def compare_permissions(self, required_perms, user_perms): + '''returns True if all user_perms are in required_perms''' + for perm in required_perms: + if not perm.split('.')[-1].split('_')[0] in user_perms: + return False + return True + + +class ModelConfiguredPermissions(LDPBasePermission): + # *DEFAULT* model-level permissions for anon, auth and owner statuses + anonymous_perms = ['view'] + authenticated_perms = ['inherit'] + owner_perms = ['inherit'] + # superuser has all permissions by default + superuser_perms = getattr(settings, 'DEFAULT_SUPERUSER_PERMS', DEFAULT_DJANGOLDP_PERMISSIONS) + + def _get_permissions_setting(self, model, setting, parent_perms=None): + '''Auxiliary function returns the configured permissions given to parameterised setting, or default''' from djangoldp.models import Model - if self.is_a_container(request._request.path): - try: - obj = Model.resolve_parent(request.path) - model = view.parent_model - except: - obj = None - model = view.model + # gets the model-configured setting or default if it exists + return_perms = Model.get_meta(model, setting, getattr(self, setting)) + + if parent_perms is not None and 'inherit' in return_perms: + return_perms = return_perms + list(set(parent_perms) - set(return_perms)) + + return return_perms + + def get_permission_settings(self, model): + '''returns a tuple of (Auth, Anon, Owner) settings for a given model''' + anonymous_perms = self._get_permissions_setting(model, 'anonymous_perms') + authenticated_perms = self._get_permissions_setting(model, 'authenticated_perms', anonymous_perms) + owner_perms = self._get_permissions_setting(model, 'owner_perms', authenticated_perms) + superuser_perms = self._get_permissions_setting(model, 'superuser_perms', owner_perms) + + return anonymous_perms, authenticated_perms, owner_perms, superuser_perms + + def get_container_permissions(self, request, view, obj=None): + '''analyses the Model's set anonymous, authenticated and owner_permissions and returns these''' + from djangoldp.models import Model + + model = view.model + anonymous_perms, authenticated_perms, owner_perms, superuser_perms = self.get_permission_settings(model) + + perms = super().get_container_permissions(request, view, obj) + if is_anonymous_user(request.user): + perms = perms.union(set(anonymous_perms)) else: - obj = Model.resolve_id(request._request.path) - model = view.model + if obj is not None and Model.is_owner(view.model, request.user, obj): + perms = perms.union(set(owner_perms)) + else: + perms = perms.union(set(authenticated_perms)) - # get permissions required - perms = LDPPermissions.get_permissions(request.method, model) - user_perms = self.user_permissions(request.user, model, obj) + if request.user.is_superuser: + perms = perms.union(set(superuser_perms)) + return perms - # compare them with the permissions I have - for perm in perms: - if not perm.split('.')[-1].split('_')[0] in user_perms: + def has_permission(self, request, view): + """concerned with the permissions to access the _view_""" + if is_anonymous_user(request.user): + if not self.has_container_permission(request, view): return False - return True - def is_a_container(self, path): + +class LDPObjectLevelPermissions(LDPBasePermission): + def get_all_user_object_permissions(self, user, obj): + return _user_get_all_permissions(user, obj) + + def get_object_permissions(self, request, view, obj): + '''overridden to append permissions from all backends given to the user (e.g. Groups and object-level perms)''' from djangoldp.models import Model - container, id = Model.resolve(path) - return id is None + + model_name = Model.get_meta(view.model, 'model_name') + + perms = super().get_object_permissions(request, view, obj) + + if obj is not None and not is_anonymous_user(request.user): + forbidden_string = "_" + model_name + return perms.union(set([p.replace(forbidden_string, '') for p in + self.get_all_user_object_permissions(request.user, obj)])) + + return perms + + +class SuperUserPermission(LDPBasePermission): + filter_backends = [] + + def get_container_permissions(self, request, view, obj=None): + if request.user.is_superuser: + return set(DEFAULT_DJANGOLDP_PERMISSIONS) + return super().get_container_permissions(request, view, obj) + + def get_object_permissions(self, request, view, obj): + if request.user.is_superuser: + return set(DEFAULT_DJANGOLDP_PERMISSIONS) + return super().get_object_permissions(request, view, obj) + + def has_permission(self, request, view): + if request.user.is_superuser: + return True + return super().has_permission(request, view) + + def has_container_permission(self, request, view): + if request.user.is_superuser: + return True + return super().has_container_permission(request, view) def has_object_permission(self, request, view, obj): - """ - Access to objects - User have permission on request: Continue - User does not have permission: 403 - """ - # get permissions required - perms = LDPPermissions.get_permissions(request.method, obj) - model = obj - user_perms = self.user_permissions(request.user, model, obj) + if request.user.is_superuser: + return True + return super().has_object_permission(request, view, obj) - return LDPPermissions.compare_permissions(perms, user_perms) - @classmethod - def has_model_view_permission(cls, request, model): - ''' - shortcut to compare the requested user's permissions on the model-level - :return: True or False - ''' - # compare required permissions with those I have (on the model) - perms = LDPPermissions.get_permissions('GET', model) - user_perms = LDPPermissions.get_model_level_perms(model, request.user) - return cls.compare_permissions(perms, user_perms) - - @classmethod - def compare_permissions(self, perms, user_perms): - # compare them with the permissions I have - for perm in perms: - if not perm.split('.')[-1].split('_')[0] in user_perms: - return False - return True +class LDPPermissions(LDPObjectLevelPermissions, ModelConfiguredPermissions): + filter_backends = [LDPPermissionsFilterBackend] + + def get_all_user_object_permissions(self, user, obj): + # if the super_user perms are no different from authenticated_perms, then we want to skip Django's auth backend + restore_super = False + if user.is_superuser: + user.is_superuser = False + restore_super = True + + perms = super().get_all_user_object_permissions(user, obj) + + user.is_superuser = restore_super + return perms diff --git a/djangoldp/serializers.py b/djangoldp/serializers.py index 36059854d0a43f712af2ef73b67ca155482a8617..c1014e1e8a562e695c30b7d5845599adbef2d718 100644 --- a/djangoldp/serializers.py +++ b/djangoldp/serializers.py @@ -29,6 +29,12 @@ from djangoldp.models import Model from djangoldp.permissions import LDPPermissions +# defaults for various DjangoLDP settings (see documentation) +SERIALIZE_EXCLUDE_PERMISSIONS_DEFAULT = ['inherit'] +SERIALIZE_EXCLUDE_CONTAINER_PERMISSIONS_DEFAULT = ['delete'] +SERIALIZE_EXCLUDE_OBJECT_PERMISSIONS_DEFAULT = [] + + class InMemoryCache: def __init__(self): @@ -60,6 +66,26 @@ class InMemoryCache: self.cache[cache_key].pop(vary, None) +def _serialize_permissions(permissions, exclude_perms): + '''takes a set or list of permissions and returns them in the JSON-LD format''' + exclude_perms = set(exclude_perms).union( + getattr(settings, 'SERIALIZE_EXCLUDE_PERMISSIONS', SERIALIZE_EXCLUDE_PERMISSIONS_DEFAULT)) + + return [{'mode': {'@type': name}} for name in permissions if name not in exclude_perms] + + +def _serialize_container_permissions(permissions): + exclude = getattr(settings, 'SERIALIZE_EXCLUDE_CONTAINER_PERMISSIONS', + SERIALIZE_EXCLUDE_CONTAINER_PERMISSIONS_DEFAULT) + return _serialize_permissions(permissions, exclude) + + +def _serialize_object_permissions(permissions): + exclude = getattr(settings, 'SERIALIZE_EXCLUDE_OBJECT_PERMISSIONS', + SERIALIZE_EXCLUDE_OBJECT_PERMISSIONS_DEFAULT) + return _serialize_permissions(permissions, exclude) + + def _ldp_container_representation(id, container_permissions=None, value=None): '''Utility function builds a LDP-format dictionary for passed container data''' represented_object = {'@id': id} @@ -114,6 +140,22 @@ class LDListMixin: - Can Add if add permission on contained object's type - Can view the container is view permission on container model : container obj are filtered by view permission ''' + def check_cache(): + '''Auxiliary function to avoid code duplication - checks cache and returns from it if it has entry''' + if not self.id.startswith('http'): + self.id = '{}{}{}'.format(settings.BASE_URL, Model.resource(parent_model), self.id) + + cache_key = self.id + if self.with_cache and self.to_representation_cache.has(cache_key, cache_vary): + return self.to_representation_cache.get(cache_key, cache_vary) + + # if this field is listed as an "empty_container" it means that it should only be serialized with @id + if getattr(self, 'parent', None) is not None and getattr(self, 'field_name', None) is not None: + empty_containers = getattr(self.parent.Meta.model._meta, 'empty_containers', None) + + if empty_containers is not None and self.field_name in empty_containers: + return _ldp_container_representation(self.id) + try: child_model = getattr(self, self.child_attr).Meta.model except AttributeError: @@ -124,14 +166,9 @@ class LDListMixin: cache_vary = str(self.context['request'].user) if not isinstance(value, Iterable) and not isinstance(value, QuerySet): - if not self.id.startswith('http'): - self.id = '{}{}{}'.format(settings.BASE_URL, Model.resource(parent_model), self.id) - - cache_key = self.id - if self.with_cache and self.to_representation_cache.has(cache_key, cache_vary): - return self.to_representation_cache.get(cache_key, cache_vary) - - container_permissions = Model.get_permissions(child_model, self.context, ['view', 'add']) + check_cache() + container_permissions = _serialize_container_permissions( + Model.get_container_permissions(child_model, self.context['request'], self.context['view'])) else: try: @@ -139,27 +176,16 @@ class LDListMixin: except: parent_model = child_model - if not self.id.startswith('http'): - self.id = '{}{}{}'.format(settings.BASE_URL, Model.resource(parent_model), self.id) - - cache_key = self.id - if self.with_cache and self.to_representation_cache.has(cache_key, cache_vary): - return self.to_representation_cache.get(cache_key, cache_vary) - - container_permissions = Model.get_permissions(child_model, self.context, ['add']) - container_permissions.extend(Model.get_permissions(parent_model, self.context, ['view'])) + check_cache() # optimize: filter the queryset automatically based on child model permissions classes (filter_backends) if isinstance(value, QuerySet) and hasattr(child_model, 'get_queryset'): value = child_model.get_queryset(self.context['request'], self.context['view'], queryset=value, model=child_model) - # if this field is listed as an "empty_container" it means that it should only be serialized with @id - if getattr(self, 'parent', None) is not None and getattr(self, 'field_name', None) is not None: - empty_containers = getattr(self.parent.Meta.model._meta, 'empty_containers', None) - - if empty_containers is not None and self.field_name in empty_containers: - return _ldp_container_representation(self.id) + container_permissions = Model.get_container_permissions(child_model, self.context['request'], self.context['view']) + container_permissions = _serialize_container_permissions(container_permissions.union( + Model.get_container_permissions(parent_model, self.context['request'], self.context['view']))) self.to_representation_cache.set(self.id, cache_vary, _ldp_container_representation(self.id, @@ -380,9 +406,14 @@ class LDPSerializer(HyperlinkedModelSerializer): data['@id'] = data.pop('urlid')['@id'] if not '@id' in data: data['@id'] = '{}{}'.format(settings.SITE_URL, Model.resource(obj)) + data = _serialize_rdf_fields(obj, data, include_context=True) - data['permissions'] = Model.get_permissions(obj, self.context, - ['view', 'change', 'control', 'delete']) + if hasattr(obj, 'get_model_class'): + model_class = obj.get_model_class() + else: + model_class = type(obj) + data['permissions'] = _serialize_object_permissions( + Model.get_permissions(model_class, self.context['request'], self.context['view'], obj)) return data @@ -411,9 +442,10 @@ class LDPSerializer(HyperlinkedModelSerializer): if isinstance(instance, QuerySet): data = list(instance) - id = '{}{}{}/'.format(settings.SITE_URL, '{}{}/', self.source) - permissions = Model.get_permissions(self.parent.Meta.model, self.context, ['view', 'add']) + permissions = _serialize_container_permissions(Model.get_permissions(self.parent.Meta.model, + self.parent.context['request'], + self.parent.context['view'])) data = [serializer.to_representation(item) if item is not None else None for item in data] return _ldp_container_representation(id, container_permissions=permissions, value=data) else: @@ -655,7 +687,7 @@ class LDPSerializer(HyperlinkedModelSerializer): def internal_create(self, validated_data, model): validated_data = self.resolve_fk_instances(model, validated_data, True) - # build tuples list of nested_field keys and their values + # build tuples list of nested_field keys and their values. All list values are considered nested fields nested_fields = [] nested_list_fields_name = list(filter(lambda key: isinstance(validated_data[key], list), validated_data)) for field_name in nested_list_fields_name: diff --git a/djangoldp/tests/djangoldp_urls.py b/djangoldp/tests/djangoldp_urls.py index 05d28e6fc0f28ed393673b56ea74697bb39e6c26..4f21ed752024e0b4aa0c752ce22fecc1416c7460 100644 --- a/djangoldp/tests/djangoldp_urls.py +++ b/djangoldp/tests/djangoldp_urls.py @@ -1,7 +1,7 @@ from django.conf.urls import re_path from djangoldp.permissions import LDPPermissions -from djangoldp.tests.models import Skill, JobOffer, Message, Conversation, Dummy, PermissionlessDummy, Task, DateModel +from djangoldp.tests.models import Skill, JobOffer, Message, Conversation, Dummy, PermissionlessDummy, Task, DateModel, LDPDummy from djangoldp.views import LDPViewSet urlpatterns = [ @@ -10,6 +10,7 @@ urlpatterns = [ re_path(r'^tasks/', LDPViewSet.urls(model=Task, permission_classes=[LDPPermissions])), re_path(r'^dates/', LDPViewSet.urls(model=DateModel, permission_classes=[LDPPermissions])), re_path(r'^dummys/', LDPViewSet.urls(model=Dummy, permission_classes=[LDPPermissions], lookup_field='slug',)), + re_path(r'^ldpdummys/', LDPViewSet.urls(model=LDPDummy, permission_classes=[LDPPermissions], nested_fields=['anons'])), re_path(r'^permissionless-dummys/', LDPViewSet.urls(model=PermissionlessDummy, permission_classes=[LDPPermissions], lookup_field='slug',)), ] diff --git a/djangoldp/tests/models.py b/djangoldp/tests/models.py index 4c94988acab2823b6fa68edf2df1b2c6e7186869..bada7baf07bd97705893047ebbc74f1584ca5e6f 100644 --- a/djangoldp/tests/models.py +++ b/djangoldp/tests/models.py @@ -1,12 +1,10 @@ from django.conf import settings from django.contrib.auth.models import AbstractUser from django.db import models -from django.db.models.signals import post_save -from django.dispatch import receiver from django.utils.datetime_safe import date from djangoldp.models import Model -from djangoldp.permissions import LDPPermissions +from djangoldp.permissions import LDPPermissions, SuperUserPermission class User(AbstractUser, Model): @@ -31,9 +29,9 @@ class Skill(Model): class Meta(Model.Meta): anonymous_perms = ['view'] - authenticated_perms = ['inherit', 'add'] - owner_perms = ['inherit', 'change', 'delete', 'control'] - serializer_fields = ["@id", "title", "recent_jobs", "slug"] + authenticated_perms = ['inherit', 'add', 'change'] + owner_perms = ['inherit', 'delete', 'control'] + serializer_fields = ["@id", "title", "recent_jobs", "slug", "obligatoire"] lookup_field = 'slug' rdf_type = 'hd:skill' @@ -87,6 +85,35 @@ class Resource(Model): rdf_type = 'hd:Resource' +# a resource in which only the owner has permissions (for testing owner permissions) +class OwnedResource(Model): + description = models.CharField(max_length=255, blank=True, null=True) + user = models.ForeignKey(settings.AUTH_USER_MODEL, blank=True, null=True, related_name="owned_resources", + on_delete=models.CASCADE) + + class Meta(Model.Meta): + anonymous_perms = [] + authenticated_perms = [] + owner_perms = ['view', 'delete', 'add', 'change', 'control'] + owner_field = 'user' + serializer_fields = ['@id', 'description', 'user'] + depth = 1 + + +class OwnedResourceVariant(Model): + description = models.CharField(max_length=255, blank=True, null=True) + user = models.ForeignKey(settings.AUTH_USER_MODEL, blank=True, null=True, related_name="owned_variant_resources", + on_delete=models.CASCADE) + + class Meta(Model.Meta): + anonymous_perms = [] + authenticated_perms = ['view', 'change'] + owner_perms = ['view', 'delete', 'add', 'change', 'control'] + owner_field = 'user' + serializer_fields = ['@id', 'description', 'user'] + depth = 1 + + class UserProfile(Model): description = models.CharField(max_length=255, blank=True, null=True) user = models.OneToOneField(settings.AUTH_USER_MODEL, related_name='userprofile', on_delete=models.CASCADE) @@ -179,8 +206,8 @@ class Invoice(Model): class Meta(Model.Meta): depth = 2 anonymous_perms = ['view'] - authenticated_perms = ['inherit', 'add'] - owner_perms = ['inherit', 'change', 'delete', 'control'] + authenticated_perms = ['inherit', 'add', 'change'] + owner_perms = ['inherit', 'delete', 'control'] class Circle(Model): @@ -232,6 +259,11 @@ class Task(models.Model): owner_perms = ['inherit', 'change', 'delete', 'control'] +class ModelTask(Model, Task): + class Meta(Model.Meta): + pass + + class Project(Model): description = models.CharField(max_length=255, null=True, blank=False) team = models.ManyToManyField(settings.AUTH_USER_MODEL, blank=True, related_name='projects') @@ -267,6 +299,16 @@ class MyAbstractModel(Model): rdf_type = "wow:defaultrdftype" -@receiver(post_save, sender=User) -def update_perms(sender, instance, created, **kwargs): - LDPPermissions.invalidate_cache() +class NoSuperUsersAllowedModel(Model): + class Meta(Model.Meta): + anonymous_perms = [] + authenticated_perms = [] + owner_perms = [] + superuser_perms = [] + permission_classes = [LDPPermissions] + + +class ComplexPermissionClassesModel(Model): + class Meta(Model.Meta): + permission_classes = [LDPPermissions, SuperUserPermission] + superuser_perms = [] diff --git a/djangoldp/tests/permissions.py b/djangoldp/tests/permissions.py deleted file mode 100644 index f8ec6d3cc4eff41cd52647cd47d422ea2ece227f..0000000000000000000000000000000000000000 --- a/djangoldp/tests/permissions.py +++ /dev/null @@ -1,35 +0,0 @@ -from django.db.models import QuerySet -from django.db.models.base import ModelBase - -from djangoldp.permissions import LDPPermissions - - -class HalfRandomPermissions(LDPPermissions): - - def prefilter_query_set(self, query_set: QuerySet, request, view, model) -> QuerySet: - if request.user.is_anonymous: - return query_set.filter(pk__in=[2, 4, 6, 8]) - else: - return super().prefilter_query_set(query_set, request, view, model) - - def user_permissions(self, user, obj_or_model, obj=None): - if isinstance(obj_or_model, ModelBase): - model = obj_or_model - else: - obj = obj_or_model - model = obj_or_model.__class__ - - # perms_cache_key = self.cache_key(model, obj, user) - # if self.with_cache and perms_cache_key in self.perms_cache: - # return self.perms_cache[perms_cache_key] - - # start with the permissions set on the object and model - perms = set(super().user_permissions(user, obj_or_model, obj)) - - if obj is not None and not isinstance(obj, ModelBase) and user.is_anonymous: - if obj.pk % 2 == 0: - return ['add', 'view'] - else: - return [] - else: - return ['view'] diff --git a/djangoldp/tests/runner.py b/djangoldp/tests/runner.py index c4cb67afef7c01f011c5e94a941ab224737a3a3f..5b41fc2bfd8b1412db5a954cfcb0aff093e95c47 100644 --- a/djangoldp/tests/runner.py +++ b/djangoldp/tests/runner.py @@ -19,11 +19,12 @@ test_runner = DiscoverRunner(verbosity=1) failures = test_runner.run_tests([ 'djangoldp.tests.tests_settings', 'djangoldp.tests.tests_ldp_model', + 'djangoldp.tests.tests_model_serializer', 'djangoldp.tests.tests_ldp_viewset', - 'djangoldp.tests.tests_save', 'djangoldp.tests.tests_user_permissions', 'djangoldp.tests.tests_guardian', 'djangoldp.tests.tests_anonymous_permissions', + 'djangoldp.tests.tests_post', 'djangoldp.tests.tests_update', 'djangoldp.tests.tests_auto_author', 'djangoldp.tests.tests_get', diff --git a/djangoldp/tests/settings_default.py b/djangoldp/tests/settings_default.py index ceacf534a73b4b57aef7847a546640e21d1400d8..e152d4d6587d9c936c9b14fbaff0ee78bf779dc2 100644 --- a/djangoldp/tests/settings_default.py +++ b/djangoldp/tests/settings_default.py @@ -23,5 +23,4 @@ server: SEND_BACKLINKS: false GUARDIAN_AUTO_PREFETCH: true SERIALIZER_CACHE: false - PERMISSIONS_CACHE: false """ diff --git a/djangoldp/tests/tests_anonymous_permissions.py b/djangoldp/tests/tests_anonymous_permissions.py index adfb255ec8da827528b4848f412b3b95c8e97f24..8fff2f19649307ae3df7faac43faff1a2df122a5 100644 --- a/djangoldp/tests/tests_anonymous_permissions.py +++ b/djangoldp/tests/tests_anonymous_permissions.py @@ -3,9 +3,7 @@ import json from django.test import TestCase from rest_framework.test import APIClient -from djangoldp.permissions import LDPPermissions from djangoldp.tests.models import JobOffer -from djangoldp.views import LDPViewSet class TestAnonymousUserPermissions(TestCase): @@ -26,6 +24,8 @@ class TestAnonymousUserPermissions(TestCase): response = self.client.post('/job-offers/', data=json.dumps(post), content_type='application/ld+json') self.assertEqual(response.status_code, 403) + # TODO: test POST request for anonymous user where it's allowed + def test_put_request_for_anonymousUser(self): body = {'title':"job_updated"} response = self.client.put('/job-offers/{}/'.format(self.job.pk), data=json.dumps(body), diff --git a/djangoldp/tests/tests_get.py b/djangoldp/tests/tests_get.py index 6f162acb91a3ecefb4e943acbf8cd75229a86070..7f705e0f3e945aa02d9204778e2baab45b45e39c 100644 --- a/djangoldp/tests/tests_get.py +++ b/djangoldp/tests/tests_get.py @@ -1,6 +1,6 @@ -from djangoldp.serializers import LDListMixin, LDPSerializer -from django.contrib.auth import get_user_model from datetime import datetime +from django.contrib.auth import get_user_model +from djangoldp.serializers import LDListMixin, LDPSerializer from rest_framework.test import APIRequestFactory, APIClient, APITestCase from djangoldp.tests.models import Post, Invoice, JobOffer, Skill, Batch, DateModel, Circle, CircleMember, UserProfile @@ -44,11 +44,10 @@ class TestGET(APITestCase): Post.objects.create(content="federated", urlid="https://external.com/posts/1/") response = self.client.get('/posts/', content_type='application/ld+json') self.assertEqual(response.status_code, 200) - self.assertIn('permissions', response.data) self.assertEquals(1, len(response.data['ldp:contains'])) self.assertIn('@type', response.data) self.assertIn('@type', response.data['ldp:contains'][0]) - self.assertEquals(2, len(response.data['permissions'])) # read and add + self.assertEquals(4, len(response.data['permissions'])) # configured anonymous permissions to give all Invoice.objects.create(title="content") response = self.client.get('/invoices/', content_type='application/ld+json') @@ -116,6 +115,17 @@ class TestGET(APITestCase): self.assertEquals(response.data['ldp:contains'][0]['invoice']['@id'], invoice.urlid) self.assertEqual(response.data['ldp:contains'][1]['@id'], distant_batch.urlid) + # TODO: https://git.startinblox.com/djangoldp-packages/djangoldp/issues/335 + # test getting a route with multiple nested fields (/job-offers/X/skills/Y/) + '''def test_get_twice_nested(self): + job = JobOffer.objects.create(title="job", slug="slug1") + skill = Skill.objects.create(title='old', obligatoire='old', slug='skill1') + job.skills.add(skill) + self.assertEqual(job.skills.count(), 1) + + response = self.client.get('/job-offers/{}/skills/{}/'.format(job.slug, skill.slug)) + self.assertEqual(response.status_code, 200)''' + def test_serializer_excludes(self): date = DateModel.objects.create(excluded='test', value=datetime.now()) response = self.client.get('/dates/{}/'.format(date.pk), content_type='application/ld+json') diff --git a/djangoldp/tests/tests_guardian.py b/djangoldp/tests/tests_guardian.py index 79a770fd154a232739c92555e113d4c430dd6be9..027423ef579811881b226d343a605b9ce3c4bb0d 100644 --- a/djangoldp/tests/tests_guardian.py +++ b/djangoldp/tests/tests_guardian.py @@ -1,6 +1,7 @@ import json import uuid from django.contrib.auth import get_user_model +from django.contrib.auth.models import Group from djangoldp.serializers import LDListMixin, LDPSerializer from rest_framework.test import APIClient, APITestCase from guardian.shortcuts import assign_perm @@ -13,58 +14,117 @@ class TestsGuardian(APITestCase): def setUp(self): self.client = APIClient(enforce_csrf_checks=True) - LDPPermissions.invalidate_cache() LDListMixin.to_representation_cache.reset() LDPSerializer.to_representation_cache.reset() def setUpLoggedInUser(self): self.user = get_user_model().objects.create_user(username='john', email='jlennon@beatles.com', password='glass onion') + self.group = Group.objects.create(name='Test') + self.user.groups.add(self.group) + self.user.save() self.client.force_authenticate(user=self.user) - LDPPermissions.invalidate_cache() LDListMixin.to_representation_cache.reset() LDPSerializer.to_representation_cache.reset() - def _get_dummy_with_perms(self, perms=None, parent=None): + def _get_dummy_with_perms(self, perms=None, parent=None, group=False): if perms is None: perms = [] dummy = PermissionlessDummy.objects.create(some='test', slug=uuid.uuid4(), parent=parent) model_name = PermissionlessDummy._meta.model_name for perm in perms: - assign_perm(perm + '_' + model_name, self.user, dummy) + perm = perm + '_' + model_name + if group: + assign_perm(perm, self.group, dummy) + else: + assign_perm(perm, self.user, dummy) return dummy # optional setup for testing PermissionlessDummy model with parameterised perms - def setUpGuardianDummyWithPerms(self, perms=None, parent=None): - self.dummy = self._get_dummy_with_perms(perms, parent) + def setUpGuardianDummyWithPerms(self, perms=None, parent=None, group=False): + self.dummy = self._get_dummy_with_perms(perms, parent, group) + + # auxiliary function converts permission format for test + def _unpack_permissions(self, perms_from_response): + return [p['mode']['@type'] for p in perms_from_response] # test that dummy with no permissions set returns no results def test_get_dummy_no_permissions(self): self.setUpLoggedInUser() self.setUpGuardianDummyWithPerms() response = self.client.get('/permissionless-dummys/{}/'.format(self.dummy.slug)) - self.assertEqual(response.status_code, 403) + self.assertEqual(response.status_code, 404) # test with anonymous user def test_get_dummy_anonymous_user(self): self.setUpGuardianDummyWithPerms() response = self.client.get('/permissionless-dummys/') + # I have no object permissions - I should receive a 403 self.assertEqual(response.status_code, 403) + def test_list_dummy_exception(self): + self.setUpLoggedInUser() + # I have permission on a permissionless dummy, but not in general + dummy_a = self._get_dummy_with_perms() + dummy_b = self._get_dummy_with_perms(['view']) + response = self.client.get('/permissionless-dummys/') + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.data['ldp:contains']), 1) + containees = [d['@id'] for d in response.data['ldp:contains']] + self.assertNotIn(dummy_a.urlid, containees) + self.assertIn(dummy_b.urlid, containees) + + def test_list_dummy_group_exception(self): + self.setUpLoggedInUser() + dummy_a = self._get_dummy_with_perms() + dummy_b = self._get_dummy_with_perms(['view'], group=True) + response = self.client.get('/permissionless-dummys/') + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.data['ldp:contains']), 1) + containees = [d['@id'] for d in response.data['ldp:contains']] + self.assertNotIn(dummy_a.urlid, containees) + self.assertIn(dummy_b.urlid, containees) + + def test_list_dummy_exception_nested_view(self): + self.setUpLoggedInUser() + parent = LDPDummy.objects.create(some="test") + # two dummies, one I have permission to view and one I don't + dummy_a = self._get_dummy_with_perms(parent=parent) + dummy_b = self._get_dummy_with_perms(['view'], parent) + response = self.client.get('/ldpdummys/{}/anons/'.format(parent.pk)) + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.data['ldp:contains']), 1) + + def test_list_dummy_exception_nested_serializer(self): + self.setUpLoggedInUser() + parent = LDPDummy.objects.create(some="test") + # two dummies, one I have permission to view and one I don't + dummy_a = self._get_dummy_with_perms(parent=parent) + dummy_b = self._get_dummy_with_perms(['view'], parent) + response = self.client.get('/ldpdummys/{}/'.format(parent.pk)) + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.data['anons']['ldp:contains']), 1) + def test_get_dummy_permission_granted(self): self.setUpLoggedInUser() self.setUpGuardianDummyWithPerms(['view']) response = self.client.get('/permissionless-dummys/{}/'.format(self.dummy.slug)) self.assertEqual(response.status_code, 200) + def test_get_dummy_group_permission_granted(self): + self.setUpLoggedInUser() + self.setUpGuardianDummyWithPerms(['view'], group=True) + response = self.client.get('/permissionless-dummys/{}/'.format(self.dummy.slug)) + self.assertEqual(response.status_code, 200) + def test_get_dummy_permission_rejected(self): self.setUpLoggedInUser() self.setUpGuardianDummyWithPerms(['view']) dummy_without = PermissionlessDummy.objects.create(some='test2', slug='test2') response = self.client.get('/permissionless-dummys/{}/'.format(dummy_without.slug)) - self.assertEqual(response.status_code, 403) + self.assertEqual(response.status_code, 404) def test_patch_dummy_permission_granted(self): self.setUpLoggedInUser() @@ -81,16 +141,17 @@ class TestsGuardian(APITestCase): body = {'some': "some_new"} response = self.client.patch('/permissionless-dummys/{}/'.format(dummy_without.slug), data=json.dumps(body), content_type='application/ld+json') - self.assertEqual(response.status_code, 403) + self.assertEqual(response.status_code, 404) + + # TODO: PUT container of many objects approved on specific resource for which I do not have _model_ permissions # test that custom permissions are returned on a model def test_custom_permissions(self): self.setUpLoggedInUser() - self.setUpGuardianDummyWithPerms(['custom_permission']) + self.setUpGuardianDummyWithPerms(['custom_permission', 'view']) - permissions = LDPPermissions() - result = permissions.user_permissions(self.user, self.dummy) - self.assertIn('custom_permission', result) + response = self.client.get('/permissionless-dummys/{}/'.format(self.dummy.slug)) + self.assertIn('custom_permission', self._unpack_permissions(response.data['permissions'])) # test that duplicate permissions aren't returned def test_no_duplicate_permissions(self): @@ -100,39 +161,11 @@ class TestsGuardian(APITestCase): assign_perm('view_' + model_name, self.user, dummy) - permissions = LDPPermissions() - result = permissions.user_permissions(self.user, dummy) - self.assertEqual(result.count('view'), 1) - - # TODO: https://git.startinblox.com/djangoldp-packages/djangoldp/issues/297 - '''def test_list_dummy_exception(self): - self.setUpLoggedInUser() - # I have permission on a permissionless dummy, but not in general - dummy_a = self._get_dummy_with_perms() - dummy_b = self._get_dummy_with_perms(['view']) - response = self.client.get('/permissionless-dummys/') + response = self.client.get('/dummys/{}/'.format(dummy.slug)) self.assertEqual(response.status_code, 200) - self.assertEqual(len(response.data['ldp:contains']), 1) - self.assertNotIn(response.data['ldp:contains'], dummy_a.urlid) - self.assertIn(response.data['ldp:contains'], dummy_b.urlid)''' - - def test_list_dummy_exception_nested_view(self): - self.setUpLoggedInUser() - parent = LDPDummy.objects.create(some="test") - # two dummies, one I have permission to view and one I don't - dummy_a = self._get_dummy_with_perms(parent=parent) - dummy_b = self._get_dummy_with_perms(['view'], parent) - response = self.client.get('/ldpdummys/{}/anons/'.format(parent.pk)) - self.assertEqual(response.status_code, 200) - self.assertEqual(len(response.data['ldp:contains']), 1) - - def test_list_dummy_exception_nested_serializer(self): - self.setUpLoggedInUser() - parent = LDPDummy.objects.create(some="test") - # two dummies, one I have permission to view and one I don't - dummy_a = self._get_dummy_with_perms(parent=parent) - dummy_b = self._get_dummy_with_perms(['view'], parent) - response = self.client.get('/ldpdummys/{}/'.format(parent.pk)) - self.assertEqual(response.status_code, 200) - self.assertEqual(len(response.data['anons']['ldp:contains']), 1) + perms = self._unpack_permissions(response.data['permissions']) + self.assertIn('view', perms) + view_perms = [perm for perm in perms if perm == 'view'] + self.assertEqual(len(view_perms), 1) + # TODO: attempting to migrate my object permissions by changing FK reference diff --git a/djangoldp/tests/tests_model_serializer.py b/djangoldp/tests/tests_model_serializer.py new file mode 100644 index 0000000000000000000000000000000000000000..e5871d1c8800a4a34ef40ebee9cd72967ec10418 --- /dev/null +++ b/djangoldp/tests/tests_model_serializer.py @@ -0,0 +1,820 @@ +import uuid +from django.conf import settings +from django.contrib.auth import get_user_model +from django.test import TestCase, override_settings +from rest_framework.test import APIRequestFactory, APIClient + +from djangoldp.serializers import LDPSerializer, LDListMixin +from djangoldp.tests.models import Post, UserProfile, Resource, Circle, CircleMember, Invoice, Batch, Task, ModelTask +from djangoldp.tests.models import Skill, JobOffer, Conversation, Message, Project + + +class LDPModelSerializerTestCase(TestCase): + def setUp(self): + self.factory = APIRequestFactory() + self.client = APIClient() + self.user = get_user_model().objects.create_user(username='john', email='jlennon@beatles.com', + password='glass onion') + self.client.force_authenticate(user=self.user) + LDListMixin.to_representation_cache.reset() + LDPSerializer.to_representation_cache.reset() + + def _get_serializer_class(self, model, depth, fields): + meta_args = {'model': model, 'depth': depth, 'fields': fields} + + meta_class = type('Meta', (), meta_args) + return type(LDPSerializer)('TestSerializer', (LDPSerializer,), {'Meta': meta_class}) + + def test_update_container_new_resource_replace(self): + # 2 pre-existing skills, one will be replaced and the other updated + redundant_skill = Skill.objects.create(title="to drop", obligatoire="obligatoire", slug="slug1") + pre_existing_skill = Skill.objects.create(title="to keep", obligatoire="obligatoire", slug="slug2") + job = JobOffer.objects.create(title="job test") + job.skills.add(redundant_skill) + job.skills.add(pre_existing_skill) + + post = {"@id": "{}/job-offers/{}/".format(settings.BASE_URL, job.slug), + "title": "job test updated", + "skills": { + "ldp:contains": [ + {"title": "new skill", "obligatoire": "okay"}, + {"@id": "{}/skills/{}/".format(settings.BASE_URL, pre_existing_skill.slug), "title": "z"}, + ]} + } + + serializer_class = self._get_serializer_class(JobOffer, 2, ("@id", "title", "skills")) + serializer = serializer_class(data=post, instance=job) + serializer.is_valid(raise_exception=True) + result = serializer.save() + + self.assertEquals(result.title, "job test updated") + self.assertIs(result.skills.count(), 2) + skills = result.skills.all().order_by("title") + self.assertEquals(skills[0].title, "new skill") + self.assertEquals(skills[0].obligatoire, "okay") + self.assertEquals(skills[1].title, "z") # updated + self.assertEquals(skills[1].obligatoire, pre_existing_skill.obligatoire) + + # TODO: https://git.startinblox.com/djangoldp-packages/djangoldp/issues/326 + ''' + def test_update_container_edit_and_new_resource_append(self): + pre_existing_skill_a = Skill.objects.create(title="to keep", obligatoire="obligatoire", slug="slug1") + pre_existing_skill_b = Skill.objects.create(title="to keep", obligatoire="obligatoire", slug="slug2") + job = JobOffer.objects.create(title="job test") + job.skills.add(pre_existing_skill_a) + job.skills.add(pre_existing_skill_b) + + post = {"@id": "{}/job-offers/{}/".format(settings.BASE_URL, job.slug), + "skills": { + "ldp:contains": [ + {"title": "new skill", "obligatoire": "okay"}, + {"@id": "{}/skills/{}/".format(settings.BASE_URL, pre_existing_skill_b.slug), "title": "z"}, + ]} + } + + serializer_class = self._get_serializer_class(JobOffer, 2, ("@id", "title", "skills")) + serializer = serializer_class(data=post, instance=job) + serializer.is_valid(raise_exception=True) + result = serializer.save(partial=True) + + self.assertEquals(result.title, job.title) + self.assertIs(result.skills.count(), 3) + skills = result.skills.all().order_by('title') + self.assertEquals(skills[0].title, "new skill") # new skill + self.assertEquals(skills[1].title, pre_existing_skill_a.title) # old skill unchanged + self.assertEquals(skills[2].title, "z") # updated + self.assertEquals(skills[2].obligatoire, pre_existing_skill_b.obligatoire) # another field not updated + ''' + + def test_update_container_edit_and_new_external_resources(self): + job = JobOffer.objects.create(title="job test") + pre_existing_external = Skill.objects.create(title="to keep", obligatoire="obligatoire", + urlid="https://external.com/skills/2/") + job.skills.add(pre_existing_external) + + post = {"@id": "{}/job-offers/{}/".format(settings.BASE_URL, job.slug), + "skills": { + "ldp:contains": [ + {"@id": "https://external.com/skills/1/", "title": "external skill", "obligatoire": "okay"}, + {"@id": "https://external.com/skills/2/", "title": "to keep", "obligatoire": "okay"}, + ]} + } + + serializer_class = self._get_serializer_class(JobOffer, 2, ("@id", "title", "skills")) + serializer = serializer_class(data=post, instance=job) + serializer.is_valid(raise_exception=True) + result = serializer.save() + + skills = result.skills.all().order_by('urlid') + self.assertEquals(result.title, job.title) + self.assertEqual(result.pk, job.pk) + self.assertEqual(result.urlid, job.urlid) + self.assertIs(result.skills.count(), 2) + self.assertEquals(skills[0].title, "external skill") # new skill + self.assertEquals(skills[0].urlid, "https://external.com/skills/1/") # new skill + self.assertEquals(skills[0].obligatoire, "okay") + self.assertEquals(skills[1].title, pre_existing_external.title) # old skill unchanged + self.assertEquals(skills[1].urlid, pre_existing_external.urlid) + self.assertEquals(skills[1].obligatoire, "okay") + self.assertEquals(skills[1].pk, pre_existing_external.pk) + + def test_update_container_attach_existing_resource(self): + job = JobOffer.objects.create(title="job test") + another_job = JobOffer.objects.create(title="job2") + pre_existing_skill = Skill.objects.create(title="to keep", obligatoire="obligatoire") + another_job.skills.add(pre_existing_skill) + + self.assertIs(job.skills.count(), 0) + + post = {"@id": "{}/job-offers/{}/".format(settings.BASE_URL, job.slug), + "skills": { + "ldp:contains": [ + {"@id": "{}/skills/{}/".format(settings.BASE_URL, pre_existing_skill.slug)}, + ]} + } + + serializer_class = self._get_serializer_class(JobOffer, 2, ("@id", "title", "skills")) + serializer = serializer_class(data=post, instance=job) + serializer.is_valid(raise_exception=True) + result = serializer.save() + + skills = result.skills.all().order_by('urlid') + self.assertEquals(result.title, job.title) + self.assertEqual(result.pk, job.pk) + self.assertEqual(result.urlid, job.urlid) + self.assertIs(result.skills.count(), 1) + self.assertEquals(skills[0].urlid, pre_existing_skill.urlid) + self.assertIs(another_job.skills.count(), 1) + self.assertIs(Skill.objects.count(), 1) + + def test_update_container_attach_existing_resource_external(self): + job = JobOffer.objects.create(title="job test") + another_job = JobOffer.objects.create(title="job2") + pre_existing_external = Skill.objects.create(title="to keep", obligatoire="obligatoire", + urlid="https://external.com/skills/2/") + another_job.skills.add(pre_existing_external) + + self.assertIs(job.skills.count(), 0) + + post = {"@id": "{}/job-offers/{}/".format(settings.BASE_URL, job.slug), + "skills": { + "ldp:contains": [ + {"@id": pre_existing_external.urlid}, + ]} + } + + serializer_class = self._get_serializer_class(JobOffer, 2, ("@id", "title", "skills")) + serializer = serializer_class(data=post, instance=job) + serializer.is_valid(raise_exception=True) + result = serializer.save() + + skills = result.skills.all().order_by('urlid') + self.assertEquals(result.title, job.title) + self.assertEqual(result.pk, job.pk) + self.assertEqual(result.urlid, job.urlid) + self.assertIs(result.skills.count(), 1) + self.assertEquals(skills[0].urlid, pre_existing_external.urlid) + self.assertIs(another_job.skills.count(), 1) + self.assertIs(Skill.objects.count(), 1) + + # TODO: https://git.startinblox.com/djangoldp-packages/djangoldp/issues/344 + def test_update_container_mismatched_type_urlid(self): + job = JobOffer.objects.create(title="job test") + another_job = JobOffer.objects.create(title="job2") + + # contains internal urlid which refers to a different type of object entirely, and one which refers to container + post = {"@id": "{}/job-offers/{}/".format(settings.BASE_URL, job.slug), + "skills": { + "ldp:contains": [ + {"@id": "{}/job-offers/{}/".format(settings.BASE_URL, another_job.slug)}, + ]} + } + + serializer_class = self._get_serializer_class(JobOffer, 2, ("@id", "title", "skills")) + serializer = serializer_class(data=post, instance=job) + serializer.is_valid(raise_exception=True) + result = serializer.save() + + # TODO: https://git.startinblox.com/djangoldp-packages/djangoldp/issues/345 + ''' + def test_update_container_mismatched_type_urlid_2(self): + job = JobOffer.objects.create(title="job test") + + # contains internal urlid which refers to a container + post = {"@id": "{}/job-offers/{}/".format(settings.BASE_URL, job.slug), + "skills": { + "ldp:contains": [ + {"@id": "{}/skills/".format(settings.BASE_URL)}, + ]} + } + + serializer_class = self._get_serializer_class(JobOffer, 2, ("@id", "title", "skills")) + serializer = serializer_class(data=post, instance=job) + serializer.is_valid(raise_exception=True) + result = serializer.save() + + # TODO: assert correct error is thrown + ''' + + # TODO: https://git.startinblox.com/djangoldp-packages/djangoldp/issues/344 + def test_update_container_mismatched_type_urlid_external(self): + job = JobOffer.objects.create(title="job test") + + # contains external mismatched urlids which refers to a container + post = {"@id": "{}/job-offers/{}/".format(settings.BASE_URL, job.slug), + "skills": { + "ldp:contains": [ + {"@id": "https://external.com/skills/"}, + ]} + } + + serializer_class = self._get_serializer_class(JobOffer, 2, ("@id", "title", "skills")) + serializer = serializer_class(data=post, instance=job) + serializer.is_valid(raise_exception=True) + result = serializer.save() + + # TODO: https://git.startinblox.com/djangoldp-packages/djangoldp/issues/346 + '''def test_update_container_attach_nonexistent_local_resource(self): + job = JobOffer.objects.create(title="job test") + + self.assertEqual(JobOffer.objects.count(), 1) + self.assertEqual(job.skills.count(), 0) + self.assertEqual(Skill.objects.count(), 0) + + post = {"@id": "{}/job-offers/{}/".format(settings.BASE_URL, job.slug), + "skills": { + "ldp:contains": [ + {"@id": "{}/skills/404/".format(settings.BASE_URL)}, + ]} + } + + serializer_class = self._get_serializer_class(JobOffer, 2, ("@id", "title", "skills")) + serializer = serializer_class(data=post, instance=job) + serializer.is_valid(raise_exception=True) + result = serializer.save() + + self.assertEqual(JobOffer.objects.count(), 1) + self.assertEqual(job.skills.count(), 0) + self.assertEqual(Skill.objects.count(), 0)''' + + # CircleMember is different to Skill because it represents a many-to-many relationship via a through model + # TODO: https://git.startinblox.com/djangoldp-packages/djangoldp/issues/333 + '''def test_update_m2m_relationship_with_through_model_add_and_edit(self): + circle = Circle.objects.create(name='test') + pre_existing = CircleMember.objects.create(user=self.user, circle=circle, is_admin=False) + another_user = get_user_model().objects.create_user(username='u2', email='u2@b.com', password='pw') + + post = { + "@id": "{}/circles/{}/".format(settings.BASE_URL, circle.pk), + "name": "Updated Name", + "members": { + "ldp:contains": [ + {"@id": "{}/circle-members/{}/".format(settings.BASE_URL, pre_existing.pk), "is_admin": True}, + {"user": {"@id": another_user.urlid }, "is_admin": False}, + ] + } + } + + serializer_class = self._get_serializer_class(Circle, 2, ("@id", "name", "description", "members", "team")) + serializer = serializer_class(data=post, instance=circle) + serializer.is_valid(raise_exception=True) + result = serializer.save() + + self.assertEquals(result.name, circle.name) + self.assertEqual(result.pk, circle.pk) + self.assertEqual(result.urlid, circle.urlid) + self.assertIs(result.members.count(), 2) + self.assertIs(result.team.count(), 2) + + members = result.members.all().order_by('pk') + self.assertEqual(members[0].user, self.user) + self.assertEqual(members[0].urlid, pre_existing.urlid) + self.assertEqual(members[0].pk, pre_existing.pk) + self.assertEqual(members[0].is_admin, True) + self.assertEqual(members[1].user, another_user) + self.assertEqual(members[1].is_admin, False) + + # TODO: variation on the above using external resources + def test_update_m2m_relationship_with_through_model_add_and_edit_external_resources(self): + pass + + # NOTE: this test if failing due to missing the 'invoice_id' field (see #333) + # variation of this test exists in tests_update.py with different behaviour + def test_update_container_twice_nested(self): + invoice = Invoice.objects.create(title='test') + pre_existing_batch = Batch.objects.create(title='batch1', invoice=invoice) + pre_existing_task = ModelTask.objects.create(title='task1', batch=pre_existing_batch) + + post = { + "@id": "{}/invoices/{}/".format(settings.BASE_URL, invoice.pk), + "title": "new", + "batches": [ + { + "@id": "{}/batchs/{}/".format(settings.BASE_URL, pre_existing_batch.pk), + "title": "new", + "tasks": [ + { + "@id": "{}/modeltasks/{}/".format(settings.BASE_URL, pre_existing_task.pk), + "title": "new" + }, + { + "title": "tache 2" + } + ] + }, + { + "title": "z", + } + ] + } + + serializer_class = self._get_serializer_class(Invoice, 2, ("@id", "title", "batches")) + serializer = serializer_class(data=post, instance=invoice) + serializer.is_valid(raise_exception=True) + result = serializer.save() + + self.assertEquals(result.title, "new") + self.assertEquals(result.urlid, invoice.urlid) + self.assertEquals(result.pk, invoice.pk) + + self.assertIs(result.batches.count(), 2) + batches = result.batches.all().order_by('title') + self.assertEquals(batches[0].title, "new") + self.assertEquals(batches[0].urlid, pre_existing_batch.urlid) + self.assertEquals(batches[1].title, "z") + + self.assertIs(batches[0].tasks.count(), 2) + tasks = batches[0].tasks.all().order_by('title') + self.assertEquals(tasks[0].title, "new") + self.assertEquals(tasks[0].urlid, pre_existing_task.urlid) + self.assertEquals(tasks[1].title, "tache 2") + + # variation on the above test with external resources + def test_update_container_twice_nested_external_resources(self): + invoice = Invoice.objects.create(urlid='https://external.com/invoices/1/', title='test') + pre_existing_batch = Batch.objects.create(urlid='https://external.com/batchs/1/', title='batch1', invoice=invoice) + pre_existing_task = ModelTask.objects.create(urlid='https://external.com/tasks/1/', title='task1', batch=pre_existing_batch) + + post = { + "@id": invoice.urlid, + "title": "new", + "batches": [ + { + "@id": pre_existing_batch.urlid, + "title": "new", + "tasks": [ + { + "@id": pre_existing_task.urlid, + "title": "new" + }, + { + "@id": "https://anotherexternal.com/tasks/1/", + "title": "tache 2" + } + ] + }, + { + "@id": "https://yetanotherexternal.com/batchs/1/", + "title": "z" + } + ] + } + + serializer_class = self._get_serializer_class(Invoice, 2, ("@id", "title", "batches")) + serializer = serializer_class(data=post, instance=invoice) + serializer.is_valid(raise_exception=True) + result = serializer.save() + + self.assertEquals(result.title, "new") + self.assertEquals(result.urlid, invoice.urlid) + self.assertEquals(result.pk, invoice.pk) + + self.assertIs(result.batches.count(), 2) + batches = result.batches.all().order_by('title') + self.assertEquals(batches[0].title, "new") + self.assertEquals(batches[0].urlid, pre_existing_batch.urlid) + self.assertEquals(batches[1].title, "z") + + self.assertIs(batches[0].tasks.count(), 2) + tasks = batches[0].tasks.all().order_by('title') + self.assertEquals(tasks[0].title, "new") + self.assertEquals(tasks[0].urlid, pre_existing_task.urlid) + self.assertEquals(tasks[1].title, "tache 2")''' + + # variation on the test where a field is omitted on each level (no changes are made) + def test_update_container_twice_nested_no_changes_missing_fields(self): + invoice = Invoice.objects.create(title='test') + pre_existing_batch = Batch.objects.create(title='batch1', invoice=invoice) + pre_existing_task = ModelTask.objects.create(title='task1', batch=pre_existing_batch) + + post = { + "@id": "{}/invoices/{}/".format(settings.BASE_URL, invoice.pk), + "batches": [ + { + "@id": "{}/batchs/{}/".format(settings.BASE_URL, pre_existing_batch.pk), + "tasks": [ + { + "@id": "{}/tasks/{}/".format(settings.BASE_URL, pre_existing_task.pk), + } + ] + } + ] + } + + serializer_class = self._get_serializer_class(Invoice, 2, ("@id", "title", "batches")) + serializer = serializer_class(data=post, instance=invoice) + serializer.is_valid(raise_exception=True) + result = serializer.save(partial=True) + + self.assertEquals(result.title, invoice.title) + self.assertEquals(result.urlid, invoice.urlid) + self.assertEquals(result.pk, invoice.pk) + + self.assertIs(result.batches.count(), 1) + batches = result.batches.all() + self.assertEquals(batches[0].title, pre_existing_batch.title) + self.assertEquals(batches[0].urlid, pre_existing_batch.urlid) + + self.assertIs(batches[0].tasks.count(), 1) + tasks = batches[0].tasks.all() + self.assertEquals(tasks[0].title, pre_existing_task.title) + + def test_update_graph_edit_and_new_resource(self): + redundant_skill = Skill.objects.create(title="to drop", obligatoire="obligatoire", slug="slug1") + skill1 = Skill.objects.create(title="skill1", obligatoire="obligatoire", slug="slug2") + skill2 = Skill.objects.create(title="skill2", obligatoire="obligatoire", slug="slug3") + job1 = JobOffer.objects.create(title="job test", slug="slug4") + job1.skills.add(redundant_skill) + + job = {"@graph": + [ + { + "@id": "{}/job-offers/{}/".format(settings.BASE_URL, job1.slug), + "title": "job test updated", + "skills": { + "ldp:contains": [ + {"@id": "{}/skills/{}/".format(settings.BASE_URL, skill1.slug)}, + {"@id": "{}/skills/{}/".format(settings.BASE_URL, skill2.slug)}, + {"@id": "_.123"}, + ]} + }, + { + "@id": "_.123", + "title": "new skill", + "obligatoire": "okay" + }, + { + "@id": "{}/skills/{}/".format(settings.BASE_URL, skill1.slug), + }, + { + "@id": "{}/skills/{}/".format(settings.BASE_URL, skill2.slug), + "title": "skill2 UP" + } + ] + } + + serializer_class = self._get_serializer_class(JobOffer, 2, ("@id", "title", "skills")) + serializer = serializer_class(data=job, instance=job1) + serializer.is_valid(raise_exception=True) + result = serializer.save() + + skills = result.skills.all().order_by('title') + + self.assertEquals(result.title, "job test updated") + self.assertIs(result.skills.count(), 3) + self.assertEquals(skills[0].title, "new skill") # new skill + self.assertEquals(skills[1].title, "skill1") # no change + self.assertEquals(skills[2].title, "skill2 UP") # title updated + + def test_update_graph_2(self): + skill = Skill.objects.create(title="to drop", obligatoire="obligatoire", slug="slug") + skill1 = Skill.objects.create(title="skill1", obligatoire="obligatoire", slug="slug1") + skill2 = Skill.objects.create(title="skill2", obligatoire="obligatoire", slug="slug2") + job1 = JobOffer.objects.create(title="job test", slug="slug1") + job1.skills.add(skill) + + job = {"@graph": + [ + { + "@id": "{}/job-offers/{}/".format(settings.BASE_URL, job1.slug), + "title": "job test updated", + "skills": { + "@id": "{}/job-offers/{}/skills/".format(settings.BASE_URL, job1.slug) + } + }, + { + "@id": "_.123", + "title": "new skill", + "obligatoire": "okay" + }, + { + "@id": "{}/skills/{}/".format(settings.BASE_URL, skill1.slug), + }, + { + "@id": "{}/skills/{}/".format(settings.BASE_URL, skill2.slug), + "title": "skill2 UP" + }, + { + '@id': "{}/job-offers/{}/skills/".format(settings.BASE_URL, job1.slug), + "ldp:contains": [ + {"@id": "{}/skills/{}/".format(settings.BASE_URL, skill1.slug)}, + {"@id": "{}/skills/{}/".format(settings.BASE_URL, skill2.slug)}, + {"@id": "_.123"}, + ] + } + ] + } + + meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills", "slug")} + + meta_class = type('Meta', (), meta_args) + serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class}) + serializer = serializer_class(data=job, instance=job1) + serializer.is_valid(raise_exception=True) + result = serializer.save() + + skills = result.skills.all().order_by('title') + + self.assertEquals(result.title, "job test updated") + self.assertIs(result.skills.count(), 3) + self.assertEquals(skills[0].title, "new skill") # new skill + self.assertEquals(skills[1].title, "skill1") # no change + self.assertEquals(skills[2].title, "skill2 UP") # title updated + self.assertEquals(skill, skill._meta.model.objects.get(pk=skill.pk)) # title updated + + def test_update_list_with_reverse_relation(self): + user1 = get_user_model().objects.create() + conversation = Conversation.objects.create(description="Conversation 1", author_user=user1) + message1 = Message.objects.create(text="Message 1", conversation=conversation, author_user=user1) + message2 = Message.objects.create(text="Message 2", conversation=conversation, author_user=user1) + + json = {"@graph": [ + { + "@id": "{}/messages/{}/".format(settings.BASE_URL, message1.pk), + "text": "Message 1 UP" + }, + { + "@id": "{}/messages/{}/".format(settings.BASE_URL, message2.pk), + "text": "Message 2 UP" + }, + { + '@id': "{}/conversations/{}/".format(settings.BASE_URL, conversation.pk), + 'description': "Conversation 1 UP", + "message_set": [ + {"@id": "{}/messages/{}/".format(settings.BASE_URL, message1.pk)}, + {"@id": "{}/messages/{}/".format(settings.BASE_URL, message2.pk)}, + ] + } + ] + } + + meta_args = {'model': Conversation, 'depth': 2, 'fields': ("@id", "description", "message_set")} + + meta_class = type('Meta', (), meta_args) + serializer_class = type(LDPSerializer)('ConversationSerializer', (LDPSerializer,), {'Meta': meta_class}) + serializer = serializer_class(data=json, instance=conversation) + serializer.is_valid(raise_exception=True) + result = serializer.save() + + messages = result.message_set.all().order_by('text') + + self.assertEquals(result.description, "Conversation 1 UP") + self.assertIs(result.message_set.count(), 2) + self.assertEquals(messages[0].text, "Message 1 UP") + self.assertEquals(messages[1].text, "Message 2 UP") + + def test_add_new_element_with_foreign_key_id(self): + user1 = get_user_model().objects.create() + conversation = Conversation.objects.create(description="Conversation 1", author_user=user1) + message1 = Message.objects.create(text="Message 1", conversation=conversation, author_user=user1) + message2 = Message.objects.create(text="Message 2", conversation=conversation, author_user=user1) + + json = {"@graph": [ + { + "@id": "{}/messages/{}/".format(settings.BASE_URL, message1.pk), + "text": "Message 1 UP", + "author_user": { + '@id': "{}/users/{}/".format(settings.BASE_URL, user1.pk) + } + }, + { + "@id": "{}/messages/{}/".format(settings.BASE_URL, message2.pk), + "text": "Message 2 UP", + "author_user": { + '@id': user1.urlid + } + }, + { + "@id": "_:b1", + "text": "Message 3 NEW", + "author_user": { + '@id': user1.urlid + } + }, + { + '@id': "{}/conversations/{}/".format(settings.BASE_URL, conversation.pk), + "author_user": { + '@id': user1.urlid + }, + 'description': "Conversation 1 UP", + 'message_set': { + "@id": "{}/conversations/{}/message_set/".format(settings.BASE_URL, conversation.pk) + } + }, + { + '@id': "{}/conversations/{}/message_set/".format(settings.BASE_URL, conversation.pk), + "ldp:contains": [ + {"@id": "{}/messages/{}/".format(settings.BASE_URL, message1.pk)}, + {"@id": "{}/messages/{}/".format(settings.BASE_URL, message2.pk)}, + {"@id": "_:b1"} + ] + } + ] + } + + meta_args = {'model': Conversation, 'depth': 2, 'fields': ("@id", "description", "message_set")} + + meta_class = type('Meta', (), meta_args) + serializer_class = type(LDPSerializer)('ConversationSerializer', (LDPSerializer,), {'Meta': meta_class}) + serializer = serializer_class(data=json, instance=conversation) + serializer.is_valid(raise_exception=True) + result = serializer.save() + + messages = result.message_set.all().order_by('text') + + self.assertEquals(result.description, "Conversation 1 UP") + self.assertIs(result.message_set.count(), 3) + self.assertEquals(messages[0].text, "Message 1 UP") + self.assertEquals(messages[1].text, "Message 2 UP") + self.assertEquals(messages[2].text, "Message 3 NEW") + + # TODO: variation on https://git.startinblox.com/djangoldp-packages/djangoldp/issues/344 + '''def test_update_container_invalid_fk_reference_given(self): + pass''' + + def test_save_m2m_graph_with_many_nested(self): + invoice = { + "@graph": [ + { + "@id": "./", + "batches": {"@id": "_:b381"}, + "title": "Nouvelle facture", + "date": "" + }, + { + "@id": "_:b381", + "tasks": {"@id": "_:b382"}, + "title": "Batch 1" + }, + { + "@id": "_:b382", + "title": "Tache 1" + } + ] + } + + meta_args = {'model': Invoice, 'depth': 2, 'fields': ("@id", "title", "batches", "date")} + + meta_class = type('Meta', (), meta_args) + serializer_class = type(LDPSerializer)('InvoiceSerializer', (LDPSerializer,), {'Meta': meta_class}) + serializer = serializer_class(data=invoice) + serializer.is_valid() + result = serializer.save() + + self.assertEquals(result.title, "Nouvelle facture") + self.assertIs(result.batches.count(), 1) + self.assertEquals(result.batches.all()[0].title, "Batch 1") + self.assertIs(result.batches.all()[0].tasks.count(), 1) + self.assertEquals(result.batches.all()[0].tasks.all()[0].title, "Tache 1") + + def test_save_m2m(self): + skill1 = Skill.objects.create(title="skill1", obligatoire="obligatoire", slug="slug1") + skill2 = Skill.objects.create(title="skill2", obligatoire="obligatoire", slug="slug2") + + job = {"title": "job test", + "slug": "slug1", + "skills": { + "ldp:contains": [ + {"@id": "{}/skills/{}/".format(settings.BASE_URL, skill1.slug)}, + {"@id": "{}/skills/{}/".format(settings.BASE_URL, skill2.slug), "title": "skill2 UP"}, + {"title": "skill3", "obligatoire": "obligatoire", "slug": "slug3"}, + ]} + } + + meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills", "slug")} + + meta_class = type('Meta', (), meta_args) + serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class}) + serializer = serializer_class(data=job) + serializer.is_valid() + result = serializer.save() + + self.assertEquals(result.title, "job test") + self.assertIs(result.skills.count(), 3) + self.assertEquals(result.skills.all()[0].title, "skill1") # no change + self.assertEquals(result.skills.all()[1].title, "skill2 UP") # title updated + self.assertEquals(result.skills.all()[2].title, "skill3") # creation on the fly + + # variation switching the http prefix of the BASE_URL in the request + @override_settings(BASE_URL='http://happy-dev.fr/') + def test_save_m2m_switch_base_url_prefix(self): + skill1 = Skill.objects.create(title="skill1", obligatoire="obligatoire", slug="slug1") + + job = {"title": "job test", + "slug": "slug1", + "skills": { + "ldp:contains": [ + {"@id": "https://happy-dev.fr/skills/{}/".format(skill1.slug)}, + ]} + } + + meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills", "slug")} + + meta_class = type('Meta', (), meta_args) + serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class}) + serializer = serializer_class(data=job) + serializer.is_valid() + result = serializer.save() + + self.assertEquals(result.title, "job test") + self.assertIs(result.skills.count(), 1) + self.assertEquals(result.skills.all()[0].title, "skill1") # no change + + def test_save_m2m_graph_simple(self): + job = {"@graph": [ + {"title": "job test", "slug": "slugjob", + }, + ]} + + meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills", "slug")} + + meta_class = type('Meta', (), meta_args) + serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class}) + serializer = serializer_class(data=job) + serializer.is_valid() + result = serializer.save() + + self.assertEquals(result.title, "job test") + self.assertIs(result.skills.count(), 0) + + def test_save_m2m_graph_with_nested(self): + skill1 = Skill.objects.create(title="skill1", obligatoire="obligatoire", slug="a") + skill2 = Skill.objects.create(title="skill2", obligatoire="obligatoire", slug="b") + + job = {"@graph": [ + {"title": "job test", + "slug": "slugj", + "skills": {"@id": "_.123"} + }, + {"@id": "_.123", "title": "skill3 NEW", "obligatoire": "obligatoire", "slug": "skill3"}, + ]} + + meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills", "slug")} + + meta_class = type('Meta', (), meta_args) + serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class}) + serializer = serializer_class(data=job) + serializer.is_valid() + result = serializer.save() + + self.assertEquals(result.title, "job test") + self.assertIs(result.skills.count(), 1) + self.assertEquals(result.skills.all()[0].title, "skill3 NEW") # creation on the fly + + def test_save_without_nested_fields(self): + skill1 = Skill.objects.create(title="skill1", obligatoire="obligatoire", slug="a") + skill2 = Skill.objects.create(title="skill2", obligatoire="obligatoire", slug="b") + job = {"title": "job test", "slug": "c"} + + meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills", "slug")} + + meta_class = type('Meta', (), meta_args) + serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class}) + serializer = serializer_class(data=job) + serializer.is_valid() + result = serializer.save() + + self.assertEquals(result.title, "job test") + self.assertIs(result.skills.count(), 0) + + def test_save_on_sub_iri(self): + """ + POST /job-offers/1/skills/ + """ + job = JobOffer.objects.create(title="job test") + skill = {"title": "new SKILL"} + + meta_args = {'model': Skill, 'depth': 2, 'fields': ("@id", "title")} + + meta_class = type('Meta', (), meta_args) + serializer_class = type(LDPSerializer)('SkillSerializer', (LDPSerializer,), {'Meta': meta_class}) + serializer = serializer_class(data=skill) + serializer.is_valid() + kwargs = {} + kwargs['joboffer'] = job + result = serializer.save(**kwargs) + + self.assertEquals(result.title, "new SKILL") + self.assertIs(result.joboffer_set.count(), 1) + self.assertEquals(result.joboffer_set.get(), job) + self.assertIs(result.joboffer_set.get().skills.count(), 1) diff --git a/djangoldp/tests/tests_save.py b/djangoldp/tests/tests_post.py similarity index 50% rename from djangoldp/tests/tests_save.py rename to djangoldp/tests/tests_post.py index b9973266b3bdf53c03ce821a12de1eaa204e8e3b..4e0322d155bd313ade475aa3db6ddfbce4890fe7 100644 --- a/djangoldp/tests/tests_save.py +++ b/djangoldp/tests/tests_post.py @@ -10,7 +10,7 @@ from djangoldp.tests.models import Skill, JobOffer, Invoice, LDPDummy, Resource, UserProfile, NotificationSetting -class Save(TestCase): +class PostTestCase(TestCase): def setUp(self): self.factory = APIRequestFactory() @@ -21,176 +21,6 @@ class Save(TestCase): LDListMixin.to_representation_cache.reset() LDPSerializer.to_representation_cache.reset() - def tearDown(self): - pass - - def test_save_m2m_graph_with_many_nested(self): - invoice = { - "@graph": [ - { - "@id": "./", - "batches": {"@id": "_:b381"}, - "title": "Nouvelle facture", - "date": "" - }, - { - "@id": "_:b381", - "tasks": {"@id": "_:b382"}, - "title": "Batch 1" - }, - { - "@id": "_:b382", - "title": "Tache 1" - } - ] - } - - meta_args = {'model': Invoice, 'depth': 2, 'fields': ("@id", "title", "batches", "date")} - - meta_class = type('Meta', (), meta_args) - serializer_class = type(LDPSerializer)('InvoiceSerializer', (LDPSerializer,), {'Meta': meta_class}) - serializer = serializer_class(data=invoice) - serializer.is_valid() - result = serializer.save() - - self.assertEquals(result.title, "Nouvelle facture") - self.assertIs(result.batches.count(), 1) - self.assertEquals(result.batches.all()[0].title, "Batch 1") - self.assertIs(result.batches.all()[0].tasks.count(), 1) - self.assertEquals(result.batches.all()[0].tasks.all()[0].title, "Tache 1") - - def test_save_m2m(self): - skill1 = Skill.objects.create(title="skill1", obligatoire="obligatoire", slug="slug1") - skill2 = Skill.objects.create(title="skill2", obligatoire="obligatoire", slug="slug2") - - job = {"title": "job test", - "slug": "slug1", - "skills": { - "ldp:contains": [ - {"@id": "{}/skills/{}/".format(settings.BASE_URL, skill1.slug)}, - {"@id": "{}/skills/{}/".format(settings.BASE_URL, skill2.slug), "title": "skill2 UP"}, - {"title": "skill3", "obligatoire": "obligatoire", "slug": "slug3"}, - ]} - } - - meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills", "slug")} - - meta_class = type('Meta', (), meta_args) - serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class}) - serializer = serializer_class(data=job) - serializer.is_valid() - result = serializer.save() - - self.assertEquals(result.title, "job test") - self.assertIs(result.skills.count(), 3) - self.assertEquals(result.skills.all()[0].title, "skill1") # no change - self.assertEquals(result.skills.all()[1].title, "skill2 UP") # title updated - self.assertEquals(result.skills.all()[2].title, "skill3") # creation on the fly - - # variation switching the http prefix of the BASE_URL in the request - @override_settings(BASE_URL='http://happy-dev.fr/') - def test_save_m2m_switch_base_url_prefix(self): - skill1 = Skill.objects.create(title="skill1", obligatoire="obligatoire", slug="slug1") - - job = {"title": "job test", - "slug": "slug1", - "skills": { - "ldp:contains": [ - {"@id": "https://happy-dev.fr/skills/{}/".format(skill1.slug)}, - ]} - } - - meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills", "slug")} - - meta_class = type('Meta', (), meta_args) - serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class}) - serializer = serializer_class(data=job) - serializer.is_valid() - result = serializer.save() - - self.assertEquals(result.title, "job test") - self.assertIs(result.skills.count(), 1) - self.assertEquals(result.skills.all()[0].title, "skill1") # no change - - def test_save_m2m_graph_simple(self): - job = {"@graph": [ - {"title": "job test", "slug": "slugjob", - }, - ]} - - meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills", "slug")} - - meta_class = type('Meta', (), meta_args) - serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class}) - serializer = serializer_class(data=job) - serializer.is_valid() - result = serializer.save() - - self.assertEquals(result.title, "job test") - self.assertIs(result.skills.count(), 0) - - def test_save_m2m_graph_with_nested(self): - skill1 = Skill.objects.create(title="skill1", obligatoire="obligatoire", slug="a") - skill2 = Skill.objects.create(title="skill2", obligatoire="obligatoire", slug="b") - - job = {"@graph": [ - {"title": "job test", - "slug": "slugj", - "skills": {"@id": "_.123"} - }, - {"@id": "_.123", "title": "skill3 NEW", "obligatoire": "obligatoire", "slug": "skill3"}, - ]} - - meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills", "slug")} - - meta_class = type('Meta', (), meta_args) - serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class}) - serializer = serializer_class(data=job) - serializer.is_valid() - result = serializer.save() - - self.assertEquals(result.title, "job test") - self.assertIs(result.skills.count(), 1) - self.assertEquals(result.skills.all()[0].title, "skill3 NEW") # creation on the fly - - def test_save_without_nested_fields(self): - skill1 = Skill.objects.create(title="skill1", obligatoire="obligatoire", slug="a") - skill2 = Skill.objects.create(title="skill2", obligatoire="obligatoire", slug="b") - job = {"title": "job test", "slug": "c"} - - meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills", "slug")} - - meta_class = type('Meta', (), meta_args) - serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class}) - serializer = serializer_class(data=job) - serializer.is_valid() - result = serializer.save() - - self.assertEquals(result.title, "job test") - self.assertIs(result.skills.count(), 0) - - def test_save_on_sub_iri(self): - """ - POST /job-offers/1/skills/ - """ - job = JobOffer.objects.create(title="job test") - skill = {"title": "new SKILL"} - - meta_args = {'model': Skill, 'depth': 2, 'fields': ("@id", "title")} - - meta_class = type('Meta', (), meta_args) - serializer_class = type(LDPSerializer)('SkillSerializer', (LDPSerializer,), {'Meta': meta_class}) - serializer = serializer_class(data=skill) - serializer.is_valid() - kwargs = {} - kwargs['joboffer'] = job - result = serializer.save(**kwargs) - - self.assertEquals(result.title, "new SKILL") - self.assertIs(result.joboffer_set.count(), 1) - self.assertEquals(result.joboffer_set.get(), job) - self.assertIs(result.joboffer_set.get().skills.count(), 1) - def test_save_fk_graph_with_nested(self): post = { '@graph': [ @@ -232,42 +62,6 @@ class Save(TestCase): self.assertEquals(response.data['title'], "title") self.assertEquals(response.data['invoice']['title'], "title 3") - #Â https://www.w3.org/TR/json-ld/#value-objects - def test_save_field_with_value_object(self): - post = { - 'http://happy-dev.fr/owl/#title': { - '@value': "title", - '@language': "en" - } - } - response = self.client.post('/invoices/', data=json.dumps(post), content_type='application/ld+json') - self.assertEqual(response.status_code, 201) - self.assertEquals(response.data['title'], "title") - - # from JSON-LD spec: "The value associated with the @value key MUST be either a string, a number, true, false or null" - def test_save_field_with_invalid_value_object(self): - invoice = Invoice.objects.create(title="title 3") - post = { - 'http://happy-dev.fr/owl/#invoice': { - '@value': {'title': 'title', '@id': "https://happy-dev.fr{}{}/".format(Model.container_id(invoice), invoice.id)} - } - } - response = self.client.post('/batchs/', data=json.dumps(post), content_type='application/ld+json') - self.assertEqual(response.status_code, 400) - - # TODO: bug with PyLD: https://github.com/digitalbazaar/pyld/issues/142 - # from JSON-LD spec: "If the value associated with the @type key is @json, the value MAY be either an array or an object" - '''def test_save_field_with_object_value_object(self): - invoice = Invoice.objects.create(title="title 3") - post = { - 'http://happy-dev.fr/owl/#invoice': { - '@value': {'title': 'title', '@id': "https://happy-dev.fr{}{}/".format(Model.container_id(invoice), invoice.id)}, - '@type': '@json' - } - } - response = self.client.post('/batchs/', data=json.dumps(post), content_type='application/ld+json') - self.assertEqual(response.status_code, 201)''' - def test_post_should_accept_missing_field_id_nullable(self): body = [ { @@ -455,69 +249,41 @@ class Save(TestCase): response = self.client.get('/projects/{}/'.format(project.pk)) self.assertEqual(response.data['team']['ldp:contains'][0]['@id'], "http://external.user/user/1/") - # unit tests for a specific bug: https://git.startinblox.com/djangoldp-packages/djangoldp/issues/307 - def test_direct_boolean_field(self): - profile = UserProfile.objects.create(user=self.user) - setting = NotificationSetting.objects.create(user=profile, receiveMail=False) - body = { - 'http://happy-dev.fr/owl/#@id': setting.urlid, - 'receiveMail': True, - "@context": {"@vocab": "http://happy-dev.fr/owl/#", "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#", - "rdfs": "http://www.w3.org/2000/01/rdf-schema#", "ldp": "http://www.w3.org/ns/ldp#", - "foaf": "http://xmlns.com/foaf/0.1/", "name": "rdfs:label", - "acl": "http://www.w3.org/ns/auth/acl#", "permissions": "acl:accessControl", - "mode": "acl:mode", "geo": "http://www.w3.org/2003/01/geo/wgs84_pos#", "lat": "geo:lat", - "lng": "geo:long"} + # Â https://www.w3.org/TR/json-ld/#value-objects + def test_post_field_with_value_object(self): + post = { + 'http://happy-dev.fr/owl/#title': { + '@value': "title", + '@language': "en" + } } + response = self.client.post('/invoices/', data=json.dumps(post), content_type='application/ld+json') + self.assertEqual(response.status_code, 201) + self.assertEquals(response.data['title'], "title") - response = self.client.patch('/notificationsettings/{}/'.format(setting.pk), - data=json.dumps(body), - content_type='application/ld+json') - self.assertEqual(response.status_code, 200) - self.assertEqual(response.data['receiveMail'], True) - - def test_nested_container_boolean_field_no_slug(self): - profile = UserProfile.objects.create(user=self.user) - setting = NotificationSetting.objects.create(user=profile, receiveMail=False) - body = { - 'settings': { - 'http://happy-dev.fr/owl/#@id': setting.urlid, - 'receiveMail': True - }, - "@context": {"@vocab": "http://happy-dev.fr/owl/#", "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#", - "rdfs": "http://www.w3.org/2000/01/rdf-schema#", "ldp": "http://www.w3.org/ns/ldp#", - "foaf": "http://xmlns.com/foaf/0.1/", "name": "rdfs:label", - "acl": "http://www.w3.org/ns/auth/acl#", "permissions": "acl:accessControl", - "mode": "acl:mode", "geo": "http://www.w3.org/2003/01/geo/wgs84_pos#", "lat": "geo:lat", - "lng": "geo:long"} + # from JSON-LD spec: "The value associated with the @value key MUST be either a string, a number, true, false or null" + def test_save_field_with_invalid_value_object(self): + invoice = Invoice.objects.create(title="title 3") + post = { + 'http://happy-dev.fr/owl/#invoice': { + '@value': {'title': 'title', + '@id': "https://happy-dev.fr{}{}/".format(Model.container_id(invoice), invoice.id)} + } } + response = self.client.post('/batchs/', data=json.dumps(post), content_type='application/ld+json') + self.assertEqual(response.status_code, 400) - response = self.client.patch('/userprofiles/{}/'.format(profile.slug), - data=json.dumps(body), - content_type='application/ld+json') - self.assertEqual(response.status_code, 200) - self.assertEqual(response.data['settings']['receiveMail'], True) - - # variation where the lookup_field for NotificationSetting (pk) is provided - def test_nested_container_boolean_field_with_slug(self): - profile = UserProfile.objects.create(user=self.user) - setting = NotificationSetting.objects.create(user=profile, receiveMail=False) - body = { - 'settings': { - 'pk': setting.pk, - 'http://happy-dev.fr/owl/#@id': setting.urlid, - 'receiveMail': True - }, - "@context": {"@vocab": "http://happy-dev.fr/owl/#", "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#", - "rdfs": "http://www.w3.org/2000/01/rdf-schema#", "ldp": "http://www.w3.org/ns/ldp#", - "foaf": "http://xmlns.com/foaf/0.1/", "name": "rdfs:label", - "acl": "http://www.w3.org/ns/auth/acl#", "permissions": "acl:accessControl", - "mode": "acl:mode", "geo": "http://www.w3.org/2003/01/geo/wgs84_pos#", "lat": "geo:lat", - "lng": "geo:long"} + # TODO: bug with PyLD: https://github.com/digitalbazaar/pyld/issues/142 + # from JSON-LD spec: "If the value associated with the @type key is @json, the value MAY be either an array or an object" + ''' + def test_save_field_with_object_value_object(self): + invoice = Invoice.objects.create(title="title 3") + post = { + 'http://happy-dev.fr/owl/#invoice': { + '@value': {'title': 'title', '@id': "https://happy-dev.fr{}{}/".format(Model.container_id(invoice), invoice.id)}, + '@type': '@json' + } } - - response = self.client.patch('/userprofiles/{}/'.format(profile.slug), - data=json.dumps(body), - content_type='application/ld+json') - self.assertEqual(response.status_code, 200) - self.assertEqual(response.data['settings']['receiveMail'], True) + response = self.client.post('/batchs/', data=json.dumps(post), content_type='application/ld+json') + self.assertEqual(response.status_code, 201) + ''' \ No newline at end of file diff --git a/djangoldp/tests/tests_update.py b/djangoldp/tests/tests_update.py index 1f9e3adc4f82056f34ef476ed64cd39215390bf5..bb31b7e1abd92a75f9951e3074e5945c2e43a54d 100644 --- a/djangoldp/tests/tests_update.py +++ b/djangoldp/tests/tests_update.py @@ -6,8 +6,8 @@ from rest_framework.test import APIRequestFactory, APIClient from rest_framework.utils import json from djangoldp.serializers import LDPSerializer, LDListMixin -from djangoldp.tests.models import Post, UserProfile, Resource, Circle -from djangoldp.tests.models import Skill, JobOffer, Conversation, Message, Project +from djangoldp.tests.models import Post, UserProfile, Resource, Circle, CircleMember, Invoice, Batch, Task, Skill, JobOffer, \ + Conversation, Message, Project, NotificationSetting class Update(TestCase): @@ -21,270 +21,61 @@ class Update(TestCase): LDListMixin.to_representation_cache.reset() LDPSerializer.to_representation_cache.reset() - def tearDown(self): - pass - - def test_update(self): - skill = Skill.objects.create(title="to drop", obligatoire="obligatoire", slug="slug1") - skill1 = Skill.objects.create(title="skill1", obligatoire="obligatoire", slug="slug2") - skill2 = Skill.objects.create(title="skill2", obligatoire="obligatoire", slug="slug3") - job1 = JobOffer.objects.create(title="job test") - job1.skills.add(skill) - - job = {"@id": "{}/job-offers/{}/".format(settings.BASE_URL, job1.slug), - "title": "job test updated", - "skills": { - "ldp:contains": [ - {"title": "new skill", "obligatoire": "okay"}, - {"@id": "{}/skills/{}/".format(settings.BASE_URL, skill1.slug)}, - {"@id": "{}/skills/{}/".format(settings.BASE_URL, skill2.slug), "title": "skill2 UP"}, - ]} - } - - meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills")} - - meta_class = type('Meta', (), meta_args) - serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class}) - serializer = serializer_class(data=job, instance=job1) - serializer.is_valid() - result = serializer.save() - - self.assertEquals(result.title, "job test updated") - self.assertIs(result.skills.count(), 3) - skills = result.skills.all().order_by('title') - self.assertEquals(skills[0].title, "new skill") # new skill - self.assertEquals(skills[1].title, "skill1") # no change - self.assertEquals(skills[2].title, "skill2 UP") # title updated - - def test_update_graph(self): - skill = Skill.objects.create(title="to drop", obligatoire="obligatoire", slug="slug1") - skill1 = Skill.objects.create(title="skill1", obligatoire="obligatoire", slug="slug2") - skill2 = Skill.objects.create(title="skill2", obligatoire="obligatoire", slug="slug3") - job1 = JobOffer.objects.create(title="job test", slug="slug4") - job1.skills.add(skill) - - job = {"@graph": - [ - { - "@id": "{}/job-offers/{}/".format(settings.BASE_URL, job1.slug), - "title": "job test updated", - "skills": { - "ldp:contains": [ - {"@id": "{}/skills/{}/".format(settings.BASE_URL, skill1.slug)}, - {"@id": "{}/skills/{}/".format(settings.BASE_URL, skill2.slug)}, - {"@id": "_.123"}, - ]} - }, - { - "@id": "_.123", - "title": "new skill", - "obligatoire": "okay" - }, - { - "@id": "{}/skills/{}/".format(settings.BASE_URL, skill1.slug), - }, - { - "@id": "{}/skills/{}/".format(settings.BASE_URL, skill2.slug), - "title": "skill2 UP" - } - ] - } - - meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills")} - - meta_class = type('Meta', (), meta_args) - serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class}) - serializer = serializer_class(data=job, instance=job1) - serializer.is_valid() - result = serializer.save() - - skills = result.skills.all().order_by('title') - - self.assertEquals(result.title, "job test updated") - self.assertIs(result.skills.count(), 3) - self.assertEquals(skills[0].title, "new skill") # new skill - self.assertEquals(skills[1].title, "skill1") # no change - self.assertEquals(skills[2].title, "skill2 UP") # title updated - - def test_update_graph_2(self): - skill = Skill.objects.create(title="to drop", obligatoire="obligatoire", slug="slug") - skill1 = Skill.objects.create(title="skill1", obligatoire="obligatoire", slug="slug1") - skill2 = Skill.objects.create(title="skill2", obligatoire="obligatoire", slug="slug2") - job1 = JobOffer.objects.create(title="job test", slug="slug1") - job1.skills.add(skill) - - job = {"@graph": - [ - { - "@id": "{}/job-offers/{}/".format(settings.BASE_URL, job1.slug), - "title": "job test updated", - "skills": { - "@id": "{}/job-offers/{}/skills/".format(settings.BASE_URL, job1.slug) - } - }, - { - "@id": "_.123", - "title": "new skill", - "obligatoire": "okay" - }, - { - "@id": "{}/skills/{}/".format(settings.BASE_URL, skill1.slug), - }, - { - "@id": "{}/skills/{}/".format(settings.BASE_URL, skill2.slug), - "title": "skill2 UP" - }, - { - '@id': "{}/job-offers/{}/skills/".format(settings.BASE_URL, job1.slug), + # TODO: https://git.startinblox.com/djangoldp-packages/djangoldp/issues/326 + ''' + def test_update_container_append_resource(self): + pre_existing_skill_a = Skill.objects.create(title="to keep", obligatoire="obligatoire", slug="slug1") + pre_existing_skill_b = Skill.objects.create(title="to keep", obligatoire="obligatoire", slug="slug2") + job = JobOffer.objects.create(title="job test") + job.skills.add(pre_existing_skill_a) + job.skills.add(pre_existing_skill_b) + + post = {"@id": "{}/job-offers/{}/".format(settings.BASE_URL, job.slug), + "skills": { "ldp:contains": [ - {"@id": "{}/skills/{}/".format(settings.BASE_URL, skill1.slug)}, - {"@id": "{}/skills/{}/".format(settings.BASE_URL, skill2.slug)}, - {"@id": "_.123"}, - ] + {"title": "new skill", "obligatoire": "okay"}, + {"@id": "{}/skills/{}/".format(settings.BASE_URL, pre_existing_skill_b.slug), "title": "z"}, + ]} } - ] - } - - meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills", "slug")} - - meta_class = type('Meta', (), meta_args) - serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class}) - serializer = serializer_class(data=job, instance=job1) - serializer.is_valid() - result = serializer.save() - skills = result.skills.all().order_by('title') + response = self.client.patch('/job-offers/{}/'.format(job.slug), + data=json.dumps(post), + content_type='application/ld+json') + self.assertEquals(response.status_code, 200) - self.assertEquals(result.title, "job test updated") - self.assertIs(result.skills.count(), 3) + self.assertEquals(response.data['title'], job.title) + self.assertIs(job.skills.count(), 3) + skills = job.skills.all().order_by('title') self.assertEquals(skills[0].title, "new skill") # new skill - self.assertEquals(skills[1].title, "skill1") # no change - self.assertEquals(skills[2].title, "skill2 UP") # title updated - self.assertEquals(skill, skill._meta.model.objects.get(pk=skill.pk)) # title updated - - # TODO: test update with external urlid which doesn't exist - # TODO: test update with internal urlid which doesn't exist - # TODO: repeat of the above where the relationship is ForeignKey - # TODO: test update with internal urlid which refers to a different type of object entirely - # TODO: test update with internal urlid which refers to a container - - def test_update_list_with_reverse_relation(self): - user1 = get_user_model().objects.create() - conversation = Conversation.objects.create(description="Conversation 1", author_user=user1) - message1 = Message.objects.create(text="Message 1", conversation=conversation, author_user=user1) - message2 = Message.objects.create(text="Message 2", conversation=conversation, author_user=user1) - - json = {"@graph": [ - { - "@id": "{}/messages/{}/".format(settings.BASE_URL, message1.pk), - "text": "Message 1 UP" - }, - { - "@id": "{}/messages/{}/".format(settings.BASE_URL, message2.pk), - "text": "Message 2 UP" - }, - { - '@id': "{}/conversations/{}/".format(settings.BASE_URL, conversation.pk), - 'description': "Conversation 1 UP", - "message_set": [ - {"@id": "{}/messages/{}/".format(settings.BASE_URL, message1.pk)}, - {"@id": "{}/messages/{}/".format(settings.BASE_URL, message2.pk)}, - ] - } - ] - } - - meta_args = {'model': Conversation, 'depth': 2, 'fields': ("@id", "description", "message_set")} - - meta_class = type('Meta', (), meta_args) - serializer_class = type(LDPSerializer)('ConversationSerializer', (LDPSerializer,), {'Meta': meta_class}) - serializer = serializer_class(data=json, instance=conversation) - serializer.is_valid() - result = serializer.save() - - messages = result.message_set.all().order_by('text') - - self.assertEquals(result.description, "Conversation 1 UP") - self.assertIs(result.message_set.count(), 2) - self.assertEquals(messages[0].text, "Message 1 UP") - self.assertEquals(messages[1].text, "Message 2 UP") - - def test_add_new_element_with_foreign_key_id(self): - user1 = get_user_model().objects.create() - conversation = Conversation.objects.create(description="Conversation 1", author_user=user1) - message1 = Message.objects.create(text="Message 1", conversation=conversation, author_user=user1) - message2 = Message.objects.create(text="Message 2", conversation=conversation, author_user=user1) - - json = {"@graph": [ - { - "@id": "{}/messages/{}/".format(settings.BASE_URL, message1.pk), - "text": "Message 1 UP", - "author_user": { - '@id': "{}/users/{}/".format(settings.BASE_URL, user1.pk) - } - }, - { - "@id": "{}/messages/{}/".format(settings.BASE_URL, message2.pk), - "text": "Message 2 UP", - "author_user": { - '@id': user1.urlid - } - }, - { - "@id": "_:b1", - "text": "Message 3 NEW", - "author_user": { - '@id': user1.urlid - } - }, - { - '@id': "{}/conversations/{}/".format(settings.BASE_URL, conversation.pk), - "author_user": { - '@id': user1.urlid - }, - 'description': "Conversation 1 UP", - 'message_set': { - "@id": "{}/conversations/{}/message_set/".format(settings.BASE_URL, conversation.pk) - } - }, - { - '@id': "{}/conversations/{}/message_set/".format(settings.BASE_URL, conversation.pk), - "ldp:contains": [ - {"@id": "{}/messages/{}/".format(settings.BASE_URL, message1.pk)}, - {"@id": "{}/messages/{}/".format(settings.BASE_URL, message2.pk)}, - {"@id": "_:b1"} - ] - } - ] - } - - meta_args = {'model': Conversation, 'depth': 2, 'fields': ("@id", "description", "message_set")} - - meta_class = type('Meta', (), meta_args) - serializer_class = type(LDPSerializer)('ConversationSerializer', (LDPSerializer,), {'Meta': meta_class}) - serializer = serializer_class(data=json, instance=conversation) - serializer.is_valid() - result = serializer.save() - - messages = result.message_set.all().order_by('text') - - self.assertEquals(result.description, "Conversation 1 UP") - self.assertIs(result.message_set.count(), 3) - self.assertEquals(messages[0].text, "Message 1 UP") - self.assertEquals(messages[1].text, "Message 2 UP") - self.assertEquals(messages[2].text, "Message 3 NEW") + self.assertEquals(skills[1].title, pre_existing_skill_a.title) # old skill unchanged + self.assertEquals(skills[2].title, "z") # updated + self.assertEquals(skills[2].obligatoire, pre_existing_skill_b.obligatoire) # another field not updated + ''' def test_put_resource(self): - post = Post.objects.create(content="content") + skill = Skill.objects.create(title='original', obligatoire='original', slug='skill1') body = [{ - '@id': '{}/posts/{}/'.format(settings.BASE_URL, post.pk), - 'http://happy-dev.fr/owl/#content': "post content"}] - response = self.client.put('/posts/{}/'.format(post.pk), data=json.dumps(body), + '@id': '{}/skills/{}/'.format(settings.BASE_URL, skill.slug), + 'http://happy-dev.fr/owl/#title': "new", 'http://happy-dev.fr/owl/#obligatoire': "new"}] + response = self.client.put('/skills/{}/'.format(skill.slug), data=json.dumps(body), content_type='application/ld+json') self.assertEqual(response.status_code, 200) - self.assertEquals(response.data['content'], "post content") + self.assertEquals(response.data['title'], "new") + self.assertEquals(response.data['obligatoire'], "new") self.assertIn('location', response._headers) + def test_patch_resource(self): + skill = Skill.objects.create(title='original', obligatoire='original', slug='skill1') + body = { + '@id': '{}/skills/{}'.format(settings.BASE_URL, skill.slug), + 'http://happy-dev.fr/owl/#title': 'new' + } + response = self.client.patch('/skills/{}/'.format(skill.slug), data=json.dumps(body), + content_type='application/ld+json') + self.assertEqual(response.status_code, 200) + self.assertEquals(response.data['title'], "new") + self.assertEquals(response.data['obligatoire'], "original") + def test_create_sub_object_in_existing_object_with_existing_reverse_1to1_relation(self): user = get_user_model().objects.create(username="alex", password="test") profile = UserProfile.objects.create(user=user, description="user description") @@ -306,6 +97,22 @@ class Update(TestCase): self.assertEqual(response.status_code, 200) self.assertIn('userprofile', response.data) + def test_put_nonexistent_local_resource(self): + job = JobOffer.objects.create(title="job test") + + # contains internal urlid which refers to non-existent resource + body = {"@id": "{}/job-offers/{}/".format(settings.BASE_URL, job.slug), + "skills": { + "ldp:contains": [ + {"@id": "{}/skills/404/".format(settings.BASE_URL)}, + ]} + } + + response = self.client.put('/job-offers/{}/'.format(job.slug), data=json.dumps(body), + content_type='application/ld+json') + self.assertEqual(response.status_code, 200) + self.assertEqual(Skill.objects.count(), 0) + def test_create_sub_object_in_existing_object_with_reverse_fk_relation(self): """ Doesn't work with depth = 0 on UserProfile Model. Should it be ? @@ -617,7 +424,162 @@ class Update(TestCase): content_type='application/ld+json') self.assertEqual(response.data['description'], "user update") - # TODO: test passing foreign key relation which I shouldn't have access/permission to - # TODO: test passing many-to-many relation in edit which isn't yet on my model + # unit tests for a specific bug: https://git.startinblox.com/djangoldp-packages/djangoldp/issues/307 + def test_direct_boolean_field(self): + profile = UserProfile.objects.create(user=self.user) + setting = NotificationSetting.objects.create(user=profile, receiveMail=False) + body = { + 'http://happy-dev.fr/owl/#@id': setting.urlid, + 'receiveMail': True, + "@context": {"@vocab": "http://happy-dev.fr/owl/#", + "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#", + "rdfs": "http://www.w3.org/2000/01/rdf-schema#", "ldp": "http://www.w3.org/ns/ldp#", + "foaf": "http://xmlns.com/foaf/0.1/", "name": "rdfs:label", + "acl": "http://www.w3.org/ns/auth/acl#", "permissions": "acl:accessControl", + "mode": "acl:mode", "geo": "http://www.w3.org/2003/01/geo/wgs84_pos#", "lat": "geo:lat", + "lng": "geo:long"} + } + + response = self.client.patch('/notificationsettings/{}/'.format(setting.pk), + data=json.dumps(body), + content_type='application/ld+json') + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data['receiveMail'], True) + + def test_nested_container_boolean_field_no_slug(self): + profile = UserProfile.objects.create(user=self.user) + setting = NotificationSetting.objects.create(user=profile, receiveMail=False) + body = { + 'settings': { + 'http://happy-dev.fr/owl/#@id': setting.urlid, + 'receiveMail': True + }, + "@context": {"@vocab": "http://happy-dev.fr/owl/#", + "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#", + "rdfs": "http://www.w3.org/2000/01/rdf-schema#", "ldp": "http://www.w3.org/ns/ldp#", + "foaf": "http://xmlns.com/foaf/0.1/", "name": "rdfs:label", + "acl": "http://www.w3.org/ns/auth/acl#", "permissions": "acl:accessControl", + "mode": "acl:mode", "geo": "http://www.w3.org/2003/01/geo/wgs84_pos#", "lat": "geo:lat", + "lng": "geo:long"} + } + + response = self.client.patch('/userprofiles/{}/'.format(profile.slug), + data=json.dumps(body), + content_type='application/ld+json') + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data['settings']['receiveMail'], True) + + # variation where the lookup_field for NotificationSetting (pk) is provided + def test_nested_container_boolean_field_with_slug(self): + profile = UserProfile.objects.create(user=self.user) + setting = NotificationSetting.objects.create(user=profile, receiveMail=False) + body = { + 'settings': { + 'pk': setting.pk, + 'http://happy-dev.fr/owl/#@id': setting.urlid, + 'receiveMail': True + }, + "@context": {"@vocab": "http://happy-dev.fr/owl/#", + "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#", + "rdfs": "http://www.w3.org/2000/01/rdf-schema#", "ldp": "http://www.w3.org/ns/ldp#", + "foaf": "http://xmlns.com/foaf/0.1/", "name": "rdfs:label", + "acl": "http://www.w3.org/ns/auth/acl#", "permissions": "acl:accessControl", + "mode": "acl:mode", "geo": "http://www.w3.org/2003/01/geo/wgs84_pos#", "lat": "geo:lat", + "lng": "geo:long"} + } + + response = self.client.patch('/userprofiles/{}/'.format(profile.slug), + data=json.dumps(body), + content_type='application/ld+json') + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data['settings']['receiveMail'], True) + + def test_update_container_twice_nested_view(self): + invoice = Invoice.objects.create(title='test') + pre_existing_batch = Batch.objects.create(title='batch1', invoice=invoice) + pre_existing_task = Task.objects.create(title='task1', batch=pre_existing_batch) + + base_url = settings.BASE_URL + + body = { + "@id": "{}/invoices/{}/".format(base_url, invoice.pk), + "http://happy-dev.fr/owl/#title": "new", + "http://happy-dev.fr/owl/#batches": [ + { + "@id": "{}/batchs/{}/".format(base_url, pre_existing_batch.pk), + "http://happy-dev.fr/owl/#title": "new", + "http://happy-dev.fr/owl/#tasks": [ + { + "@id": "{}/tasks/{}/".format(base_url, pre_existing_task.pk), + "http://happy-dev.fr/owl/#title": "new" + }, + { + "http://happy-dev.fr/owl/#title": "tache 2" + } + ] + }, + { + "http://happy-dev.fr/owl/#title": "z", + } + ] + } + + response = self.client.put('/invoices/{}/'.format(invoice.pk), data=json.dumps(body), + content_type='application/ld+json') + self.assertEqual(response.status_code, 200) + + self.assertEquals(response.data['title'], "new") + self.assertEquals(response.data['@id'], invoice.urlid) + + invoice = Invoice.objects.get(pk=invoice.pk) + self.assertIs(invoice.batches.count(), 2) + batches = invoice.batches.all().order_by('title') + self.assertEquals(batches[0].title, "new") + self.assertEquals(batches[0].urlid, pre_existing_batch.urlid) + self.assertEquals(batches[1].title, "z") + + self.assertIs(batches[0].tasks.count(), 2) + tasks = batches[0].tasks.all().order_by('title') + self.assertEquals(tasks[0].title, "new") + self.assertEquals(tasks[0].pk, pre_existing_task.pk) + self.assertEquals(tasks[1].title, "tache 2") + + # TODO: https://git.startinblox.com/djangoldp-packages/djangoldp/issues/333 + '''def test_update_container_nested_view(self): + circle = Circle.objects.create(name='test') + pre_existing = CircleMember.objects.create(user=self.user, circle=circle, is_admin=False) + another_user = get_user_model().objects.create_user(username='u2', email='u2@b.com', password='pw') + + body = { + "@id": "{}/circles/{}/".format(settings.BASE_URL, circle.pk), + "http://happy-dev.fr/owl/#name": "Updated Name", + "http://happy-dev.fr/owl/#members": { + "ldp:contains": [ + {"@id": "{}/circle-members/{}/".format(settings.BASE_URL, pre_existing.pk), + "http://happy-dev.fr/owl/#is_admin": True}, + {"http://happy-dev.fr/owl/#user": {"@id": another_user.urlid}, + "http://happy-dev.fr/owl/#is_admin": False}, + ] + } + } + + response = \ + self.client.put('/circles/{}/'.format(circle.pk), data=json.dumps(body), content_type='application/ld+json') + print(str(self.user.urlid)) + print(str(response.data)) + self.assertEqual(response.status_code, 200) + self.assertEquals(response.data['name'], circle.name) + self.assertEqual(response.data['@id'], circle.urlid) + self.assertIs(CircleMember.objects.count(), 2) + self.assertIs(circle.members.count(), 2) + self.assertIs(circle.team.count(), 2) + + members = circle.members.all().order_by('pk') + self.assertEqual(members[0].user, self.user) + self.assertEqual(members[0].urlid, pre_existing.urlid) + self.assertEqual(members[0].pk, pre_existing.pk) + self.assertEqual(members[0].is_admin, True) + self.assertEqual(members[1].user, another_user) + self.assertEqual(members[1].is_admin, False)''' diff --git a/djangoldp/tests/tests_user_permissions.py b/djangoldp/tests/tests_user_permissions.py index 2f61261ceb5d2403b72c60066aaf8cc42a1a5dbb..9ab7ec55d3c6c84f2801cd7c7284734a8cbcde0d 100644 --- a/djangoldp/tests/tests_user_permissions.py +++ b/djangoldp/tests/tests_user_permissions.py @@ -1,7 +1,11 @@ from django.contrib.auth import get_user_model from django.contrib.auth.models import Permission, Group +from django.conf import settings +from django.test import override_settings +from djangoldp.serializers import LDListMixin, LDPSerializer from rest_framework.test import APIClient, APITestCase -from .models import JobOffer, LDPDummy, PermissionlessDummy +from djangoldp.tests.models import JobOffer, LDPDummy, PermissionlessDummy, UserProfile, OwnedResource, \ + NoSuperUsersAllowedModel, ComplexPermissionClassesModel, OwnedResourceVariant import json @@ -13,6 +17,8 @@ class TestUserPermissions(APITestCase): self.client = APIClient(enforce_csrf_checks=True) self.client.force_authenticate(user=self.user) self.job = JobOffer.objects.create(title="job", slug="slug1") + LDListMixin.to_representation_cache.reset() + LDPSerializer.to_representation_cache.reset() def setUpGroup(self): self.group = Group.objects.create(name='Test') @@ -20,41 +26,132 @@ class TestUserPermissions(APITestCase): self.group.permissions.add(view_perm) self.group.save() + def _make_self_superuser(self): + self.user.is_superuser = True + self.user.save() + # list - simple + @override_settings(SERIALIZE_EXCLUDE_PERMISSIONS=['inherit'], + SERIALIZE_CONTAINER_EXCLUDE_PERMISSIONS=['inherit', 'delete']) def test_get_for_authenticated_user(self): response = self.client.get('/job-offers/') self.assertEqual(response.status_code, 200) + # test serialized permissions + self.assertIn({'mode': {'@type': 'view'}}, response.data['permissions']) + self.assertNotIn({'mode': {'@type': 'inherit'}}, response.data['permissions']) + # self.assertNotIn({'mode': {'@type': 'delete'}}, response.data['permissions']) - # list - I do not have permission from the model, but I do have permission via a Group I am assigned - # TODO: https://git.startinblox.com/djangoldp-packages/djangoldp/issues/291 + # TODO: list - I do not have permission from the model, but I do have permission via a Group I am assigned + # https://git.startinblox.com/djangoldp-packages/djangoldp/issues/291 '''def test_group_list_access(self): self.setUpGroup() + dummy = PermissionlessDummy.objects.create() response = self.client.get('/permissionless-dummys/') - self.assertEqual(response.status_code, 403) + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.data['ldp:contains']), 0) + + LDListMixin.to_representation_cache.reset() + LDPSerializer.to_representation_cache.reset() self.user.groups.add(self.group) self.user.save() response = self.client.get('/permissionless-dummys/') - self.assertEqual(response.status_code, 200)''' + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.data['ldp:contains']), 1) - # TODO: repeat of the above test on nested field - '''def test_group_list_access_nested(self): + # repeat of the above test on nested field + def test_group_list_access_nested_field(self): self.setUpGroup() parent = LDPDummy.objects.create() - dummy = PermissionlessDummy.objects.create(parent=parent)''' + PermissionlessDummy.objects.create(parent=parent) + + response = self.client.get('/ldpdummys/{}/'.format(parent.pk)) + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.data['anons']['ldp:contains']), 0) + + LDListMixin.to_representation_cache.reset() + LDPSerializer.to_representation_cache.reset() + + self.user.groups.add(self.group) + self.user.save() + response = self.client.get('/ldpdummys/{}/'.format(parent.pk)) + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.data['anons']['ldp:contains']), 1) + + # repeat of the test on a nested viewset + def test_group_list_access_nested_viewset(self): + self.setUpGroup() + parent = LDPDummy.objects.create() + PermissionlessDummy.objects.create(parent=parent) + + response = self.client.get('/ldpdummys/{}/anons/'.format(parent.pk)) + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.data['ldp:contains']), 0) + + LDListMixin.to_representation_cache.reset() + LDPSerializer.to_representation_cache.reset() + + self.user.groups.add(self.group) + self.user.save() + response = self.client.get('/ldpdummys/{}/anons/'.format(parent.pk)) + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.data['ldp:contains']), 1) + + # repeat for object-specific request + def test_group_object_access(self): + self.setUpGroup() + dummy = PermissionlessDummy.objects.create() + response = self.client.get('/permissionless-dummys/{}'.format(dummy)) + self.assertEqual(response.status_code, 404) + + LDListMixin.to_representation_cache.reset() + LDPSerializer.to_representation_cache.reset() + + self.user.groups.add(self.group) + self.user.save() + response = self.client.get('/permissionless-dummys/{}/'.format(dummy)) + self.assertEqual(response.status_code, 200) + + # TODO: test for POST scenario + # TODO: test for PUT scenario + # TODO: test for DELETE scenario + ''' + + @override_settings(SERIALIZE_OBJECT_EXCLUDE_PERMISSIONS=['inherit']) def test_get_1_for_authenticated_user(self): response = self.client.get('/job-offers/{}/'.format(self.job.slug)) self.assertEqual(response.status_code, 200) + self.assertIn({'mode': {'@type': 'view'}}, response.data['permissions']) + self.assertNotIn({'mode': {'@type': 'inherit'}}, response.data['permissions']) def test_post_request_for_authenticated_user(self): - post = {'title': "job_created", "slug": 'slug1'} + post = {'http://happy-dev.fr/owl/#title': "job_created", "http://happy-dev.fr/owl/#slug": 'slug2'} response = self.client.post('/job-offers/', data=json.dumps(post), content_type='application/ld+json') self.assertEqual(response.status_code, 201) + # denied because I don't have model permissions + def test_post_request_denied_model_perms(self): + data = {'http://happy-dev.fr/owl/#some': 'title'} + response = self.client.post('/permissionless-dummys/', data=json.dumps(data), content_type='application/ld+json') + self.assertEqual(response.status_code, 403) + + def test_post_nested_view_authorized(self): + data = { "http://happy-dev.fr/owl/#title": "new skill", "http://happy-dev.fr/owl/#obligatoire": "okay" } + response = self.client.post('/job-offers/{}/skills/'.format(self.job.slug), data=json.dumps(data), + content_type='application/ld+json') + self.assertEqual(response.status_code, 201) + + def test_post_nested_view_denied_model_perms(self): + parent = LDPDummy.objects.create(some='parent') + data = { "http://happy-dev.fr/owl/#some": "title" } + response = self.client.post('/ldpdummys/{}/anons/'.format(parent.pk), data=json.dumps(data), + content_type='application/ld+json') + self.assertEqual(response.status_code, 403) + def test_put_request_for_authenticated_user(self): - body = {'title':"job_updated"} + body = {'https://happy-dev.fr/owl/#title':"job_updated"} response = self.client.put('/job-offers/{}/'.format(self.job.slug), data=json.dumps(body), content_type='application/ld+json') self.assertEqual(response.status_code, 200) @@ -63,3 +160,206 @@ class TestUserPermissions(APITestCase): response = self.client.patch('/job-offers/' + str(self.job.slug) + "/", content_type='application/ld+json') self.assertEqual(response.status_code, 200) + + def test_put_request_denied_model_perms(self): + dummy = PermissionlessDummy.objects.create(some='some', slug='slug') + data = {'http://happy-dev.fr/owl/#some': 'new'} + response = self.client.put('/permissionless-dummys/{}/'.format(dummy.slug), data=json.dumps(data), + content_type='application/ld+json') + self.assertEqual(response.status_code, 404) + + def test_put_nested_view_denied_model_perms(self): + parent = LDPDummy.objects.create(some='parent') + child = PermissionlessDummy.objects.create(some='child', slug='child', parent=parent) + data = {"http://happy-dev.fr/owl/#some": "new"} + response = self.client.put('/ldpdummys/{}/anons/{}/'.format(parent.pk, child.slug), data=json.dumps(data), + content_type='application/ld+json') + self.assertEqual(response.status_code, 404) + + def test_patch_nested_container_attach_existing_resource_permission_denied(self): + '''I am attempting to add a resource which I should not know exists''' + parent = LDPDummy.objects.create(some='parent') + dummy = PermissionlessDummy.objects.create(some='some', slug='slug') + data = { + 'http://happy-dev.fr/owl/#anons': [ + {'@id': '{}/permissionless-dummys/{}/'.format(settings.SITE_URL, dummy.slug), 'http://happy-dev.fr/owl/#slug': dummy.slug} + ] + } + response = self.client.patch('/ldpdummys/{}/'.format(parent.pk), data=json.dumps(data), content_type='application/ld+json') + self.assertEqual(response.status_code, 404) + + # variations on previous tests with an extra level of depth + # TODO + def test_post_nested_container_twice_nested_permission_denied(self): + pass + + # TODO + def test_put_nested_container_twice_nested_permission_denied(self): + pass + + # TODO: repeat of the above where it is authorized because I have permission through my Group + # https://git.startinblox.com/djangoldp-packages/djangoldp/issues/291 + + def test_put_request_change_urlid_rejected(self): + self.assertEqual(JobOffer.objects.count(), 1) + body = {'@id': "ishouldnotbeabletochangethis"} + response = self.client.put('/job-offers/{}/'.format(self.job.slug), data=json.dumps(body), + content_type='application/ld+json') + # TODO: this is failing quietly + # https://git.happy-dev.fr/startinblox/solid-spec/issues/14 + self.assertEqual(response.status_code, 200) + self.assertEqual(JobOffer.objects.count(), 1) + self.assertFalse(JobOffer.objects.filter(urlid=body['@id']).exists()) + + def test_put_request_change_pk_rejected(self): + self.assertEqual(JobOffer.objects.count(), 1) + body = {'http://happy-dev.fr/owl/#pk': 2} + response = self.client.put('/job-offers/{}/'.format(self.job.slug), data=json.dumps(body), + content_type='application/ld+json') + # TODO: this is failing quietly + # https://git.happy-dev.fr/startinblox/solid-spec/issues/14 + self.assertEqual(response.status_code, 200) + self.assertEqual(JobOffer.objects.count(), 1) + self.assertFalse(JobOffer.objects.filter(pk=body['http://happy-dev.fr/owl/#pk']).exists()) + + # tests that I receive a list of objects for which I am owner, filtering those for which I am not + def test_list_owned_resources(self): + my_resource = OwnedResource.objects.create(description='test', user=self.user) + another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test') + their_resource = OwnedResource.objects.create(description='another test', user=another_user) + + response = self.client.get('/ownedresources/') + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.data['ldp:contains']), 1) + self.assertEqual(response.data['ldp:contains'][0]['@id'], my_resource.urlid) + + # I do not have model permissions as an authenticated user, but I am the resources' owner + def test_get_owned_resource(self): + my_resource = OwnedResource.objects.create(description='test', user=self.user) + another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test') + their_resource = OwnedResource.objects.create(description='another test', user=another_user) + + response = self.client.get('/ownedresources/{}/'.format(my_resource.pk)) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data['@id'], my_resource.urlid) + self.assertIn({'mode': {'@type': 'delete'}}, response.data['permissions']) + + # I have permission to view this resource + response = self.client.patch('/ownedresources/{}/'.format(their_resource.pk)) + self.assertEqual(response.status_code, 404) + + def test_patch_owned_resource(self): + my_profile = UserProfile.objects.create(user=self.user, slug=self.user.username, description='about me') + another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test') + their_profile = UserProfile.objects.create(user=another_user, slug=another_user.username, description='about') + + response = self.client.patch('/userprofiles/{}/'.format(my_profile.slug)) + self.assertEqual(response.status_code, 200) + + response = self.client.patch('/userprofiles/{}/'.format(their_profile.slug)) + # TODO: technically this should be 403, since I do have permission to view their user profile + # https://git.startinblox.com/djangoldp-packages/djangoldp/issues/336 + self.assertEqual(response.status_code, 404) + + def test_delete_owned_resource(self): + my_resource = OwnedResource.objects.create(description='test', user=self.user) + another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test') + their_resource = OwnedResource.objects.create(description='another test', user=another_user) + + response = self.client.delete('/ownedresources/{}/'.format(my_resource.pk)) + self.assertEqual(response.status_code, 204) + + response = self.client.delete('/ownedresources/{}/'.format(their_resource.pk)) + self.assertEqual(response.status_code, 404) + + # test superuser permissions (configured on model) + def test_list_superuser_perms(self): + another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test') + their_resource = OwnedResource.objects.create(description='another test', user=another_user) + + response = self.client.get('/ownedresources/') + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.data['ldp:contains']), 0) + + # now I'm superuser, I have the permissions + self._make_self_superuser() + + response = self.client.get('/ownedresources/') + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.data['ldp:contains']), 1) + + def test_get_superuser_perms(self): + another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test') + their_resource = OwnedResource.objects.create(description='another test', user=another_user) + + response = self.client.patch('/ownedresources/{}/'.format(their_resource.pk)) + self.assertEqual(response.status_code, 404) + + self._make_self_superuser() + + response = self.client.patch('/ownedresources/{}/'.format(their_resource.pk)) + self.assertEqual(response.status_code, 200) + + def test_put_superuser_perms(self): + another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test') + their_profile = UserProfile.objects.create(user=another_user, slug=another_user.username, description='about') + + response = self.client.patch('/userprofiles/{}/'.format(their_profile.slug)) + # TODO: https://git.startinblox.com/djangoldp-packages/djangoldp/issues/336 + self.assertEqual(response.status_code, 404) + + self._make_self_superuser() + + response = self.client.patch('/userprofiles/{}/'.format(their_profile.slug)) + self.assertEqual(response.status_code, 200) + + def test_delete_superuser_perms(self): + another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test') + their_resource = OwnedResource.objects.create(description='another test', user=another_user) + + response = self.client.delete('/ownedresources/{}/'.format(their_resource.pk)) + self.assertEqual(response.status_code, 404) + + self._make_self_superuser() + + response = self.client.delete('/ownedresources/{}/'.format(their_resource.pk)) + self.assertEqual(response.status_code, 204) + + # test where superuser_perms are configured on the model to be different + def test_superuser_perms_configured(self): + self._make_self_superuser() + + NoSuperUsersAllowedModel.objects.create() + self.assertEqual(NoSuperUsersAllowedModel.objects.count(), 1) + + response = self.client.get('/nosuperusersallowedmodels/') + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.data['ldp:contains']), 0) + + # test list where SuperUserPermission is being used on a model in conjunction with LDPPermissions + def test_filter_backend_multiple_permission_classes_configured(self): + ComplexPermissionClassesModel.objects.create() + + response = self.client.get('/complexpermissionclassesmodels/') + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.data['ldp:contains']), 1) + + # I have model (or object?) permissions. Attempt to make myself owner and thus upgrade my permissions + # TODO: https://git.startinblox.com/djangoldp-packages/djangoldp/issues/356/ + ''' + def test_hack_model_perms_privilege_escalation(self): + another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test') + resource = OwnedResourceVariant.objects.create(description='another test', user=another_user) + + # authenticated has 'change' permission but only owner's have 'control' permission, meaning that I should + # not be able to change my privilege level + body = { + 'http://happy-dev.fr/owl/#user': {'@id': self.user.urlid} + } + response = self.client.put('/ownedresourcevariants/{}/'.format(resource.pk), data=json.dumps(body), + content_type='application/ld+json') + self.assertEqual(response.status_code, 200) + + resource = OwnedResourceVariant.objects.get(pk=resource.pk) + self.assertNotEqual(resource.user, self.user) + ''' diff --git a/djangoldp/utils.py b/djangoldp/utils.py new file mode 100644 index 0000000000000000000000000000000000000000..83075fd842ce28b8e79068374f49397ab60de3e7 --- /dev/null +++ b/djangoldp/utils.py @@ -0,0 +1,14 @@ +from django.conf import settings +from guardian.utils import get_anonymous_user + + +# convenience function returns True if user is anonymous +def is_anonymous_user(user): + return user.is_anonymous or (getattr(settings, 'ANONYMOUS_USER_NAME', True) is not None and + user == get_anonymous_user()) + + +# convenience function returns True if user is authenticated +def is_authenticated_user(user): + return user.is_authenticated and (getattr(settings, 'ANONYMOUS_USER_NAME', True) is None or + user != get_anonymous_user()) diff --git a/djangoldp/views.py b/djangoldp/views.py index 6b36fc26106db150bc372b1062bd98720ebee1fe..af690fd8c46ef4644b969a6e5f18000e98371438 100644 --- a/djangoldp/views.py +++ b/djangoldp/views.py @@ -29,6 +29,7 @@ from djangoldp.models import LDPSource, Model, Follower from djangoldp.permissions import LDPPermissions from djangoldp.filters import LocalObjectOnContainerPathBackend from djangoldp.related import get_prefetch_fields +from djangoldp.utils import is_authenticated_user from djangoldp.activities import ActivityQueueService, as_activitystream from djangoldp.activities import ActivityPubService from djangoldp.activities.errors import ActivityStreamDecodeError, ActivityStreamValidationError @@ -511,7 +512,20 @@ class LDPViewSet(LDPViewSetGenerator): ''' return True + def check_model_permissions(self, request): + """ + Check if the request should be permitted when the model-level permissions matter (generally just for creating an object) + Raises an appropriate exception if the request is not permitted. + """ + for permission in self.get_permissions(): + if hasattr(permission, 'has_container_permission') and not permission.has_container_permission(request, self): + self.permission_denied( + request, + message=getattr(permission, 'message', None) + ) + def create(self, request, *args, **kwargs): + self.check_model_permissions(request) serializer = self.get_write_serializer(data=request.data) serializer.is_valid(raise_exception=True) if not self.is_safe_create(request.user, serializer.validated_data): @@ -581,7 +595,7 @@ class LDPViewSet(LDPViewSetGenerator): if self.model: queryset = self.model.objects.all() else: - queryset = super(LDPView, self).get_queryset(*args, **kwargs) + queryset = super(LDPViewSet, self).get_queryset(*args, **kwargs) if self.prefetch_fields is None: depth = getattr(self, 'depth', Model.get_meta(self.model, 'depth', 0)) self.prefetch_fields = get_prefetch_fields(self.model, self.get_serializer(), depth) @@ -601,7 +615,7 @@ class LDPViewSet(LDPViewSetGenerator): else: pass response["Accept-Post"] = "application/ld+json" - if request.user.is_authenticated: + if is_authenticated_user(request.user): try: response['User'] = request.user.webid() except AttributeError: diff --git a/docs/create_model.md b/docs/create_model.md index 1895d4b582698e08013c998275380b20f852f785..a169780e4caa7fea08aa935fc70f67ddf4afc6ae 100644 --- a/docs/create_model.md +++ b/docs/create_model.md @@ -291,13 +291,19 @@ Now when an instance of `MyModel` is saved, its `author_user` property will be s Django-Guardian is used by default to support object-level permissions. Custom permissions can be added to your model using this attribute. See the [Django-Guardian documentation](https://django-guardian.readthedocs.io/en/stable/userguide/assign.html) for more information +### Serializing Permissions + +* `SERIALIZE_EXCLUDE_PERMISSIONS`. Permissions which should always be excluded from serialization defaults to `['inherit']` +* `SERIALIZE_EXCLUDE_CONTAINER_PERMISSIONS_DEFAULT`. Excluded also when serializing containers `['delete']` +* `SERIALIZE_EXCLUDE_OBJECT_PERMISSIONS_DEFAULT`. Excluded also when serializing objects `[]` + ## permissions_classes This allows you to add permissions for anonymous, logged in user, author ... in the url: By default `LDPPermissions` is used. Specific permissin classes can be developed to fit special needs. -## anonymous_perms, user_perms, owner_perms +## anonymous_perms, user_perms, owner_perms, superuser_perms Those allow you to set permissions from your model's meta. @@ -326,8 +332,9 @@ class Todo(Model): class Meta: anonymous_perms = ['view'] - authenticated_perms = ['inherit', 'add'] - owner_perms = ['inherit', 'change', 'control', 'delete'] + authenticated_perms = ['inherit', 'add'] # inherits from anonymous + owner_perms = ['inherit', 'change', 'control', 'delete'] # inherits from authenticated + superuser_perms = ['inherit'] # inherits from owner owner_field = 'user' ``` @@ -335,6 +342,10 @@ class Todo(Model): Important note: If you need to give permissions to owner's object, don't forget to add auto_author in model's meta +Superuser's are by default configured to have all of the default DjangoLDP permissions +* you can restrict their permissions globally by setting `DEFAULT_SUPERUSER_PERMS = []` in your server settings +* you can change it on a per-model basis as described here. Please note that if you use a custom permissions class you will need to give superusers this permission explicitly, or use the `SuperUsersPermission` class on the model which will grant superusers all permissions + ### view_set In case of custom viewset, you can use