]> git.sesse.net Git - casparcg/blob - modules/ffmpeg/consumer/streaming_consumer.cpp
[streaming_consumer] Added support for sending recording activity via OSC like in...
[casparcg] / modules / ffmpeg / consumer / streaming_consumer.cpp
1 #include "../StdAfx.h"
2
3 #include "ffmpeg_consumer.h"
4
5 #include "../ffmpeg_error.h"
6 #include "../producer/util/util.h"
7
8 #include <common/except.h>
9 #include <common/executor.h>
10 #include <common/assert.h>
11 #include <common/utf.h>
12 #include <common/future.h>
13 #include <common/diagnostics/graph.h>
14 #include <common/env.h>
15 #include <common/scope_exit.h>
16 #include <common/ptree.h>
17 #include <common/param.h>
18 #include <common/semaphore.h>
19
20 #include <core/consumer/frame_consumer.h>
21 #include <core/frame/frame.h>
22 #include <core/frame/audio_channel_layout.h>
23 #include <core/video_format.h>
24 #include <core/monitor/monitor.h>
25 #include <core/help/help_repository.h>
26 #include <core/help/help_sink.h>
27
28 #include <boost/noncopyable.hpp>
29 #include <boost/rational.hpp>
30 #include <boost/format.hpp>
31 #include <boost/algorithm/string/predicate.hpp>
32 #include <boost/property_tree/ptree.hpp>
33
34 #pragma warning(push)
35 #pragma warning(disable: 4244)
36 #pragma warning(disable: 4245)
37 #include <boost/crc.hpp>
38 #pragma warning(pop)
39
40 #include <tbb/atomic.h>
41 #include <tbb/concurrent_queue.h>
42 #include <tbb/parallel_invoke.h>
43 #include <tbb/parallel_for.h>
44
45 #include <numeric>
46
47 #pragma warning(push)
48 #pragma warning(disable: 4244)
49
50 extern "C"
51 {
52         #define __STDC_CONSTANT_MACROS
53         #define __STDC_LIMIT_MACROS
54         #include <libavformat/avformat.h>
55         #include <libavcodec/avcodec.h>
56         #include <libavutil/avutil.h>
57         #include <libavutil/frame.h>
58         #include <libavutil/opt.h>
59         #include <libavutil/imgutils.h>
60         #include <libavutil/parseutils.h>
61         #include <libavfilter/avfilter.h>
62         #include <libavfilter/buffersink.h>
63         #include <libavfilter/buffersrc.h>
64 }
65
66 #pragma warning(pop)
67
68 namespace caspar { namespace ffmpeg { namespace {
69
70 bool is_pcm_s24le_not_supported(const AVFormatContext& container)
71 {
72         auto name = std::string(container.oformat->name);
73
74         if (name == "mp4" || name == "dv")
75                 return true;
76
77         return false;
78 }
79
80 class ffmpeg_consumer
81 {
82 public:
83         // Static Members
84
85 private:
86         const spl::shared_ptr<diagnostics::graph>       graph_;
87         core::monitor::subject                                          subject_;
88         std::string                                                                     path_;
89         boost::filesystem::path                                         full_path_;
90
91         std::map<std::string, std::string>                      options_;
92
93         core::video_format_desc                                         in_video_format_;
94         core::audio_channel_layout                                      in_channel_layout_                      = core::audio_channel_layout::invalid();
95
96         std::shared_ptr<AVFormatContext>                        oc_;
97         tbb::atomic<bool>                                                       abort_request_;
98
99         std::shared_ptr<AVStream>                                       video_st_;
100         std::shared_ptr<AVStream>                                       audio_st_;
101
102         std::int64_t                                                            video_pts_                                      = 0;
103         std::int64_t                                                            audio_pts_                                      = 0;
104
105     AVFilterContext*                                                    audio_graph_in_;
106     AVFilterContext*                                                    audio_graph_out_;
107     std::shared_ptr<AVFilterGraph>                              audio_graph_;
108
109     AVFilterContext*                                                    video_graph_in_;
110     AVFilterContext*                                                    video_graph_out_;
111     std::shared_ptr<AVFilterGraph>                              video_graph_;
112
113         executor                                                                        video_encoder_executor_;
114         executor                                                                        audio_encoder_executor_;
115
116         semaphore                                                                       tokens_                                         { 0 };
117
118         tbb::atomic<int64_t>                                            current_encoding_delay_;
119
120         executor                                                                        write_executor_;
121
122 public:
123
124         ffmpeg_consumer(
125                         std::string path,
126                         std::string options)
127                 : path_(path)
128                 , full_path_(path)
129                 , audio_encoder_executor_(print() + L" audio_encoder")
130                 , video_encoder_executor_(print() + L" video_encoder")
131                 , write_executor_(print() + L" io")
132         {
133                 abort_request_ = false;
134                 current_encoding_delay_ = 0;
135
136                 for(auto it =
137                                 boost::sregex_iterator(
138                                         options.begin(),
139                                         options.end(),
140                                         boost::regex("-(?<NAME>[^-\\s]+)(\\s+(?<VALUE>[^\\s]+))?"));
141                         it != boost::sregex_iterator();
142                         ++it)
143                 {
144                         options_[(*it)["NAME"].str()] = (*it)["VALUE"].matched ? (*it)["VALUE"].str() : "";
145                 }
146
147         if (options_.find("threads") == options_.end())
148             options_["threads"] = "auto";
149
150                 tokens_.release(
151                         std::max(
152                                 1,
153                                 try_remove_arg<int>(
154                                         options_,
155                                         boost::regex("tokens")).get_value_or(2)));
156         }
157
158         ~ffmpeg_consumer()
159         {
160                 if(oc_)
161                 {
162                         video_encoder_executor_.begin_invoke([&] { encode_video(core::const_frame::empty(), nullptr); });
163                         audio_encoder_executor_.begin_invoke([&] { encode_audio(core::const_frame::empty(), nullptr); });
164
165                         video_encoder_executor_.stop();
166                         audio_encoder_executor_.stop();
167                         video_encoder_executor_.join();
168                         audio_encoder_executor_.join();
169
170                         video_graph_.reset();
171                         audio_graph_.reset();
172                         video_st_.reset();
173                         audio_st_.reset();
174
175                         write_packet(nullptr, nullptr);
176
177                         write_executor_.stop();
178                         write_executor_.join();
179
180                         FF(av_write_trailer(oc_.get()));
181
182                         if (!(oc_->oformat->flags & AVFMT_NOFILE) && oc_->pb)
183                                 avio_close(oc_->pb);
184
185                         oc_.reset();
186                 }
187         }
188
189         void initialize(
190                         const core::video_format_desc& format_desc,
191                         const core::audio_channel_layout& channel_layout)
192         {
193                 try
194                 {
195                         static boost::regex prot_exp("^.+:.*" );
196
197                         if(!boost::regex_match(
198                                         path_,
199                                         prot_exp))
200                         {
201                                 if(!full_path_.is_complete())
202                                 {
203                                         full_path_ =
204                                                 u8(
205                                                         env::media_folder()) +
206                                                         path_;
207                                 }
208
209                                 if(boost::filesystem::exists(full_path_))
210                                         boost::filesystem::remove(full_path_);
211
212                                 boost::filesystem::create_directories(full_path_.parent_path());
213                         }
214
215                         graph_->set_color("frame-time", diagnostics::color(0.1f, 1.0f, 0.1f));
216                         graph_->set_color("dropped-frame", diagnostics::color(0.3f, 0.6f, 0.3f));
217                         graph_->set_text(print());
218                         diagnostics::register_graph(graph_);
219
220                         const auto oformat_name =
221                                 try_remove_arg<std::string>(
222                                         options_,
223                                         boost::regex("^f|format$"));
224
225                         AVFormatContext* oc;
226
227                         FF(avformat_alloc_output_context2(
228                                 &oc,
229                                 nullptr,
230                                 oformat_name && !oformat_name->empty() ? oformat_name->c_str() : nullptr,
231                                 full_path_.string().c_str()));
232
233                         oc_.reset(
234                                 oc,
235                                 avformat_free_context);
236
237                         CASPAR_VERIFY(oc_->oformat);
238
239                         oc_->interrupt_callback.callback = ffmpeg_consumer::interrupt_cb;
240                         oc_->interrupt_callback.opaque   = this;
241
242                         CASPAR_VERIFY(format_desc.format != core::video_format::invalid);
243
244                         in_video_format_ = format_desc;
245                         in_channel_layout_ = channel_layout;
246
247                         CASPAR_VERIFY(oc_->oformat);
248
249                         const auto video_codec_name =
250                                 try_remove_arg<std::string>(
251                                         options_,
252                                         boost::regex("^c:v|codec:v|vcodec$"));
253
254                         const auto video_codec =
255                                 video_codec_name
256                                         ? avcodec_find_encoder_by_name(video_codec_name->c_str())
257                                         : avcodec_find_encoder(oc_->oformat->video_codec);
258
259                         const auto audio_codec_name =
260                                 try_remove_arg<std::string>(
261                                         options_,
262                                          boost::regex("^c:a|codec:a|acodec$"));
263
264                         const auto audio_codec =
265                                 audio_codec_name
266                                         ? avcodec_find_encoder_by_name(audio_codec_name->c_str())
267                                         : (is_pcm_s24le_not_supported(*oc_)
268                                                 ? avcodec_find_encoder(oc_->oformat->audio_codec)
269                                                 : avcodec_find_encoder_by_name("pcm_s24le"));
270
271                         if (!video_codec)
272                                 CASPAR_THROW_EXCEPTION(user_error() << msg_info(
273                                                 "Failed to find video codec " + (video_codec_name
274                                                                 ? *video_codec_name
275                                                                 : "with id " + boost::lexical_cast<std::string>(
276                                                                                 oc_->oformat->video_codec))));
277                         if (!audio_codec)
278                                 CASPAR_THROW_EXCEPTION(user_error() << msg_info(
279                                                 "Failed to find audio codec " + (audio_codec_name
280                                                                 ? *audio_codec_name
281                                                                 : "with id " + boost::lexical_cast<std::string>(
282                                                                                 oc_->oformat->audio_codec))));
283
284                         // Filters
285
286                         {
287                                 configure_video_filters(
288                                         *video_codec,
289                                         try_remove_arg<std::string>(options_,
290                                         boost::regex("vf|f:v|filter:v")).get_value_or(""));
291
292                                 configure_audio_filters(
293                                         *audio_codec,
294                                         try_remove_arg<std::string>(options_,
295                                         boost::regex("af|f:a|filter:a")).get_value_or(""));
296                         }
297
298                         // Encoders
299
300                         {
301                                 auto video_options = options_;
302                                 auto audio_options = options_;
303
304                                 video_st_ = open_encoder(
305                                         *video_codec,
306                                         video_options);
307
308                                 audio_st_ = open_encoder(
309                                         *audio_codec,
310                                         audio_options);
311
312                                 auto it = options_.begin();
313                                 while(it != options_.end())
314                                 {
315                                         if(video_options.find(it->first) == video_options.end() || audio_options.find(it->first) == audio_options.end())
316                                                 it = options_.erase(it);
317                                         else
318                                                 ++it;
319                                 }
320                         }
321
322                         // Output
323                         {
324                                 AVDictionary* av_opts = nullptr;
325
326                                 to_dict(
327                                         &av_opts,
328                                         std::move(options_));
329
330                                 CASPAR_SCOPE_EXIT
331                                 {
332                                         av_dict_free(&av_opts);
333                                 };
334
335                                 if (!(oc_->oformat->flags & AVFMT_NOFILE))
336                                 {
337                                         FF(avio_open2(
338                                                 &oc_->pb,
339                                                 full_path_.string().c_str(),
340                                                 AVIO_FLAG_WRITE,
341                                                 &oc_->interrupt_callback,
342                                                 &av_opts));
343                                 }
344
345                                 FF(avformat_write_header(
346                                         oc_.get(),
347                                         &av_opts));
348
349                                 options_ = to_map(av_opts);
350                         }
351
352                         // Dump Info
353
354                         av_dump_format(
355                                 oc_.get(),
356                                 0,
357                                 oc_->filename,
358                                 1);
359
360                         for (const auto& option : options_)
361                         {
362                                 CASPAR_LOG(warning)
363                                         << L"Invalid option: -"
364                                         << u16(option.first)
365                                         << L" "
366                                         << u16(option.second);
367                         }
368                 }
369                 catch(...)
370                 {
371                         video_st_.reset();
372                         audio_st_.reset();
373                         oc_.reset();
374                         throw;
375                 }
376         }
377
378         core::monitor::subject& monitor_output()
379         {
380                 return subject_;
381         }
382
383         void send(core::const_frame frame)
384         {
385                 CASPAR_VERIFY(in_video_format_.format != core::video_format::invalid);
386
387                 auto frame_timer = spl::make_shared<caspar::timer>();
388
389                 std::shared_ptr<void> token(
390                         nullptr,
391                         [this, frame, frame_timer](void*)
392                         {
393                                 tokens_.release();
394                                 current_encoding_delay_ = frame.get_age_millis();
395                                 graph_->set_value("frame-time", frame_timer->elapsed() * in_video_format_.fps * 0.5);
396                         });
397                 tokens_.acquire();
398
399                 video_encoder_executor_.begin_invoke([=]() mutable
400                 {
401                         encode_video(
402                                 frame,
403                                 token);
404                 });
405
406                 audio_encoder_executor_.begin_invoke([=]() mutable
407                 {
408                         encode_audio(
409                                 frame,
410                                 token);
411                 });
412         }
413
414         bool ready_for_frame() const
415         {
416                 return tokens_.permits() > 0;
417         }
418
419         void mark_dropped()
420         {
421                 graph_->set_tag(diagnostics::tag_severity::WARNING, "dropped-frame");
422         }
423
424         std::wstring print() const
425         {
426                 return L"ffmpeg_consumer[" + u16(path_) + L"]";
427         }
428
429         int64_t presentation_frame_age_millis() const
430         {
431                 return current_encoding_delay_;
432         }
433
434 private:
435
436         static int interrupt_cb(void* ctx)
437         {
438                 CASPAR_ASSERT(ctx);
439                 return reinterpret_cast<ffmpeg_consumer*>(ctx)->abort_request_;
440         }
441
442         std::shared_ptr<AVStream> open_encoder(
443                         const AVCodec& codec,
444                         std::map<std::string,
445                         std::string>& options)
446         {
447                 auto st =
448                         avformat_new_stream(
449                                 oc_.get(),
450                                 &codec);
451
452                 if (!st)
453                         CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info("Could not allocate video-stream.") << boost::errinfo_api_function("avformat_new_stream"));
454
455                 auto enc = st->codec;
456
457                 CASPAR_VERIFY(enc);
458
459                 switch(enc->codec_type)
460                 {
461                         case AVMEDIA_TYPE_VIDEO:
462                         {
463                                 enc->time_base                          = video_graph_out_->inputs[0]->time_base;
464                                 enc->pix_fmt                                    = static_cast<AVPixelFormat>(video_graph_out_->inputs[0]->format);
465                                 enc->sample_aspect_ratio                = st->sample_aspect_ratio = video_graph_out_->inputs[0]->sample_aspect_ratio;
466                                 enc->width                                      = video_graph_out_->inputs[0]->w;
467                                 enc->height                                     = video_graph_out_->inputs[0]->h;
468                                 enc->bit_rate_tolerance         = 400 * 1000000;
469
470                                 break;
471                         }
472                         case AVMEDIA_TYPE_AUDIO:
473                         {
474                                 enc->time_base                          = audio_graph_out_->inputs[0]->time_base;
475                                 enc->sample_fmt                         = static_cast<AVSampleFormat>(audio_graph_out_->inputs[0]->format);
476                                 enc->sample_rate                                = audio_graph_out_->inputs[0]->sample_rate;
477                                 enc->channel_layout                     = audio_graph_out_->inputs[0]->channel_layout;
478                                 enc->channels                           = audio_graph_out_->inputs[0]->channels;
479
480                                 break;
481                         }
482                 }
483
484                 if(oc_->oformat->flags & AVFMT_GLOBALHEADER)
485                         enc->flags |= CODEC_FLAG_GLOBAL_HEADER;
486
487                 static const std::array<std::string, 4> char_id_map = {{"v", "a", "d", "s"}};
488
489                 const auto char_id = char_id_map.at(enc->codec_type);
490
491                 const auto codec_opts =
492                         remove_options(
493                                 options,
494                                 boost::regex("^(" + char_id + "?[^:]+):" + char_id + "$"));
495
496                 AVDictionary* av_codec_opts = nullptr;
497
498                 to_dict(
499                         &av_codec_opts,
500                         options);
501
502                 to_dict(
503                         &av_codec_opts,
504                         codec_opts);
505
506                 options.clear();
507
508                 FF(avcodec_open2(
509                         enc,
510                         &codec,
511                         av_codec_opts ? &av_codec_opts : nullptr));
512
513                 if(av_codec_opts)
514                 {
515                         auto t =
516                                 av_dict_get(
517                                         av_codec_opts,
518                                         "",
519                                          nullptr,
520                                         AV_DICT_IGNORE_SUFFIX);
521
522                         while(t)
523                         {
524                                 options[t->key + (codec_opts.find(t->key) != codec_opts.end() ? ":" + char_id : "")] = t->value;
525
526                                 t = av_dict_get(
527                                                 av_codec_opts,
528                                                 "",
529                                                 t,
530                                                 AV_DICT_IGNORE_SUFFIX);
531                         }
532
533                         av_dict_free(&av_codec_opts);
534                 }
535
536                 if(enc->codec_type == AVMEDIA_TYPE_AUDIO && !(codec.capabilities & CODEC_CAP_VARIABLE_FRAME_SIZE))
537                 {
538                         CASPAR_ASSERT(enc->frame_size > 0);
539                         av_buffersink_set_frame_size(audio_graph_out_,
540                                                                                  enc->frame_size);
541                 }
542
543                 return std::shared_ptr<AVStream>(st, [this](AVStream* st)
544                 {
545                         avcodec_close(st->codec);
546                 });
547         }
548
549         void configure_video_filters(
550                         const AVCodec& codec,
551                         const std::string& filtergraph)
552         {
553                 video_graph_.reset(
554                                 avfilter_graph_alloc(),
555                                 [](AVFilterGraph* p)
556                                 {
557                                         avfilter_graph_free(&p);
558                                 });
559
560                 video_graph_->nb_threads  = boost::thread::hardware_concurrency()/2;
561                 video_graph_->thread_type = AVFILTER_THREAD_SLICE;
562
563                 const auto sample_aspect_ratio =
564                         boost::rational<int>(
565                                         in_video_format_.square_width,
566                                         in_video_format_.square_height) /
567                         boost::rational<int>(
568                                         in_video_format_.width,
569                                         in_video_format_.height);
570
571                 const auto vsrc_options = (boost::format("video_size=%1%x%2%:pix_fmt=%3%:time_base=%4%/%5%:pixel_aspect=%6%/%7%:frame_rate=%8%/%9%")
572                         % in_video_format_.width % in_video_format_.height
573                         % AV_PIX_FMT_BGRA
574                         % in_video_format_.duration     % in_video_format_.time_scale
575                         % sample_aspect_ratio.numerator() % sample_aspect_ratio.denominator()
576                         % in_video_format_.time_scale % in_video_format_.duration).str();
577
578                 AVFilterContext* filt_vsrc = nullptr;
579                 FF(avfilter_graph_create_filter(
580                                 &filt_vsrc,
581                                 avfilter_get_by_name("buffer"),
582                                 "ffmpeg_consumer_buffer",
583                                 vsrc_options.c_str(),
584                                 nullptr,
585                                 video_graph_.get()));
586
587                 AVFilterContext* filt_vsink = nullptr;
588                 FF(avfilter_graph_create_filter(
589                                 &filt_vsink,
590                                 avfilter_get_by_name("buffersink"),
591                                 "ffmpeg_consumer_buffersink",
592                                 nullptr,
593                                 nullptr,
594                                 video_graph_.get()));
595
596 #pragma warning (push)
597 #pragma warning (disable : 4245)
598
599                 FF(av_opt_set_int_list(
600                                 filt_vsink,
601                                 "pix_fmts",
602                                 codec.pix_fmts,
603                                 -1,
604                                 AV_OPT_SEARCH_CHILDREN));
605
606 #pragma warning (pop)
607
608                 configure_filtergraph(
609                                 *video_graph_,
610                                 filtergraph,
611                                 *filt_vsrc,
612                                 *filt_vsink);
613
614                 video_graph_in_  = filt_vsrc;
615                 video_graph_out_ = filt_vsink;
616
617                 CASPAR_LOG(info)
618                         <<      u16(std::string("\n")
619                                 + avfilter_graph_dump(
620                                                 video_graph_.get(),
621                                                 nullptr));
622         }
623
624         void configure_audio_filters(
625                         const AVCodec& codec,
626                         const std::string& filtergraph)
627         {
628                 audio_graph_.reset(
629                         avfilter_graph_alloc(),
630                         [](AVFilterGraph* p)
631                         {
632                                 avfilter_graph_free(&p);
633                         });
634
635                 audio_graph_->nb_threads  = boost::thread::hardware_concurrency()/2;
636                 audio_graph_->thread_type = AVFILTER_THREAD_SLICE;
637
638                 const auto asrc_options = (boost::format("sample_rate=%1%:sample_fmt=%2%:channels=%3%:time_base=%4%/%5%:channel_layout=%6%")
639                         % in_video_format_.audio_sample_rate
640                         % av_get_sample_fmt_name(AV_SAMPLE_FMT_S32)
641                         % in_channel_layout_.num_channels
642                         % 1     % in_video_format_.audio_sample_rate
643                         % boost::io::group(
644                                 std::hex,
645                                 std::showbase,
646                                 av_get_default_channel_layout(in_channel_layout_.num_channels))).str();
647
648                 AVFilterContext* filt_asrc = nullptr;
649                 FF(avfilter_graph_create_filter(
650                         &filt_asrc,
651                         avfilter_get_by_name("abuffer"),
652                         "ffmpeg_consumer_abuffer",
653                         asrc_options.c_str(),
654                         nullptr,
655                         audio_graph_.get()));
656
657                 AVFilterContext* filt_asink = nullptr;
658                 FF(avfilter_graph_create_filter(
659                         &filt_asink,
660                         avfilter_get_by_name("abuffersink"),
661                         "ffmpeg_consumer_abuffersink",
662                         nullptr,
663                         nullptr,
664                         audio_graph_.get()));
665
666 #pragma warning (push)
667 #pragma warning (disable : 4245)
668
669                 FF(av_opt_set_int(
670                         filt_asink,
671                         "all_channel_counts",
672                         1,
673                         AV_OPT_SEARCH_CHILDREN));
674
675                 FF(av_opt_set_int_list(
676                         filt_asink,
677                         "sample_fmts",
678                         codec.sample_fmts,
679                         -1,
680                         AV_OPT_SEARCH_CHILDREN));
681
682                 FF(av_opt_set_int_list(
683                         filt_asink,
684                         "channel_layouts",
685                         codec.channel_layouts,
686                         -1,
687                         AV_OPT_SEARCH_CHILDREN));
688
689                 FF(av_opt_set_int_list(
690                         filt_asink,
691                         "sample_rates" ,
692                         codec.supported_samplerates,
693                         -1,
694                         AV_OPT_SEARCH_CHILDREN));
695
696 #pragma warning (pop)
697
698                 configure_filtergraph(
699                         *audio_graph_,
700                         filtergraph,
701                         *filt_asrc,
702                         *filt_asink);
703
704                 audio_graph_in_  = filt_asrc;
705                 audio_graph_out_ = filt_asink;
706
707                 CASPAR_LOG(info)
708                         <<      u16(std::string("\n")
709                                 + avfilter_graph_dump(
710                                         audio_graph_.get(),
711                                         nullptr));
712         }
713
714         void configure_filtergraph(
715                         AVFilterGraph& graph,
716                         const std::string& filtergraph,
717                         AVFilterContext& source_ctx,
718                         AVFilterContext& sink_ctx)
719         {
720                 AVFilterInOut* outputs = nullptr;
721                 AVFilterInOut* inputs = nullptr;
722
723                 try
724                 {
725                         if(!filtergraph.empty())
726                         {
727                                 outputs = avfilter_inout_alloc();
728                                 inputs  = avfilter_inout_alloc();
729
730                                 CASPAR_VERIFY(outputs && inputs);
731
732                                 outputs->name       = av_strdup("in");
733                                 outputs->filter_ctx = &source_ctx;
734                                 outputs->pad_idx    = 0;
735                                 outputs->next       = nullptr;
736
737                                 inputs->name        = av_strdup("out");
738                                 inputs->filter_ctx  = &sink_ctx;
739                                 inputs->pad_idx     = 0;
740                                 inputs->next        = nullptr;
741
742                                 FF(avfilter_graph_parse(
743                                         &graph,
744                                         filtergraph.c_str(),
745                                         inputs,
746                                         outputs,
747                                         nullptr));
748                         }
749                         else
750                         {
751                                 FF(avfilter_link(
752                                         &source_ctx,
753                                         0,
754                                         &sink_ctx,
755                                         0));
756                         }
757
758                         FF(avfilter_graph_config(
759                                 &graph,
760                                 nullptr));
761                 }
762                 catch(...)
763                 {
764                         avfilter_inout_free(&outputs);
765                         avfilter_inout_free(&inputs);
766                         throw;
767                 }
768         }
769
770         void encode_video(core::const_frame frame_ptr, std::shared_ptr<void> token)
771         {
772                 if(!video_st_)
773                         return;
774
775                 auto enc = video_st_->codec;
776
777                 if(frame_ptr != core::const_frame::empty())
778                 {
779                         auto src_av_frame = create_frame();
780
781                         const auto sample_aspect_ratio =
782                                 boost::rational<int>(
783                                         in_video_format_.square_width,
784                                         in_video_format_.square_height) /
785                                 boost::rational<int>(
786                                         in_video_format_.width,
787                                         in_video_format_.height);
788
789                         src_av_frame->format                                            = AV_PIX_FMT_BGRA;
790                         src_av_frame->width                                             = in_video_format_.width;
791                         src_av_frame->height                                            = in_video_format_.height;
792                         src_av_frame->sample_aspect_ratio.num   = sample_aspect_ratio.numerator();
793                         src_av_frame->sample_aspect_ratio.den   = sample_aspect_ratio.denominator();
794                         src_av_frame->pts                                               = video_pts_;
795
796                         video_pts_ += 1;
797
798                         subject_
799                                         << core::monitor::message("/frame")     % video_pts_
800                                         << core::monitor::message("/path")      % path_
801                                         << core::monitor::message("/fps")       % in_video_format_.fps;
802
803                         FF(av_image_fill_arrays(
804                                 src_av_frame->data,
805                                 src_av_frame->linesize,
806                                 frame_ptr.image_data().begin(),
807                                 static_cast<AVPixelFormat>(src_av_frame->format),
808                                 in_video_format_.width,
809                                 in_video_format_.height,
810                                 1));
811
812                         FF(av_buffersrc_add_frame(
813                                 video_graph_in_,
814                                 src_av_frame.get()));
815                 }
816
817                 int ret = 0;
818
819                 while(ret >= 0)
820                 {
821                         auto filt_frame = create_frame();
822
823                         ret = av_buffersink_get_frame(
824                                 video_graph_out_,
825                                 filt_frame.get());
826
827                         video_encoder_executor_.begin_invoke([=]
828                         {
829                                 if(ret == AVERROR_EOF)
830                                 {
831                                         if(enc->codec->capabilities & CODEC_CAP_DELAY)
832                                         {
833                                                 while(encode_av_frame(
834                                                                 *video_st_,
835                                                                 avcodec_encode_video2,
836                                                                 nullptr, token))
837                                                 {
838                                                         boost::this_thread::yield(); // TODO:
839                                                 }
840                                         }
841                                 }
842                                 else if(ret != AVERROR(EAGAIN))
843                                 {
844                                         FF_RET(ret, "av_buffersink_get_frame");
845
846                                         if (filt_frame->interlaced_frame)
847                                         {
848                                                 if (enc->codec->id == AV_CODEC_ID_MJPEG)
849                                                         enc->field_order = filt_frame->top_field_first ? AV_FIELD_TT : AV_FIELD_BB;
850                                                 else
851                                                         enc->field_order = filt_frame->top_field_first ? AV_FIELD_TB : AV_FIELD_BT;
852                                         }
853                                         else
854                                                 enc->field_order = AV_FIELD_PROGRESSIVE;
855
856                                         filt_frame->quality = enc->global_quality;
857
858                                         if (!enc->me_threshold)
859                                                 filt_frame->pict_type = AV_PICTURE_TYPE_NONE;
860
861                                         encode_av_frame(
862                                                 *video_st_,
863                                                 avcodec_encode_video2,
864                                                 filt_frame,
865                                                 token);
866
867                                         boost::this_thread::yield(); // TODO:
868                                 }
869                         });
870                 }
871         }
872
873         void encode_audio(core::const_frame frame_ptr, std::shared_ptr<void> token)
874         {
875                 if(!audio_st_)
876                         return;
877
878                 auto enc = audio_st_->codec;
879
880                 if(frame_ptr != core::const_frame::empty())
881                 {
882                         auto src_av_frame = create_frame();
883
884                         src_av_frame->channels           = in_channel_layout_.num_channels;
885                         src_av_frame->channel_layout = av_get_default_channel_layout(in_channel_layout_.num_channels);
886                         src_av_frame->sample_rate        = in_video_format_.audio_sample_rate;
887                         src_av_frame->nb_samples         = static_cast<int>(frame_ptr.audio_data().size()) / src_av_frame->channels;
888                         src_av_frame->format             = AV_SAMPLE_FMT_S32;
889                         src_av_frame->pts                        = audio_pts_;
890
891                         audio_pts_ += src_av_frame->nb_samples;
892
893                         FF(av_samples_fill_arrays(
894                                         src_av_frame->extended_data,
895                                         src_av_frame->linesize,
896                                         reinterpret_cast<const std::uint8_t*>(&*frame_ptr.audio_data().begin()),
897                                         src_av_frame->channels,
898                                         src_av_frame->nb_samples,
899                                         static_cast<AVSampleFormat>(src_av_frame->format),
900                                         16));
901
902                         FF(av_buffersrc_add_frame(
903                                         audio_graph_in_,
904                                         src_av_frame.get()));
905                 }
906
907                 int ret = 0;
908
909                 while(ret >= 0)
910                 {
911                         auto filt_frame = create_frame();
912
913                         ret = av_buffersink_get_frame(
914                                 audio_graph_out_,
915                                 filt_frame.get());
916
917                         audio_encoder_executor_.begin_invoke([=]
918                         {
919                                 if(ret == AVERROR_EOF)
920                                 {
921                                         if(enc->codec->capabilities & CODEC_CAP_DELAY)
922                                         {
923                                                 while(encode_av_frame(
924                                                                 *audio_st_,
925                                                                 avcodec_encode_audio2,
926                                                                 nullptr,
927                                                                 token))
928                                                 {
929                                                         boost::this_thread::yield(); // TODO:
930                                                 }
931                                         }
932                                 }
933                                 else if(ret != AVERROR(EAGAIN))
934                                 {
935                                         FF_RET(
936                                                 ret,
937                                                 "av_buffersink_get_frame");
938
939                                         encode_av_frame(
940                                                 *audio_st_,
941                                                 avcodec_encode_audio2,
942                                                 filt_frame,
943                                                 token);
944
945                                         boost::this_thread::yield(); // TODO:
946                                 }
947                         });
948                 }
949         }
950
951         template<typename F>
952         bool encode_av_frame(
953                         AVStream& st,
954                         const F& func,
955                         const std::shared_ptr<AVFrame>& src_av_frame,
956                         std::shared_ptr<void> token)
957         {
958                 AVPacket pkt = {};
959                 av_init_packet(&pkt);
960
961                 int got_packet = 0;
962
963                 FF(func(
964                         st.codec,
965                         &pkt,
966                         src_av_frame.get(),
967                         &got_packet));
968
969                 if(!got_packet || pkt.size <= 0)
970                         return false;
971
972                 pkt.stream_index = st.index;
973
974                 if (pkt.pts != AV_NOPTS_VALUE)
975                 {
976                         pkt.pts =
977                                 av_rescale_q(
978                                         pkt.pts,
979                                         st.codec->time_base,
980                                         st.time_base);
981                 }
982
983                 if (pkt.dts != AV_NOPTS_VALUE)
984                 {
985                         pkt.dts =
986                                 av_rescale_q(
987                                         pkt.dts,
988                                         st.codec->time_base,
989                                         st.time_base);
990                 }
991
992                 pkt.duration =
993                         static_cast<int>(
994                                 av_rescale_q(
995                                         pkt.duration,
996                                         st.codec->time_base, st.time_base));
997
998                 write_packet(
999                         std::shared_ptr<AVPacket>(
1000                                 new AVPacket(pkt),
1001                                 [](AVPacket* p)
1002                                 {
1003                                         av_free_packet(p);
1004                                         delete p;
1005                                 }), token);
1006
1007                 return true;
1008         }
1009
1010         void write_packet(
1011                         const std::shared_ptr<AVPacket>& pkt_ptr,
1012                         std::shared_ptr<void> token)
1013         {
1014                 write_executor_.begin_invoke([this, pkt_ptr, token]() mutable
1015                 {
1016                         FF(av_interleaved_write_frame(
1017                                 oc_.get(),
1018                                 pkt_ptr.get()));
1019                 });
1020         }
1021
1022         template<typename T>
1023         static boost::optional<T> try_remove_arg(
1024                         std::map<std::string, std::string>& options,
1025                         const boost::regex& expr)
1026         {
1027                 for(auto it = options.begin(); it != options.end(); ++it)
1028                 {
1029                         if(boost::regex_search(it->first, expr))
1030                         {
1031                                 auto arg = it->second;
1032                                 options.erase(it);
1033                                 return boost::lexical_cast<T>(arg);
1034                         }
1035                 }
1036
1037                 return boost::optional<T>();
1038         }
1039
1040         static std::map<std::string, std::string> remove_options(
1041                         std::map<std::string, std::string>& options,
1042                         const boost::regex& expr)
1043         {
1044                 std::map<std::string, std::string> result;
1045
1046                 auto it = options.begin();
1047                 while(it != options.end())
1048                 {
1049                         boost::smatch what;
1050                         if(boost::regex_search(it->first, what, expr))
1051                         {
1052                                 result[
1053                                         what.size() > 0 && what[1].matched
1054                                                 ? what[1].str()
1055                                                 : it->first] = it->second;
1056                                 it = options.erase(it);
1057                         }
1058                         else
1059                                 ++it;
1060                 }
1061
1062                 return result;
1063         }
1064
1065         static void to_dict(AVDictionary** dest, const std::map<std::string, std::string>& c)
1066         {
1067                 for (const auto& entry : c)
1068                 {
1069                         av_dict_set(
1070                                 dest,
1071                                 entry.first.c_str(),
1072                                 entry.second.c_str(), 0);
1073                 }
1074         }
1075
1076         static std::map<std::string, std::string> to_map(AVDictionary* dict)
1077         {
1078                 std::map<std::string, std::string> result;
1079
1080                 for(auto t = dict
1081                                 ? av_dict_get(
1082                                         dict,
1083                                         "",
1084                                         nullptr,
1085                                         AV_DICT_IGNORE_SUFFIX)
1086                                 : nullptr;
1087                         t;
1088                         t = av_dict_get(
1089                                 dict,
1090                                 "",
1091                                 t,
1092                                 AV_DICT_IGNORE_SUFFIX))
1093                 {
1094                         result[t->key] = t->value;
1095                 }
1096
1097                 return result;
1098         }
1099 };
1100
1101 int crc16(const std::string& str)
1102 {
1103         boost::crc_16_type result;
1104
1105         result.process_bytes(str.data(), str.length());
1106
1107         return result.checksum();
1108 }
1109
1110 struct ffmpeg_consumer_proxy : public core::frame_consumer
1111 {
1112         const std::string                                       path_;
1113         const std::string                                       options_;
1114         const bool                                                      separate_key_;
1115         const bool                                                      compatibility_mode_;
1116         int                                                                     consumer_index_offset_;
1117
1118         std::unique_ptr<ffmpeg_consumer>        consumer_;
1119         std::unique_ptr<ffmpeg_consumer>        key_only_consumer_;
1120
1121 public:
1122
1123         ffmpeg_consumer_proxy(const std::string& path, const std::string& options, bool separate_key, bool compatibility_mode)
1124                 : path_(path)
1125                 , options_(options)
1126                 , separate_key_(separate_key)
1127                 , compatibility_mode_(compatibility_mode)
1128                 , consumer_index_offset_(crc16(path))
1129         {
1130         }
1131
1132         void initialize(const core::video_format_desc& format_desc, const core::audio_channel_layout& channel_layout, int) override
1133         {
1134                 if (consumer_)
1135                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Cannot reinitialize ffmpeg-consumer."));
1136
1137                 consumer_.reset(new ffmpeg_consumer(path_, options_));
1138                 consumer_->initialize(format_desc, channel_layout);
1139
1140                 if (separate_key_)
1141                 {
1142                         boost::filesystem::path fill_file(path_);
1143                         auto without_extension = u16(fill_file.parent_path().string() + "/" + fill_file.stem().string());
1144                         auto key_file = without_extension + L"_A" + u16(fill_file.extension().string());
1145
1146                         key_only_consumer_.reset(new ffmpeg_consumer(u8(key_file), options_));
1147                         key_only_consumer_->initialize(format_desc, channel_layout);
1148                 }
1149         }
1150
1151         int64_t presentation_frame_age_millis() const override
1152         {
1153                 return consumer_ ? static_cast<int64_t>(consumer_->presentation_frame_age_millis()) : 0;
1154         }
1155
1156         std::future<bool> send(core::const_frame frame) override
1157         {
1158                 bool ready_for_frame = consumer_->ready_for_frame();
1159
1160                 if (ready_for_frame && separate_key_)
1161                         ready_for_frame = ready_for_frame && key_only_consumer_->ready_for_frame();
1162
1163                 if (ready_for_frame)
1164                 {
1165                         consumer_->send(frame);
1166
1167                         if (separate_key_)
1168                                 key_only_consumer_->send(frame.key_only());
1169                 }
1170                 else
1171                 {
1172                         consumer_->mark_dropped();
1173
1174                         if (separate_key_)
1175                                 key_only_consumer_->mark_dropped();
1176                 }
1177
1178                 return make_ready_future(true);
1179         }
1180
1181         std::wstring print() const override
1182         {
1183                 return consumer_ ? consumer_->print() : L"[ffmpeg_consumer]";
1184         }
1185
1186         std::wstring name() const override
1187         {
1188                 return L"ffmpeg";
1189         }
1190
1191         boost::property_tree::wptree info() const override
1192         {
1193                 boost::property_tree::wptree info;
1194                 info.add(L"type", L"ffmpeg");
1195                 info.add(L"path", u16(path_));
1196                 info.add(L"separate_key", separate_key_);
1197                 return info;
1198         }
1199
1200         bool has_synchronization_clock() const override
1201         {
1202                 return false;
1203         }
1204
1205         int buffer_depth() const override
1206         {
1207                 return -1;
1208         }
1209
1210         int index() const override
1211         {
1212                 return compatibility_mode_ ? 200 : 100000 + consumer_index_offset_;
1213         }
1214
1215         core::monitor::subject& monitor_output() override
1216         {
1217                 return consumer_->monitor_output();
1218         }
1219 };
1220
1221 }
1222
1223 void describe_streaming_consumer(core::help_sink& sink, const core::help_repository& repo)
1224 {
1225         sink.short_description(L"For streaming/recording the contents of a channel using FFmpeg.");
1226         sink.syntax(L"FILE,STREAM [filename:string],[url:string] {-[ffmpeg_param1:string] [value1:string] {-[ffmpeg_param2:string] [value2:string] {...}}}");
1227         sink.para()->text(L"For recording or streaming the contents of a channel using FFmpeg");
1228         sink.definitions()
1229                 ->item(L"filename", L"The filename under the media folder including the extension (decides which kind of container format that will be used).")
1230                 ->item(L"url", L"If the filename is given in the form of an URL a network stream will be created instead of a file on disk.")
1231                 ->item(L"ffmpeg_paramX", L"A parameter supported by FFmpeg. For example vcodec or acodec etc.");
1232         sink.para()->text(L"Examples:");
1233         sink.example(L">> ADD 1 FILE output.mov -vcodec dnxhd");
1234         sink.example(L">> ADD 1 FILE output.mov -vcodec prores");
1235         sink.example(L">> ADD 1 FILE output.mov -vcodec dvvideo");
1236         sink.example(L">> ADD 1 FILE output.mov -vcodec libx264 -preset ultrafast -tune fastdecode -crf 25");
1237         sink.example(L">> ADD 1 FILE output.mov -vcodec dnxhd SEPARATE_KEY", L"for creating output.mov with fill and output_A.mov with key/alpha");
1238         sink.example(L">> ADD 1 STREAM udp://<client_ip_address>:9250 -format mpegts -vcodec libx264 -crf 25 -tune zerolatency -preset ultrafast",
1239                 L"for streaming over UDP instead of creating a local file.");
1240 }
1241
1242 spl::shared_ptr<core::frame_consumer> create_streaming_consumer(
1243                 const std::vector<std::wstring>& params, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
1244 {
1245         if (params.size() < 1 || (!boost::iequals(params.at(0), L"STREAM") && !boost::iequals(params.at(0), L"FILE")))
1246                 return core::frame_consumer::empty();
1247
1248         auto params2                    = params;
1249         auto separate_key_it    = std::find_if(params2.begin(), params2.end(), param_comparer(L"SEPARATE_KEY"));
1250         bool separate_key               = false;
1251
1252         if (separate_key_it != params2.end())
1253         {
1254                 separate_key = true;
1255                 params2.erase(separate_key_it);
1256         }
1257
1258         auto compatibility_mode = boost::iequals(params.at(0), L"FILE");
1259         auto path                               = u8(params2.size() > 1 ? params2.at(1) : L"");
1260         auto args                               = u8(boost::join(params2, L" "));
1261
1262         return spl::make_shared<ffmpeg_consumer_proxy>(path, args, separate_key, compatibility_mode);
1263 }
1264
1265 spl::shared_ptr<core::frame_consumer> create_preconfigured_streaming_consumer(
1266                 const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
1267 {
1268         return spl::make_shared<ffmpeg_consumer_proxy>(
1269                         u8(ptree_get<std::wstring>(ptree, L"path")),
1270                         u8(ptree.get<std::wstring>(L"args", L"")),
1271                         ptree.get<bool>(L"separate-key", false),
1272                         false);
1273 }
1274
1275 }}