# -*- coding: utf-8 -*- from django.shortcuts import render, redirect from django.http import JsonResponse from django.views.decorators.csrf import csrf_exempt from django.views.decorators.http import require_http_methods from plogical.mailUtilities import mailUtilities import os import shutil import subprocess import shlex import json from datetime import datetime, timedelta from xml.etree import ElementTree from plogical.httpProc import httpProc from plogical.plugin_acl import user_can_manage_plugins, deny_plugin_manage_json_response from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging import sys import urllib.request import urllib.error import time import inspect sys.path.append('/usr/local/CyberCP') from pluginInstaller.pluginInstaller import pluginInstaller from .patreon_verifier import PatreonVerifier # Plugin state file location PLUGIN_STATE_DIR = '/home/cyberpanel/plugin_states' # Plugin store cache configuration PLUGIN_STORE_CACHE_DIR = '/home/cyberpanel/plugin_store_cache' PLUGIN_STORE_CACHE_FILE = os.path.join(PLUGIN_STORE_CACHE_DIR, 'plugins_cache.json') PLUGIN_STORE_CACHE_DURATION = 3600 # Base cache duration: 1 hour (3600 seconds) PLUGIN_STORE_CACHE_RANDOM_OFFSET = 600 # Random offset: ±10 minutes (600 seconds) to prevent simultaneous requests GITHUB_REPO_API = 'https://api.github.com/repos/master3395/cyberpanel-plugins/contents' GITHUB_RAW_BASE = 'https://raw.githubusercontent.com/master3395/cyberpanel-plugins/main' GITHUB_COMMITS_API = 'https://api.github.com/repos/master3395/cyberpanel-plugins/commits' # Plugin backup configuration PLUGIN_BACKUP_DIR = '/home/cyberpanel/plugin_backups' # Plugin source paths (checked in order; first match wins for install) PLUGIN_SOURCE_PATHS = ['/home/cyberpanel/plugins', '/home/cyberpanel-plugins'] # Builtin/core plugins that are part of CyberPanel (not user-installable plugins) # These plugins show "Built-in" badge and only Settings button (no Deactivate/Uninstall) BUILTIN_PLUGINS = frozenset(['emailMarketing', 'emailPremium']) def _install_plugin_compat(plugin_name, zip_path_abs): """ Call pluginInstaller.installPlugin with zip_path when supported (newer CyberPanel). Older installs only accept installPlugin(pluginName) and expect pluginName.zip in CWD. """ zip_path_abs = os.path.abspath(zip_path_abs) work_dir = os.path.dirname(zip_path_abs) use_zip_kw = False try: sig = inspect.signature(pluginInstaller.installPlugin) use_zip_kw = 'zip_path' in sig.parameters except (TypeError, ValueError): use_zip_kw = False if use_zip_kw: pluginInstaller.installPlugin(plugin_name, zip_path=zip_path_abs) return prev_cwd = os.getcwd() try: os.chdir(work_dir) expected_zip = os.path.join(work_dir, plugin_name + '.zip') if zip_path_abs != expected_zip: shutil.copy2(zip_path_abs, expected_zip) pluginInstaller.installPlugin(plugin_name) finally: try: os.chdir(prev_cwd) except Exception: pass # Core CyberPanel app dirs under /usr/local/CyberCP that must not be counted as "installed plugins" # (matches pluginHolder.urls so Installed count = store/plugin dirs only, not core apps) RESERVED_PLUGIN_DIRS = frozenset([ 'api', 'backup', 'baseTemplate', 'cloudAPI', 'CLManager', 'containerization', 'CyberCP', 'databases', 'dns', 'dockerManager', 'emailMarketing', 'emailPremium', 'filemanager', 'firewall', 'ftp', 'highAvailability', 'IncBackups', 'loginSystem', 'mailServer', 'managePHP', 'manageSSL', 'manageServices', 'packages', 'pluginHolder', 'plogical', 'pluginInstaller', 'serverLogs', 'serverStatus', 's3Backups', 'tuning', 'userManagment', 'websiteFunctions', 'aiScanner', 'dns', 'help', 'installed', ]) def _find_plugin_prefix_in_archive(namelist, plugin_name): """ Find the path prefix for a plugin inside a GitHub archive (e.g. repo-main/pluginName/ or repo-main/Category/pluginName/). Returns (top_level, plugin_prefix) or (None, None) if not found. """ top_level = None for name in namelist: if '/' in name: top_level = name.split('/')[0] break if not top_level: return None, None plugin_name_lower = plugin_name.lower() # Check every path: find one that has a segment equal to plugin_name (e.g. .../pm2Manager/ or .../snappymailAdmin/) for name in namelist: if '/' not in name: continue parts = name.split('/') # parts[0] = top_level, then we need a segment that matches plugin_name for i in range(1, len(parts)): if parts[i].lower() == plugin_name_lower: # Plugin folder is at top_level/parts[1]/.../parts[i]/ prefix_parts = [top_level] + parts[1:i + 1] plugin_prefix = '/'.join(prefix_parts) + '/' return top_level, plugin_prefix return top_level, None def _get_plugin_source_path(plugin_name): """Return the full path to a plugin's source directory, or None if not found.""" for base in PLUGIN_SOURCE_PATHS: path = os.path.join(base, plugin_name) meta_path = os.path.join(path, 'meta.xml') if os.path.isdir(path) and os.path.exists(meta_path): return path return None def _ensure_plugin_meta_xml(plugin_name): """ If plugin is installed (directory exists) but meta.xml is missing, restore it from source or from GitHub so the grid and version checks work. """ installed_dir = os.path.join('/usr/local/CyberCP', plugin_name) installed_meta = os.path.join(installed_dir, 'meta.xml') if not os.path.isdir(installed_dir) or os.path.exists(installed_meta): return source_path = _get_plugin_source_path(plugin_name) if source_path: source_meta = os.path.join(source_path, 'meta.xml') if os.path.exists(source_meta): try: shutil.copy2(source_meta, installed_meta) logging.writeToFile(f"Restored meta.xml for {plugin_name} from source") except Exception as e: logging.writeToFile(f"Could not restore meta.xml for {plugin_name}: {e}") return try: _sync_meta_xml_from_github(plugin_name) except Exception: pass def _get_plugin_state_file(plugin_name): """Get the path to the plugin state file""" if not os.path.exists(PLUGIN_STATE_DIR): os.makedirs(PLUGIN_STATE_DIR, mode=0o755) return os.path.join(PLUGIN_STATE_DIR, plugin_name + '.state') def _is_plugin_enabled(plugin_name): """Check if a plugin is enabled""" state_file = _get_plugin_state_file(plugin_name) if os.path.exists(state_file): try: with open(state_file, 'r') as f: state = f.read().strip() return state == 'enabled' except: return True # Default to enabled if file read fails return True # Default to enabled if state file doesn't exist def _get_freshness_badge(modify_date): """ Return freshness badge (NEW/Stable/STALE) based on modify_date. modify_date format: 'YYYY-MM-DD HH:MM:SS' or 'N/A' - 0-90 days: NEW (yellow) - 90-365 days: Stable (green) - 730+ days: STALE (red) - 365-730 days: no badge """ if not modify_date or modify_date == 'N/A' or not isinstance(modify_date, str): return None try: dt = datetime.strptime(modify_date[:19], '%Y-%m-%d %H:%M:%S') days_ago = (datetime.now() - dt).days if days_ago <= 90: return {'badge': 'NEW', 'class': 'freshness-badge-new', 'title': 'This plugin was released/updated within the last 3 months'} elif days_ago <= 365: return {'badge': 'Stable', 'class': 'freshness-badge-stable', 'title': 'This plugin was updated within the last year'} elif days_ago < 730: return {'badge': 'Unstable', 'class': 'freshness-badge-unstable', 'title': 'This plugin has not been updated in over 1 year'} else: return {'badge': 'STALE', 'class': 'freshness-badge-stale', 'title': 'This plugin has not been updated in over 2 years'} except (ValueError, TypeError): pass return None def _set_plugin_state(plugin_name, enabled): """Set plugin enabled/disabled state""" state_file = _get_plugin_state_file(plugin_name) try: with open(state_file, 'w') as f: f.write('enabled' if enabled else 'disabled') os.chmod(state_file, 0o644) return True except Exception as e: logging.writeToFile(f"Error writing plugin state for {plugin_name}: {str(e)}") return False def help_page(request): """Display plugin development help page""" mailUtilities.checkHome() proc = httpProc(request, 'pluginHolder/help.html', {}, 'managePlugins') return proc.render() def installed(request): mailUtilities.checkHome() installedPath = '/usr/local/CyberCP' pluginList = [] errorPlugins = [] processed_plugins = set() # Track which plugins we've already processed # Repair pass: ensure every installed plugin dir has meta.xml (from source or GitHub) so counts and grid are correct if os.path.exists(installedPath): for plugin in os.listdir(installedPath): if plugin.startswith('.') or plugin in RESERVED_PLUGIN_DIRS: continue plugin_dir = os.path.join(installedPath, plugin) if os.path.isdir(plugin_dir): _ensure_plugin_meta_xml(plugin) # First, process plugins from source directories (multiple paths: /home/cyberpanel/plugins, /home/cyberpanel-plugins) # BUT: Skip plugins that are already installed - we'll process those from the installed location instead for pluginPath in PLUGIN_SOURCE_PATHS: if not os.path.exists(pluginPath): continue try: dirs_in_path = [p for p in os.listdir(pluginPath) if os.path.isdir(os.path.join(pluginPath, p))] logging.writeToFile(f"Plugin source path {pluginPath}: directories {sorted(dirs_in_path)}") except Exception as e: logging.writeToFile(f"Plugin source path {pluginPath}: listdir error {e}") for plugin in os.listdir(pluginPath): if plugin in processed_plugins: continue # Skip files (like .zip files) - only process directories pluginDir = os.path.join(pluginPath, plugin) if not os.path.isdir(pluginDir): continue # Use same "installed" criterion as install endpoint: plugin directory in /usr/local/CyberCP/ installed_dir = os.path.join(installedPath, plugin) completePath = os.path.join(installedPath, plugin, 'meta.xml') if os.path.exists(completePath): # Plugin is fully installed (dir + meta.xml), skip - second loop will add it continue data = {} # Try installed location first, then fallback to source location sourcePath = os.path.join(pluginDir, 'meta.xml') # Determine which meta.xml to use metaXmlPath = None if os.path.exists(completePath): metaXmlPath = completePath elif os.path.exists(sourcePath): # Plugin not installed but has source meta.xml - use it metaXmlPath = sourcePath # Add error handling to prevent 500 errors try: if metaXmlPath is None: # No meta.xml found in either location - skip (log for diagnostics) logging.writeToFile(f"Plugin {plugin}: skipped (no meta.xml in source or installed)") continue pluginMetaData = ElementTree.parse(metaXmlPath) root = pluginMetaData.getroot() # Validate required fields exist (handle both and formats) name_elem = root.find('name') type_elem = root.find('type') desc_elem = root.find('description') version_elem = root.find('version') # All fields required including type (category) - no default if name_elem is None or type_elem is None or desc_elem is None or version_elem is None: errorPlugins.append({'name': plugin, 'error': 'Missing required metadata fields (name, type/category, description, or version)'}) logging.writeToFile(f"Plugin {plugin}: Missing required metadata fields in meta.xml") continue # Check if text is None or empty (all required) type_text = type_elem.text.strip() if type_elem.text else '' if name_elem.text is None or desc_elem.text is None or version_elem.text is None or not type_text: errorPlugins.append({'name': plugin, 'error': 'Empty metadata fields (name, type/category, description, or version required)'}) logging.writeToFile(f"Plugin {plugin}: Empty metadata fields in meta.xml") continue # Valid categories only: Utility, Security, Backup, Performance (Plugin category removed) if type_text.lower() not in ('utility', 'security', 'backup', 'performance', 'monitoring', 'integration', 'email', 'development', 'analytics'): errorPlugins.append({'name': plugin, 'error': f'Invalid category "{type_text}". Use: Utility, Security, Backup, or Performance.'}) logging.writeToFile(f"Plugin {plugin}: Invalid category '{type_text}'") continue data['name'] = name_elem.text data['type'] = type_text data['desc'] = desc_elem.text data['version'] = version_elem.text data['plugin_dir'] = plugin # Plugin directory name # Set builtin flag (core CyberPanel plugins vs user-installable plugins) data['builtin'] = plugin in BUILTIN_PLUGINS # Installed = plugin directory exists (must match install endpoint which uses directory existence) # Fixes grid showing "Not Installed" when directory exists but meta.xml is missing data['installed'] = os.path.isdir(installed_dir) # Get plugin enabled state (only for installed plugins) if data['installed']: data['enabled'] = _is_plugin_enabled(plugin) else: data['enabled'] = False # Initialize is_paid to False by default (will be set later if paid) data['is_paid'] = False data['patreon_tier'] = None data['patreon_url'] = None # Get modify date from local file (fast, no API calls) # GitHub commit dates are fetched in the plugin store, not here to avoid timeouts modify_date = 'N/A' try: if os.path.exists(metaXmlPath): modify_time = os.path.getmtime(metaXmlPath) modify_date = datetime.fromtimestamp(modify_time).strftime('%Y-%m-%d %H:%M:%S') except Exception: modify_date = 'N/A' data['modify_date'] = modify_date data['freshness_badge'] = _get_freshness_badge(modify_date) # Extract settings URL or main URL for "Manage" button settings_url_elem = root.find('settings_url') url_elem = root.find('url') # Priority: settings_url > url > default pattern # Special handling for core plugins that don't use /plugins/ prefix if plugin == 'emailMarketing': # emailMarketing is a core CyberPanel plugin, uses /emailMarketing/ not /plugins/emailMarketing/ data['manage_url'] = '/emailMarketing/' elif settings_url_elem is not None and settings_url_elem.text: data['manage_url'] = settings_url_elem.text elif url_elem is not None and url_elem.text: data['manage_url'] = url_elem.text else: # Default: try /plugins/{plugin_dir}/settings/ or /plugins/{plugin_dir}/ # Only set if plugin is installed (we can't know if the URL exists otherwise) # Special handling for emailMarketing if plugin == 'emailMarketing': data['manage_url'] = '/emailMarketing/' elif data['installed']: # Plugin directory exists; use main plugin URL main_route = f'/plugins/{plugin}/' data['manage_url'] = main_route else: data['manage_url'] = None # Extract author information author_elem = root.find('author') if author_elem is not None and author_elem.text: data['author'] = author_elem.text else: data['author'] = 'Unknown' # Extract paid plugin information paid_elem = root.find('paid') patreon_tier_elem = root.find('patreon_tier') if paid_elem is not None and paid_elem.text and paid_elem.text.lower() == 'true': data['is_paid'] = True data['patreon_tier'] = patreon_tier_elem.text if patreon_tier_elem is not None and patreon_tier_elem.text else 'CyberPanel Paid Plugin' data['patreon_url'] = root.find('patreon_url').text if root.find('patreon_url') is not None else 'https://www.patreon.com/c/newstargeted/membership' else: data['is_paid'] = False data['patreon_tier'] = None data['patreon_url'] = None pluginList.append(data) processed_plugins.add(plugin) # Mark as processed except ElementTree.ParseError as e: errorPlugins.append({'name': plugin, 'error': f'XML parse error: {str(e)}'}) logging.writeToFile(f"Plugin {plugin}: XML parse error - {str(e)}") # Don't mark as processed if it failed - let installed check handle it if not os.path.isdir(installed_dir): continue continue except Exception as e: errorPlugins.append({'name': plugin, 'error': f'Error loading plugin: {str(e)}'}) logging.writeToFile(f"Plugin {plugin}: Error loading - {str(e)}") if not os.path.isdir(installed_dir): continue continue # Also check for installed plugins that don't have source directories # This handles plugins installed from the store that may not be in /home/cyberpanel/plugins/ if os.path.exists(installedPath): for plugin in os.listdir(installedPath): # Skip if already processed if plugin in processed_plugins: continue # Only check directories that look like plugins (have meta.xml) pluginInstalledDir = os.path.join(installedPath, plugin) if not os.path.isdir(pluginInstalledDir): continue _ensure_plugin_meta_xml(plugin) metaXmlPath = os.path.join(pluginInstalledDir, 'meta.xml') if not os.path.exists(metaXmlPath): continue # This is an installed plugin without a source directory - process it try: data = {} pluginMetaData = ElementTree.parse(metaXmlPath) root = pluginMetaData.getroot() # Validate required fields (including type/category - no default) name_elem = root.find('name') type_elem = root.find('type') desc_elem = root.find('description') version_elem = root.find('version') if name_elem is None or type_elem is None or desc_elem is None or version_elem is None: errorPlugins.append({'name': plugin, 'error': 'Missing required metadata (name, type/category, description, or version)'}) continue type_text = type_elem.text.strip() if type_elem.text else '' if name_elem.text is None or desc_elem.text is None or version_elem.text is None or not type_text: errorPlugins.append({'name': plugin, 'error': 'Empty metadata (type/category required)'}) continue # Valid categories only: Utility, Security, Backup, Performance (Plugin category removed) if type_text.lower() not in ('utility', 'security', 'backup', 'performance', 'monitoring', 'integration', 'email', 'development', 'analytics'): errorPlugins.append({'name': plugin, 'error': f'Invalid category "{type_text}". Use: Utility, Security, Backup, or Performance.'}) continue data['name'] = name_elem.text data['type'] = type_text data['desc'] = desc_elem.text data['version'] = version_elem.text data['plugin_dir'] = plugin # Set builtin flag (core CyberPanel plugins vs user-installable plugins) data['builtin'] = plugin in BUILTIN_PLUGINS data['installed'] = True # This is an installed plugin data['enabled'] = _is_plugin_enabled(plugin) # Initialize is_paid to False by default (will be set later if paid) data['is_paid'] = False data['patreon_tier'] = None data['patreon_url'] = None # Get modify date from installed location modify_date = 'N/A' try: if os.path.exists(metaXmlPath): modify_time = os.path.getmtime(metaXmlPath) modify_date = datetime.fromtimestamp(modify_time).strftime('%Y-%m-%d %H:%M:%S') except Exception: modify_date = 'N/A' data['modify_date'] = modify_date data['freshness_badge'] = _get_freshness_badge(modify_date) # Extract settings URL or main URL settings_url_elem = root.find('settings_url') url_elem = root.find('url') # Priority: settings_url > url > default pattern # Special handling for core plugins that don't use /plugins/ prefix if plugin == 'emailMarketing': # emailMarketing is a core CyberPanel plugin, uses /emailMarketing/ not /plugins/emailMarketing/ data['manage_url'] = '/emailMarketing/' elif settings_url_elem is not None and settings_url_elem.text: data['manage_url'] = settings_url_elem.text elif url_elem is not None and url_elem.text: data['manage_url'] = url_elem.text else: # Default to /plugins/{plugin}/ for regular plugins # Special handling for emailMarketing if plugin == 'emailMarketing': data['manage_url'] = '/emailMarketing/' else: # Default to main plugin route (most plugins work from main route) data['manage_url'] = f'/plugins/{plugin}/' # Extract author information author_elem = root.find('author') if author_elem is not None and author_elem.text: data['author'] = author_elem.text else: data['author'] = 'Unknown' # Extract paid plugin information (is_paid already initialized to False above) paid_elem = root.find('paid') patreon_tier_elem = root.find('patreon_tier') if paid_elem is not None and paid_elem.text and paid_elem.text.lower() == 'true': data['is_paid'] = True data['patreon_tier'] = patreon_tier_elem.text if patreon_tier_elem is not None and patreon_tier_elem.text else 'CyberPanel Paid Plugin' patreon_url_elem = root.find('patreon_url') data['patreon_url'] = patreon_url_elem.text if patreon_url_elem is not None and patreon_url_elem.text else 'https://www.patreon.com/membership/27789984' # else: is_paid already False from initialization above pluginList.append(data) processed_plugins.add(plugin) # Mark as processed to prevent duplicates except ElementTree.ParseError as e: errorPlugins.append({'name': plugin, 'error': f'XML parse error: {str(e)}'}) logging.writeToFile(f"Installed plugin {plugin}: XML parse error - {str(e)}") continue except Exception as e: errorPlugins.append({'name': plugin, 'error': f'Error loading installed plugin: {str(e)}'}) logging.writeToFile(f"Installed plugin {plugin}: Error loading - {str(e)}") continue # Ensure redisManager and memcacheManager load when present (fallback if missed by listdir) for plugin_name in ('redisManager', 'memcacheManager'): if plugin_name in processed_plugins: continue source_path = _get_plugin_source_path(plugin_name) installed_meta = os.path.join(installedPath, plugin_name, 'meta.xml') meta_xml_path = installed_meta if os.path.exists(installed_meta) else (os.path.join(source_path, 'meta.xml') if source_path else None) if not meta_xml_path or not os.path.exists(meta_xml_path): continue try: root = ElementTree.parse(meta_xml_path).getroot() name_elem = root.find('name') type_elem = root.find('type') desc_elem = root.find('description') version_elem = root.find('version') if name_elem is None or type_elem is None or desc_elem is None or version_elem is None: continue type_text = (type_elem.text or '').strip() if not type_text or name_elem.text is None or desc_elem.text is None or version_elem.text is None: continue if type_text.lower() not in ('utility', 'security', 'backup', 'performance', 'monitoring', 'integration', 'email', 'development', 'analytics'): continue complete_path = os.path.join(installedPath, plugin_name, 'meta.xml') data = { 'name': name_elem.text, 'type': type_text, 'desc': desc_elem.text, 'version': version_elem.text, 'plugin_dir': plugin_name, 'builtin': plugin_name in BUILTIN_PLUGINS, # Set builtin flag 'installed': os.path.exists(complete_path), 'enabled': _is_plugin_enabled(plugin_name) if os.path.exists(complete_path) else False, 'is_paid': False, 'patreon_tier': None, 'patreon_url': None, 'manage_url': f'/plugins/{plugin_name}/', 'author': root.find('author').text if root.find('author') is not None and root.find('author').text else 'Unknown', } try: modify_time = os.path.getmtime(meta_xml_path) data['modify_date'] = datetime.fromtimestamp(modify_time).strftime('%Y-%m-%d %H:%M:%S') except Exception: data['modify_date'] = 'N/A' data['freshness_badge'] = _get_freshness_badge(data['modify_date']) paid_elem = root.find('paid') if paid_elem is not None and paid_elem.text and paid_elem.text.lower() == 'true': data['is_paid'] = True data['patreon_tier'] = 'CyberPanel Paid Plugin' data['patreon_url'] = root.find('patreon_url').text if root.find('patreon_url') is not None else 'https://www.patreon.com/membership/27789984' pluginList.append(data) processed_plugins.add(plugin_name) logging.writeToFile(f"Plugin {plugin_name}: added via fallback (source or installed)") except Exception as e: logging.writeToFile(f"Plugin {plugin_name} fallback load error: {str(e)}") # Calculate installed and active counts: only count real plugins (have meta.xml, not core apps) installed_plugins_in_filesystem = set() if os.path.exists(installedPath): for plugin in os.listdir(installedPath): if plugin.startswith('.') or plugin in RESERVED_PLUGIN_DIRS: continue pluginInstalledDir = os.path.join(installedPath, plugin) if not os.path.isdir(pluginInstalledDir): continue if not os.path.exists(os.path.join(pluginInstalledDir, 'meta.xml')): continue installed_plugins_in_filesystem.add(plugin) installed_count = len([p for p in pluginList if p.get('installed', False)]) active_count = len([p for p in pluginList if p.get('installed', False) and p.get('enabled', False)]) # Use the larger of list count and filesystem count so header never shows less than grid filesystem_installed_count = len(installed_plugins_in_filesystem) list_installed_count = len([p for p in pluginList if p.get('installed', False)]) if filesystem_installed_count != list_installed_count: logging.writeToFile(f"Plugin count: list installed={list_installed_count}, filesystem with meta.xml={filesystem_installed_count}") installed_count = max(list_installed_count, filesystem_installed_count) if active_count > installed_count: active_count = installed_count # Debug logging to help identify discrepancies logging.writeToFile(f"Plugin count: Total={len(pluginList)}, Installed={installed_count}, Active={active_count}") for p in pluginList: logging.writeToFile(f" - {p.get('plugin_dir')}: installed={p.get('installed')}, enabled={p.get('enabled')}") # Get cache expiry timestamp for display (will be converted to local time in browser) cache_expiry_timestamp, _ = _get_cache_expiry_time() # Sort plugins A-Å by name (case-insensitive) for Grid and Table view pluginList.sort(key=lambda p: (p.get('name') or '').lower()) proc = httpProc(request, 'pluginHolder/plugins.html', {'plugins': pluginList, 'error_plugins': errorPlugins, 'installed_count': installed_count, 'active_count': active_count, 'cache_expiry_timestamp': cache_expiry_timestamp}, 'managePlugins') return proc.render() @csrf_exempt @require_http_methods(["POST"]) def install_plugin(request, plugin_name): """Install a plugin""" try: if not user_can_manage_plugins(request): return deny_plugin_manage_json_response(request) # Check if plugin source exists (in any configured source path) pluginSource = _get_plugin_source_path(plugin_name) if not pluginSource: return JsonResponse({ 'success': False, 'error': f'Plugin source not found: {plugin_name} (checked: {", ".join(PLUGIN_SOURCE_PATHS)})' }, status=404) # Check if already installed pluginInstalled = '/usr/local/CyberCP/' + plugin_name if os.path.exists(pluginInstalled): return JsonResponse({ 'success': False, 'error': f'Plugin already installed: {plugin_name}' }, status=400) # Create zip file for installation (pluginInstaller expects a zip) import tempfile import shutil import zipfile temp_dir = tempfile.mkdtemp() zip_path = os.path.join(temp_dir, plugin_name + '.zip') # Create zip from source directory with correct structure # The ZIP must contain plugin_name/ directory structure for proper extraction plugin_zip = zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) # Walk through source directory and add files with plugin_name prefix for root, dirs, files in os.walk(pluginSource): for file in files: file_path = os.path.join(root, file) # Calculate relative path from plugin source arcname = os.path.relpath(file_path, pluginSource) # Add plugin_name prefix to maintain directory structure arcname = os.path.join(plugin_name, arcname) plugin_zip.write(file_path, arcname) plugin_zip.close() # Verify zip file was created if not os.path.exists(zip_path): shutil.rmtree(temp_dir, ignore_errors=True) return JsonResponse({ 'success': False, 'error': f'Failed to create zip file for {plugin_name}' }, status=500) zip_path_abs = os.path.abspath(zip_path) if not os.path.exists(zip_path_abs): raise Exception(f'Zip file not found: {zip_path_abs}') try: # Install using pluginInstaller (zip_path kw when supported; else legacy CWD + pluginName.zip) try: _install_plugin_compat(plugin_name, zip_path_abs) except Exception as install_error: # Log the full error for debugging error_msg = str(install_error) logging.writeToFile(f"pluginInstaller.installPlugin raised exception: {error_msg}") # Check if plugin directory exists despite the error pluginInstalled = '/usr/local/CyberCP/' + plugin_name if os.path.exists(pluginInstalled): logging.writeToFile(f"Plugin directory exists despite error, continuing...") else: raise Exception(f'Plugin installation failed: {error_msg}') # Wait a moment for file system to sync import time time.sleep(2) # Verify plugin was actually installed pluginInstalled = '/usr/local/CyberCP/' + plugin_name if not os.path.exists(pluginInstalled): # Check if plugin files were extracted to root (exclude README.md - main repo has it at root) root_files = ['apps.py', 'meta.xml', 'urls.py', 'views.py'] found_root_files = [f for f in root_files if os.path.exists(os.path.join('/usr/local/CyberCP', f))] if found_root_files: raise Exception(f'Plugin installation failed: Files extracted to wrong location. Found {found_root_files} in /usr/local/CyberCP/ root instead of {pluginInstalled}/') raise Exception(f'Plugin installation failed: {pluginInstalled} does not exist after installation') # Set plugin to enabled by default after installation _set_plugin_state(plugin_name, True) _ensure_plugin_meta_xml(plugin_name) logging.writeToFile(f"Plugin {plugin_name} installed successfully (upload)") return JsonResponse({ 'success': True, 'message': f'Plugin {plugin_name} installed successfully' }) finally: # Cleanup shutil.rmtree(temp_dir, ignore_errors=True) except Exception as e: logging.writeToFile(f"Error installing plugin {plugin_name}: {str(e)}") return JsonResponse({ 'success': False, 'error': str(e) }, status=500) @csrf_exempt @require_http_methods(["POST"]) def uninstall_plugin(request, plugin_name): """Uninstall a plugin - but keep source files and settings""" try: if not user_can_manage_plugins(request): return deny_plugin_manage_json_response(request) # Check if plugin is installed pluginInstalled = '/usr/local/CyberCP/' + plugin_name if not os.path.exists(pluginInstalled): return JsonResponse({ 'success': False, 'error': f'Plugin not installed: {plugin_name}' }, status=404) # Custom uninstall that keeps source files # We need to remove from settings.py, urls.py, and remove installed directory # but NOT remove from /home/cyberpanel/plugins/ # Remove from settings.py pluginInstaller.removeFromSettings(plugin_name) # Remove from URLs pluginInstaller.removeFromURLs(plugin_name) # Remove interface link pluginInstaller.removeInterfaceLink(plugin_name) # Remove migrations if enabled if pluginInstaller.migrationsEnabled(plugin_name): pluginInstaller.removeMigrations(plugin_name) # Remove installed directory (but keep source in /home/cyberpanel/plugins/) pluginInstaller.removeFiles(plugin_name) # DON'T call informCyberPanelRemoval - we want to keep the source directory # so users can reinstall the plugin later # Restart service pluginInstaller.restartGunicorn() # Keep state file - we want to remember if it was enabled/disabled # So user can reinstall and have same state return JsonResponse({ 'success': True, 'message': f'Plugin {plugin_name} uninstalled successfully (source files and settings preserved)' }) except Exception as e: logging.writeToFile(f"Error uninstalling plugin {plugin_name}: {str(e)}") return JsonResponse({ 'success': False, 'error': str(e) }, status=500) @csrf_exempt @require_http_methods(["POST"]) def enable_plugin(request, plugin_name): """Enable a plugin""" try: if not user_can_manage_plugins(request): return deny_plugin_manage_json_response(request) # Check if plugin is installed pluginInstalled = '/usr/local/CyberCP/' + plugin_name if not os.path.exists(pluginInstalled): return JsonResponse({ 'success': False, 'error': f'Plugin not installed: {plugin_name}' }, status=404) # Set plugin state to enabled if _set_plugin_state(plugin_name, True): return JsonResponse({ 'success': True, 'message': f'Plugin {plugin_name} enabled successfully' }) else: return JsonResponse({ 'success': False, 'error': 'Failed to update plugin state' }, status=500) except Exception as e: logging.writeToFile(f"Error enabling plugin {plugin_name}: {str(e)}") return JsonResponse({ 'success': False, 'error': str(e) }, status=500) @csrf_exempt @require_http_methods(["POST"]) def disable_plugin(request, plugin_name): """Disable a plugin""" try: if not user_can_manage_plugins(request): return deny_plugin_manage_json_response(request) # Check if plugin is installed pluginInstalled = '/usr/local/CyberCP/' + plugin_name if not os.path.exists(pluginInstalled): return JsonResponse({ 'success': False, 'error': f'Plugin not installed: {plugin_name}' }, status=404) # Set plugin state to disabled if _set_plugin_state(plugin_name, False): return JsonResponse({ 'success': True, 'message': f'Plugin {plugin_name} disabled successfully' }) else: return JsonResponse({ 'success': False, 'error': 'Failed to update plugin state' }, status=500) except Exception as e: logging.writeToFile(f"Error disabling plugin {plugin_name}: {str(e)}") return JsonResponse({ 'success': False, 'error': str(e) }, status=500) def _ensure_cache_dir(): """Ensure cache directory exists""" try: if not os.path.exists(PLUGIN_STORE_CACHE_DIR): os.makedirs(PLUGIN_STORE_CACHE_DIR, mode=0o755) except Exception as e: logging.writeToFile(f"Error creating cache directory: {str(e)}") def _get_cache_expiry_time(): """Get the cache expiry time (when cache will be updated next) Returns: tuple: (expiry_timestamp, expiry_datetime_string) or (None, None) if no cache expiry_timestamp is Unix timestamp for JavaScript conversion to local time """ try: if not os.path.exists(PLUGIN_STORE_CACHE_FILE): return None, None # Try to read stored expiry time from cache metadata try: with open(PLUGIN_STORE_CACHE_FILE, 'r', encoding='utf-8') as f: cache_data = json.load(f) stored_expiry = cache_data.get('expiry_timestamp') if stored_expiry: # Return timestamp for JavaScript to convert to local time return stored_expiry, None except: pass # Fall back to calculation if metadata not found # Fallback: calculate from file modification time (for old cache files) cache_mtime = os.path.getmtime(PLUGIN_STORE_CACHE_FILE) expiry_timestamp = cache_mtime + PLUGIN_STORE_CACHE_DURATION return expiry_timestamp, None except Exception as e: logging.writeToFile(f"Error getting cache expiry time: {str(e)}") return None, None def _get_cached_plugins(allow_expired=False): """Get plugins from cache if available and not expired Args: allow_expired: If True, return cache even if expired (for fallback) """ try: if not os.path.exists(PLUGIN_STORE_CACHE_FILE): return None # Read cache file to get stored expiry time with open(PLUGIN_STORE_CACHE_FILE, 'r', encoding='utf-8') as f: cache_data = json.load(f) # Check expiry using stored timestamp if available, otherwise fall back to file mtime current_time = time.time() stored_expiry = cache_data.get('expiry_timestamp') if stored_expiry: # Use stored expiry time (with randomization) cache_age = current_time - (stored_expiry - cache_data.get('cache_duration', PLUGIN_STORE_CACHE_DURATION)) is_expired = current_time >= stored_expiry else: # Fallback for old cache files without expiry metadata cache_mtime = os.path.getmtime(PLUGIN_STORE_CACHE_FILE) cache_age = current_time - cache_mtime is_expired = cache_age > PLUGIN_STORE_CACHE_DURATION if is_expired: if not allow_expired: logging.writeToFile(f"Plugin store cache expired (age: {cache_age:.0f}s)") return None else: logging.writeToFile(f"Using expired cache as fallback (age: {cache_age:.0f}s)") if not allow_expired or not is_expired: logging.writeToFile(f"Using cached plugin store data (age: {cache_age:.0f}s)") return cache_data.get('plugins', []) except Exception as e: logging.writeToFile(f"Error reading plugin store cache: {str(e)}") return None def _save_plugins_cache(plugins): """Save plugins to cache with randomized expiry time""" try: _ensure_cache_dir() # Generate random cache duration to prevent simultaneous requests from all CyberPanel instances # Base duration ± random offset (e.g., 1 hour ± 10 minutes) import random random_offset = random.randint(-PLUGIN_STORE_CACHE_RANDOM_OFFSET, PLUGIN_STORE_CACHE_RANDOM_OFFSET) actual_cache_duration = PLUGIN_STORE_CACHE_DURATION + random_offset # Calculate expiry timestamp current_time = time.time() expiry_timestamp = current_time + actual_cache_duration cache_data = { 'plugins': plugins, 'cached_at': datetime.now().isoformat(), 'expiry_timestamp': expiry_timestamp, 'cache_duration': actual_cache_duration, 'base_duration': PLUGIN_STORE_CACHE_DURATION, 'random_offset': random_offset } with open(PLUGIN_STORE_CACHE_FILE, 'w', encoding='utf-8') as f: json.dump(cache_data, f, indent=2, ensure_ascii=False) expiry_datetime = datetime.fromtimestamp(expiry_timestamp) logging.writeToFile(f"Plugin store cache saved successfully. Expires at: {expiry_datetime.strftime('%Y-%m-%d %H:%M:%S')} (duration: {actual_cache_duration}s, offset: {random_offset:+d}s)") except Exception as e: logging.writeToFile(f"Error saving plugin store cache: {str(e)}") def _compare_versions(version1, version2): """ Compare two version strings (semantic versioning) Returns: 1 if version1 > version2, -1 if version1 < version2, 0 if equal """ try: # Split versions into parts v1_parts = [int(x) for x in version1.split('.')] v2_parts = [int(x) for x in version2.split('.')] # Pad shorter version with zeros max_len = max(len(v1_parts), len(v2_parts)) v1_parts.extend([0] * (max_len - len(v1_parts))) v2_parts.extend([0] * (max_len - len(v2_parts))) # Compare each part for v1, v2 in zip(v1_parts, v2_parts): if v1 > v2: return 1 elif v1 < v2: return -1 return 0 except: # Fallback to string comparison if parsing fails if version1 > version2: return 1 elif version1 < version2: return -1 return 0 def _get_installed_version(plugin_dir, plugin_install_dir): """Get installed version of a plugin from meta.xml. Supports both and root elements.""" installed_path = os.path.join(plugin_install_dir, plugin_dir) meta_path = os.path.join(installed_path, 'meta.xml') if os.path.exists(meta_path): try: pluginMetaData = ElementTree.parse(meta_path) root = pluginMetaData.getroot() version_elem = root.find('version') if version_elem is not None and version_elem.text: return version_elem.text.strip() except Exception as e: logging.writeToFile(f"Error reading version from {meta_path}: {str(e)}") return None def _sync_meta_xml_from_github(plugin_name, plugin_install_dir='/usr/local/CyberCP'): """ Fetch meta.xml from GitHub raw (main) and overwrite installed meta.xml. Ensures installed version matches store even when archive ZIP is cached/stale. Verifies write by re-reading version. Returns True if synced and version readable, False otherwise. """ meta_url = f'{GITHUB_RAW_BASE}/{plugin_name}/meta.xml' meta_path = os.path.join(plugin_install_dir, plugin_name, 'meta.xml') for attempt in (1, 2): try: req = urllib.request.Request(meta_url, headers={'User-Agent': 'CyberPanel-Plugin-Store/1.0'}) with urllib.request.urlopen(req, timeout=15) as resp: content = resp.read() if not content: if attempt == 2: logging.writeToFile(f"Sync meta.xml for {plugin_name}: empty response from GitHub") continue with open(meta_path, 'wb') as f: f.write(content) f.flush() if hasattr(os, 'fsync'): try: f.fsync() except Exception: pass # Verify we can read version back (ensures file is valid and readable) ver = _get_installed_version(plugin_name, plugin_install_dir) if ver: logging.writeToFile(f"Synced meta.xml for {plugin_name} from GitHub raw (version {ver})") return True if attempt == 2: logging.writeToFile(f"Sync meta.xml for {plugin_name}: wrote file but could not parse version") except Exception as e: logging.writeToFile(f"Could not sync meta.xml for {plugin_name} from GitHub (attempt {attempt}): {str(e)}") return False def _create_plugin_backup(plugin_name, plugin_install_dir='/usr/local/CyberCP'): """ Create a backup of a plugin before upgrade Returns: (backup_path, backup_info) or (None, None) on failure """ try: # Ensure backup directory exists if not os.path.exists(PLUGIN_BACKUP_DIR): os.makedirs(PLUGIN_BACKUP_DIR, mode=0o755) plugin_path = os.path.join(plugin_install_dir, plugin_name) if not os.path.exists(plugin_path): return None, None # Get current version installed_version = _get_installed_version(plugin_name, plugin_install_dir) or 'unknown' # Create backup directory with timestamp timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') backup_name = f"{plugin_name}_v{installed_version}_{timestamp}" backup_path = os.path.join(PLUGIN_BACKUP_DIR, backup_name) # Copy plugin directory import shutil shutil.copytree(plugin_path, backup_path) # Create backup metadata backup_info = { 'plugin_name': plugin_name, 'version': installed_version, 'timestamp': timestamp, 'backup_path': backup_path, 'created_at': datetime.now().isoformat() } # Save metadata as JSON metadata_file = os.path.join(backup_path, '.backup_metadata.json') with open(metadata_file, 'w') as f: json.dump(backup_info, f, indent=2) logging.writeToFile(f"Created backup for {plugin_name} version {installed_version} at {backup_path}") return backup_path, backup_info except Exception as e: logging.writeToFile(f"Error creating backup for {plugin_name}: {str(e)}") return None, None def _get_plugin_backups(plugin_name): """Get list of available backups for a plugin""" backups = [] if not os.path.exists(PLUGIN_BACKUP_DIR): return backups try: for item in os.listdir(PLUGIN_BACKUP_DIR): if item.startswith(plugin_name + '_'): backup_path = os.path.join(PLUGIN_BACKUP_DIR, item) if os.path.isdir(backup_path): metadata_file = os.path.join(backup_path, '.backup_metadata.json') if os.path.exists(metadata_file): try: with open(metadata_file, 'r') as f: backup_info = json.load(f) backups.append(backup_info) except: # Fallback: parse from directory name parts = item.split('_') if len(parts) >= 3: version = parts[1].replace('v', '') timestamp = '_'.join(parts[2:]) backups.append({ 'plugin_name': plugin_name, 'version': version, 'timestamp': timestamp, 'backup_path': backup_path, 'created_at': timestamp }) else: # No metadata, try to parse from directory name parts = item.split('_') if len(parts) >= 3: version = parts[1].replace('v', '') timestamp = '_'.join(parts[2:]) backups.append({ 'plugin_name': plugin_name, 'version': version, 'timestamp': timestamp, 'backup_path': backup_path, 'created_at': timestamp }) # Sort by timestamp (newest first) backups.sort(key=lambda x: x.get('timestamp', ''), reverse=True) except Exception as e: logging.writeToFile(f"Error listing backups for {plugin_name}: {str(e)}") return backups def _restore_plugin_from_backup(plugin_name, backup_path): """Restore a plugin from a backup""" try: plugin_install_dir = '/usr/local/CyberCP' plugin_path = os.path.join(plugin_install_dir, plugin_name) # Remove current plugin installation if os.path.exists(plugin_path): import shutil shutil.rmtree(plugin_path) # Restore from backup import shutil shutil.copytree(backup_path, plugin_path) # Remove backup metadata file from restored plugin metadata_file = os.path.join(plugin_path, '.backup_metadata.json') if os.path.exists(metadata_file): os.remove(metadata_file) logging.writeToFile(f"Restored {plugin_name} from backup {backup_path}") return True except Exception as e: logging.writeToFile(f"Error restoring {plugin_name} from backup: {str(e)}") return False def _enrich_store_plugins(plugins): """Enrich store plugins with installed/enabled status from local system""" enriched = [] plugin_source_dir = '/home/cyberpanel/plugins' plugin_install_dir = '/usr/local/CyberCP' for plugin in plugins: plugin_dir = plugin.get('plugin_dir', '') if not plugin_dir: continue # Check if plugin is installed locally # Plugin is only considered "installed" if it exists in /usr/local/CyberCP/ # Source directory presence doesn't mean installed - it just means the source files are available installed_path = os.path.join(plugin_install_dir, plugin_dir) plugin['installed'] = os.path.exists(installed_path) # Check if plugin is enabled (only if installed) if plugin['installed']: plugin['enabled'] = _is_plugin_enabled(plugin_dir) # Check for updates by comparing versions installed_version = _get_installed_version(plugin_dir, plugin_install_dir) store_version = plugin.get('version', '0.0.0') if installed_version and store_version: # Update available if store version is newer plugin['update_available'] = _compare_versions(store_version, installed_version) > 0 plugin['installed_version'] = installed_version else: plugin['update_available'] = False plugin['installed_version'] = installed_version or 'Unknown' else: plugin['enabled'] = False plugin['update_available'] = False plugin['installed_version'] = None # Ensure is_paid field exists and is properly set (default to False if not set or invalid) # Handle all possible cases: missing, None, empty string, string values, boolean is_paid_value = plugin.get('is_paid', False) # Normalize is_paid to boolean if is_paid_value is None or is_paid_value == '' or is_paid_value == 'false' or is_paid_value == 'False' or is_paid_value == '0': plugin['is_paid'] = False elif is_paid_value is True or is_paid_value == 'true' or is_paid_value == 'True' or is_paid_value == '1' or str(is_paid_value).lower() == 'true': plugin['is_paid'] = True elif 'is_paid' not in plugin or plugin.get('is_paid') is None: # Try to check from local meta.xml if available meta_path = None source_path = os.path.join(plugin_source_dir, plugin_dir) if os.path.exists(installed_path): meta_path = os.path.join(installed_path, 'meta.xml') elif os.path.exists(source_path): meta_path = os.path.join(source_path, 'meta.xml') if meta_path and os.path.exists(meta_path): try: pluginMetaData = ElementTree.parse(meta_path) root = pluginMetaData.getroot() paid_elem = root.find('paid') if paid_elem is not None and paid_elem.text and paid_elem.text.lower() == 'true': plugin['is_paid'] = True else: plugin['is_paid'] = False except: plugin['is_paid'] = False else: plugin['is_paid'] = False # Default to free if we can't determine else: # Already set, but ensure it's boolean plugin['is_paid'] = bool(plugin['is_paid']) if plugin['is_paid'] not in [True, False] else plugin['is_paid'] enriched.append(plugin) return enriched def _fetch_plugins_from_github(): """Fetch plugins from GitHub repository""" plugins = [] try: # Fetch repository contents req = urllib.request.Request( GITHUB_REPO_API, headers={ 'User-Agent': 'CyberPanel-Plugin-Store/1.0', 'Accept': 'application/vnd.github.v3+json' } ) with urllib.request.urlopen(req, timeout=10) as response: contents = json.loads(response.read().decode('utf-8')) # Filter for directories (plugins) plugin_dirs = [item for item in contents if item.get('type') == 'dir' and not item.get('name', '').startswith('.')] for plugin_dir in plugin_dirs: plugin_name = plugin_dir.get('name', '') if not plugin_name: continue try: # Fetch meta.xml from raw GitHub meta_xml_url = f"{GITHUB_RAW_BASE}/{plugin_name}/meta.xml" meta_req = urllib.request.Request( meta_xml_url, headers={'User-Agent': 'CyberPanel-Plugin-Store/1.0'} ) with urllib.request.urlopen(meta_req, timeout=10) as meta_response: meta_xml_content = meta_response.read().decode('utf-8') # Parse meta.xml root = ElementTree.fromstring(meta_xml_content) # Fetch last commit date for this plugin from GitHub modify_date = 'N/A' try: commits_url = f"{GITHUB_COMMITS_API}?path={plugin_name}&per_page=1" commits_req = urllib.request.Request( commits_url, headers={ 'User-Agent': 'CyberPanel-Plugin-Store/1.0', 'Accept': 'application/vnd.github.v3+json' } ) with urllib.request.urlopen(commits_req, timeout=10) as commits_response: commits_data = json.loads(commits_response.read().decode('utf-8')) if commits_data and len(commits_data) > 0: commit_date = commits_data[0].get('commit', {}).get('author', {}).get('date', '') if commit_date: # Parse ISO 8601 date and format it try: from datetime import datetime dt = datetime.fromisoformat(commit_date.replace('Z', '+00:00')) modify_date = dt.strftime('%Y-%m-%d %H:%M:%S') except Exception: modify_date = commit_date[:19].replace('T', ' ') # Fallback formatting except Exception as e: logging.writeToFile(f"Could not fetch commit date for {plugin_name}: {str(e)}") modify_date = 'N/A' # Extract paid plugin information paid_elem = root.find('paid') patreon_tier_elem = root.find('patreon_tier') is_paid = False patreon_tier = None patreon_url = None if paid_elem is not None and paid_elem.text and paid_elem.text.lower() == 'true': is_paid = True patreon_tier = patreon_tier_elem.text if patreon_tier_elem is not None and patreon_tier_elem.text else 'CyberPanel Paid Plugin' patreon_url_elem = root.find('patreon_url') patreon_url = patreon_url_elem.text if patreon_url_elem is not None else 'https://www.patreon.com/c/newstargeted/membership' # Category (type) is required - valid: Utility, Security, Backup, Performance (Plugin removed) type_elem = root.find('type') if type_elem is None or not type_elem.text or not type_elem.text.strip(): logging.writeToFile(f"Plugin {plugin_name}: Missing required type/category in meta.xml, skipping") continue type_text = type_elem.text.strip().lower() if type_text not in ('utility', 'security', 'backup', 'performance', 'monitoring', 'integration', 'email', 'development', 'analytics'): logging.writeToFile(f"Plugin {plugin_name}: Invalid category '{type_elem.text}', skipping (use Utility, Security, Backup, or Performance)") continue freshness = _get_freshness_badge(modify_date) plugin_data = { 'plugin_dir': plugin_name, 'name': root.find('name').text if root.find('name') is not None else plugin_name, 'type': type_elem.text.strip(), 'description': root.find('description').text if root.find('description') is not None else '', 'version': root.find('version').text if root.find('version') is not None else '1.0.0', 'url': root.find('url').text if root.find('url') is not None else f'/plugins/{plugin_name}/', 'settings_url': root.find('settings_url').text if root.find('settings_url') is not None else f'/plugins/{plugin_name}/settings/', 'author': root.find('author').text if root.find('author') is not None else 'Unknown', 'github_url': f'https://github.com/master3395/cyberpanel-plugins/tree/main/{plugin_name}', 'about_url': f'https://github.com/master3395/cyberpanel-plugins/tree/main/{plugin_name}', 'modify_date': modify_date, 'freshness_badge': freshness, 'is_paid': is_paid, 'patreon_tier': patreon_tier, 'patreon_url': patreon_url } plugins.append(plugin_data) logging.writeToFile(f"Fetched plugin: {plugin_name} (last modified: {modify_date})") except urllib.error.HTTPError as e: if e.code == 403: # Rate limit hit - log and break logging.writeToFile(f"GitHub API rate limit exceeded (403) for plugin {plugin_name}") raise # Re-raise to be caught by outer handler elif e.code == 404: # meta.xml not found, skip this plugin logging.writeToFile(f"meta.xml not found for plugin {plugin_name}, skipping") continue else: logging.writeToFile(f"HTTP error {e.code} fetching {plugin_name}: {str(e)}") continue except Exception as e: logging.writeToFile(f"Error processing plugin {plugin_name}: {str(e)}") continue return plugins except urllib.error.HTTPError as e: if e.code == 403: error_msg = "GitHub API rate limit exceeded. Using cached data if available." logging.writeToFile(f"GitHub API 403 error: {error_msg}") raise Exception(error_msg) else: error_msg = f"GitHub API error {e.code}: {str(e)}" logging.writeToFile(error_msg) raise Exception(error_msg) except urllib.error.URLError as e: error_msg = f"Network error fetching plugins: {str(e)}" logging.writeToFile(error_msg) raise Exception(error_msg) except Exception as e: error_msg = f"Error fetching plugins from GitHub: {str(e)}" logging.writeToFile(error_msg) raise Exception(error_msg) @csrf_exempt @require_http_methods(["GET"]) def fetch_plugin_store(request): """Fetch plugins from the plugin store with caching""" try: if not user_can_manage_plugins(request): return deny_plugin_manage_json_response(request) mailUtilities.checkHome() except Exception as e: logging.writeToFile(f"fetch_plugin_store: checkHome failed: {str(e)}") return JsonResponse({ 'success': False, 'error': 'Authentication required. Please log in again.', 'plugins': [] }, status=401) try: # Try to get from cache first cached_plugins = _get_cached_plugins() if cached_plugins is not None: # Enrich cached plugins with installed/enabled status enriched_plugins = _enrich_store_plugins(cached_plugins) return JsonResponse({ 'success': True, 'plugins': enriched_plugins, 'cached': True }) # Cache miss or expired - fetch from GitHub plugins = _fetch_plugins_from_github() # Enrich plugins with installed/enabled status enriched_plugins = _enrich_store_plugins(plugins) # Save to cache (save original, not enriched, to keep cache clean) if plugins: _save_plugins_cache(plugins) return JsonResponse({ 'success': True, 'plugins': enriched_plugins, 'cached': False }) except Exception as e: error_message = str(e) # If rate limited, try to use stale cache as fallback if '403' in error_message or 'rate limit' in error_message.lower(): stale_cache = _get_cached_plugins(allow_expired=True) # Get cache even if expired if stale_cache is not None: logging.writeToFile("Using stale cache due to rate limit") enriched_plugins = _enrich_store_plugins(stale_cache) return JsonResponse({ 'success': True, 'plugins': enriched_plugins, 'cached': True, 'warning': 'Using cached data due to GitHub rate limit. Data may be outdated.' }) # No cache available, return error return JsonResponse({ 'success': False, 'error': error_message, 'plugins': [] }, status=500) @csrf_exempt @require_http_methods(["POST"]) def upgrade_plugin(request, plugin_name): """Upgrade an installed plugin from GitHub store""" mailUtilities.checkHome() try: if not user_can_manage_plugins(request): return deny_plugin_manage_json_response(request) # Check if plugin is installed pluginInstalled = '/usr/local/CyberCP/' + plugin_name if not os.path.exists(pluginInstalled): return JsonResponse({ 'success': False, 'error': f'Plugin not installed: {plugin_name}' }, status=400) # Get current version before upgrade installed_version = _get_installed_version(plugin_name, '/usr/local/CyberCP') # Create automatic backup before upgrade backup_path, backup_info = _create_plugin_backup(plugin_name) if backup_path: logging.writeToFile(f"Created automatic backup for {plugin_name} before upgrade: {backup_path}") else: logging.writeToFile(f"Warning: Failed to create backup for {plugin_name}, continuing with upgrade anyway") logging.writeToFile(f"Starting upgrade of {plugin_name} from version {installed_version}") # Download and install plugin from GitHub (same as install_from_store) import tempfile import shutil import zipfile import io # Create temporary directory temp_dir = tempfile.mkdtemp() zip_path = os.path.join(temp_dir, plugin_name + '.zip') try: # Download from GitHub repo_zip_url = 'https://github.com/master3395/cyberpanel-plugins/archive/refs/heads/main.zip' logging.writeToFile(f"Downloading plugin upgrade from: {repo_zip_url}") repo_req = urllib.request.Request( repo_zip_url, headers={ 'User-Agent': 'CyberPanel-Plugin-Store/1.0', 'Accept': 'application/zip' } ) with urllib.request.urlopen(repo_req, timeout=30) as repo_response: repo_zip_data = repo_response.read() # Extract plugin directory from repository ZIP repo_zip = zipfile.ZipFile(io.BytesIO(repo_zip_data)) namelist = repo_zip.namelist() # Find plugin folder (supports flat repo or nested e.g. Category/pluginName) top_level, plugin_prefix = _find_plugin_prefix_in_archive(namelist, plugin_name) if not top_level: raise Exception('GitHub archive has no recognizable structure') if not plugin_prefix: sample = namelist[:15] if len(namelist) > 15 else namelist logging.writeToFile(f"Plugin {plugin_name} not in archive. Top-level={top_level}, sample paths: {sample}") raise Exception(f'Plugin {plugin_name} not found in GitHub repository (checked under {top_level}/)') plugin_files = [f for f in namelist if f.startswith(plugin_prefix)] if not plugin_files: logging.writeToFile(f"Plugin {plugin_name}: no files under prefix {plugin_prefix}") raise Exception(f'Plugin {plugin_name} not found in GitHub repository') logging.writeToFile(f"Found {len(plugin_files)} files for plugin {plugin_name} in GitHub (prefix {plugin_prefix})") # Create plugin ZIP with correct structure: plugin_name/... for install to /usr/local/CyberCP/plugin_name/ plugin_zip = zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) for file_path in plugin_files: relative_path = file_path[len(plugin_prefix):] if relative_path: # Skip directory-only entries file_data = repo_zip.read(file_path) arcname = os.path.join(plugin_name, relative_path) plugin_zip.writestr(arcname, file_data) plugin_zip.close() repo_zip.close() # Verify ZIP was created if not os.path.exists(zip_path): raise Exception(f'Failed to create plugin ZIP file') logging.writeToFile(f"Created plugin ZIP: {zip_path}") zip_path_abs = os.path.abspath(zip_path) if not os.path.exists(zip_path_abs): raise Exception(f'Zip file not found: {zip_path_abs}') logging.writeToFile(f"Upgrading plugin using pluginInstaller (zip={zip_path_abs})") # Install using pluginInstaller (zip_path kw when supported; else legacy) try: _install_plugin_compat(plugin_name, zip_path_abs) except Exception as install_error: error_msg = str(install_error) logging.writeToFile(f"pluginInstaller.installPlugin raised exception: {error_msg}") # Check if plugin directory exists despite the error if not os.path.exists(pluginInstalled): raise Exception(f'Plugin upgrade failed: {error_msg}') # Wait for file system to sync import time time.sleep(3) # Verify plugin was upgraded if not os.path.exists(pluginInstalled): raise Exception(f'Plugin upgrade failed: {pluginInstalled} does not exist after upgrade') # Sync meta.xml from GitHub raw so version matches store (archive ZIP can be cached/stale) _sync_meta_xml_from_github(plugin_name, '/usr/local/CyberCP') new_version = _get_installed_version(plugin_name, '/usr/local/CyberCP') # If version unchanged, meta sync may have failed (e.g. network); retry once if new_version == installed_version: logging.writeToFile(f"Plugin {plugin_name}: version unchanged after first meta sync, retrying sync") _sync_meta_xml_from_github(plugin_name, '/usr/local/CyberCP') new_version = _get_installed_version(plugin_name, '/usr/local/CyberCP') if new_version == installed_version: logging.writeToFile(f"Plugin {plugin_name}: version still {installed_version} after upgrade; meta.xml may not have been updated from GitHub") logging.writeToFile(f"Plugin {plugin_name} upgraded successfully from {installed_version} to {new_version}") backup_message = '' if backup_path: backup_message = f' Backup created at: {backup_info.get("timestamp", "unknown")}' return JsonResponse({ 'success': True, 'message': f'Plugin {plugin_name} upgraded successfully from {installed_version} to {new_version}.{backup_message}', 'backup_created': backup_path is not None, 'backup_path': backup_path if backup_path else None }) finally: # Cleanup shutil.rmtree(temp_dir, ignore_errors=True) except urllib.error.HTTPError as e: error_msg = f'Failed to download plugin from GitHub: HTTP {e.code}' if e.code == 404: error_msg = f'Plugin {plugin_name} not found in GitHub repository' logging.writeToFile(f"Error upgrading {plugin_name}: {error_msg}") return JsonResponse({ 'success': False, 'error': error_msg }, status=500) except Exception as e: logging.writeToFile(f"Error upgrading plugin {plugin_name}: {str(e)}") import traceback error_details = traceback.format_exc() logging.writeToFile(f"Traceback: {error_details}") return JsonResponse({ 'success': False, 'error': str(e) }, status=500) @csrf_exempt @require_http_methods(["GET"]) def get_plugin_backups(request, plugin_name): """Get list of available backups for a plugin""" mailUtilities.checkHome() try: if not user_can_manage_plugins(request): return deny_plugin_manage_json_response(request) backups = _get_plugin_backups(plugin_name) return JsonResponse({ 'success': True, 'backups': backups, 'count': len(backups) }) except Exception as e: logging.writeToFile(f"Error getting backups for {plugin_name}: {str(e)}") return JsonResponse({ 'success': False, 'error': str(e) }, status=500) @csrf_exempt @require_http_methods(["POST"]) def revert_plugin(request, plugin_name): """Revert a plugin to a previous version from backup""" mailUtilities.checkHome() try: if not user_can_manage_plugins(request): return deny_plugin_manage_json_response(request) # Get backup path from request data = json.loads(request.body) backup_path = data.get('backup_path') if not backup_path: return JsonResponse({ 'success': False, 'error': 'Backup path is required' }, status=400) # Verify backup exists if not os.path.exists(backup_path): return JsonResponse({ 'success': False, 'error': f'Backup not found: {backup_path}' }, status=404) # Get backup version info metadata_file = os.path.join(backup_path, '.backup_metadata.json') backup_version = 'unknown' if os.path.exists(metadata_file): try: with open(metadata_file, 'r') as f: backup_info = json.load(f) backup_version = backup_info.get('version', 'unknown') except: pass logging.writeToFile(f"Reverting {plugin_name} to version {backup_version} from backup {backup_path}") # Restore from backup if _restore_plugin_from_backup(plugin_name, backup_path): return JsonResponse({ 'success': True, 'message': f'Plugin {plugin_name} reverted successfully to version {backup_version}' }) else: return JsonResponse({ 'success': False, 'error': 'Failed to restore plugin from backup' }, status=500) except json.JSONDecodeError: return JsonResponse({ 'success': False, 'error': 'Invalid JSON data' }, status=400) except Exception as e: logging.writeToFile(f"Error reverting plugin {plugin_name}: {str(e)}") import traceback error_details = traceback.format_exc() logging.writeToFile(f"Traceback: {error_details}") return JsonResponse({ 'success': False, 'error': str(e) }, status=500) @csrf_exempt @require_http_methods(["POST"]) def install_from_store(request, plugin_name): """Install plugin from GitHub store, with fallback to local source""" mailUtilities.checkHome() try: if not user_can_manage_plugins(request): return deny_plugin_manage_json_response(request) # Check if already installed pluginInstalled = '/usr/local/CyberCP/' + plugin_name if os.path.exists(pluginInstalled): return JsonResponse({ 'success': False, 'error': f'Plugin already installed: {plugin_name}' }, status=400) # Download plugin from GitHub import tempfile import shutil import zipfile import io logging.writeToFile(f"Starting installation of {plugin_name} from GitHub store") # Create temporary directory temp_dir = tempfile.mkdtemp() zip_path = os.path.join(temp_dir, plugin_name + '.zip') try: # Try to download from GitHub first use_local_fallback = False try: # Download repository as ZIP repo_zip_url = 'https://github.com/master3395/cyberpanel-plugins/archive/refs/heads/main.zip' logging.writeToFile(f"Downloading plugin from: {repo_zip_url}") repo_req = urllib.request.Request( repo_zip_url, headers={ 'User-Agent': 'CyberPanel-Plugin-Store/1.0', 'Accept': 'application/zip' } ) with urllib.request.urlopen(repo_req, timeout=30) as repo_response: repo_zip_data = repo_response.read() # Extract plugin directory from repository ZIP repo_zip = zipfile.ZipFile(io.BytesIO(repo_zip_data)) namelist = repo_zip.namelist() # Find plugin folder (supports flat repo or nested e.g. Category/pluginName) top_level, plugin_prefix = _find_plugin_prefix_in_archive(namelist, plugin_name) if not top_level: raise Exception('GitHub archive has no recognizable structure') if not plugin_prefix: repo_zip.close() logging.writeToFile(f"Plugin {plugin_name} not found in GitHub repository, trying local source") use_local_fallback = True else: plugin_files = [f for f in namelist if f.startswith(plugin_prefix)] if not plugin_files: repo_zip.close() logging.writeToFile(f"Plugin {plugin_name} not found in GitHub repository, trying local source") use_local_fallback = True else: logging.writeToFile(f"Found {len(plugin_files)} files for plugin {plugin_name} in GitHub") plugin_zip = zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) for file_path in plugin_files: relative_path = file_path[len(plugin_prefix):] if relative_path: file_data = repo_zip.read(file_path) arcname = os.path.join(plugin_name, relative_path) plugin_zip.writestr(arcname, file_data) plugin_zip.close() repo_zip.close() except Exception as github_error: logging.writeToFile(f"GitHub download failed for {plugin_name}: {str(github_error)}, trying local source") use_local_fallback = True # Fallback to local source if GitHub download failed if use_local_fallback: pluginSource = _get_plugin_source_path(plugin_name) if not pluginSource: raise Exception(f'Plugin {plugin_name} not found in GitHub repository and local source not found (checked: {", ".join(PLUGIN_SOURCE_PATHS)})') logging.writeToFile(f"Using local source for {plugin_name} from {pluginSource}") # Create zip from local source directory with correct structure # The ZIP must contain plugin_name/ directory structure for proper extraction import zipfile plugin_zip = zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) # Walk through source directory and add files with plugin_name prefix for root, dirs, files in os.walk(pluginSource): for file in files: file_path = os.path.join(root, file) # Calculate relative path from plugin source arcname = os.path.relpath(file_path, pluginSource) # Add plugin_name prefix to maintain directory structure arcname = os.path.join(plugin_name, arcname) plugin_zip.write(file_path, arcname) plugin_zip.close() # Verify ZIP was created if not os.path.exists(zip_path): raise Exception(f'Failed to create plugin ZIP file') logging.writeToFile(f"Created plugin ZIP: {zip_path}") if not os.path.exists(zip_path): raise Exception(f'Zip file not found: {zip_path}') # Pass absolute path so extraction does not depend on cwd (installPlugin may change cwd) zip_path_abs = os.path.abspath(zip_path) logging.writeToFile(f"Installing plugin using pluginInstaller (zip={zip_path_abs})") # Install using pluginInstaller (zip_path kw when supported; else legacy) try: _install_plugin_compat(plugin_name, zip_path_abs) except Exception as install_error: # Log the full error for debugging error_msg = str(install_error) logging.writeToFile(f"pluginInstaller.installPlugin raised exception: {error_msg}") # Check if plugin directory exists despite the error pluginInstalled = '/usr/local/CyberCP/' + plugin_name if os.path.exists(pluginInstalled): logging.writeToFile(f"Plugin directory exists despite error, continuing...") else: raise Exception(f'Plugin installation failed: {error_msg}') # Wait a moment for file system to sync and service to restart import time time.sleep(3) # Increased wait time for file system sync # Verify plugin was actually installed pluginInstalled = '/usr/local/CyberCP/' + plugin_name if not os.path.exists(pluginInstalled): # Exclude README.md - main CyberPanel repo has it at root root_files = ['apps.py', 'meta.xml', 'urls.py', 'views.py'] found_root_files = [f for f in root_files if os.path.exists(os.path.join('/usr/local/CyberCP', f))] if found_root_files: raise Exception(f'Plugin installation failed: Files extracted to wrong location. Found {found_root_files} in /usr/local/CyberCP/ root instead of {pluginInstalled}/') raise Exception(f'Plugin installation failed: {pluginInstalled} does not exist after installation') # Sync meta.xml from GitHub raw so version matches store _sync_meta_xml_from_github(plugin_name, '/usr/local/CyberCP') logging.writeToFile(f"Plugin {plugin_name} installed successfully") # Set plugin to enabled by default after installation _set_plugin_state(plugin_name, True) _ensure_plugin_meta_xml(plugin_name) return JsonResponse({ 'success': True, 'message': f'Plugin {plugin_name} installed successfully from store' }) finally: # Cleanup shutil.rmtree(temp_dir, ignore_errors=True) except urllib.error.HTTPError as e: error_msg = f'Failed to download plugin from GitHub: HTTP {e.code}' if e.code == 404: error_msg = f'Plugin {plugin_name} not found in GitHub repository' logging.writeToFile(f"Error installing {plugin_name}: {error_msg}") return JsonResponse({ 'success': False, 'error': error_msg }, status=500) except Exception as e: logging.writeToFile(f"Error installing plugin {plugin_name}: {str(e)}") import traceback error_details = traceback.format_exc() logging.writeToFile(f"Traceback: {error_details}") return JsonResponse({ 'success': False, 'error': str(e) }, status=500) @csrf_exempt @require_http_methods(["GET"]) def debug_loaded_plugins(request): """Return which plugins have URL routes loaded and which failed (for diagnosing 404s).""" try: if not user_can_manage_plugins(request): return deny_plugin_manage_json_response(request) import pluginHolder.urls as urls_mod loaded = list(getattr(urls_mod, '_loaded_plugins', [])) failed = dict(getattr(urls_mod, '_failed_plugins', {})) return JsonResponse({ 'success': True, 'loaded': loaded, 'failed': failed, 'loaded_count': len(loaded), 'failed_count': len(failed), }, json_dumps_params={'indent': 2}) except Exception as e: return JsonResponse({'success': False, 'error': str(e)}, status=500) @require_http_methods(["GET", "POST"]) def plugin_settings_proxy(request, plugin_name): """ Proxy for /plugins//settings/ so plugin settings pages work even when the plugin was installed after the worker started (dynamic URL list is built at import time). """ mailUtilities.checkHome() if not user_can_manage_plugins(request): from django.http import HttpResponseForbidden return HttpResponseForbidden('You are not authorized to manage plugins.') plugin_path = '/usr/local/CyberCP/' + plugin_name urls_py = os.path.join(plugin_path, 'urls.py') if not plugin_name or not os.path.isdir(plugin_path) or not os.path.exists(urls_py): from django.http import HttpResponseNotFound return HttpResponseNotFound('Plugin not found or has no URL configuration.') if plugin_name in RESERVED_PLUGIN_DIRS or plugin_name in ( 'api', 'installed', 'help', 'emailMarketing', 'emailPremium', 'pluginHolder' ): from django.http import HttpResponseNotFound return HttpResponseNotFound('Invalid plugin.') try: import importlib views_mod = importlib.import_module(plugin_name + '.views') settings_view = getattr(views_mod, 'settings', None) if not callable(settings_view): from django.http import HttpResponseNotFound return HttpResponseNotFound('Plugin has no settings view.') return settings_view(request) except Exception as e: logging.writeToFile(f"plugin_settings_proxy for {plugin_name}: {str(e)}") from django.http import HttpResponseServerError return HttpResponseServerError(f'Plugin settings error: {str(e)}') def plugin_help(request, plugin_name): """Plugin-specific help page - shows plugin information, version history, and help content""" mailUtilities.checkHome() # Paths for the plugin plugin_path = '/usr/local/CyberCP/' + plugin_name meta_xml_path = os.path.join(plugin_path, 'meta.xml') # Check if plugin exists if not os.path.exists(plugin_path) or not os.path.exists(meta_xml_path): proc = httpProc(request, 'pluginHolder/plugin_not_found.html', { 'plugin_name': plugin_name }, 'managePlugins') return proc.render() # Parse meta.xml try: plugin_meta = ElementTree.parse(meta_xml_path) root = plugin_meta.getroot() # Extract plugin information plugin_display_name = root.find('name').text if root.find('name') is not None else plugin_name plugin_description = root.find('description').text if root.find('description') is not None else '' plugin_version = root.find('version').text if root.find('version') is not None else 'Unknown' plugin_author = root.find('author').text if root.find('author') is not None else 'Unknown' plugin_type = root.find('type').text if root.find('type') is not None else 'Plugin' # Check if plugin is installed installed = os.path.exists(plugin_path) except Exception as e: logging.writeToFile(f"Error parsing meta.xml for {plugin_name}: {str(e)}") proc = httpProc(request, 'pluginHolder/plugin_not_found.html', { 'plugin_name': plugin_name }, 'managePlugins') return proc.render() # Look for help content files (README.md, CHANGELOG.md, HELP.md, etc.) help_content = '' changelog_content = '' # Check for README.md or HELP.md help_files = ['HELP.md', 'README.md', 'docs/HELP.md', 'docs/README.md'] help_file_path = None for help_file in help_files: potential_path = os.path.join(plugin_path, help_file) if os.path.exists(potential_path): help_file_path = potential_path break if help_file_path: try: with open(help_file_path, 'r', encoding='utf-8') as f: help_content = f.read() except Exception as e: logging.writeToFile(f"Error reading help file for {plugin_name}: {str(e)}") help_content = '' # Check for CHANGELOG.md changelog_paths = ['CHANGELOG.md', 'changelog.md', 'CHANGELOG.txt', 'docs/CHANGELOG.md'] for changelog_file in changelog_paths: potential_path = os.path.join(plugin_path, changelog_file) if os.path.exists(potential_path): try: with open(potential_path, 'r', encoding='utf-8') as f: changelog_content = f.read() break except Exception as e: logging.writeToFile(f"Error reading changelog for {plugin_name}: {str(e)}") # If no local changelog, try fetching from GitHub (non-blocking) if not changelog_content: try: github_changelog_url = f'{GITHUB_RAW_BASE}/{plugin_name}/CHANGELOG.md' try: with urllib.request.urlopen(github_changelog_url, timeout=3) as response: if response.getcode() == 200: changelog_content = response.read().decode('utf-8') logging.writeToFile(f"Fetched CHANGELOG.md from GitHub for {plugin_name}") except (urllib.error.HTTPError, urllib.error.URLError, Exception): # Silently fail - GitHub fetch is optional pass except Exception: # Silently fail - GitHub fetch is optional pass # If no help content and no local README, try fetching README.md from GitHub if not help_content: try: github_readme_url = f'{GITHUB_RAW_BASE}/{plugin_name}/README.md' try: with urllib.request.urlopen(github_readme_url, timeout=3) as response: if response.getcode() == 200: help_content = response.read().decode('utf-8') logging.writeToFile(f"Fetched README.md from GitHub for {plugin_name}") except (urllib.error.HTTPError, urllib.error.URLError, Exception): # Silently fail - GitHub fetch is optional pass except Exception: # Silently fail - GitHub fetch is optional pass # If no help content found, create default content from meta.xml if not help_content: help_content = f"""

Plugin Information

Name: {plugin_display_name}

Type: {plugin_type}

Version: {plugin_version}

Author: {plugin_author}

Description

{plugin_description}

Usage

For detailed information about this plugin, please visit the GitHub repository or check the plugin's documentation.

""" else: # Convert markdown to HTML (basic conversion) import re # Convert linked images first (badges): [![alt](img_url)](link_url) help_content = re.sub( r'\[!\[([^\]]*)\]\(([^\)]+)\)\]\(([^\)]+)\)', r'\1', help_content ) # Convert regular images: ![alt](img_url) help_content = re.sub( r'!\[([^\]]*)\]\(([^\)]+)\)', r'\1', help_content ) # Convert regular links: [text](url) help_content = re.sub( r'\[([^\]]+)\]\(([^\)]+)\)', r'\1', help_content ) # Convert headings help_content = re.sub(r'^### (.*?)$', r'

\1

', help_content, flags=re.MULTILINE) help_content = re.sub(r'^## (.*?)$', r'

\1

', help_content, flags=re.MULTILINE) help_content = re.sub(r'^# (.*?)$', r'

\1

', help_content, flags=re.MULTILINE) # Convert formatting help_content = re.sub(r'\*\*(.*?)\*\*', r'\1', help_content) help_content = re.sub(r'\*(.*?)\*', r'\1', help_content) help_content = re.sub(r'`([^`]+)`', r'\1', help_content) # Convert lists help_content = re.sub(r'^\- (.*?)$', r'
  • \1
  • ', help_content, flags=re.MULTILINE) help_content = re.sub(r'^(\d+)\. (.*?)$', r'
  • \2
  • ', help_content, flags=re.MULTILINE) # Wrap paragraphs (but preserve HTML tags and images) lines = help_content.split('\n') processed_lines = [] for line in lines: line = line.strip() if line and not line.startswith('<') and not line.startswith('http') and not '{line}

    ') elif line: processed_lines.append(line) help_content = '\n'.join(processed_lines) # Add changelog if available if changelog_content: # Convert changelog markdown to HTML import re changelog_html = changelog_content changelog_html = re.sub(r'^## (.*?)$', r'

    \1

    ', changelog_html, flags=re.MULTILINE) changelog_html = re.sub(r'^### (.*?)$', r'

    \1

    ', changelog_html, flags=re.MULTILINE) changelog_html = re.sub(r'^\- (.*?)$', r'
  • \1
  • ', changelog_html, flags=re.MULTILINE) changelog_html = re.sub(r'\*\*(.*?)\*\*', r'\1', changelog_html) # Wrap in pre for code-like formatting changelog_html = f'

    Version History

    {changelog_html}
    ' help_content += changelog_html # Context for template context = { 'plugin_name': plugin_display_name, 'plugin_name_dir': plugin_name, 'plugin_description': plugin_description, 'plugin_version': plugin_version, 'plugin_author': plugin_author, 'plugin_type': plugin_type, 'installed': installed, 'help_content': help_content, } proc = httpProc(request, 'pluginHolder/plugin_help.html', context, 'managePlugins') return proc.render() @csrf_exempt @require_http_methods(["GET"]) def check_plugin_subscription(request, plugin_name): """ API endpoint to check if user has Patreon subscription for a paid plugin Args: request: Django request object plugin_name: Name of the plugin to check Returns: JsonResponse: { 'has_access': bool, 'is_paid': bool, 'message': str, 'patreon_url': str or None } """ try: if not user_can_manage_plugins(request): return deny_plugin_manage_json_response(request) # Check if user is authenticated if not request.user or not request.user.is_authenticated: return JsonResponse({ 'success': False, 'has_access': False, 'is_paid': False, 'message': 'Please log in to check subscription status', 'patreon_url': None }, status=401) # Load plugin metadata from .plugin_access import check_plugin_access, _load_plugin_meta plugin_meta = _load_plugin_meta(plugin_name) # Check access access_result = check_plugin_access(request, plugin_name, plugin_meta) return JsonResponse({ 'success': True, 'has_access': access_result['has_access'], 'is_paid': access_result['is_paid'], 'message': access_result['message'], 'patreon_url': access_result.get('patreon_url') }) except Exception as e: logging.writeToFile(f"Error checking subscription for {plugin_name}: {str(e)}") return JsonResponse({ 'success': False, 'has_access': False, 'is_paid': False, 'message': f'Error checking subscription: {str(e)}', 'patreon_url': None }, status=500)