]> git.sesse.net Git - casparcg/blob - modules/ffmpeg/consumer/streaming_consumer.cpp
Throw user_error exception for errors that are caused by users of the system instead...
[casparcg] / modules / ffmpeg / consumer / streaming_consumer.cpp
1 #include "../StdAfx.h"
2
3 #include "ffmpeg_consumer.h"
4
5 #include "../ffmpeg_error.h"
6
7 #include <common/except.h>
8 #include <common/executor.h>
9 #include <common/assert.h>
10 #include <common/utf.h>
11 #include <common/future.h>
12 #include <common/env.h>
13 #include <common/scope_exit.h>
14
15 #include <core/consumer/frame_consumer.h>
16 #include <core/frame/frame.h>
17 #include <core/frame/audio_channel_layout.h>
18 #include <core/video_format.h>
19 #include <core/monitor/monitor.h>
20 #include <core/help/help_repository.h>
21 #include <core/help/help_sink.h>
22
23 #include <boost/noncopyable.hpp>
24 #include <boost/rational.hpp>
25 #include <boost/format.hpp>
26 #include <boost/algorithm/string/predicate.hpp>
27 #include <boost/property_tree/ptree.hpp>
28
29 #pragma warning(push)
30 #pragma warning(disable: 4244)
31 #pragma warning(disable: 4245)
32 #include <boost/crc.hpp>
33 #pragma warning(pop)
34
35 #include <tbb/atomic.h>
36 #include <tbb/concurrent_queue.h>
37 #include <tbb/parallel_invoke.h>
38 #include <tbb/parallel_for.h>
39
40 #include <numeric>
41
42 #pragma warning(push)
43 #pragma warning(disable: 4244)
44
45 extern "C" 
46 {
47         #define __STDC_CONSTANT_MACROS
48         #define __STDC_LIMIT_MACROS
49         #include <libavformat/avformat.h>
50         #include <libavcodec/avcodec.h>
51         #include <libavutil/avutil.h>
52         #include <libavutil/frame.h>
53         #include <libavutil/opt.h>
54         #include <libavutil/imgutils.h>
55         #include <libavutil/parseutils.h>
56         #include <libavfilter/avfilter.h>
57         #include <libavfilter/buffersink.h>
58         #include <libavfilter/buffersrc.h>
59 }
60
61 #pragma warning(pop)
62
63 namespace caspar { namespace ffmpeg {
64
65 int crc16(const std::string& str)
66 {
67         boost::crc_16_type result;
68
69         result.process_bytes(str.data(), str.length());
70
71         return result.checksum();
72 }
73
74 class streaming_consumer final : public core::frame_consumer
75 {
76 public:
77         // Static Members
78                 
79 private:
80         core::monitor::subject                                          subject_;
81         boost::filesystem::path                                         path_;
82         int                                                                                     consumer_index_offset_;
83
84         std::map<std::string, std::string>                      options_;
85         bool                                                                            compatibility_mode_;
86                                                                                                 
87         core::video_format_desc                                         in_video_format_;
88         core::audio_channel_layout                                      in_channel_layout_                      = core::audio_channel_layout::invalid();
89
90         std::shared_ptr<AVFormatContext>                        oc_;
91         tbb::atomic<bool>                                                       abort_request_;
92                                                                                                 
93         std::shared_ptr<AVStream>                                       video_st_;
94         std::shared_ptr<AVStream>                                       audio_st_;
95
96         std::int64_t                                                            video_pts_;
97         std::int64_t                                                            audio_pts_;
98                                                                                                                                                                         
99     AVFilterContext*                                                    audio_graph_in_;  
100     AVFilterContext*                                                    audio_graph_out_; 
101     std::shared_ptr<AVFilterGraph>                              audio_graph_;    
102         std::shared_ptr<AVBitStreamFilterContext>       audio_bitstream_filter_;       
103
104     AVFilterContext*                                                    video_graph_in_;  
105     AVFilterContext*                                                    video_graph_out_; 
106     std::shared_ptr<AVFilterGraph>                              video_graph_;  
107         std::shared_ptr<AVBitStreamFilterContext>       video_bitstream_filter_;
108         
109         executor                                                                        executor_;
110
111         executor                                                                        video_encoder_executor_;
112         executor                                                                        audio_encoder_executor_;
113
114         tbb::atomic<int>                                                        tokens_;
115         boost::mutex                                                            tokens_mutex_;
116         boost::condition_variable                                       tokens_cond_;
117         tbb::atomic<int64_t>                                            current_encoding_delay_;
118
119         executor                                                                        write_executor_;
120         
121 public:
122
123         streaming_consumer(
124                         std::string path, 
125                         std::string options,
126                         bool compatibility_mode)
127                 : path_(path)
128                 , consumer_index_offset_(crc16(path))
129                 , compatibility_mode_(compatibility_mode)
130                 , video_pts_(0)
131                 , audio_pts_(0)
132                 , executor_(print())
133                 , audio_encoder_executor_(print() + L" audio_encoder")
134                 , video_encoder_executor_(print() + L" video_encoder")
135                 , write_executor_(print() + L" io")
136         {               
137                 abort_request_ = false;
138                 current_encoding_delay_ = 0;
139
140                 for(auto it = 
141                                 boost::sregex_iterator(
142                                         options.begin(), 
143                                         options.end(), 
144                                         boost::regex("-(?<NAME>[^-\\s]+)(\\s+(?<VALUE>[^\\s]+))?")); 
145                         it != boost::sregex_iterator(); 
146                         ++it)
147                 {                               
148                         options_[(*it)["NAME"].str()] = (*it)["VALUE"].matched ? (*it)["VALUE"].str() : "";
149                 }
150                                                                                 
151         if (options_.find("threads") == options_.end())
152             options_["threads"] = "auto";
153
154                 tokens_ = 
155                         std::max(
156                                 1, 
157                                 try_remove_arg<int>(
158                                         options_, 
159                                         boost::regex("tokens")).get_value_or(2));               
160         }
161                 
162         ~streaming_consumer()
163         {
164                 if(oc_)
165                 {
166                         video_encoder_executor_.begin_invoke([&] { encode_video(core::const_frame::empty(), nullptr); });
167                         audio_encoder_executor_.begin_invoke([&] { encode_audio(core::const_frame::empty(), nullptr); });
168
169                         video_encoder_executor_.stop();
170                         audio_encoder_executor_.stop();
171                         video_encoder_executor_.join();
172                         audio_encoder_executor_.join();
173
174                         video_graph_.reset();
175                         audio_graph_.reset();
176                         video_st_.reset();
177                         audio_st_.reset();
178
179                         write_packet(nullptr, nullptr);
180
181                         write_executor_.stop();
182                         write_executor_.join();
183
184                         FF(av_write_trailer(oc_.get()));
185
186                         if (!(oc_->oformat->flags & AVFMT_NOFILE) && oc_->pb)
187                                 avio_close(oc_->pb);
188
189                         oc_.reset();
190                 }
191         }
192
193         void initialize(
194                         const core::video_format_desc& format_desc,
195                         const core::audio_channel_layout& channel_layout,
196                         int channel_index) override
197         {
198                 try
199                 {                               
200                         static boost::regex prot_exp("^.+:.*" );
201                         
202                         const auto overwrite = 
203                                 try_remove_arg<std::string>(
204                                         options_,
205                                         boost::regex("y")) != boost::none;
206
207                         if(!boost::regex_match(
208                                         path_.string(), 
209                                         prot_exp))
210                         {
211                                 if(!path_.is_complete())
212                                 {
213                                         path_ = 
214                                                 u8(
215                                                         env::media_folder()) + 
216                                                         path_.string();
217                                 }
218                         
219                                 if(boost::filesystem::exists(path_))
220                                 {
221                                         if(!overwrite && !compatibility_mode_)
222                                                 BOOST_THROW_EXCEPTION(invalid_argument() << msg_info("File exists"));
223                                                 
224                                         boost::filesystem::remove(path_);
225                                 }
226                         }
227                                                         
228                         const auto oformat_name = 
229                                 try_remove_arg<std::string>(
230                                         options_, 
231                                         boost::regex("^f|format$"));
232                         
233                         AVFormatContext* oc;
234
235                         FF(avformat_alloc_output_context2(
236                                 &oc, 
237                                 nullptr, 
238                                 oformat_name && !oformat_name->empty() ? oformat_name->c_str() : nullptr, 
239                                 path_.string().c_str()));
240
241                         oc_.reset(
242                                 oc, 
243                                 avformat_free_context);
244                                         
245                         CASPAR_VERIFY(oc_->oformat);
246
247                         oc_->interrupt_callback.callback = streaming_consumer::interrupt_cb;
248                         oc_->interrupt_callback.opaque   = this;        
249
250                         CASPAR_VERIFY(format_desc.format != core::video_format::invalid);
251
252                         in_video_format_ = format_desc;
253                         in_channel_layout_ = channel_layout;
254                                                         
255                         CASPAR_VERIFY(oc_->oformat);
256                         
257                         const auto video_codec_name = 
258                                 try_remove_arg<std::string>(
259                                         options_, 
260                                         boost::regex("^c:v|codec:v|vcodec$"));
261
262                         const auto video_codec = 
263                                 video_codec_name 
264                                         ? avcodec_find_encoder_by_name(video_codec_name->c_str())
265                                         : avcodec_find_encoder(oc_->oformat->video_codec);
266                                                 
267                         const auto audio_codec_name = 
268                                 try_remove_arg<std::string>(
269                                         options_, 
270                                          boost::regex("^c:a|codec:a|acodec$"));
271                         
272                         const auto audio_codec = 
273                                 audio_codec_name 
274                                         ? avcodec_find_encoder_by_name(audio_codec_name->c_str())
275                                         : avcodec_find_encoder(oc_->oformat->audio_codec);
276                         
277                         if (!video_codec)
278                                 CASPAR_THROW_EXCEPTION(user_error() << msg_info(
279                                                 "Failed to find video codec " + (video_codec_name
280                                                                 ? *video_codec_name
281                                                                 : "with id " + boost::lexical_cast<std::string>(
282                                                                                 oc_->oformat->video_codec))));
283                         if (!audio_codec)
284                                 CASPAR_THROW_EXCEPTION(user_error() << msg_info(
285                                                 "Failed to find audio codec " + (audio_codec_name
286                                                                 ? *audio_codec_name
287                                                                 : "with id " + boost::lexical_cast<std::string>(
288                                                                                 oc_->oformat->audio_codec))));
289                         
290                         // Filters
291
292                         {
293                                 configure_video_filters(
294                                         *video_codec, 
295                                         try_remove_arg<std::string>(options_, 
296                                         boost::regex("vf|f:v|filter:v")).get_value_or(""));
297
298                                 configure_audio_filters(
299                                         *audio_codec, 
300                                         try_remove_arg<std::string>(options_,
301                                         boost::regex("af|f:a|filter:a")).get_value_or(""));
302                         }
303
304                         // Bistream Filters
305                         {
306                                 configue_audio_bistream_filters(options_);
307                                 configue_video_bistream_filters(options_);
308                         }
309
310                         // Encoders
311
312                         {
313                                 auto video_options = options_;
314                                 auto audio_options = options_;
315
316                                 video_st_ = open_encoder(
317                                         *video_codec, 
318                                         video_options);
319
320                                 audio_st_ = open_encoder(
321                                         *audio_codec, 
322                                         audio_options);
323
324                                 auto it = options_.begin();
325                                 while(it != options_.end())
326                                 {
327                                         if(video_options.find(it->first) == video_options.end() || audio_options.find(it->first) == audio_options.end())
328                                                 it = options_.erase(it);
329                                         else
330                                                 ++it;
331                                 }
332                         }
333
334                         // Output
335                         {
336                                 AVDictionary* av_opts = nullptr;
337
338                                 to_dict(
339                                         &av_opts, 
340                                         std::move(options_));
341
342                                 CASPAR_SCOPE_EXIT
343                                 {
344                                         av_dict_free(&av_opts);
345                                 };
346
347                                 if (!(oc_->oformat->flags & AVFMT_NOFILE)) 
348                                 {
349                                         FF(avio_open2(
350                                                 &oc_->pb, 
351                                                 path_.string().c_str(), 
352                                                 AVIO_FLAG_WRITE, 
353                                                 &oc_->interrupt_callback, 
354                                                 &av_opts));
355                                 }
356                                 
357                                 FF(avformat_write_header(
358                                         oc_.get(), 
359                                         &av_opts));
360                                 
361                                 options_ = to_map(av_opts);
362                         }
363
364                         // Dump Info
365                         
366                         av_dump_format(
367                                 oc_.get(), 
368                                 0, 
369                                 oc_->filename, 
370                                 1);             
371
372                         for (const auto& option : options_)
373                         {
374                                 CASPAR_LOG(warning) 
375                                         << L"Invalid option: -" 
376                                         << u16(option.first) 
377                                         << L" " 
378                                         << u16(option.second);
379                         }
380                 }
381                 catch(...)
382                 {
383                         video_st_.reset();
384                         audio_st_.reset();
385                         oc_.reset();
386                         throw;
387                 }
388         }
389
390         core::monitor::subject& monitor_output() override
391         {
392                 return subject_;
393         }
394
395         std::wstring name() const override
396         {
397                 return L"streaming";
398         }
399
400         std::future<bool> send(core::const_frame frame) override
401         {               
402                 CASPAR_VERIFY(in_video_format_.format != core::video_format::invalid);
403                 
404                 --tokens_;
405                 std::shared_ptr<void> token(
406                         nullptr, 
407                         [this, frame](void*)
408                         {
409                                 ++tokens_;
410                                 tokens_cond_.notify_one();
411                                 current_encoding_delay_ = frame.get_age_millis();
412                         });
413
414                 return executor_.begin_invoke([=]() -> bool
415                 {
416                         boost::unique_lock<boost::mutex> tokens_lock(tokens_mutex_);
417
418                         while(tokens_ < 0)
419                                 tokens_cond_.wait(tokens_lock);
420
421                         video_encoder_executor_.begin_invoke([=]() mutable
422                         {
423                                 encode_video(
424                                         frame, 
425                                         token);
426                         });
427                 
428                         audio_encoder_executor_.begin_invoke([=]() mutable
429                         {
430                                 encode_audio(
431                                         frame, 
432                                         token);
433                         });
434                                 
435                         return true;
436                 });
437         }
438
439         std::wstring print() const override
440         {
441                 return L"streaming_consumer[" + u16(path_.string()) + L"]";
442         }
443         
444         virtual boost::property_tree::wptree info() const override
445         {
446                 boost::property_tree::wptree info;
447                 info.add(L"type", L"stream");
448                 info.add(L"path", path_.wstring());
449                 return info;
450         }
451
452         bool has_synchronization_clock() const override
453         {
454                 return false;
455         }
456
457         int buffer_depth() const override
458         {
459                 return -1;
460         }
461
462         int index() const override
463         {
464                 return compatibility_mode_ ? 200 : 100000 + consumer_index_offset_;
465         }
466
467         int64_t presentation_frame_age_millis() const override
468         {
469                 return current_encoding_delay_;
470         }
471
472 private:
473
474         static int interrupt_cb(void* ctx)
475         {
476                 CASPAR_ASSERT(ctx);
477                 return reinterpret_cast<streaming_consumer*>(ctx)->abort_request_;              
478         }
479                 
480         std::shared_ptr<AVStream> open_encoder(
481                         const AVCodec& codec,
482                         std::map<std::string,
483                         std::string>& options)
484         {                       
485                 auto st = 
486                         avformat_new_stream(
487                                 oc_.get(), 
488                                 &codec);
489
490                 if (!st)                
491                         CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info("Could not allocate video-stream.") << boost::errinfo_api_function("av_new_stream"));
492
493                 auto enc = st->codec;
494                                 
495                 CASPAR_VERIFY(enc);
496                                                 
497                 switch(enc->codec_type)
498                 {
499                         case AVMEDIA_TYPE_VIDEO:
500                         {
501                                 enc->time_base                          = video_graph_out_->inputs[0]->time_base;
502                                 enc->pix_fmt                            = static_cast<AVPixelFormat>(video_graph_out_->inputs[0]->format);
503                                 enc->sample_aspect_ratio        = st->sample_aspect_ratio = video_graph_out_->inputs[0]->sample_aspect_ratio;
504                                 enc->width                                      = video_graph_out_->inputs[0]->w;
505                                 enc->height                                     = video_graph_out_->inputs[0]->h;
506                                 enc->bit_rate_tolerance         = 400 * 1000000;
507                         
508                                 break;
509                         }
510                         case AVMEDIA_TYPE_AUDIO:
511                         {
512                                 enc->time_base                          = audio_graph_out_->inputs[0]->time_base;
513                                 enc->sample_fmt                         = static_cast<AVSampleFormat>(audio_graph_out_->inputs[0]->format);
514                                 enc->sample_rate                        = audio_graph_out_->inputs[0]->sample_rate;
515                                 enc->channel_layout                     = audio_graph_out_->inputs[0]->channel_layout;
516                                 enc->channels                           = audio_graph_out_->inputs[0]->channels;
517                         
518                                 break;
519                         }
520                 }
521                                                                                 
522                 if(oc_->oformat->flags & AVFMT_GLOBALHEADER)
523                         enc->flags |= CODEC_FLAG_GLOBAL_HEADER;
524                 
525                 static const std::array<std::string, 4> char_id_map = {{"v", "a", "d", "s"}};
526
527                 const auto char_id = char_id_map.at(enc->codec_type);
528                                                                 
529                 const auto codec_opts = 
530                         remove_options(
531                                 options, 
532                                 boost::regex("^(" + char_id + "?[^:]+):" + char_id + "$"));
533                 
534                 AVDictionary* av_codec_opts = nullptr;
535
536                 to_dict(
537                         &av_codec_opts, 
538                         options);
539
540                 to_dict(
541                         &av_codec_opts,
542                         codec_opts);
543
544                 options.clear();
545                 
546                 FF(avcodec_open2(
547                         enc,            
548                         &codec, 
549                         av_codec_opts ? &av_codec_opts : nullptr));             
550
551                 if(av_codec_opts)
552                 {
553                         auto t = 
554                                 av_dict_get(
555                                         av_codec_opts, 
556                                         "", 
557                                          nullptr, 
558                                         AV_DICT_IGNORE_SUFFIX);
559
560                         while(t)
561                         {
562                                 options[t->key + (codec_opts.find(t->key) != codec_opts.end() ? ":" + char_id : "")] = t->value;
563
564                                 t = av_dict_get(
565                                                 av_codec_opts, 
566                                                 "", 
567                                                 t, 
568                                                 AV_DICT_IGNORE_SUFFIX);
569                         }
570
571                         av_dict_free(&av_codec_opts);
572                 }
573                                 
574                 if(enc->codec_type == AVMEDIA_TYPE_AUDIO && !(codec.capabilities & CODEC_CAP_VARIABLE_FRAME_SIZE))
575                 {
576                         CASPAR_ASSERT(enc->frame_size > 0);
577                         av_buffersink_set_frame_size(audio_graph_out_, 
578                                                                                  enc->frame_size);
579                 }
580                 
581                 return std::shared_ptr<AVStream>(st, [this](AVStream* st)
582                 {
583                         avcodec_close(st->codec);
584                 });
585         }
586
587         void configue_audio_bistream_filters(
588                         std::map<std::string, std::string>& options)
589         {
590                 const auto audio_bitstream_filter_str = 
591                         try_remove_arg<std::string>(
592                                 options, 
593                                 boost::regex("^bsf:a|absf$"));
594
595                 const auto audio_bitstream_filter = 
596                         audio_bitstream_filter_str 
597                                 ? av_bitstream_filter_init(audio_bitstream_filter_str->c_str()) 
598                                 : nullptr;
599
600                 CASPAR_VERIFY(!audio_bitstream_filter_str || audio_bitstream_filter);
601
602                 if(audio_bitstream_filter)
603                 {
604                         audio_bitstream_filter_.reset(
605                                 audio_bitstream_filter, 
606                                 av_bitstream_filter_close);
607                 }
608                 
609                 if(audio_bitstream_filter_str && !audio_bitstream_filter_)
610                         options["bsf:a"] = *audio_bitstream_filter_str;
611         }
612         
613         void configue_video_bistream_filters(
614                         std::map<std::string, std::string>& options)
615         {
616                 const auto video_bitstream_filter_str = 
617                                 try_remove_arg<std::string>(
618                                         options, 
619                                         boost::regex("^bsf:v|vbsf$"));
620
621                 const auto video_bitstream_filter = 
622                         video_bitstream_filter_str 
623                                 ? av_bitstream_filter_init(video_bitstream_filter_str->c_str()) 
624                                 : nullptr;
625
626                 CASPAR_VERIFY(!video_bitstream_filter_str || video_bitstream_filter);
627
628                 if(video_bitstream_filter)
629                 {
630                         video_bitstream_filter_.reset(
631                                 video_bitstream_filter, 
632                                 av_bitstream_filter_close);
633                 }
634                 
635                 if(video_bitstream_filter_str && !video_bitstream_filter_)
636                         options["bsf:v"] = *video_bitstream_filter_str;
637         }
638         
639         void configure_video_filters(
640                         const AVCodec& codec,
641                         const std::string& filtergraph)
642         {
643                 video_graph_.reset(
644                                 avfilter_graph_alloc(), 
645                                 [](AVFilterGraph* p)
646                                 {
647                                         avfilter_graph_free(&p);
648                                 });
649                 
650                 video_graph_->nb_threads  = boost::thread::hardware_concurrency()/2;
651                 video_graph_->thread_type = AVFILTER_THREAD_SLICE;
652
653                 const auto sample_aspect_ratio =
654                         boost::rational<int>(
655                                         in_video_format_.square_width,
656                                         in_video_format_.square_height) /
657                         boost::rational<int>(
658                                         in_video_format_.width,
659                                         in_video_format_.height);
660                 
661                 const auto vsrc_options = (boost::format("video_size=%1%x%2%:pix_fmt=%3%:time_base=%4%/%5%:pixel_aspect=%6%/%7%:frame_rate=%8%/%9%")
662                         % in_video_format_.width % in_video_format_.height
663                         % AV_PIX_FMT_BGRA
664                         % in_video_format_.duration     % in_video_format_.time_scale
665                         % sample_aspect_ratio.numerator() % sample_aspect_ratio.denominator()
666                         % in_video_format_.time_scale % in_video_format_.duration).str();
667                                         
668                 AVFilterContext* filt_vsrc = nullptr;                   
669                 FF(avfilter_graph_create_filter(
670                                 &filt_vsrc,
671                                 avfilter_get_by_name("buffer"), 
672                                 "ffmpeg_consumer_buffer",
673                                 vsrc_options.c_str(), 
674                                 nullptr, 
675                                 video_graph_.get()));
676                                 
677                 AVFilterContext* filt_vsink = nullptr;
678                 FF(avfilter_graph_create_filter(
679                                 &filt_vsink,
680                                 avfilter_get_by_name("buffersink"), 
681                                 "ffmpeg_consumer_buffersink",
682                                 nullptr, 
683                                 nullptr, 
684                                 video_graph_.get()));
685                 
686 #pragma warning (push)
687 #pragma warning (disable : 4245)
688
689                 FF(av_opt_set_int_list(
690                                 filt_vsink, 
691                                 "pix_fmts", 
692                                 codec.pix_fmts, 
693                                 -1,
694                                 AV_OPT_SEARCH_CHILDREN));
695
696 #pragma warning (pop)
697                         
698                 configure_filtergraph(
699                                 *video_graph_, 
700                                 filtergraph,
701                                 *filt_vsrc,
702                                 *filt_vsink);
703
704                 video_graph_in_  = filt_vsrc;
705                 video_graph_out_ = filt_vsink;
706                 
707                 CASPAR_LOG(info)
708                         <<      u16(std::string("\n") 
709                                 + avfilter_graph_dump(
710                                                 video_graph_.get(), 
711                                                 nullptr));
712         }
713
714         void configure_audio_filters(
715                         const AVCodec& codec,
716                         const std::string& filtergraph)
717         {
718                 audio_graph_.reset(
719                         avfilter_graph_alloc(), 
720                         [](AVFilterGraph* p)
721                         {
722                                 avfilter_graph_free(&p);
723                         });
724                 
725                 audio_graph_->nb_threads  = boost::thread::hardware_concurrency()/2;
726                 audio_graph_->thread_type = AVFILTER_THREAD_SLICE;
727                 
728                 const auto asrc_options = (boost::format("sample_rate=%1%:sample_fmt=%2%:channels=%3%:time_base=%4%/%5%:channel_layout=%6%")
729                         % in_video_format_.audio_sample_rate
730                         % av_get_sample_fmt_name(AV_SAMPLE_FMT_S32)
731                         % in_channel_layout_.num_channels
732                         % 1     % in_video_format_.audio_sample_rate
733                         % boost::io::group(
734                                 std::hex, 
735                                 std::showbase, 
736                                 av_get_default_channel_layout(in_channel_layout_.num_channels))).str();
737
738                 AVFilterContext* filt_asrc = nullptr;
739                 FF(avfilter_graph_create_filter(
740                         &filt_asrc,
741                         avfilter_get_by_name("abuffer"), 
742                         "ffmpeg_consumer_abuffer",
743                         asrc_options.c_str(), 
744                         nullptr, 
745                         audio_graph_.get()));
746                                 
747                 AVFilterContext* filt_asink = nullptr;
748                 FF(avfilter_graph_create_filter(
749                         &filt_asink,
750                         avfilter_get_by_name("abuffersink"), 
751                         "ffmpeg_consumer_abuffersink",
752                         nullptr, 
753                         nullptr, 
754                         audio_graph_.get()));
755                 
756 #pragma warning (push)
757 #pragma warning (disable : 4245)
758
759                 FF(av_opt_set_int(
760                         filt_asink,        
761                         "all_channel_counts",
762                         1,      
763                         AV_OPT_SEARCH_CHILDREN));
764
765                 FF(av_opt_set_int_list(
766                         filt_asink, 
767                         "sample_fmts",           
768                         codec.sample_fmts,                              
769                         -1, 
770                         AV_OPT_SEARCH_CHILDREN));
771
772                 FF(av_opt_set_int_list(
773                         filt_asink,
774                         "channel_layouts",       
775                         codec.channel_layouts,                  
776                         -1, 
777                         AV_OPT_SEARCH_CHILDREN));
778
779                 FF(av_opt_set_int_list(
780                         filt_asink, 
781                         "sample_rates" ,         
782                         codec.supported_samplerates,    
783                         -1, 
784                         AV_OPT_SEARCH_CHILDREN));
785
786 #pragma warning (pop)
787                         
788                 configure_filtergraph(
789                         *audio_graph_, 
790                         filtergraph, 
791                         *filt_asrc, 
792                         *filt_asink);
793
794                 audio_graph_in_  = filt_asrc;
795                 audio_graph_out_ = filt_asink;
796
797                 CASPAR_LOG(info) 
798                         <<      u16(std::string("\n") 
799                                 + avfilter_graph_dump(
800                                         audio_graph_.get(), 
801                                         nullptr));
802         }
803
804         void configure_filtergraph(
805                         AVFilterGraph& graph,
806                         const std::string& filtergraph,
807                         AVFilterContext& source_ctx,
808                         AVFilterContext& sink_ctx)
809         {
810                 AVFilterInOut* outputs = nullptr;
811                 AVFilterInOut* inputs = nullptr;
812
813                 try
814                 {
815                         if(!filtergraph.empty())
816                         {
817                                 outputs = avfilter_inout_alloc();
818                                 inputs  = avfilter_inout_alloc();
819
820                                 CASPAR_VERIFY(outputs && inputs);
821
822                                 outputs->name       = av_strdup("in");
823                                 outputs->filter_ctx = &source_ctx;
824                                 outputs->pad_idx    = 0;
825                                 outputs->next       = nullptr;
826
827                                 inputs->name        = av_strdup("out");
828                                 inputs->filter_ctx  = &sink_ctx;
829                                 inputs->pad_idx     = 0;
830                                 inputs->next        = nullptr;
831
832                                 FF(avfilter_graph_parse(
833                                         &graph, 
834                                         filtergraph.c_str(), 
835                                         inputs,
836                                         outputs,
837                                         nullptr));
838                         } 
839                         else 
840                         {
841                                 FF(avfilter_link(
842                                         &source_ctx, 
843                                         0, 
844                                         &sink_ctx, 
845                                         0));
846                         }
847
848                         FF(avfilter_graph_config(
849                                 &graph, 
850                                 nullptr));
851                 }
852                 catch(...)
853                 {
854                         avfilter_inout_free(&outputs);
855                         avfilter_inout_free(&inputs);
856                         throw;
857                 }
858         }
859         
860         void encode_video(core::const_frame frame_ptr, std::shared_ptr<void> token)
861         {               
862                 if(!video_st_)
863                         return;
864
865                 auto enc = video_st_->codec;
866                         
867                 std::shared_ptr<AVFrame> src_av_frame;
868
869                 if(frame_ptr != core::const_frame::empty())
870                 {
871                         src_av_frame.reset(
872                                 av_frame_alloc(),
873                                 [frame_ptr](AVFrame* frame)
874                                 {
875                                         av_frame_free(&frame);
876                                 });
877
878                         avcodec_get_frame_defaults(src_av_frame.get());         
879                         
880                         const auto sample_aspect_ratio = 
881                                 boost::rational<int>(
882                                         in_video_format_.square_width, 
883                                         in_video_format_.square_height) /
884                                 boost::rational<int>(
885                                         in_video_format_.width, 
886                                         in_video_format_.height);
887
888                         src_av_frame->format                              = AV_PIX_FMT_BGRA;
889                         src_av_frame->width                                       = in_video_format_.width;
890                         src_av_frame->height                              = in_video_format_.height;
891                         src_av_frame->sample_aspect_ratio.num = sample_aspect_ratio.numerator();
892                         src_av_frame->sample_aspect_ratio.den = sample_aspect_ratio.denominator();
893                         src_av_frame->pts                                         = video_pts_;
894
895                         video_pts_ += 1;
896
897                         FF(av_image_fill_arrays(
898                                 src_av_frame->data,
899                                 src_av_frame->linesize,
900                                 frame_ptr.image_data().begin(),
901                                 static_cast<AVPixelFormat>(src_av_frame->format), 
902                                 in_video_format_.width, 
903                                 in_video_format_.height, 
904                                 1));
905
906                         FF(av_buffersrc_add_frame(
907                                 video_graph_in_, 
908                                 src_av_frame.get()));
909                 }               
910
911                 int ret = 0;
912
913                 while(ret >= 0)
914                 {
915                         std::shared_ptr<AVFrame> filt_frame(
916                                 av_frame_alloc(), 
917                                 [](AVFrame* p)
918                                 {
919                                         av_frame_free(&p);
920                                 });
921
922                         ret = av_buffersink_get_frame(
923                                 video_graph_out_, 
924                                 filt_frame.get());
925                                                 
926                         video_encoder_executor_.begin_invoke([=]
927                         {
928                                 if(ret == AVERROR_EOF)
929                                 {
930                                         if(enc->codec->capabilities & CODEC_CAP_DELAY)
931                                         {
932                                                 while(encode_av_frame(
933                                                                 *video_st_, 
934                                                                 video_bitstream_filter_.get(),
935                                                                 avcodec_encode_video2, 
936                                                                 nullptr, token))
937                                                 {
938                                                         boost::this_thread::yield(); // TODO:
939                                                 }
940                                         }               
941                                 }
942                                 else if(ret != AVERROR(EAGAIN))
943                                 {
944                                         FF_RET(ret, "av_buffersink_get_frame");
945                                         
946                                         if (filt_frame->interlaced_frame) 
947                                         {
948                                                 if (enc->codec->id == AV_CODEC_ID_MJPEG)
949                                                         enc->field_order = filt_frame->top_field_first ? AV_FIELD_TT : AV_FIELD_BB;
950                                                 else
951                                                         enc->field_order = filt_frame->top_field_first ? AV_FIELD_TB : AV_FIELD_BT;
952                                         } 
953                                         else
954                                                 enc->field_order = AV_FIELD_PROGRESSIVE;
955
956                                         filt_frame->quality = enc->global_quality;
957
958                                         if (!enc->me_threshold)
959                                                 filt_frame->pict_type = AV_PICTURE_TYPE_NONE;
960                         
961                                         encode_av_frame(
962                                                 *video_st_,
963                                                 video_bitstream_filter_.get(),
964                                                 avcodec_encode_video2,
965                                                 filt_frame, 
966                                                 token);
967
968                                         boost::this_thread::yield(); // TODO:
969                                 }
970                         });
971                 }
972         }
973                                         
974         void encode_audio(core::const_frame frame_ptr, std::shared_ptr<void> token)
975         {               
976                 if(!audio_st_)
977                         return;
978                 
979                 auto enc = audio_st_->codec;
980                         
981                 std::shared_ptr<AVFrame> src_av_frame;
982
983                 if(frame_ptr != core::const_frame::empty())
984                 {
985                         src_av_frame.reset(
986                                 av_frame_alloc(), 
987                                 [](AVFrame* p)
988                                 {
989                                         av_frame_free(&p);
990                                 });
991                 
992                         src_av_frame->channels           = in_channel_layout_.num_channels;
993                         src_av_frame->channel_layout = av_get_default_channel_layout(in_channel_layout_.num_channels);
994                         src_av_frame->sample_rate        = in_video_format_.audio_sample_rate;
995                         src_av_frame->nb_samples         = static_cast<int>(frame_ptr.audio_data().size()) / src_av_frame->channels;
996                         src_av_frame->format             = AV_SAMPLE_FMT_S32;
997                         src_av_frame->pts                        = audio_pts_;
998
999                         audio_pts_ += src_av_frame->nb_samples;
1000
1001                         FF(av_samples_fill_arrays(
1002                                         src_av_frame->extended_data,
1003                                         src_av_frame->linesize,
1004                                         reinterpret_cast<const std::uint8_t*>(&*frame_ptr.audio_data().begin()),
1005                                         src_av_frame->channels,
1006                                         src_av_frame->nb_samples,
1007                                         static_cast<AVSampleFormat>(src_av_frame->format),
1008                                         16));
1009                 
1010                         FF(av_buffersrc_add_frame(
1011                                         audio_graph_in_, 
1012                                         src_av_frame.get()));
1013                 }
1014
1015                 int ret = 0;
1016
1017                 while(ret >= 0)
1018                 {
1019                         std::shared_ptr<AVFrame> filt_frame(
1020                                 av_frame_alloc(), 
1021                                 [](AVFrame* p)
1022                                 {
1023                                         av_frame_free(&p);
1024                                 });
1025
1026                         ret = av_buffersink_get_frame(
1027                                 audio_graph_out_, 
1028                                 filt_frame.get());
1029                                         
1030                         audio_encoder_executor_.begin_invoke([=]
1031                         {       
1032                                 if(ret == AVERROR_EOF)
1033                                 {
1034                                         if(enc->codec->capabilities & CODEC_CAP_DELAY)
1035                                         {
1036                                                 while(encode_av_frame(
1037                                                                 *audio_st_, 
1038                                                                 audio_bitstream_filter_.get(), 
1039                                                                 avcodec_encode_audio2, 
1040                                                                 nullptr, 
1041                                                                 token))
1042                                                 {
1043                                                         boost::this_thread::yield(); // TODO:
1044                                                 }
1045                                         }
1046                                 }
1047                                 else if(ret != AVERROR(EAGAIN))
1048                                 {
1049                                         FF_RET(
1050                                                 ret, 
1051                                                 "av_buffersink_get_frame");
1052
1053                                         encode_av_frame(
1054                                                 *audio_st_, 
1055                                                 audio_bitstream_filter_.get(), 
1056                                                 avcodec_encode_audio2, 
1057                                                 filt_frame, 
1058                                                 token);
1059
1060                                         boost::this_thread::yield(); // TODO:
1061                                 }
1062                         });
1063                 }
1064         }
1065         
1066         template<typename F>
1067         bool encode_av_frame(
1068                         AVStream& st,
1069                         AVBitStreamFilterContext* bsfc, 
1070                         const F& func, 
1071                         const std::shared_ptr<AVFrame>& src_av_frame, 
1072                         std::shared_ptr<void> token)
1073         {
1074                 AVPacket pkt = {};
1075                 av_init_packet(&pkt);
1076
1077                 int got_packet = 0;
1078
1079                 FF(func(
1080                         st.codec, 
1081                         &pkt, 
1082                         src_av_frame.get(), 
1083                         &got_packet));
1084                                         
1085                 if(!got_packet || pkt.size <= 0)
1086                         return false;
1087
1088                 pkt.stream_index = st.index;
1089                 
1090                 if(bsfc)
1091                 {
1092                         auto new_pkt = pkt;
1093
1094                         auto a = av_bitstream_filter_filter(
1095                                         bsfc,
1096                                         st.codec,
1097                                         nullptr,
1098                                         &new_pkt.data,
1099                                         &new_pkt.size,
1100                                         pkt.data,
1101                                         pkt.size,
1102                                         pkt.flags & AV_PKT_FLAG_KEY);
1103
1104                         if(a == 0 && new_pkt.data != pkt.data && new_pkt.destruct) 
1105                         {
1106                                 auto t = reinterpret_cast<std::uint8_t*>(av_malloc(new_pkt.size + FF_INPUT_BUFFER_PADDING_SIZE));
1107
1108                                 if(t) 
1109                                 {
1110                                         memcpy(
1111                                                 t, 
1112                                                 new_pkt.data,
1113                                                 new_pkt.size);
1114
1115                                         memset(
1116                                                 t + new_pkt.size, 
1117                                                 0, 
1118                                                 FF_INPUT_BUFFER_PADDING_SIZE);
1119
1120                                         new_pkt.data = t;
1121                                         new_pkt.buf  = nullptr;
1122                                 } 
1123                                 else
1124                                         a = AVERROR(ENOMEM);
1125                         }
1126
1127                         av_free_packet(&pkt);
1128
1129                         FF_RET(
1130                                 a, 
1131                                 "av_bitstream_filter_filter");
1132
1133                         new_pkt.buf =
1134                                 av_buffer_create(
1135                                         new_pkt.data, 
1136                                         new_pkt.size,
1137                                         av_buffer_default_free, 
1138                                         nullptr, 
1139                                         0);
1140
1141                         CASPAR_VERIFY(new_pkt.buf);
1142
1143                         pkt = new_pkt;
1144                 }
1145                 
1146                 if (pkt.pts != AV_NOPTS_VALUE)
1147                 {
1148                         pkt.pts = 
1149                                 av_rescale_q(
1150                                         pkt.pts,
1151                                         st.codec->time_base, 
1152                                         st.time_base);
1153                 }
1154
1155                 if (pkt.dts != AV_NOPTS_VALUE)
1156                 {
1157                         pkt.dts = 
1158                                 av_rescale_q(
1159                                         pkt.dts, 
1160                                         st.codec->time_base, 
1161                                         st.time_base);
1162                 }
1163                                 
1164                 pkt.duration = 
1165                         static_cast<int>(
1166                                 av_rescale_q(
1167                                         pkt.duration, 
1168                                         st.codec->time_base, st.time_base));
1169
1170                 write_packet(
1171                         std::shared_ptr<AVPacket>(
1172                                 new AVPacket(pkt), 
1173                                 [](AVPacket* p)
1174                                 {
1175                                         av_free_packet(p); 
1176                                         delete p;
1177                                 }), token);
1178
1179                 return true;
1180         }
1181
1182         void write_packet(
1183                         const std::shared_ptr<AVPacket>& pkt_ptr,
1184                         std::shared_ptr<void> token)
1185         {               
1186                 write_executor_.begin_invoke([this, pkt_ptr, token]() mutable
1187                 {
1188                         FF(av_interleaved_write_frame(
1189                                 oc_.get(), 
1190                                 pkt_ptr.get()));
1191                 });     
1192         }       
1193         
1194         template<typename T>
1195         static boost::optional<T> try_remove_arg(
1196                         std::map<std::string, std::string>& options, 
1197                         const boost::regex& expr)
1198         {
1199                 for(auto it = options.begin(); it != options.end(); ++it)
1200                 {                       
1201                         if(boost::regex_search(it->first, expr))
1202                         {
1203                                 auto arg = it->second;
1204                                 options.erase(it);
1205                                 return boost::lexical_cast<T>(arg);
1206                         }
1207                 }
1208
1209                 return boost::optional<T>();
1210         }
1211                 
1212         static std::map<std::string, std::string> remove_options(
1213                         std::map<std::string, std::string>& options, 
1214                         const boost::regex& expr)
1215         {
1216                 std::map<std::string, std::string> result;
1217                         
1218                 auto it = options.begin();
1219                 while(it != options.end())
1220                 {                       
1221                         boost::smatch what;
1222                         if(boost::regex_search(it->first, what, expr))
1223                         {
1224                                 result[
1225                                         what.size() > 0 && what[1].matched 
1226                                                 ? what[1].str() 
1227                                                 : it->first] = it->second;
1228                                 it = options.erase(it);
1229                         }
1230                         else
1231                                 ++it;
1232                 }
1233
1234                 return result;
1235         }
1236                 
1237         static void to_dict(AVDictionary** dest, const std::map<std::string, std::string>& c)
1238         {               
1239                 for (const auto& entry : c)
1240                 {
1241                         av_dict_set(
1242                                 dest, 
1243                                 entry.first.c_str(), 
1244                                 entry.second.c_str(), 0);
1245                 }
1246         }
1247
1248         static std::map<std::string, std::string> to_map(AVDictionary* dict)
1249         {
1250                 std::map<std::string, std::string> result;
1251                 
1252                 for(auto t = dict 
1253                                 ? av_dict_get(
1254                                         dict, 
1255                                         "", 
1256                                         nullptr, 
1257                                         AV_DICT_IGNORE_SUFFIX) 
1258                                 : nullptr;
1259                         t; 
1260                         t = av_dict_get(
1261                                 dict, 
1262                                 "", 
1263                                 t,
1264                                 AV_DICT_IGNORE_SUFFIX))
1265                 {
1266                         result[t->key] = t->value;
1267                 }
1268
1269                 return result;
1270         }
1271 };
1272
1273 void describe_streaming_consumer(core::help_sink& sink, const core::help_repository& repo)
1274 {
1275         sink.short_description(L"For streaming the contents of a channel using FFMpeg.");
1276         sink.syntax(L"STREAM [url:string] {-[ffmpeg_param1:string] [value1:string] {-[ffmpeg_param2:string] [value2:string] {...}}}");
1277         sink.para()->text(L"For streaming the contents of a channel using FFMpeg");
1278         sink.definitions()
1279                 ->item(L"url", L"The stream URL to create/stream to.")
1280                 ->item(L"ffmpeg_paramX", L"A parameter supported by FFMpeg. For example vcodec or acodec etc.");
1281         sink.para()->text(L"Examples:");
1282         sink.example(L">> ADD 1 STREAM udp://<client_ip_address>:9250 -format mpegts -vcodec libx264 -crf 25 -tune zerolatency -preset ultrafast");
1283 }
1284
1285 spl::shared_ptr<core::frame_consumer> create_streaming_consumer(
1286                 const std::vector<std::wstring>& params, core::interaction_sink*)
1287 {       
1288         if (params.size() < 1 || (!boost::iequals(params.at(0), L"STREAM") && !boost::iequals(params.at(0), L"FILE")))
1289                 return core::frame_consumer::empty();
1290
1291         auto compatibility_mode = boost::iequals(params.at(0), L"FILE");
1292         auto path = u8(params.size() > 1 ? params.at(1) : L"");
1293         auto args = u8(boost::join(params, L" "));
1294
1295         return spl::make_shared<streaming_consumer>(path, args, compatibility_mode);
1296 }
1297
1298 spl::shared_ptr<core::frame_consumer> create_preconfigured_streaming_consumer(
1299                 const boost::property_tree::wptree& ptree, core::interaction_sink*)
1300 {               
1301         return spl::make_shared<streaming_consumer>(
1302                         u8(ptree.get<std::wstring>(L"path")), 
1303                         u8(ptree.get<std::wstring>(L"args", L"")),
1304                         false);
1305 }
1306
1307 }}