test_stream_manager.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. import sys
  2. import os
  3. import time
  4. import logging
  5. from unittest.mock import MagicMock, patch
  6. import pytest
  7. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  8. import cv2
  9. import numpy as np
  10. from core.stream_manager import encode_mjpeg_frame, StreamManager, CameraStream, generate_mjpeg_stream
  11. def test_encode_mjpeg_frame():
  12. frame = np.zeros((100, 100, 3), dtype=np.uint8)
  13. data = encode_mjpeg_frame(frame)
  14. assert data[:2] == b"\xff\xd8"
  15. def test_stream_manager_register_and_get():
  16. manager = StreamManager()
  17. fake_frame = np.zeros((100, 100, 3), dtype=np.uint8)
  18. try:
  19. with patch("core.stream_manager.cv2.VideoCapture") as mock_vc:
  20. mock_cap = MagicMock()
  21. mock_cap.isOpened.return_value = True
  22. mock_cap.read.return_value = (True, fake_frame)
  23. mock_cap.set.return_value = True
  24. mock_vc.return_value = mock_cap
  25. stream = manager.register("test", "rtsp://dummy", reconnect_delay=0.01)
  26. assert stream is not None
  27. assert manager.get("test") is stream
  28. finally:
  29. manager.stop_all()
  30. def test_generate_mjpeg_stream_includes_content_length():
  31. frame = np.zeros((100, 100, 3), dtype=np.uint8)
  32. def get_frame():
  33. return frame
  34. chunks = []
  35. for chunk in generate_mjpeg_stream(get_frame, fps=30):
  36. chunks.append(chunk)
  37. if len(chunks) >= 3:
  38. break
  39. for chunk in chunks:
  40. assert b"Content-Length:" in chunk
  41. assert b"Content-Type: image/jpeg" in chunk
  42. def test_stream_manager_stop_all_does_not_crash():
  43. manager = StreamManager()
  44. fake_frame = np.zeros((100, 100, 3), dtype=np.uint8)
  45. try:
  46. with patch("core.stream_manager.cv2.VideoCapture") as mock_vc:
  47. mock_cap = MagicMock()
  48. mock_cap.isOpened.return_value = True
  49. mock_cap.read.return_value = (True, fake_frame)
  50. mock_cap.set.return_value = True
  51. mock_vc.return_value = mock_cap
  52. manager.register("a", "rtsp://dummy_a", reconnect_delay=0.01)
  53. manager.register("b", "rtsp://dummy_b", reconnect_delay=0.01)
  54. # Should stop all streams cleanly even when none have connected.
  55. manager.stop_all()
  56. finally:
  57. manager.stop_all()
  58. assert manager.get("a") is None
  59. assert manager.get("b") is None
  60. def test_camera_stream_last_error_initially_none():
  61. stream = CameraStream("test", "rtsp://dummy")
  62. assert stream.last_error is None
  63. def test_camera_stream_sets_timeout_properties_when_available():
  64. if not hasattr(cv2, "CAP_PROP_OPEN_TIMEOUT_MSEC") or not hasattr(cv2, "CAP_PROP_READ_TIMEOUT_MSEC"):
  65. pytest.skip("OpenCV timeout properties not available on this build")
  66. fake_frame = np.zeros((100, 100, 3), dtype=np.uint8)
  67. mock_cap = MagicMock()
  68. mock_cap.isOpened.return_value = True
  69. mock_cap.read.side_effect = [(True, fake_frame), (True, fake_frame), (False, None)]
  70. mock_cap.set.return_value = True
  71. stream = CameraStream("test", "rtsp://dummy", reconnect_delay=0.01)
  72. try:
  73. with patch("core.stream_manager.cv2.VideoCapture", return_value=mock_cap):
  74. stream.start()
  75. # Give the worker time to create a capture and set properties.
  76. time.sleep(0.1)
  77. finally:
  78. stream.stop()
  79. mock_cap.set.assert_any_call(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 10000)
  80. mock_cap.set.assert_any_call(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 10000)
  81. def test_camera_stream_logs_timeout_set_failure(caplog):
  82. if not hasattr(cv2, "CAP_PROP_OPEN_TIMEOUT_MSEC") or not hasattr(cv2, "CAP_PROP_READ_TIMEOUT_MSEC"):
  83. pytest.skip("OpenCV timeout properties not available on this build")
  84. fake_frame = np.zeros((100, 100, 3), dtype=np.uint8)
  85. mock_cap = MagicMock()
  86. mock_cap.isOpened.return_value = True
  87. mock_cap.read.side_effect = [(True, fake_frame), (False, None)]
  88. mock_cap.set.return_value = False
  89. stream = CameraStream("test", "rtsp://dummy", reconnect_delay=0.01)
  90. with caplog.at_level(logging.WARNING, logger="core.stream_manager"):
  91. try:
  92. with patch("core.stream_manager.cv2.VideoCapture", return_value=mock_cap):
  93. stream.start()
  94. time.sleep(0.1)
  95. finally:
  96. stream.stop()
  97. assert any("CAP_PROP_OPEN_TIMEOUT_MSEC" in rec.message for rec in caplog.records)
  98. assert any("CAP_PROP_READ_TIMEOUT_MSEC" in rec.message for rec in caplog.records)