import serial import serial.tools.list_ports import threading import time import logging import platform import glob from dataclasses import dataclass # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger('serial_port') @dataclass class SerialConfig: """串口配置数据类""" port: str baudrate: int = 115200 bytesize: int = serial.EIGHTBITS parity: str = serial.PARITY_NONE stopbits: int = serial.STOPBITS_ONE timeout: float = 1.0 xonxoff: bool = False rtscts: bool = False dsrdtr: bool = False class SerialPort: """串口通信类,提供串口连接、读写和状态管理功能""" def __init__(self): self.ser = None self.is_connected = False self.lock = threading.RLock() # 使用可重入锁 self.read_thread = None self.stop_event = threading.Event() self.data_callback = None self.status_callback = None self.error_callback = None self.current_config = None self.reconnect_attempts = 0 self.max_reconnect_attempts = 3 def list_ports(self): """列出系统中可用的串口""" ports = [] try: # 首先尝试使用serial.tools.list_ports try: detected_ports = [port.device for port in serial.tools.list_ports.comports()] ports.extend(detected_ports) except Exception as e: logger.warning(f"使用serial.tools.list_ports失败: {str(e)}") # 根据不同平台进行补充查找 system = platform.system() if system == 'Windows': try: import winreg # 在Windows系统中读取注册表 key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, r'HARDWARE\DEVICEMAP\SERIALCOMM') i = 0 while True: try: port, value, _ = winreg.EnumValue(key, i) if value not in ports: ports.append(value) i += 1 except OSError: break except Exception as e: logger.error(f"读取Windows串口注册表失败: {str(e)}") elif system == 'Darwin': # macOS # 使用glob查找/dev/tty.*设备 darwin_ports = glob.glob('/dev/tty.*') # 过滤掉不需要的端口 for port in darwin_ports: if not ('Bluetooth' in port or 'debug' in port or 'com.apple' in port) and port not in ports: ports.append(port) elif system == 'Linux': # 使用glob查找Linux系统中的串口 linux_ports = glob.glob('/dev/ttyS*') + glob.glob('/dev/ttyUSB*') + glob.glob('/dev/ttyACM*') for port in linux_ports: if port not in ports: ports.append(port) logger.info(f"找到 {len(ports)} 个可用串口: {ports}") except Exception as e: logger.error(f"列出串口时出错: {str(e)}") return sorted(ports) # 排序返回 def connect(self, port, baudrate=9600, timeout=1, **kwargs): """连接到串口""" try: # 构建配置 config = SerialConfig( port=port, baudrate=baudrate, timeout=timeout, **kwargs ) with self.lock: if self.is_connected: self.disconnect() logger.info(f"尝试连接串口: {port}, 波特率: {baudrate}") self.ser = serial.Serial( port=config.port, baudrate=config.baudrate, bytesize=config.bytesize, parity=config.parity, stopbits=config.stopbits, timeout=config.timeout, xonxoff=config.xonxoff, rtscts=config.rtscts, dsrdtr=config.dsrdtr ) # 检查连接是否成功 if not self.ser.is_open: raise Exception("串口打开失败") self.is_connected = True self.stop_event.clear() self.current_config = config self.reconnect_attempts = 0 # 启动读取线程 self.read_thread = threading.Thread(target=self._read_loop, daemon=True) self.read_thread.start() if self.status_callback: self.status_callback(True) logger.info(f"已连接到 {port},波特率 {baudrate}") return True, f"已连接到 {port},波特率 {baudrate}" except Exception as e: error_msg = f"连接失败: {str(e)}" logger.error(error_msg) if self.status_callback: self.status_callback(False) if self.error_callback: self.error_callback(error_msg) return False, error_msg def disconnect(self): """断开串口连接""" try: with self.lock: logger.info("断开串口连接") self.stop_event.set() if self.read_thread and self.read_thread.is_alive(): self.read_thread.join(timeout=2.0) if self.read_thread.is_alive(): logger.warning("读取线程未能正常终止") if self.ser and self.ser.is_open: try: self.ser.close() except Exception as e: logger.error(f"关闭串口时出错: {str(e)}") finally: self.ser = None self.is_connected = False self.current_config = None if self.status_callback: self.status_callback(False) return True, "已断开连接" except Exception as e: error_msg = f"断开连接失败: {str(e)}" logger.error(error_msg) if self.error_callback: self.error_callback(error_msg) return False, error_msg def _read_loop(self): """读取串口数据的循环""" logger.info("启动串口读取线程") while not self.stop_event.is_set(): try: if self.ser and self.ser.is_open: # 使用in_waiting提高效率 if self.ser.in_waiting > 0: data = self.ser.read(self.ser.in_waiting) # 尝试解码,如果失败则返回原始数据 try: decoded_data = data.decode('utf-8', errors='replace').strip() if decoded_data and self.data_callback: self.data_callback(decoded_data) except Exception as e: # 对于无法解码的二进制数据,以十六进制形式返回 hex_data = data.hex() if hex_data and self.data_callback: self.data_callback(hex_data) logger.warning(f"收到无法解码的二进制数据,长度: {len(data)}字节") time.sleep(0.001) except Exception as e: error_msg = f"读取串口数据错误: {str(e)}" logger.error(error_msg) if self.error_callback: self.error_callback(error_msg) # 尝试重连 if self._should_reconnect(): logger.warning(f"尝试重连串口... (第{self.reconnect_attempts}次)") if self.current_config: # 等待一段时间后重连 time.sleep(2) self.connect( port=self.current_config.port, baudrate=self.current_config.baudrate, timeout=self.current_config.timeout, bytesize=self.current_config.bytesize, parity=self.current_config.parity, stopbits=self.current_config.stopbits, xonxoff=self.current_config.xonxoff, rtscts=self.current_config.rtscts, dsrdtr=self.current_config.dsrdtr ) break # 线程结束时清理资源 logger.info("串口读取线程结束") with self.lock: self.is_connected = False if self.ser: try: self.ser.close() except: pass self.ser = None if self.status_callback: self.status_callback(False) def send_data(self, data, encoding='utf-8'): """发送数据到串口""" try: with self.lock: if not self.is_connected or not self.ser or not self.ser.is_open: return False, "串口未连接" # 确保数据以换行符结束 if isinstance(data, str): if not data.endswith('\n'): data += '\n' bytes_data = data.encode(encoding) elif isinstance(data, bytes): if not data.endswith(b'\n'): bytes_data = data + b'\n' else: bytes_data = data else: raise TypeError("数据必须是字符串或字节类型") bytes_sent = self.ser.write(bytes_data) self.ser.flush() # 确保数据被发送 logger.debug(f"发送数据到串口: {bytes_data.hex()[:50]}... (共{bytes_sent}字节)") return True, "发送成功" except Exception as e: error_msg = f"发送失败: {str(e)}" logger.error(error_msg) if self.error_callback: self.error_callback(error_msg) return False, error_msg def set_data_callback(self, callback): """设置数据接收回调函数""" self.data_callback = callback def set_status_callback(self, callback): """设置状态变化回调函数""" self.status_callback = callback def set_error_callback(self, callback): """设置错误回调函数""" self.error_callback = callback def get_status(self): """获取当前连接状态""" with self.lock: return { 'connected': self.is_connected, 'config': self.current_config, 'has_error': self.reconnect_attempts > 0 } def _should_reconnect(self): """判断是否应该尝试重连""" self.reconnect_attempts += 1 return self.reconnect_attempts <= self.max_reconnect_attempts def flush_input(self): """清空输入缓冲区""" try: with self.lock: if self.ser and self.ser.is_open: self.ser.reset_input_buffer() return True, "输入缓冲区已清空" return False, "串口未连接" except Exception as e: error_msg = f"清空缓冲区失败: {str(e)}" logger.error(error_msg) return False, error_msg def flush_output(self): """清空输出缓冲区""" try: with self.lock: if self.ser and self.ser.is_open: self.ser.reset_output_buffer() return True, "输出缓冲区已清空" return False, "串口未连接" except Exception as e: error_msg = f"清空缓冲区失败: {str(e)}" logger.error(error_msg) return False, error_msg # 创建全局串口实例 global_serial = SerialPort()