Files

407 lines
15 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import shutil
import subprocess
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
try:
import httpx
from browser_use_sdk import BrowserUse
except ImportError as exc: # pragma: no cover
raise SystemExit(
"Missing dependency. Install `browser-use-sdk` first, e.g. `uv pip install browser-use-sdk`."
) from exc
# Defaults for the command-line options declared in build_parser().

# Forwarded as `proxy_country_code` when the Browser Use browser is created.
DEFAULT_PROXY_COUNTRY = "us"
# Forwarded as `timeout` when the Browser Use browser is created.
# NOTE(review): the unit is defined by the Browser Use API, not by this file - confirm.
DEFAULT_BROWSER_TIMEOUT = 240
# Milliseconds: used as the argument of the agent-browser `wait` command and
# as the polling budget for the `fill` steps.
DEFAULT_WAIT_MS = 7000
# URL pattern handed to `network route` before the consent button is clicked.
DEFAULT_LOCALHOST_PATTERN = "http://localhost:1455/*"
# Substring that identifies the OAuth callback URL.
DEFAULT_CALLBACK_FRAGMENT = "http://localhost:1455/auth/callback"
# Selectors passed to the agent-browser `fill` / `click` commands.
DEFAULT_LOGIN_EMAIL_SELECTOR = "input[name='username'], input[type='email']"
DEFAULT_PASSWORD_SELECTOR = "input[type='password']"
DEFAULT_CODE_SELECTOR = "input"
DEFAULT_SUBMIT_SELECTOR = "button[type='submit']"
# Substrings of the current page URL used to recognise the consent and
# e-mail verification steps of the flow.
DEFAULT_CONSENT_URL_FRAGMENT = "/sign-in-with-chatgpt/codex/consent"
DEFAULT_VERIFICATION_URL_FRAGMENT = "/email-verification"
@dataclass
class BrowserUseSession:
    """Connection details describing one Browser Use cloud browser."""

    # Browser identifier; it is what gets passed to `browsers.stop`.
    id: str
    # Live-view URL reported by Browser Use, when one is available.
    live_url: str | None
    # HTTP base URL of the browser's CDP endpoint.
    cdp_http_url: str
    # WebSocket debugger URL that agent-browser connects to.
    websocket_url: str
    # Textual form of the expiry reported by Browser Use, if any.
    timeout_at: str | None
@dataclass
class CommandResult:
    """Outcome of a single agent-browser invocation."""

    # Full argv that was executed, binary included.
    command: list[str]
    # Exit status of the process.
    returncode: int
    # Captured output streams, stripped of surrounding whitespace.
    stdout: str
    stderr: str
    # Decoded JSON from stdout when JSON output was requested; otherwise None.
    parsed: Any | None = None
class AgentBrowserRunner:
    """Run `agent-browser` CLI commands against one named session.

    Every invocation has the shape
    `<binary> --session <session> [--json] <args...>`.

    NOTE(review): the complete argument list, which can contain credentials
    typed through `fill`, is echoed in verbose mode and embedded in the error
    messages raised by `run`.
    """

    def __init__(self, binary: str, session: str, verbose: bool = False) -> None:
        """Store the CLI binary, the session name and the verbosity flag."""
        self.binary = binary
        self.session = session
        self.verbose = verbose

    def run(self, *args: str, expect_json: bool = True, retries: int = 0) -> CommandResult:
        """Execute one agent-browser command and return its result.

        Args:
            *args: Sub-command and its arguments, appended after the session
                (and `--json`) options.
            expect_json: When true, pass `--json` and decode stdout as JSON.
            retries: How many extra attempts to make when a failure looks
                transient (see `_is_transient_failure`).

        Returns:
            The `CommandResult` of the first successful attempt.

        Raises:
            RuntimeError: If the command exits non-zero, reports
                `"success": false` in its JSON payload, or exits zero with
                output that is not valid JSON while JSON was expected.
        """
        # The argv does not change between attempts, so build it once.
        command = [self.binary, "--session", self.session]
        if expect_json:
            command.append("--json")
        command.extend(args)

        attempt = 0
        while True:
            if self.verbose:
                print("+", " ".join(command), file=sys.stderr)
            completed = subprocess.run(
                command,
                check=False,
                capture_output=True,
                text=True,
            )
            stdout = completed.stdout.strip()
            stderr = completed.stderr.strip()

            parsed = None
            if expect_json and stdout:
                try:
                    parsed = json.loads(stdout)
                except json.JSONDecodeError as exc:
                    # Malformed output is only an error in its own right when
                    # the process claims success. A failed process that printed
                    # plain text falls through to the normal failure handling,
                    # so the exit code and stderr are reported and transient
                    # failures can still be retried.
                    if completed.returncode == 0:
                        raise RuntimeError(
                            f"Failed to parse JSON from agent-browser for command {args!r}: {stdout}"
                        ) from exc

            result = CommandResult(
                command=command,
                returncode=completed.returncode,
                stdout=stdout,
                stderr=stderr,
                parsed=parsed,
            )
            failed = completed.returncode != 0 or (
                expect_json and isinstance(parsed, dict) and not parsed.get("success", False)
            )
            if not failed:
                return result
            if attempt < retries and self._is_transient_failure(result):
                attempt += 1
                time.sleep(1.0)
                continue
            raise RuntimeError(self._format_error(result))

    @staticmethod
    def _format_error(result: CommandResult) -> str:
        """Render a failed result as a message with exit code, argv and output."""
        stdout = f"\nstdout: {result.stdout}" if result.stdout else ""
        stderr = f"\nstderr: {result.stderr}" if result.stderr else ""
        return (
            f"agent-browser command failed ({result.returncode}): {' '.join(result.command)}"
            f"{stdout}{stderr}"
        )

    @staticmethod
    def _is_transient_failure(result: CommandResult) -> bool:
        """Return True when the output contains a known connection-level marker."""
        haystack = f"{result.stdout}\n{result.stderr}".lower()
        transient_markers = (
            "cdp response channel closed",
            "target closed",
            "websocket",
            "socket closed",
            "connection closed",
            "econnreset",
            "broken pipe",
        )
        return any(marker in haystack for marker in transient_markers)
def build_parser() -> argparse.ArgumentParser:
    """Assemble the command-line interface of the script.

    Returns:
        A parser exposing the Browser Use credentials, the OAuth URL and login
        details, the page selectors, timing values and output/trace controls.
    """
    cli = argparse.ArgumentParser(
        description="Create a Browser Use cloud browser, attach Agent Browser via CDP, and drive an OpenAI OAuth flow.",
    )
    # (flag, add_argument keyword options), registered in --help order.
    option_table: list[tuple[str, dict[str, Any]]] = [
        ("--browser-use-api-key", {"default": os.getenv("BROWSER_USE_API_KEY")}),
        ("--oauth-url", {"required": True}),
        ("--email", {"required": True}),
        ("--password", {"required": True}),
        ("--code", {}),
        ("--approve-consent", {"action": "store_true"}),
        ("--session", {"default": "browseruse-oauth"}),
        ("--agent-browser-bin", {"default": shutil.which("agent-browser") or "agent-browser"}),
        ("--proxy-country-code", {"default": DEFAULT_PROXY_COUNTRY}),
        ("--browser-timeout", {"type": int, "default": DEFAULT_BROWSER_TIMEOUT}),
        ("--wait-ms", {"type": int, "default": DEFAULT_WAIT_MS}),
        ("--localhost-pattern", {"default": DEFAULT_LOCALHOST_PATTERN}),
        ("--callback-fragment", {"default": DEFAULT_CALLBACK_FRAGMENT}),
        ("--login-email-selector", {"default": DEFAULT_LOGIN_EMAIL_SELECTOR}),
        ("--password-selector", {"default": DEFAULT_PASSWORD_SELECTOR}),
        ("--code-selector", {"default": DEFAULT_CODE_SELECTOR}),
        ("--submit-selector", {"default": DEFAULT_SUBMIT_SELECTOR}),
        ("--trace-path", {"type": Path}),
        ("--output", {"type": Path}),
        ("--stop-browser", {"action": "store_true"}),
        ("--dry-run", {"action": "store_true"}),
        ("--verbose", {"action": "store_true"}),
    ]
    for flag, options in option_table:
        cli.add_argument(flag, **options)
    return cli
def extract_data(result: CommandResult) -> Any:
    """Return the `data` member of a dict payload, or the payload itself otherwise."""
    payload = result.parsed
    return payload.get("data") if isinstance(payload, dict) else payload
def create_browser_use_session(api_key: str, proxy_country_code: str, timeout: int) -> BrowserUseSession:
    """Create a Browser Use cloud browser and resolve its CDP WebSocket URL.

    Args:
        api_key: Browser Use API key.
        proxy_country_code: Forwarded to `browsers.create`.
        timeout: Forwarded to `browsers.create`.

    Returns:
        A `BrowserUseSession` describing the new browser.

    Raises:
        Exception: Whatever the SDK or the `/json/version` lookup raises. When
            the lookup fails, the freshly created browser is stopped first so
            it is not left running with no handle to it.
    """
    client = BrowserUse(api_key=api_key)
    browser = client.browsers.create(proxy_country_code=proxy_country_code, timeout=timeout)
    try:
        cdp_http_url = browser.cdp_url.rstrip("/")
        version = httpx.get(f"{cdp_http_url}/json/version", timeout=30)
        version.raise_for_status()
        websocket_url = version.json()["webSocketDebuggerUrl"]
    except Exception:
        # The caller never receives the browser id on this path, so nobody
        # else could stop the browser; clean it up before re-raising.
        try:
            client.browsers.stop(browser.id)
        except Exception as cleanup_error:
            # Report the cleanup problem but keep the original error primary.
            print(
                f"warning: could not stop Browser Use browser {browser.id}: {cleanup_error}",
                file=sys.stderr,
            )
        raise

    timeout_at = browser.timeout_at
    return BrowserUseSession(
        id=browser.id,
        live_url=browser.live_url,
        # Report the same normalised URL that was used for the lookup above.
        cdp_http_url=cdp_http_url,
        websocket_url=websocket_url,
        # Keep a missing expiry as None rather than the string "None".
        timeout_at=str(timeout_at) if timeout_at is not None else None,
    )
def run_step(
    runner: AgentBrowserRunner,
    command: tuple[str, ...],
    expect_json: bool = True,
    retries: int | None = None,
) -> CommandResult:
    """Run one command through `runner`, choosing a retry budget when none is given.

    When `retries` is None, commands whose first word is `connect`, `open`,
    `wait`, `get` or `snapshot` get two retries; every other command gets none.
    """
    if retries is None:
        verb = command[0] if command else None
        retried_verbs = {"connect", "open", "wait", "get", "snapshot"}
        retries = 2 if verb in retried_verbs else 0
    return runner.run(*command, expect_json=expect_json, retries=retries)
def get_url(runner: AgentBrowserRunner) -> str | None:
    """Ask agent-browser for the current page URL; None when it reports no data."""
    payload = extract_data(run_step(runner, ("get", "url")))
    if not payload:
        return None
    return payload.get("url")
def run_until_success(
    runner: AgentBrowserRunner,
    command: tuple[str, ...],
    timeout_ms: int,
    expect_json: bool = True,
) -> CommandResult:
    """Repeat `command` until it succeeds or `timeout_ms` milliseconds elapse.

    The command is always attempted at least once, even with a zero or
    negative timeout. Failed attempts are spaced at most one second apart.

    Args:
        runner: Runner used to execute the command.
        command: Sub-command and arguments, as accepted by `run_step`.
        timeout_ms: Total time budget in milliseconds.
        expect_json: Forwarded to `run_step`.

    Returns:
        The result of the first successful attempt.

    Raises:
        RuntimeError: When no attempt succeeded within the budget; the last
            failure is attached as the cause.
    """
    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    deadline = time.monotonic() + timeout_ms / 1000
    last_error: Exception | None = None
    while True:
        try:
            return run_step(runner, command, expect_json=expect_json)
        except RuntimeError as exc:
            last_error = exc
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(1.0, remaining))
    raise RuntimeError(
        f"Timed out waiting for command {command!r}. Last error: {last_error}"
    ) from last_error
def poll_url_contains(runner: AgentBrowserRunner, fragment: str, timeout_ms: int) -> str:
    """Poll the current URL until it contains `fragment`.

    The URL is always checked at least once, even with a zero or negative
    timeout. Checks are spaced at most one second apart.

    Args:
        runner: Runner used to query the browser.
        fragment: Substring the URL has to contain.
        timeout_ms: Total time budget in milliseconds.

    Returns:
        The first observed URL that contains `fragment`.

    Raises:
        RuntimeError: When the budget runs out first. Errors raised by
            `get_url` itself are not caught and end the polling immediately.
    """
    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    deadline = time.monotonic() + timeout_ms / 1000
    while True:
        last_url = get_url(runner)
        if last_url and fragment in last_url:
            return last_url
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise RuntimeError(
                f"Timed out waiting for URL containing {fragment!r}. Last URL: {last_url!r}"
            )
        # Never sleep past the deadline.
        time.sleep(min(1.0, remaining))
def append_result(results: list[dict[str, Any]], command: tuple[str, ...], result: CommandResult, expect_json: bool) -> None:
    """Append a log entry for one executed command to `results`.

    The entry holds the command and its return code, plus either the extracted
    JSON `data` (when `expect_json` is true) or the raw stdout.
    """
    if expect_json:
        payload_key, payload = "data", extract_data(result)
    else:
        payload_key, payload = "stdout", result.stdout
    results.append(
        {
            "command": list(command),
            "returncode": result.returncode,
            payload_key: payload,
        }
    )
def stop_browser_use_session(api_key: str, browser_id: str) -> None:
    """Stop the Browser Use browser identified by `browser_id`."""
    BrowserUse(api_key=api_key).browsers.stop(browser_id)
def _run_and_record(
    runner: AgentBrowserRunner,
    results: list[dict[str, Any]],
    command: tuple[str, ...],
    *,
    logged_command: tuple[str, ...] | None = None,
    poll_timeout_ms: int | None = None,
    expect_json: bool = True,
) -> CommandResult:
    """Run one agent-browser command and append its outcome to `results`.

    With `poll_timeout_ms` the command is repeated until it succeeds or the
    budget runs out. `logged_command` replaces the command in the log entry
    so that secrets can be masked.
    """
    if poll_timeout_ms is None:
        result = run_step(runner, command, expect_json=expect_json)
    else:
        result = run_until_success(runner, command, timeout_ms=poll_timeout_ms, expect_json=expect_json)
    append_result(results, command if logged_command is None else logged_command, result, expect_json)
    return result


def _resolve_callback_url(captured: str | None, final_url: str | None, fragment: str) -> str | None:
    """Prefer the URL captured during the flow; otherwise accept a matching final URL."""
    if captured:
        return captured
    if final_url and fragment in final_url:
        return final_url
    return None


def run_flow(args: argparse.Namespace) -> dict[str, Any]:
    """Drive the OAuth login flow and return a JSON-serialisable summary.

    In dry-run mode nothing is executed and only the effective settings are
    echoed back. Otherwise a Browser Use browser is created, agent-browser is
    attached to it, and the login form is filled in. When the matching pages
    are reached, the verification code is entered (if `--code` was given) and
    the consent form is submitted (if `--approve-consent` was given).

    The summary's `status` is one of `unknown`, `verification_reached`,
    `consent_reached` or `callback_captured`.

    Args:
        args: Parsed command-line options from `build_parser()`.

    Returns:
        A summary dict containing the session details, final page state and
        the per-command log.

    Raises:
        SystemExit: If no Browser Use API key is available for a live run.
        RuntimeError: If an agent-browser command fails.
    """
    if args.dry_run:
        return {
            "mode": "dry-run",
            "oauth_url": args.oauth_url,
            "session": args.session,
            "email": args.email,
            "approve_consent": args.approve_consent,
            "has_code": bool(args.code),
            "trace_path": str(args.trace_path) if args.trace_path else None,
        }
    if not args.browser_use_api_key:
        raise SystemExit("Pass --browser-use-api-key or set BROWSER_USE_API_KEY.")

    browser_session = create_browser_use_session(
        api_key=args.browser_use_api_key,
        proxy_country_code=args.proxy_country_code,
        timeout=args.browser_timeout,
    )
    runner = AgentBrowserRunner(args.agent_browser_bin, args.session, verbose=args.verbose)
    results: list[dict[str, Any]] = []
    trace_started = False
    wait_ms = str(args.wait_ms)

    def step(*command: str, **options: Any) -> CommandResult:
        return _run_and_record(runner, results, command, **options)

    try:
        step("connect", browser_session.websocket_url)
        step("open", args.oauth_url)
        step("wait", wait_ms)

        # Login form: e-mail first, then password on the following page.
        step("fill", args.login_email_selector, args.email, poll_timeout_ms=args.wait_ms)
        step("click", args.submit_selector)
        step("wait", "3000")
        step(
            "fill",
            args.password_selector,
            args.password,
            poll_timeout_ms=args.wait_ms,
            # Keep the password out of the recorded results.
            logged_command=("fill", args.password_selector, "********"),
        )
        step("click", args.submit_selector)
        step("wait", wait_ms)

        current_url = get_url(runner)
        status = "unknown"
        callback_url = None

        if current_url and DEFAULT_VERIFICATION_URL_FRAGMENT in current_url:
            status = "verification_reached"
            if args.code:
                step(
                    "fill",
                    args.code_selector,
                    args.code,
                    poll_timeout_ms=args.wait_ms,
                    # Keep the verification code out of the recorded results.
                    logged_command=("fill", args.code_selector, "******"),
                )
                step("click", args.submit_selector)
                step("wait", wait_ms)
                current_url = get_url(runner)

        if current_url and DEFAULT_CONSENT_URL_FRAGMENT in current_url:
            status = "consent_reached"
            if args.approve_consent:
                step("network", "requests", "--clear")
                # Answer requests matching the localhost pattern with a fixed body.
                step("network", "route", args.localhost_pattern, "--body", '{"ok":true}')
                if args.trace_path:
                    step("trace", "start")
                    trace_started = True
                step("click", args.submit_selector)
                step("wait", wait_ms)
                current_url = get_url(runner)
                if current_url and args.callback_fragment in current_url:
                    callback_url = current_url
                    status = "callback_captured"

        # Final page state.
        url_result = step("get", "url")
        title_result = step("get", "title")
        text_result = step("get", "text", "body")
        snapshot_result = step("snapshot", "-i", "-c", expect_json=False)

        trace_output = None
        if trace_started and args.trace_path:
            step("trace", "stop", str(args.trace_path))
            trace_output = str(args.trace_path)

        final_url = (extract_data(url_result) or {}).get("url")
        final_title = (extract_data(title_result) or {}).get("title")
        final_text = (extract_data(text_result) or {}).get("text")

        # A URL captured during the flow must survive even if the final URL no
        # longer matches. The inline `a or b if c else None` form discarded it,
        # because it parses as `(a or b) if c else None`.
        callback_url = _resolve_callback_url(callback_url, final_url, args.callback_fragment)
        if callback_url is not None:
            # Keep the status in step with a callback seen only in the final URL.
            status = "callback_captured"

        return {
            "mode": "live",
            "status": status,
            "browser_use": {
                "id": browser_session.id,
                "live_url": browser_session.live_url,
                "cdp_http_url": browser_session.cdp_http_url,
                "websocket_url": browser_session.websocket_url,
                "timeout_at": browser_session.timeout_at,
            },
            "session": args.session,
            "email": args.email,
            "oauth_url": args.oauth_url,
            "callback_url": callback_url,
            "final_url": final_url,
            "final_title": final_title,
            "final_text_excerpt": final_text[:500] if final_text else None,
            "final_snapshot": snapshot_result.stdout,
            "trace_path": trace_output,
            "results": results,
        }
    finally:
        # The browser is stopped only on request, and then also when the flow failed.
        if args.stop_browser:
            stop_browser_use_session(args.browser_use_api_key, browser_session.id)
def main() -> int:
    """Parse the command line, run the flow and emit the JSON summary.

    The summary is printed to stdout and, when `--output` is given, also
    written to that file as UTF-8.

    Returns:
        0 on success.

    Raises:
        SystemExit: If the default `agent-browser` binary cannot be found on PATH.
    """
    args = build_parser().parse_args()
    # Only the default binary name is checked here; an explicit
    # --agent-browser-bin value is passed through unchecked.
    if args.agent_browser_bin == "agent-browser" and not shutil.which(args.agent_browser_bin):
        raise SystemExit("agent-browser is not installed or not on PATH.")
    summary = run_flow(args)
    rendered = json.dumps(summary, indent=2, ensure_ascii=False)
    if args.output:
        args.output.parent.mkdir(parents=True, exist_ok=True)
        # ensure_ascii=False can emit non-ASCII text, so do not depend on the
        # platform's default encoding.
        args.output.write_text(rendered + "\n", encoding="utf-8")
    print(rendered)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())