diff --git a/src/activitypub/jobs.js b/src/activitypub/jobs.js index 1ba0dd57ce..57c2e51221 100644 --- a/src/activitypub/jobs.js +++ b/src/activitypub/jobs.js @@ -5,6 +5,8 @@ const { CronJob } = require('cron'); const db = require('../database'); const meta = require('../meta'); +const topics = require('../topics'); +const utils = require('../utils'); const activitypub = module.parent.exports; const Jobs = module.exports; @@ -34,7 +36,11 @@ Jobs.start = () => { new CronJob('0 * * * * *', async () => { await tryCronJob(retryFailedMessages); - }, null, true, null, null, false); + }, null, true, null, null, false); // change last argument to true for debugging + + new CronJob('15 * * * *', async () => { + await tryCronJob(backfill); + }, null, true, null, null, false); // change last argument to true for debugging }; async function retryFailedMessages() { @@ -87,3 +93,22 @@ async function retryFailedMessages() { db.deleteAll(queueIdsToRemove.map(id => `ap:retry:queue:${id}`)), ]); } + +async function backfill() { + const start = 0; + const stop = meta.config.topicsPerPage - 1; + const sorted = await topics.getSortedTopics({ + term: 'day', + sort: 'posts', + uid: 0, + start, + stop, + }); + + // Remote mainPids only + const pids = sorted.topics + .map(({ mainPid }) => mainPid) + .filter(pid => !utils.isNumber(pid)); + + await activitypub.notes.backfill(pids); +} diff --git a/src/activitypub/notes.js b/src/activitypub/notes.js index a66adb0dc6..01ead62530 100644 --- a/src/activitypub/notes.js +++ b/src/activitypub/notes.js @@ -16,11 +16,19 @@ const user = require('../user'); const topics = require('../topics'); const posts = require('../posts'); const api = require('../api'); +const ttlCache = require('../cache/ttl'); +const websockets = require('../socket.io'); const utils = require('../utils'); const activitypub = module.parent.exports; const Notes = module.exports; +const backfillCache = ttlCache({ + name: 'ap-backfill-cache', + max: 500, + ttl: 1000 * 60 * 2, // 2 minutes +}); + Notes._normalizeTags = async (tag, cid) => { const systemTags = (meta.config.systemTags || '').split(','); const maxTags = await categories.getCategoryField(cid, 'maxTags'); @@ -255,17 +263,31 @@ Notes.assert = async (uid, input, options = { skipChecks: false }) => { } } + let added = []; await Promise.all(unprocessed.map(async (post) => { const { to, cc } = post._activitypub; try { - await topics.reply(post); + const postData = await topics.reply(post); + added.push(postData); await Notes.updateLocalRecipients(post.pid, { to, cc }); } catch (e) { activitypub.helpers.log(`[activitypub/notes.assert] Could not add reply (${post.pid}): ${e.message}`); } })); + if (added.length) { + // Because replies are added in parallel, `index` is calculated incorrectly + added = added + .sort((a, b) => a.timestamp - b.timestamp) + .map((post, idx) => { + post.index = post.index - idx; + return post; + }) + .reverse(); + websockets.in(`topic_${tid}`).emit('event:new_post', { posts: added }); + } + await Notes.syncUserInboxes(tid, uid); if (crosspostCid) { @@ -584,6 +606,22 @@ Notes.getCategoryFollowers = async (cid) => { return uids; }; +Notes.backfill = async (pids) => { + if (!Array.isArray(pids)) { + pids = [pids]; + } + + return Promise.all(pids.map(async (pid) => { + if (backfillCache.has(pid)) { + console.log('cache hit, not proactively backfilling'); + return; + } + + await Notes.assert(0, pid, { skipChecks: 1 }); + backfillCache.set(pid, 1); + })); +}; + Notes.announce = {}; Notes.announce.list = async ({ pid, tid }) => { diff --git a/src/controllers/topics.js b/src/controllers/topics.js index 08958ce730..74ec15c80f 100644 --- a/src/controllers/topics.js +++ b/src/controllers/topics.js @@ -14,6 +14,7 @@ const helpers = require('./helpers'); const pagination = require('../pagination'); const utils = require('../utils'); const analytics = require('../analytics'); +const activitypub = require('../activitypub'); const topicsController = module.exports; @@ -142,11 +143,18 @@ topicsController.get = async function getTopic(req, res, next) { res.locals.linkTags.push(rel); }); - if (meta.config.activitypubEnabled && postAtIndex) { - // Include link header for richer parsing - const { pid } = postAtIndex; - const href = utils.isNumber(pid) ? `${nconf.get('url')}/post/${pid}` : pid; - res.set('Link', `<${href}>; rel="alternate"; type="application/activity+json"`); + if (meta.config.activitypubEnabled) { + if (postAtIndex) { + // Include link header for richer parsing + const { pid } = postAtIndex; + const href = utils.isNumber(pid) ? `${nconf.get('url')}/post/${pid}` : pid; + res.set('Link', `<${href}>; rel="alternate"; type="application/activity+json"`); + } + + if (!utils.isNumber(topicData.mainPid)) { + // not awaited on purpose so topic loading is not blocked + activitypub.notes.backfill(topicData.mainPid); + } } res.render('topic', topicData);