diff --git a/src/activitypub/inbox.js b/src/activitypub/inbox.js index ec2f9d7fb0..cd13982b8c 100644 --- a/src/activitypub/inbox.js +++ b/src/activitypub/inbox.js @@ -617,6 +617,8 @@ inbox.reject = async (req) => { const queueId = `${type}:${id}:${hostname}`; // stop retrying rejected requests - clearTimeout(activitypub.retryQueue.get(queueId)); - activitypub.retryQueue.delete(queueId); + await Promise.all([ + db.sortedSetRemove('ap:retry:queue', queueId), + db.delete(`ap:retry:queue:${queueId}`), + ]); }; diff --git a/src/activitypub/index.js b/src/activitypub/index.js index f4dd3cb5d2..fac6a92ed8 100644 --- a/src/activitypub/index.js +++ b/src/activitypub/index.js @@ -14,9 +14,7 @@ const messaging = require('../messaging'); const user = require('../user'); const utils = require('../utils'); const ttl = require('../cache/ttl'); -const lru = require('../cache/lru'); const batch = require('../batch'); -const pubsub = require('../pubsub'); const analytics = require('../analytics'); const requestCache = ttl({ @@ -69,28 +67,30 @@ ActivityPub.feps = require('./feps'); ActivityPub.startJobs = () => { ActivityPub.helpers.log('[activitypub/jobs] Registering jobs.'); - new CronJob('0 0 * * *', async () => { + async function tryCronJob(method) { if (!meta.config.activitypubEnabled) { return; } try { - await ActivityPub.notes.prune(); - await db.sortedSetsRemoveRangeByScore(['activities:datetime'], '-inf', Date.now() - 604800000); + await method(); } catch (err) { winston.error(err.stack); } + } + new CronJob('0 0 * * *', async () => { + await tryCronJob(async () => { + await ActivityPub.notes.prune(); + await db.sortedSetsRemoveRangeByScore(['activities:datetime'], '-inf', Date.now() - 604800000); + }); }, null, true, null, null, false); // change last argument to true for debugging new CronJob('*/30 * * * *', async () => { - if (!meta.config.activitypubEnabled) { - return; - } - try { - await ActivityPub.actors.prune(); - } catch (err) { - winston.error(err.stack); - } + await tryCronJob(ActivityPub.actors.prune); }, null, true, null, null, false); // change last argument to true for debugging + + new CronJob('0 * * * * *', async () => { + await tryCronJob(retryFailedMessages); + }, null, true, null, null, false); }; ActivityPub.resolveId = async (uid, id) => { @@ -348,29 +348,11 @@ ActivityPub.get = async (type, id, uri, options) => { } }; -ActivityPub.retryQueue = lru({ - name: 'activitypub-retry-queue', - max: 4000, - ttl: 1000 * 60 * 60 * 24 * 60, - dispose: (value) => { - if (value) { - clearTimeout(value); - } - }, -}); - -// handle clearing retry queue from another member of the cluster -pubsub.on(`activitypub-retry-queue:lruCache:del`, (keys) => { - if (Array.isArray(keys)) { - keys.forEach(key => clearTimeout(ActivityPub.retryQueue.get(key))); - } -}); - -async function sendMessage(uri, id, type, payload, attempts = 1) { - const keyData = await ActivityPub.getPrivateKey(type, id); - const headers = await ActivityPub.sign(keyData, uri, payload); - +async function sendMessage(uri, id, type, payload) { try { + const keyData = await ActivityPub.getPrivateKey(type, id); + const headers = await ActivityPub.sign(keyData, uri, payload); + const { response, body } = await request.post(uri, { headers: { ...headers, @@ -382,25 +364,15 @@ async function sendMessage(uri, id, type, payload, attempts = 1) { if (String(response.statusCode).startsWith('2')) { ActivityPub.helpers.log(`[activitypub/send] Successfully sent ${payload.type} to ${uri}`); - } else { - if (typeof body === 'object') { - throw new Error(JSON.stringify(body)); - } - throw new Error(String(body)); + return true; } + if (typeof body === 'object') { + throw new Error(JSON.stringify(body)); + } + throw new Error(String(body)); } catch (e) { ActivityPub.helpers.log(`[activitypub/send] Could not send ${payload.type} to ${uri}; error: ${e.message}`); - // add to retry queue - if (attempts < 12) { // stop attempting after ~2 months - const timeout = (4 ** attempts) * 1000; // exponential backoff - const queueId = `${payload.type}:${payload.id}:${new URL(uri).hostname}`; - const timeoutId = setTimeout(() => sendMessage(uri, id, type, payload, attempts + 1), timeout); - ActivityPub.retryQueue.set(queueId, timeoutId); - - ActivityPub.helpers.log(`[activitypub/send] Added ${payload.type} to ${uri} to retry queue for ${timeout}ms`); - } else { - winston.warn(`[activitypub/send] Max attempts reached for ${payload.type} to ${uri}; giving up on sending`); - } + return false; } } @@ -429,17 +401,75 @@ ActivityPub.send = async (type, id, targets, payload) => { ...payload, }; - // Runs in background... potentially a better queue is required... later. - batch.processArray( - inboxes, - async inboxBatch => Promise.all(inboxBatch.map(async uri => sendMessage(uri, id, type, payload))), - { - batch: 50, - interval: 100, - }, - ); + const oneMinute = 1000 * 60; + batch.processArray(inboxes, async (inboxBatch) => { + const retryQueueAdd = []; + const retryQueuedSet = []; + + await Promise.all(inboxBatch.map(async (uri) => { + const ok = await sendMessage(uri, id, type, payload); + if (!ok) { + const queueId = `${type}:${id}:${new URL(uri).hostname}`; + const nextTryOn = Date.now() + oneMinute; + retryQueueAdd.push(['ap:retry:queue', nextTryOn, queueId]); + retryQueuedSet.push([`ap:retry:queue:${queueId}`, { + queueId, + uri, + id, + type, + attempts: 1, + timestamp: nextTryOn, + payload: JSON.stringify(payload), + }]); + } + })); + + if (retryQueueAdd.length) { + await Promise.all([ + db.sortedSetAddBulk(retryQueueAdd), + db.setObjectBulk(retryQueuedSet), + ]); + } + }, { + batch: 50, + interval: 100, + }).catch(err => winston.error(err.stack)); }; +async function retryFailedMessages() { + const queueIds = await db.getSortedSetRangeByScore('ap:retry:queue', 0, 50, '-inf', Date.now()); + const queuedData = (await db.getObjects(queueIds.map(id => `ap:retry:queue:${id}`))).filter(Boolean); + + const retryQueueAdd = []; + const retryQueuedSet = []; + const queueIdsToRemove = []; + + const oneMinute = 1000 * 60; + await Promise.all(queuedData.map(async (data) => { + const { queueId, uri, id, type, attempts, payload } = data; + const payloadObj = JSON.parse(payload); + + const ok = await sendMessage(uri, id, type, payloadObj); + + if (ok || attempts > 10) { + queueIdsToRemove.push(queueId); + } else { + const nextAttempt = (parseInt(attempts, 10) || 0) + 1; + const timeout = (2 ** nextAttempt) * oneMinute; // exponential backoff + const nextTryOn = Date.now() + timeout; + retryQueueAdd.push(['ap:retry:queue', nextTryOn, queueId]); + retryQueuedSet.push([`ap:retry:queue:${queueId}`, { + attempts: nextAttempt, + timestamp: nextTryOn, + }]); + } + })); + await Promise.all([ + db.sortedSetRemove('ap:retry:queue', queueIdsToRemove), + db.deleteAll(queueIdsToRemove.map(id => `ap:retry:queue:${id}`)), + ]); +} + ActivityPub.record = async ({ id, type, actor }) => { const now = Date.now(); const { hostname } = new URL(actor);