X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=modules%2Fffmpeg%2Fproducer%2Finput%2Finput.cpp;h=1e081a17a09214b589ecd3d2cd4c8dd6d91273bb;hb=726897adbf881d3b75f171fff24f2b917ba5f05a;hp=c51cf152d406c503b0807e55e139664eba10a045;hpb=cd1a44a41dd64c05de067ba728c285f001b66bf3;p=casparcg diff --git a/modules/ffmpeg/producer/input/input.cpp b/modules/ffmpeg/producer/input/input.cpp index c51cf152d..1e081a17a 100644 --- a/modules/ffmpeg/producer/input/input.cpp +++ b/modules/ffmpeg/producer/input/input.cpp @@ -1,5 +1,5 @@ /* -* Copyright (c) 2011 Sveriges Television AB +* Copyright 2013 Sveriges Television AB http://casparcg.com/ * * This file is part of CasparCG (www.casparcg.com). * @@ -19,27 +19,28 @@ * Author: Robert Nagy, ronag89@gmail.com */ -#include "../../StdAfx.h" +#include "../../stdafx.h" #include "input.h" #include "../util/util.h" +#include "../util/flv.h" #include "../../ffmpeg_error.h" #include "../../ffmpeg.h" +#include + #include #include -#include -//#include +#include #include -#include - -#include #include #include #include +#include +#include #include #include #include @@ -48,7 +49,7 @@ #pragma warning (push) #pragma warning (disable : 4244) #endif -extern "C" +extern "C" { #define __STDC_CONSTANT_MACROS #define __STDC_LIMIT_MACROS @@ -58,212 +59,284 @@ extern "C" #pragma warning (pop) #endif +static const size_t MAX_BUFFER_COUNT = 100; +static const size_t MAX_BUFFER_COUNT_RT = 3; +static const size_t MIN_BUFFER_COUNT = 50; +static const size_t MAX_BUFFER_SIZE = 64 * 1000000; + namespace caspar { namespace ffmpeg { +struct input::implementation : boost::noncopyable +{ + const spl::shared_ptr graph_; -static const int MAX_PUSH_WITHOUT_POP = 200; -static const int MIN_FRAMES = 25; + const spl::shared_ptr format_context_; // Destroy this last + const int default_stream_index_ = av_find_default_stream_index(format_context_.get()); -class stream -{ - stream(const stream&); - stream& operator=(const stream&); + const std::wstring filename_; + tbb::atomic start_; + tbb::atomic length_; + const bool thumbnail_mode_; + tbb::atomic loop_; + uint32_t frame_number_ = 0; + boost::rational framerate_ = read_framerate(*format_context_, 1); - typedef tbb::concurrent_bounded_queue>::size_type size_type; + tbb::concurrent_bounded_queue> buffer_; + tbb::atomic buffer_size_; - int index_; - tbb::concurrent_bounded_queue> packets_; - tbb::atomic push_since_pop_; -public: + executor executor_; - stream(int index) - : index_(index) + explicit implementation(const spl::shared_ptr graph, const std::wstring& filename, FFMPEG_Resource resource_type, bool loop, uint32_t start, uint32_t length, bool thumbnail_mode, const ffmpeg_options& vid_params) + : graph_(graph) + , format_context_(open_input(filename, resource_type, vid_params)) + , filename_(filename) + , thumbnail_mode_(thumbnail_mode) + , executor_(print()) { - push_since_pop_ = 0; - } + if (thumbnail_mode_) + executor_.invoke([] + { + enable_quiet_logging_for_thread(); + }); - stream(stream&&) = default; + start_ = start; + length_ = length; + loop_ = loop; + buffer_size_ = 0; - bool is_available() const - { - return index_ >= 0; - } + if(start_ > 0) + queued_seek(start_); - int index() const - { - return index_; + graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f)); + graph_->set_color("buffer-count", diagnostics::color(0.7f, 0.4f, 0.4f)); + graph_->set_color("buffer-size", diagnostics::color(1.0f, 1.0f, 0.0f)); + + tick(); } - - void push(const std::shared_ptr& packet) + + bool try_pop(std::shared_ptr& packet) { - if(packet && packet->data && packet->stream_index != index_) - return; + auto result = buffer_.try_pop(packet); - if (++push_since_pop_ > MAX_PUSH_WITHOUT_POP) // Out of memory protection for streams never being used. + if(result) { - return; + if(packet) + buffer_size_ -= packet->size; + tick(); } - packets_.push(packet); - } + graph_->set_value("buffer-size", (static_cast(buffer_size_)+0.001)/MAX_BUFFER_SIZE); + graph_->set_value("buffer-count", (static_cast(buffer_.size()+0.001)/MAX_BUFFER_COUNT)); - bool try_pop(std::shared_ptr& packet) - { - push_since_pop_ = 0; - - return packets_.try_pop(packet); + return result; } - void clear() + std::ptrdiff_t get_max_buffer_count() const { - std::shared_ptr packet; - push_since_pop_ = 0; - while(packets_.try_pop(packet)); + return thumbnail_mode_ ? 1 : MAX_BUFFER_COUNT; } - - size_type size() const + + std::ptrdiff_t get_min_buffer_count() const { - return is_available() ? packets_.size() : std::numeric_limits::max(); + return thumbnail_mode_ ? 0 : MIN_BUFFER_COUNT; } -}; - -struct input::impl : boost::noncopyable -{ - const spl::shared_ptr graph_; - - const std::wstring filename_; - const spl::shared_ptr format_context_ = open_input(filename_); // Destroy this last - const int default_stream_index_ = av_find_default_stream_index(format_context_.get()); - - tbb::atomic start_; - tbb::atomic length_; - tbb::atomic loop_; - tbb::atomic eof_; - double fps_ = read_fps(*format_context_, 0.0); - uint32_t frame_number_ = 0; - - stream video_stream_ { av_find_best_stream(format_context_.get(), AVMEDIA_TYPE_VIDEO, -1, -1, 0, 0) }; - std::vector audio_streams_; - - boost::optional seek_target_; - - tbb::atomic is_running_; - boost::mutex mutex_; - boost::condition_variable cond_; - boost::thread thread_; - - impl( - const spl::shared_ptr graph, - const std::wstring& filename, - const bool loop, - const uint32_t start, - const uint32_t length, - bool thumbnail_mode) - : graph_(graph) - , filename_(filename) - { - start_ = start; - length_ = length; - loop_ = loop; - eof_ = false; - is_running_ = true; - if(start_ != 0) - seek_target_ = start_; - - graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f)); + std::future seek(uint32_t target) + { + if (!executor_.is_running()) + return make_ready_future(false); - if (!thumbnail_mode) - for (unsigned i = 0; i < format_context_->nb_streams; ++i) - if (format_context_->streams[i]->codec->codec_type == AVMediaType::AVMEDIA_TYPE_AUDIO) - audio_streams_.emplace_back(i); + return executor_.begin_invoke([=]() -> bool + { + std::shared_ptr packet; + while(buffer_.try_pop(packet) && packet) + buffer_size_ -= packet->size; - for (int i = 0; i < audio_streams_.size(); ++i) - graph_->set_color("audio-buffer" + boost::lexical_cast(i + 1), diagnostics::color(0.7f, 0.4f, 0.4f)); + queued_seek(target); - if (video_stream_.is_available()) - graph_->set_color("video-buffer", diagnostics::color(1.0f, 1.0f, 0.0f)); - - for(int n = 0; n < 8; ++n) tick(); - thread_ = boost::thread([this, thumbnail_mode]{run(thumbnail_mode);}); + return true; + }, task_priority::high_priority); } - ~impl() + std::wstring print() const { - is_running_ = false; - cond_.notify_one(); - thread_.join(); + return L"ffmpeg_input[" + filename_ + L")]"; } - - bool try_pop_video(std::shared_ptr& packet) - { - if (!video_stream_.is_available()) - return false; - - bool result = video_stream_.try_pop(packet); - if(result) - cond_.notify_one(); - - graph_->set_value("video-buffer", std::min(1.0, static_cast(video_stream_.size())/MIN_FRAMES)); - - return result; + bool full() const + { + return (buffer_size_ > MAX_BUFFER_SIZE || buffer_.size() > get_max_buffer_count()) && buffer_.size() > get_min_buffer_count(); } - - bool try_pop_audio(std::shared_ptr& packet, int audio_stream_index) + + void tick() { - if (audio_streams_.size() < audio_stream_index + 1) - return false; + if(!executor_.is_running()) + return; - auto& audio_stream = audio_streams_.at(audio_stream_index); - bool result = audio_stream.try_pop(packet); - if(result) - cond_.notify_one(); + executor_.begin_invoke([this] + { + if(full()) + return; - auto buffer_nr = boost::lexical_cast(audio_stream_index + 1); - graph_->set_value("audio-buffer" + buffer_nr, std::min(1.0, static_cast(audio_stream.size())/MIN_FRAMES)); + try + { + auto packet = create_packet(); - return result; - } + auto ret = av_read_frame(format_context_.get(), packet.get()); // packet is only valid until next call of av_read_frame. Use av_dup_packet to extend its life. - void seek(uint32_t target) - { - { - boost::lock_guard lock(mutex_); + if(is_eof(ret)) + { + frame_number_ = 0; + + if(loop_) + { + queued_seek(start_); + graph_->set_tag(diagnostics::tag_severity::INFO, "seek"); + CASPAR_LOG(trace) << print() << " Looping."; + } + else + executor_.stop(); + } + else + { + THROW_ON_ERROR(ret, "av_read_frame", print()); - seek_target_ = target; - video_stream_.clear(); + if(packet->stream_index == default_stream_index_) + ++frame_number_; - for (auto& audio_stream : audio_streams_) - audio_stream.clear(); - } + THROW_ON_ERROR2(av_dup_packet(packet.get()), print()); + + // Make sure that the packet is correctly deallocated even if size and data is modified during decoding. + auto size = packet->size; + auto data = packet->data; - cond_.notify_one(); + packet = spl::shared_ptr(packet.get(), [packet, size, data](AVPacket*) + { + packet->size = size; + packet->data = data; + }); + + buffer_.try_push(packet); + buffer_size_ += packet->size; + + graph_->set_value("buffer-size", (static_cast(buffer_size_)+0.001)/MAX_BUFFER_SIZE); + graph_->set_value("buffer-count", (static_cast(buffer_.size()+0.001)/MAX_BUFFER_COUNT)); + } + + tick(); + } + catch(...) + { + if (!thumbnail_mode_) + CASPAR_LOG_CURRENT_EXCEPTION(); + executor_.stop(); + } + }); } - int get_actual_audio_stream_index(int audio_stream_index) const + spl::shared_ptr open_input(const std::wstring resource_name, FFMPEG_Resource resource_type, const ffmpeg_options& vid_params) { - if (audio_stream_index + 1 > audio_streams_.size()) - CASPAR_THROW_EXCEPTION(averror_stream_not_found()); + AVFormatContext* weak_context = nullptr; - return audio_streams_.at(audio_stream_index).index(); + switch (resource_type) { + case FFMPEG_Resource::FFMPEG_FILE: + THROW_ON_ERROR2(avformat_open_input(&weak_context, u8(resource_name).c_str(), nullptr, nullptr), resource_name); + break; + case FFMPEG_Resource::FFMPEG_DEVICE: + { + AVDictionary* format_options = NULL; + for (auto& option : vid_params) + { + av_dict_set(&format_options, option.first.c_str(), option.second.c_str(), 0); + } + AVInputFormat* input_format = av_find_input_format("dshow"); + THROW_ON_ERROR2(avformat_open_input(&weak_context, u8(resource_name).c_str(), input_format, &format_options), resource_name); + if (format_options != nullptr) + { + std::string unsupported_tokens = ""; + AVDictionaryEntry *t = NULL; + while ((t = av_dict_get(format_options, "", t, AV_DICT_IGNORE_SUFFIX)) != nullptr) + { + if (!unsupported_tokens.empty()) + unsupported_tokens += ", "; + unsupported_tokens += t->key; + } + avformat_close_input(&weak_context); + BOOST_THROW_EXCEPTION(ffmpeg_error() << msg_info(unsupported_tokens)); + } + av_dict_free(&format_options); + } + break; + case FFMPEG_Resource::FFMPEG_STREAM: + { + AVDictionary* format_options = NULL; + for (auto& option : vid_params) + { + av_dict_set(&format_options, option.first.c_str(), option.second.c_str(), 0); + } + THROW_ON_ERROR2(avformat_open_input(&weak_context, u8(resource_name).c_str(), nullptr, &format_options), resource_name); + if (format_options != nullptr) + { + std::string unsupported_tokens = ""; + AVDictionaryEntry *t = NULL; + while ((t = av_dict_get(format_options, "", t, AV_DICT_IGNORE_SUFFIX)) != nullptr) + { + if (!unsupported_tokens.empty()) + unsupported_tokens += ", "; + unsupported_tokens += t->key; + } + avformat_close_input(&weak_context); + BOOST_THROW_EXCEPTION(ffmpeg_error() << msg_info(unsupported_tokens)); + } + av_dict_free(&format_options); + } + break; + }; + spl::shared_ptr context(weak_context, [](AVFormatContext* p) + { + avformat_close_input(&p); + }); + THROW_ON_ERROR2(avformat_find_stream_info(weak_context, nullptr), resource_name); + fix_meta_data(*context); + return context; } - - std::wstring print() const + + void fix_meta_data(AVFormatContext& context) { - return L"ffmpeg_input[" + filename_ + L")]"; + auto video_index = av_find_best_stream(&context, AVMEDIA_TYPE_VIDEO, -1, -1, 0, 0); + + if (video_index > -1) + { + auto video_stream = context.streams[video_index]; + auto video_context = context.streams[video_index]->codec; + + if (boost::filesystem::path(context.filename).extension().string() == ".flv") + { + try + { + auto meta = read_flv_meta_info(context.filename); + double fps = boost::lexical_cast(meta["framerate"]); + video_stream->nb_frames = static_cast(boost::lexical_cast(meta["duration"])*fps); + } + catch (...) {} + } + else + { + auto stream_time = video_stream->time_base; + auto duration = video_stream->duration; + auto codec_time = video_context->time_base; + auto ticks = video_context->ticks_per_frame; + + if (video_stream->nb_frames == 0) + video_stream->nb_frames = (duration*stream_time.num*codec_time.den) / (stream_time.den*codec_time.num*ticks); + } + } } -private: - void internal_seek(uint32_t target) + void queued_seek(const uint32_t target) { - eof_ = false; - graph_->set_tag(diagnostics::tag_severity::INFO, "seek"); - - if (is_logging_quiet_for_thread()) - CASPAR_LOG(trace) << print() << " Seeking: " << target; - else + if (!thumbnail_mode_) CASPAR_LOG(debug) << print() << " Seeking: " << target; int flags = AVSEEK_FLAG_FRAME; @@ -278,152 +351,61 @@ private: flags = AVSEEK_FLAG_BYTE; } } - - auto stream = format_context_->streams[default_stream_index_]; - auto fps = read_fps(*format_context_, 0.0); - auto target_timestamp = static_cast((target / fps * stream->time_base.den) / stream->time_base.num); - + + auto stream = format_context_->streams[default_stream_index_]; + + + auto fps = read_fps(*format_context_, 0.0); + THROW_ON_ERROR2(avformat_seek_file( - format_context_.get(), - default_stream_index_, - std::numeric_limits::min(), - target_timestamp, - std::numeric_limits::max(), - 0), print()); + format_context_.get(), + default_stream_index_, + std::numeric_limits::min(), + static_cast((target / fps * stream->time_base.den) / stream->time_base.num), + std::numeric_limits::max(), + 0), print()); auto flush_packet = create_packet(); flush_packet->data = nullptr; flush_packet->size = 0; flush_packet->pos = target; - - video_stream_.push(flush_packet); - for (auto& audio_stream : audio_streams_) - audio_stream.push(flush_packet); + buffer_.push(flush_packet); } - void tick() - { - if(seek_target_) - { - internal_seek(*seek_target_); - seek_target_.reset(); - } - - auto packet = create_packet(); - - auto ret = av_read_frame(format_context_.get(), packet.get()); // packet is only valid until next call of av_read_frame. Use av_dup_packet to extend its life. - - if(is_eof(ret)) - { - if (loop_) - internal_seek(start_); - else - { - eof_ = true; - } - } - else - { - THROW_ON_ERROR(ret, "av_read_frame", print()); - - THROW_ON_ERROR2(av_dup_packet(packet.get()), print()); - - // Make sure that the packet is correctly deallocated even if size and data is modified during decoding. - const auto size = packet->size; - const auto data = packet->data; - - packet = spl::shared_ptr(packet.get(), [packet, size, data](AVPacket*) - { - packet->size = size; - packet->data = data; - }); - - const auto stream_time_base = format_context_->streams[packet->stream_index]->time_base; - const auto packet_frame_number = static_cast((static_cast(packet->pts * stream_time_base.num)/stream_time_base.den)*fps_); - - if(packet->stream_index == default_stream_index_) - frame_number_ = packet_frame_number; - - if(packet_frame_number >= start_ && packet_frame_number < length_) - { - video_stream_.push(packet); - - for (auto& audio_stream : audio_streams_) - audio_stream.push(packet); - } - } - - if (video_stream_.is_available()) - graph_->set_value("video-buffer", std::min(1.0, static_cast(video_stream_.size())/MIN_FRAMES)); - - for (int i = 0; i < audio_streams_.size(); ++i) - graph_->set_value( - "audio-buffer" + boost::lexical_cast(i + 1), - std::min(1.0, static_cast(audio_streams_[i].size())/MIN_FRAMES)); - } - - bool full() const + bool is_eof(int ret) { - bool video_full = video_stream_.size() >= MIN_FRAMES; + if(ret == AVERROR(EIO)) + CASPAR_LOG(trace) << print() << " Received EIO, assuming EOF. "; + if(ret == AVERROR_EOF) + CASPAR_LOG(trace) << print() << " Received EOF. "; - if (!video_full) - return false; - - for (auto& audio_stream : audio_streams_) - if (audio_stream.size() < MIN_FRAMES) - return false; - - return true; + return ret == AVERROR_EOF || ret == AVERROR(EIO) || frame_number_ >= length_; // av_read_frame doesn't always correctly return AVERROR_EOF; } - void run(bool thumbnail_mode) + int num_audio_streams() const { - ensure_gpf_handler_installed_for_thread(u8(print()).c_str()); - auto quiet_logging = temporary_enable_quiet_logging_for_thread(thumbnail_mode); - - while(is_running_) - { - try - { - - { - boost::unique_lock lock(mutex_); - - while((eof_ || full()) && !seek_target_ && is_running_) - cond_.wait(lock); - - tick(); - } - } - catch(...) - { - CASPAR_LOG_CURRENT_EXCEPTION(); - is_running_ = false; - } - } + return 0; // TODO } - - bool is_eof(int ret) + + boost::rational framerate() const { - #pragma warning (disable : 4146) - return ret == AVERROR_EOF || ret == AVERROR(EIO) || frame_number_ >= length_; // av_read_frame doesn't always correctly return AVERROR_EOF; + return framerate_; } }; -input::input(const spl::shared_ptr& graph, const std::wstring& filename, bool loop, uint32_t start, uint32_t length, bool thumbnail_mode) - : impl_(new impl(graph, filename, loop, start, length, thumbnail_mode)){} -int input::get_actual_audio_stream_index(int audio_stream_index) const { return impl_->get_actual_audio_stream_index(audio_stream_index); }; -int input::num_audio_streams() const { return static_cast(impl_->audio_streams_.size()); } -bool input::try_pop_video(std::shared_ptr& packet){return impl_->try_pop_video(packet);} -bool input::try_pop_audio(std::shared_ptr& packet, int audio_stream_index){return impl_->try_pop_audio(packet, audio_stream_index);} -AVFormatContext& input::context(){return *impl_->format_context_;} -void input::loop(bool value){impl_->loop_ = value;} -bool input::loop() const{return impl_->loop_;} -void input::seek(uint32_t target){impl_->seek(target);} +input::input(const spl::shared_ptr& graph, const std::wstring& filename, FFMPEG_Resource resource_type, bool loop, uint32_t start, uint32_t length, bool thumbnail_mode, const ffmpeg_options& vid_params) + : impl_(new implementation(graph, filename, resource_type, loop, start, length, thumbnail_mode, vid_params)){} +bool input::eof() const {return !impl_->executor_.is_running();} +bool input::try_pop(std::shared_ptr& packet){return impl_->try_pop(packet);} +spl::shared_ptr input::context(){return impl_->format_context_;} void input::start(uint32_t value){impl_->start_ = value;} uint32_t input::start() const{return impl_->start_;} void input::length(uint32_t value){impl_->length_ = value;} uint32_t input::length() const{return impl_->length_;} -bool input::eof() const { return impl_->eof_; } +void input::loop(bool value){impl_->loop_ = value;} +bool input::loop() const{return impl_->loop_;} +int input::num_audio_streams() const { return impl_->num_audio_streams(); } +boost::rational input::framerate() const { return impl_->framerate(); } +std::future input::seek(uint32_t target){return impl_->seek(target);} }}