# File viewer metadata: 147 lines, 5.8 KiB, Python.
import json
import unittest
from types import SimpleNamespace
from unittest.mock import patch


class FakeResponse:
    """Minimal stand-in for an HTTP response object used by the tests.

    Exposes ``text``, ``status_code``, ``headers`` and a ``json()`` method.
    """

    def __init__(self, *, text="", status_code=200, json_data=None, headers=None):
        """Store the canned response fields.

        Args:
            text: Response body text.
            status_code: HTTP status code to report.
            json_data: Object returned by ``json()``; ``None`` means the
                response carries no JSON body.
            headers: Optional header mapping; a falsy value is replaced by a
                fresh empty dict.
        """
        self.text = text
        self.status_code = status_code
        self._json_data = json_data
        self.headers = headers or {}

    def json(self):
        """Return the canned JSON payload.

        Raises:
            ValueError: If no ``json_data`` was supplied at construction.
        """
        if self._json_data is None:
            raise ValueError("no json")
        return self._json_data


class FakeHTTP:
    """In-memory HTTP client double that records requests and serves canned replies.

    Only the three sentinel endpoints matched in ``request`` are answered;
    any other URL raises ``AssertionError``.
    """

    def __init__(self):
        """Set up the call log, a fake session, and the request-payload log."""
        # Every request is recorded as a (method, url, kwargs) tuple.
        self.calls = []
        self.device_id = "device-fallback-id"
        # Session exposing the headers and a cookie jar holding one
        # "oai-did" cookie.
        self.session = SimpleNamespace(
            headers={
                "User-Agent": "Mozilla/5.0 Test",
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            },
            cookies=SimpleNamespace(
                jar=[SimpleNamespace(name="oai-did", value="cookie-did", domain="chatgpt.com")]
            ),
        )
        # Decoded JSON bodies of every request sent to the /sentinel/req endpoint.
        self.req_payloads = []

    def request(self, method, url, **kwargs):
        """Record the call and return the canned response for a known sentinel URL.

        Args:
            method: HTTP method name; only recorded, never inspected.
            url: Target URL, matched against the known sentinel endpoints.
            **kwargs: Extra request arguments; ``data`` is decoded as JSON for
                the ``/backend-api/sentinel/req`` endpoint.

        Returns:
            A ``FakeResponse`` carrying the canned body for the matched URL.

        Raises:
            AssertionError: If the URL matches none of the known endpoints.
        """
        self.calls.append((method, url, kwargs))

        if url.endswith("/backend-api/sentinel/sdk.js"):
            return FakeResponse(
                text=(
                    "window.SentinelSDK = {};"
                    "script.src = 'https://sentinel.openai.com/sentinel/20260219f9f6/sdk.js';"
                )
            )

        if "/backend-api/sentinel/frame.html?sv=" in url:
            return FakeResponse(text="<html></html>")

        if url.endswith("/backend-api/sentinel/req"):
            data = kwargs.get("data")
            # A missing or empty body is recorded as an empty payload.
            payload = json.loads(data) if data else {}
            self.req_payloads.append(payload)
            return FakeResponse(
                json_data={
                    "token": "collector-token",
                    "proofofwork": {"required": True, "seed": "seed-1", "difficulty": "0"},
                    "turnstile": {"required": True, "dx": "encoded-dx"},
                    "so": {"required": True, "snapshot_dx": "snapshot-dx"},
                }
            )

        raise AssertionError(f"unexpected request: {method} {url}")


class SentinelSolverTests(unittest.TestCase):
    """Tests for ``SentinelSolver`` token building with subprocess and HTTP faked."""

    @staticmethod
    def _completed(result):
        """Return a successful fake ``subprocess.run`` result with *result* as JSON stdout."""
        return SimpleNamespace(returncode=0, stdout=json.dumps(result), stderr="")

    @patch("subprocess.run")
    def test_build_token_uses_enforcement_p_not_prepare_p(self, run_mock):
        """The built token carries the enforcement proof; the prepare proof goes in the req payload."""
        from src.sentinel_solver import SentinelSolver

        def fake_run(command, **kwargs):
            # The helper mode is the last element of the command; its input
            # is a JSON document passed via the ``input`` keyword.
            mode = command[-1]
            payload = json.loads(kwargs["input"])
            if mode == "prepare":
                return self._completed({"p": "prepare-proof"})
            if mode == "turnstile":
                self.assertEqual(payload["p"], "prepare-proof")
                self.assertEqual(payload["dx"], "encoded-dx")
                return self._completed({"t": "wire-turnstile"})
            if mode == "enforcement":
                self.assertEqual(payload["chat_req"]["proofofwork"]["seed"], "seed-1")
                return self._completed({"p": "enforcement-proof"})
            raise AssertionError(command)

        run_mock.side_effect = fake_run
        http = FakeHTTP()
        solver = SentinelSolver(http)

        token = solver.build_token("username_password_create")
        parsed = json.loads(token)

        self.assertEqual(
            parsed,
            {
                "p": "enforcement-proof",
                "t": "wire-turnstile",
                "c": "collector-token",
                "id": "cookie-did",
                "flow": "username_password_create",
            },
        )
        # The solver must hit sdk.js, then frame.html, then the req endpoint.
        self.assertEqual(
            http.calls[0][0:2],
            ("GET", "https://sentinel.openai.com/backend-api/sentinel/sdk.js"),
        )
        self.assertTrue(
            http.calls[1][1].startswith(
                "https://sentinel.openai.com/backend-api/sentinel/frame.html?sv="
            )
        )
        self.assertEqual(
            http.calls[2][0:2],
            ("POST", "https://sentinel.openai.com/backend-api/sentinel/req"),
        )
        self.assertEqual(http.req_payloads[-1]["p"], "prepare-proof")
        self.assertEqual(http.req_payloads[-1]["flow"], "username_password_create")

    @patch("subprocess.run")
    def test_build_session_observer_token_uses_cached_chat_req_snapshot(self, run_mock):
        """The session-observer helper receives the collector token and snapshot from the req reply."""
        from src.sentinel_solver import SentinelSolver

        def fake_run(command, **kwargs):
            mode = command[-1]
            payload = json.loads(kwargs["input"])
            if mode == "prepare":
                return self._completed({"p": "prepare-proof"})
            if mode == "turnstile":
                return self._completed({"t": "wire-turnstile"})
            if mode == "enforcement":
                return self._completed({"p": "enforcement-proof"})
            if mode == "session-observer":
                self.assertEqual(payload["flow"], "oauth_create_account")
                self.assertEqual(payload["c"], "collector-token")
                self.assertEqual(payload["snapshot_dx"], "snapshot-dx")
                return self._completed({"so": "wire-so"})
            raise AssertionError(command)

        run_mock.side_effect = fake_run
        http = FakeHTTP()
        solver = SentinelSolver(http)

        so_token = solver.build_session_observer_token("oauth_create_account")

        self.assertEqual(
            json.loads(so_token),
            {
                "so": "wire-so",
                "c": "collector-token",
                "id": "cookie-did",
                "flow": "oauth_create_account",
            },
        )
        self.assertEqual(http.req_payloads[-1]["flow"], "oauth_create_account")
        self.assertEqual(http.req_payloads[-1]["p"], "prepare-proof")


# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()