download.py
da_url(url)
¶
Open and prompt for download of youtube best audio from url.
Source code in mps_youtube/commands/download.py
@command(r'daurl\s(.*[-_a-zA-Z0-9]{11}.*)', 'daurl')
def da_url(url):
""" Open and prompt for download of youtube best audio from url. """
g.browse_mode = "normal"
yt_url(url)
if len(g.model) == 1:
download("da", "1")
if g.command_line:
sys.exit()
dl_url(url)
¶
Open and prompt for download of youtube video url.
Source code in mps_youtube/commands/download.py
@command(r'dlurl\s(.*[-_a-zA-Z0-9]{11}.*)', 'dlurl')
def dl_url(url):
""" Open and prompt for download of youtube video url. """
g.browse_mode = "normal"
yt_url(url)
if len(g.model) == 1:
download("download", "1")
if g.command_line:
sys.exit()
down_many(dltype, choice, subdir=None)
¶
Download multiple items.
Source code in mps_youtube/commands/download.py
@command(r'(da|dv)\s+((?:\d+\s\d+|-\d+|\d+-|\d+,)(?:[\d\s,-]*))', 'da', 'dv')
def down_many(dltype, choice, subdir=None):
""" Download multiple items. """
choice = util.parse_multi(choice)
choice = list(set(choice))
downsongs = [g.model[int(x) - 1] for x in choice]
temp = g.model[::]
g.model.songs = downsongs[::]
count = len(downsongs)
av = "audio" if dltype.startswith("da") else "video"
msg = ""
def handle_error(message):
""" Handle error in download. """
g.message = message
g.content = disp
screen.update()
time.sleep(2)
g.model.songs.pop(0)
try:
for song in downsongs:
g.result_count = len(g.model)
disp = content.generate_songlist_display()
title = "Download Queue (%s):%s\n\n" % (av, c.w)
disp = re.sub(r"(Num\s*?Title.*?\n)", title, disp)
g.content = disp
screen.update()
try:
filename = _make_fname(song, None, av=av, subdir=subdir)
except IOError as e:
handle_error("Error for %s: %s" % (song.title, str(e)))
count -= 1
continue
except KeyError:
handle_error("No audio track for %s" % song.title)
count -= 1
continue
try:
_download(song, filename, url=None, audio=av == "audio")
except HTTPError:
handle_error("HTTP Error for %s" % song.title)
count -= 1
continue
g.model.songs.pop(0)
msg = "Downloaded %s items" % count
g.message = "Saved to " + c.g + song.title + c.w
except KeyboardInterrupt:
msg = "Downloads interrupted!"
finally:
g.model.songs = temp[::]
g.message = msg
g.result_count = len(g.model)
g.content = content.generate_songlist_display()
down_plist(dltype, parturl)
¶
Download YouTube playlist.
Source code in mps_youtube/commands/download.py
@command(r'(da|dv)pl\s+%s' % PL, 'dapl', 'dvpl')
def down_plist(dltype, parturl):
""" Download YouTube playlist. """
plist(parturl)
dump(False)
title = g.pafy_pls[parturl][0].title
# Remove double quotes for convenience
subdir = util.sanitize_filename(title.replace('"', ''))
down_many(dltype, "1-", subdir=subdir)
msg = g.message
plist(parturl)
g.message = msg
down_user_pls(dltype, user)
¶
Download all user playlists.
Source code in mps_youtube/commands/download.py
@command(r'(da|dv)upl\s+(.*)', 'daupl', 'dvupl')
def down_user_pls(dltype, user):
""" Download all user playlists. """
user_pls(user)
for i in g.ytpls:
down_plist(dltype, i.get('link'))
return
download(dltype, num)
¶
Download a track or playlist by menu item number.
Source code in mps_youtube/commands/download.py
@command(r'(dv|da|d|dl|download)\s*(\d{1,4})', 'da', 'dv', 'd', 'dl', 'download')
def download(dltype, num):
""" Download a track or playlist by menu item number. """
# This function needs refactoring!
# pylint: disable=R0912
# pylint: disable=R0914
if g.browse_mode == "ytpl" and dltype in ("da", "dv"):
plid = g.ytpls[int(num) - 1]["link"]
down_plist(dltype, plid)
return
elif g.browse_mode == "ytpl":
g.message = "Use da or dv to specify audio / video playlist download"
g.message = c.y + g.message + c.w
g.content = content.generate_songlist_display()
return
elif g.browse_mode != "normal":
g.message = "Download must refer to a specific video item"
g.message = c.y + g.message + c.w
g.content = content.generate_songlist_display()
return
screen.writestatus("Fetching video info...")
song = (g.model[int(num) - 1])
# best = dltype.startswith("dv") or dltype.startswith("da")
#
# if not best:
#
# try:
# # user prompt for download stream
# url, ext, url_au, ext_au = prompt_dl(song)
#
# except KeyboardInterrupt:
# g.message = c.r + "Download aborted!" + c.w
# g.content = content.generate_songlist_display()
# return
#
# if not url or ext_au == "abort":
# # abort on invalid stream selection
# g.content = content.generate_songlist_display()
# g.message = "%sNo download selected / invalid input%s" % (c.y, c.w)
# return
#
# else:
# # download user selected stream(s)
# filename = _make_fname(song, ext)
# args = (song, filename, url)
#
# if url_au and ext_au:
# # downloading video and audio stream for muxing
# audio = False
# filename_au = _make_fname(song, ext_au)
# args_au = (song, filename_au, url_au)
#
# else:
# audio = ext in ("m4a", "ogg")
#
# kwargs = dict(audio=audio)
#
# elif best:
# # set updownload without prompt
# url_au = None
# av = "audio" if dltype.startswith("da") else "video"
# audio = av == "audio"
# filename = _make_fname(song, None, av=av)
# args = (song, filename)
# kwargs = dict(url=None, audio=audio)
try:
# perform download(s)
# dl_filenames = [args[1]]
# f = _download(*args, **kwargs)
success = pafy.download_video(song.ytid, config.DDIR.get)
if success:
g.message = "Saved \'" + song.title + "\' to " + c.g + config.DDIR.get + c.w
# if url_au:
# dl_filenames += [args_au[1]]
# _download(*args_au, allow_transcode=False, **kwargs)
except KeyboardInterrupt:
g.message = c.r + "Download halted!" + c.w
# try:
# for downloaded in dl_filenames:
# os.remove(downloaded)
#
# except IOError:
# pass
# if url_au:
# # multiplex
# name, ext = os.path.splitext(args[1])
# tmpvideoname = name + '.' +str(random.randint(10000, 99999)) + ext
# os.rename(args[1], tmpvideoname)
# mux_cmd = [g.muxapp, "-i", tmpvideoname, "-i", args_au[1], "-c",
# "copy", name + ".mp4"]
#
# try:
# subprocess.call(mux_cmd)
# g.message = "Saved to :" + c.g + mux_cmd[7] + c.w
# os.remove(tmpvideoname)
# os.remove(args_au[1])
#
# except KeyboardInterrupt:
# g.message = "Audio/Video multiplex aborted!"
g.content = content.generate_songlist_display()
external_download(song, filename, url)
¶
Perform download using external application.
Source code in mps_youtube/commands/download.py
def external_download(song, filename, url):
""" Perform download using external application. """
cmd = config.DOWNLOAD_COMMAND.get
ddir, basename = config.DDIR.get, os.path.basename(filename)
cmd_list = shlex.split(cmd)
def list_string_sub(orig, repl, lst):
""" Replace substrings for items in a list. """
return [x if orig not in x else x.replace(orig, repl) for x in lst]
cmd_list = list_string_sub("%F", filename, cmd_list)
cmd_list = list_string_sub("%d", ddir, cmd_list)
cmd_list = list_string_sub("%f", basename, cmd_list)
cmd_list = list_string_sub("%u", url, cmd_list)
cmd_list = list_string_sub("%i", song.ytid, cmd_list)
util.dbg("Downloading using: %s", " ".join(cmd_list))
subprocess.call(cmd_list)
extract_metadata(name)
¶
Try to determine metadata from video title.
Source code in mps_youtube/commands/download.py
def extract_metadata(name):
""" Try to determine metadata from video title. """
seps = name.count(" - ")
artist = title = None
if seps == 1:
pos = name.find(" - ")
artist = name[:pos].strip()
title = name[pos + 3:].strip()
else:
title = name.strip()
return dict(artist=artist, title=title)
gen_dl_text(ddata, song, p)
¶
Generate text for dl screen.
Source code in mps_youtube/commands/download.py
def gen_dl_text(ddata, song, p):
""" Generate text for dl screen. """
hdr = []
hdr.append(" %s%s%s" % (c.r, song.title, c.w))
author = p.author
hdr.append(c.r + " Uploaded by " + author + c.w)
hdr.append(" [" + util.fmt_time(song.length) + "]")
hdr.append("")
heading = tuple("Item Format Quality Media Size Notes".split())
fmt = " {0}%-6s %-8s %-13s %-7s %-5s %-16s{1}"
heading = [fmt.format(c.w, c.w) % heading]
heading.append("")
content = []
for n, d in enumerate(ddata):
row = (n + 1, d['ext'], d['quality'], d['mediatype'], d['size'],
d['notes'])
fmt = " {0}%-6s %-8s %-13s %-7s %5s Mb %-16s{1}"
row = fmt.format(c.g, c.w) % row
content.append(row)
content.append("")
footer = "Select [%s1-%s%s] to download or [%sEnter%s] to return"
footer = [footer % (c.y, len(content) - 1, c.w, c.y, c.w)]
return(content, hdr, heading, footer)
get_dl_data(song, mediatype='any')
¶
Get filesize and metadata for all streams, return dict.
Source code in mps_youtube/commands/download.py
def get_dl_data(song, mediatype="any"):
""" Get filesize and metadata for all streams, return dict. """
def mbsize(x):
""" Return size in MB. """
return str(int(x / (1024 ** 2)))
p = util.get_pafy(song)
dldata = []
text = " [Fetching stream info] >"
streamlist = [x for x in p.allstreams]
if mediatype == "audio":
streamlist = [x for x in p.audiostreams]
l = len(streamlist)
for n, stream in enumerate(streamlist):
sys.stdout.write(text + "-" * n + ">" + " " * (l - n - 1) + "<\r")
sys.stdout.flush()
try:
size = mbsize(stream.get_filesize())
except TypeError:
util.dbg(c.r + "---Error getting stream size" + c.w)
size = 0
item = {'mediatype': stream.mediatype,
'size': size,
'ext': stream.extension,
'quality': stream.quality,
'notes': stream.notes,
'url': stream.url}
dldata.append(item)
screen.writestatus("")
return dldata, p
menu_prompt(model, prompt=' > ', rows=None, header=None, theading=None, footer=None, force=0)
¶
Generate a list of choice, returns item from model.
Source code in mps_youtube/commands/download.py
def menu_prompt(model, prompt=" > ", rows=None, header=None, theading=None,
footer=None, force=0):
""" Generate a list of choice, returns item from model. """
content = ""
for x in header, theading, rows, footer:
if isinstance(x, list):
for line in x:
content += line + "\n"
elif isinstance(x, str):
content += x + "\n"
g.content = content
screen.update()
choice = input(prompt)
if choice in model:
return model[choice]
elif force:
return menu_prompt(model, prompt, rows, header, theading, footer,
force)
elif not choice.strip():
return False, False
else: # unrecognised input
return False, "abort"
prompt_dl(song)
¶
Prompt user do choose a stream to dl. Return (url, extension).
Source code in mps_youtube/commands/download.py
def prompt_dl(song):
""" Prompt user do choose a stream to dl. Return (url, extension). """
# pylint: disable=R0914
dl_data, p = get_dl_data(song)
dl_text = gen_dl_text(dl_data, song, p)
model = [x['url'] for x in dl_data]
ed = enumerate(dl_data)
model = {str(n + 1): (x['url'], x['ext']) for n, x in ed}
url, ext = menu_prompt(model, "Download number: ", *dl_text)
url2 = ext2 = None
mediatype = [i for i in dl_data if i['url'] == url][0]['mediatype']
if mediatype == "video" and g.muxapp and not config.DOWNLOAD_COMMAND.get:
# offer mux if not using external downloader
dl_data, p = get_dl_data(song, mediatype="audio")
dl_text = gen_dl_text(dl_data, song, p)
au_choices = "1" if len(dl_data) == 1 else "1-%s" % len(dl_data)
footer = [util.F('-audio') % ext, util.F('select mux') % au_choices]
dl_text = tuple(dl_text[0:3]) + (footer,)
aext = ("ogg", "m4a")
model = [x['url'] for x in dl_data if x['ext'] in aext]
ed = enumerate(dl_data)
model = {str(n + 1): (x['url'], x['ext']) for n, x in ed}
prompt = "Audio stream: "
url2, ext2 = menu_prompt(model, prompt, *dl_text)
return url, ext, url2, ext2
remux_audio(filename, title)
¶
Remux audio file. Insert limited metadata tags.
Source code in mps_youtube/commands/download.py
def remux_audio(filename, title):
""" Remux audio file. Insert limited metadata tags. """
util.dbg("starting remux")
temp_file = filename + "." + str(random.randint(10000, 99999))
os.rename(filename, temp_file)
meta = extract_metadata(title)
metadata = ["title=%s" % meta["title"]]
if meta["artist"]:
metadata = ["title=%s" % meta["title"], "-metadata",
"artist=%s" % meta["artist"]]
cmd = [g.muxapp, "-y", "-i", temp_file, "-acodec", "copy", "-metadata"]
cmd += metadata + ["-vn", filename]
util.dbg(cmd)
try:
with open(os.devnull, "w") as devnull:
subprocess.call(cmd, stdout=devnull, stderr=subprocess.STDOUT)
except OSError:
util.dbg("Failed to remux audio using %s", g.muxapp)
os.rename(temp_file, filename)
else:
os.unlink(temp_file)
util.dbg("remuxed audio file using %s" % g.muxapp)
transcode(filename, enc_data)
¶
Re encode a download.
Source code in mps_youtube/commands/download.py
def transcode(filename, enc_data):
""" Re encode a download. """
base = os.path.splitext(filename)[0]
exe = g.muxapp if g.transcoder_path == "auto" else g.transcoder_path
# ensure valid executable
if not exe or not os.path.exists(exe) or not os.access(exe, os.X_OK):
util.xprint("Encoding failed. Couldn't find a valid encoder :(\n")
time.sleep(2)
return filename
command = shlex.split(enc_data['command'])
newcom, outfn = command[::], ""
for n, d in enumerate(command):
if d == "ENCODER_PATH":
newcom[n] = exe
elif d == "IN":
newcom[n] = filename
elif d == "OUT":
newcom[n] = outfn = base
elif d == "OUT.EXT":
newcom[n] = outfn = base + "." + enc_data['ext']
returncode = subprocess.call(newcom)
if returncode == 0 and g.delete_orig:
os.unlink(filename)
return outfn