Files

393 lines
13 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import shutil
import subprocess
import sys
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any
DEFAULT_HOST = "production-sfo.browserless.io"
DEFAULT_CHATGPT_URL = "https://chatgpt.com"
DEFAULT_SIGNUP_LABEL = "Sign up for free"
DEFAULT_EMAIL_SELECTOR = "input[name='email']"
DEFAULT_PASSWORD_SELECTOR = "input[type='password']"
DEFAULT_SUBMIT_SELECTOR = "button[type='submit']"
DEFAULT_WAIT_MS = 5000
def build_default_email() -> str:
return f"agent-browser-smoke-{datetime.now().strftime('%Y%m%d-%H%M%S')}@example.com"
@dataclass
class CommandResult:
command: list[str]
returncode: int
stdout: str
stderr: str
parsed: Any | None = None
class AgentBrowserRunner:
def __init__(self, binary: str, session: str, verbose: bool = False) -> None:
self.binary = binary
self.session = session
self.verbose = verbose
def run(self, *args: str, expect_json: bool = True, retries: int = 0) -> CommandResult:
attempt = 0
while True:
command = [self.binary, "--session", self.session]
if expect_json:
command.append("--json")
command.extend(args)
if self.verbose:
print("+", " ".join(command), file=sys.stderr)
completed = subprocess.run(
command,
check=False,
capture_output=True,
text=True,
)
stdout = completed.stdout.strip()
stderr = completed.stderr.strip()
parsed = None
if expect_json and stdout:
try:
parsed = json.loads(stdout)
except json.JSONDecodeError as exc:
raise RuntimeError(
f"Failed to parse JSON from agent-browser for command {args!r}: {stdout}"
) from exc
result = CommandResult(
command=command,
returncode=completed.returncode,
stdout=stdout,
stderr=stderr,
parsed=parsed,
)
failed = completed.returncode != 0 or (
expect_json and isinstance(parsed, dict) and not parsed.get("success", False)
)
if not failed:
return result
if attempt < retries and self._is_transient_failure(result):
attempt += 1
time.sleep(1.0)
continue
raise RuntimeError(self._format_error(result))
@staticmethod
def _format_error(result: CommandResult) -> str:
stdout = f"\nstdout: {result.stdout}" if result.stdout else ""
stderr = f"\nstderr: {result.stderr}" if result.stderr else ""
return (
f"agent-browser command failed ({result.returncode}): {' '.join(result.command)}"
f"{stdout}{stderr}"
)
@staticmethod
def _is_transient_failure(result: CommandResult) -> bool:
haystack = f"{result.stdout}\n{result.stderr}".lower()
transient_markers = (
"cdp response channel closed",
"target closed",
"websocket",
"socket closed",
"connection closed",
"econnreset",
)
return any(marker in haystack for marker in transient_markers)
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Smoke-test the ChatGPT signup flow through Agent Browser over Browserless stealth CDP.",
)
parser.add_argument("--token", default=os.getenv("BROWSERLESS_TOKEN"))
parser.add_argument("--ws-url", help="Override the Browserless websocket URL.")
parser.add_argument("--host", default=DEFAULT_HOST)
parser.add_argument("--session", default="browserless-signup-smoke")
parser.add_argument("--agent-browser-bin", default=shutil.which("agent-browser") or "agent-browser")
parser.add_argument("--chatgpt-url", default=DEFAULT_CHATGPT_URL)
parser.add_argument("--signup-label", default=DEFAULT_SIGNUP_LABEL)
parser.add_argument("--email", default=build_default_email())
parser.add_argument(
"--password",
default=f"TempPass!{datetime.now().strftime('%Y%m%d')}",
)
parser.add_argument("--email-selector", default=DEFAULT_EMAIL_SELECTOR)
parser.add_argument("--password-selector", default=DEFAULT_PASSWORD_SELECTOR)
parser.add_argument("--submit-selector", default=DEFAULT_SUBMIT_SELECTOR)
parser.add_argument("--wait-ms", type=int, default=DEFAULT_WAIT_MS)
parser.add_argument("--output", type=Path, help="Optional path for a JSON summary.")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--verbose", action="store_true")
return parser
def build_ws_url(args: argparse.Namespace) -> str:
if args.ws_url:
return args.ws_url
if not args.token:
raise SystemExit("Pass --token or set BROWSERLESS_TOKEN.")
return f"wss://{args.host}/chrome/stealth?token={args.token}"
def plan_commands(args: argparse.Namespace, ws_url: str) -> list[tuple[tuple[str, ...], bool]]:
return [
(("connect", ws_url), True),
(("open", args.chatgpt_url), True),
(("wait", str(args.wait_ms)), True),
(("find", "role", "button", "click", "--name", args.signup_label), True),
(("fill", args.email_selector, args.email), True),
(("click", args.submit_selector), True),
(("get", "url"), True),
(("fill", args.password_selector, args.password), True),
(("click", args.submit_selector), True),
(("get", "url"), True),
(("get", "title"), True),
(("get", "text", "body"), True),
(("snapshot", "-i", "-c"), False),
]
def run_step(
runner: AgentBrowserRunner,
command: tuple[str, ...],
expect_json: bool = True,
) -> CommandResult:
retries = 2 if command and command[0] in {"connect", "open", "wait", "get", "snapshot"} else 0
return runner.run(*command, expect_json=expect_json, retries=retries)
def run_until_success(
runner: AgentBrowserRunner,
command: tuple[str, ...],
timeout_ms: int,
expect_json: bool = True,
poll_interval_s: float = 1.0,
) -> CommandResult:
deadline = time.time() + (timeout_ms / 1000)
last_error: Exception | None = None
while time.time() < deadline:
try:
return run_step(runner, command, expect_json=expect_json)
except RuntimeError as exc:
last_error = exc
time.sleep(poll_interval_s)
raise RuntimeError(f"Timed out waiting for command {command!r}. Last error: {last_error}")
def extract_json_data(result: CommandResult) -> Any:
if isinstance(result.parsed, dict):
return result.parsed.get("data")
return result.parsed
def get_current_url(runner: AgentBrowserRunner) -> str | None:
result = run_step(runner, ("get", "url"))
data = extract_json_data(result) or {}
return data.get("url")
def poll_url_contains(
runner: AgentBrowserRunner,
fragment: str,
timeout_ms: int,
poll_interval_s: float = 1.0,
) -> str:
deadline = time.time() + (timeout_ms / 1000)
last_url = None
while time.time() < deadline:
last_url = get_current_url(runner)
if last_url and fragment in last_url:
return last_url
time.sleep(poll_interval_s)
raise RuntimeError(f"Timed out waiting for URL containing {fragment!r}. Last URL: {last_url!r}")
def poll_url_contains_any(
runner: AgentBrowserRunner,
fragments: tuple[str, ...],
timeout_ms: int,
poll_interval_s: float = 1.0,
) -> str:
deadline = time.time() + (timeout_ms / 1000)
last_url = None
while time.time() < deadline:
last_url = get_current_url(runner)
if last_url and any(fragment in last_url for fragment in fragments):
return last_url
time.sleep(poll_interval_s)
raise RuntimeError(f"Timed out waiting for URL containing one of {fragments!r}. Last URL: {last_url!r}")
def run_flow(args: argparse.Namespace) -> dict[str, Any]:
ws_url = build_ws_url(args)
commands = plan_commands(args, ws_url)
if args.dry_run:
return {
"mode": "dry-run",
"session": args.session,
"ws_url": ws_url,
"commands": [
{
"expect_json": expect_json,
"command": [args.agent_browser_bin, "--session", args.session]
+ (["--json"] if expect_json else [])
+ list(command),
}
for command, expect_json in commands
],
}
runner = AgentBrowserRunner(
binary=args.agent_browser_bin,
session=args.session,
verbose=args.verbose,
)
results: list[dict[str, Any]] = []
final_url = None
final_title = None
final_text = None
final_snapshot = None
def append_result(command: tuple[str, ...], result: CommandResult, expect_json: bool = True) -> None:
entry: dict[str, Any] = {
"command": command,
"returncode": result.returncode,
}
if expect_json:
entry["data"] = extract_json_data(result)
else:
entry["stdout"] = result.stdout
results.append(entry)
connect_result = run_step(runner, ("connect", ws_url))
append_result(("connect", ws_url), connect_result)
open_result = run_step(runner, ("open", args.chatgpt_url))
append_result(("open", args.chatgpt_url), open_result)
wait_home_result = run_step(runner, ("wait", str(args.wait_ms)))
append_result(("wait", str(args.wait_ms)), wait_home_result)
signup_result = run_until_success(
runner,
("find", "role", "button", "click", "--name", args.signup_label),
timeout_ms=args.wait_ms,
)
append_result(("find", "role", "button", "click", "--name", args.signup_label), signup_result)
email_fill_result = run_until_success(
runner,
("fill", args.email_selector, args.email),
timeout_ms=args.wait_ms,
)
append_result(("fill", args.email_selector, args.email), email_fill_result)
submit_email_result = run_step(runner, ("click", args.submit_selector))
append_result(("click", args.submit_selector), submit_email_result)
password_url = poll_url_contains_any(
runner,
("create-account/password", "log-in/password"),
timeout_ms=args.wait_ms * 6,
)
password_url_result = run_step(runner, ("get", "url"))
append_result(("get", "url"), password_url_result)
password_fill_result = run_until_success(
runner,
("fill", args.password_selector, args.password),
timeout_ms=args.wait_ms,
)
append_result(("fill", args.password_selector, args.password), password_fill_result)
submit_password_result = run_step(runner, ("click", args.submit_selector))
append_result(("click", args.submit_selector), submit_password_result)
final_url = poll_url_contains(
runner,
"email-verification",
timeout_ms=args.wait_ms * 6,
)
url_result = run_step(runner, ("get", "url"))
append_result(("get", "url"), url_result)
title_result = run_step(runner, ("get", "title"))
append_result(("get", "title"), title_result)
text_result = run_step(runner, ("get", "text", "body"))
append_result(("get", "text", "body"), text_result)
snapshot_result = run_step(runner, ("snapshot", "-i", "-c"), expect_json=False)
append_result(("snapshot", "-i", "-c"), snapshot_result, expect_json=False)
final_title = (extract_json_data(title_result) or {}).get("title")
final_text = (extract_json_data(text_result) or {}).get("text")
final_snapshot = snapshot_result.stdout
reached_verification = bool(final_url and "email-verification" in final_url)
return {
"mode": "live",
"session": args.session,
"ws_url": ws_url,
"chatgpt_url": args.chatgpt_url,
"email": args.email,
"password_masked": "*" * len(args.password),
"reached_email_verification": reached_verification,
"final_url": final_url,
"final_title": final_title,
"final_text_excerpt": final_text[:500] if final_text else None,
"final_snapshot": final_snapshot,
"results": results,
}
def main() -> int:
args = build_parser().parse_args()
if not shutil.which(args.agent_browser_bin) and args.agent_browser_bin == "agent-browser":
raise SystemExit("agent-browser is not installed or not on PATH.")
summary = run_flow(args)
if args.output:
args.output.parent.mkdir(parents=True, exist_ok=True)
args.output.write_text(json.dumps(summary, indent=2, ensure_ascii=False) + "\n")
print(json.dumps(summary, indent=2, ensure_ascii=False))
return 0
if __name__ == "__main__":
raise SystemExit(main())