X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=modules%2Fffmpeg%2Fproducer%2Finput%2Finput.cpp;h=c51cf152d406c503b0807e55e139664eba10a045;hb=70235ae09df45c874e133fd4d3fcc1e6e34e993a;hp=093bc7ac401a82d89f4d9f5df482ebf0f2a9fce4;hpb=28e1d17d30f053c37eee8e9b045f735dd9385e83;p=casparcg diff --git a/modules/ffmpeg/producer/input/input.cpp b/modules/ffmpeg/producer/input/input.cpp index 093bc7ac4..c51cf152d 100644 --- a/modules/ffmpeg/producer/input/input.cpp +++ b/modules/ffmpeg/producer/input/input.cpp @@ -1,286 +1,429 @@ -/* -* Copyright (c) 2011 Sveriges Television AB -* -* This file is part of CasparCG (www.casparcg.com). -* -* CasparCG is free software: you can redistribute it and/or modify -* it under the terms of the GNU General Public License as published by -* the Free Software Foundation, either version 3 of the License, or -* (at your option) any later version. -* -* CasparCG is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU General Public License for more details. -* -* You should have received a copy of the GNU General Public License -* along with CasparCG. If not, see . -* -* Author: Robert Nagy, ronag89@gmail.com -*/ - -#include "../../stdafx.h" - -#include "input.h" - -#include "../util/util.h" -#include "../../ffmpeg_error.h" - -#include - -#include -#include -#include - -#include -#include -#include - -#include -#include -#include -#include - -#if defined(_MSC_VER) -#pragma warning (push) -#pragma warning (disable : 4244) -#endif -extern "C" -{ - #define __STDC_CONSTANT_MACROS - #define __STDC_LIMIT_MACROS - #include -} -#if defined(_MSC_VER) -#pragma warning (pop) -#endif - -namespace caspar { namespace ffmpeg { - -static const size_t MAX_BUFFER_COUNT = 100; -static const size_t MIN_BUFFER_COUNT = 4; -static const size_t MAX_BUFFER_SIZE = 16 * 1000000; - -struct input::implementation : boost::noncopyable -{ - safe_ptr graph_; - - const safe_ptr format_context_; // Destroy this last - const int default_stream_index_; - - const std::wstring filename_; - tbb::atomic loop_; - const size_t start_; - const size_t length_; - size_t frame_number_; - - tbb::concurrent_bounded_queue> buffer_; - tbb::atomic buffer_size_; - boost::condition_variable buffer_cond_; - boost::mutex buffer_mutex_; - - boost::thread thread_; - tbb::atomic is_running_; - tbb::atomic is_eof_; - - tbb::recursive_mutex mutex_; - - explicit implementation(const safe_ptr& graph, const std::wstring& filename, bool loop, size_t start, size_t length) - : graph_(graph) - , format_context_(open_input(filename)) - , default_stream_index_(av_find_default_stream_index(format_context_.get())) - , filename_(filename) - , start_(start) - , length_(length) - , frame_number_(0) - { - is_eof_ = false; - loop_ = loop; - buffer_size_ = 0; - - if(start_ > 0) - do_seek(start_); - - graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f)); - graph_->set_color("buffer-count", diagnostics::color(0.7f, 0.4f, 0.4f)); - graph_->set_color("buffer-size", diagnostics::color(1.0f, 1.0f, 0.0f)); - - is_running_ = true; - thread_ = boost::thread([this]{run();}); - - CASPAR_LOG(info) << print() << L" Initialized."; - } - - ~implementation() - { - is_running_ = false; - buffer_cond_.notify_all(); - thread_.join(); - } - - bool try_pop(std::shared_ptr& packet) - { - const bool result = buffer_.try_pop(packet); - - if(result) - { - if(packet) - buffer_size_ -= packet->size; - buffer_cond_.notify_all(); - } - - graph_->update_value("buffer-size", (static_cast(buffer_size_)+0.001)/MAX_BUFFER_SIZE); - graph_->update_value("buffer-count", (static_cast(buffer_.size()+0.001)/MAX_BUFFER_COUNT)); - - return result; - } - - void run() - { - caspar::win32_exception::install_handler(); - - try - { - CASPAR_LOG(info) << print() << " Thread Started."; - - while(is_running_) - { - { - boost::unique_lock lock(buffer_mutex_); - while(full()) - buffer_cond_.timed_wait(lock, boost::posix_time::millisec(20)); - } - read_next_packet(); - } - - CASPAR_LOG(info) << print() << " Thread Stopped."; - } - catch(...) - { - CASPAR_LOG_CURRENT_EXCEPTION(); - is_running_ = false; - } - } - - void read_next_packet() - { - tbb::recursive_mutex::scoped_lock lock(mutex_); - - auto packet = create_packet(); - auto ret = av_read_frame(format_context_.get(), packet.get()); // packet is only valid until next call of av_read_frame. Use av_dup_packet to extend its life. - - if(is_eof(ret)) - { - frame_number_ = 0; - is_eof_ = true; - - if(loop_) - { - do_seek(start_); - graph_->add_tag("seek"); - CASPAR_LOG(trace) << print() << " Looping."; - } - } - else - { - THROW_ON_ERROR(ret, "av_read_frame", print()); - - if(packet->stream_index == default_stream_index_) - ++frame_number_; - - THROW_ON_ERROR2(av_dup_packet(packet.get()), print()); - - // Make sure that the packet is correctly deallocated even if size and data is modified during decoding. - auto size = packet->size; - auto data = packet->data; - - packet = safe_ptr(packet.get(), [packet, size, data](AVPacket*) - { - packet->size = size; - packet->data = data; - }); - - buffer_.try_push(packet); - buffer_size_ += packet->size; - - graph_->update_value("buffer-size", (static_cast(buffer_size_)+0.001)/MAX_BUFFER_SIZE); - graph_->update_value("buffer-count", (static_cast(buffer_.size()+0.001)/MAX_BUFFER_COUNT)); - } - } - - bool full() const - { - return is_running_ && (is_eof_ || (buffer_size_ > MAX_BUFFER_SIZE || buffer_.size() > MAX_BUFFER_COUNT) && buffer_.size() > MIN_BUFFER_COUNT); - } - - void do_seek(const int64_t target) - { - CASPAR_LOG(debug) << print() << " Seeking: " << target; - - int flags = AVSEEK_FLAG_FRAME; - if(target == 0) - { - // Fix VP6 seeking - int vid_stream_index = av_find_best_stream(format_context_.get(), AVMEDIA_TYPE_VIDEO, -1, -1, 0, 0); - if(vid_stream_index >= 0) - { - auto codec_id = format_context_->streams[vid_stream_index]->codec->codec_id; - if(codec_id == CODEC_ID_VP6A || codec_id == CODEC_ID_VP6F || codec_id == CODEC_ID_VP6) - flags = AVSEEK_FLAG_BYTE; - } - } - - auto stream = format_context_->streams[default_stream_index_]; - auto codec = stream->codec; - auto fixed_target = (target*stream->time_base.den*codec->time_base.num)/(stream->time_base.num*codec->time_base.den)*codec->ticks_per_frame; - - THROW_ON_ERROR2(avformat_seek_file(format_context_.get(), default_stream_index_, std::numeric_limits::min(), fixed_target, std::numeric_limits::max(), 0), print()); - - is_eof_ = false; - buffer_cond_.notify_all(); - - auto flush_packet = create_packet(); - flush_packet->data = nullptr; - flush_packet->size = 0; - flush_packet->pos = target; - - buffer_.push(flush_packet); - } - - void seek(int64_t target) - { - tbb::recursive_mutex::scoped_lock lock(mutex_); - - std::shared_ptr packet; - while(try_pop(packet)) - { - } - - do_seek(target); - } - - bool is_eof(int ret) - { - if(ret == AVERROR(EIO)) - CASPAR_LOG(trace) << print() << " Received EIO, assuming EOF. "; - if(ret == AVERROR_EOF) - CASPAR_LOG(trace) << print() << " Received EOF. "; - - return ret == AVERROR_EOF || ret == AVERROR(EIO) || frame_number_ >= length_; // av_read_frame doesn't always correctly return AVERROR_EOF; - } - - std::wstring print() const - { - return L"ffmpeg_input[" + filename_ + L")]"; - } -}; - -input::input(const safe_ptr& graph, const std::wstring& filename, bool loop, size_t start, size_t length) - : impl_(new implementation(graph, filename, loop, start, length)){} -bool input::eof() const {return impl_->is_eof_;} -bool input::try_pop(std::shared_ptr& packet){return impl_->try_pop(packet);} -safe_ptr input::context(){return impl_->format_context_;} -void input::loop(bool value){impl_->loop_ = value;} -bool input::loop() const{return impl_->loop_;} -void input::seek(int64_t target){impl_->seek(target);} -}} +/* +* Copyright (c) 2011 Sveriges Television AB +* +* This file is part of CasparCG (www.casparcg.com). +* +* CasparCG is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* CasparCG is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with CasparCG. If not, see . +* +* Author: Robert Nagy, ronag89@gmail.com +*/ + +#include "../../StdAfx.h" + +#include "input.h" + +#include "../util/util.h" +#include "../../ffmpeg_error.h" +#include "../../ffmpeg.h" + +#include +#include +#include +//#include +#include +#include + +#include + +#include +#include +#include + +#include +#include +#include + +#if defined(_MSC_VER) +#pragma warning (push) +#pragma warning (disable : 4244) +#endif +extern "C" +{ + #define __STDC_CONSTANT_MACROS + #define __STDC_LIMIT_MACROS + #include +} +#if defined(_MSC_VER) +#pragma warning (pop) +#endif + +namespace caspar { namespace ffmpeg { + +static const int MAX_PUSH_WITHOUT_POP = 200; +static const int MIN_FRAMES = 25; + +class stream +{ + stream(const stream&); + stream& operator=(const stream&); + + typedef tbb::concurrent_bounded_queue>::size_type size_type; + + int index_; + tbb::concurrent_bounded_queue> packets_; + tbb::atomic push_since_pop_; +public: + + stream(int index) + : index_(index) + { + push_since_pop_ = 0; + } + + stream(stream&&) = default; + + bool is_available() const + { + return index_ >= 0; + } + + int index() const + { + return index_; + } + + void push(const std::shared_ptr& packet) + { + if(packet && packet->data && packet->stream_index != index_) + return; + + if (++push_since_pop_ > MAX_PUSH_WITHOUT_POP) // Out of memory protection for streams never being used. + { + return; + } + + packets_.push(packet); + } + + bool try_pop(std::shared_ptr& packet) + { + push_since_pop_ = 0; + + return packets_.try_pop(packet); + } + + void clear() + { + std::shared_ptr packet; + push_since_pop_ = 0; + while(packets_.try_pop(packet)); + } + + size_type size() const + { + return is_available() ? packets_.size() : std::numeric_limits::max(); + } +}; + +struct input::impl : boost::noncopyable +{ + const spl::shared_ptr graph_; + + const std::wstring filename_; + const spl::shared_ptr format_context_ = open_input(filename_); // Destroy this last + const int default_stream_index_ = av_find_default_stream_index(format_context_.get()); + + tbb::atomic start_; + tbb::atomic length_; + tbb::atomic loop_; + tbb::atomic eof_; + double fps_ = read_fps(*format_context_, 0.0); + uint32_t frame_number_ = 0; + + stream video_stream_ { av_find_best_stream(format_context_.get(), AVMEDIA_TYPE_VIDEO, -1, -1, 0, 0) }; + std::vector audio_streams_; + + boost::optional seek_target_; + + tbb::atomic is_running_; + boost::mutex mutex_; + boost::condition_variable cond_; + boost::thread thread_; + + impl( + const spl::shared_ptr graph, + const std::wstring& filename, + const bool loop, + const uint32_t start, + const uint32_t length, + bool thumbnail_mode) + : graph_(graph) + , filename_(filename) + { + start_ = start; + length_ = length; + loop_ = loop; + eof_ = false; + is_running_ = true; + + if(start_ != 0) + seek_target_ = start_; + + graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f)); + + if (!thumbnail_mode) + for (unsigned i = 0; i < format_context_->nb_streams; ++i) + if (format_context_->streams[i]->codec->codec_type == AVMediaType::AVMEDIA_TYPE_AUDIO) + audio_streams_.emplace_back(i); + + for (int i = 0; i < audio_streams_.size(); ++i) + graph_->set_color("audio-buffer" + boost::lexical_cast(i + 1), diagnostics::color(0.7f, 0.4f, 0.4f)); + + if (video_stream_.is_available()) + graph_->set_color("video-buffer", diagnostics::color(1.0f, 1.0f, 0.0f)); + + for(int n = 0; n < 8; ++n) + tick(); + + thread_ = boost::thread([this, thumbnail_mode]{run(thumbnail_mode);}); + } + + ~impl() + { + is_running_ = false; + cond_.notify_one(); + thread_.join(); + } + + bool try_pop_video(std::shared_ptr& packet) + { + if (!video_stream_.is_available()) + return false; + + bool result = video_stream_.try_pop(packet); + + if(result) + cond_.notify_one(); + + graph_->set_value("video-buffer", std::min(1.0, static_cast(video_stream_.size())/MIN_FRAMES)); + + return result; + } + + bool try_pop_audio(std::shared_ptr& packet, int audio_stream_index) + { + if (audio_streams_.size() < audio_stream_index + 1) + return false; + + auto& audio_stream = audio_streams_.at(audio_stream_index); + bool result = audio_stream.try_pop(packet); + if(result) + cond_.notify_one(); + + auto buffer_nr = boost::lexical_cast(audio_stream_index + 1); + graph_->set_value("audio-buffer" + buffer_nr, std::min(1.0, static_cast(audio_stream.size())/MIN_FRAMES)); + + return result; + } + + void seek(uint32_t target) + { + { + boost::lock_guard lock(mutex_); + + seek_target_ = target; + video_stream_.clear(); + + for (auto& audio_stream : audio_streams_) + audio_stream.clear(); + } + + cond_.notify_one(); + } + + int get_actual_audio_stream_index(int audio_stream_index) const + { + if (audio_stream_index + 1 > audio_streams_.size()) + CASPAR_THROW_EXCEPTION(averror_stream_not_found()); + + return audio_streams_.at(audio_stream_index).index(); + } + + std::wstring print() const + { + return L"ffmpeg_input[" + filename_ + L")]"; + } + +private: + void internal_seek(uint32_t target) + { + eof_ = false; + graph_->set_tag(diagnostics::tag_severity::INFO, "seek"); + + if (is_logging_quiet_for_thread()) + CASPAR_LOG(trace) << print() << " Seeking: " << target; + else + CASPAR_LOG(debug) << print() << " Seeking: " << target; + + int flags = AVSEEK_FLAG_FRAME; + if(target == 0) + { + // Fix VP6 seeking + int vid_stream_index = av_find_best_stream(format_context_.get(), AVMEDIA_TYPE_VIDEO, -1, -1, 0, 0); + if(vid_stream_index >= 0) + { + auto codec_id = format_context_->streams[vid_stream_index]->codec->codec_id; + if(codec_id == CODEC_ID_VP6A || codec_id == CODEC_ID_VP6F || codec_id == CODEC_ID_VP6) + flags = AVSEEK_FLAG_BYTE; + } + } + + auto stream = format_context_->streams[default_stream_index_]; + auto fps = read_fps(*format_context_, 0.0); + auto target_timestamp = static_cast((target / fps * stream->time_base.den) / stream->time_base.num); + + THROW_ON_ERROR2(avformat_seek_file( + format_context_.get(), + default_stream_index_, + std::numeric_limits::min(), + target_timestamp, + std::numeric_limits::max(), + 0), print()); + + auto flush_packet = create_packet(); + flush_packet->data = nullptr; + flush_packet->size = 0; + flush_packet->pos = target; + + video_stream_.push(flush_packet); + + for (auto& audio_stream : audio_streams_) + audio_stream.push(flush_packet); + } + + void tick() + { + if(seek_target_) + { + internal_seek(*seek_target_); + seek_target_.reset(); + } + + auto packet = create_packet(); + + auto ret = av_read_frame(format_context_.get(), packet.get()); // packet is only valid until next call of av_read_frame. Use av_dup_packet to extend its life. + + if(is_eof(ret)) + { + if (loop_) + internal_seek(start_); + else + { + eof_ = true; + } + } + else + { + THROW_ON_ERROR(ret, "av_read_frame", print()); + + THROW_ON_ERROR2(av_dup_packet(packet.get()), print()); + + // Make sure that the packet is correctly deallocated even if size and data is modified during decoding. + const auto size = packet->size; + const auto data = packet->data; + + packet = spl::shared_ptr(packet.get(), [packet, size, data](AVPacket*) + { + packet->size = size; + packet->data = data; + }); + + const auto stream_time_base = format_context_->streams[packet->stream_index]->time_base; + const auto packet_frame_number = static_cast((static_cast(packet->pts * stream_time_base.num)/stream_time_base.den)*fps_); + + if(packet->stream_index == default_stream_index_) + frame_number_ = packet_frame_number; + + if(packet_frame_number >= start_ && packet_frame_number < length_) + { + video_stream_.push(packet); + + for (auto& audio_stream : audio_streams_) + audio_stream.push(packet); + } + } + + if (video_stream_.is_available()) + graph_->set_value("video-buffer", std::min(1.0, static_cast(video_stream_.size())/MIN_FRAMES)); + + for (int i = 0; i < audio_streams_.size(); ++i) + graph_->set_value( + "audio-buffer" + boost::lexical_cast(i + 1), + std::min(1.0, static_cast(audio_streams_[i].size())/MIN_FRAMES)); + } + + bool full() const + { + bool video_full = video_stream_.size() >= MIN_FRAMES; + + if (!video_full) + return false; + + for (auto& audio_stream : audio_streams_) + if (audio_stream.size() < MIN_FRAMES) + return false; + + return true; + } + + void run(bool thumbnail_mode) + { + ensure_gpf_handler_installed_for_thread(u8(print()).c_str()); + auto quiet_logging = temporary_enable_quiet_logging_for_thread(thumbnail_mode); + + while(is_running_) + { + try + { + + { + boost::unique_lock lock(mutex_); + + while((eof_ || full()) && !seek_target_ && is_running_) + cond_.wait(lock); + + tick(); + } + } + catch(...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + is_running_ = false; + } + } + } + + bool is_eof(int ret) + { + #pragma warning (disable : 4146) + return ret == AVERROR_EOF || ret == AVERROR(EIO) || frame_number_ >= length_; // av_read_frame doesn't always correctly return AVERROR_EOF; + } +}; + +input::input(const spl::shared_ptr& graph, const std::wstring& filename, bool loop, uint32_t start, uint32_t length, bool thumbnail_mode) + : impl_(new impl(graph, filename, loop, start, length, thumbnail_mode)){} +int input::get_actual_audio_stream_index(int audio_stream_index) const { return impl_->get_actual_audio_stream_index(audio_stream_index); }; +int input::num_audio_streams() const { return static_cast(impl_->audio_streams_.size()); } +bool input::try_pop_video(std::shared_ptr& packet){return impl_->try_pop_video(packet);} +bool input::try_pop_audio(std::shared_ptr& packet, int audio_stream_index){return impl_->try_pop_audio(packet, audio_stream_index);} +AVFormatContext& input::context(){return *impl_->format_context_;} +void input::loop(bool value){impl_->loop_ = value;} +bool input::loop() const{return impl_->loop_;} +void input::seek(uint32_t target){impl_->seek(target);} +void input::start(uint32_t value){impl_->start_ = value;} +uint32_t input::start() const{return impl_->start_;} +void input::length(uint32_t value){impl_->length_ = value;} +uint32_t input::length() const{return impl_->length_;} +bool input::eof() const { return impl_->eof_; } +}}