# backend.py import json import threading import queue from collections import deque from dataclasses import dataclass, field from typing import Dict, Optional, Tuple from playwright._impl._api_structures import StorageState import cv2 import numpy as np from playwright.sync_api import sync_playwright, Page, BrowserContext from playwright_stealth import Stealth from PIL import Image, ImageDraw from io import BytesIO # ───────────────────────────── # Global Queues (worker <-> UI) # ───────────────────────────── task_queue: "queue.Queue[dict]" = queue.Queue() result_queue: "queue.Queue[Tuple[str, Optional[Image.Image]]]" = queue.Queue() # ───────────────────────────── # Browser State + Lock # ───────────────────────────── @dataclass class BrowserState: running: bool = False thread: Optional[threading.Thread] = None pages: Dict[str, Page] = field(default_factory=dict) # tab_name -> Page active_page: Optional[str] = None network_logs: deque = field(default_factory=lambda: deque(maxlen=500)) console_logs: deque = field(default_factory=lambda: deque(maxlen=500)) recording: bool = False macro: list = field(default_factory=list) tab_counter: int = 0 # to generate stable names BROWSER_STATE = BrowserState() BROWSER_LOCK = threading.Lock() # ───────────────────────────── # Helper: Safe screenshot # ───────────────────────────── def take_screenshot(page: Optional[Page]) -> Optional[Image.Image]: if page is None: return None try: img_bytes = page.screenshot() return Image.open(BytesIO(img_bytes)) except Exception: return None # ───────────────────────────── # Worker Class (Playwright) # ───────────────────────────── class PlaywrightWorker: def __init__(self, state: BrowserState, storage_state: StorageState=None): self.state = state self.context: Optional[BrowserContext] = None self.storage_state = storage_state # ---- Main loop ---- def run(self): with Stealth().use_sync(sync_playwright()) as p: browser = p.chromium.launch(headless=True, args=["--no-sandbox"]) self.context = browser.new_context(storage_state=self.storage_state) self._attach_network_listeners(self.context) # default first tab self._create_new_tab() # Command dispatcher handlers = { "eval": self.handle_eval, "goto": self.handle_goto, "click": self.handle_click, "click_xy": self.handle_click_xy, "type": self.handle_type, "new_tab": self.handle_new_tab, "close_tab": self.handle_close_tab, "switch_tab": self.handle_switch_tab, "inspect": self.handle_inspect, "get_network_logs": self.handle_get_network_logs, "get_console_logs": self.handle_get_console_logs, "clear_logs": self.handle_clear_logs, "start_record": self.handle_start_record, "stop_record": self.handle_stop_record, "play_macro": self.handle_play_macro, "take_screenshot": self.handle_take_screenshot, "find_template": self.handle_find_template, } while True: task = task_queue.get() cmd = task.get("cmd") if cmd == "__EXIT__": break result_text = "" screenshot = None try: page = self._get_active_page() # macro recording (xy-click ko record nahi kar rahe) recordable_cmds = { "goto", "click", "type", "new_tab", "close_tab", "switch_tab" } if ( not task.get("from_macro", False) and cmd in recordable_cmds ): with BROWSER_LOCK: if self.state.recording: rec = {k: v for k, v in task.items() if k != "from_macro"} self.state.macro.append(rec) handler = handlers.get(cmd, None) if handler is None: result_text = f"Unknown command: {cmd}" screenshot = take_screenshot(page) else: result_text, screenshot = handler(task, page) except Exception as e: result_text = f"Error: {type(e).__name__}: {e}" result_queue.put((result_text, screenshot)) # graceful shutdown try: browser.close() except Exception: pass # ---- Console & Network hooks ---- def _attach_console_listener(self, page: Page): def on_console(msg): with BROWSER_LOCK: self.state.console_logs.append(f"[{msg.type}] {msg.text}") page.on("console", on_console) def _attach_network_listeners(self, context: BrowserContext): def on_request(request): with BROWSER_LOCK: self.state.network_logs.append(f"[REQUEST] {request.method} {request.url}") def on_response(response): with BROWSER_LOCK: self.state.network_logs.append(f"[RESPONSE] {response.status} {response.url}") context.on("request", on_request) context.on("response", on_response) # ---- Tabs Management helpers ---- def _create_new_tab(self) -> Tuple[str, Page]: page = self.context.new_page() with BROWSER_LOCK: self.state.tab_counter += 1 tab_name = f"Tab-{self.state.tab_counter}" self.state.pages[tab_name] = page self.state.active_page = tab_name self._attach_console_listener(page) return tab_name, page def _get_active_page(self) -> Optional[Page]: with BROWSER_LOCK: name = self.state.active_page page = self.state.pages.get(name) if name else None return page # ───────────────────────────── # Command Handlers # ───────────────────────────── def handle_eval(self, task: dict, page: Optional[Page]): if page is None: return "No active page.", None code = task.get("code", "") safe_globals = {"__builtins__": {}} safe_locals = {} try: result = eval(code, safe_globals, safe_locals) except Exception as e: return f"Eval error: {type(e).__name__}: {e}", take_screenshot(page) text = f"Eval result: {result!r}" return text, take_screenshot(page) def handle_goto(self, task: dict, page: Optional[Page]): if page is None: return "No active page.", None url = task.get("url", "") try: page.goto(url) return f"Navigated to {url}", take_screenshot(page) except Exception as e: return f"Goto error: {e}", take_screenshot(page) def handle_click(self, task: dict, page: Optional[Page]): """ CSS Selector Based Click (NOT XY click) Used for: • Normal click button • Macro playback """ if page is None: return "No active page.", None selector = task.get("selector", "") if not selector: return "No selector provided.", take_screenshot(page) try: page.wait_for_selector(selector, timeout=5000) page.click(selector) return f"Clicked selector: {selector}", take_screenshot(page) except Exception as e: return f"Click error on '{selector}': {e}", take_screenshot(page) def handle_click_xy(self, task: dict, page: Optional[Page]): if page is None: return "No active page.", None x = task.get("x") y = task.get("y") img_w = task.get("img_w") img_h = task.get("img_h") click_type = task.get("click_type", "left") # viewport size vp = page.viewport_size or {"width": img_w, "height": img_h} real_x = x * (vp["width"] / img_w) real_y = y * (vp["height"] / img_h) if click_type == "left": page.mouse.click(real_x, real_y) elif click_type == "double": page.mouse.dblclick(real_x, real_y) elif click_type == "right": page.mouse.click(real_x, real_y, button="right") elif click_type == "hover": page.mouse.move(real_x, real_y) return f"{click_type} click at {real_x},{real_y}", take_screenshot(page) def handle_type(self, task: dict, page: Optional[Page]): if page is None: return "No active page.", None selector = task.get("selector", "") text = task.get("text", "") try: page.fill(selector, text) return f"Typed into {selector}: {text}", take_screenshot(page) except Exception as e: return f"Type error: {e}", take_screenshot(page) def handle_new_tab(self, task: dict, page: Optional[Page]): tab_name, new_page = self._create_new_tab() return f"Opened new tab: {tab_name}", take_screenshot(new_page) def handle_close_tab(self, task: dict, page: Optional[Page]): name = task.get("tab", "") with BROWSER_LOCK: if name in self.state.pages: try: self.state.pages[name].close() except Exception: pass del self.state.pages[name] msg = f"Closed {name}" if self.state.active_page == name: if self.state.pages: self.state.active_page = list(self.state.pages.keys())[0] else: self.state.active_page = None else: msg = f"Tab {name} not found." active_name = self.state.active_page active_page = self.state.pages.get(active_name) if active_name else None return msg, take_screenshot(active_page) def handle_switch_tab(self, task: dict, page: Optional[Page]): name = task.get("tab", "") with BROWSER_LOCK: if name in self.state.pages: self.state.active_page = name active_page = self.state.pages[name] msg = f"Switched to {name}" else: active_page = self._get_active_page() msg = f"Tab {name} not found." return msg, take_screenshot(active_page) def handle_inspect(self, task: dict, page: Optional[Page]): if page is None: return "No active page.", None selector = task.get("selector", "") try: el = page.query_selector(selector) if not el: return f"No element found for selector: {selector}", take_screenshot(page) inner_text = el.inner_text() inner_html = el.inner_html() attrs = page.evaluate( """(el) => { const out = {}; for (const a of el.attributes) out[a.name] = a.value; return out; }""", el ) xpath = page.evaluate( """(el) => { function getXPath(node) { if (node.id) return 'id(\"' + node.id + '\")'; if (node === document.body) return '/html/body'; let ix = 0; const siblings = node.parentNode ? node.parentNode.childNodes : []; for (let i=0; i