Files
face_agent/voice_io.py
2026-03-04 15:35:57 +08:00

129 lines
3.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import shutil
import subprocess
import tempfile
from pathlib import Path
from config import ASR_LANGUAGE, TTS_VOICE
# Optional dependency: speech recognition. When the package is not
# installed, ``sr`` is bound to None so the rest of the module can test for it.
try:
    import speech_recognition as sr
except ImportError:
    sr = None

# Optional dependency: edge-tts speech synthesis. Same None-sentinel pattern.
try:
    import edge_tts
except ImportError:
    edge_tts = None
async def async_console_input(prompt: str) -> str:
    """Read one line from the console without stalling the event loop.

    The blocking built-in ``input`` is executed in a worker thread through
    ``asyncio.to_thread``; whatever it returns is handed back unchanged.

    Args:
        prompt: Text shown to the user before reading.

    Returns:
        The line entered by the user, exactly as ``input`` returned it.
    """
    entered = await asyncio.to_thread(input, prompt)
    return entered
def has_asr() -> bool:
    """Tell whether the ``speech_recognition`` module is bound (``sr`` is not None)."""
    asr_missing = sr is None
    return not asr_missing
def has_tts() -> bool:
    """Tell whether the ``edge_tts`` module is bound (``edge_tts`` is not None)."""
    tts_missing = edge_tts is None
    return not tts_missing
def find_audio_player() -> list[str] | None:
"""查找可用播放器,优先 ffplay。"""
if shutil.which("ffplay"):
return ["ffplay", "-nodisp", "-autoexit", "-loglevel", "error"]
if shutil.which("mpg123"):
return ["mpg123", "-q"]
if shutil.which("afplay"):
return ["afplay"]
return None
def _play_audio_file_blocking(audio_path: str, player_cmd: list[str]) -> bool:
    """Play an audio file with an external player and wait for it to exit.

    Args:
        audio_path: Path of the audio file; appended as the last argument.
        player_cmd: Player executable plus its options, as an argv prefix.

    Returns:
        True when the player exited with status 0; False when it could not
        be launched or exited with a non-zero status.
    """
    try:
        subprocess.run(
            [*player_cmd, audio_path],
            check=True,
            # Detach stdin so the player can never compete with the console
            # prompt for keyboard input.
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        # OSError: executable missing or not runnable.
        # SubprocessError: non-zero exit status (check=True).
        # ValueError: invalid argument such as an embedded NUL byte.
        # Anything else is a programming error and is allowed to surface.
        return False
    return True
async def async_speak(text: str) -> bool:
    """Synthesize *text* with edge-tts and play it through a local player.

    The speech is rendered with the voice configured as ``TTS_VOICE`` into a
    temporary ``.mp3`` file, played in a worker thread, and the file is
    removed afterwards regardless of the outcome.

    Args:
        text: The sentence to speak.

    Returns:
        The playback result when synthesis and playback both ran; False when
        *text* is empty, edge-tts is unavailable, no audio player was found,
        or synthesis/playback raised an exception.
    """
    if edge_tts is None or not text:
        return False

    player_cmd = find_audio_player()
    if player_cmd is None:
        return False

    # delete=False: the file must outlive this handle so that the TTS engine
    # and the external player can both open it by name.
    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp_file:
        audio_path = tmp_file.name

    played = False
    try:
        tts = edge_tts.Communicate(text=text, voice=TTS_VOICE)
        await tts.save(audio_path)
        played = await asyncio.to_thread(
            _play_audio_file_blocking, audio_path, player_cmd
        )
    except Exception:
        played = False
    finally:
        # Best-effort cleanup; a failed delete must not mask the result.
        try:
            Path(audio_path).unlink(missing_ok=True)
        except Exception:
            pass
    return played
def _listen_once_blocking(
    language: str = ASR_LANGUAGE,
    timeout: int = 8,
    phrase_time_limit: int = 20,
) -> str:
    """Capture one utterance from the microphone and return its transcript.

    This call blocks: it opens the microphone, calibrates against ambient
    noise, prompts the user, records one phrase and sends it to the Google
    recognizer.

    Args:
        language: Language tag forwarded to ``recognize_google``.
        timeout: Forwarded to ``Recognizer.listen`` as ``timeout``.
        phrase_time_limit: Forwarded to ``Recognizer.listen`` as
            ``phrase_time_limit``.

    Returns:
        The recognized text with surrounding whitespace stripped.

    Raises:
        RuntimeError: If ``speech_recognition`` is not installed.
        Exception: Errors raised by the microphone or the recognizer are
            propagated unchanged.
    """
    if sr is None:
        raise RuntimeError("缺少 speech_recognition 依赖")

    recognizer = sr.Recognizer()
    with sr.Microphone(sample_rate=16000) as source:
        # Calibrate BEFORE prompting: the calibration window should contain
        # only background noise. Prompting first invites the user to start
        # talking during calibration, which skews the energy threshold and
        # drops the beginning of the phrase.
        recognizer.adjust_for_ambient_noise(source, duration=0.4)
        print(">>>>>> 🎤 请说话... <<<<<<")
        audio = recognizer.listen(
            source,
            timeout=timeout,
            phrase_time_limit=phrase_time_limit,
        )
    # The recording is complete, so recognition runs with the microphone
    # already released.
    return recognizer.recognize_google(audio, language=language).strip()
async def _async_listen_once() -> str:
    """Run one blocking microphone recognition in a worker thread.

    Returns:
        The text returned by ``_listen_once_blocking`` with its defaults.
    """
    transcript = await asyncio.to_thread(_listen_once_blocking)
    return transcript
async def get_user_input(io_mode: str) -> str:
    """Unified entry point for reading one turn of user input.

    Modes:
        - ``"text"``: plain keyboard input.
        - any other value (voice): the user is prompted first; typed text is
          used as-is, while an empty line triggers one round of microphone
          recognition.

    Args:
        io_mode: Input mode selector, see above.

    Returns:
        The stripped typed text or the recognized speech. An empty string is
        returned when speech recognition fails.
    """
    if io_mode == "text":
        return (await async_console_input("你说: ")).strip()

    typed = (await async_console_input("你说(回车=语音, 直接输入=文本): ")).strip()
    if typed:
        return typed

    try:
        spoken = await _async_listen_once()
    except Exception as e:
        # An exception raised without a message stringifies to "", which
        # would leave the notice without any reason; fall back to the
        # exception class name so the user still sees what went wrong.
        reason = str(e) or type(e).__name__
        print(f">>>>>> ⚠️ 语音识别失败:{reason} <<<<<<\n")
        return ""

    if spoken:
        print(f"[语音识别]: {spoken}")
    return spoken