diff --git a/djangoldp/activities/services.py b/djangoldp/activities/services.py index 840fdc51088131e4a36ea1f9d798289d6dd3b268..629143a54b3c13a9209e4e99e50e6db9366625db 100644 --- a/djangoldp/activities/services.py +++ b/djangoldp/activities/services.py @@ -66,7 +66,7 @@ class ActivityQueueService: while True: # wait for queue item to manifest item = queue.get() - time.sleep(DEFAULT_ACTIVITY_DELAY) + time.sleep(item[2]) cls._activity_queue_worker(item[0], item[1]) cls.queue.task_done() @@ -187,9 +187,6 @@ class ActivityQueueService: return groub_b return [] - def get_fail_response(): - return {'response': None, 'activity': None} - types = get_related_activities(scheduled_activity.type) if len(types) > 0: scheduled = ScheduledActivity.objects.filter(external_id=scheduled_activity.external_id, @@ -201,15 +198,15 @@ class ActivityQueueService: if len(scheduled) > 0: scheduled_activity.delete() - return get_fail_response() + return if scheduled_activity.type == 'update' and not cls._update_is_new(url, scheduled_activity): scheduled_activity.delete() - return get_fail_response() + return if scheduled_activity.type in ['add', 'remove'] and not cls._add_remove_is_new(url, scheduled_activity): scheduled_activity.delete() - return get_fail_response() + return cls._send_activity(url, scheduled_activity) @@ -302,11 +299,11 @@ class ActivityQueueService: return True @classmethod - def _push_to_queue(cls, url, scheduled_activity): + def _push_to_queue(cls, url, scheduled_activity, delay=DEFAULT_ACTIVITY_DELAY): '''wrapper to check for singleton initialization before pushing''' if not cls.initialized: cls.start() - cls.queue.put([url, scheduled_activity]) + cls.queue.put([url, scheduled_activity, delay]) @classmethod def resend_activity(cls, url, scheduled_activity, failed=True): @@ -323,7 +320,7 @@ class ActivityQueueService: cls._push_to_queue(url, scheduled_activity) @classmethod - def send_activity(cls, url, activity, auth=None): + def send_activity(cls, url, activity, auth=None, delay=DEFAULT_ACTIVITY_DELAY): ''' saves a ScheduledActivity for the parameterised activity and passes it to the queue :param url: the recipient url inbox @@ -337,7 +334,7 @@ class ActivityQueueService: # schedule the activity scheduled = cls._save_sent_activity(activity, ScheduledActivity, external_id=url, type=activity.get('type', None)) - cls._push_to_queue(url, scheduled) + cls._push_to_queue(url, scheduled, delay) @classmethod def _save_sent_activity(cls, activity, model_represenation=ActivityModel, success=False, external_id=None, type=None,