modbus_rtu.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. from typing import Optional
  2. # ========== 地址配置协议 ==========
  3. def build_broadcast_query() -> bytes:
  4. """构建广播查询指令
  5. DTU定时发送广播指令查询未配置地址的从机
  6. 格式: 00 40 00 40 00 (无CRC, 固定5字节)
  7. """
  8. return bytes([0x00, 0x40, 0x00, 0x40, 0x00])
  9. def parse_device_response(data: bytes) -> Optional[dict]:
  10. """解析从机应答
  11. 从机应答格式: 00 + 41 + 12个UID + CRC_L + CRC_H
  12. 例如: 00 41 18 00 40 00 14 00 00 59 59 54 30 56 AE 34
  13. """
  14. if len(data) < 16:
  15. return {'error': '数据长度不足', 'raw_data': data.hex()}
  16. if data[1] != 0x41:
  17. return {'error': f"非预期功能码: {data[1]:#x}", 'raw_data': data.hex()}
  18. uid_bytes = data[2:14]
  19. uid_hex = uid_bytes.hex()
  20. if not verify_crc16(data[:14]):
  21. logger.warning(f"CRC校验失败: {data.hex()}")
  22. return {
  23. 'function_code': 0x41,
  24. 'uid': uid_hex,
  25. 'uid_readable': ':'.join(f'{b:02x}' for b in uid_bytes),
  26. 'raw_data': data.hex()
  27. }
  28. def build_confirm_address(device_address: int, uid: bytes) -> bytes:
  29. """构建确认地址指令
  30. DTU遍历已存储的UID和地址,如果存在匹配的UID则发送确认指令
  31. 格式: add + 44 + 00 + 12个UID + CRC_L + CRC_H
  32. """
  33. if len(uid) != 12:
  34. raise ValueError(f"UID长度必须是12字节,当前: {len(uid)}")
  35. data = bytes([device_address, 0x44, 0x00]) + uid
  36. return data + calculate_crc16(data)
  37. def build_assign_address(uid: bytes, address: int) -> bytes:
  38. """构建分配地址指令
  39. 如果DTU中没有该UID的记录,则分配新地址
  40. 格式: 00 + 42 + 12个UID + add + CRC_L + CRC_H
  41. """
  42. if len(uid) != 12:
  43. raise ValueError(f"UID长度必须是12字节,当前: {len(uid)}")
  44. if address < 1 or address > 247:
  45. raise ValueError(f"设备地址必须在1-247之间,当前: {address}")
  46. data = bytes([0x00, 0x42]) + uid + bytes([address])
  47. return data + calculate_crc16(data)
  48. def parse_address_assignment_response(data: bytes) -> dict:
  49. """解析地址分配响应
  50. 模块应答格式: add + 43 + 00 + CRC_L + CRC_H
  51. 例如: 01 43 00 11 30
  52. """
  53. if len(data) < 5:
  54. return {'error': '数据长度不足', 'raw_data': data.hex()}
  55. device_address = data[0]
  56. if data[1] != 0x43:
  57. return {'error': f"非预期功能码: {data[1]:#x}", 'raw_data': data.hex()}
  58. if not verify_crc16(data):
  59. logger.warning(f"CRC校验失败: {data.hex()}")
  60. return {
  61. 'device_address': device_address,
  62. 'function_code': 0x43,
  63. 'raw_data': data.hex()
  64. }
  65. class AddressConfigProtocol:
  66. """地址配置协议处理器"""
  67. def __init__(self, serial_port):
  68. self.serial = serial_port
  69. self.default_timeout = 1.0
  70. self.stored_devices = {}
  71. def add_stored_device(self, uid: str, address: int):
  72. """添加已存储的设备"""
  73. self.stored_devices[uid] = address
  74. def get_stored_devices(self) -> dict:
  75. """获取已存储的设备列表"""
  76. return self.stored_devices.copy()
  77. def broadcast_query(self, timeout: float = None) -> list:
  78. """send broadcast query and collect responses"""
  79. import time
  80. request = build_broadcast_query()
  81. logger.info(f"send broadcast: {request.hex()}")
  82. timeout = timeout or self.default_timeout
  83. response = self.serial.send_and_wait(request, timeout=timeout, min_response_bytes=16)
  84. responses = []
  85. if response and len(response) >= 16:
  86. logger.info(f"got response: {response.hex()}")
  87. parsed = parse_device_response(response)
  88. if "error" not in parsed:
  89. responses.append(parsed)
  90. else:
  91. logger.info(f"no response or too short: {len(response) if response else 0}B")
  92. logger.info(f"broadcast done, got {len(responses)} responses")
  93. return responses
  94. def process_responses(self, responses: list) -> list:
  95. """处理广播查询响应,发送确认或分配地址指令"""
  96. import time
  97. results = []
  98. for resp in responses:
  99. if 'error' in resp:
  100. results.append({'status': 'error', 'message': resp['error']})
  101. continue
  102. uid = resp['uid']
  103. device_address = self.stored_devices.get(uid)
  104. if device_address:
  105. logger.info(f"UID={uid} 已配置地址={device_address},发送确认指令")
  106. request = build_confirm_address(device_address, bytes.fromhex(uid))
  107. self.serial.flush_input()
  108. success, msg = self.serial.send_raw(request)
  109. if success:
  110. results.append({
  111. 'status': 'confirmed',
  112. 'uid': uid,
  113. 'address': device_address,
  114. 'request': request.hex()
  115. })
  116. else:
  117. new_address = len(self.stored_devices) + 1
  118. if new_address > 247:
  119. new_address = 1
  120. logger.info(f"UID={uid} 未配置,分配新地址={new_address}")
  121. request = build_assign_address(bytes.fromhex(uid), new_address)
  122. self.serial.flush_input()
  123. time.sleep(0.05)
  124. success, msg = self.serial.send_raw(request)
  125. if success:
  126. self.stored_devices[uid] = new_address
  127. results.append({
  128. 'status': 'assigned',
  129. 'uid': uid,
  130. 'address': new_address,
  131. 'request': request.hex()
  132. })
  133. return results
  134. def auto_configure(self, timeout: float = None) -> dict:
  135. """自动配置所有未配置的设备"""
  136. logger.info("开始自动配置设备...")
  137. responses = self.broadcast_query(timeout)
  138. if not responses:
  139. return {
  140. 'success': True,
  141. 'message': '未发现任何从机设备',
  142. 'discovered': 0,
  143. 'configured': 0,
  144. 'results': []
  145. }
  146. results = self.process_responses(responses)
  147. confirmed = sum(1 for r in results if r.get('status') == 'confirmed')
  148. assigned = sum(1 for r in results if r.get('status') == 'assigned')
  149. errors = sum(1 for r in results if r.get('status') == 'error')
  150. return {
  151. 'success': errors == 0,
  152. 'discovered': len(responses),
  153. 'confirmed': confirmed,
  154. 'assigned': assigned,
  155. 'errors': errors,
  156. 'results': results,
  157. 'stored_devices': self.get_stored_devices()
  158. }
  159. def save_config(self, filepath: str) -> bool:
  160. """保存配置到文件"""
  161. import json
  162. try:
  163. with open(filepath, 'w') as f:
  164. json.dump(self.stored_devices, f, indent=2)
  165. return True
  166. except Exception as e:
  167. logger.error(f"保存配置失败: {e}")
  168. return False
  169. def load_config(self, filepath: str) -> bool:
  170. """从文件加载配置"""
  171. import json
  172. try:
  173. with open(filepath, 'r') as f:
  174. self.stored_devices = json.load(f)
  175. return True
  176. except Exception as e:
  177. logger.error(f"加载配置失败: {e}")
  178. return False
  179. # ========== Modbus RTU Client ==========
  180. # 天线地址映射表(天线编号 -> Modbus寄存器地址)
  181. ANTENNA_ADDRESSES = {i: 0x0001 + (i - 1) for i in range(1, 25)}
  182. import struct
  183. import logging
  184. logger = logging.getLogger('serial_mqtt_gateway')
  185. def calculate_crc16(data: bytes) -> bytes:
  186. """计算Modbus CRC16校验"""
  187. crc = 0xFFFF
  188. for byte in data:
  189. crc ^= byte
  190. for _ in range(8):
  191. if crc & 0x0001:
  192. crc = (crc >> 1) ^ 0xA001
  193. else:
  194. crc >>= 1
  195. return struct.pack('<H', crc)
  196. def verify_crc16(data: bytes) -> bool:
  197. """验证Modbus CRC16校验"""
  198. if len(data) < 2:
  199. return False
  200. received_crc = struct.unpack('<H', data[-2:])[0]
  201. calculated_crc = struct.unpack('<H', calculate_crc16(data[:-2]))[0]
  202. return received_crc == calculated_crc
  203. class ModbusRTUClient:
  204. """Modbus RTU 客户端"""
  205. def __init__(self, serial_port):
  206. self.serial = serial_port
  207. self.default_timeout = 1.0
  208. def _build_request(self, device_address: int, function_code: int, data: bytes = b'') -> bytes:
  209. """构建Modbus请求帧"""
  210. pdu = bytes([device_address, function_code]) + data
  211. return pdu + calculate_crc16(pdu)
  212. def _send_receive(self, request: bytes, expected_min_len: int = 5, timeout: float = None) -> dict:
  213. """send request and receive response using sync method"""
  214. timeout = timeout or self.default_timeout
  215. if not self.serial.ser:
  216. return {"error": "serial not connected"}
  217. response = self.serial.send_and_wait(request, timeout=timeout, min_response_bytes=expected_min_len)
  218. if not response or len(response) < expected_min_len:
  219. return {"error": "response timeout or too short", "raw_data": response.hex() if response else ""}
  220. if not verify_crc16(response):
  221. logger.warning(f"CRC check failed: {response.hex()}")
  222. return {"error": "CRC check failed", "raw_data": response.hex()}
  223. return {
  224. "success": True,
  225. "device_address": response[0],
  226. "function_code": response[1],
  227. "data": response[2:-2].hex(),
  228. "raw_data": response.hex()
  229. }
  230. def read_antenna_card(self, device_address: int, antenna_num: int, timeout: float = None) -> dict:
  231. """读取天线卡号
  232. 协议: 读寄存器 0x0002+(ant-1)*4, 共4个寄存器(8字节卡号)
  233. """
  234. reg_addr = 0x0002 + (antenna_num - 1) * 4
  235. request_data = struct.pack('>HH', reg_addr, 4) # 读取4个寄存器
  236. request = self._build_request(device_address, 0x03, request_data)
  237. result = self._send_receive(request, timeout=timeout)
  238. if 'error' in result:
  239. return result
  240. data_str = result.get('data', '')
  241. data_bytes = bytes.fromhex(data_str) if isinstance(data_str, str) and data_str else b''
  242. if len(data_bytes) >= 8:
  243. # 8字节卡号, big-endian
  244. card_number = int.from_bytes(data_bytes[:8], 'big')
  245. card_str = data_bytes[:8].hex()
  246. return {
  247. 'card_number': card_number,
  248. 'card_number_hex': f'0x{card_number:016x}',
  249. 'card_number_str': card_str,
  250. 'antenna': antenna_num,
  251. 'uid': ':'.join(data_bytes[:8].hex()[i:i+2] for i in range(0, 16, 2)),
  252. **result
  253. }
  254. return {'error': f'数据长度不足({len(data_bytes)}B,需要8B)', **result}
  255. def read_holding_registers(self, device_address: int, start_address: int, quantity: int = 1, timeout: float = None) -> dict:
  256. """读取保持寄存器"""
  257. request_data = struct.pack('>HH', start_address, quantity)
  258. request = self._build_request(device_address, 0x03, request_data)
  259. result = self._send_receive(request, timeout=timeout)
  260. if 'error' in result:
  261. return result
  262. data_str = result.get('data', '')
  263. data_bytes = bytes.fromhex(data_str) if isinstance(data_str, str) and data_str else b''
  264. registers = []
  265. for i in range(0, len(data_bytes), 2):
  266. if i + 1 < len(data_bytes):
  267. registers.append(struct.unpack('>H', data_bytes[i:i+2])[0])
  268. return {'registers': registers, **result}
  269. def write_single_register(self, device_address: int, register_address: int, value: int, timeout: float = None) -> dict:
  270. """写单个寄存器"""
  271. request_data = struct.pack('>HH', register_address, value)
  272. request = self._build_request(device_address, 0x06, request_data)
  273. return self._send_receive(request, timeout=timeout)
  274. def set_rgb_led(self, device_address: int, led_number: int, color: int, timeout: float = None) -> dict:
  275. """设置RGB灯
  276. Register 0x0001, value = (led_number << 8) | color
  277. color: 0=off, 1=flashing red, 2=flashing green, 3=flashing blue
  278. """
  279. value = (led_number << 8) | (color & 0xFF)
  280. return self.write_single_register(device_address, 0x0001, value, timeout)
  281. def scan_devices(self, max_address: int = 247) -> list:
  282. """scan online devices using Modbus 03"""
  283. import time
  284. devices = []
  285. for addr in range(1, min(max_address + 1, 248)):
  286. request = self._build_request(addr, 0x03, struct.pack('>HH', 0x0000, 0x0001))
  287. response = self.serial.send_and_wait(request, timeout=0.3, min_response_bytes=5)
  288. if response and len(response) >= 5 and verify_crc16(response):
  289. devices.append(addr)
  290. return devices