--- /dev/null
+/*
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Helge Norberg, helge.norberg@svt.se
+*/
+
+#pragma once
+
+#include "semaphore.h"
+
+#include <boost/thread/mutex.hpp>
+#include <boost/noncopyable.hpp>
+
+namespace caspar {
+
+/**
+ * Adapts an unbounded non-blocking concurrent queue into a blocking bounded
+ * concurrent queue.
+ *
+ * The queue Q to adapt must support the following use cases:
+ *
+ * Q q;
+ * Q::value_type elem;
+ * q.push(elem);
+ *
+ * and:
+ *
+ * Q q;
+ * Q::value_type elem;
+ * q.try_pop(elem);
+ *
+ * It must also guarantee thread safety for those operations.
+ */
+template<class Q>
+class blocking_bounded_queue_adapter : boost::noncopyable
+{
+public:
+ typedef typename Q::value_type value_type;
+ typedef unsigned int size_type;
+private:
+ semaphore space_available_;
+ semaphore elements_available_;
+ Q queue_;
+ mutable boost::mutex capacity_mutex_;
+ size_type capacity_;
+public:
+ /**
+ * Constructor.
+ *
+ * @param capacity The capacity of the queue.
+ */
+ blocking_bounded_queue_adapter(size_type capacity)
+ : space_available_(capacity)
+ , elements_available_(0)
+ , capacity_(capacity)
+ {
+ }
+
+ /**
+ * Push an element to the queue, block until room is available.
+ *
+ * @param element The element to push.
+ */
+ void push(const value_type& element)
+ {
+ space_available_.acquire();
+ push_after_room_reserved(element);
+ }
+
+ /**
+ * Try to push an element to the queue, returning immediately if room is not
+ * available.
+ *
+ * @param element The element to push.
+ *
+ * @return true if there was room for the element.
+ */
+ bool try_push(const value_type& element)
+ {
+ bool room_available = space_available_.try_acquire();
+
+ if (!room_available)
+ return false;
+
+ push_after_room_reserved(element);
+
+ return true;
+ }
+
+ /**
+ * Pop an element from the queue, will block until an element is available.
+ *
+ * @param element The element to store the result in.
+ */
+ void pop(value_type& element)
+ {
+ elements_available_.acquire();
+ queue_.try_pop(element);
+ space_available_.release();
+ }
+
+ /**
+ * Try to pop an element from the queue, returning immediately if no
+ * element is available.
+ *
+ * @param element The element to store the result in.
+ *
+ * @return true if an element was popped.
+ */
+ bool try_pop(value_type& element)
+ {
+ if (!elements_available_.try_acquire())
+ return false;
+
+ queue_.try_pop(element);
+ space_available_.release();
+
+ return true;
+ }
+
+ /**
+ * Modify the capacity of the queue. May block if reducing the capacity.
+ *
+ * @param capacity The new capacity.
+ */
+ void set_capacity(size_type capacity)
+ {
+ boost::mutex::scoped_lock lock (capacity_mutex_);
+
+ if (capacity_ < capacity)
+ {
+ auto to_grow_with = capacity - capacity_;
+
+ space_available_.release(to_grow_with);
+ }
+ else if (capacity_ > capacity)
+ {
+ auto to_shrink_with = capacity_ - capacity;
+
+ // Will block until the desired capacity has been reached.
+ space_available_.acquire(to_shrink_with);
+ }
+
+ capacity_ = capacity;
+ }
+
+ /**
+ * @return the current capacity of the queue.
+ */
+ size_type capacity() const
+ {
+ boost::mutex::scoped_lock lock (capacity_mutex_);
+
+ return capacity_;
+ }
+
+ /**
+ * @return the current size of the queue (may have changed at the time of
+ * returning).
+ */
+ size_type size() const
+ {
+ return elements_available_.permits();
+ }
+private:
+ void push_after_room_reserved(const value_type& element)
+ {
+ try
+ {
+ queue_.push(element);
+ }
+ catch (...)
+ {
+ space_available_.release();
+
+ throw;
+ }
+
+ elements_available_.release();
+ }
+};
+
+}
<ItemGroup>\r
<ClInclude Include="array.h" />\r
<ClInclude Include="assert.h" />\r
+ <ClInclude Include="blocking_bounded_queue_adapter.h" />\r
<ClInclude Include="compiler\vs\disable_silly_warnings.h" />\r
<ClInclude Include="compiler\vs\stack_walker.h" />\r
<ClInclude Include="diagnostics\graph.h" />\r
<ClInclude Include="param.h" />\r
<ClInclude Include="prec_timer.h" />\r
<ClInclude Include="reactive.h" />\r
+ <ClInclude Include="semaphore.h" />\r
<ClInclude Include="stdafx.h" />\r
<ClInclude Include="tweener.h" />\r
<ClInclude Include="utf.h" />\r
<ClInclude Include="compiler\vs\stack_walker.h">\r
<Filter>source\compiler\vs</Filter>\r
</ClInclude>\r
+ <ClInclude Include="semaphore.h">\r
+ <Filter>source</Filter>\r
+ </ClInclude>\r
+ <ClInclude Include="blocking_bounded_queue_adapter.h">\r
+ <Filter>source</Filter>\r
+ </ClInclude>\r
</ItemGroup>\r
</Project>
\ No newline at end of file
#include "except.h"
#include "enum_class.h"
#include "log.h"
+#include "blocking_bounded_queue_adapter.h"
#include <tbb/atomic.h>
#include <tbb/concurrent_priority_queue.h>
-#include <tbb/concurrent_queue.h>
#include <boost/thread.hpp>
+#include <boost/optional.hpp>
#include <functional>
executor(const executor&);
executor& operator=(const executor&);
- typedef tbb::concurrent_priority_queue<priority_function> function_queue_t;
+ typedef blocking_bounded_queue_adapter<tbb::concurrent_priority_queue<priority_function>> function_queue_t;
const std::wstring name_;
tbb::atomic<bool> is_running_;
boost::thread thread_;
function_queue_t execution_queue_;
- tbb::concurrent_bounded_queue<int> semaphore_;
public:
executor(const std::wstring& name)
: name_(name)
+ , execution_queue_(512)
{
is_running_ = true;
- set_capacity(512);
thread_ = boost::thread([this]{run();});
}
internal_begin_invoke([=]
{
is_running_ = false;
- }).wait();
+ }, false).wait();
}
catch(...)
{
thread_.join();
}
-
+
+ template<typename Func>
+ auto try_begin_invoke(Func&& func, task_priority priority = normal_priority) -> boost::unique_future<decltype(func())>
+ {
+ if(!is_running_)
+ BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running."));
+
+ // Will return uninitialized future if the try failed (get_state() will return future_state::uninitialized).
+ return internal_begin_invoke(std::forward<Func>(func), true, priority);
+ }
+
template<typename Func>
auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
{
if(!is_running_)
CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
- return internal_begin_invoke(std::forward<Func>(func), priority);
+ return internal_begin_invoke(std::forward<Func>(func), false, priority);
}
template<typename Func>
if(!is_current())
CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.") << source_info(name_));
- int dummy;
- if(!semaphore_.try_pop(dummy))
- return;
-
priority_function func;
if(execution_queue_.try_pop(func))
func();
}
- void set_capacity(std::size_t capacity)
+ void set_capacity(function_queue_t::size_type capacity)
{
- semaphore_.set_capacity(capacity);
+ execution_queue_.set_capacity(capacity);
}
- std::size_t capacity() const
+ function_queue_t::size_type capacity() const
{
- return semaphore_.capacity();
+ return execution_queue_.capacity();
}
void clear()
}
template<typename Func>
- auto internal_begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
+ auto internal_begin_invoke(
+ Func&& func,
+ bool try_begin,
+ task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
{
typedef typename std::remove_reference<Func>::type function_type;
typedef decltype(func()) result_type;
catch(boost::task_already_started&){}
});
- execution_queue_.push(prio_func);
-
- if(!semaphore_.try_push(0))
+ if (!execution_queue_.try_push(prio_func))
{
- CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
- semaphore_.push(0);
- }
+ if (try_begin)
+ {
+ delete raw_task;
+
+ return boost::unique_future<decltype(func())>();
+ }
+ else
+ {
+ CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
+ execution_queue_.push(prio_func);
+ }
+ }
+
return std::move(future);
}
{
try
{
- int dummy;
- semaphore_.pop(dummy);
-
priority_function func;
- if(execution_queue_.try_pop(func))
- func();
+ execution_queue_.pop(func);
+ func();
}
catch(...)
{
--- /dev/null
+/*
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Helge Norberg, helge.norberg@svt.se
+*/
+
+#pragma once
+
+#include <cmath>
+
+#include <boost/noncopyable.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/condition.hpp>
+
+namespace caspar {
+
+template <class N, class Func>
+void repeat_n(N times_to_repeat_block, const Func& func)
+{
+ for (N i = 0; i < times_to_repeat_block; ++i)
+ {
+ func();
+ }
+}
+
+/**
+ * Counting semaphore modelled after java.util.concurrent.Semaphore
+ */
+class semaphore : boost::noncopyable
+{
+ mutable boost::mutex mutex_;
+ unsigned int permits_;
+ boost::condition_variable permits_available_;
+public:
+ /**
+ * Constructor.
+ *
+ * @param permits The initial number of permits.
+ */
+ semaphore(unsigned int permits)
+ : permits_(permits)
+ {
+ }
+
+ /**
+ * Release a permit.
+ */
+ void release()
+ {
+ boost::mutex::scoped_lock lock(mutex_);
+
+ ++permits_;
+
+ permits_available_.notify_one();
+ }
+
+ /**
+ * Release a permit.
+ *
+ * @param permits The number of permits to release.
+ */
+ void release(unsigned int permits)
+ {
+ boost::mutex::scoped_lock lock(mutex_);
+
+ permits_ += permits;
+
+ repeat_n(permits, [this] { permits_available_.notify_one(); });
+ }
+
+ /**
+ * Acquire a permit. Will block until one becomes available if no permit is
+ * currently available.
+ */
+ void acquire()
+ {
+ boost::mutex::scoped_lock lock(mutex_);
+
+ while (permits_ == 0u)
+ {
+ permits_available_.wait(lock);
+ }
+
+ --permits_;
+ }
+
+ /**
+ * Acquire a number of permits. Will block until the given number of
+ * permits has been acquired if not enough permits are currently available.
+ *
+ * @param permits The number of permits to acquire.
+ */
+ void acquire(unsigned int permits)
+ {
+ boost::mutex::scoped_lock lock(mutex_);
+ auto num_acquired = 0u;
+
+ while (permits_ == 0u && num_acquired < permits)
+ {
+ permits_available_.wait(lock);
+
+ auto num_wanted = permits - num_acquired;
+ auto to_drain = std::min(num_wanted, permits_);
+
+ permits_ -= to_drain;
+ num_acquired += to_drain;
+ }
+ }
+
+ /**
+ * Acquire one permits if permits are currently available. Does not block
+ * until one is available, but returns immediately if unavailable.
+ *
+ * @return true if a permit was acquired or false if no permits where
+ * currently available.
+ */
+ bool try_acquire()
+ {
+ boost::mutex::scoped_lock lock(mutex_);
+
+ if (permits_ == 0u)
+ return false;
+ else
+ {
+ --permits_;
+
+ return true;
+ }
+ }
+
+ /**
+ * @return the current number of permits (may have changed at the time of
+ * return).
+ */
+ unsigned int permits() const
+ {
+ boost::mutex::scoped_lock lock(mutex_);
+
+ return permits_;
+ }
+};
+
+}