}).detach(); \r
}\r
\r
- bool send(const_frame frame) override {return consumer_->send(std::move(frame));}\r
+ bool send(const_frame frame) override {return consumer_->send(std::move(frame));}\r
virtual void initialize(const struct video_format_desc& format_desc, int channel_index) override {return consumer_->initialize(format_desc, channel_index);}\r
- std::wstring print() const override {return consumer_->print();} \r
- std::wstring name() const override {return consumer_->name();}\r
- boost::property_tree::wptree info() const override {return consumer_->info();}\r
- bool has_synchronization_clock() const override {return consumer_->has_synchronization_clock();}\r
- int buffer_depth() const override {return consumer_->buffer_depth();}\r
- int index() const override {return consumer_->index();}\r
+ std::wstring print() const override {return consumer_->print();} \r
+ std::wstring name() const override {return consumer_->name();}\r
+ boost::property_tree::wptree info() const override {return consumer_->info();}\r
+ bool has_synchronization_clock() const override {return consumer_->has_synchronization_clock();}\r
+ int buffer_depth() const override {return consumer_->buffer_depth();}\r
+ int index() const override {return consumer_->index();}\r
+ void subscribe(const monitor::observable::observer_ptr& o) override {consumer_->subscribe(o);}\r
+ void unsubscribe(const monitor::observable::observer_ptr& o) override {consumer_->unsubscribe(o);} \r
};\r
\r
class print_consumer_proxy : public frame_consumer\r
bool has_synchronization_clock() const override {return consumer_->has_synchronization_clock();}\r
int buffer_depth() const override {return consumer_->buffer_depth();}\r
int index() const override {return consumer_->index();}\r
+ void subscribe(const monitor::observable::observer_ptr& o) override {consumer_->subscribe(o);}\r
+ void unsubscribe(const monitor::observable::observer_ptr& o) override {consumer_->unsubscribe(o);} \r
};\r
\r
class recover_consumer_proxy : public frame_consumer\r
return consumer_->initialize(format_desc, channel_index);\r
}\r
\r
- std::wstring print() const override {return consumer_->print();}\r
- std::wstring name() const override {return consumer_->name();}\r
- boost::property_tree::wptree info() const override {return consumer_->info();}\r
- bool has_synchronization_clock() const override {return consumer_->has_synchronization_clock();}\r
- int buffer_depth() const override {return consumer_->buffer_depth();}\r
- int index() const override {return consumer_->index();}\r
+ std::wstring print() const override {return consumer_->print();}\r
+ std::wstring name() const override {return consumer_->name();}\r
+ boost::property_tree::wptree info() const override {return consumer_->info();}\r
+ bool has_synchronization_clock() const override {return consumer_->has_synchronization_clock();}\r
+ int buffer_depth() const override {return consumer_->buffer_depth();}\r
+ int index() const override {return consumer_->index();}\r
+ void subscribe(const monitor::observable::observer_ptr& o) override {consumer_->subscribe(o);}\r
+ void unsubscribe(const monitor::observable::observer_ptr& o) override {consumer_->unsubscribe(o);} \r
};\r
\r
// This class is used to guarantee that audio cadence is correct. This is important for NTSC audio.\r
return result;\r
}\r
\r
- std::wstring print() const override {return consumer_->print();}\r
- std::wstring name() const override {return consumer_->name();}\r
- boost::property_tree::wptree info() const override {return consumer_->info();}\r
- bool has_synchronization_clock() const override {return consumer_->has_synchronization_clock();}\r
- int buffer_depth() const override {return consumer_->buffer_depth();}\r
- int index() const override {return consumer_->index();}\r
+ std::wstring print() const override {return consumer_->print();}\r
+ std::wstring name() const override {return consumer_->name();}\r
+ boost::property_tree::wptree info() const override {return consumer_->info();}\r
+ bool has_synchronization_clock() const override {return consumer_->has_synchronization_clock();}\r
+ int buffer_depth() const override {return consumer_->buffer_depth();}\r
+ int index() const override {return consumer_->index();}\r
+ void subscribe(const monitor::observable::observer_ptr& o) override {consumer_->subscribe(o);}\r
+ void unsubscribe(const monitor::observable::observer_ptr& o) override {consumer_->unsubscribe(o);} \r
};\r
\r
spl::shared_ptr<core::frame_consumer> create_consumer(const std::vector<std::wstring>& params)\r
bool has_synchronization_clock() const override {return false;}\r
int buffer_depth() const override {return 0;};\r
virtual int index() const{return -1;}\r
+ void subscribe(const monitor::observable::observer_ptr& o) override{}\r
+ void unsubscribe(const monitor::observable::observer_ptr& o) override{}\r
boost::property_tree::wptree info() const override\r
{\r
boost::property_tree::wptree info;\r
\r
#pragma once\r
\r
+#include "../monitor/monitor.h"\r
+\r
#include <common/memory.h>\r
\r
#include <boost/property_tree/ptree_fwd.hpp>\r
namespace caspar { namespace core {\r
\r
// Interface\r
-class frame_consumer\r
+class frame_consumer : public monitor::observable\r
{\r
frame_consumer(const frame_consumer&);\r
frame_consumer& operator=(const frame_consumer&);\r
\r
virtual bool send(class const_frame frame) = 0;\r
virtual void initialize(const struct video_format_desc& format_desc, int channel_index) = 0;\r
+ \r
+ // monitor::observable\r
+\r
+ virtual void subscribe(const monitor::observable::observer_ptr& o) = 0;\r
+ virtual void unsubscribe(const monitor::observable::observer_ptr& o) = 0;\r
\r
// Properties\r
\r
\r
namespace caspar { namespace core {\r
\r
+class port : public monitor::observable\r
+{\r
+ port(const port&);\r
+ port& operator=(const port&);\r
+\r
+ monitor::basic_subject event_subject_;\r
+ std::shared_ptr<frame_consumer> consumer_;\r
+ int index_;\r
+ int channel_index_;\r
+public:\r
+ port(int index, int channel_index, spl::shared_ptr<frame_consumer> consumer)\r
+ : event_subject_(monitor::path("port") % index)\r
+ , consumer_(std::move(consumer))\r
+ , index_(index)\r
+ , channel_index_(channel_index)\r
+ {\r
+ consumer_->subscribe(event_subject_);\r
+ }\r
+\r
+ port(port&& other)\r
+ : event_subject_(std::move(other.event_subject_))\r
+ , consumer_(std::move(other.consumer_))\r
+ , index_(other.index_)\r
+ , channel_index_(other.channel_index_)\r
+ {\r
+ }\r
+\r
+ port& operator=(port&& other)\r
+ {\r
+ event_subject_ = std::move(other.event_subject_);\r
+ consumer_ = std::move(other.consumer_);\r
+ index_ = std::move(other.index_);\r
+ channel_index_ = std::move(other.channel_index_);\r
+ }\r
+\r
+ void video_format_desc(const struct video_format_desc& format_desc)\r
+ {\r
+ consumer_->initialize(format_desc, channel_index_);\r
+ }\r
+ \r
+ bool send(class const_frame frame)\r
+ {\r
+ event_subject_ << monitor::event("type") % consumer_->name();\r
+ return consumer_->send(frame);\r
+ }\r
+ \r
+ int index() const\r
+ {\r
+ return index_;\r
+ }\r
+\r
+ int buffer_depth() const\r
+ {\r
+ return consumer_->buffer_depth();\r
+ }\r
+\r
+ bool has_synchronization_clock() const\r
+ {\r
+ return consumer_->has_synchronization_clock();\r
+ }\r
+\r
+ boost::property_tree::wptree info() const\r
+ {\r
+ return consumer_->info();\r
+ }\r
+\r
+ void subscribe(const monitor::observable::observer_ptr& o) override\r
+ {\r
+ event_subject_.subscribe(o);\r
+ }\r
+\r
+ void unsubscribe(const monitor::observable::observer_ptr& o) override\r
+ {\r
+ event_subject_.unsubscribe(o);\r
+ } \r
+};\r
+\r
struct output::impl\r
{ \r
- spl::shared_ptr<diagnostics::graph> graph_;\r
- const int channel_index_;\r
- video_format_desc format_desc_;\r
- std::map<int, spl::shared_ptr<frame_consumer>> consumers_; \r
- prec_timer sync_timer_;\r
- boost::circular_buffer<const_frame> frames_;\r
- executor executor_; \r
+ spl::shared_ptr<diagnostics::graph> graph_;\r
+ monitor::basic_subject event_subject_;\r
+ const int channel_index_;\r
+ video_format_desc format_desc_;\r
+ std::map<int, port> ports_; \r
+ prec_timer sync_timer_;\r
+ boost::circular_buffer<const_frame> frames_;\r
+ executor executor_; \r
public:\r
impl(spl::shared_ptr<diagnostics::graph> graph, const video_format_desc& format_desc, int channel_index) \r
: graph_(std::move(graph))\r
+ , event_subject_("output")\r
, channel_index_(channel_index)\r
, format_desc_(format_desc)\r
, executor_(L"output")\r
remove(index);\r
\r
consumer->initialize(format_desc_, channel_index_);\r
-\r
- executor_.begin_invoke([=]\r
- {\r
- consumers_.insert(std::make_pair(index, consumer));\r
- CASPAR_LOG(info) << print() << L" " << consumer->print() << L" Added.";\r
+ \r
+ executor_.begin_invoke([this, index, consumer]\r
+ { \r
+ port p(index, channel_index_, std::move(consumer));\r
+ p.subscribe(event_subject_);\r
+ ports_.insert(std::make_pair(index, std::move(p)));\r
}, task_priority::high_priority);\r
}\r
\r
{ \r
executor_.begin_invoke([=]\r
{\r
- auto it = consumers_.find(index);\r
- if(it != consumers_.end())\r
- consumers_.erase(it); \r
+ auto it = ports_.find(index);\r
+ if(it != ports_.end())\r
+ ports_.erase(it); \r
}, task_priority::high_priority);\r
}\r
\r
if(format_desc_ == format_desc)\r
return;\r
\r
- auto it = consumers_.begin();\r
- while(it != consumers_.end())\r
+ auto it = ports_.begin();\r
+ while(it != ports_.end())\r
{ \r
try\r
{\r
- it->second->initialize(format_desc, channel_index_);\r
+ it->second.video_format_desc(format_desc);\r
++it;\r
}\r
catch(...)\r
{\r
CASPAR_LOG_CURRENT_EXCEPTION();\r
- CASPAR_LOG(info) << print() << L" " << it->second->print() << L" Removed.";\r
- consumers_.erase(it++);\r
+ ports_.erase(it++);\r
}\r
}\r
\r
\r
std::pair<int, int> minmax_buffer_depth() const\r
{ \r
- if(consumers_.empty())\r
+ if(ports_.empty())\r
return std::make_pair(0, 0);\r
\r
- auto buffer_depths = consumers_ | \r
+ auto buffer_depths = ports_ | \r
boost::adaptors::map_values | // std::function is MSVC workaround\r
- boost::adaptors::transformed(std::function<int(const spl::shared_ptr<frame_consumer>&)>([](const spl::shared_ptr<frame_consumer>& x){return x->buffer_depth();})); \r
+ boost::adaptors::transformed(std::function<int(const port&)>([](const port& p){return p.buffer_depth();})); \r
\r
\r
return std::make_pair(*boost::range::min_element(buffer_depths), *boost::range::max_element(buffer_depths));\r
\r
bool has_synchronization_clock() const\r
{\r
- return boost::range::count_if(consumers_ | boost::adaptors::map_values, [](const spl::shared_ptr<frame_consumer>& x){return x->has_synchronization_clock();}) > 0;\r
+ return boost::range::count_if(ports_ | boost::adaptors::map_values,\r
+ [](const port& p){return p.has_synchronization_clock();}) > 0;\r
}\r
\r
void operator()(const_frame input_frame, const core::video_format_desc& format_desc)\r
if(!frames_.full())\r
return;\r
\r
- for(auto it = consumers_.begin(); it != consumers_.end();)\r
+ for(auto it = ports_.begin(); it != ports_.end();)\r
{\r
- auto consumer = it->second;\r
- auto frame = frames_.at(consumer->buffer_depth()-minmax.first);\r
+ auto& port = it->second;\r
+ auto& frame = frames_.at(port.buffer_depth()-minmax.first);\r
\r
try\r
{\r
- if(consumer->send(frame))\r
+ if(port.send(frame))\r
++it;\r
else\r
- {\r
- CASPAR_LOG(info) << print() << L" " << it->second->print() << L" Removed.";\r
- consumers_.erase(it++);\r
- }\r
+ ports_.erase(it++); \r
}\r
catch(...)\r
{\r
CASPAR_LOG_CURRENT_EXCEPTION();\r
- CASPAR_LOG(info) << print() << L" " << it->second->print() << L" Removed.";\r
- consumers_.erase(it++);\r
+ ports_.erase(it++);\r
}\r
}\r
\r
return std::move(executor_.begin_invoke([&]() -> boost::property_tree::wptree\r
{ \r
boost::property_tree::wptree info;\r
- BOOST_FOREACH(auto& consumer, consumers_)\r
+ BOOST_FOREACH(auto& port, ports_ | boost::adaptors::map_values)\r
{\r
- info.add_child(L"consumers.consumer", consumer.second->info())\r
- .add(L"index", consumer.first); \r
+ info.add_child(L"consumers.consumer", port.info())\r
+ .add(L"index", port.index()); \r
}\r
return info;\r
}, task_priority::high_priority));\r
void output::remove(const spl::shared_ptr<frame_consumer>& consumer){impl_->remove(consumer);}\r
boost::unique_future<boost::property_tree::wptree> output::info() const{return impl_->info();}\r
void output::operator()(const_frame frame, const video_format_desc& format_desc){(*impl_)(std::move(frame), format_desc);}\r
+void output::subscribe(const monitor::observable::observer_ptr& o) {impl_->event_subject_.subscribe(o);}\r
+void output::unsubscribe(const monitor::observable::observer_ptr& o) {impl_->event_subject_.unsubscribe(o);}\r
}}
\ No newline at end of file
\r
#pragma once\r
\r
+#include "../monitor/monitor.h"\r
+\r
#include <common/forward.h>\r
#include <common/future_fwd.h>\r
#include <common/memory.h>\r
\r
namespace caspar { namespace core {\r
\r
-class output sealed\r
+class output sealed : public monitor::observable\r
{\r
output(const output&);\r
output& operator=(const output&);\r
void remove(const spl::shared_ptr<class frame_consumer>& consumer);\r
void remove(int index);\r
\r
+ // monitor::observable\r
+\r
+ void subscribe(const monitor::observable::observer_ptr& o) override;\r
+ void unsubscribe(const monitor::observable::observer_ptr& o) override;\r
+\r
// Properties\r
\r
boost::unique_future<boost::property_tree::wptree> info() const;\r
graph_->set_color("tick-time", diagnostics::color(0.0f, 0.6f, 0.9f)); \r
graph_->set_text(print());\r
diagnostics::register_graph(graph_);\r
-\r
+ \r
+ output_.subscribe(event_subject_);\r
stage_.subscribe(event_subject_);\r
\r
executor_.begin_invoke([=]{tick();});\r
{\r
return 400 + device_index_;\r
}\r
+\r
+ void subscribe(const monitor::observable::observer_ptr& o) override\r
+ {\r
+ }\r
+\r
+ void unsubscribe(const monitor::observable::observer_ptr& o) override\r
+ {\r
+ } \r
}; \r
\r
spl::shared_ptr<core::frame_consumer> create_consumer(const std::vector<std::wstring>& params)\r
{\r
return 300 + config_.device_index;\r
}\r
+\r
+ void subscribe(const monitor::observable::observer_ptr& o) override\r
+ {\r
+ }\r
+\r
+ void unsubscribe(const monitor::observable::observer_ptr& o) override\r
+ {\r
+ } \r
}; \r
\r
spl::shared_ptr<core::frame_consumer> create_consumer(const std::vector<std::wstring>& params) \r
\r
struct ffmpeg_consumer : boost::noncopyable\r
{ \r
- const std::string filename_;\r
- \r
- const std::shared_ptr<AVFormatContext> oc_;\r
- const core::video_format_desc format_desc_;\r
- \r
- const spl::shared_ptr<diagnostics::graph> graph_;\r
+ const spl::shared_ptr<diagnostics::graph> graph_;\r
+ const std::string filename_; \r
+ const std::shared_ptr<AVFormatContext> oc_;\r
+ const core::video_format_desc format_desc_; \r
+\r
+ monitor::basic_subject event_subject_;\r
\r
- tbb::spin_mutex exception_mutex_;\r
- std::exception_ptr exception_;\r
+ tbb::spin_mutex exception_mutex_;\r
+ std::exception_ptr exception_;\r
\r
- std::shared_ptr<AVStream> audio_st_;\r
- std::shared_ptr<AVStream> video_st_;\r
+ std::shared_ptr<AVStream> audio_st_;\r
+ std::shared_ptr<AVStream> video_st_;\r
\r
- byte_vector picture_buffer_;\r
- byte_vector audio_buffer_;\r
- std::shared_ptr<SwrContext> swr_;\r
- std::shared_ptr<SwsContext> sws_;\r
+ byte_vector picture_buffer_;\r
+ byte_vector audio_buffer_;\r
+ std::shared_ptr<SwrContext> swr_;\r
+ std::shared_ptr<SwsContext> sws_;\r
\r
- int64_t frame_number_;\r
+ int64_t frame_number_;\r
\r
- output_format output_format_;\r
+ output_format output_format_;\r
\r
- executor executor_;\r
+ executor executor_;\r
public:\r
ffmpeg_consumer(const std::string& filename, const core::video_format_desc& format_desc, std::vector<option> options)\r
: filename_(filename)\r
\r
CASPAR_LOG(info) << print() << L" Successfully Uninitialized."; \r
}\r
+ \r
+ // frame_consumer\r
+\r
+ bool send(core::const_frame& frame)\r
+ {\r
+ auto exception = lock(exception_mutex_, [&]\r
+ {\r
+ return exception_;\r
+ });\r
+\r
+ if(exception != nullptr)\r
+ std::rethrow_exception(exception);\r
\r
+ executor_.begin_invoke([=]\r
+ { \r
+ encode(frame);\r
+ });\r
+ \r
+ return true;\r
+ }\r
+\r
std::wstring print() const\r
{\r
return L"ffmpeg[" + u16(filename_) + L"]";\r
}\r
+ \r
+ void subscribe(const monitor::observable::observer_ptr& o)\r
+ {\r
+ event_subject_.subscribe(o);\r
+ }\r
\r
+ void unsubscribe(const monitor::observable::observer_ptr& o)\r
+ {\r
+ event_subject_.unsubscribe(o);\r
+ } \r
+\r
+private:\r
std::shared_ptr<AVStream> add_video_stream(std::vector<option>& options)\r
{ \r
if(output_format_.vcodec == CODEC_ID_NONE)\r
av_frame->top_field_first = format_desc_.field_mode == core::field_mode::upper;\r
av_frame->pts = frame_number_++;\r
\r
+ event_subject_ << monitor::event("frame") % static_cast<int64_t>(frame_number_)\r
+ % static_cast<int64_t>(std::numeric_limits<int64_t>::max());\r
+\r
AVPacket pkt;\r
av_init_packet(&pkt);\r
pkt.data = nullptr;\r
});\r
}\r
}\r
-\r
- bool send(core::const_frame& frame)\r
- {\r
- auto exception = lock(exception_mutex_, [&]\r
- {\r
- return exception_;\r
- });\r
-\r
- if(exception != nullptr)\r
- std::rethrow_exception(exception);\r
- \r
- executor_.begin_invoke([=]\r
- { \r
- encode(frame);\r
- });\r
- \r
- return true;\r
- }\r
};\r
\r
struct ffmpeg_consumer_proxy : public core::frame_consumer\r
{\r
return 200;\r
}\r
+\r
+ void subscribe(const monitor::observable::observer_ptr& o) override\r
+ {\r
+ consumer_->subscribe(o);\r
+ }\r
+\r
+ void unsubscribe(const monitor::observable::observer_ptr& o) override\r
+ {\r
+ consumer_->unsubscribe(o);\r
+ } \r
}; \r
spl::shared_ptr<core::frame_consumer> create_consumer(const std::vector<std::wstring>& params)\r
{\r
{\r
return 100;\r
}\r
+\r
+ void subscribe(const monitor::observable::observer_ptr& o) override\r
+ {\r
+ }\r
+\r
+ void unsubscribe(const monitor::observable::observer_ptr& o) override\r
+ {\r
+ } \r
};\r
\r
spl::shared_ptr<core::frame_consumer> create_consumer(const std::vector<std::wstring>& params)\r
{\r
return 500;\r
}\r
+\r
+ void subscribe(const monitor::observable::observer_ptr& o) override\r
+ {\r
+ }\r
+\r
+ void unsubscribe(const monitor::observable::observer_ptr& o) override\r
+ {\r
+ } \r
};\r
\r
spl::shared_ptr<core::frame_consumer> create_consumer(const std::vector<std::wstring>& params)\r
{\r
return 600 + (config_.key_only ? 1 : 0);\r
}\r
+\r
+ void subscribe(const monitor::observable::observer_ptr& o) override\r
+ {\r
+ }\r
+\r
+ void unsubscribe(const monitor::observable::observer_ptr& o) override\r
+ {\r
+ } \r
}; \r
\r
spl::shared_ptr<core::frame_consumer> create_consumer(const std::vector<std::wstring>& params)\r
</flash>\r
<channels>\r
<channel>\r
- <video-mode>NTSC</video-mode>\r
+ <video-mode>PAL</video-mode>\r
<consumers>\r
<screen>\r
<device>1</device>\r