From e57e8f9dd78ef1e650200865f71ab6e3ab8d534e Mon Sep 17 00:00:00 2001 From: DYefremov Date: Sun, 17 Oct 2021 23:05:29 +0300 Subject: [PATCH] added simple telnet client --- app/ui/app_menu.ui | 12 ++- app/ui/main.glade | 15 +++ app/ui/main.py | 17 +++- app/ui/telnet.glade | 183 ++++++++++++++++++++++++++++++++++++ app/ui/telnet.py | 224 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 446 insertions(+), 5 deletions(-) create mode 100644 app/ui/telnet.glade create mode 100644 app/ui/telnet.py diff --git a/app/ui/app_menu.ui b/app/ui/app_menu.ui index d922f964..0f7d3ca6 100644 --- a/app/ui/app_menu.ui +++ b/app/ui/app_menu.ui @@ -143,8 +143,10 @@ Backups app.on_backup_tool_show - -
+ + Telnet + app.on_telnet_show +
@@ -327,8 +329,10 @@ Backups app.on_backup_tool_show - -
+ + Telnet + app.on_telnet_show +
diff --git a/app/ui/main.glade b/app/ui/main.glade index cebee79a..493fe476 100644 --- a/app/ui/main.glade +++ b/app/ui/main.glade @@ -3255,6 +3255,21 @@ Author: Dmitriy Yefremov + + True + True + 0 + + + + + False + vertical + + + + + True True diff --git a/app/ui/main.py b/app/ui/main.py index 116aea8b..7ece0313 100644 --- a/app/ui/main.py +++ b/app/ui/main.py @@ -52,6 +52,7 @@ from app.ui.control import ControlTool, EpgTool, TimerTool, RecordingsTool from app.ui.epg import EpgDialog from app.ui.ftp import FtpClientBox from app.ui.playback import PlayerBox +from app.ui.telnet import TelnetClient from app.ui.transmitter import LinksTransmitter from .backup import BackupDialog, backup_data, clear_data_path from .dialogs import show_dialog, DialogType, get_chooser_dialog, WaitDialog, get_message, get_builder @@ -193,6 +194,7 @@ class Application(Gtk.Application): "on_recordings_realize": self.on_recordings_realize, "on_control_realize": self.on_control_realize, "on_ftp_realize": self.on_ftp_realize, + "on_telnet_relize": self.on_telnet_relize, "on_visible_page": self.on_visible_page, "on_data_paned_realize": self.init_main_paned_position} @@ -384,6 +386,7 @@ class Application(Gtk.Application): self._stack_recordings_box = builder.get_object("recordings_box") self._stack_ftp_box = builder.get_object("ftp_box") self._stack_control_box = builder.get_object("control_box") + self._telnet_box = builder.get_object("telnet_box") self.connect("change-page", self.on_page_change) # Header bar. profile_box = builder.get_object("profile_combo_box") @@ -460,6 +463,7 @@ class Application(Gtk.Application): self.set_action("on_backup_tool_show", self.on_backup_tool_show) self.set_action("on_about_app", self.on_about_app) self.set_action("on_close_app", self.on_close_app) + self.set_state_action("on_telnet_show", self.on_telnet_show, False) # Filter. filter_action = Gio.SimpleAction.new("filter", None) filter_action.connect("activate", lambda a, v: self.emit("filter-toggled", None)) @@ -539,6 +543,7 @@ class Application(Gtk.Application): self.set_accels_for_action("app.on_locked", ["l"]) self.set_accels_for_action("app.on_close_app", ["q"]) self.set_accels_for_action("app.on_edit", ["e"]) + self.set_accels_for_action("app.on_telnet_show", ["t"]) self.set_accels_for_action("win.filter", ["f"]) def do_activate(self): @@ -782,10 +787,14 @@ class Application(Gtk.Application): self._ftp_client = FtpClientBox(self, self._settings) box.pack_start(self._ftp_client, True, True, 0) - def on_control_realize(self, box: Gtk.HBox): + def on_control_realize(self, box): self._control_tool = ControlTool(self, self._http_api, self._settings) box.pack_start(self._control_tool, True, True, 0) + def on_telnet_relize(self, box): + telnet_tool = TelnetClient(self, self._settings) + box.pack_start(telnet_tool, True, True, 0) + def on_visible_page(self, stack, param): self._page = Page(stack.get_visible_child_name()) self._fav_paned.set_visible(self._page in self._fav_pages) @@ -2646,6 +2655,12 @@ class Application(Gtk.Application): """ Shows backup tool dialog """ BackupDialog(self._main_window, self._settings, self.open_data).show() + # ***************** Telnet ******************** # + + def on_telnet_show(self, action, value=False): + action.set_state(value) + GLib.idle_add(self._telnet_box.set_visible, value) + # ************************* Streams ***************************** # def on_play_stream(self, item=None): diff --git a/app/ui/telnet.glade b/app/ui/telnet.glade new file mode 100644 index 00000000..64845e88 --- /dev/null +++ b/app/ui/telnet.glade @@ -0,0 +1,183 @@ + + + + + + + + + + + + Normal + False + + + + + tag_table + + + True + False + 5 + 5 + 5 + 5 + 0.49000000953674316 + in + + + 480 + 180 + True + False + 5 + 5 + 5 + vertical + 5 + + + True + False + 5 + 5 + 2 + 5 + + + True + True + True + Connect + + + + True + False + gtk-connect + + + + + False + True + 0 + + + + + True + True + Disconnect + + + + True + False + gtk-disconnect + + + + + False + True + 1 + + + + + True + True + True + Clear + center + center + + + + True + False + gtk-clear + + + + + False + True + 2 + + + + + False + True + 0 + + + + + True + True + in + + + textview-large + True + True + char + 5 + 5 + text_buffer + True + GTK_INPUT_HINT_WORD_COMPLETION | GTK_INPUT_HINT_NONE + True + + + + + + + True + True + 1 + + + + + + + True + False + Telnet + + + + diff --git a/app/ui/telnet.py b/app/ui/telnet.py new file mode 100644 index 00000000..647bf7be --- /dev/null +++ b/app/ui/telnet.py @@ -0,0 +1,224 @@ +import re +import selectors +import socket +from collections import deque +from telnetlib import Telnet + +from gi.repository import GLib + +from app.commons import run_task, run_idle, log +from app.ui.uicommons import Gtk, Gdk, UI_RESOURCES_PATH, KeyboardKey, MOD_MASK + + +class ExtTelnet(Telnet): + + def __init__(self, output_callback, **kwargs): + super().__init__(**kwargs) + self._output_callback = output_callback + + def interact(self): + """ Interaction function, emulates a very dumb telnet client. """ + with selectors.DefaultSelector() as selector: + selector.register(self, selectors.EVENT_READ) + + while True: + for key, events in selector.select(): + if key.fileobj is self: + try: + text = self.read_very_eager() + except EOFError as e: + msg = "\n*** Connection closed by remote host ***\n" + self._output_callback(msg) + log(msg) + raise e + else: + if text: + self._output_callback(text) + + +class TelnetClient(Gtk.Box): + """ Very simple telnet client. """ + _COLOR_PATTERN = re.compile("\x1b.*?m") # Color info + _ERASING_PATTERN = re.compile("\x1b.*?K") # Erase to right + _APP_MODE_PATTERN = re.compile("\x1b.*?(1h)|(1l)") # h - on, l - off + _ALL_PATTERN = re.compile(r'(\x1b\[|\x9b)[0-?]*[@-~]') + _NOT_SUPPORTED = {"mc", "mcedit", "vi", "nano"} + + def __init__(self, app, settings, *args, **kwargs): + super().__init__(*args, **kwargs) + self._app = app + self._app.connect("profile-changed", self.on_profile_changed) + + self._tn = None + self._app_mode = False + self._commands = deque(maxlen=10) + + self._handlers = {"on_clear": self.on_clear, + "on_text_view_realize": self.on_text_view_realize, + "on_view_key_press": self.on_view_key_press, + "on_connect": self.on_connect, + "on_disconnect": self.on_disconnect} + + builder = Gtk.Builder() + builder.add_from_file(UI_RESOURCES_PATH + "telnet.glade") + builder.connect_signals(self._handlers) + + self._text_view = builder.get_object("text_view") + self._buf = builder.get_object("text_buffer") + self._end_tag = builder.get_object("end_tag") + self._connect_button = builder.get_object("connect_button") + self._connect_button.bind_property("visible", builder.get_object("disconnect_button"), "visible", 4) + + main_frame = builder.get_object("telnet_frame") + provider = Gtk.CssProvider() + provider.load_from_path(UI_RESOURCES_PATH + "style.css") + main_frame.get_style_context().add_provider(provider, Gtk.STYLE_PROVIDER_PRIORITY_USER) + + self.pack_start(main_frame, True, True, 0) + self.show() + + def on_profile_changed(self, app, data): + self.on_clear() + self.on_disconnect() + self.on_connect() + + def on_text_view_realize(self, view): + self.on_connect() + + @run_task + def on_connect(self, item=None): + try: + GLib.idle_add(self._connect_button.set_visible, False) + settings = self._app.app_settings + user, password, timeout = settings.user, settings.password, settings.telnet_timeout + self._tn = ExtTelnet(self.append_output, host=settings.host, port=settings.telnet_port, timeout=timeout) + + if user != "": + self._tn.read_until(b"login: ") + self._tn.write(user.encode("utf-8") + b"\n") + if password != "": + self._tn.read_until(b"Password: ") + self._tn.write(password.encode("utf-8") + b"\n") + + self._tn.interact() + except (OSError, EOFError, socket.timeout, ConnectionRefusedError) as e: + log(f"{self.__class__.__name__}: {e}") + self._app.show_info_message(str(e), Gtk.MessageType.ERROR) + finally: + GLib.idle_add(self._connect_button.set_visible, True) + + @run_task + def on_disconnect(self, item=None): + if self._tn: + GLib.idle_add(self._connect_button.set_visible, True) + self._tn.close() + + def on_command_done(self, entry): + command = entry.get_text() + entry.set_text("") + if command and self._tn: + self._tn.write(command.encode("ascii") + b"\r") + + def on_clear(self, item=None): + self._buf.delete(self._buf.get_start_iter(), self._buf.get_end_iter()) + + def on_view_key_press(self, view, event): + """ Handling keystrokes on press. """ + if event.keyval == Gdk.KEY_Return: + self.do_command() + return True + + key_code = event.hardware_keycode + if not KeyboardKey.value_exist(key_code): + return + + key = KeyboardKey(key_code) + ctrl = event.state & MOD_MASK + if ctrl and key is KeyboardKey.C: + if self._tn and self._tn.sock: + self._tn.write(b"\x03") # interrupt + + # Last commands navigation. + if key is KeyboardKey.UP: + self.delete_last_command() + if self._commands: + cmd = self._commands.pop() + self._commands.appendleft(cmd) + self._buf.insert_at_cursor(cmd, -1) + return True + elif key is KeyboardKey.DOWN: + self.delete_last_command() + if self._commands: + cmd = self._commands.popleft() + self._commands.append(cmd) + self._buf.insert_at_cursor(cmd, -1) + return True + + def delete_last_command(self): + end = self._buf.get_end_iter() + if end.ends_tag(self._end_tag): + return + + if end.backward_to_tag_toggle(self._end_tag): + self._buf.delete(self._buf.get_end_iter(), end) + + def do_command(self): + count = self._buf.get_line_count() + begin = self._buf.get_iter_at_line(count) + end = self._buf.get_end_iter() + command = [] + + while end.backward_to_tag_toggle(self._end_tag): + command.append(self._buf.get_text(end, begin, False)) + break + else: # if buf is empty + command.append(self._buf.get_text(begin, end, False)) + + # To preventing duplication of the command in the buf. + self._buf.delete(end, begin) + + if command and self._tn.sock: + cmd = command[0] + if cmd in self._NOT_SUPPORTED: + self._app.show_info_message(f"'{cmd}' is not supported by this client.", Gtk.MessageType.ERROR) + else: + self._tn.write(cmd.encode("ascii") + b"\r") + self._commands.append(cmd) + + @run_idle + def append_output(self, txt): + t = txt.decode("ascii", errors="ignore") + + ap = re.search(self._APP_MODE_PATTERN, t) + if ap: + on, of = ap.group(1), ap.group(2) + if on: + self._app_mode = True + elif of: + self._app_mode = False + self.on_clear() + + t = re.sub(self._ALL_PATTERN, "", t) # Removing [replacing] ascii escape sequences. + + if self._app_mode: + start, end = self._buf.get_start_iter(), self._buf.get_end_iter() + count = self._buf.get_line_count() + new_lines = t.split("\r\n") + ext_lines = self._buf.get_text(start, end, True).split("\r\n") + if count < len(new_lines): + self._buf.set_text(re.sub(self._ERASING_PATTERN, "", t)) + else: + for i, line in enumerate(new_lines): + if line: + ext_lines[i] = re.sub(self._ERASING_PATTERN, "", line) + self._buf.set_text("\r\n".join(ext_lines)) + else: + self._buf.insert_at_cursor(t, -1) + + insert = self._buf.get_insert() + self._text_view.scroll_to_mark(insert, 0.0, True, 0.0, 1.0) + self._buf.apply_tag(self._end_tag, self._buf.get_start_iter(), self._buf.get_end_iter()) + + +if __name__ == "__main__": + pass