Implement Catch-All Email Configuration Management

- Added new model `CatchAllEmail` to store catch-all email configurations per domain.
- Implemented views for fetching, saving, and deleting catch-all email configurations, enhancing email management capabilities.
- Updated URL routing to include endpoints for catch-all email operations.
- Enhanced error handling and permission checks for email forwarding actions.

These changes improve the flexibility and user experience of email management within CyberPanel.
This commit is contained in:
Master3395
2025-12-31 22:18:33 +01:00
parent ffaa0ca63d
commit ff382f2d78
9 changed files with 3172 additions and 24 deletions

View File

@@ -30,7 +30,7 @@ import _thread
try:
from dns.models import Domains as dnsDomains
from dns.models import Records as dnsRecords
from mailServer.models import Forwardings, Pipeprograms
from mailServer.models import Forwardings, Pipeprograms, CatchAllEmail
from plogical.acl import ACLManager
from plogical.dnsUtilities import DNS
from loginSystem.models import Administrator
@@ -2030,6 +2030,151 @@ protocol sieve {
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
def fetchCatchAllConfig(self):
try:
userID = self.request.session['userID']
currentACL = ACLManager.loadedACL(userID)
if ACLManager.currentContextPermission(currentACL, 'emailForwarding') == 0:
return ACLManager.loadErrorJson('fetchStatus', 0)
data = json.loads(self.request.body)
domain = data['domain']
admin = Administrator.objects.get(pk=userID)
if ACLManager.checkOwnership(domain, admin, currentACL) == 1:
pass
else:
return ACLManager.loadErrorJson()
try:
domainObj = Domains.objects.get(domain=domain)
catchAll = CatchAllEmail.objects.get(domain=domainObj)
data_ret = {
'status': 1,
'fetchStatus': 1,
'configured': 1,
'destination': catchAll.destination,
'enabled': catchAll.enabled
}
except CatchAllEmail.DoesNotExist:
data_ret = {
'status': 1,
'fetchStatus': 1,
'configured': 0
}
except Domains.DoesNotExist:
data_ret = {
'status': 0,
'fetchStatus': 0,
'error_message': 'Domain not found in email system'
}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
data_ret = {'status': 0, 'fetchStatus': 0, 'error_message': str(msg)}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
def saveCatchAllConfig(self):
try:
userID = self.request.session['userID']
currentACL = ACLManager.loadedACL(userID)
if ACLManager.currentContextPermission(currentACL, 'emailForwarding') == 0:
return ACLManager.loadErrorJson('saveStatus', 0)
data = json.loads(self.request.body)
domain = data['domain']
destination = data['destination']
enabled = data.get('enabled', True)
admin = Administrator.objects.get(pk=userID)
if ACLManager.checkOwnership(domain, admin, currentACL) == 1:
pass
else:
return ACLManager.loadErrorJson()
# Validate destination email
if '@' not in destination:
data_ret = {'status': 0, 'saveStatus': 0, 'error_message': 'Invalid destination email address'}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
domainObj = Domains.objects.get(domain=domain)
# Create or update catch-all config
catchAll, created = CatchAllEmail.objects.update_or_create(
domain=domainObj,
defaults={'destination': destination, 'enabled': enabled}
)
# Also add/update entry in Forwardings table for Postfix
catchAllSource = '@' + domain
if enabled:
# Remove existing catch-all forwarding if any
Forwardings.objects.filter(source=catchAllSource).delete()
# Add new forwarding
forwarding = Forwardings(source=catchAllSource, destination=destination)
forwarding.save()
else:
# Remove catch-all forwarding when disabled
Forwardings.objects.filter(source=catchAllSource).delete()
data_ret = {
'status': 1,
'saveStatus': 1,
'message': 'Catch-all email configured successfully'
}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
data_ret = {'status': 0, 'saveStatus': 0, 'error_message': str(msg)}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
def deleteCatchAllConfig(self):
try:
userID = self.request.session['userID']
currentACL = ACLManager.loadedACL(userID)
if ACLManager.currentContextPermission(currentACL, 'emailForwarding') == 0:
return ACLManager.loadErrorJson('deleteStatus', 0)
data = json.loads(self.request.body)
domain = data['domain']
admin = Administrator.objects.get(pk=userID)
if ACLManager.checkOwnership(domain, admin, currentACL) == 1:
pass
else:
return ACLManager.loadErrorJson()
domainObj = Domains.objects.get(domain=domain)
# Delete catch-all config
CatchAllEmail.objects.filter(domain=domainObj).delete()
# Remove from Forwardings table
catchAllSource = '@' + domain
Forwardings.objects.filter(source=catchAllSource).delete()
data_ret = {
'status': 1,
'deleteStatus': 1,
'message': 'Catch-all email removed successfully'
}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
data_ret = {'status': 0, 'deleteStatus': 0, 'error_message': str(msg)}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
def refreshEmailDiskUsage(self):
"""Refresh disk usage for all email accounts in a domain"""
try: