From 62223382349043462909cc6da53c969e0c3a2e3c Mon Sep 17 00:00:00 2001 From: ronag Date: Sun, 5 Feb 2012 17:45:36 +0000 Subject: [PATCH] 2.1.0: Refactored frame_consumer exception handling and recovery, git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches/2.1.0@2264 362d55ac-95cf-4e76-9f9a-cbaa9c17b72d --- core/consumer/frame_consumer.cpp | 85 +++++++++++++++++++++++++------- core/consumer/output.cpp | 82 ++++++++++-------------------- core/producer/frame_producer.cpp | 33 +++++++------ core/video_channel.cpp | 37 ++++++++------ 4 files changed, 131 insertions(+), 106 deletions(-) diff --git a/core/consumer/frame_consumer.cpp b/core/consumer/frame_consumer.cpp index c27ff0a5e..77dddaefd 100644 --- a/core/consumer/frame_consumer.cpp +++ b/core/consumer/frame_consumer.cpp @@ -76,13 +76,13 @@ public: }); } - virtual bool send(const spl::shared_ptr& frame) { return (*consumer_)->send(frame);} - virtual void initialize(const struct video_format_desc& format_desc, int channel_index) { return (*consumer_)->initialize(format_desc, channel_index);} - virtual std::wstring print() const { return (*consumer_)->print();} - virtual boost::property_tree::wptree info() const { return (*consumer_)->info();} - virtual bool has_synchronization_clock() const { return (*consumer_)->has_synchronization_clock();} - virtual int buffer_depth() const { return (*consumer_)->buffer_depth();} - virtual int index() const { return (*consumer_)->index();} + virtual bool send(const spl::shared_ptr& frame) override {return (*consumer_)->send(frame);} + virtual void initialize(const struct video_format_desc& format_desc, int channel_index) override {return (*consumer_)->initialize(format_desc, channel_index);} + virtual std::wstring print() const override {return (*consumer_)->print();} + virtual boost::property_tree::wptree info() const override {return (*consumer_)->info();} + virtual bool has_synchronization_clock() const override {return (*consumer_)->has_synchronization_clock();} + virtual int buffer_depth() const override {return (*consumer_)->buffer_depth();} + virtual int index() const override {return (*consumer_)->index();} }; class print_consumer_proxy : public frame_consumer @@ -103,13 +103,13 @@ public: CASPAR_LOG(info) << str << L" Uninitialized."; } - virtual bool send(const spl::shared_ptr& frame) { return consumer_->send(frame);} - virtual void initialize(const struct video_format_desc& format_desc, int channel_index) { return consumer_->initialize(format_desc, channel_index);} - virtual std::wstring print() const { return consumer_->print();} - virtual boost::property_tree::wptree info() const { return consumer_->info();} - virtual bool has_synchronization_clock() const { return consumer_->has_synchronization_clock();} - virtual int buffer_depth() const { return consumer_->buffer_depth();} - virtual int index() const { return consumer_->index();} + virtual bool send(const spl::shared_ptr& frame) override {return consumer_->send(frame);} + virtual void initialize(const struct video_format_desc& format_desc, int channel_index) override {return consumer_->initialize(format_desc, channel_index);} + virtual std::wstring print() const override {return consumer_->print();} + virtual boost::property_tree::wptree info() const override {return consumer_->info();} + virtual bool has_synchronization_clock() const override {return consumer_->has_synchronization_clock();} + virtual int buffer_depth() const override {return consumer_->buffer_depth();} + virtual int index() const override {return consumer_->index();} }; // This class is used to guarantee that audio cadence is correct. This is important for NTSC audio. @@ -151,9 +151,57 @@ public: return result; } + + virtual std::wstring print() const override {return consumer_->print();} + virtual boost::property_tree::wptree info() const override {return consumer_->info();} + virtual bool has_synchronization_clock() const override {return consumer_->has_synchronization_clock();} + virtual int buffer_depth() const override {return consumer_->buffer_depth();} + virtual int index() const override {return consumer_->index();} +}; + +class recover_consumer_proxy : public frame_consumer +{ + std::shared_ptr consumer_; + int channel_index_; + video_format_desc format_desc_; +public: + recover_consumer_proxy(spl::shared_ptr&& consumer) + : consumer_(std::move(consumer)) + { + } + + virtual bool send(const spl::shared_ptr& frame) + { + try + { + return consumer_->send(frame); + } + catch(...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + try + { + consumer_->initialize(format_desc_, channel_index_); + return consumer_->send(frame); + } + catch(...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + CASPAR_LOG(error) << print() << "Failed to recover consumer."; + return false; + } + } + } + + virtual void initialize(const struct video_format_desc& format_desc, int channel_index) + { + format_desc_ = format_desc; + channel_index_ = channel_index; + return consumer_->initialize(format_desc, channel_index); + } - virtual std::wstring print() const override {return consumer_->print(); } - virtual boost::property_tree::wptree info() const override {return consumer_->info(); } + virtual std::wstring print() const override {return consumer_->print();} + virtual boost::property_tree::wptree info() const override {return consumer_->info();} virtual bool has_synchronization_clock() const override {return consumer_->has_synchronization_clock();} virtual int buffer_depth() const override {return consumer_->buffer_depth();} virtual int index() const override {return consumer_->index();} @@ -183,8 +231,9 @@ spl::shared_ptr create_consumer(const std::vector( spl::make_shared( - spl::make_shared( - std::move(consumer)))); + spl::make_shared( + spl::make_shared( + std::move(consumer))))); } const spl::shared_ptr& frame_consumer::empty() diff --git a/core/consumer/output.cpp b/core/consumer/output.cpp index 6f72f6082..ff5a51fde 100644 --- a/core/consumer/output.cpp +++ b/core/consumer/output.cpp @@ -143,72 +143,40 @@ public: void operator()(spl::shared_ptr input_frame, const video_format_desc& format_desc) { executor_.invoke([=] - { - try - { - if(!has_synchronization_clock()) - sync_timer_.tick(1.0/format_desc_.fps); + { + if(!has_synchronization_clock()) + sync_timer_.tick(1.0/format_desc_.fps); - if(format_desc_ != format_desc) - set_video_format_desc(format_desc); + if(format_desc_ != format_desc) + set_video_format_desc(format_desc); - if(input_frame->image_data().size() != format_desc_.size) - { - sync_timer_.tick(1.0/format_desc_.fps); - return; - } + if(input_frame->image_data().size() != format_desc_.size) + { + sync_timer_.tick(1.0/format_desc_.fps); + return; + } - auto minmax = minmax_buffer_depth(); + auto minmax = minmax_buffer_depth(); - frames_.set_capacity(std::max(1, minmax.second - minmax.first) + 1); // std::max(1, x) since we want to guarantee some pipeline depth for asycnhronous mixer read-back. - frames_.push_back(input_frame); + frames_.set_capacity(std::max(1, minmax.second - minmax.first) + 1); // std::max(1, x) since we want to guarantee some pipeline depth for asycnhronous mixer read-back. + frames_.push_back(input_frame); - if(!frames_.full()) - return; + if(!frames_.full()) + return; - auto it = consumers_.begin(); - while(it != consumers_.end()) - { - auto consumer = it->second; - auto frame = frames_.at(consumer->buffer_depth()-minmax.first); + for(auto it = consumers_.begin(); it != consumers_.end();) + { + auto consumer = it->second; + auto frame = frames_.at(consumer->buffer_depth()-minmax.first); - try - { - if(consumer->send(frame)) - ++it; - else - { - CASPAR_LOG(info) << print() << L" " << it->second->print() << L" Removed."; - consumers_.erase(it++); - } - } - catch(...) - { - CASPAR_LOG_CURRENT_EXCEPTION(); - try - { - consumer->initialize(format_desc_, channel_index_); - if(consumer->send(frame)) - ++it; - else - { - CASPAR_LOG(info) << print() << L" " << it->second->print() << L" Removed."; - consumers_.erase(it++); - } - } - catch(...) - { - CASPAR_LOG_CURRENT_EXCEPTION(); - CASPAR_LOG(error) << "Failed to recover consumer: " << consumer->print() << L". Removing it."; - consumers_.erase(it++); - } - } + if(consumer->send(frame)) + ++it; + else + { + CASPAR_LOG(info) << print() << L" " << it->second->print() << L" Removed."; + consumers_.erase(it++); } } - catch(...) - { - CASPAR_LOG_CURRENT_EXCEPTION(); - } }); } diff --git a/core/producer/frame_producer.cpp b/core/producer/frame_producer.cpp index b33e9b1ec..8ab918415 100644 --- a/core/producer/frame_producer.cpp +++ b/core/producer/frame_producer.cpp @@ -126,14 +126,14 @@ public: }); } - virtual spl::shared_ptr receive(int hints) override {return (*producer_)->receive(hints);} - virtual spl::shared_ptr last_frame() const override {return (*producer_)->last_frame();} - virtual std::wstring print() const override {return (*producer_)->print();} - virtual boost::property_tree::wptree info() const override {return (*producer_)->info();} - virtual boost::unique_future call(const std::wstring& str) override {return (*producer_)->call(str);} - virtual spl::shared_ptr get_following_producer() const override {return (*producer_)->get_following_producer();} - virtual void set_leading_producer(const spl::shared_ptr& producer) override {(*producer_)->set_leading_producer(producer);} - virtual uint32_t nb_frames() const override {return (*producer_)->nb_frames();} + virtual spl::shared_ptr receive(int hints) override {return (*producer_)->receive(hints);} + virtual spl::shared_ptr last_frame() const override {return (*producer_)->last_frame();} + virtual std::wstring print() const override {return (*producer_)->print();} + virtual boost::property_tree::wptree info() const override {return (*producer_)->info();} + virtual boost::unique_future call(const std::wstring& str) override {return (*producer_)->call(str);} + virtual spl::shared_ptr get_following_producer() const override {return (*producer_)->get_following_producer();} + virtual void set_leading_producer(const spl::shared_ptr& producer) override {return (*producer_)->set_leading_producer(producer);} + virtual uint32_t nb_frames() const override {return (*producer_)->nb_frames();} }; class print_producer_proxy : public frame_producer @@ -153,15 +153,16 @@ public: producer_.reset(); CASPAR_LOG(info) << str << L" Uninitialized."; } + - virtual spl::shared_ptr receive(int hints) override {return (producer_)->receive(hints);} - virtual spl::shared_ptr last_frame() const override {return (producer_)->last_frame();} - virtual std::wstring print() const override {return (producer_)->print();} - virtual boost::property_tree::wptree info() const override {return (producer_)->info();} - virtual boost::unique_future call(const std::wstring& str) override {return (producer_)->call(str);} - virtual spl::shared_ptr get_following_producer() const override {return (producer_)->get_following_producer();} - virtual void set_leading_producer(const spl::shared_ptr& producer) override {(producer_)->set_leading_producer(producer);} - virtual uint32_t nb_frames() const override {return (producer_)->nb_frames();} + virtual spl::shared_ptr receive(int hints) override {return producer_->receive(hints);} + virtual spl::shared_ptr last_frame() const override {return producer_->last_frame();} + virtual std::wstring print() const override {return producer_->print();} + virtual boost::property_tree::wptree info() const override {return producer_->info();} + virtual boost::unique_future call(const std::wstring& str) override {return producer_->call(str);} + virtual spl::shared_ptr get_following_producer() const override {return producer_->get_following_producer();} + virtual void set_leading_producer(const spl::shared_ptr& producer) override {return producer_->set_leading_producer(producer);} + virtual uint32_t nb_frames() const override {return producer_->nb_frames();} }; spl::shared_ptr do_create_producer(const spl::shared_ptr& my_frame_factory, const std::vector& params) diff --git a/core/video_channel.cpp b/core/video_channel.cpp index f3078c726..fd6f29711 100644 --- a/core/video_channel.cpp +++ b/core/video_channel.cpp @@ -105,35 +105,42 @@ public: void tick() { - tick_timer_.restart(); + try + { + tick_timer_.restart(); - // Produce + // Produce - produce_timer_.restart(); + produce_timer_.restart(); - auto stage_frames = (*stage_)(format_desc_); + auto stage_frames = (*stage_)(format_desc_); - graph_->set_value("produce-time", produce_timer_.elapsed()*format_desc_.fps*0.5); + graph_->set_value("produce-time", produce_timer_.elapsed()*format_desc_.fps*0.5); - // Mix + // Mix - //mix_timer_.restart(); + //mix_timer_.restart(); - auto mixed_frame = (*mixer_)(std::move(stage_frames), format_desc_); + auto mixed_frame = (*mixer_)(std::move(stage_frames), format_desc_); - //graph_->set_value("mix-time", mix_timer_.elapsed()*format_desc_.fps*0.5); + //graph_->set_value("mix-time", mix_timer_.elapsed()*format_desc_.fps*0.5); - // Consume + // Consume - consume_timer_.restart(); + consume_timer_.restart(); - frame_subject_.on_next(mixed_frame); + frame_subject_.on_next(mixed_frame); - graph_->set_value("consume-time", consume_timer_.elapsed()*format_desc_.fps*0.5); + graph_->set_value("consume-time", consume_timer_.elapsed()*format_desc_.fps*0.5); - (*output_)(std::move(mixed_frame), format_desc_); + (*output_)(std::move(mixed_frame), format_desc_); - graph_->set_value("tick-time", tick_timer_.elapsed()*format_desc_.fps*0.5); + graph_->set_value("tick-time", tick_timer_.elapsed()*format_desc_.fps*0.5); + } + catch(...) + { + CASPAR_LOG_CURRENT_EXCEPTION(); + } executor_.begin_invoke([=]{tick();}); } -- 2.39.5