1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
5 #if !defined(RXCPP_OPERATORS_RX_SUBSCRIBE_ON_HPP)
6 #define RXCPP_OPERATORS_RX_SUBSCRIBE_ON_HPP
8 #include "../rx-includes.hpp"
16 template<class T, class Observable, class Coordination>
17 struct subscribe_on : public operator_base<T>
19 typedef typename std::decay<Observable>::type source_type;
20 typedef typename std::decay<Coordination>::type coordination_type;
21 typedef typename coordination_type::coordinator_type coordinator_type;
22 struct subscribe_on_values
24 ~subscribe_on_values()
27 subscribe_on_values(source_type s, coordination_type sf)
28 : source(std::move(s))
29 , coordination(std::move(sf))
33 coordination_type coordination;
35 const subscribe_on_values initial;
40 subscribe_on(source_type s, coordination_type sf)
41 : initial(std::move(s), std::move(sf))
45 template<class Subscriber>
46 void on_subscribe(Subscriber s) const {
48 typedef Subscriber output_type;
49 struct subscribe_on_state_type
50 : public std::enable_shared_from_this<subscribe_on_state_type>
51 , public subscribe_on_values
53 subscribe_on_state_type(const subscribe_on_values& i, coordinator_type coor, const output_type& oarg)
54 : subscribe_on_values(i)
55 , coordinator(std::move(coor))
59 composite_subscription source_lifetime;
60 coordinator_type coordinator;
64 auto coordinator = initial.coordination.create_coordinator(s.get_subscription());
66 auto controller = coordinator.get_worker();
68 // take a copy of the values for each subscription
69 auto state = std::shared_ptr<subscribe_on_state_type>(new subscribe_on_state_type(initial, std::move(coordinator), std::move(s)));
71 auto disposer = [=](const rxsc::schedulable&){
72 state->source_lifetime.unsubscribe();
73 state->out.unsubscribe();
75 auto selectedDisposer = on_exception(
76 [&](){return state->coordinator.act(disposer);},
78 if (selectedDisposer.empty()) {
83 controller.schedule(selectedDisposer.get());
85 state->source_lifetime.add([=](){
86 controller.schedule(selectedDisposer.get());
89 auto producer = [=](const rxsc::schedulable&){
90 state->source.subscribe(state->source_lifetime, state->out);
93 auto selectedProducer = on_exception(
94 [&](){return state->coordinator.act(producer);},
96 if (selectedProducer.empty()) {
100 controller.schedule(selectedProducer.get());
104 template<class Coordination>
105 class subscribe_on_factory
107 typedef typename std::decay<Coordination>::type coordination_type;
109 coordination_type coordination;
111 subscribe_on_factory(coordination_type sf)
112 : coordination(std::move(sf))
115 template<class Observable>
116 auto operator()(Observable&& source)
117 -> observable<typename std::decay<Observable>::type::value_type, subscribe_on<typename std::decay<Observable>::type::value_type, Observable, Coordination>> {
118 return observable<typename std::decay<Observable>::type::value_type, subscribe_on<typename std::decay<Observable>::type::value_type, Observable, Coordination>>(
119 subscribe_on<typename std::decay<Observable>::type::value_type, Observable, Coordination>(std::forward<Observable>(source), coordination));
125 template<class Coordination>
126 auto subscribe_on(Coordination sf)
127 -> detail::subscribe_on_factory<Coordination> {
128 return detail::subscribe_on_factory<Coordination>(std::move(sf));