});
}
+/**
+ * A utility that helps the producer side of a future when the task is not
+ * able to complete immediately but there are known retry points in the code.
+ */
+template<class R>
+class retry_task
+{
+public:
+ typedef boost::function<boost::optional<R> ()> func_type;
+
+ retry_task() : done_(false) {}
+
+ /**
+ * Reset the state with a new task. If the previous task has not completed
+ * the old one will be discarded.
+ *
+ * @param func The function that tries to calculate future result. If the
+ * optional return value is set the future is marked as ready.
+ */
+ void set_task(const func_type& func)
+ {
+ boost::mutex::scoped_lock lock(mutex_);
+
+ func_ = func;
+ done_ = false;
+ promise_ = boost::promise<R>();
+ }
+
+ /**
+ * Take ownership of the future for the current task. Cannot only be called
+ * once for each task.
+ *
+ * @return the future.
+ */
+ boost::unique_future<R> get_future()
+ {
+ boost::mutex::scoped_lock lock(mutex_);
+
+ return promise_.get_future();
+ }
+
+ /**
+ * Call this when it is guaranteed or probable that the task will be able
+ * to complete.
+ *
+ * @return true if the task completed (the future will have a result).
+ */
+ bool try_completion()
+ {
+ boost::mutex::scoped_lock lock(mutex_);
+
+ if (!func_)
+ return false;
+
+ if (done_)
+ return true;
+
+ boost::optional<R> result;
+
+ try
+ {
+ result = func_();
+ }
+ catch (...)
+ {
+ CASPAR_LOG_CURRENT_EXCEPTION();
+ promise_.set_exception(boost::current_exception());
+ done_ = true;
+
+ return true;
+ }
+
+ if (result)
+ {
+ promise_.set_value(*result);
+ done_ = true;
+ }
+
+ return done_;
+ }
+
+ /**
+ * Call this when it is certain that the result should be ready, and if not
+ * it should be regarded as an unrecoverable error (retrying again would
+ * be useless), so the future will be marked as failed.
+ *
+ * @param exception The exception to mark the future with *if* the task
+ * completion fails.
+ */
+ void try_or_fail(const std::exception& exception)
+ {
+ if (!try_completion())
+ {
+ try
+ {
+ throw exception;
+ }
+ catch (...)
+ {
+ CASPAR_LOG_CURRENT_EXCEPTION();
+ promise_.set_exception(boost::current_exception());
+ done_ = true;
+ }
+ }
+ }
+private:
+ boost::mutex mutex_;
+ func_type func_;
+ boost::promise<R> promise_;
+ bool done_;
+};
+
+/**
+ * Wrap a value in a future with an already known result.
+ * <p>
+ * Useful when the result of an operation is already known at the time of
+ * calling.
+ *
+ * @param value The r-value to wrap.
+ *
+ * @return The future with the result set.
+ */
+template<class R>
+boost::unique_future<R> wrap_as_future(R&& value)
+{
+ boost::promise<R> p;
+
+ p.set_value(value);
+
+ return p.get_future();
+}
+
}
\ No newline at end of file
#include "frame_consumer.h"
#include <common/except.h>
+#include <common/future.h>
#include <core/video_format.h>
#include <core/frame/frame.h>
}).detach();
}
- bool send(const_frame frame) override {return consumer_->send(std::move(frame));}
+ boost::unique_future<bool> send(const_frame frame) override {return consumer_->send(std::move(frame));}
virtual void initialize(const struct video_format_desc& format_desc, int channel_index) override {return consumer_->initialize(format_desc, channel_index);}
std::wstring print() const override {return consumer_->print();}
std::wstring name() const override {return consumer_->name();}
CASPAR_LOG(info) << str << L" Uninitialized.";
}
- bool send(const_frame frame) override {return consumer_->send(std::move(frame));}
+ boost::unique_future<bool> send(const_frame frame) override {return consumer_->send(std::move(frame));}
virtual void initialize(const struct video_format_desc& format_desc, int channel_index) override {return consumer_->initialize(format_desc, channel_index);}
std::wstring print() const override {return consumer_->print();}
std::wstring name() const override {return consumer_->name();}
{
}
- virtual bool send(const_frame frame)
+ virtual boost::unique_future<bool> send(const_frame frame)
{
try
{
{
CASPAR_LOG_CURRENT_EXCEPTION();
CASPAR_LOG(error) << print() << " Failed to recover consumer.";
- return false;
+ return wrap_as_future(false);
}
}
}
consumer_->initialize(format_desc, channel_index);
}
- bool send(const_frame frame) override
+ boost::unique_future<bool> send(const_frame frame) override
{
if(audio_cadence_.size() == 1)
return consumer_->send(frame);
- bool result = true;
+ boost::unique_future<bool> result = wrap_as_future(true);
if(boost::range::equal(sync_buffer_, audio_cadence_) && audio_cadence_.front() == static_cast<int>(frame.audio_data().size()))
{
sync_buffer_.push_back(static_cast<int>(frame.audio_data().size()));
- return result;
+ return std::move(result);
}
std::wstring print() const override {return consumer_->print();}
class empty_frame_consumer : public frame_consumer
{
public:
- bool send(const_frame) override {return false;}
+ boost::unique_future<bool> send(const_frame) override {return wrap_as_future(false);}
void initialize(const video_format_desc&, int) override{}
std::wstring print() const override {return L"empty";}
std::wstring name() const override {return L"empty";}
#include <common/memory.h>
#include <boost/property_tree/ptree_fwd.hpp>
+#include <boost/thread/future.hpp>
#include <functional>
#include <string>
#include <vector>
namespace caspar { namespace core {
-
+
// Interface
class frame_consumer : public monitor::observable
{
// Methods
- virtual bool send(class const_frame frame) = 0;
+ virtual boost::unique_future<bool> send(class const_frame frame) = 0;
virtual void initialize(const struct video_format_desc& format_desc, int channel_index) = 0;
// monitor::observable
if(!frames_.full())
return;
- for(auto it = ports_.begin(); it != ports_.end();)
+ std::map<int, boost::unique_future<bool>> send_results;
+
+ // Start invocations
+ for (auto it = ports_.begin(); it != ports_.end(); ++it)
{
auto& port = it->second;
auto& frame = frames_.at(port.buffer_depth()-minmax.first);
try
{
- if(port.send(frame))
- ++it;
- else
- ports_.erase(it++);
+ send_results.insert(std::make_pair(it->first, port.send(frame)));
}
- catch(...)
+ catch (...)
{
CASPAR_LOG_CURRENT_EXCEPTION();
- ports_.erase(it++);
+ ports_.erase(it);
+ }
+ }
+
+ // Retrieve results
+ for (auto it = send_results.begin(); it != send_results.end(); ++it)
+ {
+ try
+ {
+ if (!it->second.get())
+ ports_.erase(it->first);
+ }
+ catch (...)
+ {
+ CASPAR_LOG_CURRENT_EXCEPTION();
+ ports_.erase(it->first);
}
}
});
consumer_->initialize(format_desc, channel_index_);
}
- bool send(const_frame frame)
+ boost::unique_future<bool> send(const_frame frame)
{
event_subject_ << monitor::event("type") % consumer_->name();
return consumer_->send(std::move(frame));
port::port(port&& other) : impl_(std::move(other.impl_)){}
port::~port(){}
port& port::operator=(port&& other){impl_ = std::move(other.impl_); return *this;}
-bool port::send(const_frame frame){return impl_->send(std::move(frame));}
+boost::unique_future<bool> port::send(const_frame frame){return impl_->send(std::move(frame));}
void port::subscribe(const monitor::observable::observer_ptr& o){impl_->event_subject_.subscribe(o);}
void port::unsubscribe(const monitor::observable::observer_ptr& o){impl_->event_subject_.unsubscribe(o);}
void port::video_format_desc(const struct video_format_desc& format_desc){impl_->video_format_desc(format_desc);}
#include <common/memory.h>
#include <boost/property_tree/ptree_fwd.hpp>
+#include <boost/thread/future.hpp>
namespace caspar { namespace core {
port& operator=(port&& other);
- bool send(class const_frame frame);
+ boost::unique_future<bool> send(class const_frame frame);
// monitor::observable
CASPAR_LOG(error)<< print() << TEXT(" Failed to disable video output.");
}
- void send(core::const_frame& frame)
+ boost::unique_future<bool> send(core::const_frame& frame)
{
- executor_.begin_invoke([=]
+ return executor_.begin_invoke([=]() -> bool
{
try
{
{
CASPAR_LOG_CURRENT_EXCEPTION();
}
+
+ return true;
});
}
consumer_.reset(new bluefish_consumer(format_desc, device_index_, embedded_audio_, key_only_, channel_index));
}
- bool send(core::const_frame frame) override
+ boost::unique_future<bool> send(core::const_frame frame) override
{
- consumer_->send(frame);
- return true;
+ return consumer_->send(frame);
}
std::wstring print() const override
#include <common/except.h>
#include <common/memshfl.h>
#include <common/array.h>
+#include <common/future.h>
#include <core/consumer/frame_consumer.h>
spl::shared_ptr<diagnostics::graph> graph_;
boost::timer tick_timer_;
+ retry_task<bool> send_completion_;
public:
decklink_consumer(const configuration& config, const core::video_format_desc& format_desc, int channel_index)
graph_->set_tag("flushed-frame");
auto frame = core::const_frame::empty();
- video_frame_buffer_.pop(frame);
+ video_frame_buffer_.pop(frame);
+ send_completion_.try_completion();
schedule_next_video(frame);
unsigned long buffered;
{
auto frame = core::const_frame::empty();
audio_frame_buffer_.pop(frame);
+ send_completion_.try_completion();
schedule_next_audio(frame.audio_data());
}
tick_timer_.restart();
}
- void send(core::const_frame frame)
+ boost::unique_future<bool> send(core::const_frame frame)
{
auto exception = lock(exception_mutex_, [&]
{
if(!is_running_)
CASPAR_THROW_EXCEPTION(caspar_exception() << msg_info(u8(print()) + " Is not running."));
- if(config_.embedded_audio)
- audio_frame_buffer_.push(frame);
- video_frame_buffer_.push(frame);
+ bool audio_ready = !config_.embedded_audio;
+ bool video_ready = false;
+
+ auto enqueue_task = [audio_ready, video_ready, frame, this]() mutable -> boost::optional<bool>
+ {
+ if (!audio_ready)
+ audio_ready = audio_frame_buffer_.try_push(frame);
+
+ if (!video_ready)
+ video_ready = video_frame_buffer_.try_push(frame);
+
+ if (audio_ready && video_ready)
+ return true;
+ else
+ return boost::optional<bool>();
+ };
+
+ if (enqueue_task())
+ return wrap_as_future(true);
+
+ send_completion_.set_task(enqueue_task);
+
+ return send_completion_.get_future();
}
std::wstring print() const
});
}
- bool send(core::const_frame frame) override
+ boost::unique_future<bool> send(core::const_frame frame) override
{
- consumer_->send(frame);
- return true;
+ return consumer_->send(frame);
}
std::wstring print() const override
// frame_consumer
- bool send(core::const_frame& frame)
+ boost::unique_future<bool> send(core::const_frame& frame)
{
auto exception = lock(exception_mutex_, [&]
{
if(exception != nullptr)
std::rethrow_exception(exception);
- executor_.begin_invoke([=]
+ return executor_.begin_invoke([=]() -> bool
{
encode(frame);
+
+ return true;
});
-
- return true;
}
std::wstring print() const
consumer_.reset(new ffmpeg_consumer(u8(filename_), format_desc, options_));
}
- bool send(core::const_frame frame) override
+ boost::unique_future<bool> send(core::const_frame frame) override
{
return consumer_->send(frame);
}
#include <common/log.h>
#include <common/utf.h>
#include <common/array.h>
+#include <common/future.h>
#include <core/consumer/frame_consumer.h>
#include <core/video_format.h>
{
}
- bool send(core::const_frame frame) override
+ boost::unique_future<bool> send(core::const_frame frame) override
{
boost::thread async([frame]
{
});
async.detach();
- return false;
+ return wrap_as_future(false);
}
std::wstring print() const override
#include <common/log.h>
#include <common/utf.h>
#include <common/env.h>
+#include <common/future.h>
#include <core/consumer/frame_consumer.h>
#include <core/frame/frame.h>
});
}
- bool send(core::const_frame frame) override
- {
+ boost::unique_future<bool> send(core::const_frame frame) override
+ {
+ // Will only block if the default executor queue capacity of 512 is
+ // exhausted, which should not happen
executor_.begin_invoke([=]
{
ALenum state;
perf_timer_.restart();
});
- return true;
+ return wrap_as_future(true);
}
std::wstring print() const override
#include <common/memshfl.h>
#include <common/utf.h>
#include <common/prec_timer.h>
+#include <common/future.h>
#include <ffmpeg/producer/filter/filter.h>
}
- bool send(core::const_frame frame)
+ boost::unique_future<bool> send(core::const_frame frame)
{
if(!frame_buffer_.try_push(frame))
graph_->set_tag("dropped-frame");
- return is_running_;
+
+ return wrap_as_future(is_running_.load());
}
std::wstring print() const
consumer_.reset(new screen_consumer(config_, format_desc, channel_index));
}
- bool send(core::const_frame frame) override
+ boost::unique_future<bool> send(core::const_frame frame) override
{
return consumer_->send(frame);
}