fix(pluginHolder): resolve CyberPanel admin identity for activation APIs

Use session userID -> Administrator email for subscription checks, activation persistence, and paid-plugin access when Django auth user is not populated.
This commit is contained in:
master3395
2026-03-26 23:16:45 +01:00
parent 8d3e2cd51a
commit 2b23826948
2 changed files with 84 additions and 28 deletions

View File

@@ -133,6 +133,43 @@ def verify_saved_activation_key(plugin_name, user_identity, activation_key):
logging.writeToFile('plugin_access.verify_saved_activation_key failed: %s' % str(e))
return False
def _resolve_identity_for_request(request):
"""
CyberPanel often authenticates via session userID (not Django auth user).
Prefer Administrator email when available, otherwise username.
"""
candidates = []
try:
if getattr(request, 'user', None) and request.user.is_authenticated:
u = request.user
email = getattr(u, 'email', None) or ''
if email:
candidates.append(email)
uname = getattr(u, 'username', None) or ''
if uname:
candidates.append(uname)
except Exception:
pass
try:
uid = request.session.get('userID') if hasattr(request, 'session') else None
if uid:
from loginSystem.models import Administrator
admin = Administrator.objects.filter(pk=uid).only('email', 'userName').first()
if admin:
if getattr(admin, 'email', '') and str(admin.email).lower() != 'none':
candidates.append(str(admin.email))
if getattr(admin, 'userName', ''):
candidates.append(str(admin.userName))
except Exception:
pass
for item in candidates:
item = (item or '').strip()
if item:
return item.lower()
return ''
def check_plugin_access(request, plugin_name, plugin_meta=None):
"""
Check if user has access to a plugin
@@ -166,21 +203,7 @@ def check_plugin_access(request, plugin_name, plugin_meta=None):
if not plugin_meta or not plugin_meta.get('is_paid', False):
return default_response
# Plugin is paid - check Patreon membership
if not request.user or not request.user.is_authenticated:
return {
'has_access': False,
'is_paid': True,
'message': 'Please log in to access this plugin',
'patreon_url': plugin_meta.get('patreon_url')
}
# Get user email
user_email = getattr(request.user, 'email', None)
if not user_email:
# Try to get from username or other fields
user_email = getattr(request.user, 'username', '')
user_email = _resolve_identity_for_request(request)
if not user_email:
return {
'has_access': False,

View File

@@ -49,6 +49,42 @@ PLUGIN_SOURCE_PATHS = ['/home/cyberpanel/plugins', '/home/cyberpanel-plugins']
BUILTIN_PLUGINS = frozenset(['emailMarketing', 'emailPremium'])
def _resolve_logged_in_plugin_identity(request):
"""
CyberPanel often authenticates via session userID (not Django auth user).
Use Administrator email when available, otherwise username.
"""
candidates = []
try:
if getattr(request, 'user', None) and request.user.is_authenticated:
u = request.user
email = getattr(u, 'email', None) or ''
if email:
candidates.append(email)
uname = getattr(u, 'username', None) or ''
if uname:
candidates.append(uname)
except Exception:
pass
try:
uid = request.session.get('userID') if hasattr(request, 'session') else None
if uid:
from loginSystem.models import Administrator
admin = Administrator.objects.filter(pk=uid).only('email', 'userName').first()
if admin:
if getattr(admin, 'email', '') and str(admin.email).lower() != 'none':
candidates.append(str(admin.email))
if getattr(admin, 'userName', ''):
candidates.append(str(admin.userName))
except Exception:
pass
for item in candidates:
item = (item or '').strip()
if item:
return item.lower()
return ''
def _install_plugin_compat(plugin_name, zip_path_abs):
"""
Call pluginInstaller.installPlugin with zip_path when supported (newer CyberPanel).
@@ -2456,15 +2492,6 @@ def check_plugin_subscription(request, plugin_name):
try:
if not user_can_manage_plugins(request):
return deny_plugin_manage_json_response(request)
# Check if user is authenticated
if not request.user or not request.user.is_authenticated:
return JsonResponse({
'success': False,
'has_access': False,
'is_paid': False,
'message': 'Please log in to check subscription status',
'patreon_url': None
}, status=401)
# Load plugin metadata
from .plugin_access import (
@@ -2476,7 +2503,15 @@ def check_plugin_subscription(request, plugin_name):
plugin_meta = _load_plugin_meta(plugin_name)
user_email = getattr(request.user, 'email', None) or getattr(request.user, 'username', '')
user_email = _resolve_logged_in_plugin_identity(request)
if not user_email:
return JsonResponse({
'success': False,
'has_access': False,
'is_paid': False,
'message': 'Unable to determine user identity',
'patreon_url': None
}, status=400)
activation_key = ''
if request.method == 'POST':
try:
@@ -2539,8 +2574,6 @@ def store_plugin_activation_key(request, plugin_name):
try:
if not user_can_manage_plugins(request):
return deny_plugin_manage_json_response(request)
if not request.user or not request.user.is_authenticated:
return JsonResponse({'success': False, 'message': 'Authentication required'}, status=401)
try:
payload = json.loads(request.body.decode('utf-8') or '{}')
@@ -2551,7 +2584,7 @@ def store_plugin_activation_key(request, plugin_name):
if not activation_key:
return JsonResponse({'success': False, 'message': 'activation_key is required'}, status=400)
user_email = getattr(request.user, 'email', None) or getattr(request.user, 'username', '')
user_email = _resolve_logged_in_plugin_identity(request)
if not user_email:
return JsonResponse({'success': False, 'message': 'Unable to determine user identity'}, status=400)