Skip to content

search.py

cache_userdata(userterm, username, channel_id)

Cache user name and channel id tuple

Source code in mps_youtube/commands/search.py
def cache_userdata(userterm, username, channel_id):
    """ Cache user name and channel id tuple """
    userterm = ''.join([t.strip().lower() for t in userterm.split(' ')])
    g.username_query_cache[userterm] = (username, channel_id)
    util.dbg('Cache data for username search query "{}": {} ({})'.format(
        userterm, username, channel_id))

    while len(g.username_query_cache) > 300:
        g.username_query_cache.popitem(last=False)
    return (username, channel_id)

channelfromname(user)

Query channel id from username.

Source code in mps_youtube/commands/search.py
def channelfromname(user):
    """ Query channel id from username. """

    cached = userdata_cached(user)
    if cached:
        return cached

    try:
        channel_id, channel_name = pafy.channel_id_from_name(user)
        return cache_userdata(user, channel_name, channel_id)

    except Exception as e:
        import traceback
        traceback.print_exception(type(e), e, e.__traceback__)
        g.message = "Could not retrieve information for user {}\n{}".format(
            c.y + user + c.w, e)
        util.dbg('Error during channel request for user {}:\n{}'.format(
            user, e))

    # at this point, we know the channel id associated to a user name
    return None

get_pl_from_json(pldata)

Process json playlist data.

Source code in mps_youtube/commands/search.py
def get_pl_from_json(pldata):
    """ Process json playlist data. """

    try:
        items = pldata

    except KeyError:
        items = []

    results = []

    for item in items:
        results.append(dict(
            link=item["id"],
            size=item["videoCount"],
            title=item["title"],
            author=item['channel']["name"],
            created=item.get("publishedAt"),
            updated=item.get('publishedAt'), #XXX Not available in API?
            description=item.get("description")))

    return results

get_track_id_from_json(item)

Try to extract video Id from various response types

Source code in mps_youtube/commands/search.py
def get_track_id_from_json(item):
    """ Try to extract video Id from various response types """
    fields = ['contentDetails/videoId',
              'snippet/resourceId/videoId',
              'id/videoId',
              'id']
    for field in fields:
        node = item
        for p in field.split('/'):
            if node and isinstance(node, dict):
                node = node.get(p)
        if node:
            return node
    return ''

get_tracks_from_json(jsons)

Get search results from API response

Source code in mps_youtube/commands/search.py
def get_tracks_from_json(jsons):
    """ Get search results from API response """

    if len(jsons) == 0:
        util.dbg("got unexpected data or no search results")
        return ()

    # populate list of video objects
    songs = []
    for item in jsons:
        try:
            ytid = get_track_id_from_json(item)
            duration = util.parse_video_length(item.get('duration'))
            #stats = item.get('statistics', {})
            #snippet = item.get('snippet', {})
            title = item.get('title', '').strip()
            # instantiate video representation in local model
            cursong = Video(ytid=ytid, title=title, length=duration)
            dislike_data = {'likes': 0, 'dislikes':0, 'rating':0}#pafy.return_dislikes(ytid)
            likes = int(dislike_data['likes'])
            dislikes = int(dislike_data['dislikes'])
            # this is a very poor attempt to calculate a rating value
            rating = int(dislike_data['rating'])#5.*likes/(likes+dislikes) if (likes+dislikes) > 0 else 0
            category = '?'#snippet.get('categoryId')
            publishedlocaldatetime = item.get('publishedTime','?')#util.yt_datetime_local(snippet.get('publishedAt', ''))

            # cache video information in custom global variable store
            g.meta[ytid] = dict(
                # tries to get localized title first, fallback to normal title
                title=title,
                length=str(util.fmt_time(cursong.length)),
                rating=rating,#str('{}'.format(rating))[:4].ljust(4, "0"),
                uploader=item['channel']['id'],
                uploaderName=item['channel']['name'],
                category=category,
                aspect="custom", #XXX
                uploaded=publishedlocaldatetime,#publishedlocaldatetime[1] if publishedlocaldatetime is not None else None,
                uploadedTime=publishedlocaldatetime,#,publishedlocaldatetime[2] if publishedlocaldatetime is not None else None,
                likes=str(num_repr(likes)),
                dislikes=str(num_repr(dislikes)),
                commentCount='?',#str(num_repr(int(stats.get('commentCount', 0)))),
                viewCount= item['viewCount']['text'] if 'viewCount' in item.keys() else '?')#str(num_repr(int(stats.get('viewCount', 0)))))
            songs.append(cursong)

        except Exception as e:
            import traceback
            traceback.print_exception(type(e), e, e.__traceback__)
            input('Press any key to continue...')
            util.dbg(json.dumps(item, indent=2))
            util.dbg('Error during metadata extraction/instantiation of ' +
                'search result {}\n{}'.format(ytid, e))



    # return video objects
    return songs

mix(num)

Retrieves the YouTube mix for the selected video.

Source code in mps_youtube/commands/search.py
@command(r'mix\s*(\d{1,4})', 'mix')
def mix(num):
    """ Retrieves the YouTube mix for the selected video. """
    g.content = g.content or content.generate_songlist_display()
    if g.browse_mode != "normal":
        g.message = util.F('mix only videos')
    else:
        item = (g.model[int(num) - 1])
        if item is None:
            g.message = util.F('invalid item')
            return
        item = util.get_pafy(item)
        # Mix playlists are made up of 'RD' + video_id
        try:
            plist("RD" + item.videoid)
        except OSError:
            g.message = util.F('no mix')

num_repr(num)

Return up to four digit string representation of a number, eg 2.6m.

Source code in mps_youtube/commands/search.py
def num_repr(num):
    """ Return up to four digit string representation of a number, eg 2.6m. """
    if num <= 9999:
        return str(num)

    def digit_count(x):
        """ Return number of digits. """
        return int(math.floor(math.log10(x)) + 1)

    digits = digit_count(num)
    sig = 3 if digits % 3 == 0 else 2
    rounded = int(round(num, int(sig - digits)))
    digits = digit_count(rounded)
    suffix = "_kmBTqXYX"[(digits - 1) // 3]
    front = 3 if digits % 3 == 0 else digits % 3

    if not front == 1:
        return str(rounded)[0:front] + suffix

    return str(rounded)[0] + "." + str(rounded)[1] + suffix

Search for YouTube playlists.

term can be query str or dict indicating user playlist search.

Source code in mps_youtube/commands/search.py
@command(r'(?:\.\.|\/\/|pls(?:earch)?\s)\s*(.*)', 'plsearch')
def pl_search(term, page=0, splash=True, is_user=False):
    """ Search for YouTube playlists.

    term can be query str or dict indicating user playlist search.

    """
    if not term or len(term) < 2:
        g.message = c.r + "Not enough input" + c.w
        g.content = content.generate_songlist_display()
        return

    if splash:
        g.content = content.logo(c.g)
        prog = "user: " + term if is_user else term
        g.message = "Searching playlists for %s" % c.y + prog + c.w
        screen.update()

    if is_user:
        ret = channelfromname(term)
        if not ret: # Error
            return
        user, channel_id = ret

    else:
        # playlist search is done with the above url and param type=playlist
        logging.info("playlist search for %s", prog)
        # qs = generate_search_qs(term)
        # qs['pageToken'] = token(page)
        # qs['type'] = 'playlist'
        # if 'videoCategoryId' in qs:
        #     del qs['videoCategoryId'] # Incompatable with type=playlist

        pldata = pafy.playlist_search(term)

        #id_list = [i.get('id', {}) for i in pldata]

        result_count = len(pldata)

    #todo: what is the purpose of this code? #qs = {'part': 'contentDetails,snippet','maxResults': 50}
    if is_user:
        if page:
            pass #qs['pageToken'] = token(page)
        pass #qs['channelId'] = channel_id
    else:
        pass #qs['id'] = ','.join(id_list)

    pldata = pafy.playlist_search(term)
    playlists = get_pl_from_json(pldata)[:util.getxy().max_results]

    # if is_user:
    #     result_count = pldata['pageInfo']['totalResults']

    if playlists:
        g.last_search_query = (pl_search, {"term": term, "is_user": is_user})
        g.browse_mode = "ytpl"
        g.current_page = page
        g.result_count = result_count
        g.ytpls = playlists
        g.message = "Playlist results for %s" % c.y + prog + c.w
        g.content = content.generate_playlist_display()

    else:
        g.message = "No playlists found for: %s" % c.y + prog + c.w
        g.current_page = 0
        g.content = content.generate_songlist_display(zeromsg=g.message)

related(num)

Show videos related to to vid num.

Source code in mps_youtube/commands/search.py
@command(r'r\s?(\d{1,4})', 'r')
def related(num):
    """ Show videos related to to vid num. """
    if g.browse_mode != "normal":
        g.message = "Related items must refer to a specific video item"
        g.message = c.y + g.message + c.w
        g.content = content.generate_songlist_display()
        return

    g.current_page = 0
    item = g.model[int(num) - 1]
    related_search(item)

Fetch videos related to vitem vitem = {'description': str, 'length': int, 'title': str, 'ytid': str}

Source code in mps_youtube/commands/search.py
def related_search(vitem):
    """ Fetch videos related to vitem
    vitem = {'description': str, 'length': int, 'title': str, 'ytid': str}
    """

    t = vitem.title
    ttitle = t[:48].strip() + ".." if len(t) > 49 else t

    msg = "Videos related to %s%s%s" % (c.y, ttitle, c.w)
    failmsg = "Related to %s%s%s not found" % (c.y, vitem.ytid, c.w)

    # todo: implement realted search in pafy
    #_search(ttitle, vitem.title, msg, failmsg)

search(term)

Perform search.

Source code in mps_youtube/commands/search.py
@command(r'(?:search|\.|/)\s*([^./].{1,500})', 'search')
def search(term):
    """ Perform search. """
    try:  # TODO make use of unknowns
        args, unknown = parser.parse_known_args(term.split())
        video_duration = args.duration if args.duration else 'any'
        if args.category:
            if not args.category[0].isdigit():
                args.category = g.categories.get(args.category[0])
            else:
                args.category = "".join(args.category)
        after = args.after
        term = ' '.join(args.search)
    except SystemExit:  # <------ argsparse calls exit()
        g.message = c.b + "Bad syntax. Enter h for help" + c.w
        return

    if not term or len(term) < 2:
        g.message = c.r + "Not enough input" + c.w
        g.content = content.generate_songlist_display()
        return

    logging.info("search for %s", term)
    #query = generate_search_qs(term, videoDuration=video_duration, after=after, category=args.category, is_live=args.live)

    msg = "Search results for %s%s%s" % (c.y, term, c.w)
    failmsg = "Found nothing for %s%s%s" % (c.y, term, c.w)
    wdata = pafy.search_videos(term, int(config.PAGES.get))
    _display_search_results(term, wdata, msg, failmsg)

token(page)

Returns a page token for a given start index.

Source code in mps_youtube/commands/search.py
def token(page):
    """ Returns a page token for a given start index. """
    index = (page or 0) * util.getxy().max_results
    k = index//128 - 1
    index -= 128 * k
    f = [8, index]
    if k > 0 or index > 127:
        f.append(k+1)
    f += [16, 0]
    b64 = base64.b64encode(bytes(f)).decode('utf8')
    return b64.strip('=')

user_more(num)

Show more videos from user of vid num.

Source code in mps_youtube/commands/search.py
@command(r'u\s?([\d]{1,4})', 'u')
def user_more(num):
    """ Show more videos from user of vid num. """
    if g.browse_mode != "normal":
        g.message = "User uploads must refer to a specific video item"
        g.message = c.y + g.message + c.w
        g.content = content.generate_songlist_display()
        return

    g.current_page = 0
    item = g.model[int(num) - 1]

    #TODO: Cleaner way of doing this?
    if item.ytid in g.meta:
        channel_id = g.meta.get(item.ytid, {}).get('uploader')
        user = g.meta.get(item.ytid, {}).get('uploaderName')
    else:
        paf = util.get_pafy(item)
        user, channel_id = channelfromname(paf.author)

    usersearch_id(user, channel_id, '')

user_pls(user)

Retrieve user playlists.

Source code in mps_youtube/commands/search.py
@command(r'u(?:ser)?pl\s(.*)', 'userpl', 'upl')
def user_pls(user):
    """ Retrieve user playlists. """
    return usersearch_id(user, pafy.channel_id_from_name(user)[0], '')#pl_search(user, is_user=True)

userdata_cached(userterm)

Check if user name search term found in cache

Source code in mps_youtube/commands/search.py
def userdata_cached(userterm):
    """ Check if user name search term found in cache """
    userterm = ''.join([t.strip().lower() for t in userterm.split(' ')])
    return g.username_query_cache.get(userterm)

usersearch(q_user, identify='forUsername')

Fetch uploads by a YouTube user.

Source code in mps_youtube/commands/search.py
@command(r'user\s+(.+)', 'user')
def usersearch(q_user, identify='forUsername'):
    """ Fetch uploads by a YouTube user. """

    user, _, term = (x.strip() for x in q_user.partition("/"))
    if identify == 'forUsername':
        ret = channelfromname(user)
        if not ret:  # Error
            return
        user, channel_id = ret

    else:
        channel_id = user

    # at this point, we know the channel id associated to a user name
    usersearch_id(user, channel_id, term)

usersearch_id(user, channel_id, term)

Performs a search within a user's (i.e. a channel's) uploads for an optional search term with the user (i.e. the channel) identified by its ID

Source code in mps_youtube/commands/search.py
def usersearch_id(user, channel_id, term):
    """ Performs a search within a user's (i.e. a channel's) uploads
    for an optional search term with the user (i.e. the channel)
    identified by its ID """

    #query = generate_search_qs(term)
    aliases = dict(views='viewCount')  # The value of the config item is 'views' not 'viewCount'
    if config.USER_ORDER.get:
        pass
        #query['order'] = aliases.get(config.USER_ORDER.get,
        #        config.USER_ORDER.get)
    #query['channelId'] = channel_id

    termuser = tuple([c.y + x + c.w for x in (term, user)])
    if term:
        msg = "Results for {1}{3}{0} (by {2}{4}{0})"
        progtext = "%s by %s" % termuser
        failmsg = "No matching results for %s (by %s)" % termuser
    else:
        msg = "Video uploads by {2}{4}{0}"
        progtext = termuser[1]
        if config.SEARCH_MUSIC:
            failmsg = """User %s not found or has no videos in the Music category.
Use 'set search_music False' to show results not in the Music category.""" % termuser[1]
        else:
            failmsg = "User %s not found or has no videos."  % termuser[1]
    msg = str(msg).format(c.w, c.y, c.y, term, user)
    results = pafy.all_videos_from_channel(channel_id)
    _display_search_results(progtext, results, msg, failmsg)

yt_url(url, print_title=0)

Acess videos by urls.

Source code in mps_youtube/commands/search.py
@command(r'url\s(.*[-_a-zA-Z0-9]{11}.*)', 'url')
def yt_url(url, print_title=0):
    """ Acess videos by urls. """
    url_list = url.split()

    g.model.songs = []

    for u in url_list:
        try:
            p = pafy.get_video_info(url.split('?v=')[1])#util.get_pafy(u)

        except (IOError, ValueError) as e:
            g.message = c.r + str(e) + c.w
            g.content = g.content or content.generate_songlist_display(
                    zeromsg=g.message)
            return

        g.browse_mode = "normal"
        v = Video(p['id'], p['title'], int(p['duration']['secondsText']))
        g.model.songs.append(v)

    if not g.command_line:
        g.content = content.generate_songlist_display()

    if print_title:
        util.xprint(v.title)

yt_url_file(file_name)

Access a list of urls in a text file

Source code in mps_youtube/commands/search.py
@command(r'url_file\s(\S+)', 'url_file')
def yt_url_file(file_name):
    """ Access a list of urls in a text file """

    #Open and read the file
    try:
        with open(file_name, "r") as fo:
            output = ' '.join([line.strip() for line in fo if line.strip()])

    except (IOError):
        g.message = c.r + 'Error while opening the file, check the validity of the path' + c.w
        g.content = g.content or content.generate_songlist_display(
                zeromsg=g.message)
        return

    #Finally pass the input to yt_url
    yt_url(output)