/*
    Copyright 2005-2011 Intel Corporation.  All Rights Reserved.

    This file is part of Threading Building Blocks.

    Threading Building Blocks is free software; you can redistribute it
    and/or modify it under the terms of the GNU General Public License
    version 2 as published by the Free Software Foundation.

    Threading Building Blocks is distributed in the hope that it will be
    useful, but WITHOUT ANY WARRANTY; without even the implied warranty
    of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with Threading Building Blocks; if not, write to the Free Software
    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA

    As a special exception, you may use this file as part of a free software
    library without restriction.  Specifically, if other files instantiate
    templates or use macros or inline functions from this file, or you compile
    this file and link it with other files to produce an executable, this
    file does not by itself cause the resulting executable to be covered by
    the GNU General Public License.  This exception does not however
    invalidate any other reasons why the executable file might be covered by
    the GNU General Public License.
*/
29 #ifndef __TBB_concurrent_priority_queue_H
30 #define __TBB_concurrent_priority_queue_H
32 #if !TBB_PREVIEW_CONCURRENT_PRIORITY_QUEUE
33 #error Set TBB_PREVIEW_CONCURRENT_PRIORITY_QUEUE to include concurrent_priority_queue.h
37 #include "cache_aligned_allocator.h"
38 #include "tbb_exception.h"
39 #include "tbb_stddef.h"
40 #include "tbb_profiling.h"
41 #include "internal/_aggregator_impl.h"
47 namespace interface5 {
49 using namespace tbb::internal;
51 //! Concurrent priority queue
52 template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> >
53 class concurrent_priority_queue {
55 //! Element type in the queue.
61 //! Const reference type
62 typedef const T& const_reference;
64 //! Integral type for representing size of the queue.
65 typedef size_t size_type;
67 //! Difference type for iterator
68 typedef ptrdiff_t difference_type;
71 typedef A allocator_type;
73 //! Constructs a new concurrent_priority_queue with default capacity
74 explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), my_size(0), data(a)
76 my_aggregator.initialize_handler(my_functor_t(this));
79 //! Constructs a new concurrent_priority_queue with init_sz capacity
80 explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) :
81 mark(0), my_size(0), data(a)
83 data.reserve(init_capacity);
84 my_aggregator.initialize_handler(my_functor_t(this));
87 //! [begin,end) constructor
88 template<typename InputIterator>
89 concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
93 my_aggregator.initialize_handler(my_functor_t(this));
95 my_size = data.size();
99 /** This operation is unsafe if there are pending concurrent operations on the src queue. */
100 explicit concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark),
101 my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator())
103 my_aggregator.initialize_handler(my_functor_t(this));
107 //! Copy constructor with specific allocator
108 /** This operation is unsafe if there are pending concurrent operations on the src queue. */
109 concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark),
110 my_size(src.my_size), data(src.data.begin(), src.data.end(), a)
112 my_aggregator.initialize_handler(my_functor_t(this));
116 //! Assignment operator
117 /** This operation is unsafe if there are pending concurrent operations on the src queue. */
118 concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
120 std::vector<value_type, allocator_type>(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
122 my_size = src.my_size;
127 //! Returns true if empty, false otherwise
128 /** Returned value may not reflect results of pending operations.
129 This operation reads shared data and will trigger a race condition. */
130 bool empty() const { return size()==0; }
132 //! Returns the current number of elements contained in the queue
133 /** Returned value may not reflect results of pending operations.
134 This operation reads shared data and will trigger a race condition. */
135 size_type size() const { return __TBB_load_with_acquire(my_size); }
137 //! Pushes elem onto the queue, increasing capacity of queue if necessary
138 /** This operation can be safely used concurrently with other push, try_pop or reserve operations. */
139 void push(const_reference elem) {
140 cpq_operation op_data(elem, PUSH_OP);
141 my_aggregator.execute(&op_data);
142 if (op_data.status == FAILED) // exception thrown
143 throw_exception(eid_bad_alloc);
146 //! Gets a reference to and removes highest priority element
147 /** If a highest priority element was found, sets elem and returns true,
148 otherwise returns false.
149 This operation can be safely used concurrently with other push, try_pop or reserve operations. */
150 bool try_pop(reference elem) {
151 cpq_operation op_data(POP_OP);
152 op_data.elem = &elem;
153 my_aggregator.execute(&op_data);
154 return op_data.status==SUCCEEDED;
157 //! Clear the queue; not thread-safe
158 /** This operation is unsafe if there are pending concurrent operations on the queue.
159 Resets size, effectively emptying queue; does not free space.
160 May not clear elements added in pending operations. */
167 //! Swap this queue with another; not thread-safe
168 /** This operation is unsafe if there are pending concurrent operations on the queue. */
169 void swap(concurrent_priority_queue& q) {
171 std::swap(mark, q.mark);
172 std::swap(my_size, q.my_size);
175 //! Return allocator object
176 allocator_type get_allocator() const { return data.get_allocator(); }
179 enum operation_type {INVALID_OP, PUSH_OP, POP_OP};
180 enum operation_status { WAIT=0, SUCCEEDED, FAILED };
182 class cpq_operation : public aggregated_operation<cpq_operation> {
189 cpq_operation(const_reference e, operation_type t) :
190 type(t), elem(const_cast<value_type*>(&e)) {}
191 cpq_operation(operation_type t) : type(t) {}
195 concurrent_priority_queue<T, Compare, A> *cpq;
198 my_functor_t(concurrent_priority_queue<T, Compare, A> *cpq_) : cpq(cpq_) {}
199 void operator()(cpq_operation* op_list) {
200 cpq->handle_operations(op_list);
204 aggregator< my_functor_t, cpq_operation> my_aggregator;
205 //! Padding added to avoid false sharing
206 char padding1[NFS_MaxLineSize - sizeof(aggregator< my_functor_t, cpq_operation >)];
207 //! The point at which unsorted elements begin
209 __TBB_atomic size_type my_size;
211 //! Padding added to avoid false sharing
212 char padding2[NFS_MaxLineSize - (2*sizeof(size_type)) - sizeof(Compare)];
213 //! Storage for the heap of elements in queue, plus unheapified elements
214 /** data has the following structure:
221 [_|...|_|_|...|_| |...| ]
227 Thus, data stores the binary heap starting at position 0 through
228 mark-1 (it may be empty). Then there are 0 or more elements
229 that have not yet been inserted into the heap, in positions
230 mark through my_size-1. */
231 std::vector<value_type, allocator_type> data;
233 void handle_operations(cpq_operation *op_list) {
234 cpq_operation *tmp, *pop_list=NULL;
236 __TBB_ASSERT(mark == data.size(), NULL);
238 // first pass processes all constant time operations: pushes,
239 // tops, some pops. Also reserve.
241 // ITT note: &(op_list->status) tag is used to cover accesses to op_list
242 // node. This thread is going to handle the operation, and so will acquire it
243 // and perform the associated operation w/o triggering a race condition; the
244 // thread that created the operation is waiting on the status field, so when
245 // this thread is done with the operation, it will perform a
246 // store_with_release to give control back to the waiting thread in
247 // aggregator::insert_operation.
248 call_itt_notify(acquired, &(op_list->status));
249 __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
251 op_list = itt_hide_load_word(op_list->next);
252 if (tmp->type == PUSH_OP) {
254 data.push_back(*(tmp->elem));
255 __TBB_store_with_release(my_size, my_size+1);
256 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
258 itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
261 else { // tmp->type == POP_OP
262 __TBB_ASSERT(tmp->type == POP_OP, NULL);
263 if (mark < data.size() &&
264 compare(data[0], data[data.size()-1])) {
265 // there are newly pushed elems and the last one
266 // is higher than top
267 *(tmp->elem) = data[data.size()-1]; // copy the data
268 __TBB_store_with_release(my_size, my_size-1);
269 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
271 __TBB_ASSERT(mark<=data.size(), NULL);
273 else { // no convenient item to pop; postpone
274 itt_hide_store_word(tmp->next, pop_list);
280 // second pass processes pop operations
283 pop_list = itt_hide_load_word(pop_list->next);
284 __TBB_ASSERT(tmp->type == POP_OP, NULL);
286 itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
289 __TBB_ASSERT(mark<=data.size(), NULL);
290 if (mark < data.size() &&
291 compare(data[0], data[data.size()-1])) {
292 // there are newly pushed elems and the last one is
294 *(tmp->elem) = data[data.size()-1]; // copy the data
295 __TBB_store_with_release(my_size, my_size-1);
296 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
299 else { // extract top and push last element down heap
300 *(tmp->elem) = data[0]; // copy the data
301 __TBB_store_with_release(my_size, my_size-1);
302 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
308 // heapify any leftover pushed elements before doing the next
309 // batch of operations
310 if (mark<data.size()) heapify();
311 __TBB_ASSERT(mark == data.size(), NULL);
314 //! Merge unsorted elements into heap
316 if (!mark && data.size()>0) mark = 1;
317 for (; mark<data.size(); ++mark) {
318 // for each unheapified element under size
319 size_type cur_pos = mark;
320 value_type to_place = data[mark];
321 do { // push to_place up the heap
322 size_type parent = (cur_pos-1)>>1;
323 if (!compare(data[parent], to_place)) break;
324 data[cur_pos] = data[parent];
327 data[cur_pos] = to_place;
331 //! Re-heapify after an extraction
332 /** Re-heapify by pushing last element down the heap from the root. */
334 size_type cur_pos=0, child=1;
336 while (child < mark) {
337 size_type target = child;
338 if (child+1 < mark && compare(data[child], data[child+1]))
340 // target now has the higher priority child
341 if (compare(data[target], data[data.size()-1])) break;
342 data[cur_pos] = data[target];
344 child = (cur_pos<<1)+1;
346 data[cur_pos] = data[data.size()-1];
348 if (mark > data.size()) mark = data.size();
352 } // namespace interface5
354 using interface5::concurrent_priority_queue;
358 #endif /* __TBB_concurrent_priority_queue_H */