refactor: move ap jobs to its own file

This commit is contained in:
Julian Lam
2026-01-15 15:12:57 -05:00
parent 62498a3c1b
commit c595edb4c0
3 changed files with 91 additions and 81 deletions

View File

@@ -3,7 +3,6 @@
const nconf = require('nconf');
const winston = require('winston');
const { createHash, createSign, createVerify, getHashes } = require('crypto');
const { CronJob } = require('cron');
const request = require('../request');
const db = require('../database');
@@ -69,34 +68,7 @@ ActivityPub.feps = require('./feps');
ActivityPub.rules = require('./rules');
ActivityPub.relays = require('./relays');
ActivityPub.out = require('./out');
ActivityPub.startJobs = () => {
ActivityPub.helpers.log('[activitypub/jobs] Registering jobs.');
async function tryCronJob(method) {
if (!meta.config.activitypubEnabled) {
return;
}
try {
await method();
} catch (err) {
winston.error(err.stack);
}
}
new CronJob('0 0 * * *', async () => {
await tryCronJob(async () => {
await ActivityPub.notes.prune();
await db.sortedSetsRemoveRangeByScore(['activities:datetime'], '-inf', Date.now() - 604800000);
});
}, null, true, null, null, false); // change last argument to true for debugging
new CronJob('*/30 * * * *', async () => {
await tryCronJob(ActivityPub.actors.prune);
}, null, true, null, null, false); // change last argument to true for debugging
new CronJob('0 * * * * *', async () => {
await tryCronJob(retryFailedMessages);
}, null, true, null, null, false);
};
ActivityPub.jobs = require('./jobs');
ActivityPub.resolveId = async (uid, id) => {
try {
@@ -472,57 +444,6 @@ ActivityPub.send = async (type, id, targets, payload) => {
}).catch(err => winston.error(err.stack));
};
async function retryFailedMessages() {
const queueIds = await db.getSortedSetRangeByScore('ap:retry:queue', 0, 50, '-inf', Date.now());
const queuedData = (await db.getObjects(queueIds.map(id => `ap:retry:queue:${id}`)));
const retryQueueAdd = [];
const retryQueuedSet = [];
const queueIdsToRemove = [];
const oneMinute = 1000 * 60;
await Promise.all(queuedData.map(async (data, index) => {
const queueId = queueIds[index];
if (!data) {
queueIdsToRemove.push(queueId);
return;
}
const { uri, id, type, attempts, payload } = data;
if (!uri || !id || !type || !payload || attempts > 10) {
queueIdsToRemove.push(queueId);
return;
}
let payloadObj;
try {
payloadObj = JSON.parse(payload);
} catch (err) {
queueIdsToRemove.push(queueId);
return;
}
const ok = await sendMessage(uri, id, type, payloadObj);
if (ok) {
queueIdsToRemove.push(queueId);
} else {
const nextAttempt = (parseInt(attempts, 10) || 0) + 1;
const timeout = (2 ** nextAttempt) * oneMinute; // exponential backoff
const nextTryOn = Date.now() + timeout;
retryQueueAdd.push(['ap:retry:queue', nextTryOn, queueId]);
retryQueuedSet.push([`ap:retry:queue:${queueId}`, {
attempts: nextAttempt,
timestamp: nextTryOn,
}]);
}
}));
await Promise.all([
db.sortedSetAddBulk(retryQueueAdd),
db.setObjectBulk(retryQueuedSet),
db.sortedSetRemove('ap:retry:queue', queueIdsToRemove),
db.deleteAll(queueIdsToRemove.map(id => `ap:retry:queue:${id}`)),
]);
}
ActivityPub.record = async ({ id, type, actor }) => {
const now = Date.now();
const { hostname } = new URL(actor);

89
src/activitypub/jobs.js Normal file
View File

@@ -0,0 +1,89 @@
'use strict';
const winston = require('winston');
const { CronJob } = require('cron');
const db = require('../database');
const meta = require('../meta');
const activitypub = module.parent.exports;
const Jobs = module.exports;
Jobs.start = () => {
activitypub.helpers.log('[activitypub/jobs] Registering jobs.');
async function tryCronJob(method) {
if (!meta.config.activitypubEnabled) {
return;
}
try {
await method();
} catch (err) {
winston.error(err.stack);
}
}
new CronJob('0 0 * * *', async () => {
await tryCronJob(async () => {
await activitypub.notes.prune();
await db.sortedSetsRemoveRangeByScore(['activities:datetime'], '-inf', Date.now() - 604800000);
});
}, null, true, null, null, false); // change last argument to true for debugging
new CronJob('*/30 * * * *', async () => {
await tryCronJob(activitypub.actors.prune);
}, null, true, null, null, false); // change last argument to true for debugging
new CronJob('0 * * * * *', async () => {
await tryCronJob(retryFailedMessages);
}, null, true, null, null, false);
};
async function retryFailedMessages() {
const queueIds = await db.getSortedSetRangeByScore('ap:retry:queue', 0, 50, '-inf', Date.now());
const queuedData = (await db.getObjects(queueIds.map(id => `ap:retry:queue:${id}`)));
const retryQueueAdd = [];
const retryQueuedSet = [];
const queueIdsToRemove = [];
const oneMinute = 1000 * 60;
await Promise.all(queuedData.map(async (data, index) => {
const queueId = queueIds[index];
if (!data) {
queueIdsToRemove.push(queueId);
return;
}
const { uri, id, type, attempts, payload } = data;
if (!uri || !id || !type || !payload || attempts > 10) {
queueIdsToRemove.push(queueId);
return;
}
let payloadObj;
try {
payloadObj = JSON.parse(payload);
} catch (err) {
queueIdsToRemove.push(queueId);
return;
}
const ok = await sendMessage(uri, id, type, payloadObj);
if (ok) {
queueIdsToRemove.push(queueId);
} else {
const nextAttempt = (parseInt(attempts, 10) || 0) + 1;
const timeout = (2 ** nextAttempt) * oneMinute; // exponential backoff
const nextTryOn = Date.now() + timeout;
retryQueueAdd.push(['ap:retry:queue', nextTryOn, queueId]);
retryQueuedSet.push([`ap:retry:queue:${queueId}`, {
attempts: nextAttempt,
timestamp: nextTryOn,
}]);
}
}));
await Promise.all([
db.sortedSetAddBulk(retryQueueAdd),
db.setObjectBulk(retryQueuedSet),
db.sortedSetRemove('ap:retry:queue', queueIdsToRemove),
db.deleteAll(queueIdsToRemove.map(id => `ap:retry:queue:${id}`)),
]);
}

View File

@@ -39,7 +39,7 @@ start.start = async function () {
require('./user').startJobs();
require('./plugins').startJobs();
require('./topics').scheduled.startJobs();
require('./activitypub').startJobs();
require('./activitypub').jobs.start();
await db.delete('locks');
}