]> git.sesse.net Git - casparcg/blob - tbb/include/tbb/concurrent_priority_queue.h
8a9f252b660309e35c287203620a01749e15da15
[casparcg] / tbb / include / tbb / concurrent_priority_queue.h
1 /*
2     Copyright 2005-2011 Intel Corporation.  All Rights Reserved.
3
4     This file is part of Threading Building Blocks.
5
6     Threading Building Blocks is free software; you can redistribute it
7     and/or modify it under the terms of the GNU General Public License
8     version 2 as published by the Free Software Foundation.
9
10     Threading Building Blocks is distributed in the hope that it will be
11     useful, but WITHOUT ANY WARRANTY; without even the implied warranty
12     of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License
16     along with Threading Building Blocks; if not, write to the Free Software
17     Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
19     As a special exception, you may use this file as part of a free software
20     library without restriction.  Specifically, if other files instantiate
21     templates or use macros or inline functions from this file, or you compile
22     this file and link it with other files to produce an executable, this
23     file does not by itself cause the resulting executable to be covered by
24     the GNU General Public License.  This exception does not however
25     invalidate any other reasons why the executable file might be covered by
26     the GNU General Public License.
27 */
28
29 #ifndef __TBB_concurrent_priority_queue_H
30 #define __TBB_concurrent_priority_queue_H
31
32 #if !TBB_PREVIEW_CONCURRENT_PRIORITY_QUEUE
33 #error Set TBB_PREVIEW_CONCURRENT_PRIORITY_QUEUE to include concurrent_priority_queue.h
34 #endif
35
36 #include "atomic.h"
37 #include "cache_aligned_allocator.h"
38 #include "tbb_exception.h"
39 #include "tbb_stddef.h"
40 #include "tbb_profiling.h"
41 #include "_aggregator_internal.h"
42 #include <vector>
43 #include <iterator>
44 #include <functional>
45
46 namespace tbb {
47 namespace interface5 {
48
49 using namespace tbb::internal;
50
51 //! Concurrent priority queue
52 template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> >
53 class concurrent_priority_queue {
54  public:
55     //! Element type in the queue.
56     typedef T value_type;
57
58     //! Reference type
59     typedef T& reference;
60
61     //! Const reference type
62     typedef const T& const_reference;
63
64     //! Integral type for representing size of the queue.
65     typedef size_t size_type;
66
67     //! Difference type for iterator
68     typedef ptrdiff_t difference_type;
69
70     //! Allocator type
71     typedef A allocator_type;
72
73     //! Constructs a new concurrent_priority_queue with default capacity
74     explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), data(a) {
75         my_aggregator.initialize_handler(my_functor_t(this));
76     }
77
78     //! Constructs a new concurrent_priority_queue with init_sz capacity
79     explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) : mark(0), data(a) {
80         data.reserve(init_capacity);
81         my_aggregator.initialize_handler(my_functor_t(this));
82     }
83
84     //! [begin,end) constructor
85     template<typename InputIterator>
86     concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) : data(begin, end, a)
87     {
88         mark = 0;
89         my_aggregator.initialize_handler(my_functor_t(this));
90         heapify();
91     }
92
93     //! Copy constructor
94     /** State of this queue may not reflect results of pending
95         operations on the copied queue. */
96     explicit concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark), data(src.data.begin(), src.data.end(), src.data.get_allocator())
97     {
98         my_aggregator.initialize_handler(my_functor_t(this));
99         heapify();
100     }
101
102     concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark), data(src.data.begin(), src.data.end(), a)
103     {
104         my_aggregator.initialize_handler(my_functor_t(this));
105         heapify();
106     }
107
108     //! Assignment operator
109     /** State of this queue may not reflect results of pending
110         operations on the copied queue. */
111     concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
112         if (this != &src) {
113             std::vector<value_type, allocator_type>(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
114             mark = src.mark;
115         }
116         return *this;
117     }
118
119     //! Returns true if empty, false otherwise
120     /** Returned value may not reflect results of pending operations. */
121     bool empty() const { return data.empty(); }
122
123     //! Returns the current number of elements contained in the queue
124     /** Returned value may not reflect results of pending operations. */
125     size_type size() const { return data.size(); }
126
127     //! Returns the current capacity (i.e. allocated storage) of the queue
128     /** Returned value may not reflect results of pending operations. */
129     size_type capacity() const { return data.capacity(); }
130
131     //! Pushes elem onto the queue, increasing capacity of queue if necessary
132     void push(const_reference elem) {
133         cpq_operation op_data(elem, PUSH_OP);
134         my_aggregator.execute(&op_data);
135         if (op_data.status == FAILED) // exception thrown
136             throw_exception(eid_bad_alloc);
137     }
138
139     //! Gets a reference to and removes highest priority element
140     /** If a highest priority element was found, sets elem and returns true,
141         otherwise returns false. */
142     bool try_pop(reference elem) {
143         cpq_operation op_data(POP_OP);
144         op_data.elem = &elem;
145         my_aggregator.execute(&op_data);
146         return op_data.status==SUCCEEDED;
147     }
148
149     //! If current capacity is less than new_cap, increases capacity to new_cap
150     void reserve(size_type new_cap) {
151         cpq_operation op_data(RESERVE_OP);
152         op_data.sz = new_cap;
153         my_aggregator.execute(&op_data);
154         if (op_data.status == FAILED) // exception thrown
155             throw_exception(eid_bad_alloc);
156     }
157
158     //! Clear the queue; not thread-safe
159     /** Resets size, effectively emptying queue; does not free space.
160         May not clear elements added in pending operations. */
161     void clear() {
162         data.clear();
163         mark = 0;
164     }
165
166     //! Shrink queue capacity to current contents; not thread-safe
167     void shrink_to_fit() {
168         std::vector<value_type, allocator_type>(data.begin(), data.end(), data.get_allocator()).swap(data);
169     }
170
171     //! Swap this queue with another; not thread-safe
172     void swap(concurrent_priority_queue& q) {
173         data.swap(q.data);
174         std::swap(mark, q.mark);
175     }
176
177     //! Return allocator object
178     allocator_type get_allocator() const { return data.get_allocator(); }
179
180  private:
181     enum operation_type {INVALID_OP, PUSH_OP, POP_OP, RESERVE_OP};
182     enum operation_status { WAIT=0, SUCCEEDED, FAILED };
183
184     class cpq_operation : public aggregated_operation<cpq_operation> {
185      public:
186         operation_type type;
187         union {
188             value_type *elem;
189             size_type sz;
190         };
191         cpq_operation(const_reference e, operation_type t) :
192             type(t), elem(const_cast<value_type*>(&e)) {}
193         cpq_operation(operation_type t) : type(t) {}
194     };
195
196     class my_functor_t {
197         concurrent_priority_queue<T, Compare, A> *cpq;
198      public:
199         my_functor_t() {}
200         my_functor_t(concurrent_priority_queue<T, Compare, A> *cpq_) : cpq(cpq_) {}
201         void operator()(cpq_operation* op_list) {
202             cpq->handle_operations(op_list);
203         }
204     };
205
206     aggregator< my_functor_t, cpq_operation> my_aggregator;
207     //! Padding added to avoid false sharing
208     char padding1[NFS_MaxLineSize - sizeof(aggregator< my_functor_t, cpq_operation >)];
209     //! The point at which unsorted elements begin
210     size_type mark;
211     Compare compare;
212     //! Padding added to avoid false sharing
213     char padding2[NFS_MaxLineSize - sizeof(size_type) - sizeof(Compare)];
214     //! Storage for the heap of elements in queue, plus unheapified elements
215     /** data has the following structure:
216
217          binary unheapified
218           heap   elements
219         ____|_______|____
220         |       |       |
221         v       v       v
222         [_|...|_|_|...|_| |...| ]
223          0       ^       ^       ^
224                  |       |       |__capacity
225                  |       |__size
226                  |__mark
227                  
228
229         Thus, data stores the binary heap starting at position 0 through
230         mark-1 (it may be empty).  Then there are 0 or more elements 
231         that have not yet been inserted into the heap, in positions 
232         mark through size-1. */
233     std::vector<value_type, allocator_type> data;
234
235     void handle_operations(cpq_operation *op_list) {
236         cpq_operation *tmp, *pop_list=NULL;
237
238         __TBB_ASSERT(mark == data.size(), NULL);
239
240         // first pass processes all constant time operations: pushes,
241         // tops, some pops. Also reserve.
242         while (op_list) {
243             // ITT note: &(op_list->status) tag is used to cover accesses to op_list
244             // node. This thread is going to handle the operation, and so will acquire it
245             // and perform the associated operation w/o triggering a race condition; the
246             // thread that created the operation is waiting on the status field, so when
247             // this thread is done with the operation, it will perform a
248             // store_with_release to give control back to the waiting thread in
249             // aggregator::insert_operation.
250             call_itt_notify(acquired, &(op_list->status));
251             __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
252             tmp = op_list;
253             op_list = itt_hide_load_word(op_list->next);
254             if (tmp->type == PUSH_OP) {
255                 __TBB_TRY {
256                     data.push_back(*(tmp->elem));
257                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
258                 } __TBB_CATCH(...) {
259                     itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
260                 }
261             }
262             else if (tmp->type == POP_OP) {
263                 if (mark < data.size() &&
264                     compare(data[0], data[data.size()-1])) {
265                     // there are newly pushed elems and the last one
266                     // is higher than top
267                     *(tmp->elem) = data[data.size()-1]; // copy the data
268                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
269                     data.pop_back();
270                     __TBB_ASSERT(mark<=data.size(), NULL);
271                 }
272                 else { // no convenient item to pop; postpone
273                     itt_hide_store_word(tmp->next, pop_list);
274                     pop_list = tmp;
275                 }
276             }
277             else {
278                 __TBB_ASSERT(tmp->type == RESERVE_OP, NULL);
279                 __TBB_TRY {
280                     data.reserve(tmp->sz);
281                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
282                 } __TBB_CATCH(...) {
283                     itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
284                 }
285             }
286         }
287
288         // second pass processes pop operations
289         while (pop_list) {
290             tmp = pop_list;
291             pop_list = itt_hide_load_word(pop_list->next);
292             __TBB_ASSERT(tmp->type == POP_OP, NULL);
293             if (data.empty()) {
294                 itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
295             }
296             else {
297                 __TBB_ASSERT(mark<=data.size(), NULL);
298                 if (mark < data.size() &&
299                     compare(data[0], data[data.size()-1])) {
300                     // there are newly pushed elems and the last one is
301                     // higher than top
302                     *(tmp->elem) = data[data.size()-1]; // copy the data
303                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
304                     data.pop_back();
305                 }
306                 else { // extract top and push last element down heap
307                     *(tmp->elem) = data[0]; // copy the data
308                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
309                     reheap();
310                 }
311             }
312         }
313
314         // heapify any leftover pushed elements before doing the next
315         // batch of operations
316         if (mark<data.size()) heapify();
317         __TBB_ASSERT(mark == data.size(), NULL);
318     }
319
320     //! Merge unsorted elements into heap
321     void heapify() {
322         if (!mark) mark = 1;
323         for (; mark<data.size(); ++mark) {
324             // for each unheapified element under size
325             size_type cur_pos = mark;
326             value_type to_place = data[mark];
327             do { // push to_place up the heap
328                 size_type parent = (cur_pos-1)>>1;
329                 if (!compare(data[parent], to_place)) break;
330                 data[cur_pos] = data[parent];
331                 cur_pos = parent;
332             } while( cur_pos );
333             data[cur_pos] = to_place;
334         }
335     }
336
337     //! Re-heapify after an extraction
338     /** Re-heapify by pushing last element down the heap from the root. */
339     void reheap() {
340         size_type cur_pos=0, child=1;
341
342         while (child < mark) {
343             size_type target = child;
344             if (child+1 < mark && compare(data[child], data[child+1]))
345                 ++target;
346             // target now has the higher priority child
347             if (compare(data[target], data[data.size()-1])) break;
348             data[cur_pos] = data[target];
349             cur_pos = target;
350             child = (cur_pos<<1)+1;
351         }
352         data[cur_pos] = data[data.size()-1];
353         data.pop_back();
354         if (mark > data.size()) mark = data.size();
355     }
356 };
357
358 } // namespace interface5
359
360 using interface5::concurrent_priority_queue;
361
362 } // namespace tbb
363
364 #endif /* __TBB_concurrent_priority_queue_H */