mirror of
https://github.com/usmannasir/cyberpanel.git
synced 2026-05-07 06:47:02 +02:00
fix(pluginHolder): resolve CyberPanel admin identity for activation APIs
Use session userID -> Administrator email for subscription checks, activation persistence, and paid-plugin access when Django auth user is not populated.
This commit is contained in:
@@ -133,6 +133,43 @@ def verify_saved_activation_key(plugin_name, user_identity, activation_key):
|
||||
logging.writeToFile('plugin_access.verify_saved_activation_key failed: %s' % str(e))
|
||||
return False
|
||||
|
||||
|
||||
def _resolve_identity_for_request(request):
|
||||
"""
|
||||
CyberPanel often authenticates via session userID (not Django auth user).
|
||||
Prefer Administrator email when available, otherwise username.
|
||||
"""
|
||||
candidates = []
|
||||
try:
|
||||
if getattr(request, 'user', None) and request.user.is_authenticated:
|
||||
u = request.user
|
||||
email = getattr(u, 'email', None) or ''
|
||||
if email:
|
||||
candidates.append(email)
|
||||
uname = getattr(u, 'username', None) or ''
|
||||
if uname:
|
||||
candidates.append(uname)
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
uid = request.session.get('userID') if hasattr(request, 'session') else None
|
||||
if uid:
|
||||
from loginSystem.models import Administrator
|
||||
admin = Administrator.objects.filter(pk=uid).only('email', 'userName').first()
|
||||
if admin:
|
||||
if getattr(admin, 'email', '') and str(admin.email).lower() != 'none':
|
||||
candidates.append(str(admin.email))
|
||||
if getattr(admin, 'userName', ''):
|
||||
candidates.append(str(admin.userName))
|
||||
except Exception:
|
||||
pass
|
||||
for item in candidates:
|
||||
item = (item or '').strip()
|
||||
if item:
|
||||
return item.lower()
|
||||
return ''
|
||||
|
||||
|
||||
def check_plugin_access(request, plugin_name, plugin_meta=None):
|
||||
"""
|
||||
Check if user has access to a plugin
|
||||
@@ -166,21 +203,7 @@ def check_plugin_access(request, plugin_name, plugin_meta=None):
|
||||
if not plugin_meta or not plugin_meta.get('is_paid', False):
|
||||
return default_response
|
||||
|
||||
# Plugin is paid - check Patreon membership
|
||||
if not request.user or not request.user.is_authenticated:
|
||||
return {
|
||||
'has_access': False,
|
||||
'is_paid': True,
|
||||
'message': 'Please log in to access this plugin',
|
||||
'patreon_url': plugin_meta.get('patreon_url')
|
||||
}
|
||||
|
||||
# Get user email
|
||||
user_email = getattr(request.user, 'email', None)
|
||||
if not user_email:
|
||||
# Try to get from username or other fields
|
||||
user_email = getattr(request.user, 'username', '')
|
||||
|
||||
user_email = _resolve_identity_for_request(request)
|
||||
if not user_email:
|
||||
return {
|
||||
'has_access': False,
|
||||
|
||||
@@ -49,6 +49,42 @@ PLUGIN_SOURCE_PATHS = ['/home/cyberpanel/plugins', '/home/cyberpanel-plugins']
|
||||
BUILTIN_PLUGINS = frozenset(['emailMarketing', 'emailPremium'])
|
||||
|
||||
|
||||
def _resolve_logged_in_plugin_identity(request):
|
||||
"""
|
||||
CyberPanel often authenticates via session userID (not Django auth user).
|
||||
Use Administrator email when available, otherwise username.
|
||||
"""
|
||||
candidates = []
|
||||
try:
|
||||
if getattr(request, 'user', None) and request.user.is_authenticated:
|
||||
u = request.user
|
||||
email = getattr(u, 'email', None) or ''
|
||||
if email:
|
||||
candidates.append(email)
|
||||
uname = getattr(u, 'username', None) or ''
|
||||
if uname:
|
||||
candidates.append(uname)
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
uid = request.session.get('userID') if hasattr(request, 'session') else None
|
||||
if uid:
|
||||
from loginSystem.models import Administrator
|
||||
admin = Administrator.objects.filter(pk=uid).only('email', 'userName').first()
|
||||
if admin:
|
||||
if getattr(admin, 'email', '') and str(admin.email).lower() != 'none':
|
||||
candidates.append(str(admin.email))
|
||||
if getattr(admin, 'userName', ''):
|
||||
candidates.append(str(admin.userName))
|
||||
except Exception:
|
||||
pass
|
||||
for item in candidates:
|
||||
item = (item or '').strip()
|
||||
if item:
|
||||
return item.lower()
|
||||
return ''
|
||||
|
||||
|
||||
def _install_plugin_compat(plugin_name, zip_path_abs):
|
||||
"""
|
||||
Call pluginInstaller.installPlugin with zip_path when supported (newer CyberPanel).
|
||||
@@ -2456,15 +2492,6 @@ def check_plugin_subscription(request, plugin_name):
|
||||
try:
|
||||
if not user_can_manage_plugins(request):
|
||||
return deny_plugin_manage_json_response(request)
|
||||
# Check if user is authenticated
|
||||
if not request.user or not request.user.is_authenticated:
|
||||
return JsonResponse({
|
||||
'success': False,
|
||||
'has_access': False,
|
||||
'is_paid': False,
|
||||
'message': 'Please log in to check subscription status',
|
||||
'patreon_url': None
|
||||
}, status=401)
|
||||
|
||||
# Load plugin metadata
|
||||
from .plugin_access import (
|
||||
@@ -2476,7 +2503,15 @@ def check_plugin_subscription(request, plugin_name):
|
||||
|
||||
plugin_meta = _load_plugin_meta(plugin_name)
|
||||
|
||||
user_email = getattr(request.user, 'email', None) or getattr(request.user, 'username', '')
|
||||
user_email = _resolve_logged_in_plugin_identity(request)
|
||||
if not user_email:
|
||||
return JsonResponse({
|
||||
'success': False,
|
||||
'has_access': False,
|
||||
'is_paid': False,
|
||||
'message': 'Unable to determine user identity',
|
||||
'patreon_url': None
|
||||
}, status=400)
|
||||
activation_key = ''
|
||||
if request.method == 'POST':
|
||||
try:
|
||||
@@ -2539,8 +2574,6 @@ def store_plugin_activation_key(request, plugin_name):
|
||||
try:
|
||||
if not user_can_manage_plugins(request):
|
||||
return deny_plugin_manage_json_response(request)
|
||||
if not request.user or not request.user.is_authenticated:
|
||||
return JsonResponse({'success': False, 'message': 'Authentication required'}, status=401)
|
||||
|
||||
try:
|
||||
payload = json.loads(request.body.decode('utf-8') or '{}')
|
||||
@@ -2551,7 +2584,7 @@ def store_plugin_activation_key(request, plugin_name):
|
||||
if not activation_key:
|
||||
return JsonResponse({'success': False, 'message': 'activation_key is required'}, status=400)
|
||||
|
||||
user_email = getattr(request.user, 'email', None) or getattr(request.user, 'username', '')
|
||||
user_email = _resolve_logged_in_plugin_identity(request)
|
||||
if not user_email:
|
||||
return JsonResponse({'success': False, 'message': 'Unable to determine user identity'}, status=400)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user