]> git.sesse.net Git - nageru/commitdiff
Make some more sleeps interruptable.
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Fri, 14 Apr 2017 15:03:02 +0000 (17:03 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Fri, 14 Apr 2017 15:03:02 +0000 (17:03 +0200)
The pattern has now been encapsulated into a class of its own.

alsa_input.cpp
alsa_input.h
decklink_output.cpp
decklink_output.h
ffmpeg_capture.cpp
ffmpeg_capture.h
quittable_sleeper.h [new file with mode: 0644]

index 0d6901423b17085d65459708385a5fcb055218a4..3b59c00b6139ebe6737e0e2122fab66195b3760a 100644 (file)
@@ -157,13 +157,13 @@ ALSAInput::~ALSAInput()
 
 void ALSAInput::start_capture_thread()
 {
-       should_quit = false;
+       should_quit.unquit();
        capture_thread = thread(&ALSAInput::capture_thread_func, this);
 }
 
 void ALSAInput::stop_capture_thread()
 {
-       should_quit = true;
+       should_quit.quit();
        capture_thread.join();
 }
 
@@ -173,15 +173,15 @@ void ALSAInput::capture_thread_func()
 
        // If the device hasn't been opened already, we need to do so
        // before we can capture.
-       while (!should_quit && pcm_handle == nullptr) {
+       while (!should_quit.should_quit() && pcm_handle == nullptr) {
                if (!open_device()) {
                        fprintf(stderr, "[%s] Waiting one second and trying again...\n",
                                device.c_str());
-                       sleep(1);
+                       should_quit.sleep_for(seconds(1));
                }
        }
 
-       if (should_quit) {
+       if (should_quit.should_quit()) {
                // Don't call free_card(); that would be a deadlock.
                WARN_ON_ERROR("snd_pcm_close()", snd_pcm_close(pcm_handle));
                pcm_handle = nullptr;
@@ -205,7 +205,7 @@ void ALSAInput::capture_thread_func()
                        parent_pool->set_card_state(internal_dev_index, ALSAPool::Device::State::STARTING);
                        fprintf(stderr, "[%s] Sleeping one second and restarting capture...\n",
                                device.c_str());
-                       sleep(1);
+                       should_quit.sleep_for(seconds(1));
                        break;
                }
        }
@@ -218,7 +218,7 @@ ALSAInput::CaptureEndReason ALSAInput::do_capture()
        parent_pool->set_card_state(internal_dev_index, ALSAPool::Device::State::RUNNING);
 
        uint64_t num_frames_output = 0;
-       while (!should_quit) {
+       while (!should_quit.should_quit()) {
                int ret = snd_pcm_wait(pcm_handle, /*timeout=*/100);
                if (ret == 0) continue;  // Timeout.
                if (ret == -EPIPE) {
@@ -247,7 +247,7 @@ ALSAInput::CaptureEndReason ALSAInput::do_capture()
                const steady_clock::time_point now = steady_clock::now();
                bool success;
                do {
-                       if (should_quit) return CaptureEndReason::REQUESTED_QUIT;
+                       if (should_quit.should_quit()) return CaptureEndReason::REQUESTED_QUIT;
                        success = audio_callback(buffer.get(), frames, audio_format, pts - prev_pts, now);
                } while (!success);
                num_frames_output += frames;
index bae9fdf78413188cef9b9c2d86c06c67bda019dd..060b9212690fce317f9dfac701af37d597af908a 100644 (file)
@@ -20,6 +20,7 @@
 #include <thread>
 
 #include "bmusb/bmusb.h"
+#include "quittable_sleeper.h"
 
 class ALSAPool;
 
@@ -68,7 +69,7 @@ private:
 
        snd_pcm_t *pcm_handle = nullptr;
        std::thread capture_thread;
-       std::atomic<bool> should_quit{false};
+       QuittableSleeper should_quit;
        std::unique_ptr<uint8_t[]> buffer;
        ALSAPool *parent_pool;
        unsigned internal_dev_index;
index 3ce692bb6146c8413d61a4a3dcc7e1b40dfe2e87..d6ce684385dadff2a9234fc86a4b582993556734 100644 (file)
@@ -67,7 +67,7 @@ void DeckLinkOutput::start_output(uint32_t mode, int64_t base_pts)
        assert(output);
        assert(!playback_initiated);
 
-       should_quit = false;
+       should_quit.unquit();
        playback_initiated = true;
        playback_started = false;
        this->base_pts = base_pts;
@@ -160,7 +160,7 @@ void DeckLinkOutput::end_output()
                return;
        }
 
-       should_quit = true;
+       should_quit.quit();
        frame_queues_changed.notify_all();
        present_thread.join();
        playback_initiated = false;
@@ -181,7 +181,7 @@ void DeckLinkOutput::end_output()
 
 void DeckLinkOutput::send_frame(GLuint y_tex, GLuint cbcr_tex, YCbCrLumaCoefficients output_ycbcr_coefficients, const vector<RefCountedFrame> &input_frames, int64_t pts, int64_t duration)
 {
-       assert(!should_quit);
+       assert(!should_quit.should_quit());
 
        if ((current_mode_flags & bmdDisplayModeColorspaceRec601) && output_ycbcr_coefficients == YCBCR_REC_709) {
                if (!last_frame_had_mode_mismatch) {
@@ -271,7 +271,7 @@ void DeckLinkOutput::send_audio(int64_t pts, const std::vector<float> &samples)
 
 void DeckLinkOutput::wait_for_frame(int64_t pts, int *dropped_frames, int64_t *frame_duration, bool *is_preroll, steady_clock::time_point *frame_timestamp)
 {
-       assert(!should_quit);
+       assert(!should_quit.should_quit());
 
        *dropped_frames = 0;
        *frame_duration = this->frame_duration;
@@ -309,7 +309,7 @@ void DeckLinkOutput::wait_for_frame(int64_t pts, int *dropped_frames, int64_t *f
 
        // If we're ahead of time, wait for the frame to (approximately) start.
        if (stream_frame_time < target_time) {
-               this_thread::sleep_until(*frame_timestamp);
+               should_quit.sleep_until(*frame_timestamp);
                return;
        }
 
@@ -457,9 +457,9 @@ void DeckLinkOutput::present_thread_func()
                {
                         unique_lock<mutex> lock(frame_queue_mutex);
                         frame_queues_changed.wait(lock, [this]{
-                                return should_quit || !pending_video_frames.empty();
+                                return should_quit.should_quit() || !pending_video_frames.empty();
                         });
-                        if (should_quit) {
+                        if (should_quit.should_quit()) {
                                return;
                        }
                        frame = move(pending_video_frames.front());
index 7c0a17fad2d483c88a5232113f1bc1601bbb3bfb..a2954964bb4b1a388ae7dd634a052d41ed7fb3de 100644 (file)
@@ -19,6 +19,7 @@
 
 #include "context.h"
 #include "print_latency.h"
+#include "quittable_sleeper.h"
 #include "ref_counted_frame.h"
 #include "ref_counted_gl_sync.h"
 
@@ -127,7 +128,7 @@ private:
        std::map<uint32_t, bmusb::VideoMode> video_modes;
 
        std::thread present_thread;
-       std::atomic<bool> should_quit{false};
+       QuittableSleeper should_quit;
 
        std::mutex frame_queue_mutex;
        std::queue<std::unique_ptr<Frame>> pending_video_frames;  // Under <frame_queue_mutex>.
index 77af9e654067f643ca411c7641ae11963a115414..5474c52fd94f511491206e4f8a92b2545bc87475 100644 (file)
@@ -78,7 +78,7 @@ void FFmpegCapture::start_bm_capture()
                return;
        }
        running = true;
-       producer_thread_should_quit = false;
+       producer_thread_should_quit.unquit();
        producer_thread = thread(&FFmpegCapture::producer_thread_func, this);
 }
 
@@ -88,7 +88,7 @@ void FFmpegCapture::stop_dequeue_thread()
                return;
        }
        running = false;
-       producer_thread_should_quit = true;
+       producer_thread_should_quit.quit();
        producer_thread.join();
 }
 
@@ -117,17 +117,17 @@ void FFmpegCapture::producer_thread_func()
        snprintf(thread_name, sizeof(thread_name), "FFmpeg_C_%d", card_index);
        pthread_setname_np(pthread_self(), thread_name);
 
-       while (!producer_thread_should_quit) {
+       while (!producer_thread_should_quit.should_quit()) {
                string pathname = search_for_file(filename);
                if (filename.empty()) {
                        fprintf(stderr, "%s not found, sleeping one second and trying again...\n", filename.c_str());
-                       sleep(1);
+                       producer_thread_should_quit.sleep_for(seconds(1));
                        continue;
                }
                if (!play_video(pathname)) {
                        // Error.
                        fprintf(stderr, "Error when playing %s, sleeping one second and trying again...\n", pathname.c_str());
-                       sleep(1);
+                       producer_thread_should_quit.sleep_for(seconds(1));
                        continue;
                }
 
@@ -198,7 +198,7 @@ bool FFmpegCapture::play_video(const string &pathname)
        int sws_last_width = -1, sws_last_height = -1;
 
        // Main loop.
-       while (!producer_thread_should_quit) {
+       while (!producer_thread_should_quit.should_quit()) {
                // Process any queued commands from other threads.
                vector<QueuedCommand> commands;
                {
@@ -307,8 +307,7 @@ bool FFmpegCapture::play_video(const string &pathname)
                 audio_format.bits_per_sample = 32;
                 audio_format.num_channels = 8;
 
-               // TODO: Make it interruptable somehow.
-               this_thread::sleep_until(next_frame_start);
+               producer_thread_should_quit.sleep_until(next_frame_start);
                frame_callback(timecode++,
                        video_frame, 0, video_format,
                        audio_frame, 0, audio_format);
index e6b4ed481f6b151e8a687a1f7b0aec5dab7d0472..24e6b5b40d3f9b780e71facdb7fb0dc347e5723c 100644 (file)
@@ -31,6 +31,7 @@
 #include <thread>
 
 #include "bmusb/bmusb.h"
+#include "quittable_sleeper.h"
 
 class FFmpegCapture : public bmusb::CaptureInterface
 {
@@ -157,7 +158,7 @@ private:
        std::unique_ptr<bmusb::FrameAllocator> owned_audio_frame_allocator;
        bmusb::frame_callback_t frame_callback = nullptr;
 
-       std::atomic<bool> producer_thread_should_quit{false};
+       QuittableSleeper producer_thread_should_quit;
        std::thread producer_thread;
 
        std::mutex queue_mu;
diff --git a/quittable_sleeper.h b/quittable_sleeper.h
new file mode 100644 (file)
index 0000000..ba00e73
--- /dev/null
@@ -0,0 +1,59 @@
+#ifndef _QUITTABLE_SLEEPER
+#define _QUITTABLE_SLEEPER 1
+
+// A class that assists with fast shutdown of threads. You can set
+// a flag that says the thread should quit, which it can then check
+// in a loop -- and if the thread sleeps (using the sleep_* functions
+// on the class), that sleep will immediately be aborted.
+//
+// All member functions on this class are thread-safe.
+
+#include <chrono>
+#include <mutex>
+
+class QuittableSleeper {
+public:
+       void quit()
+       {
+               std::lock_guard<std::mutex> l(mu);
+               should_quit_var = true;
+               quit_cond.notify_all();
+       }
+
+       void unquit()
+       {
+               std::lock_guard<std::mutex> l(mu);
+               should_quit_var = false;
+       }
+
+       bool should_quit() const
+       {
+               std::lock_guard<std::mutex> l(mu);
+               return should_quit_var;
+       }
+
+       template<class Rep, class Period>
+       void sleep_for(const std::chrono::duration<Rep, Period> &duration)
+       {
+               std::chrono::steady_clock::time_point t =
+                       std::chrono::steady_clock::now() +
+                       std::chrono::duration_cast<std::chrono::steady_clock::duration>(duration);
+               sleep_until(t);
+       }
+
+       template<class Clock, class Duration>
+       void sleep_until(const std::chrono::time_point<Clock, Duration> &t)
+       {
+               std::unique_lock<std::mutex> lock(mu);
+               quit_cond.wait_until(lock, t, [this]{
+                       return should_quit_var;
+               });
+       }
+
+private:
+       mutable std::mutex mu;
+       bool should_quit_var = false;
+       std::condition_variable quit_cond;
+};
+
+#endif  // !defined(_QUITTABLE_SLEEPER)