test_group_state.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. import sys
  2. import os
  3. import threading
  4. import time
  5. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  6. import pytest
  7. from core.group_state import GroupState
  8. def test_init_and_get():
  9. gs = GroupState()
  10. gs.reset()
  11. gs.init_group("g1", "rtsp://pano", "rtsp://ptz", {})
  12. state = gs.get("g1")
  13. assert state["panorama_rtsp"] == "rtsp://pano"
  14. assert state["polling_state"] == "idle"
  15. def test_update_and_log():
  16. gs = GroupState()
  17. gs.reset()
  18. gs.init_group("g1", "rtsp://pano", "rtsp://ptz", {})
  19. gs.update("g1", "polling_state", "polling")
  20. gs.append_log("g1", "started")
  21. state = gs.get("g1")
  22. assert state["polling_state"] == "polling"
  23. assert state["logs"] == ["started"]
  24. def test_get_returns_deep_copy():
  25. gs = GroupState()
  26. gs.reset()
  27. gs.init_group("g1", "rtsp://pano", "rtsp://ptz", {})
  28. state1 = gs.get("g1")
  29. state1["ptz_position"]["pan"] = 45.0
  30. state1["logs"].append("mutated")
  31. state1["polling_state"] = "polling"
  32. state2 = gs.get("g1")
  33. assert state2["ptz_position"]["pan"] == 0.0
  34. assert state2["logs"] == []
  35. assert state2["polling_state"] == "idle"
  36. def test_init_group_raises_on_duplicate():
  37. gs = GroupState()
  38. gs.reset()
  39. gs.init_group("g1", "rtsp://pano", "rtsp://ptz", {})
  40. with pytest.raises(ValueError, match="Group g1 already initialized"):
  41. gs.init_group("g1", "rtsp://pano", "rtsp://ptz", {})
  42. def test_update_nested():
  43. gs = GroupState()
  44. gs.reset()
  45. gs.init_group("g1", "rtsp://pano", "rtsp://ptz", {})
  46. gs.update_nested("g1", "ptz_position", "pan", 30.0)
  47. gs.update_nested("g1", "scan_progress", "current", 5)
  48. state = gs.get("g1")
  49. assert state["ptz_position"]["pan"] == 30.0
  50. assert state["scan_progress"]["current"] == 5
  51. def test_update_nested_unknown_group():
  52. gs = GroupState()
  53. gs.reset()
  54. with pytest.raises(KeyError, match="Group unknown not initialized"):
  55. gs.update_nested("unknown", "ptz_position", "pan", 10.0)
  56. def test_update_nested_non_dict_key():
  57. gs = GroupState()
  58. gs.reset()
  59. gs.init_group("g1", "rtsp://pano", "rtsp://ptz", {})
  60. with pytest.raises(KeyError, match="Key polling_state is not a dict"):
  61. gs.update_nested("g1", "polling_state", "x", "y")
  62. def test_append_log_caps_at_200():
  63. gs = GroupState()
  64. gs.reset()
  65. gs.init_group("g1", "rtsp://pano", "rtsp://ptz", {})
  66. for i in range(250):
  67. gs.append_log("g1", f"log-{i}")
  68. state = gs.get("g1")
  69. assert len(state["logs"]) == 200
  70. assert state["logs"][0] == "log-50"
  71. assert state["logs"][-1] == "log-249"
  72. def test_unknown_group_warns_on_update_and_append_log(caplog):
  73. import logging
  74. gs = GroupState()
  75. gs.reset()
  76. with caplog.at_level(logging.WARNING):
  77. gs.update("unknown", "polling_state", "polling")
  78. gs.append_log("unknown", "message")
  79. assert "Cannot update unknown group: unknown" in caplog.text
  80. assert "Cannot append log to unknown group: unknown" in caplog.text
  81. def test_compare_and_update():
  82. gs = GroupState()
  83. gs.reset()
  84. gs.init_group("g1", "rtsp://pano", "rtsp://ptz", {})
  85. assert gs.compare_and_update("g1", "polling_state", "idle", "scanning") is True
  86. assert gs.get("g1")["polling_state"] == "scanning"
  87. assert gs.compare_and_update("g1", "polling_state", "idle", "polling") is False
  88. assert gs.get("g1")["polling_state"] == "scanning"
  89. assert gs.compare_and_update("unknown", "polling_state", "idle", "scanning") is False
  90. def test_thread_safety():
  91. gs = GroupState()
  92. gs.reset()
  93. gs.init_group("g1", "rtsp://pano", "rtsp://ptz", {})
  94. errors = []
  95. def worker(idx):
  96. try:
  97. for i in range(100):
  98. gs.update("g1", "last_detection", i)
  99. gs.append_log("g1", f"worker-{idx}-{i}")
  100. _ = gs.get("g1")
  101. time.sleep(0.0001)
  102. except Exception as e:
  103. errors.append(e)
  104. threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
  105. for t in threads:
  106. t.start()
  107. for t in threads:
  108. t.join()
  109. assert not errors
  110. state = gs.get("g1")
  111. assert len(state["logs"]) == 200
  112. assert state["last_detection"] is not None