fix: wrap batch processing behind setImmediate() so that it runs on the next iteration of the event loop

This commit is contained in:
Julian Lam
2026-04-02 12:16:50 -04:00
parent 6371528f2a
commit 531d20af3e

View File

@@ -439,35 +439,37 @@ ActivityPub.send = async (type, id, targets, payload) => {
interval: numCores === 1 ? 500 : 100,
};
const keyData = await ActivityPub.getPrivateKey(type, id);
batch.processArray(inboxes, async (inboxBatch) => {
const retryQueueAdd = [];
const retryQueuedSet = [];
setImmediate(() => {
batch.processArray(inboxes, async (inboxBatch) => {
const retryQueueAdd = [];
const retryQueuedSet = [];
await Promise.all(inboxBatch.map(async (uri) => {
const ok = await ActivityPub._sendMessage(uri, keyData, payload, digest);
if (!ok) {
const queueId = crypto.createHash('sha256').update(`${type}:${id}:${uri}`).digest('hex');
const nextTryOn = Date.now() + oneMinute;
retryQueueAdd.push(['ap:retry:queue', nextTryOn, queueId]);
retryQueuedSet.push([`ap:retry:queue:${queueId}`, {
queueId,
uri,
id,
type,
attempts: 1,
timestamp: nextTryOn,
payload: JSON.stringify(payload),
}]);
await Promise.all(inboxBatch.map(async (uri) => {
const ok = await ActivityPub._sendMessage(uri, keyData, payload, digest);
if (!ok) {
const queueId = crypto.createHash('sha256').update(`${type}:${id}:${uri}`).digest('hex');
const nextTryOn = Date.now() + oneMinute;
retryQueueAdd.push(['ap:retry:queue', nextTryOn, queueId]);
retryQueuedSet.push([`ap:retry:queue:${queueId}`, {
queueId,
uri,
id,
type,
attempts: 1,
timestamp: nextTryOn,
payload: JSON.stringify(payload),
}]);
}
}));
if (retryQueueAdd.length) {
await Promise.all([
db.sortedSetAddBulk(retryQueueAdd),
db.setObjectBulk(retryQueuedSet),
]);
}
}));
if (retryQueueAdd.length) {
await Promise.all([
db.sortedSetAddBulk(retryQueueAdd),
db.setObjectBulk(retryQueuedSet),
]);
}
}, batchSettings).catch(err => winston.error(err.stack));
}, batchSettings).catch(err => winston.error(err.stack));
});
};
ActivityPub.record = {};