diff --git a/install/package.json b/install/package.json index 607b0352df..cd8659d2d6 100644 --- a/install/package.json +++ b/install/package.json @@ -62,6 +62,7 @@ "connect-redis": "9.0.0", "cookie-parser": "1.4.7", "cron": "4.4.0", + "cronstrue": "3.13.0", "cropperjs": "1.6.2", "csrf-sync": "4.2.1", "daemon": "1.1.0", diff --git a/public/language/en-GB/admin/advanced/jobs.json b/public/language/en-GB/admin/advanced/jobs.json new file mode 100644 index 0000000000..764b7d2b9f --- /dev/null +++ b/public/language/en-GB/admin/advanced/jobs.json @@ -0,0 +1,8 @@ +{ + "jobs": "Jobs", + "job-name": "Job Name", + "schedule": "Schedule", + "next-run": "Next Run", + "last-duration": "Last Duration", + "running": "Running" +} \ No newline at end of file diff --git a/public/language/en-GB/admin/menu.json b/public/language/en-GB/admin/menu.json index 9a2566141e..d55f62045e 100644 --- a/public/language/en-GB/admin/menu.json +++ b/public/language/en-GB/admin/menu.json @@ -78,6 +78,7 @@ "advanced/logs": "Logs", "advanced/errors": "Errors", "advanced/cache": "Cache", + "advanced/jobs": "Jobs", "development/logger": "Logger", "development/info": "Info", diff --git a/public/openapi/read.yaml b/public/openapi/read.yaml index eec723b5c9..94ffd317af 100644 --- a/public/openapi/read.yaml +++ b/public/openapi/read.yaml @@ -186,6 +186,8 @@ paths: $ref: 'read/admin/advanced/cache.yaml' /api/admin/advanced/cache/dump: $ref: 'read/admin/advanced/cache/dump.yaml' + /api/admin/advanced/jobs: + $ref: 'read/admin/advanced/jobs.yaml' /api/admin/development/logger: $ref: 'read/admin/development/logger.yaml' /api/admin/development/info: diff --git a/public/openapi/read/admin/advanced/jobs.yaml b/public/openapi/read/admin/advanced/jobs.yaml new file mode 100644 index 0000000000..5e423ea8e5 --- /dev/null +++ b/public/openapi/read/admin/advanced/jobs.yaml @@ -0,0 +1,35 @@ +get: + tags: + - admin + summary: Get cron job info + responses: + "200": + description: "" + content: + application/json: + schema: + allOf: + - type: object + properties: + jobs: + type: array + items: + type: object + properties: + name: + type: string + cronTime: + type: string + cronTimeHuman: + type: string + nextRun: + type: integer + nextRunISO: + type: integer + duration: + type: integer + durationReadable: + type: string + running: + type: boolean + - $ref: ../../../components/schemas/CommonProps.yaml#/CommonProps \ No newline at end of file diff --git a/src/activitypub/jobs.js b/src/activitypub/jobs.js index 57c2e51221..a0a540e66f 100644 --- a/src/activitypub/jobs.js +++ b/src/activitypub/jobs.js @@ -1,46 +1,55 @@ 'use strict'; -const winston = require('winston'); -const { CronJob } = require('cron'); - const db = require('../database'); const meta = require('../meta'); const topics = require('../topics'); const utils = require('../utils'); +const cron = require('../cron'); + const activitypub = module.parent.exports; const Jobs = module.exports; -Jobs.start = () => { +Jobs.start = async () => { activitypub.helpers.log('[activitypub/jobs] Registering jobs.'); async function tryCronJob(method) { - if (!meta.config.activitypubEnabled) { - return; - } - try { + if (meta.config.activityPubEnabled) { await method(); - } catch (err) { - winston.error(err.stack); } } - new CronJob('0 0 * * *', async () => { - await tryCronJob(async () => { - await activitypub.notes.prune(); - await db.sortedSetsRemoveRangeByScore(['activities:datetime'], '-inf', Date.now() - 604800000); - }); - }, null, true, null, null, false); // change last argument to true for debugging - new CronJob('*/30 * * * *', async () => { - await tryCronJob(activitypub.actors.prune); - }, null, true, null, null, false); // change last argument to true for debugging + await cron.addJob({ + name: 'ap:notes:prune', + cronTime: '0 0 * * *', + runOnInit: false, + onTick: async () => { + await tryCronJob(async () => { + await activitypub.notes.prune(); + await db.sortedSetsRemoveRangeByScore(['activities:datetime'], '-inf', Date.now() - 604800000); + }); + }, + }); - new CronJob('0 * * * * *', async () => { - await tryCronJob(retryFailedMessages); - }, null, true, null, null, false); // change last argument to true for debugging + await cron.addJob({ + name: 'ap:actors:prune', + cronTime: '*/30 * * * *', + runOnInit: false, + onTick: async () => await tryCronJob(activitypub.actors.prune), + }); - new CronJob('15 * * * *', async () => { - await tryCronJob(backfill); - }, null, true, null, null, false); // change last argument to true for debugging + await cron.addJob({ + name: 'ap:retry:send', + cronTime: '0 * * * * *', + runOnInit: false, + onTick: async () => await tryCronJob(retryFailedMessages), + }); + + await cron.addJob({ + name: 'ap:backfill', + cronTime: '15 * * * *', + runOnInit: false, + onTick: async () => await tryCronJob(backfill), + }); }; async function retryFailedMessages() { diff --git a/src/analytics.js b/src/analytics.js index 773f7088af..e054e2e733 100644 --- a/src/analytics.js +++ b/src/analytics.js @@ -1,6 +1,5 @@ 'use strict'; -const cronJob = require('cron').CronJob; const winston = require('winston'); const nconf = require('nconf'); const util = require('util'); @@ -12,6 +11,7 @@ const db = require('./database'); const utils = require('./utils'); const plugins = require('./plugins'); const pubsub = require('./pubsub'); +const cron = require('./cron'); const Analytics = module.exports; @@ -32,19 +32,28 @@ const runJobs = nconf.get('runJobs'); Analytics.pause = false; Analytics.init = async function () { - new cronJob('*/10 * * * * *', (async () => { - if (Analytics.pause) return; - publishLocalAnalytics(); - if (runJobs) { - await sleep(2000); - await Analytics.writeData(); - } - }), null, true); + await cron.addJob({ + name: 'analytics:publish', + cronTime: '*/10 * * * * *', + runOnAllNodes: true, + onTick: async () => { + if (Analytics.pause) return; + publishLocalAnalytics(); + if (runJobs) { + await sleep(2000); + await Analytics.writeData(); + } + }, + }); if (runJobs) { - new cronJob('*/30 * * * *', (async () => { - await db.sortedSetsRemoveRangeByScore(['ip:recent'], '-inf', Date.now() - 172800000); - }), null, true); + await cron.addJob({ + name: 'prune:ip:recent', + cronTime: '*/30 * * * *', + onTick: async () => { + await db.sortedSetsRemoveRangeByScore(['ip:recent'], '-inf', Date.now() - 172800000); + }, + }); } if (runJobs) { diff --git a/src/controllers/admin.js b/src/controllers/admin.js index bdb8ca6ba1..00988e3906 100644 --- a/src/controllers/admin.js +++ b/src/controllers/admin.js @@ -25,6 +25,7 @@ const adminController = { errors: require('./admin/errors'), database: require('./admin/database'), cache: require('./admin/cache'), + jobs: require('./admin/jobs'), plugins: require('./admin/plugins'), settings: require('./admin/settings'), logger: require('./admin/logger'), diff --git a/src/controllers/admin/jobs.js b/src/controllers/admin/jobs.js new file mode 100644 index 0000000000..f59b8f45b0 --- /dev/null +++ b/src/controllers/admin/jobs.js @@ -0,0 +1,12 @@ +'use strict'; + +const jobsController = module.exports; + +const cron = require('../../cron'); + +jobsController.get = async function (req, res) { + const jobs = await cron.getJobs(); + + res.render('admin/advanced/jobs', { jobs }); +}; + diff --git a/src/cron.js b/src/cron.js new file mode 100644 index 0000000000..2f59adf1af --- /dev/null +++ b/src/cron.js @@ -0,0 +1,98 @@ +'use strict'; + +const nconf = require('nconf'); +const { CronJob } = require('cron'); +const cronstrue = require('cronstrue'); +const winston = require('winston'); + +const db = require('./database'); +const utils = require('./utils'); + +const jobs = Object.create(null); + +exports.deleteJobs = async function () { + const jobs = await db.getSortedSetRange('cronJobs', 0, -1); + await db.deleteAll(jobs.map(name => `cronJob:${name}`)); + await db.delete('cronJobs'); +}; + +exports.addJob = async function (options) { + const { + name, + cronTime, + onTick, + onComplete = null, + start = true, + runOnInit = false, + runOnAllNodes = false, + } = options; + + const isJobEnabled = nconf.get('runJobs'); + + if (!isJobEnabled && !runOnAllNodes) { + return; + } + + if (!name || !cronTime || typeof onTick !== 'function') { + throw new Error('[cron] Invalid options'); + } + if (Object.hasOwn(jobs, name)) { + throw new Error('[cron] Job with that name already exists'); + } + + const job = new CronJob(cronTime, async function () { + const start = Date.now(); + try { + await db.setObjectField(`cronJob:${name}`, 'running', 1); + await onTick(); + await db.deleteObjectField(`cronJob:${name}`, 'lastError'); + } catch (err) { + winston.error(`[cron] ${err.stack}`); + await db.setObjectField(`cronJob:${name}`, 'lastError', err.stack); + } finally { + await db.setObject(`cronJob:${name}`, { + running: 0, + duration: Date.now() - start, + nextRun: job.nextDate().toMillis(), + }); + } + }, onComplete, start, null, null, runOnInit); + + jobs[name] = job; + + await db.sortedSetAdd('cronJobs', Date.now(), name); + await db.setObject(`cronJob:${name}`, { + name, + cronTime, + cronTimeHuman: cronstrue.toString(cronTime), + nextRun: job.nextDate().toMillis(), + running: runOnInit ? 1 : 0, + }); + winston.verbose(`[cron/jobs] Registered job: ${name} (${cronTime})`); + return job; +}; + +exports.getJobs = async function () { + const jobNames = await db.getSortedSetRange('cronJobs', 0, -1); + const jobs = await db.getObjects(jobNames.map(name => `cronJob:${name}`)); + jobs.forEach((job) => { + if (job) { + job.running = parseInt(job.running, 10) === 1; + job.duration = job.duration || 0; + job.durationReadable = formatDuration(job.duration); + job.nextRunISO = utils.toISOString(job.nextRun); + } + }); + jobs.sort((a, b) => b.cronTimeHuman.localeCompare(a.cronTimeHuman)); + return jobs; +}; + +function formatDuration(ms) { + const totalSeconds = Math.floor(ms / 1000); + const minutes = Math.floor(totalSeconds / 60); + const seconds = totalSeconds % 60; + if (minutes > 0) { + return `${minutes}m${String(seconds).padStart(2, '0')}s`; + } + return `${seconds}s`; +} diff --git a/src/meta/errors.js b/src/meta/errors.js index d1e11d67d8..e107f336dd 100644 --- a/src/meta/errors.js +++ b/src/meta/errors.js @@ -3,13 +3,13 @@ const nconf = require('nconf'); const winston = require('winston'); const validator = require('validator'); -const cronJob = require('cron').CronJob; const { setTimeout } = require('timers/promises'); const db = require('../database'); const analytics = require('../analytics'); const pubsub = require('../pubsub'); const utils = require('../utils'); +const cron = require('../cron'); const Errors = module.exports; @@ -19,13 +19,18 @@ let counters = {}; let total = {}; Errors.init = async function () { - new cronJob('0 * * * * *', async () => { - publishLocalErrors(); - if (runJobs) { - await setTimeout(2000); - await Errors.writeData(); - } - }, null, true); + await cron.addJob({ + name: 'errors:publish', + cronTime: '0 * * * * *', + runOnAllNodes: true, + onTick: async () => { + publishLocalErrors(); + if (runJobs) { + await setTimeout(2000); + await Errors.writeData(); + } + }, + }); if (runJobs) { pubsub.on('errors:publish', (data) => { diff --git a/src/notifications.js b/src/notifications.js index e52be01609..afe3b675a8 100644 --- a/src/notifications.js +++ b/src/notifications.js @@ -3,7 +3,6 @@ const async = require('async'); const winston = require('winston'); -const cron = require('cron').CronJob; const nconf = require('nconf'); const _ = require('lodash'); @@ -18,6 +17,7 @@ const plugins = require('./plugins'); const utils = require('./utils'); const emailer = require('./emailer'); const ttlCache = require('./cache/ttl'); +const cron = require('./cron'); const Notifications = module.exports; @@ -73,9 +73,13 @@ Notifications.getAllNotificationTypes = async function () { return results.types.concat(results.privilegedTypes); }; -Notifications.startJobs = function () { +Notifications.startJobs = async function () { winston.verbose('[notifications.init] Registering jobs.'); - new cron('*/30 * * * *', Notifications.prune, null, true); + await cron.addJob({ + name: 'notifications:prune', + cronTime: '*/30 * * * *', + onTick: Notifications.prune, + }); }; Notifications.get = async function (nid) { diff --git a/src/plugins/usage.js b/src/plugins/usage.js index f36370b50f..cc9e2433de 100644 --- a/src/plugins/usage.js +++ b/src/plugins/usage.js @@ -3,18 +3,20 @@ const nconf = require('nconf'); const winston = require('winston'); const crypto = require('crypto'); -const cronJob = require('cron').CronJob; const request = require('../request'); +const cron = require('../cron'); const pkg = require('../../package.json'); const meta = require('../meta'); module.exports = function (Plugins) { - Plugins.startJobs = function () { - new cronJob('0 0 0 * * *', (async () => { - await Plugins.submitUsageData(); - }), null, true); + Plugins.startJobs = async function () { + await cron.addJob({ + name: 'plugins:submitUsageData', + cronTime: '0 0 0 * * *', + onTick: Plugins.submitUsageData, + }); }; Plugins.submitUsageData = async function () { diff --git a/src/posts/uploads.js b/src/posts/uploads.js index 372c30ca1e..84b9ce283b 100644 --- a/src/posts/uploads.js +++ b/src/posts/uploads.js @@ -7,7 +7,6 @@ const path = require('path'); const winston = require('winston'); const mime = require('mime'); const validator = require('validator'); -const cronJob = require('cron').CronJob; const chalk = require('chalk'); const db = require('../database'); @@ -16,6 +15,7 @@ const user = require('../user'); const topics = require('../topics'); const file = require('../file'); const meta = require('../meta'); +const cron = require('../cron'); module.exports = function (Posts) { Posts.uploads = {}; @@ -30,18 +30,26 @@ module.exports = function (Posts) { return fullPath.startsWith(pathPrefix) && await file.exists(fullPath) ? filePath : false; }))).filter(Boolean); - const runJobs = nconf.get('runJobs'); - if (runJobs) { - new cronJob('0 2 * * 0', async () => { - const orphans = await Posts.uploads.cleanOrphans(); - if (orphans.length) { - winston.info(`[posts/uploads] Deleting ${orphans.length} orphaned uploads...`); - orphans.forEach((relPath) => { - process.stdout.write(`${chalk.red(' - ')} ${relPath}`); - }); - } - }, null, true); - } + Posts.uploads.startJobs = async function () { + const runJobs = nconf.get('runJobs'); + if (!runJobs) { + return; + } + + await cron.addJob({ + name: 'posts:uploads:cleanupOrphans', + cronTime: '0 2 * * 0', + onTick: async () => { + const orphans = await Posts.uploads.cleanOrphans(); + if (orphans.length) { + winston.info(`[posts/uploads] Deleting ${orphans.length} orphaned uploads...`); + orphans.forEach((relPath) => { + process.stdout.write(`${chalk.red(' - ')} ${relPath}`); + }); + } + }, + }); + }; Posts.uploads.sync = async function (pid) { // Scans a post's content and updates sorted set of uploads diff --git a/src/routes/admin.js b/src/routes/admin.js index 03eb002e5f..86debb305d 100644 --- a/src/routes/admin.js +++ b/src/routes/admin.js @@ -74,6 +74,7 @@ module.exports = function (app, name, middleware, controllers) { helpers.setupAdminPageRoute(app, `/${name}/advanced/errors`, middlewares, controllers.admin.errors.get); helpers.setupAdminPageRoute(app, `/${name}/advanced/errors/export`, middlewares, controllers.admin.errors.export); helpers.setupAdminPageRoute(app, `/${name}/advanced/cache`, middlewares, controllers.admin.cache.get); + helpers.setupAdminPageRoute(app, `/${name}/advanced/jobs`, middlewares, controllers.admin.jobs.get); helpers.setupAdminPageRoute(app, `/${name}/development/logger`, middlewares, controllers.admin.logger.get); helpers.setupAdminPageRoute(app, `/${name}/development/info`, middlewares, controllers.admin.info.get); diff --git a/src/start.js b/src/start.js index 2928ac6bb6..4e17ed2489 100644 --- a/src/start.js +++ b/src/start.js @@ -35,11 +35,13 @@ start.start = async function () { await sockets.init(webserver.server); if (nconf.get('runJobs')) { - require('./notifications').startJobs(); - require('./user').startJobs(); - require('./plugins').startJobs(); - require('./topics').scheduled.startJobs(); - require('./activitypub').jobs.start(); + await require('./cron').deleteJobs(); + await require('./notifications').startJobs(); + await require('./user').startJobs(); + await require('./plugins').startJobs(); + await require('./topics').scheduled.startJobs(); + await require('./posts').uploads.startJobs(); + await require('./activitypub').jobs.start(); await db.delete('locks'); } diff --git a/src/topics/scheduled.js b/src/topics/scheduled.js index a20109a50d..b6dcba9a74 100644 --- a/src/topics/scheduled.js +++ b/src/topics/scheduled.js @@ -2,7 +2,6 @@ const _ = require('lodash'); const winston = require('winston'); -const { CronJob } = require('cron'); const db = require('../database'); const posts = require('../posts'); @@ -13,18 +12,17 @@ const groups = require('../groups'); const user = require('../user'); const activitypub = require('../activitypub'); const plugins = require('../plugins'); +const cron = require('../cron'); const Scheduled = module.exports; -Scheduled.startJobs = function () { +Scheduled.startJobs = async function () { winston.verbose('[scheduled topics] Starting jobs.'); - new CronJob('*/1 * * * *', async () => { - try { - await Scheduled.handleExpired(); - } catch (err) { - winston.error(err.stack); - } - }, null, true); + await cron.addJob({ + name: 'topics:publish:scheduled', + cronTime: '*/1 * * * *', + onTick: Scheduled.handleExpired, + }); }; Scheduled.handleExpired = async function () { diff --git a/src/user/approval.js b/src/user/approval.js index c0e654438c..c54f057841 100644 --- a/src/user/approval.js +++ b/src/user/approval.js @@ -2,7 +2,6 @@ const validator = require('validator'); const winston = require('winston'); -const cronJob = require('cron').CronJob; const db = require('../database'); const meta = require('../meta'); @@ -14,14 +13,6 @@ const slugify = require('../slugify'); const plugins = require('../plugins'); module.exports = function (User) { - new cronJob('0 * * * *', (async () => { - try { - await User.autoApprove(); - } catch (err) { - winston.error(err.stack); - } - }), null, true); - User.createOrQueue = async function (req, userData, opts = {}) { User.checkUsernameLength(userData.username); const queue = await User.shouldQueueUser(req.ip); diff --git a/src/user/jobs.js b/src/user/jobs.js index 34c797e9e5..2688ba0809 100644 --- a/src/user/jobs.js +++ b/src/user/jobs.js @@ -1,14 +1,14 @@ 'use strict'; const winston = require('winston'); -const cronJob = require('cron').CronJob; const db = require('../database'); const meta = require('../meta'); +const cron = require('../cron'); const jobs = {}; module.exports = function (User) { - User.startJobs = function () { + User.startJobs = async function () { winston.verbose('[user/jobs] (Re-)starting jobs...'); let { digestHour } = meta.config; @@ -22,20 +22,31 @@ module.exports = function (User) { User.stopJobs(); - startDigestJob('digest.daily', `0 ${digestHour} * * *`, 'day'); - startDigestJob('digest.weekly', `0 ${digestHour} * * 0`, 'week'); - startDigestJob('digest.monthly', `0 ${digestHour} 1 * *`, 'month'); + await startDigestJob('digest.daily', `0 ${digestHour} * * *`, 'day'); + await startDigestJob('digest.weekly', `0 ${digestHour} * * 0`, 'week'); + await startDigestJob('digest.monthly', `0 ${digestHour} 1 * *`, 'month'); - jobs['reset.clean'] = new cronJob('0 0 * * *', User.reset.clean, null, true); - winston.verbose('[user/jobs] Starting job (reset.clean)'); + jobs['reset.clean'] = await cron.addJob({ + name: 'user:reset:clean', + cronTime: '0 0 * * *', + onTick: User.reset.clean, + }); + + await cron.addJob({ + name: 'user:autoApprove', + cronTime: '0 * * * *', + onTick: User.autoApprove, + }); winston.verbose(`[user/jobs] jobs started`); }; - function startDigestJob(name, cronString, term) { - jobs[name] = new cronJob(cronString, (async () => { - winston.verbose(`[user/jobs] Digest job (${name}) started.`); - try { + async function startDigestJob(name, cronString, term) { + const newJob = await cron.addJob({ + name, + cronTime: cronString, + onTick: async () => { + winston.verbose(`[user/jobs] Digest job (${name}) started.`); if (name === 'digest.weekly') { const counter = await db.increment('biweeklydigestcounter'); if (counter % 2) { @@ -43,24 +54,27 @@ module.exports = function (User) { } } await User.digest.execute({ interval: term }); - } catch (err) { - winston.error(err.stack); - } - }), null, true); - winston.verbose(`[user/jobs] Starting job (${name})`); + }, + }); + if (newJob) { + jobs[name] = newJob; + } } User.stopJobs = function () { let terminated = 0; // Terminate any active cron jobs - for (const jobId of Object.keys(jobs)) { - winston.verbose(`[user/jobs] Terminating job (${jobId})`); - jobs[jobId].stop(); - delete jobs[jobId]; + for (const [name, job] of Object.entries(jobs)) { + winston.info(`[user/jobs] Terminating job (${name})`); + if (job) { + job.stop(); + delete jobs[name]; + } + terminated += 1; } if (terminated > 0) { - winston.verbose(`[user/jobs] ${terminated} jobs terminated`); + winston.info(`[user/jobs] ${terminated} jobs terminated`); } }; }; diff --git a/src/views/admin/advanced/cache.tpl b/src/views/admin/advanced/cache.tpl index a0efd3ff3e..b7d58b8beb 100644 --- a/src/views/admin/advanced/cache.tpl +++ b/src/views/admin/advanced/cache.tpl @@ -23,7 +23,7 @@ hit ratio hits/sec ttl - + diff --git a/src/views/admin/advanced/jobs.tpl b/src/views/admin/advanced/jobs.tpl new file mode 100644 index 0000000000..4f68840516 --- /dev/null +++ b/src/views/admin/advanced/jobs.tpl @@ -0,0 +1,36 @@ + +
+
+
+

[[admin/advanced/jobs:jobs]]

+
+
+ +
+
+ + + + + + + + + + + + {{{ each jobs }}} + + + + + + + + {{{ end }}} + +
[[admin/advanced/jobs:job-name]][[admin/advanced/jobs:schedule]][[admin/advanced/jobs:next-run]][[admin/advanced/jobs:last-duration]][[admin/advanced/jobs:running]]
{./name}{./cronTimeHuman} ({./cronTime}){./durationReadable}{{{ if ./running }}}Yes{{{ else }}}No{{{ end }}}
+
+
+
+ diff --git a/src/views/admin/partials/navigation.tpl b/src/views/admin/partials/navigation.tpl index 23b6a2c9df..14ceedcbae 100644 --- a/src/views/admin/partials/navigation.tpl +++ b/src/views/admin/partials/navigation.tpl @@ -190,6 +190,7 @@ [[admin/menu:advanced/events]] [[admin/menu:advanced/hooks]] [[admin/menu:advanced/cache]] + [[admin/menu:advanced/jobs]] [[admin/menu:advanced/errors]] [[admin/menu:advanced/logs]] {{{ if env }}} diff --git a/test/user.js b/test/user.js index f35720e217..774588c6da 100644 --- a/test/user.js +++ b/test/user.js @@ -2297,14 +2297,12 @@ describe('User', () => { }); describe('user jobs', () => { - it('should start user jobs', (done) => { - User.startJobs(); - done(); + it('should start user jobs', async () => { + await User.startJobs(); }); - it('should stop user jobs', (done) => { + it('should stop user jobs', async () => { User.stopJobs(); - done(); }); it('should send digest', (done) => {