]> git.sesse.net Git - casparcg/blob - dependencies64/RxCpp/include/rxcpp/schedulers/rx-test.hpp
* Added RxCpp library for LINQ api, replacing Boost.Range based iterations where...
[casparcg] / dependencies64 / RxCpp / include / rxcpp / schedulers / rx-test.hpp
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 #if !defined(RXCPP_RX_SCHEDULER_TEST_HPP)
6 #define RXCPP_RX_SCHEDULER_TEST_HPP
7
8 #include "../rx-includes.hpp"
9
10 namespace rxcpp {
11
12 namespace schedulers {
13
14 namespace detail {
15
16 class test_type : public scheduler_interface
17 {
18 public:
19
20     typedef scheduler_interface::clock_type clock_type;
21
22     struct test_type_state : public virtual_time<long, long>
23     {
24         typedef virtual_time<long, long> base;
25
26         using base::schedule_absolute;
27         using base::schedule_relative;
28
29         clock_type::time_point now() const {
30             return to_time_point(clock_now);
31         }
32
33         virtual void schedule_absolute(long when, const schedulable& a) const
34         {
35             if (when <= base::clock_now)
36                 when = base::clock_now + 1;
37
38             return base::schedule_absolute(when, a);
39         }
40
41         virtual long add(long absolute, long relative) const
42         {
43             return absolute + relative;
44         }
45
46         virtual clock_type::time_point to_time_point(long absolute) const
47         {
48             return clock_type::time_point(std::chrono::milliseconds(absolute));
49         }
50
51         virtual long to_relative(clock_type::duration d) const
52         {
53             return static_cast<long>(std::chrono::duration_cast<std::chrono::milliseconds>(d).count());
54         }
55     };
56
57 private:
58     mutable std::shared_ptr<test_type_state> state;
59
60 public:
61     struct test_type_worker : public worker_interface
62     {
63         mutable std::shared_ptr<test_type_state> state;
64
65         typedef test_type_state::absolute absolute;
66         typedef test_type_state::relative relative;
67
68         test_type_worker(std::shared_ptr<test_type_state> st)
69             : state(std::move(st))
70         {
71         }
72
73         virtual clock_type::time_point now() const {
74             return state->now();
75         }
76
77         virtual void schedule(const schedulable& scbl) const {
78             state->schedule_absolute(state->clock(), scbl);
79         }
80
81         virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
82             state->schedule_relative(state->to_relative(when - now()), scbl);
83         }
84
85         void schedule_absolute(absolute when, const schedulable& scbl) const {
86             state->schedule_absolute(when, scbl);
87         }
88
89         void schedule_relative(relative when, const schedulable& scbl) const {
90             state->schedule_relative(when, scbl);
91         }
92
93         bool is_enabled() const {return state->is_enabled();}
94         absolute clock() const {return state->clock();}
95
96         void start() const
97         {
98             state->start();
99         }
100
101         void stop() const
102         {
103             state->stop();
104         }
105
106         void advance_to(absolute time) const
107         {
108             state->advance_to(time);
109         }
110
111         void advance_by(relative time) const
112         {
113             state->advance_by(time);
114         }
115
116         void sleep(relative time) const
117         {
118             state->sleep(time);
119         }
120
121         template<class T>
122         subscriber<T, rxt::testable_observer<T>> make_subscriber() const;
123     };
124
125 public:
126     test_type()
127         : state(new test_type_state())
128     {
129     }
130
131     virtual clock_type::time_point now() const {
132         return state->now();
133     }
134
135     virtual worker create_worker(composite_subscription cs) const {
136         std::shared_ptr<test_type_worker> wi(new test_type_worker(state));
137         return worker(cs, wi);
138     }
139
140     bool is_enabled() const {return state->is_enabled();}
141     long clock() {
142         return state->clock();
143     }
144
145     std::shared_ptr<test_type_worker> create_test_type_worker_interface() const {
146         std::shared_ptr<test_type_worker> wi(new test_type_worker(state));
147         return wi;
148     }
149
150     template<class T>
151     rxt::testable_observable<T> make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const;
152
153     template<class T>
154     rxt::testable_observable<T> make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const;
155 };
156
157 template<class T>
158 class mock_observer
159     : public rxt::detail::test_subject_base<T>
160 {
161     typedef typename rxn::notification<T> notification_type;
162     typedef rxn::recorded<typename notification_type::type> recorded_type;
163
164 public:
165     explicit mock_observer(std::shared_ptr<test_type::test_type_state> sc)
166         : sc(sc)
167     {
168     }
169
170     std::shared_ptr<test_type::test_type_state> sc;
171     std::vector<recorded_type> m;
172
173     virtual void on_subscribe(subscriber<T>) const {
174         abort();
175     }
176     virtual std::vector<rxn::subscription> subscriptions() const {
177         abort(); return std::vector<rxn::subscription>();
178     }
179
180     virtual std::vector<recorded_type> messages() const {
181         return m;
182     }
183 };
184
185 template<class T>
186 subscriber<T, rxt::testable_observer<T>> test_type::test_type_worker::make_subscriber() const
187 {
188     typedef typename rxn::notification<T> notification_type;
189     typedef rxn::recorded<typename notification_type::type> recorded_type;
190
191     std::shared_ptr<mock_observer<T>> ts(new mock_observer<T>(state));
192
193     return rxcpp::make_subscriber<T>(rxt::testable_observer<T>(ts, make_observer_dynamic<T>(
194           // on_next
195           [ts](T value)
196           {
197               ts->m.push_back(
198                               recorded_type(ts->sc->clock(), notification_type::on_next(value)));
199           },
200           // on_error
201           [ts](std::exception_ptr e)
202           {
203               ts->m.push_back(
204                               recorded_type(ts->sc->clock(), notification_type::on_error(e)));
205           },
206           // on_completed
207           [ts]()
208           {
209               ts->m.push_back(
210                               recorded_type(ts->sc->clock(), notification_type::on_completed()));
211           })));
212 }
213
214 template<class T>
215 class cold_observable
216     : public rxt::detail::test_subject_base<T>
217 {
218     typedef cold_observable<T> this_type;
219     std::shared_ptr<test_type::test_type_state> sc;
220     typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
221     mutable std::vector<recorded_type> mv;
222     mutable std::vector<rxn::subscription> sv;
223     mutable worker controller;
224
225 public:
226
227     cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv)
228         : sc(sc)
229         , mv(std::move(mv))
230         , controller(w)
231     {
232     }
233
234     template<class Iterator>
235     cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, Iterator begin, Iterator end)
236         : sc(sc)
237         , mv(begin, end)
238         , controller(w)
239     {
240     }
241
242     virtual void on_subscribe(subscriber<T> o) const {
243         sv.push_back(rxn::subscription(sc->clock()));
244         auto index = sv.size() - 1;
245
246         for (auto& message : mv) {
247             auto n = message.value();
248             sc->schedule_relative(message.time(), make_schedulable(
249                 controller,
250                 [n, o](const schedulable& scbl) {
251                     if (o.is_subscribed()) {
252                         n->accept(o);
253                     }
254                 }));
255         }
256
257         auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this());
258         o.add([sharedThis, index]() {
259             sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock());
260         });
261     }
262
263     virtual std::vector<rxn::subscription> subscriptions() const {
264         return sv;
265     }
266
267     virtual std::vector<recorded_type> messages() const {
268         return mv;
269     }
270 };
271
272 template<class T>
273 rxt::testable_observable<T> test_type::make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const
274 {
275     auto co = std::shared_ptr<cold_observable<T>>(new cold_observable<T>(state, create_worker(composite_subscription()), std::move(messages)));
276     return rxt::testable_observable<T>(co);
277 }
278
279 template<class T>
280 class hot_observable
281     : public rxt::detail::test_subject_base<T>
282 {
283     typedef hot_observable<T> this_type;
284     std::shared_ptr<test_type::test_type_state> sc;
285     typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
286     typedef subscriber<T> observer_type;
287     mutable std::vector<recorded_type> mv;
288     mutable std::vector<rxn::subscription> sv;
289     mutable std::vector<observer_type> observers;
290     mutable worker controller;
291
292 public:
293
294     hot_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv)
295         : sc(sc)
296         , mv(mv)
297         , controller(w)
298     {
299         for (auto& message : mv) {
300             auto n = message.value();
301             sc->schedule_absolute(message.time(), make_schedulable(
302                 controller,
303                 [this, n](const schedulable& scbl) {
304                     auto local = this->observers;
305                     for (auto& o : local) {
306                         if (o.is_subscribed()) {
307                             n->accept(o);
308                         }
309                     }
310                 }));
311         }
312     }
313
314     virtual void on_subscribe(observer_type o) const {
315         observers.push_back(o);
316         sv.push_back(rxn::subscription(sc->clock()));
317         auto index = sv.size() - 1;
318
319         auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this());
320         o.add([sharedThis, index]() {
321             sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock());
322         });
323     }
324
325     virtual std::vector<rxn::subscription> subscriptions() const {
326         return sv;
327     }
328
329     virtual std::vector<recorded_type> messages() const {
330         return mv;
331     }
332 };
333
334 template<class T>
335 rxt::testable_observable<T> test_type::make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const
336 {
337     return rxt::testable_observable<T>(
338         std::make_shared<hot_observable<T>>(state, create_worker(composite_subscription()), std::move(messages)));
339 }
340
341 template<class F>
342 struct is_create_source_function
343 {
344     struct not_void {};
345     template<class CF>
346     static auto check(int) -> decltype((*(CF*)nullptr)());
347     template<class CF>
348     static not_void check(...);
349
350     static const bool value = is_observable<decltype(check<typename std::decay<F>::type>(0))>::value;
351 };
352
353 }
354
355 class test : public scheduler
356 {
357     std::shared_ptr<detail::test_type> tester;
358 public:
359
360     explicit test(std::shared_ptr<detail::test_type> t)
361         : scheduler(std::static_pointer_cast<scheduler_interface>(t))
362         , tester(t)
363     {
364     }
365
366     typedef detail::test_type::clock_type clock_type;
367
368     static const long created_time = 100;
369     static const long subscribed_time = 200;
370     static const long unsubscribed_time = 1000;
371
372     template<class T>
373     struct messages
374     {
375         typedef typename rxn::notification<T> notification_type;
376         typedef rxn::recorded<typename notification_type::type> recorded_type;
377         typedef rxn::subscription subscription_type;
378
379         messages() {}
380
381         struct on_next_factory
382         {
383             recorded_type operator()(long ticks, T value) const {
384                 return recorded_type(ticks, notification_type::on_next(value));
385             }
386         };
387         struct on_completed_factory
388         {
389             recorded_type operator()(long ticks) const {
390                 return recorded_type(ticks, notification_type::on_completed());
391             }
392         };
393         struct on_error_factory
394         {
395             template<class Exception>
396             recorded_type operator()(long ticks, Exception&& e) const {
397                 return recorded_type(ticks, notification_type::on_error(std::forward<Exception>(e)));
398             }
399         };
400
401         static const on_next_factory next;
402         static const on_completed_factory completed;
403         static const on_error_factory error;
404
405         struct subscribe_factory
406         {
407             rxn::subscription operator()(long subscribe, long unsubscribe) const {
408                 return rxn::subscription(subscribe, unsubscribe);
409             }
410         };
411         static const subscribe_factory subscribe;
412     };
413
414     class test_worker : public worker
415     {
416         std::shared_ptr<detail::test_type::test_type_worker> tester;
417     public:
418
419         explicit test_worker(composite_subscription cs, std::shared_ptr<detail::test_type::test_type_worker> t)
420             : worker(cs, std::static_pointer_cast<worker_interface>(t))
421             , tester(t)
422         {
423         }
424
425         bool is_enabled() const {return tester->is_enabled();}
426         long clock() const {return tester->clock();}
427
428         void schedule_absolute(long when, const schedulable& a) const {
429             tester->schedule_absolute(when, a);
430         }
431
432         void schedule_relative(long when, const schedulable& a) const {
433             tester->schedule_relative(when, a);
434         }
435
436         template<class Arg0, class... ArgN>
437         auto schedule_absolute(long when, Arg0&& a0, ArgN&&... an) const
438             -> typename std::enable_if<
439                 (detail::is_action_function<Arg0>::value ||
440                 is_subscription<Arg0>::value) &&
441                 !is_schedulable<Arg0>::value>::type {
442             tester->schedule_absolute(when, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
443         }
444
445         template<class Arg0, class... ArgN>
446         auto schedule_relative(long when, Arg0&& a0, ArgN&&... an) const
447             -> typename std::enable_if<
448                 (detail::is_action_function<Arg0>::value ||
449                 is_subscription<Arg0>::value) &&
450                 !is_schedulable<Arg0>::value>::type {
451             tester->schedule_relative(when, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
452         }
453
454         template<class T, class F>
455         auto start(F createSource, long created, long subscribed, long unsubscribed) const
456             -> subscriber<T, rxt::testable_observer<T>>
457         {
458             struct state_type
459             : public std::enable_shared_from_this<state_type>
460             {
461                 typedef decltype(createSource()) source_type;
462
463                 std::unique_ptr<source_type> source;
464                 subscriber<T, rxt::testable_observer<T>> o;
465
466                 explicit state_type(subscriber<T, rxt::testable_observer<T>> o)
467                 : source()
468                 , o(o)
469                 {
470                 }
471             };
472             std::shared_ptr<state_type> state(new state_type(this->make_subscriber<T>()));
473
474             schedule_absolute(created, [createSource, state](const schedulable& scbl) {
475                 state->source.reset(new typename state_type::source_type(createSource()));
476             });
477             schedule_absolute(subscribed, [state](const schedulable& scbl) {
478                 state->source->subscribe(state->o);
479             });
480             schedule_absolute(unsubscribed, [state](const schedulable& scbl) {
481                 state->o.unsubscribe();
482             });
483
484             tester->start();
485
486             return state->o;
487         }
488
489         template<class T, class F>
490         auto start(F&& createSource, long unsubscribed) const
491             -> subscriber<T, rxt::testable_observer<T>>
492         {
493             return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed);
494         }
495
496         template<class T, class F>
497         auto start(F&& createSource) const
498             -> subscriber<T, rxt::testable_observer<T>>
499         {
500             return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed_time);
501         }
502
503         template<class F>
504         struct start_traits
505         {
506             typedef decltype((*(F*)nullptr)()) source_type;
507             typedef typename source_type::value_type value_type;
508             typedef subscriber<value_type, rxt::testable_observer<value_type>> subscriber_type;
509         };
510
511         template<class F>
512         auto start(F createSource, long created, long subscribed, long unsubscribed) const
513             -> typename std::enable_if<detail::is_create_source_function<F>::value, start_traits<F>>::type::subscriber_type
514         {
515             return start<typename start_traits<F>::value_type>(std::move(createSource), created, subscribed, unsubscribed);
516         }
517
518         template<class F>
519         auto start(F createSource, long unsubscribed) const
520             -> typename std::enable_if<detail::is_create_source_function<F>::value, start_traits<F>>::type::subscriber_type
521         {
522             return start<typename start_traits<F>::value_type>(std::move(createSource), created_time, subscribed_time, unsubscribed);
523         }
524
525         template<class F>
526         auto start(F createSource) const
527             -> typename std::enable_if<detail::is_create_source_function<F>::value, start_traits<F>>::type::subscriber_type
528         {
529             return start<typename start_traits<F>::value_type>(std::move(createSource), created_time, subscribed_time, unsubscribed_time);
530         }
531
532         void start() const {
533             tester->start();
534         }
535
536         template<class T>
537         subscriber<T, rxt::testable_observer<T>> make_subscriber() const {
538             return tester->make_subscriber<T>();
539         }
540     };
541
542     clock_type::time_point now() const {
543         return tester->now();
544     }
545
546     test_worker create_worker(composite_subscription cs = composite_subscription()) const {
547         return test_worker(cs, tester->create_test_type_worker_interface());
548     }
549
550     bool is_enabled() const {return tester->is_enabled();}
551     long clock() const {return tester->clock();}
552
553     template<class T>
554     rxt::testable_observable<T> make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const{
555         return tester->make_hot_observable(std::move(messages));
556     }
557
558     template<class T, size_t size>
559     auto make_hot_observable(const T (&arr) [size]) const
560         -> decltype(tester->make_hot_observable(std::vector<T>())) {
561         return      tester->make_hot_observable(rxu::to_vector(arr));
562     }
563
564     template<class T>
565     auto make_hot_observable(std::initializer_list<T> il) const
566         -> decltype(tester->make_hot_observable(std::vector<T>())) {
567         return      tester->make_hot_observable(std::vector<T>(il));
568     }
569
570     template<class T>
571     rxt::testable_observable<T> make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const {
572         return tester->make_cold_observable(std::move(messages));
573     }
574
575     template<class T, size_t size>
576     auto make_cold_observable(const T (&arr) [size]) const
577         -> decltype(tester->make_cold_observable(std::vector<T>())) {
578         return      tester->make_cold_observable(rxu::to_vector(arr));
579     }
580
581     template<class T>
582     auto make_cold_observable(std::initializer_list<T> il) const
583         -> decltype(tester->make_cold_observable(std::vector<T>())) {
584         return      tester->make_cold_observable(std::vector<T>(il));
585     }
586 };
587
588 template<class T>
589 //static
590 RXCPP_SELECT_ANY const typename test::messages<T>::on_next_factory test::messages<T>::next = test::messages<T>::on_next_factory();
591
592 template<class T>
593 //static
594 RXCPP_SELECT_ANY const typename test::messages<T>::on_completed_factory test::messages<T>::completed = test::messages<T>::on_completed_factory();
595
596 template<class T>
597 //static
598 RXCPP_SELECT_ANY const typename test::messages<T>::on_error_factory test::messages<T>::error = test::messages<T>::on_error_factory();
599
600 template<class T>
601 //static
602 RXCPP_SELECT_ANY const typename test::messages<T>::subscribe_factory test::messages<T>::subscribe = test::messages<T>::subscribe_factory();
603
604
605
606 inline test make_test() {
607     return test(std::make_shared<detail::test_type>());
608 }
609
610 }
611
612 }
613
614 #endif