]> git.sesse.net Git - casparcg/blob - dependencies64/RxCpp/include/rxcpp/schedulers/rx-eventloop.hpp
* Added RxCpp library for LINQ api, replacing Boost.Range based iterations where...
[casparcg] / dependencies64 / RxCpp / include / rxcpp / schedulers / rx-eventloop.hpp
1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2
3 #pragma once
4
5 #if !defined(RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP)
6 #define RXCPP_RX_SCHEDULER_EVENT_LOOP_HPP
7
8 #include "../rx-includes.hpp"
9
10 namespace rxcpp {
11
12 namespace schedulers {
13
14 struct event_loop : public scheduler_interface
15 {
16 private:
17     typedef event_loop this_type;
18     event_loop(const this_type&);
19
20     struct loop_worker : public worker_interface
21     {
22     private:
23         typedef loop_worker this_type;
24         loop_worker(const this_type&);
25
26         typedef detail::schedulable_queue<
27             typename clock_type::time_point> queue_item_time;
28
29         typedef queue_item_time::item_type item_type;
30
31         composite_subscription lifetime;
32         worker controller;
33
34     public:
35         virtual ~loop_worker()
36         {
37         }
38         loop_worker(composite_subscription cs, worker w)
39             : lifetime(cs)
40             , controller(w)
41         {
42         }
43
44         virtual clock_type::time_point now() const {
45             return clock_type::now();
46         }
47
48         virtual void schedule(const schedulable& scbl) const {
49             controller.schedule(lifetime, scbl.get_action());
50         }
51
52         virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
53             controller.schedule(when, lifetime, scbl.get_action());
54         }
55     };
56
57     mutable thread_factory factory;
58     scheduler newthread;
59     mutable std::atomic<size_t> count;
60     std::vector<worker> loops;
61
62 public:
63     event_loop()
64         : factory([](std::function<void()> start){
65             return std::thread(std::move(start));
66         })
67         , newthread(make_new_thread())
68         , count(0)
69     {
70         auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4));
71         while (--remaining) {
72             loops.push_back(newthread.create_worker());
73         }
74     }
75     explicit event_loop(thread_factory tf)
76         : factory(tf)
77         , newthread(make_new_thread(tf))
78         , count(0)
79     {
80         auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4));
81         while (--remaining) {
82             loops.push_back(newthread.create_worker());
83         }
84     }
85     virtual ~event_loop()
86     {
87     }
88
89     virtual clock_type::time_point now() const {
90         return clock_type::now();
91     }
92
93     virtual worker create_worker(composite_subscription cs) const {
94         return worker(cs, std::shared_ptr<loop_worker>(new loop_worker(cs, loops[++count % loops.size()])));
95     }
96 };
97
98 inline scheduler make_event_loop() {
99     static auto loop = make_scheduler<event_loop>();
100     return loop;
101 }
102 inline scheduler make_event_loop(thread_factory tf) {
103     return make_scheduler<event_loop>(tf);
104 }
105
106 }
107
108 }
109
110 #endif