app.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. """FastAPI 应用工厂与全局服务初始化."""
  2. import os
  3. import tempfile
  4. import threading
  5. import time
  6. from contextlib import asynccontextmanager
  7. import cv2
  8. from fastapi import FastAPI
  9. from fastapi.staticfiles import StaticFiles
  10. from fastapi.responses import FileResponse
  11. from config import CAMERA_GROUPS, SDK_PATH, SYSTEM_CONFIG
  12. from dahua_sdk import DahuaSDK
  13. from ptz_camera import PTZCamera
  14. from third_party_pusher import get_third_party_pusher
  15. from core.stream_manager import StreamManager
  16. from core.scan_point_store import ScanPointStore
  17. from core.spatial_scanner import SpatialScanner
  18. from core.polling_scheduler import PollingScheduler
  19. from core.capture_uploader import CaptureUploader
  20. from core.detector_service import DetectorService
  21. from core.group_state import group_state
  22. from web.routes import router
  23. from web.state import WebState
  24. import web.state as _web_state_module
  25. def build_rtsp_url(camera_config: dict) -> str:
  26. if camera_config.get("rtsp_url"):
  27. return camera_config["rtsp_url"]
  28. ip = camera_config["ip"]
  29. port = camera_config.get("rtsp_port", 554)
  30. username = camera_config["username"]
  31. password = camera_config["password"]
  32. channel = camera_config.get("rtsp_channel") or camera_config.get("channel", 1)
  33. subtype = camera_config.get("subtype", 0)
  34. return f"rtsp://{username}:{password}@{ip}:{port}/cam/realmonitor?channel={channel}&subtype={subtype}"
  35. def create_app(test_mode: bool = False) -> FastAPI:
  36. @asynccontextmanager
  37. async def lifespan(app: FastAPI):
  38. # 重置全局状态,允许同一进程内反复创建应用(测试场景)
  39. group_state.reset()
  40. stream_manager = StreamManager()
  41. store_path = os.path.join(tempfile.mkdtemp(), "scan_models.json") if test_mode else "data/scan_models.json"
  42. scan_store = ScanPointStore(store_path)
  43. scanners: dict = {}
  44. schedulers: dict = {}
  45. ptz_cameras: dict = {}
  46. threads: list = []
  47. stop_event = threading.Event()
  48. pusher = None
  49. sdk = None
  50. detector_service = None
  51. if not test_mode:
  52. if SYSTEM_CONFIG.get("enable_detection", True):
  53. detector_service = DetectorService()
  54. if SYSTEM_CONFIG.get("enable_ptz_camera", True):
  55. try:
  56. sdk_path = os.path.join(SDK_PATH["lib_path"], SDK_PATH.get("netsdk", "libdhnetsdk.so"))
  57. sdk = DahuaSDK(sdk_path)
  58. sdk.init()
  59. except Exception as exc:
  60. print(f"[lifespan] SDK init failed: {exc}")
  61. sdk = None
  62. if SYSTEM_CONFIG.get("enable_event_push", False):
  63. try:
  64. pusher = get_third_party_pusher()
  65. if pusher and not pusher.running:
  66. pusher.start()
  67. except Exception as exc:
  68. print(f"[lifespan] Pusher start failed: {exc}")
  69. pusher = None
  70. for group in CAMERA_GROUPS:
  71. if not group.get("enabled", True):
  72. continue
  73. gid = group.get("group_id", group.get("id"))
  74. try:
  75. ptz_cfg = group["ptz"]
  76. pano_cfg = group["panorama"]
  77. pano_url = build_rtsp_url(pano_cfg)
  78. ptz_url = build_rtsp_url(ptz_cfg)
  79. group_state.init_group(gid, pano_url, ptz_url, ptz_cfg)
  80. group_state.update(gid, "ptz_connected", False)
  81. scan_store.ensure_group(gid, {
  82. "ptz_name": ptz_cfg.get("name", gid),
  83. "panorama_name": pano_cfg.get("name", gid),
  84. })
  85. if test_mode:
  86. continue
  87. # 注册枪机 RTSP 流
  88. if SYSTEM_CONFIG.get("enable_panorama_camera", True):
  89. stream_manager.register(f"{gid}_panorama", pano_url)
  90. ptz = None
  91. if SYSTEM_CONFIG.get("enable_ptz_camera", True) and sdk is not None:
  92. # 连接球机
  93. ptz = PTZCamera(sdk, ptz_cfg)
  94. try:
  95. ptz_connected = ptz.connect()
  96. except Exception as exc:
  97. print(f"[lifespan] PTZ connect raised for group {gid}: {exc}")
  98. ptz_connected = False
  99. if ptz_connected:
  100. ptz_cameras[gid] = ptz
  101. group_state.update(gid, "ptz_connected", True)
  102. # 注册球机 RTSP 流
  103. stream_manager.register(f"{gid}_ptz", ptz_url)
  104. scanners[gid] = SpatialScanner(
  105. gid, ptz, lambda g=gid: stream_manager.get(f"{g}_ptz").get_frame()
  106. )
  107. def on_arrived(point, g=gid, ptz=ptz):
  108. group_state.update(g, "ptz_position", {
  109. "pan": point["pan"], "tilt": point["tilt"], "zoom": point.get("zoom", 1)
  110. })
  111. schedulers[gid] = PollingScheduler(
  112. gid, ptz,
  113. get_points=lambda g=gid: scan_store.list_enabled_points(g),
  114. on_arrived=on_arrived,
  115. default_dwell=3.0,
  116. )
  117. else:
  118. print(f"[lifespan] PTZ connect failed for group {gid}; skipping PTZ features")
  119. # 创建上传器(即使球机关闭,枪机检测仍可能使用)
  120. uploader = CaptureUploader(gid, upload_callback=pusher.report_batch if pusher else None)
  121. if SYSTEM_CONFIG.get("enable_detection", True) and detector_service is not None:
  122. def panorama_detect_loop(g=gid, uploader=uploader, detector=detector_service):
  123. interval = 0.5
  124. while not stop_event.is_set():
  125. try:
  126. stream = stream_manager.get(f"{g}_panorama")
  127. frame = stream.get_frame() if stream else None
  128. if frame is not None:
  129. dets = detector.detect(frame)
  130. marked = frame.copy()
  131. if dets:
  132. det_dicts = [{
  133. "bbox": [d.bbox[0], d.bbox[1], d.bbox[0]+d.bbox[2], d.bbox[1]+d.bbox[3]],
  134. "confidence": d.confidence,
  135. } for d in dets]
  136. uploader.handle_detection("panorama", frame, det_dicts)
  137. for d in det_dicts:
  138. x1, y1, x2, y2 = d["bbox"]
  139. cv2.rectangle(marked, (x1, y1), (x2, y2), (0, 255, 0), 2)
  140. stream.set_marked_frame(marked)
  141. except Exception as exc:
  142. print(f"[panorama_detect_loop {g}] error: {exc}")
  143. time.sleep(interval)
  144. t_panorama = threading.Thread(target=panorama_detect_loop, daemon=True)
  145. t_panorama.start()
  146. threads.append(t_panorama)
  147. if ptz is not None:
  148. def ptz_detect_loop(g=gid, uploader=uploader, detector=detector_service):
  149. interval = 0.2
  150. while not stop_event.is_set():
  151. try:
  152. stream = stream_manager.get(f"{g}_ptz")
  153. frame = stream.get_frame() if stream else None
  154. if frame is not None:
  155. dets = detector.detect(frame)
  156. marked = frame.copy()
  157. if dets:
  158. det_dicts = [{
  159. "bbox": [d.bbox[0], d.bbox[1], d.bbox[0]+d.bbox[2], d.bbox[1]+d.bbox[3]],
  160. "confidence": d.confidence,
  161. } for d in dets]
  162. pos = group_state.get(g).get("ptz_position")
  163. uploader.handle_detection("ptz", frame, det_dicts, pos)
  164. for d in det_dicts:
  165. x1, y1, x2, y2 = d["bbox"]
  166. cv2.rectangle(marked, (x1, y1), (x2, y2), (0, 255, 0), 2)
  167. stream.set_marked_frame(marked)
  168. except Exception as exc:
  169. print(f"[ptz_detect_loop {g}] error: {exc}")
  170. time.sleep(interval)
  171. t_ptz = threading.Thread(target=ptz_detect_loop, daemon=True)
  172. t_ptz.start()
  173. threads.append(t_ptz)
  174. except Exception as exc:
  175. print(f"[lifespan] Group {gid} setup failed: {exc}")
  176. continue
  177. _web_state_module.web_state = WebState(
  178. group_state, stream_manager, scan_store, scanners, schedulers, ptz_cameras
  179. )
  180. yield
  181. # 清理
  182. stop_event.set()
  183. for t in threads:
  184. try:
  185. t.join(timeout=2.0)
  186. except Exception as exc:
  187. print(f"[lifespan] Thread join error: {exc}")
  188. for s in schedulers.values():
  189. try:
  190. s.stop()
  191. except Exception as exc:
  192. print(f"[lifespan] Scheduler stop error: {exc}")
  193. for gid, ptz in ptz_cameras.items():
  194. try:
  195. ptz.disconnect()
  196. stream = stream_manager.get(f"{gid}_ptz")
  197. if stream:
  198. stream.stop()
  199. except Exception as exc:
  200. print(f"[lifespan] PTZ cleanup error for {gid}: {exc}")
  201. stream_manager.stop_all()
  202. if pusher and getattr(pusher, "running", False):
  203. try:
  204. pusher.stop()
  205. except Exception as exc:
  206. print(f"[lifespan] Pusher stop error: {exc}")
  207. if sdk:
  208. try:
  209. sdk.cleanup()
  210. except Exception as exc:
  211. print(f"[lifespan] SDK cleanup error: {exc}")
  212. if test_mode:
  213. # 在测试模式下预先初始化共享状态(不依赖 lifespan,兼容非上下文管理器的 TestClient)
  214. group_state.reset()
  215. test_stream_manager = StreamManager()
  216. test_store_path = os.path.join(tempfile.mkdtemp(), "scan_models.json")
  217. test_scan_store = ScanPointStore(test_store_path)
  218. for group in CAMERA_GROUPS:
  219. if not group.get("enabled", True):
  220. continue
  221. gid = group.get("group_id", group.get("id"))
  222. try:
  223. ptz_cfg = group["ptz"]
  224. pano_cfg = group["panorama"]
  225. pano_url = build_rtsp_url(pano_cfg)
  226. ptz_url = build_rtsp_url(ptz_cfg)
  227. group_state.init_group(gid, pano_url, ptz_url, ptz_cfg)
  228. group_state.update(gid, "ptz_connected", False)
  229. test_scan_store.ensure_group(gid, {
  230. "ptz_name": ptz_cfg.get("name", gid),
  231. "panorama_name": pano_cfg.get("name", gid),
  232. })
  233. except Exception as exc:
  234. print(f"[create_app] Test group {gid} setup failed: {exc}")
  235. _web_state_module.web_state = WebState(
  236. group_state, test_stream_manager, test_scan_store, {}, {}, {}
  237. )
  238. app = FastAPI(lifespan=lifespan)
  239. static_dir = os.path.join(os.path.dirname(__file__), "web_static")
  240. if os.path.isdir(static_dir):
  241. app.mount("/static", StaticFiles(directory=static_dir), name="static")
  242. app.include_router(router)
  243. @app.get("/")
  244. async def root():
  245. index_path = os.path.join(static_dir, "index.html")
  246. if os.path.isfile(index_path):
  247. return FileResponse(index_path)
  248. return {"message": "PTZ 360 scan + panorama polling system"}
  249. return app