diff --git a/aiScanner/aiScannerManager.py b/aiScanner/aiScannerManager.py index 2c15a9d8d..329f33711 100644 --- a/aiScanner/aiScannerManager.py +++ b/aiScanner/aiScannerManager.py @@ -26,12 +26,14 @@ class AIScannerManager: try: admin = Administrator.objects.get(pk=userID) + # Load ACL permissions + currentACL = ACLManager.loadedACL(userID) + # Check ACL permissions (with fallback for new field) try: - currentACL = ACLManager.loadedACL(userID) - if currentACL.aiScannerAccess == 0: + if currentACL.get('aiScannerAccess', 1) == 0: return ACLManager.loadError() - except AttributeError: + except (AttributeError, KeyError): # Field doesn't exist yet, allow access for now self.logger.writeToFile(f'[AIScannerManager.scannerHome] aiScannerAccess field not found, allowing access') pass @@ -45,8 +47,14 @@ class AIScannerManager: # Get current pricing from API pricing_data = self.get_ai_scanner_pricing() - # Get recent scan history - recent_scans = ScanHistory.objects.filter(admin=admin)[:10] + # Get recent scan history with ACL respect + if currentACL['admin'] == 1: + # Admin can see all scans + recent_scans = ScanHistory.objects.all().order_by('-started_at')[:10] + else: + # Users can only see their own scans and their sub-users' scans + user_admins = ACLManager.loadUserObjects(userID) + recent_scans = ScanHistory.objects.filter(admin__in=user_admins).order_by('-started_at')[:10] # Get current balance if payment is configured current_balance = scanner_settings.balance @@ -70,10 +78,9 @@ class AIScannerManager: server_ip = ACLManager.fetchIP() vps_info = self.check_vps_free_scans(server_ip) - # Get user's websites for scan selection - from websiteFunctions.models import Websites + # Get user's websites for scan selection using ACL-aware method try: - websites = Websites.objects.filter(admin=admin) + websites = ACLManager.findWebsiteObjects(currentACL, userID) self.logger.writeToFile(f'[AIScannerManager.scannerHome] Found {websites.count()} websites for {admin.userName}') except Exception as e: self.logger.writeToFile(f'[AIScannerManager.scannerHome] Error fetching websites: {str(e)}') @@ -120,12 +127,14 @@ class AIScannerManager: admin = Administrator.objects.get(pk=userID) + # Load ACL permissions + currentACL = ACLManager.loadedACL(userID) + # Check ACL permissions (with fallback for new field) try: - currentACL = ACLManager.loadedACL(userID) - if currentACL.aiScannerAccess == 0: + if currentACL.get('aiScannerAccess', 1) == 0: return JsonResponse({'success': False, 'error': 'Access denied'}) - except AttributeError: + except (AttributeError, KeyError): # Field doesn't exist yet, allow access for now pass @@ -217,12 +226,14 @@ class AIScannerManager: admin = Administrator.objects.get(pk=userID) + # Load ACL permissions + currentACL = ACLManager.loadedACL(userID) + # Check ACL permissions (with fallback for new field) try: - currentACL = ACLManager.loadedACL(userID) - if currentACL.aiScannerAccess == 0: + if currentACL.get('aiScannerAccess', 1) == 0: return JsonResponse({'success': False, 'error': 'Access denied'}) - except AttributeError: + except (AttributeError, KeyError): # Field doesn't exist yet, allow access for now pass @@ -242,12 +253,17 @@ class AIScannerManager: if not domain: return JsonResponse({'success': False, 'error': 'Domain is required'}) - # Validate domain belongs to user + # Validate domain belongs to user using ACL-aware method from websiteFunctions.models import Websites try: - website = Websites.objects.get(domain=domain, admin=admin) + # Check if user has access to this domain through ACL system + if not ACLManager.checkOwnership(domain, admin, currentACL): + return JsonResponse({'success': False, 'error': 'Access denied to this domain'}) + + # Get the website object (we know it exists due to checkOwnership) + website = Websites.objects.get(domain=domain) except Websites.DoesNotExist: - return JsonResponse({'success': False, 'error': 'Domain not found or access denied'}) + return JsonResponse({'success': False, 'error': 'Domain not found'}) # Generate scan ID and file access token scan_id = f'cp_{uuid.uuid4().hex[:12]}' @@ -314,6 +330,17 @@ class AIScannerManager: admin = Administrator.objects.get(pk=userID) + # Load ACL permissions + currentACL = ACLManager.loadedACL(userID) + + # Check ACL permissions (with fallback for new field) + try: + if currentACL.get('aiScannerAccess', 1) == 0: + return JsonResponse({'success': False, 'error': 'Access denied'}) + except (AttributeError, KeyError): + # Field doesn't exist yet, allow access for now + pass + # Get scanner settings try: scanner_settings = AIScannerSettings.objects.get(admin=admin) @@ -352,6 +379,17 @@ class AIScannerManager: admin = Administrator.objects.get(pk=userID) + # Load ACL permissions + currentACL = ACLManager.loadedACL(userID) + + # Check ACL permissions (with fallback for new field) + try: + if currentACL.get('aiScannerAccess', 1) == 0: + return JsonResponse({'success': False, 'error': 'Access denied'}) + except (AttributeError, KeyError): + # Field doesn't exist yet, allow access for now + pass + # Check if user has scanner configured try: scanner_settings = AIScannerSettings.objects.get(admin=admin) @@ -392,6 +430,19 @@ class AIScannerManager: """Handle return from adding payment method""" try: admin = Administrator.objects.get(pk=userID) + + # Load ACL permissions + currentACL = ACLManager.loadedACL(userID) + + # Check ACL permissions (with fallback for new field) + try: + if currentACL.get('aiScannerAccess', 1) == 0: + messages.error(request, 'Access denied to AI Scanner') + return redirect('dashboard') + except (AttributeError, KeyError): + # Field doesn't exist yet, allow access for now + pass + status = request.GET.get('status') # Log all URL parameters for debugging diff --git a/aiScanner/views.py b/aiScanner/views.py index 816785489..f33059880 100644 --- a/aiScanner/views.py +++ b/aiScanner/views.py @@ -91,9 +91,19 @@ def getScanHistory(request): userID = request.session['userID'] from loginSystem.models import Administrator from .models import ScanHistory + from plogical.acl import ACLManager admin = Administrator.objects.get(pk=userID) - scans = ScanHistory.objects.filter(admin=admin).order_by('-started_at')[:20] + currentACL = ACLManager.loadedACL(userID) + + # Get scan history with ACL respect + if currentACL['admin'] == 1: + # Admin can see all scans + scans = ScanHistory.objects.all().order_by('-started_at')[:20] + else: + # Users can only see their own scans and their sub-users' scans + user_admins = ACLManager.loadUserObjects(userID) + scans = ScanHistory.objects.filter(admin__in=user_admins).order_by('-started_at')[:20] scan_data = [] for scan in scans: @@ -125,9 +135,23 @@ def getScanDetails(request, scan_id): userID = request.session['userID'] from loginSystem.models import Administrator from .models import ScanHistory + from plogical.acl import ACLManager admin = Administrator.objects.get(pk=userID) - scan = ScanHistory.objects.get(scan_id=scan_id, admin=admin) + currentACL = ACLManager.loadedACL(userID) + + # Get scan with ACL respect + try: + scan = ScanHistory.objects.get(scan_id=scan_id) + + # Check if user has access to this scan + if currentACL['admin'] != 1: + # Non-admin users can only see their own scans and their sub-users' scans + user_admins = ACLManager.loadUserObjects(userID) + if scan.admin not in user_admins: + return JsonResponse({'success': False, 'error': 'Access denied to this scan'}) + except ScanHistory.DoesNotExist: + return JsonResponse({'success': False, 'error': 'Scan not found'}) scan_data = { 'scan_id': scan.scan_id, @@ -160,9 +184,23 @@ def getPlatformScanStatus(request, scan_id): userID = request.session['userID'] from loginSystem.models import Administrator from .models import ScanHistory + from plogical.acl import ACLManager admin = Administrator.objects.get(pk=userID) - scan = ScanHistory.objects.get(scan_id=scan_id, admin=admin) + currentACL = ACLManager.loadedACL(userID) + + # Get scan with ACL respect + try: + scan = ScanHistory.objects.get(scan_id=scan_id) + + # Check if user has access to this scan + if currentACL['admin'] != 1: + # Non-admin users can only see their own scans and their sub-users' scans + user_admins = ACLManager.loadUserObjects(userID) + if scan.admin not in user_admins: + return JsonResponse({'success': False, 'error': 'Access denied to this scan'}) + except ScanHistory.DoesNotExist: + return JsonResponse({'success': False, 'error': 'Scan not found'}) scanner_settings = admin.ai_scanner_settings if not scanner_settings.api_key: