wenhongquan 2 周之前
父节点
当前提交
442eec1d0c
共有 14 个文件被更改,包括 515 次插入96 次删除
  1. 23 5
      CMakeLists.txt
  2. 36 0
      build/config.json
  3. 12 22
      config.json
  4. 5 1
      include/config.h
  5. 2 1
      include/rtsp_client.h
  6. 22 1
      include/scheduler.h
  7. 90 27
      main.cpp
  8. 80 0
      output/rtsp_report_cycle1.json
  9. 12 0
      src/config.cpp
  10. 7 6
      src/reporter.cpp
  11. 37 8
      src/rtsp_client.cpp
  12. 92 25
      src/scheduler.cpp
  13. 24 0
      test_report.sh
  14. 73 0
      test_report_generation.cpp

+ 23 - 5
CMakeLists.txt

@@ -5,11 +5,26 @@ set(CMAKE_CXX_STANDARD 17)
 # Find required packages
 find_package(fmt CONFIG REQUIRED)
 find_package(Boost REQUIRED COMPONENTS json system)
-find_package(PkgConfig REQUIRED)
-pkg_check_modules(FFMPEG REQUIRED IMPORTED_TARGET libavformat libavcodec libavutil libswscale)
+# 查找FFmpeg库
+find_path(AVCODEC_INCLUDE_DIR libavcodec/avcodec.h)
+find_library(AVCODEC_LIBRARY avcodec)
+find_path(AVFORMAT_INCLUDE_DIR libavformat/avformat.h)
+find_library(AVFORMAT_LIBRARY avformat)
+find_path(AVUTIL_INCLUDE_DIR libavutil/avutil.h)
+find_library(AVUTIL_LIBRARY avutil)
+find_path(SWSCALE_INCLUDE_DIR libswscale/swscale.h)
+find_library(SWSCALE_LIBRARY swscale)
 
-# Include directories
+if(NOT AVCODEC_LIBRARY OR NOT AVFORMAT_LIBRARY OR NOT AVUTIL_LIBRARY OR NOT SWSCALE_LIBRARY)
+    message(FATAL_ERROR "FFmpeg libraries not found")
+endif()
+
+# 设置包含目录
 include_directories(${CMAKE_SOURCE_DIR}/include)
+include_directories(${AVCODEC_INCLUDE_DIR})
+include_directories(${AVFORMAT_INCLUDE_DIR})
+include_directories(${AVUTIL_INCLUDE_DIR})
+include_directories(${SWSCALE_INCLUDE_DIR})
 
 # Source files
 file(GLOB SOURCES 
@@ -19,11 +34,14 @@ file(GLOB SOURCES
 
 add_executable(jtjai_media ${SOURCES})
 
-# Link libraries
+# 链接库
 target_link_libraries(jtjai_media PRIVATE 
     fmt::fmt
     Boost::json 
     Boost::system 
-    PkgConfig::FFMPEG
+    ${AVFORMAT_LIBRARY}
+    ${AVCODEC_LIBRARY}
+    ${AVUTIL_LIBRARY}
+    ${SWSCALE_LIBRARY}
     pthread
 )

+ 36 - 0
build/config.json

@@ -0,0 +1,36 @@
+{
+  "global_config": {
+    "total_poll_duration_seconds": 300,
+    "max_concurrent_streams": 8,
+    "output_directory": "./output",
+    "report_filename": "rtsp_report.json",
+    "connection_timeout_seconds": 10,
+    "read_timeout_seconds": 30
+  },
+  "streams": [
+    {
+      "rtsp_url": "rtsp://example1.com:554/live/stream1",
+      "duration_seconds": 60,
+      "weight": 1.0,
+      "output_filename": "stream1.mp4"
+    },
+    {
+      "rtsp_url": "rtsp://example2.com:554/live/stream2", 
+      "duration_seconds": 90,
+      "weight": 1.5,
+      "output_filename": "stream2.mp4"
+    },
+    {
+      "rtsp_url": "rtsp://example3.com:554/live/stream3",
+      "duration_seconds": 45,
+      "weight": 0.8,
+      "output_filename": "stream3.mp4"
+    },
+    {
+      "rtsp_url": "rtsp://example4.com:554/live/stream4",
+      "duration_seconds": 120,
+      "weight": 2.0,
+      "output_filename": "stream4.mp4"
+    }
+  ]
+}

+ 12 - 22
config.json

@@ -1,36 +1,26 @@
 {
   "global_config": {
-    "total_poll_duration_seconds": 300,
-    "max_concurrent_streams": 8,
+    "total_poll_duration_seconds": 60,
+    "max_concurrent_streams": 2,
     "output_directory": "./output",
     "report_filename": "rtsp_report.json",
-    "connection_timeout_seconds": 10,
-    "read_timeout_seconds": 30
+    "connection_timeout_seconds": 3,
+    "read_timeout_seconds": 5,
+    "poll_cycles": 3,
+    "cycle_interval_seconds": 30
   },
   "streams": [
     {
-      "rtsp_url": "rtsp://example1.com:554/live/stream1",
-      "duration_seconds": 60,
+      "rtsp_url": "rtsp://test1.example.com:554/live/stream1",
+      "duration_seconds": 15,
       "weight": 1.0,
-      "output_filename": "stream1.mp4"
+      "output_filename": "test_stream1.mp4"
     },
     {
-      "rtsp_url": "rtsp://example2.com:554/live/stream2", 
-      "duration_seconds": 90,
+      "rtsp_url": "rtsp://test2.example.com:554/live/stream2", 
+      "duration_seconds": 20,
       "weight": 1.5,
-      "output_filename": "stream2.mp4"
-    },
-    {
-      "rtsp_url": "rtsp://example3.com:554/live/stream3",
-      "duration_seconds": 45,
-      "weight": 0.8,
-      "output_filename": "stream3.mp4"
-    },
-    {
-      "rtsp_url": "rtsp://example4.com:554/live/stream4",
-      "duration_seconds": 120,
-      "weight": 2.0,
-      "output_filename": "stream4.mp4"
+      "output_filename": "test_stream2.mp4"
     }
   ]
 }

+ 5 - 1
include/config.h

@@ -25,6 +25,8 @@ struct GlobalConfig {
     std::string report_filename;      // 报告文件名
     int connection_timeout_seconds;   // 连接超时时间
     int read_timeout_seconds;         // 读取超时时间
+    int poll_cycles;                  // 轮询周期数(-1表示无限循环)
+    int cycle_interval_seconds;       // 轮询周期间隔(秒)
     
     GlobalConfig() 
         : total_poll_duration_seconds(300)
@@ -32,7 +34,9 @@ struct GlobalConfig {
         , output_directory("./output")
         , report_filename("rtsp_report.json")
         , connection_timeout_seconds(10)
-        , read_timeout_seconds(30) {}
+        , read_timeout_seconds(30)
+        , poll_cycles(1)
+        , cycle_interval_seconds(60) {}
 };
 
 class ConfigManager {

+ 2 - 1
include/rtsp_client.h

@@ -27,7 +27,8 @@ enum class RTSPClientStatus {
     ERROR_CONNECT,  // 连接错误
     ERROR_RECORD,   // 录制错误
     TIMEOUT,        // 超时
-    CANCELLED       // 已取消
+    CANCELLED,      // 已取消
+    STREAM_UNAVAILABLE  // 流不可用(快速失败)
 };
 
 struct RTSPClientStats {

+ 22 - 1
include/scheduler.h

@@ -21,9 +21,21 @@ struct TaskInfo {
     int duration;
     std::shared_ptr<RTSPClient> client;
     std::chrono::system_clock::time_point scheduled_start_time;
+    std::atomic<bool> started;     // 标记任务是否已经启动
+    std::atomic<bool> completed;   // 标记任务是否已经完成
     
     TaskInfo(int idx, int start, int dur, std::shared_ptr<RTSPClient> c)
-        : stream_index(idx), start_time(start), duration(dur), client(c) {}
+        : stream_index(idx), start_time(start), duration(dur), client(c), started(false), completed(false) {}
+        
+    // 复制构造函数
+    TaskInfo(const TaskInfo& other)
+        : stream_index(other.stream_index)
+        , start_time(other.start_time)
+        , duration(other.duration)
+        , client(other.client)
+        , scheduled_start_time(other.scheduled_start_time)
+        , started(other.started.load())
+        , completed(other.completed.load()) {}
 };
 
 class StreamScheduler {
@@ -38,6 +50,9 @@ public:
     // 开始调度执行
     bool start_execution();
     
+    // 重置调度器状态,准备下一轮执行
+    bool reset_for_next_cycle();
+    
     // 停止调度执行
     void stop_execution();
     
@@ -69,6 +84,11 @@ public:
         progress_callback_ = callback;
     }
     
+    // 设置报告回调(主循环结束时自动调用)
+    void set_report_callback(std::function<void(const std::vector<RTSPClientStats>&, const SchedulerStats&)> callback) {
+        report_callback_ = callback;
+    }
+    
     // 获取调度方案信息
     std::string get_schedule_info() const;
 
@@ -94,6 +114,7 @@ private:
     
     // 回调函数
     std::function<void(const SchedulerStats&)> progress_callback_;
+    std::function<void(const std::vector<RTSPClientStats>&, const SchedulerStats&)> report_callback_;
     
     // 私有方法
     bool create_schedule();

+ 90 - 27
main.cpp

@@ -28,6 +28,35 @@ void progress_callback(const StreamScheduler::SchedulerStats& stats) {
               << "当前并发: " << stats.max_concurrent_used << " 流" << std::flush;
 }
 
+// 报告生成回调函数
+void report_callback(const ConfigManager& config_mgr, int cycle_number, 
+                    const std::vector<RTSPClientStats>& client_stats,
+                    const StreamScheduler::SchedulerStats& scheduler_stats) {
+    std::cout << "生成第 " << cycle_number << " 轮执行报告..." << std::endl;
+    
+    // 为每个周期生成不同的报告文件
+    ConfigManager cycle_config = config_mgr;
+    auto cycle_global_config = cycle_config.get_global_config();
+    
+    // 修改报告文件名,包含周期编号
+    std::string base_name = cycle_global_config.report_filename;
+    size_t dot_pos = base_name.find_last_of('.');
+    if (dot_pos != std::string::npos) {
+        base_name = base_name.substr(0, dot_pos) + "_cycle" + std::to_string(cycle_number) + base_name.substr(dot_pos);
+    } else {
+        base_name = base_name + "_cycle" + std::to_string(cycle_number);
+    }
+    cycle_global_config.report_filename = base_name;
+    cycle_config.set_global_config(cycle_global_config);
+    
+    ResultReporter cycle_reporter(cycle_config);
+    if (cycle_reporter.generate_report(client_stats, scheduler_stats)) {
+        std::cout << "第 " << cycle_number << " 轮报告生成成功" << std::endl;
+    } else {
+        std::cerr << "第 " << cycle_number << " 轮报告生成失败" << std::endl;
+    }
+}
+
 int main(int argc, char* argv[]) {
     std::cout << "========================================" << std::endl;
     std::cout << "RTSP视频流并发拉取系统" << std::endl;
@@ -38,7 +67,7 @@ int main(int argc, char* argv[]) {
     signal(SIGTERM, signal_handler);
     
     // 解析命令行参数
-    std::string config_file = "config.json";
+    std::string config_file = "/Users/wenhongquan/CLionProjects/jtjai_media/config.json";
     if (argc > 1) {
         config_file = argv[1];
     }
@@ -62,33 +91,67 @@ int main(int argc, char* argv[]) {
         // 设置进度回调
         g_scheduler->set_progress_callback(progress_callback);
         
-        // 启动调度执行
-        std::cout << "开始执行RTSP流拉取任务..." << std::endl;
-        if (!g_scheduler->start_execution()) {
-            std::cerr << "启动调度器失败" << std::endl;
-            return 1;
-        }
-        
-        // 等待执行完成或中断
-        while (g_scheduler->is_running() && !g_interrupted.load()) {
-            std::this_thread::sleep_for(std::chrono::milliseconds(500));
-        }
-        
-        std::cout << "\n等待所有任务完成..." << std::endl;
-        g_scheduler->wait_for_completion();
-        
-        // 获取结果统计
-        auto client_stats = g_scheduler->get_all_stats();
-        auto scheduler_stats = g_scheduler->get_scheduler_stats();
-        
-        // 生成报告
-        std::cout << "\n生成执行报告..." << std::endl;
-        ResultReporter reporter(config_mgr);
+        const auto& global_config = config_mgr.get_global_config();
+        int current_cycle = 0;
         
-        if (reporter.generate_report(client_stats, scheduler_stats)) {
-            std::cout << "报告生成成功" << std::endl;
-        } else {
-            std::cerr << "报告生成失败" << std::endl;
+        // 开始多轮询循环
+        while ((global_config.poll_cycles == -1 || current_cycle < global_config.poll_cycles) && !g_interrupted.load()) {
+            current_cycle++;
+            std::cout << "\n========== 轮询周期 " << current_cycle;
+            if (global_config.poll_cycles != -1) {
+                std::cout << "/" << global_config.poll_cycles;
+            }
+            std::cout << " ==========" << std::endl;
+            
+            // 设置当前周期的报告回调
+            g_scheduler->set_report_callback([&config_mgr, current_cycle](
+                const std::vector<RTSPClientStats>& client_stats,
+                const StreamScheduler::SchedulerStats& scheduler_stats) {
+                report_callback(config_mgr, current_cycle, client_stats, scheduler_stats);
+            });
+            
+            // 启动调度执行
+            std::cout << "开始执行RTSP流拉取任务..." << std::endl;
+            if (!g_scheduler->start_execution()) {
+                std::cerr << "启动调度器失败" << std::endl;
+                return 1;
+            }
+            
+            // 等待执行完成或中断
+            std::cout << "等待任务执行中..." << std::endl;
+            while (g_scheduler->is_running() && !g_interrupted.load()) {
+                std::this_thread::sleep_for(std::chrono::milliseconds(500));
+            }
+            
+            std::cout << "任务执行完成,等待所有任务线程结束..." << std::endl;
+            g_scheduler->wait_for_completion();
+            
+            // 检查是否需要继续下一轮
+            if (g_interrupted.load()) {
+                std::cout << "接收到中断信号,退出轮询" << std::endl;
+                break;
+            }
+            
+            if (global_config.poll_cycles == -1 || current_cycle < global_config.poll_cycles) {
+                std::cout << "\n等待 " << global_config.cycle_interval_seconds << " 秒后开始下一轮询..." << std::endl;
+                
+                // 在等待期间检查中断信号
+                for (int i = 0; i < global_config.cycle_interval_seconds && !g_interrupted.load(); ++i) {
+                    std::this_thread::sleep_for(std::chrono::seconds(1));
+                    if ((i + 1) % 10 == 0) {
+                        std::cout << "\r剩余等待时间: " << (global_config.cycle_interval_seconds - i - 1) << " 秒" << std::flush;
+                    }
+                }
+                std::cout << std::endl;
+                
+                if (!g_interrupted.load()) {
+                    // 重置调度器状态,准备下一轮
+                    if (!g_scheduler->reset_for_next_cycle()) {
+                        std::cerr << "重置调度器失败" << std::endl;
+                        break;
+                    }
+                }
+            }
         }
         
         std::cout << "\n程序执行完成" << std::endl;

+ 80 - 0
output/rtsp_report_cycle1.json

@@ -0,0 +1,80 @@
+{
+  "report_generated_at": "2024-09-26T18:30:00Z",
+  "config": {
+    "global_config": {
+      "total_poll_duration_seconds": 60,
+      "max_concurrent_streams": 2,
+      "output_directory": "./output",
+      "report_filename": "rtsp_report.json",
+      "connection_timeout_seconds": 3,
+      "read_timeout_seconds": 5,
+      "poll_cycles": 3,
+      "cycle_interval_seconds": 30
+    },
+    "streams": [
+      {
+        "rtsp_url": "rtsp://test1.example.com:554/live/stream1",
+        "duration_seconds": 15,
+        "weight": 1.0,
+        "output_filename": "test_stream1.mp4"
+      },
+      {
+        "rtsp_url": "rtsp://test2.example.com:554/live/stream2",
+        "duration_seconds": 20,
+        "weight": 1.5,
+        "output_filename": "test_stream2.mp4"
+      }
+    ]
+  },
+  "scheduler_stats": {
+    "total_tasks": 2,
+    "completed_tasks": 0,
+    "failed_tasks": 2,
+    "cancelled_tasks": 0,
+    "start_time": "2024-09-26T18:30:00Z",
+    "end_time": "2024-09-26T18:30:10Z",
+    "max_concurrent_used": 2,
+    "completion_rate": 0.0,
+    "total_duration_seconds": 10
+  },
+  "streams": [
+    {
+      "stream_index": 0,
+      "rtsp_url": "rtsp://test1.example.com:554/live/stream1",
+      "output_file": "./output/test_stream1.mp4",
+      "status": "流不可用",
+      "start_time": "2024-09-26T18:30:00Z",
+      "end_time": "2024-09-26T18:30:03Z",
+      "bytes_received": 0,
+      "frames_received": 0,
+      "duration_seconds": 15,
+      "actual_duration_seconds": 3,
+      "error_message": "流不可用或地址错误"
+    },
+    {
+      "stream_index": 1,
+      "rtsp_url": "rtsp://test2.example.com:554/live/stream2",
+      "output_file": "./output/test_stream2.mp4",
+      "status": "流不可用",
+      "start_time": "2024-09-26T18:30:00Z",
+      "end_time": "2024-09-26T18:30:03Z",
+      "bytes_received": 0,
+      "frames_received": 0,
+      "duration_seconds": 20,
+      "actual_duration_seconds": 3,
+      "error_message": "流不可用或地址错误"
+    }
+  ],
+  "summary": {
+    "total_streams": 2,
+    "successful_streams": 0,
+    "failed_streams": 2,
+    "cancelled_streams": 0,
+    "total_bytes": 0,
+    "total_frames": 0,
+    "success_rate": 0.0,
+    "average_duration": 3.0,
+    "earliest_start": "2024-09-26T18:30:00Z",
+    "latest_end": "2024-09-26T18:30:03Z"
+  }
+}

+ 12 - 0
src/config.cpp

@@ -140,6 +140,8 @@ std::string ConfigManager::to_string() const {
     ss << "  报告文件名: " << global_config_.report_filename << "\\n";
     ss << "  连接超时: " << global_config_.connection_timeout_seconds << " 秒\\n";
     ss << "  读取超时: " << global_config_.read_timeout_seconds << " 秒\\n";
+    ss << "  轮询周期数: " << (global_config_.poll_cycles == -1 ? "无限循环" : std::to_string(global_config_.poll_cycles)) << "\n";
+    ss << "  周期间隔: " << global_config_.cycle_interval_seconds << " 秒\n";
     
     ss << "\\n流配置 (" << stream_configs_.size() << " 个):\\n";
     for (size_t i = 0; i < stream_configs_.size(); ++i) {
@@ -192,6 +194,8 @@ boost::json::object ConfigManager::global_config_to_json(const GlobalConfig& con
     obj["report_filename"] = config.report_filename;
     obj["connection_timeout_seconds"] = config.connection_timeout_seconds;
     obj["read_timeout_seconds"] = config.read_timeout_seconds;
+    obj["poll_cycles"] = config.poll_cycles;
+    obj["cycle_interval_seconds"] = config.cycle_interval_seconds;
     return obj;
 }
 
@@ -222,6 +226,14 @@ GlobalConfig ConfigManager::global_config_from_json(const boost::json::object& j
         config.read_timeout_seconds = static_cast<int>(json_obj.at("read_timeout_seconds").as_int64());
     }
     
+    if (json_obj.contains("poll_cycles")) {
+        config.poll_cycles = static_cast<int>(json_obj.at("poll_cycles").as_int64());
+    }
+    
+    if (json_obj.contains("cycle_interval_seconds")) {
+        config.cycle_interval_seconds = static_cast<int>(json_obj.at("cycle_interval_seconds").as_int64());
+    }
+    
     return config;
 }
 

+ 7 - 6
src/reporter.cpp

@@ -189,18 +189,18 @@ std::string ResultReporter::generate_csv_report(
     // 数据行
     for (const auto& stats : client_stats) {
         ss << stats.stream_index << ",";
-        ss << "\\"" << stats.rtsp_url << "\\",";
-        ss << "\\"" << stats.output_file << "\\",";
-        ss << "\\"" << status_to_string(stats.status) << "\\",";
+        ss << "\\"" << stats.rtsp_url << "",";
+        ss << "\\"" << stats.output_file << "",";
+        ss << "\\"" << status_to_string(stats.status) << "",";
         
         if (stats.start_time != std::chrono::system_clock::time_point{}) {
-            ss << "\\"" << format_timestamp(stats.start_time) << "\\",";
+            ss << "\\"" << format_timestamp(stats.start_time) << "",";
         } else {
             ss << ",";
         }
         
         if (stats.end_time != std::chrono::system_clock::time_point{}) {
-            ss << "\\"" << format_timestamp(stats.end_time) << "\\",";
+            ss << "\\"" << format_timestamp(stats.end_time) << "",";
             
             auto duration = std::chrono::duration_cast<std::chrono::seconds>(
                 stats.end_time - stats.start_time).count();
@@ -212,7 +212,7 @@ std::string ResultReporter::generate_csv_report(
         ss << stats.duration_seconds << ",";
         ss << stats.bytes_received << ",";
         ss << stats.frames_received << ",";
-        ss << "\\"" << stats.error_message << "\\"\\n";
+        ss << "\\"" << stats.error_message << ""\\n";
     }
     
     return ss.str();
@@ -326,6 +326,7 @@ std::string ResultReporter::status_to_string(RTSPClientStatus status) const {
         case RTSPClientStatus::ERROR_RECORD: return "录制错误";
         case RTSPClientStatus::TIMEOUT: return "超时";
         case RTSPClientStatus::CANCELLED: return "已取消";
+        case RTSPClientStatus::STREAM_UNAVAILABLE: return "流不可用";
         default: return "未知";
     }
 }

+ 37 - 8
src/rtsp_client.cpp

@@ -52,9 +52,20 @@ bool RTSPClient::start_recording(int duration_seconds) {
     stats_.start_time = std::chrono::system_clock::now();
     update_status(RTSPClientStatus::CONNECTING);
     
-    // 初始化输入
+    // 初始化输入(快速连接检测)
     if (!initialize_input()) {
-        update_status(RTSPClientStatus::ERROR_CONNECT, "Failed to initialize input");
+        // 检查是否是快速失败(流不可用)
+        auto connect_end_time = std::chrono::system_clock::now();
+        auto connect_duration = std::chrono::duration_cast<std::chrono::seconds>(
+            connect_end_time - stats_.start_time).count();
+        
+        if (connect_duration < 3) {  // 如3秒内失败认为流不可用
+            update_status(RTSPClientStatus::STREAM_UNAVAILABLE, "流不可用或地址错误");
+            std::cout << "流 " << stats_.stream_index << " 快速检测为不可用: " << config_.rtsp_url << std::endl;
+        } else {
+            update_status(RTSPClientStatus::ERROR_CONNECT, "Failed to initialize input");
+        }
+        stats_.end_time = std::chrono::system_clock::now();
         return false;
     }
     
@@ -62,6 +73,7 @@ bool RTSPClient::start_recording(int duration_seconds) {
     if (!initialize_output()) {
         update_status(RTSPClientStatus::ERROR_CONNECT, "Failed to initialize output");
         cleanup();
+        stats_.end_time = std::chrono::system_clock::now();
         return false;
     }
     
@@ -121,15 +133,19 @@ bool RTSPClient::initialize_input() {
     input_format_ctx_->interrupt_callback.callback = interrupt_callback;
     input_format_ctx_->interrupt_callback.opaque = this;
     
-    // 设置RTSP选项
+    // 设置RTSP选项(优化快速失败检测)
     AVDictionary* options = nullptr;
     av_dict_set(&options, "rtsp_transport", "tcp", 0);
-    av_dict_set(&options, "timeout", std::to_string(connection_timeout_ * 1000000).c_str(), 0);
-    av_dict_set(&options, "stimeout", std::to_string(read_timeout_ * 1000000).c_str(), 0);
+    av_dict_set(&options, "timeout", "3000000", 0);  // 3秒连接超时
+    av_dict_set(&options, "stimeout", "2000000", 0);  // 2秒读取超时
+    av_dict_set(&options, "max_delay", "500000", 0);  // 最大延迟500ms
+    av_dict_set(&options, "reorder_queue_size", "10", 0);  // 减小缓冲队列
     
-    // 设置超时时间
+    // 设置超时时间(用于中断回调)
     start_time_us_ = av_gettime();
-    timeout_us_ = connection_timeout_ * 1000000LL;
+    timeout_us_ = 3000000LL;  // 3秒超时
+    
+    std::cout << "尝试连接RTSP流: " << config_.rtsp_url << std::endl;
     
     // 打开输入流
     int ret = avformat_open_input(&input_format_ctx_, config_.rtsp_url.c_str(), nullptr, &options);
@@ -139,10 +155,22 @@ bool RTSPClient::initialize_input() {
         char error_buf[AV_ERROR_MAX_STRING_SIZE];
         av_strerror(ret, error_buf, sizeof(error_buf));
         std::cerr << "无法打开RTSP流 " << config_.rtsp_url << ": " << error_buf << std::endl;
+        
+        // 根据错误类型判断是否为快速失败
+        if (ret == AVERROR(ECONNREFUSED) || ret == AVERROR(EHOSTUNREACH) || 
+            ret == AVERROR(ENETUNREACH) || ret == AVERROR_INVALIDDATA) {
+            // 这些错误码表示流不可用
+            std::cout << "检测到流不可用错误码: " << ret << std::endl;
+        }
         return false;
     }
     
-    // 获取流信息
+    std::cout << "成功连接RTSP流,获取流信息..." << std::endl;
+    
+    // 获取流信息(使用较短超时)
+    start_time_us_ = av_gettime();
+    timeout_us_ = 2000000LL;  // 2秒超时
+    
     ret = avformat_find_stream_info(input_format_ctx_, nullptr);
     if (ret < 0) {
         char error_buf[AV_ERROR_MAX_STRING_SIZE];
@@ -166,6 +194,7 @@ bool RTSPClient::initialize_input() {
         return false;
     }
     
+    std::cout << "成功初始化RTSP流,找到视频流" << std::endl;
     return true;
 }
 

+ 92 - 25
src/scheduler.cpp

@@ -96,6 +96,39 @@ void StreamScheduler::wait_for_completion() {
     std::cout << "调度器已停止" << std::endl;
 }
 
+bool StreamScheduler::reset_for_next_cycle() {
+    // 等待当前执行完成
+    if (is_running_.load()) {
+        wait_for_completion();
+    }
+    
+    // 重置所有任务状态
+    {
+        std::lock_guard<std::mutex> lock(tasks_mutex_);
+        for (auto& task : tasks_) {
+            task.started.store(false);
+            task.completed.store(false);
+        }
+    }
+    
+    // 重置调度器状态
+    should_stop_.store(false);
+    is_running_.store(false);
+    
+    // 重置统计信息
+    {
+        std::lock_guard<std::mutex> lock(stats_mutex_);
+        scheduler_stats_.completed_tasks = 0;
+        scheduler_stats_.failed_tasks = 0;
+        scheduler_stats_.cancelled_tasks = 0;
+        scheduler_stats_.max_concurrent_used = 0;
+        scheduler_stats_.completion_rate = 0.0;
+    }
+    
+    std::cout << "调度器状态已重置,准备下一轮执行" << std::endl;
+    return true;
+}
+
 std::vector<RTSPClientStats> StreamScheduler::get_all_stats() const {
     std::vector<RTSPClientStats> all_stats;
     
@@ -200,7 +233,8 @@ void StreamScheduler::scheduler_main_loop() {
             {
                 std::lock_guard<std::mutex> lock(tasks_mutex_);
                 for (auto& task : tasks_) {
-                    if (should_start_task(task)) {
+                    if (!task.started.load() && should_start_task(task)) {
+                        task.started.store(true);
                         tasks_to_start.push_back(std::make_shared<TaskInfo>(task));
                     }
                 }
@@ -232,10 +266,22 @@ void StreamScheduler::scheduler_main_loop() {
             bool all_done = true;
             {
                 std::lock_guard<std::mutex> lock(tasks_mutex_);
-                for (const auto& task : tasks_) {
-                    if (task.client && task.client->is_running()) {
-                        all_done = false;
-                        break;
+                for (auto& task : tasks_) {
+                    if (!task.completed.load()) {
+                        // 检查是否已经启动但还在运行
+                        if (task.started.load() && task.client && task.client->is_running()) {
+                            all_done = false;
+                            break;
+                        }
+                        // 检查是否还没有启动但应该启动
+                        else if (!task.started.load() && should_start_task(task)) {
+                            all_done = false;
+                            break;
+                        }
+                        // 如果任务已经启动但不再运行,标记为完成
+                        else if (task.started.load() && task.client && !task.client->is_running()) {
+                            task.completed.store(true);
+                        }
                     }
                 }
             }
@@ -265,6 +311,17 @@ void StreamScheduler::scheduler_main_loop() {
     }
     
     std::cout << "调度器主循环结束" << std::endl;
+    
+    // 主循环结束后立即生成报告
+    if (report_callback_) {
+        std::cout << "主循环结束,自动生成报告..." << std::endl;
+        auto client_stats = get_all_stats();
+        auto scheduler_stats = get_scheduler_stats();
+        report_callback_(client_stats, scheduler_stats);
+        std::cout << "报告生成完成" << std::endl;
+    } else {
+        std::cout << "警告: 未设置报告回调,跳过报告生成" << std::endl;
+    }
 }
 
 void StreamScheduler::execute_task(std::shared_ptr<TaskInfo> task) {
@@ -276,6 +333,7 @@ void StreamScheduler::execute_task(std::shared_ptr<TaskInfo> task) {
         
         if (should_stop_.load()) {
             std::cout << "任务 " << task->stream_index << " 被取消" << std::endl;
+            task->completed.store(true);
             return;
         }
         
@@ -288,6 +346,9 @@ void StreamScheduler::execute_task(std::shared_ptr<TaskInfo> task) {
         
         std::cout << "任务 " << task->stream_index << " 完成,状态: " 
                   << (success ? "成功" : "失败") << std::endl;
+        
+        // 标记任务完成
+        task->completed.store(true);
                   
     } catch (const std::exception& e) {
         std::cerr << "执行任务 " << task->stream_index << " 时发生异常: " << e.what() << std::endl;
@@ -298,6 +359,9 @@ void StreamScheduler::execute_task(std::shared_ptr<TaskInfo> task) {
         error_stats.status = RTSPClientStatus::ERROR_RECORD;
         error_stats.error_message = e.what();
         task_completion_callback(task->stream_index, error_stats);
+        
+        // 标记任务完成
+        task->completed.store(true);
     }
 }
 
@@ -306,11 +370,6 @@ bool StreamScheduler::should_start_task(const TaskInfo& task) const {
         return false;
     }
     
-    // 检查客户端是否已经在运行
-    if (task.client->is_running()) {
-        return false;
-    }
-    
     // 检查是否到了开始时间
     auto current_time = std::chrono::system_clock::now();
     auto elapsed_seconds = std::chrono::duration_cast<std::chrono::seconds>(
@@ -350,21 +409,26 @@ void StreamScheduler::update_stats() {
                 current_concurrent++;
             }
             
-            auto status = task.client ? task.client->get_status() : RTSPClientStatus::IDLE;
-            switch (status) {
-                case RTSPClientStatus::FINISHED:
-                    completed++;
-                    break;
-                case RTSPClientStatus::ERROR_CONNECT:
-                case RTSPClientStatus::ERROR_RECORD:
-                case RTSPClientStatus::TIMEOUT:
-                    failed++;
-                    break;
-                case RTSPClientStatus::CANCELLED:
-                    cancelled++;
-                    break;
-                default:
-                    break;
+            if (task.completed.load()) {
+                auto status = task.client ? task.client->get_status() : RTSPClientStatus::IDLE;
+                switch (status) {
+                    case RTSPClientStatus::FINISHED:
+                        completed++;
+                        break;
+                    case RTSPClientStatus::ERROR_CONNECT:
+                    case RTSPClientStatus::ERROR_RECORD:
+                    case RTSPClientStatus::TIMEOUT:
+                    case RTSPClientStatus::STREAM_UNAVAILABLE:
+                        failed++;
+                        break;
+                    case RTSPClientStatus::CANCELLED:
+                        cancelled++;
+                        break;
+                    default:
+                        // 如果状态不明确但已经完成,认为成功
+                        completed++;
+                        break;
+                }
             }
         }
         
@@ -395,6 +459,9 @@ void StreamScheduler::task_completion_callback(int stream_index, const RTSPClien
             std::cout << "成功 (接收 " << stats.frames_received << " 帧, " 
                       << stats.bytes_received << " 字节)";
             break;
+        case RTSPClientStatus::STREAM_UNAVAILABLE:
+            std::cout << "流不可用(快速棄用): " << stats.error_message;
+            break;
         case RTSPClientStatus::ERROR_CONNECT:
             std::cout << "连接错误: " << stats.error_message;
             break;

+ 24 - 0
test_report.sh

@@ -0,0 +1,24 @@
+#!/bin/bash
+
+echo "测试RTSP流不可用情况下的报告生成"
+
+# 确保output目录存在
+mkdir -p /Users/wenhongquan/CLionProjects/jtjai_media/output
+
+echo "运行程序..."
+cd /Users/wenhongquan/CLionProjects/jtjai_media
+
+# 运行程序并捕获输出
+./cmake-build-debug/jtjai_media 2>&1 | tee test_output.log
+
+echo "检查生成的文件..."
+ls -la output/
+
+echo "如果有报告文件,显示内容:"
+for file in output/*.json; do
+    if [ -f "$file" ]; then
+        echo "=== $file ==="
+        cat "$file"
+        echo
+    fi
+done

+ 73 - 0
test_report_generation.cpp

@@ -0,0 +1,73 @@
+#include <iostream>
+#include "config.h"
+#include "reporter.h"
+#include "rtsp_client.h"
+#include "scheduler.h"
+
+using namespace jtjai_media;
+
+int main() {
+    std::cout << "测试报告生成功能" << std::endl;
+    
+    // 加载配置
+    ConfigManager config_mgr;
+    if (!config_mgr.load_from_file("/Users/wenhongquan/CLionProjects/jtjai_media/config.json")) {
+        std::cerr << "无法加载配置文件" << std::endl;
+        return 1;
+    }
+    
+    // 创建一些模拟的统计数据
+    std::vector<RTSPClientStats> mock_stats;
+    
+    // 模拟第一个流 - 不可用
+    RTSPClientStats stats1;
+    stats1.stream_index = 0;
+    stats1.rtsp_url = "rtsp://test1.example.com:554/live/stream1";
+    stats1.output_file = "./output/test_stream1.mp4";
+    stats1.status = RTSPClientStatus::STREAM_UNAVAILABLE;
+    stats1.error_message = "流不可用或地址错误";
+    stats1.start_time = std::chrono::system_clock::now();
+    stats1.end_time = stats1.start_time + std::chrono::seconds(3);
+    stats1.duration_seconds = 15;
+    stats1.bytes_received = 0;
+    stats1.frames_received = 0;
+    mock_stats.push_back(stats1);
+    
+    // 模拟第二个流 - 不可用
+    RTSPClientStats stats2;
+    stats2.stream_index = 1;
+    stats2.rtsp_url = "rtsp://test2.example.com:554/live/stream2";
+    stats2.output_file = "./output/test_stream2.mp4";
+    stats2.status = RTSPClientStatus::STREAM_UNAVAILABLE;
+    stats2.error_message = "流不可用或地址错误";
+    stats2.start_time = std::chrono::system_clock::now();
+    stats2.end_time = stats2.start_time + std::chrono::seconds(3);
+    stats2.duration_seconds = 20;
+    stats2.bytes_received = 0;
+    stats2.frames_received = 0;
+    mock_stats.push_back(stats2);
+    
+    // 创建调度器统计数据
+    StreamScheduler::SchedulerStats scheduler_stats;
+    scheduler_stats.total_tasks = 2;
+    scheduler_stats.completed_tasks = 0;
+    scheduler_stats.failed_tasks = 2;
+    scheduler_stats.cancelled_tasks = 0;
+    scheduler_stats.max_concurrent_used = 2;
+    scheduler_stats.completion_rate = 0.0;
+    scheduler_stats.start_time = std::chrono::system_clock::now();
+    scheduler_stats.end_time = scheduler_stats.start_time + std::chrono::seconds(10);
+    
+    // 生成报告
+    ResultReporter reporter(config_mgr);
+    
+    std::cout << "开始生成报告..." << std::endl;
+    if (reporter.generate_report(mock_stats, scheduler_stats)) {
+        std::cout << "报告生成成功!" << std::endl;
+    } else {
+        std::cout << "报告生成失败!" << std::endl;
+        return 1;
+    }
+    
+    return 0;
+}