mirror of
https://github.com/DYefremov/DemonEditor.git
synced 2026-06-25 00:19:37 +02:00
moving the tests functions
This commit is contained in:
@@ -4,8 +4,10 @@ import time
|
||||
from enum import Enum
|
||||
from ftplib import FTP, error_perm
|
||||
from telnetlib import Telnet
|
||||
from urllib.error import HTTPError
|
||||
from urllib.error import HTTPError, URLError
|
||||
from urllib.parse import urlencode
|
||||
from urllib.request import Request, urlopen
|
||||
from xml.dom.minidom import parse
|
||||
|
||||
from app.commons import log
|
||||
from app.properties import Profile
|
||||
@@ -25,6 +27,10 @@ class DownloadType(Enum):
|
||||
WEBTV = 4
|
||||
|
||||
|
||||
class TestException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def download_data(*, properties, download_type=DownloadType.ALL, callback=None):
|
||||
with FTP(host=properties["host"], user=properties["user"], passwd=properties["password"]) as ftp:
|
||||
ftp.encoding = "utf-8"
|
||||
@@ -43,7 +49,7 @@ def download_data(*, properties, download_type=DownloadType.ALL, callback=None):
|
||||
name = name.split()[-1]
|
||||
download_file(ftp, name, save_path, callback)
|
||||
# satellites.xml and webtv section
|
||||
if download_type in (DownloadType.ALL, DownloadType.SATELLITES, DownloadType.WEB_TV):
|
||||
if download_type in (DownloadType.ALL, DownloadType.SATELLITES, DownloadType.WEBTV):
|
||||
ftp.cwd(properties["satellites_xml_path"])
|
||||
files.clear()
|
||||
ftp.dir(files.append)
|
||||
@@ -52,7 +58,7 @@ def download_data(*, properties, download_type=DownloadType.ALL, callback=None):
|
||||
name = str(file).strip()
|
||||
if download_type in (DownloadType.ALL, DownloadType.SATELLITES) and name.endswith(_SAT_XML_FILE):
|
||||
download_file(ftp, _SAT_XML_FILE, save_path, callback)
|
||||
if download_type in (DownloadType.ALL, DownloadType.WEB_TV) and name.endswith(_WEBTV_XML_FILE):
|
||||
if download_type in (DownloadType.ALL, DownloadType.WEBTV) and name.endswith(_WEBTV_XML_FILE):
|
||||
download_file(ftp, _WEBTV_XML_FILE, save_path, callback)
|
||||
|
||||
if callback is not None:
|
||||
@@ -97,10 +103,10 @@ def upload_data(*, properties, download_type=DownloadType.ALL, remove_unused=Fal
|
||||
callback()
|
||||
return send
|
||||
|
||||
if profile is Profile.NEUTRINO_MP and download_type in (DownloadType.ALL, DownloadType.WEB_TV):
|
||||
if profile is Profile.NEUTRINO_MP and download_type in (DownloadType.ALL, DownloadType.WEBTV):
|
||||
ftp.cwd(properties["satellites_xml_path"])
|
||||
send = send_file(_WEBTV_XML_FILE, data_path, ftp)
|
||||
if download_type is DownloadType.WEB_TV:
|
||||
if download_type is DownloadType.WEBTV:
|
||||
tn.send("init 6")
|
||||
if callback is not None:
|
||||
callback()
|
||||
@@ -196,7 +202,44 @@ def telnet(host, port=23, user="", password="", timeout=5):
|
||||
yield
|
||||
|
||||
|
||||
def test_telnet(host, port, user, password, timeout):
|
||||
# ***************** Connections testing *******************#
|
||||
|
||||
|
||||
def test_ftp(host, port, user, password, timeout=2):
|
||||
try:
|
||||
with FTP(host=host, user=user, passwd=password, timeout=timeout) as ftp:
|
||||
return ftp.getwelcome()
|
||||
except (error_perm, ConnectionRefusedError, OSError) as e:
|
||||
raise TestException(e)
|
||||
|
||||
|
||||
def test_http(host, port, user, password, timeout=5):
|
||||
try:
|
||||
params = urlencode({"text": "Connection test", "type": 2, "timeout": timeout})
|
||||
with urlopen("http://{}/web/message?%s".format(host) % params, timeout=5) as f:
|
||||
dom = parse(f)
|
||||
msg = ""
|
||||
for elem in dom.getElementsByTagName("e2simplexmlresult"):
|
||||
for ch in elem.childNodes:
|
||||
if ch.nodeType == ch.ELEMENT_NODE:
|
||||
msg = "".join(t.nodeValue for t in ch.childNodes if t.nodeType == t.TEXT_NODE)
|
||||
return msg
|
||||
except (URLError, HTTPError) as e:
|
||||
raise TestException(e)
|
||||
|
||||
|
||||
def test_telnet(host, port, user, password, timeout=2):
|
||||
try:
|
||||
gen = telnet_test(host, port, user, password, timeout)
|
||||
res = next(gen)
|
||||
print(res)
|
||||
res = next(gen)
|
||||
return res
|
||||
except (socket.timeout, OSError) as e:
|
||||
raise TestException(e)
|
||||
|
||||
|
||||
def telnet_test(host, port, user, password, timeout):
|
||||
tn = Telnet(host=host, port=port, timeout=timeout)
|
||||
time.sleep(1)
|
||||
tn.read_until(b"login: ", timeout=2)
|
||||
|
||||
@@ -1,13 +1,7 @@
|
||||
import socket
|
||||
from enum import Enum
|
||||
from ftplib import error_perm, FTP
|
||||
from urllib.error import URLError, HTTPError
|
||||
from urllib.parse import urlencode
|
||||
from urllib.request import urlopen
|
||||
from xml.dom.minidom import parse
|
||||
|
||||
from app.commons import run_task, run_idle
|
||||
from app.connections import test_telnet
|
||||
from app.connections import test_telnet, test_ftp, TestException, test_http
|
||||
from app.properties import write_config, Profile, get_default_settings
|
||||
from .uicommons import Gtk, UI_RESOURCES_PATH, TEXT_DOMAIN
|
||||
from .main_helper import update_entry_data
|
||||
@@ -171,16 +165,10 @@ class SettingsDialog:
|
||||
|
||||
def test_http(self):
|
||||
user, password = self._http_login_field.get_text(), self._http_password_field.get_text()
|
||||
host, port = self._host_field.get_text(), self._http_port_field.get_text()
|
||||
try:
|
||||
params = urlencode({"text": "Connection test", "type": 2, "timeout": 5})
|
||||
with urlopen("http://{}/web/message?%s".format(self._host_field.get_text()) % params, timeout=5) as f:
|
||||
dom = parse(f)
|
||||
for elem in dom.getElementsByTagName("e2simplexmlresult"):
|
||||
for ch in elem.childNodes:
|
||||
if ch.nodeType == ch.ELEMENT_NODE:
|
||||
msg = "".join(t.nodeValue for t in ch.childNodes if t.nodeType == t.TEXT_NODE)
|
||||
self.show_info_message(msg, Gtk.MessageType.INFO)
|
||||
except (URLError, HTTPError) as e:
|
||||
self.show_info_message(test_http(host, port, user, password), Gtk.MessageType.INFO)
|
||||
except TestException as e:
|
||||
self.show_info_message(str(e), Gtk.MessageType.ERROR)
|
||||
finally:
|
||||
self.show_spinner(False)
|
||||
@@ -190,13 +178,9 @@ class SettingsDialog:
|
||||
host, port = self._host_field.get_text(), self._telnet_port_field.get_text()
|
||||
user, password = self._telnet_login_field.get_text(), self._telnet_password_field.get_text()
|
||||
try:
|
||||
gen = test_telnet(host, port, user, password, timeout)
|
||||
res = next(gen)
|
||||
print(res)
|
||||
res = next(gen)
|
||||
self.show_info_message(str(res), Gtk.MessageType.INFO)
|
||||
self.show_info_message(test_telnet(host, port, user, password, timeout), Gtk.MessageType.INFO)
|
||||
self.show_spinner(False)
|
||||
except (socket.timeout, OSError) as e:
|
||||
except TestException as e:
|
||||
self.show_info_message(str(e), Gtk.MessageType.ERROR)
|
||||
self.show_spinner(False)
|
||||
|
||||
@@ -204,9 +188,8 @@ class SettingsDialog:
|
||||
host, port = self._host_field.get_text(), self._port_field.get_text()
|
||||
user, password = self._login_field.get_text(), self._password_field.get_text()
|
||||
try:
|
||||
with FTP(host=host, user=user, passwd=password, timeout=5) as ftp:
|
||||
self.show_info_message("OK. {}".format(ftp.getwelcome()), Gtk.MessageType.INFO)
|
||||
except (error_perm, ConnectionRefusedError, OSError) as e:
|
||||
self.show_info_message("OK. {}".format(test_ftp(host, port, user, password)), Gtk.MessageType.INFO)
|
||||
except TestException as e:
|
||||
self.show_info_message(str(e), Gtk.MessageType.ERROR)
|
||||
finally:
|
||||
self.show_spinner(False)
|
||||
|
||||
Reference in New Issue
Block a user