X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=modules%2Fffmpeg%2Fproducer%2Finput.cpp;h=ec661bcfa48a75958c7d76059c82a23705c996aa;hb=e5770670a865f6a4f85245c7f895acb03f295e26;hp=ab64f23aa50c69cb42cefa25fbfb72eea57469d8;hpb=bddd8fb7d91330208118d51dc0a2d67e55c0efd0;p=casparcg diff --git a/modules/ffmpeg/producer/input.cpp b/modules/ffmpeg/producer/input.cpp index ab64f23aa..ec661bcfa 100644 --- a/modules/ffmpeg/producer/input.cpp +++ b/modules/ffmpeg/producer/input.cpp @@ -21,7 +21,7 @@ #pragma warning (disable : 4244) #endif -#include "..\stdafx.h" +#include "../stdafx.h" #include "input.h" #include "../ffmpeg_error.h" @@ -29,8 +29,9 @@ #include -#include #include +#include +#include #include #include @@ -38,219 +39,242 @@ #include #include #include -#include +#include +#if defined(_MSC_VER) +#pragma warning (push) +#pragma warning (disable : 4244) +#endif extern "C" { #define __STDC_CONSTANT_MACROS #define __STDC_LIMIT_MACROS #include } +#if defined(_MSC_VER) +#pragma warning (pop) +#endif -namespace caspar { +namespace caspar { namespace ffmpeg { -static const size_t MAX_BUFFER_COUNT = 128; -static const size_t MAX_BUFFER_SIZE = 32 * 1000000; +static const size_t MAX_BUFFER_COUNT = 100; +static const size_t MIN_BUFFER_COUNT = 4; +static const size_t MAX_BUFFER_SIZE = 16 * 1000000; struct input::implementation : boost::noncopyable { - safe_ptr graph_; + std::shared_ptr format_context_; // Destroy this last + int default_stream_index_; - std::shared_ptr format_context_; // Destroy this last + safe_ptr graph_; - const std::wstring filename_; - const bool loop_; - const int start_; + const std::wstring filename_; + const bool loop_; + const size_t start_; + const size_t length_; + size_t frame_number_; + + tbb::concurrent_bounded_queue> buffer_; + tbb::atomic buffer_size_; + boost::condition_variable buffer_cond_; + boost::mutex buffer_mutex_; + + boost::thread thread_; + tbb::atomic is_running_; - tbb::atomic buffer_size_; - boost::condition_variable cond_; - boost::mutex mutex_; + tbb::atomic nb_frames_; + tbb::atomic nb_loops_; - tbb::concurrent_bounded_queue> buffer_; - - executor executor_; public: - explicit implementation(const safe_ptr& graph, const std::wstring& filename, bool loop, int start) + explicit implementation(const safe_ptr& graph, const std::wstring& filename, bool loop, size_t start, size_t length) : graph_(graph) , loop_(loop) , filename_(filename) - , executor_(print()) - , start_(std::max(start, 0)) + , start_(start) + , length_(length) + , frame_number_(0) { - int errn; - + is_running_ = true; + nb_frames_ = 0; + nb_loops_ = 0; + AVFormatContext* weak_format_context_ = nullptr; - errn = avformat_open_input(&weak_format_context_, narrow(filename).c_str(), nullptr, nullptr); - if(errn < 0 || weak_format_context_ == nullptr) - { - BOOST_THROW_EXCEPTION( - file_read_error() << - source_info(narrow(print())) << - msg_info(av_error_str(errn)) << - boost::errinfo_api_function("av_open_input_file") << - boost::errinfo_errno(AVUNERROR(errn)) << - boost::errinfo_file_name(narrow(filename))); - } + THROW_ON_ERROR2(avformat_open_input(&weak_format_context_, narrow(filename).c_str(), nullptr, nullptr), print()); format_context_.reset(weak_format_context_, av_close_input_file); + + av_dump_format(weak_format_context_, 0, narrow(filename).c_str(), 0); - errn = avformat_find_stream_info(format_context_.get(), nullptr); - if(errn < 0) - { - BOOST_THROW_EXCEPTION( - file_read_error() << - source_info(narrow(print())) << - msg_info(av_error_str(errn)) << - boost::errinfo_api_function("av_find_stream_info") << - boost::errinfo_errno(AVUNERROR(errn))); - } + THROW_ON_ERROR2(avformat_find_stream_info(format_context_.get(), nullptr), print()); - if(start_ != 0) + default_stream_index_ = THROW_ON_ERROR2(av_find_default_stream_index(format_context_.get()), print()); + + if(start_ > 0) seek_frame(start_); - for(int n = 0; n < 16 && buffer_size_ < MAX_BUFFER_SIZE && buffer_.size() < MAX_BUFFER_COUNT; ++n) + for(int n = 0; n < 16 && !full(); ++n) read_next_packet(); + + graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f)); + graph_->set_color("buffer-count", diagnostics::color(0.7f, 0.4f, 0.4f)); + graph_->set_color("buffer-size", diagnostics::color(1.0f, 1.0f, 0.0f)); - buffer_.set_capacity(MAX_BUFFER_COUNT); - - graph_->set_color("seek", diagnostics::color(0.5f, 1.0f, 0.5f)); - graph_->set_color("buffer-count", diagnostics::color(0.2f, 0.8f, 1.0f)); - graph_->set_color("buffer-size", diagnostics::color(0.2f, 0.4f, 1.0f)); - - executor_.begin_invoke([this]{read_file();}); - CASPAR_LOG(info) << print() << " Started."; + thread_ = boost::thread([this]{run();}); } ~implementation() { - stop(); - // Unblock thread. - std::shared_ptr packet; - buffer_.try_pop(packet); - buffer_size_ = 0; - cond_.notify_all(); + is_running_ = false; + buffer_cond_.notify_all(); + thread_.join(); } bool try_pop(std::shared_ptr& packet) { - bool result = buffer_.try_pop(packet); - graph_->update_value("buffer-count", MAX_BUFFER_SIZE/static_cast(buffer_.size())); + const bool result = buffer_.try_pop(packet); + if(result) { if(packet) buffer_size_ -= packet->size; - graph_->update_value("buffer-size", MAX_BUFFER_SIZE/static_cast(buffer_size_)); - cond_.notify_all(); + buffer_cond_.notify_all(); } + + graph_->update_value("buffer-size", (static_cast(buffer_size_)+0.001)/MAX_BUFFER_SIZE); + graph_->update_value("buffer-count", (static_cast(buffer_.size()+0.001)/MAX_BUFFER_COUNT)); + return result; } -private: + size_t nb_frames() const + { + return nb_frames_; + } - void stop() + size_t nb_loops() const { - executor_.stop(); - - CASPAR_LOG(info) << print() << " Stopping."; + return nb_loops_; } - void read_file() +private: + + void run() { - read_next_packet(); - executor_.begin_invoke([this]{read_file();}); + caspar::win32_exception::install_handler(); + + try + { + CASPAR_LOG(info) << print() << " Thread Started."; + + while(is_running_) + { + { + boost::unique_lock lock(buffer_mutex_); + while(full()) + buffer_cond_.timed_wait(lock, boost::posix_time::millisec(20)); + } + read_next_packet(); + } + + CASPAR_LOG(info) << print() << " Thread Stopped."; + } + catch(...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + is_running_ = false; + } } void read_next_packet() { - try + int ret = 0; + + std::shared_ptr read_packet(new AVPacket, [](AVPacket* p) { - std::shared_ptr read_packet(new AVPacket, [](AVPacket* p) - { - av_free_packet(p); - delete p; - }); - av_init_packet(read_packet.get()); + av_free_packet(p); + delete p; + }); + av_init_packet(read_packet.get()); - const int errn = av_read_frame(format_context_.get(), read_packet.get()); // read_packet is only valid until next call of av_read_frame. - if(is_eof(errn)) // Use av_dup_packet to extend its life. + ret = av_read_frame(format_context_.get(), read_packet.get()); // read_packet is only valid until next call of av_read_frame. Use av_dup_packet to extend its life. + + if(is_eof(ret)) + { + ++nb_loops_; + frame_number_ = 0; + + if(loop_) { - if(loop_) - { - seek_frame(start_, AVSEEK_FLAG_BACKWARD); - graph_->add_tag("seek"); - //CASPAR_LOG(info) << print() << " Received EOF. Looping."; - } - else + int flags = AVSEEK_FLAG_BACKWARD; + + int vid_stream_index = av_find_best_stream(format_context_.get(), AVMEDIA_TYPE_VIDEO, -1, -1, 0, 0); + if(vid_stream_index >= 0) { - stop(); - //CASPAR_LOG(info) << print() << " Received EOF. Stopping."; + auto codec_id = format_context_->streams[vid_stream_index]->codec->codec_id; + if(codec_id == CODEC_ID_VP6A || codec_id == CODEC_ID_VP6F || codec_id == CODEC_ID_VP6) + flags |= AVSEEK_FLAG_BYTE; } - } - else if(errn < 0) + + seek_frame(start_, flags); + graph_->add_tag("seek"); + CASPAR_LOG(debug) << print() << " Looping."; + } + else { - BOOST_THROW_EXCEPTION( - file_read_error() << - msg_info(av_error_str(errn)) << - source_info(narrow(print())) << - boost::errinfo_api_function("av_read_frame") << - boost::errinfo_errno(AVUNERROR(errn))); + is_running_ = false; + CASPAR_LOG(debug) << print() << " Stopping."; } - else + } + else + { + THROW_ON_ERROR(ret, print(), "av_read_frame"); + + if(read_packet->stream_index == default_stream_index_) { - av_dup_packet(read_packet.get()); + if(nb_loops_ == 0) + ++nb_frames_; + ++frame_number_; + } - graph_->update_value("buffer-count", MAX_BUFFER_SIZE/static_cast(buffer_.size())); + THROW_ON_ERROR2(av_dup_packet(read_packet.get()), print()); - boost::unique_lock lock(mutex_); - while(buffer_size_ > MAX_BUFFER_SIZE && buffer_.size() > MAX_BUFFER_COUNT) - cond_.wait(lock); + // Make sure that the packet is correctly deallocated even if size and data is modified during decoding. + auto size = read_packet->size; + auto data = read_packet->data; + + read_packet = std::shared_ptr(read_packet.get(), [=](AVPacket*) + { + read_packet->size = size; + read_packet->data = data; + }); + + buffer_.try_push(read_packet); + buffer_size_ += read_packet->size; - buffer_.push(read_packet); - buffer_size_ += read_packet->size; + graph_->update_value("buffer-size", (static_cast(buffer_size_)+0.001)/MAX_BUFFER_SIZE); + graph_->update_value("buffer-count", (static_cast(buffer_.size()+0.001)/MAX_BUFFER_COUNT)); + } + } - graph_->update_value("buffer-size", MAX_BUFFER_SIZE/static_cast(buffer_size_)); - } - } - catch(...) - { - stop(); - CASPAR_LOG_CURRENT_EXCEPTION(); - return; - } + bool full() const + { + return is_running_ && (buffer_size_ > MAX_BUFFER_SIZE || buffer_.size() > MAX_BUFFER_COUNT) && buffer_.size() > MIN_BUFFER_COUNT; } void seek_frame(int64_t frame, int flags = 0) - { - static const AVRational base_q = {1, AV_TIME_BASE}; - - // Convert from frames into seconds. - auto seek_target = frame;//*static_cast(AV_TIME_BASE/fps_); - - int stream_index = -1;//video_stream_.index() >= 0 ? video_stream_.index() : audio_stream_.index(); - - //if(stream_index >= 0) - // seek_target = av_rescale_q(seek_target, base_q, format_context_->streams[stream_index]->time_base); - - const int errn = av_seek_frame(format_context_.get(), stream_index, seek_target, flags); - if(errn < 0) - { - BOOST_THROW_EXCEPTION( - invalid_operation() << - source_info(narrow(print())) << - msg_info(av_error_str(errn)) << - boost::errinfo_api_function("av_seek_frame") << - boost::errinfo_errno(AVUNERROR(errn))); - } - + { + THROW_ON_ERROR2(av_seek_frame(format_context_.get(), default_stream_index_, frame, flags), print()); buffer_.push(nullptr); } - bool is_eof(int errn) + bool is_eof(int ret) { - //if(errn == AVERROR(EIO)) - // CASPAR_LOG(warning) << print() << " Received EIO, assuming EOF"; + if(ret == AVERROR(EIO)) + CASPAR_LOG(trace) << print() << " Received EIO, assuming EOF. " << nb_frames_; + if(ret == AVERROR_EOF) + CASPAR_LOG(debug) << print() << " Received EOF. " << nb_frames_; - return errn == AVERROR_EOF || errn == AVERROR(EIO); // av_read_frame doesn't always correctly return AVERROR_EOF; + return ret == AVERROR_EOF || ret == AVERROR(EIO) || frame_number_ >= length_; // av_read_frame doesn't always correctly return AVERROR_EOF; } std::wstring print() const @@ -259,9 +283,11 @@ private: } }; -input::input(const safe_ptr& graph, const std::wstring& filename, bool loop, int start, int length) - : impl_(new implementation(graph, filename, loop, start)){} -bool input::eof() const {return !impl_->executor_.is_running();} +input::input(const safe_ptr& graph, const std::wstring& filename, bool loop, size_t start, size_t length) + : impl_(new implementation(graph, filename, loop, start, length)){} +bool input::eof() const {return !impl_->is_running_;} bool input::try_pop(std::shared_ptr& packet){return impl_->try_pop(packet);} -std::shared_ptr input::context(){return impl_->format_context_;} -} \ No newline at end of file +safe_ptr input::context(){return make_safe(impl_->format_context_);} +size_t input::nb_frames() const {return impl_->nb_frames();} +size_t input::nb_loops() const {return impl_->nb_loops();} +}} \ No newline at end of file