Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • djangoldp-packages/djangoldp
  • decentral1se/djangoldp
  • femmefaytale/djangoldp
  • jvtrudel/djangoldp
4 results
Show changes
Showing
with 772 additions and 5 deletions
from django.apps import AppConfig
class {{ camel_case_app_name }}Config(AppConfig):
name = '{{ app_name }}'
"""This module is loaded by DjangoLDP core during setup."""
# define an extra variable (should be prefix with package name)
MYPACKAGE_VAR = 'MY_DEFAULT_VAR'
# register an extra middleware
MIDDLEWARE = []
# register an extra installed app
INSTALLED_APPS = []
from django.urls import path
from .models import Dummy
from .views import DummyViewset
urlpatterns = [
path('dummy/', DummyViewset.urls(model_prefix="dummy", model=Dummy))
]
from django.db import models
from djangoldp.models import Model
class Dummy(Model):
name = models.CharField(max_length=255, blank=True)
class Meta(Model.Meta):
rdf_type = 'hd:dummy'
def __str__(self):
return self.name
from django.test import TestCase
# Create your tests here.
from djangoldp.views import LDPViewSet
from djangoldp.models import Model
from .models import Dummy
class DummyViewset(LDPViewSet):
pass
[metadata]
name = {{ app_name }}
version = attr: {{ app_name }}.__version__
description = {{ description }}
long_description = file: README.md
license = MIT
classifiers =
Programming Language :: Python :: 3
[options]
packages = find:
[semantic_release]
version_source = tag
version_variable = {{ app_name }}/__init__.py:__version__
commit_parser = commit_parser.parse
#!/usr/bin/env python
"""Setup script."""
from setuptools import setup
setup()
#!/usr/bin/env python
import os
import sys
from djangoldp.conf import ldpsettings
if __name__ == "__main__":
ldpsettings.configure('settings.yml')
try:
from django.core.management import execute_from_command_line
except ImportError:
# The above import may fail for some other reason. Ensure that the
# issue is really that Django is missing to avoid masking other
# exceptions on Python 2.
try:
import django
except ImportError:
raise ImportError(
"Couldn't import Django. Are you sure it's installed and "
"available on your PYTHONPATH environment variable? Did you "
"forget to activate a virtual environment?"
)
raise
execute_from_command_line(sys.argv)
"""{{ project_name }} URL Configuration
The `urlpatterns` list routes URLs to views. For more information please see:
https://docs.djangoproject.com/en/1.11/topics/http/urls/
Examples:
Function views
1. Add an import: from my_app import views
2. Add a URL to urlpatterns: url(r'^$', views.home, name='home')
Class-based views
1. Add an import: from other_app.views import Home
2. Add a URL to urlpatterns: url(r'^$', Home.as_view(), name='home')
Including another URLconf
1. Import the include() function: from django.conf.urls import url, include
2. Add a URL to urlpatterns: url(r'^blog/', include('blog.urls'))
"""
from django.urls import include, path
from django.contrib import admin
from django.conf import settings
from django.conf.urls.static import static
urlpatterns = [
path('', include('djangoldp.urls')),
path('admin/', admin.site.urls),
]
if settings.DEBUG:
urlpatterns += static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)
"""
WSGI config for SIB project.
It exposes the WSGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/1.11/howto/deployment/wsgi/
"""
import os
from django.core.wsgi import get_wsgi_application
from django.conf import settings as django_settings
from djangoldp.conf import ldpsettings
if not django_settings.configured:
ldpsettings.configure()
application = get_wsgi_application()
try:
from djangoldp.activities.services import ActivityQueueService
ActivityQueueService.start()
except:
pass
dependencies:
ldppackages:
server:
# DjangoLDP required settings
DEBUG: true
ALLOWED_HOSTS:
- '*'
SECRET_KEY: '{{secret_key}}'
DATABASES:
default:
ENGINE: django.db.backends.sqlite3
NAME: db.sqlite3
LDP_RDF_CONTEXT: https://cdn.startinblox.com/owl/context.jsonld
ROOT_URLCONF: server.urls
STATIC_ROOT: static
MEDIA_ROOT: media
from django.conf import settings
from django.db.models import Q
from rest_framework.filters import BaseFilterBackend
from djangoldp.models import Model
from djangoldp.utils import check_client_ip
class OwnerFilterBackend(BaseFilterBackend):
"""Adds the objects owned by the user"""
def filter_queryset(self, request, queryset, view):
if request.user.is_superuser:
return queryset
if request.user.is_anonymous:
return queryset.none()
if getattr(view.model._meta, 'owner_field', None):
return queryset.filter(**{view.model._meta.owner_field: request.user})
if getattr(view.model._meta, 'owner_urlid_field', None):
return queryset.filter(**{view.model._meta.owner_urlid_field: request.user.urlid})
if getattr(view.model._meta, 'auto_author', None):
return queryset.filter(**{view.model._meta.auto_author: request.user})
return queryset
class PublicFilterBackend(BaseFilterBackend):
"""
Public filter applied.
This class can be applied on models which bears a is_public boolean field, to filter objects that are public.
"""
def filter_queryset(self, request, queryset, view):
public_field = queryset.model._meta.public_field
return queryset.filter(**{public_field: True})
class ActiveFilterBackend(BaseFilterBackend):
"""
Filter which removes inactive objects from the queryset, useful for user for instance and configurable using the active_field meta
"""
def filter_queryset(self, request, queryset, view):
if (hasattr(queryset.model._meta, 'active_field')):
is_active_field = queryset.model._meta.active_field
return queryset.filter(**{is_active_field :True})
return queryset
class NoFilterBackend(BaseFilterBackend):
"""
No filter applied.
This class is useful for permission classes that don't filter objects, so that they can be chained with other
"""
def filter_queryset(self, request, queryset, view):
return queryset
class LocalObjectFilterBackend(BaseFilterBackend):
"""
Filter which removes external objects (federated backlinks) from the queryset
For querysets which should only include local objects
"""
def filter_queryset(self, request, queryset, view):
internal_ids = [x.pk for x in queryset if not Model.is_external(x)]
return queryset.filter(pk__in=internal_ids)
return queryset.filter(urlid__startswith=settings.SITE_URL)
class LocalObjectOnContainerPathBackend(LocalObjectFilterBackend):
......@@ -18,6 +61,94 @@ class LocalObjectOnContainerPathBackend(LocalObjectFilterBackend):
is the model container path
"""
def filter_queryset(self, request, queryset, view):
from djangoldp.models import Model
if issubclass(view.model, Model) and request.path_info == view.model.get_container_path():
return super(LocalObjectOnContainerPathBackend, self).filter_queryset(request, queryset, view)
return queryset
class SearchByQueryParamFilterBackend(BaseFilterBackend):
"""
Applies search fields in the request query params to the queryset
"""
def filter_queryset(self, request, queryset, view):
# the model fields on which to perform the search
search_fields = request.GET.get('search-fields', None)
# the terms to search the fields for
search_terms = request.GET.get('search-terms', None)
# the method of search to apply to the model fields
search_method = request.GET.get('search-method', "basic")
# union or intersection
search_policy = request.GET.get('search-policy', 'union')
# check if there is indeed a search requested
if search_fields is None or search_terms is None:
return queryset
def _construct_search_query(search):
'''Utility function pipes many Django Query objects'''
search_query = []
for idx, s in enumerate(search):
if idx > 0:
# the user has indicated to make a union of all query results
if search_policy == "union":
search_query = search_query | Q(**s)
# the user has indicated to make an intersection of all query results
else:
search_query = search_query & Q(**s)
continue
search_query = Q(**s)
return search_query
search_fields = search_fields.split(',')
if search_method == "basic":
search = []
for s in search_fields:
query = {}
query["{}__contains".format(s)] = search_terms
search.append(query)
queryset = queryset.filter(_construct_search_query(search))
elif search_method == "ibasic":
# NOTE: to use, see https://stackoverflow.com/questions/54071944/fielderror-unsupported-lookup-unaccent-for-charfield-or-join-on-the-field-not
unaccent_extension = getattr(settings, 'SEARCH_UNACCENT_EXTENSION', False) and 'django.contrib.postgres' in settings.INSTALLED_APPS
middle_term = '__unaccent' if unaccent_extension else ''
search = []
for s in search_fields:
query = {}
query["{}{}__icontains".format(s, middle_term)] = search_terms
search.append(query)
queryset = queryset.filter(_construct_search_query(search))
elif search_method == "exact":
search = []
for s in search_fields:
query = {}
query["{}__exact".format(s)] = search_terms
search.append(query)
queryset = queryset.filter(_construct_search_query(search))
return queryset
class IPFilterBackend(BaseFilterBackend):
def filter_queryset(self, request, queryset, view):
if check_client_ip(request):
return queryset
else:
return queryset.none()
\ No newline at end of file
'''
DjangoLDP Check Integrity Command & Importer
Usage `./manage.py check_integrity --help`
This command does import every `check_integrity.py` of every DJANGOLDP_PACKAGES
Allowed methods:
`add_arguments(parser)`: Allows to create new arguments to this command
`check_integrity(options)`: Do your own checks on the integrity
Examples on `djangoldp/check_integrity.py`
'''
from django.core.management.base import BaseCommand
from django.conf import settings
from importlib import import_module
import requests
class Command(BaseCommand):
help = "Check the datas integrity"
def add_arguments(self, parser):
import_module('djangoldp.check_integrity').add_arguments(parser)
for package in settings.DJANGOLDP_PACKAGES:
try:
import_module('{}.check_integrity'.format(package)).add_arguments(parser)
except:
pass
def handle(self, *args, **options):
import_module('djangoldp.check_integrity').check_integrity(options)
for package in settings.DJANGOLDP_PACKAGES:
try:
import_module('{}.check_integrity'.format(package)).check_integrity(options)
except:
pass
exit(0)
from django.core import management
from django.conf import settings
from django.db.utils import IntegrityError
from django.core.management.base import BaseCommand, CommandError
from django.core.exceptions import ValidationError
class Command(BaseCommand):
help = 'Initialize the DjangoLDP backend'
def add_arguments(self, parser):
"""Define the same arguments as the ones in CLI."""
parser.add_argument('--with-admin', nargs='?', type=str, help='Create an administrator user.')
parser.add_argument('--email', nargs='?', type=str, help='Provide an email for administrator.')
parser.add_argument('--with-dummy-admin', action='store_true', help='Create a default "admin" user with "admin" password.')
def handle(self, *args, **options):
"""Wrapper command around default django initialization commands."""
try:
# migrate data
management.call_command('migrate', interactive=False)
except CommandError as e:
self.stdout.write(self.style.ERROR(f'Data migration failed: {e}'))
if options['with_dummy_admin']:
try:
# create a default super user
from django.contrib.auth import get_user_model
User = get_user_model()
User.objects.create_superuser('admin', 'admin@example.org', 'admin')
except (ValidationError, IntegrityError):
self.stdout.write('User "admin" already exists. Skipping...')
pass
elif options['with_admin']:
try:
# call default createsuperuser command
management.call_command('createsuperuser', '--noinput', '--username', options['with_admin'], '--email', options['email'])
except CommandError as e:
self.stdout.write(self.style.ERROR(f'Superuser {e}'))
pass
#try:
# # creatersakey
# management.call_command('creatersakey', interactive=False)
#except CommandError as e:
# self.stdout.write(self.style.ERROR(f'RSA key creation failed: {e}'))
import argparse
from django.core.management.base import BaseCommand
from django.conf import settings
from djangoldp.models import LDPSource
def isstring(target):
if isinstance(target, str):
return target
return False
def list_models():
# Improve me using apps.get_models()
return {
"circles": "/circles/",
"circlesjoinable": "/circles/joinable/",
"communities": "/communities/",
"opencommunities": "/open-communities/",
"communitiesaddresses": "/community-addresses/",
"dashboards": "/dashboards/",
"events": "/events/",
"eventsfuture": "/events/future/",
"eventspast": "/events/past/",
"typeevents": "/typeevents/",
"resources": "/resources/",
"keywords": "/keywords/",
"types": "/types/",
"joboffers": "/job-offers/current/",
"polls": "/polls/",
"projects": "/projects/",
"projectsjoinable": "/projects/joinable/",
"skills": "/skills/",
"users": "/users/"
}
class Command(BaseCommand):
help = 'Add another server to this one sources'
def add_arguments(self, parser):
parser.add_argument(
"--target",
action="store",
default=False,
type=isstring,
help="Targeted server, format protocol://domain",
)
parser.add_argument(
"--delete",
default=False,
nargs='?',
const=True,
help="Remove targeted source",
)
def handle(self, *args, **options):
target = options["target"]
models = list_models()
if not target:
target = settings.SITE_URL
error_counter = 0
if(options["delete"]):
for attr, value in models.items():
try:
LDPSource.objects.filter(urlid=target+value, federation=attr).delete()
except:
error_counter += 1
if error_counter > 0:
self.stdout.write(self.style.ERROR("Can't remove: "+target))
exit(2)
else:
self.stdout.write(self.style.SUCCESS("Successfully removed sources for "+target))
exit(0)
else:
for attr, value in models.items():
try:
LDPSource.objects.create(urlid=target+value, federation=attr)
except:
error_counter += 1
if error_counter > 0:
self.stdout.write(self.style.WARNING("Source aleady exists for "+target+"\nIgnored "+str(error_counter)+"/"+str(len(models))))
if error_counter == len(models):
exit(1)
exit(0)
else:
self.stdout.write(self.style.SUCCESS("Successfully created sources for "+target))
exit(0)
import os
import json
import requests
from django.core.management.base import BaseCommand
from django.conf import settings
from django.apps import apps
from urllib.parse import urlparse, urljoin
class StaticContentGenerator:
def __init__(self, stdout, style):
self.stdout = stdout
self.style = style
self.base_uri = getattr(settings, 'BASE_URL', '')
self.max_depth = getattr(settings, 'MAX_RECURSION_DEPTH', 5)
self.request_timeout = getattr(settings, 'SSR_REQUEST_TIMEOUT', 10)
self.regenerated_urls = set()
self.failed_urls = set()
self.output_dir = 'ssr'
self.output_dir_filtered = 'ssr_filtered'
def generate_content(self):
self._create_output_directory()
for model in self._get_static_models():
self._process_model(model)
def _create_output_directory(self):
os.makedirs(self.output_dir, exist_ok=True)
os.makedirs(self.output_dir_filtered, exist_ok=True)
def _get_static_models(self):
return [model for model in apps.get_models() if hasattr(model._meta, 'static_version')]
def _process_model(self, model):
self.stdout.write(f"Generating content for model: {model}")
url = self._build_url(model)
if url not in self.regenerated_urls and url not in self.failed_urls:
self._fetch_and_save_content(model, url, self.output_dir)
else:
self.stdout.write(self.style.WARNING(f'Skipping {url} as it has already been fetched'))
if hasattr(model._meta, 'static_params'):
url = self._build_url(model, True)
if url not in self.regenerated_urls and url not in self.failed_urls:
self._fetch_and_save_content(model, url, self.output_dir_filtered)
else:
self.stdout.write(self.style.WARNING(f'Skipping {url} as it has already been fetched'))
def _build_url(self, model, use_static_params=False):
container_path = model.get_container_path()
url = urljoin(self.base_uri, container_path)
if hasattr(model._meta, 'static_params') and use_static_params:
url += '?' + '&'.join(f'{k}={v}' for k, v in model._meta.static_params.items())
return url
def _fetch_and_save_content(self, model, url, output_dir):
try:
response = requests.get(url, timeout=self.request_timeout)
if response.status_code == 200:
content = self._update_ids_and_fetch_associated(response.text)
self._save_content(model, url, content, output_dir)
else:
self.stdout.write(self.style.ERROR(f'Failed to fetch content from {url}: HTTP {response.status_code}'))
except requests.exceptions.RequestException as e:
self.stdout.write(self.style.ERROR(f'Error fetching content from {url}: {str(e)}'))
def _save_content(self, model, url, content, output_dir):
relative_path = urlparse(url).path.strip('/')
file_path = os.path.join(output_dir, relative_path)
if file_path.endswith('/'):
file_path = file_path[:-1]
if not file_path.endswith('.jsonld'):
file_path += '.jsonld'
os.makedirs(os.path.dirname(file_path), exist_ok=True)
try:
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
self.stdout.write(self.style.SUCCESS(f'Successfully saved content for {model._meta.model_name} from {url} to {file_path}'))
except IOError as e:
self.stdout.write(self.style.ERROR(f'Error saving content for {model._meta.model_name}: {str(e)}'))
def _update_ids_and_fetch_associated(self, content, depth=0):
if depth > self.max_depth:
return content
try:
data = json.loads(content)
self._process_data(data, depth)
return json.dumps(data)
except json.JSONDecodeError as e:
self.stdout.write(self.style.ERROR(f'Failed to decode JSON: {e}'))
return content
def _process_data(self, data, depth):
if isinstance(data, dict):
self._process_item(data, depth)
elif isinstance(data, list):
for item in data:
if isinstance(item, dict):
self._process_item(item, depth)
def _process_item(self, item, depth):
if '@id' in item:
self._update_and_fetch_content(item, depth)
for value in item.values():
if isinstance(value, (dict, list)):
self._process_data(value, depth)
def _update_and_fetch_content(self, item, depth):
original_id = item['@id']
parsed_url = urlparse(original_id)
if not parsed_url.netloc:
original_id = urljoin(self.base_uri, original_id)
parsed_url = urlparse(original_id)
path = parsed_url.path
if path.startswith(urlparse(self.base_uri).path):
path = path[len(urlparse(self.base_uri).path):]
new_id = f'/ssr{path}'
item['@id'] = urljoin(self.base_uri, new_id)
self._fetch_and_save_associated_content(original_id, path, depth)
def _rewrite_ids_before_saving(self, data):
if isinstance(data, dict):
if '@id' in data:
original_id = data['@id']
parsed_url = urlparse(data['@id'])
if not parsed_url.netloc:
content_id = urljoin(self.base_uri, original_id)
parsed_url = urlparse(content_id)
if 'ssr/' not in data['@id']:
path = parsed_url.path
if path.startswith(urlparse(self.base_uri).path):
path = path[len(urlparse(self.base_uri).path):]
new_id = f'/ssr{path}'
data['@id'] = urljoin(self.base_uri, new_id)
for value in data.values():
if isinstance(value, (dict, list)):
self._rewrite_ids_before_saving(value)
elif isinstance(data, list):
for item in data:
self._rewrite_ids_before_saving(item)
return data
def _fetch_and_save_associated_content(self, url, new_path, depth):
if url in self.regenerated_urls:
self.stdout.write(self.style.WARNING(f'Skipping {url} as it has already been fetched'))
return
if url in self.failed_urls:
self.stdout.write(self.style.WARNING(f'Skipping {url} as it has already been tried and failed'))
return
file_path = os.path.join(self.output_dir, new_path.strip('/'))
if file_path.endswith('/'):
file_path = file_path[:-1]
if not file_path.endswith('.jsonld'):
file_path += '.jsonld'
os.makedirs(os.path.dirname(file_path), exist_ok=True)
try:
response = requests.get(url, timeout=self.request_timeout)
if response.status_code == 200:
updated_content = json.loads(self._update_ids_and_fetch_associated(response.text, depth + 1))
updated_content = self._rewrite_ids_before_saving(updated_content)
with open(file_path, 'w', encoding='utf-8') as f:
f.write(json.dumps(updated_content))
self.regenerated_urls.add(url)
self.stdout.write(self.style.SUCCESS(f'Successfully fetched and saved associated content from {url} to {file_path}'))
else:
self.failed_urls.add(url)
self.stdout.write(self.style.ERROR(f'Failed to fetch associated content from {url}: HTTP {response.status_code}'))
except requests.exceptions.RequestException as e:
self.stdout.write(self.style.ERROR(f'Error fetching associated content from {url}: {str(e)}'))
except IOError as e:
self.stdout.write(self.style.ERROR(f'Error saving associated content from {url}: {str(e)}'))
class Command(BaseCommand):
help = 'Generate static content for models having the static_version meta attribute set to 1/true'
def handle(self, *args, **options):
generator = StaticContentGenerator(self.stdout, self.style)
generator.generate_content()
\ No newline at end of file
from django.conf import settings
from django.utils.http import is_safe_url
from django.utils.http import url_has_allowed_host_and_scheme
from django.shortcuts import redirect
from djangoldp.models import Model
class AllowOnlySiteUrl:
......@@ -9,7 +10,26 @@ class AllowOnlySiteUrl:
def __call__(self, request):
response = self.get_response(request)
if(is_safe_url(request.get_raw_uri(), allowed_hosts=settings.SITE_URL) or response.status_code != 200):
if(url_has_allowed_host_and_scheme(request.get_raw_uri(), allowed_hosts=settings.SITE_URL) or response.status_code != 200):
return response
else:
return redirect('{}{}'.format(settings.SITE_URL, request.path), permanent=True)
class AllowRequestedCORSMiddleware:
'''A CORS Middleware which allows the domains requested by the request'''
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
response = self.get_response(request)
response["Access-Control-Allow-Origin"] = request.headers.get('origin')
response["Access-Control-Allow-Methods"] = "GET,POST,PUT,PATCH,DELETE,OPTIONS,HEAD"
response["Access-Control-Allow-Headers"] = \
getattr(settings, 'OIDC_ACCESS_CONTROL_ALLOW_HEADERS',
"authorization, Content-Type, if-match, accept, DPoP, cache-control, prefer")
response["Access-Control-Expose-Headers"] = "Location, User"
response["Access-Control-Allow-Credentials"] = 'true'
return response
\ No newline at end of file
# Generated by Django 2.2 on 2020-09-09 22:06
from django.db import migrations, models
import django.db.models.deletion
import djangoldp.fields
class Migration(migrations.Migration):
dependencies = [
('djangoldp', '0013_auto_20200624_1709'),
]
operations = [
migrations.CreateModel(
name='ScheduledActivity',
fields=[
('activity_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='djangoldp.Activity')),
('failed_attempts', models.PositiveIntegerField(default=0, help_text='a log of how many failed retries have been made sending the activity')),
],
options={
'abstract': False,
'default_permissions': ('add', 'change', 'delete', 'view', 'control'),
},
bases=('djangoldp.activity',),
),
migrations.AlterField(
model_name='activity',
name='created_at',
field=models.DateTimeField(auto_now_add=True),
),
migrations.AlterField(
model_name='activity',
name='local_id',
field=djangoldp.fields.LDPUrlField(help_text='/inbox or /outbox url (local - this server)'),
),
migrations.AlterField(
model_name='activity',
name='payload',
field=models.BinaryField(),
),
migrations.AddField(
model_name='activity',
name='success',
field=models.BooleanField(default=False,
help_text='set to True when an Activity is successfully delivered'),
),
migrations.AddField(
model_name='activity',
name='type',
field=models.CharField(blank=True, help_text='the ActivityStreams type of the Activity', max_length=64,
null=True),
),
migrations.AddField(
model_name='activity',
name='is_finished',
field=models.BooleanField(default=True),
),
migrations.AddField(
model_name='activity',
name='response_code',
field=models.CharField(blank=True, help_text='Response code sent by receiver', max_length=8, null=True),
),
migrations.AddField(
model_name='activity',
name='response_location',
field=djangoldp.fields.LDPUrlField(blank=True, help_text='Location saved activity can be found', null=True),
),
migrations.AddField(
model_name='activity',
name='response_body',
field=models.BinaryField(null=True),
),
migrations.AddField(
model_name='activity',
name='external_id',
field=djangoldp.fields.LDPUrlField(help_text='the /inbox or /outbox url (from the sender or receiver)',
null=True),
),
migrations.RemoveField(
model_name='activity',
name='aid',
),
]