#include <common/except.h>
#include <common/memshfl.h>
#include <common/array.h>
+#include <common/future.h>
#include <core/consumer/frame_consumer.h>
spl::shared_ptr<diagnostics::graph> graph_;
boost::timer tick_timer_;
+ retry_task<bool> send_completion_;
public:
decklink_consumer(const configuration& config, const core::video_format_desc& format_desc, int channel_index)
graph_->set_tag("flushed-frame");
auto frame = core::const_frame::empty();
- video_frame_buffer_.pop(frame);
+ video_frame_buffer_.pop(frame);
+ send_completion_.try_completion();
schedule_next_video(frame);
unsigned long buffered;
{
auto frame = core::const_frame::empty();
audio_frame_buffer_.pop(frame);
+ send_completion_.try_completion();
schedule_next_audio(frame.audio_data());
}
tick_timer_.restart();
}
- void send(core::const_frame frame)
+ boost::unique_future<bool> send(core::const_frame frame)
{
auto exception = lock(exception_mutex_, [&]
{
if(!is_running_)
CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info(u8(print()) + " Is not running."));
- if(config_.embedded_audio)
- audio_frame_buffer_.push(frame);
- video_frame_buffer_.push(frame);
+ bool audio_ready = !config_.embedded_audio;
+ bool video_ready = false;
+
+ auto enqueue_task = [audio_ready, video_ready, frame, this]() mutable -> boost::optional<bool>
+ {
+ if (!audio_ready)
+ audio_ready = audio_frame_buffer_.try_push(frame);
+
+ if (!video_ready)
+ video_ready = video_frame_buffer_.try_push(frame);
+
+ if (audio_ready && video_ready)
+ return true;
+ else
+ return boost::optional<bool>();
+ };
+
+ if (enqueue_task())
+ return wrap_as_future(true);
+
+ send_completion_.set_task(enqueue_task);
+
+ return send_completion_.get_future();
}
std::wstring print() const
});
}
- bool send(core::const_frame frame) override
+ boost::unique_future<bool> send(core::const_frame frame) override
{
- consumer_->send(frame);
- return true;
+ return consumer_->send(frame);
}
std::wstring print() const override