2 Copyright 2005-2011 Intel Corporation. All Rights Reserved.
4 This file is part of Threading Building Blocks.
6 Threading Building Blocks is free software; you can redistribute it
7 and/or modify it under the terms of the GNU General Public License
8 version 2 as published by the Free Software Foundation.
10 Threading Building Blocks is distributed in the hope that it will be
11 useful, but WITHOUT ANY WARRANTY; without even the implied warranty
12 of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with Threading Building Blocks; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 As a special exception, you may use this file as part of a free software
20 library without restriction. Specifically, if other files instantiate
21 templates or use macros or inline functions from this file, or you compile
22 this file and link it with other files to produce an executable, this
23 file does not by itself cause the resulting executable to be covered by
24 the GNU General Public License. This exception does not however
25 invalidate any other reasons why the executable file might be covered by
26 the GNU General Public License.
29 #ifndef __TBB__graph_join_internal_H
30 #define __TBB__graph_join_internal_H
// Tag type used by the tag-matching ports and front-end below.
// NO_TAG (tag_value(-1)) is the sentinel meaning "no tag selected".
34 typedef size_t tag_value;
35 static const tag_value NO_TAG = tag_value(-1);
// Abstract interface through which the input ports call back into the join node front-end (FE).
// It also carries the root task pointer and the tag currently selected by the FE.
37 struct forwarding_base {
// Stores the root task and starts with no tag selected (current_tag == NO_TAG).
38 forwarding_base(task *rt) : my_root_task(rt), current_tag(NO_TAG) {}
39 virtual ~forwarding_base() {}
// Port-count notifications; every front-end must provide both.
40 virtual void decrement_port_count() = 0;
41 virtual void increment_port_count() = 0;
// Default implementation does nothing; the tag_matching front-end provides its own version.
42 virtual void increment_tag_count(tag_value /*t*/) {}
43 // moved here so input ports can queue tasks
45 tag_value current_tag; // so ports can refer to FE's desired items
// Passes `port` to input N-1 through its set_join_node_pointer(), then recurses into
// join_helper<N-1> so that every lower-indexed input receives the same pointer.
51 template< typename TupleType, typename PortType >
52 static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
53 std::get<N-1>( my_input ).set_join_node_pointer(port);
54 join_helper<N-1>::set_join_node_pointer( my_input, port );
// Calls consume() on input N-1, then on all lower-indexed inputs via join_helper<N-1>.
56 template< typename TupleType >
57 static inline void consume_reservations( TupleType &my_input ) {
58 std::get<N-1>( my_input ).consume();
59 join_helper<N-1>::consume_reservations( my_input );
// Calls release() on input N-1 only; it does not recurse to the other inputs.
62 template< typename TupleType >
63 static inline void release_my_reservation( TupleType &my_input ) {
64 std::get<N-1>( my_input ).release();
// Releases the reservations of all N inputs: the lower-indexed inputs are handled first
// (recursive call), and input N-1 is released last.
67 template <typename TupleType>
68 static inline void release_reservations( TupleType &my_input) {
69 join_helper<N-1>::release_reservations(my_input);
70 release_my_reservation(my_input);
// Tries to reserve one item from every input, writing element i of `out` from input i.
// Returns false immediately when input N-1 cannot reserve.
// NOTE(review): the tail of this function (return statements) is not visible in this listing.
73 template< typename InputTuple, typename OutputTuple >
74 static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
75 if ( !std::get<N-1>( my_input ).reserve( std::get<N-1>( out ) ) ) return false;
// A lower-indexed input failed: give back the reservation taken on input N-1 above.
76 if ( !join_helper<N-1>::reserve( my_input, out ) ) {
77 release_my_reservation( my_input );
// Calls get_item() on input N-1 and then on every lower-indexed input.
// The recursive call is on the left of &&, so the remaining inputs are always queried even
// when input N-1 failed; the result is true only if all inputs succeeded.
83 template<typename InputTuple, typename OutputTuple>
84 static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
85 bool res = std::get<N-1>(my_input).get_item(std::get<N-1>(out) ); // may fail
86 return join_helper<N-1>::get_my_item(my_input, out) && res; // do get on other inputs before returning
// Entry point for fetching one item from every input; simply forwards to get_my_item().
89 template<typename InputTuple, typename OutputTuple>
90 static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
91 return get_my_item(my_input, out);
// Calls reset_port() on every input: lower-indexed inputs first (recursive call), input N-1 last.
94 template<typename InputTuple>
95 static inline void reset_my_port(InputTuple &my_input) {
96 join_helper<N-1>::reset_my_port(my_input);
97 std::get<N-1>(my_input).reset_port();
// Entry point for resetting all inputs; simply forwards to reset_my_port().
100 template<typename InputTuple>
101 static inline void reset_ports(InputTuple& my_input) {
102 reset_my_port(my_input);
// Installs the tag functors on the inputs, one tuple slot per input.
// For input N-1: the functor pointer from slot N-1 becomes the input's original functor, a
// clone of it becomes the working functor, and the slot is then cleared to NULL so the
// pointer is held by the port only. Recurses for the lower-indexed inputs.
105 template<typename InputTuple, typename TagFuncTuple>
106 static inline void set_tag_func(InputTuple &my_input, TagFuncTuple &my_tag_funcs) {
107 std::get<N-1>(my_input).set_my_original_tag_func(std::get<N-1>(my_tag_funcs));
108 std::get<N-1>(my_input).set_my_tag_func(std::get<N-1>(my_input).my_original_func()->clone());
109 std::get<N-1>(my_tag_funcs) = NULL;
110 join_helper<N-1>::set_tag_func(my_input, my_tag_funcs);
// Copies tag functors from `other_inputs` to `my_inputs`, input by input.
// When the other input N-1 has an original functor, this side receives two separate clones
// of it (working functor and original functor). Recurses for the lower-indexed inputs.
113 template< typename TagFuncTuple1, typename TagFuncTuple2>
114 static inline void copy_tag_functors(TagFuncTuple1 &my_inputs, TagFuncTuple2 &other_inputs) {
115 if(std::get<N-1>(other_inputs).my_original_func()) {
116 std::get<N-1>(my_inputs).set_my_tag_func(std::get<N-1>(other_inputs).my_original_func()->clone());
117 std::get<N-1>(my_inputs).set_my_original_tag_func(std::get<N-1>(other_inputs).my_original_func()->clone());
119 join_helper<N-1>::copy_tag_functors(my_inputs, other_inputs);
// Recursion terminator for join_helper: every operation here touches input 0 only.
124 struct join_helper<1> {
// Passes `port` to input 0.
126 template< typename TupleType, typename PortType >
127 static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
128 std::get<0>( my_input ).set_join_node_pointer(port);
// Calls consume() on input 0.
131 template< typename TupleType >
132 static inline void consume_reservations( TupleType &my_input ) {
133 std::get<0>( my_input ).consume();
// Calls release() on input 0.
136 template< typename TupleType >
137 static inline void release_my_reservation( TupleType &my_input ) {
138 std::get<0>( my_input ).release();
// Same effect as release_my_reservation(): there are no lower-indexed inputs left.
141 template<typename TupleType>
142 static inline void release_reservations( TupleType &my_input) {
143 release_my_reservation(my_input);
// Returns whether input 0 could reserve an item into element 0 of `out`.
146 template< typename InputTuple, typename OutputTuple >
147 static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
148 return std::get<0>( my_input ).reserve( std::get<0>( out ) );
// Returns whether input 0 delivered an item into element 0 of `out`.
151 template<typename InputTuple, typename OutputTuple>
152 static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
153 return std::get<0>(my_input).get_item(std::get<0>(out));
// Forwards to get_my_item().
156 template<typename InputTuple, typename OutputTuple>
157 static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
158 return get_my_item(my_input, out);
// Calls reset_port() on input 0.
161 template<typename InputTuple>
162 static inline void reset_my_port(InputTuple &my_input) {
163 std::get<0>(my_input).reset_port();
// Forwards to reset_my_port().
166 template<typename InputTuple>
167 static inline void reset_ports(InputTuple& my_input) {
168 reset_my_port(my_input);
// Stores functor 0 on input 0 as its original functor, installs a clone of it as the working
// functor, then clears slot 0 of my_tag_funcs to NULL.
171 template<typename InputTuple, typename TagFuncTuple>
172 static inline void set_tag_func(InputTuple &my_input, TagFuncTuple &my_tag_funcs) {
173 std::get<0>(my_input).set_my_original_tag_func(std::get<0>(my_tag_funcs));
174 std::get<0>(my_input).set_my_tag_func(std::get<0>(my_input).my_original_func()->clone());
175 std::get<0>(my_tag_funcs) = NULL;
// If the other side's input 0 has an original functor, gives this side's input 0 two separate
// clones of it: one as working functor and one as original functor.
178 template< typename TagFuncTuple1, typename TagFuncTuple2>
179 static inline void copy_tag_functors(TagFuncTuple1 &my_inputs, TagFuncTuple2 &other_inputs) {
180 if(std::get<0>(other_inputs).my_original_func()) {
181 std::get<0>(my_inputs).set_my_tag_func(std::get<0>(other_inputs).my_original_func()->clone());
182 std::get<0>(my_inputs).set_my_original_tag_func(std::get<0>(other_inputs).my_original_func()->clone());
187 //! The two-phase join port
// Input port used with the reserving policy. Items are not pushed into it; instead the port
// reserves, releases or consumes items through its cache of registered predecessors.
// All state changes are funneled through my_aggregator.
188 template< typename T >
189 class reserving_port : public receiver<T> {
191 typedef T input_type;
192 typedef sender<T> predecessor_type;
194 // ----------- Aggregator ------------
// Operation codes: register/remove predecessor, reserve item, release reservation, consume reservation.
195 enum op_type { reg_pred, rem_pred, res_item, rel_res, con_res };
// Outcome of an operation as reported back to the requesting thread.
196 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
197 typedef reserving_port<T> my_class;
// Request record handed to the aggregator; `type` holds the op_type value stored as a char.
199 class reserving_port_operation : public aggregated_operation<reserving_port_operation> {
204 predecessor_type *my_pred;
// Item-carrying request: keeps the address of `e` (constness cast away) in my_arg.
206 reserving_port_operation(const T& e, op_type t) :
207 type(char(t)), my_arg(const_cast<T*>(&e)) {}
// Predecessor-carrying request: keeps the address of `s` in my_pred.
208 reserving_port_operation(const predecessor_type &s, op_type t) : type(char(t)),
209 my_pred(const_cast<predecessor_type *>(&s)) {}
// Request without payload.
210 reserving_port_operation(op_type t) : type(char(t)) {}
// The aggregator calls back into handle_operations() through this functor type.
213 typedef internal::aggregating_functor<my_class, reserving_port_operation> my_handler;
214 friend class internal::aggregating_functor<my_class, reserving_port_operation>;
215 aggregator<my_handler, reserving_port_operation> my_aggregator;
// Aggregator callback: walks op_list and handles each request according to its type,
// publishing the outcome through current->status with a release store.
// NOTE(review): the case labels and a few branch lines are not visible in this listing; the
// per-branch notes below are inferred from the calls in each branch — confirm against the full file.
217 void handle_operations(reserving_port_operation* op_list) {
218 reserving_port_operation *current;
219 bool no_predecessors;
222 op_list = op_list->next;
223 switch(current->type) {
// Add a predecessor; if the port had none before, notify the join node via decrement_port_count().
225 no_predecessors = my_predecessors.empty();
226 my_predecessors.add(*(current->my_pred));
227 if ( no_predecessors ) {
228 my_join->decrement_port_count( ); // may try to forward
230 __TBB_store_with_release(current->status, SUCCEEDED);
// Remove a predecessor; if none remain, notify the join node via increment_port_count().
233 my_predecessors.remove(*(current->my_pred));
234 if(my_predecessors.empty()) my_join->increment_port_count();
235 __TBB_store_with_release(current->status, SUCCEEDED);
// Reservation request: succeeds when my_predecessors.try_reserve() fills *my_arg.
239 __TBB_store_with_release(current->status, FAILED);
241 else if ( my_predecessors.try_reserve( *(current->my_arg) ) ) {
243 __TBB_store_with_release(current->status, SUCCEEDED);
// Reservation failed: if no predecessors are left, report the port as input-less again.
245 if ( my_predecessors.empty() ) {
246 my_join->increment_port_count();
248 __TBB_store_with_release(current->status, FAILED);
// Release the outstanding reservation back to the predecessor cache.
253 my_predecessors.try_release( );
254 __TBB_store_with_release(current->status, SUCCEEDED);
// Consume the outstanding reservation.
258 my_predecessors.try_consume( );
259 __TBB_store_with_release(current->status, SUCCEEDED);
// Default constructor: no reservation outstanding; the predecessor cache is owned by this
// port and the aggregator is wired to this port's handle_operations().
268 reserving_port() : reserved(false) {
270 my_predecessors.set_owner( this );
271 my_aggregator.initialize_handler(my_handler(this));
// Copy constructor: the source port is ignored (parameter unnamed); the new port gets its own
// predecessor cache and aggregator handler.
275 reserving_port(const reserving_port& /* other */) : receiver<T>() {
278 my_predecessors.set_owner( this );
279 my_aggregator.initialize_handler(my_handler(this));
// Receives the join node front-end this port reports to.
// NOTE(review): body not visible in this listing; presumably stores `join` in my_join.
282 void set_join_node_pointer(forwarding_base *join) {
286 // always rejects, so arc is reversed (and reserves can be done.)
287 bool try_put( const T & ) {
291 //! Add a predecessor
// Runs a reg_pred request through the aggregator; returns true when it reports SUCCEEDED.
292 bool register_predecessor( sender<T> &src ) {
293 reserving_port_operation op_data(src, reg_pred);
294 my_aggregator.execute(&op_data);
295 return op_data.status == SUCCEEDED;
298 //! Remove a predecessor
// Runs a rem_pred request through the aggregator; returns true when it reports SUCCEEDED.
299 bool remove_predecessor( sender<T> &src ) {
300 reserving_port_operation op_data(src, rem_pred);
301 my_aggregator.execute(&op_data);
302 return op_data.status == SUCCEEDED;
305 //! Reserve an item from the port
// On success the reserved item has been written into `v`.
306 bool reserve( T &v ) {
307 reserving_port_operation op_data(v, res_item);
308 my_aggregator.execute(&op_data);
309 return op_data.status == SUCCEEDED;
// Runs a rel_res (release reservation) request through the aggregator.
314 reserving_port_operation op_data(rel_res);
315 my_aggregator.execute(&op_data);
318 //! Complete use of the port
// Runs a con_res (consume reservation) request through the aggregator.
320 reserving_port_operation op_data(con_res);
321 my_aggregator.execute(&op_data);
// Join node front-end notified about predecessor availability changes.
325 forwarding_base *my_join;
// Predecessors from which items are reserved; uses null_mutex as its mutex type.
326 reservable_predecessor_cache< T, null_mutex > my_predecessors;
330 //! queueing join_port
// Input port used with the queueing policy: accepted items are stored in the inherited
// item_buffer<T>, and all buffer accesses are funneled through my_aggregator.
332 class queueing_port : public receiver<T>, public item_buffer<T> {
334 typedef T input_type;
335 typedef sender<T> predecessor_type;
336 typedef queueing_port<T> my_node_type;
338 // ----------- Aggregator ------------
// Operation codes: put an item, get the front item, reset the port.
340 enum op_type { try__put, get__item, res_port };
// Outcome of an operation as reported back to the requesting thread.
341 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
342 typedef queueing_port<T> my_class;
// Request record handed to the aggregator; `type` holds the op_type value stored as a char.
344 class queueing_port_operation : public aggregated_operation<queueing_port_operation> {
351 // constructor for value parameter
352 queueing_port_operation(const T& e, op_type t) :
353 // type(char(t)), my_val(const_cast<T>(e)) {}
354 type(char(t)), my_val(e) {}
355 // constructor for pointer parameter
356 queueing_port_operation(const T* p, op_type t) :
357 type(char(t)), my_arg(const_cast<T*>(p)) {}
358 // constructor with no parameter
359 queueing_port_operation(op_type t) : type(char(t)) {}
// The aggregator calls back into handle_operations() through this functor type.
362 typedef internal::aggregating_functor<my_class, queueing_port_operation> my_handler;
363 friend class internal::aggregating_functor<my_class, queueing_port_operation>;
364 aggregator<my_handler, queueing_port_operation> my_aggregator;
// Aggregator callback: walks op_list and handles each request according to its type,
// publishing the outcome through current->status with a release store.
// NOTE(review): the case labels are not visible in this listing; the per-branch notes below
// are inferred from the calls in each branch — confirm against the full file.
366 void handle_operations(queueing_port_operation* op_list) {
367 queueing_port_operation *current;
371 op_list = op_list->next;
372 switch(current->type) {
// Put: append the item; if the buffer was empty before, notify the join node.
374 was_empty = this->buffer_empty();
375 this->push_back(current->my_val);
376 if (was_empty) my_join->decrement_port_count();
377 __TBB_store_with_release(current->status, SUCCEEDED);
// Get: copy the front item into *my_arg when the buffer is not empty; otherwise FAILED.
380 if(!this->buffer_empty()) {
381 this->fetch_front(*(current->my_arg));
382 __TBB_store_with_release(current->status, SUCCEEDED);
385 __TBB_store_with_release(current->status, FAILED);
// Reset: drop the front item and advance my_head; if the new head slot holds a valid item,
// notify the join node that this port has an item again.
389 __TBB_ASSERT(this->item_valid(this->my_head), "No item to reset");
390 this->invalidate_front(); ++(this->my_head);
391 if(this->item_valid(this->my_head)) {
392 my_join->decrement_port_count();
394 __TBB_store_with_release(current->status, SUCCEEDED);
399 // ------------ End Aggregator ---------------
// Default constructor: empty buffer; the aggregator is wired to this port's handle_operations().
403 queueing_port() : item_buffer<T>() {
405 my_aggregator.initialize_handler(my_handler(this));
// Copy constructor: the source port is ignored (parameter unnamed); the new port starts with
// a default-constructed buffer and its own aggregator handler.
409 queueing_port(const queueing_port& /* other */) : receiver<T>(), item_buffer<T>() {
411 my_aggregator.initialize_handler(my_handler(this));
414 //! record parent for tallying available items
415 void set_join_node_pointer(forwarding_base *join) {
// Runs a try__put request carrying a copy of `v`; returns true when it reports SUCCEEDED.
419 /*override*/bool try_put(const T &v) {
420 queueing_port_operation op_data(v, try__put);
421 my_aggregator.execute(&op_data);
422 return op_data.status == SUCCEEDED;
// Runs a get__item request that writes into `v`; returns true when it reports SUCCEEDED.
426 bool get_item( T &v ) {
427 queueing_port_operation op_data(&v, get__item);
428 my_aggregator.execute(&op_data);
429 return op_data.status == SUCCEEDED;
432 // reset_port is called when item is accepted by successor, but
433 // is initiated by join_node.
435 queueing_port_operation op_data(res_port);
436 my_aggregator.execute(&op_data);
// Join node front-end notified when this port gains an available item.
441 forwarding_base *my_join;
444 #include "_flow_graph_tagged_buffer_impl.h"
// Input port used with the tag_matching policy: items are stored in the inherited
// tagged_buffer keyed by tag_value (NO_TAG being the buffer's third template argument),
// and all buffer accesses are funneled through my_aggregator.
446 template< typename T >
447 class tag_matching_port : public receiver<T>, public tagged_buffer< tag_value, T, NO_TAG > {
449 typedef T input_type;
450 typedef sender<T> predecessor_type;
451 typedef tag_matching_port<T> my_node_type; // for forwarding, if needed
// Functor type that maps an input item to its tag_value.
452 typedef function_body<input_type, tag_value> my_tag_func_type;
454 // ----------- Aggregator ------------
// Operation codes: put an item, get the item for the current tag, reset the port.
456 enum op_type { try__put, get__item, res_port };
// Outcome of an operation as reported back to the requesting thread.
457 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
458 typedef tag_matching_port<T> my_class;
// Request record handed to the aggregator; `type` holds the op_type value stored as a char.
460 class tag_matching_port_operation : public aggregated_operation<tag_matching_port_operation> {
467 // constructor for value parameter
468 tag_matching_port_operation(const T& e, op_type t) :
469 // type(char(t)), my_val(const_cast<T>(e)) {}
470 type(char(t)), my_val(e) {}
471 // constructor for pointer parameter
472 tag_matching_port_operation(const T* p, op_type t) :
473 type(char(t)), my_arg(const_cast<T*>(p)) {}
474 // constructor with no parameter
475 tag_matching_port_operation(op_type t) : type(char(t)) {}
// The aggregator calls back into handle_operations() through this functor type.
478 typedef internal::aggregating_functor<my_class, tag_matching_port_operation> my_handler;
479 friend class internal::aggregating_functor<my_class, tag_matching_port_operation>;
480 aggregator<my_handler, tag_matching_port_operation> my_aggregator;
// Aggregator callback: walks op_list and handles each request according to its type,
// publishing the outcome through current->status with a release store.
// NOTE(review): the case labels are not visible in this listing; the per-branch notes below
// are inferred from the calls in each branch — confirm against the full file.
482 void handle_operations(tag_matching_port_operation* op_list) {
483 tag_matching_port_operation *current;
486 op_list = op_list->next;
487 switch(current->type) {
// Put: compute the item's tag with my_tag_func and insert the item under that tag.
// NOTE(review): was_inserted presumably guards the increment_tag_count() call below — the
// guarding line is not visible here.
489 tag_value tval = (*my_tag_func)(current->my_val);
490 bool was_inserted = this->tagged_insert(tval, current->my_val);
492 // report the tag to join_node_FE
493 my_join->increment_tag_count(tval); // may spawn
495 // should we make it an error to insert a tag twice?
496 __TBB_store_with_release(current->status, SUCCEEDED);
// Get: look up the item stored under the front-end's current_tag and copy it into *my_arg.
500 // use current_tag from FE for item
501 if(!this->tagged_find(my_join->current_tag, *(current->my_arg))) {
502 __TBB_ASSERT(false, "Failed to find item corresponding to current_tag.");
504 __TBB_store_with_release(current->status, SUCCEEDED);
// Reset: remove the item stored under the front-end's current_tag.
507 // use current_tag from FE for item
508 this->tagged_delete(my_join->current_tag);
509 __TBB_store_with_release(current->status, SUCCEEDED);
514 // ------------ End Aggregator ---------------
// Default constructor: empty tagged buffer, no original tag functor yet; the aggregator is
// wired to this port's handle_operations().
517 tag_matching_port() : receiver<T>(), tagged_buffer<tag_value, T, NO_TAG>() {
520 my_original_tag_func = NULL;
521 my_aggregator.initialize_handler(my_handler(this));
// Copy constructor: the source port is ignored (parameter unnamed); the buffer starts empty
// and the functors are installed later by the front-end.
525 tag_matching_port(const tag_matching_port& /*other*/) : receiver<T>(), tagged_buffer<tag_value,T, NO_TAG>() {
527 // setting the tag methods is done in the copy-constructor for the front-end.
529 my_original_tag_func = NULL;
530 my_aggregator.initialize_handler(my_handler(this));
// The port deletes both functor objects it holds.
533 ~tag_matching_port() {
534 if (my_tag_func) delete my_tag_func;
535 if (my_original_tag_func) delete my_original_tag_func;
// Receives the join node front-end this port reports to.
// NOTE(review): body not visible in this listing; presumably stores `join` in my_join.
538 void set_join_node_pointer(forwarding_base *join) {
// Stores `f` as the original functor; the destructor above deletes it.
542 void set_my_original_tag_func(my_tag_func_type *f) {
543 my_original_tag_func = f;
// NOTE(review): body not visible in this listing; presumably stores `f` in my_tag_func.
546 void set_my_tag_func(my_tag_func_type *f) {
// Runs a try__put request carrying a copy of `v`; returns true when it reports SUCCEEDED.
550 /*override*/bool try_put(const T& v) {
551 tag_matching_port_operation op_data(v, try__put);
552 my_aggregator.execute(&op_data);
553 return op_data.status == SUCCEEDED;
// Runs a get__item request that writes into `v`; returns true when it reports SUCCEEDED.
557 bool get_item( T &v ) {
558 tag_matching_port_operation op_data(&v, get__item);
559 my_aggregator.execute(&op_data);
560 return op_data.status == SUCCEEDED;
563 // reset_port is called when item is accepted by successor, but
564 // is initiated by join_node.
566 tag_matching_port_operation op_data(res_port);
567 my_aggregator.execute(&op_data);
// Accessors for the working functor and the original functor.
571 my_tag_func_type *my_func() { return my_tag_func; }
572 my_tag_func_type *my_original_func() { return my_original_tag_func; }
575 // need map of tags to values
576 forwarding_base *my_join;
// Functor applied to incoming items in handle_operations().
577 my_tag_func_type *my_tag_func;
// Functor that join_helper clones from (see my_original_func()).
578 my_tag_func_type *my_original_tag_func;
579 }; // tag_matching_port
// Brings the buffer policy names used below (reserving, queueing, tag_matching) into scope.
581 using namespace graph_policy_namespace;
// Forward declaration: the front-ends refer to join_node_base when creating forward tasks.
583 template<graph_buffer_policy JP, typename InputTuple, typename OutputTuple>
584 class join_node_base;
586 //! join_node_FE : implements input port policy
587 template<graph_buffer_policy JP, typename InputTuple, typename OutputTuple>
// Front-end for the reserving policy: tracks how many input ports currently have no
// predecessors and builds output tuples by reserving one item from every port.
590 template<typename InputTuple, typename OutputTuple>
591 class join_node_FE<reserving, InputTuple, OutputTuple> : public forwarding_base {
// Number of input ports, taken from the size of the output tuple.
593 static const int N = std::tuple_size<OutputTuple>::value;
594 typedef OutputTuple output_type;
595 typedef InputTuple input_type;
596 typedef join_node_base<reserving, InputTuple, OutputTuple> my_node_type; // for forwarding
// All N ports start without inputs; each port receives a pointer back to this front-end.
598 join_node_FE(graph &g) : forwarding_base(g.root_task()), my_node(NULL) {
599 ports_with_no_inputs = N;
600 join_helper<N>::set_join_node_pointer(my_inputs, this);
// Copy constructor: only the root task is taken from `other`; the counter starts at N again.
603 join_node_FE(const join_node_FE& other) : forwarding_base(other.my_root_task), my_node(NULL) {
604 ports_with_no_inputs = N;
605 join_helper<N>::set_join_node_pointer(my_inputs, this);
// Records the node that forward tasks are created for.
608 void set_my_node(my_node_type *new_my_node) { my_node = new_my_node; }
// One more port has no inputs.
610 void increment_port_count() {
611 ++ports_with_no_inputs;
614 // if all input_ports have predecessors, spawn forward to try and consume tuples
// fetch_and_decrement() returning 1 means the counter just dropped to zero.
615 void decrement_port_count() {
616 if(ports_with_no_inputs.fetch_and_decrement() == 1) {
617 task::enqueue( * new ( task::allocate_additional_child_of( *(this->my_root_task) ) )
618 forward_task<my_node_type>(*my_node) );
622 input_type &inputs() { return my_inputs; }
624 // all methods on input ports should be called under mutual exclusion from join_node_base.
// True when no port is currently without inputs.
626 bool tuple_build_may_succeed() {
627 return !ports_with_no_inputs;
// Fails fast while some port has no inputs; otherwise tries to reserve an item from every port.
630 bool try_to_make_tuple(output_type &out) {
631 if(ports_with_no_inputs) return false;
632 return join_helper<N>::reserve(my_inputs, out);
// The tuple was taken: consume every port's reservation.
635 void tuple_accepted() {
636 join_helper<N>::consume_reservations(my_inputs);
// The tuple was not taken: release every port's reservation.
638 void tuple_rejected() {
639 join_helper<N>::release_reservations(my_inputs);
642 input_type my_inputs;
643 my_node_type *my_node;
644 atomic<size_t> ports_with_no_inputs;
// Front-end for the queueing policy: tracks how many input ports currently have no item
// and builds output tuples by fetching one item from every port.
647 template<typename InputTuple, typename OutputTuple>
648 class join_node_FE<queueing, InputTuple, OutputTuple> : public forwarding_base {
// Number of input ports, taken from the size of the output tuple.
650 static const int N = std::tuple_size<OutputTuple>::value;
651 typedef OutputTuple output_type;
652 typedef InputTuple input_type;
653 typedef join_node_base<queueing, InputTuple, OutputTuple> my_node_type; // for forwarding
// All N ports start without items; each port receives a pointer back to this front-end.
655 join_node_FE(graph &g) : forwarding_base(g.root_task()), my_node(NULL) {
656 ports_with_no_items = N;
657 join_helper<N>::set_join_node_pointer(my_inputs, this);
// Copy constructor: only the root task is taken from `other`; the counter starts at N again.
660 join_node_FE(const join_node_FE& other) : forwarding_base(other.my_root_task), my_node(NULL) {
661 ports_with_no_items = N;
662 join_helper<N>::set_join_node_pointer(my_inputs, this);
665 // needed for forwarding
666 void set_my_node(my_node_type *new_my_node) { my_node = new_my_node; }
// Sets the counter back to N (every port counted as having no item).
668 void reset_port_count() {
669 ports_with_no_items = N;
672 // if all input_ports have items, spawn forward to try and consume tuples
// fetch_and_decrement() returning 1 means the counter just dropped to zero.
673 void decrement_port_count() {
674 if(ports_with_no_items.fetch_and_decrement() == 1) {
675 task::enqueue( * new ( task::allocate_additional_child_of( *(this->my_root_task) ) )
676 forward_task<my_node_type>(*my_node) );
680 void increment_port_count() { __TBB_ASSERT(false, NULL); } // should never be called
682 input_type &inputs() { return my_inputs; }
684 // all methods on input ports should be called under mutual exclusion from join_node_base.
// True when no port is currently without an item.
686 bool tuple_build_may_succeed() {
687 return !ports_with_no_items;
// Fails fast while some port has no item; otherwise fetches one item from every port.
690 bool try_to_make_tuple(output_type &out) {
691 if(ports_with_no_items) return false;
692 return join_helper<N>::get_items(my_inputs, out);
// The tuple was taken: reset every port.
// NOTE(review): one line of this body is not visible in this listing.
695 void tuple_accepted() {
697 join_helper<N>::reset_ports(my_inputs);
// NOTE(review): body not visible in this listing.
699 void tuple_rejected() {
703 input_type my_inputs;
704 my_node_type *my_node;
705 atomic<size_t> ports_with_no_items;
708 // tag_matching join input port.
// Front-end for the tag_matching policy. The inherited tagged_buffer maps a tag to a size_t
// counter, which handle_operations() increments per reported tag and compares against N.
709 template<typename InputTuple, typename OutputTuple>
710 class join_node_FE<tag_matching, InputTuple, OutputTuple> : public forwarding_base, public tagged_buffer<tag_value, size_t, NO_TAG> {
// Number of input ports, taken from the size of the output tuple.
712 static const int N = std::tuple_size<OutputTuple>::value;
713 typedef OutputTuple output_type;
714 typedef InputTuple input_type;
715 typedef tagged_buffer<tag_value, size_t, NO_TAG> my_tag_buffer;
716 typedef join_node_base<tag_matching, InputTuple, OutputTuple> my_node_type; // for forwarding
718 // ----------- Aggregator ------------
719 // the aggregator is only needed to serialize the access to the hash table.
720 // and the current_tag field.
// Operation codes: reset after a tuple, count a reported tag, query readiness, build a tuple.
722 enum op_type { res_count, inc_count, may_succeed, try_make };
// Outcome of an operation as reported back to the requesting thread.
723 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
724 typedef join_node_FE<tag_matching, InputTuple, OutputTuple> my_class;
// Request record handed to the aggregator; `type` holds the op_type value stored as a char.
726 class tag_matching_FE_operation : public aggregated_operation<tag_matching_FE_operation> {
731 output_type* my_output;
733 // constructor for value parameter
734 tag_matching_FE_operation(const tag_value& e, op_type t) :
735 // type(char(t)), my_val(const_cast<T>(e)) {}
736 type(char(t)), my_val(e) {}
// constructor for output-tuple pointer parameter: `p` is where the built tuple is written.
// The op code is narrowed to char explicitly, exactly as the other constructors of this
// class do, instead of relying on an implicit enum-to-char conversion.
737 tag_matching_FE_operation(output_type *p, op_type t) :
738 type(char(t)), my_output(p) {}
739 // constructor with no parameter
740 tag_matching_FE_operation(op_type t) : type(char(t)) {}
// The aggregator calls back into handle_operations() through this functor type.
743 typedef internal::aggregating_functor<my_class, tag_matching_FE_operation> my_handler;
744 friend class internal::aggregating_functor<my_class, tag_matching_FE_operation>;
745 aggregator<my_handler, tag_matching_FE_operation> my_aggregator;
// Aggregator callback: walks op_list and handles each request according to its type,
// publishing the outcome through current->status with a release store.
// NOTE(review): the case labels and a few lines are not visible in this listing; the
// per-branch notes below are inferred from the calls in each branch — confirm against the full file.
747 void handle_operations(tag_matching_FE_operation* op_list) {
748 tag_matching_FE_operation *current;
751 op_list = op_list->next;
752 switch(current->type) {
// Reset: clear current_tag, then ask the buffer for another tag via find_value_tag(.., N)
// (presumably one whose counter already equals N — verify); if one is found it becomes the
// current tag, its counter entry is removed and a forward task is enqueued.
754 this->current_tag = NO_TAG;
755 if(find_value_tag(this->current_tag,N)) {
756 this->tagged_delete(this->current_tag);
757 task::enqueue( * new ( task::allocate_additional_child_of( *(this->my_root_task) ) )
758 forward_task<my_node_type>(*my_node) );
760 __TBB_store_with_release(current->status, SUCCEEDED);
// Count: fetch (or create with value 0) the counter for tag t and increment it.
764 tag_value t = current->my_val;
765 if(!(this->tagged_find_ref(t,p))) {
766 this->tagged_insert(t, 0);
767 if(!(this->tagged_find_ref(t,p))) {
768 __TBB_ASSERT(false, NULL);
// The counter reached N while no tag is selected: select t, drop its counter, enqueue a forward task.
771 if(++(*p) == size_t(N) && this->current_tag == NO_TAG) {
772 // all items of tuple are available.
773 this->current_tag = t;
774 this->tagged_delete(t);
775 task::enqueue( * new ( task::allocate_additional_child_of( *(this->my_root_task) ) )
776 forward_task<my_node_type>(*my_node) );
779 __TBB_store_with_release(current->status, SUCCEEDED);
// Readiness query: FAILED while no tag is selected, SUCCEEDED otherwise.
782 if(this->current_tag == NO_TAG) {
783 __TBB_store_with_release(current->status, FAILED);
786 __TBB_store_with_release(current->status, SUCCEEDED);
// Build: FAILED while no tag is selected; otherwise fetch every port's item into *my_output.
790 if(this->current_tag == NO_TAG) {
791 __TBB_store_with_release(current->status, FAILED);
794 if(join_helper<N>::get_items(my_inputs, *(current->my_output))) {
795 __TBB_store_with_release(current->status, SUCCEEDED);
798 __TBB_ASSERT(false, NULL); // shouldn't be asked to make a tuple if all items not available.
805 // ------------ End Aggregator ---------------
// Constructs the front-end with one tag functor per port; `tag_funcs` is taken by value and
// its slots are handed over to the ports by join_helper<N>::set_tag_func().
808 template<typename FunctionTuple>
809 join_node_FE(graph &g, FunctionTuple tag_funcs) : forwarding_base(g.root_task()), my_node(NULL) {
810 join_helper<N>::set_join_node_pointer(my_inputs, this);
811 join_helper<N>::set_tag_func(my_inputs, tag_funcs);
812 my_aggregator.initialize_handler(my_handler(this));
// Copy constructor: takes the root task from `other`, starts with an empty tag buffer and
// gives each port clones of the corresponding port's original tag functor.
815 join_node_FE(const join_node_FE& other) : forwarding_base(other.my_root_task), my_tag_buffer(), my_node(NULL) {
816 join_helper<N>::set_join_node_pointer(my_inputs, this);
817 join_helper<N>::copy_tag_functors(my_inputs, const_cast<input_type &>(other.my_inputs));
818 my_aggregator.initialize_handler(my_handler(this));
821 // needed for forwarding
822 void set_my_node(my_node_type *new_my_node) { my_node = new_my_node; }
824 void reset_port_count() {
825 // reset while current_tag has old value. The current_tag value is still valid, and will
826 // not be reset until we call into res_count in the aggregator.
827 // called from aggregator of back-end of join node (via tuple_accepted()), so this is serial on join.
828 join_helper<N>::reset_ports(my_inputs);
829 // only the hash table ops need to be serialized on our aggregator.
830 tag_matching_FE_operation op_data(res_count);
831 my_aggregator.execute(&op_data);
835 // if all input_ports have items, spawn forward to try and consume tuples
// Runs an inc_count request for tag `t` through the aggregator.
836 void increment_tag_count(tag_value t) {
837 tag_matching_FE_operation op_data(t, inc_count);
838 my_aggregator.execute(&op_data);
// Not used by this policy: asserts if called.
842 void decrement_port_count() { __TBB_ASSERT(false, NULL); }
844 void increment_port_count() { __TBB_ASSERT(false, NULL); } // should never be called
846 input_type &inputs() { return my_inputs; }
848 // all methods on input ports should be called under mutual exclusion from join_node_base.
// Runs a may_succeed request; true when a tag is currently selected.
850 bool tuple_build_may_succeed() {
851 tag_matching_FE_operation op_data(may_succeed);
852 my_aggregator.execute(&op_data);
853 return op_data.status == SUCCEEDED;
856 // cannot lock while calling back to input_ports. current_tag will only be set
857 // and reset under the aggregator, so it will remain consistent.
// NOTE(review): the return statements of this function are not visible in this listing.
858 bool try_to_make_tuple(output_type &out) {
859 if(this->current_tag == NO_TAG) {
862 if(join_helper<N>::get_items(my_inputs, out)) {
865 __TBB_ASSERT(false, NULL); // shouldn't be asked to make a tuple if all items not available.
869 void tuple_accepted() {
870 reset_port_count(); // reset current_tag after ports reset.
// NOTE(review): body not visible in this listing.
873 void tuple_rejected() {
877 input_type my_inputs; // input ports
878 my_node_type *my_node;
879 }; // join_node_FE<tag_matching, InputTuple, OutputTuple>
// Back-end shared by all join policies: combines the policy front-end (input ports) with the
// sender side that offers built tuples to successors. Successor registration, try_get and
// forwarding are serialized through my_aggregator.
882 template<graph_buffer_policy JP, typename InputTuple, typename OutputTuple>
883 class join_node_base : public graph_node, public join_node_FE<JP, InputTuple, OutputTuple>,
884 public sender<OutputTuple> {
886 typedef OutputTuple output_type;
888 typedef receiver<output_type> successor_type;
889 typedef join_node_FE<JP, InputTuple, OutputTuple> input_ports_type;
// Front-end operations used by handle_operations() below.
890 using input_ports_type::tuple_build_may_succeed;
891 using input_ports_type::try_to_make_tuple;
892 using input_ports_type::tuple_accepted;
893 using input_ports_type::tuple_rejected;
896 // ----------- Aggregator ------------
// Operation codes: register successor, remove successor, try_get, forward.
897 enum op_type { reg_succ, rem_succ, try__get, do_fwrd };
// Outcome of an operation as reported back to the requesting thread.
898 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
899 typedef join_node_base<JP,InputTuple,OutputTuple> my_class;
// Request record handed to the aggregator; `type` holds the op_type value stored as a char.
901 class join_node_base_operation : public aggregated_operation<join_node_base_operation> {
906 successor_type *my_succ;
// Tuple-carrying request: keeps the address of `e` (constness cast away) in my_arg.
908 join_node_base_operation(const output_type& e, op_type t) :
909 type(char(t)), my_arg(const_cast<output_type*>(&e)) {}
// Successor-carrying request: keeps the address of `s` in my_succ.
910 join_node_base_operation(const successor_type &s, op_type t) : type(char(t)),
911 my_succ(const_cast<successor_type *>(&s)) {}
// Request without payload.
912 join_node_base_operation(op_type t) : type(char(t)) {}
// The aggregator calls back into handle_operations() through this functor type.
915 typedef internal::aggregating_functor<my_class, join_node_base_operation> my_handler;
916 friend class internal::aggregating_functor<my_class, join_node_base_operation>;
918 aggregator<my_handler, join_node_base_operation> my_aggregator;
// Aggregator callback: walks op_list and handles each request according to its type,
// publishing the outcome through current->status with a release store.
// NOTE(review): the case labels and a few lines (including the tuple_accepted()/tuple_rejected()
// calls, presumably) are not visible in this listing — confirm against the full file.
920 void handle_operations(join_node_base_operation* op_list) {
921 join_node_base_operation *current;
924 op_list = op_list->next;
925 switch(current->type) {
// Register a successor; if a tuple may be buildable and no forwarder is already pending,
// enqueue a forward task and mark the forwarder busy.
927 my_successors.register_successor(*(current->my_succ));
928 if(tuple_build_may_succeed() && !forwarder_busy) {
929 task::enqueue( * new ( task::allocate_additional_child_of(*(this->my_root_task)) )
930 forward_task<join_node_base<JP,InputTuple,OutputTuple> >(*this));
931 forwarder_busy = true;
933 __TBB_store_with_release(current->status, SUCCEEDED);
// Remove a successor.
936 my_successors.remove_successor(*(current->my_succ));
937 __TBB_store_with_release(current->status, SUCCEEDED);
// try_get: SUCCEEDED only when a tuple could be built into *my_arg.
940 if(tuple_build_may_succeed()) {
941 if(try_to_make_tuple(*(current->my_arg))) {
943 __TBB_store_with_release(current->status, SUCCEEDED);
945 else __TBB_store_with_release(current->status, FAILED);
947 else __TBB_store_with_release(current->status, FAILED);
// Forward: keep building tuples and offering them to the successors until a build fails
// or the successors do not take the tuple; afterwards the forwarder is no longer busy.
950 bool build_succeeded;
952 if(tuple_build_may_succeed()) {
954 build_succeeded = try_to_make_tuple(out);
955 if(build_succeeded) {
956 if(my_successors.try_put(out)) {
961 build_succeeded = false;
964 } while(build_succeeded);
966 __TBB_store_with_release(current->status, SUCCEEDED);
967 forwarder_busy = false;
973 // ---------- end aggregator -----------
// Constructs the node in graph `g`: no forwarder pending, successor cache owned by this node,
// front-end pointed back at this node, aggregator wired to handle_operations().
975 join_node_base(graph &g) : input_ports_type(g), forwarder_busy(false) {
976 my_successors.set_owner(this);
977 input_ports_type::set_my_node(this);
978 my_aggregator.initialize_handler(my_handler(this));
// Copy constructor: copies the front-end from `other` and starts with an empty successor
// cache; the base-class initializer list differs for GCC versions below 4.2.2.
981 join_node_base(const join_node_base& other) :
982 #if ( __TBB_GCC_VERSION < 40202 )
985 input_ports_type(other),
986 #if ( __TBB_GCC_VERSION < 40202 )
987 sender<OutputTuple>(),
989 forwarder_busy(false), my_successors() {
990 my_successors.set_owner(this);
991 input_ports_type::set_my_node(this);
992 my_aggregator.initialize_handler(my_handler(this));
// Same as the graph constructor, additionally passing function tuple `f` to the front-end.
995 template<typename FunctionTuple>
996 join_node_base(graph &g, FunctionTuple f) : input_ports_type(g, f), forwarder_busy(false) {
997 my_successors.set_owner(this);
998 input_ports_type::set_my_node(this);
999 my_aggregator.initialize_handler(my_handler(this));
// Runs a reg_succ request through the aggregator; returns true when it reports SUCCEEDED.
1002 bool register_successor(successor_type &r) {
1003 join_node_base_operation op_data(r, reg_succ);
1004 my_aggregator.execute(&op_data);
1005 return op_data.status == SUCCEEDED;
// Runs a rem_succ request through the aggregator; returns true when it reports SUCCEEDED.
1008 bool remove_successor( successor_type &r) {
1009 join_node_base_operation op_data(r, rem_succ);
1010 my_aggregator.execute(&op_data);
1011 return op_data.status == SUCCEEDED;
// Runs a try__get request that builds a tuple into `v`; returns true when it reports SUCCEEDED.
1014 bool try_get( output_type &v) {
1015 join_node_base_operation op_data(v, try__get);
1016 my_aggregator.execute(&op_data);
1017 return op_data.status == SUCCEEDED;
// Successors that receive the built tuples.
1021 broadcast_cache<output_type, null_rw_mutex> my_successors;
// forward_task is a friend of this class.
1023 friend class forward_task< join_node_base<JP, InputTuple, OutputTuple> >;
// Runs a do_fwrd request through the aggregator.
1026 join_node_base_operation op_data(do_fwrd);
1027 my_aggregator.execute(&op_data);
1031 //! unfolded_join_node : passes input_ports_tuple_type to join_node_base. We build the input port type
1032 // using tuple_element. The class PT is the port type (reserving_port, queueing_port, tag_matching_port)
1033 // and should match the graph_buffer_policy.
// Primary template is only declared; the specializations below are selected by the port count N.
1034 template<int N, template<class> class PT, typename OutputTuple, graph_buffer_policy JP>
1035 class unfolded_join_node;
// Two-port specialization: the input port tuple holds PT<element type> for elements 0 and 1
// of OutputTuple, and everything else is inherited from join_node_base.
1037 template<template<class> class PT, typename OutputTuple, graph_buffer_policy JP>
1038 class unfolded_join_node<2,PT,OutputTuple,JP> : public internal::join_node_base<JP,
1040 PT<typename std::tuple_element<0,OutputTuple>::type>,
1041 PT<typename std::tuple_element<1,OutputTuple>::type> >,
1046 typedef typename std::tuple<
1047 PT<typename std::tuple_element<0,OutputTuple>::type>,
1048 PT<typename std::tuple_element<1,OutputTuple>::type> > input_ports_tuple_type;
1049 typedef OutputTuple output_type;
1051 typedef join_node_base<JP, input_ports_tuple_type, output_type > base_type;
// Both constructors simply forward to the corresponding base_type constructor.
1053 unfolded_join_node(graph &g) : base_type(g) {}
1054 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
//! unfolded_join_node specialization for 3 input ports.
// Derives from internal::join_node_base<JP, ...>; each port type is PT applied to
// element i of OutputTuple.
1057 template<template<class> class PT, typename OutputTuple, graph_buffer_policy JP>
1058 class unfolded_join_node<3,PT,OutputTuple,JP> : public internal::join_node_base<JP,
1060 PT<typename std::tuple_element<0,OutputTuple>::type>,
1061 PT<typename std::tuple_element<1,OutputTuple>::type>,
1062 PT<typename std::tuple_element<2,OutputTuple>::type> >,
// Tuple of the port types, one per OutputTuple element (indices 0..2).
1067 typedef typename std::tuple<
1068 PT<typename std::tuple_element<0,OutputTuple>::type>,
1069 PT<typename std::tuple_element<1,OutputTuple>::type>,
1070 PT<typename std::tuple_element<2,OutputTuple>::type> > input_ports_tuple_type;
1071 typedef OutputTuple output_type;
// Alias named in the constructors' initializer lists.
1073 typedef join_node_base<JP, input_ports_tuple_type, output_type > base_type;
// Construction from a graph and copy construction are both delegated to base_type.
1075 unfolded_join_node(graph &g) : base_type(g) {}
1076 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
//! unfolded_join_node specialization for 4 input ports.
// Derives from internal::join_node_base<JP, ...>; each port type is PT applied to
// element i of OutputTuple.
1079 template<template<class> class PT, typename OutputTuple, graph_buffer_policy JP>
1080 class unfolded_join_node<4,PT,OutputTuple,JP> : public internal::join_node_base<JP,
1082 PT<typename std::tuple_element<0,OutputTuple>::type>,
1083 PT<typename std::tuple_element<1,OutputTuple>::type>,
1084 PT<typename std::tuple_element<2,OutputTuple>::type>,
1085 PT<typename std::tuple_element<3,OutputTuple>::type> >,
// Tuple of the port types, one per OutputTuple element (indices 0..3).
1089 typedef typename std::tuple<
1090 PT<typename std::tuple_element<0,OutputTuple>::type>,
1091 PT<typename std::tuple_element<1,OutputTuple>::type>,
1092 PT<typename std::tuple_element<2,OutputTuple>::type>,
1093 PT<typename std::tuple_element<3,OutputTuple>::type> > input_ports_tuple_type;
1094 typedef OutputTuple output_type;
// Alias named in the constructors' initializer lists.
1096 typedef join_node_base<JP, input_ports_tuple_type, output_type > base_type;
// Construction from a graph and copy construction are both delegated to base_type.
1098 unfolded_join_node(graph &g) : base_type(g) {}
1099 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
//! unfolded_join_node specialization for 5 input ports.
// Derives from internal::join_node_base<JP, ...>; each port type is PT applied to
// element i of OutputTuple.
1102 template<template<class> class PT, typename OutputTuple, graph_buffer_policy JP>
1103 class unfolded_join_node<5,PT,OutputTuple,JP> : public internal::join_node_base<JP,
1105 PT<typename std::tuple_element<0,OutputTuple>::type>,
1106 PT<typename std::tuple_element<1,OutputTuple>::type>,
1107 PT<typename std::tuple_element<2,OutputTuple>::type>,
1108 PT<typename std::tuple_element<3,OutputTuple>::type>,
1109 PT<typename std::tuple_element<4,OutputTuple>::type> >,
// Tuple of the port types, one per OutputTuple element (indices 0..4).
1113 typedef typename std::tuple<
1114 PT<typename std::tuple_element<0,OutputTuple>::type>,
1115 PT<typename std::tuple_element<1,OutputTuple>::type>,
1116 PT<typename std::tuple_element<2,OutputTuple>::type>,
1117 PT<typename std::tuple_element<3,OutputTuple>::type>,
1118 PT<typename std::tuple_element<4,OutputTuple>::type> > input_ports_tuple_type;
1119 typedef OutputTuple output_type;
// Alias named in the constructors' initializer lists.
1121 typedef join_node_base<JP, input_ports_tuple_type, output_type > base_type;
// Construction from a graph and copy construction are both delegated to base_type.
1123 unfolded_join_node(graph &g) : base_type(g) {}
1124 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
//! unfolded_join_node specialization for 6 input ports.
// Derives from internal::join_node_base<JP, ...>; each port type is PT applied to
// element i of OutputTuple.
1127 template<template<class> class PT, typename OutputTuple, graph_buffer_policy JP>
1128 class unfolded_join_node<6,PT,OutputTuple,JP> : public internal::join_node_base<JP,
1130 PT<typename std::tuple_element<0,OutputTuple>::type>,
1131 PT<typename std::tuple_element<1,OutputTuple>::type>,
1132 PT<typename std::tuple_element<2,OutputTuple>::type>,
1133 PT<typename std::tuple_element<3,OutputTuple>::type>,
1134 PT<typename std::tuple_element<4,OutputTuple>::type>,
1135 PT<typename std::tuple_element<5,OutputTuple>::type> >,
// Tuple of the port types, one per OutputTuple element (indices 0..5).
1139 typedef typename std::tuple<
1140 PT<typename std::tuple_element<0,OutputTuple>::type>,
1141 PT<typename std::tuple_element<1,OutputTuple>::type>,
1142 PT<typename std::tuple_element<2,OutputTuple>::type>,
1143 PT<typename std::tuple_element<3,OutputTuple>::type>,
1144 PT<typename std::tuple_element<4,OutputTuple>::type>,
1145 PT<typename std::tuple_element<5,OutputTuple>::type> > input_ports_tuple_type;
1146 typedef OutputTuple output_type;
// Alias named in the constructors' initializer lists.
1148 typedef join_node_base<JP, input_ports_tuple_type, output_type > base_type;
// Construction from a graph and copy construction are both delegated to base_type.
1150 unfolded_join_node(graph &g) : base_type(g) {}
1151 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
//! unfolded_join_node specialization for 7 input ports.
// Derives from internal::join_node_base<JP, ...>; each port type is PT applied to
// element i of OutputTuple.
1154 template<template<class> class PT, typename OutputTuple, graph_buffer_policy JP>
1155 class unfolded_join_node<7,PT,OutputTuple,JP> : public internal::join_node_base<JP,
1157 PT<typename std::tuple_element<0,OutputTuple>::type>,
1158 PT<typename std::tuple_element<1,OutputTuple>::type>,
1159 PT<typename std::tuple_element<2,OutputTuple>::type>,
1160 PT<typename std::tuple_element<3,OutputTuple>::type>,
1161 PT<typename std::tuple_element<4,OutputTuple>::type>,
1162 PT<typename std::tuple_element<5,OutputTuple>::type>,
1163 PT<typename std::tuple_element<6,OutputTuple>::type> >,
// Tuple of the port types, one per OutputTuple element (indices 0..6).
1167 typedef typename std::tuple<
1168 PT<typename std::tuple_element<0,OutputTuple>::type>,
1169 PT<typename std::tuple_element<1,OutputTuple>::type>,
1170 PT<typename std::tuple_element<2,OutputTuple>::type>,
1171 PT<typename std::tuple_element<3,OutputTuple>::type>,
1172 PT<typename std::tuple_element<4,OutputTuple>::type>,
1173 PT<typename std::tuple_element<5,OutputTuple>::type>,
1174 PT<typename std::tuple_element<6,OutputTuple>::type> > input_ports_tuple_type;
1175 typedef OutputTuple output_type;
// Alias named in the constructors' initializer lists.
1177 typedef join_node_base<JP, input_ports_tuple_type, output_type > base_type;
// Construction from a graph and copy construction are both delegated to base_type.
1179 unfolded_join_node(graph &g) : base_type(g) {}
1180 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
//! unfolded_join_node specialization for 8 input ports.
// Derives from internal::join_node_base<JP, ...>; each port type is PT applied to
// element i of OutputTuple.
1183 template<template<class> class PT, typename OutputTuple, graph_buffer_policy JP>
1184 class unfolded_join_node<8,PT,OutputTuple,JP> : public internal::join_node_base<JP,
1186 PT<typename std::tuple_element<0,OutputTuple>::type>,
1187 PT<typename std::tuple_element<1,OutputTuple>::type>,
1188 PT<typename std::tuple_element<2,OutputTuple>::type>,
1189 PT<typename std::tuple_element<3,OutputTuple>::type>,
1190 PT<typename std::tuple_element<4,OutputTuple>::type>,
1191 PT<typename std::tuple_element<5,OutputTuple>::type>,
1192 PT<typename std::tuple_element<6,OutputTuple>::type>,
1193 PT<typename std::tuple_element<7,OutputTuple>::type> >,
// Tuple of the port types, one per OutputTuple element (indices 0..7).
1197 typedef typename std::tuple<
1198 PT<typename std::tuple_element<0,OutputTuple>::type>,
1199 PT<typename std::tuple_element<1,OutputTuple>::type>,
1200 PT<typename std::tuple_element<2,OutputTuple>::type>,
1201 PT<typename std::tuple_element<3,OutputTuple>::type>,
1202 PT<typename std::tuple_element<4,OutputTuple>::type>,
1203 PT<typename std::tuple_element<5,OutputTuple>::type>,
1204 PT<typename std::tuple_element<6,OutputTuple>::type>,
1205 PT<typename std::tuple_element<7,OutputTuple>::type> > input_ports_tuple_type;
1206 typedef OutputTuple output_type;
// Alias named in the constructors' initializer lists.
1208 typedef join_node_base<JP, input_ports_tuple_type, output_type > base_type;
// Construction from a graph and copy construction are both delegated to base_type.
1210 unfolded_join_node(graph &g) : base_type(g) {}
1211 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
//! unfolded_join_node specialization for 9 input ports.
// Derives from internal::join_node_base<JP, ...>; each port type is PT applied to
// element i of OutputTuple.
1214 template<template<class> class PT, typename OutputTuple, graph_buffer_policy JP>
1215 class unfolded_join_node<9,PT,OutputTuple,JP> : public internal::join_node_base<JP,
1217 PT<typename std::tuple_element<0,OutputTuple>::type>,
1218 PT<typename std::tuple_element<1,OutputTuple>::type>,
1219 PT<typename std::tuple_element<2,OutputTuple>::type>,
1220 PT<typename std::tuple_element<3,OutputTuple>::type>,
1221 PT<typename std::tuple_element<4,OutputTuple>::type>,
1222 PT<typename std::tuple_element<5,OutputTuple>::type>,
1223 PT<typename std::tuple_element<6,OutputTuple>::type>,
1224 PT<typename std::tuple_element<7,OutputTuple>::type>,
1225 PT<typename std::tuple_element<8,OutputTuple>::type> >,
// Tuple of the port types, one per OutputTuple element (indices 0..8).
1229 typedef typename std::tuple<
1230 PT<typename std::tuple_element<0,OutputTuple>::type>,
1231 PT<typename std::tuple_element<1,OutputTuple>::type>,
1232 PT<typename std::tuple_element<2,OutputTuple>::type>,
1233 PT<typename std::tuple_element<3,OutputTuple>::type>,
1234 PT<typename std::tuple_element<4,OutputTuple>::type>,
1235 PT<typename std::tuple_element<5,OutputTuple>::type>,
1236 PT<typename std::tuple_element<6,OutputTuple>::type>,
1237 PT<typename std::tuple_element<7,OutputTuple>::type>,
1238 PT<typename std::tuple_element<8,OutputTuple>::type> > input_ports_tuple_type;
1239 typedef OutputTuple output_type;
// Alias named in the constructors' initializer lists.
1241 typedef join_node_base<JP, input_ports_tuple_type, output_type > base_type;
// Construction from a graph and copy construction are both delegated to base_type.
1243 unfolded_join_node(graph &g) : base_type(g) {}
1244 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
//! unfolded_join_node specialization for 10 input ports.
// Derives from internal::join_node_base<JP, ...>; each port type is PT applied to
// element i of OutputTuple.
1247 template<template<class> class PT, typename OutputTuple, graph_buffer_policy JP>
1248 class unfolded_join_node<10,PT,OutputTuple,JP> : public internal::join_node_base<JP,
1250 PT<typename std::tuple_element<0,OutputTuple>::type>,
1251 PT<typename std::tuple_element<1,OutputTuple>::type>,
1252 PT<typename std::tuple_element<2,OutputTuple>::type>,
1253 PT<typename std::tuple_element<3,OutputTuple>::type>,
1254 PT<typename std::tuple_element<4,OutputTuple>::type>,
1255 PT<typename std::tuple_element<5,OutputTuple>::type>,
1256 PT<typename std::tuple_element<6,OutputTuple>::type>,
1257 PT<typename std::tuple_element<7,OutputTuple>::type>,
1258 PT<typename std::tuple_element<8,OutputTuple>::type>,
1259 PT<typename std::tuple_element<9,OutputTuple>::type> >,
// Tuple of the port types, one per OutputTuple element (indices 0..9).
1263 typedef typename std::tuple<
1264 PT<typename std::tuple_element<0,OutputTuple>::type>,
1265 PT<typename std::tuple_element<1,OutputTuple>::type>,
1266 PT<typename std::tuple_element<2,OutputTuple>::type>,
1267 PT<typename std::tuple_element<3,OutputTuple>::type>,
1268 PT<typename std::tuple_element<4,OutputTuple>::type>,
1269 PT<typename std::tuple_element<5,OutputTuple>::type>,
1270 PT<typename std::tuple_element<6,OutputTuple>::type>,
1271 PT<typename std::tuple_element<7,OutputTuple>::type>,
1272 PT<typename std::tuple_element<8,OutputTuple>::type>,
1273 PT<typename std::tuple_element<9,OutputTuple>::type> > input_ports_tuple_type;
1274 typedef OutputTuple output_type;
// Alias named in the constructors' initializer lists.
1276 typedef join_node_base<JP, input_ports_tuple_type, output_type > base_type;
// Construction from a graph and copy construction are both delegated to base_type.
1278 unfolded_join_node(graph &g) : base_type(g) {}
1279 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1282 // tag_matching unfolded_join_node. This must be a separate specialization because the constructors
//! tag_matching unfolded_join_node specialization for 2 input ports.
// Every port is a tag_matching_port<Ti>, where Ti is element i of OutputTuple.
1285 template<typename OutputTuple>
1286 class unfolded_join_node<2,tag_matching_port,OutputTuple,tag_matching> : public internal::join_node_base<tag_matching,
1288 tag_matching_port<typename std::tuple_element<0,OutputTuple>::type>,
1289 tag_matching_port<typename std::tuple_element<1,OutputTuple>::type> >,
// Element types of OutputTuple.
1293 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1294 typedef typename std::tuple_element<1, OutputTuple>::type T1;
// Tuple of the port types, one tag_matching_port per element.
1296 typedef typename std::tuple< tag_matching_port<T0>, tag_matching_port<T1> > input_ports_tuple_type;
1297 typedef OutputTuple output_type;
1299 typedef join_node_base<tag_matching, input_ports_tuple_type, output_type > base_type;
// fi_p is a pointer to the function_body<Ti, tag_value> for port i;
// func_initializer_type bundles one such pointer per port.
1300 typedef typename internal::function_body<T0, tag_value> *f0_p;
1301 typedef typename internal::function_body<T1, tag_value> *f1_p;
1302 typedef typename std::tuple< f0_p, f1_p > func_initializer_type;
// Wraps each body bi in a new function_body_leaf<Ti, tag_value, Bi> and hands the
// resulting tuple of pointers, together with g, to base_type.
// NOTE(review): who deletes these allocations is not shown here - presumably the
// ports take ownership; confirm.
1304 template<typename B0, typename B1>
1305 unfolded_join_node(graph &g, B0 b0, B1 b1) : base_type(g,
1306 func_initializer_type(
1307 new internal::function_body_leaf<T0, tag_value, B0>(b0),
1308 new internal::function_body_leaf<T1, tag_value, B1>(b1)
// Copy construction is delegated to base_type.
1310 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
//! tag_matching unfolded_join_node specialization for 3 input ports.
// Every port is a tag_matching_port<Ti>, where Ti is element i of OutputTuple.
1313 template<typename OutputTuple>
1314 class unfolded_join_node<3,tag_matching_port,OutputTuple,tag_matching> : public internal::join_node_base<tag_matching,
1316 tag_matching_port<typename std::tuple_element<0,OutputTuple>::type>,
1317 tag_matching_port<typename std::tuple_element<1,OutputTuple>::type>,
1318 tag_matching_port<typename std::tuple_element<2,OutputTuple>::type> >,
// Element types of OutputTuple.
1322 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1323 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1324 typedef typename std::tuple_element<2, OutputTuple>::type T2;
// Tuple of the port types, one tag_matching_port per element.
1326 typedef typename std::tuple< tag_matching_port<T0>, tag_matching_port<T1>, tag_matching_port<T2>
1327 > input_ports_tuple_type;
1328 typedef OutputTuple output_type;
1330 typedef join_node_base<tag_matching, input_ports_tuple_type, output_type > base_type;
// fi_p is a pointer to the function_body<Ti, tag_value> for port i;
// func_initializer_type bundles one such pointer per port.
1331 typedef typename internal::function_body<T0, tag_value> *f0_p;
1332 typedef typename internal::function_body<T1, tag_value> *f1_p;
1333 typedef typename internal::function_body<T2, tag_value> *f2_p;
1334 typedef typename std::tuple< f0_p, f1_p, f2_p > func_initializer_type;
// Wraps each body bi in a new function_body_leaf<Ti, tag_value, Bi> and hands the
// resulting tuple of pointers, together with g, to base_type.
// NOTE(review): who deletes these allocations is not shown here - presumably the
// ports take ownership; confirm.
1336 template<typename B0, typename B1, typename B2>
1337 unfolded_join_node(graph &g, B0 b0, B1 b1, B2 b2) : base_type(g,
1338 func_initializer_type(
1339 new internal::function_body_leaf<T0, tag_value, B0>(b0),
1340 new internal::function_body_leaf<T1, tag_value, B1>(b1),
1341 new internal::function_body_leaf<T2, tag_value, B2>(b2)
// Copy construction is delegated to base_type.
1343 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
//! tag_matching unfolded_join_node specialization for 4 input ports.
// Every port is a tag_matching_port<Ti>, where Ti is element i of OutputTuple.
1346 template<typename OutputTuple>
1347 class unfolded_join_node<4,tag_matching_port,OutputTuple,tag_matching> : public internal::join_node_base<tag_matching,
1349 tag_matching_port<typename std::tuple_element<0,OutputTuple>::type>,
1350 tag_matching_port<typename std::tuple_element<1,OutputTuple>::type>,
1351 tag_matching_port<typename std::tuple_element<2,OutputTuple>::type>,
1352 tag_matching_port<typename std::tuple_element<3,OutputTuple>::type>
// Element types of OutputTuple.
1355 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1356 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1357 typedef typename std::tuple_element<2, OutputTuple>::type T2;
1358 typedef typename std::tuple_element<3, OutputTuple>::type T3;
// Tuple of the port types, one tag_matching_port per element.
1360 typedef typename std::tuple< tag_matching_port<T0>, tag_matching_port<T1>, tag_matching_port<T2>,
1361 tag_matching_port<T3> > input_ports_tuple_type;
1362 typedef OutputTuple output_type;
1364 typedef join_node_base<tag_matching, input_ports_tuple_type, output_type > base_type;
// fi_p is a pointer to the function_body<Ti, tag_value> for port i;
// func_initializer_type bundles one such pointer per port.
1365 typedef typename internal::function_body<T0, tag_value> *f0_p;
1366 typedef typename internal::function_body<T1, tag_value> *f1_p;
1367 typedef typename internal::function_body<T2, tag_value> *f2_p;
1368 typedef typename internal::function_body<T3, tag_value> *f3_p;
1369 typedef typename std::tuple< f0_p, f1_p, f2_p, f3_p > func_initializer_type;
// Wraps each body bi in a new function_body_leaf<Ti, tag_value, Bi> and hands the
// resulting tuple of pointers, together with g, to base_type.
// NOTE(review): who deletes these allocations is not shown here - presumably the
// ports take ownership; confirm.
1371 template<typename B0, typename B1, typename B2, typename B3>
1372 unfolded_join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3) : base_type(g,
1373 func_initializer_type(
1374 new internal::function_body_leaf<T0, tag_value, B0>(b0),
1375 new internal::function_body_leaf<T1, tag_value, B1>(b1),
1376 new internal::function_body_leaf<T2, tag_value, B2>(b2),
1377 new internal::function_body_leaf<T3, tag_value, B3>(b3)
// Copy construction is delegated to base_type.
1379 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
//! tag_matching unfolded_join_node specialization for 5 input ports.
// Every port is a tag_matching_port<Ti>, where Ti is element i of OutputTuple.
1382 template<typename OutputTuple>
1383 class unfolded_join_node<5,tag_matching_port,OutputTuple,tag_matching> : public internal::join_node_base<tag_matching,
1385 tag_matching_port<typename std::tuple_element<0,OutputTuple>::type>,
1386 tag_matching_port<typename std::tuple_element<1,OutputTuple>::type>,
1387 tag_matching_port<typename std::tuple_element<2,OutputTuple>::type>,
1388 tag_matching_port<typename std::tuple_element<3,OutputTuple>::type>,
1389 tag_matching_port<typename std::tuple_element<4,OutputTuple>::type>
// Element types of OutputTuple.
1392 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1393 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1394 typedef typename std::tuple_element<2, OutputTuple>::type T2;
1395 typedef typename std::tuple_element<3, OutputTuple>::type T3;
1396 typedef typename std::tuple_element<4, OutputTuple>::type T4;
// Tuple of the port types, one tag_matching_port per element.
1398 typedef typename std::tuple< tag_matching_port<T0>, tag_matching_port<T1>, tag_matching_port<T2>,
1399 tag_matching_port<T3>, tag_matching_port<T4> > input_ports_tuple_type;
1400 typedef OutputTuple output_type;
1402 typedef join_node_base<tag_matching, input_ports_tuple_type, output_type > base_type;
// fi_p is a pointer to the function_body<Ti, tag_value> for port i;
// func_initializer_type bundles one such pointer per port.
1403 typedef typename internal::function_body<T0, tag_value> *f0_p;
1404 typedef typename internal::function_body<T1, tag_value> *f1_p;
1405 typedef typename internal::function_body<T2, tag_value> *f2_p;
1406 typedef typename internal::function_body<T3, tag_value> *f3_p;
1407 typedef typename internal::function_body<T4, tag_value> *f4_p;
1408 typedef typename std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p > func_initializer_type;
// Wraps each body bi in a new function_body_leaf<Ti, tag_value, Bi> and hands the
// resulting tuple of pointers, together with g, to base_type.
// NOTE(review): who deletes these allocations is not shown here - presumably the
// ports take ownership; confirm.
1410 template<typename B0, typename B1, typename B2, typename B3, typename B4>
1411 unfolded_join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4) : base_type(g,
1412 func_initializer_type(
1413 new internal::function_body_leaf<T0, tag_value, B0>(b0),
1414 new internal::function_body_leaf<T1, tag_value, B1>(b1),
1415 new internal::function_body_leaf<T2, tag_value, B2>(b2),
1416 new internal::function_body_leaf<T3, tag_value, B3>(b3),
1417 new internal::function_body_leaf<T4, tag_value, B4>(b4)
// Copy construction is delegated to base_type.
1419 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
//! tag_matching unfolded_join_node specialization for 6 input ports.
// Every port is a tag_matching_port<Ti>, where Ti is element i of OutputTuple.
1422 template<typename OutputTuple>
1423 class unfolded_join_node<6,tag_matching_port,OutputTuple,tag_matching> : public internal::join_node_base<tag_matching,
1425 tag_matching_port<typename std::tuple_element<0,OutputTuple>::type>,
1426 tag_matching_port<typename std::tuple_element<1,OutputTuple>::type>,
1427 tag_matching_port<typename std::tuple_element<2,OutputTuple>::type>,
1428 tag_matching_port<typename std::tuple_element<3,OutputTuple>::type>,
1429 tag_matching_port<typename std::tuple_element<4,OutputTuple>::type>,
1430 tag_matching_port<typename std::tuple_element<5,OutputTuple>::type>
// Element types of OutputTuple.
1433 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1434 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1435 typedef typename std::tuple_element<2, OutputTuple>::type T2;
1436 typedef typename std::tuple_element<3, OutputTuple>::type T3;
1437 typedef typename std::tuple_element<4, OutputTuple>::type T4;
1438 typedef typename std::tuple_element<5, OutputTuple>::type T5;
// Tuple of the port types, one tag_matching_port per element.
1440 typedef typename std::tuple< tag_matching_port<T0>, tag_matching_port<T1>, tag_matching_port<T2>,
1441 tag_matching_port<T3>, tag_matching_port<T4>, tag_matching_port<T5> > input_ports_tuple_type;
1442 typedef OutputTuple output_type;
1444 typedef join_node_base<tag_matching, input_ports_tuple_type, output_type > base_type;
// fi_p is a pointer to the function_body<Ti, tag_value> for port i;
// func_initializer_type bundles one such pointer per port.
1445 typedef typename internal::function_body<T0, tag_value> *f0_p;
1446 typedef typename internal::function_body<T1, tag_value> *f1_p;
1447 typedef typename internal::function_body<T2, tag_value> *f2_p;
1448 typedef typename internal::function_body<T3, tag_value> *f3_p;
1449 typedef typename internal::function_body<T4, tag_value> *f4_p;
1450 typedef typename internal::function_body<T5, tag_value> *f5_p;
1451 typedef typename std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p > func_initializer_type;
// Wraps each body bi in a new function_body_leaf<Ti, tag_value, Bi> and hands the
// resulting tuple of pointers, together with g, to base_type.
// NOTE(review): who deletes these allocations is not shown here - presumably the
// ports take ownership; confirm.
1453 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5>
1454 unfolded_join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5) : base_type(g,
1455 func_initializer_type(
1456 new internal::function_body_leaf<T0, tag_value, B0>(b0),
1457 new internal::function_body_leaf<T1, tag_value, B1>(b1),
1458 new internal::function_body_leaf<T2, tag_value, B2>(b2),
1459 new internal::function_body_leaf<T3, tag_value, B3>(b3),
1460 new internal::function_body_leaf<T4, tag_value, B4>(b4),
1461 new internal::function_body_leaf<T5, tag_value, B5>(b5)
// Copy construction is delegated to base_type.
1463 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
//! tag_matching unfolded_join_node specialization for 7 input ports.
// Every port is a tag_matching_port<Ti>, where Ti is element i of OutputTuple.
1466 template<typename OutputTuple>
1467 class unfolded_join_node<7,tag_matching_port,OutputTuple,tag_matching> : public internal::join_node_base<tag_matching,
1469 tag_matching_port<typename std::tuple_element<0,OutputTuple>::type>,
1470 tag_matching_port<typename std::tuple_element<1,OutputTuple>::type>,
1471 tag_matching_port<typename std::tuple_element<2,OutputTuple>::type>,
1472 tag_matching_port<typename std::tuple_element<3,OutputTuple>::type>,
1473 tag_matching_port<typename std::tuple_element<4,OutputTuple>::type>,
1474 tag_matching_port<typename std::tuple_element<5,OutputTuple>::type>,
1475 tag_matching_port<typename std::tuple_element<6,OutputTuple>::type>
// Element types of OutputTuple.
1478 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1479 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1480 typedef typename std::tuple_element<2, OutputTuple>::type T2;
1481 typedef typename std::tuple_element<3, OutputTuple>::type T3;
1482 typedef typename std::tuple_element<4, OutputTuple>::type T4;
1483 typedef typename std::tuple_element<5, OutputTuple>::type T5;
1484 typedef typename std::tuple_element<6, OutputTuple>::type T6;
// Tuple of the port types, one tag_matching_port per element.
1486 typedef typename std::tuple< tag_matching_port<T0>, tag_matching_port<T1>, tag_matching_port<T2>,
1487 tag_matching_port<T3>, tag_matching_port<T4>, tag_matching_port<T5>, tag_matching_port<T6>
1488 > input_ports_tuple_type;
1489 typedef OutputTuple output_type;
1491 typedef join_node_base<tag_matching, input_ports_tuple_type, output_type > base_type;
// fi_p is a pointer to the function_body<Ti, tag_value> for port i;
// func_initializer_type bundles one such pointer per port.
1492 typedef typename internal::function_body<T0, tag_value> *f0_p;
1493 typedef typename internal::function_body<T1, tag_value> *f1_p;
1494 typedef typename internal::function_body<T2, tag_value> *f2_p;
1495 typedef typename internal::function_body<T3, tag_value> *f3_p;
1496 typedef typename internal::function_body<T4, tag_value> *f4_p;
1497 typedef typename internal::function_body<T5, tag_value> *f5_p;
1498 typedef typename internal::function_body<T6, tag_value> *f6_p;
1499 typedef typename std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p > func_initializer_type;
// Wraps each body bi in a new function_body_leaf<Ti, tag_value, Bi> and hands the
// resulting tuple of pointers, together with g, to base_type.
// NOTE(review): who deletes these allocations is not shown here - presumably the
// ports take ownership; confirm.
1501 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6>
1502 unfolded_join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6) : base_type(g,
1503 func_initializer_type(
1504 new internal::function_body_leaf<T0, tag_value, B0>(b0),
1505 new internal::function_body_leaf<T1, tag_value, B1>(b1),
1506 new internal::function_body_leaf<T2, tag_value, B2>(b2),
1507 new internal::function_body_leaf<T3, tag_value, B3>(b3),
1508 new internal::function_body_leaf<T4, tag_value, B4>(b4),
1509 new internal::function_body_leaf<T5, tag_value, B5>(b5),
1510 new internal::function_body_leaf<T6, tag_value, B6>(b6)
// Copy construction is delegated to base_type.
1512 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
//! tag_matching unfolded_join_node specialization for 8 input ports.
// Every port is a tag_matching_port<Ti>, where Ti is element i of OutputTuple.
1515 template<typename OutputTuple>
1516 class unfolded_join_node<8,tag_matching_port,OutputTuple,tag_matching> : public internal::join_node_base<tag_matching,
1518 tag_matching_port<typename std::tuple_element<0,OutputTuple>::type>,
1519 tag_matching_port<typename std::tuple_element<1,OutputTuple>::type>,
1520 tag_matching_port<typename std::tuple_element<2,OutputTuple>::type>,
1521 tag_matching_port<typename std::tuple_element<3,OutputTuple>::type>,
1522 tag_matching_port<typename std::tuple_element<4,OutputTuple>::type>,
1523 tag_matching_port<typename std::tuple_element<5,OutputTuple>::type>,
1524 tag_matching_port<typename std::tuple_element<6,OutputTuple>::type>,
1525 tag_matching_port<typename std::tuple_element<7,OutputTuple>::type>
// Element types of OutputTuple.
1528 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1529 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1530 typedef typename std::tuple_element<2, OutputTuple>::type T2;
1531 typedef typename std::tuple_element<3, OutputTuple>::type T3;
1532 typedef typename std::tuple_element<4, OutputTuple>::type T4;
1533 typedef typename std::tuple_element<5, OutputTuple>::type T5;
1534 typedef typename std::tuple_element<6, OutputTuple>::type T6;
1535 typedef typename std::tuple_element<7, OutputTuple>::type T7;
// Tuple of the port types, one tag_matching_port per element.
1537 typedef typename std::tuple< tag_matching_port<T0>, tag_matching_port<T1>, tag_matching_port<T2>,
1538 tag_matching_port<T3>, tag_matching_port<T4>, tag_matching_port<T5>, tag_matching_port<T6>,
1539 tag_matching_port<T7> > input_ports_tuple_type;
1540 typedef OutputTuple output_type;
1542 typedef join_node_base<tag_matching, input_ports_tuple_type, output_type > base_type;
// fi_p is a pointer to the function_body<Ti, tag_value> for port i;
// func_initializer_type bundles one such pointer per port.
1543 typedef typename internal::function_body<T0, tag_value> *f0_p;
1544 typedef typename internal::function_body<T1, tag_value> *f1_p;
1545 typedef typename internal::function_body<T2, tag_value> *f2_p;
1546 typedef typename internal::function_body<T3, tag_value> *f3_p;
1547 typedef typename internal::function_body<T4, tag_value> *f4_p;
1548 typedef typename internal::function_body<T5, tag_value> *f5_p;
1549 typedef typename internal::function_body<T6, tag_value> *f6_p;
1550 typedef typename internal::function_body<T7, tag_value> *f7_p;
1551 typedef typename std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p > func_initializer_type;
// Wraps each body bi in a new function_body_leaf<Ti, tag_value, Bi> and hands the
// resulting tuple of pointers, together with g, to base_type.
// NOTE(review): who deletes these allocations is not shown here - presumably the
// ports take ownership; confirm.
1553 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7>
1554 unfolded_join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7) : base_type(g,
1555 func_initializer_type(
1556 new internal::function_body_leaf<T0, tag_value, B0>(b0),
1557 new internal::function_body_leaf<T1, tag_value, B1>(b1),
1558 new internal::function_body_leaf<T2, tag_value, B2>(b2),
1559 new internal::function_body_leaf<T3, tag_value, B3>(b3),
1560 new internal::function_body_leaf<T4, tag_value, B4>(b4),
1561 new internal::function_body_leaf<T5, tag_value, B5>(b5),
1562 new internal::function_body_leaf<T6, tag_value, B6>(b6),
1563 new internal::function_body_leaf<T7, tag_value, B7>(b7)
// Copy construction is delegated to base_type.
1565 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
//! tag_matching unfolded_join_node specialization for 9 input ports.
// Every port is a tag_matching_port<Ti>, where Ti is element i of OutputTuple.
1568 template<typename OutputTuple>
1569 class unfolded_join_node<9,tag_matching_port,OutputTuple,tag_matching> : public internal::join_node_base<tag_matching,
1571 tag_matching_port<typename std::tuple_element<0,OutputTuple>::type>,
1572 tag_matching_port<typename std::tuple_element<1,OutputTuple>::type>,
1573 tag_matching_port<typename std::tuple_element<2,OutputTuple>::type>,
1574 tag_matching_port<typename std::tuple_element<3,OutputTuple>::type>,
1575 tag_matching_port<typename std::tuple_element<4,OutputTuple>::type>,
1576 tag_matching_port<typename std::tuple_element<5,OutputTuple>::type>,
1577 tag_matching_port<typename std::tuple_element<6,OutputTuple>::type>,
1578 tag_matching_port<typename std::tuple_element<7,OutputTuple>::type>,
1579 tag_matching_port<typename std::tuple_element<8,OutputTuple>::type>
// Element types of OutputTuple.
1582 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1583 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1584 typedef typename std::tuple_element<2, OutputTuple>::type T2;
1585 typedef typename std::tuple_element<3, OutputTuple>::type T3;
1586 typedef typename std::tuple_element<4, OutputTuple>::type T4;
1587 typedef typename std::tuple_element<5, OutputTuple>::type T5;
1588 typedef typename std::tuple_element<6, OutputTuple>::type T6;
1589 typedef typename std::tuple_element<7, OutputTuple>::type T7;
1590 typedef typename std::tuple_element<8, OutputTuple>::type T8;
// Tuple of the port types, one tag_matching_port per element.
1592 typedef typename std::tuple< tag_matching_port<T0>, tag_matching_port<T1>, tag_matching_port<T2>,
1593 tag_matching_port<T3>, tag_matching_port<T4>, tag_matching_port<T5>, tag_matching_port<T6>,
1594 tag_matching_port<T7>, tag_matching_port<T8> > input_ports_tuple_type;
1595 typedef OutputTuple output_type;
1597 typedef join_node_base<tag_matching, input_ports_tuple_type, output_type > base_type;
// fi_p is a pointer to the function_body<Ti, tag_value> for port i;
// func_initializer_type bundles one such pointer per port.
1598 typedef typename internal::function_body<T0, tag_value> *f0_p;
1599 typedef typename internal::function_body<T1, tag_value> *f1_p;
1600 typedef typename internal::function_body<T2, tag_value> *f2_p;
1601 typedef typename internal::function_body<T3, tag_value> *f3_p;
1602 typedef typename internal::function_body<T4, tag_value> *f4_p;
1603 typedef typename internal::function_body<T5, tag_value> *f5_p;
1604 typedef typename internal::function_body<T6, tag_value> *f6_p;
1605 typedef typename internal::function_body<T7, tag_value> *f7_p;
1606 typedef typename internal::function_body<T8, tag_value> *f8_p;
1607 typedef typename std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p > func_initializer_type;
// Wraps each body bi in a new function_body_leaf<Ti, tag_value, Bi> and hands the
// resulting tuple of pointers, together with g, to base_type.
// NOTE(review): who deletes these allocations is not shown here - presumably the
// ports take ownership; confirm.
1609 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8>
1610 unfolded_join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8) : base_type(g,
1611 func_initializer_type(
1612 new internal::function_body_leaf<T0, tag_value, B0>(b0),
1613 new internal::function_body_leaf<T1, tag_value, B1>(b1),
1614 new internal::function_body_leaf<T2, tag_value, B2>(b2),
1615 new internal::function_body_leaf<T3, tag_value, B3>(b3),
1616 new internal::function_body_leaf<T4, tag_value, B4>(b4),
1617 new internal::function_body_leaf<T5, tag_value, B5>(b5),
1618 new internal::function_body_leaf<T6, tag_value, B6>(b6),
1619 new internal::function_body_leaf<T7, tag_value, B7>(b7),
1620 new internal::function_body_leaf<T8, tag_value, B8>(b8)
// Copy construction is delegated to base_type.
1622 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
//! tag_matching unfolded_join_node specialization for 10 input ports.
// Every port is a tag_matching_port<Ti>, where Ti is element i of OutputTuple.
1625 template<typename OutputTuple>
1626 class unfolded_join_node<10,tag_matching_port,OutputTuple,tag_matching> : public internal::join_node_base<tag_matching,
1628 tag_matching_port<typename std::tuple_element<0,OutputTuple>::type>,
1629 tag_matching_port<typename std::tuple_element<1,OutputTuple>::type>,
1630 tag_matching_port<typename std::tuple_element<2,OutputTuple>::type>,
1631 tag_matching_port<typename std::tuple_element<3,OutputTuple>::type>,
1632 tag_matching_port<typename std::tuple_element<4,OutputTuple>::type>,
1633 tag_matching_port<typename std::tuple_element<5,OutputTuple>::type>,
1634 tag_matching_port<typename std::tuple_element<6,OutputTuple>::type>,
1635 tag_matching_port<typename std::tuple_element<7,OutputTuple>::type>,
1636 tag_matching_port<typename std::tuple_element<8,OutputTuple>::type>,
1637 tag_matching_port<typename std::tuple_element<9,OutputTuple>::type>
// Element types of OutputTuple.
1640 typedef typename std::tuple_element<0, OutputTuple>::type T0;
1641 typedef typename std::tuple_element<1, OutputTuple>::type T1;
1642 typedef typename std::tuple_element<2, OutputTuple>::type T2;
1643 typedef typename std::tuple_element<3, OutputTuple>::type T3;
1644 typedef typename std::tuple_element<4, OutputTuple>::type T4;
1645 typedef typename std::tuple_element<5, OutputTuple>::type T5;
1646 typedef typename std::tuple_element<6, OutputTuple>::type T6;
1647 typedef typename std::tuple_element<7, OutputTuple>::type T7;
1648 typedef typename std::tuple_element<8, OutputTuple>::type T8;
1649 typedef typename std::tuple_element<9, OutputTuple>::type T9;
// Tuple of the port types, one tag_matching_port per element.
1651 typedef typename std::tuple< tag_matching_port<T0>, tag_matching_port<T1>, tag_matching_port<T2>,
1652 tag_matching_port<T3>, tag_matching_port<T4>, tag_matching_port<T5>, tag_matching_port<T6>,
1653 tag_matching_port<T7>, tag_matching_port<T8>, tag_matching_port<T9> > input_ports_tuple_type;
1654 typedef OutputTuple output_type;
1656 typedef join_node_base<tag_matching, input_ports_tuple_type, output_type > base_type;
// fi_p is a pointer to the function_body<Ti, tag_value> for port i;
// func_initializer_type bundles one such pointer per port.
1657 typedef typename internal::function_body<T0, tag_value> *f0_p;
1658 typedef typename internal::function_body<T1, tag_value> *f1_p;
1659 typedef typename internal::function_body<T2, tag_value> *f2_p;
1660 typedef typename internal::function_body<T3, tag_value> *f3_p;
1661 typedef typename internal::function_body<T4, tag_value> *f4_p;
1662 typedef typename internal::function_body<T5, tag_value> *f5_p;
1663 typedef typename internal::function_body<T6, tag_value> *f6_p;
1664 typedef typename internal::function_body<T7, tag_value> *f7_p;
1665 typedef typename internal::function_body<T8, tag_value> *f8_p;
1666 typedef typename internal::function_body<T9, tag_value> *f9_p;
1667 typedef typename std::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p, f9_p > func_initializer_type;
// Wraps each body bi in a new function_body_leaf<Ti, tag_value, Bi> and hands the
// resulting tuple of pointers, together with g, to base_type.
// NOTE(review): who deletes these allocations is not shown here - presumably the
// ports take ownership; confirm.
1669 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8, typename B9>
1670 unfolded_join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8, B9 b9) : base_type(g,
1671 func_initializer_type(
1672 new internal::function_body_leaf<T0, tag_value, B0>(b0),
1673 new internal::function_body_leaf<T1, tag_value, B1>(b1),
1674 new internal::function_body_leaf<T2, tag_value, B2>(b2),
1675 new internal::function_body_leaf<T3, tag_value, B3>(b3),
1676 new internal::function_body_leaf<T4, tag_value, B4>(b4),
1677 new internal::function_body_leaf<T5, tag_value, B5>(b5),
1678 new internal::function_body_leaf<T6, tag_value, B6>(b6),
1679 new internal::function_body_leaf<T7, tag_value, B7>(b7),
1680 new internal::function_body_leaf<T8, tag_value, B8>(b8),
1681 new internal::function_body_leaf<T9, tag_value, B9>(b9)
// Copy construction is delegated to base_type.
1683 unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1686 //! templated function to refer to input ports of the join node
// N is the zero-based port index. JNT must provide an input_ports_tuple_type typedef
// and an inputs() member whose result std::get<N> can be applied to.
// Returns a reference to element N of jn.inputs().
1687 template<size_t N, typename JNT>
1688 typename std::tuple_element<N, typename JNT::input_ports_tuple_type>::type &input_port(JNT &jn) {
1689 return std::get<N>(jn.inputs());