Files
CyberPanel/pluginHolder/views.py

1336 lines
60 KiB
Python
Raw Normal View History

2025-08-01 14:56:30 +05:00
# -*- coding: utf-8 -*-
from django.shortcuts import render, redirect
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
2025-08-01 14:56:30 +05:00
from plogical.mailUtilities import mailUtilities
import os
import subprocess
import shlex
import json
from datetime import datetime, timedelta
2025-08-01 14:56:30 +05:00
from xml.etree import ElementTree
from plogical.httpProc import httpProc
from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging
import sys
import urllib.request
import urllib.error
import time
sys.path.append('/usr/local/CyberCP')
from pluginInstaller.pluginInstaller import pluginInstaller
from .patreon_verifier import PatreonVerifier
# Plugin state file location
PLUGIN_STATE_DIR = '/home/cyberpanel/plugin_states'
# Plugin store cache configuration
PLUGIN_STORE_CACHE_DIR = '/home/cyberpanel/plugin_store_cache'
PLUGIN_STORE_CACHE_FILE = os.path.join(PLUGIN_STORE_CACHE_DIR, 'plugins_cache.json')
PLUGIN_STORE_CACHE_DURATION = 3600 # Cache for 1 hour (3600 seconds)
GITHUB_REPO_API = 'https://api.github.com/repos/master3395/cyberpanel-plugins/contents'
GITHUB_RAW_BASE = 'https://raw.githubusercontent.com/master3395/cyberpanel-plugins/main'
GITHUB_COMMITS_API = 'https://api.github.com/repos/master3395/cyberpanel-plugins/commits'
def _get_plugin_state_file(plugin_name):
"""Get the path to the plugin state file"""
if not os.path.exists(PLUGIN_STATE_DIR):
os.makedirs(PLUGIN_STATE_DIR, mode=0o755)
return os.path.join(PLUGIN_STATE_DIR, plugin_name + '.state')
def _is_plugin_enabled(plugin_name):
"""Check if a plugin is enabled"""
state_file = _get_plugin_state_file(plugin_name)
if os.path.exists(state_file):
try:
with open(state_file, 'r') as f:
state = f.read().strip()
return state == 'enabled'
except:
return True # Default to enabled if file read fails
return True # Default to enabled if state file doesn't exist
def _set_plugin_state(plugin_name, enabled):
"""Set plugin enabled/disabled state"""
state_file = _get_plugin_state_file(plugin_name)
try:
with open(state_file, 'w') as f:
f.write('enabled' if enabled else 'disabled')
os.chmod(state_file, 0o644)
return True
except Exception as e:
logging.writeToFile(f"Error writing plugin state for {plugin_name}: {str(e)}")
return False
2025-08-01 14:56:30 +05:00
def help_page(request):
"""Display plugin development help page"""
mailUtilities.checkHome()
proc = httpProc(request, 'pluginHolder/help.html', {}, 'admin')
return proc.render()
2025-08-01 14:56:30 +05:00
def installed(request):
mailUtilities.checkHome()
pluginPath = '/home/cyberpanel/plugins'
installedPath = '/usr/local/CyberCP'
2025-08-01 14:56:30 +05:00
pluginList = []
errorPlugins = []
processed_plugins = set() # Track which plugins we've already processed
2025-08-01 14:56:30 +05:00
# First, process plugins from source directory
2025-08-01 14:56:30 +05:00
if os.path.exists(pluginPath):
for plugin in os.listdir(pluginPath):
# Skip files (like .zip files) - only process directories
pluginDir = os.path.join(pluginPath, plugin)
if not os.path.isdir(pluginDir):
continue
2025-08-01 14:56:30 +05:00
data = {}
# Try installed location first, then fallback to source location
completePath = installedPath + '/' + plugin + '/meta.xml'
sourcePath = os.path.join(pluginDir, 'meta.xml')
# Determine which meta.xml to use
metaXmlPath = None
if os.path.exists(completePath):
metaXmlPath = completePath
elif os.path.exists(sourcePath):
# Plugin not installed but has source meta.xml - use it
metaXmlPath = sourcePath
# Add error handling to prevent 500 errors
try:
if metaXmlPath is None:
# No meta.xml found in either location - skip silently
continue
pluginMetaData = ElementTree.parse(metaXmlPath)
root = pluginMetaData.getroot()
# Validate required fields exist (handle both <plugin> and <cyberpanelPluginConfig> formats)
name_elem = root.find('name')
type_elem = root.find('type')
desc_elem = root.find('description')
version_elem = root.find('version')
# Type field is optional (testPlugin doesn't have it)
if name_elem is None or desc_elem is None or version_elem is None:
errorPlugins.append({'name': plugin, 'error': 'Missing required metadata fields (name, description, or version)'})
logging.writeToFile(f"Plugin {plugin}: Missing required metadata fields in meta.xml")
continue
# Check if text is None (empty elements)
if name_elem.text is None or desc_elem.text is None or version_elem.text is None:
errorPlugins.append({'name': plugin, 'error': 'Empty metadata fields'})
logging.writeToFile(f"Plugin {plugin}: Empty metadata fields in meta.xml")
continue
data['name'] = name_elem.text
data['type'] = type_elem.text if type_elem is not None and type_elem.text is not None else 'Plugin'
data['desc'] = desc_elem.text
data['version'] = version_elem.text
data['plugin_dir'] = plugin # Plugin directory name
# Check if plugin is installed (only if it exists in /usr/local/CyberCP/)
# Source directory presence doesn't mean installed - it just means the source files are available
data['installed'] = os.path.exists(completePath)
# Get plugin enabled state (only for installed plugins)
if data['installed']:
data['enabled'] = _is_plugin_enabled(plugin)
else:
data['enabled'] = False
# Initialize is_paid to False by default (will be set later if paid)
data['is_paid'] = False
data['patreon_tier'] = None
data['patreon_url'] = None
# Get modify date from local file (fast, no API calls)
# GitHub commit dates are fetched in the plugin store, not here to avoid timeouts
modify_date = 'N/A'
try:
if os.path.exists(metaXmlPath):
modify_time = os.path.getmtime(metaXmlPath)
modify_date = datetime.fromtimestamp(modify_time).strftime('%Y-%m-%d %H:%M:%S')
except Exception:
modify_date = 'N/A'
data['modify_date'] = modify_date
# Extract settings URL or main URL for "Manage" button
settings_url_elem = root.find('settings_url')
url_elem = root.find('url')
# Priority: settings_url > url > default pattern
# Special handling for core plugins that don't use /plugins/ prefix
if plugin == 'emailMarketing':
# emailMarketing is a core CyberPanel plugin, uses /emailMarketing/ not /plugins/emailMarketing/
data['manage_url'] = '/emailMarketing/'
elif settings_url_elem is not None and settings_url_elem.text:
data['manage_url'] = settings_url_elem.text
elif url_elem is not None and url_elem.text:
data['manage_url'] = url_elem.text
else:
# Default: try /plugins/{plugin_dir}/settings/ or /plugins/{plugin_dir}/
# Only set if plugin is installed (we can't know if the URL exists otherwise)
# Special handling for emailMarketing
if plugin == 'emailMarketing':
data['manage_url'] = '/emailMarketing/'
elif os.path.exists(completePath):
# Check if settings route exists, otherwise use main plugin URL
settings_route = f'/plugins/{plugin}/settings/'
main_route = f'/plugins/{plugin}/'
# Default to main route - most plugins have a main route even if no settings
data['manage_url'] = main_route
else:
data['manage_url'] = None
# Extract author information
author_elem = root.find('author')
if author_elem is not None and author_elem.text:
data['author'] = author_elem.text
else:
data['author'] = 'Unknown'
# Extract paid plugin information
paid_elem = root.find('paid')
patreon_tier_elem = root.find('patreon_tier')
if paid_elem is not None and paid_elem.text and paid_elem.text.lower() == 'true':
data['is_paid'] = True
data['patreon_tier'] = patreon_tier_elem.text if patreon_tier_elem is not None and patreon_tier_elem.text else 'CyberPanel Paid Plugin'
data['patreon_url'] = root.find('patreon_url').text if root.find('patreon_url') is not None else 'https://www.patreon.com/c/newstargeted/membership'
else:
data['is_paid'] = False
data['patreon_tier'] = None
data['patreon_url'] = None
2025-08-01 14:56:30 +05:00
pluginList.append(data)
processed_plugins.add(plugin) # Mark as processed
except ElementTree.ParseError as e:
errorPlugins.append({'name': plugin, 'error': f'XML parse error: {str(e)}'})
logging.writeToFile(f"Plugin {plugin}: XML parse error - {str(e)}")
# Don't mark as processed if it failed - let installed check handle it
# This ensures plugins that exist in /usr/local/CyberCP/ but have bad source meta.xml still get counted
if not os.path.exists(completePath):
# Only skip if it's not actually installed
continue
# If it exists in installed location, don't mark as processed so it gets checked there
continue
except Exception as e:
errorPlugins.append({'name': plugin, 'error': f'Error loading plugin: {str(e)}'})
logging.writeToFile(f"Plugin {plugin}: Error loading - {str(e)}")
# Don't mark as processed if it failed - let installed check handle it
if not os.path.exists(completePath):
# Only skip if it's not actually installed
continue
# If it exists in installed location, don't mark as processed so it gets checked there
continue
# Also check for installed plugins that don't have source directories
# This handles plugins installed from the store that may not be in /home/cyberpanel/plugins/
if os.path.exists(installedPath):
for plugin in os.listdir(installedPath):
# Skip if already processed
if plugin in processed_plugins:
continue
# Only check directories that look like plugins (have meta.xml)
pluginInstalledDir = os.path.join(installedPath, plugin)
if not os.path.isdir(pluginInstalledDir):
continue
metaXmlPath = os.path.join(pluginInstalledDir, 'meta.xml')
if not os.path.exists(metaXmlPath):
continue
# This is an installed plugin without a source directory - process it
try:
data = {}
pluginMetaData = ElementTree.parse(metaXmlPath)
root = pluginMetaData.getroot()
# Validate required fields
name_elem = root.find('name')
type_elem = root.find('type')
desc_elem = root.find('description')
version_elem = root.find('version')
if name_elem is None or desc_elem is None or version_elem is None:
continue
if name_elem.text is None or desc_elem.text is None or version_elem.text is None:
continue
data['name'] = name_elem.text
data['type'] = type_elem.text if type_elem is not None and type_elem.text is not None else 'Plugin'
data['desc'] = desc_elem.text
data['version'] = version_elem.text
data['plugin_dir'] = plugin
data['installed'] = True # This is an installed plugin
data['enabled'] = _is_plugin_enabled(plugin)
# Initialize is_paid to False by default (will be set later if paid)
data['is_paid'] = False
data['patreon_tier'] = None
data['patreon_url'] = None
# Get modify date from installed location
modify_date = 'N/A'
try:
if os.path.exists(metaXmlPath):
modify_time = os.path.getmtime(metaXmlPath)
modify_date = datetime.fromtimestamp(modify_time).strftime('%Y-%m-%d %H:%M:%S')
except Exception:
modify_date = 'N/A'
data['modify_date'] = modify_date
# Extract settings URL or main URL
settings_url_elem = root.find('settings_url')
url_elem = root.find('url')
# Priority: settings_url > url > default pattern
# Special handling for core plugins that don't use /plugins/ prefix
if plugin == 'emailMarketing':
# emailMarketing is a core CyberPanel plugin, uses /emailMarketing/ not /plugins/emailMarketing/
data['manage_url'] = '/emailMarketing/'
elif settings_url_elem is not None and settings_url_elem.text:
data['manage_url'] = settings_url_elem.text
elif url_elem is not None and url_elem.text:
data['manage_url'] = url_elem.text
else:
# Default to /plugins/{plugin}/ for regular plugins
# Special handling for emailMarketing
if plugin == 'emailMarketing':
data['manage_url'] = '/emailMarketing/'
else:
# Default to main plugin route (most plugins work from main route)
data['manage_url'] = f'/plugins/{plugin}/'
# Extract author information
author_elem = root.find('author')
if author_elem is not None and author_elem.text:
data['author'] = author_elem.text
else:
data['author'] = 'Unknown'
# Extract paid plugin information (is_paid already initialized to False above)
paid_elem = root.find('paid')
patreon_tier_elem = root.find('patreon_tier')
if paid_elem is not None and paid_elem.text and paid_elem.text.lower() == 'true':
data['is_paid'] = True
data['patreon_tier'] = patreon_tier_elem.text if patreon_tier_elem is not None and patreon_tier_elem.text else 'CyberPanel Paid Plugin'
patreon_url_elem = root.find('patreon_url')
data['patreon_url'] = patreon_url_elem.text if patreon_url_elem is not None and patreon_url_elem.text else 'https://www.patreon.com/membership/27789984'
# else: is_paid already False from initialization above
pluginList.append(data)
except ElementTree.ParseError as e:
errorPlugins.append({'name': plugin, 'error': f'XML parse error: {str(e)}'})
logging.writeToFile(f"Installed plugin {plugin}: XML parse error - {str(e)}")
continue
except Exception as e:
errorPlugins.append({'name': plugin, 'error': f'Error loading installed plugin: {str(e)}'})
logging.writeToFile(f"Installed plugin {plugin}: Error loading - {str(e)}")
continue
2025-08-01 14:56:30 +05:00
# Calculate installed and active counts
# Double-check by also counting plugins that actually exist in /usr/local/CyberCP/
installed_plugins_in_filesystem = set()
if os.path.exists(installedPath):
for plugin in os.listdir(installedPath):
pluginInstalledDir = os.path.join(installedPath, plugin)
if os.path.isdir(pluginInstalledDir):
metaXmlPath = os.path.join(pluginInstalledDir, 'meta.xml')
if os.path.exists(metaXmlPath):
installed_plugins_in_filesystem.add(plugin)
# Count installed plugins from the list
installed_count = len([p for p in pluginList if p.get('installed', False)])
active_count = len([p for p in pluginList if p.get('installed', False) and p.get('enabled', False)])
# If there's a discrepancy, use the filesystem count as the source of truth
filesystem_installed_count = len(installed_plugins_in_filesystem)
if filesystem_installed_count != installed_count:
logging.writeToFile(f"WARNING: Plugin count mismatch! List says {installed_count}, filesystem has {filesystem_installed_count}")
logging.writeToFile(f"Plugins in filesystem: {sorted(installed_plugins_in_filesystem)}")
logging.writeToFile(f"Plugins in list with installed=True: {[p.get('plugin_dir') for p in pluginList if p.get('installed', False)]}")
# Use filesystem count as source of truth
installed_count = filesystem_installed_count
# Debug logging to help identify discrepancies
logging.writeToFile(f"Plugin count: Total={len(pluginList)}, Installed={installed_count}, Active={active_count}")
for p in pluginList:
logging.writeToFile(f" - {p.get('plugin_dir')}: installed={p.get('installed')}, enabled={p.get('enabled')}")
proc = httpProc(request, 'pluginHolder/plugins.html',
{'plugins': pluginList, 'error_plugins': errorPlugins,
'installed_count': installed_count, 'active_count': active_count}, 'admin')
return proc.render()
@csrf_exempt
@require_http_methods(["POST"])
def install_plugin(request, plugin_name):
"""Install a plugin"""
try:
# Check if plugin source exists
pluginSource = '/home/cyberpanel/plugins/' + plugin_name
if not os.path.exists(pluginSource):
return JsonResponse({
'success': False,
'error': f'Plugin source not found: {plugin_name}'
}, status=404)
# Check if already installed
pluginInstalled = '/usr/local/CyberCP/' + plugin_name
if os.path.exists(pluginInstalled):
return JsonResponse({
'success': False,
'error': f'Plugin already installed: {plugin_name}'
}, status=400)
# Create zip file for installation (pluginInstaller expects a zip)
import tempfile
import shutil
import zipfile
temp_dir = tempfile.mkdtemp()
zip_path = os.path.join(temp_dir, plugin_name + '.zip')
# Create zip from source directory with correct structure
# The ZIP must contain plugin_name/ directory structure for proper extraction
plugin_zip = zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED)
# Walk through source directory and add files with plugin_name prefix
for root, dirs, files in os.walk(pluginSource):
for file in files:
file_path = os.path.join(root, file)
# Calculate relative path from plugin source
arcname = os.path.relpath(file_path, pluginSource)
# Add plugin_name prefix to maintain directory structure
arcname = os.path.join(plugin_name, arcname)
plugin_zip.write(file_path, arcname)
plugin_zip.close()
# Verify zip file was created
if not os.path.exists(zip_path):
shutil.rmtree(temp_dir, ignore_errors=True)
return JsonResponse({
'success': False,
'error': f'Failed to create zip file for {plugin_name}'
}, status=500)
# Copy zip to current directory (pluginInstaller expects it in cwd)
original_cwd = os.getcwd()
os.chdir(temp_dir)
try:
# Verify zip file exists in current directory
zip_file = plugin_name + '.zip'
if not os.path.exists(zip_file):
raise Exception(f'Zip file {zip_file} not found in temp directory')
# Install using pluginInstaller
try:
pluginInstaller.installPlugin(plugin_name)
except Exception as install_error:
# Log the full error for debugging
error_msg = str(install_error)
logging.writeToFile(f"pluginInstaller.installPlugin raised exception: {error_msg}")
# Check if plugin directory exists despite the error
pluginInstalled = '/usr/local/CyberCP/' + plugin_name
if os.path.exists(pluginInstalled):
logging.writeToFile(f"Plugin directory exists despite error, continuing...")
else:
raise Exception(f'Plugin installation failed: {error_msg}')
# Wait a moment for file system to sync
import time
time.sleep(2)
# Verify plugin was actually installed
pluginInstalled = '/usr/local/CyberCP/' + plugin_name
if not os.path.exists(pluginInstalled):
# Check if files were extracted to root instead
root_files = ['README.md', 'apps.py', 'meta.xml', 'urls.py', 'views.py']
found_root_files = [f for f in root_files if os.path.exists(os.path.join('/usr/local/CyberCP', f))]
if found_root_files:
raise Exception(f'Plugin installation failed: Files extracted to wrong location. Found {found_root_files} in /usr/local/CyberCP/ root instead of {pluginInstalled}/')
raise Exception(f'Plugin installation failed: {pluginInstalled} does not exist after installation')
# Set plugin to enabled by default after installation
_set_plugin_state(plugin_name, True)
return JsonResponse({
'success': True,
'message': f'Plugin {plugin_name} installed successfully'
})
finally:
os.chdir(original_cwd)
# Cleanup
shutil.rmtree(temp_dir, ignore_errors=True)
except Exception as e:
logging.writeToFile(f"Error installing plugin {plugin_name}: {str(e)}")
return JsonResponse({
'success': False,
'error': str(e)
}, status=500)
@csrf_exempt
@require_http_methods(["POST"])
def uninstall_plugin(request, plugin_name):
"""Uninstall a plugin - but keep source files and settings"""
try:
# Check if plugin is installed
pluginInstalled = '/usr/local/CyberCP/' + plugin_name
if not os.path.exists(pluginInstalled):
return JsonResponse({
'success': False,
'error': f'Plugin not installed: {plugin_name}'
}, status=404)
# Custom uninstall that keeps source files
# We need to remove from settings.py, urls.py, and remove installed directory
# but NOT remove from /home/cyberpanel/plugins/
# Remove from settings.py
pluginInstaller.removeFromSettings(plugin_name)
# Remove from URLs
pluginInstaller.removeFromURLs(plugin_name)
# Remove interface link
pluginInstaller.removeInterfaceLink(plugin_name)
# Remove migrations if enabled
if pluginInstaller.migrationsEnabled(plugin_name):
pluginInstaller.removeMigrations(plugin_name)
# Remove installed directory (but keep source in /home/cyberpanel/plugins/)
pluginInstaller.removeFiles(plugin_name)
# DON'T call informCyberPanelRemoval - we want to keep the source directory
# so users can reinstall the plugin later
# Restart service
pluginInstaller.restartGunicorn()
# Keep state file - we want to remember if it was enabled/disabled
# So user can reinstall and have same state
return JsonResponse({
'success': True,
'message': f'Plugin {plugin_name} uninstalled successfully (source files and settings preserved)'
})
except Exception as e:
logging.writeToFile(f"Error uninstalling plugin {plugin_name}: {str(e)}")
return JsonResponse({
'success': False,
'error': str(e)
}, status=500)
@csrf_exempt
@require_http_methods(["POST"])
def enable_plugin(request, plugin_name):
"""Enable a plugin"""
try:
# Check if plugin is installed
pluginInstalled = '/usr/local/CyberCP/' + plugin_name
if not os.path.exists(pluginInstalled):
return JsonResponse({
'success': False,
'error': f'Plugin not installed: {plugin_name}'
}, status=404)
# Set plugin state to enabled
if _set_plugin_state(plugin_name, True):
return JsonResponse({
'success': True,
'message': f'Plugin {plugin_name} enabled successfully'
})
else:
return JsonResponse({
'success': False,
'error': 'Failed to update plugin state'
}, status=500)
except Exception as e:
logging.writeToFile(f"Error enabling plugin {plugin_name}: {str(e)}")
return JsonResponse({
'success': False,
'error': str(e)
}, status=500)
@csrf_exempt
@require_http_methods(["POST"])
def disable_plugin(request, plugin_name):
"""Disable a plugin"""
try:
# Check if plugin is installed
pluginInstalled = '/usr/local/CyberCP/' + plugin_name
if not os.path.exists(pluginInstalled):
return JsonResponse({
'success': False,
'error': f'Plugin not installed: {plugin_name}'
}, status=404)
# Set plugin state to disabled
if _set_plugin_state(plugin_name, False):
return JsonResponse({
'success': True,
'message': f'Plugin {plugin_name} disabled successfully'
})
else:
return JsonResponse({
'success': False,
'error': 'Failed to update plugin state'
}, status=500)
except Exception as e:
logging.writeToFile(f"Error disabling plugin {plugin_name}: {str(e)}")
return JsonResponse({
'success': False,
'error': str(e)
}, status=500)
def _ensure_cache_dir():
"""Ensure cache directory exists"""
try:
if not os.path.exists(PLUGIN_STORE_CACHE_DIR):
os.makedirs(PLUGIN_STORE_CACHE_DIR, mode=0o755)
except Exception as e:
logging.writeToFile(f"Error creating cache directory: {str(e)}")
def _get_cached_plugins(allow_expired=False):
"""Get plugins from cache if available and not expired
Args:
allow_expired: If True, return cache even if expired (for fallback)
"""
try:
if not os.path.exists(PLUGIN_STORE_CACHE_FILE):
return None
# Check if cache is expired
cache_mtime = os.path.getmtime(PLUGIN_STORE_CACHE_FILE)
cache_age = time.time() - cache_mtime
if cache_age > PLUGIN_STORE_CACHE_DURATION:
if not allow_expired:
logging.writeToFile(f"Plugin store cache expired (age: {cache_age:.0f}s)")
return None
else:
logging.writeToFile(f"Using expired cache as fallback (age: {cache_age:.0f}s)")
# Read cache file
with open(PLUGIN_STORE_CACHE_FILE, 'r', encoding='utf-8') as f:
cache_data = json.load(f)
if not allow_expired or cache_age <= PLUGIN_STORE_CACHE_DURATION:
logging.writeToFile(f"Using cached plugin store data (age: {cache_age:.0f}s)")
return cache_data.get('plugins', [])
except Exception as e:
logging.writeToFile(f"Error reading plugin store cache: {str(e)}")
return None
def _save_plugins_cache(plugins):
"""Save plugins to cache"""
try:
_ensure_cache_dir()
cache_data = {
'plugins': plugins,
'cached_at': datetime.now().isoformat(),
'cache_duration': PLUGIN_STORE_CACHE_DURATION
}
with open(PLUGIN_STORE_CACHE_FILE, 'w', encoding='utf-8') as f:
json.dump(cache_data, f, indent=2, ensure_ascii=False)
logging.writeToFile("Plugin store cache saved successfully")
except Exception as e:
logging.writeToFile(f"Error saving plugin store cache: {str(e)}")
def _enrich_store_plugins(plugins):
"""Enrich store plugins with installed/enabled status from local system"""
enriched = []
plugin_source_dir = '/home/cyberpanel/plugins'
plugin_install_dir = '/usr/local/CyberCP'
for plugin in plugins:
plugin_dir = plugin.get('plugin_dir', '')
if not plugin_dir:
continue
# Check if plugin is installed locally
# Plugin is only considered "installed" if it exists in /usr/local/CyberCP/
# Source directory presence doesn't mean installed - it just means the source files are available
installed_path = os.path.join(plugin_install_dir, plugin_dir)
plugin['installed'] = os.path.exists(installed_path)
# Check if plugin is enabled (only if installed)
if plugin['installed']:
plugin['enabled'] = _is_plugin_enabled(plugin_dir)
else:
plugin['enabled'] = False
# Ensure is_paid field exists and is properly set (default to False if not set or invalid)
# Handle all possible cases: missing, None, empty string, string values, boolean
is_paid_value = plugin.get('is_paid', False)
# Normalize is_paid to boolean
if is_paid_value is None or is_paid_value == '' or is_paid_value == 'false' or is_paid_value == 'False' or is_paid_value == '0':
plugin['is_paid'] = False
elif is_paid_value is True or is_paid_value == 'true' or is_paid_value == 'True' or is_paid_value == '1' or str(is_paid_value).lower() == 'true':
plugin['is_paid'] = True
elif 'is_paid' not in plugin or plugin.get('is_paid') is None:
# Try to check from local meta.xml if available
meta_path = None
if os.path.exists(installed_path):
meta_path = os.path.join(installed_path, 'meta.xml')
elif os.path.exists(source_path):
meta_path = os.path.join(source_path, 'meta.xml')
if meta_path and os.path.exists(meta_path):
try:
pluginMetaData = ElementTree.parse(meta_path)
root = pluginMetaData.getroot()
paid_elem = root.find('paid')
if paid_elem is not None and paid_elem.text and paid_elem.text.lower() == 'true':
plugin['is_paid'] = True
else:
plugin['is_paid'] = False
except:
plugin['is_paid'] = False
else:
plugin['is_paid'] = False # Default to free if we can't determine
else:
# Already set, but ensure it's boolean
plugin['is_paid'] = bool(plugin['is_paid']) if plugin['is_paid'] not in [True, False] else plugin['is_paid']
enriched.append(plugin)
return enriched
def _fetch_plugins_from_github():
"""Fetch plugins from GitHub repository"""
plugins = []
try:
# Fetch repository contents
req = urllib.request.Request(
GITHUB_REPO_API,
headers={
'User-Agent': 'CyberPanel-Plugin-Store/1.0',
'Accept': 'application/vnd.github.v3+json'
}
)
with urllib.request.urlopen(req, timeout=10) as response:
contents = json.loads(response.read().decode('utf-8'))
# Filter for directories (plugins)
plugin_dirs = [item for item in contents if item.get('type') == 'dir' and not item.get('name', '').startswith('.')]
for plugin_dir in plugin_dirs:
plugin_name = plugin_dir.get('name', '')
if not plugin_name:
continue
try:
# Fetch meta.xml from raw GitHub
meta_xml_url = f"{GITHUB_RAW_BASE}/{plugin_name}/meta.xml"
meta_req = urllib.request.Request(
meta_xml_url,
headers={'User-Agent': 'CyberPanel-Plugin-Store/1.0'}
)
with urllib.request.urlopen(meta_req, timeout=10) as meta_response:
meta_xml_content = meta_response.read().decode('utf-8')
# Parse meta.xml
root = ElementTree.fromstring(meta_xml_content)
# Fetch last commit date for this plugin from GitHub
modify_date = 'N/A'
try:
commits_url = f"{GITHUB_COMMITS_API}?path={plugin_name}&per_page=1"
commits_req = urllib.request.Request(
commits_url,
headers={
'User-Agent': 'CyberPanel-Plugin-Store/1.0',
'Accept': 'application/vnd.github.v3+json'
}
)
with urllib.request.urlopen(commits_req, timeout=10) as commits_response:
commits_data = json.loads(commits_response.read().decode('utf-8'))
if commits_data and len(commits_data) > 0:
commit_date = commits_data[0].get('commit', {}).get('author', {}).get('date', '')
if commit_date:
# Parse ISO 8601 date and format it
try:
from datetime import datetime
dt = datetime.fromisoformat(commit_date.replace('Z', '+00:00'))
modify_date = dt.strftime('%Y-%m-%d %H:%M:%S')
except Exception:
modify_date = commit_date[:19].replace('T', ' ') # Fallback formatting
except Exception as e:
logging.writeToFile(f"Could not fetch commit date for {plugin_name}: {str(e)}")
modify_date = 'N/A'
# Extract paid plugin information
paid_elem = root.find('paid')
patreon_tier_elem = root.find('patreon_tier')
is_paid = False
patreon_tier = None
patreon_url = None
if paid_elem is not None and paid_elem.text and paid_elem.text.lower() == 'true':
is_paid = True
patreon_tier = patreon_tier_elem.text if patreon_tier_elem is not None and patreon_tier_elem.text else 'CyberPanel Paid Plugin'
patreon_url_elem = root.find('patreon_url')
patreon_url = patreon_url_elem.text if patreon_url_elem is not None else 'https://www.patreon.com/c/newstargeted/membership'
plugin_data = {
'plugin_dir': plugin_name,
'name': root.find('name').text if root.find('name') is not None else plugin_name,
'type': root.find('type').text if root.find('type') is not None else 'Plugin',
'description': root.find('description').text if root.find('description') is not None else '',
'version': root.find('version').text if root.find('version') is not None else '1.0.0',
'url': root.find('url').text if root.find('url') is not None else f'/plugins/{plugin_name}/',
'settings_url': root.find('settings_url').text if root.find('settings_url') is not None else f'/plugins/{plugin_name}/settings/',
'author': root.find('author').text if root.find('author') is not None else 'Unknown',
'github_url': f'https://github.com/master3395/cyberpanel-plugins/tree/main/{plugin_name}',
'about_url': f'https://github.com/master3395/cyberpanel-plugins/tree/main/{plugin_name}',
'modify_date': modify_date,
'is_paid': is_paid,
'patreon_tier': patreon_tier,
'patreon_url': patreon_url
}
plugins.append(plugin_data)
logging.writeToFile(f"Fetched plugin: {plugin_name} (last modified: {modify_date})")
except urllib.error.HTTPError as e:
if e.code == 403:
# Rate limit hit - log and break
logging.writeToFile(f"GitHub API rate limit exceeded (403) for plugin {plugin_name}")
raise # Re-raise to be caught by outer handler
elif e.code == 404:
# meta.xml not found, skip this plugin
logging.writeToFile(f"meta.xml not found for plugin {plugin_name}, skipping")
continue
else:
logging.writeToFile(f"HTTP error {e.code} fetching {plugin_name}: {str(e)}")
continue
except Exception as e:
logging.writeToFile(f"Error processing plugin {plugin_name}: {str(e)}")
continue
return plugins
except urllib.error.HTTPError as e:
if e.code == 403:
error_msg = "GitHub API rate limit exceeded. Using cached data if available."
logging.writeToFile(f"GitHub API 403 error: {error_msg}")
raise Exception(error_msg)
else:
error_msg = f"GitHub API error {e.code}: {str(e)}"
logging.writeToFile(error_msg)
raise Exception(error_msg)
except urllib.error.URLError as e:
error_msg = f"Network error fetching plugins: {str(e)}"
logging.writeToFile(error_msg)
raise Exception(error_msg)
except Exception as e:
error_msg = f"Error fetching plugins from GitHub: {str(e)}"
logging.writeToFile(error_msg)
raise Exception(error_msg)
@csrf_exempt
@require_http_methods(["GET"])
def fetch_plugin_store(request):
"""Fetch plugins from the plugin store with caching"""
mailUtilities.checkHome()
# Try to get from cache first
cached_plugins = _get_cached_plugins()
if cached_plugins is not None:
# Enrich cached plugins with installed/enabled status
enriched_plugins = _enrich_store_plugins(cached_plugins)
return JsonResponse({
'success': True,
'plugins': enriched_plugins,
'cached': True
})
# Cache miss or expired - fetch from GitHub
try:
plugins = _fetch_plugins_from_github()
# Enrich plugins with installed/enabled status
enriched_plugins = _enrich_store_plugins(plugins)
# Save to cache (save original, not enriched, to keep cache clean)
if plugins:
_save_plugins_cache(plugins)
return JsonResponse({
'success': True,
'plugins': enriched_plugins,
'cached': False
})
except Exception as e:
error_message = str(e)
# If rate limited, try to use stale cache as fallback
if '403' in error_message or 'rate limit' in error_message.lower():
stale_cache = _get_cached_plugins(allow_expired=True) # Get cache even if expired
if stale_cache is not None:
logging.writeToFile("Using stale cache due to rate limit")
enriched_plugins = _enrich_store_plugins(stale_cache)
return JsonResponse({
'success': True,
'plugins': enriched_plugins,
'cached': True,
'warning': 'Using cached data due to GitHub rate limit. Data may be outdated.'
})
# No cache available, return error
return JsonResponse({
'success': False,
'error': error_message,
'plugins': []
}, status=500)
@csrf_exempt
@require_http_methods(["POST"])
def install_from_store(request, plugin_name):
"""Install plugin from GitHub store, with fallback to local source"""
mailUtilities.checkHome()
try:
# Check if already installed
pluginInstalled = '/usr/local/CyberCP/' + plugin_name
if os.path.exists(pluginInstalled):
return JsonResponse({
'success': False,
'error': f'Plugin already installed: {plugin_name}'
}, status=400)
# Download plugin from GitHub
import tempfile
import shutil
import zipfile
import io
logging.writeToFile(f"Starting installation of {plugin_name} from GitHub store")
# Create temporary directory
temp_dir = tempfile.mkdtemp()
zip_path = os.path.join(temp_dir, plugin_name + '.zip')
try:
# Try to download from GitHub first
use_local_fallback = False
try:
# Download repository as ZIP
repo_zip_url = 'https://github.com/master3395/cyberpanel-plugins/archive/refs/heads/main.zip'
logging.writeToFile(f"Downloading plugin from: {repo_zip_url}")
repo_req = urllib.request.Request(
repo_zip_url,
headers={
'User-Agent': 'CyberPanel-Plugin-Store/1.0',
'Accept': 'application/zip'
}
)
with urllib.request.urlopen(repo_req, timeout=30) as repo_response:
repo_zip_data = repo_response.read()
# Extract plugin directory from repository ZIP
repo_zip = zipfile.ZipFile(io.BytesIO(repo_zip_data))
# Find plugin directory in ZIP
plugin_prefix = f'cyberpanel-plugins-main/{plugin_name}/'
plugin_files = [f for f in repo_zip.namelist() if f.startswith(plugin_prefix)]
if not plugin_files:
logging.writeToFile(f"Plugin {plugin_name} not found in GitHub repository, trying local source")
use_local_fallback = True
else:
logging.writeToFile(f"Found {len(plugin_files)} files for plugin {plugin_name} in GitHub")
# Create plugin ZIP file from GitHub with correct structure
# The ZIP must contain plugin_name/ directory structure for proper extraction
plugin_zip = zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED)
for file_path in plugin_files:
# Remove the repository root prefix
relative_path = file_path[len(plugin_prefix):]
if relative_path: # Skip directories
file_data = repo_zip.read(file_path)
# Add plugin_name prefix to maintain directory structure
arcname = os.path.join(plugin_name, relative_path)
plugin_zip.writestr(arcname, file_data)
plugin_zip.close()
repo_zip.close()
except Exception as github_error:
logging.writeToFile(f"GitHub download failed for {plugin_name}: {str(github_error)}, trying local source")
use_local_fallback = True
# Fallback to local source if GitHub download failed
if use_local_fallback:
pluginSource = '/home/cyberpanel/plugins/' + plugin_name
if not os.path.exists(pluginSource):
raise Exception(f'Plugin {plugin_name} not found in GitHub repository and local source not found at {pluginSource}')
logging.writeToFile(f"Using local source for {plugin_name} from {pluginSource}")
# Create zip from local source directory with correct structure
# The ZIP must contain plugin_name/ directory structure for proper extraction
import zipfile
plugin_zip = zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED)
# Walk through source directory and add files with plugin_name prefix
for root, dirs, files in os.walk(pluginSource):
for file in files:
file_path = os.path.join(root, file)
# Calculate relative path from plugin source
arcname = os.path.relpath(file_path, pluginSource)
# Add plugin_name prefix to maintain directory structure
arcname = os.path.join(plugin_name, arcname)
plugin_zip.write(file_path, arcname)
plugin_zip.close()
# Verify ZIP was created
if not os.path.exists(zip_path):
raise Exception(f'Failed to create plugin ZIP file')
logging.writeToFile(f"Created plugin ZIP: {zip_path}")
# Copy ZIP to current directory (pluginInstaller expects it in cwd)
original_cwd = os.getcwd()
os.chdir(temp_dir)
try:
# Verify zip file exists in current directory
zip_file = plugin_name + '.zip'
if not os.path.exists(zip_file):
raise Exception(f'Zip file {zip_file} not found in temp directory')
logging.writeToFile(f"Installing plugin using pluginInstaller")
# Install using pluginInstaller (direct call, not via command line)
try:
pluginInstaller.installPlugin(plugin_name)
except Exception as install_error:
# Log the full error for debugging
error_msg = str(install_error)
logging.writeToFile(f"pluginInstaller.installPlugin raised exception: {error_msg}")
# Check if plugin directory exists despite the error
pluginInstalled = '/usr/local/CyberCP/' + plugin_name
if os.path.exists(pluginInstalled):
logging.writeToFile(f"Plugin directory exists despite error, continuing...")
else:
raise Exception(f'Plugin installation failed: {error_msg}')
# Wait a moment for file system to sync and service to restart
import time
time.sleep(3) # Increased wait time for file system sync
# Verify plugin was actually installed
pluginInstalled = '/usr/local/CyberCP/' + plugin_name
if not os.path.exists(pluginInstalled):
# Check if files were extracted to root instead
root_files = ['README.md', 'apps.py', 'meta.xml', 'urls.py', 'views.py']
found_root_files = [f for f in root_files if os.path.exists(os.path.join('/usr/local/CyberCP', f))]
if found_root_files:
raise Exception(f'Plugin installation failed: Files extracted to wrong location. Found {found_root_files} in /usr/local/CyberCP/ root instead of {pluginInstalled}/')
raise Exception(f'Plugin installation failed: {pluginInstalled} does not exist after installation')
logging.writeToFile(f"Plugin {plugin_name} installed successfully")
# Set plugin to enabled by default after installation
_set_plugin_state(plugin_name, True)
return JsonResponse({
'success': True,
'message': f'Plugin {plugin_name} installed successfully from store'
})
finally:
os.chdir(original_cwd)
finally:
# Cleanup
shutil.rmtree(temp_dir, ignore_errors=True)
except urllib.error.HTTPError as e:
error_msg = f'Failed to download plugin from GitHub: HTTP {e.code}'
if e.code == 404:
error_msg = f'Plugin {plugin_name} not found in GitHub repository'
logging.writeToFile(f"Error installing {plugin_name}: {error_msg}")
return JsonResponse({
'success': False,
'error': error_msg
}, status=500)
except Exception as e:
logging.writeToFile(f"Error installing plugin {plugin_name}: {str(e)}")
import traceback
error_details = traceback.format_exc()
logging.writeToFile(f"Traceback: {error_details}")
return JsonResponse({
'success': False,
'error': str(e)
}, status=500)
def plugin_help(request, plugin_name):
"""Plugin-specific help page - shows plugin information, version history, and help content"""
mailUtilities.checkHome()
# Paths for the plugin
plugin_path = '/usr/local/CyberCP/' + plugin_name
meta_xml_path = os.path.join(plugin_path, 'meta.xml')
# Check if plugin exists
if not os.path.exists(plugin_path) or not os.path.exists(meta_xml_path):
proc = httpProc(request, 'pluginHolder/plugin_not_found.html', {
'plugin_name': plugin_name
}, 'admin')
return proc.render()
# Parse meta.xml
try:
plugin_meta = ElementTree.parse(meta_xml_path)
root = plugin_meta.getroot()
# Extract plugin information
plugin_display_name = root.find('name').text if root.find('name') is not None else plugin_name
plugin_description = root.find('description').text if root.find('description') is not None else ''
plugin_version = root.find('version').text if root.find('version') is not None else 'Unknown'
plugin_author = root.find('author').text if root.find('author') is not None else 'Unknown'
plugin_type = root.find('type').text if root.find('type') is not None else 'Plugin'
# Check if plugin is installed
installed = os.path.exists(plugin_path)
except Exception as e:
logging.writeToFile(f"Error parsing meta.xml for {plugin_name}: {str(e)}")
proc = httpProc(request, 'pluginHolder/plugin_not_found.html', {
'plugin_name': plugin_name
}, 'admin')
return proc.render()
# Look for help content files (README.md, CHANGELOG.md, HELP.md, etc.)
help_content = ''
changelog_content = ''
# Check for README.md or HELP.md
help_files = ['HELP.md', 'README.md', 'docs/HELP.md', 'docs/README.md']
help_file_path = None
for help_file in help_files:
potential_path = os.path.join(plugin_path, help_file)
if os.path.exists(potential_path):
help_file_path = potential_path
break
if help_file_path:
try:
with open(help_file_path, 'r', encoding='utf-8') as f:
help_content = f.read()
except Exception as e:
logging.writeToFile(f"Error reading help file for {plugin_name}: {str(e)}")
help_content = ''
# Check for CHANGELOG.md
changelog_paths = ['CHANGELOG.md', 'changelog.md', 'CHANGELOG.txt', 'docs/CHANGELOG.md']
for changelog_file in changelog_paths:
potential_path = os.path.join(plugin_path, changelog_file)
if os.path.exists(potential_path):
try:
with open(potential_path, 'r', encoding='utf-8') as f:
changelog_content = f.read()
break
except Exception as e:
logging.writeToFile(f"Error reading changelog for {plugin_name}: {str(e)}")
# If no local changelog, try fetching from GitHub (non-blocking)
if not changelog_content:
try:
github_changelog_url = f'{GITHUB_RAW_BASE}/{plugin_name}/CHANGELOG.md'
try:
with urllib.request.urlopen(github_changelog_url, timeout=3) as response:
if response.getcode() == 200:
changelog_content = response.read().decode('utf-8')
logging.writeToFile(f"Fetched CHANGELOG.md from GitHub for {plugin_name}")
except (urllib.error.HTTPError, urllib.error.URLError, Exception):
# Silently fail - GitHub fetch is optional
pass
except Exception:
# Silently fail - GitHub fetch is optional
pass
# If no help content and no local README, try fetching README.md from GitHub
if not help_content:
try:
github_readme_url = f'{GITHUB_RAW_BASE}/{plugin_name}/README.md'
try:
with urllib.request.urlopen(github_readme_url, timeout=3) as response:
if response.getcode() == 200:
help_content = response.read().decode('utf-8')
logging.writeToFile(f"Fetched README.md from GitHub for {plugin_name}")
except (urllib.error.HTTPError, urllib.error.URLError, Exception):
# Silently fail - GitHub fetch is optional
pass
except Exception:
# Silently fail - GitHub fetch is optional
pass
# If no help content found, create default content from meta.xml
if not help_content:
help_content = f"""
<h2>Plugin Information</h2>
<p><strong>Name:</strong> {plugin_display_name}</p>
<p><strong>Type:</strong> {plugin_type}</p>
<p><strong>Version:</strong> {plugin_version}</p>
<p><strong>Author:</strong> {plugin_author}</p>
<h2>Description</h2>
<p>{plugin_description}</p>
<h2>Usage</h2>
<p>For detailed information about this plugin, please visit the GitHub repository or check the plugin's documentation.</p>
"""
else:
# Convert markdown to HTML (basic conversion)
import re
# Convert linked images first (badges): [![alt](img_url)](link_url)
help_content = re.sub(
r'\[!\[([^\]]*)\]\(([^\)]+)\)\]\(([^\)]+)\)',
r'<a href="\3" target="_blank" rel="noopener noreferrer"><img src="\2" alt="\1" style="display:inline-block;margin:0 4px;vertical-align:middle;"></a>',
help_content
)
# Convert regular images: ![alt](img_url)
help_content = re.sub(
r'!\[([^\]]*)\]\(([^\)]+)\)',
r'<img src="\2" alt="\1" style="display:inline-block;margin:4px 0;max-width:100%;">',
help_content
)
# Convert regular links: [text](url)
help_content = re.sub(
r'\[([^\]]+)\]\(([^\)]+)\)',
r'<a href="\2" target="_blank" rel="noopener noreferrer">\1</a>',
help_content
)
# Convert headings
help_content = re.sub(r'^### (.*?)$', r'<h3>\1</h3>', help_content, flags=re.MULTILINE)
help_content = re.sub(r'^## (.*?)$', r'<h2>\1</h2>', help_content, flags=re.MULTILINE)
help_content = re.sub(r'^# (.*?)$', r'<h1>\1</h1>', help_content, flags=re.MULTILINE)
# Convert formatting
help_content = re.sub(r'\*\*(.*?)\*\*', r'<strong>\1</strong>', help_content)
help_content = re.sub(r'\*(.*?)\*', r'<em>\1</em>', help_content)
help_content = re.sub(r'`([^`]+)`', r'<code>\1</code>', help_content)
# Convert lists
help_content = re.sub(r'^\- (.*?)$', r'<li>\1</li>', help_content, flags=re.MULTILINE)
help_content = re.sub(r'^(\d+)\. (.*?)$', r'<li>\2</li>', help_content, flags=re.MULTILINE)
# Wrap paragraphs (but preserve HTML tags and images)
lines = help_content.split('\n')
processed_lines = []
for line in lines:
line = line.strip()
if line and not line.startswith('<') and not line.startswith('http') and not '<img' in line and not '<a' in line:
processed_lines.append(f'<p>{line}</p>')
elif line:
processed_lines.append(line)
help_content = '\n'.join(processed_lines)
# Add changelog if available
if changelog_content:
# Convert changelog markdown to HTML
import re
changelog_html = changelog_content
changelog_html = re.sub(r'^## (.*?)$', r'<h3>\1</h3>', changelog_html, flags=re.MULTILINE)
changelog_html = re.sub(r'^### (.*?)$', r'<h4>\1</h4>', changelog_html, flags=re.MULTILINE)
changelog_html = re.sub(r'^\- (.*?)$', r'<li>\1</li>', changelog_html, flags=re.MULTILINE)
changelog_html = re.sub(r'\*\*(.*?)\*\*', r'<strong>\1</strong>', changelog_html)
# Wrap in pre for code-like formatting
changelog_html = f'<div class="changelog-content"><h2>Version History</h2><pre>{changelog_html}</pre></div>'
help_content += changelog_html
# Context for template
context = {
'plugin_name': plugin_display_name,
'plugin_name_dir': plugin_name,
'plugin_description': plugin_description,
'plugin_version': plugin_version,
'plugin_author': plugin_author,
'plugin_type': plugin_type,
'installed': installed,
'help_content': help_content,
}
proc = httpProc(request, 'pluginHolder/plugin_help.html', context, 'admin')
return proc.render()
@csrf_exempt
@require_http_methods(["GET"])
def check_plugin_subscription(request, plugin_name):
"""
API endpoint to check if user has Patreon subscription for a paid plugin
Args:
request: Django request object
plugin_name: Name of the plugin to check
Returns:
JsonResponse: {
'has_access': bool,
'is_paid': bool,
'message': str,
'patreon_url': str or None
}
"""
try:
# Check if user is authenticated
if not request.user or not request.user.is_authenticated:
return JsonResponse({
'success': False,
'has_access': False,
'is_paid': False,
'message': 'Please log in to check subscription status',
'patreon_url': None
}, status=401)
# Load plugin metadata
from .plugin_access import check_plugin_access, _load_plugin_meta
plugin_meta = _load_plugin_meta(plugin_name)
# Check access
access_result = check_plugin_access(request, plugin_name, plugin_meta)
return JsonResponse({
'success': True,
'has_access': access_result['has_access'],
'is_paid': access_result['is_paid'],
'message': access_result['message'],
'patreon_url': access_result.get('patreon_url')
})
except Exception as e:
logging.writeToFile(f"Error checking subscription for {plugin_name}: {str(e)}")
return JsonResponse({
'success': False,
'has_access': False,
'is_paid': False,
'message': f'Error checking subscription: {str(e)}',
'patreon_url': None
}, status=500)