2 Copyright 2005-2011 Intel Corporation. All Rights Reserved.
4 This file is part of Threading Building Blocks.
6 Threading Building Blocks is free software; you can redistribute it
7 and/or modify it under the terms of the GNU General Public License
8 version 2 as published by the Free Software Foundation.
10 Threading Building Blocks is distributed in the hope that it will be
11 useful, but WITHOUT ANY WARRANTY; without even the implied warranty
12 of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with Threading Building Blocks; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 As a special exception, you may use this file as part of a free software
20 library without restriction. Specifically, if other files instantiate
21 templates or use macros or inline functions from this file, or you compile
22 this file and link it with other files to produce an executable, this
23 file does not by itself cause the resulting executable to be covered by
24 the GNU General Public License. This exception does not however
25 invalidate any other reasons why the executable file might be covered by
26 the GNU General Public License.
29 #ifndef __TBB_aggregator_internal_H
30 #define __TBB_aggregator_internal_H
32 #include "../atomic.h"
33 #include "../tbb_profiling.h"
36 namespace interface6 {
39 using namespace tbb::internal;
41 //! aggregated_operation base class
42 template <typename Derived>
43 class aggregated_operation {
47 aggregated_operation() : status(0), next(NULL) {}
50 //! Aggregator base class
51 /** An aggregator for collecting operations coming from multiple sources and executing
52 them serially on a single thread. operation_type must be derived from
53 aggregated_operation. The parameter handler_type is a functor that will be passed the
54 list of operations and is expected to handle each operation appropriately, setting the
55 status of each operation to non-zero.*/
56 template < typename handler_type, typename operation_type >
59 aggregator() : handler_busy(false) { pending_operations = NULL; }
60 explicit aggregator(handler_type h) : handler_busy(false), handle_operations(h) {
61 pending_operations = NULL;
64 void initialize_handler(handler_type h) { handle_operations = h; }
66 //! Place operation in list
67 /** Place operation in list and either handle list or wait for operation to
69 void execute(operation_type *op) {
72 // ITT note: &(op->status) tag is used to cover accesses to this op node. This
73 // thread has created the operation, and now releases it so that the handler
74 // thread may handle the associated operation w/o triggering a race condition;
75 // thus this tag will be acquired just before the operation is handled in the
76 // handle_operations functor.
77 call_itt_notify(releasing, &(op->status));
78 // insert the operation in the queue
80 // ITT may flag the following line as a race; it is a false positive:
81 // This is an atomic read; we don't provide itt_hide_load_word for atomics
82 op->next = res = pending_operations; // NOT A RACE
83 } while (pending_operations.compare_and_swap(op, res) != res);
84 if (!res) { // first in the list; handle the operations
85 // ITT note: &pending_operations tag covers access to the handler_busy flag,
86 // which this waiting handler thread will try to set before entering
88 call_itt_notify(acquired, &pending_operations);
89 start_handle_operations();
90 __TBB_ASSERT(op->status, NULL);
92 else { // not first; wait for op to be ready
93 call_itt_notify(prepare, &(op->status));
94 spin_wait_while_eq(op->status, uintptr_t(0));
95 itt_load_word_with_acquire(op->status);
100 //! An atomically updated list (aka mailbox) of pending operations
101 atomic<operation_type *> pending_operations;
102 //! Controls thread access to handle_operations
103 uintptr_t handler_busy;
104 handler_type handle_operations;
106 //! Trigger the handling of operations when the handler is free
107 void start_handle_operations() {
108 operation_type *op_list;
110 // ITT note: &handler_busy tag covers access to pending_operations as it is passed
111 // between active and waiting handlers. Below, the waiting handler waits until
112 // the active handler releases, and the waiting handler acquires &handler_busy as
113 // it becomes the active_handler. The release point is at the end of this
114 // function, when all operations in pending_operations have been handled by the
115 // owner of this aggregator.
116 call_itt_notify(prepare, &handler_busy);
117 // get the handler_busy:
118 // only one thread can possibly spin here at a time
119 spin_wait_until_eq(handler_busy, uintptr_t(0));
120 call_itt_notify(acquired, &handler_busy);
121 // acquire fence not necessary here due to causality rule and surrounding atomics
122 __TBB_store_with_release(handler_busy, uintptr_t(1));
124 // ITT note: &pending_operations tag covers access to the handler_busy flag
125 // itself. Capturing the state of the pending_operations signifies that
126 // handler_busy has been set and a new active handler will now process that list's
128 call_itt_notify(releasing, &pending_operations);
129 // grab pending_operations
130 op_list = pending_operations.fetch_and_store(NULL);
132 // handle all the operations
133 handle_operations(op_list);
135 // release the handler
136 itt_store_word_with_release(handler_busy, uintptr_t(0));
140 // the most-compatible friend declaration (vs, gcc, icc) is
141 // template<class U, class V> friend class aggregating_functor;
142 template<typename aggregating_class, typename operation_list>
143 class aggregating_functor {
144 aggregating_class *fi;
146 aggregating_functor() {}
147 aggregating_functor(aggregating_class *fi_) : fi(fi_) {}
148 void operator()(operation_list* op_list) { fi->handle_operations(op_list); }
151 } // namespace internal
152 } // namespace interface6
155 using interface6::internal::aggregated_operation;
156 using interface6::internal::aggregator;
157 using interface6::internal::aggregating_functor;
158 } // namespace internal