From: Helge Norberg Date: Tue, 1 Nov 2016 20:17:07 +0000 (+0100) Subject: [streaming_consumer] Started the process of retiring ffmpeg_consumer so that streamin... X-Git-Tag: 2.1.0_Beta1~22 X-Git-Url: https://git.sesse.net/?p=casparcg;a=commitdiff_plain;h=181d938c6b2c4fd014bc420960e51d38c399b1e4 [streaming_consumer] Started the process of retiring ffmpeg_consumer so that streaming_consumer can finally take its place as both FILE and STREAM consumer. --- diff --git a/modules/ffmpeg/consumer/streaming_consumer.cpp b/modules/ffmpeg/consumer/streaming_consumer.cpp index 82fac99ac..d390cb457 100644 --- a/modules/ffmpeg/consumer/streaming_consumer.cpp +++ b/modules/ffmpeg/consumer/streaming_consumer.cpp @@ -10,9 +10,12 @@ #include #include #include +#include #include #include #include +#include +#include #include #include @@ -62,29 +65,19 @@ extern "C" #pragma warning(pop) -namespace caspar { namespace ffmpeg { +namespace caspar { namespace ffmpeg { namespace { -int crc16(const std::string& str) -{ - boost::crc_16_type result; - - result.process_bytes(str.data(), str.length()); - - return result.checksum(); -} - -class streaming_consumer final : public core::frame_consumer +class ffmpeg_consumer { public: // Static Members private: + const spl::shared_ptr graph_; core::monitor::subject subject_; boost::filesystem::path path_; - int consumer_index_offset_; std::map options_; - bool compatibility_mode_; core::video_format_desc in_video_format_; core::audio_channel_layout in_channel_layout_ = core::audio_channel_layout::invalid(); @@ -106,30 +99,23 @@ private: AVFilterContext* video_graph_out_; std::shared_ptr video_graph_; - executor executor_; - executor video_encoder_executor_; executor audio_encoder_executor_; - tbb::atomic tokens_; - boost::mutex tokens_mutex_; - boost::condition_variable tokens_cond_; + semaphore tokens_ { 0 }; + tbb::atomic current_encoding_delay_; executor write_executor_; public: - streaming_consumer( + ffmpeg_consumer( std::string path, - std::string options, - bool compatibility_mode) + std::string options) : path_(path) - , consumer_index_offset_(crc16(path)) - , compatibility_mode_(compatibility_mode) , video_pts_(0) , audio_pts_(0) - , executor_(print()) , audio_encoder_executor_(print() + L" audio_encoder") , video_encoder_executor_(print() + L" video_encoder") , write_executor_(print() + L" io") @@ -151,15 +137,15 @@ public: if (options_.find("threads") == options_.end()) options_["threads"] = "auto"; - tokens_ = + tokens_.release( std::max( 1, try_remove_arg( options_, - boost::regex("tokens")).get_value_or(2)); + boost::regex("tokens")).get_value_or(2))); } - ~streaming_consumer() + ~ffmpeg_consumer() { if(oc_) { @@ -192,18 +178,12 @@ public: void initialize( const core::video_format_desc& format_desc, - const core::audio_channel_layout& channel_layout, - int channel_index) override + const core::audio_channel_layout& channel_layout) { try { static boost::regex prot_exp("^.+:.*" ); - const auto overwrite = - try_remove_arg( - options_, - boost::regex("y")) != boost::none; - if(!boost::regex_match( path_.string(), prot_exp)) @@ -217,14 +197,14 @@ public: } if(boost::filesystem::exists(path_)) - { - if(!overwrite && !compatibility_mode_) - BOOST_THROW_EXCEPTION(invalid_argument() << msg_info("File exists")); - boost::filesystem::remove(path_); - } } + graph_->set_color("frame-time", diagnostics::color(0.1f, 1.0f, 0.1f)); + graph_->set_color("dropped-frame", diagnostics::color(0.3f, 0.6f, 0.3f)); + graph_->set_text(print()); + diagnostics::register_graph(graph_); + const auto oformat_name = try_remove_arg( options_, @@ -244,7 +224,7 @@ public: CASPAR_VERIFY(oc_->oformat); - oc_->interrupt_callback.callback = streaming_consumer::interrupt_cb; + oc_->interrupt_callback.callback = ffmpeg_consumer::interrupt_cb; oc_->interrupt_callback.opaque = this; CASPAR_VERIFY(format_desc.format != core::video_format::invalid); @@ -381,84 +361,58 @@ public: } } - core::monitor::subject& monitor_output() override + core::monitor::subject& monitor_output() { return subject_; } - std::wstring name() const override - { - return L"streaming"; - } - - std::future send(core::const_frame frame) override + void send(core::const_frame frame) { CASPAR_VERIFY(in_video_format_.format != core::video_format::invalid); - --tokens_; + auto frame_timer = spl::make_shared(); + std::shared_ptr token( nullptr, - [this, frame](void*) + [this, frame, frame_timer](void*) { - ++tokens_; - tokens_cond_.notify_one(); + tokens_.release(); current_encoding_delay_ = frame.get_age_millis(); + graph_->set_value("frame-time", frame_timer->elapsed() * in_video_format_.fps * 0.5); }); + tokens_.acquire(); - return executor_.begin_invoke([=]() -> bool + video_encoder_executor_.begin_invoke([=]() mutable { - boost::unique_lock tokens_lock(tokens_mutex_); - - while(tokens_ < 0) - tokens_cond_.wait(tokens_lock); - - video_encoder_executor_.begin_invoke([=]() mutable - { - encode_video( - frame, - token); - }); - - audio_encoder_executor_.begin_invoke([=]() mutable - { - encode_audio( - frame, - token); - }); - - return true; + encode_video( + frame, + token); }); - } - - std::wstring print() const override - { - return L"streaming_consumer[" + u16(path_.string()) + L"]"; - } - virtual boost::property_tree::wptree info() const override - { - boost::property_tree::wptree info; - info.add(L"type", L"stream"); - info.add(L"path", path_.wstring()); - return info; + audio_encoder_executor_.begin_invoke([=]() mutable + { + encode_audio( + frame, + token); + }); } - bool has_synchronization_clock() const override + bool ready_for_frame() const { - return false; + return tokens_.permits() > 0; } - int buffer_depth() const override + void mark_dropped() { - return -1; + graph_->set_tag(diagnostics::tag_severity::WARNING, "dropped-frame"); } - int index() const override + std::wstring print() const { - return compatibility_mode_ ? 200 : 100000 + consumer_index_offset_; + return L"ffmpeg_consumer[" + u16(path_.string()) + L"]"; } - int64_t presentation_frame_age_millis() const override + int64_t presentation_frame_age_millis() const { return current_encoding_delay_; } @@ -468,7 +422,7 @@ private: static int interrupt_cb(void* ctx) { CASPAR_ASSERT(ctx); - return reinterpret_cast(ctx)->abort_request_; + return reinterpret_cast(ctx)->abort_request_; } std::shared_ptr open_encoder( @@ -493,8 +447,8 @@ private: case AVMEDIA_TYPE_VIDEO: { enc->time_base = video_graph_out_->inputs[0]->time_base; - enc->pix_fmt = static_cast(video_graph_out_->inputs[0]->format); - enc->sample_aspect_ratio = st->sample_aspect_ratio = video_graph_out_->inputs[0]->sample_aspect_ratio; + enc->pix_fmt = static_cast(video_graph_out_->inputs[0]->format); + enc->sample_aspect_ratio = st->sample_aspect_ratio = video_graph_out_->inputs[0]->sample_aspect_ratio; enc->width = video_graph_out_->inputs[0]->w; enc->height = video_graph_out_->inputs[0]->h; enc->bit_rate_tolerance = 400 * 1000000; @@ -505,7 +459,7 @@ private: { enc->time_base = audio_graph_out_->inputs[0]->time_base; enc->sample_fmt = static_cast(audio_graph_out_->inputs[0]->format); - enc->sample_rate = audio_graph_out_->inputs[0]->sample_rate; + enc->sample_rate = audio_graph_out_->inputs[0]->sample_rate; enc->channel_layout = audio_graph_out_->inputs[0]->channel_layout; enc->channels = audio_graph_out_->inputs[0]->channels; @@ -1125,16 +1079,145 @@ private: } }; +int crc16(const std::string& str) +{ + boost::crc_16_type result; + + result.process_bytes(str.data(), str.length()); + + return result.checksum(); +} + +struct ffmpeg_consumer_proxy : public core::frame_consumer +{ + const std::string path_; + const std::string options_; + const bool separate_key_; + const bool compatibility_mode_; + int consumer_index_offset_; + + std::unique_ptr consumer_; + std::unique_ptr key_only_consumer_; + +public: + + ffmpeg_consumer_proxy(const std::string& path, const std::string& options, bool separate_key, bool compatibility_mode) + : path_(path) + , options_(options) + , separate_key_(separate_key) + , compatibility_mode_(compatibility_mode) + , consumer_index_offset_(crc16(path)) + { + } + + void initialize(const core::video_format_desc& format_desc, const core::audio_channel_layout& channel_layout, int) override + { + if (consumer_) + CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Cannot reinitialize ffmpeg-consumer.")); + + consumer_.reset(new ffmpeg_consumer(path_, options_)); + consumer_->initialize(format_desc, channel_layout); + + if (separate_key_) + { + boost::filesystem::path fill_file(path_); + auto without_extension = u16(fill_file.parent_path().string() + "/" + fill_file.stem().string()); + auto key_file = without_extension + L"_A" + u16(fill_file.extension().string()); + + key_only_consumer_.reset(new ffmpeg_consumer(u8(key_file), options_)); + key_only_consumer_->initialize(format_desc, channel_layout); + } + } + + int64_t presentation_frame_age_millis() const override + { + return consumer_ ? static_cast(consumer_->presentation_frame_age_millis()) : 0; + } + + std::future send(core::const_frame frame) override + { + bool ready_for_frame = consumer_->ready_for_frame(); + + if (ready_for_frame && separate_key_) + ready_for_frame = ready_for_frame && key_only_consumer_->ready_for_frame(); + + if (ready_for_frame) + { + consumer_->send(frame); + + if (separate_key_) + key_only_consumer_->send(frame.key_only()); + } + else + { + consumer_->mark_dropped(); + + if (separate_key_) + key_only_consumer_->mark_dropped(); + } + + return make_ready_future(true); + } + + std::wstring print() const override + { + return consumer_ ? consumer_->print() : L"[ffmpeg_consumer]"; + } + + std::wstring name() const override + { + return L"ffmpeg"; + } + + boost::property_tree::wptree info() const override + { + boost::property_tree::wptree info; + info.add(L"type", L"ffmpeg"); + info.add(L"path", u16(path_)); + info.add(L"separate_key", separate_key_); + return info; + } + + bool has_synchronization_clock() const override + { + return false; + } + + int buffer_depth() const override + { + return -1; + } + + int index() const override + { + return compatibility_mode_ ? 200 : 100000 + consumer_index_offset_; + } + + core::monitor::subject& monitor_output() + { + return consumer_->monitor_output(); + } +}; + +} + void describe_streaming_consumer(core::help_sink& sink, const core::help_repository& repo) { - sink.short_description(L"For streaming the contents of a channel using FFmpeg."); - sink.syntax(L"STREAM [url:string] {-[ffmpeg_param1:string] [value1:string] {-[ffmpeg_param2:string] [value2:string] {...}}}"); - sink.para()->text(L"For streaming the contents of a channel using FFmpeg"); + sink.short_description(L"For streaming/recording the contents of a channel using FFmpeg."); + sink.syntax(L"FILE,STREAM [filename:string],[url:string] {-[ffmpeg_param1:string] [value1:string] {-[ffmpeg_param2:string] [value2:string] {...}}}"); + sink.para()->text(L"For recording or streaming the contents of a channel using FFmpeg"); sink.definitions() - ->item(L"url", L"The stream URL to create/stream to.") + ->item(L"filename", L"The filename under the media folder including the extension (decides which kind of container format that will be used).") + ->item(L"url", L"If the filename is given in the form of an URL a network stream will be created instead of a file on disk.") ->item(L"ffmpeg_paramX", L"A parameter supported by FFmpeg. For example vcodec or acodec etc."); sink.para()->text(L"Examples:"); - sink.example(L">> ADD 1 STREAM udp://:9250 -format mpegts -vcodec libx264 -crf 25 -tune zerolatency -preset ultrafast"); + sink.example(L">> ADD 1 FILE output.mov -vcodec dnxhd"); + sink.example(L">> ADD 1 FILE output.mov -vcodec prores"); + sink.example(L">> ADD 1 FILE output.mov -vcodec dvvideo"); + sink.example(L">> ADD 1 FILE output.mov -vcodec libx264 -preset ultrafast -tune fastdecode -crf 25"); + sink.example(L">> ADD 1 FILE output.mov -vcodec dnxhd SEPARATE_KEY", L"for creating output.mov with fill and output_A.mov with key/alpha"); + sink.example(L">> ADD 1 STREAM udp://:9250 -format mpegts -vcodec libx264 -crf 25 -tune zerolatency -preset ultrafast", + L"for streaming over UDP instead of creating a local file."); } spl::shared_ptr create_streaming_consumer( @@ -1143,19 +1226,30 @@ spl::shared_ptr create_streaming_consumer( if (params.size() < 1 || (!boost::iequals(params.at(0), L"STREAM") && !boost::iequals(params.at(0), L"FILE"))) return core::frame_consumer::empty(); - auto compatibility_mode = boost::iequals(params.at(0), L"FILE"); - auto path = u8(params.size() > 1 ? params.at(1) : L""); - auto args = u8(boost::join(params, L" ")); + auto params2 = params; + auto separate_key_it = std::find_if(params2.begin(), params2.end(), param_comparer(L"SEPARATE_KEY")); + bool separate_key = false; + + if (separate_key_it != params2.end()) + { + separate_key = true; + params2.erase(separate_key_it); + } + + auto compatibility_mode = boost::iequals(params.at(0), L"FILE"); + auto path = u8(params2.size() > 1 ? params2.at(1) : L""); + auto args = u8(boost::join(params2, L" ")); - return spl::make_shared(path, args, compatibility_mode); + return spl::make_shared(path, args, separate_key, compatibility_mode); } spl::shared_ptr create_preconfigured_streaming_consumer( const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector> channels) { - return spl::make_shared( + return spl::make_shared( u8(ptree_get(ptree, L"path")), u8(ptree.get(L"args", L"")), + ptree.get(L"separate-key", false), false); } diff --git a/shell/casparcg.config b/shell/casparcg.config index 9357721fe..2d9826ffd 100644 --- a/shell/casparcg.config +++ b/shell/casparcg.config @@ -115,6 +115,7 @@ udp://localhost:9250 -format mpegts -vcodec libx264 -crf 25 -tune zerolatency -preset ultrafast + false [true|false] 1