\r
#include <core/mixer/read_frame.h>\r
\r
-#include <common/concurrency/executor.h>\r
+#include <common/concurrency/governor.h>\r
#include <common/diagnostics/graph.h>\r
#include <common/memory/memclr.h>\r
#include <common/memory/memcpy.h>\r
\r
#include <tbb/concurrent_queue.h>\r
\r
+#include <agents.h>\r
#include <concrt_extras.h>\r
\r
#include <boost/timer.hpp>\r
#include <memory>\r
#include <array>\r
\r
+using namespace Concurrency;\r
+\r
namespace caspar { namespace bluefish { \r
\r
-struct bluefish_consumer : boost::noncopyable\r
+struct bluefish_consumer : public Concurrency::agent, boost::noncopyable\r
{\r
- safe_ptr<CBlueVelvet4> blue_;\r
- const unsigned int device_index_;\r
- const core::video_format_desc format_desc_;\r
-\r
- const std::wstring model_name_;\r
+ unbounded_buffer<safe_ptr<core::read_frame>> frames_;\r
\r
- safe_ptr<diagnostics::graph> graph_;\r
- boost::timer frame_timer_;\r
- boost::timer tick_timer_;\r
- boost::timer sync_timer_; \r
- \r
- unsigned int vid_fmt_;\r
+ safe_ptr<CBlueVelvet4> blue_;\r
+ const unsigned int device_index_;\r
+ const core::video_format_desc format_desc_;\r
\r
- std::array<blue_dma_buffer_ptr, 4> reserved_frames_; \r
- tbb::concurrent_bounded_queue<std::shared_ptr<core::read_frame>> frame_buffer_;\r
+ const std::wstring model_name_;\r
\r
- int preroll_count_;\r
+ safe_ptr<diagnostics::graph> graph_;\r
+ boost::timer frame_timer_;\r
+ boost::timer tick_timer_;\r
+ boost::timer sync_timer_; \r
+ \r
+ const unsigned int vid_fmt_;\r
\r
- const bool embedded_audio_;\r
- const bool key_only_;\r
+ std::array<blue_dma_buffer_ptr, 4> reserved_frames_; \r
\r
- executor executor_;\r
+ const bool embedded_audio_;\r
+ const bool key_only_;\r
+ \r
+ governor governor_;\r
+ tbb::atomic<bool> is_running_;\r
public:\r
bluefish_consumer(const core::video_format_desc& format_desc, unsigned int device_index, bool embedded_audio, bool key_only) \r
: blue_(create_blue(device_index))\r
, format_desc_(format_desc) \r
, model_name_(get_card_desc(*blue_))\r
, vid_fmt_(get_video_mode(*blue_, format_desc))\r
- , preroll_count_(0)\r
, embedded_audio_(embedded_audio)\r
, key_only_(key_only)\r
- , executor_(print())\r
+ , governor_(1)\r
{\r
- executor_.set_capacity(core::consumer_buffer_depth());\r
-\r
graph_->add_guide("tick-time", 0.5);\r
graph_->set_color("tick-time", diagnostics::color(0.0f, 0.6f, 0.9f)); \r
graph_->add_guide("frame-time", 0.5f); \r
{\r
return std::make_shared<blue_dma_buffer>(format_desc_.size, n++);\r
});\r
+\r
+ is_running_ = true;\r
+ start();\r
\r
CASPAR_LOG(info) << print() << L" Successfully Initialized.";\r
}\r
\r
~bluefish_consumer()\r
{\r
+ is_running_ = false;\r
+ governor_.cancel();\r
+ agent::wait(this);\r
try\r
- {\r
- executor_.invoke([&]\r
- {\r
- disable_video_output();\r
- blue_->device_detach(); \r
- });\r
+ { \r
+ disable_video_output();\r
+ blue_->device_detach(); \r
}\r
catch(...)\r
{\r
\r
void send(const safe_ptr<core::read_frame>& frame)\r
{ \r
- if(preroll_count_ < executor_.capacity())\r
- {\r
- while(preroll_count_++ < executor_.capacity())\r
- schedule_next_video(make_safe<core::read_frame>());\r
- }\r
- \r
- schedule_next_video(frame); \r
+ auto ticket = governor_.acquire();\r
+ Concurrency::send(frames_, safe_ptr<core::read_frame>(frame.get(), [frame, ticket](core::read_frame*){}));\r
}\r
\r
- void schedule_next_video(const safe_ptr<core::read_frame>& frame)\r
+ virtual void run()\r
{\r
static std::vector<int16_t> silence(MAX_HANC_BUFFER_SIZE, 0);\r
\r
- executor_.begin_invoke([=]\r
+ try\r
{\r
- try\r
+ while(is_running_)\r
{\r
+ auto frame = receive(frames_);\r
+\r
const size_t audio_samples = format_desc_.audio_samples_per_frame;\r
const size_t audio_nchannels = format_desc_.audio_channels;\r
\r
\r
sync_timer_.restart();\r
unsigned long n_field = 0;\r
- blue_->wait_output_video_synch(UPD_FMT_FRAME, n_field);\r
+ {\r
+ scoped_oversubcription_token oversubscribe;\r
+ blue_->wait_output_video_synch(UPD_FMT_FRAME, n_field);\r
+ }\r
graph_->update_value("sync-time", static_cast<float>(sync_timer_.elapsed()*format_desc_.fps*0.5));\r
\r
// Send and display\r
graph_->update_value("frame-time", static_cast<float>(frame_timer_.elapsed()*format_desc_.fps*0.5));\r
\r
graph_->update_value("tick-time", static_cast<float>(tick_timer_.elapsed()*format_desc_.fps*0.5));\r
- tick_timer_.restart();\r
- }\r
- catch(...)\r
- {\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
+ tick_timer_.restart(); \r
}\r
- graph_->set_value("input-buffer", static_cast<double>(executor_.size())/static_cast<double>(executor_.capacity()));\r
- });\r
- graph_->set_value("input-buffer", static_cast<double>(executor_.size())/static_cast<double>(executor_.capacity()));\r
+ }\r
+ catch(...)\r
+ {\r
+ CASPAR_LOG_CURRENT_EXCEPTION();\r
+ }\r
+\r
+ done();\r
}\r
\r
void encode_hanc(BLUE_UINT32* hanc_data, void* audio_data, size_t audio_samples, size_t audio_nchannels)\r
const size_t device_index_;\r
const bool embedded_audio_;\r
const bool key_only_;\r
- core::video_format_desc format_desc_;\r
public:\r
\r
bluefish_consumer_proxy(size_t device_index, bool embedded_audio, bool key_only)\r
\r
virtual void initialize(const core::video_format_desc& format_desc)\r
{\r
+ if(consumer_)\r
+ BOOST_THROW_EXCEPTION(invalid_operation());\r
+\r
Concurrency::scoped_oversubcription_token oversubscribe;\r
- format_desc_ = format_desc;\r
consumer_.reset(new bluefish_consumer(format_desc, device_index_, embedded_audio_, key_only_));\r
}\r
\r
\r
return L"bluefish [" + boost::lexical_cast<std::wstring>(device_index_) + L"]";\r
}\r
+\r
+ virtual size_t buffer_depth() const\r
+ {\r
+ return 1;\r
+ }\r
}; \r
\r
safe_ptr<core::frame_consumer> create_consumer(const std::vector<std::wstring>& params)\r