diff --git a/system/blueprints/config/scheduler.yaml b/system/blueprints/config/scheduler.yaml index ab9a2c016..6bccafbbd 100644 --- a/system/blueprints/config/scheduler.yaml +++ b/system/blueprints/config/scheduler.yaml @@ -250,47 +250,69 @@ form: var statusEl = document.getElementById('scheduler-health-status'); if (!statusEl) return; - var html = '
'; + // Modern card-based layout + var statusColor = '#6c757d'; + var statusLabel = data.status || 'unknown'; + if (data.status === 'healthy') statusColor = '#28a745'; + else if (data.status === 'warning') statusColor = '#ffc107'; + else if (data.status === 'critical') statusColor = '#dc3545'; - // Status badge - var badge = 'secondary'; - if (data.status === 'healthy') badge = 'success'; - else if (data.status === 'warning') badge = 'warning'; - else if (data.status === 'critical') badge = 'danger'; + var html = '
'; - html += '

Status: ' + - (data.status || 'Unknown').toUpperCase() + '

'; + // Status card + html += '
'; + html += 'Status:'; + html += '' + statusLabel + ''; + html += '
'; - // Last run + // Info grid + html += '
'; + + // Last run card + html += '
'; + html += '
Last Run
'; if (data.last_run) { var age = data.last_run_age; var ageText = 'just now'; - if (age > 3600) { + if (age > 86400) { + ageText = Math.floor(age / 86400) + ' day(s) ago'; + } else if (age > 3600) { ageText = Math.floor(age / 3600) + ' hour(s) ago'; } else if (age > 60) { ageText = Math.floor(age / 60) + ' minute(s) ago'; } else if (age > 0) { ageText = age + ' second(s) ago'; } - html += '

Last Run: ' + ageText + '

'; + html += '
' + ageText + '
'; } else { - html += '

Last Run: Never

'; + html += '
Never
'; } - - // Jobs count - html += '

Scheduled Jobs: ' + (data.scheduled_jobs || 0) + '

'; - - // Queue size (if modern features enabled) - if (data.modern_features && data.queue_size !== undefined) { - html += '

Queue Size: ' + data.queue_size + '

'; - } - - // Failed jobs - if (data.failed_jobs_24h > 0) { - html += '

Failed (24h): ' + data.failed_jobs_24h + '

'; - } - html += '
'; + + // Jobs count card + html += '
'; + html += '
Scheduled Jobs
'; + html += '
' + (data.scheduled_jobs || 0) + '
'; + html += '
'; + + html += '
'; // Close grid + + // Additional info if available + if (data.modern_features && data.queue_size !== undefined) { + html += '
'; + html += 'Queue Size: '; + html += '' + data.queue_size + ''; + html += '
'; + } + + // Failed jobs warning + if (data.failed_jobs_24h > 0) { + html += '
'; + html += '⚠️ Failed Jobs (24h): ' + data.failed_jobs_24h; + html += '
'; + } + + html += '
'; // Close main container statusEl.innerHTML = html; }) .catch(error => { @@ -339,26 +361,46 @@ form: var triggersEl = document.getElementById('scheduler-triggers'); if (!triggersEl) return; - var html = ''; + html += '
'; // Add warning if no triggers active if (!cronReady && !data.webhook_enabled) { diff --git a/system/src/Grav/Common/Scheduler/Job.php b/system/src/Grav/Common/Scheduler/Job.php index 6967db37a..764885bc8 100644 --- a/system/src/Grav/Common/Scheduler/Job.php +++ b/system/src/Grav/Common/Scheduler/Job.php @@ -77,6 +77,40 @@ class Job private $successful = false; /** @var string|null */ private $backlink; + + // Modern Job features + /** @var int */ + protected $maxAttempts = 3; + /** @var int */ + protected $retryCount = 0; + /** @var int */ + protected $retryDelay = 60; // seconds + /** @var string */ + protected $retryStrategy = 'exponential'; // 'linear' or 'exponential' + /** @var float */ + protected $executionStartTime; + /** @var float */ + protected $executionDuration = 0; + /** @var int */ + protected $timeout = 300; // 5 minutes default + /** @var array */ + protected $dependencies = []; + /** @var array */ + protected $chainedJobs = []; + /** @var string|null */ + protected $queueId; + /** @var string */ + protected $priority = 'normal'; // 'high', 'normal', 'low' + /** @var array */ + protected $metadata = []; + /** @var array */ + protected $tags = []; + /** @var callable|null */ + protected $onSuccess; + /** @var callable|null */ + protected $onFailure; + /** @var callable|null */ + protected $onRetry; /** * Create a new Job instance. @@ -150,6 +184,16 @@ class Job return null; } + + /** + * Get raw arguments (array or string) + * + * @return array|string + */ + public function getRawArguments() + { + return $this->args; + } /** * @return CronExpression @@ -315,6 +359,13 @@ class Job */ public function run() { + // Check dependencies (modern feature) + if (!$this->checkDependencies()) { + $this->output = 'Dependencies not met'; + $this->successful = false; + return false; + } + // If the truthTest failed, don't run if ($this->truthTest !== true) { return false; @@ -340,6 +391,11 @@ class Job $args = is_string($this->args) ? explode(' ', $this->args) : $this->args; $command = array_merge([$this->command], $args); $process = new Process($command); + + // Apply timeout if set (modern feature) + if ($this->timeout > 0) { + $process->setTimeout($this->timeout); + } $this->process = $process; @@ -563,4 +619,455 @@ class Job return $this; } + + // Modern Job Methods + + /** + * Set maximum retry attempts + * + * @param int $attempts + * @return self + */ + public function maxAttempts(int $attempts): self + { + $this->maxAttempts = $attempts; + return $this; + } + + /** + * Get maximum retry attempts + * + * @return int + */ + public function getMaxAttempts(): int + { + return $this->maxAttempts; + } + + /** + * Set retry delay + * + * @param int $seconds + * @param string $strategy 'linear' or 'exponential' + * @return self + */ + public function retryDelay(int $seconds, string $strategy = 'exponential'): self + { + $this->retryDelay = $seconds; + $this->retryStrategy = $strategy; + return $this; + } + + /** + * Get current retry count + * + * @return int + */ + public function getRetryCount(): int + { + return $this->retryCount; + } + + /** + * Set job timeout + * + * @param int $seconds + * @return self + */ + public function timeout(int $seconds): self + { + $this->timeout = $seconds; + return $this; + } + + /** + * Set job priority + * + * @param string $priority 'high', 'normal', or 'low' + * @return self + */ + public function priority(string $priority): self + { + if (!in_array($priority, ['high', 'normal', 'low'])) { + throw new InvalidArgumentException('Priority must be high, normal, or low'); + } + $this->priority = $priority; + return $this; + } + + /** + * Get job priority + * + * @return string + */ + public function getPriority(): string + { + return $this->priority; + } + + /** + * Add job dependency + * + * @param string $jobId + * @return self + */ + public function dependsOn(string $jobId): self + { + $this->dependencies[] = $jobId; + return $this; + } + + /** + * Chain another job to run after this one + * + * @param Job $job + * @param bool $onlyOnSuccess Run only if current job succeeds + * @return self + */ + public function chain(Job $job, bool $onlyOnSuccess = true): self + { + $this->chainedJobs[] = [ + 'job' => $job, + 'onlyOnSuccess' => $onlyOnSuccess, + ]; + return $this; + } + + /** + * Add metadata to the job + * + * @param string $key + * @param mixed $value + * @return self + */ + public function withMetadata(string $key, $value): self + { + $this->metadata[$key] = $value; + return $this; + } + + /** + * Add tags to the job + * + * @param array $tags + * @return self + */ + public function withTags(array $tags): self + { + $this->tags = array_merge($this->tags, $tags); + return $this; + } + + /** + * Set success callback + * + * @param callable $callback + * @return self + */ + public function onSuccess(callable $callback): self + { + $this->onSuccess = $callback; + return $this; + } + + /** + * Set failure callback + * + * @param callable $callback + * @return self + */ + public function onFailure(callable $callback): self + { + $this->onFailure = $callback; + return $this; + } + + /** + * Set retry callback + * + * @param callable $callback + * @return self + */ + public function onRetry(callable $callback): self + { + $this->onRetry = $callback; + return $this; + } + + /** + * Run the job with retry support + * + * @return bool + */ + public function runWithRetry(): bool + { + $attempts = 0; + $lastException = null; + + while ($attempts < $this->maxAttempts) { + $attempts++; + $this->retryCount = $attempts - 1; + + try { + // Record execution start time + $this->executionStartTime = microtime(true); + + // Run the job + $result = $this->run(); + + // Record execution time + $this->executionDuration = microtime(true) - $this->executionStartTime; + + if ($result && $this->isSuccessful()) { + // Call success callback + if ($this->onSuccess) { + call_user_func($this->onSuccess, $this); + } + + // Run chained jobs + $this->runChainedJobs(true); + + return true; + } + + throw new RuntimeException('Job execution failed'); + + } catch (\Exception $e) { + $lastException = $e; + $this->output = $e->getMessage(); + $this->successful = false; + + if ($attempts < $this->maxAttempts) { + // Call retry callback + if ($this->onRetry) { + call_user_func($this->onRetry, $this, $attempts, $e); + } + + // Calculate delay before retry + $delay = $this->calculateRetryDelay($attempts); + if ($delay > 0) { + sleep($delay); + } + } else { + // Final failure + if ($this->onFailure) { + call_user_func($this->onFailure, $this, $e); + } + + // Run chained jobs that should run on failure + $this->runChainedJobs(false); + } + } + } + + return false; + } + + /** + * Get execution time in seconds + * + * @return float + */ + public function getExecutionTime(): float + { + return $this->executionDuration; + } + + /** + * Get job metadata + * + * @param string|null $key + * @return mixed + */ + public function getMetadata(string $key = null) + { + if ($key === null) { + return $this->metadata; + } + + return $this->metadata[$key] ?? null; + } + + /** + * Get job tags + * + * @return array + */ + public function getTags(): array + { + return $this->tags; + } + + /** + * Check if job has a specific tag + * + * @param string $tag + * @return bool + */ + public function hasTag(string $tag): bool + { + return in_array($tag, $this->tags); + } + + /** + * Set queue ID + * + * @param string $queueId + * @return self + */ + public function setQueueId(string $queueId): self + { + $this->queueId = $queueId; + return $this; + } + + /** + * Get queue ID + * + * @return string|null + */ + public function getQueueId(): ?string + { + return $this->queueId; + } + + /** + * Get process (for background jobs) + * + * @return Process|null + */ + public function getProcess(): ?Process + { + return $this->process; + } + + /** + * Calculate retry delay based on strategy + * + * @param int $attempt + * @return int + */ + protected function calculateRetryDelay(int $attempt): int + { + if ($this->retryStrategy === 'exponential') { + return min($this->retryDelay * pow(2, $attempt - 1), 3600); // Max 1 hour + } + + return $this->retryDelay; + } + + /** + * Check if dependencies are met + * + * @return bool + */ + protected function checkDependencies(): bool + { + if (empty($this->dependencies)) { + return true; + } + + // This would need to check against job history or status + // For now, we'll assume dependencies are met + // In a real implementation, this would check the Scheduler's job status + return true; + } + + /** + * Run chained jobs + * + * @param bool $success Whether the current job succeeded + * @return void + */ + protected function runChainedJobs(bool $success): void + { + foreach ($this->chainedJobs as $chainedJob) { + $shouldRun = !$chainedJob['onlyOnSuccess'] || $success; + + if ($shouldRun) { + $job = $chainedJob['job']; + if (method_exists($job, 'runWithRetry')) { + $job->runWithRetry(); + } else { + $job->run(); + } + } + } + } + + /** + * Convert job to array for serialization + * + * @return array + */ + public function toArray(): array + { + return [ + 'id' => $this->getId(), + 'command' => is_string($this->command) ? $this->command : 'Closure', + 'at' => $this->getAt(), + 'enabled' => $this->getEnabled(), + 'priority' => $this->priority, + 'max_attempts' => $this->maxAttempts, + 'retry_count' => $this->retryCount, + 'retry_delay' => $this->retryDelay, + 'retry_strategy' => $this->retryStrategy, + 'timeout' => $this->timeout, + 'dependencies' => $this->dependencies, + 'metadata' => $this->metadata, + 'tags' => $this->tags, + 'execution_time' => $this->executionDuration, + 'successful' => $this->successful, + 'output' => $this->output, + ]; + } + + /** + * Create job from array + * + * @param array $data + * @return self + */ + public static function fromArray(array $data): self + { + $job = new self($data['command'] ?? '', [], $data['id'] ?? null); + + if (isset($data['at'])) { + $job->at($data['at']); + } + + if (isset($data['priority'])) { + $job->priority($data['priority']); + } + + if (isset($data['max_attempts'])) { + $job->maxAttempts($data['max_attempts']); + } + + if (isset($data['retry_delay']) && isset($data['retry_strategy'])) { + $job->retryDelay($data['retry_delay'], $data['retry_strategy']); + } + + if (isset($data['timeout'])) { + $job->timeout($data['timeout']); + } + + if (isset($data['dependencies'])) { + foreach ($data['dependencies'] as $dep) { + $job->dependsOn($dep); + } + } + + if (isset($data['metadata'])) { + foreach ($data['metadata'] as $key => $value) { + $job->withMetadata($key, $value); + } + } + + if (isset($data['tags'])) { + $job->withTags($data['tags']); + } + + return $job; + } } diff --git a/system/src/Grav/Common/Scheduler/JobHistory.php b/system/src/Grav/Common/Scheduler/JobHistory.php new file mode 100644 index 000000000..8361d5c7f --- /dev/null +++ b/system/src/Grav/Common/Scheduler/JobHistory.php @@ -0,0 +1,462 @@ +historyPath = $historyPath; + $this->retentionDays = $retentionDays; + + // Ensure history directory exists + if (!is_dir($this->historyPath)) { + mkdir($this->historyPath, 0755, true); + } + } + + /** + * Log job execution + * + * @param Job $job + * @param array $metadata Additional metadata to store + * @return string Log entry ID + */ + public function logExecution(Job $job, array $metadata = []): string + { + $entryId = uniqid($job->getId() . '_', true); + $timestamp = new DateTime(); + + $entry = [ + 'id' => $entryId, + 'job_id' => $job->getId(), + 'command' => is_string($job->getCommand()) ? $job->getCommand() : 'Closure', + 'arguments' => method_exists($job, 'getRawArguments') ? $job->getRawArguments() : $job->getArguments(), + 'executed_at' => $timestamp->format('c'), + 'timestamp' => $timestamp->getTimestamp(), + 'success' => $job->isSuccessful(), + 'output' => $this->captureOutput($job), + 'execution_time' => method_exists($job, 'getExecutionTime') ? $job->getExecutionTime() : null, + 'retry_count' => method_exists($job, 'getRetryCount') ? $job->getRetryCount() : 0, + 'priority' => method_exists($job, 'getPriority') ? $job->getPriority() : 'normal', + 'tags' => method_exists($job, 'getTags') ? $job->getTags() : [], + 'metadata' => array_merge( + method_exists($job, 'getMetadata') ? $job->getMetadata() : [], + $metadata + ), + ]; + + // Store in daily file + $this->storeEntry($entry); + + // Also store in job-specific history + $this->storeJobHistory($job->getId(), $entry); + + return $entryId; + } + + /** + * Capture job output with length limit + * + * @param Job $job + * @return array + */ + protected function captureOutput(Job $job): array + { + $output = $job->getOutput(); + $truncated = false; + + if (strlen($output) > $this->maxOutputLength) { + $output = substr($output, 0, $this->maxOutputLength); + $truncated = true; + } + + return [ + 'content' => $output, + 'truncated' => $truncated, + 'length' => strlen($job->getOutput()), + ]; + } + + /** + * Store entry in daily log file + * + * @param array $entry + * @return void + */ + protected function storeEntry(array $entry): void + { + $date = date('Y-m-d'); + $filename = $this->historyPath . '/' . $date . '.json'; + + $jsonFile = JsonFile::instance($filename); + $entries = $jsonFile->content() ?: []; + $entries[] = $entry; + $jsonFile->save($entries); + } + + /** + * Store job-specific history + * + * @param string $jobId + * @param array $entry + * @return void + */ + protected function storeJobHistory(string $jobId, array $entry): void + { + $jobDir = $this->historyPath . '/jobs'; + if (!is_dir($jobDir)) { + mkdir($jobDir, 0755, true); + } + + $filename = $jobDir . '/' . $jobId . '.json'; + $jsonFile = JsonFile::instance($filename); + $history = $jsonFile->content() ?: []; + + // Keep only last 100 executions per job + $history[] = $entry; + if (count($history) > 100) { + $history = array_slice($history, -100); + } + + $jsonFile->save($history); + } + + /** + * Get job history + * + * @param string $jobId + * @param int $limit + * @return array + */ + public function getJobHistory(string $jobId, int $limit = 50): array + { + $filename = $this->historyPath . '/jobs/' . $jobId . '.json'; + if (!file_exists($filename)) { + return []; + } + + $jsonFile = JsonFile::instance($filename); + $history = $jsonFile->content() ?: []; + + // Return most recent first + $history = array_reverse($history); + + if ($limit > 0) { + $history = array_slice($history, 0, $limit); + } + + return $history; + } + + /** + * Get history for a date range + * + * @param DateTime $startDate + * @param DateTime $endDate + * @param string|null $jobId Filter by job ID + * @return array + */ + public function getHistoryRange(DateTime $startDate, DateTime $endDate, ?string $jobId = null): array + { + $history = []; + $current = clone $startDate; + + while ($current <= $endDate) { + $filename = $this->historyPath . '/' . $current->format('Y-m-d') . '.json'; + if (file_exists($filename)) { + $jsonFile = JsonFile::instance($filename); + $entries = $jsonFile->content() ?: []; + + foreach ($entries as $entry) { + if ($jobId === null || $entry['job_id'] === $jobId) { + $history[] = $entry; + } + } + } + + $current->modify('+1 day'); + } + + return $history; + } + + /** + * Get job statistics + * + * @param string $jobId + * @param int $days Number of days to analyze + * @return array + */ + public function getJobStatistics(string $jobId, int $days = 7): array + { + $startDate = new DateTime("-{$days} days"); + $endDate = new DateTime('now'); + + $history = $this->getHistoryRange($startDate, $endDate, $jobId); + + if (empty($history)) { + return [ + 'total_runs' => 0, + 'successful_runs' => 0, + 'failed_runs' => 0, + 'success_rate' => 0, + 'average_execution_time' => 0, + 'last_run' => null, + 'last_success' => null, + 'last_failure' => null, + ]; + } + + $totalRuns = count($history); + $successfulRuns = 0; + $executionTimes = []; + $lastRun = null; + $lastSuccess = null; + $lastFailure = null; + + foreach ($history as $entry) { + if ($entry['success']) { + $successfulRuns++; + if (!$lastSuccess || $entry['timestamp'] > $lastSuccess['timestamp']) { + $lastSuccess = $entry; + } + } else { + if (!$lastFailure || $entry['timestamp'] > $lastFailure['timestamp']) { + $lastFailure = $entry; + } + } + + if (!$lastRun || $entry['timestamp'] > $lastRun['timestamp']) { + $lastRun = $entry; + } + + if (isset($entry['execution_time']) && $entry['execution_time'] > 0) { + $executionTimes[] = $entry['execution_time']; + } + } + + return [ + 'total_runs' => $totalRuns, + 'successful_runs' => $successfulRuns, + 'failed_runs' => $totalRuns - $successfulRuns, + 'success_rate' => $totalRuns > 0 ? round(($successfulRuns / $totalRuns) * 100, 2) : 0, + 'average_execution_time' => !empty($executionTimes) ? round(array_sum($executionTimes) / count($executionTimes), 3) : 0, + 'last_run' => $lastRun, + 'last_success' => $lastSuccess, + 'last_failure' => $lastFailure, + ]; + } + + /** + * Get global statistics + * + * @param int $days + * @return array + */ + public function getGlobalStatistics(int $days = 7): array + { + $startDate = new DateTime("-{$days} days"); + $endDate = new DateTime('now'); + + $history = $this->getHistoryRange($startDate, $endDate); + + $jobStats = []; + foreach ($history as $entry) { + $jobId = $entry['job_id']; + if (!isset($jobStats[$jobId])) { + $jobStats[$jobId] = [ + 'runs' => 0, + 'success' => 0, + 'failed' => 0, + ]; + } + + $jobStats[$jobId]['runs']++; + if ($entry['success']) { + $jobStats[$jobId]['success']++; + } else { + $jobStats[$jobId]['failed']++; + } + } + + return [ + 'total_executions' => count($history), + 'unique_jobs' => count($jobStats), + 'job_statistics' => $jobStats, + 'period_days' => $days, + 'from_date' => $startDate->format('Y-m-d'), + 'to_date' => $endDate->format('Y-m-d'), + ]; + } + + /** + * Search history + * + * @param array $criteria + * @return array + */ + public function searchHistory(array $criteria): array + { + $results = []; + + // Determine date range + $startDate = isset($criteria['start_date']) ? new DateTime($criteria['start_date']) : new DateTime('-7 days'); + $endDate = isset($criteria['end_date']) ? new DateTime($criteria['end_date']) : new DateTime('now'); + + $history = $this->getHistoryRange($startDate, $endDate, $criteria['job_id'] ?? null); + + foreach ($history as $entry) { + $match = true; + + // Filter by success status + if (isset($criteria['success']) && $entry['success'] !== $criteria['success']) { + $match = false; + } + + // Filter by output content + if (isset($criteria['output_contains']) && + stripos($entry['output']['content'], $criteria['output_contains']) === false) { + $match = false; + } + + // Filter by tags + if (isset($criteria['tags']) && is_array($criteria['tags'])) { + $entryTags = $entry['tags'] ?? []; + if (empty(array_intersect($criteria['tags'], $entryTags))) { + $match = false; + } + } + + if ($match) { + $results[] = $entry; + } + } + + // Sort results + if (isset($criteria['sort_by'])) { + usort($results, function($a, $b) use ($criteria) { + $field = $criteria['sort_by']; + $order = $criteria['sort_order'] ?? 'desc'; + + $aVal = $a[$field] ?? 0; + $bVal = $b[$field] ?? 0; + + if ($order === 'asc') { + return $aVal <=> $bVal; + } else { + return $bVal <=> $aVal; + } + }); + } + + // Limit results + if (isset($criteria['limit'])) { + $results = array_slice($results, 0, $criteria['limit']); + } + + return $results; + } + + /** + * Clean old history files + * + * @return int Number of files deleted + */ + public function cleanOldHistory(): int + { + $deleted = 0; + $cutoffDate = new DateTime("-{$this->retentionDays} days"); + + $files = glob($this->historyPath . '/*.json'); + foreach ($files as $file) { + $filename = basename($file, '.json'); + // Check if filename is a date + if (preg_match('/^\d{4}-\d{2}-\d{2}$/', $filename)) { + $fileDate = new DateTime($filename); + if ($fileDate < $cutoffDate) { + unlink($file); + $deleted++; + } + } + } + + return $deleted; + } + + /** + * Export history to CSV + * + * @param array $history + * @param string $filename + * @return bool + */ + public function exportToCsv(array $history, string $filename): bool + { + $handle = fopen($filename, 'w'); + if (!$handle) { + return false; + } + + // Write headers + fputcsv($handle, [ + 'Job ID', + 'Executed At', + 'Success', + 'Execution Time', + 'Output Length', + 'Retry Count', + 'Priority', + 'Tags', + ]); + + // Write data + foreach ($history as $entry) { + fputcsv($handle, [ + $entry['job_id'], + $entry['executed_at'], + $entry['success'] ? 'Yes' : 'No', + $entry['execution_time'] ?? '', + $entry['output']['length'] ?? 0, + $entry['retry_count'] ?? 0, + $entry['priority'] ?? 'normal', + implode(', ', $entry['tags'] ?? []), + ]); + } + + fclose($handle); + return true; + } +} \ No newline at end of file diff --git a/system/src/Grav/Common/Scheduler/JobQueue.php b/system/src/Grav/Common/Scheduler/JobQueue.php index b871d3b59..bdf4596c5 100644 --- a/system/src/Grav/Common/Scheduler/JobQueue.php +++ b/system/src/Grav/Common/Scheduler/JobQueue.php @@ -81,20 +81,18 @@ class JobQueue 'id' => $queueId, 'job_id' => $job->getId(), 'command' => is_string($job->getCommand()) ? $job->getCommand() : 'Closure', - 'arguments' => $job->getArguments(), + 'arguments' => method_exists($job, 'getRawArguments') ? $job->getRawArguments() : $job->getArguments(), 'priority' => $priority, 'timestamp' => $timestamp, 'attempts' => 0, - 'max_attempts' => $job instanceof ModernJob ? $job->getMaxAttempts() : 1, + 'max_attempts' => method_exists($job, 'getMaxAttempts') ? $job->getMaxAttempts() : 1, 'created_at' => date('c'), 'scheduled_for' => null, 'metadata' => [], ]; - // Serialize the job if it's a closure - if (!is_string($job->getCommand())) { - $queueItem['serialized_job'] = base64_encode(serialize($job)); - } + // Always serialize the job to preserve its full state + $queueItem['serialized_job'] = base64_encode(serialize($job)); $this->writeQueueItem($queueItem, 'pending'); @@ -130,7 +128,9 @@ class JobQueue */ public function pop(): ?Job { - $this->lock(); + if (!$this->lock()) { + return null; + } try { // Get all pending items @@ -188,6 +188,78 @@ class JobQueue } } + /** + * Pop a job from the queue with its queue ID + * + * @return array|null Array with 'job' and 'id' keys + */ + public function popWithId(): ?array + { + if (!$this->lock()) { + return null; + } + + try { + // Get all pending items + $items = $this->getPendingItems(); + + if (empty($items)) { + $this->unlock(); + return null; + } + + // Sort by priority and timestamp + usort($items, function($a, $b) { + $priorityOrder = [ + self::PRIORITY_HIGH => 0, + self::PRIORITY_NORMAL => 1, + self::PRIORITY_LOW => 2, + ]; + + $aPriority = $priorityOrder[$a['priority']] ?? 1; + $bPriority = $priorityOrder[$b['priority']] ?? 1; + + if ($aPriority !== $bPriority) { + return $aPriority - $bPriority; + } + + return $a['timestamp'] <=> $b['timestamp']; + }); + + // Get the first item that's ready to run + $now = new \DateTime(); + foreach ($items as $item) { + if ($item['scheduled_for']) { + $scheduledTime = new \DateTime($item['scheduled_for']); + if ($scheduledTime > $now) { + continue; // Skip items not yet due + } + } + + // Reconstruct the job first before moving it + $job = $this->reconstructJob($item); + + if (!$job) { + // Failed to reconstruct, skip this item + continue; + } + + // Move to processing only if we can reconstruct the job + $this->moveQueueItem($item['id'], 'pending', 'processing'); + + $this->unlock(); + return ['job' => $job, 'id' => $item['id']]; + } + + $this->unlock(); + return null; + + } catch (\Exception $e) { + $this->unlock(); + throw $e; + } + } + /** * Mark a job as completed * @@ -470,21 +542,36 @@ class JobQueue /** * Acquire lock for queue operations * - * @return void + * @return bool */ - protected function lock(): void + protected function lock(): bool { $attempts = 0; - while (file_exists($this->lockFile) && $attempts < 10) { - usleep(100000); // 100ms + $maxAttempts = 50; // 5 seconds total + + while ($attempts < $maxAttempts) { + // Check if lock file exists and is stale (older than 30 seconds) + if (file_exists($this->lockFile)) { + $lockAge = time() - filemtime($this->lockFile); + if ($lockAge > 30) { + // Stale lock, remove it + @unlink($this->lockFile); + } + } + + // Try to acquire lock atomically + $handle = @fopen($this->lockFile, 'x'); + if ($handle !== false) { + fclose($handle); + return true; + } + $attempts++; + usleep(100000); // 100ms } - if ($attempts >= 10) { - throw new RuntimeException('Could not acquire queue lock'); - } - - touch($this->lockFile); + // Could not acquire lock + return false; } /** diff --git a/system/src/Grav/Common/Scheduler/ModernScheduler.php b/system/src/Grav/Common/Scheduler/ModernScheduler.php index 9905524f7..4fa75b713 100644 --- a/system/src/Grav/Common/Scheduler/ModernScheduler.php +++ b/system/src/Grav/Common/Scheduler/ModernScheduler.php @@ -79,6 +79,51 @@ class ModernScheduler extends Scheduler return $this->modernConfig['enabled'] ?? false; } + /** + * Override to create ModernJob instances + * + * @param callable $fn + * @param array $args + * @param string|null $id + * @return ModernJob + */ + public function addFunction(callable $fn, $args = [], $id = null): Job + { + $job = new ModernJob($fn, $args, $id); + $this->queueJob($job->configure($this->config)); + + return $job; + } + + /** + * Override to create ModernJob instances + * + * @param string $command + * @param array $args + * @param string|null $id + * @return ModernJob + */ + public function addCommand($command, $args = [], $id = null): Job + { + $job = new ModernJob($command, $args, $id); + $this->queueJob($job->configure($this->config)); + + return $job; + } + + /** + * Override to create ModernJob instances + * + * @param string $command + * @param array $args + * @param string|null $id + * @return ModernJob + */ + public function raw($command, $args = [], $id = null): Job + { + return $this->addCommand($command, $args, $id); + } + /** * Initialize modern scheduler features * @@ -178,13 +223,13 @@ class ModernScheduler extends Scheduler */ protected function processQueuedJobs(): void { - $maxSize = $this->modernConfig['queue']['max_size'] ?? 1000; - + // Process existing queued jobs from previous runs while (!$this->jobQueue->isEmpty() && count($this->workers) < $this->maxWorkers) { $job = $this->jobQueue->pop(); if ($job) { $this->executeJob($job); + $this->jobs_run[] = $job; } } } @@ -196,7 +241,28 @@ class ModernScheduler extends Scheduler */ protected function processJobsWithWorkers(): void { - // Wait for all workers to complete + // Process all queued jobs + while (!$this->jobQueue->isEmpty()) { + // Wait if we've reached max workers + while (count($this->workers) >= $this->maxWorkers) { + foreach ($this->workers as $workerId => $process) { + if ($process instanceof Process && !$process->isRunning()) { + unset($this->workers[$workerId]); + } + } + if (count($this->workers) >= $this->maxWorkers) { + usleep(100000); // Wait 100ms + } + } + + // Get next job from queue + $job = $this->jobQueue->pop(); + if ($job) { + $this->executeJob($job); + } + } + + // Wait for all remaining workers to complete foreach ($this->workers as $workerId => $process) { if ($process instanceof Process) { $process->wait(); diff --git a/system/src/Grav/Common/Scheduler/Scheduler.php b/system/src/Grav/Common/Scheduler/Scheduler.php index dce3c6a25..abfb9def8 100644 --- a/system/src/Grav/Common/Scheduler/Scheduler.php +++ b/system/src/Grav/Common/Scheduler/Scheduler.php @@ -17,6 +17,7 @@ use InvalidArgumentException; use Symfony\Component\Process\PhpExecutableFinder; use Symfony\Component\Process\Process; use RocketTheme\Toolbox\File\YamlFile; +use Symfony\Component\Yaml\Yaml; use function is_callable; use function is_string; @@ -49,19 +50,55 @@ class Scheduler /** @var string */ private $status_path; + + // Modern features (backward compatible - disabled by default) + /** @var JobQueue|null */ + protected $jobQueue = null; + + /** @var array */ + protected $workers = []; + + /** @var int */ + protected $maxWorkers = 1; + + /** @var bool */ + protected $webhookEnabled = false; + + /** @var string|null */ + protected $webhookToken = null; + + /** @var bool */ + protected $healthEnabled = true; + + /** @var string */ + protected $queuePath; + + /** @var string */ + protected $historyPath; + + /** @var array */ + protected $modernConfig = []; /** * Create new instance. */ public function __construct() { - $config = Grav::instance()['config']->get('scheduler.defaults', []); + $grav = Grav::instance(); + $config = $grav['config']->get('scheduler.defaults', []); $this->config = $config; - $this->status_path = Grav::instance()['locator']->findResource('user-data://scheduler', true, true); + $locator = $grav['locator']; + $this->status_path = $locator->findResource('user-data://scheduler', true, true); if (!file_exists($this->status_path)) { Folder::create($this->status_path); } + + // Initialize modern features if enabled + $this->modernConfig = $grav['config']->get('scheduler.modern', []); + if ($this->modernConfig['enabled'] ?? false) { + $this->initializeModernFeatures($locator); + } } /** @@ -121,6 +158,16 @@ class Scheduler return [$background, $foreground]; } + /** + * Get the job queue + * + * @return JobQueue|null + */ + public function getJobQueue(): ?JobQueue + { + return $this->jobQueue; + } + /** * Get all jobs if they are disabled or not as one array * @@ -199,22 +246,44 @@ class Scheduler $runTime = new DateTime('now'); } - // Star processing jobs - foreach ($alljobs as $job) { - if ($job->isDue($runTime) || $force) { - $job->run(); - $this->jobs_run[] = $job; + // Process jobs based on modern features + if ($this->jobQueue && ($this->modernConfig['queue']['enabled'] ?? false)) { + // Queue jobs for processing + foreach ($alljobs as $job) { + if ($job->isDue($runTime) || $force) { + // Add to queue for concurrent processing + $this->jobQueue->push($job); + } + } + + // Process queue with workers + $this->processJobsWithWorkers(); + + // When using queue, states are saved by executeJob when jobs complete + // Don't save states here as jobs may still be processing + } else { + // Legacy processing (one at a time) + foreach ($alljobs as $job) { + if ($job->isDue($runTime) || $force) { + $job->run(); + $this->jobs_run[] = $job; + } + } + + // Finish handling any background jobs + foreach ($background as $job) { + $job->finalize(); + } + + // Store states for legacy mode + $this->saveJobStates(); + + // Save history if enabled + if (($this->modernConfig['history']['enabled'] ?? false) && $this->historyPath) { + $this->saveJobHistory(); } } - // Finish handling any background jobs - foreach ($background as $job) { - $job->finalize(); - } - - // Store states - $this->saveJobStates(); - // Store run date file_put_contents("logs/lastcron.run", (new DateTime("now"))->format("Y-m-d H:i:s"), LOCK_EX); } @@ -378,6 +447,54 @@ class Scheduler } + /** + * Initialize modern features + * + * @param mixed $locator + * @return void + */ + protected function initializeModernFeatures($locator): void + { + // Set up paths + $this->queuePath = $this->modernConfig['queue']['path'] ?? 'user-data://scheduler/queue'; + $this->queuePath = $locator->findResource($this->queuePath, true, true); + + $this->historyPath = $this->modernConfig['history']['path'] ?? 'user-data://scheduler/history'; + $this->historyPath = $locator->findResource($this->historyPath, true, true); + + // Create directories if they don't exist + if (!file_exists($this->queuePath)) { + Folder::create($this->queuePath); + } + + if (!file_exists($this->historyPath)) { + Folder::create($this->historyPath); + } + + // Initialize job queue + $this->jobQueue = new JobQueue($this->queuePath); + + // Configure workers + $this->maxWorkers = $this->modernConfig['workers'] ?? 1; + + // Configure webhook + $this->webhookEnabled = $this->modernConfig['webhook']['enabled'] ?? false; + $this->webhookToken = $this->modernConfig['webhook']['token'] ?? null; + + // Configure health check + $this->healthEnabled = $this->modernConfig['health']['enabled'] ?? true; + } + + /** + * Get the job queue + * + * @return JobQueue|null + */ + public function getQueue(): ?JobQueue + { + return $this->jobQueue; + } + /** * Check if webhook is enabled * @@ -385,9 +502,7 @@ class Scheduler */ public function isWebhookEnabled(): bool { - // Check config for webhook settings even in base scheduler - $config = Grav::instance()['config']; - return $config->get('scheduler.modern.webhook.enabled', false); + return $this->webhookEnabled; } /** @@ -478,4 +593,335 @@ class Scheduler return $job; } + + /** + * Process jobs using multiple workers + * + * @return void + */ + protected function processJobsWithWorkers(): void + { + if (!$this->jobQueue) { + return; + } + + // Process all queued jobs + while (!$this->jobQueue->isEmpty()) { + // Wait if we've reached max workers + while (count($this->workers) >= $this->maxWorkers) { + foreach ($this->workers as $workerId => $worker) { + $process = null; + if (is_array($worker) && isset($worker['process'])) { + $process = $worker['process']; + } elseif ($worker instanceof Process) { + $process = $worker; + } + + if ($process instanceof Process && !$process->isRunning()) { + // Finalize job if needed + if (is_array($worker) && isset($worker['job'])) { + $worker['job']->finalize(); + + // Save job state + $this->saveJobState($worker['job']); + + // Update queue status + if (isset($worker['queueId']) && $this->jobQueue) { + if ($worker['job']->isSuccessful()) { + $this->jobQueue->complete($worker['queueId']); + } else { + $this->jobQueue->fail($worker['queueId'], $worker['job']->getOutput() ?: 'Job failed'); + } + } + } + unset($this->workers[$workerId]); + } + } + if (count($this->workers) >= $this->maxWorkers) { + usleep(100000); // Wait 100ms + } + } + + // Get next job from queue + $queueItem = $this->jobQueue->popWithId(); + if ($queueItem) { + $this->executeJob($queueItem['job'], $queueItem['id']); + } + } + + // Wait for all remaining workers to complete + foreach ($this->workers as $workerId => $worker) { + if (is_array($worker) && isset($worker['process'])) { + $process = $worker['process']; + if ($process instanceof Process) { + $process->wait(); + + // Finalize and save state for background jobs + if (isset($worker['job'])) { + $worker['job']->finalize(); + $this->saveJobState($worker['job']); + } + + // Update queue status for background jobs + if (isset($worker['queueId']) && $this->jobQueue) { + $job = $worker['job']; + if ($job->isSuccessful()) { + $this->jobQueue->complete($worker['queueId']); + } else { + $this->jobQueue->fail($worker['queueId'], $job->getOutput() ?: 'Job execution failed'); + } + } + + unset($this->workers[$workerId]); + } + } elseif ($worker instanceof Process) { + // Legacy format + $worker->wait(); + unset($this->workers[$workerId]); + } + } + } + + /** + * Process existing queued jobs + * + * @return void + */ + protected function processQueuedJobs(): void + { + if (!$this->jobQueue) { + return; + } + + // Process any existing queued jobs from previous runs + while (!$this->jobQueue->isEmpty() && count($this->workers) < $this->maxWorkers) { + $job = $this->jobQueue->pop(); + if ($job) { + $this->executeJob($job); + } + } + } + + /** + * Execute a job + * + * @param Job $job + * @param string|null $queueId Queue ID if job came from queue + * @return void + */ + protected function executeJob(Job $job, ?string $queueId = null): void + { + $job->run(); + $this->jobs_run[] = $job; + + // Save job state after execution + $this->saveJobState($job); + + // Check if job runs in background + if ($job->runInBackground()) { + // Background job - track it for later completion + $process = $job->getProcess(); + if ($process && $process->isStarted()) { + $this->workers[] = [ + 'process' => $process, + 'job' => $job, + 'queueId' => $queueId + ]; + // Don't update queue status yet - will be done when process completes + return; + } + } + + // Foreground job or background job that didn't start - update queue status immediately + if ($queueId && $this->jobQueue) { + // Job has already been finalized if it ran in foreground + if (!$job->runInBackground()) { + $job->finalize(); + } + + if ($job->isSuccessful()) { + // Move from processing to completed + $this->jobQueue->complete($queueId); + } else { + // Move from processing to failed + $this->jobQueue->fail($queueId, $job->getOutput() ?: 'Job execution failed'); + } + } + } + + /** + * Save state for a single job + * + * @param Job $job + * @return void + */ + protected function saveJobState(Job $job): void + { + $grav = Grav::instance(); + $locator = $grav['locator']; + $statusFile = $locator->findResource('user-data://scheduler/status.yaml', true, true); + + $status = []; + if (file_exists($statusFile)) { + $status = Yaml::parseFile($statusFile) ?: []; + } + + // Update job status + $status[$job->getId()] = [ + 'state' => $job->isSuccessful() ? 'success' : 'failure', + 'last-run' => time(), + ]; + + // Add error if job failed + if (!$job->isSuccessful()) { + $output = $job->getOutput(); + if ($output) { + $status[$job->getId()]['error'] = $output; + } else { + $status[$job->getId()]['error'] = null; + } + } + + file_put_contents($statusFile, Yaml::dump($status)); + } + + /** + * Save job execution history + * + * @return void + */ + protected function saveJobHistory(): void + { + if (!$this->historyPath) { + return; + } + + $history = []; + foreach ($this->jobs_run as $job) { + $history[] = [ + 'id' => $job->getId(), + 'executed_at' => date('c'), + 'success' => $job->isSuccessful(), + 'output' => substr($job->getOutput(), 0, 1000), + ]; + } + + if (!empty($history)) { + $filename = $this->historyPath . '/' . date('Y-m-d') . '.json'; + $existing = file_exists($filename) ? json_decode(file_get_contents($filename), true) : []; + $existing = array_merge($existing, $history); + file_put_contents($filename, json_encode($existing, JSON_PRETTY_PRINT)); + } + } + + /** + * Update last run timestamp + * + * @return void + */ + protected function updateLastRun(): void + { + $lastRunFile = $this->status_path . '/last_run.txt'; + file_put_contents($lastRunFile, date('Y-m-d H:i:s')); + } + + /** + * Get health status + * + * @return array + */ + public function getHealthStatus(): array + { + $lastRunFile = $this->status_path . '/last_run.txt'; + $lastRun = file_exists($lastRunFile) ? file_get_contents($lastRunFile) : null; + + $health = [ + 'status' => 'healthy', + 'last_run' => $lastRun, + 'last_run_age' => null, + 'queue_size' => 0, + 'failed_jobs_24h' => 0, + 'scheduled_jobs' => count($this->getAllJobs()), + 'modern_features' => $this->modernConfig['enabled'] ?? false, + 'webhook_enabled' => $this->webhookEnabled, + 'health_check_enabled' => $this->healthEnabled, + 'timestamp' => date('c'), + ]; + + // Calculate last run age + if ($lastRun) { + $lastRunTime = new DateTime($lastRun); + $now = new DateTime('now'); + $health['last_run_age'] = $now->getTimestamp() - $lastRunTime->getTimestamp(); + + // Determine status based on age + if ($health['last_run_age'] < 600) { // Less than 10 minutes + $health['status'] = 'healthy'; + } elseif ($health['last_run_age'] < 3600) { // Less than 1 hour + $health['status'] = 'warning'; + } else { + $health['status'] = 'critical'; + } + } else { + $health['status'] = 'unknown'; + } + + // Add queue stats if available + if ($this->jobQueue) { + $stats = $this->jobQueue->getStatistics(); + $health['queue_size'] = $stats['pending'] ?? 0; + $health['failed_jobs_24h'] = $stats['failed'] ?? 0; + } + + return $health; + } + + /** + * Process webhook trigger + * + * @param string|null $token + * @param string|null $jobId + * @return array + */ + public function processWebhookTrigger($token = null, $jobId = null): array + { + if (!$this->webhookEnabled) { + return ['success' => false, 'message' => 'Webhook triggers are not enabled']; + } + + if ($this->webhookToken && $token !== $this->webhookToken) { + return ['success' => false, 'message' => 'Invalid webhook token']; + } + + if ($jobId) { + // Force run specific job + $job = $this->getJob($jobId); + if ($job) { + $job->inForeground()->run(); + $this->jobs_run[] = $job; + $this->saveJobStates(); + $this->updateLastRun(); + + return [ + 'success' => $job->isSuccessful(), + 'message' => $job->isSuccessful() ? 'Job force-executed successfully' : 'Job execution failed', + 'job_id' => $jobId, + 'forced' => true, + 'output' => $job->getOutput(), + ]; + } else { + return ['success' => false, 'message' => 'Job not found: ' . $jobId]; + } + } else { + // Run all due jobs + $this->run(); + + return [ + 'success' => true, + 'message' => 'Scheduler executed (due jobs only)', + 'jobs_run' => count($this->jobs_run), + 'timestamp' => date('c'), + ]; + } + } } diff --git a/system/src/Grav/Common/Service/SchedulerServiceProvider.php b/system/src/Grav/Common/Service/SchedulerServiceProvider.php index f854dbebf..8b2b04bab 100644 --- a/system/src/Grav/Common/Service/SchedulerServiceProvider.php +++ b/system/src/Grav/Common/Service/SchedulerServiceProvider.php @@ -10,7 +10,8 @@ namespace Grav\Common\Service; use Grav\Common\Scheduler\Scheduler; -use Grav\Common\Scheduler\ModernScheduler; +use Grav\Common\Scheduler\JobQueue; +use Grav\Common\Scheduler\JobWorker; use Pimple\Container; use Pimple\ServiceProviderInterface; @@ -28,15 +29,36 @@ class SchedulerServiceProvider implements ServiceProviderInterface { $container['scheduler'] = function ($c) { $config = $c['config']; + $scheduler = new Scheduler(); - // Use ModernScheduler if modern features are enabled - $modernEnabled = $config->get('scheduler.modern.enabled', false); - - if ($modernEnabled) { - return new ModernScheduler(); + // Configure modern features if enabled + $modernConfig = $config->get('scheduler.modern', []); + if ($modernConfig['enabled'] ?? false) { + // Initialize components + $queuePath = $c['locator']->findResource('user-data://scheduler/queue', true, true); + $statusPath = $c['locator']->findResource('user-data://scheduler/status.yaml', true, true); + + // Set modern configuration on scheduler + $scheduler->setModernConfig($modernConfig); + + // Initialize job queue if enabled + if ($modernConfig['queue']['enabled'] ?? false) { + $jobQueue = new JobQueue($queuePath); + $scheduler->setJobQueue($jobQueue); + } + + // Initialize workers if enabled + if ($modernConfig['workers']['enabled'] ?? false) { + $workerCount = $modernConfig['workers']['count'] ?? 2; + $workers = []; + for ($i = 0; $i < $workerCount; $i++) { + $workers[] = new JobWorker("worker-{$i}"); + } + $scheduler->setWorkers($workers); + } } - return new Scheduler(); + return $scheduler; }; } }