# test_spatial_scanner.py
  1. import sys
  2. import os
  3. import tempfile
  4. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  5. import numpy as np
  6. import pytest
  7. from core.spatial_scanner import SpatialScanner
  8. from core.coord_utils import compute_sample_grid
  9. class FakePTZ:
  10. def __init__(self):
  11. self.positions = []
  12. def goto_exact_position(self, pan, tilt, zoom):
  13. self.positions.append((pan, tilt, zoom))
  14. def test_spatial_scanner_grid(tmp_path):
  15. ptz = FakePTZ()
  16. counter = {"n": 0}
  17. def frame_source():
  18. counter["n"] += 1
  19. return np.zeros((120, 160, 3), dtype=np.uint8)
  20. scanner = SpatialScanner("g1", ptz, frame_source, str(tmp_path), stabilize_time=0.0)
  21. result = scanner.run(pan_range=(0, 90), tilt_layers=(-10, 0), pan_step=30, zoom=1)
  22. assert len(result["samples"]) == 6
  23. assert result["panorama_path"] is not None
  24. assert len(ptz.positions) == 6
  25. def test_invalid_pan_step_raises_value_error(tmp_path):
  26. ptz = FakePTZ()
  27. scanner = SpatialScanner("g1", ptz, lambda: None, str(tmp_path), stabilize_time=0.0)
  28. with pytest.raises(ValueError, match="pan_step must be positive"):
  29. scanner.run(pan_range=(0, 90), tilt_layers=(-10, 0), pan_step=0)
  30. def test_compute_sample_grid_validation():
  31. with pytest.raises(ValueError, match="pan_step must be positive"):
  32. compute_sample_grid(pan_step=0)
  33. with pytest.raises(ValueError, match="tilt_layers must not be empty"):
  34. compute_sample_grid(tilt_layers=())
  35. with pytest.raises(ValueError, match="pan_range start must be less than end"):
  36. compute_sample_grid(pan_range=(180.0, 180.0))
  37. def test_cancellation_stops_early(tmp_path):
  38. ptz = FakePTZ()
  39. counter = {"n": 0}
  40. def frame_source():
  41. counter["n"] += 1
  42. if counter["n"] == 2:
  43. scanner.cancel()
  44. return np.zeros((120, 160, 3), dtype=np.uint8)
  45. scanner = SpatialScanner("g1", ptz, frame_source, str(tmp_path), stabilize_time=0.0)
  46. result = scanner.run(pan_range=(0, 60), tilt_layers=(-10, 0), pan_step=30, zoom=1)
  47. assert len(result["samples"]) < 4
  48. assert scanner.progress["state"] == "cancelled"
  49. def test_empty_sample_list_returns_no_panorama(tmp_path):
  50. ptz = FakePTZ()
  51. scanner = SpatialScanner("g1", ptz, lambda: None, str(tmp_path), stabilize_time=0.0)
  52. scanner._wait_frame = lambda timeout: None
  53. result = scanner.run(pan_range=(0, 60), tilt_layers=(-10, 0), pan_step=30, zoom=1)
  54. assert result["samples"] == []
  55. assert result["panorama_path"] is None
  56. assert scanner.progress["current"] == 0
  57. def test_progress_callback_invoked(tmp_path):
  58. ptz = FakePTZ()
  59. progress_snapshots = []
  60. def frame_source():
  61. return np.zeros((120, 160, 3), dtype=np.uint8)
  62. def progress_callback(progress):
  63. progress_snapshots.append(dict(progress))
  64. scanner = SpatialScanner("g1", ptz, frame_source, str(tmp_path), stabilize_time=0.0)
  65. result = scanner.run(
  66. pan_range=(0, 60),
  67. tilt_layers=(-10, 0),
  68. pan_step=30,
  69. zoom=1,
  70. progress_callback=progress_callback,
  71. )
  72. expected_samples = len(result["samples"])
  73. assert expected_samples > 0
  74. assert len(progress_snapshots) == expected_samples
  75. assert progress_snapshots[-1]["current"] == expected_samples
  76. assert progress_snapshots[-1]["state"] == "scanning"
  77. def test_prerun_cancellation_returns_empty_result(tmp_path):
  78. ptz = FakePTZ()
  79. scanner = SpatialScanner("g1", ptz, lambda: None, str(tmp_path), stabilize_time=0.0)
  80. scanner.cancel()
  81. result = scanner.run(pan_range=(0, 60), tilt_layers=(-10, 0), pan_step=30, zoom=1)
  82. assert result["samples"] == []
  83. assert result["panorama_path"] is None
  84. assert scanner.progress["state"] == "cancelled"