1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
5 #if !defined(RXCPP_OPERATORS_RX_MULTICAST_HPP)
6 #define RXCPP_OPERATORS_RX_MULTICAST_HPP
8 #include "../rx-includes.hpp"
// Operator behind multicast: observers are subscribed to a subject
// (on_subscribe), and the subject is subscribed to the source only when
// on_connect() is called.
// T is forwarded to operator_base<T>; Observable and Subject are decayed into
// source_type and subject_type below.
// NOTE(review): in this view of the file the brace-only lines and a few short
// statements are not visible; the notes below flag where that matters.
16 template<class T, class Observable, class Subject>
17 struct multicast : public operator_base<T>
// Decayed forms of the template arguments, so the operator stores values even
// when it is instantiated with reference or cv-qualified types.
19 typedef typename std::decay<Observable>::type source_type;
20 typedef typename std::decay<Subject>::type subject_type;
// Data used by on_subscribe() and on_connect(): the source, the subject and
// the record of the current connection.
22 struct multicast_state : public std::enable_shared_from_this<multicast_state>
// Takes the source and the subject by value and moves them into the members.
// NOTE(review): the declaration of `source` is not visible in this view;
// presumably it is a source_type member - confirm.
24 multicast_state(source_type o, subject_type sub)
25 : source(std::move(o))
26 , subject_value(std::move(sub))
// Subject that observers subscribe to (on_subscribe) and whose subscriber is
// handed to the source (on_connect).
30 subject_type subject_value;
// Record of the current connection. on_connect() tests it with empty(),
// stores the result of destination.add(cs) in it, and its cleanup lambda
// resets it again.
31 rxu::detail::maybe<typename composite_subscription::weak_subscription> connection;
// All mutable data lives behind this shared_ptr, which is why the const
// member functions below can still modify it.
34 std::shared_ptr<multicast_state> state;
// Builds the state from the given source and subject (both moved in).
36 multicast(source_type o, subject_type sub)
37 : state(std::make_shared<multicast_state>(std::move(o), std::move(sub)))
// Subscribes `o` to the subject's observable. The source is not touched here.
40 template<class Subscriber>
41 void on_subscribe(Subscriber&& o) const {
42 state->subject_value.get_observable().subscribe(std::forward<Subscriber>(o));
// Subscribes the subject's subscriber to the source, passing `cs` as the
// subscription for that call. The visible code only does this while no
// connection is recorded; with a connection already recorded nothing visible
// here runs.
44 void on_connect(composite_subscription cs) const {
45 if (state->connection.empty()) {
// Subscriber side of the subject; it is what gets subscribed to the source.
46 auto destination = state->subject_value.get_subscriber();
48 // the lifetime of each connect is nested in the subject lifetime
49 state->connection.reset(destination.add(cs));
// Separate copy of the shared_ptr for the lambda capture below, so the lambda
// holds its own reference to the state.
51 auto localState = state;
53 // when the connection is finished it should shutdown the connection
// NOTE(review): the call that receives this lambda is not visible in this
// view; judging by the comment above it is presumably registered on `cs` -
// confirm.
55 [destination, localState](){
// Only undo the registration if a connection is still recorded; then remove
// it from the destination and clear the record.
56 if (!localState->connection.empty()) {
57 destination.remove(localState->connection.get());
58 localState->connection.reset();
62 // use cs not destination for lifetime of subscribe.
63 state->source.subscribe(cs, destination);
// Function object that stores a subject and, when applied to a source
// observable, wraps both in a multicast operator inside a
// connectable_observable.
68 template<class Subject>
69 class multicast_factory
// Stores the subject by moving it into `caster`.
// NOTE(review): the declaration of `caster` is not visible in this view;
// presumably it is a Subject member - confirm.
73 multicast_factory(Subject sub)
74 : caster(std::move(sub))
// Applies the factory to `source`.
// The element type is taken from the decayed Observable's value_type and is
// used both for the connectable_observable and for the multicast operator.
// `source` is perfectly forwarded; `caster` is passed by name, so it is not
// moved out of the factory by this call.
77 template<class Observable>
78 auto operator()(Observable&& source)
79 -> connectable_observable<typename std::decay<Observable>::type::value_type, multicast<typename std::decay<Observable>::type::value_type, Observable, Subject>> {
80 return connectable_observable<typename std::decay<Observable>::type::value_type, multicast<typename std::decay<Observable>::type::value_type, Observable, Subject>>(
81 multicast<typename std::decay<Observable>::type::value_type, Observable, Subject>(std::forward<Observable>(source), caster));
// Entry point: takes a subject by value and returns a
// detail::multicast_factory holding it (moved in). The returned factory is the
// object that is later applied to a source observable.
87 template<class Subject>
88 inline auto multicast(Subject sub)
89 -> detail::multicast_factory<Subject> {
90 return detail::multicast_factory<Subject>(std::move(sub));