#include <common/assert.h>
#include <common/utf.h>
#include <common/future.h>
+#include <common/diagnostics/graph.h>
#include <common/env.h>
#include <common/scope_exit.h>
#include <common/ptree.h>
+#include <common/param.h>
+#include <common/semaphore.h>
#include <core/consumer/frame_consumer.h>
#include <core/frame/frame.h>
#pragma warning(pop)
-namespace caspar { namespace ffmpeg {
+namespace caspar { namespace ffmpeg { namespace {
-int crc16(const std::string& str)
+bool is_pcm_s24le_not_supported(const AVFormatContext& container)
{
- boost::crc_16_type result;
+ auto name = std::string(container.oformat->name);
- result.process_bytes(str.data(), str.length());
+ if (name == "mp4" || name == "dv")
+ return true;
- return result.checksum();
+ return false;
}
-class streaming_consumer final : public core::frame_consumer
+class ffmpeg_consumer
{
public:
// Static Members
private:
+ const spl::shared_ptr<diagnostics::graph> graph_;
core::monitor::subject subject_;
boost::filesystem::path path_;
- int consumer_index_offset_;
std::map<std::string, std::string> options_;
- bool compatibility_mode_;
core::video_format_desc in_video_format_;
core::audio_channel_layout in_channel_layout_ = core::audio_channel_layout::invalid();
AVFilterContext* audio_graph_in_;
AVFilterContext* audio_graph_out_;
std::shared_ptr<AVFilterGraph> audio_graph_;
- std::shared_ptr<AVBitStreamFilterContext> audio_bitstream_filter_;
AVFilterContext* video_graph_in_;
AVFilterContext* video_graph_out_;
std::shared_ptr<AVFilterGraph> video_graph_;
- std::shared_ptr<AVBitStreamFilterContext> video_bitstream_filter_;
-
- executor executor_;
executor video_encoder_executor_;
executor audio_encoder_executor_;
- tbb::atomic<int> tokens_;
- boost::mutex tokens_mutex_;
- boost::condition_variable tokens_cond_;
+ semaphore tokens_ { 0 };
+
tbb::atomic<int64_t> current_encoding_delay_;
executor write_executor_;
public:
- streaming_consumer(
+ ffmpeg_consumer(
std::string path,
- std::string options,
- bool compatibility_mode)
+ std::string options)
: path_(path)
- , consumer_index_offset_(crc16(path))
- , compatibility_mode_(compatibility_mode)
, video_pts_(0)
, audio_pts_(0)
- , executor_(print())
, audio_encoder_executor_(print() + L" audio_encoder")
, video_encoder_executor_(print() + L" video_encoder")
, write_executor_(print() + L" io")
if (options_.find("threads") == options_.end())
options_["threads"] = "auto";
- tokens_ =
+ tokens_.release(
std::max(
1,
try_remove_arg<int>(
options_,
- boost::regex("tokens")).get_value_or(2));
+ boost::regex("tokens")).get_value_or(2)));
}
- ~streaming_consumer()
+ ~ffmpeg_consumer()
{
if(oc_)
{
void initialize(
const core::video_format_desc& format_desc,
- const core::audio_channel_layout& channel_layout,
- int channel_index) override
+ const core::audio_channel_layout& channel_layout)
{
try
{
static boost::regex prot_exp("^.+:.*" );
- const auto overwrite =
- try_remove_arg<std::string>(
- options_,
- boost::regex("y")) != boost::none;
-
if(!boost::regex_match(
path_.string(),
prot_exp))
}
if(boost::filesystem::exists(path_))
- {
- if(!overwrite && !compatibility_mode_)
- BOOST_THROW_EXCEPTION(invalid_argument() << msg_info("File exists"));
-
boost::filesystem::remove(path_);
- }
}
+ graph_->set_color("frame-time", diagnostics::color(0.1f, 1.0f, 0.1f));
+ graph_->set_color("dropped-frame", diagnostics::color(0.3f, 0.6f, 0.3f));
+ graph_->set_text(print());
+ diagnostics::register_graph(graph_);
+
const auto oformat_name =
try_remove_arg<std::string>(
options_,
CASPAR_VERIFY(oc_->oformat);
- oc_->interrupt_callback.callback = streaming_consumer::interrupt_cb;
+ oc_->interrupt_callback.callback = ffmpeg_consumer::interrupt_cb;
oc_->interrupt_callback.opaque = this;
CASPAR_VERIFY(format_desc.format != core::video_format::invalid);
const auto audio_codec =
audio_codec_name
? avcodec_find_encoder_by_name(audio_codec_name->c_str())
- : avcodec_find_encoder(oc_->oformat->audio_codec);
+ : (is_pcm_s24le_not_supported(*oc_)
+ ? avcodec_find_encoder(oc_->oformat->audio_codec)
+ : avcodec_find_encoder_by_name("pcm_s24le"));
if (!video_codec)
CASPAR_THROW_EXCEPTION(user_error() << msg_info(
boost::regex("af|f:a|filter:a")).get_value_or(""));
}
- // Bistream Filters
- {
- configue_audio_bistream_filters(options_);
- configue_video_bistream_filters(options_);
- }
-
// Encoders
{
}
}
- core::monitor::subject& monitor_output() override
+ core::monitor::subject& monitor_output()
{
return subject_;
}
- std::wstring name() const override
- {
- return L"streaming";
- }
-
- std::future<bool> send(core::const_frame frame) override
+ void send(core::const_frame frame)
{
CASPAR_VERIFY(in_video_format_.format != core::video_format::invalid);
- --tokens_;
+ auto frame_timer = spl::make_shared<caspar::timer>();
+
std::shared_ptr<void> token(
nullptr,
- [this, frame](void*)
+ [this, frame, frame_timer](void*)
{
- ++tokens_;
- tokens_cond_.notify_one();
+ tokens_.release();
current_encoding_delay_ = frame.get_age_millis();
+ graph_->set_value("frame-time", frame_timer->elapsed() * in_video_format_.fps * 0.5);
});
+ tokens_.acquire();
- return executor_.begin_invoke([=]() -> bool
+ video_encoder_executor_.begin_invoke([=]() mutable
{
- boost::unique_lock<boost::mutex> tokens_lock(tokens_mutex_);
-
- while(tokens_ < 0)
- tokens_cond_.wait(tokens_lock);
-
- video_encoder_executor_.begin_invoke([=]() mutable
- {
- encode_video(
- frame,
- token);
- });
-
- audio_encoder_executor_.begin_invoke([=]() mutable
- {
- encode_audio(
- frame,
- token);
- });
-
- return true;
+ encode_video(
+ frame,
+ token);
});
- }
-
- std::wstring print() const override
- {
- return L"streaming_consumer[" + u16(path_.string()) + L"]";
- }
- virtual boost::property_tree::wptree info() const override
- {
- boost::property_tree::wptree info;
- info.add(L"type", L"stream");
- info.add(L"path", path_.wstring());
- return info;
+ audio_encoder_executor_.begin_invoke([=]() mutable
+ {
+ encode_audio(
+ frame,
+ token);
+ });
}
- bool has_synchronization_clock() const override
+ bool ready_for_frame() const
{
- return false;
+ return tokens_.permits() > 0;
}
- int buffer_depth() const override
+ void mark_dropped()
{
- return -1;
+ graph_->set_tag(diagnostics::tag_severity::WARNING, "dropped-frame");
}
- int index() const override
+ std::wstring print() const
{
- return compatibility_mode_ ? 200 : 100000 + consumer_index_offset_;
+ return L"ffmpeg_consumer[" + u16(path_.string()) + L"]";
}
- int64_t presentation_frame_age_millis() const override
+ int64_t presentation_frame_age_millis() const
{
return current_encoding_delay_;
}
static int interrupt_cb(void* ctx)
{
CASPAR_ASSERT(ctx);
- return reinterpret_cast<streaming_consumer*>(ctx)->abort_request_;
+ return reinterpret_cast<ffmpeg_consumer*>(ctx)->abort_request_;
}
std::shared_ptr<AVStream> open_encoder(
&codec);
if (!st)
- CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info("Could not allocate video-stream.") << boost::errinfo_api_function("av_new_stream"));
+ CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info("Could not allocate video-stream.") << boost::errinfo_api_function("avformat_new_stream"));
auto enc = st->codec;
case AVMEDIA_TYPE_VIDEO:
{
enc->time_base = video_graph_out_->inputs[0]->time_base;
- enc->pix_fmt = static_cast<AVPixelFormat>(video_graph_out_->inputs[0]->format);
- enc->sample_aspect_ratio = st->sample_aspect_ratio = video_graph_out_->inputs[0]->sample_aspect_ratio;
+ enc->pix_fmt = static_cast<AVPixelFormat>(video_graph_out_->inputs[0]->format);
+ enc->sample_aspect_ratio = st->sample_aspect_ratio = video_graph_out_->inputs[0]->sample_aspect_ratio;
enc->width = video_graph_out_->inputs[0]->w;
enc->height = video_graph_out_->inputs[0]->h;
enc->bit_rate_tolerance = 400 * 1000000;
{
enc->time_base = audio_graph_out_->inputs[0]->time_base;
enc->sample_fmt = static_cast<AVSampleFormat>(audio_graph_out_->inputs[0]->format);
- enc->sample_rate = audio_graph_out_->inputs[0]->sample_rate;
+ enc->sample_rate = audio_graph_out_->inputs[0]->sample_rate;
enc->channel_layout = audio_graph_out_->inputs[0]->channel_layout;
enc->channels = audio_graph_out_->inputs[0]->channels;
});
}
- void configue_audio_bistream_filters(
- std::map<std::string, std::string>& options)
- {
- const auto audio_bitstream_filter_str =
- try_remove_arg<std::string>(
- options,
- boost::regex("^bsf:a|absf$"));
-
- const auto audio_bitstream_filter =
- audio_bitstream_filter_str
- ? av_bitstream_filter_init(audio_bitstream_filter_str->c_str())
- : nullptr;
-
- CASPAR_VERIFY(!audio_bitstream_filter_str || audio_bitstream_filter);
-
- if(audio_bitstream_filter)
- {
- audio_bitstream_filter_.reset(
- audio_bitstream_filter,
- av_bitstream_filter_close);
- }
-
- if(audio_bitstream_filter_str && !audio_bitstream_filter_)
- options["bsf:a"] = *audio_bitstream_filter_str;
- }
-
- void configue_video_bistream_filters(
- std::map<std::string, std::string>& options)
- {
- const auto video_bitstream_filter_str =
- try_remove_arg<std::string>(
- options,
- boost::regex("^bsf:v|vbsf$"));
-
- const auto video_bitstream_filter =
- video_bitstream_filter_str
- ? av_bitstream_filter_init(video_bitstream_filter_str->c_str())
- : nullptr;
-
- CASPAR_VERIFY(!video_bitstream_filter_str || video_bitstream_filter);
-
- if(video_bitstream_filter)
- {
- video_bitstream_filter_.reset(
- video_bitstream_filter,
- av_bitstream_filter_close);
- }
-
- if(video_bitstream_filter_str && !video_bitstream_filter_)
- options["bsf:v"] = *video_bitstream_filter_str;
- }
-
void configure_video_filters(
const AVCodec& codec,
const std::string& filtergraph)
in_video_format_.width,
in_video_format_.height);
- src_av_frame->format = AV_PIX_FMT_BGRA;
- src_av_frame->width = in_video_format_.width;
- src_av_frame->height = in_video_format_.height;
- src_av_frame->sample_aspect_ratio.num = sample_aspect_ratio.numerator();
- src_av_frame->sample_aspect_ratio.den = sample_aspect_ratio.denominator();
- src_av_frame->pts = video_pts_;
+ src_av_frame->format = AV_PIX_FMT_BGRA;
+ src_av_frame->width = in_video_format_.width;
+ src_av_frame->height = in_video_format_.height;
+ src_av_frame->sample_aspect_ratio.num = sample_aspect_ratio.numerator();
+ src_av_frame->sample_aspect_ratio.den = sample_aspect_ratio.denominator();
+ src_av_frame->pts = video_pts_;
video_pts_ += 1;
{
while(encode_av_frame(
*video_st_,
- video_bitstream_filter_.get(),
avcodec_encode_video2,
nullptr, token))
{
encode_av_frame(
*video_st_,
- video_bitstream_filter_.get(),
avcodec_encode_video2,
filt_frame,
token);
{
while(encode_av_frame(
*audio_st_,
- audio_bitstream_filter_.get(),
avcodec_encode_audio2,
nullptr,
token))
encode_av_frame(
*audio_st_,
- audio_bitstream_filter_.get(),
avcodec_encode_audio2,
filt_frame,
token);
template<typename F>
bool encode_av_frame(
AVStream& st,
- AVBitStreamFilterContext* bsfc,
const F& func,
const std::shared_ptr<AVFrame>& src_av_frame,
std::shared_ptr<void> token)
pkt.stream_index = st.index;
- if(bsfc)
- {
- auto new_pkt = pkt;
-
- auto a = av_bitstream_filter_filter(
- bsfc,
- st.codec,
- nullptr,
- &new_pkt.data,
- &new_pkt.size,
- pkt.data,
- pkt.size,
- pkt.flags & AV_PKT_FLAG_KEY);
-
- if(a == 0 && new_pkt.data != pkt.data && new_pkt.destruct)
- {
- auto t = reinterpret_cast<std::uint8_t*>(av_malloc(new_pkt.size + FF_INPUT_BUFFER_PADDING_SIZE));
-
- if(t)
- {
- memcpy(
- t,
- new_pkt.data,
- new_pkt.size);
-
- memset(
- t + new_pkt.size,
- 0,
- FF_INPUT_BUFFER_PADDING_SIZE);
-
- new_pkt.data = t;
- new_pkt.buf = nullptr;
- }
- else
- a = AVERROR(ENOMEM);
- }
-
- av_free_packet(&pkt);
-
- FF_RET(
- a,
- "av_bitstream_filter_filter");
-
- new_pkt.buf =
- av_buffer_create(
- new_pkt.data,
- new_pkt.size,
- av_buffer_default_free,
- nullptr,
- 0);
-
- CASPAR_VERIFY(new_pkt.buf);
-
- pkt = new_pkt;
- }
-
if (pkt.pts != AV_NOPTS_VALUE)
{
pkt.pts =
}
};
+int crc16(const std::string& str)
+{
+ boost::crc_16_type result;
+
+ result.process_bytes(str.data(), str.length());
+
+ return result.checksum();
+}
+
+struct ffmpeg_consumer_proxy : public core::frame_consumer
+{
+ const std::string path_;
+ const std::string options_;
+ const bool separate_key_;
+ const bool compatibility_mode_;
+ int consumer_index_offset_;
+
+ std::unique_ptr<ffmpeg_consumer> consumer_;
+ std::unique_ptr<ffmpeg_consumer> key_only_consumer_;
+
+public:
+
+ ffmpeg_consumer_proxy(const std::string& path, const std::string& options, bool separate_key, bool compatibility_mode)
+ : path_(path)
+ , options_(options)
+ , separate_key_(separate_key)
+ , compatibility_mode_(compatibility_mode)
+ , consumer_index_offset_(crc16(path))
+ {
+ }
+
+ void initialize(const core::video_format_desc& format_desc, const core::audio_channel_layout& channel_layout, int) override
+ {
+ if (consumer_)
+ CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Cannot reinitialize ffmpeg-consumer."));
+
+ consumer_.reset(new ffmpeg_consumer(path_, options_));
+ consumer_->initialize(format_desc, channel_layout);
+
+ if (separate_key_)
+ {
+ boost::filesystem::path fill_file(path_);
+ auto without_extension = u16(fill_file.parent_path().string() + "/" + fill_file.stem().string());
+ auto key_file = without_extension + L"_A" + u16(fill_file.extension().string());
+
+ key_only_consumer_.reset(new ffmpeg_consumer(u8(key_file), options_));
+ key_only_consumer_->initialize(format_desc, channel_layout);
+ }
+ }
+
+ int64_t presentation_frame_age_millis() const override
+ {
+ return consumer_ ? static_cast<int64_t>(consumer_->presentation_frame_age_millis()) : 0;
+ }
+
+ std::future<bool> send(core::const_frame frame) override
+ {
+ bool ready_for_frame = consumer_->ready_for_frame();
+
+ if (ready_for_frame && separate_key_)
+ ready_for_frame = ready_for_frame && key_only_consumer_->ready_for_frame();
+
+ if (ready_for_frame)
+ {
+ consumer_->send(frame);
+
+ if (separate_key_)
+ key_only_consumer_->send(frame.key_only());
+ }
+ else
+ {
+ consumer_->mark_dropped();
+
+ if (separate_key_)
+ key_only_consumer_->mark_dropped();
+ }
+
+ return make_ready_future(true);
+ }
+
+ std::wstring print() const override
+ {
+ return consumer_ ? consumer_->print() : L"[ffmpeg_consumer]";
+ }
+
+ std::wstring name() const override
+ {
+ return L"ffmpeg";
+ }
+
+ boost::property_tree::wptree info() const override
+ {
+ boost::property_tree::wptree info;
+ info.add(L"type", L"ffmpeg");
+ info.add(L"path", u16(path_));
+ info.add(L"separate_key", separate_key_);
+ return info;
+ }
+
+ bool has_synchronization_clock() const override
+ {
+ return false;
+ }
+
+ int buffer_depth() const override
+ {
+ return -1;
+ }
+
+ int index() const override
+ {
+ return compatibility_mode_ ? 200 : 100000 + consumer_index_offset_;
+ }
+
+ core::monitor::subject& monitor_output()
+ {
+ return consumer_->monitor_output();
+ }
+};
+
+}
+
void describe_streaming_consumer(core::help_sink& sink, const core::help_repository& repo)
{
- sink.short_description(L"For streaming the contents of a channel using FFmpeg.");
- sink.syntax(L"STREAM [url:string] {-[ffmpeg_param1:string] [value1:string] {-[ffmpeg_param2:string] [value2:string] {...}}}");
- sink.para()->text(L"For streaming the contents of a channel using FFmpeg");
+ sink.short_description(L"For streaming/recording the contents of a channel using FFmpeg.");
+ sink.syntax(L"FILE,STREAM [filename:string],[url:string] {-[ffmpeg_param1:string] [value1:string] {-[ffmpeg_param2:string] [value2:string] {...}}}");
+ sink.para()->text(L"For recording or streaming the contents of a channel using FFmpeg");
sink.definitions()
- ->item(L"url", L"The stream URL to create/stream to.")
+ ->item(L"filename", L"The filename under the media folder including the extension (decides which kind of container format that will be used).")
+ ->item(L"url", L"If the filename is given in the form of an URL a network stream will be created instead of a file on disk.")
->item(L"ffmpeg_paramX", L"A parameter supported by FFmpeg. For example vcodec or acodec etc.");
sink.para()->text(L"Examples:");
- sink.example(L">> ADD 1 STREAM udp://<client_ip_address>:9250 -format mpegts -vcodec libx264 -crf 25 -tune zerolatency -preset ultrafast");
+ sink.example(L">> ADD 1 FILE output.mov -vcodec dnxhd");
+ sink.example(L">> ADD 1 FILE output.mov -vcodec prores");
+ sink.example(L">> ADD 1 FILE output.mov -vcodec dvvideo");
+ sink.example(L">> ADD 1 FILE output.mov -vcodec libx264 -preset ultrafast -tune fastdecode -crf 25");
+ sink.example(L">> ADD 1 FILE output.mov -vcodec dnxhd SEPARATE_KEY", L"for creating output.mov with fill and output_A.mov with key/alpha");
+ sink.example(L">> ADD 1 STREAM udp://<client_ip_address>:9250 -format mpegts -vcodec libx264 -crf 25 -tune zerolatency -preset ultrafast",
+ L"for streaming over UDP instead of creating a local file.");
}
spl::shared_ptr<core::frame_consumer> create_streaming_consumer(
- const std::vector<std::wstring>& params, core::interaction_sink*)
+ const std::vector<std::wstring>& params, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
{
if (params.size() < 1 || (!boost::iequals(params.at(0), L"STREAM") && !boost::iequals(params.at(0), L"FILE")))
return core::frame_consumer::empty();
- auto compatibility_mode = boost::iequals(params.at(0), L"FILE");
- auto path = u8(params.size() > 1 ? params.at(1) : L"");
- auto args = u8(boost::join(params, L" "));
+ auto params2 = params;
+ auto separate_key_it = std::find_if(params2.begin(), params2.end(), param_comparer(L"SEPARATE_KEY"));
+ bool separate_key = false;
+
+ if (separate_key_it != params2.end())
+ {
+ separate_key = true;
+ params2.erase(separate_key_it);
+ }
+
+ auto compatibility_mode = boost::iequals(params.at(0), L"FILE");
+ auto path = u8(params2.size() > 1 ? params2.at(1) : L"");
+ auto args = u8(boost::join(params2, L" "));
- return spl::make_shared<streaming_consumer>(path, args, compatibility_mode);
+ return spl::make_shared<ffmpeg_consumer_proxy>(path, args, separate_key, compatibility_mode);
}
spl::shared_ptr<core::frame_consumer> create_preconfigured_streaming_consumer(
- const boost::property_tree::wptree& ptree, core::interaction_sink*)
+ const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
{
- return spl::make_shared<streaming_consumer>(
+ return spl::make_shared<ffmpeg_consumer_proxy>(
u8(ptree_get<std::wstring>(ptree, L"path")),
u8(ptree.get<std::wstring>(L"args", L"")),
+ ptree.get<bool>(L"separate-key", false),
false);
}