]> git.sesse.net Git - casparcg/blob - modules/ffmpeg/consumer/streaming_consumer.cpp
39904a05f9aad28b5c53bbbfe35102682e1216fe
[casparcg] / modules / ffmpeg / consumer / streaming_consumer.cpp
1 #include "../StdAfx.h"
2
3 #include "ffmpeg_consumer.h"
4
5 #include "../ffmpeg_error.h"
6
7 #include <common/except.h>
8 #include <common/executor.h>
9 #include <common/assert.h>
10 #include <common/utf.h>
11 #include <common/future.h>
12 #include <common/env.h>
13 #include <common/scope_exit.h>
14 #include <common/ptree.h>
15
16 #include <core/consumer/frame_consumer.h>
17 #include <core/frame/frame.h>
18 #include <core/frame/audio_channel_layout.h>
19 #include <core/video_format.h>
20 #include <core/monitor/monitor.h>
21 #include <core/help/help_repository.h>
22 #include <core/help/help_sink.h>
23
24 #include <boost/noncopyable.hpp>
25 #include <boost/rational.hpp>
26 #include <boost/format.hpp>
27 #include <boost/algorithm/string/predicate.hpp>
28 #include <boost/property_tree/ptree.hpp>
29
30 #pragma warning(push)
31 #pragma warning(disable: 4244)
32 #pragma warning(disable: 4245)
33 #include <boost/crc.hpp>
34 #pragma warning(pop)
35
36 #include <tbb/atomic.h>
37 #include <tbb/concurrent_queue.h>
38 #include <tbb/parallel_invoke.h>
39 #include <tbb/parallel_for.h>
40
41 #include <numeric>
42
43 #pragma warning(push)
44 #pragma warning(disable: 4244)
45
46 extern "C" 
47 {
48         #define __STDC_CONSTANT_MACROS
49         #define __STDC_LIMIT_MACROS
50         #include <libavformat/avformat.h>
51         #include <libavcodec/avcodec.h>
52         #include <libavutil/avutil.h>
53         #include <libavutil/frame.h>
54         #include <libavutil/opt.h>
55         #include <libavutil/imgutils.h>
56         #include <libavutil/parseutils.h>
57         #include <libavfilter/avfilter.h>
58         #include <libavfilter/buffersink.h>
59         #include <libavfilter/buffersrc.h>
60 }
61
62 #pragma warning(pop)
63
64 namespace caspar { namespace ffmpeg {
65
66 int crc16(const std::string& str)
67 {
68         boost::crc_16_type result;
69
70         result.process_bytes(str.data(), str.length());
71
72         return result.checksum();
73 }
74
75 class streaming_consumer final : public core::frame_consumer
76 {
77 public:
78         // Static Members
79                 
80 private:
81         core::monitor::subject                                          subject_;
82         boost::filesystem::path                                         path_;
83         int                                                                                     consumer_index_offset_;
84
85         std::map<std::string, std::string>                      options_;
86         bool                                                                            compatibility_mode_;
87                                                                                                 
88         core::video_format_desc                                         in_video_format_;
89         core::audio_channel_layout                                      in_channel_layout_                      = core::audio_channel_layout::invalid();
90
91         std::shared_ptr<AVFormatContext>                        oc_;
92         tbb::atomic<bool>                                                       abort_request_;
93                                                                                                 
94         std::shared_ptr<AVStream>                                       video_st_;
95         std::shared_ptr<AVStream>                                       audio_st_;
96
97         std::int64_t                                                            video_pts_;
98         std::int64_t                                                            audio_pts_;
99                                                                                                                                                                         
100     AVFilterContext*                                                    audio_graph_in_;  
101     AVFilterContext*                                                    audio_graph_out_; 
102     std::shared_ptr<AVFilterGraph>                              audio_graph_;    
103         std::shared_ptr<AVBitStreamFilterContext>       audio_bitstream_filter_;       
104
105     AVFilterContext*                                                    video_graph_in_;  
106     AVFilterContext*                                                    video_graph_out_; 
107     std::shared_ptr<AVFilterGraph>                              video_graph_;  
108         std::shared_ptr<AVBitStreamFilterContext>       video_bitstream_filter_;
109         
110         executor                                                                        executor_;
111
112         executor                                                                        video_encoder_executor_;
113         executor                                                                        audio_encoder_executor_;
114
115         tbb::atomic<int>                                                        tokens_;
116         boost::mutex                                                            tokens_mutex_;
117         boost::condition_variable                                       tokens_cond_;
118         tbb::atomic<int64_t>                                            current_encoding_delay_;
119
120         executor                                                                        write_executor_;
121         
122 public:
123
124         streaming_consumer(
125                         std::string path, 
126                         std::string options,
127                         bool compatibility_mode)
128                 : path_(path)
129                 , consumer_index_offset_(crc16(path))
130                 , compatibility_mode_(compatibility_mode)
131                 , video_pts_(0)
132                 , audio_pts_(0)
133                 , executor_(print())
134                 , audio_encoder_executor_(print() + L" audio_encoder")
135                 , video_encoder_executor_(print() + L" video_encoder")
136                 , write_executor_(print() + L" io")
137         {               
138                 abort_request_ = false;
139                 current_encoding_delay_ = 0;
140
141                 for(auto it = 
142                                 boost::sregex_iterator(
143                                         options.begin(), 
144                                         options.end(), 
145                                         boost::regex("-(?<NAME>[^-\\s]+)(\\s+(?<VALUE>[^\\s]+))?")); 
146                         it != boost::sregex_iterator(); 
147                         ++it)
148                 {                               
149                         options_[(*it)["NAME"].str()] = (*it)["VALUE"].matched ? (*it)["VALUE"].str() : "";
150                 }
151                                                                                 
152         if (options_.find("threads") == options_.end())
153             options_["threads"] = "auto";
154
155                 tokens_ = 
156                         std::max(
157                                 1, 
158                                 try_remove_arg<int>(
159                                         options_, 
160                                         boost::regex("tokens")).get_value_or(2));               
161         }
162                 
163         ~streaming_consumer()
164         {
165                 if(oc_)
166                 {
167                         video_encoder_executor_.begin_invoke([&] { encode_video(core::const_frame::empty(), nullptr); });
168                         audio_encoder_executor_.begin_invoke([&] { encode_audio(core::const_frame::empty(), nullptr); });
169
170                         video_encoder_executor_.stop();
171                         audio_encoder_executor_.stop();
172                         video_encoder_executor_.join();
173                         audio_encoder_executor_.join();
174
175                         video_graph_.reset();
176                         audio_graph_.reset();
177                         video_st_.reset();
178                         audio_st_.reset();
179
180                         write_packet(nullptr, nullptr);
181
182                         write_executor_.stop();
183                         write_executor_.join();
184
185                         FF(av_write_trailer(oc_.get()));
186
187                         if (!(oc_->oformat->flags & AVFMT_NOFILE) && oc_->pb)
188                                 avio_close(oc_->pb);
189
190                         oc_.reset();
191                 }
192         }
193
194         void initialize(
195                         const core::video_format_desc& format_desc,
196                         const core::audio_channel_layout& channel_layout,
197                         int channel_index) override
198         {
199                 try
200                 {                               
201                         static boost::regex prot_exp("^.+:.*" );
202                         
203                         const auto overwrite = 
204                                 try_remove_arg<std::string>(
205                                         options_,
206                                         boost::regex("y")) != boost::none;
207
208                         if(!boost::regex_match(
209                                         path_.string(), 
210                                         prot_exp))
211                         {
212                                 if(!path_.is_complete())
213                                 {
214                                         path_ = 
215                                                 u8(
216                                                         env::media_folder()) + 
217                                                         path_.string();
218                                 }
219                         
220                                 if(boost::filesystem::exists(path_))
221                                 {
222                                         if(!overwrite && !compatibility_mode_)
223                                                 BOOST_THROW_EXCEPTION(invalid_argument() << msg_info("File exists"));
224                                                 
225                                         boost::filesystem::remove(path_);
226                                 }
227                         }
228                                                         
229                         const auto oformat_name = 
230                                 try_remove_arg<std::string>(
231                                         options_, 
232                                         boost::regex("^f|format$"));
233                         
234                         AVFormatContext* oc;
235
236                         FF(avformat_alloc_output_context2(
237                                 &oc, 
238                                 nullptr, 
239                                 oformat_name && !oformat_name->empty() ? oformat_name->c_str() : nullptr, 
240                                 path_.string().c_str()));
241
242                         oc_.reset(
243                                 oc, 
244                                 avformat_free_context);
245                                         
246                         CASPAR_VERIFY(oc_->oformat);
247
248                         oc_->interrupt_callback.callback = streaming_consumer::interrupt_cb;
249                         oc_->interrupt_callback.opaque   = this;        
250
251                         CASPAR_VERIFY(format_desc.format != core::video_format::invalid);
252
253                         in_video_format_ = format_desc;
254                         in_channel_layout_ = channel_layout;
255                                                         
256                         CASPAR_VERIFY(oc_->oformat);
257                         
258                         const auto video_codec_name = 
259                                 try_remove_arg<std::string>(
260                                         options_, 
261                                         boost::regex("^c:v|codec:v|vcodec$"));
262
263                         const auto video_codec = 
264                                 video_codec_name 
265                                         ? avcodec_find_encoder_by_name(video_codec_name->c_str())
266                                         : avcodec_find_encoder(oc_->oformat->video_codec);
267                                                 
268                         const auto audio_codec_name = 
269                                 try_remove_arg<std::string>(
270                                         options_, 
271                                          boost::regex("^c:a|codec:a|acodec$"));
272                         
273                         const auto audio_codec = 
274                                 audio_codec_name 
275                                         ? avcodec_find_encoder_by_name(audio_codec_name->c_str())
276                                         : avcodec_find_encoder(oc_->oformat->audio_codec);
277                         
278                         if (!video_codec)
279                                 CASPAR_THROW_EXCEPTION(user_error() << msg_info(
280                                                 "Failed to find video codec " + (video_codec_name
281                                                                 ? *video_codec_name
282                                                                 : "with id " + boost::lexical_cast<std::string>(
283                                                                                 oc_->oformat->video_codec))));
284                         if (!audio_codec)
285                                 CASPAR_THROW_EXCEPTION(user_error() << msg_info(
286                                                 "Failed to find audio codec " + (audio_codec_name
287                                                                 ? *audio_codec_name
288                                                                 : "with id " + boost::lexical_cast<std::string>(
289                                                                                 oc_->oformat->audio_codec))));
290                         
291                         // Filters
292
293                         {
294                                 configure_video_filters(
295                                         *video_codec, 
296                                         try_remove_arg<std::string>(options_, 
297                                         boost::regex("vf|f:v|filter:v")).get_value_or(""));
298
299                                 configure_audio_filters(
300                                         *audio_codec, 
301                                         try_remove_arg<std::string>(options_,
302                                         boost::regex("af|f:a|filter:a")).get_value_or(""));
303                         }
304
305                         // Bistream Filters
306                         {
307                                 configue_audio_bistream_filters(options_);
308                                 configue_video_bistream_filters(options_);
309                         }
310
311                         // Encoders
312
313                         {
314                                 auto video_options = options_;
315                                 auto audio_options = options_;
316
317                                 video_st_ = open_encoder(
318                                         *video_codec, 
319                                         video_options);
320
321                                 audio_st_ = open_encoder(
322                                         *audio_codec, 
323                                         audio_options);
324
325                                 auto it = options_.begin();
326                                 while(it != options_.end())
327                                 {
328                                         if(video_options.find(it->first) == video_options.end() || audio_options.find(it->first) == audio_options.end())
329                                                 it = options_.erase(it);
330                                         else
331                                                 ++it;
332                                 }
333                         }
334
335                         // Output
336                         {
337                                 AVDictionary* av_opts = nullptr;
338
339                                 to_dict(
340                                         &av_opts, 
341                                         std::move(options_));
342
343                                 CASPAR_SCOPE_EXIT
344                                 {
345                                         av_dict_free(&av_opts);
346                                 };
347
348                                 if (!(oc_->oformat->flags & AVFMT_NOFILE)) 
349                                 {
350                                         FF(avio_open2(
351                                                 &oc_->pb, 
352                                                 path_.string().c_str(), 
353                                                 AVIO_FLAG_WRITE, 
354                                                 &oc_->interrupt_callback, 
355                                                 &av_opts));
356                                 }
357                                 
358                                 FF(avformat_write_header(
359                                         oc_.get(), 
360                                         &av_opts));
361                                 
362                                 options_ = to_map(av_opts);
363                         }
364
365                         // Dump Info
366                         
367                         av_dump_format(
368                                 oc_.get(), 
369                                 0, 
370                                 oc_->filename, 
371                                 1);             
372
373                         for (const auto& option : options_)
374                         {
375                                 CASPAR_LOG(warning) 
376                                         << L"Invalid option: -" 
377                                         << u16(option.first) 
378                                         << L" " 
379                                         << u16(option.second);
380                         }
381                 }
382                 catch(...)
383                 {
384                         video_st_.reset();
385                         audio_st_.reset();
386                         oc_.reset();
387                         throw;
388                 }
389         }
390
391         core::monitor::subject& monitor_output() override
392         {
393                 return subject_;
394         }
395
396         std::wstring name() const override
397         {
398                 return L"streaming";
399         }
400
401         std::future<bool> send(core::const_frame frame) override
402         {               
403                 CASPAR_VERIFY(in_video_format_.format != core::video_format::invalid);
404                 
405                 --tokens_;
406                 std::shared_ptr<void> token(
407                         nullptr, 
408                         [this, frame](void*)
409                         {
410                                 ++tokens_;
411                                 tokens_cond_.notify_one();
412                                 current_encoding_delay_ = frame.get_age_millis();
413                         });
414
415                 return executor_.begin_invoke([=]() -> bool
416                 {
417                         boost::unique_lock<boost::mutex> tokens_lock(tokens_mutex_);
418
419                         while(tokens_ < 0)
420                                 tokens_cond_.wait(tokens_lock);
421
422                         video_encoder_executor_.begin_invoke([=]() mutable
423                         {
424                                 encode_video(
425                                         frame, 
426                                         token);
427                         });
428                 
429                         audio_encoder_executor_.begin_invoke([=]() mutable
430                         {
431                                 encode_audio(
432                                         frame, 
433                                         token);
434                         });
435                                 
436                         return true;
437                 });
438         }
439
440         std::wstring print() const override
441         {
442                 return L"streaming_consumer[" + u16(path_.string()) + L"]";
443         }
444         
445         virtual boost::property_tree::wptree info() const override
446         {
447                 boost::property_tree::wptree info;
448                 info.add(L"type", L"stream");
449                 info.add(L"path", path_.wstring());
450                 return info;
451         }
452
453         bool has_synchronization_clock() const override
454         {
455                 return false;
456         }
457
458         int buffer_depth() const override
459         {
460                 return -1;
461         }
462
463         int index() const override
464         {
465                 return compatibility_mode_ ? 200 : 100000 + consumer_index_offset_;
466         }
467
468         int64_t presentation_frame_age_millis() const override
469         {
470                 return current_encoding_delay_;
471         }
472
473 private:
474
475         static int interrupt_cb(void* ctx)
476         {
477                 CASPAR_ASSERT(ctx);
478                 return reinterpret_cast<streaming_consumer*>(ctx)->abort_request_;              
479         }
480                 
481         std::shared_ptr<AVStream> open_encoder(
482                         const AVCodec& codec,
483                         std::map<std::string,
484                         std::string>& options)
485         {                       
486                 auto st = 
487                         avformat_new_stream(
488                                 oc_.get(), 
489                                 &codec);
490
491                 if (!st)                
492                         CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info("Could not allocate video-stream.") << boost::errinfo_api_function("av_new_stream"));
493
494                 auto enc = st->codec;
495                                 
496                 CASPAR_VERIFY(enc);
497                                                 
498                 switch(enc->codec_type)
499                 {
500                         case AVMEDIA_TYPE_VIDEO:
501                         {
502                                 enc->time_base                          = video_graph_out_->inputs[0]->time_base;
503                                 enc->pix_fmt                            = static_cast<AVPixelFormat>(video_graph_out_->inputs[0]->format);
504                                 enc->sample_aspect_ratio        = st->sample_aspect_ratio = video_graph_out_->inputs[0]->sample_aspect_ratio;
505                                 enc->width                                      = video_graph_out_->inputs[0]->w;
506                                 enc->height                                     = video_graph_out_->inputs[0]->h;
507                                 enc->bit_rate_tolerance         = 400 * 1000000;
508                         
509                                 break;
510                         }
511                         case AVMEDIA_TYPE_AUDIO:
512                         {
513                                 enc->time_base                          = audio_graph_out_->inputs[0]->time_base;
514                                 enc->sample_fmt                         = static_cast<AVSampleFormat>(audio_graph_out_->inputs[0]->format);
515                                 enc->sample_rate                        = audio_graph_out_->inputs[0]->sample_rate;
516                                 enc->channel_layout                     = audio_graph_out_->inputs[0]->channel_layout;
517                                 enc->channels                           = audio_graph_out_->inputs[0]->channels;
518                         
519                                 break;
520                         }
521                 }
522                                                                                 
523                 if(oc_->oformat->flags & AVFMT_GLOBALHEADER)
524                         enc->flags |= CODEC_FLAG_GLOBAL_HEADER;
525                 
526                 static const std::array<std::string, 4> char_id_map = {{"v", "a", "d", "s"}};
527
528                 const auto char_id = char_id_map.at(enc->codec_type);
529                                                                 
530                 const auto codec_opts = 
531                         remove_options(
532                                 options, 
533                                 boost::regex("^(" + char_id + "?[^:]+):" + char_id + "$"));
534                 
535                 AVDictionary* av_codec_opts = nullptr;
536
537                 to_dict(
538                         &av_codec_opts, 
539                         options);
540
541                 to_dict(
542                         &av_codec_opts,
543                         codec_opts);
544
545                 options.clear();
546                 
547                 FF(avcodec_open2(
548                         enc,            
549                         &codec, 
550                         av_codec_opts ? &av_codec_opts : nullptr));             
551
552                 if(av_codec_opts)
553                 {
554                         auto t = 
555                                 av_dict_get(
556                                         av_codec_opts, 
557                                         "", 
558                                          nullptr, 
559                                         AV_DICT_IGNORE_SUFFIX);
560
561                         while(t)
562                         {
563                                 options[t->key + (codec_opts.find(t->key) != codec_opts.end() ? ":" + char_id : "")] = t->value;
564
565                                 t = av_dict_get(
566                                                 av_codec_opts, 
567                                                 "", 
568                                                 t, 
569                                                 AV_DICT_IGNORE_SUFFIX);
570                         }
571
572                         av_dict_free(&av_codec_opts);
573                 }
574                                 
575                 if(enc->codec_type == AVMEDIA_TYPE_AUDIO && !(codec.capabilities & CODEC_CAP_VARIABLE_FRAME_SIZE))
576                 {
577                         CASPAR_ASSERT(enc->frame_size > 0);
578                         av_buffersink_set_frame_size(audio_graph_out_, 
579                                                                                  enc->frame_size);
580                 }
581                 
582                 return std::shared_ptr<AVStream>(st, [this](AVStream* st)
583                 {
584                         avcodec_close(st->codec);
585                 });
586         }
587
588         void configue_audio_bistream_filters(
589                         std::map<std::string, std::string>& options)
590         {
591                 const auto audio_bitstream_filter_str = 
592                         try_remove_arg<std::string>(
593                                 options, 
594                                 boost::regex("^bsf:a|absf$"));
595
596                 const auto audio_bitstream_filter = 
597                         audio_bitstream_filter_str 
598                                 ? av_bitstream_filter_init(audio_bitstream_filter_str->c_str()) 
599                                 : nullptr;
600
601                 CASPAR_VERIFY(!audio_bitstream_filter_str || audio_bitstream_filter);
602
603                 if(audio_bitstream_filter)
604                 {
605                         audio_bitstream_filter_.reset(
606                                 audio_bitstream_filter, 
607                                 av_bitstream_filter_close);
608                 }
609                 
610                 if(audio_bitstream_filter_str && !audio_bitstream_filter_)
611                         options["bsf:a"] = *audio_bitstream_filter_str;
612         }
613         
614         void configue_video_bistream_filters(
615                         std::map<std::string, std::string>& options)
616         {
617                 const auto video_bitstream_filter_str = 
618                                 try_remove_arg<std::string>(
619                                         options, 
620                                         boost::regex("^bsf:v|vbsf$"));
621
622                 const auto video_bitstream_filter = 
623                         video_bitstream_filter_str 
624                                 ? av_bitstream_filter_init(video_bitstream_filter_str->c_str()) 
625                                 : nullptr;
626
627                 CASPAR_VERIFY(!video_bitstream_filter_str || video_bitstream_filter);
628
629                 if(video_bitstream_filter)
630                 {
631                         video_bitstream_filter_.reset(
632                                 video_bitstream_filter, 
633                                 av_bitstream_filter_close);
634                 }
635                 
636                 if(video_bitstream_filter_str && !video_bitstream_filter_)
637                         options["bsf:v"] = *video_bitstream_filter_str;
638         }
639         
640         void configure_video_filters(
641                         const AVCodec& codec,
642                         const std::string& filtergraph)
643         {
644                 video_graph_.reset(
645                                 avfilter_graph_alloc(), 
646                                 [](AVFilterGraph* p)
647                                 {
648                                         avfilter_graph_free(&p);
649                                 });
650                 
651                 video_graph_->nb_threads  = boost::thread::hardware_concurrency()/2;
652                 video_graph_->thread_type = AVFILTER_THREAD_SLICE;
653
654                 const auto sample_aspect_ratio =
655                         boost::rational<int>(
656                                         in_video_format_.square_width,
657                                         in_video_format_.square_height) /
658                         boost::rational<int>(
659                                         in_video_format_.width,
660                                         in_video_format_.height);
661                 
662                 const auto vsrc_options = (boost::format("video_size=%1%x%2%:pix_fmt=%3%:time_base=%4%/%5%:pixel_aspect=%6%/%7%:frame_rate=%8%/%9%")
663                         % in_video_format_.width % in_video_format_.height
664                         % AV_PIX_FMT_BGRA
665                         % in_video_format_.duration     % in_video_format_.time_scale
666                         % sample_aspect_ratio.numerator() % sample_aspect_ratio.denominator()
667                         % in_video_format_.time_scale % in_video_format_.duration).str();
668                                         
669                 AVFilterContext* filt_vsrc = nullptr;                   
670                 FF(avfilter_graph_create_filter(
671                                 &filt_vsrc,
672                                 avfilter_get_by_name("buffer"), 
673                                 "ffmpeg_consumer_buffer",
674                                 vsrc_options.c_str(), 
675                                 nullptr, 
676                                 video_graph_.get()));
677                                 
678                 AVFilterContext* filt_vsink = nullptr;
679                 FF(avfilter_graph_create_filter(
680                                 &filt_vsink,
681                                 avfilter_get_by_name("buffersink"), 
682                                 "ffmpeg_consumer_buffersink",
683                                 nullptr, 
684                                 nullptr, 
685                                 video_graph_.get()));
686                 
687 #pragma warning (push)
688 #pragma warning (disable : 4245)
689
690                 FF(av_opt_set_int_list(
691                                 filt_vsink, 
692                                 "pix_fmts", 
693                                 codec.pix_fmts, 
694                                 -1,
695                                 AV_OPT_SEARCH_CHILDREN));
696
697 #pragma warning (pop)
698                         
699                 configure_filtergraph(
700                                 *video_graph_, 
701                                 filtergraph,
702                                 *filt_vsrc,
703                                 *filt_vsink);
704
705                 video_graph_in_  = filt_vsrc;
706                 video_graph_out_ = filt_vsink;
707                 
708                 CASPAR_LOG(info)
709                         <<      u16(std::string("\n") 
710                                 + avfilter_graph_dump(
711                                                 video_graph_.get(), 
712                                                 nullptr));
713         }
714
715         void configure_audio_filters(
716                         const AVCodec& codec,
717                         const std::string& filtergraph)
718         {
719                 audio_graph_.reset(
720                         avfilter_graph_alloc(), 
721                         [](AVFilterGraph* p)
722                         {
723                                 avfilter_graph_free(&p);
724                         });
725                 
726                 audio_graph_->nb_threads  = boost::thread::hardware_concurrency()/2;
727                 audio_graph_->thread_type = AVFILTER_THREAD_SLICE;
728                 
729                 const auto asrc_options = (boost::format("sample_rate=%1%:sample_fmt=%2%:channels=%3%:time_base=%4%/%5%:channel_layout=%6%")
730                         % in_video_format_.audio_sample_rate
731                         % av_get_sample_fmt_name(AV_SAMPLE_FMT_S32)
732                         % in_channel_layout_.num_channels
733                         % 1     % in_video_format_.audio_sample_rate
734                         % boost::io::group(
735                                 std::hex, 
736                                 std::showbase, 
737                                 av_get_default_channel_layout(in_channel_layout_.num_channels))).str();
738
739                 AVFilterContext* filt_asrc = nullptr;
740                 FF(avfilter_graph_create_filter(
741                         &filt_asrc,
742                         avfilter_get_by_name("abuffer"), 
743                         "ffmpeg_consumer_abuffer",
744                         asrc_options.c_str(), 
745                         nullptr, 
746                         audio_graph_.get()));
747                                 
748                 AVFilterContext* filt_asink = nullptr;
749                 FF(avfilter_graph_create_filter(
750                         &filt_asink,
751                         avfilter_get_by_name("abuffersink"), 
752                         "ffmpeg_consumer_abuffersink",
753                         nullptr, 
754                         nullptr, 
755                         audio_graph_.get()));
756                 
757 #pragma warning (push)
758 #pragma warning (disable : 4245)
759
760                 FF(av_opt_set_int(
761                         filt_asink,        
762                         "all_channel_counts",
763                         1,      
764                         AV_OPT_SEARCH_CHILDREN));
765
766                 FF(av_opt_set_int_list(
767                         filt_asink, 
768                         "sample_fmts",           
769                         codec.sample_fmts,                              
770                         -1, 
771                         AV_OPT_SEARCH_CHILDREN));
772
773                 FF(av_opt_set_int_list(
774                         filt_asink,
775                         "channel_layouts",       
776                         codec.channel_layouts,                  
777                         -1, 
778                         AV_OPT_SEARCH_CHILDREN));
779
780                 FF(av_opt_set_int_list(
781                         filt_asink, 
782                         "sample_rates" ,         
783                         codec.supported_samplerates,    
784                         -1, 
785                         AV_OPT_SEARCH_CHILDREN));
786
787 #pragma warning (pop)
788                         
789                 configure_filtergraph(
790                         *audio_graph_, 
791                         filtergraph, 
792                         *filt_asrc, 
793                         *filt_asink);
794
795                 audio_graph_in_  = filt_asrc;
796                 audio_graph_out_ = filt_asink;
797
798                 CASPAR_LOG(info) 
799                         <<      u16(std::string("\n") 
800                                 + avfilter_graph_dump(
801                                         audio_graph_.get(), 
802                                         nullptr));
803         }
804
805         void configure_filtergraph(
806                         AVFilterGraph& graph,
807                         const std::string& filtergraph,
808                         AVFilterContext& source_ctx,
809                         AVFilterContext& sink_ctx)
810         {
811                 AVFilterInOut* outputs = nullptr;
812                 AVFilterInOut* inputs = nullptr;
813
814                 try
815                 {
816                         if(!filtergraph.empty())
817                         {
818                                 outputs = avfilter_inout_alloc();
819                                 inputs  = avfilter_inout_alloc();
820
821                                 CASPAR_VERIFY(outputs && inputs);
822
823                                 outputs->name       = av_strdup("in");
824                                 outputs->filter_ctx = &source_ctx;
825                                 outputs->pad_idx    = 0;
826                                 outputs->next       = nullptr;
827
828                                 inputs->name        = av_strdup("out");
829                                 inputs->filter_ctx  = &sink_ctx;
830                                 inputs->pad_idx     = 0;
831                                 inputs->next        = nullptr;
832
833                                 FF(avfilter_graph_parse(
834                                         &graph, 
835                                         filtergraph.c_str(), 
836                                         inputs,
837                                         outputs,
838                                         nullptr));
839                         } 
840                         else 
841                         {
842                                 FF(avfilter_link(
843                                         &source_ctx, 
844                                         0, 
845                                         &sink_ctx, 
846                                         0));
847                         }
848
849                         FF(avfilter_graph_config(
850                                 &graph, 
851                                 nullptr));
852                 }
853                 catch(...)
854                 {
855                         avfilter_inout_free(&outputs);
856                         avfilter_inout_free(&inputs);
857                         throw;
858                 }
859         }
860         
861         void encode_video(core::const_frame frame_ptr, std::shared_ptr<void> token)
862         {               
                    // Feeds frame_ptr (unless it is the empty frame) into the video filter graph,
                    // then drains the graph's sink and schedules one task on
                    // video_encoder_executor_ per drain attempt. token is copied into every task
                    // and forwarded to encode_av_frame. Does nothing when there is no video stream.
863                 if(!video_st_)
864                         return;
865
866                 auto enc = video_st_->codec;
867                         
868                 std::shared_ptr<AVFrame> src_av_frame;
869
870                 if(frame_ptr != core::const_frame::empty())
871                 {
                            // The deleter captures frame_ptr by value, so the source frame stays
                            // alive as long as this AVFrame does; the AVFrame's data pointers are
                            // set up below to point at frame_ptr's image data.
872                         src_av_frame.reset(
873                                 av_frame_alloc(),
874                                 [frame_ptr](AVFrame* frame)
875                                 {
876                                         av_frame_free(&frame);
877                                 });
878
879                         avcodec_get_frame_defaults(src_av_frame.get());         
880                         
                            // Same ratio as the one passed to the buffer source in
                            // configure_video_filters: (square_width/square_height) / (width/height).
881                         const auto sample_aspect_ratio = 
882                                 boost::rational<int>(
883                                         in_video_format_.square_width, 
884                                         in_video_format_.square_height) /
885                                 boost::rational<int>(
886                                         in_video_format_.width, 
887                                         in_video_format_.height);
888
889                         src_av_frame->format                              = AV_PIX_FMT_BGRA;
890                         src_av_frame->width                                       = in_video_format_.width;
891                         src_av_frame->height                              = in_video_format_.height;
892                         src_av_frame->sample_aspect_ratio.num = sample_aspect_ratio.numerator();
893                         src_av_frame->sample_aspect_ratio.den = sample_aspect_ratio.denominator();
894                         src_av_frame->pts                                         = video_pts_;
895
                            // The video pts advances by one per submitted frame.
896                         video_pts_ += 1;
897
                            // Point data/linesize at the existing image buffer (alignment 1).
898                         FF(av_image_fill_arrays(
899                                 src_av_frame->data,
900                                 src_av_frame->linesize,
901                                 frame_ptr.image_data().begin(),
902                                 static_cast<AVPixelFormat>(src_av_frame->format), 
903                                 in_video_format_.width, 
904                                 in_video_format_.height, 
905                                 1));
906
907                         FF(av_buffersrc_add_frame(
908                                 video_graph_in_, 
909                                 src_av_frame.get()));
910                 }               
911
912                 int ret = 0;
913
                    // Pull filtered frames until av_buffersink_get_frame returns a negative code.
                    // A task is queued for that final negative result too; the task itself
                    // decides how to react to it.
914                 while(ret >= 0)
915                 {
916                         std::shared_ptr<AVFrame> filt_frame(
917                                 av_frame_alloc(), 
918                                 [](AVFrame* p)
919                                 {
920                                         av_frame_free(&p);
921                                 });
922
923                         ret = av_buffersink_get_frame(
924                                 video_graph_out_, 
925                                 filt_frame.get());
926                                                 
                            // [=] copies ret, filt_frame, enc and token into the task and captures
                            // this implicitly for the member accesses below.
927                         video_encoder_executor_.begin_invoke([=]
928                         {
929                                 if(ret == AVERROR_EOF)
930                                 {
                                            // End of stream: when the encoder has CODEC_CAP_DELAY, keep
                                            // passing a null frame until encode_av_frame returns false
                                            // (no further packet was produced).
931                                         if(enc->codec->capabilities & CODEC_CAP_DELAY)
932                                         {
933                                                 while(encode_av_frame(
934                                                                 *video_st_, 
935                                                                 video_bitstream_filter_.get(),
936                                                                 avcodec_encode_video2, 
937                                                                 nullptr, token))
938                                                 {
939                                                         boost::this_thread::yield(); // TODO:
940                                                 }
941                                         }               
942                                 }
                                    // EAGAIN is skipped silently; every other result is handed to
                                    // FF_RET (presumably throwing on a negative code - confirm in
                                    // ffmpeg_error.h) before the frame is encoded.
943                                 else if(ret != AVERROR(EAGAIN))
944                                 {
945                                         FF_RET(ret, "av_buffersink_get_frame");
946                                         
                                            // Copy the frame's interlacing flags to the encoder context;
                                            // MJPEG gets the TT/BB values, other codecs TB/BT.
947                                         if (filt_frame->interlaced_frame) 
948                                         {
949                                                 if (enc->codec->id == AV_CODEC_ID_MJPEG)
950                                                         enc->field_order = filt_frame->top_field_first ? AV_FIELD_TT : AV_FIELD_BB;
951                                                 else
952                                                         enc->field_order = filt_frame->top_field_first ? AV_FIELD_TB : AV_FIELD_BT;
953                                         } 
954                                         else
955                                                 enc->field_order = AV_FIELD_PROGRESSIVE;
956
957                                         filt_frame->quality = enc->global_quality;
958
                                            // Clear the picture type unless me_threshold is non-zero.
959                                         if (!enc->me_threshold)
960                                                 filt_frame->pict_type = AV_PICTURE_TYPE_NONE;
961                         
962                                         encode_av_frame(
963                                                 *video_st_,
964                                                 video_bitstream_filter_.get(),
965                                                 avcodec_encode_video2,
966                                                 filt_frame, 
967                                                 token);
968
969                                         boost::this_thread::yield(); // TODO:
970                                 }
971                         });
972                 }
973         }
974                                         
975         void encode_audio(core::const_frame frame_ptr, std::shared_ptr<void> token)
976         {               
                    // Feeds the audio of frame_ptr (unless it is the empty frame) into the audio
                    // filter graph, then drains the graph's sink and schedules one task on
                    // audio_encoder_executor_ per drain attempt. token is copied into every task
                    // and forwarded to encode_av_frame. Does nothing when there is no audio stream.
977                 if(!audio_st_)
978                         return;
979                 
980                 auto enc = audio_st_->codec;
981                         
982                 std::shared_ptr<AVFrame> src_av_frame;
983
984                 if(frame_ptr != core::const_frame::empty())
985                 {
986                         src_av_frame.reset(
987                                 av_frame_alloc(), 
988                                 [](AVFrame* p)
989                                 {
990                                         av_frame_free(&p);
991                                 });
992                 
                            // audio_data() is treated as one buffer of S32 samples covering all
                            // channels: samples per channel = total size / channel count.
993                         src_av_frame->channels           = in_channel_layout_.num_channels;
994                         src_av_frame->channel_layout = av_get_default_channel_layout(in_channel_layout_.num_channels);
995                         src_av_frame->sample_rate        = in_video_format_.audio_sample_rate;
996                         src_av_frame->nb_samples         = static_cast<int>(frame_ptr.audio_data().size()) / src_av_frame->channels;
997                         src_av_frame->format             = AV_SAMPLE_FMT_S32;
998                         src_av_frame->pts                        = audio_pts_;
999
                             // The audio pts advances by the number of samples per channel.
1000                         audio_pts_ += src_av_frame->nb_samples;
1001
                             // Point extended_data/linesize at the existing sample buffer; the last
                             // argument (16) is the alignment passed to av_samples_fill_arrays.
1002                         FF(av_samples_fill_arrays(
1003                                         src_av_frame->extended_data,
1004                                         src_av_frame->linesize,
1005                                         reinterpret_cast<const std::uint8_t*>(&*frame_ptr.audio_data().begin()),
1006                                         src_av_frame->channels,
1007                                         src_av_frame->nb_samples,
1008                                         static_cast<AVSampleFormat>(src_av_frame->format),
1009                                         16));
1010                 
1011                         FF(av_buffersrc_add_frame(
1012                                         audio_graph_in_, 
1013                                         src_av_frame.get()));
1014                 }
1015
1016                 int ret = 0;
1017
                     // Pull filtered frames until av_buffersink_get_frame returns a negative code.
                     // A task is queued for that final negative result too; the task itself
                     // decides how to react to it.
1018                 while(ret >= 0)
1019                 {
1020                         std::shared_ptr<AVFrame> filt_frame(
1021                                 av_frame_alloc(), 
1022                                 [](AVFrame* p)
1023                                 {
1024                                         av_frame_free(&p);
1025                                 });
1026
1027                         ret = av_buffersink_get_frame(
1028                                 audio_graph_out_, 
1029                                 filt_frame.get());
1030                                         
                             // [=] copies ret, filt_frame, enc and token into the task and captures
                             // this implicitly for the member accesses below.
1031                         audio_encoder_executor_.begin_invoke([=]
1032                         {       
1033                                 if(ret == AVERROR_EOF)
1034                                 {
                                             // End of stream: when the encoder has CODEC_CAP_DELAY, keep
                                             // passing a null frame until encode_av_frame returns false
                                             // (no further packet was produced).
1035                                         if(enc->codec->capabilities & CODEC_CAP_DELAY)
1036                                         {
1037                                                 while(encode_av_frame(
1038                                                                 *audio_st_, 
1039                                                                 audio_bitstream_filter_.get(), 
1040                                                                 avcodec_encode_audio2, 
1041                                                                 nullptr, 
1042                                                                 token))
1043                                                 {
1044                                                         boost::this_thread::yield(); // TODO:
1045                                                 }
1046                                         }
1047                                 }
                                     // EAGAIN is skipped silently; every other result is handed to
                                     // FF_RET (presumably throwing on a negative code - confirm in
                                     // ffmpeg_error.h) before the frame is encoded.
1048                                 else if(ret != AVERROR(EAGAIN))
1049                                 {
1050                                         FF_RET(
1051                                                 ret, 
1052                                                 "av_buffersink_get_frame");
1053
1054                                         encode_av_frame(
1055                                                 *audio_st_, 
1056                                                 audio_bitstream_filter_.get(), 
1057                                                 avcodec_encode_audio2, 
1058                                                 filt_frame, 
1059                                                 token);
1060
1061                                         boost::this_thread::yield(); // TODO:
1062                                 }
1063                         });
1064                 }
1065         }
1066         
1067         template<typename F>
1068         bool encode_av_frame(
1069                         AVStream& st,
1070                         AVBitStreamFilterContext* bsfc, 
1071                         const F& func, 
1072                         const std::shared_ptr<AVFrame>& src_av_frame, 
1073                         std::shared_ptr<void> token)
1074         {
1075                 AVPacket pkt = {};
1076                 av_init_packet(&pkt);
1077
1078                 int got_packet = 0;
1079
1080                 FF(func(
1081                         st.codec, 
1082                         &pkt, 
1083                         src_av_frame.get(), 
1084                         &got_packet));
1085                                         
1086                 if(!got_packet || pkt.size <= 0)
1087                         return false;
1088
1089                 pkt.stream_index = st.index;
1090                 
1091                 if(bsfc)
1092                 {
1093                         auto new_pkt = pkt;
1094
1095                         auto a = av_bitstream_filter_filter(
1096                                         bsfc,
1097                                         st.codec,
1098                                         nullptr,
1099                                         &new_pkt.data,
1100                                         &new_pkt.size,
1101                                         pkt.data,
1102                                         pkt.size,
1103                                         pkt.flags & AV_PKT_FLAG_KEY);
1104
1105                         if(a == 0 && new_pkt.data != pkt.data && new_pkt.destruct) 
1106                         {
1107                                 auto t = reinterpret_cast<std::uint8_t*>(av_malloc(new_pkt.size + FF_INPUT_BUFFER_PADDING_SIZE));
1108
1109                                 if(t) 
1110                                 {
1111                                         memcpy(
1112                                                 t, 
1113                                                 new_pkt.data,
1114                                                 new_pkt.size);
1115
1116                                         memset(
1117                                                 t + new_pkt.size, 
1118                                                 0, 
1119                                                 FF_INPUT_BUFFER_PADDING_SIZE);
1120
1121                                         new_pkt.data = t;
1122                                         new_pkt.buf  = nullptr;
1123                                 } 
1124                                 else
1125                                         a = AVERROR(ENOMEM);
1126                         }
1127
1128                         av_free_packet(&pkt);
1129
1130                         FF_RET(
1131                                 a, 
1132                                 "av_bitstream_filter_filter");
1133
1134                         new_pkt.buf =
1135                                 av_buffer_create(
1136                                         new_pkt.data, 
1137                                         new_pkt.size,
1138                                         av_buffer_default_free, 
1139                                         nullptr, 
1140                                         0);
1141
1142                         CASPAR_VERIFY(new_pkt.buf);
1143
1144                         pkt = new_pkt;
1145                 }
1146                 
1147                 if (pkt.pts != AV_NOPTS_VALUE)
1148                 {
1149                         pkt.pts = 
1150                                 av_rescale_q(
1151                                         pkt.pts,
1152                                         st.codec->time_base, 
1153                                         st.time_base);
1154                 }
1155
1156                 if (pkt.dts != AV_NOPTS_VALUE)
1157                 {
1158                         pkt.dts = 
1159                                 av_rescale_q(
1160                                         pkt.dts, 
1161                                         st.codec->time_base, 
1162                                         st.time_base);
1163                 }
1164                                 
1165                 pkt.duration = 
1166                         static_cast<int>(
1167                                 av_rescale_q(
1168                                         pkt.duration, 
1169                                         st.codec->time_base, st.time_base));
1170
1171                 write_packet(
1172                         std::shared_ptr<AVPacket>(
1173                                 new AVPacket(pkt), 
1174                                 [](AVPacket* p)
1175                                 {
1176                                         av_free_packet(p); 
1177                                         delete p;
1178                                 }), token);
1179
1180                 return true;
1181         }
1182
1183         void write_packet(
1184                         const std::shared_ptr<AVPacket>& pkt_ptr,
1185                         std::shared_ptr<void> token)
1186         {               
1187                 write_executor_.begin_invoke([this, pkt_ptr, token]() mutable
1188                 {
1189                         FF(av_interleaved_write_frame(
1190                                 oc_.get(), 
1191                                 pkt_ptr.get()));
1192                 });     
1193         }       
1194         
1195         template<typename T>
1196         static boost::optional<T> try_remove_arg(
1197                         std::map<std::string, std::string>& options, 
1198                         const boost::regex& expr)
1199         {
1200                 for(auto it = options.begin(); it != options.end(); ++it)
1201                 {                       
1202                         if(boost::regex_search(it->first, expr))
1203                         {
1204                                 auto arg = it->second;
1205                                 options.erase(it);
1206                                 return boost::lexical_cast<T>(arg);
1207                         }
1208                 }
1209
1210                 return boost::optional<T>();
1211         }
1212                 
1213         static std::map<std::string, std::string> remove_options(
1214                         std::map<std::string, std::string>& options, 
1215                         const boost::regex& expr)
1216         {
1217                 std::map<std::string, std::string> result;
1218                         
1219                 auto it = options.begin();
1220                 while(it != options.end())
1221                 {                       
1222                         boost::smatch what;
1223                         if(boost::regex_search(it->first, what, expr))
1224                         {
1225                                 result[
1226                                         what.size() > 0 && what[1].matched 
1227                                                 ? what[1].str() 
1228                                                 : it->first] = it->second;
1229                                 it = options.erase(it);
1230                         }
1231                         else
1232                                 ++it;
1233                 }
1234
1235                 return result;
1236         }
1237                 
1238         static void to_dict(AVDictionary** dest, const std::map<std::string, std::string>& c)
1239         {               
1240                 for (const auto& entry : c)
1241                 {
1242                         av_dict_set(
1243                                 dest, 
1244                                 entry.first.c_str(), 
1245                                 entry.second.c_str(), 0);
1246                 }
1247         }
1248
1249         static std::map<std::string, std::string> to_map(AVDictionary* dict)
1250         {
1251                 std::map<std::string, std::string> result;
1252                 
1253                 for(auto t = dict 
1254                                 ? av_dict_get(
1255                                         dict, 
1256                                         "", 
1257                                         nullptr, 
1258                                         AV_DICT_IGNORE_SUFFIX) 
1259                                 : nullptr;
1260                         t; 
1261                         t = av_dict_get(
1262                                 dict, 
1263                                 "", 
1264                                 t,
1265                                 AV_DICT_IGNORE_SUFFIX))
1266                 {
1267                         result[t->key] = t->value;
1268                 }
1269
1270                 return result;
1271         }
1272 };
1273
1274 void describe_streaming_consumer(core::help_sink& sink, const core::help_repository& repo)
1275 {
             // Writes the help entry for the STREAM consumer into sink: short description,
             // syntax line, a paragraph, the parameter definitions and one example.
             // repo is not used by this function.
1276         sink.short_description(L"For streaming the contents of a channel using FFmpeg.");
1277         sink.syntax(L"STREAM [url:string] {-[ffmpeg_param1:string] [value1:string] {-[ffmpeg_param2:string] [value2:string] {...}}}");
1278         sink.para()->text(L"For streaming the contents of a channel using FFmpeg");
             // Each builder returned by para()/definitions() is a temporary that lives only
             // until the end of its own statement.
1279         sink.definitions()
1280                 ->item(L"url", L"The stream URL to create/stream to.")
1281                 ->item(L"ffmpeg_paramX", L"A parameter supported by FFmpeg. For example vcodec or acodec etc.");
1282         sink.para()->text(L"Examples:");
1283         sink.example(L">> ADD 1 STREAM udp://<client_ip_address>:9250 -format mpegts -vcodec libx264 -crf 25 -tune zerolatency -preset ultrafast");
1284 }
1285
1286 spl::shared_ptr<core::frame_consumer> create_streaming_consumer(
1287                 const std::vector<std::wstring>& params, core::interaction_sink*)
1288 {       
1289         if (params.size() < 1 || (!boost::iequals(params.at(0), L"STREAM") && !boost::iequals(params.at(0), L"FILE")))
1290                 return core::frame_consumer::empty();
1291
1292         auto compatibility_mode = boost::iequals(params.at(0), L"FILE");
1293         auto path = u8(params.size() > 1 ? params.at(1) : L"");
1294         auto args = u8(boost::join(params, L" "));
1295
1296         return spl::make_shared<streaming_consumer>(path, args, compatibility_mode);
1297 }
1298
1299 spl::shared_ptr<core::frame_consumer> create_preconfigured_streaming_consumer(
1300                 const boost::property_tree::wptree& ptree, core::interaction_sink*)
1301 {               
1302         return spl::make_shared<streaming_consumer>(
1303                         u8(ptree_get<std::wstring>(ptree, L"path")), 
1304                         u8(ptree.get<std::wstring>(L"args", L"")),
1305                         false);
1306 }
1307
1308 }}