407 lines
15 KiB
Python
407 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
try:
|
|
import httpx
|
|
from browser_use_sdk import BrowserUse
|
|
except ImportError as exc: # pragma: no cover
|
|
raise SystemExit(
|
|
"Missing dependency. Install `browser-use-sdk` first, e.g. `uv pip install browser-use-sdk`."
|
|
) from exc
|
|
|
|
|
|
# --- Browser Use session defaults (CLI: --proxy-country-code / --browser-timeout) ---
DEFAULT_PROXY_COUNTRY = "us"
DEFAULT_BROWSER_TIMEOUT = 240

# Default for --wait-ms: used both as the argument of the `wait` command and as
# the retry budget (in milliseconds) for fill steps.
DEFAULT_WAIT_MS = 7000

# --- Local OAuth callback handling ---
# Pattern handed to `network route` and the substring that identifies the
# callback URL once the browser has been redirected to it.
DEFAULT_LOCALHOST_PATTERN = "http://localhost:1455/*"
DEFAULT_CALLBACK_FRAGMENT = "http://localhost:1455/auth/callback"

# --- CSS selectors for the login form ---
DEFAULT_LOGIN_EMAIL_SELECTOR = "input[name='username'], input[type='email']"
DEFAULT_PASSWORD_SELECTOR = "input[type='password']"
DEFAULT_CODE_SELECTOR = "input"
DEFAULT_SUBMIT_SELECTOR = "button[type='submit']"

# --- URL substrings used to detect which stage of the flow was reached ---
DEFAULT_CONSENT_URL_FRAGMENT = "/sign-in-with-chatgpt/codex/consent"
DEFAULT_VERIFICATION_URL_FRAGMENT = "/email-verification"
|
|
|
|
|
|
@dataclass
class BrowserUseSession:
    """Connection details for one Browser Use cloud browser."""

    # Identifier of the cloud browser; later passed to `browsers.stop`.
    id: str
    # Live-view URL reported by the SDK, if any.
    live_url: str | None
    # CDP HTTP endpoint as reported by the SDK.
    cdp_http_url: str
    # `webSocketDebuggerUrl` read from the CDP `/json/version` endpoint.
    websocket_url: str
    # Stringified expiry reported by the SDK, if any.
    timeout_at: str | None
|
|
|
|
|
|
@dataclass
class CommandResult:
    """Outcome of a single `agent-browser` invocation."""

    # Full argv that was executed.
    command: list[str]
    # Process exit status.
    returncode: int
    # Captured output streams, stripped of surrounding whitespace.
    stdout: str
    stderr: str
    # Decoded JSON from stdout when JSON output was requested and present.
    parsed: Any | None = None
|
|
|
|
|
|
class AgentBrowserRunner:
    """Run `agent-browser` CLI commands against one named session.

    Each call spawns the binary as a subprocess, captures its output and turns
    failures into ``RuntimeError``.

    NOTE: the full argv is echoed to stderr in verbose mode and embedded in
    error messages, so any secret passed as a command argument will appear
    there.
    """

    # Lower-cased substrings of stdout/stderr that mark a failure as retryable.
    _TRANSIENT_MARKERS = (
        "cdp response channel closed",
        "target closed",
        "websocket",
        "socket closed",
        "connection closed",
        "econnreset",
        "broken pipe",
    )

    def __init__(self, binary: str, session: str, verbose: bool = False) -> None:
        """Remember the binary to invoke, the session name and the verbosity."""
        self.binary = binary
        self.session = session
        self.verbose = verbose

    def run(self, *args: str, expect_json: bool = True, retries: int = 0) -> CommandResult:
        """Execute one command and return its result.

        Args:
            *args: Sub-command and arguments appended after the session flags.
            expect_json: Pass ``--json`` and decode stdout as JSON.
            retries: How many extra attempts are allowed when a failure looks
                transient (see ``_is_transient_failure``).

        Returns:
            The ``CommandResult`` of the first successful attempt.

        Raises:
            RuntimeError: If the command exits non-zero, reports
                ``success: false`` in its JSON payload, or exits zero with
                stdout that is not valid JSON while ``expect_json`` is set.
        """
        # The argv does not change between attempts, so build it once.
        command = [self.binary, "--session", self.session]
        if expect_json:
            command.append("--json")
        command.extend(args)

        attempt = 0
        while True:
            if self.verbose:
                print("+", " ".join(command), file=sys.stderr)

            completed = subprocess.run(
                command,
                check=False,
                capture_output=True,
                text=True,
            )
            stdout = completed.stdout.strip()
            stderr = completed.stderr.strip()
            parsed = None

            if expect_json and stdout:
                try:
                    parsed = json.loads(stdout)
                except json.JSONDecodeError as exc:
                    # Only a *successful* exit with undecodable output is a
                    # protocol error. A failing command may print plain-text
                    # diagnostics; let it fall through so the retry logic and
                    # the regular error message (exit code, stderr) apply.
                    if completed.returncode == 0:
                        raise RuntimeError(
                            f"Failed to parse JSON from agent-browser for command {args!r}: {stdout}"
                        ) from exc

            result = CommandResult(
                command=command,
                returncode=completed.returncode,
                stdout=stdout,
                stderr=stderr,
                parsed=parsed,
            )

            failed = completed.returncode != 0 or (
                expect_json and isinstance(parsed, dict) and not parsed.get("success", False)
            )
            if not failed:
                return result

            if attempt < retries and self._is_transient_failure(result):
                attempt += 1
                time.sleep(1.0)
                continue

            raise RuntimeError(self._format_error(result))

    @staticmethod
    def _format_error(result: CommandResult) -> str:
        """Build the error message for a failed command, including any output."""
        stdout = f"\nstdout: {result.stdout}" if result.stdout else ""
        stderr = f"\nstderr: {result.stderr}" if result.stderr else ""
        return (
            f"agent-browser command failed ({result.returncode}): {' '.join(result.command)}"
            f"{stdout}{stderr}"
        )

    @staticmethod
    def _is_transient_failure(result: CommandResult) -> bool:
        """Return True when stdout/stderr contain a known transient marker."""
        haystack = f"{result.stdout}\n{result.stderr}".lower()
        return any(marker in haystack for marker in AgentBrowserRunner._TRANSIENT_MARKERS)
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Assemble the command-line interface for the OAuth automation script.

    Options are registered in a fixed order so the generated help text stays
    stable; related options of the same shape are declared together.
    """
    parser = argparse.ArgumentParser(
        description="Create a Browser Use cloud browser, attach Agent Browser via CDP, and drive an OpenAI OAuth flow.",
    )
    add = parser.add_argument

    # Credentials and flow inputs.
    add("--browser-use-api-key", default=os.getenv("BROWSER_USE_API_KEY"))
    for mandatory in ("--oauth-url", "--email", "--password"):
        add(mandatory, required=True)
    add("--code")
    add("--approve-consent", action="store_true")

    # Session / tooling configuration.
    add("--session", default="browseruse-oauth")
    add("--agent-browser-bin", default=shutil.which("agent-browser") or "agent-browser")
    add("--proxy-country-code", default=DEFAULT_PROXY_COUNTRY)
    for int_flag, int_default in (
        ("--browser-timeout", DEFAULT_BROWSER_TIMEOUT),
        ("--wait-ms", DEFAULT_WAIT_MS),
    ):
        add(int_flag, type=int, default=int_default)

    # URL patterns and CSS selectors.
    for text_flag, text_default in (
        ("--localhost-pattern", DEFAULT_LOCALHOST_PATTERN),
        ("--callback-fragment", DEFAULT_CALLBACK_FRAGMENT),
        ("--login-email-selector", DEFAULT_LOGIN_EMAIL_SELECTOR),
        ("--password-selector", DEFAULT_PASSWORD_SELECTOR),
        ("--code-selector", DEFAULT_CODE_SELECTOR),
        ("--submit-selector", DEFAULT_SUBMIT_SELECTOR),
    ):
        add(text_flag, default=text_default)

    # Output locations and behaviour switches.
    for path_flag in ("--trace-path", "--output"):
        add(path_flag, type=Path)
    for switch in ("--stop-browser", "--dry-run", "--verbose"):
        add(switch, action="store_true")

    return parser
|
|
|
|
|
|
def extract_data(result: CommandResult) -> Any:
    """Return the ``data`` member of a dict payload, or the payload itself.

    When the decoded output is a dict, its ``"data"`` entry is returned
    (``None`` if absent); any other payload, including ``None``, is returned
    unchanged.
    """
    payload = result.parsed
    return payload.get("data") if isinstance(payload, dict) else payload
|
|
|
|
|
|
def create_browser_use_session(api_key: str, proxy_country_code: str, timeout: int) -> BrowserUseSession:
    """Create a Browser Use cloud browser and resolve its CDP WebSocket URL.

    Args:
        api_key: Browser Use API key used to build the SDK client.
        proxy_country_code: Forwarded to ``browsers.create``.
        timeout: Forwarded to ``browsers.create``.

    Returns:
        A ``BrowserUseSession`` describing the new browser, including the
        ``webSocketDebuggerUrl`` read from the CDP ``/json/version`` endpoint.

    Raises:
        httpx.HTTPStatusError: If the ``/json/version`` request returns an
            error status.
        KeyError: If the version document lacks ``webSocketDebuggerUrl``.
    """
    client = BrowserUse(api_key=api_key)
    browser = client.browsers.create(proxy_country_code=proxy_country_code, timeout=timeout)

    # Strip a trailing slash so the endpoint path is not joined with "//".
    cdp_http_url = browser.cdp_url.rstrip("/")
    version = httpx.get(f"{cdp_http_url}/json/version", timeout=30)
    version.raise_for_status()
    websocket_url = version.json()["webSocketDebuggerUrl"]

    # Keep a missing expiry as None; str(None) would yield the literal "None"
    # and contradict the `str | None` field type.
    timeout_at = browser.timeout_at
    return BrowserUseSession(
        id=browser.id,
        live_url=browser.live_url,
        cdp_http_url=browser.cdp_url,
        websocket_url=websocket_url,
        timeout_at=None if timeout_at is None else str(timeout_at),
    )
|
|
|
|
|
|
def run_step(
    runner: AgentBrowserRunner,
    command: tuple[str, ...],
    expect_json: bool = True,
    retries: int | None = None,
) -> CommandResult:
    """Run one command through ``runner`` with a default retry policy.

    When ``retries`` is not given, commands whose first word is one of
    connect/open/wait/get/snapshot get two retries and everything else gets
    none. An explicit ``retries`` value is passed through unchanged.
    """
    if retries is None:
        retryable = bool(command) and command[0] in {"connect", "open", "wait", "get", "snapshot"}
        retries = 2 if retryable else 0
    return runner.run(*command, expect_json=expect_json, retries=retries)
|
|
|
|
|
|
def get_url(runner: AgentBrowserRunner) -> str | None:
    """Ask the browser for its current URL via ``get url``.

    Returns the ``"url"`` entry of the command's data payload, or ``None``
    when the payload is empty or has no such entry.
    """
    payload = extract_data(run_step(runner, ("get", "url")))
    if not payload:
        return None
    return payload.get("url")
|
|
|
|
|
|
def run_until_success(
    runner: AgentBrowserRunner,
    command: tuple[str, ...],
    timeout_ms: int,
    expect_json: bool = True,
) -> CommandResult:
    """Retry ``command`` once per second until it succeeds or time runs out.

    The command is always attempted at least once, even when ``timeout_ms``
    is zero or negative.

    Args:
        runner: Runner used to execute the command.
        command: Sub-command and arguments.
        timeout_ms: Retry budget in milliseconds.
        expect_json: Forwarded to ``run_step``.

    Returns:
        The result of the first successful attempt.

    Raises:
        RuntimeError: If every attempt within the budget failed; the last
            failure is attached as the cause.
    """
    # Monotonic clock: the budget must not shrink or grow when the wall clock
    # is adjusted while we are waiting.
    deadline = time.monotonic() + timeout_ms / 1000
    while True:
        try:
            return run_step(runner, command, expect_json=expect_json)
        except RuntimeError as exc:
            last_error = exc
        # Check the deadline before sleeping so no time is wasted after the
        # final failed attempt.
        if time.monotonic() >= deadline:
            break
        time.sleep(1.0)
    raise RuntimeError(
        f"Timed out waiting for command {command!r}. Last error: {last_error}"
    ) from last_error
|
|
|
|
|
|
def poll_url_contains(runner: AgentBrowserRunner, fragment: str, timeout_ms: int) -> str:
    """Poll the current URL once per second until it contains ``fragment``.

    The URL is always checked at least once, even when ``timeout_ms`` is zero
    or negative.

    Args:
        runner: Runner used to query the browser.
        fragment: Substring the URL must contain.
        timeout_ms: Polling budget in milliseconds.

    Returns:
        The first observed URL that contains ``fragment``.

    Raises:
        RuntimeError: If no matching URL was seen within the budget.
    """
    # Monotonic clock so wall-clock adjustments cannot distort the budget.
    deadline = time.monotonic() + timeout_ms / 1000
    while True:
        last_url = get_url(runner)
        if last_url and fragment in last_url:
            return last_url
        if time.monotonic() >= deadline:
            raise RuntimeError(f"Timed out waiting for URL containing {fragment!r}. Last URL: {last_url!r}")
        time.sleep(1.0)
|
|
|
|
|
|
def append_result(results: list[dict[str, Any]], command: tuple[str, ...], result: CommandResult, expect_json: bool) -> None:
    """Append a log entry for one executed command to ``results``.

    The entry records the command and its return code, plus either the
    extracted JSON data (``expect_json`` true) or the raw stdout.
    """
    if expect_json:
        payload_key, payload = "data", extract_data(result)
    else:
        payload_key, payload = "stdout", result.stdout
    results.append(
        {
            "command": list(command),
            "returncode": result.returncode,
            payload_key: payload,
        }
    )
|
|
|
|
|
|
def stop_browser_use_session(api_key: str, browser_id: str) -> None:
    """Stop the cloud browser ``browser_id`` using a fresh SDK client."""
    BrowserUse(api_key=api_key).browsers.stop(browser_id)
|
|
|
|
|
|
def run_flow(args: argparse.Namespace) -> dict[str, Any]:
    """Drive the OAuth login flow and return a JSON-serialisable summary.

    In dry-run mode nothing is executed and a short description of the
    requested run is returned. Otherwise a cloud browser is created, the
    login form is filled in, and, depending on the page reached and the
    options given, the verification code is submitted and consent approved.

    The summary's ``status`` is one of ``unknown``, ``verification_reached``,
    ``consent_reached`` or ``callback_captured``. The password and the
    verification code are masked in the recorded ``results``.

    Args:
        args: Parsed command-line options (see ``build_parser``).

    Returns:
        The summary dictionary.

    Raises:
        SystemExit: If no Browser Use API key is available for a live run.
        RuntimeError: If an ``agent-browser`` step fails.
    """
    if args.dry_run:
        return {
            "mode": "dry-run",
            "oauth_url": args.oauth_url,
            "session": args.session,
            "email": args.email,
            "approve_consent": args.approve_consent,
            "has_code": bool(args.code),
            "trace_path": str(args.trace_path) if args.trace_path else None,
        }

    if not args.browser_use_api_key:
        raise SystemExit("Pass --browser-use-api-key or set BROWSER_USE_API_KEY.")

    browser_session = create_browser_use_session(
        api_key=args.browser_use_api_key,
        proxy_country_code=args.proxy_country_code,
        timeout=args.browser_timeout,
    )

    runner = AgentBrowserRunner(args.agent_browser_bin, args.session, verbose=args.verbose)
    results: list[dict[str, Any]] = []
    trace_started = False

    def step(
        command: tuple[str, ...],
        *,
        logged: tuple[str, ...] | None = None,
        until_success: bool = False,
        expect_json: bool = True,
    ) -> CommandResult:
        """Run one command and record it; ``logged`` replaces secrets in the log."""
        if until_success:
            outcome = run_until_success(runner, command, timeout_ms=args.wait_ms, expect_json=expect_json)
        else:
            outcome = run_step(runner, command, expect_json=expect_json)
        append_result(results, command if logged is None else logged, outcome, expect_json)
        return outcome

    submit = ("click", args.submit_selector)
    settle = ("wait", str(args.wait_ms))

    try:
        # Attach to the cloud browser and load the login page.
        step(("connect", browser_session.websocket_url))
        step(("open", args.oauth_url))
        step(settle)

        # E-mail, then password; the form may render late, so fills are retried.
        step(("fill", args.login_email_selector, args.email), until_success=True)
        step(submit)
        step(("wait", "3000"))
        step(
            ("fill", args.password_selector, args.password),
            logged=("fill", args.password_selector, "********"),
            until_success=True,
        )
        step(submit)
        step(settle)

        current_url = get_url(runner)
        status = "unknown"
        callback_url = None

        if current_url and DEFAULT_VERIFICATION_URL_FRAGMENT in current_url:
            status = "verification_reached"
            if args.code:
                step(
                    ("fill", args.code_selector, args.code),
                    logged=("fill", args.code_selector, "******"),
                    until_success=True,
                )
                step(submit)
                step(settle)
                current_url = get_url(runner)

        if current_url and DEFAULT_CONSENT_URL_FRAGMENT in current_url:
            status = "consent_reached"
            if args.approve_consent:
                step(("network", "requests", "--clear"))
                # Answer the localhost redirect with a stub body.
                step(("network", "route", args.localhost_pattern, "--body", '{"ok":true}'))
                if args.trace_path:
                    step(("trace", "start"))
                    trace_started = True
                step(submit)
                step(settle)
                current_url = get_url(runner)
                if current_url and args.callback_fragment in current_url:
                    callback_url = current_url
                    status = "callback_captured"

        # Capture the final page state.
        url_result = step(("get", "url"))
        title_result = step(("get", "title"))
        text_result = step(("get", "text", "body"))
        snapshot_result = step(("snapshot", "-i", "-c"), expect_json=False)

        trace_output = None
        if trace_started and args.trace_path:
            step(("trace", "stop", str(args.trace_path)))
            trace_output = str(args.trace_path)

        final_url = (extract_data(url_result) or {}).get("url")
        final_title = (extract_data(title_result) or {}).get("title")
        final_text = (extract_data(text_result) or {}).get("text")

        # Keep a callback URL captured earlier even if the page has moved on;
        # fall back to the final URL only when nothing was captured. The
        # previous one-line conditional bound as
        # `(callback_url or final_url) if <final matches> else None`, which
        # discarded an already captured callback.
        if not callback_url and final_url and args.callback_fragment in final_url:
            callback_url = final_url

        return {
            "mode": "live",
            "status": status,
            "browser_use": {
                "id": browser_session.id,
                "live_url": browser_session.live_url,
                "cdp_http_url": browser_session.cdp_http_url,
                "websocket_url": browser_session.websocket_url,
                "timeout_at": browser_session.timeout_at,
            },
            "session": args.session,
            "email": args.email,
            "oauth_url": args.oauth_url,
            "callback_url": callback_url or None,
            "final_url": final_url,
            "final_title": final_title,
            "final_text_excerpt": final_text[:500] if final_text else None,
            "final_snapshot": snapshot_result.stdout,
            "trace_path": trace_output,
            "results": results,
        }
    finally:
        # The cloud browser is only torn down when explicitly requested.
        if args.stop_browser:
            stop_browser_use_session(args.browser_use_api_key, browser_session.id)
|
|
|
|
|
|
def main() -> int:
    """Parse arguments, run the flow, and emit the summary as JSON.

    The summary is printed to stdout and, when ``--output`` is given, also
    written to that file (parent directories are created as needed).

    Returns:
        Process exit status ``0`` on success.

    Raises:
        SystemExit: If the default ``agent-browser`` binary cannot be found
            on PATH.
    """
    args = build_parser().parse_args()
    if args.agent_browser_bin == "agent-browser" and not shutil.which(args.agent_browser_bin):
        raise SystemExit("agent-browser is not installed or not on PATH.")

    summary = run_flow(args)
    rendered = json.dumps(summary, indent=2, ensure_ascii=False)

    if args.output:
        args.output.parent.mkdir(parents=True, exist_ok=True)
        # ensure_ascii=False can emit non-ASCII characters; write UTF-8
        # explicitly instead of relying on the locale's default encoding.
        args.output.write_text(rendered + "\n", encoding="utf-8")

    print(rendered)
    return 0
|
|
|
|
|
|
# Script entry point: exit with the status returned by main().
if __name__ == "__main__":
    sys.exit(main())
|