mirror of
https://github.com/usmannasir/cyberpanel.git
synced 2026-01-13 02:52:05 +01:00
359 lines
14 KiB
Python
359 lines
14 KiB
Python
import os
|
|
import subprocess
|
|
import shutil
|
|
try:
|
|
import installLog as logging
|
|
except ImportError:
|
|
# Fallback if installLog is not available
|
|
class logging:
|
|
@staticmethod
|
|
def writeToFile(message):
|
|
print(message)
|
|
|
|
|
|
class PackageManager:
|
|
"""Handles package installation across different distributions"""
|
|
|
|
@staticmethod
|
|
def get_install_command(packages, distro, ubuntu=1, centos=0, cent8=2, openeuler=3):
|
|
"""Returns the appropriate install command for the distribution"""
|
|
if distro == ubuntu:
|
|
return f"DEBIAN_FRONTEND=noninteractive apt-get -y install {packages}"
|
|
elif distro == centos:
|
|
return f"yum install -y {packages}"
|
|
else: # cent8 or openeuler
|
|
return f"dnf install -y {packages}"
|
|
|
|
@staticmethod
|
|
def install_packages(packages, distro, ubuntu=1, centos=0, cent8=2, openeuler=3, use_shell=False):
|
|
"""Install packages with proper error handling"""
|
|
command = PackageManager.get_install_command(packages, distro, ubuntu, centos, cent8, openeuler)
|
|
# Use subprocess directly to avoid circular import
|
|
result = subprocess.run(command, shell=True, capture_output=True, text=True)
|
|
if result.returncode != 0:
|
|
logging.writeToFile(f'[ERROR] Package installation failed: {command}')
|
|
raise Exception(f'Package installation failed: {result.stderr}')
|
|
return result.returncode
|
|
|
|
|
|
class ServiceManager:
|
|
"""Handles service operations across different distributions"""
|
|
|
|
@staticmethod
|
|
def manage_service(service_name, action, distro, exit_on_error=True):
|
|
"""Manage systemd services (start, stop, enable, disable, restart)"""
|
|
command = f"systemctl {action} {service_name}"
|
|
result = subprocess.run(command, shell=True, capture_output=True, text=True)
|
|
if result.returncode != 0 and exit_on_error:
|
|
logging.writeToFile(f'[ERROR] Service management failed: {command}')
|
|
raise Exception(f'Service management failed: {result.stderr}')
|
|
return result.returncode
|
|
|
|
@staticmethod
|
|
def start_service(service_name, distro):
|
|
"""Start a service"""
|
|
ServiceManager.manage_service(service_name, "start", distro)
|
|
|
|
@staticmethod
|
|
def stop_service(service_name, distro, exit_on_error=False):
|
|
"""Stop a service"""
|
|
ServiceManager.manage_service(service_name, "stop", distro, exit_on_error)
|
|
|
|
@staticmethod
|
|
def enable_service(service_name, distro):
|
|
"""Enable a service at startup"""
|
|
ServiceManager.manage_service(service_name, "enable", distro)
|
|
|
|
@staticmethod
|
|
def disable_service(service_name, distro, exit_on_error=False):
|
|
"""Disable a service at startup"""
|
|
ServiceManager.manage_service(service_name, "disable", distro, exit_on_error)
|
|
|
|
@staticmethod
|
|
def restart_service(service_name, distro):
|
|
"""Restart a service"""
|
|
ServiceManager.manage_service(service_name, "restart", distro)
|
|
|
|
|
|
class ConfigFileHandler:
|
|
"""Handles configuration file operations"""
|
|
|
|
@staticmethod
|
|
def replace_in_file(file_path, replacements):
|
|
"""
|
|
Replace multiple patterns in a file
|
|
replacements: list of tuples (search_pattern, replacement)
|
|
"""
|
|
try:
|
|
with open(file_path, 'r') as f:
|
|
data = f.readlines()
|
|
|
|
with open(file_path, 'w') as f:
|
|
for line in data:
|
|
modified_line = line
|
|
for search_pattern, replacement in replacements:
|
|
if search_pattern in line:
|
|
modified_line = replacement + '\n' if not replacement.endswith('\n') else replacement
|
|
break
|
|
f.write(modified_line)
|
|
return True
|
|
except IOError as e:
|
|
logging.InstallLog.writeToFile(f'[ERROR] {str(e)} [ConfigFileHandler.replace_in_file]')
|
|
return False
|
|
|
|
@staticmethod
|
|
def sed_replace(file_path, search, replace, distro):
|
|
"""Use sed command for replacements"""
|
|
command = f"sed -i 's|{search}|{replace}|g' {file_path}"
|
|
result = subprocess.run(command, shell=True, capture_output=True, text=True)
|
|
if result.returncode != 0:
|
|
logging.writeToFile(f'[ERROR] sed command failed: {command}')
|
|
raise Exception(f'sed command failed: {result.stderr}')
|
|
return result.returncode
|
|
|
|
@staticmethod
|
|
def copy_config_file(source, destination, remove_existing=True):
|
|
"""Copy configuration file with optional removal of existing file"""
|
|
try:
|
|
if remove_existing and os.path.exists(destination):
|
|
os.remove(destination)
|
|
shutil.copy(source, destination)
|
|
return True
|
|
except Exception as e:
|
|
logging.InstallLog.writeToFile(f'[ERROR] {str(e)} [ConfigFileHandler.copy_config_file]')
|
|
return False
|
|
|
|
|
|
class CommandExecutor:
|
|
"""Centralized command execution with consistent error handling"""
|
|
|
|
@staticmethod
|
|
def execute(command, distro, description=None, exit_on_error=True, exit_code=os.EX_OSERR):
|
|
"""Execute a command with standard error handling"""
|
|
if description is None:
|
|
description = command
|
|
result = subprocess.run(command, shell=True, capture_output=True, text=True)
|
|
if result.returncode != 0:
|
|
if exit_on_error:
|
|
logging.writeToFile(f'[ERROR] Command failed: {command}')
|
|
raise Exception(f'Command failed: {result.stderr}')
|
|
else:
|
|
logging.writeToFile(f'[WARNING] Command failed but continuing: {command}')
|
|
return result.returncode
|
|
|
|
@staticmethod
|
|
def execute_with_output(command, shell=True):
|
|
"""Execute command and return output"""
|
|
try:
|
|
result = subprocess.run(command, capture_output=True, universal_newlines=True, shell=shell)
|
|
except:
|
|
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
|
universal_newlines=True, shell=shell)
|
|
return result
|
|
|
|
|
|
class PHPInstaller:
|
|
"""Handles PHP installation for different versions and distributions"""
|
|
|
|
@staticmethod
|
|
def install_php_ubuntu(versions=None):
|
|
"""Install PHP versions on Ubuntu"""
|
|
if versions is None:
|
|
# Include PHP 8.4 for Ubuntu 24.04
|
|
versions = ['7?', '80', '81', '82', '83', '84']
|
|
|
|
for version in versions:
|
|
if version == '7?':
|
|
command = 'DEBIAN_FRONTEND=noninteractive apt-get -y install ' \
|
|
'lsphp7? lsphp7?-common lsphp7?-curl lsphp7?-dev lsphp7?-imap lsphp7?-intl lsphp7?-json ' \
|
|
'lsphp7?-ldap lsphp7?-mysql lsphp7?-opcache lsphp7?-pspell lsphp7?-recode ' \
|
|
'lsphp7?-sqlite3 lsphp7?-tidy'
|
|
else:
|
|
command = f'DEBIAN_FRONTEND=noninteractive apt-get -y install lsphp{version}*'
|
|
os.system(command)
|
|
|
|
@staticmethod
|
|
def install_php_centos(distro, centos=0, cent8=2, openeuler=3):
|
|
"""Install PHP versions on CentOS/RHEL based systems"""
|
|
if distro == centos:
|
|
# First install the group
|
|
command = 'yum -y groupinstall lsphp-all'
|
|
subprocess.run(command, shell=True, check=True)
|
|
|
|
# Then install individual versions
|
|
versions = ['71', '72', '73', '74', '80', '81', '82', '83']
|
|
for version in versions:
|
|
command = f'yum install -y lsphp{version}* --skip-broken'
|
|
subprocess.call(command, shell=True)
|
|
|
|
elif distro == cent8:
|
|
command = 'dnf install lsphp71* lsphp72* lsphp73* lsphp74* lsphp80* --exclude lsphp73-pecl-zip --exclude *imagick* -y --skip-broken'
|
|
subprocess.call(command, shell=True)
|
|
|
|
command = 'dnf install lsphp81* lsphp82* lsphp83* --exclude *imagick* -y --skip-broken'
|
|
subprocess.call(command, shell=True)
|
|
|
|
elif distro == openeuler:
|
|
command = 'dnf install lsphp71* lsphp72* lsphp73* lsphp74* lsphp80* lsphp81* lsphp82* lsphp83* -y'
|
|
subprocess.call(command, shell=True)
|
|
|
|
|
|
class FileSystemHelper:
|
|
"""Helper for file system operations"""
|
|
|
|
@staticmethod
|
|
def create_directory(path, exit_on_error=False):
|
|
"""Create directory with error handling"""
|
|
try:
|
|
os.mkdir(path)
|
|
return True
|
|
except OSError as e:
|
|
if exit_on_error:
|
|
raise
|
|
logging.InstallLog.writeToFile(f"[ERROR] Could not create directory {path}: {str(e)}")
|
|
return False
|
|
|
|
@staticmethod
|
|
def create_user_and_group(user, group, uid, gid, shell="/bin/false", home="/bin/null", distro=None):
|
|
"""Create system user and group"""
|
|
# Create group
|
|
command = f'groupadd -g {gid} {group}'
|
|
CommandExecutor.execute(command, distro, exit_on_error=True)
|
|
|
|
# Create user
|
|
command = f'useradd -u {uid} -s {shell} -d {home} -c "{user} user" -g {group} {user}'
|
|
CommandExecutor.execute(command, distro, exit_on_error=True)
|
|
|
|
|
|
class DistroHandler:
|
|
"""Handles distribution-specific operations"""
|
|
|
|
# Distribution constants
|
|
CENTOS = 0
|
|
UBUNTU = 1
|
|
CENT8 = 2
|
|
OPENEULER = 3
|
|
|
|
@staticmethod
|
|
def is_rhel_based(distro):
|
|
"""Check if distribution is RHEL-based"""
|
|
return distro in [DistroHandler.CENTOS, DistroHandler.CENT8, DistroHandler.OPENEULER]
|
|
|
|
@staticmethod
|
|
def is_debian_based(distro):
|
|
"""Check if distribution is Debian-based"""
|
|
return distro == DistroHandler.UBUNTU
|
|
|
|
@staticmethod
|
|
def get_package_manager(distro):
|
|
"""Get the package manager for the distribution"""
|
|
if distro == DistroHandler.UBUNTU:
|
|
return "apt-get"
|
|
elif distro == DistroHandler.CENTOS:
|
|
return "yum"
|
|
else: # CENT8, OPENEULER
|
|
return "dnf"
|
|
|
|
@staticmethod
|
|
def get_service_name(base_service, distro):
|
|
"""Get distribution-specific service name"""
|
|
# Add logic for service name mapping if needed
|
|
service_map = {
|
|
'mysql': {
|
|
DistroHandler.UBUNTU: 'mysql',
|
|
DistroHandler.CENTOS: 'mariadb',
|
|
DistroHandler.CENT8: 'mariadb',
|
|
DistroHandler.OPENEULER: 'mariadb'
|
|
}
|
|
}
|
|
|
|
if base_service in service_map and distro in service_map[base_service]:
|
|
return service_map[base_service][distro]
|
|
return base_service
|
|
|
|
|
|
class PermissionManager:
|
|
"""Handles file permissions and ownership"""
|
|
|
|
@staticmethod
|
|
def set_permissions(path, mode, distro, owner=None, group=None):
|
|
"""Set file permissions and optionally ownership"""
|
|
# Set permissions
|
|
CommandExecutor.execute(f'chmod {mode} {path}', distro)
|
|
|
|
# Set ownership if specified
|
|
if owner or group:
|
|
owner_str = owner if owner else ""
|
|
group_str = group if group else ""
|
|
if owner and group:
|
|
ownership = f"{owner}:{group}"
|
|
elif owner:
|
|
ownership = owner
|
|
else:
|
|
ownership = f":{group}"
|
|
|
|
CommandExecutor.execute(f'chown {ownership} {path}', distro)
|
|
|
|
@staticmethod
|
|
def batch_set_permissions(permission_list, distro):
|
|
"""Apply multiple permission changes efficiently"""
|
|
for perm in permission_list:
|
|
PermissionManager.set_permissions(
|
|
perm['path'],
|
|
perm['mode'],
|
|
distro,
|
|
perm.get('owner'),
|
|
perm.get('group')
|
|
)
|
|
|
|
|
|
class DownloadHelper:
|
|
"""Helper for downloading and extracting files"""
|
|
|
|
@staticmethod
|
|
def download_file(url, destination, distro):
|
|
"""Download a file using wget"""
|
|
command = f'wget -O {destination} {url}' if destination else f'wget {url}'
|
|
CommandExecutor.execute(command, distro)
|
|
|
|
@staticmethod
|
|
def extract_archive(archive_path, destination, distro, archive_type='tar'):
|
|
"""Extract an archive file"""
|
|
if archive_type == 'tar':
|
|
command = f'tar -xf {archive_path} -C {destination}'
|
|
elif archive_type == 'tar.gz':
|
|
command = f'tar -xzf {archive_path} -C {destination}'
|
|
elif archive_type == 'zip':
|
|
command = f'unzip {archive_path} -d {destination}'
|
|
else:
|
|
raise ValueError(f"Unsupported archive type: {archive_type}")
|
|
|
|
CommandExecutor.execute(command, distro)
|
|
|
|
@staticmethod
|
|
def download_and_extract(url, extract_dir, distro, archive_type='tar.gz'):
|
|
"""Download and extract in one operation"""
|
|
filename = os.path.basename(url)
|
|
DownloadHelper.download_file(url, filename, distro)
|
|
DownloadHelper.extract_archive(filename, extract_dir, distro, archive_type)
|
|
return filename
|
|
|
|
|
|
class SSLCertificateHelper:
|
|
"""Helper for SSL certificate operations"""
|
|
|
|
@staticmethod
|
|
def generate_self_signed_cert(cert_path, key_path, distro, days=3650,
|
|
subject="/C=US/ST=State/L=City/O=Org/CN=localhost"):
|
|
"""Generate a self-signed SSL certificate"""
|
|
command = (f'openssl req -x509 -nodes -days {days} -newkey rsa:2048 '
|
|
f'-subj "{subject}" -keyout {key_path} -out {cert_path}')
|
|
CommandExecutor.execute(command, distro)
|
|
|
|
@staticmethod
|
|
def generate_csr(key_path, csr_path, distro,
|
|
subject="/C=US/ST=State/L=City/O=Org/CN=localhost"):
|
|
"""Generate a certificate signing request"""
|
|
command = (f'openssl req -new -key {key_path} -out {csr_path} '
|
|
f'-subj "{subject}"')
|
|
CommandExecutor.execute(command, distro) |