simplify extended jobs logic

Signed-off-by: Andy Miller <rhuk@mac.com>
This commit is contained in:
Andy Miller
2025-08-24 22:14:14 +01:00
parent 56cc894c1d
commit e497a93da6
7 changed files with 1713 additions and 81 deletions

View File

@@ -250,47 +250,69 @@ form:
var statusEl = document.getElementById('scheduler-health-status');
if (!statusEl) return;
var html = '<div class="scheduler-health-info">';
// Modern card-based layout
var statusColor = '#6c757d';
var statusLabel = data.status || 'unknown';
if (data.status === 'healthy') statusColor = '#28a745';
else if (data.status === 'warning') statusColor = '#ffc107';
else if (data.status === 'critical') statusColor = '#dc3545';
// Status badge
var badge = 'secondary';
if (data.status === 'healthy') badge = 'success';
else if (data.status === 'warning') badge = 'warning';
else if (data.status === 'critical') badge = 'danger';
var html = '<div style="display: flex; flex-direction: column; gap: 1rem;">';
html += '<p><strong>Status:</strong> <span class="badge badge-' + badge + '">' +
(data.status || 'Unknown').toUpperCase() + '</span></p>';
// Status card
html += '<div style="display: flex; align-items: center; justify-content: space-between; padding: 0.75rem 1rem; background: linear-gradient(135deg, #f8f9fa 0%, #fff 100%); border-radius: 6px; border: 1px solid #e9ecef; box-shadow: 0 1px 3px rgba(0,0,0,0.05);">';
html += '<span style="font-weight: 500; color: #495057;">Status:</span>';
html += '<span style="background: ' + statusColor + '; color: white; padding: 0.375rem 0.75rem; font-size: 0.875rem; font-weight: 500; border-radius: 4px; text-transform: uppercase; letter-spacing: 0.025em;">' + statusLabel + '</span>';
html += '</div>';
// Last run
// Info grid
html += '<div style="display: grid; grid-template-columns: 1fr 1fr; gap: 0.75rem;">';
// Last run card
html += '<div style="background: white; border: 1px solid #e9ecef; border-radius: 6px; padding: 0.75rem; box-shadow: 0 1px 2px rgba(0,0,0,0.03);">';
html += '<div style="color: #6c757d; font-size: 0.75rem; text-transform: uppercase; letter-spacing: 0.05em; margin-bottom: 0.25rem;">Last Run</div>';
if (data.last_run) {
var age = data.last_run_age;
var ageText = 'just now';
if (age > 3600) {
if (age > 86400) {
ageText = Math.floor(age / 86400) + ' day(s) ago';
} else if (age > 3600) {
ageText = Math.floor(age / 3600) + ' hour(s) ago';
} else if (age > 60) {
ageText = Math.floor(age / 60) + ' minute(s) ago';
} else if (age > 0) {
ageText = age + ' second(s) ago';
}
html += '<p><strong>Last Run:</strong> ' + ageText + '</p>';
html += '<div style="font-size: 1rem; color: #212529; font-weight: 500;">' + ageText + '</div>';
} else {
html += '<p><strong>Last Run:</strong> Never</p>';
html += '<div style="font-size: 1rem; color: #6c757d;">Never</div>';
}
// Jobs count
html += '<p><strong>Scheduled Jobs:</strong> ' + (data.scheduled_jobs || 0) + '</p>';
// Queue size (if modern features enabled)
if (data.modern_features && data.queue_size !== undefined) {
html += '<p><strong>Queue Size:</strong> ' + data.queue_size + '</p>';
}
// Failed jobs
if (data.failed_jobs_24h > 0) {
html += '<p class="text-danger"><strong>Failed (24h):</strong> ' + data.failed_jobs_24h + '</p>';
}
html += '</div>';
// Jobs count card
html += '<div style="background: white; border: 1px solid #e9ecef; border-radius: 6px; padding: 0.75rem; box-shadow: 0 1px 2px rgba(0,0,0,0.03);">';
html += '<div style="color: #6c757d; font-size: 0.75rem; text-transform: uppercase; letter-spacing: 0.05em; margin-bottom: 0.25rem;">Scheduled Jobs</div>';
html += '<div style="font-size: 1rem; color: #212529; font-weight: 500;">' + (data.scheduled_jobs || 0) + '</div>';
html += '</div>';
html += '</div>'; // Close grid
// Additional info if available
if (data.modern_features && data.queue_size !== undefined) {
html += '<div style="background: white; border: 1px solid #e9ecef; border-radius: 6px; padding: 0.75rem; box-shadow: 0 1px 2px rgba(0,0,0,0.03);">';
html += '<span style="color: #6c757d; font-size: 0.875rem;">Queue Size: </span>';
html += '<span style="font-weight: 500;">' + data.queue_size + '</span>';
html += '</div>';
}
// Failed jobs warning
if (data.failed_jobs_24h > 0) {
html += '<div style="background: #fff5f5; border: 1px solid #feb2b2; border-radius: 6px; padding: 0.75rem; color: #c53030;">';
html += '<strong>⚠️ Failed Jobs (24h):</strong> ' + data.failed_jobs_24h;
html += '</div>';
}
html += '</div>'; // Close main container
statusEl.innerHTML = html;
})
.catch(error => {
@@ -339,26 +361,46 @@ form:
var triggersEl = document.getElementById('scheduler-triggers');
if (!triggersEl) return;
var html = '<ul class="list-unstyled">';
var html = '<div style="display: flex; flex-direction: column; gap: 0.75rem;">';
// Cron status
if (cronReady) {
html += '<li>✅ <strong>Cron:</strong> <span class="badge badge-success">Active</span></li>';
} else {
html += '<li>❌ <strong>Cron:</strong> <span class="badge badge-secondary">Not Configured</span></li>';
}
// Cron trigger card
var cronBg = cronReady ? 'linear-gradient(135deg, #d4edda 0%, #fff 100%)' : 'linear-gradient(135deg, #f8f9fa 0%, #fff 100%)';
var cronBorder = cronReady ? '#c3e6cb' : '#dee2e6';
var cronIcon = cronReady ? '✅' : '❌';
var cronStatus = cronReady ? 'Active' : 'Not Configured';
var cronColor = cronReady ? '#155724' : '#6c757d';
// Webhook status
html += '<div style="display: flex; align-items: center; justify-content: space-between; padding: 0.75rem 1rem; background: ' + cronBg + '; border: 1px solid ' + cronBorder + '; border-radius: 6px; box-shadow: 0 1px 2px rgba(0,0,0,0.03);">';
html += '<div style="display: flex; align-items: center; gap: 0.5rem;">';
html += '<span style="font-size: 1.1rem;">' + cronIcon + '</span>';
html += '<span style="font-weight: 500; color: #495057;">Cron:</span>';
html += '</div>';
html += '<span style="background: ' + (cronReady ? '#28a745' : '#6c757d') + '; color: white; padding: 0.25rem 0.625rem; font-size: 0.8125rem; font-weight: 500; border-radius: 4px;">' + cronStatus + '</span>';
html += '</div>';
// Webhook trigger card
if (data.webhook_enabled) {
html += '<li>✅ <strong>Webhook:</strong> <span class="badge badge-success">Active</span></li>';
html += '<div style="display: flex; align-items: center; justify-content: space-between; padding: 0.75rem 1rem; background: linear-gradient(135deg, #d4edda 0%, #fff 100%); border: 1px solid #c3e6cb; border-radius: 6px; box-shadow: 0 1px 2px rgba(0,0,0,0.03);">';
html += '<div style="display: flex; align-items: center; gap: 0.5rem;">';
html += '<span style="font-size: 1.1rem;">✅</span>';
html += '<span style="font-weight: 500; color: #495057;">Webhook:</span>';
html += '</div>';
html += '<span style="background: #28a745; color: white; padding: 0.25rem 0.625rem; font-size: 0.8125rem; font-weight: 500; border-radius: 4px;">Active</span>';
html += '</div>';
} else {
var modernEnabled = document.querySelector('[name="data[scheduler][modern][enabled]"]:checked');
if (modernEnabled && modernEnabled.value == '1') {
html += '<li>⚠️ <strong>Webhook:</strong> <span class="badge badge-warning">Disabled</span></li>';
html += '<div style="display: flex; align-items: center; justify-content: space-between; padding: 0.75rem 1rem; background: linear-gradient(135deg, #fff3cd 0%, #fff 100%); border: 1px solid #ffeeba; border-radius: 6px; box-shadow: 0 1px 2px rgba(0,0,0,0.03);">';
html += '<div style="display: flex; align-items: center; gap: 0.5rem;">';
html += '<span style="font-size: 1.1rem;">⚠️</span>';
html += '<span style="font-weight: 500; color: #495057;">Webhook:</span>';
html += '</div>';
html += '<span style="background: #ffc107; color: #212529; padding: 0.25rem 0.625rem; font-size: 0.8125rem; font-weight: 500; border-radius: 4px;">Disabled</span>';
html += '</div>';
}
}
html += '</ul>';
html += '</div>';
// Add warning if no triggers active
if (!cronReady && !data.webhook_enabled) {

View File

@@ -77,6 +77,40 @@ class Job
private $successful = false;
/** @var string|null */
private $backlink;
// Modern Job features
/** @var int */
protected $maxAttempts = 3;
/** @var int */
protected $retryCount = 0;
/** @var int */
protected $retryDelay = 60; // seconds
/** @var string */
protected $retryStrategy = 'exponential'; // 'linear' or 'exponential'
/** @var float */
protected $executionStartTime;
/** @var float */
protected $executionDuration = 0;
/** @var int */
protected $timeout = 300; // 5 minutes default
/** @var array */
protected $dependencies = [];
/** @var array */
protected $chainedJobs = [];
/** @var string|null */
protected $queueId;
/** @var string */
protected $priority = 'normal'; // 'high', 'normal', 'low'
/** @var array */
protected $metadata = [];
/** @var array */
protected $tags = [];
/** @var callable|null */
protected $onSuccess;
/** @var callable|null */
protected $onFailure;
/** @var callable|null */
protected $onRetry;
/**
* Create a new Job instance.
@@ -150,6 +184,16 @@ class Job
return null;
}
/**
* Get raw arguments (array or string)
*
* @return array|string
*/
public function getRawArguments()
{
return $this->args;
}
/**
* @return CronExpression
@@ -315,6 +359,13 @@ class Job
*/
public function run()
{
// Check dependencies (modern feature)
if (!$this->checkDependencies()) {
$this->output = 'Dependencies not met';
$this->successful = false;
return false;
}
// If the truthTest failed, don't run
if ($this->truthTest !== true) {
return false;
@@ -340,6 +391,11 @@ class Job
$args = is_string($this->args) ? explode(' ', $this->args) : $this->args;
$command = array_merge([$this->command], $args);
$process = new Process($command);
// Apply timeout if set (modern feature)
if ($this->timeout > 0) {
$process->setTimeout($this->timeout);
}
$this->process = $process;
@@ -563,4 +619,455 @@ class Job
return $this;
}
// Modern Job Methods
/**
* Set maximum retry attempts
*
* @param int $attempts
* @return self
*/
public function maxAttempts(int $attempts): self
{
$this->maxAttempts = $attempts;
return $this;
}
/**
* Get maximum retry attempts
*
* @return int
*/
public function getMaxAttempts(): int
{
return $this->maxAttempts;
}
/**
* Set retry delay
*
* @param int $seconds
* @param string $strategy 'linear' or 'exponential'
* @return self
*/
public function retryDelay(int $seconds, string $strategy = 'exponential'): self
{
$this->retryDelay = $seconds;
$this->retryStrategy = $strategy;
return $this;
}
/**
* Get current retry count
*
* @return int
*/
public function getRetryCount(): int
{
return $this->retryCount;
}
/**
* Set job timeout
*
* @param int $seconds
* @return self
*/
public function timeout(int $seconds): self
{
$this->timeout = $seconds;
return $this;
}
/**
* Set job priority
*
* @param string $priority 'high', 'normal', or 'low'
* @return self
*/
public function priority(string $priority): self
{
if (!in_array($priority, ['high', 'normal', 'low'])) {
throw new InvalidArgumentException('Priority must be high, normal, or low');
}
$this->priority = $priority;
return $this;
}
/**
* Get job priority
*
* @return string
*/
public function getPriority(): string
{
return $this->priority;
}
/**
* Add job dependency
*
* @param string $jobId
* @return self
*/
public function dependsOn(string $jobId): self
{
$this->dependencies[] = $jobId;
return $this;
}
/**
* Chain another job to run after this one
*
* @param Job $job
* @param bool $onlyOnSuccess Run only if current job succeeds
* @return self
*/
public function chain(Job $job, bool $onlyOnSuccess = true): self
{
$this->chainedJobs[] = [
'job' => $job,
'onlyOnSuccess' => $onlyOnSuccess,
];
return $this;
}
/**
* Add metadata to the job
*
* @param string $key
* @param mixed $value
* @return self
*/
public function withMetadata(string $key, $value): self
{
$this->metadata[$key] = $value;
return $this;
}
/**
* Add tags to the job
*
* @param array $tags
* @return self
*/
public function withTags(array $tags): self
{
$this->tags = array_merge($this->tags, $tags);
return $this;
}
/**
* Set success callback
*
* @param callable $callback
* @return self
*/
public function onSuccess(callable $callback): self
{
$this->onSuccess = $callback;
return $this;
}
/**
* Set failure callback
*
* @param callable $callback
* @return self
*/
public function onFailure(callable $callback): self
{
$this->onFailure = $callback;
return $this;
}
/**
* Set retry callback
*
* @param callable $callback
* @return self
*/
public function onRetry(callable $callback): self
{
$this->onRetry = $callback;
return $this;
}
/**
* Run the job with retry support
*
* @return bool
*/
public function runWithRetry(): bool
{
$attempts = 0;
$lastException = null;
while ($attempts < $this->maxAttempts) {
$attempts++;
$this->retryCount = $attempts - 1;
try {
// Record execution start time
$this->executionStartTime = microtime(true);
// Run the job
$result = $this->run();
// Record execution time
$this->executionDuration = microtime(true) - $this->executionStartTime;
if ($result && $this->isSuccessful()) {
// Call success callback
if ($this->onSuccess) {
call_user_func($this->onSuccess, $this);
}
// Run chained jobs
$this->runChainedJobs(true);
return true;
}
throw new RuntimeException('Job execution failed');
} catch (\Exception $e) {
$lastException = $e;
$this->output = $e->getMessage();
$this->successful = false;
if ($attempts < $this->maxAttempts) {
// Call retry callback
if ($this->onRetry) {
call_user_func($this->onRetry, $this, $attempts, $e);
}
// Calculate delay before retry
$delay = $this->calculateRetryDelay($attempts);
if ($delay > 0) {
sleep($delay);
}
} else {
// Final failure
if ($this->onFailure) {
call_user_func($this->onFailure, $this, $e);
}
// Run chained jobs that should run on failure
$this->runChainedJobs(false);
}
}
}
return false;
}
/**
* Get execution time in seconds
*
* @return float
*/
public function getExecutionTime(): float
{
return $this->executionDuration;
}
/**
* Get job metadata
*
* @param string|null $key
* @return mixed
*/
public function getMetadata(string $key = null)
{
if ($key === null) {
return $this->metadata;
}
return $this->metadata[$key] ?? null;
}
/**
* Get job tags
*
* @return array
*/
public function getTags(): array
{
return $this->tags;
}
/**
* Check if job has a specific tag
*
* @param string $tag
* @return bool
*/
public function hasTag(string $tag): bool
{
return in_array($tag, $this->tags);
}
/**
* Set queue ID
*
* @param string $queueId
* @return self
*/
public function setQueueId(string $queueId): self
{
$this->queueId = $queueId;
return $this;
}
/**
* Get queue ID
*
* @return string|null
*/
public function getQueueId(): ?string
{
return $this->queueId;
}
/**
* Get process (for background jobs)
*
* @return Process|null
*/
public function getProcess(): ?Process
{
return $this->process;
}
/**
* Calculate retry delay based on strategy
*
* @param int $attempt
* @return int
*/
protected function calculateRetryDelay(int $attempt): int
{
if ($this->retryStrategy === 'exponential') {
return min($this->retryDelay * pow(2, $attempt - 1), 3600); // Max 1 hour
}
return $this->retryDelay;
}
/**
* Check if dependencies are met
*
* @return bool
*/
protected function checkDependencies(): bool
{
if (empty($this->dependencies)) {
return true;
}
// This would need to check against job history or status
// For now, we'll assume dependencies are met
// In a real implementation, this would check the Scheduler's job status
return true;
}
/**
* Run chained jobs
*
* @param bool $success Whether the current job succeeded
* @return void
*/
protected function runChainedJobs(bool $success): void
{
foreach ($this->chainedJobs as $chainedJob) {
$shouldRun = !$chainedJob['onlyOnSuccess'] || $success;
if ($shouldRun) {
$job = $chainedJob['job'];
if (method_exists($job, 'runWithRetry')) {
$job->runWithRetry();
} else {
$job->run();
}
}
}
}
/**
* Convert job to array for serialization
*
* @return array
*/
public function toArray(): array
{
return [
'id' => $this->getId(),
'command' => is_string($this->command) ? $this->command : 'Closure',
'at' => $this->getAt(),
'enabled' => $this->getEnabled(),
'priority' => $this->priority,
'max_attempts' => $this->maxAttempts,
'retry_count' => $this->retryCount,
'retry_delay' => $this->retryDelay,
'retry_strategy' => $this->retryStrategy,
'timeout' => $this->timeout,
'dependencies' => $this->dependencies,
'metadata' => $this->metadata,
'tags' => $this->tags,
'execution_time' => $this->executionDuration,
'successful' => $this->successful,
'output' => $this->output,
];
}
/**
* Create job from array
*
* @param array $data
* @return self
*/
public static function fromArray(array $data): self
{
$job = new self($data['command'] ?? '', [], $data['id'] ?? null);
if (isset($data['at'])) {
$job->at($data['at']);
}
if (isset($data['priority'])) {
$job->priority($data['priority']);
}
if (isset($data['max_attempts'])) {
$job->maxAttempts($data['max_attempts']);
}
if (isset($data['retry_delay']) && isset($data['retry_strategy'])) {
$job->retryDelay($data['retry_delay'], $data['retry_strategy']);
}
if (isset($data['timeout'])) {
$job->timeout($data['timeout']);
}
if (isset($data['dependencies'])) {
foreach ($data['dependencies'] as $dep) {
$job->dependsOn($dep);
}
}
if (isset($data['metadata'])) {
foreach ($data['metadata'] as $key => $value) {
$job->withMetadata($key, $value);
}
}
if (isset($data['tags'])) {
$job->withTags($data['tags']);
}
return $job;
}
}

View File

@@ -0,0 +1,462 @@
<?php
/**
* @package Grav\Common\Scheduler
*
* @copyright Copyright (c) 2015 - 2025 Trilby Media, LLC. All rights reserved.
* @license MIT License; see LICENSE file for details.
*/
namespace Grav\Common\Scheduler;
use DateTime;
use RocketTheme\Toolbox\File\JsonFile;
/**
* Job History Manager
*
* Provides comprehensive job execution history, logging, and analytics
*
* @package Grav\Common\Scheduler
*/
class JobHistory
{
/** @var string */
protected $historyPath;
/** @var int */
protected $retentionDays = 30;
/** @var int */
protected $maxOutputLength = 5000;
/**
* Constructor
*
* @param string $historyPath
* @param int $retentionDays
*/
public function __construct(string $historyPath, int $retentionDays = 30)
{
$this->historyPath = $historyPath;
$this->retentionDays = $retentionDays;
// Ensure history directory exists
if (!is_dir($this->historyPath)) {
mkdir($this->historyPath, 0755, true);
}
}
/**
* Log job execution
*
* @param Job $job
* @param array $metadata Additional metadata to store
* @return string Log entry ID
*/
public function logExecution(Job $job, array $metadata = []): string
{
$entryId = uniqid($job->getId() . '_', true);
$timestamp = new DateTime();
$entry = [
'id' => $entryId,
'job_id' => $job->getId(),
'command' => is_string($job->getCommand()) ? $job->getCommand() : 'Closure',
'arguments' => method_exists($job, 'getRawArguments') ? $job->getRawArguments() : $job->getArguments(),
'executed_at' => $timestamp->format('c'),
'timestamp' => $timestamp->getTimestamp(),
'success' => $job->isSuccessful(),
'output' => $this->captureOutput($job),
'execution_time' => method_exists($job, 'getExecutionTime') ? $job->getExecutionTime() : null,
'retry_count' => method_exists($job, 'getRetryCount') ? $job->getRetryCount() : 0,
'priority' => method_exists($job, 'getPriority') ? $job->getPriority() : 'normal',
'tags' => method_exists($job, 'getTags') ? $job->getTags() : [],
'metadata' => array_merge(
method_exists($job, 'getMetadata') ? $job->getMetadata() : [],
$metadata
),
];
// Store in daily file
$this->storeEntry($entry);
// Also store in job-specific history
$this->storeJobHistory($job->getId(), $entry);
return $entryId;
}
/**
* Capture job output with length limit
*
* @param Job $job
* @return array
*/
protected function captureOutput(Job $job): array
{
$output = $job->getOutput();
$truncated = false;
if (strlen($output) > $this->maxOutputLength) {
$output = substr($output, 0, $this->maxOutputLength);
$truncated = true;
}
return [
'content' => $output,
'truncated' => $truncated,
'length' => strlen($job->getOutput()),
];
}
/**
* Store entry in daily log file
*
* @param array $entry
* @return void
*/
protected function storeEntry(array $entry): void
{
$date = date('Y-m-d');
$filename = $this->historyPath . '/' . $date . '.json';
$jsonFile = JsonFile::instance($filename);
$entries = $jsonFile->content() ?: [];
$entries[] = $entry;
$jsonFile->save($entries);
}
/**
* Store job-specific history
*
* @param string $jobId
* @param array $entry
* @return void
*/
protected function storeJobHistory(string $jobId, array $entry): void
{
$jobDir = $this->historyPath . '/jobs';
if (!is_dir($jobDir)) {
mkdir($jobDir, 0755, true);
}
$filename = $jobDir . '/' . $jobId . '.json';
$jsonFile = JsonFile::instance($filename);
$history = $jsonFile->content() ?: [];
// Keep only last 100 executions per job
$history[] = $entry;
if (count($history) > 100) {
$history = array_slice($history, -100);
}
$jsonFile->save($history);
}
/**
* Get job history
*
* @param string $jobId
* @param int $limit
* @return array
*/
public function getJobHistory(string $jobId, int $limit = 50): array
{
$filename = $this->historyPath . '/jobs/' . $jobId . '.json';
if (!file_exists($filename)) {
return [];
}
$jsonFile = JsonFile::instance($filename);
$history = $jsonFile->content() ?: [];
// Return most recent first
$history = array_reverse($history);
if ($limit > 0) {
$history = array_slice($history, 0, $limit);
}
return $history;
}
/**
* Get history for a date range
*
* @param DateTime $startDate
* @param DateTime $endDate
* @param string|null $jobId Filter by job ID
* @return array
*/
public function getHistoryRange(DateTime $startDate, DateTime $endDate, ?string $jobId = null): array
{
$history = [];
$current = clone $startDate;
while ($current <= $endDate) {
$filename = $this->historyPath . '/' . $current->format('Y-m-d') . '.json';
if (file_exists($filename)) {
$jsonFile = JsonFile::instance($filename);
$entries = $jsonFile->content() ?: [];
foreach ($entries as $entry) {
if ($jobId === null || $entry['job_id'] === $jobId) {
$history[] = $entry;
}
}
}
$current->modify('+1 day');
}
return $history;
}
/**
* Get job statistics
*
* @param string $jobId
* @param int $days Number of days to analyze
* @return array
*/
public function getJobStatistics(string $jobId, int $days = 7): array
{
$startDate = new DateTime("-{$days} days");
$endDate = new DateTime('now');
$history = $this->getHistoryRange($startDate, $endDate, $jobId);
if (empty($history)) {
return [
'total_runs' => 0,
'successful_runs' => 0,
'failed_runs' => 0,
'success_rate' => 0,
'average_execution_time' => 0,
'last_run' => null,
'last_success' => null,
'last_failure' => null,
];
}
$totalRuns = count($history);
$successfulRuns = 0;
$executionTimes = [];
$lastRun = null;
$lastSuccess = null;
$lastFailure = null;
foreach ($history as $entry) {
if ($entry['success']) {
$successfulRuns++;
if (!$lastSuccess || $entry['timestamp'] > $lastSuccess['timestamp']) {
$lastSuccess = $entry;
}
} else {
if (!$lastFailure || $entry['timestamp'] > $lastFailure['timestamp']) {
$lastFailure = $entry;
}
}
if (!$lastRun || $entry['timestamp'] > $lastRun['timestamp']) {
$lastRun = $entry;
}
if (isset($entry['execution_time']) && $entry['execution_time'] > 0) {
$executionTimes[] = $entry['execution_time'];
}
}
return [
'total_runs' => $totalRuns,
'successful_runs' => $successfulRuns,
'failed_runs' => $totalRuns - $successfulRuns,
'success_rate' => $totalRuns > 0 ? round(($successfulRuns / $totalRuns) * 100, 2) : 0,
'average_execution_time' => !empty($executionTimes) ? round(array_sum($executionTimes) / count($executionTimes), 3) : 0,
'last_run' => $lastRun,
'last_success' => $lastSuccess,
'last_failure' => $lastFailure,
];
}
/**
* Get global statistics
*
* @param int $days
* @return array
*/
public function getGlobalStatistics(int $days = 7): array
{
$startDate = new DateTime("-{$days} days");
$endDate = new DateTime('now');
$history = $this->getHistoryRange($startDate, $endDate);
$jobStats = [];
foreach ($history as $entry) {
$jobId = $entry['job_id'];
if (!isset($jobStats[$jobId])) {
$jobStats[$jobId] = [
'runs' => 0,
'success' => 0,
'failed' => 0,
];
}
$jobStats[$jobId]['runs']++;
if ($entry['success']) {
$jobStats[$jobId]['success']++;
} else {
$jobStats[$jobId]['failed']++;
}
}
return [
'total_executions' => count($history),
'unique_jobs' => count($jobStats),
'job_statistics' => $jobStats,
'period_days' => $days,
'from_date' => $startDate->format('Y-m-d'),
'to_date' => $endDate->format('Y-m-d'),
];
}
/**
* Search history
*
* @param array $criteria
* @return array
*/
public function searchHistory(array $criteria): array
{
$results = [];
// Determine date range
$startDate = isset($criteria['start_date']) ? new DateTime($criteria['start_date']) : new DateTime('-7 days');
$endDate = isset($criteria['end_date']) ? new DateTime($criteria['end_date']) : new DateTime('now');
$history = $this->getHistoryRange($startDate, $endDate, $criteria['job_id'] ?? null);
foreach ($history as $entry) {
$match = true;
// Filter by success status
if (isset($criteria['success']) && $entry['success'] !== $criteria['success']) {
$match = false;
}
// Filter by output content
if (isset($criteria['output_contains']) &&
stripos($entry['output']['content'], $criteria['output_contains']) === false) {
$match = false;
}
// Filter by tags
if (isset($criteria['tags']) && is_array($criteria['tags'])) {
$entryTags = $entry['tags'] ?? [];
if (empty(array_intersect($criteria['tags'], $entryTags))) {
$match = false;
}
}
if ($match) {
$results[] = $entry;
}
}
// Sort results
if (isset($criteria['sort_by'])) {
usort($results, function($a, $b) use ($criteria) {
$field = $criteria['sort_by'];
$order = $criteria['sort_order'] ?? 'desc';
$aVal = $a[$field] ?? 0;
$bVal = $b[$field] ?? 0;
if ($order === 'asc') {
return $aVal <=> $bVal;
} else {
return $bVal <=> $aVal;
}
});
}
// Limit results
if (isset($criteria['limit'])) {
$results = array_slice($results, 0, $criteria['limit']);
}
return $results;
}
/**
* Clean old history files
*
* @return int Number of files deleted
*/
public function cleanOldHistory(): int
{
$deleted = 0;
$cutoffDate = new DateTime("-{$this->retentionDays} days");
$files = glob($this->historyPath . '/*.json');
foreach ($files as $file) {
$filename = basename($file, '.json');
// Check if filename is a date
if (preg_match('/^\d{4}-\d{2}-\d{2}$/', $filename)) {
$fileDate = new DateTime($filename);
if ($fileDate < $cutoffDate) {
unlink($file);
$deleted++;
}
}
}
return $deleted;
}
/**
* Export history to CSV
*
* @param array $history
* @param string $filename
* @return bool
*/
public function exportToCsv(array $history, string $filename): bool
{
$handle = fopen($filename, 'w');
if (!$handle) {
return false;
}
// Write headers
fputcsv($handle, [
'Job ID',
'Executed At',
'Success',
'Execution Time',
'Output Length',
'Retry Count',
'Priority',
'Tags',
]);
// Write data
foreach ($history as $entry) {
fputcsv($handle, [
$entry['job_id'],
$entry['executed_at'],
$entry['success'] ? 'Yes' : 'No',
$entry['execution_time'] ?? '',
$entry['output']['length'] ?? 0,
$entry['retry_count'] ?? 0,
$entry['priority'] ?? 'normal',
implode(', ', $entry['tags'] ?? []),
]);
}
fclose($handle);
return true;
}
}

View File

@@ -81,20 +81,18 @@ class JobQueue
'id' => $queueId,
'job_id' => $job->getId(),
'command' => is_string($job->getCommand()) ? $job->getCommand() : 'Closure',
'arguments' => $job->getArguments(),
'arguments' => method_exists($job, 'getRawArguments') ? $job->getRawArguments() : $job->getArguments(),
'priority' => $priority,
'timestamp' => $timestamp,
'attempts' => 0,
'max_attempts' => $job instanceof ModernJob ? $job->getMaxAttempts() : 1,
'max_attempts' => method_exists($job, 'getMaxAttempts') ? $job->getMaxAttempts() : 1,
'created_at' => date('c'),
'scheduled_for' => null,
'metadata' => [],
];
// Serialize the job if it's a closure
if (!is_string($job->getCommand())) {
$queueItem['serialized_job'] = base64_encode(serialize($job));
}
// Always serialize the job to preserve its full state
$queueItem['serialized_job'] = base64_encode(serialize($job));
$this->writeQueueItem($queueItem, 'pending');
@@ -130,7 +128,9 @@ class JobQueue
*/
public function pop(): ?Job
{
$this->lock();
if (!$this->lock()) {
return null;
}
try {
// Get all pending items
@@ -188,6 +188,78 @@ class JobQueue
}
}
/**
* Pop a job from the queue with its queue ID
*
* @return array|null Array with 'job' and 'id' keys
*/
public function popWithId(): ?array
{
if (!$this->lock()) {
return null;
}
try {
// Get all pending items
$items = $this->getPendingItems();
if (empty($items)) {
$this->unlock();
return null;
}
// Sort by priority and timestamp
usort($items, function($a, $b) {
$priorityOrder = [
self::PRIORITY_HIGH => 0,
self::PRIORITY_NORMAL => 1,
self::PRIORITY_LOW => 2,
];
$aPriority = $priorityOrder[$a['priority']] ?? 1;
$bPriority = $priorityOrder[$b['priority']] ?? 1;
if ($aPriority !== $bPriority) {
return $aPriority - $bPriority;
}
return $a['timestamp'] <=> $b['timestamp'];
});
// Get the first item that's ready to run
$now = new \DateTime();
foreach ($items as $item) {
if ($item['scheduled_for']) {
$scheduledTime = new \DateTime($item['scheduled_for']);
if ($scheduledTime > $now) {
continue; // Skip items not yet due
}
}
// Reconstruct the job first before moving it
$job = $this->reconstructJob($item);
if (!$job) {
// Failed to reconstruct, skip this item
continue;
}
// Move to processing only if we can reconstruct the job
$this->moveQueueItem($item['id'], 'pending', 'processing');
$this->unlock();
return ['job' => $job, 'id' => $item['id']];
}
$this->unlock();
return null;
} catch (\Exception $e) {
$this->unlock();
throw $e;
}
}
/**
* Mark a job as completed
*
@@ -470,21 +542,36 @@ class JobQueue
/**
* Acquire lock for queue operations
*
* @return void
* @return bool
*/
protected function lock(): void
protected function lock(): bool
{
$attempts = 0;
while (file_exists($this->lockFile) && $attempts < 10) {
usleep(100000); // 100ms
$maxAttempts = 50; // 5 seconds total
while ($attempts < $maxAttempts) {
// Check if lock file exists and is stale (older than 30 seconds)
if (file_exists($this->lockFile)) {
$lockAge = time() - filemtime($this->lockFile);
if ($lockAge > 30) {
// Stale lock, remove it
@unlink($this->lockFile);
}
}
// Try to acquire lock atomically
$handle = @fopen($this->lockFile, 'x');
if ($handle !== false) {
fclose($handle);
return true;
}
$attempts++;
usleep(100000); // 100ms
}
if ($attempts >= 10) {
throw new RuntimeException('Could not acquire queue lock');
}
touch($this->lockFile);
// Could not acquire lock
return false;
}
/**

View File

@@ -79,6 +79,51 @@ class ModernScheduler extends Scheduler
return $this->modernConfig['enabled'] ?? false;
}
/**
* Override to create ModernJob instances
*
* @param callable $fn
* @param array $args
* @param string|null $id
* @return ModernJob
*/
public function addFunction(callable $fn, $args = [], $id = null): Job
{
$job = new ModernJob($fn, $args, $id);
$this->queueJob($job->configure($this->config));
return $job;
}
/**
* Override to create ModernJob instances
*
* @param string $command
* @param array $args
* @param string|null $id
* @return ModernJob
*/
public function addCommand($command, $args = [], $id = null): Job
{
$job = new ModernJob($command, $args, $id);
$this->queueJob($job->configure($this->config));
return $job;
}
/**
* Override to create ModernJob instances
*
* @param string $command
* @param array $args
* @param string|null $id
* @return ModernJob
*/
public function raw($command, $args = [], $id = null): Job
{
return $this->addCommand($command, $args, $id);
}
/**
* Initialize modern scheduler features
*
@@ -178,13 +223,13 @@ class ModernScheduler extends Scheduler
*/
protected function processQueuedJobs(): void
{
$maxSize = $this->modernConfig['queue']['max_size'] ?? 1000;
// Process existing queued jobs from previous runs
while (!$this->jobQueue->isEmpty() && count($this->workers) < $this->maxWorkers) {
$job = $this->jobQueue->pop();
if ($job) {
$this->executeJob($job);
$this->jobs_run[] = $job;
}
}
}
@@ -196,7 +241,28 @@ class ModernScheduler extends Scheduler
*/
protected function processJobsWithWorkers(): void
{
// Wait for all workers to complete
// Process all queued jobs
while (!$this->jobQueue->isEmpty()) {
// Wait if we've reached max workers
while (count($this->workers) >= $this->maxWorkers) {
foreach ($this->workers as $workerId => $process) {
if ($process instanceof Process && !$process->isRunning()) {
unset($this->workers[$workerId]);
}
}
if (count($this->workers) >= $this->maxWorkers) {
usleep(100000); // Wait 100ms
}
}
// Get next job from queue
$job = $this->jobQueue->pop();
if ($job) {
$this->executeJob($job);
}
}
// Wait for all remaining workers to complete
foreach ($this->workers as $workerId => $process) {
if ($process instanceof Process) {
$process->wait();

View File

@@ -17,6 +17,7 @@ use InvalidArgumentException;
use Symfony\Component\Process\PhpExecutableFinder;
use Symfony\Component\Process\Process;
use RocketTheme\Toolbox\File\YamlFile;
use Symfony\Component\Yaml\Yaml;
use function is_callable;
use function is_string;
@@ -49,19 +50,55 @@ class Scheduler
/** @var string */
private $status_path;
// Modern features (backward compatible - disabled by default)
/** @var JobQueue|null */
protected $jobQueue = null;
/** @var array */
protected $workers = [];
/** @var int */
protected $maxWorkers = 1;
/** @var bool */
protected $webhookEnabled = false;
/** @var string|null */
protected $webhookToken = null;
/** @var bool */
protected $healthEnabled = true;
/** @var string */
protected $queuePath;
/** @var string */
protected $historyPath;
/** @var array */
protected $modernConfig = [];
/**
* Create new instance.
*/
public function __construct()
{
$config = Grav::instance()['config']->get('scheduler.defaults', []);
$grav = Grav::instance();
$config = $grav['config']->get('scheduler.defaults', []);
$this->config = $config;
$this->status_path = Grav::instance()['locator']->findResource('user-data://scheduler', true, true);
$locator = $grav['locator'];
$this->status_path = $locator->findResource('user-data://scheduler', true, true);
if (!file_exists($this->status_path)) {
Folder::create($this->status_path);
}
// Initialize modern features if enabled
$this->modernConfig = $grav['config']->get('scheduler.modern', []);
if ($this->modernConfig['enabled'] ?? false) {
$this->initializeModernFeatures($locator);
}
}
/**
@@ -121,6 +158,16 @@ class Scheduler
return [$background, $foreground];
}
/**
* Get the job queue
*
* @return JobQueue|null
*/
public function getJobQueue(): ?JobQueue
{
return $this->jobQueue;
}
/**
* Get all jobs if they are disabled or not as one array
*
@@ -199,22 +246,44 @@ class Scheduler
$runTime = new DateTime('now');
}
// Star processing jobs
foreach ($alljobs as $job) {
if ($job->isDue($runTime) || $force) {
$job->run();
$this->jobs_run[] = $job;
// Process jobs based on modern features
if ($this->jobQueue && ($this->modernConfig['queue']['enabled'] ?? false)) {
// Queue jobs for processing
foreach ($alljobs as $job) {
if ($job->isDue($runTime) || $force) {
// Add to queue for concurrent processing
$this->jobQueue->push($job);
}
}
// Process queue with workers
$this->processJobsWithWorkers();
// When using queue, states are saved by executeJob when jobs complete
// Don't save states here as jobs may still be processing
} else {
// Legacy processing (one at a time)
foreach ($alljobs as $job) {
if ($job->isDue($runTime) || $force) {
$job->run();
$this->jobs_run[] = $job;
}
}
// Finish handling any background jobs
foreach ($background as $job) {
$job->finalize();
}
// Store states for legacy mode
$this->saveJobStates();
// Save history if enabled
if (($this->modernConfig['history']['enabled'] ?? false) && $this->historyPath) {
$this->saveJobHistory();
}
}
// Finish handling any background jobs
foreach ($background as $job) {
$job->finalize();
}
// Store states
$this->saveJobStates();
// Store run date
file_put_contents("logs/lastcron.run", (new DateTime("now"))->format("Y-m-d H:i:s"), LOCK_EX);
}
@@ -378,6 +447,54 @@ class Scheduler
}
/**
* Initialize modern features
*
* @param mixed $locator
* @return void
*/
protected function initializeModernFeatures($locator): void
{
// Set up paths
$this->queuePath = $this->modernConfig['queue']['path'] ?? 'user-data://scheduler/queue';
$this->queuePath = $locator->findResource($this->queuePath, true, true);
$this->historyPath = $this->modernConfig['history']['path'] ?? 'user-data://scheduler/history';
$this->historyPath = $locator->findResource($this->historyPath, true, true);
// Create directories if they don't exist
if (!file_exists($this->queuePath)) {
Folder::create($this->queuePath);
}
if (!file_exists($this->historyPath)) {
Folder::create($this->historyPath);
}
// Initialize job queue
$this->jobQueue = new JobQueue($this->queuePath);
// Configure workers
$this->maxWorkers = $this->modernConfig['workers'] ?? 1;
// Configure webhook
$this->webhookEnabled = $this->modernConfig['webhook']['enabled'] ?? false;
$this->webhookToken = $this->modernConfig['webhook']['token'] ?? null;
// Configure health check
$this->healthEnabled = $this->modernConfig['health']['enabled'] ?? true;
}
/**
* Get the job queue
*
* @return JobQueue|null
*/
public function getQueue(): ?JobQueue
{
return $this->jobQueue;
}
/**
* Check if webhook is enabled
*
@@ -385,9 +502,7 @@ class Scheduler
*/
public function isWebhookEnabled(): bool
{
// Check config for webhook settings even in base scheduler
$config = Grav::instance()['config'];
return $config->get('scheduler.modern.webhook.enabled', false);
return $this->webhookEnabled;
}
/**
@@ -478,4 +593,335 @@ class Scheduler
return $job;
}
/**
* Process jobs using multiple workers
*
* @return void
*/
protected function processJobsWithWorkers(): void
{
if (!$this->jobQueue) {
return;
}
// Process all queued jobs
while (!$this->jobQueue->isEmpty()) {
// Wait if we've reached max workers
while (count($this->workers) >= $this->maxWorkers) {
foreach ($this->workers as $workerId => $worker) {
$process = null;
if (is_array($worker) && isset($worker['process'])) {
$process = $worker['process'];
} elseif ($worker instanceof Process) {
$process = $worker;
}
if ($process instanceof Process && !$process->isRunning()) {
// Finalize job if needed
if (is_array($worker) && isset($worker['job'])) {
$worker['job']->finalize();
// Save job state
$this->saveJobState($worker['job']);
// Update queue status
if (isset($worker['queueId']) && $this->jobQueue) {
if ($worker['job']->isSuccessful()) {
$this->jobQueue->complete($worker['queueId']);
} else {
$this->jobQueue->fail($worker['queueId'], $worker['job']->getOutput() ?: 'Job failed');
}
}
}
unset($this->workers[$workerId]);
}
}
if (count($this->workers) >= $this->maxWorkers) {
usleep(100000); // Wait 100ms
}
}
// Get next job from queue
$queueItem = $this->jobQueue->popWithId();
if ($queueItem) {
$this->executeJob($queueItem['job'], $queueItem['id']);
}
}
// Wait for all remaining workers to complete
foreach ($this->workers as $workerId => $worker) {
if (is_array($worker) && isset($worker['process'])) {
$process = $worker['process'];
if ($process instanceof Process) {
$process->wait();
// Finalize and save state for background jobs
if (isset($worker['job'])) {
$worker['job']->finalize();
$this->saveJobState($worker['job']);
}
// Update queue status for background jobs
if (isset($worker['queueId']) && $this->jobQueue) {
$job = $worker['job'];
if ($job->isSuccessful()) {
$this->jobQueue->complete($worker['queueId']);
} else {
$this->jobQueue->fail($worker['queueId'], $job->getOutput() ?: 'Job execution failed');
}
}
unset($this->workers[$workerId]);
}
} elseif ($worker instanceof Process) {
// Legacy format
$worker->wait();
unset($this->workers[$workerId]);
}
}
}
/**
* Process existing queued jobs
*
* @return void
*/
protected function processQueuedJobs(): void
{
if (!$this->jobQueue) {
return;
}
// Process any existing queued jobs from previous runs
while (!$this->jobQueue->isEmpty() && count($this->workers) < $this->maxWorkers) {
$job = $this->jobQueue->pop();
if ($job) {
$this->executeJob($job);
}
}
}
/**
* Execute a job
*
* @param Job $job
* @param string|null $queueId Queue ID if job came from queue
* @return void
*/
protected function executeJob(Job $job, ?string $queueId = null): void
{
$job->run();
$this->jobs_run[] = $job;
// Save job state after execution
$this->saveJobState($job);
// Check if job runs in background
if ($job->runInBackground()) {
// Background job - track it for later completion
$process = $job->getProcess();
if ($process && $process->isStarted()) {
$this->workers[] = [
'process' => $process,
'job' => $job,
'queueId' => $queueId
];
// Don't update queue status yet - will be done when process completes
return;
}
}
// Foreground job or background job that didn't start - update queue status immediately
if ($queueId && $this->jobQueue) {
// Job has already been finalized if it ran in foreground
if (!$job->runInBackground()) {
$job->finalize();
}
if ($job->isSuccessful()) {
// Move from processing to completed
$this->jobQueue->complete($queueId);
} else {
// Move from processing to failed
$this->jobQueue->fail($queueId, $job->getOutput() ?: 'Job execution failed');
}
}
}
/**
* Save state for a single job
*
* @param Job $job
* @return void
*/
protected function saveJobState(Job $job): void
{
$grav = Grav::instance();
$locator = $grav['locator'];
$statusFile = $locator->findResource('user-data://scheduler/status.yaml', true, true);
$status = [];
if (file_exists($statusFile)) {
$status = Yaml::parseFile($statusFile) ?: [];
}
// Update job status
$status[$job->getId()] = [
'state' => $job->isSuccessful() ? 'success' : 'failure',
'last-run' => time(),
];
// Add error if job failed
if (!$job->isSuccessful()) {
$output = $job->getOutput();
if ($output) {
$status[$job->getId()]['error'] = $output;
} else {
$status[$job->getId()]['error'] = null;
}
}
file_put_contents($statusFile, Yaml::dump($status));
}
/**
* Save job execution history
*
* @return void
*/
protected function saveJobHistory(): void
{
if (!$this->historyPath) {
return;
}
$history = [];
foreach ($this->jobs_run as $job) {
$history[] = [
'id' => $job->getId(),
'executed_at' => date('c'),
'success' => $job->isSuccessful(),
'output' => substr($job->getOutput(), 0, 1000),
];
}
if (!empty($history)) {
$filename = $this->historyPath . '/' . date('Y-m-d') . '.json';
$existing = file_exists($filename) ? json_decode(file_get_contents($filename), true) : [];
$existing = array_merge($existing, $history);
file_put_contents($filename, json_encode($existing, JSON_PRETTY_PRINT));
}
}
/**
* Update last run timestamp
*
* @return void
*/
protected function updateLastRun(): void
{
$lastRunFile = $this->status_path . '/last_run.txt';
file_put_contents($lastRunFile, date('Y-m-d H:i:s'));
}
/**
* Get health status
*
* @return array
*/
public function getHealthStatus(): array
{
$lastRunFile = $this->status_path . '/last_run.txt';
$lastRun = file_exists($lastRunFile) ? file_get_contents($lastRunFile) : null;
$health = [
'status' => 'healthy',
'last_run' => $lastRun,
'last_run_age' => null,
'queue_size' => 0,
'failed_jobs_24h' => 0,
'scheduled_jobs' => count($this->getAllJobs()),
'modern_features' => $this->modernConfig['enabled'] ?? false,
'webhook_enabled' => $this->webhookEnabled,
'health_check_enabled' => $this->healthEnabled,
'timestamp' => date('c'),
];
// Calculate last run age
if ($lastRun) {
$lastRunTime = new DateTime($lastRun);
$now = new DateTime('now');
$health['last_run_age'] = $now->getTimestamp() - $lastRunTime->getTimestamp();
// Determine status based on age
if ($health['last_run_age'] < 600) { // Less than 10 minutes
$health['status'] = 'healthy';
} elseif ($health['last_run_age'] < 3600) { // Less than 1 hour
$health['status'] = 'warning';
} else {
$health['status'] = 'critical';
}
} else {
$health['status'] = 'unknown';
}
// Add queue stats if available
if ($this->jobQueue) {
$stats = $this->jobQueue->getStatistics();
$health['queue_size'] = $stats['pending'] ?? 0;
$health['failed_jobs_24h'] = $stats['failed'] ?? 0;
}
return $health;
}
/**
* Process webhook trigger
*
* @param string|null $token
* @param string|null $jobId
* @return array
*/
public function processWebhookTrigger($token = null, $jobId = null): array
{
if (!$this->webhookEnabled) {
return ['success' => false, 'message' => 'Webhook triggers are not enabled'];
}
if ($this->webhookToken && $token !== $this->webhookToken) {
return ['success' => false, 'message' => 'Invalid webhook token'];
}
if ($jobId) {
// Force run specific job
$job = $this->getJob($jobId);
if ($job) {
$job->inForeground()->run();
$this->jobs_run[] = $job;
$this->saveJobStates();
$this->updateLastRun();
return [
'success' => $job->isSuccessful(),
'message' => $job->isSuccessful() ? 'Job force-executed successfully' : 'Job execution failed',
'job_id' => $jobId,
'forced' => true,
'output' => $job->getOutput(),
];
} else {
return ['success' => false, 'message' => 'Job not found: ' . $jobId];
}
} else {
// Run all due jobs
$this->run();
return [
'success' => true,
'message' => 'Scheduler executed (due jobs only)',
'jobs_run' => count($this->jobs_run),
'timestamp' => date('c'),
];
}
}
}

View File

@@ -10,7 +10,8 @@
namespace Grav\Common\Service;
use Grav\Common\Scheduler\Scheduler;
use Grav\Common\Scheduler\ModernScheduler;
use Grav\Common\Scheduler\JobQueue;
use Grav\Common\Scheduler\JobWorker;
use Pimple\Container;
use Pimple\ServiceProviderInterface;
@@ -28,15 +29,36 @@ class SchedulerServiceProvider implements ServiceProviderInterface
{
$container['scheduler'] = function ($c) {
$config = $c['config'];
$scheduler = new Scheduler();
// Use ModernScheduler if modern features are enabled
$modernEnabled = $config->get('scheduler.modern.enabled', false);
if ($modernEnabled) {
return new ModernScheduler();
// Configure modern features if enabled
$modernConfig = $config->get('scheduler.modern', []);
if ($modernConfig['enabled'] ?? false) {
// Initialize components
$queuePath = $c['locator']->findResource('user-data://scheduler/queue', true, true);
$statusPath = $c['locator']->findResource('user-data://scheduler/status.yaml', true, true);
// Set modern configuration on scheduler
$scheduler->setModernConfig($modernConfig);
// Initialize job queue if enabled
if ($modernConfig['queue']['enabled'] ?? false) {
$jobQueue = new JobQueue($queuePath);
$scheduler->setJobQueue($jobQueue);
}
// Initialize workers if enabled
if ($modernConfig['workers']['enabled'] ?? false) {
$workerCount = $modernConfig['workers']['count'] ?? 2;
$workers = [];
for ($i = 0; $i < $workerCount; $i++) {
$workers[] = new JobWorker("worker-{$i}");
}
$scheduler->setWorkers($workers);
}
}
return new Scheduler();
return $scheduler;
};
}
}