mirror of
https://github.com/NodeBB/NodeBB.git
synced 2026-03-03 11:01:20 +01:00
feat: opportunistic backfill, #13895
This commit is contained in:
@@ -5,6 +5,8 @@ const { CronJob } = require('cron');
|
||||
|
||||
const db = require('../database');
|
||||
const meta = require('../meta');
|
||||
const topics = require('../topics');
|
||||
const utils = require('../utils');
|
||||
const activitypub = module.parent.exports;
|
||||
|
||||
const Jobs = module.exports;
|
||||
@@ -34,7 +36,11 @@ Jobs.start = () => {
|
||||
|
||||
new CronJob('0 * * * * *', async () => {
|
||||
await tryCronJob(retryFailedMessages);
|
||||
}, null, true, null, null, false);
|
||||
}, null, true, null, null, false); // change last argument to true for debugging
|
||||
|
||||
new CronJob('15 * * * *', async () => {
|
||||
await tryCronJob(backfill);
|
||||
}, null, true, null, null, false); // change last argument to true for debugging
|
||||
};
|
||||
|
||||
async function retryFailedMessages() {
|
||||
@@ -87,3 +93,22 @@ async function retryFailedMessages() {
|
||||
db.deleteAll(queueIdsToRemove.map(id => `ap:retry:queue:${id}`)),
|
||||
]);
|
||||
}
|
||||
|
||||
async function backfill() {
|
||||
const start = 0;
|
||||
const stop = meta.config.topicsPerPage - 1;
|
||||
const sorted = await topics.getSortedTopics({
|
||||
term: 'day',
|
||||
sort: 'posts',
|
||||
uid: 0,
|
||||
start,
|
||||
stop,
|
||||
});
|
||||
|
||||
// Remote mainPids only
|
||||
const pids = sorted.topics
|
||||
.map(({ mainPid }) => mainPid)
|
||||
.filter(pid => !utils.isNumber(pid));
|
||||
|
||||
await activitypub.notes.backfill(pids);
|
||||
}
|
||||
|
||||
@@ -16,11 +16,19 @@ const user = require('../user');
|
||||
const topics = require('../topics');
|
||||
const posts = require('../posts');
|
||||
const api = require('../api');
|
||||
const ttlCache = require('../cache/ttl');
|
||||
const websockets = require('../socket.io');
|
||||
const utils = require('../utils');
|
||||
|
||||
const activitypub = module.parent.exports;
|
||||
const Notes = module.exports;
|
||||
|
||||
const backfillCache = ttlCache({
|
||||
name: 'ap-backfill-cache',
|
||||
max: 500,
|
||||
ttl: 1000 * 60 * 2, // 2 minutes
|
||||
});
|
||||
|
||||
Notes._normalizeTags = async (tag, cid) => {
|
||||
const systemTags = (meta.config.systemTags || '').split(',');
|
||||
const maxTags = await categories.getCategoryField(cid, 'maxTags');
|
||||
@@ -255,17 +263,31 @@ Notes.assert = async (uid, input, options = { skipChecks: false }) => {
|
||||
}
|
||||
}
|
||||
|
||||
let added = [];
|
||||
await Promise.all(unprocessed.map(async (post) => {
|
||||
const { to, cc } = post._activitypub;
|
||||
|
||||
try {
|
||||
await topics.reply(post);
|
||||
const postData = await topics.reply(post);
|
||||
added.push(postData);
|
||||
await Notes.updateLocalRecipients(post.pid, { to, cc });
|
||||
} catch (e) {
|
||||
activitypub.helpers.log(`[activitypub/notes.assert] Could not add reply (${post.pid}): ${e.message}`);
|
||||
}
|
||||
}));
|
||||
|
||||
if (added.length) {
|
||||
// Because replies are added in parallel, `index` is calculated incorrectly
|
||||
added = added
|
||||
.sort((a, b) => a.timestamp - b.timestamp)
|
||||
.map((post, idx) => {
|
||||
post.index = post.index - idx;
|
||||
return post;
|
||||
})
|
||||
.reverse();
|
||||
websockets.in(`topic_${tid}`).emit('event:new_post', { posts: added });
|
||||
}
|
||||
|
||||
await Notes.syncUserInboxes(tid, uid);
|
||||
|
||||
if (crosspostCid) {
|
||||
@@ -584,6 +606,22 @@ Notes.getCategoryFollowers = async (cid) => {
|
||||
return uids;
|
||||
};
|
||||
|
||||
Notes.backfill = async (pids) => {
|
||||
if (!Array.isArray(pids)) {
|
||||
pids = [pids];
|
||||
}
|
||||
|
||||
return Promise.all(pids.map(async (pid) => {
|
||||
if (backfillCache.has(pid)) {
|
||||
console.log('cache hit, not proactively backfilling');
|
||||
return;
|
||||
}
|
||||
|
||||
await Notes.assert(0, pid, { skipChecks: 1 });
|
||||
backfillCache.set(pid, 1);
|
||||
}));
|
||||
};
|
||||
|
||||
Notes.announce = {};
|
||||
|
||||
Notes.announce.list = async ({ pid, tid }) => {
|
||||
|
||||
@@ -14,6 +14,7 @@ const helpers = require('./helpers');
|
||||
const pagination = require('../pagination');
|
||||
const utils = require('../utils');
|
||||
const analytics = require('../analytics');
|
||||
const activitypub = require('../activitypub');
|
||||
|
||||
const topicsController = module.exports;
|
||||
|
||||
@@ -142,11 +143,18 @@ topicsController.get = async function getTopic(req, res, next) {
|
||||
res.locals.linkTags.push(rel);
|
||||
});
|
||||
|
||||
if (meta.config.activitypubEnabled && postAtIndex) {
|
||||
// Include link header for richer parsing
|
||||
const { pid } = postAtIndex;
|
||||
const href = utils.isNumber(pid) ? `${nconf.get('url')}/post/${pid}` : pid;
|
||||
res.set('Link', `<${href}>; rel="alternate"; type="application/activity+json"`);
|
||||
if (meta.config.activitypubEnabled) {
|
||||
if (postAtIndex) {
|
||||
// Include link header for richer parsing
|
||||
const { pid } = postAtIndex;
|
||||
const href = utils.isNumber(pid) ? `${nconf.get('url')}/post/${pid}` : pid;
|
||||
res.set('Link', `<${href}>; rel="alternate"; type="application/activity+json"`);
|
||||
}
|
||||
|
||||
if (!utils.isNumber(topicData.mainPid)) {
|
||||
// not awaited on purpose so topic loading is not blocked
|
||||
activitypub.notes.backfill(topicData.mainPid);
|
||||
}
|
||||
}
|
||||
|
||||
res.render('topic', topicData);
|
||||
|
||||
Reference in New Issue
Block a user