|
|
|
|
|
|
|
|
import json
|
|
|
import threading
|
|
|
import queue
|
|
|
from collections import deque
|
|
|
from dataclasses import dataclass, field
|
|
|
from typing import Dict, Optional, Tuple
|
|
|
from playwright._impl._api_structures import StorageState
|
|
|
|
|
|
import cv2
|
|
|
import numpy as np
|
|
|
from playwright.sync_api import sync_playwright, Page, BrowserContext
|
|
|
from playwright_stealth import Stealth
|
|
|
from PIL import Image, ImageDraw
|
|
|
from io import BytesIO
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Commands for the browser worker thread. PlaywrightWorker.run() pulls dicts
# from here and dispatches on their "cmd" key; {"cmd": "__EXIT__"} ends the loop.
task_queue: "queue.Queue[dict]" = queue.Queue()
|
|
|
# Replies from the browser worker thread: PlaywrightWorker.run() puts one
# (result_text, screenshot-or-None) tuple here for every non-exit task it handles.
result_queue: "queue.Queue[Tuple[str, Optional[Image.Image]]]" = queue.Queue()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _new_log_buffer() -> deque:
    """Return an empty log buffer that keeps only the 500 newest entries."""
    return deque(maxlen=500)


@dataclass
class BrowserState:
    """Mutable state shared between the browser worker thread and its callers.

    Attributes:
        running: Whether a worker thread has been started and not yet stopped.
        thread: The worker thread, if one has been started.
        pages: Open pages keyed by their tab name (``"Tab-<n>"``).
        active_page: Tab name of the page commands currently act on.
        network_logs: Bounded buffer of request/response log lines.
        console_logs: Bounded buffer of browser console log lines.
        recording: Whether recordable commands are being appended to ``macro``.
        macro: Recorded command dicts, in the order they were issued.
        tab_counter: Number of tabs created so far; used to name new tabs.
    """

    # Worker lifecycle.
    running: bool = False
    thread: Optional[threading.Thread] = None

    # Tabs.
    pages: Dict[str, Page] = field(default_factory=dict)
    active_page: Optional[str] = None

    # Captured logs (oldest entries are dropped once a buffer is full).
    network_logs: deque = field(default_factory=_new_log_buffer)
    console_logs: deque = field(default_factory=_new_log_buffer)

    # Macro recording.
    recording: bool = False
    macro: list = field(default_factory=list)

    tab_counter: int = 0
|
|
|
|
|
|
|
|
|
# Module-wide browser state; handed to PlaywrightWorker by start_worker_thread()
# and reset there each time a worker is started.
BROWSER_STATE = BrowserState()
|
|
|
# Guards reads and writes of BROWSER_STATE. This is a plain threading.Lock, which
# is not reentrant: code that already holds it must not acquire it again.
BROWSER_LOCK = threading.Lock()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def take_screenshot(page: Optional[Page]) -> Optional[Image.Image]:
    """Capture ``page`` as a PIL image on a best-effort basis.

    Args:
        page: The page to capture, or ``None`` when there is no page.

    Returns:
        The screenshot opened as a PIL image, or ``None`` when ``page`` is
        ``None`` or when taking/decoding the screenshot raises.
    """
    if page is not None:
        try:
            return Image.open(BytesIO(page.screenshot()))
        except Exception:
            # Deliberately best-effort: a failed capture means "no screenshot".
            pass
    return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class PlaywrightWorker:
    """Owns a headless Chromium instance and executes queued browser commands.

    ``run()`` is meant to be the target of a dedicated thread. It reads command
    dicts from ``task_queue``, dispatches them to the ``handle_*`` methods and
    puts one ``(result_text, screenshot)`` tuple per command on ``result_queue``.

    Every handler has the signature ``handler(task, page)`` where ``page`` is
    the active page when the command was dequeued (or ``None``), and returns a
    ``(message, screenshot_or_None)`` tuple.

    Locking rule: ``BROWSER_LOCK`` is a non-reentrant lock that the console and
    network listeners also acquire. It is therefore held only around accesses
    to ``self.state`` and never across Playwright calls or nested helpers that
    acquire it themselves.
    """

    # Commands appended to the macro while recording is active.
    _RECORDABLE_CMDS = frozenset(
        {"goto", "click", "type", "new_tab", "close_tab", "switch_tab"}
    )

    # Minimum normalised correlation score accepted as a template match.
    _MATCH_THRESHOLD = 0.55

    def __init__(self, state: BrowserState, storage_state: Optional[StorageState] = None):
        """Store the shared state and the optional storage state for the context.

        Args:
            state: Shared state object updated by this worker.
            storage_state: Passed to ``browser.new_context`` when ``run`` starts.
        """
        self.state = state
        self.context: Optional[BrowserContext] = None
        self.storage_state = storage_state

    def run(self):
        """Launch the browser and process tasks until ``"__EXIT__"`` is received.

        The exit command produces no entry on ``result_queue``; every other
        command produces exactly one. The browser is closed on the way out,
        including when setup or the loop raises.
        """
        with Stealth().use_sync(sync_playwright()) as p:
            browser = p.chromium.launch(headless=True, args=["--no-sandbox"])
            try:
                self.context = browser.new_context(storage_state=self.storage_state)
                self._attach_network_listeners(self.context)

                # Start with one open tab so there is always an active page.
                self._create_new_tab()

                handlers = self._build_handlers()

                while True:
                    task = task_queue.get()
                    cmd = task.get("cmd")
                    if cmd == "__EXIT__":
                        break
                    result_queue.put(self._run_task(task, cmd, handlers))
            finally:
                # Best-effort shutdown: the browser may already be gone.
                try:
                    browser.close()
                except Exception:
                    pass

    def _build_handlers(self) -> dict:
        """Return the mapping from command name to bound handler method."""
        return {
            "eval": self.handle_eval,
            "goto": self.handle_goto,
            "click": self.handle_click,
            "click_xy": self.handle_click_xy,
            "type": self.handle_type,
            "new_tab": self.handle_new_tab,
            "close_tab": self.handle_close_tab,
            "switch_tab": self.handle_switch_tab,
            "inspect": self.handle_inspect,
            "get_network_logs": self.handle_get_network_logs,
            "get_console_logs": self.handle_get_console_logs,
            "clear_logs": self.handle_clear_logs,
            "start_record": self.handle_start_record,
            "stop_record": self.handle_stop_record,
            "play_macro": self.handle_play_macro,
            "take_screenshot": self.handle_take_screenshot,
            "find_template": self.handle_find_template,
        }

    def _run_task(self, task: dict, cmd, handlers: dict):
        """Record (if applicable) and execute one task, never raising.

        Returns:
            ``(result_text, screenshot)``. When the handler raises, the text is
            ``"Error: <type>: <message>"`` and the screenshot is ``None``.
        """
        result_text = ""
        screenshot = None
        try:
            page = self._get_active_page()

            # Steps replayed by a macro are flagged so they are not re-recorded.
            if not task.get("from_macro", False) and cmd in self._RECORDABLE_CMDS:
                with BROWSER_LOCK:
                    if self.state.recording:
                        rec = {k: v for k, v in task.items() if k != "from_macro"}
                        self.state.macro.append(rec)

            handler = handlers.get(cmd)
            if handler is None:
                result_text = f"Unknown command: {cmd}"
                screenshot = take_screenshot(page)
            else:
                result_text, screenshot = handler(task, page)
        except Exception as e:
            result_text = f"Error: {type(e).__name__}: {e}"
        return result_text, screenshot

    def _attach_console_listener(self, page: Page):
        """Append every console message of ``page`` to ``state.console_logs``."""

        def on_console(msg):
            with BROWSER_LOCK:
                self.state.console_logs.append(f"[{msg.type}] {msg.text}")

        page.on("console", on_console)

    def _attach_network_listeners(self, context: BrowserContext):
        """Append every request and response of ``context`` to ``state.network_logs``."""

        def on_request(request):
            with BROWSER_LOCK:
                self.state.network_logs.append(f"[REQUEST] {request.method} {request.url}")

        def on_response(response):
            with BROWSER_LOCK:
                self.state.network_logs.append(f"[RESPONSE] {response.status} {response.url}")

        context.on("request", on_request)
        context.on("response", on_response)

    def _create_new_tab(self) -> Tuple[str, Page]:
        """Open a page, register it as ``Tab-<n>`` and make it the active page.

        Returns:
            ``(tab_name, page)`` for the newly created tab.
        """
        page = self.context.new_page()
        with BROWSER_LOCK:
            self.state.tab_counter += 1
            tab_name = f"Tab-{self.state.tab_counter}"
            self.state.pages[tab_name] = page
            self.state.active_page = tab_name
        self._attach_console_listener(page)
        return tab_name, page

    def _get_active_page(self) -> Optional[Page]:
        """Return the page registered under ``state.active_page``, if any.

        Acquires ``BROWSER_LOCK``; must not be called while it is already held.
        """
        with BROWSER_LOCK:
            name = self.state.active_page
            page = self.state.pages.get(name) if name else None
        return page

    def handle_eval(self, task: dict, page: Optional[Page]):
        """Evaluate the Python expression in ``task["code"]`` and report its repr.

        The expression is evaluated with empty builtins and empty locals.
        """
        if page is None:
            return "No active page.", None

        code = task.get("code", "")

        # SECURITY(review): emptying __builtins__ is NOT a sandbox; eval() on
        # input that is not fully trusted can still be abused. Confirm that
        # "eval" tasks only ever come from a trusted operator.
        safe_globals = {"__builtins__": {}}
        safe_locals = {}

        try:
            result = eval(code, safe_globals, safe_locals)
        except Exception as e:
            return f"Eval error: {type(e).__name__}: {e}", take_screenshot(page)

        return f"Eval result: {result!r}", take_screenshot(page)

    def handle_goto(self, task: dict, page: Optional[Page]):
        """Navigate the active page to ``task["url"]``."""
        if page is None:
            return "No active page.", None

        url = task.get("url", "")
        try:
            page.goto(url)
            return f"Navigated to {url}", take_screenshot(page)
        except Exception as e:
            return f"Goto error: {e}", take_screenshot(page)

    def handle_click(self, task: dict, page: Optional[Page]):
        """Click the element matching the CSS selector in ``task["selector"]``.

        This is the selector-based click (not the XY click). It is used for:
          - the normal click button
          - macro playback

        The selector is awaited for up to 5 seconds before clicking.
        """
        if page is None:
            return "No active page.", None

        selector = task.get("selector", "")
        if not selector:
            return "No selector provided.", take_screenshot(page)

        try:
            page.wait_for_selector(selector, timeout=5000)
            page.click(selector)
            return f"Clicked selector: {selector}", take_screenshot(page)
        except Exception as e:
            return f"Click error on '{selector}': {e}", take_screenshot(page)

    def handle_click_xy(self, task: dict, page: Optional[Page]):
        """Perform a mouse action at image coordinates scaled to the viewport.

        Task keys:
            x, y: Position within the image the caller clicked on.
            img_w, img_h: Size of that image; used to scale to the viewport.
            click_type: ``"left"`` (default), ``"double"``, ``"right"`` or
                ``"hover"``.

        Invalid coordinates, a non-positive image size or an unknown click
        type are reported in the message instead of raising.
        """
        if page is None:
            return "No active page.", None

        click_type = task.get("click_type", "left")

        try:
            x, y, img_w, img_h = (
                float(task.get(key)) for key in ("x", "y", "img_w", "img_h")
            )
        except (TypeError, ValueError):
            return "Invalid click coordinates or image size.", take_screenshot(page)

        if img_w <= 0 or img_h <= 0:
            return "Invalid image size for XY click.", take_screenshot(page)

        # Without a fixed viewport, fall back to a 1:1 mapping.
        vp = page.viewport_size or {"width": img_w, "height": img_h}
        real_x = x * (vp["width"] / img_w)
        real_y = y * (vp["height"] / img_h)

        mouse = page.mouse
        actions = {
            "left": lambda: mouse.click(real_x, real_y),
            "double": lambda: mouse.dblclick(real_x, real_y),
            "right": lambda: mouse.click(real_x, real_y, button="right"),
            "hover": lambda: mouse.move(real_x, real_y),
        }
        action = actions.get(click_type)
        if action is None:
            return f"Unknown click type: {click_type}", take_screenshot(page)

        try:
            action()
        except Exception as e:
            return f"Click error at {real_x},{real_y}: {e}", take_screenshot(page)

        return f"{click_type} click at {real_x},{real_y}", take_screenshot(page)

    def handle_type(self, task: dict, page: Optional[Page]):
        """Fill the element matching ``task["selector"]`` with ``task["text"]``."""
        if page is None:
            return "No active page.", None

        selector = task.get("selector", "")
        text = task.get("text", "")
        try:
            page.fill(selector, text)
            return f"Typed into {selector}: {text}", take_screenshot(page)
        except Exception as e:
            return f"Type error: {e}", take_screenshot(page)

    def handle_new_tab(self, task: dict, page: Optional[Page]):
        """Open a new tab and make it active."""
        tab_name, new_page = self._create_new_tab()
        return f"Opened new tab: {tab_name}", take_screenshot(new_page)

    def handle_close_tab(self, task: dict, page: Optional[Page]):
        """Close the tab named ``task["tab"]``.

        If the closed tab was active, the first remaining tab (if any) becomes
        active. The returned screenshot shows whichever tab is active afterwards.
        """
        name = task.get("tab", "")

        with BROWSER_LOCK:
            closing = self.state.pages.pop(name, None)
            if closing is not None and self.state.active_page == name:
                self.state.active_page = next(iter(self.state.pages), None)
            active_name = self.state.active_page
            active_page = self.state.pages.get(active_name) if active_name else None

        if closing is None:
            return f"Tab {name} not found.", take_screenshot(active_page)

        # Close outside the lock: the page/context listeners acquire
        # BROWSER_LOCK too, so closing while holding it could deadlock.
        try:
            closing.close()
        except Exception:
            # Best-effort: the tab is already unregistered either way.
            pass

        return f"Closed {name}", take_screenshot(active_page)

    def handle_switch_tab(self, task: dict, page: Optional[Page]):
        """Make the tab named ``task["tab"]`` the active page, if it exists."""
        name = task.get("tab", "")

        with BROWSER_LOCK:
            target = self.state.pages.get(name)
            if target is not None:
                self.state.active_page = name
                active_page = target
                msg = f"Switched to {name}"
            else:
                # Read the state directly: _get_active_page() would try to take
                # the non-reentrant BROWSER_LOCK again and block forever.
                current = self.state.active_page
                active_page = self.state.pages.get(current) if current else None
                msg = f"Tab {name} not found."

        return msg, take_screenshot(active_page)

    def handle_inspect(self, task: dict, page: Optional[Page]):
        """Describe the first element matching ``task["selector"]``.

        The report contains the selector, a computed XPath, the inner text, the
        attributes and the first 1000 characters of the inner HTML.
        """
        if page is None:
            return "No active page.", None

        selector = task.get("selector", "")
        try:
            el = page.query_selector(selector)
            if not el:
                return f"No element found for selector: {selector}", take_screenshot(page)

            inner_text = el.inner_text()
            inner_html = el.inner_html()
            attrs = page.evaluate(
                """(el) => {
                    const out = {};
                    for (const a of el.attributes) out[a.name] = a.value;
                    return out;
                }""",
                el
            )
            xpath = page.evaluate(
                """(el) => {
                    function getXPath(node) {
                        if (node.id)
                            return 'id(\"' + node.id + '\")';
                        if (node === document.body)
                            return '/html/body';
                        let ix = 0;
                        const siblings = node.parentNode ? node.parentNode.childNodes : [];
                        for (let i=0; i<siblings.length; i++) {
                            const sibling = siblings[i];
                            if (sibling === node)
                                return getXPath(node.parentNode) + '/' + node.tagName.toLowerCase() + '[' + (ix+1) + ']';
                            if (sibling.nodeType === 1 && sibling.tagName === node.tagName)
                                ix++;
                        }
                    }
                    return getXPath(el);
                }""",
                el
            )

            info = (
                f"Selector: {selector}\n"
                f"XPath: {xpath}\n\n"
                f"Inner Text:\n{inner_text}\n\n"
                f"Attributes:\n{attrs}\n\n"
                f"Inner HTML (truncated):\n{inner_html[:1000]}"
            )
            return info, take_screenshot(page)
        except Exception as e:
            return f"Inspect error: {e}", take_screenshot(page)

    def handle_get_network_logs(self, task: dict, page: Optional[Page]):
        """Return the 100 most recent network log lines."""
        with BROWSER_LOCK:
            logs = list(self.state.network_logs)[-100:]
        text = "=== Network Logs (last 100) ===\n" + "\n".join(logs)
        return text, take_screenshot(page)

    def handle_get_console_logs(self, task: dict, page: Optional[Page]):
        """Return the 100 most recent console log lines."""
        with BROWSER_LOCK:
            logs = list(self.state.console_logs)[-100:]
        text = "=== Console Logs (last 100) ===\n" + "\n".join(logs)
        return text, take_screenshot(page)

    def handle_clear_logs(self, task: dict, page: Optional[Page]):
        """Empty both the network and the console log buffers."""
        with BROWSER_LOCK:
            self.state.network_logs.clear()
            self.state.console_logs.clear()
        return "Network & console logs cleared.", take_screenshot(page)

    def handle_start_record(self, task: dict, page: Optional[Page]):
        """Start macro recording, discarding any previously recorded macro."""
        with BROWSER_LOCK:
            self.state.recording = True
            self.state.macro = []
        return "Macro recording started.", take_screenshot(page)

    def handle_stop_record(self, task: dict, page: Optional[Page]):
        """Stop macro recording and report how many steps were recorded."""
        with BROWSER_LOCK:
            self.state.recording = False
            steps = len(self.state.macro)
        return f"Macro recording stopped. {steps} steps recorded.", take_screenshot(page)

    def handle_play_macro(self, task: dict, page: Optional[Page]):
        """Replay the recorded macro against whichever page is active per step.

        Steps whose command is not one of the replayable commands are skipped.
        The message reports the total step count and the last step's result.
        """
        with BROWSER_LOCK:
            macro_steps = list(self.state.macro)

        if not macro_steps:
            return "Macro is empty.", take_screenshot(self._get_active_page())

        replayable = {
            "goto": self.handle_goto,
            "click": self.handle_click,
            "type": self.handle_type,
            "new_tab": self.handle_new_tab,
            "close_tab": self.handle_close_tab,
            "switch_tab": self.handle_switch_tab,
        }

        last_result = ""
        for step in macro_steps:
            step_cmd = dict(step)
            step_cmd["from_macro"] = True

            handler = replayable.get(step_cmd.get("cmd"))
            if handler is None:
                continue

            # Tab commands can change the active page, so look it up per step.
            last_result, _ = handler(step_cmd, self._get_active_page())

        final_page = self._get_active_page()
        return (
            f"Macro executed. {len(macro_steps)} steps.\nLast step: {last_result}",
            take_screenshot(final_page),
        )

    def handle_take_screenshot(self, task: dict, page: Optional[Page]):
        """Return a fresh screenshot of the active page."""
        if page is None:
            return "No active page.", None
        return "Screenshot captured.", take_screenshot(page)

    @staticmethod
    def _to_gray(image):
        """Return ``image`` as a 2-D grayscale array.

        Arrays that are already 2-D are returned as-is; anything else is
        converted with OpenCV's RGB-to-gray conversion.
        """
        arr = np.array(image)
        if arr.ndim == 2:
            return arr
        return cv2.cvtColor(arr, cv2.COLOR_RGB2GRAY)

    def handle_find_template(self, task: dict, page: Optional[Page]):
        """Locate ``task["template"]`` in a screenshot of the active page.

        Uses normalised cross-correlation template matching. When the best
        score reaches the threshold, the match is outlined in yellow on the
        returned screenshot; otherwise the plain screenshot is returned.
        """
        template = task.get("template")
        if page is None:
            return "No active page.", None
        if template is None:
            return "Upload template first.", take_screenshot(page)

        img = take_screenshot(page)
        if img is None:
            return "Screenshot error.", None

        img_np = self._to_gray(img)
        tpl_np = self._to_gray(template)
        th, tw = tpl_np.shape[:2]

        # matchTemplate requires the template to fit inside the searched image.
        if th > img_np.shape[0] or tw > img_np.shape[1]:
            return "Template is larger than the screenshot.", img

        res = cv2.matchTemplate(img_np, tpl_np, cv2.TM_CCOEFF_NORMED)
        _, max_val, _, max_loc = cv2.minMaxLoc(res)

        if max_val < self._MATCH_THRESHOLD:
            return f"No strong match. Score={max_val:.2f}", img

        left, top = max_loc
        draw = ImageDraw.Draw(img)
        draw.rectangle(
            [left, top, left + tw, top + th],
            outline="yellow",
            width=3,
        )

        return f"Match: score={max_val:.2f}", img
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def start_worker_thread(storage_state: StorageState):
    """Reset the shared browser state and start a new worker thread.

    The state is reset before the thread is started. The worker registers its
    first tab in ``BROWSER_STATE`` as soon as the browser is up, so resetting
    after ``start()`` could wipe that registration.

    Args:
        storage_state: Storage state handed to the worker's browser context.
    """
    worker = PlaywrightWorker(BROWSER_STATE, storage_state)
    t = threading.Thread(target=worker.run, daemon=True)

    with BROWSER_LOCK:
        BROWSER_STATE.pages.clear()
        BROWSER_STATE.active_page = None
        BROWSER_STATE.network_logs.clear()
        BROWSER_STATE.console_logs.clear()
        BROWSER_STATE.macro.clear()
        BROWSER_STATE.recording = False
        BROWSER_STATE.tab_counter = 0

    t.start()

    # Only advertise the worker once the thread has actually been started.
    with BROWSER_LOCK:
        BROWSER_STATE.thread = t
        BROWSER_STATE.running = True
|
|
|
|
|
|
|
|
|
def stop_worker_thread():
    """Ask the worker to exit and wait up to five seconds for it to finish.

    Does nothing when no worker is marked as running. Otherwise marks the
    state as stopped, enqueues the ``"__EXIT__"`` command and joins the thread.
    """
    with BROWSER_LOCK:
        if not BROWSER_STATE.running:
            return
        BROWSER_STATE.running = False
        worker_thread = BROWSER_STATE.thread

    # The worker leaves its loop when it dequeues this command.
    task_queue.put({"cmd": "__EXIT__"})

    if worker_thread is None:
        return
    worker_thread.join(timeout=5)