#!/usr/bin/env python3
"""Drive an OpenAI OAuth login inside a Browser Use cloud browser.

The script creates a Browser Use cloud browser, attaches the ``agent-browser``
CLI to it over CDP, fills in the login form (and optionally the e-mail
verification code and the consent page), and prints a JSON summary of what
happened.
"""
from __future__ import annotations

import argparse
import json
import os
import shutil
import subprocess
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any

# The SDK is only needed for a live run. Remember the import failure instead of
# exiting here so that `--help` and `--dry-run` work without it; the same
# SystemExit message is raised by `_require_sdk()` as soon as the SDK is used.
try:
    import httpx
    from browser_use_sdk import BrowserUse
except ImportError as exc:  # pragma: no cover
    httpx = None  # type: ignore[assignment]
    BrowserUse = None  # type: ignore[assignment,misc]
    _SDK_IMPORT_ERROR: ImportError | None = exc
else:
    _SDK_IMPORT_ERROR = None

_MISSING_SDK_MESSAGE = (
    "Missing dependency. Install `browser-use-sdk` first, e.g. `uv pip install browser-use-sdk`."
)

DEFAULT_PROXY_COUNTRY = "us"
DEFAULT_BROWSER_TIMEOUT = 240
DEFAULT_WAIT_MS = 7000
DEFAULT_LOCALHOST_PATTERN = "http://localhost:1455/*"
DEFAULT_CALLBACK_FRAGMENT = "http://localhost:1455/auth/callback"
DEFAULT_LOGIN_EMAIL_SELECTOR = "input[name='username'], input[type='email']"
DEFAULT_PASSWORD_SELECTOR = "input[type='password']"
DEFAULT_CODE_SELECTOR = "input"
DEFAULT_SUBMIT_SELECTOR = "button[type='submit']"
DEFAULT_CONSENT_URL_FRAGMENT = "/sign-in-with-chatgpt/codex/consent"
DEFAULT_VERIFICATION_URL_FRAGMENT = "/email-verification"


@dataclass
class BrowserUseSession:
    """Connection details of a Browser Use cloud browser."""

    id: str
    live_url: str | None
    cdp_http_url: str
    # Taken from the `webSocketDebuggerUrl` field of `<cdp_http_url>/json/version`.
    websocket_url: str
    timeout_at: str | None


@dataclass
class CommandResult:
    """Outcome of one ``agent-browser`` invocation."""

    command: list[str]
    returncode: int
    stdout: str
    stderr: str
    # Decoded stdout when JSON output was requested and could be parsed.
    parsed: Any | None = None


class AgentBrowserRunner:
    """Run ``agent-browser`` sub-commands against one named session."""

    def __init__(self, binary: str, session: str, verbose: bool = False) -> None:
        self.binary = binary
        self.session = session
        self.verbose = verbose

    def run(self, *args: str, expect_json: bool = True, retries: int = 0) -> CommandResult:
        """Execute one sub-command, retrying transient failures.

        Args:
            *args: Sub-command and its arguments, appended after the session flags.
            expect_json: Pass ``--json`` and decode stdout as JSON.
            retries: Extra attempts allowed when the failure looks transient.

        Returns:
            The result of the first successful attempt.

        Raises:
            RuntimeError: If the command fails (non-zero exit, or a JSON object
                whose ``success`` is missing or falsy) and no retry is left, or
                if a zero-exit command prints output that is not valid JSON.
        """
        command = [self.binary, "--session", self.session]
        if expect_json:
            command.append("--json")
        command.extend(args)

        attempt = 0
        while True:
            if self.verbose:
                # NOTE(review): the echoed command includes every argument
                # verbatim, so a `fill` of the password field prints the password.
                print("+", " ".join(command), file=sys.stderr)
            completed = subprocess.run(
                command,
                check=False,
                capture_output=True,
                text=True,
            )
            stdout = completed.stdout.strip()
            stderr = completed.stderr.strip()

            parsed = None
            if expect_json and stdout:
                try:
                    parsed = json.loads(stdout)
                except json.JSONDecodeError as exc:
                    # A failing process may print plain text instead of JSON. Only
                    # treat the unparsable output as the error when the process
                    # claimed success; otherwise fall through so the failure is
                    # reported with stderr and can still be retried.
                    if completed.returncode == 0:
                        raise RuntimeError(
                            f"Failed to parse JSON from agent-browser for command {args!r}: {stdout}"
                        ) from exc

            result = CommandResult(
                command=command,
                returncode=completed.returncode,
                stdout=stdout,
                stderr=stderr,
                parsed=parsed,
            )
            failed = completed.returncode != 0 or (
                expect_json and isinstance(parsed, dict) and not parsed.get("success", False)
            )
            if not failed:
                return result
            if attempt < retries and self._is_transient_failure(result):
                attempt += 1
                time.sleep(1.0)
                continue
            raise RuntimeError(self._format_error(result))

    @staticmethod
    def _format_error(result: CommandResult) -> str:
        """Build the error text for a failed command.

        NOTE(review): the full command line is included, so secrets passed as
        arguments (password, verification code) end up in the message.
        """
        stdout = f"\nstdout: {result.stdout}" if result.stdout else ""
        stderr = f"\nstderr: {result.stderr}" if result.stderr else ""
        return (
            f"agent-browser command failed ({result.returncode}): {' '.join(result.command)}"
            f"{stdout}{stderr}"
        )

    @staticmethod
    def _is_transient_failure(result: CommandResult) -> bool:
        """Return True when stdout/stderr contain a known connection-loss marker."""
        haystack = f"{result.stdout}\n{result.stderr}".lower()
        transient_markers = (
            "cdp response channel closed",
            "target closed",
            "websocket",
            "socket closed",
            "connection closed",
            "econnreset",
            "broken pipe",
        )
        return any(marker in haystack for marker in transient_markers)


def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the script."""
    parser = argparse.ArgumentParser(
        description="Create a Browser Use cloud browser, attach Agent Browser via CDP, and drive an OpenAI OAuth flow.",
    )
    parser.add_argument("--browser-use-api-key", default=os.getenv("BROWSER_USE_API_KEY"))
    parser.add_argument("--oauth-url", required=True)
    parser.add_argument("--email", required=True)
    parser.add_argument("--password", required=True)
    parser.add_argument("--code")
    parser.add_argument("--approve-consent", action="store_true")
    parser.add_argument("--session", default="browseruse-oauth")
    parser.add_argument("--agent-browser-bin", default=shutil.which("agent-browser") or "agent-browser")
    parser.add_argument("--proxy-country-code", default=DEFAULT_PROXY_COUNTRY)
    parser.add_argument("--browser-timeout", type=int, default=DEFAULT_BROWSER_TIMEOUT)
    parser.add_argument("--wait-ms", type=int, default=DEFAULT_WAIT_MS)
    parser.add_argument("--localhost-pattern", default=DEFAULT_LOCALHOST_PATTERN)
    parser.add_argument("--callback-fragment", default=DEFAULT_CALLBACK_FRAGMENT)
    parser.add_argument("--login-email-selector", default=DEFAULT_LOGIN_EMAIL_SELECTOR)
    parser.add_argument("--password-selector", default=DEFAULT_PASSWORD_SELECTOR)
    parser.add_argument("--code-selector", default=DEFAULT_CODE_SELECTOR)
    parser.add_argument("--submit-selector", default=DEFAULT_SUBMIT_SELECTOR)
    parser.add_argument("--trace-path", type=Path)
    parser.add_argument("--output", type=Path)
    parser.add_argument("--stop-browser", action="store_true")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--verbose", action="store_true")
    return parser


def _require_sdk() -> None:
    """Exit with the install hint if the Browser Use SDK could not be imported."""
    if _SDK_IMPORT_ERROR is not None:
        raise SystemExit(_MISSING_SDK_MESSAGE) from _SDK_IMPORT_ERROR


def extract_data(result: CommandResult) -> Any:
    """Return the ``data`` member of a JSON-object result, else the parsed value."""
    if isinstance(result.parsed, dict):
        return result.parsed.get("data")
    return result.parsed


def _data_field(result: CommandResult, key: str) -> Any:
    """Return ``key`` from the result's data payload, or None if it is not a dict."""
    data = extract_data(result)
    return data.get(key) if isinstance(data, dict) else None


def resolve_callback_url(captured: str | None, final_url: str | None, fragment: str) -> str | None:
    """Pick the callback URL to report.

    A URL captured during the flow always wins; otherwise the final URL is used
    when it contains ``fragment``. Returns None when neither applies.
    """
    if captured:
        return captured
    if final_url and fragment in final_url:
        return final_url
    return None


def create_browser_use_session(api_key: str, proxy_country_code: str, timeout: int) -> BrowserUseSession:
    """Create a cloud browser and look up its CDP WebSocket URL.

    Raises:
        SystemExit: If the Browser Use SDK is not installed.
        httpx.HTTPStatusError: If the ``/json/version`` request returns an error status.
    """
    _require_sdk()
    client = BrowserUse(api_key=api_key)
    browser = client.browsers.create(proxy_country_code=proxy_country_code, timeout=timeout)
    cdp_http_url = browser.cdp_url.rstrip("/")
    version = httpx.get(f"{cdp_http_url}/json/version", timeout=30)
    version.raise_for_status()
    websocket_url = version.json()["webSocketDebuggerUrl"]
    timeout_at = browser.timeout_at
    return BrowserUseSession(
        id=browser.id,
        live_url=browser.live_url,
        cdp_http_url=browser.cdp_url,
        websocket_url=websocket_url,
        # Keep a missing value as None rather than the string "None".
        timeout_at=str(timeout_at) if timeout_at is not None else None,
    )


def run_step(
    runner: AgentBrowserRunner,
    command: tuple[str, ...],
    expect_json: bool = True,
    retries: int | None = None,
) -> CommandResult:
    """Run one command through ``runner``.

    When ``retries`` is None, the connect/open/wait/get/snapshot commands get
    two retries and every other command gets none.
    """
    if retries is None:
        retried_commands = {"connect", "open", "wait", "get", "snapshot"}
        retries = 2 if command and command[0] in retried_commands else 0
    return runner.run(*command, expect_json=expect_json, retries=retries)


def get_url(runner: AgentBrowserRunner) -> str | None:
    """Return the ``url`` reported by ``get url``, or None if it is absent."""
    return _data_field(run_step(runner, ("get", "url")), "url")


def run_until_success(
    runner: AgentBrowserRunner,
    command: tuple[str, ...],
    timeout_ms: int,
    expect_json: bool = True,
) -> CommandResult:
    """Repeat ``command`` once per second until it succeeds or time runs out.

    The command is always attempted at least once, even for a zero or negative
    ``timeout_ms``.

    Raises:
        RuntimeError: If no attempt succeeded before the deadline.
    """
    deadline = time.monotonic() + timeout_ms / 1000
    last_error: Exception | None = None
    while True:
        try:
            return run_step(runner, command, expect_json=expect_json)
        except RuntimeError as exc:
            last_error = exc
        if time.monotonic() >= deadline:
            break
        time.sleep(1.0)
    raise RuntimeError(
        f"Timed out waiting for command {command!r}. Last error: {last_error}"
    ) from last_error


def poll_url_contains(runner: AgentBrowserRunner, fragment: str, timeout_ms: int) -> str:
    """Poll the current URL once per second until it contains ``fragment``.

    The URL is always checked at least once.

    Raises:
        RuntimeError: If the URL never contained ``fragment`` before the deadline.
    """
    deadline = time.monotonic() + timeout_ms / 1000
    while True:
        last_url = get_url(runner)
        if last_url and fragment in last_url:
            return last_url
        if time.monotonic() >= deadline:
            break
        time.sleep(1.0)
    raise RuntimeError(f"Timed out waiting for URL containing {fragment!r}. Last URL: {last_url!r}")


def append_result(
    results: list[dict[str, Any]],
    command: tuple[str, ...],
    result: CommandResult,
    expect_json: bool,
) -> None:
    """Append a log entry for ``command`` to ``results``.

    The entry stores the extracted data for JSON commands and the raw stdout
    otherwise. ``command`` is stored as given, so callers pass a masked copy
    when it contains a secret.
    """
    entry: dict[str, Any] = {
        "command": list(command),
        "returncode": result.returncode,
    }
    if expect_json:
        entry["data"] = extract_data(result)
    else:
        entry["stdout"] = result.stdout
    results.append(entry)


def stop_browser_use_session(api_key: str, browser_id: str) -> None:
    """Stop the cloud browser identified by ``browser_id``."""
    _require_sdk()
    client = BrowserUse(api_key=api_key)
    client.browsers.stop(browser_id)


def run_flow(args: argparse.Namespace) -> dict[str, Any]:
    """Run the OAuth flow described by ``args`` and return a summary dict.

    In dry-run mode nothing is executed and only the resolved inputs are
    returned. Otherwise a cloud browser is created and driven through login,
    optional e-mail verification and optional consent approval.

    Raises:
        SystemExit: If no Browser Use API key is available for a live run.
        RuntimeError: If an ``agent-browser`` command fails.
    """
    if args.dry_run:
        return {
            "mode": "dry-run",
            "oauth_url": args.oauth_url,
            "session": args.session,
            "email": args.email,
            "approve_consent": args.approve_consent,
            "has_code": bool(args.code),
            "trace_path": str(args.trace_path) if args.trace_path else None,
        }

    if not args.browser_use_api_key:
        raise SystemExit("Pass --browser-use-api-key or set BROWSER_USE_API_KEY.")

    browser_session = create_browser_use_session(
        api_key=args.browser_use_api_key,
        proxy_country_code=args.proxy_country_code,
        timeout=args.browser_timeout,
    )
    runner = AgentBrowserRunner(args.agent_browser_bin, args.session, verbose=args.verbose)
    results: list[dict[str, Any]] = []
    trace_started = False
    wait_arg = str(args.wait_ms)

    def step(
        command: tuple[str, ...],
        *,
        logged: tuple[str, ...] | None = None,
        expect_json: bool = True,
        until_ms: int | None = None,
    ) -> CommandResult:
        """Run ``command`` and log it; ``logged`` replaces it in the log to mask secrets."""
        if until_ms is None:
            outcome = run_step(runner, command, expect_json=expect_json)
        else:
            outcome = run_until_success(runner, command, timeout_ms=until_ms, expect_json=expect_json)
        append_result(results, command if logged is None else logged, outcome, expect_json)
        return outcome

    def submit_and_wait() -> None:
        """Click the submit button and give the page time to navigate."""
        step(("click", args.submit_selector))
        step(("wait", wait_arg))

    try:
        step(("connect", browser_session.websocket_url))
        step(("open", args.oauth_url))
        step(("wait", wait_arg))

        step(("fill", args.login_email_selector, args.email), until_ms=args.wait_ms)
        step(("click", args.submit_selector))
        step(("wait", "3000"))

        step(
            ("fill", args.password_selector, args.password),
            logged=("fill", args.password_selector, "********"),
            until_ms=args.wait_ms,
        )
        submit_and_wait()

        current_url = get_url(runner)
        status = "unknown"
        callback_url = None

        if current_url and DEFAULT_VERIFICATION_URL_FRAGMENT in current_url:
            status = "verification_reached"
            if args.code:
                step(
                    ("fill", args.code_selector, args.code),
                    logged=("fill", args.code_selector, "******"),
                    until_ms=args.wait_ms,
                )
                submit_and_wait()
                current_url = get_url(runner)

        if current_url and DEFAULT_CONSENT_URL_FRAGMENT in current_url:
            status = "consent_reached"
            if args.approve_consent:
                step(("network", "requests", "--clear"))
                # Answer requests matching the localhost pattern with a stub body.
                step(("network", "route", args.localhost_pattern, "--body", '{"ok":true}'))
                if args.trace_path:
                    step(("trace", "start"))
                    trace_started = True
                submit_and_wait()
                current_url = get_url(runner)

        # Checked on every path so that `status` agrees with the reported
        # callback URL even when the flow reaches the callback without consent.
        if current_url and args.callback_fragment in current_url:
            callback_url = current_url
            status = "callback_captured"

        url_result = step(("get", "url"))
        title_result = step(("get", "title"))
        text_result = step(("get", "text", "body"))
        snapshot_result = step(("snapshot", "-i", "-c"), expect_json=False)

        trace_output = None
        if trace_started and args.trace_path:
            step(("trace", "stop", str(args.trace_path)))
            trace_output = str(args.trace_path)

        final_url = _data_field(url_result, "url")
        final_title = _data_field(title_result, "title")
        final_text = _data_field(text_result, "text")

        return {
            "mode": "live",
            "status": status,
            "browser_use": {
                "id": browser_session.id,
                "live_url": browser_session.live_url,
                "cdp_http_url": browser_session.cdp_http_url,
                "websocket_url": browser_session.websocket_url,
                "timeout_at": browser_session.timeout_at,
            },
            "session": args.session,
            "email": args.email,
            "oauth_url": args.oauth_url,
            # A captured URL must survive even if the final URL no longer matches.
            "callback_url": resolve_callback_url(callback_url, final_url, args.callback_fragment),
            "final_url": final_url,
            "final_title": final_title,
            "final_text_excerpt": final_text[:500] if final_text else None,
            "final_snapshot": snapshot_result.stdout,
            "trace_path": trace_output,
            "results": results,
        }
    finally:
        if args.stop_browser:
            stop_browser_use_session(args.browser_use_api_key, browser_session.id)


def main() -> int:
    """Parse arguments, run the flow, and print (and optionally save) the summary."""
    args = build_parser().parse_args()
    # A dry run never launches agent-browser, so it does not need the binary.
    if not args.dry_run and shutil.which(args.agent_browser_bin) is None:
        if args.agent_browser_bin == "agent-browser":
            raise SystemExit("agent-browser is not installed or not on PATH.")
        raise SystemExit(
            f"agent-browser binary not found or not executable: {args.agent_browser_bin}"
        )

    summary = run_flow(args)
    rendered = json.dumps(summary, indent=2, ensure_ascii=False)
    if args.output:
        args.output.parent.mkdir(parents=True, exist_ok=True)
        # The JSON may contain non-ASCII text; do not depend on the locale encoding.
        args.output.write_text(rendered + "\n", encoding="utf-8")
    print(rendered)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())