1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
5 #if !defined(RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP)
6 #define RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP
8 #include "../rx-includes.hpp"
12 namespace schedulers {
// Scheduler type derived from scheduler_interface. Only part of the
// definition is visible in this extract (braces and access specifiers are
// not shown).
14 struct event_loop : public scheduler_interface
// Alias for this type, used by the copy-constructor declaration that follows.
17 typedef event_loop this_type;
// Copy constructor is declared; no definition is visible in this extract
// (presumably declared only to disable copying -- TODO confirm).
18 event_loop(const this_type&);
// Worker implementation derived from worker_interface. Both schedule()
// overloads forward to `controller`, passing `lifetime` together with the
// schedulable's action. The declaration of `controller` is not visible in
// this extract.
20 struct loop_worker : public worker_interface
// Alias for this type, used by the copy-constructor declaration that follows.
23 typedef loop_worker this_type;
// Copy constructor is declared; no definition is visible in this extract.
24 loop_worker(const this_type&);
// Queue type parameterized on the clock's time_point, and its item type.
26 typedef detail::schedulable_queue<
27 typename clock_type::time_point> queue_item_time;
29 typedef queue_item_time::item_type item_type;
// Subscription handed to `controller` with every scheduled action.
31 composite_subscription lifetime;
35 virtual ~loop_worker()
// Takes a subscription and a worker. The member-initializer list is not
// visible here (presumably cs -> lifetime and w -> controller -- TODO confirm).
38 loop_worker(composite_subscription cs, worker w)
// Returns the current time as reported by clock_type::now().
44 virtual clock_type::time_point now() const {
45 return clock_type::now();
// Forwards scbl's action to `controller`, along with `lifetime`.
48 virtual void schedule(const schedulable& scbl) const {
49 controller.schedule(lifetime, scbl.get_action());
// Forwards scbl's action to `controller` for time point `when`, along with
// `lifetime`.
52 virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
53 controller.schedule(when, lifetime, scbl.get_action());
// Thread factory; declared mutable, so it may be modified through a const
// event_loop.
57 mutable thread_factory factory;
// Counter that create_worker increments to pick an entry of `loops`;
// mutable because create_worker is a const member function.
59 mutable std::atomic<size_t> count;
// Workers obtained from `newthread.create_worker()` in the constructors.
60 std::vector<worker> loops;
// Member initializers and body of a constructor whose declarator line is not
// visible in this extract (presumably the default constructor -- TODO confirm).
// `factory` is initialized with a lambda that starts a std::thread running
// the supplied start function and returns that thread.
64 : factory([](std::function<void()> start){
65 return std::thread(std::move(start));
// `newthread` is the scheduler returned by make_new_thread().
67 , newthread(make_new_thread())
// Worker count: hardware_concurrency(), but never fewer than 4. The floor
// also covers hardware_concurrency() returning 0, which the standard permits
// when the value cannot be determined.
70 auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4));
// Appends a worker created from `newthread`; the enclosing loop statement is
// not visible in this extract.
72 loops.push_back(newthread.create_worker());
// Constructor taking a thread factory. `explicit` prevents implicit
// conversion from thread_factory to event_loop. Only part of the
// member-initializer list is visible in this extract.
75 explicit event_loop(thread_factory tf)
// `newthread` is the scheduler returned by make_new_thread(tf).
77 , newthread(make_new_thread(tf))
// Worker count: hardware_concurrency(), but never fewer than 4. The floor
// also covers hardware_concurrency() returning 0, which the standard permits
// when the value cannot be determined.
80 auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4));
// Appends a worker created from `newthread`; the enclosing loop statement is
// not visible in this extract.
82 loops.push_back(newthread.create_worker());
// Returns the current time as reported by clock_type::now().
89 virtual clock_type::time_point now() const {
90 return clock_type::now();
// Returns a worker constructed from cs and a newly allocated loop_worker.
// The loop_worker receives cs and one entry of `loops`; the entry index is
// the pre-incremented `count` modulo loops.size(), so successive calls step
// through `loops` in rotation.
93 virtual worker create_worker(composite_subscription cs) const {
94 return worker(cs, std::shared_ptr<loop_worker>(new loop_worker(cs, loops[++count % loops.size()])));
// Factory for an event_loop scheduler with no arguments. The scheduler is
// stored in a function-local static, so make_scheduler<event_loop>() runs
// only the first time control reaches the declaration. The return statement
// is not visible in this extract (presumably returns `loop` -- TODO confirm).
98 inline scheduler make_event_loop() {
99 static auto loop = make_scheduler<event_loop>();
// Factory for an event_loop scheduler built with thread factory tf.
// Each call invokes make_scheduler<event_loop>(tf) and returns its result.
102 inline scheduler make_event_loop(thread_factory tf) {
103 return make_scheduler<event_loop>(tf);