From 6c7e6144175dc12621b03a5ab06ff9d89d68c82a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bar=C4=B1=C5=9F=20Soner=20U=C5=9Fakl=C4=B1?= Date: Thu, 26 Oct 2023 10:23:27 -0400 Subject: [PATCH] feat: batch.processSortedSet min/max (#12129) * feat: batch.processSortedSet min/max * test if this works --- src/batch.js | 14 +++++++++++++- src/database/mongo/sorted.js | 12 +++++++++++- src/database/postgres/sorted.js | 6 +++++- test/batch.js | 21 +++++++++++++++++++++ 4 files changed, 50 insertions(+), 3 deletions(-) diff --git a/src/batch.js b/src/batch.js index edc4038055..48c6571cd4 100644 --- a/src/batch.js +++ b/src/batch.js @@ -39,11 +39,23 @@ exports.processSortedSet = async function (setKey, process, options) { if (process && process.constructor && process.constructor.name !== 'AsyncFunction') { process = util.promisify(process); } + const method = options.reverse ? 'getSortedSetRevRange' : 'getSortedSetRange'; + const isByScore = (options.min && options.min !== '-inf') || (options.max && options.max !== '+inf'); + const byScore = isByScore ? 'ByScore' : ''; + const withScores = options.withScores ? 'WithScores' : ''; let iteration = 1; + const getFn = db[`${method}${byScore}${withScores}`]; while (true) { /* eslint-disable no-await-in-loop */ - const ids = await db[`${method}${options.withScores ? 'WithScores' : ''}`](setKey, start, stop); + const ids = await getFn( + setKey, + start, + isByScore ? stop - start + 1 : stop, + options.reverse ? options.max : options.min, + options.reverse ? options.min : options.max, + ); + if (!ids.length || options.doneIf(start, stop, ids)) { return; } diff --git a/src/database/mongo/sorted.js b/src/database/mongo/sorted.js index eec65ef982..0b5036b064 100644 --- a/src/database/mongo/sorted.js +++ b/src/database/mongo/sorted.js @@ -568,7 +568,17 @@ module.exports = function (module) { if (!options.withScores) { project.score = 0; } - const cursor = await module.client.collection('objects').find({ _key: setKey }, { projection: project }) + const query = { _key: setKey }; + if (options.min && options.min !== '-inf') { + query.score = { $gte: parseFloat(options.min) }; + } + if (options.max && options.max !== '+inf') { + query.score = query.score || {}; + query.score.$lte = parseFloat(options.max); + } + + const cursor = await module.client.collection('objects') + .find(query, { projection: project }) .sort({ score: sort }) .batchSize(options.batch); diff --git a/src/database/postgres/sorted.js b/src/database/postgres/sorted.js index 2b707a3a7d..5e3b6a65aa 100644 --- a/src/database/postgres/sorted.js +++ b/src/database/postgres/sorted.js @@ -665,6 +665,8 @@ SELECT z."value", const client = await module.pool.connect(); const batchSize = (options || {}).batch || 100; const sort = options.reverse ? 'DESC' : 'ASC'; + const min = options.min && options.min !== '-inf' ? options.min : null; + const max = options.max && options.max !== '+inf' ? options.max : null; const cursor = client.query(new Cursor(` SELECT z."value", z."score" FROM "legacy_object_live" o @@ -672,7 +674,9 @@ SELECT z."value", z."score" ON o."_key" = z."_key" AND o."type" = z."type" WHERE o."_key" = $1::TEXT - ORDER BY z."score" ${sort}, z."value" ${sort}`, [setKey])); + AND (z."score" >= $2::NUMERIC OR $2::NUMERIC IS NULL) + AND (z."score" <= $3::NUMERIC OR $3::NUMERIC IS NULL) + ORDER BY z."score" ${sort}, z."value" ${sort}`, [setKey, min, max])); if (process && process.constructor && process.constructor.name !== 'AsyncFunction') { process = util.promisify(process); diff --git a/test/batch.js b/test/batch.js index fe11b72883..f7dad73f5a 100644 --- a/test/batch.js +++ b/test/batch.js @@ -77,6 +77,27 @@ describe('batch', () => { assert.strictEqual(total, 490); }); + it('should process sorted set with min/max scores', async () => { + await db.sortedSetAddBulk([ + ['processByScore', 1, 'item1'], + ['processByScore', 2, 'item2'], + ['processByScore', 3, 'item3'], + ['processByScore', 3, 'item4'], + ['processByScore', 4, 'item5'], + ['processByScore', 5, 'item6'], + ]); + const result = []; + await batch.processSortedSet('processByScore', async (items) => { + result.push(...items); + }, { + min: 3, + max: 4, + }); + assert(result.includes('item3')); + assert(result.includes('item4')); + assert(result.includes('item5')); + }); + it('should process array with callbacks', (done) => { let total = 0; batch.processArray(scores, (nums, next) => {