Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • djangoldp-packages/djangoldp
  • decentral1se/djangoldp
  • femmefaytale/djangoldp
  • jvtrudel/djangoldp
4 results
Show changes
Showing
with 2207 additions and 774 deletions
from django.contrib.auth import get_user_model
from rest_framework.test import APIClient, APITestCase
from djangoldp.tests.models import User, Project
import cProfile, pstats, io
class TestPerformance(APITestCase):
fixtures = ['test.json',]
def setUp(self):
self.client = APIClient()
self.user = get_user_model().objects.create_user(username='john', email='jlennon@beatles.com',
password='glass onion')
self.client.force_authenticate(user=self.user)
print('there are ' + str(Project.objects.count()) + ' projects in the database')
print('there are ' + str(User.objects.count()) + ' users in the database')
def _print_stats(self, pr):
s = io.StringIO()
ps = pstats.Stats(pr, stream=s)
ps.print_stats()
print(s.getvalue())
def _enable_new_profiler(self):
pr = cProfile.Profile()
pr.enable()
return pr
def test_get_container(self):
pr = self._enable_new_profiler()
response = self.client.get('/projects/', content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
print('counted ' + str(len(response.data['ldp:contains'])) + ' projects')
pr.disable()
self._print_stats(pr)
pr = self._enable_new_profiler()
response = self.client.get('/users/', content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
print('counted ' + str(len(response.data['ldp:contains'])) + ' users')
pr.disable()
self._print_stats(pr)
import json
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Permission
from guardian.models import GroupObjectPermission
from rest_framework.test import APIRequestFactory, APIClient, APITestCase
from djangoldp.tests.models import AnonymousReadOnlyPost, AuthenticatedOnlyPost, ReadOnlyPost, DoubleInheritModel, \
ReadAndCreatePost, OwnedResource, RestrictedCircle, RestrictedResource, ANDPermissionsDummy, ORPermissionsDummy
class TestPermissions(APITestCase):
def setUp(self):
self.factory = APIRequestFactory()
self.client = APIClient()
def authenticate(self):
self.user = get_user_model().objects.create_user(username='random', email='random@user.com', password='Imrandom')
self.client = APIClient(enforce_csrf_checks=True)
self.client.force_authenticate(user=self.user)
def check_can_add(self, url, status_code=201, field='content', extra_content={}):
data = extra_content
extra_content[f"https://cdn.startinblox.com/owl#{field}"] = "new post"
response = self.client.post(url, data=json.dumps(data), content_type='application/ld+json')
self.assertEqual(response.status_code, status_code)
if status_code == 201:
self.assertIn('@id', response.data)
return response.data['@id']
def check_can_change(self, id, status_code=200, field='content'):
data = { f"https://cdn.startinblox.com/owl#{field}": "changed post" }
response = self.client.put(id, data=json.dumps(data), content_type='application/ld+json')
self.assertEqual(response.status_code, status_code)
if status_code == 200:
self.assertIn('@id', response.data)
self.assertEqual(response.data['@id'], id)
def check_can_view_one(self, id, status_code=200):
response = self.client.get(id, content_type='application/ld+json')
self.assertEqual(response.status_code, status_code)
if status_code == 200:
self.assertEqual(response.data['@id'], id)
def check_can_view(self, url, ids, status_code=200):
response = self.client.get(url, content_type='application/ld+json')
self.assertEqual(response.status_code, status_code)
if status_code == 200:
self.assertEqual(len(response.data['ldp:contains']), len(ids))
for resource, id in zip(response.data['ldp:contains'], ids):
self.assertEqual(resource['@id'], id)
for id in ids:
self.check_can_view_one(id, status_code)
def test_permissionless_model(self):
id = self.check_can_add('/posts/')
self.check_can_view('/posts/', [id])
def test_anonymous_readonly(self):
post = AnonymousReadOnlyPost.objects.create(content = "test post")
self.check_can_view('/anonymousreadonlyposts/', [post.urlid])
self.check_can_add('/anonymousreadonlyposts/', 403)
self.check_can_change(post.urlid, 403)
self.authenticate()
self.check_can_add('/anonymousreadonlyposts/')
self.check_can_change(post.urlid)
def test_authenticated_only(self):
post = AuthenticatedOnlyPost.objects.create(content = "test post")
self.check_can_view('/authenticatedonlyposts/', [post.urlid], 403)
self.check_can_add('/authenticatedonlyposts/', 403)
self.check_can_change(post.urlid, 403)
post.delete()
self.authenticate()
#When authenticated it should behave like a non protected model
id = self.check_can_add('/authenticatedonlyposts/')
self.check_can_view('/authenticatedonlyposts/', [id])
self.check_can_change(id)
def test_readonly(self):
post = ReadOnlyPost.objects.create(content = "test post")
self.check_can_view('/readonlyposts/', [post.urlid])
self.check_can_add('/readonlyposts/', 403)
self.check_can_change(post.urlid, 403)
def test_readandcreate(self):
post = ReadAndCreatePost.objects.create(content = "test post")
self.check_can_view('/readandcreateposts/', [post.urlid])
self.check_can_add('/readandcreateposts/')
self.check_can_change(post.urlid, 403)
def test_owner_permissions(self):
self.authenticate()
them = get_user_model().objects.create_user(username='them', email='them@user.com', password='itstheirsecret')
mine = OwnedResource.objects.create(description="Mine!", user=self.user)
theirs = OwnedResource.objects.create(description="Theirs", user=them)
noones = OwnedResource.objects.create(description="I belong to NO ONE!")
self.check_can_view('/ownedresources/', [mine.urlid]) #checks I can access mine and only mine
self.check_can_change(mine.urlid)
self.check_can_view_one(theirs.urlid, 404)
self.check_can_change(theirs.urlid, 404)
self.check_can_view_one(noones.urlid, 404)
self.check_can_change(noones.urlid, 404)
def check_permissions(self, obj, group, required_perms):
perms = GroupObjectPermission.objects.filter(group=group)
for perm in perms:
self.assertEqual(perm.content_type.model, obj._meta.model_name)
self.assertEqual(perm.object_pk, str(obj.pk))
self.assertEqual(set(perms.values_list('permission__codename', flat=True)),
{f'{perm}_{obj._meta.model_name}' for perm in required_perms})
def create_circles(self):
self.authenticate()
self.user.user_permissions.add(Permission.objects.get(codename='view_restrictedcircle'))
them = get_user_model().objects.create_user(username='them', email='them@user.com', password='itstheirsecret')
mine = RestrictedCircle.objects.create(name="mine", description="Mine!", owner=self.user)
theirs = RestrictedCircle.objects.create(name="theirs", description="Theirs", owner=them)
noones = RestrictedCircle.objects.create(name="no one's", description="I belong to NO ONE!")
return mine, theirs, noones
def test_role_permissions(self):
mine, theirs, noones = self.create_circles()
self.assertIn(self.user, mine.members.user_set.all())
self.assertIn(self.user, mine.admins.user_set.all())
self.assertNotIn(self.user, theirs.members.user_set.all())
self.assertNotIn(self.user, theirs.admins.user_set.all())
self.assertNotIn(self.user, noones.members.user_set.all())
self.assertNotIn(self.user, noones.admins.user_set.all())
self.check_can_view('/restrictedcircles/', [mine.urlid]) #check filtering
self.check_permissions(mine, mine.members, RestrictedCircle._meta.permission_roles['members']['perms'])
self.check_permissions(mine, mine.admins, RestrictedCircle._meta.permission_roles['admins']['perms'])
def test_inherit_permissions(self):
mine, theirs, noones = self.create_circles()
myresource = RestrictedResource.objects.create(content="mine", circle=mine)
their_resource = RestrictedResource.objects.create(content="theirs", circle=theirs)
noones_resource = RestrictedResource.objects.create(content="noones", circle=noones)
self.check_can_view('/restrictedresources/', [myresource.urlid])
self.check_can_change(myresource.urlid)
self.check_can_change(their_resource.urlid, 404)
self.check_can_change(noones_resource.urlid, 404)
def test_inherit_several_permissions(self):
mine, theirs, noones = self.create_circles()
ro_resource = ReadOnlyPost.objects.create(content="read only")
myresource = DoubleInheritModel.objects.create(content="mine", circle=mine, ro_ancestor=None)
some = DoubleInheritModel.objects.create(content="some", circle=theirs, ro_ancestor=ro_resource)
other = DoubleInheritModel.objects.create(content="other", circle=noones, ro_ancestor=None)
self.check_can_view('/doubleinheritmodels/', [myresource.urlid, some.urlid])
self.check_can_change(myresource.urlid)
self.check_can_change(some.urlid, 403)
self.check_can_change(other.urlid, 404)
def test_inherit_permissions_none(self):
id = self.check_can_add('/doubleinheritmodels/')
resource = DoubleInheritModel.objects.get(urlid=id)
self.check_can_view('/doubleinheritmodels/', [resource.urlid])
circle = RestrictedCircle.objects.create()
id = self.check_can_add('/doubleinheritmodels/', 404, extra_content={'https://cdn.startinblox.com/owl#circle': {'@id': circle.urlid}})
def test_and_permissions(self):
self.authenticate()
abc = ANDPermissionsDummy.objects.create(title='ABC')
youpi = ANDPermissionsDummy.objects.create(title='youpi woopaa')
wonder = ANDPermissionsDummy.objects.create(title='A Wonderful World!!')
plop = ANDPermissionsDummy.objects.create(title='plop')
self.check_can_view('/andpermissionsdummys/', [wonder.urlid], 403)
self.check_can_add('/andpermissionsdummys/', 403, field='title')
self.check_can_change(wonder.urlid, 403, field='title')
self.user.username = 'toto'
self.user.save()
self.check_can_view('/andpermissionsdummys/', [wonder.urlid])
self.check_can_view_one(abc.urlid, 404)
self.check_can_view_one(youpi.urlid, 404)
self.check_can_view_one(plop.urlid, 404)
self.check_can_add('/andpermissionsdummys/', 403, field='title')
self.check_can_change(wonder.urlid, 403, field='title')
self.check_can_change(youpi.urlid, 403, field='title')
def test_or_permissions(self):
self.authenticate()
abc = ORPermissionsDummy.objects.create(title='ABC')
youpi = ORPermissionsDummy.objects.create(title='youpi woopaa')
wonder = ORPermissionsDummy.objects.create(title='A Wonderful World!!')
plop = ORPermissionsDummy.objects.create(title='plop')
self.check_can_view('/orpermissionsdummys/', [abc.urlid, wonder.urlid])
self.check_can_add('/andpermissionsdummys/', 403, field='title')
self.check_can_change(wonder.urlid, 403, field='title')
self.user.username = 'toto'
self.user.save()
self.check_can_view('/orpermissionsdummys/', [abc.urlid, youpi.urlid, wonder.urlid])
self.check_can_view_one(plop.urlid, 404)
self.check_can_add('/orpermissionsdummys/', field='title')
self.check_can_change(wonder.urlid, field='title')
self.check_can_change(plop.urlid, 404, field='title')
\ No newline at end of file
from django.contrib.auth import get_user_model
from django.test import TestCase
from rest_framework.test import APIClient, APIRequestFactory
from rest_framework.utils import json
from djangoldp.models import Model
from djangoldp.tests.models import (Circle, Invoice, LDPDummy, Post, Project,
Resource, Space)
class PostTestCase(TestCase):
def setUp(self):
self.factory = APIRequestFactory()
self.client = APIClient()
self.user = get_user_model().objects.create_user(username='john', email='jlennon@beatles.com',
password='glass onion')
self.client.force_authenticate(self.user)
def test_save_fk_graph_with_nested(self):
post = {
'@graph': [
{
'https://cdn.startinblox.com/owl#title': "title",
'https://cdn.startinblox.com/owl#invoice': {
'@id': "_.123"
}
},
{
'@id': "_.123",
'https://cdn.startinblox.com/owl#title': "title 2"
}
]
}
response = self.client.post('/batchs/', data=json.dumps(post), content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertNotIn('author', response.data)
self.assertEqual(response.data['title'], "title")
self.assertEqual(response.data['invoice']['title'], "title 2")
def test_save_fk_graph_with_existing_nested(self):
invoice = Invoice.objects.create(title="title 3")
post = {
'@graph': [
{
'https://cdn.startinblox.com/owl#title': "title",
'https://cdn.startinblox.com/owl#invoice': {
'@id': "http://happy-dev.fr{}{}/".format(Model.container_id(invoice), invoice.id)
}
}
]
}
response = self.client.post('/batchs/', data=json.dumps(post), content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertNotIn('author', response.data)
self.assertEqual(response.data['title'], "title")
self.assertEqual(response.data['invoice']['title'], "title 3")
def test_post_should_accept_missing_field_id_nullable(self):
body = [
{
'@id': "./",
'https://cdn.startinblox.com/owl#content': "post update",
}
]
response = self.client.post('/posts/', data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertIn('peer_user', response.data)
def test_post_should_accept_empty_field_if_nullable(self):
body = [
{
'@id': "./",
'https://cdn.startinblox.com/owl#content': "post update",
'https://cdn.startinblox.com/owl#peer_user': ""
}
]
response = self.client.post('/posts/', data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertEqual(response.data['peer_user'], None)
def test_save_sub_object_in_new_object_with_reverse_1to1_relation(self):
dummy = LDPDummy.objects.create(some="foo")
body = [
{
'@id': "_:b216",
'https://cdn.startinblox.com/owl#description': "user update",
'https://cdn.startinblox.com/owl#ddummy': {
"@id": "http://happy-dev.fr{}{}/".format(Model.container_id(dummy), dummy.id)
}
},
{
'@id': './',
"https://cdn.startinblox.com/owl#first_name": "Alexandre",
"https://cdn.startinblox.com/owl#last_name": "Bourlier",
"https://cdn.startinblox.com/owl#username": "alex",
'https://cdn.startinblox.com/owl#userprofile': {'@id': "_:b216"}
}
]
response = self.client.post('/users/', data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertIn('userprofile', response.data)
def test_embedded_context(self):
body = {
'@graph': [
{
'@id': "./",
'content': "post update",
'peer_user': ""
}
],
'@context': {
"@vocab": "https://cdn.startinblox.com/owl#",
}
}
response = self.client.post('/posts/', data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
def test_nested_container(self):
resource = Resource.objects.create()
body = {
'https://cdn.startinblox.com/owl#title': "new job",
'https://cdn.startinblox.com/owl#slug': "job1",
}
response = self.client.post('/resources/{}/joboffers/'.format(resource.pk),
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertEqual(response.data['resources']['ldp:contains'][0]['@id'], resource.urlid)
self.assertEqual(response.data['title'], "new job")
def test_nested_container_bis(self):
invoice = Invoice.objects.create()
body = {
'https://cdn.startinblox.com/owl#title': "new batch",
}
response = self.client.post('/invoices/{}/batches/'.format(invoice.pk),
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertEqual(response.data['invoice']['@id'],
"http://happy-dev.fr/invoices/{}/".format(invoice.pk))
self.assertEqual(response.data['title'], "new batch")
def test_nested_container_federated(self):
resource = Resource.objects.create()
body = {
'https://cdn.startinblox.com/owl#@id': "http://external.job/job/1",
}
response = self.client.post('/resources/{}/joboffers/'.format(resource.pk),
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertEqual(response.data['@id'], "http://external.job/job/1")
self.assertIn('@type', response.data)
response = self.client.get('/resources/{}/'.format(resource.pk))
self.assertEqual(response.data['joboffers']['ldp:contains'][0]['@id'], "http://external.job/job/1")
def test_embedded_context_2(self):
body = {
'@id': "./",
'content': "post update",
'peer_user': "",
'@context': {
"@vocab": "https://cdn.startinblox.com/owl#",
}
}
response = self.client.post('/posts/', data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
def test_auto_id(self):
body = {
'@id': "./",
'content': "post update",
'peer_user': "",
'@context': {
"@vocab": "https://cdn.startinblox.com/owl#",
}
}
response = self.client.post('/posts/', data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
saved_post = Post.objects.get(pk=1)
self.assertEqual(saved_post.urlid, "http://happy-dev.fr/posts/1/")
def test_save_invalid_nested_user(self):
body = {
'@id': "./",
'content': "post update",
'peer_user': {'none': None},
'@context': {
"@vocab": "https://cdn.startinblox.com/owl#",
}
}
response = self.client.post('/posts/', data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 400)
def test_nested_container_user_federated(self):
project = Project.objects.create()
body = {
'https://cdn.startinblox.com/owl#@id': "http://external.user/user/1/",
}
response = self.client.post('/projects/{}/members/'.format(project.pk),
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertEqual(response.data['@id'], "http://external.user/user/1/")
self.assertIn('@type', response.data)
response = self.client.get('/projects/{}/'.format(project.pk))
self.assertEqual(response.data['members']['ldp:contains'][0]['@id'], "http://external.user/user/1/")
# https://www.w3.org/TR/json-ld/#value-objects
def test_post_field_with_value_object(self):
post = {
'https://cdn.startinblox.com/owl#title': {
'@value': "title",
'@language': "en"
}
}
response = self.client.post('/invoices/', data=json.dumps(post), content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertEqual(response.data['title'], "title")
# from JSON-LD spec: "The value associated with the @value key MUST be either a string, a number, true, false or null"
def test_save_field_with_invalid_value_object(self):
invoice = Invoice.objects.create(title="title 3")
post = {
'https://cdn.startinblox.com/owl#invoice': {
'@value': {'title': 'title',
'@id': "http://happy-dev.fr{}{}/".format(Model.container_id(invoice), invoice.id)}
}
}
response = self.client.post('/batchs/', data=json.dumps(post), content_type='application/ld+json')
self.assertEqual(response.status_code, 400)
# TODO: bug with PyLD: https://github.com/digitalbazaar/pyld/issues/142
# from JSON-LD spec: "If the value associated with the @type key is @json, the value MAY be either an array or an object"
'''
def test_save_field_with_object_value_object(self):
invoice = Invoice.objects.create(title="title 3")
post = {
'https://cdn.startinblox.com/owl#invoice': {
'@value': {'title': 'title', '@id': "http://happy-dev.fr{}{}/".format(Model.container_id(invoice), invoice.id)},
'@type': '@json'
}
}
response = self.client.post('/batchs/', data=json.dumps(post), content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
'''
# the below test is necessary because of an obscure bug where the OneToOne field is successfully applied
# during the life of the serializer (and response) but is not persisted in the database,
# when it is posted onto the reverse relation
def test_one_to_one_field_reverse_post(self):
self.assertEqual(Circle.objects.count(), 0)
self.assertEqual(Space.objects.count(), 0)
body = {
'@context': {'@vocab': "https://cdn.startinblox.com/owl#" },
'space': {'name': "Etablissement"}
}
response = self.client.post('/circles/', data=json.dumps(body), content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertEqual(Circle.objects.count(), 1)
self.assertEqual(Space.objects.count(), 1)
circle = Circle.objects.all()[0]
space = circle.space
self.assertIsNotNone(space)
self.assertIsNotNone(space.circle)
from django.test import TestCase
from rest_framework.utils import json
from djangoldp.models import Model
from djangoldp.serializers import LDPSerializer
from djangoldp.tests.models import Skill, JobOffer, Invoice, LDPDummy
class Save(TestCase):
def test_save_m2m_graph_with_many_nested(self):
invoice = {
"@graph": [
{
"@id": "./",
"batches": {"@id": "_:b381"},
"title": "Nouvelle facture",
"date": ""
},
{
"@id": "_:b381",
"tasks": {"@id": "_:b382"},
"title": "Batch 1"
},
{
"@id": "_:b382",
"title": "Tache 1"
}
]
}
meta_args = {'model': Invoice, 'depth': 2, 'fields': ("@id", "title", "batches", "date")}
meta_class = type('Meta', (), meta_args)
serializer_class = type(LDPSerializer)('InvoiceSerializer', (LDPSerializer,), {'Meta': meta_class})
serializer = serializer_class(data=invoice)
serializer.is_valid()
result = serializer.save()
self.assertEquals(result.title, "Nouvelle facture")
self.assertIs(result.batches.count(), 1)
self.assertEquals(result.batches.all()[0].title, "Batch 1")
self.assertIs(result.batches.all()[0].tasks.count(), 1)
self.assertEquals(result.batches.all()[0].tasks.all()[0].title, "Tache 1")
def test_save_m2m(self):
skill1 = Skill.objects.create(title="skill1", obligatoire="obligatoire", slug="slug1")
skill2 = Skill.objects.create(title="skill2", obligatoire="obligatoire", slug="slug2")
job = {"title": "job test",
"skills": {
"ldp:contains": [
{"@id": "https://happy-dev.fr/skills/{}/".format(skill1.slug)},
{"@id": "https://happy-dev.fr/skills/{}/".format(skill2.slug), "title": "skill2 UP"},
{"title": "skill3", "obligatoire": "obligatoire", "slug": "slug3"},
]}
}
meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills", "slug")}
meta_class = type('Meta', (), meta_args)
serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class})
serializer = serializer_class(data=job)
serializer.is_valid()
result = serializer.save()
self.assertEquals(result.title, "job test")
self.assertIs(result.skills.count(), 3)
self.assertEquals(result.skills.all()[0].title, "skill1") # no change
self.assertEquals(result.skills.all()[1].title, "skill2 UP") # title updated
self.assertEquals(result.skills.all()[2].title, "skill3") # creation on the fly
def test_save_m2m_graph_simple(self):
job = {"@graph": [
{"title": "job test",
},
]}
meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills")}
meta_class = type('Meta', (), meta_args)
serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class})
serializer = serializer_class(data=job)
serializer.is_valid()
result = serializer.save()
self.assertEquals(result.title, "job test")
self.assertIs(result.skills.count(), 0)
def test_save_m2m_graph_with_nested(self):
skill1 = Skill.objects.create(title="skill1", obligatoire="obligatoire")
skill2 = Skill.objects.create(title="skill2", obligatoire="obligatoire")
job = {"@graph": [
{"title": "job test",
"skills": {"@id": "_.123"}
},
{"@id": "_.123", "title": "skill3 NEW", "obligatoire": "obligatoire"},
]}
meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills")}
meta_class = type('Meta', (), meta_args)
serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class})
serializer = serializer_class(data=job)
serializer.is_valid()
result = serializer.save()
self.assertEquals(result.title, "job test")
self.assertIs(result.skills.count(), 1)
self.assertEquals(result.skills.all()[0].title, "skill3 NEW") # creation on the fly
def test_save_without_nested_fields(self):
skill1 = Skill.objects.create(title="skill1", obligatoire="obligatoire")
skill2 = Skill.objects.create(title="skill2", obligatoire="obligatoire")
job = {"title": "job test"}
meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills")}
meta_class = type('Meta', (), meta_args)
serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class})
serializer = serializer_class(data=job)
serializer.is_valid()
result = serializer.save()
self.assertEquals(result.title, "job test")
self.assertIs(result.skills.count(), 0)
def test_save_on_sub_iri(self):
"""
POST /job-offers/1/skills/
"""
job = JobOffer.objects.create(title="job test")
skill = {"title": "new SKILL"}
meta_args = {'model': Skill, 'depth': 2, 'fields': ("@id", "title")}
meta_class = type('Meta', (), meta_args)
serializer_class = type(LDPSerializer)('SkillSerializer', (LDPSerializer,), {'Meta': meta_class})
serializer = serializer_class(data=skill)
serializer.is_valid()
kwargs = {}
kwargs['joboffer'] = job
result = serializer.save(**kwargs)
self.assertEquals(result.title, "new SKILL")
self.assertIs(result.joboffer_set.count(), 1)
self.assertEquals(result.joboffer_set.get(), job)
self.assertIs(result.joboffer_set.get().skills.count(), 1)
def test_save_fk_graph_with_nested(self):
post = {
'@graph': [
{
'http://happy-dev.fr/owl/#title': "title",
'http://happy-dev.fr/owl/#invoice': {
'@id': "_.123"
}
},
{
'@id': "_.123",
'http://happy-dev.fr/owl/#title': "title 2"
}
]
}
response = self.client.post('/batchs/', data=json.dumps(post), content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertNotIn('author', response.data)
self.assertEquals(response.data['title'], "title")
self.assertEquals(response.data['invoice']['title'], "title 2")
def test_save_fk_graph_with_existing_nested(self):
invoice = Invoice.objects.create(title="title 3")
post = {
'@graph': [
{
'http://happy-dev.fr/owl/#title': "title",
'http://happy-dev.fr/owl/#invoice': {
'@id': "https://happy-dev.fr{}{}/".format(Model.container_id(invoice), invoice.id)
}
}
]
}
response = self.client.post('/batchs/', data=json.dumps(post), content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertNotIn('author', response.data)
self.assertEquals(response.data['title'], "title")
self.assertEquals(response.data['invoice']['title'], "title 3")
def test_post_should_accept_missing_field_id_nullable(self):
body = [
{
'@id': "./",
'http://happy-dev.fr/owl/#content': "post update",
}
]
response = self.client.post('/posts/', data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertIn('peer_user', response.data)
def test_post_should_accept_empty_field_if_nullable(self):
body = [
{
'@id': "./",
'http://happy-dev.fr/owl/#content': "post update",
'http://happy-dev.fr/owl/#peer_user': ""
}
]
response = self.client.post('/posts/', data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertEqual(response.data['peer_user'], None)
def test_save_sub_object_in_new_object_with_reverse_1to1_relation(self):
dummy = LDPDummy.objects.create(some="foo")
body = [
{
'@id': "_:b216",
'http://happy-dev.fr/owl/#description': "user update",
'http://happy-dev.fr/owl/#ddummy': {
"@id": "https://happy-dev.fr{}{}/".format(Model.container_id(dummy), dummy.id)
}
},
{
'@id': './',
"http://happy-dev.fr/owl/#first_name": "Alexandre",
"http://happy-dev.fr/owl/#last_name": "Bourlier",
"http://happy-dev.fr/owl/#username": "alex",
'http://happy-dev.fr/owl/#userprofile': {'@id': "_:b216"}
}
]
response = self.client.post('/users/', data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
self.assertIn('userprofile', response.data)
def test_embedded_context(self):
body = {
'@graph': [
{
'@id': "./",
'content': "post update",
'peer_user': ""
}
],
'@context': {
"@vocab": "http://happy-dev.fr/owl/#",
}
}
response = self.client.post('/posts/', data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
def test_embedded_context_2(self):
body = {
'@id': "./",
'content': "post update",
'peer_user': "",
'@context': {
"@vocab": "http://happy-dev.fr/owl/#",
}
}
response = self.client.post('/posts/', data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
from django.conf import settings
from django.test import TestCase
class TestSettings(TestCase):
def test_inexistent_returns_default(self):
"""Assert a inexistent key returns the provided default value."""
assert getattr(settings, 'INEXISTENT', 'something') == 'something'
def test_only_in_core_config(self):
"""Asserts values defined only in core config."""
assert settings.DEBUG == False
def test_only_in_package(self):
"""Asserts default settings defined in the package."""
assert settings.MYPACKAGEVAR == "ok"
def test_only_in_user_config(self):
"""Asserts LDP packages are loaded from YAML file."""
assert 'djangoldp.tests' in settings.DJANGOLDP_PACKAGES
def test_overrided_core_by_package_config(self):
assert settings.USE_I18N == False
def test_overrided_package_by_user_config(self):
assert settings.USE_TZ == False
def test_overrided_core_by_user_config(self):
"""Asserts values overrided from user configuration."""
assert settings.EMAIL_HOST == 'somewhere'
def test_installed_apps_resolution(self):
"""Asserts LDP packages are referenced along with default installed apps."""
# test inclusion from server YAML settings
assert 'djangoldp.tests' in settings.INSTALLED_APPS
# test inclusion from ldppackage settings
assert 'djangoldp.tests.dummy.apps.DummyConfig' in settings.INSTALLED_APPS
# test inclusion from default core settings
assert 'djangoldp' in settings.INSTALLED_APPS
# test deduplication of dummy app
assert settings.INSTALLED_APPS.count('djangoldp.tests.dummy.apps.DummyConfig') == 1
# FIXME: We should check the order
def test_reference_middleware(self):
"""Asserts middlewares added in packages are added to the settings."""
assert 'djangoldp.tests.dummy.middleware.DummyMiddleware' in settings.MIDDLEWARE
def test_extra_module(self):
#FIXME
pass
......@@ -13,6 +13,14 @@ class TestSource(APITestCase):
pass
def test_get_resource(self):
source = LDPSource.objects.create(federation="source_name", container="http://bar.foo/")
source = LDPSource.objects.create(federation="source_name", urlid="http://bar.foo/")
response = self.client.get('/sources/{}/'.format(source.federation), content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['@id'], 'http://happy-dev.fr/sources/source_name/')
self.assertEqual(len(response.data['ldp:contains']), 1)
def test_get_empty_resource(self):
response = self.client.get('/sources/{}/'.format('unknown'), content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['@id'], 'http://happy-dev.fr/sources/unknown/')
self.assertEqual(len(response.data['ldp:contains']), 0)
import json
from django.contrib.auth.models import User
from django.test import TestCase
from rest_framework.test import APIRequestFactory, APIClient
from djangoldp.tests.models import Skill, JobOffer
class TestTemp(TestCase):
def setUp(self):
self.factory = APIRequestFactory()
self.client = APIClient()
self.user = User.objects.create_user(username='john', email='jlennon@beatles.com', password='glass onion')
def tearDown(self):
pass
from django.contrib.auth.models import User
import uuid
from django.conf import settings
from django.contrib.auth import get_user_model
from django.test import TestCase
from rest_framework.test import APIRequestFactory, APIClient
from rest_framework.test import APIClient, APIRequestFactory
from rest_framework.utils import json
from djangoldp.serializers import LDPSerializer
from djangoldp.tests.models import Post, UserProfile
from djangoldp.tests.models import Skill, JobOffer, Conversation, Message
from djangoldp.tests.models import (Batch, Conversation, Invoice, JobOffer,
NotificationSetting, Project, Resource,
Skill, Task, UserProfile)
class Update(TestCase):
......@@ -13,305 +16,79 @@ class Update(TestCase):
def setUp(self):
self.factory = APIRequestFactory()
self.client = APIClient()
def tearDown(self):
pass
def test_update(self):
skill = Skill.objects.create(title="to drop", obligatoire="obligatoire", slug="slug1")
skill1 = Skill.objects.create(title="skill1", obligatoire="obligatoire", slug="slug2")
skill2 = Skill.objects.create(title="skill2", obligatoire="obligatoire", slug="slug3")
job1 = JobOffer.objects.create(title="job test")
job1.skills.add(skill)
job = {"@id": "https://happy-dev.fr/job-offers/{}/".format(job1.slug),
"title": "job test updated",
"skills": {
"ldp:contains": [
{"title": "new skill", "obligatoire": "okay"},
{"@id": "https://happy-dev.fr/skills/{}/".format(skill1.slug)},
{"@id": "https://happy-dev.fr/skills/{}/".format(skill2.slug), "title": "skill2 UP"},
]}
}
meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills")}
meta_class = type('Meta', (), meta_args)
serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class})
serializer = serializer_class(data=job, instance=job1)
serializer.is_valid()
result = serializer.save()
self.assertEquals(result.title, "job test updated")
self.assertIs(result.skills.count(), 3)
skills = result.skills.all().order_by('title')
self.assertEquals(skills[0].title, "new skill") # new skill
self.assertEquals(skills[1].title, "skill1") # no change
self.assertEquals(skills[2].title, "skill2 UP") # title updated
def test_update_graph(self):
skill = Skill.objects.create(title="to drop", obligatoire="obligatoire", slug="slug1")
skill1 = Skill.objects.create(title="skill1", obligatoire="obligatoire", slug="slug2")
skill2 = Skill.objects.create(title="skill2", obligatoire="obligatoire", slug="slug3")
job1 = JobOffer.objects.create(title="job test", slug="slug4")
job1.skills.add(skill)
job = {"@graph":
[
{
"@id": "https://happy-dev.fr/job-offers/{}/".format(job1.slug),
"title": "job test updated",
"skills": {
"ldp:contains": [
{"@id": "https://happy-dev.fr/skills/{}/".format(skill1.slug)},
{"@id": "https://happy-dev.fr/skills/{}/".format(skill2.slug)},
{"@id": "_.123"},
]}
},
{
"@id": "_.123",
"title": "new skill",
"obligatoire": "okay"
},
{
"@id": "https://happy-dev.fr/skills/{}/".format(skill1.slug),
},
{
"@id": "https://happy-dev.fr/skills/{}/".format(skill2.slug),
"title": "skill2 UP"
}
]
}
meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills")}
meta_class = type('Meta', (), meta_args)
serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class})
serializer = serializer_class(data=job, instance=job1)
serializer.is_valid()
result = serializer.save()
skills = result.skills.all().order_by('title')
self.assertEquals(result.title, "job test updated")
self.assertIs(result.skills.count(), 3)
self.assertEquals(skills[0].title, "new skill") # new skill
self.assertEquals(skills[1].title, "skill1") # no change
self.assertEquals(skills[2].title, "skill2 UP") # title updated
def test_update_graph_2(self):
skill = Skill.objects.create(title="to drop", obligatoire="obligatoire", slug="slug")
skill1 = Skill.objects.create(title="skill1", obligatoire="obligatoire", slug="slug1")
skill2 = Skill.objects.create(title="skill2", obligatoire="obligatoire", slug="slug2")
job1 = JobOffer.objects.create(title="job test", slug="slug1")
job1.skills.add(skill)
job = {"@graph":
[
{
"@id": "https://happy-dev.fr/job-offers/{}/".format(job1.slug),
"title": "job test updated",
"skills": {
"@id": "https://happy-dev.fr/job-offers/{}/skills/".format(job1.slug)
}
},
{
"@id": "_.123",
"title": "new skill",
"obligatoire": "okay"
},
{
"@id": "https://happy-dev.fr/skills/{}/".format(skill1.slug),
},
{
"@id": "https://happy-dev.fr/skills/{}/".format(skill2.slug),
"title": "skill2 UP"
},
{
'@id': "https://happy-dev.fr/job-offers/{}/skills/".format(job1.slug),
self.user = get_user_model().objects.create_user(username='john', email='jlennon@beatles.com',
password='glass onion')
self.client.force_authenticate(user=self.user)
# TODO: https://git.startinblox.com/djangoldp-packages/djangoldp/issues/326
'''
def test_update_container_append_resource(self):
pre_existing_skill_a = Skill.objects.create(title="to keep", obligatoire="obligatoire", slug="slug1")
pre_existing_skill_b = Skill.objects.create(title="to keep", obligatoire="obligatoire", slug="slug2")
job = JobOffer.objects.create(title="job test")
job.skills.add(pre_existing_skill_a)
job.skills.add(pre_existing_skill_b)
post = {"@id": "{}/job-offers/{}/".format(settings.BASE_URL, job.slug),
"skills": {
"ldp:contains": [
{"@id": "https://happy-dev.fr/skills/{}/".format(skill1.slug)},
{"@id": "https://happy-dev.fr/skills/{}/".format(skill2.slug)},
{"@id": "_.123"},
]
{"title": "new skill", "obligatoire": "okay"},
{"@id": "{}/skills/{}/".format(settings.BASE_URL, pre_existing_skill_b.slug), "title": "z"},
]}
}
]
}
meta_args = {'model': JobOffer, 'depth': 2, 'fields': ("@id", "title", "skills")}
meta_class = type('Meta', (), meta_args)
serializer_class = type(LDPSerializer)('JobOfferSerializer', (LDPSerializer,), {'Meta': meta_class})
serializer = serializer_class(data=job, instance=job1)
serializer.is_valid()
result = serializer.save()
skills = result.skills.all().order_by('title')
self.assertEquals(result.title, "job test updated")
self.assertIs(result.skills.count(), 3)
self.assertEquals(skills[0].title, "new skill") # new skill
self.assertEquals(skills[1].title, "skill1") # no change
self.assertEquals(skills[2].title, "skill2 UP") # title updated
self.assertEquals(skill, skill._meta.model.objects.get(pk=skill.pk)) # title updated
def test_update_list_with_reverse_relation(self):
user1 = User.objects.create()
conversation = Conversation.objects.create(description="Conversation 1", author_user=user1)
message1 = Message.objects.create(text="Message 1", conversation=conversation, author_user=user1)
message2 = Message.objects.create(text="Message 2", conversation=conversation, author_user=user1)
json = {"@graph": [
{
"@id": "https://happy-dev.fr/messages/{}/".format(message1.pk),
"text": "Message 1 UP"
},
{
"@id": "https://happy-dev.fr/messages/{}/".format(message2.pk),
"text": "Message 2 UP"
},
{
'@id': "https://happy-dev.fr/conversations/{}/".format(conversation.pk),
'description': "Conversation 1 UP",
"message_set": [
{"@id": "https://happy-dev.fr/messages/{}/".format(message1.pk)},
{"@id": "https://happy-dev.fr/messages/{}/".format(message2.pk)},
]
}
]
}
meta_args = {'model': Conversation, 'depth': 2, 'fields': ("@id", "description", "message_set")}
meta_class = type('Meta', (), meta_args)
serializer_class = type(LDPSerializer)('ConversationSerializer', (LDPSerializer,), {'Meta': meta_class})
serializer = serializer_class(data=json, instance=conversation)
serializer.is_valid()
result = serializer.save()
messages = result.message_set.all().order_by('text')
self.assertEquals(result.description, "Conversation 1 UP")
self.assertIs(result.message_set.count(), 2)
self.assertEquals(messages[0].text, "Message 1 UP")
self.assertEquals(messages[1].text, "Message 2 UP")
def test_add_new_element_with_foreign_key_id(self):
user1 = User.objects.create()
conversation = Conversation.objects.create(description="Conversation 1", author_user=user1)
message1 = Message.objects.create(text="Message 1", conversation=conversation, author_user=user1)
message2 = Message.objects.create(text="Message 2", conversation=conversation, author_user=user1)
json = {"@graph": [
{
"@id": "https://happy-dev.fr/messages/{}/".format(message1.pk),
"text": "Message 1 UP",
"author_user": {
'@id': "https://happy-dev.fr/users/{}/".format(user1.pk)
}
},
{
"@id": "https://happy-dev.fr/messages/{}/".format(message2.pk),
"text": "Message 2 UP",
"author_user": {
'@id': "https://happy-dev.fr/users/{}/".format(user1.pk)
}
},
{
"@id": "_:b1",
"text": "Message 3 NEW",
"author_user": {
'@id': "https://happy-dev.fr/users/{}/".format(user1.pk)
}
},
{
'@id': "https://happy-dev.fr/conversations/{}/".format(conversation.pk),
"author_user": {
'@id': "https://happy-dev.fr/users/{}/".format(user1.pk)
},
'description': "Conversation 1 UP",
'message_set': {
"@id": "https://happy-dev.fr/conversations/{}/message_set/".format(conversation.pk)
}
},
{
'@id': "https://happy-dev.fr/conversations/{}/message_set/".format(conversation.pk),
"ldp:contains": [
{"@id": "https://happy-dev.fr/messages/{}/".format(message1.pk)},
{"@id": "https://happy-dev.fr/messages/{}/".format(message2.pk)},
{"@id": "_:b1"}
]
}
]
}
meta_args = {'model': Conversation, 'depth': 2, 'fields': ("@id", "description", "message_set")}
meta_class = type('Meta', (), meta_args)
serializer_class = type(LDPSerializer)('ConversationSerializer', (LDPSerializer,), {'Meta': meta_class})
serializer = serializer_class(data=json, instance=conversation)
serializer.is_valid()
result = serializer.save()
messages = result.message_set.all().order_by('text')
response = self.client.patch('/job-offers/{}/'.format(job.slug),
data=json.dumps(post),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEquals(result.description, "Conversation 1 UP")
self.assertIs(result.message_set.count(), 3)
self.assertEquals(messages[0].text, "Message 1 UP")
self.assertEquals(messages[1].text, "Message 2 UP")
self.assertEquals(messages[2].text, "Message 3 NEW")
self.assertEqual(response.data['title'], job.title)
self.assertIs(job.skills.count(), 3)
skills = job.skills.all().order_by('title')
self.assertEqual(skills[0].title, "new skill") # new skill
self.assertEqual(skills[1].title, pre_existing_skill_a.title) # old skill unchanged
self.assertEqual(skills[2].title, "z") # updated
self.assertEqual(skills[2].obligatoire, pre_existing_skill_b.obligatoire) # another field not updated
'''
def test_put_resource(self):
post = Post.objects.create(content="content")
skill = Skill.objects.create(title='original', obligatoire='original', slug='skill1')
body = [{
'@id': '/posts/{}/'.format(post.pk),
'http://happy-dev.fr/owl/#content': "post content"}]
response = self.client.put('/posts/{}/'.format(post.pk), data=json.dumps(body),
'@id': '{}/skills/{}/'.format(settings.BASE_URL, skill.slug),
'https://cdn.startinblox.com/owl#title': "new", 'https://cdn.startinblox.com/owl#obligatoire': "new"}]
response = self.client.put('/skills/{}/'.format(skill.slug), data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEquals(response.data['content'], "post content")
self.assertIn('location', response._headers)
def test_create_sub_object_in_existing_object_with_reverse_1to1_relation(self):
"""
Doesn't work with depth = 0 on UserProfile Model. Should it be ?
"""
user = User.objects.create(username="alex", password="test")
body = [
{
'@id': "_:b975",
'http://happy-dev.fr/owl/#description': "user description",
'http://happy-dev.fr/owl/#dummy': {
'@id': './'
}
},
{
'@id': '/users/{}/'.format(user.pk),
"http://happy-dev.fr/owl/#first_name": "Alexandre",
"http://happy-dev.fr/owl/#last_name": "Bourlier",
"http://happy-dev.fr/owl/#username": "alex",
'http://happy-dev.fr/owl/#userprofile': {'@id': "_:b975"}
}
]
response = self.client.put('/users/{}/'.format(user.pk), data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.data['title'], "new")
self.assertEqual(response.data['obligatoire'], "new")
self.assertIn('location', response.headers)
def test_patch_resource(self):
skill = Skill.objects.create(title='original', obligatoire='original', slug='skill1')
body = {
'@id': '{}/skills/{}'.format(settings.BASE_URL, skill.slug),
'https://cdn.startinblox.com/owl#title': 'new'
}
response = self.client.patch('/skills/{}/'.format(skill.slug), data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertIn('userprofile', response.data)
self.assertEqual(response.data['title'], "new")
self.assertEqual(response.data['obligatoire'], "original")
def test_create_sub_object_in_existing_object_with_existing_reverse_1to1_relation(self):
user = User.objects.create(username="alex", password="test")
user = get_user_model().objects.create(username="alex", password="test")
profile = UserProfile.objects.create(user=user, description="user description")
body = [
{
'@id': "/userprofiles/{}/".format(profile.pk),
'http://happy-dev.fr/owl/#description': "user update"
'https://cdn.startinblox.com/owl#description': "user update"
},
{
'@id': '/users/{}/'.format(user.pk),
"http://happy-dev.fr/owl/#first_name": "Alexandre",
"http://happy-dev.fr/owl/#last_name": "Bourlier",
"http://happy-dev.fr/owl/#username": "alex",
'http://happy-dev.fr/owl/#userprofile': {'@id': "/userprofiles/{}/".format(profile.pk)}
"https://cdn.startinblox.com/owl#first_name": "Alexandre",
"https://cdn.startinblox.com/owl#last_name": "Bourlier",
"https://cdn.startinblox.com/owl#username": "alex",
'https://cdn.startinblox.com/owl#userprofile': {'@id': "/userprofiles/{}/".format(profile.pk)}
}
]
response = self.client.put('/users/{}/'.format(user.pk), data=json.dumps(body),
......@@ -319,22 +96,38 @@ class Update(TestCase):
self.assertEqual(response.status_code, 200)
self.assertIn('userprofile', response.data)
def test_put_nonexistent_local_resource(self):
job = JobOffer.objects.create(title="job test")
# contains internal urlid which refers to non-existent resource
body = {"@id": "{}/job-offers/{}/".format(settings.BASE_URL, job.slug),
"skills": {
"ldp:contains": [
{"@id": "{}/skills/404/".format(settings.BASE_URL)},
]}
}
response = self.client.put('/job-offers/{}/'.format(job.slug), data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(Skill.objects.count(), 0)
def test_create_sub_object_in_existing_object_with_reverse_fk_relation(self):
"""
Doesn't work with depth = 0 on UserProfile Model. Should it be ?
"""
user = User.objects.create(username="alex", password="test")
user = get_user_model().objects.create(username="alex", password="test")
body = [
{
'@id': "_:b975",
'http://happy-dev.fr/owl/#description': "conversation description"
'https://cdn.startinblox.com/owl#description': "conversation description"
},
{
'@id': '/users/{}/'.format(user.pk),
"http://happy-dev.fr/owl/#first_name": "Alexandre",
"http://happy-dev.fr/owl/#last_name": "Bourlier",
"http://happy-dev.fr/owl/#username": "alex",
'http://happy-dev.fr/owl/#conversation_set': {'@id': "_:b975"}
"https://cdn.startinblox.com/owl#first_name": "Alexandre",
"https://cdn.startinblox.com/owl#last_name": "Bourlier",
"https://cdn.startinblox.com/owl#username": "alex",
'https://cdn.startinblox.com/owl#conversation_set': {'@id': "_:b975"}
}
]
response = self.client.put('/users/{}/'.format(user.pk), data=json.dumps(body),
......@@ -343,19 +136,19 @@ class Update(TestCase):
self.assertIn('conversation_set', response.data)
def test_create_sub_object_in_existing_object_with_existing_reverse_fk_relation(self):
user = User.objects.create(username="alex", password="test")
user = get_user_model().objects.create(username="alex", password="test")
conversation = Conversation.objects.create(author_user=user, description="conversation description")
body = [
{
'@id': "/conversations/{}/".format(conversation.pk),
'http://happy-dev.fr/owl/#description': "conversation update"
'https://cdn.startinblox.com/owl#description': "conversation update"
},
{
'@id': '/users/{}/'.format(user.pk),
"http://happy-dev.fr/owl/#first_name": "Alexandre",
"http://happy-dev.fr/owl/#last_name": "Bourlier",
"http://happy-dev.fr/owl/#username": "alex",
'http://happy-dev.fr/owl/#conversation_set': {'@id': "/conversations/{}/".format(conversation.pk)}
"https://cdn.startinblox.com/owl#first_name": "Alexandre",
"https://cdn.startinblox.com/owl#last_name": "Bourlier",
"https://cdn.startinblox.com/owl#username": "alex",
'https://cdn.startinblox.com/owl#conversation_set': {'@id': "/conversations/{}/".format(conversation.pk)}
}
]
response = self.client.put('/users/{}/'.format(user.pk), data=json.dumps(body),
......@@ -364,14 +157,13 @@ class Update(TestCase):
self.assertIn('conversation_set', response.data)
def test_missing_field_should_not_be_removed_with_fk_relation(self):
user = User.objects.create(username="alex", password="test")
peer = User.objects.create(username="sylvain", password="test2")
conversation = Conversation.objects.create(author_user=user, peer_user=peer,
peer = get_user_model().objects.create(username="sylvain", password="test2")
conversation = Conversation.objects.create(author_user=self.user, peer_user=peer,
description="conversation description")
body = [
{
'@id': "/conversations/{}/".format(conversation.pk),
'http://happy-dev.fr/owl/#description': "conversation update",
'https://cdn.startinblox.com/owl#description': "conversation update",
}
]
response = self.client.put('/conversations/{}/'.format(conversation.pk), data=json.dumps(body),
......@@ -380,18 +172,414 @@ class Update(TestCase):
self.assertIn('peer_user', response.data)
def test_empty_field_should_be_removed_with_fk_relation(self):
user = User.objects.create(username="alex", password="test")
peer = User.objects.create(username="sylvain", password="test2")
conversation = Conversation.objects.create(author_user=user, peer_user=peer,
peer = get_user_model().objects.create(username="sylvain", password="test2")
conversation = Conversation.objects.create(author_user=self.user, peer_user=peer,
description="conversation description")
body = [
{
'@id': "/conversations/{}/".format(conversation.pk),
'http://happy-dev.fr/owl/#description': "conversation update",
'http://happy-dev.fr/owl/#peer_user': ""
'https://cdn.startinblox.com/owl#description': "conversation update",
'https://cdn.startinblox.com/owl#peer_user': ""
}
]
response = self.client.put('/conversations/{}/'.format(conversation.pk), data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['peer_user'], None)
def test_m2m_new_link_bis(self):
resource = Resource.objects.create()
job = JobOffer.objects.create(title="first title", slug="job")
body = {
'https://cdn.startinblox.com/owl#joboffers':
{
'@id': "{}/resources/{}/joboffers/".format(settings.BASE_URL, resource.pk),
'ldp:contains': [
{'@id': job.urlid,
'https://cdn.startinblox.com/owl#title': "new job",
},
]
}
}
response = self.client.put('/resources/{}/'.format(resource.pk),
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['joboffers']['ldp:contains'][0]['@id'], job.urlid)
self.assertEqual(response.data['joboffers']['ldp:contains'][0]['title'], "new job")
def test_m2m_new_link_embedded(self):
resource = Resource.objects.create()
body = {
'https://cdn.startinblox.com/owl#joboffers': {
'https://cdn.startinblox.com/owl#slug': 'aaa',
'https://cdn.startinblox.com/owl#title': "new job",
}
}
response = self.client.put('/resources/{}/'.format(resource.pk),
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['joboffers']['ldp:contains'][0]['@id'],
"http://happy-dev.fr/job-offers/aaa/")
self.assertEqual(response.data['joboffers']['ldp:contains'][0]['title'], "new job")
def test_m2m_existing_link(self):
resource = Resource.objects.create()
job = JobOffer.objects.create(title="first title", slug="job")
resource.joboffers.add(job)
resource.save()
body = {
'https://cdn.startinblox.com/owl#joboffers': {
# '@id': "http://testserver/resources/{}/joboffers/".format(resource.pk),
'ldp:contains': [
{
'@id': job.urlid,
'https://cdn.startinblox.com/owl#title': "new job",
}
]
}
}
response = self.client.put('/resources/{}/'.format(resource.pk),
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['joboffers']['ldp:contains'][0]['@id'], job.urlid)
self.assertEqual(response.data['joboffers']['ldp:contains'][0]['title'], "new job")
def test_m2m_new_link_external(self):
resource = Resource.objects.create()
body = {
'https://cdn.startinblox.com/owl#joboffers': {
'https://cdn.startinblox.com/owl#@id': 'http://external.job/job/1',
}
}
response = self.client.put('/resources/{}/'.format(resource.pk),
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['joboffers']['ldp:contains'][0]['@id'],
"http://external.job/job/1")
self.assertIn('@type', response.data['joboffers']['ldp:contains'][0])
def test_m2m_new_link_local(self):
resource = Resource.objects.create()
job = JobOffer.objects.create(title="first title", slug="job")
body = {
'https://cdn.startinblox.com/owl#joboffers': {
'@id': 'http://happy-dev.fr/job-offers/{}/'.format(job.slug),
}
}
response = self.client.put('/resources/{}/'.format(resource.pk),
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['joboffers']['ldp:contains'][0]['@id'],
"http://happy-dev.fr/job-offers/{}/".format(job.slug))
self.assertEqual(response.data['joboffers']['ldp:contains'][0]['title'], "first title")
def test_update_with_new_fk_relation(self):
conversation = Conversation.objects.create(author_user=self.user,
description="conversation description")
body = [
{
'@id': "/conversations/{}/".format(conversation.pk),
'https://cdn.startinblox.com/owl#description': "conversation update",
'https://cdn.startinblox.com/owl#peer_user': {
'@id': 'http://happy-dev.fr/users/{}'.format(self.user.pk),
}
}
]
response = self.client.put('/conversations/{}/'.format(conversation.pk), data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertIn('peer_user', response.data)
conversation = Conversation.objects.get(pk=conversation.pk)
self.assertIsNotNone(conversation.peer_user)
user = get_user_model().objects.get(pk=self.user.pk)
self.assertEqual(user.peers_conv.count(), 1)
def test_m2m_user_link_federated(self):
project = Project.objects.create(description="project name")
body = {
'https://cdn.startinblox.com/owl#description': 'project name',
'https://cdn.startinblox.com/owl#members': {
'https://cdn.startinblox.com/owl#@id': 'http://external.user/user/1',
}
}
response = self.client.put('/projects/{}/'.format(project.pk),
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['members']['ldp:contains'][0]['@id'],
"http://external.user/user/1")
self.assertIn('@type', response.data['members']['ldp:contains'][0])
self.assertEqual(len(response.data['members']['ldp:contains'][0].items()), 2)
def test_m2m_user_link_existing_external(self):
project = Project.objects.create(description="project name")
ext_user = get_user_model().objects.create(username=str(uuid.uuid4()), urlid='http://external.user/user/1')
body = {
'https://cdn.startinblox.com/owl#description': 'project name',
'https://cdn.startinblox.com/owl#members': {
'https://cdn.startinblox.com/owl#@id': ext_user.urlid,
}
}
response = self.client.put('/projects/{}/'.format(project.pk),
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['members']['ldp:contains'][0]['@id'], ext_user.urlid)
self.assertIn('@type', response.data['members']['ldp:contains'][0])
self.assertEqual(len(response.data['members']['ldp:contains'][0].items()), 2)
project = Project.objects.get(pk=project.pk)
self.assertEqual(project.members.count(), 1)
user = get_user_model().objects.get(pk=ext_user.pk)
self.assertEqual(user.projects.count(), 1)
def test_create_sub_object_in_existing_object_with_reverse_1to1_relation(self):
"""
Doesn't work with depth = 0 on UserProfile Model. Should it be ?
"""
user = get_user_model().objects.create(username="alex", password="test")
body = [
{
'@id': "_:b975",
'https://cdn.startinblox.com/owl#description': "user description",
'https://cdn.startinblox.com/owl#dummy': {
'@id': './'
}
},
{
'@id': '/users/{}/'.format(user.pk),
"https://cdn.startinblox.com/owl#first_name": "Alexandre",
"https://cdn.startinblox.com/owl#last_name": "Bourlier",
"https://cdn.startinblox.com/owl#username": "alex",
'https://cdn.startinblox.com/owl#userprofile': {'@id': "_:b975"}
}
]
response = self.client.put('/users/{}/'.format(user.pk), data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertIn('userprofile', response.data)
self.assertIsNotNone(response.data['userprofile'])
def test_m2m_user_link_remove_existing_link(self):
ext_user = get_user_model().objects.create(username=str(uuid.uuid4()), urlid='http://external.user/user/1')
project = Project.objects.create(description="project name")
project.members.add(ext_user)
project.save()
body = {
'https://cdn.startinblox.com/owl#description': 'project name',
'https://cdn.startinblox.com/owl#members': {
}
}
response = self.client.put('/projects/{}/'.format(project.pk),
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
project = Project.objects.get(pk=project.pk)
self.assertEqual(project.members.count(), 0)
user = get_user_model().objects.get(pk=ext_user.pk)
self.assertEqual(user.projects.count(), 0)
def test_update_sub_object_with_urlid(self):
user = get_user_model().objects.create(username="alex", password="test")
profile = UserProfile.objects.create(user=user, description="user description")
body = {
'@id': '/users/{}/'.format(user.pk),
"first_name": "Alexandre",
"last_name": "Bourlier",
"username": "alex",
'userprofile': {
'@id': profile.urlid,
'description': "user update"
},
'@context': {
"@vocab": "https://cdn.startinblox.com/owl#",
}
}
response = self.client.put('/users/{}/'.format(user.pk), data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertIn('userprofile', response.data)
response = self.client.get('/userprofiles/{}/'.format(profile.pk),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['description'], "user update")
# unit tests for a specific bug: https://git.startinblox.com/djangoldp-packages/djangoldp/issues/307
def test_direct_boolean_field(self):
profile = UserProfile.objects.create(user=self.user)
setting = NotificationSetting.objects.create(user=profile, receiveMail=False)
body = {
'https://cdn.startinblox.com/owl#@id': setting.urlid,
'receiveMail': True,
"@context": {"@vocab": "https://cdn.startinblox.com/owl#",
"rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
"rdfs": "http://www.w3.org/2000/01/rdf-schema#", "ldp": "http://www.w3.org/ns/ldp#",
"foaf": "http://xmlns.com/foaf/0.1/", "name": "rdfs:label",
"acl": "http://www.w3.org/ns/auth/acl#", "permissions": "acl:accessControl",
"mode": "acl:mode", "geo": "http://www.w3.org/2003/01/geo/wgs84_pos#", "lat": "geo:lat",
"lng": "geo:long"}
}
response = self.client.patch('/notificationsettings/{}/'.format(setting.pk),
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['receiveMail'], True)
def test_nested_container_boolean_field_no_slug(self):
profile = UserProfile.objects.create(user=self.user)
setting = NotificationSetting.objects.create(user=profile, receiveMail=False)
body = {
'settings': {
'https://cdn.startinblox.com/owl#@id': setting.urlid,
'receiveMail': True
},
"@context": {"@vocab": "https://cdn.startinblox.com/owl#",
"rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
"rdfs": "http://www.w3.org/2000/01/rdf-schema#", "ldp": "http://www.w3.org/ns/ldp#",
"foaf": "http://xmlns.com/foaf/0.1/", "name": "rdfs:label",
"acl": "http://www.w3.org/ns/auth/acl#", "permissions": "acl:accessControl",
"mode": "acl:mode", "geo": "http://www.w3.org/2003/01/geo/wgs84_pos#", "lat": "geo:lat",
"lng": "geo:long"}
}
response = self.client.patch('/userprofiles/{}/'.format(profile.slug),
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['settings']['receiveMail'], True)
# variation where the lookup_field for NotificationSetting (pk) is provided
def test_nested_container_boolean_field_with_slug(self):
profile = UserProfile.objects.create(user=self.user)
setting = NotificationSetting.objects.create(user=profile, receiveMail=False)
body = {
'settings': {
'pk': setting.pk,
'https://cdn.startinblox.com/owl#@id': setting.urlid,
'receiveMail': True
},
"@context": {"@vocab": "https://cdn.startinblox.com/owl#",
"rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
"rdfs": "http://www.w3.org/2000/01/rdf-schema#", "ldp": "http://www.w3.org/ns/ldp#",
"foaf": "http://xmlns.com/foaf/0.1/", "name": "rdfs:label",
"acl": "http://www.w3.org/ns/auth/acl#", "permissions": "acl:accessControl",
"mode": "acl:mode", "geo": "http://www.w3.org/2003/01/geo/wgs84_pos#", "lat": "geo:lat",
"lng": "geo:long"}
}
response = self.client.patch('/userprofiles/{}/'.format(profile.slug),
data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['settings']['receiveMail'], True)
def test_update_container_twice_nested_view(self):
invoice = Invoice.objects.create(title='test')
pre_existing_batch = Batch.objects.create(title='batch1', invoice=invoice)
pre_existing_task = Task.objects.create(title='task1', batch=pre_existing_batch)
base_url = settings.BASE_URL
body = {
"@id": "{}/invoices/{}/".format(base_url, invoice.pk),
"https://cdn.startinblox.com/owl#title": "new",
"https://cdn.startinblox.com/owl#batches": [
{
"@id": "{}/batchs/{}/".format(base_url, pre_existing_batch.pk),
"https://cdn.startinblox.com/owl#title": "new",
"https://cdn.startinblox.com/owl#tasks": [
{
"@id": "{}/tasks/{}/".format(base_url, pre_existing_task.pk),
"https://cdn.startinblox.com/owl#title": "new"
},
{
"https://cdn.startinblox.com/owl#title": "tache 2"
}
]
},
{
"https://cdn.startinblox.com/owl#title": "z",
}
]
}
response = self.client.put('/invoices/{}/'.format(invoice.pk), data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['title'], "new")
self.assertEqual(response.data['@id'], invoice.urlid)
invoice = Invoice.objects.get(pk=invoice.pk)
self.assertIs(invoice.batches.count(), 2)
batches = invoice.batches.all().order_by('title')
self.assertEqual(batches[0].title, "new")
self.assertEqual(batches[0].urlid, pre_existing_batch.urlid)
self.assertEqual(batches[1].title, "z")
self.assertIs(batches[0].tasks.count(), 2)
tasks = batches[0].tasks.all().order_by('title')
self.assertEqual(tasks[0].title, "new")
self.assertEqual(tasks[0].pk, pre_existing_task.pk)
self.assertEqual(tasks[1].title, "tache 2")
# TODO: https://git.startinblox.com/djangoldp-packages/djangoldp/issues/333
'''def test_update_container_nested_view(self):
circle = Circle.objects.create(name='test')
pre_existing = CircleMember.objects.create(user=self.user, circle=circle, is_admin=False)
another_user = get_user_model().objects.create_user(username='u2', email='u2@b.com', password='pw')
body = {
"@id": "{}/circles/{}/".format(settings.BASE_URL, circle.pk),
"https://cdn.startinblox.com/owl#name": "Updated Name",
"https://cdn.startinblox.com/owl#members": {
"ldp:contains": [
{"@id": "{}/circle-members/{}/".format(settings.BASE_URL, pre_existing.pk),
"https://cdn.startinblox.com/owl#is_admin": True},
{"https://cdn.startinblox.com/owl#user": {"@id": another_user.urlid},
"https://cdn.startinblox.com/owl#is_admin": False},
]
}
}
response = \
self.client.put('/circles/{}/'.format(circle.pk), data=json.dumps(body), content_type='application/ld+json')
print(str(self.user.urlid))
print(str(response.data))
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['name'], circle.name)
self.assertEqual(response.data['@id'], circle.urlid)
self.assertIs(CircleMember.objects.count(), 2)
self.assertIs(circle.members.count(), 2)
self.assertIs(circle.team.count(), 2)
members = circle.members.all().order_by('pk')
self.assertEqual(members[0].user, self.user)
self.assertEqual(members[0].urlid, pre_existing.urlid)
self.assertEqual(members[0].pk, pre_existing.pk)
self.assertEqual(members[0].is_admin, True)
self.assertEqual(members[1].user, another_user)
self.assertEqual(members[1].is_admin, False)'''
from django.contrib.auth.models import User
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Permission, Group
from django.conf import settings
from django.test import override_settings
from rest_framework.test import APIClient, APITestCase
from djangoldp.permissions import LDPPermissions
from .models import JobOffer
from djangoldp.views import LDPViewSet
from djangoldp.tests.models import JobOffer, LDPDummy, PermissionlessDummy, UserProfile, OwnedResource, \
OwnedResourceNestedOwnership, OwnedResourceTwiceNestedOwnership
import json
class TestUserPermissions(APITestCase):
class UserPermissionsTestCase(APITestCase):
def setUp(self):
user = User.objects.create_user(username='john', email='jlennon@beatles.com', password='glass onion')
self.user = get_user_model().objects.create_user(username='john', email='jlennon@beatles.com', password='glass onion')
self.client = APIClient(enforce_csrf_checks=True)
self.client.force_authenticate(user=user)
self.job = JobOffer.objects.create(title="job", slug=1)
self.client.force_authenticate(user=self.user)
self.job = JobOffer.objects.create(title="job", slug="slug1")
class TestUserPermissions(UserPermissionsTestCase):
def setUpGroup(self):
self.group = Group.objects.create(name='Test')
view_perm = Permission.objects.get(codename='view_permissionlessdummy')
self.group.permissions.add(view_perm)
self.group.save()
def _make_self_superuser(self):
self.user.is_superuser = True
self.user.save()
# list - simple
def test_get_for_authenticated_user(self):
response = self.client.get('/job-offers/')
self.assertEqual(response.status_code, 200)
# test serialized permissions
self.assertIn('view', response.data['permissions'])
self.assertNotIn('inherit', response.data['permissions'])
# self.assertNotIn('delete', response.data['permissions'])
# TODO: list - I do not have permission from the model, but I do have permission via a Group I am assigned
# https://git.startinblox.com/djangoldp-packages/djangoldp/issues/291
'''def test_group_list_access(self):
self.setUpGroup()
dummy = PermissionlessDummy.objects.create()
response = self.client.get('/permissionless-dummys/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data['ldp:contains']), 0)
LDListMixin.to_representation_cache.reset()
LDPSerializer.to_representation_cache.reset()
self.user.groups.add(self.group)
self.user.save()
response = self.client.get('/permissionless-dummys/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data['ldp:contains']), 1)
# repeat of the above test on nested field
def test_group_list_access_nested_field(self):
self.setUpGroup()
parent = LDPDummy.objects.create()
PermissionlessDummy.objects.create(parent=parent)
response = self.client.get('/ldpdummys/{}/'.format(parent.pk))
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data['anons']['ldp:contains']), 0)
LDListMixin.to_representation_cache.reset()
LDPSerializer.to_representation_cache.reset()
self.user.groups.add(self.group)
self.user.save()
response = self.client.get('/ldpdummys/{}/'.format(parent.pk))
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data['anons']['ldp:contains']), 1)
# repeat of the test on a nested viewset
def test_group_list_access_nested_viewset(self):
self.setUpGroup()
parent = LDPDummy.objects.create()
PermissionlessDummy.objects.create(parent=parent)
response = self.client.get('/ldpdummys/{}/anons/'.format(parent.pk))
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data['ldp:contains']), 0)
LDListMixin.to_representation_cache.reset()
LDPSerializer.to_representation_cache.reset()
self.user.groups.add(self.group)
self.user.save()
response = self.client.get('/ldpdummys/{}/anons/'.format(parent.pk))
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data['ldp:contains']), 1)
# repeat for object-specific request
def test_group_object_access(self):
self.setUpGroup()
dummy = PermissionlessDummy.objects.create()
response = self.client.get('/permissionless-dummys/{}'.format(dummy))
self.assertEqual(response.status_code, 404)
LDListMixin.to_representation_cache.reset()
LDPSerializer.to_representation_cache.reset()
self.user.groups.add(self.group)
self.user.save()
response = self.client.get('/permissionless-dummys/{}/'.format(dummy))
self.assertEqual(response.status_code, 200)
# TODO: test for POST scenario
# TODO: test for PUT scenario
# TODO: test for DELETE scenario
'''
@override_settings(SERIALIZE_OBJECT_EXCLUDE_PERMISSIONS=['inherit'])
def test_get_1_for_authenticated_user(self):
response = self.client.get('/job-offers/1/')
response = self.client.get('/job-offers/{}/'.format(self.job.slug))
self.assertEqual(response.status_code, 200)
self.assertIn('view', response.data['permissions'])
self.assertNotIn('inherit', response.data['permissions'])
def test_post_request_for_authenticated_user(self):
post = {'title': "job_created"}
post = {'https://cdn.startinblox.com/owl#title': "job_created", "https://cdn.startinblox.com/owl#slug": 'slug2'}
response = self.client.post('/job-offers/', data=json.dumps(post), content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
# denied because I don't have model permissions
def test_post_request_denied_model_perms(self):
data = {'https://cdn.startinblox.com/owl#some': 'title'}
response = self.client.post('/permissionless-dummys/', data=json.dumps(data), content_type='application/ld+json')
self.assertEqual(response.status_code, 403)
def test_post_nested_view_authorized(self):
data = { "https://cdn.startinblox.com/owl#title": "new skill", "https://cdn.startinblox.com/owl#obligatoire": "okay" }
response = self.client.post('/job-offers/{}/skills/'.format(self.job.slug), data=json.dumps(data),
content_type='application/ld+json')
self.assertEqual(response.status_code, 201)
def test_post_nested_view_denied_model_perms(self):
parent = LDPDummy.objects.create(some='parent')
data = { "https://cdn.startinblox.com/owl#some": "title" }
response = self.client.post('/ldpdummys/{}/anons/'.format(parent.pk), data=json.dumps(data),
content_type='application/ld+json')
self.assertEqual(response.status_code, 403)
def test_put_request_for_authenticated_user(self):
body = {'title':"job_updated"}
response = self.client.put('/job-offers/{}/'.format(self.job.pk), data=json.dumps(body),
body = {'https://cdn.startinblox.com/owl#title':"job_updated"}
response = self.client.put('/job-offers/{}/'.format(self.job.slug), data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
def test_request_patch_for_authenticated_user(self):
response = self.client.patch('/job-offers/' + str(self.job.pk) + "/",
response = self.client.patch('/job-offers/' + str(self.job.slug) + "/",
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
def test_put_request_denied_model_perms(self):
dummy = PermissionlessDummy.objects.create(some='some', slug='slug')
data = {'https://cdn.startinblox.com/owl#some': 'new'}
response = self.client.put('/permissionless-dummys/{}/'.format(dummy.slug), data=json.dumps(data),
content_type='application/ld+json')
self.assertEqual(response.status_code, 404)
def test_put_nested_view_denied_model_perms(self):
parent = LDPDummy.objects.create(some='parent')
child = PermissionlessDummy.objects.create(some='child', slug='child', parent=parent)
data = {"https://cdn.startinblox.com/owl#some": "new"}
response = self.client.put('/ldpdummys/{}/anons/{}/'.format(parent.pk, child.slug), data=json.dumps(data),
content_type='application/ld+json')
self.assertEqual(response.status_code, 404)
#TODO: check how this could ever work
# def test_patch_nested_container_attach_existing_resource_permission_denied(self):
# '''I am attempting to add a resource which I should not know exists'''
# parent = LDPDummy.objects.create(some='parent')
# dummy = PermissionlessDummy.objects.create(some='some', slug='slug')
# data = {
# 'https://cdn.startinblox.com/owl#anons': [
# {'@id': '{}/permissionless-dummys/{}/'.format(settings.SITE_URL, dummy.slug), 'https://cdn.startinblox.com/owl#slug': dummy.slug}
# ]
# }
# response = self.client.patch('/ldpdummys/{}/'.format(parent.pk), data=json.dumps(data), content_type='application/ld+json')
# self.assertEqual(response.status_code, 404)
# variations on previous tests with an extra level of depth
# TODO
def test_post_nested_container_twice_nested_permission_denied(self):
pass
# TODO
def test_put_nested_container_twice_nested_permission_denied(self):
pass
# TODO: repeat of the above where it is authorized because I have permission through my Group
# https://git.startinblox.com/djangoldp-packages/djangoldp/issues/291
def test_put_request_change_urlid_rejected(self):
self.assertEqual(JobOffer.objects.count(), 1)
body = {'@id': "ishouldnotbeabletochangethis"}
response = self.client.put('/job-offers/{}/'.format(self.job.slug), data=json.dumps(body),
content_type='application/ld+json')
# TODO: this is failing quietly
# https://git.happy-dev.fr/startinblox/solid-spec/issues/14
self.assertEqual(response.status_code, 200)
self.assertEqual(JobOffer.objects.count(), 1)
self.assertFalse(JobOffer.objects.filter(urlid=body['@id']).exists())
def test_put_request_change_pk_rejected(self):
self.assertEqual(JobOffer.objects.count(), 1)
body = {'https://cdn.startinblox.com/owl#pk': 2}
response = self.client.put('/job-offers/{}/'.format(self.job.slug), data=json.dumps(body),
content_type='application/ld+json')
# TODO: this is failing quietly
# https://git.happy-dev.fr/startinblox/solid-spec/issues/14
self.assertEqual(response.status_code, 200)
self.assertEqual(JobOffer.objects.count(), 1)
self.assertFalse(JobOffer.objects.filter(pk=body['https://cdn.startinblox.com/owl#pk']).exists())
# tests that I receive a list of objects for which I am owner, filtering those for which I am not
def test_list_owned_resources(self):
my_resource = OwnedResource.objects.create(description='test', user=self.user)
another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test')
their_resource = OwnedResource.objects.create(description='another test', user=another_user)
response = self.client.get('/ownedresources/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data['ldp:contains']), 1)
self.assertEqual(response.data['ldp:contains'][0]['@id'], my_resource.urlid)
# I do not have model permissions as an authenticated user, but I am the resources' owner
def test_get_owned_resource(self):
my_resource = OwnedResource.objects.create(description='test', user=self.user)
another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test')
their_resource = OwnedResource.objects.create(description='another test', user=another_user)
response = self.client.get('/ownedresources/{}/'.format(my_resource.pk))
self.assertEqual(response.status_code, 200)
self.assertEqual(response.data['@id'], my_resource.urlid)
self.assertIn('delete', response.data['permissions'])
# I have permission to view this resource
response = self.client.patch('/ownedresources/{}/'.format(their_resource.pk))
self.assertEqual(response.status_code, 404)
def test_patch_owned_resource(self):
my_profile = UserProfile.objects.create(user=self.user, slug=self.user.username, description='about me')
another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test')
their_profile = UserProfile.objects.create(user=another_user, slug=another_user.username, description='about')
response = self.client.patch('/userprofiles/{}/'.format(my_profile.slug))
self.assertEqual(response.status_code, 200)
response = self.client.patch('/userprofiles/{}/'.format(their_profile.slug))
self.assertEqual(response.status_code, 403)
def test_delete_owned_resource(self):
my_resource = OwnedResource.objects.create(description='test', user=self.user)
another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test')
their_resource = OwnedResource.objects.create(description='another test', user=another_user)
response = self.client.delete('/ownedresources/{}/'.format(my_resource.pk))
self.assertEqual(response.status_code, 204)
response = self.client.delete('/ownedresources/{}/'.format(their_resource.pk))
self.assertEqual(response.status_code, 404)
# test superuser permissions (configured on model)
def test_list_superuser_perms(self):
another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test')
their_resource = OwnedResource.objects.create(description='another test', user=another_user)
response = self.client.get('/ownedresources/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data['ldp:contains']), 0)
# now I'm superuser, I have the permissions
self._make_self_superuser()
response = self.client.get('/ownedresources/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data['ldp:contains']), 1)
def test_get_superuser_perms(self):
another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test')
their_resource = OwnedResource.objects.create(description='another test', user=another_user)
response = self.client.patch('/ownedresources/{}/'.format(their_resource.pk))
self.assertEqual(response.status_code, 404)
self._make_self_superuser()
response = self.client.patch('/ownedresources/{}/'.format(their_resource.pk))
self.assertEqual(response.status_code, 200)
def test_put_superuser_perms(self):
another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test')
their_profile = UserProfile.objects.create(user=another_user, slug=another_user.username, description='about')
response = self.client.patch('/userprofiles/{}/'.format(their_profile.slug))
# TODO: https://git.startinblox.com/djangoldp-packages/djangoldp/issues/336
self.assertEqual(response.status_code, 403)
self._make_self_superuser()
response = self.client.patch('/userprofiles/{}/'.format(their_profile.slug))
self.assertEqual(response.status_code, 200)
def test_delete_superuser_perms(self):
another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test')
their_resource = OwnedResource.objects.create(description='another test', user=another_user)
response = self.client.delete('/ownedresources/{}/'.format(their_resource.pk))
self.assertEqual(response.status_code, 404)
self._make_self_superuser()
response = self.client.delete('/ownedresources/{}/'.format(their_resource.pk))
self.assertEqual(response.status_code, 204)
# I have model (or object?) permissions. Attempt to make myself owner and thus upgrade my permissions
# TODO: https://git.startinblox.com/djangoldp-packages/djangoldp/issues/356/
'''
def test_hack_model_perms_privilege_escalation(self):
another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test')
resource = OwnedResourceVariant.objects.create(description='another test', user=another_user)
# authenticated has 'change' permission but only owner's have 'control' permission, meaning that I should
# not be able to change my privilege level
body = {
'https://cdn.startinblox.com/owl#user': {'@id': self.user.urlid}
}
response = self.client.put('/ownedresourcevariants/{}/'.format(resource.pk), data=json.dumps(body),
content_type='application/ld+json')
self.assertEqual(response.status_code, 200)
resource = OwnedResourceVariant.objects.get(pk=resource.pk)
self.assertNotEqual(resource.user, self.user)
'''
class TestOwnerFieldUserPermissions(UserPermissionsTestCase):
restore_meta = None
def setUpTempOwnerFieldForModel(self, model, new_owner_field):
# store the old meta information for tearDown to cleanup after the test
if self.restore_meta is None:
self.restore_meta = []
self.restore_meta.append({
"model": model,
"owner_field": model._meta.owner_field
})
# replace the owner_field attribute for the test to run
model._meta.owner_field = new_owner_field
def tearDown(self):
# restore any previously changed owner_field attributes in the test
if self.restore_meta is not None:
for idx, model in enumerate(self.restore_meta):
model = self.restore_meta[idx]["model"]
model._meta.owner_field = self.restore_meta[idx]["owner_field"]
self.restore_meta = None
def test_list_owned_resources_nested(self):
my_resource = OwnedResource.objects.create(description='test', user=self.user)
my_second_resource = OwnedResource.objects.create(description='test', user=self.user)
another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test')
their_resource = OwnedResource.objects.create(description='another test', user=another_user)
my_nested = OwnedResourceNestedOwnership.objects.create(description="test", parent=my_resource)
my_second_nested = OwnedResourceNestedOwnership.objects.create(description="test", parent=my_second_resource)
their_nested = OwnedResourceNestedOwnership.objects.create(description="test", parent=their_resource)
response = self.client.get('/ownedresourcenestedownerships/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data['ldp:contains']), 2)
ids = [r['@id'] for r in response.data['ldp:contains']]
self.assertIn(my_nested.urlid, ids)
self.assertIn(my_second_nested.urlid, ids)
self.assertNotIn(their_nested.urlid, ids)
def test_list_owned_resources_nested_variation_urlid(self):
owner_field = OwnedResourceNestedOwnership._meta.owner_field
OwnedResourceNestedOwnership._meta.owner_field = None
OwnedResourceNestedOwnership._meta.owner_urlid_field = owner_field + "__urlid"
self.test_list_owned_resources_nested()
OwnedResourceNestedOwnership._meta.owner_urlid_field = None
OwnedResourceNestedOwnership._meta.owner_field = owner_field
def test_list_owned_resources_nested_variation_twice_nested(self):
my_resource = OwnedResource.objects.create(description='test', user=self.user)
my_second_resource = OwnedResource.objects.create(description='test', user=self.user)
another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test')
their_resource = OwnedResource.objects.create(description='another test', user=another_user)
my_nested = OwnedResourceNestedOwnership.objects.create(description="test", parent=my_resource)
my_second_nested = OwnedResourceNestedOwnership.objects.create(description="test", parent=my_second_resource)
their_nested = OwnedResourceNestedOwnership.objects.create(description="test", parent=their_resource)
my_twice_nested = OwnedResourceTwiceNestedOwnership.objects.create(description="test", parent=my_nested)
their_twice_nested = OwnedResourceTwiceNestedOwnership.objects.create(description="test", parent=their_nested)
response = self.client.get('/ownedresourcetwicenestedownerships/')
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data['ldp:contains']), 1)
ids = [r['@id'] for r in response.data['ldp:contains']]
self.assertIn(my_twice_nested.urlid, ids)
self.assertNotIn(their_twice_nested.urlid, ids)
def test_list_owned_resources_nested_does_not_exist(self):
self.setUpTempOwnerFieldForModel(OwnedResourceNestedOwnership, "parent__doesnotexist")
my_resource = OwnedResource.objects.create(description='test', user=self.user)
my_second_resource = OwnedResource.objects.create(description='test', user=self.user)
another_user = get_user_model().objects.create_user(username='test', email='test@test.com', password='test')
their_resource = OwnedResource.objects.create(description='another test', user=another_user)
my_nested = OwnedResourceNestedOwnership.objects.create(description="test", parent=my_resource)
my_second_nested = OwnedResourceNestedOwnership.objects.create(description="test", parent=my_second_resource)
their_nested = OwnedResourceNestedOwnership.objects.create(description="test", parent=their_resource)
self.assertRaises(ValueError, self.client.get, '/ownedresourcenestedownerships/')
from importlib import import_module
from django.conf import settings
from django.conf.urls import url, include
from django.contrib.auth.models import Group
from django.urls import path, re_path, include
from djangoldp.models import LDPSource, Model
from djangoldp.views import LDPSourceViewSet
from djangoldp.permissions import LDPPermissions
from djangoldp.permissions import ReadOnly
from djangoldp.views import LDPSourceViewSet, WebFingerView, InboxView
from djangoldp.views import LDPViewSet, serve_static_content
def __clean_path(path):
'''ensures path is Django-friendly'''
if path.startswith("/"):
path = path[1:]
if not path.endswith("/"):
......@@ -16,31 +19,66 @@ def __clean_path(path):
return path
def get_all_non_abstract_subclasses(cls):
'''
returns a set of all subclasses for a given Python class (recursively calls cls.__subclasses__()). Ignores Abstract
classes
'''
def valid_subclass(sc):
'''returns True if the parameterised subclass is valid and should be returned'''
return not getattr(sc._meta, 'abstract', False)
return set(c for c in cls.__subclasses__() if valid_subclass(c)).union(
[subclass for c in cls.__subclasses__() for subclass in get_all_non_abstract_subclasses(c) if valid_subclass(subclass)])
urlpatterns = [
url(r'^sources/', LDPSourceViewSet.urls(model=LDPSource)),
path('groups/', LDPViewSet.urls(model=Group)),
re_path(r'^sources/(?P<federation>\w+)/', LDPSourceViewSet.urls(model=LDPSource, fields=['federation', 'urlid'],
permission_classes=[ReadOnly], )),
re_path(r'^\.well-known/webfinger/?$', WebFingerView.as_view()),
path('inbox/', InboxView.as_view()),
re_path(r'^ssr/(?P<path>.*)$', serve_static_content, name='serve_static_content'),
]
if settings.ENABLE_SWAGGER_DOCUMENTATION:
from drf_spectacular.views import SpectacularAPIView, SpectacularSwaggerView
urlpatterns.extend([
path("schema/", SpectacularAPIView.as_view(), name="schema"),
path(
"docs/",
SpectacularSwaggerView.as_view(
template_name="swagger-ui.html", url_name="schema"
),
name="swagger-ui",
)
])
for package in settings.DJANGOLDP_PACKAGES:
try:
import_module('{}.models'.format(package))
except ModuleNotFoundError:
pass
model_classes = {cls.__name__: cls for cls in Model.__subclasses__()}
for class_name in model_classes:
model_class = model_classes[class_name]
path = __clean_path(model_class.get_container_path())
urls_fct = model_class.get_view_set().urls
urlpatterns.append(url(r'^' + path, include(
urls_fct(model=model_class,
lookup_field=Model.get_meta(model_class, 'lookup_field', 'pk'),
permission_classes=Model.get_meta(model_class, 'permission_classes', [LDPPermissions]),
fields=Model.get_meta(model_class, 'serializer_fields', []),
nested_fields=Model.get_meta(model_class, 'nested_fields', [])))))
for package in settings.DJANGOLDP_PACKAGES:
try:
urlpatterns.append(url(r'^', include('{}.djangoldp_urls'.format(package))))
urlpatterns.append(path('', include('{}.djangoldp_urls'.format(package))))
except ModuleNotFoundError:
pass
# append urls for all DjangoLDP Model subclasses
for model in get_all_non_abstract_subclasses(Model):
# the path is the url for this model
model_path = __clean_path(model.get_container_path())
# urls_fct will be a method which generates urls for a ViewSet (defined in LDPViewSetGenerator)
urls_fct = getattr(model, 'view_set', LDPViewSet).urls
disable_url = getattr(model._meta, 'disable_url', False)
if not disable_url:
urlpatterns.append(path('' + model_path,
urls_fct(model=model,
lookup_field=getattr(model._meta, 'lookup_field', 'pk'),
permission_classes=getattr(model._meta, 'permission_classes', []),
fields=getattr(model._meta, 'serializer_fields', []),
nested_fields=getattr(model._meta, 'nested_fields', [])
)))
# NOTE: this route will be ignored if a custom (subclass of Model) user model is used, or it is registered by a package
# Django matches the first url it finds for a given path
urlpatterns.append(re_path('users/', LDPViewSet.urls(model=settings.AUTH_USER_MODEL, permission_classes=[])))
\ No newline at end of file
from django.conf import settings
from guardian.utils import get_anonymous_user
PASSTHROUGH_IPS = getattr(settings, 'PASSTHROUGH_IPS', '')
# convenience function returns True if user is anonymous
def is_anonymous_user(user):
anonymous_username = getattr(settings, 'ANONYMOUS_USER_NAME', None)
return user.is_anonymous or user.username == anonymous_username
# convenience function returns True if user is authenticated
def is_authenticated_user(user):
anonymous_username = getattr(settings, 'ANONYMOUS_USER_NAME', None)
return user.is_authenticated and user.username != anonymous_username
# this method is used to check if a given IP is part of the PASSTHROUGH_IPS list
def check_client_ip(request):
x_forwarded_for = request.headers.get('x-forwarded-for')
if x_forwarded_for:
if any(ip in x_forwarded_for.replace(' ', '').split(',') for ip in PASSTHROUGH_IPS):
return True
elif request.META.get('REMOTE_ADDR') in PASSTHROUGH_IPS:
return True
return False
import sys
from importlib import reload
import json
import logging
import os
import time
import validators
from django.apps import apps
from django.conf import settings
from django.conf.urls import url, include
from django.contrib.auth import get_user_model
from django.core.exceptions import FieldDoesNotExist
from django.core.urlresolvers import get_resolver
from django.db.models.signals import post_save, post_delete
from django.db.utils import OperationalError, ProgrammingError
from django.dispatch import receiver
from django.contrib.auth.models import AnonymousUser
from django.core.exceptions import FieldDoesNotExist, ObjectDoesNotExist
from django.db import IntegrityError, transaction
from django.http import Http404, HttpResponseNotFound, JsonResponse
from django.shortcuts import get_object_or_404
from django.urls import clear_url_caches
from django.urls import include, path, re_path
from django.urls.resolvers import get_resolver
from django.utils.decorators import classonlymethod
from django.views import View
from pyld import jsonld
from rest_framework import status
from rest_framework.authentication import SessionAuthentication
from rest_framework.exceptions import ParseError
from rest_framework.parsers import JSONParser
from rest_framework.permissions import AllowAny
from rest_framework.renderers import JSONRenderer
from rest_framework.response import Response
from rest_framework.utils import model_meta
from rest_framework.views import APIView
from rest_framework.viewsets import ModelViewSet
from djangoldp.models import LDPSource, Model
from djangoldp.permissions import LDPPermissions
from djangoldp.activities import (ACTIVITY_SAVING_SETTING, ActivityPubService,
ActivityQueueService, as_activitystream)
from djangoldp.activities.errors import (ActivityStreamDecodeError,
ActivityStreamValidationError)
from djangoldp.endpoints.webfinger import WebFingerEndpoint, WebFingerError
from djangoldp.filters import (LocalObjectOnContainerPathBackend,
SearchByQueryParamFilterBackend)
from djangoldp.models import DynamicNestedField, Follower, LDPSource, Model
from djangoldp.related import get_prefetch_fields
from djangoldp.utils import is_authenticated_user
logger = logging.getLogger('djangoldp')
get_user_model()._meta.rdf_context = {"get_full_name": "rdfs:label"}
# renders into JSONLD format by applying context to the data
# https://github.com/digitalbazaar/pyld
class JSONLDRenderer(JSONRenderer):
media_type = 'application/ld+json'
def render(self, data, accepted_media_type=None, renderer_context=None):
if data is not None:
data["@context"] = settings.LDP_RDF_CONTEXT
if isinstance(data, dict):
context = data.get("@context")
if isinstance(context, list):
data["@context"] = [settings.LDP_RDF_CONTEXT] + context
elif isinstance(context, str) or isinstance(context, dict):
data["@context"] = [settings.LDP_RDF_CONTEXT, context]
else:
data["@context"] = settings.LDP_RDF_CONTEXT
return super(JSONLDRenderer, self).render(data, accepted_media_type, renderer_context)
# https://github.com/digitalbazaar/pyld
class JSONLDParser(JSONParser):
#TODO: It current only works with pyld 1.0. We need to check our support of JSON-LD
media_type = 'application/ld+json'
def parse(self, stream, media_type=None, parser_context=None):
data = super(JSONLDParser, self).parse(stream, media_type, parser_context)
return jsonld.compact(data, ctx=settings.LDP_RDF_CONTEXT)
# compact applies the context to the data and makes it a format which is easier to work with
# see: http://json-ld.org/spec/latest/json-ld/#compacted-document-form
try:
return jsonld.compact(data, ctx=settings.LDP_RDF_CONTEXT)
except jsonld.JsonLdError as e:
raise ParseError(str(e.cause))
# an authentication class which exempts CSRF authentication
class NoCSRFAuthentication(SessionAuthentication):
def enforce_csrf(self, request):
return
class InboxView(APIView):
"""
Receive linked data notifications
"""
permission_classes = [AllowAny, ]
renderer_classes = [JSONLDRenderer]
def post(self, request, *args, **kwargs):
'''
receiver for inbox messages. See https://www.w3.org/TR/ldn/
'''
try:
activity = json.loads(request.body, object_hook=as_activitystream)
activity.validate()
except ActivityStreamDecodeError:
return Response('Activity type unsupported', status=status.HTTP_405_METHOD_NOT_ALLOWED)
except ActivityStreamValidationError as e:
return Response(str(e), status=status.HTTP_400_BAD_REQUEST)
try:
self._handle_activity(activity, **kwargs)
except IntegrityError:
return Response({'Unable to save due to an IntegrityError in the receiver model'},
status=status.HTTP_200_OK)
except ValueError as e:
return Response(str(e), status=status.HTTP_400_BAD_REQUEST)
# save the activity and return 201
if ACTIVITY_SAVING_SETTING == 'VERBOSE':
obj = ActivityQueueService._save_sent_activity(activity.to_json(), local_id=request.path_info, success=True,
type=activity.type)
response = Response({}, status=status.HTTP_201_CREATED)
response['Location'] = obj.urlid
else:
response = Response({}, status=status.HTTP_200_OK)
return response
def _handle_activity(self, activity, **kwargs):
if activity.type == 'Add':
self.handle_add_activity(activity, **kwargs)
elif activity.type == 'Remove':
self.handle_remove_activity(activity, **kwargs)
elif activity.type == 'Delete':
self.handle_delete_activity(activity, **kwargs)
elif activity.type == 'Create' or activity.type == 'Update':
self.handle_create_or_update_activity(activity, **kwargs)
elif activity.type == 'Follow':
self.handle_follow_activity(activity, **kwargs)
def atomic_get_or_create_nested_backlinks(self, obj, object_model=None, update=False):
'''
a version of get_or_create_nested_backlinks in which all nested backlinks are created, or none of them are
'''
try:
with transaction.atomic():
return self._get_or_create_nested_backlinks(obj, object_model, update)
except IntegrityError as e:
logger.error(str(e))
logger.warning(
'received a backlink which you were not able to save because of a constraint on the model field.')
raise e
def _get_or_create_nested_backlinks(self, obj, object_model=None, update=False):
'''
recursively deconstructs a tree of nested objects, using get_or_create on each leaf/branch
:param obj: Dict representation of the object
:param object_model: The Model class of the object. Will be discovered if set to None
:param update: if True will update retrieved objects with new data
:raises Exception: if get_or_create fails on a branch, the creation will be reversed and the Exception re-thrown
'''
# store a list of the object's sub-items
if object_model is None:
object_model = Model.get_subclass_with_rdf_type(obj['@type'])
if object_model is None:
raise Http404('unable to store type ' + obj['@type'] + ', model with this rdf_type not found')
branches = {}
for item in obj.items():
# TODO: parse other data types. Match the key to the field_name
if isinstance(item[1], dict):
item_value = item[1]
item_model = Model.get_subclass_with_rdf_type(item_value['@type'])
if item_model is None:
raise Http404(
'unable to store type ' + item_value['@type'] + ', model with this rdf_type not found')
# push nested object tuple as a branch
backlink = self._get_or_create_nested_backlinks(item_value, item_model)
branches[item[0]] = backlink
# get or create the backlink
try:
if obj['@id'] is None or not validators.url(obj['@id']):
raise ValueError('received invalid urlid ' + str(obj['@id']))
external = Model.get_or_create_external(object_model, obj['@id'], update=update, **branches)
# creating followers, to inform distant resource of changes to local connection
if Model.is_external(external):
# this is handled with Followers, where each local child of the branch is followed by its external parent
for item in obj.items():
urlid = item[1]
if isinstance(item[1], dict):
urlid = urlid['@id']
if not isinstance(urlid, str) or not validators.url(urlid):
continue
if not Model.is_external(urlid):
ActivityPubService.save_follower_for_target(external.urlid, urlid)
return external
# this will be raised when the object was local, but it didn't exist
except ObjectDoesNotExist:
raise Http404(getattr(object_model._meta, 'label', 'Unknown Model') + ' ' + str(obj['@id']) + ' does not exist')
# TODO: a fallback here? Saving the backlink as Object or similar
def _get_subclass_with_rdf_type_or_404(self, rdf_type):
model = Model.get_subclass_with_rdf_type(rdf_type)
if model is None:
raise Http404('unable to store type ' + rdf_type + ', model not found')
return model
def handle_add_activity(self, activity, **kwargs):
'''
handles Add Activities. See https://www.w3.org/ns/activitystreams
Indicates that the actor has added the object to the target
'''
object_model = self._get_subclass_with_rdf_type_or_404(activity.object['@type'])
target_model = self._get_subclass_with_rdf_type_or_404(activity.target['@type'])
try:
target = target_model.objects.get(urlid=activity.target['@id'])
except target_model.DoesNotExist:
return Response({}, status=status.HTTP_404_NOT_FOUND)
# store backlink(s) in database
backlink = self.atomic_get_or_create_nested_backlinks(activity.object, object_model)
# add object to target
target_info = model_meta.get_field_info(target_model)
for field_name, relation_info in target_info.relations.items():
if relation_info.related_model == object_model:
attr = getattr(target, field_name)
if not attr.filter(urlid=backlink.urlid).exists():
attr.add(backlink)
ActivityPubService.save_follower_for_target(backlink.urlid, target.urlid)
def handle_remove_activity(self, activity, **kwargs):
'''
handles Remove Activities. See https://www.w3.org/ns/activitystreams
Indicates that the actor has removed the object from the origin
'''
# TODO: Remove Activity may pass target instead
object_model = self._get_subclass_with_rdf_type_or_404(activity.object['@type'])
origin_model = self._get_subclass_with_rdf_type_or_404(activity.origin['@type'])
# get the model reference to saved object
try:
origin = origin_model.objects.get(urlid=activity.origin['@id'])
object_instance = object_model.objects.get(urlid=activity.object['@id'])
except origin_model.DoesNotExist:
raise Http404(activity.origin['@id'] + ' did not exist')
except object_model.DoesNotExist:
return
# remove object from origin
origin_info = model_meta.get_field_info(origin_model)
for field_name, relation_info in origin_info.relations.items():
if relation_info.related_model == object_model:
attr = getattr(origin, field_name)
if attr.filter(urlid=object_instance.urlid).exists():
attr.remove(object_instance)
ActivityPubService.remove_followers_for_resource(origin.urlid, object_instance.urlid)
def handle_create_or_update_activity(self, activity, **kwargs):
'''
handles Create & Update Activities. See https://www.w3.org/ns/activitystreams
'''
object_model = self._get_subclass_with_rdf_type_or_404(activity.object['@type'])
self.atomic_get_or_create_nested_backlinks(activity.object, object_model, update=True)
def handle_delete_activity(self, activity, **kwargs):
'''
handles Remove Activities. See https://www.w3.org/ns/activitystreams
Indicates that the actor has deleted the object
'''
object_model = self._get_subclass_with_rdf_type_or_404(activity.object['@type'])
# get the model reference to saved object
try:
object_instance = object_model.objects.get(urlid=activity.object['@id'])
except object_model.DoesNotExist:
return
# disable backlinks first - prevents a duplicate being sent back
object_instance.allow_create_backlink = False
object_instance.save()
object_instance.delete()
urlid = getattr(object_instance, 'urlid', None)
if urlid is not None:
for follower in Follower.objects.filter(follower=urlid):
follower.delete()
def handle_follow_activity(self, activity, **kwargs):
'''
handles Follow Activities. See https://www.w3.org/ns/activitystreams
Indicates that the actor is following the object, and should receive Updates on what happens to it
'''
object_model = self._get_subclass_with_rdf_type_or_404(activity.object['@type'])
# get the model reference to saved object
try:
object_instance = object_model.objects.get(urlid=activity.object['@id'])
except object_model.DoesNotExist:
raise Http404(activity.object['@id'] + ' did not exist')
if Model.is_external(object_instance):
raise Http404(activity.object['@id'] + ' is not local to this server')
# get the inbox field from the actor
if isinstance(activity.actor, str):
inbox = activity.actor
else:
inbox = getattr(activity.actor, 'inbox', None)
if inbox is None:
inbox = getattr(activity.actor, 'id', getattr(activity.actor, '@id'))
if not Follower.objects.filter(object=object_instance.urlid, inbox=inbox).exists():
Follower.objects.create(object=object_instance.urlid, inbox=inbox)
class LDPViewSetGenerator(ModelViewSet):
"""An extension of ModelViewSet that generates automatically URLs for the model"""
model = None
......@@ -55,6 +324,10 @@ class LDPViewSetGenerator(ModelViewSet):
list_actions = {'get': 'list', 'post': 'create'}
detail_actions = {'get': 'retrieve', 'put': 'update', 'patch': 'partial_update', 'delete': 'destroy'}
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.lookup_field = LDPViewSetGenerator.get_lookup_arg(**kwargs)
@classonlymethod
def get_model(cls, **kwargs):
'''gets the model in the arguments or in the viewset definition'''
......@@ -65,7 +338,8 @@ class LDPViewSetGenerator(ModelViewSet):
@classonlymethod
def get_lookup_arg(cls, **kwargs):
return kwargs.get('lookup_url_kwarg') or cls.lookup_url_kwarg or kwargs.get('lookup_field') or cls.lookup_field
return kwargs.get('lookup_url_kwarg') or cls.lookup_url_kwarg or kwargs.get('lookup_field') or \
getattr(kwargs['model']._meta, 'lookup_field', 'pk') or cls.lookup_field
@classonlymethod
def get_detail_expr(cls, lookup_field=None, **kwargs):
......@@ -74,26 +348,67 @@ class LDPViewSetGenerator(ModelViewSet):
lookup_group = r'\d' if lookup_field == 'pk' else r'[\w\-\.]'
return r'(?P<{}>{}+)/'.format(lookup_field, lookup_group)
@classonlymethod
def build_nested_view_set(cls, view_set=None):
'''returns the the view_set parameter mixed into the LDPNestedViewSet class'''
if view_set is not None:
class LDPNestedCustomViewSet(LDPNestedViewSet, view_set):
pass
return LDPNestedCustomViewSet
return LDPNestedViewSet
@classonlymethod
def urls(cls, **kwargs):
'''constructs urls list for model passed in kwargs'''
kwargs['model'] = cls.get_model(**kwargs)
model_name = kwargs['model']._meta.object_name.lower()
if kwargs.get('model_prefix'):
model_name = '{}-{}'.format(kwargs['model_prefix'], model_name)
detail_expr = cls.get_detail_expr(**kwargs)
# Gets permissions on the model if not explicitely passed to the view
if not 'permission_classes' in kwargs and hasattr(kwargs['model']._meta, 'permission_classes'):
kwargs['permission_classes'] = kwargs['model']._meta.permission_classes
urls = [
url('^$', cls.as_view(cls.list_actions, **kwargs), name='{}-list'.format(model_name)),
url('^' + detail_expr + '$', cls.as_view(cls.detail_actions, **kwargs),
name='{}-detail'.format(model_name)),
path('', cls.as_view(cls.list_actions, **kwargs), name='{}-list'.format(model_name)),
re_path('^' + detail_expr + '$', cls.as_view(cls.detail_actions, **kwargs),
name='{}-detail'.format(model_name)),
]
for field in kwargs.get('nested_fields') or cls.nested_fields:
urls.append(url('^' + detail_expr + field + '/', LDPNestedViewSet.nested_urls(field, **kwargs)))
# append nested fields to the urls list
for field_name in kwargs.get('nested_fields') or cls.nested_fields:
try:
nested_field = kwargs['model']._meta.get_field(field_name)
nested_model = nested_field.related_model
field_name_to_parent = nested_field.remote_field.name
except FieldDoesNotExist:
nested_model = getattr(kwargs['model'], field_name).field.model
nested_field = getattr(kwargs['model'], field_name).field.remote_field
field_name_to_parent = getattr(kwargs['model'], field_name).field.name
# urls should be called from _nested_ view set, which may need a custom view set mixed in
view_set = getattr(nested_model._meta, 'view_set', None)
nested_view_set = cls.build_nested_view_set(view_set)
urls.append(re_path('^' + detail_expr + field_name + '/',
nested_view_set.urls(
model=nested_model,
model_prefix=kwargs['model']._meta.object_name.lower(), # prefix with parent name
lookup_field=getattr(nested_model._meta, 'lookup_field', 'pk'),
exclude=(field_name_to_parent,) if nested_field.one_to_many else (),
permission_classes=getattr(nested_model._meta, 'permission_classes', []),
nested_field_name=field_name,
fields=getattr(nested_model._meta, 'serializer_fields', []),
nested_fields=[],
parent_model=kwargs['model'],
parent_lookup_field=cls.get_lookup_arg(**kwargs),
nested_field=nested_field,
field_name_to_parent=field_name_to_parent)))
return include(urls)
# LDPViewSetGenerator is a ModelViewSet (DRF) with methods to automatically generate model urls
class LDPViewSet(LDPViewSetGenerator):
"""An automatically generated viewset that serves models following the Linked Data Platform convention"""
fields = None
......@@ -101,56 +416,94 @@ class LDPViewSet(LDPViewSetGenerator):
renderer_classes = (JSONLDRenderer,)
parser_classes = (JSONLDParser,)
authentication_classes = (NoCSRFAuthentication,)
filter_backends = [SearchByQueryParamFilterBackend, LocalObjectOnContainerPathBackend]
prefetch_fields = None
def __init__(self, **kwargs):
super().__init__(**kwargs)
if self.permission_classes:
for p in self.permission_classes:
if hasattr(p, 'filter_class') and p.filter_class:
self.filter_backends = p.filter_class
self.serializer_class = self.build_read_serializer()
self.write_serializer_class = self.build_write_serializer()
def build_read_serializer(self):
model_name = self.model._meta.object_name.lower()
lookup_field = get_resolver().reverse_dict[model_name + '-detail'][0][0][1][0]
meta_args = {'model': self.model, 'extra_kwargs': {
'@id': {'lookup_field': lookup_field}},
'depth': getattr(self, 'depth', Model.get_meta(self.model, 'depth', 0)),
'extra_fields': self.nested_fields}
return self.build_serializer(meta_args, 'Read')
def build_write_serializer(self):
# attach filter backends based on permissions classes, to reduce the queryset based on these permissions
# https://www.django-rest-framework.org/api-guide/filtering/#generic-filtering
self.filter_backends = type(self).filter_backends + list({perm_class().get_filter_backend(self.model)
for perm_class in self.permission_classes if hasattr(perm_class(), 'get_filter_backend')})
if None in self.filter_backends:
self.filter_backends.remove(None)
def filter_queryset(self, queryset):
if self.request.user.is_superuser:
return queryset
return super().filter_queryset(queryset)
def check_permissions(self, request):
if request.user.is_superuser:
return True
return super().check_permissions(request)
def check_object_permissions(self, request, obj):
if request.user.is_superuser:
return True
return super().check_object_permissions(request, obj)
def get_depth(self) -> int:
if getattr(self, 'force_depth', None):
#TODO: this exception on depth for writing should be handled by the serializer itself
return self.force_depth
if hasattr(self, 'request') and 'HTTP_DEPTH' in self.request.META:
return int(self.request.META['HTTP_DEPTH'])
if hasattr(self, 'depth'):
return self.depth
return getattr(self.model._meta, 'depth', 0)
def get_serializer_class(self):
model_name = self.model._meta.object_name.lower()
lookup_field = get_resolver().reverse_dict[model_name + '-detail'][0][0][1][0]
try:
lookup_field = get_resolver().reverse_dict[model_name + '-detail'][0][0][1][0]
except:
lookup_field = 'urlid'
meta_args = {'model': self.model, 'extra_kwargs': {
'@id': {'lookup_field': lookup_field}},
'depth': 10,
'extra_fields': self.nested_fields}
return self.build_serializer(meta_args, 'Write')
'@id': {'lookup_field': lookup_field}},
'depth': self.get_depth(),
'extra_fields': self.nested_fields}
def build_serializer(self, meta_args, name_prefix):
if self.fields:
meta_args['fields'] = self.fields
else:
meta_args['exclude'] = self.exclude or ()
meta_class = type('Meta', (), meta_args)
meta_args['exclude'] = self.exclude or getattr(self.model._meta, 'serializer_fields_exclude', ())
# create the Meta class to associate to LDPSerializer, using meta_args param
from djangoldp.serializers import LDPSerializer
return type(LDPSerializer)(self.model._meta.object_name.lower() + name_prefix + 'Serializer', (LDPSerializer,),
if self.serializer_class is None:
self.serializer_class = LDPSerializer
parent_meta = (self.serializer_class.Meta,) if hasattr(self.serializer_class, 'Meta') else ()
meta_class = type('Meta', parent_meta, meta_args)
return type(self.serializer_class)(self.model._meta.object_name.lower() + 'Serializer',
(self.serializer_class,),
{'Meta': meta_class})
# The chaining of filter through | may lead to duplicates and distinct should only be applied in the end.
def filter_queryset(self, queryset):
return super().filter_queryset(queryset).distinct()
def create(self, request, *args, **kwargs):
serializer = self.get_write_serializer(data=request.data)
self.force_depth = 10
serializer = self.get_serializer(data=request.data)
self.force_depth = None
serializer.is_valid(raise_exception=True)
self.perform_create(serializer)
headers = self.get_success_headers(serializer.data)
return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
response_serializer = self.get_serializer()
data = response_serializer.to_representation(serializer.instance)
headers = self.get_success_headers(data)
return Response(data, status=status.HTTP_201_CREATED, headers=headers)
def update(self, request, *args, **kwargs):
partial = kwargs.pop('partial', False)
instance = self.get_object()
serializer = self.get_write_serializer(instance, data=request.data, partial=partial)
self.force_depth = 10
serializer = self.get_serializer(instance, data=request.data, partial=partial)
self.force_depth = None
serializer.is_valid(raise_exception=True)
self.perform_update(serializer)
......@@ -159,62 +512,37 @@ class LDPViewSet(LDPViewSetGenerator):
# forcibly invalidate the prefetch cache on the instance.
instance._prefetched_objects_cache = {}
return Response(serializer.data)
def get_write_serializer(self, *args, **kwargs):
"""
Return the serializer instance that should be used for validating and
deserializing input, and for serializing output.
"""
serializer_class = self.get_write_serializer_class()
kwargs['context'] = self.get_serializer_context()
return serializer_class(*args, **kwargs)
def get_write_serializer_class(self):
"""
Return the class to use for the serializer.
Defaults to using `self.write_serializer_class`.
You may want to override this if you need to provide different
serializations depending on the incoming request.
(Eg. admins get full serialization, others get basic serialization)
"""
assert self.write_serializer_class is not None, (
"'%s' should either include a `write_serializer_class` attribute, "
"or override the `get_write_serializer_class()` method."
% self.__class__.__name__
)
return self.write_serializer_class
response_serializer = self.get_serializer()
data = response_serializer.to_representation(serializer.instance)
return Response(data)
def perform_create(self, serializer, **kwargs):
if hasattr(self.model._meta, 'auto_author') and isinstance(self.request.user, get_user_model()):
kwargs[self.model._meta.auto_author] = self.request.user
serializer.save(**kwargs)
kwargs[self.model._meta.auto_author] = get_user_model().objects.get(pk=self.request.user.pk)
return serializer.save(**kwargs)
def get_queryset(self, *args, **kwargs):
if self.model:
return self.model.objects.all()
queryset = self.model.objects.all()
else:
return super(LDPView, self).get_queryset(*args, **kwargs)
queryset = super(LDPViewSet, self).get_queryset(*args, **kwargs)
if self.prefetch_fields is None:
self.prefetch_fields = get_prefetch_fields(self.model, self.get_serializer(), self.get_depth())
return queryset.prefetch_related(*self.prefetch_fields)
def dispatch(self, request, *args, **kwargs):
'''overriden dispatch method to append some custom headers'''
response = super(LDPViewSet, self).dispatch(request, *args, **kwargs)
response["Access-Control-Allow-Origin"] = request.META.get('HTTP_ORIGIN')
response["Access-Control-Allow-Methods"] = "GET,POST,PUT,PATCH,DELETE"
response["Access-Control-Allow-Headers"] = "authorization, Content-Type, if-match, accept"
response["Access-Control-Expose-Headers"] = "Location"
response["Access-Control-Allow-Credentials"] = 'true'
response["Accept-Post"] = "application/ld+json"
if response.status_code in [201, 200] and '@id' in response.data:
response["Location"] = response.data['@id']
response["Location"] = str(response.data['@id'])
else:
pass
response["Accept-Post"] = "application/ld+json"
if request.user.is_authenticated():
if is_authenticated_user(request.user):
try:
response['User'] = request.user.webid()
response['User'] = request.user.urlid
except AttributeError:
pass
return response
......@@ -227,75 +555,151 @@ class LDPNestedViewSet(LDPViewSet):
"""
parent_model = None
parent_lookup_field = None
related_field = None
nested_field = None
nested_related_name = None
nested_field_name = None
field_name_to_parent = None
def get_parent(self):
return get_object_or_404(self.parent_model, **{self.parent_lookup_field: self.kwargs[self.parent_lookup_field]})
def perform_create(self, serializer, **kwargs):
kwargs[self.nested_related_name] = self.get_parent()
kwargs[self.field_name_to_parent] = self.get_parent()
super().perform_create(serializer, **kwargs)
def get_queryset(self, *args, **kwargs):
if self.related_field.many_to_many or self.related_field.one_to_many:
return getattr(self.get_parent(), self.nested_field).all()
if self.related_field.many_to_one or self.related_field.one_to_one:
return [getattr(self.get_parent(), self.nested_field)]
related = getattr(self.get_parent(), self.nested_field_name)
if self.nested_field.many_to_many or self.nested_field.one_to_many:
if isinstance(self.nested_field, DynamicNestedField):
return related()
return related.all()
if self.nested_field.one_to_one or self.nested_field.many_to_one:
return type(related).objects.filter(pk=related.pk)
@classonlymethod
def get_related_fields(cls, model):
return {field.get_accessor_name(): field for field in model._meta.fields_map.values()}
@classonlymethod
def nested_urls(cls, nested_field, **kwargs):
try:
related_field = cls.get_model(**kwargs)._meta.get_field(nested_field)
except FieldDoesNotExist:
related_field = cls.get_related_fields(cls.get_model(**kwargs))[nested_field]
if related_field.related_query_name:
nested_related_name = related_field.related_query_name()
class LDPAPIView(APIView):
'''extends rest framework APIView to support Solid standards'''
authentication_classes = (NoCSRFAuthentication,)
def dispatch(self, request, *args, **kwargs):
'''overriden dispatch method to append some custom headers'''
response = super().dispatch(request, *args, **kwargs)
if response.status_code in [201, 200] and isinstance(response.data, dict) and '@id' in response.data:
response["Location"] = str(response.data['@id'])
else:
nested_related_name = related_field.remote_field.name
return cls.urls(
lookup_field=Model.get_meta(related_field.related_model, 'lookup_field', 'pk'),
model=related_field.related_model,
exclude=(nested_related_name,) if related_field.one_to_many else (),
parent_model=cls.get_model(**kwargs),
nested_field=nested_field,
nested_related_name=nested_related_name,
related_field=related_field,
parent_lookup_field=cls.get_lookup_arg(**kwargs),
model_prefix=cls.get_model(**kwargs)._meta.object_name.lower(),
permission_classes=Model.get_permission_classes(related_field.related_model,
kwargs.get('permission_classes', [LDPPermissions])),
lookup_url_kwarg=related_field.related_model._meta.object_name.lower() + '_id')
pass
if is_authenticated_user(request.user):
try:
response['User'] = request.user.urlid
except AttributeError:
pass
return response
class LDPSourceViewSet(LDPViewSet):
model = LDPSource
federation = None
@classonlymethod
def urls(cls, **kwargs):
try:
return include([url(name + '/', super(LDPSourceViewSet, cls).urls(federation=name, **kwargs))
for name in LDPSource.objects.order_by().values_list('federation', flat=True).distinct()])
except (OperationalError, ProgrammingError): # for the case where the table doesn't exist
return include([])
def get_queryset(self, *args, **kwargs):
return super().get_queryset(*args, **kwargs).filter(federation=self.federation)
return super().get_queryset(*args, **kwargs).filter(federation=self.kwargs['federation'])
class WebFingerView(View):
endpoint_class = WebFingerEndpoint
def get(self, request, *args, **kwargs):
return self.on_request(request)
@receiver([post_save, post_delete], sender=LDPSource)
def reload_sources_module(sender, instance, **kwargs):
urlconf = settings.ROOT_URLCONF
clear_url_caches()
def on_request(self, request):
endpoint = self.endpoint_class(request)
try:
endpoint.validate_params()
return JsonResponse(endpoint.response())
except WebFingerError as error:
return JsonResponse(error.create_dict(), status=400)
def post(self, request, *args, **kwargs):
return self.on_request(request)
def serve_static_content(request, path):
if request.method != "GET":
resolver = get_resolver()
match = resolver.resolve("/" + path)
request.user = AnonymousUser()
return match.func(request, *match.args, **match.kwargs)
server_url = getattr(settings, "BASE_URL", "http://localhost")
is_filtered = request.GET.get('search-fields', False)
output_dir = "ssr"
output_dir_filtered = "ssr_filtered"
if not os.path.exists(output_dir):
os.makedirs(output_dir, exist_ok=True)
if not os.path.exists(output_dir_filtered):
os.makedirs(output_dir_filtered, exist_ok=True)
file_path = os.path.join(output_dir if not is_filtered else output_dir_filtered, path[:-1])
if not file_path.endswith(".jsonld"):
file_path += ".jsonld"
if os.path.exists(file_path):
current_time = time.time()
file_mod_time = os.path.getmtime(file_path)
time_difference = current_time - file_mod_time
if time_difference > 24 * 60 * 60:
os.remove(file_path)
if not os.path.exists(file_path):
resolver = get_resolver()
match = resolver.resolve("/" + path)
request.user = AnonymousUser()
response = match.func(request, *match.args, **match.kwargs)
if response.status_code == 200:
directory = os.path.dirname(file_path)
if not os.path.exists(directory):
os.makedirs(directory)
json_content = JSONRenderer().render(response.data)
with open(file_path, "w", encoding="utf-8") as f:
f.write(
json_content.decode("utf-8")
.replace('"@id":"' + server_url, '"@id":"' + server_url + "/ssr")
.replace(
'"@id":"' + server_url + "/ssr/ssr",
'"@id":"' + server_url + "/ssr",
)[:-1]
+ ',"@context": "'
+ getattr(
settings,
"LDP_RDF_CONTEXT",
"https://cdn.startinblox.com/owl/context.jsonld",
)
+ '"}'
)
if os.path.exists(file_path):
with open(file_path, "r", encoding="utf-8") as file:
content = file.read()
try:
json_content = json.loads(content)
return JsonResponse(
json_content,
safe=False,
status=200,
content_type="application/ld+json",
headers={
"Access-Control-Allow-Origin": "*",
"Cache-Control": "public, max-age=3600",
},
)
except json.JSONDecodeError:
pass
if 'djangoldp.urls' in sys.modules:
reload(sys.modules['djangoldp.urls'])
if urlconf in sys.modules:
reload(sys.modules[urlconf])
return HttpResponseNotFound("File not found")
# djangoldp-crypto
Packages like [djangoldp](https://git.startinblox.com/djangoldp-packages/djangoldp) and [django-webidoidc-provider](https://git.startinblox.com/djangoldp-packages/django-webidoidc-provider) have some models and utilities which make use of cryptography. In general, we want to re-use that code in a supporting package to avoid duplication of effort. However, until it is more clear what ca be re-used, we are using this separate django app in this package. See [this ticket](https://git.startinblox.com/djangoldp-packages/djangoldp/issues/236) for more.
## Install
```bash
$ python -m pip install 'djangoldp[crypto]'
```
Tnen add the app to your `settings.yml` like so:
```yaml
INSTALLED_APPS:
- djangoldp_crypto
```
## Management commands
- `creatersakey`: Randomly generate a new RSA key for the DjangoLDP server
## Test
```bash
$ python -m unittest djangoldp_crypto.tests.runner
```
from django.contrib import admin
from djangoldp_crypto.models import RSAKey
@admin.register(RSAKey)
class RSAKeyAdmin(admin.ModelAdmin):
readonly_fields = ['kid', 'pub_key']
def save_model(self, request, obj, form, change):
obj.priv_key.replace('\r', '')
super().save_model(request, obj, form, change)
from Cryptodome.PublicKey import RSA
from django.core.management.base import BaseCommand
from djangoldp_crypto.models import RSAKey
class Command(BaseCommand):
help = 'Randomly generate a new RSA key for the DjangoLDP server'
def handle(self, *args, **options):
try:
key = RSA.generate(2048)
rsakey = RSAKey(priv_key=key.exportKey('PEM').decode('utf8'))
rsakey.save()
self.stdout.write('RSA key successfully created')
self.stdout.write(u'Private key: \n{0}'.format(rsakey.priv_key))
self.stdout.write(u'Public key: \n{0}'.format(rsakey.pub_key))
self.stdout.write(u'Key ID: \n{0}'.format(rsakey.kid))
except Exception as e:
self.stdout.write('Something goes wrong: {0}'.format(e))
# Generated by Django 2.2.19 on 2021-03-15 10:35
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='RSAKey',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('priv_key', models.TextField(help_text='Paste your private RSA Key here.', unique=True, verbose_name='Key')),
],
options={
'verbose_name': 'RSA Key',
'verbose_name_plural': 'RSA Keys',
},
),
]