#!/usr/bin/env python3
"""
test_mqtt.py - MQTT DTU protocol test script.

Connects to mqtt://xt.wenhq.top:8581 and subscribes to all DTU report topics.
"""
import json
import signal
import sys

import paho.mqtt.client as mqtt
  9. HOST = 'xt.wenhq.top'
  10. PORT = 8581
  11. TOPIC_PREFIX = '线架系统'
  12. received = {}
  13. def on_connect(client, userdata, flags, rc):
  14. if rc == 0:
  15. print(f"[✓] 已连接 {HOST}:{PORT}")
  16. client.subscribe(f'{TOPIC_PREFIX}/#')
  17. print(f"[*] 已订阅 {TOPIC_PREFIX}/#")
  18. else:
  19. print(f"[✗] 连接失败, return code={rc}")
  20. def on_message(client, userdata, msg):
  21. topic = msg.topic
  22. try:
  23. payload = json.loads(msg.payload.decode())
  24. except:
  25. payload = msg.payload.decode()
  26. ts = payload.get('timestamp', '')
  27. dtype = payload.get('type', '?')
  28. dtu_id = payload.get('dtu_id', '?')
  29. print(f"\n{'='*60}")
  30. print(f"[{dtype}] DTU={dtu_id} 时间={ts}")
  31. print(f" 主题: {topic}")
  32. received[topic] = received.get(topic, 0) + 1
  33. p = payload.get('payload', {})
  34. if isinstance(p, dict):
  35. for k, v in p.items():
  36. if isinstance(v, list) and len(v) > 3:
  37. print(f" {k}: [{len(v)} items]")
  38. for item in v[:3]:
  39. print(f" - {json.dumps(item, ensure_ascii=False)}")
  40. if len(v) > 3:
  41. print(f" ... 共 {len(v)} 项")
  42. elif isinstance(v, dict):
  43. print(f" {k}: {json.dumps(v, ensure_ascii=False)}")
  44. else:
  45. print(f" {k}: {v}")
  46. else:
  47. print(f" payload: {json.dumps(p, ensure_ascii=False)}")
  48. def on_disconnect(client, userdata, rc):
  49. print(f"\n[!] 已断开 (rc={rc})")
  50. def signal_handler(sig, frame):
  51. print(f"\n\n{'='*60}")
  52. print("接收统计:")
  53. for topic, count in sorted(received.items(), key=lambda x: -x[1]):
  54. short = topic.replace(f'{TOPIC_PREFIX}/', '', 1)
  55. print(f" {short}: {count} 条")
  56. print(f"\n共收到 {sum(received.values())} 条消息")
  57. client.disconnect()
  58. exit(0)
  59. signal.signal(signal.SIGINT, signal_handler)
  60. client = mqtt.Client(client_id='dtu_test_script', protocol=mqtt.MQTTv311)
  61. client.on_connect = on_connect
  62. client.on_message = on_message
  63. client.on_disconnect = on_disconnect
  64. print(f"正在连接 {HOST}:{PORT} ...")
  65. client.connect(HOST, PORT, keepalive=60)
  66. client.loop_forever()