--- /dev/null
+/*
+* Copyright (c) 2011 Sveriges Television AB <info@casparcg.com>
+*
+* This file is part of CasparCG (www.casparcg.com).
+*
+* CasparCG is free software: you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation, either version 3 of the License, or
+* (at your option) any later version.
+*
+* CasparCG is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with CasparCG. If not, see <http://www.gnu.org/licenses/>.
+*
+* Author: Helge Norberg, helge.norberg@svt.se
+*/
+#pragma once
+
+#include <stdexcept>
+#include <map>
+#include <initializer_list>
+
+#include <tbb/concurrent_queue.h>
+
+#include <boost/foreach.hpp>
+#include <boost/thread/mutex.hpp>
+
+#include "semaphore.h"
+
+namespace caspar {
+
+/**
+ * Blocking concurrent priority queue supporting a predefined set of
+ * priorities. FIFO ordering is guaranteed within a priority.
+ *
+ * Prio must have the < and > operators defined where a larger instance is of a
+ * higher priority.
+ */
+template <class T, class Prio>
+class blocking_priority_queue
+{
+public:
+ typedef unsigned int size_type;
+private:
+ std::map<Prio, tbb::concurrent_queue<T>, std::greater<Prio>> queues_by_priority_;
+ semaphore space_available_;
+ semaphore elements_available_;
+ mutable boost::mutex capacity_mutex_;
+ size_type capacity_;
+public:
+ /**
+ * Constructor.
+ *
+ * @param capacity The initial capacity of the queue.
+ * @param priorities A forward iterable container with the priorities to
+ * support.
+ */
+ template<class PrioList>
+ blocking_priority_queue(size_type capacity, const PrioList& priorities)
+ : space_available_(capacity)
+ , elements_available_(0u)
+ , capacity_(capacity)
+ {
+ BOOST_FOREACH(Prio priority, priorities)
+ {
+ queues_by_priority_.insert(std::make_pair(priority, tbb::concurrent_queue<T>()));
+ }
+
+ // The std::map is read-only from now on, so there *should* (it is
+ // unlikely but possible for a std::map implementor to choose to
+ // rebalance the tree during read operations) be no race conditions
+ // regarding the map.
+ //
+ // This may be true for vc10 as well:
+ // http://msdn.microsoft.com/en-us/library/c9ceah3b%28VS.80%29.aspx
+ }
+
+ /**
+ * Push an element with a given priority to the queue. Blocks until room
+ * is available.
+ *
+ * @param priority The priority of the element.
+ * @param element The element.
+ */
+ void push(Prio priority, const T& element)
+ {
+ acquire_transaction transaction(space_available_);
+
+ push_acquired(priority, element, transaction);
+ }
+
+ /**
+ * Attempt to push an element with a given priority to the queue. Will
+ * immediately return even if there is no room in the queue.
+ *
+ * @param priority The priority of the element.
+ * @param element The element.
+ *
+ * @return true if the element was pushed. false if there was no room.
+ */
+ bool try_push(Prio priority, const T& element)
+ {
+ if (!space_available_.try_acquire())
+ return false;
+
+ acquire_transaction transaction(space_available_, true);
+
+ push_acquired(priority, element, transaction);
+
+ return true;
+ }
+
+ /**
+ * Pop the element with the highest priority (fifo for elements with the
+ * same priority). Blocks until an element is available.
+ *
+ * @param element The element to store the result in.
+ */
+ void pop(T& element)
+ {
+ acquire_transaction transaction(elements_available_);
+
+
+ pop_acquired_any_priority(element, transaction);
+ }
+
+ /**
+ * Attempt to pop the element with the highest priority (fifo for elements
+ * with the same priority) if available. Does not wait until an element is
+ * available.
+ *
+ * @param element The element to store the result in.
+ *
+ * @return true if an element was available.
+ */
+ bool try_pop(T& element)
+ {
+ if (!elements_available_.try_acquire())
+ return false;
+
+ acquire_transaction transaction(elements_available_, true);
+
+ pop_acquired_any_priority(element, transaction);
+
+ return true;
+ }
+
+ /**
+ * Attempt to pop the element with the highest priority (fifo for elements
+ * with the same priority) if available *and* has a minimum priority. Does
+ * not wait until an element satisfying the priority criteria is available.
+ *
+ * @param element The element to store the result in.
+ * @param minimum_priority The minimum priority to accept.
+ *
+ * @return true if an element was available with the minimum priority.
+ */
+ bool try_pop(T& element, Prio minimum_priority)
+ {
+ if (!elements_available_.try_acquire())
+ return false;
+
+ acquire_transaction transaction(elements_available_, true);
+
+ BOOST_FOREACH(auto& queue, queues_by_priority_)
+ {
+ if (queue.first < minimum_priority)
+ {
+ // Will be true for all queues from this point so we break.
+ break;
+ }
+
+ if (queue.second.try_pop(element))
+ {
+ transaction.commit();
+ space_available_.release();
+
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Modify the capacity of the queue. May block if reducing the capacity.
+ *
+ * @param capacity The new capacity.
+ */
+ void set_capacity(size_type capacity)
+ {
+ boost::mutex::scoped_lock lock (capacity_mutex_);
+
+ if (capacity_ < capacity)
+ {
+ auto to_grow_with = capacity - capacity_;
+ space_available_.release(to_grow_with);
+ }
+ else if (capacity_ > capacity)
+ {
+ auto to_shrink_with = capacity_ - capacity;
+ // Will block until the desired capacity has been reached.
+ space_available_.acquire(to_shrink_with);
+ }
+
+ capacity_ = capacity;
+ }
+
+ /**
+ * @return the current capacity of the queue.
+ */
+ size_type capacity() const
+ {
+ boost::mutex::scoped_lock lock (capacity_mutex_);
+
+ return capacity_;
+ }
+
+ /**
+ * @return the current size of the queue (may have changed at the time of
+ * returning).
+ */
+ size_type size() const
+ {
+ return elements_available_.permits();
+ }
+private:
+ void push_acquired(Prio priority, const T& element, acquire_transaction& transaction)
+ {
+ try
+ {
+ queues_by_priority_.at(priority).push(element);
+ }
+ catch (std::out_of_range&)
+ {
+ throw std::runtime_error("Priority not supported by queue");
+ }
+
+ transaction.commit();
+ elements_available_.release();
+ }
+
+ void pop_acquired_any_priority(T& element, acquire_transaction& transaction)
+ {
+ BOOST_FOREACH(auto& queue, queues_by_priority_)
+ {
+ if (queue.second.try_pop(element))
+ {
+ transaction.commit();
+ space_available_.release();
+
+ return;
+ }
+ }
+
+ throw std::logic_error(
+ "blocking_priority_queue should have contained at least one element but didn't");
+ }
+};
+
+}
#include "enum_class.h"
#include "log.h"
#include "blocking_bounded_queue_adapter.h"
+#include "blocking_priority_queue.h"
+
#include <tbb/atomic.h>
#include <tbb/concurrent_priority_queue.h>
#include <boost/thread.hpp>
#include <boost/optional.hpp>
+#include <boost/assign/list_of.hpp>
#include <functional>
class executor sealed
{
- struct priority_function
- {
- int priority;
- std::function<void()> func;
-
- priority_function()
- {
- }
-
- template<typename F>
- priority_function(int priority, F&& func)
- : priority(priority)
- , func(std::forward<F>(func))
- {
- }
-
- void operator()()
- {
- func();
- }
-
- bool operator<(const priority_function& other) const
- {
- return priority < other.priority;
- }
- };
-
executor(const executor&);
executor& operator=(const executor&);
- typedef blocking_bounded_queue_adapter<tbb::concurrent_priority_queue<priority_function>> function_queue_t;
+ typedef blocking_priority_queue<std::function<void()>, task_priority> function_queue_t;
const std::wstring name_;
tbb::atomic<bool> is_running_;
public:
executor(const std::wstring& name)
: name_(name)
- , execution_queue_(512)
+ , execution_queue_(512, boost::assign::list_of
+ (task_priority::lowest_priority)
+ (task_priority::lower_priority)
+ (task_priority::low_priority)
+ (task_priority::normal_priority)
+ (task_priority::high_priority)
+ (task_priority::higher_priority))
{
is_running_ = true;
thread_ = boost::thread([this]{run();});
internal_begin_invoke([=]
{
is_running_ = false;
- }, false).wait();
+ }).wait();
}
catch(...)
{
thread_.join();
}
-
- template<typename Func>
- auto try_begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())>
- {
- if(!is_running_)
- BOOST_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running."));
-
- // Will return uninitialized future if the try failed (get_state() will return future_state::uninitialized).
- return internal_begin_invoke(std::forward<Func>(func), true, priority);
- }
template<typename Func>
auto begin_invoke(Func&& func, task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
if(!is_running_)
CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("executor not running.") << source_info(name_));
- return internal_begin_invoke(std::forward<Func>(func), false, priority);
+ return internal_begin_invoke(std::forward<Func>(func), priority);
}
template<typename Func>
return begin_invoke(std::forward<Func>(func), prioriy).get();
}
- void yield()
+ void yield(task_priority minimum_priority)
{
if(!is_current())
CASPAR_THROW_EXCEPTION(invalid_operation() << msg_info("Executor can only yield inside of thread context.") << source_info(name_));
- priority_function func;
- if(execution_queue_.try_pop(func))
+ std::function<void ()> func;
+
+ while (execution_queue_.try_pop(func, minimum_priority))
func();
}
void clear()
{
- priority_function func;
+ std::function<void ()> func;
while(execution_queue_.try_pop(func));
}
template<typename Func>
auto internal_begin_invoke(
Func&& func,
- bool try_begin,
task_priority priority = task_priority::normal_priority) -> boost::unique_future<decltype(func())> // noexcept
{
typedef typename std::remove_reference<Func>::type function_type;
auto future = task->get_future();
auto raw_task = task.release();
- priority_function prio_func(priority.value(), [raw_task]
+ auto function = [raw_task]
{
std::unique_ptr<task_type> task(raw_task);
try
(*task)();
}
catch(boost::task_already_started&){}
- });
+ };
- if (!execution_queue_.try_push(prio_func))
+ if (!execution_queue_.try_push(priority, function))
{
- if (try_begin)
- {
- delete raw_task;
-
- return boost::unique_future<decltype(func())>();
- }
- else
- {
- CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
- execution_queue_.push(prio_func);
- }
+ CASPAR_LOG(debug) << print() << L" Overflow. Blocking caller.";
+ execution_queue_.push(priority, function);
}
return std::move(future);
{
try
{
- priority_function func;
+ std::function<void ()> func;
execution_queue_.pop(func);
func();
}