From 098e6d5227151013e553eabb7c88d9eb57fc1aba Mon Sep 17 00:00:00 2001 From: usmannasir Date: Wed, 25 Jun 2025 19:27:36 +0500 Subject: [PATCH] ai scanner --- CyberCP/settings.py | 3 +- CyberCP/urls.py | 1 + aiScanner/__init__.py | 0 aiScanner/admin.py | 3 + aiScanner/aiScannerManager.py | 679 +++++++++++ aiScanner/api.py | 627 ++++++++++ aiScanner/apps.py | 6 + aiScanner/migrations/__init__.py | 0 aiScanner/models.py | 109 ++ aiScanner/status_api.py | 145 +++ aiScanner/status_models.py | 42 + aiScanner/templates/aiScanner/scanner.html | 1037 +++++++++++++++++ aiScanner/tests.py | 3 + aiScanner/urls.py | 26 + aiScanner/views.py | 439 +++++++ .../templates/baseTemplate/index.html | 3 + 16 files changed, 3122 insertions(+), 1 deletion(-) create mode 100644 aiScanner/__init__.py create mode 100644 aiScanner/admin.py create mode 100644 aiScanner/aiScannerManager.py create mode 100644 aiScanner/api.py create mode 100644 aiScanner/apps.py create mode 100644 aiScanner/migrations/__init__.py create mode 100644 aiScanner/models.py create mode 100644 aiScanner/status_api.py create mode 100644 aiScanner/status_models.py create mode 100644 aiScanner/templates/aiScanner/scanner.html create mode 100644 aiScanner/tests.py create mode 100644 aiScanner/urls.py create mode 100644 aiScanner/views.py diff --git a/CyberCP/settings.py b/CyberCP/settings.py index e50b5abce..c3fbf104d 100644 --- a/CyberCP/settings.py +++ b/CyberCP/settings.py @@ -65,6 +65,7 @@ INSTALLED_APPS = [ 'containerization', 'CLManager', 'IncBackups', + 'aiScanner', # 'WebTerminal' ] @@ -189,4 +190,4 @@ LANGUAGES = ( MEDIA_URL = '/usr/local/CyberCP/tmp/' MEDIA_ROOT = MEDIA_URL -DATA_UPLOAD_MAX_MEMORY_SIZE = 2147483648 +DATA_UPLOAD_MAX_MEMORY_SIZE = 2147483648 \ No newline at end of file diff --git a/CyberCP/urls.py b/CyberCP/urls.py index 39a993960..da7ab903a 100644 --- a/CyberCP/urls.py +++ b/CyberCP/urls.py @@ -44,5 +44,6 @@ urlpatterns = [ path('container/', include('containerization.urls')), path('CloudLinux/', include('CLManager.urls')), path('IncrementalBackups/', include('IncBackups.urls')), + path('aiscanner/', include('aiScanner.urls')), # path('Terminal/', include('WebTerminal.urls')), ] diff --git a/aiScanner/__init__.py b/aiScanner/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/aiScanner/admin.py b/aiScanner/admin.py new file mode 100644 index 000000000..8c38f3f3d --- /dev/null +++ b/aiScanner/admin.py @@ -0,0 +1,3 @@ +from django.contrib import admin + +# Register your models here. diff --git a/aiScanner/aiScannerManager.py b/aiScanner/aiScannerManager.py new file mode 100644 index 000000000..aa95bd572 --- /dev/null +++ b/aiScanner/aiScannerManager.py @@ -0,0 +1,679 @@ +import requests +import json +import uuid +import secrets +from datetime import datetime, timedelta +from django.shortcuts import render, redirect +from django.http import JsonResponse +from django.utils import timezone +from django.conf import settings +from django.urls import reverse +from django.contrib import messages +from loginSystem.models import Administrator +from .models import AIScannerSettings, ScanHistory, FileAccessToken +from plogical.acl import ACLManager +from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging + + +class AIScannerManager: + AI_SCANNER_API_BASE = 'https://platform.cyberpersons.com/ai-scanner' + + def __init__(self): + self.logger = logging + + def scannerHome(self, request, userID): + """Main AI Scanner page""" + try: + admin = Administrator.objects.get(pk=userID) + + # Check ACL permissions (with fallback for new field) + try: + currentACL = ACLManager.loadedACL(userID) + if currentACL.aiScannerAccess == 0: + return ACLManager.loadError() + except AttributeError: + # Field doesn't exist yet, allow access for now + self.logger.writeToFile(f'[AIScannerManager.scannerHome] aiScannerAccess field not found, allowing access') + pass + + # Get or create scanner settings + scanner_settings, created = AIScannerSettings.objects.get_or_create( + admin=admin, + defaults={'balance': 0.0000, 'is_payment_configured': False} + ) + + # Get current pricing from API + pricing_data = self.get_ai_scanner_pricing() + + # Get recent scan history + recent_scans = ScanHistory.objects.filter(admin=admin)[:10] + + # Get current balance if payment is configured + current_balance = scanner_settings.balance + self.logger.writeToFile(f'[AIScannerManager.scannerHome] Stored balance: {current_balance}') + + if scanner_settings.is_payment_configured: + # Try to fetch latest balance from API (now supports flexible auth) + self.logger.writeToFile(f'[AIScannerManager.scannerHome] Fetching balance from API...') + api_balance = self.get_account_balance(scanner_settings.api_key) + self.logger.writeToFile(f'[AIScannerManager.scannerHome] API returned balance: {api_balance}') + + if api_balance is not None: + scanner_settings.balance = api_balance + scanner_settings.save() + current_balance = api_balance + self.logger.writeToFile(f'[AIScannerManager.scannerHome] Updated balance to: {current_balance}') + else: + self.logger.writeToFile(f'[AIScannerManager.scannerHome] API balance call failed, keeping stored balance: {current_balance}') + + # Get user's websites for scan selection + from websiteFunctions.models import Websites + try: + websites = Websites.objects.filter(admin=admin) + self.logger.writeToFile(f'[AIScannerManager.scannerHome] Found {websites.count()} websites for {admin.userName}') + except Exception as e: + self.logger.writeToFile(f'[AIScannerManager.scannerHome] Error fetching websites: {str(e)}') + websites = [] + + # Build context safely + self.logger.writeToFile(f'[AIScannerManager.scannerHome] Building context for {admin.userName}') + + context = { + 'admin': admin, + 'scanner_settings': scanner_settings, + 'pricing_data': pricing_data, + 'recent_scans': recent_scans, + 'current_balance': current_balance, + 'websites': websites, + 'is_payment_configured': scanner_settings.is_payment_configured, + } + + self.logger.writeToFile(f'[AIScannerManager.scannerHome] Context built successfully, rendering template') + + return render(request, 'aiScanner/scanner.html', context) + + except Exception as e: + import traceback + self.logger.writeToFile(f'[AIScannerManager.scannerHome] Error: {str(e)}') + self.logger.writeToFile(f'[AIScannerManager.scannerHome] Traceback: {traceback.format_exc()}') + + return render(request, 'aiScanner/scanner.html', { + 'error': f'Failed to load AI Scanner page: {str(e)}', + 'is_payment_configured': False, + 'websites': [], + 'recent_scans': [], + 'current_balance': 0, + 'pricing_data': None + }) + + def setupPayment(self, request, userID): + """Setup payment method for AI scanner""" + try: + if request.method != 'POST': + return JsonResponse({'success': False, 'error': 'Invalid request method'}) + + admin = Administrator.objects.get(pk=userID) + + # Check ACL permissions (with fallback for new field) + try: + currentACL = ACLManager.loadedACL(userID) + if currentACL.aiScannerAccess == 0: + return JsonResponse({'success': False, 'error': 'Access denied'}) + except AttributeError: + # Field doesn't exist yet, allow access for now + pass + + # Get admin email and domain + cyberpanel_host = request.get_host() # Keep full host including port + cyberpanel_domain = cyberpanel_host.split(':')[0] # Domain only for email fallback + admin_email = admin.email if hasattr(admin, 'email') and admin.email else f'{admin.userName}@{cyberpanel_domain}' + + self.logger.writeToFile(f'[AIScannerManager.setupPayment] Admin: {admin.userName}, Email: {admin_email}, Host: {cyberpanel_host}') + + # Setup payment with AI Scanner API + self.logger.writeToFile(f'[AIScannerManager.setupPayment] Attempting payment setup for {admin_email} on {cyberpanel_host}') + setup_data = self.setup_ai_scanner_payment(admin_email, cyberpanel_host) + + if setup_data: + self.logger.writeToFile(f'[AIScannerManager.setupPayment] Payment setup successful for {admin_email}') + return JsonResponse({ + 'success': True, + 'payment_url': setup_data['payment_url'], + 'token': setup_data['token'] + }) + else: + self.logger.writeToFile(f'[AIScannerManager.setupPayment] Payment setup failed for {admin_email}') + return JsonResponse({ + 'success': False, + 'error': 'Failed to setup payment. Please check the logs and try again.' + }) + + except Exception as e: + self.logger.writeToFile(f'[AIScannerManager.setupPayment] Error: {str(e)}') + return JsonResponse({'success': False, 'error': 'Internal server error'}) + + def setupComplete(self, request, userID): + """Handle return from payment setup""" + try: + admin = Administrator.objects.get(pk=userID) + status = request.GET.get('status') + + # Log all URL parameters for debugging + self.logger.writeToFile(f'[AIScannerManager.setupComplete] All URL params: {dict(request.GET)}') + + if status == 'success': + api_key = request.GET.get('api_key') + balance = request.GET.get('balance', '0.00') + + self.logger.writeToFile(f'[AIScannerManager.setupComplete] API Key: {api_key[:20] if api_key else "None"}...') + self.logger.writeToFile(f'[AIScannerManager.setupComplete] Balance from URL: {balance}') + + if api_key: + # Update scanner settings + scanner_settings, created = AIScannerSettings.objects.get_or_create( + admin=admin, + defaults={ + 'api_key': api_key, + 'balance': float(balance), + 'is_payment_configured': True + } + ) + + if not created: + scanner_settings.api_key = api_key + scanner_settings.balance = float(balance) + scanner_settings.is_payment_configured = True + scanner_settings.save() + + self.logger.writeToFile(f'[AIScannerManager.setupComplete] Saved balance: {scanner_settings.balance}') + messages.success(request, f'Payment setup successful! You have ${balance} credit.') + self.logger.writeToFile(f'[AIScannerManager] Payment setup completed for {admin.userName} with balance ${balance}') + else: + messages.error(request, 'Payment setup completed but API key not received.') + + elif status in ['failed', 'cancelled', 'error']: + error = request.GET.get('error', 'Payment setup failed') + messages.error(request, f'Payment setup failed: {error}') + self.logger.writeToFile(f'[AIScannerManager] Payment setup failed for {admin.userName}: {error}') + + return redirect('aiScannerHome') + + except Exception as e: + self.logger.writeToFile(f'[AIScannerManager.setupComplete] Error: {str(e)}') + messages.error(request, 'An error occurred during payment setup.') + return redirect('aiScannerHome') + + def startScan(self, request, userID): + """Start a new AI security scan""" + try: + if request.method != 'POST': + return JsonResponse({'success': False, 'error': 'Invalid request method'}) + + admin = Administrator.objects.get(pk=userID) + + # Check ACL permissions (with fallback for new field) + try: + currentACL = ACLManager.loadedACL(userID) + if currentACL.aiScannerAccess == 0: + return JsonResponse({'success': False, 'error': 'Access denied'}) + except AttributeError: + # Field doesn't exist yet, allow access for now + pass + + # Get scanner settings + try: + scanner_settings = AIScannerSettings.objects.get(admin=admin) + if not scanner_settings.is_payment_configured or not scanner_settings.api_key: + return JsonResponse({'success': False, 'error': 'Payment not configured'}) + except AIScannerSettings.DoesNotExist: + return JsonResponse({'success': False, 'error': 'Scanner not configured'}) + + # Parse request data + data = json.loads(request.body) + domain = data.get('domain') + scan_type = data.get('scan_type', 'full') + + if not domain: + return JsonResponse({'success': False, 'error': 'Domain is required'}) + + # Validate domain belongs to user + from websiteFunctions.models import Websites + try: + website = Websites.objects.get(domain=domain, admin=admin) + except Websites.DoesNotExist: + return JsonResponse({'success': False, 'error': 'Domain not found or access denied'}) + + # Generate scan ID and file access token + scan_id = f'cp_{uuid.uuid4().hex[:12]}' + file_access_token = self.generate_file_access_token() + + # Create scan history record + scan_history = ScanHistory.objects.create( + admin=admin, + scan_id=scan_id, + domain=domain, + scan_type=scan_type, + status='pending' + ) + + # Create file access token + FileAccessToken.objects.create( + token=file_access_token, + scan_history=scan_history, + domain=domain, + wp_path=f'/home/{domain}/public_html', # Adjust path as needed + expires_at=timezone.now() + timedelta(hours=2) + ) + + # Submit scan to AI Scanner API + callback_url = f"https://{request.get_host()}/api/ai-scanner/callback" + file_access_base_url = f"https://{request.get_host()}/api/ai-scanner/" + scan_response = self.submit_wordpress_scan( + scanner_settings.api_key, + domain, + scan_type, + callback_url, + file_access_token, + file_access_base_url, + scan_id + ) + + if scan_response: + scan_history.status = 'running' + scan_history.save() + + return JsonResponse({ + 'success': True, + 'scan_id': scan_id, + 'message': 'Scan started successfully' + }) + else: + scan_history.status = 'failed' + scan_history.error_message = 'Failed to submit scan to AI Scanner API' + scan_history.save() + + return JsonResponse({'success': False, 'error': 'Failed to start scan'}) + + except Exception as e: + self.logger.writeToFile(f'[AIScannerManager.startScan] Error: {str(e)}') + return JsonResponse({'success': False, 'error': 'Internal server error'}) + + def refreshBalance(self, request, userID): + """Refresh account balance from API""" + try: + if request.method != 'POST': + return JsonResponse({'success': False, 'error': 'Invalid request method'}) + + admin = Administrator.objects.get(pk=userID) + + # Get scanner settings + try: + scanner_settings = AIScannerSettings.objects.get(admin=admin) + if not scanner_settings.is_payment_configured or not scanner_settings.api_key: + return JsonResponse({'success': False, 'error': 'Payment not configured'}) + except AIScannerSettings.DoesNotExist: + return JsonResponse({'success': False, 'error': 'Scanner not configured'}) + + # Fetch balance from API + api_balance = self.get_account_balance(scanner_settings.api_key) + + if api_balance is not None: + old_balance = scanner_settings.balance + scanner_settings.balance = api_balance + scanner_settings.save() + + self.logger.writeToFile(f'[AIScannerManager.refreshBalance] Updated balance from ${old_balance} to ${api_balance} for {admin.userName}') + + return JsonResponse({ + 'success': True, + 'balance': float(api_balance), + 'message': f'Balance refreshed: ${api_balance:.4f}' + }) + else: + return JsonResponse({'success': False, 'error': 'Failed to fetch balance from API'}) + + except Exception as e: + self.logger.writeToFile(f'[AIScannerManager.refreshBalance] Error: {str(e)}') + return JsonResponse({'success': False, 'error': 'Internal server error'}) + + def addPaymentMethod(self, request, userID): + """Add a new payment method for the user""" + try: + if request.method != 'POST': + return JsonResponse({'success': False, 'error': 'Invalid request method'}) + + admin = Administrator.objects.get(pk=userID) + + # Check if user has scanner configured + try: + scanner_settings = AIScannerSettings.objects.get(admin=admin) + if not scanner_settings.is_payment_configured or not scanner_settings.api_key: + return JsonResponse({'success': False, 'error': 'Initial payment setup required first'}) + except AIScannerSettings.DoesNotExist: + return JsonResponse({'success': False, 'error': 'Scanner not configured'}) + + # Get admin email and domain + cyberpanel_host = request.get_host() # Keep full host including port + cyberpanel_domain = cyberpanel_host.split(':')[0] # Domain only for email fallback + admin_email = admin.email if hasattr(admin, 'email') and admin.email else f'{admin.userName}@{cyberpanel_domain}' + + self.logger.writeToFile(f'[AIScannerManager.addPaymentMethod] Setting up new payment method for {admin.userName} (API key authentication)') + + # Call platform API to add payment method + setup_data = self.setup_add_payment_method(scanner_settings.api_key, admin_email, cyberpanel_host) + + if setup_data: + self.logger.writeToFile(f'[AIScannerManager.addPaymentMethod] Payment method setup successful for {admin_email}') + return JsonResponse({ + 'success': True, + 'setup_url': setup_data['setup_url'], + 'token': setup_data.get('token', '') + }) + else: + self.logger.writeToFile(f'[AIScannerManager.addPaymentMethod] Payment method setup failed for {admin_email}') + return JsonResponse({ + 'success': False, + 'error': 'Failed to setup payment method. Please try again.' + }) + + except Exception as e: + self.logger.writeToFile(f'[AIScannerManager.addPaymentMethod] Error: {str(e)}') + return JsonResponse({'success': False, 'error': 'Internal server error'}) + + def paymentMethodComplete(self, request, userID): + """Handle return from adding payment method""" + try: + admin = Administrator.objects.get(pk=userID) + status = request.GET.get('status') + + # Log all URL parameters for debugging + self.logger.writeToFile(f'[AIScannerManager.paymentMethodComplete] All URL params: {dict(request.GET)}') + + if status == 'success': + payment_method_id = request.GET.get('payment_method_id') + card_last4 = request.GET.get('card_last4') + card_brand = request.GET.get('card_brand') + + self.logger.writeToFile(f'[AIScannerManager.paymentMethodComplete] Payment method added: {payment_method_id} ({card_brand} ****{card_last4})') + + if payment_method_id: + messages.success(request, f'Payment method added successfully! New {card_brand} card ending in {card_last4}.') + self.logger.writeToFile(f'[AIScannerManager] Payment method added for {admin.userName}: {card_brand} ****{card_last4}') + else: + messages.success(request, 'Payment method added successfully!') + + elif status in ['failed', 'cancelled', 'error']: + error = request.GET.get('error', 'Failed to add payment method') + messages.error(request, f'Failed to add payment method: {error}') + self.logger.writeToFile(f'[AIScannerManager] Payment method add failed for {admin.userName}: {error}') + + return redirect('aiScannerHome') + + except Exception as e: + self.logger.writeToFile(f'[AIScannerManager.paymentMethodComplete] Error: {str(e)}') + messages.error(request, 'An error occurred while adding payment method.') + return redirect('aiScannerHome') + + def scanCallback(self, request): + """Handle scan results callback from AI Scanner API""" + try: + if request.method != 'POST': + return JsonResponse({'success': False, 'error': 'Invalid request method'}) + + data = json.loads(request.body) + scan_id = data.get('scan_id') + status = data.get('status') + + if not scan_id: + return JsonResponse({'success': False, 'error': 'Scan ID required'}) + + # Find scan history record + try: + scan_history = ScanHistory.objects.get(scan_id=scan_id) + except ScanHistory.DoesNotExist: + self.logger.writeToFile(f'[AIScannerManager.scanCallback] Scan not found: {scan_id}') + return JsonResponse({'success': False, 'error': 'Scan not found'}) + + # Update scan status and results + scan_history.status = status + scan_history.completed_at = timezone.now() + + if status == 'completed': + findings = data.get('findings', []) + summary = data.get('summary', {}) + cost_usd = data.get('cost_usd', 0) + files_scanned = data.get('files_scanned', 0) + + scan_history.set_findings(findings) + scan_history.set_summary(summary) + scan_history.cost_usd = cost_usd + scan_history.files_scanned = files_scanned + scan_history.issues_found = len(findings) + + # Update user balance + scanner_settings = scan_history.admin.ai_scanner_settings + if cost_usd and scanner_settings.balance >= cost_usd: + scanner_settings.balance -= cost_usd + scanner_settings.save() + + self.logger.writeToFile(f'[AIScannerManager] Scan completed: {scan_id}, Cost: ${cost_usd}, Issues: {len(findings)}') + + elif status == 'failed': + error_message = data.get('error', 'Scan failed') + scan_history.error_message = error_message + self.logger.writeToFile(f'[AIScannerManager] Scan failed: {scan_id}, Error: {error_message}') + + scan_history.save() + + # Deactivate file access tokens + FileAccessToken.objects.filter(scan_history=scan_history).update(is_active=False) + + return JsonResponse({'success': True}) + + except Exception as e: + self.logger.writeToFile(f'[AIScannerManager.scanCallback] Error: {str(e)}') + return JsonResponse({'success': False, 'error': 'Internal server error'}) + + # API Helper Methods + + def get_ai_scanner_pricing(self): + """Get current pricing from AI Scanner API""" + try: + response = requests.get(f'{self.AI_SCANNER_API_BASE}/api/plan/', timeout=10) + if response.status_code == 200: + return response.json() + return None + except Exception as e: + self.logger.writeToFile(f'[AIScannerManager.get_ai_scanner_pricing] Error: {str(e)}') + return None + + def setup_ai_scanner_payment(self, user_email, cyberpanel_host): + """Setup payment method with AI Scanner API""" + try: + payload = { + 'email': user_email, + 'domain': cyberpanel_host.split(':')[0], # Send domain without port + 'return_url': f'https://{cyberpanel_host}/aiscanner/setup-complete/' # Include port in URL + } + + self.logger.writeToFile(f'[AIScannerManager.setup_ai_scanner_payment] Sending request to: {self.AI_SCANNER_API_BASE}/cyberpanel/setup-payment/') + self.logger.writeToFile(f'[AIScannerManager.setup_ai_scanner_payment] Payload: {payload}') + + response = requests.post( + f'{self.AI_SCANNER_API_BASE}/cyberpanel/setup-payment/', + json=payload, + timeout=10 + ) + + self.logger.writeToFile(f'[AIScannerManager.setup_ai_scanner_payment] Response status: {response.status_code}') + self.logger.writeToFile(f'[AIScannerManager.setup_ai_scanner_payment] Response content: {response.text}') + + if response.status_code == 200: + data = response.json() + if data.get('success'): + return { + 'payment_url': data['payment_url'], + 'token': data['token'] + } + else: + self.logger.writeToFile(f'[AIScannerManager.setup_ai_scanner_payment] API returned success=false: {data.get("error", "Unknown error")}') + else: + self.logger.writeToFile(f'[AIScannerManager.setup_ai_scanner_payment] Non-200 status code: {response.status_code}') + + return None + except Exception as e: + self.logger.writeToFile(f'[AIScannerManager.setup_ai_scanner_payment] Exception: {str(e)}') + return None + + def get_account_balance(self, api_key): + """Get current account balance""" + try: + self.logger.writeToFile(f'[AIScannerManager.get_account_balance] Requesting balance from: {self.AI_SCANNER_API_BASE}/api/account/balance/') + + response = requests.get( + f'{self.AI_SCANNER_API_BASE}/api/account/balance/', + headers={'X-API-Key': api_key}, + timeout=10 + ) + + self.logger.writeToFile(f'[AIScannerManager.get_account_balance] Response status: {response.status_code}') + self.logger.writeToFile(f'[AIScannerManager.get_account_balance] Response content: {response.text}') + + if response.status_code == 200: + data = response.json() + if data.get('success'): + # Use the new balance_usd field from flexible API + balance = float(data.get('balance_usd', data.get('balance', 0))) + auth_method = data.get('authenticated_via', 'unknown') + self.logger.writeToFile(f'[AIScannerManager.get_account_balance] Parsed balance: {balance} (auth: {auth_method})') + return balance + else: + # Even failed responses now include balance_usd hint + balance_hint = data.get('balance_usd', 0) + self.logger.writeToFile(f'[AIScannerManager.get_account_balance] API returned success=false: {data.get("error", "Unknown error")} (balance hint: {balance_hint})') + # Return the balance hint if available, even on auth failure + if balance_hint > 0: + return float(balance_hint) + else: + self.logger.writeToFile(f'[AIScannerManager.get_account_balance] Non-200 status code: {response.status_code}') + + return None + except Exception as e: + self.logger.writeToFile(f'[AIScannerManager.get_account_balance] Exception: {str(e)}') + return None + + def submit_wordpress_scan(self, api_key, domain, scan_type, callback_url, file_access_token, file_access_base_url, scan_id): + """Submit scan request to AI Scanner API""" + try: + payload = { + 'site_url': domain, + 'scan_type': scan_type, + 'cyberpanel_callback': callback_url, + 'file_access_token': file_access_token, + 'file_access_base_url': file_access_base_url, + 'scan_id': scan_id + } + + self.logger.writeToFile(f'[AIScannerManager.submit_wordpress_scan] Submitting scan {scan_id} for {domain}') + self.logger.writeToFile(f'[AIScannerManager.submit_wordpress_scan] Payload: {payload}') + + response = requests.post( + f'{self.AI_SCANNER_API_BASE}/api/scan/submit-v2/', + headers={'X-API-Key': api_key}, + json=payload, + timeout=10 + ) + + self.logger.writeToFile(f'[AIScannerManager.submit_wordpress_scan] Response status: {response.status_code}') + self.logger.writeToFile(f'[AIScannerManager.submit_wordpress_scan] Response content: {response.text}') + + if response.status_code == 200: + data = response.json() + if data.get('success'): + platform_scan_id = data.get('scan_id') + self.logger.writeToFile(f'[AIScannerManager.submit_wordpress_scan] Platform assigned scan ID: {platform_scan_id}') + return platform_scan_id + else: + self.logger.writeToFile(f'[AIScannerManager.submit_wordpress_scan] Platform returned success=false: {data.get("error", "Unknown error")}') + else: + self.logger.writeToFile(f'[AIScannerManager.submit_wordpress_scan] Non-200 status code: {response.status_code}') + + return None + except Exception as e: + self.logger.writeToFile(f'[AIScannerManager.submit_wordpress_scan] Error: {str(e)}') + return None + + def get_scan_status(self, api_key, scan_id): + """Get scan status from AI Scanner API""" + try: + response = requests.get( + f'{self.AI_SCANNER_API_BASE}/api/scan/{scan_id}/status/', + headers={'X-API-Key': api_key}, + timeout=10 + ) + + if response.status_code == 200: + return response.json() + return None + except Exception as e: + self.logger.writeToFile(f'[AIScannerManager.get_scan_status] Error: {str(e)}') + return None + + def get_scan_results(self, api_key, scan_id): + """Get scan results from AI Scanner API""" + try: + response = requests.get( + f'{self.AI_SCANNER_API_BASE}/api/scan/{scan_id}/results/', + headers={'X-API-Key': api_key}, + timeout=10 + ) + + if response.status_code == 200: + return response.json() + return None + except Exception as e: + self.logger.writeToFile(f'[AIScannerManager.get_scan_results] Error: {str(e)}') + return None + + def setup_add_payment_method(self, api_key, user_email, cyberpanel_host): + """Setup additional payment method with AI Scanner API""" + try: + payload = { + 'domain': cyberpanel_host.split(':')[0], # Send domain without port + 'return_url': f'https://{cyberpanel_host}/aiscanner/payment-method-complete/', # Include port in URL + 'action': 'add_payment_method' # Indicate this is adding a payment method, not initial setup + } + + self.logger.writeToFile(f'[AIScannerManager.setup_add_payment_method] Sending request to: {self.AI_SCANNER_API_BASE}/cyberpanel/add-payment-method/') + self.logger.writeToFile(f'[AIScannerManager.setup_add_payment_method] Payload: {payload}') + + response = requests.post( + f'{self.AI_SCANNER_API_BASE}/cyberpanel/add-payment-method/', + headers={'X-API-Key': api_key}, + json=payload, + timeout=10 + ) + + self.logger.writeToFile(f'[AIScannerManager.setup_add_payment_method] Response status: {response.status_code}') + self.logger.writeToFile(f'[AIScannerManager.setup_add_payment_method] Response content: {response.text}') + + if response.status_code == 200: + data = response.json() + if data.get('success'): + return { + 'setup_url': data['setup_url'], + 'token': data.get('token', '') + } + else: + self.logger.writeToFile(f'[AIScannerManager.setup_add_payment_method] API returned success=false: {data.get("error", "Unknown error")}') + else: + self.logger.writeToFile(f'[AIScannerManager.setup_add_payment_method] Non-200 status code: {response.status_code}') + + return None + except Exception as e: + self.logger.writeToFile(f'[AIScannerManager.setup_add_payment_method] Exception: {str(e)}') + return None + + def generate_file_access_token(self): + """Generate secure file access token""" + return f'cp_{secrets.token_urlsafe(32)}' \ No newline at end of file diff --git a/aiScanner/api.py b/aiScanner/api.py new file mode 100644 index 000000000..441cb9de3 --- /dev/null +++ b/aiScanner/api.py @@ -0,0 +1,627 @@ +import json +import os +import time +import mimetypes +import base64 +from django.http import JsonResponse +from django.views.decorators.csrf import csrf_exempt +from django.views.decorators.http import require_http_methods +from websiteFunctions.models import Websites +from loginSystem.models import Administrator +from .models import FileAccessToken, ScanHistory +from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging + + +class SecurityError(Exception): + """Custom exception for security violations""" + pass + + +def validate_access_token(token, scan_id): + """ + Implement proper token validation + - Check token format + - Verify token hasn't expired + - Confirm token is for the correct scan + - Log access attempts + """ + try: + if not token or not token.startswith('cp_'): + logging.writeToFile(f'[API] Invalid token format: {token[:20] if token else "None"}...') + return None, "Invalid token format" + + # Find the token in database + try: + file_token = FileAccessToken.objects.get( + token=token, + scan_history__scan_id=scan_id, + is_active=True + ) + + if file_token.is_expired(): + logging.writeToFile(f'[API] Token expired for scan {scan_id}') + return None, "Token expired" + + logging.writeToFile(f'[API] Token validated successfully for scan {scan_id}') + return file_token, None + + except FileAccessToken.DoesNotExist: + logging.writeToFile(f'[API] Token not found for scan {scan_id}') + return None, "Invalid token" + + except Exception as e: + logging.writeToFile(f'[API] Token validation error: {str(e)}') + return None, "Token validation failed" + + +def secure_path_check(base_path, requested_path): + """ + Ensure requested path is within allowed directory + Prevent directory traversal attacks + """ + try: + if requested_path: + full_path = os.path.join(base_path, requested_path.strip('/')) + else: + full_path = base_path + + full_path = os.path.abspath(full_path) + base_path = os.path.abspath(base_path) + + if not full_path.startswith(base_path): + raise SecurityError("Path outside allowed directory") + + return full_path + except Exception as e: + raise SecurityError(f"Path security check failed: {str(e)}") + + +@csrf_exempt +@require_http_methods(['POST']) +def authenticate_worker(request): + """ + POST /api/ai-scanner/authenticate + + Request Body: + { + "access_token": "cp_access_abc123...", + "scan_id": "550e8400-e29b-41d4-a716-446655440000", + "worker_id": "scanner-1.domain.com" + } + + Response: + { + "success": true, + "site_info": { + "domain": "client-domain.com", + "wp_path": "/home/client/public_html", + "php_version": "8.1", + "wp_version": "6.3.1" + }, + "permissions": ["read_files", "list_directories"], + "expires_at": "2024-12-25T11:00:00Z" + } + """ + try: + data = json.loads(request.body) + access_token = data.get('access_token') + scan_id = data.get('scan_id') + worker_id = data.get('worker_id', 'unknown') + + logging.writeToFile(f'[API] Authentication request from worker {worker_id} for scan {scan_id}') + + if not access_token or not scan_id: + return JsonResponse({'error': 'Missing access_token or scan_id'}, status=400) + + # Validate access token + file_token, error = validate_access_token(access_token, scan_id) + if error: + return JsonResponse({'error': error}, status=401) + + # Get website info + try: + website = Websites.objects.get(domain=file_token.domain) + + # Get WordPress info + wp_path = file_token.wp_path + wp_version = 'Unknown' + php_version = 'Unknown' + + # Try to get WP version from wp-includes/version.php using ProcessUtilities + version_file = os.path.join(wp_path, 'wp-includes', 'version.php') + try: + from plogical.processUtilities import ProcessUtilities + + # Use ProcessUtilities to read file as the website user + command = f'cat "{version_file}"' + result = ProcessUtilities.outputExecutioner(command, user=website.externalApp, retRequired=True) + + if result[1]: # Check if there's content (ignore return code) + content = result[1] + import re + match = re.search(r'\$wp_version\s*=\s*[\'"]([^\'"]+)[\'"]', content) + if match: + wp_version = match.group(1) + logging.writeToFile(f'[API] Detected WordPress version: {wp_version}') + else: + logging.writeToFile(f'[API] Could not read WP version file: {result[1] if len(result) > 1 else "No content returned"}') + + except Exception as e: + logging.writeToFile(f'[API] Error reading WP version: {str(e)}') + + # Try to detect PHP version (basic detection) + try: + import subprocess + result = subprocess.run(['php', '-v'], capture_output=True, text=True, timeout=5) + if result.returncode == 0: + import re + match = re.search(r'PHP (\d+\.\d+)', result.stdout) + if match: + php_version = match.group(1) + except Exception: + pass + + response_data = { + 'success': True, + 'site_info': { + 'domain': file_token.domain, + 'wp_path': wp_path, + 'php_version': php_version, + 'wp_version': wp_version, + 'scan_id': scan_id + }, + 'permissions': ['read_files', 'list_directories'], + 'expires_at': file_token.expires_at.strftime('%Y-%m-%dT%H:%M:%SZ') + } + + logging.writeToFile(f'[API] Authentication successful for {file_token.domain}') + return JsonResponse(response_data) + + except Websites.DoesNotExist: + logging.writeToFile(f'[API] Website not found: {file_token.domain}') + return JsonResponse({'error': 'Website not found'}, status=404) + + except json.JSONDecodeError: + return JsonResponse({'error': 'Invalid JSON'}, status=400) + except Exception as e: + logging.writeToFile(f'[API] Authentication error: {str(e)}') + return JsonResponse({'error': 'Authentication failed'}, status=500) + + +@csrf_exempt +@require_http_methods(['GET']) +def list_files(request): + """ + GET /api/ai-scanner/files/list?path=wp-content/plugins + + Headers: + Authorization: Bearer cp_access_abc123... + X-Scan-ID: 550e8400-e29b-41d4-a716-446655440000 + + Response: + { + "path": "wp-content/plugins", + "items": [ + { + "name": "akismet", + "type": "directory", + "modified": "2024-12-20T10:30:00Z" + }, + { + "name": "suspicious-plugin.php", + "type": "file", + "size": 15420, + "modified": "2024-12-24T15:20:00Z", + "permissions": "644" + } + ] + } + """ + try: + # Validate authorization + auth_header = request.META.get('HTTP_AUTHORIZATION', '') + if not auth_header.startswith('Bearer '): + return JsonResponse({'error': 'Missing or invalid Authorization header'}, status=401) + + access_token = auth_header.replace('Bearer ', '') + scan_id = request.META.get('HTTP_X_SCAN_ID', '') + + if not scan_id: + return JsonResponse({'error': 'X-Scan-ID header required'}, status=400) + + # Validate access token + file_token, error = validate_access_token(access_token, scan_id) + if error: + return JsonResponse({'error': error}, status=401) + + # Get parameters + path = request.GET.get('path', '').strip('/') + + try: + # Security check and get full path + full_path = secure_path_check(file_token.wp_path, path) + + # Path existence and type checking will be done by ProcessUtilities + + # List directory contents using ProcessUtilities + items = [] + try: + from plogical.processUtilities import ProcessUtilities + from websiteFunctions.models import Websites + + # Get website object for user context + try: + website = Websites.objects.get(domain=file_token.domain) + user = website.externalApp + except Websites.DoesNotExist: + return JsonResponse({'error': 'Website not found'}, status=404) + + # Use ls command with ProcessUtilities to list directory as website user + ls_command = f'ls -la "{full_path}"' + result = ProcessUtilities.outputExecutioner(ls_command, user=user, retRequired=True) + + if result[1]: # Check if there's content (ignore return code) + lines = result[1].strip().split('\n') + for line in lines[1:]: # Skip the 'total' line + if not line.strip(): + continue + + parts = line.split() + if len(parts) < 9: + continue + + permissions = parts[0] + size = parts[4] if parts[4].isdigit() else 0 + name = ' '.join(parts[8:]) # Handle filenames with spaces + + # Skip hidden files, current/parent directory entries + if name.startswith('.') or name in ['.', '..'] or name in ['__pycache__', 'node_modules']: + continue + + item_data = { + 'name': name, + 'type': 'directory' if permissions.startswith('d') else 'file', + 'permissions': permissions[1:4] if len(permissions) >= 4 else '644' + } + + if permissions.startswith('-'): # Regular file + try: + item_data['size'] = int(size) + except ValueError: + item_data['size'] = 0 + + # Only include certain file types + if name.endswith(('.php', '.js', '.html', '.htm', '.css', '.txt', '.md', '.json', '.xml', '.sql', '.log', '.conf', '.ini', '.yml', '.yaml')): + items.append(item_data) + elif permissions.startswith('d'): # Directory + # Directories don't have a size in the same way + item_data['size'] = 0 + items.append(item_data) + else: + # Other file types (links, etc.) - include with size 0 + item_data['size'] = 0 + items.append(item_data) + else: + logging.writeToFile(f'[API] Directory listing failed: {result[1] if len(result) > 1 else "No content returned"}') + return JsonResponse({'error': 'Directory access failed'}, status=403) + + except Exception as e: + logging.writeToFile(f'[API] Directory listing error: {str(e)}') + return JsonResponse({'error': 'Directory access failed'}, status=403) + + logging.writeToFile(f'[API] Listed {len(items)} items in {path or "root"} for scan {scan_id}') + + return JsonResponse({ + 'path': path, + 'items': sorted(items, key=lambda x: (x['type'] == 'file', x['name'].lower())) + }) + + except SecurityError as e: + logging.writeToFile(f'[API] Security violation: {str(e)}') + return JsonResponse({'error': 'Path not allowed'}, status=403) + + except Exception as e: + logging.writeToFile(f'[API] List files error: {str(e)}') + return JsonResponse({'error': 'Internal server error'}, status=500) + + +@csrf_exempt +@require_http_methods(['GET']) +def get_file_content(request): + """ + GET /api/ai-scanner/files/content?path=wp-content/plugins/plugin.php + + Headers: + Authorization: Bearer cp_access_abc123... + X-Scan-ID: 550e8400-e29b-41d4-a716-446655440000 + + Response: + { + "path": "wp-content/plugins/plugin.php", + "content": " 10 * 1024 * 1024: # 10MB limit + return JsonResponse({'error': 'File too large (max 10MB)'}, status=400) + except ValueError: + logging.writeToFile(f'[API] Could not parse file size: {stat_result[1]}') + file_size = 0 + else: + logging.writeToFile(f'[API] Could not get file size: {stat_result[1] if len(stat_result) > 1 else "No content returned"}') + return JsonResponse({'error': 'File not found or inaccessible'}, status=404) + + # Use cat command with ProcessUtilities to read file as website user + cat_command = f'cat "{full_path}"' + result = ProcessUtilities.outputExecutioner(cat_command, user=user, retRequired=True) + + # Check if content was returned (file might be empty, which is valid) + if len(result) > 1: # We got a tuple back + content = result[1] if result[1] is not None else '' + encoding = 'utf-8' + else: + logging.writeToFile(f'[API] File read failed: No result returned') + return JsonResponse({'error': 'Unable to read file'}, status=400) + + except Exception as e: + logging.writeToFile(f'[API] File read error: {str(e)}') + return JsonResponse({'error': 'Unable to read file'}, status=400) + + # Detect MIME type + mime_type, _ = mimetypes.guess_type(full_path) + if not mime_type: + if file_ext == '.php': + mime_type = 'text/x-php' + elif file_ext == '.js': + mime_type = 'application/javascript' + else: + mime_type = 'text/plain' + + # Base64 encode the content for safe transport + try: + content_base64 = base64.b64encode(content.encode('utf-8')).decode('utf-8') + except UnicodeEncodeError: + # Handle binary files or encoding issues + try: + content_base64 = base64.b64encode(content.encode('latin-1')).decode('utf-8') + encoding = 'latin-1' + except: + logging.writeToFile(f'[API] Failed to encode file content for {path}') + return JsonResponse({'error': 'File encoding not supported'}, status=400) + + logging.writeToFile(f'[API] File content retrieved: {path} ({file_size} bytes) for scan {scan_id}') + + return JsonResponse({ + 'path': path, + 'content': content_base64, + 'size': file_size, + 'encoding': encoding, + 'mime_type': mime_type + }) + + except SecurityError as e: + logging.writeToFile(f'[API] Security violation: {str(e)}') + return JsonResponse({'error': 'Path not allowed'}, status=403) + + except Exception as e: + logging.writeToFile(f'[API] Get file content error: {str(e)}') + return JsonResponse({'error': 'Internal server error'}, status=500) + + +@csrf_exempt +@require_http_methods(['POST']) +def scan_callback(request): + """ + Receive scan completion callbacks from AI Scanner platform + + POST /api/ai-scanner/callback + Content-Type: application/json + + Expected payload: + { + "scan_id": "uuid", + "status": "completed", + "summary": { + "threat_level": "HIGH|MEDIUM|LOW", + "total_findings": 3, + "files_scanned": 25, + "cost": "$0.0456" + }, + "findings": [ + { + "file_path": "wp-content/plugins/file.php", + "severity": "CRITICAL|HIGH|MEDIUM|LOW", + "title": "Issue title", + "description": "Detailed description", + "ai_confidence": 95 + } + ], + "ai_analysis": "AI summary text", + "completed_at": "2025-06-23T11:40:12Z" + } + """ + try: + # Parse JSON payload + data = json.loads(request.body) + + scan_id = data.get('scan_id') + status = data.get('status') + summary = data.get('summary', {}) + findings = data.get('findings', []) + ai_analysis = data.get('ai_analysis', '') + completed_at = data.get('completed_at') + + logging.writeToFile(f"[API] Received callback for scan {scan_id}: {status}") + + # Update scan status in CyberPanel database + try: + from .models import ScanHistory + from django.utils import timezone + import datetime + + # Find the scan record + scan_record = ScanHistory.objects.get(scan_id=scan_id) + + # Update scan record + scan_record.status = status + scan_record.issues_found = summary.get('total_findings', 0) + scan_record.files_scanned = summary.get('files_scanned', 0) + + # Parse and store cost + cost_str = summary.get('cost', '$0.00') + try: + # Remove '$' and convert to float + cost_value = float(cost_str.replace('$', '').replace(',', '')) + scan_record.cost_usd = cost_value + except (ValueError, AttributeError): + scan_record.cost_usd = 0.0 + + # Store findings and AI analysis + scan_record.set_findings(findings) + + # Build summary dict + summary_dict = { + 'threat_level': summary.get('threat_level', 'UNKNOWN'), + 'total_findings': summary.get('total_findings', 0), + 'files_scanned': summary.get('files_scanned', 0), + 'ai_analysis': ai_analysis + } + scan_record.set_summary(summary_dict) + + # Set completion time + if completed_at: + try: + # Parse ISO format datetime + completed_datetime = datetime.datetime.fromisoformat(completed_at.replace('Z', '+00:00')) + scan_record.completed_at = completed_datetime + except ValueError: + scan_record.completed_at = timezone.now() + else: + scan_record.completed_at = timezone.now() + + scan_record.save() + + # Update user balance if scan cost money + if scan_record.cost_usd > 0: + try: + scanner_settings = scan_record.admin.ai_scanner_settings + if scanner_settings.balance >= scan_record.cost_usd: + # Convert to same type to avoid Decimal/float issues + scanner_settings.balance = float(scanner_settings.balance) - float(scan_record.cost_usd) + scanner_settings.save() + logging.writeToFile(f"[API] Deducted ${scan_record.cost_usd} from {scan_record.admin.userName} balance") + else: + logging.writeToFile(f"[API] Insufficient balance for scan cost: ${scan_record.cost_usd}") + except Exception as e: + logging.writeToFile(f"[API] Error updating balance: {str(e)}") + + # Deactivate file access tokens for this scan + try: + from .models import FileAccessToken + FileAccessToken.objects.filter(scan_history=scan_record).update(is_active=False) + logging.writeToFile(f"[API] Deactivated file access tokens for scan {scan_id}") + except Exception as e: + logging.writeToFile(f"[API] Error deactivating tokens: {str(e)}") + + logging.writeToFile(f"[API] Scan {scan_id} completed successfully:") + logging.writeToFile(f"[API] Status: {status}") + logging.writeToFile(f"[API] Threat Level: {summary.get('threat_level')}") + logging.writeToFile(f"[API] Findings: {summary.get('total_findings')}") + logging.writeToFile(f"[API] Files Scanned: {summary.get('files_scanned')}") + logging.writeToFile(f"[API] Cost: {summary.get('cost')}") + + except ScanHistory.DoesNotExist: + logging.writeToFile(f"[API] Scan record not found: {scan_id}") + return JsonResponse({ + 'status': 'error', + 'message': 'Scan record not found', + 'scan_id': scan_id + }, status=404) + + except Exception as e: + logging.writeToFile(f"[API] Failed to update scan record: {str(e)}") + return JsonResponse({ + 'status': 'error', + 'message': 'Failed to update scan record', + 'scan_id': scan_id + }, status=500) + + # Return success response + return JsonResponse({ + 'status': 'success', + 'message': 'Callback received successfully', + 'scan_id': scan_id + }) + + except json.JSONDecodeError: + logging.writeToFile("[API] Invalid JSON in callback request") + return JsonResponse({ + 'status': 'error', + 'message': 'Invalid JSON payload' + }, status=400) + + except Exception as e: + logging.writeToFile(f"[API] Callback processing error: {str(e)}") + return JsonResponse({ + 'status': 'error', + 'message': 'Internal server error' + }, status=500) \ No newline at end of file diff --git a/aiScanner/apps.py b/aiScanner/apps.py new file mode 100644 index 000000000..27c493208 --- /dev/null +++ b/aiScanner/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class AiscannerConfig(AppConfig): + default_auto_field = 'django.db.models.BigAutoField' + name = 'aiScanner' diff --git a/aiScanner/migrations/__init__.py b/aiScanner/migrations/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/aiScanner/models.py b/aiScanner/models.py new file mode 100644 index 000000000..13789d55e --- /dev/null +++ b/aiScanner/models.py @@ -0,0 +1,109 @@ +from django.db import models +from loginSystem.models import Administrator +import json + +# Import the status update model +from .status_models import ScanStatusUpdate + + +class AIScannerSettings(models.Model): + """Store AI scanner configuration and API keys for administrators""" + admin = models.OneToOneField(Administrator, on_delete=models.CASCADE, related_name='ai_scanner_settings') + api_key = models.CharField(max_length=255, blank=True, null=True) + balance = models.DecimalField(max_digits=10, decimal_places=4, default=0.0000) + is_payment_configured = models.BooleanField(default=False) + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + + class Meta: + db_table = 'ai_scanner_settings' + + def __str__(self): + return f"AI Scanner Settings for {self.admin.userName}" + + +class ScanHistory(models.Model): + """Store scan history and results""" + SCAN_STATUS_CHOICES = [ + ('pending', 'Pending'), + ('running', 'Running'), + ('completed', 'Completed'), + ('failed', 'Failed'), + ('cancelled', 'Cancelled'), + ] + + SCAN_TYPE_CHOICES = [ + ('full', 'Full Scan'), + ('quick', 'Quick Scan'), + ('custom', 'Custom Scan'), + ] + + admin = models.ForeignKey(Administrator, on_delete=models.CASCADE, related_name='scan_history') + scan_id = models.CharField(max_length=100, unique=True) + domain = models.CharField(max_length=255) + scan_type = models.CharField(max_length=20, choices=SCAN_TYPE_CHOICES, default='full') + status = models.CharField(max_length=20, choices=SCAN_STATUS_CHOICES, default='pending') + cost_usd = models.DecimalField(max_digits=10, decimal_places=6, null=True, blank=True) + files_scanned = models.IntegerField(default=0) + issues_found = models.IntegerField(default=0) + findings_json = models.TextField(blank=True, null=True) # Store JSON findings + summary_json = models.TextField(blank=True, null=True) # Store JSON summary + error_message = models.TextField(blank=True, null=True) + started_at = models.DateTimeField(auto_now_add=True) + completed_at = models.DateTimeField(null=True, blank=True) + + class Meta: + db_table = 'ai_scanner_history' + ordering = ['-started_at'] + + def __str__(self): + return f"Scan {self.scan_id} - {self.domain} ({self.status})" + + @property + def findings(self): + """Parse findings JSON""" + if self.findings_json: + try: + return json.loads(self.findings_json) + except json.JSONDecodeError: + return [] + return [] + + @property + def summary(self): + """Parse summary JSON""" + if self.summary_json: + try: + return json.loads(self.summary_json) + except json.JSONDecodeError: + return {} + return {} + + def set_findings(self, findings_list): + """Set findings from list/dict""" + self.findings_json = json.dumps(findings_list) + + def set_summary(self, summary_dict): + """Set summary from dict""" + self.summary_json = json.dumps(summary_dict) + + +class FileAccessToken(models.Model): + """Temporary tokens for file access during scans""" + token = models.CharField(max_length=100, unique=True) + scan_history = models.ForeignKey(ScanHistory, on_delete=models.CASCADE, related_name='access_tokens') + domain = models.CharField(max_length=255) + wp_path = models.CharField(max_length=500) + expires_at = models.DateTimeField() + created_at = models.DateTimeField(auto_now_add=True) + is_active = models.BooleanField(default=True) + + class Meta: + db_table = 'ai_scanner_file_tokens' + + def __str__(self): + return f"Access token {self.token} for {self.domain}" + + def is_expired(self): + from django.utils import timezone + return timezone.now() > self.expires_at diff --git a/aiScanner/status_api.py b/aiScanner/status_api.py new file mode 100644 index 000000000..8b3e61193 --- /dev/null +++ b/aiScanner/status_api.py @@ -0,0 +1,145 @@ +import json +from django.http import JsonResponse +from django.views.decorators.csrf import csrf_exempt +from django.views.decorators.http import require_http_methods +from django.utils import timezone +from .status_models import ScanStatusUpdate +from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging + + +@csrf_exempt +@require_http_methods(['POST']) +def receive_status_update(request): + """ + Receive real-time scan status updates from platform + + POST /api/ai-scanner/status-webhook + + Expected payload: + { + "scan_id": "550e8400-e29b-41d4-a716-446655440000", + "phase": "scanning_files", + "progress": 75, + "current_file": "wp-content/plugins/suspicious/malware.php", + "files_discovered": 1247, + "files_scanned": 935, + "files_remaining": 312, + "threats_found": 5, + "critical_threats": 2, + "high_threats": 3, + "activity_description": "Scanning file 935/1247: wp-content/plugins/suspicious/malware.php" + } + """ + try: + data = json.loads(request.body) + + scan_id = data.get('scan_id') + if not scan_id: + logging.writeToFile('[Status API] Missing scan_id in status update') + return JsonResponse({'error': 'scan_id required'}, status=400) + + # Update or create status record + status_update, created = ScanStatusUpdate.objects.update_or_create( + scan_id=scan_id, + defaults={ + 'phase': data.get('phase', ''), + 'progress': int(data.get('progress', 0)), + 'current_file': data.get('current_file', ''), + 'files_discovered': int(data.get('files_discovered', 0)), + 'files_scanned': int(data.get('files_scanned', 0)), + 'files_remaining': int(data.get('files_remaining', 0)), + 'threats_found': int(data.get('threats_found', 0)), + 'critical_threats': int(data.get('critical_threats', 0)), + 'high_threats': int(data.get('high_threats', 0)), + 'activity_description': data.get('activity_description', ''), + 'last_updated': timezone.now() + } + ) + + action = "Created" if created else "Updated" + + # Extended logging for debugging + logging.writeToFile(f'[Status API] ✅ {action} status update for scan {scan_id}') + logging.writeToFile(f'[Status API] Phase: {data.get("phase")} → Progress: {data.get("progress", 0)}%') + logging.writeToFile(f'[Status API] Files: {data.get("files_scanned", 0)}/{data.get("files_discovered", 0)} ({data.get("files_remaining", 0)} remaining)') + logging.writeToFile(f'[Status API] Threats: {data.get("threats_found", 0)} total (Critical: {data.get("critical_threats", 0)}, High: {data.get("high_threats", 0)})') + if data.get('current_file'): + logging.writeToFile(f'[Status API] Current File: {data.get("current_file")}') + if data.get('activity_description'): + logging.writeToFile(f'[Status API] Activity: {data.get("activity_description")}') + + return JsonResponse({'success': True}) + + except json.JSONDecodeError: + logging.writeToFile('[Status API] Invalid JSON in status update') + return JsonResponse({'error': 'Invalid JSON'}, status=400) + except ValueError as e: + logging.writeToFile(f'[Status API] Value error in status update: {str(e)}') + return JsonResponse({'error': 'Invalid data types'}, status=400) + except Exception as e: + logging.writeToFile(f'[Status API] Status update error: {str(e)}') + return JsonResponse({'error': 'Internal server error'}, status=500) + + +@require_http_methods(['GET']) +def get_live_scan_progress(request, scan_id): + """ + Get current scan progress for real-time UI updates + + GET /api/ai-scanner/scan/{scan_id}/live-progress + + Response: + { + "success": true, + "scan_id": "550e8400-e29b-41d4-a716-446655440000", + "phase": "scanning_files", + "progress": 75, + "current_file": "wp-content/plugins/suspicious/malware.php", + "files_discovered": 1247, + "files_scanned": 935, + "files_remaining": 312, + "threats_found": 5, + "critical_threats": 2, + "high_threats": 3, + "activity_description": "Scanning file 935/1247: wp-content/plugins/suspicious/malware.php", + "last_updated": "2024-12-25T10:34:30Z", + "is_active": true + } + """ + try: + # Get latest status update + try: + status_update = ScanStatusUpdate.objects.get(scan_id=scan_id) + except ScanStatusUpdate.DoesNotExist: + logging.writeToFile(f'[Status API] Status not found for scan {scan_id}') + return JsonResponse({'success': False, 'error': 'Scan not found'}, status=404) + + response_data = { + 'success': True, + 'scan_id': scan_id, + 'phase': status_update.phase, + 'progress': status_update.progress, + 'current_file': status_update.current_file, + 'files_discovered': status_update.files_discovered, + 'files_scanned': status_update.files_scanned, + 'files_remaining': status_update.files_remaining, + 'threats_found': status_update.threats_found, + 'critical_threats': status_update.critical_threats, + 'high_threats': status_update.high_threats, + 'activity_description': status_update.activity_description, + 'last_updated': status_update.last_updated.isoformat(), + 'is_active': status_update.is_active + } + + # Extended logging for frontend polling + logging.writeToFile(f'[Status API] 📊 Frontend polling scan {scan_id}') + logging.writeToFile(f'[Status API] Status: {status_update.phase} ({status_update.progress}%) - Active: {status_update.is_active}') + logging.writeToFile(f'[Status API] Progress: {status_update.files_scanned}/{status_update.files_discovered} files, {status_update.threats_found} threats') + if status_update.current_file: + logging.writeToFile(f'[Status API] Currently scanning: {status_update.current_file}') + + return JsonResponse(response_data) + + except Exception as e: + logging.writeToFile(f'[Status API] Get progress error: {str(e)}') + return JsonResponse({'success': False, 'error': 'Internal server error'}, status=500) \ No newline at end of file diff --git a/aiScanner/status_models.py b/aiScanner/status_models.py new file mode 100644 index 000000000..da2ecdd4f --- /dev/null +++ b/aiScanner/status_models.py @@ -0,0 +1,42 @@ +from django.db import models +from django.utils import timezone + + +class ScanStatusUpdate(models.Model): + """Real-time scan progress updates""" + scan_id = models.CharField(max_length=100, db_index=True, primary_key=True) + phase = models.CharField(max_length=50) # starting, discovering_files, scanning_files, completing, completed + progress = models.IntegerField(default=0) # 0-100 + + # File tracking + current_file = models.TextField(blank=True) + files_discovered = models.IntegerField(default=0) + files_scanned = models.IntegerField(default=0) + files_remaining = models.IntegerField(default=0) + + # Threat tracking + threats_found = models.IntegerField(default=0) + critical_threats = models.IntegerField(default=0) + high_threats = models.IntegerField(default=0) + + # Activity description + activity_description = models.TextField(blank=True) + + # Timestamps + last_updated = models.DateTimeField(auto_now=True) + created_at = models.DateTimeField(auto_now_add=True) + + class Meta: + db_table = 'ai_scanner_status_updates' + ordering = ['-last_updated'] + indexes = [ + models.Index(fields=['scan_id', '-last_updated']), + ] + + def __str__(self): + return f"Status update for {self.scan_id} - {self.phase} ({self.progress}%)" + + @property + def is_active(self): + """Check if scan is still active""" + return self.phase not in ['completed', 'failed', 'cancelled'] \ No newline at end of file diff --git a/aiScanner/templates/aiScanner/scanner.html b/aiScanner/templates/aiScanner/scanner.html new file mode 100644 index 000000000..e2b757cba --- /dev/null +++ b/aiScanner/templates/aiScanner/scanner.html @@ -0,0 +1,1037 @@ +{% extends "baseTemplate/index.html" %} +{% load i18n %} + +{% block title %} +AI Security Scanner - CyberPanel +{% endblock %} + +{% block content %} +
+
+
+

+ AI Security Scanner +

+
+
+ + + {% if error %} +
+ {{ error }} +
+ {% endif %} + + + {% if messages %} + {% for message in messages %} +
+ {{ message }} +
+ {% endfor %} + {% endif %} + + + {% if not is_payment_configured %} +
+
+
+

Setup Required

+

Configure your payment method to start scanning WordPress sites for malware and security vulnerabilities. Your card will be charged $10 initially to fund your scanning account.

+ + {% if pricing_data.success and pricing_data.plan %} +
+
+
Current Pricing:
+
    +
  • {{ pricing_data.plan.name }}
  • +
  • AI Model: {{ pricing_data.plan.ai_model.name }}
  • +
  • Max Files: {{ pricing_data.plan.limits.max_files_per_scan|floatformat:0 }}
  • +
  • Max File Size: {{ pricing_data.plan.limits.max_file_size_mb }}MB
  • +
+
+
+
Example Costs:
+
    +
  • Small Scan: ${{ pricing_data.plan.example_costs.small_scan.estimated_cost_usd }}
  • +
  • Medium Scan: ${{ pricing_data.plan.example_costs.medium_scan.estimated_cost_usd }}
  • +
  • Large Scan: ${{ pricing_data.plan.example_costs.large_scan.estimated_cost_usd }}
  • +
+
+
+ {% endif %} + + +
+
+
+ {% else %} + + +
+
+
+
+

+ Account Balance +

+
+
+

${{ current_balance|floatformat:4 }}

+

Available Credit

+ +
+
+
+ +
+
+
+

+ Recent Scans +

+
+
+

{{ recent_scans|length }}

+

Last 30 Days

+
+
+
+ +
+
+
+

+ Issues Found +

+
+
+

+ {% for scan in recent_scans %}{% if scan.status == 'completed' %}{{ scan.issues_found|add:0 }}{% endif %}{% endfor %} +

+

Total Issues

+
+
+
+
+ + +
+
+
+
+

+ Payment Methods + +

+
+
+

+ + Your account uses automatic billing. Add multiple payment methods for backup security. +

+
+

Payment methods will be managed through the platform.

+
+
+
+
+
+ + +
+
+
+
+

+ Start New Scan +

+
+
+
+
+
+
+ + +
+
+
+
+ + +
+
+
+
+ + +
+
+
+
+
+
+
+
+ {% endif %} + + + {% if is_payment_configured and recent_scans %} +
+
+
+
+

+ Recent Scans + +

+
+
+
+ + + + + + + + + + + + + + + + {% for scan in recent_scans %} + + + + + + + + + + + + {% endfor %} + +
DateDomainTypeStatusProgressFilesIssuesCostActions
{{ scan.started_at|date:"M d, Y H:i" }}{{ scan.domain }} + {{ scan.get_scan_type_display }} + + {% if scan.status == 'completed' %} + Completed + {% elif scan.status == 'running' %} + Running + {% elif scan.status == 'failed' %} + Failed + {% elif scan.status == 'pending' %} + Pending + {% endif %} + + {% if scan.status == 'running' or scan.status == 'pending' %} +
+
+ 0% +
+
+ {% else %} + 100% + {% endif %} +
{{ scan.files_scanned|default:"-" }} + {% if scan.issues_found > 0 %} + {{ scan.issues_found }} + {% else %} + {{ scan.issues_found }} + {% endif %} + + {% if scan.cost_usd %} + ${{ scan.cost_usd|floatformat:4 }} + {% else %} + - + {% endif %} + + +
+
+
+
+
+
+ {% endif %} + +
+
+
+ + + + + +{% endblock %} \ No newline at end of file diff --git a/aiScanner/tests.py b/aiScanner/tests.py new file mode 100644 index 000000000..7ce503c2d --- /dev/null +++ b/aiScanner/tests.py @@ -0,0 +1,3 @@ +from django.test import TestCase + +# Create your tests here. diff --git a/aiScanner/urls.py b/aiScanner/urls.py new file mode 100644 index 000000000..79f117664 --- /dev/null +++ b/aiScanner/urls.py @@ -0,0 +1,26 @@ +from django.urls import path +from . import views, api + +urlpatterns = [ + # Main AI Scanner pages + path('', views.aiScannerHome, name='aiScannerHome'), + path('setup-payment/', views.setupPayment, name='aiScannerSetupPayment'), + path('setup-complete/', views.setupComplete, name='aiScannerSetupComplete'), + path('start-scan/', views.startScan, name='aiScannerStartScan'), + path('refresh-balance/', views.refreshBalance, name='aiScannerRefreshBalance'), + path('add-payment-method/', views.addPaymentMethod, name='aiScannerAddPaymentMethod'), + path('payment-method-complete/', views.paymentMethodComplete, name='aiScannerPaymentMethodComplete'), + path('callback/', views.scanCallback, name='aiScannerCallback'), + + # Scan management + path('scan-history/', views.getScanHistory, name='aiScannerHistory'), + path('scan-details//', views.getScanDetails, name='aiScannerDetails'), + path('platform-status//', views.getPlatformScanStatus, name='aiScannerPlatformStatus'), + + # Note: RESTful API endpoints are in /api/urls.py for external access + + # Legacy API endpoints (for backward compatibility) + path('api/authenticate/', views.aiScannerAuthenticate, name='aiScannerAuthenticate'), + path('api/list-files/', views.aiScannerListFiles, name='aiScannerListFiles'), + path('api/get-file/', views.aiScannerGetFile, name='aiScannerGetFile'), +] \ No newline at end of file diff --git a/aiScanner/views.py b/aiScanner/views.py new file mode 100644 index 000000000..816785489 --- /dev/null +++ b/aiScanner/views.py @@ -0,0 +1,439 @@ +from django.shortcuts import render, redirect +from django.http import JsonResponse +from django.views.decorators.csrf import csrf_exempt +from django.views.decorators.http import require_http_methods +from loginSystem.views import loadLoginPage +from .aiScannerManager import AIScannerManager +import json +import os + + +def aiScannerHome(request): + """Main AI Scanner page""" + try: + userID = request.session['userID'] + sm = AIScannerManager() + return sm.scannerHome(request, userID) + except KeyError: + return redirect(loadLoginPage) + + +def setupPayment(request): + """Setup payment method for AI scanner""" + try: + userID = request.session['userID'] + sm = AIScannerManager() + return sm.setupPayment(request, userID) + except KeyError: + return JsonResponse({'success': False, 'error': 'Not authenticated'}) + + +def setupComplete(request): + """Handle return from payment setup""" + try: + userID = request.session['userID'] + sm = AIScannerManager() + return sm.setupComplete(request, userID) + except KeyError: + return redirect(loadLoginPage) + + +def startScan(request): + """Start a new AI security scan""" + try: + userID = request.session['userID'] + sm = AIScannerManager() + return sm.startScan(request, userID) + except KeyError: + return JsonResponse({'success': False, 'error': 'Not authenticated'}) + + +def refreshBalance(request): + """Refresh account balance from API""" + try: + userID = request.session['userID'] + sm = AIScannerManager() + return sm.refreshBalance(request, userID) + except KeyError: + return JsonResponse({'success': False, 'error': 'Not authenticated'}) + + +def addPaymentMethod(request): + """Add a new payment method""" + try: + userID = request.session['userID'] + sm = AIScannerManager() + return sm.addPaymentMethod(request, userID) + except KeyError: + return JsonResponse({'success': False, 'error': 'Not authenticated'}) + + +def paymentMethodComplete(request): + """Handle return from adding payment method""" + try: + userID = request.session['userID'] + sm = AIScannerManager() + return sm.paymentMethodComplete(request, userID) + except KeyError: + return redirect(loadLoginPage) + + +@csrf_exempt +def scanCallback(request): + """Handle scan results callback from AI Scanner API""" + sm = AIScannerManager() + return sm.scanCallback(request) + + +def getScanHistory(request): + """Get scan history for user""" + try: + userID = request.session['userID'] + from loginSystem.models import Administrator + from .models import ScanHistory + + admin = Administrator.objects.get(pk=userID) + scans = ScanHistory.objects.filter(admin=admin).order_by('-started_at')[:20] + + scan_data = [] + for scan in scans: + scan_data.append({ + 'scan_id': scan.scan_id, + 'domain': scan.domain, + 'status': scan.status, + 'scan_type': scan.scan_type, + 'started_at': scan.started_at.strftime('%Y-%m-%d %H:%M:%S'), + 'completed_at': scan.completed_at.strftime('%Y-%m-%d %H:%M:%S') if scan.completed_at else None, + 'cost_usd': float(scan.cost_usd) if scan.cost_usd else 0, + 'files_scanned': scan.files_scanned, + 'issues_found': scan.issues_found, + 'findings': scan.findings[:5] if scan.findings else [], # First 5 findings + 'summary': scan.summary + }) + + return JsonResponse({'success': True, 'scans': scan_data}) + + except KeyError: + return JsonResponse({'success': False, 'error': 'Not authenticated'}) + except Exception as e: + return JsonResponse({'success': False, 'error': str(e)}) + + +def getScanDetails(request, scan_id): + """Get detailed scan results""" + try: + userID = request.session['userID'] + from loginSystem.models import Administrator + from .models import ScanHistory + + admin = Administrator.objects.get(pk=userID) + scan = ScanHistory.objects.get(scan_id=scan_id, admin=admin) + + scan_data = { + 'scan_id': scan.scan_id, + 'domain': scan.domain, + 'status': scan.status, + 'scan_type': scan.scan_type, + 'started_at': scan.started_at.strftime('%Y-%m-%d %H:%M:%S'), + 'completed_at': scan.completed_at.strftime('%Y-%m-%d %H:%M:%S') if scan.completed_at else None, + 'cost_usd': float(scan.cost_usd) if scan.cost_usd else 0, + 'files_scanned': scan.files_scanned, + 'issues_found': scan.issues_found, + 'findings': scan.findings, + 'summary': scan.summary, + 'error_message': scan.error_message + } + + return JsonResponse({'success': True, 'scan': scan_data}) + + except KeyError: + return JsonResponse({'success': False, 'error': 'Not authenticated'}) + except ScanHistory.DoesNotExist: + return JsonResponse({'success': False, 'error': 'Scan not found'}) + except Exception as e: + return JsonResponse({'success': False, 'error': str(e)}) + + +def getPlatformScanStatus(request, scan_id): + """Get real-time scan status from AI Scanner platform""" + try: + userID = request.session['userID'] + from loginSystem.models import Administrator + from .models import ScanHistory + + admin = Administrator.objects.get(pk=userID) + scan = ScanHistory.objects.get(scan_id=scan_id, admin=admin) + scanner_settings = admin.ai_scanner_settings + + if not scanner_settings.api_key: + return JsonResponse({'success': False, 'error': 'API key not configured'}) + + # Get real-time status from platform + sm = AIScannerManager() + platform_status = sm.get_scan_status(scanner_settings.api_key, scan_id) + + if platform_status and platform_status.get('success'): + status_data = platform_status.get('data', {}) + + # Return formatted status data for frontend + return JsonResponse({ + 'success': True, + 'scan_id': scan_id, + 'phase': status_data.get('status', 'unknown'), + 'progress': status_data.get('progress', 0), + 'current_file': status_data.get('current_file', ''), + 'files_discovered': status_data.get('files_discovered', 0), + 'files_scanned': status_data.get('files_scanned', 0), + 'files_remaining': status_data.get('files_remaining', 0), + 'threats_found': status_data.get('findings_count', 0), + 'critical_threats': status_data.get('critical_threats', 0), + 'high_threats': status_data.get('high_threats', 0), + 'activity_description': status_data.get('activity_description', ''), + 'last_updated': status_data.get('last_updated', ''), + 'is_active': status_data.get('status') in ['scanning', 'discovering_files', 'starting'], + 'cost': status_data.get('cost', '$0.00') + }) + else: + # No live status available, return scan database status + return JsonResponse({ + 'success': True, + 'scan_id': scan_id, + 'phase': scan.status, + 'progress': 100 if scan.status == 'completed' else 0, + 'current_file': '', + 'files_discovered': scan.files_scanned, + 'files_scanned': scan.files_scanned, + 'files_remaining': 0, + 'threats_found': scan.issues_found, + 'critical_threats': 0, + 'high_threats': 0, + 'activity_description': scan.error_message if scan.status == 'failed' else 'Scan completed', + 'last_updated': scan.completed_at.isoformat() if scan.completed_at else scan.started_at.isoformat(), + 'is_active': False, + 'cost': f'${scan.cost_usd:.4f}' if scan.cost_usd else '$0.00' + }) + + except KeyError: + return JsonResponse({'success': False, 'error': 'Not authenticated'}) + except ScanHistory.DoesNotExist: + return JsonResponse({'success': False, 'error': 'Scan not found'}) + except Exception as e: + return JsonResponse({'success': False, 'error': str(e)}) + + +# File Access API for AI Scanner + +@csrf_exempt +@require_http_methods(['POST']) +def aiScannerAuthenticate(request): + """Authenticate AI scanner access""" + try: + data = json.loads(request.body) + access_token = data.get('access_token') + scan_id = data.get('scan_id') + + if not access_token or not scan_id: + return JsonResponse({'success': False, 'error': 'Missing parameters'}) + + from .models import FileAccessToken, ScanHistory + + # Validate token + try: + file_token = FileAccessToken.objects.get( + token=access_token, + scan_history__scan_id=scan_id, + is_active=True + ) + + if file_token.is_expired(): + return JsonResponse({'success': False, 'error': 'Token expired'}) + + # Get WordPress info + from websiteFunctions.models import Websites + try: + website = Websites.objects.get(domain=file_token.domain) + + # Detect WordPress path and version + wp_path = file_token.wp_path + wp_version = 'Unknown' + php_version = 'Unknown' + + # Try to get WP version from wp-includes/version.php + version_file = os.path.join(wp_path, 'wp-includes', 'version.php') + if os.path.exists(version_file): + try: + with open(version_file, 'r') as f: + content = f.read() + import re + match = re.search(r'\$wp_version\s*=\s*[\'"]([^\'"]+)[\'"]', content) + if match: + wp_version = match.group(1) + except: + pass + + return JsonResponse({ + 'success': True, + 'site_info': { + 'domain': file_token.domain, + 'wp_path': wp_path, + 'php_version': php_version, + 'wp_version': wp_version, + 'scan_id': scan_id + } + }) + + except Websites.DoesNotExist: + return JsonResponse({'success': False, 'error': 'Website not found'}) + + except FileAccessToken.DoesNotExist: + return JsonResponse({'success': False, 'error': 'Invalid token'}) + + except Exception as e: + return JsonResponse({'success': False, 'error': str(e)}) + + +@csrf_exempt +@require_http_methods(['GET']) +def aiScannerListFiles(request): + """List directory contents for AI scanner""" + try: + path = request.GET.get('path', '') + access_token = request.headers.get('Authorization', '').replace('Bearer ', '') + + if not access_token: + return JsonResponse({'success': False, 'error': 'No authorization token'}) + + from .models import FileAccessToken + + # Validate token + try: + file_token = FileAccessToken.objects.get(token=access_token, is_active=True) + + if file_token.is_expired(): + return JsonResponse({'success': False, 'error': 'Token expired'}) + + # Construct full path + full_path = os.path.join(file_token.wp_path, path) + + # Security check - ensure path is within WordPress directory + if not os.path.abspath(full_path).startswith(os.path.abspath(file_token.wp_path)): + return JsonResponse({'success': False, 'error': 'Path not allowed'}) + + if not os.path.exists(full_path): + return JsonResponse({'success': False, 'error': 'Path not found'}) + + if not os.path.isdir(full_path): + return JsonResponse({'success': False, 'error': 'Path is not a directory'}) + + # List directory contents + items = [] + try: + for item in os.listdir(full_path): + item_path = os.path.join(full_path, item) + + # Skip hidden files and certain directories + if item.startswith('.') or item in ['__pycache__', 'node_modules']: + continue + + if os.path.isdir(item_path): + items.append({ + 'name': item, + 'type': 'directory', + 'path': os.path.join(path, item).replace('\\', '/') if path else item + }) + else: + # Only include certain file types + if item.endswith(('.php', '.js', '.html', '.htm', '.css', '.txt', '.md', '.json', '.xml')): + items.append({ + 'name': item, + 'type': 'file', + 'path': os.path.join(path, item).replace('\\', '/') if path else item, + 'size': os.path.getsize(item_path) + }) + + return JsonResponse({ + 'success': True, + 'path': path, + 'items': items + }) + + except PermissionError: + return JsonResponse({'success': False, 'error': 'Permission denied'}) + + except FileAccessToken.DoesNotExist: + return JsonResponse({'success': False, 'error': 'Invalid token'}) + + except Exception as e: + return JsonResponse({'success': False, 'error': str(e)}) + + +@csrf_exempt +@require_http_methods(['GET']) +def aiScannerGetFile(request): + """Get file content for AI scanner""" + try: + file_path = request.GET.get('path') + access_token = request.headers.get('Authorization', '').replace('Bearer ', '') + + if not access_token or not file_path: + return JsonResponse({'success': False, 'error': 'Missing parameters'}) + + from .models import FileAccessToken + + # Validate token + try: + file_token = FileAccessToken.objects.get(token=access_token, is_active=True) + + if file_token.is_expired(): + return JsonResponse({'success': False, 'error': 'Token expired'}) + + # Construct full path + full_path = os.path.join(file_token.wp_path, file_path) + + # Security check - ensure path is within WordPress directory + if not os.path.abspath(full_path).startswith(os.path.abspath(file_token.wp_path)): + return JsonResponse({'success': False, 'error': 'Path not allowed'}) + + if not os.path.exists(full_path): + return JsonResponse({'success': False, 'error': 'File not found'}) + + if not os.path.isfile(full_path): + return JsonResponse({'success': False, 'error': 'Path is not a file'}) + + # Check file size (max 10MB as per API limits) + file_size = os.path.getsize(full_path) + if file_size > 10 * 1024 * 1024: # 10MB + return JsonResponse({'success': False, 'error': 'File too large'}) + + # Read file content + try: + with open(full_path, 'r', encoding='utf-8', errors='ignore') as f: + content = f.read() + + return JsonResponse({ + 'success': True, + 'path': file_path, + 'content': content, + 'size': file_size + }) + + except UnicodeDecodeError: + # Try binary mode for non-text files + with open(full_path, 'rb') as f: + content = f.read() + + # Return base64 encoded for binary files + import base64 + return JsonResponse({ + 'success': True, + 'path': file_path, + 'content': base64.b64encode(content).decode('utf-8'), + 'encoding': 'base64', + 'size': file_size + }) + + except FileAccessToken.DoesNotExist: + return JsonResponse({'success': False, 'error': 'Invalid token'}) + + except Exception as e: + return JsonResponse({'success': False, 'error': str(e)}) diff --git a/baseTemplate/templates/baseTemplate/index.html b/baseTemplate/templates/baseTemplate/index.html index 68b354fa0..1f51db851 100644 --- a/baseTemplate/templates/baseTemplate/index.html +++ b/baseTemplate/templates/baseTemplate/index.html @@ -1339,6 +1339,9 @@ ImunifyAV + + AI Scanner +