scheduler.h 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. #ifndef JTJAI_MEDIA_SCHEDULER_H
  2. #define JTJAI_MEDIA_SCHEDULER_H
  3. #include "config.h"
  4. #include "concurrent_calculator.h"
  5. #include "rtsp_client.h"
  6. #include <vector>
  7. #include <memory>
  8. #include <thread>
  9. #include <mutex>
  10. #include <condition_variable>
  11. #include <atomic>
  12. #include <queue>
  13. #include <functional>
  14. namespace jtjai_media {
  15. struct TaskInfo {
  16. int stream_index;
  17. int start_time; // 相对于调度开始的秒数
  18. int duration;
  19. std::shared_ptr<RTSPClient> client;
  20. std::chrono::system_clock::time_point scheduled_start_time;
  21. std::atomic<bool> started; // 标记任务是否已经启动
  22. std::atomic<bool> completed; // 标记任务是否已经完成
  23. TaskInfo(int idx, int start, int dur, std::shared_ptr<RTSPClient> c)
  24. : stream_index(idx), start_time(start), duration(dur), client(c), started(false), completed(false) {}
  25. // 复制构造函数
  26. TaskInfo(const TaskInfo& other)
  27. : stream_index(other.stream_index)
  28. , start_time(other.start_time)
  29. , duration(other.duration)
  30. , client(other.client)
  31. , scheduled_start_time(other.scheduled_start_time)
  32. , started(other.started.load())
  33. , completed(other.completed.load()) {}
  34. };
  35. class StreamScheduler {
  36. public:
  37. StreamScheduler(const ConfigManager& config_mgr);
  38. ~StreamScheduler();
  39. // 禁用拷贝构造和赋值
  40. StreamScheduler(const StreamScheduler&) = delete;
  41. StreamScheduler& operator=(const StreamScheduler&) = delete;
  42. // 开始调度执行
  43. bool start_execution();
  44. // 重置调度器状态,准备下一轮执行
  45. bool reset_for_next_cycle();
  46. // 停止调度执行
  47. void stop_execution();
  48. // 等待所有任务完成
  49. void wait_for_completion();
  50. // 获取当前运行状态
  51. bool is_running() const { return is_running_.load(); }
  52. // 获取所有客户端的统计信息
  53. std::vector<RTSPClientStats> get_all_stats() const;
  54. // 获取调度统计信息
  55. struct SchedulerStats {
  56. int total_tasks;
  57. int completed_tasks;
  58. int failed_tasks;
  59. int cancelled_tasks;
  60. std::chrono::system_clock::time_point start_time;
  61. std::chrono::system_clock::time_point end_time;
  62. int max_concurrent_used;
  63. double completion_rate;
  64. };
  65. SchedulerStats get_scheduler_stats() const;
  66. // 设置进度回调
  67. void set_progress_callback(std::function<void(const SchedulerStats&)> callback) {
  68. progress_callback_ = callback;
  69. }
  70. // 设置报告回调(主循环结束时自动调用)
  71. void set_report_callback(std::function<void(const std::vector<RTSPClientStats>&, const SchedulerStats&)> callback) {
  72. report_callback_ = callback;
  73. }
  74. // 获取调度方案信息
  75. std::string get_schedule_info() const;
  76. private:
  77. ConfigManager config_mgr_;
  78. ConcurrentCalculator calculator_;
  79. std::vector<TaskInfo> tasks_;
  80. std::vector<std::shared_ptr<RTSPClient>> clients_;
  81. // 线程控制
  82. std::atomic<bool> is_running_;
  83. std::atomic<bool> should_stop_;
  84. std::unique_ptr<std::thread> scheduler_thread_;
  85. // 同步原语
  86. mutable std::mutex stats_mutex_;
  87. mutable std::mutex tasks_mutex_;
  88. std::condition_variable task_cv_;
  89. // 调度统计
  90. mutable SchedulerStats scheduler_stats_;
  91. std::chrono::system_clock::time_point schedule_start_time_;
  92. // 回调函数
  93. std::function<void(const SchedulerStats&)> progress_callback_;
  94. std::function<void(const std::vector<RTSPClientStats>&, const SchedulerStats&)> report_callback_;
  95. // 私有方法
  96. bool create_schedule();
  97. void scheduler_main_loop();
  98. void execute_task(std::shared_ptr<TaskInfo> task);
  99. void update_stats();
  100. void task_completion_callback(int stream_index, const RTSPClientStats& stats);
  101. // 时间计算辅助方法
  102. std::chrono::system_clock::time_point calculate_absolute_start_time(int relative_start_time) const;
  103. bool should_start_task(const TaskInfo& task) const;
  104. void wait_until_task_start_time(const TaskInfo& task);
  105. };
  106. } // namespace jtjai_media
  107. #endif // JTJAI_MEDIA_SCHEDULER_H