]> git.sesse.net Git - casparcg/blob - dependencies/tbb/include/tbb/concurrent_priority_queue.h
Subtree merge of old SVN "dependencies" folder into the "master" git branch. You...
[casparcg] / dependencies / tbb / include / tbb / concurrent_priority_queue.h
1 /*
2     Copyright 2005-2011 Intel Corporation.  All Rights Reserved.
3
4     This file is part of Threading Building Blocks.
5
6     Threading Building Blocks is free software; you can redistribute it
7     and/or modify it under the terms of the GNU General Public License
8     version 2 as published by the Free Software Foundation.
9
10     Threading Building Blocks is distributed in the hope that it will be
11     useful, but WITHOUT ANY WARRANTY; without even the implied warranty
12     of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License
16     along with Threading Building Blocks; if not, write to the Free Software
17     Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
19     As a special exception, you may use this file as part of a free software
20     library without restriction.  Specifically, if other files instantiate
21     templates or use macros or inline functions from this file, or you compile
22     this file and link it with other files to produce an executable, this
23     file does not by itself cause the resulting executable to be covered by
24     the GNU General Public License.  This exception does not however
25     invalidate any other reasons why the executable file might be covered by
26     the GNU General Public License.
27 */
28
29 #ifndef __TBB_concurrent_priority_queue_H
30 #define __TBB_concurrent_priority_queue_H
31
32 #if !TBB_PREVIEW_CONCURRENT_PRIORITY_QUEUE
33 #error Set TBB_PREVIEW_CONCURRENT_PRIORITY_QUEUE to include concurrent_priority_queue.h
34 #endif
35
36 #include "atomic.h"
37 #include "cache_aligned_allocator.h"
38 #include "tbb_exception.h"
39 #include "tbb_stddef.h"
40 #include "tbb_profiling.h"
41 #include "internal/_aggregator_impl.h"
42 #include <vector>
43 #include <iterator>
44 #include <functional>
45
46 namespace tbb {
47 namespace interface5 {
48
49 using namespace tbb::internal;
50
51 //! Concurrent priority queue
52 template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> >
53 class concurrent_priority_queue {
54  public:
55     //! Element type in the queue.
56     typedef T value_type;
57
58     //! Reference type
59     typedef T& reference;
60
61     //! Const reference type
62     typedef const T& const_reference;
63
64     //! Integral type for representing size of the queue.
65     typedef size_t size_type;
66
67     //! Difference type for iterator
68     typedef ptrdiff_t difference_type;
69
70     //! Allocator type
71     typedef A allocator_type;
72
73     //! Constructs a new concurrent_priority_queue with default capacity
74     explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), my_size(0), data(a)
75     {
76         my_aggregator.initialize_handler(my_functor_t(this));
77     }
78
79     //! Constructs a new concurrent_priority_queue with init_sz capacity
80     explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) :
81         mark(0), my_size(0), data(a)
82     {
83         data.reserve(init_capacity);
84         my_aggregator.initialize_handler(my_functor_t(this));
85     }
86
87     //! [begin,end) constructor
88     template<typename InputIterator>
89     concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
90         data(begin, end, a)
91     {
92         mark = 0;
93         my_aggregator.initialize_handler(my_functor_t(this));
94         heapify();
95         my_size = data.size();
96     }
97
98     //! Copy constructor
99     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
100     explicit concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark),
101         my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator())
102     {
103         my_aggregator.initialize_handler(my_functor_t(this));
104         heapify();
105     }
106
107     //! Copy constructor with specific allocator
108     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
109     concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark),
110         my_size(src.my_size), data(src.data.begin(), src.data.end(), a)
111     {
112         my_aggregator.initialize_handler(my_functor_t(this));
113         heapify();
114     }
115
116     //! Assignment operator
117     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
118     concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
119         if (this != &src) {
120             std::vector<value_type, allocator_type>(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
121             mark = src.mark;
122             my_size = src.my_size;
123         }
124         return *this;
125     }
126
127     //! Returns true if empty, false otherwise
128     /** Returned value may not reflect results of pending operations.
129         This operation reads shared data and will trigger a race condition. */
130     bool empty() const { return size()==0; }
131
132     //! Returns the current number of elements contained in the queue
133     /** Returned value may not reflect results of pending operations.
134         This operation reads shared data and will trigger a race condition. */
135     size_type size() const { return __TBB_load_with_acquire(my_size); }
136
137     //! Pushes elem onto the queue, increasing capacity of queue if necessary
138     /** This operation can be safely used concurrently with other push, try_pop or reserve operations. */
139     void push(const_reference elem) {
140         cpq_operation op_data(elem, PUSH_OP);
141         my_aggregator.execute(&op_data);
142         if (op_data.status == FAILED) // exception thrown
143             throw_exception(eid_bad_alloc);
144     }
145
146     //! Gets a reference to and removes highest priority element
147     /** If a highest priority element was found, sets elem and returns true,
148         otherwise returns false.
149         This operation can be safely used concurrently with other push, try_pop or reserve operations. */
150     bool try_pop(reference elem) {
151         cpq_operation op_data(POP_OP);
152         op_data.elem = &elem;
153         my_aggregator.execute(&op_data);
154         return op_data.status==SUCCEEDED;
155     }
156
157     //! Clear the queue; not thread-safe
158     /** This operation is unsafe if there are pending concurrent operations on the queue.
159         Resets size, effectively emptying queue; does not free space.
160         May not clear elements added in pending operations. */
161     void clear() {
162         data.clear();
163         mark = 0;
164         my_size = 0;
165     }
166
167     //! Swap this queue with another; not thread-safe
168     /** This operation is unsafe if there are pending concurrent operations on the queue. */
169     void swap(concurrent_priority_queue& q) {
170         data.swap(q.data);
171         std::swap(mark, q.mark);
172         std::swap(my_size, q.my_size);
173     }
174
175     //! Return allocator object
176     allocator_type get_allocator() const { return data.get_allocator(); }
177
178  private:
179     enum operation_type {INVALID_OP, PUSH_OP, POP_OP};
180     enum operation_status { WAIT=0, SUCCEEDED, FAILED };
181
182     class cpq_operation : public aggregated_operation<cpq_operation> {
183      public:
184         operation_type type;
185         union {
186             value_type *elem;
187             size_type sz;
188         };
189         cpq_operation(const_reference e, operation_type t) :
190             type(t), elem(const_cast<value_type*>(&e)) {}
191         cpq_operation(operation_type t) : type(t) {}
192     };
193
194     class my_functor_t {
195         concurrent_priority_queue<T, Compare, A> *cpq;
196      public:
197         my_functor_t() {}
198         my_functor_t(concurrent_priority_queue<T, Compare, A> *cpq_) : cpq(cpq_) {}
199         void operator()(cpq_operation* op_list) {
200             cpq->handle_operations(op_list);
201         }
202     };
203
204     aggregator< my_functor_t, cpq_operation> my_aggregator;
205     //! Padding added to avoid false sharing
206     char padding1[NFS_MaxLineSize - sizeof(aggregator< my_functor_t, cpq_operation >)];
207     //! The point at which unsorted elements begin
208     size_type mark;
209     __TBB_atomic size_type my_size;
210     Compare compare;
211     //! Padding added to avoid false sharing
212     char padding2[NFS_MaxLineSize - (2*sizeof(size_type)) - sizeof(Compare)];
213     //! Storage for the heap of elements in queue, plus unheapified elements
214     /** data has the following structure:
215
216          binary unheapified
217           heap   elements
218         ____|_______|____
219         |       |       |
220         v       v       v
221         [_|...|_|_|...|_| |...| ]
222          0       ^       ^       ^
223                  |       |       |__capacity
224                  |       |__my_size
225                  |__mark
226
227         Thus, data stores the binary heap starting at position 0 through
228         mark-1 (it may be empty).  Then there are 0 or more elements
229         that have not yet been inserted into the heap, in positions
230         mark through my_size-1. */
231     std::vector<value_type, allocator_type> data;
232
233     void handle_operations(cpq_operation *op_list) {
234         cpq_operation *tmp, *pop_list=NULL;
235
236         __TBB_ASSERT(mark == data.size(), NULL);
237
238         // first pass processes all constant time operations: pushes,
239         // tops, some pops. Also reserve.
240         while (op_list) {
241             // ITT note: &(op_list->status) tag is used to cover accesses to op_list
242             // node. This thread is going to handle the operation, and so will acquire it
243             // and perform the associated operation w/o triggering a race condition; the
244             // thread that created the operation is waiting on the status field, so when
245             // this thread is done with the operation, it will perform a
246             // store_with_release to give control back to the waiting thread in
247             // aggregator::insert_operation.
248             call_itt_notify(acquired, &(op_list->status));
249             __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
250             tmp = op_list;
251             op_list = itt_hide_load_word(op_list->next);
252             if (tmp->type == PUSH_OP) {
253                 __TBB_TRY {
254                     data.push_back(*(tmp->elem));
255                     __TBB_store_with_release(my_size, my_size+1);
256                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
257                 } __TBB_CATCH(...) {
258                     itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
259                 }
260             }
261             else { // tmp->type == POP_OP
262                 __TBB_ASSERT(tmp->type == POP_OP, NULL);
263                 if (mark < data.size() &&
264                     compare(data[0], data[data.size()-1])) {
265                     // there are newly pushed elems and the last one
266                     // is higher than top
267                     *(tmp->elem) = data[data.size()-1]; // copy the data
268                     __TBB_store_with_release(my_size, my_size-1);
269                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
270                     data.pop_back();
271                     __TBB_ASSERT(mark<=data.size(), NULL);
272                 }
273                 else { // no convenient item to pop; postpone
274                     itt_hide_store_word(tmp->next, pop_list);
275                     pop_list = tmp;
276                 }
277             }
278         }
279
280         // second pass processes pop operations
281         while (pop_list) {
282             tmp = pop_list;
283             pop_list = itt_hide_load_word(pop_list->next);
284             __TBB_ASSERT(tmp->type == POP_OP, NULL);
285             if (data.empty()) {
286                 itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
287             }
288             else {
289                 __TBB_ASSERT(mark<=data.size(), NULL);
290                 if (mark < data.size() &&
291                     compare(data[0], data[data.size()-1])) {
292                     // there are newly pushed elems and the last one is
293                     // higher than top
294                     *(tmp->elem) = data[data.size()-1]; // copy the data
295                     __TBB_store_with_release(my_size, my_size-1);
296                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
297                     data.pop_back();
298                 }
299                 else { // extract top and push last element down heap
300                     *(tmp->elem) = data[0]; // copy the data
301                     __TBB_store_with_release(my_size, my_size-1);
302                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
303                     reheap();
304                 }
305             }
306         }
307
308         // heapify any leftover pushed elements before doing the next
309         // batch of operations
310         if (mark<data.size()) heapify();
311         __TBB_ASSERT(mark == data.size(), NULL);
312     }
313
314     //! Merge unsorted elements into heap
315     void heapify() {
316         if (!mark && data.size()>0) mark = 1;
317         for (; mark<data.size(); ++mark) {
318             // for each unheapified element under size
319             size_type cur_pos = mark;
320             value_type to_place = data[mark];
321             do { // push to_place up the heap
322                 size_type parent = (cur_pos-1)>>1;
323                 if (!compare(data[parent], to_place)) break;
324                 data[cur_pos] = data[parent];
325                 cur_pos = parent;
326             } while( cur_pos );
327             data[cur_pos] = to_place;
328         }
329     }
330
331     //! Re-heapify after an extraction
332     /** Re-heapify by pushing last element down the heap from the root. */
333     void reheap() {
334         size_type cur_pos=0, child=1;
335
336         while (child < mark) {
337             size_type target = child;
338             if (child+1 < mark && compare(data[child], data[child+1]))
339                 ++target;
340             // target now has the higher priority child
341             if (compare(data[target], data[data.size()-1])) break;
342             data[cur_pos] = data[target];
343             cur_pos = target;
344             child = (cur_pos<<1)+1;
345         }
346         data[cur_pos] = data[data.size()-1];
347         data.pop_back();
348         if (mark > data.size()) mark = data.size();
349     }
350 };
351
352 } // namespace interface5
353
354 using interface5::concurrent_priority_queue;
355
356 } // namespace tbb
357
358 #endif /* __TBB_concurrent_priority_queue_H */