from fastapi import FastAPI, Response, HTTPException
from fastapi.responses import FileResponse, JSONResponse
from kokoro import KPipeline
import soundfile as sf
import os
import numpy as np
import torch
from huggingface_hub import InferenceClient
from pydantic import BaseModel
import base64
from io import BytesIO
from PIL import Image
import logging
from typing import Optional
import uuid
import pathlib

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create a directory for temporary image storage
TEMP_DIR = pathlib.Path("./temp_images")
TEMP_DIR.mkdir(exist_ok=True)


class TextImageRequest(BaseModel):
    text: Optional[str] = None
    image_base64: Optional[str] = None
    voice: str = "af_heart"
    speed: float = 1.0
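
# Example request body for POST /generate (illustrative values; the field names
# follow TextImageRequest above):
# {
#     "text": "What is in this picture?",
#     "image_base64": "<base64-encoded JPEG>",
#     "voice": "af_heart",
#     "speed": 1.0
# }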


class AudioResponse(BaseModel):
    status: str
    message: str

# Initialize FastAPI app
app = FastAPI(
    title="Text-to-Speech API with Vision Support",
    description="API for generating speech from text with optional image analysis",
    version="1.0.0"
)


def save_base64_image(image_base64):
    """Save base64 image to a temporary file and return the file path"""
    try:
        # Generate a unique filename
        filename = f"{uuid.uuid4()}.jpg"
        filepath = TEMP_DIR / filename

        # Decode and save the image
        image_data = base64.b64decode(image_base64)
        with open(filepath, "wb") as f:
            f.write(image_data)

        # Return the file URL (using file:// protocol)
        return f"file://{filepath.absolute()}"
    except Exception as e:
        logger.error(f"Error saving base64 image: {str(e)}")
        raise HTTPException(status_code=400, detail=f"Invalid base64 image data: {str(e)}")
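
# Illustrative client-side encoding (not part of the server): image_base64 is
# expected to be the raw base64 string, without a "data:image/...;base64," prefix,
# since llm_chat_response below adds that prefix itself. The filename here is
# hypothetical.
#
#   import base64
#   with open("photo.jpg", "rb") as f:
#       image_base64 = base64.b64encode(f.read()).decode("utf-8")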


def llm_chat_response(text, image_base64=None):
    """Get a response from the LLM for text and, optionally, image input."""
    try:
        HF_TOKEN = os.getenv("HF_TOKEN")
        logger.info("Checking HF_TOKEN...")
        if not HF_TOKEN:
            logger.error("HF_TOKEN not found in environment variables")
            raise HTTPException(status_code=500, detail="HF_TOKEN not configured")

        logger.info("Initializing InferenceClient...")
        client = InferenceClient(
            provider="nebius",
            api_key=HF_TOKEN
        )

        # Build the text part of the message payload
        message_content = [{
            "type": "text",
            "text": text + ("" if image_base64 else " describe in one line only")
        }]

        if image_base64:
            logger.info("Processing base64 image...")
            # Save the base64 image to a temporary file (kept for logging/debugging;
            # the model itself receives the image as a data URI below)
            image_url = save_base64_image(image_base64)
            logger.info(f"Image saved at: {image_url}")

            # Create a data URI and add the image to the message content
            data_uri = f"data:image/jpeg;base64,{image_base64}"
            message_content.append({
                "type": "image_url",
                "image_url": {"url": data_uri}
            })

        # Assemble the chat messages array
        messages = [{
            "role": "user",
            "content": message_content
        }]

        logger.info("Sending request to model...")
        try:
            completion = client.chat.completions.create(
                model="mistralai/Mistral-Small-3.1-24B-Instruct-2503",
                messages=messages,
                max_tokens=500
            )
        except Exception as http_err:
            # Log HTTP errors from the request
            logger.error(f"HTTP error occurred: {str(http_err)}")
            raise HTTPException(status_code=500, detail=str(http_err))

        logger.info("Raw model response received")

        # Make sure the model returned at least one choice
        if not completion.choices or len(completion.choices) == 0:
            logger.error("No choices returned from model.")
            raise HTTPException(status_code=500, detail="Model returned no choices.")

        # Extract the response message from the first choice
        choice = completion.choices[0]
        response_message = None
        if hasattr(choice, "message"):
            response_message = choice.message
        elif isinstance(choice, dict):
            response_message = choice.get("message")
        if not response_message:
            logger.error(f"Response message is empty: {choice}")
            raise HTTPException(status_code=500, detail="Model response did not include a message.")

        # Pull the text content out of the message (object or dict form)
        content = None
        if isinstance(response_message, dict):
            content = response_message.get("content")
        if content is None and hasattr(response_message, "content"):
            content = response_message.content
        if not content:
            logger.error(f"Message content is missing: {response_message}")
            raise HTTPException(status_code=500, detail="Model message did not include content.")

        return content
    except Exception as e:
        logger.error(f"Error in llm_chat_response: {str(e)}")
        # Fallback response in case of error
        return "I couldn't process that input. Please try again with a different image or text query."
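
# Example direct call (illustrative): the function returns the model's text reply,
# or the fallback string above if anything goes wrong.
#
#   reply = llm_chat_response("What is the capital of France?")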


# Initialize pipeline once at startup
try:
    logger.info("Initializing KPipeline...")
    pipeline = KPipeline(lang_code='a')
    logger.info("KPipeline initialized successfully")
except Exception as e:
    logger.error(f"Failed to initialize KPipeline: {str(e)}")
    # Let the app start anyway; requests will fail until the pipeline is available
    pipeline = None


@app.post("/generate")
async def generate_audio(request: TextImageRequest):
    """
    Generate audio from text and optionally analyze an image.

    - If text is provided, it is used as the input prompt
    - If an image is provided, the image is analyzed
    - The LLM response is converted to speech using the specified voice and speed
    """
    try:
        logger.info("Received audio generation request")

        # If no text is provided but image is provided, use default prompt
        user_text = request.text if request.text is not None else ""
        if not user_text and request.image_base64:
            user_text = "Describe what you see in the image"
        elif not user_text and not request.image_base64:
            logger.error("Neither text nor image provided in request")
            return JSONResponse(
                status_code=400,
                content={"error": "Request must include either text or image_base64"}
            )

        # Generate response using text and image if provided
        logger.info("Getting LLM response...")
        text_reply = llm_chat_response(user_text, request.image_base64)
        logger.info(f"LLM response: {text_reply}")

        # Generate audio
        logger.info(f"Generating audio using voice={request.voice}, speed={request.speed}")
        try:
            generator = pipeline(
                text_reply,
                voice=request.voice,
                speed=request.speed,
                split_pattern=r'\n+'
            )
            # Process only the first segment for demo
            for i, (gs, ps, audio) in enumerate(generator):
                logger.info(f"Audio generated successfully: segment {i}")

                # Convert PyTorch tensor to NumPy array
                audio_numpy = audio.cpu().numpy()

                # Convert to 16-bit PCM
                # Ensure the audio is in the range [-1, 1]
                audio_numpy = np.clip(audio_numpy, -1, 1)
                # Convert to 16-bit signed integers
                pcm_data = (audio_numpy * 32767).astype(np.int16)
                # Convert to bytes (automatically uses row-major order)
                raw_audio = pcm_data.tobytes()

                # Return PCM data with minimal necessary headers
                return Response(
                    content=raw_audio,
                    media_type="application/octet-stream",
                    headers={
                        "Content-Disposition": 'attachment; filename="output.pcm"',
                        "X-Sample-Rate": "24000",
                        "X-Bits-Per-Sample": "16",
                        "X-Endianness": "little"
                    }
                )

            logger.error("No audio segments generated")
            return JSONResponse(
                status_code=400,
                content={"error": "No audio generated", "detail": "The pipeline did not produce any audio"}
            )
        except Exception as e:
            logger.error(f"Error generating audio: {str(e)}")
            return JSONResponse(
                status_code=500,
                content={"error": "Audio generation failed", "detail": str(e)}
            )
    except Exception as e:
        logger.error(f"Unexpected error in generate_audio endpoint: {str(e)}")
        return JSONResponse(
            status_code=500,
            content={"error": "Internal server error", "detail": str(e)}
        )
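
# Example client usage (illustrative sketch, not part of the server): /generate
# returns raw 16-bit little-endian PCM, with the sample rate given in the
# X-Sample-Rate header. Assuming a single (mono) channel, a client could wrap the
# bytes into a playable WAV file like this (the host/port are placeholders):
#
#   import requests, wave
#   resp = requests.post("http://localhost:7860/generate", json={"text": "Hello"})
#   with wave.open("output.wav", "wb") as wav:
#       wav.setnchannels(1)                                   # assumed mono
#       wav.setsampwidth(2)                                   # 16-bit samples
#       wav.setframerate(int(resp.headers["X-Sample-Rate"]))
#       wav.writeframes(resp.content)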


@app.get("/")
async def root():
    return {"message": "Welcome to the Text-to-Speech API with Vision Support. Use POST /generate endpoint with 'text' and optionally 'image_base64' for queries."}


# Cleanup hook to periodically remove old temporary images
@app.on_event("startup")
async def startup_event():
    # You could add scheduled tasks here to clean up old images
    pass
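
# A minimal cleanup sketch (assumption: run from a startup or background task;
# the retention window below is hypothetical):
#
#   import time
#   MAX_AGE_SECONDS = 3600
#   for path in TEMP_DIR.glob("*.jpg"):
#       if time.time() - path.stat().st_mtime > MAX_AGE_SECONDS:
#           path.unlink(missing_ok=True)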


@app.exception_handler(404)
async def not_found_handler(request, exc):
    return JSONResponse(
        status_code=404,
        content={"error": "Endpoint not found. Please use POST /generate for queries."}
    )


@app.exception_handler(405)
async def method_not_allowed_handler(request, exc):
    return JSONResponse(
        status_code=405,
        content={"error": "Method not allowed. Please check the API documentation."}
    )
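
# To run locally (assuming this file is saved as app.py; port 7860 is the usual
# Hugging Face Spaces default, adjust as needed):
#   uvicorn app:app --host 0.0.0.0 --port 7860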