Files
CyberPanel/plogical/pluginMigrationSQL.py
master3395 db24409d0d Plugin UI: Premium filter, URL hash sync; installer DB SQL fallback; loginSystem migrations.
- plugins.html: Premium show filter, #grid?show=&sort=&cat=&q= hash restore, cache 28.03.2026-v3.
- pluginInstaller + plogical/pluginMigrationSQL: migrate fallback via sqlmigrate/mariadb and DROP cleanup on remove.
- loginSystem: initial migration (SeparateDatabaseAndState) for graph compatibility.
- README: Updated 28.03.2026.
2026-03-28 01:21:37 +01:00

210 lines
7.0 KiB
Python

# -*- coding: utf-8 -*-
"""
Plugin install/remove: robust DB migration and teardown for CyberPanel.
- Prefer Django ``migrate`` / ``migrate zero``.
- If the global loader fails or migrate errors, apply ``sqlmigrate`` output and
``--fake``, or drop plugin tables and clean ``django_migrations`` on removal.
Assumes ``manage.py`` / ``CyberCP.settings`` are available under /usr/local/CyberCP.
"""
from __future__ import annotations
import os
import subprocess
def _manage_py() -> str:
return '/usr/local/CyberCP/manage.py'
def _python_executable() -> str:
for candidate in ('/usr/local/CyberCP/bin/python', '/usr/local/CyberCP/bin/python3'):
try:
if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
return candidate
except OSError:
continue
return 'python3'
def _django_setup():
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'CyberCP.settings')
import django
django.setup()
def _db_name() -> str:
_django_setup()
from django.conf import settings
return settings.DATABASES['default']['NAME']
def run_manage(args: list, cwd: str | None = None) -> subprocess.CompletedProcess:
"""Run manage.py with args (e.g. ['migrate', 'contaboAutoSnapshot', '--noinput'])."""
cmd = [_python_executable(), _manage_py()] + args
return subprocess.run(
cmd,
cwd=cwd or '/usr/local/CyberCP',
stdin=subprocess.DEVNULL,
capture_output=True,
text=True,
timeout=600,
)
def list_plugin_migration_names(plugin_name: str) -> list[str]:
"""Sorted migration names from disk (e.g. 0001_initial), excluding __init__."""
mig_dir = '/usr/local/CyberCP/' + plugin_name + '/migrations'
if not os.path.isdir(mig_dir):
return []
names = []
try:
for fn in os.listdir(mig_dir):
if not fn.endswith('.py') or fn == '__init__.py':
continue
names.append(fn[:-3])
except OSError:
return []
return sorted(names)
def applied_plugin_migrations(plugin_name: str) -> set[str]:
_django_setup()
from django.db import connection
from django.db.migrations.recorder import MigrationRecorder
recorder = MigrationRecorder(connection)
return {name for app, name in recorder.applied_migrations() if app == plugin_name}
def sqlmigrate_text(plugin_name: str, migration_name: str) -> tuple[int, str]:
"""Return (returncode, combined stdout+stderr) for sqlmigrate."""
proc = run_manage(['sqlmigrate', plugin_name, migration_name])
out = (proc.stdout or '') + (proc.stderr or '')
return proc.returncode, out
def _strip_sql_comments(sql: str) -> str:
lines = []
for line in sql.splitlines():
s = line.strip()
if s.startswith('--'):
continue
lines.append(line)
return '\n'.join(lines)
def _execute_multi_sql_mysql(sql: str) -> tuple[bool, str]:
"""Execute multi-statement ``sqlmigrate`` output via ``mariadb`` CLI (socket auth as root)."""
sql = _strip_sql_comments(sql).strip()
if not sql:
return True, ''
db = _db_name()
try:
proc = subprocess.run(
['mariadb', db],
input=sql.encode('utf-8', errors='replace'),
capture_output=True,
timeout=300,
)
err = (proc.stderr or b'').decode('utf-8', errors='replace')[:2000]
if proc.returncode != 0:
return False, err or 'mariadb exited %s' % proc.returncode
return True, ''
except FileNotFoundError:
return False, 'mariadb client not found; install MariaDB client or fix PATH'
except Exception as e:
return False, str(e)[:2000]
def apply_pending_migrations_via_sql_and_fake(plugin_name: str, log) -> bool:
"""
For each on-disk migration not in django_migrations: sqlmigrate + execute, then
``migrate <app> <name> --fake`` so each is recorded.
"""
pending = [m for m in list_plugin_migration_names(plugin_name) if m not in applied_plugin_migrations(plugin_name)]
if not pending:
log(
'SQL fallback: no pending migrations recorded for %s — schema/DB mismatch may remain; '
'check migrate errors above.'
% plugin_name
)
return False
for mig in pending:
rc, out = sqlmigrate_text(plugin_name, mig)
if rc != 0:
log('sqlmigrate %s %s failed (rc=%s): %s' % (plugin_name, mig, rc, out[:800]))
return False
ok, err = _execute_multi_sql_mysql(out)
if not ok:
log('SQL exec failed for %s %s: %s' % (plugin_name, mig, err))
return False
log('Applied raw SQL for %s %s' % (plugin_name, mig))
proc = run_manage(['migrate', plugin_name, mig, '--fake', '--noinput'])
if proc.returncode != 0:
log(
'migrate --fake %s %s failed: %s'
% (plugin_name, mig, (proc.stderr or proc.stdout or '')[:800])
)
return False
return True
def drop_plugin_tables_and_migration_rows(plugin_name: str, log) -> bool:
"""
DROP all tables for this app label (prefix ``pluginName_``) and remove
django_migrations rows. Uses FOREIGN_KEY_CHECKS=0.
"""
_django_setup()
from django.db import connection
db = _db_name()
prefix_a = plugin_name + '_'
prefix_b = plugin_name.lower() + '_'
tables = []
try:
with connection.cursor() as c:
c.execute(
'SELECT table_name FROM information_schema.tables WHERE table_schema = %s '
'AND (table_name LIKE %s OR table_name LIKE %s)',
[db, prefix_a + '%', prefix_b + '%'],
)
tables = [row[0] for row in c.fetchall()]
except Exception as e:
log('list tables for drop failed: %s' % str(e)[:800])
return False
if not tables:
log('No tables matched prefix for %s; cleaning django_migrations only.' % plugin_name)
try:
with connection.cursor() as c:
c.execute('SET FOREIGN_KEY_CHECKS=0')
for t in tables:
safe = t.replace('`', '``')
c.execute('DROP TABLE IF EXISTS `%s`' % safe)
c.execute('SET FOREIGN_KEY_CHECKS=1')
c.execute('DELETE FROM django_migrations WHERE app = %s', [plugin_name])
log('Dropped %s table(s) for %s and removed django_migrations rows.' % (len(tables), plugin_name))
return True
except Exception as e:
log('drop_plugin_tables failed: %s' % str(e)[:800])
return False
def ensure_login_system_migrations_applied(log) -> None:
"""
Apply loginSystem 0001 if present (graph fix). Safe no-op if already applied.
"""
mig_path = '/usr/local/CyberCP/loginSystem/migrations/0001_initial.py'
if not os.path.isfile(mig_path):
return
proc = run_manage(['migrate', 'loginSystem', '--noinput'])
if proc.returncode != 0:
log(
'migrate loginSystem exited %s (panels may need manual fix): %s'
% (proc.returncode, (proc.stderr or proc.stdout or '')[:600])
)