X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=modules%2Fffmpeg%2Fproducer%2Finput.cpp;h=c8b32cd4a0fc9f8b3d03b6d9ef2827a3ae5706c3;hb=ac7b3acb915f90de6b224e54a2240023fc221e5a;hp=1e6ff9ab7226a1c4ac337f66e40d312cb6d805b8;hpb=0b6e0a8087cbbcd404a476437f3d549c6cf48029;p=casparcg diff --git a/modules/ffmpeg/producer/input.cpp b/modules/ffmpeg/producer/input.cpp index 1e6ff9ab7..c8b32cd4a 100644 --- a/modules/ffmpeg/producer/input.cpp +++ b/modules/ffmpeg/producer/input.cpp @@ -21,7 +21,7 @@ #pragma warning (disable : 4244) #endif -#include "..\stdafx.h" +#include "../stdafx.h" #include "input.h" #include "../ffmpeg_error.h" @@ -29,312 +29,265 @@ #include -#include #include +#include +#include #include -#include +#include -#include #include +#include +#include +#include +#if defined(_MSC_VER) +#pragma warning (push) +#pragma warning (disable : 4244) +#endif extern "C" { #define __STDC_CONSTANT_MACROS #define __STDC_LIMIT_MACROS #include } +#if defined(_MSC_VER) +#pragma warning (pop) +#endif -namespace caspar { - -static const size_t PACKET_BUFFER_COUNT = 100; // Assume that av_read_frame distance between audio and video packets is less than PACKET_BUFFER_COUNT. - -class stream -{ - std::shared_ptr ctx_; - int index_; - tbb::concurrent_bounded_queue> buffer_; - -public: - - stream() - : index_(-1) - { - buffer_.set_capacity(PACKET_BUFFER_COUNT); - } - - int open(std::shared_ptr& fctx, AVMediaType media_type) - { - const auto streams = boost::iterator_range(fctx->streams, fctx->streams+fctx->nb_streams); - const auto stream = boost::find_if(streams, [&](AVStream* stream) - { - return stream && stream->codec->codec_type == media_type; - }); - - if(stream == streams.end()) - return AVERROR_STREAM_NOT_FOUND; - - auto codec = avcodec_find_decoder((*stream)->codec->codec_id); - if(!codec) - return AVERROR_DECODER_NOT_FOUND; - - index_ = (*stream)->index; - - int errn = tbb_avcodec_open((*stream)->codec, codec); - if(errn >= 0) - { - ctx_.reset((*stream)->codec, tbb_avcodec_close); - - // Some files give an invalid time_base numerator, try to fix it. - if(ctx_ && ctx_->time_base.num == 1) - ctx_->time_base.num = static_cast(std::pow(10.0, static_cast(std::log10(static_cast(ctx_->time_base.den)))-1)); - } - return errn; - } - - std::shared_ptr pop() - { - std::shared_ptr pkt; - buffer_.try_pop(pkt); - return pkt; - } - - void push(const std::shared_ptr& pkt) - { - if(pkt->stream_index != index_) - return; - - av_dup_packet(pkt.get()); - buffer_.push(pkt); - } - - const std::shared_ptr& ctx() { return ctx_; } - - operator bool(){return ctx_ != nullptr;} - - double fps() const { return !ctx_ ? -1.0 : static_cast(ctx_->time_base.den) / static_cast(ctx_->time_base.num); } +namespace caspar { namespace ffmpeg { - bool empty() const { return buffer_.empty();} - int size() const { return buffer_.size();} -}; - +static const size_t MAX_BUFFER_COUNT = 100; +static const size_t MIN_BUFFER_COUNT = 4; +static const size_t MAX_BUFFER_SIZE = 16 * 1000000; + struct input::implementation : boost::noncopyable { - safe_ptr graph_; + std::shared_ptr format_context_; // Destroy this last + int default_stream_index_; - std::shared_ptr format_context_; // Destroy this last + safe_ptr graph_; - const std::wstring filename_; - const bool loop_; - const int start_; - double fps_; - - stream video_stream_; - stream audio_stream_; + const std::wstring filename_; + const bool loop_; + const size_t start_; + const size_t length_; + size_t frame_number_; + + tbb::concurrent_bounded_queue> buffer_; + tbb::atomic buffer_size_; + boost::condition_variable buffer_cond_; + boost::mutex buffer_mutex_; - std::exception_ptr exception_; - executor executor_; + boost::thread thread_; + tbb::atomic is_running_; + + tbb::atomic nb_frames_; + tbb::atomic nb_loops_; + public: - explicit implementation(const safe_ptr& graph, const std::wstring& filename, bool loop, int start) + explicit implementation(const safe_ptr& graph, const std::wstring& filename, bool loop, size_t start, size_t length) : graph_(graph) , loop_(loop) , filename_(filename) - , executor_(print()) - , start_(std::max(start, 0)) - { - graph_->set_color("input-buffer", diagnostics::color(1.0f, 1.0f, 0.0f)); - graph_->set_color("seek", diagnostics::color(0.5f, 1.0f, 0.5f)); + , start_(start) + , length_(length) + , frame_number_(0) + { + is_running_ = true; + nb_frames_ = 0; + nb_loops_ = 0; - int errn; - AVFormatContext* weak_format_context_ = nullptr; - errn = av_open_input_file(&weak_format_context_, narrow(filename).c_str(), nullptr, 0, nullptr); - if(errn < 0 || weak_format_context_ == nullptr) - { - BOOST_THROW_EXCEPTION( - file_read_error() << - source_info(narrow(print())) << - msg_info(av_error_str(errn)) << - boost::errinfo_api_function("av_open_input_file") << - boost::errinfo_errno(AVUNERROR(errn)) << - boost::errinfo_file_name(narrow(filename))); - } + THROW_ON_ERROR2(avformat_open_input(&weak_format_context_, narrow(filename).c_str(), nullptr, nullptr), print()); format_context_.reset(weak_format_context_, av_close_input_file); + + av_dump_format(weak_format_context_, 0, narrow(filename).c_str(), 0); - errn = av_find_stream_info(format_context_.get()); - if(errn < 0) - { - BOOST_THROW_EXCEPTION( - file_read_error() << - source_info(narrow(print())) << - msg_info(av_error_str(errn)) << - boost::errinfo_api_function("av_find_stream_info") << - boost::errinfo_errno(AVUNERROR(errn))); - } - - errn = video_stream_.open(format_context_, AVMEDIA_TYPE_VIDEO); - if(errn < 0) - CASPAR_LOG(warning) << print() << L" Could not open video stream: " << widen(av_error_str(errn)); - - errn = audio_stream_.open(format_context_, AVMEDIA_TYPE_AUDIO); - if(errn < 0) - CASPAR_LOG(warning) << print() << L" Could not open audio stream: " << widen(av_error_str(errn)); + THROW_ON_ERROR2(avformat_find_stream_info(format_context_.get(), nullptr), print()); - if(!video_stream_ && !audio_stream_) - { - BOOST_THROW_EXCEPTION( - file_read_error() << - source_info(narrow(print())) << - msg_info("No video or audio codec context found.")); - } - - fps_ = video_stream_ ? video_stream_.fps() : audio_stream_.fps(); + default_stream_index_ = THROW_ON_ERROR2(av_find_default_stream_index(format_context_.get()), print()); - if(start_ != 0) + if(start_ > 0) seek_frame(start_); - - for(size_t n = 0; n < 16; ++n) // Read some packets for pre-rolling. + + for(int n = 0; n < 16 && !full(); ++n) read_next_packet(); - - executor_.start(); - executor_.begin_invoke([this]{read_file();}); - CASPAR_LOG(info) << print() << " Started."; + + graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f)); + graph_->set_color("buffer-count", diagnostics::color(0.7f, 0.4f, 0.4f)); + graph_->set_color("buffer-size", diagnostics::color(1.0f, 1.0f, 0.0f)); + + thread_ = boost::thread([this]{run();}); } ~implementation() { - stop(); + is_running_ = false; + buffer_cond_.notify_all(); + thread_.join(); } - std::shared_ptr get_video_packet() + bool try_pop(std::shared_ptr& packet) { - return video_stream_.pop(); - } + const bool result = buffer_.try_pop(packet); - std::shared_ptr get_audio_packet() - { - return audio_stream_.pop(); - } + if(result) + { + if(packet) + buffer_size_ -= packet->size; + buffer_cond_.notify_all(); + } - bool has_packet() const - { - return !video_stream_.empty() || !audio_stream_.empty(); + graph_->update_value("buffer-size", (static_cast(buffer_size_)+0.001)/MAX_BUFFER_SIZE); + graph_->update_value("buffer-count", (static_cast(buffer_.size()+0.001)/MAX_BUFFER_COUNT)); + + return result; } - - double fps() + + size_t nb_frames() const { - return fps_; + return nb_frames_; } -private: - - void stop() + size_t nb_loops() const { - executor_.stop(); - get_video_packet(); - get_audio_packet(); - CASPAR_LOG(info) << print() << " Stopping."; + return nb_loops_; } - void read_file() +private: + + void run() { - if(video_stream_.size() > 4 || audio_stream_.size() > 4) // audio is always before video. - Sleep(5); // There are enough packets, no hurry. + caspar::win32_exception::install_handler(); + + try + { + CASPAR_LOG(info) << print() << " Thread Started."; - read_next_packet(); + while(is_running_) + { + { + boost::unique_lock lock(buffer_mutex_); + while(full()) + buffer_cond_.timed_wait(lock, boost::posix_time::millisec(20)); + } + read_next_packet(); + } - executor_.begin_invoke([this]{read_file();}); + CASPAR_LOG(info) << print() << " Thread Stopped."; + } + catch(...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + is_running_ = false; + } } void read_next_packet() { - try + int ret = 0; + + std::shared_ptr read_packet(new AVPacket, [](AVPacket* p) { - std::shared_ptr read_packet(new AVPacket(), [](AVPacket* p) - { - av_free_packet(p); - delete p; - }); + av_free_packet(p); + delete p; + }); + av_init_packet(read_packet.get()); + + ret = av_read_frame(format_context_.get(), read_packet.get()); // read_packet is only valid until next call of av_read_frame. Use av_dup_packet to extend its life. + + if(is_eof(ret)) + { + ++nb_loops_; + frame_number_ = 0; - const int errn = av_read_frame(format_context_.get(), read_packet.get()); // read_packet is only valid until next call of av_read_frame. - if(is_eof(errn)) // Use av_dup_packet to extend its life. + if(loop_) { - if(loop_) - { - seek_frame(start_, AVSEEK_FLAG_BACKWARD); - graph_->add_tag("seek"); - CASPAR_LOG(info) << print() << " Received EOF. Looping."; - } - else + int flags = AVSEEK_FLAG_BACKWARD; + + int vid_stream_index = av_find_best_stream(format_context_.get(), AVMEDIA_TYPE_VIDEO, -1, -1, 0, 0); + if(vid_stream_index >= 0) { - stop(); - CASPAR_LOG(info) << print() << " Received EOF. Stopping."; + auto codec_id = format_context_->streams[vid_stream_index]->codec->codec_id; + if(codec_id == CODEC_ID_VP6A || codec_id == CODEC_ID_VP6F || codec_id == CODEC_ID_VP6) + flags |= AVSEEK_FLAG_BYTE; } - } - else if(errn < 0) - { - BOOST_THROW_EXCEPTION( - file_read_error() << - msg_info(av_error_str(errn)) << - source_info(narrow(print())) << - boost::errinfo_api_function("av_read_frame") << - boost::errinfo_errno(AVUNERROR(errn))); - } + + seek_frame(start_, flags); + graph_->add_tag("seek"); + CASPAR_LOG(trace) << print() << " Looping."; + } else { - video_stream_.push(read_packet); - audio_stream_.push(read_packet); + is_running_ = false; + CASPAR_LOG(trace) << print() << " Stopping."; } - - graph_->update_value("input-buffer", static_cast(std::max(video_stream_.size(), audio_stream_.size()))/static_cast(PACKET_BUFFER_COUNT)); } - catch(...) - { - stop(); - CASPAR_LOG_CURRENT_EXCEPTION(); - return; - } + else + { + THROW_ON_ERROR(ret, print(), "av_read_frame"); + + if(read_packet->stream_index == default_stream_index_) + { + if(nb_loops_ == 0) + ++nb_frames_; + ++frame_number_; + } + + THROW_ON_ERROR2(av_dup_packet(read_packet.get()), print()); + + // Make sure that the packet is correctly deallocated even if size and data is modified during decoding. + auto size = read_packet->size; + auto data = read_packet->data; + + read_packet = std::shared_ptr(read_packet.get(), [=](AVPacket*) + { + read_packet->size = size; + read_packet->data = data; + }); + + buffer_.try_push(read_packet); + buffer_size_ += read_packet->size; + + graph_->update_value("buffer-size", (static_cast(buffer_size_)+0.001)/MAX_BUFFER_SIZE); + graph_->update_value("buffer-count", (static_cast(buffer_.size()+0.001)/MAX_BUFFER_COUNT)); + } + } + + bool full() const + { + return is_running_ && (buffer_size_ > MAX_BUFFER_SIZE || buffer_.size() > MAX_BUFFER_COUNT) && buffer_.size() > MIN_BUFFER_COUNT; } void seek_frame(int64_t frame, int flags = 0) - { - // Convert from frames into seconds. - const auto ts = frame*static_cast(AV_TIME_BASE/fps_); - - const int errn = av_seek_frame(format_context_.get(), -1, ts, flags | AVSEEK_FLAG_FRAME); - if(errn < 0) - { - BOOST_THROW_EXCEPTION( - invalid_operation() << - source_info(narrow(print())) << - msg_info(av_error_str(errn)) << - boost::errinfo_api_function("av_seek_frame") << - boost::errinfo_errno(AVUNERROR(errn))); - } + { + THROW_ON_ERROR2(av_seek_frame(format_context_.get(), default_stream_index_, frame, flags), print()); + buffer_.push(nullptr); } - bool is_eof(int errn) + bool is_eof(int ret) { - if(errn == AVERROR(EIO)) - CASPAR_LOG(warning) << print() << " Received EIO, assuming EOF"; + if(ret == AVERROR(EIO)) + CASPAR_LOG(trace) << print() << " Received EIO, assuming EOF. " << nb_frames_; + if(ret == AVERROR_EOF) + CASPAR_LOG(trace) << print() << " Received EOF. " << nb_frames_; - return errn == AVERROR_EOF || errn == AVERROR(EIO); // av_read_frame doesn't always correctly return AVERROR_EOF; + return ret == AVERROR_EOF || ret == AVERROR(EIO) || frame_number_ >= length_; // av_read_frame doesn't always correctly return AVERROR_EOF; } std::wstring print() const { - return L"ffmpeg_input[" + filename_ + L"]"; + return L"ffmpeg_input[" + filename_ + L")]"; } }; -input::input(const safe_ptr& graph, const std::wstring& filename, bool loop, int start) - : impl_(new implementation(graph, filename, loop, start)){} -const std::shared_ptr& input::get_video_codec_context() const{return impl_->video_stream_.ctx();} -const std::shared_ptr& input::get_audio_codec_context() const{return impl_->audio_stream_.ctx();} -bool input::has_packet() const{return impl_->has_packet();} -bool input::is_running() const {return impl_->executor_.is_running();} -std::shared_ptr input::get_video_packet(){return impl_->get_video_packet();} -std::shared_ptr input::get_audio_packet(){return impl_->get_audio_packet();} -double input::fps() const { return impl_->fps(); } -} \ No newline at end of file +input::input(const safe_ptr& graph, const std::wstring& filename, bool loop, size_t start, size_t length) + : impl_(new implementation(graph, filename, loop, start, length)){} +bool input::eof() const {return !impl_->is_running_;} +bool input::try_pop(std::shared_ptr& packet){return impl_->try_pop(packet);} +safe_ptr input::context(){return make_safe(impl_->format_context_);} +size_t input::nb_frames() const {return impl_->nb_frames();} +size_t input::nb_loops() const {return impl_->nb_loops();} +}} \ No newline at end of file