X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=core%2Fproducer%2Fstage.cpp;h=a0b16806d5aa03c348c696d7f191ba7b76aa0187;hb=086d3c7a704c92dd89a6e7b8e6576b9f1c216c2d;hp=bfc83750b7a6be137b43441e0a6dd07af33ec6ec;hpb=27ad13001cd92f83fa9682eab5841935b1e9ce58;p=casparcg diff --git a/core/producer/stage.cpp b/core/producer/stage.cpp index bfc83750b..a0b16806d 100644 --- a/core/producer/stage.cpp +++ b/core/producer/stage.cpp @@ -1,363 +1,493 @@ -/* -* Copyright (c) 2011 Sveriges Television AB -* -* This file is part of CasparCG (www.casparcg.com). -* -* CasparCG is free software: you can redistribute it and/or modify -* it under the terms of the GNU General Public License as published by -* the Free Software Foundation, either version 3 of the License, or -* (at your option) any later version. -* -* CasparCG is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU General Public License for more details. -* -* You should have received a copy of the GNU General Public License -* along with CasparCG. If not, see . -* -* Author: Robert Nagy, ronag89@gmail.com -*/ - -#include "../StdAfx.h" - -#include "stage.h" - -#include "layer.h" - -#include "../frame/draw_frame.h" -#include "../frame/frame_factory.h" - -#include -#include -#include - -#include - -#include -#include -#include -#include - -#include - -#include -#include -#include - -namespace caspar { namespace core { - -struct stage::impl : public std::enable_shared_from_this -{ - spl::shared_ptr graph_; - monitor::basic_subject event_subject_; - reactive::basic_subject> frames_subject_; - std::map layers_; - std::map tweens_; - executor executor_; -public: - impl(spl::shared_ptr graph) - : graph_(std::move(graph)) - , event_subject_("stage") - , executor_(L"stage") - { - graph_->set_color("produce-time", diagnostics::color(0.0f, 1.0f, 0.0f)); - } - - std::map operator()(const struct video_format_desc& format_desc) - { - boost::timer frame_timer; - - auto frames = executor_.invoke([=]() -> std::map - { - - std::map frames; - - try - { - std::vector indices; - - BOOST_FOREACH(auto& layer, layers_) - { - frames[layer.first] = draw_frame::empty(); - indices.push_back(layer.first); - } - - // WORKAROUND: Compiler doesn't seem to like lambda. - tbb::parallel_for_each(indices.begin(), indices.end(), std::bind(&stage::impl::draw, this, std::placeholders::_1, std::ref(format_desc), std::ref(frames))); - } - catch(...) - { - layers_.clear(); - CASPAR_LOG_CURRENT_EXCEPTION(); - } - - - return frames; - }); - - frames_subject_ << frames; - - graph_->set_value("produce-time", frame_timer.elapsed()*format_desc.fps*0.5); - event_subject_ << monitor::event("profiler/time") % frame_timer.elapsed() % (1.0/format_desc.fps); - - return frames; - } - - void draw(int index, const video_format_desc& format_desc, std::map& frames) - { - auto& layer = layers_[index]; - auto& tween = tweens_[index]; - - auto frame = layer.receive(format_desc); - auto frame1 = frame; - frame1.transform() *= tween.fetch_and_tick(1); - - if(format_desc.field_mode != core::field_mode::progressive) - { - auto frame2 = frame; - frame2.transform() *= tween.fetch_and_tick(1); - frame1 = core::draw_frame::interlace(frame1, frame2, format_desc.field_mode); - } - - frames[index] = frame1; - } - - layer& get_layer(int index) - { - auto it = layers_.find(index); - if(it == std::end(layers_)) - { - it = layers_.insert(std::make_pair(index, layer(index))).first; - it->second.subscribe(event_subject_); - } - return it->second; - } - - boost::unique_future apply_transforms(const std::vector>& transforms) - { - return executor_.begin_invoke([=] - { - BOOST_FOREACH(auto& transform, transforms) - { - auto src = tweens_[std::get<0>(transform)].fetch(); - auto dst = std::get<1>(transform)(src); - tweens_[std::get<0>(transform)] = tweened_transform(src, dst, std::get<2>(transform), std::get<3>(transform)); - } - }, task_priority::high_priority); - } - - boost::unique_future apply_transform(int index, const stage::transform_func_t& transform, unsigned int mix_duration, const tweener& tween) - { - return executor_.begin_invoke([=] - { - auto src = tweens_[index].fetch(); - auto dst = transform(src); - tweens_[index] = tweened_transform(src, dst, mix_duration, tween); - }, task_priority::high_priority); - } - - boost::unique_future clear_transforms(int index) - { - return executor_.begin_invoke([=] - { - tweens_.erase(index); - }, task_priority::high_priority); - } - - boost::unique_future clear_transforms() - { - return executor_.begin_invoke([=] - { - tweens_.clear(); - }, task_priority::high_priority); - } - - boost::unique_future load(int index, const spl::shared_ptr& producer, bool preview, const boost::optional& auto_play_delta) - { - return executor_.begin_invoke([=] - { - get_layer(index).load(producer, preview, auto_play_delta); - }, task_priority::high_priority); - } - - boost::unique_future pause(int index) - { - return executor_.begin_invoke([=] - { - layers_[index].pause(); - }, task_priority::high_priority); - } - - boost::unique_future play(int index) - { - return executor_.begin_invoke([=] - { - layers_[index].play(); - }, task_priority::high_priority); - } - - boost::unique_future stop(int index) - { - return executor_.begin_invoke([=] - { - layers_[index].stop(); - }, task_priority::high_priority); - } - - boost::unique_future clear(int index) - { - return executor_.begin_invoke([=] - { - layers_.erase(index); - }, task_priority::high_priority); - } - - boost::unique_future clear() - { - return executor_.begin_invoke([=] - { - layers_.clear(); - }, task_priority::high_priority); - } - - boost::unique_future swap_layers(stage& other) - { - auto other_impl = other.impl_; - - if(other_impl.get() == this) - return async(launch::deferred, []{}); - - auto func = [=] - { - auto layers = layers_ | boost::adaptors::map_values; - auto other_layers = other_impl->layers_ | boost::adaptors::map_values; - - BOOST_FOREACH(auto& layer, layers) - layer.unsubscribe(event_subject_); - - BOOST_FOREACH(auto& layer, other_layers) - layer.unsubscribe(event_subject_); - - std::swap(layers_, other_impl->layers_); - - BOOST_FOREACH(auto& layer, layers) - layer.subscribe(event_subject_); - - BOOST_FOREACH(auto& layer, other_layers) - layer.subscribe(event_subject_); - }; - - return executor_.begin_invoke([=] - { - other_impl->executor_.invoke(func, task_priority::high_priority); - }, task_priority::high_priority); - } - - boost::unique_future swap_layer(int index, int other_index) - { - return executor_.begin_invoke([=] - { - std::swap(layers_[index], layers_[other_index]); - }, task_priority::high_priority); - } - - boost::unique_future swap_layer(int index, int other_index, stage& other) - { - auto other_impl = other.impl_; - - if(other_impl.get() == this) - return swap_layer(index, other_index); - else - { - auto func = [=] - { - auto& my_layer = get_layer(index); - auto& other_layer = other_impl->get_layer(other_index); - - my_layer.unsubscribe(event_subject_); - other_layer.unsubscribe(other_impl->event_subject_); - - std::swap(my_layer, other_layer); - - my_layer.subscribe(event_subject_); - other_layer.subscribe(other_impl->event_subject_); - }; - - return executor_.begin_invoke([=] - { - other_impl->executor_.invoke(func, task_priority::high_priority); - }, task_priority::high_priority); - } - } - - boost::unique_future> foreground(int index) - { - return executor_.begin_invoke([=] - { - return layers_[index].foreground(); - }, task_priority::high_priority); - } - - boost::unique_future> background(int index) - { - return executor_.begin_invoke([=] - { - return layers_[index].background(); - }, task_priority::high_priority); - } - - boost::unique_future info() - { - return executor_.begin_invoke([this]() -> boost::property_tree::wptree - { - boost::property_tree::wptree info; - BOOST_FOREACH(auto& layer, layers_) - info.add_child(L"layers.layer", layer.second.info()) - .add(L"index", layer.first); - return info; - }, task_priority::high_priority); - } - - boost::unique_future info(int index) - { - return executor_.begin_invoke([=] - { - return layers_[index].info(); - }, task_priority::high_priority); - } - - boost::unique_future call(int index, const std::wstring& params) - { - return flatten(executor_.begin_invoke([=] - { - return make_shared(layers_[index].foreground()->call(params)); - }, task_priority::high_priority)); - } -}; - -stage::stage(spl::shared_ptr graph) : impl_(new impl(std::move(graph))){} -boost::unique_future stage::call(int index, const std::wstring& params){return impl_->call(index, params);} -boost::unique_future stage::apply_transforms(const std::vector& transforms){return impl_->apply_transforms(transforms);} -boost::unique_future stage::apply_transform(int index, const std::function& transform, unsigned int mix_duration, const tweener& tween){return impl_->apply_transform(index, transform, mix_duration, tween);} -boost::unique_future stage::clear_transforms(int index){return impl_->clear_transforms(index);} -boost::unique_future stage::clear_transforms(){return impl_->clear_transforms();} -boost::unique_future stage::load(int index, const spl::shared_ptr& producer, bool preview, const boost::optional& auto_play_delta){return impl_->load(index, producer, preview, auto_play_delta);} -boost::unique_future stage::pause(int index){return impl_->pause(index);} -boost::unique_future stage::play(int index){return impl_->play(index);} -boost::unique_future stage::stop(int index){return impl_->stop(index);} -boost::unique_future stage::clear(int index){return impl_->clear(index);} -boost::unique_future stage::clear(){return impl_->clear();} -boost::unique_future stage::swap_layers(stage& other){return impl_->swap_layers(other);} -boost::unique_future stage::swap_layer(int index, int other_index){return impl_->swap_layer(index, other_index);} -boost::unique_future stage::swap_layer(int index, int other_index, stage& other){return impl_->swap_layer(index, other_index, other);} -boost::unique_future> stage::foreground(int index) {return impl_->foreground(index);} -boost::unique_future> stage::background(int index) {return impl_->background(index);} -boost::unique_future stage::info() const{return impl_->info();} -boost::unique_future stage::info(int index) const{return impl_->info(index);} -std::map stage::operator()(const video_format_desc& format_desc){return (*impl_)(format_desc);} -void stage::subscribe(const monitor::observable::observer_ptr& o) {impl_->event_subject_.subscribe(o);} -void stage::unsubscribe(const monitor::observable::observer_ptr& o) {impl_->event_subject_.unsubscribe(o);} -void stage::subscribe(const frame_observable::observer_ptr& o) {impl_->frames_subject_.subscribe(o);} -void stage::unsubscribe(const frame_observable::observer_ptr& o) {impl_->frames_subject_.unsubscribe(o);} -}} \ No newline at end of file +/* +* Copyright (c) 2011 Sveriges Television AB +* +* This file is part of CasparCG (www.casparcg.com). +* +* CasparCG is free software: you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation, either version 3 of the License, or +* (at your option) any later version. +* +* CasparCG is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with CasparCG. If not, see . +* +* Author: Robert Nagy, ronag89@gmail.com +*/ + +#include "../StdAfx.h" + +#include "stage.h" + +#include "layer.h" + +#include "../frame/draw_frame.h" +#include "../frame/frame_factory.h" +#include "../interaction/interaction_aggregator.h" +#include "../consumer/write_frame_consumer.h" + +#include +#include +#include +#include + +#include + +#include + +#include + +#include +#include +#include +#include + +namespace caspar { namespace core { + +struct stage::impl : public std::enable_shared_from_this +{ + int channel_index_; + spl::shared_ptr graph_; + spl::shared_ptr monitor_subject_ = spl::make_shared("/stage"); + std::map layers_; + std::map tweens_; + interaction_aggregator aggregator_; + // map of layer -> map of tokens (src ref) -> layer_consumer + std::map>> layer_consumers_; + executor executor_ { L"stage " + boost::lexical_cast(channel_index_) }; +public: + impl(int channel_index, spl::shared_ptr graph) + : channel_index_(channel_index) + , graph_(std::move(graph)) + , aggregator_([=] (double x, double y) { return collission_detect(x, y); }) + { + graph_->set_color("produce-time", diagnostics::color(0.0f, 1.0f, 0.0f)); + } + + std::map operator()(const video_format_desc& format_desc) + { + caspar::timer frame_timer; + + auto frames = executor_.invoke([=]() -> std::map + { + + std::map frames; + + try + { + std::vector indices; + + for (auto& layer : layers_) + { + // Prevent race conditions in parallel for each later + frames[layer.first] = draw_frame::empty(); + tweens_[layer.first]; + layer_consumers_[layer.first]; + + indices.push_back(layer.first); + } + + aggregator_.translate_and_send(); + + tbb::parallel_for_each(indices.begin(), indices.end(), [&](int index) + { + draw(index, format_desc, frames); + }); + } + catch(...) + { + layers_.clear(); + CASPAR_LOG_CURRENT_EXCEPTION(); + } + + + return frames; + }); + + //frames_subject_ << frames; + + graph_->set_value("produce-time", frame_timer.elapsed()*format_desc.fps*0.5); + *monitor_subject_ << monitor::message("/profiler/time") % frame_timer.elapsed() % (1.0/format_desc.fps); + + return frames; + } + + void draw(int index, const video_format_desc& format_desc, std::map& frames) + { + auto& layer = layers_[index]; + auto& tween = tweens_[index]; + auto& consumers = layer_consumers_[index]; + + auto frame = layer.receive(format_desc); + + if (!consumers.empty()) + { + auto consumer_it = consumers | boost::adaptors::map_values; + tbb::parallel_for_each(consumer_it.begin(), consumer_it.end(), [&](decltype(*consumer_it.begin()) layer_consumer) + { + layer_consumer->send(frame); + }); + } + + auto frame1 = frame; + + frame1.transform() *= tween.fetch_and_tick(1); + + if(format_desc.field_mode != core::field_mode::progressive) + { + auto frame2 = frame; + frame2.transform() *= tween.fetch_and_tick(1); + frame2.transform().audio_transform.volume = 0.0; + frame1 = core::draw_frame::interlace(frame1, frame2, format_desc.field_mode); + } + + frames[index] = frame1; + } + + layer& get_layer(int index) + { + auto it = layers_.find(index); + if(it == std::end(layers_)) + { + it = layers_.insert(std::make_pair(index, layer(index))).first; + it->second.monitor_output().attach_parent(monitor_subject_); + } + return it->second; + } + + std::future apply_transforms(const std::vector>& transforms) + { + return executor_.begin_invoke([=] + { + for (auto& transform : transforms) + { + auto& tween = tweens_[std::get<0>(transform)]; + auto src = tween.fetch(); + auto dst = std::get<1>(transform)(tween.dest()); + tweens_[std::get<0>(transform)] = tweened_transform(src, dst, std::get<2>(transform), std::get<3>(transform)); + } + }, task_priority::high_priority); + } + + std::future apply_transform(int index, const stage::transform_func_t& transform, unsigned int mix_duration, const tweener& tween) + { + return executor_.begin_invoke([=] + { + auto src = tweens_[index].fetch(); + auto dst = transform(src); + tweens_[index] = tweened_transform(src, dst, mix_duration, tween); + }, task_priority::high_priority); + } + + std::future clear_transforms(int index) + { + return executor_.begin_invoke([=] + { + tweens_.erase(index); + }, task_priority::high_priority); + } + + std::future clear_transforms() + { + return executor_.begin_invoke([=] + { + tweens_.clear(); + }, task_priority::high_priority); + } + + std::future get_current_transform(int index) + { + return executor_.begin_invoke([=] + { + return tweens_[index].fetch(); + }, task_priority::high_priority); + } + + std::future load(int index, const spl::shared_ptr& producer, bool preview, const boost::optional& auto_play_delta) + { + return executor_.begin_invoke([=] + { + get_layer(index).load(producer, preview, auto_play_delta); + }, task_priority::high_priority); + } + + std::future pause(int index) + { + return executor_.begin_invoke([=] + { + get_layer(index).pause(); + }, task_priority::high_priority); + } + + std::future resume(int index) + { + return executor_.begin_invoke([=] + { + get_layer(index).resume(); + }, task_priority::high_priority); + } + + std::future play(int index) + { + return executor_.begin_invoke([=] + { + get_layer(index).play(); + }, task_priority::high_priority); + } + + std::future stop(int index) + { + return executor_.begin_invoke([=] + { + get_layer(index).stop(); + }, task_priority::high_priority); + } + + std::future clear(int index) + { + return executor_.begin_invoke([=] + { + layers_.erase(index); + }, task_priority::high_priority); + } + + std::future clear() + { + return executor_.begin_invoke([=] + { + layers_.clear(); + }, task_priority::high_priority); + } + + std::future swap_layers(stage& other, bool swap_transforms) + { + auto other_impl = other.impl_; + + if (other_impl.get() == this) + { + return make_ready_future(); + } + + auto func = [=] + { + auto layers = layers_ | boost::adaptors::map_values; + auto other_layers = other_impl->layers_ | boost::adaptors::map_values; + + for (auto& layer : layers) + layer.monitor_output().detach_parent(); + + for (auto& layer : other_layers) + layer.monitor_output().detach_parent(); + + std::swap(layers_, other_impl->layers_); + + for (auto& layer : layers) + layer.monitor_output().attach_parent(monitor_subject_); + + for (auto& layer : other_layers) + layer.monitor_output().attach_parent(monitor_subject_); + + if (swap_transforms) + std::swap(tweens_, other_impl->tweens_); + }; + + return executor_.begin_invoke([=] + { + other_impl->executor_.invoke(func, task_priority::high_priority); + }, task_priority::high_priority); + } + + std::future swap_layer(int index, int other_index, bool swap_transforms) + { + return executor_.begin_invoke([=] + { + std::swap(get_layer(index), get_layer(other_index)); + + if (swap_transforms) + std::swap(tweens_[index], tweens_[other_index]); + }, task_priority::high_priority); + } + + std::future swap_layer(int index, int other_index, stage& other, bool swap_transforms) + { + auto other_impl = other.impl_; + + if(other_impl.get() == this) + return swap_layer(index, other_index, swap_transforms); + else + { + auto func = [=] + { + auto& my_layer = get_layer(index); + auto& other_layer = other_impl->get_layer(other_index); + + my_layer.monitor_output().detach_parent(); + other_layer.monitor_output().detach_parent(); + + std::swap(my_layer, other_layer); + + my_layer.monitor_output().attach_parent(monitor_subject_); + other_layer.monitor_output().attach_parent(other_impl->monitor_subject_); + + if (swap_transforms) + { + auto& my_tween = tweens_[index]; + auto& other_tween = other_impl->tweens_[other_index]; + std::swap(my_tween, other_tween); + } + }; + + return executor_.begin_invoke([=] + { + other_impl->executor_.invoke(func, task_priority::high_priority); + }, task_priority::high_priority); + } + } + + void add_layer_consumer(void* token, int layer, const spl::shared_ptr& layer_consumer) + { + executor_.begin_invoke([=] + { + layer_consumers_[layer].insert(std::make_pair(token, layer_consumer)); + }, task_priority::high_priority); + } + + void remove_layer_consumer(void* token, int layer) + { + executor_.begin_invoke([=] + { + auto& layer_map = layer_consumers_[layer]; + layer_map.erase(token); + if (layer_map.empty()) + { + layer_consumers_.erase(layer); + } + }, task_priority::high_priority); + } + + std::future> foreground(int index) + { + return executor_.begin_invoke([=]() -> std::shared_ptr + { + return get_layer(index).foreground(); + }, task_priority::high_priority); + } + + std::future> background(int index) + { + return executor_.begin_invoke([=]() -> std::shared_ptr + { + return get_layer(index).background(); + }, task_priority::high_priority); + } + + std::future info() + { + return executor_.begin_invoke([this]() -> boost::property_tree::wptree + { + boost::property_tree::wptree info; + for (auto& layer : layers_) + info.add_child(L"layers.layer", layer.second.info()) + .add(L"index", layer.first); + return info; + }, task_priority::high_priority); + } + + std::future info(int index) + { + return executor_.begin_invoke([=] + { + return get_layer(index).info(); + }, task_priority::high_priority); + } + + std::future delay_info() + { + return std::move(executor_.begin_invoke([this]() -> boost::property_tree::wptree + { + boost::property_tree::wptree info; + + for (auto& layer : layers_) + info.add_child(L"layer", layer.second.delay_info()).add(L"index", layer.first); + + return info; + }, task_priority::high_priority)); + } + + std::future delay_info(int index) + { + return std::move(executor_.begin_invoke([=]() -> boost::property_tree::wptree + { + return get_layer(index).delay_info(); + }, task_priority::high_priority)); + } + + std::future call(int index, const std::vector& params) + { + return flatten(executor_.begin_invoke([=] + { + return get_layer(index).foreground()->call(params).share(); + }, task_priority::high_priority)); + } + + void on_interaction(const interaction_event::ptr& event) + { + executor_.begin_invoke([=] + { + aggregator_.offer(event); + }, task_priority::high_priority); + } + + boost::optional collission_detect(double x, double y) + { + for (auto& layer : layers_ | boost::adaptors::reversed) + { + auto transform = tweens_[layer.first].fetch(); + auto translated = translate(x, y, transform); + + if (translated.first >= 0.0 + && translated.first <= 1.0 + && translated.second >= 0.0 + && translated.second <= 1.0 + && layer.second.collides(translated.first, translated.second)) + { + return std::make_pair(transform, static_cast(&layer.second)); + } + } + + return boost::optional(); + } +}; + +stage::stage(int channel_index, spl::shared_ptr graph) : impl_(new impl(channel_index, std::move(graph))){} +std::future stage::call(int index, const std::vector& params){return impl_->call(index, params);} +std::future stage::apply_transforms(const std::vector& transforms){ return impl_->apply_transforms(transforms); } +std::future stage::apply_transform(int index, const std::function& transform, unsigned int mix_duration, const tweener& tween){ return impl_->apply_transform(index, transform, mix_duration, tween); } +std::future stage::clear_transforms(int index){ return impl_->clear_transforms(index); } +std::future stage::clear_transforms(){ return impl_->clear_transforms(); } +std::future stage::get_current_transform(int index){ return impl_->get_current_transform(index); } +std::future stage::load(int index, const spl::shared_ptr& producer, bool preview, const boost::optional& auto_play_delta){ return impl_->load(index, producer, preview, auto_play_delta); } +std::future stage::pause(int index){ return impl_->pause(index); } +std::future stage::resume(int index){ return impl_->resume(index); } +std::future stage::play(int index){ return impl_->play(index); } +std::future stage::stop(int index){ return impl_->stop(index); } +std::future stage::clear(int index){ return impl_->clear(index); } +std::future stage::clear(){ return impl_->clear(); } +std::future stage::swap_layers(stage& other, bool swap_transforms){ return impl_->swap_layers(other, swap_transforms); } +std::future stage::swap_layer(int index, int other_index, bool swap_transforms){ return impl_->swap_layer(index, other_index, swap_transforms); } +std::future stage::swap_layer(int index, int other_index, stage& other, bool swap_transforms){ return impl_->swap_layer(index, other_index, other, swap_transforms); } +void stage::add_layer_consumer(void* token, int layer, const spl::shared_ptr& layer_consumer){ impl_->add_layer_consumer(token, layer, layer_consumer); } +void stage::remove_layer_consumer(void* token, int layer){ impl_->remove_layer_consumer(token, layer); }std::future> stage::foreground(int index) { return impl_->foreground(index); } +std::future> stage::background(int index) { return impl_->background(index); } +std::future stage::info() const{ return impl_->info(); } +std::future stage::info(int index) const{ return impl_->info(index); } +std::future stage::delay_info() const{ return impl_->delay_info(); } +std::future stage::delay_info(int index) const{ return impl_->delay_info(index); } +std::map stage::operator()(const video_format_desc& format_desc){ return (*impl_)(format_desc); } +monitor::subject& stage::monitor_output(){return *impl_->monitor_subject_;} +void stage::on_interaction(const interaction_event::ptr& event) { impl_->on_interaction(event); } +}}