diff --git a/src/activitypub/index.js b/src/activitypub/index.js index 5531196d0b..0e46941bb7 100644 --- a/src/activitypub/index.js +++ b/src/activitypub/index.js @@ -13,6 +13,7 @@ const ttl = require('../cache/ttl'); const lru = require('../cache/lru'); const batch = require('../batch'); const pubsub = require('../pubsub'); +const analytics = require('../analytics'); const requestCache = ttl({ ttl: 1000 * 60 * 5 }); // 5 minutes const ActivityPub = module.exports; @@ -322,3 +323,14 @@ ActivityPub.send = async (type, id, targets, payload) => { }, ); }; + +ActivityPub.record = async ({ id, type, actor }) => { + const now = Date.now(); + const { hostname } = new URL(actor); + + await Promise.all([ + db.sortedSetAdd(`activities:datetime`, now, id), + db.sortedSetAdd('domains:lastSeen', now, hostname), + analytics.increment(['activities', `activities:byType:${type}`, `activities:byHost:${hostname}`]), + ]); +}; diff --git a/src/controllers/activitypub/index.js b/src/controllers/activitypub/index.js index e21e81633a..a700f9ca48 100644 --- a/src/controllers/activitypub/index.js +++ b/src/controllers/activitypub/index.js @@ -122,7 +122,8 @@ Controller.postInbox = async (req, res) => { try { await activitypub.inbox[method](req); - helpers.formatApiResponse(200, res); + await activitypub.record(req.body); + helpers.formatApiResponse(202, res); } catch (e) { helpers.formatApiResponse(500, res, e); } diff --git a/src/middleware/activitypub.js b/src/middleware/activitypub.js index d2bda3a57a..aa8c0c9e26 100644 --- a/src/middleware/activitypub.js +++ b/src/middleware/activitypub.js @@ -32,6 +32,22 @@ middleware.assertS2S = async function (req, res, next) { middleware.validate = async function (req, res, next) { winston.verbose('[middleware/activitypub] Validating incoming payload...'); + + // Sanity-check payload schema + const required = ['id', 'type', 'actor', 'object']; + if (!required.every(prop => req.body.hasOwnProperty(prop))) { + winston.verbose('[middleware/activitypub] Request body missing required properties.'); + return res.sendStatus(400); + } + winston.verbose('[middleware/activitypub] Request body check passed.'); + + // History check + const seen = await db.isSortedSetMember('activities:datetime', req.body.id); + if (seen) { + winston.verbose(`[middleware/activitypub] Activity already seen, ignoring (${req.body.id}).`); + return res.sendStatus(200); + } + // Checks the validity of the incoming payload against the sender and rejects on failure const verified = await activitypub.verify(req); if (!verified) { @@ -40,14 +56,6 @@ middleware.validate = async function (req, res, next) { } winston.verbose('[middleware/activitypub] HTTP signature verification passed.'); - // Sanity-check payload schema - const required = ['type', 'actor', 'object']; - if (!required.every(prop => req.body.hasOwnProperty(prop))) { - winston.verbose('[middleware/activitypub] Request body missing required properties.'); - return res.sendStatus(400); - } - winston.verbose('[middleware/activitypub] Request body check passed.'); - let { actor, object } = req.body; // Actor normalization diff --git a/test/activitypub.js b/test/activitypub.js index 34ec335e66..c22bfe7165 100644 --- a/test/activitypub.js +++ b/test/activitypub.js @@ -361,7 +361,7 @@ describe('ActivityPub integration', () => { }); }); - describe('Serving of local assets to remote clients', () => { + describe('Serving of local assets to remote clients (mocking)', () => { describe('Note', () => { let cid; let uid; diff --git a/test/activitypub/analytics.js b/test/activitypub/analytics.js new file mode 100644 index 0000000000..6d68b6f165 --- /dev/null +++ b/test/activitypub/analytics.js @@ -0,0 +1,154 @@ +'use strict'; + +const nconf = require('nconf'); +const assert = require('assert'); + +const db = require('../../src/database'); +const controllers = require('../../src/controllers'); +const middleware = require('../../src/middleware'); +const activitypub = require('../../src/activitypub'); +const utils = require('../../src/utils'); +const user = require('../../src/user'); +const categories = require('../../src/categories'); +const topics = require('../../src/topics'); +const analytics = require('../../src/analytics'); +const api = require('../../src/api'); + +describe('Analytics', () => { + let cid; + let uid; + let postData; + + before(async () => { + nconf.set('runJobs', 1); + ({ cid } = await categories.create({ name: utils.generateUUID().slice(0, 8) })); + const remoteUser = { + '@context': 'https://www.w3.org/ns/activitystreams', + id: 'https://example.org/user/foobar', + url: 'https://example.org/user/foobar', + + type: 'Person', + name: 'Foo Bar', + preferredUsername: 'foobar', + publicKey: { + id: 'https://example.org/user/foobar#key', + owner: 'https://example.org/user/foobar', + publicKeyPem: 'publickey', + }, + }; + activitypub._cache.set(`0;https://example.org/user/foobar`, remoteUser); + }); + + after(async () => { + nconf.set('runJobs', undefined); + }); + + beforeEach(async () => { + uid = await user.create({ username: utils.generateUUID().slice(0, 8) }); + ({ postData } = await topics.post({ + uid, + cid, + title: utils.generateUUID(), + content: utils.generateUUID(), + })); + }); + + it('should record the incoming activity if successfully processed', async () => { + const id = `https://example.org/activity/${utils.generateUUID()}`; + await controllers.activitypub.postInbox({ + body: { + id, + type: 'Like', + actor: 'https://example.org/user/foobar', + object: { + type: 'Note', + id: `${nconf.get('url')}/post/${postData.pid}`, + }, + }, + }, { sendStatus: () => {} }); + const processed = await db.isSortedSetMember('activities:datetime', id); + + assert(processed); + }); + + it('should not process the activity if received again', async () => { + // Specifically, the controller would update the score, but the request should be caught in middlewares and ignored + const id = `https://example.org/activity/${utils.generateUUID()}`; + await controllers.activitypub.postInbox({ + body: { + id, + type: 'Like', + actor: 'https://example.org/user/foobar', + object: { + type: 'Note', + id: `${nconf.get('url')}/post/${postData.pid}`, + }, + }, + }, { sendStatus: () => {} }); + + await middleware.activitypub.validate({ + body: { + id, + type: 'Like', + actor: 'https://example.org/user/foobar', + object: { + type: 'Note', + id: `${nconf.get('url')}/post/${postData.pid}`, + }, + }, + }, { + sendStatus: (statusCode) => { + assert.strictEqual(statusCode, 200); + }, + }); + }); + + it('should increment the last seen time of that domain', async () => { + const id = `https://example.org/activity/${utils.generateUUID()}`; + const before = await db.sortedSetScore('domains:lastSeen', 'example.org'); + await controllers.activitypub.postInbox({ + body: { + id, + type: 'Like', + actor: 'https://example.org/user/foobar', + object: { + type: 'Note', + id: `${nconf.get('url')}/post/${postData.pid}`, + }, + }, + }, { sendStatus: () => {} }); + + const after = await db.sortedSetScore('domains:lastSeen', 'example.org'); + + assert(before && after); + assert(before < after); + }); + + it('should increment various metrics', async () => { + let counters; + ({ counters } = analytics.peek()); + const before = { ...counters }; + + const id = `https://example.org/activity/${utils.generateUUID()}`; + await controllers.activitypub.postInbox({ + body: { + id, + type: 'Like', + actor: 'https://example.org/user/foobar', + object: { + type: 'Note', + id: `${nconf.get('url')}/post/${postData.pid}`, + }, + }, + }, { sendStatus: () => {} }); + + ({ counters } = analytics.peek()); + const after = { ...counters }; + + const metrics = ['activities', 'activities:byType:Like', 'activities:byHost:example.org']; + metrics.forEach((metric) => { + assert(before[metric] && after[metric]); + assert(before[metric] < after[metric]); + }); + }); +});