]> git.sesse.net Git - casparcg/blob - tbb/include/tbb/internal/_aggregator_impl.h
2.0. Updated tbb library.
[casparcg] / tbb / include / tbb / internal / _aggregator_impl.h
1 /*
2     Copyright 2005-2011 Intel Corporation.  All Rights Reserved.
3
4     This file is part of Threading Building Blocks.
5
6     Threading Building Blocks is free software; you can redistribute it
7     and/or modify it under the terms of the GNU General Public License
8     version 2 as published by the Free Software Foundation.
9
10     Threading Building Blocks is distributed in the hope that it will be
11     useful, but WITHOUT ANY WARRANTY; without even the implied warranty
12     of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13     GNU General Public License for more details.
14
15     You should have received a copy of the GNU General Public License
16     along with Threading Building Blocks; if not, write to the Free Software
17     Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18
19     As a special exception, you may use this file as part of a free software
20     library without restriction.  Specifically, if other files instantiate
21     templates or use macros or inline functions from this file, or you compile
22     this file and link it with other files to produce an executable, this
23     file does not by itself cause the resulting executable to be covered by
24     the GNU General Public License.  This exception does not however
25     invalidate any other reasons why the executable file might be covered by
26     the GNU General Public License.
27 */
28
29 #ifndef __TBB_aggregator_internal_H
30 #define __TBB_aggregator_internal_H
31
32 #include "../atomic.h"
33 #include "../tbb_profiling.h"
34
35 namespace tbb {
36 namespace interface6 {
37 namespace internal {
38
39 using namespace tbb::internal;
40
41 //! aggregated_operation base class
42 template <typename Derived>
43 class aggregated_operation {
44  public:
45     uintptr_t status;
46     Derived *next;
47     aggregated_operation() : status(0), next(NULL) {}
48 };
49
50 //! Aggregator base class
51 /** An aggregator for collecting operations coming from multiple sources and executing
52     them serially on a single thread.  operation_type must be derived from
53     aggregated_operation. The parameter handler_type is a functor that will be passed the
54     list of operations and is expected to handle each operation appropriately, setting the
55     status of each operation to non-zero.*/
56  template < typename handler_type, typename operation_type >
57 class aggregator {
58  public:
59     aggregator() : handler_busy(false) { pending_operations = NULL; }
60     explicit aggregator(handler_type h) : handler_busy(false), handle_operations(h) {
61         pending_operations = NULL; 
62     }
63
64     void initialize_handler(handler_type h) { handle_operations = h; }
65
66     //! Place operation in list
67     /** Place operation in list and either handle list or wait for operation to
68         complete.  */
69     void execute(operation_type *op) {
70         operation_type *res;
71
72         // ITT note: &(op->status) tag is used to cover accesses to this op node. This
73         // thread has created the operation, and now releases it so that the handler
74         // thread may handle the associated operation w/o triggering a race condition;
75         // thus this tag will be acquired just before the operation is handled in the
76         // handle_operations functor.
77         call_itt_notify(releasing, &(op->status));
78         // insert the operation in the queue
79         do {
80             // ITT may flag the following line as a race; it is a false positive:
81             // This is an atomic read; we don't provide itt_hide_load_word for atomics
82             op->next = res = pending_operations; // NOT A RACE 
83         } while (pending_operations.compare_and_swap(op, res) != res);
84         if (!res) { // first in the list; handle the operations
85             // ITT note: &pending_operations tag covers access to the handler_busy flag,
86             // which this waiting handler thread will try to set before entering
87             // handle_operations.
88             call_itt_notify(acquired, &pending_operations);
89             start_handle_operations();
90             __TBB_ASSERT(op->status, NULL);
91         }
92         else { // not first; wait for op to be ready
93             call_itt_notify(prepare, &(op->status));
94             spin_wait_while_eq(op->status, uintptr_t(0));
95             itt_load_word_with_acquire(op->status);
96         }
97     }
98
99  private:
100     //! An atomically updated list (aka mailbox) of pending operations
101     atomic<operation_type *> pending_operations;
102     //! Controls thread access to handle_operations
103     uintptr_t handler_busy;
104     handler_type handle_operations;
105
106     //! Trigger the handling of operations when the handler is free
107     void start_handle_operations() {
108         operation_type *op_list;
109
110         // ITT note: &handler_busy tag covers access to pending_operations as it is passed
111         // between active and waiting handlers.  Below, the waiting handler waits until
112         // the active handler releases, and the waiting handler acquires &handler_busy as
113         // it becomes the active_handler. The release point is at the end of this
114         // function, when all operations in pending_operations have been handled by the
115         // owner of this aggregator.
116         call_itt_notify(prepare, &handler_busy);
117         // get the handler_busy:
118         // only one thread can possibly spin here at a time
119         spin_wait_until_eq(handler_busy, uintptr_t(0));
120         call_itt_notify(acquired, &handler_busy);
121         // acquire fence not necessary here due to causality rule and surrounding atomics
122         __TBB_store_with_release(handler_busy, uintptr_t(1));
123
124         // ITT note: &pending_operations tag covers access to the handler_busy flag
125         // itself. Capturing the state of the pending_operations signifies that
126         // handler_busy has been set and a new active handler will now process that list's
127         // operations.
128         call_itt_notify(releasing, &pending_operations);
129         // grab pending_operations
130         op_list = pending_operations.fetch_and_store(NULL);
131
132         // handle all the operations
133         handle_operations(op_list);
134
135         // release the handler
136         itt_store_word_with_release(handler_busy, uintptr_t(0));
137     }
138 };
139
140 // the most-compatible friend declaration (vs, gcc, icc) is
141 //    template<class U, class V> friend class aggregating_functor;
142 template<typename aggregating_class, typename operation_list>
143 class aggregating_functor {
144     aggregating_class *fi;
145 public:
146     aggregating_functor() {}
147     aggregating_functor(aggregating_class *fi_) : fi(fi_) {}
148     void operator()(operation_list* op_list) { fi->handle_operations(op_list); }
149 };
150
151 } // namespace internal
152 } // namespace interface6
153
154 namespace internal {
155     using interface6::internal::aggregated_operation;
156     using interface6::internal::aggregator;
157     using interface6::internal::aggregating_functor;
158 } // namespace internal
159
160 } // namespace tbb
161
162 #endif