mirror of
https://github.com/DYefremov/DemonEditor.git
synced 2026-05-07 11:37:06 +02:00
added token security support
This commit is contained in:
@@ -24,8 +24,6 @@ _DATA_FILES_LIST = ("lamedb", "lamedb5", "services.xml", "blacklist", "whitelist
|
||||
_SAT_XML_FILE = "satellites.xml"
|
||||
_WEBTV_XML_FILE = "webtv.xml"
|
||||
|
||||
_HEADERS = {"User-Agent": "Mozilla/5.0 (X11; Linux i586; rv:31.0) Gecko/20100101 Firefox/71.0"}
|
||||
|
||||
|
||||
class DownloadType(Enum):
|
||||
ALL = 0
|
||||
@@ -40,10 +38,11 @@ class HttpRequestType(Enum):
|
||||
ZAP = "zap?sRef="
|
||||
INFO = "about"
|
||||
SIGNAL = "tunersignal"
|
||||
STREAM = "streamcurrentm3u"
|
||||
STREAM = "streamcurrent.m3u"
|
||||
CURRENT = "getcurrent"
|
||||
PLAY = "mediaplayerplay?file=4097:0:1:0:0:0:0:0:0:0:"
|
||||
TEST = None
|
||||
TOKEN = "session"
|
||||
|
||||
|
||||
class TestException(Exception):
|
||||
@@ -106,12 +105,13 @@ def upload_data(*, settings, download_type=DownloadType.ALL, remove_unused=False
|
||||
s_type = settings.setting_type
|
||||
data_path = settings.data_local_path
|
||||
host = settings.host
|
||||
base_url = "http{}://{}:{}/web/".format("s" if settings.http_use_ssl else "", host, settings.http_port)
|
||||
base_url = "http{}://{}:{}".format("s" if settings.http_use_ssl else "", host, settings.http_port)
|
||||
url = "{}/web/".format(base_url)
|
||||
tn, ht = None, None # telnet, http
|
||||
|
||||
try:
|
||||
if s_type is SettingsType.ENIGMA_2 and use_http:
|
||||
ht = http(settings.http_user, settings.http_password, base_url, callback)
|
||||
ht = http(settings.http_user, settings.http_password, base_url, callback, settings.http_use_ssl)
|
||||
next(ht)
|
||||
message = ""
|
||||
if download_type is DownloadType.BOUQUETS:
|
||||
@@ -122,12 +122,11 @@ def upload_data(*, settings, download_type=DownloadType.ALL, remove_unused=False
|
||||
message = "Satellites.xml file will be updated!"
|
||||
|
||||
params = urlencode({"text": message, "type": 2, "timeout": 5})
|
||||
url = base_url + "message?{}".format(params)
|
||||
ht.send(url)
|
||||
ht.send((url + "message?{}".format(params), "Sending info message... "))
|
||||
|
||||
if download_type is DownloadType.ALL:
|
||||
time.sleep(5)
|
||||
ht.send(base_url + "powerstate?newstate=0")
|
||||
ht.send((url + "powerstate?newstate=0", "Toggle Standby "))
|
||||
time.sleep(2)
|
||||
else:
|
||||
# telnet
|
||||
@@ -172,10 +171,10 @@ def upload_data(*, settings, download_type=DownloadType.ALL, remove_unused=False
|
||||
tn.send("init 3" if s_type is SettingsType.ENIGMA_2 else "init 6")
|
||||
elif ht and use_http:
|
||||
if download_type is DownloadType.BOUQUETS:
|
||||
ht.send(base_url + "servicelistreload?mode=2")
|
||||
ht.send((url + "servicelistreload?mode=2", "Reloading Userbouquets."))
|
||||
elif download_type is DownloadType.ALL:
|
||||
ht.send(base_url + "servicelistreload?mode=0")
|
||||
ht.send(base_url + "powerstate?newstate=4")
|
||||
ht.send((url + "servicelistreload?mode=0", "Reloading lamedb and Userbouquets."))
|
||||
ht.send((url + "powerstate?newstate=4", "Wakeup from Standby."))
|
||||
|
||||
if done_callback is not None:
|
||||
done_callback()
|
||||
@@ -247,13 +246,14 @@ def send_file(file_name, path, ftp, callback):
|
||||
callback("Uploading file: {}. Status: {}\n".format(file_name, str(ftp.storbinary("STOR " + file_name, f))))
|
||||
|
||||
|
||||
def http(user, password, url, callback):
|
||||
init_auth(user, password, url)
|
||||
def http(user, password, url, callback, use_ssl=False):
|
||||
init_auth(user, password, url, use_ssl)
|
||||
data = get_post_data(url, password, url)
|
||||
|
||||
while True:
|
||||
url = yield
|
||||
msg = get_response(HttpRequestType.TEST, url).get("e2statetext", None)
|
||||
if msg:
|
||||
callback("HTTP: {}\n".format(msg))
|
||||
url, message = yield
|
||||
resp = get_response(HttpRequestType.TEST, url, data).get("e2statetext", None)
|
||||
callback("HTTP: {} {}\n".format(message, "Successful." if resp and message else ""))
|
||||
|
||||
|
||||
def telnet(host, port=23, user="", password="", timeout=5):
|
||||
@@ -289,6 +289,8 @@ class HttpAPI:
|
||||
def __init__(self, settings):
|
||||
self._settings = settings
|
||||
self._base_url = None
|
||||
self._session_id = 0
|
||||
self._data = None
|
||||
self.init()
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor as PoolExecutor
|
||||
@@ -302,22 +304,27 @@ class HttpAPI:
|
||||
elif req_type is HttpRequestType.PLAY:
|
||||
url += urllib.parse.quote(ref).replace("%3A", "%253A")
|
||||
|
||||
future = self._executor.submit(get_response, req_type, url)
|
||||
future = self._executor.submit(get_response, req_type, url, self._data)
|
||||
future.add_done_callback(lambda f: callback(f.result()))
|
||||
|
||||
def init(self):
|
||||
user, password = self._settings.http_user, self._settings.http_password
|
||||
use_ssl = self._settings.http_use_ssl
|
||||
url = "http{}://{}:{}".format("s" if use_ssl else "", self._settings.host, self._settings.http_port)
|
||||
self._base_url = "{}/web/".format(url)
|
||||
init_auth(self._settings.http_user, self._settings.http_password, url, use_ssl)
|
||||
init_auth(user, password, url, use_ssl)
|
||||
url = "{}/web/{}".format(url, HttpRequestType.TOKEN.value)
|
||||
s_id = get_session_id(user, password, url)
|
||||
if s_id != "0":
|
||||
self._data = urllib.parse.urlencode({"user": user, "password": password, "sessionid": s_id}).encode("utf-8")
|
||||
|
||||
def close(self):
|
||||
self._executor.shutdown(False)
|
||||
|
||||
|
||||
def get_response(req_type, url):
|
||||
def get_response(req_type, url, data=None):
|
||||
try:
|
||||
with urlopen(Request(url, headers=_HEADERS), timeout=10) as f:
|
||||
with urlopen(Request(url, data=data), timeout=10) as f:
|
||||
if req_type is HttpRequestType.STREAM:
|
||||
return f.read().decode("utf-8")
|
||||
elif req_type is HttpRequestType.CURRENT:
|
||||
@@ -334,6 +341,36 @@ def get_response(req_type, url):
|
||||
return {}
|
||||
|
||||
|
||||
def init_auth(user, password, url, use_ssl=False):
|
||||
""" Init authentication """
|
||||
pass_mgr = HTTPPasswordMgrWithDefaultRealm()
|
||||
pass_mgr.add_password(None, url, user, password)
|
||||
auth_handler = HTTPBasicAuthHandler(pass_mgr)
|
||||
|
||||
if use_ssl:
|
||||
import ssl
|
||||
from urllib.request import HTTPSHandler
|
||||
|
||||
opener = build_opener(auth_handler, HTTPSHandler(context=ssl._create_unverified_context()))
|
||||
else:
|
||||
opener = build_opener(auth_handler)
|
||||
|
||||
install_opener(opener)
|
||||
|
||||
|
||||
def get_session_id(user, password, url):
|
||||
data = urllib.parse.urlencode(dict(user=user, password=password)).encode("utf-8")
|
||||
return get_response(HttpRequestType.TOKEN, url, data=data).get("e2sessionid", "0")
|
||||
|
||||
|
||||
def get_post_data(base_url, password, user):
|
||||
s_id = get_session_id(user, password, "{}/web/{}".format(base_url, HttpRequestType.TOKEN.value))
|
||||
data = None
|
||||
if s_id != "0":
|
||||
data = urllib.parse.urlencode({"user": user, "password": password, "sessionid": s_id}).encode("utf-8")
|
||||
return data
|
||||
|
||||
|
||||
# ***************** Connections testing *******************#
|
||||
|
||||
def test_ftp(host, port, user, password, timeout=5):
|
||||
@@ -350,30 +387,14 @@ def test_http(host, port, user, password, timeout=5, use_ssl=False, skip_message
|
||||
base_url = "http{}://{}:{}".format("s" if use_ssl else "", host, port)
|
||||
# authentication
|
||||
init_auth(user, password, base_url, use_ssl)
|
||||
data = get_post_data(base_url, password, user)
|
||||
|
||||
try:
|
||||
return get_response(HttpRequestType.TEST, "{}/web/{}".format(base_url, params)).get("e2statetext", "")
|
||||
return get_response(HttpRequestType.TEST, "{}/web/{}".format(base_url, params), data).get("e2statetext", "")
|
||||
except (RemoteDisconnected, URLError, HTTPError) as e:
|
||||
raise TestException(e)
|
||||
|
||||
|
||||
def init_auth(user, password, url, use_ssl=False):
|
||||
""" Init authentication """
|
||||
pass_mgr = HTTPPasswordMgrWithDefaultRealm()
|
||||
pass_mgr.add_password(None, url, user, password)
|
||||
auth_handler = HTTPBasicAuthHandler(pass_mgr)
|
||||
|
||||
if use_ssl:
|
||||
import ssl
|
||||
from urllib.request import HTTPSHandler
|
||||
# https://stackoverflow.com/a/28052583
|
||||
context = ssl._create_unverified_context()
|
||||
opener = build_opener(auth_handler, HTTPSHandler(context=context))
|
||||
else:
|
||||
opener = build_opener(auth_handler)
|
||||
|
||||
install_opener(opener)
|
||||
|
||||
|
||||
def test_telnet(host, port, user, password, timeout=5):
|
||||
try:
|
||||
gen = telnet_test(host, port, user, password, timeout)
|
||||
|
||||
Reference in New Issue
Block a user