mirror of
https://github.com/NodeBB/NodeBB.git
synced 2026-03-03 11:01:20 +01:00
feat: opportunistic backfill, #13895
This commit is contained in:
@@ -5,6 +5,8 @@ const { CronJob } = require('cron');
|
|||||||
|
|
||||||
const db = require('../database');
|
const db = require('../database');
|
||||||
const meta = require('../meta');
|
const meta = require('../meta');
|
||||||
|
const topics = require('../topics');
|
||||||
|
const utils = require('../utils');
|
||||||
const activitypub = module.parent.exports;
|
const activitypub = module.parent.exports;
|
||||||
|
|
||||||
const Jobs = module.exports;
|
const Jobs = module.exports;
|
||||||
@@ -34,7 +36,11 @@ Jobs.start = () => {
|
|||||||
|
|
||||||
new CronJob('0 * * * * *', async () => {
|
new CronJob('0 * * * * *', async () => {
|
||||||
await tryCronJob(retryFailedMessages);
|
await tryCronJob(retryFailedMessages);
|
||||||
}, null, true, null, null, false);
|
}, null, true, null, null, false); // change last argument to true for debugging
|
||||||
|
|
||||||
|
new CronJob('15 * * * *', async () => {
|
||||||
|
await tryCronJob(backfill);
|
||||||
|
}, null, true, null, null, false); // change last argument to true for debugging
|
||||||
};
|
};
|
||||||
|
|
||||||
async function retryFailedMessages() {
|
async function retryFailedMessages() {
|
||||||
@@ -87,3 +93,22 @@ async function retryFailedMessages() {
|
|||||||
db.deleteAll(queueIdsToRemove.map(id => `ap:retry:queue:${id}`)),
|
db.deleteAll(queueIdsToRemove.map(id => `ap:retry:queue:${id}`)),
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function backfill() {
|
||||||
|
const start = 0;
|
||||||
|
const stop = meta.config.topicsPerPage - 1;
|
||||||
|
const sorted = await topics.getSortedTopics({
|
||||||
|
term: 'day',
|
||||||
|
sort: 'posts',
|
||||||
|
uid: 0,
|
||||||
|
start,
|
||||||
|
stop,
|
||||||
|
});
|
||||||
|
|
||||||
|
// Remote mainPids only
|
||||||
|
const pids = sorted.topics
|
||||||
|
.map(({ mainPid }) => mainPid)
|
||||||
|
.filter(pid => !utils.isNumber(pid));
|
||||||
|
|
||||||
|
await activitypub.notes.backfill(pids);
|
||||||
|
}
|
||||||
|
|||||||
@@ -16,11 +16,19 @@ const user = require('../user');
|
|||||||
const topics = require('../topics');
|
const topics = require('../topics');
|
||||||
const posts = require('../posts');
|
const posts = require('../posts');
|
||||||
const api = require('../api');
|
const api = require('../api');
|
||||||
|
const ttlCache = require('../cache/ttl');
|
||||||
|
const websockets = require('../socket.io');
|
||||||
const utils = require('../utils');
|
const utils = require('../utils');
|
||||||
|
|
||||||
const activitypub = module.parent.exports;
|
const activitypub = module.parent.exports;
|
||||||
const Notes = module.exports;
|
const Notes = module.exports;
|
||||||
|
|
||||||
|
const backfillCache = ttlCache({
|
||||||
|
name: 'ap-backfill-cache',
|
||||||
|
max: 500,
|
||||||
|
ttl: 1000 * 60 * 2, // 2 minutes
|
||||||
|
});
|
||||||
|
|
||||||
Notes._normalizeTags = async (tag, cid) => {
|
Notes._normalizeTags = async (tag, cid) => {
|
||||||
const systemTags = (meta.config.systemTags || '').split(',');
|
const systemTags = (meta.config.systemTags || '').split(',');
|
||||||
const maxTags = await categories.getCategoryField(cid, 'maxTags');
|
const maxTags = await categories.getCategoryField(cid, 'maxTags');
|
||||||
@@ -255,17 +263,31 @@ Notes.assert = async (uid, input, options = { skipChecks: false }) => {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let added = [];
|
||||||
await Promise.all(unprocessed.map(async (post) => {
|
await Promise.all(unprocessed.map(async (post) => {
|
||||||
const { to, cc } = post._activitypub;
|
const { to, cc } = post._activitypub;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await topics.reply(post);
|
const postData = await topics.reply(post);
|
||||||
|
added.push(postData);
|
||||||
await Notes.updateLocalRecipients(post.pid, { to, cc });
|
await Notes.updateLocalRecipients(post.pid, { to, cc });
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
activitypub.helpers.log(`[activitypub/notes.assert] Could not add reply (${post.pid}): ${e.message}`);
|
activitypub.helpers.log(`[activitypub/notes.assert] Could not add reply (${post.pid}): ${e.message}`);
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
|
|
||||||
|
if (added.length) {
|
||||||
|
// Because replies are added in parallel, `index` is calculated incorrectly
|
||||||
|
added = added
|
||||||
|
.sort((a, b) => a.timestamp - b.timestamp)
|
||||||
|
.map((post, idx) => {
|
||||||
|
post.index = post.index - idx;
|
||||||
|
return post;
|
||||||
|
})
|
||||||
|
.reverse();
|
||||||
|
websockets.in(`topic_${tid}`).emit('event:new_post', { posts: added });
|
||||||
|
}
|
||||||
|
|
||||||
await Notes.syncUserInboxes(tid, uid);
|
await Notes.syncUserInboxes(tid, uid);
|
||||||
|
|
||||||
if (crosspostCid) {
|
if (crosspostCid) {
|
||||||
@@ -584,6 +606,22 @@ Notes.getCategoryFollowers = async (cid) => {
|
|||||||
return uids;
|
return uids;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Notes.backfill = async (pids) => {
|
||||||
|
if (!Array.isArray(pids)) {
|
||||||
|
pids = [pids];
|
||||||
|
}
|
||||||
|
|
||||||
|
return Promise.all(pids.map(async (pid) => {
|
||||||
|
if (backfillCache.has(pid)) {
|
||||||
|
console.log('cache hit, not proactively backfilling');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await Notes.assert(0, pid, { skipChecks: 1 });
|
||||||
|
backfillCache.set(pid, 1);
|
||||||
|
}));
|
||||||
|
};
|
||||||
|
|
||||||
Notes.announce = {};
|
Notes.announce = {};
|
||||||
|
|
||||||
Notes.announce.list = async ({ pid, tid }) => {
|
Notes.announce.list = async ({ pid, tid }) => {
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ const helpers = require('./helpers');
|
|||||||
const pagination = require('../pagination');
|
const pagination = require('../pagination');
|
||||||
const utils = require('../utils');
|
const utils = require('../utils');
|
||||||
const analytics = require('../analytics');
|
const analytics = require('../analytics');
|
||||||
|
const activitypub = require('../activitypub');
|
||||||
|
|
||||||
const topicsController = module.exports;
|
const topicsController = module.exports;
|
||||||
|
|
||||||
@@ -142,11 +143,18 @@ topicsController.get = async function getTopic(req, res, next) {
|
|||||||
res.locals.linkTags.push(rel);
|
res.locals.linkTags.push(rel);
|
||||||
});
|
});
|
||||||
|
|
||||||
if (meta.config.activitypubEnabled && postAtIndex) {
|
if (meta.config.activitypubEnabled) {
|
||||||
// Include link header for richer parsing
|
if (postAtIndex) {
|
||||||
const { pid } = postAtIndex;
|
// Include link header for richer parsing
|
||||||
const href = utils.isNumber(pid) ? `${nconf.get('url')}/post/${pid}` : pid;
|
const { pid } = postAtIndex;
|
||||||
res.set('Link', `<${href}>; rel="alternate"; type="application/activity+json"`);
|
const href = utils.isNumber(pid) ? `${nconf.get('url')}/post/${pid}` : pid;
|
||||||
|
res.set('Link', `<${href}>; rel="alternate"; type="application/activity+json"`);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!utils.isNumber(topicData.mainPid)) {
|
||||||
|
// not awaited on purpose so topic loading is not blocked
|
||||||
|
activitypub.notes.backfill(topicData.mainPid);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
res.render('topic', topicData);
|
res.render('topic', topicData);
|
||||||
|
|||||||
Reference in New Issue
Block a user