SSH trusted IPs: sync UI and analysis with firewall tab

Remove duplicate Trusted IPs management from dashboard Recent SSH Logs;
use actionable alert count on the tab (exclude info-only SSH tips).
Add sshSecurityWhitelistUtilities with normalized IP matching for logs
and analyzeSSHSecurity. Wire whitelist API routes, firewall ban guard,
and login hooks. Firewall tab remains the canonical trusted-IP editor.
This commit is contained in:
master3395
2026-04-10 17:53:17 +02:00
parent 85981f1cac
commit 8f57463550
11 changed files with 1128 additions and 62 deletions

View File

@@ -1121,6 +1121,18 @@ var dashboardStatsControllerFn = function ($scope, $http, $timeout) {
$scope.errorSSHLogs = '';
$scope.securityAlerts = [];
$scope.loadingSecurityAnalysis = false;
/** Tab badge: actionable alerts only (high/medium/low). Excludes informational SSH tips. */
$scope.actionableSecurityAlertCount = function () {
var list = $scope.securityAlerts || [];
var c = 0;
for (var i = 0; i < list.length; i++) {
var sev = (list[i] && list[i].severity) ? String(list[i].severity) : '';
if (sev !== 'info') {
c++;
}
}
return c;
};
$scope.getSSHLogsTotalPages = function() {
return Math.ceil($scope.sshLogs.length / $scope.sshLogsPerPage);
@@ -1215,6 +1227,114 @@ var dashboardStatsControllerFn = function ($scope, $http, $timeout) {
$scope.blockingIP = null;
$scope.blockedIPs = {};
// SSH Security: trusted IPs (never blocked, excluded from analysis alerts)
// Use an object for ng-model: inputs live under ng-if child scopes; primitives would not update parent.
$scope.sshSecurityWhitelist = [];
$scope.sshWhitelistMap = {};
$scope.whitelistUi = { ip: '', label: '' };
$scope._syncWhitelistMap = function () {
$scope.sshWhitelistMap = {};
if ($scope.sshSecurityWhitelist && $scope.sshSecurityWhitelist.length) {
$scope.sshSecurityWhitelist.forEach(function (r) {
$scope.sshWhitelistMap[r.ip] = true;
});
}
};
$scope._decorateWhitelistEntries = function (entries) {
$scope.sshSecurityWhitelist = (entries || []).map(function (e) {
return {
ip: e.ip,
label: e.label || '',
updated: e.updated || 0,
_l: e.label || '',
_nip: ''
};
});
$scope._syncWhitelistMap();
};
$scope.isSshWhitelisted = function (ip) {
if (!ip) return false;
return !!$scope.sshWhitelistMap[String(ip).trim()];
};
$scope.loadSshSecurityWhitelist = function () {
var h = { headers: { 'X-CSRFToken': (typeof getCookie === 'function') ? getCookie('csrftoken') : '' } };
$http.post('/base/sshSecurityWhitelistList', {}, h).then(function (res) {
if (res.data && res.data.status === 1) {
$scope._decorateWhitelistEntries(res.data.entries);
}
});
};
$scope.addSshSecurityWhitelist = function () {
var ip = ($scope.whitelistUi && $scope.whitelistUi.ip || '').trim();
var label = ($scope.whitelistUi && $scope.whitelistUi.label || '').trim();
if (!ip) {
if (typeof PNotify !== 'undefined') { new PNotify({ title: 'Trusted IP', text: 'Enter an IP address', type: 'warning', delay: 4000 }); }
return;
}
var h = { headers: { 'X-CSRFToken': (typeof getCookie === 'function') ? getCookie('csrftoken') : '' } };
$http.post('/base/sshSecurityWhitelistAdd', { ip: ip, label: label }, h).then(function (res) {
if (res.data && res.data.status === 1) {
if ($scope.whitelistUi) {
$scope.whitelistUi.ip = '';
$scope.whitelistUi.label = '';
}
$scope._decorateWhitelistEntries(res.data.entries);
if (typeof PNotify !== 'undefined') { new PNotify({ title: 'Trusted IP', text: 'IP added to trusted list', type: 'success', delay: 4000 }); }
if ($scope.analyzeSSHSecurity) { $scope.analyzeSSHSecurity(); }
} else {
var err = (res.data && (res.data.error || res.data.message)) ? (res.data.error || res.data.message) : 'Failed to add';
if (typeof PNotify !== 'undefined') { new PNotify({ title: 'Error', text: err, type: 'error', delay: 6000 }); }
}
}, function (err) {
var msg = 'Request failed';
if (err.data && err.data.error) msg = err.data.error;
if (typeof PNotify !== 'undefined') { new PNotify({ title: 'Error', text: msg, type: 'error', delay: 6000 }); }
});
};
$scope.removeSshSecurityWhitelist = function (ip) {
if (!ip) return;
var h = { headers: { 'X-CSRFToken': (typeof getCookie === 'function') ? getCookie('csrftoken') : '' } };
$http.post('/base/sshSecurityWhitelistRemove', { ip: ip }, h).then(function (res) {
if (res.data && res.data.status === 1) {
$scope._decorateWhitelistEntries(res.data.entries);
if (typeof PNotify !== 'undefined') { new PNotify({ title: 'Trusted IP', text: 'IP removed from trusted list', type: 'success', delay: 4000 }); }
if ($scope.analyzeSSHSecurity) { $scope.analyzeSSHSecurity(); }
} else {
var err2 = (res.data && res.data.error) ? res.data.error : 'Failed to remove';
if (typeof PNotify !== 'undefined') { new PNotify({ title: 'Error', text: err2, type: 'error', delay: 6000 }); }
}
});
};
$scope.saveSshSecurityWhitelistRow = function (row) {
if (!row || !row.ip) return;
var payload = { ip: row.ip, label: row._l };
if (row._nip && String(row._nip).trim()) payload.new_ip = String(row._nip).trim();
var h = { headers: { 'X-CSRFToken': (typeof getCookie === 'function') ? getCookie('csrftoken') : '' } };
$http.post('/base/sshSecurityWhitelistUpdate', payload, h).then(function (res) {
var d = res.data || {};
var st = d.status === 1 || d.status === '1';
if (st) {
$scope._decorateWhitelistEntries(d.entries);
if (typeof PNotify !== 'undefined') {
var unchanged = d.unchanged === true || d.unchanged === 'true' || d.unchanged === 1;
var txt = (d.message && String(d.message).length) ? d.message : (unchanged ? 'No changes to save.' : 'Entry updated');
new PNotify({ title: 'Trusted IP', text: txt, type: unchanged ? 'info' : 'success', delay: 4000 });
}
if ($scope.analyzeSSHSecurity) { $scope.analyzeSSHSecurity(); }
} else {
var err3 = d.error ? d.error : 'Failed to update';
if (typeof PNotify !== 'undefined') { new PNotify({ title: 'Error', text: err3, type: 'error', delay: 6000 }); }
}
});
};
$scope.analyzeSSHSecurity = function() {
$scope.loadingSecurityAnalysis = true;
$scope.showAddonRequired = false;
@@ -1227,6 +1347,9 @@ var dashboardStatsControllerFn = function ($scope, $http, $timeout) {
$scope.securityAlerts = [];
} else if (response.data.status === 1) {
$scope.securityAlerts = response.data.alerts;
if (response.data.whitelist_entries) {
$scope._decorateWhitelistEntries(response.data.whitelist_entries);
}
$scope.showAddonRequired = false;
}
}

View File

@@ -1002,8 +1002,8 @@
<button class="activity-tab" onclick="switchTab('ssh-logs', this)">
<i class="fas fa-file-alt"></i>
<span>Recent SSH Logs</span>
<span ng-if="securityAlerts.length > 0" style="background: #dc2626; color: white; padding: 2px 6px; border-radius: 4px; font-size: 10px; margin-left: 6px;">
{$ securityAlerts.length $}
<span ng-if="actionableSecurityAlertCount() > 0" style="background: #dc2626; color: white; padding: 2px 6px; border-radius: 4px; font-size: 10px; margin-left: 6px;" title="Actionable SSH security alerts (not log line count; tips excluded)">
{$ actionableSecurityAlertCount() $}
</span>
</button>
<button class="activity-tab" onclick="switchTab('top-process', this)">
@@ -1207,6 +1207,10 @@
</div>
<!-- Add to Firewall Button for all alerts with IP addresses -->
<div ng-if="alert.details && (alert.details['IP Address'] || alert.details['Top IP'])" style="margin-top: 12px;">
<span ng-if="isSshWhitelisted(alert.details['IP Address'] || alert.details['Top IP'])" style="display: inline-flex; align-items: center; gap: 6px; color: #0ea5e9; font-size: 12px; font-weight: 600;">
<i class="fas fa-shield-alt"></i> Trusted IP — not shown as a threat and cannot be banned from here.
</span>
<span ng-if="!isSshWhitelisted(alert.details['IP Address'] || alert.details['Top IP'])">
<button ng-click="blockIPAddress(alert.details['IP Address'] || alert.details['Top IP']); $event.stopPropagation()"
ng-disabled="blockingIP === (alert.details['IP Address'] || alert.details['Top IP'])"
style="background: #dc2626; color: white; border: none; padding: 8px 16px; border-radius: 6px; font-size: 12px; font-weight: 600; cursor: pointer; display: inline-flex; align-items: center; gap: 6px;"
@@ -1224,6 +1228,7 @@
style="margin-left: 10px; color: #10b981; font-size: 12px; font-weight: 600;">
<i class="fas fa-check-circle"></i> Blocked
</span>
</span>
</div>
</div>
<span style="background: {$ alert.severity === 'high' ? '#fee2e2' : (alert.severity === 'medium' ? '#fef3c7' : (alert.severity === 'low' ? '#dbeafe' : '#d1fae5')) $};
@@ -1263,7 +1268,7 @@
<span ng-if="!log.ip_address" style="color: #8893a7;">-</span>
</td>
<td>
<button ng-if="log.ip_address && blockingIP !== log.ip_address && !blockedIPs[log.ip_address]"
<button ng-if="log.ip_address && blockingIP !== log.ip_address && !blockedIPs[log.ip_address] && !isSshWhitelisted(log.ip_address)"
ng-click="banIPFromSSHLog(log.ip_address)"
style="background: #dc2626; color: white; border: none; padding: 6px 12px; border-radius: 6px; font-size: 11px; font-weight: 600; cursor: pointer; display: inline-flex; align-items: center; gap: 4px;"
onmouseover="this.style.background='#b91c1c'"
@@ -1283,6 +1288,11 @@
<i class="fas fa-check-circle"></i>
Banned
</span>
<span ng-if="log.ip_address && isSshWhitelisted(log.ip_address)"
style="color: #0ea5e9; font-size: 11px; font-weight: 600; display: inline-flex; align-items: center; gap: 4px;">
<i class="fas fa-shield-alt"></i>
Trusted
</span>
<span ng-if="!log.ip_address" style="color: #8893a7; font-size: 11px;">-</span>
</td>
</tr>

View File

@@ -25,6 +25,10 @@ urlpatterns = [
re_path(r'^getSSHUserActivity$', views.getSSHUserActivity, name='getSSHUserActivity'),
re_path(r'^getTopProcesses$', views.getTopProcesses, name='getTopProcesses'),
re_path(r'^analyzeSSHSecurity$', views.analyzeSSHSecurity, name='analyzeSSHSecurity'),
re_path(r'^sshSecurityWhitelistList$', views.sshSecurityWhitelistList, name='sshSecurityWhitelistList'),
re_path(r'^sshSecurityWhitelistAdd$', views.sshSecurityWhitelistAdd, name='sshSecurityWhitelistAdd'),
re_path(r'^sshSecurityWhitelistRemove$', views.sshSecurityWhitelistRemove, name='sshSecurityWhitelistRemove'),
re_path(r'^sshSecurityWhitelistUpdate$', views.sshSecurityWhitelistUpdate, name='sshSecurityWhitelistUpdate'),
re_path(r'^blockIPAddress$', views.blockIPAddress, name='blockIPAddress'),
re_path(r'^dismiss_backup_notification$', views.dismiss_backup_notification, name='dismiss_backup_notification'),
re_path(r'^dismiss_ai_scanner_notification$', views.dismiss_ai_scanner_notification, name='dismiss_ai_scanner_notification'),

View File

@@ -861,6 +861,11 @@ def getRecentSSHLogins(request):
lines = output.strip().split('\n')
logins = []
ip_cache = {}
try:
from plogical.sshSecurityWhitelistUtilities import SSHSecurityWhitelistUtilities
ssh_whitelist_ips = SSHSecurityWhitelistUtilities.ip_set()
except Exception:
ssh_whitelist_ips = set()
for line in lines:
if not line.strip() or any(x in line for x in ['reboot', 'system boot', 'wtmp begins']):
continue
@@ -928,6 +933,12 @@ def getRecentSSHLogins(request):
country, flag = 'IPv6', ''
elif ip == '127.0.0.1' or ip == '::1':
country, flag = 'Local', ''
try:
ip_for_wl = SSHSecurityWhitelistUtilities.normalize_ip(ip)
if ip_for_wl and ip_for_wl in ssh_whitelist_ips:
continue
except Exception:
pass
logins.append({
'user': user,
'ip': ip,
@@ -986,6 +997,11 @@ def getRecentSSHLogs(request):
output = ProcessUtilities.outputExecutioner(f'tail -n 500 {log_path}')
except Exception as e:
return HttpResponse(json.dumps({'error': f'Failed to read log: {str(e)}'}), content_type='application/json', status=500)
try:
from plogical.sshSecurityWhitelistUtilities import SSHSecurityWhitelistUtilities
ssh_whitelist_ips = SSHSecurityWhitelistUtilities.ip_set()
except Exception:
ssh_whitelist_ips = set()
lines = output.split('\n')
logs = []
# IP address regex patterns (IPv4)
@@ -1015,6 +1031,12 @@ def getRecentSSHLogs(request):
if not ip_address and ip_matches:
ip_address = ip_matches[0]
try:
ip_wl = SSHSecurityWhitelistUtilities.normalize_ip(ip_address) if ip_address else ''
if ip_wl and ip_wl in ssh_whitelist_ips:
continue
except Exception:
pass
logs.append({
'timestamp': timestamp,
'message': message,
@@ -1185,10 +1207,21 @@ def analyzeSSHSecurity(request):
ip = match.group(1)
repeated_connections[ip] += 1
# Trusted IPs: never show block recommendations / never block via FirewallUtilities
from plogical.sshSecurityWhitelistUtilities import SSHSecurityWhitelistUtilities
try:
whitelist_entries = SSHSecurityWhitelistUtilities.load_entries()
wl_set = {e['ip'] for e in whitelist_entries}
except Exception:
whitelist_entries = []
wl_set = set()
# Generate alerts based on analysis
# High severity: Brute force attacks
for ip, count in failed_passwords.items():
if SSHSecurityWhitelistUtilities.normalized_ip_in_whitelist(ip, wl_set):
continue
if count >= 10:
recommendation = f'Block this IP immediately:\nfirewall-cmd --permanent --add-rich-rule="rule family=ipv4 source address={ip} drop" && firewall-cmd --reload'
@@ -1204,22 +1237,30 @@ def analyzeSSHSecurity(request):
'recommendation': recommendation
})
# High severity: Root login attempts
if root_login_attempts:
# High severity: Root login attempts (exclude trusted IPs)
root_login_attempts_filtered = [
r for r in root_login_attempts
if not SSHSecurityWhitelistUtilities.normalized_ip_in_whitelist(r['ip'], wl_set)
]
if root_login_attempts_filtered:
unique_ips = set(r["ip"] for r in root_login_attempts_filtered)
top_ip = max(unique_ips, key=lambda x: sum(1 for r in root_login_attempts_filtered if r["ip"] == x))
alerts.append({
'title': 'Root Login Attempts Detected',
'description': f'Direct root login attempts detected from {len(set(r["ip"] for r in root_login_attempts))} IP addresses. Root SSH access should be disabled.',
'description': f'Direct root login attempts detected from {len(unique_ips)} IP addresses. Root SSH access should be disabled.',
'severity': 'high',
'details': {
'Unique IPs': len(set(r["ip"] for r in root_login_attempts)),
'Total Attempts': len(root_login_attempts),
'Top IP': max(set(r["ip"] for r in root_login_attempts), key=lambda x: sum(1 for r in root_login_attempts if r["ip"] == x))
'Unique IPs': len(unique_ips),
'Total Attempts': len(root_login_attempts_filtered),
'Top IP': top_ip
},
'recommendation': 'Disable root SSH login by setting "PermitRootLogin no" in /etc/ssh/sshd_config'
})
# Medium severity: Dictionary attacks
for ip, count in invalid_users.items():
if SSHSecurityWhitelistUtilities.normalized_ip_in_whitelist(ip, wl_set):
continue
if count >= 5:
if firewall_cmd == 'csf':
recommendation = f'Consider blocking this IP:\ncsf -d {ip} "Dictionary attack - {count} invalid users"\n\nAlso configure CSF Login Failure Daemon (lfd) for automatic blocking.'
@@ -1240,6 +1281,8 @@ def analyzeSSHSecurity(request):
# Medium severity: Port scanning
for ip, count in port_scan_attempts.items():
if SSHSecurityWhitelistUtilities.normalized_ip_in_whitelist(ip, wl_set):
continue
if count >= 3:
alerts.append({
'title': 'Port Scan Detected',
@@ -1255,6 +1298,8 @@ def analyzeSSHSecurity(request):
# Low severity: Successful login after failures
for ip, successes in successful_after_failures.items():
if SSHSecurityWhitelistUtilities.normalized_ip_in_whitelist(ip, wl_set):
continue
if successes:
max_failures = max(s['failures'] for s in successes)
if max_failures >= 3:
@@ -1272,6 +1317,8 @@ def analyzeSSHSecurity(request):
# High severity: Rapid connection attempts (DDoS/flooding)
for ip, count in repeated_connections.items():
if SSHSecurityWhitelistUtilities.normalized_ip_in_whitelist(ip, wl_set):
continue
if count >= 50:
if firewall_cmd == 'csf':
recommendation = f'Block this IP immediately to prevent resource exhaustion:\ncsf -d {ip} "SSH flooding - {count} connections"'
@@ -1348,12 +1395,137 @@ def analyzeSSHSecurity(request):
return HttpResponse(json.dumps({
'status': 1,
'alerts': alerts
'alerts': alerts,
'whitelist_entries': whitelist_entries,
}), content_type='application/json')
except Exception as e:
return HttpResponse(json.dumps({'error': str(e)}), content_type='application/json', status=500)
def _ssh_whitelist_acl(request):
"""Return (user_id, error_response) or (user_id, None)."""
user_id = request.session.get('userID')
if not user_id:
return None, HttpResponse(json.dumps({'error': 'Not logged in'}), content_type='application/json', status=403)
currentACL = ACLManager.loadedACL(user_id)
if not currentACL.get('admin', 0):
return None, HttpResponse(json.dumps({'error': 'Admin only'}), content_type='application/json', status=403)
if not ACLManager.CheckForPremFeature('all'):
return None, HttpResponse(json.dumps({
'status': 0,
'error': 'SSH Security trusted IPs require the same access as SSH Security Analysis (addons).',
}), content_type='application/json', status=403)
return user_id, None
@csrf_exempt
@require_POST
def sshSecurityWhitelistList(request):
try:
_, err = _ssh_whitelist_acl(request)
if err:
return err
from plogical.sshSecurityWhitelistUtilities import SSHSecurityWhitelistUtilities
try:
SSHSecurityWhitelistUtilities.ensure_cyberpanel_public_ip_whitelisted()
except Exception:
pass
entries = SSHSecurityWhitelistUtilities.load_entries()
return HttpResponse(json.dumps({
'status': 1,
'entries': entries,
}), content_type='application/json')
except Exception as e:
return HttpResponse(json.dumps({'status': 0, 'error': str(e)}), content_type='application/json', status=500)
@csrf_exempt
@require_POST
def sshSecurityWhitelistAdd(request):
try:
_, err = _ssh_whitelist_acl(request)
if err:
return err
try:
data = json.loads(request.body.decode('utf-8'))
except (json.JSONDecodeError, UnicodeDecodeError, AttributeError):
return HttpResponse(json.dumps({'status': 0, 'error': 'Invalid JSON'}), content_type='application/json', status=400)
ip = (data.get('ip') or '').strip()
label = (data.get('label') or '').strip()
from plogical.sshSecurityWhitelistUtilities import SSHSecurityWhitelistUtilities
ok, msg = SSHSecurityWhitelistUtilities.add_entry(ip, label)
if not ok:
return HttpResponse(json.dumps({'status': 0, 'error': msg}), content_type='application/json', status=400)
return HttpResponse(json.dumps({
'status': 1,
'message': 'Trusted IP added',
'ip': msg,
'entries': SSHSecurityWhitelistUtilities.load_entries(),
}), content_type='application/json')
except Exception as e:
return HttpResponse(json.dumps({'status': 0, 'error': str(e)}), content_type='application/json', status=500)
@csrf_exempt
@require_POST
def sshSecurityWhitelistRemove(request):
try:
_, err = _ssh_whitelist_acl(request)
if err:
return err
try:
data = json.loads(request.body.decode('utf-8'))
except (json.JSONDecodeError, UnicodeDecodeError, AttributeError):
return HttpResponse(json.dumps({'status': 0, 'error': 'Invalid JSON'}), content_type='application/json', status=400)
ip = (data.get('ip') or '').strip()
from plogical.sshSecurityWhitelistUtilities import SSHSecurityWhitelistUtilities
ok, msg = SSHSecurityWhitelistUtilities.remove_entry(ip)
if not ok:
return HttpResponse(json.dumps({'status': 0, 'error': msg}), content_type='application/json', status=400)
return HttpResponse(json.dumps({
'status': 1,
'message': 'Trusted IP removed',
'entries': SSHSecurityWhitelistUtilities.load_entries(),
}), content_type='application/json')
except Exception as e:
return HttpResponse(json.dumps({'status': 0, 'error': str(e)}), content_type='application/json', status=500)
@csrf_exempt
@require_POST
def sshSecurityWhitelistUpdate(request):
try:
_, err = _ssh_whitelist_acl(request)
if err:
return err
try:
data = json.loads(request.body.decode('utf-8'))
except (json.JSONDecodeError, UnicodeDecodeError, AttributeError):
return HttpResponse(json.dumps({'status': 0, 'error': 'Invalid JSON'}), content_type='application/json', status=400)
ip = (data.get('ip') or '').strip()
new_ip = data.get('new_ip')
if new_ip is not None:
new_ip = str(new_ip).strip() or None
label = data.get('label')
if label is not None:
label = str(label).strip()
from plogical.sshSecurityWhitelistUtilities import SSHSecurityWhitelistUtilities
ok, msg, unchanged = SSHSecurityWhitelistUtilities.update_entry(ip, new_ip=new_ip, label=label)
if not ok:
return HttpResponse(json.dumps({'status': 0, 'error': msg}), content_type='application/json', status=400)
message = 'No changes to save.' if unchanged else 'Trusted IP updated.'
return HttpResponse(json.dumps({
'status': 1,
'message': message,
'unchanged': bool(unchanged),
'ip': msg,
'entries': SSHSecurityWhitelistUtilities.load_entries(),
}), content_type='application/json')
except Exception as e:
return HttpResponse(json.dumps({'status': 0, 'error': str(e)}), content_type='application/json', status=500)
@csrf_exempt
@require_POST
def blockIPAddress(request):

View File

@@ -2337,6 +2337,18 @@ class FirewallManager:
final_dic = {'status': 0, 'error_message': 'Invalid IP address format', 'error': 'Invalid IP address format'}
return HttpResponse(json.dumps(final_dic), content_type='application/json')
try:
from plogical.sshSecurityWhitelistUtilities import SSHSecurityWhitelistUtilities
if SSHSecurityWhitelistUtilities.is_whitelisted(ip):
final_dic = {
'status': 0,
'error_message': 'This IP is on the SSH Security trusted list and cannot be banned. Remove it from Trusted IPs first.',
'error': 'SSH Security whitelist',
}
return HttpResponse(json.dumps(final_dic), content_type='application/json')
except Exception:
pass
current_time = time.time()
duration_map = {
'1h': 3600,

View File

@@ -40,9 +40,12 @@ app.controller('firewallController', function ($scope, $http, $timeout) {
// Initialize rules array - prevents "Cannot read 'length' of undefined" when template evaluates rules.length before API loads
$scope.rules = [];
// Banned IPs variables tab from hash so we stay on /firewall/ (avoids 404 on servers without /firewall/firewall-rules/)
/* Use window.location.hash only. Angular $location can disagree with the fragment on /firewall/#… and wrongly map to "rules". */
function tabFromHash() {
var h = (window.location.hash || '').replace(/^#/, '');
return (h === 'banned-ips') ? 'banned' : 'rules';
var h = String(window.location.hash || '').replace(/^#/, '').toLowerCase();
if (h === 'banned-ips' || h === 'banned') return 'banned';
if (h === 'trusted-ips' || h === 'ssh-whitelist') return 'trusted';
return 'rules';
}
$scope.activeTab = tabFromHash();
$scope.bannedIPs = []; // Initialize as empty array
@@ -52,7 +55,9 @@ app.controller('firewallController', function ($scope, $http, $timeout) {
var tab = tabFromHash();
if ($scope.activeTab !== tab) {
$scope.activeTab = tab;
if (tab === 'banned') { populateBannedIPs(); } else { populateCurrentRecords(); }
if (tab === 'banned') { populateBannedIPs(); }
else if (tab === 'trusted') { populateTrustedSSHWhitelist(); }
else { populateCurrentRecords(); }
if (!$scope.$$phase && !$scope.$root.$$phase) { $scope.$apply(); }
}
}
@@ -63,23 +68,51 @@ app.controller('firewallController', function ($scope, $http, $timeout) {
window.addEventListener('load', function() { $timeout(applyTabFromHash, 0); });
}
// Sync tab with hash and load that tab's data on switch
$scope.setFirewallTab = function(tab) {
// Sync tab with hash and load that tab's data on switch (single source of truth from ng-click).
$scope.setFirewallTab = function(tab, $event) {
if ($event) {
try {
$event.stopPropagation();
} catch (ignoreErr) {}
}
$timeout(function() {
$scope.activeTab = tab;
window.location.hash = (tab === 'banned') ? '#banned-ips' : '#rules';
if (tab === 'banned') { populateBannedIPs(); } else { populateCurrentRecords(); }
function setHashIfNeeded(frag) {
try {
if ((window.location.hash || '') === frag) {
return;
}
var path = window.location.pathname + window.location.search + frag;
if (window.history && typeof window.history.replaceState === 'function') {
window.history.replaceState(null, '', path);
} else {
window.location.hash = frag;
}
} catch (ignoreHash) {}
}
if (tab === 'banned') {
setHashIfNeeded('#banned-ips');
populateBannedIPs();
} else if (tab === 'trusted') {
setHashIfNeeded('#trusted-ips');
populateTrustedSSHWhitelist();
} else {
setHashIfNeeded('#rules');
populateCurrentRecords();
}
}, 0);
};
// Back/forward or direct hash change: sync tab and load its data
function syncTabFromHash() {
var tab = tabFromHash();
if ($scope.activeTab !== tab) {
$scope.activeTab = tab;
if (tab === 'banned') { populateBannedIPs(); } else { populateCurrentRecords(); }
if (!$scope.$$phase && !$scope.$root.$$phase) { $scope.$apply(); }
}
$scope.$evalAsync(function() {
if ($scope.activeTab !== tab) {
$scope.activeTab = tab;
if (tab === 'banned') { populateBannedIPs(); }
else if (tab === 'trusted') { populateTrustedSSHWhitelist(); }
else { populateCurrentRecords(); }
}
});
}
window.addEventListener('hashchange', syncTabFromHash);
@@ -108,6 +141,9 @@ app.controller('firewallController', function ($scope, $http, $timeout) {
$scope.banIP = '';
$scope.banReason = '';
$scope.banDuration = '24h';
$scope.trustedSSHWhitelist = [];
$scope.trustedForm = { ip: '', label: '' };
$scope.trustedSSHLoading = false;
$scope.bannedIPSearch = '';
$scope.searchBannedIPFilter = function(item) {
var q = ($scope.bannedIPSearch || '').toLowerCase().trim();
@@ -142,6 +178,7 @@ app.controller('firewallController', function ($scope, $http, $timeout) {
$timeout(function() {
try {
if (newVal === 'banned' && typeof populateBannedIPs === 'function') populateBannedIPs();
else if (newVal === 'trusted' && typeof populateTrustedSSHWhitelist === 'function') populateTrustedSSHWhitelist();
else if (newVal === 'rules' && typeof populateCurrentRecords === 'function') populateCurrentRecords();
} catch (e) {}
}, 0);
@@ -246,6 +283,156 @@ app.controller('firewallController', function ($scope, $http, $timeout) {
}
);
}
function populateTrustedSSHWhitelist() {
$scope.trustedSSHLoading = true;
var config = { headers: { 'X-CSRFToken': getCookie('csrftoken') || '' } };
$http.post('/base/sshSecurityWhitelistList', {}, config).then(
function(response) {
$scope.trustedSSHLoading = false;
var res = response.data;
if (typeof res === 'string') {
try { res = JSON.parse(res); } catch (e) { res = {}; }
}
if (res && res.status === 1) {
var ent = res.entries || [];
$scope.trustedSSHWhitelist = ent.map(function(e) {
return {
ip: e.ip,
label: e.label || '',
updated: e.updated || 0,
_l: e.label || '',
_nip: ''
};
});
} else {
$scope.trustedSSHWhitelist = [];
var errMsg = (res && res.error) ? res.error : 'Could not load trusted IPs';
if (typeof PNotify !== 'undefined') {
new PNotify({ title: 'Trusted IPs', text: errMsg, type: 'error', delay: 6000 });
}
}
},
function(error) {
$scope.trustedSSHLoading = false;
$scope.trustedSSHWhitelist = [];
var msg = (error.data && error.data.error) ? error.data.error : 'Request failed';
if (typeof PNotify !== 'undefined') {
new PNotify({ title: 'Trusted IPs', text: msg, type: 'error', delay: 6000 });
}
}
);
}
$scope.populateTrustedSSHWhitelist = function() {
populateTrustedSSHWhitelist();
};
$scope.addTrustedSSHWhitelist = function() {
var ip = ($scope.trustedForm.ip || '').trim();
var label = ($scope.trustedForm.label || '').trim();
if (!ip) {
if (typeof PNotify !== 'undefined') {
new PNotify({ title: 'Trusted IPs', text: 'Enter an IP address', type: 'warning', delay: 5000 });
}
return;
}
var config = { headers: { 'X-CSRFToken': getCookie('csrftoken') || '' } };
$http.post('/base/sshSecurityWhitelistAdd', { ip: ip, label: label }, config).then(
function(response) {
var res = response.data;
if (typeof res === 'string') {
try { res = JSON.parse(res); } catch (e) { res = {}; }
}
if (res && res.status === 1) {
$scope.trustedForm.ip = '';
$scope.trustedForm.label = '';
populateTrustedSSHWhitelist();
if (typeof PNotify !== 'undefined') {
new PNotify({ title: 'Trusted IPs', text: 'IP added to trusted list', type: 'success', delay: 4000 });
}
} else {
var errAdd = (res && res.error) ? res.error : 'Failed to add';
if (typeof PNotify !== 'undefined') {
new PNotify({ title: 'Trusted IPs', text: errAdd, type: 'error', delay: 6000 });
}
}
},
function(err) {
var em = (err.data && err.data.error) ? err.data.error : 'Request failed';
if (typeof PNotify !== 'undefined') {
new PNotify({ title: 'Trusted IPs', text: em, type: 'error', delay: 6000 });
}
}
);
};
$scope.removeTrustedSSHWhitelist = function(ip) {
if (!ip) return;
var config = { headers: { 'X-CSRFToken': getCookie('csrftoken') || '' } };
$http.post('/base/sshSecurityWhitelistRemove', { ip: ip }, config).then(
function(response) {
var res = response.data;
if (typeof res === 'string') {
try { res = JSON.parse(res); } catch (e) { res = {}; }
}
if (res && res.status === 1) {
populateTrustedSSHWhitelist();
if (typeof PNotify !== 'undefined') {
new PNotify({ title: 'Trusted IPs', text: 'IP removed', type: 'success', delay: 4000 });
}
} else {
var errRm = (res && res.error) ? res.error : 'Failed to remove';
if (typeof PNotify !== 'undefined') {
new PNotify({ title: 'Trusted IPs', text: errRm, type: 'error', delay: 6000 });
}
}
},
function(err) {
var em2 = (err.data && err.data.error) ? err.data.error : 'Request failed';
if (typeof PNotify !== 'undefined') {
new PNotify({ title: 'Trusted IPs', text: em2, type: 'error', delay: 6000 });
}
}
);
};
$scope.saveTrustedSSHWhitelistRow = function(row) {
if (!row || !row.ip) return;
var payload = { ip: row.ip, label: row._l };
if (row._nip && String(row._nip).trim()) {
payload.new_ip = String(row._nip).trim();
}
var config = { headers: { 'X-CSRFToken': getCookie('csrftoken') || '' } };
$http.post('/base/sshSecurityWhitelistUpdate', payload, config).then(
function(response) {
var res = response.data;
if (typeof res === 'string') {
try { res = JSON.parse(res); } catch (e) { res = {}; }
}
var ok = res && (res.status === 1 || res.status === '1');
if (ok) {
populateTrustedSSHWhitelist();
if (typeof PNotify !== 'undefined') {
var unchanged = res.unchanged === true || res.unchanged === 'true' || res.unchanged === 1;
var msgOk = (res.message && String(res.message).length) ? res.message : (unchanged ? 'No changes to save.' : 'Entry updated');
new PNotify({ title: 'Trusted IPs', text: msgOk, type: unchanged ? 'info' : 'success', delay: 4000 });
}
} else {
var errUp = (res && res.error) ? res.error : 'Failed to update';
if (typeof PNotify !== 'undefined') {
new PNotify({ title: 'Trusted IPs', text: errUp, type: 'error', delay: 6000 });
}
}
},
function(err) {
var em3 = (err.data && err.data.error) ? err.data.error : 'Request failed';
if (typeof PNotify !== 'undefined') {
new PNotify({ title: 'Trusted IPs', text: em3, type: 'error', delay: 6000 });
}
}
);
};
// Expose to scope for template access
$scope.populateBannedIPs = function() {
@@ -301,7 +488,9 @@ app.controller('firewallController', function ($scope, $http, $timeout) {
window.__firewallLoadTab = function(tab) {
$scope.$evalAsync(function() {
$scope.activeTab = tab;
if (tab === 'banned') { populateBannedIPs(); } else { populateCurrentRecords(); }
if (tab === 'banned') { populateBannedIPs(); }
else if (tab === 'trusted') { populateTrustedSSHWhitelist(); }
else { populateCurrentRecords(); }
});
};
}
@@ -3254,18 +3443,20 @@ app.controller('litespeed_ent_conf', function ($scope, $http, $timeout, $window)
function syncFirewallTabFromHash() {
var nav = document.getElementById('firewall-tab-nav');
if (!nav) return;
var h = (window.location.hash || '').replace(/^#/, '');
var tab = (h === 'banned-ips') ? 'banned' : 'rules';
var h = (window.location.hash || '').replace(/^#/, '').toLowerCase();
var tab = 'rules';
if (h === 'banned-ips' || h === 'banned') tab = 'banned';
else if (h === 'trusted-ips' || h === 'ssh-whitelist') tab = 'trusted';
if (window.__firewallLoadTab) {
try { window.__firewallLoadTab(tab); } catch (e) {}
}
}
/* Initial sync only — hashchange is handled by Angular syncTabFromHash in firewallController
(multiple listeners were racing and could reset #trusted-ips to #rules). */
if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', syncFirewallTabFromHash);
} else {
syncFirewallTabFromHash();
}
setTimeout(syncFirewallTabFromHash, 100);
window.addEventListener('hashchange', syncFirewallTabFromHash);
})();

View File

@@ -1257,15 +1257,19 @@
</div>
<!-- Tab Navigation: buttons with native fallback so clicks always work -->
<div class="tab-navigation" role="tablist" id="firewall-tab-nav">
<button type="button" class="tab-button" role="tab" data-tab="rules" title="{% trans 'Firewall Rules' %}" ng-class="{'tab-active': activeTab === 'rules'}" ng-click="setFirewallTab('rules')">
<div class="tab-navigation" id="firewall-tab-nav">
<button type="button" class="tab-button" data-tab="rules" title="{% trans 'Firewall Rules' %}" ng-class="{'tab-active': activeTab === 'rules'}" ng-click="setFirewallTab('rules', $event)">
<i class="fas fa-list-alt"></i>
{% trans "Firewall Rules" %}
</button>
<button type="button" class="tab-button" role="tab" data-tab="banned" title="{% trans 'Banned IPs' %}" ng-class="{'tab-active': activeTab === 'banned'}" ng-click="setFirewallTab('banned')">
<button type="button" class="tab-button" data-tab="banned" title="{% trans 'Banned IPs' %}" ng-class="{'tab-active': activeTab === 'banned'}" ng-click="setFirewallTab('banned', $event)">
<i class="fas fa-ban"></i>
{% trans "Banned IPs" %}
</button>
<button type="button" class="tab-button" data-tab="trusted" title="{% trans 'SSH trusted IPs' %}" ng-class="{'tab-active': activeTab === 'trusted'}" ng-click="setFirewallTab('trusted', $event)">
<i class="fas fa-shield-alt"></i>
{% trans "SSH trusted IPs" %}
</button>
</div>
<!-- Rules Panel (ng-show so panel is always in DOM; visibility toggled by tab) -->
@@ -1768,13 +1772,75 @@
</div>
</div>
</div>
<!-- SSH Security trusted IPs (same list as Dashboard → SSH Security Analysis) -->
<div class="banned-ips-panel trusted-ssh-panel" ng-show="activeTab === 'trusted'">
<div class="panel-header">
<div class="panel-title">
<div class="panel-icon">
<i class="fas fa-shield-alt"></i>
</div>
{% trans "SSH trusted IPs (never block)" %}
</div>
<div ng-show="trustedSSHLoading" class="loading-spinner"></div>
</div>
<div style="padding: 0 2rem 1rem; color: var(--text-secondary, #64748b); font-size: 0.95rem; line-height: 1.5;">
{% trans "These addresses are skipped in SSH Security Analysis and cannot be added under Banned IPs. Use your home or office public IP to avoid accidental lockouts." %}
</div>
<div class="add-banned-section" style="padding-top: 0;">
<form class="banned-form" onsubmit="return false;">
<div class="form-group">
<label class="form-label">{% trans "IP address" %}</label>
<input type="text" class="form-control" ng-model="trustedForm.ip" placeholder="{% trans 'Public IPv4 / IPv6' %}" autocomplete="off">
</div>
<div class="form-group">
<label class="form-label">{% trans "Label (optional)" %}</label>
<input type="text" class="form-control" ng-model="trustedForm.label" placeholder="{% trans 'e.g. Home PC' %}" autocomplete="off">
</div>
<button type="button" ng-click="addTrustedSSHWhitelist()" class="btn-add" style="background: var(--primary, #5b5fcf);">
<i class="fas fa-plus"></i>
{% trans "Add trusted IP" %}
</button>
</form>
</div>
<div class="table-responsive" style="padding: 0 2rem 2rem;">
<table class="rules-table" ng-if="trustedSSHWhitelist.length > 0">
<thead>
<tr>
<th>{% trans "IP" %}</th>
<th>{% trans "Label" %}</th>
<th>{% trans "Replace IP (optional)" %}</th>
<th>{% trans "Actions" %}</th>
</tr>
</thead>
<tbody>
<tr ng-repeat="row in trustedSSHWhitelist track by row.ip">
<td><code>{$ row.ip $}</code></td>
<td><input type="text" class="form-control" ng-model="row._l" style="min-width: 12rem;"></td>
<td><input type="text" class="form-control" ng-model="row._nip" placeholder="{% trans 'New IP' %}" style="min-width: 10rem;"></td>
<td>
<button type="button" class="btn-modify" ng-click="saveTrustedSSHWhitelistRow(row)">
<i class="fas fa-save"></i> {% trans "Save" %}
</button>
<button type="button" class="btn-delete" ng-click="removeTrustedSSHWhitelist(row.ip)">
<i class="fas fa-times"></i> {% trans "Remove" %}
</button>
</td>
</tr>
</tbody>
</table>
<div ng-if="!trustedSSHWhitelist || trustedSSHWhitelist.length === 0" class="empty-state" style="margin-top: 1rem;">
<i class="fas fa-user-shield empty-icon"></i>
<h3 class="empty-title">{% trans "No trusted IPs yet" %}</h3>
<p class="empty-text">{% trans "Add at least the public IP you use to manage this server." %}</p>
</div>
</div>
</div>
</div>
<script>
(function(){
var nav = document.getElementById('firewall-tab-nav');
if (!nav) return;
/* Bootstrapping only: race Angular controller init. Tab *clicks* use ng-click only (no duplicate nav listeners). */
function loadTabViaAngularScope(tab) {
if (!window.angular) return false;
var container = document.querySelector('.modern-container[ng-controller="firewallController"]') || document.querySelector('.modern-container');
@@ -1785,6 +1851,7 @@
scope.$evalAsync(function() {
scope.activeTab = tab;
if (tab === 'banned' && scope.populateBannedIPs) scope.populateBannedIPs();
else if (tab === 'trusted' && scope.populateTrustedSSHWhitelist) scope.populateTrustedSSHWhitelist();
else if (tab === 'rules' && scope.populateCurrentRecords) scope.populateCurrentRecords();
});
return true;
@@ -1794,7 +1861,7 @@
}
function loadTab(tab) {
if (!tab || (tab !== 'rules' && tab !== 'banned')) return;
if (!tab || (tab !== 'rules' && tab !== 'banned' && tab !== 'trusted')) return;
var done = false;
if (window.__firewallLoadTab) {
try { window.__firewallLoadTab(tab); done = true; } catch (e) {}
@@ -1811,38 +1878,18 @@
if (window.__firewallLoadTab) { try { window.__firewallLoadTab(tab); } catch (e) {} }
else { loadTabViaAngularScope(tab); }
}, 200);
setTimeout(function() {
if (window.__firewallLoadTab) { try { window.__firewallLoadTab(tab); } catch (e) {} }
else { loadTabViaAngularScope(tab); }
}, 500);
}
}
function onTabButtonClick(btn) {
var tab = btn && btn.getAttribute('data-tab');
if (!tab) return;
window.location.hash = (tab === 'banned') ? '#banned-ips' : '#rules';
loadTab(tab);
}
nav.addEventListener('click', function(e) {
var btn = (e.target && e.target.closest) ? e.target.closest('.tab-button[data-tab]') : null;
if (btn && nav.contains(btn)) onTabButtonClick(btn);
}, false);
nav.addEventListener('mousedown', function(e) {
var btn = (e.target && e.target.closest) ? e.target.closest('.tab-button[data-tab]') : null;
if (btn && nav.contains(btn)) onTabButtonClick(btn);
}, false);
function loadTabFromHash() {
var h = (window.location.hash || '').replace(/^#/, '');
var tab = (h === 'banned-ips') ? 'banned' : 'rules';
var h = (window.location.hash || '').replace(/^#/, '').toLowerCase();
var tab = 'rules';
if (h === 'banned-ips' || h === 'banned') tab = 'banned';
else if (h === 'trusted-ips' || h === 'ssh-whitelist') tab = 'trusted';
loadTab(tab);
}
var h = (window.location.hash || '').replace(/^#/, '');
if (h === 'banned-ips') loadTabFromHash();
window.addEventListener('hashchange', loadTabFromHash);
setTimeout(loadTabFromHash, 150);
setTimeout(loadTabFromHash, 500);
var h = (window.location.hash || '').replace(/^#/, '').toLowerCase();
if (h === 'banned-ips' || h === 'banned' || h === 'trusted-ips' || h === 'ssh-whitelist') loadTabFromHash();
})();
</script>

View File

@@ -145,6 +145,11 @@ def verifyLogin(request):
request.session['ipAddr'] = ipAddr
request.session.set_expiry(43200)
try:
from plogical.sshSecurityWhitelistUtilities import SSHSecurityWhitelistUtilities
SSHSecurityWhitelistUtilities.on_successful_panel_login(request, admin)
except Exception:
pass
data = {'userID': admin.pk, 'loginStatus': 1, 'error_message': "None"}
json_data = json.dumps(data)
response.write(json_data)

View File

@@ -162,6 +162,13 @@ class WebAuthnAuthenticationComplete(WebAuthnAPIView):
if ip_addr.find(':') > -1:
ip_addr = ':'.join(ip_addr.split(':')[:3])
request.session['ipAddr'] = ip_addr
try:
from loginSystem.models import Administrator
from plogical.sshSecurityWhitelistUtilities import SSHSecurityWhitelistUtilities
adm = Administrator.objects.select_related('acl').get(pk=int(result['user_id']))
SSHSecurityWhitelistUtilities.on_successful_panel_login(request, adm)
except Exception:
pass
redirect_url = data.get('redirect') or request.session.pop('webauthn_redirect', '/') or '/'
if '//' in redirect_url or not redirect_url.startswith('/'):
redirect_url = '/'
@@ -189,6 +196,13 @@ class WebAuthnAuthenticationComplete(WebAuthnAPIView):
if ip_addr.find(':') > -1:
ip_addr = ':'.join(ip_addr.split(':')[:3])
request.session['ipAddr'] = ip_addr
try:
from loginSystem.models import Administrator
from plogical.sshSecurityWhitelistUtilities import SSHSecurityWhitelistUtilities
adm = Administrator.objects.select_related('acl').get(pk=int(result['user_id']))
SSHSecurityWhitelistUtilities.on_successful_panel_login(request, adm)
except Exception:
pass
logger.info(f"WebAuthn authentication successful for user ID: {result['user_id']}")
return self.json_response(result)

View File

@@ -0,0 +1,377 @@
#!/usr/local/CyberCP/bin/python
# -*- coding: utf-8 -*-
"""
SSH Security Analysis — IPs that must never be blocked (firewalld drop / Banned IPs).
Stored in /usr/local/CyberCP/data/ssh_security_whitelist.json as a JSON array of objects:
[{"ip": "203.0.113.10", "label": "Office", "updated": 1710000000}, ...]
"""
from __future__ import annotations
import ipaddress
import json
import os
import time
from typing import Any, Dict, List, Optional, Set
WHITELIST_PATH = '/usr/local/CyberCP/data/ssh_security_whitelist.json'
PUBLIC_IP_CACHE_PATH = '/usr/local/CyberCP/data/ssh_whitelist_public_ipv4.cache.json'
FIRST_ADMIN_LOGIN_FLAG_PATH = '/usr/local/CyberCP/data/ssh_whitelist_first_admin_login.recorded'
LABEL_SERVER_AUTO = 'CyberPanel server public IPv4 (auto)'
LABEL_FIRST_ADMIN_AUTO = 'First CyberPanel admin login (auto)'
class SSHSecurityWhitelistUtilities:
"""Load/save trusted IPs for SSH security analysis and firewall ban protection."""
@staticmethod
def _ensure_data_dir() -> None:
d = os.path.dirname(WHITELIST_PATH)
if d:
try:
os.makedirs(d, mode=0o750, exist_ok=True)
except OSError:
pass
@staticmethod
def normalize_ip(ip: str) -> str:
raw = (ip or '').strip()
if not raw:
return ''
host = raw.split('/')[0].strip()
if '%' in host:
host = host.split('%', 1)[0]
if host.startswith('[') and host.endswith(']'):
host = host[1:-1]
try:
return str(ipaddress.ip_address(host))
except ValueError:
return ''
@staticmethod
def normalized_ip_in_whitelist(raw_ip: str, wl_set: Set[str]) -> bool:
"""True if raw IP from a log line normalizes to an address in wl_set."""
if not raw_ip or not wl_set:
return False
norm = SSHSecurityWhitelistUtilities.normalize_ip(raw_ip)
return bool(norm and norm in wl_set)
@staticmethod
def validate_ip(ip: str) -> Optional[str]:
norm = SSHSecurityWhitelistUtilities.normalize_ip(ip)
if not norm:
return None
try:
obj = ipaddress.ip_address(norm)
except ValueError:
return None
if (
obj.is_private
or obj.is_loopback
or obj.is_link_local
or obj.is_multicast
or obj.is_reserved
):
return None
return norm
@staticmethod
def load_entries() -> List[Dict[str, Any]]:
SSHSecurityWhitelistUtilities._ensure_data_dir()
if not os.path.isfile(WHITELIST_PATH):
return []
try:
with open(WHITELIST_PATH, 'r', encoding='utf-8', errors='replace') as f:
data = json.load(f)
if not isinstance(data, list):
return []
out: List[Dict[str, Any]] = []
for item in data:
if not isinstance(item, dict):
continue
ip = item.get('ip') or item.get('address')
norm = SSHSecurityWhitelistUtilities.normalize_ip(str(ip) if ip else '')
if not norm:
continue
label = (item.get('label') or item.get('note') or '').strip()
updated = item.get('updated') or item.get('modified') or 0
try:
updated = int(updated)
except (TypeError, ValueError):
updated = 0
out.append({'ip': norm, 'label': label, 'updated': updated})
return out
except (OSError, json.JSONDecodeError):
return []
@staticmethod
def save_entries(entries: List[Dict[str, Any]]) -> bool:
SSHSecurityWhitelistUtilities._ensure_data_dir()
try:
serializable = []
for e in entries:
serializable.append({
'ip': e['ip'],
'label': e.get('label') or '',
'updated': int(e.get('updated') or 0),
})
tmp = WHITELIST_PATH + '.tmp'
with open(tmp, 'w', encoding='utf-8') as f:
json.dump(serializable, f, indent=2, ensure_ascii=False)
f.write('\n')
os.replace(tmp, WHITELIST_PATH)
try:
os.chmod(WHITELIST_PATH, 0o640)
except OSError:
pass
try:
import pwd
import grp
uid = pwd.getpwnam('cyberpanel').pw_uid
gid = grp.getgrnam('cyberpanel').gr_gid
os.chown(WHITELIST_PATH, uid, gid)
except (OSError, KeyError, ImportError):
pass
return True
except OSError:
return False
@staticmethod
def ip_set() -> Set[str]:
return {e['ip'] for e in SSHSecurityWhitelistUtilities.load_entries()}
@staticmethod
def is_whitelisted(ip: str) -> bool:
norm = SSHSecurityWhitelistUtilities.normalize_ip(ip)
if not norm:
return False
return norm in SSHSecurityWhitelistUtilities.ip_set()
@staticmethod
def add_entry(ip: str, label: str = '') -> tuple:
v = SSHSecurityWhitelistUtilities.validate_ip(ip)
if not v:
return False, 'Invalid or non-public IP address'
entries = SSHSecurityWhitelistUtilities.load_entries()
label = (label or '').strip()[:200]
now = int(time.time())
for e in entries:
if e['ip'] == v:
e['label'] = label
e['updated'] = now
if not SSHSecurityWhitelistUtilities.save_entries(entries):
return False, 'Failed to save whitelist'
return True, v
entries.append({'ip': v, 'label': label, 'updated': now})
if not SSHSecurityWhitelistUtilities.save_entries(entries):
return False, 'Failed to save whitelist'
return True, v
@staticmethod
def remove_entry(ip: str) -> tuple:
norm = SSHSecurityWhitelistUtilities.normalize_ip(ip)
if not norm:
return False, 'Invalid IP address'
entries = SSHSecurityWhitelistUtilities.load_entries()
new_list = [e for e in entries if e['ip'] != norm]
if len(new_list) == len(entries):
return False, 'IP not found in whitelist'
if not SSHSecurityWhitelistUtilities.save_entries(new_list):
return False, 'Failed to save whitelist'
return True, norm
@staticmethod
def update_entry(ip: str, new_ip: Optional[str] = None, label: Optional[str] = None) -> tuple:
"""
Update whitelist row. Returns (ok, ip_or_error_message, unchanged).
unchanged is True when nothing differed from stored values (still success).
"""
norm = SSHSecurityWhitelistUtilities.normalize_ip(ip)
if not norm:
return False, 'Invalid IP address', False
entries = SSHSecurityWhitelistUtilities.load_entries()
idx = next((i for i, e in enumerate(entries) if e['ip'] == norm), -1)
if idx < 0:
return False, 'IP not found in whitelist', False
now = int(time.time())
target_ip = norm
changed = False
current_label = str(entries[idx].get('label') or '').strip()[:200]
if new_ip is not None and str(new_ip).strip():
v = SSHSecurityWhitelistUtilities.validate_ip(str(new_ip).strip())
if not v:
return False, 'Invalid or non-public new IP address', False
if any(e['ip'] == v for e in entries if e['ip'] != norm):
return False, 'New IP already listed', False
if v != norm:
entries[idx]['ip'] = v
target_ip = v
changed = True
if label is not None:
new_l = str(label).strip()[:200]
if new_l != current_label:
entries[idx]['label'] = new_l
changed = True
if not changed:
return True, norm, True
entries[idx]['updated'] = now
if not SSHSecurityWhitelistUtilities.save_entries(entries):
return False, 'Failed to save whitelist', False
return True, target_ip, False
@staticmethod
def client_ip_from_request(request: Any) -> str:
"""Best-effort client IP for whitelisting (CF header or REMOTE_ADDR)."""
if request is None:
return ''
try:
meta = getattr(request, 'META', None) or {}
raw = meta.get('HTTP_CF_CONNECTING_IP') or meta.get('REMOTE_ADDR') or ''
raw = str(raw).split(',')[0].strip()
if '%' in raw:
raw = raw.split('%')[0]
return raw
except Exception:
return ''
@staticmethod
def upsert_whitelist_ip_if_absent(ip: str, label: str) -> bool:
"""
Add public IP to whitelist only if not already listed (does not overwrite labels).
"""
v = SSHSecurityWhitelistUtilities.validate_ip(ip)
if not v:
return False
entries = SSHSecurityWhitelistUtilities.load_entries()
label = (label or '').strip()[:200]
now = int(time.time())
for e in entries:
if e['ip'] == v:
return True
entries.append({'ip': v, 'label': label, 'updated': now})
return SSHSecurityWhitelistUtilities.save_entries(entries)
@staticmethod
def _fetch_ipv4_public_ip() -> str:
import re
import urllib.request
urls = (
'https://ipv4.icanhazip.com',
'https://api.ipify.org',
'https://checkip.amazonaws.com',
)
for url in urls:
try:
with urllib.request.urlopen(url, timeout=8) as resp:
ip = resp.read().decode('utf-8', errors='replace').strip()
if re.match(r'^(\d{1,3}\.){3}\d{1,3}$', ip):
return ip
except Exception:
continue
return ''
@staticmethod
def ensure_cyberpanel_public_ip_whitelisted(max_cache_age: int = 3600) -> None:
"""
Ensure this machine's outbound/public IPv4 is on the whitelist (SSH + ban protection).
Uses a short-lived cache to avoid HTTP self-lookups on every request.
"""
SSHSecurityWhitelistUtilities._ensure_data_dir()
now = int(time.time())
cached_ip = ''
cache_ts = 0
try:
if os.path.isfile(PUBLIC_IP_CACHE_PATH):
with open(PUBLIC_IP_CACHE_PATH, 'r', encoding='utf-8', errors='replace') as fc:
cj = json.load(fc)
if isinstance(cj, dict):
cached_ip = str(cj.get('ip') or '').strip()
try:
cache_ts = int(cj.get('ts') or 0)
except (TypeError, ValueError):
cache_ts = 0
except (OSError, json.JSONDecodeError):
pass
ip_to_try = ''
if cached_ip and (now - cache_ts) < max_cache_age:
ip_to_try = cached_ip
else:
detected = SSHSecurityWhitelistUtilities._fetch_ipv4_public_ip()
if detected:
ip_to_try = detected
try:
tmp = PUBLIC_IP_CACHE_PATH + '.tmp'
with open(tmp, 'w', encoding='utf-8') as ft:
json.dump({'ip': detected, 'ts': now}, ft, indent=2)
ft.write('\n')
os.replace(tmp, PUBLIC_IP_CACHE_PATH)
try:
os.chmod(PUBLIC_IP_CACHE_PATH, 0o640)
except OSError:
pass
except OSError:
pass
elif cached_ip:
ip_to_try = cached_ip
if not ip_to_try:
return
SSHSecurityWhitelistUtilities.upsert_whitelist_ip_if_absent(ip_to_try, LABEL_SERVER_AUTO)
@staticmethod
def maybe_whitelist_first_admin_login(client_ip: str) -> None:
"""
Once per installation: whitelist the client IP of the first successful login
by a user with admin ACL (adminStatus == 1).
"""
if os.path.isfile(FIRST_ADMIN_LOGIN_FLAG_PATH):
return
v = SSHSecurityWhitelistUtilities.validate_ip(client_ip)
if not v:
return
SSHSecurityWhitelistUtilities.upsert_whitelist_ip_if_absent(v, LABEL_FIRST_ADMIN_AUTO)
try:
tmp = FIRST_ADMIN_LOGIN_FLAG_PATH + '.tmp'
with open(tmp, 'w', encoding='utf-8') as tf:
tf.write(v + '\n')
os.replace(tmp, FIRST_ADMIN_LOGIN_FLAG_PATH)
try:
os.chmod(FIRST_ADMIN_LOGIN_FLAG_PATH, 0o640)
except OSError:
pass
try:
import grp
import pwd
uid = pwd.getpwnam('cyberpanel').pw_uid
gid = grp.getgrnam('cyberpanel').gr_gid
os.chown(FIRST_ADMIN_LOGIN_FLAG_PATH, uid, gid)
except (OSError, KeyError, ImportError):
pass
except OSError:
pass
@staticmethod
def on_successful_panel_login(request: Any, admin: Any) -> None:
"""
Called after a successful CyberPanel login. Safe to call on every login; errors swallowed.
"""
try:
SSHSecurityWhitelistUtilities.ensure_cyberpanel_public_ip_whitelisted()
except Exception:
pass
try:
acl = getattr(admin, 'acl', None)
if acl is None:
return
if int(getattr(acl, 'adminStatus', 0) or 0) != 1:
return
ip = SSHSecurityWhitelistUtilities.client_ip_from_request(request)
SSHSecurityWhitelistUtilities.maybe_whitelist_first_admin_login(ip)
except Exception:
pass

View File

@@ -1215,6 +1215,114 @@ var dashboardStatsControllerFn = function ($scope, $http, $timeout) {
$scope.blockingIP = null;
$scope.blockedIPs = {};
// SSH Security: trusted IPs (never blocked, excluded from analysis alerts)
// Use an object for ng-model: inputs live under ng-if child scopes; primitives would not update parent.
$scope.sshSecurityWhitelist = [];
$scope.sshWhitelistMap = {};
$scope.whitelistUi = { ip: '', label: '' };
$scope._syncWhitelistMap = function () {
$scope.sshWhitelistMap = {};
if ($scope.sshSecurityWhitelist && $scope.sshSecurityWhitelist.length) {
$scope.sshSecurityWhitelist.forEach(function (r) {
$scope.sshWhitelistMap[r.ip] = true;
});
}
};
$scope._decorateWhitelistEntries = function (entries) {
$scope.sshSecurityWhitelist = (entries || []).map(function (e) {
return {
ip: e.ip,
label: e.label || '',
updated: e.updated || 0,
_l: e.label || '',
_nip: ''
};
});
$scope._syncWhitelistMap();
};
$scope.isSshWhitelisted = function (ip) {
if (!ip) return false;
return !!$scope.sshWhitelistMap[String(ip).trim()];
};
$scope.loadSshSecurityWhitelist = function () {
var h = { headers: { 'X-CSRFToken': (typeof getCookie === 'function') ? getCookie('csrftoken') : '' } };
$http.post('/base/sshSecurityWhitelistList', {}, h).then(function (res) {
if (res.data && res.data.status === 1) {
$scope._decorateWhitelistEntries(res.data.entries);
}
});
};
$scope.addSshSecurityWhitelist = function () {
var ip = ($scope.whitelistUi && $scope.whitelistUi.ip || '').trim();
var label = ($scope.whitelistUi && $scope.whitelistUi.label || '').trim();
if (!ip) {
if (typeof PNotify !== 'undefined') { new PNotify({ title: 'Trusted IP', text: 'Enter an IP address', type: 'warning', delay: 4000 }); }
return;
}
var h = { headers: { 'X-CSRFToken': (typeof getCookie === 'function') ? getCookie('csrftoken') : '' } };
$http.post('/base/sshSecurityWhitelistAdd', { ip: ip, label: label }, h).then(function (res) {
if (res.data && res.data.status === 1) {
if ($scope.whitelistUi) {
$scope.whitelistUi.ip = '';
$scope.whitelistUi.label = '';
}
$scope._decorateWhitelistEntries(res.data.entries);
if (typeof PNotify !== 'undefined') { new PNotify({ title: 'Trusted IP', text: 'IP added to trusted list', type: 'success', delay: 4000 }); }
if ($scope.analyzeSSHSecurity) { $scope.analyzeSSHSecurity(); }
} else {
var err = (res.data && (res.data.error || res.data.message)) ? (res.data.error || res.data.message) : 'Failed to add';
if (typeof PNotify !== 'undefined') { new PNotify({ title: 'Error', text: err, type: 'error', delay: 6000 }); }
}
}, function (err) {
var msg = 'Request failed';
if (err.data && err.data.error) msg = err.data.error;
if (typeof PNotify !== 'undefined') { new PNotify({ title: 'Error', text: msg, type: 'error', delay: 6000 }); }
});
};
$scope.removeSshSecurityWhitelist = function (ip) {
if (!ip) return;
var h = { headers: { 'X-CSRFToken': (typeof getCookie === 'function') ? getCookie('csrftoken') : '' } };
$http.post('/base/sshSecurityWhitelistRemove', { ip: ip }, h).then(function (res) {
if (res.data && res.data.status === 1) {
$scope._decorateWhitelistEntries(res.data.entries);
if (typeof PNotify !== 'undefined') { new PNotify({ title: 'Trusted IP', text: 'IP removed from trusted list', type: 'success', delay: 4000 }); }
if ($scope.analyzeSSHSecurity) { $scope.analyzeSSHSecurity(); }
} else {
var err2 = (res.data && res.data.error) ? res.data.error : 'Failed to remove';
if (typeof PNotify !== 'undefined') { new PNotify({ title: 'Error', text: err2, type: 'error', delay: 6000 }); }
}
});
};
$scope.saveSshSecurityWhitelistRow = function (row) {
if (!row || !row.ip) return;
var payload = { ip: row.ip, label: row._l };
if (row._nip && String(row._nip).trim()) payload.new_ip = String(row._nip).trim();
var h = { headers: { 'X-CSRFToken': (typeof getCookie === 'function') ? getCookie('csrftoken') : '' } };
$http.post('/base/sshSecurityWhitelistUpdate', payload, h).then(function (res) {
var d = res.data || {};
var st = d.status === 1 || d.status === '1';
if (st) {
$scope._decorateWhitelistEntries(d.entries);
if (typeof PNotify !== 'undefined') {
var unchanged = d.unchanged === true || d.unchanged === 'true' || d.unchanged === 1;
var txt = (d.message && String(d.message).length) ? d.message : (unchanged ? 'No changes to save.' : 'Entry updated');
new PNotify({ title: 'Trusted IP', text: txt, type: unchanged ? 'info' : 'success', delay: 4000 });
}
if ($scope.analyzeSSHSecurity) { $scope.analyzeSSHSecurity(); }
} else {
var err3 = d.error ? d.error : 'Failed to update';
if (typeof PNotify !== 'undefined') { new PNotify({ title: 'Error', text: err3, type: 'error', delay: 6000 }); }
}
});
};
$scope.analyzeSSHSecurity = function() {
$scope.loadingSecurityAnalysis = true;
$scope.showAddonRequired = false;
@@ -1227,6 +1335,9 @@ var dashboardStatsControllerFn = function ($scope, $http, $timeout) {
$scope.securityAlerts = [];
} else if (response.data.status === 1) {
$scope.securityAlerts = response.data.alerts;
if (response.data.whitelist_entries) {
$scope._decorateWhitelistEntries(response.data.whitelist_entries);
}
$scope.showAddonRequired = false;
}
}