]> git.sesse.net Git - casparcg/blob - modules/ffmpeg/ffmpeg_pipeline_backend_internal.cpp
ffmpeg_producer: Multiple audio streams are now merged (flattened) before the audio...
[casparcg] / modules / ffmpeg / ffmpeg_pipeline_backend_internal.cpp
1 /*
2 * Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
3 *
4 * This file is part of CasparCG (www.casparcg.com).
5 *
6 * CasparCG is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * CasparCG is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * Author: Helge Norberg, helge.norberg@svt.se
20 */
21
22 #include "StdAfx.h"
23
24 #include "ffmpeg_pipeline_backend.h"
25 #include "ffmpeg_pipeline_backend_internal.h"
26 #include "producer/input/input.h"
27 #include "producer/video/video_decoder.h"
28 #include "producer/audio/audio_decoder.h"
29 #include "producer/filter/audio_filter.h"
30 #include "producer/filter/filter.h"
31 #include "producer/util/util.h"
32 #include "ffmpeg_error.h"
33 #include "ffmpeg.h"
34
35 #include <common/diagnostics/graph.h>
36 #include <common/os/general_protection_fault.h>
37 #include <common/enum_class.h>
38
39 #include <core/frame/audio_channel_layout.h>
40 #include <core/frame/frame.h>
41 #include <core/frame/frame_factory.h>
42 #include <core/video_format.h>
43
44 #include <functional>
45 #include <limits>
46 #include <queue>
47 #include <map>
48
49 #include <tbb/atomic.h>
50 #include <tbb/concurrent_queue.h>
51 #include <tbb/spin_mutex.h>
52
53 #include <boost/thread.hpp>
54 #include <boost/optional.hpp>
55 #include <boost/exception_ptr.hpp>
56
57 namespace caspar { namespace ffmpeg {
58
59 std::string to_string(const boost::rational<int>& framerate)
60 {
61         return boost::lexical_cast<std::string>(framerate.numerator())
62                 + "/" + boost::lexical_cast<std::string>(framerate.denominator()) + " (" + boost::lexical_cast<std::string>(static_cast<double>(framerate.numerator()) / static_cast<double>(framerate.denominator())) + ") fps";
63 }
64
65 std::vector<int> find_audio_cadence(const boost::rational<int>& framerate)
66 {
67         static std::map<boost::rational<int>, std::vector<int>> CADENCES_BY_FRAMERATE = []
68         {
69                 std::map<boost::rational<int>, std::vector<int>> result;
70
71                 for (core::video_format format : enum_constants<core::video_format>())
72                 {
73                         core::video_format_desc desc(format);
74                         boost::rational<int> format_rate(desc.time_scale, desc.duration);
75
76                         result.insert(std::make_pair(format_rate, desc.audio_cadence));
77                 }
78
79                 return result;
80         }();
81
82         auto exact_match = CADENCES_BY_FRAMERATE.find(framerate);
83
84         if (exact_match != CADENCES_BY_FRAMERATE.end())
85                 return exact_match->second;
86
87         boost::rational<int> closest_framerate_diff     = std::numeric_limits<int>::max();
88         boost::rational<int> closest_framerate          = 0;
89
90         for (auto format_framerate : CADENCES_BY_FRAMERATE | boost::adaptors::map_keys)
91         {
92                 auto diff = boost::abs(framerate - format_framerate);
93
94                 if (diff < closest_framerate_diff)
95                 {
96                         closest_framerate_diff  = diff;
97                         closest_framerate               = format_framerate;
98                 }
99         }
100
101         if (is_logging_quiet_for_thread())
102                 CASPAR_LOG(debug) << "No exact audio cadence match found for framerate " << to_string(framerate)
103                         << "\nClosest match is " << to_string(closest_framerate)
104                         << "\nwhich is a " << to_string(closest_framerate_diff) << " difference.";
105         else
106                 CASPAR_LOG(warning) << "No exact audio cadence match found for framerate " << to_string(framerate)
107                         << "\nClosest match is " << to_string(closest_framerate)
108                         << "\nwhich is a " << to_string(closest_framerate_diff) << " difference.";
109
110         return CADENCES_BY_FRAMERATE[closest_framerate];
111 }
112
113 struct source
114 {
115         virtual ~source() { }
116
117         virtual std::wstring                                                    print() const                                                                                   = 0;
118         virtual void                                                                    start()                                                                                                 { CASPAR_THROW_EXCEPTION(not_implemented() << msg_info(print())); }
119         virtual void                                                                    graph(spl::shared_ptr<caspar::diagnostics::graph> g)    { }
120         virtual void                                                                    stop()                                                                                                  { }
121         virtual void                                                                    start_frame(std::uint32_t frame)                                                { CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info(print() + L" not seekable.")); }
122         virtual std::uint32_t                                                   start_frame() const                                                                             { CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info(print() + L" not seekable.")); }
123         virtual void                                                                    loop(bool value)                                                                                { CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info(print() + L" not seekable.")); }
124         virtual bool                                                                    loop() const                                                                                    { CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info(print() + L" not seekable.")); }
125         virtual void                                                                    length(std::uint32_t frames)                                                    { CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info(print() + L" not seekable.")); }
126         virtual std::uint32_t                                                   length() const                                                                                  { CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info(print() + L" not seekable.")); }
127         virtual std::string                                                             filename() const                                                                                { CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info(print())); }
128         virtual void                                                                    seek(std::uint32_t frame)                                                               { CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info(print() + L" not seekable.")); }
129         virtual bool                                                                    has_audio() const                                                                               { CASPAR_THROW_EXCEPTION(not_implemented() << msg_info(print())); }
130         virtual int                                                                             samplerate() const                                                                              { CASPAR_THROW_EXCEPTION(not_implemented() << msg_info(print())); }
131         virtual bool                                                                    has_video() const                                                                               { CASPAR_THROW_EXCEPTION(not_implemented() << msg_info(print())); }
132         virtual bool                                                                    eof() const                                                                                             { CASPAR_THROW_EXCEPTION(not_implemented() << msg_info(print())); }
133         virtual boost::rational<int>                                    framerate() const                                                                               { CASPAR_THROW_EXCEPTION(not_implemented() << msg_info(print())); }
134         virtual std::uint32_t                                                   frame_number() const                                                                    { CASPAR_THROW_EXCEPTION(not_implemented() << msg_info(print())); }
135         virtual std::vector<std::shared_ptr<AVFrame>>   get_input_frames_for_streams(AVMediaType type)                  { CASPAR_THROW_EXCEPTION(not_implemented() << msg_info(print())); }
136 };
137
138 struct no_source_selected : public source
139 {
140         std::wstring print() const override
141         {
142                 return L"[no_source_selected]";
143         }
144 };
145
146 class file_source : public source
147 {
148         std::wstring                                                            filename_;
149         spl::shared_ptr<diagnostics::graph>                     graph_;
150         std::uint32_t                                                           start_frame_    = 0;
151         std::uint32_t                                                           length_                 = std::numeric_limits<std::uint32_t>::max();
152         bool                                                                            loop_                   = false;
153         mutable boost::mutex                                            pointer_mutex_;
154         std::shared_ptr<input>                                          input_;
155         std::vector<spl::shared_ptr<audio_decoder>>     audio_decoders_;
156         std::shared_ptr<video_decoder>                          video_decoder_;
157         bool                                                                            started_                = false;
158 public:
159         file_source(std::string filename)
160                 : filename_(u16(filename))
161         {
162         }
163
164         std::wstring print() const override
165         {
166                 return L"[file_source " + filename_ + L"]";
167         }
168
169         void graph(spl::shared_ptr<caspar::diagnostics::graph> g) override
170         {
171                 graph_ = std::move(g);
172         }
173
174         void start() override
175         {
176                 boost::lock_guard<boost::mutex> lock(pointer_mutex_);
177                 bool thumbnail_mode = is_logging_quiet_for_thread();
178                 input_.reset(new input(graph_, filename_, loop_, start_frame_, length_, thumbnail_mode));
179
180                 for (int i = 0; i < input_->num_audio_streams(); ++i)
181                 {
182                         try
183                         {
184                                 audio_decoders_.push_back(spl::make_shared<audio_decoder>(*input_, core::video_format_desc(), i));
185                         }
186                         catch (...)
187                         {
188                                 if (is_logging_quiet_for_thread())
189                                 {
190                                         CASPAR_LOG_CURRENT_EXCEPTION_AT_LEVEL(debug);
191                                         CASPAR_LOG(info) << print() << " Failed to open audio-stream. Turn on log level debug to see more information.";
192                                 }
193                                 else
194                                 {
195                                         CASPAR_LOG_CURRENT_EXCEPTION();
196                                         CASPAR_LOG(warning) << print() << " Failed to open audio-stream.";
197                                 }
198                         }
199                 }
200
201                 if (audio_decoders_.empty())
202                         CASPAR_LOG(debug) << print() << " No audio-stream found. Running without audio.";
203
204                 try
205                 {
206                         video_decoder_.reset(new video_decoder(*input_, false));
207                 }
208                 catch (averror_stream_not_found&)
209                 {
210                         CASPAR_LOG(debug) << print() << " No video-stream found. Running without video.";
211                 }
212                 catch (...)
213                 {
214                         if (is_logging_quiet_for_thread())
215                         {
216                                 CASPAR_LOG_CURRENT_EXCEPTION_AT_LEVEL(debug);
217                                 CASPAR_LOG(info) << print() << " Failed to open video-stream. Running without audio. Turn on log level debug to see more information.";
218                         }
219                         else
220                         {
221                                 CASPAR_LOG_CURRENT_EXCEPTION();
222                                 CASPAR_LOG(warning) << print() << " Failed to open video-stream. Running without audio.";
223                         }
224                 }
225
226                 started_ = true;
227         }
228
229         void stop() override
230         {
231                 started_ = false;
232         }
233
234         void start_frame(std::uint32_t frame) override 
235         {
236                 start_frame_ = frame;
237
238                 auto i = get_input();
239                 if (i)
240                         i->start(frame);
241         }
242
243         std::uint32_t start_frame() const override
244         {
245                 return start_frame_;
246         }
247
248         void loop(bool value) override
249         {
250                 loop_ = value;
251
252                 auto i = get_input();
253                 if (i)
254                         i->loop(value);
255         }
256
257         bool loop() const override
258         {
259                 return loop_;
260         }
261
262         void length(std::uint32_t frames) override
263         {
264                 length_ = frames;
265
266                 auto i = get_input();
267                 if (i)
268                         i->length(frames);
269         }
270
271         std::uint32_t length() const override
272         {
273                 auto v = get_video_decoder();
274
275                 if (v)
276                         return v->nb_frames();
277
278                 auto a = get_audio_decoders();
279
280                 if (!a.empty())
281                         return a.at(0)->nb_frames(); // Should be ok.
282
283                 return length_;
284         }
285
286         std::string filename() const override
287         {
288                 return u8(filename_);
289         }
290
291         void seek(std::uint32_t frame) override
292         {
293                 expect_started();
294                 get_input()->seek(frame);
295         }
296
297         bool eof() const override
298         {
299                 auto i = get_input();
300                 return !i || i->eof();
301         }
302
303         bool has_audio() const override
304         {
305                 return !get_audio_decoders().empty();
306         }
307
308         int samplerate() const override
309         {
310                 if (get_audio_decoders().empty())
311                         return -1;
312
313                 return 48000;
314         }
315
316         bool has_video() const override
317         {
318                 return static_cast<bool>(get_video_decoder());
319         }
320
321         boost::rational<int> framerate() const override
322         {
323                 auto decoder = get_video_decoder();
324
325                 if (!decoder)
326                         return -1;
327
328                 return decoder->framerate();
329         }
330
331         std::uint32_t frame_number() const override
332         {
333                 auto decoder = get_video_decoder();
334
335                 if (!decoder)
336                         return 0;
337
338                 return decoder->file_frame_number();
339         }
340
341         std::vector<std::shared_ptr<AVFrame>> get_input_frames_for_streams(AVMediaType type) override
342         {
343                 auto a_decoders = get_audio_decoders();
344                 auto v_decoder  = get_video_decoder();
345                 expect_started();
346
347                 if (type == AVMediaType::AVMEDIA_TYPE_AUDIO && !a_decoders.empty())
348                 {
349                         std::vector<std::shared_ptr<AVFrame>> frames;
350
351                         for (auto& a_decoder : a_decoders)
352                         {
353                                 std::shared_ptr<AVFrame> frame;
354
355                                 for (int i = 0; i < 64; ++i)
356                                 {
357                                         frame = (*a_decoder)();
358
359                                         if (frame && frame->data[0])
360                                                 break;
361                                         else
362                                                 frame.reset();
363                                 }
364
365                                 frames.push_back(std::move(frame));
366                         }
367
368                         return frames;
369                 }
370                 else if (type == AVMediaType::AVMEDIA_TYPE_VIDEO && v_decoder)
371                 {
372                         std::shared_ptr<AVFrame> frame;
373
374                         for (int i = 0; i < 128; ++i)
375                         {
376                                 frame = (*v_decoder)();
377
378                                 if (frame && frame->data[0])
379                                         return { frame };
380                         }
381                 }
382                 else
383                         CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info(
384                                 print() + L" Unhandled media type " + boost::lexical_cast<std::wstring>(type)));
385
386                 return { };
387         }
388 private:
389         void expect_started() const
390         {
391                 if (!started_)
392                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info(print() + L" Not started."));
393         }
394
395         std::shared_ptr<input> get_input() const
396         {
397                 boost::lock_guard<boost::mutex> lock(pointer_mutex_);
398                 return input_;
399         }
400
401         std::vector<spl::shared_ptr<audio_decoder>> get_audio_decoders() const
402         {
403                 boost::lock_guard<boost::mutex> lock(pointer_mutex_);
404                 return audio_decoders_;
405         }
406
407         std::shared_ptr<video_decoder> get_video_decoder() const
408         {
409                 boost::lock_guard<boost::mutex> lock(pointer_mutex_);
410                 return video_decoder_;
411         }
412 };
413
414 class memory_source : public source
415 {
416         int                                                                                                                     samplerate_             = -1;
417         int                                                                                                                     num_channels_   = -1;
418         int                                                                                                                     width_                  = -1;
419         int                                                                                                                     height_                 = -1;
420         boost::rational<int>                                                                            framerate_              = -1;
421
422         tbb::atomic<bool>                                                                                       running_;
423         tbb::concurrent_bounded_queue<caspar::array<const int32_t>>     audio_frames_;
424         tbb::concurrent_bounded_queue<caspar::array<const uint8_t>>     video_frames_;
425         int64_t                                                                                                         audio_pts_              = 0;
426         int64_t                                                                                                         video_pts_              = 0;
427 public:
428         memory_source()
429         {
430                 running_ = false;
431                 video_frames_.set_capacity(1);
432                 audio_frames_.set_capacity(1);
433         }
434
435         ~memory_source()
436         {
437                 stop();
438         }
439
440         void graph(spl::shared_ptr<caspar::diagnostics::graph> g) override
441         {
442         }
443
444         std::wstring print() const override
445         {
446                 return L"[memory_source]";
447         }
448
449         void enable_audio(int samplerate, int num_channels)
450         {
451                 samplerate_ = samplerate;
452                 num_channels_ = num_channels;
453         }
454
455         void enable_video(int width, int height, boost::rational<int> framerate)
456         {
457                 width_ = width;
458                 height_ = height;
459         }
460
461         void start() override
462         {
463                 running_ = true;
464         }
465
466         void stop() override
467         {
468                 running_ = false;
469                 video_frames_.try_push(caspar::array<const uint8_t>());
470                 audio_frames_.try_push(caspar::array<const int32_t>());
471         }
472
473         bool has_audio() const override
474         {
475                 return samplerate_ != -1;
476         }
477
478         int samplerate() const override
479         {
480                 return samplerate_;
481         }
482
483         bool has_video() const override
484         {
485                 return width_ != -1;
486         }
487
488         bool eof() const override
489         {
490                 return !running_;
491         }
492
493         boost::rational<int> framerate() const override
494         {
495                 return framerate_;
496         }
497         
498         bool try_push_audio(caspar::array<const std::int32_t> data)
499         {
500                 if (!has_audio())
501                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info(print() + L" audio not enabled."));
502
503                 if (data.empty() || data.size() % num_channels_ != 0)
504                         CASPAR_THROW_EXCEPTION(invalid_argument() << msg_info(print() + L" audio with incorrect number of channels submitted."));
505
506                 return audio_frames_.try_push(std::move(data));
507         }
508
509         bool try_push_video(caspar::array<const std::uint8_t> data)
510         {
511                 if (!has_video())
512                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info(print() + L" video not enabled."));
513
514                 if (data.size() != width_ * height_ * 4)
515                         CASPAR_THROW_EXCEPTION(invalid_argument() << msg_info(print() + L" video with incorrect size submitted."));
516
517                 return video_frames_.try_push(std::move(data));
518         }
519
520         std::vector<std::shared_ptr<AVFrame>> get_input_frames_for_streams(AVMediaType type) override
521         {
522                 if (!running_)
523                         CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info(print() + L" not running."));
524
525                 if (type == AVMediaType::AVMEDIA_TYPE_AUDIO && has_audio())
526                 {
527                         caspar::array<const std::int32_t> samples;
528                         audio_frames_.pop(samples);
529
530                         if (samples.empty())
531                                 return { };
532                         
533                         spl::shared_ptr<AVFrame> av_frame(av_frame_alloc(), [samples](AVFrame* p) { av_frame_free(&p); });
534
535                         av_frame->channels                      = num_channels_;
536                         av_frame->channel_layout        = av_get_default_channel_layout(num_channels_);
537                         av_frame->sample_rate           = samplerate_;
538                         av_frame->nb_samples            = static_cast<int>(samples.size()) / num_channels_;
539                         av_frame->format                        = AV_SAMPLE_FMT_S32;
540                         av_frame->pts                           = audio_pts_;
541
542                         audio_pts_ += av_frame->nb_samples;
543
544                         FF(av_samples_fill_arrays(
545                                         av_frame->extended_data,
546                                         av_frame->linesize,
547                                         reinterpret_cast<const std::uint8_t*>(&*samples.begin()),
548                                         av_frame->channels,
549                                         av_frame->nb_samples,
550                                         static_cast<AVSampleFormat>(av_frame->format),
551                                         16));
552
553                         return { av_frame };
554                 }
555                 else if (type == AVMediaType::AVMEDIA_TYPE_VIDEO && has_video())
556                 {
557                         caspar::array<const std::uint8_t> data;
558                         video_frames_.pop(data);
559
560                         if (data.empty())
561                                 return {};
562
563                         spl::shared_ptr<AVFrame> av_frame(av_frame_alloc(), [data](AVFrame* p) { av_frame_free(&p); });
564                         avcodec_get_frame_defaults(av_frame.get());             
565                         
566                         const auto sample_aspect_ratio = boost::rational<int>(width_, height_);
567
568                         av_frame->format                                  = AV_PIX_FMT_BGRA;
569                         av_frame->width                                   = width_;
570                         av_frame->height                                  = height_;
571                         av_frame->sample_aspect_ratio.num = sample_aspect_ratio.numerator();
572                         av_frame->sample_aspect_ratio.den = sample_aspect_ratio.denominator();
573                         av_frame->pts                                     = video_pts_;
574
575                         video_pts_ += 1;
576
577                         FF(av_image_fill_arrays(
578                                         av_frame->data,
579                                         av_frame->linesize,
580                                         data.begin(),
581                                         static_cast<AVPixelFormat>(av_frame->format),
582                                         width_,
583                                         height_,
584                                         1));
585
586                         return { av_frame };
587                 }
588                 else
589                         CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info(
590                                 print() + L" Unhandled media type " + boost::lexical_cast<std::wstring>(type)));
591         }
592 };
593
594 struct sink
595 {
596         virtual ~sink() { }
597
598         virtual std::wstring                                    print() const                                                                                                                                   = 0;
599         virtual void                                                    graph(spl::shared_ptr<caspar::diagnostics::graph> g)                                                    { }
600         virtual void                                                    acodec(std::string codec)                                                                                                               { CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info(print() + L" not an encoder.")); }
601         virtual void                                                    vcodec(std::string codec)                                                                                                               { CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info(print() + L" not an encoder.")); }
602         virtual void                                                    format(std::string fmt)                                                                                                                 { CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info(print() + L" not an encoder.")); }
603         virtual void                                                    framerate(boost::rational<int> framerate)                                                                               { CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info(print() + L" not an encoder.")); }
604         virtual void                                                    start(bool has_audio, bool has_video)                                                                                   { CASPAR_THROW_EXCEPTION(not_implemented() << msg_info(print())); }
605         virtual void                                                    stop()                                                                                                                                                  { }
606         virtual std::vector<AVSampleFormat>             supported_sample_formats() const                                                                                                { CASPAR_THROW_EXCEPTION(not_implemented() << msg_info(print())); }
607         virtual std::vector<int>                                supported_samplerates() const                                                                                                   { CASPAR_THROW_EXCEPTION(not_implemented() << msg_info(print())); }
608         virtual std::vector<AVPixelFormat>              supported_pixel_formats() const                                                                                                 { CASPAR_THROW_EXCEPTION(not_implemented() << msg_info(print())); }
609         virtual int                                                             wanted_num_audio_streams() const                                                                                                { CASPAR_THROW_EXCEPTION(not_implemented() << msg_info(print())); }
610         virtual boost::optional<int>                    wanted_num_channels_per_stream() const                                                                          { CASPAR_THROW_EXCEPTION(not_implemented() << msg_info(print())); }
611         virtual boost::optional<AVMediaType>    try_push(AVMediaType type, int stream_index, spl::shared_ptr<AVFrame> frame)    { CASPAR_THROW_EXCEPTION(not_implemented() << msg_info(print())); }
612         virtual void                                                    eof()                                                                                                                                                   { CASPAR_THROW_EXCEPTION(not_implemented() << msg_info(print())); }
613 };
614
615 struct no_sink_selected : public sink
616 {
617         std::wstring print() const override
618         {
619                 return L"[no_sink_selected]";
620         }
621 };
622
623 class file_sink : public sink
624 {
625         std::wstring                                            filename_;
626         spl::shared_ptr<diagnostics::graph>     graph_;
627 public:
628         file_sink(std::string filename)
629                 : filename_(u16(std::move(filename)))
630         {
631         }
632
633         std::wstring print() const override
634         {
635                 return L"[file_sink " + filename_ + L"]";
636         }
637
638         void graph(spl::shared_ptr<caspar::diagnostics::graph> g) override
639         {
640                 graph_ = std::move(g);
641         }
642 };
643
644 class memory_sink : public sink
645 {
646         spl::shared_ptr<core::frame_factory>                    factory_;
647
648         bool                                                                                    has_audio_                      = false;
649         bool                                                                                    has_video_                      = false;
650         std::vector<int>                                                                audio_cadence_;
651         core::audio_channel_layout                                              channel_layout_         = core::audio_channel_layout::invalid();
652         core::mutable_audio_buffer                                              audio_samples_;
653
654         std::queue<std::shared_ptr<AVFrame>>                    video_frames_;
655
656         tbb::concurrent_bounded_queue<core::draw_frame> output_frames_;
657         tbb::atomic<bool>                                                               running_;
658 public:
659         memory_sink(spl::shared_ptr<core::frame_factory> factory, core::video_format_desc format)
660                 : factory_(std::move(factory))
661                 , audio_cadence_(format.audio_cadence)
662         {
663                 output_frames_.set_capacity(2);
664                 running_ = false;
665                 // Note: Uses 1 step rotated cadence for 1001 modes (1602, 1602, 1601, 1602, 1601)
666                 // This cadence fills the audio mixer most optimally.
667                 boost::range::rotate(audio_cadence_, std::end(audio_cadence_) - 1);
668         }
669
670         ~memory_sink()
671         {
672                 stop();
673         }
674
675         std::wstring print() const override
676         {
677                 return L"[memory_sink]";
678         }
679
680         void graph(spl::shared_ptr<caspar::diagnostics::graph> g) override
681         {
682         }
683
684         void framerate(boost::rational<int> framerate) override
685         {
686                 audio_cadence_ = find_audio_cadence(framerate);
687                 // Note: Uses 1 step rotated cadence for 1001 modes (1602, 1602, 1601, 1602, 1601)
688                 // This cadence fills the audio mixer most optimally.
689                 boost::range::rotate(audio_cadence_, std::end(audio_cadence_) - 1);
690         }
691
692         void start(bool has_audio, bool has_video) override
693         {
694                 has_audio_      = has_audio;
695                 has_video_      = has_video;
696                 running_        = true;
697         }
698
699         void stop() override
700         {
701                 running_ = false;
702                 try_pop_frame();
703                 try_pop_frame();
704         }
705
706         std::vector<AVSampleFormat> supported_sample_formats() const override
707         {
708                 return { AVSampleFormat::AV_SAMPLE_FMT_S32 };
709         }
710
711         std::vector<int> supported_samplerates() const override {
712                 return { 48000 };
713         }
714
715         std::vector<AVPixelFormat> supported_pixel_formats() const override
716         {
717                 return {
718                         AVPixelFormat::AV_PIX_FMT_YUVA420P,
719                         AVPixelFormat::AV_PIX_FMT_YUV444P,
720                         AVPixelFormat::AV_PIX_FMT_YUV422P,
721                         AVPixelFormat::AV_PIX_FMT_YUV420P,
722                         AVPixelFormat::AV_PIX_FMT_YUV411P,
723                         AVPixelFormat::AV_PIX_FMT_BGRA,
724                         AVPixelFormat::AV_PIX_FMT_ARGB,
725                         AVPixelFormat::AV_PIX_FMT_RGBA,
726                         AVPixelFormat::AV_PIX_FMT_ABGR,
727                         AVPixelFormat::AV_PIX_FMT_GRAY8
728                 };
729         }
730
731         int wanted_num_audio_streams() const override
732         {
733                 return 1;
734         }
735
736         boost::optional<int> wanted_num_channels_per_stream() const
737         {
738                 return boost::none;
739         }
740
741         boost::optional<AVMediaType> try_push(AVMediaType type, int stream_index, spl::shared_ptr<AVFrame> av_frame) override
742         {
743                 if (!has_audio_ && !has_video_)
744                         CASPAR_THROW_EXCEPTION(invalid_operation());
745
746                 if (type == AVMediaType::AVMEDIA_TYPE_AUDIO && av_frame->data[0])
747                 {
748                         if (channel_layout_ == core::audio_channel_layout::invalid()) // First audio
749                         {
750                                 channel_layout_ = get_audio_channel_layout(av_frame->channels, av_frame->channel_layout, L"");
751
752                                 // Insert silence samples so that the audio mixer is guaranteed to be filled.
753                                 auto min_num_samples_per_frame  = *boost::min_element(audio_cadence_);
754                                 auto max_num_samples_per_frame  = *boost::max_element(audio_cadence_);
755                                 auto cadence_safety_samples             = max_num_samples_per_frame - min_num_samples_per_frame;
756                                 audio_samples_.resize(channel_layout_.num_channels * cadence_safety_samples, 0);
757                         }
758
759                         auto ptr = reinterpret_cast<int32_t*>(av_frame->data[0]);
760
761                         audio_samples_.insert(audio_samples_.end(), ptr, ptr + av_frame->linesize[0] / sizeof(int32_t));
762                 }
763                 else if (type == AVMediaType::AVMEDIA_TYPE_VIDEO)
764                 {
765                         video_frames_.push(std::move(av_frame));
766                 }
767
768                 while (true)
769                 {
770                         bool enough_audio =
771                                 !has_audio_ ||
772                                 (channel_layout_ != core::audio_channel_layout::invalid() && audio_samples_.size() >= audio_cadence_.front() * channel_layout_.num_channels);
773                         bool enough_video =
774                                 !has_video_ ||
775                                 !video_frames_.empty();
776
777                         if (!enough_audio)
778                                 return AVMediaType::AVMEDIA_TYPE_AUDIO;
779
780                         if (!enough_video)
781                                 return AVMediaType::AVMEDIA_TYPE_VIDEO;
782
783                         core::mutable_audio_buffer audio_data;
784
785                         if (has_audio_)
786                         {
787                                 auto begin = audio_samples_.begin();
788                                 auto end = begin + audio_cadence_.front() * channel_layout_.num_channels;
789
790                                 audio_data.insert(audio_data.begin(), begin, end);
791                                 audio_samples_.erase(begin, end);
792                                 boost::range::rotate(audio_cadence_, std::begin(audio_cadence_) + 1);
793                         }
794
795                         if (!has_video_) // Audio only
796                         {
797                                 core::mutable_frame audio_only_frame(
798                                                 { },
799                                                 std::move(audio_data),
800                                                 this,
801                                                 core::pixel_format_desc(core::pixel_format::invalid),
802                                                 channel_layout_);
803
804                                 output_frames_.push(core::draw_frame(std::move(audio_only_frame)));
805
806                                 return AVMediaType::AVMEDIA_TYPE_AUDIO;
807                         }
808
809                         auto output_frame = make_frame(this, spl::make_shared_ptr(video_frames_.front()), *factory_, channel_layout_);
810                         video_frames_.pop();
811                         output_frame.audio_data() = std::move(audio_data);
812
813                         output_frames_.push(core::draw_frame(std::move(output_frame)));
814                 }
815         }
816
817         void eof() override
818         {
819                 // Drain rest, regardless of it being enough or not.
820                 while (!video_frames_.empty() || !audio_samples_.empty())
821                 {
822                         core::mutable_audio_buffer audio_data;
823
824                         audio_data.swap(audio_samples_);
825
826                         if (!video_frames_.empty())
827                         {
828                                 auto output_frame = make_frame(this, spl::make_shared_ptr(video_frames_.front()), *factory_, channel_layout_);
829                                 video_frames_.pop();
830                                 output_frame.audio_data() = std::move(audio_data);
831
832                                 output_frames_.push(core::draw_frame(std::move(output_frame)));
833                         }
834                         else
835                         {
836                                 core::mutable_frame audio_only_frame(
837                                                 {},
838                                                 std::move(audio_data),
839                                                 this,
840                                                 core::pixel_format_desc(core::pixel_format::invalid),
841                                                 channel_layout_);
842
843                                 output_frames_.push(core::draw_frame(std::move(audio_only_frame)));
844                                 output_frames_.push(core::draw_frame::empty());
845                         }
846                 }
847         }
848
849         core::draw_frame try_pop_frame()
850         {
851                 core::draw_frame frame = core::draw_frame::late();
852
853                 if (!output_frames_.try_pop(frame) && !running_)
854                         return core::draw_frame::empty();
855
856                 return frame;
857         }
858 };
859
860 struct audio_stream_info
861 {
862         int                             num_channels = 0;
863         AVSampleFormat  sampleformat = AVSampleFormat::AV_SAMPLE_FMT_NONE;
864 };
865
866 struct video_stream_info
867 {
868         int                                     width           = 0;
869         int                                     height          = 0;
870         AVPixelFormat           pixelformat     = AVPixelFormat::AV_PIX_FMT_NONE;
871         core::field_mode        fieldmode       = core::field_mode::progressive;
872 };
873
874 class ffmpeg_pipeline_backend_internal : public ffmpeg_pipeline_backend
875 {
876         spl::shared_ptr<diagnostics::graph>                                                             graph_;
877
878         spl::unique_ptr<source>                                                                                 source_                                 = spl::make_unique<no_source_selected>();
879         std::function<bool (caspar::array<const std::int32_t> data)>    try_push_audio_;
880         std::function<bool (caspar::array<const std::uint8_t> data)>    try_push_video_;
881
882         std::vector<audio_stream_info>                                                                  source_audio_streams_;
883         video_stream_info                                                                                               source_video_stream_;
884
885         std::string                                                                                                             afilter_;
886         std::unique_ptr<audio_filter>                                                                   audio_filter_;
887         std::string                                                                                                             vfilter_;
888         std::unique_ptr<filter>                                                                                 video_filter_;
889
890         spl::unique_ptr<sink>                                                                                   sink_                                   = spl::make_unique<no_sink_selected>();
891         std::function<core::draw_frame ()>                                                              try_pop_frame_;
892
893         tbb::atomic<bool>                                                                                               started_;
894         tbb::spin_mutex                                                                                                 exception_mutex_;
895         boost::exception_ptr                                                                                    exception_;
896         boost::thread                                                                                                   thread_;
897 public:
898         ffmpeg_pipeline_backend_internal()
899         {
900                 started_ = false;
901                 diagnostics::register_graph(graph_);
902         }
903
904         ~ffmpeg_pipeline_backend_internal()
905         {
906                 stop();
907         }
908
909         void throw_if_error()
910         {
911                 boost::lock_guard<tbb::spin_mutex> lock(exception_mutex_);
912
913                 if (exception_ != nullptr)
914                         boost::rethrow_exception(exception_);
915         }
916
917         void graph(spl::shared_ptr<caspar::diagnostics::graph> g) override
918         {
919                 graph_ = std::move(g);
920                 source_->graph(graph_);
921                 sink_->graph(graph_);
922         }
923
924         // Source setup
925
926         void from_file(std::string filename) override
927         {
928                 source_                 = spl::make_unique<file_source>(std::move(filename));
929                 try_push_audio_ = std::function<bool (caspar::array<const std::int32_t>)>();
930                 try_push_video_ = std::function<bool (caspar::array<const std::uint8_t>)>();
931                 source_->graph(graph_);
932         }
933
934         void from_memory_only_audio(int num_channels, int samplerate) override
935         {
936                 auto source             = spl::make_unique<memory_source>();
937                 auto source_ptr = source.get();
938                 try_push_audio_ = [this, source_ptr](caspar::array<const std::int32_t> data) { return source_ptr->try_push_audio(std::move(data)); };
939                 source->enable_audio(samplerate, num_channels);
940
941                 source_ = std::move(source);
942                 source_->graph(graph_);
943         }
944
945         void from_memory_only_video(int width, int height, boost::rational<int> framerate) override
946         {
947                 auto source             = spl::make_unique<memory_source>();
948                 auto source_ptr = source.get();
949                 try_push_video_ = [this, source_ptr](caspar::array<const std::uint8_t> data) { return source_ptr->try_push_video(std::move(data)); };
950                 source->enable_video(width, height, std::move(framerate));
951
952                 source_ = std::move(source);
953                 source_->graph(graph_);
954         }
955
956         void from_memory(int num_channels, int samplerate, int width, int height, boost::rational<int> framerate) override
957         {
958                 auto source             = spl::make_unique<memory_source>();
959                 auto source_ptr = source.get();
960                 try_push_audio_ = [this, source_ptr](caspar::array<const std::int32_t> data) { return source_ptr->try_push_audio(std::move(data)); };
961                 try_push_video_ = [this, source_ptr](caspar::array<const std::uint8_t> data) { return source_ptr->try_push_video(std::move(data)); };
962                 source->enable_audio(samplerate, num_channels);
963                 source->enable_video(width, height, std::move(framerate));
964
965                 source_ = std::move(source);
966                 source_->graph(graph_);
967         }
968
969         void                    start_frame(std::uint32_t frame) override       { source_->start_frame(frame);          }
970         std::uint32_t   start_frame() const override                            { return source_->start_frame();        }
971         void                    length(std::uint32_t frames) override           { source_->length(frames);                      }
972         std::uint32_t   length() const override                                         { return source_->length();                     }
973         void                    seek(std::uint32_t frame) override                      { source_->seek(frame);                         }
974         void                    loop(bool value) override                                       { source_->loop(value);                         }
975         bool                    loop() const override                                           { return source_->loop();                       }
976         std::string             source_filename() const override                        { return source_->filename();           }
977
978         // Filter setup
979
980         void vfilter(std::string filter) override
981         {
982                 vfilter_ = std::move(filter);
983         }
984
985         void afilter(std::string filter) override
986         {
987                 afilter_ = std::move(filter);
988         }
989
990         int width() const override
991         {
992                 return source_video_stream_.width;
993         }
994
995         int height() const override
996         {
997                 return source_video_stream_.height;
998         }
999
1000         boost::rational<int> framerate() const override
1001         {
1002                 bool double_rate = filter::is_double_rate(u16(vfilter_));
1003
1004                 return double_rate ? source_->framerate() * 2 : source_->framerate();
1005         }
1006
1007         bool progressive() const override
1008         {
1009                 return true;//TODO
1010         }
1011
1012         // Sink setup
1013
1014         void to_memory(spl::shared_ptr<core::frame_factory> factory, core::video_format_desc format) override
1015         {
1016                 auto sink               = spl::make_unique<memory_sink>(std::move(factory), std::move(format));
1017                 auto sink_ptr   = sink.get();
1018                 try_pop_frame_  = [sink_ptr] { return sink_ptr->try_pop_frame(); };
1019
1020                 sink_ = std::move(sink);
1021                 sink_->graph(graph_);
1022         }
1023
1024         void to_file(std::string filename) override
1025         {
1026                 sink_                   = spl::make_unique<file_sink>(std::move(filename));
1027                 try_pop_frame_  = std::function<core::draw_frame ()>();
1028                 sink_->graph(graph_);
1029         }
1030
1031         void acodec(std::string codec) override { sink_->acodec(std::move(codec)); }
1032         void vcodec(std::string codec) override { sink_->vcodec(std::move(codec)); }
1033         void format(std::string fmt) override   { sink_->format(std::move(fmt)); }
1034
1035         // Runtime control
1036
1037         void start() override
1038         {
1039                 source_->start();
1040                 sink_->start(source_->has_audio(), source_->has_video());
1041                 started_ = true;
1042                 bool quiet = is_logging_quiet_for_thread();
1043
1044                 thread_ = boost::thread([=] { run(quiet); });
1045         }
1046
1047         bool try_push_audio(caspar::array<const std::int32_t> data) override
1048         {
1049                 throw_if_error();
1050
1051                 if (try_push_audio_)
1052                         return try_push_audio_(std::move(data));
1053                 else
1054                         return false;
1055         }
1056
1057         bool try_push_video(caspar::array<const std::uint8_t> data) override
1058         {
1059                 throw_if_error();
1060
1061                 if (try_push_video_)
1062                         return try_push_video_(std::move(data));
1063                 else
1064                         return false;
1065         }
1066
1067         core::draw_frame try_pop_frame() override
1068         {
1069                 throw_if_error();
1070
1071                 if (!try_pop_frame_)
1072                         CASPAR_THROW_EXCEPTION(invalid_operation());
1073
1074                 return try_pop_frame_();
1075         }
1076
1077         std::uint32_t last_frame() const override
1078         {
1079                 return source_->frame_number();
1080         }
1081
1082         bool started() const override
1083         {
1084                 return started_;
1085         }
1086
1087         void stop() override
1088         {
1089                 started_ = false;
1090
1091                 sink_->stop();
1092                 source_->stop();
1093
1094                 if (thread_.joinable())
1095                         thread_.join();
1096         }
1097
1098 private:
1099         void run(bool quiet)
1100         {
1101                 ensure_gpf_handler_installed_for_thread(u8(L"ffmpeg-pipeline: " + source_->print() + L" -> " + sink_->print()).c_str());
1102                 auto quiet_logging = temporary_enable_quiet_logging_for_thread(quiet);
1103
1104                 try
1105                 {
1106                         boost::optional<AVMediaType> result = source_->has_audio() ? AVMediaType::AVMEDIA_TYPE_AUDIO : AVMediaType::AVMEDIA_TYPE_VIDEO;
1107
1108                         while (started_ && (source_->has_audio() || source_->has_video()))
1109                         {
1110                                 auto needed                                             = *result;
1111                                 auto input_frames_for_streams   = source_->get_input_frames_for_streams(needed);
1112
1113                                 if (!input_frames_for_streams.empty() && input_frames_for_streams.at(0))
1114                                 {
1115                                         for (int input_stream_index = 0; input_stream_index < input_frames_for_streams.size(); ++input_stream_index)
1116                                         {
1117                                                 if (needed == AVMediaType::AVMEDIA_TYPE_AUDIO)
1118                                                 {
1119                                                         initialize_audio_filter_if_needed(input_frames_for_streams);
1120                                                         audio_filter_->push(input_stream_index, std::move(input_frames_for_streams.at(input_stream_index)));
1121
1122                                                         for (int output_stream_index = 0; output_stream_index < sink_->wanted_num_audio_streams(); ++output_stream_index)
1123                                                                 for (auto filtered_frame : audio_filter_->poll_all(output_stream_index))
1124                                                                         result = sink_->try_push(AVMediaType::AVMEDIA_TYPE_AUDIO, output_stream_index, std::move(filtered_frame));
1125                                                 }
1126                                                 else if (needed == AVMediaType::AVMEDIA_TYPE_VIDEO)
1127                                                 {
1128                                                         initialize_video_filter_if_needed(*input_frames_for_streams.at(input_stream_index));
1129                                                         video_filter_->push(std::move(input_frames_for_streams.at(input_stream_index)));
1130
1131                                                         for (auto filtered_frame : video_filter_->poll_all())
1132                                                                 result = sink_->try_push(AVMediaType::AVMEDIA_TYPE_VIDEO, 0, std::move(filtered_frame));
1133                                                 }
1134                                                 else
1135                                                         CASPAR_THROW_EXCEPTION(not_supported());
1136                                         }
1137                                 }
1138                                 else if (source_->eof())
1139                                 {
1140                                         started_ = false;
1141                                         sink_->eof();
1142                                         break;
1143                                 }
1144                                 else
1145                                         result = boost::none;
1146
1147                                 if (!result)
1148                                 {
1149                                         graph_->set_tag(caspar::diagnostics::tag_severity::WARNING, "dropped-frame");
1150                                         result = needed; // Repeat same media type
1151                                 }
1152                         }
1153                 }
1154                 catch (...)
1155                 {
1156                         if (is_logging_quiet_for_thread())
1157                         {
1158                                 CASPAR_LOG_CURRENT_EXCEPTION_AT_LEVEL(debug);
1159                         }
1160                         else
1161                         {
1162                                 CASPAR_LOG_CURRENT_EXCEPTION();
1163                         }
1164
1165                         boost::lock_guard<tbb::spin_mutex> lock(exception_mutex_);
1166                         exception_ = boost::current_exception();
1167                 }
1168
1169                 video_filter_.reset();
1170                 audio_filter_.reset();
1171                 source_->stop();
1172                 sink_->stop();
1173                 started_ = false;
1174         }
1175
1176         template<typename T>
1177         void set_if_changed(bool& changed, T& old_value, T new_value)
1178         {
1179                 if (old_value != new_value)
1180                 {
1181                         changed = true;
1182                         old_value = new_value;
1183                 }
1184         }
1185
1186         void initialize_audio_filter_if_needed(const std::vector<std::shared_ptr<AVFrame>>& av_frames_per_stream)
1187         {
1188                 bool changed = av_frames_per_stream.size() != source_audio_streams_.size();
1189                 source_audio_streams_.resize(av_frames_per_stream.size());
1190
1191                 for (int i = 0; i < av_frames_per_stream.size(); ++i)
1192                 {
1193                         auto& av_frame  = *av_frames_per_stream.at(i);
1194                         auto& stream    = source_audio_streams_.at(i);
1195
1196                         set_if_changed(changed, stream.sampleformat, static_cast<AVSampleFormat>(av_frame.format));
1197                         set_if_changed(changed, stream.num_channels, av_frame.channels);
1198                 }
1199
1200                 if (changed)
1201                         initialize_audio_filter();
1202         }
1203
1204         void initialize_audio_filter()
1205         {
1206                 std::vector<audio_input_pad> input_pads;
1207                 std::vector<audio_output_pad> output_pads;
1208
1209                 for (auto& source_audio_stream : source_audio_streams_)
1210                 {
1211                         input_pads.emplace_back(
1212                                         boost::rational<int>(1, source_->samplerate()),
1213                                         source_->samplerate(),
1214                                         source_audio_stream.sampleformat,
1215                                         av_get_default_channel_layout(source_audio_stream.num_channels));
1216                 }
1217
1218                 auto total_num_channels = cpplinq::from(source_audio_streams_)
1219                                 .select([](const audio_stream_info& info) { return info.num_channels; })
1220                                 .aggregate(0, std::plus<int>());
1221
1222                 if (total_num_channels > 1 && sink_->wanted_num_audio_streams() > 1)
1223                         CASPAR_THROW_EXCEPTION(invalid_operation()
1224                                         << msg_info("only one-to-many or many-to-one audio stream conversion supported."));
1225
1226                 std::wstring amerge;
1227
1228                 if (sink_->wanted_num_audio_streams() == 1 && !sink_->wanted_num_channels_per_stream())
1229                 {
1230                         output_pads.emplace_back(
1231                                         sink_->supported_samplerates(),
1232                                         sink_->supported_sample_formats(),
1233                                         std::vector<int64_t>({ av_get_default_channel_layout(total_num_channels) }));
1234
1235                         if (source_audio_streams_.size() > 1)
1236                         {
1237                                 for (int i = 0; i < source_audio_streams_.size(); ++i)
1238                                         amerge += L"[a:" + boost::lexical_cast<std::wstring>(i) + L"]";
1239
1240                                 amerge += L"amerge=inputs=" + boost::lexical_cast<std::wstring>(source_audio_streams_.size());
1241                         }
1242                 }
1243
1244                 std::wstring afilter = u16(afilter_);
1245
1246                 if (!amerge.empty())
1247                 {
1248                         afilter = prepend_filter(u16(afilter), amerge);
1249                         afilter += L"[aout:0]";
1250                 }
1251
1252                 audio_filter_.reset(new audio_filter(input_pads, output_pads, u8(afilter)));
1253         }
1254
1255         void initialize_video_filter_if_needed(const AVFrame& av_frame)
1256         {
1257                 bool changed = false;
1258
1259                 set_if_changed(changed, source_video_stream_.width, av_frame.width);
1260                 set_if_changed(changed, source_video_stream_.height, av_frame.height);
1261                 set_if_changed(changed, source_video_stream_.pixelformat, static_cast<AVPixelFormat>(av_frame.format));
1262
1263                 core::field_mode field_mode = core::field_mode::progressive;
1264
1265                 if (av_frame.interlaced_frame)
1266                         field_mode = av_frame.top_field_first ? core::field_mode::upper : core::field_mode::lower;
1267
1268                 set_if_changed(changed, source_video_stream_.fieldmode, field_mode);
1269
1270                 if (changed)
1271                         initialize_video_filter();
1272         }
1273
1274         void initialize_video_filter()
1275         {
1276                 if (source_video_stream_.fieldmode != core::field_mode::progressive && !filter::is_deinterlacing(u16(vfilter_)))
1277                         vfilter_ = u8(append_filter(u16(vfilter_), L"YADIF=1:-1"));
1278
1279                 if (source_video_stream_.height == 480) // NTSC DV
1280                 {
1281                         auto pad_str = L"PAD=" + boost::lexical_cast<std::wstring>(source_video_stream_.width) + L":486:0:2:black";
1282                         vfilter_ = u8(append_filter(u16(vfilter_), pad_str));
1283                 }
1284
1285                 video_filter_.reset(new filter(
1286                                 source_video_stream_.width,
1287                                 source_video_stream_.height,
1288                                 1 / source_->framerate(),
1289                                 source_->framerate(),
1290                                 boost::rational<int>(1, 1), // TODO
1291                                 source_video_stream_.pixelformat,
1292                                 sink_->supported_pixel_formats(),
1293                                 vfilter_));
1294                 sink_->framerate(framerate());
1295         }
1296 };
1297
1298 spl::shared_ptr<struct ffmpeg_pipeline_backend> create_internal_pipeline()
1299 {
1300         return spl::make_shared<ffmpeg_pipeline_backend_internal>();
1301 }
1302
1303 }}