| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372 |
- from typing import Optional
- # ========== Address configuration protocol ==========
def build_broadcast_query() -> bytes:
    """Build the broadcast query frame.

    The DTU periodically sends this broadcast to discover slaves that have
    not been configured with an address yet.

    Frame layout: 00 40 00 40 00 (fixed 5 bytes, no CRC appended).
    """
    return bytes.fromhex("0040004000")
def parse_device_response(data: bytes) -> Optional[dict]:
    """Parse a slave's reply to the broadcast query.

    Frame layout (16 bytes): 00 + 41 + 12-byte UID + CRC_L + CRC_H
    Example: 00 41 18 00 40 00 14 00 00 59 59 54 30 56 AE 34

    Args:
        data: Raw bytes received. Only the first 16 bytes are interpreted as
            the frame; ``raw_data`` in the result still reports all of them.

    Returns:
        On success, a dict with ``function_code`` (0x41), ``uid`` (hex
        string), ``uid_readable`` (colon-separated hex bytes) and
        ``raw_data``. If the input is shorter than 16 bytes or the function
        code is not 0x41, a dict with ``error`` and ``raw_data``.
        A CRC mismatch is only logged as a warning; the parsed result is
        still returned.
    """
    if len(data) < 16:
        return {'error': '数据长度不足', 'raw_data': data.hex()}
    if data[1] != 0x41:
        return {'error': f"非预期功能码: {data[1]:#x}", 'raw_data': data.hex()}

    uid_bytes = data[2:14]

    # verify_crc16 treats the LAST two bytes of its argument as the received
    # CRC, so it must be handed the complete 16-byte frame. Passing only the
    # first 14 bytes would compare the tail of the UID against a CRC computed
    # over the first 12 bytes and fail on practically every valid frame.
    if not verify_crc16(data[:16]):
        logger.warning(f"CRC校验失败: {data.hex()}")

    return {
        'function_code': 0x41,
        'uid': uid_bytes.hex(),
        'uid_readable': ':'.join(f'{b:02x}' for b in uid_bytes),
        'raw_data': data.hex(),
    }
def build_confirm_address(device_address: int, uid: bytes) -> bytes:
    """Build the address-confirmation frame.

    The DTU walks its stored UID/address pairs and sends this frame when a
    responding slave's UID is already known.

    Frame layout: addr + 44 + 00 + 12-byte UID + CRC_L + CRC_H

    Args:
        device_address: Address previously stored for the slave.
        uid: The slave's UID; must be exactly 12 bytes.

    Returns:
        The frame with its CRC appended.

    Raises:
        ValueError: If ``uid`` is not 12 bytes long.
    """
    uid_len = len(uid)
    if uid_len != 12:
        raise ValueError(f"UID长度必须是12字节,当前: {uid_len}")

    header = bytes((device_address, 0x44, 0x00))
    frame = header + uid
    return frame + calculate_crc16(frame)
def build_assign_address(uid: bytes, address: int) -> bytes:
    """Build the address-assignment frame.

    Sent when the DTU has no record for the responding UID and therefore
    hands out a new address.

    Frame layout: 00 + 42 + 12-byte UID + addr + CRC_L + CRC_H

    Args:
        uid: The slave's UID; must be exactly 12 bytes.
        address: Address to assign; must be within 1..247.

    Returns:
        The frame with its CRC appended.

    Raises:
        ValueError: If ``uid`` is not 12 bytes or ``address`` is out of range.
    """
    uid_len = len(uid)
    if uid_len != 12:
        raise ValueError(f"UID长度必须是12字节,当前: {uid_len}")

    out_of_range = address < 1 or address > 247
    if out_of_range:
        raise ValueError(f"设备地址必须在1-247之间,当前: {address}")

    frame = b"\x00\x42" + uid + bytes((address,))
    return frame + calculate_crc16(frame)
def parse_address_assignment_response(data: bytes) -> dict:
    """Parse a slave's reply to an address-assignment frame.

    Frame layout (5 bytes): addr + 43 + 00 + CRC_L + CRC_H
    Example: 01 43 00 11 30

    Args:
        data: Raw bytes received. Only the first 5 bytes are interpreted as
            the frame; ``raw_data`` in the result still reports all of them.

    Returns:
        On success, a dict with ``device_address``, ``function_code`` (0x43)
        and ``raw_data``. If the input is shorter than 5 bytes or the
        function code is not 0x43, a dict with ``error`` and ``raw_data``.
        A CRC mismatch is only logged as a warning; the parsed result is
        still returned.
    """
    if len(data) < 5:
        return {'error': '数据长度不足', 'raw_data': data.hex()}

    device_address = data[0]
    if data[1] != 0x43:
        return {'error': f"非预期功能码: {data[1]:#x}", 'raw_data': data.hex()}

    # Longer input is accepted above, so restrict the CRC check to the 5-byte
    # frame; otherwise any trailing bytes would be mistaken for the checksum.
    if not verify_crc16(data[:5]):
        logger.warning(f"CRC校验失败: {data.hex()}")

    return {
        'device_address': device_address,
        'function_code': 0x43,
        'raw_data': data.hex(),
    }
class AddressConfigProtocol:
    """Address-configuration protocol handler.

    Keeps an in-memory table mapping slave UID (hex string) to bus address
    and drives the discover / confirm / assign exchange through the injected
    ``serial_port`` object. That object is used via ``send_and_wait()``,
    ``send_raw()`` and ``flush_input()``.
    """

    # Assignable address range; same bounds that build_assign_address enforces.
    MIN_ADDRESS = 1
    MAX_ADDRESS = 247

    def __init__(self, serial_port):
        self.serial = serial_port
        self.default_timeout = 1.0
        # UID hex string -> assigned address.
        self.stored_devices = {}

    def add_stored_device(self, uid: str, address: int):
        """Record (or overwrite) the address stored for ``uid``."""
        self.stored_devices[uid] = address

    def get_stored_devices(self) -> dict:
        """Return a shallow copy of the UID -> address table."""
        return self.stored_devices.copy()

    def _allocate_address(self) -> Optional[int]:
        """Return the lowest address in 1..247 not yet in the table, or None.

        Scanning for a free slot (rather than using ``len(table) + 1``) avoids
        handing out an address that is already taken when the table has gaps,
        e.g. after loading a hand-edited configuration file.
        """
        in_use = set(self.stored_devices.values())
        for candidate in range(self.MIN_ADDRESS, self.MAX_ADDRESS + 1):
            if candidate not in in_use:
                return candidate
        return None

    def broadcast_query(self, timeout: Optional[float] = None) -> list:
        """Send the broadcast query and collect the parsed response.

        Args:
            timeout: Seconds to wait for a reply; a falsy value (None or 0)
                selects ``self.default_timeout``.

        Returns:
            A list holding the parsed reply if a frame of at least 16 bytes
            arrived and parsed without error; otherwise an empty list.
        """
        request = build_broadcast_query()
        logger.info(f"send broadcast: {request.hex()}")
        timeout = timeout or self.default_timeout
        response = self.serial.send_and_wait(request, timeout=timeout, min_response_bytes=16)

        responses = []
        if response and len(response) >= 16:
            logger.info(f"got response: {response.hex()}")
            parsed = parse_device_response(response)
            if "error" not in parsed:
                responses.append(parsed)
            else:
                # Make the reason visible instead of dropping it silently.
                logger.warning(f"discarding response: {parsed['error']}")
        else:
            logger.info(f"no response or too short: {len(response) if response else 0}B")

        logger.info(f"broadcast done, got {len(responses)} responses")
        return responses

    def process_responses(self, responses: list) -> list:
        """Confirm or assign an address for each broadcast response.

        For a UID already in the table a confirm frame is sent; otherwise a
        free address is allocated and an assign frame is sent. A new address
        is stored only after ``send_raw`` reports success; the slave's own
        reply to the assignment is not awaited here.

        Args:
            responses: Parsed responses as produced by ``broadcast_query``.

        Returns:
            One dict per response with ``status`` set to ``'confirmed'``,
            ``'assigned'`` or ``'error'``. Error entries carry ``message``;
            send failures and address exhaustion are reported as errors
            rather than being omitted.
        """
        import time

        results = []
        for resp in responses:
            if 'error' in resp:
                results.append({'status': 'error', 'message': resp['error']})
                continue

            uid = resp['uid']
            device_address = self.stored_devices.get(uid)

            if device_address:
                logger.info(f"UID={uid} 已配置地址={device_address},发送确认指令")
                request = build_confirm_address(device_address, bytes.fromhex(uid))
                self.serial.flush_input()
                success, msg = self.serial.send_raw(request)
                status, address = 'confirmed', device_address
            else:
                new_address = self._allocate_address()
                if new_address is None:
                    logger.error(f"UID={uid}: no free address left in 1-247")
                    results.append({
                        'status': 'error',
                        'uid': uid,
                        'message': 'no free address available (1-247 all in use)',
                    })
                    continue

                logger.info(f"UID={uid} 未配置,分配新地址={new_address}")
                request = build_assign_address(bytes.fromhex(uid), new_address)
                self.serial.flush_input()
                # Short pause before transmitting, kept from the original flow.
                # NOTE(review): the confirm branch has no such delay - confirm
                # whether that asymmetry is intentional.
                time.sleep(0.05)
                success, msg = self.serial.send_raw(request)
                if success:
                    self.stored_devices[uid] = new_address
                status, address = 'assigned', new_address

            if success:
                results.append({
                    'status': status,
                    'uid': uid,
                    'address': address,
                    'request': request.hex(),
                })
            else:
                logger.warning(f"UID={uid}: send failed: {msg}")
                results.append({
                    'status': 'error',
                    'uid': uid,
                    'address': address,
                    'message': f"send failed: {msg}",
                    'request': request.hex(),
                })
        return results

    def auto_configure(self, timeout: Optional[float] = None) -> dict:
        """Run one discover-and-configure cycle.

        Args:
            timeout: Forwarded to ``broadcast_query``.

        Returns:
            A summary dict with ``success``, ``discovered``, ``confirmed``,
            ``assigned``, ``configured`` (confirmed + assigned), ``errors``,
            ``results`` and ``stored_devices``. When nothing responded the
            dict additionally carries ``message``.
        """
        logger.info("开始自动配置设备...")
        responses = self.broadcast_query(timeout)

        if not responses:
            # Same keys as the normal return so callers can read either shape.
            return {
                'success': True,
                'message': '未发现任何从机设备',
                'discovered': 0,
                'configured': 0,
                'confirmed': 0,
                'assigned': 0,
                'errors': 0,
                'results': [],
                'stored_devices': self.get_stored_devices(),
            }

        results = self.process_responses(responses)
        confirmed = sum(1 for r in results if r.get('status') == 'confirmed')
        assigned = sum(1 for r in results if r.get('status') == 'assigned')
        errors = sum(1 for r in results if r.get('status') == 'error')
        return {
            'success': errors == 0,
            'discovered': len(responses),
            'confirmed': confirmed,
            'assigned': assigned,
            'configured': confirmed + assigned,
            'errors': errors,
            'results': results,
            'stored_devices': self.get_stored_devices(),
        }

    def save_config(self, filepath: str) -> bool:
        """Write the UID -> address table to ``filepath`` as JSON.

        Returns:
            True on success; False (after logging) if writing failed.
        """
        import json

        try:
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(self.stored_devices, f, indent=2)
            return True
        except Exception as e:
            # Best-effort persistence: report failure through the return value.
            logger.error(f"保存配置失败: {e}")
            return False

    def load_config(self, filepath: str) -> bool:
        """Replace the UID -> address table with the JSON object in ``filepath``.

        The current table is left untouched when loading fails or when the
        file does not contain a JSON object.

        Returns:
            True on success; False (after logging) otherwise.
        """
        import json

        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
            if not isinstance(loaded, dict):
                # A list/number/string would break every later table lookup.
                raise ValueError(f"expected a JSON object, got {type(loaded).__name__}")
            self.stored_devices = loaded
            return True
        except Exception as e:
            logger.error(f"加载配置失败: {e}")
            return False
- # ========== Modbus RTU Client ==========
# Antenna address map: antenna number (1-24) -> Modbus register address.
# NOTE(review): read_antenna_card computes its register as
# 0x0002 + (antenna - 1) * 4, which does not match this table - confirm
# which of the two is authoritative.
ANTENNA_ADDRESSES = {
    antenna: 0x0001 + offset
    for offset, antenna in enumerate(range(1, 25))
}
- import struct
- import logging
- logger = logging.getLogger('serial_mqtt_gateway')
def calculate_crc16(data: bytes) -> bytes:
    """Compute the Modbus CRC-16 of ``data``.

    The register starts at 0xFFFF; each input byte is XORed in and the
    register is shifted right eight times, XORing with 0xA001 whenever the
    bit shifted out is 1.

    Returns:
        The 16-bit checksum packed as two bytes, low byte first.
    """
    register = 0xFFFF
    for octet in data:
        register ^= octet
        for _bit in range(8):
            carry = register & 0x0001
            register >>= 1
            if carry:
                register ^= 0xA001
    return struct.pack('<H', register)
def verify_crc16(data: bytes) -> bool:
    """Check the trailing Modbus CRC-16 of a frame.

    The last two bytes of ``data`` are taken as the received checksum (low
    byte first) and compared with the checksum computed over everything
    before them.

    Returns:
        True when both checksums match; False on mismatch or when ``data``
        has fewer than 2 bytes.
    """
    if len(data) < 2:
        return False

    payload, trailer = data[:-2], data[-2:]
    (received,) = struct.unpack('<H', trailer)
    (expected,) = struct.unpack('<H', calculate_crc16(payload))
    return received == expected
class ModbusRTUClient:
    """Modbus RTU client.

    Builds request frames, sends them through the injected ``serial_port``
    object (used via its ``ser`` attribute and ``send_and_wait()``), and
    turns replies into result dicts. Failures are reported as a dict with an
    ``error`` key rather than by raising.
    """

    def __init__(self, serial_port):
        self.serial = serial_port
        self.default_timeout = 1.0

    def _build_request(self, device_address: int, function_code: int, data: bytes = b'') -> bytes:
        """Build a request frame: address + function code + data + CRC."""
        pdu = bytes([device_address, function_code]) + data
        return pdu + calculate_crc16(pdu)

    @staticmethod
    def _exception_code(response: bytes) -> Optional[int]:
        """Return the exception code of a Modbus exception reply, else None.

        An exception reply is recognised by the high bit (0x80) being set in
        the function-code byte; the byte that follows is the exception code.
        """
        if len(response) >= 3 and response[1] & 0x80:
            return response[2]
        return None

    @staticmethod
    def _payload_bytes(result: dict) -> bytes:
        """Decode the hex ``data`` field of a _send_receive result to bytes."""
        return bytes.fromhex(result.get('data', ''))

    def _send_receive(self, request: bytes, expected_min_len: int = 5, timeout: Optional[float] = None) -> dict:
        """Send ``request`` and return the validated reply.

        Args:
            request: Complete frame to transmit.
            expected_min_len: Minimum acceptable reply length in bytes.
            timeout: Seconds to wait; a falsy value (None or 0) selects
                ``self.default_timeout``.

        Returns:
            On success a dict with ``success``, ``device_address``,
            ``function_code``, ``data`` (hex of everything between the
            function code and the CRC) and ``raw_data``.
            On failure a dict with ``error`` (not connected, timeout/short
            reply, CRC mismatch, or Modbus exception reply).
        """
        timeout = timeout or self.default_timeout
        if not self.serial.ser:
            return {"error": "serial not connected"}

        response = self.serial.send_and_wait(request, timeout=timeout, min_response_bytes=expected_min_len)
        if not response or len(response) < expected_min_len:
            return {"error": "response timeout or too short", "raw_data": response.hex() if response else ""}

        if not verify_crc16(response):
            logger.warning(f"CRC check failed: {response.hex()}")
            return {"error": "CRC check failed", "raw_data": response.hex()}

        # A reply whose function code has bit 0x80 set is the slave rejecting
        # the request; report it as an error instead of as a success.
        exception_code = self._exception_code(response)
        if exception_code is not None:
            logger.warning(f"modbus exception 0x{exception_code:02x}: {response.hex()}")
            return {
                "error": f"modbus exception 0x{exception_code:02x}",
                "exception_code": exception_code,
                "device_address": response[0],
                "function_code": response[1],
                "raw_data": response.hex(),
            }

        return {
            "success": True,
            "device_address": response[0],
            "function_code": response[1],
            "data": response[2:-2].hex(),
            "raw_data": response.hex(),
        }

    def read_antenna_card(self, device_address: int, antenna_num: int, timeout: Optional[float] = None) -> dict:
        """Read the card number seen by one antenna.

        Reads 4 registers (8 bytes) starting at 0x0002 + (antenna_num - 1) * 4
        with function 0x03 and interprets the first 8 payload bytes as a
        big-endian card number.

        NOTE(review): a standard Modbus 0x03 reply starts its payload with a
        byte-count byte, which this method would treat as the first card
        byte. Confirm the device's actual reply layout.

        Returns:
            The _send_receive result extended with ``card_number``,
            ``card_number_hex``, ``card_number_str``, ``antenna`` and ``uid``;
            or a dict with ``error`` if the exchange failed or fewer than
            8 payload bytes came back.
        """
        reg_addr = 0x0002 + (antenna_num - 1) * 4
        request_data = struct.pack('>HH', reg_addr, 4)  # read 4 registers
        request = self._build_request(device_address, 0x03, request_data)
        result = self._send_receive(request, timeout=timeout)
        if 'error' in result:
            return result

        data_bytes = self._payload_bytes(result)
        if len(data_bytes) < 8:
            return {'error': f'数据长度不足({len(data_bytes)}B,需要8B)', **result}

        card_bytes = data_bytes[:8]
        card_number = int.from_bytes(card_bytes, 'big')
        return {
            'card_number': card_number,
            'card_number_hex': f'0x{card_number:016x}',
            'card_number_str': card_bytes.hex(),
            'antenna': antenna_num,
            'uid': ':'.join(f'{b:02x}' for b in card_bytes),
            **result,
        }

    def read_holding_registers(self, device_address: int, start_address: int, quantity: int = 1, timeout: Optional[float] = None) -> dict:
        """Read holding registers with function 0x03.

        The reply payload is split into big-endian 16-bit values; a trailing
        odd byte is ignored.

        NOTE(review): a standard Modbus 0x03 reply starts its payload with a
        byte-count byte, which would shift every register parsed here by one
        byte. Confirm the device's actual reply layout.

        Returns:
            The _send_receive result extended with ``registers`` (list of
            ints), or a dict with ``error`` if the exchange failed.
        """
        request_data = struct.pack('>HH', start_address, quantity)
        request = self._build_request(device_address, 0x03, request_data)
        result = self._send_receive(request, timeout=timeout)
        if 'error' in result:
            return result

        data_bytes = self._payload_bytes(result)
        registers = [
            struct.unpack('>H', data_bytes[i:i + 2])[0]
            for i in range(0, len(data_bytes) - 1, 2)
        ]
        return {'registers': registers, **result}

    def write_single_register(self, device_address: int, register_address: int, value: int, timeout: Optional[float] = None) -> dict:
        """Write one register with function 0x06 and return the reply dict."""
        request_data = struct.pack('>HH', register_address, value)
        request = self._build_request(device_address, 0x06, request_data)
        return self._send_receive(request, timeout=timeout)

    def set_rgb_led(self, device_address: int, led_number: int, color: int, timeout: Optional[float] = None) -> dict:
        """Set an RGB LED.

        Writes register 0x0001 with value = (led_number << 8) | color.
        color: 0=off, 1=flashing red, 2=flashing green, 3=flashing blue
        """
        value = (led_number << 8) | (color & 0xFF)
        return self.write_single_register(device_address, 0x0001, value, timeout)

    def scan_devices(self, max_address: int = 247) -> list:
        """Probe addresses 1..max_address (capped at 247) with a 0x03 read.

        An address counts as online when a reply of at least 5 bytes with a
        valid CRC arrives and its first byte matches the probed address, so a
        stray frame from another slave is not attributed to this address.

        Returns:
            The list of addresses that answered.
        """
        probe = struct.pack('>HH', 0x0000, 0x0001)
        devices = []
        for addr in range(1, min(max_address + 1, 248)):
            request = self._build_request(addr, 0x03, probe)
            response = self.serial.send_and_wait(request, timeout=0.3, min_response_bytes=5)
            if (
                response
                and len(response) >= 5
                and response[0] == addr
                and verify_crc16(response)
            ):
                devices.append(addr)
        return devices
|