X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=modules%2Fffmpeg%2Fconsumer%2Fstreaming_consumer.cpp;h=d0cf1aa72df82a2983bfe7c6eb25bac27a531304;hb=49f6906495afac6548207a49a0f3c8a9e92709d6;hp=7c240d75983e9d240911a3bfbd950107f9654528;hpb=3f24604bb885d59b9dd6a804260125c054148d98;p=casparcg diff --git a/modules/ffmpeg/consumer/streaming_consumer.cpp b/modules/ffmpeg/consumer/streaming_consumer.cpp index 7c240d759..d0cf1aa72 100644 --- a/modules/ffmpeg/consumer/streaming_consumer.cpp +++ b/modules/ffmpeg/consumer/streaming_consumer.cpp @@ -3,17 +3,24 @@ #include "ffmpeg_consumer.h" #include "../ffmpeg_error.h" +#include "../producer/util/util.h" +#include "../producer/filter/filter.h" #include #include #include #include #include +#include #include #include +#include +#include +#include #include #include +#include #include #include #include @@ -41,7 +48,7 @@ #pragma warning(push) #pragma warning(disable: 4244) -extern "C" +extern "C" { #define __STDC_CONSTANT_MACROS #define __STDC_LIMIT_MACROS @@ -59,102 +66,166 @@ extern "C" #pragma warning(pop) -namespace caspar { namespace ffmpeg { +namespace caspar { namespace ffmpeg { namespace { -int crc16(const std::string& str) +void set_pixel_format(AVFilterContext* sink, AVPixelFormat pix_fmt) { - boost::crc_16_type result; +#pragma warning (push) +#pragma warning (disable : 4245) - result.process_bytes(str.data(), str.length()); + FF(av_opt_set_int_list( + sink, + "pix_fmts", + std::vector({ pix_fmt, AVPixelFormat::AV_PIX_FMT_NONE }).data(), + -1, + AV_OPT_SEARCH_CHILDREN)); - return result.checksum(); +#pragma warning (pop) } -class streaming_consumer final : public core::frame_consumer +void adjust_video_filter(const AVCodec& codec, const core::video_format_desc& in_format, AVFilterContext* sink, std::string& filter) +{ + switch (codec.id) + { + case AV_CODEC_ID_DVVIDEO: + // Crop + if (in_format.format == core::video_format::ntsc) + filter = u8(append_filter(u16(filter), L"crop=720:480:0:2")); + + // Pixel format selection + if (in_format.format == core::video_format::ntsc) + set_pixel_format(sink, AVPixelFormat::AV_PIX_FMT_YUV411P); + else if (in_format.format == core::video_format::pal) + set_pixel_format(sink, AVPixelFormat::AV_PIX_FMT_YUV420P); + else + set_pixel_format(sink, AVPixelFormat::AV_PIX_FMT_YUV422P); + + // Scale + if (in_format.height == 1080) + filter = u8(append_filter(u16(filter), in_format.duration == 1001 + ? L"scale=1280:1080" + : L"scale=1440:1080")); + else if (in_format.height == 720) + filter = u8(append_filter(u16(filter), L"scale=960:720")); + + break; + } +} + +void setup_codec_defaults(AVCodecContext& encoder) +{ + static const int MEGABIT = 1000000; + + switch (encoder.codec_id) + { + case AV_CODEC_ID_DNXHD: + encoder.bit_rate = 220 * MEGABIT; + + break; + case AV_CODEC_ID_PRORES: + encoder.bit_rate = encoder.width < 1280 + ? 63 * MEGABIT + : 220 * MEGABIT; + + break; + case AV_CODEC_ID_H264: + av_opt_set(encoder.priv_data, "preset", "ultrafast", 0); + av_opt_set(encoder.priv_data, "tune", "fastdecode", 0); + av_opt_set(encoder.priv_data, "crf", "5", 0); + + break; + } +} + +bool is_pcm_s24le_not_supported(const AVFormatContext& container) +{ + auto name = std::string(container.oformat->name); + + if (name == "mp4" || name == "dv") + return true; + + return false; +} + +class ffmpeg_consumer { public: // Static Members - + private: + const spl::shared_ptr graph_; core::monitor::subject subject_; - boost::filesystem::path path_; - int consumer_index_offset_; + std::string path_; + boost::filesystem::path full_path_; std::map options_; - + core::video_format_desc in_video_format_; + core::audio_channel_layout in_channel_layout_ = core::audio_channel_layout::invalid(); std::shared_ptr oc_; tbb::atomic abort_request_; - + std::shared_ptr video_st_; std::shared_ptr audio_st_; - std::int64_t video_pts_; - std::int64_t audio_pts_; - - AVFilterContext* audio_graph_in_; - AVFilterContext* audio_graph_out_; - std::shared_ptr audio_graph_; - std::shared_ptr audio_bitstream_filter_; - - AVFilterContext* video_graph_in_; - AVFilterContext* video_graph_out_; - std::shared_ptr video_graph_; - std::shared_ptr video_bitstream_filter_; - - executor executor_; + std::int64_t video_pts_ = 0; + std::int64_t audio_pts_ = 0; + + AVFilterContext* audio_graph_in_; + AVFilterContext* audio_graph_out_; + std::shared_ptr audio_graph_; + + AVFilterContext* video_graph_in_; + AVFilterContext* video_graph_out_; + std::shared_ptr video_graph_; executor video_encoder_executor_; executor audio_encoder_executor_; - tbb::atomic tokens_; - boost::mutex tokens_mutex_; - boost::condition_variable tokens_cond_; + semaphore tokens_ { 0 }; + tbb::atomic current_encoding_delay_; executor write_executor_; - + public: - streaming_consumer( - std::string path, + ffmpeg_consumer( + std::string path, std::string options) : path_(path) - , consumer_index_offset_(crc16(path)) - , video_pts_(0) - , audio_pts_(0) - , executor_(print()) + , full_path_(path) , audio_encoder_executor_(print() + L" audio_encoder") , video_encoder_executor_(print() + L" video_encoder") , write_executor_(print() + L" io") - { + { abort_request_ = false; current_encoding_delay_ = 0; - for(auto it = + for(auto it = boost::sregex_iterator( - options.begin(), - options.end(), - boost::regex("-(?[^-\\s]+)(\\s+(?[^\\s]+))?")); - it != boost::sregex_iterator(); + options.begin(), + options.end(), + boost::regex("-(?[^-\\s]+)(\\s+(?[^\\s]+))?")); + it != boost::sregex_iterator(); ++it) - { + { options_[(*it)["NAME"].str()] = (*it)["VALUE"].matched ? (*it)["VALUE"].str() : ""; } - + if (options_.find("threads") == options_.end()) options_["threads"] = "auto"; - tokens_ = + tokens_.release( std::max( - 1, + 1, try_remove_arg( - options_, - boost::regex("tokens")).get_value_or(2)); + options_, + boost::regex("tokens")).get_value_or(2))); } - - ~streaming_consumer() + + ~ffmpeg_consumer() { if(oc_) { @@ -187,119 +258,113 @@ public: void initialize( const core::video_format_desc& format_desc, - int channel_index) override + const core::audio_channel_layout& channel_layout) { try - { + { static boost::regex prot_exp("^.+:.*" ); - - const auto overwrite = - try_remove_arg( - options_, - boost::regex("y")) != nullptr; if(!boost::regex_match( - path_.string(), + path_, prot_exp)) { - if(!path_.is_complete()) + if(!full_path_.is_complete()) { - path_ = + full_path_ = u8( - env::media_folder()) + - path_.string(); - } - - if(boost::filesystem::exists(path_)) - { - if(!overwrite) - BOOST_THROW_EXCEPTION(invalid_argument() << msg_info("File exists")); - - boost::filesystem::remove(path_); + env::media_folder()) + + path_; } + + if(boost::filesystem::exists(full_path_)) + boost::filesystem::remove(full_path_); + + boost::filesystem::create_directories(full_path_.parent_path()); } - - const auto oformat_name = + + graph_->set_color("frame-time", diagnostics::color(0.1f, 1.0f, 0.1f)); + graph_->set_color("dropped-frame", diagnostics::color(0.3f, 0.6f, 0.3f)); + graph_->set_text(print()); + diagnostics::register_graph(graph_); + + const auto oformat_name = try_remove_arg( - options_, + options_, boost::regex("^f|format$")); - + AVFormatContext* oc; FF(avformat_alloc_output_context2( - &oc, - nullptr, - oformat_name && !oformat_name->empty() ? oformat_name->c_str() : nullptr, - path_.string().c_str())); + &oc, + nullptr, + oformat_name && !oformat_name->empty() ? oformat_name->c_str() : nullptr, + full_path_.string().c_str())); oc_.reset( - oc, + oc, avformat_free_context); - + CASPAR_VERIFY(oc_->oformat); - oc_->interrupt_callback.callback = streaming_consumer::interrupt_cb; - oc_->interrupt_callback.opaque = this; + oc_->interrupt_callback.callback = ffmpeg_consumer::interrupt_cb; + oc_->interrupt_callback.opaque = this; CASPAR_VERIFY(format_desc.format != core::video_format::invalid); in_video_format_ = format_desc; - + in_channel_layout_ = channel_layout; + CASPAR_VERIFY(oc_->oformat); - - const auto video_codec_name = + + const auto video_codec_name = try_remove_arg( - options_, + options_, boost::regex("^c:v|codec:v|vcodec$")); - const auto video_codec = - video_codec_name + const auto video_codec = + video_codec_name ? avcodec_find_encoder_by_name(video_codec_name->c_str()) : avcodec_find_encoder(oc_->oformat->video_codec); - - const auto audio_codec_name = + + const auto audio_codec_name = try_remove_arg( - options_, + options_, boost::regex("^c:a|codec:a|acodec$")); - - const auto audio_codec = - audio_codec_name + + const auto audio_codec = + audio_codec_name ? avcodec_find_encoder_by_name(audio_codec_name->c_str()) - : avcodec_find_encoder(oc_->oformat->audio_codec); - + : (is_pcm_s24le_not_supported(*oc_) + ? avcodec_find_encoder(oc_->oformat->audio_codec) + : avcodec_find_encoder_by_name("pcm_s24le")); + if (!video_codec) - BOOST_THROW_EXCEPTION(caspar_exception() << msg_info( + CASPAR_THROW_EXCEPTION(user_error() << msg_info( "Failed to find video codec " + (video_codec_name ? *video_codec_name : "with id " + boost::lexical_cast( oc_->oformat->video_codec)))); if (!audio_codec) - BOOST_THROW_EXCEPTION(caspar_exception() << msg_info( + CASPAR_THROW_EXCEPTION(user_error() << msg_info( "Failed to find audio codec " + (audio_codec_name ? *audio_codec_name : "with id " + boost::lexical_cast( oc_->oformat->audio_codec)))); - + // Filters { configure_video_filters( - *video_codec, - try_remove_arg(options_, + *video_codec, + try_remove_arg(options_, boost::regex("vf|f:v|filter:v")).get_value_or("")); configure_audio_filters( - *audio_codec, + *audio_codec, try_remove_arg(options_, boost::regex("af|f:a|filter:a")).get_value_or("")); } - // Bistream Filters - { - configue_audio_bistream_filters(options_); - configue_video_bistream_filters(options_); - } - // Encoders { @@ -307,11 +372,11 @@ public: auto audio_options = options_; video_st_ = open_encoder( - *video_codec, + *video_codec, video_options); audio_st_ = open_encoder( - *audio_codec, + *audio_codec, audio_options); auto it = options_.begin(); @@ -329,7 +394,7 @@ public: AVDictionary* av_opts = nullptr; to_dict( - &av_opts, + &av_opts, std::move(options_)); CASPAR_SCOPE_EXIT @@ -337,37 +402,37 @@ public: av_dict_free(&av_opts); }; - if (!(oc_->oformat->flags & AVFMT_NOFILE)) + if (!(oc_->oformat->flags & AVFMT_NOFILE)) { FF(avio_open2( - &oc_->pb, - path_.string().c_str(), - AVIO_FLAG_WRITE, - &oc_->interrupt_callback, + &oc_->pb, + full_path_.string().c_str(), + AVIO_FLAG_WRITE, + &oc_->interrupt_callback, &av_opts)); } - + FF(avformat_write_header( - oc_.get(), + oc_.get(), &av_opts)); - + options_ = to_map(av_opts); } // Dump Info - + av_dump_format( - oc_.get(), - 0, - oc_->filename, - 1); + oc_.get(), + 0, + oc_->filename, + 1); for (const auto& option : options_) { - CASPAR_LOG(warning) - << L"Invalid option: -" - << u16(option.first) - << L" " + CASPAR_LOG(warning) + << L"Invalid option: -" + << u16(option.first) + << L" " << u16(option.second); } } @@ -380,81 +445,58 @@ public: } } - core::monitor::subject& monitor_output() override + core::monitor::subject& monitor_output() { return subject_; } - std::wstring name() const override + void send(core::const_frame frame) { - return L"streaming"; - } - - std::future send(core::const_frame frame) override - { CASPAR_VERIFY(in_video_format_.format != core::video_format::invalid); - - --tokens_; + + auto frame_timer = spl::make_shared(); + std::shared_ptr token( - nullptr, - [this, frame](void*) + nullptr, + [this, frame, frame_timer](void*) { - ++tokens_; - tokens_cond_.notify_one(); + tokens_.release(); current_encoding_delay_ = frame.get_age_millis(); + graph_->set_value("frame-time", frame_timer->elapsed() * in_video_format_.fps * 0.5); }); + tokens_.acquire(); - return executor_.begin_invoke([=]() -> bool + video_encoder_executor_.begin_invoke([=]() mutable { - boost::unique_lock tokens_lock(tokens_mutex_); - - while(tokens_ < 0) - tokens_cond_.wait(tokens_lock); - - video_encoder_executor_.begin_invoke([=]() mutable - { - encode_video( - frame, - token); - }); - - audio_encoder_executor_.begin_invoke([=]() mutable - { - encode_audio( - frame, - token); - }); - - return true; + encode_video( + frame, + token); }); - } - std::wstring print() const override - { - return L"streaming_consumer[" + u16(path_.string()) + L"]"; - } - - virtual boost::property_tree::wptree info() const override - { - return boost::property_tree::wptree(); + audio_encoder_executor_.begin_invoke([=]() mutable + { + encode_audio( + frame, + token); + }); } - bool has_synchronization_clock() const override + bool ready_for_frame() const { - return false; + return tokens_.permits() > 0; } - int buffer_depth() const override + void mark_dropped() { - return -1; + graph_->set_tag(diagnostics::tag_severity::WARNING, "dropped-frame"); } - int index() const override + std::wstring print() const { - return 100000 + consumer_index_offset_; + return L"ffmpeg_consumer[" + u16(path_) + L"]"; } - int64_t presentation_frame_age_millis() const override + int64_t presentation_frame_age_millis() const { return current_encoding_delay_; } @@ -464,66 +506,69 @@ private: static int interrupt_cb(void* ctx) { CASPAR_ASSERT(ctx); - return reinterpret_cast(ctx)->abort_request_; + return reinterpret_cast(ctx)->abort_request_; } - + std::shared_ptr open_encoder( const AVCodec& codec, std::map& options) - { - auto st = + { + auto st = avformat_new_stream( - oc_.get(), + oc_.get(), &codec); - if (!st) - BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("Could not allocate video-stream.") << boost::errinfo_api_function("av_new_stream")); + if (!st) + CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info("Could not allocate video-stream.") << boost::errinfo_api_function("avformat_new_stream")); auto enc = st->codec; - + CASPAR_VERIFY(enc); - + switch(enc->codec_type) { case AVMEDIA_TYPE_VIDEO: - { - enc->time_base = video_graph_out_->inputs[0]->time_base; - enc->pix_fmt = static_cast(video_graph_out_->inputs[0]->format); - enc->sample_aspect_ratio = st->sample_aspect_ratio = video_graph_out_->inputs[0]->sample_aspect_ratio; - enc->width = video_graph_out_->inputs[0]->w; - enc->height = video_graph_out_->inputs[0]->h; - + { + enc->time_base = video_graph_out_->inputs[0]->time_base; + enc->pix_fmt = static_cast(video_graph_out_->inputs[0]->format); + enc->sample_aspect_ratio = st->sample_aspect_ratio = video_graph_out_->inputs[0]->sample_aspect_ratio; + enc->width = video_graph_out_->inputs[0]->w; + enc->height = video_graph_out_->inputs[0]->h; + enc->bit_rate_tolerance = 400 * 1000000; + break; } case AVMEDIA_TYPE_AUDIO: { - enc->time_base = audio_graph_out_->inputs[0]->time_base; - enc->sample_fmt = static_cast(audio_graph_out_->inputs[0]->format); - enc->sample_rate = audio_graph_out_->inputs[0]->sample_rate; - enc->channel_layout = audio_graph_out_->inputs[0]->channel_layout; - enc->channels = audio_graph_out_->inputs[0]->channels; - + enc->time_base = audio_graph_out_->inputs[0]->time_base; + enc->sample_fmt = static_cast(audio_graph_out_->inputs[0]->format); + enc->sample_rate = audio_graph_out_->inputs[0]->sample_rate; + enc->channel_layout = audio_graph_out_->inputs[0]->channel_layout; + enc->channels = audio_graph_out_->inputs[0]->channels; + break; } } - + + setup_codec_defaults(*enc); + if(oc_->oformat->flags & AVFMT_GLOBALHEADER) enc->flags |= CODEC_FLAG_GLOBAL_HEADER; - + static const std::array char_id_map = {{"v", "a", "d", "s"}}; const auto char_id = char_id_map.at(enc->codec_type); - - const auto codec_opts = + + const auto codec_opts = remove_options( - options, + options, boost::regex("^(" + char_id + "?[^:]+):" + char_id + "$")); - + AVDictionary* av_codec_opts = nullptr; to_dict( - &av_codec_opts, + &av_codec_opts, options); to_dict( @@ -531,19 +576,19 @@ private: codec_opts); options.clear(); - + FF(avcodec_open2( - enc, - &codec, - av_codec_opts ? &av_codec_opts : nullptr)); + enc, + &codec, + av_codec_opts ? &av_codec_opts : nullptr)); if(av_codec_opts) { - auto t = + auto t = av_dict_get( - av_codec_opts, - "", - nullptr, + av_codec_opts, + "", + nullptr, AV_DICT_IGNORE_SUFFIX); while(t) @@ -551,91 +596,39 @@ private: options[t->key + (codec_opts.find(t->key) != codec_opts.end() ? ":" + char_id : "")] = t->value; t = av_dict_get( - av_codec_opts, - "", - t, + av_codec_opts, + "", + t, AV_DICT_IGNORE_SUFFIX); } av_dict_free(&av_codec_opts); } - + if(enc->codec_type == AVMEDIA_TYPE_AUDIO && !(codec.capabilities & CODEC_CAP_VARIABLE_FRAME_SIZE)) { CASPAR_ASSERT(enc->frame_size > 0); - av_buffersink_set_frame_size(audio_graph_out_, + av_buffersink_set_frame_size(audio_graph_out_, enc->frame_size); } - + return std::shared_ptr(st, [this](AVStream* st) { avcodec_close(st->codec); }); } - void configue_audio_bistream_filters( - std::map& options) - { - const auto audio_bitstream_filter_str = - try_remove_arg( - options, - boost::regex("^bsf:a|absf$")); - - const auto audio_bitstream_filter = - audio_bitstream_filter_str - ? av_bitstream_filter_init(audio_bitstream_filter_str->c_str()) - : nullptr; - - CASPAR_VERIFY(!audio_bitstream_filter_str || audio_bitstream_filter); - - if(audio_bitstream_filter) - { - audio_bitstream_filter_.reset( - audio_bitstream_filter, - av_bitstream_filter_close); - } - - if(audio_bitstream_filter_str && !audio_bitstream_filter_) - options["bsf:a"] = *audio_bitstream_filter_str; - } - - void configue_video_bistream_filters( - std::map& options) - { - const auto video_bitstream_filter_str = - try_remove_arg( - options, - boost::regex("^bsf:v|vbsf$")); - - const auto video_bitstream_filter = - video_bitstream_filter_str - ? av_bitstream_filter_init(video_bitstream_filter_str->c_str()) - : nullptr; - - CASPAR_VERIFY(!video_bitstream_filter_str || video_bitstream_filter); - - if(video_bitstream_filter) - { - video_bitstream_filter_.reset( - video_bitstream_filter, - av_bitstream_filter_close); - } - - if(video_bitstream_filter_str && !video_bitstream_filter_) - options["bsf:v"] = *video_bitstream_filter_str; - } - void configure_video_filters( const AVCodec& codec, - const std::string& filtergraph) + std::string filtergraph) { video_graph_.reset( - avfilter_graph_alloc(), + avfilter_graph_alloc(), [](AVFilterGraph* p) { avfilter_graph_free(&p); }); - + video_graph_->nb_threads = boost::thread::hardware_concurrency()/2; video_graph_->thread_type = AVFILTER_THREAD_SLICE; @@ -646,57 +639,59 @@ private: boost::rational( in_video_format_.width, in_video_format_.height); - + const auto vsrc_options = (boost::format("video_size=%1%x%2%:pix_fmt=%3%:time_base=%4%/%5%:pixel_aspect=%6%/%7%:frame_rate=%8%/%9%") % in_video_format_.width % in_video_format_.height - % AV_PIX_FMT_BGRA + % AVPixelFormat::AV_PIX_FMT_BGRA % in_video_format_.duration % in_video_format_.time_scale % sample_aspect_ratio.numerator() % sample_aspect_ratio.denominator() % in_video_format_.time_scale % in_video_format_.duration).str(); - - AVFilterContext* filt_vsrc = nullptr; + + AVFilterContext* filt_vsrc = nullptr; FF(avfilter_graph_create_filter( &filt_vsrc, - avfilter_get_by_name("buffer"), + avfilter_get_by_name("buffer"), "ffmpeg_consumer_buffer", - vsrc_options.c_str(), - nullptr, + vsrc_options.c_str(), + nullptr, video_graph_.get())); - + AVFilterContext* filt_vsink = nullptr; FF(avfilter_graph_create_filter( &filt_vsink, - avfilter_get_by_name("buffersink"), + avfilter_get_by_name("buffersink"), "ffmpeg_consumer_buffersink", - nullptr, - nullptr, + nullptr, + nullptr, video_graph_.get())); - + #pragma warning (push) #pragma warning (disable : 4245) FF(av_opt_set_int_list( - filt_vsink, - "pix_fmts", - codec.pix_fmts, + filt_vsink, + "pix_fmts", + codec.pix_fmts, -1, AV_OPT_SEARCH_CHILDREN)); #pragma warning (pop) - + + adjust_video_filter(codec, in_video_format_, filt_vsink, filtergraph); + configure_filtergraph( - *video_graph_, + *video_graph_, filtergraph, *filt_vsrc, *filt_vsink); video_graph_in_ = filt_vsrc; video_graph_out_ = filt_vsink; - + CASPAR_LOG(info) - << u16(std::string("\n") + << u16(std::string("\n") + avfilter_graph_dump( - video_graph_.get(), + video_graph_.get(), nullptr)); } @@ -705,88 +700,88 @@ private: const std::string& filtergraph) { audio_graph_.reset( - avfilter_graph_alloc(), + avfilter_graph_alloc(), [](AVFilterGraph* p) { avfilter_graph_free(&p); }); - + audio_graph_->nb_threads = boost::thread::hardware_concurrency()/2; audio_graph_->thread_type = AVFILTER_THREAD_SLICE; - + const auto asrc_options = (boost::format("sample_rate=%1%:sample_fmt=%2%:channels=%3%:time_base=%4%/%5%:channel_layout=%6%") % in_video_format_.audio_sample_rate % av_get_sample_fmt_name(AV_SAMPLE_FMT_S32) - % in_video_format_.audio_channels + % in_channel_layout_.num_channels % 1 % in_video_format_.audio_sample_rate % boost::io::group( - std::hex, - std::showbase, - av_get_default_channel_layout(in_video_format_.audio_channels))).str(); + std::hex, + std::showbase, + av_get_default_channel_layout(in_channel_layout_.num_channels))).str(); AVFilterContext* filt_asrc = nullptr; FF(avfilter_graph_create_filter( &filt_asrc, - avfilter_get_by_name("abuffer"), + avfilter_get_by_name("abuffer"), "ffmpeg_consumer_abuffer", - asrc_options.c_str(), - nullptr, + asrc_options.c_str(), + nullptr, audio_graph_.get())); - + AVFilterContext* filt_asink = nullptr; FF(avfilter_graph_create_filter( &filt_asink, - avfilter_get_by_name("abuffersink"), + avfilter_get_by_name("abuffersink"), "ffmpeg_consumer_abuffersink", - nullptr, - nullptr, + nullptr, + nullptr, audio_graph_.get())); - + #pragma warning (push) #pragma warning (disable : 4245) FF(av_opt_set_int( - filt_asink, + filt_asink, "all_channel_counts", - 1, + 1, AV_OPT_SEARCH_CHILDREN)); FF(av_opt_set_int_list( - filt_asink, - "sample_fmts", - codec.sample_fmts, - -1, + filt_asink, + "sample_fmts", + codec.sample_fmts, + -1, AV_OPT_SEARCH_CHILDREN)); FF(av_opt_set_int_list( filt_asink, - "channel_layouts", - codec.channel_layouts, - -1, + "channel_layouts", + codec.channel_layouts, + -1, AV_OPT_SEARCH_CHILDREN)); FF(av_opt_set_int_list( - filt_asink, - "sample_rates" , - codec.supported_samplerates, - -1, + filt_asink, + "sample_rates" , + codec.supported_samplerates, + -1, AV_OPT_SEARCH_CHILDREN)); #pragma warning (pop) - + configure_filtergraph( - *audio_graph_, - filtergraph, - *filt_asrc, + *audio_graph_, + filtergraph, + *filt_asrc, *filt_asink); audio_graph_in_ = filt_asrc; audio_graph_out_ = filt_asink; - CASPAR_LOG(info) - << u16(std::string("\n") + CASPAR_LOG(info) + << u16(std::string("\n") + avfilter_graph_dump( - audio_graph_.get(), + audio_graph_.get(), nullptr)); } @@ -799,119 +794,110 @@ private: AVFilterInOut* outputs = nullptr; AVFilterInOut* inputs = nullptr; - try + if(!filtergraph.empty()) { - if(!filtergraph.empty()) - { - outputs = avfilter_inout_alloc(); - inputs = avfilter_inout_alloc(); + outputs = avfilter_inout_alloc(); + inputs = avfilter_inout_alloc(); + try + { CASPAR_VERIFY(outputs && inputs); - outputs->name = av_strdup("in"); - outputs->filter_ctx = &source_ctx; - outputs->pad_idx = 0; - outputs->next = nullptr; + outputs->name = av_strdup("in"); + outputs->filter_ctx = &source_ctx; + outputs->pad_idx = 0; + outputs->next = nullptr; - inputs->name = av_strdup("out"); - inputs->filter_ctx = &sink_ctx; - inputs->pad_idx = 0; - inputs->next = nullptr; + inputs->name = av_strdup("out"); + inputs->filter_ctx = &sink_ctx; + inputs->pad_idx = 0; + inputs->next = nullptr; + } + catch (...) + { + avfilter_inout_free(&outputs); + avfilter_inout_free(&inputs); + throw; + } - FF(avfilter_graph_parse( - &graph, - filtergraph.c_str(), + FF(avfilter_graph_parse( + &graph, + filtergraph.c_str(), inputs, outputs, nullptr)); - } - else - { - FF(avfilter_link( - &source_ctx, - 0, - &sink_ctx, - 0)); - } - - FF(avfilter_graph_config( - &graph, - nullptr)); } - catch(...) + else { - avfilter_inout_free(&outputs); - avfilter_inout_free(&inputs); - throw; + FF(avfilter_link( + &source_ctx, + 0, + &sink_ctx, + 0)); } + + FF(avfilter_graph_config( + &graph, + nullptr)); } - + void encode_video(core::const_frame frame_ptr, std::shared_ptr token) - { + { if(!video_st_) return; auto enc = video_st_->codec; - - std::shared_ptr src_av_frame; if(frame_ptr != core::const_frame::empty()) { - src_av_frame.reset( - av_frame_alloc(), - [frame_ptr](AVFrame* frame) - { - av_frame_free(&frame); - }); + auto src_av_frame = create_frame(); - avcodec_get_frame_defaults(src_av_frame.get()); - - const auto sample_aspect_ratio = + const auto sample_aspect_ratio = boost::rational( - in_video_format_.square_width, + in_video_format_.square_width, in_video_format_.square_height) / boost::rational( - in_video_format_.width, + in_video_format_.width, in_video_format_.height); - src_av_frame->format = AV_PIX_FMT_BGRA; - src_av_frame->width = in_video_format_.width; - src_av_frame->height = in_video_format_.height; - src_av_frame->sample_aspect_ratio.num = sample_aspect_ratio.numerator(); - src_av_frame->sample_aspect_ratio.den = sample_aspect_ratio.denominator(); - src_av_frame->pts = video_pts_; + src_av_frame->format = AVPixelFormat::AV_PIX_FMT_BGRA; + src_av_frame->width = in_video_format_.width; + src_av_frame->height = in_video_format_.height; + src_av_frame->sample_aspect_ratio.num = sample_aspect_ratio.numerator(); + src_av_frame->sample_aspect_ratio.den = sample_aspect_ratio.denominator(); + src_av_frame->pts = video_pts_; video_pts_ += 1; + subject_ + << core::monitor::message("/frame") % video_pts_ + << core::monitor::message("/path") % path_ + << core::monitor::message("/fps") % in_video_format_.fps; + FF(av_image_fill_arrays( src_av_frame->data, src_av_frame->linesize, frame_ptr.image_data().begin(), - static_cast(src_av_frame->format), - in_video_format_.width, - in_video_format_.height, + static_cast(src_av_frame->format), + in_video_format_.width, + in_video_format_.height, 1)); FF(av_buffersrc_add_frame( - video_graph_in_, + video_graph_in_, src_av_frame.get())); - } + } int ret = 0; while(ret >= 0) { - std::shared_ptr filt_frame( - av_frame_alloc(), - [](AVFrame* p) - { - av_frame_free(&p); - }); + auto filt_frame = create_frame(); ret = av_buffersink_get_frame( - video_graph_out_, + video_graph_out_, filt_frame.get()); - + video_encoder_executor_.begin_invoke([=] { if(ret == AVERROR_EOF) @@ -919,26 +905,25 @@ private: if(enc->codec->capabilities & CODEC_CAP_DELAY) { while(encode_av_frame( - *video_st_, - video_bitstream_filter_.get(), - avcodec_encode_video2, + *video_st_, + avcodec_encode_video2, nullptr, token)) { boost::this_thread::yield(); // TODO: } - } + } } else if(ret != AVERROR(EAGAIN)) { FF_RET(ret, "av_buffersink_get_frame"); - - if (filt_frame->interlaced_frame) + + if (filt_frame->interlaced_frame) { if (enc->codec->id == AV_CODEC_ID_MJPEG) enc->field_order = filt_frame->top_field_first ? AV_FIELD_TT : AV_FIELD_BB; else enc->field_order = filt_frame->top_field_first ? AV_FIELD_TB : AV_FIELD_BT; - } + } else enc->field_order = AV_FIELD_PROGRESSIVE; @@ -946,12 +931,11 @@ private: if (!enc->me_threshold) filt_frame->pict_type = AV_PICTURE_TYPE_NONE; - + encode_av_frame( *video_st_, - video_bitstream_filter_.get(), avcodec_encode_video2, - filt_frame, + filt_frame, token); boost::this_thread::yield(); // TODO: @@ -959,27 +943,20 @@ private: }); } } - + void encode_audio(core::const_frame frame_ptr, std::shared_ptr token) - { + { if(!audio_st_) return; - + auto enc = audio_st_->codec; - - std::shared_ptr src_av_frame; if(frame_ptr != core::const_frame::empty()) { - src_av_frame.reset( - av_frame_alloc(), - [](AVFrame* p) - { - av_frame_free(&p); - }); - - src_av_frame->channels = in_video_format_.audio_channels; - src_av_frame->channel_layout = av_get_default_channel_layout(in_video_format_.audio_channels); + auto src_av_frame = create_frame(); + + src_av_frame->channels = in_channel_layout_.num_channels; + src_av_frame->channel_layout = av_get_default_channel_layout(in_channel_layout_.num_channels); src_av_frame->sample_rate = in_video_format_.audio_sample_rate; src_av_frame->nb_samples = static_cast(frame_ptr.audio_data().size()) / src_av_frame->channels; src_av_frame->format = AV_SAMPLE_FMT_S32; @@ -995,9 +972,9 @@ private: src_av_frame->nb_samples, static_cast(src_av_frame->format), 16)); - + FF(av_buffersrc_add_frame( - audio_graph_in_, + audio_graph_in_, src_av_frame.get())); } @@ -1005,28 +982,22 @@ private: while(ret >= 0) { - std::shared_ptr filt_frame( - av_frame_alloc(), - [](AVFrame* p) - { - av_frame_free(&p); - }); + auto filt_frame = create_frame(); ret = av_buffersink_get_frame( - audio_graph_out_, + audio_graph_out_, filt_frame.get()); - + audio_encoder_executor_.begin_invoke([=] - { + { if(ret == AVERROR_EOF) { if(enc->codec->capabilities & CODEC_CAP_DELAY) { while(encode_av_frame( - *audio_st_, - audio_bitstream_filter_.get(), - avcodec_encode_audio2, - nullptr, + *audio_st_, + avcodec_encode_audio2, + nullptr, token)) { boost::this_thread::yield(); // TODO: @@ -1036,14 +1007,13 @@ private: else if(ret != AVERROR(EAGAIN)) { FF_RET( - ret, + ret, "av_buffersink_get_frame"); encode_av_frame( - *audio_st_, - audio_bitstream_filter_.get(), - avcodec_encode_audio2, - filt_frame, + *audio_st_, + avcodec_encode_audio2, + filt_frame, token); boost::this_thread::yield(); // TODO: @@ -1051,13 +1021,12 @@ private: }); } } - + template bool encode_av_frame( AVStream& st, - AVBitStreamFilterContext* bsfc, - const F& func, - const std::shared_ptr& src_av_frame, + const F& func, + const std::shared_ptr& src_av_frame, std::shared_ptr token) { AVPacket pkt = {}; @@ -1066,102 +1035,46 @@ private: int got_packet = 0; FF(func( - st.codec, - &pkt, - src_av_frame.get(), + st.codec, + &pkt, + src_av_frame.get(), &got_packet)); - + if(!got_packet || pkt.size <= 0) return false; - - pkt.stream_index = st.index; - - if(bsfc) - { - auto new_pkt = pkt; - - auto a = av_bitstream_filter_filter( - bsfc, - st.codec, - nullptr, - &new_pkt.data, - &new_pkt.size, - pkt.data, - pkt.size, - pkt.flags & AV_PKT_FLAG_KEY); - - if(a == 0 && new_pkt.data != pkt.data && new_pkt.destruct) - { - auto t = reinterpret_cast(av_malloc(new_pkt.size + FF_INPUT_BUFFER_PADDING_SIZE)); - - if(t) - { - memcpy( - t, - new_pkt.data, - new_pkt.size); - - memset( - t + new_pkt.size, - 0, - FF_INPUT_BUFFER_PADDING_SIZE); - - new_pkt.data = t; - new_pkt.buf = nullptr; - } - else - a = AVERROR(ENOMEM); - } - av_free_packet(&pkt); - - FF_RET( - a, - "av_bitstream_filter_filter"); - - new_pkt.buf = - av_buffer_create( - new_pkt.data, - new_pkt.size, - av_buffer_default_free, - nullptr, - 0); - - CASPAR_VERIFY(new_pkt.buf); + pkt.stream_index = st.index; - pkt = new_pkt; - } - if (pkt.pts != AV_NOPTS_VALUE) { - pkt.pts = + pkt.pts = av_rescale_q( pkt.pts, - st.codec->time_base, + st.codec->time_base, st.time_base); } if (pkt.dts != AV_NOPTS_VALUE) { - pkt.dts = + pkt.dts = av_rescale_q( - pkt.dts, - st.codec->time_base, + pkt.dts, + st.codec->time_base, st.time_base); } - - pkt.duration = + + pkt.duration = static_cast( av_rescale_q( - pkt.duration, + pkt.duration, st.codec->time_base, st.time_base)); write_packet( std::shared_ptr( - new AVPacket(pkt), + new AVPacket(pkt), [](AVPacket* p) { - av_free_packet(p); + av_free_packet(p); delete p; }), token); @@ -1171,22 +1084,22 @@ private: void write_packet( const std::shared_ptr& pkt_ptr, std::shared_ptr token) - { + { write_executor_.begin_invoke([this, pkt_ptr, token]() mutable { FF(av_interleaved_write_frame( - oc_.get(), + oc_.get(), pkt_ptr.get())); - }); - } - + }); + } + template static boost::optional try_remove_arg( - std::map& options, + std::map& options, const boost::regex& expr) { for(auto it = options.begin(); it != options.end(); ++it) - { + { if(boost::regex_search(it->first, expr)) { auto arg = it->second; @@ -1197,22 +1110,22 @@ private: return boost::optional(); } - + static std::map remove_options( - std::map& options, + std::map& options, const boost::regex& expr) { std::map result; - + auto it = options.begin(); while(it != options.end()) - { + { boost::smatch what; if(boost::regex_search(it->first, what, expr)) { result[ - what.size() > 0 && what[1].matched - ? what[1].str() + what.size() > 0 && what[1].matched + ? what[1].str() : it->first] = it->second; it = options.erase(it); } @@ -1222,14 +1135,14 @@ private: return result; } - + static void to_dict(AVDictionary** dest, const std::map& c) - { + { for (const auto& entry : c) { av_dict_set( - dest, - entry.first.c_str(), + dest, + entry.first.c_str(), entry.second.c_str(), 0); } } @@ -1237,18 +1150,18 @@ private: static std::map to_map(AVDictionary* dict) { std::map result; - - for(auto t = dict + + for(auto t = dict ? av_dict_get( - dict, - "", - nullptr, - AV_DICT_IGNORE_SUFFIX) + dict, + "", + nullptr, + AV_DICT_IGNORE_SUFFIX) : nullptr; - t; + t; t = av_dict_get( - dict, - "", + dict, + "", t, AV_DICT_IGNORE_SUFFIX)) { @@ -1259,36 +1172,178 @@ private: } }; +int crc16(const std::string& str) +{ + boost::crc_16_type result; + + result.process_bytes(str.data(), str.length()); + + return result.checksum(); +} + +struct ffmpeg_consumer_proxy : public core::frame_consumer +{ + const std::string path_; + const std::string options_; + const bool separate_key_; + const bool compatibility_mode_; + int consumer_index_offset_; + + std::unique_ptr consumer_; + std::unique_ptr key_only_consumer_; + +public: + + ffmpeg_consumer_proxy(const std::string& path, const std::string& options, bool separate_key, bool compatibility_mode) + : path_(path) + , options_(options) + , separate_key_(separate_key) + , compatibility_mode_(compatibility_mode) + , consumer_index_offset_(crc16(path)) + { + } + + void initialize(const core::video_format_desc& format_desc, const core::audio_channel_layout& channel_layout, int) override + { + if (consumer_) + CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Cannot reinitialize ffmpeg-consumer.")); + + consumer_.reset(new ffmpeg_consumer(path_, options_)); + consumer_->initialize(format_desc, channel_layout); + + if (separate_key_) + { + boost::filesystem::path fill_file(path_); + auto without_extension = u16(fill_file.parent_path().string() + "/" + fill_file.stem().string()); + auto key_file = without_extension + L"_A" + u16(fill_file.extension().string()); + + key_only_consumer_.reset(new ffmpeg_consumer(u8(key_file), options_)); + key_only_consumer_->initialize(format_desc, channel_layout); + } + } + + int64_t presentation_frame_age_millis() const override + { + return consumer_ ? static_cast(consumer_->presentation_frame_age_millis()) : 0; + } + + std::future send(core::const_frame frame) override + { + bool ready_for_frame = consumer_->ready_for_frame(); + + if (ready_for_frame && separate_key_) + ready_for_frame = ready_for_frame && key_only_consumer_->ready_for_frame(); + + if (ready_for_frame) + { + consumer_->send(frame); + + if (separate_key_) + key_only_consumer_->send(frame.key_only()); + } + else + { + consumer_->mark_dropped(); + + if (separate_key_) + key_only_consumer_->mark_dropped(); + } + + return make_ready_future(true); + } + + std::wstring print() const override + { + return consumer_ ? consumer_->print() : L"[ffmpeg_consumer]"; + } + + std::wstring name() const override + { + return L"ffmpeg"; + } + + boost::property_tree::wptree info() const override + { + boost::property_tree::wptree info; + info.add(L"type", L"ffmpeg"); + info.add(L"path", u16(path_)); + info.add(L"separate_key", separate_key_); + return info; + } + + bool has_synchronization_clock() const override + { + return false; + } + + int buffer_depth() const override + { + return -1; + } + + int index() const override + { + return compatibility_mode_ ? 200 : 100000 + consumer_index_offset_; + } + + core::monitor::subject& monitor_output() override + { + return consumer_->monitor_output(); + } +}; + +} + void describe_streaming_consumer(core::help_sink& sink, const core::help_repository& repo) { - sink.short_description(L"For streaming the contents of a channel using FFMpeg."); - sink.syntax(L"STREAM [url:string] {-[ffmpeg_param1:string] [value1:string] {-[ffmpeg_param2:string] [value2:string] {...}}}"); - sink.para()->text(L"For streaming the contents of a channel using FFMpeg"); + sink.short_description(L"For streaming/recording the contents of a channel using FFmpeg."); + sink.syntax(L"FILE,STREAM [filename:string],[url:string] {-[ffmpeg_param1:string] [value1:string] {-[ffmpeg_param2:string] [value2:string] {...}}}"); + sink.para()->text(L"For recording or streaming the contents of a channel using FFmpeg"); sink.definitions() - ->item(L"url", L"The stream URL to create/stream to.") - ->item(L"ffmpeg_paramX", L"A parameter supported by FFMpeg. For example vcodec or acodec etc."); + ->item(L"filename", L"The filename under the media folder including the extension (decides which kind of container format that will be used).") + ->item(L"url", L"If the filename is given in the form of an URL a network stream will be created instead of a file on disk.") + ->item(L"ffmpeg_paramX", L"A parameter supported by FFmpeg. For example vcodec or acodec etc."); sink.para()->text(L"Examples:"); - sink.example(L">> ADD 1 STREAM udp://:9250 -format mpegts -vcodec libx264 -crf 25 -tune zerolatency -preset ultrafast"); + sink.example(L">> ADD 1 FILE output.mov -vcodec dnxhd"); + sink.example(L">> ADD 1 FILE output.mov -vcodec prores"); + sink.example(L">> ADD 1 FILE output.mov -vcodec dvvideo"); + sink.example(L">> ADD 1 FILE output.mov -vcodec libx264 -preset ultrafast -tune fastdecode -crf 25"); + sink.example(L">> ADD 1 FILE output.mov -vcodec dnxhd SEPARATE_KEY", L"for creating output.mov with fill and output_A.mov with key/alpha"); + sink.example(L">> ADD 1 STREAM udp://:9250 -format mpegts -vcodec libx264 -crf 25 -tune zerolatency -preset ultrafast", + L"for streaming over UDP instead of creating a local file."); } spl::shared_ptr create_streaming_consumer( - const std::vector& params, core::interaction_sink*) -{ - if (params.size() < 1 || params.at(0) != L"STREAM") + const std::vector& params, core::interaction_sink*, std::vector> channels) +{ + if (params.size() < 1 || (!boost::iequals(params.at(0), L"STREAM") && !boost::iequals(params.at(0), L"FILE"))) return core::frame_consumer::empty(); - auto path = u8(params.at(1)); - auto args = u8(boost::join(params, L" ")); + auto params2 = params; + auto separate_key_it = std::find_if(params2.begin(), params2.end(), param_comparer(L"SEPARATE_KEY")); + bool separate_key = false; - return spl::make_shared(path, args); + if (separate_key_it != params2.end()) + { + separate_key = true; + params2.erase(separate_key_it); + } + + auto compatibility_mode = boost::iequals(params.at(0), L"FILE"); + auto path = u8(params2.size() > 1 ? params2.at(1) : L""); + auto args = u8(boost::join(params2, L" ")); + + return spl::make_shared(path, args, separate_key, compatibility_mode); } spl::shared_ptr create_preconfigured_streaming_consumer( - const boost::property_tree::wptree& ptree, core::interaction_sink*) -{ - return spl::make_shared( - u8(ptree.get(L"path")), - u8(ptree.get(L"args", L""))); + const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector> channels) +{ + return spl::make_shared( + u8(ptree_get(ptree, L"path")), + u8(ptree.get(L"args", L"")), + ptree.get(L"separate-key", false), + false); } }}