return s;\r
}\r
\r
-}}\r
-\r
-namespace std {\r
- \r
-template <typename T, typename F1, typename F2>\r
-void swap(caspar::reactive::observer_function<T, F1>& lhs, caspar::reactive::observer_function<T, F2>& rhs) \r
-{\r
- lhs.swap(rhs);\r
-}\r
-\r
-template <typename I, typename O>\r
-void swap(caspar::reactive::basic_subject<I, O>& lhs, caspar::reactive::basic_subject<I, O>& rhs) \r
-{\r
- lhs.swap(rhs);\r
-}\r
-\r
-} // std\r
+}}
\ No newline at end of file
\r
typedef boost::chrono::duration<double, boost::ratio<1, 1>> duration;\r
\r
-typedef boost::variant<bool, int32_t, int64_t, float, double, std::string, std::vector<int8_t>, duration> param;\r
+typedef boost::variant<bool, int32_t, int64_t, float, double, std::string, std::vector<int8_t>, duration> param;\r
\r
std::ostream& operator<<(std::ostream& o, const param& p);\r
\r
virtual ~subject()\r
{\r
}\r
+
+ subject& operator=(subject&& other)
+ {
+ other.swap(*this);
+ }
+
+ void swap(subject& other)
+ {
+ std::swap(path_, other.path_);
+ std::swap(impl_, other.impl_);
+ }
\r
virtual void subscribe(const observer_ptr& o) override\r
{ \r
{ \r
impl_->on_next(path_.empty() ? e : e.propagate(path_));\r
}\r
+\r
+ operator std::weak_ptr<observer>()\r
+ {\r
+ return impl_;\r
+ }\r
private:\r
monitor::path path_;\r
std::shared_ptr<impl> impl_;\r
return s;\r
}\r
\r
-}}\r
-\r
-namespace std {\r
-\r
-inline void swap(caspar::monitor::path& lhs, caspar::monitor::path& rhs) \r
-{\r
- lhs.swap(rhs);\r
-}\r
- \r
-inline void swap(caspar::monitor::event& lhs, caspar::monitor::event& rhs) \r
-{\r
- lhs.swap(rhs);\r
-}\r
-\r
-}\r
+}}
\ No newline at end of file
\r
class follow_producer_proxy : public producer_proxy_base\r
{ \r
- spl::shared_ptr<monitor::subject> event_subject_;\r
+ monitor::subject event_subject_;\r
public:\r
follow_producer_proxy(spl::shared_ptr<frame_producer>&& producer) \r
: producer_proxy_base(std::move(producer))\r
\r
producer_->unsubscribe(event_subject_);\r
producer_ = std::move(following);\r
- event_subject_->subscribe(event_subject_);\r
+ producer_->subscribe(event_subject_);\r
}\r
\r
return receive(hints);\r
\r
virtual void subscribe(const monitor::observable::observer_ptr& o) override \r
{\r
- return event_subject_->subscribe(o);\r
+ event_subject_.subscribe(o);\r
}\r
\r
virtual void unsubscribe(const monitor::observable::observer_ptr& o) override \r
{\r
- return event_subject_->unsubscribe(o);\r
+ event_subject_.unsubscribe(o);\r
}\r
};\r
\r
\r
struct layer::impl\r
{ \r
- spl::shared_ptr<monitor::subject> event_subject_;\r
- spl::shared_ptr<monitor::subject> foreground_event_subject_;\r
- spl::shared_ptr<monitor::subject> background_event_subject_;\r
+ monitor::subject event_subject_;\r
+ monitor::subject foreground_event_subject_;\r
+ monitor::subject background_event_subject_;\r
spl::shared_ptr<frame_producer> foreground_;\r
spl::shared_ptr<frame_producer> background_;\r
int64_t frame_number_;\r
\r
public:\r
impl(int index) \r
- : event_subject_(new monitor::subject(monitor::path("layer") % index))\r
- , foreground_event_subject_(new monitor::subject(""))\r
- , background_event_subject_(new monitor::subject("background"))\r
+ : event_subject_(monitor::path("layer") % index)\r
+ , foreground_event_subject_("")\r
+ , background_event_subject_("background")\r
, foreground_(frame_producer::empty())\r
, background_(frame_producer::empty())\r
, frame_number_(0)\r
, is_paused_(false)\r
{\r
- foreground_event_subject_->subscribe(event_subject_);\r
- background_event_subject_->subscribe(event_subject_);\r
+ foreground_event_subject_.subscribe(event_subject_);\r
+ background_event_subject_.subscribe(event_subject_);\r
}\r
\r
void pause()\r
}\r
}\r
\r
- *event_subject_ << monitor::event("state") % u8(is_paused_ ? L"paused" : (foreground_ == frame_producer::empty() ? L"stopped" : L"playing")) \r
+ event_subject_ << monitor::event("state") % u8(is_paused_ ? L"paused" : (foreground_ == frame_producer::empty() ? L"stopped" : L"playing")) \r
<< monitor::event("time") % monitor::duration(frame_number_/format_desc.fps)\r
% monitor::duration(static_cast<int64_t>(foreground_->nb_frames()) - static_cast<int64_t>(auto_play_delta_ ? *auto_play_delta_ : 0)/format_desc.fps)\r
<< monitor::event("frame") % static_cast<int64_t>(frame_number_)\r
% static_cast<int64_t>((static_cast<int64_t>(foreground_->nb_frames()) - static_cast<int64_t>(auto_play_delta_ ? *auto_play_delta_ : 0)));\r
\r
- *foreground_event_subject_ << monitor::event("type") % u8(foreground_->name());\r
- *background_event_subject_ << monitor::event("type") % u8(foreground_->name());\r
+ foreground_event_subject_ << monitor::event("type") % u8(foreground_->name());\r
+ background_event_subject_ << monitor::event("type") % u8(foreground_->name());\r
\r
return frame;\r
}\r
spl::shared_ptr<frame_producer> layer::foreground() const { return impl_->foreground_;}\r
spl::shared_ptr<frame_producer> layer::background() const { return impl_->background_;}\r
boost::property_tree::wptree layer::info() const{return impl_->info();}\r
-void layer::subscribe(const monitor::observable::observer_ptr& o) {impl_->event_subject_->subscribe(o);}\r
-void layer::unsubscribe(const monitor::observable::observer_ptr& o) {impl_->event_subject_->unsubscribe(o);}\r
+void layer::subscribe(const monitor::observable::observer_ptr& o) {impl_->event_subject_.subscribe(o);}\r
+void layer::unsubscribe(const monitor::observable::observer_ptr& o) {impl_->event_subject_.unsubscribe(o);}\r
}}
\ No newline at end of file
struct stage::impl : public std::enable_shared_from_this<impl>\r
{ \r
spl::shared_ptr<diagnostics::graph> graph_;\r
- spl::shared_ptr<monitor::subject> event_subject_;\r
+ monitor::subject event_subject_;\r
std::map<int, layer> layers_; \r
std::map<int, tweened_transform> tweens_; \r
executor executor_;\r
} \r
\r
graph_->set_value("produce-time", frame_timer.elapsed()*format_desc.fps*0.5);\r
- *event_subject_ << monitor::event("profiler/time") % frame_timer.elapsed() % (1.0/format_desc.fps);\r
+ event_subject_ << monitor::event("profiler/time") % frame_timer.elapsed() % (1.0/format_desc.fps);\r
\r
return frames;\r
});\r
boost::unique_future<boost::property_tree::wptree> stage::info() const{return impl_->info();}\r
boost::unique_future<boost::property_tree::wptree> stage::info(int index) const{return impl_->info(index);}\r
std::map<int, spl::shared_ptr<class draw_frame>> stage::operator()(const video_format_desc& format_desc){return (*impl_)(format_desc);}\r
-void stage::subscribe(const monitor::observable::observer_ptr& o) {impl_->event_subject_->subscribe(o);}\r
-void stage::unsubscribe(const monitor::observable::observer_ptr& o) {impl_->event_subject_->unsubscribe(o);}\r
+void stage::subscribe(const monitor::observable::observer_ptr& o) {impl_->event_subject_.subscribe(o);}\r
+void stage::unsubscribe(const monitor::observable::observer_ptr& o) {impl_->event_subject_.unsubscribe(o);}\r
}}
\ No newline at end of file
\r
struct transition_producer : public frame_producer\r
{ \r
- spl::shared_ptr<monitor::subject> event_subject_;\r
+ monitor::subject event_subject_;\r
const field_mode mode_;\r
int current_frame_;\r
\r
if(++current_frame_ >= info_.duration)\r
return draw_frame::eof();\r
\r
- *event_subject_ << monitor::event("transition/frame") % current_frame_ % info_.duration\r
+ event_subject_ << monitor::event("transition/frame") % current_frame_ % info_.duration\r
<< monitor::event("transition/type") % [&]() -> std::string\r
{\r
switch(info_.type.value())\r
\r
virtual void subscribe(const monitor::observable::observer_ptr& o) override \r
{\r
- return event_subject_->subscribe(o);\r
+ event_subject_.subscribe(o);\r
}\r
\r
virtual void unsubscribe(const monitor::observable::observer_ptr& o) override \r
{\r
- return event_subject_->unsubscribe(o);\r
+ event_subject_.unsubscribe(o);\r
}\r
};\r
\r
struct video_channel::impl sealed : public frame_factory\r
{\r
reactive::basic_subject<spl::shared_ptr<const data_frame>> frame_subject_;\r
- spl::shared_ptr<monitor::subject> event_subject_;\r
+ monitor::subject event_subject_;\r
\r
const int index_;\r
\r
executor executor_;\r
public:\r
impl(int index, const core::video_format_desc& format_desc, spl::unique_ptr<image_mixer> image_mixer) \r
- : event_subject_(new monitor::subject(monitor::path() % "channel" % index))\r
+ : event_subject_(monitor::path() % "channel" % index)\r
, index_(index)\r
, format_desc_(format_desc)\r
, output_(graph_, format_desc, index)\r
\r
graph_->set_value("tick-time", frame_timer.elapsed()*format_desc.fps*0.5);\r
\r
- *event_subject_ << monitor::event("debug/profiler") % frame_timer.elapsed() % (1.0/format_desc_.fps);\r
- *event_subject_ << monitor::event("format") % u8(format_desc.name);\r
+ event_subject_ << monitor::event("debug/profiler") % frame_timer.elapsed() % (1.0/format_desc_.fps)\r
+ << monitor::event("format") % u8(format_desc.name);\r
}\r
catch(...)\r
{\r
boost::property_tree::wptree video_channel::info() const{return impl_->info();}\r
void video_channel::subscribe(const frame_observable::observer_ptr& o) {impl_->frame_subject_.subscribe(o);}\r
void video_channel::unsubscribe(const frame_observable::observer_ptr& o) {impl_->frame_subject_.unsubscribe(o);} \r
-void video_channel::subscribe(const monitor::observable::observer_ptr& o) {impl_->event_subject_->subscribe(o);}\r
-void video_channel::unsubscribe(const monitor::observable::observer_ptr& o) {impl_->event_subject_->unsubscribe(o);}\r
+void video_channel::subscribe(const monitor::observable::observer_ptr& o) {impl_->event_subject_.subscribe(o);}\r
+void video_channel::unsubscribe(const monitor::observable::observer_ptr& o) {impl_->event_subject_.unsubscribe(o);}\r
\r
}}
\ No newline at end of file
\r
struct audio_decoder::impl : boost::noncopyable\r
{ \r
+ monitor::subject event_subject_;\r
int index_;\r
- const spl::shared_ptr<AVCodecContext> codec_context_; \r
+ const spl::shared_ptr<AVCodecContext> codec_context_; \r
const core::video_format_desc format_desc_;\r
\r
audio_resampler resampler_;\r
\r
std::vector<int8_t, tbb::cache_aligned_allocator<int8_t>> buffer1_;\r
\r
- std::queue<spl::shared_ptr<AVPacket>> packets_;\r
+ std::queue<spl::shared_ptr<AVPacket>> packets_;\r
\r
const int64_t nb_frames_;\r
tbb::atomic<uint32_t> file_frame_number_;\r
\r
const auto n_samples = buffer1_.size() / av_get_bytes_per_sample(AV_SAMPLE_FMT_S32);\r
const auto samples = reinterpret_cast<int32_t*>(buffer1_.data());\r
+ \r
+ event_subject_ << monitor::event("file/audio/sample-rate") % codec_context_->sample_rate\r
+ << monitor::event("file/audio/channels") % codec_context_->channels\r
+ << monitor::event("file/audio/format") % u8(av_get_sample_fmt_name(codec_context_->sample_fmt))\r
+ << monitor::event("file/audio/codec") % u8(codec_context_->codec->long_name);\r
\r
++file_frame_number_;\r
\r
uint32_t audio_decoder::nb_frames() const{return impl_->nb_frames();}\r
uint32_t audio_decoder::file_frame_number() const{return impl_->file_frame_number_;}\r
std::wstring audio_decoder::print() const{return impl_->print();}\r
+void audio_decoder::subscribe(const monitor::observable::observer_ptr& o){impl_->event_subject_.subscribe(o);}\r
+void audio_decoder::unsubscribe(const monitor::observable::observer_ptr& o){impl_->event_subject_.unsubscribe(o);}\r
\r
}}
\ No newline at end of file
#pragma once\r
\r
#include <core/mixer/audio/audio_mixer.h>\r
+#include <core/monitor/monitor.h>\r
\r
#include <common/spl/memory.h>\r
\r
\r
namespace ffmpeg {\r
\r
-class audio_decoder : boost::noncopyable\r
+class audio_decoder : public monitor::observable\r
+ , boost::noncopyable\r
{\r
public:\r
explicit audio_decoder(const spl::shared_ptr<AVFormatContext>& context, const core::video_format_desc& format_desc);\r
uint32_t file_frame_number() const;\r
\r
std::wstring print() const;\r
+ \r
+ // monitor::observable\r
+ \r
+ virtual void subscribe(const monitor::observable::observer_ptr& o) override;\r
+ virtual void unsubscribe(const monitor::observable::observer_ptr& o) override;\r
+\r
private:\r
struct impl;\r
spl::shared_ptr<impl> impl_;\r
\r
struct ffmpeg_producer : public core::frame_producer\r
{\r
- spl::shared_ptr<monitor::subject> event_subject_;\r
+ monitor::subject event_subject_;\r
const std::wstring filename_;\r
\r
const spl::shared_ptr<diagnostics::graph> graph_;\r
try\r
{\r
video_decoder_.reset(new video_decoder(input_.context()));\r
+ video_decoder_->subscribe(event_subject_);\r
CASPAR_LOG(info) << print() << L" " << video_decoder_->print();\r
}\r
catch(averror_stream_not_found&)\r
try\r
{\r
audio_decoder_.reset(new audio_decoder(input_.context(), frame_factory->video_format_desc()));\r
+ audio_decoder_->subscribe(event_subject_);\r
CASPAR_LOG(info) << print() << L" " << audio_decoder_->print();\r
}\r
catch(averror_stream_not_found&)\r
\r
graph_->set_value("frame-time", frame_timer.elapsed()*format_desc_.fps*0.5);\r
\r
- *event_subject_ << monitor::event("profiler/time") % frame_timer.elapsed() % (1.0/format_desc_.fps) \r
+ event_subject_ << monitor::event("profiler/time") % frame_timer.elapsed() % (1.0/format_desc_.fps) \r
<< monitor::event("file/time") % monitor::duration(file_frame_number()/fps_) \r
% monitor::duration(file_nb_frames()/fps_)\r
<< monitor::event("file/frame") % static_cast<int32_t>(file_frame_number())\r
% static_cast<int32_t>(file_nb_frames())\r
<< monitor::event("file/fps") % fps_\r
- << monitor::event("file/video/mode") % u8(print_mode())\r
- << monitor::event("file/video/codec") % (video_decoder_ ? u8(video_decoder_->print()) : "n/a")\r
- << monitor::event("file/audio/codec") % (audio_decoder_ ? u8(audio_decoder_->print()) : "n/a")\r
<< monitor::event("filename") % u8(filename_)\r
<< monitor::event("loop") % input_.loop();\r
\r
\r
virtual void subscribe(const monitor::observable::observer_ptr& o) override\r
{\r
- event_subject_->subscribe(o);\r
+ event_subject_.subscribe(o);\r
}\r
\r
virtual void unsubscribe(const monitor::observable::observer_ptr& o) override\r
{\r
- event_subject_->unsubscribe(o);\r
+ event_subject_.unsubscribe(o);\r
}\r
\r
// ffmpeg_producer\r
\r
struct video_decoder::impl : boost::noncopyable\r
{\r
+ monitor::subject event_subject_;\r
int index_;\r
const spl::shared_ptr<AVCodecContext> codec_context_;\r
\r
\r
if(decoded_frame->repeat_pict > 0)\r
CASPAR_LOG(warning) << "[video_decoder] Field repeat_pict not implemented.";\r
- \r
+ \r
+ event_subject_ << monitor::event("file/video/width") % width_\r
+ << monitor::event("file/video/height") % height_\r
+ << monitor::event("file/video/field") % u8(!decoded_frame->interlaced_frame ? "progressive" : (decoded_frame->top_field_first ? "upper" : "lower"))\r
+ << monitor::event("file/video/codec") % u8(codec_context_->codec->long_name);\r
+\r
++file_frame_number_;\r
\r
return decoded_frame;\r
uint32_t video_decoder::file_frame_number() const{return impl_->file_frame_number_;}\r
bool video_decoder::is_progressive() const{return impl_->is_progressive_;}\r
std::wstring video_decoder::print() const{return impl_->print();}\r
+void video_decoder::subscribe(const monitor::observable::observer_ptr& o){impl_->event_subject_.subscribe(o);}\r
+void video_decoder::unsubscribe(const monitor::observable::observer_ptr& o){impl_->event_subject_.unsubscribe(o);}\r
\r
}}
\ No newline at end of file
#include <common/spl/memory.h>\r
#include <common/forward.h>\r
\r
+#include <core/monitor/monitor.h>\r
+\r
#include <boost/noncopyable.hpp>\r
\r
struct AVFormatContext;\r
\r
namespace caspar { namespace ffmpeg {\r
\r
-class video_decoder : boost::noncopyable\r
+class video_decoder : public monitor::observable\r
+ , boost::noncopyable\r
{\r
public:\r
explicit video_decoder(const spl::shared_ptr<AVFormatContext>& context);\r
bool is_progressive() const;\r
\r
std::wstring print() const;\r
+ \r
+ // monitor::observable\r
+ \r
+ virtual void subscribe(const monitor::observable::observer_ptr& o) override;\r
+ virtual void unsubscribe(const monitor::observable::observer_ptr& o) override;\r
\r
private:\r
struct impl;\r
\r
struct server::impl : boost::noncopyable\r
{\r
- spl::shared_ptr<monitor::subject> event_subject_;\r
+ monitor::subject event_subject_;\r
accelerator::accelerator accelerator_;\r
std::vector<spl::shared_ptr<IO::AsyncEventServer>> async_servers_; \r
std::vector<spl::shared_ptr<video_channel>> channels_;\r
{\r
return impl_->channels_;\r
}\r
-void server::subscribe(const monitor::observable::observer_ptr& o){impl_->event_subject_->subscribe(o);}\r
-void server::unsubscribe(const monitor::observable::observer_ptr& o){impl_->event_subject_->unsubscribe(o);}\r
+void server::subscribe(const monitor::observable::observer_ptr& o){impl_->event_subject_.subscribe(o);}\r
+void server::unsubscribe(const monitor::observable::observer_ptr& o){impl_->event_subject_.unsubscribe(o);}\r
\r
}
\ No newline at end of file