From 9ce172836cab44016bb84db8d83029db68e502e6 Mon Sep 17 00:00:00 2001 From: gutosie Date: Mon, 8 Dec 2025 14:26:33 +0200 Subject: [PATCH] add download image --- NeoBoot/files/i_neo.py | 376 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 376 insertions(+) create mode 100644 NeoBoot/files/i_neo.py diff --git a/NeoBoot/files/i_neo.py b/NeoBoot/files/i_neo.py new file mode 100644 index 0000000..9939bd8 --- /dev/null +++ b/NeoBoot/files/i_neo.py @@ -0,0 +1,376 @@ +from Screens.ChoiceBox import ChoiceBox +from Screens.Screen import Screen +from Screens.MessageBox import MessageBox +from Screens.Standby import getReasons +from Components.Sources.StaticText import StaticText +from Components.ChoiceList import ChoiceList, ChoiceEntryComponent +from Components.config import config, configfile +from Components.ActionMap import ActionMap +from Components.Console import Console +from Components.Label import Label +from Components.Pixmap import Pixmap +from Components.ProgressBar import ProgressBar +from Components.SystemInfo import BoxInfo +from Tools.BoundFunction import boundFunction +from Tools.Directories import resolveFilename, SCOPE_PLUGINS, fileExists, pathExists, fileHas +from Tools.Downloader import downloadWithProgress +from Tools.HardwareInfo import HardwareInfo +from Tools.Multiboot import getImagelist, getCurrentImage, getCurrentImageMode, deleteImage, restoreImages +import os +import re +from urllib.request import urlopen, Request +import xml.etree.ElementTree +import json +import time +import zipfile +import shutil +import tempfile +import struct + +from enigma import eEPGCache, eEnv + + +def checkimagefiles(files): + return len([x for x in files if 'kernel' in x and '.bin' in x or x in ('uImage', 'rootfs.bin', 'root_cfe_auto.bin', 'root_cfe_auto.jffs2', 'oe_rootfs.bin', 'e2jffs2.img', 'rootfs.tar.bz2', 'rootfs.ubi')]) == 2 + + +class SelectImage(Screen): + def __init__(self, session, *args): + Screen.__init__(self, session) + self.imageBrandList = {} + self.jsonlist = {} + self.imagesList = {} + self.setIndex = 0 + self.expanded = [] + self.model = HardwareInfo().get_machine_name() + self.selectedImage = ["OpenPLi", {"url": "https://downloads.openpli.org/json/%s" % self.model, "model": self.model}] + self.models = [self.model] + self.setTitle(_("Select image")) + self["key_red"] = StaticText(_("Cancel")) + self["key_green"] = StaticText() + self["key_blue"] = StaticText() + self["description"] = Label() + self["list"] = ChoiceList(list=[ChoiceEntryComponent('', ((_("Retrieving image list - Please wait...")), "Waiter"))]) + + self["actions"] = ActionMap(["OkCancelActions", "ColorActions", "DirectionActions", "KeyboardInputActions", "MenuActions"], + { + "ok": self.keyOk, + "cancel": boundFunction(self.close, None), + "red": boundFunction(self.close, None), + "green": self.keyOk, + "blue": self.otherImages, + "up": self.keyUp, + "down": self.keyDown, + "left": self.keyLeft, + "right": self.keyRight, + "upRepeated": self.keyUp, + "downRepeated": self.keyDown, + "leftRepeated": self.keyLeft, + "rightRepeated": self.keyRight, + "menu": boundFunction(self.close, True), + }, -1) + + self.callLater(self.getImagesList) + + def getImagesList(self): + + def getImages(path, files): + for file in files: + try: + if checkimagefiles([x.split(os.sep)[-1] for x in zipfile.ZipFile(file).namelist()]): + imagetyp = _("Downloaded Images") + if 'backup' in file.split(os.sep)[-1]: + imagetyp = _("Fullbackup Images") + if imagetyp not in self.imagesList: + self.imagesList[imagetyp] = {} + self.imagesList[imagetyp][file] = {'link': file, 'name': file.split(os.sep)[-1]} + except: + pass + + def checkModels(file): + for model in self.models: + if '-%s-' % model or '-%_' % model in file: + return True + return False + + def conditional_sort(ls, f): + y = iter(reversed(sorted(w for w in ls if f(w)))) + return [w if not f(w) else next(y) for w in ls] + + if not self.imageBrandList: + url = "%s%s" % ("https://raw.githubusercontent.com/OpenPLi/FlashImage/main/", self.model) + try: + self.imageBrandList = json.load(urlopen(url, timeout=3)) + except: + print("[FlashImage] getImageBrandList Error: Unable to load json data from URL '%s'!" % url) + if self.imageBrandList: + self.imageBrandList.update({self.selectedImage[0]: self.selectedImage[1]}) + self.models = set([self.imageBrandList[image]['model'] for image in self.imageBrandList.keys()]) + if len(self.imageBrandList) > 1: + self["key_blue"].setText(_("Other Images")) + if not self.imagesList: + if not self.jsonlist: + try: + self.jsonlist = dict(json.load(urlopen(self.selectedImage[1]["url"], timeout=3))) + except: + print("[FlashImage] getImagesList Error: Unable to load json data from URL '%s'!" % self.selectedImage[1]["url"]) + alternative_imagefeed = config.usage.alternative_imagefeed.value + if alternative_imagefeed: + if "http" in alternative_imagefeed: + url = "%s%s" % (config.usage.alternative_imagefeed.value, self.model) + try: + self.jsonlist.update(dict(json.load(urlopen(url, timeout=3)))) + except: + print("[FlashImage] getImagesList Error: Unable to load json data from alternative URL '%s'!" % url) + + self.imagesList = dict(self.jsonlist) + + for media in ['/media/%s' % x for x in os.listdir('/media')] + (['/media/net/%s' % x for x in os.listdir('/media/net')] if os.path.isdir('/media/net') else []): + try: + getImages(media, [os.path.join(media, x) for x in os.listdir(media) if os.path.splitext(x)[1] == ".zip" and checkModels(x)]) + for folder in ["ImagesUpload"]: + if folder in os.listdir(media): + subfolder = os.path.join(media, folder) + if os.path.isdir(subfolder) and not os.path.islink(subfolder) and not os.path.ismount(subfolder): + getImages(subfolder, [os.path.join(subfolder, x) for x in os.listdir(subfolder) if os.path.splitext(x)[1] == ".zip" and checkModels(x)]) + for dir in [dir for dir in [os.path.join(subfolder, dir) for dir in os.listdir(subfolder)] if os.path.isdir(dir) and os.path.splitext(dir)[1] == ".unzipped"]: + shutil.rmtree(dir) + except: + pass + + list = [] + for catagorie in conditional_sort(self.imagesList.keys(), lambda w: _("Downloaded Images") not in w and _("Fullbackup Images") not in w): + if catagorie in self.expanded: + list.append(ChoiceEntryComponent('expanded', ((str(catagorie)), "Expander"))) + for image in reversed(sorted(self.imagesList[catagorie].keys())): + list.append(ChoiceEntryComponent('verticalline', ((str(self.imagesList[catagorie][image]['name'])), str(self.imagesList[catagorie][image]['link'])))) + else: + for image in self.imagesList[catagorie].keys(): + list.append(ChoiceEntryComponent('expandable', ((str(catagorie)), "Expander"))) + break + if list: + self["list"].setList(list) + if self.setIndex: + self["list"].moveToIndex(self.setIndex if self.setIndex < len(list) else len(list) - 1) + if self["list"].l.getCurrentSelection()[0][1] == "Expander": + self.setIndex -= 1 + if self.setIndex: + self["list"].moveToIndex(self.setIndex if self.setIndex < len(list) else len(list) - 1) + self.setIndex = 0 + self.selectionChanged() + else: + self["list"].setList([ChoiceEntryComponent('', ((_("Cannot find images - please try later or select an alternate image")), "Waiter"))]) + + def keyOk(self): + currentSelected = self["list"].l.getCurrentSelection() + if currentSelected[0][1] == "Expander": + if currentSelected[0][0] in self.expanded: + self.expanded.remove(currentSelected[0][0]) + else: + self.expanded.append(currentSelected[0][0]) + self.getImagesList() + elif currentSelected[0][1] != "Waiter": + self.session.openWithCallback(self.reloadImagesList, DownloadImageNeo, currentSelected[0][0], currentSelected[0][1]) + + def reloadImagesList(self): + self["list"].setList([ChoiceEntryComponent('', ((_("Retrieving image list - Please wait...")), "Waiter"))]) + self["list"].moveToIndex(0) + self.selectionChanged() + self.imagesList = {} + self.callLater(self.getImagesList) + + + def otherImages(self): + if len(self.imageBrandList) > 1: + self.session.openWithCallback(self.otherImagesCallback, ChoiceBox, list=[(key, self.imageBrandList[key]) for key in self.imageBrandList.keys()], windowTitle=_("Select an image brand")) + + def otherImagesCallback(self, image): + if image: + self.selectedImage = image + self.jsonlist = {} + self.expanded = [] + self.reloadImagesList() + + def selectionChanged(self): + currentSelected = self["list"].l.getCurrentSelection() + if currentSelected[0][1] == "Waiter": + self["key_green"].setText("") + else: + if currentSelected[0][1] == "Expander": + self["key_green"].setText(_("Collapse") if currentSelected[0][0] in self.expanded else _("Expand")) + self["description"].setText("") + else: + self["key_green"].setText(_("Download Image")) + self["description"].setText(currentSelected[0][1]) + + def keyLeft(self): + self["list"].instance.moveSelection(self["list"].instance.pageUp) + self.selectionChanged() + + def keyRight(self): + self["list"].instance.moveSelection(self["list"].instance.pageDown) + self.selectionChanged() + + def keyUp(self): + self["list"].instance.moveSelection(self["list"].instance.moveUp) + self.selectionChanged() + + def keyDown(self): + self["list"].instance.moveSelection(self["list"].instance.moveDown) + self.selectionChanged() + + + +class DownloadImageNeo(Screen): + skin = """ + + + + """ + + def __init__(self, session, imagename, source): + Screen.__init__(self, session) + self.containerbackup = None + self.containerofgwrite = None + self.getImageList = None + self.downloader = None + self.source = source + self.imagename = imagename + self.reasons = getReasons(session) + + self["header"] = Label(_("Backup settings")) + self["info"] = Label(_("Save settings and EPG data")) + self["progress"] = ProgressBar() + self["progress"].setRange((0, 100)) + self["progress"].setValue(0) + + self["actions"] = ActionMap(["OkCancelActions", "ColorActions"], + { + "cancel": self.abort, + "red": self.abort, + "ok": self.ok, + "green": self.ok, + }, -1) + + self.callLater(self.confirmation) + + def confirmation(self): + if self.reasons: + self.message = _("%s\nDo you still want to download image\n%s?") % (self.reasons, self.imagename) + else: + self.message = _("Do you want to download image\n%s") % self.imagename + if BoxInfo.getItem("canMultiBoot"): + pass + + else: + choices = [(_("Download Image neoboot"))] + choices.append((_("No, do not flash image"), False)) + self.session.openWithCallback(self.checkMedia, MessageBox, self.message, list=choices, default=False, simple=True) + + def checkMedia(self, retval): + if retval: + if BoxInfo.getItem("canMultiBoot"): + self.multibootslot = retval[0] + self.onlyDownload = retval[1] == "only download" + else: + self.onlyDownload = retval == "only download" + + def findmedia(path): + def avail(path): + if not path.startswith('/mmc') and os.path.isdir(path) and os.access(path, os.W_OK): + try: + statvfs = os.statvfs(path) + return (statvfs.f_bavail * statvfs.f_frsize) // (1 << 20) + except OSError as err: + print("[FlashImage] checkMedia Error %d: Unable to get status for '%s'! (%s)" % (err.errno, path, err.strerror)) + return 0 + + def checkIfDevice(path, diskstats): + st_dev = os.stat(path).st_dev + return (os.major(st_dev), os.minor(st_dev)) in diskstats + + diskstats = [(int(x[0]), int(x[1])) for x in [x.split()[0:3] for x in open('/proc/diskstats').readlines()] if x[2].startswith("sd")] + if os.path.isdir(path) and checkIfDevice(path, diskstats) and avail(path) > 500: + return (path, True) + mounts = [] + devices = [] + for path in ['/media/%s' % x for x in os.listdir('/media')] + (['/media/net/%s' % x for x in os.listdir('/media/net')] if os.path.isdir('/media/net') else []): + try: + if checkIfDevice(path, diskstats): + devices.append((path, avail(path))) + else: + mounts.append((path, avail(path))) + except OSError: + pass + devices.sort(key=lambda x: x[1], reverse=True) + mounts.sort(key=lambda x: x[1], reverse=True) + return ((devices[0][1] > 500 and (devices[0][0], True)) if devices else mounts and mounts[0][1] > 500 and (mounts[0][0], False)) or (None, None) + + if os.path.isdir('/media/usb/ImagesUpload'): + self.destination, isDevice = findmedia('/media/usb') + else: + self.destination, isDevice = findmedia('/media/hdd') + + + if self.destination: + destination = os.path.join(self.destination, 'ImagesUpload') + self.zippedimage = "://" in self.source and os.path.join(destination, self.imagename) or self.source + self.unzippedimage = os.path.join(destination, '%s.unzipped' % self.imagename[:-4]) + + try: + if os.path.isfile(destination): + os.remove(destination) + if not os.path.isdir(destination): + os.mkdir(destination) + self.startDownload() + except: + self.session.openWithCallback(self.abort, MessageBox, _("Unable to create the required directories on the media (e.g. USB stick or Harddisk) - Please verify media and try again!"), type=MessageBox.TYPE_ERROR, simple=True) + else: + self.session.openWithCallback(self.abort, MessageBox, _("Could not find suitable media - Please remove some downloaded images or insert a media (e.g. USB stick) with sufficiant free space and try again!"), type=MessageBox.TYPE_ERROR, simple=True) + else: + self.abort() + + def startDownload(self, reply=True): + self.show() + if reply: + if "://" in self.source: + from Tools.Downloader import downloadWithProgress + self["header"].setText(_("Downloading Image")) + self["info"].setText(self.imagename) + self.downloader = downloadWithProgress(self.source, self.zippedimage) + self.downloader.addProgress(self.downloadProgress) + self.downloader.addEnd(self.downloadEnd) + self.downloader.addError(self.downloadError) + self.downloader.start() + else: + pass + else: + self.abort() + + def downloadProgress(self, current, total): + self["progress"].setValue(int(100 * current / total)) + + def downloadError(self, reason, status): + self.downloader.stop() + self.session.openWithCallback(self.abort, MessageBox, _("Error during downloading image\n%s\n%s") % (self.imagename, reason), type=MessageBox.TYPE_ERROR, simple=True) + + def downloadEnd(self): + self.downloader.stop() + self.abort() + + def abort(self, reply=None): + if self.getImageList or self.containerofgwrite: + return 0 + if self.downloader: + self.downloader.stop() + if self.containerbackup: + self.containerbackup.killAll() + self.close() + + + def ok(self): + if self["header"].text == _("Downloading image successful"): + self.session.openWithCallback(self.abort, MultibootSelection) + else: + return 0 +