]> git.sesse.net Git - casparcg/blob - modules/ffmpeg/consumer/streaming_consumer.cpp
[streaming_consumer] Started the process of retiring ffmpeg_consumer so that streamin...
[casparcg] / modules / ffmpeg / consumer / streaming_consumer.cpp
1 #include "../StdAfx.h"
2
3 #include "ffmpeg_consumer.h"
4
5 #include "../ffmpeg_error.h"
6 #include "../producer/util/util.h"
7
8 #include <common/except.h>
9 #include <common/executor.h>
10 #include <common/assert.h>
11 #include <common/utf.h>
12 #include <common/future.h>
13 #include <common/diagnostics/graph.h>
14 #include <common/env.h>
15 #include <common/scope_exit.h>
16 #include <common/ptree.h>
17 #include <common/param.h>
18 #include <common/semaphore.h>
19
20 #include <core/consumer/frame_consumer.h>
21 #include <core/frame/frame.h>
22 #include <core/frame/audio_channel_layout.h>
23 #include <core/video_format.h>
24 #include <core/monitor/monitor.h>
25 #include <core/help/help_repository.h>
26 #include <core/help/help_sink.h>
27
28 #include <boost/noncopyable.hpp>
29 #include <boost/rational.hpp>
30 #include <boost/format.hpp>
31 #include <boost/algorithm/string/predicate.hpp>
32 #include <boost/property_tree/ptree.hpp>
33
34 #pragma warning(push)
35 #pragma warning(disable: 4244)
36 #pragma warning(disable: 4245)
37 #include <boost/crc.hpp>
38 #pragma warning(pop)
39
40 #include <tbb/atomic.h>
41 #include <tbb/concurrent_queue.h>
42 #include <tbb/parallel_invoke.h>
43 #include <tbb/parallel_for.h>
44
45 #include <numeric>
46
47 #pragma warning(push)
48 #pragma warning(disable: 4244)
49
50 extern "C"
51 {
52         #define __STDC_CONSTANT_MACROS
53         #define __STDC_LIMIT_MACROS
54         #include <libavformat/avformat.h>
55         #include <libavcodec/avcodec.h>
56         #include <libavutil/avutil.h>
57         #include <libavutil/frame.h>
58         #include <libavutil/opt.h>
59         #include <libavutil/imgutils.h>
60         #include <libavutil/parseutils.h>
61         #include <libavfilter/avfilter.h>
62         #include <libavfilter/buffersink.h>
63         #include <libavfilter/buffersrc.h>
64 }
65
66 #pragma warning(pop)
67
68 namespace caspar { namespace ffmpeg { namespace {
69
70 class ffmpeg_consumer
71 {
72 public:
73         // Static Members
74
75 private:
76         const spl::shared_ptr<diagnostics::graph>       graph_;
77         core::monitor::subject                                          subject_;
78         boost::filesystem::path                                         path_;
79
80         std::map<std::string, std::string>                      options_;
81
82         core::video_format_desc                                         in_video_format_;
83         core::audio_channel_layout                                      in_channel_layout_                      = core::audio_channel_layout::invalid();
84
85         std::shared_ptr<AVFormatContext>                        oc_;
86         tbb::atomic<bool>                                                       abort_request_;
87
88         std::shared_ptr<AVStream>                                       video_st_;
89         std::shared_ptr<AVStream>                                       audio_st_;
90
91         std::int64_t                                                            video_pts_;
92         std::int64_t                                                            audio_pts_;
93
94     AVFilterContext*                                                    audio_graph_in_;
95     AVFilterContext*                                                    audio_graph_out_;
96     std::shared_ptr<AVFilterGraph>                              audio_graph_;
97
98     AVFilterContext*                                                    video_graph_in_;
99     AVFilterContext*                                                    video_graph_out_;
100     std::shared_ptr<AVFilterGraph>                              video_graph_;
101
102         executor                                                                        video_encoder_executor_;
103         executor                                                                        audio_encoder_executor_;
104
105         semaphore                                                                       tokens_                                         { 0 };
106
107         tbb::atomic<int64_t>                                            current_encoding_delay_;
108
109         executor                                                                        write_executor_;
110
111 public:
112
113         ffmpeg_consumer(
114                         std::string path,
115                         std::string options)
116                 : path_(path)
117                 , video_pts_(0)
118                 , audio_pts_(0)
119                 , audio_encoder_executor_(print() + L" audio_encoder")
120                 , video_encoder_executor_(print() + L" video_encoder")
121                 , write_executor_(print() + L" io")
122         {
123                 abort_request_ = false;
124                 current_encoding_delay_ = 0;
125
126                 for(auto it =
127                                 boost::sregex_iterator(
128                                         options.begin(),
129                                         options.end(),
130                                         boost::regex("-(?<NAME>[^-\\s]+)(\\s+(?<VALUE>[^\\s]+))?"));
131                         it != boost::sregex_iterator();
132                         ++it)
133                 {
134                         options_[(*it)["NAME"].str()] = (*it)["VALUE"].matched ? (*it)["VALUE"].str() : "";
135                 }
136
137         if (options_.find("threads") == options_.end())
138             options_["threads"] = "auto";
139
140                 tokens_.release(
141                         std::max(
142                                 1,
143                                 try_remove_arg<int>(
144                                         options_,
145                                         boost::regex("tokens")).get_value_or(2)));
146         }
147
148         ~ffmpeg_consumer()
149         {
150                 if(oc_)
151                 {
152                         video_encoder_executor_.begin_invoke([&] { encode_video(core::const_frame::empty(), nullptr); });
153                         audio_encoder_executor_.begin_invoke([&] { encode_audio(core::const_frame::empty(), nullptr); });
154
155                         video_encoder_executor_.stop();
156                         audio_encoder_executor_.stop();
157                         video_encoder_executor_.join();
158                         audio_encoder_executor_.join();
159
160                         video_graph_.reset();
161                         audio_graph_.reset();
162                         video_st_.reset();
163                         audio_st_.reset();
164
165                         write_packet(nullptr, nullptr);
166
167                         write_executor_.stop();
168                         write_executor_.join();
169
170                         FF(av_write_trailer(oc_.get()));
171
172                         if (!(oc_->oformat->flags & AVFMT_NOFILE) && oc_->pb)
173                                 avio_close(oc_->pb);
174
175                         oc_.reset();
176                 }
177         }
178
179         void initialize(
180                         const core::video_format_desc& format_desc,
181                         const core::audio_channel_layout& channel_layout)
182         {
183                 try
184                 {
185                         static boost::regex prot_exp("^.+:.*" );
186
187                         if(!boost::regex_match(
188                                         path_.string(),
189                                         prot_exp))
190                         {
191                                 if(!path_.is_complete())
192                                 {
193                                         path_ =
194                                                 u8(
195                                                         env::media_folder()) +
196                                                         path_.string();
197                                 }
198
199                                 if(boost::filesystem::exists(path_))
200                                         boost::filesystem::remove(path_);
201                         }
202
203                         graph_->set_color("frame-time", diagnostics::color(0.1f, 1.0f, 0.1f));
204                         graph_->set_color("dropped-frame", diagnostics::color(0.3f, 0.6f, 0.3f));
205                         graph_->set_text(print());
206                         diagnostics::register_graph(graph_);
207
208                         const auto oformat_name =
209                                 try_remove_arg<std::string>(
210                                         options_,
211                                         boost::regex("^f|format$"));
212
213                         AVFormatContext* oc;
214
215                         FF(avformat_alloc_output_context2(
216                                 &oc,
217                                 nullptr,
218                                 oformat_name && !oformat_name->empty() ? oformat_name->c_str() : nullptr,
219                                 path_.string().c_str()));
220
221                         oc_.reset(
222                                 oc,
223                                 avformat_free_context);
224
225                         CASPAR_VERIFY(oc_->oformat);
226
227                         oc_->interrupt_callback.callback = ffmpeg_consumer::interrupt_cb;
228                         oc_->interrupt_callback.opaque   = this;
229
230                         CASPAR_VERIFY(format_desc.format != core::video_format::invalid);
231
232                         in_video_format_ = format_desc;
233                         in_channel_layout_ = channel_layout;
234
235                         CASPAR_VERIFY(oc_->oformat);
236
237                         const auto video_codec_name =
238                                 try_remove_arg<std::string>(
239                                         options_,
240                                         boost::regex("^c:v|codec:v|vcodec$"));
241
242                         const auto video_codec =
243                                 video_codec_name
244                                         ? avcodec_find_encoder_by_name(video_codec_name->c_str())
245                                         : avcodec_find_encoder(oc_->oformat->video_codec);
246
247                         const auto audio_codec_name =
248                                 try_remove_arg<std::string>(
249                                         options_,
250                                          boost::regex("^c:a|codec:a|acodec$"));
251
252                         const auto audio_codec =
253                                 audio_codec_name
254                                         ? avcodec_find_encoder_by_name(audio_codec_name->c_str())
255                                         : avcodec_find_encoder(oc_->oformat->audio_codec);
256
257                         if (!video_codec)
258                                 CASPAR_THROW_EXCEPTION(user_error() << msg_info(
259                                                 "Failed to find video codec " + (video_codec_name
260                                                                 ? *video_codec_name
261                                                                 : "with id " + boost::lexical_cast<std::string>(
262                                                                                 oc_->oformat->video_codec))));
263                         if (!audio_codec)
264                                 CASPAR_THROW_EXCEPTION(user_error() << msg_info(
265                                                 "Failed to find audio codec " + (audio_codec_name
266                                                                 ? *audio_codec_name
267                                                                 : "with id " + boost::lexical_cast<std::string>(
268                                                                                 oc_->oformat->audio_codec))));
269
270                         // Filters
271
272                         {
273                                 configure_video_filters(
274                                         *video_codec,
275                                         try_remove_arg<std::string>(options_,
276                                         boost::regex("vf|f:v|filter:v")).get_value_or(""));
277
278                                 configure_audio_filters(
279                                         *audio_codec,
280                                         try_remove_arg<std::string>(options_,
281                                         boost::regex("af|f:a|filter:a")).get_value_or(""));
282                         }
283
284                         // Encoders
285
286                         {
287                                 auto video_options = options_;
288                                 auto audio_options = options_;
289
290                                 video_st_ = open_encoder(
291                                         *video_codec,
292                                         video_options);
293
294                                 audio_st_ = open_encoder(
295                                         *audio_codec,
296                                         audio_options);
297
298                                 auto it = options_.begin();
299                                 while(it != options_.end())
300                                 {
301                                         if(video_options.find(it->first) == video_options.end() || audio_options.find(it->first) == audio_options.end())
302                                                 it = options_.erase(it);
303                                         else
304                                                 ++it;
305                                 }
306                         }
307
308                         // Output
309                         {
310                                 AVDictionary* av_opts = nullptr;
311
312                                 to_dict(
313                                         &av_opts,
314                                         std::move(options_));
315
316                                 CASPAR_SCOPE_EXIT
317                                 {
318                                         av_dict_free(&av_opts);
319                                 };
320
321                                 if (!(oc_->oformat->flags & AVFMT_NOFILE))
322                                 {
323                                         FF(avio_open2(
324                                                 &oc_->pb,
325                                                 path_.string().c_str(),
326                                                 AVIO_FLAG_WRITE,
327                                                 &oc_->interrupt_callback,
328                                                 &av_opts));
329                                 }
330
331                                 FF(avformat_write_header(
332                                         oc_.get(),
333                                         &av_opts));
334
335                                 options_ = to_map(av_opts);
336                         }
337
338                         // Dump Info
339
340                         av_dump_format(
341                                 oc_.get(),
342                                 0,
343                                 oc_->filename,
344                                 1);
345
346                         for (const auto& option : options_)
347                         {
348                                 CASPAR_LOG(warning)
349                                         << L"Invalid option: -"
350                                         << u16(option.first)
351                                         << L" "
352                                         << u16(option.second);
353                         }
354                 }
355                 catch(...)
356                 {
357                         video_st_.reset();
358                         audio_st_.reset();
359                         oc_.reset();
360                         throw;
361                 }
362         }
363
364         core::monitor::subject& monitor_output()
365         {
366                 return subject_;
367         }
368
369         void send(core::const_frame frame)
370         {
371                 CASPAR_VERIFY(in_video_format_.format != core::video_format::invalid);
372
373                 auto frame_timer = spl::make_shared<caspar::timer>();
374
375                 std::shared_ptr<void> token(
376                         nullptr,
377                         [this, frame, frame_timer](void*)
378                         {
379                                 tokens_.release();
380                                 current_encoding_delay_ = frame.get_age_millis();
381                                 graph_->set_value("frame-time", frame_timer->elapsed() * in_video_format_.fps * 0.5);
382                         });
383                 tokens_.acquire();
384
385                 video_encoder_executor_.begin_invoke([=]() mutable
386                 {
387                         encode_video(
388                                 frame,
389                                 token);
390                 });
391
392                 audio_encoder_executor_.begin_invoke([=]() mutable
393                 {
394                         encode_audio(
395                                 frame,
396                                 token);
397                 });
398         }
399
400         bool ready_for_frame() const
401         {
402                 return tokens_.permits() > 0;
403         }
404
405         void mark_dropped()
406         {
407                 graph_->set_tag(diagnostics::tag_severity::WARNING, "dropped-frame");
408         }
409
410         std::wstring print() const
411         {
412                 return L"ffmpeg_consumer[" + u16(path_.string()) + L"]";
413         }
414
415         int64_t presentation_frame_age_millis() const
416         {
417                 return current_encoding_delay_;
418         }
419
420 private:
421
422         static int interrupt_cb(void* ctx)
423         {
424                 CASPAR_ASSERT(ctx);
425                 return reinterpret_cast<ffmpeg_consumer*>(ctx)->abort_request_;
426         }
427
428         std::shared_ptr<AVStream> open_encoder(
429                         const AVCodec& codec,
430                         std::map<std::string,
431                         std::string>& options)
432         {
433                 auto st =
434                         avformat_new_stream(
435                                 oc_.get(),
436                                 &codec);
437
438                 if (!st)
439                         CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info("Could not allocate video-stream.") << boost::errinfo_api_function("av_new_stream"));
440
441                 auto enc = st->codec;
442
443                 CASPAR_VERIFY(enc);
444
445                 switch(enc->codec_type)
446                 {
447                         case AVMEDIA_TYPE_VIDEO:
448                         {
449                                 enc->time_base                          = video_graph_out_->inputs[0]->time_base;
450                                 enc->pix_fmt                                    = static_cast<AVPixelFormat>(video_graph_out_->inputs[0]->format);
451                                 enc->sample_aspect_ratio                = st->sample_aspect_ratio = video_graph_out_->inputs[0]->sample_aspect_ratio;
452                                 enc->width                                      = video_graph_out_->inputs[0]->w;
453                                 enc->height                                     = video_graph_out_->inputs[0]->h;
454                                 enc->bit_rate_tolerance         = 400 * 1000000;
455
456                                 break;
457                         }
458                         case AVMEDIA_TYPE_AUDIO:
459                         {
460                                 enc->time_base                          = audio_graph_out_->inputs[0]->time_base;
461                                 enc->sample_fmt                         = static_cast<AVSampleFormat>(audio_graph_out_->inputs[0]->format);
462                                 enc->sample_rate                                = audio_graph_out_->inputs[0]->sample_rate;
463                                 enc->channel_layout                     = audio_graph_out_->inputs[0]->channel_layout;
464                                 enc->channels                           = audio_graph_out_->inputs[0]->channels;
465
466                                 break;
467                         }
468                 }
469
470                 if(oc_->oformat->flags & AVFMT_GLOBALHEADER)
471                         enc->flags |= CODEC_FLAG_GLOBAL_HEADER;
472
473                 static const std::array<std::string, 4> char_id_map = {{"v", "a", "d", "s"}};
474
475                 const auto char_id = char_id_map.at(enc->codec_type);
476
477                 const auto codec_opts =
478                         remove_options(
479                                 options,
480                                 boost::regex("^(" + char_id + "?[^:]+):" + char_id + "$"));
481
482                 AVDictionary* av_codec_opts = nullptr;
483
484                 to_dict(
485                         &av_codec_opts,
486                         options);
487
488                 to_dict(
489                         &av_codec_opts,
490                         codec_opts);
491
492                 options.clear();
493
494                 FF(avcodec_open2(
495                         enc,
496                         &codec,
497                         av_codec_opts ? &av_codec_opts : nullptr));
498
499                 if(av_codec_opts)
500                 {
501                         auto t =
502                                 av_dict_get(
503                                         av_codec_opts,
504                                         "",
505                                          nullptr,
506                                         AV_DICT_IGNORE_SUFFIX);
507
508                         while(t)
509                         {
510                                 options[t->key + (codec_opts.find(t->key) != codec_opts.end() ? ":" + char_id : "")] = t->value;
511
512                                 t = av_dict_get(
513                                                 av_codec_opts,
514                                                 "",
515                                                 t,
516                                                 AV_DICT_IGNORE_SUFFIX);
517                         }
518
519                         av_dict_free(&av_codec_opts);
520                 }
521
522                 if(enc->codec_type == AVMEDIA_TYPE_AUDIO && !(codec.capabilities & CODEC_CAP_VARIABLE_FRAME_SIZE))
523                 {
524                         CASPAR_ASSERT(enc->frame_size > 0);
525                         av_buffersink_set_frame_size(audio_graph_out_,
526                                                                                  enc->frame_size);
527                 }
528
529                 return std::shared_ptr<AVStream>(st, [this](AVStream* st)
530                 {
531                         avcodec_close(st->codec);
532                 });
533         }
534
535         void configure_video_filters(
536                         const AVCodec& codec,
537                         const std::string& filtergraph)
538         {
539                 video_graph_.reset(
540                                 avfilter_graph_alloc(),
541                                 [](AVFilterGraph* p)
542                                 {
543                                         avfilter_graph_free(&p);
544                                 });
545
546                 video_graph_->nb_threads  = boost::thread::hardware_concurrency()/2;
547                 video_graph_->thread_type = AVFILTER_THREAD_SLICE;
548
549                 const auto sample_aspect_ratio =
550                         boost::rational<int>(
551                                         in_video_format_.square_width,
552                                         in_video_format_.square_height) /
553                         boost::rational<int>(
554                                         in_video_format_.width,
555                                         in_video_format_.height);
556
557                 const auto vsrc_options = (boost::format("video_size=%1%x%2%:pix_fmt=%3%:time_base=%4%/%5%:pixel_aspect=%6%/%7%:frame_rate=%8%/%9%")
558                         % in_video_format_.width % in_video_format_.height
559                         % AV_PIX_FMT_BGRA
560                         % in_video_format_.duration     % in_video_format_.time_scale
561                         % sample_aspect_ratio.numerator() % sample_aspect_ratio.denominator()
562                         % in_video_format_.time_scale % in_video_format_.duration).str();
563
564                 AVFilterContext* filt_vsrc = nullptr;
565                 FF(avfilter_graph_create_filter(
566                                 &filt_vsrc,
567                                 avfilter_get_by_name("buffer"),
568                                 "ffmpeg_consumer_buffer",
569                                 vsrc_options.c_str(),
570                                 nullptr,
571                                 video_graph_.get()));
572
573                 AVFilterContext* filt_vsink = nullptr;
574                 FF(avfilter_graph_create_filter(
575                                 &filt_vsink,
576                                 avfilter_get_by_name("buffersink"),
577                                 "ffmpeg_consumer_buffersink",
578                                 nullptr,
579                                 nullptr,
580                                 video_graph_.get()));
581
582 #pragma warning (push)
583 #pragma warning (disable : 4245)
584
585                 FF(av_opt_set_int_list(
586                                 filt_vsink,
587                                 "pix_fmts",
588                                 codec.pix_fmts,
589                                 -1,
590                                 AV_OPT_SEARCH_CHILDREN));
591
592 #pragma warning (pop)
593
594                 configure_filtergraph(
595                                 *video_graph_,
596                                 filtergraph,
597                                 *filt_vsrc,
598                                 *filt_vsink);
599
600                 video_graph_in_  = filt_vsrc;
601                 video_graph_out_ = filt_vsink;
602
603                 CASPAR_LOG(info)
604                         <<      u16(std::string("\n")
605                                 + avfilter_graph_dump(
606                                                 video_graph_.get(),
607                                                 nullptr));
608         }
609
610         void configure_audio_filters(
611                         const AVCodec& codec,
612                         const std::string& filtergraph)
613         {
614                 audio_graph_.reset(
615                         avfilter_graph_alloc(),
616                         [](AVFilterGraph* p)
617                         {
618                                 avfilter_graph_free(&p);
619                         });
620
621                 audio_graph_->nb_threads  = boost::thread::hardware_concurrency()/2;
622                 audio_graph_->thread_type = AVFILTER_THREAD_SLICE;
623
624                 const auto asrc_options = (boost::format("sample_rate=%1%:sample_fmt=%2%:channels=%3%:time_base=%4%/%5%:channel_layout=%6%")
625                         % in_video_format_.audio_sample_rate
626                         % av_get_sample_fmt_name(AV_SAMPLE_FMT_S32)
627                         % in_channel_layout_.num_channels
628                         % 1     % in_video_format_.audio_sample_rate
629                         % boost::io::group(
630                                 std::hex,
631                                 std::showbase,
632                                 av_get_default_channel_layout(in_channel_layout_.num_channels))).str();
633
634                 AVFilterContext* filt_asrc = nullptr;
635                 FF(avfilter_graph_create_filter(
636                         &filt_asrc,
637                         avfilter_get_by_name("abuffer"),
638                         "ffmpeg_consumer_abuffer",
639                         asrc_options.c_str(),
640                         nullptr,
641                         audio_graph_.get()));
642
643                 AVFilterContext* filt_asink = nullptr;
644                 FF(avfilter_graph_create_filter(
645                         &filt_asink,
646                         avfilter_get_by_name("abuffersink"),
647                         "ffmpeg_consumer_abuffersink",
648                         nullptr,
649                         nullptr,
650                         audio_graph_.get()));
651
652 #pragma warning (push)
653 #pragma warning (disable : 4245)
654
655                 FF(av_opt_set_int(
656                         filt_asink,
657                         "all_channel_counts",
658                         1,
659                         AV_OPT_SEARCH_CHILDREN));
660
661                 FF(av_opt_set_int_list(
662                         filt_asink,
663                         "sample_fmts",
664                         codec.sample_fmts,
665                         -1,
666                         AV_OPT_SEARCH_CHILDREN));
667
668                 FF(av_opt_set_int_list(
669                         filt_asink,
670                         "channel_layouts",
671                         codec.channel_layouts,
672                         -1,
673                         AV_OPT_SEARCH_CHILDREN));
674
675                 FF(av_opt_set_int_list(
676                         filt_asink,
677                         "sample_rates" ,
678                         codec.supported_samplerates,
679                         -1,
680                         AV_OPT_SEARCH_CHILDREN));
681
682 #pragma warning (pop)
683
684                 configure_filtergraph(
685                         *audio_graph_,
686                         filtergraph,
687                         *filt_asrc,
688                         *filt_asink);
689
690                 audio_graph_in_  = filt_asrc;
691                 audio_graph_out_ = filt_asink;
692
693                 CASPAR_LOG(info)
694                         <<      u16(std::string("\n")
695                                 + avfilter_graph_dump(
696                                         audio_graph_.get(),
697                                         nullptr));
698         }
699
700         void configure_filtergraph(
701                         AVFilterGraph& graph,
702                         const std::string& filtergraph,
703                         AVFilterContext& source_ctx,
704                         AVFilterContext& sink_ctx)
705         {
706                 AVFilterInOut* outputs = nullptr;
707                 AVFilterInOut* inputs = nullptr;
708
709                 try
710                 {
711                         if(!filtergraph.empty())
712                         {
713                                 outputs = avfilter_inout_alloc();
714                                 inputs  = avfilter_inout_alloc();
715
716                                 CASPAR_VERIFY(outputs && inputs);
717
718                                 outputs->name       = av_strdup("in");
719                                 outputs->filter_ctx = &source_ctx;
720                                 outputs->pad_idx    = 0;
721                                 outputs->next       = nullptr;
722
723                                 inputs->name        = av_strdup("out");
724                                 inputs->filter_ctx  = &sink_ctx;
725                                 inputs->pad_idx     = 0;
726                                 inputs->next        = nullptr;
727
728                                 FF(avfilter_graph_parse(
729                                         &graph,
730                                         filtergraph.c_str(),
731                                         inputs,
732                                         outputs,
733                                         nullptr));
734                         }
735                         else
736                         {
737                                 FF(avfilter_link(
738                                         &source_ctx,
739                                         0,
740                                         &sink_ctx,
741                                         0));
742                         }
743
744                         FF(avfilter_graph_config(
745                                 &graph,
746                                 nullptr));
747                 }
748                 catch(...)
749                 {
750                         avfilter_inout_free(&outputs);
751                         avfilter_inout_free(&inputs);
752                         throw;
753                 }
754         }
755
        /**
         * Feeds one channel frame into the video filter graph, then drains every frame
         * the graph can currently deliver onto video_encoder_executor_.
         *
         * @param frame_ptr Frame to encode. core::const_frame::empty() adds nothing to
         *                  the graph; the call then only drains what is already buffered.
         * @param token     Passed through to encode_av_frame() (and on to write_packet())
         *                  by every queued encoder task.
         *
         * Does nothing when the output has no video stream (video_st_ is null).
         */
        void encode_video(core::const_frame frame_ptr, std::shared_ptr<void> token)
        {
                if(!video_st_)
                        return;

                // Captured by value into every encoder task queued below.
                auto enc = video_st_->codec;

                if(frame_ptr != core::const_frame::empty())
                {
                        auto src_av_frame = create_frame();

                        // Sample aspect ratio = (square_width / square_height) / (width / height).
                        const auto sample_aspect_ratio =
                                boost::rational<int>(
                                        in_video_format_.square_width,
                                        in_video_format_.square_height) /
                                boost::rational<int>(
                                        in_video_format_.width,
                                        in_video_format_.height);

                        src_av_frame->format = AV_PIX_FMT_BGRA;
                        src_av_frame->width = in_video_format_.width;
                        src_av_frame->height = in_video_format_.height;
                        src_av_frame->sample_aspect_ratio.num = sample_aspect_ratio.numerator();
                        src_av_frame->sample_aspect_ratio.den = sample_aspect_ratio.denominator();
                        // Video pts counts frames: one tick per submitted frame.
                        src_av_frame->pts = video_pts_;

                        video_pts_ += 1;

                        // Point the AVFrame's planes/linesizes at frame_ptr's BGRA pixels
                        // (alignment 1); av_image_fill_arrays sets pointers, it does not copy.
                        FF(av_image_fill_arrays(
                                src_av_frame->data,
                                src_av_frame->linesize,
                                frame_ptr.image_data().begin(),
                                static_cast<AVPixelFormat>(src_av_frame->format),
                                in_video_format_.width,
                                in_video_format_.height,
                                1));

                        FF(av_buffersrc_add_frame(
                                video_graph_in_,
                                src_av_frame.get()));
                }

                int ret = 0;

                // Pull frames until the sink reports a negative value (EAGAIN, EOF or an
                // error). Every result, including that final one, is queued as a task;
                // the task itself decides what to do with it.
                while(ret >= 0)
                {
                        auto filt_frame = create_frame();

                        ret = av_buffersink_get_frame(
                                video_graph_out_,
                                filt_frame.get());

                        // [=] copies this iteration's ret and filt_frame (plus enc, token, this).
                        video_encoder_executor_.begin_invoke([=]
                        {
                                if(ret == AVERROR_EOF)
                                {
                                        // End of stream: drain packets still held by a delaying encoder.
                                        if(enc->codec->capabilities & CODEC_CAP_DELAY)
                                        {
                                                while(encode_av_frame(
                                                                *video_st_,
                                                                avcodec_encode_video2,
                                                                nullptr, token))
                                                {
                                                        boost::this_thread::yield(); // TODO:
                                                }
                                        }
                                }
                                else if(ret != AVERROR(EAGAIN))
                                {
                                        // Any other negative ret throws here, i.e. on the encoder
                                        // executor rather than in encode_video() itself.
                                        FF_RET(ret, "av_buffersink_get_frame");

                                        // Map the filtered frame's interlacing onto the encoder:
                                        // MJPEG uses TT/BB, every other codec TB/BT.
                                        if (filt_frame->interlaced_frame)
                                        {
                                                if (enc->codec->id == AV_CODEC_ID_MJPEG)
                                                        enc->field_order = filt_frame->top_field_first ? AV_FIELD_TT : AV_FIELD_BB;
                                                else
                                                        enc->field_order = filt_frame->top_field_first ? AV_FIELD_TB : AV_FIELD_BT;
                                        }
                                        else
                                                enc->field_order = AV_FIELD_PROGRESSIVE;

                                        filt_frame->quality = enc->global_quality;

                                        // With me_threshold unset, leave picture type selection to the encoder.
                                        if (!enc->me_threshold)
                                                filt_frame->pict_type = AV_PICTURE_TYPE_NONE;

                                        encode_av_frame(
                                                *video_st_,
                                                avcodec_encode_video2,
                                                filt_frame,
                                                token);

                                        boost::this_thread::yield(); // TODO:
                                }
                                // ret == AVERROR(EAGAIN): the graph needs more input; nothing to do.
                        });
                }
        }
853
        /**
         * Feeds one channel frame's audio into the audio filter graph, then drains every
         * frame the graph can currently deliver onto audio_encoder_executor_.
         *
         * @param frame_ptr Frame whose audio is encoded. core::const_frame::empty() adds
         *                  nothing to the graph; the call then only drains buffered frames.
         * @param token     Passed through to encode_av_frame() (and on to write_packet())
         *                  by every queued encoder task.
         *
         * Does nothing when the output has no audio stream (audio_st_ is null).
         */
        void encode_audio(core::const_frame frame_ptr, std::shared_ptr<void> token)
        {
                if(!audio_st_)
                        return;

                // Captured by value into every encoder task queued below.
                auto enc = audio_st_->codec;

                if(frame_ptr != core::const_frame::empty())
                {
                        auto src_av_frame = create_frame();

                        // Only num_channels is taken from in_channel_layout_; the AVFrame gets
                        // FFmpeg's default layout for that channel count.
                        src_av_frame->channels = in_channel_layout_.num_channels;
                        src_av_frame->channel_layout = av_get_default_channel_layout(in_channel_layout_.num_channels);
                        src_av_frame->sample_rate = in_video_format_.audio_sample_rate;
                        // audio_data() is treated as packed (interleaved) S32 samples for all
                        // channels, so samples per channel = total samples / channels.
                        // NOTE(review): a zero-channel layout would divide by zero here.
                        src_av_frame->nb_samples = static_cast<int>(frame_ptr.audio_data().size()) / src_av_frame->channels;
                        src_av_frame->format = AV_SAMPLE_FMT_S32;
                        // Audio pts counts samples (per channel).
                        src_av_frame->pts = audio_pts_;

                        audio_pts_ += src_av_frame->nb_samples;

                        // Point the AVFrame's data/linesize at the channel frame's samples
                        // (align argument 16); this sets pointers, it does not copy.
                        FF(av_samples_fill_arrays(
                                        src_av_frame->extended_data,
                                        src_av_frame->linesize,
                                        reinterpret_cast<const std::uint8_t*>(&*frame_ptr.audio_data().begin()),
                                        src_av_frame->channels,
                                        src_av_frame->nb_samples,
                                        static_cast<AVSampleFormat>(src_av_frame->format),
                                        16));

                        FF(av_buffersrc_add_frame(
                                        audio_graph_in_,
                                        src_av_frame.get()));
                }

                int ret = 0;

                // Pull frames until the sink reports a negative value (EAGAIN, EOF or an
                // error). Every result, including that final one, is queued as a task.
                while(ret >= 0)
                {
                        auto filt_frame = create_frame();

                        ret = av_buffersink_get_frame(
                                audio_graph_out_,
                                filt_frame.get());

                        // [=] copies this iteration's ret and filt_frame (plus enc, token, this).
                        audio_encoder_executor_.begin_invoke([=]
                        {
                                if(ret == AVERROR_EOF)
                                {
                                        // End of stream: drain packets still held by a delaying encoder.
                                        if(enc->codec->capabilities & CODEC_CAP_DELAY)
                                        {
                                                while(encode_av_frame(
                                                                *audio_st_,
                                                                avcodec_encode_audio2,
                                                                nullptr,
                                                                token))
                                                {
                                                        boost::this_thread::yield(); // TODO:
                                                }
                                        }
                                }
                                else if(ret != AVERROR(EAGAIN))
                                {
                                        // Any other negative ret throws here, on the encoder executor.
                                        FF_RET(
                                                ret,
                                                "av_buffersink_get_frame");

                                        encode_av_frame(
                                                *audio_st_,
                                                avcodec_encode_audio2,
                                                filt_frame,
                                                token);

                                        boost::this_thread::yield(); // TODO:
                                }
                                // ret == AVERROR(EAGAIN): the graph needs more input; nothing to do.
                        });
                }
        }
931
932         template<typename F>
933         bool encode_av_frame(
934                         AVStream& st,
935                         const F& func,
936                         const std::shared_ptr<AVFrame>& src_av_frame,
937                         std::shared_ptr<void> token)
938         {
939                 AVPacket pkt = {};
940                 av_init_packet(&pkt);
941
942                 int got_packet = 0;
943
944                 FF(func(
945                         st.codec,
946                         &pkt,
947                         src_av_frame.get(),
948                         &got_packet));
949
950                 if(!got_packet || pkt.size <= 0)
951                         return false;
952
953                 pkt.stream_index = st.index;
954
955                 if (pkt.pts != AV_NOPTS_VALUE)
956                 {
957                         pkt.pts =
958                                 av_rescale_q(
959                                         pkt.pts,
960                                         st.codec->time_base,
961                                         st.time_base);
962                 }
963
964                 if (pkt.dts != AV_NOPTS_VALUE)
965                 {
966                         pkt.dts =
967                                 av_rescale_q(
968                                         pkt.dts,
969                                         st.codec->time_base,
970                                         st.time_base);
971                 }
972
973                 pkt.duration =
974                         static_cast<int>(
975                                 av_rescale_q(
976                                         pkt.duration,
977                                         st.codec->time_base, st.time_base));
978
979                 write_packet(
980                         std::shared_ptr<AVPacket>(
981                                 new AVPacket(pkt),
982                                 [](AVPacket* p)
983                                 {
984                                         av_free_packet(p);
985                                         delete p;
986                                 }), token);
987
988                 return true;
989         }
990
991         void write_packet(
992                         const std::shared_ptr<AVPacket>& pkt_ptr,
993                         std::shared_ptr<void> token)
994         {
995                 write_executor_.begin_invoke([this, pkt_ptr, token]() mutable
996                 {
997                         FF(av_interleaved_write_frame(
998                                 oc_.get(),
999                                 pkt_ptr.get()));
1000                 });
1001         }
1002
1003         template<typename T>
1004         static boost::optional<T> try_remove_arg(
1005                         std::map<std::string, std::string>& options,
1006                         const boost::regex& expr)
1007         {
1008                 for(auto it = options.begin(); it != options.end(); ++it)
1009                 {
1010                         if(boost::regex_search(it->first, expr))
1011                         {
1012                                 auto arg = it->second;
1013                                 options.erase(it);
1014                                 return boost::lexical_cast<T>(arg);
1015                         }
1016                 }
1017
1018                 return boost::optional<T>();
1019         }
1020
1021         static std::map<std::string, std::string> remove_options(
1022                         std::map<std::string, std::string>& options,
1023                         const boost::regex& expr)
1024         {
1025                 std::map<std::string, std::string> result;
1026
1027                 auto it = options.begin();
1028                 while(it != options.end())
1029                 {
1030                         boost::smatch what;
1031                         if(boost::regex_search(it->first, what, expr))
1032                         {
1033                                 result[
1034                                         what.size() > 0 && what[1].matched
1035                                                 ? what[1].str()
1036                                                 : it->first] = it->second;
1037                                 it = options.erase(it);
1038                         }
1039                         else
1040                                 ++it;
1041                 }
1042
1043                 return result;
1044         }
1045
1046         static void to_dict(AVDictionary** dest, const std::map<std::string, std::string>& c)
1047         {
1048                 for (const auto& entry : c)
1049                 {
1050                         av_dict_set(
1051                                 dest,
1052                                 entry.first.c_str(),
1053                                 entry.second.c_str(), 0);
1054                 }
1055         }
1056
1057         static std::map<std::string, std::string> to_map(AVDictionary* dict)
1058         {
1059                 std::map<std::string, std::string> result;
1060
1061                 for(auto t = dict
1062                                 ? av_dict_get(
1063                                         dict,
1064                                         "",
1065                                         nullptr,
1066                                         AV_DICT_IGNORE_SUFFIX)
1067                                 : nullptr;
1068                         t;
1069                         t = av_dict_get(
1070                                 dict,
1071                                 "",
1072                                 t,
1073                                 AV_DICT_IGNORE_SUFFIX))
1074                 {
1075                         result[t->key] = t->value;
1076                 }
1077
1078                 return result;
1079         }
1080 };
1081
1082 int crc16(const std::string& str)
1083 {
1084         boost::crc_16_type result;
1085
1086         result.process_bytes(str.data(), str.length());
1087
1088         return result.checksum();
1089 }
1090
1091 struct ffmpeg_consumer_proxy : public core::frame_consumer
1092 {
1093         const std::string                                       path_;
1094         const std::string                                       options_;
1095         const bool                                                      separate_key_;
1096         const bool                                                      compatibility_mode_;
1097         int                                                                     consumer_index_offset_;
1098
1099         std::unique_ptr<ffmpeg_consumer>        consumer_;
1100         std::unique_ptr<ffmpeg_consumer>        key_only_consumer_;
1101
1102 public:
1103
1104         ffmpeg_consumer_proxy(const std::string& path, const std::string& options, bool separate_key, bool compatibility_mode)
1105                 : path_(path)
1106                 , options_(options)
1107                 , separate_key_(separate_key)
1108                 , compatibility_mode_(compatibility_mode)
1109                 , consumer_index_offset_(crc16(path))
1110         {
1111         }
1112
1113         void initialize(const core::video_format_desc& format_desc, const core::audio_channel_layout& channel_layout, int) override
1114         {
1115                 if (consumer_)
1116                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Cannot reinitialize ffmpeg-consumer."));
1117
1118                 consumer_.reset(new ffmpeg_consumer(path_, options_));
1119                 consumer_->initialize(format_desc, channel_layout);
1120
1121                 if (separate_key_)
1122                 {
1123                         boost::filesystem::path fill_file(path_);
1124                         auto without_extension = u16(fill_file.parent_path().string() + "/" + fill_file.stem().string());
1125                         auto key_file = without_extension + L"_A" + u16(fill_file.extension().string());
1126
1127                         key_only_consumer_.reset(new ffmpeg_consumer(u8(key_file), options_));
1128                         key_only_consumer_->initialize(format_desc, channel_layout);
1129                 }
1130         }
1131
1132         int64_t presentation_frame_age_millis() const override
1133         {
1134                 return consumer_ ? static_cast<int64_t>(consumer_->presentation_frame_age_millis()) : 0;
1135         }
1136
1137         std::future<bool> send(core::const_frame frame) override
1138         {
1139                 bool ready_for_frame = consumer_->ready_for_frame();
1140
1141                 if (ready_for_frame && separate_key_)
1142                         ready_for_frame = ready_for_frame && key_only_consumer_->ready_for_frame();
1143
1144                 if (ready_for_frame)
1145                 {
1146                         consumer_->send(frame);
1147
1148                         if (separate_key_)
1149                                 key_only_consumer_->send(frame.key_only());
1150                 }
1151                 else
1152                 {
1153                         consumer_->mark_dropped();
1154
1155                         if (separate_key_)
1156                                 key_only_consumer_->mark_dropped();
1157                 }
1158
1159                 return make_ready_future(true);
1160         }
1161
1162         std::wstring print() const override
1163         {
1164                 return consumer_ ? consumer_->print() : L"[ffmpeg_consumer]";
1165         }
1166
1167         std::wstring name() const override
1168         {
1169                 return L"ffmpeg";
1170         }
1171
1172         boost::property_tree::wptree info() const override
1173         {
1174                 boost::property_tree::wptree info;
1175                 info.add(L"type", L"ffmpeg");
1176                 info.add(L"path", u16(path_));
1177                 info.add(L"separate_key", separate_key_);
1178                 return info;
1179         }
1180
1181         bool has_synchronization_clock() const override
1182         {
1183                 return false;
1184         }
1185
1186         int buffer_depth() const override
1187         {
1188                 return -1;
1189         }
1190
1191         int index() const override
1192         {
1193                 return compatibility_mode_ ? 200 : 100000 + consumer_index_offset_;
1194         }
1195
1196         core::monitor::subject& monitor_output()
1197         {
1198                 return consumer_->monitor_output();
1199         }
1200 };
1201
1202 }
1203
/**
 * Writes the help entry for the FILE / STREAM consumer into sink: a short
 * description, the syntax line, parameter definitions and usage examples.
 * The help repository argument is not used.
 */
void describe_streaming_consumer(core::help_sink& sink, const core::help_repository& repo)
{
        sink.short_description(L"For streaming/recording the contents of a channel using FFmpeg.");
        sink.syntax(L"FILE,STREAM [filename:string],[url:string] {-[ffmpeg_param1:string] [value1:string] {-[ffmpeg_param2:string] [value2:string] {...}}}");
        sink.para()->text(L"For recording or streaming the contents of a channel using FFmpeg");
        sink.definitions()
                ->item(L"filename", L"The filename under the media folder including the extension (decides which kind of container format that will be used).")
                ->item(L"url", L"If the filename is given in the form of an URL a network stream will be created instead of a file on disk.")
                ->item(L"ffmpeg_paramX", L"A parameter supported by FFmpeg. For example vcodec or acodec etc.");
        sink.para()->text(L"Examples:");
        sink.example(L">> ADD 1 FILE output.mov -vcodec dnxhd");
        sink.example(L">> ADD 1 FILE output.mov -vcodec prores");
        sink.example(L">> ADD 1 FILE output.mov -vcodec dvvideo");
        sink.example(L">> ADD 1 FILE output.mov -vcodec libx264 -preset ultrafast -tune fastdecode -crf 25");
        // SEPARATE_KEY is the flag create_streaming_consumer strips from the parameters.
        sink.example(L">> ADD 1 FILE output.mov -vcodec dnxhd SEPARATE_KEY", L"for creating output.mov with fill and output_A.mov with key/alpha");
        sink.example(L">> ADD 1 STREAM udp://<client_ip_address>:9250 -format mpegts -vcodec libx264 -crf 25 -tune zerolatency -preset ultrafast",
                L"for streaming over UDP instead of creating a local file.");
}
1222
1223 spl::shared_ptr<core::frame_consumer> create_streaming_consumer(
1224                 const std::vector<std::wstring>& params, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
1225 {
1226         if (params.size() < 1 || (!boost::iequals(params.at(0), L"STREAM") && !boost::iequals(params.at(0), L"FILE")))
1227                 return core::frame_consumer::empty();
1228
1229         auto params2                    = params;
1230         auto separate_key_it    = std::find_if(params2.begin(), params2.end(), param_comparer(L"SEPARATE_KEY"));
1231         bool separate_key               = false;
1232
1233         if (separate_key_it != params2.end())
1234         {
1235                 separate_key = true;
1236                 params2.erase(separate_key_it);
1237         }
1238
1239         auto compatibility_mode = boost::iequals(params.at(0), L"FILE");
1240         auto path                               = u8(params2.size() > 1 ? params2.at(1) : L"");
1241         auto args                               = u8(boost::join(params2, L" "));
1242
1243         return spl::make_shared<ffmpeg_consumer_proxy>(path, args, separate_key, compatibility_mode);
1244 }
1245
1246 spl::shared_ptr<core::frame_consumer> create_preconfigured_streaming_consumer(
1247                 const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
1248 {
1249         return spl::make_shared<ffmpeg_consumer_proxy>(
1250                         u8(ptree_get<std::wstring>(ptree, L"path")),
1251                         u8(ptree.get<std::wstring>(L"args", L"")),
1252                         ptree.get<bool>(L"separate-key", false),
1253                         false);
1254 }
1255
1256 }}