1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
5 #if !defined(RXCPP_RX_SCHEDULER_HPP)
6 #define RXCPP_RX_SCHEDULER_HPP
8 #include "rx-includes.hpp"
// Scheduling abstractions: actions, workers, schedulers and schedulables.
12 namespace schedulers {
// Forward declarations so the pointer aliases below can name the
// interfaces before their definitions appear further down.
14 class worker_interface;
15 class scheduler_interface;
// Shared-ownership handle to an action implementation.
20 typedef std::shared_ptr<action_type> action_ptr;
// Shared-ownership handles to a worker implementation (mutable and const views).
22 typedef std::shared_ptr<worker_interface> worker_interface_ptr;
23 typedef std::shared_ptr<const worker_interface> const_worker_interface_ptr;
// Shared-ownership handles to a scheduler implementation (mutable and const views).
25 typedef std::shared_ptr<scheduler_interface> scheduler_interface_ptr;
26 typedef std::shared_ptr<const scheduler_interface> const_scheduler_interface_ptr;
30 // It is essential to keep virtual function calls out of an inner loop.
31 // To make tail-recursion work efficiently the recursion objects create
32 // a space on the stack inside the virtual function call in the actor that
33 // allows the callback and the scheduler to share stack space that records
34 // the request and the allowance without any virtual calls in the loop.
36 /// recursed is set on a schedulable by the action to allow the called
37 /// function to request to be rescheduled.
/// constructed from a flag owned by the caller and taken by reference.
/// NOTE(review): recurse builds its requestor from its own isrequested
/// flag, so this is presumably the flag that operator() sets - confirm.
42 explicit recursed(bool& r)
46 /// request to be rescheduled
/// (const: a request can be made through a const recursed)
47 inline void operator()() const {
52 /// recurse is passed to the action by the scheduler.
53 /// the action uses recurse to coordinate the scheduler and the function.
/// request flag; mutable because every member function below is const.
57 mutable bool isrequested;
/// a: flag owned by the caller, taken by reference.
/// NOTE(review): presumably the "allowed" flag read by is_allowed() - confirm.
60 explicit recurse(bool& a)
// the requestor is constructed over this object's own isrequested flag
63 , requestor(isrequested)
66 /// does the scheduler allow tail-recursion now?
67 inline bool is_allowed() const {
70 /// did the function request to be recursed?
71 inline bool is_requested() const {
74 /// reset the function request. call before each call to the function.
75 inline void reset() const {
78 /// get the recursed to set into the schedulable for the function to use to request recursion
/// returned by const reference; schedulable stores its address, so this
/// recurse must outlive that use.
79 inline const recursed& get_recursed() const {
84 /// recursion is used by the scheduler to signal to each action whether tail recursion is allowed.
/// allowance flag; mutable because reset() below is a const member.
87 mutable bool isallowed;
/// b: initial value.
/// NOTE(review): presumably whether tail-recursion starts out allowed - confirm.
95 explicit recursion(bool b)
100 /// set whether tail-recursion is allowed
/// calling reset() with no argument passes true.
101 inline void reset(bool b = true) const {
104 /// get the recurse to pass into each action being called
105 inline const recurse& get_recurse() const {
/// tag typedef identifying action types.
/// NOTE(review): presumably consumed by a trait that detects actions - confirm.
113 typedef tag_action action_tag;
118 /// action provides type-forgetting for a potentially recursive set of calls to a function that takes a schedulable
/// an action is a thin value wrapper around a shared pointer to the
/// implementation object.
119 class action : public action_base
121 typedef action this_type;
/// shared handle to the type-erased implementation
122 detail::action_ptr inner;
/// the one implementation instance handed out by empty();
/// its definition appears later in this file
123 static detail::action_ptr shared_empty;
/// wrap an existing implementation pointer (ownership is shared, the
/// pointer itself is moved into place)
128 explicit action(detail::action_ptr i)
129 : inner(std::move(i))
133 /// return the empty action
/// every call wraps the same shared_empty instance
134 inline static action empty() {
135 return action(shared_empty);
138 /// call the function
/// s: the schedulable handed to the function; r: the recurse supplied by the scheduler.
/// declared here only; the definition appears later in this file.
139 inline void operator()(const schedulable& s, const recurse& r) const;
/// common base for schedulers: fixes the clock and supplies the scheduler tag.
142 struct scheduler_base
/// all scheduler time points and durations are expressed on the monotonic steady clock
144 typedef std::chrono::steady_clock clock_type;
/// tag typedef identifying scheduler types
145 typedef tag_scheduler scheduler_tag;
/// common base for workers; derives from subscription_base, so a worker
/// also carries whatever subscription_base declares.
148 struct worker_base : public subscription_base
/// tag typedef identifying worker types
150 typedef tag_worker worker_tag;
/// abstract interface that concrete worker implementations provide.
/// derives from enable_shared_from_this, so instances are intended to be
/// owned through std::shared_ptr.
153 class worker_interface
154 : public std::enable_shared_from_this<worker_interface>
156 typedef worker_interface this_type;
159 typedef scheduler_base::clock_type clock_type;
/// virtual so implementations can be destroyed through the interface
161 virtual ~worker_interface() {}
/// current time according to the implementation
163 virtual clock_type::time_point now() const = 0;
/// schedule scbl without a time point
165 virtual void schedule(const schedulable& scbl) const = 0;
/// schedule scbl for the time point when
166 virtual void schedule(clock_type::time_point when, const schedulable& scbl) const = 0;
/// trait: value is true when an object of the decayed type F can be called
/// with a schedulable and that call expression has type void.
172 struct is_action_function
/// chosen when the call expression is well-formed; its return type is the
/// type of that call (nothing is evaluated, the null pointers appear only inside decltype)
176 static auto check(int) -> decltype((*(CF*)nullptr)(*(schedulable*)nullptr));
/// fallback when the call expression is ill-formed; yields not_void so the
/// comparison below fails
178 static not_void check(...);
180 static const bool value = std::is_same<decltype(check<typename std::decay<F>::type>(0)), void>::value;
185 /// a worker ensures that all scheduled actions on the same instance are executed in-order with no overlap
186 /// a worker ensures that all scheduled actions are unsubscribed when it is unsubscribed
187 /// some inner implementations will impose additional constraints on the execution of items.
188 class worker : public worker_base
190 typedef worker this_type;
/// shared handle to the implementation that performs the scheduling
191 detail::worker_interface_ptr inner;
/// subscription that represents this worker's lifetime
192 composite_subscription lifetime;
/// operator== compares the private inner and lifetime members
193 friend bool operator==(const worker&, const worker&);
195 typedef scheduler_base::clock_type clock_type;
196 typedef composite_subscription::weak_subscription weak_subscription;
/// construct from a lifetime and a const implementation pointer.
/// constness is cast away so the pointer can be stored in the non-const inner handle.
201 worker(composite_subscription cs, detail::const_worker_interface_ptr i)
202 : inner(std::const_pointer_cast<worker_interface>(i))
203 , lifetime(std::move(cs))
/// construct from an existing worker o, using cs as this worker's lifetime
206 worker(composite_subscription cs, worker o)
208 , lifetime(std::move(cs))
/// access the lifetime subscription (const and non-const overloads)
212 inline const composite_subscription& get_subscription() const {
215 inline composite_subscription& get_subscription() {
219 // composite_subscription interface: each member below forwards to lifetime
221 inline bool is_subscribed() const {
222 return lifetime.is_subscribed();
224 inline weak_subscription add(subscription s) const {
225 return lifetime.add(std::move(s));
227 inline void remove(weak_subscription w) const {
228 return lifetime.remove(std::move(w));
230 inline void clear() const {
231 return lifetime.clear();
233 inline void unsubscribe() const {
234 return lifetime.unsubscribe();
239 /// return the current time for this worker
240 inline clock_type::time_point now() const {
244 /// insert the supplied schedulable to be run as soon as possible
245 inline void schedule(const schedulable& scbl) const {
246 // force rebinding scbl to this worker
247 schedule_rebind(scbl);
250 /// insert the supplied schedulable to be run at the time specified
251 inline void schedule(clock_type::time_point when, const schedulable& scbl) const {
252 // force rebinding scbl to this worker
253 schedule_rebind(when, scbl);
259 /// insert the supplied schedulable to be run at now() + the delay specified
260 inline void schedule(clock_type::duration when, const schedulable& scbl) const {
261 // force rebinding scbl to this worker
262 schedule_rebind(now() + when, scbl);
265 /// insert the supplied schedulable to be run at the initial time specified and then again at initial + (N * period)
266 /// this will continue until the worker or schedulable is unsubscribed.
267 inline void schedule_periodically(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl) const {
268 // force rebinding scbl to this worker
269 schedule_periodically_rebind(initial, period, scbl);
272 /// insert the supplied schedulable to be run at now() + the initial delay specified and then again at now() + initial + (N * period)
273 /// this will continue until the worker or schedulable is unsubscribed.
274 inline void schedule_periodically(clock_type::duration initial, clock_type::duration period, const schedulable& scbl) const {
275 // force rebinding scbl to this worker
276 schedule_periodically_rebind(now() + initial, period, scbl);
279 /// use the supplied arguments to make a schedulable and then insert it to be run
/// participates in overload resolution only when Arg0 is an action function
/// or a subscription, and is not itself a schedulable
280 template<class Arg0, class... ArgN>
281 auto schedule(Arg0&& a0, ArgN&&... an) const
282 -> typename std::enable_if<
283 (detail::is_action_function<Arg0>::value ||
284 is_subscription<Arg0>::value) &&
285 !is_schedulable<Arg0>::value>::type;
286 template<class... ArgN>
287 /// make a copy of scbl that is bound to this worker and then insert it to be run
288 void schedule_rebind(const schedulable& scbl, ArgN&&... an) const;
290 /// use the supplied arguments to make a schedulable and then insert it to be run at the time specified
/// same participation rule as the overload without a time point
291 template<class Arg0, class... ArgN>
292 auto schedule(clock_type::time_point when, Arg0&& a0, ArgN&&... an) const
293 -> typename std::enable_if<
294 (detail::is_action_function<Arg0>::value ||
295 is_subscription<Arg0>::value) &&
296 !is_schedulable<Arg0>::value>::type;
297 /// make a copy of scbl that is bound to this worker and then insert it to be run at the time specified
298 template<class... ArgN>
299 void schedule_rebind(clock_type::time_point when, const schedulable& scbl, ArgN&&... an) const;
301 /// use the supplied arguments to make a schedulable and then insert it to be run periodically
/// same participation rule as the schedule overloads above
302 template<class Arg0, class... ArgN>
303 auto schedule_periodically(clock_type::time_point initial, clock_type::duration period, Arg0&& a0, ArgN&&... an) const
304 -> typename std::enable_if<
305 (detail::is_action_function<Arg0>::value ||
306 is_subscription<Arg0>::value) &&
307 !is_schedulable<Arg0>::value>::type;
308 /// make a copy of scbl that is bound to this worker and then insert it to be run periodically, starting at initial
309 template<class... ArgN>
310 void schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl, ArgN&&... an) const;
/// two workers are equal when both their implementation pointers and their
/// lifetimes compare equal.
313 inline bool operator==(const worker& lhs, const worker& rhs) {
314 return lhs.inner == rhs.inner && lhs.lifetime == rhs.lifetime;
/// inequality for workers, defined as the negation of operator==.
316 inline bool operator!=(const worker& lhs, const worker& rhs) {
317 return !(lhs == rhs);
/// abstract interface that concrete scheduler implementations provide.
/// derives from enable_shared_from_this, so instances are intended to be
/// owned through std::shared_ptr.
320 class scheduler_interface
321 : public std::enable_shared_from_this<scheduler_interface>
323 typedef scheduler_interface this_type;
326 typedef scheduler_base::clock_type clock_type;
/// virtual so implementations can be destroyed through the interface
328 virtual ~scheduler_interface() {}
/// current time according to the implementation
330 virtual clock_type::time_point now() const = 0;
/// create a worker; cs is the subscription supplied by the caller for it
332 virtual worker create_worker(composite_subscription cs) const = 0;
/// common base for schedulables; supplies the schedulable tag.
336 struct schedulable_base :
337 // public subscription_base, <- already in worker base
/// tag typedef identifying schedulable types
341 typedef tag_schedulable schedulable_tag;
/// scheduler is a value wrapper around a shared pointer to a
/// scheduler_interface implementation.
344 class scheduler : public scheduler_base
346 typedef scheduler this_type;
/// shared handle to the implementation
347 detail::scheduler_interface_ptr inner;
/// operator== is granted access to the private inner member
348 friend bool operator==(const scheduler&, const scheduler&);
350 typedef scheduler_base::clock_type clock_type;
/// wrap an implementation pointer (moved into place)
355 explicit scheduler(detail::scheduler_interface_ptr i)
356 : inner(std::move(i))
/// wrap a const implementation pointer; constness is cast away so it can be
/// stored in the non-const inner handle.
/// NOTE(review): the std::move around the cast result is redundant, the
/// cast already yields a temporary.
359 explicit scheduler(detail::const_scheduler_interface_ptr i)
360 : inner(std::move(std::const_pointer_cast<scheduler_interface>(i)))
364 /// return the current time for this scheduler
365 inline clock_type::time_point now() const {
368 /// create a worker with a lifetime.
369 /// when the worker is unsubscribed all scheduled items will be unsubscribed.
370 /// items scheduled to a worker will be run one at a time.
371 /// scheduling order is preserved: when more than one item is scheduled for
372 /// time T then at time T they will be run in the order that they were scheduled.
/// cs defaults to a freshly constructed composite_subscription; the call is
/// forwarded to the implementation.
373 inline worker create_worker(composite_subscription cs = composite_subscription()) const {
374 return inner->create_worker(cs);
/// construct a Scheduler implementation with make_shared, forwarding an... to
/// its constructor, convert the pointer to scheduler_interface and wrap it in
/// a scheduler value.
378 template<class Scheduler, class... ArgN>
379 inline scheduler make_scheduler(ArgN&&... an) {
380 return scheduler(std::static_pointer_cast<scheduler_interface>(std::make_shared<Scheduler>(std::forward<ArgN>(an)...)));
/// schedulable combines a lifetime, a worker (controller) and an action
/// (activity), and offers both subscription-style and scheduling-style
/// member functions on top of them.
384 class schedulable : public schedulable_base
386 typedef schedulable this_type;
/// subscription that controls whether this schedulable is still active
388 composite_subscription lifetime;
/// weak handle returned when lifetime is added to the worker
392 composite_subscription::weak_subscription action_scope;
/// helper holding a pointer back to the schedulable it guards
402 detacher(const this_type* that)
406 const this_type* that;
/// holds the recursed that the running function may use to request recursion
409 class recursed_scope_type
/// mutable so that the const reset() below can install a requestor
411 mutable const recursed* requestor;
/// guard returned by reset(); its destructor clears the requestor of the
/// scope it was created from
413 class exit_recursed_scope_type
415 const recursed_scope_type* that;
417 ~exit_recursed_scope_type()
419 that->requestor = nullptr;
421 exit_recursed_scope_type(const recursed_scope_type* that)
427 recursed_scope_type()
431 recursed_scope_type(const recursed_scope_type&)
434 // does not acquire recursion scope
436 recursed_scope_type& operator=(const recursed_scope_type& o)
438 // no change in recursion scope
/// point requestor at the recursed owned by r and return a guard that
/// clears it again on destruction; r must outlive the guard
441 exit_recursed_scope_type reset(const recurse& r) const {
442 requestor = std::addressof(r.get_recursed());
443 return exit_recursed_scope_type(this);
445 bool is_recursed() const {
448 void operator()() const {
452 recursed_scope_type recursed_scope;
455 typedef composite_subscription::weak_subscription weak_subscription;
456 typedef scheduler_base::clock_type clock_type;
// withdraw the registration that action_scope refers to from the worker
461 controller.remove(action_scope);
469 /// action and worker share lifetime
/// (lifetime is taken from the worker's own subscription)
470 schedulable(worker q, action a)
471 : lifetime(q.get_subscription())
472 , controller(std::move(q))
473 , activity(std::move(a))
477 /// action and worker have independent lifetimes
/// (cs becomes the lifetime and is also added to the worker; the resulting
/// weak handle is kept in action_scope)
478 schedulable(composite_subscription cs, worker q, action a)
479 : lifetime(std::move(cs))
480 , controller(std::move(q))
481 , activity(std::move(a))
483 , action_scope(controller.add(lifetime))
486 /// inherit lifetimes
/// (lifetime and the scoped flag are copied from scbl; lifetime is added
/// to the new worker only when scbl was scoped)
487 schedulable(schedulable scbl, worker q, action a)
488 : lifetime(scbl.get_subscription())
489 , controller(std::move(q))
490 , activity(std::move(a))
491 , scoped(scbl.scoped)
492 , action_scope(scbl.scoped ? controller.add(lifetime) : weak_subscription())
/// accessors for the lifetime, the worker and the action (const and non-const overloads)
496 inline const composite_subscription& get_subscription() const {
499 inline composite_subscription& get_subscription() {
502 inline const worker& get_worker() const {
505 inline worker& get_worker() {
508 inline const action& get_action() const {
511 inline action& get_action() {
/// a schedulable on worker sc built from the empty subscription and the empty action
515 inline static schedulable empty(worker sc) {
516 return schedulable(composite_subscription::empty(), sc, action::empty());
/// install r's recursed for the duration of the returned guard object
519 inline auto set_recursed(const recurse& r) const
520 -> decltype(recursed_scope.reset(r)) {
521 return recursed_scope.reset(r);
/// forwards to recursed_scope
526 bool is_recursed() const {
527 return recursed_scope.is_recursed();
529 /// requests tail-recursion of the same action
530 /// this will exit the process if called when
531 /// is_recursed() is false.
532 /// Note: to improve perf it is not required
533 /// to call is_recursed() before calling this
534 /// operator. Context is sufficient. The schedulable
535 /// passed to the action by the scheduler will return
536 /// true from is_recursed()
537 inline void operator()() const {
541 // composite_subscription interface: each member below forwards to lifetime
543 inline bool is_subscribed() const {
544 return lifetime.is_subscribed();
546 inline weak_subscription add(subscription s) const {
547 return lifetime.add(std::move(s));
/// overload for unsubscribe functions: f is wrapped with make_subscription
/// before being added to lifetime
551 -> typename std::enable_if<rxcpp::detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
552 return lifetime.add(make_subscription(std::move(f)));
554 inline void remove(weak_subscription w) const {
555 return lifetime.remove(std::move(w));
557 inline void clear() const {
558 return lifetime.clear();
560 inline void unsubscribe() const {
561 return lifetime.unsubscribe();
/// current time as reported by the worker
566 inline clock_type::time_point now() const {
567 return controller.now();
569 /// put this on the queue of the stored scheduler to run asap
/// (only when still subscribed)
570 inline void schedule() const {
571 if (is_subscribed()) {
572 controller.schedule(*this);
575 /// put this on the queue of the stored scheduler to run at the specified time
/// (only when still subscribed)
576 inline void schedule(clock_type::time_point when) const {
577 if (is_subscribed()) {
578 controller.schedule(when, *this);
581 /// put this on the queue of the stored scheduler to run after a delay from now
/// (only when still subscribed)
582 inline void schedule(clock_type::duration when) const {
583 if (is_subscribed()) {
584 controller.schedule(when, *this);
590 /// invokes the action
/// an unsubscribed schedulable takes the separate branch below.
591 inline void operator()(const recurse& r) const {
592 if (!is_subscribed()) {
// NOTE(review): clearing protect.that afterwards presumably disarms the
// detacher once the guarded call has completed - confirm.
595 detacher protect(this);
597 protect.that = nullptr;
601 struct current_thread;
/// implementation object behind action; derives from enable_shared_from_this,
/// so instances are intended to be owned through std::shared_ptr.
606 : public std::enable_shared_from_this<action_type>
608 typedef action_type this_type;
/// type-erased callable signature: receives the schedulable and the recurse
611 typedef std::function<void(const schedulable&, const recurse&)> function_type;
/// construct from the callable to run
621 action_type(function_type f)
/// run with the given schedulable and recurse
626 inline void operator()(const schedulable& s, const recurse& r) {
/// out-of-class definition of the call operator declared in class action.
636 inline void action::operator()(const schedulable& s, const recurse& r) const {
/// definition of the single default-constructed action implementation that
/// action::empty() wraps.
/// NOTE(review): RXCPP_SELECT_ANY presumably allows this header-level
/// definition to appear in several translation units - confirm.
/// NOTE(review): std::make_shared would avoid the separate control-block allocation.
641 RXCPP_SELECT_ANY detail::action_ptr action::shared_empty = detail::action_ptr(new detail::action_type());
/// free-function spelling of action::empty().
644 inline action make_action_empty() {
645 return action::empty();
/// wrap a callable taking a schedulable into an action.
/// f must satisfy detail::is_action_function (checked at compile time).
649 inline action make_action(F&& f) {
650 static_assert(detail::is_action_function<F>::value, "action function must be void(schedulable)");
// keep a local copy of the callable; the lambda below captures it by value
651 auto fn = std::forward<F>(f);
652 return action(std::make_shared<detail::action_type>(
653 // tail-recurse inside of the virtual function call
654 // until a new action, lifetime or scheduler is returned
655 [fn](const schedulable& s, const recurse& r) {
656 trace_activity().action_enter(s);
// install r's recursed on s; the returned guard clears it again when it is destroyed
657 auto scope = s.set_recursed(r);
// keep going only while the schedulable is still subscribed
658 while (s.is_subscribed()) {
// this branch is taken unless recursion is both allowed and requested
661 if (!r.is_allowed() || !r.is_requested()) {
// a request was made but recursion is not allowed here
662 if (r.is_requested()) {
667 trace_activity().action_recurse(s);
669 trace_activity().action_return(s);
/// make_schedulable overload set: builds a schedulable from combinations of an
/// existing schedulable, a worker, a lifetime and an action or action function.
/// copy an existing schedulable
674 inline auto make_schedulable(
675 const schedulable& scbl)
677 return schedulable(scbl);
/// move an existing schedulable
680 inline auto make_schedulable(
683 return schedulable(std::move(scbl));
/// worker + action: the action shares the worker's lifetime
686 inline schedulable make_schedulable(worker sc, action a) {
687 return schedulable(sc, a);
/// worker + lifetime + action: the action has its own lifetime cs
689 inline schedulable make_schedulable(worker sc, composite_subscription cs, action a) {
690 return schedulable(cs, sc, a);
/// worker + action function: f is wrapped with make_action
694 auto make_schedulable(worker sc, F&& f)
695 -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
696 return schedulable(sc, make_action(std::forward<F>(f)));
/// worker + lifetime + action function
699 auto make_schedulable(worker sc, composite_subscription cs, F&& f)
700 -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
701 return schedulable(cs, sc, make_action(std::forward<F>(f)));
/// keep scbl's worker, use lifetime cs and a new action function
704 auto make_schedulable(schedulable scbl, composite_subscription cs, F&& f)
705 -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
706 return schedulable(cs, scbl.get_worker(), make_action(std::forward<F>(f)));
/// inherit scbl's lifetime, use worker sc and a new action function
709 auto make_schedulable(schedulable scbl, worker sc, F&& f)
710 -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
711 return schedulable(scbl, sc, make_action(std::forward<F>(f)));
/// inherit scbl's lifetime and worker, use a new action function
714 auto make_schedulable(schedulable scbl, F&& f)
715 -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
716 return schedulable(scbl, scbl.get_worker(), make_action(std::forward<F>(f)));
/// keep scbl's worker and action, use lifetime cs
719 inline auto make_schedulable(schedulable scbl, composite_subscription cs)
721 return schedulable(cs, scbl.get_worker(), scbl.get_action());
/// keep scbl's action, use worker sc and lifetime cs
723 inline auto make_schedulable(schedulable scbl, worker sc, composite_subscription cs)
725 return schedulable(cs, sc, scbl.get_action());
/// rebind: inherit scbl's lifetime and action, use worker sc
727 inline auto make_schedulable(schedulable scbl, worker sc)
729 return schedulable(scbl, sc, scbl.get_action());
/// build a schedulable on this worker from the arguments and hand it to the
/// implementation, with trace notifications before and after the call.
732 template<class Arg0, class... ArgN>
733 auto worker::schedule(Arg0&& a0, ArgN&&... an) const
734 -> typename std::enable_if<
735 (detail::is_action_function<Arg0>::value ||
736 is_subscription<Arg0>::value) &&
737 !is_schedulable<Arg0>::value>::type {
738 auto scbl = make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...);
739 trace_activity().schedule_enter(*inner.get(), scbl);
740 inner->schedule(std::move(scbl));
741 trace_activity().schedule_return(*inner.get());
/// make a copy of scbl bound to this worker and hand it to the
/// implementation, with trace notifications before and after the call.
743 template<class... ArgN>
744 void worker::schedule_rebind(const schedulable& scbl, ArgN&&... an) const {
745 auto rescbl = make_schedulable(scbl, *this, std::forward<ArgN>(an)...);
746 trace_activity().schedule_enter(*inner.get(), rescbl);
747 inner->schedule(std::move(rescbl));
748 trace_activity().schedule_return(*inner.get());
/// build a schedulable on this worker from the arguments and hand it to the
/// implementation for the time point when, with trace notifications before
/// and after the call.
751 template<class Arg0, class... ArgN>
752 auto worker::schedule(clock_type::time_point when, Arg0&& a0, ArgN&&... an) const
753 -> typename std::enable_if<
754 (detail::is_action_function<Arg0>::value ||
755 is_subscription<Arg0>::value) &&
756 !is_schedulable<Arg0>::value>::type {
757 auto scbl = make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...);
758 trace_activity().schedule_when_enter(*inner.get(), when, scbl);
759 inner->schedule(when, std::move(scbl));
760 trace_activity().schedule_when_return(*inner.get());
/// make a copy of scbl bound to this worker and hand it to the implementation
/// for the time point when, with trace notifications before and after the call.
762 template<class... ArgN>
763 void worker::schedule_rebind(clock_type::time_point when, const schedulable& scbl, ArgN&&... an) const {
764 auto rescbl = make_schedulable(scbl, *this, std::forward<ArgN>(an)...);
765 trace_activity().schedule_when_enter(*inner.get(), when, rescbl);
766 inner->schedule(when, std::move(rescbl));
767 trace_activity().schedule_when_return(*inner.get());
/// build a schedulable on this worker from the arguments and delegate to
/// schedule_periodically_rebind with the same initial time and period.
770 template<class Arg0, class... ArgN>
771 auto worker::schedule_periodically(clock_type::time_point initial, clock_type::duration period, Arg0&& a0, ArgN&&... an) const
772 -> typename std::enable_if<
773 (detail::is_action_function<Arg0>::value ||
774 is_subscription<Arg0>::value) &&
775 !is_schedulable<Arg0>::value>::type {
776 schedule_periodically_rebind(initial, period, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
/// bind scbl to this worker and schedule a wrapper that runs it and then
/// re-schedules itself; the first run is requested for initial.
778 template<class... ArgN>
779 void worker::schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl, ArgN&&... an) const {
// the target time lives on the heap and is shared by every copy of the lambda below.
// NOTE(review): std::make_shared would avoid the separate control-block allocation.
780 std::shared_ptr<clock_type::time_point> target(new clock_type::time_point(initial));
781 auto activity = make_schedulable(scbl, *this, std::forward<ArgN>(an)...);
782 auto periodic = make_schedulable(
// target, period and activity are captured by value
784 [target, period, activity](schedulable self) {
785 // any recursion requests will be pushed to the scheduler queue
788 activity(r.get_recurse());
790 // schedule next occurrence (if the action took longer than 'period' target will be in the past)
792 self.schedule(*target);
794 trace_activity().schedule_when_enter(*inner.get(), *target, periodic);
795 inner->schedule(*target, periodic);
796 trace_activity().schedule_when_return(*inner.get());
/// pairs a schedulable with the time at which it is due; TimePoint is the
/// caller's time point type.
801 template<class TimePoint>
802 struct time_schedulable
804 typedef TimePoint time_point_type;
/// when: due time; a: the schedulable to run
806 time_schedulable(TimePoint when, schedulable a)
816 // Priority queue of time_schedulable items ordered by
817 // time_schedulable.when. Items with equal values for
818 // when are kept in fifo order.
819 template<class TimePoint>
820 class schedulable_queue {
822 typedef time_schedulable<TimePoint> item_type;
// each stored element carries the item together with an insertion ordinal
823 typedef std::pair<item_type, int64_t> elem_type;
824 typedef std::vector<elem_type> container_type;
825 typedef const item_type& const_reference;
// comparison used for ordering: lhs ranks after rhs when it is due later or,
// for equal due times, when it was pushed later (larger ordinal)
830 bool operator()(const elem_type& lhs, const elem_type& rhs) const {
831 if (lhs.first.when == rhs.first.when) {
832 return lhs.second > rhs.second;
835 return lhs.first.when > rhs.first.when;
840 typedef std::priority_queue<
// the item at the head of the queue (without its ordinal)
850 const_reference top() const {
851 return queue.top().first;
859 return queue.empty();
// copy value into the queue, stamping it with the next ordinal
862 void push(const item_type& value) {
863 queue.push(elem_type(value, ordinal++));
// move value into the queue, stamping it with the next ordinal
866 void push(item_type&& value) {
867 queue.push(elem_type(std::move(value), ordinal++));
874 namespace rxsc=schedulers;
878 #include "schedulers/rx-currentthread.hpp"
879 #include "schedulers/rx-newthread.hpp"
880 #include "schedulers/rx-eventloop.hpp"
881 #include "schedulers/rx-immediate.hpp"
882 #include "schedulers/rx-virtualtime.hpp"
883 #include "schedulers/rx-sameworker.hpp"