]> git.sesse.net Git - casparcg/blobdiff - tbb/include/tbb/concurrent_priority_queue.h
2.0. Updated tbb library.
[casparcg] / tbb / include / tbb / concurrent_priority_queue.h
index 8a9f252b660309e35c287203620a01749e15da15..fe1839a779a9403321334c35fae5e3d8317cc3a0 100644 (file)
@@ -38,7 +38,7 @@
 #include "tbb_exception.h"
 #include "tbb_stddef.h"
 #include "tbb_profiling.h"
-#include "_aggregator_internal.h"
+#include "internal/_aggregator_impl.h"
 #include <vector>
 #include <iterator>
 #include <functional>
@@ -71,64 +71,71 @@ class concurrent_priority_queue {
     typedef A allocator_type;
 
     //! Constructs a new concurrent_priority_queue with default capacity
-    explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), data(a) {
+    explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), my_size(0), data(a)
+    {
         my_aggregator.initialize_handler(my_functor_t(this));
     }
 
     //! Constructs a new concurrent_priority_queue with init_sz capacity
-    explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) : mark(0), data(a) {
+    explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) :
+        mark(0), my_size(0), data(a)
+    {
         data.reserve(init_capacity);
         my_aggregator.initialize_handler(my_functor_t(this));
     }
 
     //! [begin,end) constructor
     template<typename InputIterator>
-    concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) : data(begin, end, a)
+    concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
+        data(begin, end, a)
     {
         mark = 0;
         my_aggregator.initialize_handler(my_functor_t(this));
         heapify();
+        my_size = data.size();
     }
 
     //! Copy constructor
-    /** State of this queue may not reflect results of pending
-       operations on the copied queue. */
-    explicit concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark), data(src.data.begin(), src.data.end(), src.data.get_allocator())
+    /** This operation is unsafe if there are pending concurrent operations on the src queue. */
+    explicit concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark),
+        my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator())
     {
         my_aggregator.initialize_handler(my_functor_t(this));
         heapify();
     }
 
-    concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark), data(src.data.begin(), src.data.end(), a)
+    //! Copy constructor with specific allocator
+    /** This operation is unsafe if there are pending concurrent operations on the src queue. */
+    concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark),
+        my_size(src.my_size), data(src.data.begin(), src.data.end(), a)
     {
         my_aggregator.initialize_handler(my_functor_t(this));
         heapify();
     }
 
     //! Assignment operator
-    /** State of this queue may not reflect results of pending
-       operations on the copied queue. */
+    /** This operation is unsafe if there are pending concurrent operations on the src queue. */
     concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
         if (this != &src) {
             std::vector<value_type, allocator_type>(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
             mark = src.mark;
+            my_size = src.my_size;
         }
         return *this;
     }
 
     //! Returns true if empty, false otherwise
-    /** Returned value may not reflect results of pending operations. */
-    bool empty() const { return data.empty(); }
+    /** Returned value may not reflect results of pending operations.
+        This operation reads shared data and will trigger a race condition. */
+    bool empty() const { return size()==0; }
 
     //! Returns the current number of elements contained in the queue
-    /** Returned value may not reflect results of pending operations. */
-    size_type size() const { return data.size(); }
-
-    //! Returns the current capacity (i.e. allocated storage) of the queue
-    /** Returned value may not reflect results of pending operations. */
-    size_type capacity() const { return data.capacity(); }
+    /** Returned value may not reflect results of pending operations.
+        This operation reads shared data and will trigger a race condition. */
+    size_type size() const { return __TBB_load_with_acquire(my_size); }
 
     //! Pushes elem onto the queue, increasing capacity of queue if necessary
+    /** This operation can be safely used concurrently with other push, try_pop or reserve operations. */
     void push(const_reference elem) {
         cpq_operation op_data(elem, PUSH_OP);
         my_aggregator.execute(&op_data);
@@ -138,7 +145,8 @@ class concurrent_priority_queue {
 
     //! Gets a reference to and removes highest priority element
     /** If a highest priority element was found, sets elem and returns true,
-        otherwise returns false. */
+        otherwise returns false.
+        This operation can be safely used concurrently with other push, try_pop or reserve operations. */
     bool try_pop(reference elem) {
         cpq_operation op_data(POP_OP);
         op_data.elem = &elem;
@@ -146,39 +154,29 @@ class concurrent_priority_queue {
         return op_data.status==SUCCEEDED;
     }
 
-    //! If current capacity is less than new_cap, increases capacity to new_cap
-    void reserve(size_type new_cap) {
-        cpq_operation op_data(RESERVE_OP);
-        op_data.sz = new_cap;
-        my_aggregator.execute(&op_data);
-        if (op_data.status == FAILED) // exception thrown
-            throw_exception(eid_bad_alloc);
-    }
-
     //! Clear the queue; not thread-safe
-    /** Resets size, effectively emptying queue; does not free space.
+    /** This operation is unsafe if there are pending concurrent operations on the queue.
+        Resets size, effectively emptying queue; does not free space.
         May not clear elements added in pending operations. */
     void clear() {
         data.clear();
         mark = 0;
-    }
-
-    //! Shrink queue capacity to current contents; not thread-safe
-    void shrink_to_fit() {
-        std::vector<value_type, allocator_type>(data.begin(), data.end(), data.get_allocator()).swap(data);
+        my_size = 0;
     }
 
     //! Swap this queue with another; not thread-safe
+    /** This operation is unsafe if there are pending concurrent operations on the queue. */
     void swap(concurrent_priority_queue& q) {
         data.swap(q.data);
         std::swap(mark, q.mark);
+        std::swap(my_size, q.my_size);
     }
 
     //! Return allocator object
     allocator_type get_allocator() const { return data.get_allocator(); }
 
  private:
-    enum operation_type {INVALID_OP, PUSH_OP, POP_OP, RESERVE_OP};
+    enum operation_type {INVALID_OP, PUSH_OP, POP_OP};
     enum operation_status { WAIT=0, SUCCEEDED, FAILED };
 
     class cpq_operation : public aggregated_operation<cpq_operation> {
@@ -208,9 +206,10 @@ class concurrent_priority_queue {
     char padding1[NFS_MaxLineSize - sizeof(aggregator< my_functor_t, cpq_operation >)];
     //! The point at which unsorted elements begin
     size_type mark;
+    __TBB_atomic size_type my_size;
     Compare compare;
     //! Padding added to avoid false sharing
-    char padding2[NFS_MaxLineSize - sizeof(size_type) - sizeof(Compare)];
+    char padding2[NFS_MaxLineSize - (2*sizeof(size_type)) - sizeof(Compare)];
     //! Storage for the heap of elements in queue, plus unheapified elements
     /** data has the following structure:
 
@@ -222,14 +221,13 @@ class concurrent_priority_queue {
         [_|...|_|_|...|_| |...| ]
          0       ^       ^       ^
                  |       |       |__capacity
-                 |       |__size
+                 |       |__my_size
                  |__mark
-                 
 
         Thus, data stores the binary heap starting at position 0 through
-        mark-1 (it may be empty).  Then there are 0 or more elements 
-        that have not yet been inserted into the heap, in positions 
-        mark through size-1. */
+        mark-1 (it may be empty).  Then there are 0 or more elements
+        that have not yet been inserted into the heap, in positions
+        mark through my_size-1. */
     std::vector<value_type, allocator_type> data;
 
     void handle_operations(cpq_operation *op_list) {
@@ -254,17 +252,20 @@ class concurrent_priority_queue {
             if (tmp->type == PUSH_OP) {
                 __TBB_TRY {
                     data.push_back(*(tmp->elem));
+                    __TBB_store_with_release(my_size, my_size+1);
                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
                 } __TBB_CATCH(...) {
                     itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
                 }
             }
-            else if (tmp->type == POP_OP) {
+            else { // tmp->type == POP_OP
+                __TBB_ASSERT(tmp->type == POP_OP, NULL);
                 if (mark < data.size() &&
                     compare(data[0], data[data.size()-1])) {
                     // there are newly pushed elems and the last one
                     // is higher than top
                     *(tmp->elem) = data[data.size()-1]; // copy the data
+                    __TBB_store_with_release(my_size, my_size-1);
                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
                     data.pop_back();
                     __TBB_ASSERT(mark<=data.size(), NULL);
@@ -274,15 +275,6 @@ class concurrent_priority_queue {
                     pop_list = tmp;
                 }
             }
-            else {
-                __TBB_ASSERT(tmp->type == RESERVE_OP, NULL);
-                __TBB_TRY {
-                    data.reserve(tmp->sz);
-                    itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
-                } __TBB_CATCH(...) {
-                    itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
-                }
-            }
         }
 
         // second pass processes pop operations
@@ -300,11 +292,13 @@ class concurrent_priority_queue {
                     // there are newly pushed elems and the last one is
                     // higher than top
                     *(tmp->elem) = data[data.size()-1]; // copy the data
+                    __TBB_store_with_release(my_size, my_size-1);
                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
                     data.pop_back();
                 }
                 else { // extract top and push last element down heap
                     *(tmp->elem) = data[0]; // copy the data
+                    __TBB_store_with_release(my_size, my_size-1);
                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
                     reheap();
                 }
@@ -319,7 +313,7 @@ class concurrent_priority_queue {
 
     //! Merge unsorted elements into heap
     void heapify() {
-        if (!mark) mark = 1;
+        if (!mark && data.size()>0) mark = 1;
         for (; mark<data.size(); ++mark) {
             // for each unheapified element under size
             size_type cur_pos = mark;