| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980 |
- #!/usr/bin/env python3
- """
- MQTT DTU 协议测试脚本
- 连接 mqtt://xt.wenhq.top:8581,订阅所有 DTU 上报主题
- """
- import json
- import signal
- import paho.mqtt.client as mqtt
- HOST = 'xt.wenhq.top'
- PORT = 8581
- TOPIC_PREFIX = '线架系统'
- received = {}
- def on_connect(client, userdata, flags, rc):
- if rc == 0:
- print(f"[✓] 已连接 {HOST}:{PORT}")
- client.subscribe(f'{TOPIC_PREFIX}/#')
- print(f"[*] 已订阅 {TOPIC_PREFIX}/#")
- else:
- print(f"[✗] 连接失败, return code={rc}")
- def on_message(client, userdata, msg):
- topic = msg.topic
- try:
- payload = json.loads(msg.payload.decode())
- except:
- payload = msg.payload.decode()
- ts = payload.get('timestamp', '')
- dtype = payload.get('type', '?')
- dtu_id = payload.get('dtu_id', '?')
- print(f"\n{'='*60}")
- print(f"[{dtype}] DTU={dtu_id} 时间={ts}")
- print(f" 主题: {topic}")
- received[topic] = received.get(topic, 0) + 1
- p = payload.get('payload', {})
- if isinstance(p, dict):
- for k, v in p.items():
- if isinstance(v, list) and len(v) > 3:
- print(f" {k}: [{len(v)} items]")
- for item in v[:3]:
- print(f" - {json.dumps(item, ensure_ascii=False)}")
- if len(v) > 3:
- print(f" ... 共 {len(v)} 项")
- elif isinstance(v, dict):
- print(f" {k}: {json.dumps(v, ensure_ascii=False)}")
- else:
- print(f" {k}: {v}")
- else:
- print(f" payload: {json.dumps(p, ensure_ascii=False)}")
- def on_disconnect(client, userdata, rc):
- print(f"\n[!] 已断开 (rc={rc})")
- def signal_handler(sig, frame):
- print(f"\n\n{'='*60}")
- print("接收统计:")
- for topic, count in sorted(received.items(), key=lambda x: -x[1]):
- short = topic.replace(f'{TOPIC_PREFIX}/', '', 1)
- print(f" {short}: {count} 条")
- print(f"\n共收到 {sum(received.values())} 条消息")
- client.disconnect()
- exit(0)
- signal.signal(signal.SIGINT, signal_handler)
- client = mqtt.Client(client_id='dtu_test_script', protocol=mqtt.MQTTv311)
- client.on_connect = on_connect
- client.on_message = on_message
- client.on_disconnect = on_disconnect
- print(f"正在连接 {HOST}:{PORT} ...")
- client.connect(HOST, PORT, keepalive=60)
- client.loop_forever()
|