The pattern has now been encapsulated into a class of its own.
void ALSAInput::start_capture_thread()
{
- should_quit = false;
+ should_quit.unquit();
capture_thread = thread(&ALSAInput::capture_thread_func, this);
}
void ALSAInput::stop_capture_thread()
{
- should_quit = true;
+ should_quit.quit();
capture_thread.join();
}
// If the device hasn't been opened already, we need to do so
// before we can capture.
- while (!should_quit && pcm_handle == nullptr) {
+ while (!should_quit.should_quit() && pcm_handle == nullptr) {
if (!open_device()) {
fprintf(stderr, "[%s] Waiting one second and trying again...\n",
device.c_str());
- sleep(1);
+ should_quit.sleep_for(seconds(1));
}
}
- if (should_quit) {
+ if (should_quit.should_quit()) {
// Don't call free_card(); that would be a deadlock.
WARN_ON_ERROR("snd_pcm_close()", snd_pcm_close(pcm_handle));
pcm_handle = nullptr;
parent_pool->set_card_state(internal_dev_index, ALSAPool::Device::State::STARTING);
fprintf(stderr, "[%s] Sleeping one second and restarting capture...\n",
device.c_str());
- sleep(1);
+ should_quit.sleep_for(seconds(1));
break;
}
}
parent_pool->set_card_state(internal_dev_index, ALSAPool::Device::State::RUNNING);
uint64_t num_frames_output = 0;
- while (!should_quit) {
+ while (!should_quit.should_quit()) {
int ret = snd_pcm_wait(pcm_handle, /*timeout=*/100);
if (ret == 0) continue; // Timeout.
if (ret == -EPIPE) {
const steady_clock::time_point now = steady_clock::now();
bool success;
do {
- if (should_quit) return CaptureEndReason::REQUESTED_QUIT;
+ if (should_quit.should_quit()) return CaptureEndReason::REQUESTED_QUIT;
success = audio_callback(buffer.get(), frames, audio_format, pts - prev_pts, now);
} while (!success);
num_frames_output += frames;
#include <thread>
#include "bmusb/bmusb.h"
+#include "quittable_sleeper.h"
class ALSAPool;
snd_pcm_t *pcm_handle = nullptr;
std::thread capture_thread;
- std::atomic<bool> should_quit{false};
+ QuittableSleeper should_quit;
std::unique_ptr<uint8_t[]> buffer;
ALSAPool *parent_pool;
unsigned internal_dev_index;
assert(output);
assert(!playback_initiated);
- should_quit = false;
+ should_quit.unquit();
playback_initiated = true;
playback_started = false;
this->base_pts = base_pts;
return;
}
- should_quit = true;
+ should_quit.quit();
frame_queues_changed.notify_all();
present_thread.join();
playback_initiated = false;
void DeckLinkOutput::send_frame(GLuint y_tex, GLuint cbcr_tex, YCbCrLumaCoefficients output_ycbcr_coefficients, const vector<RefCountedFrame> &input_frames, int64_t pts, int64_t duration)
{
- assert(!should_quit);
+ assert(!should_quit.should_quit());
if ((current_mode_flags & bmdDisplayModeColorspaceRec601) && output_ycbcr_coefficients == YCBCR_REC_709) {
if (!last_frame_had_mode_mismatch) {
void DeckLinkOutput::wait_for_frame(int64_t pts, int *dropped_frames, int64_t *frame_duration, bool *is_preroll, steady_clock::time_point *frame_timestamp)
{
- assert(!should_quit);
+ assert(!should_quit.should_quit());
*dropped_frames = 0;
*frame_duration = this->frame_duration;
// If we're ahead of time, wait for the frame to (approximately) start.
if (stream_frame_time < target_time) {
- this_thread::sleep_until(*frame_timestamp);
+ should_quit.sleep_until(*frame_timestamp);
return;
}
{
unique_lock<mutex> lock(frame_queue_mutex);
frame_queues_changed.wait(lock, [this]{
- return should_quit || !pending_video_frames.empty();
+ return should_quit.should_quit() || !pending_video_frames.empty();
});
- if (should_quit) {
+ if (should_quit.should_quit()) {
return;
}
frame = move(pending_video_frames.front());
#include "context.h"
#include "print_latency.h"
+#include "quittable_sleeper.h"
#include "ref_counted_frame.h"
#include "ref_counted_gl_sync.h"
std::map<uint32_t, bmusb::VideoMode> video_modes;
std::thread present_thread;
- std::atomic<bool> should_quit{false};
+ QuittableSleeper should_quit;
std::mutex frame_queue_mutex;
std::queue<std::unique_ptr<Frame>> pending_video_frames; // Under <frame_queue_mutex>.
return;
}
running = true;
- producer_thread_should_quit = false;
+ producer_thread_should_quit.unquit();
producer_thread = thread(&FFmpegCapture::producer_thread_func, this);
}
return;
}
running = false;
- producer_thread_should_quit = true;
+ producer_thread_should_quit.quit();
producer_thread.join();
}
snprintf(thread_name, sizeof(thread_name), "FFmpeg_C_%d", card_index);
pthread_setname_np(pthread_self(), thread_name);
- while (!producer_thread_should_quit) {
+ while (!producer_thread_should_quit.should_quit()) {
string pathname = search_for_file(filename);
if (filename.empty()) {
fprintf(stderr, "%s not found, sleeping one second and trying again...\n", filename.c_str());
- sleep(1);
+ producer_thread_should_quit.sleep_for(seconds(1));
continue;
}
if (!play_video(pathname)) {
// Error.
fprintf(stderr, "Error when playing %s, sleeping one second and trying again...\n", pathname.c_str());
- sleep(1);
+ producer_thread_should_quit.sleep_for(seconds(1));
continue;
}
int sws_last_width = -1, sws_last_height = -1;
// Main loop.
- while (!producer_thread_should_quit) {
+ while (!producer_thread_should_quit.should_quit()) {
// Process any queued commands from other threads.
vector<QueuedCommand> commands;
{
audio_format.bits_per_sample = 32;
audio_format.num_channels = 8;
- // TODO: Make it interruptable somehow.
- this_thread::sleep_until(next_frame_start);
+ producer_thread_should_quit.sleep_until(next_frame_start);
frame_callback(timecode++,
video_frame, 0, video_format,
audio_frame, 0, audio_format);
#include <thread>
#include "bmusb/bmusb.h"
+#include "quittable_sleeper.h"
class FFmpegCapture : public bmusb::CaptureInterface
{
std::unique_ptr<bmusb::FrameAllocator> owned_audio_frame_allocator;
bmusb::frame_callback_t frame_callback = nullptr;
- std::atomic<bool> producer_thread_should_quit{false};
+ QuittableSleeper producer_thread_should_quit;
std::thread producer_thread;
std::mutex queue_mu;
--- /dev/null
+#ifndef _QUITTABLE_SLEEPER
+#define _QUITTABLE_SLEEPER 1
+
+// A class that assists with fast shutdown of threads. You can set
+// a flag that says the thread should quit, which it can then check
+// in a loop -- and if the thread sleeps (using the sleep_* functions
+// on the class), that sleep will immediately be aborted.
+//
+// All member functions on this class are thread-safe.
+
+#include <chrono>
+#include <mutex>
+
+class QuittableSleeper {
+public:
+ void quit()
+ {
+ std::lock_guard<std::mutex> l(mu);
+ should_quit_var = true;
+ quit_cond.notify_all();
+ }
+
+ void unquit()
+ {
+ std::lock_guard<std::mutex> l(mu);
+ should_quit_var = false;
+ }
+
+ bool should_quit() const
+ {
+ std::lock_guard<std::mutex> l(mu);
+ return should_quit_var;
+ }
+
+ template<class Rep, class Period>
+ void sleep_for(const std::chrono::duration<Rep, Period> &duration)
+ {
+ std::chrono::steady_clock::time_point t =
+ std::chrono::steady_clock::now() +
+ std::chrono::duration_cast<std::chrono::steady_clock::duration>(duration);
+ sleep_until(t);
+ }
+
+ template<class Clock, class Duration>
+ void sleep_until(const std::chrono::time_point<Clock, Duration> &t)
+ {
+ std::unique_lock<std::mutex> lock(mu);
+ quit_cond.wait_until(lock, t, [this]{
+ return should_quit_var;
+ });
+ }
+
+private:
+ mutable std::mutex mu;
+ bool should_quit_var = false;
+ std::condition_variable quit_cond;
+};
+
+#endif // !defined(_QUITTABLE_SLEEPER)