]> git.sesse.net Git - casparcg/blob - modules/ffmpeg/consumer/streaming_consumer.cpp
d0cf1aa72df82a2983bfe7c6eb25bac27a531304
[casparcg] / modules / ffmpeg / consumer / streaming_consumer.cpp
1 #include "../StdAfx.h"
2
3 #include "ffmpeg_consumer.h"
4
5 #include "../ffmpeg_error.h"
6 #include "../producer/util/util.h"
7 #include "../producer/filter/filter.h"
8
9 #include <common/except.h>
10 #include <common/executor.h>
11 #include <common/assert.h>
12 #include <common/utf.h>
13 #include <common/future.h>
14 #include <common/diagnostics/graph.h>
15 #include <common/env.h>
16 #include <common/scope_exit.h>
17 #include <common/ptree.h>
18 #include <common/param.h>
19 #include <common/semaphore.h>
20
21 #include <core/consumer/frame_consumer.h>
22 #include <core/frame/frame.h>
23 #include <core/frame/audio_channel_layout.h>
24 #include <core/video_format.h>
25 #include <core/monitor/monitor.h>
26 #include <core/help/help_repository.h>
27 #include <core/help/help_sink.h>
28
29 #include <boost/noncopyable.hpp>
30 #include <boost/rational.hpp>
31 #include <boost/format.hpp>
32 #include <boost/algorithm/string/predicate.hpp>
33 #include <boost/property_tree/ptree.hpp>
34
35 #pragma warning(push)
36 #pragma warning(disable: 4244)
37 #pragma warning(disable: 4245)
38 #include <boost/crc.hpp>
39 #pragma warning(pop)
40
41 #include <tbb/atomic.h>
42 #include <tbb/concurrent_queue.h>
43 #include <tbb/parallel_invoke.h>
44 #include <tbb/parallel_for.h>
45
46 #include <numeric>
47
48 #pragma warning(push)
49 #pragma warning(disable: 4244)
50
51 extern "C"
52 {
53         #define __STDC_CONSTANT_MACROS
54         #define __STDC_LIMIT_MACROS
55         #include <libavformat/avformat.h>
56         #include <libavcodec/avcodec.h>
57         #include <libavutil/avutil.h>
58         #include <libavutil/frame.h>
59         #include <libavutil/opt.h>
60         #include <libavutil/imgutils.h>
61         #include <libavutil/parseutils.h>
62         #include <libavfilter/avfilter.h>
63         #include <libavfilter/buffersink.h>
64         #include <libavfilter/buffersrc.h>
65 }
66
67 #pragma warning(pop)
68
69 namespace caspar { namespace ffmpeg { namespace {
70
71 void set_pixel_format(AVFilterContext* sink, AVPixelFormat pix_fmt)
72 {
73 #pragma warning (push)
74 #pragma warning (disable : 4245)
75
76         FF(av_opt_set_int_list(
77                 sink,
78                 "pix_fmts",
79                 std::vector<AVPixelFormat>({ pix_fmt, AVPixelFormat::AV_PIX_FMT_NONE }).data(),
80                 -1,
81                 AV_OPT_SEARCH_CHILDREN));
82
83 #pragma warning (pop)
84 }
85
86 void adjust_video_filter(const AVCodec& codec, const core::video_format_desc& in_format, AVFilterContext* sink, std::string& filter)
87 {
88         switch (codec.id)
89         {
90         case AV_CODEC_ID_DVVIDEO:
91                 // Crop
92                 if (in_format.format == core::video_format::ntsc)
93                         filter = u8(append_filter(u16(filter), L"crop=720:480:0:2"));
94
95                 // Pixel format selection
96                 if (in_format.format == core::video_format::ntsc)
97                         set_pixel_format(sink, AVPixelFormat::AV_PIX_FMT_YUV411P);
98                 else if (in_format.format == core::video_format::pal)
99                         set_pixel_format(sink, AVPixelFormat::AV_PIX_FMT_YUV420P);
100                 else
101                         set_pixel_format(sink, AVPixelFormat::AV_PIX_FMT_YUV422P);
102
103                 // Scale
104                 if (in_format.height == 1080)
105                         filter = u8(append_filter(u16(filter), in_format.duration == 1001
106                                 ? L"scale=1280:1080"
107                                 : L"scale=1440:1080"));
108                 else if (in_format.height == 720)
109                         filter = u8(append_filter(u16(filter), L"scale=960:720"));
110
111                 break;
112         }
113 }
114
115 void setup_codec_defaults(AVCodecContext& encoder)
116 {
117         static const int MEGABIT = 1000000;
118
119         switch (encoder.codec_id)
120         {
121         case AV_CODEC_ID_DNXHD:
122                 encoder.bit_rate = 220 * MEGABIT;
123
124                 break;
125         case AV_CODEC_ID_PRORES:
126                 encoder.bit_rate = encoder.width < 1280
127                                 ?  63 * MEGABIT
128                                 : 220 * MEGABIT;
129
130                 break;
131         case AV_CODEC_ID_H264:
132                 av_opt_set(encoder.priv_data,   "preset",       "ultrafast",    0);
133                 av_opt_set(encoder.priv_data,   "tune",         "fastdecode",   0);
134                 av_opt_set(encoder.priv_data,   "crf",          "5",                    0);
135
136                 break;
137         }
138 }
139
140 bool is_pcm_s24le_not_supported(const AVFormatContext& container)
141 {
142         auto name = std::string(container.oformat->name);
143
144         if (name == "mp4" || name == "dv")
145                 return true;
146
147         return false;
148 }
149
150 class ffmpeg_consumer
151 {
152 public:
153         // Static Members
154
155 private:
156         const spl::shared_ptr<diagnostics::graph>       graph_;
157         core::monitor::subject                                          subject_;
158         std::string                                                                     path_;
159         boost::filesystem::path                                         full_path_;
160
161         std::map<std::string, std::string>                      options_;
162
163         core::video_format_desc                                         in_video_format_;
164         core::audio_channel_layout                                      in_channel_layout_                      = core::audio_channel_layout::invalid();
165
166         std::shared_ptr<AVFormatContext>                        oc_;
167         tbb::atomic<bool>                                                       abort_request_;
168
169         std::shared_ptr<AVStream>                                       video_st_;
170         std::shared_ptr<AVStream>                                       audio_st_;
171
172         std::int64_t                                                            video_pts_                                      = 0;
173         std::int64_t                                                            audio_pts_                                      = 0;
174
175     AVFilterContext*                                                    audio_graph_in_;
176     AVFilterContext*                                                    audio_graph_out_;
177     std::shared_ptr<AVFilterGraph>                              audio_graph_;
178
179     AVFilterContext*                                                    video_graph_in_;
180     AVFilterContext*                                                    video_graph_out_;
181     std::shared_ptr<AVFilterGraph>                              video_graph_;
182
183         executor                                                                        video_encoder_executor_;
184         executor                                                                        audio_encoder_executor_;
185
186         semaphore                                                                       tokens_                                         { 0 };
187
188         tbb::atomic<int64_t>                                            current_encoding_delay_;
189
190         executor                                                                        write_executor_;
191
192 public:
193
194         ffmpeg_consumer(
195                         std::string path,
196                         std::string options)
197                 : path_(path)
198                 , full_path_(path)
199                 , audio_encoder_executor_(print() + L" audio_encoder")
200                 , video_encoder_executor_(print() + L" video_encoder")
201                 , write_executor_(print() + L" io")
202         {
203                 abort_request_ = false;
204                 current_encoding_delay_ = 0;
205
206                 for(auto it =
207                                 boost::sregex_iterator(
208                                         options.begin(),
209                                         options.end(),
210                                         boost::regex("-(?<NAME>[^-\\s]+)(\\s+(?<VALUE>[^\\s]+))?"));
211                         it != boost::sregex_iterator();
212                         ++it)
213                 {
214                         options_[(*it)["NAME"].str()] = (*it)["VALUE"].matched ? (*it)["VALUE"].str() : "";
215                 }
216
217         if (options_.find("threads") == options_.end())
218             options_["threads"] = "auto";
219
220                 tokens_.release(
221                         std::max(
222                                 1,
223                                 try_remove_arg<int>(
224                                         options_,
225                                         boost::regex("tokens")).get_value_or(2)));
226         }
227
228         ~ffmpeg_consumer()
229         {
230                 if(oc_)
231                 {
232                         video_encoder_executor_.begin_invoke([&] { encode_video(core::const_frame::empty(), nullptr); });
233                         audio_encoder_executor_.begin_invoke([&] { encode_audio(core::const_frame::empty(), nullptr); });
234
235                         video_encoder_executor_.stop();
236                         audio_encoder_executor_.stop();
237                         video_encoder_executor_.join();
238                         audio_encoder_executor_.join();
239
240                         video_graph_.reset();
241                         audio_graph_.reset();
242                         video_st_.reset();
243                         audio_st_.reset();
244
245                         write_packet(nullptr, nullptr);
246
247                         write_executor_.stop();
248                         write_executor_.join();
249
250                         FF(av_write_trailer(oc_.get()));
251
252                         if (!(oc_->oformat->flags & AVFMT_NOFILE) && oc_->pb)
253                                 avio_close(oc_->pb);
254
255                         oc_.reset();
256                 }
257         }
258
259         void initialize(
260                         const core::video_format_desc& format_desc,
261                         const core::audio_channel_layout& channel_layout)
262         {
263                 try
264                 {
265                         static boost::regex prot_exp("^.+:.*" );
266
267                         if(!boost::regex_match(
268                                         path_,
269                                         prot_exp))
270                         {
271                                 if(!full_path_.is_complete())
272                                 {
273                                         full_path_ =
274                                                 u8(
275                                                         env::media_folder()) +
276                                                         path_;
277                                 }
278
279                                 if(boost::filesystem::exists(full_path_))
280                                         boost::filesystem::remove(full_path_);
281
282                                 boost::filesystem::create_directories(full_path_.parent_path());
283                         }
284
285                         graph_->set_color("frame-time", diagnostics::color(0.1f, 1.0f, 0.1f));
286                         graph_->set_color("dropped-frame", diagnostics::color(0.3f, 0.6f, 0.3f));
287                         graph_->set_text(print());
288                         diagnostics::register_graph(graph_);
289
290                         const auto oformat_name =
291                                 try_remove_arg<std::string>(
292                                         options_,
293                                         boost::regex("^f|format$"));
294
295                         AVFormatContext* oc;
296
297                         FF(avformat_alloc_output_context2(
298                                 &oc,
299                                 nullptr,
300                                 oformat_name && !oformat_name->empty() ? oformat_name->c_str() : nullptr,
301                                 full_path_.string().c_str()));
302
303                         oc_.reset(
304                                 oc,
305                                 avformat_free_context);
306
307                         CASPAR_VERIFY(oc_->oformat);
308
309                         oc_->interrupt_callback.callback = ffmpeg_consumer::interrupt_cb;
310                         oc_->interrupt_callback.opaque   = this;
311
312                         CASPAR_VERIFY(format_desc.format != core::video_format::invalid);
313
314                         in_video_format_ = format_desc;
315                         in_channel_layout_ = channel_layout;
316
317                         CASPAR_VERIFY(oc_->oformat);
318
319                         const auto video_codec_name =
320                                 try_remove_arg<std::string>(
321                                         options_,
322                                         boost::regex("^c:v|codec:v|vcodec$"));
323
324                         const auto video_codec =
325                                 video_codec_name
326                                         ? avcodec_find_encoder_by_name(video_codec_name->c_str())
327                                         : avcodec_find_encoder(oc_->oformat->video_codec);
328
329                         const auto audio_codec_name =
330                                 try_remove_arg<std::string>(
331                                         options_,
332                                          boost::regex("^c:a|codec:a|acodec$"));
333
334                         const auto audio_codec =
335                                 audio_codec_name
336                                         ? avcodec_find_encoder_by_name(audio_codec_name->c_str())
337                                         : (is_pcm_s24le_not_supported(*oc_)
338                                                 ? avcodec_find_encoder(oc_->oformat->audio_codec)
339                                                 : avcodec_find_encoder_by_name("pcm_s24le"));
340
341                         if (!video_codec)
342                                 CASPAR_THROW_EXCEPTION(user_error() << msg_info(
343                                                 "Failed to find video codec " + (video_codec_name
344                                                                 ? *video_codec_name
345                                                                 : "with id " + boost::lexical_cast<std::string>(
346                                                                                 oc_->oformat->video_codec))));
347                         if (!audio_codec)
348                                 CASPAR_THROW_EXCEPTION(user_error() << msg_info(
349                                                 "Failed to find audio codec " + (audio_codec_name
350                                                                 ? *audio_codec_name
351                                                                 : "with id " + boost::lexical_cast<std::string>(
352                                                                                 oc_->oformat->audio_codec))));
353
354                         // Filters
355
356                         {
357                                 configure_video_filters(
358                                         *video_codec,
359                                         try_remove_arg<std::string>(options_,
360                                         boost::regex("vf|f:v|filter:v")).get_value_or(""));
361
362                                 configure_audio_filters(
363                                         *audio_codec,
364                                         try_remove_arg<std::string>(options_,
365                                         boost::regex("af|f:a|filter:a")).get_value_or(""));
366                         }
367
368                         // Encoders
369
370                         {
371                                 auto video_options = options_;
372                                 auto audio_options = options_;
373
374                                 video_st_ = open_encoder(
375                                         *video_codec,
376                                         video_options);
377
378                                 audio_st_ = open_encoder(
379                                         *audio_codec,
380                                         audio_options);
381
382                                 auto it = options_.begin();
383                                 while(it != options_.end())
384                                 {
385                                         if(video_options.find(it->first) == video_options.end() || audio_options.find(it->first) == audio_options.end())
386                                                 it = options_.erase(it);
387                                         else
388                                                 ++it;
389                                 }
390                         }
391
392                         // Output
393                         {
394                                 AVDictionary* av_opts = nullptr;
395
396                                 to_dict(
397                                         &av_opts,
398                                         std::move(options_));
399
400                                 CASPAR_SCOPE_EXIT
401                                 {
402                                         av_dict_free(&av_opts);
403                                 };
404
405                                 if (!(oc_->oformat->flags & AVFMT_NOFILE))
406                                 {
407                                         FF(avio_open2(
408                                                 &oc_->pb,
409                                                 full_path_.string().c_str(),
410                                                 AVIO_FLAG_WRITE,
411                                                 &oc_->interrupt_callback,
412                                                 &av_opts));
413                                 }
414
415                                 FF(avformat_write_header(
416                                         oc_.get(),
417                                         &av_opts));
418
419                                 options_ = to_map(av_opts);
420                         }
421
422                         // Dump Info
423
424                         av_dump_format(
425                                 oc_.get(),
426                                 0,
427                                 oc_->filename,
428                                 1);
429
430                         for (const auto& option : options_)
431                         {
432                                 CASPAR_LOG(warning)
433                                         << L"Invalid option: -"
434                                         << u16(option.first)
435                                         << L" "
436                                         << u16(option.second);
437                         }
438                 }
439                 catch(...)
440                 {
441                         video_st_.reset();
442                         audio_st_.reset();
443                         oc_.reset();
444                         throw;
445                 }
446         }
447
448         core::monitor::subject& monitor_output()
449         {
450                 return subject_;
451         }
452
453         void send(core::const_frame frame)
454         {
455                 CASPAR_VERIFY(in_video_format_.format != core::video_format::invalid);
456
457                 auto frame_timer = spl::make_shared<caspar::timer>();
458
459                 std::shared_ptr<void> token(
460                         nullptr,
461                         [this, frame, frame_timer](void*)
462                         {
463                                 tokens_.release();
464                                 current_encoding_delay_ = frame.get_age_millis();
465                                 graph_->set_value("frame-time", frame_timer->elapsed() * in_video_format_.fps * 0.5);
466                         });
467                 tokens_.acquire();
468
469                 video_encoder_executor_.begin_invoke([=]() mutable
470                 {
471                         encode_video(
472                                 frame,
473                                 token);
474                 });
475
476                 audio_encoder_executor_.begin_invoke([=]() mutable
477                 {
478                         encode_audio(
479                                 frame,
480                                 token);
481                 });
482         }
483
484         bool ready_for_frame() const
485         {
486                 return tokens_.permits() > 0;
487         }
488
489         void mark_dropped()
490         {
491                 graph_->set_tag(diagnostics::tag_severity::WARNING, "dropped-frame");
492         }
493
494         std::wstring print() const
495         {
496                 return L"ffmpeg_consumer[" + u16(path_) + L"]";
497         }
498
499         int64_t presentation_frame_age_millis() const
500         {
501                 return current_encoding_delay_;
502         }
503
504 private:
505
506         static int interrupt_cb(void* ctx)
507         {
508                 CASPAR_ASSERT(ctx);
509                 return reinterpret_cast<ffmpeg_consumer*>(ctx)->abort_request_;
510         }
511
512         std::shared_ptr<AVStream> open_encoder(
513                         const AVCodec& codec,
514                         std::map<std::string,
515                         std::string>& options)
516         {
517                 auto st =
518                         avformat_new_stream(
519                                 oc_.get(),
520                                 &codec);
521
522                 if (!st)
523                         CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info("Could not allocate video-stream.") << boost::errinfo_api_function("avformat_new_stream"));
524
525                 auto enc = st->codec;
526
527                 CASPAR_VERIFY(enc);
528
529                 switch(enc->codec_type)
530                 {
531                         case AVMEDIA_TYPE_VIDEO:
532                         {
533                                 enc->time_base                          = video_graph_out_->inputs[0]->time_base;
534                                 enc->pix_fmt                                    = static_cast<AVPixelFormat>(video_graph_out_->inputs[0]->format);
535                                 enc->sample_aspect_ratio                = st->sample_aspect_ratio = video_graph_out_->inputs[0]->sample_aspect_ratio;
536                                 enc->width                                      = video_graph_out_->inputs[0]->w;
537                                 enc->height                                     = video_graph_out_->inputs[0]->h;
538                                 enc->bit_rate_tolerance         = 400 * 1000000;
539
540                                 break;
541                         }
542                         case AVMEDIA_TYPE_AUDIO:
543                         {
544                                 enc->time_base                          = audio_graph_out_->inputs[0]->time_base;
545                                 enc->sample_fmt                         = static_cast<AVSampleFormat>(audio_graph_out_->inputs[0]->format);
546                                 enc->sample_rate                                = audio_graph_out_->inputs[0]->sample_rate;
547                                 enc->channel_layout                     = audio_graph_out_->inputs[0]->channel_layout;
548                                 enc->channels                           = audio_graph_out_->inputs[0]->channels;
549
550                                 break;
551                         }
552                 }
553
554                 setup_codec_defaults(*enc);
555
556                 if(oc_->oformat->flags & AVFMT_GLOBALHEADER)
557                         enc->flags |= CODEC_FLAG_GLOBAL_HEADER;
558
559                 static const std::array<std::string, 4> char_id_map = {{"v", "a", "d", "s"}};
560
561                 const auto char_id = char_id_map.at(enc->codec_type);
562
563                 const auto codec_opts =
564                         remove_options(
565                                 options,
566                                 boost::regex("^(" + char_id + "?[^:]+):" + char_id + "$"));
567
568                 AVDictionary* av_codec_opts = nullptr;
569
570                 to_dict(
571                         &av_codec_opts,
572                         options);
573
574                 to_dict(
575                         &av_codec_opts,
576                         codec_opts);
577
578                 options.clear();
579
580                 FF(avcodec_open2(
581                         enc,
582                         &codec,
583                         av_codec_opts ? &av_codec_opts : nullptr));
584
585                 if(av_codec_opts)
586                 {
587                         auto t =
588                                 av_dict_get(
589                                         av_codec_opts,
590                                         "",
591                                          nullptr,
592                                         AV_DICT_IGNORE_SUFFIX);
593
594                         while(t)
595                         {
596                                 options[t->key + (codec_opts.find(t->key) != codec_opts.end() ? ":" + char_id : "")] = t->value;
597
598                                 t = av_dict_get(
599                                                 av_codec_opts,
600                                                 "",
601                                                 t,
602                                                 AV_DICT_IGNORE_SUFFIX);
603                         }
604
605                         av_dict_free(&av_codec_opts);
606                 }
607
608                 if(enc->codec_type == AVMEDIA_TYPE_AUDIO && !(codec.capabilities & CODEC_CAP_VARIABLE_FRAME_SIZE))
609                 {
610                         CASPAR_ASSERT(enc->frame_size > 0);
611                         av_buffersink_set_frame_size(audio_graph_out_,
612                                                                                  enc->frame_size);
613                 }
614
615                 return std::shared_ptr<AVStream>(st, [this](AVStream* st)
616                 {
617                         avcodec_close(st->codec);
618                 });
619         }
620
621         void configure_video_filters(
622                         const AVCodec& codec,
623                         std::string filtergraph)
624         {
625                 video_graph_.reset(
626                                 avfilter_graph_alloc(),
627                                 [](AVFilterGraph* p)
628                                 {
629                                         avfilter_graph_free(&p);
630                                 });
631
632                 video_graph_->nb_threads  = boost::thread::hardware_concurrency()/2;
633                 video_graph_->thread_type = AVFILTER_THREAD_SLICE;
634
635                 const auto sample_aspect_ratio =
636                         boost::rational<int>(
637                                         in_video_format_.square_width,
638                                         in_video_format_.square_height) /
639                         boost::rational<int>(
640                                         in_video_format_.width,
641                                         in_video_format_.height);
642
643                 const auto vsrc_options = (boost::format("video_size=%1%x%2%:pix_fmt=%3%:time_base=%4%/%5%:pixel_aspect=%6%/%7%:frame_rate=%8%/%9%")
644                         % in_video_format_.width % in_video_format_.height
645                         % AVPixelFormat::AV_PIX_FMT_BGRA
646                         % in_video_format_.duration     % in_video_format_.time_scale
647                         % sample_aspect_ratio.numerator() % sample_aspect_ratio.denominator()
648                         % in_video_format_.time_scale % in_video_format_.duration).str();
649
650                 AVFilterContext* filt_vsrc = nullptr;
651                 FF(avfilter_graph_create_filter(
652                                 &filt_vsrc,
653                                 avfilter_get_by_name("buffer"),
654                                 "ffmpeg_consumer_buffer",
655                                 vsrc_options.c_str(),
656                                 nullptr,
657                                 video_graph_.get()));
658
659                 AVFilterContext* filt_vsink = nullptr;
660                 FF(avfilter_graph_create_filter(
661                                 &filt_vsink,
662                                 avfilter_get_by_name("buffersink"),
663                                 "ffmpeg_consumer_buffersink",
664                                 nullptr,
665                                 nullptr,
666                                 video_graph_.get()));
667
668 #pragma warning (push)
669 #pragma warning (disable : 4245)
670
671                 FF(av_opt_set_int_list(
672                                 filt_vsink,
673                                 "pix_fmts",
674                                 codec.pix_fmts,
675                                 -1,
676                                 AV_OPT_SEARCH_CHILDREN));
677
678 #pragma warning (pop)
679
680                 adjust_video_filter(codec, in_video_format_, filt_vsink, filtergraph);
681
682                 configure_filtergraph(
683                                 *video_graph_,
684                                 filtergraph,
685                                 *filt_vsrc,
686                                 *filt_vsink);
687
688                 video_graph_in_  = filt_vsrc;
689                 video_graph_out_ = filt_vsink;
690
691                 CASPAR_LOG(info)
692                         <<      u16(std::string("\n")
693                                 + avfilter_graph_dump(
694                                                 video_graph_.get(),
695                                                 nullptr));
696         }
697
698         void configure_audio_filters(
699                         const AVCodec& codec,
700                         const std::string& filtergraph)
701         {
702                 audio_graph_.reset(
703                         avfilter_graph_alloc(),
704                         [](AVFilterGraph* p)
705                         {
706                                 avfilter_graph_free(&p);
707                         });
708
709                 audio_graph_->nb_threads  = boost::thread::hardware_concurrency()/2;
710                 audio_graph_->thread_type = AVFILTER_THREAD_SLICE;
711
712                 const auto asrc_options = (boost::format("sample_rate=%1%:sample_fmt=%2%:channels=%3%:time_base=%4%/%5%:channel_layout=%6%")
713                         % in_video_format_.audio_sample_rate
714                         % av_get_sample_fmt_name(AV_SAMPLE_FMT_S32)
715                         % in_channel_layout_.num_channels
716                         % 1     % in_video_format_.audio_sample_rate
717                         % boost::io::group(
718                                 std::hex,
719                                 std::showbase,
720                                 av_get_default_channel_layout(in_channel_layout_.num_channels))).str();
721
722                 AVFilterContext* filt_asrc = nullptr;
723                 FF(avfilter_graph_create_filter(
724                         &filt_asrc,
725                         avfilter_get_by_name("abuffer"),
726                         "ffmpeg_consumer_abuffer",
727                         asrc_options.c_str(),
728                         nullptr,
729                         audio_graph_.get()));
730
731                 AVFilterContext* filt_asink = nullptr;
732                 FF(avfilter_graph_create_filter(
733                         &filt_asink,
734                         avfilter_get_by_name("abuffersink"),
735                         "ffmpeg_consumer_abuffersink",
736                         nullptr,
737                         nullptr,
738                         audio_graph_.get()));
739
740 #pragma warning (push)
741 #pragma warning (disable : 4245)
742
743                 FF(av_opt_set_int(
744                         filt_asink,
745                         "all_channel_counts",
746                         1,
747                         AV_OPT_SEARCH_CHILDREN));
748
749                 FF(av_opt_set_int_list(
750                         filt_asink,
751                         "sample_fmts",
752                         codec.sample_fmts,
753                         -1,
754                         AV_OPT_SEARCH_CHILDREN));
755
756                 FF(av_opt_set_int_list(
757                         filt_asink,
758                         "channel_layouts",
759                         codec.channel_layouts,
760                         -1,
761                         AV_OPT_SEARCH_CHILDREN));
762
763                 FF(av_opt_set_int_list(
764                         filt_asink,
765                         "sample_rates" ,
766                         codec.supported_samplerates,
767                         -1,
768                         AV_OPT_SEARCH_CHILDREN));
769
770 #pragma warning (pop)
771
772                 configure_filtergraph(
773                         *audio_graph_,
774                         filtergraph,
775                         *filt_asrc,
776                         *filt_asink);
777
778                 audio_graph_in_  = filt_asrc;
779                 audio_graph_out_ = filt_asink;
780
781                 CASPAR_LOG(info)
782                         <<      u16(std::string("\n")
783                                 + avfilter_graph_dump(
784                                         audio_graph_.get(),
785                                         nullptr));
786         }
787
788         void configure_filtergraph(
789                         AVFilterGraph& graph,
790                         const std::string& filtergraph,
791                         AVFilterContext& source_ctx,
792                         AVFilterContext& sink_ctx)
793         {
794                 AVFilterInOut* outputs = nullptr;
795                 AVFilterInOut* inputs = nullptr;
796
797                 if(!filtergraph.empty())
798                 {
799                         outputs = avfilter_inout_alloc();
800                         inputs  = avfilter_inout_alloc();
801
802                         try
803                         {
804                                 CASPAR_VERIFY(outputs && inputs);
805
806                                 outputs->name           = av_strdup("in");
807                                 outputs->filter_ctx     = &source_ctx;
808                                 outputs->pad_idx                = 0;
809                                 outputs->next           = nullptr;
810
811                                 inputs->name                    = av_strdup("out");
812                                 inputs->filter_ctx      = &sink_ctx;
813                                 inputs->pad_idx         = 0;
814                                 inputs->next                    = nullptr;
815                         }
816                         catch (...)
817                         {
818                                 avfilter_inout_free(&outputs);
819                                 avfilter_inout_free(&inputs);
820                                 throw;
821                         }
822
823                         FF(avfilter_graph_parse(
824                                         &graph,
825                                         filtergraph.c_str(),
826                                         inputs,
827                                         outputs,
828                                         nullptr));
829                 }
830                 else
831                 {
832                         FF(avfilter_link(
833                                         &source_ctx,
834                                         0,
835                                         &sink_ctx,
836                                         0));
837                 }
838
839                 FF(avfilter_graph_config(
840                                 &graph,
841                                 nullptr));
842         }
843
844         void encode_video(core::const_frame frame_ptr, std::shared_ptr<void> token)
845         {
846                 if(!video_st_)
847                         return;
848
849                 auto enc = video_st_->codec;
850
851                 if(frame_ptr != core::const_frame::empty())
852                 {
853                         auto src_av_frame = create_frame();
854
855                         const auto sample_aspect_ratio =
856                                 boost::rational<int>(
857                                         in_video_format_.square_width,
858                                         in_video_format_.square_height) /
859                                 boost::rational<int>(
860                                         in_video_format_.width,
861                                         in_video_format_.height);
862
863                         src_av_frame->format                                            = AVPixelFormat::AV_PIX_FMT_BGRA;
864                         src_av_frame->width                                             = in_video_format_.width;
865                         src_av_frame->height                                            = in_video_format_.height;
866                         src_av_frame->sample_aspect_ratio.num   = sample_aspect_ratio.numerator();
867                         src_av_frame->sample_aspect_ratio.den   = sample_aspect_ratio.denominator();
868                         src_av_frame->pts                                               = video_pts_;
869
870                         video_pts_ += 1;
871
872                         subject_
873                                         << core::monitor::message("/frame")     % video_pts_
874                                         << core::monitor::message("/path")      % path_
875                                         << core::monitor::message("/fps")       % in_video_format_.fps;
876
877                         FF(av_image_fill_arrays(
878                                 src_av_frame->data,
879                                 src_av_frame->linesize,
880                                 frame_ptr.image_data().begin(),
881                                 static_cast<AVPixelFormat>(src_av_frame->format),
882                                 in_video_format_.width,
883                                 in_video_format_.height,
884                                 1));
885
886                         FF(av_buffersrc_add_frame(
887                                 video_graph_in_,
888                                 src_av_frame.get()));
889                 }
890
891                 int ret = 0;
892
893                 while(ret >= 0)
894                 {
895                         auto filt_frame = create_frame();
896
897                         ret = av_buffersink_get_frame(
898                                 video_graph_out_,
899                                 filt_frame.get());
900
901                         video_encoder_executor_.begin_invoke([=]
902                         {
903                                 if(ret == AVERROR_EOF)
904                                 {
905                                         if(enc->codec->capabilities & CODEC_CAP_DELAY)
906                                         {
907                                                 while(encode_av_frame(
908                                                                 *video_st_,
909                                                                 avcodec_encode_video2,
910                                                                 nullptr, token))
911                                                 {
912                                                         boost::this_thread::yield(); // TODO:
913                                                 }
914                                         }
915                                 }
916                                 else if(ret != AVERROR(EAGAIN))
917                                 {
918                                         FF_RET(ret, "av_buffersink_get_frame");
919
920                                         if (filt_frame->interlaced_frame)
921                                         {
922                                                 if (enc->codec->id == AV_CODEC_ID_MJPEG)
923                                                         enc->field_order = filt_frame->top_field_first ? AV_FIELD_TT : AV_FIELD_BB;
924                                                 else
925                                                         enc->field_order = filt_frame->top_field_first ? AV_FIELD_TB : AV_FIELD_BT;
926                                         }
927                                         else
928                                                 enc->field_order = AV_FIELD_PROGRESSIVE;
929
930                                         filt_frame->quality = enc->global_quality;
931
932                                         if (!enc->me_threshold)
933                                                 filt_frame->pict_type = AV_PICTURE_TYPE_NONE;
934
935                                         encode_av_frame(
936                                                 *video_st_,
937                                                 avcodec_encode_video2,
938                                                 filt_frame,
939                                                 token);
940
941                                         boost::this_thread::yield(); // TODO:
942                                 }
943                         });
944                 }
945         }
946
947         void encode_audio(core::const_frame frame_ptr, std::shared_ptr<void> token)
948         {
949                 if(!audio_st_)
950                         return;
951
952                 auto enc = audio_st_->codec;
953
954                 if(frame_ptr != core::const_frame::empty())
955                 {
956                         auto src_av_frame = create_frame();
957
958                         src_av_frame->channels           = in_channel_layout_.num_channels;
959                         src_av_frame->channel_layout = av_get_default_channel_layout(in_channel_layout_.num_channels);
960                         src_av_frame->sample_rate        = in_video_format_.audio_sample_rate;
961                         src_av_frame->nb_samples         = static_cast<int>(frame_ptr.audio_data().size()) / src_av_frame->channels;
962                         src_av_frame->format             = AV_SAMPLE_FMT_S32;
963                         src_av_frame->pts                        = audio_pts_;
964
965                         audio_pts_ += src_av_frame->nb_samples;
966
967                         FF(av_samples_fill_arrays(
968                                         src_av_frame->extended_data,
969                                         src_av_frame->linesize,
970                                         reinterpret_cast<const std::uint8_t*>(&*frame_ptr.audio_data().begin()),
971                                         src_av_frame->channels,
972                                         src_av_frame->nb_samples,
973                                         static_cast<AVSampleFormat>(src_av_frame->format),
974                                         16));
975
976                         FF(av_buffersrc_add_frame(
977                                         audio_graph_in_,
978                                         src_av_frame.get()));
979                 }
980
981                 int ret = 0;
982
983                 while(ret >= 0)
984                 {
985                         auto filt_frame = create_frame();
986
987                         ret = av_buffersink_get_frame(
988                                 audio_graph_out_,
989                                 filt_frame.get());
990
991                         audio_encoder_executor_.begin_invoke([=]
992                         {
993                                 if(ret == AVERROR_EOF)
994                                 {
995                                         if(enc->codec->capabilities & CODEC_CAP_DELAY)
996                                         {
997                                                 while(encode_av_frame(
998                                                                 *audio_st_,
999                                                                 avcodec_encode_audio2,
1000                                                                 nullptr,
1001                                                                 token))
1002                                                 {
1003                                                         boost::this_thread::yield(); // TODO:
1004                                                 }
1005                                         }
1006                                 }
1007                                 else if(ret != AVERROR(EAGAIN))
1008                                 {
1009                                         FF_RET(
1010                                                 ret,
1011                                                 "av_buffersink_get_frame");
1012
1013                                         encode_av_frame(
1014                                                 *audio_st_,
1015                                                 avcodec_encode_audio2,
1016                                                 filt_frame,
1017                                                 token);
1018
1019                                         boost::this_thread::yield(); // TODO:
1020                                 }
1021                         });
1022                 }
1023         }
1024
1025         template<typename F>
1026         bool encode_av_frame(
1027                         AVStream& st,
1028                         const F& func,
1029                         const std::shared_ptr<AVFrame>& src_av_frame,
1030                         std::shared_ptr<void> token)
1031         {
1032                 AVPacket pkt = {};
1033                 av_init_packet(&pkt);
1034
1035                 int got_packet = 0;
1036
1037                 FF(func(
1038                         st.codec,
1039                         &pkt,
1040                         src_av_frame.get(),
1041                         &got_packet));
1042
1043                 if(!got_packet || pkt.size <= 0)
1044                         return false;
1045
1046                 pkt.stream_index = st.index;
1047
1048                 if (pkt.pts != AV_NOPTS_VALUE)
1049                 {
1050                         pkt.pts =
1051                                 av_rescale_q(
1052                                         pkt.pts,
1053                                         st.codec->time_base,
1054                                         st.time_base);
1055                 }
1056
1057                 if (pkt.dts != AV_NOPTS_VALUE)
1058                 {
1059                         pkt.dts =
1060                                 av_rescale_q(
1061                                         pkt.dts,
1062                                         st.codec->time_base,
1063                                         st.time_base);
1064                 }
1065
1066                 pkt.duration =
1067                         static_cast<int>(
1068                                 av_rescale_q(
1069                                         pkt.duration,
1070                                         st.codec->time_base, st.time_base));
1071
1072                 write_packet(
1073                         std::shared_ptr<AVPacket>(
1074                                 new AVPacket(pkt),
1075                                 [](AVPacket* p)
1076                                 {
1077                                         av_free_packet(p);
1078                                         delete p;
1079                                 }), token);
1080
1081                 return true;
1082         }
1083
1084         void write_packet(
1085                         const std::shared_ptr<AVPacket>& pkt_ptr,
1086                         std::shared_ptr<void> token)
1087         {
1088                 write_executor_.begin_invoke([this, pkt_ptr, token]() mutable
1089                 {
1090                         FF(av_interleaved_write_frame(
1091                                 oc_.get(),
1092                                 pkt_ptr.get()));
1093                 });
1094         }
1095
1096         template<typename T>
1097         static boost::optional<T> try_remove_arg(
1098                         std::map<std::string, std::string>& options,
1099                         const boost::regex& expr)
1100         {
1101                 for(auto it = options.begin(); it != options.end(); ++it)
1102                 {
1103                         if(boost::regex_search(it->first, expr))
1104                         {
1105                                 auto arg = it->second;
1106                                 options.erase(it);
1107                                 return boost::lexical_cast<T>(arg);
1108                         }
1109                 }
1110
1111                 return boost::optional<T>();
1112         }
1113
1114         static std::map<std::string, std::string> remove_options(
1115                         std::map<std::string, std::string>& options,
1116                         const boost::regex& expr)
1117         {
1118                 std::map<std::string, std::string> result;
1119
1120                 auto it = options.begin();
1121                 while(it != options.end())
1122                 {
1123                         boost::smatch what;
1124                         if(boost::regex_search(it->first, what, expr))
1125                         {
1126                                 result[
1127                                         what.size() > 0 && what[1].matched
1128                                                 ? what[1].str()
1129                                                 : it->first] = it->second;
1130                                 it = options.erase(it);
1131                         }
1132                         else
1133                                 ++it;
1134                 }
1135
1136                 return result;
1137         }
1138
1139         static void to_dict(AVDictionary** dest, const std::map<std::string, std::string>& c)
1140         {
1141                 for (const auto& entry : c)
1142                 {
1143                         av_dict_set(
1144                                 dest,
1145                                 entry.first.c_str(),
1146                                 entry.second.c_str(), 0);
1147                 }
1148         }
1149
1150         static std::map<std::string, std::string> to_map(AVDictionary* dict)
1151         {
1152                 std::map<std::string, std::string> result;
1153
1154                 for(auto t = dict
1155                                 ? av_dict_get(
1156                                         dict,
1157                                         "",
1158                                         nullptr,
1159                                         AV_DICT_IGNORE_SUFFIX)
1160                                 : nullptr;
1161                         t;
1162                         t = av_dict_get(
1163                                 dict,
1164                                 "",
1165                                 t,
1166                                 AV_DICT_IGNORE_SUFFIX))
1167                 {
1168                         result[t->key] = t->value;
1169                 }
1170
1171                 return result;
1172         }
1173 };
1174
1175 int crc16(const std::string& str)
1176 {
1177         boost::crc_16_type result;
1178
1179         result.process_bytes(str.data(), str.length());
1180
1181         return result.checksum();
1182 }
1183
1184 struct ffmpeg_consumer_proxy : public core::frame_consumer
1185 {
1186         const std::string                                       path_;
1187         const std::string                                       options_;
1188         const bool                                                      separate_key_;
1189         const bool                                                      compatibility_mode_;
1190         int                                                                     consumer_index_offset_;
1191
1192         std::unique_ptr<ffmpeg_consumer>        consumer_;
1193         std::unique_ptr<ffmpeg_consumer>        key_only_consumer_;
1194
1195 public:
1196
1197         ffmpeg_consumer_proxy(const std::string& path, const std::string& options, bool separate_key, bool compatibility_mode)
1198                 : path_(path)
1199                 , options_(options)
1200                 , separate_key_(separate_key)
1201                 , compatibility_mode_(compatibility_mode)
1202                 , consumer_index_offset_(crc16(path))
1203         {
1204         }
1205
1206         void initialize(const core::video_format_desc& format_desc, const core::audio_channel_layout& channel_layout, int) override
1207         {
1208                 if (consumer_)
1209                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Cannot reinitialize ffmpeg-consumer."));
1210
1211                 consumer_.reset(new ffmpeg_consumer(path_, options_));
1212                 consumer_->initialize(format_desc, channel_layout);
1213
1214                 if (separate_key_)
1215                 {
1216                         boost::filesystem::path fill_file(path_);
1217                         auto without_extension = u16(fill_file.parent_path().string() + "/" + fill_file.stem().string());
1218                         auto key_file = without_extension + L"_A" + u16(fill_file.extension().string());
1219
1220                         key_only_consumer_.reset(new ffmpeg_consumer(u8(key_file), options_));
1221                         key_only_consumer_->initialize(format_desc, channel_layout);
1222                 }
1223         }
1224
1225         int64_t presentation_frame_age_millis() const override
1226         {
1227                 return consumer_ ? static_cast<int64_t>(consumer_->presentation_frame_age_millis()) : 0;
1228         }
1229
1230         std::future<bool> send(core::const_frame frame) override
1231         {
1232                 bool ready_for_frame = consumer_->ready_for_frame();
1233
1234                 if (ready_for_frame && separate_key_)
1235                         ready_for_frame = ready_for_frame && key_only_consumer_->ready_for_frame();
1236
1237                 if (ready_for_frame)
1238                 {
1239                         consumer_->send(frame);
1240
1241                         if (separate_key_)
1242                                 key_only_consumer_->send(frame.key_only());
1243                 }
1244                 else
1245                 {
1246                         consumer_->mark_dropped();
1247
1248                         if (separate_key_)
1249                                 key_only_consumer_->mark_dropped();
1250                 }
1251
1252                 return make_ready_future(true);
1253         }
1254
1255         std::wstring print() const override
1256         {
1257                 return consumer_ ? consumer_->print() : L"[ffmpeg_consumer]";
1258         }
1259
1260         std::wstring name() const override
1261         {
1262                 return L"ffmpeg";
1263         }
1264
1265         boost::property_tree::wptree info() const override
1266         {
1267                 boost::property_tree::wptree info;
1268                 info.add(L"type", L"ffmpeg");
1269                 info.add(L"path", u16(path_));
1270                 info.add(L"separate_key", separate_key_);
1271                 return info;
1272         }
1273
1274         bool has_synchronization_clock() const override
1275         {
1276                 return false;
1277         }
1278
1279         int buffer_depth() const override
1280         {
1281                 return -1;
1282         }
1283
1284         int index() const override
1285         {
1286                 return compatibility_mode_ ? 200 : 100000 + consumer_index_offset_;
1287         }
1288
1289         core::monitor::subject& monitor_output() override
1290         {
1291                 return consumer_->monitor_output();
1292         }
1293 };
1294
1295 }
1296
1297 void describe_streaming_consumer(core::help_sink& sink, const core::help_repository& repo)
1298 {
1299         sink.short_description(L"For streaming/recording the contents of a channel using FFmpeg.");
1300         sink.syntax(L"FILE,STREAM [filename:string],[url:string] {-[ffmpeg_param1:string] [value1:string] {-[ffmpeg_param2:string] [value2:string] {...}}}");
1301         sink.para()->text(L"For recording or streaming the contents of a channel using FFmpeg");
1302         sink.definitions()
1303                 ->item(L"filename", L"The filename under the media folder including the extension (decides which kind of container format that will be used).")
1304                 ->item(L"url", L"If the filename is given in the form of an URL a network stream will be created instead of a file on disk.")
1305                 ->item(L"ffmpeg_paramX", L"A parameter supported by FFmpeg. For example vcodec or acodec etc.");
1306         sink.para()->text(L"Examples:");
1307         sink.example(L">> ADD 1 FILE output.mov -vcodec dnxhd");
1308         sink.example(L">> ADD 1 FILE output.mov -vcodec prores");
1309         sink.example(L">> ADD 1 FILE output.mov -vcodec dvvideo");
1310         sink.example(L">> ADD 1 FILE output.mov -vcodec libx264 -preset ultrafast -tune fastdecode -crf 25");
1311         sink.example(L">> ADD 1 FILE output.mov -vcodec dnxhd SEPARATE_KEY", L"for creating output.mov with fill and output_A.mov with key/alpha");
1312         sink.example(L">> ADD 1 STREAM udp://<client_ip_address>:9250 -format mpegts -vcodec libx264 -crf 25 -tune zerolatency -preset ultrafast",
1313                 L"for streaming over UDP instead of creating a local file.");
1314 }
1315
1316 spl::shared_ptr<core::frame_consumer> create_streaming_consumer(
1317                 const std::vector<std::wstring>& params, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
1318 {
1319         if (params.size() < 1 || (!boost::iequals(params.at(0), L"STREAM") && !boost::iequals(params.at(0), L"FILE")))
1320                 return core::frame_consumer::empty();
1321
1322         auto params2                    = params;
1323         auto separate_key_it    = std::find_if(params2.begin(), params2.end(), param_comparer(L"SEPARATE_KEY"));
1324         bool separate_key               = false;
1325
1326         if (separate_key_it != params2.end())
1327         {
1328                 separate_key = true;
1329                 params2.erase(separate_key_it);
1330         }
1331
1332         auto compatibility_mode = boost::iequals(params.at(0), L"FILE");
1333         auto path                               = u8(params2.size() > 1 ? params2.at(1) : L"");
1334         auto args                               = u8(boost::join(params2, L" "));
1335
1336         return spl::make_shared<ffmpeg_consumer_proxy>(path, args, separate_key, compatibility_mode);
1337 }
1338
1339 spl::shared_ptr<core::frame_consumer> create_preconfigured_streaming_consumer(
1340                 const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
1341 {
1342         return spl::make_shared<ffmpeg_consumer_proxy>(
1343                         u8(ptree_get<std::wstring>(ptree, L"path")),
1344                         u8(ptree.get<std::wstring>(L"args", L"")),
1345                         ptree.get<bool>(L"separate-key", false),
1346                         false);
1347 }
1348
1349 }}