mirror of
https://github.com/DYefremov/DemonEditor.git
synced 2026-02-28 09:31:35 +01:00
telnetlib usage refactoring (#218)
This commit is contained in:
@@ -28,6 +28,7 @@
|
||||
|
||||
import os
|
||||
import re
|
||||
import selectors
|
||||
import socket
|
||||
import time
|
||||
import urllib
|
||||
@@ -36,7 +37,6 @@ from enum import Enum
|
||||
from ftplib import FTP, FTP_PORT, CRLF, Error, all_errors
|
||||
from http.client import RemoteDisconnected
|
||||
from pathlib import Path
|
||||
from telnetlib import Telnet
|
||||
from urllib.error import HTTPError, URLError
|
||||
from urllib.parse import urlencode, quote
|
||||
from urllib.request import (urlopen, HTTPPasswordMgrWithDefaultRealm, HTTPBasicAuthHandler, build_opener,
|
||||
@@ -78,6 +78,59 @@ class HttpApiException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class StubTelnet:
|
||||
""" Stub class for Telnet.
|
||||
|
||||
Used to run a program on an OS with Python >= 3.13
|
||||
without the need to install telnetlib .
|
||||
-> https://github.com/DYefremov/DemonEditor/issues/218.
|
||||
"""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
self._msg = "Please (re)install [telnetlib] module. -> [https://github.com/DYefremov/DemonEditor/issues/218]"
|
||||
log(self._msg)
|
||||
|
||||
def read_until(self, match, timeout=None):
|
||||
raise TestException(self._msg)
|
||||
|
||||
|
||||
TN = StubTelnet
|
||||
|
||||
try:
|
||||
from telnetlib import Telnet
|
||||
except ModuleNotFoundError as e:
|
||||
log(e)
|
||||
else:
|
||||
TN = Telnet
|
||||
|
||||
|
||||
class ExtTelnet(TN):
|
||||
|
||||
def __init__(self, output_callback=None, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._output_callback = output_callback
|
||||
|
||||
def interact(self):
|
||||
""" Interaction function, emulates a very dumb telnet client. """
|
||||
with selectors.DefaultSelector() as selector:
|
||||
selector.register(self, selectors.EVENT_READ)
|
||||
|
||||
while True:
|
||||
for key, events in selector.select():
|
||||
if key.fileobj is self:
|
||||
try:
|
||||
text = self.read_very_eager()
|
||||
except EOFError as e:
|
||||
msg = "\n*** Connection closed by remote host ***\n"
|
||||
if self._output_callback:
|
||||
self._output_callback(msg)
|
||||
log(msg)
|
||||
raise e
|
||||
else:
|
||||
if text and self._output_callback:
|
||||
self._output_callback(text)
|
||||
|
||||
|
||||
class UtfFTP(FTP):
|
||||
""" FTP class wrapper. """
|
||||
|
||||
@@ -597,7 +650,7 @@ def http(user, password, url, callback, use_ssl=False, s_type=SettingsType.ENIGM
|
||||
|
||||
def telnet(host, port=23, user="", password="", timeout=5):
|
||||
try:
|
||||
tn = Telnet(host=host, port=port, timeout=timeout)
|
||||
tn = ExtTelnet(host=host, port=port, timeout=timeout)
|
||||
except socket.timeout:
|
||||
log("telnet error: socket timeout")
|
||||
else:
|
||||
@@ -948,7 +1001,7 @@ def test_telnet(host, port, user, password, timeout=5):
|
||||
|
||||
|
||||
def telnet_test(host, port, user, password, timeout):
|
||||
tn = Telnet(host=host, port=port, timeout=timeout)
|
||||
tn = ExtTelnet(host=host, port=port, timeout=timeout)
|
||||
time.sleep(1)
|
||||
tn.read_until(b"login: ", timeout=2)
|
||||
tn.write(user.encode("utf-8") + b"\r")
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
#
|
||||
# The MIT License (MIT)
|
||||
#
|
||||
# Copyright (c) 2018-2021 Dmitriy Yefremov
|
||||
# Copyright (c) 2018-2025 Dmitriy Yefremov
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
@@ -27,43 +27,16 @@
|
||||
|
||||
|
||||
import re
|
||||
import selectors
|
||||
import socket
|
||||
from collections import deque
|
||||
from telnetlib import Telnet
|
||||
|
||||
from gi.repository import GLib
|
||||
|
||||
from app.commons import run_task, run_idle, log
|
||||
from app.connections import ExtTelnet
|
||||
from app.ui.uicommons import Gtk, Gdk, UI_RESOURCES_PATH, KeyboardKey, MOD_MASK
|
||||
|
||||
|
||||
class ExtTelnet(Telnet):
|
||||
|
||||
def __init__(self, output_callback, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._output_callback = output_callback
|
||||
|
||||
def interact(self):
|
||||
""" Interaction function, emulates a very dumb telnet client. """
|
||||
with selectors.DefaultSelector() as selector:
|
||||
selector.register(self, selectors.EVENT_READ)
|
||||
|
||||
while True:
|
||||
for key, events in selector.select():
|
||||
if key.fileobj is self:
|
||||
try:
|
||||
text = self.read_very_eager()
|
||||
except EOFError as e:
|
||||
msg = "\n*** Connection closed by remote host ***\n"
|
||||
self._output_callback(msg)
|
||||
log(msg)
|
||||
raise e
|
||||
else:
|
||||
if text:
|
||||
self._output_callback(text)
|
||||
|
||||
|
||||
class TelnetClient(Gtk.Box):
|
||||
""" Very simple telnet client. """
|
||||
_COLOR_PATTERN = re.compile("\x1b.*?m") # Color info
|
||||
@@ -158,7 +131,7 @@ class TelnetClient(Gtk.Box):
|
||||
|
||||
key_code = event.hardware_keycode
|
||||
if not KeyboardKey.value_exist(key_code):
|
||||
return
|
||||
return None
|
||||
|
||||
key = KeyboardKey(key_code)
|
||||
ctrl = event.state & MOD_MASK
|
||||
@@ -181,6 +154,7 @@ class TelnetClient(Gtk.Box):
|
||||
self._commands.append(cmd)
|
||||
self._buf.insert_at_cursor(cmd, -1)
|
||||
return True
|
||||
return False
|
||||
|
||||
def delete_last_command(self):
|
||||
end = self._buf.get_end_iter()
|
||||
|
||||
Reference in New Issue
Block a user