]> git.sesse.net Git - casparcg/blob - modules/ffmpeg/consumer/streaming_consumer.cpp
[ffmpeg] Remove redundant av_frame_alloc()/av_frame_free() RAII pairs all over the...
[casparcg] / modules / ffmpeg / consumer / streaming_consumer.cpp
1 #include "../StdAfx.h"
2
3 #include "ffmpeg_consumer.h"
4
5 #include "../ffmpeg_error.h"
6 #include "../producer/util/util.h"
7
8 #include <common/except.h>
9 #include <common/executor.h>
10 #include <common/assert.h>
11 #include <common/utf.h>
12 #include <common/future.h>
13 #include <common/env.h>
14 #include <common/scope_exit.h>
15 #include <common/ptree.h>
16
17 #include <core/consumer/frame_consumer.h>
18 #include <core/frame/frame.h>
19 #include <core/frame/audio_channel_layout.h>
20 #include <core/video_format.h>
21 #include <core/monitor/monitor.h>
22 #include <core/help/help_repository.h>
23 #include <core/help/help_sink.h>
24
25 #include <boost/noncopyable.hpp>
26 #include <boost/rational.hpp>
27 #include <boost/format.hpp>
28 #include <boost/algorithm/string/predicate.hpp>
29 #include <boost/property_tree/ptree.hpp>
30
31 #pragma warning(push)
32 #pragma warning(disable: 4244)
33 #pragma warning(disable: 4245)
34 #include <boost/crc.hpp>
35 #pragma warning(pop)
36
37 #include <tbb/atomic.h>
38 #include <tbb/concurrent_queue.h>
39 #include <tbb/parallel_invoke.h>
40 #include <tbb/parallel_for.h>
41
42 #include <numeric>
43
44 #pragma warning(push)
45 #pragma warning(disable: 4244)
46
47 extern "C"
48 {
49         #define __STDC_CONSTANT_MACROS
50         #define __STDC_LIMIT_MACROS
51         #include <libavformat/avformat.h>
52         #include <libavcodec/avcodec.h>
53         #include <libavutil/avutil.h>
54         #include <libavutil/frame.h>
55         #include <libavutil/opt.h>
56         #include <libavutil/imgutils.h>
57         #include <libavutil/parseutils.h>
58         #include <libavfilter/avfilter.h>
59         #include <libavfilter/buffersink.h>
60         #include <libavfilter/buffersrc.h>
61 }
62
63 #pragma warning(pop)
64
65 namespace caspar { namespace ffmpeg {
66
67 int crc16(const std::string& str)
68 {
69         boost::crc_16_type result;
70
71         result.process_bytes(str.data(), str.length());
72
73         return result.checksum();
74 }
75
76 class streaming_consumer final : public core::frame_consumer
77 {
78 public:
79         // Static Members
80
81 private:
82         core::monitor::subject                                          subject_;
83         boost::filesystem::path                                         path_;
84         int                                                                                     consumer_index_offset_;
85
86         std::map<std::string, std::string>                      options_;
87         bool                                                                            compatibility_mode_;
88
89         core::video_format_desc                                         in_video_format_;
90         core::audio_channel_layout                                      in_channel_layout_                      = core::audio_channel_layout::invalid();
91
92         std::shared_ptr<AVFormatContext>                        oc_;
93         tbb::atomic<bool>                                                       abort_request_;
94
95         std::shared_ptr<AVStream>                                       video_st_;
96         std::shared_ptr<AVStream>                                       audio_st_;
97
98         std::int64_t                                                            video_pts_;
99         std::int64_t                                                            audio_pts_;
100
101     AVFilterContext*                                                    audio_graph_in_;
102     AVFilterContext*                                                    audio_graph_out_;
103     std::shared_ptr<AVFilterGraph>                              audio_graph_;
104         std::shared_ptr<AVBitStreamFilterContext>       audio_bitstream_filter_;
105
106     AVFilterContext*                                                    video_graph_in_;
107     AVFilterContext*                                                    video_graph_out_;
108     std::shared_ptr<AVFilterGraph>                              video_graph_;
109         std::shared_ptr<AVBitStreamFilterContext>       video_bitstream_filter_;
110
111         executor                                                                        executor_;
112
113         executor                                                                        video_encoder_executor_;
114         executor                                                                        audio_encoder_executor_;
115
116         tbb::atomic<int>                                                        tokens_;
117         boost::mutex                                                            tokens_mutex_;
118         boost::condition_variable                                       tokens_cond_;
119         tbb::atomic<int64_t>                                            current_encoding_delay_;
120
121         executor                                                                        write_executor_;
122
123 public:
124
125         streaming_consumer(
126                         std::string path,
127                         std::string options,
128                         bool compatibility_mode)
129                 : path_(path)
130                 , consumer_index_offset_(crc16(path))
131                 , compatibility_mode_(compatibility_mode)
132                 , video_pts_(0)
133                 , audio_pts_(0)
134                 , executor_(print())
135                 , audio_encoder_executor_(print() + L" audio_encoder")
136                 , video_encoder_executor_(print() + L" video_encoder")
137                 , write_executor_(print() + L" io")
138         {
139                 abort_request_ = false;
140                 current_encoding_delay_ = 0;
141
142                 for(auto it =
143                                 boost::sregex_iterator(
144                                         options.begin(),
145                                         options.end(),
146                                         boost::regex("-(?<NAME>[^-\\s]+)(\\s+(?<VALUE>[^\\s]+))?"));
147                         it != boost::sregex_iterator();
148                         ++it)
149                 {
150                         options_[(*it)["NAME"].str()] = (*it)["VALUE"].matched ? (*it)["VALUE"].str() : "";
151                 }
152
153         if (options_.find("threads") == options_.end())
154             options_["threads"] = "auto";
155
156                 tokens_ =
157                         std::max(
158                                 1,
159                                 try_remove_arg<int>(
160                                         options_,
161                                         boost::regex("tokens")).get_value_or(2));
162         }
163
164         ~streaming_consumer()
165         {
166                 if(oc_)
167                 {
168                         video_encoder_executor_.begin_invoke([&] { encode_video(core::const_frame::empty(), nullptr); });
169                         audio_encoder_executor_.begin_invoke([&] { encode_audio(core::const_frame::empty(), nullptr); });
170
171                         video_encoder_executor_.stop();
172                         audio_encoder_executor_.stop();
173                         video_encoder_executor_.join();
174                         audio_encoder_executor_.join();
175
176                         video_graph_.reset();
177                         audio_graph_.reset();
178                         video_st_.reset();
179                         audio_st_.reset();
180
181                         write_packet(nullptr, nullptr);
182
183                         write_executor_.stop();
184                         write_executor_.join();
185
186                         FF(av_write_trailer(oc_.get()));
187
188                         if (!(oc_->oformat->flags & AVFMT_NOFILE) && oc_->pb)
189                                 avio_close(oc_->pb);
190
191                         oc_.reset();
192                 }
193         }
194
// Prepares the output for the given channel format.
//
// Steps, in order: resolve the target path, allocate the AVFormatContext,
// pick the video/audio encoders, build the filter graphs and bitstream
// filters, open both encoders, open the output and write the header.
// Each step removes the options it understands from options_; whatever is
// still left at the end is logged as "Invalid option".
//
// On any exception the streams and the format context are released and the
// exception is rethrown. channel_index is not used by this implementation.
195         void initialize(
196                         const core::video_format_desc& format_desc,
197                         const core::audio_channel_layout& channel_layout,
198                         int channel_index) override
199         {
200                 try
201                 {
                        // Matches any path with a ':' after at least one character; such
                        // paths skip the local-file handling below.
202                         static boost::regex prot_exp("^.+:.*" );
203
                        // "-y": an existing target file may be replaced.
204                         const auto overwrite =
205                                 try_remove_arg<std::string>(
206                                         options_,
207                                         boost::regex("y")) != boost::none;
208
209                         if(!boost::regex_match(
210                                         path_.string(),
211                                         prot_exp))
212                         {
                                // Incomplete paths are prefixed with the media folder.
213                                 if(!path_.is_complete())
214                                 {
215                                         path_ =
216                                                 u8(
217                                                         env::media_folder()) +
218                                                         path_.string();
219                                 }
220
                                // An existing file is an error unless -y was given or the
                                // consumer runs in compatibility mode; otherwise it is removed.
221                                 if(boost::filesystem::exists(path_))
222                                 {
223                                         if(!overwrite && !compatibility_mode_)
224                                                 BOOST_THROW_EXCEPTION(invalid_argument() << msg_info("File exists"));
225
226                                         boost::filesystem::remove(path_);
227                                 }
228                         }
229
                        // "-f"/"-format": explicit container format name. A null name is
                        // passed to avformat_alloc_output_context2 when absent or empty.
230                         const auto oformat_name =
231                                 try_remove_arg<std::string>(
232                                         options_,
233                                         boost::regex("^f|format$"));
234
235                         AVFormatContext* oc;
236
237                         FF(avformat_alloc_output_context2(
238                                 &oc,
239                                 nullptr,
240                                 oformat_name && !oformat_name->empty() ? oformat_name->c_str() : nullptr,
241                                 path_.string().c_str()));
242
243                         oc_.reset(
244                                 oc,
245                                 avformat_free_context);
246
247                         CASPAR_VERIFY(oc_->oformat);
248
                        // interrupt_cb reports abort_request_ through this callback.
249                         oc_->interrupt_callback.callback = streaming_consumer::interrupt_cb;
250                         oc_->interrupt_callback.opaque   = this;
251
252                         CASPAR_VERIFY(format_desc.format != core::video_format::invalid);
253
254                         in_video_format_ = format_desc;
255                         in_channel_layout_ = channel_layout;
256
                        // NOTE(review): same check as a few lines above; looks redundant.
257                         CASPAR_VERIFY(oc_->oformat);
258
                        // Encoder selection: an explicitly named codec wins, otherwise the
                        // codec id stored in the output format is used.
259                         const auto video_codec_name =
260                                 try_remove_arg<std::string>(
261                                         options_,
262                                         boost::regex("^c:v|codec:v|vcodec$"));
263
264                         const auto video_codec =
265                                 video_codec_name
266                                         ? avcodec_find_encoder_by_name(video_codec_name->c_str())
267                                         : avcodec_find_encoder(oc_->oformat->video_codec);
268
269                         const auto audio_codec_name =
270                                 try_remove_arg<std::string>(
271                                         options_,
272                                          boost::regex("^c:a|codec:a|acodec$"));
273
274                         const auto audio_codec =
275                                 audio_codec_name
276                                         ? avcodec_find_encoder_by_name(audio_codec_name->c_str())
277                                         : avcodec_find_encoder(oc_->oformat->audio_codec);
278
279                         if (!video_codec)
280                                 CASPAR_THROW_EXCEPTION(user_error() << msg_info(
281                                                 "Failed to find video codec " + (video_codec_name
282                                                                 ? *video_codec_name
283                                                                 : "with id " + boost::lexical_cast<std::string>(
284                                                                                 oc_->oformat->video_codec))));
285                         if (!audio_codec)
286                                 CASPAR_THROW_EXCEPTION(user_error() << msg_info(
287                                                 "Failed to find audio codec " + (audio_codec_name
288                                                                 ? *audio_codec_name
289                                                                 : "with id " + boost::lexical_cast<std::string>(
290                                                                                 oc_->oformat->audio_codec))));
291
                        // Filter graphs are built before the encoders are opened because
                        // open_encoder() reads the encoder parameters from the graph sinks.
292                         // Filters
293
294                         {
295                                 configure_video_filters(
296                                         *video_codec,
297                                         try_remove_arg<std::string>(options_,
298                                         boost::regex("vf|f:v|filter:v")).get_value_or(""));
299
300                                 configure_audio_filters(
301                                         *audio_codec,
302                                         try_remove_arg<std::string>(options_,
303                                         boost::regex("af|f:a|filter:a")).get_value_or(""));
304                         }
305
306                         // Bitstream filters
307                         {
308                                 configue_audio_bistream_filters(options_);
309                                 configue_video_bistream_filters(options_);
310                         }
311
312                         // Encoders
313
314                         {
                                // Each encoder works on its own copy of the remaining options;
                                // open_encoder() rewrites the copy to hold only the options the
                                // codec did not accept.
315                                 auto video_options = options_;
316                                 auto audio_options = options_;
317
318                                 video_st_ = open_encoder(
319                                         *video_codec,
320                                         video_options);
321
322                                 audio_st_ = open_encoder(
323                                         *audio_codec,
324                                         audio_options);
325
                                // Drop every option that is missing from either copy, i.e.
                                // that at least one of the two encoders consumed.
326                                 auto it = options_.begin();
327                                 while(it != options_.end())
328                                 {
329                                         if(video_options.find(it->first) == video_options.end() || audio_options.find(it->first) == audio_options.end())
330                                                 it = options_.erase(it);
331                                         else
332                                                 ++it;
333                                 }
334                         }
335
336                         // Output
337                         {
                                // The remaining options are handed to the I/O layer and the
                                // muxer; options_ is then rebuilt from what is left in the
                                // dictionary.
338                                 AVDictionary* av_opts = nullptr;
339
340                                 to_dict(
341                                         &av_opts,
342                                         std::move(options_));
343
344                                 CASPAR_SCOPE_EXIT
345                                 {
346                                         av_dict_free(&av_opts);
347                                 };
348
349                                 if (!(oc_->oformat->flags & AVFMT_NOFILE))
350                                 {
351                                         FF(avio_open2(
352                                                 &oc_->pb,
353                                                 path_.string().c_str(),
354                                                 AVIO_FLAG_WRITE,
355                                                 &oc_->interrupt_callback,
356                                                 &av_opts));
357                                 }
358
359                                 FF(avformat_write_header(
360                                         oc_.get(),
361                                         &av_opts));
362
363                                 options_ = to_map(av_opts);
364                         }
365
366                         // Dump Info
367
368                         av_dump_format(
369                                 oc_.get(),
370                                 0,
371                                 oc_->filename,
372                                 1);
373
                        // Anything still present was not consumed by any step above.
374                         for (const auto& option : options_)
375                         {
376                                 CASPAR_LOG(warning)
377                                         << L"Invalid option: -"
378                                         << u16(option.first)
379                                         << L" "
380                                         << u16(option.second);
381                         }
382                 }
383                 catch(...)
384                 {
                        // Release the streams and the format context, then rethrow.
385                         video_st_.reset();
386                         audio_st_.reset();
387                         oc_.reset();
388                         throw;
389                 }
390         }
391
392         core::monitor::subject& monitor_output() override
393         {
394                 return subject_;
395         }
396
397         std::wstring name() const override
398         {
399                 return L"streaming";
400         }
401
402         std::future<bool> send(core::const_frame frame) override
403         {
404                 CASPAR_VERIFY(in_video_format_.format != core::video_format::invalid);
405
406                 --tokens_;
407                 std::shared_ptr<void> token(
408                         nullptr,
409                         [this, frame](void*)
410                         {
411                                 ++tokens_;
412                                 tokens_cond_.notify_one();
413                                 current_encoding_delay_ = frame.get_age_millis();
414                         });
415
416                 return executor_.begin_invoke([=]() -> bool
417                 {
418                         boost::unique_lock<boost::mutex> tokens_lock(tokens_mutex_);
419
420                         while(tokens_ < 0)
421                                 tokens_cond_.wait(tokens_lock);
422
423                         video_encoder_executor_.begin_invoke([=]() mutable
424                         {
425                                 encode_video(
426                                         frame,
427                                         token);
428                         });
429
430                         audio_encoder_executor_.begin_invoke([=]() mutable
431                         {
432                                 encode_audio(
433                                         frame,
434                                         token);
435                         });
436
437                         return true;
438                 });
439         }
440
441         std::wstring print() const override
442         {
443                 return L"streaming_consumer[" + u16(path_.string()) + L"]";
444         }
445
446         virtual boost::property_tree::wptree info() const override
447         {
448                 boost::property_tree::wptree info;
449                 info.add(L"type", L"stream");
450                 info.add(L"path", path_.wstring());
451                 return info;
452         }
453
454         bool has_synchronization_clock() const override
455         {
456                 return false;
457         }
458
459         int buffer_depth() const override
460         {
461                 return -1;
462         }
463
464         int index() const override
465         {
466                 return compatibility_mode_ ? 200 : 100000 + consumer_index_offset_;
467         }
468
469         int64_t presentation_frame_age_millis() const override
470         {
471                 return current_encoding_delay_;
472         }
473
474 private:
475
476         static int interrupt_cb(void* ctx)
477         {
478                 CASPAR_ASSERT(ctx);
479                 return reinterpret_cast<streaming_consumer*>(ctx)->abort_request_;
480         }
481
482         std::shared_ptr<AVStream> open_encoder(
483                         const AVCodec& codec,
484                         std::map<std::string,
485                         std::string>& options)
486         {
487                 auto st =
488                         avformat_new_stream(
489                                 oc_.get(),
490                                 &codec);
491
492                 if (!st)
493                         CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info("Could not allocate video-stream.") << boost::errinfo_api_function("av_new_stream"));
494
495                 auto enc = st->codec;
496
497                 CASPAR_VERIFY(enc);
498
499                 switch(enc->codec_type)
500                 {
501                         case AVMEDIA_TYPE_VIDEO:
502                         {
503                                 enc->time_base                          = video_graph_out_->inputs[0]->time_base;
504                                 enc->pix_fmt                            = static_cast<AVPixelFormat>(video_graph_out_->inputs[0]->format);
505                                 enc->sample_aspect_ratio        = st->sample_aspect_ratio = video_graph_out_->inputs[0]->sample_aspect_ratio;
506                                 enc->width                                      = video_graph_out_->inputs[0]->w;
507                                 enc->height                                     = video_graph_out_->inputs[0]->h;
508                                 enc->bit_rate_tolerance         = 400 * 1000000;
509
510                                 break;
511                         }
512                         case AVMEDIA_TYPE_AUDIO:
513                         {
514                                 enc->time_base                          = audio_graph_out_->inputs[0]->time_base;
515                                 enc->sample_fmt                         = static_cast<AVSampleFormat>(audio_graph_out_->inputs[0]->format);
516                                 enc->sample_rate                        = audio_graph_out_->inputs[0]->sample_rate;
517                                 enc->channel_layout                     = audio_graph_out_->inputs[0]->channel_layout;
518                                 enc->channels                           = audio_graph_out_->inputs[0]->channels;
519
520                                 break;
521                         }
522                 }
523
524                 if(oc_->oformat->flags & AVFMT_GLOBALHEADER)
525                         enc->flags |= CODEC_FLAG_GLOBAL_HEADER;
526
527                 static const std::array<std::string, 4> char_id_map = {{"v", "a", "d", "s"}};
528
529                 const auto char_id = char_id_map.at(enc->codec_type);
530
531                 const auto codec_opts =
532                         remove_options(
533                                 options,
534                                 boost::regex("^(" + char_id + "?[^:]+):" + char_id + "$"));
535
536                 AVDictionary* av_codec_opts = nullptr;
537
538                 to_dict(
539                         &av_codec_opts,
540                         options);
541
542                 to_dict(
543                         &av_codec_opts,
544                         codec_opts);
545
546                 options.clear();
547
548                 FF(avcodec_open2(
549                         enc,
550                         &codec,
551                         av_codec_opts ? &av_codec_opts : nullptr));
552
553                 if(av_codec_opts)
554                 {
555                         auto t =
556                                 av_dict_get(
557                                         av_codec_opts,
558                                         "",
559                                          nullptr,
560                                         AV_DICT_IGNORE_SUFFIX);
561
562                         while(t)
563                         {
564                                 options[t->key + (codec_opts.find(t->key) != codec_opts.end() ? ":" + char_id : "")] = t->value;
565
566                                 t = av_dict_get(
567                                                 av_codec_opts,
568                                                 "",
569                                                 t,
570                                                 AV_DICT_IGNORE_SUFFIX);
571                         }
572
573                         av_dict_free(&av_codec_opts);
574                 }
575
576                 if(enc->codec_type == AVMEDIA_TYPE_AUDIO && !(codec.capabilities & CODEC_CAP_VARIABLE_FRAME_SIZE))
577                 {
578                         CASPAR_ASSERT(enc->frame_size > 0);
579                         av_buffersink_set_frame_size(audio_graph_out_,
580                                                                                  enc->frame_size);
581                 }
582
583                 return std::shared_ptr<AVStream>(st, [this](AVStream* st)
584                 {
585                         avcodec_close(st->codec);
586                 });
587         }
588
589         void configue_audio_bistream_filters(
590                         std::map<std::string, std::string>& options)
591         {
592                 const auto audio_bitstream_filter_str =
593                         try_remove_arg<std::string>(
594                                 options,
595                                 boost::regex("^bsf:a|absf$"));
596
597                 const auto audio_bitstream_filter =
598                         audio_bitstream_filter_str
599                                 ? av_bitstream_filter_init(audio_bitstream_filter_str->c_str())
600                                 : nullptr;
601
602                 CASPAR_VERIFY(!audio_bitstream_filter_str || audio_bitstream_filter);
603
604                 if(audio_bitstream_filter)
605                 {
606                         audio_bitstream_filter_.reset(
607                                 audio_bitstream_filter,
608                                 av_bitstream_filter_close);
609                 }
610
611                 if(audio_bitstream_filter_str && !audio_bitstream_filter_)
612                         options["bsf:a"] = *audio_bitstream_filter_str;
613         }
614
615         void configue_video_bistream_filters(
616                         std::map<std::string, std::string>& options)
617         {
618                 const auto video_bitstream_filter_str =
619                                 try_remove_arg<std::string>(
620                                         options,
621                                         boost::regex("^bsf:v|vbsf$"));
622
623                 const auto video_bitstream_filter =
624                         video_bitstream_filter_str
625                                 ? av_bitstream_filter_init(video_bitstream_filter_str->c_str())
626                                 : nullptr;
627
628                 CASPAR_VERIFY(!video_bitstream_filter_str || video_bitstream_filter);
629
630                 if(video_bitstream_filter)
631                 {
632                         video_bitstream_filter_.reset(
633                                 video_bitstream_filter,
634                                 av_bitstream_filter_close);
635                 }
636
637                 if(video_bitstream_filter_str && !video_bitstream_filter_)
638                         options["bsf:v"] = *video_bitstream_filter_str;
639         }
640
641         void configure_video_filters(
642                         const AVCodec& codec,
643                         const std::string& filtergraph)
644         {
645                 video_graph_.reset(
646                                 avfilter_graph_alloc(),
647                                 [](AVFilterGraph* p)
648                                 {
649                                         avfilter_graph_free(&p);
650                                 });
651
652                 video_graph_->nb_threads  = boost::thread::hardware_concurrency()/2;
653                 video_graph_->thread_type = AVFILTER_THREAD_SLICE;
654
655                 const auto sample_aspect_ratio =
656                         boost::rational<int>(
657                                         in_video_format_.square_width,
658                                         in_video_format_.square_height) /
659                         boost::rational<int>(
660                                         in_video_format_.width,
661                                         in_video_format_.height);
662
663                 const auto vsrc_options = (boost::format("video_size=%1%x%2%:pix_fmt=%3%:time_base=%4%/%5%:pixel_aspect=%6%/%7%:frame_rate=%8%/%9%")
664                         % in_video_format_.width % in_video_format_.height
665                         % AV_PIX_FMT_BGRA
666                         % in_video_format_.duration     % in_video_format_.time_scale
667                         % sample_aspect_ratio.numerator() % sample_aspect_ratio.denominator()
668                         % in_video_format_.time_scale % in_video_format_.duration).str();
669
670                 AVFilterContext* filt_vsrc = nullptr;
671                 FF(avfilter_graph_create_filter(
672                                 &filt_vsrc,
673                                 avfilter_get_by_name("buffer"),
674                                 "ffmpeg_consumer_buffer",
675                                 vsrc_options.c_str(),
676                                 nullptr,
677                                 video_graph_.get()));
678
679                 AVFilterContext* filt_vsink = nullptr;
680                 FF(avfilter_graph_create_filter(
681                                 &filt_vsink,
682                                 avfilter_get_by_name("buffersink"),
683                                 "ffmpeg_consumer_buffersink",
684                                 nullptr,
685                                 nullptr,
686                                 video_graph_.get()));
687
688 #pragma warning (push)
689 #pragma warning (disable : 4245)
690
691                 FF(av_opt_set_int_list(
692                                 filt_vsink,
693                                 "pix_fmts",
694                                 codec.pix_fmts,
695                                 -1,
696                                 AV_OPT_SEARCH_CHILDREN));
697
698 #pragma warning (pop)
699
700                 configure_filtergraph(
701                                 *video_graph_,
702                                 filtergraph,
703                                 *filt_vsrc,
704                                 *filt_vsink);
705
706                 video_graph_in_  = filt_vsrc;
707                 video_graph_out_ = filt_vsink;
708
709                 CASPAR_LOG(info)
710                         <<      u16(std::string("\n")
711                                 + avfilter_graph_dump(
712                                                 video_graph_.get(),
713                                                 nullptr));
714         }
715
716         void configure_audio_filters(
717                         const AVCodec& codec,
718                         const std::string& filtergraph)
719         {
720                 audio_graph_.reset(
721                         avfilter_graph_alloc(),
722                         [](AVFilterGraph* p)
723                         {
724                                 avfilter_graph_free(&p);
725                         });
726
727                 audio_graph_->nb_threads  = boost::thread::hardware_concurrency()/2;
728                 audio_graph_->thread_type = AVFILTER_THREAD_SLICE;
729
730                 const auto asrc_options = (boost::format("sample_rate=%1%:sample_fmt=%2%:channels=%3%:time_base=%4%/%5%:channel_layout=%6%")
731                         % in_video_format_.audio_sample_rate
732                         % av_get_sample_fmt_name(AV_SAMPLE_FMT_S32)
733                         % in_channel_layout_.num_channels
734                         % 1     % in_video_format_.audio_sample_rate
735                         % boost::io::group(
736                                 std::hex,
737                                 std::showbase,
738                                 av_get_default_channel_layout(in_channel_layout_.num_channels))).str();
739
740                 AVFilterContext* filt_asrc = nullptr;
741                 FF(avfilter_graph_create_filter(
742                         &filt_asrc,
743                         avfilter_get_by_name("abuffer"),
744                         "ffmpeg_consumer_abuffer",
745                         asrc_options.c_str(),
746                         nullptr,
747                         audio_graph_.get()));
748
749                 AVFilterContext* filt_asink = nullptr;
750                 FF(avfilter_graph_create_filter(
751                         &filt_asink,
752                         avfilter_get_by_name("abuffersink"),
753                         "ffmpeg_consumer_abuffersink",
754                         nullptr,
755                         nullptr,
756                         audio_graph_.get()));
757
758 #pragma warning (push)
759 #pragma warning (disable : 4245)
760
761                 FF(av_opt_set_int(
762                         filt_asink,
763                         "all_channel_counts",
764                         1,
765                         AV_OPT_SEARCH_CHILDREN));
766
767                 FF(av_opt_set_int_list(
768                         filt_asink,
769                         "sample_fmts",
770                         codec.sample_fmts,
771                         -1,
772                         AV_OPT_SEARCH_CHILDREN));
773
774                 FF(av_opt_set_int_list(
775                         filt_asink,
776                         "channel_layouts",
777                         codec.channel_layouts,
778                         -1,
779                         AV_OPT_SEARCH_CHILDREN));
780
781                 FF(av_opt_set_int_list(
782                         filt_asink,
783                         "sample_rates" ,
784                         codec.supported_samplerates,
785                         -1,
786                         AV_OPT_SEARCH_CHILDREN));
787
788 #pragma warning (pop)
789
790                 configure_filtergraph(
791                         *audio_graph_,
792                         filtergraph,
793                         *filt_asrc,
794                         *filt_asink);
795
796                 audio_graph_in_  = filt_asrc;
797                 audio_graph_out_ = filt_asink;
798
799                 CASPAR_LOG(info)
800                         <<      u16(std::string("\n")
801                                 + avfilter_graph_dump(
802                                         audio_graph_.get(),
803                                         nullptr));
804         }
805
806         void configure_filtergraph(
807                         AVFilterGraph& graph,
808                         const std::string& filtergraph,
809                         AVFilterContext& source_ctx,
810                         AVFilterContext& sink_ctx)
811         {
812                 AVFilterInOut* outputs = nullptr;
813                 AVFilterInOut* inputs = nullptr;
814
815                 try
816                 {
817                         if(!filtergraph.empty())
818                         {
819                                 outputs = avfilter_inout_alloc();
820                                 inputs  = avfilter_inout_alloc();
821
822                                 CASPAR_VERIFY(outputs && inputs);
823
824                                 outputs->name       = av_strdup("in");
825                                 outputs->filter_ctx = &source_ctx;
826                                 outputs->pad_idx    = 0;
827                                 outputs->next       = nullptr;
828
829                                 inputs->name        = av_strdup("out");
830                                 inputs->filter_ctx  = &sink_ctx;
831                                 inputs->pad_idx     = 0;
832                                 inputs->next        = nullptr;
833
834                                 FF(avfilter_graph_parse(
835                                         &graph,
836                                         filtergraph.c_str(),
837                                         inputs,
838                                         outputs,
839                                         nullptr));
840                         }
841                         else
842                         {
843                                 FF(avfilter_link(
844                                         &source_ctx,
845                                         0,
846                                         &sink_ctx,
847                                         0));
848                         }
849
850                         FF(avfilter_graph_config(
851                                 &graph,
852                                 nullptr));
853                 }
854                 catch(...)
855                 {
856                         avfilter_inout_free(&outputs);
857                         avfilter_inout_free(&inputs);
858                         throw;
859                 }
860         }
861
862         void encode_video(core::const_frame frame_ptr, std::shared_ptr<void> token)
863         {
864                 if(!video_st_)
865                         return;
866
867                 auto enc = video_st_->codec;
868
869                 if(frame_ptr != core::const_frame::empty())
870                 {
871                         auto src_av_frame = create_frame();
872
873                         const auto sample_aspect_ratio =
874                                 boost::rational<int>(
875                                         in_video_format_.square_width,
876                                         in_video_format_.square_height) /
877                                 boost::rational<int>(
878                                         in_video_format_.width,
879                                         in_video_format_.height);
880
881                         src_av_frame->format                              = AV_PIX_FMT_BGRA;
882                         src_av_frame->width                                       = in_video_format_.width;
883                         src_av_frame->height                              = in_video_format_.height;
884                         src_av_frame->sample_aspect_ratio.num = sample_aspect_ratio.numerator();
885                         src_av_frame->sample_aspect_ratio.den = sample_aspect_ratio.denominator();
886                         src_av_frame->pts                                         = video_pts_;
887
888                         video_pts_ += 1;
889
890                         FF(av_image_fill_arrays(
891                                 src_av_frame->data,
892                                 src_av_frame->linesize,
893                                 frame_ptr.image_data().begin(),
894                                 static_cast<AVPixelFormat>(src_av_frame->format),
895                                 in_video_format_.width,
896                                 in_video_format_.height,
897                                 1));
898
899                         FF(av_buffersrc_add_frame(
900                                 video_graph_in_,
901                                 src_av_frame.get()));
902                 }
903
904                 int ret = 0;
905
906                 while(ret >= 0)
907                 {
908                         auto filt_frame = create_frame();
909
910                         ret = av_buffersink_get_frame(
911                                 video_graph_out_,
912                                 filt_frame.get());
913
914                         video_encoder_executor_.begin_invoke([=]
915                         {
916                                 if(ret == AVERROR_EOF)
917                                 {
918                                         if(enc->codec->capabilities & CODEC_CAP_DELAY)
919                                         {
920                                                 while(encode_av_frame(
921                                                                 *video_st_,
922                                                                 video_bitstream_filter_.get(),
923                                                                 avcodec_encode_video2,
924                                                                 nullptr, token))
925                                                 {
926                                                         boost::this_thread::yield(); // TODO:
927                                                 }
928                                         }
929                                 }
930                                 else if(ret != AVERROR(EAGAIN))
931                                 {
932                                         FF_RET(ret, "av_buffersink_get_frame");
933
934                                         if (filt_frame->interlaced_frame)
935                                         {
936                                                 if (enc->codec->id == AV_CODEC_ID_MJPEG)
937                                                         enc->field_order = filt_frame->top_field_first ? AV_FIELD_TT : AV_FIELD_BB;
938                                                 else
939                                                         enc->field_order = filt_frame->top_field_first ? AV_FIELD_TB : AV_FIELD_BT;
940                                         }
941                                         else
942                                                 enc->field_order = AV_FIELD_PROGRESSIVE;
943
944                                         filt_frame->quality = enc->global_quality;
945
946                                         if (!enc->me_threshold)
947                                                 filt_frame->pict_type = AV_PICTURE_TYPE_NONE;
948
949                                         encode_av_frame(
950                                                 *video_st_,
951                                                 video_bitstream_filter_.get(),
952                                                 avcodec_encode_video2,
953                                                 filt_frame,
954                                                 token);
955
956                                         boost::this_thread::yield(); // TODO:
957                                 }
958                         });
959                 }
960         }
961
962         void encode_audio(core::const_frame frame_ptr, std::shared_ptr<void> token)
963         {
964                 if(!audio_st_)
965                         return;
966
967                 auto enc = audio_st_->codec;
968
969                 if(frame_ptr != core::const_frame::empty())
970                 {
971                         auto src_av_frame = create_frame();
972
973                         src_av_frame->channels           = in_channel_layout_.num_channels;
974                         src_av_frame->channel_layout = av_get_default_channel_layout(in_channel_layout_.num_channels);
975                         src_av_frame->sample_rate        = in_video_format_.audio_sample_rate;
976                         src_av_frame->nb_samples         = static_cast<int>(frame_ptr.audio_data().size()) / src_av_frame->channels;
977                         src_av_frame->format             = AV_SAMPLE_FMT_S32;
978                         src_av_frame->pts                        = audio_pts_;
979
980                         audio_pts_ += src_av_frame->nb_samples;
981
982                         FF(av_samples_fill_arrays(
983                                         src_av_frame->extended_data,
984                                         src_av_frame->linesize,
985                                         reinterpret_cast<const std::uint8_t*>(&*frame_ptr.audio_data().begin()),
986                                         src_av_frame->channels,
987                                         src_av_frame->nb_samples,
988                                         static_cast<AVSampleFormat>(src_av_frame->format),
989                                         16));
990
991                         FF(av_buffersrc_add_frame(
992                                         audio_graph_in_,
993                                         src_av_frame.get()));
994                 }
995
996                 int ret = 0;
997
998                 while(ret >= 0)
999                 {
1000                         auto filt_frame = create_frame();
1001
1002                         ret = av_buffersink_get_frame(
1003                                 audio_graph_out_,
1004                                 filt_frame.get());
1005
1006                         audio_encoder_executor_.begin_invoke([=]
1007                         {
1008                                 if(ret == AVERROR_EOF)
1009                                 {
1010                                         if(enc->codec->capabilities & CODEC_CAP_DELAY)
1011                                         {
1012                                                 while(encode_av_frame(
1013                                                                 *audio_st_,
1014                                                                 audio_bitstream_filter_.get(),
1015                                                                 avcodec_encode_audio2,
1016                                                                 nullptr,
1017                                                                 token))
1018                                                 {
1019                                                         boost::this_thread::yield(); // TODO:
1020                                                 }
1021                                         }
1022                                 }
1023                                 else if(ret != AVERROR(EAGAIN))
1024                                 {
1025                                         FF_RET(
1026                                                 ret,
1027                                                 "av_buffersink_get_frame");
1028
1029                                         encode_av_frame(
1030                                                 *audio_st_,
1031                                                 audio_bitstream_filter_.get(),
1032                                                 avcodec_encode_audio2,
1033                                                 filt_frame,
1034                                                 token);
1035
1036                                         boost::this_thread::yield(); // TODO:
1037                                 }
1038                         });
1039                 }
1040         }
1041
1042         template<typename F>
1043         bool encode_av_frame(
1044                         AVStream& st,
1045                         AVBitStreamFilterContext* bsfc,
1046                         const F& func,
1047                         const std::shared_ptr<AVFrame>& src_av_frame,
1048                         std::shared_ptr<void> token)
1049         {
1050                 AVPacket pkt = {};
1051                 av_init_packet(&pkt);
1052
1053                 int got_packet = 0;
1054
1055                 FF(func(
1056                         st.codec,
1057                         &pkt,
1058                         src_av_frame.get(),
1059                         &got_packet));
1060
1061                 if(!got_packet || pkt.size <= 0)
1062                         return false;
1063
1064                 pkt.stream_index = st.index;
1065
1066                 if(bsfc)
1067                 {
1068                         auto new_pkt = pkt;
1069
1070                         auto a = av_bitstream_filter_filter(
1071                                         bsfc,
1072                                         st.codec,
1073                                         nullptr,
1074                                         &new_pkt.data,
1075                                         &new_pkt.size,
1076                                         pkt.data,
1077                                         pkt.size,
1078                                         pkt.flags & AV_PKT_FLAG_KEY);
1079
1080                         if(a == 0 && new_pkt.data != pkt.data && new_pkt.destruct)
1081                         {
1082                                 auto t = reinterpret_cast<std::uint8_t*>(av_malloc(new_pkt.size + FF_INPUT_BUFFER_PADDING_SIZE));
1083
1084                                 if(t)
1085                                 {
1086                                         memcpy(
1087                                                 t,
1088                                                 new_pkt.data,
1089                                                 new_pkt.size);
1090
1091                                         memset(
1092                                                 t + new_pkt.size,
1093                                                 0,
1094                                                 FF_INPUT_BUFFER_PADDING_SIZE);
1095
1096                                         new_pkt.data = t;
1097                                         new_pkt.buf  = nullptr;
1098                                 }
1099                                 else
1100                                         a = AVERROR(ENOMEM);
1101                         }
1102
1103                         av_free_packet(&pkt);
1104
1105                         FF_RET(
1106                                 a,
1107                                 "av_bitstream_filter_filter");
1108
1109                         new_pkt.buf =
1110                                 av_buffer_create(
1111                                         new_pkt.data,
1112                                         new_pkt.size,
1113                                         av_buffer_default_free,
1114                                         nullptr,
1115                                         0);
1116
1117                         CASPAR_VERIFY(new_pkt.buf);
1118
1119                         pkt = new_pkt;
1120                 }
1121
1122                 if (pkt.pts != AV_NOPTS_VALUE)
1123                 {
1124                         pkt.pts =
1125                                 av_rescale_q(
1126                                         pkt.pts,
1127                                         st.codec->time_base,
1128                                         st.time_base);
1129                 }
1130
1131                 if (pkt.dts != AV_NOPTS_VALUE)
1132                 {
1133                         pkt.dts =
1134                                 av_rescale_q(
1135                                         pkt.dts,
1136                                         st.codec->time_base,
1137                                         st.time_base);
1138                 }
1139
1140                 pkt.duration =
1141                         static_cast<int>(
1142                                 av_rescale_q(
1143                                         pkt.duration,
1144                                         st.codec->time_base, st.time_base));
1145
1146                 write_packet(
1147                         std::shared_ptr<AVPacket>(
1148                                 new AVPacket(pkt),
1149                                 [](AVPacket* p)
1150                                 {
1151                                         av_free_packet(p);
1152                                         delete p;
1153                                 }), token);
1154
1155                 return true;
1156         }
1157
1158         void write_packet(
1159                         const std::shared_ptr<AVPacket>& pkt_ptr,
1160                         std::shared_ptr<void> token)
1161         {
1162                 write_executor_.begin_invoke([this, pkt_ptr, token]() mutable
1163                 {
1164                         FF(av_interleaved_write_frame(
1165                                 oc_.get(),
1166                                 pkt_ptr.get()));
1167                 });
1168         }
1169
1170         template<typename T>
1171         static boost::optional<T> try_remove_arg(
1172                         std::map<std::string, std::string>& options,
1173                         const boost::regex& expr)
1174         {
1175                 for(auto it = options.begin(); it != options.end(); ++it)
1176                 {
1177                         if(boost::regex_search(it->first, expr))
1178                         {
1179                                 auto arg = it->second;
1180                                 options.erase(it);
1181                                 return boost::lexical_cast<T>(arg);
1182                         }
1183                 }
1184
1185                 return boost::optional<T>();
1186         }
1187
1188         static std::map<std::string, std::string> remove_options(
1189                         std::map<std::string, std::string>& options,
1190                         const boost::regex& expr)
1191         {
1192                 std::map<std::string, std::string> result;
1193
1194                 auto it = options.begin();
1195                 while(it != options.end())
1196                 {
1197                         boost::smatch what;
1198                         if(boost::regex_search(it->first, what, expr))
1199                         {
1200                                 result[
1201                                         what.size() > 0 && what[1].matched
1202                                                 ? what[1].str()
1203                                                 : it->first] = it->second;
1204                                 it = options.erase(it);
1205                         }
1206                         else
1207                                 ++it;
1208                 }
1209
1210                 return result;
1211         }
1212
1213         static void to_dict(AVDictionary** dest, const std::map<std::string, std::string>& c)
1214         {
1215                 for (const auto& entry : c)
1216                 {
1217                         av_dict_set(
1218                                 dest,
1219                                 entry.first.c_str(),
1220                                 entry.second.c_str(), 0);
1221                 }
1222         }
1223
1224         static std::map<std::string, std::string> to_map(AVDictionary* dict)
1225         {
1226                 std::map<std::string, std::string> result;
1227
1228                 for(auto t = dict
1229                                 ? av_dict_get(
1230                                         dict,
1231                                         "",
1232                                         nullptr,
1233                                         AV_DICT_IGNORE_SUFFIX)
1234                                 : nullptr;
1235                         t;
1236                         t = av_dict_get(
1237                                 dict,
1238                                 "",
1239                                 t,
1240                                 AV_DICT_IGNORE_SUFFIX))
1241                 {
1242                         result[t->key] = t->value;
1243                 }
1244
1245                 return result;
1246         }
1247 };
1248
1249 void describe_streaming_consumer(core::help_sink& sink, const core::help_repository& repo)
1250 {
1251         sink.short_description(L"For streaming the contents of a channel using FFmpeg.");
1252         sink.syntax(L"STREAM [url:string] {-[ffmpeg_param1:string] [value1:string] {-[ffmpeg_param2:string] [value2:string] {...}}}");
1253         sink.para()->text(L"For streaming the contents of a channel using FFmpeg");
1254         sink.definitions()
1255                 ->item(L"url", L"The stream URL to create/stream to.")
1256                 ->item(L"ffmpeg_paramX", L"A parameter supported by FFmpeg. For example vcodec or acodec etc.");
1257         sink.para()->text(L"Examples:");
1258         sink.example(L">> ADD 1 STREAM udp://<client_ip_address>:9250 -format mpegts -vcodec libx264 -crf 25 -tune zerolatency -preset ultrafast");
1259 }
1260
1261 spl::shared_ptr<core::frame_consumer> create_streaming_consumer(
1262                 const std::vector<std::wstring>& params, core::interaction_sink*)
1263 {
1264         if (params.size() < 1 || (!boost::iequals(params.at(0), L"STREAM") && !boost::iequals(params.at(0), L"FILE")))
1265                 return core::frame_consumer::empty();
1266
1267         auto compatibility_mode = boost::iequals(params.at(0), L"FILE");
1268         auto path = u8(params.size() > 1 ? params.at(1) : L"");
1269         auto args = u8(boost::join(params, L" "));
1270
1271         return spl::make_shared<streaming_consumer>(path, args, compatibility_mode);
1272 }
1273
1274 spl::shared_ptr<core::frame_consumer> create_preconfigured_streaming_consumer(
1275                 const boost::property_tree::wptree& ptree, core::interaction_sink*)
1276 {
1277         return spl::make_shared<streaming_consumer>(
1278                         u8(ptree_get<std::wstring>(ptree, L"path")),
1279                         u8(ptree.get<std::wstring>(L"args", L"")),
1280                         false);
1281 }
1282
1283 }}