]> git.sesse.net Git - casparcg/blob - common/concurrency/executor.h
2.0.0.2: Mayor solution reconfiguration.
[casparcg] / common / concurrency / executor.h
1 #pragma once\r
2 \r
3 #include "../exception/win32_exception.h"\r
4 #include "../log/log.h"\r
5 \r
6 #include <tbb/atomic.h>\r
7 #include <tbb/concurrent_queue.h>\r
8 \r
9 #include <boost/thread.hpp>\r
10 #include <boost/noncopyable.hpp>\r
11 \r
12 #include <functional>\r
13 \r
14 namespace caspar {\r
15 \r
16 namespace detail {\r
17 \r
18 typedef struct tagTHREADNAME_INFO\r
19 {\r
20         DWORD dwType; // must be 0x1000\r
21         LPCSTR szName; // pointer to name (in user addr space)\r
22         DWORD dwThreadID; // thread ID (-1=caller thread)\r
23         DWORD dwFlags; // reserved for future use, must be zero\r
24 } THREADNAME_INFO;\r
25 \r
26 inline void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName)\r
27 {\r
28         THREADNAME_INFO info;\r
29         {\r
30                 info.dwType = 0x1000;\r
31                 info.szName = szThreadName;\r
32                 info.dwThreadID = dwThreadID;\r
33                 info.dwFlags = 0;\r
34         }\r
35         __try\r
36         {\r
37                 RaiseException( 0x406D1388, 0, sizeof(info)/sizeof(DWORD), (DWORD*)&info );\r
38         }\r
39         __except (EXCEPTION_CONTINUE_EXECUTION){}       \r
40 }\r
41 \r
42 }\r
43 \r
44 class executor : boost::noncopyable\r
45 {\r
46         const std::string name_;\r
47         boost::thread thread_;\r
48         tbb::atomic<bool> is_running_;\r
49         tbb::concurrent_bounded_queue<std::function<void()>> execution_queue_;\r
50 public:\r
51                 \r
52         explicit executor(const std::wstring& name = L"executor") : name_(narrow(name))\r
53         {\r
54                 is_running_ = false;\r
55         }\r
56         \r
57         virtual ~executor()\r
58         {\r
59                 stop();\r
60                 clear();\r
61                 if(boost::this_thread::get_id() != thread_.get_id())\r
62                         thread_.join();\r
63         }\r
64 \r
65         void set_capacity(size_t capacity)\r
66         {\r
67                 execution_queue_.set_capacity(capacity);\r
68         }\r
69 \r
70         void start() // noexcept\r
71         {\r
72                 if(is_running_.fetch_and_store(true))\r
73                         return;\r
74                 clear();\r
75                 thread_ = boost::thread([this]{run();});\r
76         }\r
77                         \r
78         void stop() // noexcept\r
79         {\r
80                 is_running_ = false;    \r
81                 execution_queue_.try_push([]{});\r
82         }\r
83         \r
84         void clear()\r
85         {\r
86                 std::function<void()> func;\r
87                 auto size = execution_queue_.size();\r
88                 for(int n = 0; n < size; ++n)\r
89                 {\r
90                         try\r
91                         {\r
92                                 if(!execution_queue_.try_pop(func))\r
93                                         return;\r
94                         }\r
95                         catch(boost::broken_promise&){}\r
96                 }\r
97         }\r
98                         \r
99         template<typename Func>\r
100         auto begin_invoke(Func&& func) -> boost::unique_future<decltype(func())> // noexcept\r
101         {       \r
102                 typedef boost::packaged_task<decltype(func())> task_type;\r
103                                 \r
104                 auto task = task_type(std::forward<Func>(func));\r
105                 auto future = task.get_future();\r
106                 \r
107                 task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
108                 {\r
109                         try\r
110                         {\r
111                                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
112                                         my_task();\r
113                         }\r
114                         catch(boost::task_already_started&){}\r
115                 }));\r
116                                 \r
117                 struct task_adaptor_t\r
118                 {\r
119                         task_adaptor_t(const task_adaptor_t& other) : task(std::move(other.task)){}\r
120                         task_adaptor_t(task_type&& task) : task(std::move(task)){}\r
121                         void operator()() const { task(); }\r
122                         mutable task_type task;\r
123                 } task_adaptor(std::move(task));\r
124 \r
125                 execution_queue_.push([=]\r
126                 {\r
127                         try{task_adaptor();}\r
128                         catch(boost::task_already_started&){}\r
129                         catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}\r
130                 });\r
131 \r
132                 return std::move(future);               \r
133         }\r
134         \r
135         template<typename Func>\r
136         auto invoke(Func&& func) -> decltype(func())\r
137         {\r
138                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
139                         return func();\r
140                 \r
141                 return begin_invoke(std::forward<Func>(func)).get();\r
142         }\r
143         \r
144         tbb::concurrent_bounded_queue<std::function<void()>>::size_type capacity() const { return execution_queue_.capacity();  }\r
145         tbb::concurrent_bounded_queue<std::function<void()>>::size_type size() const { return execution_queue_.size();  }\r
146         bool empty() const              { return execution_queue_.empty();      }\r
147         bool is_running() const { return is_running_;                           }       \r
148                 \r
149 private:\r
150         \r
151         void execute() // noexcept\r
152         {\r
153                 std::function<void()> func;\r
154                 execution_queue_.pop(func);     \r
155                 func();\r
156         }\r
157 \r
158         void run() // noexcept\r
159         {\r
160                 win32_exception::install_handler();             \r
161                 detail::SetThreadName(GetCurrentThreadId(), name_.c_str());\r
162                 while(is_running_)\r
163                         execute();\r
164         }       \r
165 };\r
166 \r
167 }