|
|
|
|
|
""" |
|
|
CFP-Jarvis1 with tool-calling capabilities.

The model can write and execute code to accomplish tasks.
|
|
""" |
|
|
|
|
|
import os |
|
|
import sys |
|
|
import json |
|
|
import subprocess |
|
|
import tempfile |
|
|
import traceback |
|
|
from typing import Dict, Any, List, Optional |
|
|
from dataclasses import dataclass |
|
|
import torch |
|
|
from PIL import Image |
|
|
from transformers import AutoProcessor, AutoModelForImageTextToText |
|
|
|
|
|
@dataclass
class ToolCall:
    """Represents a tool call request from the model.

    NOTE(review): nothing in the visible part of this file constructs or
    consumes this class; the tool pipeline passes plain dicts instead.
    """

    # Name of the tool to run (presumably a key of the tool registry -- confirm).
    tool_name: str
    # Source text handed to the tool.
    code: str
    # Language of ``code``.
    language: str = "python"
    # Free-form description of the call.
    description: str = ""
    # Free-form note on what the call is expected to produce.
    expected_output: str = ""
|
|
|
|
|
class CFPJarvis1WithTools:
    """CFP-Jarvis1 wrapper that lets the model write code and run it via tools.

    A request is turned into a prompt, sent to an image-text-to-text model,
    and the reply is parsed for a tool call. When a tool is requested, the
    code from the reply is executed by the matching entry in ``self.tools``.

    SECURITY: the executors run model-generated code on the host with the
    privileges of this process. There is no sandbox; do not expose this to
    untrusted prompts without isolating the process.
    """

    def __init__(self, model_path="OpenGVLab/InternVL3-1B-hf", cache_dir="/media/jerem/641C8D6C1C8D3A56/hf_cache"):
        """Record configuration and register tools; the model is loaded lazily.

        Args:
            model_path: Hub id or local path passed to ``from_pretrained``.
            cache_dir: Directory used as the Hugging Face cache.
        """
        self.model_name = "CFP-Jarvis1-Tools"
        self.version = "1.0.0"
        self.base_model = model_path
        self.cache_dir = cache_dir

        # NOTE(review): these are set after `transformers` has been imported,
        # so they may not affect this process; `cache_dir` is also passed
        # explicitly to `from_pretrained` in `load_model`.
        os.environ['HF_HOME'] = cache_dir
        os.environ['TRANSFORMERS_CACHE'] = cache_dir

        # Populated by load_model().
        self.processor = None
        self.model = None
        self.device = None
        self.dtype = None

        # Registry mapping the tool names the model may emit to executors.
        self.tools = {
            "python_executor": self.execute_python,
            "bash_executor": self.execute_bash,
            "web_scraper": self.create_web_scraper,
            "data_analyzer": self.analyze_data,
        }

        print(f"🤖 {self.model_name} v{self.version}")
        print(f"🔧 Tools available: {list(self.tools.keys())}")

    def load_model(self):
        """Load the processor and the model.

        Uses CUDA with bfloat16 when available, otherwise CPU with float32.

        Returns:
            True on success, False if anything raised while loading (the
            error is printed).
        """
        try:
            print(f"📥 Loading {self.model_name}...")

            self.processor = AutoProcessor.from_pretrained(
                self.base_model,
                cache_dir=self.cache_dir,
                trust_remote_code=True,
            )

            if torch.cuda.is_available():
                self.device = "cuda"
                self.dtype = torch.bfloat16
            else:
                self.device = "cpu"
                self.dtype = torch.float32

            self.model = AutoModelForImageTextToText.from_pretrained(
                self.base_model,
                torch_dtype=self.dtype,
                device_map="auto" if self.device == "cuda" else self.device,
                cache_dir=self.cache_dir,
                trust_remote_code=True,
                low_cpu_mem_usage=True,
            ).eval()

            print("✅ Model loaded")
            return True

        except Exception as e:
            print(f"❌ Failed to load: {e}")
            return False

    def process_request(self, request: str, image_path: Optional[str] = None) -> Dict[str, Any]:
        """Process a request and run a tool if the model asks for one.

        Args:
            request: Natural-language task inserted into the prompt.
            image_path: Optional image to send with the prompt. When omitted,
                a blank 100x100 white image is used as a placeholder.

        Returns:
            One of:
            - ``{"error": "Failed to load model"}`` if the model cannot load;
            - ``{"request", "tool_used", "code", "result", "reasoning"}`` when
              a tool was executed;
            - ``{"request", "response", "tool_used": None}`` for a direct
              answer;
            - ``{"request", "error", "raw_response"}`` if parsing or tool
              dispatch raised.
        """
        # Explicit None check: the attribute is None until load_model() runs.
        if self.model is None and not self.load_model():
            return {"error": "Failed to load model"}

        prompt = f"""You are CFP-Jarvis1, an AI assistant that can write and execute code.

Task: {request}

Analyze this task and decide:
1. Do you need to write code to accomplish this?
2. If yes, provide the code and specify the tool (python_executor, web_scraper, etc.)
3. If no, provide a direct response

Response format:
{{
    "needs_tool": true/false,
    "tool": "tool_name",
    "code": "code to execute",
    "reasoning": "why this approach"
}}"""

        if image_path:
            # Close the file handle once the RGB copy has been made.
            with Image.open(image_path) as source_image:
                image = source_image.convert('RGB')
        else:
            # Placeholder image for text-only requests.
            image = Image.new('RGB', (100, 100), color='white')

        # NOTE(review): _generate_response reports failures as an
        # "Error: ..." string, which is parsed below like any model reply.
        response = self._generate_response(image, prompt)

        try:
            result = self._parse_tool_response(response)

            if result.get("needs_tool", False):
                tool_result = self.execute_tool(result)
                return {
                    "request": request,
                    "tool_used": result.get("tool"),
                    "code": result.get("code"),
                    "result": tool_result,
                    "reasoning": result.get("reasoning"),
                }

            return {
                "request": request,
                "response": result.get("reasoning", response),
                "tool_used": None,
            }

        except Exception as e:
            return {
                "request": request,
                "error": str(e),
                "raw_response": response,
            }

    def _generate_response(self, image, prompt, max_new_tokens=4096):
        """Generate the model's reply to one image + text prompt.

        Args:
            image: Image passed to the processor.
            prompt: Text of the user turn.
            max_new_tokens: Upper bound on generated tokens.

        Returns:
            The decoded continuation (prompt tokens stripped), or the string
            ``"Error: <message>"`` if anything raised.
        """
        try:
            messages = [{
                "role": "user",
                "content": [
                    {"type": "image", "image": image},
                    {"type": "text", "text": prompt},
                ],
            }]

            text = self.processor.apply_chat_template(
                messages,
                tokenize=False,
                add_generation_prompt=True,
            )

            inputs = self.processor(
                text=text,
                images=image,
                return_tensors="pt",
            )

            inputs = {
                k: v.to(self.device) if torch.is_tensor(v) else v
                for k, v in inputs.items()
            }

            # Image tensor must match the dtype the model was loaded with.
            if 'pixel_values' in inputs:
                inputs['pixel_values'] = inputs['pixel_values'].to(self.dtype)

            with torch.no_grad():
                # Greedy decoding: `temperature` only applies when sampling,
                # so it is not passed together with do_sample=False.
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=max_new_tokens,
                    do_sample=False,
                    repetition_penalty=1.1,
                )

            # Drop the echoed prompt tokens and decode only the continuation.
            prompt_length = inputs['input_ids'].shape[1]
            return self.processor.decode(
                outputs[0][prompt_length:],
                skip_special_tokens=True,
            )

        except Exception as e:
            return f"Error: {str(e)}"

    def _parse_tool_response(self, response: str) -> Dict[str, Any]:
        """Parse the model's reply to extract a tool call.

        Tries, in order:
        1. the span from the first ``{`` to the last ``}`` as one JSON object;
        2. any JSON object embedded in the text that has a ``"needs_tool"``
           key (covers replies with extra braces after the JSON);
        3. a fenced ```` ```python ```` block, then a ```` ```bash ```` block.

        Returns:
            The parsed JSON dict, or a dict with ``needs_tool``/``reasoning``
            (plus ``tool`` and ``code`` when a fenced block was found).
        """
        import re  # function-scope: `re` is not in the module's import block

        json_match = re.search(r'\{.*\}', response, re.DOTALL)
        if json_match:
            try:
                return json.loads(json_match.group())
            except (ValueError, RecursionError):
                pass

            # The greedy span was not valid JSON; look for an embedded object.
            decoder = json.JSONDecoder()
            for brace in re.finditer(r'\{', response):
                try:
                    candidate, _ = decoder.raw_decode(response, brace.start())
                except (ValueError, RecursionError):
                    continue
                # Require the schema key so dict literals inside generated
                # code are not mistaken for the tool-call envelope.
                if isinstance(candidate, dict) and "needs_tool" in candidate:
                    return candidate

        result = {
            "needs_tool": False,
            "reasoning": response,
        }

        # Fallback: fenced code. Python is preferred; bash is still tried
        # when a python fence is present but does not match.
        for fence, tool in (("python", "python_executor"), ("bash", "bash_executor")):
            code_match = re.search(r'```' + fence + r'\n(.*?)\n```', response, re.DOTALL)
            if code_match:
                result["needs_tool"] = True
                result["tool"] = tool
                result["code"] = code_match.group(1)
                break

        return result

    def execute_tool(self, tool_call: Dict[str, Any]) -> Any:
        """Execute the tool named in ``tool_call``.

        Args:
            tool_call: Dict with optional ``"tool"`` (default
                ``"python_executor"``) and ``"code"`` (default ``""``).

        Returns:
            Whatever the tool returns, or the string ``"Unknown tool: <name>"``
            when the name is not registered.
        """
        tool_name = tool_call.get("tool", "python_executor")
        code = tool_call.get("code", "")

        tool = self.tools.get(tool_name)
        if tool is None:
            # NOTE(review): a plain string, unlike the dicts the tools return.
            return f"Unknown tool: {tool_name}"
        return tool(code)

    def execute_python(self, code: str, timeout: float = 30) -> Dict[str, Any]:
        """Run Python source in a child interpreter and capture its output.

        The code is written to a temporary ``.py`` file, run with
        ``sys.executable``, and the file is removed afterwards, including on
        timeout or error. This is NOT sandboxed.

        Args:
            code: Python source to execute.
            timeout: Seconds to wait before giving up.

        Returns:
            ``{"status", "stdout", "stderr", "code"}`` where ``code`` is the
            exit status, or ``{"status": "error", "error": <message>}`` on
            timeout or any other failure.
        """
        print("🐍 Executing Python code...")

        temp_file = None
        try:
            # UTF-8 explicitly: that is Python's default source encoding,
            # while text-mode files default to the locale encoding.
            with tempfile.NamedTemporaryFile(
                mode='w', suffix='.py', delete=False, encoding='utf-8'
            ) as f:
                # Record the path first so cleanup runs even if write() fails.
                temp_file = f.name
                f.write(code)

            result = subprocess.run(
                [sys.executable, temp_file],
                capture_output=True,
                text=True,
                timeout=timeout,
            )

            return {
                "status": "success" if result.returncode == 0 else "error",
                "stdout": result.stdout,
                "stderr": result.stderr,
                "code": result.returncode,
            }

        except subprocess.TimeoutExpired:
            return {"status": "error", "error": "Code execution timeout"}
        except Exception as e:
            return {"status": "error", "error": str(e)}
        finally:
            if temp_file is not None:
                try:
                    os.unlink(temp_file)
                except OSError:
                    # Best-effort cleanup; never mask the execution result.
                    pass

    def execute_bash(self, code: str, timeout: float = 30) -> Dict[str, Any]:
        """Run a shell command string and capture its output.

        SECURITY: the string is passed to the system shell (``shell=True``)
        unmodified and unsandboxed. ``shell=True`` uses the platform default
        shell, which is not necessarily bash.

        Args:
            code: Command text to execute.
            timeout: Seconds to wait before giving up.

        Returns:
            ``{"status", "stdout", "stderr", "code"}`` where ``code`` is the
            exit status, or ``{"status": "error", "error": <message>}`` on
            timeout or any other failure.
        """
        print("🔧 Executing bash command...")

        try:
            result = subprocess.run(
                code,
                shell=True,
                capture_output=True,
                text=True,
                timeout=timeout,
            )

            return {
                "status": "success" if result.returncode == 0 else "error",
                "stdout": result.stdout,
                "stderr": result.stderr,
                "code": result.returncode,
            }

        except subprocess.TimeoutExpired:
            # Short, stable message matching execute_python's timeout style.
            return {"status": "error", "error": "Command execution timeout"}
        except Exception as e:
            return {"status": "error", "error": str(e)}

    def create_web_scraper(self, code: str) -> Dict[str, Any]:
        """Run scraper code with common scraping imports prepended.

        The child script imports ``requests``, ``bs4.BeautifulSoup``, ``json``
        and ``pandas`` before ``code``; it fails if any of them is missing.

        Returns:
            The result dict from ``execute_python``.
        """
        print("🌐 Creating web scraper...")

        full_code = """
import requests
from bs4 import BeautifulSoup
import json
import pandas as pd

""" + code

        return self.execute_python(full_code)

    def analyze_data(self, code: str) -> Dict[str, Any]:
        """Run analysis code with pandas/numpy imports prepended.

        The child script imports ``pandas``, ``numpy``, ``json`` and
        ``datetime.datetime`` before ``code``.

        Returns:
            The result dict from ``execute_python``.
        """
        print("📊 Analyzing data...")

        full_code = """
import pandas as pd
import numpy as np
import json
from datetime import datetime

""" + code

        return self.execute_python(full_code)
|
|
|
|
|
def leboncoin_investment_analyzer():
    """Example: ask the assistant to analyze Leboncoin for investment properties.

    Builds a CFPJarvis1WithTools instance, sends a fixed scraping/analysis
    request, and prints what came back: the tool, generated code and
    execution result when a tool ran, otherwise the error or direct answer.

    Returns:
        The dict returned by ``process_request``.
    """
    jarvis = CFPJarvis1WithTools()

    request = """
Write a Python script to:
1. Scrape Leboncoin apartments for sale in Paris
2. Calculate investment potential based on:
   - Price per m²
   - Location score
   - Size and rooms ratio
3. Return top 50 properties with best potential

Use requests and BeautifulSoup for scraping.
Return results as JSON with property details and investment score.
"""

    result = jarvis.process_request(request)

    if result.get("tool_used"):
        print(f"\n🔧 Tool used: {result['tool_used']}")
        print("\n📝 Generated code:")
        print(result.get("code", ""))
        print("\n📊 Execution result:")
        print(result.get("result", {}))
    elif "error" in result:
        # Surface failures instead of finishing silently.
        print(f"\n❌ Request failed: {result['error']}")
    elif "response" in result:
        # The model answered directly without requesting a tool.
        print(result["response"])

    return result
|
|
|
|
|
if __name__ == "__main__":
    # CLI entry point: with arguments, treat them as one request and print
    # the JSON result; with none, run the bundled Leboncoin example.
    # (`sys` and `json` come from the module-level imports.)
    if len(sys.argv) > 1:
        jarvis = CFPJarvis1WithTools()
        request = " ".join(sys.argv[1:])
        result = jarvis.process_request(request)

        print(json.dumps(result, indent=2))
    else:
        print("🔍 Analyzing Leboncoin investment properties...")
        leboncoin_investment_analyzer()