diff --git a/app/tools/yt.py b/app/tools/yt.py new file mode 100644 index 00000000..814835ef --- /dev/null +++ b/app/tools/yt.py @@ -0,0 +1,40 @@ +""" Module for working with YouTube service """ +import re +import urllib +from urllib.request import Request + +_YT_PATTERN = re.compile(r"https://www.youtube.com/.+(?:v=|\/)([\w-]{11})&?(list=)?([\w-]{34})?.*") +_YT_VIDEO_PATTERN = re.compile(r"https://r\d+---sn-[\w]{10}-[\w]{3,5}.googlevideo.com/videoplayback?.*") + + +class YouTube: + """ Helper class for working with YouTube service. """ + + @staticmethod + def is_yt_video_link(url): + return re.match(_YT_VIDEO_PATTERN, url) + + @staticmethod + def get_yt_id(url): + """ Returns video id or None """ + yt = re.search(_YT_PATTERN, url) + if yt: + return yt.group(1) + + @staticmethod + def get_yt_link(video_id): + """ Getting link to YouTube video by id. + + returns tuple from the video link and title + """ + headers = {"User-Agent": "Mozilla/5.0"} + req = Request("https://youtube.com/get_video_info?video_id={}".format(video_id), headers=headers) + with urllib.request.urlopen(req, timeout=2) as resp: + data = resp.read().decode('utf-8').split("&") + out = {k: v for k, sep, v in (str(d).partition("=") for d in map(urllib.request.unquote, data))} + stream_map = out.get("url_encoded_fmt_stream_map", None) + if stream_map: + s_map = {k: v for k, sep, v in (str(d).partition("=") for d in stream_map.split("&"))} + url, title = s_map.get("url", None), out.get("title", None) + return urllib.request.unquote(url) if url else "", title.replace("+", " ") if title else "" + return "", "" diff --git a/app/ui/iptv.py b/app/ui/iptv.py index 4a74df04..f7b02d8a 100644 --- a/app/ui/iptv.py +++ b/app/ui/iptv.py @@ -13,6 +13,7 @@ from app.commons import run_idle, run_task from app.eparser.ecommons import BqServiceType, Service from app.eparser.iptv import NEUTRINO_FAV_ID_FORMAT, StreamType, ENIGMA2_FAV_ID_FORMAT from app.properties import Profile +from app.tools.yt import YouTube from .dialogs import Action, show_dialog, DialogType, get_dialogs_string, get_message from .main_helper import get_base_model, get_iptv_url from .uicommons import Gtk, Gdk, TEXT_DOMAIN, UI_RESOURCES_PATH, IPTV_ICON, Column, IS_GNOME_SESSION @@ -21,8 +22,6 @@ _DIGIT_ENTRY_NAME = "digit-entry" _ENIGMA2_REFERENCE = "{}:0:{}:{:X}:{:X}:{:X}:{:X}:0:0:0" _PATTERN = re.compile("(?:^[\\s]*$|\\D)") _UI_PATH = UI_RESOURCES_PATH + "iptv.glade" -_YT_PATTERN = re.compile(r"https://www.youtube.com/.+(?:v=|\/)([\w-]{11})&?(list=)?([\w-]{34})?.*") -_YT_VIDEO_PATTERN = re.compile(r"https://r\d+---sn-[\w]{10}-[\w]{3,5}.googlevideo.com/videoplayback?.*") def is_data_correct(elems): @@ -43,21 +42,6 @@ def get_stream_type(box): return StreamType.NONE_REC_2.value -def get_yt_link(video_id): - """ Getting link to YouTube video by id. - - returns tuple from the video link and title - """ - headers = {"User-Agent": "Mozilla/5.0"} - req = Request("https://youtube.com/get_video_info?video_id={}".format(video_id), headers=headers) - while True: - with urllib.request.urlopen(req, timeout=2) as resp: - data = urllib.request.unquote(str(resp.readline())).split("&") - out = {k: v for k, sep, v in (str(d).partition("=") for d in map(urllib.request.unquote, data))} - title = out.get("title", None) - return out.get("url", None), title.replace("+", " ") if title else "" - - @lru_cache(maxsize=1) def get_yt_icon(icon_name, size=24): """ Getting YouTube icon. If the icon is not found in the icon themes, the "Info" icon is returned by default! """ @@ -225,17 +209,15 @@ class IptvDialog: url = urlparse(url_str) entry.set_name("GtkEntry" if all([url.scheme, url.netloc, url.path]) else _DIGIT_ENTRY_NAME) - yt = re.search(_YT_PATTERN, url_str) - if yt: + yt_id = YouTube.get_yt_id(url_str) + if yt_id: entry.set_icon_from_pixbuf(Gtk.EntryIconPosition.SECONDARY, get_yt_icon("youtube", 32)) - video_id = yt.group(1) - if video_id: - text = "Found a link to the YouTube resource!\nTry to get a direct link to the video?" - if show_dialog(DialogType.QUESTION, self._dialog, text=text) == Gtk.ResponseType.OK: - entry.set_sensitive(False) - gen = self.set_yt_url(entry, video_id) - GLib.idle_add(lambda: next(gen, False), priority=GLib.PRIORITY_LOW) - elif re.match(_YT_VIDEO_PATTERN, url_str): + text = "Found a link to the YouTube resource!\nTry to get a direct link to the video?" + if show_dialog(DialogType.QUESTION, self._dialog, text=text) == Gtk.ResponseType.OK: + entry.set_sensitive(False) + gen = self.set_yt_url(entry, yt_id) + GLib.idle_add(lambda: next(gen, False), priority=GLib.PRIORITY_LOW) + elif YouTube.is_yt_video_link(url_str): entry.set_icon_from_pixbuf(Gtk.EntryIconPosition.SECONDARY, get_yt_icon("youtube", 32)) else: entry.set_icon_from_stock(Gtk.EntryIconPosition.SECONDARY, None) @@ -243,7 +225,7 @@ class IptvDialog: def set_yt_url(self, entry, video_id): try: - link, title = get_yt_link(video_id) + link, title = YouTube.get_yt_link(video_id) except urllib.error.URLError as e: self.show_info_message(get_message("Get link error:") + (str(e)), Gtk.MessageType.ERROR) return