main.cpp 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. #include <iostream>
  2. #include <signal.h>
  3. #include <chrono>
  4. #include <thread>
  5. #include "config.h"
  6. #include "scheduler.h"
  7. #include "reporter.h"
  8. using namespace jtjai_media;
  9. // 全局变量用于信号处理
  10. static std::unique_ptr<StreamScheduler> g_scheduler;
  11. static std::atomic<bool> g_interrupted(false);
  12. // 信号处理函数
  13. void signal_handler(int signal) {
  14. std::cout << "\n接收到信号 " << signal << ",正在停止程序..." << std::endl;
  15. g_interrupted.store(true);
  16. if (g_scheduler) {
  17. g_scheduler->stop_execution();
  18. }
  19. }
  20. // 进度回调函数
  21. void progress_callback(const StreamScheduler::SchedulerStats& stats) {
  22. std::cout << "\r进度: " << stats.completed_tasks << "/" << stats.total_tasks
  23. << " 完成, " << stats.failed_tasks << " 失败, "
  24. << "当前并发: " << stats.max_concurrent_used << " 流" << std::flush;
  25. }
  26. // 报告生成回调函数
  27. void report_callback(const ConfigManager& config_mgr, int cycle_number,
  28. const std::vector<RTSPClientStats>& client_stats,
  29. const StreamScheduler::SchedulerStats& scheduler_stats) {
  30. std::cout << "生成第 " << cycle_number << " 轮执行报告..." << std::endl;
  31. // 为每个周期生成不同的报告文件
  32. ConfigManager cycle_config = config_mgr;
  33. auto cycle_global_config = cycle_config.get_global_config();
  34. // 修改报告文件名,包含周期编号
  35. std::string base_name = cycle_global_config.report_filename;
  36. size_t dot_pos = base_name.find_last_of('.');
  37. if (dot_pos != std::string::npos) {
  38. base_name = base_name.substr(0, dot_pos) + "_cycle" + std::to_string(cycle_number) + base_name.substr(dot_pos);
  39. } else {
  40. base_name = base_name + "_cycle" + std::to_string(cycle_number);
  41. }
  42. cycle_global_config.report_filename = base_name;
  43. cycle_config.set_global_config(cycle_global_config);
  44. ResultReporter cycle_reporter(cycle_config);
  45. if (cycle_reporter.generate_report(client_stats, scheduler_stats)) {
  46. std::cout << "第 " << cycle_number << " 轮报告生成成功" << std::endl;
  47. } else {
  48. std::cerr << "第 " << cycle_number << " 轮报告生成失败" << std::endl;
  49. }
  50. }
  51. int main(int argc, char* argv[]) {
  52. std::cout << "========================================" << std::endl;
  53. std::cout << "RTSP视频流并发拉取系统" << std::endl;
  54. std::cout << "========================================" << std::endl;
  55. // 注册信号处理器
  56. signal(SIGINT, signal_handler);
  57. signal(SIGTERM, signal_handler);
  58. // 解析命令行参数
  59. std::string config_file = "/Users/wenhongquan/CLionProjects/jtjai_media/config.json";
  60. if (argc > 1) {
  61. config_file = argv[1];
  62. }
  63. std::cout << "使用配置文件: " << config_file << std::endl;
  64. try {
  65. // 加载配置
  66. ConfigManager config_mgr;
  67. if (!config_mgr.load_from_file(config_file)) {
  68. std::cerr << "加载配置文件失败: " << config_file << std::endl;
  69. return 1;
  70. }
  71. std::cout << "配置加载成功" << std::endl;
  72. std::cout << config_mgr.to_string() << std::endl;
  73. // 创建调度器
  74. g_scheduler = std::make_unique<StreamScheduler>(config_mgr);
  75. // 设置进度回调
  76. g_scheduler->set_progress_callback(progress_callback);
  77. const auto& global_config = config_mgr.get_global_config();
  78. int current_cycle = 0;
  79. // 开始多轮询循环
  80. while ((global_config.poll_cycles == -1 || current_cycle < global_config.poll_cycles) && !g_interrupted.load()) {
  81. current_cycle++;
  82. std::cout << "\n========== 轮询周期 " << current_cycle;
  83. if (global_config.poll_cycles != -1) {
  84. std::cout << "/" << global_config.poll_cycles;
  85. }
  86. std::cout << " ==========" << std::endl;
  87. // 设置当前周期的报告回调
  88. g_scheduler->set_report_callback([&config_mgr, current_cycle](
  89. const std::vector<RTSPClientStats>& client_stats,
  90. const StreamScheduler::SchedulerStats& scheduler_stats) {
  91. report_callback(config_mgr, current_cycle, client_stats, scheduler_stats);
  92. });
  93. // 启动调度执行
  94. std::cout << "开始执行RTSP流拉取任务..." << std::endl;
  95. if (!g_scheduler->start_execution()) {
  96. std::cerr << "启动调度器失败" << std::endl;
  97. return 1;
  98. }
  99. // 等待执行完成或中断
  100. std::cout << "等待任务执行中..." << std::endl;
  101. while (g_scheduler->is_running() && !g_interrupted.load()) {
  102. std::this_thread::sleep_for(std::chrono::milliseconds(500));
  103. }
  104. std::cout << "任务执行完成,等待所有任务线程结束..." << std::endl;
  105. g_scheduler->wait_for_completion();
  106. // 检查是否需要继续下一轮
  107. if (g_interrupted.load()) {
  108. std::cout << "接收到中断信号,退出轮询" << std::endl;
  109. break;
  110. }
  111. if (global_config.poll_cycles == -1 || current_cycle < global_config.poll_cycles) {
  112. std::cout << "\n等待 " << global_config.cycle_interval_seconds << " 秒后开始下一轮询..." << std::endl;
  113. // 在等待期间检查中断信号
  114. for (int i = 0; i < global_config.cycle_interval_seconds && !g_interrupted.load(); ++i) {
  115. std::this_thread::sleep_for(std::chrono::seconds(1));
  116. if ((i + 1) % 10 == 0) {
  117. std::cout << "\r剩余等待时间: " << (global_config.cycle_interval_seconds - i - 1) << " 秒" << std::flush;
  118. }
  119. }
  120. std::cout << std::endl;
  121. if (!g_interrupted.load()) {
  122. // 重置调度器状态,准备下一轮
  123. if (!g_scheduler->reset_for_next_cycle()) {
  124. std::cerr << "重置调度器失败" << std::endl;
  125. break;
  126. }
  127. }
  128. }
  129. }
  130. std::cout << "\n程序执行完成" << std::endl;
  131. } catch (const std::exception& e) {
  132. std::cerr << "程序执行异常: " << e.what() << std::endl;
  133. return 1;
  134. }
  135. return 0;
  136. }