| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- import sys
- import os
- import time
- import logging
- from unittest.mock import MagicMock, patch
- import pytest
- sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- import cv2
- import numpy as np
- from core.stream_manager import encode_mjpeg_frame, StreamManager, CameraStream, generate_mjpeg_stream
def test_encode_mjpeg_frame():
    """Encoding an all-zero 100x100x3 uint8 frame yields data that opens with the JPEG SOI marker."""
    jpeg_start_marker = b"\xff\xd8"
    black_image = np.zeros((100, 100, 3), dtype=np.uint8)

    encoded = encode_mjpeg_frame(black_image)

    assert encoded[:2] == jpeg_start_marker
def test_stream_manager_register_and_get():
    """A stream returned by register() is the same object get() returns for that name."""
    blank = np.zeros((100, 100, 3), dtype=np.uint8)

    # Stand-in for cv2.VideoCapture: reports itself open and always reads one frame.
    capture = MagicMock()
    capture.isOpened.return_value = True
    capture.read.return_value = (True, blank)
    capture.set.return_value = True

    manager = StreamManager()
    try:
        with patch("core.stream_manager.cv2.VideoCapture", return_value=capture):
            registered = manager.register("test", "rtsp://dummy", reconnect_delay=0.01)
            assert registered is not None
            assert manager.get("test") is registered
    finally:
        # Always tear the manager down, even when an assertion above fails.
        manager.stop_all()
def test_generate_mjpeg_stream_includes_content_length():
    """Every chunk from generate_mjpeg_stream carries Content-Length and JPEG Content-Type headers.

    Only the first few chunks are collected; the loop stops pulling from the
    generator as soon as enough have been gathered.
    """
    wanted_chunks = 3
    frame = np.zeros((100, 100, 3), dtype=np.uint8)

    def get_frame():
        return frame

    chunks = []
    for chunk in generate_mjpeg_stream(get_frame, fps=30):
        chunks.append(chunk)
        if len(chunks) >= wanted_chunks:
            break

    # Guard against a vacuous pass: with no chunks the header checks below
    # would never execute and the test would succeed without verifying anything.
    assert chunks, "generate_mjpeg_stream yielded no chunks"

    for chunk in chunks:
        assert b"Content-Length:" in chunk
        assert b"Content-Type: image/jpeg" in chunk
def test_stream_manager_stop_all_does_not_crash():
    """stop_all() completes with two registered streams and leaves neither retrievable."""
    blank = np.zeros((100, 100, 3), dtype=np.uint8)

    # Stand-in for cv2.VideoCapture: reports itself open and always reads one frame.
    capture = MagicMock()
    capture.isOpened.return_value = True
    capture.read.return_value = (True, blank)
    capture.set.return_value = True

    sources = (("a", "rtsp://dummy_a"), ("b", "rtsp://dummy_b"))
    manager = StreamManager()
    try:
        with patch("core.stream_manager.cv2.VideoCapture", return_value=capture):
            for name, url in sources:
                manager.register(name, url, reconnect_delay=0.01)
            # Should stop all streams cleanly even when none have connected.
            manager.stop_all()
    finally:
        # Second call is cleanup in case registration raised before the first one ran.
        manager.stop_all()

    for name, _ in sources:
        assert manager.get(name) is None
def test_camera_stream_last_error_initially_none():
    """A CameraStream that was constructed but never started has last_error set to None."""
    unstarted = CameraStream("test", "rtsp://dummy")

    assert unstarted.last_error is None
def test_camera_stream_sets_timeout_properties_when_available():
    """A started CameraStream sets the open and read timeout properties to 10000 on its capture.

    Skipped when the installed OpenCV build does not expose the timeout
    property constants.
    """
    if not hasattr(cv2, "CAP_PROP_OPEN_TIMEOUT_MSEC") or not hasattr(cv2, "CAP_PROP_READ_TIMEOUT_MSEC"):
        pytest.skip("OpenCV timeout properties not available on this build")

    fake_frame = np.zeros((100, 100, 3), dtype=np.uint8)
    mock_cap = MagicMock()
    mock_cap.isOpened.return_value = True
    mock_cap.read.side_effect = [(True, fake_frame), (True, fake_frame), (False, None)]
    mock_cap.set.return_value = True

    expected_calls = [
        (cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 10000),
        (cv2.CAP_PROP_READ_TIMEOUT_MSEC, 10000),
    ]

    stream = CameraStream("test", "rtsp://dummy", reconnect_delay=0.01)
    try:
        with patch("core.stream_manager.cv2.VideoCapture", return_value=mock_cap):
            stream.start()
            # Poll with a deadline instead of a single fixed sleep: the test
            # returns as soon as both properties have been set and tolerates a
            # slow worker start-up on loaded machines.
            deadline = time.monotonic() + 2.0
            while time.monotonic() < deadline:
                # Snapshot the list; another thread may append while we scan.
                seen = [recorded.args for recorded in list(mock_cap.set.call_args_list)]
                if all(expected in seen for expected in expected_calls):
                    break
                time.sleep(0.01)
    finally:
        stream.stop()

    mock_cap.set.assert_any_call(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 10000)
    mock_cap.set.assert_any_call(cv2.CAP_PROP_READ_TIMEOUT_MSEC, 10000)
def test_camera_stream_logs_timeout_set_failure(caplog):
    """When capture.set() returns False, a WARNING-or-higher record names each timeout property.

    Skipped when the installed OpenCV build does not expose the timeout
    property constants.

    Args:
        caplog: pytest's log-capture fixture.
    """
    if not hasattr(cv2, "CAP_PROP_OPEN_TIMEOUT_MSEC") or not hasattr(cv2, "CAP_PROP_READ_TIMEOUT_MSEC"):
        pytest.skip("OpenCV timeout properties not available on this build")

    fake_frame = np.zeros((100, 100, 3), dtype=np.uint8)
    mock_cap = MagicMock()
    mock_cap.isOpened.return_value = True
    mock_cap.read.side_effect = [(True, fake_frame), (False, None)]
    # Every property set is reported as rejected by the backend.
    mock_cap.set.return_value = False

    expected_names = ("CAP_PROP_OPEN_TIMEOUT_MSEC", "CAP_PROP_READ_TIMEOUT_MSEC")

    def captured_messages():
        # getMessage() is used rather than .message because the capture
        # handler stores a record before formatting assigns .message, so the
        # attribute may be missing while the worker thread is still logging.
        return [rec.getMessage() for rec in list(caplog.records)]

    stream = CameraStream("test", "rtsp://dummy", reconnect_delay=0.01)
    with caplog.at_level(logging.WARNING, logger="core.stream_manager"):
        try:
            with patch("core.stream_manager.cv2.VideoCapture", return_value=mock_cap):
                stream.start()
                # Poll with a deadline instead of a single fixed sleep so the
                # test neither waits longer than needed nor fails on a slow
                # worker start-up.
                deadline = time.monotonic() + 2.0
                while time.monotonic() < deadline:
                    messages = captured_messages()
                    if all(any(name in text for text in messages) for name in expected_names):
                        break
                    time.sleep(0.01)
        finally:
            stream.stop()

    final_messages = captured_messages()
    assert any("CAP_PROP_OPEN_TIMEOUT_MSEC" in text for text in final_messages)
    assert any("CAP_PROP_READ_TIMEOUT_MSEC" in text for text in final_messages)
|