// rtsp_client.cpp - implementation of jtjai_media::RTSPClient (records an RTSP stream to a file via FFmpeg).
  1. #include "rtsp_client.h"
  2. #include <iostream>
  3. #include <thread>
  4. #include <filesystem>
  5. #include <cstdarg>
  6. namespace jtjai_media {
  7. RTSPClient::RTSPClient(int stream_index, const StreamConfig& config, const std::string& output_dir)
  8. : config_(config)
  9. , output_dir_(output_dir)
  10. , input_format_ctx_(nullptr)
  11. , output_format_ctx_(nullptr)
  12. , input_video_stream_(nullptr)
  13. , output_video_stream_(nullptr)
  14. , input_audio_stream_(nullptr)
  15. , output_audio_stream_(nullptr)
  16. , should_stop_(false)
  17. , is_recording_(false)
  18. , connection_timeout_(10)
  19. , read_timeout_(30)
  20. , start_time_us_(0)
  21. , timeout_us_(0)
  22. {
  23. stats_.stream_index = stream_index;
  24. stats_.rtsp_url = config_.rtsp_url;
  25. // 确保输出目录存在
  26. std::filesystem::create_directories(output_dir_);
  27. // 构建完整的输出路径
  28. std::filesystem::path output_path(output_dir_);
  29. output_path /= config_.output_filename;
  30. full_output_path_ = output_path.string();
  31. stats_.output_file = full_output_path_;
  32. // 设置FFmpeg日志回调
  33. av_log_set_callback(ffmpeg_log_callback);
  34. }
  35. RTSPClient::~RTSPClient() {
  36. stop_recording();
  37. cleanup();
  38. }
  39. bool RTSPClient::start_recording(int duration_seconds) {
  40. if (is_recording_.load()) {
  41. return false;
  42. }
  43. stats_.duration_seconds = duration_seconds;
  44. stats_.start_time = std::chrono::system_clock::now();
  45. update_status(RTSPClientStatus::CONNECTING);
  46. // 初始化输入(快速连接检测)
  47. if (!initialize_input()) {
  48. // 检查是否是快速失败(流不可用)
  49. auto connect_end_time = std::chrono::system_clock::now();
  50. auto connect_duration = std::chrono::duration_cast<std::chrono::seconds>(
  51. connect_end_time - stats_.start_time).count();
  52. if (connect_duration < 3) { // 如3秒内失败认为流不可用
  53. update_status(RTSPClientStatus::STREAM_UNAVAILABLE, "流不可用或地址错误");
  54. std::cout << "流 " << stats_.stream_index << " 快速检测为不可用: " << config_.rtsp_url << std::endl;
  55. } else {
  56. update_status(RTSPClientStatus::ERROR_CONNECT, "Failed to initialize input");
  57. }
  58. stats_.end_time = std::chrono::system_clock::now();
  59. return false;
  60. }
  61. // 初始化输出
  62. if (!initialize_output()) {
  63. update_status(RTSPClientStatus::ERROR_CONNECT, "Failed to initialize output");
  64. cleanup();
  65. stats_.end_time = std::chrono::system_clock::now();
  66. return false;
  67. }
  68. update_status(RTSPClientStatus::CONNECTED);
  69. // 开始录制
  70. update_status(RTSPClientStatus::RECORDING);
  71. is_recording_.store(true);
  72. bool success = process_stream(duration_seconds);
  73. is_recording_.store(false);
  74. stats_.end_time = std::chrono::system_clock::now();
  75. if (success && !should_stop_.load()) {
  76. update_status(RTSPClientStatus::FINISHED);
  77. } else if (should_stop_.load()) {
  78. update_status(RTSPClientStatus::CANCELLED);
  79. }
  80. cleanup();
  81. return success;
  82. }
  83. void RTSPClient::start_recording_async(int duration_seconds,
  84. std::function<void(const RTSPClientStats&)> completion_callback) {
  85. std::thread([this, duration_seconds, completion_callback]() {
  86. bool success = start_recording(duration_seconds);
  87. if (completion_callback) {
  88. completion_callback(get_stats());
  89. }
  90. }).detach();
  91. }
  92. void RTSPClient::stop_recording() {
  93. should_stop_.store(true);
  94. }
  95. bool RTSPClient::is_running() const {
  96. return is_recording_.load();
  97. }
  98. RTSPClientStats RTSPClient::get_stats() const {
  99. std::lock_guard<std::mutex> lock(stats_mutex_);
  100. return stats_;
  101. }
  /// Opens the RTSP input and locates its first video and first audio stream.
  ///
  /// Installs interrupt_callback() on the input context so that blocking
  /// FFmpeg calls are aborted on a stop request or when the current timeout
  /// (start_time_us_ / timeout_us_) expires.
  ///
  /// @return true when the input is open and a video stream was found. On any
  ///         failure the input context is closed and the input stream pointers
  ///         are reset, so the object holds no half-initialised input.
  bool RTSPClient::initialize_input() {
      // Closes the input and forgets streams that belonged to it.
      const auto release_input = [this]() {
          if (input_format_ctx_) {
              avformat_close_input(&input_format_ctx_);
          }
          input_video_stream_ = nullptr;
          input_audio_stream_ = nullptr;
      };

      // Allocate the input format context.
      input_format_ctx_ = avformat_alloc_context();
      if (!input_format_ctx_) {
          return false;
      }

      // Install the interrupt (timeout / stop) callback.
      input_format_ctx_->interrupt_callback.callback = interrupt_callback;
      input_format_ctx_->interrupt_callback.opaque = this;

      // RTSP options, tuned for fast failure detection.
      // NOTE(review): the meaning and availability of "timeout" / "stimeout"
      // differ between FFmpeg releases - confirm against the linked version.
      AVDictionary* options = nullptr;
      av_dict_set(&options, "rtsp_transport", "tcp", 0);
      av_dict_set(&options, "timeout", "3000000", 0);        // intended: 3 s connect timeout
      av_dict_set(&options, "stimeout", "2000000", 0);       // intended: 2 s read timeout
      av_dict_set(&options, "max_delay", "500000", 0);       // max delay 500 ms
      av_dict_set(&options, "reorder_queue_size", "10", 0);  // smaller reorder queue

      // Arm the interrupt-callback timeout for the connect phase (3 s).
      start_time_us_ = av_gettime();
      timeout_us_ = 3000000LL;

      std::cout << "尝试连接RTSP流: " << config_.rtsp_url << std::endl;

      // Open the input stream.
      int ret = avformat_open_input(&input_format_ctx_, config_.rtsp_url.c_str(), nullptr, &options);
      av_dict_free(&options);
      if (ret < 0) {
          char error_buf[AV_ERROR_MAX_STRING_SIZE];
          av_strerror(ret, error_buf, sizeof(error_buf));
          std::cerr << "无法打开RTSP流 " << config_.rtsp_url << ": " << error_buf << std::endl;

          // These codes indicate that the stream itself is not available.
          if (ret == AVERROR(ECONNREFUSED) || ret == AVERROR(EHOSTUNREACH) ||
              ret == AVERROR(ENETUNREACH) || ret == AVERROR_INVALIDDATA) {
              std::cout << "检测到流不可用错误码: " << ret << std::endl;
          }
          // avformat_open_input() frees a caller-supplied context on failure;
          // release_input() only acts if the pointer is still set.
          release_input();
          return false;
      }

      std::cout << "成功连接RTSP流,获取流信息..." << std::endl;

      // Probe stream information with a shorter (2 s) timeout.
      start_time_us_ = av_gettime();
      timeout_us_ = 2000000LL;

      ret = avformat_find_stream_info(input_format_ctx_, nullptr);
      if (ret < 0) {
          char error_buf[AV_ERROR_MAX_STRING_SIZE];
          av_strerror(ret, error_buf, sizeof(error_buf));
          std::cerr << "无法获取流信息: " << error_buf << std::endl;
          release_input();  // do not leave the opened input behind
          return false;
      }

      // Pick the first video stream and the first audio stream.
      for (unsigned int i = 0; i < input_format_ctx_->nb_streams; i++) {
          AVStream* stream = input_format_ctx_->streams[i];
          if (stream->codecpar->codec_type == AVMEDIA_TYPE_VIDEO && !input_video_stream_) {
              input_video_stream_ = stream;
          } else if (stream->codecpar->codec_type == AVMEDIA_TYPE_AUDIO && !input_audio_stream_) {
              input_audio_stream_ = stream;
          }
      }

      if (!input_video_stream_) {
          std::cerr << "未找到视频流" << std::endl;
          release_input();  // an audio-only input is rejected; close it
          return false;
      }

      std::cout << "成功初始化RTSP流,找到视频流" << std::endl;
      return true;
  }
  164. bool RTSPClient::initialize_output() {
  165. // 分配输出格式上下文
  166. int ret = avformat_alloc_output_context2(&output_format_ctx_, nullptr, nullptr, full_output_path_.c_str());
  167. if (ret < 0) {
  168. char error_buf[AV_ERROR_MAX_STRING_SIZE];
  169. av_strerror(ret, error_buf, sizeof(error_buf));
  170. std::cerr << "无法创建输出格式上下文: " << error_buf << std::endl;
  171. return false;
  172. }
  173. // 复制视频流
  174. if (input_video_stream_) {
  175. output_video_stream_ = avformat_new_stream(output_format_ctx_, nullptr);
  176. if (!output_video_stream_) {
  177. std::cerr << "无法创建输出视频流" << std::endl;
  178. return false;
  179. }
  180. ret = avcodec_parameters_copy(output_video_stream_->codecpar, input_video_stream_->codecpar);
  181. if (ret < 0) {
  182. char error_buf[AV_ERROR_MAX_STRING_SIZE];
  183. av_strerror(ret, error_buf, sizeof(error_buf));
  184. std::cerr << "无法复制视频流参数: " << error_buf << std::endl;
  185. return false;
  186. }
  187. output_video_stream_->codecpar->codec_tag = 0;
  188. }
  189. // 复制音频流(如果存在)
  190. if (input_audio_stream_) {
  191. output_audio_stream_ = avformat_new_stream(output_format_ctx_, nullptr);
  192. if (!output_audio_stream_) {
  193. std::cerr << "无法创建输出音频流" << std::endl;
  194. return false;
  195. }
  196. ret = avcodec_parameters_copy(output_audio_stream_->codecpar, input_audio_stream_->codecpar);
  197. if (ret < 0) {
  198. char error_buf[AV_ERROR_MAX_STRING_SIZE];
  199. av_strerror(ret, error_buf, sizeof(error_buf));
  200. std::cerr << "无法复制音频流参数: " << error_buf << std::endl;
  201. return false;
  202. }
  203. output_audio_stream_->codecpar->codec_tag = 0;
  204. }
  205. // 打开输出文件
  206. if (!(output_format_ctx_->oformat->flags & AVFMT_NOFILE)) {
  207. ret = avio_open(&output_format_ctx_->pb, full_output_path_.c_str(), AVIO_FLAG_WRITE);
  208. if (ret < 0) {
  209. char error_buf[AV_ERROR_MAX_STRING_SIZE];
  210. av_strerror(ret, error_buf, sizeof(error_buf));
  211. std::cerr << "无法打开输出文件 " << full_output_path_ << ": " << error_buf << std::endl;
  212. return false;
  213. }
  214. }
  215. // 写入文件头
  216. ret = avformat_write_header(output_format_ctx_, nullptr);
  217. if (ret < 0) {
  218. char error_buf[AV_ERROR_MAX_STRING_SIZE];
  219. av_strerror(ret, error_buf, sizeof(error_buf));
  220. std::cerr << "无法写入输出文件头: " << error_buf << std::endl;
  221. return false;
  222. }
  223. return true;
  224. }
  225. bool RTSPClient::process_stream(int duration_seconds) {
  226. AVPacket* packet = av_packet_alloc();
  227. if (!packet) {
  228. return false;
  229. }
  230. auto start_time = std::chrono::steady_clock::now();
  231. auto end_time = start_time + std::chrono::seconds(duration_seconds);
  232. int64_t frames_count = 0;
  233. int64_t bytes_count = 0;
  234. while (!should_stop_.load() && std::chrono::steady_clock::now() < end_time) {
  235. // 设置读取超时
  236. start_time_us_ = av_gettime();
  237. timeout_us_ = read_timeout_ * 1000000LL;
  238. int ret = av_read_frame(input_format_ctx_, packet);
  239. if (ret < 0) {
  240. if (ret == AVERROR_EOF) {
  241. std::cout << "流结束" << std::endl;
  242. break;
  243. } else if (ret == AVERROR(EAGAIN)) {
  244. continue;
  245. } else {
  246. char error_buf[AV_ERROR_MAX_STRING_SIZE];
  247. av_strerror(ret, error_buf, sizeof(error_buf));
  248. std::cerr << "读取数据包错误: " << error_buf << std::endl;
  249. av_packet_free(&packet);
  250. update_status(RTSPClientStatus::ERROR_RECORD, error_buf);
  251. return false;
  252. }
  253. }
  254. // 确定输出流索引
  255. AVStream* output_stream = nullptr;
  256. if (packet->stream_index == input_video_stream_->index) {
  257. output_stream = output_video_stream_;
  258. } else if (input_audio_stream_ && packet->stream_index == input_audio_stream_->index) {
  259. output_stream = output_audio_stream_;
  260. }
  261. if (output_stream) {
  262. // 转换时间戳
  263. AVStream* input_stream = input_format_ctx_->streams[packet->stream_index];
  264. packet->pts = av_rescale_q_rnd(packet->pts, input_stream->time_base,
  265. output_stream->time_base, static_cast<AVRounding>(AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX));
  266. packet->dts = av_rescale_q_rnd(packet->dts, input_stream->time_base,
  267. output_stream->time_base, static_cast<AVRounding>(AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX));
  268. packet->duration = av_rescale_q(packet->duration, input_stream->time_base, output_stream->time_base);
  269. packet->stream_index = output_stream->index;
  270. packet->pos = -1;
  271. // 写入数据包
  272. ret = av_interleaved_write_frame(output_format_ctx_, packet);
  273. if (ret < 0) {
  274. char error_buf[AV_ERROR_MAX_STRING_SIZE];
  275. av_strerror(ret, error_buf, sizeof(error_buf));
  276. std::cerr << "写入数据包错误: " << error_buf << std::endl;
  277. } else {
  278. frames_count++;
  279. bytes_count += packet->size;
  280. }
  281. }
  282. av_packet_unref(packet);
  283. // 更新统计信息
  284. {
  285. std::lock_guard<std::mutex> lock(stats_mutex_);
  286. stats_.frames_received = frames_count;
  287. stats_.bytes_received = bytes_count;
  288. }
  289. }
  290. // 写入文件尾
  291. if (output_format_ctx_) {
  292. av_write_trailer(output_format_ctx_);
  293. }
  294. av_packet_free(&packet);
  295. return true;
  296. }
  297. void RTSPClient::cleanup() {
  298. if (input_format_ctx_) {
  299. avformat_close_input(&input_format_ctx_);
  300. input_format_ctx_ = nullptr;
  301. }
  302. if (output_format_ctx_) {
  303. if (output_format_ctx_->pb && !(output_format_ctx_->oformat->flags & AVFMT_NOFILE)) {
  304. avio_closep(&output_format_ctx_->pb);
  305. }
  306. avformat_free_context(output_format_ctx_);
  307. output_format_ctx_ = nullptr;
  308. }
  309. input_video_stream_ = nullptr;
  310. output_video_stream_ = nullptr;
  311. input_audio_stream_ = nullptr;
  312. output_audio_stream_ = nullptr;
  313. }
  314. void RTSPClient::update_status(RTSPClientStatus status, const std::string& error_msg) {
  315. std::lock_guard<std::mutex> lock(stats_mutex_);
  316. stats_.status = status;
  317. if (!error_msg.empty()) {
  318. stats_.error_message = error_msg;
  319. }
  320. }
  321. void RTSPClient::ffmpeg_log_callback(void* ptr, int level, const char* fmt, va_list vl) {
  322. if (level > AV_LOG_WARNING) {
  323. return;
  324. }
  325. char buffer[1024];
  326. vsnprintf(buffer, sizeof(buffer), fmt, vl);
  327. // 过滤一些常见的无害消息
  328. std::string msg(buffer);
  329. if (msg.find("Last message repeated") != std::string::npos ||
  330. msg.find("Application provided invalid") != std::string::npos) {
  331. return;
  332. }
  333. std::cerr << "FFmpeg: " << buffer;
  334. }
  335. int RTSPClient::interrupt_callback(void* ctx) {
  336. RTSPClient* client = static_cast<RTSPClient*>(ctx);
  337. if (client->should_stop_.load()) {
  338. return 1;
  339. }
  340. // 检查超时
  341. if (client->timeout_us_ > 0) {
  342. int64_t elapsed = av_gettime() - client->start_time_us_;
  343. if (elapsed > client->timeout_us_) {
  344. std::cerr << "操作超时" << std::endl;
  345. return 1;
  346. }
  347. }
  348. return 0;
  349. }
  350. } // namespace jtjai_media