2 Copyright 2005-2011 Intel Corporation. All Rights Reserved.
4 This file is part of Threading Building Blocks.
6 Threading Building Blocks is free software; you can redistribute it
7 and/or modify it under the terms of the GNU General Public License
8 version 2 as published by the Free Software Foundation.
10 Threading Building Blocks is distributed in the hope that it will be
11 useful, but WITHOUT ANY WARRANTY; without even the implied warranty
12 of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with Threading Building Blocks; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 As a special exception, you may use this file as part of a free software
20 library without restriction. Specifically, if other files instantiate
21 templates or use macros or inline functions from this file, or you compile
22 this file and link it with other files to produce an executable, this
23 file does not by itself cause the resulting executable to be covered by
24 the GNU General Public License. This exception does not however
25 invalidate any other reasons why the executable file might be covered by
26 the GNU General Public License.
29 #ifndef __TBB_concurrent_priority_queue_H
30 #define __TBB_concurrent_priority_queue_H
32 #if !TBB_PREVIEW_CONCURRENT_PRIORITY_QUEUE
33 #error Set TBB_PREVIEW_CONCURRENT_PRIORITY_QUEUE to include concurrent_priority_queue.h
37 #include "cache_aligned_allocator.h"
38 #include "tbb_exception.h"
39 #include "tbb_stddef.h"
40 #include "tbb_profiling.h"
41 #include "_aggregator_internal.h"
47 namespace interface5 {
49 using namespace tbb::internal;
//! Concurrent priority queue
/** Unbounded priority queue whose mutations are funneled through an
    aggregator so that one thread at a time applies a whole batch of
    pending operations. Ordering is by Compare (a max-queue with the
    default std::less<T>). */
template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> >
class concurrent_priority_queue {
    // NOTE(review): an access-specifier line (upstream: `public:`) and the
    // `typedef T value_type;` / `typedef T& reference;` lines are not
    // visible in this extraction, although value_type/reference are used
    // by the members below — verify against the full source.
    //! Element type in the queue.
    //! Const reference type
    typedef const T& const_reference;
    //! Integral type for representing size of the queue.
    typedef size_t size_type;
    //! Difference type for iterator
    typedef ptrdiff_t difference_type;
    //! Allocator type
    typedef A allocator_type;
//! Constructs a new concurrent_priority_queue with default capacity
/** @param a allocator for the underlying element storage. */
explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), data(a) {
    // Bind the aggregator to this instance's handler before any
    // operation can be submitted; all mutations funnel through it.
    my_aggregator.initialize_handler(my_functor_t(this));
//! Constructs a new concurrent_priority_queue with init_sz capacity
/** @param init_capacity storage to pre-reserve so early pushes avoid
        reallocation; @param a allocator for the element storage. */
explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) : mark(0), data(a) {
    data.reserve(init_capacity);
    my_aggregator.initialize_handler(my_functor_t(this));
//! [begin,end) constructor
/** Copies the range into the backing vector.
    NOTE(review): no `mark` initialization and no heapify() of the copied
    range is visible in this extraction — upstream TBB heapifies after
    the copy; verify against the full source. */
template<typename InputIterator>
concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) : data(begin, end, a)
{
    my_aggregator.initialize_handler(my_functor_t(this));
//! Copy constructor
/** State of this queue may not reflect results of pending
    operations on the copied queue. */
// Copying src.mark together with src's element range keeps the
// heap/unsorted partition consistent with the copied data.
explicit concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark), data(src.data.begin(), src.data.end(), src.data.get_allocator())
{
    my_aggregator.initialize_handler(my_functor_t(this));
//! Copy constructor with a caller-supplied allocator
/** Like the copy constructor above, but the new queue's storage uses
    allocator `a` instead of src's allocator. State of this queue may
    not reflect results of pending operations on the copied queue. */
concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark), data(src.data.begin(), src.data.end(), a)
{
    my_aggregator.initialize_handler(my_functor_t(this));
//! Assignment operator
/** State of this queue may not reflect results of pending
    operations on the copied queue. */
concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
    // Copy-and-swap: build a replacement vector from src and swap it in;
    // the old storage is released when the temporary dies.
    // NOTE(review): no self-assignment guard, no `mark = src.mark;`, and
    // no `return *this;` is visible in this extraction — upstream TBB has
    // all three; verify against the full source.
    std::vector<value_type, allocator_type>(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
119 //! Returns true if empty, false otherwise
120 /** Returned value may not reflect results of pending operations. */
121 bool empty() const { return data.empty(); }
123 //! Returns the current number of elements contained in the queue
124 /** Returned value may not reflect results of pending operations. */
125 size_type size() const { return data.size(); }
127 //! Returns the current capacity (i.e. allocated storage) of the queue
128 /** Returned value may not reflect results of pending operations. */
129 size_type capacity() const { return data.capacity(); }
131 //! Pushes elem onto the queue, increasing capacity of queue if necessary
132 void push(const_reference elem) {
133 cpq_operation op_data(elem, PUSH_OP);
134 my_aggregator.execute(&op_data);
135 if (op_data.status == FAILED) // exception thrown
136 throw_exception(eid_bad_alloc);
139 //! Gets a reference to and removes highest priority element
140 /** If a highest priority element was found, sets elem and returns true,
141 otherwise returns false. */
142 bool try_pop(reference elem) {
143 cpq_operation op_data(POP_OP);
144 op_data.elem = &elem;
145 my_aggregator.execute(&op_data);
146 return op_data.status==SUCCEEDED;
149 //! If current capacity is less than new_cap, increases capacity to new_cap
150 void reserve(size_type new_cap) {
151 cpq_operation op_data(RESERVE_OP);
152 op_data.sz = new_cap;
153 my_aggregator.execute(&op_data);
154 if (op_data.status == FAILED) // exception thrown
155 throw_exception(eid_bad_alloc);
158 //! Clear the queue; not thread-safe
159 /** Resets size, effectively emptying queue; does not free space.
160 May not clear elements added in pending operations. */
166 //! Shrink queue capacity to current contents; not thread-safe
167 void shrink_to_fit() {
168 std::vector<value_type, allocator_type>(data.begin(), data.end(), data.get_allocator()).swap(data);
171 //! Swap this queue with another; not thread-safe
172 void swap(concurrent_priority_queue& q) {
174 std::swap(mark, q.mark);
177 //! Return allocator object
178 allocator_type get_allocator() const { return data.get_allocator(); }
//! Tags for the operations the aggregator can batch on this queue.
enum operation_type {INVALID_OP, PUSH_OP, POP_OP, RESERVE_OP};
//! WAIT (0) means the submitting thread is still spinning on the op's status.
enum operation_status { WAIT=0, SUCCEEDED, FAILED };

//! Record describing one pending operation, linked through the aggregator.
class cpq_operation : public aggregated_operation<cpq_operation> {
    // NOTE(review): the member declarations (at least `type`, `sz`, `elem`,
    // and the inherited status/next fields) are not visible in this
    // extraction; the constructors below imply `type` and `elem`.
    cpq_operation(const_reference e, operation_type t) :
        // const_cast: `elem` doubles as an out-parameter for pops, so it is
        // stored non-const even when constructed from a const_reference.
        type(t), elem(const_cast<value_type*>(&e)) {}
    cpq_operation(operation_type t) : type(t) {}
// Handler functor passed to the aggregator: forwards a whole batch of
// pending operations back to the owning queue.
// NOTE(review): the enclosing `class my_functor_t {` line and its
// access specifiers are not visible in this extraction.
concurrent_priority_queue<T, Compare, A> *cpq; // non-owning back-pointer to the queue
my_functor_t(concurrent_priority_queue<T, Compare, A> *cpq_) : cpq(cpq_) {}
void operator()(cpq_operation* op_list) {
    // Delegate the batch; the aggregator guarantees only one thread
    // executes a handler at a time.
    cpq->handle_operations(op_list);

//! Aggregator that serializes all concurrent mutations of this queue
aggregator< my_functor_t, cpq_operation> my_aggregator;
//! Padding added to avoid false sharing
char padding1[NFS_MaxLineSize - sizeof(aggregator< my_functor_t, cpq_operation >)];
//! The point at which unsorted elements begin
// NOTE(review): the declarations of `mark` (a size_type index) and the
// `compare` (Compare) member are not visible in this extraction; the
// sizeof terms in padding2 below imply both exist here.
//! Padding added to avoid false sharing
char padding2[NFS_MaxLineSize - sizeof(size_type) - sizeof(Compare)];
//! Storage for the heap of elements in queue, plus unheapified elements
/** data has the following structure:

    [_|...|_|_|...|_| |...| ]
     0      ^        ^      ^
            mark     size   capacity

    Thus, data stores the binary heap starting at position 0 through
    mark-1 (it may be empty). Then there are 0 or more elements
    that have not yet been inserted into the heap, in positions
    mark through size-1. */
std::vector<value_type, allocator_type> data;
//! Apply one batch of queued operations on behalf of the aggregator.
/** Runs on exactly one thread at a time (the aggregator serializes
    handlers), so the body needs no locking.
    NOTE(review): several structural lines are not visible in this
    extraction — the `while (op_list)` / `while (pop_list)` loop headers,
    `tmp = op_list;` / `tmp = pop_list;`, the __TBB_TRY/__TBB_CATCH
    wrappers around push_back/reserve, `data.pop_back();`,
    `pop_list = tmp;`, a reheapify() call, and various closing braces.
    Comments below describe only the statements shown; verify structure
    against the full source. */
void handle_operations(cpq_operation *op_list) {
    cpq_operation *tmp, *pop_list=NULL;
    // Invariant on entry: everything in data is heapified (restored at the end).
    __TBB_ASSERT(mark == data.size(), NULL);
    // first pass processes all constant time operations: pushes,
    // tops, some pops. Also reserve.
    // ITT note: &(op_list->status) tag is used to cover accesses to op_list
    // node. This thread is going to handle the operation, and so will acquire it
    // and perform the associated operation w/o triggering a race condition; the
    // thread that created the operation is waiting on the status field, so when
    // this thread is done with the operation, it will perform a
    // store_with_release to give control back to the waiting thread in
    // aggregator::insert_operation.
    call_itt_notify(acquired, &(op_list->status));
    __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
    // Advance past the current node (referenced via `tmp`, whose assignment
    // is on a line not visible here) before the node is released.
    op_list = itt_hide_load_word(op_list->next);
    if (tmp->type == PUSH_OP) {
        // O(1) push: append to the unheapified tail; heapify() at the end
        // of the batch folds it into the heap.
        data.push_back(*(tmp->elem));
        itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
        // failure path (presumably the catch around push_back):
        itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
    else if (tmp->type == POP_OP) {
        if (mark < data.size() &&
            compare(data[0], data[data.size()-1])) {
            // there are newly pushed elems and the last one
            // is higher than top: serve the pop from the tail in O(1)
            *(tmp->elem) = data[data.size()-1]; // copy the data
            itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
            // (removal of the copied tail element is on a line not visible here)
            __TBB_ASSERT(mark<=data.size(), NULL);
        else { // no convenient item to pop; postpone
            // Thread the op onto pop_list for the second pass.
            itt_hide_store_word(tmp->next, pop_list);
        // only remaining possibility: a reserve request
        __TBB_ASSERT(tmp->type == RESERVE_OP, NULL);
        data.reserve(tmp->sz);
        itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
        // failure path (presumably the catch around reserve):
        itt_store_word_with_release(tmp->status, uintptr_t(FAILED));

    // second pass processes pop operations
    pop_list = itt_hide_load_word(pop_list->next);
    __TBB_ASSERT(tmp->type == POP_OP, NULL);
    // queue empty: the postponed pop fails
    itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
    __TBB_ASSERT(mark<=data.size(), NULL);
    if (mark < data.size() &&
        compare(data[0], data[data.size()-1])) {
        // there are newly pushed elems and the last one is
        // higher priority than the heap's top: serve from the tail
        *(tmp->elem) = data[data.size()-1]; // copy the data
        itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
    else { // extract top and push last element down heap
        *(tmp->elem) = data[0]; // copy the data
        itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
        // (the reheapify() that repairs the root is not visible here)
    // heapify any leftover pushed elements before doing the next
    // batch of operations
    if (mark<data.size()) heapify();
    __TBB_ASSERT(mark == data.size(), NULL);
//! Merge unsorted elements into heap
/** Sift each element in data[mark..size) up to its heap position,
    advancing mark as each one is placed.
    NOTE(review): the `void heapify() {` signature line, the
    `cur_pos = parent;` step, the `} while (cur_pos);` terminator, and
    the closing braces are not visible in this extraction — verify
    against the full source. */
for (; mark<data.size(); ++mark) {
    // for each unheapified element under size
    size_type cur_pos = mark;
    value_type to_place = data[mark];
    do { // push to_place up the heap
        // parent index in the implicit binary heap rooted at 0
        size_type parent = (cur_pos-1)>>1;
        // stop once the parent is not lower-priority than to_place
        if (!compare(data[parent], to_place)) break;
        // shift the parent down instead of swapping: one copy per level
        data[cur_pos] = data[parent];
    // drop to_place into the hole left by the sift-up
    data[cur_pos] = to_place;
//! Re-heapify after an extraction
/** Re-heapify by pushing last element down the heap from the root.
    NOTE(review): the `void reheapify() {` signature line, the
    `++target;` step, `cur_pos = target;`, a `data.pop_back();` before
    the final mark clamp, and the closing braces are not visible in
    this extraction — verify against the full source. */
size_type cur_pos=0, child=1;
// walk down the heapified prefix [0, mark)
while (child < mark) {
    size_type target = child;
    // pick the higher-priority of the two children (increment is on a
    // line not visible here)
    if (child+1 < mark && compare(data[child], data[child+1]))
    // target now has the higher priority child
    // stop once the tail element dominates both children
    if (compare(data[target], data[data.size()-1])) break;
    // pull the winning child up; one copy per level
    data[cur_pos] = data[target];
    child = (cur_pos<<1)+1;
// place the old tail element into the final hole
data[cur_pos] = data[data.size()-1];
// clamp mark after the tail removal so it never exceeds size()
if (mark > data.size()) mark = data.size();
358 } // namespace interface5
360 using interface5::concurrent_priority_queue;
364 #endif /* __TBB_concurrent_priority_queue_H */