#pragma once\r
\r
-#include "../exception/exceptions.h"\r
#include "../exception/win32_exception.h"\r
#include "../log/log.h"\r
\r
-#include <boost/thread.hpp>\r
-\r
#include <tbb/atomic.h>\r
#include <tbb/concurrent_queue.h>\r
\r
+#include <boost/thread.hpp>\r
+#include <boost/noncopyable.hpp>\r
+\r
#include <functional>\r
#include <array>\r
\r
namespace caspar {\r
\r
-class executor\r
+class executor : boost::noncopyable\r
{\r
public:\r
-\r
- enum priority\r
- {\r
- low_priority = 0,\r
- normal_priority,\r
- high_priority\r
- };\r
-\r
+ \r
explicit executor(const std::function<void()>& f = nullptr)\r
{\r
- size_ = 0;\r
is_running_ = false;\r
f_ = f != nullptr ? f : [this]{run();};\r
}\r
\r
- virtual ~executor()\r
+ ~executor()\r
{\r
stop();\r
}\r
return;\r
thread_ = boost::thread(f_);\r
}\r
-\r
- bool is_running() const // noexcept\r
- {\r
- return is_running_;\r
- }\r
- \r
+ \r
void stop(bool wait = true) // noexcept\r
{\r
is_running_ = false; \r
- begin_invoke([]{}); // wake if sleeping\r
+ execution_queue_.push([]{});\r
if(wait && boost::this_thread::get_id() != thread_.get_id())\r
thread_.join();\r
}\r
-\r
- void execute() // noexcept\r
- {\r
- boost::unique_lock<boost::mutex> lock(mut_);\r
- while(size_ < 1) \r
- cond_.wait(lock);\r
- \r
- try_execute();\r
- }\r
-\r
- bool try_execute() // noexcept\r
- {\r
- std::function<void()> func;\r
- if(execution_queue_[high_priority].try_pop(func) || execution_queue_[normal_priority].try_pop(func) || execution_queue_[low_priority].try_pop(func))\r
- {\r
- func();\r
- --size_;\r
- }\r
-\r
- return func != nullptr;\r
- }\r
\r
template<typename Func>\r
- auto begin_invoke(Func&& func, priority p = normal_priority) -> boost::unique_future<decltype(func())> // noexcept\r
+ auto begin_invoke(Func&& func) -> boost::unique_future<decltype(func())> // noexcept\r
{ \r
- typedef decltype(func()) result_type; \r
+ typedef boost::packaged_task<decltype(func())> task_type;\r
\r
- auto task = std::make_shared<boost::packaged_task<result_type>>(std::forward<Func>(func)); // boost::packaged_task cannot be moved into lambda, need to used shared_ptr.\r
- auto future = task->get_future();\r
+ auto task = task_type(std::forward<Func>(func));\r
+ auto future = task.get_future();\r
\r
- task->set_wait_callback(std::function<void(decltype(*task)& task)>([=](decltype(*task)& task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
+ task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
{\r
try\r
{\r
if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock.\r
- task();\r
+ my_task();\r
}\r
catch(boost::task_already_started&){}\r
}));\r
- execution_queue_[p].push([=]\r
+ \r
+ struct task_adaptor_t\r
+ {\r
+ task_adaptor_t(const task_adaptor_t& other) : task(std::move(other.task)){}\r
+ task_adaptor_t(task_type&& task) : task(std::move(task)){}\r
+ void operator()() const { task(); }\r
+ mutable task_type task;\r
+ } task_adaptor(std::move(task));\r
+\r
+ execution_queue_.try_push([=]\r
{\r
- try{(*task)();}\r
+ try{task_adaptor();}\r
catch(boost::task_already_started&){}\r
catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}\r
});\r
- ++size_;\r
- cond_.notify_one();\r
\r
return std::move(future); \r
}\r
-\r
- size_t size() const\r
- {\r
- return execution_queue_.size();\r
- }\r
-\r
- bool empty() const\r
- {\r
- return execution_queue_.empty();\r
- }\r
\r
template<typename Func>\r
- auto invoke(Func&& func, priority p = normal_priority) -> decltype(func())\r
+ auto invoke(Func&& func) -> decltype(func())\r
{\r
if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock.\r
return func();\r
\r
- return begin_invoke(std::forward<Func>(func), p).get();\r
+ return begin_invoke(std::forward<Func>(func)).get();\r
}\r
+\r
+ tbb::concurrent_bounded_queue<std::function<void()>>::size_type size() const { return execution_queue_.size(); }\r
+ bool empty() const { return execution_queue_.empty(); }\r
+ bool is_running() const { return is_running_; } \r
\r
private:\r
+ \r
+ void execute() // noexcept\r
+ {\r
+ std::function<void()> func;\r
+ execution_queue_.pop(func); \r
+ func();\r
+ }\r
\r
- virtual void run() // noexcept\r
+ void run() // noexcept\r
{\r
win32_exception::install_handler();\r
while(is_running_)\r
execute();\r
}\r
-\r
- tbb::atomic<size_t> size_;\r
- boost::condition_variable cond_;\r
- boost::mutex mut_;\r
-\r
+ \r
std::function<void()> f_;\r
boost::thread thread_;\r
tbb::atomic<bool> is_running_;\r
- std::array<tbb::concurrent_bounded_queue<std::function<void()>>, 3> execution_queue_;\r
+ tbb::concurrent_bounded_queue<std::function<void()>> execution_queue_;\r
};\r
\r
}
\ No newline at end of file
\r
void read_file() // For every packet taken: read in a number of packets.\r
{ \r
- for(size_t n = 0; (n < 3 || video_packet_buffer_.size() < 3 || audio_packet_buffer_.size() < 3) && buffer_size_ < BUFFER_SIZE && executor_.is_running(); ++n)\r
+ for(size_t n = 0; buffer_size_ < BUFFER_SIZE && (n < 3 || video_packet_buffer_.size() < 3 || audio_packet_buffer_.size() < 3) && executor_.is_running(); ++n)\r
{\r
AVPacket tmp_packet;\r
safe_ptr<AVPacket> read_packet(&tmp_packet, av_free_packet); \r
if(buffer.try_pop(packet))\r
{\r
buffer_size_ -= packet.size();\r
- if(executor_.size() < 4) // Avoid problems when in underrun.\r
- executor_.begin_invoke([this]{read_file();});\r
+ executor_.begin_invoke([this]{read_file();});\r
+ assert(executor_.size() < 8);\r
}\r
return std::move(packet);\r
}\r
\r
std::wstring print() const\r
{\r
- return L"ffmpeg[" + boost::filesystem::wpath(filename_).filename() + L"] Buffering ";\r
+ return L"ffmpeg[" + boost::filesystem::wpath(filename_).filename() + L"] Buffer thread";\r
}\r
\r
std::shared_ptr<AVFormatContext> format_context_; // Destroy this last\r
return current_frame_;\r
}\r
\r
- safe_ptr<draw_frame> receive()\r
+ bool try_pop(safe_ptr<draw_frame>& dest)\r
{\r
- frame_buffer_.try_pop(last_frame_);\r
- return last_frame_;\r
+ bool result = frame_buffer_.try_pop(last_frame_);\r
+ dest = last_frame_;\r
+ return result;\r
}\r
\r
- std::wstring print() const{ return L"flash[" + filename_ + L"]"; }\r
- std::string bprint() const{ return narrow(L"flash[" + filename_ + L"]"); }\r
+ std::wstring print() const{ return L"flash[" + boost::filesystem::wpath(filename_).filename() + L"] Render thread"; }\r
+ std::string bprint() const{ return narrow(print()); }\r
\r
private:\r
const std::wstring filename_;\r
if(!renderer_)\r
renderer_.reset(factory_());\r
renderer_->param(param);\r
- }, executor::high_priority);\r
+ render_frame();\r
+ });\r
}\r
\r
safe_ptr<draw_frame> receive()\r
{\r
- auto frame = renderer_ ? renderer_->receive() : draw_frame::empty();\r
- if(executor_.size() < 4) // Avoid problems when in underrun.\r
+ auto frame = draw_frame::empty();\r
+ if(renderer_ && renderer_->try_pop(frame)) // Only render again if frame was removed from buffer. \r
+ executor_.begin_invoke([this]{render_frame();}); \r
+ \r
+ assert(executor_.size() < 8);\r
+ \r
+ return frame;\r
+ }\r
+\r
+ void render_frame()\r
+ {\r
+ try\r
{\r
- executor_.begin_invoke([this]\r
- {\r
- try\r
- {\r
- renderer_->render();\r
- }\r
- catch(...)\r
- {\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
- renderer_.reset();\r
- }\r
- });\r
+ renderer_->render();\r
+ }\r
+ catch(...)\r
+ {\r
+ renderer_.reset();\r
+ CASPAR_LOG_CURRENT_EXCEPTION();\r
}\r
- return frame;\r
}\r
\r
void initialize(const safe_ptr<frame_processor_device>& frame_processor)\r
{\r
factory_ = [=]{return new flash_renderer(frame_processor, filename_);};\r
- executor_.invoke([&]\r
- {\r
- if(!renderer_)\r
- renderer_.reset(factory_());\r
- });\r
}\r
-\r
- std::wstring print() const{ return L"flash[" + filename_ + L"]"; }\r
+ \r
+ std::wstring print() const{ return L"flash[" + boost::filesystem::wpath(filename_).filename() + L"]"; }\r
\r
std::wstring filename_;\r
std::function<flash_renderer*()> factory_;\r
return producer;\r
}\r
\r
- virtual std::wstring print() const = 0;\r
+ virtual std::wstring print() const = 0; // nothrow\r
};\r
\r
inline std::wostream& operator<<(std::wostream& out, const frame_producer& producer)\r
void load(const safe_ptr<frame_producer>& frame_producer, bool autoplay)\r
{ \r
background_ = frame_producer;\r
+ CASPAR_LOG(info) << print() << " " << foreground_->print() << " => background";\r
if(autoplay)\r
play(); \r
}\r
\r
void preview(const safe_ptr<frame_producer>& frame_producer)\r
{\r
- background_ = frame_producer;\r
- foreground_ = frame_producer::empty(); \r
+ stop();\r
+ load(frame_producer, false); \r
try\r
{\r
last_frame_ = frame_producer->receive();\r
catch(...)\r
{\r
CASPAR_LOG_CURRENT_EXCEPTION();\r
- CASPAR_LOG(warning) << L"layer[" << index_ << L"] Error. Removed " << foreground_->print() << L" from layer.";\r
+ CASPAR_LOG(warning) << print() << L" empty => background{" << background_->print() << "}";\r
background_ = frame_producer::empty();\r
- last_frame_ = draw_frame::empty();\r
}\r
}\r
\r
foreground_ = background_;\r
background_ = frame_producer::empty();\r
is_paused_ = false;\r
- CASPAR_LOG(info) << L"layer[" << index_ << L"] Started: " << foreground_->print();\r
+ CASPAR_LOG(info) << print() << L" background{" << foreground_->print() << "} => foreground";\r
}\r
\r
void pause()\r
\r
if(last_frame_ == draw_frame::eof())\r
{\r
- CASPAR_LOG(info) << L"layer[" << index_ << L"] Ended:" << foreground_->print();\r
auto following = foreground_->get_following_producer();\r
following->set_leading_producer(foreground_);\r
foreground_ = following;\r
if(foreground_ != frame_producer::empty())\r
- CASPAR_LOG(info) << L"layer[" << index_ << L"] Started:" << foreground_->print();\r
+ CASPAR_LOG(info) << print() << L" [EOF] following{" << foreground_->print() << "} => foreground{" << foreground_->print() << "}";\r
+ else\r
+ CASPAR_LOG(info) << print() << L" [EOF] empty => foreground{" << foreground_->print() << "}";\r
last_frame_ = receive();\r
}\r
}\r
catch(...)\r
{\r
CASPAR_LOG_CURRENT_EXCEPTION();\r
- CASPAR_LOG(warning) << L"layer[" << index_ << L"] Error. Removed " << foreground_->print() << L" from layer.";\r
+ CASPAR_LOG(warning) << print() << L" empty -> foreground{" << foreground_->print() << "]";\r
foreground_ = frame_producer::empty();\r
last_frame_ = draw_frame::empty();\r
}\r
\r
return last_frame_;\r
}\r
+\r
+ std::wstring print() const { return L"layer[" + boost::lexical_cast<std::wstring>(index_) + L"]"; }\r
\r
tbb::atomic<bool> is_paused_;\r
safe_ptr<draw_frame> last_frame_;\r