main.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. #include <iostream>
  2. #include <signal.h>
  3. #include <chrono>
  4. #include <thread>
  5. #include <iomanip>
  6. #include <sstream>
  7. #include <filesystem>
  8. #include "config.h"
  9. #include "scheduler.h"
  10. #include "reporter.h"
  11. #include "http_server.h"
  12. using namespace jtjai_media;
  13. // 全局变量用于信号处理
  14. static std::unique_ptr<StreamScheduler> g_scheduler;
  15. static std::unique_ptr<HttpServer> g_http_server;
  16. static std::atomic<bool> g_interrupted(false);
  17. // 信号处理函数
  18. void signal_handler(int signal) {
  19. std::cout << "\n接收到信号 " << signal << ",正在停止程序..." << std::endl;
  20. g_interrupted.store(true);
  21. if (g_scheduler) {
  22. g_scheduler->stop_execution();
  23. }
  24. if (g_http_server) {
  25. g_http_server->stop();
  26. }
  27. }
  28. // 进度回调函数
  29. void progress_callback(const StreamScheduler::SchedulerStats& stats) {
  30. std::cout << "\r进度: " << stats.completed_tasks << "/" << stats.total_tasks
  31. << " 完成, " << stats.failed_tasks << " 失败, "
  32. << "当前并发: " << stats.max_concurrent_used << " 流" << std::flush;
  33. }
  34. // 生成时间戳字符串
  35. std::string generate_timestamp() {
  36. auto now = std::chrono::system_clock::now();
  37. auto time_t = std::chrono::system_clock::to_time_t(now);
  38. auto tm = *std::localtime(&time_t);
  39. std::stringstream ss;
  40. ss << std::put_time(&tm, "%Y%m%d_%H%M%S");
  41. return ss.str();
  42. }
  43. // 报告生成回调函数
  44. void report_callback(const ConfigManager& config_mgr, int cycle_number,
  45. const std::string& cycle_output_dir,
  46. const std::vector<RTSPClientStats>& client_stats,
  47. const StreamScheduler::SchedulerStats& scheduler_stats,
  48. bool was_timeout = false) {
  49. std::cout << "生成第 " << cycle_number << " 轮执行报告";
  50. if (was_timeout) {
  51. std::cout << " (⚠️ 超时强制结束)";
  52. }
  53. std::cout << "..." << std::endl;
  54. // 为每个周期使用独立的时间戳目录
  55. ConfigManager cycle_config = config_mgr;
  56. auto cycle_global_config = cycle_config.get_global_config();
  57. // 使用传入的时间戳目录
  58. cycle_global_config.output_directory = cycle_output_dir;
  59. cycle_config.set_global_config(cycle_global_config);
  60. ResultReporter cycle_reporter(cycle_config);
  61. if (cycle_reporter.generate_report(client_stats, scheduler_stats)) {
  62. std::cout << "第 " << cycle_number << " 轮报告生成成功";
  63. if (was_timeout) {
  64. std::cout << " (包含超时标记)";
  65. }
  66. std::cout << std::endl;
  67. std::cout << "输出目录: " << cycle_output_dir << std::endl;
  68. } else {
  69. std::cerr << "第 " << cycle_number << " 轮报告生成失败" << std::endl;
  70. }
  71. }
  72. int main(int argc, char* argv[]) {
  73. std::cout << "========================================" << std::endl;
  74. std::cout << "RTSP视频流并发拉取系统" << std::endl;
  75. std::cout << "========================================" << std::endl;
  76. // 注册信号处理器
  77. signal(SIGINT, signal_handler);
  78. signal(SIGTERM, signal_handler);
  79. // 解析命令行参数
  80. std::string config_file = "/Users/wenhongquan/CLionProjects/jtjai_media/config.json";
  81. if (argc > 1) {
  82. config_file = argv[1];
  83. }
  84. std::cout << "使用配置文件: " << config_file << std::endl;
  85. try {
  86. // 加载配置
  87. ConfigManager config_mgr;
  88. if (!config_mgr.load_from_file(config_file)) {
  89. std::cerr << "加载配置文件失败: " << config_file << std::endl;
  90. return 1;
  91. }
  92. std::cout << "配置加载成功" << std::endl;
  93. std::cout << config_mgr.to_string() << std::endl;
  94. const auto& global_config = config_mgr.get_global_config();
  95. // 启动HTTP服务器
  96. std::cout << "\n启动HTTP管理服务器..." << std::endl;
  97. g_http_server = std::make_unique<HttpServer>(global_config.output_directory, 8080);
  98. if (!g_http_server->start()) {
  99. std::cerr << "启动HTTP服务器失败" << std::endl;
  100. return 1;
  101. }
  102. std::cout << "\n🌐 Web管理界面: http://localhost:8080" << std::endl;
  103. std::cout << "📁 文件管理: http://localhost:8080/manager" << std::endl;
  104. std::cout << "📊 API文档: http://localhost:8080/api" << std::endl;
  105. int current_cycle = 0;
  106. // 开始多轮询循环 - 使用定时器强制切换
  107. auto start_time = std::chrono::system_clock::now();
  108. while ((global_config.poll_cycles == -1 || current_cycle < global_config.poll_cycles) && !g_interrupted.load()) {
  109. current_cycle++;
  110. std::cout << "\n========== 轮询周期 " << current_cycle;
  111. if (global_config.poll_cycles != -1) {
  112. std::cout << "/" << global_config.poll_cycles;
  113. } else {
  114. std::cout << " (无限循环)";
  115. }
  116. std::cout << " ==========" << std::endl;
  117. // 计算本轮应该开始的时间
  118. auto cycle_start_time = start_time + std::chrono::seconds((current_cycle - 1) * global_config.total_poll_duration_seconds);
  119. auto now = std::chrono::system_clock::now();
  120. // 如果还没到时间,等待
  121. if (cycle_start_time > now) {
  122. auto wait_duration = std::chrono::duration_cast<std::chrono::seconds>(cycle_start_time - now).count();
  123. std::cout << "等待 " << wait_duration << " 秒后开始本轮任务..." << std::endl;
  124. for (int i = 0; i < wait_duration && !g_interrupted.load(); ++i) {
  125. std::this_thread::sleep_for(std::chrono::seconds(1));
  126. if ((i + 1) % 10 == 0) {
  127. std::cout << "\r剩余等待时间: " << (wait_duration - i - 1) << " 秒" << std::flush;
  128. }
  129. }
  130. std::cout << std::endl;
  131. }
  132. if (g_interrupted.load()) {
  133. break;
  134. }
  135. // 强制停止上一轮的调度器(如果还在运行)
  136. if (g_scheduler && g_scheduler->is_running()) {
  137. std::cout << "⚠️ 强制停止上一轮任务(超时)" << std::endl;
  138. g_scheduler->stop_execution();
  139. g_scheduler->wait_for_completion();
  140. }
  141. // 为当前周期创建时间戳目录
  142. std::string timestamp = generate_timestamp();
  143. std::string base_output_dir = global_config.output_directory;
  144. std::string cycle_output_dir = base_output_dir + "/" + timestamp;
  145. // 创建时间戳目录
  146. std::filesystem::create_directories(cycle_output_dir);
  147. std::cout << "当前周期输出目录: " << cycle_output_dir << std::endl;
  148. std::cout << "⏱️ 本轮最大执行时间: " << global_config.total_poll_duration_seconds << " 秒" << std::endl;
  149. // 更新配置管理器的输出目录
  150. ConfigManager cycle_config_mgr = config_mgr;
  151. auto cycle_global_config = cycle_config_mgr.get_global_config();
  152. cycle_global_config.output_directory = cycle_output_dir;
  153. cycle_config_mgr.set_global_config(cycle_global_config);
  154. // 使用新的配置创建调度器
  155. g_scheduler = std::make_unique<StreamScheduler>(cycle_config_mgr);
  156. g_scheduler->set_progress_callback(progress_callback);
  157. bool timeout_reached = false;
  158. // 设置当前周期的报告回调
  159. g_scheduler->set_report_callback([&config_mgr, current_cycle, cycle_output_dir, &timeout_reached](
  160. const std::vector<RTSPClientStats>& client_stats,
  161. const StreamScheduler::SchedulerStats& scheduler_stats) {
  162. report_callback(config_mgr, current_cycle, cycle_output_dir, client_stats, scheduler_stats, timeout_reached);
  163. });
  164. // 启动调度执行
  165. std::cout << "开始执行RTSP流拉取任务..." << std::endl;
  166. auto task_start_time = std::chrono::system_clock::now();
  167. if (!g_scheduler->start_execution()) {
  168. std::cerr << "启动调度器失败" << std::endl;
  169. return 1;
  170. }
  171. // 等待执行完成或超时
  172. std::cout << "等待任务执行中(最大 " << global_config.total_poll_duration_seconds << " 秒)..." << std::endl;
  173. while (g_scheduler->is_running() && !g_interrupted.load()) {
  174. auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
  175. std::chrono::system_clock::now() - task_start_time).count();
  176. if (elapsed >= global_config.total_poll_duration_seconds) {
  177. std::cout << "\n⏰ 达到最大执行时间(" << global_config.total_poll_duration_seconds << "秒),强制结束本轮任务" << std::endl;
  178. timeout_reached = true;
  179. break;
  180. }
  181. // 显示剩余时间
  182. int remaining = global_config.total_poll_duration_seconds - elapsed;
  183. if (remaining % 10 == 0 && remaining > 0) {
  184. std::cout << "\r剩余执行时间: " << remaining << " 秒" << std::flush;
  185. }
  186. std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  187. }
  188. // 强制停止任务并生成报告
  189. if (timeout_reached || g_scheduler->is_running()) {
  190. std::cout << "\n强制停止调度器..." << std::endl;
  191. g_scheduler->stop_execution();
  192. }
  193. std::cout << "等待所有任务线程结束..." << std::endl;
  194. g_scheduler->wait_for_completion();
  195. // 检查是否需要继续下一轮
  196. if (g_interrupted.load()) {
  197. std::cout << "接收到中断信号,退出轮询" << std::endl;
  198. break;
  199. }
  200. }
  201. if (global_config.poll_cycles != -1) {
  202. std::cout << "\n所有轮询周期已完成" << std::endl;
  203. } else {
  204. std::cout << "\n接收到中断信号,退出轮询" << std::endl;
  205. }
  206. // 保持HTTP服务器运行,即使轮询完成
  207. if (g_http_server && !g_interrupted.load()) {
  208. std::cout << "\n🌐 HTTP管理服务器继续运行..." << std::endl;
  209. std::cout << "📱 您可以继续通过 http://localhost:8080/manager 管理文件" << std::endl;
  210. std::cout << "▶️ 按 Ctrl+C 停止服务器" << std::endl;
  211. // 等待中断信号
  212. while (!g_interrupted.load()) {
  213. std::this_thread::sleep_for(std::chrono::seconds(1));
  214. }
  215. std::cout << "\n正在停止HTTP服务器..." << std::endl;
  216. g_http_server->stop();
  217. }
  218. std::cout << "\nHTTP服务器已停止" << std::endl;
  219. } catch (const std::exception& e) {
  220. std::cerr << "程序执行异常: " << e.what() << std::endl;
  221. return 1;
  222. }
  223. return 0;
  224. }