Spaces:
Sleeping
Sleeping
| import subprocess | |
| import sys | |
| # Ensure compatible versions of httpx and httpcore are installed | |
| subprocess.check_call([sys.executable, "-m", "pip", "install", "httpx==0.18.2", "httpcore==0.13.6"]) | |
| import gradio as gr | |
| import requests | |
| import os | |
| import re | |
| import yt_dlp | |
| import logging | |
| # Configure logging for debugging purposes | |
| logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') | |
| # Fetch the keys from the environment variable and convert them into a list | |
| YOUTUBE_API_KEYS = os.getenv("YOUTUBE_API_KEYS") | |
| if YOUTUBE_API_KEYS: | |
| YOUTUBE_API_KEYS = [key.strip() for key in YOUTUBE_API_KEYS.split(",")] | |
| else: | |
| raise ValueError("API keys not found. Make sure the secret 'YOUTUBE_API_KEYS' is set.") | |
| # Index to keep track of which API key to use | |
| key_index = 0 | |
| def get_api_key(): | |
| global key_index | |
| # Get the current API key and increment the index | |
| api_key = YOUTUBE_API_KEYS[key_index] | |
| key_index = (key_index + 1) % len(YOUTUBE_API_KEYS) # Rotate to the next key | |
| return api_key | |
| # Function to search YouTube videos using yt-dlp for better reliability | |
| def youtube_search(query, max_results=50): | |
| ydl_opts = { | |
| 'quiet': False, # Set to False to get more detailed output from yt-dlp | |
| 'extract_flat': 'in_playlist', | |
| 'logger': logging.getLogger(), # Use the logging module to capture yt-dlp logs | |
| 'simulate': True, | |
| 'noplaylist': True, # To avoid playlist entries | |
| } | |
| search_url = f"ytsearch{max_results}:{query}" | |
| logging.debug(f"Starting YouTube search for query: {query}") | |
| try: | |
| with yt_dlp.YoutubeDL(ydl_opts) as ydl: | |
| result = ydl.extract_info(search_url, download=False) | |
| gallery_items = [] | |
| if 'entries' in result: | |
| logging.debug(f"Number of entries found: {len(result['entries'])}") | |
| for entry in result['entries']: | |
| video_id = entry.get('id') | |
| thumbnail_url = entry.get('thumbnail') if entry.get('thumbnail') else "https://via.placeholder.com/150" | |
| video_title = entry.get('title', "Unknown Title") | |
| if video_id: | |
| gallery_items.append((thumbnail_url, video_id, video_title)) | |
| logging.debug(f"Added video: ID={video_id}, Thumbnail={thumbnail_url}, Title={video_title}") | |
| else: | |
| logging.debug(f"Missing video ID for entry: {entry}") | |
| else: | |
| logging.warning("No entries found in search result.") | |
| return gallery_items, "" | |
| except Exception as e: | |
| error_message = f"Error during YouTube yt-dlp request: {e}" | |
| logging.error(error_message) | |
| return [], error_message | |
| # Function to search YouTube videos using the API with pagination to get up to 1,000 results | |
| def youtube_api_search(query, max_results=1000): | |
| search_url = "https://www.googleapis.com/youtube/v3/search" | |
| all_results = [] | |
| params = { | |
| "part": "snippet", | |
| "q": query, | |
| "type": "video", | |
| "maxResults": 50 # YouTube API allows a maximum of 50 per request | |
| } | |
| try: | |
| while len(all_results) < max_results: | |
| params["key"] = get_api_key() # Get the current API key | |
| response = requests.get(search_url, params=params) | |
| # If we get a bad response, try the next API key | |
| if response.status_code == 403 or response.status_code == 429: | |
| logging.debug(f"Quota exceeded or forbidden for API key. Trying next key...") | |
| continue | |
| response.raise_for_status() # Raise an error for other bad responses (4xx or 5xx) | |
| results = response.json().get("items", []) | |
| all_results.extend(results) | |
| # If there is no nextPageToken, we've reached the end | |
| if 'nextPageToken' not in response.json() or len(all_results) >= max_results: | |
| break | |
| # Update params with the nextPageToken to get the next batch of results | |
| params['pageToken'] = response.json()['nextPageToken'] | |
| # Create a list of tuples with thumbnail URL, video ID, and video title | |
| gallery_items = [ | |
| ( | |
| result["snippet"].get("thumbnails", {}).get("medium", {}).get("url", "https://via.placeholder.com/150"), | |
| result["id"]["videoId"], | |
| result["snippet"].get("title", "No title available") | |
| ) for result in all_results | |
| ] | |
| return gallery_items | |
| except requests.exceptions.RequestException as e: | |
| # Print the error message to help debug issues | |
| logging.error(f"Error during YouTube API request: {e}") | |
| return [], f"Error retrieving video results: {str(e)}" | |
| # Function to display the video using the video URL | |
| def show_video(video_url): | |
| # Regular expression to extract the YouTube video ID from the URL | |
| video_id = None | |
| patterns = [ | |
| r"youtube\.com/watch\?v=([^&?\/]+)", | |
| r"youtube\.com/embed/([^&?\/]+)", | |
| r"youtube\.com/v/([^&?\/]+)", | |
| r"youtu\.be/([^&?\/]+)" | |
| ] | |
| for pattern in patterns: | |
| match = re.search(pattern, video_url) | |
| if match: | |
| video_id = match.group(1) | |
| logging.debug(f"Extracted video ID: {video_id}") | |
| break | |
| # If no video ID is found, return an error message | |
| if not video_id: | |
| logging.error("Invalid YouTube URL. Please enter a valid YouTube video link.") | |
| return "Invalid YouTube URL. Please enter a valid YouTube video link." | |
| # Create the embed URL | |
| embed_url = f"https://www.youtube.com/embed/{video_id}" | |
| logging.debug(f"Embed URL generated: {embed_url}") | |
| # Return an iframe with the video | |
| html_code = f''' | |
| <iframe width="560" height="315" src="{embed_url}" | |
| frameborder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" | |
| allowfullscreen></iframe> | |
| ''' | |
| return html_code | |
| # Create the Gradio interface | |
| with gr.Blocks() as demo: | |
| gr.Markdown("## YouTube Video Search, Selection, and Playback") | |
| with gr.Row(): | |
| with gr.Column(scale=3): | |
| search_query_input = gr.Textbox(label="Search YouTube", placeholder="Enter your search query here") | |
| search_button = gr.Button("Search") | |
| search_output = gr.Gallery(label="Search Results", columns=5, height="1500px") | |
| error_output = gr.Textbox(label="Error Message", interactive=False, visible=False) | |
| with gr.Column(scale=2): | |
| selected_video_link = gr.Textbox(label="Selected Video Link", interactive=False) | |
| play_video_button = gr.Button("Play Video") | |
| video_output = gr.HTML(label="Video Player") | |
| # Define search button behavior | |
| def update_search_results(query): | |
| gallery_items, error_message = youtube_search(query) | |
| if not gallery_items: # If yt-dlp search fails or returns empty, fall back to YouTube API | |
| gallery_items = youtube_api_search(query) | |
| if error_message: | |
| return [], error_message, gr.update(visible=True) | |
| # Display videos with thumbnails, video IDs, and titles | |
| gallery_items_display = [ | |
| ( | |
| item[0], | |
| f""" | |
| <div style='display: flex; align-items: center;'> | |
| <img src='{item[0]}' style='width: 150px; height: auto; margin-right: 10px;'> | |
| <div style='flex-grow: 1;'> | |
| <strong>{item[2]}</strong><br> | |
| Video ID: {item[1]} | |
| </div> | |
| </div> | |
| """ | |
| ) for item in gallery_items | |
| ] | |
| return gallery_items_display, "", gr.update(visible=False) | |
| # Update the selected video link field when a video is clicked in the gallery | |
| def on_video_select(evt: gr.SelectData): | |
| # Extract the video ID from the event value, which is a dictionary containing details of the selected item | |
| selected_video_id = evt.value["caption"].split('(')[-1][:-1] # Extract video ID from caption | |
| video_url = f"https://www.youtube.com/watch?v={selected_video_id}" | |
| logging.debug(f"Video selected: {video_url}") | |
| return video_url | |
| # Play the video when the Play Video button is clicked | |
| def play_video(video_url): | |
| logging.debug(f"Playing video with URL: {video_url}") | |
| return show_video(video_url) | |
| search_button.click(update_search_results, inputs=search_query_input, outputs=[search_output, error_output, error_output]) | |
| search_output.select(on_video_select, inputs=None, outputs=selected_video_link) | |
| play_video_button.click(play_video, inputs=selected_video_link, outputs=video_output) | |
| # Launch the Gradio interface | |
| demo.launch() |