| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767 |
- """
- 施工现场安全行为智能识别系统 - 主程序
- 系统功能:
- 1. 实时视频监控
- 2. 人员、安全帽、反光衣检测
- 3. 安全违规识别(未戴安全帽、未穿反光衣)
- 4. 事件推送至业务平台
- 5. 接收平台指令,TTS 语音播报
- """
- import os
- import sys
- import time
- import argparse
- import logging
- import threading
- import signal
- from typing import Optional, List
- import cv2
- import numpy as np
- # 添加项目路径
- sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
- from config import (
- LOG_CONFIG, PANORAMA_CAMERA, PTZ_CAMERA, SDK_PATH,
- SAFETY_DETECTION_CONFIG, EVENT_PUSHER_CONFIG,
- VOICE_ANNOUNCER_CONFIG, SYSTEM_CONFIG,
- LLM_CONFIG, LLM_SAFETY_CONFIG
- )
- from safety_detector import (
- SafetyDetector, SafetyDetection, PersonSafetyStatus,
- draw_safety_result, SafetyViolationType, LLMSafetyDetector
- )
- from llm_service import SafetyAnalyzer, NumberRecognizer
- from event_pusher import EventPusher, SafetyEvent, EventType
- from voice_announcer import VoiceAnnouncer, VoicePriority
- from safety_coordinator import SafetyCoordinator, SimpleCamera
- # 配置日志
- def setup_logging():
- """设置日志配置"""
- log_level = getattr(logging, LOG_CONFIG.get('level', 'INFO'), logging.INFO)
- log_format = LOG_CONFIG.get('format', '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- log_file = LOG_CONFIG.get('file')
-
- handlers = [logging.StreamHandler()]
-
- if log_file:
- from logging.handlers import RotatingFileHandler
- file_handler = RotatingFileHandler(
- log_file,
- maxBytes=LOG_CONFIG.get('max_bytes', 10*1024*1024),
- backupCount=LOG_CONFIG.get('backup_count', 5)
- )
- file_handler.setFormatter(logging.Formatter(log_format))
- handlers.append(file_handler)
-
- logging.basicConfig(
- level=log_level,
- format=log_format,
- handlers=handlers
- )
- setup_logging()
- logger = logging.getLogger(__name__)
- class SafetyMonitorSystem:
- """
- 施工现场安全监控系统
- """
-
- def __init__(self, config: dict = None):
- """
- 初始化系统
-
- Args:
- config: 配置覆盖
- """
- self.config = config or {}
-
- # 摄像头
- self.camera = None
- self.camera_source = None
-
- # PTZ球机(安全模式可选联动)
- self.ptz_camera = None
- self.calibrator = None
- self.sdk = None
-
- # 双路流管理器(用于PTZ模式下的双流并行)
- self.stream_manager = None
-
- # 组件
- self.detector = None # 安全检测器 (支持 LLM)
- self.llm_analyzer = None # 大模型安全分析器
- self.number_recognizer = None # 编号识别器
- self.event_pusher = None # 事件推送器
- self.voice_announcer = None # 语音播报器
-
- # 功能开关 - 从 SYSTEM_CONFIG 读取
- self.enable_panorama_camera = SYSTEM_CONFIG.get('enable_panorama_camera', True)
- self.enable_ptz_camera = SYSTEM_CONFIG.get('enable_ptz_camera', True)
- self.enable_detection = SYSTEM_CONFIG.get('enable_detection', True)
- self.enable_safety_detection = SYSTEM_CONFIG.get('enable_safety_detection', True)
- self.enable_calibration = SYSTEM_CONFIG.get('enable_calibration', True)
- self.enable_ptz_tracking = SYSTEM_CONFIG.get('enable_ptz_tracking', True)
- self.enable_ocr = SYSTEM_CONFIG.get('enable_ocr', True)
- self.enable_llm = SYSTEM_CONFIG.get('enable_llm', True)
- self.enable_event_push = SYSTEM_CONFIG.get('enable_event_push', True)
- self.enable_voice_announce = SYSTEM_CONFIG.get('enable_voice_announce', True)
-
- # 状态
- self.running = False
- self.display = True # 是否显示画面
-
- # 帧处理
- self.current_frame = None
- self.frame_lock = threading.Lock()
-
- # 统计
- self.stats = {
- 'frames_processed': 0,
- 'persons_detected': 0,
- 'violations_detected': 0,
- 'events_pushed': 0,
- 'voice_announced': 0,
- 'start_time': None
- }
- self.stats_lock = threading.Lock()
-
- # 工作线程
- self.detection_thread = None
-
- def initialize(self, camera_source=0) -> bool:
- """
- 初始化系统组件
-
- Args:
- camera_source: 摄像头源 (索引/RTSP/视频文件)
-
- Returns:
- 是否成功
- """
- logger.info("=" * 60)
- logger.info(f"初始化 {SYSTEM_CONFIG['name']} v{SYSTEM_CONFIG['version']}")
- logger.info("=" * 60)
-
- # 初始化摄像头
- self.camera_source = camera_source
- if self.enable_panorama_camera:
- self.camera = SimpleCamera(camera_source)
-
- if not self.camera.connect():
- logger.error("连接摄像头失败")
- return False
-
- logger.info(f"摄像头连接成功: {camera_source}")
- else:
- self.camera = None
- logger.info("摄像头功能已禁用")
-
- # 初始化 PTZ 球机(安全模式可选联动)
- if self.enable_ptz_camera and self.enable_ptz_tracking:
- try:
- from dahua_sdk import DahuaSDK
- sdk_path = os.path.join(SDK_PATH['lib_path'], SDK_PATH['netsdk'])
- self.sdk = DahuaSDK(sdk_path)
- if self.sdk.init():
- from ptz_camera import PTZCamera
- ptz_config = self.config.get('ptz_camera', PTZ_CAMERA)
- self.ptz_camera = PTZCamera(self.sdk, ptz_config)
- if self.ptz_camera.connect():
- logger.info(f"PTZ球机连接成功: {ptz_config['ip']}")
- if self.ptz_camera.start_stream_rtsp():
- logger.info("PTZ球机RTSP流启动成功")
- else:
- logger.warning("PTZ球机RTSP流启动失败,PTZ跟踪将无法进行帧验证")
- else:
- logger.warning(f"PTZ球机连接失败: {ptz_config['ip']}")
- self.ptz_camera = None
- else:
- logger.warning("SDK初始化失败,PTZ功能不可用")
- self.sdk = None
- except Exception as e:
- logger.warning(f"PTZ球机初始化失败: {e},PTZ跟踪将不可用")
- self.ptz_camera = None
-
- # 初始化 LLM 大模型服务
- if self.enable_llm:
- try:
- llm_config = {**LLM_CONFIG}
- if 'llm_host' in self.config:
- llm_config['api_host'] = self.config['llm_host']
- if 'llm_port' in self.config:
- llm_config['api_port'] = self.config['llm_port']
-
- self.llm_analyzer = SafetyAnalyzer(llm_config)
- logger.info(f"大模型分析器初始化成功: {llm_config['api_host']}:{llm_config['api_port']}")
- except Exception as e:
- logger.warning(f"大模型分析器初始化失败: {e},将使用规则判断")
- self.use_llm = False
-
- # 初始化安全检测器 (支持 LLM)
- if self.enable_detection and self.enable_safety_detection:
- try:
- llm_config = {**LLM_CONFIG} if self.enable_llm else None
-
- self.detector = LLMSafetyDetector(
- yolo_model_path=self.config.get('model_path', SAFETY_DETECTION_CONFIG.get('model_path')),
- llm_config=llm_config,
- use_gpu=self.config.get('use_gpu', SAFETY_DETECTION_CONFIG.get('use_gpu', True)),
- use_llm=self.enable_llm,
- model_type=self.config.get('model_type', SAFETY_DETECTION_CONFIG.get('model_type', 'auto'))
- )
- logger.info("安全检测器初始化成功")
- except Exception as e:
- logger.error(f"安全检测器初始化失败: {e}")
- return False
- else:
- self.detector = None
- logger.info("安全检测功能已禁用")
-
- # 初始化编号识别器
- if self.enable_ocr and self.enable_llm:
- try:
- self.number_recognizer = NumberRecognizer(LLM_CONFIG)
- logger.info("编号识别器初始化成功")
- except Exception as e:
- logger.warning(f"编号识别器初始化失败: {e}")
- self.number_recognizer = None
-
- # 初始化事件推送器
- if self.enable_event_push:
- try:
- push_config = {**EVENT_PUSHER_CONFIG}
- if 'api_host' in self.config:
- push_config['api_host'] = self.config['api_host']
- if 'api_port' in self.config:
- push_config['api_port'] = self.config['api_port']
-
- self.event_pusher = EventPusher(push_config)
- logger.info("事件推送器初始化成功")
- except Exception as e:
- logger.warning(f"事件推送器初始化失败: {e}")
-
- # 初始化语音播报器
- if self.enable_voice_announce:
- try:
- self.voice_announcer = VoiceAnnouncer(
- tts_config=VOICE_ANNOUNCER_CONFIG.get('tts', {}),
- player_config=VOICE_ANNOUNCER_CONFIG.get('player', {})
- )
- logger.info("语音播报器初始化成功")
- except Exception as e:
- logger.warning(f"语音播报器初始化失败: {e}")
-
- logger.info("系统初始化完成")
- return True
-
- def start(self) -> bool:
- """启动系统"""
- if self.running:
- logger.warning("系统已在运行")
- return True
-
- logger.info("启动安全监控系统...")
-
- # 启动事件推送器
- if self.event_pusher:
- self.event_pusher.start()
-
- # 启动语音播报器
- if self.voice_announcer:
- self.voice_announcer.start()
-
- # 启动检测线程
- self.running = True
- self.detection_thread = threading.Thread(target=self._detection_worker, daemon=True)
- self.detection_thread.start()
-
- with self.stats_lock:
- self.stats['start_time'] = time.time()
-
- logger.info("安全监控系统启动成功")
- return True
-
- def stop(self):
- """停止系统"""
- if not self.running:
- return
-
- logger.info("停止安全监控系统...")
-
- self.running = False
-
- if self.detection_thread:
- self.detection_thread.join(timeout=3)
-
- if self.event_pusher:
- self.event_pusher.stop()
-
- if self.voice_announcer:
- self.voice_announcer.stop()
-
- if self.camera:
- self.camera.disconnect()
-
- if self.ptz_camera:
- self.ptz_camera.stop_stream()
- self.ptz_camera.disconnect()
-
- if self.stream_manager:
- self.stream_manager.stop_all()
-
- if self.sdk:
- try:
- self.sdk.cleanup()
- except Exception:
- pass
-
- self._print_stats()
- logger.info("安全监控系统已停止")
-
- def _detection_worker(self):
- """检测工作线程"""
- # 检查摄像头和检测是否启用
- if not self.enable_panorama_camera or not self.enable_detection:
- logger.info("摄像头或检测功能已禁用,检测线程休眠")
- while self.running:
- time.sleep(1)
- return
-
- # 优先使用 detection_fps,默认每秒2帧
- detection_fps = SAFETY_DETECTION_CONFIG.get('detection_fps', 2)
- detection_interval = 1.0 / detection_fps # 根据FPS计算间隔
- last_detection_time = 0
-
- # 告警冷却(按违规类型)
- alert_cooldown = {}
- cooldown_time = SAFETY_DETECTION_CONFIG.get('alert_cooldown', 3.0)
-
- while self.running:
- try:
- current_time = time.time()
-
- # 获取帧
- frame = self.camera.get_frame() if self.camera else None
- if frame is None:
- time.sleep(0.01)
- continue
-
- with self.frame_lock:
- self.current_frame = frame.copy()
-
- self._update_stats('frames_processed')
-
- # 周期性检测
- if current_time - last_detection_time >= detection_interval:
- last_detection_time = current_time
-
- # 执行检测
- detections = self.detector.detect(frame)
- status_list = self.detector.check_safety(frame, detections)
-
- self._update_stats('persons_detected', len(status_list))
-
- # 轨迹追踪已禁用
-
- # 处理违规
- for status in status_list:
- if status.is_violation:
- # 检查冷却(按违规类型)
- violation_key = status.get_violation_desc()
- if violation_key in alert_cooldown:
- if current_time - alert_cooldown[violation_key] < cooldown_time:
- continue
-
- alert_cooldown[violation_key] = current_time
-
- self._handle_violation(status, frame)
-
- # 显示画面
- if self.display:
- self._display_frame(frame, detections, status_list)
-
- time.sleep(0.01)
-
- except Exception as e:
- logger.error(f"检测错误: {e}")
- time.sleep(0.1)
-
- def _handle_violation(self, status: PersonSafetyStatus, frame: np.ndarray):
- """处理违规"""
- description = status.get_violation_desc()
-
- self._update_stats('violations_detected')
-
- # 裁剪人体区域
- x1, y1, x2, y2 = status.person_bbox
- margin = 20
- x1 = max(0, x1 - margin)
- y1 = max(0, y1 - margin)
- x2 = min(frame.shape[1], x2 + margin)
- y2 = min(frame.shape[0], y2 + margin)
- person_image = frame[y1:y2, x1:x2].copy()
-
- # 编号识别
- number_text = None
- if self.number_recognizer:
- try:
- number_result = self.number_recognizer.recognize_person_number(person_image)
- number_text = number_result.get('number')
- if number_text:
- logger.info(f"识别到编号: {number_text}")
- except Exception as e:
- logger.warning(f"编号识别失败: {e}")
-
- # 如果识别到编号,添加到描述中
- if number_text:
- description = f"{description} (编号: {number_text})"
-
- # 推送事件
- if self.event_pusher:
- self.event_pusher.push_safety_violation(
- description=description,
- image=person_image,
- track_id=status.track_id,
- confidence=status.person_conf
- )
- self._update_stats('events_pushed')
-
- # 语音播报
- if self.voice_announcer:
- self.voice_announcer.announce_violation(description, urgent=True)
- self._update_stats('voice_announced')
-
- logger.warning(f"[违规] {description}")
-
- def _display_frame(self, frame: np.ndarray,
- detections: List[SafetyDetection],
- status_list: List[PersonSafetyStatus]):
- """显示帧"""
- # 绘制检测结果
- result_frame = draw_safety_result(frame, detections, status_list)
-
- # 添加统计信息
- stats_text = f"FPS: {self._get_fps():.1f}"
- cv2.putText(result_frame, stats_text, (10, 30),
- cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
-
- cv2.imshow('Safety Monitor', result_frame)
-
- # 按 'q' 退出
- if cv2.waitKey(1) & 0xFF == ord('q'):
- self.running = False
-
- def _get_fps(self) -> float:
- """获取帧率"""
- with self.stats_lock:
- if self.stats['start_time']:
- elapsed = time.time() - self.stats['start_time']
- if elapsed > 0:
- return self.stats['frames_processed'] / elapsed
- return 0.0
-
- def _update_stats(self, key: str, value: int = 1):
- """更新统计"""
- with self.stats_lock:
- if key in self.stats:
- self.stats[key] += value
-
- def _print_stats(self):
- """打印统计"""
- with self.stats_lock:
- if self.stats['start_time']:
- elapsed = time.time() - self.stats['start_time']
- print("\n" + "=" * 50)
- print("安全检测统计")
- print("=" * 50)
- print(f"运行时长: {elapsed:.1f} 秒")
- print(f"处理帧数: {self.stats['frames_processed']}")
- print(f"检测人员: {self.stats['persons_detected']} 次")
- print(f"违规检测: {self.stats['violations_detected']} 次")
- print(f"事件推送: {self.stats['events_pushed']} 次")
- print(f"语音播报: {self.stats['voice_announced']} 次")
-
- if self.event_pusher:
- push_stats = self.event_pusher.get_stats()
- print(f"推送成功: {push_stats['pushed_events']}")
- print(f"推送失败: {push_stats['failed_events']}")
-
- if self.voice_announcer:
- voice_stats = self.voice_announcer.get_stats()
- print(f"播报成功: {voice_stats['played_commands']}")
- print(f"播报失败: {voice_stats['failed_commands']}")
-
- print("=" * 50 + "\n")
-
- def get_stats(self) -> dict:
- """获取统计"""
- with self.stats_lock:
- return self.stats.copy()
-
-
- def announce(self, text: str):
- """
- 手动播报语音
-
- Args:
- text: 播报文本
- """
- if self.voice_announcer:
- self.voice_announcer.announce(text, priority=VoicePriority.NORMAL)
- def run_interactive(system: SafetyMonitorSystem):
- """
- 交互模式运行
-
- Args:
- system: 系统实例
- """
- print("\n施工现场安全监控系统 - 交互模式")
- print("=" * 50)
- print("命令:")
- print(" s - 开始/停止监控")
- print(" a - 手动播报 (输入文本)")
- print(" r - 查看统计信息")
- print(" q - 退出")
- print("=" * 50)
-
- running = False
-
- while True:
- try:
- cmd = input("\n> ").strip().lower()
-
- if cmd == 'q':
- break
-
- elif cmd == 's':
- if running:
- system.stop()
- running = False
- print("监控已停止")
- else:
- if system.start():
- running = True
- print("监控已启动")
-
- elif cmd == 'a':
- text = input("输入播报文本: ").strip()
- if text:
- system.announce(text)
- print(f"已播报: {text}")
-
- elif cmd == 'r':
- stats = system.get_stats()
- print("\n统计信息:")
- for k, v in stats.items():
- if v is not None:
- print(f" {k}: {v}")
-
- else:
- print("未知命令")
-
- except KeyboardInterrupt:
- break
- except Exception as e:
- print(f"错误: {e}")
-
- print("退出交互模式")
- def main():
- """主函数"""
- parser = argparse.ArgumentParser(
- description='施工现场安全行为智能识别系统'
- )
-
- # 摄像头参数
- parser.add_argument('--camera', type=str, default='0',
- help='摄像头源 (索引/RTSP地址/视频文件)')
- parser.add_argument('--no-display', action='store_true',
- help='不显示画面')
-
- # 检测参数
- parser.add_argument('--model', type=str,
- help='安全检测模型路径')
- parser.add_argument('--conf', type=float, default=0.5,
- help='置信度阈值')
- parser.add_argument('--person-conf', type=float, default=0.8,
- help='人员检测置信度阈值')
- parser.add_argument('--no-gpu', action='store_true',
- help='不使用GPU')
-
- # LLM 大模型参数
- parser.add_argument('--llm-host', type=str,
- help='大模型 API 主机')
- parser.add_argument('--llm-port', type=int,
- help='大模型 API 端口')
- parser.add_argument('--no-llm', action='store_true',
- help='禁用大模型判断,使用规则判断')
- parser.add_argument('--no-ocr', action='store_true',
- help='禁用编号识别')
-
- # 业务平台参数
- parser.add_argument('--api-host', type=str,
- help='业务平台 API 主机')
- parser.add_argument('--api-port', type=int,
- help='业务平台 API 端口')
- parser.add_argument('--no-push', action='store_true',
- help='禁用事件推送')
- parser.add_argument('--no-voice', action='store_true',
- help='禁用语音播报')
-
- # 运行模式
- parser.add_argument('--interactive', action='store_true',
- help='交互模式')
- parser.add_argument('--demo', action='store_true',
- help='演示模式')
-
- args = parser.parse_args()
-
- # 构建配置
- config = {}
-
- if args.model:
- config['model_path'] = args.model
- config['conf_threshold'] = args.conf
- config['person_threshold'] = args.person_conf
- config['use_gpu'] = not args.no_gpu
-
- # LLM 配置
- if args.llm_host:
- config['llm_host'] = args.llm_host
- if args.llm_port:
- config['llm_port'] = args.llm_port
-
- if args.api_host:
- config['api_host'] = args.api_host
- if args.api_port:
- config['api_port'] = args.api_port
-
- # 演示模式
- if args.demo:
- print("\n施工现场安全行为智能识别系统")
- print("=" * 60)
- print("""
- 系统功能:
- 1. 实时视频监控
- 2. YOLO11 检测: 人员、安全帽、反光衣
- 3. 大模型判断: 安全状态分析
- 4. 编号识别: OCR 识别衣服上的工号
- 5. 事件推送到业务平台
- 6. 接收平台指令,TTS 语音播报
- 系统架构:
- ┌─────────────────────────────────────────────────────┐
- │ 摄像头视频流 │
- └─────────────────────────────────────────────────────┘
- │
- ▼
- ┌─────────────────────────────────────────────────────┐
- │ YOLO11 安全检测模型 │
- │ 检测类别: 人员(3)、安全帽(0)、反光衣(4) │
- └─────────────────────────────────────────────────────┘
- │
- ▼
- ┌─────────────────────────────────────────────────────┐
- │ 大模型安全状态判断 │
- │ 分析: 是否佩戴安全帽、是否穿反光衣 │
- │ 识别: 衣服上的工号/编号 │
- └─────────────────────────────────────────────────────┘
- │
- ┌─────────────┼─────────────┐
- ▼ ▼ ▼
- ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
- │ 事件推送 │ │ 语音播报 │ │ 编号记录 │
- │ 业务平台API │ │ TTS服务 │ │ 身份关联 │
- └─────────────┘ └─────────────┘ └─────────────┘
- 运行命令:
- python safety_main.py --camera 0 # 使用默认摄像头
- python safety_main.py --camera rtsp://... # RTSP 流
- python safety_main.py --camera video.mp4 # 视频文件
- python safety_main.py --interactive # 交互模式
- python safety_main.py --no-display # 无界面模式
- python safety_main.py --no-llm # 禁用大模型,使用规则判断
- python safety_main.py --no-ocr # 禁用编号识别
- python safety_main.py --llm-host localhost --llm-port 8111 # 指定大模型服务
- """)
- return 0
-
- # 创建系统
- system = SafetyMonitorSystem(config)
- system.display = not args.no_display
-
- # 设置信号处理
- def signal_handler(sig, frame):
- print("\n接收到停止信号")
- system.stop()
- sys.exit(0)
-
- signal.signal(signal.SIGINT, signal_handler)
- signal.signal(signal.SIGTERM, signal_handler)
-
- try:
- # 解析摄像头源
- camera_source = args.camera
- if camera_source.isdigit():
- camera_source = int(camera_source)
-
- # 初始化
- if not system.initialize(camera_source):
- print("\n系统初始化失败!")
- return 1
-
- # 禁用功能 (命令行参数覆盖配置)
- if args.no_llm:
- system.enable_llm = False
- print("大模型判断已禁用,使用规则判断")
- if args.no_ocr:
- system.enable_ocr = False
- system.number_recognizer = None
- print("编号识别已禁用")
- if args.no_push:
- system.enable_event_push = False
- system.event_pusher = None
- print("事件推送已禁用")
- if args.no_voice:
- system.enable_voice_announce = False
- system.voice_announcer = None
- print("语音播报已禁用")
-
- # 运行
- if args.interactive:
- run_interactive(system)
- else:
- # 自动模式
- if not system.start():
- print("启动失败")
- return 1
-
- print("\n系统运行中,按 Ctrl+C 停止")
- print("(按 'q' 键退出显示窗口)\n")
-
- # 主循环
- while system.running:
- time.sleep(0.1)
-
- except KeyboardInterrupt:
- print("\n接收到停止信号")
-
- finally:
- system.stop()
-
- return 0
- if __name__ == '__main__':
- sys.exit(main() or 0)
|