X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=modules%2Fffmpeg%2Fproducer%2Finput%2Finput.cpp;h=72648037d8d9704374ebaacf0e3ee89d3dc3394e;hb=c0dc760a3d87b346c9f267cd9d74c67c55d3bdc3;hp=95842235a138d4652941b86fe9327dc8d8bb8d4e;hpb=7060484708769b2798ea0ac1ed20463eef3abc15;p=casparcg diff --git a/modules/ffmpeg/producer/input/input.cpp b/modules/ffmpeg/producer/input/input.cpp index 95842235a..72648037d 100644 --- a/modules/ffmpeg/producer/input/input.cpp +++ b/modules/ffmpeg/producer/input/input.cpp @@ -1,294 +1,399 @@ -/* -* copyright (c) 2010 Sveriges Television AB -* -* This file is part of CasparCG. -* -* CasparCG is free software: you can redistribute it and/or modify -* it under the terms of the GNU General Public License as published by -* the Free Software Foundation, either version 3 of the License, or -* (at your option) any later version. -* -* CasparCG is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU General Public License for more details. - -* You should have received a copy of the GNU General Public License -* along with CasparCG. If not, see . -* -*/ -#if defined(_MSC_VER) -#pragma warning (disable : 4244) -#endif - -#include "../../stdafx.h" - -#include "input.h" - -#include "../util/util.h" -#include "../../ffmpeg_error.h" - -#include - -#include -#include -#include - -#include -#include - -#include -#include -#include -#include - -#if defined(_MSC_VER) -#pragma warning (push) -#pragma warning (disable : 4244) -#endif -extern "C" -{ - #define __STDC_CONSTANT_MACROS - #define __STDC_LIMIT_MACROS - #include -} -#if defined(_MSC_VER) -#pragma warning (pop) -#endif - -namespace caspar { namespace ffmpeg { - -static const size_t MAX_BUFFER_COUNT = 100; -static const size_t MIN_BUFFER_COUNT = 4; -static const size_t MAX_BUFFER_SIZE = 16 * 1000000; - -struct input::implementation : boost::noncopyable -{ - safe_ptr graph_; - - const safe_ptr format_context_; // Destroy this last - const int default_stream_index_; - - const std::wstring filename_; - const bool loop_; - const size_t start_; - const size_t length_; - size_t frame_number_; - - tbb::concurrent_bounded_queue> buffer_; - tbb::atomic buffer_size_; - boost::condition_variable buffer_cond_; - boost::mutex buffer_mutex_; - - tbb::atomic nb_frames_; - tbb::atomic nb_loops_; - - boost::thread thread_; - tbb::atomic is_running_; - -public: - explicit implementation(const safe_ptr& graph, const std::wstring& filename, bool loop, size_t start, size_t length) - : graph_(graph) - , format_context_(open_input(filename)) - , default_stream_index_(av_find_default_stream_index(format_context_.get())) - , loop_(loop) - , filename_(filename) - , start_(start) - , length_(length) - , frame_number_(0) - { - buffer_size_ = 0; - nb_frames_ = 0; - nb_loops_ = 0; - - buffer_size_ = 0; - nb_frames_ = 0; - nb_loops_ = 0; - - if(start_ > 0) - seek_frame(start_); - - graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f)); - graph_->set_color("buffer-count", diagnostics::color(0.7f, 0.4f, 0.4f)); - graph_->set_color("buffer-size", diagnostics::color(1.0f, 1.0f, 0.0f)); - - is_running_ = true; - thread_ = boost::thread([this]{run();}); - - CASPAR_LOG(info) << print() << L" Initialized."; - } - - ~implementation() - { - is_running_ = false; - buffer_cond_.notify_all(); - thread_.join(); - } - - bool try_pop(std::shared_ptr& packet) - { - const bool result = buffer_.try_pop(packet); - - if(result) - { - if(packet) - buffer_size_ -= packet->size; - buffer_cond_.notify_all(); - } - - graph_->update_value("buffer-size", (static_cast(buffer_size_)+0.001)/MAX_BUFFER_SIZE); - graph_->update_value("buffer-count", (static_cast(buffer_.size()+0.001)/MAX_BUFFER_COUNT)); - - return result; - } - - size_t nb_frames() const - { - return nb_frames_; - } - - size_t nb_loops() const - { - return nb_loops_; - } - -private: - - void run() - { - caspar::win32_exception::install_handler(); - - try - { - CASPAR_LOG(info) << print() << " Thread Started."; - - CASPAR_ASSERT(nb_frames_ < 1000); - - while(is_running_) - { - { - boost::unique_lock lock(buffer_mutex_); - while(full()) - buffer_cond_.timed_wait(lock, boost::posix_time::millisec(20)); - } - read_next_packet(); - } - - CASPAR_LOG(info) << print() << " Thread Stopped."; - } - catch(...) - { - CASPAR_LOG_CURRENT_EXCEPTION(); - is_running_ = false; - } - } - - void read_next_packet() - { - auto packet = create_packet(); - auto ret = av_read_frame(format_context_.get(), packet.get()); // packet is only valid until next call of av_read_frame. Use av_dup_packet to extend its life. - - if(is_eof(ret)) - { - ++nb_loops_; - frame_number_ = 0; - - if(loop_) - { - seek_frame(start_); - graph_->add_tag("seek"); - CASPAR_LOG(debug) << print() << " Looping."; - } - else - { - is_running_ = false; - CASPAR_LOG(debug) << print() << " Stopping."; - } - } - else - { - THROW_ON_ERROR(ret, "av_read_frame", print()); - - if(packet->stream_index == default_stream_index_) - { - if(nb_loops_ == 0) - ++nb_frames_; - ++frame_number_; - } - - THROW_ON_ERROR2(av_dup_packet(packet.get()), print()); - - // Make sure that the packet is correctly deallocated even if size and data is modified during decoding. - auto size = packet->size; - auto data = packet->data; - - packet = safe_ptr(packet.get(), [packet, size, data](AVPacket*) - { - packet->size = size; - packet->data = data; - }); - - buffer_.try_push(packet); - buffer_size_ += packet->size; - - graph_->update_value("buffer-size", (static_cast(buffer_size_)+0.001)/MAX_BUFFER_SIZE); - graph_->update_value("buffer-count", (static_cast(buffer_.size()+0.001)/MAX_BUFFER_COUNT)); - } - } - - bool full() const - { - return is_running_ && (buffer_size_ > MAX_BUFFER_SIZE || buffer_.size() > MAX_BUFFER_COUNT) && buffer_.size() > MIN_BUFFER_COUNT; - } - - void seek_frame(int64_t target) - { - CASPAR_LOG(debug) << print() << " Seeking: " << target; - - int flags = AVSEEK_FLAG_FRAME; - if(target == 0) - { - // Fix VP6 seeking - int vid_stream_index = av_find_best_stream(format_context_.get(), AVMEDIA_TYPE_VIDEO, -1, -1, 0, 0); - if(vid_stream_index >= 0) - { - auto codec_id = format_context_->streams[vid_stream_index]->codec->codec_id; - if(codec_id == CODEC_ID_VP6A || codec_id == CODEC_ID_VP6F || codec_id == CODEC_ID_VP6) - flags = AVSEEK_FLAG_BYTE; - } - } - - auto time_base = format_context_->streams[default_stream_index_]->time_base; - target = (target*time_base.den)/time_base.num; - auto fixed_time_base = fix_time_base(time_base); - target = (target * fixed_time_base.num) / fixed_time_base.den; - - THROW_ON_ERROR2(avformat_seek_file(format_context_.get(), default_stream_index_, std::numeric_limits::min(), target, std::numeric_limits::max(), 0), print()); - - buffer_.push(flush_packet()); - } - - bool is_eof(int ret) - { - if(ret == AVERROR(EIO)) - CASPAR_LOG(trace) << print() << " Received EIO, assuming EOF. " << nb_frames_; - if(ret == AVERROR_EOF) - CASPAR_LOG(debug) << print() << " Received EOF. " << nb_frames_; - - return ret == AVERROR_EOF || ret == AVERROR(EIO) || frame_number_ >= length_; // av_read_frame doesn't always correctly return AVERROR_EOF; - } - - std::wstring print() const - { - return L"ffmpeg_input[" + filename_ + L")]"; - } -}; - -input::input(const safe_ptr& graph, const std::wstring& filename, bool loop, size_t start, size_t length) - : impl_(new implementation(graph, filename, loop, start, length)){} -bool input::eof() const {return !impl_->is_running_;} -bool input::try_pop(std::shared_ptr& packet){return impl_->try_pop(packet);} -safe_ptr input::context(){return impl_->format_context_;} -size_t input::nb_frames() const {return impl_->nb_frames();} -size_t input::nb_loops() const {return impl_->nb_loops();} -}} +/* +* Copyright 2013 Sveriges Television AB http://casparcg.com/ +* +* This file is part of CasparCG (www.casparcg.com). +* +* CasparCG is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* CasparCG is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with CasparCG. If not, see . +* +* Author: Robert Nagy, ronag89@gmail.com +*/ + +#include "../../StdAfx.h" + +#include "input.h" + +#include "../util/util.h" +#include "../util/flv.h" +#include "../../ffmpeg_error.h" +#include "../../ffmpeg.h" + +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include + +#if defined(_MSC_VER) +#pragma warning (push) +#pragma warning (disable : 4244) +#endif +extern "C" +{ + #define __STDC_CONSTANT_MACROS + #define __STDC_LIMIT_MACROS + #include +} +#if defined(_MSC_VER) +#pragma warning (pop) +#endif + +static const size_t MAX_BUFFER_COUNT = 100; +static const size_t MAX_BUFFER_COUNT_RT = 3; +static const size_t MIN_BUFFER_COUNT = 50; +static const size_t MAX_BUFFER_SIZE = 64 * 1000000; + +namespace caspar { namespace ffmpeg { +struct input::implementation : boost::noncopyable +{ + const spl::shared_ptr graph_; + + const spl::shared_ptr format_context_; // Destroy this last + const int default_stream_index_ = av_find_default_stream_index(format_context_.get()); + + const std::wstring filename_; + tbb::atomic start_; + tbb::atomic length_; + const bool thumbnail_mode_; + tbb::atomic loop_; + uint32_t frame_number_ = 0; + boost::rational framerate_ = read_framerate(*format_context_, 1); + + tbb::concurrent_bounded_queue> buffer_; + tbb::atomic buffer_size_; + + executor executor_; + + explicit implementation(const spl::shared_ptr graph, const std::wstring& url_or_file, bool loop, uint32_t start, uint32_t length, bool thumbnail_mode, const ffmpeg_options& vid_params) + : graph_(graph) + , format_context_(open_input(url_or_file, vid_params)) + , filename_(url_or_file) + , thumbnail_mode_(thumbnail_mode) + , executor_(print()) + { + if (thumbnail_mode_) + executor_.invoke([] + { + enable_quiet_logging_for_thread(); + }); + + start_ = start; + length_ = length; + loop_ = loop; + buffer_size_ = 0; + + if(start_ > 0) + queued_seek(start_); + + graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f)); + graph_->set_color("buffer-count", diagnostics::color(0.7f, 0.4f, 0.4f)); + graph_->set_color("buffer-size", diagnostics::color(1.0f, 1.0f, 0.0f)); + + tick(); + } + + bool try_pop(std::shared_ptr& packet) + { + auto result = buffer_.try_pop(packet); + + if(result) + { + if(packet) + buffer_size_ -= packet->size; + tick(); + } + + graph_->set_value("buffer-size", (static_cast(buffer_size_)+0.001)/MAX_BUFFER_SIZE); + graph_->set_value("buffer-count", (static_cast(buffer_.size()+0.001)/MAX_BUFFER_COUNT)); + + return result; + } + + std::ptrdiff_t get_max_buffer_count() const + { + return thumbnail_mode_ ? 1 : MAX_BUFFER_COUNT; + } + + std::ptrdiff_t get_min_buffer_count() const + { + return thumbnail_mode_ ? 0 : MIN_BUFFER_COUNT; + } + + std::future seek(uint32_t target) + { + if (!executor_.is_running()) + return make_ready_future(false); + + return executor_.begin_invoke([=]() -> bool + { + std::shared_ptr packet; + while(buffer_.try_pop(packet) && packet) + buffer_size_ -= packet->size; + + queued_seek(target); + + tick(); + + return true; + }, task_priority::high_priority); + } + + std::wstring print() const + { + return L"ffmpeg_input[" + filename_ + L")]"; + } + + bool full() const + { + return (buffer_size_ > MAX_BUFFER_SIZE || buffer_.size() > get_max_buffer_count()) && buffer_.size() > get_min_buffer_count(); + } + + void tick() + { + if(!executor_.is_running()) + return; + + executor_.begin_invoke([this] + { + if(full()) + return; + + try + { + auto packet = create_packet(); + + auto ret = av_read_frame(format_context_.get(), packet.get()); // packet is only valid until next call of av_read_frame. Use av_dup_packet to extend its life. + + if(is_eof(ret)) + { + frame_number_ = 0; + + if(loop_) + { + queued_seek(start_); + graph_->set_tag(diagnostics::tag_severity::INFO, "seek"); + CASPAR_LOG(trace) << print() << " Looping."; + } + else + executor_.stop(); + } + else + { + THROW_ON_ERROR(ret, "av_read_frame", print()); + + if(packet->stream_index == default_stream_index_) + ++frame_number_; + + THROW_ON_ERROR2(av_dup_packet(packet.get()), print()); + + // Make sure that the packet is correctly deallocated even if size and data is modified during decoding. + auto size = packet->size; + auto data = packet->data; + + packet = spl::shared_ptr(packet.get(), [packet, size, data](AVPacket*) + { + packet->size = size; + packet->data = data; + }); + + buffer_.try_push(packet); + buffer_size_ += packet->size; + + graph_->set_value("buffer-size", (static_cast(buffer_size_)+0.001)/MAX_BUFFER_SIZE); + graph_->set_value("buffer-count", (static_cast(buffer_.size()+0.001)/MAX_BUFFER_COUNT)); + } + + tick(); + } + catch(...) + { + if (!thumbnail_mode_) + CASPAR_LOG_CURRENT_EXCEPTION(); + executor_.stop(); + } + }); + } + + spl::shared_ptr open_input(const std::wstring& url_or_file, const ffmpeg_options& vid_params) + { + AVDictionary* format_options = nullptr; + + CASPAR_SCOPE_EXIT + { + if (format_options) + av_dict_free(&format_options); + }; + + for (auto& option : vid_params) + av_dict_set(&format_options, option.first.c_str(), option.second.c_str(), 0); + + auto resource_name = std::wstring(); + auto parts = caspar::protocol_split(url_or_file); + AVInputFormat* input_format = nullptr; + + if (parts.at(0).empty()) + resource_name = parts.at(1); + else if (parts.at(0) == L"dshow") + { + input_format = av_find_input_format("dshow"); + resource_name = parts.at(1); + } + else + resource_name = parts.at(0) + L"://" + parts.at(1); + + AVFormatContext* weak_context = nullptr; + THROW_ON_ERROR2(avformat_open_input(&weak_context, u8(resource_name).c_str(), input_format, &format_options), resource_name); + + spl::shared_ptr context(weak_context, [](AVFormatContext* ptr) + { + avformat_close_input(&ptr); + }); + + if (format_options) + { + std::string unsupported_tokens = ""; + AVDictionaryEntry *t = NULL; + while ((t = av_dict_get(format_options, "", t, AV_DICT_IGNORE_SUFFIX)) != nullptr) + { + if (!unsupported_tokens.empty()) + unsupported_tokens += ", "; + unsupported_tokens += t->key; + } + CASPAR_THROW_EXCEPTION(user_error() << msg_info(unsupported_tokens)); + } + + THROW_ON_ERROR2(avformat_find_stream_info(context.get(), nullptr), resource_name); + fix_meta_data(*context); + return context; + } + + void fix_meta_data(AVFormatContext& context) + { + auto video_index = av_find_best_stream(&context, AVMEDIA_TYPE_VIDEO, -1, -1, 0, 0); + + if (video_index > -1) + { + auto video_stream = context.streams[video_index]; + auto video_context = context.streams[video_index]->codec; + + if (boost::filesystem::path(context.filename).extension().string() == ".flv") + { + try + { + auto meta = read_flv_meta_info(context.filename); + double fps = boost::lexical_cast(meta["framerate"]); + video_stream->nb_frames = static_cast(boost::lexical_cast(meta["duration"])*fps); + } + catch (...) {} + } + else + { + auto stream_time = video_stream->time_base; + auto duration = video_stream->duration; + auto codec_time = video_context->time_base; + auto ticks = video_context->ticks_per_frame; + + if (video_stream->nb_frames == 0) + video_stream->nb_frames = (duration*stream_time.num*codec_time.den) / (stream_time.den*codec_time.num*ticks); + } + } + } + + void queued_seek(const uint32_t target) + { + if (!thumbnail_mode_) + CASPAR_LOG(debug) << print() << " Seeking: " << target; + + int flags = AVSEEK_FLAG_FRAME; + if(target == 0) + { + // Fix VP6 seeking + int vid_stream_index = av_find_best_stream(format_context_.get(), AVMEDIA_TYPE_VIDEO, -1, -1, 0, 0); + if(vid_stream_index >= 0) + { + auto codec_id = format_context_->streams[vid_stream_index]->codec->codec_id; + if(codec_id == CODEC_ID_VP6A || codec_id == CODEC_ID_VP6F || codec_id == CODEC_ID_VP6) + flags = AVSEEK_FLAG_BYTE; + } + } + + auto stream = format_context_->streams[default_stream_index_]; + + + auto fps = read_fps(*format_context_, 0.0); + + THROW_ON_ERROR2(avformat_seek_file( + format_context_.get(), + default_stream_index_, + std::numeric_limits::min(), + static_cast((target / fps * stream->time_base.den) / stream->time_base.num), + std::numeric_limits::max(), + 0), print()); + + auto flush_packet = create_packet(); + flush_packet->data = nullptr; + flush_packet->size = 0; + flush_packet->pos = target; + + buffer_.push(flush_packet); + } + + bool is_eof(int ret) + { + if(ret == AVERROR(EIO)) + CASPAR_LOG(trace) << print() << " Received EIO, assuming EOF. "; + if(ret == AVERROR_EOF) + CASPAR_LOG(trace) << print() << " Received EOF. "; + + return ret == AVERROR_EOF || ret == AVERROR(EIO) || frame_number_ >= length_; // av_read_frame doesn't always correctly return AVERROR_EOF; + } + + int num_audio_streams() const + { + return 0; // TODO + } + + boost::rational framerate() const + { + return framerate_; + } +}; + +input::input(const spl::shared_ptr& graph, const std::wstring& url_or_file, bool loop, uint32_t start, uint32_t length, bool thumbnail_mode, const ffmpeg_options& vid_params) + : impl_(new implementation(graph, url_or_file, loop, start, length, thumbnail_mode, vid_params)){} +bool input::eof() const {return !impl_->executor_.is_running();} +bool input::try_pop(std::shared_ptr& packet){return impl_->try_pop(packet);} +spl::shared_ptr input::context(){return impl_->format_context_;} +void input::start(uint32_t value){impl_->start_ = value;} +uint32_t input::start() const{return impl_->start_;} +void input::length(uint32_t value){impl_->length_ = value;} +uint32_t input::length() const{return impl_->length_;} +void input::loop(bool value){impl_->loop_ = value;} +bool input::loop() const{return impl_->loop_;} +int input::num_audio_streams() const { return impl_->num_audio_streams(); } +boost::rational input::framerate() const { return impl_->framerate(); } +std::future input::seek(uint32_t target){return impl_->seek(target);} +}}