"""Replay the captured Codex OAuth login flow over HTTP only.""" from __future__ import annotations import base64 import json import os from pathlib import Path from typing import Any from urllib.parse import urljoin, urlparse from http_client import HTTPClient try: from playwright.sync_api import TimeoutError as PlaywrightTimeoutError from playwright.sync_api import sync_playwright except Exception: # pragma: no cover - optional runtime dependency path PlaywrightTimeoutError = Exception sync_playwright = None DEFAULT_AUTHORIZE_URL = ( "https://auth.openai.com/oauth/authorize?" "response_type=code&client_id=app_EMoamEEZ73f0CkXaXp7hrann&" "redirect_uri=http%3A%2F%2Flocalhost%3A1455%2Fauth%2Fcallback&" "scope=openid+profile+email+offline_access&" "code_challenge=DdqIRKaga8whKtumU863MbnpEA74P4YITGhM2l7VLuU&" "code_challenge_method=S256&state=dfa46133d0bb07e60420e058c3a4e8ea&" "id_token_add_organizations=true&codex_cli_simplified_flow=true&originator=pi" ) class FlowError(RuntimeError): pass class BrowserSentinelHelper: chrome_executable = Path("/Applications/Google Chrome.app/Contents/MacOS/Google Chrome") def __init__(self, proxy: str) -> None: self.proxy = proxy self.playwright = None self.browser = None self.context = None self.page = None @classmethod def is_available(cls) -> bool: return sync_playwright is not None and cls.chrome_executable.exists() def start(self) -> None: if self.page is not None: return if sync_playwright is None: raise FlowError("Playwright is not available in this environment") launch_kwargs: dict[str, Any] = { "headless": True, "executable_path": str(self.chrome_executable), } proxy_config = self._playwright_proxy() if proxy_config: launch_kwargs["proxy"] = proxy_config self.playwright = sync_playwright().start() self.browser = self.playwright.chromium.launch(**launch_kwargs) self.context = self.browser.new_context(ignore_https_errors=True) self.page = self.context.new_page() def close(self) -> None: if self.context is not None: self.context.close() if self.browser is not None: self.browser.close() if self.playwright is not None: self.playwright.stop() self.page = None self.context = None self.browser = None self.playwright = None def open_page(self, url: str) -> str: self.start() self.page.goto(url, wait_until="domcontentloaded", timeout=60_000) try: self.page.wait_for_load_state("networkidle", timeout=15_000) except PlaywrightTimeoutError: pass print(f"[browser] page -> {self.page.url}") return self.page.url def get_sentinel_token(self, flow: str) -> str: self.start() token = self.page.evaluate( """ async (flow) => { async function loadScript(src) { await new Promise((resolve, reject) => { const script = document.createElement("script"); script.src = src; script.async = true; script.defer = true; script.onload = () => resolve(); script.onerror = () => reject(new Error(`Failed to load ${src}`)); document.head.appendChild(script); }); } if (!window.SentinelSDK) { try { await loadScript("https://sentinel.openai.com/backend-api/sentinel/sdk.js"); } catch (error) { await loadScript("https://chatgpt.com/backend-api/sentinel/sdk.js"); } } try { if (window.SentinelSDK && typeof window.SentinelSDK.init === "function") { await window.SentinelSDK.init(flow); } } catch (error) { // Match the web app: init failures are tolerated. } if (!window.SentinelSDK || typeof window.SentinelSDK.token !== "function") { return JSON.stringify({ e: "q2n8w7x5z1" }); } try { return await window.SentinelSDK.token(flow); } catch (error) { return JSON.stringify({ e: "k9d4s6v3b2" }); } } """, flow, ) if not isinstance(token, str) or not token: raise FlowError(f"Sentinel token generation failed for flow {flow!r}: {token!r}") print(f"[browser] sentinel token ready for {flow}") return token def cookies(self) -> list[dict[str, Any]]: self.start() return self.context.cookies() def set_cookies(self, cookies: list[dict[str, Any]]) -> None: self.start() if cookies: self.context.add_cookies(cookies) def _playwright_proxy(self) -> dict[str, str] | None: if not self.proxy: return None parsed = urlparse(self.proxy) if not parsed.hostname or not parsed.port: return {"server": self.proxy} scheme = parsed.scheme if scheme in {"socks5", "socks5h"}: # Match HTTPClient's fallback: some proxy providers expose an HTTP proxy # behind a socks5-looking URL, and Chromium cannot do authenticated socks5 here. scheme = "http" proxy: dict[str, str] = {"server": f"{scheme}://{parsed.hostname}:{parsed.port}"} if parsed.username: proxy["username"] = parsed.username if parsed.password: proxy["password"] = parsed.password return proxy def load_proxy() -> str: proxy = os.getenv("SOCKS5_PROXY", "").strip() if proxy: return proxy env_path = Path(__file__).resolve().parent.parent / ".env" if not env_path.exists(): return "" for raw_line in env_path.read_text(encoding="utf-8").splitlines(): line = raw_line.strip() if not line or line.startswith("#") or "=" not in line: continue key, value = line.split("=", 1) if key.strip() != "SOCKS5_PROXY": continue return value.strip().strip("\"'") return "" class CodexOAuthHTTPFlow: def __init__( self, authorize_url: str, email: str, password: str, otp: str = "", workspace_id: str = "", use_browser: bool = False, mailbox: dict | None = None, ) -> None: self.authorize_url = authorize_url self.email = email self.password = password self.otp = otp self.workspace_id = workspace_id self.mailbox = mailbox self.auth_base = "https://auth.openai.com" self.proxy = load_proxy() self.http = HTTPClient(self.proxy) self.browser = ( BrowserSentinelHelper(self.proxy) if use_browser and BrowserSentinelHelper.is_available() else None ) def close(self) -> None: self.http.close() if self.browser is not None: self.browser.close() def run(self) -> str: login_page_url = self._bootstrap_login_page() self._sync_browser_to_http() response = self._api_json( "POST", f"{self.auth_base}/api/accounts/authorize/continue", referer=login_page_url, payload={"username": {"kind": "email", "value": self.email}}, sentinel_flow="authorize_continue", ) current_page = login_page_url while True: continue_url = self._absolute_continue_url(response) page = response.get("page") or {} page_type = page.get("type") print(f"[flow] page_type={page_type!r}") if continue_url and "/api/oauth/oauth2/auth" in continue_url: return self._finish_oauth_chain(continue_url, current_page) if page_type == "login_password": current_page = self._open_page(continue_url, current_page, browser_sync=True) response = self._api_json( "POST", f"{self.auth_base}/api/accounts/password/verify", referer=current_page, payload={"password": self.password}, sentinel_flow="password_verify", ) continue if page_type in {"contact_verification", "email_otp_send"}: response = self._send_email_otp(current_page, continue_url) continue if page_type == "email_otp_verification": current_page = continue_url or f"{self.auth_base}/email-verification" response = self._validate_email_otp(current_page) continue if page_type == "sign_in_with_chatgpt_codex_consent": current_page = continue_url or f"{self.auth_base}/sign-in-with-chatgpt/codex/consent" workspace_id = self._pick_workspace_id() response = self._api_json( "POST", f"{self.auth_base}/api/accounts/workspace/select", referer=current_page, payload={"workspace_id": workspace_id}, ) continue raise FlowError( f"Unsupported page type: {page_type!r}, continue_url={continue_url!r}, " f"response={json.dumps(response, ensure_ascii=False)}" ) def _bootstrap_login_page(self) -> str: if self.browser is not None: return self.browser.open_page(self.authorize_url) current_url = self.authorize_url referer = "" for step in range(1, 10): response = self._page_request(current_url, referer=referer) if response.status_code in {301, 302, 303, 307, 308}: location = response.headers.get("location") if not location: raise FlowError(f"Redirect from {current_url} missing Location header") next_url = urljoin(current_url, location) print(f"[bootstrap:{step}] {response.status_code} {current_url} -> {next_url}") referer = current_url current_url = next_url continue print(f"[bootstrap:{step}] {response.status_code} {current_url}") return current_url raise FlowError("Authorize bootstrap exceeded redirect limit") def _send_email_otp(self, referer: str, continue_url: str) -> dict[str, Any]: send_url = continue_url or f"{self.auth_base}/api/accounts/email-otp/send" print(f"[otp] sending email code via {send_url}") return self._api_json("GET", send_url, referer=referer) def _fetch_otp_from_mailbox(self) -> str: """Try to fetch OTP from mailbox automatically.""" if not self.mailbox: return "" from vmail_client import MailClient mail = MailClient() print("[otp] auto-fetching OTP from mailbox...") try: code = mail.wait_for_otp(self.mailbox, timeout=120, poll=3.0) print(f" OTP: {code}") return code except Exception as e: print(f"[otp] auto-fetch failed: {e}") return "" def _validate_email_otp(self, referer: str) -> dict[str, Any]: validate_url = f"{self.auth_base}/api/accounts/email-otp/validate" resend_url = f"{self.auth_base}/api/accounts/email-otp/resend" while True: if self.otp: code = self.otp.strip() self.otp = "" elif self.mailbox: code = self._fetch_otp_from_mailbox() if not code: raise FlowError("Failed to auto-fetch OTP from mailbox") else: code = input("Email OTP (or type 'resend'): ").strip() if not code: continue if code.lower() == "resend": resend_response = self.http.request( "POST", resend_url, headers={ "Accept": "application/json", "Origin": self.auth_base, "Referer": referer, }, ) print(f"[otp] resend -> {resend_response.status_code}") if resend_response.status_code == 429: raise FlowError("Email OTP resend rate-limited") if resend_response.status_code not in {200, 204}: raise FlowError( f"Resend OTP failed: {resend_response.status_code} " f"{self._response_excerpt(resend_response)}" ) continue response = self.http.request( "POST", validate_url, json={"code": code}, headers={ "Accept": "application/json", "Content-Type": "application/json", "Origin": self.auth_base, "Referer": referer, }, ) print(f"[otp] validate -> {response.status_code}") if response.status_code == 401: print("[otp] incorrect code") continue if response.status_code == 429: raise FlowError("Email OTP validation rate-limited") if response.status_code not in {200, 201}: raise FlowError( f"Validate OTP failed: {response.status_code} {self._response_excerpt(response)}" ) return self._json_from_response(response) def _pick_workspace_id(self) -> str: session_data = self._decode_cookie_json("oai-client-auth-session") workspaces = session_data.get("workspaces") or [] if not workspaces: raise FlowError(f"No workspaces found in oai-client-auth-session: {session_data}") if self.workspace_id: for workspace in workspaces: if workspace.get("id") == self.workspace_id: print(f"[workspace] using explicit workspace_id={self.workspace_id}") return self.workspace_id raise FlowError(f"workspace_id {self.workspace_id!r} not found in {workspaces}") if len(workspaces) == 1: workspace_id = workspaces[0]["id"] print(f"[workspace] auto-selected {workspace_id}") return workspace_id print("[workspace] available workspaces:") for index, workspace in enumerate(workspaces, start=1): label = workspace.get("name") or workspace.get("profile_picture_alt_text") or workspace["id"] print(f" {index}. {label} ({workspace['id']}) [{workspace.get('kind', 'unknown')}]") while True: raw = input("Choose workspace number: ").strip() if not raw.isdigit(): continue selected = int(raw) if 1 <= selected <= len(workspaces): workspace_id = workspaces[selected - 1]["id"] print(f"[workspace] selected {workspace_id}") return workspace_id def _finish_oauth_chain(self, url: str, referer: str) -> str: current_url = url current_referer = referer for step in range(1, 10): response = self._page_request(current_url, referer=current_referer) location = response.headers.get("location") print( f"[oauth:{step}] {response.status_code} {current_url}" + (f" -> {location}" if location else "") ) if response.status_code not in {301, 302, 303, 307, 308}: raise FlowError( f"Expected redirect during oauth finish, got {response.status_code}: " f"{self._response_excerpt(response)}" ) if not location: raise FlowError(f"Redirect from {current_url} missing Location header") next_url = urljoin(current_url, location) if next_url.startswith("http://localhost:"): return next_url current_referer = current_url current_url = next_url raise FlowError("OAuth finish exceeded redirect limit") def _api_json( self, method: str, url: str, referer: str, payload: dict[str, Any] | None = None, sentinel_flow: str = "", ) -> dict[str, Any]: headers = { "Accept": "application/json", "Origin": self.auth_base, "Referer": referer, "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin", } if payload is not None: headers["Content-Type"] = "application/json" if sentinel_flow: headers["OpenAI-Sentinel-Token"] = self._sentinel_token(sentinel_flow) request_kwargs: dict[str, Any] = {"headers": headers} if payload is not None: request_kwargs["json"] = payload response = self.http.request(method, url, **request_kwargs) print(f"[api] {method} {url} -> {response.status_code}") if response.status_code not in {200, 201}: raise FlowError( f"API request failed: {method} {url} -> {response.status_code} " f"{self._response_excerpt(response)}" ) return self._json_from_response(response) def _page_request(self, url: str, referer: str = ""): headers = { "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Upgrade-Insecure-Requests": "1", } if referer: headers["Referer"] = referer return self.http.request("GET", url, headers=headers) def _open_page(self, url: str, referer: str, browser_sync: bool = False) -> str: if not url: raise FlowError("Missing continue_url for page navigation") if browser_sync and self.browser is not None: self._sync_http_to_browser() page_url = self.browser.open_page(url) self._sync_browser_to_http() return page_url response = self._page_request(url, referer=referer) print(f"[page] GET {url} -> {response.status_code}") if response.status_code in {301, 302, 303, 307, 308}: location = response.headers.get("location") if not location: raise FlowError(f"Page redirect from {url} missing Location header") return urljoin(url, location) if response.status_code != 200: raise FlowError(f"Page load failed: {url} -> {response.status_code}") return url def _absolute_continue_url(self, payload: dict[str, Any]) -> str: continue_url = payload.get("continue_url") or "" if not continue_url: return "" if urlparse(continue_url).scheme: return continue_url return urljoin(self.auth_base, continue_url) def _decode_cookie_json(self, name: str) -> dict[str, Any]: raw_value = self._cookie_value(name) if not raw_value: return {} candidates = raw_value.split(".") for part in candidates[:2]: padded = part + "=" * (-len(part) % 4) try: decoded = base64.urlsafe_b64decode(padded.encode()).decode() parsed = json.loads(decoded) except Exception: continue if isinstance(parsed, dict): return parsed return {} def _cookie_value(self, name: str) -> str: for cookie in self.http.session.cookies.jar: if cookie.name == name: return cookie.value return "" def _sentinel_token(self, flow: str) -> str: if self.browser is None: # Fallback: use error token that browser SDK returns when unavailable print(f"[sentinel] using fallback error token for {flow}") return '{"e":"q2n8w7x5z1"}' self._sync_http_to_browser() return self.browser.get_sentinel_token(flow) def _sync_browser_to_http(self) -> None: if self.browser is None: return for cookie in self.browser.cookies(): self.http.session.cookies.set( cookie["name"], cookie["value"], domain=cookie.get("domain", ""), path=cookie.get("path", "/"), ) def _sync_http_to_browser(self) -> None: if self.browser is None: return cookies: list[dict[str, Any]] = [] for cookie in self.http.session.cookies.jar: domain = cookie.domain or "" if not domain: continue if "openai.com" not in domain and "chatgpt.com" not in domain: continue item: dict[str, Any] = { "name": cookie.name, "value": cookie.value, "domain": domain, "path": cookie.path or "/", "secure": bool(cookie.secure), } if cookie.expires: item["expires"] = float(cookie.expires) cookies.append(item) self.browser.set_cookies(cookies) @staticmethod def _json_from_response(response) -> dict[str, Any]: content_type = response.headers.get("content-type", "") if "application/json" not in content_type: raise FlowError( f"Expected JSON response, got {content_type}: {CodexOAuthHTTPFlow._response_excerpt(response)}" ) data = response.json() if not isinstance(data, dict): raise FlowError(f"Expected JSON object, got: {data!r}") return data @staticmethod def _response_excerpt(response, limit: int = 500) -> str: text = response.text or "" text = text.replace("\r", " ").replace("\n", " ") return text[:limit]