Files
gptplus_machine/src/vmail_client.py
Logic 15cba50314 feat: add register-only and checkout commands, fix payment flow, switch to yyds mail
- add 'register' command: register account and print credentials only
- add 'checkout' command: register account and print Plus hosted checkout URL
- fix hCaptcha rqdata/isInvisible for Stripe setup intent verification
- fix verify_challenge flow: call chatgpt.com/checkout/verify after challenge
- fix currency uppercase for checkout API
- add get_checkout_url method to ChatGPTPayment
- switch default mail provider to yyds
- update README with new commands and usage
2026-03-21 14:38:28 +08:00

360 lines
13 KiB
Python

import re
import secrets
import time
from abc import ABC, abstractmethod
import httpx
# vmail.dev config
VMAIL_BASE = "https://vmail.dev/api/v1"
VMAIL_API_KEY = "vmail_UCjaCMZghzKaH5KrRlKXyxkrCO7ZNesV"
# mail.tm config
MAILTM_BASE = "https://api.mail.tm"
# yyds mail config
YYDS_BASE = "https://maliapi.215.im"
YYDS_API_KEY = "AC-4a23c58b8f84c19a27ef509e"
class BaseMailClient(ABC):
"""Abstract base class for temporary email clients."""
@abstractmethod
def create_mailbox(self) -> dict:
"""Create a mailbox. Returns dict with id, address, password, token."""
pass
@abstractmethod
def wait_for_otp(self, mailbox: dict, timeout: int = 120, poll: float = 5.0) -> str:
"""Poll mailbox until a 6-digit OTP arrives."""
pass
def get_token_for_address(self, address: str, password: str) -> str:
"""Get auth token for an existing mailbox (for mail.tm only)."""
return ""
class VMailClient(BaseMailClient):
"""vmail.dev temporary email client."""
def __init__(self):
self.api_key = VMAIL_API_KEY
self.headers = {"X-API-Key": self.api_key}
self.base = VMAIL_BASE
def create_mailbox(self) -> dict:
"""Create a mailbox. Returns dict with id, address, password, token."""
for attempt in range(10):
local_part = secrets.token_hex(4)
r = httpx.post(
f"{self.base}/mailboxes",
headers=self.headers,
json={"localPart": local_part},
timeout=30,
)
if r.status_code == 201:
data = r.json()["data"]
return {
"id": data["id"],
"address": data["address"],
"password": data.get("password", ""),
"token": self.api_key,
}
if r.status_code == 422:
time.sleep(0.3)
continue
r.raise_for_status()
raise RuntimeError("Failed to create vmail.dev mailbox after 10 attempts")
def wait_for_otp(self, mailbox: dict, timeout: int = 120, poll: float = 5.0) -> str:
"""Poll mailbox until a 6-digit OTP arrives."""
mailbox_id = mailbox["id"]
if not mailbox_id or mailbox_id == "existing":
# Try to find the mailbox ID by creating a new one with same local part
mailbox_id = self._find_or_create_mailbox(mailbox["address"])
if not mailbox_id:
raise TimeoutError(f"Could not find mailbox ID for {mailbox['address']}")
deadline = time.time() + timeout
while time.time() < deadline:
try:
r = httpx.get(
f"{self.base}/mailboxes/{mailbox_id}/messages",
headers=self.headers,
timeout=30,
)
r.raise_for_status()
messages = r.json().get("data", [])
except httpx.HTTPError as exc:
print(f"mail poll error: {exc.__class__.__name__}")
time.sleep(poll)
continue
if messages:
try:
msg_r = httpx.get(
f"{self.base}/mailboxes/{mailbox_id}/messages/{messages[0]['id']}",
headers=self.headers,
timeout=30,
)
msg_r.raise_for_status()
msg = msg_r.json()["data"]
except httpx.HTTPError as exc:
print(f"mail fetch error: {exc.__class__.__name__}")
time.sleep(poll)
continue
body = msg.get("text") or msg.get("html") or ""
if isinstance(body, list):
body = "\n".join(body)
codes = re.findall(r"\b(\d{6})\b", body)
if codes:
return codes[0]
time.sleep(poll)
raise TimeoutError(f"No OTP received within {timeout}s")
def _find_or_create_mailbox(self, address: str) -> str | None:
"""Try to find existing mailbox or create new one for OTP fetching."""
local_part = address.split("@")[0]
# Try creating with the exact local part - if it exists, we get 409
r = httpx.post(
f"{self.base}/mailboxes",
headers=self.headers,
json={"localPart": local_part},
timeout=30,
)
if r.status_code == 201:
return r.json()["data"]["id"]
if r.status_code == 409:
# Mailbox exists with this API key but we don't know the ID
# Create a new temporary mailbox and use that for OTP
temp_local = f"{local_part}-{secrets.token_hex(4)}"
r2 = httpx.post(
f"{self.base}/mailboxes",
headers=self.headers,
json={"localPart": temp_local},
timeout=30,
)
if r2.status_code == 201:
return r2.json()["data"]["id"]
return None
class MailTMClient(BaseMailClient):
"""mail.tm temporary email client."""
def __init__(self):
self.base = MAILTM_BASE
self._token = None
def create_mailbox(self) -> dict:
"""Create a mailbox. Returns dict with id, address, password, token."""
domains_resp = httpx.get(f"{self.base}/domains", timeout=30)
domains_resp.raise_for_status()
domains = domains_resp.json().get("hydra:member", [])
active = next((d for d in domains if d.get("isActive")), None)
if not active:
raise RuntimeError("mail.tm returned no active domains")
for _ in range(10):
address = f"{secrets.token_hex(6)}@{active['domain']}"
password = secrets.token_urlsafe(18)
r = httpx.post(
f"{self.base}/accounts",
json={"address": address, "password": password},
timeout=30,
)
if r.status_code == 201:
token_resp = httpx.post(
f"{self.base}/token",
json={"address": address, "password": password},
timeout=30,
)
token_resp.raise_for_status()
return {
"id": r.json()["id"],
"address": address,
"password": password,
"token": token_resp.json()["token"],
}
if r.status_code == 422:
time.sleep(0.3)
continue
r.raise_for_status()
raise RuntimeError("Failed to create mail.tm mailbox after 10 attempts")
def get_token_for_address(self, address: str, password: str) -> str:
"""Get auth token for an existing mailbox (for mail.tm only)."""
r = httpx.post(
f"{self.base}/token",
json={"address": address, "password": password},
timeout=30,
)
if r.status_code == 200:
return r.json()["token"]
return ""
def wait_for_otp(self, mailbox: dict, timeout: int = 120, poll: float = 5.0) -> str:
"""Poll mailbox until a 6-digit OTP arrives."""
token = mailbox.get("token", "")
if not token:
raise ValueError("mail.tm requires token for wait_for_otp")
headers = {"Authorization": f"Bearer {token}"}
deadline = time.time() + timeout
while time.time() < deadline:
try:
r = httpx.get(
f"{self.base}/messages",
headers=headers,
timeout=30,
)
r.raise_for_status()
messages = r.json().get("hydra:member", [])
except httpx.HTTPError as exc:
print(f"mail poll error: {exc.__class__.__name__}")
time.sleep(poll)
continue
if messages:
try:
msg_r = httpx.get(
f"{self.base}/messages/{messages[0]['id']}",
headers=headers,
timeout=30,
)
msg_r.raise_for_status()
msg = msg_r.json()
except httpx.HTTPError as exc:
print(f"mail fetch error: {exc.__class__.__name__}")
time.sleep(poll)
continue
html = msg.get("html") or ""
if isinstance(html, list):
html = "\n".join(html)
body = msg.get("text") or html or ""
codes = re.findall(r"\b(\d{6})\b", body)
if codes:
return codes[0]
time.sleep(poll)
raise TimeoutError(f"No OTP received within {timeout}s")
class YYDSMailClient(BaseMailClient):
"""YYDS Mail temporary email client (maliapi.215.im)."""
def __init__(self):
self.api_key = YYDS_API_KEY
self.headers = {"X-API-Key": self.api_key}
self.base = YYDS_BASE
def create_mailbox(self) -> dict:
"""Create a mailbox. Returns dict with id, address, token."""
for attempt in range(10):
r = httpx.post(
f"{self.base}/v1/accounts",
headers=self.headers,
json={},
timeout=30,
)
if r.status_code == 201 or r.status_code == 200:
data = r.json()["data"]
return {
"id": data["id"],
"address": data["address"],
"password": "",
"token": data["token"],
}
if r.status_code == 429:
time.sleep(1)
continue
r.raise_for_status()
raise RuntimeError("Failed to create YYDS mailbox after 10 attempts")
def get_token_for_address(self, address: str, password: str = "") -> str:
"""Get auth token for an existing mailbox via POST /v1/token."""
r = httpx.post(
f"{self.base}/v1/token",
headers=self.headers,
json={"address": address},
timeout=30,
)
if r.status_code == 200:
return r.json()["data"]["token"]
return ""
def wait_for_otp(self, mailbox: dict, timeout: int = 120, poll: float = 5.0) -> str:
"""Poll mailbox until a 6-digit OTP arrives."""
address = mailbox.get("address", "")
token = mailbox.get("token", "")
# If no token, try to get one from address
if not token and address:
token = self.get_token_for_address(address)
if not token:
raise TimeoutError(f"Could not get YYDS token for {address}")
headers = {"Authorization": f"Bearer {token}"}
deadline = time.time() + timeout
while time.time() < deadline:
try:
r = httpx.get(
f"{self.base}/v1/messages",
headers=headers,
params={"address": address},
timeout=30,
)
r.raise_for_status()
result = r.json().get("data", {})
messages = result.get("messages", [])
except httpx.HTTPError as exc:
print(f"mail poll error: {exc.__class__.__name__}")
time.sleep(poll)
continue
if messages:
try:
msg_r = httpx.get(
f"{self.base}/v1/messages/{messages[0]['id']}",
headers=headers,
timeout=30,
)
msg_r.raise_for_status()
msg = msg_r.json().get("data", {})
except httpx.HTTPError as exc:
print(f"mail fetch error: {exc.__class__.__name__}")
time.sleep(poll)
continue
text = msg.get("text", "") or ""
html_list = msg.get("html", [])
if isinstance(html_list, list):
html = "\n".join(html_list)
else:
html = html_list
body = text or html or ""
codes = re.findall(r"\b(\d{6})\b", body)
if codes:
return codes[0]
time.sleep(poll)
raise TimeoutError(f"No OTP received within {timeout}s")
def get_mail_client(provider: str = "vmail") -> BaseMailClient:
"""Get mail client by provider name."""
if provider == "mailtm":
return MailTMClient()
if provider == "yyds":
return YYDSMailClient()
return VMailClient() # default to vmail
# Default instance
MailClient = get_mail_client("vmail")