refactor: replace old createSign/createVerify methods with more modern sign/verify, called asynchronously, add lru cache to public key fetches so that a received activity does not kick off a network request unnecessarily

This commit is contained in:
Julian Lam
2026-04-02 11:39:33 -04:00
parent be44247339
commit 448a76e1c4
2 changed files with 30 additions and 21 deletions

View File

@@ -2,8 +2,11 @@
const nconf = require('nconf');
const winston = require('winston');
const { createHash, createSign, createVerify, getHashes } = require('crypto');
const { createHash, createVerify, getHashes, sign, verify } = require('crypto');
const { cpus } = require('os');
const { promisify } = require('util');
const signAsync = promisify(sign);
const verifyAsync = promisify(verify);
const request = require('../request');
const db = require('../database');
@@ -14,6 +17,7 @@ const messaging = require('../messaging');
const user = require('../user');
const utils = require('../utils');
const ttl = require('../cache/ttl');
const lru = require('../cache/lru');
const batch = require('../batch');
const analytics = require('../analytics');
const crypto = require('crypto');
@@ -32,6 +36,11 @@ const probeRateLimit = ttl({
name: 'ap-probe-rate-limit-cache',
ttl: 1000 * 3, // 3 seconds
});
const publicKeyCache = lru({
name: 'ap-public-key-cache',
ttl: 1000 * 60 * 60 * 6, // 6 hours
max: 1000,
});
const ActivityPub = module.exports;
@@ -195,14 +204,21 @@ ActivityPub.getPrivateKey = async (type, id) => {
};
ActivityPub.fetchPublicKey = async (uri) => {
const cached = publicKeyCache.get(uri);
if (cached) {
return cached;
}
// Used for retrieving the public key from the passed-in keyId uri
const body = await ActivityPub.get('uid', 0, uri);
if (body.hasOwnProperty('publicKeyPem')) {
// CryptographicKey returned (correct)
publicKeyCache.set(uri, body.publicKeyPem);
return body.publicKeyPem;
} else if (body.hasOwnProperty('publicKey') && body?.publicKey?.publicKeyPem) {
// Actor object returned (less correct)
publicKeyCache.set(uri, body.publicKey.publicKeyPem);
return body.publicKey.publicKeyPem;
}
@@ -228,10 +244,8 @@ ActivityPub.sign = async ({ key, keyId }, url, payload) => {
}
// Sign string using private key
let signature = createSign('sha256');
signature.update(signed_string);
signature.end();
signature = signature.sign(key, 'base64');
let signature = await signAsync('sha256', Buffer.from(signed_string), key);
signature = signature.toString('base64');
// Construct signature header
return {
@@ -295,12 +309,7 @@ ActivityPub.verify = async (req) => {
ActivityPub.helpers.log(`[activitypub/verify] Retrieving pubkey for ${keyId}`);
const publicKeyPem = await ActivityPub.fetchPublicKey(keyId);
const verify = createVerify('sha256');
verify.update(signed_string);
verify.end();
ActivityPub.helpers.log('[activitypub/verify] Attempting signed string verification');
const verified = verify.verify(publicKeyPem, signature, 'base64');
return verified;
return await verifyAsync('sha256', Buffer.from(signed_string), publicKeyPem, Buffer.from(signature, 'base64'));
} catch (e) {
ActivityPub.helpers.log('[activitypub/verify] Failed, key retrieval or verification failure.');
return false;
@@ -367,9 +376,8 @@ ActivityPub.get = async (type, id, uri, options) => {
}
};
ActivityPub._sendMessage = async function (uri, id, type, payload) {
ActivityPub._sendMessage = async function (uri, keyData, payload) {
try {
const keyData = await ActivityPub.getPrivateKey(type, id);
const headers = await ActivityPub.sign(keyData, uri, payload);
const { response, body } = await request.post(uri, {
@@ -427,14 +435,17 @@ ActivityPub.send = async (type, id, targets, payload) => {
const oneMinute = 1000 * 60;
const numCores = cpus().length;
const batchSize = Math.max(8, numCores * 8);
const interval = numCores === 1 ? 500 : 100;
const batchSettings = {
batch: Math.max(8, numCores * 8),
interval: numCores === 1 ? 500 : 100,
};
const keyData = await ActivityPub.getPrivateKey(type, id);
batch.processArray(inboxes, async (inboxBatch) => {
const retryQueueAdd = [];
const retryQueuedSet = [];
await Promise.all(inboxBatch.map(async (uri) => {
const ok = await ActivityPub._sendMessage(uri, id, type, payload);
const ok = await ActivityPub._sendMessage(uri, keyData, payload);
if (!ok) {
const queueId = crypto.createHash('sha256').update(`${type}:${id}:${uri}`).digest('hex');
const nextTryOn = Date.now() + oneMinute;
@@ -457,10 +468,7 @@ ActivityPub.send = async (type, id, targets, payload) => {
db.setObjectBulk(retryQueuedSet),
]);
}
}, {
batch: batchSize,
interval,
}).catch(err => winston.error(err.stack));
}, batchSettings).catch(err => winston.error(err.stack));
};
ActivityPub.record = {};

View File

@@ -92,7 +92,8 @@ async function retryFailedMessages() {
queueIdsToRemove.push(queueId);
return;
}
const ok = await activitypub._sendMessage(uri, id, type, payloadObj);
const keyData = await activitypub.getPrivateKey(type, id); // keyData could be moved higher up (optimization)
const ok = await activitypub._sendMessage(uri, keyData, payloadObj);
if (ok) {
queueIdsToRemove.push(queueId);
} else {