chore: initial commit

This commit is contained in:
gameloader
2026-03-18 22:07:19 +08:00
commit e3a28050df
13 changed files with 51831 additions and 0 deletions

50
src/captcha_solver.py Normal file
View File

@@ -0,0 +1,50 @@
import time
import httpx
# YesCaptcha international endpoint
YESCAPTCHA_BASE = "https://api.yescaptcha.com"
class CaptchaSolver:
def __init__(self, api_key: str):
self.api_key = api_key
def solve_hcaptcha(self, site_key: str, page_url: str, timeout: int = 180) -> str:
"""Submit hCaptcha task via YesCaptcha and wait for solution token."""
# createTask
r = httpx.post(
f"{YESCAPTCHA_BASE}/createTask",
json={
"clientKey": self.api_key,
"task": {
"type": "HCaptchaTaskProxyless",
"websiteURL": page_url,
"websiteKey": site_key,
},
},
timeout=30,
)
r.raise_for_status()
resp = r.json()
if resp.get("errorId", 0) != 0:
raise RuntimeError(f"YesCaptcha createTask error: {resp}")
task_id = resp["taskId"]
print(f" YesCaptcha task: {task_id}")
# getTaskResult polling
deadline = time.time() + timeout
while time.time() < deadline:
time.sleep(5)
r2 = httpx.post(
f"{YESCAPTCHA_BASE}/getTaskResult",
json={"clientKey": self.api_key, "taskId": task_id},
timeout=30,
)
r2.raise_for_status()
res = r2.json()
if res.get("errorId", 0) != 0:
raise RuntimeError(f"YesCaptcha error: {res}")
if res.get("status") == "ready":
return res["solution"]["gRecaptchaResponse"]
# status == "processing" -> keep polling
raise TimeoutError(f"hCaptcha not solved within {timeout}s")

227
src/chatgpt_payment.py Normal file
View File

@@ -0,0 +1,227 @@
"""ChatGPT Plus payment flow."""
import uuid
from http_client import HTTPClient
from captcha_solver import CaptchaSolver
class ChatGPTPayment:
def __init__(self, http: HTTPClient, captcha: CaptchaSolver):
self.http = http
self.captcha = captcha
self.base = "https://chatgpt.com"
def subscribe_plus(
self, access_token: str, country: str, currency: str, card_info: dict
) -> bool:
"""Subscribe to ChatGPT Plus."""
checkout_currency = currency.upper()
# 0. Get account info and promo campaign
print("[0/6] Fetching account promo campaign...")
r = self.http.request(
"GET",
f"{self.base}/backend-api/accounts/check/v4-2023-04-27",
headers={"Authorization": f"Bearer {access_token}"},
)
accounts_data = r.json()
# Extract promo campaign from first account
account_id = list(accounts_data.get("accounts", {}).keys())[0] if accounts_data.get("accounts") else None
promo_campaign_id = ""
plan_name = "chatgptplusplan"
if account_id:
account = accounts_data["accounts"][account_id]
eligible_promos = account.get("eligible_promo_campaigns", {})
plus_promo = eligible_promos.get("plus", {})
if plus_promo:
promo_campaign_id = plus_promo.get("id", "")
plan_name = plus_promo.get("metadata", {}).get("plan_name", "chatgptplusplan")
print(f" Promo campaign: {promo_campaign_id}")
print(f" Plan name: {plan_name}")
# 1. Create checkout session
print("[1/6] Creating checkout session...")
r = self.http.request(
"POST",
f"{self.base}/backend-api/payments/checkout",
json={
"plan_name": plan_name,
"billing_details": {"country": country, "currency": checkout_currency},
"promo_campaign": {"promo_campaign_id": promo_campaign_id, "is_coupon_from_query_param": False},
"prefetch": True,
"checkout_ui_mode": "custom",
},
headers={"Authorization": f"Bearer {access_token}"},
)
checkout = r.json()
session_id = checkout["checkout_session_id"]
publishable_key = checkout["publishable_key"]
processor_entity = checkout["processor_entity"]
print(f" Session: {session_id}")
# 2. Initialize Stripe elements session
print("[2/6] Initializing Stripe elements...")
stripe_js_id = str(uuid.uuid4())
r = self.http.request(
"GET",
f"https://api.stripe.com/v1/elements/sessions",
params={
"client_betas[0]": "custom_checkout_server_updates_1",
"client_betas[1]": "custom_checkout_manual_approval_1",
"deferred_intent[mode]": "subscription",
"deferred_intent[amount]": "0",
"deferred_intent[currency]": currency.lower(),
"deferred_intent[setup_future_usage]": "off_session",
"deferred_intent[payment_method_types][0]": "card",
"currency": currency.lower(),
"key": publishable_key,
"_stripe_version": "2025-03-31.basil; checkout_server_update_beta=v1; checkout_manual_approval_preview=v1",
"elements_init_source": "custom_checkout",
"referrer_host": "chatgpt.com",
"stripe_js_id": stripe_js_id,
"locale": "en",
"type": "deferred_intent",
"checkout_session_id": session_id,
},
headers={
"Origin": "https://js.stripe.com",
"Referer": "https://js.stripe.com/",
},
)
elements = r.json()
captcha_site_key = elements.get("passive_captcha", {}).get("site_key", "")
elements_session_id = elements.get("session_id", "")
elements_config_id = elements.get("config_id", "")
print(f" Elements session: {elements_session_id}")
# 3. Solve hCaptcha (Stripe passive captcha via YesCaptcha)
print("[3/6] Solving Stripe passive captcha...")
print(f" Site key: {captcha_site_key}")
if captcha_site_key:
captcha_token = self.captcha.solve_hcaptcha(captcha_site_key, "https://js.stripe.com")
print(f" Token: {captcha_token[:30]}...")
else:
captcha_token = ""
print(" No captcha required")
# 4. Create Stripe payment method
print("[4/6] Creating payment method...")
billing_name = card_info.get("billing_name") or card_info.get("name") or "John Doe"
billing_email = card_info.get("billing_email") or card_info.get("email") or ""
billing_line1 = card_info.get("billing_address_line1") or "100 Main St"
billing_city = card_info.get("billing_address_city") or "San Francisco"
billing_state = card_info.get("billing_address_state") or "CA"
billing_postal_code = card_info.get("billing_address_postal_code") or "94105"
guid = str(uuid.uuid4()) + str(uuid.uuid4())[:6]
muid = str(uuid.uuid4()) + str(uuid.uuid4())[:6]
sid = str(uuid.uuid4()) + str(uuid.uuid4())[:6]
r = self.http.request(
"POST",
"https://api.stripe.com/v1/payment_methods",
data={
"billing_details[name]": billing_name,
"billing_details[email]": billing_email,
"billing_details[address][country]": country,
"billing_details[address][line1]": billing_line1,
"billing_details[address][city]": billing_city,
"billing_details[address][postal_code]": billing_postal_code,
"billing_details[address][state]": billing_state,
"type": "card",
"card[number]": card_info["number"],
"card[cvc]": card_info["cvc"],
"card[exp_year]": card_info["exp_year"],
"card[exp_month]": card_info["exp_month"],
"allow_redisplay": "unspecified",
"pasted_fields": "number,exp,cvc",
"payment_user_agent": "stripe.js/5e596c82e6; stripe-js-v3/5e596c82e6; payment-element; deferred-intent",
"referrer": "https://chatgpt.com",
"time_on_page": "60000",
"client_attribution_metadata[client_session_id]": stripe_js_id,
"client_attribution_metadata[checkout_session_id]": session_id,
"client_attribution_metadata[merchant_integration_source]": "elements",
"client_attribution_metadata[merchant_integration_subtype]": "payment-element",
"client_attribution_metadata[merchant_integration_version]": "2021",
"client_attribution_metadata[payment_intent_creation_flow]": "deferred",
"client_attribution_metadata[payment_method_selection_flow]": "automatic",
"client_attribution_metadata[elements_session_id]": elements_session_id,
"client_attribution_metadata[elements_session_config_id]": elements_config_id,
"client_attribution_metadata[merchant_integration_additional_elements][0]": "payment",
"client_attribution_metadata[merchant_integration_additional_elements][1]": "address",
"guid": guid,
"muid": muid,
"sid": sid,
"key": publishable_key,
"_stripe_version": "2025-03-31.basil; checkout_server_update_beta=v1; checkout_manual_approval_preview=v1",
"radar_options[hcaptcha_token]": captcha_token,
},
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Origin": "https://js.stripe.com",
"Referer": "https://js.stripe.com/",
},
)
if r.status_code != 200:
raise RuntimeError(f"Create payment method failed: {r.text}")
payment_method_id = r.json()["id"]
print(f" Payment method: {payment_method_id}")
# 5. Confirm payment with Stripe
print("[5/6] Confirming payment with Stripe...")
return_url = f"https://checkout.stripe.com/c/pay/{session_id}?returned_from_redirect=true&ui_mode=custom&return_url=https%3A%2F%2Fchatgpt.com%2Fcheckout%2Fverify%3Fstripe_session_id%3D{session_id}%26processor_entity%3D{processor_entity}%26plan_type%3Dplus"
r = self.http.request(
"POST",
f"https://api.stripe.com/v1/payment_pages/{session_id}/confirm",
data={
"guid": guid,
"muid": muid,
"sid": sid,
"payment_method": payment_method_id,
"expected_amount": "0",
"passive_captcha_token": captcha_token,
"passive_captcha_ekey": "",
"expected_payment_method_type": "card",
"return_url": return_url,
"elements_session_client[client_betas][0]": "custom_checkout_server_updates_1",
"elements_session_client[client_betas][1]": "custom_checkout_manual_approval_1",
"elements_session_client[elements_init_source]": "custom_checkout",
"elements_session_client[referrer_host]": "chatgpt.com",
"elements_session_client[session_id]": elements_session_id,
"elements_session_client[stripe_js_id]": stripe_js_id,
"elements_session_client[locale]": "en",
"elements_session_client[is_aggregation_expected]": "false",
"client_attribution_metadata[client_session_id]": stripe_js_id,
"client_attribution_metadata[checkout_session_id]": session_id,
"client_attribution_metadata[merchant_integration_source]": "checkout",
"client_attribution_metadata[merchant_integration_version]": "custom",
"client_attribution_metadata[merchant_integration_subtype]": "payment-element",
"client_attribution_metadata[merchant_integration_additional_elements][0]": "payment",
"client_attribution_metadata[merchant_integration_additional_elements][1]": "address",
"client_attribution_metadata[payment_intent_creation_flow]": "deferred",
"client_attribution_metadata[payment_method_selection_flow]": "automatic",
"client_attribution_metadata[elements_session_id]": elements_session_id,
"client_attribution_metadata[elements_session_config_id]": elements_config_id,
"key": publishable_key,
"_stripe_version": "2025-03-31.basil; checkout_server_update_beta=v1; checkout_manual_approval_preview=v1",
},
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Origin": "https://js.stripe.com",
"Referer": "https://js.stripe.com/",
},
)
if r.status_code != 200:
raise RuntimeError(f"Stripe confirm failed: {r.text}")
print(f" Stripe confirm status: {r.status_code}")
# 6. Verify subscription
print("[6/6] Verifying subscription...")
r = self.http.request(
"GET",
f"{self.base}/backend-api/payments/checkout/{processor_entity}/{session_id}",
headers={"Authorization": f"Bearer {access_token}"},
)
return r.status_code == 200

View File

@@ -0,0 +1,427 @@
"""Pure HTTP registration experiment with detailed auth-state diagnostics."""
import os
import random
import re
import time
import uuid
from typing import Optional
from urllib.parse import unquote, urlencode, urljoin, urlparse
from http_client import HTTPClient
from vmail_client import MailClient
class ChatGPTRegisterHTTPReverse:
def __init__(self, http: HTTPClient, mail: MailClient):
self.http = http
self.mail = mail
self.base = "https://chatgpt.com"
self.auth_base = "https://auth.openai.com"
self.root_dir = os.path.dirname(os.path.dirname(__file__))
def _delay(self, lo=0.5, hi=1.5):
time.sleep(random.uniform(lo, hi))
def _cookie_rows(self) -> list[tuple[str, str, str]]:
rows = []
for cookie in self.http.session.cookies.jar:
rows.append(((cookie.domain or "").lstrip("."), cookie.name, cookie.value))
rows.sort()
return rows
def _cookie_value(self, name: str, domain_suffix: str = "") -> str:
matches = []
for domain, cookie_name, value in self._cookie_rows():
if cookie_name != name:
continue
if domain_suffix and not domain.endswith(domain_suffix):
continue
matches.append((domain, value))
return matches[-1][1] if matches else ""
def _print_cookie_summary(self, label: str):
interesting = [
"__Host-next-auth.csrf-token",
"__Secure-next-auth.callback-url",
"__Secure-next-auth.state",
"oai-login-csrf_dev_3772291445",
"rg_context",
"auth_provider",
"login_session",
"hydra_redirect",
"oai-client-auth-session",
"unified_session_manifest",
"auth-session-minimized",
"cf_clearance",
"__cf_bm",
"__cflb",
"_cfuvid",
"oai-sc",
]
present = []
for name in interesting:
value = self._cookie_value(name)
if value:
present.append(name)
print(f" {label}: {', '.join(present) if present else 'no relevant cookies'}")
@staticmethod
def _response_excerpt(response, limit: int = 500) -> str:
text = response.text or ""
text = text.replace("\n", " ").replace("\r", " ")
return text[:limit]
def _json_or_none(self, response):
try:
return response.json()
except Exception:
return None
def _load_captured_sentinel(self, flow_name: str) -> str:
path = os.path.join(self.root_dir, "nodatadog.js")
try:
with open(path, "r", encoding="utf-8") as handle:
content = handle.read()
except FileNotFoundError:
return ""
pattern = re.compile(r'"openai-sentinel-token": "((?:[^"\\]|\\.)*flow\\"\\"%s(?:[^"\\]|\\.)*)"' % re.escape(flow_name))
match = pattern.search(content)
if not match:
return ""
# Keep the captured literal as-is. The request header in the capture uses this exact string form.
return match.group(1)
def _bootstrap_chatgpt(self):
print("[2/9] Bootstrapping chatgpt.com cookies...")
response = self.http.request(
"GET",
f"{self.base}/",
headers={
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Upgrade-Insecure-Requests": "1",
"sec-fetch-site": "none",
"sec-fetch-mode": "navigate",
"sec-fetch-dest": "document",
},
)
print(f" Home status: {response.status_code}")
self._print_cookie_summary("After home")
response = self.http.request(
"GET",
f"{self.base}/api/auth/csrf",
headers={
"Accept": "*/*",
"Referer": f"{self.base}/",
"sec-fetch-site": "same-origin",
"sec-fetch-mode": "cors",
"sec-fetch-dest": "empty",
},
)
data = self._json_or_none(response) or {}
csrf_token = data.get("csrfToken", "")
print(f" CSRF status: {response.status_code}")
print(f" CSRF token present: {bool(csrf_token)}")
self._print_cookie_summary("After csrf")
if not csrf_token:
raise RuntimeError(f"Failed to get csrf token: {self._response_excerpt(response)}")
return csrf_token
def _signin_auth0(self, email: str, csrf_token: str) -> str:
print("[3/9] Calling next-auth signin endpoint...")
callback_cookie = self._cookie_value("__Secure-next-auth.callback-url", "chatgpt.com")
callback_url = unquote(callback_cookie) if callback_cookie else f"{self.base}/"
auth_logging_id = str(uuid.uuid4())
params = {
"prompt": "login",
"ext-oai-did": self.http.device_id,
"auth_session_logging_id": auth_logging_id,
"ext-passkey-client-capabilities": "1111",
"screen_hint": "login_or_signup",
"login_hint": email,
}
response = self.http.request(
"POST",
f"{self.base}/api/auth/signin/openai?{urlencode(params)}",
data=urlencode({
"callbackUrl": callback_url,
"csrfToken": csrf_token,
"json": "true",
}),
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "*/*",
"Origin": self.base,
"Referer": f"{self.base}/",
"sec-fetch-site": "same-origin",
"sec-fetch-mode": "cors",
"sec-fetch-dest": "empty",
},
)
data = self._json_or_none(response) or {}
auth_url = data.get("url", "")
print(f" Signin status: {response.status_code}")
print(f" Auth URL present: {bool(auth_url)}")
self._print_cookie_summary("After signin/auth0")
if not auth_url:
raise RuntimeError(f"Failed to get auth URL: {self._response_excerpt(response)}")
return auth_url
def _follow_auth_redirects(self, auth_url: str) -> str:
print("[4/9] Following auth.openai.com redirect chain...")
current_url = auth_url
referer = f"{self.base}/"
final_url = ""
for step in range(10):
response = self.http.request(
"GET",
current_url,
headers={
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Upgrade-Insecure-Requests": "1",
"Referer": referer,
},
)
print(f" Redirect step {step + 1}: {response.status_code} {current_url}")
self._print_cookie_summary(f"Cookies after auth step {step + 1}")
if response.status_code in (301, 302, 303, 307, 308):
location = response.headers.get("location", "")
if not location:
raise RuntimeError(f"Redirect missing location at {current_url}")
referer = current_url
current_url = urljoin(current_url, location)
continue
final_url = current_url
break
if not final_url:
raise RuntimeError("Failed to land on auth page")
print(f" Final auth page: {final_url}")
return final_url
def _accounts_api_base(self, auth_url: str) -> str:
path = urlparse(auth_url).path.rstrip("/")
if path.endswith("/authorize"):
return f"{self.auth_base}{path.rsplit('/', 1)[0]}"
return f"{self.auth_base}/api/accounts"
def _register_endpoint(self, auth_url: str) -> str:
return f"{self._accounts_api_base(auth_url)}/user/register"
def _attempt_register(self, register_url: str, auth_page_url: str, email: str, password: str, sentinel: str = ""):
headers = {
"Content-Type": "application/json",
"Accept": "*/*",
"Origin": self.auth_base,
"Referer": auth_page_url,
"sec-fetch-site": "same-origin",
"sec-fetch-mode": "cors",
"sec-fetch-dest": "empty",
}
if sentinel:
headers["openai-sentinel-token"] = sentinel
response = self.http.request(
"POST",
register_url,
json={"username": email, "password": password},
headers=headers,
)
data = self._json_or_none(response)
print(f" Register status: {response.status_code} sentinel={bool(sentinel)}")
if data is not None:
print(f" Register json keys: {sorted(data.keys())}")
else:
print(f" Register body: {self._response_excerpt(response)}")
self._print_cookie_summary("After register attempt")
return response, data
def _open_continue_url(self, continue_url: str, referer: str, label: str) -> str:
print(f"{label} Opening continue URL...")
response = self.http.request(
"GET",
continue_url,
headers={
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Upgrade-Insecure-Requests": "1",
"Referer": referer,
"sec-fetch-site": "same-origin",
"sec-fetch-mode": "navigate",
"sec-fetch-dest": "document",
},
)
print(f" Continue status: {response.status_code}")
page_url = continue_url
location = response.headers.get("location", "")
if location:
page_url = urljoin(continue_url, location)
if page_url != continue_url:
page_response = self.http.request(
"GET",
page_url,
headers={
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Upgrade-Insecure-Requests": "1",
"Referer": referer,
"sec-fetch-site": "same-origin",
"sec-fetch-mode": "navigate",
"sec-fetch-dest": "document",
},
)
print(f" Page status: {page_response.status_code}")
print(f" Page URL: {page_url}")
self._print_cookie_summary("After opening continue URL")
return page_url
def _validate_otp(self, validate_url: str, otp: str, referer: str) -> str:
print("[7/9] Validating OTP via pure HTTP...")
response = self.http.request(
"POST",
validate_url,
json={"code": otp},
headers={
"Content-Type": "application/json",
"Accept": "*/*",
"Origin": self.auth_base,
"Referer": referer,
},
)
data = self._json_or_none(response) or {}
print(f" Validate status: {response.status_code}")
if response.status_code != 200 or not data.get("continue_url"):
raise RuntimeError(f"OTP validate failed: {self._response_excerpt(response)}")
self._print_cookie_summary("After validate")
return data["continue_url"]
def _attempt_create_account(self, create_account_url: str, name: str, referer: str, sentinel: str = ""):
print("[8/9] Creating account profile via pure HTTP...")
headers = {
"Content-Type": "application/json",
"Accept": "*/*",
"Origin": self.auth_base,
"Referer": referer,
}
if sentinel:
headers["openai-sentinel-token"] = sentinel
response = self.http.request(
"POST",
create_account_url,
json={"name": name, "birthdate": "1998-03-18"},
headers=headers,
)
data = self._json_or_none(response)
print(f" Create-account status: {response.status_code} sentinel={bool(sentinel)}")
if data is not None:
print(f" Create-account json keys: {sorted(data.keys())}")
else:
print(f" Create-account body: {self._response_excerpt(response)}")
self._print_cookie_summary("After create_account")
return response, data
def _finalize_auth_callback(self, continue_url: str):
print("[9/9] Finalizing OAuth callback on chatgpt.com...")
current_url = continue_url
referer = f"{self.auth_base}/"
for step in range(5):
response = self.http.request(
"GET",
current_url,
headers={
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Upgrade-Insecure-Requests": "1",
"Referer": referer,
},
)
print(f" Callback step {step + 1}: {response.status_code} {current_url}")
if response.status_code not in (301, 302, 303, 307, 308):
break
location = response.headers.get("location", "")
if not location:
break
referer = current_url
current_url = urljoin(current_url, location)
self._print_cookie_summary("After callback")
def register(self, password: str, name: str) -> dict:
print("[1/9] Creating temporary mailbox...")
mailbox = self.mail.create_mailbox()
email = mailbox["address"]
print(f" Email: {email}")
self._delay()
csrf_token = self._bootstrap_chatgpt()
self._delay()
auth_url = self._signin_auth0(email, csrf_token)
self._delay()
auth_page_url = self._follow_auth_redirects(auth_url)
self._delay()
accounts_api_base = self._accounts_api_base(auth_url)
register_url = self._register_endpoint(auth_url)
print(f"[5/9] Pure HTTP register endpoint: {register_url}")
response, data = self._attempt_register(register_url, auth_page_url, email, password)
if response.status_code != 200 or not data or not data.get("continue_url"):
captured = self._load_captured_sentinel("username_password_create")
if not captured:
raise RuntimeError(
f"Register failed without sentinel and no captured sentinel found: {self._response_excerpt(response)}"
)
print("[6/9] Retrying register with captured sentinel token...")
response, data = self._attempt_register(register_url, auth_page_url, email, password, sentinel=captured)
if response.status_code != 200 or not data or not data.get("continue_url"):
raise RuntimeError(f"Register blocked: {self._response_excerpt(response)}")
continue_url1 = data["continue_url"]
print(f" Continue URL: {continue_url1}")
email_verification_url = self._open_continue_url(continue_url1, auth_page_url, "[6/9]")
print("[7/9] Waiting for OTP...")
otp = self.mail.wait_for_otp(mailbox, timeout=120)
print(f" OTP: {otp}")
continue_url2 = self._validate_otp(
f"{accounts_api_base}/email-otp/validate",
otp,
email_verification_url,
)
about_you_url = self._open_continue_url(continue_url2, email_verification_url, "[8/9]")
create_response, create_data = self._attempt_create_account(
f"{accounts_api_base}/create_account",
name,
about_you_url,
)
if create_response.status_code != 200 or not create_data or not create_data.get("continue_url"):
captured = self._load_captured_sentinel("oauth_create_account")
if captured:
print("[9/9] Retrying create_account with captured sentinel token...")
create_response, create_data = self._attempt_create_account(
f"{accounts_api_base}/create_account",
name,
about_you_url,
sentinel=captured,
)
if create_response.status_code != 200 or not create_data or not create_data.get("continue_url"):
raise RuntimeError(f"Create account blocked: {self._response_excerpt(create_response)}")
continue_url3 = create_data["continue_url"]
print(f" Final continue URL: {continue_url3}")
self._finalize_auth_callback(continue_url3)
session_response = self.http.request(
"GET",
f"{self.base}/api/auth/session",
headers={"Referer": f"{self.base}/"},
)
session_data = self._json_or_none(session_response) or {}
print(f" Session status: {session_response.status_code}")
return {
"email": email,
"mailbox_password": mailbox.get("password", ""),
"access_token": session_data.get("accessToken", ""),
"user_id": session_data.get("user", {}).get("id", ""),
"continue_url": continue_url3,
}

View File

@@ -0,0 +1,575 @@
"""Replay the captured Codex OAuth login flow over HTTP only."""
from __future__ import annotations
import base64
import json
import os
from pathlib import Path
from typing import Any
from urllib.parse import urljoin, urlparse
from http_client import HTTPClient
try:
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from playwright.sync_api import sync_playwright
except Exception: # pragma: no cover - optional runtime dependency path
PlaywrightTimeoutError = Exception
sync_playwright = None
DEFAULT_AUTHORIZE_URL = (
"https://auth.openai.com/oauth/authorize?"
"response_type=code&client_id=app_EMoamEEZ73f0CkXaXp7hrann&"
"redirect_uri=http%3A%2F%2Flocalhost%3A1455%2Fauth%2Fcallback&"
"scope=openid+profile+email+offline_access&"
"code_challenge=DdqIRKaga8whKtumU863MbnpEA74P4YITGhM2l7VLuU&"
"code_challenge_method=S256&state=dfa46133d0bb07e60420e058c3a4e8ea&"
"id_token_add_organizations=true&codex_cli_simplified_flow=true&originator=pi"
)
class FlowError(RuntimeError):
pass
class BrowserSentinelHelper:
chrome_executable = Path("/Applications/Google Chrome.app/Contents/MacOS/Google Chrome")
def __init__(self, proxy: str) -> None:
self.proxy = proxy
self.playwright = None
self.browser = None
self.context = None
self.page = None
@classmethod
def is_available(cls) -> bool:
return sync_playwright is not None and cls.chrome_executable.exists()
def start(self) -> None:
if self.page is not None:
return
if sync_playwright is None:
raise FlowError("Playwright is not available in this environment")
launch_kwargs: dict[str, Any] = {
"headless": True,
"executable_path": str(self.chrome_executable),
}
proxy_config = self._playwright_proxy()
if proxy_config:
launch_kwargs["proxy"] = proxy_config
self.playwright = sync_playwright().start()
self.browser = self.playwright.chromium.launch(**launch_kwargs)
self.context = self.browser.new_context(ignore_https_errors=True)
self.page = self.context.new_page()
def close(self) -> None:
if self.context is not None:
self.context.close()
if self.browser is not None:
self.browser.close()
if self.playwright is not None:
self.playwright.stop()
self.page = None
self.context = None
self.browser = None
self.playwright = None
def open_page(self, url: str) -> str:
self.start()
self.page.goto(url, wait_until="domcontentloaded", timeout=60_000)
try:
self.page.wait_for_load_state("networkidle", timeout=15_000)
except PlaywrightTimeoutError:
pass
print(f"[browser] page -> {self.page.url}")
return self.page.url
def get_sentinel_token(self, flow: str) -> str:
self.start()
token = self.page.evaluate(
"""
async (flow) => {
async function loadScript(src) {
await new Promise((resolve, reject) => {
const script = document.createElement("script");
script.src = src;
script.async = true;
script.defer = true;
script.onload = () => resolve();
script.onerror = () => reject(new Error(`Failed to load ${src}`));
document.head.appendChild(script);
});
}
if (!window.SentinelSDK) {
try {
await loadScript("https://sentinel.openai.com/backend-api/sentinel/sdk.js");
} catch (error) {
await loadScript("https://chatgpt.com/backend-api/sentinel/sdk.js");
}
}
try {
if (window.SentinelSDK && typeof window.SentinelSDK.init === "function") {
await window.SentinelSDK.init(flow);
}
} catch (error) {
// Match the web app: init failures are tolerated.
}
if (!window.SentinelSDK || typeof window.SentinelSDK.token !== "function") {
return JSON.stringify({ e: "q2n8w7x5z1" });
}
try {
return await window.SentinelSDK.token(flow);
} catch (error) {
return JSON.stringify({ e: "k9d4s6v3b2" });
}
}
""",
flow,
)
if not isinstance(token, str) or not token:
raise FlowError(f"Sentinel token generation failed for flow {flow!r}: {token!r}")
print(f"[browser] sentinel token ready for {flow}")
return token
def cookies(self) -> list[dict[str, Any]]:
self.start()
return self.context.cookies()
def set_cookies(self, cookies: list[dict[str, Any]]) -> None:
self.start()
if cookies:
self.context.add_cookies(cookies)
def _playwright_proxy(self) -> dict[str, str] | None:
if not self.proxy:
return None
parsed = urlparse(self.proxy)
if not parsed.hostname or not parsed.port:
return {"server": self.proxy}
scheme = parsed.scheme
if scheme in {"socks5", "socks5h"}:
# Match HTTPClient's fallback: some proxy providers expose an HTTP proxy
# behind a socks5-looking URL, and Chromium cannot do authenticated socks5 here.
scheme = "http"
proxy: dict[str, str] = {"server": f"{scheme}://{parsed.hostname}:{parsed.port}"}
if parsed.username:
proxy["username"] = parsed.username
if parsed.password:
proxy["password"] = parsed.password
return proxy
def load_proxy() -> str:
proxy = os.getenv("SOCKS5_PROXY", "").strip()
if proxy:
return proxy
env_path = Path(__file__).resolve().parent.parent / ".env"
if not env_path.exists():
return ""
for raw_line in env_path.read_text(encoding="utf-8").splitlines():
line = raw_line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, value = line.split("=", 1)
if key.strip() != "SOCKS5_PROXY":
continue
return value.strip().strip("\"'")
return ""
class CodexOAuthHTTPFlow:
def __init__(
self,
authorize_url: str,
email: str,
password: str,
otp: str = "",
workspace_id: str = "",
use_browser: bool = False,
) -> None:
self.authorize_url = authorize_url
self.email = email
self.password = password
self.otp = otp
self.workspace_id = workspace_id
self.auth_base = "https://auth.openai.com"
self.proxy = load_proxy()
self.http = HTTPClient(self.proxy)
self.browser = (
BrowserSentinelHelper(self.proxy)
if use_browser and BrowserSentinelHelper.is_available()
else None
)
def close(self) -> None:
self.http.close()
if self.browser is not None:
self.browser.close()
def run(self) -> str:
login_page_url = self._bootstrap_login_page()
self._sync_browser_to_http()
response = self._api_json(
"POST",
f"{self.auth_base}/api/accounts/authorize/continue",
referer=login_page_url,
payload={"username": {"kind": "email", "value": self.email}},
sentinel_flow="authorize_continue",
)
current_page = login_page_url
while True:
continue_url = self._absolute_continue_url(response)
page = response.get("page") or {}
page_type = page.get("type")
print(f"[flow] page_type={page_type!r}")
if continue_url and "/api/oauth/oauth2/auth" in continue_url:
return self._finish_oauth_chain(continue_url, current_page)
if page_type == "login_password":
current_page = self._open_page(continue_url, current_page, browser_sync=True)
response = self._api_json(
"POST",
f"{self.auth_base}/api/accounts/password/verify",
referer=current_page,
payload={"password": self.password},
sentinel_flow="password_verify",
)
continue
if page_type in {"contact_verification", "email_otp_send"}:
response = self._send_email_otp(current_page, continue_url)
continue
if page_type == "email_otp_verification":
current_page = continue_url or f"{self.auth_base}/email-verification"
response = self._validate_email_otp(current_page)
continue
if page_type == "sign_in_with_chatgpt_codex_consent":
current_page = continue_url or f"{self.auth_base}/sign-in-with-chatgpt/codex/consent"
workspace_id = self._pick_workspace_id()
response = self._api_json(
"POST",
f"{self.auth_base}/api/accounts/workspace/select",
referer=current_page,
payload={"workspace_id": workspace_id},
)
continue
raise FlowError(
f"Unsupported page type: {page_type!r}, continue_url={continue_url!r}, "
f"response={json.dumps(response, ensure_ascii=False)}"
)
def _bootstrap_login_page(self) -> str:
if self.browser is not None:
return self.browser.open_page(self.authorize_url)
current_url = self.authorize_url
referer = ""
for step in range(1, 10):
response = self._page_request(current_url, referer=referer)
if response.status_code in {301, 302, 303, 307, 308}:
location = response.headers.get("location")
if not location:
raise FlowError(f"Redirect from {current_url} missing Location header")
next_url = urljoin(current_url, location)
print(f"[bootstrap:{step}] {response.status_code} {current_url} -> {next_url}")
referer = current_url
current_url = next_url
continue
print(f"[bootstrap:{step}] {response.status_code} {current_url}")
return current_url
raise FlowError("Authorize bootstrap exceeded redirect limit")
def _send_email_otp(self, referer: str, continue_url: str) -> dict[str, Any]:
send_url = continue_url or f"{self.auth_base}/api/accounts/email-otp/send"
print(f"[otp] sending email code via {send_url}")
return self._api_json("GET", send_url, referer=referer)
def _validate_email_otp(self, referer: str) -> dict[str, Any]:
validate_url = f"{self.auth_base}/api/accounts/email-otp/validate"
resend_url = f"{self.auth_base}/api/accounts/email-otp/resend"
while True:
code = (self.otp or input("Email OTP (or type 'resend'): ").strip()).strip()
self.otp = ""
if not code:
continue
if code.lower() == "resend":
resend_response = self.http.request(
"POST",
resend_url,
headers={
"Accept": "application/json",
"Origin": self.auth_base,
"Referer": referer,
},
)
print(f"[otp] resend -> {resend_response.status_code}")
if resend_response.status_code == 429:
raise FlowError("Email OTP resend rate-limited")
if resend_response.status_code not in {200, 204}:
raise FlowError(
f"Resend OTP failed: {resend_response.status_code} "
f"{self._response_excerpt(resend_response)}"
)
continue
response = self.http.request(
"POST",
validate_url,
json={"code": code},
headers={
"Accept": "application/json",
"Content-Type": "application/json",
"Origin": self.auth_base,
"Referer": referer,
},
)
print(f"[otp] validate -> {response.status_code}")
if response.status_code == 401:
print("[otp] incorrect code")
continue
if response.status_code == 429:
raise FlowError("Email OTP validation rate-limited")
if response.status_code not in {200, 201}:
raise FlowError(
f"Validate OTP failed: {response.status_code} {self._response_excerpt(response)}"
)
return self._json_from_response(response)
def _pick_workspace_id(self) -> str:
session_data = self._decode_cookie_json("oai-client-auth-session")
workspaces = session_data.get("workspaces") or []
if not workspaces:
raise FlowError(f"No workspaces found in oai-client-auth-session: {session_data}")
if self.workspace_id:
for workspace in workspaces:
if workspace.get("id") == self.workspace_id:
print(f"[workspace] using explicit workspace_id={self.workspace_id}")
return self.workspace_id
raise FlowError(f"workspace_id {self.workspace_id!r} not found in {workspaces}")
if len(workspaces) == 1:
workspace_id = workspaces[0]["id"]
print(f"[workspace] auto-selected {workspace_id}")
return workspace_id
print("[workspace] available workspaces:")
for index, workspace in enumerate(workspaces, start=1):
label = workspace.get("name") or workspace.get("profile_picture_alt_text") or workspace["id"]
print(f" {index}. {label} ({workspace['id']}) [{workspace.get('kind', 'unknown')}]")
while True:
raw = input("Choose workspace number: ").strip()
if not raw.isdigit():
continue
selected = int(raw)
if 1 <= selected <= len(workspaces):
workspace_id = workspaces[selected - 1]["id"]
print(f"[workspace] selected {workspace_id}")
return workspace_id
def _finish_oauth_chain(self, url: str, referer: str) -> str:
current_url = url
current_referer = referer
for step in range(1, 10):
response = self._page_request(current_url, referer=current_referer)
location = response.headers.get("location")
print(
f"[oauth:{step}] {response.status_code} {current_url}"
+ (f" -> {location}" if location else "")
)
if response.status_code not in {301, 302, 303, 307, 308}:
raise FlowError(
f"Expected redirect during oauth finish, got {response.status_code}: "
f"{self._response_excerpt(response)}"
)
if not location:
raise FlowError(f"Redirect from {current_url} missing Location header")
next_url = urljoin(current_url, location)
if next_url.startswith("http://localhost:"):
return next_url
current_referer = current_url
current_url = next_url
raise FlowError("OAuth finish exceeded redirect limit")
def _api_json(
self,
method: str,
url: str,
referer: str,
payload: dict[str, Any] | None = None,
sentinel_flow: str = "",
) -> dict[str, Any]:
headers = {
"Accept": "application/json",
"Origin": self.auth_base,
"Referer": referer,
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
}
if payload is not None:
headers["Content-Type"] = "application/json"
if sentinel_flow:
headers["OpenAI-Sentinel-Token"] = self._sentinel_token(sentinel_flow)
request_kwargs: dict[str, Any] = {"headers": headers}
if payload is not None:
request_kwargs["json"] = payload
response = self.http.request(method, url, **request_kwargs)
print(f"[api] {method} {url} -> {response.status_code}")
if response.status_code not in {200, 201}:
raise FlowError(
f"API request failed: {method} {url} -> {response.status_code} "
f"{self._response_excerpt(response)}"
)
return self._json_from_response(response)
def _page_request(self, url: str, referer: str = ""):
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Upgrade-Insecure-Requests": "1",
}
if referer:
headers["Referer"] = referer
return self.http.request("GET", url, headers=headers)
def _open_page(self, url: str, referer: str, browser_sync: bool = False) -> str:
if not url:
raise FlowError("Missing continue_url for page navigation")
if browser_sync and self.browser is not None:
self._sync_http_to_browser()
page_url = self.browser.open_page(url)
self._sync_browser_to_http()
return page_url
response = self._page_request(url, referer=referer)
print(f"[page] GET {url} -> {response.status_code}")
if response.status_code in {301, 302, 303, 307, 308}:
location = response.headers.get("location")
if not location:
raise FlowError(f"Page redirect from {url} missing Location header")
return urljoin(url, location)
if response.status_code != 200:
raise FlowError(f"Page load failed: {url} -> {response.status_code}")
return url
def _absolute_continue_url(self, payload: dict[str, Any]) -> str:
continue_url = payload.get("continue_url") or ""
if not continue_url:
return ""
if urlparse(continue_url).scheme:
return continue_url
return urljoin(self.auth_base, continue_url)
def _decode_cookie_json(self, name: str) -> dict[str, Any]:
raw_value = self._cookie_value(name)
if not raw_value:
return {}
candidates = raw_value.split(".")
for part in candidates[:2]:
padded = part + "=" * (-len(part) % 4)
try:
decoded = base64.urlsafe_b64decode(padded.encode()).decode()
parsed = json.loads(decoded)
except Exception:
continue
if isinstance(parsed, dict):
return parsed
return {}
def _cookie_value(self, name: str) -> str:
for cookie in self.http.session.cookies.jar:
if cookie.name == name:
return cookie.value
return ""
def _sentinel_token(self, flow: str) -> str:
if self.browser is None:
# Fallback: use error token that browser SDK returns when unavailable
print(f"[sentinel] using fallback error token for {flow}")
return '{"e":"q2n8w7x5z1"}'
self._sync_http_to_browser()
return self.browser.get_sentinel_token(flow)
def _sync_browser_to_http(self) -> None:
if self.browser is None:
return
for cookie in self.browser.cookies():
self.http.session.cookies.set(
cookie["name"],
cookie["value"],
domain=cookie.get("domain", ""),
path=cookie.get("path", "/"),
)
def _sync_http_to_browser(self) -> None:
if self.browser is None:
return
cookies: list[dict[str, Any]] = []
for cookie in self.http.session.cookies.jar:
domain = cookie.domain or ""
if not domain:
continue
if "openai.com" not in domain and "chatgpt.com" not in domain:
continue
item: dict[str, Any] = {
"name": cookie.name,
"value": cookie.value,
"domain": domain,
"path": cookie.path or "/",
"secure": bool(cookie.secure),
}
if cookie.expires:
item["expires"] = float(cookie.expires)
cookies.append(item)
self.browser.set_cookies(cookies)
@staticmethod
def _json_from_response(response) -> dict[str, Any]:
content_type = response.headers.get("content-type", "")
if "application/json" not in content_type:
raise FlowError(
f"Expected JSON response, got {content_type}: {CodexOAuthHTTPFlow._response_excerpt(response)}"
)
data = response.json()
if not isinstance(data, dict):
raise FlowError(f"Expected JSON object, got: {data!r}")
return data
@staticmethod
def _response_excerpt(response, limit: int = 500) -> str:
text = response.text or ""
text = text.replace("\r", " ").replace("\n", " ")
return text[:limit]

28
src/config.py Normal file
View File

@@ -0,0 +1,28 @@
from pydantic_settings import BaseSettings
from pydantic import Field
class Settings(BaseSettings):
vmail_api_key: str = Field(default="", env="VMAIL_API_KEY")
yescaptcha_api_key: str = Field(default="", env="YESCAPTCHA_API_KEY")
socks5_proxy: str = Field(default="", env="SOCKS5_PROXY")
# Payment info
card_number: str = Field(default="", env="CARD_NUMBER")
card_exp_month: str = Field(default="", env="CARD_EXP_MONTH")
card_exp_year: str = Field(default="", env="CARD_EXP_YEAR")
card_cvc: str = Field(default="", env="CARD_CVC")
billing_name: str = Field(default="", env="BILLING_NAME")
billing_email: str = Field(default="", env="BILLING_EMAIL")
billing_address_line1: str = Field(default="", env="BILLING_ADDRESS_LINE1")
billing_address_city: str = Field(default="", env="BILLING_ADDRESS_CITY")
billing_address_state: str = Field(default="", env="BILLING_ADDRESS_STATE")
billing_address_postal_code: str = Field(default="", env="BILLING_ADDRESS_POSTAL_CODE")
country: str = Field(default="US", env="COUNTRY")
currency: str = Field(default="usd", env="CURRENCY")
class Config:
env_file = ".env"
settings = Settings()

69
src/http_client.py Normal file
View File

@@ -0,0 +1,69 @@
import uuid
from curl_cffi import requests as curl_requests
class HTTPClient:
def __init__(self, proxy: str = ""):
self.proxy_candidates = self._proxy_candidates(proxy)
self.proxy = self.proxy_candidates[0] if self.proxy_candidates else None
self.session = curl_requests.Session(impersonate="chrome")
self.session.headers.update({
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36",
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"sec-ch-ua": '"Not A-Brand";v="99", "Google Chrome";v="145", "Chromium";v="145"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"macOS"',
})
self.device_id = str(uuid.uuid4())
@staticmethod
def _proxy_candidates(proxy: str) -> list[str]:
if not proxy:
return []
candidates = []
normalized = proxy
if normalized.startswith("socks5://"):
# curl_cffi prefers socks5h when the proxy really is SOCKS.
candidates.append("socks5h://" + normalized[len("socks5://"):])
# Some providers label HTTP proxies as socks5:// in dashboards.
candidates.append("http://" + normalized[len("socks5://"):])
elif normalized.startswith("socks5h://"):
candidates.append(normalized)
candidates.append("http://" + normalized[len("socks5h://"):])
else:
candidates.append(normalized)
seen = set()
ordered = []
for item in candidates:
if item not in seen:
ordered.append(item)
seen.add(item)
return ordered
def request(self, method: str, url: str, **kwargs):
kwargs.setdefault("timeout", 30)
kwargs.setdefault("allow_redirects", False)
# This environment/proxy setup does not expose a usable CA bundle to curl_cffi.
kwargs.setdefault("verify", False)
if not self.proxy_candidates:
return self.session.request(method, url, **kwargs)
last_exc = None
for index, proxy in enumerate(self.proxy_candidates):
request_kwargs = dict(kwargs)
request_kwargs.setdefault("proxy", proxy)
try:
if proxy != self.proxy:
print(f" Retrying HTTP request with proxy scheme fallback: {proxy.split('://', 1)[0]}")
response = self.session.request(method, url, **request_kwargs)
self.proxy = proxy
return response
except Exception as exc:
last_exc = exc
if index == len(self.proxy_candidates) - 1:
raise
raise last_exc
def close(self):
self.session.close()

173
src/main.py Normal file
View File

@@ -0,0 +1,173 @@
#!/usr/bin/env python3
"""Entry point: ChatGPT Plus auto-registration + subscription, or Codex OAuth login."""
import argparse
import random
import string
import sys
from config import settings
from vmail_client import MailClient
from captcha_solver import CaptchaSolver
from http_client import HTTPClient
from chatgpt_register_http_reverse import ChatGPTRegisterHTTPReverse
from chatgpt_payment import ChatGPTPayment
from codex_oauth_http_flow import CodexOAuthHTTPFlow, DEFAULT_AUTHORIZE_URL
def generate_password(length=16):
chars = string.ascii_letters + string.digits + "!@#$%"
while True:
pwd = "".join(random.choice(chars) for _ in range(length))
if (any(c.islower() for c in pwd) and any(c.isupper() for c in pwd)
and any(c.isdigit() for c in pwd) and any(c in "!@#$%" for c in pwd)):
return pwd
def generate_name():
first = random.choice(["John", "Jane", "Mike", "Sarah", "David", "Emma", "Chris", "Lisa"])
last = random.choice(["Smith", "Johnson", "Brown", "Davis", "Wilson", "Moore", "Taylor", "Lee"])
return f"{first} {last}"
def cmd_register(args):
"""Register a new ChatGPT account and optionally subscribe to Plus."""
password = generate_password()
name = generate_name()
print("Generated credentials:")
print(f" Password: {password}")
print(f" Name: {name}")
mail = MailClient()
http = HTTPClient(proxy=settings.socks5_proxy)
try:
print("\n=== ChatGPT Registration ===")
register = ChatGPTRegisterHTTPReverse(http, mail)
session = register.register(password, name)
print("\n Registration complete!")
print(f" Email: {session['email']}")
print(f" Mailbox password: {session['mailbox_password']}")
print(f" ChatGPT password: {password}")
if session["access_token"]:
print(f" Access Token: {session['access_token'][:50]}...")
if settings.card_number and session.get("access_token"):
print("\n=== ChatGPT Plus Subscription ===")
captcha = CaptchaSolver(settings.yescaptcha_api_key)
payment = ChatGPTPayment(http, captcha)
success = payment.subscribe_plus(
access_token=session["access_token"],
country=settings.country,
currency=settings.currency,
card_info={
"number": settings.card_number,
"exp_month": settings.card_exp_month,
"exp_year": settings.card_exp_year,
"cvc": settings.card_cvc,
"billing_name": settings.billing_name or name,
"billing_email": settings.billing_email or session["email"],
"billing_address_line1": settings.billing_address_line1,
"billing_address_city": settings.billing_address_city,
"billing_address_state": settings.billing_address_state,
"billing_address_postal_code": settings.billing_address_postal_code,
},
)
if success:
print("\n Plus subscription complete!")
print(f" Email: {session['email']}")
print(f" Mailbox password: {session['mailbox_password']}")
print(f" ChatGPT password: {password}")
else:
print("\n No card info configured, skipping Plus subscription")
except Exception as e:
print(f"\n Error: {e}")
import traceback
traceback.print_exc()
return 1
finally:
http.close()
return 0
def cmd_codex_login(args):
"""Run Codex OAuth login and print the callback URL with authorization code."""
email = args.email
password = args.password
if not email:
email = input("Email: ").strip()
if not password:
import getpass
password = getpass.getpass("Password: ")
authorize_url = args.authorize_url or DEFAULT_AUTHORIZE_URL
print(f"Starting Codex OAuth flow for {email}")
flow = CodexOAuthHTTPFlow(
authorize_url=authorize_url,
email=email,
password=password,
otp=args.otp or "",
workspace_id=args.workspace_id or "",
)
try:
callback_url = flow.run()
print("\n" + "=" * 60)
print("[SUCCESS] callback_url:")
print(callback_url)
print("=" * 60)
return 0
except Exception as e:
print(f"\n[ERROR] {e}")
import traceback
traceback.print_exc()
return 1
finally:
flow.close()
def build_parser():
parser = argparse.ArgumentParser(
description="ChatGPT Plus auto-registration + subscription, or Codex OAuth login",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Commands:
register Auto-register a new ChatGPT account and subscribe to Plus
codex-login Run Codex CLI OAuth login and obtain the callback URL
Examples:
python src/main.py register
python src/main.py codex-login --email user@example.com --password mypass
"""
)
sub = parser.add_subparsers(dest="command")
# register sub-command
sub.add_parser("register", help="Register new ChatGPT account + optional Plus subscription")
# codex-login sub-command
p_codex = sub.add_parser("codex-login", help="Codex CLI OAuth login (HTTP-only, no browser)")
p_codex.add_argument("--email", default="", help="OpenAI account email")
p_codex.add_argument("--password", default="", help="OpenAI account password")
p_codex.add_argument("--otp", default="", help="Email OTP code (if already known)")
p_codex.add_argument("--workspace-id", dest="workspace_id", default="", help="Workspace ID to select")
p_codex.add_argument("--authorize-url", dest="authorize_url", default="", help="Custom authorize URL")
return parser
def main():
parser = build_parser()
args = parser.parse_args()
if args.command == "register":
return cmd_register(args)
elif args.command == "codex-login":
return cmd_codex_login(args)
else:
parser.print_help()
return 0
if __name__ == "__main__":
sys.exit(main())

91
src/vmail_client.py Normal file
View File

@@ -0,0 +1,91 @@
import re
import secrets
import time
import httpx
MAILTM_BASE = "https://api.mail.tm"
class MailClient:
"""mail.tm temporary email client."""
def create_mailbox(self) -> dict:
"""Create a mailbox. Returns dict with address, password, token."""
domains_resp = httpx.get(f"{MAILTM_BASE}/domains", timeout=30)
domains_resp.raise_for_status()
domains = domains_resp.json().get("hydra:member", [])
active = next((d for d in domains if d.get("isActive")), None)
if not active:
raise RuntimeError("mail.tm returned no active domains")
for _ in range(10):
address = f"{secrets.token_hex(6)}@{active['domain']}"
password = secrets.token_urlsafe(18)
r = httpx.post(
f"{MAILTM_BASE}/accounts",
json={"address": address, "password": password},
timeout=30,
)
if r.status_code == 201:
token_resp = httpx.post(
f"{MAILTM_BASE}/token",
json={"address": address, "password": password},
timeout=30,
)
token_resp.raise_for_status()
return {
"id": r.json()["id"],
"address": address,
"password": password,
"token": token_resp.json()["token"],
}
if r.status_code == 422:
time.sleep(0.3)
continue
r.raise_for_status()
raise RuntimeError("Failed to create mail.tm mailbox after 10 attempts")
def wait_for_otp(self, mailbox: dict, timeout: int = 120, poll: float = 5.0) -> str:
"""Poll mailbox until a 6-digit OTP arrives."""
headers = {"Authorization": f"Bearer {mailbox['token']}"}
deadline = time.time() + timeout
while time.time() < deadline:
try:
r = httpx.get(
f"{MAILTM_BASE}/messages",
headers=headers,
timeout=30,
)
r.raise_for_status()
messages = r.json().get("hydra:member", [])
except httpx.HTTPError as exc:
print(f"mail poll error: {exc.__class__.__name__}")
time.sleep(poll)
continue
if messages:
try:
msg_r = httpx.get(
f"{MAILTM_BASE}/messages/{messages[0]['id']}",
headers=headers,
timeout=30,
)
msg_r.raise_for_status()
msg = msg_r.json()
except httpx.HTTPError as exc:
print(f"mail fetch error: {exc.__class__.__name__}")
time.sleep(poll)
continue
html = msg.get("html") or ""
if isinstance(html, list):
html = "\n".join(html)
body = msg.get("text") or html or ""
codes = re.findall(r"\b(\d{6})\b", body)
if codes:
return codes[0]
time.sleep(poll)
raise TimeoutError(f"No OTP received within {timeout}s")