feat(firewall): merge Auto Ban Security Alerts logs into banned IPs API

- getBannedIPs: append AutoBanLog rows (latest per IP) not already in DB/JSON
- Skip expired timed bans; tag rows with ban_source autoBanSecurityAlerts
- removeBannedIP/deleteBannedIP: handle synthetic id ablog-<pk> via unban by IP
This commit is contained in:
master3395
2026-03-27 21:48:08 +01:00
parent d57dcdbf7d
commit 515ba2ae33

View File

@@ -39,6 +39,174 @@ class FirewallManager:
def __init__(self, request = None):
self.request = request
@staticmethod
def _normalize_banned_ip_key(ip):
return (ip or '').strip().lower()
def _autoban_duration_seconds(self, duration):
duration_map = {'1h': 3600, '24h': 86400, '7d': 604800, '30d': 2592000}
d = (duration or '').strip().lower()
if d == 'permanent' or not d:
return None
return duration_map.get(d, 86400)
def _autoban_log_expired(self, log, current_time):
"""True if timed ban from AutoBanLog is past expiry (permanent never expires here)."""
d = (log.ban_duration or '').strip().lower()
if d == 'permanent' or not d:
return False
sec = self._autoban_duration_seconds(log.ban_duration)
if sec is None:
return False
if not log.banned_at:
return False
try:
ts = int(log.banned_at.timestamp())
except Exception:
return False
return (ts + sec) <= int(current_time)
def _merge_autoban_security_alerts_logs(self, active_banned_ips, current_time):
"""
List Auto Ban Security Alerts log rows whose IP is not already in the firewall
merged list (DB + JSON), so /firewall/#banned-ips matches the plugin settings view.
"""
try:
from django.apps import apps
if not apps.is_installed('autoBanSecurityAlerts'):
return
from autoBanSecurityAlerts.models import AutoBanLog
except Exception:
return
seen = set()
for row in active_banned_ips:
k = self._normalize_banned_ip_key(row.get('ip') or row.get('ip_address'))
if k:
seen.add(k)
try:
autoban_by_ip = {}
for log in AutoBanLog.objects.order_by('-banned_at'):
k = self._normalize_banned_ip_key(str(log.ip_address))
if not k:
continue
if k not in autoban_by_ip:
autoban_by_ip[k] = log
for _k, log in autoban_by_ip.items():
ip_key = self._normalize_banned_ip_key(str(log.ip_address))
if ip_key in seen:
continue
if self._autoban_log_expired(log, current_time):
continue
banned_on_str = 'N/A'
if log.banned_at:
try:
banned_on_str = log.banned_at.strftime('%Y-%m-%d %H:%M:%S')
except Exception:
banned_on_str = str(log.banned_at)
expires_display = 'Never'
dlow = (log.ban_duration or '').strip().lower()
if dlow != 'permanent' and dlow:
sec = self._autoban_duration_seconds(log.ban_duration)
if sec is not None and log.banned_at:
try:
from datetime import datetime
exp_ts = int(log.banned_at.timestamp()) + sec
expires_display = datetime.fromtimestamp(exp_ts).strftime(
'%Y-%m-%d %H:%M:%S'
)
except Exception:
expires_display = 'Never'
active_banned_ips.append(
{
'id': 'ablog-%s' % log.pk,
'ip': str(log.ip_address),
'reason': log.ban_reason or '',
'duration': log.ban_duration or 'permanent',
'banned_on': banned_on_str,
'expires': expires_display,
'active': True,
'ban_source': 'autoBanSecurityAlerts',
}
)
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(
'getBannedIPs: AutoBanSecurityAlerts merge failed: %s' % str(e)
)
def _remove_autoban_log_by_suffix_id(self, userID, ablog_id_str):
"""Handle unban when UI sends id like ablog-123 (orphan log-only row)."""
try:
pk = int(str(ablog_id_str).split('-', 1)[1])
except Exception:
final_dic = {'status': 0, 'error_message': 'Invalid auto-ban log id'}
return HttpResponse(json.dumps(final_dic), content_type='application/json')
try:
admin = Administrator.objects.get(pk=userID)
if admin.acl.adminStatus != 1:
return ACLManager.loadError()
from django.apps import apps
if not apps.is_installed('autoBanSecurityAlerts'):
final_dic = {'status': 0, 'error_message': 'Auto Ban plugin is not installed'}
return HttpResponse(json.dumps(final_dic), content_type='application/json')
from autoBanSecurityAlerts.models import AutoBanLog
log = AutoBanLog.objects.filter(pk=pk).first()
if not log:
final_dic = {'status': 0, 'error_message': 'Auto-ban log entry not found'}
return HttpResponse(json.dumps(final_dic), content_type='application/json')
ip = str(log.ip_address).strip()
admin0 = Administrator.objects.filter(acl__adminStatus=1).first()
if not admin0:
final_dic = {'status': 0, 'error_message': 'No admin user found'}
return HttpResponse(json.dumps(final_dic), content_type='application/json')
result = self.removeBannedIP(admin0.pk, {'ip': ip})
raw = getattr(result, 'content', None)
parsed = {}
if raw is not None:
try:
if isinstance(raw, bytes):
raw = raw.decode('utf-8', errors='replace')
parsed = json.loads(raw)
except Exception:
parsed = {}
err_raw = (parsed.get('error_message') or parsed.get('error') or '').strip()
err_msg = err_raw.lower()
if parsed.get('status') == 1:
log.delete()
return HttpResponse(
json.dumps(
{
'status': 1,
'message': parsed.get('message', 'IP unbanned successfully'),
}
),
content_type='application/json',
)
if 'not found' in err_msg:
log.delete()
return HttpResponse(
json.dumps(
{
'status': 1,
'message': 'Log cleared (ban was already removed or not found in firewall)',
}
),
content_type='application/json',
)
final_dic = {'status': 0, 'error_message': err_raw or 'Failed to remove ban'}
return HttpResponse(json.dumps(final_dic), content_type='application/json')
except BaseException as msg:
final_dic = {'status': 0, 'error_message': str(msg)}
return HttpResponse(json.dumps(final_dic), content_type='application/json')
def _load_banned_ips_store(self):
"""
Load banned IPs from the primary store, falling back to legacy path.
@@ -2094,6 +2262,9 @@ class FirewallManager:
'active': True
})
# Auto Ban Security Alerts: show log-only / plugin-visible bans on same list as firewall UI
self._merge_autoban_security_alerts_logs(active_banned_ips, current_time)
# Optional server-side search: filter by IP or reason (case-insensitive substring)
# Normalize: strip query and compare against stripped IP/reason so "1.2.3.4" matches stored " 1.2.3.4 "
search_q = (data.get('search') or data.get('q') or '').strip() if data else ''
@@ -2296,6 +2467,9 @@ class FirewallManager:
return ACLManager.loadError()
banned_ip_id = data.get('id')
if isinstance(banned_ip_id, str) and banned_ip_id.startswith('ablog-'):
return self._remove_autoban_log_by_suffix_id(userID, banned_ip_id)
requested_ip = (data.get('ip') or '').strip()
ip_to_unban = None
@@ -2388,6 +2562,9 @@ class FirewallManager:
return ACLManager.loadError()
banned_ip_id = data.get('id')
if isinstance(banned_ip_id, str) and banned_ip_id.startswith('ablog-'):
return self._remove_autoban_log_by_suffix_id(userID, banned_ip_id)
requested_ip = (data.get('ip') or '').strip()
try: