diff --git a/src/activitypub/inbox.js b/src/activitypub/inbox.js index 19686a8bf8..80412aca8b 100644 --- a/src/activitypub/inbox.js +++ b/src/activitypub/inbox.js @@ -412,3 +412,14 @@ inbox.flag = async (req) => { } })); }; + +inbox.reject = async (req) => { + const { actor, object } = req.body; + const { type, id } = object; + const { hostname } = new URL(actor); + const queueId = `${type}:${id}:${hostname}`; + + // stop retrying rejected requests + clearTimeout(activitypub.retryQueue.get(queueId)); + activitypub.retryQueue.delete(queueId); +}; diff --git a/src/activitypub/index.js b/src/activitypub/index.js index 357de15f9f..89d1fa0523 100644 --- a/src/activitypub/index.js +++ b/src/activitypub/index.js @@ -10,6 +10,7 @@ const meta = require('../meta'); const user = require('../user'); const utils = require('../utils'); const ttl = require('../cache/ttl'); +const lru = require('../cache/lru'); const requestCache = ttl({ ttl: 1000 * 60 * 5 }); // 5 minutes const ActivityPub = module.exports; @@ -252,6 +253,40 @@ ActivityPub.get = async (type, id, uri) => { } }; +ActivityPub.retryQueue = lru({ name: 'activitypub-retry-queue', max: 4000, ttl: 1000 * 60 * 60 * 24 * 60 }); + +async function sendMessage(uri, id, type, payload, attempts = 1) { + const keyData = await ActivityPub.getPrivateKey(type, id); + const headers = await ActivityPub.sign(keyData, uri, payload); + winston.verbose(`[activitypub/send] ${uri}`); + try { + const { response, body } = await request.post(uri, { + headers: { + ...headers, + 'content-type': 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"', + }, + body: payload, + }); + + if (String(response.statusCode).startsWith('2')) { + winston.verbose(`[activitypub/send] Successfully sent ${payload.type} to ${uri}`); + } else { + throw new Error(String(body)); + } + } catch (e) { + winston.warn(`[activitypub/send] Could not send ${payload.type} to ${uri}; error: ${e.message}`); + // add to retry queue + if (attempts < 12) { // stop attempting after ~2 months + const timeout = (4 ** attempts) * 1000; // exponential backoff + const queueId = `${payload.type}:${payload.id}:${new URL(uri).hostname}`; + const timeoutId = setTimeout(() => sendMessage(uri, id, type, payload, attempts + 1), timeout); + ActivityPub.retryQueue.set(queueId, timeoutId); + + winston.verbose(`[activitypub/send] Added ${payload.type} to ${uri} to retry queue for ${timeout}ms`); + } + } +} + ActivityPub.send = async (type, id, targets, payload) => { if (!Array.isArray(targets)) { targets = [targets]; @@ -267,26 +302,5 @@ ActivityPub.send = async (type, id, targets, payload) => { ...payload, }; - await Promise.all(inboxes.map(async (uri) => { - const keyData = await ActivityPub.getPrivateKey(type, id); - const headers = await ActivityPub.sign(keyData, uri, payload); - winston.verbose(`[activitypub/send] ${uri}`); - try { - const { response, body } = await request.post(uri, { - headers: { - ...headers, - 'content-type': 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"', - }, - body: payload, - }); - - if (String(response.statusCode).startsWith('2')) { - winston.verbose(`[activitypub/send] Successfully sent ${payload.type} to ${uri}`); - } else { - winston.warn(`[activitypub/send] Could not send ${payload.type} to ${uri}; error: ${String(body)}`); - } - } catch (e) { - winston.warn(`[activitypub/send] Could not send ${payload.type} to ${uri}; error: ${e.message}`); - } - })); + await Promise.all(inboxes.map(async uri => sendMessage(uri, id, type, payload))); };