Spaces:
Runtime error
Runtime error
| from crewai import Agent, Task, Crew | |
| import gradio as gr | |
| from gradio import ChatMessage | |
| import asyncio | |
| import re | |
| import sys | |
| from typing import List, Generator | |
| import os | |
| from dotenv import load_dotenv | |
| import threading | |
| from langchain_openai import ChatOpenAI | |
| class OutputParser: | |
| def __init__(self): | |
| self.buffer = "" | |
| self.current_agent = None | |
| self.final_article_sent = False | |
| self.message_queue = { | |
| "Content Planner": [], | |
| "Content Writer": [], | |
| "Editor": [] | |
| } | |
| self.agent_sequence = ["Content Planner", "Content Writer", "Editor"] | |
| def format_output(self, raw_content: str, agent_name: str) -> str: | |
| """Format the output content based on agent type.""" | |
| if agent_name == "Content Planner": | |
| # Clean up the planner's output to make it more readable | |
| lines = raw_content.split('\n') | |
| formatted_lines = [] | |
| for line in lines: | |
| # Remove number prefixes and clean up | |
| line = re.sub(r'^\d+\.\s*', '', line.strip()) | |
| # Make text size normal by removing markdown formatting | |
| line = re.sub(r'^#+\s*', '', line) | |
| if line: | |
| formatted_lines.append(line) | |
| return '\n\n'.join(formatted_lines) | |
| elif agent_name == "Content Writer": | |
| # Clean up writer's output to make it more readable | |
| # Remove markdown headers but keep the text | |
| content = re.sub(r'^#+\s*(.+)$', r'\1', raw_content, flags=re.MULTILINE) | |
| # Remove multiple newlines | |
| content = re.sub(r'\n{3,}', '\n\n', content) | |
| return content.strip() | |
| return raw_content.strip() | |
| def parse_output(self, text: str) -> List[ChatMessage]: | |
| messages = [] | |
| cleaned_text = re.sub(r'\x1B\[[0-9;]*[mK]', '', text) | |
| # Look for working agent declarations | |
| agent_match = re.search(r'\[DEBUG\]: == Working Agent: (.*?)(?=\n|$)', cleaned_text) | |
| if agent_match: | |
| self.current_agent = agent_match.group(1) | |
| self.message_queue[self.current_agent].append(ChatMessage( | |
| role="assistant", | |
| content=f"Starting work...", | |
| metadata={"title": f"π€ {self.current_agent}"} | |
| )) | |
| # Look for task information | |
| task_match = re.search(r'\[INFO\]: == Starting Task: (.*?)(?=\n\n|\n> Entering|$)', cleaned_text, re.DOTALL) | |
| if task_match and self.current_agent: | |
| task_content = task_match.group(1).strip() | |
| self.message_queue[self.current_agent].append(ChatMessage( | |
| role="assistant", | |
| content=task_content, | |
| metadata={"title": f"π Task for {self.current_agent}"} | |
| )) | |
| # Look for agent outputs in debug messages | |
| debug_match = re.search(r'\[DEBUG\]: == \[(.*?)\] Task output: (.*?)(?=\[DEBUG\]|$)', cleaned_text, re.DOTALL) | |
| if debug_match: | |
| agent_name = debug_match.group(1) | |
| output_content = debug_match.group(2).strip() | |
| # Format the output content | |
| formatted_content = self.format_output(output_content, agent_name) | |
| if agent_name == "Editor" and not self.final_article_sent: | |
| self.message_queue[agent_name].append(ChatMessage( | |
| role="assistant", | |
| content="Final article is ready!", | |
| metadata={"title": "π Final Article"} | |
| )) | |
| self.message_queue[agent_name].append(ChatMessage( | |
| role="assistant", | |
| content=formatted_content | |
| )) | |
| self.final_article_sent = True | |
| elif agent_name != "Editor": | |
| self.message_queue[agent_name].append(ChatMessage( | |
| role="assistant", | |
| content=formatted_content, | |
| metadata={"title": f"π‘ Output from {agent_name}"} | |
| )) | |
| # Return messages in the correct sequence | |
| for agent in self.agent_sequence: | |
| if self.message_queue[agent]: | |
| messages.extend(self.message_queue[agent]) | |
| self.message_queue[agent] = [] | |
| return messages | |
| class StreamingCapture: | |
| def __init__(self): | |
| self.buffer = "" | |
| def write(self, text): | |
| self.buffer += text | |
| return len(text) | |
| def flush(self): | |
| pass | |
| class ArticleCrew: | |
| def __init__(self, api_key: str = None): | |
| self.api_key = api_key | |
| self.initialize_agents() | |
| def initialize_agents(self): | |
| # Create a ChatOpenAI instance with the API key | |
| llm = ChatOpenAI( | |
| openai_api_key=self.api_key, | |
| temperature=0.7, | |
| model="gpt-4" | |
| ) | |
| # Initialize agents with the LLM | |
| self.planner = Agent( | |
| role="Content Planner", | |
| goal="Plan engaging and factually accurate content on {topic}", | |
| backstory="You're working on planning a blog article about the topic: {topic}. " | |
| "You collect information that helps the audience learn something " | |
| "and make informed decisions.", | |
| allow_delegation=False, | |
| verbose=True, | |
| llm=llm | |
| ) | |
| self.writer = Agent( | |
| role="Content Writer", | |
| goal="Write insightful and factually accurate opinion piece about the topic: {topic}", | |
| backstory="You're working on writing a new opinion piece about the topic: {topic}. " | |
| "You base your writing on the work of the Content Planner.", | |
| allow_delegation=False, | |
| verbose=True, | |
| llm=llm | |
| ) | |
| self.editor = Agent( | |
| role="Editor", | |
| goal="Edit a given blog post to align with the writing style", | |
| backstory="You are an editor who receives a blog post from the Content Writer.", | |
| allow_delegation=False, | |
| verbose=True, | |
| llm=llm | |
| ) | |
| self.output_parser = OutputParser() | |
| def create_tasks(self, topic: str): | |
| plan_task = Task( | |
| description=( | |
| f"1. Prioritize the latest trends, key players, and noteworthy news on {topic}.\n" | |
| f"2. Identify the target audience, considering their interests and pain points.\n" | |
| f"3. Develop a detailed content outline including introduction, key points, and call to action.\n" | |
| f"4. Include SEO keywords and relevant data or sources." | |
| ), | |
| expected_output="A comprehensive content plan document with an outline, audience analysis, SEO keywords, and resources.", | |
| agent=self.planner | |
| ) | |
| write_task = Task( | |
| description=( | |
| "1. Use the content plan to craft a compelling blog post.\n" | |
| "2. Incorporate SEO keywords naturally.\n" | |
| "3. Sections/Subtitles are properly named in an engaging manner.\n" | |
| "4. Ensure proper structure with introduction, body, and conclusion.\n" | |
| "5. Proofread for grammatical errors." | |
| ), | |
| expected_output="A well-written blog post in markdown format, ready for publication.", | |
| agent=self.writer | |
| ) | |
| edit_task = Task( | |
| description="Proofread the given blog post for grammatical errors and alignment with the brand's voice.", | |
| expected_output="A well-written blog post in markdown format, ready for publication.", | |
| agent=self.editor | |
| ) | |
| return [plan_task, write_task, edit_task] | |
| async def process_article(self, topic: str) -> Generator[List[ChatMessage], None, None]: | |
| crew = Crew( | |
| agents=[self.planner, self.writer, self.editor], | |
| tasks=self.create_tasks(topic), | |
| verbose=True | |
| ) | |
| capture = StreamingCapture() | |
| original_stdout = sys.stdout | |
| sys.stdout = capture | |
| try: | |
| # Start the crew task in a separate thread to not block streaming | |
| result_container = [] | |
| def run_crew(): | |
| try: | |
| result = crew.kickoff(inputs={"topic": topic}) | |
| result_container.append(result) | |
| except Exception as e: | |
| result_container.append(e) | |
| thread = threading.Thread(target=run_crew) | |
| thread.start() | |
| # Stream output while the crew is working | |
| last_processed = 0 | |
| while thread.is_alive() or last_processed < len(capture.buffer): | |
| if len(capture.buffer) > last_processed: | |
| new_content = capture.buffer[last_processed:] | |
| messages = self.output_parser.parse_output(new_content) | |
| if messages: | |
| for msg in messages: | |
| yield [msg] | |
| last_processed = len(capture.buffer) | |
| await asyncio.sleep(0.1) | |
| # Check if we got a result or an error | |
| if result_container and not isinstance(result_container[0], Exception): | |
| # Final messages already sent by the parser | |
| pass | |
| else: | |
| yield [ChatMessage( | |
| role="assistant", | |
| content="An error occurred while generating the article.", | |
| metadata={"title": "β Error"} | |
| )] | |
| finally: | |
| sys.stdout = original_stdout | |
| def create_demo(): | |
| article_crew = None # Initialize as None | |
| with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
| gr.Markdown("# π AI Article Writing Crew") | |
| gr.Markdown("Watch as this AI Crew collaborates to create your article! This application utilizes [CrewAI](https://www.crewai.com/) agents: Content Planner, Content Writer, and Content Editor, to write an article on any topic you choose. To get started, enter your OpenAI API Key below and press Enter!") | |
| openai_api_key = gr.Textbox( | |
| label='OpenAI API Key', | |
| type='password', | |
| placeholder='Type your OpenAI API key and press Enter!', | |
| interactive=True) | |
| chatbot = gr.Chatbot( | |
| label="Writing Process", | |
| avatar_images=(None, "https://avatars.githubusercontent.com/u/170677839?v=4"), | |
| height=700, | |
| type="messages", | |
| show_label=True, | |
| visible=False | |
| ) | |
| with gr.Row(equal_height=True): | |
| topic = gr.Textbox( | |
| label="Article Topic", | |
| placeholder="Enter the topic you want an article about...", | |
| scale=4, | |
| visible=False | |
| ) | |
| async def process_input(topic, history, openai_api_key): | |
| nonlocal article_crew | |
| # Initialize ArticleCrew with the API key if not already initialized | |
| if article_crew is None: | |
| article_crew = ArticleCrew(api_key=openai_api_key) | |
| history.append(ChatMessage(role="user", content=f"Write an article about: {topic}")) | |
| yield history | |
| async for messages in article_crew.process_article(topic): | |
| history.extend(messages) | |
| yield history | |
| btn = gr.Button("Write Article", variant="primary", scale=1, visible=False) | |
| def show_interface(): | |
| return { | |
| openai_api_key: gr.Textbox(visible=False), | |
| chatbot: gr.Chatbot(visible=True), | |
| topic: gr.Textbox(visible=True), | |
| btn: gr.Button(visible=True) | |
| } | |
| openai_api_key.submit( | |
| show_interface, | |
| None, | |
| [openai_api_key, chatbot, topic, btn] | |
| ) | |
| btn.click( | |
| process_input, | |
| inputs=[topic, chatbot, openai_api_key], | |
| outputs=[chatbot] | |
| ) | |
| return demo | |
| if __name__ == "__main__": | |
| demo = create_demo() | |
| demo.queue() | |
| demo.launch() |