class ActivityQueueService:
    '''
    Manages an asynchronous queue for Activity format messages

    Activities are persisted as ScheduledActivity rows and pushed onto an in-memory queue which is drained by a
    single daemon worker thread. Once an activity has been delivered (or has definitively failed) a finished
    Activity record is saved, the ScheduledActivity is removed and `activity_sending_finished` is dispatched
    '''
    # set to True once the queue and its worker thread have been created
    initialized = False
    # the queue.Queue instance shared by the producers and the worker thread
    queue = None

    @classmethod
    def revive_activities(cls):
        '''re-schedules all ScheduledActivities to the queue'''
        # drop whatever is currently queued so that the database is the single source of truth
        with cls.queue.mutex:
            cls.queue.queue.clear()

        scheduled = ScheduledActivity.objects.all()
        for activity in scheduled:
            if activity.external_id is not None:
                # failed=False: reviving is not a failed delivery attempt
                cls.resend_activity(str(activity.external_id), activity, failed=False)
            else:
                # without a recipient inbox the activity can never be sent
                activity.delete()

    @classmethod
    def start(cls):
        '''
        method checks if there are scheduled activities on the queue and starts them up
        Important: this method should only be called in start-up, when you know there are not queue tasks running
        otherwise duplicate activities may be sent
        '''
        def queue_worker(queue):
            while True:
                # wait for queue item to manifest
                item = queue.get()
                try:
                    time.sleep(DEFAULT_ACTIVITY_DELAY)
                    cls._activity_queue_worker(item[0], item[1])
                except Exception:
                    # a single bad item must not kill the only worker thread, otherwise every later
                    # activity would silently pile up on the queue
                    logger.exception('[Sender] queue worker failed to process ' + repr(item))
                finally:
                    # always balance queue.get(), even when processing failed
                    queue.task_done()

        if cls.initialized:
            return

        # initialise the queue - infinite maxsize. It is created *before* the initialized flag is raised so
        # that a concurrent _push_to_queue never observes initialized=True with queue=None
        cls.queue = Queue(maxsize=0)
        cls.initialized = True

        worker = threading.Thread(target=queue_worker, args=[cls.queue], daemon=True)
        worker.start()

        cls.revive_activities()

    @classmethod
    def do_post(cls, url, activity, auth=None):
        '''
        makes a POST request to url, passing activity
        :param url: the recipient inbox url
        :param activity: a JSON-serializable activity
        :param auth: accepted for interface compatibility, currently not used by the request
        :returns: response from server, or a placeholder dict when settings.DISABLE_OUTBOX == 'DEBUG'
        :raises: Timeout or ConnectionError if the post could not be made
        '''
        headers = {'Content-Type': 'application/ld+json'}
        logger.debug('[Sender] sending Activity... ' + str(activity))

        if getattr(settings, 'DISABLE_OUTBOX', False) == 'DEBUG':
            # debug mode: pretend the activity was sent without touching the network
            return {'data': {}}
        return requests.post(url, data=json.dumps(activity), headers=headers, timeout=10)

    @classmethod
    def _save_activity_from_response(cls, response, url, scheduled_activity):
        '''
        wrapper to save a finished Activity based on the parameterised response
        :param response: the object returned by do_post (a requests response, or the debug placeholder)
        :param url: the inbox the activity was sent to
        :param scheduled_activity: the ScheduledActivity which was sent
        :return: saved Activity object
        '''
        response_body = None
        if hasattr(response, 'text'):
            response_body = response.text

        # the Location of the created resource is an HTTP *header*, not an attribute of the response
        headers = getattr(response, 'headers', None)
        if headers is not None:
            response_location = headers.get('Location', None)
        else:
            # kept for response-like objects which expose Location directly
            response_location = getattr(response, "Location", None)

        status_code = getattr(response, "status_code", None)
        success = status_code is not None and str(status_code).startswith("2")
        # store NULL rather than the literal string 'None' when there was no status code
        response_code = str(status_code) if status_code is not None else None

        return cls._save_sent_activity(scheduled_activity.to_activitystream(), ActivityModel, success=success,
                                       external_id=url, type=scheduled_activity.type,
                                       response_location=response_location, response_code=response_code,
                                       response_body=response_body)

    @classmethod
    def _attempt_failed_reschedule(cls, url, scheduled_activity, backoff_factor):
        '''
        either re-schedules a failed activity or saves its failure state, depending on the number of fails and the
        fail policy (MAX_ACTIVITY_RESCHEDULES)
        :param backoff_factor: multiplier for the exponential delay applied before the activity is re-queued
        :return: True if it was able to reschedule
        '''
        if scheduled_activity.failed_attempts < MAX_ACTIVITY_RESCHEDULES:
            backoff = backoff_factor * (2 ** (scheduled_activity.failed_attempts - 1))
            # the backoff is a delay, it must not be passed as the `failed` flag: a backoff of 0 would
            # otherwise stop failed_attempts from increasing and the activity would be retried forever
            cls.resend_activity(url, scheduled_activity, failed=True, delay=backoff)
            return True

        # no retries left, save the failure state
        logger.error('Failed to deliver backlink to ' + str(url) + ' after retrying ' +
                     str(MAX_ACTIVITY_RESCHEDULES) + ' times')

        cls._save_sent_activity(scheduled_activity.to_activitystream(), ActivityModel, success=False, external_id=url,
                                type=scheduled_activity.type, response_code='408')
        return False

    @classmethod
    def _dispatch_activity_sending_finished(cls, response, saved_activity):
        '''sends a 'activity_sending_finished' signal to receivers'''
        activity_sending_finished.send(sender=cls, response=response, saved_activity=saved_activity)

    @classmethod
    def _send_activity(cls, url, scheduled_activity, auth=None, backoff_factor=DEFAULT_BACKOFF_FACTOR):
        '''
        makes a POST request to url, passing ScheduledActivity instance. reschedules if needed
        :param url: the recipient inbox url
        :param scheduled_activity: the ScheduledActivity to send
        :param auth: forwarded to do_post
        :param backoff_factor: a factor to use in the extension of waiting for retries, applied when
            rescheduling an activity which timed out
        '''
        response = None
        activity = scheduled_activity.to_activitystream()
        try:
            response = cls.do_post(url, activity, auth)
        except (Timeout, ConnectionError):
            if cls._attempt_failed_reschedule(url, scheduled_activity, backoff_factor):
                # successfully rescheduled, so skip cleanup for now
                return
        except Exception as e:
            logger.error('Failed to deliver backlink to ' + str(url) + ', was attempting ' + str(activity) +
                         str(e.__class__) + ': ' + str(e))

        saved = None
        if response is not None:
            saved = cls._save_activity_from_response(response, url, scheduled_activity)
        # the scheduled copy is finished with, whether the delivery succeeded or definitively failed
        scheduled_activity.delete()

        # emit activity finished event
        cls._dispatch_activity_sending_finished(response, saved)

    @classmethod
    def _activity_queue_worker(cls, url, scheduled_activity):
        '''
        Worker for sending a scheduled activity on the queue. Decides whether to send the activity and then passes to
        _send_activity if it is worth it
        :return: a "fail response" dict when the activity was discarded without sending, otherwise None
        '''

        def get_related_activities(activity_type):
            '''returns a list of activity types which should be considered a "match" with the parameterised type'''
            if activity_type is None:
                return []
            activity_type = activity_type.lower()
            group_a = ['create', 'update', 'delete']
            group_b = ['add', 'remove']

            if activity_type in group_a:
                return group_a
            if activity_type in group_b:
                return group_b
            return []

        def get_fail_response():
            return {'response': None, 'activity': None}

        types = get_related_activities(scheduled_activity.type)
        if len(types) > 0:
            # a newer scheduled activity of a related type to the same inbox supersedes this one
            scheduled = ScheduledActivity.objects.filter(external_id=scheduled_activity.external_id,
                                                         created_at__gt=scheduled_activity.created_at,
                                                         type__in=types)

            # filter to scheduled activities on the same object
            scheduled = [s for s in scheduled if cls._is_same_object_target(s, scheduled_activity)]

            if len(scheduled) > 0:
                scheduled_activity.delete()
                return get_fail_response()

        if scheduled_activity.type == 'update' and not cls._update_is_new(url, scheduled_activity):
            scheduled_activity.delete()
            return get_fail_response()

        if scheduled_activity.type in ['add', 'remove'] and not cls._add_remove_is_new(url, scheduled_activity):
            scheduled_activity.delete()
            return get_fail_response()

        cls._send_activity(url, scheduled_activity)

    @classmethod
    def _is_same_object_target(cls, activity_a, activity_b):
        '''
        compares two activities by their object and by their target (falling back to origin)
        :return: True if both activities share the same object and the same target/origin
        '''
        def get_property(object, property, default=None):
            ret = object.get(property, default)
            if isinstance(ret, dict):
                # expanded syntax: reduce the nested object to its identifier (or its name)
                ret = ret.get('@id', ret.get('name', None))
            return ret

        a = activity_a.to_activitystream()
        b = activity_b.to_activitystream()

        if get_property(a, 'object') != get_property(b, 'object'):
            return False

        return get_property(a, 'target', a.get('origin', None)) == get_property(b, 'target', b.get('origin', None))

    @classmethod
    def _update_is_new(cls, url, scheduled_activity):
        '''auxiliary function which validates if a scheduled update holds new information, compared to a past success'''
        def ordered(obj):
            '''recursively sorts nested dictionary objects to standardise ordering in comparison'''
            if isinstance(obj, dict):
                return sorted((k, ordered(v)) for k, v in obj.items())
            else:
                return obj

        def no_new_changes(old_activity, new_activity):
            '''returns True if the objects of the two activities are equivalent'''
            return ordered(old_activity['object']) == ordered(new_activity['object'])

        def get_most_recent_sent_activity(external_id):
            '''returns the most recently sent activity which meets the specification'''
            activities = ActivityModel.objects.filter(external_id=external_id, is_finished=True,
                                                      type__in=['create', 'update']).order_by('-created_at')[:10]
            for a in activities.all():
                a = a.to_activitystream()
                if 'object' in a:
                    return a
            return None

        # str objects will have to be checked manually by the receiver
        new_activity = scheduled_activity.to_activitystream()
        if 'object' not in new_activity or isinstance(new_activity['object'], str):
            return True

        old_activity = get_most_recent_sent_activity(url)

        if old_activity is None:
            return True

        if no_new_changes(old_activity, new_activity):
            return False
        return True

    @classmethod
    def _add_remove_is_new(cls, url, scheduled_activity):
        '''auxiliary function validates if the receiver does not know about this Add/Remove activity'''
        def get_most_recent_sent_activity(source_obj, source_target_origin):
            # get a list of activities with the right type
            activities = ActivityModel.objects.filter(external_id=url, is_finished=True,
                                                      type__in=['add', 'remove']).order_by('-created_at')[:10]

            # we are searching for the most recent Add/Remove activity which shares inbox, object and target/origin
            for a in activities.all():
                astream = a.to_activitystream()
                obj = astream.get('object', None)
                target_origin = astream.get('target', astream.get('origin', None))
                if obj is None or target_origin is None:
                    continue

                if source_obj == obj and source_target_origin == target_origin:
                    return a
            return None

        new_activity = scheduled_activity.to_activitystream()
        new_obj = new_activity.get('object', None)
        new_target_origin = new_activity.get('target', new_activity.get('origin', None))

        # bounds checking
        if new_obj is None or new_target_origin is None:
            return True

        # if most recent is the same type of activity as me, it's not new
        old_activity = get_most_recent_sent_activity(new_obj, new_target_origin)
        if old_activity is not None and old_activity.type == scheduled_activity.type:
            return False
        return True

    @classmethod
    def _push_to_queue(cls, url, scheduled_activity):
        '''wrapper to check for singleton initialization before pushing'''
        if not cls.initialized:
            cls.start()
        cls.queue.put([url, scheduled_activity])

    @classmethod
    def resend_activity(cls, url, scheduled_activity, failed=True, delay=0):
        '''
        a variation of send_activity for ScheduledActivity objects
        :param url: the recipient url inbox
        :param scheduled_activity: a ScheduledActivity object for sending
        :param failed: set to True to increment scheduled_activity.failed_attempts, to keep track of the number of resends
        :param delay: seconds to wait before the activity is put back on the queue (0 queues it immediately)
        '''
        if failed:
            scheduled_activity.failed_attempts = scheduled_activity.failed_attempts + 1
            scheduled_activity.save()

        if delay is not None and delay > 0:
            # wait out the backoff on a timer thread so that the queue worker is never blocked by it
            timer = threading.Timer(delay, cls._push_to_queue, args=[url, scheduled_activity])
            timer.daemon = True
            timer.start()
        else:
            cls._push_to_queue(url, scheduled_activity)

    @classmethod
    def send_activity(cls, url, activity, auth=None):
        '''
        saves a ScheduledActivity for the parameterised activity and passes it to the queue
        :param url: the recipient url inbox
        :param activity: an Activity to send
        :param auth: accepted for interface compatibility, not used when scheduling
        '''
        if getattr(settings, 'DISABLE_OUTBOX', False) is not False:
            if getattr(settings, 'DISABLE_OUTBOX') == 'DEBUG':
                # debug mode: record the activity as if it had been delivered successfully
                cls._save_sent_activity(activity, ActivityModel, external_id=url, success=True,
                                        type=activity.get('type', None), response_code='201')
            # the outbox is disabled, nothing is scheduled
            return

        # schedule the activity
        scheduled = cls._save_sent_activity(activity, ScheduledActivity, external_id=url,
                                            type=activity.get('type', None))
        cls._push_to_queue(url, scheduled)

    @classmethod
    def _save_sent_activity(cls, activity, model_represenation=ActivityModel, success=False, external_id=None, type=None,
                            response_location=None, response_code=None, local_id=None, response_body=None):
        '''
        Auxiliary function saves a record of parameterised activity
        :param activity: the JSON-serializable activity, stored as the payload
        :param model_represenation: the model class which should be used to store the activity. Defaults to djangoldp.Activity, must be a subclass
        :param type: the activity type, stored in lower case. Falls back to activity['type'] when it is a string
        :param local_id: defaults to the outbox url of this server (settings.SITE_URL + "/outbox/")
        :return: the created model instance
        '''
        payload = bytes(json.dumps(activity), "utf-8")
        if response_body is not None:
            response_body = bytes(json.dumps(response_body), "utf-8")
        if local_id is None:
            local_id = settings.SITE_URL + "/outbox/"
        if type is not None:
            type = type.lower()
        elif 'type' in activity and isinstance(activity.get('type'), str):
            type = activity.get('type').lower()
        obj = model_represenation.objects.create(local_id=local_id, payload=payload, success=success,
                                                 external_id=external_id, type=type, response_location=response_location,
                                                 response_code=response_code, response_body=response_body)
        return obj
activity_type='Activity', **kwargs): '''Auxiliary function returns an activity object with kwargs in the body''' res = { "@context": [ @@ -101,13 +442,11 @@ class ActivityPubService(object): :param target: an object representing the target collection ''' summary = str(obj['@id']) + " was added to " + str(target['@id']) - activity = cls._build_activity(actor, obj, activity_type='Add', summary=summary, target=target) + activity = cls.build_activity(actor, obj, activity_type='Add', summary=summary, target=target) # send request inbox = ActivityPubService.discover_inbox(target['@id']) - t = threading.Thread(target=cls.do_post, args=[inbox, activity]) - t.start() - cls._save_sent_activity(activity) + ActivityQueueService.send_activity(inbox, activity) @classmethod def send_remove_activity(cls, actor, obj, origin): @@ -118,13 +457,11 @@ class ActivityPubService(object): :param origin: the context the object has been removed from ''' summary = str(obj['@id']) + " was removed from " + str(origin['@id']) - activity = cls._build_activity(actor, obj, activity_type='Remove', summary=summary, origin=origin) + activity = cls.build_activity(actor, obj, activity_type='Remove', summary=summary, origin=origin) # send request inbox = ActivityPubService.discover_inbox(origin['@id']) - t = threading.Thread(target=cls.do_post, args=[inbox, activity]) - t.start() - cls._save_sent_activity(activity) + ActivityQueueService.send_activity(inbox, activity) @classmethod def send_create_activity(cls, actor, obj, inbox): @@ -135,12 +472,9 @@ class ActivityPubService(object): :param inbox: the inbox to send the activity to ''' summary = str(obj['@id']) + " was created" - activity = cls._build_activity(actor, obj, activity_type='Create', summary=summary) + activity = cls.build_activity(actor, obj, activity_type='Create', summary=summary) - # send request - t = threading.Thread(target=cls.do_post, args=[inbox, activity]) - t.start() - cls._save_sent_activity(activity) + 
ActivityQueueService.send_activity(inbox, activity) @classmethod def send_update_activity(cls, actor, obj, inbox): @@ -151,12 +485,9 @@ class ActivityPubService(object): :param inbox: the inbox to send the activity to ''' summary = str(obj['@id']) + " was updated" - activity = cls._build_activity(actor, obj, activity_type='Update', summary=summary) + activity = cls.build_activity(actor, obj, activity_type='Update', summary=summary) - # send request - t = threading.Thread(target=cls.do_post, args=[inbox, activity]) - t.start() - cls._save_sent_activity(activity) + ActivityQueueService.send_activity(inbox, activity) @classmethod def send_delete_activity(cls, actor, obj, inbox): @@ -167,38 +498,9 @@ class ActivityPubService(object): :param inbox: the inbox to send the activity to ''' summary = str(obj['@id']) + " was deleted" - activity = cls._build_activity(actor, obj, activity_type='Delete', summary=summary) - - # send request - t = threading.Thread(target=cls.do_post, args=[inbox, activity]) - t.start() - cls._save_sent_activity(activity) + activity = cls.build_activity(actor, obj, activity_type='Delete', summary=summary) - @classmethod - def _save_sent_activity(cls, activity): - '''Auxiliary function saves a record of parameterised activity''' - payload = bytes(json.dumps(activity), "utf-8") - local_id = settings.SITE_URL + "/outbox/" - obj = ActivityModel.objects.create(local_id=local_id, payload=payload) - obj.aid = Model.absolute_url(obj) - obj.save() - - @classmethod - def do_post(cls, url, activity, auth=None): - ''' - makes a POST request to url, passing activity (json) content - :return: response, or None if the request was unsuccessful - ''' - headers = {'Content-Type': 'application/ld+json'} - response = None - try: - logger.debug('[Sender] sending Activity... 
class ActivityAdmin(DjangoLDPAdmin):
    '''Admin for Activity records, rendering the stored payload and response body as decoded text'''
    fields = [
        'urlid', 'type', 'local_id', 'external_id', 'created_at', 'success', 'payload_view', 'response_code',
        'response_location', 'response_body_view',
    ]
    list_display = [
        'created_at', 'type', 'local_id', 'external_id', 'success', 'response_code',
    ]
    readonly_fields = [
        'created_at', 'payload_view', 'response_location', 'response_code', 'response_body_view',
    ]

    def payload_view(self, obj):
        '''returns the decoded activity payload of obj as a string'''
        decoded_payload = obj.to_activitystream()
        return str(decoded_payload)

    def response_body_view(self, obj):
        '''returns the decoded response body of obj as a string'''
        decoded_response = obj.response_to_json()
        return str(decoded_response)


# both the finished and the scheduled activities are managed with the same admin class
admin.site.register(Activity, ActivityAdmin)
admin.site.register(ScheduledActivity, ActivityAdmin)
'control'), + }, + bases=('djangoldp.activity',), + ), + migrations.AlterField( + model_name='activity', + name='created_at', + field=models.DateTimeField(auto_now_add=True), + ), + migrations.AlterField( + model_name='activity', + name='local_id', + field=djangoldp.fields.LDPUrlField(help_text='/inbox or /outbox url (local - this server)'), + ), + migrations.AlterField( + model_name='activity', + name='payload', + field=models.BinaryField(), + ), + migrations.AddField( + model_name='activity', + name='success', + field=models.BooleanField(default=False, + help_text='set to True when an Activity is successfully delivered'), + ), + migrations.AddField( + model_name='activity', + name='type', + field=models.CharField(blank=True, help_text='the ActivityStreams type of the Activity', max_length=64, + null=True), + ), + migrations.AddField( + model_name='activity', + name='is_finished', + field=models.BooleanField(default=True), + ), + migrations.AddField( + model_name='activity', + name='response_code', + field=models.CharField(blank=True, help_text='Response code sent by receiver', max_length=8, null=True), + ), + migrations.AddField( + model_name='activity', + name='response_location', + field=djangoldp.fields.LDPUrlField(blank=True, help_text='Location saved activity can be found', null=True), + ), + migrations.AddField( + model_name='activity', + name='response_body', + field=models.BinaryField(null=True), + ), + migrations.AddField( + model_name='activity', + name='external_id', + field=djangoldp.fields.LDPUrlField(help_text='the /inbox or /outbox url (from the sender or receiver)', + null=True), + ), + migrations.RemoveField( + model_name='activity', + name='aid', + ), + ] diff --git a/djangoldp/models.py b/djangoldp/models.py index 4bd5b10e5756f15031a0457f0e4a5f5ef3baf835..8976d449d1af3c47848b69a6de53cae1352f37dc 100644 --- a/djangoldp/models.py +++ b/djangoldp/models.py @@ -2,18 +2,17 @@ import json import uuid from urllib.parse import urlparse from django.conf 
class Activity(Model):
    '''Models an ActivityStreams Activity'''
    # /inbox or /outbox full url
    local_id = LDPUrlField(help_text='/inbox or /outbox url (local - this server)')
    external_id = LDPUrlField(null=True, help_text='the /inbox or /outbox url (from the sender or receiver)')
    # the serialized activity, stored as bytes
    payload = BinaryField()
    response_location = LDPUrlField(null=True, blank=True, help_text='Location saved activity can be found')
    response_code = models.CharField(null=True, blank=True, help_text='Response code sent by receiver', max_length=8)
    response_body = BinaryField(null=True)
    type = models.CharField(null=True, blank=True, help_text='the ActivityStreams type of the Activity',
                            max_length=64)
    is_finished = models.BooleanField(default=True)
    created_at = DateTimeField(auto_now_add=True)
    success = models.BooleanField(default=False, help_text='set to True when an Activity is successfully delivered')

    class Meta(Model.Meta):
        container_path = "activities"
        rdf_type = 'as:Activity'

    def _bytes_to_json(self, obj):
        '''decodes a stored binary value into a Python object, returning {} for None or empty bytes'''
        # values exposing tobytes() (e.g. a memoryview) are converted to plain bytes first
        raw = obj.tobytes() if hasattr(obj, 'tobytes') else obj
        if raw is None or raw == b'':
            return {}
        return json.loads(raw)

    def to_activitystream(self):
        '''returns the activity payload decoded from its stored bytes'''
        return self._bytes_to_json(self.payload)

    def response_to_json(self):
        '''returns the stored response body decoded from its stored bytes'''
        return self._bytes_to_json(self.response_body)


# temporary database-side storage used for scheduled tasks in the ActivityQueue
class ScheduledActivity(Activity):
    '''An Activity which is still waiting to be sent'''
    failed_attempts = models.PositiveIntegerField(default=0, help_text='a log of how many failed retries have been made sending the activity')

    def save(self, *args, **kwargs):
        '''saves the instance, always flagging it as not finished'''
        self.is_finished = False
        super().save(*args, **kwargs)
+1,11 @@ -import json import uuid +import time from django.contrib.auth import get_user_model -from django.db import IntegrityError from django.test import override_settings from rest_framework.test import APIClient, APITestCase -from djangoldp.tests.models import Circle, CircleMember, Project, UserProfile, DateModel, DateChild -from djangoldp.models import Activity, Follower +from djangoldp.tests.models import Circle, Project +from djangoldp.models import Activity, ScheduledActivity +from djangoldp.activities.services import BACKLINKS_ACTOR, ActivityPubService, ActivityQueueService class TestsBacklinksService(APITestCase): @@ -22,7 +22,9 @@ class TestsBacklinksService(APITestCase): urlid = 'https://distant.com/users/' + username return get_user_model().objects.create_user(username=username, email=email, password='test', urlid=urlid) - @override_settings(SEND_BACKLINKS=True, DISABLE_OUTBOX=True) + # TODO: inbox discovery (https://git.startinblox.com/djangoldp-packages/djangoldp/issues/233) + + @override_settings(SEND_BACKLINKS=True, DISABLE_OUTBOX='DEBUG') def test_local_object_with_distant_foreign_key(self): # a local Circle with a distant owner local_circle = Circle.objects.create(description='Test') @@ -54,7 +56,7 @@ class TestsBacklinksService(APITestCase): local_circle.delete() self.assertEqual(Activity.objects.all().count(), 4) - @override_settings(SEND_BACKLINKS=True, DISABLE_OUTBOX=True) + @override_settings(SEND_BACKLINKS=True, DISABLE_OUTBOX='DEBUG') def test_local_object_with_external_m2m_join_leave(self): # a local project with three distant users project = Project.objects.create(description='Test') @@ -79,7 +81,7 @@ class TestsBacklinksService(APITestCase): project.delete() self.assertEqual(Activity.objects.all().count(), prior_count) - @override_settings(SEND_BACKLINKS=True, DISABLE_OUTBOX=True) + @override_settings(SEND_BACKLINKS=True, DISABLE_OUTBOX='DEBUG') def test_local_object_with_external_m2m_delete_parent(self): project = 
Project.objects.create(description='Test') external_a = self._get_random_external_user() @@ -88,3 +90,237 @@ class TestsBacklinksService(APITestCase): project.delete() self.assertEqual(Activity.objects.all().count(), prior_count + 1) + + # test that older ScheduledActivity is discarded for newer ScheduledActivity + @override_settings(SEND_BACKLINKS=True, DISABLE_OUTBOX='DEBUG') + def test_old_invalid_scheduled_activity_discarded(self): + + def send_two_activities_and_assert_old_discarded(obj): + # there are two scheduled activities with the same object, (and different time stamps) + old_activity = ActivityPubService.build_activity(BACKLINKS_ACTOR, obj, activity_type='Create', summary='old') + old_scheduled = ActivityQueueService._save_sent_activity(old_activity, ScheduledActivity) + + new_activity = ActivityPubService.build_activity(BACKLINKS_ACTOR, obj, activity_type='Update', summary='new') + new_scheduled = ActivityQueueService._save_sent_activity(new_activity, ScheduledActivity) + + # both are sent to the ActivityQueueService + ActivityQueueService._activity_queue_worker('http://127.0.0.1:8001/idontexist/', old_scheduled) + time.sleep(0.1) + ActivityQueueService._activity_queue_worker('http://127.0.0.1:8001/idontexist/', new_scheduled) + + time.sleep(0.1) + # assert that all scheduled activities were cleaned up + self.assertEquals(ScheduledActivity.objects.count(), 0) + + # assert that ONLY the newly scheduled activity was sent + activities = Activity.objects.all() + self.assertEquals(Activity.objects.count(), 1) + astream = activities[0].to_activitystream() + self.assertEquals(astream['summary'], new_activity['summary']) + activities[0].delete() + + # variation using expanded syntax + obj = { + '@id': 'https://test.com/users/test/' + } + send_two_activities_and_assert_old_discarded(obj) + + # variation using id-only syntax + obj = 'https://test.com/users/test/' + send_two_activities_and_assert_old_discarded(obj) + + # test that older ScheduledActivity is still 
sent if it's on a different object + @override_settings(SEND_BACKLINKS=True, DISABLE_OUTBOX='DEBUG') + def test_old_valid_scheduled_activity_sent(self): + # there are two scheduled activities with different objects + obj = 'https://test.com/users/test1/' + activity_a = ActivityPubService.build_activity(BACKLINKS_ACTOR, obj, activity_type='Create', summary='A') + scheduled_a = ActivityQueueService._save_sent_activity(activity_a, ScheduledActivity) + + obj = 'https://test.com/users/test2/' + activity_b = ActivityPubService.build_activity(BACKLINKS_ACTOR, obj, activity_type='Create', summary='B') + scheduled_b = ActivityQueueService._save_sent_activity(activity_b, ScheduledActivity) + + # both are sent to the same inbox + ActivityQueueService._activity_queue_worker('http://127.0.0.1:8001/idontexist/', scheduled_a) + ActivityQueueService._activity_queue_worker('http://127.0.0.1:8001/idontexist/', scheduled_b) + + # assert that both scheduled activities were sent, and the scheduled activities were cleaned up + self.assertEquals(ScheduledActivity.objects.count(), 0) + self.assertEquals(Activity.objects.count(), 2) + + # variation on the previous test where the two activities are working on different models (using the same object) + @override_settings(SEND_BACKLINKS=True, DISABLE_OUTBOX='DEBUG') + def test_old_valid_scheduled_activity_sent_same_object(self): + obj = 'https://test.com/users/test1/' + target = {'@type': 'hd:skill', '@id': 'https://api.test1.startinblox.com/skills/4/'} + activity_a = ActivityPubService.build_activity(BACKLINKS_ACTOR, obj, activity_type='Add', summary='A', target=target) + scheduled_a = ActivityQueueService._save_sent_activity(activity_a, ScheduledActivity) + + obj = 'https://test.com/users/test1/' + target = {'@type': 'hd:joboffer', '@id': 'https://api.test1.startinblox.com/job-offers/1/'} + activity_b = ActivityPubService.build_activity(BACKLINKS_ACTOR, obj, activity_type='Add', summary='B', target=target) + scheduled_b = 
ActivityQueueService._save_sent_activity(activity_b, ScheduledActivity) + + # both are sent to the same inbox + ActivityQueueService._activity_queue_worker('http://127.0.0.1:8001/idontexist/', scheduled_a) + ActivityQueueService._activity_queue_worker('http://127.0.0.1:8001/idontexist/', scheduled_b) + + # assert that both scheduled activities were sent, and the scheduled activities were cleaned up + self.assertEquals(ScheduledActivity.objects.count(), 0) + self.assertEquals(Activity.objects.count(), 2) + + # variation using an Add and a Remove (one defines target, the other origin) + # also tests that an unnecessary add is not sent + @override_settings(SEND_BACKLINKS=True, DISABLE_OUTBOX='DEBUG') + def test_matching_origin_and_target_not_sent(self): + a = {'type': 'Add', 'actor': {'type': 'Service', 'name': 'Backlinks Service'}, + 'object': {'@type': 'foaf:user', '@id': 'https://api.test2.startinblox.com/users/calum/'}, + 'target': {'@type': 'hd:skill', '@id': 'https://api.test1.startinblox.com/skills/3/'}} + scheduled_a = ActivityQueueService._save_sent_activity(a, ScheduledActivity) + b = {'type': 'Remove', 'actor': {'type': 'Service', 'name': 'Backlinks Service'}, + 'object': {'@type': 'foaf:user', '@id': 'https://api.test2.startinblox.com/users/calum/'}, + 'origin': {'@type': 'hd:skill', '@id': 'https://api.test1.startinblox.com/skills/3/'}} + scheduled_b = ActivityQueueService._save_sent_activity(b, ScheduledActivity) + + # both are sent to the same inbox + ActivityQueueService._activity_queue_worker('http://127.0.0.1:8001/idontexist/', scheduled_a) + ActivityQueueService._activity_queue_worker('http://127.0.0.1:8001/idontexist/', scheduled_b) + + # assert that both scheduled activities were sent, and the scheduled activities were cleaned up + self.assertEquals(ScheduledActivity.objects.count(), 0) + self.assertEquals(Activity.objects.count(), 1) + + @override_settings(SEND_BACKLINKS=True, DISABLE_OUTBOX='DEBUG') + def test_unnecessary_add_not_sent(self): + # 
an add activity was sent previously + a = {'type': 'Add', 'actor': {'type': 'Service', 'name': 'Backlinks Service'}, + 'object': {'@type': 'foaf:user', '@id': 'https://api.test2.startinblox.com/users/calum/'}, + 'target': {'@type': 'hd:skill', '@id': 'https://api.test1.startinblox.com/skills/3/'}} + ActivityQueueService._save_sent_activity(a, Activity, success=True, type='add', + external_id='https://distant.com/inbox/') + + # no remove has since been sent, but a new Add is scheduled + scheduled_b = ActivityQueueService._save_sent_activity(a, ScheduledActivity, success=False, type='add', + external_id='https://distant.com/inbox/') + ActivityQueueService._activity_queue_worker('https://distant.com/inbox/', scheduled_b) + + # assert that only the previous activity was sent, and the scheduled activites cleaned up + self.assertEquals(ScheduledActivity.objects.count(), 0) + self.assertEquals(Activity.objects.count(), 1) + + @override_settings(SEND_BACKLINKS=True, DISABLE_OUTBOX='DEBUG') + def test_unnecessary_remove_not_sent(self): + # an remove activity was sent previously + a = {'type': 'Remove', 'actor': {'type': 'Service', 'name': 'Backlinks Service'}, + 'object': {'@type': 'foaf:user', '@id': 'https://api.test2.startinblox.com/users/calum/'}, + 'target': {'@type': 'hd:skill', '@id': 'https://api.test1.startinblox.com/skills/3/'}} + ActivityQueueService._save_sent_activity(a, Activity, success=True, type='remove', + external_id='https://distant.com/inbox/') + + # no add has since been sent, but a new Remove is scheduled + scheduled_b = ActivityQueueService._save_sent_activity(a, ScheduledActivity, success=False, type='remove', + external_id='https://distant.com/inbox/') + ActivityQueueService._activity_queue_worker('https://distant.com/inbox/', scheduled_b) + + # assert that only the previous activity was sent, and the scheduled activites cleaned up + self.assertEquals(ScheduledActivity.objects.count(), 0) + self.assertEquals(Activity.objects.count(), 1) + + 
@override_settings(SEND_BACKLINKS=True, DISABLE_OUTBOX='DEBUG') + def test_necessary_add_sent(self): + # a remove activity was sent previously + a = {'type': 'Remove', 'actor': {'type': 'Service', 'name': 'Backlinks Service'}, + 'object': {'@type': 'foaf:user', '@id': 'https://api.test2.startinblox.com/users/calum/'}, + 'target': {'@type': 'hd:skill', '@id': 'https://api.test1.startinblox.com/skills/3/'}} + ActivityQueueService._save_sent_activity(a, Activity, success=True, type='remove', + external_id='https://distant.com/inbox/') + + # an add is now being sent + scheduled_b = ActivityQueueService._save_sent_activity(a, ScheduledActivity, type='add', + external_id='https://distant.com/inbox/') + ActivityQueueService._activity_queue_worker('https://distant.com/inbox/', scheduled_b) + + # assert that both activities sent, and the scheduled activites cleaned up + self.assertEquals(ScheduledActivity.objects.count(), 0) + self.assertEquals(Activity.objects.count(), 2) + + @override_settings(SEND_BACKLINKS=True, DISABLE_OUTBOX='DEBUG') + def test_first_add_sent(self): + # no activity has been sent with this target, before this add + a = {'type': 'Add', 'actor': {'type': 'Service', 'name': 'Backlinks Service'}, + 'object': {'@type': 'foaf:user', '@id': 'https://api.test2.startinblox.com/users/calum/'}, + 'target': {'@type': 'hd:skill', '@id': 'https://api.test1.startinblox.com/skills/3/'}} + scheduled = ActivityQueueService._save_sent_activity(a, ScheduledActivity, success=True, type='add', + external_id='https://distant.com/inbox/') + ActivityQueueService._activity_queue_worker('https://distant.com/inbox/', scheduled) + + # assert that the activity was sent, and the scheduled activites cleaned up + self.assertEquals(ScheduledActivity.objects.count(), 0) + self.assertEquals(Activity.objects.count(), 1) + + # validate Update activity objects have new info before sending the notification + @override_settings(SEND_BACKLINKS=True, DISABLE_OUTBOX='DEBUG') + def 
test_unnecessary_update_not_sent(self): + # an object was sent in one activity + obj = { + '@type': 'hd:circle', + '@id': 'https://test.com/circles/8/', + 'owner': {'@id': 'https://distant.com/users/john/', + '@type': 'foaf:user'} + } + activity_a = ActivityPubService.build_activity(BACKLINKS_ACTOR, obj, activity_type='Create', summary='A') + ActivityQueueService._save_sent_activity(activity_a, Activity, success=True, type='create', + external_id='https://distant.com/inbox/') + + # now I'm sending an update, which doesn't change anything about the object + activity_b = ActivityPubService.build_activity(BACKLINKS_ACTOR, obj, activity_type='Create', summary='B') + scheduled_b = ActivityQueueService._save_sent_activity(activity_b, ScheduledActivity, type='update', + external_id='https://distant.com/inbox/') + + ActivityQueueService._activity_queue_worker('https://distant.com/inbox/', scheduled_b) + + # assert that only the previous activity was sent, and the scheduled activites cleaned up + self.assertEquals(ScheduledActivity.objects.count(), 0) + self.assertEquals(Activity.objects.count(), 1) + + @override_settings(SEND_BACKLINKS=True, DISABLE_OUTBOX='DEBUG') + def test_necessary_update_is_sent(self): + # an object was sent in one activity + obj = { + '@type': 'hd:circle', + '@id': 'https://test.com/circles/8/', + 'owner': {'@id': 'https://distant.com/users/john/', + '@type': 'foaf:user'} + } + activity_a = ActivityPubService.build_activity(BACKLINKS_ACTOR, obj, activity_type='Create', summary='A') + ActivityQueueService._save_sent_activity(activity_a, Activity, success=True, type='create', + external_id='https://distant.com/inbox/') + + # now I'm sending an update, which changes the owner of the circle + obj['owner']['@id'] = 'https://distant.com/users/mark/' + activity_b = ActivityPubService.build_activity(BACKLINKS_ACTOR, obj, activity_type='Create', summary='B') + scheduled_b = ActivityQueueService._save_sent_activity(activity_b, ScheduledActivity, type='update', 
+ external_id='https://distant.com/inbox/') + + ActivityQueueService._activity_queue_worker('https://distant.com/inbox/', scheduled_b) + + # assert that both activities were sent + self.assertEquals(ScheduledActivity.objects.count(), 0) + self.assertEquals(Activity.objects.count(), 2) + + @override_settings(SEND_BACKLINKS=True, DISABLE_OUTBOX='DEBUG') + def test_first_update_is_sent(self): + # no prior activity was sent for this object - should send + obj = { + '@type': 'hd:circle', + '@id': 'https://test.com/circles/8/', + 'owner': {'@id': 'https://distant.com/users/john/', + '@type': 'foaf:user'} + } + activity = ActivityPubService.build_activity(BACKLINKS_ACTOR, obj, activity_type='Create', summary='A') + scheduled = ActivityQueueService._save_sent_activity(activity, ScheduledActivity, type='update', + external_id='https://distant.com/inbox/') + ActivityQueueService._activity_queue_worker('https://distant.com/inbox/', scheduled) + self.assertEquals(ScheduledActivity.objects.count(), 0) + self.assertEquals(Activity.objects.count(), 1) diff --git a/djangoldp/tests/tests_inbox.py b/djangoldp/tests/tests_inbox.py index 41dadbba95b2510dae4c389d813158a47a6a5625..6df7af6c5d494d0c963abdc282a497fdbcf7f318 100644 --- a/djangoldp/tests/tests_inbox.py +++ b/djangoldp/tests/tests_inbox.py @@ -1,10 +1,9 @@ import json from django.contrib.auth import get_user_model from django.conf import settings -from django.db import IntegrityError from django.test import override_settings from rest_framework.test import APIClient, APITestCase -from djangoldp.tests.models import Circle, CircleMember, Project, UserProfile, DateModel, DateChild +from djangoldp.tests.models import Circle, CircleMember, Project, DateModel, DateChild from djangoldp.models import Activity, Follower @@ -224,6 +223,8 @@ class TestsInbox(APITestCase): content_type='application/ld+json;profile="https://www.w3.org/ns/activitystreams"') self.assertEqual(response.status_code, 400) + # TODO: may pass an object without an 
explicit urlid e.g. Person actor, or Collection target + # error behaviour - unknown model def test_add_activity_unknown(self): obj = { @@ -330,10 +331,10 @@ class TestsInbox(APITestCase): self.assertEquals(len(projects), 1) self.assertEquals(len(user_projects), 0) self.assertIn("https://distant.com/projects/1/", projects.values_list('urlid', flat=True)) - self._assert_activity_created(response, prior_activity_count + 2) + self._assert_activity_created(response, prior_activity_count + 1) self.assertEqual(Follower.objects.count(), 0) - # TODO: test_remove_activity_project_using_target + # TODO: test_remove_activity_project_using_target (https://git.startinblox.com/djangoldp-packages/djangoldp/issues/231) # error behaviour - project does not exist on user def test_remove_activity_nonexistent_project(self): @@ -350,7 +351,7 @@ class TestsInbox(APITestCase): self.assertEqual(response.status_code, 201) self._assert_activity_created(response) - @override_settings(SEND_BACKLINKS=True, DISABLE_OUTBOX=True) + @override_settings(SEND_BACKLINKS=True, DISABLE_OUTBOX='DEBUG') def test_removing_object_twice(self): project = Project.objects.create(urlid="https://distant.com/projects/1/") self.user.projects.add(project) @@ -485,3 +486,6 @@ class TestsInbox(APITestCase): def test_get_inbox(self): response = self.client.get('/inbox/') self.assertEqual(response.status_code, 405) + + # TODO: GET inbox for specific resource - should return a list of activities sent to this inbox + # TODO: view to access outbox (https://git.startinblox.com/djangoldp-packages/djangoldp/issues/284) diff --git a/djangoldp/views.py b/djangoldp/views.py index af66a19c7ac19a4c0d9cab2043478b8f9e1853a8..a5f6d990db0b32cc0b59cc844a2805c582d1b1bb 100644 --- a/djangoldp/views.py +++ b/djangoldp/views.py @@ -24,10 +24,11 @@ from rest_framework.views import APIView from rest_framework.viewsets import ModelViewSet from djangoldp.endpoints.webfinger import WebFingerEndpoint, WebFingerError -from djangoldp.models import 
LDPSource, Model, Activity, Follower +from djangoldp.models import LDPSource, Model, Follower from djangoldp.permissions import LDPPermissions from djangoldp.filters import LocalObjectOnContainerPathBackend -from djangoldp.activities import ActivityPubService, as_activitystream +from djangoldp.activities import ActivityQueueService, as_activitystream +from djangoldp.activities import ActivityPubService from djangoldp.activities.errors import ActivityStreamDecodeError, ActivityStreamValidationError import logging @@ -96,13 +97,11 @@ class InboxView(APIView): return Response({'Unable to save due to an IntegrityError in the receiver model'}, status=status.HTTP_200_OK) # save the activity and return 201 - payload = bytes(json.dumps(activity.to_json()), "utf-8") - obj = Activity.objects.create(local_id=request.path_info, payload=payload) - obj.aid = Model.absolute_url(obj) - obj.save() + obj = ActivityQueueService._save_sent_activity(activity.to_json(), local_id=request.path_info, success=True, + type=activity.type) response = Response({}, status=status.HTTP_201_CREATED) - response['Location'] = obj.aid + response['Location'] = obj.urlid return response