1 #include "../StdAfx.h"
\r
4 #pragma warning (disable : 4244)
\r
7 #include "frame_consumer_device.h"
\r
9 #include "../video_format.h"
\r
11 #include <common/concurrency/executor.h>
\r
12 #include <common/utility/timer.h>
\r
13 #include <common/utility/assert.h>
\r
15 #include <boost/range/algorithm_ext/erase.hpp>
\r
16 #include <boost/range/algorithm.hpp>
\r
17 #include <boost/circular_buffer.hpp>
\r
19 namespace caspar { namespace core {
\r
21 struct frame_consumer_device::implementation
\r
23 static int const MAX_DEPTH = 3;
\r
27 boost::circular_buffer<safe_ptr<const read_frame>> buffer_;
\r
29 std::map<int, std::shared_ptr<frame_consumer>> consumers_; // Valid iterators after erase
\r
31 const video_format_desc fmt_;
\r
33 executor executor_;
\r
35 implementation(const video_format_desc& format_desc) : fmt_(format_desc)
\r
37 executor_.set_capacity(2);
\r
44 CASPAR_LOG(info) << "Shutting down consumer-device.";
\r
47 void add(int index, const safe_ptr<frame_consumer>& consumer)
\r
49 executor_.invoke([&]
\r
51 if(buffer_.capacity() < consumer->buffer_depth())
\r
52 buffer_.set_capacity(consumer->buffer_depth());
\r
53 consumers_[index] = consumer;
\r
57 void remove(int index)
\r
59 executor_.invoke([&]
\r
61 auto it = consumers_.find(index);
\r
62 if(it != consumers_.end())
\r
63 consumers_.erase(it);
\r
67 safe_ptr<frame_consumer> get(int index)
\r
69 return executor_.invoke([&]() -> safe_ptr<frame_consumer>
\r
71 auto it = consumers_.find(index);
\r
72 return it != consumers_.end() && it->second ? safe_ptr<frame_consumer>(it->second) : frame_consumer::empty();
\r
76 void send(const safe_ptr<const read_frame>& frame)
\r
78 executor_.begin_invoke([=]
\r
80 buffer_.push_back(std::move(frame));
\r
85 auto it = consumers_.begin();
\r
86 while(it != consumers_.end())
\r
90 it->second->send(buffer_[it->second->buffer_depth()-1]);
\r
95 CASPAR_LOG_CURRENT_EXCEPTION();
\r
96 consumers_.erase(it++);
\r
97 CASPAR_LOG(warning) << "Removed consumer from frame_consumer_device.";
\r
101 clock_.tick(1.0/fmt_.fps);
\r
106 frame_consumer_device::frame_consumer_device(frame_consumer_device&& other) : impl_(std::move(other.impl_)){}
\r
107 frame_consumer_device::frame_consumer_device(const video_format_desc& format_desc) : impl_(new implementation(format_desc)){}
\r
108 void frame_consumer_device::add(int index, const safe_ptr<frame_consumer>& consumer){impl_->add(index, consumer);}
\r
109 void frame_consumer_device::remove(int index){impl_->remove(index);}
\r
110 safe_ptr<frame_consumer> frame_consumer_device::get(int index) { return impl_->get(index); }
\r
111 void frame_consumer_device::send(const safe_ptr<const read_frame>& future_frame) { impl_->send(future_frame); }
\r