Files
CyberPanel/webmail/services/imap_client.py
usmannasir 632dc3fbe9 Fix critical webmail bugs: XSS, SSRF, install ordering, and UI issues
Security fixes:
- Escape plain text body to prevent XSS via trustAsHtml
- Add SSRF protection to image proxy (block private IPs, require auth)
- Sanitize Content-Disposition filename to prevent header injection
- Escape Sieve script values to prevent script injection
- Escape IMAP search query to prevent search injection

Install/upgrade fixes:
- Move setupWebmail() call to after Dovecot is installed (was running
  before doveadm existed, silently failing on every fresh install)
- Make setupWebmail() a static method callable from install.py
- Fix upgrade idempotency: always run dovecot.conf patching and
  migrations even if webmail.conf already exists (partial failure recovery)

Frontend fixes:
- Fix search being a no-op (was ignoring results and just reloading)
- Fix loading spinner stuck forever on API errors (add errback)
- Fix unread count decrementing on already-read messages
- Fix draft auto-save timer leak when navigating away from compose
- Fix composeToContact missing signature and auto-save
- Fix null subject crash in reply/forward
- Clear stale data when switching accounts
- Fix attachment part_id mismatch between parser and downloader

Backend fixes:
- Fix Sieve _read_response infinite loop on connection drop
- Add login check to apiSaveDraft
2026-03-05 05:10:14 +05:00

359 lines
13 KiB
Python

import imaplib
import ssl
import email
import re
from email.header import decode_header
class IMAPClient:
"""Wrapper around imaplib.IMAP4_SSL for Dovecot IMAP operations.
CyberPanel's Dovecot uses namespace: separator='.', prefix='INBOX.'
So folders are: INBOX, INBOX.Sent, INBOX.Drafts, INBOX.Deleted Items,
INBOX.Junk E-mail, INBOX.Archive, etc.
"""
# Dovecot namespace config: separator='.', prefix='INBOX.'
NS_PREFIX = 'INBOX.'
NS_SEP = '.'
# Map of standard folder purposes to actual Dovecot folder names
# (CyberPanel creates these in mailUtilities.py)
SPECIAL_FOLDERS = {
'sent': 'INBOX.Sent',
'drafts': 'INBOX.Drafts',
'trash': 'INBOX.Deleted Items',
'junk': 'INBOX.Junk E-mail',
'archive': 'INBOX.Archive',
}
def __init__(self, email_address, password, host='localhost', port=993,
master_user=None, master_password=None):
self.email_address = email_address
self.host = host
self.port = port
self.conn = None
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
self.conn = imaplib.IMAP4_SSL(host, port, ssl_context=ctx)
if master_user and master_password:
login_user = '%s*%s' % (email_address, master_user)
self.conn.login(login_user, master_password)
else:
self.conn.login(email_address, password)
def close(self):
try:
self.conn.close()
except Exception:
pass
try:
self.conn.logout()
except Exception:
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def _decode_header_value(self, value):
if value is None:
return ''
decoded_parts = decode_header(value)
result = []
for part, charset in decoded_parts:
if isinstance(part, bytes):
result.append(part.decode(charset or 'utf-8', errors='replace'))
else:
result.append(part)
return ''.join(result)
def _parse_folder_list(self, line):
if isinstance(line, bytes):
line = line.decode('utf-8', errors='replace')
match = re.match(r'\(([^)]*)\)\s+"([^"]+)"\s+"?([^"]+)"?', line)
if not match:
match = re.match(r'\(([^)]*)\)\s+"([^"]+)"\s+(.+)', line)
if match:
flags = match.group(1)
delimiter = match.group(2)
name = match.group(3).strip('"')
return {'name': name, 'delimiter': delimiter, 'flags': flags}
return None
def _display_name(self, folder_name):
"""Strip INBOX. prefix for display, keep INBOX as-is."""
if folder_name == 'INBOX':
return 'Inbox'
if folder_name.startswith(self.NS_PREFIX):
return folder_name[len(self.NS_PREFIX):]
return folder_name
def _folder_type(self, folder_name):
"""Identify special folder type for UI icon mapping."""
for ftype, fname in self.SPECIAL_FOLDERS.items():
if folder_name == fname:
return ftype
if folder_name == 'INBOX':
return 'inbox'
return 'folder'
def list_folders(self):
status, data = self.conn.list()
if status != 'OK':
return []
folders = []
for item in data:
if item is None:
continue
parsed = self._parse_folder_list(item)
if parsed is None:
continue
folder_name = parsed['name']
unread = 0
total = 0
try:
# Quote folder names with spaces for STATUS command
quoted = '"%s"' % folder_name
st, counts = self.conn.status(quoted, '(MESSAGES UNSEEN)')
if st == 'OK' and counts[0]:
count_str = counts[0].decode('utf-8', errors='replace') if isinstance(counts[0], bytes) else counts[0]
m = re.search(r'MESSAGES\s+(\d+)', count_str)
u = re.search(r'UNSEEN\s+(\d+)', count_str)
if m:
total = int(m.group(1))
if u:
unread = int(u.group(1))
except Exception:
pass
folders.append({
'name': folder_name,
'display_name': self._display_name(folder_name),
'folder_type': self._folder_type(folder_name),
'delimiter': parsed['delimiter'],
'flags': parsed['flags'],
'unread_count': unread,
'total_count': total,
})
return folders
def _select(self, folder):
"""Select a folder, quoting names with spaces."""
return self.conn.select('"%s"' % folder)
def list_messages(self, folder='INBOX', page=1, per_page=25, sort='date_desc'):
self._select(folder)
status, data = self.conn.uid('search', None, 'ALL')
if status != 'OK':
return {'messages': [], 'total': 0, 'page': page, 'pages': 0}
uids = data[0].split() if data[0] else []
if sort == 'date_desc':
uids = list(reversed(uids))
total = len(uids)
pages = max(1, (total + per_page - 1) // per_page)
page = max(1, min(page, pages))
start = (page - 1) * per_page
end = start + per_page
page_uids = uids[start:end]
if not page_uids:
return {'messages': [], 'total': total, 'page': page, 'pages': pages}
uid_str = b','.join(page_uids)
status, msg_data = self.conn.uid('fetch', uid_str,
'(UID FLAGS ENVELOPE RFC822.SIZE BODY.PEEK[HEADER.FIELDS (FROM TO SUBJECT DATE)])')
if status != 'OK':
return {'messages': [], 'total': total, 'page': page, 'pages': pages}
messages = []
i = 0
while i < len(msg_data):
item = msg_data[i]
if isinstance(item, tuple) and len(item) == 2:
meta_line = item[0].decode('utf-8', errors='replace') if isinstance(item[0], bytes) else item[0]
header_bytes = item[1]
uid_match = re.search(r'UID\s+(\d+)', meta_line)
flags_match = re.search(r'FLAGS\s+\(([^)]*)\)', meta_line)
size_match = re.search(r'RFC822\.SIZE\s+(\d+)', meta_line)
uid = uid_match.group(1) if uid_match else '0'
flags = flags_match.group(1) if flags_match else ''
size = int(size_match.group(1)) if size_match else 0
msg = email.message_from_bytes(header_bytes) if isinstance(header_bytes, bytes) else email.message_from_string(header_bytes)
messages.append({
'uid': uid,
'from': self._decode_header_value(msg.get('From', '')),
'to': self._decode_header_value(msg.get('To', '')),
'subject': self._decode_header_value(msg.get('Subject', '(No Subject)')),
'date': msg.get('Date', ''),
'flags': flags,
'is_read': '\\Seen' in flags,
'is_flagged': '\\Flagged' in flags,
'has_attachment': False,
'size': size,
})
i += 1
return {'messages': messages, 'total': total, 'page': page, 'pages': pages}
def search_messages(self, folder='INBOX', query='', criteria='ALL'):
self._select(folder)
if query:
# Escape quotes to prevent IMAP search injection
safe_query = query.replace('\\', '\\\\').replace('"', '\\"')
search_criteria = '(OR OR (FROM "%s") (TO "%s") (SUBJECT "%s"))' % (safe_query, safe_query, safe_query)
else:
search_criteria = criteria
status, data = self.conn.uid('search', None, search_criteria)
if status != 'OK':
return []
return data[0].split() if data[0] else []
def get_message(self, folder, uid):
self._select(folder)
status, data = self.conn.uid('fetch', str(uid).encode(), '(RFC822 FLAGS)')
if status != 'OK' or not data or not data[0]:
return None
raw = None
flags = ''
for item in data:
if isinstance(item, tuple) and len(item) == 2:
meta = item[0].decode('utf-8', errors='replace') if isinstance(item[0], bytes) else item[0]
raw = item[1]
flags_match = re.search(r'FLAGS\s+\(([^)]*)\)', meta)
if flags_match:
flags = flags_match.group(1)
break
if raw is None:
return None
from .email_parser import EmailParser
parsed = EmailParser.parse_message(raw)
parsed['uid'] = str(uid)
parsed['flags'] = flags
parsed['is_read'] = '\\Seen' in flags
parsed['is_flagged'] = '\\Flagged' in flags
return parsed
def get_attachment(self, folder, uid, part_id):
self._select(folder)
status, data = self.conn.uid('fetch', str(uid).encode(), '(RFC822)')
if status != 'OK' or not data or not data[0]:
return None
raw = None
for item in data:
if isinstance(item, tuple) and len(item) == 2:
raw = item[1]
break
if raw is None:
return None
msg = email.message_from_bytes(raw) if isinstance(raw, bytes) else email.message_from_string(raw)
part_idx = 0
for part in msg.walk():
content_type = part.get_content_type()
if content_type.startswith('multipart/'):
continue
disposition = str(part.get('Content-Disposition', ''))
# Match the same indexing logic as email_parser.py:
# count parts that are attachments or non-text with disposition
if 'attachment' in disposition or (content_type not in ('text/html', 'text/plain') and disposition):
if str(part_idx) == str(part_id):
filename = part.get_filename() or 'attachment'
filename = self._decode_header_value(filename)
payload = part.get_payload(decode=True)
return (filename, content_type, payload)
part_idx += 1
return None
def move_messages(self, folder, uids, target_folder):
self._select(folder)
uid_str = ','.join(str(u) for u in uids)
# Quote target folder name for folders with spaces (e.g. "INBOX.Deleted Items")
quoted_target = '"%s"' % target_folder
try:
status, _ = self.conn.uid('move', uid_str, quoted_target)
if status == 'OK':
return True
except Exception:
pass
status, _ = self.conn.uid('copy', uid_str, quoted_target)
if status == 'OK':
self.conn.uid('store', uid_str, '+FLAGS', '(\\Deleted)')
self.conn.expunge()
return True
return False
def delete_messages(self, folder, uids):
self._select(folder)
uid_str = ','.join(str(u) for u in uids)
# CyberPanel/Dovecot uses "INBOX.Deleted Items" as trash
trash_folders = ['INBOX.Deleted Items', 'INBOX.Trash', 'Trash']
if folder not in trash_folders:
for trash in trash_folders:
try:
status, _ = self.conn.uid('copy', uid_str, '"%s"' % trash)
if status == 'OK':
self.conn.uid('store', uid_str, '+FLAGS', '(\\Deleted)')
self.conn.expunge()
return True
except Exception:
continue
# Already in trash or no trash folder found - permanently delete
self.conn.uid('store', uid_str, '+FLAGS', '(\\Deleted)')
self.conn.expunge()
return True
def set_flags(self, folder, uids, flags, action='add'):
self._select(folder)
uid_str = ','.join(str(u) for u in uids)
flag_str = '(%s)' % ' '.join(flags)
if action == 'add':
self.conn.uid('store', uid_str, '+FLAGS', flag_str)
elif action == 'remove':
self.conn.uid('store', uid_str, '-FLAGS', flag_str)
return True
def mark_read(self, folder, uids):
return self.set_flags(folder, uids, ['\\Seen'], 'add')
def mark_unread(self, folder, uids):
return self.set_flags(folder, uids, ['\\Seen'], 'remove')
def mark_flagged(self, folder, uids):
return self.set_flags(folder, uids, ['\\Flagged'], 'add')
def create_folder(self, name):
status, _ = self.conn.create(name)
return status == 'OK'
def rename_folder(self, old_name, new_name):
status, _ = self.conn.rename(old_name, new_name)
return status == 'OK'
def delete_folder(self, name):
status, _ = self.conn.delete(name)
return status == 'OK'
def append_message(self, folder, raw_message, flags=''):
if isinstance(raw_message, str):
raw_message = raw_message.encode('utf-8')
flag_str = '(%s)' % flags if flags else None
status, _ = self.conn.append('"%s"' % folder, flag_str, None, raw_message)
return status == 'OK'