Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • djangoldp-packages/djangoldp
  • decentral1se/djangoldp
  • femmefaytale/djangoldp
  • jvtrudel/djangoldp
4 results
Show changes
Showing
with 1724 additions and 593 deletions
from rest_framework.test import APIRequestFactory, APIClient, APITestCase
from rest_framework.test import APIClient, APIRequestFactory, APITestCase
from djangoldp.tests.models import Post
......@@ -8,7 +8,7 @@ class TestPagination(APITestCase):
def setUp(self):
self.factory = APIRequestFactory()
self.client = APIClient()
for i in range(0, 10):
for i in range(0, 30):
Post.objects.create(content="content {}".format(i))
def tearDown(self):
......@@ -17,12 +17,12 @@ class TestPagination(APITestCase):
def test_next(self):
response = self.client.get('/posts/', content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertIn('link', response._headers)
self.assertEquals(response._headers['link'][1], '<http://testserver/posts/?limit=5&offset=5>; rel="next"')
self.assertIn('link', response.headers)
self.assertEqual(response.headers['link'], '<http://testserver/posts/?p=2>; rel="next"')
def test_previous(self):
response = self.client.get('/posts/?offset=2&limit=2', content_type='application/ld+json')
response = self.client.get('/posts/?p=2', content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertIn('link', response._headers)
self.assertEquals(response._headers['link'][1],
'<http://testserver/posts/?limit=2>; rel="prev", <http://testserver/posts/?limit=2&offset=4>; rel="next"')
self.assertIn('link', response.headers)
self.assertEqual(response.headers['link'],
'<http://testserver/posts/>; rel="prev", <http://testserver/posts/?p=3>; rel="next"')
......@@ -5,9 +5,9 @@ import time
from django.contrib.auth import get_user_model
from rest_framework.test import APIRequestFactory, APIClient, APITestCase
from statistics import mean, variance
import cProfile, io, pstats
from djangoldp.permissions import LDPPermissions
from djangoldp.tests.models import Post, Invoice, JobOffer, Skill, Batch, DateModel, Project, User
from djangoldp.tests.models import Post, JobOffer, Skill, Project, User
class TestPerformanceGET(APITestCase):
......@@ -17,16 +17,13 @@ class TestPerformanceGET(APITestCase):
test_volume = 100
result_line = []
withAuth = True
withPermsCache = True
# fixtures = ['test_ten_1000.json',]
fixtures = ['test.json']
@classmethod
def setUpClass(cls):
super().setUpClass()
print("Init", end='', flush=True)
LDPPermissions.with_cache = cls.withPermsCache
step = cls.test_volume / 10
cls.factory = APIRequestFactory()
......@@ -61,7 +58,7 @@ class TestPerformanceGET(APITestCase):
def setUp(self):
self.client = APIClient()
self.user = get_user_model().objects.create_user(username='john', email='jlennon@beatles.com',
password='glass onion')
password='glass onion', is_active=True)
self.client.force_authenticate(user=self.user)
print('there are ' + str(Project.objects.count()) + ' projects in the database')
print('there are ' + str(User.objects.count()) + ' users in the database')
......@@ -135,21 +132,32 @@ class TestPerformanceGET(APITestCase):
self.result_line[9] = str(mean(times))
print("Variance execution time :" + str(variance(times)))
def _print_stats(self, pr):
s = io.StringIO()
ps = pstats.Stats(pr, stream=s)
ps.sort_stats('time').print_stats(50)
print(s.getvalue())
def _enable_new_profiler(self):
pr = cProfile.Profile()
pr.enable()
return pr
def test_get_users_container(self):
# pr = self._enable_new_profiler()
# response = self.client.get('/projects/', content_type='application/ld+json')
# self.assertEqual(response.status_code, 200)
# print('counted ' + str(len(response.data['ldp:contains'])) + ' projects')
# pr.disable()
#self._print_stats(pr)
# pr = self._enable_new_profiler()
pr = self._enable_new_profiler()
response = self.client.get('/projects/', content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
print('counted ' + str(len(response.data['ldp:contains'])) + ' projects')
pr.disable()
self._print_stats(pr)
pr = self._enable_new_profiler()
start_time = time.time()
response = self.client.get('/users/', content_type='application/ld+json')
end_time = time.time()
self.assertEqual(response.status_code, 200)
print('counted ' + str(len(response.data['ldp:contains'])) + ' users')
self.result_line[10] = str(end_time-start_time)
# pr.disable()
#self._print_stats(pr)
pr.disable()
self._print_stats(pr)
......@@ -27,16 +27,16 @@ class TestPerformance(APITestCase):
return pr
def test_get_container(self):
# pr = self._enable_new_profiler()
pr = self._enable_new_profiler()
response = self.client.get('/projects/', content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
print('counted ' + str(len(response.data['ldp:contains'])) + ' projects')
# pr.disable()
#self._print_stats(pr)
pr.disable()
self._print_stats(pr)
# pr = self._enable_new_profiler()
pr = self._enable_new_profiler()
response = self.client.get('/users/', content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
print('counted ' + str(len(response.data['ldp:contains'])) + ' users')
# pr.disable()
#self._print_stats(pr)
pr.disable()
self._print_stats(pr)
import json
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Permission
from guardian.models import GroupObjectPermission
from rest_framework.test import APIRequestFactory, APIClient, APITestCase
from djangoldp.tests.models import AnonymousReadOnlyPost, AuthenticatedOnlyPost, ReadOnlyPost, DoubleInheritModel, \
ReadAndCreatePost, OwnedResource, RestrictedCircle, RestrictedResource, ANDPermissionsDummy, ORPermissionsDummy
class TestPermissions(APITestCase):
def setUp(self):
self.factory = APIRequestFactory()
self.client = APIClient()
def authenticate(self):
self.user = get_user_model().objects.create_user(username='random', email='random@user.com', password='Imrandom')
self.client = APIClient(enforce_csrf_checks=True)
self.client.force_authenticate(user=self.user)
def check_can_add(self, url, status_code=201, field='content', extra_content={}):
data = extra_content
extra_content[f"https://cdn.startinblox.com/owl#{field}"] = "new post"
response = self.client.post(url, data=json.dumps(data), content_type='application/ld+json')
self.assertEqual(response.status_code, status_code)
if status_code == 201:
self.assertIn('@id', response.data)
return response.data['@id']
def check_can_change(self, id, status_code=200, field='content'):
data = { f"https://cdn.startinblox.com/owl#{field}": "changed post" }
response = self.client.put(id, data=json.dumps(data), content_type='application/ld+json')
self.assertEqual(response.status_code, status_code)
if status_code == 200:
self.assertIn('@id', response.data)
self.assertEqual(response.data['@id'], id)
def check_can_view_one(self, id, status_code=200):
response = self.client.get(id, content_type='application/ld+json')
self.assertEqual(response.status_code, status_code)
if status_code == 200:
self.assertEqual(response.data['@id'], id)
def check_can_view(self, url, ids, status_code=200):
response = self.client.get(url, content_type='application/ld+json')
self.assertEqual(response.status_code, status_code)
if status_code == 200:
self.assertEqual(len(response.data['ldp:contains']), len(ids))
for resource, id in zip(response.data['ldp:contains'], ids):
self.assertEqual(resource['@id'], id)
for id in ids:
self.check_can_view_one(id, status_code)
def test_permissionless_model(self):
id = self.check_can_add('/posts/')
self.check_can_view('/posts/', [id])
def test_anonymous_readonly(self):
post = AnonymousReadOnlyPost.objects.create(content = "test post")
self.check_can_view('/anonymousreadonlyposts/', [post.urlid])
self.check_can_add('/anonymousreadonlyposts/', 403)
self.check_can_change(post.urlid, 403)
self.authenticate()
self.check_can_add('/anonymousreadonlyposts/')
self.check_can_change(post.urlid)
def test_authenticated_only(self):
post = AuthenticatedOnlyPost.objects.create(content = "test post")
self.check_can_view('/authenticatedonlyposts/', [post.urlid], 403)
self.check_can_add('/authenticatedonlyposts/', 403)
self.check_can_change(post.urlid, 403)
post.delete()
self.authenticate()
#When authenticated it should behave like a non protected model
id = self.check_can_add('/authenticatedonlyposts/')
self.check_can_view('/authenticatedonlyposts/', [id])
self.check_can_change(id)
def test_readonly(self):
post = ReadOnlyPost.objects.create(content = "test post")
self.check_can_view('/readonlyposts/', [post.urlid])
self.check_can_add('/readonlyposts/', 403)
self.check_can_change(post.urlid, 403)
def test_readandcreate(self):
post = ReadAndCreatePost.objects.create(content = "test post")
self.check_can_view('/readandcreateposts/', [post.urlid])
self.check_can_add('/readandcreateposts/')
self.check_can_change(post.urlid, 403)
def test_owner_permissions(self):
self.authenticate()
them = get_user_model().objects.create_user(username='them', email='them@user.com', password='itstheirsecret')
mine = OwnedResource.objects.create(description="Mine!", user=self.user)
theirs = OwnedResource.objects.create(description="Theirs", user=them)
noones = OwnedResource.objects.create(description="I belong to NO ONE!")
self.check_can_view('/ownedresources/', [mine.urlid]) #checks I can access mine and only mine
self.check_can_change(mine.urlid)
self.check_can_view_one(theirs.urlid, 404)
self.check_can_change(theirs.urlid, 404)
self.check_can_view_one(noones.urlid, 404)
self.check_can_change(noones.urlid, 404)
def check_permissions(self, obj, group, required_perms):
perms = GroupObjectPermission.objects.filter(group=group)
for perm in perms:
self.assertEqual(perm.content_type.model, obj._meta.model_name)
self.assertEqual(perm.object_pk, str(obj.pk))
self.assertEqual(set(perms.values_list('permission__codename', flat=True)),
{f'{perm}_{obj._meta.model_name}' for perm in required_perms})
def create_circles(self):
self.authenticate()
self.user.user_permissions.add(Permission.objects.get(codename='view_restrictedcircle'))
them = get_user_model().objects.create_user(username='them', email='them@user.com', password='itstheirsecret')
mine = RestrictedCircle.objects.create(name="mine", description="Mine!", owner=self.user)
theirs = RestrictedCircle.objects.create(name="theirs", description="Theirs", owner=them)
noones = RestrictedCircle.objects.create(name="no one's", description="I belong to NO ONE!")
return mine, theirs, noones
def test_role_permissions(self):
mine, theirs, noones = self.create_circles()
self.assertIn(self.user, mine.members.user_set.all())
self.assertIn(self.user, mine.admins.user_set.all())
self.assertNotIn(self.user, theirs.members.user_set.all())
self.assertNotIn(self.user, theirs.admins.user_set.all())
self.assertNotIn(self.user, noones.members.user_set.all())
self.assertNotIn(self.user, noones.admins.user_set.all())
self.check_can_view('/restrictedcircles/', [mine.urlid]) #check filtering
self.check_permissions(mine, mine.members, RestrictedCircle._meta.permission_roles['members']['perms'])
self.check_permissions(mine, mine.admins, RestrictedCircle._meta.permission_roles['admins']['perms'])
def test_inherit_permissions(self):
mine, theirs, noones = self.create_circles()
myresource = RestrictedResource.objects.create(content="mine", circle=mine)
their_resource = RestrictedResource.objects.create(content="theirs", circle=theirs)
noones_resource = RestrictedResource.objects.create(content="noones", circle=noones)
self.check_can_view('/restrictedresources/', [myresource.urlid])
self.check_can_change(myresource.urlid)
self.check_can_change(their_resource.urlid, 404)
self.check_can_change(noones_resource.urlid, 404)
def test_inherit_several_permissions(self):
mine, theirs, noones = self.create_circles()
ro_resource = ReadOnlyPost.objects.create(content="read only")
myresource = DoubleInheritModel.objects.create(content="mine", circle=mine, ro_ancestor=None)
some = DoubleInheritModel.objects.create(content="some", circle=theirs, ro_ancestor=ro_resource)
other = DoubleInheritModel.objects.create(content="other", circle=noones, ro_ancestor=None)
self.check_can_view('/doubleinheritmodels/', [myresource.urlid, some.urlid])
self.check_can_change(myresource.urlid)
self.check_can_change(some.urlid, 403)
self.check_can_change(other.urlid, 404)
def test_inherit_permissions_none(self):
id = self.check_can_add('/doubleinheritmodels/')
resource = DoubleInheritModel.objects.get(urlid=id)
self.check_can_view('/doubleinheritmodels/', [resource.urlid])
circle = RestrictedCircle.objects.create()
id = self.check_can_add('/doubleinheritmodels/', 404, extra_content={'https://cdn.startinblox.com/owl#circle': {'@id': circle.urlid}})
def test_and_permissions(self):
self.authenticate()
abc = ANDPermissionsDummy.objects.create(title='ABC')
youpi = ANDPermissionsDummy.objects.create(title='youpi woopaa')
wonder = ANDPermissionsDummy.objects.create(title='A Wonderful World!!')
plop = ANDPermissionsDummy.objects.create(title='plop')
self.check_can_view('/andpermissionsdummys/', [wonder.urlid], 403)
self.check_can_add('/andpermissionsdummys/', 403, field='title')
self.check_can_change(wonder.urlid, 403, field='title')
self.user.username = 'toto'
self.user.save()
self.check_can_view('/andpermissionsdummys/', [wonder.urlid])
self.check_can_view_one(abc.urlid, 404)
self.check_can_view_one(youpi.urlid, 404)
self.check_can_view_one(plop.urlid, 404)
self.check_can_add('/andpermissionsdummys/', 403, field='title')
self.check_can_change(wonder.urlid, 403, field='title')
self.check_can_change(youpi.urlid, 403, field='title')
def test_or_permissions(self):
self.authenticate()
abc = ORPermissionsDummy.objects.create(title='ABC')
youpi = ORPermissionsDummy.objects.create(title='youpi woopaa')
wonder = ORPermissionsDummy.objects.create(title='A Wonderful World!!')
plop = ORPermissionsDummy.objects.create(title='plop')
self.check_can_view('/orpermissionsdummys/', [abc.urlid, wonder.urlid])
self.check_can_add('/andpermissionsdummys/', 403, field='title')
self.check_can_change(wonder.urlid, 403, field='title')
self.user.username = 'toto'
self.user.save()
self.check_can_view('/orpermissionsdummys/', [abc.urlid, youpi.urlid, wonder.urlid])
self.check_can_view_one(plop.urlid, 404)
self.check_can_add('/orpermissionsdummys/', field='title')
self.check_can_change(wonder.urlid, field='title')
self.check_can_change(plop.urlid, 404, field='title')
\ No newline at end of file
from django.contrib.auth import get_user_model
from django.test import TestCase
from rest_framework.test import APIRequestFactory, APIClient
from rest_framework.test import APIClient, APIRequestFactory
from rest_framework.utils import json
from djangoldp.models import Model
from djangoldp.serializers import LDPSerializer, LDListMixin
from djangoldp.tests.models import Skill, JobOffer, Invoice, LDPDummy, Resource, Post, Circle, Project
from djangoldp.tests.models import (Circle, Invoice, LDPDummy, Post, Project,
Resource, Space)
class Save(TestCase):
class PostTestCase(TestCase):
def setUp(self):
self.factory = APIRequestFactory()
......@@ -16,166 +16,19 @@ class Save(TestCase):
self.user = get_user_model().objects.create_user(username='john', email='jlennon@beatles.com',
password='glass onion')
self.client.force_authenticate(self.user)
LDListMixin.to_representation_cache.reset()
LDPSerializer.to_representation_cache.reset()
def tearDown(self):
pass
def test_save_m2m_graph_with_many_nested(self):
invoice = {
"@graph": [
{
"@id": "./",
"batches": {"@id": "_:b381"},
"title": "Nouvelle facture",
"date": ""
},
{
"@id": "_:b381",
"tasks": {"@id": "_:b382"},
"title": "Batch 1"
},
{
"@id": "_:b382",
"title": "Tache 1"
}
]
}
meta_args = {'model': Invoice, 'depth': 2, 'fields': ("@id", "title", "batches", "date")}
meta_class = type('Meta', (), meta_args)
serializer_class = type(LDPSerializer)('InvoiceSerializer', (LDPSerializer,), {'Meta': meta_class})
serializer = serializer_class(data=invoice)
serializer.is_valid()
result = serializer.save()
self.assertEquals(result.title, "Nouvelle facture")
self.assertIs(result.batches.count(), 1)
self.assertEquals(result.batches.all()[0].title, "Batch 1")
self.assertIs(result.batches.all()[0].tasks.count(), 1)
self.assertEquals(result.batches.all()[0].tasks.all()[0].title, "Tache 1")
def test_save_m2m(self):
skill1 = Skill.objects.create(title="skill1", obligatoire="obligatoire", slug="slug1")
skill2 = Skill.objects.create(title="skill2", obligatoire="obligatoire", slug="slug2")
job = {"title": "job test",
"slug": "slug1",
"skills": {
"ldp:contains": [
{"@id": "https://happy-dev.fr/skills/{}/".format(skill1.slug)},
{"@id": "https://happy-dev.fr/skills/{}/".format(skill2.slug), "title": "skill2 UP"},
{"title": "skill3", "obligatoire": "obligatoire", "slug": "slug3"},
]}
}
meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills", "slug")}
meta_class = type('Meta', (), meta_args)
serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class})
serializer = serializer_class(data=job)
serializer.is_valid()
result = serializer.save()
self.assertEquals(result.title, "job test")
self.assertIs(result.skills.count(), 3)
self.assertEquals(result.skills.all()[0].title, "skill1") # no change
self.assertEquals(result.skills.all()[1].title, "skill2 UP") # title updated
self.assertEquals(result.skills.all()[2].title, "skill3") # creation on the fly
def test_save_m2m_graph_simple(self):
job = {"@graph": [
{"title": "job test", "slug": "slugjob",
},
]}
meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills", "slug")}
meta_class = type('Meta', (), meta_args)
serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class})
serializer = serializer_class(data=job)
serializer.is_valid()
result = serializer.save()
self.assertEquals(result.title, "job test")
self.assertIs(result.skills.count(), 0)
def test_save_m2m_graph_with_nested(self):
skill1 = Skill.objects.create(title="skill1", obligatoire="obligatoire", slug="a")
skill2 = Skill.objects.create(title="skill2", obligatoire="obligatoire", slug="b")
job = {"@graph": [
{"title": "job test",
"slug": "slugj",
"skills": {"@id": "_.123"}
},
{"@id": "_.123", "title": "skill3 NEW", "obligatoire": "obligatoire", "slug": "skill3"},
]}
meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills", "slug")}
meta_class = type('Meta', (), meta_args)
serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class})
serializer = serializer_class(data=job)
serializer.is_valid()
result = serializer.save()
self.assertEquals(result.title, "job test")
self.assertIs(result.skills.count(), 1)
self.assertEquals(result.skills.all()[0].title, "skill3 NEW") # creation on the fly
def test_save_without_nested_fields(self):
skill1 = Skill.objects.create(title="skill1", obligatoire="obligatoire", slug="a")
skill2 = Skill.objects.create(title="skill2", obligatoire="obligatoire", slug="b")
job = {"title": "job test", "slug": "c"}
meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills", "slug")}
meta_class = type('Meta', (), meta_args)
serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class})
serializer = serializer_class(data=job)
serializer.is_valid()
result = serializer.save()
self.assertEquals(result.title, "job test")
self.assertIs(result.skills.count(), 0)
def test_save_on_sub_iri(self):
"""
POST /job-offers/1/skills/
"""
job = JobOffer.objects.create(title="job test")
skill = {"title": "new SKILL"}
meta_args = {'model': Skill, 'depth': 2, 'fields': ("@id", "title")}
meta_class = type('Meta', (), meta_args)
serializer_class = type(LDPSerializer)('SkillSerializer', (LDPSerializer,), {'Meta': meta_class})
serializer = serializer_class(data=skill)
serializer.is_valid()
kwargs = {}
kwargs['joboffer'] = job
result = serializer.save(**kwargs)
self.assertEquals(result.title, "new SKILL")
self.assertIs(result.joboffer_set.count(), 1)
self.assertEquals(result.joboffer_set.get(), job)
self.assertIs(result.joboffer_set.get().skills.count(), 1)
def test_save_fk_graph_with_nested(self):
post = {
'@graph': [
{
'http://happy-dev.fr/owl/#title': "title",
'http://happy-dev.fr/owl/#invoice': {
'https://cdn.startinblox.com/owl#title': "title",
'https://cdn.startinblox.com/owl#invoice': {
'@id': "_.123"
}
},
{
'@id': "_.123",
'http://happy-dev.fr/owl/#title': "title 2"
'https://cdn.startinblox.com/owl#title': "title 2"
}
]
}
......@@ -183,17 +36,17 @@ class Save(TestCase):
response = self.client.post('/batchs/', data=json.dumps(post), content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertNotIn('author', response.data)
self.assertEquals(response.data['title'], "title")
self.assertEquals(response.data['invoice']['title'], "title 2")
self.assertEqual(response.data['title'], "title")
self.assertEqual(response.data['invoice']['title'], "title 2")
def test_save_fk_graph_with_existing_nested(self):
invoice = Invoice.objects.create(title="title 3")
post = {
'@graph': [
{
'http://happy-dev.fr/owl/#title': "title",
'http://happy-dev.fr/owl/#invoice': {
'@id': "https://happy-dev.fr{}{}/".format(Model.container_id(invoice), invoice.id)
'https://cdn.startinblox.com/owl#title': "title",
'https://cdn.startinblox.com/owl#invoice': {
'@id': "http://happy-dev.fr{}{}/".format(Model.container_id(invoice), invoice.id)
}
}
]
......@@ -202,27 +55,27 @@ class Save(TestCase):
response = self.client.post('/batchs/', data=json.dumps(post), content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertNotIn('author', response.data)
self.assertEquals(response.data['title'], "title")
self.assertEquals(response.data['invoice']['title'], "title 3")
self.assertEqual(response.data['title'], "title")
self.assertEqual(response.data['invoice']['title'], "title 3")
def test_post_should_accept_missing_field_id_nullable(self):
body = [
{
'@id': "./",
'http://happy-dev.fr/owl/#content': "post update",
}
]
response = self.client.post('/posts/', data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertIn('peer_user', response.data)
body = [
{
'@id': "./",
'https://cdn.startinblox.com/owl#content': "post update",
}
]
response = self.client.post('/posts/', data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertIn('peer_user', response.data)
def test_post_should_accept_empty_field_if_nullable(self):
body = [
{
'@id': "./",
'http://happy-dev.fr/owl/#content': "post update",
'http://happy-dev.fr/owl/#peer_user': ""
'https://cdn.startinblox.com/owl#content': "post update",
'https://cdn.startinblox.com/owl#peer_user': ""
}
]
response = self.client.post('/posts/', data=json.dumps(body),
......@@ -236,17 +89,17 @@ class Save(TestCase):
body = [
{
'@id': "_:b216",
'http://happy-dev.fr/owl/#description': "user update",
'http://happy-dev.fr/owl/#ddummy': {
"@id": "https://happy-dev.fr{}{}/".format(Model.container_id(dummy), dummy.id)
'https://cdn.startinblox.com/owl#description': "user update",
'https://cdn.startinblox.com/owl#ddummy': {
"@id": "http://happy-dev.fr{}{}/".format(Model.container_id(dummy), dummy.id)
}
},
{
'@id': './',
"http://happy-dev.fr/owl/#first_name": "Alexandre",
"http://happy-dev.fr/owl/#last_name": "Bourlier",
"http://happy-dev.fr/owl/#username": "alex",
'http://happy-dev.fr/owl/#userprofile': {'@id': "_:b216"}
"https://cdn.startinblox.com/owl#first_name": "Alexandre",
"https://cdn.startinblox.com/owl#last_name": "Bourlier",
"https://cdn.startinblox.com/owl#username": "alex",
'https://cdn.startinblox.com/owl#userprofile': {'@id': "_:b216"}
}
]
response = self.client.post('/users/', data=json.dumps(body),
......@@ -264,7 +117,7 @@ class Save(TestCase):
}
],
'@context': {
"@vocab": "http://happy-dev.fr/owl/#",
"@vocab": "https://cdn.startinblox.com/owl#",
}
}
response = self.client.post('/posts/', data=json.dumps(body),
......@@ -274,23 +127,22 @@ class Save(TestCase):
def test_nested_container(self):
resource = Resource.objects.create()
body = {
'http://happy-dev.fr/owl/#title': "new job",
'http://happy-dev.fr/owl/#slug': "job1",
'https://cdn.startinblox.com/owl#title': "new job",
'https://cdn.startinblox.com/owl#slug': "job1",
}
response = self.client.post('/resources/{}/joboffers/'.format(resource.pk),
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertEqual(response.data['resources']['ldp:contains'][0]['@id'],
"http://testserver/resources/{}/".format(resource.pk))
self.assertEqual(response.data['resources']['ldp:contains'][0]['@id'], resource.urlid)
self.assertEqual(response.data['title'], "new job")
def test_nested_container_bis(self):
invoice = Invoice.objects.create()
body = {
'http://happy-dev.fr/owl/#title': "new batch",
'https://cdn.startinblox.com/owl#title': "new batch",
}
response = self.client.post('/invoices/{}/batches/'.format(invoice.pk),
......@@ -301,38 +153,20 @@ class Save(TestCase):
"http://happy-dev.fr/invoices/{}/".format(invoice.pk))
self.assertEqual(response.data['title'], "new batch")
def test_nested_container_ter(self):
circle = Circle.objects.create()
body = {
'user' : {
"username" : "hubl-workaround-493"
},
# 'circle' : {},
'@context': {
"@vocab": "http://happy-dev.fr/owl/#",
}
}
response = self.client.post('/circles/{}/members/'.format(circle.pk),
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertEqual(response.data['circle']['@id'],
"http://testserver/circles/{}/".format(circle.pk))
def test_nested_container_federated(self):
resource = Resource.objects.create()
body = {
'http://happy-dev.fr/owl/#@id': "http://external.job/job/1",
'https://cdn.startinblox.com/owl#@id': "http://external.job/job/1",
}
response = self.client.post('/resources/{}/joboffers/'.format(resource.pk),
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertEqual(response.data['resources']['ldp:contains'][0]['@id'],
"http://testserver/resources/{}/".format(resource.pk))
self.assertEqual(response.data['@id'], "http://external.job/job/1")
self.assertIn('@type', response.data)
response = self.client.get('/resources/{}/'.format(resource.pk))
self.assertEqual(response.data['joboffers']['ldp:contains'][0]['@id'], "http://external.job/job/1")
def test_embedded_context_2(self):
body = {
......@@ -340,7 +174,7 @@ class Save(TestCase):
'content': "post update",
'peer_user': "",
'@context': {
"@vocab": "http://happy-dev.fr/owl/#",
"@vocab": "https://cdn.startinblox.com/owl#",
}
}
......@@ -354,7 +188,7 @@ class Save(TestCase):
'content': "post update",
'peer_user': "",
'@context': {
"@vocab": "http://happy-dev.fr/owl/#",
"@vocab": "https://cdn.startinblox.com/owl#",
}
}
......@@ -370,7 +204,7 @@ class Save(TestCase):
'content': "post update",
'peer_user': {'none': None},
'@context': {
"@vocab": "http://happy-dev.fr/owl/#",
"@vocab": "https://cdn.startinblox.com/owl#",
}
}
......@@ -381,13 +215,76 @@ class Save(TestCase):
def test_nested_container_user_federated(self):
project = Project.objects.create()
body = {
'http://happy-dev.fr/owl/#@id': "http://external.user/user/1/",
'https://cdn.startinblox.com/owl#@id': "http://external.user/user/1/",
}
response = self.client.post('/projects/{}/team/'.format(project.pk),
response = self.client.post('/projects/{}/members/'.format(project.pk),
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertEqual(response.data['projects']['ldp:contains'][0]['@id'],
"http://testserver/projects/{}/".format(project.pk))
self.assertEqual(response.data['@id'], "http://external.user/user/1/")
self.assertIn('@type', response.data)
response = self.client.get('/projects/{}/'.format(project.pk))
self.assertEqual(response.data['members']['ldp:contains'][0]['@id'], "http://external.user/user/1/")
# https://www.w3.org/TR/json-ld/#value-objects
def test_post_field_with_value_object(self):
post = {
'https://cdn.startinblox.com/owl#title': {
'@value': "title",
'@language': "en"
}
}
response = self.client.post('/invoices/', data=json.dumps(post), content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertEqual(response.data['title'], "title")
# from JSON-LD spec: "The value associated with the @value key MUST be either a string, a number, true, false or null"
def test_save_field_with_invalid_value_object(self):
invoice = Invoice.objects.create(title="title 3")
post = {
'https://cdn.startinblox.com/owl#invoice': {
'@value': {'title': 'title',
'@id': "http://happy-dev.fr{}{}/".format(Model.container_id(invoice), invoice.id)}
}
}
response = self.client.post('/batchs/', data=json.dumps(post), content_type='application/ld+json')
self.assertEqual(response.status_code, 400)
# TODO: bug with PyLD: https://github.com/digitalbazaar/pyld/issues/142
# from JSON-LD spec: "If the value associated with the @type key is @json, the value MAY be either an array or an object"
'''
def test_save_field_with_object_value_object(self):
invoice = Invoice.objects.create(title="title 3")
post = {
'https://cdn.startinblox.com/owl#invoice': {
'@value': {'title': 'title', '@id': "http://happy-dev.fr{}{}/".format(Model.container_id(invoice), invoice.id)},
'@type': '@json'
}
}
response = self.client.post('/batchs/', data=json.dumps(post), content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
'''
# the below test is necessary because of an obscure bug where the OneToOne field is successfully applied
# during the life of the serializer (and response) but is not persisted in the database,
# when it is posted onto the reverse relation
def test_one_to_one_field_reverse_post(self):
self.assertEqual(Circle.objects.count(), 0)
self.assertEqual(Space.objects.count(), 0)
body = {
'@context': {'@vocab': "https://cdn.startinblox.com/owl#" },
'space': {'name': "Etablissement"}
}
response = self.client.post('/circles/', data=json.dumps(body), content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertEqual(Circle.objects.count(), 1)
self.assertEqual(Space.objects.count(), 1)
circle = Circle.objects.all()[0]
space = circle.space
self.assertIsNotNone(space)
self.assertIsNotNone(space.circle)
from django.conf import settings
from django.test import TestCase
class TestSettings(TestCase):
def test_inexistent_returns_default(self):
"""Assert a inexistent key returns the provided default value."""
assert getattr(settings, 'INEXISTENT', 'something') == 'something'
def test_only_in_core_config(self):
"""Asserts values defined only in core config."""
assert settings.DEBUG == False
def test_only_in_package(self):
"""Asserts default settings defined in the package."""
assert settings.MYPACKAGEVAR == "ok"
def test_only_in_user_config(self):
"""Asserts LDP packages are loaded from YAML file."""
assert 'djangoldp.tests' in settings.DJANGOLDP_PACKAGES
def test_overrided_core_by_package_config(self):
assert settings.USE_I18N == False
def test_overrided_package_by_user_config(self):
assert settings.USE_TZ == False
def test_overrided_core_by_user_config(self):
"""Asserts values overrided from user configuration."""
assert settings.EMAIL_HOST == 'somewhere'
def test_installed_apps_resolution(self):
"""Asserts LDP packages are referenced along with default installed apps."""
# test inclusion from server YAML settings
assert 'djangoldp.tests' in settings.INSTALLED_APPS
# test inclusion from ldppackage settings
assert 'djangoldp.tests.dummy.apps.DummyConfig' in settings.INSTALLED_APPS
# test inclusion from default core settings
assert 'djangoldp' in settings.INSTALLED_APPS
# test deduplication of dummy app
assert settings.INSTALLED_APPS.count('djangoldp.tests.dummy.apps.DummyConfig') == 1
# FIXME: We should check the order
def test_reference_middleware(self):
"""Asserts middlewares added in packages are added to the settings."""
assert 'djangoldp.tests.dummy.middleware.DummyMiddleware' in settings.MIDDLEWARE
def test_extra_module(self):
#FIXME
pass
import uuid
from django.conf import settings
from django.contrib.auth import get_user_model
from django.test import TestCase
from rest_framework.test import APIRequestFactory, APIClient
from rest_framework.test import APIClient, APIRequestFactory
from rest_framework.utils import json
from djangoldp.serializers import LDPSerializer, LDListMixin
from djangoldp.tests.models import Post, UserProfile, Resource, Circle
from djangoldp.tests.models import Skill, JobOffer, Conversation, Message, Project
from djangoldp.tests.models import (Batch, Conversation, Invoice, JobOffer,
NotificationSetting, Project, Resource,
Skill, Task, UserProfile)
class Update(TestCase):
......@@ -17,268 +19,61 @@ class Update(TestCase):
self.user = get_user_model().objects.create_user(username='john', email='jlennon@beatles.com',
password='glass onion')
self.client.force_authenticate(user=self.user)
LDListMixin.to_representation_cache.reset()
LDPSerializer.to_representation_cache.reset()
def tearDown(self):
pass
def test_update(self):
skill = Skill.objects.create(title="to drop", obligatoire="obligatoire", slug="slug1")
skill1 = Skill.objects.create(title="skill1", obligatoire="obligatoire", slug="slug2")
skill2 = Skill.objects.create(title="skill2", obligatoire="obligatoire", slug="slug3")
job1 = JobOffer.objects.create(title="job test")
job1.skills.add(skill)
job = {"@id": "https://happy-dev.fr/job-offers/{}/".format(job1.slug),
"title": "job test updated",
"skills": {
"ldp:contains": [
{"title": "new skill", "obligatoire": "okay"},
{"@id": "https://happy-dev.fr/skills/{}/".format(skill1.slug)},
{"@id": "https://happy-dev.fr/skills/{}/".format(skill2.slug), "title": "skill2 UP"},
]}
}
meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills")}
meta_class = type('Meta', (), meta_args)
serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class})
serializer = serializer_class(data=job, instance=job1)
serializer.is_valid()
result = serializer.save()
self.assertEquals(result.title, "job test updated")
self.assertIs(result.skills.count(), 3)
skills = result.skills.all().order_by('title')
self.assertEquals(skills[0].title, "new skill") # new skill
self.assertEquals(skills[1].title, "skill1") # no change
self.assertEquals(skills[2].title, "skill2 UP") # title updated
def test_update_graph(self):
skill = Skill.objects.create(title="to drop", obligatoire="obligatoire", slug="slug1")
skill1 = Skill.objects.create(title="skill1", obligatoire="obligatoire", slug="slug2")
skill2 = Skill.objects.create(title="skill2", obligatoire="obligatoire", slug="slug3")
job1 = JobOffer.objects.create(title="job test", slug="slug4")
job1.skills.add(skill)
job = {"@graph":
[
{
"@id": "https://happy-dev.fr/job-offers/{}/".format(job1.slug),
"title": "job test updated",
"skills": {
"ldp:contains": [
{"@id": "https://happy-dev.fr/skills/{}/".format(skill1.slug)},
{"@id": "https://happy-dev.fr/skills/{}/".format(skill2.slug)},
{"@id": "_.123"},
]}
},
{
"@id": "_.123",
"title": "new skill",
"obligatoire": "okay"
},
{
"@id": "https://happy-dev.fr/skills/{}/".format(skill1.slug),
},
{
"@id": "https://happy-dev.fr/skills/{}/".format(skill2.slug),
"title": "skill2 UP"
}
]
}
meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills")}
meta_class = type('Meta', (), meta_args)
serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class})
serializer = serializer_class(data=job, instance=job1)
serializer.is_valid()
result = serializer.save()
skills = result.skills.all().order_by('title')
self.assertEquals(result.title, "job test updated")
self.assertIs(result.skills.count(), 3)
self.assertEquals(skills[0].title, "new skill") # new skill
self.assertEquals(skills[1].title, "skill1") # no change
self.assertEquals(skills[2].title, "skill2 UP") # title updated
def test_update_graph_2(self):
skill = Skill.objects.create(title="to drop", obligatoire="obligatoire", slug="slug")
skill1 = Skill.objects.create(title="skill1", obligatoire="obligatoire", slug="slug1")
skill2 = Skill.objects.create(title="skill2", obligatoire="obligatoire", slug="slug2")
job1 = JobOffer.objects.create(title="job test", slug="slug1")
job1.skills.add(skill)
job = {"@graph":
[
{
"@id": "https://happy-dev.fr/job-offers/{}/".format(job1.slug),
"title": "job test updated",
"skills": {
"@id": "https://happy-dev.fr/job-offers/{}/skills/".format(job1.slug)
}
},
{
"@id": "_.123",
"title": "new skill",
"obligatoire": "okay"
},
{
"@id": "https://happy-dev.fr/skills/{}/".format(skill1.slug),
},
{
"@id": "https://happy-dev.fr/skills/{}/".format(skill2.slug),
"title": "skill2 UP"
},
{
'@id': "https://happy-dev.fr/job-offers/{}/skills/".format(job1.slug),
# TODO: https://git.startinblox.com/djangoldp-packages/djangoldp/issues/326
'''
def test_update_container_append_resource(self):
pre_existing_skill_a = Skill.objects.create(title="to keep", obligatoire="obligatoire", slug="slug1")
pre_existing_skill_b = Skill.objects.create(title="to keep", obligatoire="obligatoire", slug="slug2")
job = JobOffer.objects.create(title="job test")
job.skills.add(pre_existing_skill_a)
job.skills.add(pre_existing_skill_b)
post = {"@id": "{}/job-offers/{}/".format(settings.BASE_URL, job.slug),
"skills": {
"ldp:contains": [
{"@id": "https://happy-dev.fr/skills/{}/".format(skill1.slug)},
{"@id": "https://happy-dev.fr/skills/{}/".format(skill2.slug)},
{"@id": "_.123"},
]
{"title": "new skill", "obligatoire": "okay"},
{"@id": "{}/skills/{}/".format(settings.BASE_URL, pre_existing_skill_b.slug), "title": "z"},
]}
}
]
}
meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills", "slug")}
meta_class = type('Meta', (), meta_args)
serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class})
serializer = serializer_class(data=job, instance=job1)
serializer.is_valid()
result = serializer.save()
skills = result.skills.all().order_by('title')
self.assertEquals(result.title, "job test updated")
self.assertIs(result.skills.count(), 3)
self.assertEquals(skills[0].title, "new skill") # new skill
self.assertEquals(skills[1].title, "skill1") # no change
self.assertEquals(skills[2].title, "skill2 UP") # title updated
self.assertEquals(skill, skill._meta.model.objects.get(pk=skill.pk)) # title updated
def test_update_list_with_reverse_relation(self):
user1 = get_user_model().objects.create()
conversation = Conversation.objects.create(description="Conversation 1", author_user=user1)
message1 = Message.objects.create(text="Message 1", conversation=conversation, author_user=user1)
message2 = Message.objects.create(text="Message 2", conversation=conversation, author_user=user1)
json = {"@graph": [
{
"@id": "https://happy-dev.fr/messages/{}/".format(message1.pk),
"text": "Message 1 UP"
},
{
"@id": "https://happy-dev.fr/messages/{}/".format(message2.pk),
"text": "Message 2 UP"
},
{
'@id': "https://happy-dev.fr/conversations/{}/".format(conversation.pk),
'description': "Conversation 1 UP",
"message_set": [
{"@id": "https://happy-dev.fr/messages/{}/".format(message1.pk)},
{"@id": "https://happy-dev.fr/messages/{}/".format(message2.pk)},
]
}
]
}
meta_args = {'model': Conversation, 'depth': 2, 'fields': ("@id", "description", "message_set")}
meta_class = type('Meta', (), meta_args)
serializer_class = type(LDPSerializer)('ConversationSerializer', (LDPSerializer,), {'Meta': meta_class})
serializer = serializer_class(data=json, instance=conversation)
serializer.is_valid()
result = serializer.save()
messages = result.message_set.all().order_by('text')
self.assertEquals(result.description, "Conversation 1 UP")
self.assertIs(result.message_set.count(), 2)
self.assertEquals(messages[0].text, "Message 1 UP")
self.assertEquals(messages[1].text, "Message 2 UP")
def test_add_new_element_with_foreign_key_id(self):
user1 = get_user_model().objects.create()
conversation = Conversation.objects.create(description="Conversation 1", author_user=user1)
message1 = Message.objects.create(text="Message 1", conversation=conversation, author_user=user1)
message2 = Message.objects.create(text="Message 2", conversation=conversation, author_user=user1)
json = {"@graph": [
{
"@id": "https://happy-dev.fr/messages/{}/".format(message1.pk),
"text": "Message 1 UP",
"author_user": {
'@id': "https://happy-dev.fr/users/{}/".format(user1.pk)
}
},
{
"@id": "https://happy-dev.fr/messages/{}/".format(message2.pk),
"text": "Message 2 UP",
"author_user": {
'@id': "https://happy-dev.fr/users/{}/".format(user1.pk)
}
},
{
"@id": "_:b1",
"text": "Message 3 NEW",
"author_user": {
'@id': "https://happy-dev.fr/users/{}/".format(user1.pk)
}
},
{
'@id': "https://happy-dev.fr/conversations/{}/".format(conversation.pk),
"author_user": {
'@id': "https://happy-dev.fr/users/{}/".format(user1.pk)
},
'description': "Conversation 1 UP",
'message_set': {
"@id": "https://happy-dev.fr/conversations/{}/message_set/".format(conversation.pk)
}
},
{
'@id': "https://happy-dev.fr/conversations/{}/message_set/".format(conversation.pk),
"ldp:contains": [
{"@id": "https://happy-dev.fr/messages/{}/".format(message1.pk)},
{"@id": "https://happy-dev.fr/messages/{}/".format(message2.pk)},
{"@id": "_:b1"}
]
}
]
}
meta_args = {'model': Conversation, 'depth': 2, 'fields': ("@id", "description", "message_set")}
meta_class = type('Meta', (), meta_args)
serializer_class = type(LDPSerializer)('ConversationSerializer', (LDPSerializer,), {'Meta': meta_class})
serializer = serializer_class(data=json, instance=conversation)
serializer.is_valid()
result = serializer.save()
messages = result.message_set.all().order_by('text')
response = self.client.patch('/job-offers/{}/'.format(job.slug),
data=json.dumps(post),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEquals(result.description, "Conversation 1 UP")
self.assertIs(result.message_set.count(), 3)
self.assertEquals(messages[0].text, "Message 1 UP")
self.assertEquals(messages[1].text, "Message 2 UP")
self.assertEquals(messages[2].text, "Message 3 NEW")
self.assertEqual(response.data['title'], job.title)
self.assertIs(job.skills.count(), 3)
skills = job.skills.all().order_by('title')
self.assertEqual(skills[0].title, "new skill") # new skill
self.assertEqual(skills[1].title, pre_existing_skill_a.title) # old skill unchanged
self.assertEqual(skills[2].title, "z") # updated
self.assertEqual(skills[2].obligatoire, pre_existing_skill_b.obligatoire) # another field not updated
'''
def test_put_resource(self):
post = Post.objects.create(content="content")
skill = Skill.objects.create(title='original', obligatoire='original', slug='skill1')
body = [{
'@id': 'http://testserver.com/posts/{}/'.format(post.pk),
'http://happy-dev.fr/owl/#content': "post content"}]
response = self.client.put('/posts/{}/'.format(post.pk), data=json.dumps(body),
'@id': '{}/skills/{}/'.format(settings.BASE_URL, skill.slug),
'https://cdn.startinblox.com/owl#title': "new", 'https://cdn.startinblox.com/owl#obligatoire': "new"}]
response = self.client.put('/skills/{}/'.format(skill.slug), data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEquals(response.data['content'], "post content")
self.assertIn('location', response._headers)
self.assertEqual(response.data['title'], "new")
self.assertEqual(response.data['obligatoire'], "new")
self.assertIn('location', response.headers)
def test_patch_resource(self):
skill = Skill.objects.create(title='original', obligatoire='original', slug='skill1')
body = {
'@id': '{}/skills/{}'.format(settings.BASE_URL, skill.slug),
'https://cdn.startinblox.com/owl#title': 'new'
}
response = self.client.patch('/skills/{}/'.format(skill.slug), data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['title'], "new")
self.assertEqual(response.data['obligatoire'], "original")
def test_create_sub_object_in_existing_object_with_existing_reverse_1to1_relation(self):
user = get_user_model().objects.create(username="alex", password="test")
......@@ -286,14 +81,14 @@ class Update(TestCase):
body = [
{
'@id': "/userprofiles/{}/".format(profile.pk),
'http://happy-dev.fr/owl/#description': "user update"
'https://cdn.startinblox.com/owl#description': "user update"
},
{
'@id': '/users/{}/'.format(user.pk),
"http://happy-dev.fr/owl/#first_name": "Alexandre",
"http://happy-dev.fr/owl/#last_name": "Bourlier",
"http://happy-dev.fr/owl/#username": "alex",
'http://happy-dev.fr/owl/#userprofile': {'@id': "/userprofiles/{}/".format(profile.pk)}
"https://cdn.startinblox.com/owl#first_name": "Alexandre",
"https://cdn.startinblox.com/owl#last_name": "Bourlier",
"https://cdn.startinblox.com/owl#username": "alex",
'https://cdn.startinblox.com/owl#userprofile': {'@id': "/userprofiles/{}/".format(profile.pk)}
}
]
response = self.client.put('/users/{}/'.format(user.pk), data=json.dumps(body),
......@@ -301,6 +96,22 @@ class Update(TestCase):
self.assertEqual(response.status_code, 200)
self.assertIn('userprofile', response.data)
def test_put_nonexistent_local_resource(self):
job = JobOffer.objects.create(title="job test")
# contains internal urlid which refers to non-existent resource
body = {"@id": "{}/job-offers/{}/".format(settings.BASE_URL, job.slug),
"skills": {
"ldp:contains": [
{"@id": "{}/skills/404/".format(settings.BASE_URL)},
]}
}
response = self.client.put('/job-offers/{}/'.format(job.slug), data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(Skill.objects.count(), 0)
def test_create_sub_object_in_existing_object_with_reverse_fk_relation(self):
"""
Doesn't work with depth = 0 on UserProfile Model. Should it be ?
......@@ -309,14 +120,14 @@ class Update(TestCase):
body = [
{
'@id': "_:b975",
'http://happy-dev.fr/owl/#description': "conversation description"
'https://cdn.startinblox.com/owl#description': "conversation description"
},
{
'@id': '/users/{}/'.format(user.pk),
"http://happy-dev.fr/owl/#first_name": "Alexandre",
"http://happy-dev.fr/owl/#last_name": "Bourlier",
"http://happy-dev.fr/owl/#username": "alex",
'http://happy-dev.fr/owl/#conversation_set': {'@id': "_:b975"}
"https://cdn.startinblox.com/owl#first_name": "Alexandre",
"https://cdn.startinblox.com/owl#last_name": "Bourlier",
"https://cdn.startinblox.com/owl#username": "alex",
'https://cdn.startinblox.com/owl#conversation_set': {'@id': "_:b975"}
}
]
response = self.client.put('/users/{}/'.format(user.pk), data=json.dumps(body),
......@@ -330,14 +141,14 @@ class Update(TestCase):
body = [
{
'@id': "/conversations/{}/".format(conversation.pk),
'http://happy-dev.fr/owl/#description': "conversation update"
'https://cdn.startinblox.com/owl#description': "conversation update"
},
{
'@id': '/users/{}/'.format(user.pk),
"http://happy-dev.fr/owl/#first_name": "Alexandre",
"http://happy-dev.fr/owl/#last_name": "Bourlier",
"http://happy-dev.fr/owl/#username": "alex",
'http://happy-dev.fr/owl/#conversation_set': {'@id': "/conversations/{}/".format(conversation.pk)}
"https://cdn.startinblox.com/owl#first_name": "Alexandre",
"https://cdn.startinblox.com/owl#last_name": "Bourlier",
"https://cdn.startinblox.com/owl#username": "alex",
'https://cdn.startinblox.com/owl#conversation_set': {'@id': "/conversations/{}/".format(conversation.pk)}
}
]
response = self.client.put('/users/{}/'.format(user.pk), data=json.dumps(body),
......@@ -352,7 +163,7 @@ class Update(TestCase):
body = [
{
'@id': "/conversations/{}/".format(conversation.pk),
'http://happy-dev.fr/owl/#description': "conversation update",
'https://cdn.startinblox.com/owl#description': "conversation update",
}
]
response = self.client.put('/conversations/{}/'.format(conversation.pk), data=json.dumps(body),
......@@ -367,8 +178,8 @@ class Update(TestCase):
body = [
{
'@id': "/conversations/{}/".format(conversation.pk),
'http://happy-dev.fr/owl/#description': "conversation update",
'http://happy-dev.fr/owl/#peer_user': ""
'https://cdn.startinblox.com/owl#description': "conversation update",
'https://cdn.startinblox.com/owl#peer_user': ""
}
]
response = self.client.put('/conversations/{}/'.format(conversation.pk), data=json.dumps(body),
......@@ -380,12 +191,12 @@ class Update(TestCase):
resource = Resource.objects.create()
job = JobOffer.objects.create(title="first title", slug="job")
body = {
'http://happy-dev.fr/owl/#joboffers':
'https://cdn.startinblox.com/owl#joboffers':
{
'@id': "http://testserver.com/resources/{}/joboffers/".format(resource.pk),
'@id': "{}/resources/{}/joboffers/".format(settings.BASE_URL, resource.pk),
'ldp:contains': [
{'@id': 'http://testserver.com/job-offers/{}/'.format(job.slug),
'http://happy-dev.fr/owl/#title': "new job",
{'@id': job.urlid,
'https://cdn.startinblox.com/owl#title': "new job",
},
]
}
......@@ -395,16 +206,15 @@ class Update(TestCase):
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['joboffers']['ldp:contains'][0]['@id'],
"http://testserver.com/job-offers/{}/".format(job.slug))
self.assertEqual(response.data['joboffers']['ldp:contains'][0]['@id'], job.urlid)
self.assertEqual(response.data['joboffers']['ldp:contains'][0]['title'], "new job")
def test_m2m_new_link_embedded(self):
resource = Resource.objects.create()
body = {
'http://happy-dev.fr/owl/#joboffers': {
'http://happy-dev.fr/owl/#slug': 'aaa',
'http://happy-dev.fr/owl/#title': "new job",
'https://cdn.startinblox.com/owl#joboffers': {
'https://cdn.startinblox.com/owl#slug': 'aaa',
'https://cdn.startinblox.com/owl#title': "new job",
}
}
......@@ -422,12 +232,12 @@ class Update(TestCase):
resource.joboffers.add(job)
resource.save()
body = {
'http://happy-dev.fr/owl/#joboffers': {
'https://cdn.startinblox.com/owl#joboffers': {
# '@id': "http://testserver/resources/{}/joboffers/".format(resource.pk),
'ldp:contains': [
{
'@id': 'http://testserver.com/job-offers/{}/'.format(job.slug),
'http://happy-dev.fr/owl/#title': "new job",
'@id': job.urlid,
'https://cdn.startinblox.com/owl#title': "new job",
}
]
}
......@@ -437,30 +247,14 @@ class Update(TestCase):
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['joboffers']['ldp:contains'][0]['@id'],
"http://testserver.com/job-offers/{}/".format(job.slug))
self.assertEqual(response.data['joboffers']['ldp:contains'][0]['@id'], job.urlid)
self.assertEqual(response.data['joboffers']['ldp:contains'][0]['title'], "new job")
def test_m2m_new_link_federated(self):
resource = Resource.objects.create()
body = {
'http://happy-dev.fr/owl/#joboffers': {
'http://happy-dev.fr/owl/#@id': 'http://external.job/job/1',
}
}
response = self.client.put('/resources/{}/'.format(resource.pk),
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['joboffers']['ldp:contains'][0]['@id'],
"http://external.job/job/1")
def test_m2m_new_link_external(self):
resource = Resource.objects.create()
body = {
'http://happy-dev.fr/owl/#joboffers': {
'@id': 'http://testserver.com/job-offers/stuff/',
'https://cdn.startinblox.com/owl#joboffers': {
'https://cdn.startinblox.com/owl#@id': 'http://external.job/job/1',
}
}
......@@ -469,13 +263,14 @@ class Update(TestCase):
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['joboffers']['ldp:contains'][0]['@id'],
"http://testserver.com/job-offers/stuff/")
"http://external.job/job/1")
self.assertIn('@type', response.data['joboffers']['ldp:contains'][0])
def test_m2m_new_link_local(self):
resource = Resource.objects.create()
job = JobOffer.objects.create(title="first title", slug="job")
body = {
'http://happy-dev.fr/owl/#joboffers': {
'https://cdn.startinblox.com/owl#joboffers': {
'@id': 'http://happy-dev.fr/job-offers/{}/'.format(job.slug),
}
}
......@@ -494,8 +289,8 @@ class Update(TestCase):
body = [
{
'@id': "/conversations/{}/".format(conversation.pk),
'http://happy-dev.fr/owl/#description': "conversation update",
'http://happy-dev.fr/owl/#peer_user': {
'https://cdn.startinblox.com/owl#description': "conversation update",
'https://cdn.startinblox.com/owl#peer_user': {
'@id': 'http://happy-dev.fr/users/{}'.format(self.user.pk),
}
}
......@@ -514,9 +309,9 @@ class Update(TestCase):
def test_m2m_user_link_federated(self):
project = Project.objects.create(description="project name")
body = {
'http://happy-dev.fr/owl/#description': 'project name',
'http://happy-dev.fr/owl/#team': {
'http://happy-dev.fr/owl/#@id': 'http://external.user/user/1',
'https://cdn.startinblox.com/owl#description': 'project name',
'https://cdn.startinblox.com/owl#members': {
'https://cdn.startinblox.com/owl#@id': 'http://external.user/user/1',
}
}
......@@ -524,16 +319,18 @@ class Update(TestCase):
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['team']['ldp:contains'][0]['@id'],
self.assertEqual(response.data['members']['ldp:contains'][0]['@id'],
"http://external.user/user/1")
self.assertIn('@type', response.data['members']['ldp:contains'][0])
self.assertEqual(len(response.data['members']['ldp:contains'][0].items()), 2)
def test_m2m_user_link_existing_external(self):
project = Project.objects.create(description="project name")
ext_user = get_user_model().objects.create(username=str(uuid.uuid4()), urlid='http://external.user/user/1')
body = {
'http://happy-dev.fr/owl/#description': 'project name',
'http://happy-dev.fr/owl/#team': {
'http://happy-dev.fr/owl/#@id': ext_user.urlid,
'https://cdn.startinblox.com/owl#description': 'project name',
'https://cdn.startinblox.com/owl#members': {
'https://cdn.startinblox.com/owl#@id': ext_user.urlid,
}
}
......@@ -541,10 +338,12 @@ class Update(TestCase):
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['team']['ldp:contains'][0]['@id'], ext_user.urlid)
self.assertEqual(response.data['members']['ldp:contains'][0]['@id'], ext_user.urlid)
self.assertIn('@type', response.data['members']['ldp:contains'][0])
self.assertEqual(len(response.data['members']['ldp:contains'][0].items()), 2)
project = Project.objects.get(pk=project.pk)
self.assertEqual(project.team.count(), 1)
self.assertEqual(project.members.count(), 1)
user = get_user_model().objects.get(pk=ext_user.pk)
self.assertEqual(user.projects.count(), 1)
......@@ -557,17 +356,17 @@ class Update(TestCase):
body = [
{
'@id': "_:b975",
'http://happy-dev.fr/owl/#description': "user description",
'http://happy-dev.fr/owl/#dummy': {
'https://cdn.startinblox.com/owl#description': "user description",
'https://cdn.startinblox.com/owl#dummy': {
'@id': './'
}
},
{
'@id': '/users/{}/'.format(user.pk),
"http://happy-dev.fr/owl/#first_name": "Alexandre",
"http://happy-dev.fr/owl/#last_name": "Bourlier",
"http://happy-dev.fr/owl/#username": "alex",
'http://happy-dev.fr/owl/#userprofile': {'@id': "_:b975"}
"https://cdn.startinblox.com/owl#first_name": "Alexandre",
"https://cdn.startinblox.com/owl#last_name": "Bourlier",
"https://cdn.startinblox.com/owl#username": "alex",
'https://cdn.startinblox.com/owl#userprofile': {'@id': "_:b975"}
}
]
response = self.client.put('/users/{}/'.format(user.pk), data=json.dumps(body),
......@@ -579,11 +378,11 @@ class Update(TestCase):
def test_m2m_user_link_remove_existing_link(self):
ext_user = get_user_model().objects.create(username=str(uuid.uuid4()), urlid='http://external.user/user/1')
project = Project.objects.create(description="project name")
project.team.add(ext_user)
project.members.add(ext_user)
project.save()
body = {
'http://happy-dev.fr/owl/#description': 'project name',
'http://happy-dev.fr/owl/#team': {
'https://cdn.startinblox.com/owl#description': 'project name',
'https://cdn.startinblox.com/owl#members': {
}
}
......@@ -593,7 +392,7 @@ class Update(TestCase):
self.assertEqual(response.status_code, 200)
project = Project.objects.get(pk=project.pk)
self.assertEqual(project.team.count(), 0)
self.assertEqual(project.members.count(), 0)
user = get_user_model().objects.get(pk=ext_user.pk)
self.assertEqual(user.projects.count(), 0)
......@@ -607,11 +406,11 @@ class Update(TestCase):
"last_name": "Bourlier",
"username": "alex",
'userprofile': {
'@id': "http://happy-dev.fr/userprofiles/{}/".format(profile.pk),
'@id': profile.urlid,
'description': "user update"
},
'@context': {
"@vocab": "http://happy-dev.fr/owl/#",
"@vocab": "https://cdn.startinblox.com/owl#",
}
}
......@@ -622,6 +421,165 @@ class Update(TestCase):
response = self.client.get('/userprofiles/{}/'.format(profile.pk),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['description'], "user update")
# unit tests for a specific bug: https://git.startinblox.com/djangoldp-packages/djangoldp/issues/307
def test_direct_boolean_field(self):
profile = UserProfile.objects.create(user=self.user)
setting = NotificationSetting.objects.create(user=profile, receiveMail=False)
body = {
'https://cdn.startinblox.com/owl#@id': setting.urlid,
'receiveMail': True,
"@context": {"@vocab": "https://cdn.startinblox.com/owl#",
"rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
"rdfs": "http://www.w3.org/2000/01/rdf-schema#", "ldp": "http://www.w3.org/ns/ldp#",
"foaf": "http://xmlns.com/foaf/0.1/", "name": "rdfs:label",
"acl": "http://www.w3.org/ns/auth/acl#", "permissions": "acl:accessControl",
"mode": "acl:mode", "geo": "http://www.w3.org/2003/01/geo/wgs84_pos#", "lat": "geo:lat",
"lng": "geo:long"}
}
response = self.client.patch('/notificationsettings/{}/'.format(setting.pk),
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['receiveMail'], True)
def test_nested_container_boolean_field_no_slug(self):
profile = UserProfile.objects.create(user=self.user)
setting = NotificationSetting.objects.create(user=profile, receiveMail=False)
body = {
'settings': {
'https://cdn.startinblox.com/owl#@id': setting.urlid,
'receiveMail': True
},
"@context": {"@vocab": "https://cdn.startinblox.com/owl#",
"rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
"rdfs": "http://www.w3.org/2000/01/rdf-schema#", "ldp": "http://www.w3.org/ns/ldp#",
"foaf": "http://xmlns.com/foaf/0.1/", "name": "rdfs:label",
"acl": "http://www.w3.org/ns/auth/acl#", "permissions": "acl:accessControl",
"mode": "acl:mode", "geo": "http://www.w3.org/2003/01/geo/wgs84_pos#", "lat": "geo:lat",
"lng": "geo:long"}
}
response = self.client.patch('/userprofiles/{}/'.format(profile.slug),
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['settings']['receiveMail'], True)
# variation where the lookup_field for NotificationSetting (pk) is provided
def test_nested_container_boolean_field_with_slug(self):
profile = UserProfile.objects.create(user=self.user)
setting = NotificationSetting.objects.create(user=profile, receiveMail=False)
body = {
'settings': {
'pk': setting.pk,
'https://cdn.startinblox.com/owl#@id': setting.urlid,
'receiveMail': True
},
"@context": {"@vocab": "https://cdn.startinblox.com/owl#",
"rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
"rdfs": "http://www.w3.org/2000/01/rdf-schema#", "ldp": "http://www.w3.org/ns/ldp#",
"foaf": "http://xmlns.com/foaf/0.1/", "name": "rdfs:label",
"acl": "http://www.w3.org/ns/auth/acl#", "permissions": "acl:accessControl",
"mode": "acl:mode", "geo": "http://www.w3.org/2003/01/geo/wgs84_pos#", "lat": "geo:lat",
"lng": "geo:long"}
}
response = self.client.patch('/userprofiles/{}/'.format(profile.slug),
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['settings']['receiveMail'], True)
def test_update_container_twice_nested_view(self):
invoice = Invoice.objects.create(title='test')
pre_existing_batch = Batch.objects.create(title='batch1', invoice=invoice)
pre_existing_task = Task.objects.create(title='task1', batch=pre_existing_batch)
base_url = settings.BASE_URL
body = {
"@id": "{}/invoices/{}/".format(base_url, invoice.pk),
"https://cdn.startinblox.com/owl#title": "new",
"https://cdn.startinblox.com/owl#batches": [
{
"@id": "{}/batchs/{}/".format(base_url, pre_existing_batch.pk),
"https://cdn.startinblox.com/owl#title": "new",
"https://cdn.startinblox.com/owl#tasks": [
{
"@id": "{}/tasks/{}/".format(base_url, pre_existing_task.pk),
"https://cdn.startinblox.com/owl#title": "new"
},
{
"https://cdn.startinblox.com/owl#title": "tache 2"
}
]
},
{
"https://cdn.startinblox.com/owl#title": "z",
}
]
}
response = self.client.put('/invoices/{}/'.format(invoice.pk), data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['title'], "new")
self.assertEqual(response.data['@id'], invoice.urlid)
invoice = Invoice.objects.get(pk=invoice.pk)
self.assertIs(invoice.batches.count(), 2)
batches = invoice.batches.all().order_by('title')
self.assertEqual(batches[0].title, "new")
self.assertEqual(batches[0].urlid, pre_existing_batch.urlid)
self.assertEqual(batches[1].title, "z")
self.assertIs(batches[0].tasks.count(), 2)
tasks = batches[0].tasks.all().order_by('title')
self.assertEqual(tasks[0].title, "new")
self.assertEqual(tasks[0].pk, pre_existing_task.pk)
self.assertEqual(tasks[1].title, "tache 2")
# TODO: https://git.startinblox.com/djangoldp-packages/djangoldp/issues/333
'''def test_update_container_nested_view(self):
circle = Circle.objects.create(name='test')
pre_existing = CircleMember.objects.create(user=self.user, circle=circle, is_admin=False)
another_user = get_user_model().objects.create_user(username='u2', email='u2@b.com', password='pw')
body = {
"@id": "{}/circles/{}/".format(settings.BASE_URL, circle.pk),
"https://cdn.startinblox.com/owl#name": "Updated Name",
"https://cdn.startinblox.com/owl#members": {
"ldp:contains": [
{"@id": "{}/circle-members/{}/".format(settings.BASE_URL, pre_existing.pk),
"https://cdn.startinblox.com/owl#is_admin": True},
{"https://cdn.startinblox.com/owl#user": {"@id": another_user.urlid},
"https://cdn.startinblox.com/owl#is_admin": False},
]
}
}
response = \
self.client.put('/circles/{}/'.format(circle.pk), data=json.dumps(body), content_type='application/ld+json')
print(str(self.user.urlid))
print(str(response.data))
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['name'], circle.name)
self.assertEqual(response.data['@id'], circle.urlid)
self.assertIs(CircleMember.objects.count(), 2)
self.assertIs(circle.members.count(), 2)
self.assertIs(circle.team.count(), 2)
members = circle.members.all().order_by('pk')
self.assertEqual(members[0].user, self.user)
self.assertEqual(members[0].urlid, pre_existing.urlid)
self.assertEqual(members[0].pk, pre_existing.pk)
self.assertEqual(members[0].is_admin, True)
self.assertEqual(members[1].user, another_user)
self.assertEqual(members[1].is_admin, False)'''
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Permission, Group
from django.conf import settings
from django.test import override_settings
from rest_framework.test import APIClient, APITestCase
from .models import JobOffer
from djangoldp.tests.models import JobOffer, LDPDummy, PermissionlessDummy, UserProfile, OwnedResource, \
OwnedResourceNestedOwnership, OwnedResourceTwiceNestedOwnership
import json
class TestUserPermissions(APITestCase):
class UserPermissionsTestCase(APITestCase):
def setUp(self):
self.user = get_user_model().objects.create_user(username='john', email='jlennon@beatles.com', password='glass onion')
self.client = APIClient(enforce_csrf_checks=True)
self.client.force_authenticate(user=self.user)
self.job = JobOffer.objects.create(title="job", slug="slug1")
class TestUserPermissions(UserPermissionsTestCase):
def setUpGroup(self):
self.group = Group.objects.create(name='Test')
view_perm = Permission.objects.get(codename='view_permissionlessdummy')
self.group.permissions.add(view_perm)
self.group.save()
def _make_self_superuser(self):
self.user.is_superuser = True
self.user.save()
# list - simple
def test_get_for_authenticated_user(self):
response = self.client.get('/job-offers/')
self.assertEqual(response.status_code, 200)
# test serialized permissions
self.assertIn('view', response.data['permissions'])
self.assertNotIn('inherit', response.data['permissions'])
# self.assertNotIn('delete', response.data['permissions'])
# TODO: list - I do not have permission from the model, but I do have permission via a Group I am assigned
# https://git.startinblox.com/djangoldp-packages/djangoldp/issues/291
'''def test_group_list_access(self):
self.setUpGroup()
dummy = PermissionlessDummy.objects.create()
response = self.client.get('/permissionless-dummys/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data['ldp:contains']), 0)
LDListMixin.to_representation_cache.reset()
LDPSerializer.to_representation_cache.reset()
self.user.groups.add(self.group)
self.user.save()
response = self.client.get('/permissionless-dummys/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data['ldp:contains']), 1)
# repeat of the above test on nested field
def test_group_list_access_nested_field(self):
self.setUpGroup()
parent = LDPDummy.objects.create()
PermissionlessDummy.objects.create(parent=parent)
response = self.client.get('/ldpdummys/{}/'.format(parent.pk))
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data['anons']['ldp:contains']), 0)
LDListMixin.to_representation_cache.reset()
LDPSerializer.to_representation_cache.reset()
self.user.groups.add(self.group)
self.user.save()
response = self.client.get('/ldpdummys/{}/'.format(parent.pk))
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data['anons']['ldp:contains']), 1)
# repeat of the test on a nested viewset
def test_group_list_access_nested_viewset(self):
self.setUpGroup()
parent = LDPDummy.objects.create()
PermissionlessDummy.objects.create(parent=parent)
response = self.client.get('/ldpdummys/{}/anons/'.format(parent.pk))
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data['ldp:contains']), 0)
LDListMixin.to_representation_cache.reset()
LDPSerializer.to_representation_cache.reset()
self.user.groups.add(self.group)
self.user.save()
response = self.client.get('/ldpdummys/{}/anons/'.format(parent.pk))
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data['ldp:contains']), 1)
# repeat for object-specific request
def test_group_object_access(self):
self.setUpGroup()
dummy = PermissionlessDummy.objects.create()
response = self.client.get('/permissionless-dummys/{}'.format(dummy))
self.assertEqual(response.status_code, 404)
LDListMixin.to_representation_cache.reset()
LDPSerializer.to_representation_cache.reset()
self.user.groups.add(self.group)
self.user.save()
response = self.client.get('/permissionless-dummys/{}/'.format(dummy))
self.assertEqual(response.status_code, 200)
# TODO: test for POST scenario
# TODO: test for PUT scenario
# TODO: test for DELETE scenario
'''
@override_settings(SERIALIZE_OBJECT_EXCLUDE_PERMISSIONS=['inherit'])
def test_get_1_for_authenticated_user(self):
response = self.client.get('/job-offers/{}/'.format(self.job.slug))
self.assertEqual(response.status_code, 200)
self.assertIn('view', response.data['permissions'])
self.assertNotIn('inherit', response.data['permissions'])
def test_post_request_for_authenticated_user(self):
post = {'title': "job_created", "slug": 'slug1'}
post = {'https://cdn.startinblox.com/owl#title': "job_created", "https://cdn.startinblox.com/owl#slug": 'slug2'}
response = self.client.post('/job-offers/', data=json.dumps(post), content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
# denied because I don't have model permissions
def test_post_request_denied_model_perms(self):
data = {'https://cdn.startinblox.com/owl#some': 'title'}
response = self.client.post('/permissionless-dummys/', data=json.dumps(data), content_type='application/ld+json')
self.assertEqual(response.status_code, 403)
def test_post_nested_view_authorized(self):
data = { "https://cdn.startinblox.com/owl#title": "new skill", "https://cdn.startinblox.com/owl#obligatoire": "okay" }
response = self.client.post('/job-offers/{}/skills/'.format(self.job.slug), data=json.dumps(data),
content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
def test_post_nested_view_denied_model_perms(self):
parent = LDPDummy.objects.create(some='parent')
data = { "https://cdn.startinblox.com/owl#some": "title" }
response = self.client.post('/ldpdummys/{}/anons/'.format(parent.pk), data=json.dumps(data),
content_type='application/ld+json')
self.assertEqual(response.status_code, 403)
def test_put_request_for_authenticated_user(self):
body = {'title':"job_updated"}
body = {'https://cdn.startinblox.com/owl#title':"job_updated"}
response = self.client.put('/job-offers/{}/'.format(self.job.slug), data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
......@@ -35,3 +156,273 @@ class TestUserPermissions(APITestCase):
response = self.client.patch('/job-offers/' + str(self.job.slug) + "/",
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
def test_put_request_denied_model_perms(self):
dummy = PermissionlessDummy.objects.create(some='some', slug='slug')
data = {'https://cdn.startinblox.com/owl#some': 'new'}
response = self.client.put('/permissionless-dummys/{}/'.format(dummy.slug), data=json.dumps(data),
content_type='application/ld+json')
self.assertEqual(response.status_code, 404)
def test_put_nested_view_denied_model_perms(self):
parent = LDPDummy.objects.create(some='parent')
child = PermissionlessDummy.objects.create(some='child', slug='child', parent=parent)
data = {"https://cdn.startinblox.com/owl#some": "new"}
response = self.client.put('/ldpdummys/{}/anons/{}/'.format(parent.pk, child.slug), data=json.dumps(data),
content_type='application/ld+json')
self.assertEqual(response.status_code, 404)
#TODO: check how this could ever work
# def test_patch_nested_container_attach_existing_resource_permission_denied(self):
# '''I am attempting to add a resource which I should not know exists'''
# parent = LDPDummy.objects.create(some='parent')
# dummy = PermissionlessDummy.objects.create(some='some', slug='slug')
# data = {
# 'https://cdn.startinblox.com/owl#anons': [
# {'@id': '{}/permissionless-dummys/{}/'.format(settings.SITE_URL, dummy.slug), 'https://cdn.startinblox.com/owl#slug': dummy.slug}
# ]
# }
# response = self.client.patch('/ldpdummys/{}/'.format(parent.pk), data=json.dumps(data), content_type='application/ld+json')
# self.assertEqual(response.status_code, 404)
# variations on previous tests with an extra level of depth
# TODO
def test_post_nested_container_twice_nested_permission_denied(self):
pass
# TODO
def test_put_nested_container_twice_nested_permission_denied(self):
pass
# TODO: repeat of the above where it is authorized because I have permission through my Group
# https://git.startinblox.com/djangoldp-packages/djangoldp/issues/291
def test_put_request_change_urlid_rejected(self):
self.assertEqual(JobOffer.objects.count(), 1)
body = {'@id': "ishouldnotbeabletochangethis"}
response = self.client.put('/job-offers/{}/'.format(self.job.slug), data=json.dumps(body),
content_type='application/ld+json')
# TODO: this is failing quietly
# https://git.happy-dev.fr/startinblox/solid-spec/issues/14
self.assertEqual(response.status_code, 200)
self.assertEqual(JobOffer.objects.count(), 1)
self.assertFalse(JobOffer.objects.filter(urlid=body['@id']).exists())
def test_put_request_change_pk_rejected(self):
self.assertEqual(JobOffer.objects.count(), 1)
body = {'https://cdn.startinblox.com/owl#pk': 2}
response = self.client.put('/job-offers/{}/'.format(self.job.slug), data=json.dumps(body),
content_type='application/ld+json')
# TODO: this is failing quietly
# https://git.happy-dev.fr/startinblox/solid-spec/issues/14
self.assertEqual(response.status_code, 200)
self.assertEqual(JobOffer.objects.count(), 1)
self.assertFalse(JobOffer.objects.filter(pk=body['https://cdn.startinblox.com/owl#pk']).exists())
# tests that I receive a list of objects for which I am owner, filtering those for which I am not
def test_list_owned_resources(self):
my_resource = OwnedResource.objects.create(description='test', user=self.user)
another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test')
their_resource = OwnedResource.objects.create(description='another test', user=another_user)
response = self.client.get('/ownedresources/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data['ldp:contains']), 1)
self.assertEqual(response.data['ldp:contains'][0]['@id'], my_resource.urlid)
# I do not have model permissions as an authenticated user, but I am the resources' owner
def test_get_owned_resource(self):
my_resource = OwnedResource.objects.create(description='test', user=self.user)
another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test')
their_resource = OwnedResource.objects.create(description='another test', user=another_user)
response = self.client.get('/ownedresources/{}/'.format(my_resource.pk))
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['@id'], my_resource.urlid)
self.assertIn('delete', response.data['permissions'])
# I have permission to view this resource
response = self.client.patch('/ownedresources/{}/'.format(their_resource.pk))
self.assertEqual(response.status_code, 404)
def test_patch_owned_resource(self):
my_profile = UserProfile.objects.create(user=self.user, slug=self.user.username, description='about me')
another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test')
their_profile = UserProfile.objects.create(user=another_user, slug=another_user.username, description='about')
response = self.client.patch('/userprofiles/{}/'.format(my_profile.slug))
self.assertEqual(response.status_code, 200)
response = self.client.patch('/userprofiles/{}/'.format(their_profile.slug))
self.assertEqual(response.status_code, 403)
def test_delete_owned_resource(self):
my_resource = OwnedResource.objects.create(description='test', user=self.user)
another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test')
their_resource = OwnedResource.objects.create(description='another test', user=another_user)
response = self.client.delete('/ownedresources/{}/'.format(my_resource.pk))
self.assertEqual(response.status_code, 204)
response = self.client.delete('/ownedresources/{}/'.format(their_resource.pk))
self.assertEqual(response.status_code, 404)
# test superuser permissions (configured on model)
def test_list_superuser_perms(self):
another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test')
their_resource = OwnedResource.objects.create(description='another test', user=another_user)
response = self.client.get('/ownedresources/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data['ldp:contains']), 0)
# now I'm superuser, I have the permissions
self._make_self_superuser()
response = self.client.get('/ownedresources/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data['ldp:contains']), 1)
def test_get_superuser_perms(self):
another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test')
their_resource = OwnedResource.objects.create(description='another test', user=another_user)
response = self.client.patch('/ownedresources/{}/'.format(their_resource.pk))
self.assertEqual(response.status_code, 404)
self._make_self_superuser()
response = self.client.patch('/ownedresources/{}/'.format(their_resource.pk))
self.assertEqual(response.status_code, 200)
def test_put_superuser_perms(self):
another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test')
their_profile = UserProfile.objects.create(user=another_user, slug=another_user.username, description='about')
response = self.client.patch('/userprofiles/{}/'.format(their_profile.slug))
# TODO: https://git.startinblox.com/djangoldp-packages/djangoldp/issues/336
self.assertEqual(response.status_code, 403)
self._make_self_superuser()
response = self.client.patch('/userprofiles/{}/'.format(their_profile.slug))
self.assertEqual(response.status_code, 200)
def test_delete_superuser_perms(self):
another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test')
their_resource = OwnedResource.objects.create(description='another test', user=another_user)
response = self.client.delete('/ownedresources/{}/'.format(their_resource.pk))
self.assertEqual(response.status_code, 404)
self._make_self_superuser()
response = self.client.delete('/ownedresources/{}/'.format(their_resource.pk))
self.assertEqual(response.status_code, 204)
# I have model (or object?) permissions. Attempt to make myself owner and thus upgrade my permissions
# TODO: https://git.startinblox.com/djangoldp-packages/djangoldp/issues/356/
'''
def test_hack_model_perms_privilege_escalation(self):
another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test')
resource = OwnedResourceVariant.objects.create(description='another test', user=another_user)
# authenticated has 'change' permission but only owner's have 'control' permission, meaning that I should
# not be able to change my privilege level
body = {
'https://cdn.startinblox.com/owl#user': {'@id': self.user.urlid}
}
response = self.client.put('/ownedresourcevariants/{}/'.format(resource.pk), data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
resource = OwnedResourceVariant.objects.get(pk=resource.pk)
self.assertNotEqual(resource.user, self.user)
'''
class TestOwnerFieldUserPermissions(UserPermissionsTestCase):
restore_meta = None
def setUpTempOwnerFieldForModel(self, model, new_owner_field):
# store the old meta information for tearDown to cleanup after the test
if self.restore_meta is None:
self.restore_meta = []
self.restore_meta.append({
"model": model,
"owner_field": model._meta.owner_field
})
# replace the owner_field attribute for the test to run
model._meta.owner_field = new_owner_field
def tearDown(self):
# restore any previously changed owner_field attributes in the test
if self.restore_meta is not None:
for idx, model in enumerate(self.restore_meta):
model = self.restore_meta[idx]["model"]
model._meta.owner_field = self.restore_meta[idx]["owner_field"]
self.restore_meta = None
def test_list_owned_resources_nested(self):
my_resource = OwnedResource.objects.create(description='test', user=self.user)
my_second_resource = OwnedResource.objects.create(description='test', user=self.user)
another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test')
their_resource = OwnedResource.objects.create(description='another test', user=another_user)
my_nested = OwnedResourceNestedOwnership.objects.create(description="test", parent=my_resource)
my_second_nested = OwnedResourceNestedOwnership.objects.create(description="test", parent=my_second_resource)
their_nested = OwnedResourceNestedOwnership.objects.create(description="test", parent=their_resource)
response = self.client.get('/ownedresourcenestedownerships/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data['ldp:contains']), 2)
ids = [r['@id'] for r in response.data['ldp:contains']]
self.assertIn(my_nested.urlid, ids)
self.assertIn(my_second_nested.urlid, ids)
self.assertNotIn(their_nested.urlid, ids)
def test_list_owned_resources_nested_variation_urlid(self):
owner_field = OwnedResourceNestedOwnership._meta.owner_field
OwnedResourceNestedOwnership._meta.owner_field = None
OwnedResourceNestedOwnership._meta.owner_urlid_field = owner_field + "__urlid"
self.test_list_owned_resources_nested()
OwnedResourceNestedOwnership._meta.owner_urlid_field = None
OwnedResourceNestedOwnership._meta.owner_field = owner_field
def test_list_owned_resources_nested_variation_twice_nested(self):
my_resource = OwnedResource.objects.create(description='test', user=self.user)
my_second_resource = OwnedResource.objects.create(description='test', user=self.user)
another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test')
their_resource = OwnedResource.objects.create(description='another test', user=another_user)
my_nested = OwnedResourceNestedOwnership.objects.create(description="test", parent=my_resource)
my_second_nested = OwnedResourceNestedOwnership.objects.create(description="test", parent=my_second_resource)
their_nested = OwnedResourceNestedOwnership.objects.create(description="test", parent=their_resource)
my_twice_nested = OwnedResourceTwiceNestedOwnership.objects.create(description="test", parent=my_nested)
their_twice_nested = OwnedResourceTwiceNestedOwnership.objects.create(description="test", parent=their_nested)
response = self.client.get('/ownedresourcetwicenestedownerships/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data['ldp:contains']), 1)
ids = [r['@id'] for r in response.data['ldp:contains']]
self.assertIn(my_twice_nested.urlid, ids)
self.assertNotIn(their_twice_nested.urlid, ids)
def test_list_owned_resources_nested_does_not_exist(self):
self.setUpTempOwnerFieldForModel(OwnedResourceNestedOwnership, "parent__doesnotexist")
my_resource = OwnedResource.objects.create(description='test', user=self.user)
my_second_resource = OwnedResource.objects.create(description='test', user=self.user)
another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test')
their_resource = OwnedResource.objects.create(description='another test', user=another_user)
my_nested = OwnedResourceNestedOwnership.objects.create(description="test", parent=my_resource)
my_second_nested = OwnedResourceNestedOwnership.objects.create(description="test", parent=my_second_resource)
their_nested = OwnedResourceNestedOwnership.objects.create(description="test", parent=their_resource)
self.assertRaises(ValueError, self.client.get, '/ownedresourcenestedownerships/')
from importlib import import_module
from django.conf import settings
from django.conf.urls import re_path, include
from django.contrib.auth.models import Group
from django.urls import path, re_path, include
from djangoldp.models import LDPSource, Model
from djangoldp.permissions import LDPPermissions
from djangoldp.permissions import ReadOnly
from djangoldp.views import LDPSourceViewSet, WebFingerView, InboxView
from djangoldp.views import LDPViewSet
from djangoldp.views import LDPViewSet, serve_static_content
def __clean_path(path):
......@@ -18,36 +19,66 @@ def __clean_path(path):
return path
def get_all_non_abstract_subclasses(cls):
'''
returns a set of all subclasses for a given Python class (recursively calls cls.__subclasses__()). Ignores Abstract
classes
'''
def valid_subclass(sc):
'''returns True if the parameterised subclass is valid and should be returned'''
return not getattr(sc._meta, 'abstract', False)
return set(c for c in cls.__subclasses__() if valid_subclass(c)).union(
[subclass for c in cls.__subclasses__() for subclass in get_all_non_abstract_subclasses(c) if valid_subclass(subclass)])
urlpatterns = [
path('groups/', LDPViewSet.urls(model=Group)),
re_path(r'^sources/(?P<federation>\w+)/', LDPSourceViewSet.urls(model=LDPSource, fields=['federation', 'urlid'],
permission_classes=[LDPPermissions], )),
permission_classes=[ReadOnly], )),
re_path(r'^\.well-known/webfinger/?$', WebFingerView.as_view()),
re_path(r'^inbox/$', InboxView.as_view()),
path('inbox/', InboxView.as_view()),
re_path(r'^ssr/(?P<path>.*)$', serve_static_content, name='serve_static_content'),
]
if settings.ENABLE_SWAGGER_DOCUMENTATION:
from drf_spectacular.views import SpectacularAPIView, SpectacularSwaggerView
urlpatterns.extend([
path("schema/", SpectacularAPIView.as_view(), name="schema"),
path(
"docs/",
SpectacularSwaggerView.as_view(
template_name="swagger-ui.html", url_name="schema"
),
name="swagger-ui",
)
])
for package in settings.DJANGOLDP_PACKAGES:
try:
import_module('{}.models'.format(package))
urlpatterns.append(re_path(r'^', include('{}.djangoldp_urls'.format(package))))
except ModuleNotFoundError:
pass
if 'djangoldp_account' not in settings.DJANGOLDP_PACKAGES:
urlpatterns.append(re_path(r'^users/', LDPViewSet.urls(model=settings.AUTH_USER_MODEL, permission_classes=[])))
# fetch a list of all models which subclass DjangoLDP Model
model_classes = {cls.__name__: cls for cls in Model.__subclasses__()}
try:
urlpatterns.append(path('', include('{}.djangoldp_urls'.format(package))))
except ModuleNotFoundError:
pass
# append urls for all DjangoLDP Model subclasses
for class_name in model_classes:
model_class = model_classes[class_name]
for model in get_all_non_abstract_subclasses(Model):
# the path is the url for this model
path = __clean_path(model_class.get_container_path())
model_path = __clean_path(model.get_container_path())
# urls_fct will be a method which generates urls for a ViewSet (defined in LDPViewSetGenerator)
urls_fct = model_class.get_view_set().urls
urlpatterns.append(re_path(r'^' + path,
urls_fct(model=model_class,
lookup_field=Model.get_meta(model_class, 'lookup_field', 'pk'),
permission_classes=Model.get_meta(model_class, 'permission_classes', [LDPPermissions]),
fields=Model.get_meta(model_class, 'serializer_fields', []),
nested_fields=model_class.nested.fields())))
urls_fct = getattr(model, 'view_set', LDPViewSet).urls
disable_url = getattr(model._meta, 'disable_url', False)
if not disable_url:
urlpatterns.append(path('' + model_path,
urls_fct(model=model,
lookup_field=getattr(model._meta, 'lookup_field', 'pk'),
permission_classes=getattr(model._meta, 'permission_classes', []),
fields=getattr(model._meta, 'serializer_fields', []),
nested_fields=getattr(model._meta, 'nested_fields', [])
)))
# NOTE: this route will be ignored if a custom (subclass of Model) user model is used, or it is registered by a package
# Django matches the first url it finds for a given path
urlpatterns.append(re_path('users/', LDPViewSet.urls(model=settings.AUTH_USER_MODEL, permission_classes=[])))
\ No newline at end of file
from django.conf import settings
from guardian.utils import get_anonymous_user
PASSTHROUGH_IPS = getattr(settings, 'PASSTHROUGH_IPS', '')
# convenience function returns True if user is anonymous
def is_anonymous_user(user):
anonymous_username = getattr(settings, 'ANONYMOUS_USER_NAME', None)
return user.is_anonymous or user.username == anonymous_username
# convenience function returns True if user is authenticated
def is_authenticated_user(user):
anonymous_username = getattr(settings, 'ANONYMOUS_USER_NAME', None)
return user.is_authenticated and user.username != anonymous_username
# this method is used to check if a given IP is part of the PASSTHROUGH_IPS list
def check_client_ip(request):
x_forwarded_for = request.headers.get('x-forwarded-for')
if x_forwarded_for:
if any(ip in x_forwarded_for.replace(' ', '').split(',') for ip in PASSTHROUGH_IPS):
return True
elif request.META.get('REMOTE_ADDR') in PASSTHROUGH_IPS:
return True
return False
import json
import logging
import os
import time
import validators
from django.apps import apps
from django.conf import settings
from django.conf.urls import include, re_path
from django.contrib.auth import get_user_model
from django.contrib.auth.models import AnonymousUser
from django.core.exceptions import FieldDoesNotExist, ObjectDoesNotExist
from django.urls.resolvers import get_resolver
from django.db import IntegrityError, transaction
from django.http import JsonResponse, Http404
from django.http import Http404, HttpResponseNotFound, JsonResponse
from django.shortcuts import get_object_or_404
from django.urls import include, path, re_path
from django.urls.resolvers import get_resolver
from django.utils.decorators import classonlymethod
from django.views import View
from pyld import jsonld
from rest_framework import status
from rest_framework.authentication import SessionAuthentication
from rest_framework.permissions import AllowAny
from rest_framework.exceptions import ParseError
from rest_framework.parsers import JSONParser
from rest_framework.permissions import AllowAny
from rest_framework.renderers import JSONRenderer
from rest_framework.response import Response
from rest_framework.utils import model_meta
from rest_framework.views import APIView
from rest_framework.viewsets import ModelViewSet
from djangoldp.activities import (ACTIVITY_SAVING_SETTING, ActivityPubService,
ActivityQueueService, as_activitystream)
from djangoldp.activities.errors import (ActivityStreamDecodeError,
ActivityStreamValidationError)
from djangoldp.endpoints.webfinger import WebFingerEndpoint, WebFingerError
from djangoldp.models import LDPSource, Model, Follower
from djangoldp.permissions import LDPPermissions
from djangoldp.filters import LocalObjectOnContainerPathBackend
from djangoldp.activities import ActivityQueueService, as_activitystream
from djangoldp.activities import ActivityPubService
from djangoldp.activities.errors import ActivityStreamDecodeError, ActivityStreamValidationError
import logging
from djangoldp.filters import (LocalObjectOnContainerPathBackend,
SearchByQueryParamFilterBackend)
from djangoldp.models import DynamicNestedField, Follower, LDPSource, Model
from djangoldp.related import get_prefetch_fields
from djangoldp.utils import is_authenticated_user
logger = logging.getLogger('djangoldp')
get_user_model()._meta.rdf_context = {"get_full_name": "rdfs:label"}
......@@ -56,13 +62,17 @@ class JSONLDRenderer(JSONRenderer):
# https://github.com/digitalbazaar/pyld
class JSONLDParser(JSONParser):
#TODO: It current only works with pyld 1.0. We need to check our support of JSON-LD
media_type = 'application/ld+json'
def parse(self, stream, media_type=None, parser_context=None):
data = super(JSONLDParser, self).parse(stream, media_type, parser_context)
# compact applies the context to the data and makes it a format which is easier to work with
# see: http://json-ld.org/spec/latest/json-ld/#compacted-document-form
return jsonld.compact(data, ctx=settings.LDP_RDF_CONTEXT)
try:
return jsonld.compact(data, ctx=settings.LDP_RDF_CONTEXT)
except jsonld.JsonLdError as e:
raise ParseError(str(e.cause))
# an authentication class which exempts CSRF authentication
......@@ -75,16 +85,15 @@ class InboxView(APIView):
"""
Receive linked data notifications
"""
permission_classes=[AllowAny,]
permission_classes = [AllowAny, ]
renderer_classes = [JSONLDRenderer]
def post(self, request, *args, **kwargs):
'''
receiver for inbox messages. See https://www.w3.org/TR/ldn/
'''
payload = request.body.decode("utf-8")
try:
activity = json.loads(payload, object_hook=as_activitystream)
activity = json.loads(request.body, object_hook=as_activitystream)
activity.validate()
except ActivityStreamDecodeError:
return Response('Activity type unsupported', status=status.HTTP_405_METHOD_NOT_ALLOWED)
......@@ -94,14 +103,21 @@ class InboxView(APIView):
try:
self._handle_activity(activity, **kwargs)
except IntegrityError:
return Response({'Unable to save due to an IntegrityError in the receiver model'}, status=status.HTTP_200_OK)
return Response({'Unable to save due to an IntegrityError in the receiver model'},
status=status.HTTP_200_OK)
except ValueError as e:
return Response(str(e), status=status.HTTP_400_BAD_REQUEST)
# save the activity and return 201
obj = ActivityQueueService._save_sent_activity(activity.to_json(), local_id=request.path_info, success=True,
type=activity.type)
if ACTIVITY_SAVING_SETTING == 'VERBOSE':
obj = ActivityQueueService._save_sent_activity(activity.to_json(), local_id=request.path_info, success=True,
type=activity.type)
response = Response({}, status=status.HTTP_201_CREATED)
response['Location'] = obj.urlid
response = Response({}, status=status.HTTP_201_CREATED)
response['Location'] = obj.urlid
else:
response = Response({}, status=status.HTTP_200_OK)
return response
......@@ -126,7 +142,8 @@ class InboxView(APIView):
return self._get_or_create_nested_backlinks(obj, object_model, update)
except IntegrityError as e:
logger.error(str(e))
logger.warning('received a backlink which you were not able to save because of a constraint on the model field.')
logger.warning(
'received a backlink which you were not able to save because of a constraint on the model field.')
raise e
def _get_or_create_nested_backlinks(self, obj, object_model=None, update=False):
......@@ -150,7 +167,8 @@ class InboxView(APIView):
item_value = item[1]
item_model = Model.get_subclass_with_rdf_type(item_value['@type'])
if item_model is None:
raise Http404('unable to store type ' + item_value['@type'] + ', model with this rdf_type not found')
raise Http404(
'unable to store type ' + item_value['@type'] + ', model with this rdf_type not found')
# push nested object tuple as a branch
backlink = self._get_or_create_nested_backlinks(item_value, item_model)
......@@ -158,6 +176,8 @@ class InboxView(APIView):
# get or create the backlink
try:
if obj['@id'] is None or not validators.url(obj['@id']):
raise ValueError('received invalid urlid ' + str(obj['@id']))
external = Model.get_or_create_external(object_model, obj['@id'], update=update, **branches)
# creating followers, to inform distant resource of changes to local connection
......@@ -167,7 +187,7 @@ class InboxView(APIView):
urlid = item[1]
if isinstance(item[1], dict):
urlid = urlid['@id']
if not isinstance(urlid, str):
if not isinstance(urlid, str) or not validators.url(urlid):
continue
if not Model.is_external(urlid):
......@@ -177,7 +197,7 @@ class InboxView(APIView):
# this will be raised when the object was local, but it didn't exist
except ObjectDoesNotExist:
raise Http404()
raise Http404(getattr(object_model._meta, 'label', 'Unknown Model') + ' ' + str(obj['@id']) + ' does not exist')
# TODO: a fallback here? Saving the backlink as Object or similar
def _get_subclass_with_rdf_type_or_404(self, rdf_type):
......@@ -226,7 +246,7 @@ class InboxView(APIView):
origin = origin_model.objects.get(urlid=activity.origin['@id'])
object_instance = object_model.objects.get(urlid=activity.object['@id'])
except origin_model.DoesNotExist:
raise Http404()
raise Http404(activity.origin['@id'] + ' did not exist')
except object_model.DoesNotExist:
return
......@@ -280,9 +300,9 @@ class InboxView(APIView):
try:
object_instance = object_model.objects.get(urlid=activity.object['@id'])
except object_model.DoesNotExist:
raise Http404()
raise Http404(activity.object['@id'] + ' did not exist')
if Model.is_external(object_instance):
raise Http404()
raise Http404(activity.object['@id'] + ' is not local to this server')
# get the inbox field from the actor
if isinstance(activity.actor, str):
......@@ -319,7 +339,7 @@ class LDPViewSetGenerator(ModelViewSet):
@classonlymethod
def get_lookup_arg(cls, **kwargs):
return kwargs.get('lookup_url_kwarg') or cls.lookup_url_kwarg or kwargs.get('lookup_field') or \
Model.get_meta(kwargs['model'], 'lookup_field', 'pk') or cls.lookup_field
getattr(kwargs['model']._meta, 'lookup_field', 'pk') or cls.lookup_field
@classonlymethod
def get_detail_expr(cls, lookup_field=None, **kwargs):
......@@ -329,17 +349,13 @@ class LDPViewSetGenerator(ModelViewSet):
return r'(?P<{}>{}+)/'.format(lookup_field, lookup_group)
@classonlymethod
def nested_urls(cls, nested_field, view_set=None, **kwargs):
'''
constructs url patterns for parameterised nested_field
:param view_set: an optional CustomViewSet to mixin with the LDPNestedViewSet implementation
'''
def build_nested_view_set(cls, view_set=None):
'''returns the the view_set parameter mixed into the LDPNestedViewSet class'''
if view_set is not None:
class LDPNestedCustomViewSet(LDPNestedViewSet, view_set):
pass
return LDPNestedCustomViewSet.nested_urls(nested_field, **kwargs)
return LDPNestedViewSet.nested_urls(nested_field, **kwargs)
return LDPNestedCustomViewSet
return LDPNestedViewSet
@classonlymethod
def urls(cls, **kwargs):
......@@ -349,28 +365,45 @@ class LDPViewSetGenerator(ModelViewSet):
if kwargs.get('model_prefix'):
model_name = '{}-{}'.format(kwargs['model_prefix'], model_name)
detail_expr = cls.get_detail_expr(**kwargs)
# Gets permissions on the model if not explicitely passed to the view
if not 'permission_classes' in kwargs and hasattr(kwargs['model']._meta, 'permission_classes'):
kwargs['permission_classes'] = kwargs['model']._meta.permission_classes
urls = [
re_path('^$', cls.as_view(cls.list_actions, **kwargs), name='{}-list'.format(model_name)),
path('', cls.as_view(cls.list_actions, **kwargs), name='{}-list'.format(model_name)),
re_path('^' + detail_expr + '$', cls.as_view(cls.detail_actions, **kwargs),
name='{}-detail'.format(model_name)),
]
# append nested fields to the urls list
for field in kwargs.get('nested_fields') or cls.nested_fields:
# the nested property may have a custom viewset defined
for field_name in kwargs.get('nested_fields') or cls.nested_fields:
try:
nested_model = kwargs['model']._meta.get_field(field).related_model
nested_field = kwargs['model']._meta.get_field(field_name)
nested_model = nested_field.related_model
field_name_to_parent = nested_field.remote_field.name
except FieldDoesNotExist:
nested_model = getattr(kwargs['model'], field).field.model
if hasattr(nested_model, 'get_view_set'):
kwargs['view_set'] = nested_model.get_view_set()
urls_fct = kwargs['view_set'].nested_urls # our custom view_set may override nested_urls
else:
urls_fct = cls.nested_urls
urls.append(re_path('^' + detail_expr + field + '/', urls_fct(field, **kwargs)))
nested_model = getattr(kwargs['model'], field_name).field.model
nested_field = getattr(kwargs['model'], field_name).field.remote_field
field_name_to_parent = getattr(kwargs['model'], field_name).field.name
# urls should be called from _nested_ view set, which may need a custom view set mixed in
view_set = getattr(nested_model._meta, 'view_set', None)
nested_view_set = cls.build_nested_view_set(view_set)
urls.append(re_path('^' + detail_expr + field_name + '/',
nested_view_set.urls(
model=nested_model,
model_prefix=kwargs['model']._meta.object_name.lower(), # prefix with parent name
lookup_field=getattr(nested_model._meta, 'lookup_field', 'pk'),
exclude=(field_name_to_parent,) if nested_field.one_to_many else (),
permission_classes=getattr(nested_model._meta, 'permission_classes', []),
nested_field_name=field_name,
fields=getattr(nested_model._meta, 'serializer_fields', []),
nested_fields=[],
parent_model=kwargs['model'],
parent_lookup_field=cls.get_lookup_arg(**kwargs),
nested_field=nested_field,
field_name_to_parent=field_name_to_parent)))
return include(urls)
......@@ -383,103 +416,81 @@ class LDPViewSet(LDPViewSetGenerator):
renderer_classes = (JSONLDRenderer,)
parser_classes = (JSONLDParser,)
authentication_classes = (NoCSRFAuthentication,)
filter_backends = [LocalObjectOnContainerPathBackend]
filter_backends = [SearchByQueryParamFilterBackend, LocalObjectOnContainerPathBackend]
prefetch_fields = None
def __init__(self, **kwargs):
super().__init__(**kwargs)
# attach filter backends based on permissions classes, to reduce the queryset based on these permissions
# https://www.django-rest-framework.org/api-guide/filtering/#generic-filtering
if self.permission_classes:
for p in self.permission_classes:
if hasattr(p, 'filter_class') and p.filter_class:
self.filter_backends.append(p.filter_class)
self.serializer_class = self.build_read_serializer()
self.write_serializer_class = self.build_write_serializer()
def build_read_serializer(self):
self.filter_backends = type(self).filter_backends + list({perm_class().get_filter_backend(self.model)
for perm_class in self.permission_classes if hasattr(perm_class(), 'get_filter_backend')})
if None in self.filter_backends:
self.filter_backends.remove(None)
def filter_queryset(self, queryset):
if self.request.user.is_superuser:
return queryset
return super().filter_queryset(queryset)
def check_permissions(self, request):
if request.user.is_superuser:
return True
return super().check_permissions(request)
def check_object_permissions(self, request, obj):
if request.user.is_superuser:
return True
return super().check_object_permissions(request, obj)
def get_depth(self) -> int:
if getattr(self, 'force_depth', None):
#TODO: this exception on depth for writing should be handled by the serializer itself
return self.force_depth
if hasattr(self, 'request') and 'HTTP_DEPTH' in self.request.META:
return int(self.request.META['HTTP_DEPTH'])
if hasattr(self, 'depth'):
return self.depth
return getattr(self.model._meta, 'depth', 0)
def get_serializer_class(self):
model_name = self.model._meta.object_name.lower()
lookup_field = get_resolver().reverse_dict[model_name + '-detail'][0][0][1][0]
try:
lookup_field = get_resolver().reverse_dict[model_name + '-detail'][0][0][1][0]
except:
lookup_field = 'urlid'
meta_args = {'model': self.model, 'extra_kwargs': {
'@id': {'lookup_field': lookup_field}},
'depth': getattr(self, 'depth', Model.get_meta(self.model, 'depth', 0)),
# 'depth': getattr(self, 'depth', 4),
'extra_fields': self.nested_fields}
if self.fields:
meta_args['fields'] = self.fields
else:
meta_args['exclude'] = Model.get_meta(self.model, 'serializer_fields_exclude') or ()
return self.build_serializer(meta_args, 'Read')
'@id': {'lookup_field': lookup_field}},
'depth': self.get_depth(),
'extra_fields': self.nested_fields}
def build_write_serializer(self):
model_name = self.model._meta.object_name.lower()
lookup_field = get_resolver().reverse_dict[model_name + '-detail'][0][0][1][0]
meta_args = {'model': self.model, 'extra_kwargs': {
'@id': {'lookup_field': lookup_field}},
'depth': 10,
'extra_fields': self.nested_fields}
if self.fields:
meta_args['fields'] = self.fields
else:
meta_args['exclude'] = self.exclude or Model.get_meta(self.model, 'serializer_fields_exclude') or ()
return self.build_serializer(meta_args, 'Write')
def build_serializer(self, meta_args, name_prefix):
meta_args['exclude'] = self.exclude or getattr(self.model._meta, 'serializer_fields_exclude', ())
# create the Meta class to associate to LDPSerializer, using meta_args param
meta_class = type('Meta', (), meta_args)
from djangoldp.serializers import LDPSerializer
if self.serializer_class is None:
self.serializer_class = LDPSerializer
return type(LDPSerializer)(self.model._meta.object_name.lower() + name_prefix + 'Serializer', (self.serializer_class,),
parent_meta = (self.serializer_class.Meta,) if hasattr(self.serializer_class, 'Meta') else ()
meta_class = type('Meta', parent_meta, meta_args)
return type(self.serializer_class)(self.model._meta.object_name.lower() + 'Serializer',
(self.serializer_class,),
{'Meta': meta_class})
def is_safe_create(self, user, validated_data, *args, **kwargs):
'''
A function which is checked before the create operation to confirm the validated data is safe to add
returns True by default
:return: True if the operation should be permitted, False to return a 403 response
'''
return True
# def list(self, request, *args, **kwargs):
# t1 = time.time()
# queryset = self.get_queryset()
# t2 = time.time()
# print('got queryset in ' + str(t2 - t1))
#
# t1 = time.time()
# queryset = self.filter_queryset(queryset)
# t2 = time.time()
# print('filtered queryset in ' + str(t2 - t1))
#
# t1 = time.time()
# page = self.paginate_queryset(queryset)
# t2 = time.time()
# print('paginated queryset in ' + str(t2-t1))
# if page is not None:
# t1 = time.time()
# serializer = self.get_serializer(page, many=True)
# paginated_response = self.get_paginated_response(serializer.data)
# t2 = time.time()
# print('paginated response in ' + str(t2-t1))
#
# return paginated_response
#
# t1 = time.time()
# serializer = self.get_serializer(queryset, many=True)
# response = Response(serializer.data)
# t2 = time.time()
# print('regular response in ' + str(t2-t1))
# return response
# The chaining of filter through | may lead to duplicates and distinct should only be applied in the end.
def filter_queryset(self, queryset):
return super().filter_queryset(queryset).distinct()
def create(self, request, *args, **kwargs):
serializer = self.get_write_serializer(data=request.data)
self.force_depth = 10
serializer = self.get_serializer(data=request.data)
self.force_depth = None
serializer.is_valid(raise_exception=True)
if not self.is_safe_create(request.user, serializer.validated_data):
return Response({'detail': 'You do not have permission to perform this action'}, status=status.HTTP_403_FORBIDDEN)
self.perform_create(serializer)
response_serializer = self.get_serializer()
......@@ -490,7 +501,9 @@ class LDPViewSet(LDPViewSetGenerator):
def update(self, request, *args, **kwargs):
partial = kwargs.pop('partial', False)
instance = self.get_object()
serializer = self.get_write_serializer(instance, data=request.data, partial=partial)
self.force_depth = 10
serializer = self.get_serializer(instance, data=request.data, partial=partial)
self.force_depth = None
serializer.is_valid(raise_exception=True)
self.perform_update(serializer)
......@@ -501,69 +514,35 @@ class LDPViewSet(LDPViewSetGenerator):
response_serializer = self.get_serializer()
data = response_serializer.to_representation(serializer.instance)
return Response(data)
def get_write_serializer(self, *args, **kwargs):
"""
Return the serializer instance that should be used for validating and
deserializing input, and for serializing output.
"""
serializer_class = self.get_write_serializer_class()
kwargs.setdefault('context', self.get_serializer_context())
return serializer_class(*args, **kwargs)
def get_write_serializer_class(self):
"""
Return the class to use for the serializer.
Defaults to using `self.write_serializer_class`.
You may want to override this if you need to provide different
serializations depending on the incoming request.
(Eg. admins get full serialization, others get basic serialization)
"""
assert self.write_serializer_class is not None, (
"'%s' should either include a `write_serializer_class` attribute, "
"or override the `get_write_serializer_class()` method."
% self.__class__.__name__
)
return self.write_serializer_class
def perform_create(self, serializer, **kwargs):
if hasattr(self.model._meta, 'auto_author') and isinstance(self.request.user, get_user_model()):
# auto_author_field may be set (a field on user which should be made author - e.g. profile)
auto_author_field = getattr(self.model._meta, 'auto_author_field', None)
if auto_author_field is not None:
kwargs[self.model._meta.auto_author] = getattr(self.request.user, auto_author_field, None)
else:
kwargs[self.model._meta.auto_author] = get_user_model().objects.get(pk=self.request.user.pk)
serializer.save(**kwargs)
kwargs[self.model._meta.auto_author] = get_user_model().objects.get(pk=self.request.user.pk)
return serializer.save(**kwargs)
def get_queryset(self, *args, **kwargs):
if self.model:
return self.model.objects.all()
queryset = self.model.objects.all()
else:
return super(LDPView, self).get_queryset(*args, **kwargs)
queryset = super(LDPViewSet, self).get_queryset(*args, **kwargs)
if self.prefetch_fields is None:
self.prefetch_fields = get_prefetch_fields(self.model, self.get_serializer(), self.get_depth())
return queryset.prefetch_related(*self.prefetch_fields)
def dispatch(self, request, *args, **kwargs):
'''overriden dispatch method to append some custom headers'''
response = super(LDPViewSet, self).dispatch(request, *args, **kwargs)
response["Access-Control-Allow-Origin"] = request.META.get('HTTP_ORIGIN')
response["Access-Control-Allow-Methods"] = "GET,POST,PUT,PATCH,DELETE"
response["Access-Control-Allow-Headers"] = "authorization, Content-Type, if-match, accept"
response["Access-Control-Expose-Headers"] = "Location, User"
response["Access-Control-Allow-Credentials"] = 'true'
response["Accept-Post"] = "application/ld+json"
if response.status_code in [201, 200] and '@id' in response.data:
response["Location"] = str(response.data['@id'])
else:
pass
response["Accept-Post"] = "application/ld+json"
if request.user.is_authenticated:
if is_authenticated_user(request.user):
try:
response['User'] = request.user.webid()
response['User'] = request.user.urlid
except AttributeError:
pass
return response
......@@ -576,57 +555,51 @@ class LDPNestedViewSet(LDPViewSet):
"""
parent_model = None
parent_lookup_field = None
related_field = None
nested_field = None
nested_related_name = None
nested_field_name = None
field_name_to_parent = None
def get_parent(self):
return get_object_or_404(self.parent_model, **{self.parent_lookup_field: self.kwargs[self.parent_lookup_field]})
def perform_create(self, serializer, **kwargs):
kwargs[self.nested_related_name] = self.get_parent()
kwargs[self.field_name_to_parent] = self.get_parent()
super().perform_create(serializer, **kwargs)
def get_queryset(self, *args, **kwargs):
if self.related_field.many_to_many or self.related_field.one_to_many:
return getattr(self.get_parent(), self.nested_field).all()
if self.related_field.many_to_one or self.related_field.one_to_one:
return [getattr(self.get_parent(), self.nested_field)]
related = getattr(self.get_parent(), self.nested_field_name)
if self.nested_field.many_to_many or self.nested_field.one_to_many:
if isinstance(self.nested_field, DynamicNestedField):
return related()
return related.all()
if self.nested_field.one_to_one or self.nested_field.many_to_one:
return type(related).objects.filter(pk=related.pk)
@classonlymethod
def get_related_fields(cls, model):
return {field.get_accessor_name(): field for field in model._meta.fields_map.values()}
@classonlymethod
def nested_urls(cls, nested_field, **kwargs):
try:
related_field = cls.get_model(**kwargs)._meta.get_field(nested_field)
except FieldDoesNotExist:
related_field = cls.get_related_fields(cls.get_model(**kwargs))[nested_field]
if related_field.related_query_name:
nested_related_name = related_field.related_query_name()
class LDPAPIView(APIView):
'''extends rest framework APIView to support Solid standards'''
authentication_classes = (NoCSRFAuthentication,)
def dispatch(self, request, *args, **kwargs):
'''overriden dispatch method to append some custom headers'''
response = super().dispatch(request, *args, **kwargs)
if response.status_code in [201, 200] and isinstance(response.data, dict) and '@id' in response.data:
response["Location"] = str(response.data['@id'])
else:
nested_related_name = related_field.remote_field.name
return cls.urls(
lookup_field=Model.get_meta(related_field.related_model, 'lookup_field', 'pk'),
model=related_field.related_model,
exclude=(nested_related_name,) if related_field.one_to_many else (),
parent_model=cls.get_model(**kwargs),
nested_field=nested_field,
nested_related_name=nested_related_name,
related_field=related_field,
parent_lookup_field=cls.get_lookup_arg(**kwargs),
model_prefix=cls.get_model(**kwargs)._meta.object_name.lower(),
permission_classes=Model.get_permission_classes(related_field.related_model,
kwargs.get('permission_classes', [LDPPermissions])),
lookup_url_kwarg=related_field.related_model._meta.object_name.lower() + '_id')
pass
if is_authenticated_user(request.user):
try:
response['User'] = request.user.urlid
except AttributeError:
pass
return response
class LDPSourceViewSet(LDPViewSet):
model = LDPSource
federation = None
filter_backends = []
def get_queryset(self, *args, **kwargs):
return super().get_queryset(*args, **kwargs).filter(federation=self.kwargs['federation'])
......@@ -651,3 +624,82 @@ class WebFingerView(View):
def post(self, request, *args, **kwargs):
return self.on_request(request)
def serve_static_content(request, path):
if request.method != "GET":
resolver = get_resolver()
match = resolver.resolve("/" + path)
request.user = AnonymousUser()
return match.func(request, *match.args, **match.kwargs)
server_url = getattr(settings, "BASE_URL", "http://localhost")
is_filtered = request.GET.get('search-fields', False)
output_dir = "ssr"
output_dir_filtered = "ssr_filtered"
if not os.path.exists(output_dir):
os.makedirs(output_dir, exist_ok=True)
if not os.path.exists(output_dir_filtered):
os.makedirs(output_dir_filtered, exist_ok=True)
file_path = os.path.join(output_dir if not is_filtered else output_dir_filtered, path[:-1])
if not file_path.endswith(".jsonld"):
file_path += ".jsonld"
if os.path.exists(file_path):
current_time = time.time()
file_mod_time = os.path.getmtime(file_path)
time_difference = current_time - file_mod_time
if time_difference > 24 * 60 * 60:
os.remove(file_path)
if not os.path.exists(file_path):
resolver = get_resolver()
match = resolver.resolve("/" + path)
request.user = AnonymousUser()
response = match.func(request, *match.args, **match.kwargs)
if response.status_code == 200:
directory = os.path.dirname(file_path)
if not os.path.exists(directory):
os.makedirs(directory)
json_content = JSONRenderer().render(response.data)
with open(file_path, "w", encoding="utf-8") as f:
f.write(
json_content.decode("utf-8")
.replace('"@id":"' + server_url, '"@id":"' + server_url + "/ssr")
.replace(
'"@id":"' + server_url + "/ssr/ssr",
'"@id":"' + server_url + "/ssr",
)[:-1]
+ ',"@context": "'
+ getattr(
settings,
"LDP_RDF_CONTEXT",
"https://cdn.startinblox.com/owl/context.jsonld",
)
+ '"}'
)
if os.path.exists(file_path):
with open(file_path, "r", encoding="utf-8") as file:
content = file.read()
try:
json_content = json.loads(content)
return JsonResponse(
json_content,
safe=False,
status=200,
content_type="application/ld+json",
headers={
"Access-Control-Allow-Origin": "*",
"Cache-Control": "public, max-age=3600",
},
)
except json.JSONDecodeError:
pass
return HttpResponseNotFound("File not found")
# djangoldp-crypto
Packages like [djangoldp](https://git.startinblox.com/djangoldp-packages/djangoldp) and [django-webidoidc-provider](https://git.startinblox.com/djangoldp-packages/django-webidoidc-provider) have some models and utilities which make use of cryptography. In general, we want to re-use that code in a supporting package to avoid duplication of effort. However, until it is more clear what ca be re-used, we are using this separate django app in this package. See [this ticket](https://git.startinblox.com/djangoldp-packages/djangoldp/issues/236) for more.
## Install
```bash
$ python -m pip install 'djangoldp[crypto]'
```
Tnen add the app to your `settings.yml` like so:
```yaml
INSTALLED_APPS:
- djangoldp_crypto
```
## Management commands
- `creatersakey`: Randomly generate a new RSA key for the DjangoLDP server
## Test
```bash
$ python -m unittest djangoldp_crypto.tests.runner
```
from django.contrib import admin
from djangoldp_crypto.models import RSAKey
@admin.register(RSAKey)
class RSAKeyAdmin(admin.ModelAdmin):
readonly_fields = ['kid', 'pub_key']
def save_model(self, request, obj, form, change):
obj.priv_key.replace('\r', '')
super().save_model(request, obj, form, change)
from Cryptodome.PublicKey import RSA
from django.core.management.base import BaseCommand
from djangoldp_crypto.models import RSAKey
class Command(BaseCommand):
help = 'Randomly generate a new RSA key for the DjangoLDP server'
def handle(self, *args, **options):
try:
key = RSA.generate(2048)
rsakey = RSAKey(priv_key=key.exportKey('PEM').decode('utf8'))
rsakey.save()
self.stdout.write('RSA key successfully created')
self.stdout.write(u'Private key: \n{0}'.format(rsakey.priv_key))
self.stdout.write(u'Public key: \n{0}'.format(rsakey.pub_key))
self.stdout.write(u'Key ID: \n{0}'.format(rsakey.kid))
except Exception as e:
self.stdout.write('Something goes wrong: {0}'.format(e))
# Generated by Django 2.2.19 on 2021-03-15 10:35
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='RSAKey',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('priv_key', models.TextField(help_text='Paste your private RSA Key here.', unique=True, verbose_name='Key')),
],
options={
'verbose_name': 'RSA Key',
'verbose_name_plural': 'RSA Keys',
},
),
]
from hashlib import md5
from Cryptodome.PublicKey import RSA
from django.db import models
from django.utils.translation import gettext_lazy as _
class RSAKey(models.Model):
priv_key = models.TextField(
verbose_name=_(u'Key'), unique=True,
help_text=_(u'Paste your private RSA Key here.'))
class Meta:
verbose_name = _(u'RSA Key')
verbose_name_plural = _(u'RSA Keys')
def __str__(self):
return u'{0}'.format(self.kid)
def __unicode__(self):
return self.__str__()
@property
def kid(self):
if not self.priv_key:
return ''
return u'{0}'.format(md5(self.priv_key.encode('utf-8')).hexdigest())
@property
def pub_key(self):
if not self.priv_key:
return ''
_pub_key = RSA.importKey(self.priv_key).publickey()
return _pub_key.export_key().decode('utf-8')