security fixes

This commit is contained in:
usmannasir
2025-12-18 12:18:32 +05:00
parent ae020ece7b
commit eca0c3cbeb
3 changed files with 161 additions and 17 deletions

View File

@@ -2,6 +2,7 @@
import os
import os.path
import sys
import re
from io import StringIO
import django
@@ -784,9 +785,35 @@ class BackupManager:
except:
finalDic['user'] = "root"
# SECURITY: Validate all inputs to prevent command injection
if ACLManager.commandInjectionCheck(finalDic['ipAddress']) == 1:
final_dic = {'status': 0, 'destStatus': 0, 'error_message': 'Invalid characters in IP address'}
return HttpResponse(json.dumps(final_dic))
if ACLManager.commandInjectionCheck(finalDic['password']) == 1:
final_dic = {'status': 0, 'destStatus': 0, 'error_message': 'Invalid characters in password'}
return HttpResponse(json.dumps(final_dic))
if ACLManager.commandInjectionCheck(finalDic['port']) == 1:
final_dic = {'status': 0, 'destStatus': 0, 'error_message': 'Invalid characters in port'}
return HttpResponse(json.dumps(final_dic))
if ACLManager.commandInjectionCheck(finalDic['user']) == 1:
final_dic = {'status': 0, 'destStatus': 0, 'error_message': 'Invalid characters in username'}
return HttpResponse(json.dumps(final_dic))
# SECURITY: Validate port is numeric
try:
port_int = int(finalDic['port'])
if port_int < 1 or port_int > 65535:
raise ValueError("Port out of range")
except ValueError:
final_dic = {'status': 0, 'destStatus': 0, 'error_message': 'Port must be a valid number (1-65535)'}
return HttpResponse(json.dumps(final_dic))
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/backupUtilities.py"
execPath = execPath + " submitDestinationCreation --ipAddress " + finalDic['ipAddress'] + " --password " \
+ finalDic['password'] + " --port " + finalDic['port'] + ' --user %s' % (finalDic['user'])
execPath = execPath + " submitDestinationCreation --ipAddress " + shlex.quote(finalDic['ipAddress']) + " --password " \
+ shlex.quote(finalDic['password']) + " --port " + shlex.quote(finalDic['port']) + ' --user %s' % (shlex.quote(finalDic['user']))
if os.path.exists(ProcessUtilities.debugPath):
logging.CyberCPLogFileWriter.writeToFile(execPath)
@@ -880,8 +907,13 @@ class BackupManager:
ipAddress = data['IPAddress']
# SECURITY: Validate IP address to prevent command injection
if ACLManager.commandInjectionCheck(ipAddress) == 1:
final_dic = {'connStatus': 0, 'error_message': 'Invalid characters in IP address'}
return HttpResponse(json.dumps(final_dic))
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/backupUtilities.py"
execPath = execPath + " getConnectionStatus --ipAddress " + ipAddress
execPath = execPath + " getConnectionStatus --ipAddress " + shlex.quote(ipAddress)
output = ProcessUtilities.executioner(execPath)
@@ -1342,16 +1374,32 @@ class BackupManager:
if ACLManager.currentContextPermission(currentACL, 'remoteBackups') == 0:
return ACLManager.loadErrorJson('remoteTransferStatus', 0)
backupDir = data['backupDir']
backupDir = str(data['backupDir'])
backupDirComplete = "/home/backup/transfer-" + str(backupDir)
# adminEmail = admin.email
# SECURITY: Validate backupDir to prevent command injection and path traversal
if ACLManager.commandInjectionCheck(backupDir) == 1:
data = {'remoteRestoreStatus': 0, 'error_message': 'Invalid characters in backup directory name'}
return HttpResponse(json.dumps(data))
##
# SECURITY: Ensure backupDir is alphanumeric only (backup dirs are typically numeric IDs)
if not re.match(r'^[a-zA-Z0-9_-]+$', backupDir):
data = {'remoteRestoreStatus': 0, 'error_message': 'Backup directory name must be alphanumeric'}
return HttpResponse(json.dumps(data))
# SECURITY: Prevent path traversal
if '..' in backupDir or '/' in backupDir:
data = {'remoteRestoreStatus': 0, 'error_message': 'Invalid backup directory path'}
return HttpResponse(json.dumps(data))
backupDirComplete = "/home/backup/transfer-" + backupDir
# SECURITY: Verify the backup directory exists
if not os.path.exists(backupDirComplete):
data = {'remoteRestoreStatus': 0, 'error_message': 'Backup directory does not exist'}
return HttpResponse(json.dumps(data))
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/remoteTransferUtilities.py"
execPath = execPath + " remoteBackupRestore --backupDirComplete " + backupDirComplete + " --backupDir " + str(
backupDir)
execPath = execPath + " remoteBackupRestore --backupDirComplete " + shlex.quote(backupDirComplete) + " --backupDir " + shlex.quote(backupDir)
ProcessUtilities.popenExecutioner(execPath)
@@ -1373,16 +1421,35 @@ class BackupManager:
if ACLManager.currentContextPermission(currentACL, 'remoteBackups') == 0:
return ACLManager.loadErrorJson('remoteTransferStatus', 0)
backupDir = data['backupDir']
backupDir = str(data['backupDir'])
# SECURITY: Validate backupDir to prevent command injection and path traversal
if ACLManager.commandInjectionCheck(backupDir) == 1:
data = {'remoteTransferStatus': 0, 'error_message': 'Invalid characters in backup directory name', "status": "None", "complete": 0}
return HttpResponse(json.dumps(data))
# SECURITY: Ensure backupDir is alphanumeric only
if not re.match(r'^[a-zA-Z0-9_-]+$', backupDir):
data = {'remoteTransferStatus': 0, 'error_message': 'Backup directory name must be alphanumeric', "status": "None", "complete": 0}
return HttpResponse(json.dumps(data))
# SECURITY: Prevent path traversal
if '..' in backupDir or '/' in backupDir:
data = {'remoteTransferStatus': 0, 'error_message': 'Invalid backup directory path', "status": "None", "complete": 0}
return HttpResponse(json.dumps(data))
# admin = Administrator.objects.get(userName=username)
backupLogPath = "/home/backup/transfer-" + backupDir + "/" + "backup_log"
removalPath = "/home/backup/transfer-" + backupDir
removalPath = "/home/backup/transfer-" + str(backupDir)
# SECURITY: Verify the backup directory exists before operating on it
if not os.path.exists(removalPath):
data = {'remoteTransferStatus': 0, 'error_message': 'Backup directory does not exist', "status": "None", "complete": 0}
return HttpResponse(json.dumps(data))
time.sleep(3)
command = "sudo cat " + backupLogPath
command = "sudo cat " + shlex.quote(backupLogPath)
status = ProcessUtilities.outputExecutioner(command)
@@ -1393,14 +1460,14 @@ class BackupManager:
if status.find("completed[success]") > -1:
command = "rm -rf " + removalPath
command = "rm -rf " + shlex.quote(removalPath)
ProcessUtilities.executioner(command)
data_ret = {'remoteTransferStatus': 1, 'error_message': "None", "status": status, "complete": 1}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
elif status.find("[5010]") > -1:
command = "sudo rm -rf " + removalPath
command = "sudo rm -rf " + shlex.quote(removalPath)
ProcessUtilities.executioner(command)
data = {'remoteTransferStatus': 0, 'error_message': status,
"status": "None", "complete": 0}