]> git.sesse.net Git - casparcg/blob - modules/ffmpeg/consumer/streaming_consumer.cpp
82fac99ac6a3fc4d1a09a1f7aee20944bd97b520
[casparcg] / modules / ffmpeg / consumer / streaming_consumer.cpp
1 #include "../StdAfx.h"
2
3 #include "ffmpeg_consumer.h"
4
5 #include "../ffmpeg_error.h"
6 #include "../producer/util/util.h"
7
8 #include <common/except.h>
9 #include <common/executor.h>
10 #include <common/assert.h>
11 #include <common/utf.h>
12 #include <common/future.h>
13 #include <common/env.h>
14 #include <common/scope_exit.h>
15 #include <common/ptree.h>
16
17 #include <core/consumer/frame_consumer.h>
18 #include <core/frame/frame.h>
19 #include <core/frame/audio_channel_layout.h>
20 #include <core/video_format.h>
21 #include <core/monitor/monitor.h>
22 #include <core/help/help_repository.h>
23 #include <core/help/help_sink.h>
24
25 #include <boost/noncopyable.hpp>
26 #include <boost/rational.hpp>
27 #include <boost/format.hpp>
28 #include <boost/algorithm/string/predicate.hpp>
29 #include <boost/property_tree/ptree.hpp>
30
31 #pragma warning(push)
32 #pragma warning(disable: 4244)
33 #pragma warning(disable: 4245)
34 #include <boost/crc.hpp>
35 #pragma warning(pop)
36
37 #include <tbb/atomic.h>
38 #include <tbb/concurrent_queue.h>
39 #include <tbb/parallel_invoke.h>
40 #include <tbb/parallel_for.h>
41
42 #include <numeric>
43
44 #pragma warning(push)
45 #pragma warning(disable: 4244)
46
47 extern "C"
48 {
49         #define __STDC_CONSTANT_MACROS
50         #define __STDC_LIMIT_MACROS
51         #include <libavformat/avformat.h>
52         #include <libavcodec/avcodec.h>
53         #include <libavutil/avutil.h>
54         #include <libavutil/frame.h>
55         #include <libavutil/opt.h>
56         #include <libavutil/imgutils.h>
57         #include <libavutil/parseutils.h>
58         #include <libavfilter/avfilter.h>
59         #include <libavfilter/buffersink.h>
60         #include <libavfilter/buffersrc.h>
61 }
62
63 #pragma warning(pop)
64
65 namespace caspar { namespace ffmpeg {
66
67 int crc16(const std::string& str)
68 {
69         boost::crc_16_type result;
70
71         result.process_bytes(str.data(), str.length());
72
73         return result.checksum();
74 }
75
76 class streaming_consumer final : public core::frame_consumer
77 {
78 public:
79         // Static Members
80
81 private:
82         core::monitor::subject                                          subject_;
83         boost::filesystem::path                                         path_;
84         int                                                                                     consumer_index_offset_;
85
86         std::map<std::string, std::string>                      options_;
87         bool                                                                            compatibility_mode_;
88
89         core::video_format_desc                                         in_video_format_;
90         core::audio_channel_layout                                      in_channel_layout_                      = core::audio_channel_layout::invalid();
91
92         std::shared_ptr<AVFormatContext>                        oc_;
93         tbb::atomic<bool>                                                       abort_request_;
94
95         std::shared_ptr<AVStream>                                       video_st_;
96         std::shared_ptr<AVStream>                                       audio_st_;
97
98         std::int64_t                                                            video_pts_;
99         std::int64_t                                                            audio_pts_;
100
101     AVFilterContext*                                                    audio_graph_in_;
102     AVFilterContext*                                                    audio_graph_out_;
103     std::shared_ptr<AVFilterGraph>                              audio_graph_;
104
105     AVFilterContext*                                                    video_graph_in_;
106     AVFilterContext*                                                    video_graph_out_;
107     std::shared_ptr<AVFilterGraph>                              video_graph_;
108
109         executor                                                                        executor_;
110
111         executor                                                                        video_encoder_executor_;
112         executor                                                                        audio_encoder_executor_;
113
114         tbb::atomic<int>                                                        tokens_;
115         boost::mutex                                                            tokens_mutex_;
116         boost::condition_variable                                       tokens_cond_;
117         tbb::atomic<int64_t>                                            current_encoding_delay_;
118
119         executor                                                                        write_executor_;
120
121 public:
122
123         streaming_consumer(
124                         std::string path,
125                         std::string options,
126                         bool compatibility_mode)
127                 : path_(path)
128                 , consumer_index_offset_(crc16(path))
129                 , compatibility_mode_(compatibility_mode)
130                 , video_pts_(0)
131                 , audio_pts_(0)
132                 , executor_(print())
133                 , audio_encoder_executor_(print() + L" audio_encoder")
134                 , video_encoder_executor_(print() + L" video_encoder")
135                 , write_executor_(print() + L" io")
136         {
137                 abort_request_ = false;
138                 current_encoding_delay_ = 0;
139
140                 for(auto it =
141                                 boost::sregex_iterator(
142                                         options.begin(),
143                                         options.end(),
144                                         boost::regex("-(?<NAME>[^-\\s]+)(\\s+(?<VALUE>[^\\s]+))?"));
145                         it != boost::sregex_iterator();
146                         ++it)
147                 {
148                         options_[(*it)["NAME"].str()] = (*it)["VALUE"].matched ? (*it)["VALUE"].str() : "";
149                 }
150
151         if (options_.find("threads") == options_.end())
152             options_["threads"] = "auto";
153
154                 tokens_ =
155                         std::max(
156                                 1,
157                                 try_remove_arg<int>(
158                                         options_,
159                                         boost::regex("tokens")).get_value_or(2));
160         }
161
162         ~streaming_consumer()
163         {
164                 if(oc_)
165                 {
166                         video_encoder_executor_.begin_invoke([&] { encode_video(core::const_frame::empty(), nullptr); });
167                         audio_encoder_executor_.begin_invoke([&] { encode_audio(core::const_frame::empty(), nullptr); });
168
169                         video_encoder_executor_.stop();
170                         audio_encoder_executor_.stop();
171                         video_encoder_executor_.join();
172                         audio_encoder_executor_.join();
173
174                         video_graph_.reset();
175                         audio_graph_.reset();
176                         video_st_.reset();
177                         audio_st_.reset();
178
179                         write_packet(nullptr, nullptr);
180
181                         write_executor_.stop();
182                         write_executor_.join();
183
184                         FF(av_write_trailer(oc_.get()));
185
186                         if (!(oc_->oformat->flags & AVFMT_NOFILE) && oc_->pb)
187                                 avio_close(oc_->pb);
188
189                         oc_.reset();
190                 }
191         }
192
193         void initialize(
194                         const core::video_format_desc& format_desc,
195                         const core::audio_channel_layout& channel_layout,
196                         int channel_index) override
197         {
198                 try
199                 {
200                         static boost::regex prot_exp("^.+:.*" );
201
202                         const auto overwrite =
203                                 try_remove_arg<std::string>(
204                                         options_,
205                                         boost::regex("y")) != boost::none;
206
207                         if(!boost::regex_match(
208                                         path_.string(),
209                                         prot_exp))
210                         {
211                                 if(!path_.is_complete())
212                                 {
213                                         path_ =
214                                                 u8(
215                                                         env::media_folder()) +
216                                                         path_.string();
217                                 }
218
219                                 if(boost::filesystem::exists(path_))
220                                 {
221                                         if(!overwrite && !compatibility_mode_)
222                                                 BOOST_THROW_EXCEPTION(invalid_argument() << msg_info("File exists"));
223
224                                         boost::filesystem::remove(path_);
225                                 }
226                         }
227
228                         const auto oformat_name =
229                                 try_remove_arg<std::string>(
230                                         options_,
231                                         boost::regex("^f|format$"));
232
233                         AVFormatContext* oc;
234
235                         FF(avformat_alloc_output_context2(
236                                 &oc,
237                                 nullptr,
238                                 oformat_name && !oformat_name->empty() ? oformat_name->c_str() : nullptr,
239                                 path_.string().c_str()));
240
241                         oc_.reset(
242                                 oc,
243                                 avformat_free_context);
244
245                         CASPAR_VERIFY(oc_->oformat);
246
247                         oc_->interrupt_callback.callback = streaming_consumer::interrupt_cb;
248                         oc_->interrupt_callback.opaque   = this;
249
250                         CASPAR_VERIFY(format_desc.format != core::video_format::invalid);
251
252                         in_video_format_ = format_desc;
253                         in_channel_layout_ = channel_layout;
254
255                         CASPAR_VERIFY(oc_->oformat);
256
257                         const auto video_codec_name =
258                                 try_remove_arg<std::string>(
259                                         options_,
260                                         boost::regex("^c:v|codec:v|vcodec$"));
261
262                         const auto video_codec =
263                                 video_codec_name
264                                         ? avcodec_find_encoder_by_name(video_codec_name->c_str())
265                                         : avcodec_find_encoder(oc_->oformat->video_codec);
266
267                         const auto audio_codec_name =
268                                 try_remove_arg<std::string>(
269                                         options_,
270                                          boost::regex("^c:a|codec:a|acodec$"));
271
272                         const auto audio_codec =
273                                 audio_codec_name
274                                         ? avcodec_find_encoder_by_name(audio_codec_name->c_str())
275                                         : avcodec_find_encoder(oc_->oformat->audio_codec);
276
277                         if (!video_codec)
278                                 CASPAR_THROW_EXCEPTION(user_error() << msg_info(
279                                                 "Failed to find video codec " + (video_codec_name
280                                                                 ? *video_codec_name
281                                                                 : "with id " + boost::lexical_cast<std::string>(
282                                                                                 oc_->oformat->video_codec))));
283                         if (!audio_codec)
284                                 CASPAR_THROW_EXCEPTION(user_error() << msg_info(
285                                                 "Failed to find audio codec " + (audio_codec_name
286                                                                 ? *audio_codec_name
287                                                                 : "with id " + boost::lexical_cast<std::string>(
288                                                                                 oc_->oformat->audio_codec))));
289
290                         // Filters
291
292                         {
293                                 configure_video_filters(
294                                         *video_codec,
295                                         try_remove_arg<std::string>(options_,
296                                         boost::regex("vf|f:v|filter:v")).get_value_or(""));
297
298                                 configure_audio_filters(
299                                         *audio_codec,
300                                         try_remove_arg<std::string>(options_,
301                                         boost::regex("af|f:a|filter:a")).get_value_or(""));
302                         }
303
304                         // Encoders
305
306                         {
307                                 auto video_options = options_;
308                                 auto audio_options = options_;
309
310                                 video_st_ = open_encoder(
311                                         *video_codec,
312                                         video_options);
313
314                                 audio_st_ = open_encoder(
315                                         *audio_codec,
316                                         audio_options);
317
318                                 auto it = options_.begin();
319                                 while(it != options_.end())
320                                 {
321                                         if(video_options.find(it->first) == video_options.end() || audio_options.find(it->first) == audio_options.end())
322                                                 it = options_.erase(it);
323                                         else
324                                                 ++it;
325                                 }
326                         }
327
328                         // Output
329                         {
330                                 AVDictionary* av_opts = nullptr;
331
332                                 to_dict(
333                                         &av_opts,
334                                         std::move(options_));
335
336                                 CASPAR_SCOPE_EXIT
337                                 {
338                                         av_dict_free(&av_opts);
339                                 };
340
341                                 if (!(oc_->oformat->flags & AVFMT_NOFILE))
342                                 {
343                                         FF(avio_open2(
344                                                 &oc_->pb,
345                                                 path_.string().c_str(),
346                                                 AVIO_FLAG_WRITE,
347                                                 &oc_->interrupt_callback,
348                                                 &av_opts));
349                                 }
350
351                                 FF(avformat_write_header(
352                                         oc_.get(),
353                                         &av_opts));
354
355                                 options_ = to_map(av_opts);
356                         }
357
358                         // Dump Info
359
360                         av_dump_format(
361                                 oc_.get(),
362                                 0,
363                                 oc_->filename,
364                                 1);
365
366                         for (const auto& option : options_)
367                         {
368                                 CASPAR_LOG(warning)
369                                         << L"Invalid option: -"
370                                         << u16(option.first)
371                                         << L" "
372                                         << u16(option.second);
373                         }
374                 }
375                 catch(...)
376                 {
377                         video_st_.reset();
378                         audio_st_.reset();
379                         oc_.reset();
380                         throw;
381                 }
382         }
383
384         core::monitor::subject& monitor_output() override
385         {
386                 return subject_;
387         }
388
389         std::wstring name() const override
390         {
391                 return L"streaming";
392         }
393
394         std::future<bool> send(core::const_frame frame) override
395         {
396                 CASPAR_VERIFY(in_video_format_.format != core::video_format::invalid);
397
398                 --tokens_;
399                 std::shared_ptr<void> token(
400                         nullptr,
401                         [this, frame](void*)
402                         {
403                                 ++tokens_;
404                                 tokens_cond_.notify_one();
405                                 current_encoding_delay_ = frame.get_age_millis();
406                         });
407
408                 return executor_.begin_invoke([=]() -> bool
409                 {
410                         boost::unique_lock<boost::mutex> tokens_lock(tokens_mutex_);
411
412                         while(tokens_ < 0)
413                                 tokens_cond_.wait(tokens_lock);
414
415                         video_encoder_executor_.begin_invoke([=]() mutable
416                         {
417                                 encode_video(
418                                         frame,
419                                         token);
420                         });
421
422                         audio_encoder_executor_.begin_invoke([=]() mutable
423                         {
424                                 encode_audio(
425                                         frame,
426                                         token);
427                         });
428
429                         return true;
430                 });
431         }
432
433         std::wstring print() const override
434         {
435                 return L"streaming_consumer[" + u16(path_.string()) + L"]";
436         }
437
438         virtual boost::property_tree::wptree info() const override
439         {
440                 boost::property_tree::wptree info;
441                 info.add(L"type", L"stream");
442                 info.add(L"path", path_.wstring());
443                 return info;
444         }
445
446         bool has_synchronization_clock() const override
447         {
448                 return false;
449         }
450
451         int buffer_depth() const override
452         {
453                 return -1;
454         }
455
456         int index() const override
457         {
458                 return compatibility_mode_ ? 200 : 100000 + consumer_index_offset_;
459         }
460
461         int64_t presentation_frame_age_millis() const override
462         {
463                 return current_encoding_delay_;
464         }
465
466 private:
467
468         static int interrupt_cb(void* ctx)
469         {
470                 CASPAR_ASSERT(ctx);
471                 return reinterpret_cast<streaming_consumer*>(ctx)->abort_request_;
472         }
473
474         std::shared_ptr<AVStream> open_encoder(
475                         const AVCodec& codec,
476                         std::map<std::string,
477                         std::string>& options)
478         {
479                 auto st =
480                         avformat_new_stream(
481                                 oc_.get(),
482                                 &codec);
483
484                 if (!st)
485                         CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info("Could not allocate video-stream.") << boost::errinfo_api_function("av_new_stream"));
486
487                 auto enc = st->codec;
488
489                 CASPAR_VERIFY(enc);
490
491                 switch(enc->codec_type)
492                 {
493                         case AVMEDIA_TYPE_VIDEO:
494                         {
495                                 enc->time_base                          = video_graph_out_->inputs[0]->time_base;
496                                 enc->pix_fmt                            = static_cast<AVPixelFormat>(video_graph_out_->inputs[0]->format);
497                                 enc->sample_aspect_ratio        = st->sample_aspect_ratio = video_graph_out_->inputs[0]->sample_aspect_ratio;
498                                 enc->width                                      = video_graph_out_->inputs[0]->w;
499                                 enc->height                                     = video_graph_out_->inputs[0]->h;
500                                 enc->bit_rate_tolerance         = 400 * 1000000;
501
502                                 break;
503                         }
504                         case AVMEDIA_TYPE_AUDIO:
505                         {
506                                 enc->time_base                          = audio_graph_out_->inputs[0]->time_base;
507                                 enc->sample_fmt                         = static_cast<AVSampleFormat>(audio_graph_out_->inputs[0]->format);
508                                 enc->sample_rate                        = audio_graph_out_->inputs[0]->sample_rate;
509                                 enc->channel_layout                     = audio_graph_out_->inputs[0]->channel_layout;
510                                 enc->channels                           = audio_graph_out_->inputs[0]->channels;
511
512                                 break;
513                         }
514                 }
515
516                 if(oc_->oformat->flags & AVFMT_GLOBALHEADER)
517                         enc->flags |= CODEC_FLAG_GLOBAL_HEADER;
518
519                 static const std::array<std::string, 4> char_id_map = {{"v", "a", "d", "s"}};
520
521                 const auto char_id = char_id_map.at(enc->codec_type);
522
523                 const auto codec_opts =
524                         remove_options(
525                                 options,
526                                 boost::regex("^(" + char_id + "?[^:]+):" + char_id + "$"));
527
528                 AVDictionary* av_codec_opts = nullptr;
529
530                 to_dict(
531                         &av_codec_opts,
532                         options);
533
534                 to_dict(
535                         &av_codec_opts,
536                         codec_opts);
537
538                 options.clear();
539
540                 FF(avcodec_open2(
541                         enc,
542                         &codec,
543                         av_codec_opts ? &av_codec_opts : nullptr));
544
545                 if(av_codec_opts)
546                 {
547                         auto t =
548                                 av_dict_get(
549                                         av_codec_opts,
550                                         "",
551                                          nullptr,
552                                         AV_DICT_IGNORE_SUFFIX);
553
554                         while(t)
555                         {
556                                 options[t->key + (codec_opts.find(t->key) != codec_opts.end() ? ":" + char_id : "")] = t->value;
557
558                                 t = av_dict_get(
559                                                 av_codec_opts,
560                                                 "",
561                                                 t,
562                                                 AV_DICT_IGNORE_SUFFIX);
563                         }
564
565                         av_dict_free(&av_codec_opts);
566                 }
567
568                 if(enc->codec_type == AVMEDIA_TYPE_AUDIO && !(codec.capabilities & CODEC_CAP_VARIABLE_FRAME_SIZE))
569                 {
570                         CASPAR_ASSERT(enc->frame_size > 0);
571                         av_buffersink_set_frame_size(audio_graph_out_,
572                                                                                  enc->frame_size);
573                 }
574
575                 return std::shared_ptr<AVStream>(st, [this](AVStream* st)
576                 {
577                         avcodec_close(st->codec);
578                 });
579         }
580
581         void configure_video_filters(
582                         const AVCodec& codec,
583                         const std::string& filtergraph)
584         {
585                 video_graph_.reset(
586                                 avfilter_graph_alloc(),
587                                 [](AVFilterGraph* p)
588                                 {
589                                         avfilter_graph_free(&p);
590                                 });
591
592                 video_graph_->nb_threads  = boost::thread::hardware_concurrency()/2;
593                 video_graph_->thread_type = AVFILTER_THREAD_SLICE;
594
595                 const auto sample_aspect_ratio =
596                         boost::rational<int>(
597                                         in_video_format_.square_width,
598                                         in_video_format_.square_height) /
599                         boost::rational<int>(
600                                         in_video_format_.width,
601                                         in_video_format_.height);
602
603                 const auto vsrc_options = (boost::format("video_size=%1%x%2%:pix_fmt=%3%:time_base=%4%/%5%:pixel_aspect=%6%/%7%:frame_rate=%8%/%9%")
604                         % in_video_format_.width % in_video_format_.height
605                         % AV_PIX_FMT_BGRA
606                         % in_video_format_.duration     % in_video_format_.time_scale
607                         % sample_aspect_ratio.numerator() % sample_aspect_ratio.denominator()
608                         % in_video_format_.time_scale % in_video_format_.duration).str();
609
610                 AVFilterContext* filt_vsrc = nullptr;
611                 FF(avfilter_graph_create_filter(
612                                 &filt_vsrc,
613                                 avfilter_get_by_name("buffer"),
614                                 "ffmpeg_consumer_buffer",
615                                 vsrc_options.c_str(),
616                                 nullptr,
617                                 video_graph_.get()));
618
619                 AVFilterContext* filt_vsink = nullptr;
620                 FF(avfilter_graph_create_filter(
621                                 &filt_vsink,
622                                 avfilter_get_by_name("buffersink"),
623                                 "ffmpeg_consumer_buffersink",
624                                 nullptr,
625                                 nullptr,
626                                 video_graph_.get()));
627
628 #pragma warning (push)
629 #pragma warning (disable : 4245)
630
631                 FF(av_opt_set_int_list(
632                                 filt_vsink,
633                                 "pix_fmts",
634                                 codec.pix_fmts,
635                                 -1,
636                                 AV_OPT_SEARCH_CHILDREN));
637
638 #pragma warning (pop)
639
640                 configure_filtergraph(
641                                 *video_graph_,
642                                 filtergraph,
643                                 *filt_vsrc,
644                                 *filt_vsink);
645
646                 video_graph_in_  = filt_vsrc;
647                 video_graph_out_ = filt_vsink;
648
649                 CASPAR_LOG(info)
650                         <<      u16(std::string("\n")
651                                 + avfilter_graph_dump(
652                                                 video_graph_.get(),
653                                                 nullptr));
654         }
655
656         void configure_audio_filters(
657                         const AVCodec& codec,
658                         const std::string& filtergraph)
659         {
660                 audio_graph_.reset(
661                         avfilter_graph_alloc(),
662                         [](AVFilterGraph* p)
663                         {
664                                 avfilter_graph_free(&p);
665                         });
666
667                 audio_graph_->nb_threads  = boost::thread::hardware_concurrency()/2;
668                 audio_graph_->thread_type = AVFILTER_THREAD_SLICE;
669
670                 const auto asrc_options = (boost::format("sample_rate=%1%:sample_fmt=%2%:channels=%3%:time_base=%4%/%5%:channel_layout=%6%")
671                         % in_video_format_.audio_sample_rate
672                         % av_get_sample_fmt_name(AV_SAMPLE_FMT_S32)
673                         % in_channel_layout_.num_channels
674                         % 1     % in_video_format_.audio_sample_rate
675                         % boost::io::group(
676                                 std::hex,
677                                 std::showbase,
678                                 av_get_default_channel_layout(in_channel_layout_.num_channels))).str();
679
680                 AVFilterContext* filt_asrc = nullptr;
681                 FF(avfilter_graph_create_filter(
682                         &filt_asrc,
683                         avfilter_get_by_name("abuffer"),
684                         "ffmpeg_consumer_abuffer",
685                         asrc_options.c_str(),
686                         nullptr,
687                         audio_graph_.get()));
688
689                 AVFilterContext* filt_asink = nullptr;
690                 FF(avfilter_graph_create_filter(
691                         &filt_asink,
692                         avfilter_get_by_name("abuffersink"),
693                         "ffmpeg_consumer_abuffersink",
694                         nullptr,
695                         nullptr,
696                         audio_graph_.get()));
697
698 #pragma warning (push)
699 #pragma warning (disable : 4245)
700
701                 FF(av_opt_set_int(
702                         filt_asink,
703                         "all_channel_counts",
704                         1,
705                         AV_OPT_SEARCH_CHILDREN));
706
707                 FF(av_opt_set_int_list(
708                         filt_asink,
709                         "sample_fmts",
710                         codec.sample_fmts,
711                         -1,
712                         AV_OPT_SEARCH_CHILDREN));
713
714                 FF(av_opt_set_int_list(
715                         filt_asink,
716                         "channel_layouts",
717                         codec.channel_layouts,
718                         -1,
719                         AV_OPT_SEARCH_CHILDREN));
720
721                 FF(av_opt_set_int_list(
722                         filt_asink,
723                         "sample_rates" ,
724                         codec.supported_samplerates,
725                         -1,
726                         AV_OPT_SEARCH_CHILDREN));
727
728 #pragma warning (pop)
729
730                 configure_filtergraph(
731                         *audio_graph_,
732                         filtergraph,
733                         *filt_asrc,
734                         *filt_asink);
735
736                 audio_graph_in_  = filt_asrc;
737                 audio_graph_out_ = filt_asink;
738
739                 CASPAR_LOG(info)
740                         <<      u16(std::string("\n")
741                                 + avfilter_graph_dump(
742                                         audio_graph_.get(),
743                                         nullptr));
744         }
745
746         void configure_filtergraph(
747                         AVFilterGraph& graph,
748                         const std::string& filtergraph,
749                         AVFilterContext& source_ctx,
750                         AVFilterContext& sink_ctx)
751         {
752                 AVFilterInOut* outputs = nullptr;
753                 AVFilterInOut* inputs = nullptr;
754
755                 try
756                 {
757                         if(!filtergraph.empty())
758                         {
759                                 outputs = avfilter_inout_alloc();
760                                 inputs  = avfilter_inout_alloc();
761
762                                 CASPAR_VERIFY(outputs && inputs);
763
764                                 outputs->name       = av_strdup("in");
765                                 outputs->filter_ctx = &source_ctx;
766                                 outputs->pad_idx    = 0;
767                                 outputs->next       = nullptr;
768
769                                 inputs->name        = av_strdup("out");
770                                 inputs->filter_ctx  = &sink_ctx;
771                                 inputs->pad_idx     = 0;
772                                 inputs->next        = nullptr;
773
774                                 FF(avfilter_graph_parse(
775                                         &graph,
776                                         filtergraph.c_str(),
777                                         inputs,
778                                         outputs,
779                                         nullptr));
780                         }
781                         else
782                         {
783                                 FF(avfilter_link(
784                                         &source_ctx,
785                                         0,
786                                         &sink_ctx,
787                                         0));
788                         }
789
790                         FF(avfilter_graph_config(
791                                 &graph,
792                                 nullptr));
793                 }
794                 catch(...)
795                 {
796                         avfilter_inout_free(&outputs);
797                         avfilter_inout_free(&inputs);
798                         throw;
799                 }
800         }
801
802         void encode_video(core::const_frame frame_ptr, std::shared_ptr<void> token)
803         {
804                 if(!video_st_)
805                         return;
806
807                 auto enc = video_st_->codec;
808
809                 if(frame_ptr != core::const_frame::empty())
810                 {
811                         auto src_av_frame = create_frame();
812
813                         const auto sample_aspect_ratio =
814                                 boost::rational<int>(
815                                         in_video_format_.square_width,
816                                         in_video_format_.square_height) /
817                                 boost::rational<int>(
818                                         in_video_format_.width,
819                                         in_video_format_.height);
820
821                         src_av_frame->format                              = AV_PIX_FMT_BGRA;
822                         src_av_frame->width                                       = in_video_format_.width;
823                         src_av_frame->height                              = in_video_format_.height;
824                         src_av_frame->sample_aspect_ratio.num = sample_aspect_ratio.numerator();
825                         src_av_frame->sample_aspect_ratio.den = sample_aspect_ratio.denominator();
826                         src_av_frame->pts                                         = video_pts_;
827
828                         video_pts_ += 1;
829
830                         FF(av_image_fill_arrays(
831                                 src_av_frame->data,
832                                 src_av_frame->linesize,
833                                 frame_ptr.image_data().begin(),
834                                 static_cast<AVPixelFormat>(src_av_frame->format),
835                                 in_video_format_.width,
836                                 in_video_format_.height,
837                                 1));
838
839                         FF(av_buffersrc_add_frame(
840                                 video_graph_in_,
841                                 src_av_frame.get()));
842                 }
843
844                 int ret = 0;
845
846                 while(ret >= 0)
847                 {
848                         auto filt_frame = create_frame();
849
850                         ret = av_buffersink_get_frame(
851                                 video_graph_out_,
852                                 filt_frame.get());
853
854                         video_encoder_executor_.begin_invoke([=]
855                         {
856                                 if(ret == AVERROR_EOF)
857                                 {
858                                         if(enc->codec->capabilities & CODEC_CAP_DELAY)
859                                         {
860                                                 while(encode_av_frame(
861                                                                 *video_st_,
862                                                                 avcodec_encode_video2,
863                                                                 nullptr, token))
864                                                 {
865                                                         boost::this_thread::yield(); // TODO:
866                                                 }
867                                         }
868                                 }
869                                 else if(ret != AVERROR(EAGAIN))
870                                 {
871                                         FF_RET(ret, "av_buffersink_get_frame");
872
873                                         if (filt_frame->interlaced_frame)
874                                         {
875                                                 if (enc->codec->id == AV_CODEC_ID_MJPEG)
876                                                         enc->field_order = filt_frame->top_field_first ? AV_FIELD_TT : AV_FIELD_BB;
877                                                 else
878                                                         enc->field_order = filt_frame->top_field_first ? AV_FIELD_TB : AV_FIELD_BT;
879                                         }
880                                         else
881                                                 enc->field_order = AV_FIELD_PROGRESSIVE;
882
883                                         filt_frame->quality = enc->global_quality;
884
885                                         if (!enc->me_threshold)
886                                                 filt_frame->pict_type = AV_PICTURE_TYPE_NONE;
887
888                                         encode_av_frame(
889                                                 *video_st_,
890                                                 avcodec_encode_video2,
891                                                 filt_frame,
892                                                 token);
893
894                                         boost::this_thread::yield(); // TODO:
895                                 }
896                         });
897                 }
898         }
899
900         void encode_audio(core::const_frame frame_ptr, std::shared_ptr<void> token)
901         {
902                 if(!audio_st_)
903                         return;
904
905                 auto enc = audio_st_->codec;
906
907                 if(frame_ptr != core::const_frame::empty())
908                 {
909                         auto src_av_frame = create_frame();
910
911                         src_av_frame->channels           = in_channel_layout_.num_channels;
912                         src_av_frame->channel_layout = av_get_default_channel_layout(in_channel_layout_.num_channels);
913                         src_av_frame->sample_rate        = in_video_format_.audio_sample_rate;
914                         src_av_frame->nb_samples         = static_cast<int>(frame_ptr.audio_data().size()) / src_av_frame->channels;
915                         src_av_frame->format             = AV_SAMPLE_FMT_S32;
916                         src_av_frame->pts                        = audio_pts_;
917
918                         audio_pts_ += src_av_frame->nb_samples;
919
920                         FF(av_samples_fill_arrays(
921                                         src_av_frame->extended_data,
922                                         src_av_frame->linesize,
923                                         reinterpret_cast<const std::uint8_t*>(&*frame_ptr.audio_data().begin()),
924                                         src_av_frame->channels,
925                                         src_av_frame->nb_samples,
926                                         static_cast<AVSampleFormat>(src_av_frame->format),
927                                         16));
928
929                         FF(av_buffersrc_add_frame(
930                                         audio_graph_in_,
931                                         src_av_frame.get()));
932                 }
933
934                 int ret = 0;
935
936                 while(ret >= 0)
937                 {
938                         auto filt_frame = create_frame();
939
940                         ret = av_buffersink_get_frame(
941                                 audio_graph_out_,
942                                 filt_frame.get());
943
944                         audio_encoder_executor_.begin_invoke([=]
945                         {
946                                 if(ret == AVERROR_EOF)
947                                 {
948                                         if(enc->codec->capabilities & CODEC_CAP_DELAY)
949                                         {
950                                                 while(encode_av_frame(
951                                                                 *audio_st_,
952                                                                 avcodec_encode_audio2,
953                                                                 nullptr,
954                                                                 token))
955                                                 {
956                                                         boost::this_thread::yield(); // TODO:
957                                                 }
958                                         }
959                                 }
960                                 else if(ret != AVERROR(EAGAIN))
961                                 {
962                                         FF_RET(
963                                                 ret,
964                                                 "av_buffersink_get_frame");
965
966                                         encode_av_frame(
967                                                 *audio_st_,
968                                                 avcodec_encode_audio2,
969                                                 filt_frame,
970                                                 token);
971
972                                         boost::this_thread::yield(); // TODO:
973                                 }
974                         });
975                 }
976         }
977
978         template<typename F>
979         bool encode_av_frame(
980                         AVStream& st,
981                         const F& func,
982                         const std::shared_ptr<AVFrame>& src_av_frame,
983                         std::shared_ptr<void> token)
984         {
985                 AVPacket pkt = {};
986                 av_init_packet(&pkt);
987
988                 int got_packet = 0;
989
990                 FF(func(
991                         st.codec,
992                         &pkt,
993                         src_av_frame.get(),
994                         &got_packet));
995
996                 if(!got_packet || pkt.size <= 0)
997                         return false;
998
999                 pkt.stream_index = st.index;
1000
1001                 if (pkt.pts != AV_NOPTS_VALUE)
1002                 {
1003                         pkt.pts =
1004                                 av_rescale_q(
1005                                         pkt.pts,
1006                                         st.codec->time_base,
1007                                         st.time_base);
1008                 }
1009
1010                 if (pkt.dts != AV_NOPTS_VALUE)
1011                 {
1012                         pkt.dts =
1013                                 av_rescale_q(
1014                                         pkt.dts,
1015                                         st.codec->time_base,
1016                                         st.time_base);
1017                 }
1018
1019                 pkt.duration =
1020                         static_cast<int>(
1021                                 av_rescale_q(
1022                                         pkt.duration,
1023                                         st.codec->time_base, st.time_base));
1024
1025                 write_packet(
1026                         std::shared_ptr<AVPacket>(
1027                                 new AVPacket(pkt),
1028                                 [](AVPacket* p)
1029                                 {
1030                                         av_free_packet(p);
1031                                         delete p;
1032                                 }), token);
1033
1034                 return true;
1035         }
1036
1037         void write_packet(
1038                         const std::shared_ptr<AVPacket>& pkt_ptr,
1039                         std::shared_ptr<void> token)
1040         {
1041                 write_executor_.begin_invoke([this, pkt_ptr, token]() mutable
1042                 {
1043                         FF(av_interleaved_write_frame(
1044                                 oc_.get(),
1045                                 pkt_ptr.get()));
1046                 });
1047         }
1048
1049         template<typename T>
1050         static boost::optional<T> try_remove_arg(
1051                         std::map<std::string, std::string>& options,
1052                         const boost::regex& expr)
1053         {
1054                 for(auto it = options.begin(); it != options.end(); ++it)
1055                 {
1056                         if(boost::regex_search(it->first, expr))
1057                         {
1058                                 auto arg = it->second;
1059                                 options.erase(it);
1060                                 return boost::lexical_cast<T>(arg);
1061                         }
1062                 }
1063
1064                 return boost::optional<T>();
1065         }
1066
1067         static std::map<std::string, std::string> remove_options(
1068                         std::map<std::string, std::string>& options,
1069                         const boost::regex& expr)
1070         {
1071                 std::map<std::string, std::string> result;
1072
1073                 auto it = options.begin();
1074                 while(it != options.end())
1075                 {
1076                         boost::smatch what;
1077                         if(boost::regex_search(it->first, what, expr))
1078                         {
1079                                 result[
1080                                         what.size() > 0 && what[1].matched
1081                                                 ? what[1].str()
1082                                                 : it->first] = it->second;
1083                                 it = options.erase(it);
1084                         }
1085                         else
1086                                 ++it;
1087                 }
1088
1089                 return result;
1090         }
1091
1092         static void to_dict(AVDictionary** dest, const std::map<std::string, std::string>& c)
1093         {
1094                 for (const auto& entry : c)
1095                 {
1096                         av_dict_set(
1097                                 dest,
1098                                 entry.first.c_str(),
1099                                 entry.second.c_str(), 0);
1100                 }
1101         }
1102
1103         static std::map<std::string, std::string> to_map(AVDictionary* dict)
1104         {
1105                 std::map<std::string, std::string> result;
1106
1107                 for(auto t = dict
1108                                 ? av_dict_get(
1109                                         dict,
1110                                         "",
1111                                         nullptr,
1112                                         AV_DICT_IGNORE_SUFFIX)
1113                                 : nullptr;
1114                         t;
1115                         t = av_dict_get(
1116                                 dict,
1117                                 "",
1118                                 t,
1119                                 AV_DICT_IGNORE_SUFFIX))
1120                 {
1121                         result[t->key] = t->value;
1122                 }
1123
1124                 return result;
1125         }
1126 };
1127
1128 void describe_streaming_consumer(core::help_sink& sink, const core::help_repository& repo)
1129 {
1130         sink.short_description(L"For streaming the contents of a channel using FFmpeg.");
1131         sink.syntax(L"STREAM [url:string] {-[ffmpeg_param1:string] [value1:string] {-[ffmpeg_param2:string] [value2:string] {...}}}");
1132         sink.para()->text(L"For streaming the contents of a channel using FFmpeg");
1133         sink.definitions()
1134                 ->item(L"url", L"The stream URL to create/stream to.")
1135                 ->item(L"ffmpeg_paramX", L"A parameter supported by FFmpeg. For example vcodec or acodec etc.");
1136         sink.para()->text(L"Examples:");
1137         sink.example(L">> ADD 1 STREAM udp://<client_ip_address>:9250 -format mpegts -vcodec libx264 -crf 25 -tune zerolatency -preset ultrafast");
1138 }
1139
1140 spl::shared_ptr<core::frame_consumer> create_streaming_consumer(
1141                 const std::vector<std::wstring>& params, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
1142 {
1143         if (params.size() < 1 || (!boost::iequals(params.at(0), L"STREAM") && !boost::iequals(params.at(0), L"FILE")))
1144                 return core::frame_consumer::empty();
1145
1146         auto compatibility_mode = boost::iequals(params.at(0), L"FILE");
1147         auto path = u8(params.size() > 1 ? params.at(1) : L"");
1148         auto args = u8(boost::join(params, L" "));
1149
1150         return spl::make_shared<streaming_consumer>(path, args, compatibility_mode);
1151 }
1152
1153 spl::shared_ptr<core::frame_consumer> create_preconfigured_streaming_consumer(
1154                 const boost::property_tree::wptree& ptree, core::interaction_sink*, std::vector<spl::shared_ptr<core::video_channel>> channels)
1155 {
1156         return spl::make_shared<streaming_consumer>(
1157                         u8(ptree_get<std::wstring>(ptree, L"path")),
1158                         u8(ptree.get<std::wstring>(L"args", L"")),
1159                         false);
1160 }
1161
1162 }}