]> git.sesse.net Git - casparcg/blob - modules/ffmpeg/consumer/streaming_consumer.cpp
* Code changes required by Visual Studio 2015 including some local disabling of some...
[casparcg] / modules / ffmpeg / consumer / streaming_consumer.cpp
1 #include "../StdAfx.h"
2
3 #include "ffmpeg_consumer.h"
4
5 #include "../ffmpeg_error.h"
6
7 #include <common/except.h>
8 #include <common/executor.h>
9 #include <common/assert.h>
10 #include <common/utf.h>
11 #include <common/future.h>
12 #include <common/env.h>
13 #include <common/scope_exit.h>
14
15 #include <core/consumer/frame_consumer.h>
16 #include <core/frame/frame.h>
17 #include <core/video_format.h>
18 #include <core/monitor/monitor.h>
19 #include <core/help/help_repository.h>
20 #include <core/help/help_sink.h>
21
22 #include <boost/noncopyable.hpp>
23 #include <boost/rational.hpp>
24 #include <boost/format.hpp>
25 #include <boost/algorithm/string/predicate.hpp>
26 #include <boost/property_tree/ptree.hpp>
27
28 #pragma warning(push)
29 #pragma warning(disable: 4244)
30 #pragma warning(disable: 4245)
31 #include <boost/crc.hpp>
32 #pragma warning(pop)
33
34 #include <tbb/atomic.h>
35 #include <tbb/concurrent_queue.h>
36 #include <tbb/parallel_invoke.h>
37 #include <tbb/parallel_for.h>
38
39 #include <numeric>
40
41 #pragma warning(push)
42 #pragma warning(disable: 4244)
43
44 extern "C" 
45 {
46         #define __STDC_CONSTANT_MACROS
47         #define __STDC_LIMIT_MACROS
48         #include <libavformat/avformat.h>
49         #include <libavcodec/avcodec.h>
50         #include <libavutil/avutil.h>
51         #include <libavutil/frame.h>
52         #include <libavutil/opt.h>
53         #include <libavutil/imgutils.h>
54         #include <libavutil/parseutils.h>
55         #include <libavfilter/avfilter.h>
56         #include <libavfilter/buffersink.h>
57         #include <libavfilter/buffersrc.h>
58 }
59
60 #pragma warning(pop)
61
62 namespace caspar { namespace ffmpeg {
63
64 int crc16(const std::string& str)
65 {
66         boost::crc_16_type result;
67
68         result.process_bytes(str.data(), str.length());
69
70         return result.checksum();
71 }
72
73 class streaming_consumer final : public core::frame_consumer
74 {
75 public:
76         // Static Members
77                 
78 private:
79         core::monitor::subject                                          subject_;
80         boost::filesystem::path                                         path_;
81         int                                                                                     consumer_index_offset_;
82
83         std::map<std::string, std::string>                      options_;
84         bool                                                                            compatibility_mode_;
85                                                                                                 
86         core::video_format_desc                                         in_video_format_;
87
88         std::shared_ptr<AVFormatContext>                        oc_;
89         tbb::atomic<bool>                                                       abort_request_;
90                                                                                                 
91         std::shared_ptr<AVStream>                                       video_st_;
92         std::shared_ptr<AVStream>                                       audio_st_;
93
94         std::int64_t                                                            video_pts_;
95         std::int64_t                                                            audio_pts_;
96                                                                                                                                                                         
97     AVFilterContext*                                                    audio_graph_in_;  
98     AVFilterContext*                                                    audio_graph_out_; 
99     std::shared_ptr<AVFilterGraph>                              audio_graph_;    
100         std::shared_ptr<AVBitStreamFilterContext>       audio_bitstream_filter_;       
101
102     AVFilterContext*                                                    video_graph_in_;  
103     AVFilterContext*                                                    video_graph_out_; 
104     std::shared_ptr<AVFilterGraph>                              video_graph_;  
105         std::shared_ptr<AVBitStreamFilterContext>       video_bitstream_filter_;
106         
107         executor                                                                        executor_;
108
109         executor                                                                        video_encoder_executor_;
110         executor                                                                        audio_encoder_executor_;
111
112         tbb::atomic<int>                                                        tokens_;
113         boost::mutex                                                            tokens_mutex_;
114         boost::condition_variable                                       tokens_cond_;
115         tbb::atomic<int64_t>                                            current_encoding_delay_;
116
117         executor                                                                        write_executor_;
118         
119 public:
120
121         streaming_consumer(
122                         std::string path, 
123                         std::string options,
124                         bool compatibility_mode)
125                 : path_(path)
126                 , consumer_index_offset_(crc16(path))
127                 , compatibility_mode_(compatibility_mode)
128                 , video_pts_(0)
129                 , audio_pts_(0)
130                 , executor_(print())
131                 , audio_encoder_executor_(print() + L" audio_encoder")
132                 , video_encoder_executor_(print() + L" video_encoder")
133                 , write_executor_(print() + L" io")
134         {               
135                 abort_request_ = false;
136                 current_encoding_delay_ = 0;
137
138                 for(auto it = 
139                                 boost::sregex_iterator(
140                                         options.begin(), 
141                                         options.end(), 
142                                         boost::regex("-(?<NAME>[^-\\s]+)(\\s+(?<VALUE>[^\\s]+))?")); 
143                         it != boost::sregex_iterator(); 
144                         ++it)
145                 {                               
146                         options_[(*it)["NAME"].str()] = (*it)["VALUE"].matched ? (*it)["VALUE"].str() : "";
147                 }
148                                                                                 
149         if (options_.find("threads") == options_.end())
150             options_["threads"] = "auto";
151
152                 tokens_ = 
153                         std::max(
154                                 1, 
155                                 try_remove_arg<int>(
156                                         options_, 
157                                         boost::regex("tokens")).get_value_or(2));               
158         }
159                 
160         ~streaming_consumer()
161         {
162                 if(oc_)
163                 {
164                         video_encoder_executor_.begin_invoke([&] { encode_video(core::const_frame::empty(), nullptr); });
165                         audio_encoder_executor_.begin_invoke([&] { encode_audio(core::const_frame::empty(), nullptr); });
166
167                         video_encoder_executor_.stop();
168                         audio_encoder_executor_.stop();
169                         video_encoder_executor_.join();
170                         audio_encoder_executor_.join();
171
172                         video_graph_.reset();
173                         audio_graph_.reset();
174                         video_st_.reset();
175                         audio_st_.reset();
176
177                         write_packet(nullptr, nullptr);
178
179                         write_executor_.stop();
180                         write_executor_.join();
181
182                         FF(av_write_trailer(oc_.get()));
183
184                         if (!(oc_->oformat->flags & AVFMT_NOFILE) && oc_->pb)
185                                 avio_close(oc_->pb);
186
187                         oc_.reset();
188                 }
189         }
190
191         void initialize(
192                         const core::video_format_desc& format_desc,
193                         int channel_index) override
194         {
195                 try
196                 {                               
197                         static boost::regex prot_exp("^.+:.*" );
198                         
199                         const auto overwrite = 
200                                 try_remove_arg<std::string>(
201                                         options_,
202                                         boost::regex("y")) != boost::none;
203
204                         if(!boost::regex_match(
205                                         path_.string(), 
206                                         prot_exp))
207                         {
208                                 if(!path_.is_complete())
209                                 {
210                                         path_ = 
211                                                 u8(
212                                                         env::media_folder()) + 
213                                                         path_.string();
214                                 }
215                         
216                                 if(boost::filesystem::exists(path_))
217                                 {
218                                         if(!overwrite && !compatibility_mode_)
219                                                 BOOST_THROW_EXCEPTION(invalid_argument() << msg_info("File exists"));
220                                                 
221                                         boost::filesystem::remove(path_);
222                                 }
223                         }
224                                                         
225                         const auto oformat_name = 
226                                 try_remove_arg<std::string>(
227                                         options_, 
228                                         boost::regex("^f|format$"));
229                         
230                         AVFormatContext* oc;
231
232                         FF(avformat_alloc_output_context2(
233                                 &oc, 
234                                 nullptr, 
235                                 oformat_name && !oformat_name->empty() ? oformat_name->c_str() : nullptr, 
236                                 path_.string().c_str()));
237
238                         oc_.reset(
239                                 oc, 
240                                 avformat_free_context);
241                                         
242                         CASPAR_VERIFY(oc_->oformat);
243
244                         oc_->interrupt_callback.callback = streaming_consumer::interrupt_cb;
245                         oc_->interrupt_callback.opaque   = this;        
246
247                         CASPAR_VERIFY(format_desc.format != core::video_format::invalid);
248
249                         in_video_format_ = format_desc;
250                                                         
251                         CASPAR_VERIFY(oc_->oformat);
252                         
253                         const auto video_codec_name = 
254                                 try_remove_arg<std::string>(
255                                         options_, 
256                                         boost::regex("^c:v|codec:v|vcodec$"));
257
258                         const auto video_codec = 
259                                 video_codec_name 
260                                         ? avcodec_find_encoder_by_name(video_codec_name->c_str())
261                                         : avcodec_find_encoder(oc_->oformat->video_codec);
262                                                 
263                         const auto audio_codec_name = 
264                                 try_remove_arg<std::string>(
265                                         options_, 
266                                          boost::regex("^c:a|codec:a|acodec$"));
267                         
268                         const auto audio_codec = 
269                                 audio_codec_name 
270                                         ? avcodec_find_encoder_by_name(audio_codec_name->c_str())
271                                         : avcodec_find_encoder(oc_->oformat->audio_codec);
272                         
273                         if (!video_codec)
274                                 CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info(
275                                                 "Failed to find video codec " + (video_codec_name
276                                                                 ? *video_codec_name
277                                                                 : "with id " + boost::lexical_cast<std::string>(
278                                                                                 oc_->oformat->video_codec))));
279                         if (!audio_codec)
280                                 CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info(
281                                                 "Failed to find audio codec " + (audio_codec_name
282                                                                 ? *audio_codec_name
283                                                                 : "with id " + boost::lexical_cast<std::string>(
284                                                                                 oc_->oformat->audio_codec))));
285                         
286                         // Filters
287
288                         {
289                                 configure_video_filters(
290                                         *video_codec, 
291                                         try_remove_arg<std::string>(options_, 
292                                         boost::regex("vf|f:v|filter:v")).get_value_or(""));
293
294                                 configure_audio_filters(
295                                         *audio_codec, 
296                                         try_remove_arg<std::string>(options_,
297                                         boost::regex("af|f:a|filter:a")).get_value_or(""));
298                         }
299
300                         // Bistream Filters
301                         {
302                                 configue_audio_bistream_filters(options_);
303                                 configue_video_bistream_filters(options_);
304                         }
305
306                         // Encoders
307
308                         {
309                                 auto video_options = options_;
310                                 auto audio_options = options_;
311
312                                 video_st_ = open_encoder(
313                                         *video_codec, 
314                                         video_options);
315
316                                 audio_st_ = open_encoder(
317                                         *audio_codec, 
318                                         audio_options);
319
320                                 auto it = options_.begin();
321                                 while(it != options_.end())
322                                 {
323                                         if(video_options.find(it->first) == video_options.end() || audio_options.find(it->first) == audio_options.end())
324                                                 it = options_.erase(it);
325                                         else
326                                                 ++it;
327                                 }
328                         }
329
330                         // Output
331                         {
332                                 AVDictionary* av_opts = nullptr;
333
334                                 to_dict(
335                                         &av_opts, 
336                                         std::move(options_));
337
338                                 CASPAR_SCOPE_EXIT
339                                 {
340                                         av_dict_free(&av_opts);
341                                 };
342
343                                 if (!(oc_->oformat->flags & AVFMT_NOFILE)) 
344                                 {
345                                         FF(avio_open2(
346                                                 &oc_->pb, 
347                                                 path_.string().c_str(), 
348                                                 AVIO_FLAG_WRITE, 
349                                                 &oc_->interrupt_callback, 
350                                                 &av_opts));
351                                 }
352                                 
353                                 FF(avformat_write_header(
354                                         oc_.get(), 
355                                         &av_opts));
356                                 
357                                 options_ = to_map(av_opts);
358                         }
359
360                         // Dump Info
361                         
362                         av_dump_format(
363                                 oc_.get(), 
364                                 0, 
365                                 oc_->filename, 
366                                 1);             
367
368                         for (const auto& option : options_)
369                         {
370                                 CASPAR_LOG(warning) 
371                                         << L"Invalid option: -" 
372                                         << u16(option.first) 
373                                         << L" " 
374                                         << u16(option.second);
375                         }
376                 }
377                 catch(...)
378                 {
379                         video_st_.reset();
380                         audio_st_.reset();
381                         oc_.reset();
382                         throw;
383                 }
384         }
385
386         core::monitor::subject& monitor_output() override
387         {
388                 return subject_;
389         }
390
391         std::wstring name() const override
392         {
393                 return L"streaming";
394         }
395
396         std::future<bool> send(core::const_frame frame) override
397         {               
398                 CASPAR_VERIFY(in_video_format_.format != core::video_format::invalid);
399                 
400                 --tokens_;
401                 std::shared_ptr<void> token(
402                         nullptr, 
403                         [this, frame](void*)
404                         {
405                                 ++tokens_;
406                                 tokens_cond_.notify_one();
407                                 current_encoding_delay_ = frame.get_age_millis();
408                         });
409
410                 return executor_.begin_invoke([=]() -> bool
411                 {
412                         boost::unique_lock<boost::mutex> tokens_lock(tokens_mutex_);
413
414                         while(tokens_ < 0)
415                                 tokens_cond_.wait(tokens_lock);
416
417                         video_encoder_executor_.begin_invoke([=]() mutable
418                         {
419                                 encode_video(
420                                         frame, 
421                                         token);
422                         });
423                 
424                         audio_encoder_executor_.begin_invoke([=]() mutable
425                         {
426                                 encode_audio(
427                                         frame, 
428                                         token);
429                         });
430                                 
431                         return true;
432                 });
433         }
434
435         std::wstring print() const override
436         {
437                 return L"streaming_consumer[" + u16(path_.string()) + L"]";
438         }
439         
440         virtual boost::property_tree::wptree info() const override
441         {
442                 boost::property_tree::wptree info;
443                 info.add(L"type", L"stream");
444                 info.add(L"path", path_.wstring());
445                 return info;
446         }
447
448         bool has_synchronization_clock() const override
449         {
450                 return false;
451         }
452
453         int buffer_depth() const override
454         {
455                 return -1;
456         }
457
458         int index() const override
459         {
460                 return compatibility_mode_ ? 200 : 100000 + consumer_index_offset_;
461         }
462
463         int64_t presentation_frame_age_millis() const override
464         {
465                 return current_encoding_delay_;
466         }
467
468 private:
469
470         static int interrupt_cb(void* ctx)
471         {
472                 CASPAR_ASSERT(ctx);
473                 return reinterpret_cast<streaming_consumer*>(ctx)->abort_request_;              
474         }
475                 
476         std::shared_ptr<AVStream> open_encoder(
477                         const AVCodec& codec,
478                         std::map<std::string,
479                         std::string>& options)
480         {                       
481                 auto st = 
482                         avformat_new_stream(
483                                 oc_.get(), 
484                                 &codec);
485
486                 if (!st)                
487                         CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info("Could not allocate video-stream.") << boost::errinfo_api_function("av_new_stream"));
488
489                 auto enc = st->codec;
490                                 
491                 CASPAR_VERIFY(enc);
492                                                 
493                 switch(enc->codec_type)
494                 {
495                         case AVMEDIA_TYPE_VIDEO:
496                         {
497                                 st->time_base                           = video_graph_out_->inputs[0]->time_base;
498                                 enc->pix_fmt                            = static_cast<AVPixelFormat>(video_graph_out_->inputs[0]->format);
499                                 enc->sample_aspect_ratio        = st->sample_aspect_ratio = video_graph_out_->inputs[0]->sample_aspect_ratio;
500                                 enc->width                                      = video_graph_out_->inputs[0]->w;
501                                 enc->height                                     = video_graph_out_->inputs[0]->h;
502                                 enc->bit_rate_tolerance         = 400 * 1000000;
503                         
504                                 break;
505                         }
506                         case AVMEDIA_TYPE_AUDIO:
507                         {
508                                 st->time_base                           = audio_graph_out_->inputs[0]->time_base;
509                                 enc->sample_fmt                         = static_cast<AVSampleFormat>(audio_graph_out_->inputs[0]->format);
510                                 enc->sample_rate                        = audio_graph_out_->inputs[0]->sample_rate;
511                                 enc->channel_layout                     = audio_graph_out_->inputs[0]->channel_layout;
512                                 enc->channels                           = audio_graph_out_->inputs[0]->channels;
513                         
514                                 break;
515                         }
516                 }
517                                                                                 
518                 if(oc_->oformat->flags & AVFMT_GLOBALHEADER)
519                         enc->flags |= CODEC_FLAG_GLOBAL_HEADER;
520                 
521                 static const std::array<std::string, 4> char_id_map = {{"v", "a", "d", "s"}};
522
523                 const auto char_id = char_id_map.at(enc->codec_type);
524                                                                 
525                 const auto codec_opts = 
526                         remove_options(
527                                 options, 
528                                 boost::regex("^(" + char_id + "?[^:]+):" + char_id + "$"));
529                 
530                 AVDictionary* av_codec_opts = nullptr;
531
532                 to_dict(
533                         &av_codec_opts, 
534                         options);
535
536                 to_dict(
537                         &av_codec_opts,
538                         codec_opts);
539
540                 options.clear();
541                 
542                 FF(avcodec_open2(
543                         enc,            
544                         &codec, 
545                         av_codec_opts ? &av_codec_opts : nullptr));             
546
547                 if(av_codec_opts)
548                 {
549                         auto t = 
550                                 av_dict_get(
551                                         av_codec_opts, 
552                                         "", 
553                                          nullptr, 
554                                         AV_DICT_IGNORE_SUFFIX);
555
556                         while(t)
557                         {
558                                 options[t->key + (codec_opts.find(t->key) != codec_opts.end() ? ":" + char_id : "")] = t->value;
559
560                                 t = av_dict_get(
561                                                 av_codec_opts, 
562                                                 "", 
563                                                 t, 
564                                                 AV_DICT_IGNORE_SUFFIX);
565                         }
566
567                         av_dict_free(&av_codec_opts);
568                 }
569                                 
570                 if(enc->codec_type == AVMEDIA_TYPE_AUDIO && !(codec.capabilities & CODEC_CAP_VARIABLE_FRAME_SIZE))
571                 {
572                         CASPAR_ASSERT(enc->frame_size > 0);
573                         av_buffersink_set_frame_size(audio_graph_out_, 
574                                                                                  enc->frame_size);
575                 }
576                 
577                 return std::shared_ptr<AVStream>(st, [this](AVStream* st)
578                 {
579                         avcodec_close(st->codec);
580                 });
581         }
582
583         void configue_audio_bistream_filters(
584                         std::map<std::string, std::string>& options)
585         {
586                 const auto audio_bitstream_filter_str = 
587                         try_remove_arg<std::string>(
588                                 options, 
589                                 boost::regex("^bsf:a|absf$"));
590
591                 const auto audio_bitstream_filter = 
592                         audio_bitstream_filter_str 
593                                 ? av_bitstream_filter_init(audio_bitstream_filter_str->c_str()) 
594                                 : nullptr;
595
596                 CASPAR_VERIFY(!audio_bitstream_filter_str || audio_bitstream_filter);
597
598                 if(audio_bitstream_filter)
599                 {
600                         audio_bitstream_filter_.reset(
601                                 audio_bitstream_filter, 
602                                 av_bitstream_filter_close);
603                 }
604                 
605                 if(audio_bitstream_filter_str && !audio_bitstream_filter_)
606                         options["bsf:a"] = *audio_bitstream_filter_str;
607         }
608         
609         void configue_video_bistream_filters(
610                         std::map<std::string, std::string>& options)
611         {
612                 const auto video_bitstream_filter_str = 
613                                 try_remove_arg<std::string>(
614                                         options, 
615                                         boost::regex("^bsf:v|vbsf$"));
616
617                 const auto video_bitstream_filter = 
618                         video_bitstream_filter_str 
619                                 ? av_bitstream_filter_init(video_bitstream_filter_str->c_str()) 
620                                 : nullptr;
621
622                 CASPAR_VERIFY(!video_bitstream_filter_str || video_bitstream_filter);
623
624                 if(video_bitstream_filter)
625                 {
626                         video_bitstream_filter_.reset(
627                                 video_bitstream_filter, 
628                                 av_bitstream_filter_close);
629                 }
630                 
631                 if(video_bitstream_filter_str && !video_bitstream_filter_)
632                         options["bsf:v"] = *video_bitstream_filter_str;
633         }
634         
635         void configure_video_filters(
636                         const AVCodec& codec,
637                         const std::string& filtergraph)
638         {
639                 video_graph_.reset(
640                                 avfilter_graph_alloc(), 
641                                 [](AVFilterGraph* p)
642                                 {
643                                         avfilter_graph_free(&p);
644                                 });
645                 
646                 video_graph_->nb_threads  = boost::thread::hardware_concurrency()/2;
647                 video_graph_->thread_type = AVFILTER_THREAD_SLICE;
648
649                 const auto sample_aspect_ratio =
650                         boost::rational<int>(
651                                         in_video_format_.square_width,
652                                         in_video_format_.square_height) /
653                         boost::rational<int>(
654                                         in_video_format_.width,
655                                         in_video_format_.height);
656                 
657                 const auto vsrc_options = (boost::format("video_size=%1%x%2%:pix_fmt=%3%:time_base=%4%/%5%:pixel_aspect=%6%/%7%:frame_rate=%8%/%9%")
658                         % in_video_format_.width % in_video_format_.height
659                         % AV_PIX_FMT_BGRA
660                         % in_video_format_.duration     % in_video_format_.time_scale
661                         % sample_aspect_ratio.numerator() % sample_aspect_ratio.denominator()
662                         % in_video_format_.time_scale % in_video_format_.duration).str();
663                                         
664                 AVFilterContext* filt_vsrc = nullptr;                   
665                 FF(avfilter_graph_create_filter(
666                                 &filt_vsrc,
667                                 avfilter_get_by_name("buffer"), 
668                                 "ffmpeg_consumer_buffer",
669                                 vsrc_options.c_str(), 
670                                 nullptr, 
671                                 video_graph_.get()));
672                                 
673                 AVFilterContext* filt_vsink = nullptr;
674                 FF(avfilter_graph_create_filter(
675                                 &filt_vsink,
676                                 avfilter_get_by_name("buffersink"), 
677                                 "ffmpeg_consumer_buffersink",
678                                 nullptr, 
679                                 nullptr, 
680                                 video_graph_.get()));
681                 
682 #pragma warning (push)
683 #pragma warning (disable : 4245)
684
685                 FF(av_opt_set_int_list(
686                                 filt_vsink, 
687                                 "pix_fmts", 
688                                 codec.pix_fmts, 
689                                 -1,
690                                 AV_OPT_SEARCH_CHILDREN));
691
692 #pragma warning (pop)
693                         
694                 configure_filtergraph(
695                                 *video_graph_, 
696                                 filtergraph,
697                                 *filt_vsrc,
698                                 *filt_vsink);
699
700                 video_graph_in_  = filt_vsrc;
701                 video_graph_out_ = filt_vsink;
702                 
703                 CASPAR_LOG(info)
704                         <<      u16(std::string("\n") 
705                                 + avfilter_graph_dump(
706                                                 video_graph_.get(), 
707                                                 nullptr));
708         }
709
710         void configure_audio_filters(
711                         const AVCodec& codec,
712                         const std::string& filtergraph)
713         {
714                 audio_graph_.reset(
715                         avfilter_graph_alloc(), 
716                         [](AVFilterGraph* p)
717                         {
718                                 avfilter_graph_free(&p);
719                         });
720                 
721                 audio_graph_->nb_threads  = boost::thread::hardware_concurrency()/2;
722                 audio_graph_->thread_type = AVFILTER_THREAD_SLICE;
723                 
724                 const auto asrc_options = (boost::format("sample_rate=%1%:sample_fmt=%2%:channels=%3%:time_base=%4%/%5%:channel_layout=%6%")
725                         % in_video_format_.audio_sample_rate
726                         % av_get_sample_fmt_name(AV_SAMPLE_FMT_S32)
727                         % in_video_format_.audio_channels
728                         % 1     % in_video_format_.audio_sample_rate
729                         % boost::io::group(
730                                 std::hex, 
731                                 std::showbase, 
732                                 av_get_default_channel_layout(in_video_format_.audio_channels))).str();
733
734                 AVFilterContext* filt_asrc = nullptr;
735                 FF(avfilter_graph_create_filter(
736                         &filt_asrc,
737                         avfilter_get_by_name("abuffer"), 
738                         "ffmpeg_consumer_abuffer",
739                         asrc_options.c_str(), 
740                         nullptr, 
741                         audio_graph_.get()));
742                                 
743                 AVFilterContext* filt_asink = nullptr;
744                 FF(avfilter_graph_create_filter(
745                         &filt_asink,
746                         avfilter_get_by_name("abuffersink"), 
747                         "ffmpeg_consumer_abuffersink",
748                         nullptr, 
749                         nullptr, 
750                         audio_graph_.get()));
751                 
752 #pragma warning (push)
753 #pragma warning (disable : 4245)
754
755                 FF(av_opt_set_int(
756                         filt_asink,        
757                         "all_channel_counts",
758                         1,      
759                         AV_OPT_SEARCH_CHILDREN));
760
761                 FF(av_opt_set_int_list(
762                         filt_asink, 
763                         "sample_fmts",           
764                         codec.sample_fmts,                              
765                         -1, 
766                         AV_OPT_SEARCH_CHILDREN));
767
768                 FF(av_opt_set_int_list(
769                         filt_asink,
770                         "channel_layouts",       
771                         codec.channel_layouts,                  
772                         -1, 
773                         AV_OPT_SEARCH_CHILDREN));
774
775                 FF(av_opt_set_int_list(
776                         filt_asink, 
777                         "sample_rates" ,         
778                         codec.supported_samplerates,    
779                         -1, 
780                         AV_OPT_SEARCH_CHILDREN));
781
782 #pragma warning (pop)
783                         
784                 configure_filtergraph(
785                         *audio_graph_, 
786                         filtergraph, 
787                         *filt_asrc, 
788                         *filt_asink);
789
790                 audio_graph_in_  = filt_asrc;
791                 audio_graph_out_ = filt_asink;
792
793                 CASPAR_LOG(info) 
794                         <<      u16(std::string("\n") 
795                                 + avfilter_graph_dump(
796                                         audio_graph_.get(), 
797                                         nullptr));
798         }
799
800         void configure_filtergraph(
801                         AVFilterGraph& graph,
802                         const std::string& filtergraph,
803                         AVFilterContext& source_ctx,
804                         AVFilterContext& sink_ctx)
805         {
806                 AVFilterInOut* outputs = nullptr;
807                 AVFilterInOut* inputs = nullptr;
808
809                 try
810                 {
811                         if(!filtergraph.empty())
812                         {
813                                 outputs = avfilter_inout_alloc();
814                                 inputs  = avfilter_inout_alloc();
815
816                                 CASPAR_VERIFY(outputs && inputs);
817
818                                 outputs->name       = av_strdup("in");
819                                 outputs->filter_ctx = &source_ctx;
820                                 outputs->pad_idx    = 0;
821                                 outputs->next       = nullptr;
822
823                                 inputs->name        = av_strdup("out");
824                                 inputs->filter_ctx  = &sink_ctx;
825                                 inputs->pad_idx     = 0;
826                                 inputs->next        = nullptr;
827
828                                 FF(avfilter_graph_parse(
829                                         &graph, 
830                                         filtergraph.c_str(), 
831                                         inputs,
832                                         outputs,
833                                         nullptr));
834                         } 
835                         else 
836                         {
837                                 FF(avfilter_link(
838                                         &source_ctx, 
839                                         0, 
840                                         &sink_ctx, 
841                                         0));
842                         }
843
844                         FF(avfilter_graph_config(
845                                 &graph, 
846                                 nullptr));
847                 }
848                 catch(...)
849                 {
850                         avfilter_inout_free(&outputs);
851                         avfilter_inout_free(&inputs);
852                         throw;
853                 }
854         }
855         
856         void encode_video(core::const_frame frame_ptr, std::shared_ptr<void> token)
857         {               
858                 if(!video_st_)
859                         return;
860
861                 auto enc = video_st_->codec;
862                         
863                 std::shared_ptr<AVFrame> src_av_frame;
864
865                 if(frame_ptr != core::const_frame::empty())
866                 {
867                         src_av_frame.reset(
868                                 av_frame_alloc(),
869                                 [frame_ptr](AVFrame* frame)
870                                 {
871                                         av_frame_free(&frame);
872                                 });
873
874                         avcodec_get_frame_defaults(src_av_frame.get());         
875                         
876                         const auto sample_aspect_ratio = 
877                                 boost::rational<int>(
878                                         in_video_format_.square_width, 
879                                         in_video_format_.square_height) /
880                                 boost::rational<int>(
881                                         in_video_format_.width, 
882                                         in_video_format_.height);
883
884                         src_av_frame->format                              = AV_PIX_FMT_BGRA;
885                         src_av_frame->width                                       = in_video_format_.width;
886                         src_av_frame->height                              = in_video_format_.height;
887                         src_av_frame->sample_aspect_ratio.num = sample_aspect_ratio.numerator();
888                         src_av_frame->sample_aspect_ratio.den = sample_aspect_ratio.denominator();
889                         src_av_frame->pts                                         = video_pts_;
890
891                         video_pts_ += 1;
892
893                         FF(av_image_fill_arrays(
894                                 src_av_frame->data,
895                                 src_av_frame->linesize,
896                                 frame_ptr.image_data().begin(),
897                                 static_cast<AVPixelFormat>(src_av_frame->format), 
898                                 in_video_format_.width, 
899                                 in_video_format_.height, 
900                                 1));
901
902                         FF(av_buffersrc_add_frame(
903                                 video_graph_in_, 
904                                 src_av_frame.get()));
905                 }               
906
907                 int ret = 0;
908
909                 while(ret >= 0)
910                 {
911                         std::shared_ptr<AVFrame> filt_frame(
912                                 av_frame_alloc(), 
913                                 [](AVFrame* p)
914                                 {
915                                         av_frame_free(&p);
916                                 });
917
918                         ret = av_buffersink_get_frame(
919                                 video_graph_out_, 
920                                 filt_frame.get());
921                                                 
922                         video_encoder_executor_.begin_invoke([=]
923                         {
924                                 if(ret == AVERROR_EOF)
925                                 {
926                                         if(enc->codec->capabilities & CODEC_CAP_DELAY)
927                                         {
928                                                 while(encode_av_frame(
929                                                                 *video_st_, 
930                                                                 video_bitstream_filter_.get(),
931                                                                 avcodec_encode_video2, 
932                                                                 nullptr, token))
933                                                 {
934                                                         boost::this_thread::yield(); // TODO:
935                                                 }
936                                         }               
937                                 }
938                                 else if(ret != AVERROR(EAGAIN))
939                                 {
940                                         FF_RET(ret, "av_buffersink_get_frame");
941                                         
942                                         if (filt_frame->interlaced_frame) 
943                                         {
944                                                 if (enc->codec->id == AV_CODEC_ID_MJPEG)
945                                                         enc->field_order = filt_frame->top_field_first ? AV_FIELD_TT : AV_FIELD_BB;
946                                                 else
947                                                         enc->field_order = filt_frame->top_field_first ? AV_FIELD_TB : AV_FIELD_BT;
948                                         } 
949                                         else
950                                                 enc->field_order = AV_FIELD_PROGRESSIVE;
951
952                                         filt_frame->quality = enc->global_quality;
953
954                                         if (!enc->me_threshold)
955                                                 filt_frame->pict_type = AV_PICTURE_TYPE_NONE;
956                         
957                                         encode_av_frame(
958                                                 *video_st_,
959                                                 video_bitstream_filter_.get(),
960                                                 avcodec_encode_video2,
961                                                 filt_frame, 
962                                                 token);
963
964                                         boost::this_thread::yield(); // TODO:
965                                 }
966                         });
967                 }
968         }
969                                         
970         void encode_audio(core::const_frame frame_ptr, std::shared_ptr<void> token)
971         {               
972                 if(!audio_st_)
973                         return;
974                 
975                 auto enc = audio_st_->codec;
976                         
977                 std::shared_ptr<AVFrame> src_av_frame;
978
979                 if(frame_ptr != core::const_frame::empty())
980                 {
981                         src_av_frame.reset(
982                                 av_frame_alloc(), 
983                                 [](AVFrame* p)
984                                 {
985                                         av_frame_free(&p);
986                                 });
987                 
988                         src_av_frame->channels           = in_video_format_.audio_channels;
989                         src_av_frame->channel_layout = av_get_default_channel_layout(in_video_format_.audio_channels);
990                         src_av_frame->sample_rate        = in_video_format_.audio_sample_rate;
991                         src_av_frame->nb_samples         = static_cast<int>(frame_ptr.audio_data().size()) / src_av_frame->channels;
992                         src_av_frame->format             = AV_SAMPLE_FMT_S32;
993                         src_av_frame->pts                        = audio_pts_;
994
995                         audio_pts_ += src_av_frame->nb_samples;
996
997                         FF(av_samples_fill_arrays(
998                                         src_av_frame->extended_data,
999                                         src_av_frame->linesize,
1000                                         reinterpret_cast<const std::uint8_t*>(&*frame_ptr.audio_data().begin()),
1001                                         src_av_frame->channels,
1002                                         src_av_frame->nb_samples,
1003                                         static_cast<AVSampleFormat>(src_av_frame->format),
1004                                         16));
1005                 
1006                         FF(av_buffersrc_add_frame(
1007                                         audio_graph_in_, 
1008                                         src_av_frame.get()));
1009                 }
1010
1011                 int ret = 0;
1012
1013                 while(ret >= 0)
1014                 {
1015                         std::shared_ptr<AVFrame> filt_frame(
1016                                 av_frame_alloc(), 
1017                                 [](AVFrame* p)
1018                                 {
1019                                         av_frame_free(&p);
1020                                 });
1021
1022                         ret = av_buffersink_get_frame(
1023                                 audio_graph_out_, 
1024                                 filt_frame.get());
1025                                         
1026                         audio_encoder_executor_.begin_invoke([=]
1027                         {       
1028                                 if(ret == AVERROR_EOF)
1029                                 {
1030                                         if(enc->codec->capabilities & CODEC_CAP_DELAY)
1031                                         {
1032                                                 while(encode_av_frame(
1033                                                                 *audio_st_, 
1034                                                                 audio_bitstream_filter_.get(), 
1035                                                                 avcodec_encode_audio2, 
1036                                                                 nullptr, 
1037                                                                 token))
1038                                                 {
1039                                                         boost::this_thread::yield(); // TODO:
1040                                                 }
1041                                         }
1042                                 }
1043                                 else if(ret != AVERROR(EAGAIN))
1044                                 {
1045                                         FF_RET(
1046                                                 ret, 
1047                                                 "av_buffersink_get_frame");
1048
1049                                         encode_av_frame(
1050                                                 *audio_st_, 
1051                                                 audio_bitstream_filter_.get(), 
1052                                                 avcodec_encode_audio2, 
1053                                                 filt_frame, 
1054                                                 token);
1055
1056                                         boost::this_thread::yield(); // TODO:
1057                                 }
1058                         });
1059                 }
1060         }
1061         
1062         template<typename F>
1063         bool encode_av_frame(
1064                         AVStream& st,
1065                         AVBitStreamFilterContext* bsfc, 
1066                         const F& func, 
1067                         const std::shared_ptr<AVFrame>& src_av_frame, 
1068                         std::shared_ptr<void> token)
1069         {
1070                 AVPacket pkt = {};
1071                 av_init_packet(&pkt);
1072
1073                 int got_packet = 0;
1074
1075                 FF(func(
1076                         st.codec, 
1077                         &pkt, 
1078                         src_av_frame.get(), 
1079                         &got_packet));
1080                                         
1081                 if(!got_packet || pkt.size <= 0)
1082                         return false;
1083                                 
1084                 pkt.stream_index = st.index;
1085                 
1086                 if(bsfc)
1087                 {
1088                         auto new_pkt = pkt;
1089
1090                         auto a = av_bitstream_filter_filter(
1091                                         bsfc,
1092                                         st.codec,
1093                                         nullptr,
1094                                         &new_pkt.data,
1095                                         &new_pkt.size,
1096                                         pkt.data,
1097                                         pkt.size,
1098                                         pkt.flags & AV_PKT_FLAG_KEY);
1099
1100                         if(a == 0 && new_pkt.data != pkt.data && new_pkt.destruct) 
1101                         {
1102                                 auto t = reinterpret_cast<std::uint8_t*>(av_malloc(new_pkt.size + FF_INPUT_BUFFER_PADDING_SIZE));
1103
1104                                 if(t) 
1105                                 {
1106                                         memcpy(
1107                                                 t, 
1108                                                 new_pkt.data,
1109                                                 new_pkt.size);
1110
1111                                         memset(
1112                                                 t + new_pkt.size, 
1113                                                 0, 
1114                                                 FF_INPUT_BUFFER_PADDING_SIZE);
1115
1116                                         new_pkt.data = t;
1117                                         new_pkt.buf  = nullptr;
1118                                 } 
1119                                 else
1120                                         a = AVERROR(ENOMEM);
1121                         }
1122
1123                         av_free_packet(&pkt);
1124
1125                         FF_RET(
1126                                 a, 
1127                                 "av_bitstream_filter_filter");
1128
1129                         new_pkt.buf =
1130                                 av_buffer_create(
1131                                         new_pkt.data, 
1132                                         new_pkt.size,
1133                                         av_buffer_default_free, 
1134                                         nullptr, 
1135                                         0);
1136
1137                         CASPAR_VERIFY(new_pkt.buf);
1138
1139                         pkt = new_pkt;
1140                 }
1141                 
1142                 if (pkt.pts != AV_NOPTS_VALUE)
1143                 {
1144                         pkt.pts = 
1145                                 av_rescale_q(
1146                                         pkt.pts,
1147                                         st.codec->time_base, 
1148                                         st.time_base);
1149                 }
1150
1151                 if (pkt.dts != AV_NOPTS_VALUE)
1152                 {
1153                         pkt.dts = 
1154                                 av_rescale_q(
1155                                         pkt.dts, 
1156                                         st.codec->time_base, 
1157                                         st.time_base);
1158                 }
1159                                 
1160                 pkt.duration = 
1161                         static_cast<int>(
1162                                 av_rescale_q(
1163                                         pkt.duration, 
1164                                         st.codec->time_base, st.time_base));
1165
1166                 write_packet(
1167                         std::shared_ptr<AVPacket>(
1168                                 new AVPacket(pkt), 
1169                                 [](AVPacket* p)
1170                                 {
1171                                         av_free_packet(p); 
1172                                         delete p;
1173                                 }), token);
1174
1175                 return true;
1176         }
1177
1178         void write_packet(
1179                         const std::shared_ptr<AVPacket>& pkt_ptr,
1180                         std::shared_ptr<void> token)
1181         {               
1182                 write_executor_.begin_invoke([this, pkt_ptr, token]() mutable
1183                 {
1184                         FF(av_interleaved_write_frame(
1185                                 oc_.get(), 
1186                                 pkt_ptr.get()));
1187                 });     
1188         }       
1189         
1190         template<typename T>
1191         static boost::optional<T> try_remove_arg(
1192                         std::map<std::string, std::string>& options, 
1193                         const boost::regex& expr)
1194         {
1195                 for(auto it = options.begin(); it != options.end(); ++it)
1196                 {                       
1197                         if(boost::regex_search(it->first, expr))
1198                         {
1199                                 auto arg = it->second;
1200                                 options.erase(it);
1201                                 return boost::lexical_cast<T>(arg);
1202                         }
1203                 }
1204
1205                 return boost::optional<T>();
1206         }
1207                 
1208         static std::map<std::string, std::string> remove_options(
1209                         std::map<std::string, std::string>& options, 
1210                         const boost::regex& expr)
1211         {
1212                 std::map<std::string, std::string> result;
1213                         
1214                 auto it = options.begin();
1215                 while(it != options.end())
1216                 {                       
1217                         boost::smatch what;
1218                         if(boost::regex_search(it->first, what, expr))
1219                         {
1220                                 result[
1221                                         what.size() > 0 && what[1].matched 
1222                                                 ? what[1].str() 
1223                                                 : it->first] = it->second;
1224                                 it = options.erase(it);
1225                         }
1226                         else
1227                                 ++it;
1228                 }
1229
1230                 return result;
1231         }
1232                 
1233         static void to_dict(AVDictionary** dest, const std::map<std::string, std::string>& c)
1234         {               
1235                 for (const auto& entry : c)
1236                 {
1237                         av_dict_set(
1238                                 dest, 
1239                                 entry.first.c_str(), 
1240                                 entry.second.c_str(), 0);
1241                 }
1242         }
1243
1244         static std::map<std::string, std::string> to_map(AVDictionary* dict)
1245         {
1246                 std::map<std::string, std::string> result;
1247                 
1248                 for(auto t = dict 
1249                                 ? av_dict_get(
1250                                         dict, 
1251                                         "", 
1252                                         nullptr, 
1253                                         AV_DICT_IGNORE_SUFFIX) 
1254                                 : nullptr;
1255                         t; 
1256                         t = av_dict_get(
1257                                 dict, 
1258                                 "", 
1259                                 t,
1260                                 AV_DICT_IGNORE_SUFFIX))
1261                 {
1262                         result[t->key] = t->value;
1263                 }
1264
1265                 return result;
1266         }
1267 };
1268
1269 void describe_streaming_consumer(core::help_sink& sink, const core::help_repository& repo)
1270 {
1271         sink.short_description(L"For streaming the contents of a channel using FFMpeg.");
1272         sink.syntax(L"STREAM [url:string] {-[ffmpeg_param1:string] [value1:string] {-[ffmpeg_param2:string] [value2:string] {...}}}");
1273         sink.para()->text(L"For streaming the contents of a channel using FFMpeg");
1274         sink.definitions()
1275                 ->item(L"url", L"The stream URL to create/stream to.")
1276                 ->item(L"ffmpeg_paramX", L"A parameter supported by FFMpeg. For example vcodec or acodec etc.");
1277         sink.para()->text(L"Examples:");
1278         sink.example(L">> ADD 1 STREAM udp://<client_ip_address>:9250 -format mpegts -vcodec libx264 -crf 25 -tune zerolatency -preset ultrafast");
1279 }
1280
1281 spl::shared_ptr<core::frame_consumer> create_streaming_consumer(
1282                 const std::vector<std::wstring>& params, core::interaction_sink*)
1283 {       
1284         if (params.size() < 1 || (!boost::iequals(params.at(0), L"STREAM") && !boost::iequals(params.at(0), L"FILE")))
1285                 return core::frame_consumer::empty();
1286
1287         auto compatibility_mode = boost::iequals(params.at(0), L"FILE");
1288         auto path = u8(params.size() > 1 ? params.at(1) : L"");
1289         auto args = u8(boost::join(params, L" "));
1290
1291         return spl::make_shared<streaming_consumer>(path, args, compatibility_mode);
1292 }
1293
1294 spl::shared_ptr<core::frame_consumer> create_preconfigured_streaming_consumer(
1295                 const boost::property_tree::wptree& ptree, core::interaction_sink*)
1296 {               
1297         return spl::make_shared<streaming_consumer>(
1298                         u8(ptree.get<std::wstring>(L"path")), 
1299                         u8(ptree.get<std::wstring>(L"args", L"")),
1300                         false);
1301 }
1302
1303 }}