]> git.sesse.net Git - casparcg/blob - core/consumer/frame_consumer_device.cpp
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches...
[casparcg] / core / consumer / frame_consumer_device.cpp
1 #include "../StdAfx.h"\r
2 \r
3 #ifdef _MSC_VER\r
4 #pragma warning (disable : 4244)\r
5 #endif\r
6 \r
7 #include "frame_consumer_device.h"\r
8 \r
9 #include "../video_format.h"\r
10 \r
11 #include <common/concurrency/executor.h>\r
12 #include <common/utility/timer.h>\r
13 #include <common/utility/assert.h>\r
14 \r
15 #include <boost/range/algorithm_ext/erase.hpp>\r
16 #include <boost/range/algorithm.hpp>\r
17 #include <boost/circular_buffer.hpp>\r
18 \r
19 namespace caspar { namespace core {\r
20         \r
21 struct frame_consumer_device::implementation\r
22 {\r
23         static int const MAX_DEPTH = 3;\r
24         \r
25         timer clock_;\r
26 \r
27         boost::circular_buffer<safe_ptr<const read_frame>> buffer_;\r
28 \r
29         std::map<int, std::shared_ptr<frame_consumer>> consumers_; // Valid iterators after erase\r
30         \r
31         const video_format_desc fmt_;\r
32         \r
33         executor executor_;     \r
34 public:\r
35         implementation(const video_format_desc& format_desc) : fmt_(format_desc)\r
36         {               \r
37                 executor_.set_capacity(2);\r
38                 executor_.start();\r
39         }\r
40 \r
41         ~implementation()\r
42         {\r
43                 executor_.clear();\r
44                 CASPAR_LOG(info) << "Shutting down consumer-device.";\r
45         }\r
46 \r
47         void add(int index, const safe_ptr<frame_consumer>& consumer)\r
48         {               \r
49                 executor_.invoke([&]\r
50                 {\r
51                         if(buffer_.capacity() < consumer->buffer_depth())\r
52                                 buffer_.set_capacity(consumer->buffer_depth());\r
53                         consumers_[index] = consumer;\r
54                 });\r
55         }\r
56 \r
57         void remove(int index)\r
58         {\r
59                 executor_.invoke([&]\r
60                 {\r
61                         auto it = consumers_.find(index);\r
62                         if(it != consumers_.end())\r
63                                 consumers_.erase(it);\r
64                 });\r
65         }\r
66 \r
67         safe_ptr<frame_consumer> get(int index)\r
68         {\r
69                 return executor_.invoke([&]() -> safe_ptr<frame_consumer>\r
70                 {\r
71                         auto it = consumers_.find(index);\r
72                         return it != consumers_.end() && it->second ? safe_ptr<frame_consumer>(it->second) : frame_consumer::empty();\r
73                 });\r
74         }\r
75                         \r
76         void send(const safe_ptr<const read_frame>& frame)\r
77         {               \r
78                 executor_.begin_invoke([=]\r
79                 {       \r
80                         buffer_.push_back(std::move(frame));\r
81 \r
82                         if(!buffer_.full())\r
83                                 return;\r
84         \r
85                         auto it = consumers_.begin();\r
86                         while(it != consumers_.end())\r
87                         {\r
88                                 try\r
89                                 {\r
90                                         it->second->send(buffer_[it->second->buffer_depth()-1]);\r
91                                         ++it;\r
92                                 }\r
93                                 catch(...)\r
94                                 {\r
95                                         CASPAR_LOG_CURRENT_EXCEPTION();\r
96                                         consumers_.erase(it++);\r
97                                         CASPAR_LOG(warning) << "Removed consumer from frame_consumer_device.";\r
98                                 }\r
99                         }\r
100 \r
101                         clock_.tick(1.0/fmt_.fps);\r
102                 });\r
103         }\r
104 };\r
105 \r
106 frame_consumer_device::frame_consumer_device(frame_consumer_device&& other) : impl_(std::move(other.impl_)){}\r
107 frame_consumer_device::frame_consumer_device(const video_format_desc& format_desc) : impl_(new implementation(format_desc)){}\r
108 void frame_consumer_device::add(int index, const safe_ptr<frame_consumer>& consumer){impl_->add(index, consumer);}\r
109 void frame_consumer_device::remove(int index){impl_->remove(index);}\r
110 safe_ptr<frame_consumer> frame_consumer_device::get(int index) { return impl_->get(index); }\r
111 void frame_consumer_device::send(const safe_ptr<const read_frame>& future_frame) { impl_->send(future_frame); }\r
112 }}