| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- """FastAPI 应用工厂与全局服务初始化."""
- import logging
- import os
- import threading
- import time
- from contextlib import asynccontextmanager
- import cv2
- from fastapi import FastAPI
- from config import CAMERA_GROUPS, SYSTEM_CONFIG, STORAGE_CONFIG
- from third_party_pusher import get_third_party_pusher
- from core.stream_manager import StreamManager
- from core.capture_uploader import CaptureUploader
- from core.detector_service import DetectorService
- from core.group_state import group_state
- from core.oss_uploader import OSSUploader
- from core.file_cleanup import make_cleanup_workers
# Configure the root logger once, at import time. ``force=True`` removes any
# handlers already attached to the root logger so this format always applies.
logging.basicConfig(
    force=True,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger used by the helpers and callbacks defined in this file.
logger = logging.getLogger(__name__)
def build_rtsp_url(camera_config: dict) -> str:
    """Return the RTSP URL for a single camera configuration.

    If the configuration carries a non-empty ``"rtsp_url"`` it is returned
    verbatim. Otherwise the URL is assembled from the individual fields using
    the ``/cam/realmonitor?channel=...&subtype=...`` path layout.

    Args:
        camera_config: Mapping with the camera settings. Required keys when no
            ``"rtsp_url"`` is given: ``"ip"``, ``"username"``, ``"password"``.
            Optional keys: ``"rtsp_port"`` (default 554), ``"rtsp_channel"`` /
            ``"channel"`` (default 1), ``"subtype"`` (default 0).

    Returns:
        The RTSP URL as a string.

    Raises:
        KeyError: If a required key is missing and no ``"rtsp_url"`` is set.
    """
    # Local import keeps this block self-contained without touching the
    # file-level import section.
    from urllib.parse import quote

    explicit_url = camera_config.get("rtsp_url")
    if explicit_url:
        return explicit_url

    ip = camera_config["ip"]
    port = camera_config.get("rtsp_port", 554)
    # Percent-encode the credentials: characters such as '@', ':' or '/' in a
    # password would otherwise corrupt the userinfo part of the URL.
    # NOTE(review): assumes the config stores raw (not pre-encoded) credentials.
    username = quote(str(camera_config["username"]), safe="")
    password = quote(str(camera_config["password"]), safe="")
    # A falsy "rtsp_channel" (missing, None, 0, "") falls back to "channel".
    channel = camera_config.get("rtsp_channel") or camera_config.get("channel", 1)
    subtype = camera_config.get("subtype", 0)
    return (
        f"rtsp://{username}:{password}@{ip}:{port}"
        f"/cam/realmonitor?channel={channel}&subtype={subtype}"
    )
def _detections_to_dicts(dets) -> list:
    """Convert detector results into plain dicts with corner-format boxes.

    Each result must expose ``bbox`` (indexable with four entries) and
    ``confidence``. The output box is
    ``[bbox[0], bbox[1], bbox[0] + bbox[2], bbox[1] + bbox[3]]``, which the
    drawing code treats as ``x1, y1, x2, y2``.
    """
    return [
        {
            "bbox": [
                d.bbox[0],
                d.bbox[1],
                d.bbox[0] + d.bbox[2],
                d.bbox[1] + d.bbox[3],
            ],
            "confidence": d.confidence,
        }
        for d in dets
    ]


def create_app(test_mode: bool = False) -> FastAPI:
    """Build the FastAPI application and wire its startup/shutdown lifecycle.

    On startup the lifespan handler resets ``group_state``, creates a
    ``StreamManager`` and, unless ``test_mode`` is set, also creates the
    detector, the OSS uploader, the third-party pusher, the file-cleanup
    workers, and one detection thread per enabled camera stream. On shutdown
    it signals the threads to stop and shuts the services down again.

    Args:
        test_mode: When true, only ``group_state`` is initialised for each
            enabled group; no external service, stream or thread is started.

    Returns:
        The configured ``FastAPI`` instance exposing ``/`` and ``/debug``.
    """

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        """Start background services before serving and stop them afterwards."""
        group_state.reset()
        stream_manager = StreamManager()
        threads: list = []
        stop_event = threading.Event()
        pusher = None
        detector_service = None
        oss_uploader = None
        cleanup_workers: list = []

        # Publish handles on app.state so request handlers can reach them.
        app.state.stream_manager = stream_manager
        app.state.pusher = None
        app.state.oss_uploader = None
        app.state.detector_service = None

        # Everything runs inside try/finally so that threads and services that
        # were already started are torn down even when setup fails half-way or
        # an exception is thrown into the lifespan while the app is serving.
        try:
            if not test_mode:
                if SYSTEM_CONFIG.get("enable_detection", True):
                    detector_service = DetectorService()
                    app.state.detector_service = detector_service

                # Third-party push is always attempted here; whether it is
                # actually enabled is decided by its own configuration.
                try:
                    oss_uploader = OSSUploader()
                except Exception as exc:
                    print(f"[lifespan] OSS uploader init failed: {exc}")
                    oss_uploader = None
                # Expose the uploader (or None on failure) to request handlers.
                app.state.oss_uploader = oss_uploader

                try:
                    pusher = get_third_party_pusher()
                    if pusher and not pusher.running:
                        pusher.start()
                    app.state.pusher = pusher
                except Exception as exc:
                    print(f"[lifespan] Pusher start failed: {exc}")
                    pusher = None
                    app.state.pusher = None

                if pusher is not None:

                    def _delete_captures_on_success(report):
                        """Remove local capture files once a report was pushed."""
                        if STORAGE_CONFIG.get("captures", {}).get("keep_local_copy", False):
                            return
                        for path in report.batch_info.get("image_paths") or []:
                            try:
                                if path and os.path.exists(path):
                                    os.remove(path)
                                    logger.info("[cleanup] 上报成功后删除本地图片: %s", path)
                            except Exception as exc:
                                logger.warning("[cleanup] 删除本地图片失败: %s, %s", path, exc)

                    try:
                        pusher.set_callbacks(on_success=_delete_captures_on_success)
                    except Exception as exc:
                        print(f"[lifespan] set pusher success callback failed: {exc}")

                try:
                    group_ids = [
                        g.get("group_id", g.get("id"))
                        for g in CAMERA_GROUPS
                        if g.get("enabled", True)
                    ]
                    cleanup_workers = make_cleanup_workers(STORAGE_CONFIG, group_ids)
                    for w in cleanup_workers:
                        w.start()
                except Exception as exc:
                    print(f"[lifespan] cleanup worker start failed: {exc}")

            for group in CAMERA_GROUPS:
                if not group.get("enabled", True):
                    continue
                gid = group.get("group_id", group.get("id"))
                try:
                    pano_cfg = group["panorama"]
                    pano_url = build_rtsp_url(pano_cfg)
                    ptz_cfg = group.get("ptz")
                    ptz_url = build_rtsp_url(ptz_cfg) if ptz_cfg else None
                    group_state.init_group(gid, pano_url, ptz_url, None)

                    if test_mode:
                        continue

                    if SYSTEM_CONFIG.get("enable_panorama_camera", True):
                        stream_manager.register(f"{gid}_panorama", pano_url)
                    if ptz_url:
                        stream_manager.register(f"{gid}_ptz", ptz_url)

                    uploader = CaptureUploader(
                        gid,
                        upload_callback=pusher.report_batch if pusher else None,
                        oss_uploader=oss_uploader,
                    )

                    if SYSTEM_CONFIG.get("enable_detection", True) and detector_service is not None:

                        # Loop variables are bound as defaults so each thread
                        # keeps its own group id / uploader (no late binding).
                        def panorama_detect_loop(g=gid, uploader=uploader, detector=detector_service):
                            """Poll the panorama stream, detect, upload and annotate."""
                            interval = 0.5
                            frame_count = 0
                            while not stop_event.is_set():
                                try:
                                    stream = stream_manager.get(f"{g}_panorama")
                                    frame = stream.get_frame() if stream else None
                                    if frame is not None:
                                        frame_count += 1
                                        if frame_count % 10 == 0:
                                            print(f"[detect] {g} frame count: {frame_count}")
                                        dets = detector.detect(frame)
                                        marked = frame.copy()
                                        if dets:
                                            print(f"[detect] {g} found {len(dets)} objects")
                                            det_dicts = _detections_to_dicts(dets)
                                            for dd in det_dicts:
                                                print(f"[detect] {g} bbox={dd['bbox']} conf={dd['confidence']:.3f}")
                                            uploader.handle_detection("panorama", frame, det_dicts)
                                            for d in det_dicts:
                                                x1, y1, x2, y2 = d["bbox"]
                                                # cv2 needs integer pixel coordinates;
                                                # the cast is a no-op for int boxes.
                                                cv2.rectangle(
                                                    marked,
                                                    (int(x1), int(y1)),
                                                    (int(x2), int(y2)),
                                                    (0, 255, 0),
                                                    2,
                                                )
                                        stream.set_marked_frame(marked)
                                    else:
                                        # Report while no frame has ever arrived,
                                        # whether the stream is missing or is
                                        # registered but not producing frames.
                                        if frame_count == 0:
                                            err = stream.last_error if stream else "stream not found"
                                            print(f"[detect] {g} no stream, last_error: {err}")
                                except Exception as exc:
                                    print(f"[panorama_detect_loop {g}] error: {exc}")
                                time.sleep(interval)

                        t_panorama = threading.Thread(target=panorama_detect_loop, daemon=True)
                        t_panorama.start()
                        threads.append(t_panorama)

                        if ptz_url:

                            def ptz_detect_loop(g=gid, uploader=uploader, detector=detector_service):
                                """Poll the PTZ stream, detect and upload."""
                                interval = 0.5
                                while not stop_event.is_set():
                                    try:
                                        stream = stream_manager.get(f"{g}_ptz")
                                        frame = stream.get_frame() if stream else None
                                        if frame is not None:
                                            dets = detector.detect(frame)
                                            if dets:
                                                det_dicts = _detections_to_dicts(dets)
                                                for dd in det_dicts:
                                                    print(f"[detect] {g} PTZ bbox={dd['bbox']} conf={dd['confidence']:.3f}")
                                                uploader.handle_detection("ptz", frame, det_dicts)
                                    except Exception as exc:
                                        print(f"[ptz_detect_loop {g}] error: {exc}")
                                    time.sleep(interval)

                            t_ptz = threading.Thread(target=ptz_detect_loop, daemon=True)
                            t_ptz.start()
                            threads.append(t_ptz)
                except Exception as exc:
                    # One broken group must not prevent the others from starting.
                    print(f"[lifespan] Group {gid} setup failed: {exc}")
                    continue

            yield
        finally:
            # Ask the detection loops to exit, then give each a short grace
            # period; they are daemon threads, so a stuck one cannot block exit.
            stop_event.set()
            for t in threads:
                try:
                    t.join(timeout=2.0)
                except Exception as exc:
                    print(f"[lifespan] Thread join error: {exc}")

            # Guarded so a failure here does not skip the remaining shutdown.
            try:
                stream_manager.stop_all()
            except Exception as exc:
                print(f"[lifespan] stream manager stop error: {exc}")

            if pusher and getattr(pusher, "running", False):
                try:
                    pusher.stop()
                except Exception as exc:
                    print(f"[lifespan] Pusher stop error: {exc}")

            for w in cleanup_workers:
                try:
                    w.stop()
                except Exception as exc:
                    print(f"[lifespan] cleanup worker stop error: {exc}")

    app = FastAPI(lifespan=lifespan)

    @app.get("/")
    async def root():
        """Liveness endpoint returning a static status payload."""
        return {"status": "running", "service": "panorama detection"}

    @app.get("/debug")
    async def debug():
        """Report per-group panorama stream status plus pusher/detector state."""
        # getattr with a default: these attributes only exist after the
        # lifespan startup has run.
        sm = getattr(app.state, "stream_manager", None)
        ps = getattr(app.state, "pusher", None)
        ds = getattr(app.state, "detector_service", None)

        info = {}
        for g in CAMERA_GROUPS:
            if not g.get("enabled", True):
                continue
            gid = g.get("group_id", g.get("id"))
            stream = sm.get(f"{gid}_panorama") if sm else None
            if stream:
                # Read the frame once so both fields describe the same
                # snapshot even if it is replaced concurrently.
                latest = stream.latest_frame
                info[gid] = {
                    "stream_alive": stream.thread is not None and stream.thread.is_alive(),
                    "has_frame": latest is not None,
                    "frame_shape": str(latest.shape) if latest is not None else None,
                    "last_error": stream.last_error,
                }
            else:
                info[gid] = {"error": "stream not registered"}

        info["pusher_running"] = ps.running if ps else False
        info["detector"] = ds is not None
        return info

    return app
|