]> git.sesse.net Git - casparcg/blob - dependencies/boost/boost/asio/detail/impl/epoll_reactor.ipp
Manually merged pull request #222
[casparcg] / dependencies / boost / boost / asio / detail / impl / epoll_reactor.ipp
1 //
2 // detail/impl/epoll_reactor.ipp
3 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2011 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10
11 #ifndef BOOST_ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP
12 #define BOOST_ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP
13
14 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
15 # pragma once
16 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
17
18 #include <boost/asio/detail/config.hpp>
19
20 #if defined(BOOST_ASIO_HAS_EPOLL)
21
22 #include <cstddef>
23 #include <sys/epoll.h>
24 #include <boost/asio/detail/epoll_reactor.hpp>
25 #include <boost/asio/detail/throw_error.hpp>
26 #include <boost/asio/error.hpp>
27
28 #if defined(BOOST_ASIO_HAS_TIMERFD)
29 # include <sys/timerfd.h>
30 #endif // defined(BOOST_ASIO_HAS_TIMERFD)
31
32 #include <boost/asio/detail/push_options.hpp>
33
34 namespace boost {
35 namespace asio {
36 namespace detail {
37
38 epoll_reactor::epoll_reactor(boost::asio::io_service& io_service)
39   : boost::asio::detail::service_base<epoll_reactor>(io_service),
40     io_service_(use_service<io_service_impl>(io_service)),
41     mutex_(),
42     interrupter_(),
43     epoll_fd_(do_epoll_create()),
44     timer_fd_(do_timerfd_create()),
45     shutdown_(false)
46 {
47   // Add the interrupter's descriptor to epoll.
48   epoll_event ev = { 0, { 0 } };
49   ev.events = EPOLLIN | EPOLLERR | EPOLLET;
50   ev.data.ptr = &interrupter_;
51   epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);
52   interrupter_.interrupt();
53
54   // Add the timer descriptor to epoll.
55   if (timer_fd_ != -1)
56   {
57     ev.events = EPOLLIN | EPOLLERR;
58     ev.data.ptr = &timer_fd_;
59     epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev);
60   }
61 }
62
63 epoll_reactor::~epoll_reactor()
64 {
65   if (epoll_fd_ != -1)
66     close(epoll_fd_);
67   if (timer_fd_ != -1)
68     close(timer_fd_);
69 }
70
71 void epoll_reactor::shutdown_service()
72 {
73   mutex::scoped_lock lock(mutex_);
74   shutdown_ = true;
75   lock.unlock();
76
77   op_queue<operation> ops;
78
79   while (descriptor_state* state = registered_descriptors_.first())
80   {
81     for (int i = 0; i < max_ops; ++i)
82       ops.push(state->op_queue_[i]);
83     state->shutdown_ = true;
84     registered_descriptors_.free(state);
85   }
86
87   timer_queues_.get_all_timers(ops);
88
89   io_service_.abandon_operations(ops);
90 }
91
92 void epoll_reactor::fork_service(boost::asio::io_service::fork_event fork_ev)
93 {
94   if (fork_ev == boost::asio::io_service::fork_child)
95   {
96     if (epoll_fd_ != -1)
97       ::close(epoll_fd_);
98     epoll_fd_ = -1;
99     epoll_fd_ = do_epoll_create();
100
101     if (timer_fd_ != -1)
102       ::close(timer_fd_);
103     timer_fd_ = -1;
104     timer_fd_ = do_timerfd_create();
105
106     interrupter_.recreate();
107
108     // Add the interrupter's descriptor to epoll.
109     epoll_event ev = { 0, { 0 } };
110     ev.events = EPOLLIN | EPOLLERR | EPOLLET;
111     ev.data.ptr = &interrupter_;
112     epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);
113     interrupter_.interrupt();
114
115     // Add the timer descriptor to epoll.
116     if (timer_fd_ != -1)
117     {
118       ev.events = EPOLLIN | EPOLLERR;
119       ev.data.ptr = &timer_fd_;
120       epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev);
121     }
122
123     update_timeout();
124
125     // Re-register all descriptors with epoll.
126     mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
127     for (descriptor_state* state = registered_descriptors_.first();
128         state != 0; state = state->next_)
129     {
130       ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLOUT | EPOLLPRI | EPOLLET;
131       ev.data.ptr = state;
132       int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, state->descriptor_, &ev);
133       if (result != 0)
134       {
135         boost::system::error_code ec(errno,
136             boost::asio::error::get_system_category());
137         boost::asio::detail::throw_error(ec, "epoll re-registration");
138       }
139     }
140   }
141 }
142
143 void epoll_reactor::init_task()
144 {
145   io_service_.init_task();
146 }
147
148 int epoll_reactor::register_descriptor(socket_type descriptor,
149     epoll_reactor::per_descriptor_data& descriptor_data)
150 {
151   descriptor_data = allocate_descriptor_state();
152
153   {
154     mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
155
156     descriptor_data->reactor_ = this;
157     descriptor_data->descriptor_ = descriptor;
158     descriptor_data->shutdown_ = false;
159
160     for (int i = 0; i < max_ops; ++i)
161       descriptor_data->op_queue_is_empty_[i] =
162         descriptor_data->op_queue_[i].empty();
163   }
164
165   epoll_event ev = { 0, { 0 } };
166   ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLOUT | EPOLLPRI | EPOLLET;
167   ev.data.ptr = descriptor_data;
168   int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
169   if (result != 0)
170     return errno;
171
172   return 0;
173 }
174
175 int epoll_reactor::register_internal_descriptor(
176     int op_type, socket_type descriptor,
177     epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op)
178 {
179   descriptor_data = allocate_descriptor_state();
180
181   {
182     mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
183
184     descriptor_data->reactor_ = this;
185     descriptor_data->descriptor_ = descriptor;
186     descriptor_data->shutdown_ = false;
187     descriptor_data->op_queue_[op_type].push(op);
188
189     for (int i = 0; i < max_ops; ++i)
190       descriptor_data->op_queue_is_empty_[i] =
191         descriptor_data->op_queue_[i].empty();
192   }
193
194   epoll_event ev = { 0, { 0 } };
195   ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLOUT | EPOLLPRI | EPOLLET;
196   ev.data.ptr = descriptor_data;
197   int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
198   if (result != 0)
199     return errno;
200
201   return 0;
202 }
203
204 void epoll_reactor::move_descriptor(socket_type,
205     epoll_reactor::per_descriptor_data& target_descriptor_data,
206     epoll_reactor::per_descriptor_data& source_descriptor_data)
207 {
208   target_descriptor_data = source_descriptor_data;
209   source_descriptor_data = 0;
210 }
211
212 void epoll_reactor::start_op(int op_type, socket_type descriptor,
213     epoll_reactor::per_descriptor_data& descriptor_data,
214     reactor_op* op, bool allow_speculative)
215 {
216   if (!descriptor_data)
217   {
218     op->ec_ = boost::asio::error::bad_descriptor;
219     post_immediate_completion(op);
220     return;
221   }
222
223   bool perform_speculative = allow_speculative;
224   if (perform_speculative)
225   {
226     if (descriptor_data->op_queue_is_empty_[op_type]
227         && (op_type != read_op
228           || descriptor_data->op_queue_is_empty_[except_op]))
229     {
230       if (op->perform())
231       {
232         io_service_.post_immediate_completion(op);
233         return;
234       }
235       perform_speculative = false;
236     }
237   }
238
239   mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
240
241   if (descriptor_data->shutdown_)
242   {
243     post_immediate_completion(op);
244     return;
245   }
246
247   for (int i = 0; i < max_ops; ++i)
248     descriptor_data->op_queue_is_empty_[i] =
249       descriptor_data->op_queue_[i].empty();
250
251   if (descriptor_data->op_queue_is_empty_[op_type])
252   {
253     if (allow_speculative)
254     {
255       if (perform_speculative
256           && (op_type != read_op
257             || descriptor_data->op_queue_is_empty_[except_op]))
258       {
259         if (op->perform())
260         {
261           descriptor_lock.unlock();
262           io_service_.post_immediate_completion(op);
263           return;
264         }
265       }
266     }
267     else
268     {
269       epoll_event ev = { 0, { 0 } };
270       ev.events = EPOLLIN | EPOLLERR | EPOLLHUP
271         | EPOLLOUT | EPOLLPRI | EPOLLET;
272       ev.data.ptr = descriptor_data;
273       epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
274     }
275   }
276
277   descriptor_data->op_queue_[op_type].push(op);
278   descriptor_data->op_queue_is_empty_[op_type] = false;
279   io_service_.work_started();
280 }
281
282 void epoll_reactor::cancel_ops(socket_type,
283     epoll_reactor::per_descriptor_data& descriptor_data)
284 {
285   if (!descriptor_data)
286     return;
287
288   mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
289
290   op_queue<operation> ops;
291   for (int i = 0; i < max_ops; ++i)
292   {
293     while (reactor_op* op = descriptor_data->op_queue_[i].front())
294     {
295       op->ec_ = boost::asio::error::operation_aborted;
296       descriptor_data->op_queue_[i].pop();
297       ops.push(op);
298     }
299   }
300
301   descriptor_lock.unlock();
302
303   io_service_.post_deferred_completions(ops);
304 }
305
306 void epoll_reactor::deregister_descriptor(socket_type descriptor,
307     epoll_reactor::per_descriptor_data& descriptor_data, bool closing)
308 {
309   if (!descriptor_data)
310     return;
311
312   mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
313
314   if (!descriptor_data->shutdown_)
315   {
316     if (closing)
317     {
318       // The descriptor will be automatically removed from the epoll set when
319       // it is closed.
320     }
321     else
322     {
323       epoll_event ev = { 0, { 0 } };
324       epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, descriptor, &ev);
325     }
326
327     op_queue<operation> ops;
328     for (int i = 0; i < max_ops; ++i)
329     {
330       while (reactor_op* op = descriptor_data->op_queue_[i].front())
331       {
332         op->ec_ = boost::asio::error::operation_aborted;
333         descriptor_data->op_queue_[i].pop();
334         ops.push(op);
335       }
336     }
337
338     descriptor_data->descriptor_ = -1;
339     descriptor_data->shutdown_ = true;
340
341     descriptor_lock.unlock();
342
343     free_descriptor_state(descriptor_data);
344     descriptor_data = 0;
345
346     io_service_.post_deferred_completions(ops);
347   }
348 }
349
350 void epoll_reactor::deregister_internal_descriptor(socket_type descriptor,
351     epoll_reactor::per_descriptor_data& descriptor_data)
352 {
353   if (!descriptor_data)
354     return;
355
356   mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
357
358   if (!descriptor_data->shutdown_)
359   {
360     epoll_event ev = { 0, { 0 } };
361     epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, descriptor, &ev);
362
363     op_queue<operation> ops;
364     for (int i = 0; i < max_ops; ++i)
365       ops.push(descriptor_data->op_queue_[i]);
366
367     descriptor_data->descriptor_ = -1;
368     descriptor_data->shutdown_ = true;
369
370     descriptor_lock.unlock();
371
372     free_descriptor_state(descriptor_data);
373     descriptor_data = 0;
374   }
375 }
376
377 void epoll_reactor::run(bool block, op_queue<operation>& ops)
378 {
379   // This code relies on the fact that the task_io_service queues the reactor
380   // task behind all descriptor operations generated by this function. This
381   // means, that by the time we reach this point, any previously returned
382   // descriptor operations have already been dequeued. Therefore it is now safe
383   // for us to reuse and return them for the task_io_service to queue again.
384
385   // Calculate a timeout only if timerfd is not used.
386   int timeout;
387   if (timer_fd_ != -1)
388     timeout = block ? -1 : 0;
389   else
390   {
391     mutex::scoped_lock lock(mutex_);
392     timeout = block ? get_timeout() : 0;
393   }
394
395   // Block on the epoll descriptor.
396   epoll_event events[128];
397   int num_events = epoll_wait(epoll_fd_, events, 128, timeout);
398
399 #if defined(BOOST_ASIO_HAS_TIMERFD)
400   bool check_timers = (timer_fd_ == -1);
401 #else // defined(BOOST_ASIO_HAS_TIMERFD)
402   bool check_timers = true;
403 #endif // defined(BOOST_ASIO_HAS_TIMERFD)
404
405   // Dispatch the waiting events.
406   for (int i = 0; i < num_events; ++i)
407   {
408     void* ptr = events[i].data.ptr;
409     if (ptr == &interrupter_)
410     {
411       // No need to reset the interrupter since we're leaving the descriptor
412       // in a ready-to-read state and relying on edge-triggered notifications
413       // to make it so that we only get woken up when the descriptor's epoll
414       // registration is updated.
415
416 #if defined(BOOST_ASIO_HAS_TIMERFD)
417       if (timer_fd_ == -1)
418         check_timers = true;
419 #else // defined(BOOST_ASIO_HAS_TIMERFD)
420       check_timers = true;
421 #endif // defined(BOOST_ASIO_HAS_TIMERFD)
422     }
423 #if defined(BOOST_ASIO_HAS_TIMERFD)
424     else if (ptr == &timer_fd_)
425     {
426       check_timers = true;
427     }
428 #endif // defined(BOOST_ASIO_HAS_TIMERFD)
429     else
430     {
431       // The descriptor operation doesn't count as work in and of itself, so we
432       // don't call work_started() here. This still allows the io_service to
433       // stop if the only remaining operations are descriptor operations.
434       descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
435       descriptor_data->set_ready_events(events[i].events);
436       ops.push(descriptor_data);
437     }
438   }
439
440   if (check_timers)
441   {
442     mutex::scoped_lock common_lock(mutex_);
443     timer_queues_.get_ready_timers(ops);
444
445 #if defined(BOOST_ASIO_HAS_TIMERFD)
446     if (timer_fd_ != -1)
447     {
448       itimerspec new_timeout;
449       itimerspec old_timeout;
450       int flags = get_timeout(new_timeout);
451       timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout);
452     }
453 #endif // defined(BOOST_ASIO_HAS_TIMERFD)
454   }
455 }
456
457 void epoll_reactor::interrupt()
458 {
459   epoll_event ev = { 0, { 0 } };
460   ev.events = EPOLLIN | EPOLLERR | EPOLLET;
461   ev.data.ptr = &interrupter_;
462   epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, interrupter_.read_descriptor(), &ev);
463 }
464
465 int epoll_reactor::do_epoll_create()
466 {
467 #if defined(EPOLL_CLOEXEC)
468   int fd = epoll_create1(EPOLL_CLOEXEC);
469 #else // defined(EPOLL_CLOEXEC)
470   int fd = -1;
471   errno = EINVAL;
472 #endif // defined(EPOLL_CLOEXEC)
473
474   if (fd == -1 && errno == EINVAL)
475   {
476     fd = epoll_create(epoll_size);
477     if (fd != -1)
478       ::fcntl(fd, F_SETFD, FD_CLOEXEC);
479   }
480
481   if (fd == -1)
482   {
483     boost::system::error_code ec(errno,
484         boost::asio::error::get_system_category());
485     boost::asio::detail::throw_error(ec, "epoll");
486   }
487
488   return fd;
489 }
490
491 int epoll_reactor::do_timerfd_create()
492 {
493 #if defined(BOOST_ASIO_HAS_TIMERFD)
494 # if defined(TFD_CLOEXEC)
495   int fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
496 # else // defined(TFD_CLOEXEC)
497   int fd = -1;
498   errno = EINVAL;
499 # endif // defined(TFD_CLOEXEC)
500
501   if (fd == -1 && errno == EINVAL)
502   {
503     fd = timerfd_create(CLOCK_MONOTONIC, 0);
504     if (fd != -1)
505       ::fcntl(fd, F_SETFD, FD_CLOEXEC);
506   }
507
508   return fd;
509 #else // defined(BOOST_ASIO_HAS_TIMERFD)
510   return -1;
511 #endif // defined(BOOST_ASIO_HAS_TIMERFD)
512 }
513
514 epoll_reactor::descriptor_state* epoll_reactor::allocate_descriptor_state()
515 {
516   mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
517   return registered_descriptors_.alloc();
518 }
519
520 void epoll_reactor::free_descriptor_state(epoll_reactor::descriptor_state* s)
521 {
522   mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
523   registered_descriptors_.free(s);
524 }
525
526 void epoll_reactor::do_add_timer_queue(timer_queue_base& queue)
527 {
528   mutex::scoped_lock lock(mutex_);
529   timer_queues_.insert(&queue);
530 }
531
532 void epoll_reactor::do_remove_timer_queue(timer_queue_base& queue)
533 {
534   mutex::scoped_lock lock(mutex_);
535   timer_queues_.erase(&queue);
536 }
537
538 void epoll_reactor::update_timeout()
539 {
540 #if defined(BOOST_ASIO_HAS_TIMERFD)
541   if (timer_fd_ != -1)
542   {
543     itimerspec new_timeout;
544     itimerspec old_timeout;
545     int flags = get_timeout(new_timeout);
546     timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout);
547     return;
548   }
549 #endif // defined(BOOST_ASIO_HAS_TIMERFD)
550   interrupt();
551 }
552
553 int epoll_reactor::get_timeout()
554 {
555   // By default we will wait no longer than 5 minutes. This will ensure that
556   // any changes to the system clock are detected after no longer than this.
557   return timer_queues_.wait_duration_msec(5 * 60 * 1000);
558 }
559
560 #if defined(BOOST_ASIO_HAS_TIMERFD)
561 int epoll_reactor::get_timeout(itimerspec& ts)
562 {
563   ts.it_interval.tv_sec = 0;
564   ts.it_interval.tv_nsec = 0;
565
566   long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
567   ts.it_value.tv_sec = usec / 1000000;
568   ts.it_value.tv_nsec = usec ? (usec % 1000000) * 1000 : 1;
569
570   return usec ? 0 : TFD_TIMER_ABSTIME;
571 }
572 #endif // defined(BOOST_ASIO_HAS_TIMERFD)
573
574 struct epoll_reactor::perform_io_cleanup_on_block_exit
575 {
576   explicit perform_io_cleanup_on_block_exit(epoll_reactor* r)
577     : reactor_(r), first_op_(0)
578   {
579   }
580
581   ~perform_io_cleanup_on_block_exit()
582   {
583     if (first_op_)
584     {
585       // Post the remaining completed operations for invocation.
586       if (!ops_.empty())
587         reactor_->io_service_.post_deferred_completions(ops_);
588
589       // A user-initiated operation has completed, but there's no need to
590       // explicitly call work_finished() here. Instead, we'll take advantage of
591       // the fact that the task_io_service will call work_finished() once we
592       // return.
593     }
594     else
595     {
596       // No user-initiated operations have completed, so we need to compensate
597       // for the work_finished() call that the task_io_service will make once
598       // this operation returns.
599       reactor_->io_service_.work_started();
600     }
601   }
602
603   epoll_reactor* reactor_;
604   op_queue<operation> ops_;
605   operation* first_op_;
606 };
607
608 epoll_reactor::descriptor_state::descriptor_state()
609   : operation(&epoll_reactor::descriptor_state::do_complete)
610 {
611 }
612
613 operation* epoll_reactor::descriptor_state::perform_io(uint32_t events)
614 {
615   perform_io_cleanup_on_block_exit io_cleanup(reactor_);
616   mutex::scoped_lock descriptor_lock(mutex_);
617
618   // Exception operations must be processed first to ensure that any
619   // out-of-band data is read before normal data.
620   static const int flag[max_ops] = { EPOLLIN, EPOLLOUT, EPOLLPRI };
621   for (int j = max_ops - 1; j >= 0; --j)
622   {
623     if (events & (flag[j] | EPOLLERR | EPOLLHUP))
624     {
625       while (reactor_op* op = op_queue_[j].front())
626       {
627         if (op->perform())
628         {
629           op_queue_[j].pop();
630           io_cleanup.ops_.push(op);
631         }
632         else
633           break;
634       }
635     }
636   }
637
638   // The first operation will be returned for completion now. The others will
639   // be posted for later by the io_cleanup object's destructor.
640   io_cleanup.first_op_ = io_cleanup.ops_.front();
641   io_cleanup.ops_.pop();
642   return io_cleanup.first_op_;
643 }
644
645 void epoll_reactor::descriptor_state::do_complete(
646     io_service_impl* owner, operation* base,
647     const boost::system::error_code& ec, std::size_t bytes_transferred)
648 {
649   if (owner)
650   {
651     descriptor_state* descriptor_data = static_cast<descriptor_state*>(base);
652     uint32_t events = static_cast<uint32_t>(bytes_transferred);
653     if (operation* op = descriptor_data->perform_io(events))
654     {
655       op->complete(*owner, ec, 0);
656     }
657   }
658 }
659
660 } // namespace detail
661 } // namespace asio
662 } // namespace boost
663
664 #include <boost/asio/detail/pop_options.hpp>
665
666 #endif // defined(BOOST_ASIO_HAS_EPOLL)
667
668 #endif // BOOST_ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP