mirror of
https://github.com/DYefremov/DemonEditor.git
synced 2026-03-05 12:01:45 +01:00
reworked built-in function for getting yt links
This commit is contained in:
140
app/tools/yt.py
140
app/tools/yt.py
@@ -2,7 +2,7 @@
|
||||
#
|
||||
# The MIT License (MIT)
|
||||
#
|
||||
# Copyright (c) 2018-2021 Dmitriy Yefremov
|
||||
# Copyright (c) 2018-2022 Dmitriy Yefremov
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
@@ -26,7 +26,7 @@
|
||||
#
|
||||
|
||||
|
||||
""" Module for working with YouTube service """
|
||||
""" Module for working with YouTube service. """
|
||||
import gzip
|
||||
import json
|
||||
import os
|
||||
@@ -35,20 +35,21 @@ import shutil
|
||||
import sys
|
||||
from html.parser import HTMLParser
|
||||
from json import JSONDecodeError
|
||||
from urllib import parse
|
||||
from urllib.error import URLError
|
||||
from urllib.parse import unquote
|
||||
from urllib.request import Request, urlopen, urlretrieve
|
||||
|
||||
from app.commons import log
|
||||
from app.settings import SEP
|
||||
from app.ui.uicommons import show_notification
|
||||
|
||||
_YT_PATTERN = re.compile(r"https://www.youtube.com/.+(?:v=)([\w-]{11}).*")
|
||||
_YT_LIST_PATTERN = re.compile(r"https://www.youtube.com/.+?(?:list=)([\w-]{18,})?.*")
|
||||
_YT_VIDEO_PATTERN = re.compile(r"https://r\d+---sn-[\w]{10}-[\w]{3,5}.googlevideo.com/videoplayback?.*")
|
||||
_TIMEOUT = 5
|
||||
_HEADERS = {"User-Agent": "Mozilla/5.0 (Linux x86_64; rv:92.0) Gecko/20100101 Firefox/92.0",
|
||||
"DNT": "1",
|
||||
"Accept-Encoding": "gzip, deflate"}
|
||||
_YT_PATTERN = re.compile(r"https://www.youtube.com/.+(?:v=)([\w-]{11}).*")
|
||||
_YT_LIST_PATTERN = re.compile(r"https://www.youtube.com/.+?(?:list=)([\w-]{18,})?.*")
|
||||
_YT_VIDEO_PATTERN = re.compile(r"https://r\d+---sn-[\w]{10}-[\w]{3,5}.googlevideo.com/videoplayback?.*")
|
||||
|
||||
Quality = {137: "1080p", 136: "720p", 135: "480p", 134: "360p",
|
||||
133: "240p", 160: "144p", 0: "0p", 18: "360p", 22: "720p"}
|
||||
@@ -121,44 +122,27 @@ class YouTube:
|
||||
|
||||
Returns tuple from the video links dict and title.
|
||||
"""
|
||||
req = Request(YouTube._VIDEO_INFO_LINK.format(video_id), headers=_HEADERS)
|
||||
info = InnerTube().player(video_id)
|
||||
det = info.get("videoDetails", None)
|
||||
title = det.get("title", None) if det else None
|
||||
streaming_data = info.get("streamingData", None)
|
||||
fmts = streaming_data.get("formats", None) if streaming_data else None
|
||||
|
||||
with urlopen(req, timeout=2) as resp:
|
||||
data = unquote(gzip.decompress(resp.read()).decode("utf-8")).split("&")
|
||||
out = {k: v for k, sep, v in (str(d).partition("=") for d in map(unquote, data))}
|
||||
player_resp = out.get("player_response", None)
|
||||
if fmts:
|
||||
links = {Quality[i["itag"]]: i["url"] for i in filter(
|
||||
lambda i: i.get("itag", -1) in Quality, fmts) if "url" in i}
|
||||
|
||||
if player_resp:
|
||||
try:
|
||||
resp = json.loads(player_resp)
|
||||
except JSONDecodeError as e:
|
||||
log(f"{__class__.__name__}: Parsing player response error: {e}")
|
||||
else:
|
||||
det = resp.get("videoDetails", None)
|
||||
title = det.get("title", None) if det else None
|
||||
streaming_data = resp.get("streamingData", None)
|
||||
fmts = streaming_data.get("formats", None) if streaming_data else None
|
||||
if links and title:
|
||||
return links, title.replace("+", " ")
|
||||
|
||||
if fmts:
|
||||
urls = {Quality[i["itag"]]: i["url"] for i in
|
||||
filter(lambda i: i.get("itag", -1) in Quality, fmts) if "url" in i}
|
||||
cause = None
|
||||
status = info.get("playabilityStatus", None)
|
||||
if status:
|
||||
cause = f"[{status.get('status', '')}] {status.get('reason', '')}"
|
||||
|
||||
if urls and title:
|
||||
return urls, title.replace("+", " ")
|
||||
log(f"{__class__.__name__}: Getting link to video with id '{video_id}' filed! Cause: {cause}")
|
||||
|
||||
stream_map = out.get("url_encoded_fmt_stream_map", None)
|
||||
if stream_map:
|
||||
s_map = {k: v for k, sep, v in (str(d).partition("=") for d in stream_map.split("&"))}
|
||||
url, title = s_map.get("url", None), out.get("title", None)
|
||||
url, title = unquote(url) if url else "", title.replace("+", " ") if title else ""
|
||||
if url and title:
|
||||
return {Quality[0]: url}, title.replace("+", " ")
|
||||
|
||||
rsn = out.get("reason", None)
|
||||
rsn = rsn.replace("+", " ") if rsn else ""
|
||||
log("{}: Getting link to video with id {} filed! Cause: {}".format(__class__.__name__, video_id, rsn))
|
||||
|
||||
return None, rsn
|
||||
return None, cause
|
||||
|
||||
def get_yt_playlist(self, list_id, url=None):
|
||||
""" Returns tuple from the playlist header and list of tuples (title, video id). """
|
||||
@@ -181,6 +165,74 @@ class YouTube:
|
||||
return PlayListParser.get_yt_playlist(list_id)
|
||||
|
||||
|
||||
class InnerTube:
|
||||
""" Object for interacting with the innertube API.
|
||||
|
||||
Based on InnerTube class from pytube [https://github.com/pytube/pytube] project!
|
||||
"""
|
||||
_BASE_URI = "https://www.youtube.com/youtubei/v1"
|
||||
|
||||
_DEFAULT_CLIENTS = {
|
||||
"ANDROID": {
|
||||
"context": {"client": {"clientName": "ANDROID", "clientVersion": "16.20"}},
|
||||
"api_key": "AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8"
|
||||
},
|
||||
"ANDROID_EMBED": {
|
||||
"context": {"client": {"clientName": "ANDROID", "clientVersion": "16.20", "clientScreen": "EMBED"}},
|
||||
"api_key": "AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8"
|
||||
}
|
||||
}
|
||||
|
||||
def __init__(self, client="ANDROID"):
|
||||
""" Initialize an InnerTube object.
|
||||
|
||||
@param client: Client to use for the object. Default to web because it returns the most playback types.
|
||||
"""
|
||||
self.context = self._DEFAULT_CLIENTS[client]["context"]
|
||||
self.api_key = self._DEFAULT_CLIENTS[client]["api_key"]
|
||||
|
||||
@property
|
||||
def base_data(self):
|
||||
"""Return the base json data to transmit to the innertube API."""
|
||||
return {"context": self.context}
|
||||
|
||||
@property
|
||||
def base_params(self):
|
||||
"""Return the base query parameters to transmit to the innertube API."""
|
||||
return {"key": self.api_key, "contentCheckOk": True, "racyCheckOk": True}
|
||||
|
||||
def player(self, video_id):
|
||||
""" Make a request to the player endpoint. Returns raw player info results. """
|
||||
endpoint = f"{self._BASE_URI}/player"
|
||||
query = {"videoId": video_id}
|
||||
query.update(self.base_params)
|
||||
return self._call_api(endpoint, query, self.base_data) or {}
|
||||
|
||||
@staticmethod
|
||||
def _call_api(endpoint, query, data):
|
||||
""" Make a request to a given endpoint with the provided query parameters and data."""
|
||||
headers = {"Content-Type": "application/json", }
|
||||
response = InnerTube._execute(f"{endpoint}?{parse.urlencode(query)}", "POST", headers=headers, data=data)
|
||||
|
||||
try:
|
||||
resp = json.loads(response.read())
|
||||
except JSONDecodeError as e:
|
||||
log(f"{__class__.__name__}: Parsing response error: {e}")
|
||||
else:
|
||||
return resp
|
||||
|
||||
@staticmethod
|
||||
def _execute(url, method=None, headers=None, data=None, timeout=_TIMEOUT):
|
||||
base_headers = {"User-Agent": "Mozilla/5.0", "accept-language": "en-US,en"}
|
||||
if headers:
|
||||
base_headers.update(headers)
|
||||
if data:
|
||||
# Encoding data for request.
|
||||
if not isinstance(data, bytes):
|
||||
data = bytes(json.dumps(data), encoding="utf-8")
|
||||
return urlopen(Request(url, headers=base_headers, method=method, data=data), timeout=timeout)
|
||||
|
||||
|
||||
class PlayListParser(HTMLParser):
|
||||
""" Very simple parser to handle YouTube playlist pages. """
|
||||
|
||||
@@ -207,7 +259,7 @@ class PlayListParser(HTMLParser):
|
||||
try:
|
||||
resp = json.loads(data)
|
||||
except JSONDecodeError as e:
|
||||
log("{}: Parsing data error: {}".format(__class__.__name__, e))
|
||||
log(f"{__class__.__name__}: Parsing data error: {e}")
|
||||
else:
|
||||
sb = resp.get("sidebar", None)
|
||||
if sb:
|
||||
@@ -241,9 +293,9 @@ class PlayListParser(HTMLParser):
|
||||
|
||||
returns tuple from the playlist header and list of tuples (title, video id)
|
||||
"""
|
||||
request = Request("https://www.youtube.com/playlist?list={}&hl=en".format(play_list_id), headers=_HEADERS)
|
||||
request = Request(f"https://www.youtube.com/playlist?list={play_list_id}&hl=en", headers=_HEADERS)
|
||||
|
||||
with urlopen(request, timeout=2) as resp:
|
||||
with urlopen(request, timeout=_TIMEOUT) as resp:
|
||||
data = gzip.decompress(resp.read()).decode("utf-8")
|
||||
parser = PlayListParser()
|
||||
parser.feed(data)
|
||||
@@ -295,10 +347,10 @@ class YouTubeDL:
|
||||
try:
|
||||
import youtube_dl
|
||||
except ModuleNotFoundError as e:
|
||||
log("YouTubeDLHelper error: {}".format(str(e)))
|
||||
log(f"YouTubeDLHelper error: {e}")
|
||||
raise YouTubeException(e)
|
||||
except ImportError as e:
|
||||
log("YouTubeDLHelper error: {}".format(str(e)))
|
||||
log(f"YouTubeDLHelper error: {e}")
|
||||
else:
|
||||
if self._path not in youtube_dl.__file__:
|
||||
msg = "Another version of youtube-dl was found on your system!"
|
||||
|
||||
Reference in New Issue
Block a user