Files
face_agent/main.py
2026-03-04 14:41:56 +08:00

346 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import json
import os
import sqlite3
import sys
from pathlib import Path
from typing import Annotated
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import TextMessage
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_ext.models.openai import _openai_client as openai_client_module
from autogen_ext.tools.mcp import StdioServerParams, mcp_server_tools
try:
import speech_recognition as sr
except ImportError:
sr = None
try:
import pyttsx3
except ImportError:
pyttsx3 = None
# Directory containing this file; the user database path below is anchored to it.
BASE_DIR = Path(__file__).resolve().parent
USER_DB_PATH = BASE_DIR / "users.db"
# Upper bound, in seconds, for one model round trip (used with asyncio.wait_for).
MODEL_CALL_TIMEOUT_SECONDS = 45
# Default language code handed to the speech recognizer.
ASR_LANGUAGE = "zh-CN"
# Model endpoint settings; each one can be overridden through the named
# environment variable, otherwise the literal default is used.
MODEL_NAME = os.getenv("VLM_MODEL", "Qwen/Qwen3-VL-8B-Instruct")
MODEL_BASE_URL = os.getenv("VLM_BASE_URL", "http://220.248.114.28:8000/v1")
MODEL_API_KEY = os.getenv("VLM_API_KEY", "EMPTY")
# Shared TTS engine instance; stays None until _init_tts_engine creates it.
_TTS_ENGINE = None
# --- Part 1: local tools (face + voice; to be wired to real hardware later) ---
def _patch_autogen_tool_schema_for_vllm() -> None:
    """Wrap the OpenAI client's ``convert_tools`` so tool schemas omit ``strict``.

    The wrapper calls the current ``convert_tools`` and removes the ``strict``
    key from every converted tool's ``function`` dict, leaving the rest of the
    tool definition intact. The intent (per the original note) is to avoid
    vLLM warnings about that field, even when ``strict`` is False.

    The wrapper is tagged with ``_strict_removed_patch`` so that calling this
    function again is a no-op.
    """
    current = openai_client_module.convert_tools
    already_patched = getattr(current, "_strict_removed_patch", False)
    if already_patched:
        return

    def _strip_strict(tools):
        schemas = current(tools)
        for schema in schemas:
            function_spec = schema.get("function")
            if isinstance(function_spec, dict) and "strict" in function_spec:
                del function_spec["strict"]
        return schemas

    # Marker checked on later calls to keep the patch idempotent.
    _strip_strict._strict_removed_patch = True
    openai_client_module.convert_tools = _strip_strict
async def _async_console_input(prompt: str) -> str:
    """Read one line from the console without blocking the event loop.

    The blocking built-in ``input`` is executed in a worker thread via
    ``asyncio.to_thread``; the line it returns is passed back unchanged.
    """
    typed_line = await asyncio.to_thread(input, prompt)
    return typed_line
def _init_tts_engine():
    """Return the shared offline TTS engine (pyttsx3), creating it on first use.

    Returns ``None`` when pyttsx3 is not installed. On first creation the
    engine is configured with the first voice whose id/name mentions Chinese
    (voice ids differ between systems, hence the fuzzy match) and a speech
    rate of 190, then cached in the module-level ``_TTS_ENGINE``.
    """
    global _TTS_ENGINE
    if _TTS_ENGINE is None and pyttsx3 is not None:
        tts = pyttsx3.init()
        chinese_markers = ("zh", "chinese", "mandarin")
        for candidate in tts.getProperty("voices"):
            label = f"{candidate.id} {candidate.name}".lower()
            if any(marker in label for marker in chinese_markers):
                tts.setProperty("voice", candidate.id)
                break
        tts.setProperty("rate", 190)
        _TTS_ENGINE = tts
    return _TTS_ENGINE
def _speak_blocking(text: str) -> bool:
    """Speak *text* synchronously through the TTS engine.

    Args:
        text: The sentence to speak. Empty/None text is not spoken.

    Returns:
        True when the text was handed to the engine and playback finished;
        False when there is nothing to say, no engine is available, or the
        engine raised while initialising or speaking.
    """
    if not text:
        return False
    try:
        engine = _init_tts_engine()
        if engine is None:
            return False
        engine.say(text)
        engine.runAndWait()
    except Exception as e:
        # TTS is best-effort: a driver failure must degrade to text-only
        # output instead of propagating and ending the interaction loop.
        print(f">>>>>> ⚠️ 语音播报失败:{e} <<<<<<\n")
        return False
    return True
async def _async_speak(text: str) -> bool:
    """Speak *text* from a worker thread so the event loop stays responsive.

    Returns whatever ``_speak_blocking`` returns for the same text.
    """
    spoken_ok = await asyncio.to_thread(_speak_blocking, text)
    return spoken_ok
def _listen_once_blocking(
    language: str = ASR_LANGUAGE,
    timeout: int = 8,
    phrase_time_limit: int = 20,
) -> str:
    """Record one utterance from the microphone and return its transcript.

    Args:
        language: Language code passed to the recognizer.
        timeout: Forwarded to ``Recognizer.listen`` as ``timeout``.
        phrase_time_limit: Forwarded to ``Recognizer.listen`` as
            ``phrase_time_limit``.

    Returns:
        The recognised text with surrounding whitespace removed.

    Raises:
        RuntimeError: If the ``speech_recognition`` package is not installed.
        Errors raised by the recognizer itself are not handled here.
    """
    if sr is None:
        raise RuntimeError("缺少 speech_recognition 依赖")

    asr = sr.Recognizer()
    with sr.Microphone(sample_rate=16000) as mic:
        print(">>>>>> 🎤 请说话... <<<<<<")
        # Brief calibration against background noise before capturing.
        asr.adjust_for_ambient_noise(mic, duration=0.4)
        captured = asr.listen(
            mic,
            timeout=timeout,
            phrase_time_limit=phrase_time_limit,
        )
    transcript = asr.recognize_google(captured, language=language)
    return transcript.strip()
async def _async_listen_once() -> str:
    """Run one microphone recognition pass in a worker thread.

    Keeps the blocking capture/recognition off the event loop and returns the
    transcript produced by ``_listen_once_blocking`` with its defaults.
    """
    transcript = await asyncio.to_thread(_listen_once_blocking)
    return transcript
async def _get_user_input(io_mode: str) -> str:
    """Single entry point for obtaining the user's next utterance.

    Modes:
    - ``"text"``: read a line from the console.
    - anything else (voice): prompt first; typed text is used as-is, while an
      empty line (just Enter) triggers one microphone recognition pass.

    Returns:
        The stripped typed text, the recognised speech, or ``""`` when speech
        recognition fails (the failure is printed, not raised).
    """
    if io_mode == "text":
        line = await _async_console_input("你说: ")
        return line.strip()

    line = await _async_console_input("你说(回车=语音, 直接输入=文本): ")
    line = line.strip()
    if line:
        return line

    try:
        heard = await _async_listen_once()
    except Exception as err:
        print(f">>>>>> ⚠️ 语音识别失败:{err} <<<<<<\n")
        return ""

    if heard:
        print(f"[语音识别]: {heard}")
    return heard
async def set_expression(
    expression: Annotated[str, "机器人要展示的表情,如:开心、疑惑、难过、待机"],
    intensity: Annotated[int, "表情强度 1-10"] = 5,
) -> str:
    """[模拟面部] 控制机器人头部的表情展示。"""
    # NOTE(review): this function is registered as an agent tool, so the
    # docstring and Annotated texts above are presumably surfaced to the model
    # as the tool description — they are kept verbatim on purpose.
    # Simulated face: the expression change is only printed to the console.
    # `intensity` is displayed as given; it is not range-checked here.
    banner = f"\n>>>>>> 🤖 表情更新: 【{expression}】 (强度: {intensity}/10) <<<<<<"
    print(banner)
    confirmation = f"已切换到【{expression}】表情。"
    return confirmation
# --- Part 2: read the user profile directly (bypassing MCP to avoid multi-round tool calls) ---
def _load_user_profile(user_name: str, db_path: str | Path = USER_DB_PATH) -> str:
    """Load a user's profile text straight from SQLite for prompt injection.

    The profile is read in Python and injected into the message context, so
    the model does not need to call a ``get_user_profile`` tool itself.

    Args:
        user_name: Value matched against ``users.name`` and
            ``preferences.user_name``.
        db_path: Path of the SQLite database file.

    Returns:
        A JSON string with the user's basic info and preferences when the user
        exists (``last_seen`` is then also updated to the current time; the
        returned value is the one read before that update); otherwise a short
        sentence saying this is the first meeting. Any error is reported as a
        sentence too — this function never raises.
    """
    try:
        conn = sqlite3.connect(db_path)
        try:
            # `with conn` only commits/rolls back the transaction; it does not
            # close the connection, hence the explicit close() in `finally`.
            with conn:
                conn.row_factory = sqlite3.Row
                user = conn.execute(
                    "SELECT * FROM users WHERE name = ?", (user_name,)
                ).fetchone()
                if not user:
                    return f"用户 {user_name} 尚无历史记录,这是第一次见面。"
                prefs = conn.execute(
                    "SELECT category, content FROM preferences WHERE user_name = ?",
                    (user_name,),
                ).fetchall()
                conn.execute(
                    "UPDATE users SET last_seen = datetime('now') WHERE name = ?",
                    (user_name,),
                )
                return json.dumps({
                    "基本信息": {"姓名": user["name"], "年龄": user["age"], "上次见面": user["last_seen"]},
                    "偏好习惯": {p["category"]: p["content"] for p in prefs},
                }, ensure_ascii=False)
        finally:
            conn.close()
    except Exception as e:
        # Deliberately best-effort: a broken/missing database must not stop
        # the conversation, so the failure is folded into the profile text.
        return f"档案读取失败({e}),当作第一次见面。"
# --- Part 3: start the brain ---
async def start_simulated_head():
    """Run the interactive console session for the simulated robot head.

    Steps:
    1. Patch the tool-schema conversion for vLLM and load the MCP tools
       (everything except ``get_user_profile``).
    2. Build the model client and the ``RobotBrain`` assistant agent.
    3. Ask for the user's name and the I/O mode (voice/text), degrading to
       text input when speech recognition is unavailable.
    4. Loop: read input, inject profile + visual state, query the agent,
       print (and in voice mode speak) the reply.

    The model client is closed on every exit path, including the early exits
    taken when the name / mode prompts are interrupted.
    """
    _patch_autogen_tool_schema_for_vllm()

    # Connect to the MCP server. Only the write-side tools are handed to the
    # model; reading the profile is done directly in Python.
    # NOTE(review): the server script path is relative to the current working
    # directory — confirm the app is always launched from the right folder.
    server_params = StdioServerParams(
        command=sys.executable,
        args=["robot_mcp_server.py"],
    )
    all_mcp_tools = await mcp_server_tools(server_params)
    # Filter out get_user_profile: the model never needs to call it itself.
    mcp_tools = [t for t in all_mcp_tools if getattr(t, "name", "") != "get_user_profile"]

    model_client = OpenAIChatCompletionClient(
        model=MODEL_NAME,
        base_url=MODEL_BASE_URL,
        api_key=MODEL_API_KEY,
        model_info={
            "vision": True,
            "function_calling": True,
            "json_output": True,
            # structured_output is off to avoid sending the `strict` field
            # (and the resulting warning) to vLLM.
            "structured_output": False,
            "family": "unknown",
            "context_length": 32000,
        }
    )

    # Everything after client creation lives inside this try so that the
    # client is closed no matter how the session ends.
    try:
        brain = AssistantAgent(
            name="RobotBrain",
            model_client=model_client,
            tools=[set_expression] + mcp_tools,
            reflect_on_tool_use=True,
            system_message="""你是一个有情感的机器人伙伴。每条消息包含:
[用户档案] 历史记录(已由系统预加载)
[视觉状态] 当前摄像头观察
[用户说] 用户这句话
回应规则:
1. 同一轮内调用set_expression表情+ 所有需要的查询工具get_weather/get_location/web_search+ 需要的用户信息工具upsert_user/set_preference
2. 工具执行完毕后,用简短、温暖、自然的语言直接回答用户——这段文字就是你的语音输出。
3. 不要说"我去查一下"之类的过渡语,直接完成任务并给出结果。""",
        )

        # --- Part 4: interaction loop ---
        print("=" * 50)
        print(" 机器人已上线!输入 'quit' 退出")
        print(f" 模型: {MODEL_NAME}")
        print(f" 服务: {MODEL_BASE_URL}")
        print("=" * 50)

        try:
            user_name = (await _async_console_input("请输入你的名字: ")).strip() or "用户"
        except (EOFError, KeyboardInterrupt):
            print("\n机器人下线,再见!")
            return

        has_asr = sr is not None
        has_tts = pyttsx3 is not None
        # Voice is the suggested default only when both ASR and TTS exist.
        if has_asr and has_tts:
            mode_tip = "voice"
        else:
            mode_tip = "text"

        try:
            io_mode = (
                await _async_console_input(
                    f"输入模式 voice/text默认 {mode_tip}: "
                )
            ).strip().lower() or mode_tip
        except (EOFError, KeyboardInterrupt):
            print("\n机器人下线,再见!")
            return

        if io_mode not in ("voice", "text"):
            io_mode = mode_tip
        if io_mode == "voice" and not has_asr:
            print(">>>>>> ⚠️ 未安装 speech_recognition已降级为文本输入。 <<<<<<")
            io_mode = "text"
        if io_mode == "voice" and not has_tts:
            print(">>>>>> ⚠️ 未安装 pyttsx3将仅文本输出不播报语音。 <<<<<<")

        print(
            "\n[语音依赖状态] "
            f"ASR={'ok' if has_asr else 'missing'}, "
            f"TTS={'ok' if has_tts else 'missing'}"
        )
        if not has_asr or not has_tts:
            print("可安装: pip install SpeechRecognition pyaudio pyttsx3")

        visual_context = "视觉输入:用户坐在电脑前,表情平静,看着屏幕。"
        print(f"\n[当前视觉状态]: {visual_context}")
        print("提示:输入 'v <描述>' 可以更新视觉状态,例如: v 用户在笑\n")

        history = []
        while True:
            try:
                user_input = await _get_user_input(io_mode)
            except (EOFError, KeyboardInterrupt):
                print("\n机器人下线,再见!")
                break

            if not user_input:
                continue
            if user_input.lower() in ("quit", "exit", "退出"):
                print("机器人下线,再见!")
                break
            if user_input.lower().startswith("v "):
                # "v <description>" replaces the simulated camera observation.
                visual_context = f"视觉输入:{user_input[2:].strip()}"
                print(f"[视觉状态已更新]: {visual_context}\n")
                continue

            # The profile is read in Python and injected into the message, so
            # the model does not have to issue an extra tool call for it.
            profile = _load_user_profile(user_name)
            combined_input = (
                f"[用户档案]\n{profile}\n\n"
                f"[视觉状态] {visual_context}\n"
                f"[用户说] {user_input}"
            )
            history.append(TextMessage(content=combined_input, source="user"))

            # Keep only the 6 most recent messages (about 3 turns) to stay
            # under the token limit. The profile is re-injected from the
            # database every turn, so long history is not needed.
            if len(history) > 6:
                history = history[-6:]

            try:
                # AssistantAgent keeps its own model context between
                # on_messages calls. Since the whole trimmed history is
                # replayed each turn, reset the agent first; otherwise earlier
                # turns are duplicated and the trimming above has no effect.
                await brain.on_reset(CancellationToken())
                response = await asyncio.wait_for(
                    brain.on_messages(history, CancellationToken()),
                    timeout=MODEL_CALL_TIMEOUT_SECONDS,
                )
            except asyncio.TimeoutError:
                print(">>>>>> ⚠️ 请求超时,请稍后重试或简化问题。 <<<<<<\n")
                continue
            except Exception as e:
                print(f">>>>>> ⚠️ 本轮处理失败:{e} <<<<<<\n")
                continue

            # The model's text reply doubles as the speech output; with
            # reflect_on_tool_use=True this is expected to be a TextMessage.
            speech = response.chat_message.content
            if speech and isinstance(speech, str):
                print(f">>>>>> 🔊 机器人说: {speech} <<<<<<\n")
                if io_mode == "voice":
                    spoken_ok = await _async_speak(speech)
                    if not spoken_ok:
                        print(">>>>>> ⚠️ TTS 不可用,当前仅文本输出。 <<<<<<\n")

            # Only the final reply goes into history; inner_messages are event
            # objects that cannot be fed back to the model.
            history.append(response.chat_message)
    finally:
        # close() is a coroutine on the model client and must be awaited,
        # otherwise the underlying HTTP client is never actually closed.
        await model_client.close()
# Script entry point: run the interactive session when executed directly.
if __name__ == "__main__":
    _session = start_simulated_head()
    asyncio.run(_session)