#include "except.h"
+#include <boost/thread.hpp>
+
#include "os/windows/windows.h"
-namespace caspar {
+namespace caspar { namespace detail {
+
+typedef struct tagTHREADNAME_INFO
+{
+ DWORD dwType; // must be 0x1000
+ LPCSTR szName; // pointer to name (in user addr space)
+ DWORD dwThreadID; // thread ID (-1=caller thread)
+ DWORD dwFlags; // reserved for future use, must be zero
+} THREADNAME_INFO;
+
+inline void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName)
+{
+ THREADNAME_INFO info;
+ {
+ info.dwType = 0x1000;
+ info.szName = szThreadName;
+ info.dwThreadID = dwThreadID;
+ info.dwFlags = 0;
+ }
+ __try
+ {
+ RaiseException( 0x406D1388, 0, sizeof(info)/sizeof(ULONG_PTR), (ULONG_PTR*)&info );
+ }
+ __except (EXCEPTION_CONTINUE_EXECUTION){}
+}
+
+} // namespace detail
+
+bool& installed_for_thread()
+{
+ static boost::thread_specific_ptr<bool> installed;
+
+ auto for_thread = installed.get();
+ if (!for_thread)
+ {
+ for_thread = new bool(false);
+ installed.reset(for_thread);
+ }
+
+ return *for_thread;
+}
+
void win32_exception::install_handler()
{
//#ifndef _DEBUG
_set_se_translator(win32_exception::Handler);
+ installed_for_thread() = true;
//#endif
}
+void win32_exception::ensure_handler_installed_for_thread(
+ const char* thread_description)
+{
+ if (!installed_for_thread())
+ {
+ install_handler();
+
+ if (thread_description)
+ detail::SetThreadName(GetCurrentThreadId(), thread_description);
+ }
+}
+
void win32_exception::Handler(unsigned int errorCode, EXCEPTION_POINTERS* pInfo) {
switch(errorCode)
{
public:
typedef const void* address;
static void install_handler();
+ static void ensure_handler_installed_for_thread(
+ const char* thread_description = nullptr);
address location() const { return location_; }
unsigned int error_code() const { return errorCode_; }
bool has_synchronization_clock() const override {return consumer_->has_synchronization_clock();}
int buffer_depth() const override {return consumer_->buffer_depth();}
int index() const override {return consumer_->index();}
- monitor::source& monitor_output() override {return consumer_->monitor_output();}
+ monitor::subject& monitor_output() override {return consumer_->monitor_output();}
};
class print_consumer_proxy : public frame_consumer
bool has_synchronization_clock() const override {return consumer_->has_synchronization_clock();}
int buffer_depth() const override {return consumer_->buffer_depth();}
int index() const override {return consumer_->index();}
- monitor::source& monitor_output() override {return consumer_->monitor_output();}
+ monitor::subject& monitor_output() override {return consumer_->monitor_output();}
};
class recover_consumer_proxy : public frame_consumer
bool has_synchronization_clock() const override {return consumer_->has_synchronization_clock();}
int buffer_depth() const override {return consumer_->buffer_depth();}
int index() const override {return consumer_->index();}
- monitor::source& monitor_output() override {return consumer_->monitor_output();}
+ monitor::subject& monitor_output() override {return consumer_->monitor_output();}
};
// This class is used to guarantee that audio cadence is correct. This is important for NTSC audio.
bool has_synchronization_clock() const override {return consumer_->has_synchronization_clock();}
int buffer_depth() const override {return consumer_->buffer_depth();}
int index() const override {return consumer_->index();}
- monitor::source& monitor_output() override {return consumer_->monitor_output();}
+ monitor::subject& monitor_output() override {return consumer_->monitor_output();}
};
spl::shared_ptr<core::frame_consumer> create_consumer(const std::vector<std::wstring>& params)
bool has_synchronization_clock() const override {return false;}
int buffer_depth() const override {return 0;};
virtual int index() const{return -1;}
- monitor::source& monitor_output() override {static monitor::subject monitor_subject(""); return monitor_subject;}
+ monitor::subject& monitor_output() override {static monitor::subject monitor_subject(""); return monitor_subject;}
boost::property_tree::wptree info() const override
{
boost::property_tree::wptree info;
// monitor::observable
- virtual monitor::source& monitor_output() = 0;
+ virtual monitor::subject& monitor_output() = 0;
// Properties
struct output::impl
{
spl::shared_ptr<diagnostics::graph> graph_;
- monitor::subject monitor_subject_;
+ spl::shared_ptr<monitor::subject> monitor_subject_;
const int channel_index_;
video_format_desc format_desc_;
std::map<int, port> ports_;
public:
impl(spl::shared_ptr<diagnostics::graph> graph, const video_format_desc& format_desc, int channel_index)
: graph_(std::move(graph))
- , monitor_subject_("/output")
+ , monitor_subject_(spl::make_shared<monitor::subject>("/output"))
, channel_index_(channel_index)
, format_desc_(format_desc)
, executor_(L"output")
executor_.begin_invoke([this, index, consumer]
{
port p(index, channel_index_, std::move(consumer));
- p.monitor_output().link_target(&monitor_subject_);
+ p.monitor_output().attach_parent(monitor_subject_);
ports_.insert(std::make_pair(index, std::move(p)));
}, task_priority::high_priority);
}
void output::remove(const spl::shared_ptr<frame_consumer>& consumer){impl_->remove(consumer);}
boost::unique_future<boost::property_tree::wptree> output::info() const{return impl_->info();}
void output::operator()(const_frame frame, const video_format_desc& format_desc){(*impl_)(std::move(frame), format_desc);}
-monitor::source& output::monitor_output() {return impl_->monitor_subject_;}
+monitor::subject& output::monitor_output() {return *impl_->monitor_subject_;}
}}
\ No newline at end of file
void remove(const spl::shared_ptr<class frame_consumer>& consumer);
void remove(int index);
- monitor::source& monitor_output();
+ monitor::subject& monitor_output();
// Properties
struct port::impl
{
- monitor::subject monitor_subject_;
+ spl::shared_ptr<monitor::subject> monitor_subject_;
std::shared_ptr<frame_consumer> consumer_;
int index_;
int channel_index_;
public:
impl(int index, int channel_index, spl::shared_ptr<frame_consumer> consumer)
- : monitor_subject_("/port" + boost::lexical_cast<std::string>(index))
+ : monitor_subject_(spl::make_shared<monitor::subject>(
+ "/port" + boost::lexical_cast<std::string>(index)))
, consumer_(std::move(consumer))
, index_(index)
, channel_index_(channel_index)
{
- consumer_->monitor_output().link_target(&monitor_subject_);
+ consumer_->monitor_output().attach_parent(monitor_subject_);
}
void video_format_desc(const struct video_format_desc& format_desc)
boost::unique_future<bool> send(const_frame frame)
{
- monitor_subject_ << monitor::message("/type") % consumer_->name();
+ *monitor_subject_ << monitor::message("/type") % consumer_->name();
return consumer_->send(std::move(frame));
}
std::wstring print() const
port::~port(){}
port& port::operator=(port&& other){impl_ = std::move(other.impl_); return *this;}
boost::unique_future<bool> port::send(const_frame frame){return impl_->send(std::move(frame));}
-monitor::source& port::monitor_output() {return impl_->monitor_subject_;}
+monitor::subject& port::monitor_output() {return *impl_->monitor_subject_;}
void port::video_format_desc(const struct video_format_desc& format_desc){impl_->video_format_desc(format_desc);}
int port::buffer_depth() const{return impl_->buffer_depth();}
std::wstring port::print() const{ return impl_->print();}
boost::unique_future<bool> send(class const_frame frame);
- monitor::source& monitor_output();
+ monitor::subject& monitor_output();
// Properties
+/*
+* Copyright 2013 Sveriges Television AB http://casparcg.com/
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Robert Nagy, ronag89@gmail.com
+*/
#include "../StdAfx.h"
#include "monitor.h"
-#include <utility>
+namespace caspar { namespace core { namespace monitor {
-namespace caspar { namespace monitor {
-
-}}
\ No newline at end of file
+/*class in_callers_thread_schedule_group : public Concurrency::ScheduleGroup
+{
+ virtual void ScheduleTask(Concurrency::TaskProc proc, void* data) override
+ {
+ proc(data);
+ }
+
+ virtual unsigned int Id() const override
+ {
+ return 1;
+ }
+
+ virtual unsigned int Reference() override
+ {
+ return 1;
+ }
+
+ virtual unsigned int Release() override
+ {
+ return 1;
+ }
+};
+
+Concurrency::ScheduleGroup& get_in_callers_thread_schedule_group()
+{
+ static in_callers_thread_schedule_group group;
+
+ return group;
+}*/
+
+}}}
+/*
+* Copyright 2013 Sveriges Television AB http://casparcg.com/
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Robert Nagy, ronag89@gmail.com
+*/
#pragma once
#include <common/memory.h>
#include <string>
#include <vector>
-#include <agents.h>
-
-namespace caspar { namespace monitor {
-
+namespace caspar { namespace core { namespace monitor {
+
typedef boost::variant<bool,
std::int32_t,
std::int64_t,
{
CASPAR_ASSERT(path.empty() || path[0] == '/');
}
-
+
message(std::string path, spl::shared_ptr<std::vector<data_t>> data_ptr)
: path_(std::move(path))
, data_ptr_(std::move(data_ptr))
spl::shared_ptr<std::vector<data_t>> data_ptr_;
};
-class subject : public Concurrency::transformer<monitor::message, monitor::message>
+struct sink
{
+ virtual ~sink() { }
+
+ virtual void propagate(const message& msg) = 0;
+};
+
+class subject : public sink
+{
+private:
+ std::weak_ptr<sink> parent_;
+ const std::string path_;
public:
subject(std::string path = "")
- : Concurrency::transformer<monitor::message, monitor::message>([=](const message& msg)
- {
- return msg.propagate(path);
- })
+ : path_(std::move(path))
{
CASPAR_ASSERT(path.empty() || path[0] == '/');
}
- template<typename T>
- subject& operator<<(T&& msg)
+ void attach_parent(spl::shared_ptr<sink> parent)
{
- Concurrency::send(*this, std::forward<T>(msg));
+ parent_ = std::move(parent);
+ }
+
+ void detach_parent()
+ {
+ parent_.reset();
+ }
+
+ subject& operator<<(const message& msg)
+ {
+ propagate(msg);
+
return *this;
}
-};
-typedef Concurrency::ISource<monitor::message> source;
+ virtual void propagate(const message& msg) override
+ {
+ auto parent = parent_.lock();
+ if (parent)
+ parent->propagate(msg.propagate(path_));
+ }
+};
-}}
\ No newline at end of file
+}}}
return info;
}
- monitor::source& monitor_output() override {return monitor_subject_;}
+ monitor::subject& monitor_output() override {return monitor_subject_;}
};
std::wstring get_hex_color(const std::wstring& str)
return info;
}
- monitor::source& monitor_output()
+ monitor::subject& monitor_output()
{
return monitor_subject_;
}
void paused(bool value) override{}
uint32_t nb_frames() const override {return 0;}
std::wstring print() const override { return L"empty";}
- monitor::source& monitor_output() override {static monitor::subject monitor_subject(""); return monitor_subject;}
+ monitor::subject& monitor_output() override {static monitor::subject monitor_subject(""); return monitor_subject;}
std::wstring name() const override {return L"empty";}
uint32_t frame_number() const override {return 0;}
boost::unique_future<std::wstring> call(const std::vector<std::wstring>& params) override{CASPAR_THROW_EXCEPTION(not_supported());}
uint32_t nb_frames() const override {return producer_->nb_frames();}
class draw_frame last_frame() {return producer_->last_frame();}
draw_frame create_thumbnail_frame() {return producer_->create_thumbnail_frame();}
- monitor::source& monitor_output() override {return producer_->monitor_output();}
+ monitor::subject& monitor_output() override {return producer_->monitor_output();}
bool collides(double x, double y) {return producer_->collides(x, y);}
void on_interaction(const interaction_event::ptr& event) {return producer_->on_interaction(event);}
constraints& pixel_constraints() override {return producer_->pixel_constraints();}
// monitor::observable
- virtual monitor::source& monitor_output() = 0;
+ virtual monitor::subject& monitor_output() = 0;
// interaction_sink
virtual void on_interaction(const interaction_event::ptr& event) override { }
struct layer::impl
{
- monitor::subject monitor_subject_;
+ spl::shared_ptr<monitor::subject> monitor_subject_;
spl::shared_ptr<frame_producer> foreground_;
spl::shared_ptr<frame_producer> background_;
boost::optional<int32_t> auto_play_delta_;
+ bool is_paused_;
public:
impl(int index)
- : monitor_subject_("/layer/" + boost::lexical_cast<std::string>(index))
+ : monitor_subject_(spl::make_shared<monitor::subject>(
+ "/layer/" + boost::lexical_cast<std::string>(index)))
// , foreground_event_subject_("")
// , background_event_subject_("background")
, foreground_(frame_producer::empty())
, background_(frame_producer::empty())
+ , is_paused_(false)
{
// foreground_event_subject_.subscribe(event_subject_);
// background_event_subject_.subscribe(event_subject_);
void set_foreground(spl::shared_ptr<frame_producer> producer)
{
- foreground_->monitor_output().unlink_target(&monitor_subject_);
+ foreground_->monitor_output().detach_parent();
foreground_ = std::move(producer);
- foreground_->monitor_output().link_target(&monitor_subject_);
+ foreground_->monitor_output().attach_parent(monitor_subject_);
}
void pause()
{
foreground_->paused(true);
+ is_paused_ = true;
}
void load(spl::shared_ptr<frame_producer> producer, bool preview, const boost::optional<int32_t>& auto_play_delta)
{
play();
foreground_->paused(true);
+ is_paused_ = true;
}
if(auto_play_delta_ && foreground_ == frame_producer::empty())
}
foreground_->paused(false);
+ is_paused_ = false;
}
void stop()
{
try
{
+ *monitor_subject_ << monitor::message("/paused") % is_paused_;
+
auto frame = foreground_->receive();
if(frame == core::draw_frame::late())
spl::shared_ptr<frame_producer> layer::foreground() const { return impl_->foreground_;}
spl::shared_ptr<frame_producer> layer::background() const { return impl_->background_;}
boost::property_tree::wptree layer::info() const{return impl_->info();}
-monitor::source& layer::monitor_output() {return impl_->monitor_subject_;}
+monitor::subject& layer::monitor_output() {return *impl_->monitor_subject_;}
void layer::on_interaction(const interaction_event::ptr& event) { impl_->on_interaction(event); }
bool layer::collides(double x, double y) const { return impl_->collides(x, y); }
}}
\ No newline at end of file
// monitor::observable
- monitor::source& monitor_output();
+ monitor::subject& monitor_output();
// interaction_sink
return info;
}
- monitor::source& monitor_output()
+ monitor::subject& monitor_output()
{
return monitor_subject_;
}
return info;
}
-monitor::source& hotswap_producer::monitor_output()
+monitor::subject& hotswap_producer::monitor_output()
{
return impl_->monitor_subject;
}
std::wstring print() const override;
std::wstring name() const override;
boost::property_tree::wptree info() const override;
- monitor::source& monitor_output();
+ monitor::subject& monitor_output();
binding<std::shared_ptr<frame_producer>>& producer();
private:
return info;
}
- monitor::source& monitor_output()
+ monitor::subject& monitor_output()
{
return monitor_subject_;
}
return impl_->call(params);
}
-monitor::source& scene_producer::monitor_output()
+monitor::subject& scene_producer::monitor_output()
{
return impl_->monitor_output();
}
std::wstring name() const override;
boost::unique_future<std::wstring> call(const std::vector<std::wstring>& params) override;
boost::property_tree::wptree info() const override;
- monitor::source& monitor_output();
+ monitor::subject& monitor_output();
layer& create_layer(
const spl::shared_ptr<frame_producer>& producer, int x, int y, const std::wstring& name);
class separated_producer : public frame_producer_base
{
- monitor::subject monitor_subject_;
- monitor::subject key_monitor_subject_;
+ spl::shared_ptr<monitor::subject> monitor_subject_;
+ spl::shared_ptr<monitor::subject> key_monitor_subject_;
spl::shared_ptr<frame_producer> fill_producer_;
spl::shared_ptr<frame_producer> key_producer_;
public:
explicit separated_producer(const spl::shared_ptr<frame_producer>& fill, const spl::shared_ptr<frame_producer>& key)
- : monitor_subject_("")
- , key_monitor_subject_("/keyer")
+ : key_monitor_subject_(spl::make_shared<monitor::subject>("/keyer"))
, fill_producer_(fill)
, key_producer_(key)
, fill_(core::draw_frame::late())
{
CASPAR_LOG(info) << print() << L" Initialized";
- key_monitor_subject_.link_target(&monitor_subject_);
+ key_monitor_subject_->attach_parent(monitor_subject_);
- key_producer_->monitor_output().link_target(&key_monitor_subject_);
- fill_producer_->monitor_output().link_target(&monitor_subject_);
+ key_producer_->monitor_output().attach_parent(key_monitor_subject_);
+ fill_producer_->monitor_output().attach_parent(monitor_subject_);
}
// frame_producer
return fill_producer_->info();;
}
- monitor::source& monitor_output() { return monitor_subject_; }
+ monitor::subject& monitor_output() { return *monitor_subject_; }
};
spl::shared_ptr<frame_producer> create_separated_producer(const spl::shared_ptr<frame_producer>& fill, const spl::shared_ptr<frame_producer>& key)
struct stage::impl : public std::enable_shared_from_this<impl>
{
spl::shared_ptr<diagnostics::graph> graph_;
- monitor::subject monitor_subject_;
+ spl::shared_ptr<monitor::subject> monitor_subject_;
//reactive::basic_subject<std::map<int, class draw_frame>> frames_subject_;
std::map<int, layer> layers_;
std::map<int, tweened_transform> tweens_;
public:
impl(spl::shared_ptr<diagnostics::graph> graph)
: graph_(std::move(graph))
- , monitor_subject_("/stage")
+ , monitor_subject_(spl::make_shared<monitor::subject>("/stage"))
, aggregator_([=] (double x, double y) { return collission_detect(x, y); })
, executor_(L"stage")
{
//frames_subject_ << frames;
graph_->set_value("produce-time", frame_timer.elapsed()*format_desc.fps*0.5);
- monitor_subject_ << monitor::message("/profiler/time") % frame_timer.elapsed() % (1.0/format_desc.fps);
+ *monitor_subject_ << monitor::message("/profiler/time") % frame_timer.elapsed() % (1.0/format_desc.fps);
return frames;
}
if(it == std::end(layers_))
{
it = layers_.insert(std::make_pair(index, layer(index))).first;
- it->second.monitor_output().link_target(&monitor_subject_);
+ it->second.monitor_output().attach_parent(monitor_subject_);
}
return it->second;
}
auto other_layers = other_impl->layers_ | boost::adaptors::map_values;
BOOST_FOREACH(auto& layer, layers)
- layer.monitor_output().unlink_target(&monitor_subject_);
+ layer.monitor_output().detach_parent();
BOOST_FOREACH(auto& layer, other_layers)
- layer.monitor_output().unlink_target(&monitor_subject_);
+ layer.monitor_output().detach_parent();
std::swap(layers_, other_impl->layers_);
BOOST_FOREACH(auto& layer, layers)
- layer.monitor_output().link_target(&monitor_subject_);
+ layer.monitor_output().attach_parent(monitor_subject_);
BOOST_FOREACH(auto& layer, other_layers)
- layer.monitor_output().link_target(&monitor_subject_);
+ layer.monitor_output().attach_parent(monitor_subject_);
};
return executor_.begin_invoke([=]
auto& my_layer = get_layer(index);
auto& other_layer = other_impl->get_layer(other_index);
- my_layer.monitor_output().unlink_target(&monitor_subject_);
- other_layer.monitor_output().unlink_target(&other_impl->monitor_subject_);
+ my_layer.monitor_output().detach_parent();
+ other_layer.monitor_output().detach_parent();
std::swap(my_layer, other_layer);
- my_layer.monitor_output().link_target(&monitor_subject_);
- other_layer.monitor_output().link_target(&other_impl->monitor_subject_);
+ my_layer.monitor_output().attach_parent(monitor_subject_);
+ other_layer.monitor_output().attach_parent(other_impl->monitor_subject_);
};
return executor_.begin_invoke([=]
boost::unique_future<boost::property_tree::wptree> stage::info() const{return impl_->info();}
boost::unique_future<boost::property_tree::wptree> stage::info(int index) const{return impl_->info(index);}
std::map<int, class draw_frame> stage::operator()(const video_format_desc& format_desc){return (*impl_)(format_desc);}
-monitor::source& stage::monitor_output(){return impl_->monitor_subject_;}
+monitor::subject& stage::monitor_output(){return *impl_->monitor_subject_;}
//void stage::subscribe(const frame_observable::observer_ptr& o) {impl_->frames_subject_.subscribe(o);}
//void stage::unsubscribe(const frame_observable::observer_ptr& o) {impl_->frames_subject_.unsubscribe(o);}
void stage::on_interaction(const interaction_event::ptr& event) { impl_->on_interaction(event); }
boost::unique_future<void> swap_layer(int index, int other_index);
boost::unique_future<void> swap_layer(int index, int other_index, stage& other);
- monitor::source& monitor_output();
+ monitor::subject& monitor_output();
// frame_observable
//void subscribe(const frame_observable::observer_ptr& o) override;
std::wstring text_producer::print() const { return impl_->print(); }
std::wstring text_producer::name() const { return impl_->name(); }
boost::property_tree::wptree text_producer::info() const { return impl_->info(); }
-monitor::source& text_producer::monitor_output() { return impl_->monitor_subject_; }
+monitor::subject& text_producer::monitor_output() { return impl_->monitor_subject_; }
binding<std::wstring>& text_producer::text() { return impl_->text(); }
binding<double>& text_producer::tracking() { return impl_->tracking(); }
const binding<double>& text_producer::current_bearing_y() const { return impl_->current_bearing_y(); }
std::wstring print() const override;
std::wstring name() const override;
boost::property_tree::wptree info() const override;
- monitor::source& monitor_output();
+ monitor::subject& monitor_output();
binding<std::wstring>& text();
binding<double>& tracking();
class transition_producer : public frame_producer_base
{
- monitor::subject monitor_subject_;
+ spl::shared_ptr<monitor::subject> monitor_subject_;
const field_mode mode_;
int current_frame_;
, source_producer_(frame_producer::empty())
, paused_(false)
{
- dest->monitor_output().link_target(&monitor_subject_);
+ dest->monitor_output().attach_parent(monitor_subject_);
CASPAR_LOG(info) << print() << L" Initialized";
}
source = source_producer_->last_frame();
});
- monitor_subject_ << monitor::message("/transition/frame") % current_frame_ % info_.duration
- << monitor::message("/transition/type") % [&]() -> std::string
+ *monitor_subject_ << monitor::message("/transition/frame") % current_frame_ % info_.duration
+ << monitor::message("/transition/type") % [&]() -> std::string
{
switch(info_.type.value())
{
return draw_frame::over(s_frame, d_frame);
}
- monitor::source& monitor_output()
+ monitor::subject& monitor_output()
{
- return monitor_subject_;
+ return *monitor_subject_;
}
void on_interaction(const interaction_event::ptr& event) override
struct video_channel::impl /* final */
{
- monitor::subject monitor_subject_;
+ spl::shared_ptr<monitor::subject> monitor_subject_;
const int index_;
executor executor_;
public:
impl(int index, const core::video_format_desc& format_desc, std::unique_ptr<image_mixer> image_mixer)
- : monitor_subject_("/channel" + boost::lexical_cast<std::string>(index))
+ : monitor_subject_(spl::make_shared<monitor::subject>(
+ "/channel/" + boost::lexical_cast<std::string>(index)))
, index_(index)
, format_desc_(format_desc)
, output_(graph_, format_desc, index)
graph_->set_text(print());
diagnostics::register_graph(graph_);
- output_.monitor_output().link_target(&monitor_subject_);
- stage_.monitor_output().link_target(&monitor_subject_);
+ output_.monitor_output().attach_parent(monitor_subject_);
+ stage_.monitor_output().attach_parent(monitor_subject_);
executor_.begin_invoke([=]{tick();});
graph_->set_value("tick-time", frame_timer.elapsed()*format_desc.fps*0.5);
- monitor_subject_ << monitor::message("/profiler/time") % frame_timer.elapsed() % (1.0/format_desc_.fps)
+ *monitor_subject_ << monitor::message("/profiler/time") % frame_timer.elapsed() % (1.0/format_desc_.fps)
<< monitor::message("/format") % format_desc.name;
}
catch(...)
core::video_format_desc video_channel::video_format_desc() const{return impl_->video_format_desc();}
void core::video_channel::video_format_desc(const core::video_format_desc& format_desc){impl_->video_format_desc(format_desc);}
boost::property_tree::wptree video_channel::info() const{return impl_->info();}
-monitor::source& video_channel::monitor_output(){return impl_->monitor_subject_;}
+monitor::subject& video_channel::monitor_output(){return *impl_->monitor_subject_;}
}}
\ No newline at end of file
// Methods
- monitor::source& monitor_output();
+ monitor::subject& monitor_output();
// Properties
struct bluefish_consumer_proxy : public core::frame_consumer
{
- monitor::subject monitor_subject_;
+ core::monitor::subject monitor_subject_;
std::unique_ptr<bluefish_consumer> consumer_;
const int device_index_;
return 400 + device_index_;
}
- monitor::source& monitor_output()
+ core::monitor::subject& monitor_output()
{
return monitor_subject_;
}
struct decklink_consumer_proxy : public core::frame_consumer
{
- monitor::subject monitor_subject_;
+ core::monitor::subject monitor_subject_;
const configuration config_;
std::unique_ptr<decklink_consumer> consumer_;
executor executor_;
return 300 + config_.device_index;
}
- monitor::source& monitor_output()
+ core::monitor::subject& monitor_output()
{
return monitor_subject_;
}
class decklink_producer : boost::noncopyable, public IDeckLinkInputCallback
{
- monitor::subject monitor_subject_;
+ core::monitor::subject monitor_subject_;
spl::shared_ptr<diagnostics::graph> graph_;
boost::timer tick_timer_;
video_frame->interlaced_frame = in_format_desc_.field_mode != core::field_mode::progressive;
video_frame->top_field_first = in_format_desc_.field_mode == core::field_mode::upper ? 1 : 0;
- monitor_subject_ << monitor::message("/file/name") % model_name_
- << monitor::message("/file/path") % device_index_
- << monitor::message("/file/video/width") % video->GetWidth()
- << monitor::message("/file/video/height") % video->GetHeight()
- << monitor::message("/file/video/field") % u8(!video_frame->interlaced_frame ? "progressive" : (video_frame->top_field_first ? "upper" : "lower"))
- << monitor::message("/file/audio/sample-rate") % 48000
- << monitor::message("/file/audio/channels") % 2
- << monitor::message("/file/audio/format") % u8(av_get_sample_fmt_name(AV_SAMPLE_FMT_S32))
- << monitor::message("/file/fps") % in_format_desc_.fps;
+ monitor_subject_
+ << core::monitor::message("/file/name") % model_name_
+ << core::monitor::message("/file/path") % device_index_
+ << core::monitor::message("/file/video/width") % video->GetWidth()
+ << core::monitor::message("/file/video/height") % video->GetHeight()
+ << core::monitor::message("/file/video/field") % u8(!video_frame->interlaced_frame ? "progressive" : (video_frame->top_field_first ? "upper" : "lower"))
+ << core::monitor::message("/file/audio/sample-rate") % 48000
+ << core::monitor::message("/file/audio/channels") % 2
+ << core::monitor::message("/file/audio/format") % u8(av_get_sample_fmt_name(AV_SAMPLE_FMT_S32))
+ << core::monitor::message("/file/fps") % in_format_desc_.fps;
// Audio
}
graph_->set_value("frame-time", frame_timer.elapsed()*out_format_desc_.fps*0.5);
- monitor_subject_ << monitor::message("/profiler/time") % frame_timer.elapsed() % out_format_desc_.fps;
+ monitor_subject_ << core::monitor::message("/profiler/time") % frame_timer.elapsed() % out_format_desc_.fps;
graph_->set_value("output-buffer", static_cast<float>(frame_buffer_.size())/static_cast<float>(frame_buffer_.capacity()));
- monitor_subject_ << monitor::message("/buffer") % frame_buffer_.size() % frame_buffer_.capacity();
+ monitor_subject_ << core::monitor::message("/buffer") % frame_buffer_.size() % frame_buffer_.capacity();
}
catch(...)
{
return model_name_ + L" [" + boost::lexical_cast<std::wstring>(device_index_) + L"|" + in_format_desc_.name + L"]";
}
- monitor::source& monitor_output()
+ core::monitor::subject& monitor_output()
{
return monitor_subject_;
}
});
}
- monitor::source& monitor_output()
+ core::monitor::subject& monitor_output()
{
return producer_->monitor_output();
}
const std::shared_ptr<AVFormatContext> oc_;
const core::video_format_desc format_desc_;
- monitor::subject monitor_subject_;
+ core::monitor::subject monitor_subject_;
tbb::spin_mutex exception_mutex_;
std::exception_ptr exception_;
return L"ffmpeg[" + u16(filename_) + L"]";
}
- monitor::source& monitor_output()
+ core::monitor::subject& monitor_output()
{
return monitor_subject_;
}
av_frame->top_field_first = format_desc_.field_mode == core::field_mode::upper;
av_frame->pts = frame_number_++;
- monitor_subject_ << monitor::message("/frame") % static_cast<int64_t>(frame_number_)
- % static_cast<int64_t>(std::numeric_limits<int64_t>::max());
+ monitor_subject_ << core::monitor::message("/frame")
+ % static_cast<int64_t>(frame_number_)
+ % static_cast<int64_t>(std::numeric_limits<int64_t>::max());
AVPacket pkt;
av_init_packet(&pkt);
return 200;
}
- monitor::source& monitor_output()
+ core::monitor::subject& monitor_output()
{
return consumer_->monitor_output();
}
struct audio_decoder::impl : boost::noncopyable
{
- monitor::subject monitor_subject_;
+ core::monitor::subject monitor_subject_;
input* input_;
int index_;
const spl::shared_ptr<AVCodecContext> codec_context_;
frame->nb_samples = channel_samples;
frame->format = AV_SAMPLE_FMT_S32;
- monitor_subject_ << monitor::message("/file/audio/sample-rate") % codec_context_->sample_rate
- << monitor::message("/file/audio/channels") % codec_context_->channels
- << monitor::message("/file/audio/format") % u8(av_get_sample_fmt_name(codec_context_->sample_fmt))
- << monitor::message("/file/audio/codec") % u8(codec_context_->codec->long_name);
+ monitor_subject_ << core::monitor::message("/file/audio/sample-rate") % codec_context_->sample_rate
+ << core::monitor::message("/file/audio/channels") % codec_context_->channels
+ << core::monitor::message("/file/audio/format") % u8(av_get_sample_fmt_name(codec_context_->sample_fmt))
+ << core::monitor::message("/file/audio/codec") % u8(codec_context_->codec->long_name);
return frame;
}
std::shared_ptr<AVFrame> audio_decoder::operator()(){return impl_->poll();}
uint32_t audio_decoder::nb_frames() const{return impl_->nb_frames();}
std::wstring audio_decoder::print() const{return impl_->print();}
-monitor::source& audio_decoder::monitor_output() { return impl_->monitor_subject_;}
+core::monitor::subject& audio_decoder::monitor_output() { return impl_->monitor_subject_;}
}}
\ No newline at end of file
std::wstring print() const;
- monitor::source& monitor_output();
+ core::monitor::subject& monitor_output();
private:
struct impl;
#include <queue>
namespace caspar { namespace ffmpeg {
-
+
+std::wstring get_relative_or_original(
+ const std::wstring& filename,
+ const boost::filesystem::wpath& relative_to)
+{
+ boost::filesystem::wpath file(filename);
+ auto result = file.filename().wstring();
+
+ boost::filesystem::wpath current_path = file;
+
+ while (true)
+ {
+ current_path = current_path.parent_path();
+
+ if (boost::filesystem::equivalent(current_path, relative_to))
+ break;
+
+ if (current_path.empty())
+ return filename;
+
+ result = current_path.filename().wstring() + L"/" + result;
+ }
+
+ return result;
+}
+
struct ffmpeg_producer : public core::frame_producer_base
{
- monitor::subject monitor_subject_;
+ spl::shared_ptr<core::monitor::subject> monitor_subject_;
const std::wstring filename_;
+ const std::wstring path_relative_to_media_;
const spl::shared_ptr<diagnostics::graph> graph_;
uint32_t start,
uint32_t length)
: filename_(filename)
+ , path_relative_to_media_(get_relative_or_original(filename, env::media_folder()))
, frame_factory_(frame_factory)
, format_desc_(format_desc)
, input_(graph_, filename_, loop, start, length)
try
{
video_decoder_.reset(new video_decoder(input_));
- video_decoder_->monitor_output().link_target(&monitor_subject_);
+ video_decoder_->monitor_output().attach_parent(monitor_subject_);
constraints_.width.set(video_decoder_->width());
constraints_.height.set(video_decoder_->height());
try
{
audio_decoder_ .reset(new audio_decoder(input_, format_desc_));
- audio_decoder_->monitor_output().link_target(&monitor_subject_);
+ audio_decoder_->monitor_output().attach_parent(monitor_subject_);
CASPAR_LOG(info) << print() << L" " << audio_decoder_->print();
}
graph_->set_tag("underflow");
graph_->set_value("frame-time", frame_timer.elapsed()*format_desc_.fps*0.5);
- monitor_subject_ << monitor::message("/profiler/time") % frame_timer.elapsed() % (1.0/format_desc_.fps);
-
- monitor_subject_ << monitor::message("/file/time") % (file_frame_number()/fps_)
- % (file_nb_frames()/fps_)
- << monitor::message("/file/frame") % static_cast<int32_t>(file_frame_number())
- % static_cast<int32_t>(file_nb_frames())
- << monitor::message("/file/fps") % fps_
- << monitor::message("/file/path") % filename_
- << monitor::message("/loop") % input_.loop();
+ *monitor_subject_
+ << core::monitor::message("/profiler/time") % frame_timer.elapsed() % (1.0/format_desc_.fps);
+ *monitor_subject_
+ << core::monitor::message("/file/time") % (file_frame_number()/fps_)
+ % (file_nb_frames()/fps_)
+ << core::monitor::message("/file/frame") % static_cast<int32_t>(file_frame_number())
+ % static_cast<int32_t>(file_nb_frames())
+ << core::monitor::message("/file/fps") % fps_
+ << core::monitor::message("/file/path") % path_relative_to_media_
+ << core::monitor::message("/loop") % input_.loop();
return frame;
}
return info;
}
- monitor::source& monitor_output()
+ core::monitor::subject& monitor_output()
{
- return monitor_subject_;
+ return *monitor_subject_;
}
// ffmpeg_producer
struct video_decoder::impl : boost::noncopyable
{
- monitor::subject monitor_subject_;
+ core::monitor::subject monitor_subject_;
input* input_;
int index_;
const spl::shared_ptr<AVCodecContext> codec_context_;
if(frame->repeat_pict > 0)
CASPAR_LOG(warning) << "[video_decoder] repeat_pict not implemented.";
- monitor_subject_ << monitor::message("/file/video/width") % width_
- << monitor::message("/file/video/height") % height_
- << monitor::message("/file/video/field") % u8(!frame->interlaced_frame ? "progressive" : (frame->top_field_first ? "upper" : "lower"))
- << monitor::message("/file/video/codec") % u8(codec_context_->codec->long_name);
+ monitor_subject_ << core::monitor::message("/file/video/width") % width_
+ << core::monitor::message("/file/video/height") % height_
+ << core::monitor::message("/file/video/field") % u8(!frame->interlaced_frame ? "progressive" : (frame->top_field_first ? "upper" : "lower"))
+ << core::monitor::message("/file/video/codec") % u8(codec_context_->codec->long_name);
return frame;
}
uint32_t video_decoder::file_frame_number() const{return impl_->file_frame_number_;}
bool video_decoder::is_progressive() const{return impl_->is_progressive_;}
std::wstring video_decoder::print() const{return impl_->print();}
-monitor::source& video_decoder::monitor_output() { return impl_->monitor_subject_; }
+core::monitor::subject& video_decoder::monitor_output() { return impl_->monitor_subject_; }
}}
\ No newline at end of file
std::wstring print() const;
- monitor::source& monitor_output();
+ core::monitor::subject& monitor_output();
private:
struct impl;
return L"";
}
- monitor::source& monitor_output()
+ core::monitor::subject& monitor_output()
{
return flash_producer_->monitor_output();
}
std::wstring cg_proxy::invoke(int layer, const std::wstring& label){return impl_->timed_invoke(layer, label);}
std::wstring cg_proxy::description(int layer){return impl_->timed_description(layer);}
std::wstring cg_proxy::template_host_info(){return impl_->timed_template_host_info();}
-monitor::source& cg_proxy::monitor_output(){return impl_->monitor_output();}
+core::monitor::subject& cg_proxy::monitor_output(){return impl_->monitor_output();}
}}
\ No newline at end of file
std::wstring invoke(int layer, const std::wstring& label);
std::wstring description(int layer);
std::wstring template_host_info();
- monitor::source& monitor_output();
+ core::monitor::subject& monitor_output();
private:
struct impl;
}
} com_init_;
- monitor::subject& monitor_subject_;
+ core::monitor::subject& monitor_subject_;
const std::wstring filename_;
const int height_;
public:
- flash_renderer(monitor::subject& monitor_subject, const spl::shared_ptr<diagnostics::graph>& graph, const std::shared_ptr<core::frame_factory>& frame_factory, const std::wstring& filename, int width, int height)
+ flash_renderer(core::monitor::subject& monitor_subject, const spl::shared_ptr<diagnostics::graph>& graph, const std::shared_ptr<core::frame_factory>& frame_factory, const std::wstring& filename, int width, int height)
: monitor_subject_(monitor_subject)
, graph_(graph)
, filename_(filename)
graph_->set_tag("sync");
graph_->set_value("sync", sync);
- monitor_subject_ << monitor::message("/sync") % sync;
+ monitor_subject_ << core::monitor::message("/sync") % sync;
ax_->Tick();
}
graph_->set_value("frame-time", static_cast<float>(frame_timer.elapsed()/frame_time)*0.5f);
- monitor_subject_ << monitor::message("/renderer/profiler/time") % frame_timer.elapsed() % frame_time;
+ monitor_subject_ << core::monitor::message("/renderer/profiler/time") % frame_timer.elapsed() % frame_time;
return head_;
}
struct flash_producer : public core::frame_producer_base
{
- monitor::subject monitor_subject_;
+ core::monitor::subject monitor_subject_;
const std::wstring filename_;
const spl::shared_ptr<core::frame_factory> frame_factory_;
const core::video_format_desc format_desc_;
else
graph_->set_tag("late-frame");
- monitor_subject_ << monitor::message("/host/path") % filename_
- << monitor::message("/host/width") % width_
- << monitor::message("/host/height") % height_
- << monitor::message("/host/fps") % fps_
- << monitor::message("/buffer") % output_buffer_.size() % buffer_size_;
+ monitor_subject_ << core::monitor::message("/host/path") % filename_
+ << core::monitor::message("/host/width") % width_
+ << core::monitor::message("/host/height") % height_
+ << core::monitor::message("/host/fps") % fps_
+ << core::monitor::message("/buffer") % output_buffer_.size() % buffer_size_;
return last_frame_ = frame;
}
return info;
}
- monitor::source& monitor_output()
+ core::monitor::subject& monitor_output()
{
return monitor_subject_;
}
}
graph_->set_value("tick-time", static_cast<float>(tick_timer_.elapsed()/fps_)*0.5f);
- monitor_subject_ << monitor::message("/profiler/time") % tick_timer_.elapsed() % fps_;
+ monitor_subject_ << core::monitor::message("/profiler/time") % tick_timer_.elapsed() % fps_;
output_buffer_.push(std::move(frame_buffer_.front()));
frame_buffer_.pop();
struct image_consumer : public core::frame_consumer
{
- monitor::subject monitor_subject_;
- std::wstring filename_;
+ core::monitor::subject monitor_subject_;
+ std::wstring filename_;
public:
// frame_consumer
return 100;
}
- monitor::source& monitor_output()
+ core::monitor::subject& monitor_output()
{
return monitor_subject_;
}
struct image_producer : public core::frame_producer_base
{
- monitor::subject monitor_subject_;
- const std::wstring description_;
- const spl::shared_ptr<core::frame_factory> frame_factory_;
- core::draw_frame frame_;
- core::constraints constraints_;
+ core::monitor::subject monitor_subject_;
+ const std::wstring description_;
+ const spl::shared_ptr<core::frame_factory> frame_factory_;
+ core::draw_frame frame_;
+ core::constraints constraints_;
image_producer(const spl::shared_ptr<core::frame_factory>& frame_factory, const std::wstring& description)
: description_(description)
core::draw_frame receive_impl() override
{
- monitor_subject_ << monitor::message("/file/path") % description_;
+ monitor_subject_ << core::monitor::message("/file/path") % description_;
return frame_;
}
return info;
}
- monitor::source& monitor_output()
+ core::monitor::subject& monitor_output()
{
return monitor_subject_;
}
struct image_scroll_producer : public core::frame_producer_base
{
- monitor::subject monitor_subject_;
+ core::monitor::subject monitor_subject_;
const std::wstring filename_;
std::vector<core::draw_frame> frames_;
result = core::draw_frame::interlace(field1, field2, format_desc_.field_mode);
}
- monitor_subject_ << monitor::message("/file/path") % filename_
- << monitor::message("/delta") % delta_
- << monitor::message("/speed") % speed_;
+ monitor_subject_ << core::monitor::message("/file/path") % filename_
+ << core::monitor::message("/delta") % delta_
+ << core::monitor::message("/speed") % speed_;
return result;
}
}
}
- monitor::source& monitor_output()
+ core::monitor::subject& monitor_output()
{
return monitor_subject_;
}
struct oal_consumer : public core::frame_consumer
{
- monitor::subject monitor_subject_;
+ core::monitor::subject monitor_subject_;
- spl::shared_ptr<diagnostics::graph> graph_;
- boost::timer perf_timer_;
- int channel_index_;
+ spl::shared_ptr<diagnostics::graph> graph_;
+ boost::timer perf_timer_;
+ int channel_index_;
- core::video_format_desc format_desc_;
+ core::video_format_desc format_desc_;
- ALuint source_;
- std::array<ALuint, 3> buffers_;
+ ALuint source_;
+ std::array<ALuint, 3> buffers_;
- executor executor_;
+ executor executor_;
public:
oal_consumer()
return 500;
}
- monitor::source& monitor_output()
+ core::monitor::subject& monitor_output()
{
return monitor_subject_;
}
class reroute_producer : public reactive::observer<std::map<int, core::draw_frame>>
, public core::frame_producer_base
{
- monitor::subject monitor_subject_;
+ core::monitor::subject monitor_subject_;
core::constraints constraints_;
const spl::shared_ptr<diagnostics::graph> graph_;
return info;
}
- monitor::source& monitor_output()
+ core::monitor::subject& monitor_output()
{
return monitor_subject_;
}
struct screen_consumer_proxy : public core::frame_consumer
{
- monitor::subject monitor_subject_;
- const configuration config_;
- std::unique_ptr<screen_consumer> consumer_;
- core::interaction_sink* sink_;
+ core::monitor::subject monitor_subject_;
+ const configuration config_;
+ std::unique_ptr<screen_consumer> consumer_;
+ core::interaction_sink* sink_;
public:
return 600 + (config_.key_only ? 10 : 0) + config_.screen_index;
}
- monitor::source& monitor_output()
+ core::monitor::subject& monitor_output()
{
return monitor_subject_;
}
--- /dev/null
+/*
+* Copyright 2013 Sveriges Television AB http://casparcg.com/
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Helge Norberg, helge.norberg@svt.se
+*/
+
+#include "../StdAfx.h"
+
+#include "io_service_manager.h"
+
+#include <memory>
+
+#include <boost/asio/io_service.hpp>
+#include <boost/thread/thread.hpp>
+
+#include <common/except.h>
+
+namespace caspar { namespace protocol { namespace asio {
+
+struct io_service_manager::impl
+{
+ boost::asio::io_service service_;
+ // To keep the io_service::run() running although no pending async
+ // operations are posted.
+ std::unique_ptr<boost::asio::io_service::work> work_;
+ boost::thread thread_;
+
+ impl()
+ : work_(new boost::asio::io_service::work(service_))
+ , thread_([this] { run(); })
+ {
+ }
+
+ void run()
+ {
+ win32_exception::ensure_handler_installed_for_thread("asio-thread");
+
+ service_.run();
+ }
+
+ ~impl()
+ {
+ work_.reset();
+ service_.stop();
+ thread_.join();
+ }
+};
+
+io_service_manager::io_service_manager()
+ : impl_(new impl)
+{
+}
+
+io_service_manager::~io_service_manager()
+{
+}
+
+boost::asio::io_service& io_service_manager::service()
+{
+ return impl_->service_;
+}
+
+}}}
--- /dev/null
+/*
+* Copyright 2013 Sveriges Television AB http://casparcg.com/
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Helge Norberg, helge.norberg@svt.se
+*/
+
+#pragma once
+
+#include <memory>
+
+#include <boost/noncopyable.hpp>
+
+namespace boost { namespace asio {
+ class io_service;
+}}
+
+namespace caspar { namespace protocol { namespace asio {
+
+class io_service_manager : boost::noncopyable
+{
+public:
+ io_service_manager();
+ ~io_service_manager();
+ boost::asio::io_service& service();
+private:
+ struct impl;
+ std::unique_ptr<impl> impl_;
+};
+
+}}}
--- /dev/null
+/*
+* Copyright 2013 Sveriges Television AB http://casparcg.com/
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Robert Nagy, ronag89@gmail.com
+* Author: Helge Norberg, helge.norberg@svt.se
+*/
+
+#include "../stdafx.h"
+
+#include "client.h"
+
+#include "oscpack/OscOutboundPacketStream.h"
+#include "oscpack/OscHostEndianness.h"
+
+#include <common/utf.h>
+#include <common/except.h>
+#include <common/endian.h>
+
+#include <core/monitor/monitor.h>
+
+#include <functional>
+#include <vector>
+#include <unordered_map>
+
+#include <boost/asio.hpp>
+#include <boost/foreach.hpp>
+#include <boost/bind.hpp>
+#include <boost/thread.hpp>
+
+#include <tbb/spin_mutex.h>
+#include <tbb/cache_aligned_allocator.h>
+
+using namespace boost::asio::ip;
+
+namespace caspar { namespace protocol { namespace osc {
+
+template<typename T>
+struct no_init_proxy
+{
+ T value;
+
+ no_init_proxy()
+ {
+ static_assert(sizeof(no_init_proxy) == sizeof(T), "invalid size");
+ static_assert(__alignof(no_init_proxy) == __alignof(T), "invalid alignment");
+ }
+};
+
+typedef std::vector<no_init_proxy<char>, tbb::cache_aligned_allocator<no_init_proxy<char>>> byte_vector;
+
+template<typename T>
+struct param_visitor : public boost::static_visitor<void>
+{
+ T& o;
+
+ param_visitor(T& o)
+ : o(o)
+ {
+ }
+
+ void operator()(const bool value) {o << value;}
+ void operator()(const int32_t value) {o << static_cast<int64_t>(value);}
+ void operator()(const uint32_t value) {o << static_cast<int64_t>(value);}
+ void operator()(const int64_t value) {o << static_cast<int64_t>(value);}
+ void operator()(const uint64_t value) {o << static_cast<int64_t>(value);}
+ void operator()(const float value) {o << value;}
+ void operator()(const double value) {o << static_cast<float>(value);}
+ void operator()(const std::string& value) {o << value.c_str();}
+ void operator()(const std::wstring& value) {o << u8(value).c_str();}
+ void operator()(const std::vector<int8_t>& value) {o << ::osc::Blob(value.data(), static_cast<unsigned long>(value.size()));}
+};
+
+void write_osc_event(byte_vector& destination, const core::monitor::message& e)
+{
+ destination.resize(4096);
+
+ ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
+ o << ::osc::BeginMessage(e.path().c_str());
+
+ param_visitor<decltype(o)> param_visitor(o);
+ BOOST_FOREACH(const auto& data, e.data())
+ boost::apply_visitor(param_visitor, data);
+
+ o << ::osc::EndMessage;
+
+ destination.resize(o.Size());
+}
+
+byte_vector write_osc_bundle_start()
+{
+ byte_vector destination;
+ destination.resize(16);
+
+ ::osc::OutboundPacketStream o(reinterpret_cast<char*>(destination.data()), static_cast<unsigned long>(destination.size()));
+ o << ::osc::BeginBundle();
+
+ destination.resize(o.Size());
+
+ return destination;
+}
+
+void write_osc_bundle_element_start(byte_vector& destination, const byte_vector& message)
+{
+ destination.resize(4);
+
+ int32_t* bundle_element_size = reinterpret_cast<int32_t*>(destination.data());
+
+#ifdef OSC_HOST_LITTLE_ENDIAN
+ *bundle_element_size = swap_byte_order(static_cast<int32_t>(message.size()));
+#else
+ *bundle_element_size = static_cast<int32_t>(bundle.size());
+#endif
+}
+
+struct client::impl : public spl::enable_shared_from_this<client::impl>, core::monitor::sink
+{
+ udp::socket socket_;
+ tbb::spin_mutex endpoints_mutex_;
+ std::map<udp::endpoint, int> reference_counts_by_endpoint_;
+
+ std::unordered_map<std::string, byte_vector> updates_;
+ boost::mutex updates_mutex_;
+ boost::condition_variable updates_cond_;
+
+ tbb::atomic<bool> is_running_;
+
+ boost::thread thread_;
+
+public:
+ impl(boost::asio::io_service& service)
+ : socket_(service, udp::v4())
+ , thread_(boost::bind(&impl::run, this))
+ {
+ }
+
+ ~impl()
+ {
+ is_running_ = false;
+
+ updates_cond_.notify_one();
+
+ thread_.join();
+ }
+
+ std::shared_ptr<void> get_subscription_token(
+ const boost::asio::ip::udp::endpoint& endpoint)
+ {
+ tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
+
+ ++reference_counts_by_endpoint_[endpoint];
+
+ std::weak_ptr<impl> weak_self = shared_from_this();
+
+ return std::shared_ptr<void>(nullptr, [weak_self, endpoint] (void*)
+ {
+ auto strong = weak_self.lock();
+
+ if (!strong)
+ return;
+
+ auto& self = *strong;
+
+ tbb::spin_mutex::scoped_lock lock(self.endpoints_mutex_);
+
+ int reference_count_after =
+ --self.reference_counts_by_endpoint_[endpoint];
+
+ if (reference_count_after == 0)
+ self.reference_counts_by_endpoint_.erase(endpoint);
+ });
+ }
+private:
+ void propagate(const core::monitor::message& msg)
+ {
+ boost::lock_guard<boost::mutex> lock(updates_mutex_);
+
+ try
+ {
+ write_osc_event(updates_[msg.path()], msg);
+ }
+ catch(...)
+ {
+ CASPAR_LOG_CURRENT_EXCEPTION();
+ updates_.erase(msg.path());
+ }
+
+ updates_cond_.notify_one();
+ }
+
+ template<typename T>
+ void do_send(
+ const T& buffers, const std::vector<udp::endpoint>& destinations)
+ {
+ boost::system::error_code ec;
+
+ BOOST_FOREACH(const auto& endpoint, destinations)
+ socket_.send_to(buffers, endpoint, 0, ec);
+ }
+
+ void run()
+ {
+ // http://stackoverflow.com/questions/14993000/the-most-reliable-and-efficient-udp-packet-size
+ const int SAFE_DATAGRAM_SIZE = 508;
+
+ try
+ {
+ is_running_ = true;
+
+ std::unordered_map<std::string, byte_vector> updates;
+ std::vector<udp::endpoint> destinations;
+ const byte_vector bundle_header = write_osc_bundle_start();
+ std::vector<byte_vector> element_headers;
+
+ while (is_running_)
+ {
+ updates.clear();
+ destinations.clear();
+
+ {
+ boost::unique_lock<boost::mutex> cond_lock(updates_mutex_);
+
+ if (updates_.empty())
+ updates_cond_.wait(cond_lock);
+
+ std::swap(updates, updates_);
+ }
+
+ {
+ tbb::spin_mutex::scoped_lock lock(endpoints_mutex_);
+
+ BOOST_FOREACH(const auto& endpoint, reference_counts_by_endpoint_)
+ destinations.push_back(endpoint.first);
+ }
+
+ if (destinations.empty())
+ continue;
+
+ std::vector<boost::asio::const_buffers_1> buffers;
+ element_headers.resize(
+ std::max(element_headers.size(), updates.size()));
+
+ int i = 0;
+ auto datagram_size = bundle_header.size();
+ buffers.push_back(boost::asio::buffer(bundle_header));
+
+ BOOST_FOREACH(const auto& slot, updates)
+ {
+ write_osc_bundle_element_start(element_headers[i], slot.second);
+ const auto& headers = element_headers;
+
+ auto size_of_element = headers[i].size() + slot.second.size();
+
+ if (datagram_size + size_of_element >= SAFE_DATAGRAM_SIZE)
+ {
+ do_send(buffers, destinations);
+ buffers.clear();
+ buffers.push_back(boost::asio::buffer(bundle_header));
+ datagram_size = bundle_header.size();
+ }
+
+ buffers.push_back(boost::asio::buffer(headers[i]));
+ buffers.push_back(boost::asio::buffer(slot.second));
+
+ datagram_size += size_of_element;
+ ++i;
+ }
+
+ if (!buffers.empty())
+ do_send(buffers, destinations);
+ }
+ }
+ catch (...)
+ {
+ CASPAR_LOG_CURRENT_EXCEPTION();
+ }
+ }
+};
+
+client::client(boost::asio::io_service& service)
+ : impl_(new impl(service))
+{
+}
+
+client::client(client&& other)
+ : impl_(std::move(other.impl_))
+{
+}
+
+client& client::operator=(client&& other)
+{
+ impl_ = std::move(other.impl_);
+ return *this;
+}
+
+client::~client()
+{
+}
+
+std::shared_ptr<void> client::get_subscription_token(
+ const boost::asio::ip::udp::endpoint& endpoint)
+{
+ return impl_->get_subscription_token(endpoint);
+}
+
+spl::shared_ptr<core::monitor::sink> client::sink()
+{
+ return impl_;
+}
+
+}}}
--- /dev/null
+/*
+* Copyright 2013 Sveriges Television AB http://casparcg.com/
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Robert Nagy, ronag89@gmail.com
+* Author: Helge Norberg, helge.norberg@svt.se
+*/
+
+#pragma once
+
+#include <boost/asio/ip/udp.hpp>
+#include <boost/noncopyable.hpp>
+
+#include <common/memory.h>
+#include <core/monitor/monitor.h>
+
+namespace caspar { namespace protocol { namespace osc {
+
+class client
+{
+ client(const client&);
+ client& operator=(const client&);
+public:
+
+ // Static Members
+
+ // Constructors
+
+ client(boost::asio::io_service& service);
+
+ client(client&&);
+
+ /**
+ * Get a subscription token that ensures that OSC messages are sent to the
+ * given endpoint as long as the token is alive. It will stop sending when
+ * the token is dropped unless another token to the same endpoint has
+ * previously been checked out.
+ *
+ * @param endpoint The UDP endpoint to send OSC messages to.
+ *
+ * @return The token. It is ok for the token to outlive the client
+ */
+ std::shared_ptr<void> get_subscription_token(
+ const boost::asio::ip::udp::endpoint& endpoint);
+
+ ~client();
+
+ // Methods
+
+ client& operator=(client&&);
+
+ // Properties
+
+ spl::shared_ptr<core::monitor::sink> sink();
+private:
+ struct impl;
+ spl::shared_ptr<impl> impl_;
+};
+
+}}}
+++ /dev/null
-#include "..\stdafx.h"
-
-#include "server.h"
-
-#include "oscpack/oscOutboundPacketStream.h"
-
-#include <functional>
-#include <vector>
-
-#include <boost/asio.hpp>
-#include <boost/foreach.hpp>
-#include <boost/thread.hpp>
-
-using namespace boost::asio::ip;
-
-namespace caspar { namespace protocol { namespace osc {
-
-template<typename T>
-struct param_visitor : public boost::static_visitor<void>
-{
- T& o;
-
- param_visitor(T& o)
- : o(o)
- {
- }
-
- void operator()(const bool value) {o << value;}
- void operator()(const int32_t value) {o << static_cast<int64_t>(value);}
- void operator()(const uint32_t value) {o << static_cast<int64_t>(value);}
- void operator()(const int64_t value) {o << static_cast<int64_t>(value);}
- void operator()(const uint64_t value) {o << static_cast<int64_t>(value);}
- void operator()(const float value) {o << value;}
- void operator()(const double value) {o << static_cast<float>(value);}
- void operator()(const std::string& value) {o << value.c_str();}
- void operator()(const std::wstring& value) {o << std::string(value.begin(), value.end()).c_str();}
- void operator()(const std::vector<int8_t>& value) {o << ::osc::Blob(value.data(), static_cast<unsigned long>(value.size()));}
-};
-
-std::vector<char> write_osc_event(const monitor::message& e)
-{
- std::array<char, 4096> buffer;
- ::osc::OutboundPacketStream o(buffer.data(), static_cast<unsigned long>(buffer.size()));
-
- o << ::osc::BeginMessage(e.path().c_str());
-
- param_visitor<decltype(o)> pd_visitor(o);
- BOOST_FOREACH(auto data, e.data())
- boost::apply_visitor(pd_visitor, data);
-
- o << ::osc::EndMessage;
-
- return std::vector<char>(o.Data(), o.Data() + o.Size());
-}
-
-struct server::impl
-{
- boost::asio::io_service service_;
-
- udp::endpoint endpoint_;
- udp::socket socket_;
-
- boost::thread thread_;
-
- Concurrency::call<monitor::message> on_next_;
-
-public:
- impl(udp::endpoint endpoint,
- Concurrency::ISource<monitor::message>& source)
- : endpoint_(endpoint)
- , socket_(service_, endpoint_.protocol())
- , thread_(std::bind(&boost::asio::io_service::run, &service_))
- , on_next_([this](const monitor::message& msg){ on_next(msg); })
- {
- source.link_target(&on_next_);
- }
-
- ~impl()
- {
- thread_.join();
- }
-
- void on_next(const monitor::message& msg)
- {
- auto data_ptr = spl::make_shared<std::vector<char>>(write_osc_event(msg));
- if(data_ptr->size() >0)
- socket_.async_send_to(boost::asio::buffer(*data_ptr),
- endpoint_,
- boost::bind(&impl::handle_send_to, this, data_ptr, //data_ptr need to stay alive
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
- }
-
- void handle_send_to(spl::shared_ptr<std::vector<char>> data, const boost::system::error_code& /*error*/, size_t /*bytes_sent*/)
- {
- }
-};
-
-server::server(udp::endpoint endpoint,
- Concurrency::ISource<monitor::message>& source)
- : impl_(new impl(endpoint, source))
-{
-}
-
-server::server(server&& other)
- : impl_(std::move(other.impl_))
-{
-}
-
-server& server::operator=(server&& other)
-{
- impl_ = std::move(other.impl_);
- return *this;
-}
-
-server::~server()
-{
-}
-
-}}}
\ No newline at end of file
+++ /dev/null
-#pragma once
-
-#include <common/memory.h>
-
-#include <core/monitor/monitor.h>
-#include <boost/asio/ip/udp.hpp>
-
-namespace caspar { namespace protocol { namespace osc {
-
-class server
-{
- server(const server&);
- server& operator=(const server&);
-public:
-
- // Static Members
-
- // Constructors
-
- server(boost::asio::ip::udp::endpoint endpoint,
- Concurrency::ISource<monitor::message>& source);
-
- server(server&&);
-
- ~server();
-
- // Methods
-
- server& operator=(server&&);
-
- // Properties
-
-private:
- struct impl;
- std::unique_ptr<impl> impl_;
-};
-
-}}}
\ No newline at end of file
<ClInclude Include="amcp\AMCPCommandsImpl.h" />\r
<ClInclude Include="amcp\AMCPProtocolStrategy.h" />\r
<ClInclude Include="amcp\amcp_shared.h" />\r
+ <ClInclude Include="asio\io_service_manager.h" />\r
<ClInclude Include="cii\CIICommand.h" />\r
<ClInclude Include="cii\CIICommandsImpl.h" />\r
<ClInclude Include="cii\CIIProtocolStrategy.h" />\r
<ClInclude Include="clk\CLKProtocolStrategy.h" />\r
<ClInclude Include="clk\clk_commands.h" />\r
<ClInclude Include="clk\clk_command_processor.h" />\r
+ <ClInclude Include="osc\client.h" />\r
<ClInclude Include="osc\oscpack\MessageMappingOscPacketListener.h" />\r
<ClInclude Include="osc\oscpack\OscException.h" />\r
<ClInclude Include="osc\oscpack\OscHostEndianness.h" />\r
<ClInclude Include="osc\oscpack\OscPrintReceivedElements.h" />\r
<ClInclude Include="osc\oscpack\OscReceivedElements.h" />\r
<ClInclude Include="osc\oscpack\OscTypes.h" />\r
- <ClInclude Include="osc\server.h" />\r
<ClInclude Include="StdAfx.h" />\r
<ClInclude Include="util\AsyncEventServer.h" />\r
<ClInclude Include="util\ClientInfo.h" />\r
<PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
<PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Release|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
</ClCompile>\r
+ <ClCompile Include="asio\io_service_manager.cpp">\r
+ <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
+ <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Release|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
+ </ClCompile>\r
<ClCompile Include="cii\CIICommandsImpl.cpp">\r
<PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
<PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Release|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
<PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Release|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
<PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
</ClCompile>\r
+ <ClCompile Include="osc\client.cpp">\r
+ <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
+ <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Release|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
+ </ClCompile>\r
<ClCompile Include="osc\oscpack\OscOutboundPacketStream.cpp">\r
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">NotUsing</PrecompiledHeader>\r
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|x64'">NotUsing</PrecompiledHeader>\r
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">NotUsing</PrecompiledHeader>\r
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|x64'">NotUsing</PrecompiledHeader>\r
</ClCompile>\r
- <ClCompile Include="osc\server.cpp">\r
- <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
- <PrecompiledHeaderFile Condition="'$(Configuration)|$(Platform)'=='Release|x64'">../StdAfx.h</PrecompiledHeaderFile>\r
- </ClCompile>\r
<ClCompile Include="StdAfx.cpp">\r
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">Create</PrecompiledHeader>\r
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|x64'">Create</PrecompiledHeader>\r
<Filter Include="source\osc\oscpack">\r
<UniqueIdentifier>{d6dede65-7a93-4494-aa2d-d18d5267902c}</UniqueIdentifier>\r
</Filter>\r
+ <Filter Include="source\asio">\r
+ <UniqueIdentifier>{982ff369-f4a0-418b-b613-81ec322208c5}</UniqueIdentifier>\r
+ </Filter>\r
</ItemGroup>\r
<ItemGroup>\r
<ClInclude Include="amcp\AMCPCommand.h">\r
<ClInclude Include="osc\oscpack\OscException.h">\r
<Filter>source\osc\oscpack</Filter>\r
</ClInclude>\r
- <ClInclude Include="osc\server.h">\r
- <Filter>source\osc</Filter>\r
- </ClInclude>\r
<ClInclude Include="clk\clk_command_processor.h">\r
<Filter>source\clk</Filter>\r
</ClInclude>\r
<ClInclude Include="amcp\amcp_shared.h">\r
<Filter>source\amcp</Filter>\r
</ClInclude>\r
+ <ClInclude Include="osc\client.h">\r
+ <Filter>source\osc</Filter>\r
+ </ClInclude>\r
+ <ClInclude Include="asio\io_service_manager.h">\r
+ <Filter>source\asio</Filter>\r
+ </ClInclude>\r
</ItemGroup>\r
<ItemGroup>\r
<ClCompile Include="amcp\AMCPCommandQueue.cpp">\r
<ClCompile Include="osc\oscpack\OscTypes.cpp">\r
<Filter>source\osc\oscpack</Filter>\r
</ClCompile>\r
- <ClCompile Include="osc\server.cpp">\r
- <Filter>source\osc</Filter>\r
- </ClCompile>\r
<ClCompile Include="clk\clk_command_processor.cpp">\r
<Filter>source\clk</Filter>\r
</ClCompile>\r
<ClCompile Include="util\lock_container.cpp">\r
<Filter>source\util</Filter>\r
</ClCompile>\r
+ <ClCompile Include="osc\client.cpp">\r
+ <Filter>source\osc</Filter>\r
+ </ClCompile>\r
+ <ClCompile Include="asio\io_service_manager.cpp">\r
+ <Filter>source\asio</Filter>\r
+ </ClCompile>\r
</ItemGroup>\r
</Project>
\ No newline at end of file
<port>5250</port>\r
<protocol>AMCP</protocol>\r
</tcp>\r
- <udp>\r
- <address>127.0.0.1</address>\r
- <port>6250</port>\r
- <protocol>OSC</protocol>\r
- </udp>\r
</controllers>\r
</configuration>\r
\r
</consumers>\r
</channel>\r
</channels>\r
+<osc>\r
+ <default-port>6250</default-port>\r
+ <predefined-clients>\r
+ <predefined-client>\r
+ <address>127.0.0.1</address>\r
+ <port>5253</port>\r
+ </predefined-client>\r
+ </predefined-clients>\r
+</osc>\r
-->\r
#include <modules/screen/consumer/screen_consumer.h>
#include <modules/ffmpeg/consumer/ffmpeg_consumer.h>
+#include <protocol/asio/io_service_manager.h>
#include <protocol/amcp/AMCPProtocolStrategy.h>
#include <protocol/cii/CIIProtocolStrategy.h>
#include <protocol/CLK/CLKProtocolStrategy.h>
#include <protocol/util/AsyncEventServer.h>
#include <protocol/util/strategy_adapters.h>
-#include <protocol/osc/server.h>
+#include <protocol/osc/client.h>
#include <boost/algorithm/string.hpp>
#include <boost/thread.hpp>
struct server::impl : boost::noncopyable
{
- monitor::subject monitor_subject_;
+ protocol::asio::io_service_manager io_service_manager_;
+ spl::shared_ptr<monitor::subject> monitor_subject_;
accelerator::accelerator accelerator_;
std::vector<spl::shared_ptr<IO::AsyncEventServer>> async_servers_;
- std::vector<osc::server> osc_servers_;
+ std::shared_ptr<IO::AsyncEventServer> primary_amcp_server_;
+ osc::client osc_client_;
+ std::vector<std::shared_ptr<void>> predefined_osc_subscriptions_;
std::vector<spl::shared_ptr<video_channel>> channels_;
std::shared_ptr<thumbnail_generator> thumbnail_generator_;
boost::promise<bool>& shutdown_server_now_;
explicit impl(boost::promise<bool>& shutdown_server_now)
- : accelerator_(env::properties().get(L"configuration.accelerator", L"auto")), shutdown_server_now_(shutdown_server_now)
+ : accelerator_(env::properties().get(L"configuration.accelerator", L"auto"))
+ , osc_client_(io_service_manager_.service())
+ , shutdown_server_now_(shutdown_server_now)
{
ffmpeg::init();
setup_controllers(env::properties());
CASPAR_LOG(info) << L"Initialized controllers.";
+
+ setup_osc(env::properties());
+ CASPAR_LOG(info) << L"Initialized osc.";
}
~impl()
- {
+ {
+ thumbnail_generator_.reset();
+ primary_amcp_server_.reset();
async_servers_.clear();
channels_.clear();
}
}
- channel->monitor_output().link_target(&monitor_subject_);
+ channel->monitor_output().attach_parent(monitor_subject_);
channels_.push_back(channel);
}
channels_.push_back(spl::make_shared<video_channel>(static_cast<int>(channels_.size()+1), core::video_format_desc(core::video_format::x576p2500), accelerator_.create_image_mixer()));
}
+ void setup_osc(const boost::property_tree::wptree& pt)
+ {
+ using boost::property_tree::wptree;
+ using namespace boost::asio::ip;
+
+ monitor_subject_->attach_parent(osc_client_.sink());
+
+ auto default_port =
+ pt.get<unsigned short>(L"configuration.osc.default-port", 6250);
+ auto predefined_clients =
+ pt.get_child_optional(L"configuration.osc.predefined-clients");
+
+ if (predefined_clients)
+ {
+ BOOST_FOREACH(auto& predefined_client, *predefined_clients)
+ {
+ const auto address =
+ predefined_client.second.get<std::wstring>(L"address");
+ const auto port =
+ predefined_client.second.get<unsigned short>(L"port");
+ predefined_osc_subscriptions_.push_back(
+ osc_client_.get_subscription_token(udp::endpoint(
+ address_v4::from_string(u8(address)),
+ port)));
+ }
+ }
+
+ if (primary_amcp_server_)
+ primary_amcp_server_->add_client_lifecycle_object_factory(
+ [=] (const std::string& ipv4_address)
+ -> std::pair<std::wstring, std::shared_ptr<void>>
+ {
+ using namespace boost::asio::ip;
+
+ return std::make_pair(
+ std::wstring(L"osc_subscribe"),
+ osc_client_.get_subscription_token(
+ udp::endpoint(
+ address_v4::from_string(
+ ipv4_address),
+ default_port)));
+ });
+ }
+
void setup_thumbnail_generation(const boost::property_tree::wptree& pt)
{
if (!pt.get(L"configuration.thumbnails.generate-thumbnails", true))
auto asyncbootstrapper = spl::make_shared<IO::AsyncEventServer>(create_protocol(protocol), port);
async_servers_.push_back(asyncbootstrapper);
- //TODO: remove - test
- asyncbootstrapper->add_client_lifecycle_object_factory([=] (const std::string& ipv4_address) {
- return std::pair<std::wstring, std::shared_ptr<void>>(L"log", std::shared_ptr<void>(nullptr, [] (void*)
- { CASPAR_LOG(info) << "Client disconnect (lifecycle)"; }));
- });
- }
- else if(name == L"udp")
- {
- const auto address = xml_controller.second.get(L"address", L"127.0.0.1");
- const auto port = xml_controller.second.get<unsigned short>(L"port", 6250);
-
- osc_servers_.push_back(osc::server(boost::asio::ip::udp::endpoint(boost::asio::ip::address_v4::from_string(std::string(address.begin(), address.end())), port), monitor_subject_));
+ if (!primary_amcp_server_ && boost::iequals(protocol, L"AMCP"))
+ primary_amcp_server_ = asyncbootstrapper;
}
else
CASPAR_LOG(warning) << "Invalid controller: " << name;
return impl_->channels_;
}
std::shared_ptr<core::thumbnail_generator> server::get_thumbnail_generator() const {return impl_->thumbnail_generator_; }
-monitor::source& server::monitor_output() { return impl_->monitor_subject_; }
+core::monitor::subject& server::monitor_output() { return *impl_->monitor_subject_; }
}
\ No newline at end of file
const std::vector<spl::shared_ptr<core::video_channel>> channels() const;
std::shared_ptr<core::thumbnail_generator> get_thumbnail_generator() const;
- monitor::source& monitor_output();
+ core::monitor::subject& monitor_output();
private:
struct impl;
spl::shared_ptr<impl> impl_;