]> git.sesse.net Git - casparcg/blob - modules/ffmpeg/consumer/streaming_consumer.cpp
* Enforce help descriptions for consumers in code.
[casparcg] / modules / ffmpeg / consumer / streaming_consumer.cpp
1 #include "../StdAfx.h"
2
3 #include "ffmpeg_consumer.h"
4
5 #include "../ffmpeg_error.h"
6
7 #include <common/except.h>
8 #include <common/executor.h>
9 #include <common/assert.h>
10 #include <common/utf.h>
11 #include <common/future.h>
12 #include <common/env.h>
13 #include <common/scope_exit.h>
14
15 #include <core/consumer/frame_consumer.h>
16 #include <core/frame/frame.h>
17 #include <core/video_format.h>
18 #include <core/monitor/monitor.h>
19 #include <core/help/help_repository.h>
20 #include <core/help/help_sink.h>
21
22 #include <boost/noncopyable.hpp>
23 #include <boost/rational.hpp>
24 #include <boost/format.hpp>
25 #include <boost/algorithm/string/predicate.hpp>
26 #include <boost/property_tree/ptree.hpp>
27
28 #pragma warning(push)
29 #pragma warning(disable: 4244)
30 #pragma warning(disable: 4245)
31 #include <boost/crc.hpp>
32 #pragma warning(pop)
33
34 #include <tbb/atomic.h>
35 #include <tbb/concurrent_queue.h>
36 #include <tbb/parallel_invoke.h>
37 #include <tbb/parallel_for.h>
38
39 #include <numeric>
40
41 #pragma warning(push)
42 #pragma warning(disable: 4244)
43
44 extern "C" 
45 {
46         #define __STDC_CONSTANT_MACROS
47         #define __STDC_LIMIT_MACROS
48         #include <libavformat/avformat.h>
49         #include <libavcodec/avcodec.h>
50         #include <libavutil/avutil.h>
51         #include <libavutil/frame.h>
52         #include <libavutil/opt.h>
53         #include <libavutil/imgutils.h>
54         #include <libavutil/parseutils.h>
55         #include <libavfilter/avfilter.h>
56         #include <libavfilter/buffersink.h>
57         #include <libavfilter/buffersrc.h>
58 }
59
60 #pragma warning(pop)
61
62 namespace caspar { namespace ffmpeg {
63
64 int crc16(const std::string& str)
65 {
66         boost::crc_16_type result;
67
68         result.process_bytes(str.data(), str.length());
69
70         return result.checksum();
71 }
72
73 class streaming_consumer final : public core::frame_consumer
74 {
75 public:
76         // Static Members
77                 
78 private:
79         core::monitor::subject                                          subject_;
80         boost::filesystem::path                                         path_;
81         int                                                                                     consumer_index_offset_;
82
83         std::map<std::string, std::string>                      options_;
84                                                                                                 
85         core::video_format_desc                                         in_video_format_;
86
87         std::shared_ptr<AVFormatContext>                        oc_;
88         tbb::atomic<bool>                                                       abort_request_;
89                                                                                                 
90         std::shared_ptr<AVStream>                                       video_st_;
91         std::shared_ptr<AVStream>                                       audio_st_;
92
93         std::int64_t                                                            video_pts_;
94         std::int64_t                                                            audio_pts_;
95                                                                                                                                                                         
96     AVFilterContext*                                                    audio_graph_in_;  
97     AVFilterContext*                                                    audio_graph_out_; 
98     std::shared_ptr<AVFilterGraph>                              audio_graph_;    
99         std::shared_ptr<AVBitStreamFilterContext>       audio_bitstream_filter_;       
100
101     AVFilterContext*                                                    video_graph_in_;  
102     AVFilterContext*                                                    video_graph_out_; 
103     std::shared_ptr<AVFilterGraph>                              video_graph_;  
104         std::shared_ptr<AVBitStreamFilterContext>       video_bitstream_filter_;
105         
106         executor                                                                        executor_;
107
108         executor                                                                        video_encoder_executor_;
109         executor                                                                        audio_encoder_executor_;
110
111         tbb::atomic<int>                                                        tokens_;
112         boost::mutex                                                            tokens_mutex_;
113         boost::condition_variable                                       tokens_cond_;
114
115         executor                                                                        write_executor_;
116         
117 public:
118
119         streaming_consumer(
120                         std::string path, 
121                         std::string options)
122                 : path_(path)
123                 , consumer_index_offset_(crc16(path))
124                 , video_pts_(0)
125                 , audio_pts_(0)
126                 , executor_(print())
127                 , audio_encoder_executor_(print() + L" audio_encoder")
128                 , video_encoder_executor_(print() + L" video_encoder")
129                 , write_executor_(print() + L" io")
130         {               
131                 abort_request_ = false; 
132
133                 for(auto it = 
134                                 boost::sregex_iterator(
135                                         options.begin(), 
136                                         options.end(), 
137                                         boost::regex("-(?<NAME>[^-\\s]+)(\\s+(?<VALUE>[^\\s]+))?")); 
138                         it != boost::sregex_iterator(); 
139                         ++it)
140                 {                               
141                         options_[(*it)["NAME"].str()] = (*it)["VALUE"].matched ? (*it)["VALUE"].str() : "";
142                 }
143                                                                                 
144         if (options_.find("threads") == options_.end())
145             options_["threads"] = "auto";
146
147                 tokens_ = 
148                         std::max(
149                                 1, 
150                                 try_remove_arg<int>(
151                                         options_, 
152                                         boost::regex("tokens")).get_value_or(2));               
153         }
154                 
155         ~streaming_consumer()
156         {
157                 if(oc_)
158                 {
159                         video_encoder_executor_.begin_invoke([&] { encode_video(core::const_frame::empty(), nullptr); });
160                         audio_encoder_executor_.begin_invoke([&] { encode_audio(core::const_frame::empty(), nullptr); });
161
162                         video_encoder_executor_.stop();
163                         audio_encoder_executor_.stop();
164                         video_encoder_executor_.join();
165                         audio_encoder_executor_.join();
166
167                         video_graph_.reset();
168                         audio_graph_.reset();
169                         video_st_.reset();
170                         audio_st_.reset();
171
172                         write_packet(nullptr, nullptr);
173
174                         write_executor_.stop();
175                         write_executor_.join();
176
177                         FF(av_write_trailer(oc_.get()));
178
179                         if (!(oc_->oformat->flags & AVFMT_NOFILE) && oc_->pb)
180                                 avio_close(oc_->pb);
181
182                         oc_.reset();
183                 }
184         }
185
186         void initialize(
187                         const core::video_format_desc& format_desc,
188                         int channel_index) override
189         {
190                 try
191                 {                               
192                         static boost::regex prot_exp("^.+:.*" );
193                         
194                         const auto overwrite = 
195                                 try_remove_arg<std::string>(
196                                         options_,
197                                         boost::regex("y")) != nullptr;
198
199                         if(!boost::regex_match(
200                                         path_.string(), 
201                                         prot_exp))
202                         {
203                                 if(!path_.is_complete())
204                                 {
205                                         path_ = 
206                                                 u8(
207                                                         env::media_folder()) + 
208                                                         path_.string();
209                                 }
210                         
211                                 if(boost::filesystem::exists(path_))
212                                 {
213                                         if(!overwrite)
214                                                 BOOST_THROW_EXCEPTION(invalid_argument() << msg_info("File exists"));
215                                                 
216                                         boost::filesystem::remove(path_);
217                                 }
218                         }
219                                                         
220                         const auto oformat_name = 
221                                 try_remove_arg<std::string>(
222                                         options_, 
223                                         boost::regex("^f|format$"));
224                         
225                         AVFormatContext* oc;
226
227                         FF(avformat_alloc_output_context2(
228                                 &oc, 
229                                 nullptr, 
230                                 oformat_name && !oformat_name->empty() ? oformat_name->c_str() : nullptr, 
231                                 path_.string().c_str()));
232
233                         oc_.reset(
234                                 oc, 
235                                 avformat_free_context);
236                                         
237                         CASPAR_VERIFY(oc_->oformat);
238
239                         oc_->interrupt_callback.callback = streaming_consumer::interrupt_cb;
240                         oc_->interrupt_callback.opaque   = this;        
241
242                         CASPAR_VERIFY(format_desc.format != core::video_format::invalid);
243
244                         in_video_format_ = format_desc;
245                                                         
246                         CASPAR_VERIFY(oc_->oformat);
247                         
248                         const auto video_codec_name = 
249                                 try_remove_arg<std::string>(
250                                         options_, 
251                                         boost::regex("^c:v|codec:v|vcodec$"));
252
253                         const auto video_codec = 
254                                 video_codec_name 
255                                         ? avcodec_find_encoder_by_name(video_codec_name->c_str())
256                                         : avcodec_find_encoder(oc_->oformat->video_codec);
257                                                 
258                         const auto audio_codec_name = 
259                                 try_remove_arg<std::string>(
260                                         options_, 
261                                          boost::regex("^c:a|codec:a|acodec$"));
262                         
263                         const auto audio_codec = 
264                                 audio_codec_name 
265                                         ? avcodec_find_encoder_by_name(audio_codec_name->c_str())
266                                         : avcodec_find_encoder(oc_->oformat->audio_codec);
267                         
268                         if (!video_codec)
269                                 BOOST_THROW_EXCEPTION(caspar_exception() << msg_info(
270                                                 "Failed to find video codec " + (video_codec_name
271                                                                 ? *video_codec_name
272                                                                 : "with id " + boost::lexical_cast<std::string>(
273                                                                                 oc_->oformat->video_codec))));
274                         if (!audio_codec)
275                                 BOOST_THROW_EXCEPTION(caspar_exception() << msg_info(
276                                                 "Failed to find audio codec " + (audio_codec_name
277                                                                 ? *audio_codec_name
278                                                                 : "with id " + boost::lexical_cast<std::string>(
279                                                                                 oc_->oformat->audio_codec))));
280                         
281                         // Filters
282
283                         {
284                                 configure_video_filters(
285                                         *video_codec, 
286                                         try_remove_arg<std::string>(options_, 
287                                         boost::regex("vf|f:v|filter:v")).get_value_or(""));
288
289                                 configure_audio_filters(
290                                         *audio_codec, 
291                                         try_remove_arg<std::string>(options_,
292                                         boost::regex("af|f:a|filter:a")).get_value_or(""));
293                         }
294
295                         // Bistream Filters
296                         {
297                                 configue_audio_bistream_filters(options_);
298                                 configue_video_bistream_filters(options_);
299                         }
300
301                         // Encoders
302
303                         {
304                                 auto video_options = options_;
305                                 auto audio_options = options_;
306
307                                 video_st_ = open_encoder(
308                                         *video_codec, 
309                                         video_options);
310
311                                 audio_st_ = open_encoder(
312                                         *audio_codec, 
313                                         audio_options);
314
315                                 auto it = options_.begin();
316                                 while(it != options_.end())
317                                 {
318                                         if(video_options.find(it->first) == video_options.end() || audio_options.find(it->first) == audio_options.end())
319                                                 it = options_.erase(it);
320                                         else
321                                                 ++it;
322                                 }
323                         }
324
325                         // Output
326                         {
327                                 AVDictionary* av_opts = nullptr;
328
329                                 to_dict(
330                                         &av_opts, 
331                                         std::move(options_));
332
333                                 CASPAR_SCOPE_EXIT
334                                 {
335                                         av_dict_free(&av_opts);
336                                 };
337
338                                 if (!(oc_->oformat->flags & AVFMT_NOFILE)) 
339                                 {
340                                         FF(avio_open2(
341                                                 &oc_->pb, 
342                                                 path_.string().c_str(), 
343                                                 AVIO_FLAG_WRITE, 
344                                                 &oc_->interrupt_callback, 
345                                                 &av_opts));
346                                 }
347                                 
348                                 FF(avformat_write_header(
349                                         oc_.get(), 
350                                         &av_opts));
351                                 
352                                 options_ = to_map(av_opts);
353                         }
354
355                         // Dump Info
356                         
357                         av_dump_format(
358                                 oc_.get(), 
359                                 0, 
360                                 oc_->filename, 
361                                 1);             
362
363                         for (const auto& option : options_)
364                         {
365                                 CASPAR_LOG(warning) 
366                                         << L"Invalid option: -" 
367                                         << u16(option.first) 
368                                         << L" " 
369                                         << u16(option.second);
370                         }
371                 }
372                 catch(...)
373                 {
374                         video_st_.reset();
375                         audio_st_.reset();
376                         oc_.reset();
377                         throw;
378                 }
379         }
380
381         core::monitor::subject& monitor_output() override
382         {
383                 return subject_;
384         }
385
386         std::wstring name() const override
387         {
388                 return L"streaming";
389         }
390
391         std::future<bool> send(core::const_frame frame) override
392         {               
393                 CASPAR_VERIFY(in_video_format_.format != core::video_format::invalid);
394                 
395                 --tokens_;
396                 std::shared_ptr<void> token(
397                         nullptr, 
398                         [this](void*)
399                         {
400                                 ++tokens_;
401                                 tokens_cond_.notify_one();
402                         });
403
404                 return executor_.begin_invoke([=]() -> bool
405                 {
406                         boost::unique_lock<boost::mutex> tokens_lock(tokens_mutex_);
407
408                         while(tokens_ < 0)
409                                 tokens_cond_.wait(tokens_lock);
410
411                         video_encoder_executor_.begin_invoke([=]() mutable
412                         {
413                                 encode_video(
414                                         frame, 
415                                         token);
416                         });
417                 
418                         audio_encoder_executor_.begin_invoke([=]() mutable
419                         {
420                                 encode_audio(
421                                         frame, 
422                                         token);
423                         });
424                                 
425                         return true;
426                 });
427         }
428
429         std::wstring print() const override
430         {
431                 return L"streaming_consumer[" + u16(path_.string()) + L"]";
432         }
433         
434         virtual boost::property_tree::wptree info() const override
435         {
436                 return boost::property_tree::wptree();
437         }
438
439         bool has_synchronization_clock() const override
440         {
441                 return false;
442         }
443
444         int buffer_depth() const override
445         {
446                 return -1;
447         }
448
449         int index() const override
450         {
451                 return 100000 + consumer_index_offset_;
452         }
453
454 private:
455
456         static int interrupt_cb(void* ctx)
457         {
458                 CASPAR_ASSERT(ctx);
459                 return reinterpret_cast<streaming_consumer*>(ctx)->abort_request_;              
460         }
461                 
462         std::shared_ptr<AVStream> open_encoder(
463                         const AVCodec& codec,
464                         std::map<std::string,
465                         std::string>& options)
466         {                       
467                 auto st = 
468                         avformat_new_stream(
469                                 oc_.get(), 
470                                 &codec);
471
472                 if (!st)                
473                         BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("Could not allocate video-stream.") << boost::errinfo_api_function("av_new_stream"));      
474
475                 auto enc = st->codec;
476                                 
477                 CASPAR_VERIFY(enc);
478                                                 
479                 switch(enc->codec_type)
480                 {
481                         case AVMEDIA_TYPE_VIDEO:
482                         {                               
483                                 enc->time_base                    = video_graph_out_->inputs[0]->time_base;
484                                 enc->pix_fmt                      = static_cast<AVPixelFormat>(video_graph_out_->inputs[0]->format);
485                                 enc->sample_aspect_ratio  = st->sample_aspect_ratio = video_graph_out_->inputs[0]->sample_aspect_ratio;
486                                 enc->width                                = video_graph_out_->inputs[0]->w;
487                                 enc->height                               = video_graph_out_->inputs[0]->h;
488                         
489                                 break;
490                         }
491                         case AVMEDIA_TYPE_AUDIO:
492                         {
493                                 enc->time_base                    = audio_graph_out_->inputs[0]->time_base;
494                                 enc->sample_fmt                   = static_cast<AVSampleFormat>(audio_graph_out_->inputs[0]->format);
495                                 enc->sample_rate                  = audio_graph_out_->inputs[0]->sample_rate;
496                                 enc->channel_layout               = audio_graph_out_->inputs[0]->channel_layout;
497                                 enc->channels                     = audio_graph_out_->inputs[0]->channels;
498                         
499                                 break;
500                         }
501                 }
502                                                                                 
503                 if(oc_->oformat->flags & AVFMT_GLOBALHEADER)
504                         enc->flags |= CODEC_FLAG_GLOBAL_HEADER;
505                 
506                 static const std::array<std::string, 4> char_id_map = {{"v", "a", "d", "s"}};
507
508                 const auto char_id = char_id_map.at(enc->codec_type);
509                                                                 
510                 const auto codec_opts = 
511                         remove_options(
512                                 options, 
513                                 boost::regex("^(" + char_id + "?[^:]+):" + char_id + "$"));
514                 
515                 AVDictionary* av_codec_opts = nullptr;
516
517                 to_dict(
518                         &av_codec_opts, 
519                         options);
520
521                 to_dict(
522                         &av_codec_opts,
523                         codec_opts);
524
525                 options.clear();
526                 
527                 FF(avcodec_open2(
528                         enc,            
529                         &codec, 
530                         av_codec_opts ? &av_codec_opts : nullptr));             
531
532                 if(av_codec_opts)
533                 {
534                         auto t = 
535                                 av_dict_get(
536                                         av_codec_opts, 
537                                         "", 
538                                          nullptr, 
539                                         AV_DICT_IGNORE_SUFFIX);
540
541                         while(t)
542                         {
543                                 options[t->key + (codec_opts.find(t->key) != codec_opts.end() ? ":" + char_id : "")] = t->value;
544
545                                 t = av_dict_get(
546                                                 av_codec_opts, 
547                                                 "", 
548                                                 t, 
549                                                 AV_DICT_IGNORE_SUFFIX);
550                         }
551
552                         av_dict_free(&av_codec_opts);
553                 }
554                                 
555                 if(enc->codec_type == AVMEDIA_TYPE_AUDIO && !(codec.capabilities & CODEC_CAP_VARIABLE_FRAME_SIZE))
556                 {
557                         CASPAR_ASSERT(enc->frame_size > 0);
558                         av_buffersink_set_frame_size(audio_graph_out_, 
559                                                                                  enc->frame_size);
560                 }
561                 
562                 return std::shared_ptr<AVStream>(st, [this](AVStream* st)
563                 {
564                         avcodec_close(st->codec);
565                 });
566         }
567
568         void configue_audio_bistream_filters(
569                         std::map<std::string, std::string>& options)
570         {
571                 const auto audio_bitstream_filter_str = 
572                         try_remove_arg<std::string>(
573                                 options, 
574                                 boost::regex("^bsf:a|absf$"));
575
576                 const auto audio_bitstream_filter = 
577                         audio_bitstream_filter_str 
578                                 ? av_bitstream_filter_init(audio_bitstream_filter_str->c_str()) 
579                                 : nullptr;
580
581                 CASPAR_VERIFY(!audio_bitstream_filter_str || audio_bitstream_filter);
582
583                 if(audio_bitstream_filter)
584                 {
585                         audio_bitstream_filter_.reset(
586                                 audio_bitstream_filter, 
587                                 av_bitstream_filter_close);
588                 }
589                 
590                 if(audio_bitstream_filter_str && !audio_bitstream_filter_)
591                         options["bsf:a"] = *audio_bitstream_filter_str;
592         }
593         
594         void configue_video_bistream_filters(
595                         std::map<std::string, std::string>& options)
596         {
597                 const auto video_bitstream_filter_str = 
598                                 try_remove_arg<std::string>(
599                                         options, 
600                                         boost::regex("^bsf:v|vbsf$"));
601
602                 const auto video_bitstream_filter = 
603                         video_bitstream_filter_str 
604                                 ? av_bitstream_filter_init(video_bitstream_filter_str->c_str()) 
605                                 : nullptr;
606
607                 CASPAR_VERIFY(!video_bitstream_filter_str || video_bitstream_filter);
608
609                 if(video_bitstream_filter)
610                 {
611                         video_bitstream_filter_.reset(
612                                 video_bitstream_filter, 
613                                 av_bitstream_filter_close);
614                 }
615                 
616                 if(video_bitstream_filter_str && !video_bitstream_filter_)
617                         options["bsf:v"] = *video_bitstream_filter_str;
618         }
619         
620         void configure_video_filters(
621                         const AVCodec& codec,
622                         const std::string& filtergraph)
623         {
624                 video_graph_.reset(
625                                 avfilter_graph_alloc(), 
626                                 [](AVFilterGraph* p)
627                                 {
628                                         avfilter_graph_free(&p);
629                                 });
630                 
631                 video_graph_->nb_threads  = boost::thread::hardware_concurrency()/2;
632                 video_graph_->thread_type = AVFILTER_THREAD_SLICE;
633
634                 const auto sample_aspect_ratio =
635                         boost::rational<int>(
636                                         in_video_format_.square_width,
637                                         in_video_format_.square_height) /
638                         boost::rational<int>(
639                                         in_video_format_.width,
640                                         in_video_format_.height);
641                 
642                 const auto vsrc_options = (boost::format("video_size=%1%x%2%:pix_fmt=%3%:time_base=%4%/%5%:pixel_aspect=%6%/%7%:frame_rate=%8%/%9%")
643                         % in_video_format_.width % in_video_format_.height
644                         % AV_PIX_FMT_BGRA
645                         % in_video_format_.duration     % in_video_format_.time_scale
646                         % sample_aspect_ratio.numerator() % sample_aspect_ratio.denominator()
647                         % in_video_format_.time_scale % in_video_format_.duration).str();
648                                         
649                 AVFilterContext* filt_vsrc = nullptr;                   
650                 FF(avfilter_graph_create_filter(
651                                 &filt_vsrc,
652                                 avfilter_get_by_name("buffer"), 
653                                 "ffmpeg_consumer_buffer",
654                                 vsrc_options.c_str(), 
655                                 nullptr, 
656                                 video_graph_.get()));
657                                 
658                 AVFilterContext* filt_vsink = nullptr;
659                 FF(avfilter_graph_create_filter(
660                                 &filt_vsink,
661                                 avfilter_get_by_name("buffersink"), 
662                                 "ffmpeg_consumer_buffersink",
663                                 nullptr, 
664                                 nullptr, 
665                                 video_graph_.get()));
666                 
667 #pragma warning (push)
668 #pragma warning (disable : 4245)
669
670                 FF(av_opt_set_int_list(
671                                 filt_vsink, 
672                                 "pix_fmts", 
673                                 codec.pix_fmts, 
674                                 -1,
675                                 AV_OPT_SEARCH_CHILDREN));
676
677 #pragma warning (pop)
678                         
679                 configure_filtergraph(
680                                 *video_graph_, 
681                                 filtergraph,
682                                 *filt_vsrc,
683                                 *filt_vsink);
684
685                 video_graph_in_  = filt_vsrc;
686                 video_graph_out_ = filt_vsink;
687                 
688                 CASPAR_LOG(info)
689                         <<      u16(std::string("\n") 
690                                 + avfilter_graph_dump(
691                                                 video_graph_.get(), 
692                                                 nullptr));
693         }
694
695         void configure_audio_filters(
696                         const AVCodec& codec,
697                         const std::string& filtergraph)
698         {
699                 audio_graph_.reset(
700                         avfilter_graph_alloc(), 
701                         [](AVFilterGraph* p)
702                         {
703                                 avfilter_graph_free(&p);
704                         });
705                 
706                 audio_graph_->nb_threads  = boost::thread::hardware_concurrency()/2;
707                 audio_graph_->thread_type = AVFILTER_THREAD_SLICE;
708                 
709                 const auto asrc_options = (boost::format("sample_rate=%1%:sample_fmt=%2%:channels=%3%:time_base=%4%/%5%:channel_layout=%6%")
710                         % in_video_format_.audio_sample_rate
711                         % av_get_sample_fmt_name(AV_SAMPLE_FMT_S32)
712                         % in_video_format_.audio_channels
713                         % 1     % in_video_format_.audio_sample_rate
714                         % boost::io::group(
715                                 std::hex, 
716                                 std::showbase, 
717                                 av_get_default_channel_layout(in_video_format_.audio_channels))).str();
718
719                 AVFilterContext* filt_asrc = nullptr;
720                 FF(avfilter_graph_create_filter(
721                         &filt_asrc,
722                         avfilter_get_by_name("abuffer"), 
723                         "ffmpeg_consumer_abuffer",
724                         asrc_options.c_str(), 
725                         nullptr, 
726                         audio_graph_.get()));
727                                 
728                 AVFilterContext* filt_asink = nullptr;
729                 FF(avfilter_graph_create_filter(
730                         &filt_asink,
731                         avfilter_get_by_name("abuffersink"), 
732                         "ffmpeg_consumer_abuffersink",
733                         nullptr, 
734                         nullptr, 
735                         audio_graph_.get()));
736                 
737 #pragma warning (push)
738 #pragma warning (disable : 4245)
739
740                 FF(av_opt_set_int(
741                         filt_asink,        
742                         "all_channel_counts",
743                         1,      
744                         AV_OPT_SEARCH_CHILDREN));
745
746                 FF(av_opt_set_int_list(
747                         filt_asink, 
748                         "sample_fmts",           
749                         codec.sample_fmts,                              
750                         -1, 
751                         AV_OPT_SEARCH_CHILDREN));
752
753                 FF(av_opt_set_int_list(
754                         filt_asink,
755                         "channel_layouts",       
756                         codec.channel_layouts,                  
757                         -1, 
758                         AV_OPT_SEARCH_CHILDREN));
759
760                 FF(av_opt_set_int_list(
761                         filt_asink, 
762                         "sample_rates" ,         
763                         codec.supported_samplerates,    
764                         -1, 
765                         AV_OPT_SEARCH_CHILDREN));
766
767 #pragma warning (pop)
768                         
769                 configure_filtergraph(
770                         *audio_graph_, 
771                         filtergraph, 
772                         *filt_asrc, 
773                         *filt_asink);
774
775                 audio_graph_in_  = filt_asrc;
776                 audio_graph_out_ = filt_asink;
777
778                 CASPAR_LOG(info) 
779                         <<      u16(std::string("\n") 
780                                 + avfilter_graph_dump(
781                                         audio_graph_.get(), 
782                                         nullptr));
783         }
784
785         void configure_filtergraph(
786                         AVFilterGraph& graph,
787                         const std::string& filtergraph,
788                         AVFilterContext& source_ctx,
789                         AVFilterContext& sink_ctx)
790         {
791                 AVFilterInOut* outputs = nullptr;
792                 AVFilterInOut* inputs = nullptr;
793
794                 try
795                 {
796                         if(!filtergraph.empty())
797                         {
798                                 outputs = avfilter_inout_alloc();
799                                 inputs  = avfilter_inout_alloc();
800
801                                 CASPAR_VERIFY(outputs && inputs);
802
803                                 outputs->name       = av_strdup("in");
804                                 outputs->filter_ctx = &source_ctx;
805                                 outputs->pad_idx    = 0;
806                                 outputs->next       = nullptr;
807
808                                 inputs->name        = av_strdup("out");
809                                 inputs->filter_ctx  = &sink_ctx;
810                                 inputs->pad_idx     = 0;
811                                 inputs->next        = nullptr;
812
813                                 FF(avfilter_graph_parse(
814                                         &graph, 
815                                         filtergraph.c_str(), 
816                                         &inputs, 
817                                         &outputs, 
818                                         nullptr));
819                         } 
820                         else 
821                         {
822                                 FF(avfilter_link(
823                                         &source_ctx, 
824                                         0, 
825                                         &sink_ctx, 
826                                         0));
827                         }
828
829                         FF(avfilter_graph_config(
830                                 &graph, 
831                                 nullptr));
832                 }
833                 catch(...)
834                 {
835                         avfilter_inout_free(&outputs);
836                         avfilter_inout_free(&inputs);
837                         throw;
838                 }
839         }
840         
841         void encode_video(core::const_frame frame_ptr, std::shared_ptr<void> token)
842         {               
843                 if(!video_st_)
844                         return;
845
846                 auto enc = video_st_->codec;
847                         
848                 std::shared_ptr<AVFrame> src_av_frame;
849
850                 if(frame_ptr != core::const_frame::empty())
851                 {
852                         src_av_frame.reset(
853                                 av_frame_alloc(),
854                                 [frame_ptr](AVFrame* frame)
855                                 {
856                                         av_frame_free(&frame);
857                                 });
858
859                         avcodec_get_frame_defaults(src_av_frame.get());         
860                         
861                         const auto sample_aspect_ratio = 
862                                 boost::rational<int>(
863                                         in_video_format_.square_width, 
864                                         in_video_format_.square_height) /
865                                 boost::rational<int>(
866                                         in_video_format_.width, 
867                                         in_video_format_.height);
868
869                         src_av_frame->format                              = AV_PIX_FMT_BGRA;
870                         src_av_frame->width                                       = in_video_format_.width;
871                         src_av_frame->height                              = in_video_format_.height;
872                         src_av_frame->sample_aspect_ratio.num = sample_aspect_ratio.numerator();
873                         src_av_frame->sample_aspect_ratio.den = sample_aspect_ratio.denominator();
874                         src_av_frame->pts                                         = video_pts_;
875
876                         video_pts_ += 1;
877
878                         FF(av_image_fill_arrays(
879                                 src_av_frame->data,
880                                 src_av_frame->linesize,
881                                 frame_ptr.image_data().begin(),
882                                 static_cast<AVPixelFormat>(src_av_frame->format), 
883                                 in_video_format_.width, 
884                                 in_video_format_.height, 
885                                 1));
886
887                         FF(av_buffersrc_add_frame(
888                                 video_graph_in_, 
889                                 src_av_frame.get()));
890                 }               
891
892                 int ret = 0;
893
894                 while(ret >= 0)
895                 {
896                         std::shared_ptr<AVFrame> filt_frame(
897                                 av_frame_alloc(), 
898                                 [](AVFrame* p)
899                                 {
900                                         av_frame_free(&p);
901                                 });
902
903                         ret = av_buffersink_get_frame(
904                                 video_graph_out_, 
905                                 filt_frame.get());
906                                                 
907                         video_encoder_executor_.begin_invoke([=]
908                         {
909                                 if(ret == AVERROR_EOF)
910                                 {
911                                         if(enc->codec->capabilities & CODEC_CAP_DELAY)
912                                         {
913                                                 while(encode_av_frame(
914                                                                 *video_st_, 
915                                                                 video_bitstream_filter_.get(),
916                                                                 avcodec_encode_video2, 
917                                                                 nullptr, token))
918                                                 {
919                                                         boost::this_thread::yield(); // TODO:
920                                                 }
921                                         }               
922                                 }
923                                 else if(ret != AVERROR(EAGAIN))
924                                 {
925                                         FF_RET(ret, "av_buffersink_get_frame");
926                                         
927                                         if (filt_frame->interlaced_frame) 
928                                         {
929                                                 if (enc->codec->id == AV_CODEC_ID_MJPEG)
930                                                         enc->field_order = filt_frame->top_field_first ? AV_FIELD_TT : AV_FIELD_BB;
931                                                 else
932                                                         enc->field_order = filt_frame->top_field_first ? AV_FIELD_TB : AV_FIELD_BT;
933                                         } 
934                                         else
935                                                 enc->field_order = AV_FIELD_PROGRESSIVE;
936
937                                         filt_frame->quality = enc->global_quality;
938
939                                         if (!enc->me_threshold)
940                                                 filt_frame->pict_type = AV_PICTURE_TYPE_NONE;
941                         
942                                         encode_av_frame(
943                                                 *video_st_,
944                                                 video_bitstream_filter_.get(),
945                                                 avcodec_encode_video2,
946                                                 filt_frame, 
947                                                 token);
948
949                                         boost::this_thread::yield(); // TODO:
950                                 }
951                         });
952                 }
953         }
954                                         
955         void encode_audio(core::const_frame frame_ptr, std::shared_ptr<void> token)
956         {               
957                 if(!audio_st_)
958                         return;
959                 
960                 auto enc = audio_st_->codec;
961                         
962                 std::shared_ptr<AVFrame> src_av_frame;
963
964                 if(frame_ptr != core::const_frame::empty())
965                 {
966                         src_av_frame.reset(
967                                 av_frame_alloc(), 
968                                 [](AVFrame* p)
969                                 {
970                                         av_frame_free(&p);
971                                 });
972                 
973                         src_av_frame->channels           = in_video_format_.audio_channels;
974                         src_av_frame->channel_layout = av_get_default_channel_layout(in_video_format_.audio_channels);
975                         src_av_frame->sample_rate        = in_video_format_.audio_sample_rate;
976                         src_av_frame->nb_samples         = static_cast<int>(frame_ptr.audio_data().size()) / src_av_frame->channels;
977                         src_av_frame->format             = AV_SAMPLE_FMT_S32;
978                         src_av_frame->pts                        = audio_pts_;
979
980                         audio_pts_ += src_av_frame->nb_samples;
981
982                         FF(av_samples_fill_arrays(
983                                         src_av_frame->extended_data,
984                                         src_av_frame->linesize,
985                                         reinterpret_cast<const std::uint8_t*>(&*frame_ptr.audio_data().begin()),
986                                         src_av_frame->channels,
987                                         src_av_frame->nb_samples,
988                                         static_cast<AVSampleFormat>(src_av_frame->format),
989                                         16));
990                 
991                         FF(av_buffersrc_add_frame(
992                                         audio_graph_in_, 
993                                         src_av_frame.get()));
994                 }
995
996                 int ret = 0;
997
998                 while(ret >= 0)
999                 {
1000                         std::shared_ptr<AVFrame> filt_frame(
1001                                 av_frame_alloc(), 
1002                                 [](AVFrame* p)
1003                                 {
1004                                         av_frame_free(&p);
1005                                 });
1006
1007                         ret = av_buffersink_get_frame(
1008                                 audio_graph_out_, 
1009                                 filt_frame.get());
1010                                         
1011                         audio_encoder_executor_.begin_invoke([=]
1012                         {       
1013                                 if(ret == AVERROR_EOF)
1014                                 {
1015                                         if(enc->codec->capabilities & CODEC_CAP_DELAY)
1016                                         {
1017                                                 while(encode_av_frame(
1018                                                                 *audio_st_, 
1019                                                                 audio_bitstream_filter_.get(), 
1020                                                                 avcodec_encode_audio2, 
1021                                                                 nullptr, 
1022                                                                 token))
1023                                                 {
1024                                                         boost::this_thread::yield(); // TODO:
1025                                                 }
1026                                         }
1027                                 }
1028                                 else if(ret != AVERROR(EAGAIN))
1029                                 {
1030                                         FF_RET(
1031                                                 ret, 
1032                                                 "av_buffersink_get_frame");
1033
1034                                         encode_av_frame(
1035                                                 *audio_st_, 
1036                                                 audio_bitstream_filter_.get(), 
1037                                                 avcodec_encode_audio2, 
1038                                                 filt_frame, 
1039                                                 token);
1040
1041                                         boost::this_thread::yield(); // TODO:
1042                                 }
1043                         });
1044                 }
1045         }
1046         
1047         template<typename F>
1048         bool encode_av_frame(
1049                         AVStream& st,
1050                         AVBitStreamFilterContext* bsfc, 
1051                         const F& func, 
1052                         const std::shared_ptr<AVFrame>& src_av_frame, 
1053                         std::shared_ptr<void> token)
1054         {
1055                 AVPacket pkt = {};
1056                 av_init_packet(&pkt);
1057
1058                 int got_packet = 0;
1059
1060                 FF(func(
1061                         st.codec, 
1062                         &pkt, 
1063                         src_av_frame.get(), 
1064                         &got_packet));
1065                                         
1066                 if(!got_packet || pkt.size <= 0)
1067                         return false;
1068                                 
1069                 pkt.stream_index = st.index;
1070                 
1071                 if(bsfc)
1072                 {
1073                         auto new_pkt = pkt;
1074
1075                         auto a = av_bitstream_filter_filter(
1076                                         bsfc,
1077                                         st.codec,
1078                                         nullptr,
1079                                         &new_pkt.data,
1080                                         &new_pkt.size,
1081                                         pkt.data,
1082                                         pkt.size,
1083                                         pkt.flags & AV_PKT_FLAG_KEY);
1084
1085                         if(a == 0 && new_pkt.data != pkt.data && new_pkt.destruct) 
1086                         {
1087                                 auto t = reinterpret_cast<std::uint8_t*>(av_malloc(new_pkt.size + FF_INPUT_BUFFER_PADDING_SIZE));
1088
1089                                 if(t) 
1090                                 {
1091                                         memcpy(
1092                                                 t, 
1093                                                 new_pkt.data,
1094                                                 new_pkt.size);
1095
1096                                         memset(
1097                                                 t + new_pkt.size, 
1098                                                 0, 
1099                                                 FF_INPUT_BUFFER_PADDING_SIZE);
1100
1101                                         new_pkt.data = t;
1102                                         new_pkt.buf  = nullptr;
1103                                 } 
1104                                 else
1105                                         a = AVERROR(ENOMEM);
1106                         }
1107
1108                         av_free_packet(&pkt);
1109
1110                         FF_RET(
1111                                 a, 
1112                                 "av_bitstream_filter_filter");
1113
1114                         new_pkt.buf =
1115                                 av_buffer_create(
1116                                         new_pkt.data, 
1117                                         new_pkt.size,
1118                                         av_buffer_default_free, 
1119                                         nullptr, 
1120                                         0);
1121
1122                         CASPAR_VERIFY(new_pkt.buf);
1123
1124                         pkt = new_pkt;
1125                 }
1126                 
1127                 if (pkt.pts != AV_NOPTS_VALUE)
1128                 {
1129                         pkt.pts = 
1130                                 av_rescale_q(
1131                                         pkt.pts,
1132                                         st.codec->time_base, 
1133                                         st.time_base);
1134                 }
1135
1136                 if (pkt.dts != AV_NOPTS_VALUE)
1137                 {
1138                         pkt.dts = 
1139                                 av_rescale_q(
1140                                         pkt.dts, 
1141                                         st.codec->time_base, 
1142                                         st.time_base);
1143                 }
1144                                 
1145                 pkt.duration = 
1146                         static_cast<int>(
1147                                 av_rescale_q(
1148                                         pkt.duration, 
1149                                         st.codec->time_base, st.time_base));
1150
1151                 write_packet(
1152                         std::shared_ptr<AVPacket>(
1153                                 new AVPacket(pkt), 
1154                                 [](AVPacket* p)
1155                                 {
1156                                         av_free_packet(p); 
1157                                         delete p;
1158                                 }), token);
1159
1160                 return true;
1161         }
1162
1163         void write_packet(
1164                         const std::shared_ptr<AVPacket>& pkt_ptr,
1165                         std::shared_ptr<void> token)
1166         {               
1167                 write_executor_.begin_invoke([this, pkt_ptr, token]() mutable
1168                 {
1169                         FF(av_interleaved_write_frame(
1170                                 oc_.get(), 
1171                                 pkt_ptr.get()));
1172                 });     
1173         }       
1174         
1175         template<typename T>
1176         static boost::optional<T> try_remove_arg(
1177                         std::map<std::string, std::string>& options, 
1178                         const boost::regex& expr)
1179         {
1180                 for(auto it = options.begin(); it != options.end(); ++it)
1181                 {                       
1182                         if(boost::regex_search(it->first, expr))
1183                         {
1184                                 auto arg = it->second;
1185                                 options.erase(it);
1186                                 return boost::lexical_cast<T>(arg);
1187                         }
1188                 }
1189
1190                 return boost::optional<T>();
1191         }
1192                 
1193         static std::map<std::string, std::string> remove_options(
1194                         std::map<std::string, std::string>& options, 
1195                         const boost::regex& expr)
1196         {
1197                 std::map<std::string, std::string> result;
1198                         
1199                 auto it = options.begin();
1200                 while(it != options.end())
1201                 {                       
1202                         boost::smatch what;
1203                         if(boost::regex_search(it->first, what, expr))
1204                         {
1205                                 result[
1206                                         what.size() > 0 && what[1].matched 
1207                                                 ? what[1].str() 
1208                                                 : it->first] = it->second;
1209                                 it = options.erase(it);
1210                         }
1211                         else
1212                                 ++it;
1213                 }
1214
1215                 return result;
1216         }
1217                 
1218         static void to_dict(AVDictionary** dest, const std::map<std::string, std::string>& c)
1219         {               
1220                 for (const auto& entry : c)
1221                 {
1222                         av_dict_set(
1223                                 dest, 
1224                                 entry.first.c_str(), 
1225                                 entry.second.c_str(), 0);
1226                 }
1227         }
1228
1229         static std::map<std::string, std::string> to_map(AVDictionary* dict)
1230         {
1231                 std::map<std::string, std::string> result;
1232                 
1233                 for(auto t = dict 
1234                                 ? av_dict_get(
1235                                         dict, 
1236                                         "", 
1237                                         nullptr, 
1238                                         AV_DICT_IGNORE_SUFFIX) 
1239                                 : nullptr;
1240                         t; 
1241                         t = av_dict_get(
1242                                 dict, 
1243                                 "", 
1244                                 t,
1245                                 AV_DICT_IGNORE_SUFFIX))
1246                 {
1247                         result[t->key] = t->value;
1248                 }
1249
1250                 return result;
1251         }
1252 };
1253
1254 void describe_streaming_consumer(core::help_sink& sink, const core::help_repository& repo)
1255 {
1256         sink.short_description(L"For streaming the contents of a channel using FFMpeg.");
1257         sink.syntax(L"STREAM [url:string] {-[ffmpeg_param1:string] [value1:string] {-[ffmpeg_param2:string] [value2:string] {...}}}");
1258         sink.para()->text(L"For streaming the contents of a channel using FFMpeg");
1259         sink.definitions()
1260                 ->item(L"url", L"The stream URL to create/stream to.")
1261                 ->item(L"ffmpeg_paramX", L"A parameter supported by FFMpeg. For example vcodec or acodec etc.");
1262         sink.para()->text(L"Examples:");
1263         sink.example(L">> ADD 1 STREAM udp://<client_ip_address>:9250 -format mpegts -vcodec libx264 -crf 25 -tune zerolatency -preset ultrafast");
1264 }
1265
1266 spl::shared_ptr<core::frame_consumer> create_streaming_consumer(
1267                 const std::vector<std::wstring>& params, core::interaction_sink*)
1268 {       
1269         if (params.size() < 1 || params.at(0) != L"STREAM")
1270                 return core::frame_consumer::empty();
1271
1272         auto path = u8(params.at(1));
1273         auto args = u8(boost::join(params, L" "));
1274
1275         return spl::make_shared<streaming_consumer>(path, args);
1276 }
1277
1278 spl::shared_ptr<core::frame_consumer> create_preconfigured_streaming_consumer(
1279                 const boost::property_tree::wptree& ptree, core::interaction_sink*)
1280 {               
1281         return spl::make_shared<streaming_consumer>(
1282                         u8(ptree.get<std::wstring>(L"path")), 
1283                         u8(ptree.get<std::wstring>(L"args", L"")));
1284 }
1285
1286 }}