123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421 |
- #include "rtsp_client.h"
- #include <iostream>
- #include <thread>
- #include <filesystem>
- #include <cstdarg>
- namespace jtjai_media {
- RTSPClient::RTSPClient(int stream_index, const StreamConfig& config, const std::string& output_dir)
- : config_(config)
- , output_dir_(output_dir)
- , input_format_ctx_(nullptr)
- , output_format_ctx_(nullptr)
- , input_video_stream_(nullptr)
- , output_video_stream_(nullptr)
- , input_audio_stream_(nullptr)
- , output_audio_stream_(nullptr)
- , should_stop_(false)
- , is_recording_(false)
- , connection_timeout_(10)
- , read_timeout_(30)
- , start_time_us_(0)
- , timeout_us_(0)
- {
- stats_.stream_index = stream_index;
- stats_.rtsp_url = config_.rtsp_url;
-
- // 确保输出目录存在
- std::filesystem::create_directories(output_dir_);
-
- // 构建完整的输出路径
- std::filesystem::path output_path(output_dir_);
- output_path /= config_.output_filename;
- full_output_path_ = output_path.string();
- stats_.output_file = full_output_path_;
-
- // 设置FFmpeg日志回调
- av_log_set_callback(ffmpeg_log_callback);
- }
- RTSPClient::~RTSPClient() {
- stop_recording();
- cleanup();
- }
- bool RTSPClient::start_recording(int duration_seconds) {
- if (is_recording_.load()) {
- return false;
- }
-
- stats_.duration_seconds = duration_seconds;
- stats_.start_time = std::chrono::system_clock::now();
- update_status(RTSPClientStatus::CONNECTING);
-
- // 初始化输入(快速连接检测)
- if (!initialize_input()) {
- // 检查是否是快速失败(流不可用)
- auto connect_end_time = std::chrono::system_clock::now();
- auto connect_duration = std::chrono::duration_cast<std::chrono::seconds>(
- connect_end_time - stats_.start_time).count();
-
- if (connect_duration < 3) { // 如3秒内失败认为流不可用
- update_status(RTSPClientStatus::STREAM_UNAVAILABLE, "流不可用或地址错误");
- std::cout << "流 " << stats_.stream_index << " 快速检测为不可用: " << config_.rtsp_url << std::endl;
- } else {
- update_status(RTSPClientStatus::ERROR_CONNECT, "Failed to initialize input");
- }
- stats_.end_time = std::chrono::system_clock::now();
- return false;
- }
-
- // 初始化输出
- if (!initialize_output()) {
- update_status(RTSPClientStatus::ERROR_CONNECT, "Failed to initialize output");
- cleanup();
- stats_.end_time = std::chrono::system_clock::now();
- return false;
- }
-
- update_status(RTSPClientStatus::CONNECTED);
-
- // 开始录制
- update_status(RTSPClientStatus::RECORDING);
- is_recording_.store(true);
-
- bool success = process_stream(duration_seconds);
-
- is_recording_.store(false);
- stats_.end_time = std::chrono::system_clock::now();
-
- if (success && !should_stop_.load()) {
- update_status(RTSPClientStatus::FINISHED);
- } else if (should_stop_.load()) {
- update_status(RTSPClientStatus::CANCELLED);
- }
-
- cleanup();
- return success;
- }
- void RTSPClient::start_recording_async(int duration_seconds,
- std::function<void(const RTSPClientStats&)> completion_callback) {
-
- std::thread([this, duration_seconds, completion_callback]() {
- bool success = start_recording(duration_seconds);
- if (completion_callback) {
- completion_callback(get_stats());
- }
- }).detach();
- }
- void RTSPClient::stop_recording() {
- should_stop_.store(true);
- }
- bool RTSPClient::is_running() const {
- return is_recording_.load();
- }
- RTSPClientStats RTSPClient::get_stats() const {
- std::lock_guard<std::mutex> lock(stats_mutex_);
- return stats_;
- }
/// Opens the RTSP input and locates its first video and first audio stream.
///
/// On success `input_format_ctx_` is open, `input_video_stream_` is set and
/// `input_audio_stream_` is set when the source carries audio. On any failure
/// the input context is released again and both stream pointers are reset,
/// so the object holds no half-initialized input state.
///
/// @return true when the input was opened and contains a video stream.
bool RTSPClient::initialize_input() {
    // Allocate the input format context.
    input_format_ctx_ = avformat_alloc_context();
    if (!input_format_ctx_) {
        return false;
    }

    // Let blocking FFmpeg I/O be aborted on a stop request or on timeout.
    input_format_ctx_->interrupt_callback.callback = interrupt_callback;
    input_format_ctx_->interrupt_callback.opaque = this;

    // RTSP options tuned for fast failure detection.
    // NOTE(review): the meaning and availability of "timeout"/"stimeout"
    // differ between FFmpeg releases; confirm against the linked version.
    AVDictionary* options = nullptr;
    av_dict_set(&options, "rtsp_transport", "tcp", 0);
    av_dict_set(&options, "timeout", "3000000", 0);       // intended: 3 s connection timeout
    av_dict_set(&options, "stimeout", "2000000", 0);      // intended: 2 s read timeout
    av_dict_set(&options, "max_delay", "500000", 0);      // max delay 500 ms
    av_dict_set(&options, "reorder_queue_size", "10", 0); // keep the reorder queue small

    // Deadline enforced by interrupt_callback() while connecting: 3 seconds.
    start_time_us_ = av_gettime();
    timeout_us_ = 3000000LL;

    std::cout << "尝试连接RTSP流: " << config_.rtsp_url << std::endl;

    // Open the input stream.
    int ret = avformat_open_input(&input_format_ctx_, config_.rtsp_url.c_str(), nullptr, &options);
    av_dict_free(&options);

    if (ret < 0) {
        char error_buf[AV_ERROR_MAX_STRING_SIZE];
        av_strerror(ret, error_buf, sizeof(error_buf));
        std::cerr << "无法打开RTSP流 " << config_.rtsp_url << ": " << error_buf << std::endl;

        // These error codes indicate that the stream is simply not available.
        if (ret == AVERROR(ECONNREFUSED) || ret == AVERROR(EHOSTUNREACH) ||
            ret == AVERROR(ENETUNREACH) || ret == AVERROR_INVALIDDATA) {
            std::cout << "检测到流不可用错误码: " << ret << std::endl;
        }

        // avformat_open_input() frees a caller-supplied context on failure;
        // make sure no stale pointer is kept.
        input_format_ctx_ = nullptr;
        return false;
    }

    // From here on the input is open: every failure must close it again.
    const auto abandon_input = [this]() {
        avformat_close_input(&input_format_ctx_);
        input_video_stream_ = nullptr;
        input_audio_stream_ = nullptr;
        return false;
    };

    std::cout << "成功连接RTSP流,获取流信息..." << std::endl;

    // Probe stream information with a shorter 2 second deadline.
    start_time_us_ = av_gettime();
    timeout_us_ = 2000000LL;

    ret = avformat_find_stream_info(input_format_ctx_, nullptr);
    if (ret < 0) {
        char error_buf[AV_ERROR_MAX_STRING_SIZE];
        av_strerror(ret, error_buf, sizeof(error_buf));
        std::cerr << "无法获取流信息: " << error_buf << std::endl;
        return abandon_input();
    }

    // Pick the first video stream and the first audio stream.
    for (unsigned int i = 0; i < input_format_ctx_->nb_streams; i++) {
        AVStream* stream = input_format_ctx_->streams[i];
        if (stream->codecpar->codec_type == AVMEDIA_TYPE_VIDEO && !input_video_stream_) {
            input_video_stream_ = stream;
        } else if (stream->codecpar->codec_type == AVMEDIA_TYPE_AUDIO && !input_audio_stream_) {
            input_audio_stream_ = stream;
        }
    }

    // A video stream is mandatory; audio is optional.
    if (!input_video_stream_) {
        std::cerr << "未找到视频流" << std::endl;
        return abandon_input();
    }

    std::cout << "成功初始化RTSP流,找到视频流" << std::endl;
    return true;
}
- bool RTSPClient::initialize_output() {
- // 分配输出格式上下文
- int ret = avformat_alloc_output_context2(&output_format_ctx_, nullptr, nullptr, full_output_path_.c_str());
- if (ret < 0) {
- char error_buf[AV_ERROR_MAX_STRING_SIZE];
- av_strerror(ret, error_buf, sizeof(error_buf));
- std::cerr << "无法创建输出格式上下文: " << error_buf << std::endl;
- return false;
- }
-
- // 复制视频流
- if (input_video_stream_) {
- output_video_stream_ = avformat_new_stream(output_format_ctx_, nullptr);
- if (!output_video_stream_) {
- std::cerr << "无法创建输出视频流" << std::endl;
- return false;
- }
-
- ret = avcodec_parameters_copy(output_video_stream_->codecpar, input_video_stream_->codecpar);
- if (ret < 0) {
- char error_buf[AV_ERROR_MAX_STRING_SIZE];
- av_strerror(ret, error_buf, sizeof(error_buf));
- std::cerr << "无法复制视频流参数: " << error_buf << std::endl;
- return false;
- }
-
- output_video_stream_->codecpar->codec_tag = 0;
- }
-
- // 复制音频流(如果存在)
- if (input_audio_stream_) {
- output_audio_stream_ = avformat_new_stream(output_format_ctx_, nullptr);
- if (!output_audio_stream_) {
- std::cerr << "无法创建输出音频流" << std::endl;
- return false;
- }
-
- ret = avcodec_parameters_copy(output_audio_stream_->codecpar, input_audio_stream_->codecpar);
- if (ret < 0) {
- char error_buf[AV_ERROR_MAX_STRING_SIZE];
- av_strerror(ret, error_buf, sizeof(error_buf));
- std::cerr << "无法复制音频流参数: " << error_buf << std::endl;
- return false;
- }
-
- output_audio_stream_->codecpar->codec_tag = 0;
- }
-
- // 打开输出文件
- if (!(output_format_ctx_->oformat->flags & AVFMT_NOFILE)) {
- ret = avio_open(&output_format_ctx_->pb, full_output_path_.c_str(), AVIO_FLAG_WRITE);
- if (ret < 0) {
- char error_buf[AV_ERROR_MAX_STRING_SIZE];
- av_strerror(ret, error_buf, sizeof(error_buf));
- std::cerr << "无法打开输出文件 " << full_output_path_ << ": " << error_buf << std::endl;
- return false;
- }
- }
-
- // 写入文件头
- ret = avformat_write_header(output_format_ctx_, nullptr);
- if (ret < 0) {
- char error_buf[AV_ERROR_MAX_STRING_SIZE];
- av_strerror(ret, error_buf, sizeof(error_buf));
- std::cerr << "无法写入输出文件头: " << error_buf << std::endl;
- return false;
- }
-
- return true;
- }
- bool RTSPClient::process_stream(int duration_seconds) {
- AVPacket* packet = av_packet_alloc();
- if (!packet) {
- return false;
- }
-
- auto start_time = std::chrono::steady_clock::now();
- auto end_time = start_time + std::chrono::seconds(duration_seconds);
-
- int64_t frames_count = 0;
- int64_t bytes_count = 0;
-
- while (!should_stop_.load() && std::chrono::steady_clock::now() < end_time) {
- // 设置读取超时
- start_time_us_ = av_gettime();
- timeout_us_ = read_timeout_ * 1000000LL;
-
- int ret = av_read_frame(input_format_ctx_, packet);
- if (ret < 0) {
- if (ret == AVERROR_EOF) {
- std::cout << "流结束" << std::endl;
- break;
- } else if (ret == AVERROR(EAGAIN)) {
- continue;
- } else {
- char error_buf[AV_ERROR_MAX_STRING_SIZE];
- av_strerror(ret, error_buf, sizeof(error_buf));
- std::cerr << "读取数据包错误: " << error_buf << std::endl;
- av_packet_free(&packet);
- update_status(RTSPClientStatus::ERROR_RECORD, error_buf);
- return false;
- }
- }
-
- // 确定输出流索引
- AVStream* output_stream = nullptr;
- if (packet->stream_index == input_video_stream_->index) {
- output_stream = output_video_stream_;
- } else if (input_audio_stream_ && packet->stream_index == input_audio_stream_->index) {
- output_stream = output_audio_stream_;
- }
-
- if (output_stream) {
- // 转换时间戳
- AVStream* input_stream = input_format_ctx_->streams[packet->stream_index];
-
- packet->pts = av_rescale_q_rnd(packet->pts, input_stream->time_base,
- output_stream->time_base, static_cast<AVRounding>(AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX));
- packet->dts = av_rescale_q_rnd(packet->dts, input_stream->time_base,
- output_stream->time_base, static_cast<AVRounding>(AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX));
- packet->duration = av_rescale_q(packet->duration, input_stream->time_base, output_stream->time_base);
- packet->stream_index = output_stream->index;
- packet->pos = -1;
-
- // 写入数据包
- ret = av_interleaved_write_frame(output_format_ctx_, packet);
- if (ret < 0) {
- char error_buf[AV_ERROR_MAX_STRING_SIZE];
- av_strerror(ret, error_buf, sizeof(error_buf));
- std::cerr << "写入数据包错误: " << error_buf << std::endl;
- } else {
- frames_count++;
- bytes_count += packet->size;
- }
- }
-
- av_packet_unref(packet);
-
- // 更新统计信息
- {
- std::lock_guard<std::mutex> lock(stats_mutex_);
- stats_.frames_received = frames_count;
- stats_.bytes_received = bytes_count;
- }
- }
-
- // 写入文件尾
- if (output_format_ctx_) {
- av_write_trailer(output_format_ctx_);
- }
-
- av_packet_free(&packet);
- return true;
- }
- void RTSPClient::cleanup() {
- if (input_format_ctx_) {
- avformat_close_input(&input_format_ctx_);
- input_format_ctx_ = nullptr;
- }
-
- if (output_format_ctx_) {
- if (output_format_ctx_->pb && !(output_format_ctx_->oformat->flags & AVFMT_NOFILE)) {
- avio_closep(&output_format_ctx_->pb);
- }
- avformat_free_context(output_format_ctx_);
- output_format_ctx_ = nullptr;
- }
-
- input_video_stream_ = nullptr;
- output_video_stream_ = nullptr;
- input_audio_stream_ = nullptr;
- output_audio_stream_ = nullptr;
- }
- void RTSPClient::update_status(RTSPClientStatus status, const std::string& error_msg) {
- std::lock_guard<std::mutex> lock(stats_mutex_);
- stats_.status = status;
- if (!error_msg.empty()) {
- stats_.error_message = error_msg;
- }
- }
- void RTSPClient::ffmpeg_log_callback(void* ptr, int level, const char* fmt, va_list vl) {
- if (level > AV_LOG_WARNING) {
- return;
- }
-
- char buffer[1024];
- vsnprintf(buffer, sizeof(buffer), fmt, vl);
-
- // 过滤一些常见的无害消息
- std::string msg(buffer);
- if (msg.find("Last message repeated") != std::string::npos ||
- msg.find("Application provided invalid") != std::string::npos) {
- return;
- }
-
- std::cerr << "FFmpeg: " << buffer;
- }
- int RTSPClient::interrupt_callback(void* ctx) {
- RTSPClient* client = static_cast<RTSPClient*>(ctx);
- if (client->should_stop_.load()) {
- return 1;
- }
-
- // 检查超时
- if (client->timeout_us_ > 0) {
- int64_t elapsed = av_gettime() - client->start_time_us_;
- if (elapsed > client->timeout_us_) {
- std::cerr << "操作超时" << std::endl;
- return 1;
- }
- }
-
- return 0;
- }
- } // namespace jtjai_media
|