#include "tbb_exception.h"
#include "tbb_stddef.h"
#include "tbb_profiling.h"
-#include "_aggregator_internal.h"
+#include "internal/_aggregator_impl.h"
#include <vector>
#include <iterator>
#include <functional>
typedef A allocator_type;
//! Constructs a new concurrent_priority_queue with default capacity
- explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), data(a) {
+ explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), my_size(0), data(a)
+ {
my_aggregator.initialize_handler(my_functor_t(this));
}
//! Constructs a new concurrent_priority_queue with init_sz capacity
- explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) : mark(0), data(a) {
+ explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) :
+ mark(0), my_size(0), data(a)
+ {
data.reserve(init_capacity);
my_aggregator.initialize_handler(my_functor_t(this));
}
//! [begin,end) constructor
template<typename InputIterator>
- concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) : data(begin, end, a)
+ concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
+ data(begin, end, a)
{
mark = 0;
my_aggregator.initialize_handler(my_functor_t(this));
heapify();
+ my_size = data.size();
}
//! Copy constructor
- /** State of this queue may not reflect results of pending
- operations on the copied queue. */
- explicit concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark), data(src.data.begin(), src.data.end(), src.data.get_allocator())
+ /** This operation is unsafe if there are pending concurrent operations on the src queue. */
+ explicit concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark),
+ my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator())
{
my_aggregator.initialize_handler(my_functor_t(this));
heapify();
}
- concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark), data(src.data.begin(), src.data.end(), a)
+ //! Copy constructor with specific allocator
+ /** This operation is unsafe if there are pending concurrent operations on the src queue. */
+ concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark),
+ my_size(src.my_size), data(src.data.begin(), src.data.end(), a)
{
my_aggregator.initialize_handler(my_functor_t(this));
heapify();
}
//! Assignment operator
- /** State of this queue may not reflect results of pending
- operations on the copied queue. */
+ /** This operation is unsafe if there are pending concurrent operations on the src queue. */
concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
if (this != &src) {
std::vector<value_type, allocator_type>(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
mark = src.mark;
+ my_size = src.my_size;
}
return *this;
}
//! Returns true if empty, false otherwise
- /** Returned value may not reflect results of pending operations. */
- bool empty() const { return data.empty(); }
+ /** Returned value may not reflect results of pending operations.
+ This operation reads shared data and will trigger a race condition. */
+ bool empty() const { return size()==0; }
//! Returns the current number of elements contained in the queue
- /** Returned value may not reflect results of pending operations. */
- size_type size() const { return data.size(); }
-
- //! Returns the current capacity (i.e. allocated storage) of the queue
- /** Returned value may not reflect results of pending operations. */
- size_type capacity() const { return data.capacity(); }
+ /** Returned value may not reflect results of pending operations.
+ This operation reads shared data and will trigger a race condition. */
+ size_type size() const { return __TBB_load_with_acquire(my_size); }
//! Pushes elem onto the queue, increasing capacity of queue if necessary
+ /** This operation can be safely used concurrently with other push, try_pop or reserve operations. */
void push(const_reference elem) {
cpq_operation op_data(elem, PUSH_OP);
my_aggregator.execute(&op_data);
//! Gets a reference to and removes highest priority element
/** If a highest priority element was found, sets elem and returns true,
- otherwise returns false. */
+ otherwise returns false.
+ This operation can be safely used concurrently with other push, try_pop or reserve operations. */
bool try_pop(reference elem) {
cpq_operation op_data(POP_OP);
op_data.elem = &elem;
return op_data.status==SUCCEEDED;
}
- //! If current capacity is less than new_cap, increases capacity to new_cap
- void reserve(size_type new_cap) {
- cpq_operation op_data(RESERVE_OP);
- op_data.sz = new_cap;
- my_aggregator.execute(&op_data);
- if (op_data.status == FAILED) // exception thrown
- throw_exception(eid_bad_alloc);
- }
-
//! Clear the queue; not thread-safe
- /** Resets size, effectively emptying queue; does not free space.
+ /** This operation is unsafe if there are pending concurrent operations on the queue.
+ Resets size, effectively emptying queue; does not free space.
May not clear elements added in pending operations. */
void clear() {
data.clear();
mark = 0;
- }
-
- //! Shrink queue capacity to current contents; not thread-safe
- void shrink_to_fit() {
- std::vector<value_type, allocator_type>(data.begin(), data.end(), data.get_allocator()).swap(data);
+ my_size = 0;
}
//! Swap this queue with another; not thread-safe
+ /** This operation is unsafe if there are pending concurrent operations on the queue. */
void swap(concurrent_priority_queue& q) {
data.swap(q.data);
std::swap(mark, q.mark);
+ std::swap(my_size, q.my_size);
}
//! Return allocator object
allocator_type get_allocator() const { return data.get_allocator(); }
private:
- enum operation_type {INVALID_OP, PUSH_OP, POP_OP, RESERVE_OP};
+ enum operation_type {INVALID_OP, PUSH_OP, POP_OP};
enum operation_status { WAIT=0, SUCCEEDED, FAILED };
class cpq_operation : public aggregated_operation<cpq_operation> {
char padding1[NFS_MaxLineSize - sizeof(aggregator< my_functor_t, cpq_operation >)];
//! The point at which unsorted elements begin
size_type mark;
+ __TBB_atomic size_type my_size;
Compare compare;
//! Padding added to avoid false sharing
- char padding2[NFS_MaxLineSize - sizeof(size_type) - sizeof(Compare)];
+ char padding2[NFS_MaxLineSize - (2*sizeof(size_type)) - sizeof(Compare)];
//! Storage for the heap of elements in queue, plus unheapified elements
/** data has the following structure:
[_|...|_|_|...|_| |...| ]
0 ^ ^ ^
| | |__capacity
- | |__size
+ | |__my_size
|__mark
-
Thus, data stores the binary heap starting at position 0 through
- mark-1 (it may be empty). Then there are 0 or more elements
- that have not yet been inserted into the heap, in positions
- mark through size-1. */
+ mark-1 (it may be empty). Then there are 0 or more elements
+ that have not yet been inserted into the heap, in positions
+ mark through my_size-1. */
std::vector<value_type, allocator_type> data;
void handle_operations(cpq_operation *op_list) {
if (tmp->type == PUSH_OP) {
__TBB_TRY {
data.push_back(*(tmp->elem));
+ __TBB_store_with_release(my_size, my_size+1);
itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
} __TBB_CATCH(...) {
itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
}
}
- else if (tmp->type == POP_OP) {
+ else { // tmp->type == POP_OP
+ __TBB_ASSERT(tmp->type == POP_OP, NULL);
if (mark < data.size() &&
compare(data[0], data[data.size()-1])) {
// there are newly pushed elems and the last one
// is higher than top
*(tmp->elem) = data[data.size()-1]; // copy the data
+ __TBB_store_with_release(my_size, my_size-1);
itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
data.pop_back();
__TBB_ASSERT(mark<=data.size(), NULL);
pop_list = tmp;
}
}
- else {
- __TBB_ASSERT(tmp->type == RESERVE_OP, NULL);
- __TBB_TRY {
- data.reserve(tmp->sz);
- itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
- } __TBB_CATCH(...) {
- itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
- }
- }
}
// second pass processes pop operations
// there are newly pushed elems and the last one is
// higher than top
*(tmp->elem) = data[data.size()-1]; // copy the data
+ __TBB_store_with_release(my_size, my_size-1);
itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
data.pop_back();
}
else { // extract top and push last element down heap
*(tmp->elem) = data[0]; // copy the data
+ __TBB_store_with_release(my_size, my_size-1);
itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
reheap();
}
//! Merge unsorted elements into heap
void heapify() {
- if (!mark) mark = 1;
+ if (!mark && data.size()>0) mark = 1;
for (; mark<data.size(); ++mark) {
// for each unheapified element under size
size_type cur_pos = mark;