coordinator.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570
  1. """
  2. 联动控制器
  3. 协调全景摄像头和球机的工作
  4. """
  5. import time
  6. import threading
  7. import queue
  8. from typing import Optional, List, Dict, Tuple, Callable
  9. from dataclasses import dataclass, field
  10. from enum import Enum
  11. import numpy as np
  12. from config import COORDINATOR_CONFIG, SYSTEM_CONFIG
  13. from panorama_camera import PanoramaCamera, ObjectDetector, PersonTracker, DetectedObject
  14. from ptz_camera import PTZCamera, PTZController
  15. from ocr_recognizer import NumberDetector, PersonInfo
  16. class TrackingState(Enum):
  17. """跟踪状态"""
  18. IDLE = 0 # 空闲
  19. SEARCHING = 1 # 搜索目标
  20. TRACKING = 2 # 跟踪中
  21. ZOOMING = 3 # 变焦中
  22. OCR_PROCESSING = 4 # OCR处理中
  23. @dataclass
  24. class TrackingTarget:
  25. """跟踪目标"""
  26. track_id: int # 跟踪ID
  27. position: Tuple[float, float] # 位置比例 (x_ratio, y_ratio)
  28. last_update: float # 最后更新时间
  29. person_info: Optional[PersonInfo] = None # 人员信息
  30. priority: int = 0 # 优先级
  31. class Coordinator:
  32. """
  33. 联动控制器
  34. 协调全景摄像头和球机实现联动抓拍
  35. """
  36. def __init__(self, panorama_camera: PanoramaCamera,
  37. ptz_camera: PTZCamera,
  38. detector: ObjectDetector = None,
  39. number_detector: NumberDetector = None,
  40. calibrator = None):
  41. """
  42. 初始化联动控制器
  43. Args:
  44. panorama_camera: 全景摄像头
  45. ptz_camera: 球机
  46. detector: 物体检测器
  47. number_detector: 编号检测器
  48. calibrator: 校准器 (用于坐标转换)
  49. """
  50. self.panorama = panorama_camera
  51. self.ptz = ptz_camera
  52. self.detector = detector
  53. self.number_detector = number_detector
  54. self.calibrator = calibrator
  55. self.config = COORDINATOR_CONFIG
  56. # 功能开关 - 从 SYSTEM_CONFIG 读取
  57. self.enable_ptz_camera = SYSTEM_CONFIG.get('enable_ptz_camera', True)
  58. self.enable_ptz_tracking = SYSTEM_CONFIG.get('enable_ptz_tracking', True)
  59. self.enable_calibration = SYSTEM_CONFIG.get('enable_calibration', True)
  60. self.enable_detection = SYSTEM_CONFIG.get('enable_detection', True)
  61. self.enable_ocr = SYSTEM_CONFIG.get('enable_ocr', True)
  62. # 跟踪器
  63. self.tracker = PersonTracker()
  64. # 状态
  65. self.state = TrackingState.IDLE
  66. self.state_lock = threading.Lock()
  67. # 跟踪目标
  68. self.tracking_targets: Dict[int, TrackingTarget] = {}
  69. self.targets_lock = threading.Lock()
  70. # 当前跟踪目标
  71. self.current_target: Optional[TrackingTarget] = None
  72. # 回调函数
  73. self.on_person_detected: Optional[Callable] = None
  74. self.on_number_recognized: Optional[Callable] = None
  75. self.on_tracking_started: Optional[Callable] = None
  76. self.on_tracking_stopped: Optional[Callable] = None
  77. # 控制标志
  78. self.running = False
  79. self.coordinator_thread = None
  80. # OCR频率控制
  81. self.last_ocr_time = 0
  82. self.ocr_interval = 1.0 # OCR间隔(秒),避免过于频繁调用API
  83. # PTZ优化 - 避免频繁发送相同位置的命令
  84. self.last_ptz_position = None
  85. self.ptz_position_threshold = 0.02 # 位置变化阈值 (2%)
  86. # 结果队列
  87. self.result_queue = queue.Queue()
  88. # 性能统计
  89. self.stats = {
  90. 'frames_processed': 0,
  91. 'persons_detected': 0,
  92. 'ocr_attempts': 0,
  93. 'ocr_success': 0,
  94. 'start_time': None,
  95. 'last_frame_time': None,
  96. }
  97. self.stats_lock = threading.Lock()
  98. def set_calibrator(self, calibrator):
  99. """设置校准器"""
  100. self.calibrator = calibrator
  101. def _transform_position(self, x_ratio: float, y_ratio: float) -> Tuple[float, float, int]:
  102. """
  103. 将全景坐标转换为PTZ角度
  104. Args:
  105. x_ratio: X方向比例
  106. y_ratio: Y方向比例
  107. Returns:
  108. (pan, tilt, zoom)
  109. """
  110. if self.enable_calibration and self.calibrator and self.calibrator.is_calibrated():
  111. # 使用校准结果进行转换
  112. pan, tilt = self.calibrator.transform(x_ratio, y_ratio)
  113. zoom = 8 # 默认变倍
  114. else:
  115. # 使用默认估算
  116. pan, tilt, zoom = self.ptz.calculate_ptz_position(x_ratio, y_ratio)
  117. return (pan, tilt, zoom)
  118. def start(self) -> bool:
  119. """
  120. 启动联动系统
  121. Returns:
  122. 是否成功
  123. """
  124. # 连接全景摄像头
  125. if not self.panorama.connect():
  126. print("连接全景摄像头失败")
  127. return False
  128. # 连接 PTZ 球机 (可选)
  129. if self.enable_ptz_camera:
  130. if not self.ptz.connect():
  131. print("连接球机失败")
  132. self.panorama.disconnect()
  133. return False
  134. else:
  135. print("PTZ 球机功能已禁用")
  136. # 启动视频流(优先RTSP,SDK回调不可用时回退)
  137. if not self.panorama.start_stream_rtsp():
  138. print("RTSP视频流启动失败,尝试SDK方式...")
  139. if not self.panorama.start_stream():
  140. print("启动视频流失败")
  141. self.panorama.disconnect()
  142. if self.enable_ptz_camera:
  143. self.ptz.disconnect()
  144. return False
  145. # 启动联动线程
  146. self.running = True
  147. self.coordinator_thread = threading.Thread(target=self._coordinator_worker, daemon=True)
  148. self.coordinator_thread.start()
  149. print("联动系统已启动")
  150. return True
  151. def stop(self):
  152. """停止联动系统"""
  153. self.running = False
  154. if self.coordinator_thread:
  155. self.coordinator_thread.join(timeout=3)
  156. self.panorama.disconnect()
  157. if self.enable_ptz_camera:
  158. self.ptz.disconnect()
  159. # 打印统计信息
  160. self._print_stats()
  161. print("联动系统已停止")
  162. def _update_stats(self, key: str, value: int = 1):
  163. """更新统计信息"""
  164. with self.stats_lock:
  165. if key in self.stats:
  166. self.stats[key] += value
  167. def _print_stats(self):
  168. """打印统计信息"""
  169. with self.stats_lock:
  170. if self.stats['start_time'] and self.stats['frames_processed'] > 0:
  171. elapsed = time.time() - self.stats['start_time']
  172. fps = self.stats['frames_processed'] / elapsed
  173. print("\n=== 性能统计 ===")
  174. print(f"运行时长: {elapsed:.1f}秒")
  175. print(f"处理帧数: {self.stats['frames_processed']}")
  176. print(f"平均帧率: {fps:.1f} fps")
  177. print(f"检测人体: {self.stats['persons_detected']}次")
  178. print(f"OCR尝试: {self.stats['ocr_attempts']}次")
  179. print(f"OCR成功: {self.stats['ocr_success']}次")
  180. print("================\n")
  181. def get_stats(self) -> dict:
  182. """获取统计信息"""
  183. with self.stats_lock:
  184. return self.stats.copy()
  185. def _coordinator_worker(self):
  186. """联动工作线程"""
  187. last_detection_time = 0
  188. detection_interval = 0.1 # 检测间隔
  189. # 初始化统计
  190. with self.stats_lock:
  191. self.stats['start_time'] = time.time()
  192. while self.running:
  193. try:
  194. current_time = time.time()
  195. # 获取当前帧
  196. frame = self.panorama.get_frame()
  197. if frame is None:
  198. time.sleep(0.01)
  199. continue
  200. # 更新帧统计
  201. self._update_stats('frames_processed')
  202. frame_size = (frame.shape[1], frame.shape[0])
  203. # 周期性检测
  204. if current_time - last_detection_time >= detection_interval:
  205. last_detection_time = current_time
  206. # 检测人体
  207. detections = self._detect_persons(frame)
  208. # 更新检测统计
  209. if detections:
  210. self._update_stats('persons_detected', len(detections))
  211. # 更新跟踪
  212. tracked = self.tracker.update(detections)
  213. # 更新跟踪目标
  214. self._update_tracking_targets(tracked, frame_size)
  215. # 处理检测结果
  216. if tracked:
  217. self._process_detections(tracked, frame, frame_size)
  218. # 处理当前跟踪目标
  219. self._process_current_target(frame, frame_size)
  220. # 清理过期目标
  221. self._cleanup_expired_targets()
  222. time.sleep(0.01)
  223. except Exception as e:
  224. print(f"联动处理错误: {e}")
  225. time.sleep(0.1)
  226. def _detect_persons(self, frame: np.ndarray) -> List[DetectedObject]:
  227. """检测人体"""
  228. if not self.enable_detection or self.detector is None:
  229. return []
  230. return self.detector.detect_persons(frame)
  231. def _update_tracking_targets(self, detections: List[DetectedObject],
  232. frame_size: Tuple[int, int]):
  233. """更新跟踪目标"""
  234. current_time = time.time()
  235. with self.targets_lock:
  236. # 更新现有目标
  237. for det in detections:
  238. if det.track_id is None:
  239. continue
  240. x_ratio = det.center[0] / frame_size[0]
  241. y_ratio = det.center[1] / frame_size[1]
  242. if det.track_id in self.tracking_targets:
  243. # 更新位置
  244. target = self.tracking_targets[det.track_id]
  245. target.position = (x_ratio, y_ratio)
  246. target.last_update = current_time
  247. else:
  248. # 新目标
  249. if len(self.tracking_targets) < self.config['max_tracking_targets']:
  250. self.tracking_targets[det.track_id] = TrackingTarget(
  251. track_id=det.track_id,
  252. position=(x_ratio, y_ratio),
  253. last_update=current_time
  254. )
  255. def _process_detections(self, detections: List[DetectedObject],
  256. frame: np.ndarray, frame_size: Tuple[int, int]):
  257. """处理检测结果"""
  258. if self.on_person_detected:
  259. for det in detections:
  260. self.on_person_detected(det, frame)
  261. def _process_current_target(self, frame: np.ndarray, frame_size: Tuple[int, int]):
  262. """处理当前跟踪目标"""
  263. with self.targets_lock:
  264. if not self.tracking_targets:
  265. self._set_state(TrackingState.IDLE)
  266. self.current_target = None
  267. return
  268. # 选择优先级最高的目标(这里选择最新的)
  269. if self.current_target is None or \
  270. self.current_target.track_id not in self.tracking_targets:
  271. # 选择一个新目标
  272. target_id = list(self.tracking_targets.keys())[0]
  273. self.current_target = self.tracking_targets[target_id]
  274. if self.current_target:
  275. # 移动球机到目标位置 (仅在 PTZ 跟踪启用时)
  276. if self.enable_ptz_tracking and self.enable_ptz_camera:
  277. self._set_state(TrackingState.TRACKING)
  278. x_ratio, y_ratio = self.current_target.position
  279. # 检查位置是否变化超过阈值
  280. should_move = True
  281. if self.last_ptz_position is not None:
  282. last_x, last_y = self.last_ptz_position
  283. if (abs(x_ratio - last_x) < self.ptz_position_threshold and
  284. abs(y_ratio - last_y) < self.ptz_position_threshold):
  285. should_move = False
  286. if should_move:
  287. self.ptz.track_target(x_ratio, y_ratio)
  288. self.last_ptz_position = (x_ratio, y_ratio)
  289. # 执行OCR识别 (仅在 OCR 启用时)
  290. if self.enable_ocr:
  291. self._perform_ocr(frame, self.current_target)
  292. def _perform_ocr(self, frame: np.ndarray, target: TrackingTarget):
  293. """执行OCR识别"""
  294. if not self.enable_ocr or self.number_detector is None:
  295. return
  296. # 频率控制 - 避免过于频繁调用OCR API
  297. current_time = time.time()
  298. if current_time - self.last_ocr_time < self.ocr_interval:
  299. return
  300. self.last_ocr_time = current_time
  301. # 更新OCR尝试统计
  302. self._update_stats('ocr_attempts')
  303. # 计算人体边界框 (基于位置估算)
  304. frame_h, frame_w = frame.shape[:2]
  305. # 人体占画面比例 (可配置,默认宽20%、高40%)
  306. person_width_ratio = self.config.get('person_width_ratio', 0.2)
  307. person_height_ratio = self.config.get('person_height_ratio', 0.4)
  308. person_width = int(frame_w * person_width_ratio)
  309. person_height = int(frame_h * person_height_ratio)
  310. x_ratio, y_ratio = target.position
  311. center_x = int(x_ratio * frame_w)
  312. center_y = int(y_ratio * frame_h)
  313. # 计算边界框,确保不超出画面范围
  314. x1 = max(0, center_x - person_width // 2)
  315. y1 = max(0, center_y - person_height // 2)
  316. x2 = min(frame_w, x1 + person_width)
  317. y2 = min(frame_h, y1 + person_height)
  318. # 更新实际宽高 (可能因边界裁剪而变小)
  319. actual_width = x2 - x1
  320. actual_height = y2 - y1
  321. person_bbox = (x1, y1, actual_width, actual_height)
  322. # 检测编号
  323. self._set_state(TrackingState.OCR_PROCESSING)
  324. person_info = self.number_detector.detect_number(frame, person_bbox)
  325. person_info.person_id = target.track_id
  326. # 更新OCR成功统计
  327. if person_info.number_text:
  328. self._update_stats('ocr_success')
  329. # 更新目标信息
  330. with self.targets_lock:
  331. if target.track_id in self.tracking_targets:
  332. self.tracking_targets[target.track_id].person_info = person_info
  333. # 回调
  334. if self.on_number_recognized and person_info.number_text:
  335. self.on_number_recognized(person_info)
  336. # 放入结果队列
  337. self.result_queue.put(person_info)
  338. def _cleanup_expired_targets(self):
  339. """清理过期目标"""
  340. current_time = time.time()
  341. timeout = self.config['tracking_timeout']
  342. with self.targets_lock:
  343. expired_ids = [
  344. target_id for target_id, target in self.tracking_targets.items()
  345. if current_time - target.last_update > timeout
  346. ]
  347. for target_id in expired_ids:
  348. del self.tracking_targets[target_id]
  349. if self.current_target and self.current_target.track_id == target_id:
  350. self.current_target = None
  351. def _set_state(self, state: TrackingState):
  352. """设置状态"""
  353. with self.state_lock:
  354. self.state = state
  355. def get_state(self) -> TrackingState:
  356. """获取状态"""
  357. with self.state_lock:
  358. return self.state
  359. def get_results(self) -> List[PersonInfo]:
  360. """
  361. 获取识别结果
  362. Returns:
  363. 人员信息列表
  364. """
  365. results = []
  366. while not self.result_queue.empty():
  367. try:
  368. results.append(self.result_queue.get_nowait())
  369. except queue.Empty:
  370. break
  371. return results
  372. def get_tracking_targets(self) -> List[TrackingTarget]:
  373. """获取当前跟踪目标"""
  374. with self.targets_lock:
  375. return list(self.tracking_targets.values())
  376. def force_track_position(self, x_ratio: float, y_ratio: float, zoom: int = None):
  377. """
  378. 强制跟踪指定位置
  379. Args:
  380. x_ratio: X方向比例
  381. y_ratio: Y方向比例
  382. zoom: 变倍
  383. """
  384. if self.enable_ptz_tracking and self.enable_ptz_camera:
  385. self.ptz.move_to_target(x_ratio, y_ratio, zoom)
  386. def capture_snapshot(self) -> Optional[np.ndarray]:
  387. """
  388. 抓拍快照
  389. Returns:
  390. 快照图像
  391. """
  392. return self.panorama.get_frame()
  393. class EventDrivenCoordinator(Coordinator):
  394. """
  395. 事件驱动联动控制器
  396. 当全景摄像头检测到事件时触发联动
  397. """
  398. def __init__(self, *args, **kwargs):
  399. super().__init__(*args, **kwargs)
  400. # 事件类型
  401. self.event_types = {
  402. 'intruder': True, # 入侵检测
  403. 'crossline': True, # 越界检测
  404. 'motion': True, # 移动侦测
  405. }
  406. # 事件队列
  407. self.event_queue = queue.Queue()
  408. def on_event(self, event_type: str, event_data: dict):
  409. """
  410. 事件回调
  411. Args:
  412. event_type: 事件类型
  413. event_data: 事件数据
  414. """
  415. if not self.event_types.get(event_type, False):
  416. return
  417. self.event_queue.put({
  418. 'type': event_type,
  419. 'data': event_data,
  420. 'time': time.time()
  421. })
  422. def _coordinator_worker(self):
  423. """联动工作线程 (事件驱动版本)"""
  424. while self.running:
  425. try:
  426. # 处理事件
  427. try:
  428. event = self.event_queue.get(timeout=0.1)
  429. self._process_event(event)
  430. except queue.Empty:
  431. pass
  432. # 继续常规处理
  433. frame = self.panorama.get_frame()
  434. if frame is not None:
  435. frame_size = (frame.shape[1], frame.shape[0])
  436. detections = self._detect_persons(frame)
  437. if detections:
  438. tracked = self.tracker.update(detections)
  439. self._update_tracking_targets(tracked, frame_size)
  440. self._process_current_target(frame, frame_size)
  441. self._cleanup_expired_targets()
  442. except Exception as e:
  443. print(f"事件处理错误: {e}")
  444. time.sleep(0.1)
  445. def _process_event(self, event: dict):
  446. """处理事件"""
  447. event_type = event['type']
  448. event_data = event['data']
  449. print(f"处理事件: {event_type}")
  450. # 根据事件类型处理
  451. if event_type == 'intruder':
  452. # 入侵事件 - 获取入侵位置
  453. if 'position' in event_data:
  454. x_ratio, y_ratio = event_data['position']
  455. self.force_track_position(x_ratio, y_ratio)
  456. elif event_type == 'crossline':
  457. # 越界事件
  458. pass
  459. elif event_type == 'motion':
  460. # 移动侦测事件
  461. pass