From: Steinar H. Gunderson Date: Fri, 14 Apr 2017 15:03:02 +0000 (+0200) Subject: Make some more sleeps interruptable. X-Git-Tag: 1.6.0~51 X-Git-Url: https://git.sesse.net/?p=nageru;a=commitdiff_plain;h=f07adb19f0e2571bf4894ec57e6fcfe4a3e5fd95 Make some more sleeps interruptable. The pattern has now been encapsulated into a class of its own. --- diff --git a/alsa_input.cpp b/alsa_input.cpp index 0d69014..3b59c00 100644 --- a/alsa_input.cpp +++ b/alsa_input.cpp @@ -157,13 +157,13 @@ ALSAInput::~ALSAInput() void ALSAInput::start_capture_thread() { - should_quit = false; + should_quit.unquit(); capture_thread = thread(&ALSAInput::capture_thread_func, this); } void ALSAInput::stop_capture_thread() { - should_quit = true; + should_quit.quit(); capture_thread.join(); } @@ -173,15 +173,15 @@ void ALSAInput::capture_thread_func() // If the device hasn't been opened already, we need to do so // before we can capture. - while (!should_quit && pcm_handle == nullptr) { + while (!should_quit.should_quit() && pcm_handle == nullptr) { if (!open_device()) { fprintf(stderr, "[%s] Waiting one second and trying again...\n", device.c_str()); - sleep(1); + should_quit.sleep_for(seconds(1)); } } - if (should_quit) { + if (should_quit.should_quit()) { // Don't call free_card(); that would be a deadlock. WARN_ON_ERROR("snd_pcm_close()", snd_pcm_close(pcm_handle)); pcm_handle = nullptr; @@ -205,7 +205,7 @@ void ALSAInput::capture_thread_func() parent_pool->set_card_state(internal_dev_index, ALSAPool::Device::State::STARTING); fprintf(stderr, "[%s] Sleeping one second and restarting capture...\n", device.c_str()); - sleep(1); + should_quit.sleep_for(seconds(1)); break; } } @@ -218,7 +218,7 @@ ALSAInput::CaptureEndReason ALSAInput::do_capture() parent_pool->set_card_state(internal_dev_index, ALSAPool::Device::State::RUNNING); uint64_t num_frames_output = 0; - while (!should_quit) { + while (!should_quit.should_quit()) { int ret = snd_pcm_wait(pcm_handle, /*timeout=*/100); if (ret == 0) continue; // Timeout. if (ret == -EPIPE) { @@ -247,7 +247,7 @@ ALSAInput::CaptureEndReason ALSAInput::do_capture() const steady_clock::time_point now = steady_clock::now(); bool success; do { - if (should_quit) return CaptureEndReason::REQUESTED_QUIT; + if (should_quit.should_quit()) return CaptureEndReason::REQUESTED_QUIT; success = audio_callback(buffer.get(), frames, audio_format, pts - prev_pts, now); } while (!success); num_frames_output += frames; diff --git a/alsa_input.h b/alsa_input.h index bae9fdf..060b921 100644 --- a/alsa_input.h +++ b/alsa_input.h @@ -20,6 +20,7 @@ #include #include "bmusb/bmusb.h" +#include "quittable_sleeper.h" class ALSAPool; @@ -68,7 +69,7 @@ private: snd_pcm_t *pcm_handle = nullptr; std::thread capture_thread; - std::atomic should_quit{false}; + QuittableSleeper should_quit; std::unique_ptr buffer; ALSAPool *parent_pool; unsigned internal_dev_index; diff --git a/decklink_output.cpp b/decklink_output.cpp index 3ce692b..d6ce684 100644 --- a/decklink_output.cpp +++ b/decklink_output.cpp @@ -67,7 +67,7 @@ void DeckLinkOutput::start_output(uint32_t mode, int64_t base_pts) assert(output); assert(!playback_initiated); - should_quit = false; + should_quit.unquit(); playback_initiated = true; playback_started = false; this->base_pts = base_pts; @@ -160,7 +160,7 @@ void DeckLinkOutput::end_output() return; } - should_quit = true; + should_quit.quit(); frame_queues_changed.notify_all(); present_thread.join(); playback_initiated = false; @@ -181,7 +181,7 @@ void DeckLinkOutput::end_output() void DeckLinkOutput::send_frame(GLuint y_tex, GLuint cbcr_tex, YCbCrLumaCoefficients output_ycbcr_coefficients, const vector &input_frames, int64_t pts, int64_t duration) { - assert(!should_quit); + assert(!should_quit.should_quit()); if ((current_mode_flags & bmdDisplayModeColorspaceRec601) && output_ycbcr_coefficients == YCBCR_REC_709) { if (!last_frame_had_mode_mismatch) { @@ -271,7 +271,7 @@ void DeckLinkOutput::send_audio(int64_t pts, const std::vector &samples) void DeckLinkOutput::wait_for_frame(int64_t pts, int *dropped_frames, int64_t *frame_duration, bool *is_preroll, steady_clock::time_point *frame_timestamp) { - assert(!should_quit); + assert(!should_quit.should_quit()); *dropped_frames = 0; *frame_duration = this->frame_duration; @@ -309,7 +309,7 @@ void DeckLinkOutput::wait_for_frame(int64_t pts, int *dropped_frames, int64_t *f // If we're ahead of time, wait for the frame to (approximately) start. if (stream_frame_time < target_time) { - this_thread::sleep_until(*frame_timestamp); + should_quit.sleep_until(*frame_timestamp); return; } @@ -457,9 +457,9 @@ void DeckLinkOutput::present_thread_func() { unique_lock lock(frame_queue_mutex); frame_queues_changed.wait(lock, [this]{ - return should_quit || !pending_video_frames.empty(); + return should_quit.should_quit() || !pending_video_frames.empty(); }); - if (should_quit) { + if (should_quit.should_quit()) { return; } frame = move(pending_video_frames.front()); diff --git a/decklink_output.h b/decklink_output.h index 7c0a17f..a295496 100644 --- a/decklink_output.h +++ b/decklink_output.h @@ -19,6 +19,7 @@ #include "context.h" #include "print_latency.h" +#include "quittable_sleeper.h" #include "ref_counted_frame.h" #include "ref_counted_gl_sync.h" @@ -127,7 +128,7 @@ private: std::map video_modes; std::thread present_thread; - std::atomic should_quit{false}; + QuittableSleeper should_quit; std::mutex frame_queue_mutex; std::queue> pending_video_frames; // Under . diff --git a/ffmpeg_capture.cpp b/ffmpeg_capture.cpp index 77af9e6..5474c52 100644 --- a/ffmpeg_capture.cpp +++ b/ffmpeg_capture.cpp @@ -78,7 +78,7 @@ void FFmpegCapture::start_bm_capture() return; } running = true; - producer_thread_should_quit = false; + producer_thread_should_quit.unquit(); producer_thread = thread(&FFmpegCapture::producer_thread_func, this); } @@ -88,7 +88,7 @@ void FFmpegCapture::stop_dequeue_thread() return; } running = false; - producer_thread_should_quit = true; + producer_thread_should_quit.quit(); producer_thread.join(); } @@ -117,17 +117,17 @@ void FFmpegCapture::producer_thread_func() snprintf(thread_name, sizeof(thread_name), "FFmpeg_C_%d", card_index); pthread_setname_np(pthread_self(), thread_name); - while (!producer_thread_should_quit) { + while (!producer_thread_should_quit.should_quit()) { string pathname = search_for_file(filename); if (filename.empty()) { fprintf(stderr, "%s not found, sleeping one second and trying again...\n", filename.c_str()); - sleep(1); + producer_thread_should_quit.sleep_for(seconds(1)); continue; } if (!play_video(pathname)) { // Error. fprintf(stderr, "Error when playing %s, sleeping one second and trying again...\n", pathname.c_str()); - sleep(1); + producer_thread_should_quit.sleep_for(seconds(1)); continue; } @@ -198,7 +198,7 @@ bool FFmpegCapture::play_video(const string &pathname) int sws_last_width = -1, sws_last_height = -1; // Main loop. - while (!producer_thread_should_quit) { + while (!producer_thread_should_quit.should_quit()) { // Process any queued commands from other threads. vector commands; { @@ -307,8 +307,7 @@ bool FFmpegCapture::play_video(const string &pathname) audio_format.bits_per_sample = 32; audio_format.num_channels = 8; - // TODO: Make it interruptable somehow. - this_thread::sleep_until(next_frame_start); + producer_thread_should_quit.sleep_until(next_frame_start); frame_callback(timecode++, video_frame, 0, video_format, audio_frame, 0, audio_format); diff --git a/ffmpeg_capture.h b/ffmpeg_capture.h index e6b4ed4..24e6b5b 100644 --- a/ffmpeg_capture.h +++ b/ffmpeg_capture.h @@ -31,6 +31,7 @@ #include #include "bmusb/bmusb.h" +#include "quittable_sleeper.h" class FFmpegCapture : public bmusb::CaptureInterface { @@ -157,7 +158,7 @@ private: std::unique_ptr owned_audio_frame_allocator; bmusb::frame_callback_t frame_callback = nullptr; - std::atomic producer_thread_should_quit{false}; + QuittableSleeper producer_thread_should_quit; std::thread producer_thread; std::mutex queue_mu; diff --git a/quittable_sleeper.h b/quittable_sleeper.h new file mode 100644 index 0000000..ba00e73 --- /dev/null +++ b/quittable_sleeper.h @@ -0,0 +1,59 @@ +#ifndef _QUITTABLE_SLEEPER +#define _QUITTABLE_SLEEPER 1 + +// A class that assists with fast shutdown of threads. You can set +// a flag that says the thread should quit, which it can then check +// in a loop -- and if the thread sleeps (using the sleep_* functions +// on the class), that sleep will immediately be aborted. +// +// All member functions on this class are thread-safe. + +#include +#include + +class QuittableSleeper { +public: + void quit() + { + std::lock_guard l(mu); + should_quit_var = true; + quit_cond.notify_all(); + } + + void unquit() + { + std::lock_guard l(mu); + should_quit_var = false; + } + + bool should_quit() const + { + std::lock_guard l(mu); + return should_quit_var; + } + + template + void sleep_for(const std::chrono::duration &duration) + { + std::chrono::steady_clock::time_point t = + std::chrono::steady_clock::now() + + std::chrono::duration_cast(duration); + sleep_until(t); + } + + template + void sleep_until(const std::chrono::time_point &t) + { + std::unique_lock lock(mu); + quit_cond.wait_until(lock, t, [this]{ + return should_quit_var; + }); + } + +private: + mutable std::mutex mu; + bool should_quit_var = false; + std::condition_variable quit_cond; +}; + +#endif // !defined(_QUITTABLE_SLEEPER)