| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602 |
- """
- 语音播放模块
- 接收业务平台的语音播放指令,调用 TTS 服务生成语音并通过喇叭播放
- """
- import os
- import time
- import json
- import threading
- import queue
- import requests
- import subprocess
- import tempfile
- from typing import Optional, Dict, Any, List
- from dataclasses import dataclass
- from enum import Enum
class VoicePriority(Enum):
    """Urgency levels for a voice announcement; a larger value is more urgent."""

    # Routine notifications.
    LOW = 1
    # Default level for ordinary announcements.
    NORMAL = 2
    # Urgent alarms.
    HIGH = 3
    # Top level: played right away, interrupting whatever is playing.
    URGENT = 4
@dataclass
class VoiceCommand:
    """One request to speak a piece of text.

    ``timestamp`` is filled in with the creation time when it is left at
    its default of ``0.0``.
    """

    # Text to be spoken.
    text: str
    # Urgency of the announcement.
    priority: VoicePriority = VoicePriority.NORMAL
    # Speaking-rate factor.
    speed: float = 1.0
    # Volume factor.
    volume: float = 1.0
    # Voice identifier; empty means "use the backend default".
    voice_id: str = ""
    # How many times the clip is played.
    repeat: int = 1
    # Pause between repeats, in seconds.
    interval: float = 0.5
    # Where the command came from (e.g. the business platform).
    source: str = ""
    # Creation time as seconds since the epoch.
    timestamp: float = 0.0

    def __post_init__(self):
        # 0.0 is the "not provided" marker: stamp the command with "now".
        timestamp_unset = self.timestamp == 0.0
        if timestamp_unset:
            self.timestamp = time.time()
class TTSService:
    """Text-to-speech front end that dispatches to one of several backends.

    The backend is selected by the ``service_type`` config key:
    ``'api'`` (HTTP endpoint), ``'edge-tts'`` (the ``edge_tts`` package,
    the default), ``'piper'`` (the ``piper`` command-line tool) or
    ``'local'`` (a user-supplied command template).
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Read the configuration and make sure the cache directory exists.

        Args:
            config: Settings dictionary. Recognised keys are
                ``service_type``, ``api_url``, ``api_key``,
                ``local_command``, ``edge_voice``, ``cache_dir``,
                ``cache_enabled`` and ``piper_model``.
        """
        self.config = config or {}

        # Backend selector: 'api', 'local', 'edge-tts' or 'piper'.
        self.service_type = self.config.get('service_type', 'edge-tts')

        # HTTP API backend settings.
        self.api_url = self.config.get('api_url', '')
        self.api_key = self.config.get('api_key', '')

        # Command template for the 'local' backend.
        self.local_command = self.config.get('local_command', '')

        # Default voice for edge-tts (also the fallback voice sent to the API).
        self.edge_voice = self.config.get('edge_voice', 'zh-CN-XiaoxiaoNeural')

        # Directory that receives the generated audio files.
        self.cache_dir = self.config.get('cache_dir', '/tmp/tts_cache')
        os.makedirs(self.cache_dir, exist_ok=True)

        # In-memory map from cache key to the audio file generated for it.
        self.voice_cache = {}
        self.cache_enabled = self.config.get('cache_enabled', True)

    def synthesize(self, text: str, output_path: str = None,
                   speed: float = 1.0, volume: float = 1.0,
                   voice_id: str = "") -> Optional[str]:
        """Synthesize ``text`` into an audio file.

        Args:
            text: Text to speak; empty text yields ``None``.
            output_path: Destination file. When ``None`` a unique file
                name inside ``cache_dir`` is generated.
            speed: Speaking-rate factor (1.0 = normal).
            volume: Volume factor (1.0 = normal).
            voice_id: Voice identifier; empty selects the backend default.

        Returns:
            Path of the audio file, or ``None`` on failure. With caching
            enabled, a previously generated file for the same arguments
            is returned as-is (``output_path`` is then not used).
        """
        if not text:
            return None

        cache_key = self._get_cache_key(text, speed, volume, voice_id)

        # Reuse an earlier result if its file is still on disk.
        if self.cache_enabled:
            cached_path = self.voice_cache.get(cache_key)
            if cached_path is not None and os.path.exists(cached_path):
                return cached_path

        backends = {
            'api': self._synthesize_api,
            'edge-tts': self._synthesize_edge_tts,
            'piper': self._synthesize_piper,
            'local': self._synthesize_local,
        }
        backend = backends.get(self.service_type)
        if backend is None:
            print(f"未知的 TTS 服务类型: {self.service_type}")
            return None

        if output_path is None:
            # The thread id keeps names distinct when two threads synthesize
            # within the same millisecond.
            file_name = f"tts_{int(time.time() * 1000)}_{threading.get_ident()}.mp3"
            output_path = os.path.join(self.cache_dir, file_name)

        succeeded = backend(text, output_path, speed, volume, voice_id)
        if not (succeeded and os.path.exists(output_path)):
            return None

        if self.cache_enabled:
            self.voice_cache[cache_key] = output_path
        return output_path

    def _get_cache_key(self, text: str, speed: float, volume: float, voice_id: str) -> str:
        """Build the cache key identifying one synthesis request."""
        return f"{text}_{speed}_{volume}_{voice_id}"

    @staticmethod
    def _format_percent(factor: float) -> str:
        """Convert a multiplier into a signed percentage offset string.

        edge-tts only accepts explicitly signed values, so 1.0 must be
        rendered as ``"+0%"`` rather than ``"0%"``. Rounding (instead of
        truncating) keeps e.g. 1.2 at ``"+20%"`` despite float error.
        """
        return f"{int(round((factor - 1) * 100)):+d}%"

    def _synthesize_api(self, text: str, output_path: str,
                        speed: float, volume: float, voice_id: str) -> bool:
        """Synthesize through the HTTP API and save the response body.

        The request is a JSON POST to ``api_url``; a 200 response body is
        written to ``output_path`` unchanged (it is assumed to be audio).

        Returns:
            ``True`` when a 200 response was written to disk.
        """
        try:
            headers = {'Content-Type': 'application/json'}
            if self.api_key:
                headers['Authorization'] = f'Bearer {self.api_key}'

            payload = {
                'text': text,
                'speed': speed,
                'volume': volume,
                'voice_id': voice_id or self.edge_voice
            }

            response = requests.post(
                self.api_url,
                headers=headers,
                json=payload,
                timeout=30
            )

            if response.status_code != 200:
                print(f"TTS API 错误: {response.status_code}")
                return False

            with open(output_path, 'wb') as audio_file:
                audio_file.write(response.content)
            return True

        except Exception as e:
            print(f"TTS API 调用失败: {e}")
            return False

    def _synthesize_edge_tts(self, text: str, output_path: str,
                             speed: float, volume: float, voice_id: str) -> bool:
        """Synthesize with the ``edge_tts`` package.

        Returns:
            ``True`` when the output file exists afterwards; ``False`` if
            the package is missing or synthesis raised.
        """
        try:
            import asyncio

            import edge_tts

            communicate = edge_tts.Communicate(
                text,
                voice_id or self.edge_voice,
                rate=self._format_percent(speed),
                volume=self._format_percent(volume)
            )

            # save() is a coroutine; run it to completion on a fresh loop.
            asyncio.run(communicate.save(output_path))

            return os.path.exists(output_path)

        except ImportError:
            print("未安装 edge-tts,请运行: pip install edge-tts")
            return False
        except Exception as e:
            print(f"edge-tts 合成失败: {e}")
            return False

    def _synthesize_piper(self, text: str, output_path: str,
                          speed: float, volume: float, voice_id: str) -> bool:
        """Synthesize by piping the text into the ``piper`` CLI.

        ``speed`` and ``volume`` are accepted for signature parity with
        the other backends but are not passed to piper.

        Returns:
            ``True`` when piper exited with 0 and the output file exists.
        """
        try:
            model = voice_id or self.config.get('piper_model', 'zh_CN-huayan-medium')

            cmd = [
                'piper',
                '--model', model,
                '--output_file', output_path
            ]

            process = subprocess.Popen(
                cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )

            # The text is fed on stdin; communicate() also drains the pipes.
            _, stderr = process.communicate(input=text.encode('utf-8'))

            if process.returncode != 0:
                # errors='replace' so undecodable output cannot mask the error.
                message = stderr.decode('utf-8', errors='replace')
                print(f"piper 错误: {message}")
                return False

            return os.path.exists(output_path)

        except FileNotFoundError:
            print("未找到 piper 命令")
            return False
        except Exception as e:
            print(f"piper 合成失败: {e}")
            return False

    def _synthesize_local(self, text: str, output_path: str,
                          speed: float, volume: float, voice_id: str) -> bool:
        """Synthesize by running the configured ``local_command`` template.

        The template is expanded with ``str.format`` using the fields
        ``text``, ``output``, ``speed`` and ``volume`` and executed through
        the shell with a 30 second timeout. ``voice_id`` is not used.

        Returns:
            ``True`` when the command exited with 0 and the output file
            exists.
        """
        try:
            # NOTE(review): the expanded string is run with shell=True, so
            # shell metacharacters inside `text` are interpreted by the
            # shell. If `text` can come from an untrusted source, switch to
            # an argument list with shell=False (or quote the values) once
            # the deployed templates have been checked for compatibility.
            cmd = self.local_command.format(
                text=text,
                output=output_path,
                speed=speed,
                volume=volume
            )

            result = subprocess.run(
                cmd,
                shell=True,
                capture_output=True,
                timeout=30
            )

            if result.returncode != 0:
                # errors='replace' so undecodable output cannot mask the error.
                message = result.stderr.decode('utf-8', errors='replace')
                print(f"本地命令错误: {message}")
                return False

            return os.path.exists(output_path)

        except Exception as e:
            print(f"本地命令执行失败: {e}")
            return False
class AudioPlayer:
    """Plays audio files by spawning an external command-line player."""

    # Players probed, in order of preference, when none is configured.
    _KNOWN_PLAYERS = ('mpg123', 'aplay', 'ffplay', 'afplay')

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Set up the player.

        Args:
            config: Settings dictionary. ``player_command`` names the
                player executable (auto-detected when the key is absent);
                ``volume`` is the default volume factor (1.0 when absent).
        """
        self.config = config or {}

        # Only probe the system when the caller did not choose a player;
        # probing is needless work otherwise.
        if 'player_command' in self.config:
            self.player_command = self.config['player_command']
        else:
            self.player_command = self._detect_player()

        # Default volume factor used when play() is not given one.
        self.volume = self.config.get('volume', 1.0)

        # Playback state: whether play() is running and the child it spawned.
        self.playing = False
        self.current_process = None

    def _detect_player(self) -> str:
        """Return the first known player found on PATH, or ``'mpg123'``."""
        import shutil

        for player in self._KNOWN_PLAYERS:
            if shutil.which(player):
                return player

        return 'mpg123'  # fallback when nothing was found

    def _build_command(self, audio_path: str, vol: float) -> List[str]:
        """Build the argument list for the configured player."""
        player = self.player_command
        if player == 'mpg123':
            return ['mpg123', '-g', str(int(vol * 100)), audio_path]
        if player == 'aplay':
            # NOTE(review): the original comment says aplay only supports WAV
            # and needs a conversion, but no conversion is performed here.
            return ['aplay', audio_path]
        if player == 'ffplay':
            return ['ffplay', '-nodisp', '-autoexit',
                    '-volume', str(int(vol * 100)), audio_path]
        if player == 'afplay':
            return ['afplay', '-v', str(vol), audio_path]
        # Unknown player: assume it takes the file as its only argument.
        return [player, audio_path]

    def play(self, audio_path: str, volume: float = None) -> bool:
        """Play an audio file and block until the player exits.

        Args:
            audio_path: Path of the file to play.
            volume: Volume factor; ``None`` uses the configured default.

        Returns:
            ``True`` when the player exited with status 0. ``False`` when
            the file or the player is missing, the player failed, or the
            playback was terminated via ``stop()``.
        """
        if not os.path.exists(audio_path):
            print(f"音频文件不存在: {audio_path}")
            return False

        vol = volume if volume is not None else self.volume

        # Keep a local handle: stop() may reset self.current_process from
        # another thread while this call is still waiting on the child.
        process = None
        self.playing = True
        try:
            cmd = self._build_command(audio_path, vol)

            # The player's output is never read, so it must not go to a
            # pipe: a chatty player would fill the pipe buffer and block,
            # leaving wait() hanging forever.
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL
            )
            self.current_process = process

            process.wait()
            return process.returncode == 0

        except FileNotFoundError:
            print(f"播放器未找到: {self.player_command}")
            return False
        except Exception as e:
            print(f"播放失败: {e}")
            return False
        finally:
            # Reset the state on every exit path, including a missing player.
            self.playing = False
            if self.current_process is process:
                self.current_process = None

    def stop(self):
        """Terminate the player process of the current playback, if any."""
        process = self.current_process
        if process is not None:
            process.terminate()
            self.current_process = None
        self.playing = False

    def play_async(self, audio_path: str, volume: float = None,
                   callback: callable = None) -> threading.Thread:
        """Play an audio file on a background daemon thread.

        Args:
            audio_path: Path of the file to play.
            volume: Volume factor; ``None`` uses the configured default.
            callback: Optional callable invoked with the boolean result of
                ``play()`` once playback has finished.

        Returns:
            The already started playback thread.
        """
        def _play():
            success = self.play(audio_path, volume)
            if callback:
                callback(success)

        thread = threading.Thread(target=_play, daemon=True)
        thread.start()
        return thread
class VoiceAnnouncer:
    """Announcer combining TTS synthesis and audio playback.

    Non-urgent announcements are queued and played by a background worker
    thread in priority order (first-in first-out within one priority).
    URGENT announcements bypass the queue and pre-empt the current clip.
    """

    def __init__(self, tts_config: Optional[Dict[str, Any]] = None,
                 player_config: Optional[Dict[str, Any]] = None):
        """Create the TTS service, the player and the empty queue.

        Args:
            tts_config: Configuration passed to ``TTSService``.
            player_config: Configuration passed to ``AudioPlayer``.
        """
        self.tts = TTSService(tts_config)
        self.player = AudioPlayer(player_config)

        # Pending commands, stored as (-priority, sequence, command).
        self.queue = queue.PriorityQueue()
        # Monotonic tie-breaker; guarded by stats_lock.
        self._sequence = 0

        # Held while a clip is playing so that the worker and an urgent
        # announcement never play at the same time.
        self._playback_lock = threading.Lock()

        # Worker state.
        self.running = False
        self.worker_thread = None

        # Counters; guarded by stats_lock.
        self.stats = {
            'total_commands': 0,
            'played_commands': 0,
            'failed_commands': 0
        }
        self.stats_lock = threading.Lock()

    def start(self):
        """Start the background worker thread (no-op if already running)."""
        if self.running:
            return

        self.running = True
        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
        self.worker_thread.start()
        print("语音播报器已启动")

    def stop(self):
        """Stop the worker, interrupt playback and wait up to 3 s for it."""
        self.running = False
        self.player.stop()
        if self.worker_thread:
            self.worker_thread.join(timeout=3)
        print("语音播报器已停止")

    def announce(self, text: str, priority: VoicePriority = VoicePriority.NORMAL,
                 speed: float = 1.0, volume: float = 1.0, repeat: int = 1) -> bool:
        """Announce a piece of text.

        URGENT announcements are synthesized and played synchronously on
        the calling thread; everything else is queued for the worker.

        Args:
            text: Text to announce.
            priority: Urgency of the announcement.
            speed: Speaking-rate factor.
            volume: Volume factor.
            repeat: Number of times the clip is played.

        Returns:
            ``False`` for empty text, otherwise ``True`` (the command was
            queued, or the urgent playback was attempted).
        """
        if not text:
            return False

        if priority == VoicePriority.URGENT:
            with self.stats_lock:
                self.stats['total_commands'] += 1
            played = self._play_immediately(text, speed, volume, repeat)
            with self.stats_lock:
                outcome = 'played_commands' if played else 'failed_commands'
                self.stats[outcome] += 1
            return True

        command = VoiceCommand(
            text=text,
            priority=priority,
            speed=speed,
            volume=volume,
            repeat=repeat
        )

        with self.stats_lock:
            self.stats['total_commands'] += 1
            sequence = self._sequence
            self._sequence += 1

        # PriorityQueue pops the smallest entry first, so the priority is
        # negated. The unique sequence number keeps FIFO order within one
        # priority and guarantees the comparison never reaches the
        # (unorderable) VoiceCommand, which a timestamp tie would do.
        self.queue.put((-priority.value, sequence, command))

        return True

    def announce_violation(self, description: str, urgent: bool = False):
        """Announce a safety violation three times.

        Args:
            description: Description of the violation.
            urgent: Use URGENT priority instead of HIGH.
        """
        text = f"警告:{description},请立即整改"
        priority = VoicePriority.URGENT if urgent else VoicePriority.HIGH
        self.announce(text, priority=priority, repeat=3)

    def announce_safe(self):
        """Announce the low-priority "all safe" message."""
        text = "安全装备齐全,请继续保持"
        self.announce(text, priority=VoicePriority.LOW)

    def _worker(self):
        """Worker loop: pop queued commands and play them until stopped."""
        while self.running:
            try:
                # Poll with a timeout so a stop request is noticed promptly.
                try:
                    _, _, command = self.queue.get(timeout=1.0)
                except queue.Empty:
                    continue

                success = self._play_command(command)

                with self.stats_lock:
                    if success:
                        self.stats['played_commands'] += 1
                    else:
                        self.stats['failed_commands'] += 1

            except Exception as e:
                # Keep the worker alive whatever a single command does.
                print(f"播报错误: {e}")

    def _play_immediately(self, text: str, speed: float, volume: float, repeat: int) -> bool:
        """Synthesize and play an urgent message, pre-empting other audio.

        Returns:
            ``True`` when every repetition played successfully, ``False``
            when synthesis failed or a playback failed or was interrupted.
        """
        # Synthesize first so the current clip is only cut off once the
        # urgent audio is actually ready to play.
        audio_path = self.tts.synthesize(text, speed=speed, volume=volume)
        if not audio_path:
            return False

        # Interrupt whoever is playing and keep doing so until the playback
        # lock is ours; this closes the window in which the worker could
        # start its next clip right after being interrupted.
        self.player.stop()
        while not self._playback_lock.acquire(timeout=0.05):
            self.player.stop()

        try:
            for i in range(repeat):
                if not self.player.play(audio_path, volume):
                    return False
                if i < repeat - 1:
                    time.sleep(0.5)
            return True
        finally:
            self._playback_lock.release()

    def _play_command(self, command: VoiceCommand) -> bool:
        """Synthesize and play one queued command.

        Returns:
            ``False`` when synthesis or any playback fails (including a
            playback interrupted by an urgent announcement), else ``True``.
        """
        audio_path = self.tts.synthesize(
            command.text,
            speed=command.speed,
            volume=command.volume,
            voice_id=command.voice_id
        )

        if not audio_path:
            return False

        for i in range(command.repeat):
            if not self.running:
                break

            # One clip at a time: never overlap with an urgent announcement.
            with self._playback_lock:
                success = self.player.play(audio_path, command.volume)
            if not success:
                return False

            if i < command.repeat - 1:
                time.sleep(command.interval)

        return True

    def get_stats(self) -> Dict[str, int]:
        """Return a snapshot copy of the counters."""
        with self.stats_lock:
            return self.stats.copy()

    def clear_queue(self):
        """Discard every command that is still waiting in the queue."""
        while True:
            try:
                self.queue.get_nowait()
            except queue.Empty:
                break
def create_voice_announcer(config: Dict[str, Any] = None) -> VoiceAnnouncer:
    """Build a ``VoiceAnnouncer`` from a combined configuration dictionary.

    Args:
        config: Dictionary whose ``'tts'`` and ``'player'`` entries hold the
            TTS and player settings; a missing entry (or a missing/empty
            ``config``) yields an empty settings dictionary.

    Returns:
        The newly created ``VoiceAnnouncer``.
    """
    settings = config if config else {}
    return VoiceAnnouncer(settings.get('tts', {}), settings.get('player', {}))
|