mirror of
https://github.com/NodeBB/NodeBB.git
synced 2026-05-06 10:17:15 +02:00
feat: track user cids (#14114)
* feat: start tracking which cids a user has posted to, update account pages' topics/posts view to call this new sorted set re: #14113 * feat: upgrade script for #14113 * fix: cids unavailable in getPostsFields, duh * fix: update sortedSetIncrByBulk in mongo/psql to return early on empty data * fix: remove unused lodash require * test: sortedSetIncrBy and sortedSetIncrByBulk tests * test: who needs null checks anyway * fix: sortedSetIncrByBulk null response * test: aggregate zincrbulk data if there are alot of identical key/value pairs they will be combined into a single row * fix: key name * test: fix test name * lint: fix lint issues * test: negative values should work too * fix: add e11000 handler for incrByBulk * refactor: fix variable name * merge tests with existing zset test, remove dupes * test: return topicData for failing test * delete uid:<uid>:cids on user delete --------- Co-authored-by: Barış Soner Uşaklı <barisusakli@gmail.com>
This commit is contained in:
@@ -191,10 +191,15 @@ module.exports = function (Categories) {
|
||||
|
||||
const bulkRemove = [];
|
||||
const bulkAdd = [];
|
||||
const bulkIncr = [];
|
||||
postData.forEach((post) => {
|
||||
bulkRemove.push([`cid:${oldCid}:uid:${post.uid}:pids`, post.pid]);
|
||||
bulkRemove.push([`cid:${oldCid}:uid:${post.uid}:pids:votes`, post.pid]);
|
||||
bulkAdd.push([`cid:${cid}:uid:${post.uid}:pids`, post.timestamp, post.pid]);
|
||||
bulkIncr.push(
|
||||
[`uid:${post.uid}:cids`, -1, oldCid],
|
||||
[`uid:${post.uid}:cids`, 1, cid],
|
||||
);
|
||||
if (post.votes > 0 || post.votes < 0) {
|
||||
bulkAdd.push([`cid:${cid}:uid:${post.uid}:pids:votes`, post.votes, post.pid]);
|
||||
}
|
||||
@@ -207,6 +212,7 @@ module.exports = function (Categories) {
|
||||
db.sortedSetAdd(`cid:${cid}:pids`, timestamps, postsToReAdd.map(p => p.pid)),
|
||||
db.sortedSetRemoveBulk(bulkRemove),
|
||||
db.sortedSetAddBulk(bulkAdd),
|
||||
db.sortedSetIncrByBulk(bulkIncr),
|
||||
]);
|
||||
}, { batch: 500 });
|
||||
};
|
||||
|
||||
@@ -31,7 +31,8 @@ const templateToData = {
|
||||
noItemsFoundKey: '[[user:has-no-posts]]',
|
||||
crumb: '[[global:posts]]',
|
||||
getSets: async function (callerUid, userData) {
|
||||
const cids = await categories.getCidsByPrivilege('categories:cid', callerUid, 'topics:read');
|
||||
let cids = await db.getSortedSetRangeByScore(`uid:${userData.uid}:cids`, 0, -1, 1, '+inf');
|
||||
cids = await privileges.categories.filterCids('topics:read', cids, callerUid);
|
||||
return cids.map(c => `cid:${c}:uid:${userData.uid}:pids`);
|
||||
},
|
||||
},
|
||||
@@ -143,7 +144,8 @@ const templateToData = {
|
||||
noItemsFoundKey: '[[user:has-no-topics]]',
|
||||
crumb: '[[global:topics]]',
|
||||
getSets: async function (callerUid, userData) {
|
||||
const cids = await categories.getCidsByPrivilege('categories:cid', callerUid, 'topics:read');
|
||||
let cids = await db.getSortedSetRangeByScore(`uid:${userData.uid}:cids`, 0, -1, 1, '+inf');
|
||||
cids = await privileges.categories.filterCids('topics:read', cids, callerUid);
|
||||
return cids.map(c => `cid:${c}:uid:${userData.uid}:tids`);
|
||||
},
|
||||
},
|
||||
|
||||
@@ -42,3 +42,21 @@ helpers.globToRegex = function (match) {
|
||||
}
|
||||
return _match;
|
||||
};
|
||||
|
||||
helpers.aggregateIncrByBulk = function (data) {
|
||||
const buckets = Object.create(null);
|
||||
|
||||
for (const [key, incr, val] of data) {
|
||||
buckets[key] = buckets[key] || {};
|
||||
buckets[key][val] = (buckets[key][val] || 0) + incr;
|
||||
}
|
||||
|
||||
const result = [];
|
||||
for (const [key, vals] of Object.entries(buckets)) {
|
||||
for (const [val, incr] of Object.entries(vals)) {
|
||||
result.push([key, incr, val]);
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
};
|
||||
|
||||
@@ -459,16 +459,35 @@ module.exports = function (module) {
|
||||
};
|
||||
|
||||
module.sortedSetIncrByBulk = async function (data) {
|
||||
if (!Array.isArray(data) || !data.length) {
|
||||
return [];
|
||||
}
|
||||
const aggregated = dbHelpers.aggregateIncrByBulk(data);
|
||||
const bulk = module.client.collection('objects').initializeUnorderedBulkOp();
|
||||
data.forEach((item) => {
|
||||
aggregated.forEach((item) => {
|
||||
bulk.find({ _key: item[0], value: helpers.valueToString(item[2]) })
|
||||
.upsert()
|
||||
.update({ $inc: { score: parseFloat(item[1]) } });
|
||||
});
|
||||
await bulk.execute();
|
||||
|
||||
try {
|
||||
await bulk.execute();
|
||||
} catch (err) {
|
||||
// retry failed e11000 operations
|
||||
if (err.code === 11000 || (err.writeErrors && err.writeErrors.some(e => e.code === 11000))) {
|
||||
const failedIndices = err.writeErrors.map(e => e.index);
|
||||
const retryData = failedIndices.map(idx => aggregated[idx]);
|
||||
await Promise.all(retryData.map(
|
||||
item => module.sortedSetIncrBy(item[0], item[1], item[2])
|
||||
));
|
||||
} else {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
const result = await module.client.collection('objects').find({
|
||||
_key: { $in: _.uniq(data.map(i => i[0])) },
|
||||
value: { $in: _.uniq(data.map(i => i[2])) },
|
||||
_key: { $in: _.uniq(aggregated.map(i => i[0])) },
|
||||
value: { $in: _.uniq(aggregated.map(i => i[2])) },
|
||||
}, {
|
||||
projection: { _id: 0, _key: 1, value: 1, score: 1 },
|
||||
}).toArray();
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
'use strict';
|
||||
|
||||
module.exports = function (module) {
|
||||
const helpers = require('./helpers');
|
||||
const util = require('util');
|
||||
|
||||
const helpers = require('./helpers');
|
||||
const dbHelpers = require('../helpers');
|
||||
const Cursor = require('pg-cursor');
|
||||
Cursor.prototype.readAsync = util.promisify(Cursor.prototype.read);
|
||||
const sleep = util.promisify(setTimeout);
|
||||
@@ -547,18 +549,19 @@ RETURNING "score" s`,
|
||||
};
|
||||
|
||||
module.sortedSetIncrByBulk = async function (data) {
|
||||
if (!data.length) {
|
||||
if (!Array.isArray(data) || !data.length) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const aggregated = dbHelpers.aggregateIncrByBulk(data);
|
||||
return await module.transaction(async (client) => {
|
||||
await helpers.ensureLegacyObjectsType(client, data.map(item => item[0]), 'zset');
|
||||
await helpers.ensureLegacyObjectsType(client, aggregated.map(item => item[0]), 'zset');
|
||||
|
||||
const values = [];
|
||||
const queryParams = [];
|
||||
let paramIndex = 1;
|
||||
|
||||
data.forEach(([key, increment, value]) => {
|
||||
aggregated.forEach(([key, increment, value]) => {
|
||||
value = helpers.valueToString(value);
|
||||
increment = parseFloat(increment);
|
||||
values.push(key, value, increment);
|
||||
|
||||
@@ -261,8 +261,12 @@ module.exports = function (module) {
|
||||
};
|
||||
|
||||
module.sortedSetIncrByBulk = async function (data) {
|
||||
if (!Array.isArray(data) || !data.length) {
|
||||
return [];
|
||||
}
|
||||
const aggregated = dbHelpers.aggregateIncrByBulk(data);
|
||||
const multi = module.client.multi();
|
||||
data.forEach((item) => {
|
||||
aggregated.forEach((item) => {
|
||||
multi.zIncrBy(item[0], item[1], String(item[2]));
|
||||
});
|
||||
const result = await multi.exec();
|
||||
|
||||
@@ -96,14 +96,19 @@ module.exports = function (Posts) {
|
||||
|
||||
async function deleteFromTopicUserNotification(postData) {
|
||||
const bulkRemove = [];
|
||||
const bulkIncr = [];
|
||||
postData.forEach((p) => {
|
||||
bulkRemove.push([`tid:${p.tid}:posts`, p.pid]);
|
||||
bulkRemove.push([`tid:${p.tid}:posts:votes`, p.pid]);
|
||||
bulkRemove.push([`uid:${p.uid}:posts`, p.pid]);
|
||||
bulkRemove.push([`cid:${p.cid}:uid:${p.uid}:pids`, p.pid]);
|
||||
bulkRemove.push([`cid:${p.cid}:uid:${p.uid}:pids:votes`, p.pid]);
|
||||
bulkIncr.push([`uid:${p.uid}:cids`, -1, p.cid]);
|
||||
});
|
||||
await db.sortedSetRemoveBulk(bulkRemove);
|
||||
await Promise.all([
|
||||
db.sortedSetRemoveBulk(bulkRemove),
|
||||
db.sortedSetIncrByBulk(bulkIncr),
|
||||
]);
|
||||
|
||||
const localCount = postData.filter(p => utils.isNumber(p.pid)).length;
|
||||
const incrObjectBulk = [['global', { postCount: -localCount }]];
|
||||
|
||||
@@ -150,6 +150,7 @@ module.exports = function (Posts) {
|
||||
|
||||
const bulkRemove = [];
|
||||
const bulkAdd = [];
|
||||
const bulkIncr = [];
|
||||
let repChange = 0;
|
||||
const postsByUser = {};
|
||||
postData.forEach((post, i) => {
|
||||
@@ -164,6 +165,11 @@ module.exports = function (Posts) {
|
||||
if (post.votes > 0 || post.votes < 0) {
|
||||
bulkAdd.push([`cid:${post.cid}:uid:${toUid}:pids:votes`, post.votes, post.pid]);
|
||||
}
|
||||
|
||||
bulkIncr.push(
|
||||
[`uid:${post.uid}:cids`, -1, post.cid],
|
||||
[`uid:${toUid}:cids`, 1, post.cid],
|
||||
);
|
||||
postsByUser[post.uid] = postsByUser[post.uid] || [];
|
||||
postsByUser[post.uid].push(post);
|
||||
});
|
||||
@@ -172,6 +178,7 @@ module.exports = function (Posts) {
|
||||
db.setObjectField(pids.map(pid => `post:${pid}`), 'uid', toUid),
|
||||
db.sortedSetRemoveBulk(bulkRemove),
|
||||
db.sortedSetAddBulk(bulkAdd),
|
||||
db.sortedSetIncrByBulk(bulkIncr),
|
||||
user.incrementUserReputationBy(toUid, repChange),
|
||||
handleMainPidOwnerChange(postData, toUid),
|
||||
updateTopicPosters(postData, toUid),
|
||||
|
||||
@@ -169,6 +169,10 @@ module.exports = function (Topics) {
|
||||
db.sortedSetRemove(removeFrom, postData.pid),
|
||||
db.sortedSetAdd(`cid:${topicData[1].cid}:pids`, postData.timestamp, postData.pid),
|
||||
db.sortedSetAdd(`cid:${topicData[1].cid}:uid:${postData.uid}:pids`, postData.timestamp, postData.pid),
|
||||
db.sortedSetIncrByBulk([
|
||||
[`uid:${postData.uid}:cids`, -1, topicData[0].cid],
|
||||
[`uid:${postData.uid}:cids`, 1, topicData[1].cid],
|
||||
]),
|
||||
];
|
||||
if (postData.votes > 0 || postData.votes < 0) {
|
||||
tasks.push(db.sortedSetAdd(`cid:${topicData[1].cid}:uid:${postData.uid}:pids:votes`, postData.votes, postData.pid));
|
||||
|
||||
38
src/upgrades/4.11.0/backfill-user-cids.js
Normal file
38
src/upgrades/4.11.0/backfill-user-cids.js
Normal file
@@ -0,0 +1,38 @@
|
||||
'use strict';
|
||||
|
||||
const db = require('../../database');
|
||||
const posts = require('../../posts');
|
||||
const batch = require('../../batch');
|
||||
|
||||
module.exports = {
|
||||
name: 'Backfill user posted categories',
|
||||
timestamp: Date.UTC(2026, 2, 20),
|
||||
method: async function () {
|
||||
const { progress } = this;
|
||||
await batch.processSortedSet('posts:pid', async (pids) => {
|
||||
const postData = await db.getObjectsFields(pids.map(pid => `post:${pid}`), ['uid']);
|
||||
const cids = await posts.getCidsByPids(pids);
|
||||
const uidPostCountByCid = Object.create(null);
|
||||
postData.forEach((post, idx) => {
|
||||
const cid = cids[idx];
|
||||
uidPostCountByCid[post.uid] = uidPostCountByCid[post.uid] || {};
|
||||
uidPostCountByCid[post.uid][cid] = (uidPostCountByCid[post.uid][cid] || 0) + 1;
|
||||
});
|
||||
const bulkIncr = [];
|
||||
Object.keys(uidPostCountByCid).forEach((uid) => {
|
||||
Object.keys(uidPostCountByCid[uid]).forEach((cid) => {
|
||||
bulkIncr.push([`uid:${uid}:cids`, uidPostCountByCid[uid][cid], cid]);
|
||||
});
|
||||
});
|
||||
|
||||
if (bulkIncr.length) {
|
||||
await db.sortedSetIncrByBulk(bulkIncr);
|
||||
}
|
||||
|
||||
progress.incr(pids.length);
|
||||
}, {
|
||||
batch: 500,
|
||||
progress,
|
||||
});
|
||||
},
|
||||
};
|
||||
@@ -126,6 +126,7 @@ module.exports = function (User) {
|
||||
`user:${uid}:usernames`,
|
||||
`user:${uid}:emails`,
|
||||
`uid:${uid}:topics`, `uid:${uid}:posts`,
|
||||
`uid:${uid}:cids`,
|
||||
`uid:${uid}:chats`, `uid:${uid}:chats:unread`,
|
||||
`uid:${uid}:chat:rooms`,
|
||||
`uid:${uid}:chat:rooms:unread`,
|
||||
|
||||
@@ -101,6 +101,7 @@ module.exports = function (User) {
|
||||
`uid:${postData.uid}:posts`,
|
||||
`cid:${postData.cid}:uid:${postData.uid}:pids`,
|
||||
], postData.timestamp, postData.pid);
|
||||
await db.sortedSetIncrBy(`uid:${postData.uid}:cids`, 1, postData.cid);
|
||||
await User.updatePostCount(postData.uid);
|
||||
};
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
const assert = require('assert');
|
||||
const db = require('../mocks/databasemock');
|
||||
const utils = require('../../src/utils');
|
||||
|
||||
describe('Sorted Set methods', () => {
|
||||
before(async () => {
|
||||
@@ -1070,31 +1071,19 @@ NUMERIC)-- WsPn&query[cid]=-1&parentCid=0&selectedCids[]=-1&privilege=topics:rea
|
||||
});
|
||||
});
|
||||
|
||||
describe('sortedSetIncrBy()', () => {
|
||||
it('should create a sorted set with a field set to 1', (done) => {
|
||||
db.sortedSetIncrBy('sortedIncr', 1, 'field1', function (err, newValue) {
|
||||
assert.equal(err, null);
|
||||
assert.equal(arguments.length, 2);
|
||||
assert.strictEqual(newValue, 1);
|
||||
db.sortedSetScore('sortedIncr', 'field1', (err, score) => {
|
||||
assert.equal(err, null);
|
||||
assert.strictEqual(score, 1);
|
||||
done();
|
||||
});
|
||||
});
|
||||
describe('sortedSetIncrBy()/sortedSetIncrByBulk()', () => {
|
||||
it('should create a sorted set with a field set to 1', async () => {
|
||||
const newValue = await db.sortedSetIncrBy('sortedIncr', 1, 'field1');
|
||||
assert.strictEqual(newValue, 1);
|
||||
const score = await db.sortedSetScore('sortedIncr', 'field1');
|
||||
assert.strictEqual(score, 1);
|
||||
});
|
||||
|
||||
it('should increment a field of a sorted set by 5', (done) => {
|
||||
db.sortedSetIncrBy('sortedIncr', 5, 'field1', function (err, newValue) {
|
||||
assert.equal(err, null);
|
||||
assert.equal(arguments.length, 2);
|
||||
assert.strictEqual(newValue, 6);
|
||||
db.sortedSetScore('sortedIncr', 'field1', (err, score) => {
|
||||
assert.equal(err, null);
|
||||
assert.strictEqual(score, 6);
|
||||
done();
|
||||
});
|
||||
});
|
||||
it('should increment a field of a sorted set by 5', async () => {
|
||||
const newValue = await db.sortedSetIncrBy('sortedIncr', 5, 'field1');
|
||||
assert.strictEqual(newValue, 6);
|
||||
const score = await db.sortedSetScore('sortedIncr', 'field1');
|
||||
assert.strictEqual(score, 6);
|
||||
});
|
||||
|
||||
it('should increment fields of sorted sets with a single call', async () => {
|
||||
@@ -1122,12 +1111,27 @@ NUMERIC)-- WsPn&query[cid]=-1&parentCid=0&selectedCids[]=-1&privilege=topics:rea
|
||||
);
|
||||
});
|
||||
|
||||
it('should increment the same zset twice', async () => {
|
||||
const zset = utils.generateUUID();
|
||||
const value1 = utils.generateUUID();
|
||||
const value2 = utils.generateUUID();
|
||||
await db.sortedSetIncrByBulk([
|
||||
[zset, 1, value1],
|
||||
[zset, 1, value2],
|
||||
]);
|
||||
const scores = await Promise.all([
|
||||
db.sortedSetScore(zset, value1),
|
||||
db.sortedSetScore(zset, value2),
|
||||
]);
|
||||
assert.deepStrictEqual(scores, [1, 1]);
|
||||
});
|
||||
|
||||
it('should increment the same field', async () => {
|
||||
const data1 = await db.sortedSetIncrByBulk([
|
||||
await db.sortedSetIncrByBulk([
|
||||
['sortedIncrBulk5', 5, 'value5'],
|
||||
]);
|
||||
|
||||
const data2 = await db.sortedSetIncrByBulk([
|
||||
await db.sortedSetIncrByBulk([
|
||||
['sortedIncrBulk5', 5, 'value5'],
|
||||
]);
|
||||
assert.deepStrictEqual(
|
||||
@@ -1137,6 +1141,41 @@ NUMERIC)-- WsPn&query[cid]=-1&parentCid=0&selectedCids[]=-1&privilege=topics:rea
|
||||
],
|
||||
);
|
||||
});
|
||||
|
||||
it('should return empty array', async function () {
|
||||
const zset = utils.generateUUID();
|
||||
const response = await db.sortedSetIncrByBulk(zset, []);
|
||||
assert(Array.isArray(response));
|
||||
assert.strictEqual(response.length, 0);
|
||||
});
|
||||
|
||||
it('should aggregate increments to the same key/value pair', async function () {
|
||||
const zset = utils.generateUUID();
|
||||
await db.sortedSetIncrByBulk([
|
||||
[zset, 1, 'baz'],
|
||||
[zset, 1, 'baz'],
|
||||
[zset, 7, 'baz'],
|
||||
[zset, 1, 'foo'],
|
||||
[zset, 3, 'foo'],
|
||||
[zset, 4, 'foo'],
|
||||
[zset, 2, 'fizz'],
|
||||
[zset, 1, 'fizz'],
|
||||
[zset, -3, 'fizz'],
|
||||
]);
|
||||
const score = await db.sortedSetScores(zset, ['foo', 'baz', 'fizz']);
|
||||
assert.deepStrictEqual(score, [8, 9, 0]);
|
||||
});
|
||||
|
||||
it('should handle parallel increments with same key/value pairs', async function () {
|
||||
const zset = utils.generateUUID();
|
||||
await Promise.all([
|
||||
db.sortedSetIncrByBulk([[zset, 1, 'baz']]),
|
||||
db.sortedSetIncrByBulk([[zset, 1, 'baz']]),
|
||||
db.sortedSetIncrByBulk([[zset, 1, 'baz']]),
|
||||
]);
|
||||
const score = await db.sortedSetScore(zset, 'baz');
|
||||
assert.deepStrictEqual(score, 3);
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
|
||||
@@ -1425,10 +1425,13 @@ describe('Topic\'s', () => {
|
||||
const unreadTids = await topics.getUnreadTids({ cid: 0, uid: uid });
|
||||
|
||||
await sleep(2000);
|
||||
const _unreadTids = await topics.getUnreadTids({ cid: 0, uid: uid });
|
||||
const [_unreadTids, topicData] = await Promise.all([
|
||||
topics.getUnreadTids({ cid: 0, uid: uid }),
|
||||
topics.getTopicData(result.topicData.tid),
|
||||
]);
|
||||
assert(
|
||||
!unreadTids.includes(result.topicData.tid),
|
||||
JSON.stringify({ unreadTids, _unreadTids, tid: result.topicData.tid })
|
||||
JSON.stringify({ unreadTids, _unreadTids, topic: topicData })
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user