123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- #ifndef JTJAI_MEDIA_SCHEDULER_H
- #define JTJAI_MEDIA_SCHEDULER_H
#include "config.h"
#include "concurrent_calculator.h"
#include "rtsp_client.h"
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>
- namespace jtjai_media {
- struct TaskInfo {
- int stream_index;
- int start_time; // 相对于调度开始的秒数
- int duration;
- std::shared_ptr<RTSPClient> client;
- std::chrono::system_clock::time_point scheduled_start_time;
- std::atomic<bool> started; // 标记任务是否已经启动
- std::atomic<bool> completed; // 标记任务是否已经完成
-
- TaskInfo(int idx, int start, int dur, std::shared_ptr<RTSPClient> c)
- : stream_index(idx), start_time(start), duration(dur), client(c), started(false), completed(false) {}
-
- // 复制构造函数
- TaskInfo(const TaskInfo& other)
- : stream_index(other.stream_index)
- , start_time(other.start_time)
- , duration(other.duration)
- , client(other.client)
- , scheduled_start_time(other.scheduled_start_time)
- , started(other.started.load())
- , completed(other.completed.load()) {}
- };
- class StreamScheduler {
- public:
- StreamScheduler(const ConfigManager& config_mgr);
- ~StreamScheduler();
-
- // 禁用拷贝构造和赋值
- StreamScheduler(const StreamScheduler&) = delete;
- StreamScheduler& operator=(const StreamScheduler&) = delete;
-
- // 开始调度执行
- bool start_execution();
-
- // 重置调度器状态,准备下一轮执行
- bool reset_for_next_cycle();
-
- // 停止调度执行
- void stop_execution();
-
- // 等待所有任务完成
- void wait_for_completion();
-
- // 获取当前运行状态
- bool is_running() const { return is_running_.load(); }
-
- // 获取所有客户端的统计信息
- std::vector<RTSPClientStats> get_all_stats() const;
-
- // 获取调度统计信息
- struct SchedulerStats {
- int total_tasks;
- int completed_tasks;
- int failed_tasks;
- int cancelled_tasks;
- std::chrono::system_clock::time_point start_time;
- std::chrono::system_clock::time_point end_time;
- int max_concurrent_used;
- double completion_rate;
- };
-
- SchedulerStats get_scheduler_stats() const;
-
- // 设置进度回调
- void set_progress_callback(std::function<void(const SchedulerStats&)> callback) {
- progress_callback_ = callback;
- }
-
- // 设置报告回调(主循环结束时自动调用)
- void set_report_callback(std::function<void(const std::vector<RTSPClientStats>&, const SchedulerStats&)> callback) {
- report_callback_ = callback;
- }
-
- // 获取调度方案信息
- std::string get_schedule_info() const;
- private:
- ConfigManager config_mgr_;
- ConcurrentCalculator calculator_;
- std::vector<TaskInfo> tasks_;
- std::vector<std::shared_ptr<RTSPClient>> clients_;
-
- // 线程控制
- std::atomic<bool> is_running_;
- std::atomic<bool> should_stop_;
- std::unique_ptr<std::thread> scheduler_thread_;
-
- // 同步原语
- mutable std::mutex stats_mutex_;
- mutable std::mutex tasks_mutex_;
- std::condition_variable task_cv_;
-
- // 调度统计
- mutable SchedulerStats scheduler_stats_;
- std::chrono::system_clock::time_point schedule_start_time_;
-
- // 回调函数
- std::function<void(const SchedulerStats&)> progress_callback_;
- std::function<void(const std::vector<RTSPClientStats>&, const SchedulerStats&)> report_callback_;
-
- // 私有方法
- bool create_schedule();
- void scheduler_main_loop();
- void execute_task(std::shared_ptr<TaskInfo> task);
- void update_stats();
- void task_completion_callback(int stream_index, const RTSPClientStats& stats);
-
- // 时间计算辅助方法
- std::chrono::system_clock::time_point calculate_absolute_start_time(int relative_start_time) const;
- bool should_start_task(const TaskInfo& task) const;
- void wait_until_task_start_time(const TaskInfo& task);
- };
- } // namespace jtjai_media
- #endif // JTJAI_MEDIA_SCHEDULER_H
|