#include <tbb/concurrent_queue.h>
-#include <boost/foreach.hpp>
#include <boost/thread/mutex.hpp>
#include "semaphore.h"
public:
typedef unsigned int size_type;
private:
- std::map<Prio, tbb::concurrent_queue<T>, std::greater<Prio>> queues_by_priority_;
- semaphore space_available_;
- semaphore elements_available_;
- mutable boost::mutex capacity_mutex_;
- size_type capacity_;
+ std::map<Prio, tbb::concurrent_queue<T>, std::greater<Prio>> queues_by_priority_;
+ size_type capacity_;
+ semaphore space_available_ { capacity_ };
+ semaphore elements_available_ { 0u };
+ mutable boost::mutex capacity_mutex_;
public:
/**
* Constructor.
*
* @param capacity The initial capacity of the queue.
- * @param priorities A forward iterable container with the priorities to
+ * @param priorities A forward iterable range with the priorities to
* support.
*/
template<class PrioList>
blocking_priority_queue(size_type capacity, const PrioList& priorities)
- : space_available_(capacity)
- , elements_available_(0u)
- , capacity_(capacity)
+ : capacity_(capacity)
{
- BOOST_FOREACH(Prio priority, priorities)
+ for (Prio priority : priorities)
{
queues_by_priority_.insert(std::make_pair(priority, tbb::concurrent_queue<T>()));
}
// The std::map is read-only from now on, so there *should* (it is
// unlikely but possible for a std::map implementor to choose to
- // rebalance the tree during read operations) be no race conditions
+ // rebalance the tree during read operations) be no race conditions
// regarding the map.
//
// This may be true for vc10 as well:
{
acquire_transaction transaction(elements_available_);
-
pop_acquired_any_priority(element, transaction);
}
acquire_transaction transaction(elements_available_, true);
- BOOST_FOREACH(auto& queue, queues_by_priority_)
+ for (auto& queue : queues_by_priority_)
{
if (queue.first < minimum_priority)
{
*/
void set_capacity(size_type capacity)
{
- boost::mutex::scoped_lock lock (capacity_mutex_);
+ boost::unique_lock<boost::mutex> lock(capacity_mutex_);
if (capacity_ < capacity)
{
*/
size_type capacity() const
{
- boost::mutex::scoped_lock lock (capacity_mutex_);
+ boost::unique_lock<boost::mutex> lock (capacity_mutex_);
return capacity_;
}
{
return elements_available_.permits();
}
+
+ /**
+ * @return the current available space in the queue (may have changed at
+ * the time of returning).
+ */
+ size_type space_available() const
+ {
+ return space_available_.permits();
+ }
private:
void push_acquired(Prio priority, const T& element, acquire_transaction& transaction)
{
void pop_acquired_any_priority(T& element, acquire_transaction& transaction)
{
- BOOST_FOREACH(auto& queue, queues_by_priority_)
+ for (auto& queue : queues_by_priority_)
{
if (queue.second.try_pop(element))
{