]> git.sesse.net Git - casparcg/blob - dependencies64/RxCpp/include/rxcpp/rx-scheduler.hpp
* Added RxCpp library for LINQ api, replacing Boost.Range based iterations where...
[casparcg] / dependencies64 / RxCpp / include / rxcpp / rx-scheduler.hpp
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 #if !defined(RXCPP_RX_SCHEDULER_HPP)
6 #define RXCPP_RX_SCHEDULER_HPP
7
8 #include "rx-includes.hpp"
9
10 namespace rxcpp {
11
12 namespace schedulers {
13
14 class worker_interface;
15 class scheduler_interface;
16
17 namespace detail {
18
19 class action_type;
20 typedef std::shared_ptr<action_type> action_ptr;
21
22 typedef std::shared_ptr<worker_interface> worker_interface_ptr;
23 typedef std::shared_ptr<const worker_interface> const_worker_interface_ptr;
24
25 typedef std::shared_ptr<scheduler_interface> scheduler_interface_ptr;
26 typedef std::shared_ptr<const scheduler_interface> const_scheduler_interface_ptr;
27
28 }
29
30 // It is essential to keep virtual function calls out of an inner loop.
31 // To make tail-recursion work efficiently the recursion objects create
32 // a space on the stack inside the virtual function call in the actor that
33 // allows the callback and the scheduler to share stack space that records
34 // the request and the allowance without any virtual calls in the loop.
35
36 /// recursed is set on a schedulable by the action to allow the called
37 /// function to request to be rescheduled.
38 class recursed
39 {
40     bool& isrequested;
41 public:
42     explicit recursed(bool& r)
43         : isrequested(r)
44     {
45     }
46     /// request to be rescheduled
47     inline void operator()() const {
48         isrequested = true;
49     }
50 };
51
52 /// recurse is passed to the action by the scheduler.
53 /// the action uses recurse to coordinate the scheduler and the function.
54 class recurse
55 {
56     bool& isallowed;
57     mutable bool isrequested;
58     recursed requestor;
59 public:
60     explicit recurse(bool& a)
61         : isallowed(a)
62         , isrequested(true)
63         , requestor(isrequested)
64     {
65     }
66     /// does the scheduler allow tail-recursion now?
67     inline bool is_allowed() const {
68         return isallowed;
69     }
70     /// did the function request to be recursed?
71     inline bool is_requested() const {
72         return isrequested;
73     }
74     /// reset the function request. call before each call to the function.
75     inline void reset() const {
76         isrequested = false;
77     }
78     /// get the recursed to set into the schedulable for the function to use to request recursion
79     inline const recursed& get_recursed() const {
80         return requestor;
81     }
82 };
83
84 /// recursion is used by the scheduler to signal to each action whether tail recursion is allowed.
85 class recursion
86 {
87     mutable bool isallowed;
88     recurse recursor;
89 public:
90     recursion()
91         : isallowed(true)
92         , recursor(isallowed)
93     {
94     }
95     explicit recursion(bool b)
96         : isallowed(b)
97         , recursor(isallowed)
98     {
99     }
100     /// set whether tail-recursion is allowed
101     inline void reset(bool b = true) const {
102         isallowed = b;
103     }
104     /// get the recurse to pass into each action being called
105     inline const recurse& get_recurse() const {
106         return recursor;
107     }
108 };
109
110
111 struct action_base
112 {
113     typedef tag_action action_tag;
114 };
115
116 class schedulable;
117
118 /// action provides type-forgetting for a potentially recursive set of calls to a function that takes a schedulable
119 class action : public action_base
120 {
121     typedef action this_type;
122     detail::action_ptr inner;
123     static detail::action_ptr shared_empty;
124 public:
125     action()
126     {
127     }
128     explicit action(detail::action_ptr i)
129     : inner(std::move(i))
130     {
131     }
132
133     /// return the empty action
134     inline static action empty() {
135         return action(shared_empty);
136     }
137
138     /// call the function
139     inline void operator()(const schedulable& s, const recurse& r) const;
140 };
141
142 struct scheduler_base
143 {
144     typedef std::chrono::steady_clock clock_type;
145     typedef tag_scheduler scheduler_tag;
146 };
147
148 struct worker_base : public subscription_base
149 {
150     typedef tag_worker worker_tag;
151 };
152
153 class worker_interface
154     : public std::enable_shared_from_this<worker_interface>
155 {
156     typedef worker_interface this_type;
157
158 public:
159     typedef scheduler_base::clock_type clock_type;
160
161     virtual ~worker_interface() {}
162
163     virtual clock_type::time_point now() const = 0;
164
165     virtual void schedule(const schedulable& scbl) const = 0;
166     virtual void schedule(clock_type::time_point when, const schedulable& scbl) const = 0;
167 };
168
169 namespace detail {
170
171 template<class F>
172 struct is_action_function
173 {
174     struct not_void {};
175     template<class CF>
176     static auto check(int) -> decltype((*(CF*)nullptr)(*(schedulable*)nullptr));
177     template<class CF>
178     static not_void check(...);
179
180     static const bool value = std::is_same<decltype(check<typename std::decay<F>::type>(0)), void>::value;
181 };
182
183 }
184
185 /// a worker ensures that all scheduled actions on the same instance are executed in-order with no overlap
186 /// a worker ensures that all scheduled actions are unsubscribed when it is unsubscribed
187 /// some inner implementations will impose additional constraints on the execution of items.
188 class worker : public worker_base
189 {
190     typedef worker this_type;
191     detail::worker_interface_ptr inner;
192     composite_subscription lifetime;
193     friend bool operator==(const worker&, const worker&);
194 public:
195     typedef scheduler_base::clock_type clock_type;
196     typedef composite_subscription::weak_subscription weak_subscription;
197
198     worker()
199     {
200     }
201     worker(composite_subscription cs, detail::const_worker_interface_ptr i)
202         : inner(std::const_pointer_cast<worker_interface>(i))
203         , lifetime(std::move(cs))
204     {
205     }
206     worker(composite_subscription cs, worker o)
207         : inner(o.inner)
208         , lifetime(std::move(cs))
209     {
210     }
211
212     inline const composite_subscription& get_subscription() const {
213         return lifetime;
214     }
215     inline composite_subscription& get_subscription() {
216         return lifetime;
217     }
218
219     // composite_subscription
220     //
221     inline bool is_subscribed() const {
222         return lifetime.is_subscribed();
223     }
224     inline weak_subscription add(subscription s) const {
225         return lifetime.add(std::move(s));
226     }
227     inline void remove(weak_subscription w) const {
228         return lifetime.remove(std::move(w));
229     }
230     inline void clear() const {
231         return lifetime.clear();
232     }
233     inline void unsubscribe() const {
234         return lifetime.unsubscribe();
235     }
236
237     // worker_interface
238     //
239     /// return the current time for this worker
240     inline clock_type::time_point now() const {
241         return inner->now();
242     }
243
244     /// insert the supplied schedulable to be run as soon as possible
245     inline void schedule(const schedulable& scbl) const {
246         // force rebinding scbl to this worker
247         schedule_rebind(scbl);
248     }
249
250     /// insert the supplied schedulable to be run at the time specified
251     inline void schedule(clock_type::time_point when, const schedulable& scbl) const {
252         // force rebinding scbl to this worker
253         schedule_rebind(when, scbl);
254     }
255
256     // helpers
257     //
258
259     /// insert the supplied schedulable to be run at now() + the delay specified
260     inline void schedule(clock_type::duration when, const schedulable& scbl) const {
261         // force rebinding scbl to this worker
262         schedule_rebind(now() + when, scbl);
263     }
264
265     /// insert the supplied schedulable to be run at the initial time specified and then again at initial + (N * period)
266     /// this will continue until the worker or schedulable is unsubscribed.
267     inline void schedule_periodically(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl) const {
268         // force rebinding scbl to this worker
269         schedule_periodically_rebind(initial, period, scbl);
270     }
271
272     /// insert the supplied schedulable to be run at now() + the initial delay specified and then again at now() + initial + (N * period)
273     /// this will continue until the worker or schedulable is unsubscribed.
274     inline void schedule_periodically(clock_type::duration initial, clock_type::duration period, const schedulable& scbl) const {
275         // force rebinding scbl to this worker
276         schedule_periodically_rebind(now() + initial, period, scbl);
277     }
278
279     /// use the supplied arguments to make a schedulable and then insert it to be run
280     template<class Arg0, class... ArgN>
281     auto schedule(Arg0&& a0, ArgN&&... an) const
282         -> typename std::enable_if<
283             (detail::is_action_function<Arg0>::value ||
284             is_subscription<Arg0>::value) &&
285             !is_schedulable<Arg0>::value>::type;
286     template<class... ArgN>
287     /// use the supplied arguments to make a schedulable and then insert it to be run
288     void schedule_rebind(const schedulable& scbl, ArgN&&... an) const;
289
290     /// use the supplied arguments to make a schedulable and then insert it to be run
291     template<class Arg0, class... ArgN>
292     auto schedule(clock_type::time_point when, Arg0&& a0, ArgN&&... an) const
293         -> typename std::enable_if<
294             (detail::is_action_function<Arg0>::value ||
295             is_subscription<Arg0>::value) &&
296             !is_schedulable<Arg0>::value>::type;
297     /// use the supplied arguments to make a schedulable and then insert it to be run
298     template<class... ArgN>
299     void schedule_rebind(clock_type::time_point when, const schedulable& scbl, ArgN&&... an) const;
300
301     /// use the supplied arguments to make a schedulable and then insert it to be run
302     template<class Arg0, class... ArgN>
303     auto schedule_periodically(clock_type::time_point initial, clock_type::duration period, Arg0&& a0, ArgN&&... an) const
304         -> typename std::enable_if<
305             (detail::is_action_function<Arg0>::value ||
306             is_subscription<Arg0>::value) &&
307             !is_schedulable<Arg0>::value>::type;
308     /// use the supplied arguments to make a schedulable and then insert it to be run
309     template<class... ArgN>
310     void schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl, ArgN&&... an) const;
311 };
312
313 inline bool operator==(const worker& lhs, const worker& rhs) {
314     return lhs.inner == rhs.inner && lhs.lifetime == rhs.lifetime;
315 }
316 inline bool operator!=(const worker& lhs, const worker& rhs) {
317     return !(lhs == rhs);
318 }
319
320 class scheduler_interface
321     : public std::enable_shared_from_this<scheduler_interface>
322 {
323     typedef scheduler_interface this_type;
324
325 public:
326     typedef scheduler_base::clock_type clock_type;
327
328     virtual ~scheduler_interface() {}
329
330     virtual clock_type::time_point now() const = 0;
331
332     virtual worker create_worker(composite_subscription cs) const = 0;
333 };
334
335
336 struct schedulable_base :
337     // public subscription_base, <- already in worker base
338     public worker_base,
339     public action_base
340 {
341     typedef tag_schedulable schedulable_tag;
342 };
343
344 class scheduler : public scheduler_base
345 {
346     typedef scheduler this_type;
347     detail::scheduler_interface_ptr inner;
348     friend bool operator==(const scheduler&, const scheduler&);
349 public:
350     typedef scheduler_base::clock_type clock_type;
351
352     scheduler()
353     {
354     }
355     explicit scheduler(detail::scheduler_interface_ptr i)
356         : inner(std::move(i))
357     {
358     }
359     explicit scheduler(detail::const_scheduler_interface_ptr i)
360     : inner(std::move(std::const_pointer_cast<scheduler_interface>(i)))
361     {
362     }
363
364     /// return the current time for this scheduler
365     inline clock_type::time_point now() const {
366         return inner->now();
367     }
368     /// create a worker with a lifetime.
369     /// when the worker is unsubscribed all scheduled items will be unsubscribed.
370     /// items scheduled to a worker will be run one at a time.
371     /// scheduling order is preserved: when more than one item is scheduled for
372     /// time T then at time T they will be run in the order that they were scheduled.
373     inline worker create_worker(composite_subscription cs = composite_subscription()) const {
374         return inner->create_worker(cs);
375     }
376 };
377
378 template<class Scheduler, class... ArgN>
379 inline scheduler make_scheduler(ArgN&&... an) {
380     return scheduler(std::static_pointer_cast<scheduler_interface>(std::make_shared<Scheduler>(std::forward<ArgN>(an)...)));
381 }
382
383
384 class schedulable : public schedulable_base
385 {
386     typedef schedulable this_type;
387
388     composite_subscription lifetime;
389     worker controller;
390     action activity;
391     bool scoped;
392     composite_subscription::weak_subscription action_scope;
393
394     struct detacher
395     {
396         ~detacher()
397         {
398             if (that) {
399                 that->unsubscribe();
400             }
401         }
402         detacher(const this_type* that)
403             : that(that)
404         {
405         }
406         const this_type* that;
407     };
408
409     class recursed_scope_type
410     {
411         mutable const recursed* requestor;
412
413         class exit_recursed_scope_type
414         {
415             const recursed_scope_type* that;
416         public:
417             ~exit_recursed_scope_type()
418             {
419                     that->requestor = nullptr;
420             }
421             exit_recursed_scope_type(const recursed_scope_type* that)
422                 : that(that)
423             {
424             }
425         };
426     public:
427         recursed_scope_type()
428             : requestor(nullptr)
429         {
430         }
431         recursed_scope_type(const recursed_scope_type&)
432             : requestor(nullptr)
433         {
434             // does not aquire recursion scope
435         }
436         recursed_scope_type& operator=(const recursed_scope_type& o)
437         {
438             // no change in recursion scope
439             return *this;
440         }
441         exit_recursed_scope_type reset(const recurse& r) const {
442             requestor = std::addressof(r.get_recursed());
443             return exit_recursed_scope_type(this);
444         }
445         bool is_recursed() const {
446             return !!requestor;
447         }
448         void operator()() const {
449             (*requestor)();
450         }
451     };
452     recursed_scope_type recursed_scope;
453
454 public:
455     typedef composite_subscription::weak_subscription weak_subscription;
456     typedef scheduler_base::clock_type clock_type;
457
458     ~schedulable()
459     {
460         if (scoped) {
461             controller.remove(action_scope);
462         }
463     }
464     schedulable()
465         : scoped(false)
466     {
467     }
468
469     /// action and worker share lifetime
470     schedulable(worker q, action a)
471         : lifetime(q.get_subscription())
472         , controller(std::move(q))
473         , activity(std::move(a))
474         , scoped(false)
475     {
476     }
477     /// action and worker have independent lifetimes
478     schedulable(composite_subscription cs, worker q, action a)
479         : lifetime(std::move(cs))
480         , controller(std::move(q))
481         , activity(std::move(a))
482         , scoped(true)
483         , action_scope(controller.add(lifetime))
484     {
485     }
486     /// inherit lifetimes
487     schedulable(schedulable scbl, worker q, action a)
488         : lifetime(scbl.get_subscription())
489         , controller(std::move(q))
490         , activity(std::move(a))
491         , scoped(scbl.scoped)
492         , action_scope(scbl.scoped ? controller.add(lifetime) : weak_subscription())
493     {
494     }
495
496     inline const composite_subscription& get_subscription() const {
497         return lifetime;
498     }
499     inline composite_subscription& get_subscription() {
500         return lifetime;
501     }
502     inline const worker& get_worker() const {
503         return controller;
504     }
505     inline worker& get_worker() {
506         return controller;
507     }
508     inline const action& get_action() const {
509         return activity;
510     }
511     inline action& get_action() {
512         return activity;
513     }
514
515     inline static schedulable empty(worker sc) {
516         return schedulable(composite_subscription::empty(), sc, action::empty());
517     }
518
519     inline auto set_recursed(const recurse& r) const
520         -> decltype(recursed_scope.reset(r)) {
521         return      recursed_scope.reset(r);
522     }
523
524     // recursed
525     //
526     bool is_recursed() const {
527         return recursed_scope.is_recursed();
528     }
529     /// requests tail-recursion of the same action
530     /// this will exit the process if called when
531     /// is_recursed() is false.
532     /// Note: to improve perf it is not required
533     /// to call is_recursed() before calling this
534     /// operator. Context is sufficient. The schedulable
535     /// passed to the action by the scheduler will return
536     /// true from is_recursed()
537     inline void operator()() const {
538         recursed_scope();
539     }
540
541     // composite_subscription
542     //
543     inline bool is_subscribed() const {
544         return lifetime.is_subscribed();
545     }
546     inline weak_subscription add(subscription s) const {
547         return lifetime.add(std::move(s));
548     }
549     template<class F>
550     auto add(F f) const
551     -> typename std::enable_if<rxcpp::detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
552         return lifetime.add(make_subscription(std::move(f)));
553     }
554     inline void remove(weak_subscription w) const {
555         return lifetime.remove(std::move(w));
556     }
557     inline void clear() const {
558         return lifetime.clear();
559     }
560     inline void unsubscribe() const {
561         return lifetime.unsubscribe();
562     }
563
564     // scheduler
565     //
566     inline clock_type::time_point now() const {
567         return controller.now();
568     }
569     /// put this on the queue of the stored scheduler to run asap
570     inline void schedule() const {
571         if (is_subscribed()) {
572             controller.schedule(*this);
573         }
574     }
575     /// put this on the queue of the stored scheduler to run at the specified time
576     inline void schedule(clock_type::time_point when) const {
577         if (is_subscribed()) {
578             controller.schedule(when, *this);
579         }
580     }
581     /// put this on the queue of the stored scheduler to run after a delay from now
582     inline void schedule(clock_type::duration when) const {
583         if (is_subscribed()) {
584             controller.schedule(when, *this);
585         }
586     }
587
588     // action
589     //
590     /// invokes the action
591     inline void operator()(const recurse& r) const {
592         if (!is_subscribed()) {
593             return;
594         }
595         detacher protect(this);
596         activity(*this, r);
597         protect.that = nullptr;
598     }
599 };
600
601 struct current_thread;
602
603 namespace detail {
604
605 class action_type
606     : public std::enable_shared_from_this<action_type>
607 {
608     typedef action_type this_type;
609
610 public:
611     typedef std::function<void(const schedulable&, const recurse&)> function_type;
612
613 private:
614     function_type f;
615
616 public:
617     action_type()
618     {
619     }
620
621     action_type(function_type f)
622         : f(std::move(f))
623     {
624     }
625
626     inline void operator()(const schedulable& s, const recurse& r) {
627         if (!f) {
628             abort();
629         }
630         f(s, r);
631     }
632 };
633
634 }
635
636 inline void action::operator()(const schedulable& s, const recurse& r) const {
637     (*inner)(s, r);
638 }
639
640 //static
641 RXCPP_SELECT_ANY detail::action_ptr action::shared_empty = detail::action_ptr(new detail::action_type());
642
643
644 inline action make_action_empty() {
645     return action::empty();
646 }
647
648 template<class F>
649 inline action make_action(F&& f) {
650     static_assert(detail::is_action_function<F>::value, "action function must be void(schedulable)");
651     auto fn = std::forward<F>(f);
652     return action(std::make_shared<detail::action_type>(
653         // tail-recurse inside of the virtual function call
654         // until a new action, lifetime or scheduler is returned
655         [fn](const schedulable& s, const recurse& r) {
656             trace_activity().action_enter(s);
657             auto scope = s.set_recursed(r);
658             while (s.is_subscribed()) {
659                 r.reset();
660                 fn(s);
661                 if (!r.is_allowed() || !r.is_requested()) {
662                     if (r.is_requested()) {
663                         s.schedule();
664                     }
665                     break;
666                 }
667                 trace_activity().action_recurse(s);
668             }
669             trace_activity().action_return(s);
670         }));
671 }
672
673 // copy
674 inline auto make_schedulable(
675     const   schedulable& scbl)
676     ->      schedulable {
677     return  schedulable(scbl);
678 }
679 // move
680 inline auto make_schedulable(
681             schedulable&& scbl)
682     ->      schedulable {
683     return  schedulable(std::move(scbl));
684 }
685
686 inline schedulable make_schedulable(worker sc, action a) {
687     return schedulable(sc, a);
688 }
689 inline schedulable make_schedulable(worker sc, composite_subscription cs, action a) {
690     return schedulable(cs, sc, a);
691 }
692
693 template<class F>
694 auto make_schedulable(worker sc, F&& f)
695     -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
696     return schedulable(sc, make_action(std::forward<F>(f)));
697 }
698 template<class F>
699 auto make_schedulable(worker sc, composite_subscription cs, F&& f)
700     -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
701     return schedulable(cs, sc, make_action(std::forward<F>(f)));
702 }
703 template<class F>
704 auto make_schedulable(schedulable scbl, composite_subscription cs, F&& f)
705     -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
706     return schedulable(cs, scbl.get_worker(), make_action(std::forward<F>(f)));
707 }
708 template<class F>
709 auto make_schedulable(schedulable scbl, worker sc, F&& f)
710     -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
711     return schedulable(scbl, sc, make_action(std::forward<F>(f)));
712 }
713 template<class F>
714 auto make_schedulable(schedulable scbl, F&& f)
715     -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
716     return schedulable(scbl, scbl.get_worker(), make_action(std::forward<F>(f)));
717 }
718
719 inline auto make_schedulable(schedulable scbl, composite_subscription cs)
720     -> schedulable {
721     return schedulable(cs, scbl.get_worker(), scbl.get_action());
722 }
723 inline auto make_schedulable(schedulable scbl, worker sc, composite_subscription cs)
724     -> schedulable {
725     return schedulable(cs, sc, scbl.get_action());
726 }
727 inline auto make_schedulable(schedulable scbl, worker sc)
728     -> schedulable {
729     return schedulable(scbl, sc, scbl.get_action());
730 }
731
732 template<class Arg0, class... ArgN>
733 auto worker::schedule(Arg0&& a0, ArgN&&... an) const
734     -> typename std::enable_if<
735         (detail::is_action_function<Arg0>::value ||
736         is_subscription<Arg0>::value) &&
737         !is_schedulable<Arg0>::value>::type {
738     auto scbl = make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...);
739     trace_activity().schedule_enter(*inner.get(), scbl);
740     inner->schedule(std::move(scbl));
741     trace_activity().schedule_return(*inner.get());
742 }
743 template<class... ArgN>
744 void worker::schedule_rebind(const schedulable& scbl, ArgN&&... an) const {
745     auto rescbl = make_schedulable(scbl, *this, std::forward<ArgN>(an)...);
746     trace_activity().schedule_enter(*inner.get(), rescbl);
747     inner->schedule(std::move(rescbl));
748     trace_activity().schedule_return(*inner.get());
749 }
750
751 template<class Arg0, class... ArgN>
752 auto worker::schedule(clock_type::time_point when, Arg0&& a0, ArgN&&... an) const
753     -> typename std::enable_if<
754         (detail::is_action_function<Arg0>::value ||
755         is_subscription<Arg0>::value) &&
756         !is_schedulable<Arg0>::value>::type {
757     auto scbl = make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...);
758     trace_activity().schedule_when_enter(*inner.get(), when, scbl);
759     inner->schedule(when, std::move(scbl));
760     trace_activity().schedule_when_return(*inner.get());
761 }
762 template<class... ArgN>
763 void worker::schedule_rebind(clock_type::time_point when, const schedulable& scbl, ArgN&&... an) const {
764     auto rescbl = make_schedulable(scbl, *this, std::forward<ArgN>(an)...);
765     trace_activity().schedule_when_enter(*inner.get(), when, rescbl);
766     inner->schedule(when, std::move(rescbl));
767     trace_activity().schedule_when_return(*inner.get());
768 }
769
770 template<class Arg0, class... ArgN>
771 auto worker::schedule_periodically(clock_type::time_point initial, clock_type::duration period, Arg0&& a0, ArgN&&... an) const
772     -> typename std::enable_if<
773         (detail::is_action_function<Arg0>::value ||
774         is_subscription<Arg0>::value) &&
775         !is_schedulable<Arg0>::value>::type {
776     schedule_periodically_rebind(initial, period, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
777 }
778 template<class... ArgN>
779 void worker::schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl, ArgN&&... an) const {
780     std::shared_ptr<clock_type::time_point> target(new clock_type::time_point(initial));
781     auto activity = make_schedulable(scbl, *this, std::forward<ArgN>(an)...);
782     auto periodic = make_schedulable(
783         activity,
784         [target, period, activity](schedulable self) {
785             // any recursion requests will be pushed to the scheduler queue
786             recursion r(false);
787             // call action
788             activity(r.get_recurse());
789
790             // schedule next occurance (if the action took longer than 'period' target will be in the past)
791             *target += period;
792             self.schedule(*target);
793         });
794     trace_activity().schedule_when_enter(*inner.get(), *target, periodic);
795     inner->schedule(*target, periodic);
796     trace_activity().schedule_when_return(*inner.get());
797 }
798
799 namespace detail {
800
801 template<class TimePoint>
802 struct time_schedulable
803 {
804     typedef TimePoint time_point_type;
805
806     time_schedulable(TimePoint when, schedulable a)
807         : when(when)
808         , what(std::move(a))
809     {
810     }
811     TimePoint when;
812     schedulable what;
813 };
814
815
816 // Sorts time_schedulable items in priority order sorted
817 // on value of time_schedulable.when. Items with equal
818 // values for when are sorted in fifo order.
819 template<class TimePoint>
820 class schedulable_queue {
821 public:
822     typedef time_schedulable<TimePoint> item_type;
823     typedef std::pair<item_type, int64_t> elem_type;
824     typedef std::vector<elem_type> container_type;
825     typedef const item_type& const_reference;
826
827 private:
828     struct compare_elem
829     {
830         bool operator()(const elem_type& lhs, const elem_type& rhs) const {
831             if (lhs.first.when == rhs.first.when) {
832                 return lhs.second > rhs.second;
833             }
834             else {
835                 return lhs.first.when > rhs.first.when;
836             }
837         }
838     };
839
840     typedef std::priority_queue<
841         elem_type,
842         container_type,
843         compare_elem
844     > queue_type;
845
846     queue_type queue;
847
848     int64_t ordinal;
849 public:
850     const_reference top() const {
851         return queue.top().first;
852     }
853
854     void pop() {
855         queue.pop();
856     }
857
858     bool empty() const {
859         return queue.empty();
860     }
861
862     void push(const item_type& value) {
863         queue.push(elem_type(value, ordinal++));
864     }
865
866     void push(item_type&& value) {
867         queue.push(elem_type(std::move(value), ordinal++));
868     }
869 };
870
871 }
872
873 }
874 namespace rxsc=schedulers;
875
876 }
877
878 #include "schedulers/rx-currentthread.hpp"
879 #include "schedulers/rx-newthread.hpp"
880 #include "schedulers/rx-eventloop.hpp"
881 #include "schedulers/rx-immediate.hpp"
882 #include "schedulers/rx-virtualtime.hpp"
883 #include "schedulers/rx-sameworker.hpp"
884
885 #endif