]> git.sesse.net Git - casparcg/blob - modules/ffmpeg/consumer/ffmpeg_consumer.cpp
723805dec7ecfea7278ffab27e8c2e5a78598756
[casparcg] / modules / ffmpeg / consumer / ffmpeg_consumer.cpp
1 #include "../StdAfx.h"
2
3 #include "ffmpeg_consumer.h"
4
5 #include "../ffmpeg_error.h"
6 #include "../producer/util/util.h"
7 #include "../producer/filter/filter.h"
8 #include "../producer/filter/audio_filter.h"
9
10 #include <common/except.h>
11 #include <common/executor.h>
12 #include <common/assert.h>
13 #include <common/utf.h>
14 #include <common/future.h>
15 #include <common/diagnostics/graph.h>
16 #include <common/env.h>
17 #include <common/scope_exit.h>
18 #include <common/ptree.h>
19 #include <common/param.h>
20 #include <common/semaphore.h>
21
22 #include <core/consumer/frame_consumer.h>
23 #include <core/frame/frame.h>
24 #include <core/frame/audio_channel_layout.h>
25 #include <core/video_format.h>
26 #include <core/monitor/monitor.h>
27 #include <core/help/help_repository.h>
28 #include <core/help/help_sink.h>
29
30 #include <boost/noncopyable.hpp>
31 #include <boost/rational.hpp>
32 #include <boost/format.hpp>
33 #include <boost/algorithm/string/predicate.hpp>
34 #include <boost/property_tree/ptree.hpp>
35
36 #pragma warning(push)
37 #pragma warning(disable: 4244)
38 #pragma warning(disable: 4245)
39 #include <boost/crc.hpp>
40 #pragma warning(pop)
41
42 #include <tbb/atomic.h>
43 #include <tbb/concurrent_queue.h>
44 #include <tbb/parallel_invoke.h>
45 #include <tbb/parallel_for.h>
46
47 #include <numeric>
48 #include <thread>
49
50 #pragma warning(push)
51 #pragma warning(disable: 4244)
52
extern "C"
{
        #define __STDC_CONSTANT_MACROS
        #define __STDC_LIMIT_MACROS
        #include <libavformat/avformat.h>
        #include <libavcodec/avcodec.h>
        #include <libavutil/avutil.h>
        #include <libavutil/frame.h>
        #include <libavutil/mem.h>
        #include <libavutil/opt.h>
        #include <libavutil/imgutils.h>
        #include <libavutil/parseutils.h>
        #include <libavfilter/avfilter.h>
        #include <libavfilter/buffersink.h>
        #include <libavfilter/buffersrc.h>
}
68
69 #pragma warning(pop)
70
71 namespace caspar { namespace ffmpeg {
72
73 void set_pixel_format(AVFilterContext* sink, AVPixelFormat pix_fmt)
74 {
75 #pragma warning (push)
76 #pragma warning (disable : 4245)
77
78         FF(av_opt_set_int_list(
79                 sink,
80                 "pix_fmts",
81                 std::vector<AVPixelFormat>({ pix_fmt, AVPixelFormat::AV_PIX_FMT_NONE }).data(),
82                 -1,
83                 AV_OPT_SEARCH_CHILDREN));
84
85 #pragma warning (pop)
86 }
87
88 void adjust_video_filter(const AVCodec& codec, const core::video_format_desc& in_format, AVFilterContext* sink, std::string& filter)
89 {
90         switch (codec.id)
91         {
92         case AV_CODEC_ID_DVVIDEO:
93                 // Crop
94                 if (in_format.format == core::video_format::ntsc)
95                         filter = u8(append_filter(u16(filter), L"crop=720:480:0:2"));
96
97                 // Pixel format selection
98                 if (in_format.format == core::video_format::ntsc)
99                         set_pixel_format(sink, AVPixelFormat::AV_PIX_FMT_YUV411P);
100                 else if (in_format.format == core::video_format::pal)
101                         set_pixel_format(sink, AVPixelFormat::AV_PIX_FMT_YUV420P);
102                 else
103                         set_pixel_format(sink, AVPixelFormat::AV_PIX_FMT_YUV422P);
104
105                 // Scale
106                 if (in_format.height == 1080)
107                         filter = u8(append_filter(u16(filter), in_format.duration == 1001
108                                 ? L"scale=1280:1080"
109                                 : L"scale=1440:1080"));
110                 else if (in_format.height == 720)
111                         filter = u8(append_filter(u16(filter), L"scale=960:720"));
112
113                 break;
114         }
115 }
116
117 void setup_codec_defaults(AVCodecContext& encoder)
118 {
119         static const int MEGABIT = 1000000;
120
121         switch (encoder.codec_id)
122         {
123         case AV_CODEC_ID_DNXHD:
124                 encoder.bit_rate = 220 * MEGABIT;
125
126                 break;
127         case AV_CODEC_ID_PRORES:
128                 encoder.bit_rate = encoder.width < 1280
129                                 ?  63 * MEGABIT
130                                 : 220 * MEGABIT;
131
132                 break;
133         case AV_CODEC_ID_H264:
134                 av_opt_set(encoder.priv_data,   "preset",       "ultrafast",    0);
135                 av_opt_set(encoder.priv_data,   "tune",         "fastdecode",   0);
136                 av_opt_set(encoder.priv_data,   "crf",          "5",                    0);
137
138                 break;
139         }
140 }
141
142 bool is_pcm_s24le_not_supported(const AVFormatContext& container)
143 {
144         auto name = std::string(container.oformat->name);
145
146         if (name == "mp4" || name == "dv")
147                 return true;
148
149         return false;
150 }
151
/// Copies a sentinel-terminated C array into a vector, converting each element.
///
/// @tparam Out        Element type of the resulting vector.
/// @tparam In         Element type of the source array.
/// @param array       First element of the array; may be nullptr.
/// @param terminator  Value marking the end of the array (not copied).
/// @return The elements before the terminator, each static_cast to Out.
///         Empty when array is nullptr.
template<typename Out, typename In>
std::vector<Out> from_terminated_array(const In* array, In terminator)
{
        std::vector<Out> converted;

        if (array == nullptr)
                return converted;

        for (const In* cursor = array; *cursor != terminator; ++cursor)
                converted.push_back(static_cast<Out>(*cursor));

        return converted;
}
169
170 class ffmpeg_consumer
171 {
172 private:
173         const spl::shared_ptr<diagnostics::graph>       graph_;
174         core::monitor::subject                                          subject_;
175         std::string                                                                     path_;
176         boost::filesystem::path                                         full_path_;
177
178         std::map<std::string, std::string>                      options_;
179         bool                                                                            mono_streams_;
180
181         core::video_format_desc                                         in_video_format_;
182         core::audio_channel_layout                                      in_channel_layout_                      = core::audio_channel_layout::invalid();
183
184         std::shared_ptr<AVFormatContext>                        oc_;
185         tbb::atomic<bool>                                                       abort_request_;
186
187         std::shared_ptr<AVStream>                                       video_st_;
188         std::vector<std::shared_ptr<AVStream>>          audio_sts_;
189
190         std::int64_t                                                            video_pts_                                      = 0;
191         std::int64_t                                                            audio_pts_                                      = 0;
192
193         std::unique_ptr<audio_filter>                           audio_filter_;
194
195         // TODO: make use of already existent avfilter abstraction for video also
196     AVFilterContext*                                                    video_graph_in_;
197     AVFilterContext*                                                    video_graph_out_;
198     std::shared_ptr<AVFilterGraph>                              video_graph_;
199
200         executor                                                                        video_encoder_executor_;
201         executor                                                                        audio_encoder_executor_;
202
203         semaphore                                                                       tokens_                                         { 0 };
204
205         tbb::atomic<int64_t>                                            current_encoding_delay_;
206
207         executor                                                                        write_executor_;
208
209 public:
210
211         ffmpeg_consumer(
212                         std::string path,
213                         std::string options,
214                         bool mono_streams)
215                 : path_(path)
216                 , full_path_(path)
217                 , mono_streams_(mono_streams)
218                 , audio_encoder_executor_(print() + L" audio_encoder")
219                 , video_encoder_executor_(print() + L" video_encoder")
220                 , write_executor_(print() + L" io")
221         {
222                 abort_request_ = false;
223                 current_encoding_delay_ = 0;
224
225                 for(auto it =
226                                 boost::sregex_iterator(
227                                         options.begin(),
228                                         options.end(),
229                                         boost::regex("-(?<NAME>[^-\\s]+)(\\s+(?<VALUE>[^\\s]+))?"));
230                         it != boost::sregex_iterator();
231                         ++it)
232                 {
233                         options_[(*it)["NAME"].str()] = (*it)["VALUE"].matched ? (*it)["VALUE"].str() : "";
234                 }
235
236         if (options_.find("threads") == options_.end())
237             options_["threads"] = "auto";
238
239                 tokens_.release(
240                         std::max(
241                                 1,
242                                 try_remove_arg<int>(
243                                         options_,
244                                         boost::regex("tokens")).get_value_or(2)));
245         }
246
247         ~ffmpeg_consumer()
248         {
249                 if(oc_)
250                 {
251                         try
252                         {
253                                 video_encoder_executor_.begin_invoke([&] { encode_video(core::const_frame::empty(), nullptr); });
254                                 audio_encoder_executor_.begin_invoke([&] { encode_audio(core::const_frame::empty(), nullptr); });
255
256                                 video_encoder_executor_.stop();
257                                 audio_encoder_executor_.stop();
258                                 video_encoder_executor_.join();
259                                 audio_encoder_executor_.join();
260
261                                 video_graph_.reset();
262                                 audio_filter_.reset();
263                                 video_st_.reset();
264                                 audio_sts_.clear();
265
266                                 write_packet(nullptr, nullptr);
267
268                                 write_executor_.stop();
269                                 write_executor_.join();
270
271                                 FF(av_write_trailer(oc_.get()));
272
273                                 if (!(oc_->oformat->flags & AVFMT_NOFILE) && oc_->pb)
274                                         avio_close(oc_->pb);
275
276                                 oc_.reset();
277                         }
278                         catch (...)
279                         {
280                                 CASPAR_LOG_CURRENT_EXCEPTION();
281                         }
282                 }
283         }
284
285         void initialize(
286                         const core::video_format_desc& format_desc,
287                         const core::audio_channel_layout& channel_layout)
288         {
289                 try
290                 {
291                         static boost::regex prot_exp("^.+:.*" );
292
293                         if(!boost::regex_match(
294                                         path_,
295                                         prot_exp))
296                         {
297                                 if(!full_path_.is_complete())
298                                 {
299                                         full_path_ =
300                                                 u8(
301                                                         env::media_folder()) +
302                                                         path_;
303                                 }
304
305                                 if(boost::filesystem::exists(full_path_))
306                                         boost::filesystem::remove(full_path_);
307
308                                 boost::filesystem::create_directories(full_path_.parent_path());
309                         }
310
311                         graph_->set_color("frame-time", diagnostics::color(0.1f, 1.0f, 0.1f));
312                         graph_->set_color("dropped-frame", diagnostics::color(0.3f, 0.6f, 0.3f));
313                         graph_->set_text(print());
314                         diagnostics::register_graph(graph_);
315
316                         const auto oformat_name =
317                                 try_remove_arg<std::string>(
318                                         options_,
319                                         boost::regex("^f|format$"));
320
321                         AVFormatContext* oc;
322
323                         FF(avformat_alloc_output_context2(
324                                 &oc,
325                                 nullptr,
326                                 oformat_name && !oformat_name->empty() ? oformat_name->c_str() : nullptr,
327                                 full_path_.string().c_str()));
328
329                         oc_.reset(
330                                 oc,
331                                 avformat_free_context);
332
333                         CASPAR_VERIFY(oc_->oformat);
334
335                         oc_->interrupt_callback.callback = ffmpeg_consumer::interrupt_cb;
336                         oc_->interrupt_callback.opaque   = this;
337
338                         CASPAR_VERIFY(format_desc.format != core::video_format::invalid);
339
340                         in_video_format_ = format_desc;
341                         in_channel_layout_ = channel_layout;
342
343                         CASPAR_VERIFY(oc_->oformat);
344
345                         const auto video_codec_name =
346                                 try_remove_arg<std::string>(
347                                         options_,
348                                         boost::regex("^c:v|codec:v|vcodec$"));
349
350                         const auto video_codec =
351                                 video_codec_name
352                                         ? avcodec_find_encoder_by_name(video_codec_name->c_str())
353                                         : avcodec_find_encoder(oc_->oformat->video_codec);
354
355                         const auto audio_codec_name =
356                                 try_remove_arg<std::string>(
357                                         options_,
358                                          boost::regex("^c:a|codec:a|acodec$"));
359
360                         const auto audio_codec =
361                                 audio_codec_name
362                                         ? avcodec_find_encoder_by_name(audio_codec_name->c_str())
363                                         : (is_pcm_s24le_not_supported(*oc_)
364                                                 ? avcodec_find_encoder(oc_->oformat->audio_codec)
365                                                 : avcodec_find_encoder_by_name("pcm_s24le"));
366
367                         if (!video_codec)
368                                 CASPAR_THROW_EXCEPTION(user_error() << msg_info(
369                                                 "Failed to find video codec " + (video_codec_name
370                                                                 ? *video_codec_name
371                                                                 : "with id " + std::to_string(
372                                                                                 oc_->oformat->video_codec))));
373                         if (!audio_codec)
374                                 CASPAR_THROW_EXCEPTION(user_error() << msg_info(
375                                                 "Failed to find audio codec " + (audio_codec_name
376                                                                 ? *audio_codec_name
377                                                                 : "with id " + std::to_string(
378                                                                                 oc_->oformat->audio_codec))));
379
380                         // Filters
381
382                         {
383                                 configure_video_filters(
384                                         *video_codec,
385                                         try_remove_arg<std::string>(options_, boost::regex("vf|f:v|filter:v"))
386                                                         .get_value_or(""),
387                                         try_remove_arg<std::string>(options_, boost::regex("pix_fmt")));
388
389                                 configure_audio_filters(
390                                         *audio_codec,
391                                         try_remove_arg<std::string>(options_,
392                                         boost::regex("af|f:a|filter:a")).get_value_or(""));
393                         }
394
395                         // Encoders
396
397                         {
398                                 auto video_options = options_;
399                                 auto audio_options = options_;
400
401                                 video_st_ = open_encoder(
402                                         *video_codec,
403                                         video_options,
404                                         0);
405
406                                 for (int i = 0; i < audio_filter_->get_num_output_pads(); ++i)
407                                         audio_sts_.push_back(open_encoder(
408                                                         *audio_codec,
409                                                         audio_options,
410                                                         i));
411
412                                 auto it = options_.begin();
413                                 while(it != options_.end())
414                                 {
415                                         if(video_options.find(it->first) == video_options.end() || audio_options.find(it->first) == audio_options.end())
416                                                 it = options_.erase(it);
417                                         else
418                                                 ++it;
419                                 }
420                         }
421
422                         // Output
423                         {
424                                 AVDictionary* av_opts = nullptr;
425
426                                 to_dict(
427                                         &av_opts,
428                                         std::move(options_));
429
430                                 CASPAR_SCOPE_EXIT
431                                 {
432                                         av_dict_free(&av_opts);
433                                 };
434
435                                 if (!(oc_->oformat->flags & AVFMT_NOFILE))
436                                 {
437                                         FF(avio_open2(
438                                                 &oc_->pb,
439                                                 full_path_.string().c_str(),
440                                                 AVIO_FLAG_WRITE,
441                                                 &oc_->interrupt_callback,
442                                                 &av_opts));
443                                 }
444
445                                 FF(avformat_write_header(
446                                         oc_.get(),
447                                         &av_opts));
448
449                                 options_ = to_map(av_opts);
450                         }
451
452                         // Dump Info
453
454                         av_dump_format(
455                                 oc_.get(),
456                                 0,
457                                 oc_->filename,
458                                 1);
459
460                         for (const auto& option : options_)
461                         {
462                                 CASPAR_LOG(warning)
463                                         << L"Invalid option: -"
464                                         << u16(option.first)
465                                         << L" "
466                                         << u16(option.second);
467                         }
468                 }
469                 catch(...)
470                 {
471                         video_st_.reset();
472                         audio_sts_.clear();
473                         oc_.reset();
474                         throw;
475                 }
476         }
477
478         core::monitor::subject& monitor_output()
479         {
480                 return subject_;
481         }
482
483         void send(core::const_frame frame)
484         {
485                 CASPAR_VERIFY(in_video_format_.format != core::video_format::invalid);
486
487                 auto frame_timer = spl::make_shared<caspar::timer>();
488
489                 std::shared_ptr<void> token(
490                         nullptr,
491                         [this, frame, frame_timer](void*)
492                         {
493                                 tokens_.release();
494                                 current_encoding_delay_ = frame.get_age_millis();
495                                 graph_->set_value("frame-time", frame_timer->elapsed() * in_video_format_.fps * 0.5);
496                         });
497                 tokens_.acquire();
498
499                 video_encoder_executor_.begin_invoke([=]() mutable
500                 {
501                         encode_video(
502                                 frame,
503                                 token);
504                 });
505
506                 audio_encoder_executor_.begin_invoke([=]() mutable
507                 {
508                         encode_audio(
509                                 frame,
510                                 token);
511                 });
512         }
513
514         bool ready_for_frame() const
515         {
516                 return tokens_.permits() > 0;
517         }
518
519         void mark_dropped()
520         {
521                 graph_->set_tag(diagnostics::tag_severity::WARNING, "dropped-frame");
522         }
523
524         std::wstring print() const
525         {
526                 return L"ffmpeg_consumer[" + u16(path_) + L"]";
527         }
528
529         int64_t presentation_frame_age_millis() const
530         {
531                 return current_encoding_delay_;
532         }
533
534 private:
535
536         static int interrupt_cb(void* ctx)
537         {
538                 CASPAR_ASSERT(ctx);
539                 return reinterpret_cast<ffmpeg_consumer*>(ctx)->abort_request_;
540         }
541
542         std::shared_ptr<AVStream> open_encoder(
543                         const AVCodec& codec,
544                         std::map<std::string,
545                         std::string>& options,
546                         int stream_number_for_media_type)
547         {
548                 auto st =
549                         avformat_new_stream(
550                                 oc_.get(),
551                                 &codec);
552
553                 if (!st)
554                         CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info("Could not allocate video-stream.") << boost::errinfo_api_function("avformat_new_stream"));
555
556                 auto enc = st->codec;
557
558                 CASPAR_VERIFY(enc);
559
560                 switch(enc->codec_type)
561                 {
562                         case AVMEDIA_TYPE_VIDEO:
563                         {
564                                 enc->time_base                          = video_graph_out_->inputs[0]->time_base;
565                                 enc->pix_fmt                                    = static_cast<AVPixelFormat>(video_graph_out_->inputs[0]->format);
566                                 enc->sample_aspect_ratio                = st->sample_aspect_ratio = video_graph_out_->inputs[0]->sample_aspect_ratio;
567                                 enc->width                                      = video_graph_out_->inputs[0]->w;
568                                 enc->height                                     = video_graph_out_->inputs[0]->h;
569                                 enc->bit_rate_tolerance         = 400 * 1000000;
570
571                                 break;
572                         }
573                         case AVMEDIA_TYPE_AUDIO:
574                         {
575                                 enc->time_base                          = audio_filter_->get_output_pad_info(stream_number_for_media_type).time_base;
576                                 enc->sample_fmt                         = static_cast<AVSampleFormat>(audio_filter_->get_output_pad_info(stream_number_for_media_type).format);
577                                 enc->sample_rate                                = audio_filter_->get_output_pad_info(stream_number_for_media_type).sample_rate;
578                                 enc->channel_layout                     = audio_filter_->get_output_pad_info(stream_number_for_media_type).channel_layout;
579                                 enc->channels                           = audio_filter_->get_output_pad_info(stream_number_for_media_type).channels;
580
581                                 break;
582                         }
583                 }
584
585                 setup_codec_defaults(*enc);
586
587                 if(oc_->oformat->flags & AVFMT_GLOBALHEADER)
588                         enc->flags |= CODEC_FLAG_GLOBAL_HEADER;
589
590                 static const std::array<std::string, 4> char_id_map = {{"v", "a", "d", "s"}};
591
592                 const auto char_id = char_id_map.at(enc->codec_type);
593
594                 const auto codec_opts =
595                         remove_options(
596                                 options,
597                                 boost::regex("^(" + char_id + "?[^:]+):" + char_id + "$"));
598
599                 AVDictionary* av_codec_opts = nullptr;
600
601                 to_dict(
602                         &av_codec_opts,
603                         options);
604
605                 to_dict(
606                         &av_codec_opts,
607                         codec_opts);
608
609                 options.clear();
610
611                 FF(avcodec_open2(
612                         enc,
613                         &codec,
614                         av_codec_opts ? &av_codec_opts : nullptr));
615
616                 if(av_codec_opts)
617                 {
618                         auto t =
619                                 av_dict_get(
620                                         av_codec_opts,
621                                         "",
622                                          nullptr,
623                                         AV_DICT_IGNORE_SUFFIX);
624
625                         while(t)
626                         {
627                                 options[t->key + (codec_opts.find(t->key) != codec_opts.end() ? ":" + char_id : "")] = t->value;
628
629                                 t = av_dict_get(
630                                                 av_codec_opts,
631                                                 "",
632                                                 t,
633                                                 AV_DICT_IGNORE_SUFFIX);
634                         }
635
636                         av_dict_free(&av_codec_opts);
637                 }
638
639                 if(enc->codec_type == AVMEDIA_TYPE_AUDIO && !(codec.capabilities & CODEC_CAP_VARIABLE_FRAME_SIZE))
640                 {
641                         CASPAR_ASSERT(enc->frame_size > 0);
642                         audio_filter_->set_guaranteed_output_num_samples_per_frame(
643                                         stream_number_for_media_type,
644                                         enc->frame_size);
645                 }
646
647                 return std::shared_ptr<AVStream>(st, [this](AVStream* st)
648                 {
649                         avcodec_close(st->codec);
650                 });
651         }
652
653         void configure_video_filters(
654                         const AVCodec& codec,
655                         std::string filtergraph,
656                         const boost::optional<std::string>& preferred_pix_fmt)
657         {
658                 video_graph_.reset(
659                                 avfilter_graph_alloc(),
660                                 [](AVFilterGraph* p)
661                                 {
662                                         avfilter_graph_free(&p);
663                                 });
664
665                 video_graph_->nb_threads  = std::thread::hardware_concurrency()/2;
666                 video_graph_->thread_type = AVFILTER_THREAD_SLICE;
667
668                 const auto sample_aspect_ratio =
669                         boost::rational<int>(
670                                         in_video_format_.square_width,
671                                         in_video_format_.square_height) /
672                         boost::rational<int>(
673                                         in_video_format_.width,
674                                         in_video_format_.height);
675
676                 const auto vsrc_options = (boost::format("video_size=%1%x%2%:pix_fmt=%3%:time_base=%4%/%5%:pixel_aspect=%6%/%7%:frame_rate=%8%/%9%")
677                         % in_video_format_.width % in_video_format_.height
678                         % AVPixelFormat::AV_PIX_FMT_BGRA
679                         % in_video_format_.duration     % in_video_format_.time_scale
680                         % sample_aspect_ratio.numerator() % sample_aspect_ratio.denominator()
681                         % in_video_format_.time_scale % in_video_format_.duration).str();
682
683                 AVFilterContext* filt_vsrc = nullptr;
684                 FF(avfilter_graph_create_filter(
685                                 &filt_vsrc,
686                                 avfilter_get_by_name("buffer"),
687                                 "ffmpeg_consumer_buffer",
688                                 vsrc_options.c_str(),
689                                 nullptr,
690                                 video_graph_.get()));
691
692                 AVFilterContext* filt_vsink = nullptr;
693                 FF(avfilter_graph_create_filter(
694                                 &filt_vsink,
695                                 avfilter_get_by_name("buffersink"),
696                                 "ffmpeg_consumer_buffersink",
697                                 nullptr,
698                                 nullptr,
699                                 video_graph_.get()));
700
701 #pragma warning (push)
702 #pragma warning (disable : 4245)
703
704                 FF(av_opt_set_int_list(
705                         filt_vsink,
706                         "pix_fmts",
707                         codec.pix_fmts,
708                         -1,
709                         AV_OPT_SEARCH_CHILDREN));
710
711
712 #pragma warning (pop)
713
714                 adjust_video_filter(codec, in_video_format_, filt_vsink, filtergraph);
715
716                 if (preferred_pix_fmt)
717                 {
718                         auto requested_fmt      = av_get_pix_fmt(preferred_pix_fmt->c_str());
719                         auto valid_fmts         = from_terminated_array<AVPixelFormat>(codec.pix_fmts, AVPixelFormat::AV_PIX_FMT_NONE);
720
721                         if (!cpplinq::from(valid_fmts).contains(requested_fmt))
722                                 CASPAR_THROW_EXCEPTION(user_error() << msg_info(*preferred_pix_fmt + " is not supported by codec."));
723
724                         set_pixel_format(filt_vsink, requested_fmt);
725                 }
726
727                 if (in_video_format_.width < 1280)
728                         video_graph_->scale_sws_opts = "out_color_matrix=bt601";
729                 else
730                         video_graph_->scale_sws_opts = "out_color_matrix=bt709";
731
732                 configure_filtergraph(
733                                 *video_graph_,
734                                 filtergraph,
735                                 *filt_vsrc,
736                                 *filt_vsink);
737
738                 video_graph_in_  = filt_vsrc;
739                 video_graph_out_ = filt_vsink;
740
741                 CASPAR_LOG(info)
742                         <<      u16(std::string("\n")
743                                 + avfilter_graph_dump(
744                                                 video_graph_.get(),
745                                                 nullptr));
746         }
747
748         void configure_audio_filters(
749                         const AVCodec& codec,
750                         std::string filtergraph)
751         {
752                 int num_output_pads = 1;
753
754                 if (mono_streams_)
755                 {
756                         num_output_pads = in_channel_layout_.num_channels;
757                 }
758
759                 if (num_output_pads > 1)
760                 {
761                         std::string splitfilter = "[a:0]channelsplit=channel_layout=";
762
763                         splitfilter += (boost::format("0x%|1$x|") % create_channel_layout_bitmask(in_channel_layout_.num_channels)).str();
764
765                         for (int i = 0; i < num_output_pads; ++i)
766                                 splitfilter += "[aout:" + std::to_string(i) + "]";
767
768                         filtergraph = u8(append_filter(u16(filtergraph), u16(splitfilter)));
769                 }
770
771                 std::vector<audio_output_pad> output_pads(
772                                 num_output_pads,
773                                 audio_output_pad(
774                                                 from_terminated_array<int>(                             codec.supported_samplerates,    0),
775                                                 from_terminated_array<AVSampleFormat>(  codec.sample_fmts,                              AVSampleFormat::AV_SAMPLE_FMT_NONE),
776                                                 from_terminated_array<uint64_t>(                codec.channel_layouts,                  static_cast<uint64_t>(0))));
777
778                 audio_filter_.reset(new audio_filter(
779                                 { audio_input_pad(
780                                                 boost::rational<int>(1, in_video_format_.audio_sample_rate),
781                                                 in_video_format_.audio_sample_rate,
782                                                 AVSampleFormat::AV_SAMPLE_FMT_S32,
783                                                 create_channel_layout_bitmask(in_channel_layout_.num_channels)) },
784                                                 output_pads,
785                                                 filtergraph));
786         }
787
788         void configure_filtergraph(
789                         AVFilterGraph& graph,
790                         const std::string& filtergraph,
791                         AVFilterContext& source_ctx,
792                         AVFilterContext& sink_ctx)
793         {
794                 AVFilterInOut* outputs = nullptr;
795                 AVFilterInOut* inputs = nullptr;
796
797                 if(!filtergraph.empty())
798                 {
799                         outputs = avfilter_inout_alloc();
800                         inputs  = avfilter_inout_alloc();
801
802                         try
803                         {
804                                 CASPAR_VERIFY(outputs && inputs);
805
806                                 outputs->name           = av_strdup("in");
807                                 outputs->filter_ctx     = &source_ctx;
808                                 outputs->pad_idx                = 0;
809                                 outputs->next           = nullptr;
810
811                                 inputs->name                    = av_strdup("out");
812                                 inputs->filter_ctx      = &sink_ctx;
813                                 inputs->pad_idx         = 0;
814                                 inputs->next                    = nullptr;
815                         }
816                         catch (...)
817                         {
818                                 avfilter_inout_free(&outputs);
819                                 avfilter_inout_free(&inputs);
820                                 throw;
821                         }
822
823                         FF(avfilter_graph_parse(
824                                         &graph,
825                                         filtergraph.c_str(),
826                                         inputs,
827                                         outputs,
828                                         nullptr));
829                 }
830                 else
831                 {
832                         FF(avfilter_link(
833                                         &source_ctx,
834                                         0,
835                                         &sink_ctx,
836                                         0));
837                 }
838
839                 FF(avfilter_graph_config(
840                                 &graph,
841                                 nullptr));
842         }
843
844         void encode_video(core::const_frame frame_ptr, std::shared_ptr<void> token)
845         {
846                 if(!video_st_)
847                         return;
848
849                 auto enc = video_st_->codec;
850
851                 if(frame_ptr != core::const_frame::empty())
852                 {
853                         auto src_av_frame = create_frame();
854
855                         const auto sample_aspect_ratio =
856                                 boost::rational<int>(
857                                         in_video_format_.square_width,
858                                         in_video_format_.square_height) /
859                                 boost::rational<int>(
860                                         in_video_format_.width,
861                                         in_video_format_.height);
862
863                         src_av_frame->format                                            = AVPixelFormat::AV_PIX_FMT_BGRA;
864                         src_av_frame->width                                             = in_video_format_.width;
865                         src_av_frame->height                                            = in_video_format_.height;
866                         src_av_frame->sample_aspect_ratio.num   = sample_aspect_ratio.numerator();
867                         src_av_frame->sample_aspect_ratio.den   = sample_aspect_ratio.denominator();
868                         src_av_frame->pts                                               = video_pts_;
869                         src_av_frame->interlaced_frame                  = in_video_format_.field_mode != core::field_mode::progressive;
870                         src_av_frame->top_field_first                   = (in_video_format_.field_mode & core::field_mode::upper) == core::field_mode::upper ? 1 : 0;
871
872                         video_pts_ += 1;
873
874                         subject_
875                                         << core::monitor::message("/frame")     % video_pts_
876                                         << core::monitor::message("/path")      % path_
877                                         << core::monitor::message("/fps")       % in_video_format_.fps;
878
879                         FF(av_image_fill_arrays(
880                                 src_av_frame->data,
881                                 src_av_frame->linesize,
882                                 frame_ptr.image_data().begin(),
883                                 static_cast<AVPixelFormat>(src_av_frame->format),
884                                 in_video_format_.width,
885                                 in_video_format_.height,
886                                 1));
887
888                         FF(av_buffersrc_add_frame(
889                                 video_graph_in_,
890                                 src_av_frame.get()));
891                 }
892
893                 int ret = 0;
894
895                 while(ret >= 0)
896                 {
897                         auto filt_frame = create_frame();
898
899                         ret = av_buffersink_get_frame(
900                                 video_graph_out_,
901                                 filt_frame.get());
902
903                         video_encoder_executor_.begin_invoke([=]
904                         {
905                                 if(ret == AVERROR_EOF)
906                                 {
907                                         if(enc->codec->capabilities & CODEC_CAP_DELAY)
908                                         {
909                                                 while(encode_av_frame(
910                                                                 *video_st_,
911                                                                 avcodec_encode_video2,
912                                                                 nullptr, token))
913                                                 {
914                                                         boost::this_thread::yield(); // TODO:
915                                                 }
916                                         }
917                                 }
918                                 else if(ret != AVERROR(EAGAIN))
919                                 {
920                                         FF_RET(ret, "av_buffersink_get_frame");
921
922                                         if (filt_frame->interlaced_frame)
923                                         {
924                                                 if (enc->codec->id == AV_CODEC_ID_MJPEG)
925                                                         enc->field_order = filt_frame->top_field_first ? AV_FIELD_TT : AV_FIELD_BB;
926                                                 else
927                                                         enc->field_order = filt_frame->top_field_first ? AV_FIELD_TB : AV_FIELD_BT;
928                                         }
929                                         else
930                                                 enc->field_order = AV_FIELD_PROGRESSIVE;
931
932                                         filt_frame->quality = enc->global_quality;
933
934                                         if (!enc->me_threshold)
935                                                 filt_frame->pict_type = AV_PICTURE_TYPE_NONE;
936
937                                         encode_av_frame(
938                                                 *video_st_,
939                                                 avcodec_encode_video2,
940                                                 filt_frame,
941                                                 token);
942
943                                         boost::this_thread::yield(); // TODO:
944                                 }
945                         });
946                 }
947         }
948
949         void encode_audio(core::const_frame frame_ptr, std::shared_ptr<void> token)
950         {
951                 if(audio_sts_.empty())
952                         return;
953
954                 if(frame_ptr != core::const_frame::empty())
955                 {
956                         auto src_av_frame = create_frame();
957
958                         src_av_frame->channels                  = in_channel_layout_.num_channels;
959                         src_av_frame->channel_layout            = create_channel_layout_bitmask(in_channel_layout_.num_channels);
960                         src_av_frame->sample_rate               = in_video_format_.audio_sample_rate;
961                         src_av_frame->nb_samples                        = static_cast<int>(frame_ptr.audio_data().size()) / src_av_frame->channels;
962                         src_av_frame->format                            = AV_SAMPLE_FMT_S32;
963                         src_av_frame->pts                               = audio_pts_;
964
965                         audio_pts_ += src_av_frame->nb_samples;
966
967                         FF(av_samples_fill_arrays(
968                                         src_av_frame->extended_data,
969                                         src_av_frame->linesize,
970                                         reinterpret_cast<const std::uint8_t*>(&*frame_ptr.audio_data().begin()),
971                                         src_av_frame->channels,
972                                         src_av_frame->nb_samples,
973                                         static_cast<AVSampleFormat>(src_av_frame->format),
974                                         16));
975
976                         audio_filter_->push(0, src_av_frame);
977                 }
978
979                 for (int pad_id = 0; pad_id < audio_filter_->get_num_output_pads(); ++pad_id)
980                 {
981                         for (auto filt_frame : audio_filter_->poll_all(pad_id))
982                         {
983                                 audio_encoder_executor_.begin_invoke([=]
984                                 {
985                                         encode_av_frame(
986                                                         *audio_sts_.at(pad_id),
987                                                         avcodec_encode_audio2,
988                                                         filt_frame,
989                                                         token);
990
991                                         std::this_thread::yield(); // TODO:
992                                 });
993                         }
994                 }
995
996                 bool eof = frame_ptr == core::const_frame::empty();
997
998                 if (eof)
999                 {
1000                         audio_encoder_executor_.begin_invoke([=]
1001                         {
1002                                 for (int pad_id = 0; pad_id < audio_filter_->get_num_output_pads(); ++pad_id)
1003                                 {
1004                                         auto enc = audio_sts_.at(pad_id)->codec;
1005
1006                                         if (enc->codec->capabilities & CODEC_CAP_DELAY)
1007                                         {
1008                                                 while (encode_av_frame(
1009                                                                 *audio_sts_.at(pad_id),
1010                                                                 avcodec_encode_audio2,
1011                                                                 nullptr,
1012                                                                 token))
1013                                                 {
1014                                                         std::this_thread::yield(); // TODO:
1015                                                 }
1016                                         }
1017                                 }
1018                         });
1019                 }
1020         }
1021
1022         template<typename F>
1023         bool encode_av_frame(
1024                         AVStream& st,
1025                         const F& func,
1026                         const std::shared_ptr<AVFrame>& src_av_frame,
1027                         std::shared_ptr<void> token)
1028         {
1029                 AVPacket pkt = {};
1030                 av_init_packet(&pkt);
1031
1032                 int got_packet = 0;
1033
1034                 FF(func(
1035                         st.codec,
1036                         &pkt,
1037                         src_av_frame.get(),
1038                         &got_packet));
1039
1040                 if(!got_packet || pkt.size <= 0)
1041                         return false;
1042
1043                 pkt.stream_index = st.index;
1044
1045                 if (pkt.pts != AV_NOPTS_VALUE)
1046                 {
1047                         pkt.pts =
1048                                 av_rescale_q(
1049                                         pkt.pts,
1050                                         st.codec->time_base,
1051                                         st.time_base);
1052                 }
1053
1054                 if (pkt.dts != AV_NOPTS_VALUE)
1055                 {
1056                         pkt.dts =
1057                                 av_rescale_q(
1058                                         pkt.dts,
1059                                         st.codec->time_base,
1060                                         st.time_base);
1061                 }
1062
1063                 pkt.duration =
1064                         static_cast<int>(
1065                                 av_rescale_q(
1066                                         pkt.duration,
1067                                         st.codec->time_base, st.time_base));
1068
1069                 write_packet(
1070                         std::shared_ptr<AVPacket>(
1071                                 new AVPacket(pkt),
1072                                 [](AVPacket* p)
1073                                 {
1074                                         av_free_packet(p);
1075                                         delete p;
1076                                 }), token);
1077
1078                 return true;
1079         }
1080
1081         void write_packet(
1082                         const std::shared_ptr<AVPacket>& pkt_ptr,
1083                         std::shared_ptr<void> token)
1084         {
1085                 write_executor_.begin_invoke([this, pkt_ptr, token]() mutable
1086                 {
1087                         FF(av_interleaved_write_frame(
1088                                 oc_.get(),
1089                                 pkt_ptr.get()));
1090                 });
1091         }
1092
1093         template<typename T>
1094         static boost::optional<T> try_remove_arg(
1095                         std::map<std::string, std::string>& options,
1096                         const boost::regex& expr)
1097         {
1098                 for(auto it = options.begin(); it != options.end(); ++it)
1099                 {
1100                         if(boost::regex_search(it->first, expr))
1101                         {
1102                                 auto arg = it->second;
1103                                 options.erase(it);
1104                                 return boost::lexical_cast<T>(arg);
1105                         }
1106                 }
1107
1108                 return boost::optional<T>();
1109         }
1110
1111         static std::map<std::string, std::string> remove_options(
1112                         std::map<std::string, std::string>& options,
1113                         const boost::regex& expr)
1114         {
1115                 std::map<std::string, std::string> result;
1116
1117                 auto it = options.begin();
1118                 while(it != options.end())
1119                 {
1120                         boost::smatch what;
1121                         if(boost::regex_search(it->first, what, expr))
1122                         {
1123                                 result[
1124                                         what.size() > 0 && what[1].matched
1125                                                 ? what[1].str()
1126                                                 : it->first] = it->second;
1127                                 it = options.erase(it);
1128                         }
1129                         else
1130                                 ++it;
1131                 }
1132
1133                 return result;
1134         }
1135
1136         static void to_dict(AVDictionary** dest, const std::map<std::string, std::string>& c)
1137         {
1138                 for (const auto& entry : c)
1139                 {
1140                         av_dict_set(
1141                                 dest,
1142                                 entry.first.c_str(),
1143                                 entry.second.c_str(), 0);
1144                 }
1145         }
1146
1147         static std::map<std::string, std::string> to_map(AVDictionary* dict)
1148         {
1149                 std::map<std::string, std::string> result;
1150
1151                 for(auto t = dict
1152                                 ? av_dict_get(
1153                                         dict,
1154                                         "",
1155                                         nullptr,
1156                                         AV_DICT_IGNORE_SUFFIX)
1157                                 : nullptr;
1158                         t;
1159                         t = av_dict_get(
1160                                 dict,
1161                                 "",
1162                                 t,
1163                                 AV_DICT_IGNORE_SUFFIX))
1164                 {
1165                         result[t->key] = t->value;
1166                 }
1167
1168                 return result;
1169         }
1170 };
1171
1172 int crc16(const std::string& str)
1173 {
1174         boost::crc_16_type result;
1175
1176         result.process_bytes(str.data(), str.length());
1177
1178         return result.checksum();
1179 }
1180
1181 struct ffmpeg_consumer_proxy : public core::frame_consumer
1182 {
1183         const std::string                                       path_;
1184         const std::string                                       options_;
1185         const bool                                                      separate_key_;
1186         const bool                                                      mono_streams_;
1187         const bool                                                      compatibility_mode_;
1188         int                                                                     consumer_index_offset_;
1189
1190         std::unique_ptr<ffmpeg_consumer>        consumer_;
1191         std::unique_ptr<ffmpeg_consumer>        key_only_consumer_;
1192
1193 public:
1194
1195         ffmpeg_consumer_proxy(const std::string& path, const std::string& options, bool separate_key, bool mono_streams, bool compatibility_mode)
1196                 : path_(path)
1197                 , options_(options)
1198                 , separate_key_(separate_key)
1199                 , mono_streams_(mono_streams)
1200                 , compatibility_mode_(compatibility_mode)
1201                 , consumer_index_offset_(crc16(path))
1202         {
1203         }
1204
1205         void initialize(const core::video_format_desc& format_desc, const core::audio_channel_layout& channel_layout, int) override
1206         {
1207                 if (consumer_)
1208                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Cannot reinitialize ffmpeg-consumer."));
1209
1210                 consumer_.reset(new ffmpeg_consumer(path_, options_, mono_streams_));
1211                 consumer_->initialize(format_desc, channel_layout);
1212
1213                 if (separate_key_)
1214                 {
1215                         boost::filesystem::path fill_file(path_);
1216                         auto without_extension = u16(fill_file.parent_path().string() + "/" + fill_file.stem().string());
1217                         auto key_file = without_extension + L"_A" + u16(fill_file.extension().string());
1218
1219                         key_only_consumer_.reset(new ffmpeg_consumer(u8(key_file), options_, mono_streams_));
1220                         key_only_consumer_->initialize(format_desc, channel_layout);
1221                 }
1222         }
1223
1224         int64_t presentation_frame_age_millis() const override
1225         {
1226                 return consumer_ ? static_cast<int64_t>(consumer_->presentation_frame_age_millis()) : 0;
1227         }
1228
1229         std::future<bool> send(core::const_frame frame) override
1230         {
1231                 bool ready_for_frame = consumer_->ready_for_frame();
1232
1233                 if (ready_for_frame && separate_key_)
1234                         ready_for_frame = ready_for_frame && key_only_consumer_->ready_for_frame();
1235
1236                 if (ready_for_frame)
1237                 {
1238                         consumer_->send(frame);
1239
1240                         if (separate_key_)
1241                                 key_only_consumer_->send(frame.key_only());
1242                 }
1243                 else
1244                 {
1245                         consumer_->mark_dropped();
1246
1247                         if (separate_key_)
1248                                 key_only_consumer_->mark_dropped();
1249                 }
1250
1251                 return make_ready_future(true);
1252         }
1253
1254         std::wstring print() const override
1255         {
1256                 return consumer_ ? consumer_->print() : L"[ffmpeg_consumer]";
1257         }
1258
1259         std::wstring name() const override
1260         {
1261                 return L"ffmpeg";
1262         }
1263
1264         boost::property_tree::wptree info() const override
1265         {
1266                 boost::property_tree::wptree info;
1267
1268                 info.add(L"type",                       L"ffmpeg");
1269                 info.add(L"path",                       u16(path_));
1270                 info.add(L"separate_key",       separate_key_);
1271                 info.add(L"mono_streams",       mono_streams_);
1272
1273                 return info;
1274         }
1275
1276         bool has_synchronization_clock() const override
1277         {
1278                 return false;
1279         }
1280
1281         int buffer_depth() const override
1282         {
1283                 return -1;
1284         }
1285
1286         int index() const override
1287         {
1288                 return compatibility_mode_ ? 200 : 100000 + consumer_index_offset_;
1289         }
1290
1291         core::monitor::subject& monitor_output() override
1292         {
1293                 return consumer_->monitor_output();
1294         }
1295 };
1296
1297 void describe_ffmpeg_consumer(core::help_sink& sink, const core::help_repository& repo)
1298 {
1299         sink.short_description(L"For streaming/recording the contents of a channel using FFmpeg.");
1300         sink.syntax(L"FILE,STREAM [filename:string],[url:string] {-[ffmpeg_param1:string] [value1:string] {-[ffmpeg_param2:string] [value2:string] {...}}} {[separate_key:SEPARATE_KEY]} {[mono_streams:MONO_STREAMS]}");
1301         sink.para()->text(L"For recording or streaming the contents of a channel using FFmpeg");
1302         sink.definitions()
1303                 ->item(L"filename",                     L"The filename under the media folder including the extension (decides which kind of container format that will be used).")
1304                 ->item(L"url",                          L"If the filename is given in the form of an URL a network stream will be created instead of a file on disk.")
1305                 ->item(L"ffmpeg_paramX",                L"A parameter supported by FFmpeg. For example vcodec or acodec etc.")
1306                 ->item(L"separate_key",         L"If defined will create two files simultaneously -- One for fill and one for key (_A will be appended).")
1307                 ->item(L"mono_streams",         L"If defined every audio channel will be written to its own audio stream.");
1308         sink.para()->text(L"Examples:");
1309         sink.example(L">> ADD 1 FILE output.mov -vcodec dnxhd");
1310         sink.example(L">> ADD 1 FILE output.mov -vcodec prores");
1311         sink.example(L">> ADD 1 FILE output.mov -vcodec dvvideo");
1312         sink.example(L">> ADD 1 FILE output.mov -vcodec libx264 -preset ultrafast -tune fastdecode -crf 25");
1313         sink.example(L">> ADD 1 FILE output.mov -vcodec dnxhd SEPARATE_KEY", L"for creating output.mov with fill and output_A.mov with key/alpha");
1314         sink.example(L">> ADD 1 FILE output.mxf -vcodec dnxhd MONO_STREAMS", L"for creating output.mxf with every audio channel encoded in its own mono stream.");
1315         sink.example(L">> ADD 1 STREAM udp://<client_ip_address>:9250 -format mpegts -vcodec libx264 -crf 25 -tune zerolatency -preset ultrafast",
1316                 L"for streaming over UDP instead of creating a local file.");
1317 }
1318
1319 spl::shared_ptr<core::frame_consumer> create_ffmpeg_consumer(
1320                 const std::vector<std::wstring>& params, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
1321 {
1322         if (params.size() < 1 || (!boost::iequals(params.at(0), L"STREAM") && !boost::iequals(params.at(0), L"FILE")))
1323                 return core::frame_consumer::empty();
1324
1325         auto params2                    = params;
1326         bool separate_key               = get_and_consume_flag(L"SEPARATE_KEY", params2);
1327         bool mono_streams               = get_and_consume_flag(L"MONO_STREAMS", params2);
1328         auto compatibility_mode = boost::iequals(params.at(0), L"FILE");
1329         auto path                               = u8(params2.size() > 1 ? params2.at(1) : L"");
1330
1331         // remove FILE or STREAM
1332         params2.erase(params2.begin());
1333
1334         // remove path
1335         if (!path.empty())
1336                 params2.erase(params2.begin());
1337
1338         // join only the args
1339         auto args                               = u8(boost::join(params2, L" "));
1340
1341         return spl::make_shared<ffmpeg_consumer_proxy>(path, args, separate_key, mono_streams, compatibility_mode);
1342 }
1343
1344 spl::shared_ptr<core::frame_consumer> create_preconfigured_ffmpeg_consumer(
1345                 const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
1346 {
1347         return spl::make_shared<ffmpeg_consumer_proxy>(
1348                         u8(ptree_get<std::wstring>(ptree, L"path")),
1349                         u8(ptree.get<std::wstring>(L"args", L"")),
1350                         ptree.get<bool>(L"separate-key", false),
1351                         ptree.get<bool>(L"mono-streams", false),
1352                         false);
1353 }
1354
1355 }}