From 087863134c26ef99e154f3d7c40f457389689e29 Mon Sep 17 00:00:00 2001 From: master3395 Date: Fri, 27 Mar 2026 21:48:08 +0100 Subject: [PATCH] feat(firewall): merge Auto Ban Security Alerts logs into banned IPs API - getBannedIPs: append AutoBanLog rows (latest per IP) not already in DB/JSON - Skip expired timed bans; tag rows with ban_source autoBanSecurityAlerts - removeBannedIP/deleteBannedIP: handle synthetic id ablog- via unban by IP --- firewall/firewallManager.py | 177 ++++++++++++++++++++++++++++++++++++ 1 file changed, 177 insertions(+) diff --git a/firewall/firewallManager.py b/firewall/firewallManager.py index 10a0fc913..126a9c35e 100644 --- a/firewall/firewallManager.py +++ b/firewall/firewallManager.py @@ -39,6 +39,174 @@ class FirewallManager: def __init__(self, request = None): self.request = request + @staticmethod + def _normalize_banned_ip_key(ip): + return (ip or '').strip().lower() + + def _autoban_duration_seconds(self, duration): + duration_map = {'1h': 3600, '24h': 86400, '7d': 604800, '30d': 2592000} + d = (duration or '').strip().lower() + if d == 'permanent' or not d: + return None + return duration_map.get(d, 86400) + + def _autoban_log_expired(self, log, current_time): + """True if timed ban from AutoBanLog is past expiry (permanent never expires here).""" + d = (log.ban_duration or '').strip().lower() + if d == 'permanent' or not d: + return False + sec = self._autoban_duration_seconds(log.ban_duration) + if sec is None: + return False + if not log.banned_at: + return False + try: + ts = int(log.banned_at.timestamp()) + except Exception: + return False + return (ts + sec) <= int(current_time) + + def _merge_autoban_security_alerts_logs(self, active_banned_ips, current_time): + """ + List Auto Ban Security Alerts log rows whose IP is not already in the firewall + merged list (DB + JSON), so /firewall/#banned-ips matches the plugin settings view. + """ + try: + from django.apps import apps + + if not apps.is_installed('autoBanSecurityAlerts'): + return + from autoBanSecurityAlerts.models import AutoBanLog + except Exception: + return + + seen = set() + for row in active_banned_ips: + k = self._normalize_banned_ip_key(row.get('ip') or row.get('ip_address')) + if k: + seen.add(k) + + try: + autoban_by_ip = {} + for log in AutoBanLog.objects.order_by('-banned_at'): + k = self._normalize_banned_ip_key(str(log.ip_address)) + if not k: + continue + if k not in autoban_by_ip: + autoban_by_ip[k] = log + + for _k, log in autoban_by_ip.items(): + ip_key = self._normalize_banned_ip_key(str(log.ip_address)) + if ip_key in seen: + continue + if self._autoban_log_expired(log, current_time): + continue + banned_on_str = 'N/A' + if log.banned_at: + try: + banned_on_str = log.banned_at.strftime('%Y-%m-%d %H:%M:%S') + except Exception: + banned_on_str = str(log.banned_at) + expires_display = 'Never' + dlow = (log.ban_duration or '').strip().lower() + if dlow != 'permanent' and dlow: + sec = self._autoban_duration_seconds(log.ban_duration) + if sec is not None and log.banned_at: + try: + from datetime import datetime + + exp_ts = int(log.banned_at.timestamp()) + sec + expires_display = datetime.fromtimestamp(exp_ts).strftime( + '%Y-%m-%d %H:%M:%S' + ) + except Exception: + expires_display = 'Never' + active_banned_ips.append( + { + 'id': 'ablog-%s' % log.pk, + 'ip': str(log.ip_address), + 'reason': log.ban_reason or '', + 'duration': log.ban_duration or 'permanent', + 'banned_on': banned_on_str, + 'expires': expires_display, + 'active': True, + 'ban_source': 'autoBanSecurityAlerts', + } + ) + except Exception as e: + logging.CyberCPLogFileWriter.writeToFile( + 'getBannedIPs: AutoBanSecurityAlerts merge failed: %s' % str(e) + ) + + def _remove_autoban_log_by_suffix_id(self, userID, ablog_id_str): + """Handle unban when UI sends id like ablog-123 (orphan log-only row).""" + try: + pk = int(str(ablog_id_str).split('-', 1)[1]) + except Exception: + final_dic = {'status': 0, 'error_message': 'Invalid auto-ban log id'} + return HttpResponse(json.dumps(final_dic), content_type='application/json') + try: + admin = Administrator.objects.get(pk=userID) + if admin.acl.adminStatus != 1: + return ACLManager.loadError() + from django.apps import apps + + if not apps.is_installed('autoBanSecurityAlerts'): + final_dic = {'status': 0, 'error_message': 'Auto Ban plugin is not installed'} + return HttpResponse(json.dumps(final_dic), content_type='application/json') + from autoBanSecurityAlerts.models import AutoBanLog + + log = AutoBanLog.objects.filter(pk=pk).first() + if not log: + final_dic = {'status': 0, 'error_message': 'Auto-ban log entry not found'} + return HttpResponse(json.dumps(final_dic), content_type='application/json') + ip = str(log.ip_address).strip() + admin0 = Administrator.objects.filter(acl__adminStatus=1).first() + if not admin0: + final_dic = {'status': 0, 'error_message': 'No admin user found'} + return HttpResponse(json.dumps(final_dic), content_type='application/json') + + result = self.removeBannedIP(admin0.pk, {'ip': ip}) + raw = getattr(result, 'content', None) + parsed = {} + if raw is not None: + try: + if isinstance(raw, bytes): + raw = raw.decode('utf-8', errors='replace') + parsed = json.loads(raw) + except Exception: + parsed = {} + err_raw = (parsed.get('error_message') or parsed.get('error') or '').strip() + err_msg = err_raw.lower() + + if parsed.get('status') == 1: + log.delete() + return HttpResponse( + json.dumps( + { + 'status': 1, + 'message': parsed.get('message', 'IP unbanned successfully'), + } + ), + content_type='application/json', + ) + if 'not found' in err_msg: + log.delete() + return HttpResponse( + json.dumps( + { + 'status': 1, + 'message': 'Log cleared (ban was already removed or not found in firewall)', + } + ), + content_type='application/json', + ) + final_dic = {'status': 0, 'error_message': err_raw or 'Failed to remove ban'} + return HttpResponse(json.dumps(final_dic), content_type='application/json') + except BaseException as msg: + final_dic = {'status': 0, 'error_message': str(msg)} + return HttpResponse(json.dumps(final_dic), content_type='application/json') + def _load_banned_ips_store(self): """ Load banned IPs from the primary store, falling back to legacy path. @@ -2094,6 +2262,9 @@ class FirewallManager: 'active': True }) + # Auto Ban Security Alerts: show log-only / plugin-visible bans on same list as firewall UI + self._merge_autoban_security_alerts_logs(active_banned_ips, current_time) + # Optional server-side search: filter by IP or reason (case-insensitive substring) # Normalize: strip query and compare against stripped IP/reason so "1.2.3.4" matches stored " 1.2.3.4 " search_q = (data.get('search') or data.get('q') or '').strip() if data else '' @@ -2296,6 +2467,9 @@ class FirewallManager: return ACLManager.loadError() banned_ip_id = data.get('id') + if isinstance(banned_ip_id, str) and banned_ip_id.startswith('ablog-'): + return self._remove_autoban_log_by_suffix_id(userID, banned_ip_id) + requested_ip = (data.get('ip') or '').strip() ip_to_unban = None @@ -2388,6 +2562,9 @@ class FirewallManager: return ACLManager.loadError() banned_ip_id = data.get('id') + if isinstance(banned_ip_id, str) and banned_ip_id.startswith('ablog-'): + return self._remove_autoban_log_by_suffix_id(userID, banned_ip_id) + requested_ip = (data.get('ip') or '').strip() try: