mirror of
https://github.com/NodeBB/NodeBB.git
synced 2026-03-03 11:01:20 +01:00
refactor: move ap jobs to its own file
This commit is contained in:
@@ -3,7 +3,6 @@
|
|||||||
const nconf = require('nconf');
|
const nconf = require('nconf');
|
||||||
const winston = require('winston');
|
const winston = require('winston');
|
||||||
const { createHash, createSign, createVerify, getHashes } = require('crypto');
|
const { createHash, createSign, createVerify, getHashes } = require('crypto');
|
||||||
const { CronJob } = require('cron');
|
|
||||||
|
|
||||||
const request = require('../request');
|
const request = require('../request');
|
||||||
const db = require('../database');
|
const db = require('../database');
|
||||||
@@ -69,34 +68,7 @@ ActivityPub.feps = require('./feps');
|
|||||||
ActivityPub.rules = require('./rules');
|
ActivityPub.rules = require('./rules');
|
||||||
ActivityPub.relays = require('./relays');
|
ActivityPub.relays = require('./relays');
|
||||||
ActivityPub.out = require('./out');
|
ActivityPub.out = require('./out');
|
||||||
|
ActivityPub.jobs = require('./jobs');
|
||||||
ActivityPub.startJobs = () => {
|
|
||||||
ActivityPub.helpers.log('[activitypub/jobs] Registering jobs.');
|
|
||||||
async function tryCronJob(method) {
|
|
||||||
if (!meta.config.activitypubEnabled) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
await method();
|
|
||||||
} catch (err) {
|
|
||||||
winston.error(err.stack);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
new CronJob('0 0 * * *', async () => {
|
|
||||||
await tryCronJob(async () => {
|
|
||||||
await ActivityPub.notes.prune();
|
|
||||||
await db.sortedSetsRemoveRangeByScore(['activities:datetime'], '-inf', Date.now() - 604800000);
|
|
||||||
});
|
|
||||||
}, null, true, null, null, false); // change last argument to true for debugging
|
|
||||||
|
|
||||||
new CronJob('*/30 * * * *', async () => {
|
|
||||||
await tryCronJob(ActivityPub.actors.prune);
|
|
||||||
}, null, true, null, null, false); // change last argument to true for debugging
|
|
||||||
|
|
||||||
new CronJob('0 * * * * *', async () => {
|
|
||||||
await tryCronJob(retryFailedMessages);
|
|
||||||
}, null, true, null, null, false);
|
|
||||||
};
|
|
||||||
|
|
||||||
ActivityPub.resolveId = async (uid, id) => {
|
ActivityPub.resolveId = async (uid, id) => {
|
||||||
try {
|
try {
|
||||||
@@ -472,57 +444,6 @@ ActivityPub.send = async (type, id, targets, payload) => {
|
|||||||
}).catch(err => winston.error(err.stack));
|
}).catch(err => winston.error(err.stack));
|
||||||
};
|
};
|
||||||
|
|
||||||
async function retryFailedMessages() {
|
|
||||||
const queueIds = await db.getSortedSetRangeByScore('ap:retry:queue', 0, 50, '-inf', Date.now());
|
|
||||||
const queuedData = (await db.getObjects(queueIds.map(id => `ap:retry:queue:${id}`)));
|
|
||||||
|
|
||||||
const retryQueueAdd = [];
|
|
||||||
const retryQueuedSet = [];
|
|
||||||
const queueIdsToRemove = [];
|
|
||||||
|
|
||||||
const oneMinute = 1000 * 60;
|
|
||||||
await Promise.all(queuedData.map(async (data, index) => {
|
|
||||||
const queueId = queueIds[index];
|
|
||||||
if (!data) {
|
|
||||||
queueIdsToRemove.push(queueId);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const { uri, id, type, attempts, payload } = data;
|
|
||||||
if (!uri || !id || !type || !payload || attempts > 10) {
|
|
||||||
queueIdsToRemove.push(queueId);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
let payloadObj;
|
|
||||||
try {
|
|
||||||
payloadObj = JSON.parse(payload);
|
|
||||||
} catch (err) {
|
|
||||||
queueIdsToRemove.push(queueId);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
const ok = await sendMessage(uri, id, type, payloadObj);
|
|
||||||
if (ok) {
|
|
||||||
queueIdsToRemove.push(queueId);
|
|
||||||
} else {
|
|
||||||
const nextAttempt = (parseInt(attempts, 10) || 0) + 1;
|
|
||||||
const timeout = (2 ** nextAttempt) * oneMinute; // exponential backoff
|
|
||||||
const nextTryOn = Date.now() + timeout;
|
|
||||||
retryQueueAdd.push(['ap:retry:queue', nextTryOn, queueId]);
|
|
||||||
retryQueuedSet.push([`ap:retry:queue:${queueId}`, {
|
|
||||||
attempts: nextAttempt,
|
|
||||||
timestamp: nextTryOn,
|
|
||||||
}]);
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
|
|
||||||
await Promise.all([
|
|
||||||
db.sortedSetAddBulk(retryQueueAdd),
|
|
||||||
db.setObjectBulk(retryQueuedSet),
|
|
||||||
db.sortedSetRemove('ap:retry:queue', queueIdsToRemove),
|
|
||||||
db.deleteAll(queueIdsToRemove.map(id => `ap:retry:queue:${id}`)),
|
|
||||||
]);
|
|
||||||
}
|
|
||||||
|
|
||||||
ActivityPub.record = async ({ id, type, actor }) => {
|
ActivityPub.record = async ({ id, type, actor }) => {
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
const { hostname } = new URL(actor);
|
const { hostname } = new URL(actor);
|
||||||
|
|||||||
89
src/activitypub/jobs.js
Normal file
89
src/activitypub/jobs.js
Normal file
@@ -0,0 +1,89 @@
|
|||||||
|
'use strict';
|
||||||
|
|
||||||
|
const winston = require('winston');
|
||||||
|
const { CronJob } = require('cron');
|
||||||
|
|
||||||
|
const db = require('../database');
|
||||||
|
const meta = require('../meta');
|
||||||
|
const activitypub = module.parent.exports;
|
||||||
|
|
||||||
|
const Jobs = module.exports;
|
||||||
|
|
||||||
|
Jobs.start = () => {
|
||||||
|
activitypub.helpers.log('[activitypub/jobs] Registering jobs.');
|
||||||
|
async function tryCronJob(method) {
|
||||||
|
if (!meta.config.activitypubEnabled) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
await method();
|
||||||
|
} catch (err) {
|
||||||
|
winston.error(err.stack);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
new CronJob('0 0 * * *', async () => {
|
||||||
|
await tryCronJob(async () => {
|
||||||
|
await activitypub.notes.prune();
|
||||||
|
await db.sortedSetsRemoveRangeByScore(['activities:datetime'], '-inf', Date.now() - 604800000);
|
||||||
|
});
|
||||||
|
}, null, true, null, null, false); // change last argument to true for debugging
|
||||||
|
|
||||||
|
new CronJob('*/30 * * * *', async () => {
|
||||||
|
await tryCronJob(activitypub.actors.prune);
|
||||||
|
}, null, true, null, null, false); // change last argument to true for debugging
|
||||||
|
|
||||||
|
new CronJob('0 * * * * *', async () => {
|
||||||
|
await tryCronJob(retryFailedMessages);
|
||||||
|
}, null, true, null, null, false);
|
||||||
|
};
|
||||||
|
|
||||||
|
async function retryFailedMessages() {
|
||||||
|
const queueIds = await db.getSortedSetRangeByScore('ap:retry:queue', 0, 50, '-inf', Date.now());
|
||||||
|
const queuedData = (await db.getObjects(queueIds.map(id => `ap:retry:queue:${id}`)));
|
||||||
|
|
||||||
|
const retryQueueAdd = [];
|
||||||
|
const retryQueuedSet = [];
|
||||||
|
const queueIdsToRemove = [];
|
||||||
|
|
||||||
|
const oneMinute = 1000 * 60;
|
||||||
|
await Promise.all(queuedData.map(async (data, index) => {
|
||||||
|
const queueId = queueIds[index];
|
||||||
|
if (!data) {
|
||||||
|
queueIdsToRemove.push(queueId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const { uri, id, type, attempts, payload } = data;
|
||||||
|
if (!uri || !id || !type || !payload || attempts > 10) {
|
||||||
|
queueIdsToRemove.push(queueId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let payloadObj;
|
||||||
|
try {
|
||||||
|
payloadObj = JSON.parse(payload);
|
||||||
|
} catch (err) {
|
||||||
|
queueIdsToRemove.push(queueId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const ok = await sendMessage(uri, id, type, payloadObj);
|
||||||
|
if (ok) {
|
||||||
|
queueIdsToRemove.push(queueId);
|
||||||
|
} else {
|
||||||
|
const nextAttempt = (parseInt(attempts, 10) || 0) + 1;
|
||||||
|
const timeout = (2 ** nextAttempt) * oneMinute; // exponential backoff
|
||||||
|
const nextTryOn = Date.now() + timeout;
|
||||||
|
retryQueueAdd.push(['ap:retry:queue', nextTryOn, queueId]);
|
||||||
|
retryQueuedSet.push([`ap:retry:queue:${queueId}`, {
|
||||||
|
attempts: nextAttempt,
|
||||||
|
timestamp: nextTryOn,
|
||||||
|
}]);
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
await Promise.all([
|
||||||
|
db.sortedSetAddBulk(retryQueueAdd),
|
||||||
|
db.setObjectBulk(retryQueuedSet),
|
||||||
|
db.sortedSetRemove('ap:retry:queue', queueIdsToRemove),
|
||||||
|
db.deleteAll(queueIdsToRemove.map(id => `ap:retry:queue:${id}`)),
|
||||||
|
]);
|
||||||
|
}
|
||||||
@@ -39,7 +39,7 @@ start.start = async function () {
|
|||||||
require('./user').startJobs();
|
require('./user').startJobs();
|
||||||
require('./plugins').startJobs();
|
require('./plugins').startJobs();
|
||||||
require('./topics').scheduled.startJobs();
|
require('./topics').scheduled.startJobs();
|
||||||
require('./activitypub').startJobs();
|
require('./activitypub').jobs.start();
|
||||||
await db.delete('locks');
|
await db.delete('locks');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user