]> git.sesse.net Git - casparcg/blob - dependencies64/tbb/include/tbb/aggregator.h
8a28ed0d743ecfb6bda22365f8b57303e68ac0e0
[casparcg] / dependencies64 / tbb / include / tbb / aggregator.h
1 /*
2     Copyright 2005-2014 Intel Corporation.  All Rights Reserved.
3
4     This file is part of Threading Building Blocks. Threading Building Blocks is free software;
5     you can redistribute it and/or modify it under the terms of the GNU General Public License
6     version 2  as  published  by  the  Free Software Foundation.  Threading Building Blocks is
7     distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
8     implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
9     See  the GNU General Public License for more details.   You should have received a copy of
10     the  GNU General Public License along with Threading Building Blocks; if not, write to the
11     Free Software Foundation, Inc.,  51 Franklin St,  Fifth Floor,  Boston,  MA 02110-1301 USA
12
13     As a special exception,  you may use this file  as part of a free software library without
14     restriction.  Specifically,  if other files instantiate templates  or use macros or inline
15     functions from this file, or you compile this file and link it with other files to produce
16     an executable,  this file does not by itself cause the resulting executable to be covered
17     by the GNU General Public License. This exception does not however invalidate any other
18     reasons why the executable file might be covered by the GNU General Public License.
19 */
20
21 #ifndef __TBB__aggregator_H
22 #define __TBB__aggregator_H
23
24 #if !TBB_PREVIEW_AGGREGATOR
25 #error Set TBB_PREVIEW_AGGREGATOR before including aggregator.h
26 #endif
27
28 #include "atomic.h"
29 #include "tbb_profiling.h"
30
31 namespace tbb {
32 namespace interface6 {
33
34 using namespace tbb::internal;
35
36 class aggregator_operation {
37     template<typename handler_type> friend class aggregator_ext;
38     uintptr_t status;
39     aggregator_operation* my_next;
40 public:
41     enum aggregator_operation_status { agg_waiting=0, agg_finished };
42     aggregator_operation() : status(agg_waiting), my_next(NULL) {}
43     /// Call start before handling this operation
44     void start() { call_itt_notify(acquired, &status); }
45     /// Call finish when done handling this operation
46     /** The operation will be released to its originating thread, and possibly deleted. */
47     void finish() { itt_store_word_with_release(status, uintptr_t(agg_finished)); }
48     aggregator_operation* next() { return itt_hide_load_word(my_next);}
49     void set_next(aggregator_operation* n) { itt_hide_store_word(my_next, n); }
50 };
51
52 namespace internal {
53
54 class basic_operation_base : public aggregator_operation {
55     friend class basic_handler;
56     virtual void apply_body() = 0;
57 public:
58     basic_operation_base() : aggregator_operation() {}
59     virtual ~basic_operation_base() {}
60 };
61
62 template<typename Body>
63 class basic_operation : public basic_operation_base, no_assign {
64     const Body& my_body;
65     /*override*/ void apply_body() { my_body(); }
66 public:
67     basic_operation(const Body& b) : basic_operation_base(), my_body(b) {}
68 };
69
70 class basic_handler {
71 public:
72     basic_handler() {}
73     void operator()(aggregator_operation* op_list) const { 
74         while (op_list) {
75             // ITT note: &(op_list->status) tag is used to cover accesses to the operation data.
76             // The executing thread "acquires" the tag (see start()) and then performs
77             // the associated operation w/o triggering a race condition diagnostics.
78             // A thread that created the operation is waiting for its status (see execute_impl()),
79             // so when this thread is done with the operation, it will "release" the tag 
80             // and update the status (see finish()) to give control back to the waiting thread.
81             basic_operation_base& request = static_cast<basic_operation_base&>(*op_list);
82             // IMPORTANT: need to advance op_list to op_list->next() before calling request.finish()
83             op_list = op_list->next();
84             request.start();
85             request.apply_body();
86             request.finish();
87         }
88     }
89 };
90
91 } // namespace internal
92
93 //! Aggregator base class and expert interface
94 /** An aggregator for collecting operations coming from multiple sources and executing
95     them serially on a single thread. */
96 template <typename handler_type>
97 class aggregator_ext : tbb::internal::no_copy {
98 public:
99     aggregator_ext(const handler_type& h) : handler_busy(0), handle_operations(h) { mailbox = NULL; }
100
101     //! EXPERT INTERFACE: Enter a user-made operation into the aggregator's mailbox.
102     /** Details of user-made operations must be handled by user-provided handler */
103     void process(aggregator_operation *op) { execute_impl(*op); }
104
105  protected:
106     /** Place operation in mailbox, then either handle mailbox or wait for the operation 
107         to be completed by a different thread. */
108     void execute_impl(aggregator_operation& op) {
109         aggregator_operation* res;
110
111         // ITT note: &(op.status) tag is used to cover accesses to this operation. This
112         // thread has created the operation, and now releases it so that the handler
113         // thread may handle the associated operation w/o triggering a race condition;
114         // thus this tag will be acquired just before the operation is handled in the
115         // handle_operations functor.
116         call_itt_notify(releasing, &(op.status));
117         // insert the operation in the queue
118         do {
119             // ITT may flag the following line as a race; it is a false positive:
120             // This is an atomic read; we don't provide itt_hide_load_word for atomics
121             op.my_next = res = mailbox; // NOT A RACE 
122         } while (mailbox.compare_and_swap(&op, res) != res);
123         if (!res) { // first in the list; handle the operations
124             // ITT note: &mailbox tag covers access to the handler_busy flag, which this
125             // waiting handler thread will try to set before entering handle_operations.
126             call_itt_notify(acquired, &mailbox);
127             start_handle_operations();
128             __TBB_ASSERT(op.status, NULL);
129         }
130         else { // not first; wait for op to be ready
131             call_itt_notify(prepare, &(op.status));
132             spin_wait_while_eq(op.status, uintptr_t(aggregator_operation::agg_waiting));
133             itt_load_word_with_acquire(op.status);
134         }
135     }
136
137
138  private:
139     //! An atomically updated list (aka mailbox) of aggregator_operations
140     atomic<aggregator_operation *> mailbox;
141
142     //! Controls thread access to handle_operations
143     /** Behaves as boolean flag where 0=false, 1=true */
144     uintptr_t handler_busy;
145
146     handler_type handle_operations;
147
148     //! Trigger the handling of operations when the handler is free
149     void start_handle_operations() {
150         aggregator_operation *pending_operations;
151
152         // ITT note: &handler_busy tag covers access to mailbox as it is passed
153         // between active and waiting handlers.  Below, the waiting handler waits until
154         // the active handler releases, and the waiting handler acquires &handler_busy as
155         // it becomes the active_handler. The release point is at the end of this
156         // function, when all operations in mailbox have been handled by the
157         // owner of this aggregator.
158         call_itt_notify(prepare, &handler_busy);
159         // get handler_busy: only one thread can possibly spin here at a time
160         spin_wait_until_eq(handler_busy, uintptr_t(0));
161         call_itt_notify(acquired, &handler_busy);
162         // acquire fence not necessary here due to causality rule and surrounding atomics
163         __TBB_store_with_release(handler_busy, uintptr_t(1));
164
165         // ITT note: &mailbox tag covers access to the handler_busy flag itself. 
166         // Capturing the state of the mailbox signifies that handler_busy has been 
167         // set and a new active handler will now process that list's operations.
168         call_itt_notify(releasing, &mailbox);
169         // grab pending_operations
170         pending_operations = mailbox.fetch_and_store(NULL);
171
172         // handle all the operations
173         handle_operations(pending_operations);
174
175         // release the handler
176         itt_store_word_with_release(handler_busy, uintptr_t(0));
177     }
178 };
179
180 //! Basic aggregator interface
181 class aggregator : private aggregator_ext<internal::basic_handler> {
182 public:
183     aggregator() : aggregator_ext<internal::basic_handler>(internal::basic_handler()) {}
184     //! BASIC INTERFACE: Enter a function for exclusive execution by the aggregator.
185     /** The calling thread stores the function object in a basic_operation and
186         places the operation in the aggregator's mailbox */
187     template<typename Body>
188     void execute(const Body& b) {
189         internal::basic_operation<Body> op(b);
190         this->execute_impl(op);
191     }
192 };
193
194 } // namespace interface6
195
196 using interface6::aggregator;
197 using interface6::aggregator_ext;
198 using interface6::aggregator_operation;
199
200 } // namespace tbb
201
202 #endif  // __TBB__aggregator_H