# -*- coding: utf-8 -*- """ pafy.py. Python library to download YouTube content and retrieve metadata https://github.com/np1/pafy Copyright (C) 2013-2014 np1 This program is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see . """ from __future__ import unicode_literals __version__ = "0.3.74" __author__ = "np1" __license__ = "LGPLv3" import re import os import sys import time import json import logging import hashlib import tempfile from xml.etree import ElementTree early_py_version = sys.version_info[:2] < (2, 7) if sys.version_info[:2] >= (3, 0): # pylint: disable=E0611,F0401,I0011 from urllib.request import build_opener from urllib.error import HTTPError, URLError from urllib.parse import parse_qs, unquote_plus, urlencode uni, pyver = str, 3 else: from urllib2 import build_opener, HTTPError, URLError from urllib import unquote_plus, urlencode from urlparse import parse_qs uni, pyver = unicode, 2 if os.environ.get("pafydebug") == "1": logging.basicConfig(level=logging.DEBUG) dbg = logging.debug def parseqs(data): """ parse_qs, return unicode. """ if type(data) == uni: return parse_qs(data) elif pyver == 3: data = data.decode("utf8") data = parse_qs(data) else: data = parse_qs(data) out = {} for k, v in data.items(): k = k.decode("utf8") out[k] = [x.decode("utf8") for x in v] data = out return data def fetch_decode(url, encoding=None): """ Fetch url and decode. """ try: req = g.opener.open(url) except HTTPError as e: if e.getcode() == 503: time.sleep(.5) return fetch_decode(url, encoding) else: raise e ct = req.headers['content-type'] if encoding: return req.read().decode(encoding) elif "charset=" in ct: dbg("charset: %s", ct) encoding = re.search(r"charset=([\w-]+)\s*(:?;|$)", ct).group(1) return req.read().decode(encoding) else: dbg("encoding unknown") return req.read() def new(url, basic=True, gdata=False, signature=True, size=False, callback=lambda x: None): """ Return a new pafy instance given a url or video id. NOTE: The signature argument has been deprecated and now has no effect, it will be removed in a future version. Optional arguments: basic - fetch basic metadata and streams gdata - fetch gdata info (upload date, description, category) size - fetch the size of each stream (slow)(decrypts urls if needed) callback - a callback function to receive status strings If any of the first three above arguments are False, those data items will be fetched only when first called for. The defaults are recommended for most cases. If you wish to create many video objects at once, you may want to set basic to False, eg: video = pafy.new(basic=False) This will be quick because no http requests will be made on initialisation. Setting size to True will override the basic argument and force basic data to be fetched too (basic data is required to obtain Stream objects). """ if not signature: logging.warning("signature argument has no effect and will be removed" " in a future version.") return Pafy(url, basic, gdata, signature, size, callback) def get_video_info(video_id, newurl=None): """ Return info for video_id. Returns dict. """ url = g.urls['vidinfo'] % video_id url = newurl if newurl else url info = fetch_decode(url) # bytes info = parseqs(info) # unicode dict dbg("Fetched video info%s", " (age ver)" if newurl else "") if info['status'][0] == "fail" and info['errorcode'][0] == '150' and \ "confirm your age" in info['reason'][0]: # Video requires age verification dbg("Age verification video") new.callback("Age verification video") newurl = g.urls['age_vidinfo'] % (video_id, video_id) info = get_video_info(video_id, newurl) info.update({"age_ver": True}) elif info['status'][0] == "fail": reason = info['reason'][0] or "Bad video argument" raise IOError("Youtube says: %s [%s]" % (reason, video_id)) return info def get_video_gdata(video_id): """ Return json string containing video metadata from gdata api. """ new.callback("Fetching video gdata") query = {'part': 'id,snippet,statistics', 'maxResults': 1, 'id': video_id, 'key': g.api_key} url = g.urls['gdata'] + '?' + urlencode(query) gdata = fetch_decode(url) # unicode dbg("Fetched video gdata") new.callback("Fetched video gdata") return gdata def extract_video_id(url): """ Extract the video id from a url, return video id as str. """ ok = (r"\w-",) * 3 regx = re.compile(r'(?:^|[^%s]+)([%s]{11})(?:[^%s]+|$)' % ok) url = str(url) m = regx.search(url) if not m: err = "Need 11 character video id or the URL of the video. Got %s" raise ValueError(err % url) vidid = m.group(1) return vidid class g(object): """ Class for holding constants needed throughout the module. """ urls = { 'gdata': "https://www.googleapis.com/youtube/v3/videos", 'watchv': "http://www.youtube.com/watch?v=%s", 'vidcat': "https://www.googleapis.com/youtube/v3/videoCategories", 'vidinfo': ('http://www.youtube.com/get_video_info?' 'video_id=%s&asv=3&el=detailpage&hl=en_US'), 'playlist': ('http://www.youtube.com/list_ajax?' 'style=json&action_get_list=1&list=%s'), 'age_vidinfo': ('http://www.youtube.com/get_video_info?video_id=%s&' 'eurl=https://youtube.googleapis.com/v/%s&sts=1588') } api_key = "AIzaSyCIM4EzNqi1in22f4Z3Ru3iYvLaY8tc3bo" user_agent = "pafy " + __version__ UEFSM = 'url_encoded_fmt_stream_map' AF = 'adaptive_fmts' jsplayer = r';ytplayer\.config\s*=\s*({.*?});' lifespan = 60 * 60 * 5 # 5 hours opener = build_opener() opener.addheaders = [('User-Agent', user_agent)] cache = {} itags = { '5': ('320x240', 'flv', "normal", ''), '17': ('176x144', '3gp', "normal", ''), '18': ('640x360', 'mp4', "normal", ''), '22': ('1280x720', 'mp4', "normal", ''), '34': ('640x360', 'flv', "normal", ''), '35': ('854x480', 'flv', "normal", ''), '36': ('320x240', '3gp', "normal", ''), '37': ('1920x1080', 'mp4', "normal", ''), '38': ('4096x3072', 'mp4', "normal", '4:3 hi-res'), '43': ('640x360', 'webm', "normal", ''), '44': ('854x480', 'webm', "normal", ''), '45': ('1280x720', 'webm', "normal", ''), '46': ('1920x1080', 'webm', "normal", ''), '82': ('640x360-3D', 'mp4', "normal", ''), '83': ('640x480-3D', 'mp4', 'normal', ''), '84': ('1280x720-3D', 'mp4', "normal", ''), '100': ('640x360-3D', 'webm', "normal", ''), '102': ('1280x720-3D', 'webm', "normal", ''), '133': ('426x240', 'm4v', 'video', ''), '134': ('640x360', 'm4v', 'video', ''), '135': ('854x480', 'm4v', 'video', ''), '136': ('1280x720', 'm4v', 'video', ''), '137': ('1920x1080', 'm4v', 'video', ''), '138': ('4096x3072', 'm4v', 'video', ''), '139': ('48k', 'm4a', 'audio', ''), '140': ('128k', 'm4a', 'audio', ''), '141': ('256k', 'm4a', 'audio', ''), '160': ('256x144', 'm4v', 'video', ''), '167': ('640x480', 'webm', 'video', ''), '168': ('854x480', 'webm', 'video', ''), '169': ('1280x720', 'webm', 'video', ''), '170': ('1920x1080', 'webm', 'video', ''), '171': ('128k', 'ogg', 'audio', ''), '172': ('192k', 'ogg', 'audio', ''), '218': ('854x480', 'webm', 'video', 'VP8'), '219': ('854x480', 'webm', 'video', 'VP8'), '242': ('360x240', 'webm', 'video', 'VP9'), '243': ('480x360', 'webm', 'video', 'VP9'), '244': ('640x480', 'webm', 'video', 'VP9 low'), '245': ('640x480', 'webm', 'video', 'VP9 med'), '246': ('640x480', 'webm', 'video', 'VP9 high'), '247': ('720x480', 'webm', 'video', 'VP9'), '248': ('1920x1080', 'webm', 'video', 'VP9'), '249': ('48k', 'ogg', 'audio', 'Opus'), '250': ('56k', 'ogg', 'audio', 'Opus'), '251': ('128k', 'ogg', 'audio', 'Opus'), '256': ('192k', 'm4a', 'audio', '6-channel'), '258': ('320k', 'm4a', 'audio', '6-channel'), '264': ('2560x1440', 'm4v', 'video', ''), '266': ('3840x2160', 'm4v', 'video', 'AVC'), '271': ('1920x1280', 'webm', 'video', 'VP9'), '272': ('3414x1080', 'webm', 'video', 'VP9'), '278': ('256x144', 'webm', 'video', 'VP9'), '298': ('1280x720', 'm4v', 'video', '60fps'), '299': ('1920x1080', 'm4v', 'video', '60fps'), '302': ('1280x720', 'webm', 'video', 'VP9'), '303': ('1920x1080', 'webm', 'video', 'VP9'), } def _extract_smap(map_name, dic, zero_idx=True): """ Extract stream map, returns list of dicts. """ if map_name in dic: smap = dic.get(map_name) smap = smap[0] if zero_idx else smap smap = smap.split(",") smap = [parseqs(x) for x in smap] return [dict((k, v[0]) for k, v in x.items()) for x in smap] return [] def _extract_dash(dashurl): """ Download dash url and extract some data. """ # pylint: disable = R0914 dbg("Fetching dash page") dashdata = fetch_decode(dashurl) dbg("DASH list fetched") ns = "{urn:mpeg:DASH:schema:MPD:2011}" ytns = "{http://youtube.com/yt/2012/10/10}" tree = ElementTree.fromstring(dashdata) tlist = tree.findall(".//%sRepresentation" % ns) dashmap = [] for x in tlist: baseurl = x.find("%sBaseURL" % ns) url = baseurl.text size = baseurl.attrib["%scontentLength" % ytns] bitrate = x.get("bandwidth") itag = uni(x.get("id")) width = uni(x.get("width")) height = uni(x.get("height")) type_ = re.search(r"(?:\?|&)mime=([\w\d\/]+)", url).group(1) dashmap.append(dict(bitrate=bitrate, dash=True, itag=itag, width=width, height=height, url=url, size=size, type=type_)) return dashmap def _extract_function_from_js(name, js): """ Find a function definition called `name` and extract components. Return a dict representation of the function. """ dbg("Extracting function '%s' from javascript", name) fpattern = r'function\s+%s\(((?:\w+,?)+)\)\{([^}]+)\}' m = re.search(fpattern % re.escape(name), js) args, body = m.groups() dbg("extracted function %s(%s){%s};", name, args, body) func = {'name': name, 'parameters': args.split(","), 'body': body} return func def _extract_dictfunc_from_js(name, js): """ Find anonymous function from within a dict. """ dbg("Extracting function '%s' from javascript", name) var, _, fname = name.partition(".") fpattern = (r'var\s+%s\s*\=\s*\{.{,2000}?%s' r'\:function\(((?:\w+,?)+)\)\{([^}]+)\}') m = re.search(fpattern % (re.escape(var), re.escape(fname)), js) args, body = m.groups() dbg("extracted dict function %s(%s){%s};", name, args, body) func = {'name': name, 'parameters': args.split(","), 'body': body} return func def _get_mainfunc_from_js(js): """ Return main signature decryption function from javascript as dict. """ dbg("Scanning js for main function.") m = re.search(r'\w\.sig\|\|([$\w]+)\(\w+\.\w+\)', js) funcname = m.group(1) dbg("Found main function: %s", funcname) function = _extract_function_from_js(funcname, js) return function def _get_other_funcs(primary_func, js): """ Return all secondary functions used in primary_func. """ dbg("scanning javascript for secondary functions.") body = primary_func['body'] body = body.split(";") # standard function call; X=F(A,B,C...) call = re.compile(r'(?:[$\w+])=([$\w]+)\(((?:\w+,?)+)\)$') # dot notation function call; X=O.F(A,B,C..) dotcall = re.compile(r'(?:[$\w+]=)?([$\w]+)\.([$\w]+)\(((?:\w+,?)+)\)$') functions = {} for part in body: # is this a function? if call.match(part): match = call.match(part) name = match.group(1) # dbg("found secondary function '%s'", name) if name not in functions: # extract from javascript if not previously done functions[name] = _extract_function_from_js(name, js) # else: # dbg("function '%s' is already in map.", name) elif dotcall.match(part): match = dotcall.match(part) name = "%s.%s" % (match.group(1), match.group(2)) # don't treat X=A.slice(B) as X=O.F(B) if match.group(2) in ["slice", "splice"]: continue if name not in functions: functions[name] = _extract_dictfunc_from_js(name, js) return functions def _getval(val, argsdict): """ resolve variable values, preserve int literals. Return dict.""" m = re.match(r'(\d+)', val) if m: return int(m.group(1)) elif val in argsdict: return argsdict[val] else: raise IOError("Error val %s from dict %s" % (val, argsdict)) def _get_func_from_call(caller, name, arguments, js_url): """ Return called function complete with called args given a caller function . This function requires that Pafy.funcmap contains the function `name`. It retrieves the function and fills in the parameter values as called in the caller, returning them in the returned newfunction `args` dict """ newfunction = Pafy.funcmap[js_url][name] newfunction['args'] = {} for n, arg in enumerate(arguments): value = _getval(arg, caller['args']) # function may not use all arguments if n < len(newfunction['parameters']): param = newfunction['parameters'][n] newfunction['args'][param] = value return newfunction def _solve(f, js_url, returns=True): """Solve basic javascript function. Return solution value (str). """ # pylint: disable=R0914,R0912 resv = "slice|splice|reverse" patterns = { 'split_or_join': r'(\w+)=\1\.(?:split|join)\(""\)$', 'func_call': r'(\w+)=([$\w]+)\(((?:\w+,?)+)\)$', 'x1': r'var\s(\w+)=(\w+)\[(\w+)\]$', 'x2': r'(\w+)\[(\w+)\]=(\w+)\[(\w+)\%(\w+)\.length\]$', 'x3': r'(\w+)\[(\w+)\]=(\w+)$', 'return': r'return (\w+)(\.join\(""\))?$', 'reverse': r'(\w+)=(\w+)\.reverse\(\)$', 'reverse_noass': r'(\w+)\.reverse\(\)$', 'return_reverse': r'return (\w+)\.reverse\(\)$', 'slice': r'(\w+)=(\w+)\.slice\((\w+)\)$', 'splice_noass': r'([$\w]+)\.splice\(([$\w]+)\,([$\w]+)\)$', 'return_slice': r'return (\w+)\.slice\((\w+)\)$', 'func_call_dict': r'(\w)=([$\w]+)\.(?!%s)([$\w]+)\(((?:\w+,?)+)\)$' % resv, 'func_call_dict_noret': r'([$\w]+)\.(?!%s)([$\w]+)\(((?:\w+,?)+)\)$' % resv } parts = f['body'].split(";") for part in parts: # dbg("Working on part: " + part) name = "" for n, p in patterns.items(): m, name = re.match(p, part), n if m: break else: raise IOError("no match for %s" % part) if name == "split_or_join": pass elif name == "func_call_dict": lhs, dic, key, args = m.group(1, 2, 3, 4) funcname = "%s.%s" % (dic, key) newfunc = _get_func_from_call(f, funcname, args.split(","), js_url) f['args'][lhs] = _solve(newfunc, js_url) elif name == "func_call_dict_noret": dic, key, args = m.group(1, 2, 3) funcname = "%s.%s" % (dic, key) newfunc = _get_func_from_call(f, funcname, args.split(","), js_url) changed_args = _solve(newfunc, js_url, returns=False) for arg in f['args']: if arg in changed_args: f['args'][arg] = changed_args[arg] elif name == "func_call": lhs, funcname, args = m.group(1, 2, 3) newfunc = _get_func_from_call(f, funcname, args.split(","), js_url) f['args'][lhs] = _solve(newfunc, js_url) # recursive call # new var is an index of another var; eg: var a = b[c] elif name == "x1": b, c = [_getval(x, f['args']) for x in m.group(2, 3)] f['args'][m.group(1)] = b[c] # a[b]=c[d%e.length] elif name == "x2": vals = m.group(*range(1, 6)) a, b, c, d, e = [_getval(x, f['args']) for x in vals] f['args'][m.group(1)] = a[:b] + c[d % len(e)] + a[b + 1:] # a[b]=c elif name == "x3": a, b, c = [_getval(x, f['args']) for x in m.group(1, 2, 3)] f['args'][m.group(1)] = a[:b] + c + a[b + 1:] # a[b] = c elif name == "return": return f['args'][m.group(1)] elif name == "reverse": f['args'][m.group(1)] = _getval(m.group(2), f['args'])[::-1] elif name == "reverse_noass": f['args'][m.group(1)] = _getval(m.group(1), f['args'])[::-1] elif name == "splice_noass": a, b, c = [_getval(x, f['args']) for x in m.group(1, 2, 3)] f['args'][m.group(1)] = a[:b] + a[b + c:] elif name == "return_reverse": return f['args'][m.group(1)][::-1] elif name == "return_slice": a, b = [_getval(x, f['args']) for x in m.group(1, 2)] return a[b:] elif name == "slice": a, b, c = [_getval(x, f['args']) for x in m.group(1, 2, 3)] f['args'][m.group(1)] = b[c:] if not returns: # Return the args dict if no return statement in function return f['args'] else: raise IOError("Processed js funtion parts without finding return") def _decodesig(sig, js_url): """ Return decrypted sig given an encrypted sig and js_url key. """ # lookup main function in Pafy.funcmap dict mainfunction = Pafy.funcmap[js_url]['mainfunction'] param = mainfunction['parameters'] if not len(param) == 1: raise IOError("Main sig js function has more than one arg: %s" % param) # fill in function argument with signature mainfunction['args'] = {param[0]: sig} new.callback("Decrypting signature") solved = _solve(mainfunction, js_url) dbg("Decrypted sig = %s...", solved[:30]) new.callback("Decrypted signature") return solved def remux(infile, outfile, quiet=False, muxer="ffmpeg"): """ Remux audio. """ from subprocess import call, STDOUT muxer = muxer if isinstance(muxer, str) else "ffmpeg" for tool in set([muxer, "ffmpeg", "avconv"]): cmd = [tool, "-y", "-i", infile, "-acodec", "copy", "-vn", outfile] try: with open(os.devnull, "w") as devnull: call(cmd, stdout=devnull, stderr=STDOUT) except OSError: dbg("Failed to remux audio using %s", tool) else: os.unlink(infile) dbg("remuxed audio file using %s" % tool) if not quiet: sys.stdout.write("\nAudio remuxed.\n") break else: logging.warning("audio remux failed") os.rename(infile, outfile) def cache(name): """ Returns a sub-cache dictionary under which global key, value pairs can be stored. Regardless of whether a dictionary already exists for the given name, the sub-cache is returned by reference. """ if name not in g.cache: g.cache[name] = {} return g.cache[name] def fetch_cached(url, encoding=None, dbg_ref="", file_prefix=""): """ Fetch url - from tmpdir if already retrieved. """ tmpdir = os.path.join(tempfile.gettempdir(), "pafy") if not os.path.exists(tmpdir): os.makedirs(tmpdir) url_md5 = hashlib.md5(url.encode("utf8")).hexdigest() cached_filename = os.path.join(tmpdir, file_prefix + url_md5) if os.path.exists(cached_filename): dbg("fetched %s from cache", dbg_ref) with open(cached_filename) as f: retval = f.read() return retval else: data = fetch_decode(url, "utf8") # unicode dbg("Fetched %s", dbg_ref) new.callback("Fetched %s" % dbg_ref) with open(cached_filename, "w") as f: f.write(data) # prune files after write prune_files(tmpdir, file_prefix) return data def prune_files(path, prefix="", age_max=3600 * 24 * 14, count_max=4): """ Remove oldest files from path that start with prefix. remove files older than age_max, leave maximum of count_max files. """ tempfiles = [] if not os.path.isdir(path): return for f in os.listdir(path): filepath = os.path.join(path, f) if os.path.isfile(filepath) and f.startswith(prefix): age = time.time() - os.path.getmtime(filepath) if age > age_max: os.unlink(filepath) else: tempfiles.append((filepath, age)) tempfiles = sorted(tempfiles, key=lambda x: x[1], reverse=True) for f in tempfiles[:-count_max]: os.unlink(f[0]) def get_js_sm(video_id): """ Fetch watchinfo page and extract stream map and js funcs if not known. This function is needed by videos with encrypted signatures. If the js url referred to in the watchv page is not a key in Pafy.funcmap, the javascript is fetched and functions extracted. Returns streammap (list of dicts), js url (str) and funcs (dict) """ watch_url = g.urls['watchv'] % video_id new.callback("Fetching watch page") watchinfo = fetch_decode(watch_url) # unicode dbg("Fetched watch page") new.callback("Fetched watch page") m = re.search(g.jsplayer, watchinfo) myjson = json.loads(m.group(1)) stream_info = myjson['args'] dash_url = stream_info['dashmpd'] sm = _extract_smap(g.UEFSM, stream_info, False) asm = _extract_smap(g.AF, stream_info, False) js_url = myjson['assets']['js'] js_url = "https:" + js_url if js_url.startswith("//") else js_url funcs = Pafy.funcmap.get(js_url) if not funcs: dbg("Fetching javascript") new.callback("Fetching javascript") javascript = fetch_cached(js_url, encoding="utf8", dbg_ref="javascript", file_prefix="js-") mainfunc = _get_mainfunc_from_js(javascript) funcs = _get_other_funcs(mainfunc, javascript) funcs['mainfunction'] = mainfunc elif funcs: dbg("Using functions in memory extracted from %s", js_url) dbg("Mem contains %s js func sets", len(Pafy.funcmap)) return (sm, asm), js_url, funcs, dash_url def _make_url(raw, sig, quick=True): """ Return usable url. Set quick=False to disable ratebypass override. """ if quick and "ratebypass=" not in raw: raw += "&ratebypass=yes" if "signature=" not in raw: if sig is None: raise IOError("Error retrieving url") raw += "&signature=" + sig return raw class Stream(object): """ YouTube video stream class. """ def __init__(self, sm, parent): """ Set initial values. """ self._itag = sm['itag'] # is_dash = "width" in sm and "height" in sm is_dash = "dash" in sm if self._itag not in g.itags: logging.warning("Unknown itag: %s", self._itag) return None self._mediatype = g.itags[self.itag][2] self._threed = 'stereo3d' in sm and sm['stereo3d'] == '1' if is_dash: if sm['width'] != "None": # dash video self._resolution = "%sx%s" % (sm['width'], sm['height']) self._quality = self._resolution self._dimensions = (int(sm['width']), int(sm['height'])) else: # dash audio self._resolution = "0x0" self._dimensions = (0, 0) self._rawbitrate = int(sm['bitrate']) # self._bitrate = uni(int(sm['bitrate']) // 1024) + "k" self._bitrate = g.itags[self.itag][0] self._quality = self._bitrate self._fsize = int(sm['size']) # self._bitrate = sm['bitrate'] # self._rawbitrate = uni(int(self._bitrate) // 1024) + "k" else: # not dash self._resolution = g.itags[self.itag][0] self._fsize = None self._bitrate = self._rawbitrate = None self._dimensions = tuple(self.resolution.split("-")[0].split("x")) self._dimensions = tuple([int(x) if x.isdigit() else x for x in self._dimensions]) self._quality = self.resolution self._vidformat = sm['type'].split(';')[0] # undocumented self._extension = g.itags[self.itag][1] self._title = parent.title self.encrypted = 's' in sm self._parent = parent self._filename = self.generate_filename() self._notes = g.itags[self.itag][3] self._url = None self._rawurl = sm['url'] self._sig = sm['s'] if self.encrypted else sm.get("sig") self._active = False if self.mediatype == "audio" and not is_dash: self._dimensions = (0, 0) self._bitrate = self.resolution self._quality = self.bitrate self._resolution = "0x0" self._rawbitrate = int(sm["bitrate"]) def generate_filename(self, meta=False): """ Generate filename. """ ok = re.compile(r'[^/]') if os.name == "nt": ok = re.compile(r'[^\\/:*?"<>|]') filename = "".join(x if ok.match(x) else "_" for x in self._title) if meta: filename += "-%s-%s" % (self._parent.videoid, self._itag) filename += "." + self._extension return filename @property def rawbitrate(self): """ Return raw bitrate value. """ return self._rawbitrate @property def threed(self): """ Return bool, True if stream is 3D. """ return self._threed @property def itag(self): """ Return itag value of stream. """ return self._itag @property def resolution(self): """ Return resolution of stream as str. 0x0 if audio. """ return self._resolution @property def dimensions(self): """ Return dimensions of stream as tuple. (0, 0) if audio. """ return self._dimensions @property def quality(self): """ Return quality of stream (bitrate or resolution). eg, 128k or 640x480 (str) """ return self._quality @property def title(self): """ Return YouTube video title as a string. """ return self._title @property def extension(self): """ Return appropriate file extension for stream (str). Possible values are: 3gp, m4a, m4v, mp4, webm, ogg """ return self._extension @property def bitrate(self): """ Return bitrate of an audio stream. """ return self._bitrate @property def mediatype(self): """ Return mediatype string (normal, audio or video). (normal means a stream containing both video and audio.) """ return self._mediatype @property def notes(self): """ Return additional notes regarding the stream format. """ return self._notes @property def filename(self): """ Return filename of stream; derived from title and extension. """ return self._filename @property def url(self): """ Return the url, decrypt if required. """ if not self._url: if self._parent.age_ver: if self._sig: s = self._sig self._sig = s[2:63] + s[82] + s[64:82] + s[63] elif self.encrypted: self._sig = _decodesig(self._sig, self._parent.js_url) self._url = _make_url(self._rawurl, self._sig) return self._url @property def url_https(self): """ Return https url. """ return self.url.replace("http://", "https://") def __repr__(self): """ Return string representation. """ out = "%s:%s@%s" % (self.mediatype, self.extension, self.quality) return out def get_filesize(self): """ Return filesize of the stream in bytes. Set member variable. """ if not self._fsize: try: dbg("Getting stream size") cl = "content-length" self._fsize = int(g.opener.open(self.url).headers[cl]) dbg("Got stream size") except (AttributeError, HTTPError, URLError): self._fsize = 0 return self._fsize def cancel(self): """ Cancel an active download. """ if self._active: self._active = False return True def download(self, filepath="", quiet=False, callback=lambda *x: None, meta=False, remux_audio=False): """ Download. Use quiet=True to supress output. Return filename. Use meta=True to append video id and itag to generated filename Use remax_audio=True to remux audio file downloads """ # pylint: disable=R0912,R0914 # Too many branches, too many local vars savedir = filename = "" if filepath and os.path.isdir(filepath): savedir, filename = filepath, self.generate_filename() elif filepath: savedir, filename = os.path.split(filepath) else: filename = self.generate_filename(meta=meta) filepath = os.path.join(savedir, filename) temp_filepath = filepath + ".temp" status_string = (' {:,} Bytes [{:.2%}] received. Rate: [{:4.0f} ' 'KB/s]. ETA: [{:.0f} secs]') if early_py_version: status_string = (' {0:} Bytes [{1:.2%}] received. Rate:' ' [{2:4.0f} KB/s]. ETA: [{3:.0f} secs]') response = g.opener.open(self.url) total = int(response.info()['Content-Length'].strip()) chunksize, bytesdone, t0 = 16384, 0, time.time() fmode, offset = "wb", 0 if os.path.exists(temp_filepath): if os.stat(temp_filepath).st_size < total: offset = os.stat(temp_filepath).st_size fmode = "ab" outfh = open(temp_filepath, fmode) if offset: # partial file exists, resume download resuming_opener = build_opener() resuming_opener.addheaders = [('User-Agent', g.user_agent), ("Range", "bytes=%s-" % offset)] response = resuming_opener.open(self.url) bytesdone = offset self._active = True while self._active: chunk = response.read(chunksize) outfh.write(chunk) elapsed = time.time() - t0 bytesdone += len(chunk) if elapsed: rate = ((bytesdone - offset) / 1024) / elapsed eta = (total - bytesdone) / (rate * 1024) else: # Avoid ZeroDivisionError rate = 0 eta = 0 progress_stats = (bytesdone, bytesdone * 1.0 / total, rate, eta) if not chunk: outfh.close() break if not quiet: status = status_string.format(*progress_stats) sys.stdout.write("\r" + status + ' ' * 4 + "\r") sys.stdout.flush() if callback: callback(total, *progress_stats) if self._active: if remux_audio and self.mediatype == "audio": remux(temp_filepath, filepath, quiet=quiet, muxer=remux_audio) else: os.rename(temp_filepath, filepath) return filepath else: # download incomplete, return temp filepath outfh.close() return temp_filepath class Pafy(object): """ Class to represent a YouTube video. """ funcmap = {} # keep functions as a class variable def __init__(self, video_url, basic=True, gdata=False, signature=True, size=False, callback=lambda x: None): """ Set initial values. """ self.version = __version__ self.videoid = extract_video_id(video_url) self.watchv_url = g.urls['watchv'] % self.videoid new.callback = callback self._have_basic = False self._have_gdata = False self._description = None self._likes = None self._dislikes = None self._category = None self._published = None self._username = None self.sm = [] self.asm = [] self.dash = [] self.js_url = None # if js_url is set then has new stream map self._dashurl = None self.age_ver = False self._streams = [] self._oggstreams = [] self._m4astreams = [] self._allstreams = [] self._videostreams = [] self._audiostreams = [] self._title = None self._thumb = None self._rating = None self._length = None self._author = None self._formats = None self.ciphertag = None # used by Stream class in url property def self._duration = None self._keywords = None self._bigthumb = None self._viewcount = None self._bigthumbhd = None self._mix_pl = None self.expiry = None self.playlist_meta = None if basic: self.fetch_basic() if gdata: self._fetch_gdata() if size: for s in self.allstreams: # pylint: disable=W0104 s.get_filesize() def fetch_basic(self): """ Fetch basic data and streams. """ if self._have_basic: return self._fetch_basic() sm_ciphertag = "s" in self.sm[0] if self.ciphertag != sm_ciphertag: dbg("ciphertag mismatch") self.ciphertag = not self.ciphertag if self.ciphertag: dbg("Encrypted signature detected.") if not self.age_ver: smaps, js_url, funcs, dashurl = get_js_sm(self.videoid) Pafy.funcmap[js_url] = funcs self.sm, self.asm = smaps self.js_url = js_url dashsig = re.search(r"/s/([\w\.]+)", dashurl).group(1) dbg("decrypting dash sig") goodsig = _decodesig(dashsig, js_url) self._dashurl = re.sub(r"/s/[\w\.]+", "/signature/%s" % goodsig, dashurl) else: s = re.search(r"/s/([\w\.]+)", self._dashurl).group(1) s = s[2:63] + s[82] + s[64:82] + s[63] self._dashurl = re.sub(r"/s/[\w\.]+", "/signature/%s" % s, self._dashurl) if self._dashurl != 'unknown': self.dash = _extract_dash(self._dashurl) self._have_basic = 1 self._process_streams() self.expiry = time.time() + g.lifespan def _fetch_basic(self, info_url=None): """ Fetch info url page and set member vars. """ allinfo = get_video_info(self.videoid, newurl=info_url) if allinfo.get("age_ver"): self.age_ver = True new.callback("Fetched video info") def _get_lst(key, default="unknown", dic=allinfo): """ Dict get function, returns first index. """ retval = dic.get(key, default) return retval[0] if retval != default else default self._title = _get_lst('title') self._dashurl = _get_lst('dashmpd') self._author = _get_lst('author') self._rating = float(_get_lst('avg_rating', 0.0)) self._length = int(_get_lst('length_seconds', 0)) self._viewcount = int(_get_lst('view_count'), 0) self._thumb = unquote_plus(_get_lst('thumbnail_url', "")) self._formats = [x.split("/") for x in _get_lst('fmt_list').split(",")] self._keywords = _get_lst('keywords', "").split(',') self._bigthumb = _get_lst('iurlsd', "") self._bigthumbhd = _get_lst('iurlsdmaxres', "") self.ciphertag = _get_lst("use_cipher_signature") == "True" self.sm = _extract_smap(g.UEFSM, allinfo, True) self.asm = _extract_smap(g.AF, allinfo, True) dbg("extracted stream maps") def _fetch_gdata(self): """ Extract gdata values, fetch gdata if necessary. """ if self._have_gdata: return gdata = get_video_gdata(self.videoid) item = json.loads(gdata)['items'][0] snippet = item['snippet'] self._published = uni(snippet['publishedAt']) self._description = uni(snippet["description"]) self._category = get_categoryname(snippet['categoryId']) # TODO: Make sure actual usename is not available through the api self._username = uni(snippet['channelTitle']) statistics = item["statistics"] self._likes = int(statistics["likeCount"]) self._dislikes = int(statistics["dislikeCount"]) self._have_gdata = 1 def _process_streams(self): """ Create Stream object lists from internal stream maps. """ if not self._have_basic: self.fetch_basic() streams = [Stream(z, self) for z in self.sm] streams = [x for x in streams if x.itag in g.itags] adpt_streams = [Stream(z, self) for z in self.asm] adpt_streams = [x for x in adpt_streams if x.itag in g.itags] dash_streams = [Stream(z, self) for z in self.dash] dash_streams = [x for x in dash_streams if x.itag in g.itags] audiostreams = [x for x in adpt_streams if x.bitrate] videostreams = [x for x in adpt_streams if not x.bitrate] dash_itags = [x.itag for x in dash_streams] audiostreams = [x for x in audiostreams if x.itag not in dash_itags] videostreams = [x for x in videostreams if x.itag not in dash_itags] audiostreams += [x for x in dash_streams if x.mediatype == "audio"] videostreams += [x for x in dash_streams if x.mediatype != "audio"] audiostreams = sorted(audiostreams, key=lambda x: x.rawbitrate, reverse=True) videostreams = sorted(videostreams, key=lambda x: x.dimensions, reverse=True) m4astreams = [x for x in audiostreams if x.extension == "m4a"] oggstreams = [x for x in audiostreams if x.extension == "ogg"] self._streams = streams self._audiostreams = audiostreams self._videostreams = videostreams self._m4astreams, self._oggstreams = m4astreams, oggstreams self._allstreams = streams + videostreams + audiostreams def __repr__(self): """ Print video metadata. Return utf8 string. """ if self._have_basic: keys = "Title Author ID Duration Rating Views Thumbnail Keywords" keys = keys.split(" ") keywords = ", ".join(self.keywords) info = {"Title": self.title, "Author": self.author, "Views": self.viewcount, "Rating": self.rating, "Duration": self.duration, "ID": self.videoid, "Thumbnail": self.thumb, "Keywords": keywords} nfo = "\n".join(["%s: %s" % (k, info.get(k, "")) for k in keys]) else: nfo = "Pafy object: %s [%s]" % (self.videoid, self.title[:45] + "..") return nfo.encode("utf8", "replace") if pyver == 2 else nfo @property def streams(self): """ The streams for a video. Returns list.""" self.fetch_basic() return self._streams @property def allstreams(self): """ All stream types for a video. Returns list. """ self.fetch_basic() return self._allstreams @property def audiostreams(self): """ Return a list of audio Stream objects. """ self.fetch_basic() return self._audiostreams @property def videostreams(self): """ The video streams for a video. Returns list. """ self.fetch_basic() return self._videostreams @property def oggstreams(self): """ Return a list of ogg encoded Stream objects. """ self.fetch_basic() return self._oggstreams @property def m4astreams(self): """ Return a list of m4a encoded Stream objects. """ self.fetch_basic() return self._m4astreams @property def title(self): """ Return YouTube video title as a string. """ if not self._title: self.fetch_basic() return self._title @property def author(self): """ The uploader of the video. Returns str. """ if not self._author: self.fetch_basic() return self._author @property def rating(self): """ Rating for a video. Returns float. """ if not self._rating: self.fetch_basic() return self._rating @property def length(self): """ Length of a video in seconds. Returns int. """ if not self._length: self.fetch_basic() return self._length @property def viewcount(self): """ Number of views for a video. Returns int. """ if not self._viewcount: self.fetch_basic() return self._viewcount @property def bigthumb(self): """ Large thumbnail image url. Returns str. """ self.fetch_basic() return self._bigthumb @property def bigthumbhd(self): """ Extra large thumbnail image url. Returns str. """ self.fetch_basic() return self._bigthumbhd @property def thumb(self): """ Thumbnail image url. Returns str. """ if not self._thumb: self.fetch_basic() return self._thumb @property def duration(self): """ Duration of a video (HH:MM:SS). Returns str. """ if not self._length: self.fetch_basic() self._duration = time.strftime('%H:%M:%S', time.gmtime(self._length)) self._duration = uni(self._duration) return self._duration @property def keywords(self): """ Return keywords as list of str. """ self.fetch_basic() return self._keywords @property def category(self): """ YouTube category of the video. Returns string. """ self._fetch_gdata() return self._category @property def description(self): """ Description of the video. Returns string. """ if not self._description: self._fetch_gdata() return self._description @property def username(self): """ Return the username of the uploader. """ self._fetch_gdata() return self._username @property def published(self): """ The upload date and time of the video. Returns string. """ self._fetch_gdata() return self._published.replace(".000Z", "").replace("T", " ") @property def likes(self): """ The number of likes for the video. Returns int. """ self._fetch_gdata() return self._likes @property def dislikes(self): """ The number of dislikes for the video. Returns int. """ self._fetch_gdata() return self._dislikes @property def mix(self): """ The playlist for the related YouTube mix. Returns a dict containing Pafy objects. """ if self._mix_pl is None: try: self._mix_pl = get_playlist("RD" + self.videoid) except IOError: return None return self._mix_pl def _getbest(self, preftype="any", ftypestrict=True, vidonly=False): """ Return the highest resolution video available. Select from video-only streams if vidonly is True """ streams = self.videostreams if vidonly else self.streams if not streams: return None def _sortkey(x, key3d=0, keyres=0, keyftype=0): """ sort function for max(). """ key3d = "3D" not in x.resolution keyres = int(x.resolution.split("x")[0]) keyftype = preftype == x.extension strict = (key3d, keyftype, keyres) nonstrict = (key3d, keyres, keyftype) return strict if ftypestrict else nonstrict r = max(streams, key=_sortkey) if ftypestrict and preftype != "any" and r.extension != preftype: return None else: return r def getbestvideo(self, preftype="any", ftypestrict=True): """ Return the best resolution video-only stream. set ftypestrict to False to return a non-preferred format if that has a higher resolution """ return self._getbest(preftype, ftypestrict, vidonly=True) def getbest(self, preftype="any", ftypestrict=True): """ Return the highest resolution video+audio stream. set ftypestrict to False to return a non-preferred format if that has a higher resolution """ return self._getbest(preftype, ftypestrict, vidonly=False) def getbestaudio(self, preftype="any", ftypestrict=True): """ Return the highest bitrate audio Stream object.""" if not self.audiostreams: return None def _sortkey(x, keybitrate=0, keyftype=0): """ Sort function for max(). """ keybitrate = int(x.rawbitrate) keyftype = preftype == x.extension strict, nonstrict = (keyftype, keybitrate), (keybitrate, keyftype) return strict if ftypestrict else nonstrict r = max(self.audiostreams, key=_sortkey) if ftypestrict and preftype != "any" and r.extension != preftype: return None else: return r def populate_from_playlist(self, pl_data): """ Populate Pafy object with items fetched from playlist data. """ self._title = pl_data.get("title") self._author = pl_data.get("author") self._length = int(pl_data.get("length_seconds", 0)) self._rating = pl_data.get("rating", 0.0) self._viewcount = "".join(re.findall(r"\d", pl_data.get("views", "0"))) self._viewcount = int(self._viewcount) self._thumb = pl_data.get("thumbnail") self._description = pl_data.get("description") self.playlist_meta = pl_data def get_categoryname(cat_id): """ Returns a list of video category names for one category ID. """ timestamp = time.time() cat_cache = cache('categories') cached = cat_cache.get(cat_id, {}) if cached.get('updated', 0) > timestamp - g.lifespan: return cached.get('title', 'unknown') # call videoCategories API endpoint to retrieve title url = g.urls['vidcat'] query = {'id': cat_id, 'part': 'snippet', 'key': g.api_key} url += "?" + urlencode(query) catinfo = json.loads(fetch_decode(url)) try: for item in catinfo.get('items', []): title = item.get('snippet', {}).get('title', 'unknown') cat_cache[cat_id] = {'title':title, 'updated':timestamp} return title cat_cache[cat_id] = {'updated':timestamp} return 'unknown' except Exception: raise IOError("Error fetching category name for ID %s" % cat_id) def set_categories(categories): """ Take a dictionary mapping video category IDs to name and retrieval time. All items are stored into cache node 'videoCategories', but for the ones with a retrieval time too long ago, the v3 API is queried before. """ timestamp = time.time() idlist = [cid for cid, item in categories.items() if item.get('updated', 0) < timestamp - g.lifespan] if len(idlist) > 0: url = g.urls['vidcat'] query = {'id': ','.join(idlist), 'part': 'snippet', 'key': g.api_key} url += "?" + urlencode(query) catinfo = json.loads(fetch_decode(url)) try: for item in catinfo.get('items', []): cid = item['id'] title = item.get('snippet', {}).get('title', 'unknown') categories[cid] = {'title':title, 'updated':timestamp} except Exception: raise IOError("Error fetching category name for IDs %s" % idlist) cache('categories').update(categories) def load_cache(newcache): """Loads a dict into pafy's internal cache.""" set_categories(newcache.get('categories', {})) def dump_cache(): """Returns pafy's cache for storing by program.""" return g.cache def get_playlist(playlist_url, basic=False, gdata=False, signature=True, size=False, callback=lambda x: None): """ Return a dict containing Pafy objects from a YouTube Playlist. The returned Pafy objects are initialised using the arguments to get_playlist() in the manner documented for pafy.new() """ # pylint: disable=R0914 # too many local vars # Normal playlists start with PL, Mixes start with RD + first video ID regx = re.compile(r'((?:RD|PL)[-_0-9a-zA-Z]+)') m = regx.search(playlist_url) if not m: err = "Unrecognized playlist url: %s" raise ValueError(err % playlist_url) playlist_id = m.group(1) url = g.urls["playlist"] % playlist_id try: allinfo = fetch_decode(url) # unicode allinfo = json.loads(allinfo) except: raise IOError("Error fetching playlist %s" % m.groups(0)) # playlist specific metadata playlist = dict( playlist_id=playlist_id, likes=allinfo.get('likes'), title=allinfo.get('title'), author=allinfo.get('author'), dislikes=allinfo.get('dislikes'), description=allinfo.get('description'), items=[] ) # playlist items specific metadata for v in allinfo['video']: vid_data = dict( added=v.get('added'), is_cc=v.get('is_cc'), is_hd=v.get('is_hd'), likes=v.get('likes'), title=v.get('title'), views=v.get('views'), rating=v.get('rating'), author=v.get('author'), user_id=v.get('user_id'), privacy=v.get('privacy'), start=v.get('start', 0.0), dislikes=v.get('dislikes'), duration=v.get('duration'), comments=v.get('comments'), keywords=v.get('keywords'), thumbnail=v.get('thumbnail'), cc_license=v.get('cc_license'), category_id=v.get('category_id'), description=v.get('description'), encrypted_id=v.get('encrypted_id'), time_created=v.get('time_created'), time_updated=v.get('time_updated'), length_seconds=v.get('length_seconds'), end=v.get('end', v.get('length_seconds')) ) try: pafy_obj = new(vid_data['encrypted_id'], basic=basic, gdata=gdata, signature=signature, size=size, callback=callback) except IOError as e: callback("%s: %s" % (v['title'], e.message)) continue pafy_obj.populate_from_playlist(vid_data) playlist['items'].append(dict(pafy=pafy_obj, playlist_meta=vid_data)) callback("Added video: %s" % v['title']) return playlist def set_api_key(key): """Sets the api key to be used with youtube.""" g.api_key = key