# shethjenil's picture
# Upload 2 files
# 04f1ca5 verified
# backend.py
import json
import threading
import queue
from collections import deque
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple
from playwright._impl._api_structures import StorageState
import cv2
import numpy as np
from playwright.sync_api import sync_playwright, Page, BrowserContext
from playwright_stealth import Stealth
from PIL import Image, ImageDraw
from io import BytesIO
# ─────────────────────────────
# Global Queues (worker <-> UI)
# ─────────────────────────────
# Commands travel to the worker as dicts; the worker reads the "cmd" key to
# pick a handler, and the special value "__EXIT__" ends its loop.
task_queue: "queue.Queue[dict]" = queue.Queue()
# The worker answers every processed command with one
# (message text, screenshot-or-None) tuple.
result_queue: "queue.Queue[Tuple[str, Optional[Image.Image]]]" = queue.Queue()
# ─────────────────────────────
# Browser State + Lock
# ─────────────────────────────
@dataclass
class BrowserState:
    """Mutable state shared between the worker thread and its callers.

    Access is expected to happen while holding ``BROWSER_LOCK``.
    """

    # True between start_worker_thread() and stop_worker_thread().
    running: bool = False
    # The thread executing PlaywrightWorker.run, if one was started.
    thread: Optional[threading.Thread] = None
    pages: Dict[str, Page] = field(default_factory=dict)  # tab_name -> Page
    # Key into ``pages`` of the tab that commands act on; None when no tab is open.
    active_page: Optional[str] = None
    # Bounded buffers: only the most recent 500 entries are retained.
    network_logs: deque = field(default_factory=lambda: deque(maxlen=500))
    console_logs: deque = field(default_factory=lambda: deque(maxlen=500))
    # While True, recordable commands are appended to ``macro``.
    recording: bool = False
    # Recorded command dicts, in the order they were received.
    macro: list = field(default_factory=list)
    tab_counter: int = 0  # to generate stable names
# Module-wide state instance shared by the worker and the start/stop helpers.
BROWSER_STATE = BrowserState()
# Guards BROWSER_STATE. This is a plain (non-reentrant) Lock: code that already
# holds it must not call a helper that acquires it again.
BROWSER_LOCK = threading.Lock()
# ─────────────────────────────
# Helper: Safe screenshot
# ─────────────────────────────
def take_screenshot(page: Optional[Page]) -> Optional[Image.Image]:
    """Return a PIL image of *page*, or None if that is not possible.

    None is returned both when *page* is None and when capturing or
    opening the screenshot raises; the error itself is discarded because
    the screenshot is only a best-effort companion to a text result.
    """
    if page is not None:
        try:
            return Image.open(BytesIO(page.screenshot()))
        except Exception:
            # Best effort: fall through to the None result below.
            pass
    return None
# ─────────────────────────────
# Worker Class (Playwright)
# ─────────────────────────────
class PlaywrightWorker:
    """Command loop that owns the Playwright browser for one session.

    ``run`` is meant to be the target of a dedicated thread. It pulls
    command dicts from ``task_queue``, dispatches them to the matching
    ``handle_*`` method and pushes one ``(text, screenshot)`` tuple to
    ``result_queue`` per command. Every handler takes ``(task, page)``,
    where ``page`` is the active page (or None), and returns that tuple.
    """

    # Commands appended to the macro while recording is on.
    # XY clicks are deliberately not recorded.
    _RECORDABLE_CMDS = frozenset(
        {"goto", "click", "type", "new_tab", "close_tab", "switch_tab"}
    )

    def __init__(self, state: BrowserState, storage_state: Optional[StorageState] = None):
        """Store the shared state and the optional storage state for the context."""
        self.state = state
        self.context: Optional[BrowserContext] = None
        self.storage_state = storage_state

    # ---- Main loop ----
    def run(self):
        """Launch Chromium and serve commands until ``__EXIT__`` is received."""
        with Stealth().use_sync(sync_playwright()) as p:
            browser = p.chromium.launch(headless=True, args=["--no-sandbox"])
            try:
                self.context = browser.new_context(storage_state=self.storage_state)
                self._attach_network_listeners(self.context)
                # default first tab
                self._create_new_tab()

                # Command dispatcher
                handlers = {
                    "eval": self.handle_eval,
                    "goto": self.handle_goto,
                    "click": self.handle_click,
                    "click_xy": self.handle_click_xy,
                    "type": self.handle_type,
                    "new_tab": self.handle_new_tab,
                    "close_tab": self.handle_close_tab,
                    "switch_tab": self.handle_switch_tab,
                    "inspect": self.handle_inspect,
                    "get_network_logs": self.handle_get_network_logs,
                    "get_console_logs": self.handle_get_console_logs,
                    "clear_logs": self.handle_clear_logs,
                    "start_record": self.handle_start_record,
                    "stop_record": self.handle_stop_record,
                    "play_macro": self.handle_play_macro,
                    "take_screenshot": self.handle_take_screenshot,
                    "find_template": self.handle_find_template,
                }

                while True:
                    task = task_queue.get()
                    cmd = task.get("cmd")
                    if cmd == "__EXIT__":
                        break

                    result_text = ""
                    screenshot = None
                    try:
                        page = self._get_active_page()

                        # Macro recording: the command is stored before it
                        # runs, minus the internal "from_macro" marker.
                        if cmd in self._RECORDABLE_CMDS and not task.get("from_macro", False):
                            with BROWSER_LOCK:
                                if self.state.recording:
                                    rec = {k: v for k, v in task.items() if k != "from_macro"}
                                    self.state.macro.append(rec)

                        handler = handlers.get(cmd)
                        if handler is None:
                            result_text = f"Unknown command: {cmd}"
                            screenshot = take_screenshot(page)
                        else:
                            result_text, screenshot = handler(task, page)
                    except Exception as e:
                        # Boundary of the worker: report the failure to the
                        # caller instead of killing the loop.
                        result_text = f"Error: {type(e).__name__}: {e}"

                    result_queue.put((result_text, screenshot))
            finally:
                # Graceful shutdown, also when setup above failed. Closing is
                # best effort: the browser may already be gone.
                try:
                    browser.close()
                except Exception:
                    pass

    # ---- Console & Network hooks ----
    def _attach_console_listener(self, page: Page):
        """Append every console message of *page* to ``state.console_logs``."""
        def on_console(msg):
            with BROWSER_LOCK:
                self.state.console_logs.append(f"[{msg.type}] {msg.text}")

        page.on("console", on_console)

    def _attach_network_listeners(self, context: BrowserContext):
        """Append every request/response of *context* to ``state.network_logs``."""
        def on_request(request):
            with BROWSER_LOCK:
                self.state.network_logs.append(f"[REQUEST] {request.method} {request.url}")

        def on_response(response):
            with BROWSER_LOCK:
                self.state.network_logs.append(f"[RESPONSE] {response.status} {response.url}")

        context.on("request", on_request)
        context.on("response", on_response)

    # ---- Tabs Management helpers ----
    def _create_new_tab(self) -> Tuple[str, Page]:
        """Open a page, register it as ``Tab-<n>`` and make it the active tab."""
        # The page is created before taking the lock so that no Playwright
        # call runs while BROWSER_LOCK is held.
        page = self.context.new_page()
        with BROWSER_LOCK:
            self.state.tab_counter += 1
            tab_name = f"Tab-{self.state.tab_counter}"
            self.state.pages[tab_name] = page
            self.state.active_page = tab_name
        self._attach_console_listener(page)
        return tab_name, page

    def _active_page_unlocked(self) -> Optional[Page]:
        """Return the active page; the caller must already hold BROWSER_LOCK."""
        name = self.state.active_page
        return self.state.pages.get(name) if name else None

    def _get_active_page(self) -> Optional[Page]:
        """Return the active page (or None), acquiring BROWSER_LOCK itself."""
        with BROWSER_LOCK:
            return self._active_page_unlocked()

    # ─────────────────────────────
    # Command Handlers
    # ─────────────────────────────
    def handle_eval(self, task: dict, page: Optional[Page]):
        """Evaluate ``task["code"]`` as a Python expression and report its repr."""
        if page is None:
            return "No active page.", None
        code = task.get("code", "")
        # SECURITY(review): eval() on caller-supplied text. Emptying
        # __builtins__ is NOT a sandbox (object introspection can still reach
        # arbitrary code). Only expose this command to trusted callers.
        safe_globals = {"__builtins__": {}}
        safe_locals = {}
        try:
            result = eval(code, safe_globals, safe_locals)
        except Exception as e:
            return f"Eval error: {type(e).__name__}: {e}", take_screenshot(page)
        return f"Eval result: {result!r}", take_screenshot(page)

    def handle_goto(self, task: dict, page: Optional[Page]):
        """Navigate the active page to ``task["url"]``."""
        if page is None:
            return "No active page.", None
        url = task.get("url", "")
        try:
            page.goto(url)
            return f"Navigated to {url}", take_screenshot(page)
        except Exception as e:
            return f"Goto error: {e}", take_screenshot(page)

    def handle_click(self, task: dict, page: Optional[Page]):
        """
        CSS selector based click (NOT an XY click).

        Used for:
        • the normal click button
        • macro playback
        """
        if page is None:
            return "No active page.", None
        selector = task.get("selector", "")
        if not selector:
            return "No selector provided.", take_screenshot(page)
        try:
            page.wait_for_selector(selector, timeout=5000)
            page.click(selector)
            return f"Clicked selector: {selector}", take_screenshot(page)
        except Exception as e:
            return f"Click error on '{selector}': {e}", take_screenshot(page)

    def handle_click_xy(self, task: dict, page: Optional[Page]):
        """Click/hover at a point given in displayed-image coordinates.

        ``x``/``y`` are relative to an image of size ``img_w`` x ``img_h``;
        they are rescaled to the page viewport before the mouse action.
        ``click_type`` is one of "left", "double", "right" or "hover".
        """
        if page is None:
            return "No active page.", None
        x = task.get("x")
        y = task.get("y")
        img_w = task.get("img_w")
        img_h = task.get("img_h")
        click_type = task.get("click_type", "left")

        # Missing values or a zero-sized image would otherwise surface as an
        # opaque TypeError / ZeroDivisionError from the scaling below.
        if x is None or y is None or not img_w or not img_h:
            return "Invalid click coordinates or image size.", take_screenshot(page)

        # Without a fixed viewport, fall back to a 1:1 mapping.
        vp = page.viewport_size or {"width": img_w, "height": img_h}
        real_x = x * (vp["width"] / img_w)
        real_y = y * (vp["height"] / img_h)

        if click_type == "left":
            page.mouse.click(real_x, real_y)
        elif click_type == "double":
            page.mouse.dblclick(real_x, real_y)
        elif click_type == "right":
            page.mouse.click(real_x, real_y, button="right")
        elif click_type == "hover":
            page.mouse.move(real_x, real_y)
        else:
            # Do not claim a click happened when nothing was done.
            return f"Unknown click type: {click_type}", take_screenshot(page)
        return f"{click_type} click at {real_x},{real_y}", take_screenshot(page)

    def handle_type(self, task: dict, page: Optional[Page]):
        """Fill the element matching ``task["selector"]`` with ``task["text"]``."""
        if page is None:
            return "No active page.", None
        selector = task.get("selector", "")
        text = task.get("text", "")
        try:
            page.fill(selector, text)
            return f"Typed into {selector}: {text}", take_screenshot(page)
        except Exception as e:
            return f"Type error: {e}", take_screenshot(page)

    def handle_new_tab(self, task: dict, page: Optional[Page]):
        """Open a new tab and make it active."""
        tab_name, new_page = self._create_new_tab()
        return f"Opened new tab: {tab_name}", take_screenshot(new_page)

    def handle_close_tab(self, task: dict, page: Optional[Page]):
        """Close the tab named ``task["tab"]``.

        If it was the active tab, the first remaining tab becomes active
        (or none when no tab is left).
        """
        name = task.get("tab", "")
        with BROWSER_LOCK:
            closing = self.state.pages.pop(name, None)
            if closing is not None and self.state.active_page == name:
                self.state.active_page = next(iter(self.state.pages), None)
            active_page = self._active_page_unlocked()

        if closing is None:
            msg = f"Tab {name} not found."
        else:
            # Closed only after releasing BROWSER_LOCK: the console/network
            # listeners take that lock and may fire on this thread while the
            # close call is in progress, which would deadlock.
            try:
                closing.close()
            except Exception:
                # Best effort: the page may already be closed.
                pass
            msg = f"Closed {name}"
        return msg, take_screenshot(active_page)

    def handle_switch_tab(self, task: dict, page: Optional[Page]):
        """Make the tab named ``task["tab"]`` the active one, if it exists."""
        name = task.get("tab", "")
        with BROWSER_LOCK:
            if name in self.state.pages:
                self.state.active_page = name
                msg = f"Switched to {name}"
            else:
                msg = f"Tab {name} not found."
            # Must use the unlocked accessor: BROWSER_LOCK is not reentrant,
            # so calling _get_active_page() here would block forever.
            active_page = self._active_page_unlocked()
        return msg, take_screenshot(active_page)

    def handle_inspect(self, task: dict, page: Optional[Page]):
        """Report XPath, text, attributes and (truncated) HTML of one element."""
        if page is None:
            return "No active page.", None
        selector = task.get("selector", "")
        try:
            el = page.query_selector(selector)
            if not el:
                return f"No element found for selector: {selector}", take_screenshot(page)

            inner_text = el.inner_text()
            inner_html = el.inner_html()
            attrs = page.evaluate(
                """(el) => {
                    const out = {};
                    for (const a of el.attributes) out[a.name] = a.value;
                    return out;
                }""",
                el
            )
            # Builds an id()-anchored or positional path by walking up the DOM.
            xpath = page.evaluate(
                """(el) => {
                    function getXPath(node) {
                        if (node.id)
                            return 'id(\"' + node.id + '\")';
                        if (node === document.body)
                            return '/html/body';
                        let ix = 0;
                        const siblings = node.parentNode ? node.parentNode.childNodes : [];
                        for (let i=0; i<siblings.length; i++) {
                            const sibling = siblings[i];
                            if (sibling === node)
                                return getXPath(node.parentNode) + '/' + node.tagName.toLowerCase() + '[' + (ix+1) + ']';
                            if (sibling.nodeType === 1 && sibling.tagName === node.tagName)
                                ix++;
                        }
                    }
                    return getXPath(el);
                }""",
                el
            )
            info = (
                f"Selector: {selector}\n"
                f"XPath: {xpath}\n\n"
                f"Inner Text:\n{inner_text}\n\n"
                f"Attributes:\n{attrs}\n\n"
                f"Inner HTML (truncated):\n{inner_html[:1000]}"
            )
            return info, take_screenshot(page)
        except Exception as e:
            return f"Inspect error: {e}", take_screenshot(page)

    def handle_get_network_logs(self, task: dict, page: Optional[Page]):
        """Return the last 100 network log lines."""
        with BROWSER_LOCK:
            logs = list(self.state.network_logs)[-100:]
        text = "=== Network Logs (last 100) ===\n" + "\n".join(logs)
        return text, take_screenshot(page)

    def handle_get_console_logs(self, task: dict, page: Optional[Page]):
        """Return the last 100 console log lines."""
        with BROWSER_LOCK:
            logs = list(self.state.console_logs)[-100:]
        text = "=== Console Logs (last 100) ===\n" + "\n".join(logs)
        return text, take_screenshot(page)

    def handle_clear_logs(self, task: dict, page: Optional[Page]):
        """Empty both the network and the console log buffers."""
        with BROWSER_LOCK:
            self.state.network_logs.clear()
            self.state.console_logs.clear()
        return "Network & console logs cleared.", take_screenshot(page)

    def handle_start_record(self, task: dict, page: Optional[Page]):
        """Start macro recording, discarding any previously recorded macro."""
        with BROWSER_LOCK:
            self.state.recording = True
            self.state.macro = []
        return "Macro recording started.", take_screenshot(page)

    def handle_stop_record(self, task: dict, page: Optional[Page]):
        """Stop macro recording and report how many steps were captured."""
        with BROWSER_LOCK:
            self.state.recording = False
            steps = len(self.state.macro)
        return f"Macro recording stopped. {steps} steps recorded.", take_screenshot(page)

    def handle_play_macro(self, task: dict, page: Optional[Page]):
        """Replay the recorded macro step by step on the current active page."""
        with BROWSER_LOCK:
            macro_steps = list(self.state.macro)
        if not macro_steps:
            return "Macro is empty.", take_screenshot(self._get_active_page())

        replayable = {
            "goto": self.handle_goto,
            "click": self.handle_click,
            "type": self.handle_type,
            "new_tab": self.handle_new_tab,
            "close_tab": self.handle_close_tab,
            "switch_tab": self.handle_switch_tab,
        }
        last_result = ""
        for step in macro_steps:
            step_cmd = dict(step)
            step_cmd["from_macro"] = True
            handler = replayable.get(step_cmd.get("cmd"))
            if handler is None:
                continue
            # Re-resolve the active page each step: a previous step may have
            # opened, closed or switched tabs.
            last_result, _ = handler(step_cmd, self._get_active_page())

        final_page = self._get_active_page()
        return f"Macro executed. {len(macro_steps)} steps.\nLast step: {last_result}", take_screenshot(final_page)

    def handle_take_screenshot(self, task: dict, page: Optional[Page]):
        """Capture the active page."""
        if page is None:
            return "No active page.", None
        return "Screenshot captured.", take_screenshot(page)

    @staticmethod
    def _to_gray(arr):
        """Return *arr* as a 2-D grayscale array (accepts gray, RGB or RGBA)."""
        if arr.ndim == 2:
            return arr
        if arr.shape[2] == 4:
            return cv2.cvtColor(arr, cv2.COLOR_RGBA2GRAY)
        return cv2.cvtColor(arr, cv2.COLOR_RGB2GRAY)

    def handle_find_template(self, task: dict, page: Optional[Page]):
        """Locate ``task["template"]`` in a screenshot of the active page.

        On a match scoring at least 0.55 (normalized cross-correlation) the
        returned screenshot has a yellow rectangle drawn around the match.
        """
        template = task.get("template")
        if page is None:
            return "No active page.", None
        if template is None:
            return "Upload template first.", take_screenshot(page)
        img = take_screenshot(page)
        if img is None:
            return "Screenshot error.", None

        img_np = self._to_gray(np.array(img))
        tpl_np = self._to_gray(np.array(template))
        th, tw = tpl_np.shape
        # matchTemplate rejects a template that does not fit inside the image.
        if th > img_np.shape[0] or tw > img_np.shape[1]:
            return "Template is larger than the screenshot.", img

        res = cv2.matchTemplate(img_np, tpl_np, cv2.TM_CCOEFF_NORMED)
        _, max_val, _, max_loc = cv2.minMaxLoc(res)
        if max_val < 0.55:
            return f"No strong match. Score={max_val:.2f}", img

        draw = ImageDraw.Draw(img)
        draw.rectangle(
            [
                max_loc[0], max_loc[1],
                max_loc[0] + tw, max_loc[1] + th
            ],
            outline="yellow",
            width=3
        )
        return f"Match: score={max_val:.2f}", img
# ─────────────────────────────
# Worker spawn / shutdown helpers
# ─────────────────────────────
def start_worker_thread(storage_state: Optional[StorageState] = None):
    """Reset the shared browser state and start a fresh worker thread.

    Args:
        storage_state: Optional storage state handed to the worker, which
            passes it on when creating its browser context.
    """
    worker = PlaywrightWorker(BROWSER_STATE, storage_state)
    t = threading.Thread(target=worker.run, daemon=True)
    with BROWSER_LOCK:
        # Reset BEFORE the worker can touch the state. The worker registers
        # its first tab under this same lock, so resetting after start()
        # could wipe that tab and leave the session without an active page.
        BROWSER_STATE.pages.clear()
        BROWSER_STATE.active_page = None
        BROWSER_STATE.network_logs.clear()
        BROWSER_STATE.console_logs.clear()
        BROWSER_STATE.macro.clear()
        BROWSER_STATE.recording = False
        BROWSER_STATE.tab_counter = 0
        # Started while the lock is held: the worker blocks on its first
        # state access until the bookkeeping below is complete, and
        # ``running`` is only set if start() did not raise.
        t.start()
        BROWSER_STATE.thread = t
        BROWSER_STATE.running = True
def stop_worker_thread():
    """Ask the worker to exit and wait up to five seconds for it.

    Does nothing when the state is not marked as running. Otherwise the
    running flag is cleared, an ``__EXIT__`` command is queued and the
    recorded thread (if any) is joined with a 5 second timeout.
    """
    worker_thread = None
    with BROWSER_LOCK:
        was_running = BROWSER_STATE.running
        if was_running:
            BROWSER_STATE.running = False
            worker_thread = BROWSER_STATE.thread

    if not was_running:
        return

    # Queue and join outside the lock so the worker can still acquire it
    # while finishing its current command.
    task_queue.put({"cmd": "__EXIT__"})
    if worker_thread is not None:
        worker_thread.join(timeout=5)