123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- #include <iostream>
- #include <signal.h>
- #include <chrono>
- #include <thread>
- #include <iomanip>
- #include <sstream>
- #include <filesystem>
- #include "config.h"
- #include "scheduler.h"
- #include "reporter.h"
- #include "http_server.h"
- using namespace jtjai_media;
- // 全局变量用于信号处理
- static std::unique_ptr<StreamScheduler> g_scheduler;
- static std::unique_ptr<HttpServer> g_http_server;
- static std::atomic<bool> g_interrupted(false);
- // 信号处理函数
- void signal_handler(int signal) {
- std::cout << "\n接收到信号 " << signal << ",正在停止程序..." << std::endl;
- g_interrupted.store(true);
- if (g_scheduler) {
- g_scheduler->stop_execution();
- }
- if (g_http_server) {
- g_http_server->stop();
- }
- }
- // 进度回调函数
- void progress_callback(const StreamScheduler::SchedulerStats& stats) {
- std::cout << "\r进度: " << stats.completed_tasks << "/" << stats.total_tasks
- << " 完成, " << stats.failed_tasks << " 失败, "
- << "当前并发: " << stats.max_concurrent_used << " 流" << std::flush;
- }
- // 生成时间戳字符串
- std::string generate_timestamp() {
- auto now = std::chrono::system_clock::now();
- auto time_t = std::chrono::system_clock::to_time_t(now);
- auto tm = *std::localtime(&time_t);
-
- std::stringstream ss;
- ss << std::put_time(&tm, "%Y%m%d_%H%M%S");
- return ss.str();
- }
- // 报告生成回调函数
- void report_callback(const ConfigManager& config_mgr, int cycle_number,
- const std::string& cycle_output_dir,
- const std::vector<RTSPClientStats>& client_stats,
- const StreamScheduler::SchedulerStats& scheduler_stats) {
- std::cout << "生成第 " << cycle_number << " 轮执行报告..." << std::endl;
-
- // 为每个周期使用独立的时间戳目录
- ConfigManager cycle_config = config_mgr;
- auto cycle_global_config = cycle_config.get_global_config();
-
- // 使用传入的时间戳目录
- cycle_global_config.output_directory = cycle_output_dir;
- cycle_config.set_global_config(cycle_global_config);
-
- ResultReporter cycle_reporter(cycle_config);
- if (cycle_reporter.generate_report(client_stats, scheduler_stats)) {
- std::cout << "第 " << cycle_number << " 轮报告生成成功" << std::endl;
- std::cout << "输出目录: " << cycle_output_dir << std::endl;
- } else {
- std::cerr << "第 " << cycle_number << " 轮报告生成失败" << std::endl;
- }
- }
- int main(int argc, char* argv[]) {
- std::cout << "========================================" << std::endl;
- std::cout << "RTSP视频流并发拉取系统" << std::endl;
- std::cout << "========================================" << std::endl;
-
- // 注册信号处理器
- signal(SIGINT, signal_handler);
- signal(SIGTERM, signal_handler);
-
- // 解析命令行参数
- std::string config_file = "/Users/wenhongquan/CLionProjects/jtjai_media/config.json";
- if (argc > 1) {
- config_file = argv[1];
- }
-
- std::cout << "使用配置文件: " << config_file << std::endl;
-
- try {
- // 加载配置
- ConfigManager config_mgr;
- if (!config_mgr.load_from_file(config_file)) {
- std::cerr << "加载配置文件失败: " << config_file << std::endl;
- return 1;
- }
-
- std::cout << "配置加载成功" << std::endl;
- std::cout << config_mgr.to_string() << std::endl;
-
- const auto& global_config = config_mgr.get_global_config();
-
- // 启动HTTP服务器
- std::cout << "\n启动HTTP管理服务器..." << std::endl;
- g_http_server = std::make_unique<HttpServer>(global_config.output_directory, 8080);
- if (!g_http_server->start()) {
- std::cerr << "启动HTTP服务器失败" << std::endl;
- return 1;
- }
-
- std::cout << "\n🌐 Web管理界面: http://localhost:8080" << std::endl;
- std::cout << "📁 文件管理: http://localhost:8080/manager" << std::endl;
- std::cout << "📊 API文档: http://localhost:8080/api" << std::endl;
-
- int current_cycle = 0;
-
- // 开始多轮询循环
- while ((global_config.poll_cycles == -1 || current_cycle < global_config.poll_cycles) && !g_interrupted.load()) {
- current_cycle++;
- std::cout << "\n========== 轮询周期 " << current_cycle;
- if (global_config.poll_cycles != -1) {
- std::cout << "/" << global_config.poll_cycles;
- }
- std::cout << " ==========" << std::endl;
-
- // 为当前周期创建时间戳目录
- std::string timestamp = generate_timestamp();
- std::string base_output_dir = global_config.output_directory;
- std::string cycle_output_dir = base_output_dir + "/" + timestamp;
-
- // 创建时间戳目录
- std::filesystem::create_directories(cycle_output_dir);
- std::cout << "当前周期输出目录: " << cycle_output_dir << std::endl;
-
- // 更新配置管理器的输出目录
- ConfigManager cycle_config_mgr = config_mgr;
- auto cycle_global_config = cycle_config_mgr.get_global_config();
- cycle_global_config.output_directory = cycle_output_dir;
- cycle_config_mgr.set_global_config(cycle_global_config);
-
- // 使用新的配置创建调度器
- g_scheduler = std::make_unique<StreamScheduler>(cycle_config_mgr);
- g_scheduler->set_progress_callback(progress_callback);
-
- // 设置当前周期的报告回调
- g_scheduler->set_report_callback([&config_mgr, current_cycle, cycle_output_dir](
- const std::vector<RTSPClientStats>& client_stats,
- const StreamScheduler::SchedulerStats& scheduler_stats) {
- report_callback(config_mgr, current_cycle, cycle_output_dir, client_stats, scheduler_stats);
- });
-
- // 启动调度执行
- std::cout << "开始执行RTSP流拉取任务..." << std::endl;
- if (!g_scheduler->start_execution()) {
- std::cerr << "启动调度器失败" << std::endl;
- return 1;
- }
-
- // 等待执行完成或中断
- std::cout << "等待任务执行中..." << std::endl;
- while (g_scheduler->is_running() && !g_interrupted.load()) {
- std::this_thread::sleep_for(std::chrono::milliseconds(500));
- }
-
- std::cout << "任务执行完成,等待所有任务线程结束..." << std::endl;
- g_scheduler->wait_for_completion();
-
- // 检查是否需要继续下一轮
- if (g_interrupted.load()) {
- std::cout << "接收到中断信号,退出轮询" << std::endl;
- break;
- }
-
- if (global_config.poll_cycles == -1 || current_cycle < global_config.poll_cycles) {
- std::cout << "\n等待 " << global_config.cycle_interval_seconds << " 秒后开始下一轮询..." << std::endl;
-
- // 在等待期间检查中断信号
- for (int i = 0; i < global_config.cycle_interval_seconds && !g_interrupted.load(); ++i) {
- std::this_thread::sleep_for(std::chrono::seconds(1));
- if ((i + 1) % 10 == 0) {
- std::cout << "\r剩余等待时间: " << (global_config.cycle_interval_seconds - i - 1) << " 秒" << std::flush;
- }
- }
- std::cout << std::endl;
- }
- }
-
- std::cout << "\n程序执行完成" << std::endl;
-
- } catch (const std::exception& e) {
- std::cerr << "程序执行异常: " << e.what() << std::endl;
- return 1;
- }
-
- return 0;
- }
|