]> git.sesse.net Git - casparcg/blob - modules/ffmpeg/consumer/streaming_consumer.cpp
* Changed usage of BOOST_THROW_EXCEPTION to CASPAR_THROW_EXCEPTION in all places...
[casparcg] / modules / ffmpeg / consumer / streaming_consumer.cpp
1 #include "../StdAfx.h"
2
3 #include "ffmpeg_consumer.h"
4
5 #include "../ffmpeg_error.h"
6
7 #include <common/except.h>
8 #include <common/executor.h>
9 #include <common/assert.h>
10 #include <common/utf.h>
11 #include <common/future.h>
12 #include <common/env.h>
13 #include <common/scope_exit.h>
14
15 #include <core/consumer/frame_consumer.h>
16 #include <core/frame/frame.h>
17 #include <core/video_format.h>
18 #include <core/monitor/monitor.h>
19 #include <core/help/help_repository.h>
20 #include <core/help/help_sink.h>
21
22 #include <boost/noncopyable.hpp>
23 #include <boost/rational.hpp>
24 #include <boost/format.hpp>
25 #include <boost/algorithm/string/predicate.hpp>
26 #include <boost/property_tree/ptree.hpp>
27
28 #pragma warning(push)
29 #pragma warning(disable: 4244)
30 #pragma warning(disable: 4245)
31 #include <boost/crc.hpp>
32 #pragma warning(pop)
33
34 #include <tbb/atomic.h>
35 #include <tbb/concurrent_queue.h>
36 #include <tbb/parallel_invoke.h>
37 #include <tbb/parallel_for.h>
38
39 #include <numeric>
40
41 #pragma warning(push)
42 #pragma warning(disable: 4244)
43
44 extern "C" 
45 {
46         #define __STDC_CONSTANT_MACROS
47         #define __STDC_LIMIT_MACROS
48         #include <libavformat/avformat.h>
49         #include <libavcodec/avcodec.h>
50         #include <libavutil/avutil.h>
51         #include <libavutil/frame.h>
52         #include <libavutil/opt.h>
53         #include <libavutil/imgutils.h>
54         #include <libavutil/parseutils.h>
55         #include <libavfilter/avfilter.h>
56         #include <libavfilter/buffersink.h>
57         #include <libavfilter/buffersrc.h>
58 }
59
60 #pragma warning(pop)
61
62 namespace caspar { namespace ffmpeg {
63
64 int crc16(const std::string& str)
65 {
66         boost::crc_16_type result;
67
68         result.process_bytes(str.data(), str.length());
69
70         return result.checksum();
71 }
72
73 class streaming_consumer final : public core::frame_consumer
74 {
75 public:
76         // Static Members
77                 
78 private:
79         core::monitor::subject                                          subject_;
80         boost::filesystem::path                                         path_;
81         int                                                                                     consumer_index_offset_;
82
83         std::map<std::string, std::string>                      options_;
84                                                                                                 
85         core::video_format_desc                                         in_video_format_;
86
87         std::shared_ptr<AVFormatContext>                        oc_;
88         tbb::atomic<bool>                                                       abort_request_;
89                                                                                                 
90         std::shared_ptr<AVStream>                                       video_st_;
91         std::shared_ptr<AVStream>                                       audio_st_;
92
93         std::int64_t                                                            video_pts_;
94         std::int64_t                                                            audio_pts_;
95                                                                                                                                                                         
96     AVFilterContext*                                                    audio_graph_in_;  
97     AVFilterContext*                                                    audio_graph_out_; 
98     std::shared_ptr<AVFilterGraph>                              audio_graph_;    
99         std::shared_ptr<AVBitStreamFilterContext>       audio_bitstream_filter_;       
100
101     AVFilterContext*                                                    video_graph_in_;  
102     AVFilterContext*                                                    video_graph_out_; 
103     std::shared_ptr<AVFilterGraph>                              video_graph_;  
104         std::shared_ptr<AVBitStreamFilterContext>       video_bitstream_filter_;
105         
106         executor                                                                        executor_;
107
108         executor                                                                        video_encoder_executor_;
109         executor                                                                        audio_encoder_executor_;
110
111         tbb::atomic<int>                                                        tokens_;
112         boost::mutex                                                            tokens_mutex_;
113         boost::condition_variable                                       tokens_cond_;
114         tbb::atomic<int64_t>                                            current_encoding_delay_;
115
116         executor                                                                        write_executor_;
117         
118 public:
119
120         streaming_consumer(
121                         std::string path, 
122                         std::string options)
123                 : path_(path)
124                 , consumer_index_offset_(crc16(path))
125                 , video_pts_(0)
126                 , audio_pts_(0)
127                 , executor_(print())
128                 , audio_encoder_executor_(print() + L" audio_encoder")
129                 , video_encoder_executor_(print() + L" video_encoder")
130                 , write_executor_(print() + L" io")
131         {               
132                 abort_request_ = false;
133                 current_encoding_delay_ = 0;
134
135                 for(auto it = 
136                                 boost::sregex_iterator(
137                                         options.begin(), 
138                                         options.end(), 
139                                         boost::regex("-(?<NAME>[^-\\s]+)(\\s+(?<VALUE>[^\\s]+))?")); 
140                         it != boost::sregex_iterator(); 
141                         ++it)
142                 {                               
143                         options_[(*it)["NAME"].str()] = (*it)["VALUE"].matched ? (*it)["VALUE"].str() : "";
144                 }
145                                                                                 
146         if (options_.find("threads") == options_.end())
147             options_["threads"] = "auto";
148
149                 tokens_ = 
150                         std::max(
151                                 1, 
152                                 try_remove_arg<int>(
153                                         options_, 
154                                         boost::regex("tokens")).get_value_or(2));               
155         }
156                 
157         ~streaming_consumer()
158         {
159                 if(oc_)
160                 {
161                         video_encoder_executor_.begin_invoke([&] { encode_video(core::const_frame::empty(), nullptr); });
162                         audio_encoder_executor_.begin_invoke([&] { encode_audio(core::const_frame::empty(), nullptr); });
163
164                         video_encoder_executor_.stop();
165                         audio_encoder_executor_.stop();
166                         video_encoder_executor_.join();
167                         audio_encoder_executor_.join();
168
169                         video_graph_.reset();
170                         audio_graph_.reset();
171                         video_st_.reset();
172                         audio_st_.reset();
173
174                         write_packet(nullptr, nullptr);
175
176                         write_executor_.stop();
177                         write_executor_.join();
178
179                         FF(av_write_trailer(oc_.get()));
180
181                         if (!(oc_->oformat->flags & AVFMT_NOFILE) && oc_->pb)
182                                 avio_close(oc_->pb);
183
184                         oc_.reset();
185                 }
186         }
187
188         void initialize(
189                         const core::video_format_desc& format_desc,
190                         int channel_index) override
191         {
192                 try
193                 {                               
194                         static boost::regex prot_exp("^.+:.*" );
195                         
196                         const auto overwrite = 
197                                 try_remove_arg<std::string>(
198                                         options_,
199                                         boost::regex("y")) != nullptr;
200
201                         if(!boost::regex_match(
202                                         path_.string(), 
203                                         prot_exp))
204                         {
205                                 if(!path_.is_complete())
206                                 {
207                                         path_ = 
208                                                 u8(
209                                                         env::media_folder()) + 
210                                                         path_.string();
211                                 }
212                         
213                                 if(boost::filesystem::exists(path_))
214                                 {
215                                         if(!overwrite)
216                                                 BOOST_THROW_EXCEPTION(invalid_argument() << msg_info("File exists"));
217                                                 
218                                         boost::filesystem::remove(path_);
219                                 }
220                         }
221                                                         
222                         const auto oformat_name = 
223                                 try_remove_arg<std::string>(
224                                         options_, 
225                                         boost::regex("^f|format$"));
226                         
227                         AVFormatContext* oc;
228
229                         FF(avformat_alloc_output_context2(
230                                 &oc, 
231                                 nullptr, 
232                                 oformat_name && !oformat_name->empty() ? oformat_name->c_str() : nullptr, 
233                                 path_.string().c_str()));
234
235                         oc_.reset(
236                                 oc, 
237                                 avformat_free_context);
238                                         
239                         CASPAR_VERIFY(oc_->oformat);
240
241                         oc_->interrupt_callback.callback = streaming_consumer::interrupt_cb;
242                         oc_->interrupt_callback.opaque   = this;        
243
244                         CASPAR_VERIFY(format_desc.format != core::video_format::invalid);
245
246                         in_video_format_ = format_desc;
247                                                         
248                         CASPAR_VERIFY(oc_->oformat);
249                         
250                         const auto video_codec_name = 
251                                 try_remove_arg<std::string>(
252                                         options_, 
253                                         boost::regex("^c:v|codec:v|vcodec$"));
254
255                         const auto video_codec = 
256                                 video_codec_name 
257                                         ? avcodec_find_encoder_by_name(video_codec_name->c_str())
258                                         : avcodec_find_encoder(oc_->oformat->video_codec);
259                                                 
260                         const auto audio_codec_name = 
261                                 try_remove_arg<std::string>(
262                                         options_, 
263                                          boost::regex("^c:a|codec:a|acodec$"));
264                         
265                         const auto audio_codec = 
266                                 audio_codec_name 
267                                         ? avcodec_find_encoder_by_name(audio_codec_name->c_str())
268                                         : avcodec_find_encoder(oc_->oformat->audio_codec);
269                         
270                         if (!video_codec)
271                                 CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info(
272                                                 "Failed to find video codec " + (video_codec_name
273                                                                 ? *video_codec_name
274                                                                 : "with id " + boost::lexical_cast<std::string>(
275                                                                                 oc_->oformat->video_codec))));
276                         if (!audio_codec)
277                                 CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info(
278                                                 "Failed to find audio codec " + (audio_codec_name
279                                                                 ? *audio_codec_name
280                                                                 : "with id " + boost::lexical_cast<std::string>(
281                                                                                 oc_->oformat->audio_codec))));
282                         
283                         // Filters
284
285                         {
286                                 configure_video_filters(
287                                         *video_codec, 
288                                         try_remove_arg<std::string>(options_, 
289                                         boost::regex("vf|f:v|filter:v")).get_value_or(""));
290
291                                 configure_audio_filters(
292                                         *audio_codec, 
293                                         try_remove_arg<std::string>(options_,
294                                         boost::regex("af|f:a|filter:a")).get_value_or(""));
295                         }
296
297                         // Bistream Filters
298                         {
299                                 configue_audio_bistream_filters(options_);
300                                 configue_video_bistream_filters(options_);
301                         }
302
303                         // Encoders
304
305                         {
306                                 auto video_options = options_;
307                                 auto audio_options = options_;
308
309                                 video_st_ = open_encoder(
310                                         *video_codec, 
311                                         video_options);
312
313                                 audio_st_ = open_encoder(
314                                         *audio_codec, 
315                                         audio_options);
316
317                                 auto it = options_.begin();
318                                 while(it != options_.end())
319                                 {
320                                         if(video_options.find(it->first) == video_options.end() || audio_options.find(it->first) == audio_options.end())
321                                                 it = options_.erase(it);
322                                         else
323                                                 ++it;
324                                 }
325                         }
326
327                         // Output
328                         {
329                                 AVDictionary* av_opts = nullptr;
330
331                                 to_dict(
332                                         &av_opts, 
333                                         std::move(options_));
334
335                                 CASPAR_SCOPE_EXIT
336                                 {
337                                         av_dict_free(&av_opts);
338                                 };
339
340                                 if (!(oc_->oformat->flags & AVFMT_NOFILE)) 
341                                 {
342                                         FF(avio_open2(
343                                                 &oc_->pb, 
344                                                 path_.string().c_str(), 
345                                                 AVIO_FLAG_WRITE, 
346                                                 &oc_->interrupt_callback, 
347                                                 &av_opts));
348                                 }
349                                 
350                                 FF(avformat_write_header(
351                                         oc_.get(), 
352                                         &av_opts));
353                                 
354                                 options_ = to_map(av_opts);
355                         }
356
357                         // Dump Info
358                         
359                         av_dump_format(
360                                 oc_.get(), 
361                                 0, 
362                                 oc_->filename, 
363                                 1);             
364
365                         for (const auto& option : options_)
366                         {
367                                 CASPAR_LOG(warning) 
368                                         << L"Invalid option: -" 
369                                         << u16(option.first) 
370                                         << L" " 
371                                         << u16(option.second);
372                         }
373                 }
374                 catch(...)
375                 {
376                         video_st_.reset();
377                         audio_st_.reset();
378                         oc_.reset();
379                         throw;
380                 }
381         }
382
383         core::monitor::subject& monitor_output() override
384         {
385                 return subject_;
386         }
387
388         std::wstring name() const override
389         {
390                 return L"streaming";
391         }
392
393         std::future<bool> send(core::const_frame frame) override
394         {               
395                 CASPAR_VERIFY(in_video_format_.format != core::video_format::invalid);
396                 
397                 --tokens_;
398                 std::shared_ptr<void> token(
399                         nullptr, 
400                         [this, frame](void*)
401                         {
402                                 ++tokens_;
403                                 tokens_cond_.notify_one();
404                                 current_encoding_delay_ = frame.get_age_millis();
405                         });
406
407                 return executor_.begin_invoke([=]() -> bool
408                 {
409                         boost::unique_lock<boost::mutex> tokens_lock(tokens_mutex_);
410
411                         while(tokens_ < 0)
412                                 tokens_cond_.wait(tokens_lock);
413
414                         video_encoder_executor_.begin_invoke([=]() mutable
415                         {
416                                 encode_video(
417                                         frame, 
418                                         token);
419                         });
420                 
421                         audio_encoder_executor_.begin_invoke([=]() mutable
422                         {
423                                 encode_audio(
424                                         frame, 
425                                         token);
426                         });
427                                 
428                         return true;
429                 });
430         }
431
432         std::wstring print() const override
433         {
434                 return L"streaming_consumer[" + u16(path_.string()) + L"]";
435         }
436         
437         virtual boost::property_tree::wptree info() const override
438         {
439                 boost::property_tree::wptree info;
440                 info.add(L"type", L"stream");
441                 info.add(L"path", path_.wstring());
442                 return info;
443         }
444
445         bool has_synchronization_clock() const override
446         {
447                 return false;
448         }
449
450         int buffer_depth() const override
451         {
452                 return -1;
453         }
454
455         int index() const override
456         {
457                 return 100000 + consumer_index_offset_;
458         }
459
460         int64_t presentation_frame_age_millis() const override
461         {
462                 return current_encoding_delay_;
463         }
464
465 private:
466
467         static int interrupt_cb(void* ctx)
468         {
469                 CASPAR_ASSERT(ctx);
470                 return reinterpret_cast<streaming_consumer*>(ctx)->abort_request_;              
471         }
472                 
473         std::shared_ptr<AVStream> open_encoder(
474                         const AVCodec& codec,
475                         std::map<std::string,
476                         std::string>& options)
477         {                       
478                 auto st = 
479                         avformat_new_stream(
480                                 oc_.get(), 
481                                 &codec);
482
483                 if (!st)                
484                         CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info("Could not allocate video-stream.") << boost::errinfo_api_function("av_new_stream"));
485
486                 auto enc = st->codec;
487                                 
488                 CASPAR_VERIFY(enc);
489                                                 
490                 switch(enc->codec_type)
491                 {
492                         case AVMEDIA_TYPE_VIDEO:
493                         {                               
494                                 enc->time_base                    = video_graph_out_->inputs[0]->time_base;
495                                 enc->pix_fmt                      = static_cast<AVPixelFormat>(video_graph_out_->inputs[0]->format);
496                                 enc->sample_aspect_ratio  = st->sample_aspect_ratio = video_graph_out_->inputs[0]->sample_aspect_ratio;
497                                 enc->width                                = video_graph_out_->inputs[0]->w;
498                                 enc->height                               = video_graph_out_->inputs[0]->h;
499                         
500                                 break;
501                         }
502                         case AVMEDIA_TYPE_AUDIO:
503                         {
504                                 enc->time_base                    = audio_graph_out_->inputs[0]->time_base;
505                                 enc->sample_fmt                   = static_cast<AVSampleFormat>(audio_graph_out_->inputs[0]->format);
506                                 enc->sample_rate                  = audio_graph_out_->inputs[0]->sample_rate;
507                                 enc->channel_layout               = audio_graph_out_->inputs[0]->channel_layout;
508                                 enc->channels                     = audio_graph_out_->inputs[0]->channels;
509                         
510                                 break;
511                         }
512                 }
513                                                                                 
514                 if(oc_->oformat->flags & AVFMT_GLOBALHEADER)
515                         enc->flags |= CODEC_FLAG_GLOBAL_HEADER;
516                 
517                 static const std::array<std::string, 4> char_id_map = {{"v", "a", "d", "s"}};
518
519                 const auto char_id = char_id_map.at(enc->codec_type);
520                                                                 
521                 const auto codec_opts = 
522                         remove_options(
523                                 options, 
524                                 boost::regex("^(" + char_id + "?[^:]+):" + char_id + "$"));
525                 
526                 AVDictionary* av_codec_opts = nullptr;
527
528                 to_dict(
529                         &av_codec_opts, 
530                         options);
531
532                 to_dict(
533                         &av_codec_opts,
534                         codec_opts);
535
536                 options.clear();
537                 
538                 FF(avcodec_open2(
539                         enc,            
540                         &codec, 
541                         av_codec_opts ? &av_codec_opts : nullptr));             
542
543                 if(av_codec_opts)
544                 {
545                         auto t = 
546                                 av_dict_get(
547                                         av_codec_opts, 
548                                         "", 
549                                          nullptr, 
550                                         AV_DICT_IGNORE_SUFFIX);
551
552                         while(t)
553                         {
554                                 options[t->key + (codec_opts.find(t->key) != codec_opts.end() ? ":" + char_id : "")] = t->value;
555
556                                 t = av_dict_get(
557                                                 av_codec_opts, 
558                                                 "", 
559                                                 t, 
560                                                 AV_DICT_IGNORE_SUFFIX);
561                         }
562
563                         av_dict_free(&av_codec_opts);
564                 }
565                                 
566                 if(enc->codec_type == AVMEDIA_TYPE_AUDIO && !(codec.capabilities & CODEC_CAP_VARIABLE_FRAME_SIZE))
567                 {
568                         CASPAR_ASSERT(enc->frame_size > 0);
569                         av_buffersink_set_frame_size(audio_graph_out_, 
570                                                                                  enc->frame_size);
571                 }
572                 
573                 return std::shared_ptr<AVStream>(st, [this](AVStream* st)
574                 {
575                         avcodec_close(st->codec);
576                 });
577         }
578
579         void configue_audio_bistream_filters(
580                         std::map<std::string, std::string>& options)
581         {
582                 const auto audio_bitstream_filter_str = 
583                         try_remove_arg<std::string>(
584                                 options, 
585                                 boost::regex("^bsf:a|absf$"));
586
587                 const auto audio_bitstream_filter = 
588                         audio_bitstream_filter_str 
589                                 ? av_bitstream_filter_init(audio_bitstream_filter_str->c_str()) 
590                                 : nullptr;
591
592                 CASPAR_VERIFY(!audio_bitstream_filter_str || audio_bitstream_filter);
593
594                 if(audio_bitstream_filter)
595                 {
596                         audio_bitstream_filter_.reset(
597                                 audio_bitstream_filter, 
598                                 av_bitstream_filter_close);
599                 }
600                 
601                 if(audio_bitstream_filter_str && !audio_bitstream_filter_)
602                         options["bsf:a"] = *audio_bitstream_filter_str;
603         }
604         
605         void configue_video_bistream_filters(
606                         std::map<std::string, std::string>& options)
607         {
608                 const auto video_bitstream_filter_str = 
609                                 try_remove_arg<std::string>(
610                                         options, 
611                                         boost::regex("^bsf:v|vbsf$"));
612
613                 const auto video_bitstream_filter = 
614                         video_bitstream_filter_str 
615                                 ? av_bitstream_filter_init(video_bitstream_filter_str->c_str()) 
616                                 : nullptr;
617
618                 CASPAR_VERIFY(!video_bitstream_filter_str || video_bitstream_filter);
619
620                 if(video_bitstream_filter)
621                 {
622                         video_bitstream_filter_.reset(
623                                 video_bitstream_filter, 
624                                 av_bitstream_filter_close);
625                 }
626                 
627                 if(video_bitstream_filter_str && !video_bitstream_filter_)
628                         options["bsf:v"] = *video_bitstream_filter_str;
629         }
630         
631         void configure_video_filters(
632                         const AVCodec& codec,
633                         const std::string& filtergraph)
634         {
635                 video_graph_.reset(
636                                 avfilter_graph_alloc(), 
637                                 [](AVFilterGraph* p)
638                                 {
639                                         avfilter_graph_free(&p);
640                                 });
641                 
642                 video_graph_->nb_threads  = boost::thread::hardware_concurrency()/2;
643                 video_graph_->thread_type = AVFILTER_THREAD_SLICE;
644
645                 const auto sample_aspect_ratio =
646                         boost::rational<int>(
647                                         in_video_format_.square_width,
648                                         in_video_format_.square_height) /
649                         boost::rational<int>(
650                                         in_video_format_.width,
651                                         in_video_format_.height);
652                 
653                 const auto vsrc_options = (boost::format("video_size=%1%x%2%:pix_fmt=%3%:time_base=%4%/%5%:pixel_aspect=%6%/%7%:frame_rate=%8%/%9%")
654                         % in_video_format_.width % in_video_format_.height
655                         % AV_PIX_FMT_BGRA
656                         % in_video_format_.duration     % in_video_format_.time_scale
657                         % sample_aspect_ratio.numerator() % sample_aspect_ratio.denominator()
658                         % in_video_format_.time_scale % in_video_format_.duration).str();
659                                         
660                 AVFilterContext* filt_vsrc = nullptr;                   
661                 FF(avfilter_graph_create_filter(
662                                 &filt_vsrc,
663                                 avfilter_get_by_name("buffer"), 
664                                 "ffmpeg_consumer_buffer",
665                                 vsrc_options.c_str(), 
666                                 nullptr, 
667                                 video_graph_.get()));
668                                 
669                 AVFilterContext* filt_vsink = nullptr;
670                 FF(avfilter_graph_create_filter(
671                                 &filt_vsink,
672                                 avfilter_get_by_name("buffersink"), 
673                                 "ffmpeg_consumer_buffersink",
674                                 nullptr, 
675                                 nullptr, 
676                                 video_graph_.get()));
677                 
678 #pragma warning (push)
679 #pragma warning (disable : 4245)
680
681                 FF(av_opt_set_int_list(
682                                 filt_vsink, 
683                                 "pix_fmts", 
684                                 codec.pix_fmts, 
685                                 -1,
686                                 AV_OPT_SEARCH_CHILDREN));
687
688 #pragma warning (pop)
689                         
690                 configure_filtergraph(
691                                 *video_graph_, 
692                                 filtergraph,
693                                 *filt_vsrc,
694                                 *filt_vsink);
695
696                 video_graph_in_  = filt_vsrc;
697                 video_graph_out_ = filt_vsink;
698                 
699                 CASPAR_LOG(info)
700                         <<      u16(std::string("\n") 
701                                 + avfilter_graph_dump(
702                                                 video_graph_.get(), 
703                                                 nullptr));
704         }
705
706         void configure_audio_filters(
707                         const AVCodec& codec,
708                         const std::string& filtergraph)
709         {
710                 audio_graph_.reset(
711                         avfilter_graph_alloc(), 
712                         [](AVFilterGraph* p)
713                         {
714                                 avfilter_graph_free(&p);
715                         });
716                 
717                 audio_graph_->nb_threads  = boost::thread::hardware_concurrency()/2;
718                 audio_graph_->thread_type = AVFILTER_THREAD_SLICE;
719                 
720                 const auto asrc_options = (boost::format("sample_rate=%1%:sample_fmt=%2%:channels=%3%:time_base=%4%/%5%:channel_layout=%6%")
721                         % in_video_format_.audio_sample_rate
722                         % av_get_sample_fmt_name(AV_SAMPLE_FMT_S32)
723                         % in_video_format_.audio_channels
724                         % 1     % in_video_format_.audio_sample_rate
725                         % boost::io::group(
726                                 std::hex, 
727                                 std::showbase, 
728                                 av_get_default_channel_layout(in_video_format_.audio_channels))).str();
729
730                 AVFilterContext* filt_asrc = nullptr;
731                 FF(avfilter_graph_create_filter(
732                         &filt_asrc,
733                         avfilter_get_by_name("abuffer"), 
734                         "ffmpeg_consumer_abuffer",
735                         asrc_options.c_str(), 
736                         nullptr, 
737                         audio_graph_.get()));
738                                 
739                 AVFilterContext* filt_asink = nullptr;
740                 FF(avfilter_graph_create_filter(
741                         &filt_asink,
742                         avfilter_get_by_name("abuffersink"), 
743                         "ffmpeg_consumer_abuffersink",
744                         nullptr, 
745                         nullptr, 
746                         audio_graph_.get()));
747                 
748 #pragma warning (push)
749 #pragma warning (disable : 4245)
750
751                 FF(av_opt_set_int(
752                         filt_asink,        
753                         "all_channel_counts",
754                         1,      
755                         AV_OPT_SEARCH_CHILDREN));
756
757                 FF(av_opt_set_int_list(
758                         filt_asink, 
759                         "sample_fmts",           
760                         codec.sample_fmts,                              
761                         -1, 
762                         AV_OPT_SEARCH_CHILDREN));
763
764                 FF(av_opt_set_int_list(
765                         filt_asink,
766                         "channel_layouts",       
767                         codec.channel_layouts,                  
768                         -1, 
769                         AV_OPT_SEARCH_CHILDREN));
770
771                 FF(av_opt_set_int_list(
772                         filt_asink, 
773                         "sample_rates" ,         
774                         codec.supported_samplerates,    
775                         -1, 
776                         AV_OPT_SEARCH_CHILDREN));
777
778 #pragma warning (pop)
779                         
780                 configure_filtergraph(
781                         *audio_graph_, 
782                         filtergraph, 
783                         *filt_asrc, 
784                         *filt_asink);
785
786                 audio_graph_in_  = filt_asrc;
787                 audio_graph_out_ = filt_asink;
788
789                 CASPAR_LOG(info) 
790                         <<      u16(std::string("\n") 
791                                 + avfilter_graph_dump(
792                                         audio_graph_.get(), 
793                                         nullptr));
794         }
795
796         void configure_filtergraph(
797                         AVFilterGraph& graph,
798                         const std::string& filtergraph,
799                         AVFilterContext& source_ctx,
800                         AVFilterContext& sink_ctx)
801         {
802                 AVFilterInOut* outputs = nullptr;
803                 AVFilterInOut* inputs = nullptr;
804
805                 try
806                 {
807                         if(!filtergraph.empty())
808                         {
809                                 outputs = avfilter_inout_alloc();
810                                 inputs  = avfilter_inout_alloc();
811
812                                 CASPAR_VERIFY(outputs && inputs);
813
814                                 outputs->name       = av_strdup("in");
815                                 outputs->filter_ctx = &source_ctx;
816                                 outputs->pad_idx    = 0;
817                                 outputs->next       = nullptr;
818
819                                 inputs->name        = av_strdup("out");
820                                 inputs->filter_ctx  = &sink_ctx;
821                                 inputs->pad_idx     = 0;
822                                 inputs->next        = nullptr;
823
824                                 FF(avfilter_graph_parse(
825                                         &graph, 
826                                         filtergraph.c_str(), 
827                                         inputs,
828                                         outputs,
829                                         nullptr));
830                         } 
831                         else 
832                         {
833                                 FF(avfilter_link(
834                                         &source_ctx, 
835                                         0, 
836                                         &sink_ctx, 
837                                         0));
838                         }
839
840                         FF(avfilter_graph_config(
841                                 &graph, 
842                                 nullptr));
843                 }
844                 catch(...)
845                 {
846                         avfilter_inout_free(&outputs);
847                         avfilter_inout_free(&inputs);
848                         throw;
849                 }
850         }
851         
852         void encode_video(core::const_frame frame_ptr, std::shared_ptr<void> token)
853         {               
854                 if(!video_st_)
855                         return;
856
857                 auto enc = video_st_->codec;
858                         
859                 std::shared_ptr<AVFrame> src_av_frame;
860
861                 if(frame_ptr != core::const_frame::empty())
862                 {
863                         src_av_frame.reset(
864                                 av_frame_alloc(),
865                                 [frame_ptr](AVFrame* frame)
866                                 {
867                                         av_frame_free(&frame);
868                                 });
869
870                         avcodec_get_frame_defaults(src_av_frame.get());         
871                         
872                         const auto sample_aspect_ratio = 
873                                 boost::rational<int>(
874                                         in_video_format_.square_width, 
875                                         in_video_format_.square_height) /
876                                 boost::rational<int>(
877                                         in_video_format_.width, 
878                                         in_video_format_.height);
879
880                         src_av_frame->format                              = AV_PIX_FMT_BGRA;
881                         src_av_frame->width                                       = in_video_format_.width;
882                         src_av_frame->height                              = in_video_format_.height;
883                         src_av_frame->sample_aspect_ratio.num = sample_aspect_ratio.numerator();
884                         src_av_frame->sample_aspect_ratio.den = sample_aspect_ratio.denominator();
885                         src_av_frame->pts                                         = video_pts_;
886
887                         video_pts_ += 1;
888
889                         FF(av_image_fill_arrays(
890                                 src_av_frame->data,
891                                 src_av_frame->linesize,
892                                 frame_ptr.image_data().begin(),
893                                 static_cast<AVPixelFormat>(src_av_frame->format), 
894                                 in_video_format_.width, 
895                                 in_video_format_.height, 
896                                 1));
897
898                         FF(av_buffersrc_add_frame(
899                                 video_graph_in_, 
900                                 src_av_frame.get()));
901                 }               
902
903                 int ret = 0;
904
905                 while(ret >= 0)
906                 {
907                         std::shared_ptr<AVFrame> filt_frame(
908                                 av_frame_alloc(), 
909                                 [](AVFrame* p)
910                                 {
911                                         av_frame_free(&p);
912                                 });
913
914                         ret = av_buffersink_get_frame(
915                                 video_graph_out_, 
916                                 filt_frame.get());
917                                                 
918                         video_encoder_executor_.begin_invoke([=]
919                         {
920                                 if(ret == AVERROR_EOF)
921                                 {
922                                         if(enc->codec->capabilities & CODEC_CAP_DELAY)
923                                         {
924                                                 while(encode_av_frame(
925                                                                 *video_st_, 
926                                                                 video_bitstream_filter_.get(),
927                                                                 avcodec_encode_video2, 
928                                                                 nullptr, token))
929                                                 {
930                                                         boost::this_thread::yield(); // TODO:
931                                                 }
932                                         }               
933                                 }
934                                 else if(ret != AVERROR(EAGAIN))
935                                 {
936                                         FF_RET(ret, "av_buffersink_get_frame");
937                                         
938                                         if (filt_frame->interlaced_frame) 
939                                         {
940                                                 if (enc->codec->id == AV_CODEC_ID_MJPEG)
941                                                         enc->field_order = filt_frame->top_field_first ? AV_FIELD_TT : AV_FIELD_BB;
942                                                 else
943                                                         enc->field_order = filt_frame->top_field_first ? AV_FIELD_TB : AV_FIELD_BT;
944                                         } 
945                                         else
946                                                 enc->field_order = AV_FIELD_PROGRESSIVE;
947
948                                         filt_frame->quality = enc->global_quality;
949
950                                         if (!enc->me_threshold)
951                                                 filt_frame->pict_type = AV_PICTURE_TYPE_NONE;
952                         
953                                         encode_av_frame(
954                                                 *video_st_,
955                                                 video_bitstream_filter_.get(),
956                                                 avcodec_encode_video2,
957                                                 filt_frame, 
958                                                 token);
959
960                                         boost::this_thread::yield(); // TODO:
961                                 }
962                         });
963                 }
964         }
965                                         
966         void encode_audio(core::const_frame frame_ptr, std::shared_ptr<void> token)
967         {               
968                 if(!audio_st_)
969                         return;
970                 
971                 auto enc = audio_st_->codec;
972                         
973                 std::shared_ptr<AVFrame> src_av_frame;
974
975                 if(frame_ptr != core::const_frame::empty())
976                 {
977                         src_av_frame.reset(
978                                 av_frame_alloc(), 
979                                 [](AVFrame* p)
980                                 {
981                                         av_frame_free(&p);
982                                 });
983                 
984                         src_av_frame->channels           = in_video_format_.audio_channels;
985                         src_av_frame->channel_layout = av_get_default_channel_layout(in_video_format_.audio_channels);
986                         src_av_frame->sample_rate        = in_video_format_.audio_sample_rate;
987                         src_av_frame->nb_samples         = static_cast<int>(frame_ptr.audio_data().size()) / src_av_frame->channels;
988                         src_av_frame->format             = AV_SAMPLE_FMT_S32;
989                         src_av_frame->pts                        = audio_pts_;
990
991                         audio_pts_ += src_av_frame->nb_samples;
992
993                         FF(av_samples_fill_arrays(
994                                         src_av_frame->extended_data,
995                                         src_av_frame->linesize,
996                                         reinterpret_cast<const std::uint8_t*>(&*frame_ptr.audio_data().begin()),
997                                         src_av_frame->channels,
998                                         src_av_frame->nb_samples,
999                                         static_cast<AVSampleFormat>(src_av_frame->format),
1000                                         16));
1001                 
1002                         FF(av_buffersrc_add_frame(
1003                                         audio_graph_in_, 
1004                                         src_av_frame.get()));
1005                 }
1006
1007                 int ret = 0;
1008
1009                 while(ret >= 0)
1010                 {
1011                         std::shared_ptr<AVFrame> filt_frame(
1012                                 av_frame_alloc(), 
1013                                 [](AVFrame* p)
1014                                 {
1015                                         av_frame_free(&p);
1016                                 });
1017
1018                         ret = av_buffersink_get_frame(
1019                                 audio_graph_out_, 
1020                                 filt_frame.get());
1021                                         
1022                         audio_encoder_executor_.begin_invoke([=]
1023                         {       
1024                                 if(ret == AVERROR_EOF)
1025                                 {
1026                                         if(enc->codec->capabilities & CODEC_CAP_DELAY)
1027                                         {
1028                                                 while(encode_av_frame(
1029                                                                 *audio_st_, 
1030                                                                 audio_bitstream_filter_.get(), 
1031                                                                 avcodec_encode_audio2, 
1032                                                                 nullptr, 
1033                                                                 token))
1034                                                 {
1035                                                         boost::this_thread::yield(); // TODO:
1036                                                 }
1037                                         }
1038                                 }
1039                                 else if(ret != AVERROR(EAGAIN))
1040                                 {
1041                                         FF_RET(
1042                                                 ret, 
1043                                                 "av_buffersink_get_frame");
1044
1045                                         encode_av_frame(
1046                                                 *audio_st_, 
1047                                                 audio_bitstream_filter_.get(), 
1048                                                 avcodec_encode_audio2, 
1049                                                 filt_frame, 
1050                                                 token);
1051
1052                                         boost::this_thread::yield(); // TODO:
1053                                 }
1054                         });
1055                 }
1056         }
1057         
1058         template<typename F>
1059         bool encode_av_frame(
1060                         AVStream& st,
1061                         AVBitStreamFilterContext* bsfc, 
1062                         const F& func, 
1063                         const std::shared_ptr<AVFrame>& src_av_frame, 
1064                         std::shared_ptr<void> token)
1065         {
1066                 AVPacket pkt = {};
1067                 av_init_packet(&pkt);
1068
1069                 int got_packet = 0;
1070
1071                 FF(func(
1072                         st.codec, 
1073                         &pkt, 
1074                         src_av_frame.get(), 
1075                         &got_packet));
1076                                         
1077                 if(!got_packet || pkt.size <= 0)
1078                         return false;
1079                                 
1080                 pkt.stream_index = st.index;
1081                 
1082                 if(bsfc)
1083                 {
1084                         auto new_pkt = pkt;
1085
1086                         auto a = av_bitstream_filter_filter(
1087                                         bsfc,
1088                                         st.codec,
1089                                         nullptr,
1090                                         &new_pkt.data,
1091                                         &new_pkt.size,
1092                                         pkt.data,
1093                                         pkt.size,
1094                                         pkt.flags & AV_PKT_FLAG_KEY);
1095
1096                         if(a == 0 && new_pkt.data != pkt.data && new_pkt.destruct) 
1097                         {
1098                                 auto t = reinterpret_cast<std::uint8_t*>(av_malloc(new_pkt.size + FF_INPUT_BUFFER_PADDING_SIZE));
1099
1100                                 if(t) 
1101                                 {
1102                                         memcpy(
1103                                                 t, 
1104                                                 new_pkt.data,
1105                                                 new_pkt.size);
1106
1107                                         memset(
1108                                                 t + new_pkt.size, 
1109                                                 0, 
1110                                                 FF_INPUT_BUFFER_PADDING_SIZE);
1111
1112                                         new_pkt.data = t;
1113                                         new_pkt.buf  = nullptr;
1114                                 } 
1115                                 else
1116                                         a = AVERROR(ENOMEM);
1117                         }
1118
1119                         av_free_packet(&pkt);
1120
1121                         FF_RET(
1122                                 a, 
1123                                 "av_bitstream_filter_filter");
1124
1125                         new_pkt.buf =
1126                                 av_buffer_create(
1127                                         new_pkt.data, 
1128                                         new_pkt.size,
1129                                         av_buffer_default_free, 
1130                                         nullptr, 
1131                                         0);
1132
1133                         CASPAR_VERIFY(new_pkt.buf);
1134
1135                         pkt = new_pkt;
1136                 }
1137                 
1138                 if (pkt.pts != AV_NOPTS_VALUE)
1139                 {
1140                         pkt.pts = 
1141                                 av_rescale_q(
1142                                         pkt.pts,
1143                                         st.codec->time_base, 
1144                                         st.time_base);
1145                 }
1146
1147                 if (pkt.dts != AV_NOPTS_VALUE)
1148                 {
1149                         pkt.dts = 
1150                                 av_rescale_q(
1151                                         pkt.dts, 
1152                                         st.codec->time_base, 
1153                                         st.time_base);
1154                 }
1155                                 
1156                 pkt.duration = 
1157                         static_cast<int>(
1158                                 av_rescale_q(
1159                                         pkt.duration, 
1160                                         st.codec->time_base, st.time_base));
1161
1162                 write_packet(
1163                         std::shared_ptr<AVPacket>(
1164                                 new AVPacket(pkt), 
1165                                 [](AVPacket* p)
1166                                 {
1167                                         av_free_packet(p); 
1168                                         delete p;
1169                                 }), token);
1170
1171                 return true;
1172         }
1173
1174         void write_packet(
1175                         const std::shared_ptr<AVPacket>& pkt_ptr,
1176                         std::shared_ptr<void> token)
1177         {               
1178                 write_executor_.begin_invoke([this, pkt_ptr, token]() mutable
1179                 {
1180                         FF(av_interleaved_write_frame(
1181                                 oc_.get(), 
1182                                 pkt_ptr.get()));
1183                 });     
1184         }       
1185         
1186         template<typename T>
1187         static boost::optional<T> try_remove_arg(
1188                         std::map<std::string, std::string>& options, 
1189                         const boost::regex& expr)
1190         {
1191                 for(auto it = options.begin(); it != options.end(); ++it)
1192                 {                       
1193                         if(boost::regex_search(it->first, expr))
1194                         {
1195                                 auto arg = it->second;
1196                                 options.erase(it);
1197                                 return boost::lexical_cast<T>(arg);
1198                         }
1199                 }
1200
1201                 return boost::optional<T>();
1202         }
1203                 
1204         static std::map<std::string, std::string> remove_options(
1205                         std::map<std::string, std::string>& options, 
1206                         const boost::regex& expr)
1207         {
1208                 std::map<std::string, std::string> result;
1209                         
1210                 auto it = options.begin();
1211                 while(it != options.end())
1212                 {                       
1213                         boost::smatch what;
1214                         if(boost::regex_search(it->first, what, expr))
1215                         {
1216                                 result[
1217                                         what.size() > 0 && what[1].matched 
1218                                                 ? what[1].str() 
1219                                                 : it->first] = it->second;
1220                                 it = options.erase(it);
1221                         }
1222                         else
1223                                 ++it;
1224                 }
1225
1226                 return result;
1227         }
1228                 
1229         static void to_dict(AVDictionary** dest, const std::map<std::string, std::string>& c)
1230         {               
1231                 for (const auto& entry : c)
1232                 {
1233                         av_dict_set(
1234                                 dest, 
1235                                 entry.first.c_str(), 
1236                                 entry.second.c_str(), 0);
1237                 }
1238         }
1239
1240         static std::map<std::string, std::string> to_map(AVDictionary* dict)
1241         {
1242                 std::map<std::string, std::string> result;
1243                 
1244                 for(auto t = dict 
1245                                 ? av_dict_get(
1246                                         dict, 
1247                                         "", 
1248                                         nullptr, 
1249                                         AV_DICT_IGNORE_SUFFIX) 
1250                                 : nullptr;
1251                         t; 
1252                         t = av_dict_get(
1253                                 dict, 
1254                                 "", 
1255                                 t,
1256                                 AV_DICT_IGNORE_SUFFIX))
1257                 {
1258                         result[t->key] = t->value;
1259                 }
1260
1261                 return result;
1262         }
1263 };
1264
1265 void describe_streaming_consumer(core::help_sink& sink, const core::help_repository& repo)
1266 {
1267         sink.short_description(L"For streaming the contents of a channel using FFMpeg.");
1268         sink.syntax(L"STREAM [url:string] {-[ffmpeg_param1:string] [value1:string] {-[ffmpeg_param2:string] [value2:string] {...}}}");
1269         sink.para()->text(L"For streaming the contents of a channel using FFMpeg");
1270         sink.definitions()
1271                 ->item(L"url", L"The stream URL to create/stream to.")
1272                 ->item(L"ffmpeg_paramX", L"A parameter supported by FFMpeg. For example vcodec or acodec etc.");
1273         sink.para()->text(L"Examples:");
1274         sink.example(L">> ADD 1 STREAM udp://<client_ip_address>:9250 -format mpegts -vcodec libx264 -crf 25 -tune zerolatency -preset ultrafast");
1275 }
1276
1277 spl::shared_ptr<core::frame_consumer> create_streaming_consumer(
1278                 const std::vector<std::wstring>& params, core::interaction_sink*)
1279 {       
1280         if (params.size() < 1 || params.at(0) != L"STREAM")
1281                 return core::frame_consumer::empty();
1282
1283         auto path = u8(params.at(1));
1284         auto args = u8(boost::join(params, L" "));
1285
1286         return spl::make_shared<streaming_consumer>(path, args);
1287 }
1288
1289 spl::shared_ptr<core::frame_consumer> create_preconfigured_streaming_consumer(
1290                 const boost::property_tree::wptree& ptree, core::interaction_sink*)
1291 {               
1292         return spl::make_shared<streaming_consumer>(
1293                         u8(ptree.get<std::wstring>(L"path")), 
1294                         u8(ptree.get<std::wstring>(L"args", L"")));
1295 }
1296
1297 }}