initial improved schedular functionality

Signed-off-by: Andy Miller <rhuk@mac.com>
This commit is contained in:
Andy Miller
2025-08-24 20:27:28 +01:00
parent d07f3770bc
commit 56cc894c1d
11 changed files with 2744 additions and 68 deletions

View File

@@ -55,7 +55,7 @@
"league/climate": "^3.6",
"miljar/php-exif": "^0.6",
"composer/ca-bundle": "^1.2",
"dragonmantank/cron-expression": "^1.2",
"dragonmantank/cron-expression": "^3.3",
"willdurand/negotiation": "^3.0",
"itsgoingd/clockwork": "^5.0",
"symfony/http-client": "^4.4",

View File

@@ -4,74 +4,708 @@ form:
validation: loose
fields:
scheduler_tabs:
type: tabs
active: 1
status_title:
type: section
title: PLUGIN_ADMIN.SCHEDULER_STATUS
underline: true
fields:
status_tab:
type: tab
title: PLUGIN_ADMIN.SCHEDULER_STATUS
status:
type: cronstatus
validate:
type: commalist
fields:
status_title:
type: section
title: PLUGIN_ADMIN.SCHEDULER_STATUS
underline: true
jobs_title:
type: section
title: PLUGIN_ADMIN.SCHEDULER_JOBS
underline: true
status:
type: cronstatus
validate:
type: commalist
webhook_status_override:
type: display
label:
content: |
<script>
(function() {
function updateSchedulerStatus() {
// Find all notice bars
var notices = document.querySelectorAll('.notice');
var webhookStatusChecked = false;
// Check for modern scheduler and webhook settings
fetch(window.location.origin + '/grav-editor-pro/scheduler/health')
.then(response => response.json())
.then(data => {
if (data.webhook_enabled) {
notices.forEach(function(notice) {
if (notice.textContent.includes('Not Enabled for user:')) {
// This is the cron status notice - replace it
notice.className = 'notice info';
notice.innerHTML = '<i class="fa fa-fw fa-check-circle"></i> <strong>Webhook Active</strong> - Scheduler can be triggered via webhook. Cron is not configured.';
}
});
// Also update the main status if it exists
var statusDiv = document.querySelector('.cronstatus-status');
if (statusDiv && statusDiv.textContent.includes('Not Enabled')) {
statusDiv.className = 'cronstatus-status success';
statusDiv.innerHTML = '<i class="fa fa-fw fa-check"></i> Webhook Ready';
}
}
})
.catch(error => {
console.log('Webhook status check failed:', error);
});
}
// Run on page load
if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', updateSchedulerStatus);
} else {
updateSchedulerStatus();
}
// Also run after a short delay to catch any late-rendered elements
setTimeout(updateSchedulerStatus, 500);
})();
</script>
markdown: false
status_enhanced:
type: display
label:
content: |
<script>
document.addEventListener('DOMContentLoaded', function() {
// Check if modern features are enabled
var modernEnabled = document.querySelector('[name="data[scheduler][modern][enabled]"]:checked');
var webhookEnabled = document.querySelector('[name="data[scheduler][modern][webhook][enabled]"]:checked');
var statusDiv = document.querySelector('.cronstatus-status');
// Also find the parent notice bar
var noticeBar = document.querySelector('.notice.alert');
if (statusDiv) {
var currentStatus = statusDiv.textContent || statusDiv.innerText;
var cronReady = currentStatus.includes('Ready');
var cronNotEnabled = currentStatus.includes('Not Enabled');
// Check if scheduler-webhook plugin exists
var webhookPluginInstalled = false;
fetch(window.location.origin + '/grav-editor-pro/scheduler/health')
.then(response => response.json())
.then(data => {
webhookPluginInstalled = true;
updateStatusDisplay(data);
})
.catch(error => {
updateStatusDisplay(null);
});
function updateStatusDisplay(healthData) {
var isModernEnabled = modernEnabled && modernEnabled.value == '1';
var isWebhookEnabled = webhookEnabled && webhookEnabled.value == '1';
var isWebhookReady = webhookPluginInstalled && isWebhookEnabled && healthData && healthData.webhook_enabled;
// Update the main status text
var mainStatusText = '';
var mainStatusClass = '';
if (cronReady && isWebhookReady) {
mainStatusText = 'Cron and Webhook Ready';
mainStatusClass = 'success';
} else if (cronReady) {
mainStatusText = 'Cron Ready';
mainStatusClass = 'success';
} else if (isWebhookReady) {
mainStatusText = 'Webhook Ready (No Cron)';
mainStatusClass = 'success'; // Changed from warning to success
} else if (cronNotEnabled && !isWebhookReady) {
mainStatusText = 'Not Configured';
mainStatusClass = 'error';
} else {
mainStatusText = 'Configuration Pending';
mainStatusClass = 'warning';
}
// Update the notice bar if webhooks are ready
if (noticeBar && isWebhookReady) {
// Change from error (red) to success (green) or info (blue)
noticeBar.classList.remove('alert');
noticeBar.classList.add('info');
var noticeIcon = noticeBar.querySelector('i.fa');
if (noticeIcon) {
noticeIcon.classList.remove('fa-times-circle');
noticeIcon.classList.add('fa-check-circle');
}
var noticeText = noticeBar.querySelector('strong') || noticeBar;
var username = noticeText.textContent.match(/user:\s*(\w+)/);
if (username) {
noticeText.innerHTML = 'Webhook Ready for user: <b>' + username[1] + '</b> (Cron not configured)';
} else {
noticeText.innerHTML = mainStatusText;
}
}
// Update the main status div
if (statusDiv) {
statusDiv.innerHTML = '<i class="fa fa-fw fa-' +
(mainStatusClass === 'success' ? 'check' : mainStatusClass === 'warning' ? 'exclamation' : 'times') +
'"></i> ' + mainStatusText;
statusDiv.className = 'cronstatus-status ' + mainStatusClass;
}
// Update install instructions button/content
var installButton = document.querySelector('.cronstatus-install-button');
var installDiv = document.querySelector('.cronstatus-install');
if (installDiv) {
var installHtml = '<div class="alert alert-info">';
installHtml += '<h4>Setup Instructions:</h4>';
var hasInstructions = false;
// Cron setup
if (!cronReady) {
installHtml += '<p><strong>Option 1: Traditional Cron</strong><br>';
installHtml += 'Run: <code>bin/grav scheduler --install</code><br>';
installHtml += 'This will add a cron job that runs every minute.</p>';
hasInstructions = true;
}
// Webhook setup
if (isModernEnabled) {
if (!webhookPluginInstalled) {
installHtml += '<p><strong>Option 2: Webhook Support</strong><br>';
installHtml += '1. Install plugin: <code>bin/gpm install scheduler-webhook</code><br>';
installHtml += '2. Configure webhook token in Modern Features tab<br>';
installHtml += '3. Use webhook URL in your CI/CD or cloud scheduler</p>';
hasInstructions = true;
} else if (!isWebhookEnabled) {
installHtml += '<p><strong>Webhook Plugin Installed</strong><br>';
installHtml += 'Enable webhooks in Modern Features tab and set a secure token.</p>';
hasInstructions = true;
} else if (isWebhookReady) {
installHtml += '<p><strong>✅ Webhook is Active!</strong><br>';
installHtml += 'Trigger URL: <code>' + window.location.origin + '/grav-editor-pro/scheduler/webhook</code><br>';
installHtml += 'Use with Authorization header: <code>Bearer YOUR_TOKEN</code></p>';
if (!cronReady) {
installHtml += '<p class="text-muted"><small>Note: No cron job configured. Scheduler runs only via webhook triggers.</small></p>';
}
}
}
if (!hasInstructions && cronReady) {
installHtml += '<p><strong>✅ Cron is configured and ready!</strong><br>';
installHtml += 'The scheduler runs automatically every minute via system cron.</p>';
if (!isModernEnabled) {
installHtml += '<p class="text-muted"><small>Enable Modern Features for webhook support and advanced options.</small></p>';
}
}
installHtml += '</div>';
installDiv.innerHTML = installHtml;
// Update button text based on status
if (installButton) {
if (cronReady && isWebhookReady) {
installButton.innerHTML = '<i class="fa fa-info-circle"></i> Configuration Details';
} else if (cronReady || isWebhookReady) {
installButton.innerHTML = '<i class="fa fa-plus-circle"></i> Add More Triggers';
} else {
installButton.innerHTML = '<i class="fa fa-exclamation-triangle"></i> Install Instructions';
}
}
}
}
}
});
</script>
custom_jobs:
type: list
style: vertical
label:
classes: cron-job-list compact
key: id
fields:
.id:
type: key
label: ID
placeholder: 'process-name'
validate:
required: true
pattern: '[a-zа-я0-9_\-]+'
max: 20
message: 'ID must be lowercase with dashes/underscores only and less than 20 characters'
.command:
type: text
label: PLUGIN_ADMIN.COMMAND
placeholder: 'ls'
validate:
required: true
.args:
type: text
label: PLUGIN_ADMIN.EXTRA_ARGUMENTS
placeholder: '-lah'
.at:
type: text
wrapper_classes: cron-selector
label: PLUGIN_ADMIN.SCHEDULER_RUNAT
help: PLUGIN_ADMIN.SCHEDULER_RUNAT_HELP
placeholder: '* * * * *'
validate:
required: true
.output:
type: text
label: PLUGIN_ADMIN.SCHEDULER_OUTPUT
help: PLUGIN_ADMIN.SCHEDULER_OUTPUT_HELP
placeholder: 'logs/ls-cron.out'
.output_mode:
type: select
label: PLUGIN_ADMIN.SCHEDULER_OUTPUT_TYPE
help: PLUGIN_ADMIN.SCHEDULER_OUTPUT_TYPE_HELP
default: append
options:
append: Append
overwrite: Overwrite
.email:
type: text
label: PLUGIN_ADMIN.SCHEDULER_EMAIL
help: PLUGIN_ADMIN.SCHEDULER_EMAIL_HELP
placeholder: 'notifications@yoursite.com'
modern_status:
type: conditional
condition: config.scheduler.modern.enabled
fields:
modern_health:
type: display
label: Health Status
content: |
<div id="scheduler-health-status">
<div class="text-muted">Checking health...</div>
</div>
<script>
(function() {
function loadHealthStatus() {
fetch(window.location.origin + '/grav-editor-pro/scheduler/health')
.then(response => response.json())
.then(data => {
var statusEl = document.getElementById('scheduler-health-status');
if (!statusEl) return;
var html = '<div class="scheduler-health-info">';
// Status badge
var badge = 'secondary';
if (data.status === 'healthy') badge = 'success';
else if (data.status === 'warning') badge = 'warning';
else if (data.status === 'critical') badge = 'danger';
html += '<p><strong>Status:</strong> <span class="badge badge-' + badge + '">' +
(data.status || 'Unknown').toUpperCase() + '</span></p>';
// Last run
if (data.last_run) {
var age = data.last_run_age;
var ageText = 'just now';
if (age > 3600) {
ageText = Math.floor(age / 3600) + ' hour(s) ago';
} else if (age > 60) {
ageText = Math.floor(age / 60) + ' minute(s) ago';
} else if (age > 0) {
ageText = age + ' second(s) ago';
}
html += '<p><strong>Last Run:</strong> ' + ageText + '</p>';
} else {
html += '<p><strong>Last Run:</strong> Never</p>';
}
// Jobs count
html += '<p><strong>Scheduled Jobs:</strong> ' + (data.scheduled_jobs || 0) + '</p>';
// Queue size (if modern features enabled)
if (data.modern_features && data.queue_size !== undefined) {
html += '<p><strong>Queue Size:</strong> ' + data.queue_size + '</p>';
}
// Failed jobs
if (data.failed_jobs_24h > 0) {
html += '<p class="text-danger"><strong>Failed (24h):</strong> ' + data.failed_jobs_24h + '</p>';
}
html += '</div>';
statusEl.innerHTML = html;
})
.catch(error => {
var statusEl = document.getElementById('scheduler-health-status');
if (statusEl) {
statusEl.innerHTML = '<div class="alert alert-warning">Unable to fetch health status. Ensure scheduler-webhook plugin is installed.</div>';
}
});
}
// Load on page ready
if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', loadHealthStatus);
} else {
loadHealthStatus();
}
// Refresh every 30 seconds
setInterval(loadHealthStatus, 30000);
})();
</script>
markdown: false
trigger_methods:
type: display
label: Active Triggers
content: |
<div id="scheduler-triggers">
<div class="text-muted">Checking triggers...</div>
</div>
<script>
(function() {
function loadTriggers() {
// Check cron status from the main status field
var cronReady = false;
var statusDiv = document.querySelector('.cronstatus-status');
if (statusDiv) {
var statusText = statusDiv.textContent || statusDiv.innerText;
cronReady = statusText.includes('Ready');
}
// Check webhook status
fetch(window.location.origin + '/grav-editor-pro/scheduler/health')
.then(response => response.json())
.then(data => {
var triggersEl = document.getElementById('scheduler-triggers');
if (!triggersEl) return;
var html = '<ul class="list-unstyled">';
// Cron status
if (cronReady) {
html += '<li>✅ <strong>Cron:</strong> <span class="badge badge-success">Active</span></li>';
} else {
html += '<li>❌ <strong>Cron:</strong> <span class="badge badge-secondary">Not Configured</span></li>';
}
// Webhook status
if (data.webhook_enabled) {
html += '<li>✅ <strong>Webhook:</strong> <span class="badge badge-success">Active</span></li>';
} else {
var modernEnabled = document.querySelector('[name="data[scheduler][modern][enabled]"]:checked');
if (modernEnabled && modernEnabled.value == '1') {
html += '<li>⚠️ <strong>Webhook:</strong> <span class="badge badge-warning">Disabled</span></li>';
}
}
html += '</ul>';
// Add warning if no triggers active
if (!cronReady && !data.webhook_enabled) {
html += '<div class="alert alert-warning">No triggers active! Configure cron or enable webhooks.</div>';
}
triggersEl.innerHTML = html;
})
.catch(error => {
var triggersEl = document.getElementById('scheduler-triggers');
if (triggersEl) {
// Show just cron status if health endpoint not available
var html = '<ul class="list-unstyled">';
if (cronReady) {
html += '<li>✅ <strong>Cron:</strong> <span class="badge badge-success">Active</span></li>';
} else {
html += '<li>❌ <strong>Cron:</strong> <span class="badge badge-secondary">Not Configured</span></li>';
}
html += '<li>⚠️ <strong>Webhook:</strong> <span class="badge badge-secondary">Plugin Not Installed</span></li>';
html += '</ul>';
triggersEl.innerHTML = html;
}
});
}
// Load on page ready
if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', loadTriggers);
} else {
loadTriggers();
}
})();
</script>
markdown: false
jobs_tab:
type: tab
title: PLUGIN_ADMIN.SCHEDULER_JOBS
fields:
jobs_title:
type: section
title: PLUGIN_ADMIN.SCHEDULER_JOBS
underline: true
custom_jobs:
type: list
style: vertical
label:
classes: cron-job-list compact
key: id
fields:
.id:
type: key
label: ID
placeholder: 'process-name'
validate:
required: true
pattern: '[a-zа-я0-9_\-]+'
max: 20
message: 'ID must be lowercase with dashes/underscores only and less than 20 characters'
.command:
type: text
label: PLUGIN_ADMIN.COMMAND
placeholder: 'ls'
validate:
required: true
.args:
type: text
label: PLUGIN_ADMIN.EXTRA_ARGUMENTS
placeholder: '-lah'
.at:
type: text
wrapper_classes: cron-selector
label: PLUGIN_ADMIN.SCHEDULER_RUNAT
help: PLUGIN_ADMIN.SCHEDULER_RUNAT_HELP
placeholder: '* * * * *'
validate:
required: true
.output:
type: text
label: PLUGIN_ADMIN.SCHEDULER_OUTPUT
help: PLUGIN_ADMIN.SCHEDULER_OUTPUT_HELP
placeholder: 'logs/ls-cron.out'
.output_mode:
type: select
label: PLUGIN_ADMIN.SCHEDULER_OUTPUT_TYPE
help: PLUGIN_ADMIN.SCHEDULER_OUTPUT_TYPE_HELP
default: append
options:
append: Append
overwrite: Overwrite
.email:
type: text
label: PLUGIN_ADMIN.SCHEDULER_EMAIL
help: PLUGIN_ADMIN.SCHEDULER_EMAIL_HELP
placeholder: 'notifications@yoursite.com'
modern_tab:
type: tab
title: Modern Features
fields:
modern.enabled:
type: toggle
label: Enable Modern Scheduler
help: Enable enhanced scheduler features (job queue, retry, webhooks, monitoring)
highlight: 0
default: 0
options:
1: PLUGIN_ADMIN.ENABLED
0: PLUGIN_ADMIN.DISABLED
validate:
type: bool
modern_features:
type: conditional
condition: config.scheduler.modern.enabled
fields:
workers_section:
type: section
title: Worker Configuration
underline: true
fields:
modern.workers:
type: number
label: Concurrent Workers
help: Number of jobs that can run simultaneously (1 = sequential)
default: 1
size: x-small
append: workers
validate:
type: int
min: 1
max: 10
retry_section:
type: section
title: Retry Configuration
underline: true
fields:
modern.retry.enabled:
type: toggle
label: Enable Job Retry
help: Automatically retry failed jobs
highlight: 1
default: 1
options:
1: PLUGIN_ADMIN.ENABLED
0: PLUGIN_ADMIN.DISABLED
validate:
type: bool
modern.retry.max_attempts:
type: number
label: Maximum Retry Attempts
help: Maximum number of times to retry a failed job
default: 3
size: x-small
append: retries
validate:
type: int
min: 1
max: 10
modern.retry.backoff:
type: select
label: Retry Backoff Strategy
help: How to calculate delay between retries
default: exponential
options:
linear: Linear (fixed delay)
exponential: Exponential (increasing delay)
queue_section:
type: section
title: Queue Configuration
underline: true
fields:
modern.queue.path:
type: text
label: Queue Storage Path
help: Where to store queued jobs
default: 'user-data://scheduler/queue'
placeholder: 'user-data://scheduler/queue'
modern.queue.max_size:
type: number
label: Maximum Queue Size
help: Maximum number of jobs that can be queued
default: 1000
size: x-small
append: jobs
validate:
type: int
min: 100
max: 10000
history_section:
type: section
title: Job History
underline: true
fields:
modern.history.enabled:
type: toggle
label: Enable Job History
help: Track execution history for all jobs
highlight: 1
default: 1
options:
1: PLUGIN_ADMIN.ENABLED
0: PLUGIN_ADMIN.DISABLED
validate:
type: bool
modern.history.retention_days:
type: number
label: History Retention (days)
help: How long to keep job history
default: 30
size: x-small
append: days
validate:
type: int
min: 1
max: 365
webhook_section:
type: section
title: Webhook Configuration
underline: true
fields:
webhook_plugin_notice:
type: display
label:
content: |
<div class="alert alert-warning">
<strong>Plugin Required:</strong> The <code>scheduler-webhook</code> plugin must be installed and enabled for webhook functionality to work.
<br><br>
Install with: <code>bin/gpm install scheduler-webhook</code>
</div>
markdown: false
modern.webhook.enabled:
type: toggle
label: Enable Webhook Triggers
help: Allow triggering scheduler via HTTP webhook
highlight: 0
default: 0
options:
1: PLUGIN_ADMIN.ENABLED
0: PLUGIN_ADMIN.DISABLED
validate:
type: bool
modern.webhook.token:
type: text
label: Webhook Security Token
help: Secret token for authenticating webhook requests. Keep this secret!
placeholder: 'Click Generate to create a secure token'
webhook_token_generate:
type: display
label:
content: |
<button type="button" class="button" onclick="generateWebhookToken()">Generate Token</button>
<script>
function generateWebhookToken() {
const array = new Uint8Array(32);
crypto.getRandomValues(array);
const token = Array.from(array, byte => byte.toString(16).padStart(2, '0')).join('');
const field = document.querySelector('[name="data[scheduler][modern][webhook][token]"]');
if (field) {
field.value = token;
field.dispatchEvent(new Event('change', { bubbles: true }));
}
}
</script>
markdown: false
modern.webhook.path:
type: text
label: Webhook Path
help: URL path for webhook endpoint
default: '/scheduler/webhook'
placeholder: '/scheduler/webhook'
health_section:
type: section
title: Health Check Configuration
underline: true
fields:
modern.health.enabled:
type: toggle
label: Enable Health Check
help: Provide health status endpoint for monitoring
highlight: 1
default: 1
options:
1: PLUGIN_ADMIN.ENABLED
0: PLUGIN_ADMIN.DISABLED
validate:
type: bool
modern.health.path:
type: text
label: Health Check Path
help: URL path for health check endpoint
default: '/scheduler/health'
placeholder: '/scheduler/health'
webhook_usage:
type: section
title: Usage Examples
underline: true
fields:
webhook_examples:
type: display
label:
content: |
<div class="alert alert-info">
<h4>How to use webhooks:</h4>
<p><strong>Trigger all due jobs (respects schedule):</strong></p>
<pre>curl -X POST https://your-site.com/scheduler/webhook \
-H "Authorization: Bearer YOUR_TOKEN"</pre>
<p><strong>Force-run specific job (ignores schedule):</strong></p>
<pre>curl -X POST https://your-site.com/scheduler/webhook?job=backup \
-H "Authorization: Bearer YOUR_TOKEN"</pre>
<p><strong>Check health status:</strong></p>
<pre>curl https://your-site.com/scheduler/health</pre>
<p><strong>GitHub Actions example:</strong></p>
<pre>- name: Trigger Scheduler
run: |
curl -X POST ${{ secrets.SITE_URL }}/scheduler/webhook \
-H "Authorization: Bearer ${{ secrets.WEBHOOK_TOKEN }}"</pre>
</div>
markdown: false

View File

@@ -0,0 +1,68 @@
# Grav Scheduler Configuration
# Default scheduler settings (backward compatible)
defaults:
output: true
output_type: file
email: null
# Status of individual jobs (enabled/disabled)
status: {}
# Custom scheduled jobs
custom_jobs: {}
# Modern scheduler features (disabled by default for backward compatibility)
modern:
# Enable modern scheduler features
enabled: false
# Number of concurrent workers (1 = sequential execution like legacy)
workers: 1
# Job retry configuration
retry:
enabled: true
max_attempts: 3
backoff: exponential # 'linear' or 'exponential'
# Job queue configuration
queue:
path: user-data://scheduler/queue
max_size: 1000
# Webhook trigger configuration
webhook:
enabled: false
token: null # Set a secure token to enable webhook triggers
path: /scheduler/webhook
# Health check endpoint
health:
enabled: true
path: /scheduler/health
# Job execution history
history:
enabled: true
retention_days: 30
path: user-data://scheduler/history
# Performance settings
performance:
job_timeout: 300 # Default timeout in seconds
lock_timeout: 10 # Lock acquisition timeout in seconds
# Monitoring and alerts
monitoring:
enabled: false
alert_on_failure: true
alert_email: null
webhook_url: null
# Trigger detection methods
triggers:
check_cron: true
check_systemd: true
check_webhook: true
check_external: true

View File

@@ -10,7 +10,7 @@
namespace Grav\Common\Scheduler;
use Closure;
use Cron\CronExpression;
use Dragonmantank\Cron\CronExpression;
use DateTime;
use Grav\Common\Grav;
use InvalidArgumentException;

View File

@@ -0,0 +1,501 @@
<?php
/**
* @package Grav\Common\Scheduler
*
* @copyright Copyright (c) 2015 - 2025 Trilby Media, LLC. All rights reserved.
* @license MIT License; see LICENSE file for details.
*/
namespace Grav\Common\Scheduler;
use RocketTheme\Toolbox\File\JsonFile;
use RuntimeException;
/**
* File-based job queue implementation
*
* @package Grav\Common\Scheduler
*/
class JobQueue
{
/** @var string */
protected $queuePath;
/** @var string */
protected $lockFile;
/** @var array Priority levels */
const PRIORITY_HIGH = 'high';
const PRIORITY_NORMAL = 'normal';
const PRIORITY_LOW = 'low';
/**
* JobQueue constructor
*
* @param string $queuePath
*/
public function __construct(string $queuePath)
{
$this->queuePath = $queuePath;
$this->lockFile = $queuePath . '/.lock';
// Create queue directories
$this->initializeDirectories();
}
/**
* Initialize queue directories
*
* @return void
*/
protected function initializeDirectories(): void
{
$dirs = [
$this->queuePath . '/pending',
$this->queuePath . '/processing',
$this->queuePath . '/failed',
$this->queuePath . '/completed',
];
foreach ($dirs as $dir) {
if (!file_exists($dir)) {
mkdir($dir, 0755, true);
}
}
}
/**
* Push a job to the queue
*
* @param Job $job
* @param string $priority
* @return string Job queue ID
*/
public function push(Job $job, string $priority = self::PRIORITY_NORMAL): string
{
$queueId = $this->generateQueueId($job);
$timestamp = microtime(true);
$queueItem = [
'id' => $queueId,
'job_id' => $job->getId(),
'command' => is_string($job->getCommand()) ? $job->getCommand() : 'Closure',
'arguments' => $job->getArguments(),
'priority' => $priority,
'timestamp' => $timestamp,
'attempts' => 0,
'max_attempts' => $job instanceof ModernJob ? $job->getMaxAttempts() : 1,
'created_at' => date('c'),
'scheduled_for' => null,
'metadata' => [],
];
// Serialize the job if it's a closure
if (!is_string($job->getCommand())) {
$queueItem['serialized_job'] = base64_encode(serialize($job));
}
$this->writeQueueItem($queueItem, 'pending');
return $queueId;
}
/**
* Push a job for delayed execution
*
* @param Job $job
* @param \DateTime $scheduledFor
* @param string $priority
* @return string
*/
public function pushDelayed(Job $job, \DateTime $scheduledFor, string $priority = self::PRIORITY_NORMAL): string
{
$queueId = $this->push($job, $priority);
// Update the scheduled time
$item = $this->getQueueItem($queueId, 'pending');
if ($item) {
$item['scheduled_for'] = $scheduledFor->format('c');
$this->writeQueueItem($item, 'pending');
}
return $queueId;
}
/**
* Pop the next job from the queue
*
* @return Job|null
*/
public function pop(): ?Job
{
$this->lock();
try {
// Get all pending items
$items = $this->getPendingItems();
if (empty($items)) {
$this->unlock();
return null;
}
// Sort by priority and timestamp
usort($items, function($a, $b) {
$priorityOrder = [
self::PRIORITY_HIGH => 0,
self::PRIORITY_NORMAL => 1,
self::PRIORITY_LOW => 2,
];
$aPriority = $priorityOrder[$a['priority']] ?? 1;
$bPriority = $priorityOrder[$b['priority']] ?? 1;
if ($aPriority !== $bPriority) {
return $aPriority - $bPriority;
}
return $a['timestamp'] <=> $b['timestamp'];
});
// Get the first item that's ready to run
$now = new \DateTime();
foreach ($items as $item) {
if ($item['scheduled_for']) {
$scheduledTime = new \DateTime($item['scheduled_for']);
if ($scheduledTime > $now) {
continue; // Skip items not yet due
}
}
// Move to processing
$this->moveQueueItem($item['id'], 'pending', 'processing');
// Reconstruct the job
$job = $this->reconstructJob($item);
$this->unlock();
return $job;
}
$this->unlock();
return null;
} catch (\Exception $e) {
$this->unlock();
throw $e;
}
}
/**
* Mark a job as completed
*
* @param string $queueId
* @return void
*/
public function complete(string $queueId): void
{
$this->moveQueueItem($queueId, 'processing', 'completed');
// Clean up old completed items
$this->cleanupCompleted();
}
/**
* Mark a job as failed
*
* @param string $queueId
* @param string $error
* @return void
*/
public function fail(string $queueId, string $error = ''): void
{
$item = $this->getQueueItem($queueId, 'processing');
if ($item) {
$item['attempts']++;
$item['last_error'] = $error;
$item['failed_at'] = date('c');
if ($item['attempts'] < $item['max_attempts']) {
// Move back to pending for retry
$item['retry_at'] = $this->calculateRetryTime($item['attempts']);
$item['scheduled_for'] = $item['retry_at'];
$this->writeQueueItem($item, 'pending');
$this->deleteQueueItem($queueId, 'processing');
} else {
// Move to failed (dead letter queue)
$this->writeQueueItem($item, 'failed');
$this->deleteQueueItem($queueId, 'processing');
}
}
}
/**
* Get queue size
*
* @return int
*/
public function size(): int
{
return count($this->getPendingItems());
}
/**
* Check if queue is empty
*
* @return bool
*/
public function isEmpty(): bool
{
return $this->size() === 0;
}
/**
* Get queue statistics
*
* @return array
*/
public function getStatistics(): array
{
return [
'pending' => count($this->getPendingItems()),
'processing' => count($this->getItemsInDirectory('processing')),
'failed' => count($this->getItemsInDirectory('failed')),
'completed_today' => $this->countCompletedToday(),
];
}
/**
* Generate a unique queue ID
*
* @param Job $job
* @return string
*/
protected function generateQueueId(Job $job): string
{
return $job->getId() . '_' . uniqid('', true);
}
/**
* Write queue item to disk
*
* @param array $item
* @param string $directory
* @return void
*/
protected function writeQueueItem(array $item, string $directory): void
{
$path = $this->queuePath . '/' . $directory . '/' . $item['id'] . '.json';
$file = JsonFile::instance($path);
$file->save($item);
}
/**
* Read queue item from disk
*
* @param string $queueId
* @param string $directory
* @return array|null
*/
protected function getQueueItem(string $queueId, string $directory): ?array
{
$path = $this->queuePath . '/' . $directory . '/' . $queueId . '.json';
if (!file_exists($path)) {
return null;
}
$file = JsonFile::instance($path);
return $file->content();
}
/**
* Delete queue item
*
* @param string $queueId
* @param string $directory
* @return void
*/
protected function deleteQueueItem(string $queueId, string $directory): void
{
$path = $this->queuePath . '/' . $directory . '/' . $queueId . '.json';
if (file_exists($path)) {
unlink($path);
}
}
/**
* Move queue item between directories
*
* @param string $queueId
* @param string $fromDir
* @param string $toDir
* @return void
*/
protected function moveQueueItem(string $queueId, string $fromDir, string $toDir): void
{
$fromPath = $this->queuePath . '/' . $fromDir . '/' . $queueId . '.json';
$toPath = $this->queuePath . '/' . $toDir . '/' . $queueId . '.json';
if (file_exists($fromPath)) {
rename($fromPath, $toPath);
}
}
/**
* Get all pending items
*
* @return array
*/
protected function getPendingItems(): array
{
return $this->getItemsInDirectory('pending');
}
/**
* Get items in a specific directory
*
* @param string $directory
* @return array
*/
protected function getItemsInDirectory(string $directory): array
{
$items = [];
$path = $this->queuePath . '/' . $directory;
if (!is_dir($path)) {
return $items;
}
$files = glob($path . '/*.json');
foreach ($files as $file) {
$jsonFile = JsonFile::instance($file);
$items[] = $jsonFile->content();
}
return $items;
}
/**
* Reconstruct a job from queue item
*
* @param array $item
* @return Job|null
*/
protected function reconstructJob(array $item): ?Job
{
if (isset($item['serialized_job'])) {
// Unserialize the job
try {
$job = unserialize(base64_decode($item['serialized_job']));
if ($job instanceof Job) {
return $job;
}
} catch (\Exception $e) {
// Failed to unserialize
return null;
}
}
// Create a new job from command
if (isset($item['command'])) {
$args = $item['arguments'] ?? [];
$job = new Job($item['command'], $args, $item['job_id']);
return $job;
}
return null;
}
/**
* Calculate retry time with exponential backoff
*
* @param int $attempts
* @return string
*/
protected function calculateRetryTime(int $attempts): string
{
$backoffSeconds = min(pow(2, $attempts) * 60, 3600); // Max 1 hour
$retryTime = new \DateTime();
$retryTime->modify("+{$backoffSeconds} seconds");
return $retryTime->format('c');
}
/**
* Clean up old completed items
*
* @return void
*/
protected function cleanupCompleted(): void
{
$items = $this->getItemsInDirectory('completed');
$cutoff = new \DateTime('-24 hours');
foreach ($items as $item) {
if (isset($item['created_at'])) {
$createdAt = new \DateTime($item['created_at']);
if ($createdAt < $cutoff) {
$this->deleteQueueItem($item['id'], 'completed');
}
}
}
}
/**
* Count completed jobs today
*
* @return int
*/
protected function countCompletedToday(): int
{
$items = $this->getItemsInDirectory('completed');
$today = new \DateTime('today');
$count = 0;
foreach ($items as $item) {
if (isset($item['created_at'])) {
$createdAt = new \DateTime($item['created_at']);
if ($createdAt >= $today) {
$count++;
}
}
}
return $count;
}
/**
* Acquire lock for queue operations
*
* @return void
*/
protected function lock(): void
{
$attempts = 0;
while (file_exists($this->lockFile) && $attempts < 10) {
usleep(100000); // 100ms
$attempts++;
}
if ($attempts >= 10) {
throw new RuntimeException('Could not acquire queue lock');
}
touch($this->lockFile);
}
/**
* Release queue lock
*
* @return void
*/
protected function unlock(): void
{
if (file_exists($this->lockFile)) {
unlink($this->lockFile);
}
}
}

View File

@@ -0,0 +1,545 @@
<?php
/**
* @package Grav\Common\Scheduler
*
* @copyright Copyright (c) 2015 - 2025 Trilby Media, LLC. All rights reserved.
* @license MIT License; see LICENSE file for details.
*/
namespace Grav\Common\Scheduler;
use DateTime;
use Exception;
use RuntimeException;
use Symfony\Component\Process\Process;
/**
* Enhanced Job class with modern features
*
* @package Grav\Common\Scheduler
*/
class ModernJob extends Job
{
/** @var int */
protected $maxAttempts = 3;
/** @var int */
protected $retryCount = 0;
/** @var int */
protected $retryDelay = 60; // seconds
/** @var string */
protected $retryStrategy = 'exponential'; // 'linear' or 'exponential'
/** @var float */
protected $executionStartTime;
/** @var float */
protected $executionTime = 0;
/** @var int */
protected $timeout = 300; // 5 minutes default
/** @var array */
protected $dependencies = [];
/** @var array */
protected $chainedJobs = [];
/** @var string|null */
protected $queueId;
/** @var string */
protected $priority = 'normal'; // 'high', 'normal', 'low'
/** @var array */
protected $metadata = [];
/** @var array */
protected $tags = [];
/** @var callable|null */
protected $onSuccess;
/** @var callable|null */
protected $onFailure;
/** @var callable|null */
protected $onRetry;
/**
* Set maximum retry attempts
*
* @param int $attempts
* @return self
*/
public function maxAttempts(int $attempts): self
{
$this->maxAttempts = $attempts;
return $this;
}
/**
* Get maximum retry attempts
*
* @return int
*/
public function getMaxAttempts(): int
{
return $this->maxAttempts;
}
/**
* Set retry delay
*
* @param int $seconds
* @param string $strategy 'linear' or 'exponential'
* @return self
*/
public function retryDelay(int $seconds, string $strategy = 'exponential'): self
{
$this->retryDelay = $seconds;
$this->retryStrategy = $strategy;
return $this;
}
/**
* Get current retry count
*
* @return int
*/
public function getRetryCount(): int
{
return $this->retryCount;
}
/**
* Set job timeout
*
* @param int $seconds
* @return self
*/
public function timeout(int $seconds): self
{
$this->timeout = $seconds;
return $this;
}
/**
* Set job priority
*
* @param string $priority 'high', 'normal', or 'low'
* @return self
*/
public function priority(string $priority): self
{
if (!in_array($priority, ['high', 'normal', 'low'])) {
throw new \InvalidArgumentException('Priority must be high, normal, or low');
}
$this->priority = $priority;
return $this;
}
/**
* Get job priority
*
* @return string
*/
public function getPriority(): string
{
return $this->priority;
}
/**
* Add job dependency
*
* @param string $jobId
* @return self
*/
public function dependsOn(string $jobId): self
{
$this->dependencies[] = $jobId;
return $this;
}
/**
* Chain another job to run after this one
*
* @param Job $job
* @param bool $onlyOnSuccess Run only if current job succeeds
* @return self
*/
public function chain(Job $job, bool $onlyOnSuccess = true): self
{
$this->chainedJobs[] = [
'job' => $job,
'onlyOnSuccess' => $onlyOnSuccess,
];
return $this;
}
/**
* Add metadata to the job
*
* @param string $key
* @param mixed $value
* @return self
*/
public function withMetadata(string $key, $value): self
{
$this->metadata[$key] = $value;
return $this;
}
/**
* Add tags to the job
*
* @param array $tags
* @return self
*/
public function withTags(array $tags): self
{
$this->tags = array_merge($this->tags, $tags);
return $this;
}
/**
* Set success callback
*
* @param callable $callback
* @return self
*/
public function onSuccess(callable $callback): self
{
$this->onSuccess = $callback;
return $this;
}
/**
* Set failure callback
*
* @param callable $callback
* @return self
*/
public function onFailure(callable $callback): self
{
$this->onFailure = $callback;
return $this;
}
/**
* Set retry callback
*
* @param callable $callback
* @return self
*/
public function onRetry(callable $callback): self
{
$this->onRetry = $callback;
return $this;
}
/**
* Run the job with retry support
*
* @return bool
*/
public function runWithRetry(): bool
{
$attempts = 0;
$lastException = null;
while ($attempts < $this->maxAttempts) {
$attempts++;
$this->retryCount = $attempts - 1;
try {
// Record execution start time
$this->executionStartTime = microtime(true);
// Run the job
$result = $this->run();
// Record execution time
$this->executionTime = microtime(true) - $this->executionStartTime;
if ($result && $this->isSuccessful()) {
// Call success callback
if ($this->onSuccess) {
call_user_func($this->onSuccess, $this);
}
// Run chained jobs
$this->runChainedJobs(true);
return true;
}
throw new RuntimeException('Job execution failed');
} catch (Exception $e) {
$lastException = $e;
$this->output = $e->getMessage();
$this->successful = false;
if ($attempts < $this->maxAttempts) {
// Call retry callback
if ($this->onRetry) {
call_user_func($this->onRetry, $this, $attempts, $e);
}
// Calculate delay before retry
$delay = $this->calculateRetryDelay($attempts);
if ($delay > 0) {
sleep($delay);
}
} else {
// Final failure
if ($this->onFailure) {
call_user_func($this->onFailure, $this, $e);
}
// Run chained jobs that should run on failure
$this->runChainedJobs(false);
}
}
}
return false;
}
/**
* Override parent run method to add timeout support
*
* @return bool
*/
public function run(): bool
{
// Check dependencies
if (!$this->checkDependencies()) {
$this->output = 'Dependencies not met';
$this->successful = false;
return false;
}
// Call parent run method
$result = parent::run();
// Apply timeout to process if applicable
if ($this->process instanceof Process && $this->timeout > 0) {
$this->process->setTimeout($this->timeout);
}
return $result;
}
/**
* Get execution time in seconds
*
* @return float
*/
public function getExecutionTime(): float
{
return $this->executionTime;
}
/**
* Get job metadata
*
* @param string|null $key
* @return mixed
*/
public function getMetadata(string $key = null)
{
if ($key === null) {
return $this->metadata;
}
return $this->metadata[$key] ?? null;
}
/**
* Get job tags
*
* @return array
*/
public function getTags(): array
{
return $this->tags;
}
/**
* Check if job has a specific tag
*
* @param string $tag
* @return bool
*/
public function hasTag(string $tag): bool
{
return in_array($tag, $this->tags);
}
/**
* Set queue ID
*
* @param string $queueId
* @return self
*/
public function setQueueId(string $queueId): self
{
$this->queueId = $queueId;
return $this;
}
/**
* Get queue ID
*
* @return string|null
*/
public function getQueueId(): ?string
{
return $this->queueId;
}
/**
* Get process (for background jobs)
*
* @return Process|null
*/
public function getProcess(): ?Process
{
return $this->process;
}
/**
* Calculate retry delay based on strategy
*
* @param int $attempt
* @return int
*/
protected function calculateRetryDelay(int $attempt): int
{
if ($this->retryStrategy === 'exponential') {
return min($this->retryDelay * pow(2, $attempt - 1), 3600); // Max 1 hour
}
return $this->retryDelay;
}
/**
* Check if dependencies are met
*
* @return bool
*/
protected function checkDependencies(): bool
{
if (empty($this->dependencies)) {
return true;
}
// This would need to check against job history or status
// For now, we'll assume dependencies are met
// In a real implementation, this would check the ModernScheduler's job status
return true;
}
/**
* Run chained jobs
*
* @param bool $success Whether the current job succeeded
* @return void
*/
protected function runChainedJobs(bool $success): void
{
foreach ($this->chainedJobs as $chainedJob) {
$shouldRun = !$chainedJob['onlyOnSuccess'] || $success;
if ($shouldRun) {
$job = $chainedJob['job'];
if ($job instanceof ModernJob) {
$job->runWithRetry();
} else {
$job->run();
}
}
}
}
/**
* Convert job to array for serialization
*
* @return array
*/
public function toArray(): array
{
return [
'id' => $this->getId(),
'command' => is_string($this->command) ? $this->command : 'Closure',
'at' => $this->getAt(),
'enabled' => $this->getEnabled(),
'priority' => $this->priority,
'max_attempts' => $this->maxAttempts,
'retry_count' => $this->retryCount,
'retry_delay' => $this->retryDelay,
'retry_strategy' => $this->retryStrategy,
'timeout' => $this->timeout,
'dependencies' => $this->dependencies,
'metadata' => $this->metadata,
'tags' => $this->tags,
'execution_time' => $this->executionTime,
'successful' => $this->successful,
'output' => $this->output,
];
}
/**
* Create job from array
*
* @param array $data
* @return self
*/
public static function fromArray(array $data): self
{
$job = new self($data['command'] ?? '', [], $data['id'] ?? null);
if (isset($data['at'])) {
$job->at($data['at']);
}
if (isset($data['priority'])) {
$job->priority($data['priority']);
}
if (isset($data['max_attempts'])) {
$job->maxAttempts($data['max_attempts']);
}
if (isset($data['retry_delay']) && isset($data['retry_strategy'])) {
$job->retryDelay($data['retry_delay'], $data['retry_strategy']);
}
if (isset($data['timeout'])) {
$job->timeout($data['timeout']);
}
if (isset($data['dependencies'])) {
foreach ($data['dependencies'] as $dep) {
$job->dependsOn($dep);
}
}
if (isset($data['metadata'])) {
foreach ($data['metadata'] as $key => $value) {
$job->withMetadata($key, $value);
}
}
if (isset($data['tags'])) {
$job->withTags($data['tags']);
}
return $job;
}
}

View File

@@ -0,0 +1,621 @@
<?php
/**
* @package Grav\Common\Scheduler
*
* @copyright Copyright (c) 2015 - 2025 Trilby Media, LLC. All rights reserved.
* @license MIT License; see LICENSE file for details.
*/
namespace Grav\Common\Scheduler;
use DateTime;
use Grav\Common\Filesystem\Folder;
use Grav\Common\Grav;
use Grav\Common\Utils;
use RocketTheme\Toolbox\File\YamlFile;
use RocketTheme\Toolbox\File\JsonFile;
use Symfony\Component\Process\Process;
use InvalidArgumentException;
use RuntimeException;
/**
* Modern Scheduler with enhanced features for reliability and monitoring
*
* @package Grav\Common\Scheduler
*/
class ModernScheduler extends Scheduler
{
/** @var array */
protected $workers = [];
/** @var int */
protected $maxWorkers = 1;
/** @var string */
protected $queuePath;
/** @var string */
protected $historyPath;
/** @var array */
protected $modernConfig;
/** @var bool */
protected $webhookEnabled = false;
/** @var string|null */
protected $webhookToken;
/** @var bool */
protected $healthEnabled = true;
/** @var JobQueue */
protected $jobQueue;
/**
* Create new ModernScheduler instance
*/
public function __construct()
{
parent::__construct();
$grav = Grav::instance();
$this->modernConfig = $grav['config']->get('scheduler.modern', []);
// Set up modern features if enabled
if ($this->isModernEnabled()) {
$this->initializeModernFeatures();
}
}
/**
* Check if modern features are enabled
*
* @return bool
*/
public function isModernEnabled(): bool
{
return $this->modernConfig['enabled'] ?? false;
}
/**
* Initialize modern scheduler features
*
* @return void
*/
protected function initializeModernFeatures(): void
{
$locator = Grav::instance()['locator'];
// Set up paths
$this->queuePath = $this->modernConfig['queue']['path'] ?? 'user-data://scheduler/queue';
$this->queuePath = $locator->findResource($this->queuePath, true, true);
$this->historyPath = $this->modernConfig['history']['path'] ?? 'user-data://scheduler/history';
$this->historyPath = $locator->findResource($this->historyPath, true, true);
// Create directories if they don't exist
if (!file_exists($this->queuePath)) {
Folder::create($this->queuePath);
}
if (!file_exists($this->historyPath)) {
Folder::create($this->historyPath);
}
// Initialize job queue
$this->jobQueue = new JobQueue($this->queuePath);
// Configure workers
$this->maxWorkers = $this->modernConfig['workers'] ?? 1;
// Configure webhook
$this->webhookEnabled = $this->modernConfig['webhook']['enabled'] ?? false;
$this->webhookToken = $this->modernConfig['webhook']['token'] ?? null;
// Configure health check
$this->healthEnabled = $this->modernConfig['health']['enabled'] ?? true;
}
/**
* Enhanced run method with modern features
*
* @param DateTime|null $runTime
* @param bool $force
* @return void
*/
public function run(DateTime $runTime = null, $force = false): void
{
if (!$this->isModernEnabled()) {
// Fall back to parent implementation
parent::run($runTime, $force);
return;
}
$this->loadSavedJobs();
if (null === $runTime) {
$runTime = new DateTime('now');
}
// Process queued jobs first
$this->processQueuedJobs();
// Get scheduled jobs
[$background, $foreground] = $this->getQueuedJobs(false);
$alljobs = array_merge($background, $foreground);
// Check which jobs are due and add them to the queue
foreach ($alljobs as $job) {
if ($job->isDue($runTime) || $force) {
if ($job instanceof ModernJob) {
// Add to queue for processing
$this->jobQueue->push($job);
} else {
// Run legacy jobs directly
$job->run();
$this->jobs_run[] = $job;
}
}
}
// Process jobs with workers
$this->processJobsWithWorkers();
// Store states and history
$this->saveJobStates();
$this->saveJobHistory();
// Update last run timestamp
$this->updateLastRun();
}
/**
* Process jobs from the queue
*
* @return void
*/
protected function processQueuedJobs(): void
{
$maxSize = $this->modernConfig['queue']['max_size'] ?? 1000;
while (!$this->jobQueue->isEmpty() && count($this->workers) < $this->maxWorkers) {
$job = $this->jobQueue->pop();
if ($job) {
$this->executeJob($job);
}
}
}
/**
* Process jobs using multiple workers
*
* @return void
*/
protected function processJobsWithWorkers(): void
{
// Wait for all workers to complete
foreach ($this->workers as $workerId => $process) {
if ($process instanceof Process) {
$process->wait();
unset($this->workers[$workerId]);
}
}
}
/**
* Execute a job with retry support
*
* @param Job $job
* @return void
*/
protected function executeJob(Job $job): void
{
if ($job instanceof ModernJob) {
// Use modern job execution with retry
$job->runWithRetry();
} else {
// Use standard job execution
$job->run();
}
$this->jobs_run[] = $job;
// Handle background jobs
if ($job->runInBackground() && $this->maxWorkers > 1) {
$process = $job->getProcess();
if ($process) {
$this->workers[] = $process;
}
}
}
/**
* Save job execution history
*
* @return void
*/
protected function saveJobHistory(): void
{
if (!$this->modernConfig['history']['enabled'] ?? true) {
return;
}
$now = new DateTime('now');
$historyFile = $this->historyPath . '/' . $now->format('Y-m-d') . '.json';
$history = [];
if (file_exists($historyFile)) {
$file = JsonFile::instance($historyFile);
$history = $file->content();
} else {
$file = JsonFile::instance($historyFile);
}
foreach ($this->jobs_run as $job) {
$entry = [
'job_id' => $job->getId(),
'command' => is_string($job->getCommand()) ? $job->getCommand() : 'Closure',
'timestamp' => $now->format('c'),
'success' => $job->isSuccessful(),
'duration' => $job instanceof ModernJob ? $job->getExecutionTime() : null,
'output' => substr($job->getOutput(), 0, 1000), // Limit output size
'retry_count' => $job instanceof ModernJob ? $job->getRetryCount() : 0,
];
$history[] = $entry;
}
$file->save($history);
// Clean up old history files
$this->cleanupHistory();
}
/**
* Clean up old history files
*
* @return void
*/
protected function cleanupHistory(): void
{
$retentionDays = $this->modernConfig['history']['retention_days'] ?? 30;
$cutoffDate = new DateTime("-{$retentionDays} days");
$files = glob($this->historyPath . '/*.json');
foreach ($files as $file) {
$filename = basename($file, '.json');
$fileDate = DateTime::createFromFormat('Y-m-d', $filename);
if ($fileDate && $fileDate < $cutoffDate) {
unlink($file);
}
}
}
/**
* Update last run timestamp
*
* @return void
*/
protected function updateLastRun(): void
{
$lastRunFile = $this->status_path . '/last_run.txt';
file_put_contents($lastRunFile, (new DateTime('now'))->format('c'), LOCK_EX);
// Also update the legacy location for backward compatibility
file_put_contents('logs/lastcron.run', (new DateTime('now'))->format('Y-m-d H:i:s'), LOCK_EX);
}
/**
* Check scheduler health
*
* @return array
*/
public function getHealthStatus(): array
{
$lastRunFile = $this->status_path . '/last_run.txt';
$lastRun = file_exists($lastRunFile) ? file_get_contents($lastRunFile) : null;
$health = [
'status' => 'healthy',
'last_run' => $lastRun,
'last_run_age' => null,
'queue_size' => 0,
'failed_jobs_24h' => 0,
'scheduled_jobs' => count($this->getAllJobs()),
'modern_features' => $this->isModernEnabled(),
'workers' => $this->maxWorkers,
'trigger_methods' => $this->getActiveTriggers(),
];
if ($lastRun) {
$lastRunTime = new DateTime($lastRun);
$now = new DateTime('now');
$diff = $now->getTimestamp() - $lastRunTime->getTimestamp();
$health['last_run_age'] = $diff;
// Mark as unhealthy if no run in last 10 minutes
if ($diff > 600) {
$health['status'] = 'warning';
}
// Mark as critical if no run in last hour
if ($diff > 3600) {
$health['status'] = 'critical';
}
} else {
$health['status'] = 'unknown';
}
// Get queue size if modern features enabled
if ($this->isModernEnabled() && $this->jobQueue) {
$health['queue_size'] = $this->jobQueue->size();
}
// Count failed jobs in last 24 hours
$health['failed_jobs_24h'] = $this->countRecentFailures();
return $health;
}
/**
* Check if webhook is enabled
*
* @return bool
*/
public function isWebhookEnabled(): bool
{
return $this->webhookEnabled;
}
/**
* Get active trigger methods
*
* @return array
*/
public function getActiveTriggers(): array
{
$triggers = [];
// Check cron
$cronStatus = $this->isCrontabSetup();
if ($cronStatus === 1) {
$triggers[] = 'cron';
}
// Check systemd timer
if ($this->isSystemdTimerActive()) {
$triggers[] = 'systemd';
}
// Check webhook
if ($this->webhookEnabled) {
$triggers[] = 'webhook';
}
// Check for external triggers
$lastRunFile = $this->status_path . '/last_run.txt';
if (file_exists($lastRunFile)) {
$lastRun = file_get_contents($lastRunFile);
$lastRunTime = new DateTime($lastRun);
$now = new DateTime('now');
$diff = $now->getTimestamp() - $lastRunTime->getTimestamp();
if ($diff < 120) {
$triggers[] = 'external';
}
}
return $triggers;
}
/**
* Check if systemd timer is active
*
* @return bool
*/
protected function isSystemdTimerActive(): bool
{
if (Utils::isWindows()) {
return false;
}
$process = new Process(['systemctl', 'is-active', 'grav-scheduler.timer']);
$process->run();
return $process->isSuccessful() && trim($process->getOutput()) === 'active';
}
/**
* Count recent job failures
*
* @return int
*/
protected function countRecentFailures(): int
{
$count = 0;
$cutoff = new DateTime('-24 hours');
// Check today's history
$todayFile = $this->historyPath . '/' . date('Y-m-d') . '.json';
if (file_exists($todayFile)) {
$file = JsonFile::instance($todayFile);
$history = $file->content();
foreach ($history as $entry) {
$entryTime = new DateTime($entry['timestamp']);
if ($entryTime > $cutoff && !$entry['success']) {
$count++;
}
}
}
// Check yesterday's history
$yesterdayFile = $this->historyPath . '/' . $cutoff->format('Y-m-d') . '.json';
if (file_exists($yesterdayFile)) {
$file = JsonFile::instance($yesterdayFile);
$history = $file->content();
foreach ($history as $entry) {
$entryTime = new DateTime($entry['timestamp']);
if ($entryTime > $cutoff && !$entry['success']) {
$count++;
}
}
}
return $count;
}
/**
* Process webhook trigger
*
* @param string|null $token
* @param string|null $jobId
* @return array
*/
public function processWebhookTrigger($token = null, $jobId = null): array
{
if (!$this->webhookEnabled) {
return ['success' => false, 'message' => 'Webhook triggers are not enabled'];
}
if ($this->webhookToken && $token !== $this->webhookToken) {
return ['success' => false, 'message' => 'Invalid webhook token'];
}
if ($jobId) {
// Force run specific job (manual override - ignore schedule)
$job = $this->getJob($jobId);
if ($job) {
// Force run in foreground to get immediate result
$job->inForeground()->run();
// Track as manually executed
$this->jobs_run[] = $job;
$this->saveJobStates();
$this->updateLastRun();
return [
'success' => $job->isSuccessful(),
'message' => $job->isSuccessful() ? 'Job force-executed successfully' : 'Job execution failed',
'job_id' => $jobId,
'forced' => true,
'output' => $job->getOutput(),
];
} else {
return ['success' => false, 'message' => 'Job not found: ' . $jobId];
}
} else {
// Run all due jobs normally
$this->run();
return [
'success' => true,
'message' => 'Scheduler executed (due jobs only)',
'jobs_run' => count($this->jobs_run),
'timestamp' => date('c'),
];
}
}
/**
* Get scheduler statistics
*
* @return array
*/
public function getStatistics(): array
{
$stats = [
'total_jobs' => count($this->getAllJobs()),
'enabled_jobs' => 0,
'disabled_jobs' => 0,
'executions_today' => 0,
'failures_today' => 0,
'average_execution_time' => 0,
'queue_size' => 0,
];
// Count enabled/disabled jobs
foreach ($this->getAllJobs() as $job) {
if ($job->getEnabled()) {
$stats['enabled_jobs']++;
} else {
$stats['disabled_jobs']++;
}
}
// Get today's statistics
$todayFile = $this->historyPath . '/' . date('Y-m-d') . '.json';
if (file_exists($todayFile)) {
$file = JsonFile::instance($todayFile);
$history = $file->content();
$totalTime = 0;
$timeCount = 0;
foreach ($history as $entry) {
$stats['executions_today']++;
if (!$entry['success']) {
$stats['failures_today']++;
}
if (isset($entry['duration']) && $entry['duration'] > 0) {
$totalTime += $entry['duration'];
$timeCount++;
}
}
if ($timeCount > 0) {
$stats['average_execution_time'] = round($totalTime / $timeCount, 2);
}
}
// Get queue size
if ($this->isModernEnabled() && $this->jobQueue) {
$stats['queue_size'] = $this->jobQueue->size();
}
return $stats;
}
/**
* Run scheduler in daemon mode
*
* @param int $interval Check interval in seconds (default: 60)
* @return void
*/
public function runDaemon($interval = 60): void
{
if (!$this->isModernEnabled()) {
throw new RuntimeException('Daemon mode requires modern features to be enabled');
}
$lastRun = 0;
while (true) {
$now = time();
// Run scheduler every minute
if ($now - $lastRun >= $interval) {
$this->run();
$lastRun = $now;
}
// Process any queued jobs
$this->processQueuedJobs();
// Sleep for a short interval
sleep(5);
// Check for shutdown signal
if (file_exists($this->status_path . '/shutdown')) {
unlink($this->status_path . '/shutdown');
break;
}
}
}
}

View File

@@ -378,6 +378,40 @@ class Scheduler
}
/**
* Check if webhook is enabled
*
* @return bool
*/
public function isWebhookEnabled(): bool
{
// Check config for webhook settings even in base scheduler
$config = Grav::instance()['config'];
return $config->get('scheduler.modern.webhook.enabled', false);
}
/**
* Get active trigger methods
*
* @return array
*/
public function getActiveTriggers(): array
{
$triggers = [];
$cronStatus = $this->isCrontabSetup();
if ($cronStatus === 1) {
$triggers[] = 'cron';
}
// Check if webhook is enabled
if ($this->isWebhookEnabled()) {
$triggers[] = 'webhook';
}
return $triggers;
}
/**
* Queue a job for execution in the correct queue.
*

View File

@@ -0,0 +1,263 @@
<?php
/**
* @package Grav\Common\Scheduler
*
* @copyright Copyright (c) 2015 - 2025 Trilby Media, LLC. All rights reserved.
* @license MIT License; see LICENSE file for details.
*/
namespace Grav\Common\Scheduler;
use Grav\Common\Grav;
use Grav\Common\Utils;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
/**
* Scheduler Controller for handling HTTP endpoints
*
* @package Grav\Common\Scheduler
*/
class SchedulerController
{
/** @var Grav */
protected $grav;
/** @var ModernScheduler */
protected $scheduler;
/**
* SchedulerController constructor
*
* @param Grav $grav
*/
public function __construct(Grav $grav)
{
$this->grav = $grav;
// Get scheduler instance
$scheduler = $grav['scheduler'];
if ($scheduler instanceof ModernScheduler) {
$this->scheduler = $scheduler;
} else {
// Create ModernScheduler instance if not already
$this->scheduler = new ModernScheduler();
}
}
/**
* Handle health check endpoint
*
* @param ServerRequestInterface $request
* @return ResponseInterface
*/
public function health(ServerRequestInterface $request): ResponseInterface
{
$config = $this->grav['config']->get('scheduler.modern', []);
// Check if health endpoint is enabled
if (!($config['health']['enabled'] ?? true)) {
return $this->jsonResponse(['error' => 'Health check disabled'], 403);
}
// Get health status
$health = $this->scheduler->getHealthStatus();
return $this->jsonResponse($health);
}
/**
* Handle webhook trigger endpoint
*
* @param ServerRequestInterface $request
* @return ResponseInterface
*/
public function webhook(ServerRequestInterface $request): ResponseInterface
{
$config = $this->grav['config']->get('scheduler.modern', []);
// Check if webhook is enabled
if (!($config['webhook']['enabled'] ?? false)) {
return $this->jsonResponse(['error' => 'Webhook triggers disabled'], 403);
}
// Get authorization header
$authHeader = $request->getHeaderLine('Authorization');
$token = null;
if (preg_match('/Bearer\s+(.+)$/i', $authHeader, $matches)) {
$token = $matches[1];
}
// Get query parameters
$params = $request->getQueryParams();
$jobId = $params['job'] ?? null;
// Process webhook
$result = $this->scheduler->processWebhookTrigger($token, $jobId);
$statusCode = $result['success'] ? 200 : 400;
return $this->jsonResponse($result, $statusCode);
}
/**
* Handle statistics endpoint
*
* @param ServerRequestInterface $request
* @return ResponseInterface
*/
public function statistics(ServerRequestInterface $request): ResponseInterface
{
// Check if user is admin
$user = $this->grav['user'] ?? null;
if (!$user || !$user->authorize('admin.super')) {
return $this->jsonResponse(['error' => 'Unauthorized'], 401);
}
$stats = $this->scheduler->getStatistics();
return $this->jsonResponse($stats);
}
/**
* Handle admin AJAX requests for scheduler status
*
* @param ServerRequestInterface $request
* @return ResponseInterface
*/
public function adminStatus(ServerRequestInterface $request): ResponseInterface
{
// Check if user is admin
$user = $this->grav['user'] ?? null;
if (!$user || !$user->authorize('admin.scheduler')) {
return $this->jsonResponse(['error' => 'Unauthorized'], 401);
}
$health = $this->scheduler->getHealthStatus();
// Format for admin display
$response = [
'health' => $this->formatHealthStatus($health),
'triggers' => $this->formatTriggers($health['trigger_methods'] ?? [])
];
return $this->jsonResponse($response);
}
/**
* Format health status for display
*
* @param array $health
* @return string
*/
protected function formatHealthStatus(array $health): string
{
$status = $health['status'] ?? 'unknown';
$lastRun = $health['last_run'] ?? null;
$queueSize = $health['queue_size'] ?? 0;
$failedJobs = $health['failed_jobs_24h'] ?? 0;
$statusBadge = match($status) {
'healthy' => '<span class="badge badge-success">Healthy</span>',
'warning' => '<span class="badge badge-warning">Warning</span>',
'critical' => '<span class="badge badge-danger">Critical</span>',
default => '<span class="badge badge-secondary">Unknown</span>'
};
$html = '<div class="scheduler-health">';
$html .= '<p>Status: ' . $statusBadge . '</p>';
if ($lastRun) {
$lastRunTime = new \DateTime($lastRun);
$now = new \DateTime();
$diff = $now->diff($lastRunTime);
$timeAgo = '';
if ($diff->d > 0) {
$timeAgo = $diff->d . ' day' . ($diff->d > 1 ? 's' : '') . ' ago';
} elseif ($diff->h > 0) {
$timeAgo = $diff->h . ' hour' . ($diff->h > 1 ? 's' : '') . ' ago';
} elseif ($diff->i > 0) {
$timeAgo = $diff->i . ' minute' . ($diff->i > 1 ? 's' : '') . ' ago';
} else {
$timeAgo = 'Less than a minute ago';
}
$html .= '<p>Last Run: <strong>' . $timeAgo . '</strong></p>';
} else {
$html .= '<p>Last Run: <strong>Never</strong></p>';
}
$html .= '<p>Queue Size: <strong>' . $queueSize . '</strong></p>';
if ($failedJobs > 0) {
$html .= '<p class="text-danger">Failed Jobs (24h): <strong>' . $failedJobs . '</strong></p>';
}
$html .= '</div>';
return $html;
}
/**
* Format triggers for display
*
* @param array $triggers
* @return string
*/
protected function formatTriggers(array $triggers): string
{
if (empty($triggers)) {
return '<div class="alert alert-warning">No active triggers detected. Please set up cron, systemd, or webhook triggers.</div>';
}
$html = '<div class="scheduler-triggers">';
$html .= '<ul class="list-unstyled">';
foreach ($triggers as $trigger) {
$icon = match($trigger) {
'cron' => '⏰',
'systemd' => '⚙️',
'webhook' => '🔗',
'external' => '🌐',
default => '•'
};
$label = match($trigger) {
'cron' => 'Cron Job',
'systemd' => 'Systemd Timer',
'webhook' => 'Webhook Triggers',
'external' => 'External Triggers',
default => ucfirst($trigger)
};
$html .= '<li>' . $icon . ' <strong>' . $label . '</strong> <span class="badge badge-success">Active</span></li>';
}
$html .= '</ul>';
$html .= '</div>';
return $html;
}
/**
* Create JSON response
*
* @param array $data
* @param int $statusCode
* @return ResponseInterface
*/
protected function jsonResponse(array $data, int $statusCode = 200): ResponseInterface
{
$response = $this->grav['response'] ?? new \Nyholm\Psr7\Response();
$response = $response->withStatus($statusCode)
->withHeader('Content-Type', 'application/json');
$body = $response->getBody();
$body->write(json_encode($data));
return $response;
}
}

View File

@@ -10,6 +10,7 @@
namespace Grav\Common\Service;
use Grav\Common\Scheduler\Scheduler;
use Grav\Common\Scheduler\ModernScheduler;
use Pimple\Container;
use Pimple\ServiceProviderInterface;
@@ -25,7 +26,16 @@ class SchedulerServiceProvider implements ServiceProviderInterface
*/
public function register(Container $container)
{
$container['scheduler'] = function () {
$container['scheduler'] = function ($c) {
$config = $c['config'];
// Use ModernScheduler if modern features are enabled
$modernEnabled = $config->get('scheduler.modern.enabled', false);
if ($modernEnabled) {
return new ModernScheduler();
}
return new Scheduler();
};
}

View File

@@ -9,7 +9,7 @@
namespace Grav\Console\Cli;
use Cron\CronExpression;
use Dragonmantank\Cron\CronExpression;
use Grav\Common\Grav;
use Grav\Common\Utils;
use Grav\Common\Scheduler\Scheduler;