voice_announcer.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602
  1. """
  2. 语音播放模块
  3. 接收业务平台的语音播放指令,调用 TTS 服务生成语音并通过喇叭播放
  4. """
  5. import os
  6. import time
  7. import json
  8. import threading
  9. import queue
  10. import requests
  11. import subprocess
  12. import tempfile
  13. from typing import Optional, Dict, Any, List
  14. from dataclasses import dataclass
  15. from enum import Enum
  16. class VoicePriority(Enum):
  17. """语音优先级"""
  18. LOW = 1 # 低优先级(一般通知)
  19. NORMAL = 2 # 正常优先级
  20. HIGH = 3 # 高优先级(紧急告警)
  21. URGENT = 4 # 最高优先级(立即播放,打断当前)
  22. @dataclass
  23. class VoiceCommand:
  24. """语音播放指令"""
  25. text: str # 要播放的文本
  26. priority: VoicePriority = VoicePriority.NORMAL # 优先级
  27. speed: float = 1.0 # 语速
  28. volume: float = 1.0 # 音量
  29. voice_id: str = "" # 音色ID
  30. repeat: int = 1 # 重复次数
  31. interval: float = 0.5 # 重复间隔
  32. source: str = "" # 来源(业务平台等)
  33. timestamp: float = 0.0 # 时间戳
  34. def __post_init__(self):
  35. if self.timestamp == 0.0:
  36. self.timestamp = time.time()
  37. class TTSService:
  38. """
  39. TTS 服务接口
  40. 支持多种 TTS 后端
  41. """
  42. def __init__(self, config: Dict[str, Any] = None):
  43. """
  44. 初始化 TTS 服务
  45. Args:
  46. config: 配置字典
  47. """
  48. self.config = config or {}
  49. # 服务类型: 'api', 'local', 'edge-tts', 'piper'
  50. self.service_type = self.config.get('service_type', 'edge-tts')
  51. # API 配置
  52. self.api_url = self.config.get('api_url', '')
  53. self.api_key = self.config.get('api_key', '')
  54. # 本地配置
  55. self.local_command = self.config.get('local_command', '')
  56. # Edge-TTS 配置
  57. self.edge_voice = self.config.get('edge_voice', 'zh-CN-XiaoxiaoNeural')
  58. # 缓存目录
  59. self.cache_dir = self.config.get('cache_dir', '/tmp/tts_cache')
  60. os.makedirs(self.cache_dir, exist_ok=True)
  61. # 语音缓存
  62. self.voice_cache = {}
  63. self.cache_enabled = self.config.get('cache_enabled', True)
  64. def synthesize(self, text: str, output_path: str = None,
  65. speed: float = 1.0, volume: float = 1.0,
  66. voice_id: str = "") -> Optional[str]:
  67. """
  68. 合成语音
  69. Args:
  70. text: 要合成的文本
  71. output_path: 输出路径,如果为 None 则自动生成
  72. speed: 语速
  73. volume: 音量
  74. voice_id: 音色ID
  75. Returns:
  76. 生成的音频文件路径,失败返回 None
  77. """
  78. if not text:
  79. return None
  80. # 检查缓存
  81. if self.cache_enabled:
  82. cache_key = self._get_cache_key(text, speed, volume, voice_id)
  83. if cache_key in self.voice_cache:
  84. cached_path = self.voice_cache[cache_key]
  85. if os.path.exists(cached_path):
  86. return cached_path
  87. # 生成输出路径
  88. if output_path is None:
  89. output_path = os.path.join(
  90. self.cache_dir,
  91. f"tts_{int(time.time() * 1000)}.mp3"
  92. )
  93. # 根据服务类型调用不同的 TTS
  94. success = False
  95. if self.service_type == 'api':
  96. success = self._synthesize_api(text, output_path, speed, volume, voice_id)
  97. elif self.service_type == 'edge-tts':
  98. success = self._synthesize_edge_tts(text, output_path, speed, volume, voice_id)
  99. elif self.service_type == 'piper':
  100. success = self._synthesize_piper(text, output_path, speed, volume, voice_id)
  101. elif self.service_type == 'local':
  102. success = self._synthesize_local(text, output_path, speed, volume, voice_id)
  103. else:
  104. print(f"未知的 TTS 服务类型: {self.service_type}")
  105. return None
  106. if success and os.path.exists(output_path):
  107. # 缓存
  108. if self.cache_enabled:
  109. self.voice_cache[cache_key] = output_path
  110. return output_path
  111. return None
  112. def _get_cache_key(self, text: str, speed: float, volume: float, voice_id: str) -> str:
  113. """生成缓存键"""
  114. return f"{text}_{speed}_{volume}_{voice_id}"
  115. def _synthesize_api(self, text: str, output_path: str,
  116. speed: float, volume: float, voice_id: str) -> bool:
  117. """使用 API 合成语音"""
  118. try:
  119. headers = {'Content-Type': 'application/json'}
  120. if self.api_key:
  121. headers['Authorization'] = f'Bearer {self.api_key}'
  122. data = {
  123. 'text': text,
  124. 'speed': speed,
  125. 'volume': volume,
  126. 'voice_id': voice_id or self.edge_voice
  127. }
  128. response = requests.post(
  129. self.api_url,
  130. headers=headers,
  131. json=data,
  132. timeout=30
  133. )
  134. if response.status_code == 200:
  135. # 假设返回音频数据
  136. with open(output_path, 'wb') as f:
  137. f.write(response.content)
  138. return True
  139. else:
  140. print(f"TTS API 错误: {response.status_code}")
  141. return False
  142. except Exception as e:
  143. print(f"TTS API 调用失败: {e}")
  144. return False
  145. def _synthesize_edge_tts(self, text: str, output_path: str,
  146. speed: float, volume: float, voice_id: str) -> bool:
  147. """使用 edge-tts 合成语音"""
  148. try:
  149. import edge_tts
  150. voice = voice_id or self.edge_voice
  151. # 语速和音量参数
  152. rate = f"+{int((speed - 1) * 100)}%" if speed > 1 else f"{int((speed - 1) * 100)}%"
  153. volume_str = f"+{int((volume - 1) * 100)}%" if volume > 1 else f"{int((volume - 1) * 100)}%"
  154. communicate = edge_tts.Communicate(
  155. text,
  156. voice,
  157. rate=rate,
  158. volume=volume_str
  159. )
  160. # 异步保存
  161. import asyncio
  162. async def save():
  163. await communicate.save(output_path)
  164. asyncio.run(save())
  165. return os.path.exists(output_path)
  166. except ImportError:
  167. print("未安装 edge-tts,请运行: pip install edge-tts")
  168. return False
  169. except Exception as e:
  170. print(f"edge-tts 合成失败: {e}")
  171. return False
  172. def _synthesize_piper(self, text: str, output_path: str,
  173. speed: float, volume: float, voice_id: str) -> bool:
  174. """使用 piper 合成语音"""
  175. try:
  176. # piper 命令行调用
  177. model = voice_id or self.config.get('piper_model', 'zh_CN-huayan-medium')
  178. cmd = [
  179. 'piper',
  180. '--model', model,
  181. '--output_file', output_path
  182. ]
  183. process = subprocess.Popen(
  184. cmd,
  185. stdin=subprocess.PIPE,
  186. stdout=subprocess.PIPE,
  187. stderr=subprocess.PIPE
  188. )
  189. stdout, stderr = process.communicate(input=text.encode('utf-8'))
  190. if process.returncode == 0:
  191. return os.path.exists(output_path)
  192. else:
  193. print(f"piper 错误: {stderr.decode('utf-8')}")
  194. return False
  195. except FileNotFoundError:
  196. print("未找到 piper 命令")
  197. return False
  198. except Exception as e:
  199. print(f"piper 合成失败: {e}")
  200. return False
  201. def _synthesize_local(self, text: str, output_path: str,
  202. speed: float, volume: float, voice_id: str) -> bool:
  203. """使用本地命令合成语音"""
  204. try:
  205. cmd = self.local_command.format(
  206. text=text,
  207. output=output_path,
  208. speed=speed,
  209. volume=volume
  210. )
  211. result = subprocess.run(
  212. cmd,
  213. shell=True,
  214. capture_output=True,
  215. timeout=30
  216. )
  217. if result.returncode == 0:
  218. return os.path.exists(output_path)
  219. else:
  220. print(f"本地命令错误: {result.stderr.decode('utf-8')}")
  221. return False
  222. except Exception as e:
  223. print(f"本地命令执行失败: {e}")
  224. return False
  225. class AudioPlayer:
  226. """
  227. 音频播放器
  228. 使用系统音频设备播放音频
  229. """
  230. def __init__(self, config: Dict[str, Any] = None):
  231. """
  232. 初始化播放器
  233. Args:
  234. config: 配置字典
  235. """
  236. self.config = config or {}
  237. # 播放命令
  238. # Linux: 'aplay', 'mpg123', 'ffplay'
  239. # macOS: 'afplay'
  240. # Windows: 'cmdmp3'
  241. self.player_command = self.config.get('player_command', self._detect_player())
  242. # 音量控制
  243. self.volume = self.config.get('volume', 1.0)
  244. # 播放状态
  245. self.playing = False
  246. self.current_process = None
  247. def _detect_player(self) -> str:
  248. """检测可用的播放器"""
  249. players = ['mpg123', 'aplay', 'ffplay', 'afplay']
  250. for player in players:
  251. try:
  252. subprocess.run(
  253. ['which', player],
  254. capture_output=True,
  255. check=True
  256. )
  257. return player
  258. except:
  259. continue
  260. return 'mpg123' # 默认
  261. def play(self, audio_path: str, volume: float = None) -> bool:
  262. """
  263. 播放音频文件
  264. Args:
  265. audio_path: 音频文件路径
  266. volume: 音量 (覆盖默认值)
  267. Returns:
  268. 是否成功
  269. """
  270. if not os.path.exists(audio_path):
  271. print(f"音频文件不存在: {audio_path}")
  272. return False
  273. vol = volume if volume is not None else self.volume
  274. try:
  275. self.playing = True
  276. # 根据播放器选择命令
  277. if self.player_command == 'mpg123':
  278. cmd = ['mpg123', '-g', str(int(vol * 100)), audio_path]
  279. elif self.player_command == 'aplay':
  280. # aplay 只支持 WAV,需要转换
  281. cmd = ['aplay', audio_path]
  282. elif self.player_command == 'ffplay':
  283. cmd = ['ffplay', '-nodisp', '-autoexit', '-volume', str(int(vol * 100)), audio_path]
  284. elif self.player_command == 'afplay':
  285. cmd = ['afplay', '-v', str(vol), audio_path]
  286. else:
  287. cmd = [self.player_command, audio_path]
  288. self.current_process = subprocess.Popen(
  289. cmd,
  290. stdout=subprocess.PIPE,
  291. stderr=subprocess.PIPE
  292. )
  293. # 等待播放完成
  294. self.current_process.wait()
  295. self.playing = False
  296. return self.current_process.returncode == 0
  297. except FileNotFoundError:
  298. print(f"播放器未找到: {self.player_command}")
  299. return False
  300. except Exception as e:
  301. print(f"播放失败: {e}")
  302. self.playing = False
  303. return False
  304. def stop(self):
  305. """停止播放"""
  306. if self.current_process:
  307. self.current_process.terminate()
  308. self.current_process = None
  309. self.playing = False
  310. def play_async(self, audio_path: str, volume: float = None,
  311. callback: callable = None) -> threading.Thread:
  312. """
  313. 异步播放
  314. Args:
  315. audio_path: 音频文件路径
  316. volume: 音量
  317. callback: 播放完成回调
  318. Returns:
  319. 播放线程
  320. """
  321. def _play():
  322. success = self.play(audio_path, volume)
  323. if callback:
  324. callback(success)
  325. thread = threading.Thread(target=_play, daemon=True)
  326. thread.start()
  327. return thread
  328. class VoiceAnnouncer:
  329. """
  330. 语音播报器
  331. 整合 TTS 和音频播放,支持队列播放和优先级管理
  332. """
  333. def __init__(self, tts_config: Dict[str, Any] = None,
  334. player_config: Dict[str, Any] = None):
  335. """
  336. 初始化语音播报器
  337. Args:
  338. tts_config: TTS 配置
  339. player_config: 播放器配置
  340. """
  341. self.tts = TTSService(tts_config)
  342. self.player = AudioPlayer(player_config)
  343. # 播放队列
  344. self.queue = queue.PriorityQueue()
  345. # 运行状态
  346. self.running = False
  347. self.worker_thread = None
  348. # 统计
  349. self.stats = {
  350. 'total_commands': 0,
  351. 'played_commands': 0,
  352. 'failed_commands': 0
  353. }
  354. self.stats_lock = threading.Lock()
  355. def start(self):
  356. """启动播报器"""
  357. if self.running:
  358. return
  359. self.running = True
  360. self.worker_thread = threading.Thread(target=self._worker, daemon=True)
  361. self.worker_thread.start()
  362. print("语音播报器已启动")
  363. def stop(self):
  364. """停止播报器"""
  365. self.running = False
  366. self.player.stop()
  367. if self.worker_thread:
  368. self.worker_thread.join(timeout=3)
  369. print("语音播报器已停止")
  370. def announce(self, text: str, priority: VoicePriority = VoicePriority.NORMAL,
  371. speed: float = 1.0, volume: float = 1.0, repeat: int = 1) -> bool:
  372. """
  373. 播报语音
  374. Args:
  375. text: 要播报的文本
  376. priority: 优先级
  377. speed: 语速
  378. volume: 音量
  379. repeat: 重复次数
  380. Returns:
  381. 是否成功加入队列
  382. """
  383. if not text:
  384. return False
  385. # 如果是紧急优先级,立即播放
  386. if priority == VoicePriority.URGENT:
  387. self._play_immediately(text, speed, volume, repeat)
  388. return True
  389. # 加入队列
  390. command = VoiceCommand(
  391. text=text,
  392. priority=priority,
  393. speed=speed,
  394. volume=volume,
  395. repeat=repeat
  396. )
  397. # 优先级队列:数值越小优先级越高
  398. self.queue.put((-priority.value, time.time(), command))
  399. with self.stats_lock:
  400. self.stats['total_commands'] += 1
  401. return True
  402. def announce_violation(self, description: str, urgent: bool = False):
  403. """
  404. 播报安全违规
  405. Args:
  406. description: 违规描述
  407. urgent: 是否紧急
  408. """
  409. text = f"警告:{description},请立即整改"
  410. priority = VoicePriority.URGENT if urgent else VoicePriority.HIGH
  411. self.announce(text, priority=priority, repeat=3)
  412. def announce_safe(self):
  413. """播报安全提示"""
  414. text = "安全装备齐全,请继续保持"
  415. self.announce(text, priority=VoicePriority.LOW)
  416. def _worker(self):
  417. """工作线程"""
  418. while self.running:
  419. try:
  420. # 获取命令
  421. try:
  422. _, _, command = self.queue.get(timeout=1.0)
  423. except queue.Empty:
  424. continue
  425. # 播放
  426. success = self._play_command(command)
  427. with self.stats_lock:
  428. if success:
  429. self.stats['played_commands'] += 1
  430. else:
  431. self.stats['failed_commands'] += 1
  432. except Exception as e:
  433. print(f"播报错误: {e}")
  434. def _play_immediately(self, text: str, speed: float, volume: float, repeat: int):
  435. """立即播放(紧急)"""
  436. # 停止当前播放
  437. self.player.stop()
  438. # 合成并播放
  439. audio_path = self.tts.synthesize(text, speed=speed, volume=volume)
  440. if audio_path:
  441. for _ in range(repeat):
  442. self.player.play(audio_path, volume)
  443. time.sleep(0.5)
  444. def _play_command(self, command: VoiceCommand) -> bool:
  445. """播放命令"""
  446. audio_path = self.tts.synthesize(
  447. command.text,
  448. speed=command.speed,
  449. volume=command.volume,
  450. voice_id=command.voice_id
  451. )
  452. if not audio_path:
  453. return False
  454. for i in range(command.repeat):
  455. if not self.running:
  456. break
  457. success = self.player.play(audio_path, command.volume)
  458. if not success:
  459. return False
  460. if i < command.repeat - 1:
  461. time.sleep(command.interval)
  462. return True
  463. def get_stats(self) -> Dict[str, int]:
  464. """获取统计信息"""
  465. with self.stats_lock:
  466. return self.stats.copy()
  467. def clear_queue(self):
  468. """清空队列"""
  469. while not self.queue.empty():
  470. try:
  471. self.queue.get_nowait()
  472. except queue.Empty:
  473. break
  474. def create_voice_announcer(config: Dict[str, Any] = None) -> VoiceAnnouncer:
  475. """
  476. 创建语音播报器实例
  477. Args:
  478. config: 配置字典
  479. Returns:
  480. VoiceAnnouncer 实例
  481. """
  482. config = config or {}
  483. tts_config = config.get('tts', {})
  484. player_config = config.get('player', {})
  485. return VoiceAnnouncer(tts_config, player_config)