| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884 |
- from flask import Flask, jsonify, request, abort
- from flask_cors import CORS
- from flask_socketio import SocketIO, emit
- import threading
- import time
- import json
- import os
- import logging
- import uuid
- # 导入配置
- from config import (
- MAX_BUFFER_SIZE,
- FLASK_SECRET_KEY,
- FLASK_DEBUG,
- FLASK_HOST,
- FLASK_PORT,
- LOG_LEVEL,
- LOG_FORMAT,
- LOG_FILE,
- SOCKETIO_ASYNC_MODE,
- SOCKETIO_ALLOWED_ORIGINS,
- DEFAULT_FORWARD_SERIAL_TO_MQTT,
- DEFAULT_FORWARD_MQTT_TO_SERIAL,
- DEFAULT_MQTT_PUBLISH_TOPIC,
- SOCKETIO_NAMESPACE_DATA,
- SOCKETIO_NAMESPACE_STATUS,
- SOCKETIO_NAMESPACE_CONTROL,
- ERROR_CODES,
- ERROR_MESSAGES
- )
- # 配置日志
- logging_config = {
- 'level': getattr(logging, LOG_LEVEL),
- 'format': LOG_FORMAT
- }
- if LOG_FILE:
- logging_config['filename'] = LOG_FILE
- logging.basicConfig(**logging_config)
- logger = logging.getLogger('serial_mqtt_gateway')
- from modules.serial_port import SerialPort
- from modules.mqtt_client import MQTTClient
- from modules.network_config import network_manager
- app = Flask(__name__)
- app.config['SECRET_KEY'] = FLASK_SECRET_KEY
- # 配置CORS以允许nginx代理的前端访问
- CORS(app, resources={r"/api/*": {"origins": "*"}, r"/socket.io/*": {"origins": "*"}})
- # 初始化SocketIO
- socketio = SocketIO(
- app,
- cors_allowed_origins=SOCKETIO_ALLOWED_ORIGINS,
- async_mode=SOCKETIO_ASYNC_MODE,
- manage_session=False, # 禁用会话管理以提高性能
- ping_timeout=30, # 心跳超时时间
- ping_interval=25, # 心跳间隔
- logger=FLASK_DEBUG, # 根据Flask调试模式决定是否记录SocketIO日志
- engineio_logger=FLASK_DEBUG
- )
- # 初始化串口和MQTT客户端
- serial_client = SerialPort()
- mqtt_client = MQTTClient()
- # 转发标志
- forward_serial_to_mqtt = DEFAULT_FORWARD_SERIAL_TO_MQTT
- forward_mqtt_to_serial = DEFAULT_FORWARD_MQTT_TO_SERIAL
- mqtt_publish_topic = DEFAULT_MQTT_PUBLISH_TOPIC
- # 数据存储缓冲区
- serial_data_buffer = []
- mqtt_data_buffer = []
- # 状态标志
- serial_status = False
- mqtt_status = False
- # 客户端连接管理
- connected_clients = {
- 'data': set(),
- 'status': set(),
- 'control': set()
- }
- # 设置回调函数
- def serial_data_handler(data):
- """处理串口接收的数据"""
- try:
- # 添加到缓冲区
- timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
- serial_data_buffer.append({
- 'timestamp': timestamp,
- 'data': data
- })
- # 保持缓冲区大小
- if len(serial_data_buffer) > MAX_BUFFER_SIZE:
- serial_data_buffer.pop(0)
-
- # 通过WebSocket广播数据
- socketio.emit('serial_data', {
- 'timestamp': timestamp,
- 'data': data
- }, namespace=SOCKETIO_NAMESPACE_DATA)
-
- # 如果启用了转发且MQTT已连接,转发数据到MQTT
- if forward_serial_to_mqtt and mqtt_client.get_status():
- success, msg = mqtt_client.publish(mqtt_publish_topic, data)
- if not success:
- logger.warning(f"串口数据转发到MQTT失败: {msg}")
- except Exception as e:
- logger.error(f"处理串口数据时出错: {str(e)}")
- def serial_status_handler(status):
- """处理串口状态变化"""
- try:
- global serial_status
- serial_status = status
-
- # 通过WebSocket广播状态变化
- socketio.emit('serial_status', {
- 'connected': status
- }, namespace=SOCKETIO_NAMESPACE_STATUS)
-
- logger.info(f"串口状态更新: {'已连接' if status else '已断开'}")
- except Exception as e:
- logger.error(f"处理串口状态时出错: {str(e)}")
- def mqtt_data_handler(data):
- """处理MQTT接收的数据"""
- try:
- # 添加到缓冲区
- timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
- mqtt_data_buffer.append({
- 'timestamp': timestamp,
- 'topic': data['topic'],
- 'payload': data['payload']
- })
- # 保持缓冲区大小
- if len(mqtt_data_buffer) > MAX_BUFFER_SIZE:
- mqtt_data_buffer.pop(0)
-
- # 通过WebSocket广播数据
- socketio.emit('mqtt_data', {
- 'timestamp': timestamp,
- 'topic': data['topic'],
- 'payload': data['payload']
- }, namespace=SOCKETIO_NAMESPACE_DATA)
-
- # 如果启用了转发且串口已连接,转发数据到串口
- if forward_mqtt_to_serial and serial_client.get_status():
- success, msg = serial_client.send_data(data['payload'])
- if not success:
- logger.warning(f"MQTT数据转发到串口失败: {msg}")
- except Exception as e:
- logger.error(f"处理MQTT数据时出错: {str(e)}")
- def mqtt_status_handler(status):
- """处理MQTT状态变化"""
- try:
- global mqtt_status
- mqtt_status = status
-
- # 通过WebSocket广播状态变化
- socketio.emit('mqtt_status', {
- 'connected': status
- }, namespace=SOCKETIO_NAMESPACE_STATUS)
-
- logger.info(f"MQTT状态更新: {'已连接' if status else '已断开'}")
- except Exception as e:
- logger.error(f"处理MQTT状态时出错: {str(e)}")
- # WebSocket事件处理
- @socketio.on('connect', namespace=SOCKETIO_NAMESPACE_DATA)
- def handle_data_connect():
- """处理数据命名空间的连接"""
- try:
- client_id = str(uuid.uuid4())
- connected_clients['data'].add(client_id)
- logger.info(f'客户端已连接到数据命名空间,当前连接数: {len(connected_clients["data"])}')
-
- # 发送当前的缓冲区数据,限制发送的历史记录数量
- max_history = 100 # 限制发送的历史记录数量以提高性能
- emit('serial_data_history', {'data': serial_data_buffer[-max_history:]})
- emit('mqtt_data_history', {'data': mqtt_data_buffer[-max_history:]})
-
- # 存储客户端ID以便断开连接时使用
- socketio.start_background_task(target=lambda: None) # 确保上下文可用
- except Exception as e:
- logger.error(f"处理数据命名空间连接时出错: {str(e)}")
- @socketio.on('connect', namespace=SOCKETIO_NAMESPACE_STATUS)
- def handle_status_connect():
- """处理状态命名空间的连接"""
- try:
- client_id = str(uuid.uuid4())
- connected_clients['status'].add(client_id)
- logger.info(f'客户端已连接到状态命名空间,当前连接数: {len(connected_clients["status"])}')
-
- # 发送当前状态
- emit('serial_status', {'connected': serial_status})
- emit('mqtt_status', {'connected': mqtt_status})
- emit('forward_status', {
- 'serial_to_mqtt': forward_serial_to_mqtt,
- 'mqtt_to_serial': forward_mqtt_to_serial,
- 'publish_topic': mqtt_publish_topic
- })
- except Exception as e:
- logger.error(f"处理状态命名空间连接时出错: {str(e)}")
- @socketio.on('connect', namespace=SOCKETIO_NAMESPACE_CONTROL)
- def handle_control_connect():
- """处理控制命名空间的连接"""
- try:
- client_id = str(uuid.uuid4())
- connected_clients['control'].add(client_id)
- logger.info(f'客户端已连接到控制命名空间,当前连接数: {len(connected_clients["control"])}')
- except Exception as e:
- logger.error(f"处理控制命名空间连接时出错: {str(e)}")
- @socketio.on('disconnect', namespace=SOCKETIO_NAMESPACE_DATA)
- def handle_data_disconnect():
- """处理数据命名空间的断开连接"""
- try:
- # 清理客户端连接记录
- # 在实际应用中,可能需要更复杂的逻辑来追踪具体哪个客户端断开了连接
- if len(connected_clients['data']) > 0:
- # 这里简化处理,实际应该维护session到client_id的映射
- connected_clients['data'].pop() # 注意:这是一个简化的实现
- logger.info(f'客户端已断开数据命名空间的连接,当前连接数: {len(connected_clients["data"])}')
- except Exception as e:
- logger.error(f"处理数据命名空间断开连接时出错: {str(e)}")
- @socketio.on('disconnect', namespace=SOCKETIO_NAMESPACE_STATUS)
- def handle_status_disconnect():
- """处理状态命名空间的断开连接"""
- try:
- # 清理客户端连接记录
- if len(connected_clients['status']) > 0:
- connected_clients['status'].pop()
- logger.info(f'客户端已断开状态命名空间的连接,当前连接数: {len(connected_clients["status"])}')
- except Exception as e:
- logger.error(f"处理状态命名空间断开连接时出错: {str(e)}")
- @socketio.on('disconnect', namespace=SOCKETIO_NAMESPACE_CONTROL)
- def handle_control_disconnect():
- """处理控制命名空间的断开连接"""
- try:
- # 清理客户端连接记录
- if len(connected_clients['control']) > 0:
- connected_clients['control'].pop()
- logger.info(f'客户端已断开控制命名空间的连接,当前连接数: {len(connected_clients["control"])}')
- except Exception as e:
- logger.error(f"处理控制命名空间断开连接时出错: {str(e)}")
- @socketio.on('serial_send', namespace=SOCKETIO_NAMESPACE_CONTROL)
- def handle_serial_send(data):
- """通过WebSocket处理串口发送数据请求"""
- try:
- message = data.get('message', '')
- if not message:
- emit('serial_send_response', {
- 'success': False,
- 'message': '消息内容不能为空',
- 'error_code': ERROR_CODES['CONFIG_ERROR']
- })
- return
-
- if not serial_client.get_status():
- emit('serial_send_response', {
- 'success': False,
- 'message': '串口未连接',
- 'error_code': ERROR_CODES['SERIAL_CONNECTION_ERROR']
- })
- return
-
- success, msg = serial_client.send_data(message)
- emit('serial_send_response', {
- 'success': success,
- 'message': msg,
- 'error_code': ERROR_CODES['SUCCESS'] if success else ERROR_CODES['SERIAL_SEND_ERROR']
- })
-
- if success:
- logger.info(f"通过WebSocket发送串口数据成功: {message[:50]}..." if len(message) > 50 else message)
- except Exception as e:
- error_msg = f"处理串口发送请求时出错: {str(e)}"
- logger.error(error_msg)
- emit('serial_send_response', {
- 'success': False,
- 'message': error_msg,
- 'error_code': ERROR_CODES['UNKNOWN_ERROR']
- })
- @socketio.on('mqtt_publish', namespace=SOCKETIO_NAMESPACE_CONTROL)
- def handle_mqtt_publish(data):
- """通过WebSocket处理MQTT发布数据请求"""
- try:
- topic = data.get('topic', '')
- message = data.get('message', '')
-
- if not topic or not message:
- emit('mqtt_publish_response', {
- 'success': False,
- 'message': '主题或消息内容不能为空',
- 'error_code': ERROR_CODES['CONFIG_ERROR']
- })
- return
-
- if not mqtt_client.get_status():
- emit('mqtt_publish_response', {
- 'success': False,
- 'message': 'MQTT未连接',
- 'error_code': ERROR_CODES['MQTT_CONNECTION_ERROR']
- })
- return
-
- success, msg = mqtt_client.publish(topic, message)
- emit('mqtt_publish_response', {
- 'success': success,
- 'message': msg,
- 'error_code': ERROR_CODES['SUCCESS'] if success else ERROR_CODES['MQTT_PUBLISH_ERROR']
- })
-
- if success:
- logger.info(f"通过WebSocket发布MQTT消息成功: 主题={topic}, 消息={message[:50]}..." if len(message) > 50 else message)
- except Exception as e:
- error_msg = f"处理MQTT发布请求时出错: {str(e)}"
- logger.error(error_msg)
- emit('mqtt_publish_response', {
- 'success': False,
- 'message': error_msg,
- 'error_code': ERROR_CODES['UNKNOWN_ERROR']
- })
- @socketio.on('update_forward_config', namespace=SOCKETIO_NAMESPACE_CONTROL)
- def handle_update_forward_config(data):
- """通过WebSocket更新转发配置"""
- global forward_serial_to_mqtt, forward_mqtt_to_serial, mqtt_publish_topic
-
- try:
- # 更新转发标志
- if 'serial_to_mqtt' in data:
- forward_serial_to_mqtt = bool(data['serial_to_mqtt'])
- if 'mqtt_to_serial' in data:
- forward_mqtt_to_serial = bool(data['mqtt_to_serial'])
- if 'publish_topic' in data:
- new_topic = str(data['publish_topic'])
- if not new_topic.strip():
- raise ValueError("发布主题不能为空")
- mqtt_publish_topic = new_topic
-
- # 广播配置更新
- socketio.emit('forward_status', {
- 'serial_to_mqtt': forward_serial_to_mqtt,
- 'mqtt_to_serial': forward_mqtt_to_serial,
- 'publish_topic': mqtt_publish_topic
- }, namespace=SOCKETIO_NAMESPACE_STATUS)
-
- emit('update_forward_config_response', {
- 'success': True,
- 'message': '转发配置已更新',
- 'error_code': ERROR_CODES['SUCCESS']
- })
-
- logger.info(f"转发配置已更新 - 串口到MQTT: {forward_serial_to_mqtt}, MQTT到串口: {forward_mqtt_to_serial}, 发布主题: {mqtt_publish_topic}")
- except ValueError as e:
- error_msg = str(e)
- logger.warning(f"转发配置更新失败: {error_msg}")
- emit('update_forward_config_response', {
- 'success': False,
- 'message': error_msg,
- 'error_code': ERROR_CODES['CONFIG_ERROR']
- })
- except Exception as e:
- error_msg = f'更新转发配置失败: {str(e)}'
- logger.error(error_msg)
- emit('update_forward_config_response', {
- 'success': False,
- 'message': error_msg,
- 'error_code': ERROR_CODES['UNKNOWN_ERROR']
- })
- @socketio.on('clear_data_buffer', namespace=SOCKETIO_NAMESPACE_CONTROL)
- def handle_clear_data_buffer(data):
- """通过WebSocket清空数据缓冲区"""
- try:
- buffer_type = data.get('type', '')
-
- if buffer_type == 'serial':
- serial_data_buffer.clear()
- emit('clear_data_buffer_response', {
- 'success': True,
- 'message': '串口数据缓冲区已清空',
- 'error_code': ERROR_CODES['SUCCESS']
- })
- logger.info("串口数据缓冲区已清空")
- elif buffer_type == 'mqtt':
- mqtt_data_buffer.clear()
- emit('clear_data_buffer_response', {
- 'success': True,
- 'message': 'MQTT数据缓冲区已清空',
- 'error_code': ERROR_CODES['SUCCESS']
- })
- logger.info("MQTT数据缓冲区已清空")
- elif buffer_type == 'all':
- serial_data_buffer.clear()
- mqtt_data_buffer.clear()
- emit('clear_data_buffer_response', {
- 'success': True,
- 'message': '所有数据缓冲区已清空',
- 'error_code': ERROR_CODES['SUCCESS']
- })
- logger.info("所有数据缓冲区已清空")
- else:
- emit('clear_data_buffer_response', {
- 'success': False,
- 'message': '无效的缓冲区类型',
- 'error_code': ERROR_CODES['CONFIG_ERROR']
- })
- logger.warning(f"清空缓冲区失败: 无效的缓冲区类型 '{buffer_type}'")
- except Exception as e:
- error_msg = f'清空缓冲区失败: {str(e)}'
- logger.error(error_msg)
- emit('clear_data_buffer_response', {
- 'success': False,
- 'message': error_msg,
- 'error_code': ERROR_CODES['UNKNOWN_ERROR']
- })
- # 设置回调
- serial_client.set_data_callback(serial_data_handler)
- serial_client.set_status_callback(serial_status_handler)
- mqtt_client.set_data_callback(mqtt_data_handler)
- mqtt_client.set_status_callback(mqtt_status_handler)
- # API路由
- # 移除静态文件服务,前端由nginx提供服务
- # 根路径路由
- @app.route('/')
- def index():
- return send_from_directory(STATIC_FOLDER, 'index.html')
- # 404错误处理器 - 解决SPA路由刷新问题
- @app.errorhandler(404)
- def page_not_found(e):
- """处理所有404错误,对于非API路径返回index.html"""
- path = request.path
- # 检查是否是API请求
- if path.startswith('/api/'):
- # 对于API请求,返回404错误
- return jsonify({
- 'success': False,
- 'message': 'API endpoint not found'
- }), 404
- # 对于所有非API路径,返回index.html让前端路由处理
- return send_from_directory(STATIC_FOLDER, 'index.html'), 200
- @app.route('/api/serial/ports', methods=['GET'])
- def get_serial_ports():
- """获取可用串口列表"""
- ports = serial_client.list_ports()
- return jsonify({
- 'success': True,
- 'ports': ports
- })
- @app.route('/api/serial/connect', methods=['POST'])
- def serial_connect():
- """连接串口"""
- try:
- data = request.json
- port = data.get('port')
- baudrate = data.get('baudrate', 9600)
- bytesize = data.get('bytesize', 8)
- parity = data.get('parity', 'N')
- stopbits = data.get('stopbits', 1)
- timeout = data.get('timeout', 0.1)
-
- if not port:
- logger.warning("连接串口请求缺少串口名称")
- return jsonify({
- 'success': False,
- 'message': '串口名称不能为空',
- 'error_code': ERROR_CODES['CONFIG_ERROR']
- }), 400
-
- # 先断开之前的连接
- if serial_client.get_status():
- logger.info(f"断开现有串口连接: {serial_client.port}")
- serial_client.disconnect()
-
- # 连接新的串口
- logger.info(f"尝试连接串口: {port}, 波特率: {baudrate}")
- success, message = serial_client.connect(port, baudrate, bytesize, parity, stopbits, timeout)
-
- status_code = 200 if success else 400
- error_code = ERROR_CODES['SUCCESS'] if success else ERROR_CODES['SERIAL_CONNECTION_ERROR']
-
- response = {
- 'success': success,
- 'message': message,
- 'error_code': error_code
- }
-
- if success:
- logger.info(f"串口连接成功: {port}")
- else:
- logger.error(f"串口连接失败: {message}")
-
- return jsonify(response), status_code
- except Exception as e:
- error_msg = f'连接串口时出错: {str(e)}'
- logger.exception(error_msg) # 使用exception记录完整堆栈
- return jsonify({
- 'success': False,
- 'message': error_msg,
- 'error_code': ERROR_CODES['UNKNOWN_ERROR']
- }), 500
- @app.route('/api/serial/disconnect', methods=['POST'])
- def serial_disconnect():
- """断开串口连接"""
- success, message = serial_client.disconnect()
- return jsonify({
- 'success': success,
- 'message': message
- })
- @app.route('/api/serial/status', methods=['GET'])
- def serial_get_status():
- """获取串口状态"""
- return jsonify({
- 'connected': serial_client.get_status()
- })
- @app.route('/api/serial/send', methods=['POST'])
- def serial_send():
- """发送数据到串口"""
- data = request.json
- message = data.get('message')
-
- if not message:
- return jsonify({
- 'success': False,
- 'message': '消息内容不能为空'
- })
-
- success, message = serial_client.send_data(message)
- return jsonify({
- 'success': success,
- 'message': message
- })
- @app.route('/api/mqtt/connect', methods=['POST'])
- def mqtt_connect():
- """连接MQTT服务器"""
- try:
- data = request.json
- host = data.get('broker') or data.get('host', 'localhost')
- port = data.get('port', 1883)
- client_id = data.get('client_id', f'serial_gateway_{int(time.time())}')
- username = data.get('username')
- password = data.get('password')
- keepalive = data.get('keepalive', 60)
-
- # 先断开之前的连接
- if mqtt_client.get_status():
- logger.info(f"断开现有MQTT连接: {mqtt_client.host}:{mqtt_client.port}")
- mqtt_client.disconnect()
-
- # 连接新的MQTT服务器
- logger.info(f"尝试连接MQTT服务器: {host}:{port}, 客户端ID: {client_id}")
- success, message = mqtt_client.connect(
- host=host,
- port=port,
- client_id=client_id,
- username=username,
- password=password,
- keepalive=keepalive
- )
-
- # 如果连接成功,订阅主题
- if success and 'topics' in data:
- mqtt_client.subscribe(data['topics'])
-
- status_code = 200 if success else 400
- error_code = ERROR_CODES['SUCCESS'] if success else ERROR_CODES['MQTT_CONNECTION_ERROR']
-
- response = {
- 'success': success,
- 'message': message,
- 'error_code': error_code
- }
-
- if success:
- logger.info(f"MQTT服务器连接成功: {host}:{port}")
- else:
- logger.error(f"MQTT服务器连接失败: {message}")
-
- return jsonify(response), status_code
- except Exception as e:
- error_msg = f'连接MQTT服务器时出错: {str(e)}'
- logger.exception(error_msg)
- return jsonify({
- 'success': False,
- 'message': error_msg,
- 'error_code': ERROR_CODES['UNKNOWN_ERROR']
- }), 500
- @app.route('/api/mqtt/disconnect', methods=['POST'])
- def mqtt_disconnect():
- """断开MQTT连接"""
- success, message = mqtt_client.disconnect()
- return jsonify({
- 'success': success,
- 'message': message
- })
- @app.route('/api/mqtt/status', methods=['GET'])
- def mqtt_get_status():
- """获取MQTT状态"""
- return jsonify({
- 'connected': mqtt_client.get_status()
- })
- @app.route('/api/mqtt/publish', methods=['POST'])
- def mqtt_publish():
- """发布MQTT消息"""
- data = request.json
- topic = data.get('topic')
- message = data.get('message')
-
- if not topic or not message:
- return jsonify({
- 'success': False,
- 'message': '主题和消息内容不能为空'
- })
-
- success, message = mqtt_client.publish(topic, message)
- return jsonify({
- 'success': success,
- 'message': message
- })
- @app.route('/api/mqtt/subscribe', methods=['POST'])
- def mqtt_subscribe():
- """订阅MQTT主题"""
- data = request.json
- topics = data.get('topics', [])
-
- if not topics:
- return jsonify({
- 'success': False,
- 'message': '请至少订阅一个主题'
- })
-
- success, message = mqtt_client.subscribe(topics)
- return jsonify({
- 'success': success,
- 'message': message
- })
- @app.route('/api/data/serial', methods=['GET'])
- def get_serial_data():
- """获取串口数据"""
- return jsonify({
- 'data': serial_data_buffer
- })
- @app.route('/api/data/mqtt', methods=['GET'])
- def get_mqtt_data():
- """获取MQTT数据"""
- return jsonify({
- 'data': mqtt_data_buffer
- })
- @app.route('/api/forward/config', methods=['POST'])
- def set_forward_config():
- """设置转发配置"""
- global forward_serial_to_mqtt, forward_mqtt_to_serial, mqtt_publish_topic
-
- data = request.json
- forward_serial_to_mqtt = data.get('serial_to_mqtt', False)
- forward_mqtt_to_serial = data.get('mqtt_to_serial', False)
- if 'publish_topic' in data:
- mqtt_publish_topic = data['publish_topic']
-
- return jsonify({
- 'success': True,
- 'message': '转发配置已更新',
- 'config': {
- 'serial_to_mqtt': forward_serial_to_mqtt,
- 'mqtt_to_serial': forward_mqtt_to_serial,
- 'publish_topic': mqtt_publish_topic
- }
- })
- @app.route('/api/forward/status', methods=['GET'])
- def get_forward_status():
- """获取转发状态"""
- return jsonify({
- 'serial_to_mqtt': forward_serial_to_mqtt,
- 'mqtt_to_serial': forward_mqtt_to_serial,
- 'publish_topic': mqtt_publish_topic
- })
- # 健康检查端点
- @app.route('/api/health', methods=['GET'])
- def health_check():
- """健康检查端点"""
- try:
- # 获取客户端连接数量
- client_counts = {
- 'data': len(connected_clients['data']),
- 'status': len(connected_clients['status']),
- 'control': len(connected_clients['control'])
- }
-
- # 获取缓冲区大小
- buffer_sizes = {
- 'serial': len(serial_data_buffer),
- 'mqtt': len(mqtt_data_buffer)
- }
-
- # 执行系统负载检查
- # 注意:这只是一个简化的负载检查,实际应用中可能需要更复杂的监控
- is_healthy = True
- load_warnings = []
-
- # 检查缓冲区是否过大
- if buffer_sizes['serial'] > MAX_BUFFER_SIZE * 0.8:
- is_healthy = False
- load_warnings.append(f"串口缓冲区接近最大容量: {buffer_sizes['serial']}/{MAX_BUFFER_SIZE}")
-
- if buffer_sizes['mqtt'] > MAX_BUFFER_SIZE * 0.8:
- is_healthy = False
- load_warnings.append(f"MQTT缓冲区接近最大容量: {buffer_sizes['mqtt']}/{MAX_BUFFER_SIZE}")
-
- # 检查WebSocket连接数是否过多
- total_clients = sum(client_counts.values())
- if total_clients > 100: # 设置合理的阈值
- is_healthy = False
- load_warnings.append(f"WebSocket连接数过多: {total_clients}")
-
- # 尝试获取网络状态信息(使用try-except包装,防止网络模块出错导致健康检查失败)
- network_info = None
- try:
- network_info = network_manager.get_network_status()
- except Exception as e:
- logger.warning(f"获取网络状态时出错: {str(e)}")
- # 不影响整体健康检查,只添加警告
- load_warnings.append(f"网络状态获取失败: {str(e)}")
-
- response = {
- 'status': 'healthy' if is_healthy else 'warning',
- 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
- 'services': {
- 'serial': serial_status,
- 'mqtt': mqtt_status,
- 'websocket': True
- },
- 'client_counts': client_counts,
- 'buffer_sizes': buffer_sizes,
- 'warnings': load_warnings
- }
-
- # 仅当获取到网络信息时添加
- if network_info:
- response['network'] = network_info
-
- return jsonify(response), 200
- except Exception as e:
- # 捕获所有异常,确保健康检查不会返回500错误
- logger.error(f"健康检查端点出错: {str(e)}")
- # 返回一个基础的健康状态,至少显示服务在运行
- return jsonify({
- 'status': 'error',
- 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
- 'error': str(e),
- 'services': {
- 'api': True, # API服务本身是运行的
- 'serial': None,
- 'mqtt': None,
- 'websocket': None
- }
- }), 200 # 仍然返回200,避免健康检查导致的连锁反应
- # 网络配置相关API
- @app.route('/api/network/config', methods=['GET'])
- def get_network_config():
- """获取当前网络配置"""
- try:
- config = network_manager.get_network_config()
- return jsonify(config)
- except Exception as e:
- logger.error(f'获取网络配置失败: {str(e)}')
- return jsonify({'success': False, 'message': f'获取网络配置失败: {str(e)}'}), 500
- @app.route('/api/network/config', methods=['POST'])
- def update_network_config():
- """更新网络配置"""
- try:
- config_data = request.json
- result = network_manager.update_network_config(config_data)
- if result['success']:
- return jsonify(result)
- else:
- return jsonify(result), 400
- except Exception as e:
- logger.error(f'更新网络配置失败: {str(e)}')
- return jsonify({'success': False, 'message': f'更新网络配置失败: {str(e)}'}), 500
- @app.route('/api/network/status', methods=['GET'])
- def get_network_status():
- """获取网络状态信息"""
- try:
- status = network_manager.get_network_status()
- return jsonify(status)
- except Exception as e:
- logger.error(f'获取网络状态失败: {str(e)}')
- return jsonify({'success': False, 'message': f'获取网络状态失败: {str(e)}'}), 500
- @app.route('/api/network/restart', methods=['POST'])
- def restart_network_service():
- """重启网络服务以应用新配置"""
- try:
- result = network_manager.restart_network_service()
- if result['success']:
- return jsonify(result)
- else:
- return jsonify(result), 500
- except Exception as e:
- logger.error(f'重启网络服务失败: {str(e)}')
- return jsonify({'success': False, 'message': f'重启网络服务失败: {str(e)}'}), 500
- # 不再需要静态文件目录,前端由nginx提供服务
- if __name__ == '__main__':
- try:
- # 启动前的初始化工作
- logger.info('启动串口-MQTT网关服务...')
- logger.info(f"配置信息: 主机={FLASK_HOST}, 端口={FLASK_PORT}, 调试模式={FLASK_DEBUG}")
-
- # 启动服务
- socketio.run(
- app,
- host=FLASK_HOST,
- port=FLASK_PORT,
- debug=FLASK_DEBUG,
- use_reloader=False, # 禁用重载器以避免重复初始化问题
- log_output=False # 禁用Flask的日志输出,使用我们自己的日志配置
- )
- except KeyboardInterrupt:
- # 优雅退出
- logger.info('正在关闭应用...')
- try:
- if serial_client.get_status():
- serial_client.disconnect()
- logger.info('串口连接已断开')
- if mqtt_client.get_status():
- mqtt_client.disconnect()
- logger.info('MQTT连接已断开')
- except Exception as e:
- logger.error(f'关闭连接时出错: {str(e)}')
-
- # 清理WebSocket连接
- for client_type in connected_clients:
- connected_clients[client_type].clear()
-
- logger.info('应用已安全关闭')
- except Exception as e:
- logger.exception(f'应用启动失败') # 使用exception记录完整堆栈
- # 确保资源被释放
- try:
- serial_client.disconnect()
- mqtt_client.disconnect()
- except:
- pass
|