diff --git a/composer.json b/composer.json index a613f6c40..b235f3dd4 100644 --- a/composer.json +++ b/composer.json @@ -55,7 +55,7 @@ "league/climate": "^3.6", "miljar/php-exif": "^0.6", "composer/ca-bundle": "^1.2", - "dragonmantank/cron-expression": "^1.2", + "dragonmantank/cron-expression": "^3.3", "willdurand/negotiation": "^3.0", "itsgoingd/clockwork": "^5.0", "symfony/http-client": "^4.4", diff --git a/system/blueprints/config/scheduler.yaml b/system/blueprints/config/scheduler.yaml index a8dce314b..7b3f4e0bb 100644 --- a/system/blueprints/config/scheduler.yaml +++ b/system/blueprints/config/scheduler.yaml @@ -4,74 +4,850 @@ form: validation: loose fields: + scheduler_tabs: + type: tabs + active: 1 - status_title: - type: section - title: PLUGIN_ADMIN.SCHEDULER_STATUS - underline: true + fields: + status_tab: + type: tab + title: PLUGIN_ADMIN.SCHEDULER_STATUS - status: - type: cronstatus - validate: - type: commalist + fields: + status_title: + type: section + title: PLUGIN_ADMIN.SCHEDULER_STATUS + underline: true - jobs_title: - type: section - title: PLUGIN_ADMIN.SCHEDULER_JOBS - underline: true + status: + type: cronstatus + validate: + type: commalist + + webhook_status_override: + type: display + label: + content: | + + markdown: false + + status_enhanced: + type: display + label: + content: | + - custom_jobs: - type: list - style: vertical - label: - classes: cron-job-list compact - key: id - fields: - .id: - type: key - label: ID - placeholder: 'process-name' - validate: - required: true - pattern: '[a-zа-я0-9_\-]+' - max: 20 - message: 'ID must be lowercase with dashes/underscores only and less than 20 characters' - .command: - type: text - label: PLUGIN_ADMIN.COMMAND - placeholder: 'ls' - validate: - required: true - .args: - type: text - label: PLUGIN_ADMIN.EXTRA_ARGUMENTS - placeholder: '-lah' - .at: - type: text - wrapper_classes: cron-selector - label: PLUGIN_ADMIN.SCHEDULER_RUNAT - help: PLUGIN_ADMIN.SCHEDULER_RUNAT_HELP - placeholder: '* * * * *' - validate: - required: true - .output: - type: text - label: PLUGIN_ADMIN.SCHEDULER_OUTPUT - help: PLUGIN_ADMIN.SCHEDULER_OUTPUT_HELP - placeholder: 'logs/ls-cron.out' - .output_mode: - type: select - label: PLUGIN_ADMIN.SCHEDULER_OUTPUT_TYPE - help: PLUGIN_ADMIN.SCHEDULER_OUTPUT_TYPE_HELP - default: append - options: - append: Append - overwrite: Overwrite - .email: - type: text - label: PLUGIN_ADMIN.SCHEDULER_EMAIL - help: PLUGIN_ADMIN.SCHEDULER_EMAIL_HELP - placeholder: 'notifications@yoursite.com' + modern_health: + type: display + label: Health Status + content: | +
+
Checking health...
+
+ + markdown: false + + cron_setup: + type: display + label: Cron Setup Commands + content: | + +
+ + +
+ +
+ +
Copy
+
+
+ +
+ +
+ +
Copy
+
+
+ +
+ Note: These commands will run the scheduler every minute. Adjust the path if needed before copying. +
+
+ markdown: false + + trigger_methods: + type: display + label: Active Triggers + content: | +
+
Checking triggers...
+
+ + markdown: false + + jobs_tab: + type: tab + title: PLUGIN_ADMIN.SCHEDULER_JOBS + + fields: + jobs_title: + type: section + title: PLUGIN_ADMIN.SCHEDULER_JOBS + underline: true + + custom_jobs: + type: list + style: vertical + label: + classes: cron-job-list compact + key: id + fields: + .id: + type: key + label: ID + placeholder: 'process-name' + validate: + required: true + pattern: '[a-zа-я0-9_\-]+' + max: 20 + message: 'ID must be lowercase with dashes/underscores only and less than 20 characters' + .command: + type: text + label: PLUGIN_ADMIN.COMMAND + placeholder: 'ls' + validate: + required: true + .args: + type: text + label: PLUGIN_ADMIN.EXTRA_ARGUMENTS + placeholder: '-lah' + .at: + type: text + wrapper_classes: cron-selector + label: PLUGIN_ADMIN.SCHEDULER_RUNAT + help: PLUGIN_ADMIN.SCHEDULER_RUNAT_HELP + placeholder: '* * * * *' + validate: + required: true + .output: + type: text + label: PLUGIN_ADMIN.SCHEDULER_OUTPUT + help: PLUGIN_ADMIN.SCHEDULER_OUTPUT_HELP + placeholder: 'logs/ls-cron.out' + .output_mode: + type: select + label: PLUGIN_ADMIN.SCHEDULER_OUTPUT_TYPE + help: PLUGIN_ADMIN.SCHEDULER_OUTPUT_TYPE_HELP + default: append + options: + append: Append + overwrite: Overwrite + .email: + type: text + label: PLUGIN_ADMIN.SCHEDULER_EMAIL + help: PLUGIN_ADMIN.SCHEDULER_EMAIL_HELP + placeholder: 'notifications@yoursite.com' + + modern_tab: + type: tab + title: Advanced Features + + fields: + workers_section: + type: section + title: Worker Configuration + underline: true + + fields: + modern.workers: + type: number + label: Concurrent Workers + help: Number of jobs that can run simultaneously (1 = sequential) + default: 4 + size: x-small + append: workers + validate: + type: int + min: 1 + max: 10 + + retry_section: + type: section + title: Retry Configuration + underline: true + + fields: + modern.retry.enabled: + type: toggle + label: Enable Job Retry + help: Automatically retry failed jobs + highlight: 1 + default: 1 + options: + 1: PLUGIN_ADMIN.ENABLED + 0: PLUGIN_ADMIN.DISABLED + validate: + type: bool + + modern.retry.max_attempts: + type: number + label: Maximum Retry Attempts + help: Maximum number of times to retry a failed job + default: 3 + size: x-small + append: retries + validate: + type: int + min: 1 + max: 10 + + modern.retry.backoff: + type: select + label: Retry Backoff Strategy + help: How to calculate delay between retries + default: exponential + options: + linear: Linear (fixed delay) + exponential: Exponential (increasing delay) + + queue_section: + type: section + title: Queue Configuration + underline: true + + fields: + modern.queue.path: + type: text + label: Queue Storage Path + help: Where to store queued jobs + default: 'user-data://scheduler/queue' + placeholder: 'user-data://scheduler/queue' + + modern.queue.max_size: + type: number + label: Maximum Queue Size + help: Maximum number of jobs that can be queued + default: 1000 + size: x-small + append: jobs + validate: + type: int + min: 100 + max: 10000 + + history_section: + type: section + title: Job History + underline: true + + fields: + modern.history.enabled: + type: toggle + label: Enable Job History + help: Track execution history for all jobs + highlight: 1 + default: 1 + options: + 1: PLUGIN_ADMIN.ENABLED + 0: PLUGIN_ADMIN.DISABLED + validate: + type: bool + + modern.history.retention_days: + type: number + label: History Retention (days) + help: How long to keep job history + default: 30 + size: x-small + append: days + validate: + type: int + min: 1 + max: 365 + + webhook_section: + type: section + title: Webhook Configuration + underline: true + + fields: + webhook_plugin_status: + type: webhook-status + label: + modern.webhook.enabled: + type: toggle + label: Enable Webhook Triggers + help: Allow triggering scheduler via HTTP webhook + highlight: 0 + default: 0 + options: + 1: PLUGIN_ADMIN.ENABLED + 0: PLUGIN_ADMIN.DISABLED + validate: + type: bool + + modern.webhook.token: + type: text + label: Webhook Security Token + help: Secret token for authenticating webhook requests. Keep this secret! + placeholder: 'Click Generate to create a secure token' + autocomplete: 'off' + + webhook_token_generate: + type: display + label: + content: | +
+ +
+ + markdown: false + + modern.webhook.path: + type: text + label: Webhook Path + help: URL path for webhook endpoint + default: '/scheduler/webhook' + placeholder: '/scheduler/webhook' + + health_section: + type: section + title: Health Check Configuration + underline: true + + fields: + modern.health.enabled: + type: toggle + label: Enable Health Check + help: Provide health status endpoint for monitoring + highlight: 1 + default: 1 + options: + 1: PLUGIN_ADMIN.ENABLED + 0: PLUGIN_ADMIN.DISABLED + validate: + type: bool + + modern.health.path: + type: text + label: Health Check Path + help: URL path for health check endpoint + default: '/scheduler/health' + placeholder: '/scheduler/health' + + webhook_usage: + type: section + title: Usage Examples + underline: true + + fields: + webhook_examples: + type: display + label: + content: | + +
+ + +
+

How to use webhooks:

+ +
+ +
+ +
Copy
+
+
+ +
+ +
+ +
Copy
+
+
+ +
+ +
+ +
Copy
+
+
+ +
+

GitHub Actions example:

+
- name: Trigger Scheduler
+                                                  run: |
+                                                    curl -X POST ${{ secrets.SITE_URL }}/scheduler/webhook \
+                                                      -H "Authorization: Bearer ${{ secrets.WEBHOOK_TOKEN }}"
+
+
+
+ markdown: false diff --git a/system/blueprints/config/system.yaml b/system/blueprints/config/system.yaml index ac44f27b3..70ee08879 100644 --- a/system/blueprints/config/system.yaml +++ b/system/blueprints/config/system.yaml @@ -633,6 +633,19 @@ form: help: PLUGIN_ADMIN.CACHE_PREFIX_HELP placeholder: PLUGIN_ADMIN.CACHE_PREFIX_PLACEHOLDER + cache.purge_max_age_days: + type: text + size: x-small + append: GRAV.NICETIME.DAY_PLURAL + label: PLUGIN_ADMIN.CACHE_PURGE_AGE + help: PLUGIN_ADMIN.CACHE_PURGE_AGE_HELP + validate: + type: number + min: 1 + max: 365 + step: 1 + default: 30 + cache.purge_at: type: cron label: PLUGIN_ADMIN.CACHE_PURGE_JOB diff --git a/system/config/scheduler.yaml b/system/config/scheduler.yaml new file mode 100644 index 000000000..8868532d2 --- /dev/null +++ b/system/config/scheduler.yaml @@ -0,0 +1,68 @@ +# Grav Scheduler Configuration + +# Default scheduler settings (backward compatible) +defaults: + output: true + output_type: file + email: null + +# Status of individual jobs (enabled/disabled) +status: {} + +# Custom scheduled jobs +custom_jobs: {} + +# Modern scheduler features (disabled by default for backward compatibility) +modern: + # Enable modern scheduler features + enabled: false + + # Number of concurrent workers (1 = sequential execution like legacy) + workers: 1 + + # Job retry configuration + retry: + enabled: true + max_attempts: 3 + backoff: exponential # 'linear' or 'exponential' + + # Job queue configuration + queue: + path: user-data://scheduler/queue + max_size: 1000 + + # Webhook trigger configuration + webhook: + enabled: false + token: null # Set a secure token to enable webhook triggers + path: /scheduler/webhook + + # Health check endpoint + health: + enabled: true + path: /scheduler/health + + # Job execution history + history: + enabled: true + retention_days: 30 + path: user-data://scheduler/history + + # Performance settings + performance: + job_timeout: 300 # Default timeout in seconds + lock_timeout: 10 # Lock acquisition timeout in seconds + + # Monitoring and alerts + monitoring: + enabled: false + alert_on_failure: true + alert_email: null + webhook_url: null + + # Trigger detection methods + triggers: + check_cron: true + check_systemd: true + check_webhook: true + check_external: true \ No newline at end of file diff --git a/system/config/system.yaml b/system/config/system.yaml index 30eabbfbe..984f46771 100644 --- a/system/config/system.yaml +++ b/system/config/system.yaml @@ -101,6 +101,7 @@ cache: clear_images_by_default: false # By default grav does not include processed images in cache clear, this can be enabled cli_compatibility: false # Ensures only non-volatile drivers are used (file, redis, memcache, etc.) lifetime: 604800 # Lifetime of cached data in seconds (0 = infinite) + purge_max_age_days: 30 # Maximum age of cache items in days before they are purged gzip: false # GZip compress the page output allow_webserver_gzip: false # If true, `content-encoding: identity` but connection isn't closed before `onShutDown()` event redis: diff --git a/system/src/Grav/Common/Cache.php b/system/src/Grav/Common/Cache.php index 6927c401c..c7afcbe55 100644 --- a/system/src/Grav/Common/Cache.php +++ b/system/src/Grav/Common/Cache.php @@ -170,24 +170,75 @@ class Cache extends Getters } /** - * Deletes the old out of date file-based caches + * Deletes old cache files based on age * * @return int */ public function purgeOldCache() { + // Get the max age for cache files from config (default 30 days) + $max_age_days = $this->config->get('system.cache.purge_max_age_days', 30); + $max_age_seconds = $max_age_days * 86400; // Convert days to seconds + $now = time(); + $count = 0; + + // First, clean up old orphaned cache directories (not the current one) $cache_dir = dirname($this->cache_dir); $current = Utils::basename($this->cache_dir); - $count = 0; - + foreach (new DirectoryIterator($cache_dir) as $file) { $dir = $file->getBasename(); if ($dir === $current || $file->isDot() || $file->isFile()) { continue; } - - Folder::delete($file->getPathname()); - $count++; + + // Check if directory is old and empty or very old (90+ days) + $dir_age = $now - $file->getMTime(); + if ($dir_age > 7776000) { // 90 days + Folder::delete($file->getPathname()); + $count++; + } + } + + // Now clean up old cache files within the current cache directory + if (is_dir($this->cache_dir)) { + $iterator = new \RecursiveIteratorIterator( + new \RecursiveDirectoryIterator($this->cache_dir, \RecursiveDirectoryIterator::SKIP_DOTS), + \RecursiveIteratorIterator::CHILD_FIRST + ); + + foreach ($iterator as $file) { + if ($file->isFile()) { + $file_age = $now - $file->getMTime(); + if ($file_age > $max_age_seconds) { + @unlink($file->getPathname()); + $count++; + } + } + } + } + + // Also clean up old files in compiled cache + $grav = Grav::instance(); + $compiled_dir = $this->config->get('system.cache.compiled_dir', 'cache://compiled'); + $compiled_path = $grav['locator']->findResource($compiled_dir, true); + + if ($compiled_path && is_dir($compiled_path)) { + $iterator = new \RecursiveIteratorIterator( + new \RecursiveDirectoryIterator($compiled_path, \RecursiveDirectoryIterator::SKIP_DOTS), + \RecursiveIteratorIterator::CHILD_FIRST + ); + + foreach ($iterator as $file) { + if ($file->isFile()) { + $file_age = $now - $file->getMTime(); + // Compiled files can be kept longer (60 days) + if ($file_age > ($max_age_seconds * 2)) { + @unlink($file->getPathname()); + $count++; + } + } + } } return $count; @@ -632,8 +683,10 @@ class Cache extends Getters { /** @var Cache $cache */ $cache = Grav::instance()['cache']; - $deleted_folders = $cache->purgeOldCache(); - $msg = 'Purged ' . $deleted_folders . ' old cache folders...'; + $deleted_items = $cache->purgeOldCache(); + + $max_age = $cache->config->get('system.cache.purge_max_age_days', 30); + $msg = 'Purged ' . $deleted_items . ' old cache items (files older than ' . $max_age . ' days)'; if ($echo) { echo $msg; diff --git a/system/src/Grav/Common/Scheduler/Job.php b/system/src/Grav/Common/Scheduler/Job.php index 68cbc042a..764885bc8 100644 --- a/system/src/Grav/Common/Scheduler/Job.php +++ b/system/src/Grav/Common/Scheduler/Job.php @@ -10,7 +10,7 @@ namespace Grav\Common\Scheduler; use Closure; -use Cron\CronExpression; +use Dragonmantank\Cron\CronExpression; use DateTime; use Grav\Common\Grav; use InvalidArgumentException; @@ -77,6 +77,40 @@ class Job private $successful = false; /** @var string|null */ private $backlink; + + // Modern Job features + /** @var int */ + protected $maxAttempts = 3; + /** @var int */ + protected $retryCount = 0; + /** @var int */ + protected $retryDelay = 60; // seconds + /** @var string */ + protected $retryStrategy = 'exponential'; // 'linear' or 'exponential' + /** @var float */ + protected $executionStartTime; + /** @var float */ + protected $executionDuration = 0; + /** @var int */ + protected $timeout = 300; // 5 minutes default + /** @var array */ + protected $dependencies = []; + /** @var array */ + protected $chainedJobs = []; + /** @var string|null */ + protected $queueId; + /** @var string */ + protected $priority = 'normal'; // 'high', 'normal', 'low' + /** @var array */ + protected $metadata = []; + /** @var array */ + protected $tags = []; + /** @var callable|null */ + protected $onSuccess; + /** @var callable|null */ + protected $onFailure; + /** @var callable|null */ + protected $onRetry; /** * Create a new Job instance. @@ -150,6 +184,16 @@ class Job return null; } + + /** + * Get raw arguments (array or string) + * + * @return array|string + */ + public function getRawArguments() + { + return $this->args; + } /** * @return CronExpression @@ -315,6 +359,13 @@ class Job */ public function run() { + // Check dependencies (modern feature) + if (!$this->checkDependencies()) { + $this->output = 'Dependencies not met'; + $this->successful = false; + return false; + } + // If the truthTest failed, don't run if ($this->truthTest !== true) { return false; @@ -340,6 +391,11 @@ class Job $args = is_string($this->args) ? explode(' ', $this->args) : $this->args; $command = array_merge([$this->command], $args); $process = new Process($command); + + // Apply timeout if set (modern feature) + if ($this->timeout > 0) { + $process->setTimeout($this->timeout); + } $this->process = $process; @@ -563,4 +619,455 @@ class Job return $this; } + + // Modern Job Methods + + /** + * Set maximum retry attempts + * + * @param int $attempts + * @return self + */ + public function maxAttempts(int $attempts): self + { + $this->maxAttempts = $attempts; + return $this; + } + + /** + * Get maximum retry attempts + * + * @return int + */ + public function getMaxAttempts(): int + { + return $this->maxAttempts; + } + + /** + * Set retry delay + * + * @param int $seconds + * @param string $strategy 'linear' or 'exponential' + * @return self + */ + public function retryDelay(int $seconds, string $strategy = 'exponential'): self + { + $this->retryDelay = $seconds; + $this->retryStrategy = $strategy; + return $this; + } + + /** + * Get current retry count + * + * @return int + */ + public function getRetryCount(): int + { + return $this->retryCount; + } + + /** + * Set job timeout + * + * @param int $seconds + * @return self + */ + public function timeout(int $seconds): self + { + $this->timeout = $seconds; + return $this; + } + + /** + * Set job priority + * + * @param string $priority 'high', 'normal', or 'low' + * @return self + */ + public function priority(string $priority): self + { + if (!in_array($priority, ['high', 'normal', 'low'])) { + throw new InvalidArgumentException('Priority must be high, normal, or low'); + } + $this->priority = $priority; + return $this; + } + + /** + * Get job priority + * + * @return string + */ + public function getPriority(): string + { + return $this->priority; + } + + /** + * Add job dependency + * + * @param string $jobId + * @return self + */ + public function dependsOn(string $jobId): self + { + $this->dependencies[] = $jobId; + return $this; + } + + /** + * Chain another job to run after this one + * + * @param Job $job + * @param bool $onlyOnSuccess Run only if current job succeeds + * @return self + */ + public function chain(Job $job, bool $onlyOnSuccess = true): self + { + $this->chainedJobs[] = [ + 'job' => $job, + 'onlyOnSuccess' => $onlyOnSuccess, + ]; + return $this; + } + + /** + * Add metadata to the job + * + * @param string $key + * @param mixed $value + * @return self + */ + public function withMetadata(string $key, $value): self + { + $this->metadata[$key] = $value; + return $this; + } + + /** + * Add tags to the job + * + * @param array $tags + * @return self + */ + public function withTags(array $tags): self + { + $this->tags = array_merge($this->tags, $tags); + return $this; + } + + /** + * Set success callback + * + * @param callable $callback + * @return self + */ + public function onSuccess(callable $callback): self + { + $this->onSuccess = $callback; + return $this; + } + + /** + * Set failure callback + * + * @param callable $callback + * @return self + */ + public function onFailure(callable $callback): self + { + $this->onFailure = $callback; + return $this; + } + + /** + * Set retry callback + * + * @param callable $callback + * @return self + */ + public function onRetry(callable $callback): self + { + $this->onRetry = $callback; + return $this; + } + + /** + * Run the job with retry support + * + * @return bool + */ + public function runWithRetry(): bool + { + $attempts = 0; + $lastException = null; + + while ($attempts < $this->maxAttempts) { + $attempts++; + $this->retryCount = $attempts - 1; + + try { + // Record execution start time + $this->executionStartTime = microtime(true); + + // Run the job + $result = $this->run(); + + // Record execution time + $this->executionDuration = microtime(true) - $this->executionStartTime; + + if ($result && $this->isSuccessful()) { + // Call success callback + if ($this->onSuccess) { + call_user_func($this->onSuccess, $this); + } + + // Run chained jobs + $this->runChainedJobs(true); + + return true; + } + + throw new RuntimeException('Job execution failed'); + + } catch (\Exception $e) { + $lastException = $e; + $this->output = $e->getMessage(); + $this->successful = false; + + if ($attempts < $this->maxAttempts) { + // Call retry callback + if ($this->onRetry) { + call_user_func($this->onRetry, $this, $attempts, $e); + } + + // Calculate delay before retry + $delay = $this->calculateRetryDelay($attempts); + if ($delay > 0) { + sleep($delay); + } + } else { + // Final failure + if ($this->onFailure) { + call_user_func($this->onFailure, $this, $e); + } + + // Run chained jobs that should run on failure + $this->runChainedJobs(false); + } + } + } + + return false; + } + + /** + * Get execution time in seconds + * + * @return float + */ + public function getExecutionTime(): float + { + return $this->executionDuration; + } + + /** + * Get job metadata + * + * @param string|null $key + * @return mixed + */ + public function getMetadata(string $key = null) + { + if ($key === null) { + return $this->metadata; + } + + return $this->metadata[$key] ?? null; + } + + /** + * Get job tags + * + * @return array + */ + public function getTags(): array + { + return $this->tags; + } + + /** + * Check if job has a specific tag + * + * @param string $tag + * @return bool + */ + public function hasTag(string $tag): bool + { + return in_array($tag, $this->tags); + } + + /** + * Set queue ID + * + * @param string $queueId + * @return self + */ + public function setQueueId(string $queueId): self + { + $this->queueId = $queueId; + return $this; + } + + /** + * Get queue ID + * + * @return string|null + */ + public function getQueueId(): ?string + { + return $this->queueId; + } + + /** + * Get process (for background jobs) + * + * @return Process|null + */ + public function getProcess(): ?Process + { + return $this->process; + } + + /** + * Calculate retry delay based on strategy + * + * @param int $attempt + * @return int + */ + protected function calculateRetryDelay(int $attempt): int + { + if ($this->retryStrategy === 'exponential') { + return min($this->retryDelay * pow(2, $attempt - 1), 3600); // Max 1 hour + } + + return $this->retryDelay; + } + + /** + * Check if dependencies are met + * + * @return bool + */ + protected function checkDependencies(): bool + { + if (empty($this->dependencies)) { + return true; + } + + // This would need to check against job history or status + // For now, we'll assume dependencies are met + // In a real implementation, this would check the Scheduler's job status + return true; + } + + /** + * Run chained jobs + * + * @param bool $success Whether the current job succeeded + * @return void + */ + protected function runChainedJobs(bool $success): void + { + foreach ($this->chainedJobs as $chainedJob) { + $shouldRun = !$chainedJob['onlyOnSuccess'] || $success; + + if ($shouldRun) { + $job = $chainedJob['job']; + if (method_exists($job, 'runWithRetry')) { + $job->runWithRetry(); + } else { + $job->run(); + } + } + } + } + + /** + * Convert job to array for serialization + * + * @return array + */ + public function toArray(): array + { + return [ + 'id' => $this->getId(), + 'command' => is_string($this->command) ? $this->command : 'Closure', + 'at' => $this->getAt(), + 'enabled' => $this->getEnabled(), + 'priority' => $this->priority, + 'max_attempts' => $this->maxAttempts, + 'retry_count' => $this->retryCount, + 'retry_delay' => $this->retryDelay, + 'retry_strategy' => $this->retryStrategy, + 'timeout' => $this->timeout, + 'dependencies' => $this->dependencies, + 'metadata' => $this->metadata, + 'tags' => $this->tags, + 'execution_time' => $this->executionDuration, + 'successful' => $this->successful, + 'output' => $this->output, + ]; + } + + /** + * Create job from array + * + * @param array $data + * @return self + */ + public static function fromArray(array $data): self + { + $job = new self($data['command'] ?? '', [], $data['id'] ?? null); + + if (isset($data['at'])) { + $job->at($data['at']); + } + + if (isset($data['priority'])) { + $job->priority($data['priority']); + } + + if (isset($data['max_attempts'])) { + $job->maxAttempts($data['max_attempts']); + } + + if (isset($data['retry_delay']) && isset($data['retry_strategy'])) { + $job->retryDelay($data['retry_delay'], $data['retry_strategy']); + } + + if (isset($data['timeout'])) { + $job->timeout($data['timeout']); + } + + if (isset($data['dependencies'])) { + foreach ($data['dependencies'] as $dep) { + $job->dependsOn($dep); + } + } + + if (isset($data['metadata'])) { + foreach ($data['metadata'] as $key => $value) { + $job->withMetadata($key, $value); + } + } + + if (isset($data['tags'])) { + $job->withTags($data['tags']); + } + + return $job; + } } diff --git a/system/src/Grav/Common/Scheduler/JobHistory.php b/system/src/Grav/Common/Scheduler/JobHistory.php new file mode 100644 index 000000000..8361d5c7f --- /dev/null +++ b/system/src/Grav/Common/Scheduler/JobHistory.php @@ -0,0 +1,462 @@ +historyPath = $historyPath; + $this->retentionDays = $retentionDays; + + // Ensure history directory exists + if (!is_dir($this->historyPath)) { + mkdir($this->historyPath, 0755, true); + } + } + + /** + * Log job execution + * + * @param Job $job + * @param array $metadata Additional metadata to store + * @return string Log entry ID + */ + public function logExecution(Job $job, array $metadata = []): string + { + $entryId = uniqid($job->getId() . '_', true); + $timestamp = new DateTime(); + + $entry = [ + 'id' => $entryId, + 'job_id' => $job->getId(), + 'command' => is_string($job->getCommand()) ? $job->getCommand() : 'Closure', + 'arguments' => method_exists($job, 'getRawArguments') ? $job->getRawArguments() : $job->getArguments(), + 'executed_at' => $timestamp->format('c'), + 'timestamp' => $timestamp->getTimestamp(), + 'success' => $job->isSuccessful(), + 'output' => $this->captureOutput($job), + 'execution_time' => method_exists($job, 'getExecutionTime') ? $job->getExecutionTime() : null, + 'retry_count' => method_exists($job, 'getRetryCount') ? $job->getRetryCount() : 0, + 'priority' => method_exists($job, 'getPriority') ? $job->getPriority() : 'normal', + 'tags' => method_exists($job, 'getTags') ? $job->getTags() : [], + 'metadata' => array_merge( + method_exists($job, 'getMetadata') ? $job->getMetadata() : [], + $metadata + ), + ]; + + // Store in daily file + $this->storeEntry($entry); + + // Also store in job-specific history + $this->storeJobHistory($job->getId(), $entry); + + return $entryId; + } + + /** + * Capture job output with length limit + * + * @param Job $job + * @return array + */ + protected function captureOutput(Job $job): array + { + $output = $job->getOutput(); + $truncated = false; + + if (strlen($output) > $this->maxOutputLength) { + $output = substr($output, 0, $this->maxOutputLength); + $truncated = true; + } + + return [ + 'content' => $output, + 'truncated' => $truncated, + 'length' => strlen($job->getOutput()), + ]; + } + + /** + * Store entry in daily log file + * + * @param array $entry + * @return void + */ + protected function storeEntry(array $entry): void + { + $date = date('Y-m-d'); + $filename = $this->historyPath . '/' . $date . '.json'; + + $jsonFile = JsonFile::instance($filename); + $entries = $jsonFile->content() ?: []; + $entries[] = $entry; + $jsonFile->save($entries); + } + + /** + * Store job-specific history + * + * @param string $jobId + * @param array $entry + * @return void + */ + protected function storeJobHistory(string $jobId, array $entry): void + { + $jobDir = $this->historyPath . '/jobs'; + if (!is_dir($jobDir)) { + mkdir($jobDir, 0755, true); + } + + $filename = $jobDir . '/' . $jobId . '.json'; + $jsonFile = JsonFile::instance($filename); + $history = $jsonFile->content() ?: []; + + // Keep only last 100 executions per job + $history[] = $entry; + if (count($history) > 100) { + $history = array_slice($history, -100); + } + + $jsonFile->save($history); + } + + /** + * Get job history + * + * @param string $jobId + * @param int $limit + * @return array + */ + public function getJobHistory(string $jobId, int $limit = 50): array + { + $filename = $this->historyPath . '/jobs/' . $jobId . '.json'; + if (!file_exists($filename)) { + return []; + } + + $jsonFile = JsonFile::instance($filename); + $history = $jsonFile->content() ?: []; + + // Return most recent first + $history = array_reverse($history); + + if ($limit > 0) { + $history = array_slice($history, 0, $limit); + } + + return $history; + } + + /** + * Get history for a date range + * + * @param DateTime $startDate + * @param DateTime $endDate + * @param string|null $jobId Filter by job ID + * @return array + */ + public function getHistoryRange(DateTime $startDate, DateTime $endDate, ?string $jobId = null): array + { + $history = []; + $current = clone $startDate; + + while ($current <= $endDate) { + $filename = $this->historyPath . '/' . $current->format('Y-m-d') . '.json'; + if (file_exists($filename)) { + $jsonFile = JsonFile::instance($filename); + $entries = $jsonFile->content() ?: []; + + foreach ($entries as $entry) { + if ($jobId === null || $entry['job_id'] === $jobId) { + $history[] = $entry; + } + } + } + + $current->modify('+1 day'); + } + + return $history; + } + + /** + * Get job statistics + * + * @param string $jobId + * @param int $days Number of days to analyze + * @return array + */ + public function getJobStatistics(string $jobId, int $days = 7): array + { + $startDate = new DateTime("-{$days} days"); + $endDate = new DateTime('now'); + + $history = $this->getHistoryRange($startDate, $endDate, $jobId); + + if (empty($history)) { + return [ + 'total_runs' => 0, + 'successful_runs' => 0, + 'failed_runs' => 0, + 'success_rate' => 0, + 'average_execution_time' => 0, + 'last_run' => null, + 'last_success' => null, + 'last_failure' => null, + ]; + } + + $totalRuns = count($history); + $successfulRuns = 0; + $executionTimes = []; + $lastRun = null; + $lastSuccess = null; + $lastFailure = null; + + foreach ($history as $entry) { + if ($entry['success']) { + $successfulRuns++; + if (!$lastSuccess || $entry['timestamp'] > $lastSuccess['timestamp']) { + $lastSuccess = $entry; + } + } else { + if (!$lastFailure || $entry['timestamp'] > $lastFailure['timestamp']) { + $lastFailure = $entry; + } + } + + if (!$lastRun || $entry['timestamp'] > $lastRun['timestamp']) { + $lastRun = $entry; + } + + if (isset($entry['execution_time']) && $entry['execution_time'] > 0) { + $executionTimes[] = $entry['execution_time']; + } + } + + return [ + 'total_runs' => $totalRuns, + 'successful_runs' => $successfulRuns, + 'failed_runs' => $totalRuns - $successfulRuns, + 'success_rate' => $totalRuns > 0 ? round(($successfulRuns / $totalRuns) * 100, 2) : 0, + 'average_execution_time' => !empty($executionTimes) ? round(array_sum($executionTimes) / count($executionTimes), 3) : 0, + 'last_run' => $lastRun, + 'last_success' => $lastSuccess, + 'last_failure' => $lastFailure, + ]; + } + + /** + * Get global statistics + * + * @param int $days + * @return array + */ + public function getGlobalStatistics(int $days = 7): array + { + $startDate = new DateTime("-{$days} days"); + $endDate = new DateTime('now'); + + $history = $this->getHistoryRange($startDate, $endDate); + + $jobStats = []; + foreach ($history as $entry) { + $jobId = $entry['job_id']; + if (!isset($jobStats[$jobId])) { + $jobStats[$jobId] = [ + 'runs' => 0, + 'success' => 0, + 'failed' => 0, + ]; + } + + $jobStats[$jobId]['runs']++; + if ($entry['success']) { + $jobStats[$jobId]['success']++; + } else { + $jobStats[$jobId]['failed']++; + } + } + + return [ + 'total_executions' => count($history), + 'unique_jobs' => count($jobStats), + 'job_statistics' => $jobStats, + 'period_days' => $days, + 'from_date' => $startDate->format('Y-m-d'), + 'to_date' => $endDate->format('Y-m-d'), + ]; + } + + /** + * Search history + * + * @param array $criteria + * @return array + */ + public function searchHistory(array $criteria): array + { + $results = []; + + // Determine date range + $startDate = isset($criteria['start_date']) ? new DateTime($criteria['start_date']) : new DateTime('-7 days'); + $endDate = isset($criteria['end_date']) ? new DateTime($criteria['end_date']) : new DateTime('now'); + + $history = $this->getHistoryRange($startDate, $endDate, $criteria['job_id'] ?? null); + + foreach ($history as $entry) { + $match = true; + + // Filter by success status + if (isset($criteria['success']) && $entry['success'] !== $criteria['success']) { + $match = false; + } + + // Filter by output content + if (isset($criteria['output_contains']) && + stripos($entry['output']['content'], $criteria['output_contains']) === false) { + $match = false; + } + + // Filter by tags + if (isset($criteria['tags']) && is_array($criteria['tags'])) { + $entryTags = $entry['tags'] ?? []; + if (empty(array_intersect($criteria['tags'], $entryTags))) { + $match = false; + } + } + + if ($match) { + $results[] = $entry; + } + } + + // Sort results + if (isset($criteria['sort_by'])) { + usort($results, function($a, $b) use ($criteria) { + $field = $criteria['sort_by']; + $order = $criteria['sort_order'] ?? 'desc'; + + $aVal = $a[$field] ?? 0; + $bVal = $b[$field] ?? 0; + + if ($order === 'asc') { + return $aVal <=> $bVal; + } else { + return $bVal <=> $aVal; + } + }); + } + + // Limit results + if (isset($criteria['limit'])) { + $results = array_slice($results, 0, $criteria['limit']); + } + + return $results; + } + + /** + * Clean old history files + * + * @return int Number of files deleted + */ + public function cleanOldHistory(): int + { + $deleted = 0; + $cutoffDate = new DateTime("-{$this->retentionDays} days"); + + $files = glob($this->historyPath . '/*.json'); + foreach ($files as $file) { + $filename = basename($file, '.json'); + // Check if filename is a date + if (preg_match('/^\d{4}-\d{2}-\d{2}$/', $filename)) { + $fileDate = new DateTime($filename); + if ($fileDate < $cutoffDate) { + unlink($file); + $deleted++; + } + } + } + + return $deleted; + } + + /** + * Export history to CSV + * + * @param array $history + * @param string $filename + * @return bool + */ + public function exportToCsv(array $history, string $filename): bool + { + $handle = fopen($filename, 'w'); + if (!$handle) { + return false; + } + + // Write headers + fputcsv($handle, [ + 'Job ID', + 'Executed At', + 'Success', + 'Execution Time', + 'Output Length', + 'Retry Count', + 'Priority', + 'Tags', + ]); + + // Write data + foreach ($history as $entry) { + fputcsv($handle, [ + $entry['job_id'], + $entry['executed_at'], + $entry['success'] ? 'Yes' : 'No', + $entry['execution_time'] ?? '', + $entry['output']['length'] ?? 0, + $entry['retry_count'] ?? 0, + $entry['priority'] ?? 'normal', + implode(', ', $entry['tags'] ?? []), + ]); + } + + fclose($handle); + return true; + } +} \ No newline at end of file diff --git a/system/src/Grav/Common/Scheduler/JobQueue.php b/system/src/Grav/Common/Scheduler/JobQueue.php new file mode 100644 index 000000000..bdf4596c5 --- /dev/null +++ b/system/src/Grav/Common/Scheduler/JobQueue.php @@ -0,0 +1,588 @@ +queuePath = $queuePath; + $this->lockFile = $queuePath . '/.lock'; + + // Create queue directories + $this->initializeDirectories(); + } + + /** + * Initialize queue directories + * + * @return void + */ + protected function initializeDirectories(): void + { + $dirs = [ + $this->queuePath . '/pending', + $this->queuePath . '/processing', + $this->queuePath . '/failed', + $this->queuePath . '/completed', + ]; + + foreach ($dirs as $dir) { + if (!file_exists($dir)) { + mkdir($dir, 0755, true); + } + } + } + + /** + * Push a job to the queue + * + * @param Job $job + * @param string $priority + * @return string Job queue ID + */ + public function push(Job $job, string $priority = self::PRIORITY_NORMAL): string + { + $queueId = $this->generateQueueId($job); + $timestamp = microtime(true); + + $queueItem = [ + 'id' => $queueId, + 'job_id' => $job->getId(), + 'command' => is_string($job->getCommand()) ? $job->getCommand() : 'Closure', + 'arguments' => method_exists($job, 'getRawArguments') ? $job->getRawArguments() : $job->getArguments(), + 'priority' => $priority, + 'timestamp' => $timestamp, + 'attempts' => 0, + 'max_attempts' => method_exists($job, 'getMaxAttempts') ? $job->getMaxAttempts() : 1, + 'created_at' => date('c'), + 'scheduled_for' => null, + 'metadata' => [], + ]; + + // Always serialize the job to preserve its full state + $queueItem['serialized_job'] = base64_encode(serialize($job)); + + $this->writeQueueItem($queueItem, 'pending'); + + return $queueId; + } + + /** + * Push a job for delayed execution + * + * @param Job $job + * @param \DateTime $scheduledFor + * @param string $priority + * @return string + */ + public function pushDelayed(Job $job, \DateTime $scheduledFor, string $priority = self::PRIORITY_NORMAL): string + { + $queueId = $this->push($job, $priority); + + // Update the scheduled time + $item = $this->getQueueItem($queueId, 'pending'); + if ($item) { + $item['scheduled_for'] = $scheduledFor->format('c'); + $this->writeQueueItem($item, 'pending'); + } + + return $queueId; + } + + /** + * Pop the next job from the queue + * + * @return Job|null + */ + public function pop(): ?Job + { + if (!$this->lock()) { + return null; + } + + try { + // Get all pending items + $items = $this->getPendingItems(); + + if (empty($items)) { + $this->unlock(); + return null; + } + + // Sort by priority and timestamp + usort($items, function($a, $b) { + $priorityOrder = [ + self::PRIORITY_HIGH => 0, + self::PRIORITY_NORMAL => 1, + self::PRIORITY_LOW => 2, + ]; + + $aPriority = $priorityOrder[$a['priority']] ?? 1; + $bPriority = $priorityOrder[$b['priority']] ?? 1; + + if ($aPriority !== $bPriority) { + return $aPriority - $bPriority; + } + + return $a['timestamp'] <=> $b['timestamp']; + }); + + // Get the first item that's ready to run + $now = new \DateTime(); + foreach ($items as $item) { + if ($item['scheduled_for']) { + $scheduledTime = new \DateTime($item['scheduled_for']); + if ($scheduledTime > $now) { + continue; // Skip items not yet due + } + } + + // Move to processing + $this->moveQueueItem($item['id'], 'pending', 'processing'); + + // Reconstruct the job + $job = $this->reconstructJob($item); + + $this->unlock(); + return $job; + } + + $this->unlock(); + return null; + + } catch (\Exception $e) { + $this->unlock(); + throw $e; + } + } + + /** + * Pop a job from the queue with its queue ID + * + * @return array|null Array with 'job' and 'id' keys + */ + public function popWithId(): ?array + { + if (!$this->lock()) { + return null; + } + + try { + // Get all pending items + $items = $this->getPendingItems(); + + if (empty($items)) { + $this->unlock(); + return null; + } + + // Sort by priority and timestamp + usort($items, function($a, $b) { + $priorityOrder = [ + self::PRIORITY_HIGH => 0, + self::PRIORITY_NORMAL => 1, + self::PRIORITY_LOW => 2, + ]; + + $aPriority = $priorityOrder[$a['priority']] ?? 1; + $bPriority = $priorityOrder[$b['priority']] ?? 1; + + if ($aPriority !== $bPriority) { + return $aPriority - $bPriority; + } + + return $a['timestamp'] <=> $b['timestamp']; + }); + + // Get the first item that's ready to run + $now = new \DateTime(); + foreach ($items as $item) { + if ($item['scheduled_for']) { + $scheduledTime = new \DateTime($item['scheduled_for']); + if ($scheduledTime > $now) { + continue; // Skip items not yet due + } + } + + // Reconstruct the job first before moving it + $job = $this->reconstructJob($item); + + if (!$job) { + // Failed to reconstruct, skip this item + continue; + } + + // Move to processing only if we can reconstruct the job + $this->moveQueueItem($item['id'], 'pending', 'processing'); + + $this->unlock(); + return ['job' => $job, 'id' => $item['id']]; + } + + $this->unlock(); + return null; + + } catch (\Exception $e) { + $this->unlock(); + throw $e; + } + } + + /** + * Mark a job as completed + * + * @param string $queueId + * @return void + */ + public function complete(string $queueId): void + { + $this->moveQueueItem($queueId, 'processing', 'completed'); + + // Clean up old completed items + $this->cleanupCompleted(); + } + + /** + * Mark a job as failed + * + * @param string $queueId + * @param string $error + * @return void + */ + public function fail(string $queueId, string $error = ''): void + { + $item = $this->getQueueItem($queueId, 'processing'); + + if ($item) { + $item['attempts']++; + $item['last_error'] = $error; + $item['failed_at'] = date('c'); + + if ($item['attempts'] < $item['max_attempts']) { + // Move back to pending for retry + $item['retry_at'] = $this->calculateRetryTime($item['attempts']); + $item['scheduled_for'] = $item['retry_at']; + $this->writeQueueItem($item, 'pending'); + $this->deleteQueueItem($queueId, 'processing'); + } else { + // Move to failed (dead letter queue) + $this->writeQueueItem($item, 'failed'); + $this->deleteQueueItem($queueId, 'processing'); + } + } + } + + /** + * Get queue size + * + * @return int + */ + public function size(): int + { + return count($this->getPendingItems()); + } + + /** + * Check if queue is empty + * + * @return bool + */ + public function isEmpty(): bool + { + return $this->size() === 0; + } + + /** + * Get queue statistics + * + * @return array + */ + public function getStatistics(): array + { + return [ + 'pending' => count($this->getPendingItems()), + 'processing' => count($this->getItemsInDirectory('processing')), + 'failed' => count($this->getItemsInDirectory('failed')), + 'completed_today' => $this->countCompletedToday(), + ]; + } + + /** + * Generate a unique queue ID + * + * @param Job $job + * @return string + */ + protected function generateQueueId(Job $job): string + { + return $job->getId() . '_' . uniqid('', true); + } + + /** + * Write queue item to disk + * + * @param array $item + * @param string $directory + * @return void + */ + protected function writeQueueItem(array $item, string $directory): void + { + $path = $this->queuePath . '/' . $directory . '/' . $item['id'] . '.json'; + $file = JsonFile::instance($path); + $file->save($item); + } + + /** + * Read queue item from disk + * + * @param string $queueId + * @param string $directory + * @return array|null + */ + protected function getQueueItem(string $queueId, string $directory): ?array + { + $path = $this->queuePath . '/' . $directory . '/' . $queueId . '.json'; + + if (!file_exists($path)) { + return null; + } + + $file = JsonFile::instance($path); + return $file->content(); + } + + /** + * Delete queue item + * + * @param string $queueId + * @param string $directory + * @return void + */ + protected function deleteQueueItem(string $queueId, string $directory): void + { + $path = $this->queuePath . '/' . $directory . '/' . $queueId . '.json'; + + if (file_exists($path)) { + unlink($path); + } + } + + /** + * Move queue item between directories + * + * @param string $queueId + * @param string $fromDir + * @param string $toDir + * @return void + */ + protected function moveQueueItem(string $queueId, string $fromDir, string $toDir): void + { + $fromPath = $this->queuePath . '/' . $fromDir . '/' . $queueId . '.json'; + $toPath = $this->queuePath . '/' . $toDir . '/' . $queueId . '.json'; + + if (file_exists($fromPath)) { + rename($fromPath, $toPath); + } + } + + /** + * Get all pending items + * + * @return array + */ + protected function getPendingItems(): array + { + return $this->getItemsInDirectory('pending'); + } + + /** + * Get items in a specific directory + * + * @param string $directory + * @return array + */ + protected function getItemsInDirectory(string $directory): array + { + $items = []; + $path = $this->queuePath . '/' . $directory; + + if (!is_dir($path)) { + return $items; + } + + $files = glob($path . '/*.json'); + foreach ($files as $file) { + $jsonFile = JsonFile::instance($file); + $items[] = $jsonFile->content(); + } + + return $items; + } + + /** + * Reconstruct a job from queue item + * + * @param array $item + * @return Job|null + */ + protected function reconstructJob(array $item): ?Job + { + if (isset($item['serialized_job'])) { + // Unserialize the job + try { + $job = unserialize(base64_decode($item['serialized_job'])); + if ($job instanceof Job) { + return $job; + } + } catch (\Exception $e) { + // Failed to unserialize + return null; + } + } + + // Create a new job from command + if (isset($item['command'])) { + $args = $item['arguments'] ?? []; + $job = new Job($item['command'], $args, $item['job_id']); + return $job; + } + + return null; + } + + /** + * Calculate retry time with exponential backoff + * + * @param int $attempts + * @return string + */ + protected function calculateRetryTime(int $attempts): string + { + $backoffSeconds = min(pow(2, $attempts) * 60, 3600); // Max 1 hour + $retryTime = new \DateTime(); + $retryTime->modify("+{$backoffSeconds} seconds"); + return $retryTime->format('c'); + } + + /** + * Clean up old completed items + * + * @return void + */ + protected function cleanupCompleted(): void + { + $items = $this->getItemsInDirectory('completed'); + $cutoff = new \DateTime('-24 hours'); + + foreach ($items as $item) { + if (isset($item['created_at'])) { + $createdAt = new \DateTime($item['created_at']); + if ($createdAt < $cutoff) { + $this->deleteQueueItem($item['id'], 'completed'); + } + } + } + } + + /** + * Count completed jobs today + * + * @return int + */ + protected function countCompletedToday(): int + { + $items = $this->getItemsInDirectory('completed'); + $today = new \DateTime('today'); + $count = 0; + + foreach ($items as $item) { + if (isset($item['created_at'])) { + $createdAt = new \DateTime($item['created_at']); + if ($createdAt >= $today) { + $count++; + } + } + } + + return $count; + } + + /** + * Acquire lock for queue operations + * + * @return bool + */ + protected function lock(): bool + { + $attempts = 0; + $maxAttempts = 50; // 5 seconds total + + while ($attempts < $maxAttempts) { + // Check if lock file exists and is stale (older than 30 seconds) + if (file_exists($this->lockFile)) { + $lockAge = time() - filemtime($this->lockFile); + if ($lockAge > 30) { + // Stale lock, remove it + @unlink($this->lockFile); + } + } + + // Try to acquire lock atomically + $handle = @fopen($this->lockFile, 'x'); + if ($handle !== false) { + fclose($handle); + return true; + } + + $attempts++; + usleep(100000); // 100ms + } + + // Could not acquire lock + return false; + } + + /** + * Release queue lock + * + * @return void + */ + protected function unlock(): void + { + if (file_exists($this->lockFile)) { + unlink($this->lockFile); + } + } +} \ No newline at end of file diff --git a/system/src/Grav/Common/Scheduler/Scheduler.php b/system/src/Grav/Common/Scheduler/Scheduler.php index 359f2d020..4b9c06c25 100644 --- a/system/src/Grav/Common/Scheduler/Scheduler.php +++ b/system/src/Grav/Common/Scheduler/Scheduler.php @@ -17,6 +17,9 @@ use InvalidArgumentException; use Symfony\Component\Process\PhpExecutableFinder; use Symfony\Component\Process\Process; use RocketTheme\Toolbox\File\YamlFile; +use Symfony\Component\Yaml\Yaml; +use Monolog\Logger; +use Monolog\Handler\StreamHandler; use function is_callable; use function is_string; @@ -49,19 +52,57 @@ class Scheduler /** @var string */ private $status_path; + + // Modern features (backward compatible - disabled by default) + /** @var JobQueue|null */ + protected $jobQueue = null; + + /** @var array */ + protected $workers = []; + + /** @var int */ + protected $maxWorkers = 1; + + /** @var bool */ + protected $webhookEnabled = false; + + /** @var string|null */ + protected $webhookToken = null; + + /** @var bool */ + protected $healthEnabled = true; + + /** @var string */ + protected $queuePath; + + /** @var string */ + protected $historyPath; + + /** @var Logger|null */ + protected $logger = null; + + /** @var array */ + protected $modernConfig = []; /** * Create new instance. */ public function __construct() { - $config = Grav::instance()['config']->get('scheduler.defaults', []); + $grav = Grav::instance(); + $config = $grav['config']->get('scheduler.defaults', []); $this->config = $config; - $this->status_path = Grav::instance()['locator']->findResource('user-data://scheduler', true, true); + $locator = $grav['locator']; + $this->status_path = $locator->findResource('user-data://scheduler', true, true); if (!file_exists($this->status_path)) { Folder::create($this->status_path); } + + // Initialize modern features (always enabled now) + $this->modernConfig = $grav['config']->get('scheduler.modern', []); + // Always initialize modern features - they're now part of core + $this->initializeModernFeatures($locator); } /** @@ -121,6 +162,16 @@ class Scheduler return [$background, $foreground]; } + /** + * Get the job queue + * + * @return JobQueue|null + */ + public function getJobQueue(): ?JobQueue + { + return $this->jobQueue; + } + /** * Get all jobs if they are disabled or not as one array * @@ -190,6 +241,13 @@ class Scheduler */ public function run(DateTime $runTime = null, $force = false) { + // Initialize system jobs if not already done + $grav = Grav::instance(); + if (count($this->jobs) === 0) { + // Trigger event to load system jobs (cache-purge, cache-clear, backups, etc.) + $grav->fireEvent('onSchedulerInitialized', new \RocketTheme\Toolbox\Event\Event(['scheduler' => $this])); + } + $this->loadSavedJobs(); [$background, $foreground] = $this->getQueuedJobs(false); @@ -199,24 +257,92 @@ class Scheduler $runTime = new DateTime('now'); } - // Star processing jobs - foreach ($alljobs as $job) { - if ($job->isDue($runTime) || $force) { - $job->run(); - $this->jobs_run[] = $job; + // Log scheduler run + if ($this->logger) { + $jobCount = count($alljobs); + $forceStr = $force ? ' (forced)' : ''; + $this->logger->debug("Scheduler run started - {$jobCount} jobs available{$forceStr}", [ + 'time' => $runTime->format('Y-m-d H:i:s') + ]); + } + + // Process jobs based on modern features + if ($this->jobQueue && ($this->modernConfig['queue']['enabled'] ?? false)) { + // Queue jobs for processing + $queuedCount = 0; + foreach ($alljobs as $job) { + if ($job->isDue($runTime) || $force) { + // Add to queue for concurrent processing + $this->jobQueue->push($job); + $queuedCount++; + } + } + + if ($this->logger && $queuedCount > 0) { + $this->logger->debug("Queued {$queuedCount} job(s) for processing"); + } + + // Process queue with workers + $this->processJobsWithWorkers(); + + // When using queue, states are saved by executeJob when jobs complete + // Don't save states here as jobs may still be processing + } else { + // Legacy processing (one at a time) + foreach ($alljobs as $job) { + if ($job->isDue($runTime) || $force) { + $job->run(); + $this->jobs_run[] = $job; + } + } + + // Finish handling any background jobs + foreach ($background as $job) { + $job->finalize(); + } + + // Store states for legacy mode + $this->saveJobStates(); + + // Save history if enabled + if (($this->modernConfig['history']['enabled'] ?? false) && $this->historyPath) { + $this->saveJobHistory(); } } - // Finish handling any background jobs - foreach ($background as $job) { - $job->finalize(); + // Log run summary + if ($this->logger) { + $successCount = 0; + $failureCount = 0; + $failedJobNames = []; + $executedJobs = array_merge($this->executed_jobs, $this->jobs_run); + + foreach ($executedJobs as $job) { + if ($job->isSuccessful()) { + $successCount++; + } else { + $failureCount++; + $failedJobNames[] = $job->getId(); + } + } + + if (count($executedJobs) > 0) { + if ($failureCount > 0) { + $failedList = implode(', ', $failedJobNames); + $this->logger->warning("Scheduler completed: {$successCount} succeeded, {$failureCount} failed (failed: {$failedList})"); + } else { + $this->logger->info("Scheduler completed: {$successCount} job(s) succeeded"); + } + } else { + $this->logger->debug('Scheduler completed: no jobs were due'); + } } - // Store states - $this->saveJobStates(); - // Store run date file_put_contents("logs/lastcron.run", (new DateTime("now"))->format("Y-m-d H:i:s"), LOCK_EX); + + // Update last run timestamp for health checks + $this->updateLastRun(); } /** @@ -378,6 +504,114 @@ class Scheduler } + /** + * Initialize modern features + * + * @param mixed $locator + * @return void + */ + protected function initializeModernFeatures($locator): void + { + // Set up paths + $this->queuePath = $this->modernConfig['queue']['path'] ?? 'user-data://scheduler/queue'; + $this->queuePath = $locator->findResource($this->queuePath, true, true); + + $this->historyPath = $this->modernConfig['history']['path'] ?? 'user-data://scheduler/history'; + $this->historyPath = $locator->findResource($this->historyPath, true, true); + + // Create directories if they don't exist + if (!file_exists($this->queuePath)) { + Folder::create($this->queuePath); + } + + if (!file_exists($this->historyPath)) { + Folder::create($this->historyPath); + } + + // Initialize job queue (always enabled) + $this->jobQueue = new JobQueue($this->queuePath); + + // Initialize scheduler logger + $this->initializeLogger($locator); + + // Configure workers (default to 4 for concurrent processing) + $this->maxWorkers = $this->modernConfig['workers'] ?? 4; + + // Configure webhook + $this->webhookEnabled = $this->modernConfig['webhook']['enabled'] ?? false; + $this->webhookToken = $this->modernConfig['webhook']['token'] ?? null; + + // Configure health check + $this->healthEnabled = $this->modernConfig['health']['enabled'] ?? true; + } + + /** + * Get the job queue + * + * @return JobQueue|null + */ + public function getQueue(): ?JobQueue + { + return $this->jobQueue; + } + + /** + * Initialize the scheduler logger + * + * @param $locator + * @return void + */ + protected function initializeLogger($locator): void + { + $this->logger = new Logger('scheduler'); + + // Single scheduler log file - all levels + $logFile = $locator->findResource('log://scheduler.log', true, true); + $this->logger->pushHandler(new StreamHandler($logFile, Logger::DEBUG)); + } + + /** + * Get the scheduler logger + * + * @return Logger|null + */ + public function getLogger(): ?Logger + { + return $this->logger; + } + + /** + * Check if webhook is enabled + * + * @return bool + */ + public function isWebhookEnabled(): bool + { + return $this->webhookEnabled; + } + + /** + * Get active trigger methods + * + * @return array + */ + public function getActiveTriggers(): array + { + $triggers = []; + + $cronStatus = $this->isCrontabSetup(); + if ($cronStatus === 1) { + $triggers[] = 'cron'; + } + + // Check if webhook is enabled + if ($this->isWebhookEnabled()) { + $triggers[] = 'webhook'; + } + + return $triggers; + } + /** * Queue a job for execution in the correct queue. * @@ -444,4 +678,410 @@ class Scheduler return $job; } + + /** + * Process jobs using multiple workers + * + * @return void + */ + protected function processJobsWithWorkers(): void + { + if (!$this->jobQueue) { + return; + } + + // Process all queued jobs + while (!$this->jobQueue->isEmpty()) { + // Wait if we've reached max workers + while (count($this->workers) >= $this->maxWorkers) { + foreach ($this->workers as $workerId => $worker) { + $process = null; + if (is_array($worker) && isset($worker['process'])) { + $process = $worker['process']; + } elseif ($worker instanceof Process) { + $process = $worker; + } + + if ($process instanceof Process && !$process->isRunning()) { + // Finalize job if needed + if (is_array($worker) && isset($worker['job'])) { + $worker['job']->finalize(); + + // Save job state + $this->saveJobState($worker['job']); + + // Update queue status + if (isset($worker['queueId']) && $this->jobQueue) { + if ($worker['job']->isSuccessful()) { + $this->jobQueue->complete($worker['queueId']); + } else { + $this->jobQueue->fail($worker['queueId'], $worker['job']->getOutput() ?: 'Job failed'); + } + } + } + unset($this->workers[$workerId]); + } + } + if (count($this->workers) >= $this->maxWorkers) { + usleep(100000); // Wait 100ms + } + } + + // Get next job from queue + $queueItem = $this->jobQueue->popWithId(); + if ($queueItem) { + $this->executeJob($queueItem['job'], $queueItem['id']); + } + } + + // Wait for all remaining workers to complete + foreach ($this->workers as $workerId => $worker) { + if (is_array($worker) && isset($worker['process'])) { + $process = $worker['process']; + if ($process instanceof Process) { + $process->wait(); + + // Finalize and save state for background jobs + if (isset($worker['job'])) { + $worker['job']->finalize(); + $this->saveJobState($worker['job']); + + // Log background job completion + if ($this->logger) { + $job = $worker['job']; + $jobId = $job->getId(); + $command = is_string($job->getCommand()) ? $job->getCommand() : 'Closure'; + + if ($job->isSuccessful()) { + $execTime = method_exists($job, 'getExecutionTime') ? $job->getExecutionTime() : null; + $timeStr = $execTime ? sprintf(' (%.2fs)', $execTime) : ''; + $this->logger->info("Job '{$jobId}' completed successfully{$timeStr}", [ + 'command' => $command, + 'background' => true + ]); + } else { + $error = trim($job->getOutput()) ?: 'Unknown error'; + $this->logger->error("Job '{$jobId}' failed: {$error}", [ + 'command' => $command, + 'background' => true + ]); + } + } + } + + // Update queue status for background jobs + if (isset($worker['queueId']) && $this->jobQueue) { + $job = $worker['job']; + if ($job->isSuccessful()) { + $this->jobQueue->complete($worker['queueId']); + } else { + $this->jobQueue->fail($worker['queueId'], $job->getOutput() ?: 'Job execution failed'); + } + } + + unset($this->workers[$workerId]); + } + } elseif ($worker instanceof Process) { + // Legacy format + $worker->wait(); + unset($this->workers[$workerId]); + } + } + } + + /** + * Process existing queued jobs + * + * @return void + */ + protected function processQueuedJobs(): void + { + if (!$this->jobQueue) { + return; + } + + // Process any existing queued jobs from previous runs + while (!$this->jobQueue->isEmpty() && count($this->workers) < $this->maxWorkers) { + $job = $this->jobQueue->pop(); + if ($job) { + $this->executeJob($job); + } + } + } + + /** + * Execute a job + * + * @param Job $job + * @param string|null $queueId Queue ID if job came from queue + * @return void + */ + protected function executeJob(Job $job, ?string $queueId = null): void + { + $job->run(); + $this->jobs_run[] = $job; + + // Save job state after execution + $this->saveJobState($job); + + // Check if job runs in background + if ($job->runInBackground()) { + // Background job - track it for later completion + $process = $job->getProcess(); + if ($process && $process->isStarted()) { + $this->workers[] = [ + 'process' => $process, + 'job' => $job, + 'queueId' => $queueId + ]; + // Don't update queue status yet - will be done when process completes + return; + } + } + + // Foreground job or background job that didn't start - update queue status immediately + if ($queueId && $this->jobQueue) { + // Job has already been finalized if it ran in foreground + if (!$job->runInBackground()) { + $job->finalize(); + } + + if ($job->isSuccessful()) { + // Move from processing to completed + $this->jobQueue->complete($queueId); + } else { + // Move from processing to failed + $this->jobQueue->fail($queueId, $job->getOutput() ?: 'Job execution failed'); + } + } + + // Log foreground jobs immediately + if (!$job->runInBackground() && $this->logger) { + $jobId = $job->getId(); + $command = is_string($job->getCommand()) ? $job->getCommand() : 'Closure'; + + if ($job->isSuccessful()) { + $execTime = method_exists($job, 'getExecutionTime') ? $job->getExecutionTime() : null; + $timeStr = $execTime ? sprintf(' (%.2fs)', $execTime) : ''; + $this->logger->info("Job '{$jobId}' completed successfully{$timeStr}", [ + 'command' => $command + ]); + } else { + $error = trim($job->getOutput()) ?: 'Unknown error'; + $this->logger->error("Job '{$jobId}' failed: {$error}", [ + 'command' => $command + ]); + } + } + } + + /** + * Save state for a single job + * + * @param Job $job + * @return void + */ + protected function saveJobState(Job $job): void + { + $grav = Grav::instance(); + $locator = $grav['locator']; + $statusFile = $locator->findResource('user-data://scheduler/status.yaml', true, true); + + $status = []; + if (file_exists($statusFile)) { + $status = Yaml::parseFile($statusFile) ?: []; + } + + // Update job status + $status[$job->getId()] = [ + 'state' => $job->isSuccessful() ? 'success' : 'failure', + 'last-run' => time(), + ]; + + // Add error if job failed + if (!$job->isSuccessful()) { + $output = $job->getOutput(); + if ($output) { + $status[$job->getId()]['error'] = $output; + } else { + $status[$job->getId()]['error'] = null; + } + } + + file_put_contents($statusFile, Yaml::dump($status)); + } + + /** + * Save job execution history + * + * @return void + */ + protected function saveJobHistory(): void + { + if (!$this->historyPath) { + return; + } + + $history = []; + foreach ($this->jobs_run as $job) { + $history[] = [ + 'id' => $job->getId(), + 'executed_at' => date('c'), + 'success' => $job->isSuccessful(), + 'output' => substr($job->getOutput(), 0, 1000), + ]; + } + + if (!empty($history)) { + $filename = $this->historyPath . '/' . date('Y-m-d') . '.json'; + $existing = file_exists($filename) ? json_decode(file_get_contents($filename), true) : []; + $existing = array_merge($existing, $history); + file_put_contents($filename, json_encode($existing, JSON_PRETTY_PRINT)); + } + } + + /** + * Update last run timestamp + * + * @return void + */ + protected function updateLastRun(): void + { + $lastRunFile = $this->status_path . '/last_run.txt'; + file_put_contents($lastRunFile, date('Y-m-d H:i:s')); + } + + /** + * Get health status + * + * @return array + */ + public function getHealthStatus(): array + { + $lastRunFile = $this->status_path . '/last_run.txt'; + $lastRun = file_exists($lastRunFile) ? file_get_contents($lastRunFile) : null; + + // Initialize system jobs if not already done + $grav = Grav::instance(); + if (count($this->jobs) === 0) { + // Trigger event to load system jobs (cache-purge, cache-clear, backups, etc.) + $grav->fireEvent('onSchedulerInitialized', new \RocketTheme\Toolbox\Event\Event(['scheduler' => $this])); + } + + // Load custom jobs + $this->loadSavedJobs(); + + // Get only enabled jobs for health status + [$background, $foreground] = $this->getQueuedJobs(false); + $enabledJobs = array_merge($background, $foreground); + + $now = new DateTime('now'); + $dueJobs = 0; + + foreach ($enabledJobs as $job) { + if ($job->isDue($now)) { + $dueJobs++; + } + } + + $health = [ + 'status' => 'healthy', + 'last_run' => $lastRun, + 'last_run_age' => null, + 'queue_size' => 0, + 'failed_jobs_24h' => 0, + 'scheduled_jobs' => count($enabledJobs), + 'jobs_due' => $dueJobs, + 'webhook_enabled' => $this->webhookEnabled, + 'health_check_enabled' => $this->healthEnabled, + 'timestamp' => date('c'), + ]; + + // Calculate last run age + if ($lastRun) { + $lastRunTime = new DateTime($lastRun); + $health['last_run_age'] = $now->getTimestamp() - $lastRunTime->getTimestamp(); + } + + // Determine status based on whether jobs are due + if ($dueJobs > 0) { + // Jobs are due but haven't been run + if ($health['last_run_age'] === null || $health['last_run_age'] > 300) { // No run or older than 5 minutes + $health['status'] = 'warning'; + $health['message'] = $dueJobs . ' job(s) are due to run'; + } + } else { + // No jobs are due - this is healthy + $health['status'] = 'healthy'; + $health['message'] = 'No jobs currently due'; + } + + // Add queue stats if available + if ($this->jobQueue) { + $stats = $this->jobQueue->getStatistics(); + $health['queue_size'] = $stats['pending'] ?? 0; + $health['failed_jobs_24h'] = $stats['failed'] ?? 0; + } + + return $health; + } + + /** + * Process webhook trigger + * + * @param string|null $token + * @param string|null $jobId + * @return array + */ + public function processWebhookTrigger($token = null, $jobId = null): array + { + if (!$this->webhookEnabled) { + return ['success' => false, 'message' => 'Webhook triggers are not enabled']; + } + + if ($this->webhookToken && $token !== $this->webhookToken) { + return ['success' => false, 'message' => 'Invalid webhook token']; + } + + // Initialize system jobs if not already done + $grav = Grav::instance(); + if (count($this->jobs) === 0) { + // Trigger event to load system jobs (cache-purge, cache-clear, backups, etc.) + $grav->fireEvent('onSchedulerInitialized', new \RocketTheme\Toolbox\Event\Event(['scheduler' => $this])); + } + + // Load custom jobs + $this->loadSavedJobs(); + + if ($jobId) { + // Force run specific job + $job = $this->getJob($jobId); + if ($job) { + $job->inForeground()->run(); + $this->jobs_run[] = $job; + $this->saveJobStates(); + $this->updateLastRun(); + + return [ + 'success' => $job->isSuccessful(), + 'message' => $job->isSuccessful() ? 'Job force-executed successfully' : 'Job execution failed', + 'job_id' => $jobId, + 'forced' => true, + 'output' => $job->getOutput(), + ]; + } else { + return ['success' => false, 'message' => 'Job not found: ' . $jobId]; + } + } else { + // Run all due jobs + $this->run(); + + return [ + 'success' => true, + 'message' => 'Scheduler executed (due jobs only)', + 'jobs_run' => count($this->jobs_run), + 'timestamp' => date('c'), + ]; + } + } } diff --git a/system/src/Grav/Common/Scheduler/SchedulerController.php b/system/src/Grav/Common/Scheduler/SchedulerController.php new file mode 100644 index 000000000..6c9808df4 --- /dev/null +++ b/system/src/Grav/Common/Scheduler/SchedulerController.php @@ -0,0 +1,270 @@ +grav = $grav; + + // Get scheduler instance + $scheduler = $grav['scheduler']; + if ($scheduler instanceof ModernScheduler) { + $this->scheduler = $scheduler; + } else { + // Create ModernScheduler instance if not already + $this->scheduler = new ModernScheduler(); + } + } + + /** + * Handle health check endpoint + * + * @param ServerRequestInterface $request + * @return ResponseInterface + */ + public function health(ServerRequestInterface $request): ResponseInterface + { + $config = $this->grav['config']->get('scheduler.modern', []); + + // Check if health endpoint is enabled + if (!($config['health']['enabled'] ?? true)) { + return $this->jsonResponse(['error' => 'Health check disabled'], 403); + } + + // Get health status + $health = $this->scheduler->getHealthStatus(); + + return $this->jsonResponse($health); + } + + /** + * Handle webhook trigger endpoint + * + * @param ServerRequestInterface $request + * @return ResponseInterface + */ + public function webhook(ServerRequestInterface $request): ResponseInterface + { + $config = $this->grav['config']->get('scheduler.modern', []); + + // Check if webhook is enabled + if (!($config['webhook']['enabled'] ?? false)) { + return $this->jsonResponse(['error' => 'Webhook triggers disabled'], 403); + } + + // Get authorization header + $authHeader = $request->getHeaderLine('Authorization'); + $token = null; + + if (preg_match('/Bearer\s+(.+)$/i', $authHeader, $matches)) { + $token = $matches[1]; + } + + // Get query parameters + $params = $request->getQueryParams(); + $jobId = $params['job'] ?? null; + + // Process webhook + $result = $this->scheduler->processWebhookTrigger($token, $jobId); + + $statusCode = $result['success'] ? 200 : 400; + return $this->jsonResponse($result, $statusCode); + } + + /** + * Handle statistics endpoint + * + * @param ServerRequestInterface $request + * @return ResponseInterface + */ + public function statistics(ServerRequestInterface $request): ResponseInterface + { + // Check if user is admin + $user = $this->grav['user'] ?? null; + if (!$user || !$user->authorize('admin.super')) { + return $this->jsonResponse(['error' => 'Unauthorized'], 401); + } + + $stats = $this->scheduler->getStatistics(); + + return $this->jsonResponse($stats); + } + + /** + * Handle admin AJAX requests for scheduler status + * + * @param ServerRequestInterface $request + * @return ResponseInterface + */ + public function adminStatus(ServerRequestInterface $request): ResponseInterface + { + // Check if user is admin + $user = $this->grav['user'] ?? null; + if (!$user || !$user->authorize('admin.scheduler')) { + return $this->jsonResponse(['error' => 'Unauthorized'], 401); + } + + $health = $this->scheduler->getHealthStatus(); + + // Format for admin display + $response = [ + 'health' => $this->formatHealthStatus($health), + 'triggers' => $this->formatTriggers($health['trigger_methods'] ?? []) + ]; + + return $this->jsonResponse($response); + } + + /** + * Format health status for display + * + * @param array $health + * @return string + */ + protected function formatHealthStatus(array $health): string + { + $status = $health['status'] ?? 'unknown'; + $lastRun = $health['last_run'] ?? null; + $queueSize = $health['queue_size'] ?? 0; + $failedJobs = $health['failed_jobs_24h'] ?? 0; + $jobsDue = $health['jobs_due'] ?? 0; + $message = $health['message'] ?? ''; + + $statusBadge = match($status) { + 'healthy' => 'Healthy', + 'warning' => 'Warning', + 'critical' => 'Critical', + default => 'Unknown' + }; + + $html = '
'; + $html .= '

Status: ' . $statusBadge; + if ($message) { + $html .= ' - ' . htmlspecialchars($message); + } + $html .= '

'; + + if ($lastRun) { + $lastRunTime = new \DateTime($lastRun); + $now = new \DateTime(); + $diff = $now->diff($lastRunTime); + + $timeAgo = ''; + if ($diff->d > 0) { + $timeAgo = $diff->d . ' day' . ($diff->d > 1 ? 's' : '') . ' ago'; + } elseif ($diff->h > 0) { + $timeAgo = $diff->h . ' hour' . ($diff->h > 1 ? 's' : '') . ' ago'; + } elseif ($diff->i > 0) { + $timeAgo = $diff->i . ' minute' . ($diff->i > 1 ? 's' : '') . ' ago'; + } else { + $timeAgo = 'Less than a minute ago'; + } + + $html .= '

Last Run: ' . $timeAgo . '

'; + } else { + $html .= '

Last Run: Never

'; + } + + $html .= '

Jobs Due: ' . $jobsDue . '

'; + $html .= '

Queue Size: ' . $queueSize . '

'; + + if ($failedJobs > 0) { + $html .= '

Failed Jobs (24h): ' . $failedJobs . '

'; + } + + $html .= '
'; + + return $html; + } + + /** + * Format triggers for display + * + * @param array $triggers + * @return string + */ + protected function formatTriggers(array $triggers): string + { + if (empty($triggers)) { + return '
No active triggers detected. Please set up cron, systemd, or webhook triggers.
'; + } + + $html = '
'; + $html .= ''; + $html .= '
'; + + return $html; + } + + /** + * Create JSON response + * + * @param array $data + * @param int $statusCode + * @return ResponseInterface + */ + protected function jsonResponse(array $data, int $statusCode = 200): ResponseInterface + { + $response = $this->grav['response'] ?? new \Nyholm\Psr7\Response(); + + $response = $response->withStatus($statusCode) + ->withHeader('Content-Type', 'application/json'); + + $body = $response->getBody(); + $body->write(json_encode($data)); + + return $response; + } +} \ No newline at end of file diff --git a/system/src/Grav/Common/Service/SchedulerServiceProvider.php b/system/src/Grav/Common/Service/SchedulerServiceProvider.php index e87412eec..8b2b04bab 100644 --- a/system/src/Grav/Common/Service/SchedulerServiceProvider.php +++ b/system/src/Grav/Common/Service/SchedulerServiceProvider.php @@ -10,6 +10,8 @@ namespace Grav\Common\Service; use Grav\Common\Scheduler\Scheduler; +use Grav\Common\Scheduler\JobQueue; +use Grav\Common\Scheduler\JobWorker; use Pimple\Container; use Pimple\ServiceProviderInterface; @@ -25,8 +27,38 @@ class SchedulerServiceProvider implements ServiceProviderInterface */ public function register(Container $container) { - $container['scheduler'] = function () { - return new Scheduler(); + $container['scheduler'] = function ($c) { + $config = $c['config']; + $scheduler = new Scheduler(); + + // Configure modern features if enabled + $modernConfig = $config->get('scheduler.modern', []); + if ($modernConfig['enabled'] ?? false) { + // Initialize components + $queuePath = $c['locator']->findResource('user-data://scheduler/queue', true, true); + $statusPath = $c['locator']->findResource('user-data://scheduler/status.yaml', true, true); + + // Set modern configuration on scheduler + $scheduler->setModernConfig($modernConfig); + + // Initialize job queue if enabled + if ($modernConfig['queue']['enabled'] ?? false) { + $jobQueue = new JobQueue($queuePath); + $scheduler->setJobQueue($jobQueue); + } + + // Initialize workers if enabled + if ($modernConfig['workers']['enabled'] ?? false) { + $workerCount = $modernConfig['workers']['count'] ?? 2; + $workers = []; + for ($i = 0; $i < $workerCount; $i++) { + $workers[] = new JobWorker("worker-{$i}"); + } + $scheduler->setWorkers($workers); + } + } + + return $scheduler; }; } } diff --git a/system/src/Grav/Console/Cli/SchedulerCommand.php b/system/src/Grav/Console/Cli/SchedulerCommand.php index 7e0cee360..a73266fc4 100644 --- a/system/src/Grav/Console/Cli/SchedulerCommand.php +++ b/system/src/Grav/Console/Cli/SchedulerCommand.php @@ -9,7 +9,7 @@ namespace Grav\Console\Cli; -use Cron\CronExpression; +use Dragonmantank\Cron\CronExpression; use Grav\Common\Grav; use Grav\Common\Utils; use Grav\Common\Scheduler\Scheduler;