From 4c555bb7f4ac638d701c809270b67dd5f02841f4 Mon Sep 17 00:00:00 2001 From: DYefremov Date: Thu, 8 Nov 2018 13:07:24 +0300 Subject: [PATCH] moving the tests functions --- app/connections.py | 55 ++++++++++++++++++++++++++++++++++----- app/ui/settings_dialog.py | 33 ++++++----------------- 2 files changed, 57 insertions(+), 31 deletions(-) diff --git a/app/connections.py b/app/connections.py index 60fd9da3..f2adcce5 100644 --- a/app/connections.py +++ b/app/connections.py @@ -4,8 +4,10 @@ import time from enum import Enum from ftplib import FTP, error_perm from telnetlib import Telnet -from urllib.error import HTTPError +from urllib.error import HTTPError, URLError +from urllib.parse import urlencode from urllib.request import Request, urlopen +from xml.dom.minidom import parse from app.commons import log from app.properties import Profile @@ -25,6 +27,10 @@ class DownloadType(Enum): WEBTV = 4 +class TestException(Exception): + pass + + def download_data(*, properties, download_type=DownloadType.ALL, callback=None): with FTP(host=properties["host"], user=properties["user"], passwd=properties["password"]) as ftp: ftp.encoding = "utf-8" @@ -43,7 +49,7 @@ def download_data(*, properties, download_type=DownloadType.ALL, callback=None): name = name.split()[-1] download_file(ftp, name, save_path, callback) # satellites.xml and webtv section - if download_type in (DownloadType.ALL, DownloadType.SATELLITES, DownloadType.WEB_TV): + if download_type in (DownloadType.ALL, DownloadType.SATELLITES, DownloadType.WEBTV): ftp.cwd(properties["satellites_xml_path"]) files.clear() ftp.dir(files.append) @@ -52,7 +58,7 @@ def download_data(*, properties, download_type=DownloadType.ALL, callback=None): name = str(file).strip() if download_type in (DownloadType.ALL, DownloadType.SATELLITES) and name.endswith(_SAT_XML_FILE): download_file(ftp, _SAT_XML_FILE, save_path, callback) - if download_type in (DownloadType.ALL, DownloadType.WEB_TV) and name.endswith(_WEBTV_XML_FILE): + if download_type in (DownloadType.ALL, DownloadType.WEBTV) and name.endswith(_WEBTV_XML_FILE): download_file(ftp, _WEBTV_XML_FILE, save_path, callback) if callback is not None: @@ -97,10 +103,10 @@ def upload_data(*, properties, download_type=DownloadType.ALL, remove_unused=Fal callback() return send - if profile is Profile.NEUTRINO_MP and download_type in (DownloadType.ALL, DownloadType.WEB_TV): + if profile is Profile.NEUTRINO_MP and download_type in (DownloadType.ALL, DownloadType.WEBTV): ftp.cwd(properties["satellites_xml_path"]) send = send_file(_WEBTV_XML_FILE, data_path, ftp) - if download_type is DownloadType.WEB_TV: + if download_type is DownloadType.WEBTV: tn.send("init 6") if callback is not None: callback() @@ -196,7 +202,44 @@ def telnet(host, port=23, user="", password="", timeout=5): yield -def test_telnet(host, port, user, password, timeout): +# ***************** Connections testing *******************# + + +def test_ftp(host, port, user, password, timeout=2): + try: + with FTP(host=host, user=user, passwd=password, timeout=timeout) as ftp: + return ftp.getwelcome() + except (error_perm, ConnectionRefusedError, OSError) as e: + raise TestException(e) + + +def test_http(host, port, user, password, timeout=5): + try: + params = urlencode({"text": "Connection test", "type": 2, "timeout": timeout}) + with urlopen("http://{}/web/message?%s".format(host) % params, timeout=5) as f: + dom = parse(f) + msg = "" + for elem in dom.getElementsByTagName("e2simplexmlresult"): + for ch in elem.childNodes: + if ch.nodeType == ch.ELEMENT_NODE: + msg = "".join(t.nodeValue for t in ch.childNodes if t.nodeType == t.TEXT_NODE) + return msg + except (URLError, HTTPError) as e: + raise TestException(e) + + +def test_telnet(host, port, user, password, timeout=2): + try: + gen = telnet_test(host, port, user, password, timeout) + res = next(gen) + print(res) + res = next(gen) + return res + except (socket.timeout, OSError) as e: + raise TestException(e) + + +def telnet_test(host, port, user, password, timeout): tn = Telnet(host=host, port=port, timeout=timeout) time.sleep(1) tn.read_until(b"login: ", timeout=2) diff --git a/app/ui/settings_dialog.py b/app/ui/settings_dialog.py index d88b06c4..f840e926 100644 --- a/app/ui/settings_dialog.py +++ b/app/ui/settings_dialog.py @@ -1,13 +1,7 @@ -import socket from enum import Enum -from ftplib import error_perm, FTP -from urllib.error import URLError, HTTPError -from urllib.parse import urlencode -from urllib.request import urlopen -from xml.dom.minidom import parse from app.commons import run_task, run_idle -from app.connections import test_telnet +from app.connections import test_telnet, test_ftp, TestException, test_http from app.properties import write_config, Profile, get_default_settings from .uicommons import Gtk, UI_RESOURCES_PATH, TEXT_DOMAIN from .main_helper import update_entry_data @@ -171,16 +165,10 @@ class SettingsDialog: def test_http(self): user, password = self._http_login_field.get_text(), self._http_password_field.get_text() + host, port = self._host_field.get_text(), self._http_port_field.get_text() try: - params = urlencode({"text": "Connection test", "type": 2, "timeout": 5}) - with urlopen("http://{}/web/message?%s".format(self._host_field.get_text()) % params, timeout=5) as f: - dom = parse(f) - for elem in dom.getElementsByTagName("e2simplexmlresult"): - for ch in elem.childNodes: - if ch.nodeType == ch.ELEMENT_NODE: - msg = "".join(t.nodeValue for t in ch.childNodes if t.nodeType == t.TEXT_NODE) - self.show_info_message(msg, Gtk.MessageType.INFO) - except (URLError, HTTPError) as e: + self.show_info_message(test_http(host, port, user, password), Gtk.MessageType.INFO) + except TestException as e: self.show_info_message(str(e), Gtk.MessageType.ERROR) finally: self.show_spinner(False) @@ -190,13 +178,9 @@ class SettingsDialog: host, port = self._host_field.get_text(), self._telnet_port_field.get_text() user, password = self._telnet_login_field.get_text(), self._telnet_password_field.get_text() try: - gen = test_telnet(host, port, user, password, timeout) - res = next(gen) - print(res) - res = next(gen) - self.show_info_message(str(res), Gtk.MessageType.INFO) + self.show_info_message(test_telnet(host, port, user, password, timeout), Gtk.MessageType.INFO) self.show_spinner(False) - except (socket.timeout, OSError) as e: + except TestException as e: self.show_info_message(str(e), Gtk.MessageType.ERROR) self.show_spinner(False) @@ -204,9 +188,8 @@ class SettingsDialog: host, port = self._host_field.get_text(), self._port_field.get_text() user, password = self._login_field.get_text(), self._password_field.get_text() try: - with FTP(host=host, user=user, passwd=password, timeout=5) as ftp: - self.show_info_message("OK. {}".format(ftp.getwelcome()), Gtk.MessageType.INFO) - except (error_perm, ConnectionRefusedError, OSError) as e: + self.show_info_message("OK. {}".format(test_ftp(host, port, user, password)), Gtk.MessageType.INFO) + except TestException as e: self.show_info_message(str(e), Gtk.MessageType.ERROR) finally: self.show_spinner(False)