minor refactoring for neutrino

This commit is contained in:
DYefremov
2021-08-25 14:05:27 +03:00
parent cab20f744f
commit 64de141807
4 changed files with 140 additions and 109 deletions

View File

@@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
# Copyright (c) 2018-2021 Dmitriy Yefremov
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# Author: Dmitriy Yefremov
#
SP = "_:::_"
KSP = "_::_"
API_VER = "4"
def get_attributes(data):
return {el[0]: el[1] for el in (e.split(KSP) for e in data.split(SP))}
def get_xml_attributes(attr):
attrs = attr.attributes
return {t: attrs[t].value for t in attrs.keys()}

View File

@@ -30,6 +30,7 @@ import os
from xml.dom.minidom import parse, Document
from app.eparser.iptv import NEUTRINO_FAV_ID_FORMAT
from app.eparser.neutrino import KSP, SP, get_xml_attributes, get_attributes, API_VER
from app.ui.uicommons import LOCKED_ICON, HIDE_ICON
from ..ecommons import Bouquets, Bouquet, BouquetService, BqServiceType, PROVIDER, BqType
@@ -55,25 +56,25 @@ def parse_bouquets(file, name, bq_type):
for elem in dom.getElementsByTagName("Bouquet"):
if elem.hasAttributes():
bq_name = elem.attributes["name"].value
hidden = elem.attributes.get("hidden")
hidden = hidden.value if hidden else hidden
locked = elem.attributes.get("locked")
locked = locked.value if locked else locked
# epg = elem.attributes["epg"].value
bq_attrs = get_xml_attributes(elem)
bq_name = bq_attrs.get("name", "")
hidden = bq_attrs.get("hidden", "0")
locked = bq_attrs.get("locked", "0")
services = []
for srv_elem in elem.getElementsByTagName("S"):
if srv_elem.hasAttributes():
ssid = srv_elem.attributes["i"].value
on = srv_elem.attributes["on"].value
tr_id = srv_elem.attributes["t"].value
s_attrs = get_xml_attributes(srv_elem)
ssid = s_attrs.get("i", "0")
on = s_attrs.get("on", "0")
tr_id = s_attrs.get("t", "0")
fav_id = "{}:{}:{}".format(tr_id, on, ssid)
services.append(BouquetService(None, BqServiceType.DEFAULT, fav_id, 0))
bouquets[2].append(Bouquet(name=bq_name,
type=bq_type,
services=services,
locked=LOCKED_ICON if locked == "1" else None,
hidden=HIDE_ICON if hidden == "1" else None))
hidden=HIDE_ICON if hidden == "1" else None,
file=SP.join("{}{}{}".format(k, KSP, v) for k, v in bq_attrs.items())))
if BqType(bq_type) is BqType.BOUQUET:
for bq in bouquets.bouquets:
@@ -95,31 +96,23 @@ def parse_webtv(path, name, bq_type):
services = []
for elem in dom.getElementsByTagName("webtv"):
if elem.hasAttributes():
title = elem.attributes["title"].value
url = elem.attributes["url"].value
description = elem.attributes.get("description")
description = description.value if description else description
urlkey = elem.attributes.get("urlkey", None)
urlkey = urlkey.value if urlkey else urlkey
account = elem.attributes.get("account", None)
account = account.value if account else account
usrname = elem.attributes.get("usrname", None)
usrname = usrname.value if usrname else usrname
psw = elem.attributes.get("psw", None)
psw = psw.value if psw else psw
s_type = elem.attributes.get("type", None)
s_type = s_type.value if s_type else s_type
iconsrc = elem.attributes.get("iconsrc", None)
iconsrc = iconsrc.value if iconsrc else iconsrc
iconsrc_b = elem.attributes.get("iconsrc_b", None)
iconsrc_b = iconsrc_b.value if iconsrc_b else iconsrc_b
group = elem.attributes.get("group", None)
group = group.value if group else group
web_attrs = get_xml_attributes(elem)
title = web_attrs.get("title", "")
url = web_attrs.get("url", "")
description = web_attrs.get("description", "")
urlkey = web_attrs.get("urlkey", None)
account = web_attrs.get("account", None)
usrname = web_attrs.get("usrname", None)
psw = web_attrs.get("psw", None)
s_type = web_attrs.get("type", None)
iconsrc = web_attrs.get("iconsrc", None)
iconsrc_b = web_attrs.get("iconsrc_b", None)
group = web_attrs.get("group", None)
fav_id = NEUTRINO_FAV_ID_FORMAT.format(url, description, urlkey, account, usrname, psw, s_type, iconsrc,
iconsrc_b, group)
services.append(BouquetService(name=title, type=BqServiceType.IPTV, data=fav_id, num=0))
bouquet = Bouquet(name="default", type=bq_type, services=services, locked=None, hidden=None)
bouquet = Bouquet(name="default", type=bq_type, services=services, locked=None, hidden=None, file=None)
bouquets[2].append(bouquet)
return bouquets
@@ -137,29 +130,38 @@ def write_bouquets(path, bouquets):
def write_bouquet(file, bouquet):
doc = Document()
root = doc.createElement("zapit")
root.setAttribute("api", API_VER)
doc.appendChild(root)
comment = doc.createComment(_COMMENT)
doc.appendChild(comment)
for bq in bouquet.bouquets:
attrs = get_attributes(bq.file) if bq.file else {}
attrs["name"] = bq.name
if bq.hidden:
attrs["hidden"] = "1"
else:
attrs.pop("hidden", None)
if bq.locked:
attrs["locked"] = "1"
else:
attrs.pop("locked", None)
bq_elem = doc.createElement("Bouquet")
bq_elem.setAttribute("name", bq.name)
bq_elem.setAttribute("hidden", "1" if bq.hidden else "0")
bq_elem.setAttribute("locked", "1" if bq.locked else "0")
bq_elem.setAttribute("epg", "0")
for k, v in attrs.items():
bq_elem.setAttribute(k, v)
root.appendChild(bq_elem)
for srv in bq.services:
f_data = srv.flags_cas.split(":")
tr_id, on, ssid = srv.fav_id.split(":")
srv_elem = doc.createElement("S")
srv_elem.setAttribute("i", ssid)
srv_elem.setAttribute("n", srv.service)
srv_elem.setAttribute("t", tr_id)
srv_elem.setAttribute("on", on)
srv_elem.setAttribute("s", f_data[1])
srv_elem.setAttribute("frq", srv.freq)
srv_elem.setAttribute("l", "0") # temporary !!!
srv_elem.setAttribute("s", get_attributes(srv.flags_cas).get("position", "0"))
bq_elem.appendChild(srv_elem)
doc.writexml(open(file, "w"), addindent=" ", newl="\n", encoding="UTF-8")

View File

@@ -26,17 +26,16 @@
#
import re
from collections import defaultdict
from xml.dom.minidom import Document, parseString
from xml.dom.minidom import Document, parse
from xml.parsers.expat import ExpatError
from app.commons import log
from app.eparser.ecommons import (Service, POLARIZATION, FEC, SYSTEM, SERVICE_TYPE, PROVIDER, T_SYSTEM, TrType,
SystemCable)
from app.eparser.neutrino import get_xml_attributes, SP, KSP, get_attributes, API_VER
_FILE = "services.xml"
SP = "_:::_"
KSP = "_::_"
def write_services(path, services):
@@ -53,7 +52,7 @@ class NeutrinoServiceWriter:
self._path = path
self._services = services
self._api = "4"
self._api = API_VER
self._doc = Document()
self._root = self._doc.createElement("zapit")
self._root.setAttribute("api", self._api)
@@ -81,7 +80,7 @@ class NeutrinoServiceWriter:
for sat in sats:
sat_elem = self._doc.createElement(s_type)
attrs = self.get_attributes(sat)
attrs = get_attributes(sat)
for k, v in attrs.items():
sat_elem.setAttribute(k, v)
@@ -93,13 +92,13 @@ class NeutrinoServiceWriter:
for tr in transponders:
tr_elem = self._doc.createElement("TS")
for k, v in self.get_attributes(tr).items():
for k, v in get_attributes(tr).items():
tr_elem.setAttribute(k, v)
sat_elem.appendChild(tr_elem)
for srv in transponders.get(tr):
srv_elem = self._doc.createElement("S")
s_attrs = self.get_attributes(srv.data_id)
s_attrs = get_attributes(srv.data_id)
api = s_attrs.pop("api", self._api)
if api != self._api:
self._root.setAttribute("api", api)
@@ -109,10 +108,6 @@ class NeutrinoServiceWriter:
tr_elem.appendChild(srv_elem)
@staticmethod
def get_attributes(data):
return {el[0]: el[1] for el in (e.split(KSP) for e in data.split(SP))}
class NeutrinoServicesReader:
@@ -124,56 +119,60 @@ class NeutrinoServicesReader:
self._services = []
def get_services(self):
with open(self._path + _FILE, "rb") as f:
# Pre-processing is required to replace the '&' character.
dom = parseString(re.sub("&", "&", f.read().decode(encoding="utf-8", errors="ignore")))
try:
dom = parse(self._path + _FILE)
except ExpatError as e:
# Some neutrino configuration files may contain text data with invalid characters ['&', etc].
# https://www.w3.org/TR/xml/#syntax
# Apparently there is an error in Neutrino itself and the document is not initially formed correctly.
# TODO: Come up with a way to handle this case.
msg = "The file [{}] is not formatted correctly or contains invalid characters! Cause: {}"
for root in dom.getElementsByTagName("zapit"):
if root.hasAttributes():
api = root.attributes["api"]
self._api = api.value if api else self._api
raise ValueError(msg.format(self._path + _FILE, e))
for elem in root.getElementsByTagName("sat"):
if elem.hasAttributes():
sat_attrs = self.get_attributes(elem)
sat_attrs["name"] = re.sub("&", "&", sat_attrs.get("name", ""))
sat_pos = 0
try:
sat_pos = int(sat_attrs.get("position", "0"))
sat_pos = "{:0.1f}{}".format(abs(sat_pos / 10), "W" if sat_pos < 0 else "E")
except ValueError as e:
log("Neutrino parsing error [parse sat position]: {}".format(e))
sat = SP.join("{}{}{}".format(k, KSP, v) for k, v in sat_attrs.items())
for tr_elem in elem.getElementsByTagName("TS"):
if tr_elem.hasAttributes():
self.parse_sat_transponder(sat, sat_pos, tr_elem)
for root in dom.getElementsByTagName("zapit"):
if root.hasAttributes():
api = root.attributes["api"]
self._api = api.value if api else self._api
# Terrestrial DVB-T[2].
for elem in root.getElementsByTagName("terrestrial"):
if elem.hasAttributes():
terr_attrs = self.get_attributes(elem)
terr_attrs["name"] = re.sub("&amp;", "&", terr_attrs.get("name", ""))
terr = SP.join("{}{}{}".format(k, KSP, v) for k, v in terr_attrs.items())
for elem in root.getElementsByTagName("sat"):
if elem.hasAttributes():
sat_attrs = get_xml_attributes(elem)
sat_pos = 0
try:
sat_pos = int(sat_attrs.get("position", "0"))
sat_pos = "{:0.1f}{}".format(abs(sat_pos / 10), "W" if sat_pos < 0 else "E")
except ValueError as e:
log("Neutrino parsing error [parse sat position]: {}".format(e))
sat = SP.join("{}{}{}".format(k, KSP, v) for k, v in sat_attrs.items())
for tr_elem in elem.getElementsByTagName("TS"):
if tr_elem.hasAttributes():
self.parse_sat_transponder(sat, sat_pos, tr_elem)
for tr_elem in elem.getElementsByTagName("TS"):
if tr_elem.hasAttributes():
self.parse_ct_transponder(terr, tr_elem, TrType.Terrestrial)
# Terrestrial DVB-T[2].
for elem in root.getElementsByTagName("terrestrial"):
if elem.hasAttributes():
terr_attrs = get_xml_attributes(elem)
terr = SP.join("{}{}{}".format(k, KSP, v) for k, v in terr_attrs.items())
# Cable.
for elem in root.getElementsByTagName("cable"):
if elem.hasAttributes():
cable_attrs = self.get_attributes(elem)
terr_attrs["name"] = re.sub("&amp;", "&", cable_attrs.get("name", ""))
cable = SP.join("{}{}{}".format(k, KSP, v) for k, v in cable_attrs.items())
for tr_elem in elem.getElementsByTagName("TS"):
if tr_elem.hasAttributes():
self.parse_ct_transponder(terr, tr_elem, TrType.Terrestrial)
for tr_elem in elem.getElementsByTagName("TS"):
if tr_elem.hasAttributes():
self.parse_ct_transponder(cable, tr_elem, TrType.Cable)
# Cable.
for elem in root.getElementsByTagName("cable"):
if elem.hasAttributes():
cable_attrs = get_xml_attributes(elem)
cable = SP.join("{}{}{}".format(k, KSP, v) for k, v in cable_attrs.items())
return self._services
for tr_elem in elem.getElementsByTagName("TS"):
if tr_elem.hasAttributes():
self.parse_ct_transponder(cable, tr_elem, TrType.Cable)
return self._services
def parse_sat_transponder(self, sat, sat_pos, tr_elem):
tr_attr = self.get_attributes(tr_elem)
tr_attr = get_xml_attributes(tr_elem)
tr = SP.join("{}{}{}".format(k, KSP, v) for k, v in tr_attr.items())
tr_id = tr_attr.get("id", "0").lstrip("0")
on = tr_attr.get("on", "0")
@@ -191,11 +190,9 @@ class NeutrinoServicesReader:
for srv_elem in tr_elem.getElementsByTagName("S"):
if srv_elem.hasAttributes():
at = self.get_attributes(srv_elem)
at = get_xml_attributes(srv_elem)
at["api"] = self._api
ssid, name, s_type, sys = at.get("i", "0"), at.get("n", ""), at.get("t", "3"), at.get("s", "0")
name = re.sub("amp;", "", name)
at["n"] = name
data_id = SP.join("{}{}{}".format(k, KSP, v) for k, v in at.items())
fav_id = "{}:{}:{}".format(tr_id, on.lstrip("0"), ssid.lstrip("0"))
picon_id = "{}{}{}.png".format(tr_id, on, ssid)
@@ -207,17 +204,15 @@ class NeutrinoServicesReader:
self._services.append(srv)
def parse_ct_transponder(self, terr, tr_elem, tr_type):
attrs = self.get_attributes(tr_elem)
attrs = get_xml_attributes(tr_elem)
tr = SP.join("{}{}{}".format(k, KSP, v) for k, v in attrs.items())
tr_id, on, freq = attrs.get("id", "0").lstrip("0"), attrs.get("on", "0"), attrs.get("frq", "0")
for srv_elem in tr_elem.getElementsByTagName("S"):
if srv_elem.hasAttributes():
s_at = self.get_attributes(srv_elem)
s_at = get_xml_attributes(srv_elem)
s_at["api"] = self._api
ssid, name, s_type, sys = s_at.get("i", "0"), s_at.get("n", ""), s_at.get("t", "3"), s_at.get("s", "0")
name = re.sub("amp;", "", name)
s_at["n"] = name
data_id = SP.join("{}{}{}".format(k, KSP, v) for k, v in s_at.items())
fav_id = "{}:{}:{}".format(tr_id, on.lstrip("0"), ssid.lstrip("0"))
picon_id = "{}{}{}.png".format(tr_id, on, ssid)
@@ -238,11 +233,6 @@ class NeutrinoServicesReader:
freq, "0", None, None, sys, pos, data_id, fav_id, tr)
self._services.append(srv)
@staticmethod
def get_attributes(attr):
attrs = attr.attributes
return {t: attrs[t].value for t in attrs.keys()}
if __name__ == "__main__":
pass

View File

@@ -35,7 +35,7 @@ from app.eparser.ecommons import (MODULATION, Inversion, ROLL_OFF, Pilot, Flag,
get_value_by_name, FEC_DEFAULT, PLS_MODE, SERVICE_TYPE, T_MODULATION, C_MODULATION,
TrType, SystemCable, T_SYSTEM, BANDWIDTH, TRANSMISSION_MODE, GUARD_INTERVAL, T_FEC,
HIERARCHY, A_MODULATION)
from app.eparser.neutrino.services import NeutrinoServiceWriter, SP, KSP
from app.eparser.neutrino import get_attributes, SP, KSP
from app.settings import SettingsType
from .dialogs import show_dialog, DialogType, Action, get_builder
from .main_helper import get_base_model
@@ -353,7 +353,7 @@ class ServiceDetailsDialog:
def init_neutrino_data(self, srv):
if self._tr_type is not TrType.Satellite:
return
tr_data = NeutrinoServiceWriter.get_attributes(srv.transponder)
tr_data = get_attributes(srv.transponder)
self._transponder_id_entry.set_text(str(int(tr_data.get("id", "0"), 16)))
self._network_id_entry.set_text(str(int(tr_data.get("on", "0"), 16)))
self.select_active_text(self._invertion_combo_box, Inversion(tr_data.get("inv", "2")).name)
@@ -543,7 +543,7 @@ class ServiceDetailsDialog:
if self._s_type is SettingsType.ENIGMA_2:
return self.get_enigma2_flags()
elif self._s_type is SettingsType.NEUTRINO_MP:
flags = NeutrinoServiceWriter.get_attributes(self._old_service.flags_cas)
flags = get_attributes(self._old_service.flags_cas)
flags["position"] = self.get_sat_position()
return SP.join("{}{}{}".format(k, KSP, v) for k, v in flags.items())
@@ -599,7 +599,7 @@ class ServiceDetailsDialog:
fav_id = self._ENIGMA2_FAV_ID.format(ssid, tr_id, net_id, namespace)
return fav_id, data_id
elif self._s_type is SettingsType.NEUTRINO_MP:
data = NeutrinoServiceWriter.get_attributes(self._old_service.data_id)
data = get_attributes(self._old_service.data_id)
data["n"] = self._name_entry.get_text()
data["t"] = "{:x}".format(int(service_type))
data["i"] = "{:04x}".format(ssid)
@@ -652,7 +652,7 @@ class ServiceDetailsDialog:
return "{}:{}:{}:{}:{}{}".format(dvb_s_tr, flag, mod, roll_off, pilot, pls)
elif self._s_type is SettingsType.NEUTRINO_MP:
tr_data = NeutrinoServiceWriter.get_attributes(self._old_service.transponder)
tr_data = get_attributes(self._old_service.transponder)
tr_data["frq"] = freq
tr_data["sr"] = rate
tr_data["pol"] = pol
@@ -721,7 +721,7 @@ class ServiceDetailsDialog:
continue
if self._s_type is SettingsType.NEUTRINO_MP:
flags = NeutrinoServiceWriter.get_attributes(srv[Column.SRV_CAS_FLAGS])
flags = get_attributes(srv[Column.SRV_CAS_FLAGS])
flags["position"] = sat_pos
srv[Column.SRV_CAS_FLAGS] = SP.join("{}{}{}".format(k, KSP, v) for k, v in flags.items())