| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762
77727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839 |
- from flask import Flask, jsonify, request, abort
- from flask_cors import CORS
- from flask_socketio import SocketIO, emit
- import threading
- import time
- import json
- import os
- import logging
- import uuid
# Path of the JSON file that persists the last-used serial port settings;
# it sits in the same directory as this module.
SERIAL_CONFIG_FILE = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    'serial_config.json',
)
- # 导入配置
- from config import (
- MAX_BUFFER_SIZE,
- FLASK_SECRET_KEY,
- FLASK_DEBUG,
- FLASK_HOST,
- FLASK_PORT,
- LOG_LEVEL,
- LOG_FORMAT,
- LOG_FILE,
- SOCKETIO_ASYNC_MODE,
- SOCKETIO_ALLOWED_ORIGINS,
- DEFAULT_FORWARD_SERIAL_TO_MQTT,
- DEFAULT_FORWARD_MQTT_TO_SERIAL,
- DEFAULT_MQTT_PUBLISH_TOPIC,
- SOCKETIO_NAMESPACE_DATA,
- SOCKETIO_NAMESPACE_STATUS,
- SOCKETIO_NAMESPACE_CONTROL,
- ERROR_CODES,
- ERROR_MESSAGES,
- DEFAULT_CUSTOMER_ID,
- DEFAULT_DTU_ID,
- DTU_HEARTBEAT_INTERVAL,
- DEFAULT_MQTT_TOPIC_PREFIX,
- DTU_REGISTRATION_TOPIC,
- DTU_STATUS_TOPIC,
- DTU_CONTROL_TOPIC,
- DTU_RESPONSE_TOPIC,
- DTU_EVENT_TOPIC,
- DTU_ALARM_TOPIC,
- DTU_BROADCAST_TOPIC,
- MAX_PANELS,
- MAX_PORTS_PER_PANEL
- )
# Logging setup: level and format come from config; a log file is only
# passed to basicConfig when LOG_FILE is set.
logging_config = dict(level=getattr(logging, LOG_LEVEL), format=LOG_FORMAT)
if LOG_FILE:
    logging_config.update(filename=LOG_FILE)
logging.basicConfig(**logging_config)
logger = logging.getLogger('serial_mqtt_gateway')
- from modules.serial_port import SerialPort
- from modules.mqtt_client import MQTTClient
- from modules.network_config import network_manager
- from modules.modbus_rtu import ModbusRTUClient, ANTENNA_ADDRESSES, AddressConfigProtocol, build_broadcast_query, build_confirm_address, build_assign_address
app = Flask(__name__)
app.config['SECRET_KEY'] = FLASK_SECRET_KEY

# Enable CORS so the front end served through the nginx proxy can reach
# both the REST API and the Socket.IO endpoint.
CORS(app, resources={
    route: {"origins": "*"} for route in (r"/api/*", r"/socket.io/*")
})

# Initialise Socket.IO
socketio = SocketIO(
    app,
    cors_allowed_origins=SOCKETIO_ALLOWED_ORIGINS,
    async_mode=SOCKETIO_ASYNC_MODE,
    manage_session=False,  # session management disabled for performance
    ping_timeout=30,  # heartbeat timeout
    ping_interval=25,  # heartbeat interval
    logger=FLASK_DEBUG,  # Socket.IO logging follows the Flask debug flag
    engineio_logger=FLASK_DEBUG,
)

# Serial port and MQTT clients, plus the Modbus helpers layered on the serial port
serial_client = SerialPort()
mqtt_client = MQTTClient()
modbus_client = ModbusRTUClient(serial_client)
address_config = AddressConfigProtocol(serial_client)

# Forwarding switches and the topic used when forwarding serial data to MQTT
forward_serial_to_mqtt = DEFAULT_FORWARD_SERIAL_TO_MQTT
forward_mqtt_to_serial = DEFAULT_FORWARD_MQTT_TO_SERIAL
mqtt_publish_topic = DEFAULT_MQTT_PUBLISH_TOPIC

# DTU MQTT protocol settings (may be changed at runtime)
dtu_config = dict(
    topic_prefix=DEFAULT_MQTT_TOPIC_PREFIX,
    customer_id=DEFAULT_CUSTOMER_ID,
    dtu_id=DEFAULT_DTU_ID,
    firmware_version='v1.0.0',
    hardware_version='v1.0',
    heartbeat_interval=DTU_HEARTBEAT_INTERVAL,
    enabled=True,
)

# JSON file (next to this module) in which dtu_config is persisted
DTU_CONFIG_FILE = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    'dtu_config.json',
)
def load_dtu_config():
    """Merge the settings stored in DTU_CONFIG_FILE into ``dtu_config``.

    Does nothing when the file does not exist. Any failure while reading or
    parsing the file is logged as a warning instead of being raised.
    """
    # dtu_config is mutated in place, so no ``global`` declaration is needed.
    try:
        if not os.path.exists(DTU_CONFIG_FILE):
            return
        with open(DTU_CONFIG_FILE, 'r') as fh:
            persisted = json.load(fh)
        dtu_config.update(persisted)
    except Exception as exc:
        logger.warning(f"加载DTU配置失败: {exc}")
def save_dtu_config():
    """Write the current ``dtu_config`` to DTU_CONFIG_FILE as indented JSON.

    Failures are logged as a warning and otherwise ignored.
    """
    try:
        with open(DTU_CONFIG_FILE, 'w') as out:
            json.dump(dtu_config, out, indent=2)
    except Exception as exc:
        logger.warning(f"保存DTU配置失败: {exc}")
# Per-port tracking used for event detection:
# {panel_id: {port_id: {'last_uid': str, 'expected_uid': str, 'alarm_count': int}}}
port_state: dict = {}

# Panel configuration (loaded from the address-configuration module):
# {panel_id: {'address': int, 'position': int, 'panel_uid': str}}
panel_config: dict = {}

# In-memory buffers of recent serial / MQTT traffic
serial_data_buffer: list = []
mqtt_data_buffer: list = []

# Connection status flags, updated by the status handlers
serial_status = False
mqtt_status = False
# Helpers that persist and restore the serial port settings
def save_serial_config(port, baudrate=9600, timeout=1.0, bytesize=8, parity='N', stopbits=1):
    """Persist the given serial port settings to SERIAL_CONFIG_FILE as JSON.

    Success is logged at info level; any failure is logged as an error and
    not raised.
    """
    settings = dict(
        port=port,
        baudrate=baudrate,
        timeout=timeout,
        bytesize=bytesize,
        parity=parity,
        stopbits=stopbits,
    )
    try:
        with open(SERIAL_CONFIG_FILE, 'w') as out:
            json.dump(settings, out)
        logger.info(f"串口配置已保存: {port} @ {baudrate}")
    except Exception as exc:
        logger.error(f"保存串口配置失败: {exc}")
def load_serial_config():
    """Return the serial settings stored in SERIAL_CONFIG_FILE, or None.

    None is returned when the file does not exist or cannot be read/parsed
    (the latter is logged as an error).
    """
    if not os.path.exists(SERIAL_CONFIG_FILE):
        return None
    try:
        with open(SERIAL_CONFIG_FILE, 'r') as fh:
            stored = json.load(fh)
        logger.info(f"已加载串口配置: {stored.get('port')} @ {stored.get('baudrate')}")
        return stored
    except Exception as exc:
        logger.error(f"加载串口配置失败: {exc}")
    return None
def auto_connect_serial():
    """Reconnect to the serial port recorded by the last saved configuration.

    Returns True when the connection succeeds; False when no usable saved
    configuration exists or the connection attempt fails.
    """
    config = load_serial_config()
    if not config or not config.get('port'):
        return False

    port = config.get('port')
    # Missing keys fall back to the same defaults used by save_serial_config.
    fallbacks = (
        ('baudrate', 9600),
        ('timeout', 1.0),
        ('bytesize', 8),
        ('parity', 'N'),
        ('stopbits', 1),
    )
    options = {key: config.get(key, default) for key, default in fallbacks}

    logger.info(f"尝试自动连接串口: {port} @ {options['baudrate']}")
    success, message = serial_client.connect(port, **options)
    if not success:
        logger.warning(f"自动连接串口失败: {message}")
        return False

    logger.info(f"自动连接串口成功: {port}")
    return True
# Connected WebSocket clients, one set per category
connected_clients = {
    category: set() for category in ('data', 'status', 'control')
}
# Callback functions
def serial_data_handler(data):
    """Handle data received from the serial port.

    The data is appended to ``serial_data_buffer`` (bounded by
    MAX_BUFFER_SIZE), broadcast as a 'serial_data' Socket.IO event and, when
    serial-to-MQTT forwarding is on and MQTT reports a status, published to
    ``mqtt_publish_topic``. Errors are logged, never raised.
    """
    try:
        stamp = time.strftime('%Y-%m-%d %H:%M:%S')
        record = {'timestamp': stamp, 'data': data, 'direction': 'in'}

        serial_data_buffer.append(record)
        if len(serial_data_buffer) > MAX_BUFFER_SIZE:
            del serial_data_buffer[0]  # drop the oldest entry

        # Emit a separate copy so the buffered record is never shared.
        socketio.emit('serial_data', dict(record), namespace=SOCKETIO_NAMESPACE_DATA)

        if not (forward_serial_to_mqtt and mqtt_client.get_status()):
            return
        ok, detail = mqtt_client.publish(mqtt_publish_topic, data)
        if not ok:
            logger.warning(f"串口数据转发到MQTT失败: {detail}")
    except Exception as exc:
        logger.error(f"处理串口数据时出错: {str(exc)}")
def serial_send_handler(data):
    """Handle data that was sent out through the serial port.

    The data is appended to ``serial_data_buffer`` with direction 'out'
    (bounded by MAX_BUFFER_SIZE) and broadcast as a 'serial_data' Socket.IO
    event. Errors are logged, never raised.
    """
    try:
        stamp = time.strftime('%Y-%m-%d %H:%M:%S')
        record = {'timestamp': stamp, 'data': data, 'direction': 'out'}

        serial_data_buffer.append(record)
        if len(serial_data_buffer) > MAX_BUFFER_SIZE:
            del serial_data_buffer[0]  # drop the oldest entry

        # Emit a separate copy so the buffered record is never shared.
        socketio.emit('serial_data', dict(record), namespace=SOCKETIO_NAMESPACE_DATA)
    except Exception as exc:
        logger.error(f"处理串口发送数据时出错: {str(exc)}")
def serial_status_handler(status):
    """Record a serial connection status change and broadcast it.

    Updates the module-level ``serial_status`` flag and emits a
    'serial_status' Socket.IO event. Errors are logged, never raised.
    """
    global serial_status
    try:
        serial_status = status

        # Broadcast the new status over WebSocket
        socketio.emit(
            'serial_status',
            {'connected': status},
            namespace=SOCKETIO_NAMESPACE_STATUS,
        )

        state_text = '已连接' if status else '已断开'
        logger.info(f"串口状态更新: {state_text}")
    except Exception as exc:
        logger.error(f"处理串口状态时出错: {str(exc)}")
def mqtt_data_handler(data):
    """Handle a message received from MQTT.

    ``data`` must provide 'topic' and 'payload'. The message is appended to
    ``mqtt_data_buffer`` (bounded by MAX_BUFFER_SIZE), broadcast as an
    'mqtt_data' Socket.IO event and, when MQTT-to-serial forwarding is on and
    the serial client reports a status, written to the serial port. Errors
    are logged, never raised.
    """
    try:
        stamp = time.strftime('%Y-%m-%d %H:%M:%S')
        record = {
            'timestamp': stamp,
            'topic': data['topic'],
            'payload': data['payload'],
        }

        # Buffer the message, keeping the buffer within its size limit
        mqtt_data_buffer.append(record)
        if len(mqtt_data_buffer) > MAX_BUFFER_SIZE:
            del mqtt_data_buffer[0]

        # Broadcast over WebSocket (separate copy of the record)
        socketio.emit('mqtt_data', dict(record), namespace=SOCKETIO_NAMESPACE_DATA)

        # Forward to the serial port when enabled and connected
        if not (forward_mqtt_to_serial and serial_client.get_status()):
            return
        ok, detail = serial_client.send_data(data['payload'])
        if not ok:
            logger.warning(f"MQTT数据转发到串口失败: {detail}")
    except Exception as exc:
        logger.error(f"处理MQTT数据时出错: {str(exc)}")
def mqtt_status_handler(status):
    """Record an MQTT connection status change and broadcast it.

    Updates the module-level ``mqtt_status`` flag and emits an 'mqtt_status'
    Socket.IO event. When the connection came up and the DTU protocol is
    enabled, it additionally waits one second, sends the DTU registration
    message and subscribes to the DTU control topic and the two broadcast
    topics (discover / config). Errors are logged, never raised.
    """
    global mqtt_status
    try:
        mqtt_status = status

        # Broadcast the new status over WebSocket
        socketio.emit(
            'mqtt_status',
            {'connected': status},
            namespace=SOCKETIO_NAMESPACE_STATUS,
        )
        state_text = '已连接' if status else '已断开'
        logger.info(f"MQTT状态更新: {state_text}")

        if not (status and dtu_config.get('enabled')):
            return

        socketio.sleep(1)  # give the connection a moment to settle
        dtu_register()

        # Control topic of this DTU
        control_topic = build_dtu_topic(
            dtu_config['customer_id'], 'dtu', dtu_config['dtu_id'], 'control'
        )
        mqtt_client.subscribe(control_topic)
        logger.info(f"已订阅控制主题: {control_topic}")

        # Broadcast topics: DTU discovery (7.14) and bulk configuration (7.15)
        discover_topic = build_dtu_topic('broadcast', 'dtu', 'discover')
        config_topic = build_dtu_topic('broadcast', 'dtu', 'config')
        for broadcast_topic in (discover_topic, config_topic):
            mqtt_client.subscribe(broadcast_topic)
        logger.info(f"已订阅广播主题: {discover_topic}, {config_topic}")
    except Exception as exc:
        logger.error(f"处理MQTT状态时出错: {str(exc)}")
# ========== DTU MQTT protocol helpers ==========
def build_dtu_topic(*parts):
    """Build a DTU MQTT topic: the configured prefix and ``parts`` joined by '/'.

    Args:
        *parts: Topic segments appended after ``dtu_config['topic_prefix']``.
            Non-string segments (for example a numeric ``dtu_id`` loaded from
            JSON) are converted with ``str()``.

    Returns:
        The topic string.

    Raises:
        TypeError: if the prefix or any segment is None. A None segment
            almost certainly indicates a missing value, so it is rejected
            rather than silently rendered as the text "None".
    """
    prefix = dtu_config.get('topic_prefix', '线架系统')
    segments = (prefix, *parts)
    if any(segment is None for segment in segments):
        raise TypeError(f"MQTT topic segment must not be None: {segments!r}")
    return '/'.join(str(segment) for segment in segments)
def dtu_register():
    """Publish the DTU REGISTER message.

    The message lists one panel entry per device returned by
    ``address_config.get_stored_devices()``; positions are numbered from 1 in
    iteration order. Returns the publish success flag, or False when MQTT is
    not connected or an error occurs (errors are logged).
    """
    if not mqtt_client.get_status():
        logger.warning("MQTT未连接,无法发送注册消息")
        return False

    try:
        # Build the panel list from the stored address assignments
        devices = address_config.get_stored_devices()
        panels = [
            {
                'panel_id': f"PANEL_{dtu_config['dtu_id']}_{addr}",
                'address': addr,
                'position': position,
            }
            for position, addr in enumerate(devices.values(), start=1)
        ]

        details = {
            'firmware_version': dtu_config.get('firmware_version', 'v1.0.0'),
            'hardware_version': dtu_config.get('hardware_version', 'v1.0'),
            'panel_count': len(panels),
            'network_type': 'ethernet',
            'signal_strength': None,
        }
        message = {
            'msg_id': f"reg_{int(time.time() * 1000)}",
            'timestamp': int(time.time() * 1000),
            'dtu_id': dtu_config['dtu_id'],
            'type': 'REGISTER',
            'payload': details,
        }
        # Simplified: 'uptime' carries the current epoch time in ms,
        # not a real uptime.
        details['uptime'] = int(time.time() * 1000)
        details['panels'] = panels

        topic = build_dtu_topic(
            dtu_config['customer_id'], 'dtu', dtu_config['dtu_id'], 'register'
        )
        success, msg = mqtt_client.publish(topic, json.dumps(message))
        if not success:
            logger.error(f"DTU注册消息发送失败: {msg}")
        else:
            logger.info(f"DTU注册消息已发送: {topic}")
        return success
    except Exception as exc:
        logger.error(f"发送DTU注册消息失败: {str(exc)}")
        return False
def handle_broadcast_discover(payload):
    """Handle a broadcast discovery message (7.14).

    When ``payload`` is a dict whose 'type' is 'DISCOVER', the DTU answers by
    sending its REGISTER message, waiting half a second, then sending a
    STATUS message. Anything else is only logged. Errors are logged, never
    raised.
    """
    try:
        logger.info(f"收到广播发现消息: {payload}")

        # Only a dict with type == 'DISCOVER' is a valid discovery request
        if not isinstance(payload, dict):
            return
        if payload.get('type', '') != 'DISCOVER':
            return

        # Answer immediately with the registration message
        logger.info("收到DISCOVER广播,响应注册消息")
        dtu_register()

        # Follow up with one status message
        socketio.sleep(0.5)
        dtu_publish_status()
    except Exception as exc:
        logger.error(f"处理广播发现消息失败: {str(exc)}")
def handle_broadcast_config(payload):
    """Handle a broadcast bulk-configuration message (7.15).

    When ``payload`` is a dict whose 'type' is 'CONFIG', a CONFIG_ACK message
    echoing the received settings is published on this DTU's status topic.
    Other payloads are only logged. Errors are logged, never raised.

    Note: the received settings are currently only logged and acknowledged;
    no setting is actually applied here.
    """
    try:
        logger.info(f"收到广播配置消息: {payload}")
        if not isinstance(payload, dict) or payload.get('type', '') != 'CONFIG':
            return

        config_data = payload.get('payload', {})
        # Placeholder for real handling (report interval, alarm thresholds,
        # network settings, ...): nothing is applied yet.
        logger.info(f"应用批量配置: {config_data}")

        # Acknowledge the configuration on the DTU status topic
        response_topic = build_dtu_topic(
            dtu_config['customer_id'], 'dtu', dtu_config['dtu_id'], 'status'
        )
        now_ms = int(time.time() * 1000)
        response_payload = {
            'msg_id': f"cfg_ack_{now_ms}",
            'timestamp': now_ms,
            'dtu_id': dtu_config['dtu_id'],
            'type': 'CONFIG_ACK',
            'payload': {
                'config_received': True,
                'applied_settings': config_data
            }
        }
        # publish() returns (success, message); only claim the ack was sent
        # when it really was.
        success, msg = mqtt_client.publish(response_topic, json.dumps(response_payload))
        if success:
            logger.info("配置确认响应已发送")
        else:
            logger.error(f"配置确认响应发送失败: {msg}")
    except Exception as e:
        logger.error(f"处理广播配置消息失败: {str(e)}")
def dtu_publish_status(force=False):
    """Publish the DTU STATUS (heartbeat) message.

    Args:
        force: Accepted for API compatibility; currently unused.

    Returns:
        The publish success flag, or False when MQTT is not connected, the
        DTU protocol is disabled, or an error occurs (errors are logged).
    """
    if not mqtt_client.get_status() or not dtu_config.get('enabled'):
        return False
    try:
        # Panel online/offline counting is not implemented yet: both stay 0.
        panel_online = 0
        panel_offline = 0

        # serial_client.get_status() is used as a plain truthy flag elsewhere
        # in this module, so accept both shapes: a dict carrying 'connected'
        # or a bare boolean. (Previously a boolean always yielded 'error'.)
        serial_st = serial_client.get_status()
        if isinstance(serial_st, dict):
            serial_connected = bool(serial_st.get('connected', False))
        else:
            serial_connected = bool(serial_st)
        rs485_status = 'normal' if serial_connected else 'error'

        now_ms = int(time.time() * 1000)
        payload = {
            'msg_id': f"hbt_{now_ms}",
            'timestamp': now_ms,
            'dtu_id': dtu_config['dtu_id'],
            'type': 'STATUS',
            'payload': {
                'cpu_usage': None,  # not collected yet
                'memory_usage': None,
                'temperature': None,
                'network_status': 'online',
                'panel_online': panel_online,
                'panel_offline': panel_offline,
                'screen_connected': False,
                'mqtt_connected': mqtt_client.get_status(),
                'rs485_status': rs485_status
            }
        }
        topic = build_dtu_topic(dtu_config['customer_id'], 'dtu', dtu_config['dtu_id'], 'status')
        success, _ = mqtt_client.publish(topic, json.dumps(payload))
        return success
    except Exception as e:
        logger.error(f"发送DTU状态消息失败: {str(e)}")
        return False
def dtu_publish_event(panel_id, port_id, event_type, jumper_uid, previous_jumper_uid=None):
    """Publish a port EVENT message for ``panel_id``/``port_id``.

    On a successful publish the event is also appended to
    ``port_event_history``, which is capped at the 1000 most recent entries.
    Returns the publish success flag, or False when MQTT is not connected,
    the DTU protocol is disabled, or an error occurs (errors are logged).
    """
    if not (mqtt_client.get_status() and dtu_config.get('enabled')):
        return False

    try:
        message = {
            'msg_id': f"evt_{int(time.time() * 1000)}",
            'timestamp': int(time.time() * 1000),
            'dtu_id': dtu_config['dtu_id'],
            'type': 'EVENT',
            'payload': {
                'panel_id': panel_id,
                'port_id': port_id,
                'event_type': event_type,
                'event_id': f"evt_{uuid.uuid4().hex[:8]}",
                'jumper_uid': jumper_uid,
                'previous_jumper_uid': previous_jumper_uid,
            },
        }
        topic = build_dtu_topic(
            dtu_config['customer_id'], 'patchpanel', dtu_config['dtu_id'], panel_id, 'event'
        )
        success, _ = mqtt_client.publish(topic, json.dumps(message))
        if not success:
            return success

        logger.info(f"端口事件已发送: {panel_id}:{port_id} - {event_type}")
        # NOTE(review): nesting reconstructed from a flattened source; this
        # assumes history is recorded only after a successful publish. Confirm.
        port_event_history.append({
            'timestamp': int(time.time() * 1000),
            'panel_id': panel_id,
            'port_id': port_id,
            'event_type': event_type,
            'jumper_uid': jumper_uid,
            'previous_jumper_uid': previous_jumper_uid,
        })
        # Keep only the newest 1000 history entries (trimmed in place)
        overflow = len(port_event_history) - 1000
        if overflow > 0:
            del port_event_history[:overflow]
        return success
    except Exception as exc:
        logger.error(f"发送端口事件失败: {str(exc)}")
        return False
def dtu_publish_alarm(panel_id, port_id, alarm_type, expected_jumper_uid, actual_jumper_uid):
    """Publish an ALARM message (severity WARNING) for an unexpected jumper.

    On a successful publish the alarm is also appended to
    ``port_event_history``, which is capped at the 1000 most recent entries.
    Returns the publish success flag, or False when MQTT is not connected,
    the DTU protocol is disabled, or an error occurs (errors are logged).
    """
    if not (mqtt_client.get_status() and dtu_config.get('enabled')):
        return False

    try:
        # A falsy actual UID is rendered as "not read" in the description
        actual_text = actual_jumper_uid if actual_jumper_uid else '未读到'
        description = f"端口{port_id}期望跳线{expected_jumper_uid},实际{actual_text}"

        message = {
            'msg_id': f"alm_{int(time.time() * 1000)}",
            'timestamp': int(time.time() * 1000),
            'dtu_id': dtu_config['dtu_id'],
            'type': 'ALARM',
            'payload': {
                'panel_id': panel_id,
                'port_id': port_id,
                'alarm_type': alarm_type,
                'severity': 'WARNING',
                'expected_jumper_uid': expected_jumper_uid,
                'actual_jumper_uid': actual_jumper_uid,
                'description': description,
            },
        }
        topic = build_dtu_topic(
            dtu_config['customer_id'], 'patchpanel', dtu_config['dtu_id'], panel_id, 'alarm'
        )
        success, _ = mqtt_client.publish(topic, json.dumps(message))
        if not success:
            return success

        logger.warning(f"非法告警已发送: {panel_id}:{port_id} - {alarm_type}")
        # NOTE(review): nesting reconstructed from a flattened source; this
        # assumes history is recorded only after a successful publish. Confirm.
        port_event_history.append({
            'timestamp': int(time.time() * 1000),
            'panel_id': panel_id,
            'port_id': port_id,
            'event_type': 'ALARM',
            'alarm_type': alarm_type,
            'expected_jumper_uid': expected_jumper_uid,
            'actual_jumper_uid': actual_jumper_uid,
        })
        # Keep only the newest 1000 history entries (trimmed in place)
        overflow = len(port_event_history) - 1000
        if overflow > 0:
            del port_event_history[:overflow]
        return success
    except Exception as exc:
        logger.error(f"发送非法告警失败: {str(exc)}")
        return False
def _panel_modbus_address(target, default=1):
    """Return the Modbus address configured for panel ``target``, else ``default``."""
    try:
        cfg = panel_config.get(target)
    except TypeError:
        # An unhashable target (e.g. a list) can never be a panel_config key.
        return default
    return cfg.get('address', default) if cfg is not None else default


def _signed16(value):
    """Reinterpret an unsigned 16-bit register value as a signed integer."""
    return value - 0x10000 if value >= 0x8000 else value


def _set_expected_uid(panel_id, port_id, jumper_uid):
    """Store the jumper UID expected on ``panel_id``/``port_id`` in ``port_state``."""
    entry = port_state.setdefault(panel_id, {}).setdefault(
        port_id, {'last_uid': None, 'expected_uid': None, 'alarm_count': 0}
    )
    entry['expected_uid'] = jumper_uid


def _read_panel_registers(target, start, count):
    """Read holding registers of panel ``target``; return (address, raw result, registers)."""
    address = _panel_modbus_address(target)
    raw = modbus_client.read_holding_registers(address, start, count)
    return address, raw, raw.get('registers') or []


def _cmd_set_port_led(target, params):
    """SET_PORT_LED: drive the RGB LED of one port."""
    led_codes = {'OFF': 0, 'BLINK_RED': 1, 'BLINK_GREEN': 2, 'BLINK_BLUE': 3}
    color = led_codes.get(params.get('led_mode', 'OFF'), 0)  # unknown mode -> OFF
    result = modbus_client.set_rgb_led(
        _panel_modbus_address(target), params.get('port_id'), color
    )
    return 'error' not in result, result


def _cmd_query_dtu_status(target, params):
    """QUERY_DTU_STATUS: publish a STATUS message; the response keeps its defaults."""
    dtu_publish_status(force=True)
    return None


def _cmd_sync_port_mapping(target, params):
    """SYNC_PORT_MAPPING: set the expected jumper UID of a single port."""
    _set_expected_uid(target, params.get('port_id'), params.get('jumper_uid'))
    return None


def _cmd_sync_all_mapping(target, params):
    """SYNC_ALL_MAPPING: set the expected jumper UID for every listed mapping."""
    for mapping in params.get('mappings') or []:
        _set_expected_uid(
            mapping.get('panel_id'), mapping.get('port_id'), mapping.get('jumper_uid')
        )
    return None


def _cmd_reboot(target, params):
    """REBOOT: simulated; only acknowledges the command."""
    return True, {'message': 'Reboot command received'}


def _cmd_read_panel_status(target, params):
    """READ_PANEL_STATUS: read 10 registers starting at 0x0000.

    Register layout as used here: 0x0000 run status, 0x0001 LED control,
    0x0002-0x0009 antenna card status.
    """
    address, raw, registers = _read_panel_registers(target, 0x0000, 10)
    status = {
        'device_address': address,
        'run_status': registers[0] if len(registers) > 0 else None,
        'led_control': registers[1] if len(registers) > 1 else None,
        'antenna_status': registers[2:10] if len(registers) >= 10 else [],
        'raw_registers': registers
    }
    return 'error' not in raw, status


def _cmd_query_jumper_status(target, params):
    """QUERY_JUMPER_STATUS: read 3 registers starting at 0x0100.

    Register layout as used here: 0x0100 connection state, 0x0101 / 0x0102
    high / low part of the jumper ID.
    """
    address, raw, registers = _read_panel_registers(target, 0x0100, 3)
    complete = len(registers) > 2
    status = {
        'device_address': address,
        'connected': bool(registers[0]) if len(registers) > 0 else False,
        'jumper_id_high': registers[1] if len(registers) > 1 else 0,
        'jumper_id_low': registers[2] if complete else 0,
        'jumper_id': (registers[1] << 16 | registers[2]) if complete else 0,
        'raw_registers': registers
    }
    return 'error' not in raw, status


def _cmd_query_env_sensor(target, params):
    """QUERY_ENV_SENSOR: read 4 registers starting at 0x0200.

    Register layout as used here: temperature, humidity, light, smoke.
    Temperature and humidity are signed 16-bit values in hundredths, so the
    raw register is sign-converted before scaling (a reading below zero would
    otherwise show up as a value above 327.67).
    """
    address, raw, registers = _read_panel_registers(target, 0x0200, 4)
    reading = {
        'device_address': address,
        'temperature': _signed16(registers[0]) / 100.0 if len(registers) > 0 else None,
        'humidity': _signed16(registers[1]) / 100.0 if len(registers) > 1 else None,
        'light_level': registers[2] if len(registers) > 2 else None,
        'smoke_detected': bool(registers[3]) if len(registers) > 3 else False,
        'raw_registers': registers
    }
    return 'error' not in raw, reading


def _cmd_ota_upgrade(target, params):
    """OTA_UPGRADE: start the OTA worker in a daemon thread.

    Without a 'firmware_url' nothing can be downloaded, so the command is
    reported as failed instead of being acknowledged as successful.
    """
    if not params.get('firmware_url'):
        return False, {'error': 'firmware_url is required'}
    # _run_ota is looked up when the thread runs, as in the original lambda.
    threading.Thread(target=lambda: _run_ota(params), daemon=True).start()
    return True, {
        'firmware_version': params.get('firmware_version'),
        'ota_status': 'DOWNLOADING',
        'restart_required': False
    }


def _cmd_ota_cancel(target, params):
    """OTA_CANCEL: write 0x0000 (cancel) to the OTA control register 0x0300."""
    raw = modbus_client.write_single_register(
        _panel_modbus_address(target), 0x0300, 0x0000
    )
    cancelled = 'error' not in raw
    return cancelled, {
        'message': 'OTA upgrade cancelled' if cancelled else 'Failed to cancel OTA',
        'raw_result': raw
    }


# Command name -> handler(target, params). A handler returns (success, result),
# or None to leave the response at its defaults (success=True, result={}).
_CONTROL_HANDLERS = {
    'SET_PORT_LED': _cmd_set_port_led,
    'QUERY_DTU_STATUS': _cmd_query_dtu_status,
    'SYNC_PORT_MAPPING': _cmd_sync_port_mapping,
    'SYNC_ALL_MAPPING': _cmd_sync_all_mapping,
    'REBOOT': _cmd_reboot,
    'READ_PANEL_STATUS': _cmd_read_panel_status,
    'QUERY_JUMPER_STATUS': _cmd_query_jumper_status,
    'QUERY_ENV_SENSOR': _cmd_query_env_sensor,
    'OTA_UPGRADE': _cmd_ota_upgrade,
    'OTA_CANCEL': _cmd_ota_cancel,
}


def dtu_handle_control(topic, payload):
    """Execute a downlink control command and publish a RESPONSE message.

    Args:
        topic: Topic the command arrived on (not used by the handler itself).
        payload: Decoded command message. Expected to be a dict whose
            'payload' entry holds 'command', 'target' and optional 'params'.

    Does nothing when the DTU protocol is disabled. A payload that is not a
    JSON object is logged and dropped. Unknown commands are logged and
    answered with the default response (success=True, empty result), as
    before. Errors are logged, never raised.
    """
    try:
        if not dtu_config.get('enabled'):
            return

        # The caller passes the raw string through when JSON decoding failed.
        body = payload.get('payload', {}) if isinstance(payload, dict) else None
        if body is None and isinstance(payload, dict):
            body = {}
        if not isinstance(body, dict):
            logger.warning(f"忽略格式无效的控制指令: {payload!r}")
            return

        command = body.get('command')
        target = body.get('target')
        params = body.get('params') or {}  # tolerate an explicit null
        logger.info(f"收到控制指令: command={command}, target={target}")

        response_payload = {
            'msg_id': payload.get('msg_id'),
            'timestamp': int(time.time() * 1000),
            'dtu_id': dtu_config['dtu_id'],
            'type': 'RESPONSE',
            'payload': {
                'command': command,
                'target': target,
                'success': True,
                'result': {}
            }
        }

        handler = _CONTROL_HANDLERS.get(command) if isinstance(command, str) else None
        if handler is None:
            logger.warning(f"收到未支持的控制指令: {command}")
        else:
            outcome = handler(target, params)
            if outcome is not None:
                success, result = outcome
                response_payload['payload']['success'] = success
                response_payload['payload']['result'] = result

        # Send the response
        response_topic = build_dtu_topic(dtu_config['customer_id'], 'dtu', dtu_config['dtu_id'], 'response')
        mqtt_client.publish(response_topic, json.dumps(response_payload))
    except Exception as e:
        logger.error(f"处理控制指令失败: {str(e)}")
# DTU heartbeat timer
def start_dtu_heartbeat():
    """Start the Socket.IO background task that sends periodic DTU heartbeats.

    The task loops forever: it sleeps for the configured
    'heartbeat_interval' (falling back to DTU_HEARTBEAT_INTERVAL) and then
    publishes a status message if MQTT reports a status and the DTU protocol
    is enabled. The interval is re-read on every cycle.
    """
    def _beat_forever():
        while True:
            interval = dtu_config.get('heartbeat_interval', DTU_HEARTBEAT_INTERVAL)
            socketio.sleep(interval)
            if not mqtt_client.get_status():
                continue
            if dtu_config.get('enabled'):
                dtu_publish_status()

    socketio.start_background_task(target=_beat_forever)
# Extended variant of mqtt_data_handler that also handles the DTU protocol
def mqtt_data_handler_extended(data):
    """Handle a message received from MQTT, including DTU protocol routing.

    ``data`` is a dict with optional 'topic' and 'payload'. The raw payload
    is buffered in ``mqtt_data_buffer`` (bounded by MAX_BUFFER_SIZE) and
    broadcast as an 'mqtt_data' Socket.IO event. A string payload is decoded
    as JSON when possible and the message is then routed by topic:

    * this DTU's control topic  -> dtu_handle_control
    * this DTU's status topic   -> handle_ota_status (dict payload carrying
      'ota_status' or 'firmware_version')
    * this DTU's sensor topic   -> update_env_sensor_data (dict payload
      carrying 'temperature' or 'humidity')
    * broadcast discover/config -> handle_broadcast_discover / _config

    Finally the raw payload is forwarded to the serial port when
    MQTT-to-serial forwarding is enabled. Errors are logged, never raised.
    """
    try:
        topic = data.get('topic', '')
        payload_str = data.get('payload', '')

        # Try to decode JSON; fall back to the raw value when it is not JSON.
        # Only decoding failures are caught; a bare ``except`` would also
        # swallow KeyboardInterrupt and SystemExit.
        payload = payload_str
        if isinstance(payload_str, str):
            try:
                payload = json.loads(payload_str)
            except (ValueError, RecursionError):
                payload = payload_str

        # Buffer the raw message
        timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
        mqtt_data_buffer.append({
            'timestamp': timestamp,
            'topic': topic,
            'payload': payload_str
        })
        if len(mqtt_data_buffer) > MAX_BUFFER_SIZE:
            mqtt_data_buffer.pop(0)

        # Broadcast over WebSocket
        socketio.emit('mqtt_data', {
            'timestamp': timestamp,
            'topic': topic,
            'payload': payload_str
        }, namespace=SOCKETIO_NAMESPACE_DATA)

        customer_id = dtu_config['customer_id']
        dtu_id = dtu_config['dtu_id']
        is_dict = isinstance(payload, dict)

        # Control commands addressed to this DTU
        if topic == build_dtu_topic(customer_id, 'dtu', dtu_id, 'control'):
            dtu_handle_control(topic, payload)

        # Status topic: used for OTA status reports
        if topic == build_dtu_topic(customer_id, 'dtu', dtu_id, 'status') and is_dict:
            if 'ota_status' in payload or 'firmware_version' in payload:
                handle_ota_status(dtu_id, payload)

        # Environment sensor data
        if topic == build_dtu_topic(customer_id, 'sensor', dtu_id, 'data') and is_dict:
            if 'temperature' in payload or 'humidity' in payload:
                update_env_sensor_data(
                    payload.get('temperature'),
                    payload.get('humidity'),
                    payload.get('dtu_temperature'),
                    payload.get('sensor_update_time')
                )

        # Broadcast: DTU discovery (7.14)
        if topic == build_dtu_topic('broadcast', 'dtu', 'discover'):
            handle_broadcast_discover(payload)

        # Broadcast: bulk configuration (7.15)
        if topic == build_dtu_topic('broadcast', 'dtu', 'config'):
            handle_broadcast_config(payload)

        # Forward the raw payload to the serial port (when enabled)
        if forward_mqtt_to_serial and serial_client.get_status():
            success, msg = serial_client.send_data(payload_str)
            if not success:
                logger.warning(f"MQTT数据转发到串口失败: {msg}")
    except Exception as e:
        logger.error(f"处理MQTT数据时出错: {str(e)}")
- # WebSocket事件处理
@socketio.on('connect', namespace=SOCKETIO_NAMESPACE_DATA)
def handle_data_connect():
    """Register a client on the data namespace and replay recent history.

    A fresh UUID is recorded in ``connected_clients['data']`` and the last
    100 entries of each data buffer are sent to the connecting client.
    Errors are logged and swallowed.
    """
    try:
        new_id = str(uuid.uuid4())
        connected_clients['data'].add(new_id)
        logger.info(f'客户端已连接到数据命名空间,当前连接数: {len(connected_clients["data"])}')

        # Cap the replayed history to keep the initial payload small.
        history_limit = 100
        replay = (
            ('serial_data_history', serial_data_buffer),
            ('mqtt_data_history', mqtt_data_buffer),
        )
        for event_name, source_buffer in replay:
            emit(event_name, {'data': source_buffer[-history_limit:]})

        # No-op background task kept from the original implementation.
        socketio.start_background_task(target=lambda: None)
    except Exception as e:
        logger.error(f"处理数据命名空间连接时出错: {e}")
@socketio.on('connect', namespace=SOCKETIO_NAMESPACE_STATUS)
def handle_status_connect():
    """Register a client on the status namespace and push the current state.

    Sends the serial status, the MQTT status and the forwarding settings to
    the connecting client. Errors are logged and swallowed.
    """
    try:
        new_id = str(uuid.uuid4())
        connected_clients['status'].add(new_id)
        total = len(connected_clients["status"])
        logger.info(f'客户端已连接到状态命名空间,当前连接数: {total}')

        # Initial state snapshot for the new client.
        emit('serial_status', {'connected': serial_status})
        emit('mqtt_status', {'connected': mqtt_status})
        forward_snapshot = {
            'serial_to_mqtt': forward_serial_to_mqtt,
            'mqtt_to_serial': forward_mqtt_to_serial,
            'publish_topic': mqtt_publish_topic
        }
        emit('forward_status', forward_snapshot)
    except Exception as e:
        logger.error(f"处理状态命名空间连接时出错: {e}")
@socketio.on('connect', namespace=SOCKETIO_NAMESPACE_CONTROL)
def handle_control_connect():
    """Register a client on the control namespace.

    Records a fresh UUID in ``connected_clients['control']`` and logs the
    new connection count. Errors are logged and swallowed.
    """
    try:
        new_id = str(uuid.uuid4())
        connected_clients['control'].add(new_id)
        total = len(connected_clients["control"])
        logger.info(f'客户端已连接到控制命名空间,当前连接数: {total}')
    except Exception as e:
        logger.error(f"处理控制命名空间连接时出错: {e}")
@socketio.on('disconnect', namespace=SOCKETIO_NAMESPACE_DATA)
def handle_data_disconnect():
    """Account for a client leaving the data namespace.

    Simplified bookkeeping: no session-to-client-id mapping is kept, so an
    arbitrary entry is removed from ``connected_clients['data']`` just to
    keep the count in step. Errors are logged and swallowed.
    """
    try:
        data_clients = connected_clients['data']
        if data_clients:
            # Which entry is removed is irrelevant; only the count matters.
            data_clients.pop()
        logger.info(f'客户端已断开数据命名空间的连接,当前连接数: {len(connected_clients["data"])}')
    except Exception as e:
        logger.error(f"处理数据命名空间断开连接时出错: {e}")
@socketio.on('disconnect', namespace=SOCKETIO_NAMESPACE_STATUS)
def handle_status_disconnect():
    """Account for a client leaving the status namespace.

    Removes an arbitrary entry from ``connected_clients['status']`` (when
    one exists) and logs the remaining count. Errors are logged and
    swallowed.
    """
    try:
        status_clients = connected_clients['status']
        if status_clients:
            status_clients.pop()
        logger.info(f'客户端已断开状态命名空间的连接,当前连接数: {len(connected_clients["status"])}')
    except Exception as e:
        logger.error(f"处理状态命名空间断开连接时出错: {e}")
@socketio.on('disconnect', namespace=SOCKETIO_NAMESPACE_CONTROL)
def handle_control_disconnect():
    """Account for a client leaving the control namespace.

    Removes an arbitrary entry from ``connected_clients['control']`` (when
    one exists) and logs the remaining count. Errors are logged and
    swallowed.
    """
    try:
        control_clients = connected_clients['control']
        if control_clients:
            control_clients.pop()
        logger.info(f'客户端已断开控制命名空间的连接,当前连接数: {len(connected_clients["control"])}')
    except Exception as e:
        logger.error(f"处理控制命名空间断开连接时出错: {e}")
@socketio.on('serial_send', namespace=SOCKETIO_NAMESPACE_CONTROL)
def handle_serial_send(data):
    """Send a message to the serial port on behalf of a WebSocket client.

    Args:
        data: Event payload; ``data['message']`` is the text to send.

    Always answers with a ``serial_send_response`` event carrying
    ``success``, ``message`` and ``error_code``.
    """
    def _reply(success, text, code_key):
        # Single place that shapes the response event.
        emit('serial_send_response', {
            'success': success,
            'message': text,
            'error_code': ERROR_CODES[code_key]
        })

    try:
        message = data.get('message', '')
        if not message:
            _reply(False, '消息内容不能为空', 'CONFIG_ERROR')
            return

        serial_state = serial_client.get_status()
        if not (isinstance(serial_state, dict) and serial_state.get("connected", False)):
            _reply(False, '串口未连接', 'SERIAL_CONNECTION_ERROR')
            return

        success, msg = serial_client.send_data(message)
        _reply(success, msg, 'SUCCESS' if success else 'SERIAL_SEND_ERROR')

        if success:
            # Truncate only the message part. Previously the conditional
            # applied to the whole f-string, so short messages were logged
            # without the descriptive prefix.
            preview = f"{message[:50]}..." if len(message) > 50 else message
            logger.info(f"通过WebSocket发送串口数据成功: {preview}")
    except Exception as e:
        error_msg = f"处理串口发送请求时出错: {str(e)}"
        logger.error(error_msg)
        emit('serial_send_response', {
            'success': False,
            'message': error_msg,
            'error_code': ERROR_CODES['UNKNOWN_ERROR']
        })
@socketio.on('mqtt_publish', namespace=SOCKETIO_NAMESPACE_CONTROL)
def handle_mqtt_publish(data):
    """Publish an MQTT message on behalf of a WebSocket client.

    Args:
        data: Event payload with ``topic`` and ``message`` entries.

    Always answers with an ``mqtt_publish_response`` event carrying
    ``success``, ``message`` and ``error_code``.
    """
    def _reply(success, text, code_key):
        # Single place that shapes the response event.
        emit('mqtt_publish_response', {
            'success': success,
            'message': text,
            'error_code': ERROR_CODES[code_key]
        })

    try:
        topic = data.get('topic', '')
        message = data.get('message', '')

        if not topic or not message:
            _reply(False, '主题或消息内容不能为空', 'CONFIG_ERROR')
            return

        # Accept both a plain truthy status and a dict with a ``connected``
        # flag, mirroring the REST MQTT status endpoint.
        mqtt_state = mqtt_client.get_status()
        if isinstance(mqtt_state, dict):
            mqtt_connected = mqtt_state.get('connected', False)
        else:
            mqtt_connected = bool(mqtt_state)
        if not mqtt_connected:
            _reply(False, 'MQTT未连接', 'MQTT_CONNECTION_ERROR')
            return

        success, msg = mqtt_client.publish(topic, message)
        _reply(success, msg, 'SUCCESS' if success else 'MQTT_PUBLISH_ERROR')

        if success:
            # Truncate only the message part. Previously the conditional
            # applied to the whole f-string, so short messages were logged
            # without the prefix and topic.
            preview = f"{message[:50]}..." if len(message) > 50 else message
            logger.info(f"通过WebSocket发布MQTT消息成功: 主题={topic}, 消息={preview}")
    except Exception as e:
        error_msg = f"处理MQTT发布请求时出错: {str(e)}"
        logger.error(error_msg)
        emit('mqtt_publish_response', {
            'success': False,
            'message': error_msg,
            'error_code': ERROR_CODES['UNKNOWN_ERROR']
        })
@socketio.on('update_forward_config', namespace=SOCKETIO_NAMESPACE_CONTROL)
def handle_update_forward_config(data):
    """Update the forwarding settings from a WebSocket request.

    Args:
        data: Event payload; may contain ``serial_to_mqtt``,
            ``mqtt_to_serial`` and ``publish_topic``. Absent keys leave the
            corresponding setting untouched.

    On success the new settings are broadcast as ``forward_status`` on the
    status namespace. The requester always receives an
    ``update_forward_config_response`` event.
    """
    global forward_serial_to_mqtt, forward_mqtt_to_serial, mqtt_publish_topic

    try:
        # Validate everything before mutating any global, so a rejected
        # request cannot leave the settings half-applied.
        new_topic = None
        if 'publish_topic' in data:
            new_topic = str(data['publish_topic'])
            if not new_topic.strip():
                raise ValueError("发布主题不能为空")

        if 'serial_to_mqtt' in data:
            forward_serial_to_mqtt = bool(data['serial_to_mqtt'])
        if 'mqtt_to_serial' in data:
            forward_mqtt_to_serial = bool(data['mqtt_to_serial'])
        if new_topic is not None:
            mqtt_publish_topic = new_topic

        # Broadcast the updated settings to every status listener.
        socketio.emit('forward_status', {
            'serial_to_mqtt': forward_serial_to_mqtt,
            'mqtt_to_serial': forward_mqtt_to_serial,
            'publish_topic': mqtt_publish_topic
        }, namespace=SOCKETIO_NAMESPACE_STATUS)

        emit('update_forward_config_response', {
            'success': True,
            'message': '转发配置已更新',
            'error_code': ERROR_CODES['SUCCESS']
        })

        logger.info(f"转发配置已更新 - 串口到MQTT: {forward_serial_to_mqtt}, MQTT到串口: {forward_mqtt_to_serial}, 发布主题: {mqtt_publish_topic}")
    except ValueError as e:
        error_msg = str(e)
        logger.warning(f"转发配置更新失败: {error_msg}")
        emit('update_forward_config_response', {
            'success': False,
            'message': error_msg,
            'error_code': ERROR_CODES['CONFIG_ERROR']
        })
    except Exception as e:
        error_msg = f'更新转发配置失败: {str(e)}'
        logger.error(error_msg)
        emit('update_forward_config_response', {
            'success': False,
            'message': error_msg,
            'error_code': ERROR_CODES['UNKNOWN_ERROR']
        })
@socketio.on('clear_data_buffer', namespace=SOCKETIO_NAMESPACE_CONTROL)
def handle_clear_data_buffer(data):
    """Empty the serial and/or MQTT data buffer on a WebSocket request.

    Args:
        data: Event payload; ``data['type']`` selects ``'serial'``,
            ``'mqtt'`` or ``'all'``.

    The requester always receives a ``clear_data_buffer_response`` event.
    """
    try:
        buffer_type = data.get('type', '')

        # (selector, buffers to clear, confirmation text) - compared with
        # ``==`` in order, exactly like an if/elif chain.
        plans = (
            ('serial', (serial_data_buffer,), '串口数据缓冲区已清空'),
            ('mqtt', (mqtt_data_buffer,), 'MQTT数据缓冲区已清空'),
            ('all', (serial_data_buffer, mqtt_data_buffer), '所有数据缓冲区已清空'),
        )
        for selector, buffers, done_text in plans:
            if buffer_type == selector:
                for buf in buffers:
                    buf.clear()
                emit('clear_data_buffer_response', {
                    'success': True,
                    'message': done_text,
                    'error_code': ERROR_CODES['SUCCESS']
                })
                logger.info(done_text)
                break
        else:
            # No selector matched.
            emit('clear_data_buffer_response', {
                'success': False,
                'message': '无效的缓冲区类型',
                'error_code': ERROR_CODES['CONFIG_ERROR']
            })
            logger.warning(f"清空缓冲区失败: 无效的缓冲区类型 '{buffer_type}'")
    except Exception as e:
        error_msg = f'清空缓冲区失败: {e}'
        logger.error(error_msg)
        emit('clear_data_buffer_response', {
            'success': False,
            'message': error_msg,
            'error_code': ERROR_CODES['UNKNOWN_ERROR']
        })
- # 设置回调
# Path of the persisted device (UID -> address) configuration.
DEVICE_CONFIG_FILE = '/root/dzxj_dtu/devices.json'
# Run flag of the background confirm loop; set to True by confirm_loop().
confirm_loop_running = False

# Wire the serial client callbacks.
serial_client.set_data_callback(serial_data_handler)
serial_client.set_send_callback(serial_send_handler)
serial_client.set_status_callback(serial_status_handler)
# Wire the MQTT client callbacks (extended handler with DTU protocol).
mqtt_client.set_data_callback(mqtt_data_handler_extended)
mqtt_client.set_status_callback(mqtt_status_handler)
def save_device_config():
    """Persist the stored device table to ``DEVICE_CONFIG_FILE``.

    Logs the target path when ``address_config.save_config`` reports
    success; any exception is logged and swallowed. Returns ``None``.
    """
    try:
        target = DEVICE_CONFIG_FILE
        if address_config.save_config(target):
            logger.info(f"设备配置已保存到: {target}")
    except Exception as e:
        logger.error(f"保存设备配置失败: {e}")
def load_device_config():
    """Load the stored device table from ``DEVICE_CONFIG_FILE``.

    Returns:
        The stored devices reported by ``address_config`` after a
        successful load; an empty dict when the file is missing, the load
        reports failure, or an exception occurs (which is logged).
    """
    try:
        source = DEVICE_CONFIG_FILE
        if not os.path.exists(source):
            return {}
        if not address_config.load_config(source):
            return {}
        loaded = address_config.get_stored_devices()
        logger.info(f"已加载 {len(loaded)} 个设备配置")
        return loaded
    except Exception as e:
        logger.error(f"加载设备配置失败: {e}")
        return {}
def confirm_loop():
    """Background loop that re-confirms every stored device address.

    Runs until the module-level ``confirm_loop_running`` flag is cleared.
    Each round sends a confirm-address frame to every stored device over
    the serial port, pausing 0.1 s between devices and 10 s between rounds.
    A round is skipped (after a 10 s wait) when there are no stored devices
    or the serial port is not connected.
    """
    global confirm_loop_running
    confirm_loop_running = True
    logger.info("启动设备确认线程 (间隔10秒)")

    def _serial_ready():
        # Truthiness check first, then the dict-based ``connected`` flag.
        if not serial_client.get_status():
            return False
        state = serial_client.get_status()
        return isinstance(state, dict) and state.get('connected', False)

    while confirm_loop_running:
        try:
            devices = address_config.get_stored_devices()
            if not devices or not _serial_ready():
                time.sleep(10)
                continue

            for uid_hex, addr in devices.items():
                try:
                    frame = build_confirm_address(addr, bytes.fromhex(uid_hex))
                    sent, detail = serial_client.send_raw(frame)
                    if not sent:
                        logger.warning(f"确认失败 地址={addr}: {detail}")
                    else:
                        logger.debug(f"确认设备: 地址={addr}, UID={uid_hex[:16]}...")
                except Exception as e:
                    logger.error(f"确认异常 地址={addr}: {e}")
                # Short gap between frames on the shared bus.
                time.sleep(0.1)
        except Exception as e:
            logger.error(f"确认线程异常: {e}")
        # Pause between rounds.
        time.sleep(10)
- # API路由
- # 移除静态文件服务,前端由nginx提供服务
- # 根路径路由
@app.route('/')
def index():
    """Serve the front-end entry page from ``STATIC_FOLDER``."""
    entry_page = 'index.html'
    return send_from_directory(STATIC_FOLDER, entry_page)
- # 404错误处理器 - 解决SPA路由刷新问题
@app.errorhandler(404)
def page_not_found(e):
    """Handle 404s so that SPA routes survive a browser refresh.

    Paths under ``/api/`` get a JSON 404; every other path is answered with
    ``index.html`` and status 200 so the front-end router can take over.
    """
    is_api_request = request.path.startswith('/api/')
    if not is_api_request:
        # Let the client-side router resolve the path.
        return send_from_directory(STATIC_FOLDER, 'index.html'), 200

    not_found = {
        'success': False,
        'message': 'API endpoint not found'
    }
    return jsonify(not_found), 404
@app.route('/api/serial/ports', methods=['GET'])
def get_serial_ports():
    """Return the serial ports reported by ``serial_client.list_ports()``."""
    available = serial_client.list_ports()
    body = {'success': True, 'ports': available}
    return jsonify(body)
@app.route('/api/serial/connect', methods=['POST'])
def serial_connect():
    """Open a serial connection with the parameters in the JSON body.

    Body fields: ``port`` (required), ``baudrate`` (9600), ``bytesize``
    (8), ``parity`` ('N'), ``stopbits`` (1), ``timeout`` (0.1).

    Returns:
        200 on success, 400 when ``port`` is missing or the connection
        fails, 500 on an unexpected error. An existing connection is closed
        first, and the settings are saved after a successful connect.
    """
    try:
        # A missing, malformed or non-object body is treated as empty so
        # the request gets the normal 400 validation reply instead of an
        # AttributeError turned into a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        port = data.get('port')
        baudrate = data.get('baudrate', 9600)
        bytesize = data.get('bytesize', 8)
        parity = data.get('parity', 'N')
        stopbits = data.get('stopbits', 1)
        timeout = data.get('timeout', 0.1)

        if not port:
            logger.warning("连接串口请求缺少串口名称")
            return jsonify({
                'success': False,
                'message': '串口名称不能为空',
                'error_code': ERROR_CODES['CONFIG_ERROR']
            }), 400

        # Drop any existing connection first.
        _status = serial_client.get_status()
        if isinstance(_status, dict) and _status.get('connected', False):
            _port = serial_client.current_config.port if serial_client.current_config else 'unknown'
            logger.info(f"断开现有串口连接: {_port}")
            serial_client.disconnect()

        # Open the requested port.
        logger.info(f"尝试连接串口: {port}, 波特率: {baudrate}")
        success, message = serial_client.connect(port, baudrate=baudrate, timeout=timeout, bytesize=bytesize, parity=parity, stopbits=stopbits)

        status_code = 200 if success else 400
        error_code = ERROR_CODES['SUCCESS'] if success else ERROR_CODES['SERIAL_CONNECTION_ERROR']

        response = {
            'success': success,
            'message': message,
            'error_code': error_code
        }
        if success:
            logger.info(f"串口连接成功: {port}")
            # Remember the working settings.
            save_serial_config(port, baudrate, timeout, bytesize, parity, stopbits)
        else:
            logger.error(f"串口连接失败: {message}")
        return jsonify(response), status_code
    except Exception as e:
        error_msg = f'连接串口时出错: {str(e)}'
        logger.exception(error_msg)  # logs the full traceback
        return jsonify({
            'success': False,
            'message': error_msg,
            'error_code': ERROR_CODES['UNKNOWN_ERROR']
        }), 500
@app.route('/api/serial/disconnect', methods=['POST'])
def serial_disconnect():
    """Close the serial connection and report the client's result."""
    outcome = serial_client.disconnect()
    success, message = outcome
    body = {'success': success, 'message': message}
    return jsonify(body)
@app.route('/api/serial/status', methods=['GET'])
def serial_get_status():
    """Report the serial connection state plus the saved configuration.

    Response keys: ``saved_config`` (contents of ``SERIAL_CONFIG_FILE``, or
    ``{}`` when the file is missing or not valid JSON), ``connected`` and
    ``port`` (``None`` unless connected with a known config).
    """
    state = serial_client.get_status()

    try:
        with open(SERIAL_CONFIG_FILE, 'r') as fh:
            saved = json.load(fh)
    except (FileNotFoundError, json.JSONDecodeError):
        saved = {}

    port = None
    if not isinstance(state, dict):
        # Non-dict status: only its truthiness is meaningful.
        connected = bool(state)
    else:
        connected = state.get('connected', False)
        if connected:
            cfg = state.get('config')
            if cfg:
                port = cfg.port

    return jsonify({'saved_config': saved, 'connected': connected, 'port': port})
@app.route('/api/serial/send', methods=['POST'])
def serial_send():
    """Send the ``message`` field of the JSON body to the serial port.

    Returns:
        JSON with ``success`` and ``message``. An empty or missing message
        is reported as ``success: False`` with HTTP 200, as before.
    """
    # A missing, malformed or non-object body is treated as empty so the
    # caller gets the validation reply instead of an unhandled
    # AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    message = data.get('message')

    if not message:
        return jsonify({
            'success': False,
            'message': '消息内容不能为空'
        })

    success, result_message = serial_client.send_data(message)
    return jsonify({
        'success': success,
        'message': result_message
    })
@app.route('/api/mqtt/connect', methods=['POST'])
def mqtt_connect():
    """Connect to an MQTT broker using the parameters in the JSON body.

    Body fields: ``broker`` or ``host`` ('localhost'), ``port`` (1883),
    ``client_id`` (time-based default), ``username``, ``password``,
    ``keepalive`` (60) and optional ``topics`` to subscribe after a
    successful connect.

    Returns:
        200 on success, 400 when the connection fails, 500 on an
        unexpected error.
    """
    try:
        body = request.json
        host = body.get('broker') or body.get('host', 'localhost')
        port = body.get('port', 1883)
        client_id = body.get('client_id', f'serial_gateway_{int(time.time())}')
        username = body.get('username')
        password = body.get('password')
        keepalive = body.get('keepalive', 60)

        # Drop any existing connection first.
        if mqtt_client.get_status():
            logger.info(f"断开现有MQTT连接: {mqtt_client.host}:{mqtt_client.port}")
            mqtt_client.disconnect()

        logger.info(f"尝试连接MQTT服务器: {host}:{port}, 客户端ID: {client_id}")
        connect_args = dict(
            host=host,
            port=port,
            client_id=client_id,
            username=username,
            password=password,
            keepalive=keepalive,
        )
        success, message = mqtt_client.connect(**connect_args)

        # Subscribe only after a successful connect.
        if success and 'topics' in body:
            mqtt_client.subscribe(body['topics'])

        if success:
            status_code = 200
            error_code = ERROR_CODES['SUCCESS']
        else:
            status_code = 400
            error_code = ERROR_CODES['MQTT_CONNECTION_ERROR']

        response = {'success': success, 'message': message, 'error_code': error_code}

        if not success:
            logger.error(f"MQTT服务器连接失败: {message}")
        else:
            logger.info(f"MQTT服务器连接成功: {host}:{port}")

        return jsonify(response), status_code
    except Exception as e:
        error_msg = f'连接MQTT服务器时出错: {e}'
        logger.exception(error_msg)
        return jsonify({
            'success': False,
            'message': error_msg,
            'error_code': ERROR_CODES['UNKNOWN_ERROR']
        }), 500
@app.route('/api/mqtt/disconnect', methods=['POST'])
def mqtt_disconnect():
    """Close the MQTT connection and report the client's result."""
    outcome = mqtt_client.disconnect()
    success, message = outcome
    body = {'success': success, 'message': message}
    return jsonify(body)
@app.route('/api/mqtt/status', methods=['GET'])
def mqtt_get_status():
    """Report whether the MQTT client is connected.

    Accepts either a dict status (its ``connected`` entry is used) or any
    other value (its truthiness is used).
    """
    state = mqtt_client.get_status()
    if isinstance(state, dict):
        connected = state.get('connected', False)
    else:
        connected = bool(state)
    return jsonify({'connected': connected})
@app.route('/api/mqtt/publish', methods=['POST'])
def mqtt_publish():
    """Publish the ``message`` field of the JSON body to ``topic``.

    Returns:
        JSON with ``success`` and ``message``. A missing topic or message
        is reported as ``success: False`` with HTTP 200, as before.
    """
    # A missing, malformed or non-object body is treated as empty so the
    # caller gets the validation reply instead of an unhandled
    # AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    topic = data.get('topic')
    message = data.get('message')

    if not topic or not message:
        return jsonify({
            'success': False,
            'message': '主题和消息内容不能为空'
        })

    success, result_message = mqtt_client.publish(topic, message)
    return jsonify({
        'success': success,
        'message': result_message
    })
@app.route('/api/mqtt/subscribe', methods=['POST'])
def mqtt_subscribe():
    """Subscribe the MQTT client to the ``topics`` listed in the JSON body.

    Returns:
        JSON with ``success`` and ``message``. An empty topic list is
        reported as ``success: False`` with HTTP 200, as before.
    """
    # A missing, malformed or non-object body is treated as empty so the
    # caller gets the validation reply instead of an unhandled
    # AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    topics = data.get('topics', [])
    if not topics:
        return jsonify({
            'success': False,
            'message': '请至少订阅一个主题'
        })
    success, message = mqtt_client.subscribe(topics)
    return jsonify({
        'success': success,
        'message': message
    })
@app.route('/api/mqtt/broadcast_discover', methods=['POST'])
def mqtt_broadcast_discover():
    """Publish a DTU discovery broadcast (7.14).

    Sends a ``DISCOVER`` message on the broadcast discover topic. The
    optional ``customer_id`` in the JSON body overrides the one from
    ``dtu_config`` (default ``'default'``).
    """
    body = request.json or {}
    fallback_customer = dtu_config.get('customer_id', 'default')
    customer_id = body.get('customer_id', fallback_customer)

    discover_topic = build_dtu_topic('broadcast', 'dtu', 'discover')

    # msg_id and timestamp are taken from two separate clock reads.
    payload = {}
    payload['msg_id'] = f"disc_{int(time.time() * 1000)}"
    payload['timestamp'] = int(time.time() * 1000)
    payload['type'] = 'DISCOVER'
    payload['payload'] = {
        'customer_id': customer_id,
        'action': 'register'
    }

    success, message = mqtt_client.publish(discover_topic, json.dumps(payload))
    logger.info(f"发送DTU发现广播: topic={discover_topic}, payload={payload}")

    reply_text = 'DTU发现广播已发送' if success else message
    return jsonify({
        'success': success,
        'message': reply_text,
        'topic': discover_topic
    })
@app.route('/api/mqtt/broadcast_config', methods=['POST'])
def mqtt_broadcast_config():
    """Publish a bulk DTU configuration broadcast (7.15).

    Sends a ``CONFIG`` message on the broadcast config topic carrying the
    ``config`` object from the JSON body. The optional ``customer_id``
    overrides the one from ``dtu_config`` (default ``'default'``).
    """
    body = request.json or {}
    fallback_customer = dtu_config.get('customer_id', 'default')
    customer_id = body.get('customer_id', fallback_customer)
    config_settings = body.get('config', {})

    config_topic = build_dtu_topic('broadcast', 'dtu', 'config')

    # msg_id and timestamp are taken from two separate clock reads.
    payload = {}
    payload['msg_id'] = f"cfg_{int(time.time() * 1000)}"
    payload['timestamp'] = int(time.time() * 1000)
    payload['type'] = 'CONFIG'
    payload['payload'] = {
        'customer_id': customer_id,
        'config': config_settings
    }

    success, message = mqtt_client.publish(config_topic, json.dumps(payload))
    logger.info(f"发送DTU批量配置广播: topic={config_topic}, payload={payload}")

    reply_text = 'DTU批量配置广播已发送' if success else message
    return jsonify({
        'success': success,
        'message': reply_text,
        'topic': config_topic,
        'config': config_settings
    })
@app.route('/api/data/serial', methods=['GET'])
def get_serial_data():
    """Return the whole serial data buffer under the ``data`` key."""
    body = {'data': serial_data_buffer}
    return jsonify(body)
@app.route('/api/data/mqtt', methods=['GET'])
def get_mqtt_data():
    """Return the whole MQTT data buffer under the ``data`` key."""
    body = {'data': mqtt_data_buffer}
    return jsonify(body)
@app.route('/api/forward/config', methods=['POST'])
def set_forward_config():
    """Replace the forwarding settings with the values in the JSON body.

    ``serial_to_mqtt`` and ``mqtt_to_serial`` default to ``False`` when
    absent; ``publish_topic`` is only changed when present.

    Returns:
        JSON with the resulting configuration, or HTTP 400 when the body is
        not a JSON object (the settings are left untouched in that case).
    """
    global forward_serial_to_mqtt, forward_mqtt_to_serial, mqtt_publish_topic

    # Reject a missing, malformed or non-object body up front. Previously
    # this raised an unhandled AttributeError. It must not fall through
    # with defaults either, since that would silently switch forwarding off.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({
            'success': False,
            'message': '请求体必须是JSON对象'
        }), 400

    forward_serial_to_mqtt = data.get('serial_to_mqtt', False)
    forward_mqtt_to_serial = data.get('mqtt_to_serial', False)
    if 'publish_topic' in data:
        mqtt_publish_topic = data['publish_topic']

    return jsonify({
        'success': True,
        'message': '转发配置已更新',
        'config': {
            'serial_to_mqtt': forward_serial_to_mqtt,
            'mqtt_to_serial': forward_mqtt_to_serial,
            'publish_topic': mqtt_publish_topic
        }
    })
@app.route('/api/forward/status', methods=['GET'])
def get_forward_status():
    """Return the current forwarding flags and publish topic."""
    snapshot = {}
    snapshot['serial_to_mqtt'] = forward_serial_to_mqtt
    snapshot['mqtt_to_serial'] = forward_mqtt_to_serial
    snapshot['publish_topic'] = mqtt_publish_topic
    return jsonify(snapshot)
- # 健康检查端点
@app.route('/api/health', methods=['GET'])
def health_check():
    """Health-check endpoint; always answers with HTTP 200.

    Reports ``healthy`` unless a data buffer exceeds 80 % of
    ``MAX_BUFFER_SIZE`` or more than 100 WebSocket clients are tracked, in
    which case the status is ``warning``. A failure to read the network
    status only adds a warning. Any unexpected exception yields a minimal
    ``error`` payload, still with HTTP 200.
    """
    try:
        # Tracked WebSocket clients per namespace.
        client_counts = {
            namespace: len(connected_clients[namespace])
            for namespace in ('data', 'status', 'control')
        }

        buffer_sizes = {
            'serial': len(serial_data_buffer),
            'mqtt': len(mqtt_data_buffer)
        }

        # Simplified load checks; each failed check contributes a warning.
        load_warnings = []

        if buffer_sizes['serial'] > MAX_BUFFER_SIZE * 0.8:
            load_warnings.append(f"串口缓冲区接近最大容量: {buffer_sizes['serial']}/{MAX_BUFFER_SIZE}")

        if buffer_sizes['mqtt'] > MAX_BUFFER_SIZE * 0.8:
            load_warnings.append(f"MQTT缓冲区接近最大容量: {buffer_sizes['mqtt']}/{MAX_BUFFER_SIZE}")

        total_clients = sum(client_counts.values())
        if total_clients > 100:
            load_warnings.append(f"WebSocket连接数过多: {total_clients}")

        # Only the load checks above decide health. The network lookup
        # below can add a warning but never flips the status.
        is_healthy = not load_warnings

        try:
            network_info = network_manager.get_network_status()
        except Exception as e:
            network_info = None
            logger.warning(f"获取网络状态时出错: {e}")
            load_warnings.append(f"网络状态获取失败: {e}")

        response = {
            'status': 'healthy' if is_healthy else 'warning',
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
            'services': {
                'serial': serial_status,
                'mqtt': mqtt_status,
                'websocket': True
            },
            'client_counts': client_counts,
            'buffer_sizes': buffer_sizes,
            'warnings': load_warnings
        }

        # Attach network details only when some were obtained.
        if network_info:
            response['network'] = network_info

        return jsonify(response), 200
    except Exception as e:
        # Never answer 500 from the health check: report that the API is
        # up and leave the other services unknown.
        logger.error(f"健康检查端点出错: {e}")
        fallback = {
            'status': 'error',
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
            'error': str(e),
            'services': {
                'api': True,
                'serial': None,
                'mqtt': None,
                'websocket': None
            }
        }
        return jsonify(fallback), 200
- # 网络配置相关API
@app.route('/api/network/config', methods=['GET'])
def get_network_config():
    """Return the current network configuration; HTTP 500 on failure."""
    try:
        current = network_manager.get_network_config()
        return jsonify(current)
    except Exception as e:
        failure = f'获取网络配置失败: {e}'
        logger.error(failure)
        return jsonify({'success': False, 'message': failure}), 500
@app.route('/api/network/config', methods=['POST'])
def update_network_config():
    """Apply the network configuration from the JSON body.

    Returns the manager's result; HTTP 400 when it reports failure and
    HTTP 500 when an exception occurs.
    """
    try:
        outcome = network_manager.update_network_config(request.json)
        if not outcome['success']:
            return jsonify(outcome), 400
        return jsonify(outcome)
    except Exception as e:
        failure = f'更新网络配置失败: {e}'
        logger.error(failure)
        return jsonify({'success': False, 'message': failure}), 500
@app.route('/api/network/status', methods=['GET'])
def get_network_status():
    """Return the current network status; HTTP 500 on failure."""
    try:
        current = network_manager.get_network_status()
        return jsonify(current)
    except Exception as e:
        failure = f'获取网络状态失败: {e}'
        logger.error(failure)
        return jsonify({'success': False, 'message': failure}), 500
@app.route('/api/network/restart', methods=['POST'])
def restart_network_service():
    """Restart the network service so a new configuration takes effect.

    Returns the manager's result; HTTP 500 when it reports failure or an
    exception occurs.
    """
    try:
        outcome = network_manager.restart_network_service()
        if not outcome['success']:
            return jsonify(outcome), 500
        return jsonify(outcome)
    except Exception as e:
        failure = f'重启网络服务失败: {e}'
        logger.error(failure)
        return jsonify({'success': False, 'message': failure}), 500
- # Modbus RTU API
@app.route('/api/modbus/antenna_addresses', methods=['GET'])
def get_antenna_addresses():
    """Return the antenna address map with addresses as 4-digit hex strings."""
    antennas = {}
    for antenna_no, register in ANTENNA_ADDRESSES.items():
        antennas[str(antenna_no)] = f"0x{register:04x}"
    return jsonify({'success': True, 'antennas': antennas})
@app.route('/api/modbus/read_antenna', methods=['POST'])
def modbus_read_antenna():
    """Read the card number seen by one antenna.

    Expected JSON body::

        {
            "device_address": 1,   // device address (1-247)
            "antenna": 1,          // antenna number (1-24)
            "timeout": 1.0         // optional, seconds
        }

    Returns:
        200 with the read result, 400 for an invalid antenna number, a
        disconnected serial port or a read error, 500 on an unexpected
        error.
    """
    try:
        data = request.json
        device_address = data.get('device_address', 1)
        antenna = data.get('antenna', 1)
        timeout = data.get('timeout')

        # Require a real integer in range. A string or null used to raise
        # TypeError in the comparison and surface as a 500.
        antenna_is_int = isinstance(antenna, int) and not isinstance(antenna, bool)
        if not antenna_is_int or antenna < 1 or antenna > 24:
            return jsonify({
                'success': False,
                'message': f'无效的天线编号: {antenna}, 必须是1-24'
            }), 400

        serial_state = serial_client.get_status()
        if not (isinstance(serial_state, dict) and serial_state.get("connected", False)):
            return jsonify({
                'success': False,
                'message': '串口未连接'
            }), 400

        result = modbus_client.read_antenna_card(
            device_address=device_address,
            antenna_num=antenna,
            timeout=timeout
        )
        if 'error' in result:
            return jsonify({
                'success': False,
                'message': result['error'],
                'raw_data': result.get('raw_data', '')
            }), 400
        return jsonify({
            'success': True,
            'data': result
        })
    except Exception as e:
        logger.error(f'读取天线数据失败: {str(e)}')
        return jsonify({
            'success': False,
            'message': str(e)
        }), 500
@app.route('/api/modbus/read_registers', methods=['POST'])
def modbus_read_registers():
    """Read holding registers from a Modbus device.

    Expected JSON body::

        {
            "device_address": 1,
            "start_address": 2,   // number, or a string parsed as hex
            "quantity": 4,        // number of registers
            "timeout": 1.0
        }

    Returns:
        200 with the read result, 400 when the serial port is disconnected
        or the read reports an error, 500 on an unexpected error.
    """
    try:
        body = request.json
        device_address = body.get('device_address', 1)
        start_address = body.get('start_address', 0)
        quantity = body.get('quantity', 1)
        timeout = body.get('timeout')

        # String values are interpreted as base-16 numbers.
        if isinstance(start_address, str):
            start_address = int(start_address, 16)
        if isinstance(device_address, str):
            device_address = int(device_address, 16)

        serial_ready = isinstance(serial_client.get_status(), dict) and serial_client.get_status().get("connected", False)
        if not serial_ready:
            return jsonify({'success': False, 'message': '串口未连接'}), 400

        result = modbus_client.read_holding_registers(
            device_address=device_address,
            start_address=start_address,
            quantity=quantity,
            timeout=timeout
        )
        if 'error' not in result:
            return jsonify({'success': True, 'data': result})

        failure = {
            'success': False,
            'message': result['error'],
            'raw_data': result.get('raw_data', '')
        }
        return jsonify(failure), 400
    except Exception as e:
        logger.error(f'读取寄存器失败: {e}')
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/modbus/write_register', methods=['POST'])
def modbus_write_register():
    """Write a single register on a Modbus device.

    Expected JSON body::

        {
            "device_address": 1,
            "register_address": 1,   // register address
            "value": 256,            // value to write
            "timeout": 1.0
        }

    Returns:
        200 with the write result, 400 when the serial port is disconnected
        or the write reports an error, 500 on an unexpected error.
    """
    try:
        body = request.json
        device_address = body.get('device_address', 1)
        register_address = body.get('register_address', 1)
        value = body.get('value', 0)
        timeout = body.get('timeout')

        serial_ready = isinstance(serial_client.get_status(), dict) and serial_client.get_status().get("connected", False)
        if not serial_ready:
            return jsonify({'success': False, 'message': '串口未连接'}), 400

        result = modbus_client.write_single_register(
            device_address=device_address,
            register_address=register_address,
            value=value,
            timeout=timeout
        )
        if 'error' not in result:
            return jsonify({'success': True, 'data': result})

        failure = {
            'success': False,
            'message': result['error'],
            'raw_data': result.get('raw_data', '')
        }
        return jsonify(failure), 400
    except Exception as e:
        logger.error(f'写寄存器失败: {e}')
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/modbus/set_rgb_led', methods=['POST'])
def modbus_set_rgb_led():
    """Set the state of one RGB LED.

    Expected JSON body::

        {
            "device_address": 1,
            "led_number": 1,   // LED number (1-24)
            "color": 1,        // 0=off, 1=red, 2=green, 3=blue
            "timeout": 1.0
        }

    On success the new colour is also recorded in ``led_states``.

    Returns:
        200 with the result, 400 when the serial port is disconnected or
        the device reports an error, 500 on an unexpected error.
    """
    try:
        body = request.json
        device_address = body.get('device_address', 1)
        led_number = body.get('led_number', 1)
        color = body.get('color', 0)
        timeout = body.get('timeout')

        serial_ready = isinstance(serial_client.get_status(), dict) and serial_client.get_status().get("connected", False)
        if not serial_ready:
            return jsonify({'success': False, 'message': '串口未连接'}), 400

        result = modbus_client.set_rgb_led(
            device_address=device_address,
            led_number=led_number,
            color=color,
            timeout=timeout
        )
        if 'error' in result:
            failure = {
                'success': False,
                'message': result['error'],
                'raw_data': result.get('raw_data', '')
            }
            return jsonify(failure), 400

        # Keep the LED state tracker in step with the device.
        led_states[led_number] = color
        return jsonify({'success': True, 'data': result})
    except Exception as e:
        logger.error(f'设置RGB灯失败: {e}')
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/modbus/scan', methods=['POST'])
def modbus_scan_devices():
    """Scan the bus for devices that respond.

    Expected JSON body (all optional)::

        {
            "max_address": 247,   // highest device address to probe
            "timeout": 0.2        // per-device timeout
        }

    The scan runs synchronously inside the request.

    Returns:
        200 with the scan result, 400 when the serial port is disconnected,
        500 on an unexpected error.
    """
    try:
        body = request.json or {}
        max_address = body.get('max_address', 247)
        # NOTE(review): timeout is parsed but not forwarded to
        # scan_devices() - confirm whether that is intended.
        timeout = body.get('timeout', 0.2)

        serial_ready = isinstance(serial_client.get_status(), dict) and serial_client.get_status().get("connected", False)
        if not serial_ready:
            return jsonify({'success': False, 'message': '串口未连接'}), 400

        found = modbus_client.scan_devices(max_address)
        return jsonify({'success': True, 'devices': found})
    except Exception as e:
        logger.error(f'扫描设备失败: {e}')
        return jsonify({'success': False, 'message': str(e)}), 500
- # ========== 地址配置协议 API ==========
@app.route('/api/modbus/broadcast_query', methods=['POST'])
def modbus_broadcast_query():
    """Send a broadcast query and remember every newly discovered device.

    Each response whose UID is not stored yet is assigned a device address
    and added to the stored devices; the device table is saved whenever at
    least one response arrived.

    Returns:
        200 with the raw responses and their count, 400 when the serial
        port is disconnected, 500 on an unexpected error.
    """
    try:
        data = request.json or {}
        timeout = data.get('timeout')

        serial_state = serial_client.get_status()
        if not (isinstance(serial_state, dict) and serial_state.get("connected", False)):
            return jsonify({'success': False, 'message': '串口未连接'}), 400

        responses = address_config.broadcast_query(timeout)
        for r in responses:
            # Tolerate a missing or null uid instead of failing the request.
            uid = (r.get('uid') or '').lower()
            if not uid:
                continue
            stored = address_config.get_stored_devices()
            if uid in stored:
                continue

            # Addresses already in use, normalised to int where possible.
            taken = set()
            for existing in stored.values():
                try:
                    taken.add(int(existing))
                except (TypeError, ValueError):
                    pass

            # Keep the historical choice (count + 1) when it is free and
            # within 1-247; otherwise take the lowest free address so two
            # devices never end up sharing one.
            addr = len(stored) + 1
            if addr in taken or addr > 247:
                addr = next((a for a in range(1, 248) if a not in taken), None)
            if addr is None:
                logger.warning(f"没有可用的设备地址,跳过保存: UID={uid}")
                continue

            address_config.add_stored_device(uid, addr)
            logger.info(f"自动保存发现设备: UID={uid}, 地址={addr}")
        if responses:
            save_device_config()
        return jsonify({'success': True, 'responses': responses, 'count': len(responses)})
    except Exception as e:
        logger.error(f'广播查询失败: {str(e)}')
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/modbus/auto_configure', methods=['POST'])
def modbus_auto_configure():
    """Run automatic device address configuration.

    The device table is saved when the run succeeded and reports at least
    one discovered, confirmed or assigned device.

    Returns:
        The configuration result, 400 when the serial port is disconnected,
        500 on an unexpected error.
    """
    try:
        body = request.json or {}
        timeout = body.get('timeout')

        serial_ready = isinstance(serial_client.get_status(), dict) and serial_client.get_status().get("connected", False)
        if not serial_ready:
            return jsonify({'success': False, 'message': '串口未连接'}), 400

        result = address_config.auto_configure(timeout)
        # Short-circuit order matches the original condition.
        changed = result.get('success') and (
            result.get('discovered', 0) > 0
            or result.get('confirmed', 0) > 0
            or result.get('assigned', 0) > 0
        )
        if changed:
            save_device_config()
        return jsonify(result)
    except Exception as e:
        logger.error(f'自动配置失败: {e}')
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/modbus/stored_devices', methods=['GET'])
def get_stored_devices():
    """Return the stored device table under the ``devices`` key."""
    stored = address_config.get_stored_devices()
    return jsonify({'success': True, 'devices': stored})
@app.route('/api/modbus/stored_devices', methods=['POST'])
def add_stored_device():
    """Add a device to the stored device table and persist it.

    Body fields: ``uid`` (24 hexadecimal characters, case-insensitive) and
    ``address`` (default 1).

    Returns:
        200 on success, 400 when the UID is not 24 hex characters, 500 on
        an unexpected error.
    """
    try:
        # A missing, malformed or non-object body is treated as empty so
        # it gets the 400 validation reply rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        raw_uid = data.get('uid', '')
        uid = raw_uid.lower() if isinstance(raw_uid, str) else ''
        address = data.get('address', 1)

        # Check the hex alphabet as well as the length. A non-hex UID would
        # otherwise be stored and only fail later, when it is converted
        # with bytes.fromhex() to build the confirm frame.
        is_hex = all(ch in '0123456789abcdef' for ch in uid)
        if len(uid) != 24 or not is_hex:
            return jsonify({'success': False, 'message': 'UID长度必须是24个十六进制字符'}), 400

        address_config.add_stored_device(uid, address)
        save_device_config()
        return jsonify({'success': True, 'message': f'已添加设备: UID={uid}, 地址={address}'})
    except Exception as e:
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/modbus/load_config', methods=['POST'])
def modbus_load_config():
    """Load the stored-device table from a file.

    Request JSON (optional): ``{"filepath": "<path>"}``; defaults to
    ``/tmp/modbus_devices.json``.

    Returns:
        JSON with the result of ``address_config.load_config`` and the device
        table afterwards, or HTTP 500 with the exception text.
    """
    try:
        # Tolerate a missing body so the documented default path applies.
        data = request.json or {}
        # NOTE(review): the path is taken from the client as-is; confirm this
        # endpoint is only reachable by trusted callers.
        filepath = data.get('filepath', '/tmp/modbus_devices.json')
        success = address_config.load_config(filepath)
        return jsonify({'success': success, 'devices': address_config.get_stored_devices()})
    except Exception as e:
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/modbus/save_config', methods=['POST'])
def modbus_save_config():
    """Write the stored-device table to a file.

    Request JSON (optional): ``{"filepath": "<path>"}``; defaults to
    ``/tmp/modbus_devices.json``.

    Returns:
        JSON carrying the result of ``address_config.save_config`` and a
        human-readable message, or HTTP 500 with the exception text.
    """
    try:
        # Tolerate a missing body so the documented default path applies.
        data = request.json or {}
        # NOTE(review): the path is taken from the client as-is; confirm this
        # endpoint is only reachable by trusted callers.
        filepath = data.get('filepath', '/tmp/modbus_devices.json')
        success = address_config.save_config(filepath)
        return jsonify({'success': success, 'message': f'配置已保存到: {filepath}' if success else '保存失败'})
    except Exception as e:
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/modbus/confirm_devices', methods=['POST'])
def confirm_devices():
    """Send an address-confirm frame to every stored device.

    Returns:
        JSON with one result entry per stored device. HTTP 400 when no device
        is stored or the serial port is not connected, HTTP 500 with the
        exception text on unexpected failure.
    """
    try:
        stored = address_config.get_stored_devices()
        if not stored:
            return jsonify({'success': False, 'message': '没有已存储的设备'}), 400
        if not isinstance(serial_client.get_status(), dict) or not serial_client.get_status().get("connected", False):
            return jsonify({'success': False, 'message': '串口未连接'}), 400

        results = []
        for uid_hex, addr in stored.items():
            entry = {'address': addr, 'uid': uid_hex}
            try:
                frame = build_confirm_address(addr, bytes.fromhex(uid_hex))
                ok, detail = serial_client.send_raw(frame)
                verdict = '成功' if ok else '失败 ' + detail
                logger.info(f"确认设备 地址={addr}: {verdict}")
                entry['success'] = ok
                entry['message'] = detail
            except Exception as e:
                # A failure on one device is recorded and the loop goes on.
                entry['success'] = False
                entry['message'] = str(e)
            results.append(entry)
            # Short pause between devices.
            time.sleep(0.1)

        return jsonify({'success': True, 'results': results, 'count': len(results)})
    except Exception as e:
        return jsonify({'success': False, 'message': str(e)}), 500
# Port state tracking: holds the event state reported for each port.
# Stores the most recent port events.
port_event_history: list = []
@app.route('/api/port/status', methods=['GET'])
def get_port_status():
    """Return the whole ``port_state`` mapping as JSON."""
    payload = {'success': True, 'port_state': port_state}
    return jsonify(payload)
@app.route('/api/port/events', methods=['GET'])
def get_port_events():
    """Return the newest entries of the port event history.

    Query parameters:
        limit: maximum number of events to return (default 100). A value of
            zero or below returns an empty list.
    """
    limit = request.args.get('limit', 100, type=int)
    # history[-0:] is the entire list and a negative limit slices from the
    # wrong end, so only a positive limit may be used as a tail slice.
    events = port_event_history[-limit:] if limit > 0 else []
    return jsonify({
        'success': True,
        'events': events
    })
@app.route('/api/port/clear_events', methods=['POST'])
def clear_port_events():
    """Discard the recorded port event history."""
    global port_event_history
    # Rebinds the module-level name to a fresh, empty list.
    port_event_history = list()
    reply = {'success': True, 'message': '事件历史已清除'}
    return jsonify(reply)
# Tracks the time each device was last seen responding.
device_last_seen: dict = {}
@app.route('/api/panel/status', methods=['GET'])
def get_panel_status():
    """Summarise every known panel as a JSON map keyed by panel id.

    Panels present in ``port_state`` get port, connection and alarm counts.
    Panels present in ``panel_config`` also get their configured address,
    position and UID, and their status is decided by whether their
    ``device_last_seen`` entry is less than 60 seconds old.
    """
    PANEL_OFFLINE_TIMEOUT = 60
    now = time.time()
    panel_status = {}

    # Pass 1: counters derived from the per-port state.
    for panel_id, ports in port_state.items():
        port_records = list(ports.values())
        alarmed = len([p for p in port_records if p.get('alarm_count', 0) > 0])
        with_uid = sum(bool(p.get('last_uid')) for p in port_records)
        panel_status[panel_id] = {
            'panel_id': panel_id,
            'port_count': len(ports),
            'connected_count': with_uid,
            'alarm_count': alarmed,
            'status': 'offline' if with_uid <= 0 else 'online',
        }

    # Pass 2: configured panels; the status computed here replaces pass 1's.
    # NOTE(review): device_last_seen is filled with device UIDs by the
    # auto-discover loop but read by panel id here -- confirm both use the
    # same key.
    for panel_id, cfg in panel_config.items():
        age = now - device_last_seen.get(panel_id, 0)
        state = 'online' if age < PANEL_OFFLINE_TIMEOUT else 'offline'
        identity = {
            'address': cfg.get('address'),
            'position': cfg.get('position'),
            'panel_uid': cfg.get('panel_uid'),
        }
        existing = panel_status.get(panel_id)
        if existing is None:
            panel_status[panel_id] = {
                'panel_id': panel_id,
                **identity,
                'port_count': 0,
                'connected_count': 0,
                'alarm_count': 0,
                'status': state,
            }
        else:
            existing.update(identity, status=state)

    return jsonify({'success': True, 'panels': panel_status})
# LED state tracking: in-memory record of the last state set on each LED.
# Values: 0=off, 1=red, 2=green, 3=blue
led_states = dict.fromkeys(range(1, 25), 0)
@app.route('/api/modbus/led_status', methods=['GET'])
def get_led_status():
    """Return the tracked state of every LED."""
    payload = {'success': True, 'leds': led_states}
    return jsonify(payload)
@app.route('/api/modbus/set_all_leds', methods=['POST'])
def set_all_leds():
    """Set all 24 LEDs of one device to the same colour.

    Request JSON: ``{"device_address": 1, "color": 1, "timeout": <value>}``.
    ``device_address`` defaults to 1, ``color`` to 0, and ``timeout`` is
    optional; all are forwarded to ``modbus_client.set_rgb_led``.

    Returns:
        JSON with one result entry per LED. HTTP 400 when the serial port is
        not connected, HTTP 500 with the exception text on failure.
    """
    try:
        # A missing body falls back to the documented defaults.
        data = request.json or {}
        device_address = data.get('device_address', 1)
        color = data.get('color', 0)
        timeout = data.get('timeout')
        # One status snapshot for both the type check and the lookup.
        serial_st = serial_client.get_status()
        if not (isinstance(serial_st, dict) and serial_st.get('connected', False)):
            return jsonify({'success': False, 'message': '串口未连接'}), 400
        results = []
        for led_num in range(1, 25):
            result = modbus_client.set_rgb_led(
                device_address=device_address,
                led_number=led_num,
                color=color,
                timeout=timeout
            )
            if 'error' in result:
                results.append({'led': led_num, 'success': False, 'error': result['error']})
            else:
                results.append({'led': led_num, 'success': True})
                # Remember the colour only for LEDs that were actually set.
                led_states[led_num] = color
        return jsonify({'success': True, 'results': results})
    except Exception as e:
        logger.error(f'批量设置LED失败: {str(e)}')
        return jsonify({'success': False, 'message': str(e)}), 500
# Update LED state tracking in set_rgb_led
# A static file directory is no longer needed; the frontend is served by nginx
# ========== DTU MQTT protocol configuration API ==========
@app.route('/api/dtu/config', methods=['GET'])
def get_dtu_config():
    """Return the current DTU configuration."""
    payload = {'success': True, 'data': dtu_config}
    return jsonify(payload)
@app.route('/api/dtu/config', methods=['POST'])
def update_dtu_config():
    """Update DTU configuration fields from the request body.

    Request JSON may contain any of: ``topic_prefix``, ``customer_id``,
    ``dtu_id``, ``firmware_version``, ``hardware_version``,
    ``heartbeat_interval``, ``enabled``. Other keys are ignored.

    When MQTT is connected and the DTU is enabled, the control topic
    subscription is moved if the topic changed and a registration message is
    sent again.

    Returns:
        JSON with the updated configuration, or HTTP 500 with the exception
        text.
    """
    updatable_keys = (
        'topic_prefix',
        'customer_id',
        'dtu_id',
        'firmware_version',
        'hardware_version',
        'heartbeat_interval',
        'enabled',
    )
    try:
        # A missing body is treated as "no changes" instead of a TypeError.
        data = request.json or {}
        old_config = dtu_config.copy()
        for key in updatable_keys:
            if key in data:
                dtu_config[key] = data[key]
        # NOTE(review): the change is applied in memory only; this function
        # does not call save_dtu_config() -- confirm that is intended.

        # Re-subscribe when MQTT is connected and the control topic changed.
        if mqtt_status and dtu_config.get('enabled'):
            old_control_topic = build_dtu_topic(
                old_config.get('customer_id', DEFAULT_CUSTOMER_ID),
                'dtu',
                old_config.get('dtu_id', DEFAULT_DTU_ID),
                'control'
            )
            new_control_topic = build_dtu_topic(
                dtu_config['customer_id'],
                'dtu',
                dtu_config['dtu_id'],
                'control'
            )
            if old_control_topic != new_control_topic:
                # Drop the old subscription and subscribe to the new topic.
                mqtt_client.unsubscribe(old_control_topic)
                mqtt_client.subscribe(new_control_topic)
                logger.info(f"控制主题已更新: {old_control_topic} -> {new_control_topic}")
            # Send the registration message again.
            dtu_register()
        return jsonify({
            'success': True,
            'message': 'DTU配置已更新',
            'data': dtu_config
        })
    except Exception as e:
        logger.error(f"更新DTU配置失败: {str(e)}")
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/dtu/register', methods=['POST'])
def manual_dtu_register():
    """Manually trigger a DTU registration message.

    Returns:
        HTTP 400 when MQTT is not connected, HTTP 500 when ``dtu_register``
        reports failure or raises, otherwise a success message.
    """
    try:
        if not mqtt_status:
            return jsonify({'success': False, 'message': 'MQTT未连接'}), 400
        if not dtu_register():
            return jsonify({'success': False, 'message': '注册消息发送失败'}), 500
        return jsonify({'success': True, 'message': '注册消息已发送'})
    except Exception as e:
        logger.error(f"手动触发DTU注册失败: {e}")
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/dtu/status', methods=['GET'])
def get_dtu_status():
    """Report DTU connectivity, identity and the MQTT topics it uses.

    Returns:
        JSON with MQTT/serial connection flags, configured identifiers, the
        number of stored devices and the six DTU topics, or HTTP 500 with
        the exception text.
    """
    topic_kinds = ('register', 'status', 'control', 'response', 'event', 'alarm')
    try:
        # Serial port state.
        serial_st = serial_client.get_status()
        serial_ok = isinstance(serial_st, dict) and serial_st.get('connected', False)
        # Stored devices; only their count is reported.
        stored = address_config.get_stored_devices()

        status = {
            'dtu_id': dtu_config.get('dtu_id'),
            'mqtt_connected': mqtt_status,
            'serial_connected': serial_ok,
            'dtu_enabled': dtu_config.get('enabled', True),
            'topic_prefix': dtu_config.get('topic_prefix'),
            'customer_id': dtu_config.get('customer_id'),
            'panel_count': len(stored),
            'topics': {
                kind: build_dtu_topic(dtu_config['customer_id'], 'dtu', dtu_config['dtu_id'], kind)
                for kind in topic_kinds
            },
        }
        return jsonify({'success': True, 'data': status})
    except Exception as e:
        logger.error(f"获取DTU状态失败: {e}")
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/dtu/control', methods=['POST'])
def dtu_control():
    """Execute a DTU control command.

    Request JSON: ``{"command": "<name>"}``. Only ``REBOOT`` is recognised;
    it runs the system ``reboot`` command from a background thread.

    Returns:
        A success message for ``REBOOT``, HTTP 400 for any other (or missing)
        command, HTTP 500 with the exception text on failure.
    """
    try:
        # A missing body is reported as an unknown command (400), not a 500.
        data = request.json or {}
        command = data.get('command')
        if command == 'REBOOT':
            import subprocess
            logger.warning("执行系统重启命令")

            def _reboot_after_delay():
                # Delay one second before rebooting -- presumably so this
                # HTTP response can still be delivered.
                time.sleep(1)
                subprocess.run(['reboot'], capture_output=True)

            threading.Thread(target=_reboot_after_delay, daemon=True).start()
            return jsonify({'success': True, 'message': '系统正在重启...'})
        return jsonify({'success': False, 'message': f'未知命令: {command}'}), 400
    except Exception as e:
        logger.error(f"DTU控制命令失败: {str(e)}")
        return jsonify({'success': False, 'message': str(e)}), 500
# OTA state store shared by the OTA routes and the OTA worker.
ota_status = dict(
    status='IDLE',  # IDLE, DOWNLOADING, VERIFYING, FLASHING, SUCCESS, FAILED
    progress=0,
    firmware_version=None,
    target_version=None,
    error_code=None,
    error_message=None,
    last_update=None,
)
@app.route('/api/dtu/ota_detect', methods=['POST'])
def ota_detect():
    """Download a firmware package and report its size, MD5 and version.

    Request JSON: ``{"url": "<firmware url>"}``.

    The version is read from ``firmware/firmware.json`` inside the tar.gz
    archive; when the manifest cannot be read the version is reported as an
    empty string and a warning is logged.

    Returns:
        JSON with ``file_size``, ``checksum``, ``checksum_type`` and
        ``firmware_version``. HTTP 400 when the URL is missing or the
        download fails, HTTP 500 with the exception text otherwise.
    """
    try:
        body = request.json or {}
        url = body.get('url', '')
        if not url:
            return jsonify({'success': False, 'message': '请提供固件 URL'}), 400
        import hashlib
        import json
        import tarfile
        import tempfile
        import urllib.request

        # mkstemp creates the file atomically; tempfile.mktemp only returns a
        # name and is deprecated because another process can claim it first.
        fd, tmp = tempfile.mkstemp(suffix='.tar.gz')
        os.close(fd)
        try:
            # NOTE(review): the URL is client-supplied and fetched as-is
            # (urlretrieve also accepts file:// URLs) -- confirm callers are
            # trusted.
            try:
                urllib.request.urlretrieve(url, tmp)
            except Exception as e:
                return jsonify({'success': False, 'message': f'下载失败: {str(e)}'}), 400

            file_size = os.path.getsize(tmp)
            h = hashlib.md5()
            with open(tmp, 'rb') as f:
                for chunk in iter(lambda: f.read(65536), b''):
                    h.update(chunk)
            md5sum = h.hexdigest()

            version = ''
            try:
                with tarfile.open(tmp, 'r:gz') as tar:
                    m = tar.extractfile('firmware/firmware.json')
                    if m:
                        manifest = json.loads(m.read())
                        version = manifest.get('version', '')
            except Exception as e:
                # A missing or broken manifest is not fatal for detection.
                logger.warning(f"解析 firmware.json 失败: {e}")
        finally:
            # Remove the download on every exit path, not only on success.
            try:
                os.remove(tmp)
            except OSError:
                pass  # best effort: the file may already be gone

        return jsonify({
            'success': True,
            'data': {
                'file_size': file_size,
                'checksum': md5sum,
                'checksum_type': 'MD5',
                'firmware_version': version
            }
        })
    except Exception as e:
        logger.error(f"检测固件失败: {str(e)}")
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/dtu/ota_status', methods=['GET'])
def get_ota_status():
    """Return the current firmware version together with the OTA state."""
    try:
        snapshot = {
            'current_firmware': dtu_config.get('firmware_version', 'v1.0.0'),
            'ota_status': ota_status.get('status', 'IDLE'),
            'ota_progress': ota_status.get('progress', 0),
        }
        # These fields are passed through under their own names.
        for field in ('target_version', 'error_code', 'error_message', 'last_update'):
            snapshot[field] = ota_status.get(field)
        return jsonify({'success': True, 'data': snapshot})
    except Exception as e:
        logger.error(f"获取OTA状态失败: {e}")
        return jsonify({'success': False, 'message': str(e)}), 500
def handle_ota_status(dtu_id, payload):
    """Fold an OTA status report received over MQTT into ``ota_status``.

    Args:
        dtu_id: identifier of the reporting DTU (not used by this function).
        payload: mapping that may carry ``ota_status``, ``ota_progress``,
            ``firmware_version``, ``error_code`` and ``error_message``.
    """
    ota_status.update(
        status=payload.get('ota_status', 'IDLE'),
        progress=payload.get('ota_progress', 0),
        firmware_version=payload.get('firmware_version'),
        last_update=time.strftime('%Y-%m-%d %H:%M:%S'),
    )

    code = payload.get('error_code')
    if code and code != 0:
        # Any non-zero error code forces the FAILED state.
        ota_status['error_code'] = code
        ota_status['error_message'] = payload.get('error_message', get_ota_error_message(code))
        ota_status['status'] = 'FAILED'
    elif ota_status['status'] == 'SUCCESS':
        ota_status['error_code'] = None
        ota_status['error_message'] = None
        reported_version = ota_status.get('firmware_version')
        if reported_version:
            # Adopt and persist the version the device reports after success.
            dtu_config['firmware_version'] = reported_version
            save_dtu_config()

    logger.info(f"MQTT OTA状态更新: {ota_status['status']}, 进度: {ota_status['progress']}%")
# Known OTA error codes and their user-facing messages.
_OTA_ERROR_MESSAGES = {
    1021: '已是目标版本',
    1022: '校验失败',
    1023: '下载失败',
    1024: '写入失败',
    1025: '存储空间不足',
}


def get_ota_error_message(error_code):
    """Return the message for an OTA error code.

    Args:
        error_code: the code to look up. A string holding an integer (for
            example ``"1022"``) is looked up by its integer value.

    Returns:
        The message for a known code, otherwise an "unknown error code"
        message that embeds ``error_code`` exactly as it was given.
    """
    lookup = error_code
    if isinstance(error_code, str):
        try:
            lookup = int(error_code.strip())
        except ValueError:
            lookup = error_code  # not numeric: reported as unknown below
    return _OTA_ERROR_MESSAGES.get(lookup, f'未知错误码: {error_code}')
def _ota_mark(status=None, progress=None):
    """Set the OTA stage and/or progress and refresh ``last_update``."""
    if status is not None:
        ota_status['status'] = status
    if progress is not None:
        ota_status['progress'] = progress
    ota_status['last_update'] = time.strftime('%Y-%m-%d %H:%M:%S')


def _ota_download(url, dest_path):
    """Stream ``url`` into ``dest_path``, mapping progress onto 5-35%."""
    import urllib.request

    req = urllib.request.Request(url, headers={'User-Agent': 'OTA-Updater'})
    with urllib.request.urlopen(req, timeout=120) as resp, open(dest_path, 'wb') as out:
        total = int(resp.headers.get('Content-Length', 0))
        downloaded = 0
        while True:
            chunk = resp.read(65536)
            if not chunk:
                break
            out.write(chunk)
            downloaded += len(chunk)
            if total:
                # Progress can only be computed when the size is announced.
                ota_status['progress'] = min(5 + int(downloaded / total * 30), 35)


def _ota_extract(archive_path, dest_dir):
    """Unpack the firmware archive, refusing members that escape ``dest_dir``."""
    import os
    import tarfile

    with tarfile.open(archive_path, 'r:gz') as tar:
        if hasattr(tarfile, 'data_filter'):
            # Interpreters with extraction filters reject absolute paths,
            # ".." components and links that point outside the destination.
            tar.extractall(dest_dir, filter='data')
            return
        # Older interpreters: check member paths by hand. This fallback does
        # not inspect link targets.
        root = os.path.realpath(dest_dir)
        for member in tar.getmembers():
            target = os.path.realpath(os.path.join(root, member.name))
            if target != root and not target.startswith(root + os.sep):
                raise Exception(f"固件包包含非法路径: {member.name}")
        tar.extractall(dest_dir)


def _ota_install_files(fw_root, project_dir):
    """Copy the unpacked firmware tree over ``project_dir``."""
    import os
    import shutil

    excludes = {'__pycache__', '.git', 'log', 'venv'}
    for root, dirs, files in os.walk(fw_root):
        rel = os.path.relpath(root, fw_root)
        if rel == '.':
            rel = ''
        parts = rel.split(os.sep) if rel else []
        # Skip anything below an excluded top-level directory.
        if parts and parts[0] in excludes:
            continue
        for fname in files:
            if fname == 'firmware.json':
                continue  # the manifest itself is never installed
            src = os.path.join(root, fname)
            dst = os.path.join(project_dir, rel, fname)
            os.makedirs(os.path.dirname(dst), exist_ok=True)
            shutil.copy2(src, dst)


def _run_ota(params):
    """Run an OTA upgrade in a background thread (shared by HTTP and MQTT).

    Stages, mirrored into ``ota_status``: download, size and checksum
    verification, unpack, version check, file installation, then a service
    restart. Any exception inside the stages marks the upgrade ``FAILED``
    and stores the exception text in ``ota_status['error_message']``.

    Args:
        params: mapping with ``firmware_url``, ``firmware_version``,
            ``file_size`` and ``checksum`` (all required), plus optional
            ``checksum_type`` (default ``MD5``) and ``force_upgrade``
            (default ``False``).

    Raises:
        KeyError: when a required key is missing from ``params``; this is
            raised before the failure handler is active.
    """
    import hashlib
    import json
    import os
    import shutil
    import subprocess
    import sys
    import tempfile

    url = params['firmware_url']
    version = params['firmware_version']
    file_size = params['file_size']
    checksum = params['checksum']
    checksum_type = params.get('checksum_type', 'MD5').upper()
    force = params.get('force_upgrade', False)

    ota_status['target_version'] = version
    ota_status['error_code'] = None
    ota_status['error_message'] = None
    _ota_mark('DOWNLOADING', 0)

    # Bound before the try block so the failure handler can always test it.
    tmp_dir = None
    try:
        tmp_dir = tempfile.mkdtemp(prefix='ota_')
        fw_path = os.path.join(tmp_dir, 'firmware.tar.gz')
        _ota_mark(progress=5)

        logger.info(f"OTA: 开始下载固件 {url}")
        _ota_download(url, fw_path)

        dl_size = os.path.getsize(fw_path)
        _ota_mark(progress=40)
        # int() lets a size that arrived as a numeric string still compare;
        # a difference of up to 1024 bytes is tolerated.
        if abs(dl_size - int(file_size)) > 1024:
            raise Exception(f"文件大小不匹配: 预期{file_size}, 实际{dl_size}")

        _ota_mark('VERIFYING', 50)
        h = hashlib.new(checksum_type)
        with open(fw_path, 'rb') as f:
            for chunk in iter(lambda: f.read(65536), b''):
                h.update(chunk)
        actual_checksum = h.hexdigest().lower()
        if actual_checksum != checksum.lower():
            raise Exception(f"校验和不匹配: 预期{checksum}, 实际{actual_checksum}")

        _ota_mark(progress=60)
        extract_dir = os.path.join(tmp_dir, 'firmware')
        os.makedirs(extract_dir, exist_ok=True)
        _ota_extract(fw_path, extract_dir)

        # The archive is expected to hold a top-level "firmware" directory.
        fw_root = os.path.join(extract_dir, 'firmware')
        manifest_path = os.path.join(fw_root, 'firmware.json')
        if not os.path.exists(manifest_path):
            raise Exception("固件包缺少 firmware.json")
        with open(manifest_path, 'r') as f:
            manifest = json.load(f)
        fw_version = manifest.get('version', version)

        if not force:
            current_ver = dtu_config.get('firmware_version', 'v0.0.0')
            if fw_version == current_ver:
                raise Exception(f'已是目标版本 {current_ver}')

        _ota_mark('FLASHING', 70)
        # Files are installed into the parent of this file's directory.
        project_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        _ota_install_files(fw_root, project_dir)

        _ota_mark(progress=90)
        dtu_config['firmware_version'] = fw_version
        save_dtu_config()

        ota_status['firmware_version'] = fw_version
        _ota_mark('SUCCESS', 100)
        logger.info(f"OTA: 升级成功 {fw_version}")
        shutil.rmtree(tmp_dir, ignore_errors=True)

        # Stay up for ten more seconds, refreshing the timestamp, before the
        # process is replaced.
        for _ in range(10):
            _ota_mark()
            time.sleep(1)

        logger.info("OTA: 重启服务...")
        # NOTE(review): the restart command hard-codes host and port; confirm
        # it matches how the service is normally started.
        subprocess.Popen(
            [sys.executable, '-m', 'flask', 'run', '--host=0.0.0.0', '--port=5001'],
            cwd=os.path.dirname(os.path.abspath(__file__)),
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
        )
        os._exit(0)
    except Exception as e:
        ota_status['status'] = 'FAILED'
        ota_status['error_message'] = str(e)
        ota_status['last_update'] = time.strftime('%Y-%m-%d %H:%M:%S')
        logger.error(f"OTA: 升级失败 - {str(e)}")
        if tmp_dir and os.path.exists(tmp_dir):
            shutil.rmtree(tmp_dir, ignore_errors=True)
@app.route('/api/dtu/ota_upgrade', methods=['POST'])
def trigger_ota_upgrade():
    """Validate an OTA request and start the upgrade in a daemon thread.

    Request JSON must carry non-empty ``firmware_url``, ``firmware_version``,
    ``file_size``, ``checksum`` and ``checksum_type``.

    Returns:
        JSON confirming that the upgrade started. HTTP 400 when the body is
        empty or a required field is missing, HTTP 500 with the exception
        text on failure.
    """
    required_fields = ('firmware_url', 'firmware_version', 'file_size', 'checksum', 'checksum_type')
    try:
        data = request.json
        if not data:
            return jsonify({'success': False, 'message': '请求体不能为空'}), 400

        missing = [name for name in required_fields if not data.get(name)]
        if missing:
            return jsonify({'success': False, 'message': f'缺少必填字段: {", ".join(missing)}'}), 400

        # The upgrade itself runs in the background; this call returns at once.
        threading.Thread(target=_run_ota, args=(data,), daemon=True).start()

        started = {
            'target_version': data['firmware_version'],
            'ota_status': 'DOWNLOADING',
        }
        return jsonify({'success': True, 'message': 'OTA升级已启动', 'data': started})
    except Exception as e:
        logger.error(f"触发OTA升级失败: {e}")
        return jsonify({'success': False, 'message': str(e)}), 500
# ==================== Environment sensor API ====================

# Latest environment sensor reading.
env_sensor_data = dict(
    temperature=None,         # ambient temperature (°C)
    humidity=None,            # ambient humidity (%)
    dtu_temperature=None,     # DTU board temperature (°C)
    update_time=None,         # time this record was updated
    sensor_update_time=None,  # time reported by the sensor
    connected=False,          # sensor connection state
)

# Alarm thresholds for the environment sensor.
env_sensor_threshold = dict(
    temp_high=45.0,       # ambient temperature upper limit (°C)
    temp_low=-10.0,       # ambient temperature lower limit (°C)
    humidity_high=80.0,   # ambient humidity upper limit (%)
    humidity_low=20.0,    # ambient humidity lower limit (%)
    dtu_temp_high=70.0,   # DTU board temperature upper limit (°C)
)

# Environment sensor history (keeps the most recent 1000 records).
env_sensor_history: list = []
@app.route('/api/sensor/env', methods=['GET'])
def get_env_sensor_data():
    """Return the latest environment sensor reading."""
    payload = {'success': True, 'data': env_sensor_data}
    return jsonify(payload)
@app.route('/api/sensor/threshold', methods=['GET', 'POST'])
def handle_env_sensor_threshold():
    """Read or update the environment sensor alarm thresholds.

    GET returns the current thresholds. POST accepts a JSON object holding
    any of the existing threshold keys; each value is converted with
    ``float``. Keys that are not existing thresholds are ignored.

    Returns:
        JSON with the thresholds, or HTTP 400 with the exception text when a
        POST value cannot be converted.
    """
    if request.method == 'GET':
        return jsonify({
            'success': True,
            'data': env_sensor_threshold
        })

    try:
        data = request.get_json() or {}
        # Convert every value before storing any, so one invalid entry
        # cannot leave the thresholds half-updated.
        updates = {
            key: float(data[key])
            for key in env_sensor_threshold
            if key in data
        }
        env_sensor_threshold.update(updates)
        logger.info(f"更新环境传感器告警阈值: {env_sensor_threshold}")
        return jsonify({
            'success': True,
            'message': '阈值设置成功',
            'data': env_sensor_threshold
        })
    except Exception as e:
        logger.error(f"设置告警阈值失败: {str(e)}")
        return jsonify({'success': False, 'message': str(e)}), 400
@app.route('/api/sensor/history', methods=['GET'])
def get_env_sensor_history():
    """Return environment sensor history, newest first.

    Query parameters:
        range: one of ``1h``, ``6h``, ``24h`` (default), ``7d``, ``30d``;
            used only when neither ``start_time`` nor ``end_time`` is given.
            Any other value applies no time filter.
        start_time / end_time: inclusive bounds, compared as strings against
            each record's ``update_time`` (``YYYY-MM-DD HH:MM:SS``).
        limit: maximum number of records (default 100); values below zero
            are treated as zero.

    Returns:
        JSON with the selected records and their count, or HTTP 500 with the
        exception text.
    """
    try:
        import datetime

        range_param = request.args.get('range', '24h')
        start_time = request.args.get('start_time')
        end_time = request.args.get('end_time')
        limit = request.args.get('limit', 100, type=int)

        # Translate the named range into a start bound.
        windows = {
            '1h': datetime.timedelta(hours=1),
            '6h': datetime.timedelta(hours=6),
            '24h': datetime.timedelta(hours=24),
            '7d': datetime.timedelta(days=7),
            '30d': datetime.timedelta(days=30),
        }
        if not start_time and not end_time:
            window = windows.get(range_param)
            if window is not None:
                start_time = (datetime.datetime.now() - window).strftime('%Y-%m-%d %H:%M:%S')

        # Work on a copy so the shared history list is never reordered.
        records = list(env_sensor_history)
        # A record without a timestamp compares as '' instead of raising.
        if start_time:
            records = [r for r in records if (r.get('update_time') or '') >= start_time]
        if end_time:
            records = [r for r in records if (r.get('update_time') or '') <= end_time]

        # Newest first.
        records.sort(key=lambda r: r.get('update_time', ''), reverse=True)
        # A negative limit would slice off the oldest records instead of
        # limiting the result, so clamp it at zero.
        records = records[:max(limit, 0)]

        return jsonify({
            'success': True,
            'data': records,
            'total': len(records)
        })
    except Exception as e:
        logger.error(f"获取历史数据失败: {str(e)}")
        return jsonify({'success': False, 'message': str(e)}), 500
def update_env_sensor_data(temperature, humidity, dtu_temperature, sensor_update_time):
    """Store a new environment reading (triggered by an MQTT message).

    Updates the current reading, appends it to the history (trimmed to the
    newest 1000 records) and runs the alarm checks.
    """
    import datetime

    stamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    reading = {
        'temperature': temperature,
        'humidity': humidity,
        'dtu_temperature': dtu_temperature,
        'sensor_update_time': sensor_update_time,
        'update_time': stamp,
    }

    # The current-value record gets the same fields plus the connected flag.
    env_sensor_data.update(reading, connected=True)

    # Append to the history and drop the oldest entries beyond 1000, in place.
    env_sensor_history.append(reading)
    overflow = len(env_sensor_history) - 1000
    if overflow > 0:
        del env_sensor_history[:overflow]

    # Check alarms for the new values.
    check_env_sensor_alarms(temperature, humidity, dtu_temperature)
    logger.info(f"环境传感器数据更新: 温度={temperature}℃, 湿度={humidity}%, DTU温度={dtu_temperature}℃")
def check_env_sensor_alarms(temperature, humidity, dtu_temperature):
    """Compare a reading against the thresholds and emit alarms.

    Each raised alarm is logged as a warning and emitted through Socket.IO
    as an ``env_sensor_alarm`` event. Arguments that are ``None`` are
    skipped.
    """
    alarms = []

    def raise_alarm(kind, message, level='warning'):
        alarms.append({'type': kind, 'message': message, 'level': level})

    if temperature is not None:
        if temperature > env_sensor_threshold['temp_high']:
            raise_alarm(
                'temperature_high',
                f'环境温度过高: {temperature}℃ (阈值: {env_sensor_threshold["temp_high"]}℃)',
            )
        elif temperature < env_sensor_threshold['temp_low']:
            raise_alarm(
                'temperature_low',
                f'环境温度过低: {temperature}℃ (阈值: {env_sensor_threshold["temp_low"]}℃)',
            )

    if humidity is not None:
        if humidity > env_sensor_threshold['humidity_high']:
            raise_alarm(
                'humidity_high',
                f'环境湿度过高: {humidity}% (阈值: {env_sensor_threshold["humidity_high"]}%)',
            )
        elif humidity < env_sensor_threshold['humidity_low']:
            raise_alarm(
                'humidity_low',
                f'环境湿度过低: {humidity}% (阈值: {env_sensor_threshold["humidity_low"]}%)',
            )

    # Board temperature only has an upper limit and is the one critical alarm.
    if dtu_temperature is not None and dtu_temperature > env_sensor_threshold['dtu_temp_high']:
        raise_alarm(
            'dtu_temp_high',
            f'DTU主板温度过高: {dtu_temperature}℃ (阈值: {env_sensor_threshold["dtu_temp_high"]}℃)',
            level='critical',
        )

    # Send the alarm notifications: log each one and push it over WebSocket.
    for alarm in alarms:
        logger.warning(f"环境传感器告警: {alarm['message']}")
        socketio.emit('env_sensor_alarm', alarm)
if __name__ == '__main__':
    # Script entry point: load configuration, start the auto-discovery
    # thread, reconnect the serial port, then run the Socket.IO server.
    try:
        # Initialisation before start-up.
        logger.info('启动串口-MQTT网关服务...')
        logger.info(f"配置信息: 主机={FLASK_HOST}, 端口={FLASK_PORT}, 调试模式={FLASK_DEBUG}")

        # Load the DTU configuration.
        load_dtu_config()

        # Load the device configuration.
        loaded = load_device_config()
        logger.info(f"已加载 {len(loaded)} 个设备配置")

        def auto_discover_loop():
            """Every 30 seconds run auto-configuration and refresh last-seen times."""
            while True:
                time.sleep(30)
                try:
                    _st = serial_client.get_status()
                    if not (isinstance(_st, dict) and _st.get('connected', False)):
                        continue
                    if not dtu_config.get('enabled'):
                        continue
                    result = address_config.auto_configure(timeout=2.0)
                    if result.get('discovered', 0) > 0:
                        logger.info(f"自动发现: {result.get('discovered')} 个设备")
                        save_device_config()
                    # Every stored device is stamped with the current time.
                    now = time.time()
                    for uid in address_config.get_stored_devices():
                        device_last_seen[uid] = now
                except Exception as e:
                    # Keep the loop alive; the next cycle retries.
                    logger.error(f"自动发现异常: {str(e)}")

        import threading
        t = threading.Thread(target=auto_discover_loop, daemon=True)
        t.start()
        logger.info("启动自动发现线程 (间隔30秒)")

        # Reconnect the serial port that was used last time.
        logger.info("尝试自动连接串口...")
        auto_connect_serial()

        # Start the service.
        socketio.run(
            app,
            host=FLASK_HOST,
            port=FLASK_PORT,
            debug=FLASK_DEBUG,
            use_reloader=False,  # reloader disabled to avoid double initialisation
            log_output=False,  # Flask log output disabled; our own logging is used
            allow_unsafe_werkzeug=True
        )
    except KeyboardInterrupt:
        # Graceful shutdown.
        logger.info('正在关闭应用...')
        try:
            serial_st = serial_client.get_status()
            if isinstance(serial_st, dict) and serial_st.get('connected', False):
                serial_client.disconnect()
                logger.info('串口连接已断开')
            if mqtt_client.get_status():
                mqtt_client.disconnect()
                logger.info('MQTT连接已断开')
        except Exception as e:
            logger.error(f'关闭连接时出错: {str(e)}')

        # Drop the tracked WebSocket clients.
        for client_type in connected_clients:
            connected_clients[client_type].clear()

        logger.info('应用已安全关闭')
    except Exception:
        # logger.exception records the full traceback.
        logger.exception('应用启动失败')
        # Release both connections; one failing must not skip the other, and
        # failures are logged instead of being silently swallowed.
        for client in (serial_client, mqtt_client):
            try:
                client.disconnect()
            except Exception as cleanup_err:
                logger.error(f'关闭连接时出错: {str(cleanup_err)}')
|