Files
CyberPanel/install/installHelpers.py
2025-06-28 17:33:05 +05:00

359 lines
14 KiB
Python

import os
import subprocess
import shutil
try:
import installLog as logging
except ImportError:
# Fallback if installLog is not available
class logging:
@staticmethod
def writeToFile(message):
print(message)
class PackageManager:
"""Handles package installation across different distributions"""
@staticmethod
def get_install_command(packages, distro, ubuntu=1, centos=0, cent8=2, openeuler=3):
"""Returns the appropriate install command for the distribution"""
if distro == ubuntu:
return f"DEBIAN_FRONTEND=noninteractive apt-get -y install {packages}"
elif distro == centos:
return f"yum install -y {packages}"
else: # cent8 or openeuler
return f"dnf install -y {packages}"
@staticmethod
def install_packages(packages, distro, ubuntu=1, centos=0, cent8=2, openeuler=3, use_shell=False):
"""Install packages with proper error handling"""
command = PackageManager.get_install_command(packages, distro, ubuntu, centos, cent8, openeuler)
# Use subprocess directly to avoid circular import
result = subprocess.run(command, shell=True, capture_output=True, text=True)
if result.returncode != 0:
logging.writeToFile(f'[ERROR] Package installation failed: {command}')
raise Exception(f'Package installation failed: {result.stderr}')
return result.returncode
class ServiceManager:
"""Handles service operations across different distributions"""
@staticmethod
def manage_service(service_name, action, distro, exit_on_error=True):
"""Manage systemd services (start, stop, enable, disable, restart)"""
command = f"systemctl {action} {service_name}"
result = subprocess.run(command, shell=True, capture_output=True, text=True)
if result.returncode != 0 and exit_on_error:
logging.writeToFile(f'[ERROR] Service management failed: {command}')
raise Exception(f'Service management failed: {result.stderr}')
return result.returncode
@staticmethod
def start_service(service_name, distro):
"""Start a service"""
ServiceManager.manage_service(service_name, "start", distro)
@staticmethod
def stop_service(service_name, distro, exit_on_error=False):
"""Stop a service"""
ServiceManager.manage_service(service_name, "stop", distro, exit_on_error)
@staticmethod
def enable_service(service_name, distro):
"""Enable a service at startup"""
ServiceManager.manage_service(service_name, "enable", distro)
@staticmethod
def disable_service(service_name, distro, exit_on_error=False):
"""Disable a service at startup"""
ServiceManager.manage_service(service_name, "disable", distro, exit_on_error)
@staticmethod
def restart_service(service_name, distro):
"""Restart a service"""
ServiceManager.manage_service(service_name, "restart", distro)
class ConfigFileHandler:
"""Handles configuration file operations"""
@staticmethod
def replace_in_file(file_path, replacements):
"""
Replace multiple patterns in a file
replacements: list of tuples (search_pattern, replacement)
"""
try:
with open(file_path, 'r') as f:
data = f.readlines()
with open(file_path, 'w') as f:
for line in data:
modified_line = line
for search_pattern, replacement in replacements:
if search_pattern in line:
modified_line = replacement + '\n' if not replacement.endswith('\n') else replacement
break
f.write(modified_line)
return True
except IOError as e:
logging.InstallLog.writeToFile(f'[ERROR] {str(e)} [ConfigFileHandler.replace_in_file]')
return False
@staticmethod
def sed_replace(file_path, search, replace, distro):
"""Use sed command for replacements"""
command = f"sed -i 's|{search}|{replace}|g' {file_path}"
result = subprocess.run(command, shell=True, capture_output=True, text=True)
if result.returncode != 0:
logging.writeToFile(f'[ERROR] sed command failed: {command}')
raise Exception(f'sed command failed: {result.stderr}')
return result.returncode
@staticmethod
def copy_config_file(source, destination, remove_existing=True):
"""Copy configuration file with optional removal of existing file"""
try:
if remove_existing and os.path.exists(destination):
os.remove(destination)
shutil.copy(source, destination)
return True
except Exception as e:
logging.InstallLog.writeToFile(f'[ERROR] {str(e)} [ConfigFileHandler.copy_config_file]')
return False
class CommandExecutor:
"""Centralized command execution with consistent error handling"""
@staticmethod
def execute(command, distro, description=None, exit_on_error=True, exit_code=os.EX_OSERR):
"""Execute a command with standard error handling"""
if description is None:
description = command
result = subprocess.run(command, shell=True, capture_output=True, text=True)
if result.returncode != 0:
if exit_on_error:
logging.writeToFile(f'[ERROR] Command failed: {command}')
raise Exception(f'Command failed: {result.stderr}')
else:
logging.writeToFile(f'[WARNING] Command failed but continuing: {command}')
return result.returncode
@staticmethod
def execute_with_output(command, shell=True):
"""Execute command and return output"""
try:
result = subprocess.run(command, capture_output=True, universal_newlines=True, shell=shell)
except:
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, shell=shell)
return result
class PHPInstaller:
"""Handles PHP installation for different versions and distributions"""
@staticmethod
def install_php_ubuntu(versions=None):
"""Install PHP versions on Ubuntu"""
if versions is None:
# Include PHP 8.4 for Ubuntu 24.04
versions = ['7?', '80', '81', '82', '83', '84']
for version in versions:
if version == '7?':
command = 'DEBIAN_FRONTEND=noninteractive apt-get -y install ' \
'lsphp7? lsphp7?-common lsphp7?-curl lsphp7?-dev lsphp7?-imap lsphp7?-intl lsphp7?-json ' \
'lsphp7?-ldap lsphp7?-mysql lsphp7?-opcache lsphp7?-pspell lsphp7?-recode ' \
'lsphp7?-sqlite3 lsphp7?-tidy'
else:
command = f'DEBIAN_FRONTEND=noninteractive apt-get -y install lsphp{version}*'
os.system(command)
@staticmethod
def install_php_centos(distro, centos=0, cent8=2, openeuler=3):
"""Install PHP versions on CentOS/RHEL based systems"""
if distro == centos:
# First install the group
command = 'yum -y groupinstall lsphp-all'
subprocess.run(command, shell=True, check=True)
# Then install individual versions
versions = ['71', '72', '73', '74', '80', '81', '82', '83']
for version in versions:
command = f'yum install -y lsphp{version}* --skip-broken'
subprocess.call(command, shell=True)
elif distro == cent8:
command = 'dnf install lsphp71* lsphp72* lsphp73* lsphp74* lsphp80* --exclude lsphp73-pecl-zip --exclude *imagick* -y --skip-broken'
subprocess.call(command, shell=True)
command = 'dnf install lsphp81* lsphp82* lsphp83* --exclude *imagick* -y --skip-broken'
subprocess.call(command, shell=True)
elif distro == openeuler:
command = 'dnf install lsphp71* lsphp72* lsphp73* lsphp74* lsphp80* lsphp81* lsphp82* lsphp83* -y'
subprocess.call(command, shell=True)
class FileSystemHelper:
"""Helper for file system operations"""
@staticmethod
def create_directory(path, exit_on_error=False):
"""Create directory with error handling"""
try:
os.mkdir(path)
return True
except OSError as e:
if exit_on_error:
raise
logging.InstallLog.writeToFile(f"[ERROR] Could not create directory {path}: {str(e)}")
return False
@staticmethod
def create_user_and_group(user, group, uid, gid, shell="/bin/false", home="/bin/null", distro=None):
"""Create system user and group"""
# Create group
command = f'groupadd -g {gid} {group}'
CommandExecutor.execute(command, distro, exit_on_error=True)
# Create user
command = f'useradd -u {uid} -s {shell} -d {home} -c "{user} user" -g {group} {user}'
CommandExecutor.execute(command, distro, exit_on_error=True)
class DistroHandler:
"""Handles distribution-specific operations"""
# Distribution constants
CENTOS = 0
UBUNTU = 1
CENT8 = 2
OPENEULER = 3
@staticmethod
def is_rhel_based(distro):
"""Check if distribution is RHEL-based"""
return distro in [DistroHandler.CENTOS, DistroHandler.CENT8, DistroHandler.OPENEULER]
@staticmethod
def is_debian_based(distro):
"""Check if distribution is Debian-based"""
return distro == DistroHandler.UBUNTU
@staticmethod
def get_package_manager(distro):
"""Get the package manager for the distribution"""
if distro == DistroHandler.UBUNTU:
return "apt-get"
elif distro == DistroHandler.CENTOS:
return "yum"
else: # CENT8, OPENEULER
return "dnf"
@staticmethod
def get_service_name(base_service, distro):
"""Get distribution-specific service name"""
# Add logic for service name mapping if needed
service_map = {
'mysql': {
DistroHandler.UBUNTU: 'mysql',
DistroHandler.CENTOS: 'mariadb',
DistroHandler.CENT8: 'mariadb',
DistroHandler.OPENEULER: 'mariadb'
}
}
if base_service in service_map and distro in service_map[base_service]:
return service_map[base_service][distro]
return base_service
class PermissionManager:
"""Handles file permissions and ownership"""
@staticmethod
def set_permissions(path, mode, distro, owner=None, group=None):
"""Set file permissions and optionally ownership"""
# Set permissions
CommandExecutor.execute(f'chmod {mode} {path}', distro)
# Set ownership if specified
if owner or group:
owner_str = owner if owner else ""
group_str = group if group else ""
if owner and group:
ownership = f"{owner}:{group}"
elif owner:
ownership = owner
else:
ownership = f":{group}"
CommandExecutor.execute(f'chown {ownership} {path}', distro)
@staticmethod
def batch_set_permissions(permission_list, distro):
"""Apply multiple permission changes efficiently"""
for perm in permission_list:
PermissionManager.set_permissions(
perm['path'],
perm['mode'],
distro,
perm.get('owner'),
perm.get('group')
)
class DownloadHelper:
"""Helper for downloading and extracting files"""
@staticmethod
def download_file(url, destination, distro):
"""Download a file using wget"""
command = f'wget -O {destination} {url}' if destination else f'wget {url}'
CommandExecutor.execute(command, distro)
@staticmethod
def extract_archive(archive_path, destination, distro, archive_type='tar'):
"""Extract an archive file"""
if archive_type == 'tar':
command = f'tar -xf {archive_path} -C {destination}'
elif archive_type == 'tar.gz':
command = f'tar -xzf {archive_path} -C {destination}'
elif archive_type == 'zip':
command = f'unzip {archive_path} -d {destination}'
else:
raise ValueError(f"Unsupported archive type: {archive_type}")
CommandExecutor.execute(command, distro)
@staticmethod
def download_and_extract(url, extract_dir, distro, archive_type='tar.gz'):
"""Download and extract in one operation"""
filename = os.path.basename(url)
DownloadHelper.download_file(url, filename, distro)
DownloadHelper.extract_archive(filename, extract_dir, distro, archive_type)
return filename
class SSLCertificateHelper:
"""Helper for SSL certificate operations"""
@staticmethod
def generate_self_signed_cert(cert_path, key_path, distro, days=3650,
subject="/C=US/ST=State/L=City/O=Org/CN=localhost"):
"""Generate a self-signed SSL certificate"""
command = (f'openssl req -x509 -nodes -days {days} -newkey rsa:2048 '
f'-subj "{subject}" -keyout {key_path} -out {cert_path}')
CommandExecutor.execute(command, distro)
@staticmethod
def generate_csr(key_path, csr_path, distro,
subject="/C=US/ST=State/L=City/O=Org/CN=localhost"):
"""Generate a certificate signing request"""
command = (f'openssl req -new -key {key_path} -out {csr_path} '
f'-subj "{subject}"')
CommandExecutor.execute(command, distro)