bool auto_reset_;\r
\r
impl()\r
+ : auto_reset_(false)\r
{\r
}\r
\r
#include "monitor.h"
-namespace caspar { namespace core {
-
-}}
\ No newline at end of file
+namespace caspar { namespace core { namespace monitor {
+
+/*class in_callers_thread_schedule_group : public Concurrency::ScheduleGroup
+{
+ virtual void ScheduleTask(Concurrency::TaskProc proc, void* data) override
+ {
+ proc(data);
+ }
+
+ virtual unsigned int Id() const override
+ {
+ return 1;
+ }
+
+ virtual unsigned int Reference() override
+ {
+ return 1;
+ }
+
+ virtual unsigned int Release() override
+ {
+ return 1;
+ }
+};
+
+Concurrency::ScheduleGroup& get_in_callers_thread_schedule_group()
+{
+ static in_callers_thread_schedule_group group;
+
+ return group;
+}*/
+
+}}}
#include <string>
#include <vector>
-#include <agents.h>
-
namespace caspar { namespace core { namespace monitor {
typedef boost::variant<bool,
safe_ptr<std::vector<data_t>> data_ptr_;
};
-class subject : public Concurrency::transformer<monitor::message, monitor::message>
+struct sink
+{
+ virtual ~sink() { }
+
+ virtual void propagate(const message& msg) = 0;
+};
+
+class subject : public sink
{
+private:
+ std::weak_ptr<sink> parent_;
+ const std::string path_;
public:
subject(std::string path = "")
- : Concurrency::transformer<monitor::message, monitor::message>([=](const message& msg)
- {
- return msg.propagate(path);
- })
+ : path_(std::move(path))
{
CASPAR_ASSERT(path.empty() || path[0] == '/');
}
- template<typename T>
- subject& operator<<(T&& msg)
+ void attach_parent(const safe_ptr<sink>& parent)
+ {
+ parent_ = parent;
+ }
+
+ void detach_parent()
{
- Concurrency::send(*this, std::forward<T>(msg));
+ parent_.reset();
+ }
+
+ subject& operator<<(const message& msg)
+ {
+ propagate(msg);
+
return *this;
}
-};
-typedef Concurrency::ISource<monitor::message> source;
+ virtual void propagate(const message& msg) override
+ {
+ auto parent = parent_.lock();
+
+ if (parent)
+ parent->propagate(msg.propagate(path_));
+ }
+};
-}}}
\ No newline at end of file
+}}}
return info;\r
}\r
\r
- monitor::source& monitor_output() \r
+ monitor::subject& monitor_output() \r
{\r
return monitor_subject_;\r
}\r
return info;\r
}\r
\r
- monitor::source& monitor_output()\r
+ monitor::subject& monitor_output()\r
{\r
return monitor_subject_;\r
}\r
virtual safe_ptr<frame_producer> get_following_producer() const override {return (*producer_)->get_following_producer();}\r
virtual void set_leading_producer(const safe_ptr<frame_producer>& producer) override {(*producer_)->set_leading_producer(producer);}\r
virtual uint32_t nb_frames() const override {return (*producer_)->nb_frames();}\r
- virtual monitor::source& monitor_output() {return (*producer_)->monitor_output();}\r
+ virtual monitor::subject& monitor_output() {return (*producer_)->monitor_output();}\r
};\r
\r
safe_ptr<core::frame_producer> create_producer_destroy_proxy(safe_ptr<core::frame_producer> producer)\r
virtual safe_ptr<frame_producer> get_following_producer() const override {return (producer_)->get_following_producer();}\r
virtual void set_leading_producer(const safe_ptr<frame_producer>& producer) override {(producer_)->set_leading_producer(producer);}\r
virtual uint32_t nb_frames() const override {return (producer_)->nb_frames();}\r
- virtual monitor::source& monitor_output() {return (producer_)->monitor_output();}\r
+ virtual monitor::subject& monitor_output() {return (producer_)->monitor_output();}\r
};\r
\r
safe_ptr<core::frame_producer> create_producer_print_proxy(safe_ptr<core::frame_producer> producer)\r
info.add(L"type", L"last-frame-producer");\r
return info;\r
}\r
- virtual monitor::source& monitor_output()\r
+ virtual monitor::subject& monitor_output()\r
{\r
static monitor::subject monitor_subject("");\r
return monitor_subject;\r
return info;\r
}\r
\r
- virtual monitor::source& monitor_output()\r
+ virtual monitor::subject& monitor_output()\r
{\r
static monitor::subject monitor_subject("");\r
return monitor_subject;\r
\r
static const safe_ptr<frame_producer>& empty(); // nothrow\r
\r
- virtual monitor::source& monitor_output() = 0;\r
+ virtual monitor::subject& monitor_output() = 0;\r
};\r
\r
safe_ptr<basic_frame> receive_and_follow(safe_ptr<frame_producer>& producer, int hints);\r
int32_t auto_play_delta_;\r
bool is_paused_;\r
int64_t current_frame_age_;\r
- monitor::subject monitor_subject_;\r
+ safe_ptr<monitor::subject> monitor_subject_;\r
\r
public:\r
implementation(int index) \r
, frame_number_(0)\r
, auto_play_delta_(-1)\r
, is_paused_(false)\r
- , monitor_subject_("/layer/" + boost::lexical_cast<std::string>(index))\r
+ , monitor_subject_(make_safe<monitor::subject>("/layer/" + boost::lexical_cast<std::string>(index)))\r
{\r
}\r
\r
{ \r
try\r
{\r
- monitor_subject_ << monitor::message("/paused") % is_paused_;\r
+ *monitor_subject_ << monitor::message("/paused") % is_paused_;\r
\r
if(is_paused_)\r
{\r
\r
void set_foreground(safe_ptr<core::frame_producer> producer)\r
{\r
- foreground_->monitor_output().unlink_target(&monitor_subject_);\r
+ foreground_->monitor_output().detach_parent();\r
foreground_ = producer;\r
- foreground_->monitor_output().link_target(&monitor_subject_);\r
+ foreground_->monitor_output().attach_parent(monitor_subject_);\r
}\r
};\r
\r
boost::unique_future<std::wstring> layer::call(bool foreground, const std::wstring& param){return impl_->call(foreground, param);}\r
boost::property_tree::wptree layer::info() const{return impl_->info();}\r
boost::property_tree::wptree layer::delay_info() const{return impl_->delay_info();}\r
-monitor::source& layer::monitor_output(){return impl_->monitor_subject_;}\r
+monitor::subject& layer::monitor_output(){return *impl_->monitor_subject_;}\r
}}
\ No newline at end of file
boost::property_tree::wptree info() const;\r
boost::property_tree::wptree delay_info() const;\r
\r
- monitor::source& monitor_output();\r
+ monitor::subject& monitor_output();\r
private:\r
struct implementation;\r
safe_ptr<implementation> impl_;\r
return info;
}
- monitor::source& monitor_output()
+ monitor::subject& monitor_output()
{
return monitor_subject_;
}
\r
struct separated_producer : public frame_producer\r
{ \r
- monitor::subject monitor_subject_;\r
- monitor::subject key_monitor_subject_;\r
+ safe_ptr<monitor::subject> monitor_subject_;\r
+ safe_ptr<monitor::subject> key_monitor_subject_;\r
\r
safe_ptr<frame_producer> fill_producer_;\r
safe_ptr<frame_producer> key_producer_;\r
safe_ptr<basic_frame> last_frame_;\r
\r
explicit separated_producer(const safe_ptr<frame_producer>& fill, const safe_ptr<frame_producer>& key) \r
- : monitor_subject_("")\r
- , key_monitor_subject_("/keyer")\r
+ : key_monitor_subject_(make_safe<monitor::subject>("/keyer"))\r
, fill_producer_(fill)\r
, key_producer_(key)\r
, fill_(core::basic_frame::late())\r
, key_(core::basic_frame::late())\r
, last_frame_(core::basic_frame::empty())\r
{\r
- key_monitor_subject_.link_target(&monitor_subject_);\r
+ key_monitor_subject_->attach_parent(monitor_subject_);\r
\r
- key_producer_->monitor_output().link_target(&key_monitor_subject_);\r
- fill_producer_->monitor_output().link_target(&monitor_subject_);\r
+ key_producer_->monitor_output().attach_parent(key_monitor_subject_);\r
+ fill_producer_->monitor_output().attach_parent(monitor_subject_);\r
}\r
\r
// frame_producer\r
return info;\r
}\r
\r
- monitor::source& monitor_output()\r
+ monitor::subject& monitor_output()\r
{\r
- return monitor_subject_;\r
+ return *monitor_subject_;\r
}\r
};\r
\r
// map of layer -> map of tokens (src ref) -> layer_consumer\r
std::map<int, std::map<void*, std::shared_ptr<write_frame_consumer>>> layer_consumers_;\r
\r
- monitor::subject monitor_subject_;\r
+ safe_ptr<monitor::subject> monitor_subject_;\r
\r
executor executor_;\r
\r
: graph_(graph)\r
, format_desc_(format_desc)\r
, target_(target)\r
- , monitor_subject_("/stage")\r
+ , monitor_subject_(make_safe<monitor::subject>("/stage"))\r
, executor_(L"stage")\r
{\r
graph_->set_color("tick-time", diagnostics::color(0.0f, 0.6f, 0.9f, 0.8)); \r
if(it == std::end(layers_))\r
{\r
it = layers_.insert(std::make_pair(index, std::make_shared<layer>(index))).first;\r
- it->second->monitor_output().link_target(&monitor_subject_);\r
+ it->second->monitor_output().attach_parent(monitor_subject_);\r
}\r
return *it->second;\r
}\r
auto other_layers = other_impl->layers_ | boost::adaptors::map_values;\r
\r
BOOST_FOREACH(auto& layer, layers)\r
- layer->monitor_output().unlink_target(&monitor_subject_);\r
+ layer->monitor_output().detach_parent();\r
\r
BOOST_FOREACH(auto& layer, other_layers)\r
- layer->monitor_output().unlink_target(&monitor_subject_);\r
+ layer->monitor_output().attach_parent(monitor_subject_);\r
\r
std::swap(layers_, other_impl->layers_);\r
\r
BOOST_FOREACH(auto& layer, layers)\r
- layer->monitor_output().link_target(&monitor_subject_);\r
+ layer->monitor_output().detach_parent();\r
\r
BOOST_FOREACH(auto& layer, other_layers)\r
- layer->monitor_output().link_target(&monitor_subject_);\r
+ layer->monitor_output().detach_parent();\r
}; \r
\r
executor_.begin_invoke([=]\r
auto& my_layer = get_layer(index);\r
auto& other_layer = other_impl->get_layer(other_index);\r
\r
- my_layer.monitor_output().unlink_target(&monitor_subject_);\r
- other_layer.monitor_output().unlink_target(&other_impl->monitor_subject_);\r
+ my_layer.monitor_output().detach_parent();\r
+ other_layer.monitor_output().attach_parent(other_impl->monitor_subject_);\r
\r
std::swap(my_layer, other_layer);\r
\r
- my_layer.monitor_output().link_target(&monitor_subject_);\r
- other_layer.monitor_output().link_target(&other_impl->monitor_subject_);\r
+ my_layer.monitor_output().detach_parent();\r
+ other_layer.monitor_output().attach_parent(other_impl->monitor_subject_);\r
}; \r
\r
executor_.begin_invoke([=]\r
boost::unique_future<boost::property_tree::wptree> stage::info(int index) const{return impl_->info(index);}\r
boost::unique_future<boost::property_tree::wptree> stage::delay_info() const{return impl_->delay_info();}\r
boost::unique_future<boost::property_tree::wptree> stage::delay_info(int index) const{return impl_->delay_info(index);}\r
-monitor::source& stage::monitor_output(){return impl_->monitor_subject_;}\r
+monitor::subject& stage::monitor_output(){return *impl_->monitor_subject_;}\r
}}
\ No newline at end of file
\r
void set_video_format_desc(const video_format_desc& format_desc);\r
\r
- monitor::source& monitor_output();\r
+ monitor::subject& monitor_output();\r
\r
private:\r
struct implementation;\r
\r
struct transition_producer : public frame_producer\r
{ \r
- monitor::subject monitor_subject_;\r
+ safe_ptr<monitor::subject> monitor_subject_;\r
\r
const field_mode::type mode_;\r
unsigned int current_frame_;\r
, source_producer_(frame_producer::empty())\r
, last_frame_(basic_frame::empty())\r
{\r
- dest->monitor_output().link_target(&monitor_subject_);\r
+ dest->monitor_output().attach_parent(monitor_subject_);\r
}\r
\r
// frame_producer\r
source = source_producer_->last_frame();\r
});\r
\r
- monitor_subject_ << monitor::message("/transition/frame") % static_cast<std::int32_t>(current_frame_) % static_cast<std::int32_t>(info_.duration)\r
- << monitor::message("/transition/type") % [&]() -> std::string\r
+ *monitor_subject_ << monitor::message("/transition/frame") % static_cast<std::int32_t>(current_frame_) % static_cast<std::int32_t>(info_.duration)\r
+ << monitor::message("/transition/type") % [&]() -> std::string\r
{\r
switch(info_.type)\r
{\r
return basic_frame::combine(s_frame, d_frame);\r
}\r
\r
- monitor::source& monitor_output()\r
+ monitor::subject& monitor_output()\r
{\r
- return monitor_subject_;\r
+ return *monitor_subject_;\r
}\r
};\r
\r
const safe_ptr<caspar::core::mixer> mixer_;\r
const safe_ptr<caspar::core::stage> stage_;\r
\r
- monitor::subject monitor_subject_;\r
+ safe_ptr<monitor::subject> monitor_subject_;\r
\r
public:\r
implementation(video_channel& self, int index, const video_format_desc& format_desc, const safe_ptr<ogl_device>& ogl, const channel_layout& audio_channel_layout) \r
, output_(new caspar::core::output(graph_, format_desc, index))\r
, mixer_(new caspar::core::mixer(graph_, output_, format_desc, ogl, audio_channel_layout))\r
, stage_(new caspar::core::stage(graph_, mixer_, format_desc)) \r
- , monitor_subject_("/channel/" + boost::lexical_cast<std::string>(index))\r
+ , monitor_subject_(make_safe<monitor::subject>("/channel/" + boost::lexical_cast<std::string>(index)))\r
{\r
graph_->set_text(print());\r
diagnostics::register_graph(graph_);\r
for(int n = 0; n < std::max(1, env::properties().get(L"configuration.pipeline-tokens", 2)); ++n)\r
stage_->spawn_token();\r
\r
- stage_->monitor_output().link_target(&monitor_subject_);\r
+ stage_->monitor_output().attach_parent(monitor_subject_);\r
\r
CASPAR_LOG(info) << print() << " Successfully Initialized.";\r
}\r
void video_channel::set_video_format_desc(const video_format_desc& format_desc){impl_->set_video_format_desc(format_desc);}\r
boost::property_tree::wptree video_channel::info() const{return impl_->info();}\r
int video_channel::index() const {return impl_->index_;}\r
-monitor::source& video_channel::monitor_output(){return impl_->monitor_subject_;}\r
+monitor::subject& video_channel::monitor_output(){return *impl_->monitor_subject_;}\r
boost::property_tree::wptree video_channel::delay_info() const { return impl_->delay_info(); }\r
}}
\ No newline at end of file
\r
int index() const;\r
\r
- monitor::source& monitor_output();\r
+ monitor::subject& monitor_output();\r
\r
private:\r
struct implementation;\r
return model_name_ + L" [" + boost::lexical_cast<std::wstring>(device_index_) + L"|" + format_desc_.name + L"]";\r
}\r
\r
- core::monitor::source& monitor_output()\r
+ core::monitor::subject& monitor_output()\r
{\r
return monitor_subject_;\r
}\r
return info;\r
}\r
\r
- core::monitor::source& monitor_output()\r
+ core::monitor::subject& monitor_output()\r
{\r
return context_->monitor_output();\r
}\r
frame_buffer_.push(std::make_pair(make_safe_ptr(frame), file_frame_number));\r
}\r
\r
- core::monitor::source& monitor_output()\r
+ core::monitor::subject& monitor_output()\r
{\r
return monitor_subject_;\r
}\r
\r
void STDMETHODCALLTYPE FlashAxContainer::OnReadyStateChange(long newState)\r
{\r
- if(newState == 4)\r
- {\r
+ if (newState == 4)\r
bReadyToRender_ = true;\r
- }\r
- else\r
- bReadyToRender_ = false;\r
}\r
\r
void FlashAxContainer::DestroyAxControl()\r
return L"";\r
}\r
\r
- core::monitor::source& monitor_output()\r
+ core::monitor::subject& monitor_output()\r
{\r
return flash_producer_->monitor_output();\r
}\r
};\r
- \r
-safe_ptr<cg_producer> get_default_cg_producer(\r
+\r
+void with_default_cg_producer(\r
+ std::function<void (safe_ptr<cg_producer>)> command,\r
const safe_ptr<core::video_channel>& video_channel,\r
bool expect_existing,\r
- int render_layer)\r
-{ \r
- auto flash_producer = video_channel->stage()->foreground(render_layer).get();\r
+ int layer_index)\r
+{\r
+ auto flash_producer = video_channel->stage()->foreground(layer_index).get();\r
+ bool was_created = false;\r
\r
try\r
{\r
if (expect_existing)\r
BOOST_THROW_EXCEPTION(caspar_exception() << msg_info(\r
"No flash producer on layer "\r
- + boost::lexical_cast<std::string>(render_layer)));\r
+ + boost::lexical_cast<std::string>(layer_index)));\r
\r
- flash_producer = flash::create_producer(video_channel->mixer(), boost::assign::list_of<std::wstring>()); \r
- video_channel->stage()->load(render_layer, flash_producer); \r
- video_channel->stage()->play(render_layer);\r
+ flash_producer = flash::create_producer(video_channel->mixer(), boost::assign::list_of<std::wstring>());\r
}\r
\r
if (expect_existing && flash_producer->call(L"?").get() == L"0")\r
BOOST_THROW_EXCEPTION(caspar_exception() << msg_info(\r
"No flash player on layer "\r
- + boost::lexical_cast<std::string>(render_layer)));\r
+ + boost::lexical_cast<std::string>(layer_index)));\r
+\r
+ was_created = true;\r
}\r
catch(...)\r
{\r
throw;\r
}\r
\r
- return make_safe<cg_producer>(flash_producer);\r
+ command(make_safe<cg_producer>(flash_producer));\r
+\r
+ if (was_created)\r
+ {\r
+ video_channel->stage()->load(layer_index, flash_producer); \r
+ video_channel->stage()->play(layer_index);\r
+ }\r
+}\r
+ \r
+safe_ptr<cg_producer> get_default_cg_producer(\r
+ const safe_ptr<core::video_channel>& video_channel,\r
+ bool expect_existing,\r
+ int render_layer)\r
+{ \r
+ std::shared_ptr<cg_producer> producer;\r
+\r
+ with_default_cg_producer(\r
+ [&producer](safe_ptr<cg_producer> p)\r
+ {\r
+ producer = p;\r
+ }, video_channel, expect_existing, render_layer);\r
+\r
+ return make_safe_ptr(producer);\r
}\r
\r
safe_ptr<core::frame_producer> create_cg_producer_and_autoplay_file(\r
cg_producer::cg_producer(cg_producer&& other) : impl_(std::move(other.impl_)){}\r
safe_ptr<core::basic_frame> cg_producer::receive(int hints){return impl_->receive(hints);}\r
safe_ptr<core::basic_frame> cg_producer::last_frame() const{return impl_->last_frame();}\r
-void cg_producer::add(int layer, const std::wstring& template_name, bool play_on_load, const std::wstring& startFromLabel, const std::wstring& data){impl_->add(layer, template_name, play_on_load, startFromLabel, data);}\r
+void cg_producer::add(int layer, const std::wstring& template_name, bool play_on_load, const std::wstring& startFromLabel, const std::wstring& data){impl_->add(layer, template_name, play_on_load, startFromLabel, data).wait();}\r
void cg_producer::remove(int layer){impl_->remove(layer);}\r
void cg_producer::play(int layer){impl_->play(layer);}\r
void cg_producer::stop(int layer, unsigned int mix_out_duration){impl_->stop(layer, mix_out_duration);}\r
std::wstring cg_producer::description(int layer){return impl_->timed_description(layer);}\r
std::wstring cg_producer::template_host_info(){return impl_->timed_template_host_info();}\r
boost::property_tree::wptree cg_producer::info() const{return impl_->info();}\r
-core::monitor::source& cg_producer::monitor_output(){return impl_->monitor_output();}\r
+core::monitor::subject& cg_producer::monitor_output(){return impl_->monitor_output();}\r
}}
\ No newline at end of file
#include <boost/thread/future.hpp>\r
\r
#include <string>\r
+#include <functional>\r
\r
namespace caspar {\r
namespace core {\r
std::wstring description(int layer);\r
std::wstring template_host_info();\r
\r
- core::monitor::source& monitor_output();\r
+ core::monitor::subject& monitor_output();\r
\r
private:\r
struct implementation;\r
};\r
safe_ptr<cg_producer> get_default_cg_producer(const safe_ptr<core::video_channel>& video_channel, bool expect_existing, int layer_index = cg_producer::DEFAULT_LAYER);\r
\r
+void with_default_cg_producer(std::function<void (safe_ptr<cg_producer>)> command, const safe_ptr<core::video_channel>& video_channel, bool expect_existing, int layer_index = cg_producer::DEFAULT_LAYER);\r
+\r
safe_ptr<core::frame_producer> create_ct_producer(\r
const safe_ptr<core::frame_factory>& frame_factory,\r
const core::parameters& params);\r
return template_host;\r
}\r
\r
+boost::mutex& get_global_init_destruct_mutex()\r
+{\r
+ static boost::mutex m;\r
+\r
+ return m;\r
+}\r
+\r
class flash_renderer\r
{ \r
struct com_init\r
\r
boost::timer frame_timer_;\r
boost::timer tick_timer_;\r
-\r
- high_prec_timer timer_;\r
\r
public:\r
flash_renderer(const safe_ptr<diagnostics::graph>& graph, const std::shared_ptr<core::frame_factory>& frame_factory, const std::wstring& filename, int width, int height) \r
, filename_(filename)\r
, frame_factory_(frame_factory)\r
, ax_(nullptr)\r
- , head_(core::basic_frame::empty())\r
+ , head_(core::basic_frame::late())\r
, bmp_(width, height)\r
{ \r
graph_->set_color("frame-time", diagnostics::color(0.1f, 1.0f, 0.1f));\r
graph_->set_color("tick-time", diagnostics::color(0.0f, 0.6f, 0.9f));\r
- graph_->set_color("param", diagnostics::color(1.0f, 0.5f, 0.0f)); \r
- graph_->set_color("sync", diagnostics::color(0.8f, 0.3f, 0.2f)); \r
- \r
- if(FAILED(CComObject<caspar::flash::FlashAxContainer>::CreateInstance(&ax_)))\r
- BOOST_THROW_EXCEPTION(caspar_exception() << msg_info(narrow(print()) + " Failed to create FlashAxContainer"));\r
+ graph_->set_color("param", diagnostics::color(1.0f, 0.5f, 0.0f));\r
+ graph_->set_color("buffered", diagnostics::color(0.8f, 0.3f, 0.2f));\r
+\r
+ lock(get_global_init_destruct_mutex(), [this]\r
+ {\r
+\r
+ CoInitialize(nullptr);\r
+\r
+ if(FAILED(CComObject<caspar::flash::FlashAxContainer>::CreateInstance(&ax_)))\r
+ BOOST_THROW_EXCEPTION(caspar_exception() << msg_info(narrow(print()) + " Failed to create FlashAxContainer"));\r
\r
- ax_->set_print([this]{return print();});\r
+ if(FAILED(ax_->CreateAxControl()))\r
+ BOOST_THROW_EXCEPTION(caspar_exception() << msg_info(narrow(print()) + " Failed to Create FlashAxControl"));\r
+ });\r
\r
- if(FAILED(ax_->CreateAxControl()))\r
- BOOST_THROW_EXCEPTION(caspar_exception() << msg_info(narrow(print()) + " Failed to Create FlashAxControl"));\r
+ ax_->set_print([this]{return print();});\r
\r
CComPtr<IShockwaveFlash> spFlash;\r
if(FAILED(ax_->QueryControl(&spFlash)))\r
BOOST_THROW_EXCEPTION(caspar_exception() << msg_info(narrow(print()) + " Failed to Set Scale Mode"));\r
\r
ax_->SetSize(width_, height_); \r
- render_frame(false);\r
+ render_frame();\r
\r
CASPAR_LOG(info) << print() << L" Initialized.";\r
}\r
{ \r
if(ax_)\r
{\r
- ax_->DestroyAxControl();\r
- ax_->Release();\r
+ lock(get_global_init_destruct_mutex(), [this]\r
+ {\r
+ ax_->DestroyAxControl();\r
+ ax_->Release();\r
+ });\r
}\r
graph_->set_value("tick-time", 0.0f);\r
graph_->set_value("frame-time", 0.0f);\r
\r
if(!ax_->FlashCall(param, result))\r
CASPAR_LOG(warning) << print() << L" Flash call failed:" << param;//BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("Flash function call failed.") << arg_name_info("param") << arg_value_info(narrow(param)));\r
- graph_->set_tag("param");\r
+ graph_->set_tag("param");\r
\r
return result;\r
}\r
\r
- safe_ptr<core::basic_frame> render_frame(double sync)\r
+ safe_ptr<core::basic_frame> render_frame()\r
{\r
float frame_time = 1.0f/ax_->GetFPS();\r
\r
graph_->set_value("tick-time", static_cast<float>(tick_timer_.elapsed()/frame_time)*0.5f);\r
tick_timer_.restart();\r
\r
+ if (!ax_->IsReadyToRender())\r
+ return head_;\r
+\r
if(ax_->IsEmpty())\r
- return core::basic_frame::empty(); \r
+ return core::basic_frame::empty();\r
\r
- if(sync > 0.00001) \r
- timer_.tick(frame_time*sync); // This will block the thread.\r
- else\r
- graph_->set_tag("sync");\r
-\r
- graph_->set_value("sync", sync);\r
- \r
frame_timer_.restart();\r
\r
ax_->Tick();\r
std::queue<safe_ptr<core::basic_frame>> frame_buffer_;\r
tbb::concurrent_bounded_queue<safe_ptr<core::basic_frame>> output_buffer_;\r
\r
- mutable tbb::spin_mutex last_frame_mutex_;\r
safe_ptr<core::basic_frame> last_frame_;\r
\r
std::unique_ptr<flash_renderer> renderer_;\r
graph_->set_text(print());\r
diagnostics::register_graph(graph_);\r
\r
- renderer_.reset(new flash_renderer(graph_, frame_factory_, filename_, width_, height_));\r
- has_renderer_ = true;\r
-\r
- while(output_buffer_.size() < buffer_size_)\r
- output_buffer_.push(core::basic_frame::empty());\r
+ has_renderer_ = false;\r
}\r
\r
~flash_producer()\r
{ \r
auto frame = core::basic_frame::late();\r
\r
- if(output_buffer_.try_pop(frame)) \r
- next();\r
+ double buffered = output_buffer_.size();\r
+ auto ratio = buffered / buffer_size_;\r
+ graph_->set_value("buffered", ratio);\r
+\r
+ if(output_buffer_.try_pop(frame))\r
+ last_frame_ = frame;\r
else\r
graph_->set_tag("late-frame");\r
+\r
+ fill_buffer();\r
\r
monitor_subject_ << core::monitor::message("/host/path") % filename_\r
<< core::monitor::message("/host/width") % width_\r
\r
virtual safe_ptr<core::basic_frame> last_frame() const override\r
{\r
- return lock(last_frame_mutex_, [this]\r
- {\r
- return last_frame_;\r
- });\r
+ return last_frame_;\r
} \r
\r
virtual boost::unique_future<std::wstring> call(const std::wstring& param) override\r
{\r
try\r
{\r
- if(!renderer_)\r
+ bool initialize_renderer = !renderer_;\r
+\r
+ if(initialize_renderer)\r
{\r
renderer_.reset(new flash_renderer(graph_, frame_factory_, filename_, width_, height_));\r
\r
- while(output_buffer_.size() < buffer_size_)\r
- output_buffer_.push(core::basic_frame::empty());\r
-\r
has_renderer_ = true;\r
}\r
\r
- return renderer_->call(param); \r
+ std::wstring result = param == L"start_rendering"\r
+ ? L"" : renderer_->call(param);\r
\r
+ if (initialize_renderer)\r
+ {\r
+ do_fill_buffer();\r
+ }\r
+\r
+ return result;\r
//const auto& format_desc = frame_factory_->get_video_format_desc();\r
//if(abs(context_->fps() - format_desc.fps) > 0.01 && abs(context_->fps()/2.0 - format_desc.fps) > 0.01)\r
// CASPAR_LOG(warning) << print() << " Invalid frame-rate: " << context_->fps() << L". Should be either " << format_desc.fps << L" or " << format_desc.fps*2.0 << L".";\r
}\r
\r
return L"";\r
- });\r
+ }, high_priority);\r
}\r
\r
virtual std::wstring print() const override\r
}\r
\r
// flash_producer\r
- \r
- void next()\r
- { \r
+\r
+ void fill_buffer()\r
+ {\r
executor_.begin_invoke([this]\r
{\r
- if(!renderer_)\r
- frame_buffer_.push(core::basic_frame::empty());\r
+ do_fill_buffer();\r
+ });\r
+ }\r
+\r
+ void do_fill_buffer()\r
+ {\r
+ int nothing_rendered = 0;\r
+ const int MAX_NOTHING_RENDERED_RETRIES = 4;\r
+\r
+ auto to_render = buffer_size_ - output_buffer_.size();\r
+ int rendered = 0;\r
+\r
+ while (rendered < to_render)\r
+ {\r
+ bool was_rendered = next();\r
\r
- if(frame_buffer_.empty())\r
+ if (was_rendered)\r
{\r
- auto format_desc = frame_factory_->get_video_format_desc();\r
+ ++rendered;\r
+ }\r
+ else\r
+ {\r
+ if (nothing_rendered++ < MAX_NOTHING_RENDERED_RETRIES)\r
+ {\r
+ // Flash player not ready with first frame, sleep to not busy-loop;\r
+ boost::this_thread::sleep(boost::posix_time::milliseconds(10));\r
+ boost::this_thread::yield();\r
+ }\r
+ else\r
+ return;\r
+ }\r
+\r
+ executor_.yield();\r
+ }\r
+ }\r
+ \r
+ bool next()\r
+ { \r
+ if(!renderer_)\r
+ frame_buffer_.push(core::basic_frame::empty());\r
+\r
+ if(frame_buffer_.empty())\r
+ {\r
+ auto format_desc = frame_factory_->get_video_format_desc();\r
\r
- if(abs(renderer_->fps()/2.0 - format_desc.fps) < 2.0) // flash == 2 * format -> interlace\r
+ if(abs(renderer_->fps()/2.0 - format_desc.fps) < 2.0) // flash == 2 * format -> interlace\r
+ {\r
+ auto frame1 = render_frame();\r
+\r
+ if (frame1 != core::basic_frame::late())\r
{\r
- auto frame1 = render_frame();\r
auto frame2 = render_frame();\r
frame_buffer_.push(core::basic_frame::interlace(frame1, frame2, format_desc.field_mode));\r
}\r
- else if(abs(renderer_->fps() - format_desc.fps/2.0) < 2.0) // format == 2 * flash -> duplicate\r
+ }\r
+ else if(abs(renderer_->fps() - format_desc.fps/2.0) < 2.0) // format == 2 * flash -> duplicate\r
+ {\r
+ auto frame = render_frame();\r
+\r
+ if (frame != core::basic_frame::late())\r
{\r
- auto frame = render_frame();\r
frame_buffer_.push(frame);\r
frame_buffer_.push(frame);\r
}\r
- else //if(abs(renderer_->fps() - format_desc_.fps) < 0.1) // format == flash -> simple\r
- {\r
- auto frame = render_frame();\r
+ }\r
+ else //if(abs(renderer_->fps() - format_desc_.fps) < 0.1) // format == flash -> simple\r
+ {\r
+ auto frame = render_frame();\r
+\r
+ if (frame != core::basic_frame::late())\r
frame_buffer_.push(frame);\r
- }\r
+ }\r
\r
- fps_.fetch_and_store(static_cast<int>(renderer_->fps()*100.0)); \r
- graph_->set_text(print());\r
+ fps_.fetch_and_store(static_cast<int>(renderer_->fps()*100.0)); \r
+ graph_->set_text(print());\r
\r
- if(renderer_->is_empty())\r
- {\r
- renderer_.reset();\r
- has_renderer_ = false;\r
- }\r
+ if(renderer_->is_empty())\r
+ {\r
+ renderer_.reset();\r
+ has_renderer_ = false;\r
}\r
+ }\r
\r
+ if (frame_buffer_.empty())\r
+ {\r
+ return false;\r
+ }\r
+ else\r
+ {\r
output_buffer_.push(std::move(frame_buffer_.front()));\r
frame_buffer_.pop();\r
- });\r
+ return true;\r
+ }\r
}\r
\r
safe_ptr<core::basic_frame> render_frame()\r
{ \r
- double ratio = std::min(1.0, static_cast<double>(output_buffer_.size())/static_cast<double>(std::max(1, buffer_size_ - 1)));\r
- double sync = 2*ratio - ratio*ratio;\r
-\r
- auto frame = renderer_->render_frame(sync);\r
- lock(last_frame_mutex_, [&]\r
- {\r
- last_frame_ = frame;\r
- });\r
- return frame;\r
+ return renderer_->render_frame();\r
}\r
\r
- core::monitor::source& monitor_output()\r
+ core::monitor::subject& monitor_output()\r
{\r
return monitor_subject_;\r
}\r
\r
swf_t::header_t header(filename);\r
\r
- return create_producer_destroy_proxy(\r
- create_producer_print_proxy(\r
- make_safe<flash_producer>(frame_factory, filename, header.frame_width, header.frame_height)));\r
+ auto producer = make_safe<flash_producer>(\r
+ frame_factory, filename, header.frame_width, header.frame_height);\r
+\r
+ producer->call(L"start_rendering").get();\r
+\r
+ return create_producer_destroy_proxy(create_producer_print_proxy(producer));\r
}\r
\r
std::wstring find_template(const std::wstring& template_name)\r
return info;\r
}\r
\r
- core::monitor::source& monitor_output()\r
+ core::monitor::subject& monitor_output()\r
{\r
return monitor_subject_;\r
}\r
}\r
}\r
\r
- core::monitor::source& monitor_output()\r
+ core::monitor::subject& monitor_output()\r
{\r
return monitor_subject_;\r
}\r
std::wstring filename = _parameters[2];\r
filename.append(extension);\r
\r
- flash::get_default_cg_producer(safe_ptr<core::video_channel>(GetChannel()), false, GetLayerIndex(flash::cg_producer::DEFAULT_LAYER))->add(layer, filename, bDoStart, label, (pDataString!=0) ? pDataString : TEXT(""));\r
+ flash::with_default_cg_producer(\r
+ [&](safe_ptr<flash::cg_producer> producer)\r
+ {\r
+ producer->add(layer, filename, bDoStart, label, (pDataString!=0) ? pDataString : TEXT(""));\r
+ },\r
+ safe_ptr<core::video_channel>(GetChannel()), false, GetLayerIndex(flash::cg_producer::DEFAULT_LAYER));\r
SetReplyString(TEXT("202 CG OK\r\n"));\r
}\r
else\r
#include "client.h"
-#include "oscpack/oscOutboundPacketStream.h"
+#include "oscpack/OscOutboundPacketStream.h"
+#include "oscpack/OscHostEndianness.h"
#include <common/utility/string.h>
#include <common/exception/win32_exception.h>
+#include <common/memory/endian.h>
+
+#include <core/monitor/monitor.h>
#include <functional>
#include <vector>
+#include <unordered_map>
#include <boost/asio.hpp>
#include <boost/foreach.hpp>
#include <boost/bind.hpp>
+#include <boost/thread.hpp>
#include <tbb/spin_mutex.h>
+#include <tbb/cache_aligned_allocator.h>
using namespace boost::asio::ip;
namespace caspar { namespace protocol { namespace osc {
+template<typename T>
+struct no_init_proxy
+{
+ T value;
+
+ no_init_proxy()
+ {
+ static_assert(sizeof(no_init_proxy) == sizeof(T), "invalid size");
+ static_assert(__alignof(no_init_proxy) == __alignof(T), "invalid alignment");
+ }
+};
+
+typedef std::vector<no_init_proxy<char>, tbb::cache_aligned_allocator<no_init_proxy<char>>> byte_vector;
+
template<typename T>
struct param_visitor : public boost::static_visitor<void>
{
void operator()(const std::vector<int8_t>& value) {o << ::osc::Blob(value.data(), static_cast<unsigned long>(value.size()));}
};
-std::vector<char> write_osc_event(const core::monitor::message& e)
-{
- std::array<char, 4096> buffer;
- ::osc::OutboundPacketStream o(buffer.data(), static_cast<unsigned long>(buffer.size()));
+void write_osc_event(byte_vector& destination, const core::monitor::message& e)
+{
+ destination.resize(4096);
- o << ::osc::BeginMessage(e.path().c_str());
+ ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
+ o << ::osc::BeginMessage(e.path().c_str());
- param_visitor<decltype(o)> pd_visitor(o);
- BOOST_FOREACH(auto data, e.data())
- boost::apply_visitor(pd_visitor, data);
+ param_visitor<decltype(o)> param_visitor(o);
+ BOOST_FOREACH(const auto& data, e.data())
+ boost::apply_visitor(param_visitor, data);
- o << ::osc::EndMessage;
+ o << ::osc::EndMessage;
- return std::vector<char>(o.Data(), o.Data() + o.Size());
+ destination.resize(o.Size());
}
-struct client::impl : public std::enable_shared_from_this<client::impl>
+byte_vector write_osc_bundle_start()
{
- tbb::spin_mutex endpoints_mutex_;
- std::map<udp::endpoint, int> reference_counts_by_endpoint_;
- udp::socket socket_;
+ byte_vector destination;
+ destination.resize(16);
+
+ ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
+ o << ::osc::BeginBundle();
+
+ destination.resize(o.Size());
+
+ return destination;
+}
+
+void write_osc_bundle_element_start(byte_vector& destination, const byte_vector& message)
+{
+ destination.resize(4);
+
+ int32_t* bundle_element_size = reinterpret_cast<int32_t*>(destination.data());
- Concurrency::call<core::monitor::message> on_next_;
+#ifdef OSC_HOST_LITTLE_ENDIAN
+ *bundle_element_size = swap_byte_order(static_cast<int32_t>(message.size()));
+#else
+ *bundle_element_size = static_cast<int32_t>(bundle.size());
+#endif
+}
+
+struct client::impl : public std::enable_shared_from_this<client::impl>, core::monitor::sink
+{
+ udp::socket socket_;
+ tbb::spin_mutex endpoints_mutex_;
+ std::map<udp::endpoint, int> reference_counts_by_endpoint_;
+
+ std::unordered_map<std::string, byte_vector> updates_;
+ boost::mutex updates_mutex_;
+ boost::condition_variable updates_cond_;
+
+ tbb::atomic<bool> is_running_;
+
+ boost::thread thread_;
public:
- impl(
- boost::asio::io_service& service,
- Concurrency::ISource<core::monitor::message>& source)
+ impl(boost::asio::io_service& service)
: socket_(service, udp::v4())
- , on_next_([this](const core::monitor::message& msg) { on_next(msg); })
+ , thread_(boost::bind(&impl::run, this))
+ {
+ }
+
+ ~impl()
{
- source.link_target(&on_next_);
+ is_running_ = false;
+
+ updates_cond_.notify_one();
+
+ thread_.join();
}
std::shared_ptr<void> get_subscription_token(
self.reference_counts_by_endpoint_.erase(endpoint);
});
}
-
- void on_next(const core::monitor::message& msg)
+private:
+ void propagate(const core::monitor::message& msg)
{
- win32_exception::ensure_handler_installed_for_thread("agents-thread");
- auto data_ptr = make_safe<std::vector<char>>(write_osc_event(msg));
-
- tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
+ boost::lock_guard<boost::mutex> lock(updates_mutex_);
- BOOST_FOREACH(auto& elem, reference_counts_by_endpoint_)
+ try
{
- auto& endpoint = elem.first;
-
- // TODO: We seem to be lucky here, because according to asio
- // documentation only one async operation can be "in flight"
- // at any given point in time for a socket. This somehow seems
- // to work though in the case of UDP and Windows.
- socket_.async_send_to(
- boost::asio::buffer(*data_ptr),
- endpoint,
- boost::bind(
- &impl::handle_send_to,
- this,
- data_ptr, // The data_ptr needs to live
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
+ write_osc_event(updates_[msg.path()], msg);
}
+ catch(...)
+ {
+ CASPAR_LOG_CURRENT_EXCEPTION();
+ updates_.erase(msg.path());
+ }
+
+ updates_cond_.notify_one();
+ }
+
+ template<typename T>
+ void do_send(
+ const T& buffers, const std::vector<udp::endpoint>& destinations)
+ {
+ boost::system::error_code ec;
+
+ BOOST_FOREACH(const auto& endpoint, destinations)
+ socket_.send_to(buffers, endpoint, 0, ec);
}
- void handle_send_to(
- const safe_ptr<std::vector<char>>& /* sent_buffer */,
- const boost::system::error_code& /*error*/,
- size_t /*bytes_sent*/)
+ void run()
{
+ // http://stackoverflow.com/questions/14993000/the-most-reliable-and-efficient-udp-packet-size
+ const int SAFE_DATAGRAM_SIZE = 508;
+
+ try
+ {
+ is_running_ = true;
+
+ std::unordered_map<std::string, byte_vector> updates;
+ std::vector<udp::endpoint> destinations;
+ const byte_vector bundle_header = write_osc_bundle_start();
+ std::vector<byte_vector> element_headers;
+
+ while (is_running_)
+ {
+ updates.clear();
+ destinations.clear();
+
+ {
+ boost::unique_lock<boost::mutex> cond_lock(updates_mutex_);
+
+ if (updates_.empty())
+ updates_cond_.wait(cond_lock);
+
+ std::swap(updates, updates_);
+ }
+
+ {
+ tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
+
+ BOOST_FOREACH(const auto& endpoint, reference_counts_by_endpoint_)
+ destinations.push_back(endpoint.first);
+ }
+
+ if (destinations.empty())
+ continue;
+
+ std::vector<boost::asio::const_buffers_1> buffers;
+ element_headers.resize(
+ std::max(element_headers.size(), updates.size()));
+
+ int i = 0;
+ int datagram_size = bundle_header.size();
+ buffers.push_back(boost::asio::buffer(bundle_header));
+
+ BOOST_FOREACH(const auto& slot, updates)
+ {
+ write_osc_bundle_element_start(element_headers[i], slot.second);
+ const auto& headers = element_headers;
+
+ auto size_of_element = headers[i].size() + slot.second.size();
+
+ if (datagram_size + size_of_element >= SAFE_DATAGRAM_SIZE)
+ {
+ do_send(buffers, destinations);
+ buffers.clear();
+ buffers.push_back(boost::asio::buffer(bundle_header));
+ datagram_size = bundle_header.size();
+ }
+
+ buffers.push_back(boost::asio::buffer(headers[i]));
+ buffers.push_back(boost::asio::buffer(slot.second));
+
+ datagram_size += size_of_element;
+ ++i;
+ }
+
+ if (!buffers.empty())
+ do_send(buffers, destinations);
+ }
+ }
+ catch (...)
+ {
+ CASPAR_LOG_CURRENT_EXCEPTION();
+ }
}
};
-client::client(
- boost::asio::io_service& service,
- Concurrency::ISource<core::monitor::message>& source)
- : impl_(new impl(service, source))
+client::client(boost::asio::io_service& service)
+ : impl_(new impl(service))
{
}
return impl_->get_subscription_token(endpoint);
}
+safe_ptr<core::monitor::sink> client::sink()
+{
+ return impl_;
+}
+
}}}
// Constructors
- client(
- boost::asio::io_service& service,
- Concurrency::ISource<core::monitor::message>& source);
+ client(boost::asio::io_service& service);
client(client&&);
// Properties
+ safe_ptr<core::monitor::sink> sink();
private:
struct impl;
- std::shared_ptr<impl> impl_;
+ safe_ptr<impl> impl_;
};
}}}
struct server::implementation : boost::noncopyable\r
{\r
protocol::asio::io_service_manager io_service_manager_;\r
- core::monitor::subject monitor_subject_;\r
+ safe_ptr<core::monitor::subject> monitor_subject_;\r
boost::promise<bool>& shutdown_server_now_;\r
safe_ptr<ogl_device> ogl_;\r
std::vector<safe_ptr<IO::AsyncEventServer>> async_servers_; \r
implementation(boost::promise<bool>& shutdown_server_now)\r
: shutdown_server_now_(shutdown_server_now)\r
, ogl_(ogl_device::create())\r
- , osc_client_(io_service_manager_.service(), monitor_subject_)\r
+ , osc_client_(io_service_manager_.service())\r
{\r
setup_audio(env::properties());\r
\r
\r
channels_.push_back(make_safe<video_channel>(channels_.size()+1, format_desc, ogl_, audio_channel_layout));\r
\r
- channels_.back()->monitor_output().link_target(&monitor_subject_);\r
+ channels_.back()->monitor_output().attach_parent(monitor_subject_);\r
channels_.back()->mixer()->set_straight_alpha_output(\r
xml_channel.second.get(L"straight-alpha-output", false));\r
\r
{ \r
using boost::property_tree::wptree;\r
using namespace boost::asio::ip;\r
+\r
+ monitor_subject_->attach_parent(osc_client_.sink());\r
\r
auto default_port =\r
pt.get<unsigned short>(L"configuration.osc.default-port", 6250);\r
return impl_->thumbnail_generator_;\r
}\r
\r
-core::monitor::source& server::monitor_output()\r
+core::monitor::subject& server::monitor_output()\r
{\r
- return impl_->monitor_subject_;\r
+ return *impl_->monitor_subject_;\r
}\r
\r
}
\ No newline at end of file
const std::vector<safe_ptr<core::video_channel>> get_channels() const;\r
std::shared_ptr<core::thumbnail_generator> get_thumbnail_generator() const;\r
\r
- core::monitor::source& monitor_output();\r
+ core::monitor::subject& monitor_output();\r
\r
private:\r
struct implementation;\r