# Serial port communication helper built on pyserial.
- import serial
- import serial.tools.list_ports
- import threading
- import time
- import logging
- import platform
- import glob
- from dataclasses import dataclass
# Configure logging: INFO level with timestamp, logger name, level and message,
# then create the logger used throughout this module.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger('serial_port')
@dataclass
class SerialConfig:
    """Serial port configuration record.

    Only ``port`` is required; every other field has a default. The fields
    carry the same names as the keyword arguments of ``serial.Serial`` that
    they are passed to when a port is opened.
    """

    # Device name of the port to open.
    port: str
    # Line speed in baud.
    baudrate: int = 115200
    # Data bits per character (pyserial constant).
    bytesize: int = serial.EIGHTBITS
    # Parity mode (pyserial constant).
    parity: str = serial.PARITY_NONE
    # Stop bits (pyserial constant).
    stopbits: int = serial.STOPBITS_ONE
    # Read timeout handed to pyserial.
    timeout: float = 1.0
    # Software (XON/XOFF) flow control.
    xonxoff: bool = False
    # Hardware RTS/CTS flow control.
    rtscts: bool = False
    # Hardware DSR/DTR flow control.
    dsrdtr: bool = False
class SerialPort:
    """Serial-port session with a background reader thread.

    Wraps a pyserial ``Serial`` object and offers connect/disconnect,
    newline-terminated writes, buffer flushing and three optional callbacks:

    * ``data_callback(text)``   - received bytes, decoded as UTF-8 (invalid
      sequences replaced) and stripped of surrounding whitespace
    * ``status_callback(flag)`` - ``True`` when a port is opened, ``False``
      when the link is lost or closed
    * ``error_callback(msg)``   - human-readable error message

    ``connect``, ``disconnect``, ``send_data``, ``flush_input`` and
    ``flush_output`` never raise; they return ``(success, message)`` tuples.

    Threading model: ``self.lock`` guards the connection state. The reader
    thread needs that lock to finish, so it is never joined while the lock is
    held. After a read error the reader reopens the port itself (up to
    ``max_reconnect_attempts`` times, ``reconnect_delay`` seconds apart)
    instead of spawning a replacement thread.
    """

    def __init__(self):
        self.ser = None
        self.is_connected = False
        self.lock = threading.RLock()  # re-entrant: public methods may nest
        self.read_thread = None
        self.stop_event = threading.Event()
        self.data_callback = None
        self.status_callback = None
        self.error_callback = None
        self.current_config = None
        self.reconnect_attempts = 0
        self.max_reconnect_attempts = 3
        # Pause before each reconnect attempt, in seconds.
        self.reconnect_delay = 2.0

    # ------------------------------------------------------------------
    # Discovery
    # ------------------------------------------------------------------
    def list_ports(self):
        """Return a sorted list of serial device names found on this machine.

        pyserial's port enumeration is tried first; the result is then
        supplemented per platform (Windows registry, ``/dev/tty.*`` on macOS,
        ``/dev/ttyS*``/``ttyUSB*``/``ttyACM*`` on Linux). Errors are logged
        and whatever was collected so far is returned.
        """
        ports = []

        try:
            try:
                detected = [info.device for info in serial.tools.list_ports.comports()]
                ports.extend(detected)
            except Exception as e:
                logger.warning(f"使用serial.tools.list_ports失败: {str(e)}")

            system = platform.system()

            if system == 'Windows':
                try:
                    import winreg
                    # The context manager closes the registry handle, which
                    # would otherwise leak on every call.
                    with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE,
                                        r'HARDWARE\DEVICEMAP\SERIALCOMM') as key:
                        index = 0
                        while True:
                            try:
                                _, value, _ = winreg.EnumValue(key, index)
                            except OSError:
                                break  # no more values under this key
                            if value not in ports:
                                ports.append(value)
                            index += 1
                except Exception as e:
                    logger.error(f"读取Windows串口注册表失败: {str(e)}")

            elif system == 'Darwin':  # macOS
                excluded = ('Bluetooth', 'debug', 'com.apple')
                for device in glob.glob('/dev/tty.*'):
                    if device in ports:
                        continue
                    if any(tag in device for tag in excluded):
                        continue
                    ports.append(device)

            elif system == 'Linux':
                for pattern in ('/dev/ttyS*', '/dev/ttyUSB*', '/dev/ttyACM*'):
                    for device in glob.glob(pattern):
                        if device not in ports:
                            ports.append(device)

            logger.info(f"找到 {len(ports)} 个可用串口: {ports}")
        except Exception as e:
            logger.error(f"列出串口时出错: {str(e)}")

        return sorted(ports)

    # ------------------------------------------------------------------
    # Connection management
    # ------------------------------------------------------------------
    def connect(self, port, baudrate=9600, timeout=1, **kwargs):
        """Open ``port`` and start the background reader thread.

        Any existing session is disconnected first. Extra keyword arguments
        are forwarded to ``SerialConfig`` (bytesize, parity, stopbits,
        xonxoff, rtscts, dsrdtr).

        Returns:
            ``(True, message)`` on success, ``(False, error_message)`` on
            failure. On failure ``status_callback(False)`` and
            ``error_callback(error_message)`` are invoked.
        """
        try:
            config = SerialConfig(
                port=port,
                baudrate=baudrate,
                timeout=timeout,
                **kwargs
            )

            # Tear down the previous session BEFORE taking the lock:
            # disconnect() joins the reader thread, and that thread needs
            # the lock to finish its cleanup.
            reader = self.read_thread
            if self.is_connected or (reader is not None and reader.is_alive()):
                self.disconnect()

            with self.lock:
                logger.info(f"尝试连接串口: {port}, 波特率: {baudrate}")
                self.ser = self._open_port(config)

                self.is_connected = True
                self.stop_event.clear()
                self.current_config = config
                self.reconnect_attempts = 0

                # The reader is bound to the port it was started for, so it
                # can never close a port opened by a later connect().
                self.read_thread = threading.Thread(
                    target=self._read_loop, args=(self.ser,), daemon=True
                )
                self.read_thread.start()

            self._notify_status(True)

            logger.info(f"已连接到 {port},波特率 {baudrate}")
            return True, f"已连接到 {port},波特率 {baudrate}"

        except Exception as e:
            error_msg = f"连接失败: {str(e)}"
            logger.error(error_msg)
            if self.status_callback:
                self.status_callback(False)
            if self.error_callback:
                self.error_callback(error_msg)
            return False, error_msg

    def disconnect(self):
        """Stop the reader thread and close the port.

        Safe to call when not connected and from inside a callback running
        on the reader thread. ``status_callback(False)`` is always invoked.

        Returns:
            ``(True, message)`` on success, ``(False, error_message)`` if an
            unexpected error occurred.
        """
        try:
            logger.info("断开串口连接")
            with self.lock:
                self.stop_event.set()
                reader = self.read_thread
                self.read_thread = None

            # Join WITHOUT holding the lock (the reader's cleanup acquires
            # it) and never join the current thread.
            if (reader is not None
                    and reader is not threading.current_thread()
                    and reader.is_alive()):
                reader.join(timeout=2.0)
                if reader.is_alive():
                    logger.warning("读取线程未能正常终止")

            with self.lock:
                port, self.ser = self.ser, None
                self._close_port(port)
                self.is_connected = False
                self.current_config = None

            self._notify_status(False)
            return True, "已断开连接"

        except Exception as e:
            error_msg = f"断开连接失败: {str(e)}"
            logger.error(error_msg)
            if self.error_callback:
                self.error_callback(error_msg)
            return False, error_msg

    # ------------------------------------------------------------------
    # Reader thread
    # ------------------------------------------------------------------
    def _read_loop(self, port=None):
        """Poll ``port`` for incoming bytes until stopped or superseded.

        Runs on the reader thread. ``port`` defaults to ``self.ser``. The
        loop ends when ``stop_event`` is set, when ``self.ser`` no longer
        refers to this thread's port, or when a read error could not be
        recovered by reconnecting.
        """
        if port is None:
            port = self.ser
        logger.info("启动串口读取线程")

        try:
            while not self.stop_event.is_set() and self.ser is port:
                try:
                    if port is not None and port.is_open:
                        pending = port.in_waiting
                        if pending > 0:
                            self._dispatch_data(port.read(pending))
                    time.sleep(0.001)
                except Exception as e:
                    # A failure caused by a concurrent disconnect()/connect()
                    # is expected and not worth reporting.
                    if self.stop_event.is_set() or self.ser is not port:
                        break

                    error_msg = f"读取串口数据错误: {str(e)}"
                    logger.error(error_msg)
                    self._report_error(error_msg)

                    replacement = self._reconnect(port)
                    if replacement is None:
                        break
                    port = replacement
        finally:
            # Runs even if a callback raised, so the port is always released.
            logger.info("串口读取线程结束")
            self._release_port(port)

    def _dispatch_data(self, data):
        """Decode received bytes and hand the text to ``data_callback``."""
        callback = self.data_callback
        if callback is None:
            return
        # errors='replace' cannot raise, so no binary fallback is needed.
        text = data.decode('utf-8', errors='replace').strip()
        if not text:
            return
        try:
            callback(text)
        except Exception as e:
            # A faulty callback must neither be mistaken for a serial error
            # nor kill the reader thread.
            error_msg = f"数据回调执行出错: {str(e)}"
            logger.error(error_msg)
            self._report_error(error_msg)

    def _reconnect(self, failed_port):
        """Reopen the configured port after a read failure.

        Makes up to ``max_reconnect_attempts`` attempts, pausing
        ``reconnect_delay`` seconds before each one.

        Returns:
            The newly opened port, or ``None`` if reconnecting was
            abandoned (stopped, superseded, no config, or out of attempts).
        """
        with self.lock:
            if self.stop_event.is_set() or self.ser is not failed_port:
                return None
            config = self.current_config
            was_connected = self.is_connected
            # Makes send_data() refuse to write to the dead port.
            self.is_connected = False
        if was_connected:
            self._notify_status(False)
        self._close_port(failed_port)

        if config is None:
            return None

        while self._should_reconnect():
            logger.warning(f"尝试重连串口... (第{self.reconnect_attempts}次)")
            # Interruptible pause: disconnect() sets stop_event and wakes us.
            if self.stop_event.wait(self.reconnect_delay):
                return None
            try:
                candidate = self._open_port(config)
            except Exception as e:
                error_msg = f"连接失败: {str(e)}"
                logger.error(error_msg)
                self._report_error(error_msg)
                continue

            with self.lock:
                superseded = self.stop_event.is_set() or self.ser is not failed_port
                if not superseded:
                    self.ser = candidate
                    self.is_connected = True
                    self.reconnect_attempts = 0
            if superseded:
                # Someone disconnected or reconnected while we were opening.
                self._close_port(candidate)
                return None

            logger.info(f"已连接到 {config.port},波特率 {config.baudrate}")
            self._notify_status(True)
            return candidate

        return None

    def _release_port(self, port):
        """Close the reader's port and clear shared state if it still owns it."""
        with self.lock:
            self._close_port(port)
            if self.ser is not port:
                # disconnect()/connect() already took over the shared state.
                return
            self.ser = None
            was_connected = self.is_connected
            self.is_connected = False
            stopping = self.stop_event.is_set()
        # During an explicit disconnect() that method sends the notification.
        if was_connected and not stopping:
            self._notify_status(False)

    def _should_reconnect(self):
        """Count one reconnect attempt and report whether it is allowed."""
        self.reconnect_attempts += 1
        return self.reconnect_attempts <= self.max_reconnect_attempts

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------
    def _open_port(self, config):
        """Open a ``serial.Serial`` for ``config``; raise if it is not open."""
        port = serial.Serial(
            port=config.port,
            baudrate=config.baudrate,
            bytesize=config.bytesize,
            parity=config.parity,
            stopbits=config.stopbits,
            timeout=config.timeout,
            xonxoff=config.xonxoff,
            rtscts=config.rtscts,
            dsrdtr=config.dsrdtr
        )
        if not port.is_open:
            raise serial.SerialException("串口打开失败")
        return port

    def _close_port(self, port):
        """Close ``port`` if it is open, logging instead of raising."""
        if port is None:
            return
        try:
            if port.is_open:
                port.close()
        except Exception as e:
            logger.error(f"关闭串口时出错: {str(e)}")

    def _notify_status(self, connected):
        """Invoke ``status_callback`` with ``connected`` if one is set."""
        callback = self.status_callback
        if callback:
            callback(connected)

    def _report_error(self, message):
        """Invoke ``error_callback`` with ``message`` if one is set."""
        callback = self.error_callback
        if callback:
            callback(message)

    # ------------------------------------------------------------------
    # I/O
    # ------------------------------------------------------------------
    def send_data(self, data, encoding='utf-8'):
        """Write ``data`` to the port, appending a newline if it lacks one.

        Args:
            data: ``str`` (encoded with ``encoding``), ``bytes`` or
                ``bytearray``.
            encoding: Text encoding used for ``str`` input.

        Returns:
            ``(True, "发送成功")`` on success, otherwise ``(False, reason)``.
            Any other input type is reported as a failure, not raised.
        """
        try:
            with self.lock:
                if not self.is_connected or not self.ser or not self.ser.is_open:
                    return False, "串口未连接"

                if isinstance(data, str):
                    # The newline is added before encoding so it is encoded
                    # with the same codec as the text.
                    text = data if data.endswith('\n') else data + '\n'
                    bytes_data = text.encode(encoding)
                elif isinstance(data, (bytes, bytearray)):
                    raw = bytes(data)
                    bytes_data = raw if raw.endswith(b'\n') else raw + b'\n'
                else:
                    raise TypeError("数据必须是字符串或字节类型")

                bytes_sent = self.ser.write(bytes_data)
                self.ser.flush()  # block until the data has been written out

                logger.debug(f"发送数据到串口: {bytes_data.hex()[:50]}... (共{bytes_sent}字节)")
                return True, "发送成功"
        except Exception as e:
            error_msg = f"发送失败: {str(e)}"
            logger.error(error_msg)
            if self.error_callback:
                self.error_callback(error_msg)
            return False, error_msg

    def flush_input(self):
        """Discard unread input; return ``(success, message)``."""
        try:
            with self.lock:
                if self.ser and self.ser.is_open:
                    self.ser.reset_input_buffer()
                    return True, "输入缓冲区已清空"
                return False, "串口未连接"
        except Exception as e:
            error_msg = f"清空缓冲区失败: {str(e)}"
            logger.error(error_msg)
            return False, error_msg

    def flush_output(self):
        """Discard unsent output; return ``(success, message)``."""
        try:
            with self.lock:
                if self.ser and self.ser.is_open:
                    self.ser.reset_output_buffer()
                    return True, "输出缓冲区已清空"
                return False, "串口未连接"
        except Exception as e:
            error_msg = f"清空缓冲区失败: {str(e)}"
            logger.error(error_msg)
            return False, error_msg

    # ------------------------------------------------------------------
    # Callbacks and status
    # ------------------------------------------------------------------
    def set_data_callback(self, callback):
        """Register the callable that receives decoded incoming text."""
        self.data_callback = callback

    def set_status_callback(self, callback):
        """Register the callable that receives connection state changes."""
        self.status_callback = callback

    def set_error_callback(self, callback):
        """Register the callable that receives error messages."""
        self.error_callback = callback

    def get_status(self):
        """Return a dict with ``connected``, ``config`` and ``has_error``.

        ``has_error`` is true while at least one reconnect attempt has been
        counted since the last successful connection.
        """
        with self.lock:
            return {
                'connected': self.is_connected,
                'config': self.current_config,
                'has_error': self.reconnect_attempts > 0
            }
# Create the module-level shared SerialPort instance
global_serial = SerialPort()
|