]> git.sesse.net Git - casparcg/blob - dependencies64/RxCpp/include/rxcpp/operators/rx-merge.hpp
* Added RxCpp library for LINQ api, replacing Boost.Range based iterations where...
[casparcg] / dependencies64 / RxCpp / include / rxcpp / operators / rx-merge.hpp
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 #if !defined(RXCPP_OPERATORS_RX_MERGE_HPP)
6 #define RXCPP_OPERATORS_RX_MERGE_HPP
7
8 #include "../rx-includes.hpp"
9
10 namespace rxcpp {
11
12 namespace operators {
13
14 namespace detail {
15
16 template<class T, class Observable, class Coordination>
17 struct merge
18     : public operator_base<typename std::decay<T>::type::value_type>
19 {
20     //static_assert(is_observable<Observable>::value, "merge requires an observable");
21     //static_assert(is_observable<T>::value, "merge requires an observable that contains observables");
22
23     typedef merge<T, Observable, Coordination> this_type;
24
25     typedef typename std::decay<T>::type source_value_type;
26     typedef typename std::decay<Observable>::type source_type;
27
28     typedef typename source_type::source_operator_type source_operator_type;
29     typedef typename source_value_type::value_type value_type;
30
31     typedef typename std::decay<Coordination>::type coordination_type;
32     typedef typename coordination_type::coordinator_type coordinator_type;
33
34     struct values
35     {
36         values(source_operator_type o, coordination_type sf)
37             : source_operator(std::move(o))
38             , coordination(std::move(sf))
39         {
40         }
41         source_operator_type source_operator;
42         coordination_type coordination;
43     };
44     values initial;
45
46     merge(const source_type& o, coordination_type sf)
47         : initial(o.source_operator, std::move(sf))
48     {
49     }
50
51     template<class Subscriber>
52     void on_subscribe(Subscriber scbr) const {
53         static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
54
55         typedef Subscriber output_type;
56
57         struct merge_state_type
58             : public std::enable_shared_from_this<merge_state_type>
59             , public values
60         {
61             merge_state_type(values i, coordinator_type coor, output_type oarg)
62                 : values(i)
63                 , source(i.source_operator)
64                 , pendingCompletions(0)
65                 , coordinator(std::move(coor))
66                 , out(std::move(oarg))
67             {
68             }
69             observable<source_value_type, source_operator_type> source;
70             // on_completed on the output must wait until all the
71             // subscriptions have received on_completed
72             int pendingCompletions;
73             coordinator_type coordinator;
74             output_type out;
75         };
76
77         auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
78
79         // take a copy of the values for each subscription
80         auto state = std::shared_ptr<merge_state_type>(new merge_state_type(initial, std::move(coordinator), std::move(scbr)));
81
82         composite_subscription outercs;
83
84         // when the out observer is unsubscribed all the
85         // inner subscriptions are unsubscribed as well
86         state->out.add(outercs);
87
88         auto source = on_exception(
89             [&](){return state->coordinator.in(state->source);},
90             state->out);
91         if (source.empty()) {
92             return;
93         }
94
95         ++state->pendingCompletions;
96         // this subscribe does not share the observer subscription
97         // so that when it is unsubscribed the observer can be called
98         // until the inner subscriptions have finished
99         auto sink = make_subscriber<source_value_type>(
100             state->out,
101             outercs,
102         // on_next
103             [state](source_value_type st) {
104
105                 composite_subscription innercs;
106
107                 // when the out observer is unsubscribed all the
108                 // inner subscriptions are unsubscribed as well
109                 auto innercstoken = state->out.add(innercs);
110
111                 innercs.add(make_subscription([state, innercstoken](){
112                     state->out.remove(innercstoken);
113                 }));
114
115                 auto selectedSource = on_exception(
116                     [&](){return state->coordinator.in(st);},
117                     state->out);
118                 if (selectedSource.empty()) {
119                     return;
120                 }
121
122                 ++state->pendingCompletions;
123                 // this subscribe does not share the source subscription
124                 // so that when it is unsubscribed the source will continue
125                 auto sinkInner = make_subscriber<value_type>(
126                     state->out,
127                     innercs,
128                 // on_next
129                     [state, st](value_type ct) {
130                         state->out.on_next(std::move(ct));
131                     },
132                 // on_error
133                     [state](std::exception_ptr e) {
134                         state->out.on_error(e);
135                     },
136                 //on_completed
137                     [state](){
138                         if (--state->pendingCompletions == 0) {
139                             state->out.on_completed();
140                         }
141                     }
142                 );
143                 auto selectedSinkInner = on_exception(
144                     [&](){return state->coordinator.out(sinkInner);},
145                     state->out);
146                 if (selectedSinkInner.empty()) {
147                     return;
148                 }
149                 selectedSource->subscribe(std::move(selectedSinkInner.get()));
150             },
151         // on_error
152             [state](std::exception_ptr e) {
153                 state->out.on_error(e);
154             },
155         // on_completed
156             [state]() {
157                 if (--state->pendingCompletions == 0) {
158                     state->out.on_completed();
159                 }
160             }
161         );
162         auto selectedSink = on_exception(
163             [&](){return state->coordinator.out(sink);},
164             state->out);
165         if (selectedSink.empty()) {
166             return;
167         }
168         source->subscribe(std::move(selectedSink.get()));
169     }
170 };
171
172 template<class Coordination>
173 class merge_factory
174 {
175     typedef typename std::decay<Coordination>::type coordination_type;
176
177     coordination_type coordination;
178 public:
179     merge_factory(coordination_type sf)
180         : coordination(std::move(sf))
181     {
182     }
183
184     template<class Observable>
185     auto operator()(Observable source)
186         ->      observable<typename merge<typename Observable::value_type, Observable, Coordination>::value_type,   merge<typename Observable::value_type, Observable, Coordination>> {
187         return  observable<typename merge<typename Observable::value_type, Observable, Coordination>::value_type,   merge<typename Observable::value_type, Observable, Coordination>>(
188                                                                                                                     merge<typename Observable::value_type, Observable, Coordination>(std::move(source), coordination));
189     }
190 };
191
192 }
193
194 template<class Coordination>
195 auto merge(Coordination&& sf)
196     ->      detail::merge_factory<Coordination> {
197     return  detail::merge_factory<Coordination>(std::forward<Coordination>(sf));
198 }
199
200 }
201
202 }
203
204 #endif