diff --git a/src/categories/recentreplies.js b/src/categories/recentreplies.js index 8e21647c70..92eed3be56 100644 --- a/src/categories/recentreplies.js +++ b/src/categories/recentreplies.js @@ -191,10 +191,15 @@ module.exports = function (Categories) { const bulkRemove = []; const bulkAdd = []; + const bulkIncr = []; postData.forEach((post) => { bulkRemove.push([`cid:${oldCid}:uid:${post.uid}:pids`, post.pid]); bulkRemove.push([`cid:${oldCid}:uid:${post.uid}:pids:votes`, post.pid]); bulkAdd.push([`cid:${cid}:uid:${post.uid}:pids`, post.timestamp, post.pid]); + bulkIncr.push( + [`uid:${post.uid}:cids`, -1, oldCid], + [`uid:${post.uid}:cids`, 1, cid], + ); if (post.votes > 0 || post.votes < 0) { bulkAdd.push([`cid:${cid}:uid:${post.uid}:pids:votes`, post.votes, post.pid]); } @@ -207,6 +212,7 @@ module.exports = function (Categories) { db.sortedSetAdd(`cid:${cid}:pids`, timestamps, postsToReAdd.map(p => p.pid)), db.sortedSetRemoveBulk(bulkRemove), db.sortedSetAddBulk(bulkAdd), + db.sortedSetIncrByBulk(bulkIncr), ]); }, { batch: 500 }); }; diff --git a/src/controllers/accounts/posts.js b/src/controllers/accounts/posts.js index 9ce048bd28..1535ed0737 100644 --- a/src/controllers/accounts/posts.js +++ b/src/controllers/accounts/posts.js @@ -31,7 +31,8 @@ const templateToData = { noItemsFoundKey: '[[user:has-no-posts]]', crumb: '[[global:posts]]', getSets: async function (callerUid, userData) { - const cids = await categories.getCidsByPrivilege('categories:cid', callerUid, 'topics:read'); + let cids = await db.getSortedSetRangeByScore(`uid:${userData.uid}:cids`, 0, -1, 1, '+inf'); + cids = await privileges.categories.filterCids('topics:read', cids, callerUid); return cids.map(c => `cid:${c}:uid:${userData.uid}:pids`); }, }, @@ -143,7 +144,8 @@ const templateToData = { noItemsFoundKey: '[[user:has-no-topics]]', crumb: '[[global:topics]]', getSets: async function (callerUid, userData) { - const cids = await categories.getCidsByPrivilege('categories:cid', callerUid, 'topics:read'); + let cids = await db.getSortedSetRangeByScore(`uid:${userData.uid}:cids`, 0, -1, 1, '+inf'); + cids = await privileges.categories.filterCids('topics:read', cids, callerUid); return cids.map(c => `cid:${c}:uid:${userData.uid}:tids`); }, }, diff --git a/src/database/helpers.js b/src/database/helpers.js index eb8206f783..d66bfb87a6 100644 --- a/src/database/helpers.js +++ b/src/database/helpers.js @@ -42,3 +42,21 @@ helpers.globToRegex = function (match) { } return _match; }; + +helpers.aggregateIncrByBulk = function (data) { + const buckets = Object.create(null); + + for (const [key, incr, val] of data) { + buckets[key] = buckets[key] || {}; + buckets[key][val] = (buckets[key][val] || 0) + incr; + } + + const result = []; + for (const [key, vals] of Object.entries(buckets)) { + for (const [val, incr] of Object.entries(vals)) { + result.push([key, incr, val]); + } + } + + return result; +}; diff --git a/src/database/mongo/sorted.js b/src/database/mongo/sorted.js index c6f1ee90ce..71c4f4c273 100644 --- a/src/database/mongo/sorted.js +++ b/src/database/mongo/sorted.js @@ -459,16 +459,35 @@ module.exports = function (module) { }; module.sortedSetIncrByBulk = async function (data) { + if (!Array.isArray(data) || !data.length) { + return []; + } + const aggregated = dbHelpers.aggregateIncrByBulk(data); const bulk = module.client.collection('objects').initializeUnorderedBulkOp(); - data.forEach((item) => { + aggregated.forEach((item) => { bulk.find({ _key: item[0], value: helpers.valueToString(item[2]) }) .upsert() .update({ $inc: { score: parseFloat(item[1]) } }); }); - await bulk.execute(); + + try { + await bulk.execute(); + } catch (err) { + // retry failed e11000 operations + if (err.code === 11000 || (err.writeErrors && err.writeErrors.some(e => e.code === 11000))) { + const failedIndices = err.writeErrors.map(e => e.index); + const retryData = failedIndices.map(idx => aggregated[idx]); + await Promise.all(retryData.map( + item => module.sortedSetIncrBy(item[0], item[1], item[2]) + )); + } else { + throw err; + } + } + const result = await module.client.collection('objects').find({ - _key: { $in: _.uniq(data.map(i => i[0])) }, - value: { $in: _.uniq(data.map(i => i[2])) }, + _key: { $in: _.uniq(aggregated.map(i => i[0])) }, + value: { $in: _.uniq(aggregated.map(i => i[2])) }, }, { projection: { _id: 0, _key: 1, value: 1, score: 1 }, }).toArray(); diff --git a/src/database/postgres/sorted.js b/src/database/postgres/sorted.js index 351fe3e059..6332907e43 100644 --- a/src/database/postgres/sorted.js +++ b/src/database/postgres/sorted.js @@ -1,8 +1,10 @@ 'use strict'; module.exports = function (module) { - const helpers = require('./helpers'); const util = require('util'); + + const helpers = require('./helpers'); + const dbHelpers = require('../helpers'); const Cursor = require('pg-cursor'); Cursor.prototype.readAsync = util.promisify(Cursor.prototype.read); const sleep = util.promisify(setTimeout); @@ -547,18 +549,19 @@ RETURNING "score" s`, }; module.sortedSetIncrByBulk = async function (data) { - if (!data.length) { + if (!Array.isArray(data) || !data.length) { return []; } + const aggregated = dbHelpers.aggregateIncrByBulk(data); return await module.transaction(async (client) => { - await helpers.ensureLegacyObjectsType(client, data.map(item => item[0]), 'zset'); + await helpers.ensureLegacyObjectsType(client, aggregated.map(item => item[0]), 'zset'); const values = []; const queryParams = []; let paramIndex = 1; - data.forEach(([key, increment, value]) => { + aggregated.forEach(([key, increment, value]) => { value = helpers.valueToString(value); increment = parseFloat(increment); values.push(key, value, increment); diff --git a/src/database/redis/sorted.js b/src/database/redis/sorted.js index 8433133f15..a29ef75a20 100644 --- a/src/database/redis/sorted.js +++ b/src/database/redis/sorted.js @@ -261,8 +261,12 @@ module.exports = function (module) { }; module.sortedSetIncrByBulk = async function (data) { + if (!Array.isArray(data) || !data.length) { + return []; + } + const aggregated = dbHelpers.aggregateIncrByBulk(data); const multi = module.client.multi(); - data.forEach((item) => { + aggregated.forEach((item) => { multi.zIncrBy(item[0], item[1], String(item[2])); }); const result = await multi.exec(); diff --git a/src/posts/delete.js b/src/posts/delete.js index f6b7aafb37..9235f37111 100644 --- a/src/posts/delete.js +++ b/src/posts/delete.js @@ -96,14 +96,19 @@ module.exports = function (Posts) { async function deleteFromTopicUserNotification(postData) { const bulkRemove = []; + const bulkIncr = []; postData.forEach((p) => { bulkRemove.push([`tid:${p.tid}:posts`, p.pid]); bulkRemove.push([`tid:${p.tid}:posts:votes`, p.pid]); bulkRemove.push([`uid:${p.uid}:posts`, p.pid]); bulkRemove.push([`cid:${p.cid}:uid:${p.uid}:pids`, p.pid]); bulkRemove.push([`cid:${p.cid}:uid:${p.uid}:pids:votes`, p.pid]); + bulkIncr.push([`uid:${p.uid}:cids`, -1, p.cid]); }); - await db.sortedSetRemoveBulk(bulkRemove); + await Promise.all([ + db.sortedSetRemoveBulk(bulkRemove), + db.sortedSetIncrByBulk(bulkIncr), + ]); const localCount = postData.filter(p => utils.isNumber(p.pid)).length; const incrObjectBulk = [['global', { postCount: -localCount }]]; diff --git a/src/posts/user.js b/src/posts/user.js index d096b0fa50..cdb9836bdb 100644 --- a/src/posts/user.js +++ b/src/posts/user.js @@ -150,6 +150,7 @@ module.exports = function (Posts) { const bulkRemove = []; const bulkAdd = []; + const bulkIncr = []; let repChange = 0; const postsByUser = {}; postData.forEach((post, i) => { @@ -164,6 +165,11 @@ module.exports = function (Posts) { if (post.votes > 0 || post.votes < 0) { bulkAdd.push([`cid:${post.cid}:uid:${toUid}:pids:votes`, post.votes, post.pid]); } + + bulkIncr.push( + [`uid:${post.uid}:cids`, -1, post.cid], + [`uid:${toUid}:cids`, 1, post.cid], + ); postsByUser[post.uid] = postsByUser[post.uid] || []; postsByUser[post.uid].push(post); }); @@ -172,6 +178,7 @@ module.exports = function (Posts) { db.setObjectField(pids.map(pid => `post:${pid}`), 'uid', toUid), db.sortedSetRemoveBulk(bulkRemove), db.sortedSetAddBulk(bulkAdd), + db.sortedSetIncrByBulk(bulkIncr), user.incrementUserReputationBy(toUid, repChange), handleMainPidOwnerChange(postData, toUid), updateTopicPosters(postData, toUid), diff --git a/src/topics/fork.js b/src/topics/fork.js index fc8b030f63..22903d112e 100644 --- a/src/topics/fork.js +++ b/src/topics/fork.js @@ -169,6 +169,10 @@ module.exports = function (Topics) { db.sortedSetRemove(removeFrom, postData.pid), db.sortedSetAdd(`cid:${topicData[1].cid}:pids`, postData.timestamp, postData.pid), db.sortedSetAdd(`cid:${topicData[1].cid}:uid:${postData.uid}:pids`, postData.timestamp, postData.pid), + db.sortedSetIncrByBulk([ + [`uid:${postData.uid}:cids`, -1, topicData[0].cid], + [`uid:${postData.uid}:cids`, 1, topicData[1].cid], + ]), ]; if (postData.votes > 0 || postData.votes < 0) { tasks.push(db.sortedSetAdd(`cid:${topicData[1].cid}:uid:${postData.uid}:pids:votes`, postData.votes, postData.pid)); diff --git a/src/upgrades/4.11.0/backfill-user-cids.js b/src/upgrades/4.11.0/backfill-user-cids.js new file mode 100644 index 0000000000..58db2fc194 --- /dev/null +++ b/src/upgrades/4.11.0/backfill-user-cids.js @@ -0,0 +1,38 @@ +'use strict'; + +const db = require('../../database'); +const posts = require('../../posts'); +const batch = require('../../batch'); + +module.exports = { + name: 'Backfill user posted categories', + timestamp: Date.UTC(2026, 2, 20), + method: async function () { + const { progress } = this; + await batch.processSortedSet('posts:pid', async (pids) => { + const postData = await db.getObjectsFields(pids.map(pid => `post:${pid}`), ['uid']); + const cids = await posts.getCidsByPids(pids); + const uidPostCountByCid = Object.create(null); + postData.forEach((post, idx) => { + const cid = cids[idx]; + uidPostCountByCid[post.uid] = uidPostCountByCid[post.uid] || {}; + uidPostCountByCid[post.uid][cid] = (uidPostCountByCid[post.uid][cid] || 0) + 1; + }); + const bulkIncr = []; + Object.keys(uidPostCountByCid).forEach((uid) => { + Object.keys(uidPostCountByCid[uid]).forEach((cid) => { + bulkIncr.push([`uid:${uid}:cids`, uidPostCountByCid[uid][cid], cid]); + }); + }); + + if (bulkIncr.length) { + await db.sortedSetIncrByBulk(bulkIncr); + } + + progress.incr(pids.length); + }, { + batch: 500, + progress, + }); + }, +}; diff --git a/src/user/delete.js b/src/user/delete.js index 5c15c5c660..b13c197f7f 100644 --- a/src/user/delete.js +++ b/src/user/delete.js @@ -126,6 +126,7 @@ module.exports = function (User) { `user:${uid}:usernames`, `user:${uid}:emails`, `uid:${uid}:topics`, `uid:${uid}:posts`, + `uid:${uid}:cids`, `uid:${uid}:chats`, `uid:${uid}:chats:unread`, `uid:${uid}:chat:rooms`, `uid:${uid}:chat:rooms:unread`, diff --git a/src/user/posts.js b/src/user/posts.js index 30ddeaadfc..c8be6f81cd 100644 --- a/src/user/posts.js +++ b/src/user/posts.js @@ -101,6 +101,7 @@ module.exports = function (User) { `uid:${postData.uid}:posts`, `cid:${postData.cid}:uid:${postData.uid}:pids`, ], postData.timestamp, postData.pid); + await db.sortedSetIncrBy(`uid:${postData.uid}:cids`, 1, postData.cid); await User.updatePostCount(postData.uid); }; diff --git a/test/database/sorted.js b/test/database/sorted.js index fde2bafb0b..f1eca291bf 100644 --- a/test/database/sorted.js +++ b/test/database/sorted.js @@ -2,6 +2,7 @@ const assert = require('assert'); const db = require('../mocks/databasemock'); +const utils = require('../../src/utils'); describe('Sorted Set methods', () => { before(async () => { @@ -1070,31 +1071,19 @@ NUMERIC)-- WsPn&query[cid]=-1&parentCid=0&selectedCids[]=-1&privilege=topics:rea }); }); - describe('sortedSetIncrBy()', () => { - it('should create a sorted set with a field set to 1', (done) => { - db.sortedSetIncrBy('sortedIncr', 1, 'field1', function (err, newValue) { - assert.equal(err, null); - assert.equal(arguments.length, 2); - assert.strictEqual(newValue, 1); - db.sortedSetScore('sortedIncr', 'field1', (err, score) => { - assert.equal(err, null); - assert.strictEqual(score, 1); - done(); - }); - }); + describe('sortedSetIncrBy()/sortedSetIncrByBulk()', () => { + it('should create a sorted set with a field set to 1', async () => { + const newValue = await db.sortedSetIncrBy('sortedIncr', 1, 'field1'); + assert.strictEqual(newValue, 1); + const score = await db.sortedSetScore('sortedIncr', 'field1'); + assert.strictEqual(score, 1); }); - it('should increment a field of a sorted set by 5', (done) => { - db.sortedSetIncrBy('sortedIncr', 5, 'field1', function (err, newValue) { - assert.equal(err, null); - assert.equal(arguments.length, 2); - assert.strictEqual(newValue, 6); - db.sortedSetScore('sortedIncr', 'field1', (err, score) => { - assert.equal(err, null); - assert.strictEqual(score, 6); - done(); - }); - }); + it('should increment a field of a sorted set by 5', async () => { + const newValue = await db.sortedSetIncrBy('sortedIncr', 5, 'field1'); + assert.strictEqual(newValue, 6); + const score = await db.sortedSetScore('sortedIncr', 'field1'); + assert.strictEqual(score, 6); }); it('should increment fields of sorted sets with a single call', async () => { @@ -1122,12 +1111,27 @@ NUMERIC)-- WsPn&query[cid]=-1&parentCid=0&selectedCids[]=-1&privilege=topics:rea ); }); + it('should increment the same zset twice', async () => { + const zset = utils.generateUUID(); + const value1 = utils.generateUUID(); + const value2 = utils.generateUUID(); + await db.sortedSetIncrByBulk([ + [zset, 1, value1], + [zset, 1, value2], + ]); + const scores = await Promise.all([ + db.sortedSetScore(zset, value1), + db.sortedSetScore(zset, value2), + ]); + assert.deepStrictEqual(scores, [1, 1]); + }); + it('should increment the same field', async () => { - const data1 = await db.sortedSetIncrByBulk([ + await db.sortedSetIncrByBulk([ ['sortedIncrBulk5', 5, 'value5'], ]); - const data2 = await db.sortedSetIncrByBulk([ + await db.sortedSetIncrByBulk([ ['sortedIncrBulk5', 5, 'value5'], ]); assert.deepStrictEqual( @@ -1137,6 +1141,41 @@ NUMERIC)-- WsPn&query[cid]=-1&parentCid=0&selectedCids[]=-1&privilege=topics:rea ], ); }); + + it('should return empty array', async function () { + const zset = utils.generateUUID(); + const response = await db.sortedSetIncrByBulk(zset, []); + assert(Array.isArray(response)); + assert.strictEqual(response.length, 0); + }); + + it('should aggregate increments to the same key/value pair', async function () { + const zset = utils.generateUUID(); + await db.sortedSetIncrByBulk([ + [zset, 1, 'baz'], + [zset, 1, 'baz'], + [zset, 7, 'baz'], + [zset, 1, 'foo'], + [zset, 3, 'foo'], + [zset, 4, 'foo'], + [zset, 2, 'fizz'], + [zset, 1, 'fizz'], + [zset, -3, 'fizz'], + ]); + const score = await db.sortedSetScores(zset, ['foo', 'baz', 'fizz']); + assert.deepStrictEqual(score, [8, 9, 0]); + }); + + it('should handle parallel increments with same key/value pairs', async function () { + const zset = utils.generateUUID(); + await Promise.all([ + db.sortedSetIncrByBulk([[zset, 1, 'baz']]), + db.sortedSetIncrByBulk([[zset, 1, 'baz']]), + db.sortedSetIncrByBulk([[zset, 1, 'baz']]), + ]); + const score = await db.sortedSetScore(zset, 'baz'); + assert.deepStrictEqual(score, 3); + }); }); diff --git a/test/topics.js b/test/topics.js index f67e0fc40e..767af84901 100644 --- a/test/topics.js +++ b/test/topics.js @@ -1425,10 +1425,13 @@ describe('Topic\'s', () => { const unreadTids = await topics.getUnreadTids({ cid: 0, uid: uid }); await sleep(2000); - const _unreadTids = await topics.getUnreadTids({ cid: 0, uid: uid }); + const [_unreadTids, topicData] = await Promise.all([ + topics.getUnreadTids({ cid: 0, uid: uid }), + topics.getTopicData(result.topicData.tid), + ]); assert( !unreadTids.includes(result.topicData.tid), - JSON.stringify({ unreadTids, _unreadTids, tid: result.topicData.tid }) + JSON.stringify({ unreadTids, _unreadTids, topic: topicData }) ); }); });