diff --git a/app/ui/app_menu.ui b/app/ui/app_menu.ui
index d922f964..0f7d3ca6 100644
--- a/app/ui/app_menu.ui
+++ b/app/ui/app_menu.ui
@@ -143,8 +143,10 @@
Backups
app.on_backup_tool_show
-
-
+ -
+ Telnet
+ app.on_telnet_show
+
@@ -327,8 +329,10 @@
Backups
app.on_backup_tool_show
-
-
+ -
+ Telnet
+ app.on_telnet_show
+
diff --git a/app/ui/main.glade b/app/ui/main.glade
index cebee79a..493fe476 100644
--- a/app/ui/main.glade
+++ b/app/ui/main.glade
@@ -3255,6 +3255,21 @@ Author: Dmitriy Yefremov
+
+ True
+ True
+ 0
+
+
+
+
True
True
diff --git a/app/ui/main.py b/app/ui/main.py
index 116aea8b..7ece0313 100644
--- a/app/ui/main.py
+++ b/app/ui/main.py
@@ -52,6 +52,7 @@ from app.ui.control import ControlTool, EpgTool, TimerTool, RecordingsTool
from app.ui.epg import EpgDialog
from app.ui.ftp import FtpClientBox
from app.ui.playback import PlayerBox
+from app.ui.telnet import TelnetClient
from app.ui.transmitter import LinksTransmitter
from .backup import BackupDialog, backup_data, clear_data_path
from .dialogs import show_dialog, DialogType, get_chooser_dialog, WaitDialog, get_message, get_builder
@@ -193,6 +194,7 @@ class Application(Gtk.Application):
"on_recordings_realize": self.on_recordings_realize,
"on_control_realize": self.on_control_realize,
"on_ftp_realize": self.on_ftp_realize,
+ "on_telnet_relize": self.on_telnet_relize,
"on_visible_page": self.on_visible_page,
"on_data_paned_realize": self.init_main_paned_position}
@@ -384,6 +386,7 @@ class Application(Gtk.Application):
self._stack_recordings_box = builder.get_object("recordings_box")
self._stack_ftp_box = builder.get_object("ftp_box")
self._stack_control_box = builder.get_object("control_box")
+ self._telnet_box = builder.get_object("telnet_box")
self.connect("change-page", self.on_page_change)
# Header bar.
profile_box = builder.get_object("profile_combo_box")
@@ -460,6 +463,7 @@ class Application(Gtk.Application):
self.set_action("on_backup_tool_show", self.on_backup_tool_show)
self.set_action("on_about_app", self.on_about_app)
self.set_action("on_close_app", self.on_close_app)
+ self.set_state_action("on_telnet_show", self.on_telnet_show, False)
# Filter.
filter_action = Gio.SimpleAction.new("filter", None)
filter_action.connect("activate", lambda a, v: self.emit("filter-toggled", None))
@@ -539,6 +543,7 @@ class Application(Gtk.Application):
self.set_accels_for_action("app.on_locked", ["l"])
self.set_accels_for_action("app.on_close_app", ["q"])
self.set_accels_for_action("app.on_edit", ["e"])
+ self.set_accels_for_action("app.on_telnet_show", ["t"])
self.set_accels_for_action("win.filter", ["f"])
def do_activate(self):
@@ -782,10 +787,14 @@ class Application(Gtk.Application):
self._ftp_client = FtpClientBox(self, self._settings)
box.pack_start(self._ftp_client, True, True, 0)
- def on_control_realize(self, box: Gtk.HBox):
+ def on_control_realize(self, box):
self._control_tool = ControlTool(self, self._http_api, self._settings)
box.pack_start(self._control_tool, True, True, 0)
+ def on_telnet_relize(self, box):
+ telnet_tool = TelnetClient(self, self._settings)
+ box.pack_start(telnet_tool, True, True, 0)
+
def on_visible_page(self, stack, param):
self._page = Page(stack.get_visible_child_name())
self._fav_paned.set_visible(self._page in self._fav_pages)
@@ -2646,6 +2655,12 @@ class Application(Gtk.Application):
""" Shows backup tool dialog """
BackupDialog(self._main_window, self._settings, self.open_data).show()
+ # ***************** Telnet ******************** #
+
+ def on_telnet_show(self, action, value=False):
+ action.set_state(value)
+ GLib.idle_add(self._telnet_box.set_visible, value)
+
# ************************* Streams ***************************** #
def on_play_stream(self, item=None):
diff --git a/app/ui/telnet.glade b/app/ui/telnet.glade
new file mode 100644
index 00000000..64845e88
--- /dev/null
+++ b/app/ui/telnet.glade
@@ -0,0 +1,183 @@
+
+
+
+
+
+
+
+
+
+
+ tag_table
+
+
+ True
+ False
+ 5
+ 5
+ 5
+ 5
+ 0.49000000953674316
+ in
+
+
+ 480
+ 180
+ True
+ False
+ 5
+ 5
+ 5
+ vertical
+ 5
+
+
+ True
+ False
+ 5
+ 5
+ 2
+ 5
+
+
+ True
+ True
+ True
+ Connect
+
+
+
+ True
+ False
+ gtk-connect
+
+
+
+
+ False
+ True
+ 0
+
+
+
+
+ True
+ True
+ Disconnect
+
+
+
+ True
+ False
+ gtk-disconnect
+
+
+
+
+ False
+ True
+ 1
+
+
+
+
+ True
+ True
+ True
+ Clear
+ center
+ center
+
+
+
+ True
+ False
+ gtk-clear
+
+
+
+
+ False
+ True
+ 2
+
+
+
+
+ False
+ True
+ 0
+
+
+
+
+ True
+ True
+ in
+
+
+ textview-large
+ True
+ True
+ char
+ 5
+ 5
+ text_buffer
+ True
+ GTK_INPUT_HINT_WORD_COMPLETION | GTK_INPUT_HINT_NONE
+ True
+
+
+
+
+
+
+ True
+ True
+ 1
+
+
+
+
+
+
+ True
+ False
+ Telnet
+
+
+
+
diff --git a/app/ui/telnet.py b/app/ui/telnet.py
new file mode 100644
index 00000000..647bf7be
--- /dev/null
+++ b/app/ui/telnet.py
@@ -0,0 +1,224 @@
+import re
+import selectors
+import socket
+from collections import deque
+from telnetlib import Telnet
+
+from gi.repository import GLib
+
+from app.commons import run_task, run_idle, log
+from app.ui.uicommons import Gtk, Gdk, UI_RESOURCES_PATH, KeyboardKey, MOD_MASK
+
+
+class ExtTelnet(Telnet):
+
+ def __init__(self, output_callback, **kwargs):
+ super().__init__(**kwargs)
+ self._output_callback = output_callback
+
+ def interact(self):
+ """ Interaction function, emulates a very dumb telnet client. """
+ with selectors.DefaultSelector() as selector:
+ selector.register(self, selectors.EVENT_READ)
+
+ while True:
+ for key, events in selector.select():
+ if key.fileobj is self:
+ try:
+ text = self.read_very_eager()
+ except EOFError as e:
+ msg = "\n*** Connection closed by remote host ***\n"
+ self._output_callback(msg)
+ log(msg)
+ raise e
+ else:
+ if text:
+ self._output_callback(text)
+
+
+class TelnetClient(Gtk.Box):
+ """ Very simple telnet client. """
+ _COLOR_PATTERN = re.compile("\x1b.*?m") # Color info
+ _ERASING_PATTERN = re.compile("\x1b.*?K") # Erase to right
+ _APP_MODE_PATTERN = re.compile("\x1b.*?(1h)|(1l)") # h - on, l - off
+ _ALL_PATTERN = re.compile(r'(\x1b\[|\x9b)[0-?]*[@-~]')
+ _NOT_SUPPORTED = {"mc", "mcedit", "vi", "nano"}
+
+ def __init__(self, app, settings, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._app = app
+ self._app.connect("profile-changed", self.on_profile_changed)
+
+ self._tn = None
+ self._app_mode = False
+ self._commands = deque(maxlen=10)
+
+ self._handlers = {"on_clear": self.on_clear,
+ "on_text_view_realize": self.on_text_view_realize,
+ "on_view_key_press": self.on_view_key_press,
+ "on_connect": self.on_connect,
+ "on_disconnect": self.on_disconnect}
+
+ builder = Gtk.Builder()
+ builder.add_from_file(UI_RESOURCES_PATH + "telnet.glade")
+ builder.connect_signals(self._handlers)
+
+ self._text_view = builder.get_object("text_view")
+ self._buf = builder.get_object("text_buffer")
+ self._end_tag = builder.get_object("end_tag")
+ self._connect_button = builder.get_object("connect_button")
+ self._connect_button.bind_property("visible", builder.get_object("disconnect_button"), "visible", 4)
+
+ main_frame = builder.get_object("telnet_frame")
+ provider = Gtk.CssProvider()
+ provider.load_from_path(UI_RESOURCES_PATH + "style.css")
+ main_frame.get_style_context().add_provider(provider, Gtk.STYLE_PROVIDER_PRIORITY_USER)
+
+ self.pack_start(main_frame, True, True, 0)
+ self.show()
+
+ def on_profile_changed(self, app, data):
+ self.on_clear()
+ self.on_disconnect()
+ self.on_connect()
+
+ def on_text_view_realize(self, view):
+ self.on_connect()
+
+ @run_task
+ def on_connect(self, item=None):
+ try:
+ GLib.idle_add(self._connect_button.set_visible, False)
+ settings = self._app.app_settings
+ user, password, timeout = settings.user, settings.password, settings.telnet_timeout
+ self._tn = ExtTelnet(self.append_output, host=settings.host, port=settings.telnet_port, timeout=timeout)
+
+ if user != "":
+ self._tn.read_until(b"login: ")
+ self._tn.write(user.encode("utf-8") + b"\n")
+ if password != "":
+ self._tn.read_until(b"Password: ")
+ self._tn.write(password.encode("utf-8") + b"\n")
+
+ self._tn.interact()
+ except (OSError, EOFError, socket.timeout, ConnectionRefusedError) as e:
+ log(f"{self.__class__.__name__}: {e}")
+ self._app.show_info_message(str(e), Gtk.MessageType.ERROR)
+ finally:
+ GLib.idle_add(self._connect_button.set_visible, True)
+
+ @run_task
+ def on_disconnect(self, item=None):
+ if self._tn:
+ GLib.idle_add(self._connect_button.set_visible, True)
+ self._tn.close()
+
+ def on_command_done(self, entry):
+ command = entry.get_text()
+ entry.set_text("")
+ if command and self._tn:
+ self._tn.write(command.encode("ascii") + b"\r")
+
+ def on_clear(self, item=None):
+ self._buf.delete(self._buf.get_start_iter(), self._buf.get_end_iter())
+
+ def on_view_key_press(self, view, event):
+ """ Handling keystrokes on press. """
+ if event.keyval == Gdk.KEY_Return:
+ self.do_command()
+ return True
+
+ key_code = event.hardware_keycode
+ if not KeyboardKey.value_exist(key_code):
+ return
+
+ key = KeyboardKey(key_code)
+ ctrl = event.state & MOD_MASK
+ if ctrl and key is KeyboardKey.C:
+ if self._tn and self._tn.sock:
+ self._tn.write(b"\x03") # interrupt
+
+ # Last commands navigation.
+ if key is KeyboardKey.UP:
+ self.delete_last_command()
+ if self._commands:
+ cmd = self._commands.pop()
+ self._commands.appendleft(cmd)
+ self._buf.insert_at_cursor(cmd, -1)
+ return True
+ elif key is KeyboardKey.DOWN:
+ self.delete_last_command()
+ if self._commands:
+ cmd = self._commands.popleft()
+ self._commands.append(cmd)
+ self._buf.insert_at_cursor(cmd, -1)
+ return True
+
+ def delete_last_command(self):
+ end = self._buf.get_end_iter()
+ if end.ends_tag(self._end_tag):
+ return
+
+ if end.backward_to_tag_toggle(self._end_tag):
+ self._buf.delete(self._buf.get_end_iter(), end)
+
+ def do_command(self):
+ count = self._buf.get_line_count()
+ begin = self._buf.get_iter_at_line(count)
+ end = self._buf.get_end_iter()
+ command = []
+
+ while end.backward_to_tag_toggle(self._end_tag):
+ command.append(self._buf.get_text(end, begin, False))
+ break
+ else: # if buf is empty
+ command.append(self._buf.get_text(begin, end, False))
+
+ # To preventing duplication of the command in the buf.
+ self._buf.delete(end, begin)
+
+ if command and self._tn.sock:
+ cmd = command[0]
+ if cmd in self._NOT_SUPPORTED:
+ self._app.show_info_message(f"'{cmd}' is not supported by this client.", Gtk.MessageType.ERROR)
+ else:
+ self._tn.write(cmd.encode("ascii") + b"\r")
+ self._commands.append(cmd)
+
+ @run_idle
+ def append_output(self, txt):
+ t = txt.decode("ascii", errors="ignore")
+
+ ap = re.search(self._APP_MODE_PATTERN, t)
+ if ap:
+ on, of = ap.group(1), ap.group(2)
+ if on:
+ self._app_mode = True
+ elif of:
+ self._app_mode = False
+ self.on_clear()
+
+ t = re.sub(self._ALL_PATTERN, "", t) # Removing [replacing] ascii escape sequences.
+
+ if self._app_mode:
+ start, end = self._buf.get_start_iter(), self._buf.get_end_iter()
+ count = self._buf.get_line_count()
+ new_lines = t.split("\r\n")
+ ext_lines = self._buf.get_text(start, end, True).split("\r\n")
+ if count < len(new_lines):
+ self._buf.set_text(re.sub(self._ERASING_PATTERN, "", t))
+ else:
+ for i, line in enumerate(new_lines):
+ if line:
+ ext_lines[i] = re.sub(self._ERASING_PATTERN, "", line)
+ self._buf.set_text("\r\n".join(ext_lines))
+ else:
+ self._buf.insert_at_cursor(t, -1)
+
+ insert = self._buf.get_insert()
+ self._text_view.scroll_to_mark(insert, 0.0, True, 0.0, 1.0)
+ self._buf.apply_tag(self._end_tag, self._buf.get_start_iter(), self._buf.get_end_iter())
+
+
+if __name__ == "__main__":
+ pass