bug fix: security checks in wp manager

This commit is contained in:
Usman Nasir
2022-06-22 16:37:02 +05:00
parent 18e4f3d5ce
commit 675f2ed386
2 changed files with 77 additions and 31 deletions

View File

@@ -957,4 +957,21 @@ class ACLManager:
except:
return 1
@staticmethod
def CheckIPBackupObjectOwner(currentACL, backupobj, user):
if currentACL['admin'] == 1:
return 1
elif backupobj.owner == user:
return 1
else:
return 0
@staticmethod
def CheckIPPluginObjectOwner(currentACL, backupobj, user):
if currentACL['admin'] == 1:
return 1
elif backupobj.owner == user:
return 1
else:
return 0

View File

@@ -178,11 +178,17 @@ class WebsiteManager:
def RestoreHome(self, request=None, userID=None, BackupID=None ):
Data = {}
currentACL = ACLManager.loadedACL(userID)
admin = Administrator.objects.get(pk=userID)
if ACLManager.CheckForPremFeature('wp-manager'):
Data['backupobj'] = WPSitesBackup.objects.get(pk=BackupID)
if ACLManager.CheckIPBackupObjectOwner(currentACL, Data['backupobj'], admin) == 1:
pass
else:
return ACLManager.loadError()
config = json.loads(Data['backupobj'].config)
Data['FileName']= config['name']
try:
@@ -215,15 +221,23 @@ class WebsiteManager:
backobj = WPSitesBackup.objects.filter(owner=admin).order_by('-id')
if ACLManager.CheckIPBackupObjectOwner(currentACL, backobj, admin) == 1:
pass
else:
return ACLManager.loadError()
try:
if DeleteID != None:
DeleteIDobj = WPSitesBackup.objects.get(pk=DeleteID, owner=admin)
config = DeleteIDobj.config
conf = json.loads(config)
FileName = conf['name']
command = "rm -r /home/backup/%s.tar.gz"%FileName
ProcessUtilities.executioner(command)
DeleteIDobj.delete()
DeleteIDobj = WPSitesBackup.objects.get(pk=DeleteID)
if ACLManager.CheckIPBackupObjectOwner(currentACL, DeleteIDobj, admin) == 1:
config = DeleteIDobj.config
conf = json.loads(config)
FileName = conf['name']
command = "rm -r /home/backup/%s.tar.gz"%FileName
ProcessUtilities.executioner(command)
DeleteIDobj.delete()
except BaseException as msg:
pass
@@ -400,7 +414,14 @@ class WebsiteManager:
def EidtPlugin(self,request=None, userID=None, pluginbID=None):
Data ={}
currentACL = ACLManager.loadedACL(userID)
admin = Administrator.objects.get(pk=userID)
pluginobj = wpplugins.objects.get(pk=pluginbID)
if ACLManager.CheckIPPluginObjectOwner(currentACL, pluginobj, admin) == 1:
pass
else:
return ACLManager.loadError()
lmo = json.loads(pluginobj.config)
Data['Selectedplugins'] = lmo
Data['pluginbID'] = pluginbID
@@ -414,16 +435,20 @@ class WebsiteManager:
def deletesPlgin(self, userID=None, data=None,):
try:
currentACL = ACLManager.loadedACL(userID)
admin = Administrator.objects.get(pk=userID)
userobj = Administrator.objects.get(pk=userID)
pluginname = data['pluginname']
pluginbBucketID = data['pluginbBucketID']
# logging.CyberCPLogFileWriter.writeToFile("pluginbID ....... %s" % pluginbBucketID)
# logging.CyberCPLogFileWriter.writeToFile("pluginname ....... %s" % pluginname)
obj = wpplugins.objects.get(pk=pluginbBucketID, owner=userobj)
if ACLManager.CheckIPPluginObjectOwner(currentACL, obj, admin) == 1:
pass
else:
return ACLManager.loadError()
ab = []
ab = json.loads(obj.config)
ab.remove(pluginname)
@@ -442,7 +467,7 @@ class WebsiteManager:
def Addplugineidt(self, userID=None, data=None,):
try:
currentACL = ACLManager.loadedACL(userID)
admin = Administrator.objects.get(pk=userID)
userobj = Administrator.objects.get(pk=userID)
pluginname = data['pluginname']
pluginbBucketID = data['pluginbBucketID']
@@ -451,6 +476,12 @@ class WebsiteManager:
#logging.CyberCPLogFileWriter.writeToFile("pluginname ....... %s" % pluginname)
pObj = wpplugins.objects.get(pk=pluginbBucketID, owner=userobj)
if ACLManager.CheckIPPluginObjectOwner(currentACL, pObj, admin) == 1:
pass
else:
return ACLManager.loadError()
listofplugin = json.loads(pObj.config)
try:
index = listofplugin.index(pluginname)
@@ -725,8 +756,6 @@ class WebsiteManager:
FinalPHPPath = '/usr/local/lsws/lsphp%s/bin/php' % (php)
command = 'sudo -u %s %s -d error_reporting=0 /usr/bin/wp theme list --skip-plugins --skip-themes --format=json --path=%s' % (Vhuser, FinalPHPPath, path)
stdoutput = ProcessUtilities.outputExecutioner(command)
json_data = stdoutput.splitlines()[-1]
@@ -902,7 +931,6 @@ class WebsiteManager:
'tempStatusPath': extraArgs['tempStatusPath']}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
data_ret = {'status': 0, 'installStatus': 0, 'error_message': str(msg)}
json_data = json.dumps(data_ret)
@@ -956,18 +984,24 @@ class WebsiteManager:
backupid = data['backupid']
DesSiteID = data['DesSite']
# bwp = WPSites.objects.get(pk=int(backupid))
# dwp = WPSites.objects.get(pk=int(DesSiteID))
#
# if ACLManager.checkOwnership(bwp.owner.domain, admin, currentACL) == 1:
# pass
# else:
# return ACLManager.loadError()
#
# if ACLManager.checkOwnership(dwp.owner.domain, admin, currentACL) == 1:
# pass
# else:
# return ACLManager.loadError()
try:
bwp = WPSites.objects.get(pk=int(backupid))
if ACLManager.checkOwnership(bwp.owner.domain, admin, currentACL) == 1:
pass
else:
return ACLManager.loadError()
except:
pass
dwp = WPSites.objects.get(pk=int(DesSiteID))
if ACLManager.checkOwnership(dwp.owner.domain, admin, currentACL) == 1:
pass
else:
return ACLManager.loadError()
Domain = data['Domain']
@@ -1171,17 +1205,12 @@ class WebsiteManager:
extraArgs['path'] = path
extraArgs['Vhuser'] = Vhuser
background = ApplicationInstaller('UpdateWPTheme', extraArgs)
background.start()
time.sleep(2)
data_ret = {'status': 1, 'error_message': 'None'}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)