]> git.sesse.net Git - casparcg/commitdiff
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/branches...
authorronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Sun, 19 Dec 2010 20:30:32 +0000 (20:30 +0000)
committerronag <ronag@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Sun, 19 Dec 2010 20:30:32 +0000 (20:30 +0000)
common/concurrency/executor.h
core/producer/ffmpeg/input.cpp
core/producer/flash/flash_producer.cpp
core/producer/frame_producer.h
core/producer/layer.cpp

index fcfb14f1ef875518c9f837aac6b514a14c29b217..2f1237726630ebe3b08ee3ab7a0c1f96536ed472 100644 (file)
@@ -1,38 +1,30 @@
 #pragma once\r
 \r
-#include "../exception/exceptions.h"\r
 #include "../exception/win32_exception.h"\r
 #include "../log/log.h"\r
 \r
-#include <boost/thread.hpp>\r
-\r
 #include <tbb/atomic.h>\r
 #include <tbb/concurrent_queue.h>\r
 \r
+#include <boost/thread.hpp>\r
+#include <boost/noncopyable.hpp>\r
+\r
 #include <functional>\r
 #include <array>\r
 \r
 namespace caspar {\r
 \r
-class executor\r
+class executor : boost::noncopyable\r
 {\r
 public:\r
-\r
-       enum priority\r
-       {\r
-               low_priority = 0,\r
-               normal_priority,\r
-               high_priority\r
-       };\r
-\r
+       \r
        explicit executor(const std::function<void()>& f = nullptr)\r
        {\r
-               size_ = 0;\r
                is_running_ = false;\r
                f_ = f != nullptr ? f : [this]{run();};\r
        }\r
 \r
-       virtual ~executor()\r
+       ~executor()\r
        {\r
                stop();\r
        }\r
@@ -43,106 +35,84 @@ public:
                        return;\r
                thread_ = boost::thread(f_);\r
        }\r
-\r
-       bool is_running() const // noexcept\r
-       {\r
-               return is_running_;\r
-       }\r
-       \r
+               \r
        void stop(bool wait = true) // noexcept\r
        {\r
                is_running_ = false;    \r
-               begin_invoke([]{}); // wake if sleeping\r
+               execution_queue_.push([]{});\r
                if(wait && boost::this_thread::get_id() != thread_.get_id())\r
                        thread_.join();\r
        }\r
-\r
-       void execute() // noexcept\r
-       {\r
-               boost::unique_lock<boost::mutex> lock(mut_);\r
-               while(size_ < 1)                \r
-                       cond_.wait(lock);\r
-               \r
-               try_execute();\r
-       }\r
-\r
-       bool try_execute() // noexcept\r
-       {\r
-               std::function<void()> func;\r
-               if(execution_queue_[high_priority].try_pop(func) || execution_queue_[normal_priority].try_pop(func) || execution_queue_[low_priority].try_pop(func))\r
-               {\r
-                       func();\r
-                       --size_;\r
-               }\r
-\r
-               return func != nullptr;\r
-       }\r
                        \r
        template<typename Func>\r
-       auto begin_invoke(Func&& func, priority p = normal_priority) -> boost::unique_future<decltype(func())> // noexcept\r
+       auto begin_invoke(Func&& func) -> boost::unique_future<decltype(func())> // noexcept\r
        {       \r
-               typedef decltype(func()) result_type; \r
+               typedef boost::packaged_task<decltype(func())> task_type;\r
                                \r
-               auto task = std::make_shared<boost::packaged_task<result_type>>(std::forward<Func>(func)); // boost::packaged_task cannot be moved into lambda, need to used shared_ptr.\r
-               auto future = task->get_future();\r
+               auto task = task_type(std::forward<Func>(func));\r
+               auto future = task.get_future();\r
                \r
-               task->set_wait_callback(std::function<void(decltype(*task)& task)>([=](decltype(*task)& task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
+               task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
                {\r
                        try\r
                        {\r
                                if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
-                                       task();\r
+                                       my_task();\r
                        }\r
                        catch(boost::task_already_started&){}\r
                }));\r
-               execution_queue_[p].push([=]\r
+                               \r
+               struct task_adaptor_t\r
+               {\r
+                       task_adaptor_t(const task_adaptor_t& other) : task(std::move(other.task)){}\r
+                       task_adaptor_t(task_type&& task) : task(std::move(task)){}\r
+                       void operator()() const { task(); }\r
+                       mutable task_type task;\r
+               } task_adaptor(std::move(task));\r
+\r
+               execution_queue_.try_push([=]\r
                {\r
-                       try{(*task)();}\r
+                       try{task_adaptor();}\r
                        catch(boost::task_already_started&){}\r
                        catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}\r
                });\r
-               ++size_;\r
-               cond_.notify_one();\r
 \r
                return std::move(future);               \r
        }\r
-\r
-       size_t size() const\r
-       {\r
-               return execution_queue_.size();\r
-       }\r
-\r
-       bool empty() const\r
-       {\r
-               return execution_queue_.empty();\r
-       }\r
        \r
        template<typename Func>\r
-       auto invoke(Func&& func, priority p = normal_priority) -> decltype(func())\r
+       auto invoke(Func&& func) -> decltype(func())\r
        {\r
                if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
                        return func();\r
                \r
-               return begin_invoke(std::forward<Func>(func), p).get();\r
+               return begin_invoke(std::forward<Func>(func)).get();\r
        }\r
+\r
+       tbb::concurrent_bounded_queue<std::function<void()>>::size_type size() const { return execution_queue_.size();  }\r
+       bool empty() const              { return execution_queue_.empty();      }\r
+       bool is_running() const { return is_running_;                           }       \r
                \r
 private:\r
+       \r
+       void execute() // noexcept\r
+       {\r
+               std::function<void()> func;\r
+               execution_queue_.pop(func);     \r
+               func();\r
+       }\r
 \r
-       virtual void run() // noexcept\r
+       void run() // noexcept\r
        {\r
                win32_exception::install_handler();\r
                while(is_running_)\r
                        execute();\r
        }\r
-\r
-       tbb::atomic<size_t> size_;\r
-       boost::condition_variable cond_;\r
-       boost::mutex mut_;\r
-\r
+       \r
        std::function<void()> f_;\r
        boost::thread thread_;\r
        tbb::atomic<bool> is_running_;\r
-       std::array<tbb::concurrent_bounded_queue<std::function<void()>>, 3> execution_queue_;\r
+       tbb::concurrent_bounded_queue<std::function<void()>> execution_queue_;\r
 };\r
 \r
 }
\ No newline at end of file
index 1cd45dca1047a2ebf9a6a39ddd7413fbb354764b..caef3c4d9760d477ac87be9229c66ad3721db92f 100644 (file)
@@ -100,7 +100,7 @@ struct input::implementation : boost::noncopyable
                \r
        void read_file() // For every packet taken: read in a number of packets.\r
        {               \r
-               for(size_t n = 0; (n < 3 || video_packet_buffer_.size() < 3 || audio_packet_buffer_.size() < 3) && buffer_size_ < BUFFER_SIZE && executor_.is_running(); ++n)\r
+               for(size_t n = 0; buffer_size_ < BUFFER_SIZE && (n < 3 || video_packet_buffer_.size() < 3 || audio_packet_buffer_.size() < 3) && executor_.is_running(); ++n)\r
                {\r
                        AVPacket tmp_packet;\r
                        safe_ptr<AVPacket> read_packet(&tmp_packet, av_free_packet);    \r
@@ -141,8 +141,8 @@ struct input::implementation : boost::noncopyable
                if(buffer.try_pop(packet))\r
                {\r
                        buffer_size_ -= packet.size();\r
-                       if(executor_.size() < 4) // Avoid problems when in underrun.\r
-                               executor_.begin_invoke([this]{read_file();});\r
+                       executor_.begin_invoke([this]{read_file();});\r
+                       assert(executor_.size() < 8);\r
                }\r
                return std::move(packet);\r
        }\r
@@ -171,7 +171,7 @@ struct input::implementation : boost::noncopyable
 \r
        std::wstring print() const\r
        {\r
-               return L"ffmpeg[" + boost::filesystem::wpath(filename_).filename() + L"] Buffering ";\r
+               return L"ffmpeg[" + boost::filesystem::wpath(filename_).filename() + L"] Buffer thread";\r
        }\r
                                \r
        std::shared_ptr<AVFormatContext>        format_context_;        // Destroy this last\r
index 9b49775e6922f8621f40eab2220f8ad6824ba75d..bd6794949222374c815efa02aaf9429c89c27771 100644 (file)
@@ -140,14 +140,15 @@ public:
                return current_frame_;\r
        }\r
        \r
-       safe_ptr<draw_frame> receive()\r
+       bool try_pop(safe_ptr<draw_frame>& dest)\r
        {\r
-               frame_buffer_.try_pop(last_frame_);\r
-               return last_frame_;\r
+               bool result = frame_buffer_.try_pop(last_frame_);\r
+               dest = last_frame_;\r
+               return result;\r
        }\r
 \r
-       std::wstring print() const{ return L"flash[" + filename_ + L"]"; }\r
-       std::string bprint() const{ return narrow(L"flash[" + filename_ + L"]"); }\r
+       std::wstring print() const{ return L"flash[" + boost::filesystem::wpath(filename_).filename() + L"] Render thread"; }\r
+       std::string bprint() const{ return narrow(print()); }\r
 \r
 private:\r
        const std::wstring filename_;\r
@@ -186,41 +187,40 @@ struct flash_producer::implementation
                        if(!renderer_)\r
                                renderer_.reset(factory_());\r
                        renderer_->param(param);\r
-               }, executor::high_priority);\r
+                       render_frame();\r
+               });\r
        }\r
        \r
        safe_ptr<draw_frame> receive()\r
        {\r
-               auto frame = renderer_ ? renderer_->receive() : draw_frame::empty();\r
-               if(executor_.size() < 4) // Avoid problems when in underrun.\r
+               auto frame = draw_frame::empty();\r
+               if(renderer_ && renderer_->try_pop(frame)) // Only render again if frame was removed from buffer.               \r
+                       executor_.begin_invoke([this]{render_frame();});                \r
+               \r
+               assert(executor_.size() < 8);\r
+               \r
+               return frame;\r
+       }\r
+\r
+       void render_frame()\r
+       {\r
+               try\r
                {\r
-                       executor_.begin_invoke([this]\r
-                       {\r
-                               try\r
-                               {\r
-                                       renderer_->render();\r
-                               }\r
-                               catch(...)\r
-                               {\r
-                                       CASPAR_LOG_CURRENT_EXCEPTION();\r
-                                       renderer_.reset();\r
-                               }\r
-                       });\r
+                       renderer_->render();\r
+               }\r
+               catch(...)\r
+               {\r
+                       renderer_.reset();\r
+                       CASPAR_LOG_CURRENT_EXCEPTION();\r
                }\r
-               return frame;\r
        }\r
 \r
        void initialize(const safe_ptr<frame_processor_device>& frame_processor)\r
        {\r
                factory_ = [=]{return new flash_renderer(frame_processor, filename_);};\r
-               executor_.invoke([&]\r
-               {\r
-                       if(!renderer_)\r
-                               renderer_.reset(factory_());\r
-               });\r
        }\r
-\r
-       std::wstring print() const{ return L"flash[" + filename_ + L"]"; }\r
+       \r
+       std::wstring print() const{ return L"flash[" + boost::filesystem::wpath(filename_).filename() + L"]"; }\r
        \r
        std::wstring filename_;\r
        std::function<flash_renderer*()> factory_;\r
index 9e36deedddbd8768b09d18d64e3c699b5ad04f81..1878bd071e97555a42ec40d88b4a1c959dc22958 100644 (file)
@@ -86,7 +86,7 @@ public:
                return producer;\r
        }\r
 \r
-       virtual std::wstring print() const = 0;\r
+       virtual std::wstring print() const = 0; // nothrow\r
 };\r
 \r
 inline std::wostream& operator<<(std::wostream& out, const frame_producer& producer)\r
index 547c6a1bfe94fc295d07d43429569281f1fa2035..2afe89a79afd7aca8d916d12d4341d89541ea84d 100644 (file)
@@ -16,14 +16,15 @@ struct layer::implementation
        void load(const safe_ptr<frame_producer>& frame_producer, bool autoplay)\r
        {                       \r
                background_ = frame_producer;\r
+               CASPAR_LOG(info) << print() << " " << foreground_->print() << " => background";\r
                if(autoplay)\r
                        play();                 \r
        }\r
 \r
        void preview(const safe_ptr<frame_producer>& frame_producer)\r
        {\r
-               background_ = frame_producer;\r
-               foreground_ = frame_producer::empty();  \r
+               stop();\r
+               load(frame_producer, false);                    \r
                try\r
                {\r
                        last_frame_ = frame_producer->receive();\r
@@ -31,9 +32,8 @@ struct layer::implementation
                catch(...)\r
                {\r
                        CASPAR_LOG_CURRENT_EXCEPTION();\r
-                       CASPAR_LOG(warning) << L"layer[" << index_ << L"] Error. Removed " << foreground_->print() << L" from layer.";\r
+                       CASPAR_LOG(warning) << print() << L" empty => background{" << background_->print() << "}";\r
                        background_ = frame_producer::empty();\r
-                       last_frame_ = draw_frame::empty();\r
                }\r
        }\r
        \r
@@ -43,7 +43,7 @@ struct layer::implementation
                foreground_ = background_;\r
                background_ = frame_producer::empty();\r
                is_paused_ = false;\r
-               CASPAR_LOG(info) << L"layer[" << index_ << L"] Started: " << foreground_->print();\r
+               CASPAR_LOG(info) << print() << L" background{" << foreground_->print() << "} => foreground";\r
        }\r
 \r
        void pause()\r
@@ -75,25 +75,28 @@ struct layer::implementation
 \r
                        if(last_frame_ == draw_frame::eof())\r
                        {\r
-                               CASPAR_LOG(info) << L"layer[" << index_ << L"] Ended:" << foreground_->print();\r
                                auto following = foreground_->get_following_producer();\r
                                following->set_leading_producer(foreground_);\r
                                foreground_ = following;\r
                                if(foreground_ != frame_producer::empty())\r
-                                       CASPAR_LOG(info) << L"layer[" << index_ << L"] Started:" << foreground_->print();\r
+                                       CASPAR_LOG(info) << print() << L" [EOF] following{" << foreground_->print() << "} => foreground{" << foreground_->print() << "}";\r
+                               else\r
+                                       CASPAR_LOG(info) << print() << L" [EOF] empty => foreground{" << foreground_->print() << "}";\r
                                last_frame_ = receive();\r
                        }\r
                }\r
                catch(...)\r
                {\r
                        CASPAR_LOG_CURRENT_EXCEPTION();\r
-                       CASPAR_LOG(warning) << L"layer[" << index_ << L"] Error. Removed " << foreground_->print() << L" from layer.";\r
+                       CASPAR_LOG(warning) << print() << L" empty -> foreground{" << foreground_->print() << "]";\r
                        foreground_ = frame_producer::empty();\r
                        last_frame_ = draw_frame::empty();\r
                }\r
 \r
                return last_frame_;\r
        }\r
+\r
+       std::wstring print() const { return L"layer[" + boost::lexical_cast<std::wstring>(index_) + L"]"; }\r
                                \r
        tbb::atomic<bool>                       is_paused_;\r
        safe_ptr<draw_frame>            last_frame_;\r