]> git.sesse.net Git - casparcg/blob - common/concurrency/executor.h
2.0.0.2: Fixed consumer-device shutdown dead-lock.
[casparcg] / common / concurrency / executor.h
1 #pragma once\r
2 \r
3 #include "../exception/win32_exception.h"\r
4 #include "../log/log.h"\r
5 \r
6 #include <tbb/atomic.h>\r
7 #include <tbb/concurrent_queue.h>\r
8 \r
9 #include <boost/thread.hpp>\r
10 #include <boost/noncopyable.hpp>\r
11 \r
12 #include <functional>\r
13 \r
14 namespace caspar {\r
15 \r
16 namespace detail {\r
17 \r
18 typedef struct tagTHREADNAME_INFO\r
19 {\r
20         DWORD dwType; // must be 0x1000\r
21         LPCSTR szName; // pointer to name (in user addr space)\r
22         DWORD dwThreadID; // thread ID (-1=caller thread)\r
23         DWORD dwFlags; // reserved for future use, must be zero\r
24 } THREADNAME_INFO;\r
25 \r
26 inline void SetThreadName(DWORD dwThreadID, LPCSTR szThreadName)\r
27 {\r
28         THREADNAME_INFO info;\r
29         {\r
30                 info.dwType = 0x1000;\r
31                 info.szName = szThreadName;\r
32                 info.dwThreadID = dwThreadID;\r
33                 info.dwFlags = 0;\r
34         }\r
35         __try\r
36         {\r
37                 RaiseException( 0x406D1388, 0, sizeof(info)/sizeof(DWORD), (DWORD*)&info );\r
38         }\r
39         __except (EXCEPTION_CONTINUE_EXECUTION)\r
40         {\r
41         }       \r
42 }\r
43 \r
44 }\r
45 \r
46 class executor : boost::noncopyable\r
47 {\r
48 public:\r
49                 \r
50         explicit executor(const std::wstring& name = L"executor") : name_(narrow(name))\r
51         {\r
52                 is_running_ = false;\r
53         }\r
54         \r
55         virtual ~executor()\r
56         {\r
57                 stop();\r
58                 clear();\r
59                 if(boost::this_thread::get_id() != thread_.get_id())\r
60                         thread_.join();\r
61         }\r
62 \r
63         void set_capacity(size_t capacity)\r
64         {\r
65                 execution_queue_.set_capacity(capacity);\r
66         }\r
67 \r
68         void start() // noexcept\r
69         {\r
70                 if(is_running_.fetch_and_store(true))\r
71                         return;\r
72                 clear();\r
73                 thread_ = boost::thread([this]{run();});\r
74         }\r
75                         \r
76         void stop() // noexcept\r
77         {\r
78                 is_running_ = false;    \r
79                 execution_queue_.try_push([]{});\r
80         }\r
81         \r
82         void clear()\r
83         {\r
84                 std::function<void()> func;\r
85                 auto size = execution_queue_.size();\r
86                 for(int n = 0; n < size; ++n)\r
87                 {\r
88                         try\r
89                         {\r
90                                 if(!execution_queue_.try_pop(func))\r
91                                         return;\r
92                         }\r
93                         catch(boost::broken_promise&){}\r
94                 }\r
95         }\r
96                         \r
97         template<typename Func>\r
98         auto begin_invoke(Func&& func) -> boost::unique_future<decltype(func())> // noexcept\r
99         {       \r
100                 typedef boost::packaged_task<decltype(func())> task_type;\r
101                                 \r
102                 auto task = task_type(std::forward<Func>(func));\r
103                 auto future = task.get_future();\r
104                 \r
105                 task.set_wait_callback(std::function<void(task_type&)>([=](task_type& my_task) // The std::function wrapper is required in order to add ::result_type to functor class.\r
106                 {\r
107                         try\r
108                         {\r
109                                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
110                                         my_task();\r
111                         }\r
112                         catch(boost::task_already_started&){}\r
113                 }));\r
114                                 \r
115                 struct task_adaptor_t\r
116                 {\r
117                         task_adaptor_t(const task_adaptor_t& other) : task(std::move(other.task)){}\r
118                         task_adaptor_t(task_type&& task) : task(std::move(task)){}\r
119                         void operator()() const { task(); }\r
120                         mutable task_type task;\r
121                 } task_adaptor(std::move(task));\r
122 \r
123                 execution_queue_.push([=]\r
124                 {\r
125                         try{task_adaptor();}\r
126                         catch(boost::task_already_started&){}\r
127                         catch(...){CASPAR_LOG_CURRENT_EXCEPTION();}\r
128                 });\r
129 \r
130                 return std::move(future);               \r
131         }\r
132         \r
133         template<typename Func>\r
134         auto invoke(Func&& func) -> decltype(func())\r
135         {\r
136                 if(boost::this_thread::get_id() == thread_.get_id())  // Avoids potential deadlock.\r
137                         return func();\r
138                 \r
139                 return begin_invoke(std::forward<Func>(func)).get();\r
140         }\r
141         \r
142         tbb::concurrent_bounded_queue<std::function<void()>>::size_type capacity() const { return execution_queue_.capacity();  }\r
143         tbb::concurrent_bounded_queue<std::function<void()>>::size_type size() const { return execution_queue_.size();  }\r
144         bool empty() const              { return execution_queue_.empty();      }\r
145         bool is_running() const { return is_running_;                           }       \r
146                 \r
147 private:\r
148         \r
149         void execute() // noexcept\r
150         {\r
151                 std::function<void()> func;\r
152                 execution_queue_.pop(func);     \r
153                 func();\r
154         }\r
155 \r
156         void run() // noexcept\r
157         {\r
158                 win32_exception::install_handler();             \r
159                 detail::SetThreadName(GetCurrentThreadId(), name_.c_str());\r
160                 while(is_running_)\r
161                         execute();\r
162         }\r
163         \r
164         std::string name_;\r
165         boost::thread thread_;\r
166         tbb::atomic<bool> is_running_;\r
167         tbb::concurrent_bounded_queue<std::function<void()>> execution_queue_;\r
168 };\r
169 \r
170 }