| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 | #include <iostream>#include <signal.h>#include <chrono>#include <thread>#include <iomanip>#include <sstream>#include <filesystem>#include "config.h"#include "scheduler.h"#include "reporter.h"#include "http_server.h"using namespace jtjai_media;// 全局变量用于信号处理static std::unique_ptr<StreamScheduler> g_scheduler;static std::unique_ptr<HttpServer> g_http_server;static std::atomic<bool> g_interrupted(false);// 信号处理函数void signal_handler(int signal) {    std::cout << "\n接收到信号 " << signal << ",正在停止程序..." << std::endl;    g_interrupted.store(true);    if (g_scheduler) {        g_scheduler->stop_execution();    }    if (g_http_server) {        g_http_server->stop();    }}// 进度回调函数void progress_callback(const StreamScheduler::SchedulerStats& stats) {    std::cout << "\r进度: " << stats.completed_tasks << "/" << stats.total_tasks               << " 完成, " << stats.failed_tasks << " 失败, "              << "当前并发: " << stats.max_concurrent_used << " 流" << std::flush;}// 生成时间戳字符串std::string generate_timestamp() {    auto now = std::chrono::system_clock::now();    auto time_t = std::chrono::system_clock::to_time_t(now);    auto tm = *std::localtime(&time_t);        std::stringstream ss;    ss << std::put_time(&tm, "%Y%m%d_%H%M%S");    return ss.str();}// 报告生成回调函数void report_callback(const ConfigManager& config_mgr, int cycle_number,                     const std::string& cycle_output_dir,                    const std::vector<RTSPClientStats>& client_stats,                    const StreamScheduler::SchedulerStats& scheduler_stats,                    bool was_timeout = false) {    std::cout << "生成第 " << cycle_number << " 轮执行报告";    if (was_timeout) {        std::cout << " (⚠️ 超时强制结束)";    }    std::cout << "..." << std::endl;        // 为每个周期使用独立的时间戳目录    ConfigManager cycle_config = config_mgr;    auto cycle_global_config = cycle_config.get_global_config();        // 使用传入的时间戳目录    cycle_global_config.output_directory = cycle_output_dir;    cycle_config.set_global_config(cycle_global_config);        ResultReporter cycle_reporter(cycle_config);    if (cycle_reporter.generate_report(client_stats, scheduler_stats)) {        std::cout << "第 " << cycle_number << " 轮报告生成成功";        if (was_timeout) {            std::cout << " (包含超时标记)";        }        std::cout << std::endl;        std::cout << "输出目录: " << cycle_output_dir << std::endl;    } else {        std::cerr << "第 " << cycle_number << " 轮报告生成失败" << std::endl;    }}int main(int argc, char* argv[]) {    std::cout << "========================================" << std::endl;    std::cout << "RTSP视频流并发拉取系统" << std::endl;    std::cout << "========================================" << std::endl;        // 注册信号处理器    signal(SIGINT, signal_handler);    signal(SIGTERM, signal_handler);        // 解析命令行参数    std::string config_file = "/Users/wenhongquan/CLionProjects/jtjai_media/config.json";    if (argc > 1) {        config_file = argv[1];    }        std::cout << "使用配置文件: " << config_file << std::endl;        try {        // 加载配置        ConfigManager config_mgr;        if (!config_mgr.load_from_file(config_file)) {            std::cerr << "加载配置文件失败: " << config_file << std::endl;            return 1;        }                std::cout << "配置加载成功" << std::endl;        std::cout << config_mgr.to_string() << std::endl;                const auto& global_config = config_mgr.get_global_config();                // 启动HTTP服务器        std::cout << "\n启动HTTP管理服务器..." << std::endl;        g_http_server = std::make_unique<HttpServer>(global_config.output_directory, 8080);        if (!g_http_server->start()) {            std::cerr << "启动HTTP服务器失败" << std::endl;            return 1;        }                std::cout << "\n🌐 Web管理界面: http://localhost:8080" << std::endl;        std::cout << "📁 文件管理: http://localhost:8080/manager" << std::endl;        std::cout << "📊 API文档: http://localhost:8080/api" << std::endl;                int current_cycle = 0;                // 开始多轮询循环 - 使用定时器强制切换        auto start_time = std::chrono::system_clock::now();                while ((global_config.poll_cycles == -1 || current_cycle < global_config.poll_cycles) && !g_interrupted.load()) {            current_cycle++;            std::cout << "\n========== 轮询周期 " << current_cycle;            if (global_config.poll_cycles != -1) {                std::cout << "/" << global_config.poll_cycles;            }            std::cout << " ==========" << std::endl;                        // 计算本轮应该开始的时间            auto cycle_start_time = start_time + std::chrono::seconds((current_cycle - 1) * global_config.total_poll_duration_seconds);            auto now = std::chrono::system_clock::now();                        // 如果还没到时间,等待            if (cycle_start_time > now) {                auto wait_duration = std::chrono::duration_cast<std::chrono::seconds>(cycle_start_time - now).count();                std::cout << "等待 " << wait_duration << " 秒后开始本轮任务..." << std::endl;                                for (int i = 0; i < wait_duration && !g_interrupted.load(); ++i) {                    std::this_thread::sleep_for(std::chrono::seconds(1));                    if ((i + 1) % 10 == 0) {                        std::cout << "\r剩余等待时间: " << (wait_duration - i - 1) << " 秒" << std::flush;                    }                }                std::cout << std::endl;            }                        if (g_interrupted.load()) {                break;            }                        // 强制停止上一轮的调度器(如果还在运行)            if (g_scheduler && g_scheduler->is_running()) {                std::cout << "⚠️  强制停止上一轮任务(超时)" << std::endl;                g_scheduler->stop_execution();                g_scheduler->wait_for_completion();            }                        // 为当前周期创建时间戳目录            std::string timestamp = generate_timestamp();            std::string base_output_dir = global_config.output_directory;            std::string cycle_output_dir = base_output_dir + "/" + timestamp;                        // 创建时间戳目录            std::filesystem::create_directories(cycle_output_dir);            std::cout << "当前周期输出目录: " << cycle_output_dir << std::endl;            std::cout << "⏱️  本轮最大执行时间: " << global_config.total_poll_duration_seconds << " 秒" << std::endl;                        // 更新配置管理器的输出目录            ConfigManager cycle_config_mgr = config_mgr;            auto cycle_global_config = cycle_config_mgr.get_global_config();            cycle_global_config.output_directory = cycle_output_dir;            cycle_config_mgr.set_global_config(cycle_global_config);                        // 使用新的配置创建调度器            g_scheduler = std::make_unique<StreamScheduler>(cycle_config_mgr);            g_scheduler->set_progress_callback(progress_callback);                        bool timeout_reached = false;                        // 设置当前周期的报告回调            g_scheduler->set_report_callback([&config_mgr, current_cycle, cycle_output_dir, &timeout_reached](                const std::vector<RTSPClientStats>& client_stats,                const StreamScheduler::SchedulerStats& scheduler_stats) {                report_callback(config_mgr, current_cycle, cycle_output_dir, client_stats, scheduler_stats, timeout_reached);            });                        // 启动调度执行            std::cout << "开始执行RTSP流拉取任务..." << std::endl;            auto task_start_time = std::chrono::system_clock::now();                        if (!g_scheduler->start_execution()) {                std::cerr << "启动调度器失败" << std::endl;                return 1;            }                        // 等待执行完成或超时            std::cout << "等待任务执行中(最大 " << global_config.total_poll_duration_seconds << " 秒)..." << std::endl;                        while (g_scheduler->is_running() && !g_interrupted.load()) {                auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(                    std::chrono::system_clock::now() - task_start_time).count();                                    if (elapsed >= global_config.total_poll_duration_seconds) {                    std::cout << "\n⏰ 达到最大执行时间(" << global_config.total_poll_duration_seconds << "秒),强制结束本轮任务" << std::endl;                    timeout_reached = true;                    break;                }                                // 显示剩余时间                int remaining = global_config.total_poll_duration_seconds - elapsed;                if (remaining % 10 == 0 && remaining > 0) {                    std::cout << "\r剩余执行时间: " << remaining << " 秒" << std::flush;                }                                std::this_thread::sleep_for(std::chrono::milliseconds(1000));            }                        // 强制停止任务并生成报告            if (timeout_reached || g_scheduler->is_running()) {                std::cout << "\n强制停止调度器..." << std::endl;                g_scheduler->stop_execution();            }                        std::cout << "等待所有任务线程结束..." << std::endl;            g_scheduler->wait_for_completion();                        // 检查是否需要继续下一轮            if (g_interrupted.load()) {                std::cout << "接收到中断信号,退出轮询" << std::endl;                break;            }        }                std::cout << "\n程序执行完成" << std::endl;            } catch (const std::exception& e) {        std::cerr << "程序执行异常: " << e.what() << std::endl;        return 1;    }        return 0;}
 |