From c595edb4c0e978d220fc0f01fa08c651a920337c Mon Sep 17 00:00:00 2001 From: Julian Lam Date: Thu, 15 Jan 2026 15:12:57 -0500 Subject: [PATCH] refactor: move ap jobs to its own file --- src/activitypub/index.js | 81 +----------------------------------- src/activitypub/jobs.js | 89 ++++++++++++++++++++++++++++++++++++++++ src/start.js | 2 +- 3 files changed, 91 insertions(+), 81 deletions(-) create mode 100644 src/activitypub/jobs.js diff --git a/src/activitypub/index.js b/src/activitypub/index.js index 92dd10b544..d60ddb144b 100644 --- a/src/activitypub/index.js +++ b/src/activitypub/index.js @@ -3,7 +3,6 @@ const nconf = require('nconf'); const winston = require('winston'); const { createHash, createSign, createVerify, getHashes } = require('crypto'); -const { CronJob } = require('cron'); const request = require('../request'); const db = require('../database'); @@ -69,34 +68,7 @@ ActivityPub.feps = require('./feps'); ActivityPub.rules = require('./rules'); ActivityPub.relays = require('./relays'); ActivityPub.out = require('./out'); - -ActivityPub.startJobs = () => { - ActivityPub.helpers.log('[activitypub/jobs] Registering jobs.'); - async function tryCronJob(method) { - if (!meta.config.activitypubEnabled) { - return; - } - try { - await method(); - } catch (err) { - winston.error(err.stack); - } - } - new CronJob('0 0 * * *', async () => { - await tryCronJob(async () => { - await ActivityPub.notes.prune(); - await db.sortedSetsRemoveRangeByScore(['activities:datetime'], '-inf', Date.now() - 604800000); - }); - }, null, true, null, null, false); // change last argument to true for debugging - - new CronJob('*/30 * * * *', async () => { - await tryCronJob(ActivityPub.actors.prune); - }, null, true, null, null, false); // change last argument to true for debugging - - new CronJob('0 * * * * *', async () => { - await tryCronJob(retryFailedMessages); - }, null, true, null, null, false); -}; +ActivityPub.jobs = require('./jobs'); ActivityPub.resolveId = async (uid, id) => { try { @@ -472,57 +444,6 @@ ActivityPub.send = async (type, id, targets, payload) => { }).catch(err => winston.error(err.stack)); }; -async function retryFailedMessages() { - const queueIds = await db.getSortedSetRangeByScore('ap:retry:queue', 0, 50, '-inf', Date.now()); - const queuedData = (await db.getObjects(queueIds.map(id => `ap:retry:queue:${id}`))); - - const retryQueueAdd = []; - const retryQueuedSet = []; - const queueIdsToRemove = []; - - const oneMinute = 1000 * 60; - await Promise.all(queuedData.map(async (data, index) => { - const queueId = queueIds[index]; - if (!data) { - queueIdsToRemove.push(queueId); - return; - } - - const { uri, id, type, attempts, payload } = data; - if (!uri || !id || !type || !payload || attempts > 10) { - queueIdsToRemove.push(queueId); - return; - } - let payloadObj; - try { - payloadObj = JSON.parse(payload); - } catch (err) { - queueIdsToRemove.push(queueId); - return; - } - const ok = await sendMessage(uri, id, type, payloadObj); - if (ok) { - queueIdsToRemove.push(queueId); - } else { - const nextAttempt = (parseInt(attempts, 10) || 0) + 1; - const timeout = (2 ** nextAttempt) * oneMinute; // exponential backoff - const nextTryOn = Date.now() + timeout; - retryQueueAdd.push(['ap:retry:queue', nextTryOn, queueId]); - retryQueuedSet.push([`ap:retry:queue:${queueId}`, { - attempts: nextAttempt, - timestamp: nextTryOn, - }]); - } - })); - - await Promise.all([ - db.sortedSetAddBulk(retryQueueAdd), - db.setObjectBulk(retryQueuedSet), - db.sortedSetRemove('ap:retry:queue', queueIdsToRemove), - db.deleteAll(queueIdsToRemove.map(id => `ap:retry:queue:${id}`)), - ]); -} - ActivityPub.record = async ({ id, type, actor }) => { const now = Date.now(); const { hostname } = new URL(actor); diff --git a/src/activitypub/jobs.js b/src/activitypub/jobs.js new file mode 100644 index 0000000000..399e92bb85 --- /dev/null +++ b/src/activitypub/jobs.js @@ -0,0 +1,89 @@ +'use strict'; + +const winston = require('winston'); +const { CronJob } = require('cron'); + +const db = require('../database'); +const meta = require('../meta'); +const activitypub = module.parent.exports; + +const Jobs = module.exports; + +Jobs.start = () => { + activitypub.helpers.log('[activitypub/jobs] Registering jobs.'); + async function tryCronJob(method) { + if (!meta.config.activitypubEnabled) { + return; + } + try { + await method(); + } catch (err) { + winston.error(err.stack); + } + } + new CronJob('0 0 * * *', async () => { + await tryCronJob(async () => { + await activitypub.notes.prune(); + await db.sortedSetsRemoveRangeByScore(['activities:datetime'], '-inf', Date.now() - 604800000); + }); + }, null, true, null, null, false); // change last argument to true for debugging + + new CronJob('*/30 * * * *', async () => { + await tryCronJob(activitypub.actors.prune); + }, null, true, null, null, false); // change last argument to true for debugging + + new CronJob('0 * * * * *', async () => { + await tryCronJob(retryFailedMessages); + }, null, true, null, null, false); +}; + +async function retryFailedMessages() { + const queueIds = await db.getSortedSetRangeByScore('ap:retry:queue', 0, 50, '-inf', Date.now()); + const queuedData = (await db.getObjects(queueIds.map(id => `ap:retry:queue:${id}`))); + + const retryQueueAdd = []; + const retryQueuedSet = []; + const queueIdsToRemove = []; + + const oneMinute = 1000 * 60; + await Promise.all(queuedData.map(async (data, index) => { + const queueId = queueIds[index]; + if (!data) { + queueIdsToRemove.push(queueId); + return; + } + + const { uri, id, type, attempts, payload } = data; + if (!uri || !id || !type || !payload || attempts > 10) { + queueIdsToRemove.push(queueId); + return; + } + let payloadObj; + try { + payloadObj = JSON.parse(payload); + } catch (err) { + queueIdsToRemove.push(queueId); + return; + } + const ok = await sendMessage(uri, id, type, payloadObj); + if (ok) { + queueIdsToRemove.push(queueId); + } else { + const nextAttempt = (parseInt(attempts, 10) || 0) + 1; + const timeout = (2 ** nextAttempt) * oneMinute; // exponential backoff + const nextTryOn = Date.now() + timeout; + retryQueueAdd.push(['ap:retry:queue', nextTryOn, queueId]); + retryQueuedSet.push([`ap:retry:queue:${queueId}`, { + attempts: nextAttempt, + timestamp: nextTryOn, + }]); + } + })); + + await Promise.all([ + db.sortedSetAddBulk(retryQueueAdd), + db.setObjectBulk(retryQueuedSet), + db.sortedSetRemove('ap:retry:queue', queueIdsToRemove), + db.deleteAll(queueIdsToRemove.map(id => `ap:retry:queue:${id}`)), + ]); +} diff --git a/src/start.js b/src/start.js index c4dd925aad..2928ac6bb6 100644 --- a/src/start.js +++ b/src/start.js @@ -39,7 +39,7 @@ start.start = async function () { require('./user').startJobs(); require('./plugins').startJobs(); require('./topics').scheduled.startJobs(); - require('./activitypub').startJobs(); + require('./activitypub').jobs.start(); await db.delete('locks'); }