refactor: 模块解耦
This commit is contained in:
128
voice_io.py
Normal file
128
voice_io.py
Normal file
@@ -0,0 +1,128 @@
|
||||
import asyncio
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
from config import ASR_LANGUAGE, TTS_VOICE
|
||||
|
||||
# Optional third-party backends. Both are import-guarded so the module still
# loads without them: the sentinel stays None and has_asr()/has_tts() report
# availability accordingly.
try:
    import speech_recognition as sr  # ASR backend (SpeechRecognition package)
except ImportError:
    sr = None

try:
    import edge_tts  # TTS backend (Microsoft Edge neural voices)
except ImportError:
    edge_tts = None
|
||||
|
||||
|
||||
async def async_console_input(prompt: str) -> str:
    """Read one line from the console without blocking the event loop.

    The blocking built-in ``input`` is dispatched to a worker thread via
    ``asyncio.to_thread`` so the running loop stays responsive.
    """
    reply = await asyncio.to_thread(input, prompt)
    return reply
|
||||
|
||||
|
||||
def has_asr() -> bool:
    """Return True when the optional SpeechRecognition backend imported."""
    available = sr is not None
    return available
|
||||
|
||||
|
||||
def has_tts() -> bool:
    """Return True when the optional edge-tts backend imported."""
    available = edge_tts is not None
    return available
|
||||
|
||||
|
||||
def find_audio_player() -> list[str] | None:
|
||||
"""查找可用播放器,优先 ffplay。"""
|
||||
if shutil.which("ffplay"):
|
||||
return ["ffplay", "-nodisp", "-autoexit", "-loglevel", "error"]
|
||||
if shutil.which("mpg123"):
|
||||
return ["mpg123", "-q"]
|
||||
if shutil.which("afplay"):
|
||||
return ["afplay"]
|
||||
return None
|
||||
|
||||
|
||||
def _play_audio_file_blocking(audio_path: str, player_cmd: list[str]) -> bool:
    """Play *audio_path* with *player_cmd*, blocking until playback ends.

    Returns True on a clean exit, False on any failure (missing binary,
    non-zero exit status, ...). The player's stdout/stderr are discarded.
    """
    argv = [*player_cmd, audio_path]
    try:
        subprocess.run(
            argv,
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except Exception:
        # Best-effort playback: any error simply reports failure.
        return False
    return True
|
||||
|
||||
|
||||
async def async_speak(text: str) -> bool:
    """Synthesize *text* with edge-tts (voice from config) and play it.

    Returns True only when both synthesis and playback succeed. Empty text,
    a missing edge-tts dependency, no local audio player, or any runtime
    error yields False. The intermediate MP3 file is always removed.
    """
    if not text or edge_tts is None:
        return False

    player = find_audio_player()
    if player is None:
        return False

    # Create the temp file up front; delete=False lets an external player
    # open it by path, so cleanup is done manually in the finally block.
    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as handle:
        mp3_path = handle.name
    try:
        speaker = edge_tts.Communicate(text=text, voice=TTS_VOICE)
        await speaker.save(mp3_path)
        played = await asyncio.to_thread(_play_audio_file_blocking, mp3_path, player)
        return played
    except Exception:
        # Best-effort: speech output must never crash the caller.
        return False
    finally:
        try:
            Path(mp3_path).unlink(missing_ok=True)
        except Exception:
            pass
|
||||
|
||||
|
||||
def _listen_once_blocking(
    language: str = ASR_LANGUAGE,
    timeout: int = 8,
    phrase_time_limit: int = 20,
) -> str:
    """Capture one utterance from the microphone and return its transcript.

    Blocking call, intended to run inside a worker thread. Raises
    RuntimeError when the SpeechRecognition dependency is missing; errors
    from listening/recognition propagate from the library.
    """
    if sr is None:
        raise RuntimeError("缺少 speech_recognition 依赖")

    recognizer = sr.Recognizer()
    with sr.Microphone(sample_rate=16000) as source:
        print(">>>>>> 🎤 请说话... <<<<<<")
        # Short calibration pass so ambient noise sets the energy threshold.
        recognizer.adjust_for_ambient_noise(source, duration=0.4)
        captured = recognizer.listen(
            source, timeout=timeout, phrase_time_limit=phrase_time_limit
        )
        return recognizer.recognize_google(captured, language=language).strip()
|
||||
|
||||
|
||||
async def _async_listen_once() -> str:
    """Run the blocking microphone capture in a worker thread."""
    transcript = await asyncio.to_thread(_listen_once_blocking)
    return transcript
|
||||
|
||||
|
||||
async def get_user_input(io_mode: str) -> str:
    """Unified user-input entry point.

    - ``text`` mode: plain console input.
    - ``voice`` mode: pressing Enter triggers microphone capture, while
      typing text directly submits it as-is.

    Always returns a (possibly empty) string; recognition failures are
    reported to the console and yield "".
    """
    if io_mode == "text":
        return (await async_console_input("你说: ")).strip()

    # Voice mode still accepts typed text: non-empty input wins.
    typed = (await async_console_input("你说(回车=语音, 直接输入=文本): ")).strip()
    if typed:
        return typed

    try:
        spoken = await _async_listen_once()
    except Exception as e:
        print(f">>>>>> ⚠️ 语音识别失败:{e} <<<<<<\n")
        return ""

    if spoken:
        print(f"[语音识别]: {spoken}")
        return spoken
    # Bug fix: previously fell through and implicitly returned None when the
    # transcript was empty, violating the declared -> str contract.
    return ""
|
||||
|
||||
Reference in New Issue
Block a user