| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- import sys
- import os
- import threading
- import time
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- import pytest
- from core.group_state import GroupState
def test_init_and_get():
    """Check that a newly initialised group can be read back via ``get``.

    The snapshot is expected to carry the panorama RTSP URL that was passed
    to ``init_group`` and to report ``polling_state`` as ``"idle"``.
    """
    store = GroupState()
    # NOTE(review): reset() right after construction suggests GroupState may
    # share state between instances -- confirm against core.group_state.
    store.reset()
    store.init_group("g1", "rtsp://pano", "rtsp://ptz", {})

    snapshot = store.get("g1")

    assert snapshot["panorama_rtsp"] == "rtsp://pano"
    assert snapshot["polling_state"] == "idle"
def test_update_and_log():
    """Check that ``update`` and ``append_log`` are visible through ``get``.

    After setting ``polling_state`` to ``"polling"`` and appending one log
    line, the snapshot must show exactly that state and that single entry.
    """
    store = GroupState()
    store.reset()
    store.init_group("g1", "rtsp://pano", "rtsp://ptz", {})

    store.update("g1", "polling_state", "polling")
    store.append_log("g1", "started")

    snapshot = store.get("g1")
    assert snapshot["polling_state"] == "polling"
    assert snapshot["logs"] == ["started"]
def test_get_returns_deep_copy():
    """Check that mutating a snapshot from ``get`` does not leak back.

    A nested dict value, the log list and a top-level key are all modified on
    the first snapshot; a second ``get`` must still show the initial values.
    """
    store = GroupState()
    store.reset()
    store.init_group("g1", "rtsp://pano", "rtsp://ptz", {})

    # Tamper with the first snapshot at three different depths/kinds.
    tampered = store.get("g1")
    tampered["ptz_position"]["pan"] = 45.0
    tampered["logs"].append("mutated")
    tampered["polling_state"] = "polling"

    # A fresh snapshot must be unaffected by the edits above.
    fresh = store.get("g1")
    assert fresh["ptz_position"]["pan"] == 0.0
    assert fresh["logs"] == []
    assert fresh["polling_state"] == "idle"
def test_init_group_raises_on_duplicate():
    """Check that initialising the same group id twice raises ``ValueError``."""
    store = GroupState()
    store.reset()
    group_args = ("g1", "rtsp://pano", "rtsp://ptz", {})
    store.init_group(*group_args)

    # The second registration of "g1" is the call expected to fail.
    with pytest.raises(ValueError, match="Group g1 already initialized"):
        store.init_group(*group_args)
def test_update_nested():
    """Check that ``update_nested`` writes a sub-key inside a dict-valued key.

    Two different nested sections are updated and both values must be
    readable from the next snapshot.
    """
    store = GroupState()
    store.reset()
    store.init_group("g1", "rtsp://pano", "rtsp://ptz", {})

    nested_writes = (
        ("ptz_position", "pan", 30.0),
        ("scan_progress", "current", 5),
    )
    for section, field, value in nested_writes:
        store.update_nested("g1", section, field, value)

    snapshot = store.get("g1")
    assert snapshot["ptz_position"]["pan"] == 30.0
    assert snapshot["scan_progress"]["current"] == 5
def test_update_nested_unknown_group():
    """Check that ``update_nested`` on an uninitialised group raises ``KeyError``."""
    store = GroupState()
    store.reset()

    # No init_group call: "unknown" has never been registered.
    expected_message = "Group unknown not initialized"
    with pytest.raises(KeyError, match=expected_message):
        store.update_nested("unknown", "ptz_position", "pan", 10.0)
def test_update_nested_non_dict_key():
    """Check that ``update_nested`` rejects a key whose value is not a dict.

    ``polling_state`` is used as the target key; the call is expected to
    raise ``KeyError`` with a message naming that key.
    """
    store = GroupState()
    store.reset()
    store.init_group("g1", "rtsp://pano", "rtsp://ptz", {})

    expected_message = "Key polling_state is not a dict"
    with pytest.raises(KeyError, match=expected_message):
        store.update_nested("g1", "polling_state", "x", "y")
def test_append_log_caps_at_200():
    """Check that the log list keeps only the most recent 200 entries.

    250 entries (``log-0`` .. ``log-249``) are appended; the snapshot must
    then hold 200 items running from ``log-50`` to ``log-249``.
    """
    store = GroupState()
    store.reset()
    store.init_group("g1", "rtsp://pano", "rtsp://ptz", {})

    total_appended = 250
    for seq in range(total_appended):
        store.append_log("g1", f"log-{seq}")

    logs = store.get("g1")["logs"]
    assert len(logs) == 200
    # The 50 oldest entries are expected to have been dropped.
    assert logs[0] == "log-50"
    assert logs[-1] == "log-249"
def test_unknown_group_warns_on_update_and_append_log(caplog):
    """Check that ``update``/``append_log`` on an unknown group log a warning.

    Neither call is wrapped in ``pytest.raises``, so both are expected to
    return normally; the captured log text must contain one message per call.
    """
    import logging

    store = GroupState()
    store.reset()

    # Capture at WARNING level while touching a group that was never created.
    with caplog.at_level(logging.WARNING):
        store.update("unknown", "polling_state", "polling")
        store.append_log("unknown", "message")

    assert "Cannot update unknown group: unknown" in caplog.text
    assert "Cannot append log to unknown group: unknown" in caplog.text
def test_compare_and_update():
    """Check the compare-and-set behaviour of ``compare_and_update``.

    Covers three cases: the expected value matches (returns ``True`` and the
    value changes), the expected value is stale (returns ``False`` and the
    value is left alone), and the group does not exist (returns ``False``).
    """
    store = GroupState()
    store.reset()
    store.init_group("g1", "rtsp://pano", "rtsp://ptz", {})

    # Current value is "idle", so the swap to "scanning" should succeed.
    swapped = store.compare_and_update("g1", "polling_state", "idle", "scanning")
    assert swapped is True
    assert store.get("g1")["polling_state"] == "scanning"

    # "idle" is now stale; the swap must be refused and the value kept.
    swapped = store.compare_and_update("g1", "polling_state", "idle", "polling")
    assert swapped is False
    assert store.get("g1")["polling_state"] == "scanning"

    # A group that was never initialised also yields False.
    swapped = store.compare_and_update("unknown", "polling_state", "idle", "scanning")
    assert swapped is False
def test_thread_safety():
    """Exercise one group from ten threads and check nothing raises or hangs.

    Each of the 10 workers performs 100 rounds of ``update`` / ``append_log``
    / ``get`` on group ``g1``. Afterwards no worker may have recorded an
    exception, the log list must hold 200 entries (1000 were appended), and
    ``last_detection`` must have been set.

    Workers are daemon threads joined against a shared deadline so that a
    deadlock inside ``GroupState`` fails this test instead of blocking the
    whole test run forever.
    """
    gs = GroupState()
    gs.reset()
    gs.init_group("g1", "rtsp://pano", "rtsp://ptz", {})
    errors = []
    join_timeout_s = 30.0  # generous: the real workload takes well under a second

    def worker(idx):
        # Broad catch is deliberate: any failure in a worker must be carried
        # back to the main thread, where it is asserted on.
        try:
            for i in range(100):
                gs.update("g1", "last_detection", i)
                gs.append_log("g1", f"worker-{idx}-{i}")
                _ = gs.get("g1")
                # Tiny sleep to encourage interleaving between workers.
                time.sleep(0.0001)
        except Exception as e:
            errors.append(e)

    threads = [
        threading.Thread(target=worker, args=(i,), daemon=True)
        for i in range(10)
    ]
    for t in threads:
        t.start()

    # One deadline for all joins, so the worst case is join_timeout_s in
    # total rather than join_timeout_s per thread.
    deadline = time.monotonic() + join_timeout_s
    for t in threads:
        t.join(timeout=max(0.0, deadline - time.monotonic()))

    stuck = [t.name for t in threads if t.is_alive()]
    assert not stuck, f"worker threads did not finish (possible deadlock): {stuck}"
    assert not errors

    state = gs.get("g1")
    assert len(state["logs"]) == 200
    assert state["last_detection"] is not None
|