Files
CyberPanel/firewall/firewallManager.py
master3395 b6d5472eb7 Fix Ban IP from Recent SSH Logs and Firewall Banned IPs
- plogical/firewallUtilities: fix inverted success/failure (result==1 = success); write blocked_ips.log under CyberCP/data for cyberpanel write access
- plogical/processUtilities: when root, use normalExecutioner return value so executioner reflects actual command success/failure
- firewall/firewallManager: addBannedIP uses FirewallUtilities.blockIP; ACL and all errors return JSON with error_message/error; rollback store if block fails
- baseTemplate/views: blockIPAddress uses FirewallUtilities.blockIP instead of subprocess
- baseTemplate/homePage: inline Ban IP calls /firewall/addBannedIP with ip/reason/duration; show server error in notifications
- baseTemplate/system-status.js: handle string response and show server error_message in success and error callbacks
2026-02-04 18:30:03 +01:00

2684 lines
108 KiB
Python

#!/usr/local/CyberCP/bin/python
import os
import os.path
import sys
import django
PROJECT_ROOT = '/usr/local/CyberCP'
while PROJECT_ROOT in sys.path:
sys.path.remove(PROJECT_ROOT)
sys.path.insert(0, PROJECT_ROOT)
from loginSystem.models import Administrator
from plogical.httpProc import httpProc
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "CyberCP.settings")
django.setup()
import json
from plogical.acl import ACLManager
import plogical.CyberCPLogFileWriter as logging
from plogical.virtualHostUtilities import virtualHostUtilities
import subprocess
from django.shortcuts import HttpResponse, render, redirect
from random import randint
import time
from plogical.firewallUtilities import FirewallUtilities
from firewall.models import FirewallRules
from plogical.modSec import modSec
from plogical.csf import CSF
from plogical.processUtilities import ProcessUtilities
from serverStatus.serverStatusUtil import ServerStatusUtil
class FirewallManager:
imunifyPath = '/usr/bin/imunify360-agent'
CLPath = '/etc/sysconfig/cloudlinux'
imunifyAVPath = '/etc/sysconfig/imunify360/integration.conf'
BANNED_IPS_PRIMARY_FILE = '/usr/local/CyberCP/data/banned_ips.json'
BANNED_IPS_LEGACY_FILE = '/etc/cyberpanel/banned_ips.json'
def __init__(self, request = None):
self.request = request
def _load_banned_ips_store(self):
"""
Load banned IPs from the primary store, falling back to legacy path.
Returns (data, path_used)
"""
for path in [self.BANNED_IPS_PRIMARY_FILE, self.BANNED_IPS_LEGACY_FILE]:
if os.path.exists(path):
try:
with open(path, 'r') as f:
return json.load(f), path
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Failed to read banned IPs from {path}: {str(e)}')
return [], self.BANNED_IPS_PRIMARY_FILE
def _save_banned_ips_store(self, data):
"""
Persist banned IPs to the primary store in a writable location.
"""
target = self.BANNED_IPS_PRIMARY_FILE
try:
os.makedirs(os.path.dirname(target), exist_ok=True)
with open(target, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Failed to write banned IPs to {target}: {str(e)}')
raise
return target
def securityHome(self, request = None, userID = None):
    """
    Render the 'firewall/index.html' template through httpProc, passing
    no template data and the 'admin' argument.
    """
    page = httpProc(request, 'firewall/index.html', None, 'admin')
    return page.render()
def firewallHome(self, request = None, userID = None):
    """
    Show the firewall page.

    When /etc/csf exists the browser is redirected to '/configservercsf/';
    otherwise 'firewall/firewall.html' is rendered through httpProc.
    """
    if os.path.exists('/etc/csf'):
        return redirect('/configservercsf/')

    page = httpProc(request, 'firewall/firewall.html', None, 'admin')
    return page.render()
def getCurrentRules(self, userID = None):
    """
    Return all stored firewall rules as JSON (admin only).

    If no stored rule uses port '7080', a "CyberPanel Admin" tcp rule for
    that port is first inserted into the database so it is listed too.
    The rules are delivered as a JSON-encoded array string under the
    "data" key of the response.
    """
    try:
        if ACLManager.loadedACL(userID)['admin'] != 1:
            return ACLManager.loadErrorJson('fetchStatus', 0)

        storedRules = FirewallRules.objects.all()

        # Ensure CyberPanel port 7080 rule exists in database for visibility
        if not any(entry.port == '7080' for entry in storedRules):
            # Create database entry for port 7080 (already enabled in system firewall)
            try:
                FirewallRules(
                    name="CyberPanel Admin",
                    proto="tcp",
                    port="7080",
                    ipAddress="0.0.0.0/0"
                ).save()
                logging.CyberCPLogFileWriter.writeToFile("Added CyberPanel port 7080 to firewall database for UI visibility")
            except Exception as err:
                logging.CyberCPLogFileWriter.writeToFile(f"Failed to add CyberPanel port 7080 to database: {str(err)}")

            # Refresh rules after potential creation
            storedRules = FirewallRules.objects.all()

        encodedRules = [
            json.dumps({
                'id': entry.id,
                'name': entry.name,
                'proto': entry.proto,
                'port': entry.port,
                'ipAddress': entry.ipAddress,
            })
            for entry in storedRules
        ]
        json_data = "[" + ",".join(encodedRules) + "]"

        payload = {'status': 1, 'fetchStatus': 1, 'error_message': "None", "data": json_data}
        return HttpResponse(json.dumps(payload))
    except BaseException as msg:
        failure = {'status': 0, 'fetchStatus': 0, 'error_message': str(msg)}
        return HttpResponse(json.dumps(failure))
def addRule(self, userID = None, data = None):
    """
    Add a firewall rule (admin only).

    Reads ruleName, ruleProtocol, rulePort and ruleIP from ``data``, applies
    the rule through FirewallUtilities.addRule and then stores it as a
    FirewallRules row. Responds with JSON carrying status/add_status.
    """
    try:
        if ACLManager.loadedACL(userID)['admin'] != 1:
            return ACLManager.loadErrorJson('add_status', 0)

        name = data['ruleName']
        protocol = data['ruleProtocol']
        port = data['rulePort']
        sourceIP = data['ruleIP']

        FirewallUtilities.addRule(protocol, port, sourceIP)
        FirewallRules(name=name, proto=protocol, port=port, ipAddress=sourceIP).save()

        return HttpResponse(json.dumps({'status': 1, 'add_status': 1, 'error_message': "None"}))
    except BaseException as msg:
        return HttpResponse(json.dumps({'status': 0, 'add_status': 0, 'error_message': str(msg)}))
def deleteRule(self, userID = None, data = None):
    """
    Delete a firewall rule (admin only).

    Reads id, proto, port and ruleIP from ``data``, removes the rule through
    FirewallUtilities.deleteRule and then deletes the FirewallRules row with
    that id. Responds with JSON carrying status/delete_status.
    """
    try:
        if ACLManager.loadedACL(userID)['admin'] != 1:
            return ACLManager.loadErrorJson('delete_status', 0)

        rowID = data['id']
        protocol = data['proto']
        port = data['port']
        sourceIP = data['ruleIP']

        FirewallUtilities.deleteRule(protocol, port, sourceIP)
        FirewallRules.objects.get(id=rowID).delete()

        return HttpResponse(json.dumps({'status': 1, 'delete_status': 1, 'error_message': "None"}))
    except BaseException as msg:
        return HttpResponse(json.dumps({'status': 0, 'delete_status': 0, 'error_message': str(msg)}))
def reloadFirewall(self, userID = None, data = None):
    """
    Run 'sudo firewall-cmd --reload' (admin only) and report the outcome.

    An executioner result of 1 is reported as reload_status 1; any other
    result is reported as reload_status 0 with a generic message.
    """
    try:
        if ACLManager.loadedACL(userID)['admin'] != 1:
            return ACLManager.loadErrorJson('reload_status', 0)

        outcome = ProcessUtilities.executioner('sudo firewall-cmd --reload')

        if outcome == 1:
            payload = {'reload_status': 1, 'error_message': "None"}
        else:
            payload = {'reload_status': 0,
                       'error_message': "Can not reload firewall, see CyberCP main log file."}
        return HttpResponse(json.dumps(payload))
    except BaseException as msg:
        return HttpResponse(json.dumps({'reload_status': 0, 'error_message': str(msg)}))
def startFirewall(self, userID = None, data = None):
    """
    Run 'sudo systemctl start firewalld' (admin only) and report the outcome.

    An executioner result of 1 is reported as start_status 1; any other
    result is reported as start_status 0 with a generic message.
    """
    try:
        if ACLManager.loadedACL(userID)['admin'] != 1:
            return ACLManager.loadErrorJson('start_status', 0)

        outcome = ProcessUtilities.executioner('sudo systemctl start firewalld')

        if outcome == 1:
            payload = {'start_status': 1, 'error_message': "None"}
        else:
            payload = {'start_status': 0,
                       'error_message': "Can not start firewall, see CyberCP main log file."}
        return HttpResponse(json.dumps(payload))
    except BaseException as msg:
        return HttpResponse(json.dumps({'start_status': 0, 'error_message': str(msg)}))
def stopFirewall(self, userID = None, data = None):
    """
    Run 'sudo systemctl stop firewalld' (admin only) and report the outcome.

    An executioner result of 1 is reported as stop_status 1; any other
    result is reported as stop_status 0 with a generic message.
    """
    try:
        if ACLManager.loadedACL(userID)['admin'] != 1:
            return ACLManager.loadErrorJson('stop_status', 0)

        outcome = ProcessUtilities.executioner('sudo systemctl stop firewalld')

        if outcome == 1:
            payload = {'stop_status': 1, 'error_message': "None"}
        else:
            payload = {'stop_status': 0,
                       'error_message': "Can not stop firewall, see CyberCP main log file."}
        return HttpResponse(json.dumps(payload))
    except BaseException as msg:
        return HttpResponse(json.dumps({'stop_status': 0, 'error_message': str(msg)}))
def firewallStatus(self, userID = None, data = None):
    """
    Report whether firewalld looks active (admin only).

    The output of 'systemctl status firewalld' is searched for the word
    "dead": when present firewallStatus is 0, otherwise 1.
    """
    try:
        if ACLManager.loadedACL(userID)['admin'] != 1:
            return ACLManager.loadErrorJson()

        statusText = ProcessUtilities.outputExecutioner('systemctl status firewalld')
        running = 0 if statusText.find("dead") > -1 else 1

        payload = {'status': 1, 'error_message': "none", 'firewallStatus': running}
        return HttpResponse(json.dumps(payload))
    except BaseException as msg:
        return HttpResponse(json.dumps({'status': 0, 'error_message': str(msg)}))
def secureSSH(self, request = None, userID = None):
    """
    Render the 'firewall/secureSSH.html' template through httpProc, passing
    no template data and the 'admin' argument.
    """
    page = httpProc(request, 'firewall/secureSSH.html', None, 'admin')
    return page.render()
def getSSHConfigs(self, userID = None, data = None):
    """
    Return SSH daemon settings or the list of root's ssh-rsa keys (admin only).

    data['type'] == "1": reads /etc/ssh/sshd_config and returns
    ``permitRootLogin`` (1 when a PermitRootLogin line contains "yes"/"Yes")
    and ``sshPort`` (default "22").
    Any other type: reads /root/.ssh/authorized_keys and returns, under
    "data", a JSON-encoded array string of {'userName', 'key'} entries for
    lines containing "ssh-rsa".

    Fixes over the previous version:
    * Port lines are split on any whitespace and only a numeric value is
      accepted, so tab / multi-space separated directives no longer raise
      IndexError or produce an empty port, and prose lines that merely
      mention "Port" cannot overwrite the detected value.
    * A line holding nothing but "ssh-rsa" is skipped instead of aborting
      the whole listing.
    """
    try:
        if ACLManager.loadedACL(userID)['admin'] != 1:
            return ACLManager.loadErrorJson()

        requestType = data['type']

        if requestType == "1":
            configLines = ProcessUtilities.outputExecutioner("sudo cat " + "/etc/ssh/sshd_config").split('\n')

            permitRootLogin = 0
            sshPort = "22"

            for line in configLines:
                if "PermitRootLogin" in line:
                    if "Yes" in line or "yes" in line:
                        permitRootLogin = 1
                        continue
                if "Port" in line and "GatewayPorts" not in line:
                    fields = line.split()
                    # Second token is the value for both "Port 22" and "#Port 22".
                    if len(fields) > 1 and fields[1].isdigit():
                        sshPort = fields[1]

            payload = {'status': 1, 'permitRootLogin': permitRootLogin, 'sshPort': sshPort}
            return HttpResponse(json.dumps(payload))

        keyLines = ProcessUtilities.outputExecutioner("sudo cat " + "/root/.ssh/authorized_keys").split('\n')

        entries = []
        for line in keyLines:
            if "ssh-rsa" not in line:
                continue

            keydata = line.split(" ")
            if len(keydata) < 2:
                # Nothing after the key type; there is no key material to show.
                continue

            # The displayed key is truncated to the first 50 characters of the blob;
            # this exact string format is preserved from the previous version.
            if len(keydata) > 2:
                comment = keydata[2]
                key = "ssh-rsa " + keydata[1][:50] + " .. " + comment
                userName = comment.split("@", 1)[0] if "@" in comment else comment
            else:
                key = "ssh-rsa " + keydata[1][:50]
                userName = ''

            entries.append(json.dumps({'userName': userName, 'key': key}))

        json_data = "[" + ",".join(entries) + "]"
        return HttpResponse(json.dumps({'status': 1, 'error_message': "None", "data": json_data}))
    except BaseException as msg:
        return HttpResponse(json.dumps({'status': 0, 'error_message': str(msg)}))
def saveSSHConfigs(self, userID = None, data = None):
    """
    Persist SSH port / root-login settings and open the port (admin only).

    Runs plogical/firewallUtilities.py ``saveSSHConfigs`` with the requested
    values. When its output contains "1,None":
    * if /etc/csf exists, the port is pushed to TCP_IN and TCP_OUT through
      ``self.modifyPorts`` (defined elsewhere in this class);
    * otherwise the "SSHCustom" FirewallRules row is moved to the new port,
      or created if it cannot be updated.

    Fixes over the previous version:
    * ``sshPort`` is normalised to a string and must be a decimal number in
      1..65535 before it is placed on the command line. Previously a JSON
      number raised TypeError and arbitrary text reached the privileged
      command unchanged.
    * ``type`` is shell-quoted, and the builtin ``type`` is no longer shadowed.
    """
    import shlex

    try:
        if ACLManager.loadedACL(userID)['admin'] != 1:
            return ACLManager.loadErrorJson('saveStatus', 0)

        configType = data['type']
        sshPort = str(data['sshPort']).strip()
        rootLogin = "1" if data['rootLogin'] == True else "0"

        portIsNumeric = sshPort != '' and all(ch in '0123456789' for ch in sshPort)
        if not portIsNumeric or not 1 <= int(sshPort) <= 65535:
            rejected = {'status': 0, 'saveStatus': 0,
                        'error_message': "Invalid SSH port, expected a number between 1 and 65535."}
            return HttpResponse(json.dumps(rejected))

        execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/firewallUtilities.py"
        execPath = execPath + " saveSSHConfigs --type " + shlex.quote(str(configType)) + " --sshPort " + sshPort + " --rootLogin " + rootLogin

        output = ProcessUtilities.outputExecutioner(execPath)

        if output.find("1,None") == -1:
            return HttpResponse(json.dumps({'status': 0, 'saveStatus': 0, "error_message": output}))

        if os.path.exists('/etc/csf'):
            self.modifyPorts({'protocol': 'TCP_IN', 'ports': sshPort})
            self.modifyPorts({'protocol': 'TCP_OUT', 'ports': sshPort})
        else:
            try:
                updateFW = FirewallRules.objects.get(name="SSHCustom")
                FirewallUtilities.deleteRule("tcp", updateFW.port, "0.0.0.0/0")
                updateFW.port = sshPort
                updateFW.save()
                FirewallUtilities.addRule('tcp', sshPort, "0.0.0.0/0")
            except Exception:
                # Any failure above (typically: no "SSHCustom" row yet) falls back
                # to creating the rule from scratch, as before.
                try:
                    newFireWallRule = FirewallRules(name="SSHCustom", port=sshPort, proto="tcp")
                    newFireWallRule.save()
                    FirewallUtilities.addRule('tcp', sshPort, "0.0.0.0/0")
                    command = 'firewall-cmd --permanent --remove-service=ssh'
                    ProcessUtilities.executioner(command)
                except BaseException as msg:
                    logging.CyberCPLogFileWriter.writeToFile(str(msg))

        return HttpResponse(json.dumps({'status': 1, 'saveStatus': 1}))
    except BaseException as msg:
        return HttpResponse(json.dumps({'status': 0, 'saveStatus': 0, 'error_message': str(msg)}))
def deleteSSHKey(self, userID = None, data = None):
    """
    Remove an SSH key via plogical/firewallUtilities.py (admin only).

    ``data['key']`` is passed to the ``deleteSSHKey`` sub-command; success
    is detected by "1,None" appearing in the command output.

    Fixes over the previous version:
    * A failed deletion now returns status 0 / delete_status 0; it used to
      report success (1 / 1) while attaching the error text.
    * Errors are exposed under ``error_message``. The historical misspelt
      ``error_mssage`` key is still included for existing clients.
    * The key is quoted with shlex.quote rather than wrapped in literal
      single quotes, so a key containing a quote cannot break out of the
      argument.
    """
    import shlex

    try:
        if ACLManager.loadedACL(userID)['admin'] != 1:
            return ACLManager.loadErrorJson('delete_status', 0)

        key = data['key']

        execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/firewallUtilities.py"
        execPath = execPath + " deleteSSHKey --key " + shlex.quote(key)

        output = ProcessUtilities.outputExecutioner(execPath)

        if output.find("1,None") > -1:
            payload = {'status': 1, 'delete_status': 1}
        else:
            payload = {'status': 0, 'delete_status': 0,
                       'error_message': output, 'error_mssage': output}
        return HttpResponse(json.dumps(payload))
    except BaseException as msg:
        failure = {'status': 0, 'delete_status': 0,
                   'error_message': str(msg), 'error_mssage': str(msg)}
        return HttpResponse(json.dumps(failure))
def addSSHKey(self, userID = None, data = None):
    """
    Install an SSH public key via plogical/firewallUtilities.py (admin only).

    ``data['key']`` is written to a temporary file under /home/cyberpanel/
    and that path is handed to the ``addSSHKey`` sub-command; success is
    detected by "1,None" appearing in the command output.

    Fixes over the previous version:
    * The temporary file is written inside a ``with`` block so the handle
      is closed even if the write fails.
    * Errors are exposed under ``error_message``. The historical misspelt
      ``error_mssage`` key is still included for existing clients.
    """
    try:
        if ACLManager.loadedACL(userID)['admin'] != 1:
            return ACLManager.loadErrorJson('add_status', 0)

        key = data['key']

        # NOTE(review): a 4-digit random file name in a shared directory is
        # predictable and can collide; kept because the consumer of --tempPath
        # is outside this file and its expectations are not visible here.
        tempPath = "/home/cyberpanel/" + str(randint(1000, 9999))
        with open(tempPath, "w") as keyFile:
            keyFile.write(key)

        execPath = "sudo /usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/firewallUtilities.py"
        execPath = execPath + " addSSHKey --tempPath " + tempPath

        output = ProcessUtilities.outputExecutioner(execPath)

        if output.find("1,None") > -1:
            payload = {'status': 1, 'add_status': 1}
        else:
            payload = {'status': 0, 'add_status': 0,
                       'error_message': output, 'error_mssage': output}
        return HttpResponse(json.dumps(payload))
    except BaseException as msg:
        failure = {'status': 0, 'add_status': 0,
                   'error_message': str(msg), 'error_mssage': str(msg)}
        return HttpResponse(json.dumps(failure))
def loadModSecurityHome(self, request = None, userID = None):
    """
    Render 'firewall/modSecurity.html'.

    On OpenLiteSpeed, ModSecurity counts as installed when any line of
    conf/httpd_config.conf contains 'module mod_security'. On any other
    server it is reported as installed and OLS is 0.
    """
    if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
        OLS = 1
        confPath = os.path.join(virtualHostUtilities.Server_root, "conf/httpd_config.conf")
        configLines = ProcessUtilities.outputExecutioner("sudo cat " + confPath).splitlines()
        modSecInstalled = 1 if any('module mod_security' in line for line in configLines) else 0
    else:
        OLS = 0
        modSecInstalled = 1

    context = {'modSecInstalled': modSecInstalled, 'OLS': OLS}
    page = httpProc(request, 'firewall/modSecurity.html', context, 'admin')
    return page.render()
def installModSec(self, userID = None, data = None):
    """
    Launch the ModSecurity installer (admin only).

    Starts plogical/modSec.py ``installModSec`` through popenExecutioner,
    sleeps for three seconds and then answers with installModSec = 1.
    """
    try:
        if ACLManager.loadedACL(userID)['admin'] != 1:
            return ACLManager.loadErrorJson('installModSec', 0)

        installer = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/modSec.py"
        installer = installer + " installModSec"

        ProcessUtilities.popenExecutioner(installer)
        time.sleep(3)

        return HttpResponse(json.dumps({'installModSec': 1, 'error_message': "None"}))
    except BaseException as msg:
        return HttpResponse(json.dumps({'installModSec': 0, 'error_message': str(msg)}))
def installStatusModSec(self, userID = None, data = None):
    """
    Poll the ModSecurity install log and finish the installation (admin only).

    Reads ``modSec.installLogPath``:
    * "[200]" in the log: runs plogical/modSec.py ``installModSecConfigs``
      and reports installed = 1 when its output contains "1,None",
      otherwise installed = 0 with an error message; abort is 1 either way.
    * "[404]" in the log: abort = 1, installed = 0.
    * otherwise: abort = 0 (installation still running).
    The raw log text is returned as ``requestStatus``.

    Fix over the previous version: this method executes a privileged
    configuration command but was the only action here without the admin
    ACL check used by its siblings; the check is now applied. The error is
    requested with ('abort', 1) so a polling client stops.
    NOTE(review): relies on callers passing ``userID`` as they do for the
    other methods - confirm against the view layer.
    """
    try:
        if ACLManager.loadedACL(userID)['admin'] != 1:
            return ACLManager.loadErrorJson('abort', 1)

        installStatus = ProcessUtilities.outputExecutioner("sudo cat " + modSec.installLogPath)

        if installStatus.find("[200]") > -1:
            execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/modSec.py"
            execPath = execPath + " installModSecConfigs"

            output = ProcessUtilities.outputExecutioner(execPath)

            if output.find("1,None") > -1:
                payload = {
                    'error_message': "None",
                    'requestStatus': installStatus,
                    'abort': 1,
                    'installed': 1,
                }
            else:
                payload = {
                    'error_message': "Failed to install ModSecurity configurations.",
                    'requestStatus': installStatus,
                    'abort': 1,
                    'installed': 0,
                }
        elif installStatus.find("[404]") > -1:
            payload = {
                'abort': 1,
                'installed': 0,
                'error_message': "None",
                'requestStatus': installStatus,
            }
        else:
            payload = {
                'abort': 0,
                'error_message': "None",
                'requestStatus': installStatus,
            }

        return HttpResponse(json.dumps(payload))
    except BaseException as msg:
        return HttpResponse(json.dumps({'abort': 1, 'installed': 0, 'error_message': str(msg)}))
def fetchModSecSettings(self, userID = None, data = None):
    """
    Return the current ModSecurity settings as JSON (admin only).

    OpenLiteSpeed: if modules/mod_security.so is missing under the server
    root the answer is installed = 0; otherwise conf/httpd_config.conf is
    parsed and the ``modsecurity`` switch is reported as well.
    Other servers: conf/modsec.conf is parsed.

    Improvements over the previous version:
    * The two copy-pasted parsing loops are replaced by one helper.
    * A value directive written without a value (e.g. a bare
      "SecDebugLogLevel" line) is ignored; it used to raise IndexError and
      fail the whole request.
    """
    try:
        if ACLManager.loadedACL(userID)['admin'] != 1:
            return ACLManager.loadErrorJson('fetchStatus', 0)

        isOLS = ProcessUtilities.decideServer() == ProcessUtilities.OLS

        if isOLS:
            confPath = os.path.join(virtualHostUtilities.Server_root, 'conf/httpd_config.conf')
            modSecPath = os.path.join(virtualHostUtilities.Server_root, 'modules', 'mod_security.so')
            if not os.path.exists(modSecPath):
                return HttpResponse(json.dumps({'fetchStatus': 1, 'installed': 0}))
            switchNames = ('modsecurity', 'SecAuditEngine', 'SecRuleEngine')
        else:
            confPath = os.path.join(virtualHostUtilities.Server_root, 'conf/modsec.conf')
            switchNames = ('SecAuditEngine', 'SecRuleEngine')

        configLines = ProcessUtilities.outputExecutioner("sudo cat " + confPath).split('\n')
        switches, values = self._parseModSecDirectives(configLines, switchNames)

        # Key order matches the previous response layout.
        final_dic = {'fetchStatus': 1, 'installed': 1, 'SecRuleEngine': switches['SecRuleEngine']}
        if isOLS:
            final_dic['modsecurity'] = switches['modsecurity']
        final_dic['SecAuditEngine'] = switches['SecAuditEngine']
        final_dic['SecDebugLogLevel'] = values['SecDebugLogLevel']
        final_dic['SecAuditLogParts'] = values['SecAuditLogParts']
        final_dic['SecAuditLogRelevantStatus'] = values['SecAuditLogRelevantStatus']
        final_dic['SecAuditLogType'] = values['SecAuditLogType']

        return HttpResponse(json.dumps(final_dic))
    except BaseException as msg:
        return HttpResponse(json.dumps({'fetchStatus': 0, 'error_message': str(msg)}))

@staticmethod
def _parseModSecDirectives(configLines, switchNames):
    """
    Extract on/off switches and value directives from ModSecurity config lines.

    ``switchNames`` lists the on/off directives to look for, in priority
    order. A switch becomes 1 when a line contains "<name> " together with
    "on" or "On". Value directives are taken from lines whose first
    space-separated token is the directive name.
    Returns (switches, values) as two dicts.
    """
    switches = dict.fromkeys(switchNames, 0)
    values = {
        'SecDebugLogLevel': "9",
        'SecAuditLogRelevantStatus': '^(?:5|4(?!04))',
        'SecAuditLogParts': 'ABIJDEFHZ',
        'SecAuditLogType': 'Serial',
    }

    for line in configLines:
        if 'on' in line or 'On' in line:
            enabledSwitch = next((name for name in switchNames if name + ' ' in line), None)
            if enabledSwitch is not None:
                switches[enabledSwitch] = 1
                continue

        tokens = line.split(' ')
        # Guard against a directive with nothing after it.
        if tokens[0] in values and len(tokens) > 1:
            values[tokens[0]] = tokens[1]

    return switches, values
def saveModSecConfigurations(self, userID = None, data = None):
    """
    Save ModSecurity settings through plogical/modSec.py (admin only).

    The directives are written, one per line, to a temporary file under
    /home/cyberpanel/ whose path is passed to ``saveModSecConfigs``. On
    OpenLiteSpeed a leading "modsecurity on|off" line (from
    data['modsecurity_status']) is written as well. Success is detected by
    "1,None" appearing in the command output.

    Improvements over the previous version:
    * The two near-identical server branches are merged; the only
      difference was the extra ``modsecurity`` line on OpenLiteSpeed.
    * The temporary file is written inside a ``with`` block so the handle
      is closed even if the write fails.
    * All four value directives are coerced with ``str()`` (two of them
      previously raised TypeError for non-string input).
    """
    try:
        if ACLManager.loadedACL(userID)['admin'] != 1:
            return ACLManager.loadErrorJson('saveStatus', 0)

        isOLS = ProcessUtilities.decideServer() == ProcessUtilities.OLS

        directives = []
        if isOLS:
            directives.append("modsecurity on" if data['modsecurity_status'] == True else "modsecurity off")

        auditEngine = data['SecAuditEngine']
        ruleEngine = data['SecRuleEngine']
        debugLevel = data['SecDebugLogLevel']
        auditParts = data['SecAuditLogParts']
        relevantStatus = data['SecAuditLogRelevantStatus']
        auditLogType = data['SecAuditLogType']

        directives.append("SecAuditEngine on" if auditEngine == True else "SecAuditEngine off")
        # The capitalised "On" is the literal the previous version wrote.
        directives.append("SecRuleEngine On" if ruleEngine == True else "SecRuleEngine off")
        directives.append("SecDebugLogLevel " + str(debugLevel))
        directives.append("SecAuditLogParts " + str(auditParts))
        directives.append("SecAuditLogRelevantStatus " + str(relevantStatus))
        directives.append("SecAuditLogType " + str(auditLogType))

        ## writing data temporary to file
        tempConfigPath = "/home/cyberpanel/" + str(randint(1000, 9999))
        with open(tempConfigPath, "w") as confFile:
            confFile.write("\n".join(directives) + "\n")

        ## save configuration data
        execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/modSec.py"
        execPath = execPath + " saveModSecConfigs --tempConfigPath " + tempConfigPath

        output = ProcessUtilities.outputExecutioner(execPath)

        if output.find("1,None") > -1:
            data_ret = {'saveStatus': 1, 'error_message': "None"}
        else:
            data_ret = {'saveStatus': 0, 'error_message': output}
        return HttpResponse(json.dumps(data_ret))
    except BaseException as msg:
        return HttpResponse(json.dumps({'saveStatus': 0, 'error_message': str(msg)}))
def modSecRules(self, request = None, userID = None):
    """
    Render 'firewall/modSecurityRules.html'.

    On OpenLiteSpeed, ModSecurity counts as installed when any line of
    conf/httpd_config.conf contains 'module mod_security'; on any other
    server it is reported as installed.
    """
    modSecInstalled = 1
    if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
        confPath = os.path.join(virtualHostUtilities.Server_root, "conf/httpd_config.conf")
        configLines = ProcessUtilities.outputExecutioner("sudo cat " + confPath).split('\n')
        modSecInstalled = 1 if any('module mod_security' in line for line in configLines) else 0

    page = httpProc(request, 'firewall/modSecurityRules.html',
                    {'modSecInstalled': modSecInstalled}, 'admin')
    return page.render()
def fetchModSecRules(self, userID = None, data = None):
    """
    Return the contents of the ModSecurity rules file as JSON (admin only).

    OpenLiteSpeed: conf/modsec/rules.conf is returned only when
    conf/httpd_config.conf contains a 'module mod_security' line; otherwise
    the answer is modSecInstalled = 0.
    Other servers: conf/rules.conf is returned unconditionally.
    """
    try:
        if ACLManager.loadedACL(userID)['admin'] != 1:
            return ACLManager.loadErrorJson('modSecInstalled', 0)

        serverRoot = virtualHostUtilities.Server_root

        if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
            confPath = os.path.join(serverRoot, "conf/httpd_config.conf")
            configLines = ProcessUtilities.outputExecutioner("sudo cat " + confPath).split('\n')

            if not any('module mod_security' in line for line in configLines):
                return HttpResponse(json.dumps({'modSecInstalled': 0}))

            rulesPath = serverRoot + "/conf/modsec/rules.conf"
        else:
            rulesPath = serverRoot + "/conf/rules.conf"

        currentModSecRules = ProcessUtilities.outputExecutioner("sudo cat " + rulesPath)
        payload = {'modSecInstalled': 1,
                   'currentModSecRules': currentModSecRules}
        return HttpResponse(json.dumps(payload))
    except BaseException as msg:
        failure = {'modSecInstalled': 0,
                   'error_message': str(msg)}
        return HttpResponse(json.dumps(failure))
def saveModSecRules(self, userID = None, data = None):
    """
    Save custom ModSecurity rules through plogical/modSec.py (admin only).

    ``data['modSecRules']`` is written to ``modSec.tempRulesFile`` and the
    ``saveModSecRules`` sub-command is run; success is detected by "1,None"
    appearing in its output.

    Fix over the previous version: the temporary rules file is written
    inside a ``with`` block so the handle is closed even if the write fails.
    """
    try:
        if ACLManager.loadedACL(userID)['admin'] != 1:
            return ACLManager.loadErrorJson('saveStatus', 0)

        newModSecRules = data['modSecRules']

        ## writing data temporary to file
        with open(modSec.tempRulesFile, "w") as rulesFile:
            rulesFile.write(newModSecRules)

        ## save configuration data
        execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/modSec.py"
        execPath = execPath + " saveModSecRules"

        output = ProcessUtilities.outputExecutioner(execPath)

        if output.find("1,None") > -1:
            data_ret = {'saveStatus': 1, 'error_message': "None"}
        else:
            data_ret = {'saveStatus': 0, 'error_message': output}
        return HttpResponse(json.dumps(data_ret))
    except BaseException as msg:
        return HttpResponse(json.dumps({'saveStatus': 0, 'error_message': str(msg)}))
def modSecRulesPacks(self, request = None, userID = None):
    """
    Render 'firewall/modSecurityRulesPacks.html'.

    On OpenLiteSpeed, ModSecurity counts as installed when any line of
    conf/httpd_config.conf contains 'module mod_security'; on any other
    server it is reported as installed.
    """
    modSecInstalled = 1
    if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
        confPath = os.path.join(virtualHostUtilities.Server_root, "conf/httpd_config.conf")
        configLines = ProcessUtilities.outputExecutioner("sudo cat " + confPath).split('\n')
        modSecInstalled = 1 if any('module mod_security' in line for line in configLines) else 0

    page = httpProc(request, 'firewall/modSecurityRulesPacks.html',
                    {'modSecInstalled': modSecInstalled}, 'admin')
    return page.render()
def getOWASPAndComodoStatus(self, userID = None, data = None):
    """
    Report whether the OWASP and Comodo rule packs are installed (admin only).

    OpenLiteSpeed: requires a 'module mod_security' line in
    conf/httpd_config.conf (else modSecInstalled = 0). Packs are detected
    from that config, then - for OWASP - from include lines in
    conf/modsec/rules.conf, and finally from the presence of the pack
    files on disk.
    Other servers: Comodo is detected by listing
    /usr/local/lsws/conf/comodo_litespeed/, OWASP by 'modsec/owasp' in
    /usr/local/lsws/conf/modsec.conf.

    Improvements over the previous version:
    * httpd_config.conf is read once; it used to be fetched a second time
      with an identical privileged command.
    * A failure while reading rules.conf is logged instead of being
      silently discarded (detection still continues).
    """
    try:
        if ACLManager.loadedACL(userID)['admin'] != 1:
            return ACLManager.loadErrorJson('modSecInstalled', 0)

        comodoInstalled = 0
        owaspInstalled = 0

        if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
            serverRoot = virtualHostUtilities.Server_root
            confPath = os.path.join(serverRoot, "conf/httpd_config.conf")
            httpdConfig = ProcessUtilities.outputExecutioner("sudo cat " + confPath).splitlines()

            if not any('module mod_security' in line for line in httpdConfig):
                return HttpResponse(json.dumps({'modSecInstalled': 0}))

            for line in httpdConfig:
                # Check for Comodo rules
                if 'modsec/comodo' in line:
                    comodoInstalled = 1
                # Check for OWASP rules
                elif 'modsec/owasp' in line or 'owasp-modsecurity-crs' in line:
                    owaspInstalled = 1
                if owaspInstalled == 1 and comodoInstalled == 1:
                    break

            # Also check rules.conf for manual OWASP installations (case-insensitive)
            if owaspInstalled == 0:
                rulesConfPath = os.path.join(serverRoot, "conf/modsec/rules.conf")
                if os.path.exists(rulesConfPath):
                    try:
                        rulesConfig = ProcessUtilities.outputExecutioner("sudo cat " + rulesConfPath).splitlines()
                        for line in rulesConfig:
                            lowered = line.lower()
                            mentionsOwasp = 'owasp' in lowered or 'crs-setup' in lowered
                            isInclude = 'include' in lowered or 'modsecurity_rules_file' in lowered
                            if mentionsOwasp and isInclude:
                                owaspInstalled = 1
                                break
                    except Exception as err:
                        # Best effort: fall through to the on-disk checks below.
                        logging.CyberCPLogFileWriter.writeToFile(
                            'Failed to inspect ' + rulesConfPath + ' for OWASP rules: ' + str(err))

            # Additional check: verify OWASP files actually exist
            if owaspInstalled == 0:
                owaspPath = os.path.join(serverRoot, "conf/modsec/owasp-modsecurity-crs-4.18.0")
                if os.path.exists(owaspPath) and os.path.exists(os.path.join(owaspPath, "owasp-master.conf")):
                    owaspInstalled = 1

            # Additional check: verify Comodo files actually exist
            if comodoInstalled == 0:
                comodoPath = os.path.join(serverRoot, "conf/modsec/comodo")
                if os.path.exists(comodoPath) and os.path.exists(os.path.join(comodoPath, "modsecurity.conf")):
                    comodoInstalled = 1
        else:
            try:
                listing = ProcessUtilities.outputExecutioner('sudo ls /usr/local/lsws/conf/comodo_litespeed/')
                comodoInstalled = 0 if listing.find('No such') > -1 else 1
            except subprocess.CalledProcessError:
                pass

            try:
                modsecConf = ProcessUtilities.outputExecutioner('cat /usr/local/lsws/conf/modsec.conf')
                if modsecConf.find('modsec/owasp') > -1:
                    owaspInstalled = 1
            except Exception:
                # Best effort: an unreadable modsec.conf means "not detected".
                pass

        final_dic = {
            'modSecInstalled': 1,
            'owaspInstalled': owaspInstalled,
            'comodoInstalled': comodoInstalled
        }
        return HttpResponse(json.dumps(final_dic))
    except BaseException as msg:
        return HttpResponse(json.dumps({'modSecInstalled': 0, 'error_message': str(msg)}))
def installModSecRulesPack(self, userID = None, data = None):
    """
    Install or disable a ModSecurity rules pack by running plogical/modSec.py.

    userID -- id of the requesting user; only admins may proceed.
    data   -- request payload; data['packName'] is handed to modSec.py as its
              single command-line argument.

    Returns an HttpResponse whose JSON body carries 'installStatus' (1 or 0)
    and 'error_message' ("None" on success, otherwise the script output or
    the text of the exception that was raised).
    """
    try:
        import shlex

        currentACL = ACLManager.loadedACL(userID)
        if currentACL['admin'] != 1:
            return ACLManager.loadErrorJson('installStatus', 0)

        packName = data['packName']

        # The OpenLiteSpeed branch and the fallback branch used to build and
        # run the exact same command, so a single code path serves both.
        # packName comes from the request, so it is quoted before it is
        # placed on the command line.
        execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/modSec.py"
        execPath = execPath + " " + shlex.quote(packName)

        output = ProcessUtilities.outputExecutioner(execPath)

        # modSec.py signals success by printing "1,None".
        if output.find("1,None") > -1:
            data_ret = {'installStatus': 1, 'error_message': "None"}
        else:
            data_ret = {'installStatus': 0, 'error_message': output}
        return HttpResponse(json.dumps(data_ret))

    except BaseException as msg:
        data_ret = {'installStatus': 0, 'error_message': str(msg)}
        json_data = json.dumps(data_ret)
        return HttpResponse(json_data)
def getRulesFiles(self, userID = None, data = None):
    """
    List the rule files of a rules pack together with their enabled state.

    userID -- id of the requesting user; only admins may proceed.
    data   -- request payload; data['packName'] selects which lines of the
              OWASP master config are reported.

    Every config line containing 'modsec/<packName>' yields one entry with
    'id' (1-based counter), 'fileName' (last path component of the line),
    'packName' and 'status' (False when the line starts with '#').

    Returns an HttpResponse whose JSON body has 'fetchStatus',
    'error_message' and 'data'; 'data' is itself a JSON-encoded list string.
    """
    try:
        currentACL = ACLManager.loadedACL(userID)
        if currentACL['admin'] != 1:
            return ACLManager.loadErrorJson('fetchStatus', 0)

        packName = data['packName']

        # NOTE(review): this path is pinned to the CRS 3.0 directory while
        # other code in this file probes for a 4.18.0 directory - confirm
        # which layout is the one actually installed.
        confPath = '/usr/local/lsws/conf/modsec/owasp-modsecurity-crs-3.0-master/owasp-master.conf'

        command = "sudo cat " + confPath
        httpdConfig = ProcessUtilities.outputExecutioner(command).splitlines()

        marker = 'modsec/' + packName
        entries = []

        for items in httpdConfig:
            if items.find(marker) == -1:
                continue

            # A matching line always contains the marker, so it is never
            # empty and indexing its first character is safe.
            entries.append(json.dumps({
                'id': len(entries) + 1,
                'fileName': items.lstrip('#').split('/')[-1],
                'packName': packName,
                'status': items[0] != '#',
            }))

        # Entries are joined with a bare comma to keep the payload identical
        # to what the frontend has always received.
        json_data = '[' + ','.join(entries) + ']'
        final_json = json.dumps({'fetchStatus': 1, 'error_message': "None", "data": json_data})
        return HttpResponse(final_json)

    except BaseException as msg:
        final_dic = {'fetchStatus': 0, 'error_message': str(msg)}
        final_json = json.dumps(final_dic)
        return HttpResponse(final_json)
def enableDisableRuleFile(self, userID = None, data = None):
    """
    Toggle a single rule file of a rules pack via plogical/modSec.py.

    userID -- id of the requesting user; only admins may proceed.
    data   -- request payload with 'packName', 'fileName' and 'status'
              (the file's current state; True means it gets disabled,
              anything else means it gets enabled).

    Returns an HttpResponse whose JSON body carries 'saveStatus' (1 or 0)
    and 'error_message' ("None" on success, otherwise the script output or
    the text of the exception that was raised).
    """
    try:
        import shlex

        currentACL = ACLManager.loadedACL(userID)
        if currentACL['admin'] != 1:
            return ACLManager.loadErrorJson('saveStatus', 0)

        packName = data['packName']
        fileName = data['fileName']
        currentStatus = data['status']

        # Equality (not truthiness) is kept on purpose so that only a real
        # True/1 selects the disable action.
        if currentStatus == True:
            functionName = 'disableRuleFile'
        else:
            functionName = 'enableRuleFile'

        # Both values come from the request; wrapping fileName in double
        # quotes alone would still allow shell expansion, so each argument
        # is quoted properly instead.
        execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/modSec.py"
        execPath = execPath + " " + functionName + ' --packName ' + shlex.quote(packName) \
                   + ' --fileName ' + shlex.quote(fileName)

        output = ProcessUtilities.outputExecutioner(execPath)

        if output.find("1,None") > -1:
            data_ret = {'saveStatus': 1, 'error_message': "None"}
        else:
            data_ret = {'saveStatus': 0, 'error_message': output}
        return HttpResponse(json.dumps(data_ret))

    except BaseException as msg:
        data_ret = {'saveStatus': 0, 'error_message': str(msg)}
        json_data = json.dumps(data_ret)
        return HttpResponse(json_data)
def csf(self):
    """
    Render the CSF page, telling the template whether CSF looks installed.

    CSF is treated as missing when `csf -h` prints "command not found" or
    when running it raises subprocess.CalledProcessError.
    """
    try:
        helpOutput = ProcessUtilities.outputExecutioner('csf -h')
        isInstalled = 0 if "command not found" in helpOutput else 1
    except subprocess.CalledProcessError:
        isInstalled = 0

    context = {'csfInstalled': isInstalled}
    return httpProc(self.request, 'firewall/csf.html', context, 'admin').render()
def installCSF(self):
    """
    Launch the CSF installer (plogical/csf.py installCSF) without waiting
    for it to finish.

    Only admins may proceed. The call sleeps two seconds after launching
    the installer and then answers {"installStatus": 1}; any exception is
    answered with 'installStatus': 0 and its text in 'error_message'.
    """
    try:
        acl = ACLManager.loadedACL(self.request.session['userID'])
        if acl['admin'] != 1:
            return ACLManager.loadErrorJson('installStatus', 0)

        installer = "sudo /usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/csf.py"
        ProcessUtilities.popenExecutioner(installer + " installCSF")

        time.sleep(2)

        return HttpResponse(json.dumps({"installStatus": 1}))

    except BaseException as msg:
        failure = {'installStatus': 0, 'error_message': str(msg)}
        return HttpResponse(json.dumps(failure))
def installStatusCSF(self):
    """
    Report progress of a CSF install by reading the installer log.

    The log is returned as 'requestStatus'. A "[200]" marker means the
    install finished (abort=1, installed=1), a "[404]" marker means it
    failed (abort=1, installed=0); in both cases the log file is removed.
    Without either marker the caller is told to keep polling (abort=0).
    """
    try:
        # Looking up the session key makes a request without a logged-in
        # user fall into the error handler below.
        _ = self.request.session['userID']

        logText = ProcessUtilities.outputExecutioner("sudo cat " + CSF.installLogPath)

        if "[200]" in logText:
            payload = {
                'error_message': "None",
                'requestStatus': logText,
                'abort': 1,
                'installed': 1,
            }
        elif "[404]" in logText:
            payload = {
                'abort': 1,
                'installed': 0,
                'error_message': "None",
                'requestStatus': logText,
            }
        else:
            # Still running: leave the log in place for the next poll.
            return HttpResponse(json.dumps({
                'abort': 0,
                'error_message': "None",
                'requestStatus': logText,
            }))

        # A terminal marker was found, so the log is no longer needed.
        ProcessUtilities.executioner('sudo rm -f ' + CSF.installLogPath)
        return HttpResponse(json.dumps(payload))

    except BaseException as msg:
        failure = {'abort': 1, 'installed': 0, 'error_message': str(msg)}
        return HttpResponse(json.dumps(failure))
def removeCSF(self):
    """
    Launch CSF removal (plogical/csf.py removeCSF) without waiting for it
    to finish.

    Only admins may proceed. The call sleeps two seconds after launching
    the script and then answers {"installStatus": 1}; any exception is
    answered with 'installStatus': 0 and its text in 'error_message'.
    """
    try:
        acl = ACLManager.loadedACL(self.request.session['userID'])
        if acl['admin'] != 1:
            return ACLManager.loadErrorJson('installStatus', 0)

        remover = "sudo /usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/csf.py"
        ProcessUtilities.popenExecutioner(remover + " removeCSF")

        time.sleep(2)

        return HttpResponse(json.dumps({"installStatus": 1}))

    except BaseException as msg:
        failure = {'installStatus': 0, 'error_message': str(msg)}
        return HttpResponse(json.dumps(failure))
def fetchCSFSettings(self):
    """
    Return the current CSF settings as JSON.

    Only admins may proceed. On success the body carries 'fetchStatus': 1,
    'testingMode' and the 'tcpIN', 'tcpOUT', 'udpIN', 'udpOUT' and
    'firewallStatus' values read from CSF.fetchCSFSettings().

    Any exception, whatever its cause, is answered with 'fetchStatus': 0
    and the fixed message 'CSF is not installed.'.
    """
    try:
        acl = ACLManager.loadedACL(self.request.session['userID'])
        if acl['admin'] != 1:
            return ACLManager.loadErrorJson('fetchStatus', 0)

        settings = CSF.fetchCSFSettings()

        payload = {"fetchStatus": 1, 'testingMode': settings['TESTING']}
        # These keys are passed through under the same name.
        for key in ('tcpIN', 'tcpOUT', 'udpIN', 'udpOUT', 'firewallStatus'):
            payload[key] = settings[key]

        return HttpResponse(json.dumps(payload))

    except BaseException as msg:
        failure = {'fetchStatus': 0, 'error_message': 'CSF is not installed.'}
        return HttpResponse(json.dumps(failure))
def changeStatus(self):
    """
    Change a CSF controller's status via plogical/csf.py changeStatus.

    Only admins may proceed. The JSON request body must provide
    'controller' and 'status'; both are passed to the script as
    command-line arguments.

    Returns an HttpResponse whose JSON body carries 'status' (1 or 0) and,
    on failure, 'error_message' (script output or exception text).
    """
    try:
        import shlex

        userID = self.request.session['userID']
        currentACL = ACLManager.loadedACL(userID)
        if currentACL['admin'] != 1:
            return ACLManager.loadErrorJson()

        data = json.loads(self.request.body)

        controller = data['controller']
        status = data['status']

        # Both values come straight from the request body and end up in a
        # sudo command line, so they are quoted before being appended.
        execPath = "sudo /usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/csf.py"
        execPath = execPath + " changeStatus --controller " + shlex.quote(controller) \
                   + " --status " + shlex.quote(status)

        output = ProcessUtilities.outputExecutioner(execPath)

        if output.find("1,None") > -1:
            data_ret = {"status": 1}
        else:
            data_ret = {'status': 0, 'error_message': output}
        return HttpResponse(json.dumps(data_ret))

    except BaseException as msg:
        final_dic = {'status': 0, 'error_message': str(msg)}
        final_json = json.dumps(final_dic)
        return HttpResponse(final_json)
def modifyPorts(self, data = None):
    """
    Update the CSF port list for one protocol via plogical/csf.py modifyPorts.

    Only admins may proceed. data['ports'] is written to a scratch file
    under /home/cyberpanel/ (mode 600) and the file's path, together with
    data['protocol'], is passed to the script.

    Returns an HttpResponse whose JSON body carries 'status' (1 or 0) and,
    on failure, 'error_message' (script output or exception text).
    """
    try:
        import shlex

        userID = self.request.session['userID']
        currentACL = ACLManager.loadedACL(userID)
        if currentACL['admin'] != 1:
            return ACLManager.loadErrorJson()

        protocol = data['protocol']
        ports = data['ports']

        portsPath = '/home/cyberpanel/' + str(randint(1000, 9999))

        if os.path.exists(portsPath):
            os.remove(portsPath)

        # The context manager closes the handle even if the write raises.
        with open(portsPath, 'w') as writeToFile:
            writeToFile.write(ports)

        command = 'chmod 600 %s' % (portsPath)
        ProcessUtilities.executioner(command)

        # protocol is request data going into a sudo command line; quote it.
        execPath = "sudo /usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/csf.py"
        execPath = execPath + " modifyPorts --protocol " + shlex.quote(protocol) + " --ports " + portsPath

        output = ProcessUtilities.outputExecutioner(execPath)

        if output.find("1,None") > -1:
            data_ret = {"status": 1}
        else:
            data_ret = {'status': 0, 'error_message': output}
        return HttpResponse(json.dumps(data_ret))

    except BaseException as msg:
        final_dic = {'status': 0, 'error_message': str(msg)}
        final_json = json.dumps(final_dic)
        return HttpResponse(final_json)
def modifyIPs(self):
    """
    Allow or block an IP address through CSF.

    Only admins may proceed. The JSON request body must provide 'mode'
    ('allowIP' or 'blockIP') and 'ipAddress'.

    Returns an HttpResponse whose JSON body carries 'status' (1 or 0) and,
    on failure, 'error_message'. A mode other than the two supported ones
    is answered as a failure instead of being reported as success without
    anything having been done.
    """
    try:
        userID = self.request.session['userID']
        currentACL = ACLManager.loadedACL(userID)
        if currentACL['admin'] != 1:
            return ACLManager.loadErrorJson()

        data = json.loads(self.request.body)

        mode = data['mode']
        ipAddress = data['ipAddress']

        if mode == 'allowIP':
            CSF.allowIP(ipAddress)
        elif mode == 'blockIP':
            CSF.blockIP(ipAddress)
        else:
            data_ret = {'status': 0, 'error_message': 'Unsupported mode: %s' % str(mode)}
            return HttpResponse(json.dumps(data_ret))

        data_ret = {"status": 1}
        json_data = json.dumps(data_ret)
        return HttpResponse(json_data)

    except BaseException as msg:
        final_dic = {'status': 0, 'error_message': str(msg)}
        final_json = json.dumps(final_dic)
        return HttpResponse(final_json)
def imunify(self):
    """
    Render the Imunify360 page, or the "not available" page when the
    Imunify360 agent binary (FirewallManager.imunifyPath) is absent.

    The template context holds 'ipAddress' ("<machine ip>:<panel port>"),
    'CL' (always 1) and 'imunify' (1 when the agent binary exists).

    As a side effect each call tries to recreate missing PHP-FPM pools;
    failures of that step are only logged.
    """
    ipFile = "/etc/cyberpanel/machineIP"
    # Read through a context manager so the handle is always closed.
    with open(ipFile) as f:
        ipData = f.read()
    ipAddress = ipData.split('\n', 1)[0]

    fullAddress = '%s:%s' % (ipAddress, ProcessUtilities.fetchCurrentPort())

    data = {}
    data['ipAddress'] = fullAddress
    data['CL'] = 1

    # Auto-fix PHP-FPM issues when accessing Imunify360 page
    try:
        from plogical import upgrade
        logging.CyberCPLogFileWriter.writeToFile("Auto-fixing PHP-FPM pool configurations for Imunify360 compatibility...")
        fix_result = upgrade.Upgrade.CreateMissingPoolsforFPM()
        if fix_result == 0:
            logging.CyberCPLogFileWriter.writeToFile("PHP-FPM pool configurations auto-fixed successfully")
        else:
            logging.CyberCPLogFileWriter.writeToFile("Warning: PHP-FPM auto-fix had issues")
    except Exception as e:
        logging.CyberCPLogFileWriter.writeToFile(f"Error in auto-fix for Imunify360: {str(e)}")

    if os.path.exists(FirewallManager.imunifyPath):
        data['imunify'] = 1
    else:
        data['imunify'] = 0

    # 'CL' is hard-wired to 1 above, so the only thing that decides the
    # template is whether the agent binary was found.
    if data['imunify'] == 0:
        template = 'firewall/notAvailable.html'
    else:
        template = 'firewall/imunify.html'

    proc = httpProc(self.request, template, data, 'admin')
    return proc.render()
def submitinstallImunify(self):
    """
    Launch the Imunify360 installer (CLManager/CageFS.py) without waiting
    for it to finish.

    Only admins may proceed. The JSON request body must provide 'key',
    which is passed to the installer as --key.

    Always returns an HttpResponse with a JSON body: 'status': 1 once the
    installer was launched, otherwise 'status': 0 with an error message.
    Failures are additionally written to the install status log.
    """
    try:
        import shlex

        userID = self.request.session['userID']
        currentACL = ACLManager.loadedACL(userID)
        if currentACL['admin'] != 1:
            logging.CyberCPLogFileWriter.statusWriter(ServerStatusUtil.lswsInstallStatusPath,
                                                      'Not authorized to install container packages. [404].',
                                                      1)
            # Answer with a JSON error like the sibling endpoints do; a bare
            # integer is not a usable HTTP response.
            return ACLManager.loadErrorJson('status', 0)

        data = json.loads(self.request.body)

        # The key is request data placed on a command line; quote it.
        execPath = "/usr/local/CyberCP/bin/python /usr/local/CyberCP/CLManager/CageFS.py"
        execPath = execPath + " --function submitinstallImunify --key %s" % (shlex.quote(str(data['key'])))
        ProcessUtilities.popenExecutioner(execPath)

        data_ret = {'status': 1, 'error_message': 'None'}
        json_data = json.dumps(data_ret)
        return HttpResponse(json_data)

    except BaseException as msg:
        logging.CyberCPLogFileWriter.statusWriter(ServerStatusUtil.lswsInstallStatusPath, str(msg) + ' [404].', 1)
        # Previously this path fell through and returned None.
        data_ret = {'status': 0, 'error_message': str(msg)}
        return HttpResponse(json.dumps(data_ret))
def imunifyAV(self):
    """
    Render the ImunifyAV page, or the "not available" page when the
    ImunifyAV integration file (FirewallManager.imunifyAVPath) is absent.

    The template context holds 'ipAddress' ("<machine ip>:<panel port>")
    and 'imunify' (1 when the integration file exists).

    As a side effect each call tries to recreate missing PHP-FPM pools;
    failures of that step are only logged.
    """
    ipFile = "/etc/cyberpanel/machineIP"
    # Read through a context manager so the handle is always closed.
    with open(ipFile) as f:
        ipData = f.read()
    ipAddress = ipData.split('\n', 1)[0]

    fullAddress = '%s:%s' % (ipAddress, ProcessUtilities.fetchCurrentPort())

    data = {}
    data['ipAddress'] = fullAddress

    # Auto-fix PHP-FPM issues when accessing ImunifyAV page
    try:
        from plogical import upgrade
        logging.CyberCPLogFileWriter.writeToFile("Auto-fixing PHP-FPM pool configurations for ImunifyAV compatibility...")
        fix_result = upgrade.Upgrade.CreateMissingPoolsforFPM()
        if fix_result == 0:
            logging.CyberCPLogFileWriter.writeToFile("PHP-FPM pool configurations auto-fixed successfully")
        else:
            logging.CyberCPLogFileWriter.writeToFile("Warning: PHP-FPM auto-fix had issues")
    except Exception as e:
        logging.CyberCPLogFileWriter.writeToFile(f"Error in auto-fix for ImunifyAV: {str(e)}")

    if os.path.exists(FirewallManager.imunifyAVPath):
        data['imunify'] = 1
    else:
        data['imunify'] = 0

    if data['imunify'] == 0:
        template = 'firewall/notAvailableAV.html'
    else:
        template = 'firewall/imunifyAV.html'

    proc = httpProc(self.request, template, data, 'admin')
    return proc.render()
def submitinstallImunifyAV(self):
    """
    Launch the ImunifyAV installer (CLManager/CageFS.py) without waiting
    for it to finish.

    Only admins may proceed.

    Always returns an HttpResponse with a JSON body: 'status': 1 once the
    installer was launched, otherwise 'status': 0 with an error message.
    Failures are additionally written to the install status log.
    """
    try:
        userID = self.request.session['userID']
        currentACL = ACLManager.loadedACL(userID)
        if currentACL['admin'] != 1:
            logging.CyberCPLogFileWriter.statusWriter(ServerStatusUtil.lswsInstallStatusPath,
                                                      'Not authorized to install container packages. [404].',
                                                      1)
            # Answer with a JSON error like the sibling endpoints do; a bare
            # integer is not a usable HTTP response.
            return ACLManager.loadErrorJson('status', 0)

        execPath = "/usr/local/CyberCP/bin/python /usr/local/CyberCP/CLManager/CageFS.py"
        execPath = execPath + " --function submitinstallImunifyAV"
        ProcessUtilities.popenExecutioner(execPath)

        data_ret = {'status': 1, 'error_message': 'None'}
        json_data = json.dumps(data_ret)
        return HttpResponse(json_data)

    except BaseException as msg:
        logging.CyberCPLogFileWriter.statusWriter(ServerStatusUtil.lswsInstallStatusPath, str(msg) + ' [404].', 1)
        # Previously this path fell through and returned None.
        data_ret = {'status': 0, 'error_message': str(msg)}
        return HttpResponse(json.dumps(data_ret))
def litespeed_ent_conf(self, request = None, userID = None):
    """
    Render the LiteSpeed Enterprise configuration page.

    The page is rendered through httpProc with the 'admin' requirement and
    no template context; userID is accepted but not used here.
    """
    return httpProc(request, 'firewall/litespeed_ent_conf.html', None, 'admin').render()
def fetchlitespeed_Conf(self, userID = None, data = None):
    """
    Return the contents of /usr/local/lsws/conf/pre_main_global.conf.

    Only admins may proceed. The file is created empty first when it does
    not exist yet. On success the JSON body carries 'status': 1 and the
    file text in 'currentLitespeed_conf'; on an exception it carries
    'status': 0 and 'error_message'.
    """
    try:
        acl = ACLManager.loadedACL(userID)
        if acl['admin'] != 1:
            return ACLManager.loadErrorJson('modSecInstalled', 0)

        confFile = "/usr/local/lsws/conf/pre_main_global.conf"

        # Make sure there is something to read before running cat.
        if not os.path.exists(confFile):
            ProcessUtilities.executioner("touch /usr/local/lsws/conf/pre_main_global.conf")

        contents = ProcessUtilities.outputExecutioner(f'cat {confFile}')

        payload = {'status': 1, 'currentLitespeed_conf': contents}
        return HttpResponse(json.dumps(payload))

    except BaseException as msg:
        failure = {'status': 0, 'error_message': str(msg)}
        return HttpResponse(json.dumps(failure))
def saveLitespeed_conf(self, userID = None, data = None):
    """
    Replace /usr/local/lsws/conf/pre_main_global.conf with new content.

    userID -- id of the requesting user; only admins may proceed.
    data   -- request payload; data['modSecRules'] is the new file text.

    The text is first written to /home/cyberpanel/pre_main_global.conf,
    then moved over the live file, which is set to mode 644 and owner
    lsadm:lsadm. On success the JSON body carries 'status': 1 and the file
    as re-read from disk in 'currentLitespeed_conf'; on an exception it
    carries 'status': 0 and 'error_message'.
    """
    try:
        currentACL = ACLManager.loadedACL(userID)
        if currentACL['admin'] != 1:
            return ACLManager.loadErrorJson('modSecInstalled', 0)

        file_path = "/usr/local/lsws/conf/pre_main_global.conf"
        tempRulesPath = '/home/cyberpanel/pre_main_global.conf'

        # Read the payload and stage the new file BEFORE touching the live
        # config, so a missing key or a failed write cannot leave the
        # server without its configuration file.
        currentLitespeed_conf = data['modSecRules']

        with open(tempRulesPath, 'w') as WriteToFile:
            WriteToFile.write(currentLitespeed_conf)

        command = f'rm -f {file_path}'
        ProcessUtilities.executioner(command)

        command = f'mv {tempRulesPath} {file_path}'
        ProcessUtilities.executioner(command)

        # Third argument is passed as in the original call; the command
        # chains two programs with '&&'.
        command = f'chmod 644 {file_path} && chown lsadm:lsadm {file_path}'
        ProcessUtilities.executioner(command, None, True)

        command = f'cat {file_path}'
        currentModSecRules = ProcessUtilities.outputExecutioner(command)

        final_dic = {'status': 1,
                     'currentLitespeed_conf': currentModSecRules}
        final_json = json.dumps(final_dic)
        return HttpResponse(final_json)

    except BaseException as msg:
        final_dic = {'status': 0, 'error_message': str(msg)}
        final_json = json.dumps(final_dic)
        return HttpResponse(final_json)
def getBannedIPs(self, userID=None):
    """
    Return the currently active banned IPs as JSON.

    The BannedIP database model is queried first (active rows that either
    never expire or expire in the future). When that yields nothing, or
    the model cannot be imported / used (ImportError, AttributeError), the
    entries from the JSON store are reported instead.

    Non-admins receive ACLManager.loadError(). On success the body carries
    'status': 1 and 'bannedIPs'; on an exception 'status': 0 and
    'error_message'.
    """
    def _as_timestamp_text(value, on_error):
        # Format a numeric epoch value; fall back when it cannot be converted.
        from datetime import datetime
        try:
            return datetime.fromtimestamp(value).strftime('%Y-%m-%d %H:%M:%S')
        except Exception:
            return on_error

    try:
        requester = Administrator.objects.get(pk=userID)
        if requester.acl.adminStatus != 1:
            return ACLManager.loadError()

        results = []

        try:
            from firewall.models import BannedIP
            from django.db.models import Q

            now = int(time.time())
            not_expired = Q(expires__isnull=True) | Q(expires__gt=now)
            rows = BannedIP.objects.filter(active=True).filter(not_expired).order_by('-banned_on')

            for row in rows:
                entry = {
                    'id': row.id,
                    'ip': row.ip_address,
                    'reason': row.reason,
                    'duration': row.duration,
                    'banned_on': row.get_banned_on_display(),
                    'expires': row.get_expires_display(),
                    'active': not row.is_expired() and row.active
                }
                if entry['active']:
                    results.append(entry)
        except (ImportError, AttributeError) as e:
            # Fall back to JSON file when BannedIP model unavailable
            logging.CyberCPLogFileWriter.writeToFile('getBannedIPs: using JSON fallback (%s)' % str(e))
            results = []

        # If DB returns nothing (or model not available), merge in JSON fallback
        if not results:
            stored, _ = self._load_banned_ips_store()

            for item in stored:
                if not item.get('active', True):
                    continue

                expiry = item.get('expires')
                if expiry == 'Never' or expiry is None:
                    expiry_text = 'Never'
                elif isinstance(expiry, (int, float)):
                    expiry_text = _as_timestamp_text(expiry, 'Never')
                else:
                    expiry_text = str(expiry)

                since = item.get('banned_on')
                if isinstance(since, (int, float)):
                    since = _as_timestamp_text(since, 'N/A')
                else:
                    since = str(since) if since else 'N/A'

                results.append({
                    'id': item.get('id'),
                    'ip': item.get('ip', ''),
                    'reason': item.get('reason', ''),
                    'duration': item.get('duration', 'permanent'),
                    'banned_on': since,
                    'expires': expiry_text,
                    'active': True
                })

        payload = {'status': 1, 'bannedIPs': results}
        return HttpResponse(json.dumps(payload), content_type='application/json')

    except BaseException as msg:
        logging.CyberCPLogFileWriter.writeToFile('Error in getBannedIPs: %s' % str(msg))
        failure = {'status': 0, 'error_message': str(msg)}
        return HttpResponse(json.dumps(failure))
def addBannedIP(self, userID=None, data=None):
    """
    Ban an IP address (or CIDR range): record it in the JSON store and
    apply the firewall rule through FirewallUtilities.blockIP.

    userID -- id of the requesting user; only admins may proceed (others
              get a JSON error with HTTP status 403).
    data   -- request payload with 'ip', 'reason' and optional 'duration'
              ('1h', '24h', '7d', '30d' or 'permanent'; default '24h',
              unknown values are treated as 24 hours).

    The store entry is written first and rolled back again when the
    firewall rule cannot be applied.

    Returns an HttpResponse with a JSON body: 'status': 1 and 'message' on
    success, otherwise 'status': 0 with the same text in both
    'error_message' and 'error'.
    """
    def _error(text, http_status=200):
        # Uniform JSON error body carrying both keys the frontends read.
        body = {'status': 0, 'error_message': text, 'error': text}
        return HttpResponse(json.dumps(body), content_type='application/json', status=http_status)

    try:
        import ipaddress

        admin = Administrator.objects.get(pk=userID)
        if admin.acl.adminStatus != 1:
            return _error('You are not authorized to access this resource.', 403)

        ip = data.get('ip', '').strip()
        reason = data.get('reason', '').strip()
        duration = data.get('duration', '24h')

        if not ip or not reason:
            return _error('IP address and reason are required')

        # Validate the WHOLE value. Checking only the part before '/' would
        # let an arbitrary suffix through to the firewall command.
        try:
            if '/' in ip:
                ipaddress.ip_network(ip, strict=False)  # CIDR notation
            else:
                ipaddress.ip_address(ip)
        except ValueError:
            return _error('Invalid IP address format')

        # Calculate expiration time
        current_time = time.time()
        if duration == 'permanent':
            expires = 'Never'
        else:
            duration_map = {
                '1h': 3600,
                '24h': 86400,
                '7d': 604800,
                '30d': 2592000
            }
            duration_seconds = duration_map.get(duration, 86400)
            expires = current_time + duration_seconds

        # Load existing banned IPs
        banned_ips, _ = self._load_banned_ips_store()

        # Check if IP is already banned
        for banned_ip in banned_ips:
            if banned_ip.get('ip') == ip and banned_ip.get('active', True):
                return _error('IP address %s is already banned' % ip)

        # Add new banned IP
        new_banned_ip = {
            'id': int(time.time()),
            'ip': ip,
            'reason': reason,
            'duration': duration,
            'banned_on': current_time,
            'expires': expires,
            'active': True
        }
        banned_ips.append(new_banned_ip)

        # Save to file
        self._save_banned_ips_store(banned_ips)

        def _rollback_store():
            # Drop the active entry for this IP again and persist the result.
            remaining = [b for b in banned_ips if b.get('ip') != ip or not b.get('active', True)]
            if len(remaining) < len(banned_ips):
                self._save_banned_ips_store(remaining)

        # Apply firewall rule using FirewallUtilities (runs with proper privileges via ProcessUtilities/lscpd)
        try:
            block_ok, block_msg = FirewallUtilities.blockIP(ip, reason)
            if not block_ok:
                _rollback_store()
                logging.CyberCPLogFileWriter.writeToFile('Failed to add firewalld rule for %s: %s' % (ip, block_msg))
                return _error(block_msg or 'Failed to add firewall rule')
            logging.CyberCPLogFileWriter.writeToFile(f'Banned IP {ip} with reason: {reason}')
        except Exception as e:
            # Rollback store on any exception; a failing rollback must not
            # mask the original error.
            try:
                _rollback_store()
            except Exception:
                pass
            logging.CyberCPLogFileWriter.writeToFile('Failed to add firewalld rule for %s: %s' % (ip, str(e)))
            return _error('Firewall command failed: %s' % str(e))

        final_dic = {'status': 1, 'message': 'IP address %s has been banned successfully' % ip}
        return HttpResponse(json.dumps(final_dic), content_type='application/json')

    except BaseException as msg:
        return _error(str(msg))
def removeBannedIP(self, userID=None, data=None):
    """
    Unban an IP address.

    The record is looked up by data['id'] and, failing that, by
    data['ip'], first in the BannedIP database model and then in the JSON
    store. The matching record is marked inactive and
    FirewallUtilities.unblockIP is called for its address; an exception
    from the unblock call is logged but does not fail the request.

    Non-admins receive ACLManager.loadError(). The JSON answer carries
    'status': 1 and 'message' on success, otherwise 'status': 0 and
    'error_message'.
    """
    def _reply(body):
        return HttpResponse(json.dumps(body), content_type='application/json')

    try:
        requester = Administrator.objects.get(pk=userID)
        if requester.acl.adminStatus != 1:
            return ACLManager.loadError()

        record_id = data.get('id')
        wanted_ip = (data.get('ip') or '').strip()

        # Ids may arrive as digit strings or floats; turn them into ints.
        try:
            if isinstance(record_id, str) and record_id.isdigit():
                record_id = int(record_id)
            elif isinstance(record_id, float):
                record_id = int(record_id)
        except Exception:
            pass

        # Try database (BannedIP model) first - ids are typically small integers
        try:
            from firewall.models import BannedIP
        except Exception as e:
            BannedIP = None
            logging.CyberCPLogFileWriter.writeToFile(f'Warning: BannedIP model import failed, using JSON fallback: {str(e)}')

        if BannedIP is not None:
            try:
                record = None
                if record_id not in (None, ''):
                    try:
                        record = BannedIP.objects.get(pk=record_id)
                    except BannedIP.DoesNotExist:
                        record = None
                if record is None and wanted_ip:
                    record = BannedIP.objects.filter(ip_address=wanted_ip).first()
                if record is None:
                    # Nothing in the database: continue with the JSON store.
                    raise BannedIP.DoesNotExist()

                address = record.ip_address
                record.active = False
                record.save()

                # Remove firewalld rule using FirewallUtilities (runs with proper privileges)
                try:
                    FirewallUtilities.unblockIP(address)
                except Exception as e:
                    logging.CyberCPLogFileWriter.writeToFile(f'Warning removing firewalld rule: {str(e)}')

                return _reply({'status': 1, 'message': f'IP address {address} has been unbanned successfully'})
            except BannedIP.DoesNotExist:
                pass

        # Fall back to JSON file storage
        stored, _ = self._load_banned_ips_store()

        # Find and update the first matching entry
        address = None
        for entry in stored:
            same_id = record_id not in (None, '') and str(entry.get('id')) == str(record_id)
            same_ip = wanted_ip and str(entry.get('ip', '')).strip() == wanted_ip
            if not (same_id or same_ip):
                continue
            entry['active'] = False
            entry['unbanned_on'] = time.time()
            address = entry['ip']
            break

        if not address:
            return _reply({'status': 0, 'error_message': 'Banned IP not found'})

        # Save updated banned IPs
        self._save_banned_ips_store(stored)

        # Remove firewalld rule using FirewallUtilities (runs with proper privileges)
        try:
            FirewallUtilities.unblockIP(address)
            logging.CyberCPLogFileWriter.writeToFile(f'Unbanned IP {address}')
        except Exception as e:
            logging.CyberCPLogFileWriter.writeToFile(f'Warning removing firewalld rule for {address}: {str(e)}')

        return _reply({'status': 1, 'message': f'IP address {address} has been unbanned successfully'})

    except BaseException as msg:
        return _reply({'status': 0, 'error_message': str(msg)})
def deleteBannedIP(self, userID=None, data=None):
    """
    Permanently delete a banned-IP record.

    The record is looked up by data['id'] and, failing that, by
    data['ip'], first in the BannedIP database model and then in the JSON
    store, where every matching entry is dropped. Only the record is
    removed; this method does not call the firewall.

    Non-admins receive ACLManager.loadError(). The JSON answer carries
    'status': 1 and 'message' on success, otherwise 'status': 0 and
    'error_message'.
    """
    def _reply(body):
        return HttpResponse(json.dumps(body), content_type='application/json')

    try:
        requester = Administrator.objects.get(pk=userID)
        if requester.acl.adminStatus != 1:
            return ACLManager.loadError()

        record_id = data.get('id')
        wanted_ip = (data.get('ip') or '').strip()

        # Ids may arrive as digit strings or floats; turn them into ints.
        try:
            if isinstance(record_id, str) and record_id.isdigit():
                record_id = int(record_id)
            elif isinstance(record_id, float):
                record_id = int(record_id)
        except Exception:
            pass

        # Try database (BannedIP model) first
        try:
            from firewall.models import BannedIP
        except Exception as e:
            BannedIP = None
            logging.CyberCPLogFileWriter.writeToFile(f'Warning: BannedIP model import failed, using JSON fallback: {str(e)}')

        if BannedIP is not None:
            try:
                record = None
                if record_id not in (None, ''):
                    try:
                        record = BannedIP.objects.get(pk=record_id)
                    except BannedIP.DoesNotExist:
                        record = None
                if record is None and wanted_ip:
                    record = BannedIP.objects.filter(ip_address=wanted_ip).first()
                if record is None:
                    # Nothing in the database: continue with the JSON store.
                    raise BannedIP.DoesNotExist()

                removed_ip = record.ip_address
                record.delete()
                logging.CyberCPLogFileWriter.writeToFile(f'Deleted banned IP record for {removed_ip}')
                return _reply({'status': 1, 'message': f'Banned IP record for {removed_ip} has been deleted successfully'})
            except BannedIP.DoesNotExist:
                pass

        # Fall back to JSON file storage
        stored, _ = self._load_banned_ips_store()

        # Keep everything that does not match; remember the last match's IP
        removed_ip = None
        survivors = []
        for entry in stored:
            same_id = record_id not in (None, '') and str(entry.get('id')) == str(record_id)
            same_ip = wanted_ip and str(entry.get('ip', '')).strip() == wanted_ip
            if same_id or same_ip:
                removed_ip = entry['ip']
                continue
            survivors.append(entry)

        if not removed_ip:
            return _reply({'status': 0, 'error_message': 'Banned IP record not found'})

        # Save updated banned IPs
        self._save_banned_ips_store(survivors)
        logging.CyberCPLogFileWriter.writeToFile(f'Deleted banned IP record for {removed_ip}')

        return _reply({'status': 1, 'message': f'Banned IP record for {removed_ip} has been deleted successfully'})

    except BaseException as msg:
        return _reply({'status': 0, 'error_message': str(msg)})
def modifyBannedIP(self, userID=None, data=None):
    """
    Modify an existing banned IP record (reason, duration).

    Supports both BannedIP database model and JSON file storage. The database
    is tried first; if no matching row exists there (or the model cannot be
    imported) the JSON store loaded via ``self._load_banned_ips_store()`` is
    searched instead.

    Args:
        userID: Primary key of the Administrator performing the request.
        data: Mapping with the keys ``id`` and/or ``ip`` (to locate the
            record), ``reason`` (required) and ``duration`` (optional; one of
            ``1h``, ``24h``, ``7d``, ``30d``, ``permanent``; any other
            non-empty value is stored as given and expires after 24 hours).

    Returns:
        HttpResponse carrying JSON ``{'status': 1, 'message': ...}`` on
        success or ``{'status': 0, 'error_message': ...}`` on failure. For a
        non-admin user the result of ``ACLManager.loadError()`` is returned.
    """
    # Seconds to add to "now" for each timed duration; 'permanent' is handled separately.
    duration_seconds_map = {'1h': 3600, '24h': 86400, '7d': 604800, '30d': 2592000}
    default_duration_seconds = 86400

    def json_response(payload):
        return HttpResponse(json.dumps(payload), content_type='application/json')

    try:
        admin = Administrator.objects.get(pk=userID)
        if admin.acl.adminStatus != 1:
            return ACLManager.loadError()

        # Tolerate a missing payload and null / non-string field values
        # instead of failing with an AttributeError.
        data = data or {}
        banned_ip_id = data.get('id')
        requested_ip = str(data.get('ip') or '').strip()
        reason = str(data.get('reason') or '').strip()
        duration = str(data.get('duration') or '').strip()

        # Normalise numeric ids that arrive as strings or floats; anything
        # that cannot be converted is left untouched.
        try:
            if isinstance(banned_ip_id, str) and banned_ip_id.isdigit():
                banned_ip_id = int(banned_ip_id)
            elif isinstance(banned_ip_id, float):
                banned_ip_id = int(banned_ip_id)
        except (ValueError, OverflowError):
            pass

        has_id = banned_ip_id not in (None, '')

        if not has_id and not requested_ip:
            return json_response({'status': 0, 'error_message': 'Banned IP ID or IP address is required'})

        if not reason:
            return json_response({'status': 0, 'error_message': 'Reason is required'})

        # Try database (BannedIP model) first - ids are typically small integers
        try:
            from firewall.models import BannedIP
        except Exception as e:
            BannedIP = None
            logging.CyberCPLogFileWriter.writeToFile(f'Warning: BannedIP model import failed, using JSON fallback: {str(e)}')

        if BannedIP is not None:
            db_record = None
            if has_id:
                try:
                    db_record = BannedIP.objects.get(pk=banned_ip_id)
                except (BannedIP.DoesNotExist, ValueError, TypeError):
                    # Unknown id, or an id that cannot be used as a primary
                    # key: fall through to the IP lookup / JSON store.
                    db_record = None
            if db_record is None and requested_ip:
                db_record = BannedIP.objects.filter(ip_address=requested_ip).first()

            if db_record is not None:
                if not db_record.active:
                    return json_response({'status': 0, 'error_message': 'Cannot modify an inactive/expired ban'})

                db_record.reason = reason
                if duration:
                    db_record.duration = duration
                    if duration == 'permanent':
                        db_record.expires = None
                    else:
                        seconds = duration_seconds_map.get(duration, default_duration_seconds)
                        db_record.expires = int(time.time()) + seconds
                db_record.save()

                # Log the record's own primary key so the entry is meaningful
                # even when the record was located by IP address.
                logging.CyberCPLogFileWriter.writeToFile(f'Modified banned IP {db_record.ip_address} (id={db_record.pk})')
                return json_response({'status': 1, 'message': f'Banned IP {db_record.ip_address} has been updated successfully'})

        # Fall back to JSON file storage (ids are timestamps)
        banned_ips, _ = self._load_banned_ips_store()
        matched = None
        for record in banned_ips:
            id_match = has_id and str(record.get('id')) == str(banned_ip_id)
            ip_match = bool(requested_ip) and str(record.get('ip', '')).strip() == requested_ip
            if (id_match or ip_match) and record.get('active', True):
                matched = record
                break

        if matched is None:
            return json_response({'status': 0, 'error_message': 'Banned IP record not found'})

        matched['reason'] = reason
        if duration:
            matched['duration'] = duration
            if duration == 'permanent':
                matched['expires'] = 'Never'
            else:
                seconds = duration_seconds_map.get(duration, default_duration_seconds)
                matched['expires'] = time.time() + seconds

        self._save_banned_ips_store(banned_ips)

        # Report the record that was actually updated rather than searching
        # the list a second time with a different predicate.
        ip_addr = matched.get('ip', 'unknown')
        logging.CyberCPLogFileWriter.writeToFile(f'Modified banned IP {ip_addr} (id={banned_ip_id}) in JSON')
        return json_response({'status': 1, 'message': f'Banned IP {ip_addr} has been updated successfully'})
    except BaseException as msg:
        logging.CyberCPLogFileWriter.writeToFile(f'Error in modifyBannedIP: {str(msg)}')
        return json_response({'status': 0, 'error_message': str(msg)})
def exportFirewallRules(self, userID=None):
    """
    Export all custom firewall rules as a downloadable JSON document.

    The rules named 'CyberPanel Admin' and 'SSHCustom' are treated as
    defaults and left out of the export.

    Args:
        userID: Identifier passed to ``ACLManager.loadedACL`` for the admin check.

    Returns:
        HttpResponse with the JSON export and an attachment
        Content-Disposition header on success; the result of
        ``ACLManager.loadErrorJson('exportStatus', 0)`` for non-admins; or a
        JSON body with ``exportStatus`` 0 and ``error_message`` on failure.
    """
    try:
        if ACLManager.loadedACL(userID)['admin'] != 1:
            return ACLManager.loadErrorJson('exportStatus', 0)

        # Names of the default CyberPanel rules that must not be exported
        builtin_names = ['CyberPanel Admin', 'SSHCustom']

        exported = [
            {
                'name': entry.name,
                'proto': entry.proto,
                'port': entry.port,
                'ipAddress': entry.ipAddress
            }
            for entry in FirewallRules.objects.all()
            if entry.name not in builtin_names
        ]

        # Wrap the rules with some metadata about the export itself
        payload = {
            'version': '1.0',
            'exported_at': time.strftime('%Y-%m-%d %H:%M:%S'),
            'total_rules': len(exported),
            'rules': exported
        }
        body = json.dumps(payload, indent=2)

        logging.CyberCPLogFileWriter.writeToFile(f"Firewall rules exported successfully. Total rules: {len(exported)}")

        # Serve the JSON as a file download
        download = HttpResponse(body, content_type='application/json')
        download['Content-Disposition'] = f'attachment; filename="firewall_rules_export_{int(time.time())}.json"'
        return download
    except BaseException as msg:
        failure = {'exportStatus': 0, 'error_message': str(msg)}
        return HttpResponse(json.dumps(failure))
def importFirewallRules(self, userID=None, data=None):
    """
    Import firewall rules from a JSON file.

    The JSON document is read from an uploaded file named ``import_file``
    when the request carries one, otherwise from the server-side path given
    in ``data['import_file_path']``. It must be an object with a ``rules``
    list; each rule is an object with ``name``, ``proto``, ``port`` and
    ``ipAddress``. Rules named 'CyberPanel Admin' or 'SSHCustom' and rules
    that already exist in the database are skipped. A malformed rule is
    counted as an error and does not stop the remaining rules from being
    processed.

    Args:
        userID: Identifier passed to ``ACLManager.loadedACL`` for the admin check.
        data: Optional mapping that may contain ``import_file_path``.

    Returns:
        HttpResponse with a JSON body. On completion ``importStatus`` is 1
        and the body carries ``imported_count``, ``skipped_count``,
        ``error_count`` and ``errors``; on failure ``importStatus`` is 0 with
        an ``error_message``. Non-admins receive the result of
        ``ACLManager.loadErrorJson('importStatus', 0)``.
    """
    required_fields = ('name', 'proto', 'port', 'ipAddress')

    try:
        currentACL = ACLManager.loadedACL(userID)
        if currentACL['admin'] != 1:
            return ACLManager.loadErrorJson('importStatus', 0)

        # Handle file upload
        request_files = getattr(getattr(self, 'request', None), 'FILES', None)
        if request_files and 'import_file' in request_files:
            import_file = request_files['import_file']
            import_data = json.loads(import_file.read().decode('utf-8'))
        else:
            # Fallback to file path method; `data` may legitimately be None
            import_file_path = data.get('import_file_path', '') if data else ''
            if not import_file_path or not os.path.exists(import_file_path):
                final_dic = {'importStatus': 0, 'error_message': 'Import file not found or invalid path'}
                return HttpResponse(json.dumps(final_dic))

            with open(import_file_path, 'r') as f:
                import_data = json.load(f)

        # Validate the import data structure: a JSON object holding a list of rules
        if not isinstance(import_data, dict) or not isinstance(import_data.get('rules'), list):
            final_dic = {'importStatus': 0, 'error_message': 'Invalid import file format. Missing rules array.'}
            return HttpResponse(json.dumps(final_dic))

        imported_count = 0
        skipped_count = 0
        error_count = 0
        errors = []

        # Default CyberPanel rules to exclude from import
        default_rules = ['CyberPanel Admin', 'SSHCustom']

        for rule_data in import_data['rules']:
            # Resolve the display name up front so the error handler below
            # can never fail itself on a non-object entry.
            rule_name = rule_data.get('name', 'Unknown') if isinstance(rule_data, dict) else 'Unknown'
            try:
                if not isinstance(rule_data, dict):
                    raise ValueError('Rule entry is not a JSON object')

                # Skip default rules
                if rule_data.get('name', '') in default_rules:
                    skipped_count += 1
                    continue

                missing = [field for field in required_fields if field not in rule_data]
                if missing:
                    raise ValueError(f"Missing required field(s): {', '.join(missing)}")

                name = rule_data['name']
                proto = rule_data['proto']
                port = rule_data['port']
                ip_address = rule_data['ipAddress']

                # Check if rule already exists
                existing_rule = FirewallRules.objects.filter(
                    name=name,
                    proto=proto,
                    port=port,
                    ipAddress=ip_address
                ).first()
                if existing_rule:
                    skipped_count += 1
                    continue

                # Add the rule to the system firewall
                FirewallUtilities.addRule(proto, port, ip_address)

                # Add the rule to the database
                FirewallRules(
                    name=name,
                    proto=proto,
                    port=port,
                    ipAddress=ip_address
                ).save()
                imported_count += 1
            except Exception as e:
                error_count += 1
                errors.append(f"Rule '{rule_name}': {str(e)}")
                logging.CyberCPLogFileWriter.writeToFile(f"Error importing rule {rule_name}: {str(e)}")

        logging.CyberCPLogFileWriter.writeToFile(f"Firewall rules import completed. Imported: {imported_count}, Skipped: {skipped_count}, Errors: {error_count}")

        final_dic = {
            'importStatus': 1,
            'error_message': "None",
            'imported_count': imported_count,
            'skipped_count': skipped_count,
            'error_count': error_count,
            'errors': errors
        }
        return HttpResponse(json.dumps(final_dic))
    except BaseException as msg:
        final_dic = {'importStatus': 0, 'error_message': str(msg)}
        return HttpResponse(json.dumps(final_dic))
def exportBannedIPs(self, userID=None):
    """
    Export banned IPs as a downloadable JSON document.

    Records are read from the BannedIP database model when it can be
    imported and queried; if that yields nothing, the JSON store returned by
    ``self._load_banned_ips_store()`` is used instead.

    Args:
        userID: Identifier passed to ``ACLManager.loadedACL`` for the admin check.

    Returns:
        HttpResponse with the JSON export and an attachment
        Content-Disposition header on success; the result of
        ``ACLManager.loadErrorJson('exportStatus', 0)`` for non-admins; or a
        JSON body with ``exportStatus`` 0 and ``error_message`` on failure.
    """
    try:
        if ACLManager.loadedACL(userID)['admin'] != 1:
            return ACLManager.loadErrorJson('exportStatus', 0)

        records = []

        # Database model takes precedence when it is importable
        try:
            from firewall.models import BannedIP
        except Exception as e:
            BannedIP = None
            logging.CyberCPLogFileWriter.writeToFile(f'Warning: BannedIP model import failed, using JSON fallback: {str(e)}')

        if BannedIP is not None:
            try:
                # Rows are appended one at a time, so rows collected before a
                # failure are kept.
                for row in BannedIP.objects.all().order_by('-banned_on'):
                    records.append(dict(
                        id=row.id,
                        ip=row.ip_address,
                        reason=row.reason,
                        duration=row.duration,
                        banned_on=row.get_banned_on_display(),
                        expires=row.get_expires_display(),
                        active=row.active and not row.is_expired(),
                    ))
            except Exception as e:
                logging.CyberCPLogFileWriter.writeToFile(f'Error exporting banned IPs from DB: {str(e)}')

        # Nothing came out of the DB (unavailable or empty): read the JSON store
        if len(records) == 0:
            stored, _ = self._load_banned_ips_store()
            records = [
                {
                    'id': entry.get('id'),
                    'ip': entry.get('ip', ''),
                    'reason': entry.get('reason', ''),
                    'duration': entry.get('duration', 'permanent'),
                    'banned_on': entry.get('banned_on', 'N/A'),
                    'expires': entry.get('expires', 'Never'),
                    'active': entry.get('active', True)
                }
                for entry in stored
            ]

        payload = {
            'version': '1.0',
            'exported_at': time.strftime('%Y-%m-%d %H:%M:%S'),
            'total_banned_ips': len(records),
            'banned_ips': records
        }
        body = json.dumps(payload, indent=2)

        logging.CyberCPLogFileWriter.writeToFile(f"Banned IPs exported successfully. Total: {len(records)}")

        download = HttpResponse(body, content_type='application/json')
        download['Content-Disposition'] = f'attachment; filename=\"banned_ips_export_{int(time.time())}.json\"'
        return download
    except BaseException as msg:
        failure = {'exportStatus': 0, 'error_message': str(msg)}
        return HttpResponse(json.dumps(failure))
def importBannedIPs(self, userID=None, data=None):
    """
    Import banned IPs from a JSON file.

    The JSON document is read from an uploaded file named ``import_file``
    when the request carries one, otherwise from the server-side path given
    in ``data['import_file_path']``. It must be an object with a
    ``banned_ips`` list whose entries are objects with ``ip`` and ``reason``
    (both required) plus optional ``duration`` (default ``permanent``) and
    ``active`` (default true). Entries are stored through the BannedIP model
    when it can be imported, otherwise in the JSON store handled by
    ``self._load_banned_ips_store()`` / ``self._save_banned_ips_store()``.
    IPs that are already present are skipped; malformed entries are counted
    as errors without stopping the import.

    NOTE(review): this method only records the bans; no firewall call is
    made here. Confirm whether enforcement happens elsewhere.

    Args:
        userID: Identifier passed to ``ACLManager.loadedACL`` for the admin check.
        data: Optional mapping that may contain ``import_file_path``.

    Returns:
        HttpResponse with a JSON body. On completion ``importStatus`` is 1
        and the body carries ``imported_count``, ``skipped_count``,
        ``error_count`` and ``errors``; on failure ``importStatus`` is 0 with
        an ``error_message``. Non-admins receive the result of
        ``ACLManager.loadErrorJson('importStatus', 0)``.
    """
    import ipaddress

    # Seconds to add to "now" for each timed duration; unknown values use 24h.
    duration_seconds_map = {'1h': 3600, '24h': 86400, '7d': 604800, '30d': 2592000}
    default_duration_seconds = 86400

    try:
        currentACL = ACLManager.loadedACL(userID)
        if currentACL['admin'] != 1:
            return ACLManager.loadErrorJson('importStatus', 0)

        request_files = getattr(self.request, 'FILES', None)
        if request_files and 'import_file' in request_files:
            import_file = request_files['import_file']
            import_data = json.loads(import_file.read().decode('utf-8'))
        else:
            import_file_path = data.get('import_file_path', '') if data else ''
            if not import_file_path or not os.path.exists(import_file_path):
                final_dic = {'importStatus': 0, 'error_message': 'Import file not found or invalid path'}
                return HttpResponse(json.dumps(final_dic))

            with open(import_file_path, 'r') as f:
                import_data = json.load(f)

        if not isinstance(import_data, dict) or not isinstance(import_data.get('banned_ips'), list):
            final_dic = {'importStatus': 0, 'error_message': 'Invalid import file format. Missing banned_ips array.'}
            return HttpResponse(json.dumps(final_dic))

        imported_count = 0
        skipped_count = 0
        error_count = 0
        errors = []

        # Try database model first
        try:
            from firewall.models import BannedIP
        except Exception as e:
            BannedIP = None
            logging.CyberCPLogFileWriter.writeToFile(f'Warning: BannedIP model import failed, using JSON fallback: {str(e)}')

        # Prepare JSON fallback store if needed
        banned_ips_json = []
        known_ips = set()
        used_ids = set()
        if BannedIP is None:
            banned_ips_json, _ = self._load_banned_ips_store()
            known_ips = {str(b.get('ip', '')).strip() for b in banned_ips_json}
            used_ids = {str(b.get('id')) for b in banned_ips_json}

        for item in import_data['banned_ips']:
            # Reject non-object entries before the try block so the error
            # handler below can always call item.get() safely.
            if not isinstance(item, dict):
                error_count += 1
                errors.append('Invalid entry: expected an object with ip and reason')
                continue

            try:
                ip = str(item.get('ip') or '').strip()
                reason = str(item.get('reason') or '').strip()
                duration = str(item.get('duration') or 'permanent').strip()
                active = bool(item.get('active', True))

                if not ip or not reason:
                    skipped_count += 1
                    continue

                # ip_network validates both plain addresses and CIDR ranges,
                # including the part after the slash.
                try:
                    ipaddress.ip_network(ip, strict=False)
                except ValueError:
                    error_count += 1
                    errors.append(f"Invalid IP: {ip}")
                    continue

                if duration == 'permanent':
                    expires_at = None
                else:
                    expires_at = int(time.time()) + duration_seconds_map.get(duration, default_duration_seconds)

                if BannedIP is not None:
                    if BannedIP.objects.filter(ip_address=ip).first():
                        skipped_count += 1
                        continue

                    new_ban = BannedIP(
                        ip_address=ip,
                        reason=reason,
                        duration=duration,
                        active=active
                    )
                    new_ban.expires = expires_at
                    new_ban.save()
                    imported_count += 1
                else:
                    # JSON fallback storage
                    if ip in known_ips:
                        skipped_count += 1
                        continue

                    # Millisecond timestamps collide inside a tight loop;
                    # bump until the id is unique within the store.
                    new_id = int(time.time() * 1000)
                    while str(new_id) in used_ids:
                        new_id += 1
                    used_ids.add(str(new_id))
                    known_ips.add(ip)

                    banned_ips_json.append({
                        'id': new_id,
                        'ip': ip,
                        'reason': reason,
                        'duration': duration,
                        'banned_on': int(time.time()),
                        'expires': 'Never' if expires_at is None else expires_at,
                        'active': active
                    })
                    imported_count += 1
            except Exception as e:
                error_count += 1
                errors.append(f"IP '{item.get('ip', 'unknown')}': {str(e)}")
                logging.CyberCPLogFileWriter.writeToFile(f"Error importing banned IP {item.get('ip', 'unknown')}: {str(e)}")

        if BannedIP is None:
            self._save_banned_ips_store(banned_ips_json)

        logging.CyberCPLogFileWriter.writeToFile(f"Banned IPs import completed. Imported: {imported_count}, Skipped: {skipped_count}, Errors: {error_count}")

        final_dic = {
            'importStatus': 1,
            'error_message': "None",
            'imported_count': imported_count,
            'skipped_count': skipped_count,
            'error_count': error_count,
            'errors': errors
        }
        return HttpResponse(json.dumps(final_dic))
    except BaseException as msg:
        final_dic = {'importStatus': 0, 'error_message': str(msg)}
        return HttpResponse(json.dumps(final_dic))