feat: opportunistic backfill, #13895

This commit is contained in:
Julian Lam
2026-01-16 15:12:16 -05:00
parent 4bab9fb446
commit 33c2de9c5b
3 changed files with 78 additions and 7 deletions

View File

@@ -5,6 +5,8 @@ const { CronJob } = require('cron');
const db = require('../database'); const db = require('../database');
const meta = require('../meta'); const meta = require('../meta');
const topics = require('../topics');
const utils = require('../utils');
const activitypub = module.parent.exports; const activitypub = module.parent.exports;
const Jobs = module.exports; const Jobs = module.exports;
@@ -34,7 +36,11 @@ Jobs.start = () => {
new CronJob('0 * * * * *', async () => { new CronJob('0 * * * * *', async () => {
await tryCronJob(retryFailedMessages); await tryCronJob(retryFailedMessages);
}, null, true, null, null, false); }, null, true, null, null, false); // change last argument to true for debugging
new CronJob('15 * * * *', async () => {
await tryCronJob(backfill);
}, null, true, null, null, false); // change last argument to true for debugging
}; };
async function retryFailedMessages() { async function retryFailedMessages() {
@@ -87,3 +93,22 @@ async function retryFailedMessages() {
db.deleteAll(queueIdsToRemove.map(id => `ap:retry:queue:${id}`)), db.deleteAll(queueIdsToRemove.map(id => `ap:retry:queue:${id}`)),
]); ]);
} }
/**
 * Cron task: takes the first page of topics sorted by post count over the
 * past day (as seen by a guest, uid 0) and hands the main post ids that are
 * not numeric to `activitypub.notes.backfill`.
 */
async function backfill() {
	const { topics: firstPage } = await topics.getSortedTopics({
		term: 'day',
		sort: 'posts',
		uid: 0,
		start: 0,
		stop: meta.config.topicsPerPage - 1,
	});

	// Numeric pids are skipped; only the remaining (remote) main pids are backfilled
	const remoteMainPids = [];
	for (const { mainPid } of firstPage) {
		if (!utils.isNumber(mainPid)) {
			remoteMainPids.push(mainPid);
		}
	}

	await activitypub.notes.backfill(remoteMainPids);
}

View File

@@ -16,11 +16,19 @@ const user = require('../user');
const topics = require('../topics'); const topics = require('../topics');
const posts = require('../posts'); const posts = require('../posts');
const api = require('../api'); const api = require('../api');
const ttlCache = require('../cache/ttl');
const websockets = require('../socket.io');
const utils = require('../utils'); const utils = require('../utils');
const activitypub = module.parent.exports; const activitypub = module.parent.exports;
const Notes = module.exports; const Notes = module.exports;
// Short-lived record of ids recently passed through Notes.backfill, consulted
// there so that the same id is not backfilled again within the TTL window.
const backfillCache = ttlCache({
	name: 'ap-backfill-cache',
	ttl: 2 * 60 * 1000, // two minutes, expressed in milliseconds
	max: 500,
});
Notes._normalizeTags = async (tag, cid) => { Notes._normalizeTags = async (tag, cid) => {
const systemTags = (meta.config.systemTags || '').split(','); const systemTags = (meta.config.systemTags || '').split(',');
const maxTags = await categories.getCategoryField(cid, 'maxTags'); const maxTags = await categories.getCategoryField(cid, 'maxTags');
@@ -255,17 +263,31 @@ Notes.assert = async (uid, input, options = { skipChecks: false }) => {
} }
} }
let added = [];
await Promise.all(unprocessed.map(async (post) => { await Promise.all(unprocessed.map(async (post) => {
const { to, cc } = post._activitypub; const { to, cc } = post._activitypub;
try { try {
await topics.reply(post); const postData = await topics.reply(post);
added.push(postData);
await Notes.updateLocalRecipients(post.pid, { to, cc }); await Notes.updateLocalRecipients(post.pid, { to, cc });
} catch (e) { } catch (e) {
activitypub.helpers.log(`[activitypub/notes.assert] Could not add reply (${post.pid}): ${e.message}`); activitypub.helpers.log(`[activitypub/notes.assert] Could not add reply (${post.pid}): ${e.message}`);
} }
})); }));
if (added.length) {
// Because replies are added in parallel, `index` is calculated incorrectly
added = added
.sort((a, b) => a.timestamp - b.timestamp)
.map((post, idx) => {
post.index = post.index - idx;
return post;
})
.reverse();
websockets.in(`topic_${tid}`).emit('event:new_post', { posts: added });
}
await Notes.syncUserInboxes(tid, uid); await Notes.syncUserInboxes(tid, uid);
if (crosspostCid) { if (crosspostCid) {
@@ -584,6 +606,22 @@ Notes.getCategoryFollowers = async (cid) => {
return uids; return uids;
}; };
/**
 * Opportunistically backfills one or more remote objects by running them
 * through `Notes.assert` (as uid 0, with checks skipped).
 *
 * This is best-effort: a failure for one id is logged and never rejects the
 * returned promise, so callers may safely invoke it without awaiting.
 *
 * @param {string|string[]} pids - a single id or an array of ids to backfill
 * @returns {Promise<undefined[]>} resolves once every id has been attempted or skipped
 */
Notes.backfill = async (pids) => {
	const targets = Array.isArray(pids) ? pids : [pids];

	return Promise.all(targets.map(async (pid) => {
		if (backfillCache.has(pid)) {
			return;
		}

		// Mark the id before awaiting, so that concurrent callers (and duplicates
		// within `targets`) do not each start their own backfill while the first
		// one is still in flight. A failed attempt therefore also stays throttled
		// until the cache entry expires.
		backfillCache.set(pid, 1);

		try {
			await Notes.assert(0, pid, { skipChecks: 1 });
		} catch (e) {
			// Swallow-and-log: an un-awaited caller must not produce an unhandled rejection
			activitypub.helpers.log(`[activitypub/notes.backfill] Could not backfill (${pid}): ${e.message}`);
		}
	}));
};
Notes.announce = {}; Notes.announce = {};
Notes.announce.list = async ({ pid, tid }) => { Notes.announce.list = async ({ pid, tid }) => {

View File

@@ -14,6 +14,7 @@ const helpers = require('./helpers');
const pagination = require('../pagination'); const pagination = require('../pagination');
const utils = require('../utils'); const utils = require('../utils');
const analytics = require('../analytics'); const analytics = require('../analytics');
const activitypub = require('../activitypub');
const topicsController = module.exports; const topicsController = module.exports;
@@ -142,11 +143,18 @@ topicsController.get = async function getTopic(req, res, next) {
res.locals.linkTags.push(rel); res.locals.linkTags.push(rel);
}); });
if (meta.config.activitypubEnabled && postAtIndex) { if (meta.config.activitypubEnabled) {
// Include link header for richer parsing if (postAtIndex) {
const { pid } = postAtIndex; // Include link header for richer parsing
const href = utils.isNumber(pid) ? `${nconf.get('url')}/post/${pid}` : pid; const { pid } = postAtIndex;
res.set('Link', `<${href}>; rel="alternate"; type="application/activity+json"`); const href = utils.isNumber(pid) ? `${nconf.get('url')}/post/${pid}` : pid;
res.set('Link', `<${href}>; rel="alternate"; type="application/activity+json"`);
}
if (!utils.isNumber(topicData.mainPid)) {
// not awaited on purpose so topic loading is not blocked
activitypub.notes.backfill(topicData.mainPid);
}
} }
res.render('topic', topicData); res.render('topic', topicData);