<ClInclude Include="compiler\vs\disable_silly_warnings.h" />\r
<ClInclude Include="concurrency\com_context.h" />\r
<ClInclude Include="concurrency\executor.h" />\r
+ <ClInclude Include="concurrency\future_util.h" />\r
<ClInclude Include="concurrency\lock.h" />\r
<ClInclude Include="concurrency\target.h" />\r
<ClInclude Include="diagnostics\graph.h" />\r
<ClInclude Include="concurrency\lock.h">\r
<Filter>source\concurrency</Filter>\r
</ClInclude>\r
+ <ClInclude Include="concurrency\future_util.h">\r
+ <Filter>source\concurrency</Filter>\r
+ </ClInclude>\r
</ItemGroup>\r
</Project>
\ No newline at end of file
-/*\r
-* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>\r
-*\r
-* This file is part of CasparCG (www.casparcg.com).\r
-*\r
-* CasparCG is free software: you can redistribute it and/or modify\r
-* it under the terms of the GNU General Public License as published by\r
-* the Free Software Foundation, either version 3 of the License, or\r
-* (at your option) any later version.\r
-*\r
-* CasparCG is distributed in the hope that it will be useful,\r
-* but WITHOUT ANY WARRANTY; without even the implied warranty of\r
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the\r
-* GNU General Public License for more details.\r
-*\r
-* You should have received a copy of the GNU General Public License\r
-* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.\r
-*\r
-* Author: Robert Nagy, ronag89@gmail.com\r
-*/\r
-\r
-#pragma once\r
-\r
-#include "../exception/win32_exception.h"\r
-#include "../exception/exceptions.h"\r
-#include "../utility/string.h"\r
-#include "../utility/move_on_copy.h"\r
-#include "../log/log.h"\r
-\r
-#include <tbb/atomic.h>\r
-#include <tbb/concurrent_queue.h>\r
-\r
-#include <boost/thread.hpp>\r
-#include <boost/optional.hpp>\r
-#include <boost/noncopyable.hpp>\r
-\r
-#include <functional>\r
-\r
-namespace caspar {\r
-\r
-namespace detail {\r
-\r
-typedef struct tagTHREADNAME_INFO\r
-{\r
- DWORD dwType; // must be 0x1000\r
- LPCSTR szName; // pointer to name (in user addr space)\r
- DWORD dwThreadID; // thread ID (-1=caller thread)\r
- DWORD dwFlags; // reserved for future use, must be zero\r
-} THREADNAME_INFO;\r
-\r
-inline void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName)\r
-{\r
- THREADNAME_INFO info;\r
- {\r
- info.dwType = 0x1000;\r
- info.szName = szThreadName;\r
- info.dwThreadID = dwThreadID;\r
- info.dwFlags = 0;\r
- }\r
- __try\r
- {\r
- RaiseException( 0x406D1388, 0, sizeof(info)/sizeof(DWORD), (DWORD*)&info );\r
- }\r
- __except (EXCEPTION_CONTINUE_EXECUTION){} \r
-}\r
-\r
-}\r
-\r
-enum task_priority\r
-{\r
- high_priority,\r
- normal_priority,\r
- priority_count\r
-};\r
-\r
-enum thread_priority\r
-{\r
- high_priority_class,\r
- above_normal_priority_class,\r
- normal_priority_class,\r
- below_normal_priority_class\r
-};\r
-\r
-class executor : boost::noncopyable\r
-{\r
- const std::string name_;\r
- boost::thread thread_;\r
- tbb::atomic<bool> is_running_;\r
- \r
- typedef tbb::concurrent_bounded_queue<std::function<void()>> function_queue;\r
- function_queue execution_queue_[priority_count];\r
- \r
- template<typename Func>\r
- auto create_task(Func&& func) -> boost::packaged_task<decltype(func())> // noexcept\r
- { \r
- typedef boost::packaged_task<decltype(func())> task_type;\r
- \r
- auto task = task_type(std::forward<Func>(func));\r
- \r
- task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
- {\r
- try\r
- {\r
- if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock.\r
- my_task();\r
- }\r
- catch(boost::task_already_started&){}\r
- }));\r
- \r
- return std::move(task);\r
- }\r
-\r
-public:\r
- \r
- explicit executor(const std::wstring& name) : name_(narrow(name)) // noexcept\r
- {\r
- is_running_ = true;\r
- thread_ = boost::thread([this]{run();});\r
- }\r
- \r
- virtual ~executor() // noexcept\r
- {\r
- stop();\r
- join();\r
- }\r
-\r
- void set_capacity(size_t capacity) // noexcept\r
- {\r
- execution_queue_[normal_priority].set_capacity(capacity);\r
- }\r
-\r
- void set_priority_class(thread_priority p)\r
- {\r
- begin_invoke([=]\r
- {\r
- if(p == high_priority_class)\r
- SetThreadPriority(GetCurrentThread(), HIGH_PRIORITY_CLASS);\r
- else if(p == above_normal_priority_class)\r
- SetThreadPriority(GetCurrentThread(), ABOVE_NORMAL_PRIORITY_CLASS);\r
- else if(p == normal_priority_class)\r
- SetThreadPriority(GetCurrentThread(), NORMAL_PRIORITY_CLASS);\r
- else if(p == below_normal_priority_class)\r
- SetThreadPriority(GetCurrentThread(), BELOW_NORMAL_PRIORITY_CLASS);\r
- });\r
- }\r
- \r
- void clear()\r
- { \r
- std::function<void()> func;\r
- while(execution_queue_[normal_priority].try_pop(func));\r
- while(execution_queue_[high_priority].try_pop(func));\r
- }\r
- \r
- void stop() // noexcept\r
- {\r
- is_running_ = false; \r
- execution_queue_[normal_priority].try_push([]{}); // Wake the execution thread.\r
- }\r
-\r
- void wait() // noexcept\r
- {\r
- invoke([]{});\r
- }\r
-\r
- void join()\r
- {\r
- if(boost::this_thread::get_id() != thread_.get_id())\r
- thread_.join();\r
- }\r
- \r
- template<typename Func>\r
- auto begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future<decltype(func())> // noexcept\r
- { \r
- if(!is_running_)\r
- BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running."));\r
-\r
- // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics.\r
- auto task_adaptor = make_move_on_copy(create_task(func));\r
-\r
- auto future = task_adaptor.value.get_future();\r
-\r
- execution_queue_[priority].push([=]\r
- {\r
- try\r
- {\r
- task_adaptor.value();\r
- }\r
- catch(boost::task_already_started&)\r
- {\r
- }\r
- catch(...)\r
- {\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
- }\r
- });\r
-\r
- if(priority != normal_priority)\r
- execution_queue_[normal_priority].push(nullptr);\r
- \r
- return std::move(future); \r
- }\r
- \r
- template<typename Func>\r
- auto invoke(Func&& func, task_priority prioriy = normal_priority) -> decltype(func()) // noexcept\r
- {\r
- if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock.\r
- return func();\r
- \r
- return begin_invoke(std::forward<Func>(func), prioriy).get();\r
- }\r
- \r
- void yield() // noexcept\r
- {\r
- if(boost::this_thread::get_id() != thread_.get_id()) // Only yield when calling from execution thread.\r
- return;\r
-\r
- std::function<void()> func;\r
- while(execution_queue_[high_priority].try_pop(func))\r
- {\r
- if(func)\r
- func();\r
- } \r
- }\r
- \r
- function_queue::size_type capacity() const /*noexcept*/ { return execution_queue_[normal_priority].capacity(); }\r
- function_queue::size_type size() const /*noexcept*/ { return execution_queue_[normal_priority].size(); }\r
- bool empty() const /*noexcept*/ { return execution_queue_[normal_priority].empty(); }\r
- bool is_running() const /*noexcept*/ { return is_running_; } \r
- \r
-private:\r
- \r
- void execute() // noexcept\r
- {\r
- std::function<void()> func;\r
- execution_queue_[normal_priority].pop(func); \r
-\r
- yield();\r
-\r
- if(func)\r
- func();\r
- }\r
-\r
- void run() // noexcept\r
- {\r
- win32_exception::install_handler(); \r
- detail::SetThreadName(GetCurrentThreadId(), name_.c_str());\r
- while(is_running_)\r
- {\r
- try\r
- {\r
- execute();\r
- }\r
- catch(...)\r
- {\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
- }\r
- }\r
- } \r
-};\r
-\r
+/*
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Robert Nagy, ronag89@gmail.com
+*/
+
+#pragma once
+
+#include "../exception/win32_exception.h"
+#include "../exception/exceptions.h"
+#include "../utility/string.h"
+#include "../utility/move_on_copy.h"
+#include "../log/log.h"
+
+#include <tbb/atomic.h>
+#include <tbb/concurrent_queue.h>
+
+#include <boost/thread.hpp>
+#include <boost/optional.hpp>
+#include <boost/noncopyable.hpp>
+
+#include <functional>
+
+namespace caspar {
+
+namespace detail {
+
+typedef struct tagTHREADNAME_INFO
+{
+ DWORD dwType; // must be 0x1000
+ LPCSTR szName; // pointer to name (in user addr space)
+ DWORD dwThreadID; // thread ID (-1=caller thread)
+ DWORD dwFlags; // reserved for future use, must be zero
+} THREADNAME_INFO;
+
+inline void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName)
+{
+ THREADNAME_INFO info;
+ {
+ info.dwType = 0x1000;
+ info.szName = szThreadName;
+ info.dwThreadID = dwThreadID;
+ info.dwFlags = 0;
+ }
+ __try
+ {
+ RaiseException( 0x406D1388, 0, sizeof(info)/sizeof(DWORD), (DWORD*)&info );
+ }
+ __except (EXCEPTION_CONTINUE_EXECUTION){}
+}
+
+}
+
+enum task_priority
+{
+ high_priority,
+ normal_priority,
+ priority_count
+};
+
+enum thread_priority
+{
+ high_priority_class,
+ above_normal_priority_class,
+ normal_priority_class,
+ below_normal_priority_class
+};
+
+class executor : boost::noncopyable
+{
+ const std::string name_;
+ boost::thread thread_;
+ tbb::atomic<bool> is_running_;
+
+ typedef tbb::concurrent_bounded_queue<std::function<void()>> function_queue;
+ function_queue execution_queue_[priority_count];
+
+ template<typename Func>
+ auto create_task(Func&& func) -> boost::packaged_task<decltype(func())> // noexcept
+ {
+ typedef boost::packaged_task<decltype(func())> task_type;
+
+ auto task = task_type(std::forward<Func>(func));
+
+ task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.
+ {
+ try
+ {
+ if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock.
+ my_task();
+ }
+ catch(boost::task_already_started&){}
+ }));
+
+ return std::move(task);
+ }
+
+public:
+
+ explicit executor(const std::wstring& name) : name_(narrow(name)) // noexcept
+ {
+ is_running_ = true;
+ thread_ = boost::thread([this]{run();});
+ }
+
+ virtual ~executor() // noexcept
+ {
+ stop();
+ join();
+ }
+
+ void set_capacity(size_t capacity) // noexcept
+ {
+ execution_queue_[normal_priority].set_capacity(capacity);
+ }
+
+ void set_priority_class(thread_priority p)
+ {
+ begin_invoke([=]
+ {
+ if(p == high_priority_class)
+ SetThreadPriority(GetCurrentThread(), HIGH_PRIORITY_CLASS);
+ else if(p == above_normal_priority_class)
+ SetThreadPriority(GetCurrentThread(), ABOVE_NORMAL_PRIORITY_CLASS);
+ else if(p == normal_priority_class)
+ SetThreadPriority(GetCurrentThread(), NORMAL_PRIORITY_CLASS);
+ else if(p == below_normal_priority_class)
+ SetThreadPriority(GetCurrentThread(), BELOW_NORMAL_PRIORITY_CLASS);
+ });
+ }
+
+ void clear()
+ {
+ std::function<void()> func;
+ while(execution_queue_[normal_priority].try_pop(func));
+ while(execution_queue_[high_priority].try_pop(func));
+ }
+
+ void stop() // noexcept
+ {
+ is_running_ = false;
+ execution_queue_[normal_priority].try_push([]{}); // Wake the execution thread.
+ }
+
+ void wait() // noexcept
+ {
+ invoke([]{});
+ }
+
+ void join()
+ {
+ if(boost::this_thread::get_id() != thread_.get_id())
+ thread_.join();
+ }
+
+ template<typename Func>
+ auto try_begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::optional<caspar::move_on_copy<boost::unique_future<decltype(func())>>>
+ {
+ if(!is_running_)
+ BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running."));
+
+ // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics.
+ auto task_adaptor = make_move_on_copy(create_task(func));
+
+ auto future = task_adaptor.value.get_future();
+
+ // Enable the cancellation of the task if priority is other than
+ // normal, because there are two queues to try_push to. Either both
+ // succeed or nothing will be executed.
+
+ boost::promise<bool> cancelled_promise;
+ boost::shared_future<bool> cancelled(cancelled_promise.get_future());
+
+ bool was_enqueued = execution_queue_[priority].try_push([=]() mutable
+ {
+ // Wait until we know if we should cancel execution or not.
+ if (cancelled.get())
+ return;
+
+ try
+ {
+ task_adaptor.value();
+ }
+ catch(boost::task_already_started&)
+ {
+ }
+ catch(...)
+ {
+ CASPAR_LOG_CURRENT_EXCEPTION();
+ }
+ });
+
+ if (!was_enqueued)
+ return boost::optional<caspar::move_on_copy<boost::unique_future<decltype(func())>>>();
+
+ if (priority != normal_priority)
+ {
+ was_enqueued = execution_queue_[normal_priority].try_push(nullptr);
+
+ if (was_enqueued)
+ {
+ // Now we know that both enqueue operations has succeeded.
+ cancelled_promise.set_value(false);
+ }
+ else
+ {
+ cancelled_promise.set_value(true); // The actual task has already been
+ // queued so we cancel it.
+
+ return boost::optional<caspar::move_on_copy<boost::unique_future<decltype(func())>>>();
+ }
+ }
+ else
+ {
+ cancelled_promise.set_value(false);
+ }
+
+ return caspar::make_move_on_copy(std::move(future));
+ }
+
+ template<typename Func>
+ auto begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future<decltype(func())> // noexcept
+ {
+ if(!is_running_)
+ BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running."));
+
+ // Create a move on copy adaptor to avoid copying the functor into the queue, tbb::concurrent_queue does not support move semantics.
+ auto task_adaptor = make_move_on_copy(create_task(func));
+
+ auto future = task_adaptor.value.get_future();
+
+ execution_queue_[priority].push([=]
+ {
+ try
+ {
+ task_adaptor.value();
+ }
+ catch(boost::task_already_started&)
+ {
+ }
+ catch(...)
+ {
+ CASPAR_LOG_CURRENT_EXCEPTION();
+ }
+ });
+
+ if(priority != normal_priority)
+ execution_queue_[normal_priority].push(nullptr);
+
+ return std::move(future);
+ }
+
+ template<typename Func>
+ auto invoke(Func&& func, task_priority prioriy = normal_priority) -> decltype(func()) // noexcept
+ {
+ if(boost::this_thread::get_id() == thread_.get_id()) // Avoids potential deadlock.
+ return func();
+
+ return begin_invoke(std::forward<Func>(func), prioriy).get();
+ }
+
+ void yield() // noexcept
+ {
+ if(boost::this_thread::get_id() != thread_.get_id()) // Only yield when calling from execution thread.
+ return;
+
+ std::function<void()> func;
+ while(execution_queue_[high_priority].try_pop(func))
+ {
+ if(func)
+ func();
+ }
+ }
+
+ function_queue::size_type capacity() const /*noexcept*/ { return execution_queue_[normal_priority].capacity(); }
+ function_queue::size_type size() const /*noexcept*/ { return execution_queue_[normal_priority].size(); }
+ bool empty() const /*noexcept*/ { return execution_queue_[normal_priority].empty(); }
+ bool is_running() const /*noexcept*/ { return is_running_; }
+
+private:
+
+ void execute() // noexcept
+ {
+ std::function<void()> func;
+ execution_queue_[normal_priority].pop(func);
+
+ yield();
+
+ if(func)
+ func();
+ }
+
+ void run() // noexcept
+ {
+ win32_exception::install_handler();
+ detail::SetThreadName(GetCurrentThreadId(), name_.c_str());
+ while(is_running_)
+ {
+ try
+ {
+ execute();
+ }
+ catch(...)
+ {
+ CASPAR_LOG_CURRENT_EXCEPTION();
+ }
+ }
+ }
+};
+
}
\ No newline at end of file
--- /dev/null
+/*
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Helge Norberg, helge.norberg@svt.se
+*/
+#pragma once
+
+#include "../log/log.h"
+
+#include <boost/thread/future.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/function.hpp>
+#include <boost/optional.hpp>
+
+namespace caspar
+{
+
+/**
+ * A utility that helps the producer side of a future when the task is not
+ * able to complete immediately but there are known retry points in the code.
+ */
+template<class R>
+class retry_task
+{
+public:
+ typedef boost::function<boost::optional<R> ()> func_type;
+
+ retry_task() : done_(false) {}
+
+ /**
+ * Reset the state with a new task. If the previous task has not completed
+ * the old one will be discarded.
+ *
+ * @param func The function that tries to calculate future result. If the
+ * optional return value is set the future is marked as ready.
+ */
+ void set_task(const func_type& func)
+ {
+ boost::mutex::scoped_lock lock(mutex_);
+
+ func_ = func;
+ done_ = false;
+ promise_ = boost::promise<R>();
+ }
+
+ /**
+ * Take ownership of the future for the current task. Cannot only be called
+ * once for each task.
+ *
+ * @return the future.
+ */
+ boost::unique_future<R> get_future()
+ {
+ boost::mutex::scoped_lock lock(mutex_);
+
+ return promise_.get_future();
+ }
+
+ /**
+ * Call this when it is guaranteed or probable that the task will be able
+ * to complete.
+ *
+ * @return true if the task completed (the future will have a result).
+ */
+ bool try_completion()
+ {
+ boost::mutex::scoped_lock lock(mutex_);
+
+ if (!func_)
+ return false;
+
+ if (done_)
+ return true;
+
+ boost::optional<R> result;
+
+ try
+ {
+ result = func_();
+ }
+ catch (...)
+ {
+ CASPAR_LOG_CURRENT_EXCEPTION();
+ promise_.set_exception(boost::current_exception());
+ done_ = true;
+
+ return true;
+ }
+
+ if (result)
+ {
+ promise_.set_value(*result);
+ done_ = true;
+ }
+
+ return done_;
+ }
+
+ /**
+ * Call this when it is certain that the result should be ready, and if not
+ * it should be regarded as an unrecoverable error (retrying again would
+ * be useless), so the future will be marked as failed.
+ *
+ * @param exception The exception to mark the future with *if* the task
+ * completion fails.
+ */
+ void try_or_fail(const std::exception& exception)
+ {
+ if (!try_completion())
+ {
+ try
+ {
+ throw exception;
+ }
+ catch (...)
+ {
+ CASPAR_LOG_CURRENT_EXCEPTION();
+ promise_.set_exception(boost::current_exception());
+ done_ = true;
+ }
+ }
+ }
+private:
+ boost::mutex mutex_;
+ func_type func_;
+ boost::promise<R> promise_;
+ bool done_;
+};
+
+/**
+ * Wrap a value in a future with an already known result.
+ * <p>
+ * Useful when the result of an operation is already known at the time of
+ * calling.
+ *
+ * @param value The r-value to wrap.
+ *
+ * @return The future with the result set.
+ */
+template<class R>
+boost::unique_future<R> wrap_as_future(R&& value)
+{
+ boost::promise<R> p;
+
+ p.set_value(value);
+
+ return p.get_future();
+}
+
+}
#include <common/env.h>\r
#include <common/memory/safe_ptr.h>\r
#include <common/exception/exceptions.h>\r
+#include <common/concurrency/future_util.h>\r
#include <core/video_format.h>\r
#include <core/mixer/read_frame.h>\r
\r
consumer_->initialize(format_desc, channel_index);\r
}\r
\r
- virtual bool send(const safe_ptr<read_frame>& frame) override\r
+ virtual boost::unique_future<bool> send(const safe_ptr<read_frame>& frame) override\r
{ \r
if(audio_cadence_.size() == 1)\r
return consumer_->send(frame);\r
\r
- bool result = true;\r
+ boost::unique_future<bool> result = caspar::wrap_as_future(true);\r
\r
if(boost::range::equal(sync_buffer_, audio_cadence_) && audio_cadence_.front() == static_cast<size_t>(frame->audio_data().size())) \r
{ \r
\r
sync_buffer_.push_back(static_cast<size_t>(frame->audio_data().size()));\r
\r
- return result;\r
+ return std::move(result);\r
}\r
\r
virtual std::wstring print() const override\r
{\r
struct empty_frame_consumer : public frame_consumer\r
{\r
- virtual bool send(const safe_ptr<read_frame>&) override {return false;}\r
+ virtual boost::unique_future<bool> send(const safe_ptr<read_frame>&) override { return caspar::wrap_as_future(false); }\r
virtual void initialize(const video_format_desc&, int) override{}\r
virtual std::wstring print() const override {return L"empty";}\r
virtual bool has_synchronization_clock() const override {return false;}\r
\r
#include <boost/noncopyable.hpp>\r
#include <boost/property_tree/ptree_fwd.hpp>\r
+#include <boost/thread/future.hpp>\r
\r
#include <functional>\r
#include <string>\r
{\r
virtual ~frame_consumer() {}\r
\r
- virtual bool send(const safe_ptr<read_frame>& frame) = 0;\r
+ virtual boost::unique_future<bool> send(const safe_ptr<read_frame>& frame) = 0;\r
virtual void initialize(const video_format_desc& format_desc, int channel_index) = 0;\r
virtual std::wstring print() const = 0;\r
virtual boost::property_tree::wptree info() const = 0;\r
if(!frames_.full())\r
return;\r
\r
+ std::map<int, boost::unique_future<bool>> send_results;\r
+\r
+ // Start invocations\r
auto it = consumers_.begin();\r
while(it != consumers_.end())\r
{\r
\r
try\r
{\r
- if(consumer->send(frame))\r
- ++it;\r
- else\r
+ send_results.insert(std::make_pair(it->first, consumer->send(frame)));\r
+ }\r
+ catch(...)\r
+ {\r
+ CASPAR_LOG_CURRENT_EXCEPTION();\r
+ try\r
{\r
- CASPAR_LOG(info) << print() << L" " << it->second->print() << L" Removed.";\r
- consumers_.erase(it++);\r
+ send_results.insert(std::make_pair(it->first, consumer->send(frame)));\r
+ }\r
+ catch(...)\r
+ {\r
+ CASPAR_LOG_CURRENT_EXCEPTION();\r
+ CASPAR_LOG(error) << "Failed to recover consumer: " << consumer->print() << L". Removing it.";\r
+ consumers_.erase(it);\r
+ }\r
+ }\r
+\r
+ ++it;\r
+ }\r
+\r
+ // Retrieve results\r
+ auto result_it = send_results.begin();\r
+ while(result_it != send_results.end())\r
+ {\r
+ auto consumer = consumers_.at(result_it->first);\r
+ auto frame = frames_.at(consumer->buffer_depth()-minmax.first);\r
+ auto& result_future = result_it->second;\r
+ \r
+ try\r
+ {\r
+ if(!result_future.get())\r
+ {\r
+ CASPAR_LOG(info) << print() << L" " << consumer->print() << L" Removed.";\r
+ consumers_.erase(result_it->first);\r
}\r
}\r
catch(...)\r
try\r
{\r
consumer->initialize(format_desc_, channel_index_);\r
- if(consumer->send(frame))\r
- ++it;\r
- else\r
+ if(!consumer->send(frame).get())\r
{\r
- CASPAR_LOG(info) << print() << L" " << it->second->print() << L" Removed.";\r
- consumers_.erase(it++);\r
+ CASPAR_LOG(info) << print() << L" " << consumer->print() << L" Removed.";\r
+ consumers_.erase(result_it->first);\r
}\r
}\r
catch(...)\r
{\r
CASPAR_LOG_CURRENT_EXCEPTION();\r
CASPAR_LOG(error) << "Failed to recover consumer: " << consumer->print() << L". Removing it.";\r
- consumers_.erase(it++);\r
+ consumers_.erase(result_it->first);\r
}\r
}\r
+\r
+ ++result_it;\r
}\r
\r
graph_->set_value("consume-time", consume_timer_.elapsed()*format_desc_.fps*0.5);\r
\r
#include <common/exception/exceptions.h>\r
#include <common/memory/memcpy.h>\r
-\r
+#include <common/concurrency/future_util.h>\r\r
#include <tbb/concurrent_queue.h>\r
\r
namespace caspar { namespace core {\r
\r
// frame_consumer\r
\r
- virtual bool send(const safe_ptr<read_frame>& frame) override\r
+ virtual boost::unique_future<bool> send(const safe_ptr<read_frame>& frame) override\r
{\r
frame_buffer_.try_push(frame);\r
- return is_running_;\r
+ return caspar::wrap_as_future(is_running_.load());\r
}\r
\r
virtual void initialize(const core::video_format_desc& format_desc, int channel_index) override\r
#include <core/mixer/read_frame.h>\r
\r
#include <common/concurrency/executor.h>\r
+#include <common/concurrency/future_util.h>\r
#include <common/diagnostics/graph.h>\r
#include <common/memory/memclr.h>\r
#include <common/memory/memcpy.h>\r
const bool key_only_;\r
\r
executor executor_;\r
+ retry_task<bool> send_completion_;\r
public:\r
bluefish_consumer(const core::video_format_desc& format_desc, unsigned int device_index, bool embedded_audio, bool key_only, int channel_index) \r
: blue_(create_blue(device_index))\r
CASPAR_LOG(error)<< print() << TEXT(" Failed to disable video output."); \r
}\r
\r
- void send(const safe_ptr<core::read_frame>& frame)\r
- { \r
- executor_.begin_invoke([=]\r
+ boost::unique_future<bool> send(const safe_ptr<core::read_frame>& frame)\r
+ {\r
+ auto display_command = [=]\r
{\r
+ // The executor queue now has room so we try to complete the pending send call\r
+ send_completion_.try_or_fail(\r
+ caspar_exception() << msg_info(narrow(print()) + " Future send not able to complete."));\r
+\r
try\r
{ \r
display_frame(frame); \r
{\r
CASPAR_LOG_CURRENT_EXCEPTION();\r
}\r
+ };\r
+\r
+ auto enqueue_command = [=]\r
+ {\r
+ return executor_.try_begin_invoke(display_command);\r
+ };\r
+\r
+ if (enqueue_command())\r
+ return wrap_as_future(true);\r
+\r
+ send_completion_.set_task([=]\r
+ {\r
+ return enqueue_command() ? boost::optional<bool>(true) : boost::optional<bool>();\r
});\r
+\r
+ return std::move(send_completion_.get_future());\r
}\r
\r
void display_frame(const safe_ptr<core::read_frame>& frame)\r
CASPAR_LOG(info) << print() << L" Successfully Initialized."; \r
}\r
\r
- virtual bool send(const safe_ptr<core::read_frame>& frame) override\r
+ virtual boost::unique_future<bool> send(const safe_ptr<core::read_frame>& frame) override\r
{\r
CASPAR_VERIFY(audio_cadence_.front() == static_cast<size_t>(frame->audio_data().size()));\r
boost::range::rotate(audio_cadence_, std::begin(audio_cadence_)+1);\r
\r
- consumer_->send(frame);\r
- return true;\r
+ return consumer_->send(frame);\r
}\r
\r
virtual std::wstring print() const override\r
#include <core/mixer/read_frame.h>\r
\r
#include <common/concurrency/com_context.h>\r
+#include <common/concurrency/future_util.h>\r
#include <common/diagnostics/graph.h>\r
#include <common/exception/exceptions.h>\r
#include <common/memory/memcpy.h>\r
\r
safe_ptr<diagnostics::graph> graph_;\r
boost::timer tick_timer_;\r
+ retry_task<bool> send_completion_;\r
\r
public:\r
decklink_consumer(const configuration& config, const core::video_format_desc& format_desc, int channel_index) \r
graph_->set_tag("flushed-frame");\r
\r
std::shared_ptr<core::read_frame> frame; \r
- video_frame_buffer_.pop(frame); \r
+ video_frame_buffer_.pop(frame);\r
+ send_completion_.try_completion();\r
schedule_next_video(make_safe_ptr(frame)); \r
\r
unsigned long buffered;\r
{\r
std::shared_ptr<core::read_frame> frame;\r
audio_frame_buffer_.pop(frame);\r
+ send_completion_.try_completion();\r
schedule_next_audio(frame->audio_data());\r
}\r
\r
tick_timer_.restart();\r
}\r
\r
- void send(const safe_ptr<core::read_frame>& frame)\r
+ boost::unique_future<bool> send(const safe_ptr<core::read_frame>& frame)\r
{\r
{\r
tbb::spin_mutex::scoped_lock lock(exception_mutex_);\r
\r
if(!is_running_)\r
BOOST_THROW_EXCEPTION(caspar_exception() << msg_info(narrow(print()) + " Is not running."));\r
+\r
+ bool audio_ready = !config_.embedded_audio;\r
+ bool video_ready = false;\r
+\r
+ auto enqueue_task = [audio_ready, video_ready, frame, this]() mutable -> boost::optional<bool>\r
+ {\r
+ if (!audio_ready)\r
+ audio_ready = audio_frame_buffer_.try_push(frame);\r
+\r
+ if (!video_ready)\r
+ video_ready = video_frame_buffer_.try_push(frame);\r
+\r
+ if (audio_ready && video_ready)\r
+ return true;\r
+ else\r
+ return boost::optional<bool>();\r
+ };\r
\r
- if(config_.embedded_audio)\r
- audio_frame_buffer_.push(frame); \r
- video_frame_buffer_.push(frame); \r
+ if (enqueue_task())\r
+ return wrap_as_future(true);\r
+\r
+ send_completion_.set_task(enqueue_task);\r
+\r
+ return send_completion_.get_future();\r
}\r
\r
std::wstring print() const\r
CASPAR_LOG(info) << print() << L" Successfully Initialized."; \r
}\r
\r
- virtual bool send(const safe_ptr<core::read_frame>& frame) override\r
+ virtual boost::unique_future<bool> send(const safe_ptr<core::read_frame>& frame) override\r
{\r
CASPAR_VERIFY(audio_cadence_.front() == static_cast<size_t>(frame->audio_data().size()));\r
boost::range::rotate(audio_cadence_, std::begin(audio_cadence_)+1);\r
\r
- context_->send(frame);\r
- return true;\r
+ return context_->send(frame);\r
}\r
\r
virtual std::wstring print() const override\r
#include <core/video_format.h>\r
\r
#include <common/concurrency/executor.h>\r
+#include <common/concurrency/future_util.h>\r
#include <common/diagnostics/graph.h>\r
#include <common/env.h>\r
#include <common/utility/string.h>\r
consumer_.reset(new ffmpeg_consumer(narrow(filename_), format_desc, options_));\r
}\r
\r
- virtual bool send(const safe_ptr<core::read_frame>& frame) override\r
+ virtual boost::unique_future<bool> send(const safe_ptr<core::read_frame>& frame) override\r
{\r
consumer_->send(frame);\r
- return true;\r
+ return caspar::wrap_as_future(true);\r
}\r
\r
virtual std::wstring print() const override\r
#include <common/env.h>\r
#include <common/log/log.h>\r
#include <common/utility/string.h>\r
+#include <common/concurrency/future_util.h>\r
\r
#include <core/consumer/frame_consumer.h>\r
#include <core/video_format.h>\r
format_desc_ = format_desc;\r
}\r
\r
- virtual bool send(const safe_ptr<core::read_frame>& frame) override\r
+ virtual boost::unique_future<bool> send(const safe_ptr<core::read_frame>& frame) override\r
{ \r
auto format_desc = format_desc_;\r
boost::thread async([format_desc, frame]\r
});\r
async.detach();\r
\r
- return false;\r
+ return wrap_as_future(false);\r
}\r
\r
virtual std::wstring print() const override\r
#include <common/log/log.h>\r
#include <common/utility/timer.h>\r
#include <common/utility/string.h>\r
+#include <common/concurrency/future_util.h>\r
\r
#include <core/consumer/frame_consumer.h>\r
#include <core/mixer/audio/audio_util.h>\r
#include <boost/circular_buffer.hpp>\r
#include <boost/property_tree/ptree.hpp>\r
#include <boost/timer.hpp>\r
+#include <boost/thread/future.hpp>\r
+#include <boost/optional.hpp>\r
\r
#include <tbb/concurrent_queue.h>\r
\r
boost::circular_buffer<audio_buffer_16> container_;\r
tbb::atomic<bool> is_running_;\r
core::audio_buffer temp;\r
+ retry_task<bool> send_completion_;\r
\r
core::video_format_desc format_desc_;\r
public:\r
CASPAR_LOG(info) << print() << " Sucessfully Initialized.";\r
}\r
\r
- virtual bool send(const safe_ptr<core::read_frame>& frame) override\r
- { \r
- input_.push(std::make_shared<audio_buffer_16>(core::audio_32_to_16(frame->audio_data())));\r
- return true;\r
+ virtual boost::unique_future<bool> send(const safe_ptr<core::read_frame>& frame) override\r
+ {\r
+ auto buffer = std::make_shared<audio_buffer_16>(core::audio_32_to_16(frame->audio_data()));\r
+\r
+ if (input_.try_push(buffer))\r return wrap_as_future(is_running_.load());\r\r
+ send_completion_.set_task([=]\r
+ {\r
+ return input_.try_push(buffer) ? boost::optional<bool>(is_running_) : boost::optional<bool>();\r
+ });\r
+\r
+ return send_completion_.get_future();\r
}\r
\r
virtual std::wstring print() const override\r
{ \r
std::shared_ptr<audio_buffer_16> audio_data; \r
input_.pop(audio_data);\r
- \r
+ send_completion_.try_completion();\r
+\r
container_.push_back(std::move(*audio_data));\r
data.Samples = container_.back().data();\r
data.NbSamples = container_.back().size(); \r
#include <common/memory/memshfl.h>\r
#include <common/utility/timer.h>\r
#include <common/utility/string.h>\r
+#include <common/concurrency/future_util.h>\r
\r
#include <ffmpeg/producer/filter/filter.h>\r
\r
std::rotate(pbos_.begin(), pbos_.begin() + 1, pbos_.end());\r
}\r
\r
- bool send(const safe_ptr<core::read_frame>& frame)\r
+ boost::unique_future<bool> send(const safe_ptr<core::read_frame>& frame)\r
{\r
if (!frame_buffer_.try_push(frame))\r
graph_->set_tag("dropped-frame"); \r
\r
- return is_running_;\r
+ return wrap_as_future(is_running_.load());\r
}\r
\r
std::wstring print() const\r
CASPAR_LOG(info) << print() << L" Successfully Initialized."; \r
}\r
\r
- virtual bool send(const safe_ptr<core::read_frame>& frame) override\r
+ virtual boost::unique_future<bool> send(const safe_ptr<core::read_frame>& frame) override\r
{\r
return consumer_->send(frame);\r
}\r
\r
configuration config;\r
\r
- if(params.size() > 1)\r config.screen_index = lexical_cast_or_default<int>(params[1], config.screen_index);\r\r
+ if(params.size() > 1)\r
+ config.screen_index = lexical_cast_or_default<int>(params[1], config.screen_index);\r
+\r
auto device_it = std::find(params.begin(), params.end(), L"DEVICE");\r
if(device_it != params.end() && ++device_it != params.end())\r
config.screen_index = boost::lexical_cast<int>(*device_it);\r