Files
CyberPanel/webmail/webmailManager.py

825 lines
32 KiB
Python
Raw Permalink Normal View History

import json
import os
import base64
from django.http import HttpResponse
from django.shortcuts import render, redirect
from .models import Contact, ContactGroup, ContactGroupMembership, WebmailSettings, SieveRule
from .services.imap_client import IMAPClient
from .services.smtp_client import SMTPClient
from .services.email_composer import EmailComposer
from .services.sieve_client import SieveClient
import plogical.CyberCPLogFileWriter as logging
WEBMAIL_CONF = '/etc/cyberpanel/webmail.conf'
class WebmailManager:
def __init__(self, request):
self.request = request
# ── Helpers ────────────────────────────────────────────────
@staticmethod
def _json_response(data):
return HttpResponse(json.dumps(data), content_type='application/json')
@staticmethod
def _error(msg):
return HttpResponse(json.dumps({'status': 0, 'error_message': str(msg)}),
content_type='application/json')
@staticmethod
def _success(extra=None):
data = {'status': 1}
if extra:
data.update(extra)
return HttpResponse(json.dumps(data), content_type='application/json')
def _get_post_data(self):
try:
return json.loads(self.request.body)
except Exception:
return self.request.POST.dict()
def _get_email(self):
# Check for explicit email in POST body (from account switcher)
# This ensures the correct account is used even if session is stale
try:
data = json.loads(self.request.body)
explicit = data.get('fromAccount', '')
if explicit:
accounts = self._get_managed_accounts()
if explicit in accounts:
self.request.session['webmail_email'] = explicit
return explicit
except Exception:
pass
return self.request.session.get('webmail_email')
def _get_master_config(self):
"""Read master user config from /etc/cyberpanel/webmail.conf"""
try:
with open(WEBMAIL_CONF, 'r') as f:
config = json.load(f)
return config.get('master_user'), config.get('master_password')
except Exception:
return None, None
def _get_imap(self, email_addr=None):
"""Create IMAP client, preferring master user auth for SSO sessions."""
addr = email_addr or self._get_email()
if not addr:
raise Exception('No email account selected')
master_user, master_pass = self._get_master_config()
if master_user and master_pass:
return IMAPClient(addr, '', master_user=master_user, master_password=master_pass)
# Fallback: standalone login with stored password
password = self.request.session.get('webmail_password', '')
return IMAPClient(addr, password)
def _get_smtp(self):
addr = self._get_email()
if not addr:
raise Exception('No email account selected')
# If using master user (SSO), we can't auth to SMTP since
# auth_master_user_separator is not set in Dovecot.
# Use local relay via port 25 instead (Postfix permits localhost).
master_user, master_pass = self._get_master_config()
is_standalone = self.request.session.get('webmail_standalone', False)
if master_user and master_pass and not is_standalone:
return SMTPClient(addr, '', use_local_relay=True)
password = self.request.session.get('webmail_password', '')
return SMTPClient(addr, password)
def _get_sieve(self, email_addr=None):
addr = email_addr or self._get_email()
if not addr:
raise Exception('No email account selected')
master_user, master_pass = self._get_master_config()
if master_user and master_pass:
return SieveClient(addr, '', master_user=master_user, master_password=master_pass)
password = self.request.session.get('webmail_password', '')
return SieveClient(addr, password)
def _get_managed_accounts(self):
"""Get email accounts the current CyberPanel user can access."""
try:
from plogical.acl import ACLManager
from loginSystem.models import Administrator
from mailServer.models import Domains, EUsers
userID = self.request.session['userID']
currentACL = ACLManager.loadedACL(userID)
websites = ACLManager.findAllSites(currentACL, userID)
websites = websites + ACLManager.findChildDomains(websites)
accounts = []
for site in websites:
try:
domain = Domains.objects.get(domain=site)
for eu in EUsers.objects.filter(emailOwner=domain):
accounts.append(eu.email)
except Exception:
continue
return accounts
except Exception:
return []
# ── Page Renders ──────────────────────────────────────────
def loadWebmail(self):
from plogical.httpProc import httpProc
email = self._get_email()
accounts = self._get_managed_accounts()
if not email and accounts:
if len(accounts) == 1:
self.request.session['webmail_email'] = accounts[0]
email = accounts[0]
else:
# Multiple accounts - render picker
proc = httpProc(self.request, 'webmail/index.html',
{'accounts': json.dumps(accounts), 'show_picker': True},
'listEmails')
return proc.render()
proc = httpProc(self.request, 'webmail/index.html',
{'email': email or '',
'accounts': json.dumps(accounts),
'show_picker': False},
'listEmails')
return proc.render()
def loadLogin(self):
return render(self.request, 'webmail/login.html')
# ── Auth APIs ─────────────────────────────────────────────
def apiLogin(self):
data = self._get_post_data()
email_addr = data.get('email', '')
password = data.get('password', '')
if not email_addr or not password:
return self._error('Email and password are required.')
try:
client = IMAPClient(email_addr, password)
client.close()
except Exception as e:
return self._error('Login failed: %s' % str(e))
self.request.session['webmail_email'] = email_addr
self.request.session['webmail_password'] = password
self.request.session['webmail_standalone'] = True
return self._success()
def apiLogout(self):
for key in ['webmail_email', 'webmail_password', 'webmail_standalone']:
self.request.session.pop(key, None)
return self._success()
def apiSSO(self):
"""Auto-login for CyberPanel users."""
accounts = self._get_managed_accounts()
if not accounts:
return self._error('No email accounts found for your user.')
# Preserve previously selected account if still valid
current = self.request.session.get('webmail_email')
if not current or current not in accounts:
current = accounts[0]
self.request.session['webmail_email'] = current
return self._success({'email': current, 'accounts': accounts})
def apiListAccounts(self):
accounts = self._get_managed_accounts()
return self._success({'accounts': accounts})
def apiSwitchAccount(self):
data = self._get_post_data()
email = data.get('email', '')
accounts = self._get_managed_accounts()
if email not in accounts:
return self._error('You do not have access to this account.')
self.request.session['webmail_email'] = email
return self._success({'email': email})
# ── Folder APIs ───────────────────────────────────────────
def apiListFolders(self):
try:
with self._get_imap() as imap:
folders = imap.list_folders()
return self._success({'folders': folders})
except Exception as e:
return self._error(str(e))
def apiCreateFolder(self):
data = self._get_post_data()
name = data.get('name', '')
if not name:
return self._error('Folder name is required.')
try:
with self._get_imap() as imap:
if imap.create_folder(name):
return self._success()
return self._error('Failed to create folder.')
except Exception as e:
return self._error(str(e))
def apiRenameFolder(self):
data = self._get_post_data()
old_name = data.get('oldName', '')
new_name = data.get('newName', '')
if not old_name or not new_name:
return self._error('Both old and new folder names are required.')
try:
with self._get_imap() as imap:
if imap.rename_folder(old_name, new_name):
return self._success()
return self._error('Failed to rename folder.')
except Exception as e:
return self._error(str(e))
def apiDeleteFolder(self):
data = self._get_post_data()
name = data.get('name', '')
if not name:
return self._error('Folder name is required.')
# CyberPanel/Dovecot folder names (INBOX. prefix, separator '.')
protected = ['INBOX', 'INBOX.Sent', 'INBOX.Drafts', 'INBOX.Deleted Items',
'INBOX.Junk E-mail', 'INBOX.Archive']
if name in protected:
return self._error('Cannot delete system folder.')
try:
with self._get_imap() as imap:
if imap.delete_folder(name):
return self._success()
return self._error('Failed to delete folder.')
except Exception as e:
return self._error(str(e))
# ── Message APIs ──────────────────────────────────────────
def apiListMessages(self):
data = self._get_post_data()
folder = data.get('folder', 'INBOX')
page = int(data.get('page', 1))
per_page = int(data.get('perPage', 25))
try:
with self._get_imap() as imap:
result = imap.list_messages(folder, page, per_page)
return self._success(result)
except Exception as e:
return self._error(str(e))
def apiSearchMessages(self):
data = self._get_post_data()
folder = data.get('folder', 'INBOX')
query = data.get('query', '')
try:
with self._get_imap() as imap:
uids = imap.search_messages(folder, query)
uids = [u.decode() if isinstance(u, bytes) else str(u) for u in uids]
return self._success({'uids': uids})
except Exception as e:
return self._error(str(e))
def apiGetMessage(self):
data = self._get_post_data()
folder = data.get('folder', 'INBOX')
uid = data.get('uid', '')
if not uid:
return self._error('Message UID is required.')
try:
with self._get_imap() as imap:
msg = imap.get_message(folder, uid)
if msg is None:
return self._error('Message not found.')
imap.mark_read(folder, [uid])
return self._success({'message': msg})
except Exception as e:
return self._error(str(e))
def apiGetAttachment(self):
data = self._get_post_data()
folder = data.get('folder', 'INBOX')
uid = data.get('uid', '')
part_id = data.get('partId', '')
try:
with self._get_imap() as imap:
result = imap.get_attachment(folder, uid, part_id)
if result is None:
return self._error('Attachment not found.')
filename, content_type, payload = result
response = HttpResponse(payload, content_type=content_type)
# Sanitize filename to prevent header injection and path traversal
import os as _os
safe_filename = _os.path.basename(filename)
safe_filename = safe_filename.replace('"', '_').replace('\r', '').replace('\n', '').replace('\x00', '')
if not safe_filename:
safe_filename = 'attachment'
response['Content-Disposition'] = 'attachment; filename="%s"' % safe_filename
return response
except Exception as e:
return self._error(str(e))
# ── Action APIs ───────────────────────────────────────────
def apiSendMessage(self):
try:
# For multipart forms, check fromAccount in POST data
if self.request.content_type and 'multipart' in self.request.content_type:
from_account = self.request.POST.get('fromAccount', '')
if from_account:
accounts = self._get_managed_accounts()
if from_account in accounts:
self.request.session['webmail_email'] = from_account
email_addr = self._get_email()
if not email_addr:
return self._error('Not logged in.')
# Handle multipart form data for attachments
if self.request.content_type and 'multipart' in self.request.content_type:
to = self.request.POST.get('to', '')
cc = self.request.POST.get('cc', '')
bcc = self.request.POST.get('bcc', '')
subject = self.request.POST.get('subject', '')
body_html = self.request.POST.get('body', '')
in_reply_to = self.request.POST.get('inReplyTo', '')
references = self.request.POST.get('references', '')
attachments = []
for key in self.request.FILES:
f = self.request.FILES[key]
attachments.append((f.name, f.content_type, f.read()))
else:
data = self._get_post_data()
to = data.get('to', '')
cc = data.get('cc', '')
bcc = data.get('bcc', '')
subject = data.get('subject', '')
body_html = data.get('body', '')
in_reply_to = data.get('inReplyTo', '')
references = data.get('references', '')
attachments = None
if not to:
return self._error('At least one recipient is required.')
mime_msg = EmailComposer.compose(
from_addr=email_addr,
to_addrs=to,
subject=subject,
body_html=body_html,
cc_addrs=cc,
bcc_addrs=bcc,
attachments=attachments,
in_reply_to=in_reply_to,
references=references,
)
smtp = self._get_smtp()
result = smtp.send_message(mime_msg)
if not result['success']:
return self._error(result.get('error', 'Failed to send.'))
# Save to Sent folder
try:
with self._get_imap() as imap:
raw = mime_msg.as_bytes()
smtp.save_to_sent(imap, raw)
except Exception:
pass
# Auto-collect contacts
try:
settings = WebmailSettings.objects.filter(email_account=email_addr).first()
if settings is None or settings.auto_collect_contacts:
self._auto_collect(email_addr, to, cc)
except Exception:
pass
return self._success({'messageId': result['message_id'], 'sentFrom': email_addr})
except Exception as e:
return self._error(str(e))
def _auto_collect(self, owner, to_addrs, cc_addrs=''):
"""Auto-save recipients as contacts."""
import re
all_addrs = '%s,%s' % (to_addrs, cc_addrs) if cc_addrs else to_addrs
emails = re.findall(r'[\w.+-]+@[\w-]+\.[\w.-]+', all_addrs)
for addr in emails:
if addr == owner:
continue
Contact.objects.get_or_create(
owner_email=owner,
email_address=addr,
defaults={'is_auto_collected': True, 'display_name': addr.split('@')[0]},
)
def apiSaveDraft(self):
try:
email_addr = self._get_email()
if not email_addr:
return self._error('Not logged in.')
data = self._get_post_data()
to = data.get('to', '')
subject = data.get('subject', '')
body_html = data.get('body', '')
mime_msg = EmailComposer.compose(
from_addr=email_addr,
to_addrs=to,
subject=subject,
body_html=body_html,
)
with self._get_imap() as imap:
# CyberPanel's Dovecot uses INBOX.Drafts
draft_folders = ['INBOX.Drafts', 'Drafts', 'Draft']
saved = False
for folder in draft_folders:
try:
if imap.append_message(folder, mime_msg.as_bytes(), '\\Draft \\Seen'):
saved = True
break
except Exception:
continue
if not saved:
imap.create_folder('INBOX.Drafts')
imap.append_message('INBOX.Drafts', mime_msg.as_bytes(), '\\Draft \\Seen')
return self._success()
except Exception as e:
return self._error(str(e))
def apiDeleteMessages(self):
data = self._get_post_data()
folder = data.get('folder', 'INBOX')
uids = data.get('uids', [])
if not uids:
return self._error('No messages selected.')
try:
with self._get_imap() as imap:
imap.delete_messages(folder, uids)
return self._success()
except Exception as e:
return self._error(str(e))
def apiMoveMessages(self):
data = self._get_post_data()
folder = data.get('folder', 'INBOX')
uids = data.get('uids', [])
target = data.get('targetFolder', '')
if not uids or not target:
return self._error('Messages and target folder are required.')
try:
with self._get_imap() as imap:
imap.move_messages(folder, uids, target)
return self._success()
except Exception as e:
return self._error(str(e))
def apiMarkRead(self):
data = self._get_post_data()
folder = data.get('folder', 'INBOX')
uids = data.get('uids', [])
try:
with self._get_imap() as imap:
imap.mark_read(folder, uids)
return self._success()
except Exception as e:
return self._error(str(e))
def apiMarkUnread(self):
data = self._get_post_data()
folder = data.get('folder', 'INBOX')
uids = data.get('uids', [])
try:
with self._get_imap() as imap:
imap.mark_unread(folder, uids)
return self._success()
except Exception as e:
return self._error(str(e))
def apiMarkFlagged(self):
data = self._get_post_data()
folder = data.get('folder', 'INBOX')
uids = data.get('uids', [])
try:
with self._get_imap() as imap:
imap.mark_flagged(folder, uids)
return self._success()
except Exception as e:
return self._error(str(e))
# ── Contact APIs ──────────────────────────────────────────
def apiListContacts(self):
email = self._get_email()
try:
contacts = list(Contact.objects.filter(owner_email=email).values(
'id', 'display_name', 'email_address', 'phone', 'organization', 'notes', 'is_auto_collected'
))
return self._success({'contacts': contacts})
except Exception as e:
return self._error(str(e))
def apiCreateContact(self):
email = self._get_email()
data = self._get_post_data()
try:
contact = Contact.objects.create(
owner_email=email,
display_name=data.get('displayName', ''),
email_address=data.get('emailAddress', ''),
phone=data.get('phone', ''),
organization=data.get('organization', ''),
notes=data.get('notes', ''),
)
return self._success({'id': contact.id})
except Exception as e:
return self._error(str(e))
def apiUpdateContact(self):
email = self._get_email()
data = self._get_post_data()
contact_id = data.get('id')
try:
contact = Contact.objects.get(id=contact_id, owner_email=email)
for field in ['display_name', 'email_address', 'phone', 'organization', 'notes']:
camel = field.replace('_', ' ').title().replace(' ', '')
camel = camel[0].lower() + camel[1:]
if camel in data:
setattr(contact, field, data[camel])
contact.save()
return self._success()
except Contact.DoesNotExist:
return self._error('Contact not found.')
except Exception as e:
return self._error(str(e))
def apiDeleteContact(self):
email = self._get_email()
data = self._get_post_data()
contact_id = data.get('id')
try:
Contact.objects.filter(id=contact_id, owner_email=email).delete()
return self._success()
except Exception as e:
return self._error(str(e))
def apiSearchContacts(self):
email = self._get_email()
data = self._get_post_data()
query = data.get('query', '')
try:
from django.db.models import Q
contacts = Contact.objects.filter(
owner_email=email
).filter(
Q(display_name__icontains=query) | Q(email_address__icontains=query)
).values('id', 'display_name', 'email_address')[:20]
return self._success({'contacts': list(contacts)})
except Exception as e:
return self._error(str(e))
def apiListContactGroups(self):
email = self._get_email()
try:
groups = list(ContactGroup.objects.filter(owner_email=email).values('id', 'name'))
return self._success({'groups': groups})
except Exception as e:
return self._error(str(e))
def apiCreateContactGroup(self):
email = self._get_email()
data = self._get_post_data()
name = data.get('name', '')
if not name:
return self._error('Group name is required.')
try:
group = ContactGroup.objects.create(owner_email=email, name=name)
return self._success({'id': group.id})
except Exception as e:
return self._error(str(e))
def apiDeleteContactGroup(self):
email = self._get_email()
data = self._get_post_data()
group_id = data.get('id')
try:
ContactGroup.objects.filter(id=group_id, owner_email=email).delete()
return self._success()
except Exception as e:
return self._error(str(e))
# ── Sieve Rule APIs ───────────────────────────────────────
def apiListRules(self):
email = self._get_email()
try:
rules = list(SieveRule.objects.filter(email_account=email).values(
'id', 'name', 'priority', 'is_active',
'condition_field', 'condition_type', 'condition_value',
'action_type', 'action_value',
))
return self._success({'rules': rules})
except Exception as e:
return self._error(str(e))
def apiCreateRule(self):
email = self._get_email()
data = self._get_post_data()
try:
rule = SieveRule.objects.create(
email_account=email,
name=data.get('name', 'New Rule'),
priority=data.get('priority', 0),
is_active=data.get('isActive', True),
condition_field=data.get('conditionField', 'from'),
condition_type=data.get('conditionType', 'contains'),
condition_value=data.get('conditionValue', ''),
action_type=data.get('actionType', 'move'),
action_value=data.get('actionValue', ''),
)
self._sync_sieve_rules(email)
return self._success({'id': rule.id})
except Exception as e:
return self._error(str(e))
def apiUpdateRule(self):
email = self._get_email()
data = self._get_post_data()
rule_id = data.get('id')
try:
rule = SieveRule.objects.get(id=rule_id, email_account=email)
for field in ['name', 'priority', 'is_active', 'condition_field',
'condition_type', 'condition_value', 'action_type', 'action_value']:
camel = field.replace('_', ' ').title().replace(' ', '')
camel = camel[0].lower() + camel[1:]
if camel in data:
val = data[camel]
if field == 'is_active':
val = bool(val)
elif field == 'priority':
val = int(val)
setattr(rule, field, val)
rule.save()
self._sync_sieve_rules(email)
return self._success()
except SieveRule.DoesNotExist:
return self._error('Rule not found.')
except Exception as e:
return self._error(str(e))
def apiDeleteRule(self):
email = self._get_email()
data = self._get_post_data()
rule_id = data.get('id')
try:
SieveRule.objects.filter(id=rule_id, email_account=email).delete()
self._sync_sieve_rules(email)
return self._success()
except Exception as e:
return self._error(str(e))
def apiActivateRules(self):
email = self._get_email()
try:
self._sync_sieve_rules(email)
return self._success()
except Exception as e:
return self._error(str(e))
def _sync_sieve_rules(self, email):
"""Generate sieve script from DB rules and upload to Dovecot.
ManageSieve may not be available if dovecot-sieve/pigeonhole is not
installed or if the ManageSieve service isn't running on port 4190.
Rules are always saved to the database; Sieve sync is best-effort.
"""
rules = SieveRule.objects.filter(email_account=email, is_active=True).order_by('priority')
rule_dicts = []
for r in rules:
rule_dicts.append({
'name': r.name,
'condition_field': r.condition_field,
'condition_type': r.condition_type,
'condition_value': r.condition_value,
'action_type': r.action_type,
'action_value': r.action_value,
})
script = SieveClient.rules_to_sieve(rule_dicts)
try:
with self._get_sieve(email) as sieve:
sieve.put_script('cyberpanel', script)
sieve.activate_script('cyberpanel')
except ConnectionRefusedError:
logging.CyberCPLogFileWriter.writeToFile(
'Sieve sync skipped for %s: ManageSieve not running on port 4190. '
'Install dovecot-sieve and enable ManageSieve.' % email)
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile('Sieve sync failed for %s: %s' % (email, str(e)))
# ── Settings APIs ─────────────────────────────────────────
def apiGetSettings(self):
email = self._get_email()
try:
settings, created = WebmailSettings.objects.get_or_create(email_account=email)
return self._success({
'settings': {
'displayName': settings.display_name,
'signatureHtml': settings.signature_html,
'messagesPerPage': settings.messages_per_page,
'defaultReplyBehavior': settings.default_reply_behavior,
'themePreference': settings.theme_preference,
'autoCollectContacts': settings.auto_collect_contacts,
}
})
except Exception as e:
return self._error(str(e))
def apiSaveSettings(self):
email = self._get_email()
data = self._get_post_data()
try:
settings, created = WebmailSettings.objects.get_or_create(email_account=email)
if 'displayName' in data:
settings.display_name = data['displayName']
if 'signatureHtml' in data:
settings.signature_html = data['signatureHtml']
if 'messagesPerPage' in data:
settings.messages_per_page = int(data['messagesPerPage'])
if 'defaultReplyBehavior' in data:
settings.default_reply_behavior = data['defaultReplyBehavior']
if 'themePreference' in data:
settings.theme_preference = data['themePreference']
if 'autoCollectContacts' in data:
settings.auto_collect_contacts = bool(data['autoCollectContacts'])
settings.save()
return self._success()
except Exception as e:
return self._error(str(e))
# ── Image Proxy ───────────────────────────────────────────
def apiProxyImage(self):
"""Proxy external images to prevent tracking and mixed content."""
if not self._get_email():
return self._error('Not logged in.')
url_b64 = self.request.GET.get('url', '') or self.request.POST.get('url', '')
try:
url = base64.urlsafe_b64decode(url_b64).decode('utf-8')
except Exception:
return self._error('Invalid URL.')
if not url.startswith(('http://', 'https://')):
return self._error('Invalid URL scheme.')
# Block internal/private IPs to prevent SSRF
import urllib.parse
import socket
import ipaddress
hostname = urllib.parse.urlparse(url).hostname or ''
try:
resolved_ip = socket.gethostbyname(hostname)
ip_obj = ipaddress.ip_address(resolved_ip)
if ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_link_local or ip_obj.is_multicast or ip_obj.is_reserved:
return self._error('Invalid URL.')
except (socket.gaierror, ValueError):
return self._error('Invalid URL.')
try:
import urllib.request
req = urllib.request.Request(url, headers={
'User-Agent': 'CyberPanel-Webmail-Proxy/1.0',
})
with urllib.request.urlopen(req, timeout=10) as resp:
content_type = resp.headers.get('Content-Type', 'image/png')
if not content_type.startswith('image/'):
return self._error('Not an image.')
data = resp.read(5 * 1024 * 1024) # 5MB max
return HttpResponse(data, content_type=content_type)
except Exception:
return self._error('Failed to fetch image.')