From a55495fd7c1ea9cf38e51bfad9794d22f14023ec Mon Sep 17 00:00:00 2001 From: DYefremov Date: Tue, 25 Jan 2022 18:07:34 +0300 Subject: [PATCH] reworked built-in function for getting yt links --- app/tools/yt.py | 140 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 96 insertions(+), 44 deletions(-) diff --git a/app/tools/yt.py b/app/tools/yt.py index d4e751fb..c74dd37e 100644 --- a/app/tools/yt.py +++ b/app/tools/yt.py @@ -2,7 +2,7 @@ # # The MIT License (MIT) # -# Copyright (c) 2018-2021 Dmitriy Yefremov +# Copyright (c) 2018-2022 Dmitriy Yefremov # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -26,7 +26,7 @@ # -""" Module for working with YouTube service """ +""" Module for working with YouTube service. """ import gzip import json import os @@ -35,20 +35,21 @@ import shutil import sys from html.parser import HTMLParser from json import JSONDecodeError +from urllib import parse from urllib.error import URLError -from urllib.parse import unquote from urllib.request import Request, urlopen, urlretrieve from app.commons import log from app.settings import SEP from app.ui.uicommons import show_notification -_YT_PATTERN = re.compile(r"https://www.youtube.com/.+(?:v=)([\w-]{11}).*") -_YT_LIST_PATTERN = re.compile(r"https://www.youtube.com/.+?(?:list=)([\w-]{18,})?.*") -_YT_VIDEO_PATTERN = re.compile(r"https://r\d+---sn-[\w]{10}-[\w]{3,5}.googlevideo.com/videoplayback?.*") +_TIMEOUT = 5 _HEADERS = {"User-Agent": "Mozilla/5.0 (Linux x86_64; rv:92.0) Gecko/20100101 Firefox/92.0", "DNT": "1", "Accept-Encoding": "gzip, deflate"} +_YT_PATTERN = re.compile(r"https://www.youtube.com/.+(?:v=)([\w-]{11}).*") +_YT_LIST_PATTERN = re.compile(r"https://www.youtube.com/.+?(?:list=)([\w-]{18,})?.*") +_YT_VIDEO_PATTERN = re.compile(r"https://r\d+---sn-[\w]{10}-[\w]{3,5}.googlevideo.com/videoplayback?.*") Quality = {137: "1080p", 136: "720p", 135: "480p", 134: "360p", 133: "240p", 160: "144p", 0: "0p", 18: "360p", 22: "720p"} @@ -121,44 +122,27 @@ class YouTube: Returns tuple from the video links dict and title. """ - req = Request(YouTube._VIDEO_INFO_LINK.format(video_id), headers=_HEADERS) + info = InnerTube().player(video_id) + det = info.get("videoDetails", None) + title = det.get("title", None) if det else None + streaming_data = info.get("streamingData", None) + fmts = streaming_data.get("formats", None) if streaming_data else None - with urlopen(req, timeout=2) as resp: - data = unquote(gzip.decompress(resp.read()).decode("utf-8")).split("&") - out = {k: v for k, sep, v in (str(d).partition("=") for d in map(unquote, data))} - player_resp = out.get("player_response", None) + if fmts: + links = {Quality[i["itag"]]: i["url"] for i in filter( + lambda i: i.get("itag", -1) in Quality, fmts) if "url" in i} - if player_resp: - try: - resp = json.loads(player_resp) - except JSONDecodeError as e: - log(f"{__class__.__name__}: Parsing player response error: {e}") - else: - det = resp.get("videoDetails", None) - title = det.get("title", None) if det else None - streaming_data = resp.get("streamingData", None) - fmts = streaming_data.get("formats", None) if streaming_data else None + if links and title: + return links, title.replace("+", " ") - if fmts: - urls = {Quality[i["itag"]]: i["url"] for i in - filter(lambda i: i.get("itag", -1) in Quality, fmts) if "url" in i} + cause = None + status = info.get("playabilityStatus", None) + if status: + cause = f"[{status.get('status', '')}] {status.get('reason', '')}" - if urls and title: - return urls, title.replace("+", " ") + log(f"{__class__.__name__}: Getting link to video with id '{video_id}' filed! Cause: {cause}") - stream_map = out.get("url_encoded_fmt_stream_map", None) - if stream_map: - s_map = {k: v for k, sep, v in (str(d).partition("=") for d in stream_map.split("&"))} - url, title = s_map.get("url", None), out.get("title", None) - url, title = unquote(url) if url else "", title.replace("+", " ") if title else "" - if url and title: - return {Quality[0]: url}, title.replace("+", " ") - - rsn = out.get("reason", None) - rsn = rsn.replace("+", " ") if rsn else "" - log("{}: Getting link to video with id {} filed! Cause: {}".format(__class__.__name__, video_id, rsn)) - - return None, rsn + return None, cause def get_yt_playlist(self, list_id, url=None): """ Returns tuple from the playlist header and list of tuples (title, video id). """ @@ -181,6 +165,74 @@ class YouTube: return PlayListParser.get_yt_playlist(list_id) +class InnerTube: + """ Object for interacting with the innertube API. + + Based on InnerTube class from pytube [https://github.com/pytube/pytube] project! + """ + _BASE_URI = "https://www.youtube.com/youtubei/v1" + + _DEFAULT_CLIENTS = { + "ANDROID": { + "context": {"client": {"clientName": "ANDROID", "clientVersion": "16.20"}}, + "api_key": "AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8" + }, + "ANDROID_EMBED": { + "context": {"client": {"clientName": "ANDROID", "clientVersion": "16.20", "clientScreen": "EMBED"}}, + "api_key": "AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8" + } + } + + def __init__(self, client="ANDROID"): + """ Initialize an InnerTube object. + + @param client: Client to use for the object. Default to web because it returns the most playback types. + """ + self.context = self._DEFAULT_CLIENTS[client]["context"] + self.api_key = self._DEFAULT_CLIENTS[client]["api_key"] + + @property + def base_data(self): + """Return the base json data to transmit to the innertube API.""" + return {"context": self.context} + + @property + def base_params(self): + """Return the base query parameters to transmit to the innertube API.""" + return {"key": self.api_key, "contentCheckOk": True, "racyCheckOk": True} + + def player(self, video_id): + """ Make a request to the player endpoint. Returns raw player info results. """ + endpoint = f"{self._BASE_URI}/player" + query = {"videoId": video_id} + query.update(self.base_params) + return self._call_api(endpoint, query, self.base_data) or {} + + @staticmethod + def _call_api(endpoint, query, data): + """ Make a request to a given endpoint with the provided query parameters and data.""" + headers = {"Content-Type": "application/json", } + response = InnerTube._execute(f"{endpoint}?{parse.urlencode(query)}", "POST", headers=headers, data=data) + + try: + resp = json.loads(response.read()) + except JSONDecodeError as e: + log(f"{__class__.__name__}: Parsing response error: {e}") + else: + return resp + + @staticmethod + def _execute(url, method=None, headers=None, data=None, timeout=_TIMEOUT): + base_headers = {"User-Agent": "Mozilla/5.0", "accept-language": "en-US,en"} + if headers: + base_headers.update(headers) + if data: + # Encoding data for request. + if not isinstance(data, bytes): + data = bytes(json.dumps(data), encoding="utf-8") + return urlopen(Request(url, headers=base_headers, method=method, data=data), timeout=timeout) + + class PlayListParser(HTMLParser): """ Very simple parser to handle YouTube playlist pages. """ @@ -207,7 +259,7 @@ class PlayListParser(HTMLParser): try: resp = json.loads(data) except JSONDecodeError as e: - log("{}: Parsing data error: {}".format(__class__.__name__, e)) + log(f"{__class__.__name__}: Parsing data error: {e}") else: sb = resp.get("sidebar", None) if sb: @@ -241,9 +293,9 @@ class PlayListParser(HTMLParser): returns tuple from the playlist header and list of tuples (title, video id) """ - request = Request("https://www.youtube.com/playlist?list={}&hl=en".format(play_list_id), headers=_HEADERS) + request = Request(f"https://www.youtube.com/playlist?list={play_list_id}&hl=en", headers=_HEADERS) - with urlopen(request, timeout=2) as resp: + with urlopen(request, timeout=_TIMEOUT) as resp: data = gzip.decompress(resp.read()).decode("utf-8") parser = PlayListParser() parser.feed(data) @@ -295,10 +347,10 @@ class YouTubeDL: try: import youtube_dl except ModuleNotFoundError as e: - log("YouTubeDLHelper error: {}".format(str(e))) + log(f"YouTubeDLHelper error: {e}") raise YouTubeException(e) except ImportError as e: - log("YouTubeDLHelper error: {}".format(str(e))) + log(f"YouTubeDLHelper error: {e}") else: if self._path not in youtube_dl.__file__: msg = "Another version of youtube-dl was found on your system!"