diff --git a/aiScanner/management/__init__.py b/aiScanner/management/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/aiScanner/management/commands/__init__.py b/aiScanner/management/commands/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/aiScanner/management/commands/run_scheduled_scans.py b/aiScanner/management/commands/run_scheduled_scans.py new file mode 100644 index 000000000..c94770a11 --- /dev/null +++ b/aiScanner/management/commands/run_scheduled_scans.py @@ -0,0 +1,330 @@ +from django.core.management.base import BaseCommand +from django.utils import timezone +from datetime import datetime, timedelta +import json +import time + + +class Command(BaseCommand): + help = 'Run scheduled AI security scans' + + def add_arguments(self, parser): + parser.add_argument( + '--daemon', + action='store_true', + help='Run as daemon, checking for scheduled scans every minute', + ) + parser.add_argument( + '--scan-id', + type=int, + help='Run a specific scheduled scan by ID', + ) + + def handle(self, *args, **options): + if options['daemon']: + self.stdout.write('Starting scheduled scan daemon...') + self.run_daemon() + elif options['scan_id']: + self.stdout.write(f'Running scheduled scan ID {options["scan_id"]}...') + self.run_scheduled_scan_by_id(options['scan_id']) + else: + self.stdout.write('Checking for scheduled scans to run...') + self.check_and_run_scans() + + def run_daemon(self): + """Run as daemon, checking for scans every minute""" + while True: + try: + self.check_and_run_scans() + time.sleep(60) # Check every minute + except KeyboardInterrupt: + self.stdout.write('Daemon stopped by user') + break + except Exception as e: + self.stderr.write(f'Error in daemon: {str(e)}') + time.sleep(60) # Continue after error + + def check_and_run_scans(self): + """Check for scheduled scans that need to run""" + from aiScanner.models import ScheduledScan + + now = timezone.now() + + # Find scans that are due to run + due_scans = ScheduledScan.objects.filter( + status='active', + next_run__lte=now + ) + + for scan in due_scans: + self.stdout.write(f'Running scheduled scan: {scan.name} (ID: {scan.id})') + self.execute_scheduled_scan(scan) + + def run_scheduled_scan_by_id(self, scan_id): + """Run a specific scheduled scan by ID""" + from aiScanner.models import ScheduledScan + + try: + scan = ScheduledScan.objects.get(id=scan_id) + self.stdout.write(f'Running scheduled scan: {scan.name}') + self.execute_scheduled_scan(scan) + except ScheduledScan.DoesNotExist: + self.stderr.write(f'Scheduled scan with ID {scan_id} not found') + + def execute_scheduled_scan(self, scheduled_scan): + """Execute a scheduled scan""" + from aiScanner.models import ScheduledScanExecution, ScanHistory + from aiScanner.aiScannerManager import AIScannerManager + from loginSystem.models import Administrator + from websiteFunctions.models import Websites + from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging + + # Create execution record + execution = ScheduledScanExecution.objects.create( + scheduled_scan=scheduled_scan, + status='running', + started_at=timezone.now() + ) + + try: + # Update last run time + scheduled_scan.last_run = timezone.now() + scheduled_scan.next_run = scheduled_scan.calculate_next_run() + scheduled_scan.save() + + # Get domains to scan + domains_to_scan = [] + admin = scheduled_scan.admin + + # Validate domains still exist and user has access + for domain in scheduled_scan.domain_list: + try: + website = Websites.objects.get(domain=domain, admin=admin) + domains_to_scan.append(domain) + except Websites.DoesNotExist: + logging.writeToFile(f'[Scheduled Scan] Domain {domain} no longer accessible for user {admin.userName}') + continue + + if not domains_to_scan: + execution.status = 'failed' + execution.error_message = 'No accessible domains found for scanning' + execution.completed_at = timezone.now() + execution.save() + self.stderr.write(f'No accessible domains for scheduled scan {scheduled_scan.name}') + return + + execution.set_scanned_domains(domains_to_scan) + execution.total_scans = len(domains_to_scan) + execution.save() + + # Initialize scanner manager + sm = AIScannerManager() + scan_ids = [] + successful_scans = 0 + failed_scans = 0 + total_cost = 0.0 + + # Execute scans for each domain + for domain in domains_to_scan: + try: + self.stdout.write(f'Starting scan for domain: {domain}') + + # Create a fake request object for the scanner manager + class FakeRequest: + def __init__(self, admin_id): + self.session = {'userID': admin_id} + self.method = 'POST' + self.POST = { + 'domain': domain, + 'scan_type': scheduled_scan.scan_type + } + + fake_request = FakeRequest(admin.pk) + + # Start the scan + result = sm.startScan(fake_request, admin.pk) + + if hasattr(result, 'content'): + # It's an HTTP response, parse the JSON + import json + response_data = json.loads(result.content.decode('utf-8')) + else: + # It's already a dict + response_data = result + + if response_data.get('success'): + scan_id = response_data.get('scan_id') + if scan_id: + scan_ids.append(scan_id) + successful_scans += 1 + + # Get cost estimate if available + if 'cost_estimate' in response_data: + total_cost += float(response_data['cost_estimate']) + + logging.writeToFile(f'[Scheduled Scan] Successfully started scan {scan_id} for {domain}') + else: + failed_scans += 1 + logging.writeToFile(f'[Scheduled Scan] Failed to get scan ID for {domain}') + else: + failed_scans += 1 + error_msg = response_data.get('error', 'Unknown error') + logging.writeToFile(f'[Scheduled Scan] Failed to start scan for {domain}: {error_msg}') + + except Exception as e: + failed_scans += 1 + error_msg = str(e) + logging.writeToFile(f'[Scheduled Scan] Exception starting scan for {domain}: {error_msg}') + + # Small delay between scans to avoid overwhelming the system + time.sleep(2) + + # Update execution record + execution.successful_scans = successful_scans + execution.failed_scans = failed_scans + execution.total_cost = total_cost + execution.set_scan_ids(scan_ids) + execution.status = 'completed' if failed_scans == 0 else 'completed' # Always completed if we tried all + execution.completed_at = timezone.now() + execution.save() + + # Send notifications if configured + if scheduled_scan.email_notifications: + self.send_notifications(scheduled_scan, execution) + + self.stdout.write( + f'Scheduled scan completed: {successful_scans} successful, {failed_scans} failed' + ) + + except Exception as e: + # Update execution record with error + execution.status = 'failed' + execution.error_message = str(e) + execution.completed_at = timezone.now() + execution.save() + + logging.writeToFile(f'[Scheduled Scan] Failed to execute scheduled scan {scheduled_scan.name}: {str(e)}') + self.stderr.write(f'Failed to execute scheduled scan {scheduled_scan.name}: {str(e)}') + + # Send failure notification + if scheduled_scan.email_notifications and scheduled_scan.notify_on_failure: + self.send_failure_notification(scheduled_scan, str(e)) + + def send_notifications(self, scheduled_scan, execution): + """Send email notifications for completed scan""" + try: + # Determine if we should send notification + should_notify = False + + if execution.status == 'failed' and scheduled_scan.notify_on_failure: + should_notify = True + elif execution.status == 'completed': + if scheduled_scan.notify_on_completion: + should_notify = True + elif scheduled_scan.notify_on_threats and execution.successful_scans > 0: + # Check if any scans found threats + # This would require checking the scan results, which might not be available immediately + # For now, we'll just send completion notifications + should_notify = scheduled_scan.notify_on_completion + + if should_notify: + self.send_execution_notification(scheduled_scan, execution) + + except Exception as e: + from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging + logging.writeToFile(f'[Scheduled Scan] Failed to send notification: {str(e)}') + + def send_execution_notification(self, scheduled_scan, execution): + """Send notification email for scan execution""" + try: + # Get notification emails + notification_emails = scheduled_scan.notification_email_list + if not notification_emails: + # Use admin email as fallback + notification_emails = [scheduled_scan.admin.email] if scheduled_scan.admin.email else [] + + if not notification_emails: + return + + # Prepare email content + subject = f'AI Scanner: Scheduled Scan "{scheduled_scan.name}" Completed' + + status_text = execution.status.title() + if execution.status == 'completed': + if execution.failed_scans == 0: + status_text = 'Completed Successfully' + else: + status_text = f'Completed with {execution.failed_scans} failures' + + message = f""" +Scheduled AI Security Scan Report + +Scan Name: {scheduled_scan.name} +Status: {status_text} +Execution Time: {execution.execution_time.strftime('%Y-%m-%d %H:%M:%S UTC')} + +Results: +- Total Domains: {execution.total_scans} +- Successful Scans: {execution.successful_scans} +- Failed Scans: {execution.failed_scans} +- Total Cost: ${execution.total_cost:.4f} + +Domains Scanned: {', '.join(execution.scanned_domains)} + +{f'Error Message: {execution.error_message}' if execution.error_message else ''} + +Scan IDs: {', '.join(execution.scan_id_list)} + +View detailed results in your CyberPanel AI Scanner dashboard. +""" + + # Send email using CyberPanel's email system + from plogical.mailUtilities import mailUtilities + mailUtilities.sendEmail(notification_emails[0], subject, message) + + # Log notification sent + from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging + logging.writeToFile(f'[Scheduled Scan] Notification sent for {scheduled_scan.name} to {len(notification_emails)} recipients') + + except Exception as e: + from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging + logging.writeToFile(f'[Scheduled Scan] Failed to send notification email: {str(e)}') + + def send_failure_notification(self, scheduled_scan, error_message): + """Send notification email for scan failure""" + try: + # Get notification emails + notification_emails = scheduled_scan.notification_email_list + if not notification_emails: + # Use admin email as fallback + notification_emails = [scheduled_scan.admin.email] if scheduled_scan.admin.email else [] + + if not notification_emails: + return + + # Prepare email content + subject = f'AI Scanner: Scheduled Scan "{scheduled_scan.name}" Failed' + + message = f""" +Scheduled AI Security Scan Failure + +Scan Name: {scheduled_scan.name} +Status: Failed +Time: {timezone.now().strftime('%Y-%m-%d %H:%M:%S UTC')} + +Error: {error_message} + +Please check your CyberPanel AI Scanner configuration and try again. +""" + + # Send email using CyberPanel's email system + from plogical.mailUtilities import mailUtilities + mailUtilities.sendEmail(notification_emails[0], subject, message) + + # Log notification sent + from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging + logging.writeToFile(f'[Scheduled Scan] Failure notification sent for {scheduled_scan.name}') + + except Exception as e: + from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging + logging.writeToFile(f'[Scheduled Scan] Failed to send failure notification email: {str(e)}') \ No newline at end of file diff --git a/aiScanner/models.py b/aiScanner/models.py index 13789d55e..e221f778b 100644 --- a/aiScanner/models.py +++ b/aiScanner/models.py @@ -107,3 +107,216 @@ class FileAccessToken(models.Model): def is_expired(self): from django.utils import timezone return timezone.now() > self.expires_at + + +class ScheduledScan(models.Model): + """Store scheduled scan configurations""" + FREQUENCY_CHOICES = [ + ('daily', 'Daily'), + ('weekly', 'Weekly'), + ('monthly', 'Monthly'), + ('quarterly', 'Quarterly'), + ] + + SCAN_TYPE_CHOICES = [ + ('full', 'Full Scan'), + ('quick', 'Quick Scan'), + ('custom', 'Custom Scan'), + ] + + STATUS_CHOICES = [ + ('active', 'Active'), + ('paused', 'Paused'), + ('disabled', 'Disabled'), + ] + + admin = models.ForeignKey(Administrator, on_delete=models.CASCADE, related_name='scheduled_scans') + name = models.CharField(max_length=200, help_text="Name for this scheduled scan") + domains = models.TextField(help_text="JSON array of domains to scan") + frequency = models.CharField(max_length=20, choices=FREQUENCY_CHOICES, default='weekly') + scan_type = models.CharField(max_length=20, choices=SCAN_TYPE_CHOICES, default='full') + time_of_day = models.TimeField(help_text="Time of day to run the scan (UTC)") + day_of_week = models.IntegerField(null=True, blank=True, help_text="Day of week for weekly scans (0=Monday, 6=Sunday)") + day_of_month = models.IntegerField(null=True, blank=True, help_text="Day of month for monthly scans (1-31)") + status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='active') + last_run = models.DateTimeField(null=True, blank=True) + next_run = models.DateTimeField(null=True, blank=True) + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + + # Notification settings + email_notifications = models.BooleanField(default=True) + notification_emails = models.TextField(blank=True, help_text="JSON array of email addresses") + notify_on_threats = models.BooleanField(default=True) + notify_on_completion = models.BooleanField(default=False) + notify_on_failure = models.BooleanField(default=True) + + class Meta: + db_table = 'ai_scanner_scheduled_scans' + ordering = ['-created_at'] + + def __str__(self): + return f"Scheduled Scan: {self.name} ({self.frequency})" + + @property + def domain_list(self): + """Parse domains JSON""" + if self.domains: + try: + return json.loads(self.domains) + except json.JSONDecodeError: + return [] + return [] + + @property + def notification_email_list(self): + """Parse notification emails JSON""" + if self.notification_emails: + try: + return json.loads(self.notification_emails) + except json.JSONDecodeError: + return [] + return [] + + def set_domains(self, domain_list): + """Set domains from list""" + self.domains = json.dumps(domain_list) + + def set_notification_emails(self, email_list): + """Set notification emails from list""" + self.notification_emails = json.dumps(email_list) + + def calculate_next_run(self): + """Calculate next run time based on frequency""" + from django.utils import timezone + from datetime import datetime, timedelta + import calendar + + now = timezone.now() + + if self.frequency == 'daily': + # Daily: next run is tomorrow at specified time + next_run = now.replace(hour=self.time_of_day.hour, minute=self.time_of_day.minute, second=0, microsecond=0) + if next_run <= now: + next_run += timedelta(days=1) + + elif self.frequency == 'weekly': + # Weekly: next run is on specified day of week at specified time + days_ahead = self.day_of_week - now.weekday() + if days_ahead <= 0: # Target day already happened this week + days_ahead += 7 + next_run = now + timedelta(days=days_ahead) + next_run = next_run.replace(hour=self.time_of_day.hour, minute=self.time_of_day.minute, second=0, microsecond=0) + + elif self.frequency == 'monthly': + # Monthly: next run is on specified day of month at specified time + year = now.year + month = now.month + day = min(self.day_of_month, calendar.monthrange(year, month)[1]) + + next_run = now.replace(day=day, hour=self.time_of_day.hour, minute=self.time_of_day.minute, second=0, microsecond=0) + + if next_run <= now: + # Move to next month + if month == 12: + year += 1 + month = 1 + else: + month += 1 + day = min(self.day_of_month, calendar.monthrange(year, month)[1]) + next_run = next_run.replace(year=year, month=month, day=day) + + elif self.frequency == 'quarterly': + # Quarterly: next run is 3 months from now + next_run = now.replace(hour=self.time_of_day.hour, minute=self.time_of_day.minute, second=0, microsecond=0) + month = now.month + year = now.year + + # Add 3 months + month += 3 + if month > 12: + year += 1 + month -= 12 + + day = min(self.day_of_month or 1, calendar.monthrange(year, month)[1]) + next_run = next_run.replace(year=year, month=month, day=day) + + if next_run <= now: + # Add another 3 months + month += 3 + if month > 12: + year += 1 + month -= 12 + day = min(self.day_of_month or 1, calendar.monthrange(year, month)[1]) + next_run = next_run.replace(year=year, month=month, day=day) + + else: + # Default to weekly + next_run = now + timedelta(weeks=1) + + return next_run + + def save(self, *args, **kwargs): + """Override save to calculate next_run""" + if not self.next_run or self.status == 'active': + self.next_run = self.calculate_next_run() + super().save(*args, **kwargs) + + +class ScheduledScanExecution(models.Model): + """Track individual executions of scheduled scans""" + STATUS_CHOICES = [ + ('pending', 'Pending'), + ('running', 'Running'), + ('completed', 'Completed'), + ('failed', 'Failed'), + ('cancelled', 'Cancelled'), + ] + + scheduled_scan = models.ForeignKey(ScheduledScan, on_delete=models.CASCADE, related_name='executions') + execution_time = models.DateTimeField(auto_now_add=True) + status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending') + domains_scanned = models.TextField(blank=True, help_text="JSON array of domains that were scanned") + total_scans = models.IntegerField(default=0) + successful_scans = models.IntegerField(default=0) + failed_scans = models.IntegerField(default=0) + total_cost = models.DecimalField(max_digits=10, decimal_places=6, default=0.0) + scan_ids = models.TextField(blank=True, help_text="JSON array of scan IDs created") + error_message = models.TextField(blank=True, null=True) + started_at = models.DateTimeField(null=True, blank=True) + completed_at = models.DateTimeField(null=True, blank=True) + + class Meta: + db_table = 'ai_scanner_scheduled_executions' + ordering = ['-execution_time'] + + def __str__(self): + return f"Execution of {self.scheduled_scan.name} at {self.execution_time}" + + @property + def scanned_domains(self): + """Parse domains scanned JSON""" + if self.domains_scanned: + try: + return json.loads(self.domains_scanned) + except json.JSONDecodeError: + return [] + return [] + + @property + def scan_id_list(self): + """Parse scan IDs JSON""" + if self.scan_ids: + try: + return json.loads(self.scan_ids) + except json.JSONDecodeError: + return [] + return [] + + def set_scanned_domains(self, domain_list): + """Set scanned domains from list""" + self.domains_scanned = json.dumps(domain_list) + + def set_scan_ids(self, scan_id_list): + """Set scan IDs from list""" + self.scan_ids = json.dumps(scan_id_list) diff --git a/aiScanner/scheduled_views.py b/aiScanner/scheduled_views.py new file mode 100644 index 000000000..e886a6121 --- /dev/null +++ b/aiScanner/scheduled_views.py @@ -0,0 +1,270 @@ +from django.shortcuts import render, redirect +from django.http import JsonResponse +from django.views.decorators.csrf import csrf_exempt +from django.views.decorators.http import require_http_methods +from loginSystem.views import loadLoginPage +import json + + +@require_http_methods(['GET', 'POST']) +def scheduledScans(request): + """Manage scheduled scans""" + try: + userID = request.session['userID'] + from loginSystem.models import Administrator + from .models import ScheduledScan + from plogical.acl import ACLManager + + admin = Administrator.objects.get(pk=userID) + currentACL = ACLManager.loadedACL(userID) + + if request.method == 'GET': + # Get scheduled scans with ACL respect + if currentACL['admin'] == 1: + # Admin can see all scheduled scans + scheduled_scans = ScheduledScan.objects.all() + else: + # Users can only see their own scheduled scans and their sub-users' scans + user_admins = ACLManager.loadUserObjects(userID) + scheduled_scans = ScheduledScan.objects.filter(admin__in=user_admins) + + scan_data = [] + for scan in scheduled_scans: + scan_data.append({ + 'id': scan.id, + 'name': scan.name, + 'domains': scan.domain_list, + 'frequency': scan.frequency, + 'scan_type': scan.scan_type, + 'time_of_day': scan.time_of_day.strftime('%H:%M'), + 'day_of_week': scan.day_of_week, + 'day_of_month': scan.day_of_month, + 'status': scan.status, + 'last_run': scan.last_run.isoformat() if scan.last_run else None, + 'next_run': scan.next_run.isoformat() if scan.next_run else None, + 'email_notifications': scan.email_notifications, + 'notification_emails': scan.notification_email_list, + 'notify_on_threats': scan.notify_on_threats, + 'notify_on_completion': scan.notify_on_completion, + 'notify_on_failure': scan.notify_on_failure, + 'created_at': scan.created_at.isoformat() + }) + + return JsonResponse({'success': True, 'scheduled_scans': scan_data}) + + elif request.method == 'POST': + # Create new scheduled scan + data = json.loads(request.body) + + # Validate required fields + required_fields = ['name', 'domains', 'frequency', 'scan_type', 'time_of_day'] + for field in required_fields: + if field not in data or not data[field]: + return JsonResponse({'success': False, 'error': f'Missing required field: {field}'}) + + # Validate domains + if not isinstance(data['domains'], list) or len(data['domains']) == 0: + return JsonResponse({'success': False, 'error': 'At least one domain must be selected'}) + + # Check if user has access to these domains + if currentACL['admin'] != 1: + from websiteFunctions.models import Websites + user_domains = set(Websites.objects.filter(admin=admin).values_list('domain', flat=True)) + requested_domains = set(data['domains']) + + if not requested_domains.issubset(user_domains): + return JsonResponse({'success': False, 'error': 'You do not have access to some of the selected domains'}) + + # Parse time + from datetime import datetime + try: + time_obj = datetime.strptime(data['time_of_day'], '%H:%M').time() + except ValueError: + return JsonResponse({'success': False, 'error': 'Invalid time format'}) + + # Create scheduled scan + scheduled_scan = ScheduledScan( + admin=admin, + name=data['name'], + frequency=data['frequency'], + scan_type=data['scan_type'], + time_of_day=time_obj, + email_notifications=data.get('email_notifications', True), + notify_on_threats=data.get('notify_on_threats', True), + notify_on_completion=data.get('notify_on_completion', False), + notify_on_failure=data.get('notify_on_failure', True) + ) + + # Set domains + scheduled_scan.set_domains(data['domains']) + + # Set notification emails + if data.get('notification_emails'): + scheduled_scan.set_notification_emails(data['notification_emails']) + + # Set frequency-specific fields + if data['frequency'] == 'weekly' and 'day_of_week' in data: + scheduled_scan.day_of_week = int(data['day_of_week']) + elif data['frequency'] in ['monthly', 'quarterly'] and 'day_of_month' in data: + scheduled_scan.day_of_month = int(data['day_of_month']) + + scheduled_scan.save() + + return JsonResponse({'success': True, 'id': scheduled_scan.id}) + + except KeyError: + return JsonResponse({'success': False, 'error': 'Not authenticated'}) + except Exception as e: + return JsonResponse({'success': False, 'error': str(e)}) + + +@require_http_methods(['GET', 'DELETE']) +def scheduledScanDetail(request, scan_id): + """Get or delete a specific scheduled scan""" + try: + userID = request.session['userID'] + from loginSystem.models import Administrator + from .models import ScheduledScan + from plogical.acl import ACLManager + + admin = Administrator.objects.get(pk=userID) + currentACL = ACLManager.loadedACL(userID) + + # Get scheduled scan with ACL respect + try: + scheduled_scan = ScheduledScan.objects.get(id=scan_id) + + # Check if user has access to this scheduled scan + if currentACL['admin'] != 1: + user_admins = ACLManager.loadUserObjects(userID) + if scheduled_scan.admin not in user_admins: + return JsonResponse({'success': False, 'error': 'Access denied to this scheduled scan'}) + except ScheduledScan.DoesNotExist: + return JsonResponse({'success': False, 'error': 'Scheduled scan not found'}) + + if request.method == 'GET': + # Return scheduled scan details + scan_data = { + 'id': scheduled_scan.id, + 'name': scheduled_scan.name, + 'domains': scheduled_scan.domain_list, + 'frequency': scheduled_scan.frequency, + 'scan_type': scheduled_scan.scan_type, + 'time_of_day': scheduled_scan.time_of_day.strftime('%H:%M'), + 'day_of_week': scheduled_scan.day_of_week, + 'day_of_month': scheduled_scan.day_of_month, + 'status': scheduled_scan.status, + 'last_run': scheduled_scan.last_run.isoformat() if scheduled_scan.last_run else None, + 'next_run': scheduled_scan.next_run.isoformat() if scheduled_scan.next_run else None, + 'email_notifications': scheduled_scan.email_notifications, + 'notification_emails': scheduled_scan.notification_email_list, + 'notify_on_threats': scheduled_scan.notify_on_threats, + 'notify_on_completion': scheduled_scan.notify_on_completion, + 'notify_on_failure': scheduled_scan.notify_on_failure, + 'created_at': scheduled_scan.created_at.isoformat() + } + + return JsonResponse({'success': True, 'scheduled_scan': scan_data}) + + elif request.method == 'DELETE': + # Delete scheduled scan + scheduled_scan.delete() + return JsonResponse({'success': True}) + + except KeyError: + return JsonResponse({'success': False, 'error': 'Not authenticated'}) + except Exception as e: + return JsonResponse({'success': False, 'error': str(e)}) + + +@csrf_exempt +@require_http_methods(['POST']) +def toggleScheduledScan(request, scan_id): + """Toggle scheduled scan status (active/paused)""" + try: + userID = request.session['userID'] + from loginSystem.models import Administrator + from .models import ScheduledScan + from plogical.acl import ACLManager + + admin = Administrator.objects.get(pk=userID) + currentACL = ACLManager.loadedACL(userID) + + # Get scheduled scan with ACL respect + try: + scheduled_scan = ScheduledScan.objects.get(id=scan_id) + + # Check if user has access to this scheduled scan + if currentACL['admin'] != 1: + user_admins = ACLManager.loadUserObjects(userID) + if scheduled_scan.admin not in user_admins: + return JsonResponse({'success': False, 'error': 'Access denied to this scheduled scan'}) + except ScheduledScan.DoesNotExist: + return JsonResponse({'success': False, 'error': 'Scheduled scan not found'}) + + # Toggle status + if scheduled_scan.status == 'active': + scheduled_scan.status = 'paused' + else: + scheduled_scan.status = 'active' + + scheduled_scan.save() + + return JsonResponse({'success': True, 'status': scheduled_scan.status}) + + except KeyError: + return JsonResponse({'success': False, 'error': 'Not authenticated'}) + except Exception as e: + return JsonResponse({'success': False, 'error': str(e)}) + + +@require_http_methods(['GET']) +def scheduledScanExecutions(request, scan_id): + """Get execution history for a scheduled scan""" + try: + userID = request.session['userID'] + from loginSystem.models import Administrator + from .models import ScheduledScan, ScheduledScanExecution + from plogical.acl import ACLManager + + admin = Administrator.objects.get(pk=userID) + currentACL = ACLManager.loadedACL(userID) + + # Get scheduled scan with ACL respect + try: + scheduled_scan = ScheduledScan.objects.get(id=scan_id) + + # Check if user has access to this scheduled scan + if currentACL['admin'] != 1: + user_admins = ACLManager.loadUserObjects(userID) + if scheduled_scan.admin not in user_admins: + return JsonResponse({'success': False, 'error': 'Access denied to this scheduled scan'}) + except ScheduledScan.DoesNotExist: + return JsonResponse({'success': False, 'error': 'Scheduled scan not found'}) + + # Get execution history + executions = ScheduledScanExecution.objects.filter(scheduled_scan=scheduled_scan).order_by('-execution_time')[:20] + + execution_data = [] + for execution in executions: + execution_data.append({ + 'id': execution.id, + 'execution_time': execution.execution_time.isoformat(), + 'status': execution.status, + 'domains_scanned': execution.scanned_domains, + 'total_scans': execution.total_scans, + 'successful_scans': execution.successful_scans, + 'failed_scans': execution.failed_scans, + 'total_cost': float(execution.total_cost), + 'scan_ids': execution.scan_id_list, + 'error_message': execution.error_message, + 'started_at': execution.started_at.isoformat() if execution.started_at else None, + 'completed_at': execution.completed_at.isoformat() if execution.completed_at else None + }) + + return JsonResponse({'success': True, 'executions': execution_data}) + + except KeyError: + return JsonResponse({'success': False, 'error': 'Not authenticated'}) + except Exception as e: + return JsonResponse({'success': False, 'error': str(e)}) \ No newline at end of file diff --git a/aiScanner/templates/aiScanner/scanner.html b/aiScanner/templates/aiScanner/scanner.html index d63d2af22..c278ca8f3 100644 --- a/aiScanner/templates/aiScanner/scanner.html +++ b/aiScanner/templates/aiScanner/scanner.html @@ -642,6 +642,27 @@ AI Security Scanner - CyberPanel + + + {% if is_payment_configured or vps_info.is_vps|default:False %} +
No scheduled scans configured yet.
'; + return; + } + + let html = 'Frequency: ${scan.frequency}
+Scan Type: ${scan.scan_type}
+Domains: ${scan.domains.join(', ')}
+Next Run: ${nextRun}
+Last Run: ${lastRun}
+