]> git.sesse.net Git - casparcg/commitdiff
3560180: Proper sync of multiple consumers on the same channel (needs testing)
authorhellgore <hellgore@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Thu, 30 Aug 2012 07:33:17 +0000 (07:33 +0000)
committerhellgore <hellgore@362d55ac-95cf-4e76-9f9a-cbaa9c17b72d>
Thu, 30 Aug 2012 07:33:17 +0000 (07:33 +0000)
git-svn-id: https://casparcg.svn.sourceforge.net/svnroot/casparcg/server/trunk@3224 362d55ac-95cf-4e76-9f9a-cbaa9c17b72d

14 files changed:
common/common.vcxproj
common/common.vcxproj.filters
common/concurrency/executor.h
common/concurrency/future_util.h [new file with mode: 0644]
core/consumer/frame_consumer.cpp
core/consumer/frame_consumer.h
core/consumer/output.cpp
core/producer/channel/channel_producer.cpp
modules/bluefish/consumer/bluefish_consumer.cpp
modules/decklink/consumer/decklink_consumer.cpp
modules/ffmpeg/consumer/ffmpeg_consumer.cpp
modules/image/consumer/image_consumer.cpp
modules/oal/consumer/oal_consumer.cpp
modules/ogl/consumer/ogl_consumer.cpp

index d1047f5eb5566ad5b3be6f37e4f809507aa7914e..2c74d97553c302c6f6acf8e55a569659ad0cfa68 100644 (file)
     <ClInclude Include="compiler\vs\disable_silly_warnings.h" />\r
     <ClInclude Include="concurrency\com_context.h" />\r
     <ClInclude Include="concurrency\executor.h" />\r
+    <ClInclude Include="concurrency\future_util.h" />\r
     <ClInclude Include="concurrency\lock.h" />\r
     <ClInclude Include="concurrency\target.h" />\r
     <ClInclude Include="diagnostics\graph.h" />\r
index 1a5155289638e6d63b996aef99d026c4124e8e02..8773a0e26fdd77a6989ab19a8dc0e942012de629 100644 (file)
     <ClInclude Include="concurrency\lock.h">\r
       <Filter>source\concurrency</Filter>\r
     </ClInclude>\r
+    <ClInclude Include="concurrency\future_util.h">\r
+      <Filter>source\concurrency</Filter>\r
+    </ClInclude>\r
   </ItemGroup>\r
 </Project>
\ No newline at end of file
index 41a82ea2f45992cc036e9d2c3e46d6236efb5c68..52025bd694366c048038222423fd158dc8dcdc31 100644 (file)
-/*\r
-* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>\r
-*\r
-* This file is part of CasparCG (www.casparcg.com).\r
-*\r
-* CasparCG is free software: you can redistribute it and/or modify\r
-* it under the terms of the GNU General Public License as published by\r
-* the Free Software Foundation, either version 3 of the License, or\r
-* (at your option) any later version.\r
-*\r
-* CasparCG is distributed in the hope that it will be useful,\r
-* but WITHOUT ANY WARRANTY; without even the implied warranty of\r
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the\r
-* GNU General Public License for more details.\r
-*\r
-* You should have received a copy of the GNU General Public License\r
-* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.\r
-*\r
-* Author: Robert Nagy, ronag89@gmail.com\r
-*/\r
-\r
-#pragma once\r
-\r
-#include "../exception/win32_exception.h"\r
-#include "../exception/exceptions.h"\r
-#include "../utility/string.h"\r
-#include "../utility/move_on_copy.h"\r
-#include "../log/log.h"\r
-\r
-#include <tbb/atomic.h>\r
-#include <tbb/concurrent_queue.h>\r
-\r
-#include <boost/thread.hpp>\r
-#include <boost/optional.hpp>\r
-#include <boost/noncopyable.hpp>\r
-\r
-#include <functional>\r
-\r
-namespace caspar {\r
-\r
-namespace detail {\r
-\r
-typedef struct tagTHREADNAME_INFO\r
-{\r
-       DWORD dwType; // must be 0x1000\r
-       LPCSTR szName; // pointer to name (in user addr space)\r
-       DWORD dwThreadID; // thread ID (-1=caller thread)\r
-       DWORD dwFlags; // reserved for future use, must be zero\r
-} THREADNAME_INFO;\r
-\r
-inline void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName)\r
-{\r
-       THREADNAME_INFO info;\r
-       {\r
-               info.dwType = 0x1000;\r
-               info.szName = szThreadName;\r
-               info.dwThreadID = dwThreadID;\r
-               info.dwFlags = 0;\r
-       }\r
-       __try\r
-       {\r
-               RaiseException( 0x406D1388, 0, sizeof(info)/sizeof(DWORD), (DWORD*)&info );\r
-       }\r
-       __except (EXCEPTION_CONTINUE_EXECUTION){}       \r
-}\r
-\r
-}\r
-\r
-enum task_priority\r
-{\r
-       high_priority,\r
-       normal_priority,\r
-       priority_count\r
-};\r
-\r
-enum thread_priority\r
-{\r
-       high_priority_class,\r
-       above_normal_priority_class,\r
-       normal_priority_class,\r
-       below_normal_priority_class\r
-};\r
-\r
-class executor : boost::noncopyable\r
-{\r
-       const std::string name_;\r
-       boost::thread thread_;\r
-       tbb::atomic<bool> is_running_;\r
-       \r
-       typedef tbb::concurrent_bounded_queue<std::function<void()>> function_queue;\r
-       function_queue execution_queue_[priority_count];\r
-               \r
-       template<typename Func>\r
-       auto create_task(Func&& func) -> boost::packaged_task<decltype(func())> // noexcept\r
-       {       \r
-               typedef boost::packaged_task<decltype(func())> task_type;\r
-                               \r
-               auto task = task_type(std::forward<Func>(func));\r
-               \r
-               task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
-               {\r
-                       try\r
-                       {\r
-                               if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
-                                       my_task();\r
-                       }\r
-                       catch(boost::task_already_started&){}\r
-               }));\r
-                               \r
-               return std::move(task);\r
-       }\r
-\r
-public:\r
-               \r
-       explicit executor(const std::wstring& name) : name_(narrow(name)) // noexcept\r
-       {\r
-               is_running_ = true;\r
-               thread_ = boost::thread([this]{run();});\r
-       }\r
-       \r
-       virtual ~executor() // noexcept\r
-       {\r
-               stop();\r
-               join();\r
-       }\r
-\r
-       void set_capacity(size_t capacity) // noexcept\r
-       {\r
-               execution_queue_[normal_priority].set_capacity(capacity);\r
-       }\r
-\r
-       void set_priority_class(thread_priority p)\r
-       {\r
-               begin_invoke([=]\r
-               {\r
-                       if(p == high_priority_class)\r
-                               SetThreadPriority(GetCurrentThread(), HIGH_PRIORITY_CLASS);\r
-                       else if(p == above_normal_priority_class)\r
-                               SetThreadPriority(GetCurrentThread(), ABOVE_NORMAL_PRIORITY_CLASS);\r
-                       else if(p == normal_priority_class)\r
-                               SetThreadPriority(GetCurrentThread(), NORMAL_PRIORITY_CLASS);\r
-                       else if(p == below_normal_priority_class)\r
-                               SetThreadPriority(GetCurrentThread(), BELOW_NORMAL_PRIORITY_CLASS);\r
-               });\r
-       }\r
-       \r
-       void clear()\r
-       {               \r
-               std::function<void()> func;\r
-               while(execution_queue_[normal_priority].try_pop(func));\r
-               while(execution_queue_[high_priority].try_pop(func));\r
-       }\r
-                               \r
-       void stop() // noexcept\r
-       {\r
-               is_running_ = false;    \r
-               execution_queue_[normal_priority].try_push([]{}); // Wake the execution thread.\r
-       }\r
-\r
-       void wait() // noexcept\r
-       {\r
-               invoke([]{});\r
-       }\r
-\r
-       void join()\r
-       {\r
-               if(boost::this_thread::get_id() != thread_.get_id())\r
-                       thread_.join();\r
-       }\r
-                               \r
-       template<typename Func>\r
-       auto begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future<decltype(func())> // noexcept\r
-       {       \r
-               if(!is_running_)\r
-                       BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running."));\r
-\r
-               // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics.\r
-               auto task_adaptor = make_move_on_copy(create_task(func));\r
-\r
-               auto future = task_adaptor.value.get_future();\r
-\r
-               execution_queue_[priority].push([=]\r
-               {\r
-                       try\r
-                       {\r
-                               task_adaptor.value();\r
-                       }\r
-                       catch(boost::task_already_started&)\r
-                       {\r
-                       }\r
-                       catch(...)\r
-                       {\r
-                               CASPAR_LOG_CURRENT_EXCEPTION();\r
-                       }\r
-               });\r
-\r
-               if(priority != normal_priority)\r
-                       execution_queue_[normal_priority].push(nullptr);\r
-                                       \r
-               return std::move(future);               \r
-       }\r
-       \r
-       template<typename Func>\r
-       auto invoke(Func&& func, task_priority prioriy = normal_priority) -> decltype(func()) // noexcept\r
-       {\r
-               if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
-                       return func();\r
-               \r
-               return begin_invoke(std::forward<Func>(func), prioriy).get();\r
-       }\r
-       \r
-       void yield() // noexcept\r
-       {\r
-               if(boost::this_thread::get_id() != thread_.get_id())  // Only yield when calling from execution thread.\r
-                       return;\r
-\r
-               std::function<void()> func;\r
-               while(execution_queue_[high_priority].try_pop(func))\r
-               {\r
-                       if(func)\r
-                               func();\r
-               }       \r
-       }\r
-       \r
-       function_queue::size_type capacity() const /*noexcept*/ { return execution_queue_[normal_priority].capacity();  }\r
-       function_queue::size_type size() const /*noexcept*/ { return execution_queue_[normal_priority].size();  }\r
-       bool empty() const /*noexcept*/ { return execution_queue_[normal_priority].empty();     }\r
-       bool is_running() const /*noexcept*/ { return is_running_; }    \r
-               \r
-private:\r
-       \r
-       void execute() // noexcept\r
-       {\r
-               std::function<void()> func;\r
-               execution_queue_[normal_priority].pop(func);    \r
-\r
-               yield();\r
-\r
-               if(func)\r
-                       func();\r
-       }\r
-\r
-       void run() // noexcept\r
-       {\r
-               win32_exception::install_handler();             \r
-               detail::SetThreadName(GetCurrentThreadId(), name_.c_str());\r
-               while(is_running_)\r
-               {\r
-                       try\r
-                       {\r
-                               execute();\r
-                       }\r
-                       catch(...)\r
-                       {\r
-                               CASPAR_LOG_CURRENT_EXCEPTION();\r
-                       }\r
-               }\r
-       }       \r
-};\r
-\r
+/*
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Robert Nagy, ronag89@gmail.com
+*/
+
+#pragma once
+
+#include "../exception/win32_exception.h"
+#include "../exception/exceptions.h"
+#include "../utility/string.h"
+#include "../utility/move_on_copy.h"
+#include "../log/log.h"
+
+#include <tbb/atomic.h>
+#include <tbb/concurrent_queue.h>
+
+#include <boost/thread.hpp>
+#include <boost/optional.hpp>
+#include <boost/noncopyable.hpp>
+
+#include <functional>
+
+namespace caspar {
+
+namespace detail {
+
+typedef struct tagTHREADNAME_INFO
+{
+       DWORD dwType; // must be 0x1000
+       LPCSTR szName; // pointer to name (in user addr space)
+       DWORD dwThreadID; // thread ID (-1=caller thread)
+       DWORD dwFlags; // reserved for future use, must be zero
+} THREADNAME_INFO;
+
+inline void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName)
+{
+       THREADNAME_INFO info;
+       {
+               info.dwType = 0x1000;
+               info.szName = szThreadName;
+               info.dwThreadID = dwThreadID;
+               info.dwFlags = 0;
+       }
+       __try
+       {
+               RaiseException( 0x406D1388, 0, sizeof(info)/sizeof(DWORD), (DWORD*)&info );
+       }
+       __except (EXCEPTION_CONTINUE_EXECUTION){}       
+}
+
+}
+
+enum task_priority
+{
+       high_priority,
+       normal_priority,
+       priority_count
+};
+
+enum thread_priority
+{
+       high_priority_class,
+       above_normal_priority_class,
+       normal_priority_class,
+       below_normal_priority_class
+};
+
+class executor : boost::noncopyable
+{
+       const std::string name_;
+       boost::thread thread_;
+       tbb::atomic<bool> is_running_;
+       
+       typedef tbb::concurrent_bounded_queue<std::function<void()>> function_queue;
+       function_queue execution_queue_[priority_count];
+               
+       template<typename Func>
+       auto create_task(Func&& func) -> boost::packaged_task<decltype(func())> // noexcept
+       {       
+               typedef boost::packaged_task<decltype(func())> task_type;
+                               
+               auto task = task_type(std::forward<Func>(func));
+               
+               task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.
+               {
+                       try
+                       {
+                               if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.
+                                       my_task();
+                       }
+                       catch(boost::task_already_started&){}
+               }));
+                               
+               return std::move(task);
+       }
+
+public:
+               
+       explicit executor(const std::wstring& name) : name_(narrow(name)) // noexcept
+       {
+               is_running_ = true;
+               thread_ = boost::thread([this]{run();});
+       }
+       
+       virtual ~executor() // noexcept
+       {
+               stop();
+               join();
+       }
+
+       void set_capacity(size_t capacity) // noexcept
+       {
+               execution_queue_[normal_priority].set_capacity(capacity);
+       }
+
+       void set_priority_class(thread_priority p)
+       {
+               begin_invoke([=]
+               {
+                       if(p == high_priority_class)
+                               SetThreadPriority(GetCurrentThread(), HIGH_PRIORITY_CLASS);
+                       else if(p == above_normal_priority_class)
+                               SetThreadPriority(GetCurrentThread(), ABOVE_NORMAL_PRIORITY_CLASS);
+                       else if(p == normal_priority_class)
+                               SetThreadPriority(GetCurrentThread(), NORMAL_PRIORITY_CLASS);
+                       else if(p == below_normal_priority_class)
+                               SetThreadPriority(GetCurrentThread(), BELOW_NORMAL_PRIORITY_CLASS);
+               });
+       }
+       
+       void clear()
+       {               
+               std::function<void()> func;
+               while(execution_queue_[normal_priority].try_pop(func));
+               while(execution_queue_[high_priority].try_pop(func));
+       }
+                               
+       void stop() // noexcept
+       {
+               is_running_ = false;    
+               execution_queue_[normal_priority].try_push([]{}); // Wake the execution thread.
+       }
+
+       void wait() // noexcept
+       {
+               invoke([]{});
+       }
+
+       void join()
+       {
+               if(boost::this_thread::get_id() != thread_.get_id())
+                       thread_.join();
+       }
+
+       template<typename Func>
+       auto try_begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::optional<caspar::move_on_copy<boost::unique_future<decltype(func())>>>
+       {       
+               if(!is_running_)
+                       BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running."));
+
+               // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics.
+               auto task_adaptor = make_move_on_copy(create_task(func));
+
+               auto future = task_adaptor.value.get_future();
+
+               // Enable the cancellation of the task if priority is other than
+               // normal, because there are two queues to try_push to. Either both
+               // succeed or nothing will be executed.
+               
+               boost::promise<bool> cancelled_promise;
+               boost::shared_future<bool> cancelled(cancelled_promise.get_future());
+
+               bool was_enqueued = execution_queue_[priority].try_push([=]() mutable
+               {
+                       // Wait until we know if we should cancel execution or not.
+                       if (cancelled.get())
+                               return;
+
+                       try
+                       {
+                               task_adaptor.value();
+                       }
+                       catch(boost::task_already_started&)
+                       {
+                       }
+                       catch(...)
+                       {
+                               CASPAR_LOG_CURRENT_EXCEPTION();
+                       }
+               });
+
+               if (!was_enqueued)
+                       return boost::optional<caspar::move_on_copy<boost::unique_future<decltype(func())>>>();
+
+               if (priority != normal_priority)
+               {
+                       was_enqueued = execution_queue_[normal_priority].try_push(nullptr);
+
+                       if (was_enqueued)
+                       {
+                               // Now we know that both enqueue operations has succeeded.
+                               cancelled_promise.set_value(false);
+                       }
+                       else
+                       {
+                               cancelled_promise.set_value(true); // The actual task has already been
+                                                                  // queued so we cancel it.
+
+                               return boost::optional<caspar::move_on_copy<boost::unique_future<decltype(func())>>>();
+                       }
+               }
+               else
+               {
+                       cancelled_promise.set_value(false);
+               }
+
+               return caspar::make_move_on_copy(std::move(future));
+       }
+                               
+       template<typename Func>
+       auto begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future<decltype(func())> // noexcept
+       {       
+               if(!is_running_)
+                       BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running."));
+
+               // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics.
+               auto task_adaptor = make_move_on_copy(create_task(func));
+
+               auto future = task_adaptor.value.get_future();
+
+               execution_queue_[priority].push([=]
+               {
+                       try
+                       {
+                               task_adaptor.value();
+                       }
+                       catch(boost::task_already_started&)
+                       {
+                       }
+                       catch(...)
+                       {
+                               CASPAR_LOG_CURRENT_EXCEPTION();
+                       }
+               });
+
+               if(priority != normal_priority)
+                       execution_queue_[normal_priority].push(nullptr);
+                                       
+               return std::move(future);               
+       }
+       
+       template<typename Func>
+       auto invoke(Func&& func, task_priority prioriy = normal_priority) -> decltype(func()) // noexcept
+       {
+               if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.
+                       return func();
+               
+               return begin_invoke(std::forward<Func>(func), prioriy).get();
+       }
+       
+       void yield() // noexcept
+       {
+               if(boost::this_thread::get_id() != thread_.get_id())  // Only yield when calling from execution thread.
+                       return;
+
+               std::function<void()> func;
+               while(execution_queue_[high_priority].try_pop(func))
+               {
+                       if(func)
+                               func();
+               }       
+       }
+       
+       function_queue::size_type capacity() const /*noexcept*/ { return execution_queue_[normal_priority].capacity();  }
+       function_queue::size_type size() const /*noexcept*/ { return execution_queue_[normal_priority].size();  }
+       bool empty() const /*noexcept*/ { return execution_queue_[normal_priority].empty();     }
+       bool is_running() const /*noexcept*/ { return is_running_; }    
+               
+private:
+       
+       void execute() // noexcept
+       {
+               std::function<void()> func;
+               execution_queue_[normal_priority].pop(func);    
+
+               yield();
+
+               if(func)
+                       func();
+       }
+
+       void run() // noexcept
+       {
+               win32_exception::install_handler();             
+               detail::SetThreadName(GetCurrentThreadId(), name_.c_str());
+               while(is_running_)
+               {
+                       try
+                       {
+                               execute();
+                       }
+                       catch(...)
+                       {
+                               CASPAR_LOG_CURRENT_EXCEPTION();
+                       }
+               }
+       }       
+};
+
 }
\ No newline at end of file
diff --git a/common/concurrency/future_util.h b/common/concurrency/future_util.h
new file mode 100644 (file)
index 0000000..fec9608
--- /dev/null
@@ -0,0 +1,165 @@
+/*
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Helge Norberg, helge.norberg@svt.se
+*/
+#pragma once
+
+#include "../log/log.h"
+
+#include <boost/thread/future.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/function.hpp>
+#include <boost/optional.hpp>
+
+namespace caspar
+{
+
+/**
+ * A utility that helps the producer side of a future when the task is not
+ * able to complete immediately but there are known retry points in the code.
+ */
+template<class R>
+class retry_task
+{
+public:
+       typedef boost::function<boost::optional<R> ()> func_type;
+       
+       retry_task() : done_(false) {}
+
+       /**
+        * Reset the state with a new task. If the previous task has not completed
+        * the old one will be discarded.
+        *
+        * @param func The function that tries to calculate future result. If the
+        *             optional return value is set the future is marked as ready.
+        */
+       void set_task(const func_type& func)
+       {
+               boost::mutex::scoped_lock lock(mutex_);
+
+               func_ = func;
+               done_ = false;
+               promise_ = boost::promise<R>();
+       }
+
+       /**
+        * Take ownership of the future for the current task. Cannot only be called
+        * once for each task.
+        *
+        * @return the future.
+        */
+       boost::unique_future<R> get_future()
+       {
+               boost::mutex::scoped_lock lock(mutex_);
+
+               return promise_.get_future();
+       }
+
+       /**
+        * Call this when it is guaranteed or probable that the task will be able
+        * to complete.
+        *
+        * @return true if the task completed (the future will have a result).
+        */
+       bool try_completion()
+       {
+               boost::mutex::scoped_lock lock(mutex_);
+
+               if (!func_)
+                       return false;
+
+               if (done_)
+                       return true;
+
+               boost::optional<R> result;
+
+               try
+               {
+                       result = func_();
+               }
+               catch (...)
+               {
+                       CASPAR_LOG_CURRENT_EXCEPTION();
+                       promise_.set_exception(boost::current_exception());
+                       done_ = true;
+
+                       return true;
+               }
+
+               if (result)
+               {
+                       promise_.set_value(*result);
+                       done_ = true;
+               }
+
+               return done_;
+       }
+
+       /**
+        * Call this when it is certain that the result should be ready, and if not
+        * it should be regarded as an unrecoverable error (retrying again would
+        * be useless), so the future will be marked as failed.
+        *
+        * @param exception The exception to mark the future with *if* the task
+        *                  completion fails.
+        */
+       void try_or_fail(const std::exception& exception)
+       {
+               if (!try_completion())
+               {
+                       try
+                       {
+                               throw exception;
+                       }
+                       catch (...)
+                       {
+                               CASPAR_LOG_CURRENT_EXCEPTION();
+                               promise_.set_exception(boost::current_exception());
+                               done_ = true;
+                       }
+               }
+       }
+private:
+       boost::mutex mutex_;
+       func_type func_;
+       boost::promise<R> promise_;
+       bool done_;
+};
+
+/**
+ * Wrap a value in a future with an already known result.
+ * <p>
+ * Useful when the result of an operation is already known at the time of
+ * calling.
+ *
+ * @param value The r-value to wrap.
+ *
+ * @return The future with the result set.
+ */
+template<class R>
+boost::unique_future<R> wrap_as_future(R&& value)
+{
+       boost::promise<R> p;
+
+       p.set_value(value);
+
+       return p.get_future();
+}
+
+}
index 63bf35bd47f3890b1360da91b02fad8e403cd44f..bce2337507ab7b6c09a84d9326949c20505221a5 100644 (file)
@@ -26,6 +26,7 @@
 #include <common/env.h>\r
 #include <common/memory/safe_ptr.h>\r
 #include <common/exception/exceptions.h>\r
+#include <common/concurrency/future_util.h>\r
 #include <core/video_format.h>\r
 #include <core/mixer/read_frame.h>\r
 \r
@@ -84,12 +85,12 @@ public:
                consumer_->initialize(format_desc, channel_index);\r
        }\r
 \r
-       virtual bool send(const safe_ptr<read_frame>& frame) override\r
+       virtual boost::unique_future<bool> send(const safe_ptr<read_frame>& frame) override\r
        {               \r
                if(audio_cadence_.size() == 1)\r
                        return consumer_->send(frame);\r
 \r
-               bool result = true;\r
+               boost::unique_future<bool> result = caspar::wrap_as_future(true);\r
                \r
                if(boost::range::equal(sync_buffer_, audio_cadence_) && audio_cadence_.front() == static_cast<size_t>(frame->audio_data().size())) \r
                {       \r
@@ -102,7 +103,7 @@ public:
 \r
                sync_buffer_.push_back(static_cast<size_t>(frame->audio_data().size()));\r
                \r
-               return result;\r
+               return std::move(result);\r
        }\r
 \r
        virtual std::wstring print() const override\r
@@ -140,7 +141,7 @@ const safe_ptr<frame_consumer>& frame_consumer::empty()
 {\r
        struct empty_frame_consumer : public frame_consumer\r
        {\r
-               virtual bool send(const safe_ptr<read_frame>&) override {return false;}\r
+               virtual boost::unique_future<bool> send(const safe_ptr<read_frame>&) override { return caspar::wrap_as_future(false); }\r
                virtual void initialize(const video_format_desc&, int) override{}\r
                virtual std::wstring print() const override {return L"empty";}\r
                virtual bool has_synchronization_clock() const override {return false;}\r
index 531dee8edffe44822188e3087e173eb105c913bb..3577a8e5ea53bd579d144dce96da77445476f270 100644 (file)
@@ -25,6 +25,7 @@
 \r
 #include <boost/noncopyable.hpp>\r
 #include <boost/property_tree/ptree_fwd.hpp>\r
+#include <boost/thread/future.hpp>\r
 \r
 #include <functional>\r
 #include <string>\r
@@ -39,7 +40,7 @@ struct frame_consumer : boost::noncopyable
 {\r
        virtual ~frame_consumer() {}\r
        \r
-       virtual bool send(const safe_ptr<read_frame>& frame) = 0;\r
+       virtual boost::unique_future<bool> send(const safe_ptr<read_frame>& frame) = 0;\r
        virtual void initialize(const video_format_desc& format_desc, int channel_index) = 0;\r
        virtual std::wstring print() const = 0;\r
        virtual boost::property_tree::wptree info() const = 0;\r
index cb615fa78cf3926707cb61953aa8f5588833dcf5..9e7a0d90914f9e48e5fede835950f5add42db8ac 100644 (file)
@@ -188,6 +188,9 @@ public:
                                if(!frames_.full())\r
                                        return;\r
 \r
+                               std::map<int, boost::unique_future<bool>> send_results;\r
+\r
+                               // Start invocations\r
                                auto it = consumers_.begin();\r
                                while(it != consumers_.end())\r
                                {\r
@@ -196,12 +199,40 @@ public:
                                                \r
                                        try\r
                                        {\r
-                                               if(consumer->send(frame))\r
-                                                       ++it;\r
-                                               else\r
+                                               send_results.insert(std::make_pair(it->first, consumer->send(frame)));\r
+                                       }\r
+                                       catch(...)\r
+                                       {\r
+                                               CASPAR_LOG_CURRENT_EXCEPTION();\r
+                                               try\r
                                                {\r
-                                                       CASPAR_LOG(info) << print() << L" " << it->second->print() << L" Removed.";\r
-                                                       consumers_.erase(it++);\r
+                                                       send_results.insert(std::make_pair(it->first, consumer->send(frame)));\r
+                                               }\r
+                                               catch(...)\r
+                                               {\r
+                                                       CASPAR_LOG_CURRENT_EXCEPTION();\r
+                                                       CASPAR_LOG(error) << "Failed to recover consumer: " << consumer->print() << L". Removing it.";\r
+                                                       consumers_.erase(it);\r
+                                               }\r
+                                       }\r
+\r
+                                       ++it;\r
+                               }\r
+\r
+                               // Retrieve results\r
+                               auto result_it = send_results.begin();\r
+                               while(result_it != send_results.end())\r
+                               {\r
+                                       auto consumer           = consumers_.at(result_it->first);\r
+                                       auto frame                      = frames_.at(consumer->buffer_depth()-minmax.first);\r
+                                       auto& result_future     = result_it->second;\r
+                                               \r
+                                       try\r
+                                       {\r
+                                               if(!result_future.get())\r
+                                               {\r
+                                                       CASPAR_LOG(info) << print() << L" " << consumer->print() << L" Removed.";\r
+                                                       consumers_.erase(result_it->first);\r
                                                }\r
                                        }\r
                                        catch(...)\r
@@ -210,21 +241,21 @@ public:
                                                try\r
                                                {\r
                                                        consumer->initialize(format_desc_, channel_index_);\r
-                                                       if(consumer->send(frame))\r
-                                                               ++it;\r
-                                                       else\r
+                                                       if(!consumer->send(frame).get())\r
                                                        {\r
-                                                               CASPAR_LOG(info) << print() << L" " << it->second->print() << L" Removed.";\r
-                                                               consumers_.erase(it++);\r
+                                                               CASPAR_LOG(info) << print() << L" " << consumer->print() << L" Removed.";\r
+                                                               consumers_.erase(result_it->first);\r
                                                        }\r
                                                }\r
                                                catch(...)\r
                                                {\r
                                                        CASPAR_LOG_CURRENT_EXCEPTION();\r
                                                        CASPAR_LOG(error) << "Failed to recover consumer: " << consumer->print() << L". Removing it.";\r
-                                                       consumers_.erase(it++);\r
+                                                       consumers_.erase(result_it->first);\r
                                                }\r
                                        }\r
+\r
+                                       ++result_it;\r
                                }\r
                                                \r
                                graph_->set_value("consume-time", consume_timer_.elapsed()*format_desc_.fps*0.5);\r
index 7fcd28a9e168cf63b8f1e196685ab452e749c7fe..6d3bdb2636b2ba760ed53434cd3e98f5c426c677 100644 (file)
@@ -34,7 +34,7 @@
 \r
 #include <common/exception/exceptions.h>\r
 #include <common/memory/memcpy.h>\r
-\r
+#include <common/concurrency/future_util.h>\r\r
 #include <tbb/concurrent_queue.h>\r
 \r
 namespace caspar { namespace core {\r
@@ -60,10 +60,10 @@ public:
 \r
        // frame_consumer\r
 \r
-       virtual bool send(const safe_ptr<read_frame>& frame) override\r
+       virtual boost::unique_future<bool> send(const safe_ptr<read_frame>& frame) override\r
        {\r
                frame_buffer_.try_push(frame);\r
-               return is_running_;\r
+               return caspar::wrap_as_future(is_running_.load());\r
        }\r
 \r
        virtual void initialize(const core::video_format_desc& format_desc, int channel_index) override\r
index 162a3d4128f49989fa79791f71c418dec5d41f9d..950c39b06615be7b19fc61917664c0e223b4806a 100644 (file)
@@ -29,6 +29,7 @@
 #include <core/mixer/read_frame.h>\r
 \r
 #include <common/concurrency/executor.h>\r
+#include <common/concurrency/future_util.h>\r
 #include <common/diagnostics/graph.h>\r
 #include <common/memory/memclr.h>\r
 #include <common/memory/memcpy.h>\r
@@ -72,6 +73,7 @@ struct bluefish_consumer : boost::noncopyable
        const bool                                                      key_only_;\r
                \r
        executor                                                        executor_;\r
+       retry_task<bool>                                        send_completion_;\r
 public:\r
        bluefish_consumer(const core::video_format_desc& format_desc, unsigned int device_index, bool embedded_audio, bool key_only, int channel_index) \r
                : blue_(create_blue(device_index))\r
@@ -184,10 +186,14 @@ public:
                        CASPAR_LOG(error)<< print() << TEXT(" Failed to disable video output.");                \r
        }\r
        \r
-       void send(const safe_ptr<core::read_frame>& frame)\r
-       {                                       \r
-               executor_.begin_invoke([=]\r
+       boost::unique_future<bool> send(const safe_ptr<core::read_frame>& frame)\r
+       {\r
+               auto display_command = [=]\r
                {\r
+                       // The executor queue now has room so we try to complete the pending send call\r
+                       send_completion_.try_or_fail(\r
+                               caspar_exception() << msg_info(narrow(print()) + " Future send not able to complete."));\r
+\r
                        try\r
                        {       \r
                                display_frame(frame);                           \r
@@ -198,7 +204,22 @@ public:
                        {\r
                                CASPAR_LOG_CURRENT_EXCEPTION();\r
                        }\r
+               };\r
+\r
+               auto enqueue_command = [=]\r
+               {\r
+                       return executor_.try_begin_invoke(display_command);\r
+               };\r
+\r
+               if (enqueue_command())\r
+                       return wrap_as_future(true);\r
+\r
+               send_completion_.set_task([=]\r
+               {\r
+                       return enqueue_command() ? boost::optional<bool>(true) : boost::optional<bool>();\r
                });\r
+\r
+               return std::move(send_completion_.get_future());\r
        }\r
 \r
        void display_frame(const safe_ptr<core::read_frame>& frame)\r
@@ -324,13 +345,12 @@ public:
                CASPAR_LOG(info) << print() << L" Successfully Initialized.";   \r
        }\r
        \r
-       virtual bool send(const safe_ptr<core::read_frame>& frame) override\r
+       virtual boost::unique_future<bool> send(const safe_ptr<core::read_frame>& frame) override\r
        {\r
                CASPAR_VERIFY(audio_cadence_.front() == static_cast<size_t>(frame->audio_data().size()));\r
                boost::range::rotate(audio_cadence_, std::begin(audio_cadence_)+1);\r
 \r
-               consumer_->send(frame);\r
-               return true;\r
+               return consumer_->send(frame);\r
        }\r
                \r
        virtual std::wstring print() const override\r
index ccebe095c674e2d3288c37e28f1f25f9e54d7c72..4e764ec217033c427be40a320b3917b3d8350ae6 100644 (file)
@@ -30,6 +30,7 @@
 #include <core/mixer/read_frame.h>\r
 \r
 #include <common/concurrency/com_context.h>\r
+#include <common/concurrency/future_util.h>\r
 #include <common/diagnostics/graph.h>\r
 #include <common/exception/exceptions.h>\r
 #include <common/memory/memcpy.h>\r
@@ -203,6 +204,7 @@ struct decklink_consumer : public IDeckLinkVideoOutputCallback, public IDeckLink
        \r
        safe_ptr<diagnostics::graph> graph_;\r
        boost::timer tick_timer_;\r
+       retry_task<bool> send_completion_;\r
 \r
 public:\r
        decklink_consumer(const configuration& config, const core::video_format_desc& format_desc, int channel_index) \r
@@ -371,7 +373,8 @@ public:
                                graph_->set_tag("flushed-frame");\r
 \r
                        std::shared_ptr<core::read_frame> frame;        \r
-                       video_frame_buffer_.pop(frame);                                 \r
+                       video_frame_buffer_.pop(frame);\r
+                       send_completion_.try_completion();\r
                        schedule_next_video(make_safe_ptr(frame));      \r
                        \r
                        unsigned long buffered;\r
@@ -409,6 +412,7 @@ public:
                        {\r
                                std::shared_ptr<core::read_frame> frame;\r
                                audio_frame_buffer_.pop(frame);\r
+                               send_completion_.try_completion();\r
                                schedule_next_audio(frame->audio_data());\r
                        }\r
 \r
@@ -451,7 +455,7 @@ public:
                tick_timer_.restart();\r
        }\r
 \r
-       void send(const safe_ptr<core::read_frame>& frame)\r
+       boost::unique_future<bool> send(const safe_ptr<core::read_frame>& frame)\r
        {\r
                {\r
                        tbb::spin_mutex::scoped_lock lock(exception_mutex_);\r
@@ -461,10 +465,30 @@ public:
 \r
                if(!is_running_)\r
                        BOOST_THROW_EXCEPTION(caspar_exception() << msg_info(narrow(print()) + " Is not running."));\r
+\r
+               bool audio_ready = !config_.embedded_audio;\r
+               bool video_ready = false;\r
+\r
+               auto enqueue_task = [audio_ready, video_ready, frame, this]() mutable -> boost::optional<bool>\r
+               {\r
+                       if (!audio_ready)\r
+                               audio_ready = audio_frame_buffer_.try_push(frame);\r
+\r
+                       if (!video_ready)\r
+                               video_ready = video_frame_buffer_.try_push(frame);\r
+\r
+                       if (audio_ready && video_ready)\r
+                               return true;\r
+                       else\r
+                               return boost::optional<bool>();\r
+               };\r
                \r
-               if(config_.embedded_audio)\r
-                       audio_frame_buffer_.push(frame);        \r
-               video_frame_buffer_.push(frame);        \r
+               if (enqueue_task())\r
+                       return wrap_as_future(true);\r
+\r
+               send_completion_.set_task(enqueue_task);\r
+\r
+               return send_completion_.get_future();\r
        }\r
        \r
        std::wstring print() const\r
@@ -507,13 +531,12 @@ public:
                CASPAR_LOG(info) << print() << L" Successfully Initialized.";   \r
        }\r
        \r
-       virtual bool send(const safe_ptr<core::read_frame>& frame) override\r
+       virtual boost::unique_future<bool> send(const safe_ptr<core::read_frame>& frame) override\r
        {\r
                CASPAR_VERIFY(audio_cadence_.front() == static_cast<size_t>(frame->audio_data().size()));\r
                boost::range::rotate(audio_cadence_, std::begin(audio_cadence_)+1);\r
 \r
-               context_->send(frame);\r
-               return true;\r
+               return context_->send(frame);\r
        }\r
        \r
        virtual std::wstring print() const override\r
index 1213b7d7bd688326d053ef1c47789801bfb46504..2ddaa3bbfe8484c2ab6de7662c23265ea72bf903 100644 (file)
@@ -33,6 +33,7 @@
 #include <core/video_format.h>\r
 \r
 #include <common/concurrency/executor.h>\r
+#include <common/concurrency/future_util.h>\r
 #include <common/diagnostics/graph.h>\r
 #include <common/env.h>\r
 #include <common/utility/string.h>\r
@@ -615,10 +616,10 @@ public:
                consumer_.reset(new ffmpeg_consumer(narrow(filename_), format_desc, options_));\r
        }\r
        \r
-       virtual bool send(const safe_ptr<core::read_frame>& frame) override\r
+       virtual boost::unique_future<bool> send(const safe_ptr<core::read_frame>& frame) override\r
        {\r
                consumer_->send(frame);\r
-               return true;\r
+               return caspar::wrap_as_future(true);\r
        }\r
        \r
        virtual std::wstring print() const override\r
index d88bdf35563d6418e681ab67585d388c48099a46..d14bf07274a6439fb0ae99e73bd0122593c1b35f 100644 (file)
@@ -25,6 +25,7 @@
 #include <common/env.h>\r
 #include <common/log/log.h>\r
 #include <common/utility/string.h>\r
+#include <common/concurrency/future_util.h>\r
 \r
 #include <core/consumer/frame_consumer.h>\r
 #include <core/video_format.h>\r
@@ -53,7 +54,7 @@ public:
                format_desc_ = format_desc;\r
        }\r
        \r
-       virtual bool send(const safe_ptr<core::read_frame>& frame) override\r
+       virtual boost::unique_future<bool> send(const safe_ptr<core::read_frame>& frame) override\r
        {                               \r
                auto format_desc = format_desc_;\r
                boost::thread async([format_desc, frame]\r
@@ -74,7 +75,7 @@ public:
                });\r
                async.detach();\r
 \r
-               return false;\r
+               return wrap_as_future(false);\r
        }\r
 \r
        virtual std::wstring print() const override\r
index a9da741b8f7db0020879a57f53168589101857fa..6f843b83292c20b48f607bccf130466b8a9334a5 100644 (file)
@@ -26,6 +26,7 @@
 #include <common/log/log.h>\r
 #include <common/utility/timer.h>\r
 #include <common/utility/string.h>\r
+#include <common/concurrency/future_util.h>\r
 \r
 #include <core/consumer/frame_consumer.h>\r
 #include <core/mixer/audio/audio_util.h>\r
@@ -38,6 +39,8 @@
 #include <boost/circular_buffer.hpp>\r
 #include <boost/property_tree/ptree.hpp>\r
 #include <boost/timer.hpp>\r
+#include <boost/thread/future.hpp>\r
+#include <boost/optional.hpp>\r
 \r
 #include <tbb/concurrent_queue.h>\r
 \r
@@ -55,6 +58,7 @@ struct oal_consumer : public core::frame_consumer,  public sf::SoundStream
        boost::circular_buffer<audio_buffer_16>                         container_;\r
        tbb::atomic<bool>                                                                       is_running_;\r
        core::audio_buffer                                                                      temp;\r
+       retry_task<bool>                                                                        send_completion_;\r
 \r
        core::video_format_desc                                                         format_desc_;\r
 public:\r
@@ -97,10 +101,17 @@ public:
                CASPAR_LOG(info) << print() << " Sucessfully Initialized.";\r
        }\r
        \r
-       virtual bool send(const safe_ptr<core::read_frame>& frame) override\r
-       {                       \r
-               input_.push(std::make_shared<audio_buffer_16>(core::audio_32_to_16(frame->audio_data())));\r
-               return true;\r
+       virtual boost::unique_future<bool> send(const safe_ptr<core::read_frame>& frame) override\r
+       {\r
+               auto buffer = std::make_shared<audio_buffer_16>(core::audio_32_to_16(frame->audio_data()));\r
+\r
+               if (input_.try_push(buffer))\r                   return wrap_as_future(is_running_.load());\r\r
+               send_completion_.set_task([=]\r
+               {\r
+                       return input_.try_push(buffer) ? boost::optional<bool>(is_running_) : boost::optional<bool>();\r
+               });\r
+\r
+               return send_completion_.get_future();\r
        }\r
        \r
        virtual std::wstring print() const override\r
@@ -126,7 +137,8 @@ public:
        {               \r
                std::shared_ptr<audio_buffer_16> audio_data;            \r
                input_.pop(audio_data);\r
-                               \r
+               send_completion_.try_completion();\r
+\r
                container_.push_back(std::move(*audio_data));\r
                data.Samples = container_.back().data();\r
                data.NbSamples = container_.back().size();      \r
index 16014e1033e1fdc2a2933c3946f9f6b4e65b65c2..a44029ce954a8c132d3799b92e0eb1ae63dd11ad 100644 (file)
@@ -32,6 +32,7 @@
 #include <common/memory/memshfl.h>\r
 #include <common/utility/timer.h>\r
 #include <common/utility/string.h>\r
+#include <common/concurrency/future_util.h>\r
 \r
 #include <ffmpeg/producer/filter/filter.h>\r
 \r
@@ -435,12 +436,12 @@ public:
                std::rotate(pbos_.begin(), pbos_.begin() + 1, pbos_.end());\r
        }\r
 \r
-       bool send(const safe_ptr<core::read_frame>& frame)\r
+       boost::unique_future<bool> send(const safe_ptr<core::read_frame>& frame)\r
        {\r
                if (!frame_buffer_.try_push(frame))\r
                        graph_->set_tag("dropped-frame"); \r
 \r
-               return is_running_;\r
+               return wrap_as_future(is_running_.load());\r
        }\r
                \r
        std::wstring print() const\r
@@ -535,7 +536,7 @@ public:
                CASPAR_LOG(info) << print() << L" Successfully Initialized.";   \r
        }\r
        \r
-       virtual bool send(const safe_ptr<core::read_frame>& frame) override\r
+       virtual boost::unique_future<bool> send(const safe_ptr<core::read_frame>& frame) override\r
        {\r
                return consumer_->send(frame);\r
        }\r
@@ -578,7 +579,9 @@ safe_ptr<core::frame_consumer> create_consumer(const std::vector<std::wstring>&
        \r
        configuration config;\r
                \r
-       if(params.size() > 1)\r          config.screen_index = lexical_cast_or_default<int>(params[1], config.screen_index);\r\r
+       if(params.size() > 1)\r
+               config.screen_index = lexical_cast_or_default<int>(params[1], config.screen_index);\r
+\r
        auto device_it = std::find(params.begin(), params.end(), L"DEVICE");\r
        if(device_it != params.end() && ++device_it != params.end())\r
                config.screen_index = boost::lexical_cast<int>(*device_it);\r