X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=core%2Fproducer%2Fstage.cpp;h=54f30ff59542ba109eb8f24c904e2906ed846523;hb=c471e87cfc402308a0439642766d10a3b3dc4b20;hp=ae0b926518d8ff1c5620bae899f58c07c6ca554a;hpb=40ee6eeb1cc1486ba665c05ec256b8196952c73e;p=casparcg diff --git a/core/producer/stage.cpp b/core/producer/stage.cpp index ae0b92651..54f30ff59 100644 --- a/core/producer/stage.cpp +++ b/core/producer/stage.cpp @@ -28,14 +28,15 @@ #include "../frame/draw_frame.h" #include "../frame/frame_factory.h" #include "../interaction/interaction_aggregator.h" +#include "../consumer/write_frame_consumer.h" #include #include #include +#include #include -#include #include #include @@ -48,32 +49,33 @@ namespace caspar { namespace core { struct stage::impl : public std::enable_shared_from_this -{ - spl::shared_ptr graph_; - spl::shared_ptr monitor_subject_; - //reactive::basic_subject> frames_subject_; - std::map layers_; - std::map tweens_; - interaction_aggregator aggregator_; - executor executor_; +{ + int channel_index_; + spl::shared_ptr graph_; + spl::shared_ptr monitor_subject_ = spl::make_shared("/stage"); + std::map layers_; + std::map tweens_; + interaction_aggregator aggregator_; + // map of layer -> map of tokens (src ref) -> layer_consumer + std::map>> layer_consumers_; + executor executor_ { L"stage " + boost::lexical_cast(channel_index_) }; public: - impl(spl::shared_ptr graph) - : graph_(std::move(graph)) - , monitor_subject_(spl::make_shared("/stage")) + impl(int channel_index, spl::shared_ptr graph) + : channel_index_(channel_index) + , graph_(std::move(graph)) , aggregator_([=] (double x, double y) { return collission_detect(x, y); }) - , executor_(L"stage") { graph_->set_color("produce-time", diagnostics::color(0.0f, 1.0f, 0.0f)); } - std::map operator()(const struct video_format_desc& format_desc) + std::map operator()(const video_format_desc& format_desc) { - boost::timer frame_timer; + caspar::timer frame_timer; auto frames = executor_.invoke([=]() -> std::map { - std::map frames; + std::map frames; try { @@ -81,7 +83,11 @@ public: for (auto& layer : layers_) { - frames[layer.first] = draw_frame::empty(); + // Prevent race conditions in parallel for each later + frames[layer.first] = draw_frame::empty(); + tweens_[layer.first]; + layer_consumers_[layer.first]; + indices.push_back(layer.first); } @@ -114,9 +120,21 @@ public: { auto& layer = layers_[index]; auto& tween = tweens_[index]; - + auto& consumers = layer_consumers_[index]; + auto frame = layer.receive(format_desc); + + if (!consumers.empty()) + { + auto consumer_it = consumers | boost::adaptors::map_values; + tbb::parallel_for_each(consumer_it.begin(), consumer_it.end(), [&](decltype(*consumer_it.begin()) layer_consumer) + { + layer_consumer->send(frame); + }); + } + auto frame1 = frame; + frame1.transform() *= tween.fetch_and_tick(1); if(format_desc.field_mode != core::field_mode::progressive) @@ -146,8 +164,9 @@ public: { for (auto& transform : transforms) { - auto src = tweens_[std::get<0>(transform)].fetch(); - auto dst = std::get<1>(transform)(src); + auto& tween = tweens_[std::get<0>(transform)]; + auto src = tween.fetch(); + auto dst = std::get<1>(transform)(tween.dest()); tweens_[std::get<0>(transform)] = tweened_transform(src, dst, std::get<2>(transform), std::get<3>(transform)); } }, task_priority::high_priority); @@ -178,7 +197,15 @@ public: tweens_.clear(); }, task_priority::high_priority); } - + + std::future get_current_transform(int index) + { + return executor_.begin_invoke([=] + { + return tweens_[index].fetch(); + }, task_priority::high_priority); + } + std::future load(int index, const spl::shared_ptr& producer, bool preview, const boost::optional& auto_play_delta) { return executor_.begin_invoke([=] @@ -195,6 +222,14 @@ public: }, task_priority::high_priority); } + std::future resume(int index) + { + return executor_.begin_invoke([=] + { + get_layer(index).resume(); + }, task_priority::high_priority); + } + std::future play(int index) { return executor_.begin_invoke([=] @@ -227,7 +262,7 @@ public: }, task_priority::high_priority); } - std::future swap_layers(stage& other) + std::future swap_layers(stage& other, bool swap_transforms) { auto other_impl = other.impl_; @@ -254,7 +289,10 @@ public: for (auto& layer : other_layers) layer.monitor_output().attach_parent(monitor_subject_); - }; + + if (swap_transforms) + std::swap(tweens_, other_impl->tweens_); + }; return executor_.begin_invoke([=] { @@ -262,20 +300,23 @@ public: }, task_priority::high_priority); } - std::future swap_layer(int index, int other_index) + std::future swap_layer(int index, int other_index, bool swap_transforms) { return executor_.begin_invoke([=] { std::swap(get_layer(index), get_layer(other_index)); + + if (swap_transforms) + std::swap(tweens_[index], tweens_[other_index]); }, task_priority::high_priority); } - std::future swap_layer(int index, int other_index, stage& other) + std::future swap_layer(int index, int other_index, stage& other, bool swap_transforms) { auto other_impl = other.impl_; if(other_impl.get() == this) - return swap_layer(index, other_index); + return swap_layer(index, other_index, swap_transforms); else { auto func = [=] @@ -290,6 +331,13 @@ public: my_layer.monitor_output().attach_parent(monitor_subject_); other_layer.monitor_output().attach_parent(other_impl->monitor_subject_); + + if (swap_transforms) + { + auto& my_tween = tweens_[index]; + auto& other_tween = other_impl->tweens_[other_index]; + std::swap(my_tween, other_tween); + } }; return executor_.begin_invoke([=] @@ -298,7 +346,28 @@ public: }, task_priority::high_priority); } } - + + void add_layer_consumer(void* token, int layer, const spl::shared_ptr& layer_consumer) + { + executor_.begin_invoke([=] + { + layer_consumers_[layer].insert(std::make_pair(token, layer_consumer)); + }, task_priority::high_priority); + } + + void remove_layer_consumer(void* token, int layer) + { + executor_.begin_invoke([=] + { + auto& layer_map = layer_consumers_[layer]; + layer_map.erase(token); + if (layer_map.empty()) + { + layer_consumers_.erase(layer); + } + }, task_priority::high_priority); + } + std::future> foreground(int index) { return executor_.begin_invoke([=]() -> std::shared_ptr @@ -333,7 +402,28 @@ public: { return get_layer(index).info(); }, task_priority::high_priority); - } + } + + std::future delay_info() + { + return std::move(executor_.begin_invoke([this]() -> boost::property_tree::wptree + { + boost::property_tree::wptree info; + + for (auto& layer : layers_) + info.add_child(L"layer", layer.second.delay_info()).add(L"index", layer.first); + + return info; + }, task_priority::high_priority)); + } + + std::future delay_info(int index) + { + return std::move(executor_.begin_invoke([=]() -> boost::property_tree::wptree + { + return get_layer(index).delay_info(); + }, task_priority::high_priority)); + } std::future call(int index, const std::vector& params) { @@ -364,7 +454,7 @@ public: && translated.second <= 1.0 && layer.second.collides(translated.first, translated.second)) { - return std::make_pair(transform, &layer.second); + return std::make_pair(transform, static_cast(&layer.second)); } } @@ -372,28 +462,31 @@ public: } }; -stage::stage(spl::shared_ptr graph) : impl_(new impl(std::move(graph))){} +stage::stage(int channel_index, spl::shared_ptr graph) : impl_(new impl(channel_index, std::move(graph))){} std::future stage::call(int index, const std::vector& params){return impl_->call(index, params);} std::future stage::apply_transforms(const std::vector& transforms){ return impl_->apply_transforms(transforms); } std::future stage::apply_transform(int index, const std::function& transform, unsigned int mix_duration, const tweener& tween){ return impl_->apply_transform(index, transform, mix_duration, tween); } std::future stage::clear_transforms(int index){ return impl_->clear_transforms(index); } std::future stage::clear_transforms(){ return impl_->clear_transforms(); } +std::future stage::get_current_transform(int index){ return impl_->get_current_transform(index); } std::future stage::load(int index, const spl::shared_ptr& producer, bool preview, const boost::optional& auto_play_delta){ return impl_->load(index, producer, preview, auto_play_delta); } std::future stage::pause(int index){ return impl_->pause(index); } +std::future stage::resume(int index){ return impl_->resume(index); } std::future stage::play(int index){ return impl_->play(index); } std::future stage::stop(int index){ return impl_->stop(index); } std::future stage::clear(int index){ return impl_->clear(index); } std::future stage::clear(){ return impl_->clear(); } -std::future stage::swap_layers(stage& other){ return impl_->swap_layers(other); } -std::future stage::swap_layer(int index, int other_index){ return impl_->swap_layer(index, other_index); } -std::future stage::swap_layer(int index, int other_index, stage& other){ return impl_->swap_layer(index, other_index, other); } -std::future> stage::foreground(int index) { return impl_->foreground(index); } +std::future stage::swap_layers(stage& other, bool swap_transforms){ return impl_->swap_layers(other, swap_transforms); } +std::future stage::swap_layer(int index, int other_index, bool swap_transforms){ return impl_->swap_layer(index, other_index, swap_transforms); } +std::future stage::swap_layer(int index, int other_index, stage& other, bool swap_transforms){ return impl_->swap_layer(index, other_index, other, swap_transforms); } +void stage::add_layer_consumer(void* token, int layer, const spl::shared_ptr& layer_consumer){ impl_->add_layer_consumer(token, layer, layer_consumer); } +void stage::remove_layer_consumer(void* token, int layer){ impl_->remove_layer_consumer(token, layer); }std::future> stage::foreground(int index) { return impl_->foreground(index); } std::future> stage::background(int index) { return impl_->background(index); } std::future stage::info() const{ return impl_->info(); } std::future stage::info(int index) const{ return impl_->info(index); } -std::map stage::operator()(const video_format_desc& format_desc){return (*impl_)(format_desc);} +std::future stage::delay_info() const{ return impl_->delay_info(); } +std::future stage::delay_info(int index) const{ return impl_->delay_info(index); } +std::map stage::operator()(const video_format_desc& format_desc){ return (*impl_)(format_desc); } monitor::subject& stage::monitor_output(){return *impl_->monitor_subject_;} -//void stage::subscribe(const frame_observable::observer_ptr& o) {impl_->frames_subject_.subscribe(o);} -//void stage::unsubscribe(const frame_observable::observer_ptr& o) {impl_->frames_subject_.unsubscribe(o);} void stage::on_interaction(const interaction_event::ptr& event) { impl_->on_interaction(event); } }}