git.sesse.net Git - casparcg/blob - dependencies64/RxCpp/include/rxcpp/operators/rx-multicast.hpp
* Added RxCpp library for LINQ api, replacing Boost.Range based iterations where...
[casparcg] / dependencies64 / RxCpp / include / rxcpp / operators / rx-multicast.hpp
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 #if !defined(RXCPP_OPERATORS_RX_MULTICAST_HPP)
6 #define RXCPP_OPERATORS_RX_MULTICAST_HPP
7
8 #include "../rx-includes.hpp"
9
10 namespace rxcpp {
11
12 namespace operators {
13
14 namespace detail {
15
16 template<class T, class Observable, class Subject>
17 struct multicast : public operator_base<T>
18 {
19     typedef typename std::decay<Observable>::type source_type;
20     typedef typename std::decay<Subject>::type subject_type;
21
22     struct multicast_state : public std::enable_shared_from_this<multicast_state>
23     {
24         multicast_state(source_type o, subject_type sub)
25             : source(std::move(o))
26             , subject_value(std::move(sub))
27         {
28         }
29         source_type source;
30         subject_type subject_value;
31         rxu::detail::maybe<typename composite_subscription::weak_subscription> connection;
32     };
33
34     std::shared_ptr<multicast_state> state;
35
36     multicast(source_type o, subject_type sub)
37         : state(std::make_shared<multicast_state>(std::move(o), std::move(sub)))
38     {
39     }
40     template<class Subscriber>
41     void on_subscribe(Subscriber&& o) const {
42         state->subject_value.get_observable().subscribe(std::forward<Subscriber>(o));
43     }
44     void on_connect(composite_subscription cs) const {
45         if (state->connection.empty()) {
46             auto destination = state->subject_value.get_subscriber();
47
48             // the lifetime of each connect is nested in the subject lifetime
49             state->connection.reset(destination.add(cs));
50
51             auto localState = state;
52
53             // when the connection is finished it should shutdown the connection
54             cs.add(
55                 [destination, localState](){
56                     if (!localState->connection.empty()) {
57                         destination.remove(localState->connection.get());
58                         localState->connection.reset();
59                     }
60                 });
61
62             // use cs not destination for lifetime of subscribe.
63             state->source.subscribe(cs, destination);
64         }
65     }
66 };
67
68 template<class Subject>
69 class multicast_factory
70 {
71     Subject caster;
72 public:
73     multicast_factory(Subject sub)
74         : caster(std::move(sub))
75     {
76     }
77     template<class Observable>
78     auto operator()(Observable&& source)
79         ->      connectable_observable<typename std::decay<Observable>::type::value_type,   multicast<typename std::decay<Observable>::type::value_type, Observable, Subject>> {
80         return  connectable_observable<typename std::decay<Observable>::type::value_type,   multicast<typename std::decay<Observable>::type::value_type, Observable, Subject>>(
81                                                                                             multicast<typename std::decay<Observable>::type::value_type, Observable, Subject>(std::forward<Observable>(source), caster));
82     }
83 };
84
85 }
86
87 template<class Subject>
88 inline auto multicast(Subject sub)
89     ->      detail::multicast_factory<Subject> {
90     return  detail::multicast_factory<Subject>(std::move(sub));
91 }
92
93 }
94
95 }
96
97 #endif