| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291 |
- """FastAPI 应用工厂与全局服务初始化."""
- import os
- import tempfile
- import threading
- import time
- from contextlib import asynccontextmanager
- import cv2
- from fastapi import FastAPI
- from fastapi.staticfiles import StaticFiles
- from fastapi.responses import FileResponse
- from config import CAMERA_GROUPS, SDK_PATH, SYSTEM_CONFIG
- from dahua_sdk import DahuaSDK
- from ptz_camera import PTZCamera
- from third_party_pusher import get_third_party_pusher
- from core.stream_manager import StreamManager
- from core.scan_point_store import ScanPointStore
- from core.spatial_scanner import SpatialScanner
- from core.polling_scheduler import PollingScheduler
- from core.capture_uploader import CaptureUploader
- from core.detector_service import DetectorService
- from core.group_state import group_state
- from web.routes import router
- from web.state import WebState
- import web.state as _web_state_module
- def build_rtsp_url(camera_config: dict) -> str:
- if camera_config.get("rtsp_url"):
- return camera_config["rtsp_url"]
- ip = camera_config["ip"]
- port = camera_config.get("rtsp_port", 554)
- username = camera_config["username"]
- password = camera_config["password"]
- channel = camera_config.get("rtsp_channel") or camera_config.get("channel", 1)
- subtype = camera_config.get("subtype", 0)
- return f"rtsp://{username}:{password}@{ip}:{port}/cam/realmonitor?channel={channel}&subtype={subtype}"
- def create_app(test_mode: bool = False) -> FastAPI:
- @asynccontextmanager
- async def lifespan(app: FastAPI):
- # 重置全局状态,允许同一进程内反复创建应用(测试场景)
- group_state.reset()
- stream_manager = StreamManager()
- store_path = os.path.join(tempfile.mkdtemp(), "scan_models.json") if test_mode else "data/scan_models.json"
- scan_store = ScanPointStore(store_path)
- scanners: dict = {}
- schedulers: dict = {}
- ptz_cameras: dict = {}
- threads: list = []
- stop_event = threading.Event()
- pusher = None
- sdk = None
- detector_service = None
- if not test_mode:
- if SYSTEM_CONFIG.get("enable_detection", True):
- detector_service = DetectorService()
- if SYSTEM_CONFIG.get("enable_ptz_camera", True):
- try:
- sdk_path = os.path.join(SDK_PATH["lib_path"], SDK_PATH.get("netsdk", "libdhnetsdk.so"))
- sdk = DahuaSDK(sdk_path)
- sdk.init()
- except Exception as exc:
- print(f"[lifespan] SDK init failed: {exc}")
- sdk = None
- if SYSTEM_CONFIG.get("enable_event_push", False):
- try:
- pusher = get_third_party_pusher()
- if pusher and not pusher.running:
- pusher.start()
- except Exception as exc:
- print(f"[lifespan] Pusher start failed: {exc}")
- pusher = None
- for group in CAMERA_GROUPS:
- if not group.get("enabled", True):
- continue
- gid = group.get("group_id", group.get("id"))
- try:
- ptz_cfg = group["ptz"]
- pano_cfg = group["panorama"]
- pano_url = build_rtsp_url(pano_cfg)
- ptz_url = build_rtsp_url(ptz_cfg)
- group_state.init_group(gid, pano_url, ptz_url, ptz_cfg)
- group_state.update(gid, "ptz_connected", False)
- scan_store.ensure_group(gid, {
- "ptz_name": ptz_cfg.get("name", gid),
- "panorama_name": pano_cfg.get("name", gid),
- })
- if test_mode:
- continue
- # 注册枪机 RTSP 流
- if SYSTEM_CONFIG.get("enable_panorama_camera", True):
- stream_manager.register(f"{gid}_panorama", pano_url)
- ptz = None
- if SYSTEM_CONFIG.get("enable_ptz_camera", True) and sdk is not None:
- # 连接球机
- ptz = PTZCamera(sdk, ptz_cfg)
- try:
- ptz_connected = ptz.connect()
- except Exception as exc:
- print(f"[lifespan] PTZ connect raised for group {gid}: {exc}")
- ptz_connected = False
- if ptz_connected:
- ptz_cameras[gid] = ptz
- group_state.update(gid, "ptz_connected", True)
- # 注册球机 RTSP 流
- stream_manager.register(f"{gid}_ptz", ptz_url)
- scanners[gid] = SpatialScanner(
- gid, ptz, lambda g=gid: stream_manager.get(f"{g}_ptz").get_frame()
- )
- def on_arrived(point, g=gid, ptz=ptz):
- group_state.update(g, "ptz_position", {
- "pan": point["pan"], "tilt": point["tilt"], "zoom": point.get("zoom", 1)
- })
- schedulers[gid] = PollingScheduler(
- gid, ptz,
- get_points=lambda g=gid: scan_store.list_enabled_points(g),
- on_arrived=on_arrived,
- default_dwell=3.0,
- )
- else:
- print(f"[lifespan] PTZ connect failed for group {gid}; skipping PTZ features")
- # 创建上传器(即使球机关闭,枪机检测仍可能使用)
- uploader = CaptureUploader(gid, upload_callback=pusher.report_batch if pusher else None)
- if SYSTEM_CONFIG.get("enable_detection", True) and detector_service is not None:
- def panorama_detect_loop(g=gid, uploader=uploader, detector=detector_service):
- interval = 0.5
- while not stop_event.is_set():
- try:
- stream = stream_manager.get(f"{g}_panorama")
- frame = stream.get_frame() if stream else None
- if frame is not None:
- dets = detector.detect(frame)
- marked = frame.copy()
- if dets:
- det_dicts = [{
- "bbox": [d.bbox[0], d.bbox[1], d.bbox[0]+d.bbox[2], d.bbox[1]+d.bbox[3]],
- "confidence": d.confidence,
- } for d in dets]
- uploader.handle_detection("panorama", frame, det_dicts)
- for d in det_dicts:
- x1, y1, x2, y2 = d["bbox"]
- cv2.rectangle(marked, (x1, y1), (x2, y2), (0, 255, 0), 2)
- stream.set_marked_frame(marked)
- except Exception as exc:
- print(f"[panorama_detect_loop {g}] error: {exc}")
- time.sleep(interval)
- t_panorama = threading.Thread(target=panorama_detect_loop, daemon=True)
- t_panorama.start()
- threads.append(t_panorama)
- if ptz is not None:
- def ptz_detect_loop(g=gid, uploader=uploader, detector=detector_service):
- interval = 0.2
- while not stop_event.is_set():
- try:
- stream = stream_manager.get(f"{g}_ptz")
- frame = stream.get_frame() if stream else None
- if frame is not None:
- dets = detector.detect(frame)
- marked = frame.copy()
- if dets:
- det_dicts = [{
- "bbox": [d.bbox[0], d.bbox[1], d.bbox[0]+d.bbox[2], d.bbox[1]+d.bbox[3]],
- "confidence": d.confidence,
- } for d in dets]
- pos = group_state.get(g).get("ptz_position")
- uploader.handle_detection("ptz", frame, det_dicts, pos)
- for d in det_dicts:
- x1, y1, x2, y2 = d["bbox"]
- cv2.rectangle(marked, (x1, y1), (x2, y2), (0, 255, 0), 2)
- stream.set_marked_frame(marked)
- except Exception as exc:
- print(f"[ptz_detect_loop {g}] error: {exc}")
- time.sleep(interval)
- t_ptz = threading.Thread(target=ptz_detect_loop, daemon=True)
- t_ptz.start()
- threads.append(t_ptz)
- except Exception as exc:
- print(f"[lifespan] Group {gid} setup failed: {exc}")
- continue
- _web_state_module.web_state = WebState(
- group_state, stream_manager, scan_store, scanners, schedulers, ptz_cameras
- )
- yield
- # 清理
- stop_event.set()
- for t in threads:
- try:
- t.join(timeout=2.0)
- except Exception as exc:
- print(f"[lifespan] Thread join error: {exc}")
- for s in schedulers.values():
- try:
- s.stop()
- except Exception as exc:
- print(f"[lifespan] Scheduler stop error: {exc}")
- for gid, ptz in ptz_cameras.items():
- try:
- ptz.disconnect()
- stream = stream_manager.get(f"{gid}_ptz")
- if stream:
- stream.stop()
- except Exception as exc:
- print(f"[lifespan] PTZ cleanup error for {gid}: {exc}")
- stream_manager.stop_all()
- if pusher and getattr(pusher, "running", False):
- try:
- pusher.stop()
- except Exception as exc:
- print(f"[lifespan] Pusher stop error: {exc}")
- if sdk:
- try:
- sdk.cleanup()
- except Exception as exc:
- print(f"[lifespan] SDK cleanup error: {exc}")
- if test_mode:
- # 在测试模式下预先初始化共享状态(不依赖 lifespan,兼容非上下文管理器的 TestClient)
- group_state.reset()
- test_stream_manager = StreamManager()
- test_store_path = os.path.join(tempfile.mkdtemp(), "scan_models.json")
- test_scan_store = ScanPointStore(test_store_path)
- for group in CAMERA_GROUPS:
- if not group.get("enabled", True):
- continue
- gid = group.get("group_id", group.get("id"))
- try:
- ptz_cfg = group["ptz"]
- pano_cfg = group["panorama"]
- pano_url = build_rtsp_url(pano_cfg)
- ptz_url = build_rtsp_url(ptz_cfg)
- group_state.init_group(gid, pano_url, ptz_url, ptz_cfg)
- group_state.update(gid, "ptz_connected", False)
- test_scan_store.ensure_group(gid, {
- "ptz_name": ptz_cfg.get("name", gid),
- "panorama_name": pano_cfg.get("name", gid),
- })
- except Exception as exc:
- print(f"[create_app] Test group {gid} setup failed: {exc}")
- _web_state_module.web_state = WebState(
- group_state, test_stream_manager, test_scan_store, {}, {}, {}
- )
- app = FastAPI(lifespan=lifespan)
- static_dir = os.path.join(os.path.dirname(__file__), "web_static")
- if os.path.isdir(static_dir):
- app.mount("/static", StaticFiles(directory=static_dir), name="static")
- app.include_router(router)
- @app.get("/")
- async def root():
- index_path = os.path.join(static_dir, "index.html")
- if os.path.isfile(index_path):
- return FileResponse(index_path)
- return {"message": "PTZ 360 scan + panorama polling system"}
- return app
|