393 lines
13 KiB
Python
393 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
# Browserless endpoint host and the site the smoke test opens by default.
DEFAULT_HOST = "production-sfo.browserless.io"
DEFAULT_CHATGPT_URL = "https://chatgpt.com"

# Accessible name of the button that starts the signup flow.
DEFAULT_SIGNUP_LABEL = "Sign up for free"

# CSS selectors for the form controls the flow fills in and submits.
DEFAULT_EMAIL_SELECTOR = "input[name='email']"
DEFAULT_PASSWORD_SELECTOR = "input[type='password']"
DEFAULT_SUBMIT_SELECTOR = "button[type='submit']"

# Default for --wait-ms: the argument of the `wait` command and the base polling budget.
DEFAULT_WAIT_MS = 5000
|
|
|
|
|
|
def build_default_email() -> str:
    """Return an example.com address stamped with the current local date and time."""
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    return "agent-browser-smoke-" + stamp + "@example.com"
|
|
|
|
|
|
@dataclass
class CommandResult:
    """Captured outcome of a single agent-browser invocation."""

    # Argument vector that was executed.
    command: list[str]
    # Exit status reported by the process.
    returncode: int
    # Captured standard output and standard error text.
    stdout: str
    stderr: str
    # Decoded JSON payload, when one was produced; otherwise None.
    parsed: Any | None = None
|
|
|
|
|
|
class AgentBrowserRunner:
    """Shell out to the agent-browser CLI on behalf of one named session.

    Every call runs ``<binary> --session <session> [--json] <args...>``,
    captures both output streams, and turns failures into ``RuntimeError``.
    """

    # Lower-cased substrings whose presence in the output marks a failure as retryable.
    _TRANSIENT_MARKERS = (
        "cdp response channel closed",
        "target closed",
        "websocket",
        "socket closed",
        "connection closed",
        "econnreset",
    )

    def __init__(self, binary: str, session: str, verbose: bool = False) -> None:
        """Store the executable, the session name, and the verbosity flag."""
        self.binary = binary
        self.session = session
        self.verbose = verbose

    def run(self, *args: str, expect_json: bool = True, retries: int = 0) -> CommandResult:
        """Execute one agent-browser command and return its captured result.

        Args:
            *args: Sub-command and its arguments, appended after the session flags.
            expect_json: When true, pass ``--json`` and decode stdout as JSON.
            retries: Extra attempts allowed when a failure looks transient.

        Returns:
            The ``CommandResult`` of the first attempt that succeeded.

        Raises:
            RuntimeError: If the command exits non-zero, if its JSON payload is
                a dict without a truthy ``success``, or if it exits zero with
                stdout that is not valid JSON while JSON was expected.
        """
        # The argv is identical for every attempt, so build it once.
        command = [self.binary, "--session", self.session]
        if expect_json:
            command.append("--json")
        command.extend(args)

        attempt = 0
        while True:
            if self.verbose:
                # NOTE: the echoed argv can carry secrets (e.g. a tokenized websocket URL).
                print("+", " ".join(command), file=sys.stderr)

            completed = subprocess.run(
                command,
                check=False,
                capture_output=True,
                text=True,
            )
            stdout = completed.stdout.strip()
            stderr = completed.stderr.strip()

            parsed = None
            if expect_json:
                parsed = self._decode_stdout(stdout, completed.returncode, args)

            result = CommandResult(
                command=command,
                returncode=completed.returncode,
                stdout=stdout,
                stderr=stderr,
                parsed=parsed,
            )

            failed = completed.returncode != 0 or (
                expect_json and isinstance(parsed, dict) and not parsed.get("success", False)
            )
            if not failed:
                return result

            if attempt < retries and self._is_transient_failure(result):
                attempt += 1
                time.sleep(1.0)
                continue

            raise RuntimeError(self._format_error(result))

    @staticmethod
    def _decode_stdout(stdout: str, returncode: int, args: tuple[str, ...]) -> Any | None:
        """Decode JSON stdout, tolerating non-JSON text only from a failed command.

        A command that already exited non-zero may print plain diagnostics. In
        that case ``None`` is returned so the caller reports (or retries) the
        real failure instead of a misleading parse error.

        Raises:
            RuntimeError: If the command exited zero but stdout is not valid JSON.
        """
        if not stdout:
            return None
        try:
            return json.loads(stdout)
        except json.JSONDecodeError as exc:
            if returncode != 0:
                return None
            raise RuntimeError(
                f"Failed to parse JSON from agent-browser for command {args!r}: {stdout}"
            ) from exc

    @staticmethod
    def _format_error(result: CommandResult) -> str:
        """Render a failure message with the exit code, argv, and any captured output."""
        stdout = f"\nstdout: {result.stdout}" if result.stdout else ""
        stderr = f"\nstderr: {result.stderr}" if result.stderr else ""
        return (
            f"agent-browser command failed ({result.returncode}): {' '.join(result.command)}"
            f"{stdout}{stderr}"
        )

    @staticmethod
    def _is_transient_failure(result: CommandResult) -> bool:
        """Return True when stdout or stderr contains a known transient marker."""
        haystack = f"{result.stdout}\n{result.stderr}".lower()
        return any(marker in haystack for marker in AgentBrowserRunner._TRANSIENT_MARKERS)
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the signup smoke test.

    ``--token`` defaults to the ``BROWSERLESS_TOKEN`` environment variable, and
    the ``--email`` / ``--password`` defaults are derived from the clock at the
    moment the parser is built.
    """
    cli = argparse.ArgumentParser(
        description="Smoke-test the ChatGPT signup flow through Agent Browser over Browserless stealth CDP.",
    )
    add = cli.add_argument

    # Connection and session options.
    add("--token", default=os.getenv("BROWSERLESS_TOKEN"))
    add("--ws-url", help="Override the Browserless websocket URL.")
    add("--host", default=DEFAULT_HOST)
    add("--session", default="browserless-signup-smoke")
    add("--agent-browser-bin", default=shutil.which("agent-browser") or "agent-browser")

    # Target page and the credentials typed into it.
    add("--chatgpt-url", default=DEFAULT_CHATGPT_URL)
    add("--signup-label", default=DEFAULT_SIGNUP_LABEL)
    add("--email", default=build_default_email())
    today = datetime.now().strftime("%Y%m%d")
    add("--password", default="TempPass!" + today)

    # Selectors and timing.
    add("--email-selector", default=DEFAULT_EMAIL_SELECTOR)
    add("--password-selector", default=DEFAULT_PASSWORD_SELECTOR)
    add("--submit-selector", default=DEFAULT_SUBMIT_SELECTOR)
    add("--wait-ms", type=int, default=DEFAULT_WAIT_MS)

    # Output and diagnostics.
    add("--output", type=Path, help="Optional path for a JSON summary.")
    add("--dry-run", action="store_true")
    add("--verbose", action="store_true")
    return cli
|
|
|
|
|
|
def build_ws_url(args: argparse.Namespace) -> str:
    """Resolve the Browserless websocket endpoint for this run.

    An explicit ``args.ws_url`` is returned unchanged. Otherwise the stealth
    endpoint is built from ``args.host`` and ``args.token``.

    Raises:
        SystemExit: If neither a websocket URL nor a token is available.
    """
    # Imported locally so this function carries its own dependency.
    from urllib.parse import quote

    if args.ws_url:
        return args.ws_url
    if not args.token:
        raise SystemExit("Pass --token or set BROWSERLESS_TOKEN.")
    # Percent-encode the token so characters such as "&", "#" or spaces cannot
    # truncate or corrupt the query string.
    return f"wss://{args.host}/chrome/stealth?token={quote(args.token, safe='')}"
|
|
|
|
|
|
def plan_commands(args: argparse.Namespace, ws_url: str) -> list[tuple[tuple[str, ...], bool]]:
    """List the agent-browser sub-commands of the signup flow, in order.

    Each entry pairs a sub-command argv with a flag telling whether it is run
    with ``--json``; only the trailing snapshot is requested as plain text.
    """
    submit = ("click", args.submit_selector)
    current_url = ("get", "url")

    json_steps = [
        ("connect", ws_url),
        ("open", args.chatgpt_url),
        ("wait", str(args.wait_ms)),
        ("find", "role", "button", "click", "--name", args.signup_label),
        ("fill", args.email_selector, args.email),
        submit,
        current_url,
        ("fill", args.password_selector, args.password),
        submit,
        current_url,
        ("get", "title"),
        ("get", "text", "body"),
    ]

    planned = [(step, True) for step in json_steps]
    planned.append((("snapshot", "-i", "-c"), False))
    return planned
|
|
|
|
|
|
def run_step(
    runner: AgentBrowserRunner,
    command: tuple[str, ...],
    expect_json: bool = True,
) -> CommandResult:
    """Run one command through ``runner`` with a retry budget chosen by its verb.

    Commands whose first word is ``connect``, ``open``, ``wait``, ``get`` or
    ``snapshot`` get two retries; everything else (and an empty command) gets none.
    """
    retryable_verbs = {"connect", "open", "wait", "get", "snapshot"}
    if command and command[0] in retryable_verbs:
        budget = 2
    else:
        budget = 0
    return runner.run(*command, expect_json=expect_json, retries=budget)
|
|
|
|
|
|
def run_until_success(
    runner: AgentBrowserRunner,
    command: tuple[str, ...],
    timeout_ms: int,
    expect_json: bool = True,
    poll_interval_s: float = 1.0,
) -> CommandResult:
    """Re-run a command until it succeeds or the time budget is spent.

    The command is always attempted at least once, even with a zero or
    negative ``timeout_ms``, so the reported error always reflects a real attempt.

    Args:
        runner: Runner used to execute the command.
        command: Sub-command argv handed to ``run_step``.
        timeout_ms: Total time budget in milliseconds.
        expect_json: Forwarded to ``run_step``.
        poll_interval_s: Pause between attempts, in seconds.

    Returns:
        The result of the first successful attempt.

    Raises:
        RuntimeError: If every attempt failed within the budget; chained to the
            last underlying failure.
    """
    # Monotonic clock: wall-clock adjustments must not stretch or shrink the budget.
    deadline = time.monotonic() + (timeout_ms / 1000)

    while True:
        try:
            return run_step(runner, command, expect_json=expect_json)
        except RuntimeError as exc:
            last_error = exc

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(poll_interval_s, remaining))

    raise RuntimeError(
        f"Timed out waiting for command {command!r}. Last error: {last_error}"
    ) from last_error
|
|
|
|
|
|
def extract_json_data(result: CommandResult) -> Any:
    """Return the ``data`` member of a dict payload, or the payload itself otherwise."""
    payload = result.parsed
    if not isinstance(payload, dict):
        return payload
    return payload.get("data")
|
|
|
|
|
|
def get_current_url(runner: AgentBrowserRunner) -> str | None:
    """Run ``get url`` and return the ``url`` entry of its data, if any."""
    payload = extract_json_data(run_step(runner, ("get", "url")))
    if not payload:
        # Missing or empty data is treated like an empty mapping.
        payload = {}
    return payload.get("url")
|
|
|
|
|
|
def poll_url_contains(
    runner: AgentBrowserRunner,
    fragment: str,
    timeout_ms: int,
    poll_interval_s: float = 1.0,
) -> str:
    """Poll the current URL until it contains ``fragment``.

    The URL is probed at least once, even with a zero or negative ``timeout_ms``.

    Args:
        runner: Runner used to query the current URL.
        fragment: Substring that must appear in the URL.
        timeout_ms: Total time budget in milliseconds.
        poll_interval_s: Pause between probes, in seconds.

    Returns:
        The first observed URL that contains ``fragment``.

    Raises:
        RuntimeError: If no matching URL was seen within the budget.
    """
    # Monotonic clock: wall-clock adjustments must not stretch or shrink the budget.
    deadline = time.monotonic() + (timeout_ms / 1000)

    while True:
        last_url = get_current_url(runner)
        if last_url and fragment in last_url:
            return last_url

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(poll_interval_s, remaining))

    raise RuntimeError(f"Timed out waiting for URL containing {fragment!r}. Last URL: {last_url!r}")
|
|
|
|
|
|
def poll_url_contains_any(
    runner: AgentBrowserRunner,
    fragments: tuple[str, ...],
    timeout_ms: int,
    poll_interval_s: float = 1.0,
) -> str:
    """Poll the current URL until it contains at least one of ``fragments``.

    The URL is probed at least once, even with a zero or negative ``timeout_ms``.

    Args:
        runner: Runner used to query the current URL.
        fragments: Candidate substrings; any single match is sufficient.
        timeout_ms: Total time budget in milliseconds.
        poll_interval_s: Pause between probes, in seconds.

    Returns:
        The first observed URL that contains one of the fragments.

    Raises:
        RuntimeError: If no matching URL was seen within the budget.
    """
    # Monotonic clock: wall-clock adjustments must not stretch or shrink the budget.
    deadline = time.monotonic() + (timeout_ms / 1000)

    while True:
        last_url = get_current_url(runner)
        if last_url and any(fragment in last_url for fragment in fragments):
            return last_url

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(poll_interval_s, remaining))

    raise RuntimeError(f"Timed out waiting for URL containing one of {fragments!r}. Last URL: {last_url!r}")
|
|
|
|
|
|
def run_flow(args: argparse.Namespace) -> dict[str, Any]:
    """Drive the signup flow, or describe it when ``args.dry_run`` is set.

    In dry-run mode nothing is executed; the planned argv of every command is
    returned. In live mode the commands are run in order, each outcome is
    recorded in ``results`` (the URL polling probes are not), and a summary
    dict is returned.

    The password typed into the form is recorded in ``results`` only in
    masked form, matching the ``password_masked`` summary field.

    Raises:
        SystemExit: From ``build_ws_url`` when no endpoint can be resolved.
        RuntimeError: When a command fails or a wait times out.
    """
    # NOTE: ws_url can embed the Browserless token and is echoed in the summary as-is.
    ws_url = build_ws_url(args)

    if args.dry_run:
        prefix = [args.agent_browser_bin, "--session", args.session]
        return {
            "mode": "dry-run",
            "session": args.session,
            "ws_url": ws_url,
            "commands": [
                {
                    "expect_json": expect_json,
                    "command": prefix + (["--json"] if expect_json else []) + list(command),
                }
                for command, expect_json in plan_commands(args, ws_url)
            ],
        }

    runner = AgentBrowserRunner(
        binary=args.agent_browser_bin,
        session=args.session,
        verbose=args.verbose,
    )
    results: list[dict[str, Any]] = []
    masked_password = "*" * len(args.password)

    def append_result(command: tuple[str, ...], result: CommandResult, expect_json: bool = True) -> None:
        """Record one executed command with its JSON data or raw stdout."""
        entry: dict[str, Any] = {
            "command": command,
            "returncode": result.returncode,
        }
        if expect_json:
            entry["data"] = extract_json_data(result)
        else:
            entry["stdout"] = result.stdout
        results.append(entry)

    def step(command: tuple[str, ...], expect_json: bool = True) -> CommandResult:
        """Run a command once (with run_step's retry policy) and record it."""
        result = run_step(runner, command, expect_json=expect_json)
        append_result(command, result, expect_json)
        return result

    def step_until_success(
        command: tuple[str, ...],
        recorded: tuple[str, ...] | None = None,
    ) -> CommandResult:
        """Run a command until it succeeds; record ``recorded`` in its place if given."""
        result = run_until_success(runner, command, timeout_ms=args.wait_ms)
        append_result(command if recorded is None else recorded, result)
        return result

    # Landing page.
    step(("connect", ws_url))
    step(("open", args.chatgpt_url))
    step(("wait", str(args.wait_ms)))

    # Open the signup form and submit the email address.
    step_until_success(("find", "role", "button", "click", "--name", args.signup_label))
    step_until_success(("fill", args.email_selector, args.email))
    step(("click", args.submit_selector))

    # Wait for the password page, then record the URL that was reached.
    poll_url_contains_any(
        runner,
        ("create-account/password", "log-in/password"),
        timeout_ms=args.wait_ms * 6,
    )
    step(("get", "url"))

    # Submit the password; the recorded command carries the masked value only.
    step_until_success(
        ("fill", args.password_selector, args.password),
        recorded=("fill", args.password_selector, masked_password),
    )
    step(("click", args.submit_selector))

    final_url = poll_url_contains(
        runner,
        "email-verification",
        timeout_ms=args.wait_ms * 6,
    )

    # Capture the final page state.
    step(("get", "url"))
    title_result = step(("get", "title"))
    text_result = step(("get", "text", "body"))
    snapshot_result = step(("snapshot", "-i", "-c"), expect_json=False)

    final_title = (extract_json_data(title_result) or {}).get("title")
    final_text = (extract_json_data(text_result) or {}).get("text")

    return {
        "mode": "live",
        "session": args.session,
        "ws_url": ws_url,
        "chatgpt_url": args.chatgpt_url,
        "email": args.email,
        "password_masked": masked_password,
        "reached_email_verification": bool(final_url and "email-verification" in final_url),
        "final_url": final_url,
        "final_title": final_title,
        "final_text_excerpt": final_text[:500] if final_text else None,
        "final_snapshot": snapshot_result.stdout,
        "results": results,
    }
|
|
|
|
|
|
def main() -> int:
    """CLI entry point: parse arguments, run the flow, and emit the JSON summary.

    The summary is printed to stdout and, when ``--output`` is given, also
    written to that path as UTF-8 (parent directories are created as needed).

    Returns:
        ``0`` once the summary has been emitted.

    Raises:
        SystemExit: If the binary is the bare default name ``agent-browser``
            and it cannot be found on PATH.
    """
    args = build_parser().parse_args()

    if not shutil.which(args.agent_browser_bin) and args.agent_browser_bin == "agent-browser":
        raise SystemExit("agent-browser is not installed or not on PATH.")

    summary = run_flow(args)
    rendered = json.dumps(summary, indent=2, ensure_ascii=False)

    if args.output:
        args.output.parent.mkdir(parents=True, exist_ok=True)
        # ensure_ascii=False can emit non-ASCII text, so do not depend on the
        # platform's locale encoding when writing the file.
        args.output.write_text(rendered + "\n", encoding="utf-8")

    print(rendered)
    return 0
|
|
|
|
|
|
# Script entry: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|