app.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884
  1. from flask import Flask, jsonify, request, abort
  2. from flask_cors import CORS
  3. from flask_socketio import SocketIO, emit
  4. import threading
  5. import time
  6. import json
  7. import os
  8. import logging
  9. import uuid
  10. # 导入配置
  11. from config import (
  12. MAX_BUFFER_SIZE,
  13. FLASK_SECRET_KEY,
  14. FLASK_DEBUG,
  15. FLASK_HOST,
  16. FLASK_PORT,
  17. LOG_LEVEL,
  18. LOG_FORMAT,
  19. LOG_FILE,
  20. SOCKETIO_ASYNC_MODE,
  21. SOCKETIO_ALLOWED_ORIGINS,
  22. DEFAULT_FORWARD_SERIAL_TO_MQTT,
  23. DEFAULT_FORWARD_MQTT_TO_SERIAL,
  24. DEFAULT_MQTT_PUBLISH_TOPIC,
  25. SOCKETIO_NAMESPACE_DATA,
  26. SOCKETIO_NAMESPACE_STATUS,
  27. SOCKETIO_NAMESPACE_CONTROL,
  28. ERROR_CODES,
  29. ERROR_MESSAGES
  30. )
  31. # 配置日志
  32. logging_config = {
  33. 'level': getattr(logging, LOG_LEVEL),
  34. 'format': LOG_FORMAT
  35. }
  36. if LOG_FILE:
  37. logging_config['filename'] = LOG_FILE
  38. logging.basicConfig(**logging_config)
  39. logger = logging.getLogger('serial_mqtt_gateway')
  40. from modules.serial_port import SerialPort
  41. from modules.mqtt_client import MQTTClient
  42. from modules.network_config import network_manager
  43. app = Flask(__name__)
  44. app.config['SECRET_KEY'] = FLASK_SECRET_KEY
  45. # 配置CORS以允许nginx代理的前端访问
  46. CORS(app, resources={r"/api/*": {"origins": "*"}, r"/socket.io/*": {"origins": "*"}})
  47. # 初始化SocketIO
  48. socketio = SocketIO(
  49. app,
  50. cors_allowed_origins=SOCKETIO_ALLOWED_ORIGINS,
  51. async_mode=SOCKETIO_ASYNC_MODE,
  52. manage_session=False, # 禁用会话管理以提高性能
  53. ping_timeout=30, # 心跳超时时间
  54. ping_interval=25, # 心跳间隔
  55. logger=FLASK_DEBUG, # 根据Flask调试模式决定是否记录SocketIO日志
  56. engineio_logger=FLASK_DEBUG
  57. )
  58. # 初始化串口和MQTT客户端
  59. serial_client = SerialPort()
  60. mqtt_client = MQTTClient()
  61. # 转发标志
  62. forward_serial_to_mqtt = DEFAULT_FORWARD_SERIAL_TO_MQTT
  63. forward_mqtt_to_serial = DEFAULT_FORWARD_MQTT_TO_SERIAL
  64. mqtt_publish_topic = DEFAULT_MQTT_PUBLISH_TOPIC
  65. # 数据存储缓冲区
  66. serial_data_buffer = []
  67. mqtt_data_buffer = []
  68. # 状态标志
  69. serial_status = False
  70. mqtt_status = False
  71. # 客户端连接管理
  72. connected_clients = {
  73. 'data': set(),
  74. 'status': set(),
  75. 'control': set()
  76. }
  77. # 设置回调函数
  78. def serial_data_handler(data):
  79. """处理串口接收的数据"""
  80. try:
  81. # 添加到缓冲区
  82. timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
  83. serial_data_buffer.append({
  84. 'timestamp': timestamp,
  85. 'data': data
  86. })
  87. # 保持缓冲区大小
  88. if len(serial_data_buffer) > MAX_BUFFER_SIZE:
  89. serial_data_buffer.pop(0)
  90. # 通过WebSocket广播数据
  91. socketio.emit('serial_data', {
  92. 'timestamp': timestamp,
  93. 'data': data
  94. }, namespace=SOCKETIO_NAMESPACE_DATA)
  95. # 如果启用了转发且MQTT已连接,转发数据到MQTT
  96. if forward_serial_to_mqtt and mqtt_client.get_status():
  97. success, msg = mqtt_client.publish(mqtt_publish_topic, data)
  98. if not success:
  99. logger.warning(f"串口数据转发到MQTT失败: {msg}")
  100. except Exception as e:
  101. logger.error(f"处理串口数据时出错: {str(e)}")
  102. def serial_status_handler(status):
  103. """处理串口状态变化"""
  104. try:
  105. global serial_status
  106. serial_status = status
  107. # 通过WebSocket广播状态变化
  108. socketio.emit('serial_status', {
  109. 'connected': status
  110. }, namespace=SOCKETIO_NAMESPACE_STATUS)
  111. logger.info(f"串口状态更新: {'已连接' if status else '已断开'}")
  112. except Exception as e:
  113. logger.error(f"处理串口状态时出错: {str(e)}")
  114. def mqtt_data_handler(data):
  115. """处理MQTT接收的数据"""
  116. try:
  117. # 添加到缓冲区
  118. timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
  119. mqtt_data_buffer.append({
  120. 'timestamp': timestamp,
  121. 'topic': data['topic'],
  122. 'payload': data['payload']
  123. })
  124. # 保持缓冲区大小
  125. if len(mqtt_data_buffer) > MAX_BUFFER_SIZE:
  126. mqtt_data_buffer.pop(0)
  127. # 通过WebSocket广播数据
  128. socketio.emit('mqtt_data', {
  129. 'timestamp': timestamp,
  130. 'topic': data['topic'],
  131. 'payload': data['payload']
  132. }, namespace=SOCKETIO_NAMESPACE_DATA)
  133. # 如果启用了转发且串口已连接,转发数据到串口
  134. if forward_mqtt_to_serial and serial_client.get_status():
  135. success, msg = serial_client.send_data(data['payload'])
  136. if not success:
  137. logger.warning(f"MQTT数据转发到串口失败: {msg}")
  138. except Exception as e:
  139. logger.error(f"处理MQTT数据时出错: {str(e)}")
  140. def mqtt_status_handler(status):
  141. """处理MQTT状态变化"""
  142. try:
  143. global mqtt_status
  144. mqtt_status = status
  145. # 通过WebSocket广播状态变化
  146. socketio.emit('mqtt_status', {
  147. 'connected': status
  148. }, namespace=SOCKETIO_NAMESPACE_STATUS)
  149. logger.info(f"MQTT状态更新: {'已连接' if status else '已断开'}")
  150. except Exception as e:
  151. logger.error(f"处理MQTT状态时出错: {str(e)}")
  152. # WebSocket事件处理
  153. @socketio.on('connect', namespace=SOCKETIO_NAMESPACE_DATA)
  154. def handle_data_connect():
  155. """处理数据命名空间的连接"""
  156. try:
  157. client_id = str(uuid.uuid4())
  158. connected_clients['data'].add(client_id)
  159. logger.info(f'客户端已连接到数据命名空间,当前连接数: {len(connected_clients["data"])}')
  160. # 发送当前的缓冲区数据,限制发送的历史记录数量
  161. max_history = 100 # 限制发送的历史记录数量以提高性能
  162. emit('serial_data_history', {'data': serial_data_buffer[-max_history:]})
  163. emit('mqtt_data_history', {'data': mqtt_data_buffer[-max_history:]})
  164. # 存储客户端ID以便断开连接时使用
  165. socketio.start_background_task(target=lambda: None) # 确保上下文可用
  166. except Exception as e:
  167. logger.error(f"处理数据命名空间连接时出错: {str(e)}")
  168. @socketio.on('connect', namespace=SOCKETIO_NAMESPACE_STATUS)
  169. def handle_status_connect():
  170. """处理状态命名空间的连接"""
  171. try:
  172. client_id = str(uuid.uuid4())
  173. connected_clients['status'].add(client_id)
  174. logger.info(f'客户端已连接到状态命名空间,当前连接数: {len(connected_clients["status"])}')
  175. # 发送当前状态
  176. emit('serial_status', {'connected': serial_status})
  177. emit('mqtt_status', {'connected': mqtt_status})
  178. emit('forward_status', {
  179. 'serial_to_mqtt': forward_serial_to_mqtt,
  180. 'mqtt_to_serial': forward_mqtt_to_serial,
  181. 'publish_topic': mqtt_publish_topic
  182. })
  183. except Exception as e:
  184. logger.error(f"处理状态命名空间连接时出错: {str(e)}")
  185. @socketio.on('connect', namespace=SOCKETIO_NAMESPACE_CONTROL)
  186. def handle_control_connect():
  187. """处理控制命名空间的连接"""
  188. try:
  189. client_id = str(uuid.uuid4())
  190. connected_clients['control'].add(client_id)
  191. logger.info(f'客户端已连接到控制命名空间,当前连接数: {len(connected_clients["control"])}')
  192. except Exception as e:
  193. logger.error(f"处理控制命名空间连接时出错: {str(e)}")
  194. @socketio.on('disconnect', namespace=SOCKETIO_NAMESPACE_DATA)
  195. def handle_data_disconnect():
  196. """处理数据命名空间的断开连接"""
  197. try:
  198. # 清理客户端连接记录
  199. # 在实际应用中,可能需要更复杂的逻辑来追踪具体哪个客户端断开了连接
  200. if len(connected_clients['data']) > 0:
  201. # 这里简化处理,实际应该维护session到client_id的映射
  202. connected_clients['data'].pop() # 注意:这是一个简化的实现
  203. logger.info(f'客户端已断开数据命名空间的连接,当前连接数: {len(connected_clients["data"])}')
  204. except Exception as e:
  205. logger.error(f"处理数据命名空间断开连接时出错: {str(e)}")
  206. @socketio.on('disconnect', namespace=SOCKETIO_NAMESPACE_STATUS)
  207. def handle_status_disconnect():
  208. """处理状态命名空间的断开连接"""
  209. try:
  210. # 清理客户端连接记录
  211. if len(connected_clients['status']) > 0:
  212. connected_clients['status'].pop()
  213. logger.info(f'客户端已断开状态命名空间的连接,当前连接数: {len(connected_clients["status"])}')
  214. except Exception as e:
  215. logger.error(f"处理状态命名空间断开连接时出错: {str(e)}")
  216. @socketio.on('disconnect', namespace=SOCKETIO_NAMESPACE_CONTROL)
  217. def handle_control_disconnect():
  218. """处理控制命名空间的断开连接"""
  219. try:
  220. # 清理客户端连接记录
  221. if len(connected_clients['control']) > 0:
  222. connected_clients['control'].pop()
  223. logger.info(f'客户端已断开控制命名空间的连接,当前连接数: {len(connected_clients["control"])}')
  224. except Exception as e:
  225. logger.error(f"处理控制命名空间断开连接时出错: {str(e)}")
  226. @socketio.on('serial_send', namespace=SOCKETIO_NAMESPACE_CONTROL)
  227. def handle_serial_send(data):
  228. """通过WebSocket处理串口发送数据请求"""
  229. try:
  230. message = data.get('message', '')
  231. if not message:
  232. emit('serial_send_response', {
  233. 'success': False,
  234. 'message': '消息内容不能为空',
  235. 'error_code': ERROR_CODES['CONFIG_ERROR']
  236. })
  237. return
  238. if not serial_client.get_status():
  239. emit('serial_send_response', {
  240. 'success': False,
  241. 'message': '串口未连接',
  242. 'error_code': ERROR_CODES['SERIAL_CONNECTION_ERROR']
  243. })
  244. return
  245. success, msg = serial_client.send_data(message)
  246. emit('serial_send_response', {
  247. 'success': success,
  248. 'message': msg,
  249. 'error_code': ERROR_CODES['SUCCESS'] if success else ERROR_CODES['SERIAL_SEND_ERROR']
  250. })
  251. if success:
  252. logger.info(f"通过WebSocket发送串口数据成功: {message[:50]}..." if len(message) > 50 else message)
  253. except Exception as e:
  254. error_msg = f"处理串口发送请求时出错: {str(e)}"
  255. logger.error(error_msg)
  256. emit('serial_send_response', {
  257. 'success': False,
  258. 'message': error_msg,
  259. 'error_code': ERROR_CODES['UNKNOWN_ERROR']
  260. })
  261. @socketio.on('mqtt_publish', namespace=SOCKETIO_NAMESPACE_CONTROL)
  262. def handle_mqtt_publish(data):
  263. """通过WebSocket处理MQTT发布数据请求"""
  264. try:
  265. topic = data.get('topic', '')
  266. message = data.get('message', '')
  267. if not topic or not message:
  268. emit('mqtt_publish_response', {
  269. 'success': False,
  270. 'message': '主题或消息内容不能为空',
  271. 'error_code': ERROR_CODES['CONFIG_ERROR']
  272. })
  273. return
  274. if not mqtt_client.get_status():
  275. emit('mqtt_publish_response', {
  276. 'success': False,
  277. 'message': 'MQTT未连接',
  278. 'error_code': ERROR_CODES['MQTT_CONNECTION_ERROR']
  279. })
  280. return
  281. success, msg = mqtt_client.publish(topic, message)
  282. emit('mqtt_publish_response', {
  283. 'success': success,
  284. 'message': msg,
  285. 'error_code': ERROR_CODES['SUCCESS'] if success else ERROR_CODES['MQTT_PUBLISH_ERROR']
  286. })
  287. if success:
  288. logger.info(f"通过WebSocket发布MQTT消息成功: 主题={topic}, 消息={message[:50]}..." if len(message) > 50 else message)
  289. except Exception as e:
  290. error_msg = f"处理MQTT发布请求时出错: {str(e)}"
  291. logger.error(error_msg)
  292. emit('mqtt_publish_response', {
  293. 'success': False,
  294. 'message': error_msg,
  295. 'error_code': ERROR_CODES['UNKNOWN_ERROR']
  296. })
  297. @socketio.on('update_forward_config', namespace=SOCKETIO_NAMESPACE_CONTROL)
  298. def handle_update_forward_config(data):
  299. """通过WebSocket更新转发配置"""
  300. global forward_serial_to_mqtt, forward_mqtt_to_serial, mqtt_publish_topic
  301. try:
  302. # 更新转发标志
  303. if 'serial_to_mqtt' in data:
  304. forward_serial_to_mqtt = bool(data['serial_to_mqtt'])
  305. if 'mqtt_to_serial' in data:
  306. forward_mqtt_to_serial = bool(data['mqtt_to_serial'])
  307. if 'publish_topic' in data:
  308. new_topic = str(data['publish_topic'])
  309. if not new_topic.strip():
  310. raise ValueError("发布主题不能为空")
  311. mqtt_publish_topic = new_topic
  312. # 广播配置更新
  313. socketio.emit('forward_status', {
  314. 'serial_to_mqtt': forward_serial_to_mqtt,
  315. 'mqtt_to_serial': forward_mqtt_to_serial,
  316. 'publish_topic': mqtt_publish_topic
  317. }, namespace=SOCKETIO_NAMESPACE_STATUS)
  318. emit('update_forward_config_response', {
  319. 'success': True,
  320. 'message': '转发配置已更新',
  321. 'error_code': ERROR_CODES['SUCCESS']
  322. })
  323. logger.info(f"转发配置已更新 - 串口到MQTT: {forward_serial_to_mqtt}, MQTT到串口: {forward_mqtt_to_serial}, 发布主题: {mqtt_publish_topic}")
  324. except ValueError as e:
  325. error_msg = str(e)
  326. logger.warning(f"转发配置更新失败: {error_msg}")
  327. emit('update_forward_config_response', {
  328. 'success': False,
  329. 'message': error_msg,
  330. 'error_code': ERROR_CODES['CONFIG_ERROR']
  331. })
  332. except Exception as e:
  333. error_msg = f'更新转发配置失败: {str(e)}'
  334. logger.error(error_msg)
  335. emit('update_forward_config_response', {
  336. 'success': False,
  337. 'message': error_msg,
  338. 'error_code': ERROR_CODES['UNKNOWN_ERROR']
  339. })
  340. @socketio.on('clear_data_buffer', namespace=SOCKETIO_NAMESPACE_CONTROL)
  341. def handle_clear_data_buffer(data):
  342. """通过WebSocket清空数据缓冲区"""
  343. try:
  344. buffer_type = data.get('type', '')
  345. if buffer_type == 'serial':
  346. serial_data_buffer.clear()
  347. emit('clear_data_buffer_response', {
  348. 'success': True,
  349. 'message': '串口数据缓冲区已清空',
  350. 'error_code': ERROR_CODES['SUCCESS']
  351. })
  352. logger.info("串口数据缓冲区已清空")
  353. elif buffer_type == 'mqtt':
  354. mqtt_data_buffer.clear()
  355. emit('clear_data_buffer_response', {
  356. 'success': True,
  357. 'message': 'MQTT数据缓冲区已清空',
  358. 'error_code': ERROR_CODES['SUCCESS']
  359. })
  360. logger.info("MQTT数据缓冲区已清空")
  361. elif buffer_type == 'all':
  362. serial_data_buffer.clear()
  363. mqtt_data_buffer.clear()
  364. emit('clear_data_buffer_response', {
  365. 'success': True,
  366. 'message': '所有数据缓冲区已清空',
  367. 'error_code': ERROR_CODES['SUCCESS']
  368. })
  369. logger.info("所有数据缓冲区已清空")
  370. else:
  371. emit('clear_data_buffer_response', {
  372. 'success': False,
  373. 'message': '无效的缓冲区类型',
  374. 'error_code': ERROR_CODES['CONFIG_ERROR']
  375. })
  376. logger.warning(f"清空缓冲区失败: 无效的缓冲区类型 '{buffer_type}'")
  377. except Exception as e:
  378. error_msg = f'清空缓冲区失败: {str(e)}'
  379. logger.error(error_msg)
  380. emit('clear_data_buffer_response', {
  381. 'success': False,
  382. 'message': error_msg,
  383. 'error_code': ERROR_CODES['UNKNOWN_ERROR']
  384. })
  385. # 设置回调
  386. serial_client.set_data_callback(serial_data_handler)
  387. serial_client.set_status_callback(serial_status_handler)
  388. mqtt_client.set_data_callback(mqtt_data_handler)
  389. mqtt_client.set_status_callback(mqtt_status_handler)
  390. # API路由
  391. # 移除静态文件服务,前端由nginx提供服务
  392. # 根路径路由
  393. @app.route('/')
  394. def index():
  395. return send_from_directory(STATIC_FOLDER, 'index.html')
  396. # 404错误处理器 - 解决SPA路由刷新问题
  397. @app.errorhandler(404)
  398. def page_not_found(e):
  399. """处理所有404错误,对于非API路径返回index.html"""
  400. path = request.path
  401. # 检查是否是API请求
  402. if path.startswith('/api/'):
  403. # 对于API请求,返回404错误
  404. return jsonify({
  405. 'success': False,
  406. 'message': 'API endpoint not found'
  407. }), 404
  408. # 对于所有非API路径,返回index.html让前端路由处理
  409. return send_from_directory(STATIC_FOLDER, 'index.html'), 200
  410. @app.route('/api/serial/ports', methods=['GET'])
  411. def get_serial_ports():
  412. """获取可用串口列表"""
  413. ports = serial_client.list_ports()
  414. return jsonify({
  415. 'success': True,
  416. 'ports': ports
  417. })
  418. @app.route('/api/serial/connect', methods=['POST'])
  419. def serial_connect():
  420. """连接串口"""
  421. try:
  422. data = request.json
  423. port = data.get('port')
  424. baudrate = data.get('baudrate', 9600)
  425. bytesize = data.get('bytesize', 8)
  426. parity = data.get('parity', 'N')
  427. stopbits = data.get('stopbits', 1)
  428. timeout = data.get('timeout', 0.1)
  429. if not port:
  430. logger.warning("连接串口请求缺少串口名称")
  431. return jsonify({
  432. 'success': False,
  433. 'message': '串口名称不能为空',
  434. 'error_code': ERROR_CODES['CONFIG_ERROR']
  435. }), 400
  436. # 先断开之前的连接
  437. if serial_client.get_status():
  438. logger.info(f"断开现有串口连接: {serial_client.port}")
  439. serial_client.disconnect()
  440. # 连接新的串口
  441. logger.info(f"尝试连接串口: {port}, 波特率: {baudrate}")
  442. success, message = serial_client.connect(port, baudrate, bytesize, parity, stopbits, timeout)
  443. status_code = 200 if success else 400
  444. error_code = ERROR_CODES['SUCCESS'] if success else ERROR_CODES['SERIAL_CONNECTION_ERROR']
  445. response = {
  446. 'success': success,
  447. 'message': message,
  448. 'error_code': error_code
  449. }
  450. if success:
  451. logger.info(f"串口连接成功: {port}")
  452. else:
  453. logger.error(f"串口连接失败: {message}")
  454. return jsonify(response), status_code
  455. except Exception as e:
  456. error_msg = f'连接串口时出错: {str(e)}'
  457. logger.exception(error_msg) # 使用exception记录完整堆栈
  458. return jsonify({
  459. 'success': False,
  460. 'message': error_msg,
  461. 'error_code': ERROR_CODES['UNKNOWN_ERROR']
  462. }), 500
  463. @app.route('/api/serial/disconnect', methods=['POST'])
  464. def serial_disconnect():
  465. """断开串口连接"""
  466. success, message = serial_client.disconnect()
  467. return jsonify({
  468. 'success': success,
  469. 'message': message
  470. })
  471. @app.route('/api/serial/status', methods=['GET'])
  472. def serial_get_status():
  473. """获取串口状态"""
  474. return jsonify({
  475. 'connected': serial_client.get_status()
  476. })
  477. @app.route('/api/serial/send', methods=['POST'])
  478. def serial_send():
  479. """发送数据到串口"""
  480. data = request.json
  481. message = data.get('message')
  482. if not message:
  483. return jsonify({
  484. 'success': False,
  485. 'message': '消息内容不能为空'
  486. })
  487. success, message = serial_client.send_data(message)
  488. return jsonify({
  489. 'success': success,
  490. 'message': message
  491. })
  492. @app.route('/api/mqtt/connect', methods=['POST'])
  493. def mqtt_connect():
  494. """连接MQTT服务器"""
  495. try:
  496. data = request.json
  497. host = data.get('broker') or data.get('host', 'localhost')
  498. port = data.get('port', 1883)
  499. client_id = data.get('client_id', f'serial_gateway_{int(time.time())}')
  500. username = data.get('username')
  501. password = data.get('password')
  502. keepalive = data.get('keepalive', 60)
  503. # 先断开之前的连接
  504. if mqtt_client.get_status():
  505. logger.info(f"断开现有MQTT连接: {mqtt_client.host}:{mqtt_client.port}")
  506. mqtt_client.disconnect()
  507. # 连接新的MQTT服务器
  508. logger.info(f"尝试连接MQTT服务器: {host}:{port}, 客户端ID: {client_id}")
  509. success, message = mqtt_client.connect(
  510. host=host,
  511. port=port,
  512. client_id=client_id,
  513. username=username,
  514. password=password,
  515. keepalive=keepalive
  516. )
  517. # 如果连接成功,订阅主题
  518. if success and 'topics' in data:
  519. mqtt_client.subscribe(data['topics'])
  520. status_code = 200 if success else 400
  521. error_code = ERROR_CODES['SUCCESS'] if success else ERROR_CODES['MQTT_CONNECTION_ERROR']
  522. response = {
  523. 'success': success,
  524. 'message': message,
  525. 'error_code': error_code
  526. }
  527. if success:
  528. logger.info(f"MQTT服务器连接成功: {host}:{port}")
  529. else:
  530. logger.error(f"MQTT服务器连接失败: {message}")
  531. return jsonify(response), status_code
  532. except Exception as e:
  533. error_msg = f'连接MQTT服务器时出错: {str(e)}'
  534. logger.exception(error_msg)
  535. return jsonify({
  536. 'success': False,
  537. 'message': error_msg,
  538. 'error_code': ERROR_CODES['UNKNOWN_ERROR']
  539. }), 500
  540. @app.route('/api/mqtt/disconnect', methods=['POST'])
  541. def mqtt_disconnect():
  542. """断开MQTT连接"""
  543. success, message = mqtt_client.disconnect()
  544. return jsonify({
  545. 'success': success,
  546. 'message': message
  547. })
  548. @app.route('/api/mqtt/status', methods=['GET'])
  549. def mqtt_get_status():
  550. """获取MQTT状态"""
  551. return jsonify({
  552. 'connected': mqtt_client.get_status()
  553. })
  554. @app.route('/api/mqtt/publish', methods=['POST'])
  555. def mqtt_publish():
  556. """发布MQTT消息"""
  557. data = request.json
  558. topic = data.get('topic')
  559. message = data.get('message')
  560. if not topic or not message:
  561. return jsonify({
  562. 'success': False,
  563. 'message': '主题和消息内容不能为空'
  564. })
  565. success, message = mqtt_client.publish(topic, message)
  566. return jsonify({
  567. 'success': success,
  568. 'message': message
  569. })
  570. @app.route('/api/mqtt/subscribe', methods=['POST'])
  571. def mqtt_subscribe():
  572. """订阅MQTT主题"""
  573. data = request.json
  574. topics = data.get('topics', [])
  575. if not topics:
  576. return jsonify({
  577. 'success': False,
  578. 'message': '请至少订阅一个主题'
  579. })
  580. success, message = mqtt_client.subscribe(topics)
  581. return jsonify({
  582. 'success': success,
  583. 'message': message
  584. })
  585. @app.route('/api/data/serial', methods=['GET'])
  586. def get_serial_data():
  587. """获取串口数据"""
  588. return jsonify({
  589. 'data': serial_data_buffer
  590. })
  591. @app.route('/api/data/mqtt', methods=['GET'])
  592. def get_mqtt_data():
  593. """获取MQTT数据"""
  594. return jsonify({
  595. 'data': mqtt_data_buffer
  596. })
  597. @app.route('/api/forward/config', methods=['POST'])
  598. def set_forward_config():
  599. """设置转发配置"""
  600. global forward_serial_to_mqtt, forward_mqtt_to_serial, mqtt_publish_topic
  601. data = request.json
  602. forward_serial_to_mqtt = data.get('serial_to_mqtt', False)
  603. forward_mqtt_to_serial = data.get('mqtt_to_serial', False)
  604. if 'publish_topic' in data:
  605. mqtt_publish_topic = data['publish_topic']
  606. return jsonify({
  607. 'success': True,
  608. 'message': '转发配置已更新',
  609. 'config': {
  610. 'serial_to_mqtt': forward_serial_to_mqtt,
  611. 'mqtt_to_serial': forward_mqtt_to_serial,
  612. 'publish_topic': mqtt_publish_topic
  613. }
  614. })
  615. @app.route('/api/forward/status', methods=['GET'])
  616. def get_forward_status():
  617. """获取转发状态"""
  618. return jsonify({
  619. 'serial_to_mqtt': forward_serial_to_mqtt,
  620. 'mqtt_to_serial': forward_mqtt_to_serial,
  621. 'publish_topic': mqtt_publish_topic
  622. })
  623. # 健康检查端点
  624. @app.route('/api/health', methods=['GET'])
  625. def health_check():
  626. """健康检查端点"""
  627. try:
  628. # 获取客户端连接数量
  629. client_counts = {
  630. 'data': len(connected_clients['data']),
  631. 'status': len(connected_clients['status']),
  632. 'control': len(connected_clients['control'])
  633. }
  634. # 获取缓冲区大小
  635. buffer_sizes = {
  636. 'serial': len(serial_data_buffer),
  637. 'mqtt': len(mqtt_data_buffer)
  638. }
  639. # 执行系统负载检查
  640. # 注意:这只是一个简化的负载检查,实际应用中可能需要更复杂的监控
  641. is_healthy = True
  642. load_warnings = []
  643. # 检查缓冲区是否过大
  644. if buffer_sizes['serial'] > MAX_BUFFER_SIZE * 0.8:
  645. is_healthy = False
  646. load_warnings.append(f"串口缓冲区接近最大容量: {buffer_sizes['serial']}/{MAX_BUFFER_SIZE}")
  647. if buffer_sizes['mqtt'] > MAX_BUFFER_SIZE * 0.8:
  648. is_healthy = False
  649. load_warnings.append(f"MQTT缓冲区接近最大容量: {buffer_sizes['mqtt']}/{MAX_BUFFER_SIZE}")
  650. # 检查WebSocket连接数是否过多
  651. total_clients = sum(client_counts.values())
  652. if total_clients > 100: # 设置合理的阈值
  653. is_healthy = False
  654. load_warnings.append(f"WebSocket连接数过多: {total_clients}")
  655. # 尝试获取网络状态信息(使用try-except包装,防止网络模块出错导致健康检查失败)
  656. network_info = None
  657. try:
  658. network_info = network_manager.get_network_status()
  659. except Exception as e:
  660. logger.warning(f"获取网络状态时出错: {str(e)}")
  661. # 不影响整体健康检查,只添加警告
  662. load_warnings.append(f"网络状态获取失败: {str(e)}")
  663. response = {
  664. 'status': 'healthy' if is_healthy else 'warning',
  665. 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
  666. 'services': {
  667. 'serial': serial_status,
  668. 'mqtt': mqtt_status,
  669. 'websocket': True
  670. },
  671. 'client_counts': client_counts,
  672. 'buffer_sizes': buffer_sizes,
  673. 'warnings': load_warnings
  674. }
  675. # 仅当获取到网络信息时添加
  676. if network_info:
  677. response['network'] = network_info
  678. return jsonify(response), 200
  679. except Exception as e:
  680. # 捕获所有异常,确保健康检查不会返回500错误
  681. logger.error(f"健康检查端点出错: {str(e)}")
  682. # 返回一个基础的健康状态,至少显示服务在运行
  683. return jsonify({
  684. 'status': 'error',
  685. 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
  686. 'error': str(e),
  687. 'services': {
  688. 'api': True, # API服务本身是运行的
  689. 'serial': None,
  690. 'mqtt': None,
  691. 'websocket': None
  692. }
  693. }), 200 # 仍然返回200,避免健康检查导致的连锁反应
  694. # 网络配置相关API
  695. @app.route('/api/network/config', methods=['GET'])
  696. def get_network_config():
  697. """获取当前网络配置"""
  698. try:
  699. config = network_manager.get_network_config()
  700. return jsonify(config)
  701. except Exception as e:
  702. logger.error(f'获取网络配置失败: {str(e)}')
  703. return jsonify({'success': False, 'message': f'获取网络配置失败: {str(e)}'}), 500
  704. @app.route('/api/network/config', methods=['POST'])
  705. def update_network_config():
  706. """更新网络配置"""
  707. try:
  708. config_data = request.json
  709. result = network_manager.update_network_config(config_data)
  710. if result['success']:
  711. return jsonify(result)
  712. else:
  713. return jsonify(result), 400
  714. except Exception as e:
  715. logger.error(f'更新网络配置失败: {str(e)}')
  716. return jsonify({'success': False, 'message': f'更新网络配置失败: {str(e)}'}), 500
  717. @app.route('/api/network/status', methods=['GET'])
  718. def get_network_status():
  719. """获取网络状态信息"""
  720. try:
  721. status = network_manager.get_network_status()
  722. return jsonify(status)
  723. except Exception as e:
  724. logger.error(f'获取网络状态失败: {str(e)}')
  725. return jsonify({'success': False, 'message': f'获取网络状态失败: {str(e)}'}), 500
  726. @app.route('/api/network/restart', methods=['POST'])
  727. def restart_network_service():
  728. """重启网络服务以应用新配置"""
  729. try:
  730. result = network_manager.restart_network_service()
  731. if result['success']:
  732. return jsonify(result)
  733. else:
  734. return jsonify(result), 500
  735. except Exception as e:
  736. logger.error(f'重启网络服务失败: {str(e)}')
  737. return jsonify({'success': False, 'message': f'重启网络服务失败: {str(e)}'}), 500
  738. # 不再需要静态文件目录,前端由nginx提供服务
  739. if __name__ == '__main__':
  740. try:
  741. # 启动前的初始化工作
  742. logger.info('启动串口-MQTT网关服务...')
  743. logger.info(f"配置信息: 主机={FLASK_HOST}, 端口={FLASK_PORT}, 调试模式={FLASK_DEBUG}")
  744. # 启动服务
  745. socketio.run(
  746. app,
  747. host=FLASK_HOST,
  748. port=FLASK_PORT,
  749. debug=FLASK_DEBUG,
  750. use_reloader=False, # 禁用重载器以避免重复初始化问题
  751. log_output=False # 禁用Flask的日志输出,使用我们自己的日志配置
  752. )
  753. except KeyboardInterrupt:
  754. # 优雅退出
  755. logger.info('正在关闭应用...')
  756. try:
  757. if serial_client.get_status():
  758. serial_client.disconnect()
  759. logger.info('串口连接已断开')
  760. if mqtt_client.get_status():
  761. mqtt_client.disconnect()
  762. logger.info('MQTT连接已断开')
  763. except Exception as e:
  764. logger.error(f'关闭连接时出错: {str(e)}')
  765. # 清理WebSocket连接
  766. for client_type in connected_clients:
  767. connected_clients[client_type].clear()
  768. logger.info('应用已安全关闭')
  769. except Exception as e:
  770. logger.exception(f'应用启动失败') # 使用exception记录完整堆栈
  771. # 确保资源被释放
  772. try:
  773. serial_client.disconnect()
  774. mqtt_client.disconnect()
  775. except:
  776. pass