From 64de141807b526f7eef71291df0fb046addd934a Mon Sep 17 00:00:00 2001 From: DYefremov Date: Wed, 25 Aug 2021 14:05:27 +0300 Subject: [PATCH] minor refactoring for neutrino --- app/eparser/neutrino/__init__.py | 39 ++++++++++ app/eparser/neutrino/bouquets.py | 78 ++++++++++---------- app/eparser/neutrino/services.py | 120 ++++++++++++++----------------- app/ui/service_details_dialog.py | 12 ++-- 4 files changed, 140 insertions(+), 109 deletions(-) diff --git a/app/eparser/neutrino/__init__.py b/app/eparser/neutrino/__init__.py index e69de29b..0bd74160 100644 --- a/app/eparser/neutrino/__init__.py +++ b/app/eparser/neutrino/__init__.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +# +# The MIT License (MIT) +# +# Copyright (c) 2018-2021 Dmitriy Yefremov +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# Author: Dmitriy Yefremov +# + +SP = "_:::_" +KSP = "_::_" +API_VER = "4" + + +def get_attributes(data): + return {el[0]: el[1] for el in (e.split(KSP) for e in data.split(SP))} + + +def get_xml_attributes(attr): + attrs = attr.attributes + return {t: attrs[t].value for t in attrs.keys()} diff --git a/app/eparser/neutrino/bouquets.py b/app/eparser/neutrino/bouquets.py index 589d8017..1f746ec3 100644 --- a/app/eparser/neutrino/bouquets.py +++ b/app/eparser/neutrino/bouquets.py @@ -30,6 +30,7 @@ import os from xml.dom.minidom import parse, Document from app.eparser.iptv import NEUTRINO_FAV_ID_FORMAT +from app.eparser.neutrino import KSP, SP, get_xml_attributes, get_attributes, API_VER from app.ui.uicommons import LOCKED_ICON, HIDE_ICON from ..ecommons import Bouquets, Bouquet, BouquetService, BqServiceType, PROVIDER, BqType @@ -55,25 +56,25 @@ def parse_bouquets(file, name, bq_type): for elem in dom.getElementsByTagName("Bouquet"): if elem.hasAttributes(): - bq_name = elem.attributes["name"].value - hidden = elem.attributes.get("hidden") - hidden = hidden.value if hidden else hidden - locked = elem.attributes.get("locked") - locked = locked.value if locked else locked - # epg = elem.attributes["epg"].value + bq_attrs = get_xml_attributes(elem) + bq_name = bq_attrs.get("name", "") + hidden = bq_attrs.get("hidden", "0") + locked = bq_attrs.get("locked", "0") services = [] for srv_elem in elem.getElementsByTagName("S"): if srv_elem.hasAttributes(): - ssid = srv_elem.attributes["i"].value - on = srv_elem.attributes["on"].value - tr_id = srv_elem.attributes["t"].value + s_attrs = get_xml_attributes(srv_elem) + ssid = s_attrs.get("i", "0") + on = s_attrs.get("on", "0") + tr_id = s_attrs.get("t", "0") fav_id = "{}:{}:{}".format(tr_id, on, ssid) services.append(BouquetService(None, BqServiceType.DEFAULT, fav_id, 0)) bouquets[2].append(Bouquet(name=bq_name, type=bq_type, services=services, locked=LOCKED_ICON if locked == "1" else None, - hidden=HIDE_ICON if hidden == "1" else None)) + hidden=HIDE_ICON if hidden == "1" else None, + file=SP.join("{}{}{}".format(k, KSP, v) for k, v in bq_attrs.items()))) if BqType(bq_type) is BqType.BOUQUET: for bq in bouquets.bouquets: @@ -95,31 +96,23 @@ def parse_webtv(path, name, bq_type): services = [] for elem in dom.getElementsByTagName("webtv"): if elem.hasAttributes(): - title = elem.attributes["title"].value - url = elem.attributes["url"].value - description = elem.attributes.get("description") - description = description.value if description else description - urlkey = elem.attributes.get("urlkey", None) - urlkey = urlkey.value if urlkey else urlkey - account = elem.attributes.get("account", None) - account = account.value if account else account - usrname = elem.attributes.get("usrname", None) - usrname = usrname.value if usrname else usrname - psw = elem.attributes.get("psw", None) - psw = psw.value if psw else psw - s_type = elem.attributes.get("type", None) - s_type = s_type.value if s_type else s_type - iconsrc = elem.attributes.get("iconsrc", None) - iconsrc = iconsrc.value if iconsrc else iconsrc - iconsrc_b = elem.attributes.get("iconsrc_b", None) - iconsrc_b = iconsrc_b.value if iconsrc_b else iconsrc_b - group = elem.attributes.get("group", None) - group = group.value if group else group + web_attrs = get_xml_attributes(elem) + title = web_attrs.get("title", "") + url = web_attrs.get("url", "") + description = web_attrs.get("description", "") + urlkey = web_attrs.get("urlkey", None) + account = web_attrs.get("account", None) + usrname = web_attrs.get("usrname", None) + psw = web_attrs.get("psw", None) + s_type = web_attrs.get("type", None) + iconsrc = web_attrs.get("iconsrc", None) + iconsrc_b = web_attrs.get("iconsrc_b", None) + group = web_attrs.get("group", None) fav_id = NEUTRINO_FAV_ID_FORMAT.format(url, description, urlkey, account, usrname, psw, s_type, iconsrc, iconsrc_b, group) services.append(BouquetService(name=title, type=BqServiceType.IPTV, data=fav_id, num=0)) - bouquet = Bouquet(name="default", type=bq_type, services=services, locked=None, hidden=None) + bouquet = Bouquet(name="default", type=bq_type, services=services, locked=None, hidden=None, file=None) bouquets[2].append(bouquet) return bouquets @@ -137,29 +130,38 @@ def write_bouquets(path, bouquets): def write_bouquet(file, bouquet): doc = Document() root = doc.createElement("zapit") + root.setAttribute("api", API_VER) doc.appendChild(root) comment = doc.createComment(_COMMENT) doc.appendChild(comment) for bq in bouquet.bouquets: + attrs = get_attributes(bq.file) if bq.file else {} + attrs["name"] = bq.name + if bq.hidden: + attrs["hidden"] = "1" + else: + attrs.pop("hidden", None) + if bq.locked: + attrs["locked"] = "1" + else: + attrs.pop("locked", None) + bq_elem = doc.createElement("Bouquet") - bq_elem.setAttribute("name", bq.name) - bq_elem.setAttribute("hidden", "1" if bq.hidden else "0") - bq_elem.setAttribute("locked", "1" if bq.locked else "0") - bq_elem.setAttribute("epg", "0") + for k, v in attrs.items(): + bq_elem.setAttribute(k, v) + root.appendChild(bq_elem) for srv in bq.services: - f_data = srv.flags_cas.split(":") tr_id, on, ssid = srv.fav_id.split(":") srv_elem = doc.createElement("S") srv_elem.setAttribute("i", ssid) srv_elem.setAttribute("n", srv.service) srv_elem.setAttribute("t", tr_id) srv_elem.setAttribute("on", on) - srv_elem.setAttribute("s", f_data[1]) srv_elem.setAttribute("frq", srv.freq) - srv_elem.setAttribute("l", "0") # temporary !!! + srv_elem.setAttribute("s", get_attributes(srv.flags_cas).get("position", "0")) bq_elem.appendChild(srv_elem) doc.writexml(open(file, "w"), addindent=" ", newl="\n", encoding="UTF-8") diff --git a/app/eparser/neutrino/services.py b/app/eparser/neutrino/services.py index be8a460b..43cc9d06 100644 --- a/app/eparser/neutrino/services.py +++ b/app/eparser/neutrino/services.py @@ -26,17 +26,16 @@ # -import re from collections import defaultdict -from xml.dom.minidom import Document, parseString +from xml.dom.minidom import Document, parse +from xml.parsers.expat import ExpatError from app.commons import log from app.eparser.ecommons import (Service, POLARIZATION, FEC, SYSTEM, SERVICE_TYPE, PROVIDER, T_SYSTEM, TrType, SystemCable) +from app.eparser.neutrino import get_xml_attributes, SP, KSP, get_attributes, API_VER _FILE = "services.xml" -SP = "_:::_" -KSP = "_::_" def write_services(path, services): @@ -53,7 +52,7 @@ class NeutrinoServiceWriter: self._path = path self._services = services - self._api = "4" + self._api = API_VER self._doc = Document() self._root = self._doc.createElement("zapit") self._root.setAttribute("api", self._api) @@ -81,7 +80,7 @@ class NeutrinoServiceWriter: for sat in sats: sat_elem = self._doc.createElement(s_type) - attrs = self.get_attributes(sat) + attrs = get_attributes(sat) for k, v in attrs.items(): sat_elem.setAttribute(k, v) @@ -93,13 +92,13 @@ class NeutrinoServiceWriter: for tr in transponders: tr_elem = self._doc.createElement("TS") - for k, v in self.get_attributes(tr).items(): + for k, v in get_attributes(tr).items(): tr_elem.setAttribute(k, v) sat_elem.appendChild(tr_elem) for srv in transponders.get(tr): srv_elem = self._doc.createElement("S") - s_attrs = self.get_attributes(srv.data_id) + s_attrs = get_attributes(srv.data_id) api = s_attrs.pop("api", self._api) if api != self._api: self._root.setAttribute("api", api) @@ -109,10 +108,6 @@ class NeutrinoServiceWriter: tr_elem.appendChild(srv_elem) - @staticmethod - def get_attributes(data): - return {el[0]: el[1] for el in (e.split(KSP) for e in data.split(SP))} - class NeutrinoServicesReader: @@ -124,56 +119,60 @@ class NeutrinoServicesReader: self._services = [] def get_services(self): - with open(self._path + _FILE, "rb") as f: - # Pre-processing is required to replace the '&' character. - dom = parseString(re.sub("&", "&", f.read().decode(encoding="utf-8", errors="ignore"))) + try: + dom = parse(self._path + _FILE) + except ExpatError as e: + # Some neutrino configuration files may contain text data with invalid characters ['&', etc]. + # https://www.w3.org/TR/xml/#syntax + # Apparently there is an error in Neutrino itself and the document is not initially formed correctly. + # TODO: Come up with a way to handle this case. + msg = "The file [{}] is not formatted correctly or contains invalid characters! Cause: {}" - for root in dom.getElementsByTagName("zapit"): - if root.hasAttributes(): - api = root.attributes["api"] - self._api = api.value if api else self._api + raise ValueError(msg.format(self._path + _FILE, e)) - for elem in root.getElementsByTagName("sat"): - if elem.hasAttributes(): - sat_attrs = self.get_attributes(elem) - sat_attrs["name"] = re.sub("&", "&", sat_attrs.get("name", "")) - sat_pos = 0 - try: - sat_pos = int(sat_attrs.get("position", "0")) - sat_pos = "{:0.1f}{}".format(abs(sat_pos / 10), "W" if sat_pos < 0 else "E") - except ValueError as e: - log("Neutrino parsing error [parse sat position]: {}".format(e)) - sat = SP.join("{}{}{}".format(k, KSP, v) for k, v in sat_attrs.items()) - for tr_elem in elem.getElementsByTagName("TS"): - if tr_elem.hasAttributes(): - self.parse_sat_transponder(sat, sat_pos, tr_elem) + for root in dom.getElementsByTagName("zapit"): + if root.hasAttributes(): + api = root.attributes["api"] + self._api = api.value if api else self._api - # Terrestrial DVB-T[2]. - for elem in root.getElementsByTagName("terrestrial"): - if elem.hasAttributes(): - terr_attrs = self.get_attributes(elem) - terr_attrs["name"] = re.sub("&", "&", terr_attrs.get("name", "")) - terr = SP.join("{}{}{}".format(k, KSP, v) for k, v in terr_attrs.items()) + for elem in root.getElementsByTagName("sat"): + if elem.hasAttributes(): + sat_attrs = get_xml_attributes(elem) + sat_pos = 0 + try: + sat_pos = int(sat_attrs.get("position", "0")) + sat_pos = "{:0.1f}{}".format(abs(sat_pos / 10), "W" if sat_pos < 0 else "E") + except ValueError as e: + log("Neutrino parsing error [parse sat position]: {}".format(e)) + sat = SP.join("{}{}{}".format(k, KSP, v) for k, v in sat_attrs.items()) + for tr_elem in elem.getElementsByTagName("TS"): + if tr_elem.hasAttributes(): + self.parse_sat_transponder(sat, sat_pos, tr_elem) - for tr_elem in elem.getElementsByTagName("TS"): - if tr_elem.hasAttributes(): - self.parse_ct_transponder(terr, tr_elem, TrType.Terrestrial) + # Terrestrial DVB-T[2]. + for elem in root.getElementsByTagName("terrestrial"): + if elem.hasAttributes(): + terr_attrs = get_xml_attributes(elem) + terr = SP.join("{}{}{}".format(k, KSP, v) for k, v in terr_attrs.items()) - # Cable. - for elem in root.getElementsByTagName("cable"): - if elem.hasAttributes(): - cable_attrs = self.get_attributes(elem) - terr_attrs["name"] = re.sub("&", "&", cable_attrs.get("name", "")) - cable = SP.join("{}{}{}".format(k, KSP, v) for k, v in cable_attrs.items()) + for tr_elem in elem.getElementsByTagName("TS"): + if tr_elem.hasAttributes(): + self.parse_ct_transponder(terr, tr_elem, TrType.Terrestrial) - for tr_elem in elem.getElementsByTagName("TS"): - if tr_elem.hasAttributes(): - self.parse_ct_transponder(cable, tr_elem, TrType.Cable) + # Cable. + for elem in root.getElementsByTagName("cable"): + if elem.hasAttributes(): + cable_attrs = get_xml_attributes(elem) + cable = SP.join("{}{}{}".format(k, KSP, v) for k, v in cable_attrs.items()) - return self._services + for tr_elem in elem.getElementsByTagName("TS"): + if tr_elem.hasAttributes(): + self.parse_ct_transponder(cable, tr_elem, TrType.Cable) + + return self._services def parse_sat_transponder(self, sat, sat_pos, tr_elem): - tr_attr = self.get_attributes(tr_elem) + tr_attr = get_xml_attributes(tr_elem) tr = SP.join("{}{}{}".format(k, KSP, v) for k, v in tr_attr.items()) tr_id = tr_attr.get("id", "0").lstrip("0") on = tr_attr.get("on", "0") @@ -191,11 +190,9 @@ class NeutrinoServicesReader: for srv_elem in tr_elem.getElementsByTagName("S"): if srv_elem.hasAttributes(): - at = self.get_attributes(srv_elem) + at = get_xml_attributes(srv_elem) at["api"] = self._api ssid, name, s_type, sys = at.get("i", "0"), at.get("n", ""), at.get("t", "3"), at.get("s", "0") - name = re.sub("amp;", "", name) - at["n"] = name data_id = SP.join("{}{}{}".format(k, KSP, v) for k, v in at.items()) fav_id = "{}:{}:{}".format(tr_id, on.lstrip("0"), ssid.lstrip("0")) picon_id = "{}{}{}.png".format(tr_id, on, ssid) @@ -207,17 +204,15 @@ class NeutrinoServicesReader: self._services.append(srv) def parse_ct_transponder(self, terr, tr_elem, tr_type): - attrs = self.get_attributes(tr_elem) + attrs = get_xml_attributes(tr_elem) tr = SP.join("{}{}{}".format(k, KSP, v) for k, v in attrs.items()) tr_id, on, freq = attrs.get("id", "0").lstrip("0"), attrs.get("on", "0"), attrs.get("frq", "0") for srv_elem in tr_elem.getElementsByTagName("S"): if srv_elem.hasAttributes(): - s_at = self.get_attributes(srv_elem) + s_at = get_xml_attributes(srv_elem) s_at["api"] = self._api ssid, name, s_type, sys = s_at.get("i", "0"), s_at.get("n", ""), s_at.get("t", "3"), s_at.get("s", "0") - name = re.sub("amp;", "", name) - s_at["n"] = name data_id = SP.join("{}{}{}".format(k, KSP, v) for k, v in s_at.items()) fav_id = "{}:{}:{}".format(tr_id, on.lstrip("0"), ssid.lstrip("0")) picon_id = "{}{}{}.png".format(tr_id, on, ssid) @@ -238,11 +233,6 @@ class NeutrinoServicesReader: freq, "0", None, None, sys, pos, data_id, fav_id, tr) self._services.append(srv) - @staticmethod - def get_attributes(attr): - attrs = attr.attributes - return {t: attrs[t].value for t in attrs.keys()} - if __name__ == "__main__": pass diff --git a/app/ui/service_details_dialog.py b/app/ui/service_details_dialog.py index da398147..f011237c 100644 --- a/app/ui/service_details_dialog.py +++ b/app/ui/service_details_dialog.py @@ -35,7 +35,7 @@ from app.eparser.ecommons import (MODULATION, Inversion, ROLL_OFF, Pilot, Flag, get_value_by_name, FEC_DEFAULT, PLS_MODE, SERVICE_TYPE, T_MODULATION, C_MODULATION, TrType, SystemCable, T_SYSTEM, BANDWIDTH, TRANSMISSION_MODE, GUARD_INTERVAL, T_FEC, HIERARCHY, A_MODULATION) -from app.eparser.neutrino.services import NeutrinoServiceWriter, SP, KSP +from app.eparser.neutrino import get_attributes, SP, KSP from app.settings import SettingsType from .dialogs import show_dialog, DialogType, Action, get_builder from .main_helper import get_base_model @@ -353,7 +353,7 @@ class ServiceDetailsDialog: def init_neutrino_data(self, srv): if self._tr_type is not TrType.Satellite: return - tr_data = NeutrinoServiceWriter.get_attributes(srv.transponder) + tr_data = get_attributes(srv.transponder) self._transponder_id_entry.set_text(str(int(tr_data.get("id", "0"), 16))) self._network_id_entry.set_text(str(int(tr_data.get("on", "0"), 16))) self.select_active_text(self._invertion_combo_box, Inversion(tr_data.get("inv", "2")).name) @@ -543,7 +543,7 @@ class ServiceDetailsDialog: if self._s_type is SettingsType.ENIGMA_2: return self.get_enigma2_flags() elif self._s_type is SettingsType.NEUTRINO_MP: - flags = NeutrinoServiceWriter.get_attributes(self._old_service.flags_cas) + flags = get_attributes(self._old_service.flags_cas) flags["position"] = self.get_sat_position() return SP.join("{}{}{}".format(k, KSP, v) for k, v in flags.items()) @@ -599,7 +599,7 @@ class ServiceDetailsDialog: fav_id = self._ENIGMA2_FAV_ID.format(ssid, tr_id, net_id, namespace) return fav_id, data_id elif self._s_type is SettingsType.NEUTRINO_MP: - data = NeutrinoServiceWriter.get_attributes(self._old_service.data_id) + data = get_attributes(self._old_service.data_id) data["n"] = self._name_entry.get_text() data["t"] = "{:x}".format(int(service_type)) data["i"] = "{:04x}".format(ssid) @@ -652,7 +652,7 @@ class ServiceDetailsDialog: return "{}:{}:{}:{}:{}{}".format(dvb_s_tr, flag, mod, roll_off, pilot, pls) elif self._s_type is SettingsType.NEUTRINO_MP: - tr_data = NeutrinoServiceWriter.get_attributes(self._old_service.transponder) + tr_data = get_attributes(self._old_service.transponder) tr_data["frq"] = freq tr_data["sr"] = rate tr_data["pol"] = pol @@ -721,7 +721,7 @@ class ServiceDetailsDialog: continue if self._s_type is SettingsType.NEUTRINO_MP: - flags = NeutrinoServiceWriter.get_attributes(srv[Column.SRV_CAS_FLAGS]) + flags = get_attributes(srv[Column.SRV_CAS_FLAGS]) flags["position"] = sat_pos srv[Column.SRV_CAS_FLAGS] = SP.join("{}{}{}".format(k, KSP, v) for k, v in flags.items())