129 lines
3.3 KiB
Python
129 lines
3.3 KiB
Python
import asyncio
|
||
import shutil
|
||
import subprocess
|
||
import tempfile
|
||
from pathlib import Path
|
||
|
||
from config import ASR_LANGUAGE, TTS_VOICE
|
||
|
||
try:
|
||
import speech_recognition as sr
|
||
except ImportError:
|
||
sr = None
|
||
|
||
try:
|
||
import edge_tts
|
||
except ImportError:
|
||
edge_tts = None
|
||
|
||
|
||
async def async_console_input(prompt: str) -> str:
    """Read one line from the console without blocking the event loop.

    The blocking built-in ``input`` is executed in a worker thread via
    ``asyncio.to_thread`` and its result is awaited.

    Args:
        prompt: Text passed to ``input`` as the prompt.

    Returns:
        The line returned by ``input``.
    """
    entered = await asyncio.to_thread(input, prompt)
    return entered
|
||
|
||
|
||
def has_asr() -> bool:
    """Report whether the optional ``speech_recognition`` module was imported.

    Returns:
        False when the module-level ``sr`` is ``None``, True otherwise.
    """
    if sr is None:
        return False
    return True
|
||
|
||
|
||
def has_tts() -> bool:
    """Report whether the optional ``edge_tts`` module was imported.

    Returns:
        False when the module-level ``edge_tts`` is ``None``, True otherwise.
    """
    if edge_tts is None:
        return False
    return True
|
||
|
||
|
||
def find_audio_player() -> list[str] | None:
|
||
"""查找可用播放器,优先 ffplay。"""
|
||
if shutil.which("ffplay"):
|
||
return ["ffplay", "-nodisp", "-autoexit", "-loglevel", "error"]
|
||
if shutil.which("mpg123"):
|
||
return ["mpg123", "-q"]
|
||
if shutil.which("afplay"):
|
||
return ["afplay"]
|
||
return None
|
||
|
||
|
||
def _play_audio_file_blocking(audio_path: str, player_cmd: list[str]) -> bool:
    """Play one audio file with an external player and wait for it to exit.

    Args:
        audio_path: Path of the file to play; appended to ``player_cmd`` as
            the final argument.
        player_cmd: Executable name followed by its flags.

    Returns:
        True when the player process exits with status 0. False when it exits
        with a non-zero status, cannot be started, or any other ``Exception``
        is raised while building or running the command.
    """
    try:
        # The player's own output is discarded; only the exit status matters.
        finished = subprocess.run(
            [*player_cmd, audio_path],
            check=False,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except Exception:
        return False
    return finished.returncode == 0
|
||
|
||
|
||
async def async_speak(text: str) -> bool:
    """Synthesize ``text`` with edge-tts and play it through a local player.

    The voice is taken from the ``TTS_VOICE`` configuration value. The audio
    is written to a temporary ``.mp3`` file, played in a worker thread, and
    the file is removed afterwards whether or not playback succeeded.

    Args:
        text: The text to speak.

    Returns:
        The result of the playback helper on success. False when ``text`` is
        empty, edge-tts is not installed, no audio player is found, or any
        ``Exception`` is raised during synthesis or playback.
    """
    if not text:
        return False
    if edge_tts is None:
        return False

    player = find_audio_player()
    if player is None:
        return False

    # Only a unique file name is needed: the handle is closed on leaving the
    # ``with`` block, and delete=False keeps the file on disk for later use.
    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
        mp3_path = tmp.name

    try:
        synthesizer = edge_tts.Communicate(text=text, voice=TTS_VOICE)
        await synthesizer.save(mp3_path)
        played = await asyncio.to_thread(_play_audio_file_blocking, mp3_path, player)
        return played
    except Exception:
        return False
    finally:
        # Best-effort cleanup; a failed delete must not mask the result.
        try:
            Path(mp3_path).unlink(missing_ok=True)
        except Exception:
            pass
|
||
|
||
|
||
def _listen_once_blocking(
    language: str = ASR_LANGUAGE,
    timeout: int = 8,
    phrase_time_limit: int = 20,
) -> str:
    """Capture one utterance from the microphone and return its transcript.

    This call blocks. It opens the microphone at a 16000 sample rate, runs a
    short ambient-noise adjustment, records one phrase, and transcribes it
    with the recognizer's ``recognize_google`` method.

    Args:
        language: Language code passed to ``recognize_google``.
        timeout: Forwarded to ``Recognizer.listen`` as ``timeout``.
        phrase_time_limit: Forwarded to ``Recognizer.listen`` as
            ``phrase_time_limit``.

    Returns:
        The recognized text with surrounding whitespace stripped.

    Raises:
        RuntimeError: If the ``speech_recognition`` package is not installed.
        Exceptions raised by the speech library are not caught here.
    """
    if sr is None:
        raise RuntimeError("缺少 speech_recognition 依赖")

    engine = sr.Recognizer()
    with sr.Microphone(sample_rate=16000) as mic:
        print(">>>>>> 🎤 请说话... <<<<<<")
        engine.adjust_for_ambient_noise(mic, duration=0.4)
        captured = engine.listen(
            mic,
            timeout=timeout,
            phrase_time_limit=phrase_time_limit,
        )

    transcript = engine.recognize_google(captured, language=language)
    return transcript.strip()
|
||
|
||
|
||
async def _async_listen_once() -> str:
    """Run ``_listen_once_blocking`` with its defaults in a worker thread.

    Returns:
        The transcript produced by ``_listen_once_blocking``.
    """
    transcript = await asyncio.to_thread(_listen_once_blocking)
    return transcript
|
||
|
||
|
||
async def get_user_input(io_mode: str) -> str:
    """Single entry point for obtaining the user's next message.

    Modes:
        - ``"text"``: read a line from the console and return it stripped.
        - any other value: prompt on the console first. Typed text is
          returned as-is (stripped); an empty line starts one round of
          microphone recognition instead.

    Args:
        io_mode: Input mode selector; only ``"text"`` is compared explicitly.

    Returns:
        The typed or recognized text. An empty string is returned when
        speech recognition raises; the error is printed to the console.
    """
    if io_mode == "text":
        line = await async_console_input("你说: ")
        return line.strip()

    line = await async_console_input("你说(回车=语音, 直接输入=文本): ")
    typed = line.strip()
    if typed:
        return typed

    # Empty line: fall back to a single microphone capture.
    try:
        heard = await _async_listen_once()
    except Exception as err:
        print(f">>>>>> ⚠️ 语音识别失败:{err} <<<<<<\n")
        return ""

    if heard:
        print(f"[语音识别]: {heard}")
    return heard
|
||
|