1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
5 #if !defined(RXCPP_RX_SCHEDULER_TEST_HPP)
6 #define RXCPP_RX_SCHEDULER_TEST_HPP
8 #include "../rx-includes.hpp"
12 namespace schedulers {
// test_type: a scheduler_interface implementation whose time-related state
// lives in a shared test_type_state, a virtual_time<long, long> (both the
// absolute and the relative time types are plain long ticks).
// NOTE(review): this listing appears to omit brace-only lines, template heads
// and a few bodies; the comments below describe only what is visible.
16 class test_type : public scheduler_interface
20 typedef scheduler_interface::clock_type clock_type;
// Virtual-time state shared by the scheduler and the workers it creates.
22 struct test_type_state : public virtual_time<long, long>
24 typedef virtual_time<long, long> base;
// Bring the base-class schedule_absolute / schedule_relative overloads into
// this scope alongside the override declared below.
26 using base::schedule_absolute;
27 using base::schedule_relative;
// Current virtual clock value expressed as a clock_type::time_point.
29 clock_type::time_point now() const {
30 return to_time_point(clock_now);
// Schedules `a` at absolute tick `when`. A tick that is not strictly after
// the current clock is replaced by clock_now + 1 before delegating to the base.
33 virtual void schedule_absolute(long when, const schedulable& a) const
35 if (when <= base::clock_now)
36 when = base::clock_now + 1;
38 return base::schedule_absolute(when, a);
// Adding a relative offset to an absolute tick is plain integer addition.
41 virtual long add(long absolute, long relative) const
43 return absolute + relative;
// Ticks are interpreted as milliseconds when converted to a time_point.
46 virtual clock_type::time_point to_time_point(long absolute) const
48 return clock_type::time_point(std::chrono::milliseconds(absolute));
// Converts a clock duration to ticks: duration_cast to whole milliseconds
// (truncating), then narrowed to long.
51 virtual long to_relative(clock_type::duration d) const
53 return static_cast<long>(std::chrono::duration_cast<std::chrono::milliseconds>(d).count());
// The scheduler's state; the same pointer is handed to every worker created below.
58 mutable std::shared_ptr<test_type_state> state;
// worker_interface implementation that forwards its operations to the shared
// test_type_state.
61 struct test_type_worker : public worker_interface
63 mutable std::shared_ptr<test_type_state> state;
65 typedef test_type_state::absolute absolute;
66 typedef test_type_state::relative relative;
// Takes (moves in) a shared reference to the scheduler's state.
68 test_type_worker(std::shared_ptr<test_type_state> st)
69 : state(std::move(st))
// NOTE(review): the body of now() is not visible in this listing.
73 virtual clock_type::time_point now() const {
// Schedules `scbl` using the state's current clock value as the absolute time.
// NOTE(review): if clock() equals clock_now, the test_type_state override
// above moves the action to clock_now + 1 — confirm.
77 virtual void schedule(const schedulable& scbl) const {
78 state->schedule_absolute(state->clock(), scbl);
// Converts the target time_point into a tick offset from now() and schedules
// `scbl` relative to the current clock.
81 virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
82 state->schedule_relative(state->to_relative(when - now()), scbl);
// Tick-based scheduling helpers; both forward directly to the shared state.
85 void schedule_absolute(absolute when, const schedulable& scbl) const {
86 state->schedule_absolute(when, scbl);
89 void schedule_relative(relative when, const schedulable& scbl) const {
90 state->schedule_relative(when, scbl);
// Simple accessors forwarding to the shared state.
93 bool is_enabled() const {return state->is_enabled();}
94 absolute clock() const {return state->clock();}
// Clock-control helpers; advance_to / advance_by forward to the shared state.
106 void advance_to(absolute time) const
108 state->advance_to(time);
111 void advance_by(relative time) const
113 state->advance_by(time);
// NOTE(review): the body of sleep() is not visible in this listing.
116 void sleep(relative time) const
// Declaration only; the definition appears out of line as
// test_type::test_type_worker::make_subscriber further down in this file.
122 subscriber<T, rxt::testable_observer<T>> make_subscriber() const;
// Constructor initializer: the scheduler starts with a newly allocated test_type_state.
127 : state(new test_type_state())
// NOTE(review): the body of now() is not visible in this listing.
131 virtual clock_type::time_point now() const {
// Creates a worker for subscription `cs` backed by a new test_type_worker that
// shares this scheduler's state.
135 virtual worker create_worker(composite_subscription cs) const {
136 std::shared_ptr<test_type_worker> wi(new test_type_worker(state));
137 return worker(cs, wi);
140 bool is_enabled() const {return state->is_enabled();}
// NOTE(review): the signature line for this accessor is not visible; the
// visible statement returns the state's clock value.
142 return state->clock();
// Builds a test_type_worker (concrete type, not wrapped in `worker`) that
// shares this scheduler's state.
// NOTE(review): the return statement is not visible in this listing.
145 std::shared_ptr<test_type_worker> create_test_type_worker_interface() const {
146 std::shared_ptr<test_type_worker> wi(new test_type_worker(state));
// Declarations only: build testable observables from a vector of recorded
// notifications. Both are defined out of line further down in this file.
151 rxt::testable_observable<T> make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const;
154 rxt::testable_observable<T> make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const;
// mock_observer<T> (name taken from the constructor below; the class-head
// line is not visible in this listing). Derives from test_subject_base<T> and
// holds the scheduler state `sc` plus a vector `m` of recorded notifications.
159 : public rxt::detail::test_subject_base<T>
161 typedef typename rxn::notification<T> notification_type;
162 typedef rxn::recorded<typename notification_type::type> recorded_type;
// NOTE(review): the constructor's initializer list/body is not visible here.
165 explicit mock_observer(std::shared_ptr<test_type::test_type_state> sc)
// Shared scheduler state and the recorded notifications.
170 std::shared_ptr<test_type::test_type_state> sc;
171 std::vector<recorded_type> m;
// NOTE(review): the body of on_subscribe() is not visible in this listing.
173 virtual void on_subscribe(subscriber<T>) const {
// abort() is called before the return statement, so the empty vector is
// presumably never actually produced — confirm that subscriptions() is not
// meant to be called on a mock_observer.
176 virtual std::vector<rxn::subscription> subscriptions() const {
177 abort(); return std::vector<rxn::subscription>();
// NOTE(review): the body of messages() is not visible in this listing.
180 virtual std::vector<recorded_type> messages() const {
// Out-of-line definition of test_type_worker::make_subscriber<T>().
// Creates a mock_observer<T> bound to this worker's state and returns a
// subscriber wrapping a testable_observer built from it and a dynamic observer.
// The visible callback fragments each build a recorded_type stamped with
// ts->sc->clock() for on_next / on_error / on_completed.
// NOTE(review): the lambda heads for on_next and on_completed and the
// statements that consume the recorded_type values are not visible in this
// listing; presumably they are appended to the mock_observer's `m` — confirm.
186 subscriber<T, rxt::testable_observer<T>> test_type::test_type_worker::make_subscriber() const
188 typedef typename rxn::notification<T> notification_type;
189 typedef rxn::recorded<typename notification_type::type> recorded_type;
// The mock observer is shared with the callbacks below via `ts`.
191 std::shared_ptr<mock_observer<T>> ts(new mock_observer<T>(state));
193 return rxcpp::make_subscriber<T>(rxt::testable_observer<T>(ts, make_observer_dynamic<T>(
// on_next: record the value at the current virtual time.
198 recorded_type(ts->sc->clock(), notification_type::on_next(value)));
// on_error: record the exception at the current virtual time.
201 [ts](std::exception_ptr e)
204 recorded_type(ts->sc->clock(), notification_type::on_error(e)));
// on_completed: record completion at the current virtual time.
210 recorded_type(ts->sc->clock(), notification_type::on_completed()));
// cold_observable<T>: a test_subject_base<T> that holds a list of recorded
// messages (`mv`) and, on every subscription, schedules each message relative
// to the current virtual time. Subscription lifetimes are recorded in `sv`.
215 class cold_observable
216 : public rxt::detail::test_subject_base<T>
218 typedef cold_observable<T> this_type;
// Shared scheduler state used for clock reads and scheduling.
219 std::shared_ptr<test_type::test_type_state> sc;
220 typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
// Members are mutable because on_subscribe() is const yet appends to `sv`.
221 mutable std::vector<recorded_type> mv;
222 mutable std::vector<rxn::subscription> sv;
223 mutable worker controller;
// NOTE(review): the initializer lists/bodies of both constructors are not
// visible in this listing.
227 cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv)
234 template<class Iterator>
235 cold_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, Iterator begin, Iterator end)
// Records a new subscription starting at the current clock, then schedules
// every stored message at message.time() ticks relative to now.
242 virtual void on_subscribe(subscriber<T> o) const {
243 sv.push_back(rxn::subscription(sc->clock()));
// Index of the entry just added; used by the unsubscribe action below.
244 auto index = sv.size() - 1;
246 for (auto& message : mv) {
247 auto n = message.value();
248 sc->schedule_relative(message.time(), make_schedulable(
// The scheduled action captures the notification and the subscriber by value
// and checks that the subscriber is still subscribed.
// NOTE(review): the statement inside the is_subscribed() check is not visible.
250 [n, o](const schedulable& scbl) {
251 if (o.is_subscribed()) {
// shared_from_this() keeps this observable alive for as long as the
// subscriber holds the unsubscribe action registered below.
257 auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this());
// The action added to the subscriber rewrites the recorded subscription,
// keeping its subscribe time and adding the clock value at the time it runs.
258 o.add([sharedThis, index]() {
259 sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock());
// NOTE(review): the bodies of subscriptions() and messages() are not visible
// in this listing.
263 virtual std::vector<rxn::subscription> subscriptions() const {
267 virtual std::vector<recorded_type> messages() const {
// Out-of-line definition of test_type::make_cold_observable<T>().
// Wraps a new cold_observable<T> (sharing this scheduler's state, with a worker
// created on a fresh composite_subscription, and taking ownership of
// `messages`) in a testable_observable<T>.
273 rxt::testable_observable<T> test_type::make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const
275 auto co = std::shared_ptr<cold_observable<T>>(new cold_observable<T>(state, create_worker(composite_subscription()), std::move(messages)));
276 return rxt::testable_observable<T>(co);
// hot_observable<T> (name taken from the typedef and constructor below; the
// class-head line is not visible in this listing). A test_subject_base<T>
// whose messages are scheduled once, at absolute virtual times, in the
// constructor, and delivered to the observers subscribed at that moment.
281 : public rxt::detail::test_subject_base<T>
283 typedef hot_observable<T> this_type;
// Shared scheduler state used for clock reads and scheduling.
284 std::shared_ptr<test_type::test_type_state> sc;
285 typedef rxn::recorded<typename rxn::notification<T>::type> recorded_type;
286 typedef subscriber<T> observer_type;
// Members are mutable because on_subscribe() is const yet appends to
// `sv` and `observers`.
287 mutable std::vector<recorded_type> mv;
288 mutable std::vector<rxn::subscription> sv;
289 mutable std::vector<observer_type> observers;
290 mutable worker controller;
// Schedules every message at its absolute recorded time.
// NOTE(review): the constructor's initializer list is not visible in this listing.
294 hot_observable(std::shared_ptr<test_type::test_type_state> sc, worker w, std::vector<recorded_type> mv)
299 for (auto& message : mv) {
300 auto n = message.value();
301 sc->schedule_absolute(message.time(), make_schedulable(
// NOTE(review): the action captures a raw `this`; it is only safe while this
// object outlives the scheduled actions — confirm the ownership model.
303 [this, n](const schedulable& scbl) {
// Iterate over a copy of the observer list rather than the member itself
// (presumably so that subscriptions made during delivery do not affect the
// loop — confirm).
304 auto local = this->observers;
305 for (auto& o : local) {
// NOTE(review): the statement inside the is_subscribed() check is not visible.
306 if (o.is_subscribed()) {
// Registers the observer and records a subscription starting at the current clock.
314 virtual void on_subscribe(observer_type o) const {
315 observers.push_back(o);
316 sv.push_back(rxn::subscription(sc->clock()));
// Index of the entry just added; used by the unsubscribe action below.
317 auto index = sv.size() - 1;
// shared_from_this() keeps this observable alive for as long as the
// subscriber holds the unsubscribe action registered below.
319 auto sharedThis = std::static_pointer_cast<const this_type>(this->shared_from_this());
// The action added to the subscriber rewrites the recorded subscription,
// keeping its subscribe time and adding the clock value at the time it runs.
320 o.add([sharedThis, index]() {
321 sharedThis->sv[index] = rxn::subscription(sharedThis->sv[index].subscribe(), sharedThis->sc->clock());
// NOTE(review): the bodies of subscriptions() and messages() are not visible
// in this listing.
325 virtual std::vector<rxn::subscription> subscriptions() const {
329 virtual std::vector<recorded_type> messages() const {
// Out-of-line definition of test_type::make_hot_observable<T>().
// Wraps a hot_observable<T> created with std::make_shared (sharing this
// scheduler's state, with a worker created on a fresh composite_subscription,
// and taking ownership of `messages`) in a testable_observable<T>.
335 rxt::testable_observable<T> test_type::make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const
337 return rxt::testable_observable<T>(
338 std::make_shared<hot_observable<T>>(state, create_worker(composite_subscription()), std::move(messages)));
// Compile-time trait: `value` is is_observable<R>::value, where R is the type
// selected by overload resolution on check<decayed F>(0).
// NOTE(review): the template heads (for the struct and for `check`) are not
// visible in this listing.
342 struct is_create_source_function
// Preferred overload (exact match for the int argument): its return type is
// the result type of calling a CF with no arguments. The call on a
// dereferenced null pointer appears only inside decltype and is never evaluated.
346 static auto check(int) -> decltype((*(CF*)nullptr)());
// Fallback overload chosen when the expression above is ill-formed; yields
// not_void (presumably a type for which is_observable is false — confirm).
348 static not_void check(...);
350 static const bool value = is_observable<decltype(check<typename std::decay<F>::type>(0))>::value;
// test: a `scheduler` that also keeps a typed pointer to the underlying
// detail::test_type, so the test-only operations below can reach it.
// NOTE(review): this listing appears to omit brace-only lines, template heads
// and some struct heads; the comments below describe only what is visible.
355 class test : public scheduler
357 std::shared_ptr<detail::test_type> tester;
// Passes `t` to the scheduler base, cast to scheduler_interface.
// NOTE(review): the initialization of `tester` is not visible in this listing.
360 explicit test(std::shared_ptr<detail::test_type> t)
361 : scheduler(std::static_pointer_cast<scheduler_interface>(t))
366 typedef detail::test_type::clock_type clock_type;
// Default ticks used by the start() overloads below that do not take explicit
// created / subscribed / unsubscribed times.
368 static const long created_time = 100;
369 static const long subscribed_time = 200;
370 static const long unsubscribed_time = 1000;
// Typedefs and factory functors for building recorded notifications and
// subscriptions. NOTE(review): the head of the enclosing struct is not visible
// here; the out-of-class static member definitions later in the file name it
// test::messages<T>.
375 typedef typename rxn::notification<T> notification_type;
376 typedef rxn::recorded<typename notification_type::type> recorded_type;
377 typedef rxn::subscription subscription_type;
// Builds a recorded on_next(value) notification at `ticks`.
381 struct on_next_factory
383 recorded_type operator()(long ticks, T value) const {
384 return recorded_type(ticks, notification_type::on_next(value));
// Builds a recorded on_completed notification at `ticks`.
387 struct on_completed_factory
389 recorded_type operator()(long ticks) const {
390 return recorded_type(ticks, notification_type::on_completed());
// Builds a recorded on_error notification at `ticks`, forwarding `e` to
// notification_type::on_error.
393 struct on_error_factory
395 template<class Exception>
396 recorded_type operator()(long ticks, Exception&& e) const {
397 return recorded_type(ticks, notification_type::on_error(std::forward<Exception>(e)));
// Shared factory instances; declared here and defined out of class later in the file.
401 static const on_next_factory next;
402 static const on_completed_factory completed;
403 static const on_error_factory error;
// Builds an rxn::subscription from subscribe and unsubscribe ticks.
405 struct subscribe_factory
407 rxn::subscription operator()(long subscribe, long unsubscribe) const {
408 return rxn::subscription(subscribe, unsubscribe);
411 static const subscribe_factory subscribe;
// test_worker: a `worker` that also keeps a typed pointer to the underlying
// test_type_worker for tick-based scheduling and clock access.
414 class test_worker : public worker
416 std::shared_ptr<detail::test_type::test_type_worker> tester;
// Passes `cs` and `t` (cast to worker_interface) to the worker base.
// NOTE(review): the initialization of `tester` is not visible in this listing.
419 explicit test_worker(composite_subscription cs, std::shared_ptr<detail::test_type::test_type_worker> t)
420 : worker(cs, std::static_pointer_cast<worker_interface>(t))
425 bool is_enabled() const {return tester->is_enabled();}
426 long clock() const {return tester->clock();}
// Tick-based scheduling of an existing schedulable; forwards to the typed worker.
428 void schedule_absolute(long when, const schedulable& a) const {
429 tester->schedule_absolute(when, a);
432 void schedule_relative(long when, const schedulable& a) const {
433 tester->schedule_relative(when, a);
// Convenience overload: participates in overload resolution only when Arg0 is
// an action function or a subscription and is not already a schedulable. It
// builds a schedulable bound to this worker from the arguments and schedules
// it at absolute tick `when`.
436 template<class Arg0, class... ArgN>
437 auto schedule_absolute(long when, Arg0&& a0, ArgN&&... an) const
438 -> typename std::enable_if<
439 (detail::is_action_function<Arg0>::value ||
440 is_subscription<Arg0>::value) &&
441 !is_schedulable<Arg0>::value>::type {
442 tester->schedule_absolute(when, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
// Same as above, but `when` is a tick offset relative to the current clock.
445 template<class Arg0, class... ArgN>
446 auto schedule_relative(long when, Arg0&& a0, ArgN&&... an) const
447 -> typename std::enable_if<
448 (detail::is_action_function<Arg0>::value ||
449 is_subscription<Arg0>::value) &&
450 !is_schedulable<Arg0>::value>::type {
451 tester->schedule_relative(when, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
// start<T>(createSource, created, subscribed, unsubscribed):
// schedules three actions at the given absolute ticks — create the source by
// calling createSource(), subscribe the test subscriber to it, and
// unsubscribe the subscriber. The declared return type is the test subscriber.
// NOTE(review): the local struct head, its constructor body and the
// statements after the third schedule_absolute (including the return) are not
// visible in this listing.
454 template<class T, class F>
455 auto start(F createSource, long created, long subscribed, long unsubscribed) const
456 -> subscriber<T, rxt::testable_observer<T>>
459 : public std::enable_shared_from_this<state_type>
461 typedef decltype(createSource()) source_type;
// The source is created later (at `created`), so it is held through a
// pointer that starts out empty.
463 std::unique_ptr<source_type> source;
464 subscriber<T, rxt::testable_observer<T>> o;
466 explicit state_type(subscriber<T, rxt::testable_observer<T>> o)
// The shared state is captured by value in each scheduled lambda below.
472 std::shared_ptr<state_type> state(new state_type(this->make_subscriber<T>()));
474 schedule_absolute(created, [createSource, state](const schedulable& scbl) {
475 state->source.reset(new typename state_type::source_type(createSource()));
477 schedule_absolute(subscribed, [state](const schedulable& scbl) {
478 state->source->subscribe(state->o);
480 schedule_absolute(unsubscribed, [state](const schedulable& scbl) {
481 state->o.unsubscribe();
// start<T>(createSource, unsubscribed): uses the default created/subscribed ticks.
489 template<class T, class F>
490 auto start(F&& createSource, long unsubscribed) const
491 -> subscriber<T, rxt::testable_observer<T>>
493 return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed);
// start<T>(createSource): uses all three default ticks.
496 template<class T, class F>
497 auto start(F&& createSource) const
498 -> subscriber<T, rxt::testable_observer<T>>
500 return start<T>(std::forward<F>(createSource), created_time, subscribed_time, unsubscribed_time);
// Typedefs deriving, from a factory type F, the source type it returns, that
// source's value_type and the matching test subscriber type.
// NOTE(review): the head of the enclosing struct is not visible here; the
// overloads below refer to it as start_traits<F>.
506 typedef decltype((*(F*)nullptr)()) source_type;
507 typedef typename source_type::value_type value_type;
508 typedef subscriber<value_type, rxt::testable_observer<value_type>> subscriber_type;
// start overloads that deduce T from the factory: enabled only when
// is_create_source_function<F>::value is true, and forwarding to the explicit
// start<T> overloads with T = start_traits<F>::value_type.
// NOTE(review): the template heads of these three overloads are not visible.
512 auto start(F createSource, long created, long subscribed, long unsubscribed) const
513 -> typename std::enable_if<detail::is_create_source_function<F>::value, start_traits<F>>::type::subscriber_type
515 return start<typename start_traits<F>::value_type>(std::move(createSource), created, subscribed, unsubscribed);
519 auto start(F createSource, long unsubscribed) const
520 -> typename std::enable_if<detail::is_create_source_function<F>::value, start_traits<F>>::type::subscriber_type
522 return start<typename start_traits<F>::value_type>(std::move(createSource), created_time, subscribed_time, unsubscribed);
526 auto start(F createSource) const
527 -> typename std::enable_if<detail::is_create_source_function<F>::value, start_traits<F>>::type::subscriber_type
529 return start<typename start_traits<F>::value_type>(std::move(createSource), created_time, subscribed_time, unsubscribed_time);
// Returns a recording test subscriber by forwarding to tester->make_subscriber<T>().
537 subscriber<T, rxt::testable_observer<T>> make_subscriber() const {
538 return tester->make_subscriber<T>();
// Current time as reported by `tester`.
542 clock_type::time_point now() const {
543 return tester->now();
// Creates a test_worker on `cs` (a default-constructed composite_subscription
// when omitted) backed by a new test_type_worker from the underlying scheduler.
546 test_worker create_worker(composite_subscription cs = composite_subscription()) const {
547 return test_worker(cs, tester->create_test_type_worker_interface());
550 bool is_enabled() const {return tester->is_enabled();}
551 long clock() const {return tester->clock();}
// make_hot_observable overloads: accept the recorded messages as a vector, a
// C array or an initializer_list, and forward a vector to the underlying
// test_type. NOTE(review): some template heads are not visible in this listing.
554 rxt::testable_observable<T> make_hot_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const{
555 return tester->make_hot_observable(std::move(messages));
558 template<class T, size_t size>
559 auto make_hot_observable(const T (&arr) [size]) const
560 -> decltype(tester->make_hot_observable(std::vector<T>())) {
561 return tester->make_hot_observable(rxu::to_vector(arr));
565 auto make_hot_observable(std::initializer_list<T> il) const
566 -> decltype(tester->make_hot_observable(std::vector<T>())) {
567 return tester->make_hot_observable(std::vector<T>(il));
// make_cold_observable overloads: same three input forms as above, forwarding
// to the underlying test_type's make_cold_observable.
571 rxt::testable_observable<T> make_cold_observable(std::vector<rxn::recorded<std::shared_ptr<rxn::detail::notification_base<T>>>> messages) const {
572 return tester->make_cold_observable(std::move(messages));
575 template<class T, size_t size>
576 auto make_cold_observable(const T (&arr) [size]) const
577 -> decltype(tester->make_cold_observable(std::vector<T>())) {
578 return tester->make_cold_observable(rxu::to_vector(arr));
582 auto make_cold_observable(std::initializer_list<T> il) const
583 -> decltype(tester->make_cold_observable(std::vector<T>())) {
584 return tester->make_cold_observable(std::vector<T>(il));
// Out-of-class definitions of the static factory members of test::messages<T>
// (next, completed, error, subscribe), each initialized with a
// default-constructed factory object.
// NOTE(review): RXCPP_SELECT_ANY is defined elsewhere; presumably it lets these
// definitions appear in multiple translation units — confirm. The template
// heads preceding each definition are not visible in this listing.
590 RXCPP_SELECT_ANY const typename test::messages<T>::on_next_factory test::messages<T>::next = test::messages<T>::on_next_factory();
594 RXCPP_SELECT_ANY const typename test::messages<T>::on_completed_factory test::messages<T>::completed = test::messages<T>::on_completed_factory();
598 RXCPP_SELECT_ANY const typename test::messages<T>::on_error_factory test::messages<T>::error = test::messages<T>::on_error_factory();
602 RXCPP_SELECT_ANY const typename test::messages<T>::subscribe_factory test::messages<T>::subscribe = test::messages<T>::subscribe_factory();
// Creates a `test` scheduler backed by a newly allocated detail::test_type.
606 inline test make_test() {
607 return test(std::make_shared<detail::test_type>());