add online download iamge pli, atv, obh, vix

This commit is contained in:
gutosie
2025-12-09 16:23:44 +02:00
committed by GitHub
parent 5982fd29b7
commit 07fac9a188

View File

@@ -1,24 +1,51 @@
try:
from boxbranding import getBoxType, getImageType, getImageDistro, getImageVersion, getImageBuild, getImageDevBuild, getImageFolder, getImageFileSystem, getBrandOEM, getMachineBrand, getMachineName, getMachineBuild, getMachineMake, getMachineMtdRoot, getMachineRootFile, getMachineMtdKernel, getMachineKernelFile, getMachineMKUBIFS, getMachineUBINIZE
except :
pass
from os import path, stat, system, mkdir, makedirs, listdir, remove, rename, rmdir, sep as ossep, statvfs, chmod, walk, symlink, unlink
from shutil import copy, copyfile, move, rmtree
from time import localtime, time, strftime, mktime
from . import _, PluginLanguageDomain
from Components.Button import Button
from Components.Harddisk import harddiskmanager, getProcMounts
from Components.MenuList import MenuList
import Components.Task
from Screens.Standby import TryQuitMainloop
from Screens.Setup import Setup
from Screens.TaskView import JobView
from Screens.TextBox import TextBox
try:
import Tools.CopyFiles
except :
pass
from Screens.ChoiceBox import ChoiceBox
from Screens.Screen import Screen
from Screens.MessageBox import MessageBox
from Screens.Standby import getReasons
try:
from Screens.Standby import getReasons
except :
pass
from Components.Sources.StaticText import StaticText
from Components.ChoiceList import ChoiceList, ChoiceEntryComponent
from Components.config import config, configfile
from Components.config import config, ConfigSubsection, ConfigYesNo, ConfigSelection, ConfigText, ConfigNumber, NoSave, ConfigClock, configfile
from Components.ActionMap import ActionMap
from Components.Console import Console
from Components.Label import Label
from Components.Pixmap import Pixmap
from Components.ProgressBar import ProgressBar
from Components.SystemInfo import BoxInfo
from Components.SystemInfo import BoxInfo, SystemInfo
from Tools.BoundFunction import boundFunction
from Tools.Directories import resolveFilename, SCOPE_PLUGINS, fileExists, pathExists, fileHas
from Tools.Downloader import downloadWithProgress
from Tools.HardwareInfo import HardwareInfo
from Tools.Multiboot import getImagelist, getCurrentImage, getCurrentImageMode, deleteImage, restoreImages
try:
from Tools.Multiboot import getImagelist, getCurrentImage, getCurrentImageMode, deleteImage, restoreImages
except :
pass
import os
import re
from urllib.request import urlopen, Request
from urllib.parse import urlparse
import xml.etree.ElementTree
import json
import time
@@ -27,13 +54,477 @@ import shutil
import tempfile
import struct
from enigma import eEPGCache, eEnv
from enigma import eEPGCache, eEnv, eTimer, fbClass
try:
from Tools.HardwareInfo import HardwareInfo
except :
from Plugins.Extensions.NeoBoot.files.tools import HardwareInfo
try:
from Tools.Multiboot import GetImagelist
except :
pass
try:
from Tools.Notifications import AddPopupWithCallback
except :
pass
def getMountChoices():
choices = []
for p in harddiskmanager.getMountedPartitions():
if path.exists(p.mountpoint):
d = path.normpath(p.mountpoint)
entry = (p.mountpoint, d)
if p.mountpoint != "/" and entry not in choices:
choices.append(entry)
choices.sort()
return choices
def getMountDefault(choices):
if fileExists("/media/usb/ImagesUpload/"):
choices = {x[1]: x[0] for x in choices}
default = choices.get("/media/usb") or choices.get("/media/hdd")
return default
else:
choices = {x[1]: x[0] for x in choices}
default = choices.get("/media/hdd") or choices.get("/media/usb")
return default
def __onPartitionChange(*args, **kwargs):
global choices
choices = getMountChoices()
config.imagemanager.backuplocation.setChoices(choices=choices, default=getMountDefault(choices))
try:
defaultprefix = getImageDistro()
config.imagemanager = ConfigSubsection()
config.imagemanager.autosettingsbackup = ConfigYesNo(default=True)
choices = getMountChoices()
config.imagemanager.backuplocation = ConfigSelection(choices=choices, default=getMountDefault(choices))
config.imagemanager.extensive_location_search = ConfigYesNo(default=True)
harddiskmanager.on_partition_list_change.append(__onPartitionChange) # to update backuplocation choices on mountpoint change
config.imagemanager.backupretry = ConfigNumber(default=30)
config.imagemanager.backupretrycount = NoSave(ConfigNumber(default=0))
config.imagemanager.folderprefix = ConfigText(default=defaultprefix, fixed_size=False)
config.imagemanager.nextscheduletime = NoSave(ConfigNumber(default=0))
config.imagemanager.repeattype = ConfigSelection(default="daily", choices=[("daily", _("Daily")), ("weekly", _("Weekly")), ("monthly", _("30 Days"))])
config.imagemanager.schedule = ConfigYesNo(default=False)
config.imagemanager.scheduletime = ConfigClock(default=0) # 1:00
config.imagemanager.query = ConfigYesNo(default=True)
config.imagemanager.lastbackup = ConfigNumber(default=0)
config.imagemanager.number_to_keep = ConfigNumber(default=0)
# Add a method for users to download images directly from their own build servers.
# Script must be able to handle urls in the form http://domain/scriptname/boxname.
# Format of the JSON output from the script must be the same as the official urls above.
# The option will only show once a url has been added.
config.imagemanager.imagefeed_MyBuild = ConfigText(default="", fixed_size=False)
config.imagemanager.login_as_ViX_developer = ConfigYesNo(default=False)
config.imagemanager.developer_username = ConfigText(default="username", fixed_size=False)
config.imagemanager.developer_password = ConfigText(default="password", fixed_size=False)
except :
pass
DISTRO = 0
URL = 1
ACTION = 2
FEED_URLS = [
("OpenViX", "https://www.openvix.co.uk/json/%s", "getMachineMake"),
("OpenATV", "https://images.mynonpublic.com/openatv/json/%s", "getMachineMake"),
("OpenBH", "https://images.openbh.net/json/%s", "getMachineMake"),
("OpenPLi", "http://downloads.openpli.org/json/%s", "HardwareInfo"),
]
class tmp:
dir = None
BackupTime = 0
def checkimagefiles(files):
return len([x for x in files if 'kernel' in x and '.bin' in x or x in ('uImage', 'rootfs.bin', 'root_cfe_auto.bin', 'root_cfe_auto.jffs2', 'oe_rootfs.bin', 'e2jffs2.img', 'rootfs.tar.bz2', 'rootfs.ubi')]) == 2
############################other image###############################################
class ImageManager(Screen):
skin = """<screen name="ImageManager" position="center,center" size="663,195">
<widget name="key_green" position="215,2" zPosition="1" size="195,50" font="Regular;15" halign="center" valign="center" backgroundColor="#a08500" transparent="1" />
<widget name="lab1" position="0,65" size="663,130" font="Regular; 20" zPosition="2" transparent="0" halign="center" />
</screen>"""
def __init__(self, session):
Screen.__init__(self, session)
self.setTitle(_("NeoBoot image download"))
self["lab1"] = Label()
self["key_green"] = Button(_("Downloads"))
self["infoactions"] = ActionMap(["SetupActions"], {
"info": self.showInfo,
}, -1)
self["defaultactions"] = ActionMap(["OkCancelActions"], {
"cancel": self.close,
}, -1)
self["mainactions"] = ActionMap(["ColorActions", "OkCancelActions", "DirectionActions", "KeyboardInputActions"], {
"green": self.doDownload,
"ok": self.doDownload,
"up": self.refreshUp,
"down": self.refreshDown,
"left": self.keyLeft,
"right": self.keyRight,
"upRepeated": self.refreshUp,
"downRepeated": self.refreshDown,
"leftRepeated": self.keyLeft,
"rightRepeated": self.keyRight,
}, -1)
self["mainactions"].setEnabled(False)
self.BackupRunning = False
self.mountAvailable = False
self.BackupDirectory = " "
if SystemInfo["canMultiBoot"]:
self.mtdboot = SystemInfo["MBbootdevice"]
self.onChangedEntry = []
if choices:
self["list"] = MenuList(list=[((_("No images found on the selected download server...if password check validity")), "Waiter")])
else:
self["list"] = MenuList(list=[((_(" Press 'Menu' to select a storage device - none available")), "Waiter")])
self["key_red"].hide()
self["key_green"].hide()
self["key_yellow"].hide()
self["key_blue"].hide()
self.populate_List()
self.activityTimer = eTimer()
self.activityTimer.startLongTimer(10)
self.Console = Console()
self.ConsoleB = Console(binary=True)
if BackupTime > 0:
t = localtime(BackupTime)
backuptext = _("Next backup: ") + strftime(_("%a %e %b %-H:%M"), t)
else:
backuptext = _("Next backup: ")
if self.selectionChanged not in self["list"].onSelectionChanged:
self["list"].onSelectionChanged.append(self.selectionChanged)
def selectionChanged(self):
# Where is this used? self.onChangedEntry does not appear to be populated anywhere. Maybe dead code.
item = self["list"].getCurrent() # (name, link)
if item:
name = item[1]
else:
name = ""
for cb in self.onChangedEntry:
cb(name, desc)
def refreshUp(self):
self["list"].moveUp()
def refreshDown(self):
self["list"].moveDown()
def keyLeft(self):
self["list"].pageUp()
self.selectionChanged()
def keyRight(self):
self["list"].pageDown()
self.selectionChanged()
def refreshList(self):
if self.BackupDirectory == " ":
return
imglist = []
imagesDownloadedList = self.getImagesDownloaded()
for image in imagesDownloadedList:
imglist.append((image["name"], image["link"]))
if imglist:
self["key_green"].show()
else:
self["key_green"].hide()
self["list"].setList(imglist)
self["list"].show()
self.selectionChanged()
def getJobName(self, job):
return "%s: %s (%d%%)" % (job.getStatustext(), job.name, int(100 * job.progress / float(job.end)))
def showJobView(self, job):
Components.Task.job_manager.in_background = False
self.session.openWithCallback(self.JobViewCB, JobView, job, cancelable=False, backgroundable=True, afterEventChangeable=False, afterEvent="close")
def JobViewCB(self, in_background):
Components.Task.job_manager.in_background = in_background
def populate_List(self):
if config.imagemanager.backuplocation.value.endswith("/"):
mount = config.imagemanager.backuplocation.value, config.imagemanager.backuplocation.value[:-1]
else:
mount = config.imagemanager.backuplocation.value + "/", config.imagemanager.backuplocation.value
if fileExists("/media/usb/ImagesUpload/"):
hdd = '/media/usb/', '/media/usb/'
else:
hdd = '/media/hdd/', '/media/hdd/'
if mount not in config.imagemanager.backuplocation.choices.choices and hdd not in config.imagemanager.backuplocation.choices.choices:
self["mainactions"].setEnabled(False)
self.mountAvailable = False
self["key_green"].hide()
self["lab1"].setText(_("Device: None available") + "\n" + _("Press 'Menu' to select a storage device"))
else:
self.BackupDirectory = config.imagemanager.backuplocation.value + "ImagesUpload/"
s = statvfs(config.imagemanager.backuplocation.value)
free = (s.f_bsize * s.f_bavail) // (1024 * 1024)
self["lab1"].setText(_("Device: ") + config.imagemanager.backuplocation.value + " " + _("Free space:") + " " + str(free) + _("MB") + "\n" + _("Press the OK or green button to download iamge."))
try:
if not path.exists(self.BackupDirectory):
mkdir(self.BackupDirectory, 0o755)
self.refreshList()
except Exception:
self["lab1"].setText(_("Device: ") + config.imagemanager.backuplocation.value + "\n" + _("Press the green button to download image.."))
self["mainactions"].setEnabled(True)
self.mountAvailable = True
def doDownload(self):
choices = [(x[DISTRO], x) for x in FEED_URLS]
if config.imagemanager.imagefeed_MyBuild.value.startswith("http"):
choices.insert(0, ("My build", ("My build", config.imagemanager.imagefeed_MyBuild.value, "getMachineMake")))
message = _("From which image library do you want to download?")
self.session.openWithCallback(self.doDownloadCallback, MessageBox, message, list=choices, default=1, simple=True)
def doDownloadCallback(self, retval): # retval will be the config element (or False, in the case of aborting the MessageBox).
if retval:
self.session.openWithCallback(self.refreshList, ImageManagerDownload, self.BackupDirectory, retval)
def getImagesDownloaded(self):
def getImages(files):
for file in files:
imagesFound.append({'link': file, 'name': file.split(ossep)[-1], 'mtime': stat(file).st_mtime})
def checkMachineNameInFilename(filename):
return model in filename or "-" + device_name + "-" in filename
model = getMachineMake()
device_name = HardwareInfo().get_device_name()
imagesFound = []
if config.imagemanager.extensive_location_search.value:
mediaList = ['/media/%s' % x for x in listdir('/media')] + (['/media/net/%s' % x for x in listdir('/media/net')] if path.isdir('/media/net') else []) + (['/media/autofs/%s' % x for x in listdir('/media/autofs')] if path.isdir('/media/autofs') else [])
else:
mediaList = [config.imagemanager.backuplocation.value]
imagesFound.sort(key=lambda x: x['mtime'], reverse=True)
# print("[ImageManager][getImagesDownloaded] imagesFound=%s" % imagesFound)
return imagesFound
def showInfo(self):
self.session.open(TextBox, self.infoText(), self.title + " - " + _("info"))
class ImageManagerDownload(Screen):
skin = """
<screen name="ImageManager" position="center,center" size="663,195">
<widget name="key_green" position="215,2" zPosition="1" size="195,50" font="Regular;15" halign="center" valign="center" backgroundColor="#a08500" transparent="1" />
<widget name="lab1" position="0,65" size="663,130" font="Regular; 20" zPosition="2" transparent="0" halign="center" />
</screen>"""
def __init__(self, session, BackupDirectory, imagefeed):
Screen.__init__(self, session)
self.setTitle(_("%s downloads") % imagefeed[DISTRO])
self.imagefeed = imagefeed
self.BackupDirectory = BackupDirectory
self["lab1"] = Label(_("Select an image to download for %s:") % getMachineMake())
self["key_red"] = Button(_("Close"))
self["key_green"] = Button(_("Download"))
self["ImageDown"] = ActionMap(["OkCancelActions", "ColorActions", "DirectionActions", "KeyboardInputActions", "MenuActions"], {
"cancel": self.close,
"red": self.close,
"green": self.keyDownload,
"ok": self.keyDownload,
"up": self.keyUp,
"down": self.keyDown,
"left": self.keyLeft,
"right": self.keyRight,
"upRepeated": self.keyUp,
"downRepeated": self.keyDown,
"leftRepeated": self.keyLeft,
"rightRepeated": self.keyRight,
"menu": self.close,
}, -1)
self.imagesList = {}
self.setIndex = 0
self.expanded = []
self["list"] = ChoiceList(list=[ChoiceEntryComponent("", ((_("No images found on the selected download server...if password check validity")), "Waiter"))])
self.getImageDistro()
def showError(self):
self.session.open(MessageBox, self.msg, MessageBox.TYPE_ERROR)
self.close()
def getImageDistro(self):
if not path.exists(self.BackupDirectory):
try:
mkdir(self.BackupDirectory, 0o755)
except Exception as err:
self.msg = _("Error creating backup folder:\n%s: %s") % (type(err).__name__, err)
print("[ImageManagerDownload][getImageDistro] " + self.msg)
self.pausetimer = eTimer()
self.pausetimer.callback.append(self.showError)
self.pausetimer.start(50, True)
return
boxtype = getMachineMake()
if self.imagefeed[ACTION] == "HardwareInfo":
boxtype = HardwareInfo().get_device_name()
print("[ImageManager1] boxtype:%s" % (boxtype))
if "dm800" in boxtype:
boxtype = getMachineMake()
if not self.imagesList:
# Legacy: self.imagefeed[URL] didn't contain "%s" where to insert the boxname.
# So just tag the boxname onto the end of the url like it is a subfolder.
# Obviously the url needs to exist.
if "%s" not in self.imagefeed[URL] and "?" not in self.imagefeed[URL]:
url = path.join(self.imagefeed[URL], boxtype)
else: # New style: self.imagefeed[URL] contains "%s" and boxname is inserted there.
url = self.imagefeed[URL] % boxtype
# special case for openvix developer downloads using user/pass
if self.imagefeed[DISTRO].lower() == "openvix" \
and self.imagefeed[URL].startswith("https") \
and config.imagemanager.login_as_ViX_developer.value \
and config.imagemanager.developer_username.value \
and config.imagemanager.developer_username.value != config.imagemanager.developer_username.default \
and config.imagemanager.developer_password.value \
and config.imagemanager.developer_password.value != config.imagemanager.developer_password.default:
url = path.join(url, config.imagemanager.developer_username.value, config.imagemanager.developer_password.value)
try:
self.imagesList = dict(json.load(urlopen(url)))
except Exception:
print("[ImageManager] no images available for: the '%s' at '%s'" % (boxtype, url))
return
if not self.imagesList: # Nothing has been found on that server so we might as well give up.
return
imglist = [] # this is reset on every "ok" key press of an expandable item so it reflects the current state of expandability of that item
for categorie in sorted(self.imagesList.keys(), reverse=True):
if categorie in self.expanded:
imglist.append(ChoiceEntryComponent("expanded", ((str(categorie)), "Expander")))
for image in sorted(self.imagesList[categorie].keys(), reverse=True):
imglist.append(ChoiceEntryComponent("verticalline", ((str(self.imagesList[categorie][image]["name"])), str(self.imagesList[categorie][image]["link"]))))
else:
# print("[ImageManager] [GetImageDistro] keys: %s" % list(self.imagesList[categorie].keys()))
for image in list(self.imagesList[categorie].keys()):
imglist.append(ChoiceEntryComponent("expandable", ((str(categorie)), "Expander")))
break
if imglist:
# print("[ImageManager] [GetImageDistro] imglist: %s" % imglist)
self["list"].setList(imglist)
if self.setIndex:
self["list"].moveToIndex(self.setIndex if self.setIndex < len(list) else len(list) - 1)
if self["list"].getCurrent()[0][1] == "Expander":
self.setIndex -= 1
if self.setIndex:
self["list"].moveToIndex(self.setIndex if self.setIndex < len(list) else len(list) - 1)
self.setIndex = 0
self.SelectionChanged()
def SelectionChanged(self):
currentSelected = self["list"].getCurrent()
if currentSelected[0][1] == "Waiter":
self["key_green"].setText("")
else:
if currentSelected[0][1] == "Expander":
self["key_green"].setText(_("Compress") if currentSelected[0][0] in self.expanded else _("Expand"))
else:
self["key_green"].setText(_("Download"))
def keyLeft(self):
self["list"].pageUp()
self.SelectionChanged()
def keyRight(self):
self["list"].pageDown()
self.SelectionChanged()
def keyUp(self):
self["list"].moveUp()
self.SelectionChanged()
def keyDown(self):
self["list"].moveDown()
self.SelectionChanged()
def keyDownload(self):
currentSelected = self["list"].getCurrent()
if currentSelected[0][1] == "Expander":
if currentSelected[0][0] in self.expanded:
self.expanded.remove(currentSelected[0][0])
else:
self.expanded.append(currentSelected[0][0])
self.getImageDistro()
elif currentSelected[0][1] != "Waiter":
self.sel = currentSelected[0][0]
if self.sel:
message = _("Are you sure you want to download this image:\n ") + self.sel
ybox = self.session.openWithCallback(self.doDownloadX, MessageBox, message, MessageBox.TYPE_YESNO)
ybox.setTitle(_("Download confirmation"))
else:
self.close()
def doDownloadX(self, answer):
if answer:
currentSelected = self["list"].getCurrent()
selectedimage = currentSelected[0][0]
headers, fileurl = self.processAuthLogin(currentSelected[0][1])
fileloc = self.BackupDirectory + selectedimage
Tools.CopyFiles.downloadFile(fileurl, fileloc, selectedimage.replace("_usb", ""), headers=headers)
for job in Components.Task.job_manager.getPendingJobs():
if job.name.startswith(_("Downloading")):
break
self.showJobView(job)
self.close()
def showJobView(self, job):
Components.Task.job_manager.in_background = False
self.session.openWithCallback(self.JobViewCB, JobView, job, cancelable=False, backgroundable=True, afterEventChangeable=False, afterEvent="close")
def JobViewCB(self, in_background):
Components.Task.job_manager.in_background = in_background
def processAuthLogin(self, url):
headers = None
parsed = urlparse(url)
scheme = parsed.scheme
username = parsed.username if parsed.username else ""
password = parsed.password if parsed.password else ""
hostname = parsed.hostname
port = ":%s" % parsed.port if parsed.port else ""
query = "?%s" % parsed.query if parsed.query else ""
if username or password:
import base64
base64bytes = base64.b64encode(('%s:%s' % (username, password)).encode())
headers = {("Authorization").encode(): ("Basic %s" % base64bytes.decode()).encode()}
return headers, scheme + "://" + hostname + port + parsed.path + query
########################____PLi image____#################################
class SelectImage(Screen):
def __init__(self, session, *args):
Screen.__init__(self, session)
@@ -376,4 +867,5 @@ class DownloadImageNeo(Screen):
self.session.openWithCallback(self.abort, MultibootSelection)
else:
return 0