git.sesse.net Git - casparcg/blob - dependencies64/RxCpp/include/rxcpp/operators/rx-subscribe_on.hpp
* Added RxCpp library for LINQ api, replacing Boost.Range based iterations where...
[casparcg] / dependencies64 / RxCpp / include / rxcpp / operators / rx-subscribe_on.hpp
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 #if !defined(RXCPP_OPERATORS_RX_SUBSCRIBE_ON_HPP)
6 #define RXCPP_OPERATORS_RX_SUBSCRIBE_ON_HPP
7
8 #include "../rx-includes.hpp"
9
10 namespace rxcpp {
11
12 namespace operators {
13
14 namespace detail {
15
16 template<class T, class Observable, class Coordination>
17 struct subscribe_on : public operator_base<T>
18 {
19     typedef typename std::decay<Observable>::type source_type;
20     typedef typename std::decay<Coordination>::type coordination_type;
21     typedef typename coordination_type::coordinator_type coordinator_type;
22     struct subscribe_on_values
23     {
24         ~subscribe_on_values()
25         {
26         }
27         subscribe_on_values(source_type s, coordination_type sf)
28             : source(std::move(s))
29             , coordination(std::move(sf))
30         {
31         }
32         source_type source;
33         coordination_type coordination;
34     };
35     const subscribe_on_values initial;
36
37     ~subscribe_on()
38     {
39     }
40     subscribe_on(source_type s, coordination_type sf)
41         : initial(std::move(s), std::move(sf))
42     {
43     }
44
45     template<class Subscriber>
46     void on_subscribe(Subscriber s) const {
47
48         typedef Subscriber output_type;
49         struct subscribe_on_state_type
50             : public std::enable_shared_from_this<subscribe_on_state_type>
51             , public subscribe_on_values
52         {
53             subscribe_on_state_type(const subscribe_on_values& i, coordinator_type coor, const output_type& oarg)
54                 : subscribe_on_values(i)
55                 , coordinator(std::move(coor))
56                 , out(oarg)
57             {
58             }
59             composite_subscription source_lifetime;
60             coordinator_type coordinator;
61             output_type out;
62         };
63
64         auto coordinator = initial.coordination.create_coordinator(s.get_subscription());
65
66         auto controller = coordinator.get_worker();
67
68         // take a copy of the values for each subscription
69         auto state = std::shared_ptr<subscribe_on_state_type>(new subscribe_on_state_type(initial, std::move(coordinator), std::move(s)));
70
71         auto disposer = [=](const rxsc::schedulable&){
72             state->source_lifetime.unsubscribe();
73             state->out.unsubscribe();
74         };
75         auto selectedDisposer = on_exception(
76             [&](){return state->coordinator.act(disposer);},
77             state->out);
78         if (selectedDisposer.empty()) {
79             return;
80         }
81
82         state->out.add([=](){
83             controller.schedule(selectedDisposer.get());
84         });
85         state->source_lifetime.add([=](){
86             controller.schedule(selectedDisposer.get());
87         });
88
89         auto producer = [=](const rxsc::schedulable&){
90             state->source.subscribe(state->source_lifetime, state->out);
91         };
92
93         auto selectedProducer = on_exception(
94             [&](){return state->coordinator.act(producer);},
95             state->out);
96         if (selectedProducer.empty()) {
97             return;
98         }
99
100         controller.schedule(selectedProducer.get());
101     }
102 };
103
104 template<class Coordination>
105 class subscribe_on_factory
106 {
107     typedef typename std::decay<Coordination>::type coordination_type;
108
109     coordination_type coordination;
110 public:
111     subscribe_on_factory(coordination_type sf)
112         : coordination(std::move(sf))
113     {
114     }
115     template<class Observable>
116     auto operator()(Observable&& source)
117         ->      observable<typename std::decay<Observable>::type::value_type,   subscribe_on<typename std::decay<Observable>::type::value_type, Observable, Coordination>> {
118         return  observable<typename std::decay<Observable>::type::value_type,   subscribe_on<typename std::decay<Observable>::type::value_type, Observable, Coordination>>(
119                                                                                 subscribe_on<typename std::decay<Observable>::type::value_type, Observable, Coordination>(std::forward<Observable>(source), coordination));
120     }
121 };
122
123 }
124
125 template<class Coordination>
126 auto subscribe_on(Coordination sf)
127     ->      detail::subscribe_on_factory<Coordination> {
128     return  detail::subscribe_on_factory<Coordination>(std::move(sf));
129 }
130
131 }
132
133 }
134
135 #endif