]> git.sesse.net Git - casparcg/blob - modules/ffmpeg/consumer/streaming_consumer.cpp
95fa9c0eb8f23e23d75ee21f58b538eea2fc60d5
[casparcg] / modules / ffmpeg / consumer / streaming_consumer.cpp
1 #include "../StdAfx.h"
2
3 #include "ffmpeg_consumer.h"
4
5 #include "../ffmpeg_error.h"
6 #include "../producer/util/util.h"
7
8 #include <common/except.h>
9 #include <common/executor.h>
10 #include <common/assert.h>
11 #include <common/utf.h>
12 #include <common/future.h>
13 #include <common/diagnostics/graph.h>
14 #include <common/env.h>
15 #include <common/scope_exit.h>
16 #include <common/ptree.h>
17 #include <common/param.h>
18 #include <common/semaphore.h>
19
20 #include <core/consumer/frame_consumer.h>
21 #include <core/frame/frame.h>
22 #include <core/frame/audio_channel_layout.h>
23 #include <core/video_format.h>
24 #include <core/monitor/monitor.h>
25 #include <core/help/help_repository.h>
26 #include <core/help/help_sink.h>
27
28 #include <boost/noncopyable.hpp>
29 #include <boost/rational.hpp>
30 #include <boost/format.hpp>
31 #include <boost/algorithm/string/predicate.hpp>
32 #include <boost/property_tree/ptree.hpp>
33
34 #pragma warning(push)
35 #pragma warning(disable: 4244)
36 #pragma warning(disable: 4245)
37 #include <boost/crc.hpp>
38 #pragma warning(pop)
39
40 #include <tbb/atomic.h>
41 #include <tbb/concurrent_queue.h>
42 #include <tbb/parallel_invoke.h>
43 #include <tbb/parallel_for.h>
44
45 #include <numeric>
46
47 #pragma warning(push)
48 #pragma warning(disable: 4244)
49
50 extern "C"
51 {
52         #define __STDC_CONSTANT_MACROS
53         #define __STDC_LIMIT_MACROS
54         #include <libavformat/avformat.h>
55         #include <libavcodec/avcodec.h>
56         #include <libavutil/avutil.h>
57         #include <libavutil/frame.h>
58         #include <libavutil/opt.h>
59         #include <libavutil/imgutils.h>
60         #include <libavutil/parseutils.h>
61         #include <libavfilter/avfilter.h>
62         #include <libavfilter/buffersink.h>
63         #include <libavfilter/buffersrc.h>
64 }
65
66 #pragma warning(pop)
67
68 namespace caspar { namespace ffmpeg { namespace {
69
70 bool is_pcm_s24le_not_supported(const AVFormatContext& container)
71 {
72         auto name = std::string(container.oformat->name);
73
74         if (name == "mp4" || name == "dv")
75                 return true;
76
77         return false;
78 }
79
80 class ffmpeg_consumer
81 {
82 public:
83         // Static Members
84
85 private:
86         const spl::shared_ptr<diagnostics::graph>       graph_;
87         core::monitor::subject                                          subject_;
88         boost::filesystem::path                                         path_;
89
90         std::map<std::string, std::string>                      options_;
91
92         core::video_format_desc                                         in_video_format_;
93         core::audio_channel_layout                                      in_channel_layout_                      = core::audio_channel_layout::invalid();
94
95         std::shared_ptr<AVFormatContext>                        oc_;
96         tbb::atomic<bool>                                                       abort_request_;
97
98         std::shared_ptr<AVStream>                                       video_st_;
99         std::shared_ptr<AVStream>                                       audio_st_;
100
101         std::int64_t                                                            video_pts_;
102         std::int64_t                                                            audio_pts_;
103
104     AVFilterContext*                                                    audio_graph_in_;
105     AVFilterContext*                                                    audio_graph_out_;
106     std::shared_ptr<AVFilterGraph>                              audio_graph_;
107
108     AVFilterContext*                                                    video_graph_in_;
109     AVFilterContext*                                                    video_graph_out_;
110     std::shared_ptr<AVFilterGraph>                              video_graph_;
111
112         executor                                                                        video_encoder_executor_;
113         executor                                                                        audio_encoder_executor_;
114
115         semaphore                                                                       tokens_                                         { 0 };
116
117         tbb::atomic<int64_t>                                            current_encoding_delay_;
118
119         executor                                                                        write_executor_;
120
121 public:
122
123         ffmpeg_consumer(
124                         std::string path,
125                         std::string options)
126                 : path_(path)
127                 , video_pts_(0)
128                 , audio_pts_(0)
129                 , audio_encoder_executor_(print() + L" audio_encoder")
130                 , video_encoder_executor_(print() + L" video_encoder")
131                 , write_executor_(print() + L" io")
132         {
133                 abort_request_ = false;
134                 current_encoding_delay_ = 0;
135
136                 for(auto it =
137                                 boost::sregex_iterator(
138                                         options.begin(),
139                                         options.end(),
140                                         boost::regex("-(?<NAME>[^-\\s]+)(\\s+(?<VALUE>[^\\s]+))?"));
141                         it != boost::sregex_iterator();
142                         ++it)
143                 {
144                         options_[(*it)["NAME"].str()] = (*it)["VALUE"].matched ? (*it)["VALUE"].str() : "";
145                 }
146
147         if (options_.find("threads") == options_.end())
148             options_["threads"] = "auto";
149
150                 tokens_.release(
151                         std::max(
152                                 1,
153                                 try_remove_arg<int>(
154                                         options_,
155                                         boost::regex("tokens")).get_value_or(2)));
156         }
157
158         ~ffmpeg_consumer()
159         {
160                 if(oc_)
161                 {
162                         video_encoder_executor_.begin_invoke([&] { encode_video(core::const_frame::empty(), nullptr); });
163                         audio_encoder_executor_.begin_invoke([&] { encode_audio(core::const_frame::empty(), nullptr); });
164
165                         video_encoder_executor_.stop();
166                         audio_encoder_executor_.stop();
167                         video_encoder_executor_.join();
168                         audio_encoder_executor_.join();
169
170                         video_graph_.reset();
171                         audio_graph_.reset();
172                         video_st_.reset();
173                         audio_st_.reset();
174
175                         write_packet(nullptr, nullptr);
176
177                         write_executor_.stop();
178                         write_executor_.join();
179
180                         FF(av_write_trailer(oc_.get()));
181
182                         if (!(oc_->oformat->flags & AVFMT_NOFILE) && oc_->pb)
183                                 avio_close(oc_->pb);
184
185                         oc_.reset();
186                 }
187         }
188
189         void initialize(
190                         const core::video_format_desc& format_desc,
191                         const core::audio_channel_layout& channel_layout)
192         {
193                 try
194                 {
195                         static boost::regex prot_exp("^.+:.*" );
196
197                         if(!boost::regex_match(
198                                         path_.string(),
199                                         prot_exp))
200                         {
201                                 if(!path_.is_complete())
202                                 {
203                                         path_ =
204                                                 u8(
205                                                         env::media_folder()) +
206                                                         path_.string();
207                                 }
208
209                                 if(boost::filesystem::exists(path_))
210                                         boost::filesystem::remove(path_);
211                         }
212
213                         graph_->set_color("frame-time", diagnostics::color(0.1f, 1.0f, 0.1f));
214                         graph_->set_color("dropped-frame", diagnostics::color(0.3f, 0.6f, 0.3f));
215                         graph_->set_text(print());
216                         diagnostics::register_graph(graph_);
217
218                         const auto oformat_name =
219                                 try_remove_arg<std::string>(
220                                         options_,
221                                         boost::regex("^f|format$"));
222
223                         AVFormatContext* oc;
224
225                         FF(avformat_alloc_output_context2(
226                                 &oc,
227                                 nullptr,
228                                 oformat_name && !oformat_name->empty() ? oformat_name->c_str() : nullptr,
229                                 path_.string().c_str()));
230
231                         oc_.reset(
232                                 oc,
233                                 avformat_free_context);
234
235                         CASPAR_VERIFY(oc_->oformat);
236
237                         oc_->interrupt_callback.callback = ffmpeg_consumer::interrupt_cb;
238                         oc_->interrupt_callback.opaque   = this;
239
240                         CASPAR_VERIFY(format_desc.format != core::video_format::invalid);
241
242                         in_video_format_ = format_desc;
243                         in_channel_layout_ = channel_layout;
244
245                         CASPAR_VERIFY(oc_->oformat);
246
247                         const auto video_codec_name =
248                                 try_remove_arg<std::string>(
249                                         options_,
250                                         boost::regex("^c:v|codec:v|vcodec$"));
251
252                         const auto video_codec =
253                                 video_codec_name
254                                         ? avcodec_find_encoder_by_name(video_codec_name->c_str())
255                                         : avcodec_find_encoder(oc_->oformat->video_codec);
256
257                         const auto audio_codec_name =
258                                 try_remove_arg<std::string>(
259                                         options_,
260                                          boost::regex("^c:a|codec:a|acodec$"));
261
262                         const auto audio_codec =
263                                 audio_codec_name
264                                         ? avcodec_find_encoder_by_name(audio_codec_name->c_str())
265                                         : (is_pcm_s24le_not_supported(*oc_)
266                                                 ? avcodec_find_encoder(oc_->oformat->audio_codec)
267                                                 : avcodec_find_encoder_by_name("pcm_s24le"));
268
269                         if (!video_codec)
270                                 CASPAR_THROW_EXCEPTION(user_error() << msg_info(
271                                                 "Failed to find video codec " + (video_codec_name
272                                                                 ? *video_codec_name
273                                                                 : "with id " + boost::lexical_cast<std::string>(
274                                                                                 oc_->oformat->video_codec))));
275                         if (!audio_codec)
276                                 CASPAR_THROW_EXCEPTION(user_error() << msg_info(
277                                                 "Failed to find audio codec " + (audio_codec_name
278                                                                 ? *audio_codec_name
279                                                                 : "with id " + boost::lexical_cast<std::string>(
280                                                                                 oc_->oformat->audio_codec))));
281
282                         // Filters
283
284                         {
285                                 configure_video_filters(
286                                         *video_codec,
287                                         try_remove_arg<std::string>(options_,
288                                         boost::regex("vf|f:v|filter:v")).get_value_or(""));
289
290                                 configure_audio_filters(
291                                         *audio_codec,
292                                         try_remove_arg<std::string>(options_,
293                                         boost::regex("af|f:a|filter:a")).get_value_or(""));
294                         }
295
296                         // Encoders
297
298                         {
299                                 auto video_options = options_;
300                                 auto audio_options = options_;
301
302                                 video_st_ = open_encoder(
303                                         *video_codec,
304                                         video_options);
305
306                                 audio_st_ = open_encoder(
307                                         *audio_codec,
308                                         audio_options);
309
310                                 auto it = options_.begin();
311                                 while(it != options_.end())
312                                 {
313                                         if(video_options.find(it->first) == video_options.end() || audio_options.find(it->first) == audio_options.end())
314                                                 it = options_.erase(it);
315                                         else
316                                                 ++it;
317                                 }
318                         }
319
320                         // Output
321                         {
322                                 AVDictionary* av_opts = nullptr;
323
324                                 to_dict(
325                                         &av_opts,
326                                         std::move(options_));
327
328                                 CASPAR_SCOPE_EXIT
329                                 {
330                                         av_dict_free(&av_opts);
331                                 };
332
333                                 if (!(oc_->oformat->flags & AVFMT_NOFILE))
334                                 {
335                                         FF(avio_open2(
336                                                 &oc_->pb,
337                                                 path_.string().c_str(),
338                                                 AVIO_FLAG_WRITE,
339                                                 &oc_->interrupt_callback,
340                                                 &av_opts));
341                                 }
342
343                                 FF(avformat_write_header(
344                                         oc_.get(),
345                                         &av_opts));
346
347                                 options_ = to_map(av_opts);
348                         }
349
350                         // Dump Info
351
352                         av_dump_format(
353                                 oc_.get(),
354                                 0,
355                                 oc_->filename,
356                                 1);
357
358                         for (const auto& option : options_)
359                         {
360                                 CASPAR_LOG(warning)
361                                         << L"Invalid option: -"
362                                         << u16(option.first)
363                                         << L" "
364                                         << u16(option.second);
365                         }
366                 }
367                 catch(...)
368                 {
369                         video_st_.reset();
370                         audio_st_.reset();
371                         oc_.reset();
372                         throw;
373                 }
374         }
375
376         core::monitor::subject& monitor_output()
377         {
378                 return subject_;
379         }
380
381         void send(core::const_frame frame)
382         {
383                 CASPAR_VERIFY(in_video_format_.format != core::video_format::invalid);
384
385                 auto frame_timer = spl::make_shared<caspar::timer>();
386
387                 std::shared_ptr<void> token(
388                         nullptr,
389                         [this, frame, frame_timer](void*)
390                         {
391                                 tokens_.release();
392                                 current_encoding_delay_ = frame.get_age_millis();
393                                 graph_->set_value("frame-time", frame_timer->elapsed() * in_video_format_.fps * 0.5);
394                         });
395                 tokens_.acquire();
396
397                 video_encoder_executor_.begin_invoke([=]() mutable
398                 {
399                         encode_video(
400                                 frame,
401                                 token);
402                 });
403
404                 audio_encoder_executor_.begin_invoke([=]() mutable
405                 {
406                         encode_audio(
407                                 frame,
408                                 token);
409                 });
410         }
411
412         bool ready_for_frame() const
413         {
414                 return tokens_.permits() > 0;
415         }
416
417         void mark_dropped()
418         {
419                 graph_->set_tag(diagnostics::tag_severity::WARNING, "dropped-frame");
420         }
421
422         std::wstring print() const
423         {
424                 return L"ffmpeg_consumer[" + u16(path_.string()) + L"]";
425         }
426
427         int64_t presentation_frame_age_millis() const
428         {
429                 return current_encoding_delay_;
430         }
431
432 private:
433
434         static int interrupt_cb(void* ctx)
435         {
436                 CASPAR_ASSERT(ctx);
437                 return reinterpret_cast<ffmpeg_consumer*>(ctx)->abort_request_;
438         }
439
440         std::shared_ptr<AVStream> open_encoder(
441                         const AVCodec& codec,
442                         std::map<std::string,
443                         std::string>& options)
444         {
445                 auto st =
446                         avformat_new_stream(
447                                 oc_.get(),
448                                 &codec);
449
450                 if (!st)
451                         CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info("Could not allocate video-stream.") << boost::errinfo_api_function("avformat_new_stream"));
452
453                 auto enc = st->codec;
454
455                 CASPAR_VERIFY(enc);
456
457                 switch(enc->codec_type)
458                 {
459                         case AVMEDIA_TYPE_VIDEO:
460                         {
461                                 enc->time_base                          = video_graph_out_->inputs[0]->time_base;
462                                 enc->pix_fmt                                    = static_cast<AVPixelFormat>(video_graph_out_->inputs[0]->format);
463                                 enc->sample_aspect_ratio                = st->sample_aspect_ratio = video_graph_out_->inputs[0]->sample_aspect_ratio;
464                                 enc->width                                      = video_graph_out_->inputs[0]->w;
465                                 enc->height                                     = video_graph_out_->inputs[0]->h;
466                                 enc->bit_rate_tolerance         = 400 * 1000000;
467
468                                 break;
469                         }
470                         case AVMEDIA_TYPE_AUDIO:
471                         {
472                                 enc->time_base                          = audio_graph_out_->inputs[0]->time_base;
473                                 enc->sample_fmt                         = static_cast<AVSampleFormat>(audio_graph_out_->inputs[0]->format);
474                                 enc->sample_rate                                = audio_graph_out_->inputs[0]->sample_rate;
475                                 enc->channel_layout                     = audio_graph_out_->inputs[0]->channel_layout;
476                                 enc->channels                           = audio_graph_out_->inputs[0]->channels;
477
478                                 break;
479                         }
480                 }
481
482                 if(oc_->oformat->flags & AVFMT_GLOBALHEADER)
483                         enc->flags |= CODEC_FLAG_GLOBAL_HEADER;
484
485                 static const std::array<std::string, 4> char_id_map = {{"v", "a", "d", "s"}};
486
487                 const auto char_id = char_id_map.at(enc->codec_type);
488
489                 const auto codec_opts =
490                         remove_options(
491                                 options,
492                                 boost::regex("^(" + char_id + "?[^:]+):" + char_id + "$"));
493
494                 AVDictionary* av_codec_opts = nullptr;
495
496                 to_dict(
497                         &av_codec_opts,
498                         options);
499
500                 to_dict(
501                         &av_codec_opts,
502                         codec_opts);
503
504                 options.clear();
505
506                 FF(avcodec_open2(
507                         enc,
508                         &codec,
509                         av_codec_opts ? &av_codec_opts : nullptr));
510
511                 if(av_codec_opts)
512                 {
513                         auto t =
514                                 av_dict_get(
515                                         av_codec_opts,
516                                         "",
517                                          nullptr,
518                                         AV_DICT_IGNORE_SUFFIX);
519
520                         while(t)
521                         {
522                                 options[t->key + (codec_opts.find(t->key) != codec_opts.end() ? ":" + char_id : "")] = t->value;
523
524                                 t = av_dict_get(
525                                                 av_codec_opts,
526                                                 "",
527                                                 t,
528                                                 AV_DICT_IGNORE_SUFFIX);
529                         }
530
531                         av_dict_free(&av_codec_opts);
532                 }
533
534                 if(enc->codec_type == AVMEDIA_TYPE_AUDIO && !(codec.capabilities & CODEC_CAP_VARIABLE_FRAME_SIZE))
535                 {
536                         CASPAR_ASSERT(enc->frame_size > 0);
537                         av_buffersink_set_frame_size(audio_graph_out_,
538                                                                                  enc->frame_size);
539                 }
540
541                 return std::shared_ptr<AVStream>(st, [this](AVStream* st)
542                 {
543                         avcodec_close(st->codec);
544                 });
545         }
546
547         void configure_video_filters(
548                         const AVCodec& codec,
549                         const std::string& filtergraph)
550         {
551                 video_graph_.reset(
552                                 avfilter_graph_alloc(),
553                                 [](AVFilterGraph* p)
554                                 {
555                                         avfilter_graph_free(&p);
556                                 });
557
558                 video_graph_->nb_threads  = boost::thread::hardware_concurrency()/2;
559                 video_graph_->thread_type = AVFILTER_THREAD_SLICE;
560
561                 const auto sample_aspect_ratio =
562                         boost::rational<int>(
563                                         in_video_format_.square_width,
564                                         in_video_format_.square_height) /
565                         boost::rational<int>(
566                                         in_video_format_.width,
567                                         in_video_format_.height);
568
569                 const auto vsrc_options = (boost::format("video_size=%1%x%2%:pix_fmt=%3%:time_base=%4%/%5%:pixel_aspect=%6%/%7%:frame_rate=%8%/%9%")
570                         % in_video_format_.width % in_video_format_.height
571                         % AV_PIX_FMT_BGRA
572                         % in_video_format_.duration     % in_video_format_.time_scale
573                         % sample_aspect_ratio.numerator() % sample_aspect_ratio.denominator()
574                         % in_video_format_.time_scale % in_video_format_.duration).str();
575
576                 AVFilterContext* filt_vsrc = nullptr;
577                 FF(avfilter_graph_create_filter(
578                                 &filt_vsrc,
579                                 avfilter_get_by_name("buffer"),
580                                 "ffmpeg_consumer_buffer",
581                                 vsrc_options.c_str(),
582                                 nullptr,
583                                 video_graph_.get()));
584
585                 AVFilterContext* filt_vsink = nullptr;
586                 FF(avfilter_graph_create_filter(
587                                 &filt_vsink,
588                                 avfilter_get_by_name("buffersink"),
589                                 "ffmpeg_consumer_buffersink",
590                                 nullptr,
591                                 nullptr,
592                                 video_graph_.get()));
593
594 #pragma warning (push)
595 #pragma warning (disable : 4245)
596
597                 FF(av_opt_set_int_list(
598                                 filt_vsink,
599                                 "pix_fmts",
600                                 codec.pix_fmts,
601                                 -1,
602                                 AV_OPT_SEARCH_CHILDREN));
603
604 #pragma warning (pop)
605
606                 configure_filtergraph(
607                                 *video_graph_,
608                                 filtergraph,
609                                 *filt_vsrc,
610                                 *filt_vsink);
611
612                 video_graph_in_  = filt_vsrc;
613                 video_graph_out_ = filt_vsink;
614
615                 CASPAR_LOG(info)
616                         <<      u16(std::string("\n")
617                                 + avfilter_graph_dump(
618                                                 video_graph_.get(),
619                                                 nullptr));
620         }
621
622         void configure_audio_filters(
623                         const AVCodec& codec,
624                         const std::string& filtergraph)
625         {
626                 audio_graph_.reset(
627                         avfilter_graph_alloc(),
628                         [](AVFilterGraph* p)
629                         {
630                                 avfilter_graph_free(&p);
631                         });
632
633                 audio_graph_->nb_threads  = boost::thread::hardware_concurrency()/2;
634                 audio_graph_->thread_type = AVFILTER_THREAD_SLICE;
635
636                 const auto asrc_options = (boost::format("sample_rate=%1%:sample_fmt=%2%:channels=%3%:time_base=%4%/%5%:channel_layout=%6%")
637                         % in_video_format_.audio_sample_rate
638                         % av_get_sample_fmt_name(AV_SAMPLE_FMT_S32)
639                         % in_channel_layout_.num_channels
640                         % 1     % in_video_format_.audio_sample_rate
641                         % boost::io::group(
642                                 std::hex,
643                                 std::showbase,
644                                 av_get_default_channel_layout(in_channel_layout_.num_channels))).str();
645
646                 AVFilterContext* filt_asrc = nullptr;
647                 FF(avfilter_graph_create_filter(
648                         &filt_asrc,
649                         avfilter_get_by_name("abuffer"),
650                         "ffmpeg_consumer_abuffer",
651                         asrc_options.c_str(),
652                         nullptr,
653                         audio_graph_.get()));
654
655                 AVFilterContext* filt_asink = nullptr;
656                 FF(avfilter_graph_create_filter(
657                         &filt_asink,
658                         avfilter_get_by_name("abuffersink"),
659                         "ffmpeg_consumer_abuffersink",
660                         nullptr,
661                         nullptr,
662                         audio_graph_.get()));
663
664 #pragma warning (push)
665 #pragma warning (disable : 4245)
666
667                 FF(av_opt_set_int(
668                         filt_asink,
669                         "all_channel_counts",
670                         1,
671                         AV_OPT_SEARCH_CHILDREN));
672
673                 FF(av_opt_set_int_list(
674                         filt_asink,
675                         "sample_fmts",
676                         codec.sample_fmts,
677                         -1,
678                         AV_OPT_SEARCH_CHILDREN));
679
680                 FF(av_opt_set_int_list(
681                         filt_asink,
682                         "channel_layouts",
683                         codec.channel_layouts,
684                         -1,
685                         AV_OPT_SEARCH_CHILDREN));
686
687                 FF(av_opt_set_int_list(
688                         filt_asink,
689                         "sample_rates" ,
690                         codec.supported_samplerates,
691                         -1,
692                         AV_OPT_SEARCH_CHILDREN));
693
694 #pragma warning (pop)
695
696                 configure_filtergraph(
697                         *audio_graph_,
698                         filtergraph,
699                         *filt_asrc,
700                         *filt_asink);
701
702                 audio_graph_in_  = filt_asrc;
703                 audio_graph_out_ = filt_asink;
704
705                 CASPAR_LOG(info)
706                         <<      u16(std::string("\n")
707                                 + avfilter_graph_dump(
708                                         audio_graph_.get(),
709                                         nullptr));
710         }
711
712         void configure_filtergraph(
713                         AVFilterGraph& graph,
714                         const std::string& filtergraph,
715                         AVFilterContext& source_ctx,
716                         AVFilterContext& sink_ctx)
717         {
718                 AVFilterInOut* outputs = nullptr;
719                 AVFilterInOut* inputs = nullptr;
720
721                 try
722                 {
723                         if(!filtergraph.empty())
724                         {
725                                 outputs = avfilter_inout_alloc();
726                                 inputs  = avfilter_inout_alloc();
727
728                                 CASPAR_VERIFY(outputs && inputs);
729
730                                 outputs->name       = av_strdup("in");
731                                 outputs->filter_ctx = &source_ctx;
732                                 outputs->pad_idx    = 0;
733                                 outputs->next       = nullptr;
734
735                                 inputs->name        = av_strdup("out");
736                                 inputs->filter_ctx  = &sink_ctx;
737                                 inputs->pad_idx     = 0;
738                                 inputs->next        = nullptr;
739
740                                 FF(avfilter_graph_parse(
741                                         &graph,
742                                         filtergraph.c_str(),
743                                         inputs,
744                                         outputs,
745                                         nullptr));
746                         }
747                         else
748                         {
749                                 FF(avfilter_link(
750                                         &source_ctx,
751                                         0,
752                                         &sink_ctx,
753                                         0));
754                         }
755
756                         FF(avfilter_graph_config(
757                                 &graph,
758                                 nullptr));
759                 }
760                 catch(...)
761                 {
762                         avfilter_inout_free(&outputs);
763                         avfilter_inout_free(&inputs);
764                         throw;
765                 }
766         }
767
768         void encode_video(core::const_frame frame_ptr, std::shared_ptr<void> token)
769         {
770                 if(!video_st_)
771                         return;
772
773                 auto enc = video_st_->codec;
774
775                 if(frame_ptr != core::const_frame::empty())
776                 {
777                         auto src_av_frame = create_frame();
778
779                         const auto sample_aspect_ratio =
780                                 boost::rational<int>(
781                                         in_video_format_.square_width,
782                                         in_video_format_.square_height) /
783                                 boost::rational<int>(
784                                         in_video_format_.width,
785                                         in_video_format_.height);
786
787                         src_av_frame->format                                            = AV_PIX_FMT_BGRA;
788                         src_av_frame->width                                             = in_video_format_.width;
789                         src_av_frame->height                                            = in_video_format_.height;
790                         src_av_frame->sample_aspect_ratio.num   = sample_aspect_ratio.numerator();
791                         src_av_frame->sample_aspect_ratio.den   = sample_aspect_ratio.denominator();
792                         src_av_frame->pts                                               = video_pts_;
793
794                         video_pts_ += 1;
795
796                         FF(av_image_fill_arrays(
797                                 src_av_frame->data,
798                                 src_av_frame->linesize,
799                                 frame_ptr.image_data().begin(),
800                                 static_cast<AVPixelFormat>(src_av_frame->format),
801                                 in_video_format_.width,
802                                 in_video_format_.height,
803                                 1));
804
805                         FF(av_buffersrc_add_frame(
806                                 video_graph_in_,
807                                 src_av_frame.get()));
808                 }
809
810                 int ret = 0;
811
812                 while(ret >= 0)
813                 {
814                         auto filt_frame = create_frame();
815
816                         ret = av_buffersink_get_frame(
817                                 video_graph_out_,
818                                 filt_frame.get());
819
820                         video_encoder_executor_.begin_invoke([=]
821                         {
822                                 if(ret == AVERROR_EOF)
823                                 {
824                                         if(enc->codec->capabilities & CODEC_CAP_DELAY)
825                                         {
826                                                 while(encode_av_frame(
827                                                                 *video_st_,
828                                                                 avcodec_encode_video2,
829                                                                 nullptr, token))
830                                                 {
831                                                         boost::this_thread::yield(); // TODO:
832                                                 }
833                                         }
834                                 }
835                                 else if(ret != AVERROR(EAGAIN))
836                                 {
837                                         FF_RET(ret, "av_buffersink_get_frame");
838
839                                         if (filt_frame->interlaced_frame)
840                                         {
841                                                 if (enc->codec->id == AV_CODEC_ID_MJPEG)
842                                                         enc->field_order = filt_frame->top_field_first ? AV_FIELD_TT : AV_FIELD_BB;
843                                                 else
844                                                         enc->field_order = filt_frame->top_field_first ? AV_FIELD_TB : AV_FIELD_BT;
845                                         }
846                                         else
847                                                 enc->field_order = AV_FIELD_PROGRESSIVE;
848
849                                         filt_frame->quality = enc->global_quality;
850
851                                         if (!enc->me_threshold)
852                                                 filt_frame->pict_type = AV_PICTURE_TYPE_NONE;
853
854                                         encode_av_frame(
855                                                 *video_st_,
856                                                 avcodec_encode_video2,
857                                                 filt_frame,
858                                                 token);
859
860                                         boost::this_thread::yield(); // TODO:
861                                 }
862                         });
863                 }
864         }
865
866         void encode_audio(core::const_frame frame_ptr, std::shared_ptr<void> token)
867         {
868                 if(!audio_st_)
869                         return;
870
871                 auto enc = audio_st_->codec;
872
873                 if(frame_ptr != core::const_frame::empty())
874                 {
875                         auto src_av_frame = create_frame();
876
877                         src_av_frame->channels           = in_channel_layout_.num_channels;
878                         src_av_frame->channel_layout = av_get_default_channel_layout(in_channel_layout_.num_channels);
879                         src_av_frame->sample_rate        = in_video_format_.audio_sample_rate;
880                         src_av_frame->nb_samples         = static_cast<int>(frame_ptr.audio_data().size()) / src_av_frame->channels;
881                         src_av_frame->format             = AV_SAMPLE_FMT_S32;
882                         src_av_frame->pts                        = audio_pts_;
883
884                         audio_pts_ += src_av_frame->nb_samples;
885
886                         FF(av_samples_fill_arrays(
887                                         src_av_frame->extended_data,
888                                         src_av_frame->linesize,
889                                         reinterpret_cast<const std::uint8_t*>(&*frame_ptr.audio_data().begin()),
890                                         src_av_frame->channels,
891                                         src_av_frame->nb_samples,
892                                         static_cast<AVSampleFormat>(src_av_frame->format),
893                                         16));
894
895                         FF(av_buffersrc_add_frame(
896                                         audio_graph_in_,
897                                         src_av_frame.get()));
898                 }
899
900                 int ret = 0;
901
902                 while(ret >= 0)
903                 {
904                         auto filt_frame = create_frame();
905
906                         ret = av_buffersink_get_frame(
907                                 audio_graph_out_,
908                                 filt_frame.get());
909
910                         audio_encoder_executor_.begin_invoke([=]
911                         {
912                                 if(ret == AVERROR_EOF)
913                                 {
914                                         if(enc->codec->capabilities & CODEC_CAP_DELAY)
915                                         {
916                                                 while(encode_av_frame(
917                                                                 *audio_st_,
918                                                                 avcodec_encode_audio2,
919                                                                 nullptr,
920                                                                 token))
921                                                 {
922                                                         boost::this_thread::yield(); // TODO:
923                                                 }
924                                         }
925                                 }
926                                 else if(ret != AVERROR(EAGAIN))
927                                 {
928                                         FF_RET(
929                                                 ret,
930                                                 "av_buffersink_get_frame");
931
932                                         encode_av_frame(
933                                                 *audio_st_,
934                                                 avcodec_encode_audio2,
935                                                 filt_frame,
936                                                 token);
937
938                                         boost::this_thread::yield(); // TODO:
939                                 }
940                         });
941                 }
942         }
943
944         template<typename F>
945         bool encode_av_frame(
946                         AVStream& st,
947                         const F& func,
948                         const std::shared_ptr<AVFrame>& src_av_frame,
949                         std::shared_ptr<void> token)
950         {
951                 AVPacket pkt = {};
952                 av_init_packet(&pkt);
953
954                 int got_packet = 0;
955
956                 FF(func(
957                         st.codec,
958                         &pkt,
959                         src_av_frame.get(),
960                         &got_packet));
961
962                 if(!got_packet || pkt.size <= 0)
963                         return false;
964
965                 pkt.stream_index = st.index;
966
967                 if (pkt.pts != AV_NOPTS_VALUE)
968                 {
969                         pkt.pts =
970                                 av_rescale_q(
971                                         pkt.pts,
972                                         st.codec->time_base,
973                                         st.time_base);
974                 }
975
976                 if (pkt.dts != AV_NOPTS_VALUE)
977                 {
978                         pkt.dts =
979                                 av_rescale_q(
980                                         pkt.dts,
981                                         st.codec->time_base,
982                                         st.time_base);
983                 }
984
985                 pkt.duration =
986                         static_cast<int>(
987                                 av_rescale_q(
988                                         pkt.duration,
989                                         st.codec->time_base, st.time_base));
990
991                 write_packet(
992                         std::shared_ptr<AVPacket>(
993                                 new AVPacket(pkt),
994                                 [](AVPacket* p)
995                                 {
996                                         av_free_packet(p);
997                                         delete p;
998                                 }), token);
999
1000                 return true;
1001         }
1002
1003         void write_packet(
1004                         const std::shared_ptr<AVPacket>& pkt_ptr,
1005                         std::shared_ptr<void> token)
1006         {
1007                 write_executor_.begin_invoke([this, pkt_ptr, token]() mutable
1008                 {
1009                         FF(av_interleaved_write_frame(
1010                                 oc_.get(),
1011                                 pkt_ptr.get()));
1012                 });
1013         }
1014
1015         template<typename T>
1016         static boost::optional<T> try_remove_arg(
1017                         std::map<std::string, std::string>& options,
1018                         const boost::regex& expr)
1019         {
1020                 for(auto it = options.begin(); it != options.end(); ++it)
1021                 {
1022                         if(boost::regex_search(it->first, expr))
1023                         {
1024                                 auto arg = it->second;
1025                                 options.erase(it);
1026                                 return boost::lexical_cast<T>(arg);
1027                         }
1028                 }
1029
1030                 return boost::optional<T>();
1031         }
1032
1033         static std::map<std::string, std::string> remove_options(
1034                         std::map<std::string, std::string>& options,
1035                         const boost::regex& expr)
1036         {
1037                 std::map<std::string, std::string> result;
1038
1039                 auto it = options.begin();
1040                 while(it != options.end())
1041                 {
1042                         boost::smatch what;
1043                         if(boost::regex_search(it->first, what, expr))
1044                         {
1045                                 result[
1046                                         what.size() > 0 && what[1].matched
1047                                                 ? what[1].str()
1048                                                 : it->first] = it->second;
1049                                 it = options.erase(it);
1050                         }
1051                         else
1052                                 ++it;
1053                 }
1054
1055                 return result;
1056         }
1057
1058         static void to_dict(AVDictionary** dest, const std::map<std::string, std::string>& c)
1059         {
1060                 for (const auto& entry : c)
1061                 {
1062                         av_dict_set(
1063                                 dest,
1064                                 entry.first.c_str(),
1065                                 entry.second.c_str(), 0);
1066                 }
1067         }
1068
1069         static std::map<std::string, std::string> to_map(AVDictionary* dict)
1070         {
1071                 std::map<std::string, std::string> result;
1072
1073                 for(auto t = dict
1074                                 ? av_dict_get(
1075                                         dict,
1076                                         "",
1077                                         nullptr,
1078                                         AV_DICT_IGNORE_SUFFIX)
1079                                 : nullptr;
1080                         t;
1081                         t = av_dict_get(
1082                                 dict,
1083                                 "",
1084                                 t,
1085                                 AV_DICT_IGNORE_SUFFIX))
1086                 {
1087                         result[t->key] = t->value;
1088                 }
1089
1090                 return result;
1091         }
1092 };
1093
1094 // Returns the boost::crc_16_type checksum of the bytes of str.
1095 int crc16(const std::string& str)
1096 {
1097         boost::crc_16_type crc;
1098
1099         const char* const first = str.data();
1100         crc.process_block(first, first + str.length());
1101
1102         return crc.checksum();
1103 }
1102
1103 struct ffmpeg_consumer_proxy : public core::frame_consumer
1104 {
1105         const std::string                                       path_;
1106         const std::string                                       options_;
1107         const bool                                                      separate_key_;
1108         const bool                                                      compatibility_mode_;
1109         int                                                                     consumer_index_offset_;
1110
1111         std::unique_ptr<ffmpeg_consumer>        consumer_;
1112         std::unique_ptr<ffmpeg_consumer>        key_only_consumer_;
1113
1114 public:
1115
1116         ffmpeg_consumer_proxy(const std::string& path, const std::string& options, bool separate_key, bool compatibility_mode)
1117                 : path_(path)
1118                 , options_(options)
1119                 , separate_key_(separate_key)
1120                 , compatibility_mode_(compatibility_mode)
1121                 , consumer_index_offset_(crc16(path))
1122         {
1123         }
1124
1125         void initialize(const core::video_format_desc& format_desc, const core::audio_channel_layout& channel_layout, int) override
1126         {
1127                 if (consumer_)
1128                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Cannot reinitialize ffmpeg-consumer."));
1129
1130                 consumer_.reset(new ffmpeg_consumer(path_, options_));
1131                 consumer_->initialize(format_desc, channel_layout);
1132
1133                 if (separate_key_)
1134                 {
1135                         boost::filesystem::path fill_file(path_);
1136                         auto without_extension = u16(fill_file.parent_path().string() + "/" + fill_file.stem().string());
1137                         auto key_file = without_extension + L"_A" + u16(fill_file.extension().string());
1138
1139                         key_only_consumer_.reset(new ffmpeg_consumer(u8(key_file), options_));
1140                         key_only_consumer_->initialize(format_desc, channel_layout);
1141                 }
1142         }
1143
1144         int64_t presentation_frame_age_millis() const override
1145         {
1146                 return consumer_ ? static_cast<int64_t>(consumer_->presentation_frame_age_millis()) : 0;
1147         }
1148
1149         std::future<bool> send(core::const_frame frame) override
1150         {
1151                 bool ready_for_frame = consumer_->ready_for_frame();
1152
1153                 if (ready_for_frame && separate_key_)
1154                         ready_for_frame = ready_for_frame && key_only_consumer_->ready_for_frame();
1155
1156                 if (ready_for_frame)
1157                 {
1158                         consumer_->send(frame);
1159
1160                         if (separate_key_)
1161                                 key_only_consumer_->send(frame.key_only());
1162                 }
1163                 else
1164                 {
1165                         consumer_->mark_dropped();
1166
1167                         if (separate_key_)
1168                                 key_only_consumer_->mark_dropped();
1169                 }
1170
1171                 return make_ready_future(true);
1172         }
1173
1174         std::wstring print() const override
1175         {
1176                 return consumer_ ? consumer_->print() : L"[ffmpeg_consumer]";
1177         }
1178
1179         std::wstring name() const override
1180         {
1181                 return L"ffmpeg";
1182         }
1183
1184         boost::property_tree::wptree info() const override
1185         {
1186                 boost::property_tree::wptree info;
1187                 info.add(L"type", L"ffmpeg");
1188                 info.add(L"path", u16(path_));
1189                 info.add(L"separate_key", separate_key_);
1190                 return info;
1191         }
1192
1193         bool has_synchronization_clock() const override
1194         {
1195                 return false;
1196         }
1197
1198         int buffer_depth() const override
1199         {
1200                 return -1;
1201         }
1202
1203         int index() const override
1204         {
1205                 return compatibility_mode_ ? 200 : 100000 + consumer_index_offset_;
1206         }
1207
1208         core::monitor::subject& monitor_output()
1209         {
1210                 return consumer_->monitor_output();
1211         }
1212 };
1213
1214 }
1215
1216 void describe_streaming_consumer(core::help_sink& sink, const core::help_repository& repo)
1217 {
1218         sink.short_description(L"For streaming/recording the contents of a channel using FFmpeg.");
1219         sink.syntax(L"FILE,STREAM [filename:string],[url:string] {-[ffmpeg_param1:string] [value1:string] {-[ffmpeg_param2:string] [value2:string] {...}}}");
1220         sink.para()->text(L"For recording or streaming the contents of a channel using FFmpeg");
1221         sink.definitions()
1222                 ->item(L"filename", L"The filename under the media folder including the extension (decides which kind of container format that will be used).")
1223                 ->item(L"url", L"If the filename is given in the form of an URL a network stream will be created instead of a file on disk.")
1224                 ->item(L"ffmpeg_paramX", L"A parameter supported by FFmpeg. For example vcodec or acodec etc.");
1225         sink.para()->text(L"Examples:");
1226         sink.example(L">> ADD 1 FILE output.mov -vcodec dnxhd");
1227         sink.example(L">> ADD 1 FILE output.mov -vcodec prores");
1228         sink.example(L">> ADD 1 FILE output.mov -vcodec dvvideo");
1229         sink.example(L">> ADD 1 FILE output.mov -vcodec libx264 -preset ultrafast -tune fastdecode -crf 25");
1230         sink.example(L">> ADD 1 FILE output.mov -vcodec dnxhd SEPARATE_KEY", L"for creating output.mov with fill and output_A.mov with key/alpha");
1231         sink.example(L">> ADD 1 STREAM udp://<client_ip_address>:9250 -format mpegts -vcodec libx264 -crf 25 -tune zerolatency -preset ultrafast",
1232                 L"for streaming over UDP instead of creating a local file.");
1233 }
1234
1235 spl::shared_ptr<core::frame_consumer> create_streaming_consumer(
1236                 const std::vector<std::wstring>& params, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
1237 {
1238         if (params.size() < 1 || (!boost::iequals(params.at(0), L"STREAM") && !boost::iequals(params.at(0), L"FILE")))
1239                 return core::frame_consumer::empty();
1240
1241         auto params2                    = params;
1242         auto separate_key_it    = std::find_if(params2.begin(), params2.end(), param_comparer(L"SEPARATE_KEY"));
1243         bool separate_key               = false;
1244
1245         if (separate_key_it != params2.end())
1246         {
1247                 separate_key = true;
1248                 params2.erase(separate_key_it);
1249         }
1250
1251         auto compatibility_mode = boost::iequals(params.at(0), L"FILE");
1252         auto path                               = u8(params2.size() > 1 ? params2.at(1) : L"");
1253         auto args                               = u8(boost::join(params2, L" "));
1254
1255         return spl::make_shared<ffmpeg_consumer_proxy>(path, args, separate_key, compatibility_mode);
1256 }
1257
1258 spl::shared_ptr<core::frame_consumer> create_preconfigured_streaming_consumer(
1259                 const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
1260 {
1261         return spl::make_shared<ffmpeg_consumer_proxy>(
1262                         u8(ptree_get<std::wstring>(ptree, L"path")),
1263                         u8(ptree.get<std::wstring>(L"args", L"")),
1264                         ptree.get<bool>(L"separate-key", false),
1265                         false);
1266 }
1267
1268 }}