app.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. """FastAPI 应用工厂与全局服务初始化."""
  2. import logging
  3. import os
  4. import threading
  5. import time
  6. from contextlib import asynccontextmanager
  7. import cv2
  8. from fastapi import FastAPI
  9. from config import CAMERA_GROUPS, SYSTEM_CONFIG, STORAGE_CONFIG
  10. from third_party_pusher import get_third_party_pusher
  11. from core.stream_manager import StreamManager
  12. from core.capture_uploader import CaptureUploader
  13. from core.detector_service import DetectorService
  14. from core.group_state import group_state
  15. from core.oss_uploader import OSSUploader
  16. from core.file_cleanup import make_cleanup_workers
  17. logging.basicConfig(
  18. level=logging.INFO,
  19. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  20. force=True,
  21. )
  22. logger = logging.getLogger(__name__)
  23. def build_rtsp_url(camera_config: dict) -> str:
  24. if camera_config.get("rtsp_url"):
  25. return camera_config["rtsp_url"]
  26. ip = camera_config["ip"]
  27. port = camera_config.get("rtsp_port", 554)
  28. username = camera_config["username"]
  29. password = camera_config["password"]
  30. channel = camera_config.get("rtsp_channel") or camera_config.get("channel", 1)
  31. subtype = camera_config.get("subtype", 0)
  32. return f"rtsp://{username}:{password}@{ip}:{port}/cam/realmonitor?channel={channel}&subtype={subtype}"
  33. def create_app(test_mode: bool = False) -> FastAPI:
  34. @asynccontextmanager
  35. async def lifespan(app: FastAPI):
  36. group_state.reset()
  37. stream_manager = StreamManager()
  38. threads: list = []
  39. stop_event = threading.Event()
  40. pusher = None
  41. detector_service = None
  42. oss_uploader = None
  43. cleanup_workers: list = []
  44. app.state.stream_manager = stream_manager
  45. app.state.pusher = None
  46. app.state.oss_uploader = None
  47. app.state.detector_service = None
  48. if not test_mode:
  49. if SYSTEM_CONFIG.get("enable_detection", True):
  50. detector_service = DetectorService()
  51. app.state.detector_service = detector_service
  52. # 始终启用第三方推送(只要在配置中 enabled=True)
  53. try:
  54. oss_uploader = OSSUploader()
  55. except Exception as exc:
  56. print(f"[lifespan] OSS uploader init failed: {exc}")
  57. oss_uploader = None
  58. try:
  59. pusher = get_third_party_pusher()
  60. if pusher and not pusher.running:
  61. pusher.start()
  62. app.state.pusher = pusher
  63. except Exception as exc:
  64. print(f"[lifespan] Pusher start failed: {exc}")
  65. pusher = None
  66. app.state.pusher = None
  67. if pusher is not None:
  68. def _delete_captures_on_success(report):
  69. if STORAGE_CONFIG.get("captures", {}).get("keep_local_copy", False):
  70. return
  71. for path in report.batch_info.get("image_paths") or []:
  72. try:
  73. if path and os.path.exists(path):
  74. os.remove(path)
  75. logger.info("[cleanup] 上报成功后删除本地图片: %s", path)
  76. except Exception as exc:
  77. logger.warning("[cleanup] 删除本地图片失败: %s, %s", path, exc)
  78. try:
  79. pusher.set_callbacks(on_success=_delete_captures_on_success)
  80. except Exception as exc:
  81. print(f"[lifespan] set pusher success callback failed: {exc}")
  82. try:
  83. group_ids = [
  84. g.get("group_id", g.get("id"))
  85. for g in CAMERA_GROUPS
  86. if g.get("enabled", True)
  87. ]
  88. cleanup_workers = make_cleanup_workers(STORAGE_CONFIG, group_ids)
  89. for w in cleanup_workers:
  90. w.start()
  91. except Exception as exc:
  92. print(f"[lifespan] cleanup worker start failed: {exc}")
  93. for group in CAMERA_GROUPS:
  94. if not group.get("enabled", True):
  95. continue
  96. gid = group.get("group_id", group.get("id"))
  97. try:
  98. pano_cfg = group["panorama"]
  99. pano_url = build_rtsp_url(pano_cfg)
  100. ptz_cfg = group.get("ptz")
  101. ptz_url = build_rtsp_url(ptz_cfg) if ptz_cfg else None
  102. group_state.init_group(gid, pano_url, ptz_url, None)
  103. if test_mode:
  104. continue
  105. if SYSTEM_CONFIG.get("enable_panorama_camera", True):
  106. stream_manager.register(f"{gid}_panorama", pano_url)
  107. if ptz_url:
  108. stream_manager.register(f"{gid}_ptz", ptz_url)
  109. uploader = CaptureUploader(
  110. gid,
  111. upload_callback=pusher.report_batch if pusher else None,
  112. oss_uploader=oss_uploader,
  113. )
  114. if SYSTEM_CONFIG.get("enable_detection", True) and detector_service is not None:
  115. def panorama_detect_loop(g=gid, uploader=uploader, detector=detector_service):
  116. interval = 0.5
  117. frame_count = 0
  118. while not stop_event.is_set():
  119. try:
  120. stream = stream_manager.get(f"{g}_panorama")
  121. frame = stream.get_frame() if stream else None
  122. if frame is not None:
  123. frame_count += 1
  124. if frame_count % 10 == 0:
  125. print(f"[detect] {g} frame count: {frame_count}")
  126. dets = detector.detect(frame)
  127. marked = frame.copy()
  128. if dets:
  129. print(f"[detect] {g} found {len(dets)} objects")
  130. det_dicts = [{
  131. "bbox": [d.bbox[0], d.bbox[1], d.bbox[0]+d.bbox[2], d.bbox[1]+d.bbox[3]],
  132. "confidence": d.confidence,
  133. } for d in dets]
  134. for dd in det_dicts:
  135. print(f"[detect] {g} bbox={dd['bbox']} conf={dd['confidence']:.3f}")
  136. uploader.handle_detection("panorama", frame, det_dicts)
  137. for d in det_dicts:
  138. x1, y1, x2, y2 = d["bbox"]
  139. cv2.rectangle(marked, (x1, y1), (x2, y2), (0, 255, 0), 2)
  140. stream.set_marked_frame(marked)
  141. elif stream is None:
  142. err = stream.last_error if stream else "stream not found"
  143. if frame_count == 0:
  144. print(f"[detect] {g} no stream, last_error: {err}")
  145. except Exception as exc:
  146. print(f"[panorama_detect_loop {g}] error: {exc}")
  147. time.sleep(interval)
  148. t_panorama = threading.Thread(target=panorama_detect_loop, daemon=True)
  149. t_panorama.start()
  150. threads.append(t_panorama)
  151. if ptz_url:
  152. def ptz_detect_loop(g=gid, uploader=uploader, detector=detector_service):
  153. interval = 0.5
  154. while not stop_event.is_set():
  155. try:
  156. stream = stream_manager.get(f"{g}_ptz")
  157. frame = stream.get_frame() if stream else None
  158. if frame is not None:
  159. dets = detector.detect(frame)
  160. if dets:
  161. det_dicts = [{
  162. "bbox": [d.bbox[0], d.bbox[1], d.bbox[0]+d.bbox[2], d.bbox[1]+d.bbox[3]],
  163. "confidence": d.confidence,
  164. } for d in dets]
  165. for dd in det_dicts:
  166. print(f"[detect] {g} PTZ bbox={dd['bbox']} conf={dd['confidence']:.3f}")
  167. uploader.handle_detection("ptz", frame, det_dicts)
  168. except Exception as exc:
  169. print(f"[ptz_detect_loop {g}] error: {exc}")
  170. time.sleep(interval)
  171. t_ptz = threading.Thread(target=ptz_detect_loop, daemon=True)
  172. t_ptz.start()
  173. threads.append(t_ptz)
  174. except Exception as exc:
  175. print(f"[lifespan] Group {gid} setup failed: {exc}")
  176. continue
  177. yield
  178. stop_event.set()
  179. for t in threads:
  180. try:
  181. t.join(timeout=2.0)
  182. except Exception as exc:
  183. print(f"[lifespan] Thread join error: {exc}")
  184. stream_manager.stop_all()
  185. if pusher and getattr(pusher, "running", False):
  186. try:
  187. pusher.stop()
  188. except Exception as exc:
  189. print(f"[lifespan] Pusher stop error: {exc}")
  190. for w in cleanup_workers:
  191. try:
  192. w.stop()
  193. except Exception as exc:
  194. print(f"[lifespan] cleanup worker stop error: {exc}")
  195. app = FastAPI(lifespan=lifespan)
  196. @app.get("/")
  197. async def root():
  198. return {"status": "running", "service": "panorama detection"}
  199. @app.get("/debug")
  200. async def debug():
  201. sm = app.state.stream_manager
  202. ps = app.state.pusher
  203. ou = app.state.oss_uploader
  204. ds = app.state.detector_service
  205. info = {}
  206. for g in CAMERA_GROUPS:
  207. if not g.get("enabled", True):
  208. continue
  209. gid = g.get("group_id", g.get("id"))
  210. stream = sm.get(f"{gid}_panorama") if sm else None
  211. if stream:
  212. info[gid] = {
  213. "stream_alive": stream.thread is not None and stream.thread.is_alive(),
  214. "has_frame": stream.latest_frame is not None,
  215. "frame_shape": str(stream.latest_frame.shape) if stream.latest_frame is not None else None,
  216. "last_error": stream.last_error,
  217. }
  218. else:
  219. info[gid] = {"error": "stream not registered"}
  220. info["pusher_running"] = ps.running if ps else False
  221. info["detector"] = ds is not None
  222. return info
  223. return app