"""DTU address-configuration protocol and Modbus RTU client helpers.

The module contains two independent layers that share the CRC helpers:

* the address-configuration protocol (function codes 0x40-0x44) used by the
  DTU to discover slaves by UID and hand out bus addresses;
* a small Modbus RTU client (function codes 0x03 / 0x06).
"""
import json
import logging
import struct
import time
from typing import Optional

logger = logging.getLogger('serial_mqtt_gateway')

# Valid slave addresses are 1..247; address 0 is used for broadcast frames.
_MAX_DEVICE_ADDRESS = 247
# Length in bytes of a slave UID.
_UID_LENGTH = 12


# ========== CRC helpers ==========

def calculate_crc16(data: bytes) -> bytes:
    """Compute the Modbus CRC16 of *data*.

    Returns:
        The 2-byte CRC, low byte first (the order it is sent on the wire).
    """
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 0x0001:
                crc = (crc >> 1) ^ 0xA001
            else:
                crc >>= 1
    return struct.pack('<H', crc)


def verify_crc16(data: bytes) -> bool:
    """Check the Modbus CRC16 of a complete frame.

    Args:
        data: The frame *including* its two trailing CRC bytes.

    Returns:
        True when the trailing CRC matches the CRC of the preceding bytes;
        False on mismatch or when *data* is shorter than two bytes.
    """
    if len(data) < 2:
        return False
    received_crc = struct.unpack('<H', data[-2:])[0]
    calculated_crc = struct.unpack('<H', calculate_crc16(data[:-2]))[0]
    return received_crc == calculated_crc


# ========== Address configuration protocol ==========

def build_broadcast_query() -> bytes:
    """Build the broadcast query frame.

    The DTU periodically broadcasts this frame to find slaves that have no
    address configured yet.

    Format: 00 40 00 40 00 (no CRC, fixed 5 bytes).
    """
    return bytes([0x00, 0x40, 0x00, 0x40, 0x00])


def parse_device_response(data: bytes) -> Optional[dict]:
    """Parse a slave's reply to the broadcast query.

    Reply format: 00 + 41 + 12-byte UID + CRC_L + CRC_H
    Example: 00 41 18 00 40 00 14 00 00 59 59 54 30 56 AE 34

    Returns:
        On success a dict with 'function_code', 'uid' (hex string),
        'uid_readable' (colon-separated hex) and 'raw_data'. When the frame
        is too short or carries an unexpected function code, a dict with
        'error' and 'raw_data' instead.

    A CRC mismatch is only logged; the frame is still parsed.
    """
    if len(data) < 16:
        return {'error': '数据长度不足', 'raw_data': data.hex()}

    if data[1] != 0x41:
        return {'error': f"非预期功能码: {data[1]:#x}", 'raw_data': data.hex()}

    uid_bytes = data[2:2 + _UID_LENGTH]

    # verify_crc16 expects the frame *with* its CRC, i.e. all 16 bytes.
    if not verify_crc16(data[:16]):
        logger.warning(f"CRC校验失败: {data.hex()}")

    return {
        'function_code': 0x41,
        'uid': uid_bytes.hex(),
        'uid_readable': ':'.join(f'{b:02x}' for b in uid_bytes),
        'raw_data': data.hex(),
    }


def build_confirm_address(device_address: int, uid: bytes) -> bytes:
    """Build the "confirm address" frame.

    Sent when the DTU already has an address stored for the UID.

    Format: add + 44 + 00 + 12-byte UID + CRC_L + CRC_H

    Raises:
        ValueError: If *uid* is not exactly 12 bytes long.
    """
    if len(uid) != _UID_LENGTH:
        raise ValueError(f"UID长度必须是12字节,当前: {len(uid)}")

    data = bytes([device_address, 0x44, 0x00]) + uid
    return data + calculate_crc16(data)


def build_assign_address(uid: bytes, address: int) -> bytes:
    """Build the "assign address" frame.

    Sent when the DTU has no record for the UID and allocates a new address.

    Format: 00 + 42 + 12-byte UID + add + CRC_L + CRC_H

    Raises:
        ValueError: If *uid* is not 12 bytes or *address* is outside 1..247.
    """
    if len(uid) != _UID_LENGTH:
        raise ValueError(f"UID长度必须是12字节,当前: {len(uid)}")
    if address < 1 or address > _MAX_DEVICE_ADDRESS:
        raise ValueError(f"设备地址必须在1-247之间,当前: {address}")

    data = bytes([0x00, 0x42]) + uid + bytes([address])
    return data + calculate_crc16(data)


def parse_address_assignment_response(data: bytes) -> dict:
    """Parse the module's reply to an address assignment.

    Reply format: add + 43 + 00 + CRC_L + CRC_H
    Example: 01 43 00 11 30

    Returns:
        On success a dict with 'device_address', 'function_code' and
        'raw_data'; otherwise a dict with 'error' and 'raw_data'.

    A CRC mismatch is only logged; the frame is still parsed.
    """
    if len(data) < 5:
        return {'error': '数据长度不足', 'raw_data': data.hex()}

    device_address = data[0]

    if data[1] != 0x43:
        return {'error': f"非预期功能码: {data[1]:#x}", 'raw_data': data.hex()}

    # Only the first five bytes form the frame; ignore any trailing bytes.
    if not verify_crc16(data[:5]):
        logger.warning(f"CRC校验失败: {data.hex()}")

    return {
        'device_address': device_address,
        'function_code': 0x43,
        'raw_data': data.hex(),
    }


class AddressConfigProtocol:
    """Address-configuration protocol handler.

    Keeps a UID -> address table and drives the discover / confirm / assign
    exchange over the supplied serial wrapper. The wrapper must provide
    ``send_and_wait(request, timeout=..., min_response_bytes=...)``,
    ``send_raw(request)`` returning ``(success, message)`` and
    ``flush_input()``.
    """

    def __init__(self, serial_port):
        self.serial = serial_port
        self.default_timeout = 1.0
        # Maps UID hex string -> assigned device address.
        self.stored_devices = {}

    def add_stored_device(self, uid: str, address: int):
        """Record that *uid* (hex string) owns *address*."""
        self.stored_devices[uid] = address

    def get_stored_devices(self) -> dict:
        """Return a shallow copy of the UID -> address table."""
        return self.stored_devices.copy()

    def broadcast_query(self, timeout: float = None) -> list:
        """Send the broadcast query and collect parsed responses.

        Only a single reply frame is read per call. Replies that fail to
        parse are dropped.

        Args:
            timeout: Seconds to wait; a falsy value selects default_timeout.

        Returns:
            A list with zero or one parsed response dicts.
        """
        request = build_broadcast_query()
        logger.info(f"send broadcast: {request.hex()}")

        timeout = timeout or self.default_timeout
        response = self.serial.send_and_wait(
            request, timeout=timeout, min_response_bytes=16
        )

        responses = []
        if response and len(response) >= 16:
            logger.info(f"got response: {response.hex()}")
            parsed = parse_device_response(response)
            if "error" not in parsed:
                responses.append(parsed)
        else:
            logger.info(
                f"no response or too short: {len(response) if response else 0}B"
            )

        logger.info(f"broadcast done, got {len(responses)} responses")
        return responses

    def _next_free_address(self) -> Optional[int]:
        """Return the lowest address in 1..247 not yet in use, or None."""
        used = set(self.stored_devices.values())
        for candidate in range(1, _MAX_DEVICE_ADDRESS + 1):
            if candidate not in used:
                return candidate
        return None

    def _confirm_device(self, uid: str, device_address: int) -> dict:
        """Send the confirm frame for a known UID and describe the outcome."""
        logger.info(f"UID={uid} 已配置地址={device_address},发送确认指令")
        request = build_confirm_address(device_address, bytes.fromhex(uid))
        self.serial.flush_input()
        success, msg = self.serial.send_raw(request)
        if not success:
            return {'status': 'error', 'uid': uid, 'message': msg}
        return {
            'status': 'confirmed',
            'uid': uid,
            'address': device_address,
            'request': request.hex(),
        }

    def _assign_device(self, uid: str) -> dict:
        """Allocate a free address for an unknown UID and send it."""
        new_address = self._next_free_address()
        if new_address is None:
            # Re-using an address would put two slaves on the same address.
            return {
                'status': 'error',
                'uid': uid,
                'message': 'no free device address available',
            }

        logger.info(f"UID={uid} 未配置,分配新地址={new_address}")
        request = build_assign_address(bytes.fromhex(uid), new_address)
        self.serial.flush_input()
        time.sleep(0.05)
        success, msg = self.serial.send_raw(request)
        if not success:
            return {'status': 'error', 'uid': uid, 'message': msg}

        # Only remember the address once the frame actually went out.
        self.stored_devices[uid] = new_address
        return {
            'status': 'assigned',
            'uid': uid,
            'address': new_address,
            'request': request.hex(),
        }

    def process_responses(self, responses: list) -> list:
        """Confirm or assign an address for every discovery response.

        Args:
            responses: Parsed dicts as produced by parse_device_response.

        Returns:
            One result dict per response, with 'status' set to 'confirmed',
            'assigned' or 'error'. Error results carry a 'message'.
        """
        results = []
        for resp in responses:
            if 'error' in resp:
                results.append({'status': 'error', 'message': resp['error']})
                continue

            uid = resp['uid']
            device_address = self.stored_devices.get(uid)
            if device_address:
                results.append(self._confirm_device(uid, device_address))
            else:
                results.append(self._assign_device(uid))
        return results

    def auto_configure(self, timeout: float = None) -> dict:
        """Discover slaves and confirm/assign their addresses.

        Returns:
            A summary dict: 'success' (no errors), 'discovered', 'confirmed',
            'assigned', 'errors', 'results' and 'stored_devices'. When nothing
            answered, 'message' and 'configured' are included as well.
        """
        logger.info("开始自动配置设备...")

        responses = self.broadcast_query(timeout)
        if not responses:
            return {
                'success': True,
                'message': '未发现任何从机设备',
                'discovered': 0,
                'configured': 0,
                'confirmed': 0,
                'assigned': 0,
                'errors': 0,
                'results': [],
                'stored_devices': self.get_stored_devices(),
            }

        results = self.process_responses(responses)

        confirmed = sum(1 for r in results if r.get('status') == 'confirmed')
        assigned = sum(1 for r in results if r.get('status') == 'assigned')
        errors = sum(1 for r in results if r.get('status') == 'error')

        return {
            'success': errors == 0,
            'discovered': len(responses),
            'confirmed': confirmed,
            'assigned': assigned,
            'errors': errors,
            'results': results,
            'stored_devices': self.get_stored_devices(),
        }

    def save_config(self, filepath: str) -> bool:
        """Write the UID -> address table to *filepath* as JSON.

        Returns:
            True on success, False (after logging) on failure.
        """
        try:
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(self.stored_devices, f, indent=2)
        except (OSError, TypeError, ValueError) as e:
            logger.error(f"保存配置失败: {e}")
            return False
        return True

    def load_config(self, filepath: str) -> bool:
        """Replace the UID -> address table with the JSON in *filepath*.

        The table is left untouched when the file cannot be read, is not
        valid JSON, or does not contain a JSON object.

        Returns:
            True on success, False (after logging) on failure.
        """
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except (OSError, TypeError, ValueError) as e:
            logger.error(f"加载配置失败: {e}")
            return False

        if not isinstance(loaded, dict):
            logger.error(
                f"加载配置失败: expected a JSON object, got {type(loaded).__name__}"
            )
            return False

        self.stored_devices = loaded
        return True


# ========== Modbus RTU Client ==========

# Antenna address map (antenna number -> Modbus register address).
ANTENNA_ADDRESSES = {i: 0x0001 + (i - 1) for i in range(1, 25)}


# NOTE(review): the class header and __init__ were lost in the mangled source;
# the name is taken from the section banner and the attributes from what
# _send_receive reads (self.serial, self.default_timeout). Confirm the name,
# constructor signature and default timeout against the real file.
class ModbusRTUClient:
    """Minimal Modbus RTU client on top of the gateway's serial wrapper."""

    def __init__(self, serial_port):
        self.serial = serial_port
        self.default_timeout = 1.0

    # NOTE(review): the signature was partly lost; all visible callers pass
    # (device_address, function_code, data) positionally.
    def _build_request(self, device_address: int, function_code: int,
                       data: bytes = b'') -> bytes:
        """Build a Modbus request frame: address + function + data + CRC."""
        pdu = bytes([device_address, function_code]) + data
        return pdu + calculate_crc16(pdu)

    def _send_receive(self, request: bytes, expected_min_len: int = 5,
                      timeout: float = None) -> dict:
        """Send *request* and return the validated response.

        Returns:
            On success a dict with 'success', 'device_address',
            'function_code', 'data' (hex of response[2:-2]) and 'raw_data'.
            Otherwise a dict with 'error' (and 'raw_data' when available).
        """
        timeout = timeout or self.default_timeout

        if not self.serial.ser:
            return {"error": "serial not connected"}

        response = self.serial.send_and_wait(
            request, timeout=timeout, min_response_bytes=expected_min_len
        )

        if not response or len(response) < expected_min_len:
            return {
                "error": "response timeout or too short",
                "raw_data": response.hex() if response else "",
            }

        if not verify_crc16(response):
            logger.warning(f"CRC check failed: {response.hex()}")
            return {"error": "CRC check failed", "raw_data": response.hex()}

        # NOTE(review): for a standard function-0x03 reply, response[2] is the
        # byte-count field, so 'data' would start with it. Left as-is because
        # the device's framing cannot be confirmed from this file.
        return {
            "success": True,
            "device_address": response[0],
            "function_code": response[1],
            "data": response[2:-2].hex(),
            "raw_data": response.hex(),
        }

    def read_antenna_card(self, device_address: int, antenna_num: int,
                          timeout: float = None) -> dict:
        """Read the card number seen by one antenna.

        Protocol: read 4 registers (8-byte card number) starting at
        0x0002 + (antenna_num - 1) * 4.

        Returns:
            The _send_receive result extended with 'card_number',
            'card_number_hex', 'card_number_str', 'antenna' and 'uid';
            or a dict containing 'error'.
        """
        reg_addr = 0x0002 + (antenna_num - 1) * 4
        request_data = struct.pack('>HH', reg_addr, 4)  # read 4 registers
        request = self._build_request(device_address, 0x03, request_data)

        result = self._send_receive(request, timeout=timeout)
        if 'error' in result:
            return result

        data_str = result.get('data', '')
        data_bytes = bytes.fromhex(data_str) if isinstance(data_str, str) else b''

        if len(data_bytes) < 8:
            return {'error': f'数据长度不足({len(data_bytes)}B,需要8B)', **result}

        card_bytes = data_bytes[:8]
        # 8-byte card number, big-endian.
        card_number = int.from_bytes(card_bytes, 'big')
        return {
            'card_number': card_number,
            'card_number_hex': f'0x{card_number:016x}',
            'card_number_str': card_bytes.hex(),
            'antenna': antenna_num,
            'uid': ':'.join(f'{b:02x}' for b in card_bytes),
            **result,
        }

    def read_holding_registers(self, device_address: int, start_address: int,
                               quantity: int = 1, timeout: float = None) -> dict:
        """Read holding registers (function 0x03).

        Returns:
            The _send_receive result extended with 'registers', a list of
            big-endian 16-bit values decoded from 'data' (a trailing odd byte
            is ignored); or a dict containing 'error'.
        """
        request_data = struct.pack('>HH', start_address, quantity)
        request = self._build_request(device_address, 0x03, request_data)

        result = self._send_receive(request, timeout=timeout)
        if 'error' in result:
            return result

        data_str = result.get('data', '')
        data_bytes = bytes.fromhex(data_str) if isinstance(data_str, str) else b''

        registers = [
            struct.unpack('>H', data_bytes[i:i + 2])[0]
            for i in range(0, len(data_bytes) - 1, 2)
        ]
        return {'registers': registers, **result}

    def write_single_register(self, device_address: int, register_address: int,
                              value: int, timeout: float = None) -> dict:
        """Write one register (function 0x06) and return the response dict."""
        request_data = struct.pack('>HH', register_address, value)
        request = self._build_request(device_address, 0x06, request_data)
        return self._send_receive(request, timeout=timeout)

    def set_rgb_led(self, device_address: int, led_number: int, color: int,
                    timeout: float = None) -> dict:
        """Set an RGB LED.

        Writes register 0x0001 with value = (led_number << 8) | color.
        color: 0=off, 1=flashing red, 2=flashing green, 3=flashing blue
        """
        value = (led_number << 8) | (color & 0xFF)
        return self.write_single_register(device_address, 0x0001, value, timeout)

    def scan_devices(self, max_address: int = 247) -> list:
        """Probe addresses 1..max_address (capped at 247) with function 0x03.

        Returns:
            The addresses that answered with a CRC-valid frame of at least
            five bytes.
        """
        devices = []
        for addr in range(1, min(max_address + 1, _MAX_DEVICE_ADDRESS + 1)):
            request = self._build_request(
                addr, 0x03, struct.pack('>HH', 0x0000, 0x0001)
            )
            response = self.serial.send_and_wait(
                request, timeout=0.3, min_response_bytes=5
            )
            if response and len(response) >= 5 and verify_crc16(response):
                devices.append(addr)
        return devices