]> git.sesse.net Git - casparcg/blob - modules/ffmpeg/consumer/streaming_consumer.cpp
* Merged streaming_consumer from 2.0
[casparcg] / modules / ffmpeg / consumer / streaming_consumer.cpp
1 #include "../StdAfx.h"
2
3 #include "ffmpeg_consumer.h"
4
5 #include "../util/error.h"
6
7 #include <common/except.h>
8 #include <common/executor.h>
9 #include <common/assert.h>
10 #include <common/utf.h>
11 #include <common/future.h>
12 #include <common/env.h>
13 #include <common/scope_exit.h>
14
15 #include <core/consumer/frame_consumer.h>
16 #include <core/frame/frame.h>
17 #include <core/video_format.h>
18 #include <core/monitor/monitor.h>
19
20 #include <boost/noncopyable.hpp>
21 #include <boost/rational.hpp>
22 #include <boost/format.hpp>
23 #include <boost/algorithm/string/predicate.hpp>
24 #include <boost/property_tree/ptree.hpp>
25
26 #pragma warning(push)
27 #pragma warning(disable: 4244)
28 #pragma warning(disable: 4245)
29 #include <boost/crc.hpp>
30 #pragma warning(pop)
31
32 #include <tbb/atomic.h>
33 #include <tbb/concurrent_queue.h>
34 #include <tbb/parallel_invoke.h>
35 #include <tbb/parallel_for.h>
36
37 #include <agents.h>
38 #include <numeric>
39
40 #pragma warning(push)
41 #pragma warning(disable: 4244)
42
43 extern "C" 
44 {
45         #define __STDC_CONSTANT_MACROS
46         #define __STDC_LIMIT_MACROS
47         #include <libavformat/avformat.h>
48         #include <libavcodec/avcodec.h>
49         #include <libavutil/avutil.h>
50         #include <libavutil/frame.h>
51         #include <libavutil/opt.h>
52         #include <libavutil/imgutils.h>
53         #include <libavutil/parseutils.h>
54         #include <libavfilter/avfilter.h>
55         #include <libavfilter/buffersink.h>
56         #include <libavfilter/buffersrc.h>
57 }
58
59 #pragma warning(pop)
60
61 using namespace Concurrency;
62
63 namespace caspar { namespace ffmpeg {
64
65 int crc16(const std::string& str)
66 {
67         boost::crc_16_type result;
68
69         result.process_bytes(str.data(), str.length());
70
71         return result.checksum();
72 }
73
74 class streaming_consumer final : public core::frame_consumer
75 {
76 public:
77         // Static Members
78                 
79 private:
80         core::monitor::subject                                          subject_;
81         boost::filesystem::path                                         path_;
82         int                                                                                     consumer_index_offset_;
83
84         std::map<std::string, std::string>                      options_;
85                                                                                                 
86         core::video_format_desc                                         in_video_format_;
87
88         std::shared_ptr<AVFormatContext>                        oc_;
89         tbb::atomic<bool>                                                       abort_request_;
90                                                                                                 
91         std::shared_ptr<AVStream>                                       video_st_;
92         std::shared_ptr<AVStream>                                       audio_st_;
93
94         std::int64_t                                                            video_pts_;
95         std::int64_t                                                            audio_pts_;
96                                                                                                                                                                         
97     AVFilterContext*                                                    audio_graph_in_;  
98     AVFilterContext*                                                    audio_graph_out_; 
99     std::shared_ptr<AVFilterGraph>                              audio_graph_;    
100         std::shared_ptr<AVBitStreamFilterContext>       audio_bitstream_filter_;       
101
102     AVFilterContext*                                                    video_graph_in_;  
103     AVFilterContext*                                                    video_graph_out_; 
104     std::shared_ptr<AVFilterGraph>                              video_graph_;  
105         std::shared_ptr<AVBitStreamFilterContext>       video_bitstream_filter_;
106         
107         executor                                                                        executor_;
108
109         executor                                                                        video_encoder_executor_;
110         executor                                                                        audio_encoder_executor_;
111
112         tbb::atomic<int>                                                        tokens_;
113         boost::mutex                                                            tokens_mutex_;
114         boost::condition_variable                                       tokens_cond_;
115
116         executor                                                                        write_executor_;
117         
118 public:
119
120         streaming_consumer(
121                         std::string path, 
122                         std::string options)
123                 : path_(path)
124                 , consumer_index_offset_(crc16(path))
125                 , video_pts_(0)
126                 , audio_pts_(0)
127                 , executor_(print())
128                 , audio_encoder_executor_(print() + L" audio_encoder")
129                 , video_encoder_executor_(print() + L" video_encoder")
130                 , write_executor_(print() + L" io")
131         {               
132                 abort_request_ = false; 
133
134                 for(auto it = 
135                                 boost::sregex_iterator(
136                                         options.begin(), 
137                                         options.end(), 
138                                         boost::regex("-(?<NAME>[^-\\s]+)(\\s+(?<VALUE>[^\\s]+))?")); 
139                         it != boost::sregex_iterator(); 
140                         ++it)
141                 {                               
142                         options_[(*it)["NAME"].str()] = (*it)["VALUE"].matched ? (*it)["VALUE"].str() : "";
143                 }
144                                                                                 
145         if (options_.find("threads") == options_.end())
146             options_["threads"] = "auto";
147
148                 tokens_ = 
149                         std::max(
150                                 1, 
151                                 try_remove_arg<int>(
152                                         options_, 
153                                         boost::regex("tokens")).get_value_or(2));               
154         }
155                 
156         ~streaming_consumer()
157         {
158                 if(oc_)
159                 {
160                         video_encoder_executor_.begin_invoke([&] { encode_video(core::const_frame::empty(), nullptr); });
161                         audio_encoder_executor_.begin_invoke([&] { encode_audio(core::const_frame::empty(), nullptr); });
162
163                         video_encoder_executor_.stop();
164                         audio_encoder_executor_.stop();
165                         video_encoder_executor_.join();
166                         audio_encoder_executor_.join();
167
168                         video_graph_.reset();
169                         audio_graph_.reset();
170                         video_st_.reset();
171                         audio_st_.reset();
172
173                         write_packet(nullptr, nullptr);
174
175                         write_executor_.stop();
176                         write_executor_.join();
177
178                         FF(av_write_trailer(oc_.get()));
179
180                         if (!(oc_->oformat->flags & AVFMT_NOFILE) && oc_->pb)
181                                 avio_close(oc_->pb);
182
183                         oc_.reset();
184                 }
185         }
186
187         void initialize(
188                         const core::video_format_desc& format_desc,
189                         int channel_index) override
190         {
191                 try
192                 {                               
193                         static boost::regex prot_exp("^.+:.*" );
194                         
195                         const auto overwrite = 
196                                 try_remove_arg<std::string>(
197                                         options_,
198                                         boost::regex("y")) != nullptr;
199
200                         if(!boost::regex_match(
201                                         path_.string(), 
202                                         prot_exp))
203                         {
204                                 if(!path_.is_complete())
205                                 {
206                                         path_ = 
207                                                 u8(
208                                                         env::media_folder()) + 
209                                                         path_.string();
210                                 }
211                         
212                                 if(boost::filesystem::exists(path_))
213                                 {
214                                         if(!overwrite)
215                                                 BOOST_THROW_EXCEPTION(invalid_argument() << msg_info("File exists"));
216                                                 
217                                         boost::filesystem::remove(path_);
218                                 }
219                         }
220                                                         
221                         const auto oformat_name = 
222                                 try_remove_arg<std::string>(
223                                         options_, 
224                                         boost::regex("^f|format$"));
225                         
226                         AVFormatContext* oc;
227
228                         FF(avformat_alloc_output_context2(
229                                 &oc, 
230                                 nullptr, 
231                                 oformat_name && !oformat_name->empty() ? oformat_name->c_str() : nullptr, 
232                                 path_.string().c_str()));
233
234                         oc_.reset(
235                                 oc, 
236                                 avformat_free_context);
237                                         
238                         CASPAR_VERIFY(oc_->oformat);
239
240                         oc_->interrupt_callback.callback = streaming_consumer::interrupt_cb;
241                         oc_->interrupt_callback.opaque   = this;        
242
243                         CASPAR_VERIFY(format_desc.format != core::video_format::invalid);
244
245                         in_video_format_ = format_desc;
246                                                         
247                         CASPAR_VERIFY(oc_->oformat);
248                         
249                         const auto video_codec_name = 
250                                 try_remove_arg<std::string>(
251                                         options_, 
252                                         boost::regex("^c:v|codec:v|vcodec$"));
253
254                         const auto video_codec = 
255                                 video_codec_name 
256                                         ? avcodec_find_encoder_by_name(video_codec_name->c_str())
257                                         : avcodec_find_encoder(oc_->oformat->video_codec);
258                                                 
259                         const auto audio_codec_name = 
260                                 try_remove_arg<std::string>(
261                                         options_, 
262                                          boost::regex("^c:a|codec:a|acodec$"));
263                         
264                         const auto audio_codec = 
265                                 audio_codec_name 
266                                         ? avcodec_find_encoder_by_name(audio_codec_name->c_str())
267                                         : avcodec_find_encoder(oc_->oformat->audio_codec);
268                         
269                         if (!video_codec)
270                                 BOOST_THROW_EXCEPTION(caspar_exception() << msg_info(
271                                                 "Failed to find video codec " + (video_codec_name
272                                                                 ? *video_codec_name
273                                                                 : "with id " + boost::lexical_cast<std::string>(
274                                                                                 oc_->oformat->video_codec))));
275                         if (!audio_codec)
276                                 BOOST_THROW_EXCEPTION(caspar_exception() << msg_info(
277                                                 "Failed to find audio codec " + (audio_codec_name
278                                                                 ? *audio_codec_name
279                                                                 : "with id " + boost::lexical_cast<std::string>(
280                                                                                 oc_->oformat->audio_codec))));
281                         
282                         // Filters
283
284                         {
285                                 configure_video_filters(
286                                         *video_codec, 
287                                         try_remove_arg<std::string>(options_, 
288                                         boost::regex("vf|f:v|filter:v")).get_value_or(""));
289
290                                 configure_audio_filters(
291                                         *audio_codec, 
292                                         try_remove_arg<std::string>(options_,
293                                         boost::regex("af|f:a|filter:a")).get_value_or(""));
294                         }
295
296                         // Bistream Filters
297                         {
298                                 configue_audio_bistream_filters(options_);
299                                 configue_video_bistream_filters(options_);
300                         }
301
302                         // Encoders
303
304                         {
305                                 auto video_options = options_;
306                                 auto audio_options = options_;
307
308                                 video_st_ = open_encoder(
309                                         *video_codec, 
310                                         video_options);
311
312                                 audio_st_ = open_encoder(
313                                         *audio_codec, 
314                                         audio_options);
315
316                                 auto it = options_.begin();
317                                 while(it != options_.end())
318                                 {
319                                         if(video_options.find(it->first) == video_options.end() || audio_options.find(it->first) == audio_options.end())
320                                                 it = options_.erase(it);
321                                         else
322                                                 ++it;
323                                 }
324                         }
325
326                         // Output
327                         {
328                                 AVDictionary* av_opts = nullptr;
329
330                                 to_dict(
331                                         &av_opts, 
332                                         std::move(options_));
333
334                                 CASPAR_SCOPE_EXIT
335                                 {
336                                         av_dict_free(&av_opts);
337                                 };
338
339                                 if (!(oc_->oformat->flags & AVFMT_NOFILE)) 
340                                 {
341                                         FF(avio_open2(
342                                                 &oc_->pb, 
343                                                 path_.string().c_str(), 
344                                                 AVIO_FLAG_WRITE, 
345                                                 &oc_->interrupt_callback, 
346                                                 &av_opts));
347                                 }
348                                 
349                                 FF(avformat_write_header(
350                                         oc_.get(), 
351                                         &av_opts));
352                                 
353                                 options_ = to_map(av_opts);
354                         }
355
356                         // Dump Info
357                         
358                         av_dump_format(
359                                 oc_.get(), 
360                                 0, 
361                                 oc_->filename, 
362                                 1);             
363
364                         for (const auto& option : options_)
365                         {
366                                 CASPAR_LOG(warning) 
367                                         << L"Invalid option: -" 
368                                         << u16(option.first) 
369                                         << L" " 
370                                         << u16(option.second);
371                         }
372                 }
373                 catch(...)
374                 {
375                         video_st_.reset();
376                         audio_st_.reset();
377                         oc_.reset();
378                         throw;
379                 }
380         }
381
382         core::monitor::subject& monitor_output() override
383         {
384                 return subject_;
385         }
386
387         std::wstring name() const override
388         {
389                 return L"streaming";
390         }
391
392         std::future<bool> send(core::const_frame frame) override
393         {               
394                 CASPAR_VERIFY(in_video_format_.format != core::video_format::invalid);
395                 
396                 --tokens_;
397                 std::shared_ptr<void> token(
398                         nullptr, 
399                         [this](void*)
400                         {
401                                 ++tokens_;
402                                 tokens_cond_.notify_one();
403                         });
404
405                 return executor_.begin_invoke([=]() -> bool
406                 {
407                         boost::unique_lock<boost::mutex> tokens_lock(tokens_mutex_);
408
409                         while(tokens_ < 0)
410                                 tokens_cond_.wait(tokens_lock);
411
412                         video_encoder_executor_.begin_invoke([=]() mutable
413                         {
414                                 encode_video(
415                                         frame, 
416                                         token);
417                         });
418                 
419                         audio_encoder_executor_.begin_invoke([=]() mutable
420                         {
421                                 encode_audio(
422                                         frame, 
423                                         token);
424                         });
425                                 
426                         return true;
427                 });
428         }
429
430         std::wstring print() const override
431         {
432                 return L"streaming_consumer[" + u16(path_.string()) + L"]";
433         }
434         
435         virtual boost::property_tree::wptree info() const override
436         {
437                 return boost::property_tree::wptree();
438         }
439
440         bool has_synchronization_clock() const override
441         {
442                 return false;
443         }
444
445         int buffer_depth() const override
446         {
447                 return -1;
448         }
449
450         int index() const override
451         {
452                 return 100000 + consumer_index_offset_;
453         }
454
455 private:
456
457         static int interrupt_cb(void* ctx)
458         {
459                 CASPAR_ASSERT(ctx);
460                 return reinterpret_cast<streaming_consumer*>(ctx)->abort_request_;              
461         }
462                 
463         std::shared_ptr<AVStream> open_encoder(
464                         const AVCodec& codec,
465                         std::map<std::string,
466                         std::string>& options)
467         {                       
468                 auto st = 
469                         avformat_new_stream(
470                                 oc_.get(), 
471                                 &codec);
472
473                 if (!st)                
474                         BOOST_THROW_EXCEPTION(caspar_exception() << msg_info("Could not allocate video-stream.") << boost::errinfo_api_function("av_new_stream"));      
475
476                 auto enc = st->codec;
477                                 
478                 CASPAR_VERIFY(enc);
479                                                 
480                 switch(enc->codec_type)
481                 {
482                         case AVMEDIA_TYPE_VIDEO:
483                         {                               
484                                 enc->time_base                    = video_graph_out_->inputs[0]->time_base;
485                                 enc->pix_fmt                      = static_cast<AVPixelFormat>(video_graph_out_->inputs[0]->format);
486                                 enc->sample_aspect_ratio  = st->sample_aspect_ratio = video_graph_out_->inputs[0]->sample_aspect_ratio;
487                                 enc->width                                = video_graph_out_->inputs[0]->w;
488                                 enc->height                               = video_graph_out_->inputs[0]->h;
489                         
490                                 break;
491                         }
492                         case AVMEDIA_TYPE_AUDIO:
493                         {
494                                 enc->time_base                    = audio_graph_out_->inputs[0]->time_base;
495                                 enc->sample_fmt                   = static_cast<AVSampleFormat>(audio_graph_out_->inputs[0]->format);
496                                 enc->sample_rate                  = audio_graph_out_->inputs[0]->sample_rate;
497                                 enc->channel_layout               = audio_graph_out_->inputs[0]->channel_layout;
498                                 enc->channels                     = audio_graph_out_->inputs[0]->channels;
499                         
500                                 break;
501                         }
502                 }
503                                                                                 
504                 if(oc_->oformat->flags & AVFMT_GLOBALHEADER)
505                         enc->flags |= CODEC_FLAG_GLOBAL_HEADER;
506                 
507                 static const std::array<std::string, 4> char_id_map = {{"v", "a", "d", "s"}};
508
509                 const auto char_id = char_id_map.at(enc->codec_type);
510                                                                 
511                 const auto codec_opts = 
512                         remove_options(
513                                 options, 
514                                 boost::regex("^(" + char_id + "?[^:]+):" + char_id + "$"));
515                 
516                 AVDictionary* av_codec_opts = nullptr;
517
518                 to_dict(
519                         &av_codec_opts, 
520                         options);
521
522                 to_dict(
523                         &av_codec_opts,
524                         codec_opts);
525
526                 options.clear();
527                 
528                 FF(avcodec_open2(
529                         enc,            
530                         &codec, 
531                         av_codec_opts ? &av_codec_opts : nullptr));             
532
533                 if(av_codec_opts)
534                 {
535                         auto t = 
536                                 av_dict_get(
537                                         av_codec_opts, 
538                                         "", 
539                                          nullptr, 
540                                         AV_DICT_IGNORE_SUFFIX);
541
542                         while(t)
543                         {
544                                 options[t->key + (codec_opts.find(t->key) != codec_opts.end() ? ":" + char_id : "")] = t->value;
545
546                                 t = av_dict_get(
547                                                 av_codec_opts, 
548                                                 "", 
549                                                 t, 
550                                                 AV_DICT_IGNORE_SUFFIX);
551                         }
552
553                         av_dict_free(&av_codec_opts);
554                 }
555                                 
556                 if(enc->codec_type == AVMEDIA_TYPE_AUDIO && !(codec.capabilities & CODEC_CAP_VARIABLE_FRAME_SIZE))
557                 {
558                         CASPAR_ASSERT(enc->frame_size > 0);
559                         av_buffersink_set_frame_size(audio_graph_out_, 
560                                                                                  enc->frame_size);
561                 }
562                 
563                 return std::shared_ptr<AVStream>(st, [this](AVStream* st)
564                 {
565                         avcodec_close(st->codec);
566                 });
567         }
568
569         void configue_audio_bistream_filters(
570                         std::map<std::string, std::string>& options)
571         {
572                 const auto audio_bitstream_filter_str = 
573                         try_remove_arg<std::string>(
574                                 options, 
575                                 boost::regex("^bsf:a|absf$"));
576
577                 const auto audio_bitstream_filter = 
578                         audio_bitstream_filter_str 
579                                 ? av_bitstream_filter_init(audio_bitstream_filter_str->c_str()) 
580                                 : nullptr;
581
582                 CASPAR_VERIFY(!audio_bitstream_filter_str || audio_bitstream_filter);
583
584                 if(audio_bitstream_filter)
585                 {
586                         audio_bitstream_filter_.reset(
587                                 audio_bitstream_filter, 
588                                 av_bitstream_filter_close);
589                 }
590                 
591                 if(audio_bitstream_filter_str && !audio_bitstream_filter_)
592                         options["bsf:a"] = *audio_bitstream_filter_str;
593         }
594         
595         void configue_video_bistream_filters(
596                         std::map<std::string, std::string>& options)
597         {
598                 const auto video_bitstream_filter_str = 
599                                 try_remove_arg<std::string>(
600                                         options, 
601                                         boost::regex("^bsf:v|vbsf$"));
602
603                 const auto video_bitstream_filter = 
604                         video_bitstream_filter_str 
605                                 ? av_bitstream_filter_init(video_bitstream_filter_str->c_str()) 
606                                 : nullptr;
607
608                 CASPAR_VERIFY(!video_bitstream_filter_str || video_bitstream_filter);
609
610                 if(video_bitstream_filter)
611                 {
612                         video_bitstream_filter_.reset(
613                                 video_bitstream_filter, 
614                                 av_bitstream_filter_close);
615                 }
616                 
617                 if(video_bitstream_filter_str && !video_bitstream_filter_)
618                         options["bsf:v"] = *video_bitstream_filter_str;
619         }
620         
621         void configure_video_filters(
622                         const AVCodec& codec,
623                         const std::string& filtergraph)
624         {
625                 video_graph_.reset(
626                                 avfilter_graph_alloc(), 
627                                 [](AVFilterGraph* p)
628                                 {
629                                         avfilter_graph_free(&p);
630                                 });
631                 
632                 video_graph_->nb_threads  = boost::thread::hardware_concurrency()/2;
633                 video_graph_->thread_type = AVFILTER_THREAD_SLICE;
634
635                 const auto sample_aspect_ratio =
636                         boost::rational<int>(
637                                         in_video_format_.square_width,
638                                         in_video_format_.square_height) /
639                         boost::rational<int>(
640                                         in_video_format_.width,
641                                         in_video_format_.height);
642                 
643                 const auto vsrc_options = (boost::format("video_size=%1%x%2%:pix_fmt=%3%:time_base=%4%/%5%:pixel_aspect=%6%/%7%:frame_rate=%8%/%9%")
644                         % in_video_format_.width % in_video_format_.height
645                         % AV_PIX_FMT_BGRA
646                         % in_video_format_.duration     % in_video_format_.time_scale
647                         % sample_aspect_ratio.numerator() % sample_aspect_ratio.denominator()
648                         % in_video_format_.time_scale % in_video_format_.duration).str();
649                                         
650                 AVFilterContext* filt_vsrc = nullptr;                   
651                 FF(avfilter_graph_create_filter(
652                                 &filt_vsrc,
653                                 avfilter_get_by_name("buffer"), 
654                                 "ffmpeg_consumer_buffer",
655                                 vsrc_options.c_str(), 
656                                 nullptr, 
657                                 video_graph_.get()));
658                                 
659                 AVFilterContext* filt_vsink = nullptr;
660                 FF(avfilter_graph_create_filter(
661                                 &filt_vsink,
662                                 avfilter_get_by_name("buffersink"), 
663                                 "ffmpeg_consumer_buffersink",
664                                 nullptr, 
665                                 nullptr, 
666                                 video_graph_.get()));
667                 
668 #pragma warning (push)
669 #pragma warning (disable : 4245)
670
671                 FF(av_opt_set_int_list(
672                                 filt_vsink, 
673                                 "pix_fmts", 
674                                 codec.pix_fmts, 
675                                 -1,
676                                 AV_OPT_SEARCH_CHILDREN));
677
678 #pragma warning (pop)
679                         
680                 configure_filtergraph(
681                                 *video_graph_, 
682                                 filtergraph,
683                                 *filt_vsrc,
684                                 *filt_vsink);
685
686                 video_graph_in_  = filt_vsrc;
687                 video_graph_out_ = filt_vsink;
688                 
689                 CASPAR_LOG(info)
690                         <<      u16(std::string("\n") 
691                                 + avfilter_graph_dump(
692                                                 video_graph_.get(), 
693                                                 nullptr));
694         }
695
696         void configure_audio_filters(
697                         const AVCodec& codec,
698                         const std::string& filtergraph)
699         {
700                 audio_graph_.reset(
701                         avfilter_graph_alloc(), 
702                         [](AVFilterGraph* p)
703                         {
704                                 avfilter_graph_free(&p);
705                         });
706                 
707                 audio_graph_->nb_threads  = boost::thread::hardware_concurrency()/2;
708                 audio_graph_->thread_type = AVFILTER_THREAD_SLICE;
709                 
710                 const auto asrc_options = (boost::format("sample_rate=%1%:sample_fmt=%2%:channels=%3%:time_base=%4%/%5%:channel_layout=%6%")
711                         % in_video_format_.audio_sample_rate
712                         % av_get_sample_fmt_name(AV_SAMPLE_FMT_S32)
713                         % in_video_format_.audio_channels
714                         % 1     % in_video_format_.audio_sample_rate
715                         % boost::io::group(
716                                 std::hex, 
717                                 std::showbase, 
718                                 av_get_default_channel_layout(in_video_format_.audio_channels))).str();
719
720                 AVFilterContext* filt_asrc = nullptr;
721                 FF(avfilter_graph_create_filter(
722                         &filt_asrc,
723                         avfilter_get_by_name("abuffer"), 
724                         "ffmpeg_consumer_abuffer",
725                         asrc_options.c_str(), 
726                         nullptr, 
727                         audio_graph_.get()));
728                                 
729                 AVFilterContext* filt_asink = nullptr;
730                 FF(avfilter_graph_create_filter(
731                         &filt_asink,
732                         avfilter_get_by_name("abuffersink"), 
733                         "ffmpeg_consumer_abuffersink",
734                         nullptr, 
735                         nullptr, 
736                         audio_graph_.get()));
737                 
738 #pragma warning (push)
739 #pragma warning (disable : 4245)
740
741                 FF(av_opt_set_int(
742                         filt_asink,        
743                         "all_channel_counts",
744                         1,      
745                         AV_OPT_SEARCH_CHILDREN));
746
747                 FF(av_opt_set_int_list(
748                         filt_asink, 
749                         "sample_fmts",           
750                         codec.sample_fmts,                              
751                         -1, 
752                         AV_OPT_SEARCH_CHILDREN));
753
754                 FF(av_opt_set_int_list(
755                         filt_asink,
756                         "channel_layouts",       
757                         codec.channel_layouts,                  
758                         -1, 
759                         AV_OPT_SEARCH_CHILDREN));
760
761                 FF(av_opt_set_int_list(
762                         filt_asink, 
763                         "sample_rates" ,         
764                         codec.supported_samplerates,    
765                         -1, 
766                         AV_OPT_SEARCH_CHILDREN));
767
768 #pragma warning (pop)
769                         
770                 configure_filtergraph(
771                         *audio_graph_, 
772                         filtergraph, 
773                         *filt_asrc, 
774                         *filt_asink);
775
776                 audio_graph_in_  = filt_asrc;
777                 audio_graph_out_ = filt_asink;
778
779                 CASPAR_LOG(info) 
780                         <<      u16(std::string("\n") 
781                                 + avfilter_graph_dump(
782                                         audio_graph_.get(), 
783                                         nullptr));
784         }
785
786         void configure_filtergraph(
787                         AVFilterGraph& graph,
788                         const std::string& filtergraph,
789                         AVFilterContext& source_ctx,
790                         AVFilterContext& sink_ctx)
791         {
792                 AVFilterInOut* outputs = nullptr;
793                 AVFilterInOut* inputs = nullptr;
794
795                 try
796                 {
797                         if(!filtergraph.empty())
798                         {
799                                 outputs = avfilter_inout_alloc();
800                                 inputs  = avfilter_inout_alloc();
801
802                                 CASPAR_VERIFY(outputs && inputs);
803
804                                 outputs->name       = av_strdup("in");
805                                 outputs->filter_ctx = &source_ctx;
806                                 outputs->pad_idx    = 0;
807                                 outputs->next       = nullptr;
808
809                                 inputs->name        = av_strdup("out");
810                                 inputs->filter_ctx  = &sink_ctx;
811                                 inputs->pad_idx     = 0;
812                                 inputs->next        = nullptr;
813
814                                 FF(avfilter_graph_parse(
815                                         &graph, 
816                                         filtergraph.c_str(), 
817                                         &inputs, 
818                                         &outputs, 
819                                         nullptr));
820                         } 
821                         else 
822                         {
823                                 FF(avfilter_link(
824                                         &source_ctx, 
825                                         0, 
826                                         &sink_ctx, 
827                                         0));
828                         }
829
830                         FF(avfilter_graph_config(
831                                 &graph, 
832                                 nullptr));
833                 }
834                 catch(...)
835                 {
836                         avfilter_inout_free(&outputs);
837                         avfilter_inout_free(&inputs);
838                         throw;
839                 }
840         }
841         
842         void encode_video(core::const_frame frame_ptr, std::shared_ptr<void> token)
843         {               
844                 if(!video_st_)
845                         return;
846
847                 auto enc = video_st_->codec;
848                         
849                 std::shared_ptr<AVFrame> src_av_frame;
850
851                 if(frame_ptr != core::const_frame::empty())
852                 {
853                         src_av_frame.reset(
854                                 av_frame_alloc(),
855                                 [frame_ptr](AVFrame* frame)
856                                 {
857                                         av_frame_free(&frame);
858                                 });
859
860                         avcodec_get_frame_defaults(src_av_frame.get());         
861                         
862                         const auto sample_aspect_ratio = 
863                                 boost::rational<int>(
864                                         in_video_format_.square_width, 
865                                         in_video_format_.square_height) /
866                                 boost::rational<int>(
867                                         in_video_format_.width, 
868                                         in_video_format_.height);
869
870                         src_av_frame->format                              = AV_PIX_FMT_BGRA;
871                         src_av_frame->width                                       = in_video_format_.width;
872                         src_av_frame->height                              = in_video_format_.height;
873                         src_av_frame->sample_aspect_ratio.num = sample_aspect_ratio.numerator();
874                         src_av_frame->sample_aspect_ratio.den = sample_aspect_ratio.denominator();
875                         src_av_frame->pts                                         = video_pts_;
876
877                         video_pts_ += 1;
878
879                         FF(av_image_fill_arrays(
880                                 src_av_frame->data,
881                                 src_av_frame->linesize,
882                                 frame_ptr.image_data().begin(),
883                                 static_cast<AVPixelFormat>(src_av_frame->format), 
884                                 in_video_format_.width, 
885                                 in_video_format_.height, 
886                                 1));
887
888                         FF(av_buffersrc_add_frame(
889                                 video_graph_in_, 
890                                 src_av_frame.get()));
891                 }               
892
893                 int ret = 0;
894
895                 while(ret >= 0)
896                 {
897                         std::shared_ptr<AVFrame> filt_frame(
898                                 av_frame_alloc(), 
899                                 [](AVFrame* p)
900                                 {
901                                         av_frame_free(&p);
902                                 });
903
904                         ret = av_buffersink_get_frame(
905                                 video_graph_out_, 
906                                 filt_frame.get());
907                                                 
908                         video_encoder_executor_.begin_invoke([=]
909                         {
910                                 if(ret == AVERROR_EOF)
911                                 {
912                                         if(enc->codec->capabilities & CODEC_CAP_DELAY)
913                                         {
914                                                 while(encode_av_frame(
915                                                                 *video_st_, 
916                                                                 video_bitstream_filter_.get(),
917                                                                 avcodec_encode_video2, 
918                                                                 nullptr, token))
919                                                 {
920                                                         boost::this_thread::yield(); // TODO:
921                                                 }
922                                         }               
923                                 }
924                                 else if(ret != AVERROR(EAGAIN))
925                                 {
926                                         FF_RET(ret, "av_buffersink_get_frame");
927                                         
928                                         if (filt_frame->interlaced_frame) 
929                                         {
930                                                 if (enc->codec->id == AV_CODEC_ID_MJPEG)
931                                                         enc->field_order = filt_frame->top_field_first ? AV_FIELD_TT : AV_FIELD_BB;
932                                                 else
933                                                         enc->field_order = filt_frame->top_field_first ? AV_FIELD_TB : AV_FIELD_BT;
934                                         } 
935                                         else
936                                                 enc->field_order = AV_FIELD_PROGRESSIVE;
937
938                                         filt_frame->quality = enc->global_quality;
939
940                                         if (!enc->me_threshold)
941                                                 filt_frame->pict_type = AV_PICTURE_TYPE_NONE;
942                         
943                                         encode_av_frame(
944                                                 *video_st_,
945                                                 video_bitstream_filter_.get(),
946                                                 avcodec_encode_video2,
947                                                 filt_frame, 
948                                                 token);
949
950                                         boost::this_thread::yield(); // TODO:
951                                 }
952                         });
953                 }
954         }
955                                         
956         void encode_audio(core::const_frame frame_ptr, std::shared_ptr<void> token)
957         {               
958                 if(!audio_st_)
959                         return;
960                 
961                 auto enc = audio_st_->codec;
962                         
963                 std::shared_ptr<AVFrame> src_av_frame;
964
965                 if(frame_ptr != core::const_frame::empty())
966                 {
967                         src_av_frame.reset(
968                                 av_frame_alloc(), 
969                                 [](AVFrame* p)
970                                 {
971                                         av_frame_free(&p);
972                                 });
973                 
974                         src_av_frame->channels           = in_video_format_.audio_channels;
975                         src_av_frame->channel_layout = av_get_default_channel_layout(in_video_format_.audio_channels);
976                         src_av_frame->sample_rate        = in_video_format_.audio_sample_rate;
977                         src_av_frame->nb_samples         = static_cast<int>(frame_ptr.audio_data().size()) / src_av_frame->channels;
978                         src_av_frame->format             = AV_SAMPLE_FMT_S32;
979                         src_av_frame->pts                        = audio_pts_;
980
981                         audio_pts_ += src_av_frame->nb_samples;
982
983                         FF(av_samples_fill_arrays(
984                                         src_av_frame->extended_data,
985                                         src_av_frame->linesize,
986                                         reinterpret_cast<const std::uint8_t*>(&*frame_ptr.audio_data().begin()),
987                                         src_av_frame->channels,
988                                         src_av_frame->nb_samples,
989                                         static_cast<AVSampleFormat>(src_av_frame->format),
990                                         16));
991                 
992                         FF(av_buffersrc_add_frame(
993                                         audio_graph_in_, 
994                                         src_av_frame.get()));
995                 }
996
997                 int ret = 0;
998
999                 while(ret >= 0)
1000                 {
1001                         std::shared_ptr<AVFrame> filt_frame(
1002                                 av_frame_alloc(), 
1003                                 [](AVFrame* p)
1004                                 {
1005                                         av_frame_free(&p);
1006                                 });
1007
1008                         ret = av_buffersink_get_frame(
1009                                 audio_graph_out_, 
1010                                 filt_frame.get());
1011                                         
1012                         audio_encoder_executor_.begin_invoke([=]
1013                         {       
1014                                 if(ret == AVERROR_EOF)
1015                                 {
1016                                         if(enc->codec->capabilities & CODEC_CAP_DELAY)
1017                                         {
1018                                                 while(encode_av_frame(
1019                                                                 *audio_st_, 
1020                                                                 audio_bitstream_filter_.get(), 
1021                                                                 avcodec_encode_audio2, 
1022                                                                 nullptr, 
1023                                                                 token))
1024                                                 {
1025                                                         boost::this_thread::yield(); // TODO:
1026                                                 }
1027                                         }
1028                                 }
1029                                 else if(ret != AVERROR(EAGAIN))
1030                                 {
1031                                         FF_RET(
1032                                                 ret, 
1033                                                 "av_buffersink_get_frame");
1034
1035                                         encode_av_frame(
1036                                                 *audio_st_, 
1037                                                 audio_bitstream_filter_.get(), 
1038                                                 avcodec_encode_audio2, 
1039                                                 filt_frame, 
1040                                                 token);
1041
1042                                         boost::this_thread::yield(); // TODO:
1043                                 }
1044                         });
1045                 }
1046         }
1047         
1048         template<typename F>
1049         bool encode_av_frame(
1050                         AVStream& st,
1051                         AVBitStreamFilterContext* bsfc, 
1052                         const F& func, 
1053                         const std::shared_ptr<AVFrame>& src_av_frame, 
1054                         std::shared_ptr<void> token)
1055         {
1056                 AVPacket pkt = {};
1057                 av_init_packet(&pkt);
1058
1059                 int got_packet = 0;
1060
1061                 FF(func(
1062                         st.codec, 
1063                         &pkt, 
1064                         src_av_frame.get(), 
1065                         &got_packet));
1066                                         
1067                 if(!got_packet || pkt.size <= 0)
1068                         return false;
1069                                 
1070                 pkt.stream_index = st.index;
1071                 
1072                 if(bsfc)
1073                 {
1074                         auto new_pkt = pkt;
1075
1076                         auto a = av_bitstream_filter_filter(
1077                                         bsfc,
1078                                         st.codec,
1079                                         nullptr,
1080                                         &new_pkt.data,
1081                                         &new_pkt.size,
1082                                         pkt.data,
1083                                         pkt.size,
1084                                         pkt.flags & AV_PKT_FLAG_KEY);
1085
1086                         if(a == 0 && new_pkt.data != pkt.data && new_pkt.destruct) 
1087                         {
1088                                 auto t = reinterpret_cast<std::uint8_t*>(av_malloc(new_pkt.size + FF_INPUT_BUFFER_PADDING_SIZE));
1089
1090                                 if(t) 
1091                                 {
1092                                         memcpy(
1093                                                 t, 
1094                                                 new_pkt.data,
1095                                                 new_pkt.size);
1096
1097                                         memset(
1098                                                 t + new_pkt.size, 
1099                                                 0, 
1100                                                 FF_INPUT_BUFFER_PADDING_SIZE);
1101
1102                                         new_pkt.data = t;
1103                                         new_pkt.buf  = nullptr;
1104                                 } 
1105                                 else
1106                                         a = AVERROR(ENOMEM);
1107                         }
1108
1109                         av_free_packet(&pkt);
1110
1111                         FF_RET(
1112                                 a, 
1113                                 "av_bitstream_filter_filter");
1114
1115                         new_pkt.buf =
1116                                 av_buffer_create(
1117                                         new_pkt.data, 
1118                                         new_pkt.size,
1119                                         av_buffer_default_free, 
1120                                         nullptr, 
1121                                         0);
1122
1123                         CASPAR_VERIFY(new_pkt.buf);
1124
1125                         pkt = new_pkt;
1126                 }
1127                 
1128                 if (pkt.pts != AV_NOPTS_VALUE)
1129                 {
1130                         pkt.pts = 
1131                                 av_rescale_q(
1132                                         pkt.pts,
1133                                         st.codec->time_base, 
1134                                         st.time_base);
1135                 }
1136
1137                 if (pkt.dts != AV_NOPTS_VALUE)
1138                 {
1139                         pkt.dts = 
1140                                 av_rescale_q(
1141                                         pkt.dts, 
1142                                         st.codec->time_base, 
1143                                         st.time_base);
1144                 }
1145                                 
1146                 pkt.duration = 
1147                         static_cast<int>(
1148                                 av_rescale_q(
1149                                         pkt.duration, 
1150                                         st.codec->time_base, st.time_base));
1151
1152                 write_packet(
1153                         std::shared_ptr<AVPacket>(
1154                                 new AVPacket(pkt), 
1155                                 [](AVPacket* p)
1156                                 {
1157                                         av_free_packet(p); 
1158                                         delete p;
1159                                 }), token);
1160
1161                 return true;
1162         }
1163
1164         void write_packet(
1165                         const std::shared_ptr<AVPacket>& pkt_ptr,
1166                         std::shared_ptr<void> token)
1167         {               
1168                 write_executor_.begin_invoke([this, pkt_ptr, token]() mutable
1169                 {
1170                         FF(av_interleaved_write_frame(
1171                                 oc_.get(), 
1172                                 pkt_ptr.get()));
1173                 });     
1174         }       
1175         
1176         template<typename T>
1177         static boost::optional<T> try_remove_arg(
1178                         std::map<std::string, std::string>& options, 
1179                         const boost::regex& expr)
1180         {
1181                 for(auto it = options.begin(); it != options.end(); ++it)
1182                 {                       
1183                         if(boost::regex_search(it->first, expr))
1184                         {
1185                                 auto arg = it->second;
1186                                 options.erase(it);
1187                                 return boost::lexical_cast<T>(arg);
1188                         }
1189                 }
1190
1191                 return boost::optional<T>();
1192         }
1193                 
1194         static std::map<std::string, std::string> remove_options(
1195                         std::map<std::string, std::string>& options, 
1196                         const boost::regex& expr)
1197         {
1198                 std::map<std::string, std::string> result;
1199                         
1200                 auto it = options.begin();
1201                 while(it != options.end())
1202                 {                       
1203                         boost::smatch what;
1204                         if(boost::regex_search(it->first, what, expr))
1205                         {
1206                                 result[
1207                                         what.size() > 0 && what[1].matched 
1208                                                 ? what[1].str() 
1209                                                 : it->first] = it->second;
1210                                 it = options.erase(it);
1211                         }
1212                         else
1213                                 ++it;
1214                 }
1215
1216                 return result;
1217         }
1218                 
1219         static void to_dict(AVDictionary** dest, const std::map<std::string, std::string>& c)
1220         {               
1221                 for (const auto& entry : c)
1222                 {
1223                         av_dict_set(
1224                                 dest, 
1225                                 entry.first.c_str(), 
1226                                 entry.second.c_str(), 0);
1227                 }
1228         }
1229
1230         static std::map<std::string, std::string> to_map(AVDictionary* dict)
1231         {
1232                 std::map<std::string, std::string> result;
1233                 
1234                 for(auto t = dict 
1235                                 ? av_dict_get(
1236                                         dict, 
1237                                         "", 
1238                                         nullptr, 
1239                                         AV_DICT_IGNORE_SUFFIX) 
1240                                 : nullptr;
1241                         t; 
1242                         t = av_dict_get(
1243                                 dict, 
1244                                 "", 
1245                                 t,
1246                                 AV_DICT_IGNORE_SUFFIX))
1247                 {
1248                         result[t->key] = t->value;
1249                 }
1250
1251                 return result;
1252         }
1253 };
1254         
1255 spl::shared_ptr<core::frame_consumer> create_streaming_consumer(
1256                 const std::vector<std::wstring>& params, core::interaction_sink*)
1257 {       
1258         if (params.size() < 1 || params.at(0) != L"STREAM")
1259                 return core::frame_consumer::empty();
1260
1261         auto path = u8(params.at(1));
1262         auto args = u8(boost::join(params, L" "));
1263
1264         return spl::make_shared<streaming_consumer>(path, args);
1265 }
1266
1267 spl::shared_ptr<core::frame_consumer> create_preconfigured_streaming_consumer(
1268                 const boost::property_tree::wptree& ptree, core::interaction_sink*)
1269 {               
1270         return spl::make_shared<streaming_consumer>(
1271                         u8(ptree.get<std::wstring>(L"path")), 
1272                         u8(ptree.get<std::wstring>(L"args", L"")));
1273 }
1274
1275 }}