1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
5 #if !defined(RXCPP_OPERATORS_RX_MERGE_HPP)
6 #define RXCPP_OPERATORS_RX_MERGE_HPP
8 #include "../rx-includes.hpp"
// NOTE(review): this listing embeds the original file's line numbers and omits
// some original lines (struct headers, braces, some call arguments). The
// comments added below describe only what the visible lines establish.
//
// merge operator (named by the this_type typedef below). T is decayed to
// source_value_type and the operator's own value_type is T's value_type;
// the base class is operator_base of that same value type.
16 template<class T, class Observable, class Coordination>
18 : public operator_base<typename std::decay<T>::type::value_type>
20 //static_assert(is_observable<Observable>::value, "merge requires an observable");
21 //static_assert(is_observable<T>::value, "merge requires an observable that contains observables");
23 typedef merge<T, Observable, Coordination> this_type;
// Decayed forms of the template arguments, used for members and signatures.
25 typedef typename std::decay<T>::type source_value_type;
26 typedef typename std::decay<Observable>::type source_type;
28 typedef typename source_type::source_operator_type source_operator_type;
29 typedef typename source_value_type::value_type value_type;
31 typedef typename std::decay<Coordination>::type coordination_type;
32 typedef typename coordination_type::coordinator_type coordinator_type;
// Constructor of `values` (its struct header is not visible in this listing):
// takes the source operator and the coordination by value and moves both into
// the members declared just below.
36 values(source_operator_type o, coordination_type sf)
37 : source_operator(std::move(o))
38 , coordination(std::move(sf))
41 source_operator_type source_operator;
42 coordination_type coordination;
// Operator constructor: copies o.source_operator and moves the coordination
// into `initial` (presumably a `values` member; its declaration is not visible).
46 merge(const source_type& o, coordination_type sf)
47 : initial(o.source_operator, std::move(sf))
// Subscription entry point. Subscriber must satisfy is_subscriber (enforced by
// the static_assert); it becomes the output that merged values are forwarded to.
51 template<class Subscriber>
52 void on_subscribe(Subscriber scbr) const {
53 static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
55 typedef Subscriber output_type;
// State for one on_subscribe call. It is held in a std::shared_ptr and
// captured by value in the callbacks below.
57 struct merge_state_type
58 : public std::enable_shared_from_this<merge_state_type>
// Rebuilds the source observable from the stored source operator, starts the
// completion counter at zero and takes ownership of the coordinator and output.
// NOTE(review): the first mem-initializer (original line 62) is not visible.
61 merge_state_type(values i, coordinator_type coor, output_type oarg)
63 , source(i.source_operator)
64 , pendingCompletions(0)
65 , coordinator(std::move(coor))
66 , out(std::move(oarg))
69 observable<source_value_type, source_operator_type> source;
70 // on_completed on the output must wait until all the
71 // subscriptions have received on_completed
72 int pendingCompletions;
73 coordinator_type coordinator;
// The coordinator is created from the stored coordination and the
// subscriber's subscription.
77 auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription());
79 // take a copy of the values for each subscription
80 auto state = std::shared_ptr<merge_state_type>(new merge_state_type(initial, std::move(coordinator), std::move(scbr)));
82 composite_subscription outercs;
84 // when the out observer is unsubscribed all the
85 // inner subscriptions are unsubscribed as well
86 state->out.add(outercs);
// Pass the source through coordinator.in(), wrapped in on_exception.
// The remaining on_exception arguments and the check of the result are not
// visible in this listing.
88 auto source = on_exception(
89 [&](){return state->coordinator.in(state->source);},
// Incremented before the sink for the source is built; the matching decrement
// is in the sink's completion branch further down.
95 ++state->pendingCompletions;
96 // this subscribe does not share the observer subscription
97 // so that when it is unsubscribed the observer can be called
98 // until the inner subscriptions have finished
99 auto sink = make_subscriber<source_value_type>(
// Callback receiving each source_value_type value `st` from the source.
// Leading make_subscriber arguments (original lines 100-102) are not visible.
103 [state](source_value_type st) {
105 composite_subscription innercs;
107 // when the out observer is unsubscribed all the
108 // inner subscriptions are unsubscribed as well
109 auto innercstoken = state->out.add(innercs);
// When innercs is unsubscribed, the token registered on the output is removed.
111 innercs.add(make_subscription([state, innercstoken](){
112 state->out.remove(innercstoken);
// Pass the received value through coordinator.in(), wrapped in on_exception;
// an empty result is checked for (the body of the check is not visible).
115 auto selectedSource = on_exception(
116 [&](){return state->coordinator.in(st);},
118 if (selectedSource.empty()) {
// One more completion to wait for: the inner subscription set up below.
122 ++state->pendingCompletions;
123 // this subscribe does not share the source subscription
124 // so that when it is unsubscribed the source will continue
125 auto sinkInner = make_subscriber<value_type>(
// Forward each inner value to the output, moving it.
// NOTE(review): `st` is captured but unused in the visible body - presumably to
// keep the inner observable alive while values flow; confirm.
129 [state, st](value_type ct) {
130 state->out.on_next(std::move(ct));
// Forward an error from the inner subscription to the output.
133 [state](std::exception_ptr e) {
134 state->out.on_error(e);
// Completion branch (enclosing lambda header not visible): decrement the
// counter and complete the output only when it reaches zero.
138 if (--state->pendingCompletions == 0) {
139 state->out.on_completed();
// Pass the inner sink through coordinator.out(), wrapped in on_exception, then
// subscribe the coordinated inner source to it.
143 auto selectedSinkInner = on_exception(
144 [&](){return state->coordinator.out(sinkInner);},
146 if (selectedSinkInner.empty()) {
149 selectedSource->subscribe(std::move(selectedSinkInner.get()));
// Forward an error from the source subscription to the output.
152 [state](std::exception_ptr e) {
153 state->out.on_error(e);
// Completion branch for the source (enclosing lambda header not visible):
// same counter rule as for the inner subscriptions.
157 if (--state->pendingCompletions == 0) {
158 state->out.on_completed();
// Pass the sink through coordinator.out(), wrapped in on_exception, then
// subscribe the coordinated source to it.
162 auto selectedSink = on_exception(
163 [&](){return state->coordinator.out(sink);},
165 if (selectedSink.empty()) {
168 source->subscribe(std::move(selectedSink.get()));
// NOTE(review): the class header line for merge_factory is not visible in this
// listing; the name is taken from the constructor below.
//
// Factory that stores a (decayed) coordination and, when applied to a source
// observable, wraps that source in a merge operator.
172 template<class Coordination>
175 typedef typename std::decay<Coordination>::type coordination_type;
177 coordination_type coordination;
// Takes the coordination by value and moves it into the member.
179 merge_factory(coordination_type sf)
180 : coordination(std::move(sf))
// Builds merge<Observable::value_type, Observable, Coordination> from `source`
// (moved) and a copy of the stored coordination, and returns it wrapped in an
// observable whose value type is that merge operator's value_type.
184 template<class Observable>
185 auto operator()(Observable source)
186 -> observable<typename merge<typename Observable::value_type, Observable, Coordination>::value_type, merge<typename Observable::value_type, Observable, Coordination>> {
187 return observable<typename merge<typename Observable::value_type, Observable, Coordination>::value_type, merge<typename Observable::value_type, Observable, Coordination>>(
188 merge<typename Observable::value_type, Observable, Coordination>(std::move(source), coordination));
// Entry point taking a coordination: perfectly forwards `sf` into a
// detail::merge_factory<Coordination> and returns that factory.
// NOTE(review): the function's closing brace is not visible in this listing.
194 template<class Coordination>
195 auto merge(Coordination&& sf)
196 -> detail::merge_factory<Coordination> {
197 return detail::merge_factory<Coordination>(std::forward<Coordination>(sf));