X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=modules%2Fffmpeg%2Fproducer%2Finput%2Finput.cpp;h=72648037d8d9704374ebaacf0e3ee89d3dc3394e;hb=c0dc760a3d87b346c9f267cd9d74c67c55d3bdc3;hp=823e10ac74685d570a85f63bc7858128366e3b98;hpb=0a0753380eefc7aa64561308ad02564735262ae9;p=casparcg diff --git a/modules/ffmpeg/producer/input/input.cpp b/modules/ffmpeg/producer/input/input.cpp index 823e10ac7..72648037d 100644 --- a/modules/ffmpeg/producer/input/input.cpp +++ b/modules/ffmpeg/producer/input/input.cpp @@ -1,5 +1,5 @@ /* -* Copyright (c) 2011 Sveriges Television AB +* Copyright 2013 Sveriges Television AB http://casparcg.com/ * * This file is part of CasparCG (www.casparcg.com). * @@ -24,21 +24,25 @@ #include "input.h" #include "../util/util.h" +#include "../util/flv.h" #include "../../ffmpeg_error.h" +#include "../../ffmpeg.h" + +#include #include #include -#include -//#include +#include #include -#include - -#include +#include +#include #include #include #include +#include +#include #include #include #include @@ -47,7 +51,7 @@ #pragma warning (push) #pragma warning (disable : 4244) #endif -extern "C" +extern "C" { #define __STDC_CONSTANT_MACROS #define __STDC_LIMIT_MACROS @@ -57,276 +61,339 @@ extern "C" #pragma warning (pop) #endif -namespace caspar { namespace ffmpeg { +static const size_t MAX_BUFFER_COUNT = 100; +static const size_t MAX_BUFFER_COUNT_RT = 3; +static const size_t MIN_BUFFER_COUNT = 50; +static const size_t MAX_BUFFER_SIZE = 64 * 1000000; -static const int MIN_FRAMES = 25; - -class stream +namespace caspar { namespace ffmpeg { +struct input::implementation : boost::noncopyable { - stream(const stream&); - stream& operator=(const stream&); + const spl::shared_ptr graph_; - typedef tbb::concurrent_bounded_queue>::size_type size_type; + const spl::shared_ptr format_context_; // Destroy this last + const int default_stream_index_ = av_find_default_stream_index(format_context_.get()); - int index_; - tbb::concurrent_bounded_queue> packets_; -public: + const std::wstring filename_; + tbb::atomic start_; + tbb::atomic length_; + const bool thumbnail_mode_; + tbb::atomic loop_; + uint32_t frame_number_ = 0; + boost::rational framerate_ = read_framerate(*format_context_, 1); - stream(int index) - : index_(index) - { - } - - void push(const std::shared_ptr& packet) - { - if(packet && packet->data && packet->stream_index != index_) - return; + tbb::concurrent_bounded_queue> buffer_; + tbb::atomic buffer_size_; - packets_.push(packet); - } + executor executor_; - bool try_pop(std::shared_ptr& packet) + explicit implementation(const spl::shared_ptr graph, const std::wstring& url_or_file, bool loop, uint32_t start, uint32_t length, bool thumbnail_mode, const ffmpeg_options& vid_params) + : graph_(graph) + , format_context_(open_input(url_or_file, vid_params)) + , filename_(url_or_file) + , thumbnail_mode_(thumbnail_mode) + , executor_(print()) { - return packets_.try_pop(packet); - } + if (thumbnail_mode_) + executor_.invoke([] + { + enable_quiet_logging_for_thread(); + }); - void clear() - { - std::shared_ptr packet; - while(packets_.try_pop(packet)); - } - - size_type size() const - { - return index_ != -1 ? packets_.size() : std::numeric_limits::max(); - } -}; - -struct input::impl : boost::noncopyable -{ - const spl::shared_ptr graph_; - - const std::wstring filename_; - const spl::shared_ptr format_context_ = open_input(filename_); // Destroy this last - const int default_stream_index_ = av_find_default_stream_index(format_context_.get()); - - tbb::atomic start_; - tbb::atomic length_; - tbb::atomic loop_; - double fps_ = read_fps(*format_context_, 0.0); - uint32_t frame_number_ = 0; - - stream video_stream_ { av_find_best_stream(format_context_.get(), AVMEDIA_TYPE_VIDEO, -1, -1, 0, 0) }; - stream audio_stream_ { av_find_best_stream(format_context_.get(), AVMEDIA_TYPE_AUDIO, -1, -1, 0, 0) }; - - boost::optional seek_target_; - - tbb::atomic is_running_; - boost::mutex mutex_; - boost::condition_variable cond_; - boost::thread thread_; - - impl(const spl::shared_ptr graph, const std::wstring& filename, const bool loop, const uint32_t start, const uint32_t length) - : graph_(graph) - , filename_(filename) - { start_ = start; length_ = length; loop_ = loop; - is_running_ = true; - - if(start_ != 0) - seek_target_ = start_; - - graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f)); - graph_->set_color("audio-buffer", diagnostics::color(0.7f, 0.4f, 0.4f)); - graph_->set_color("video-buffer", diagnostics::color(1.0f, 1.0f, 0.0f)); - - for(int n = 0; n < 8; ++n) - tick(); + buffer_size_ = 0; + + if(start_ > 0) + queued_seek(start_); - thread_ = boost::thread([this]{run();}); + graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f)); + graph_->set_color("buffer-count", diagnostics::color(0.7f, 0.4f, 0.4f)); + graph_->set_color("buffer-size", diagnostics::color(1.0f, 1.0f, 0.0f)); + + tick(); } - ~impl() + bool try_pop(std::shared_ptr& packet) { - is_running_ = false; - cond_.notify_one(); - thread_.join(); - } - - bool try_pop_video(std::shared_ptr& packet) - { - bool result = video_stream_.try_pop(packet); + auto result = buffer_.try_pop(packet); + if(result) - cond_.notify_one(); - - graph_->set_value("video-buffer", std::min(1.0, static_cast(video_stream_.size()/MIN_FRAMES))); - + { + if(packet) + buffer_size_ -= packet->size; + tick(); + } + + graph_->set_value("buffer-size", (static_cast(buffer_size_)+0.001)/MAX_BUFFER_SIZE); + graph_->set_value("buffer-count", (static_cast(buffer_.size()+0.001)/MAX_BUFFER_COUNT)); + return result; } - - bool try_pop_audio(std::shared_ptr& packet) - { - bool result = audio_stream_.try_pop(packet); - if(result) - cond_.notify_one(); - - graph_->set_value("audio-buffer", std::min(1.0, static_cast(audio_stream_.size()/MIN_FRAMES))); - return result; + std::ptrdiff_t get_max_buffer_count() const + { + return thumbnail_mode_ ? 1 : MAX_BUFFER_COUNT; + } + + std::ptrdiff_t get_min_buffer_count() const + { + return thumbnail_mode_ ? 0 : MIN_BUFFER_COUNT; } - void seek(uint32_t target) + std::future seek(uint32_t target) { + if (!executor_.is_running()) + return make_ready_future(false); + + return executor_.begin_invoke([=]() -> bool { - boost::lock_guard lock(mutex_); + std::shared_ptr packet; + while(buffer_.try_pop(packet) && packet) + buffer_size_ -= packet->size; - seek_target_ = target; - video_stream_.clear(); - audio_stream_.clear(); - } - - cond_.notify_one(); + queued_seek(target); + + tick(); + + return true; + }, task_priority::high_priority); } - + std::wstring print() const { return L"ffmpeg_input[" + filename_ + L")]"; } -private: - void internal_seek(uint32_t target) + bool full() const { - graph_->set_tag("seek"); + return (buffer_size_ > MAX_BUFFER_SIZE || buffer_.size() > get_max_buffer_count()) && buffer_.size() > get_min_buffer_count(); + } - CASPAR_LOG(debug) << print() << " Seeking: " << target; + void tick() + { + if(!executor_.is_running()) + return; - int flags = AVSEEK_FLAG_FRAME; - if(target == 0) + executor_.begin_invoke([this] { - // Fix VP6 seeking - int vid_stream_index = av_find_best_stream(format_context_.get(), AVMEDIA_TYPE_VIDEO, -1, -1, 0, 0); - if(vid_stream_index >= 0) + if(full()) + return; + + try { - auto codec_id = format_context_->streams[vid_stream_index]->codec->codec_id; - if(codec_id == CODEC_ID_VP6A || codec_id == CODEC_ID_VP6F || codec_id == CODEC_ID_VP6) - flags = AVSEEK_FLAG_BYTE; + auto packet = create_packet(); + + auto ret = av_read_frame(format_context_.get(), packet.get()); // packet is only valid until next call of av_read_frame. Use av_dup_packet to extend its life. + + if(is_eof(ret)) + { + frame_number_ = 0; + + if(loop_) + { + queued_seek(start_); + graph_->set_tag(diagnostics::tag_severity::INFO, "seek"); + CASPAR_LOG(trace) << print() << " Looping."; + } + else + executor_.stop(); + } + else + { + THROW_ON_ERROR(ret, "av_read_frame", print()); + + if(packet->stream_index == default_stream_index_) + ++frame_number_; + + THROW_ON_ERROR2(av_dup_packet(packet.get()), print()); + + // Make sure that the packet is correctly deallocated even if size and data is modified during decoding. + auto size = packet->size; + auto data = packet->data; + + packet = spl::shared_ptr(packet.get(), [packet, size, data](AVPacket*) + { + packet->size = size; + packet->data = data; + }); + + buffer_.try_push(packet); + buffer_size_ += packet->size; + + graph_->set_value("buffer-size", (static_cast(buffer_size_)+0.001)/MAX_BUFFER_SIZE); + graph_->set_value("buffer-count", (static_cast(buffer_.size()+0.001)/MAX_BUFFER_COUNT)); + } + + tick(); } - } - - auto stream = format_context_->streams[default_stream_index_]; - auto codec = stream->codec; - auto fixed_target = (target*stream->time_base.den*codec->time_base.num)/(stream->time_base.num*codec->time_base.den)*codec->ticks_per_frame; - - THROW_ON_ERROR2(avformat_seek_file(format_context_.get(), default_stream_index_, std::numeric_limits::min(), fixed_target, fixed_target, 0), print()); - - video_stream_.push(nullptr); - audio_stream_.push(nullptr); + catch(...) + { + if (!thumbnail_mode_) + CASPAR_LOG_CURRENT_EXCEPTION(); + executor_.stop(); + } + }); } - void tick() + spl::shared_ptr open_input(const std::wstring& url_or_file, const ffmpeg_options& vid_params) { - if(seek_target_) - { - internal_seek(*seek_target_); - seek_target_.reset(); - } + AVDictionary* format_options = nullptr; - auto packet = create_packet(); - - auto ret = av_read_frame(format_context_.get(), packet.get()); // packet is only valid until next call of av_read_frame. Use av_dup_packet to extend its life. - - if(is_eof(ret)) + CASPAR_SCOPE_EXIT { - video_stream_.push(packet); - audio_stream_.push(packet); + if (format_options) + av_dict_free(&format_options); + }; - if(loop_) - internal_seek(start_); + for (auto& option : vid_params) + av_dict_set(&format_options, option.first.c_str(), option.second.c_str(), 0); + + auto resource_name = std::wstring(); + auto parts = caspar::protocol_split(url_or_file); + AVInputFormat* input_format = nullptr; + + if (parts.at(0).empty()) + resource_name = parts.at(1); + else if (parts.at(0) == L"dshow") + { + input_format = av_find_input_format("dshow"); + resource_name = parts.at(1); } else - { - THROW_ON_ERROR(ret, "av_read_frame", print()); - - THROW_ON_ERROR2(av_dup_packet(packet.get()), print()); - - // Make sure that the packet is correctly deallocated even if size and data is modified during decoding. - const auto size = packet->size; - const auto data = packet->data; - - packet = spl::shared_ptr(packet.get(), [packet, size, data](AVPacket*) - { - packet->size = size; - packet->data = data; - }); - - const auto stream_time_base = format_context_->streams[packet->stream_index]->time_base; - const auto packet_frame_number = static_cast((static_cast(packet->pts * stream_time_base.num)/stream_time_base.den)*fps_); - - if(packet->stream_index == default_stream_index_) - frame_number_ = packet_frame_number; - - if(packet_frame_number >= start_ && packet_frame_number < length_) + resource_name = parts.at(0) + L"://" + parts.at(1); + + AVFormatContext* weak_context = nullptr; + THROW_ON_ERROR2(avformat_open_input(&weak_context, u8(resource_name).c_str(), input_format, &format_options), resource_name); + + spl::shared_ptr context(weak_context, [](AVFormatContext* ptr) + { + avformat_close_input(&ptr); + }); + + if (format_options) + { + std::string unsupported_tokens = ""; + AVDictionaryEntry *t = NULL; + while ((t = av_dict_get(format_options, "", t, AV_DICT_IGNORE_SUFFIX)) != nullptr) { - video_stream_.push(packet); - audio_stream_.push(packet); + if (!unsupported_tokens.empty()) + unsupported_tokens += ", "; + unsupported_tokens += t->key; } - } - - graph_->set_value("video-buffer", std::min(1.0, static_cast(video_stream_.size()/MIN_FRAMES))); - graph_->set_value("audio-buffer", std::min(1.0, static_cast(audio_stream_.size()/MIN_FRAMES))); - } - - bool full() const - { - return video_stream_.size() > MIN_FRAMES && audio_stream_.size() > MIN_FRAMES; + CASPAR_THROW_EXCEPTION(user_error() << msg_info(unsupported_tokens)); + } + + THROW_ON_ERROR2(avformat_find_stream_info(context.get(), nullptr), resource_name); + fix_meta_data(*context); + return context; } - void run() + void fix_meta_data(AVFormatContext& context) { - ensure_gpf_handler_installed_for_thread(u8(print()).c_str()); + auto video_index = av_find_best_stream(&context, AVMEDIA_TYPE_VIDEO, -1, -1, 0, 0); - while(is_running_) + if (video_index > -1) { - try + auto video_stream = context.streams[video_index]; + auto video_context = context.streams[video_index]->codec; + + if (boost::filesystem::path(context.filename).extension().string() == ".flv") { - boost::this_thread::sleep_for(boost::chrono::milliseconds(1)); - + try { - boost::unique_lock lock(mutex_); - - while(full() && !seek_target_ && is_running_) - cond_.wait(lock); - - tick(); + auto meta = read_flv_meta_info(context.filename); + double fps = boost::lexical_cast(meta["framerate"]); + video_stream->nb_frames = static_cast(boost::lexical_cast(meta["duration"])*fps); } + catch (...) {} } - catch(...) + else + { + auto stream_time = video_stream->time_base; + auto duration = video_stream->duration; + auto codec_time = video_context->time_base; + auto ticks = video_context->ticks_per_frame; + + if (video_stream->nb_frames == 0) + video_stream->nb_frames = (duration*stream_time.num*codec_time.den) / (stream_time.den*codec_time.num*ticks); + } + } + } + + void queued_seek(const uint32_t target) + { + if (!thumbnail_mode_) + CASPAR_LOG(debug) << print() << " Seeking: " << target; + + int flags = AVSEEK_FLAG_FRAME; + if(target == 0) + { + // Fix VP6 seeking + int vid_stream_index = av_find_best_stream(format_context_.get(), AVMEDIA_TYPE_VIDEO, -1, -1, 0, 0); + if(vid_stream_index >= 0) { - CASPAR_LOG_CURRENT_EXCEPTION(); - is_running_ = false; + auto codec_id = format_context_->streams[vid_stream_index]->codec->codec_id; + if(codec_id == CODEC_ID_VP6A || codec_id == CODEC_ID_VP6F || codec_id == CODEC_ID_VP6) + flags = AVSEEK_FLAG_BYTE; } } + + auto stream = format_context_->streams[default_stream_index_]; + + + auto fps = read_fps(*format_context_, 0.0); + + THROW_ON_ERROR2(avformat_seek_file( + format_context_.get(), + default_stream_index_, + std::numeric_limits::min(), + static_cast((target / fps * stream->time_base.den) / stream->time_base.num), + std::numeric_limits::max(), + 0), print()); + + auto flush_packet = create_packet(); + flush_packet->data = nullptr; + flush_packet->size = 0; + flush_packet->pos = target; + + buffer_.push(flush_packet); } - + bool is_eof(int ret) { - #pragma warning (disable : 4146) + if(ret == AVERROR(EIO)) + CASPAR_LOG(trace) << print() << " Received EIO, assuming EOF. "; + if(ret == AVERROR_EOF) + CASPAR_LOG(trace) << print() << " Received EOF. "; + return ret == AVERROR_EOF || ret == AVERROR(EIO) || frame_number_ >= length_; // av_read_frame doesn't always correctly return AVERROR_EOF; } + + int num_audio_streams() const + { + return 0; // TODO + } + + boost::rational framerate() const + { + return framerate_; + } }; -input::input(const spl::shared_ptr& graph, const std::wstring& filename, bool loop, uint32_t start, uint32_t length) - : impl_(new impl(graph, filename, loop, start, length)){} -bool input::try_pop_video(std::shared_ptr& packet){return impl_->try_pop_video(packet);} -bool input::try_pop_audio(std::shared_ptr& packet){return impl_->try_pop_audio(packet);} -AVFormatContext& input::context(){return *impl_->format_context_;} -void input::loop(bool value){impl_->loop_ = value;} -bool input::loop() const{return impl_->loop_;} -void input::seek(uint32_t target){impl_->seek(target);} +input::input(const spl::shared_ptr& graph, const std::wstring& url_or_file, bool loop, uint32_t start, uint32_t length, bool thumbnail_mode, const ffmpeg_options& vid_params) + : impl_(new implementation(graph, url_or_file, loop, start, length, thumbnail_mode, vid_params)){} +bool input::eof() const {return !impl_->executor_.is_running();} +bool input::try_pop(std::shared_ptr& packet){return impl_->try_pop(packet);} +spl::shared_ptr input::context(){return impl_->format_context_;} void input::start(uint32_t value){impl_->start_ = value;} uint32_t input::start() const{return impl_->start_;} void input::length(uint32_t value){impl_->length_ = value;} uint32_t input::length() const{return impl_->length_;} +void input::loop(bool value){impl_->loop_ = value;} +bool input::loop() const{return impl_->loop_;} +int input::num_audio_streams() const { return impl_->num_audio_streams(); } +boost::rational input::framerate() const { return impl_->framerate(); } +std::future input::seek(uint32_t target){return impl_->seek(target);} }}