]> git.sesse.net Git - casparcg/blob - dependencies64/RxCpp/include/rxcpp/operators/rx-repeat.hpp
* Added RxCpp library for LINQ api, replacing Boost.Range based iterations where...
[casparcg] / dependencies64 / RxCpp / include / rxcpp / operators / rx-repeat.hpp
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 #if !defined(RXCPP_OPERATORS_RX_REPEAT_HPP)
6 #define RXCPP_OPERATORS_RX_REPEAT_HPP
7
8 #include "../rx-includes.hpp"
9
10 namespace rxcpp {
11
12 namespace operators {
13
14 namespace detail {
15
16 template<class T, class Observable, class Count>
17 struct repeat : public operator_base<T>
18 {
19     typedef typename std::decay<Observable>::type source_type;
20     typedef typename std::decay<Count>::type count_type;
21     struct values
22     {
23         values(source_type s, count_type t)
24             : source(std::move(s))
25             , remaining(std::move(t))
26             , repeat_infinitely(t == 0)
27         {
28         }
29         source_type source;
30         count_type remaining;
31         bool repeat_infinitely;
32     };
33     values initial;
34
35     repeat(source_type s, count_type t)
36         : initial(std::move(s), std::move(t))
37     {
38     }
39
40     template<class Subscriber>
41     void on_subscribe(const Subscriber& s) const {
42
43         typedef Subscriber output_type;
44         struct state_type
45             : public std::enable_shared_from_this<state_type>
46             , public values
47         {
48             state_type(const values& i, const output_type& oarg)
49                 : values(i)
50                 , source_lifetime(composite_subscription::empty())
51                 , out(oarg)
52             {
53             }
54             composite_subscription source_lifetime;
55             output_type out;
56
57             void do_subscribe() {
58                 auto state = this->shared_from_this();
59
60                 state->source_lifetime = composite_subscription();
61                 state->out.add(state->source_lifetime);
62
63                 state->source.subscribe(
64                     state->out,
65                     state->source_lifetime,
66                 // on_next
67                     [state](T t) {
68                         state->out.on_next(t);
69                     },
70                 // on_error
71                     [state](std::exception_ptr e) {
72                         state->out.on_error(e);
73                     },
74                 // on_completed
75                     [state]() {
76                         if (state->repeat_infinitely || (--state->remaining > 0)) {
77                             state->do_subscribe();
78                         } else {
79                             state->out.on_completed();
80                         }
81                     }
82                 );
83             }
84         };
85
86         // take a copy of the values for each subscription
87         auto state = std::shared_ptr<state_type>(new state_type(initial, s));
88
89         // start the first iteration
90         state->do_subscribe();
91     }
92 };
93
94 template<class T>
95 class repeat_factory
96 {
97     typedef typename std::decay<T>::type count_type;
98     count_type count;
99 public:
100     repeat_factory(count_type t) : count(std::move(t)) {}
101
102     template<class Observable>
103     auto operator()(Observable&& source)
104         ->      observable<typename std::decay<Observable>::type::value_type, repeat<typename std::decay<Observable>::type::value_type, Observable, count_type>> {
105         return  observable<typename std::decay<Observable>::type::value_type, repeat<typename std::decay<Observable>::type::value_type, Observable, count_type>>(
106                                                                               repeat<typename std::decay<Observable>::type::value_type, Observable, count_type>(std::forward<Observable>(source), count));
107     }
108 };
109
110 }
111
112 template<class T>
113 auto repeat(T&& t)
114 ->      detail::repeat_factory<T> {
115     return  detail::repeat_factory<T>(std::forward<T>(t));
116 }
117
118 }
119
120 }
121
122 #endif