catch(...){}\r
\r
producer2.reset();\r
- CASPAR_LOG(debug) << str << L" Destroyed.";\r
pool->push(destroyer);\r
}); \r
}\r
virtual uint32_t nb_frames() const override {return (*producer_)->nb_frames();}\r
};\r
\r
-safe_ptr<core::frame_producer> create_producer_destroy_proxy(safe_ptr<core::frame_producer>&& producer)\r
+safe_ptr<core::frame_producer> create_producer_destroy_proxy(safe_ptr<core::frame_producer> producer)\r
{\r
return make_safe<destroy_producer_proxy>(std::move(producer));\r
}\r
\r
+class print_producer_proxy : public frame_producer\r
+{ \r
+ std::shared_ptr<frame_producer> producer_;\r
+public:\r
+ print_producer_proxy(safe_ptr<frame_producer>&& producer) \r
+ : producer_(std::move(producer))\r
+ {\r
+ CASPAR_LOG(info) << producer_->print() << L" Initialized";\r
+ }\r
+\r
+ ~print_producer_proxy()\r
+ { \r
+ auto str = producer_->print();\r
+ producer_.reset();\r
+ CASPAR_LOG(info) << str << L" Uninitialized";\r
+ }\r
+\r
+ virtual safe_ptr<basic_frame> receive(int hints) override {return (producer_)->receive(hints);}\r
+ virtual safe_ptr<basic_frame> last_frame() const override {return (producer_)->last_frame();}\r
+ virtual std::wstring print() const override {return (producer_)->print();}\r
+ virtual boost::property_tree::wptree info() const override {return (producer_)->info();}\r
+ virtual boost::unique_future<std::wstring> call(const std::wstring& str) override {return (producer_)->call(str);}\r
+ virtual safe_ptr<frame_producer> get_following_producer() const override {return (producer_)->get_following_producer();}\r
+ virtual void set_leading_producer(const safe_ptr<frame_producer>& producer) override {(producer_)->set_leading_producer(producer);}\r
+ virtual uint32_t nb_frames() const override {return (producer_)->nb_frames();}\r
+};\r
+\r
+safe_ptr<core::frame_producer> create_producer_print_proxy(safe_ptr<core::frame_producer> producer)\r
+{\r
+ return make_safe<print_producer_proxy>(std::move(producer));\r
+}\r
+\r
class last_frame_producer : public frame_producer\r
{\r
const std::wstring print_;\r
\r
if(producer == frame_producer::empty())\r
producer = create_playlist_producer(my_frame_factory, params);\r
+ \r
+ if(producer != frame_producer::empty())\r
+ producer = create_producer_print_proxy(producer);\r
\r
return producer;\r
}\r
#include <core/video_format.h>\r
\r
#include <common/diagnostics/graph.h>\r
+#include <common/concurrency/executor.h>\r
#include <common/exception/exceptions.h>\r
#include <common/exception/win32_exception.h>\r
\r
#pragma warning (pop)\r
#endif\r
\r
-namespace caspar { namespace ffmpeg {\r
-\r
static const size_t MAX_BUFFER_COUNT = 100;\r
static const size_t MIN_BUFFER_COUNT = 4;\r
static const size_t MAX_BUFFER_SIZE = 16 * 1000000;\r
- \r
+\r
+namespace caspar { namespace ffmpeg {\r
+ \r
struct input::implementation : boost::noncopyable\r
{ \r
- safe_ptr<diagnostics::graph> graph_;\r
+ const safe_ptr<diagnostics::graph> graph_;\r
\r
const safe_ptr<AVFormatContext> format_context_; // Destroy this last\r
const int default_stream_index_;\r
\r
const std::wstring filename_;\r
- tbb::atomic<bool> loop_;\r
const uint32_t start_; \r
const uint32_t length_;\r
+ tbb::atomic<bool> loop_;\r
uint32_t frame_number_;\r
\r
tbb::concurrent_bounded_queue<std::shared_ptr<AVPacket>> buffer_;\r
tbb::atomic<size_t> buffer_size_;\r
- boost::condition_variable buffer_cond_;\r
- boost::mutex buffer_mutex_;\r
\r
- boost::thread thread_;\r
- tbb::atomic<bool> is_running_;\r
- tbb::atomic<bool> is_eof_;\r
-\r
- tbb::recursive_mutex mutex_;\r
-\r
- explicit implementation(const safe_ptr<diagnostics::graph>& graph, const std::wstring& filename, bool loop, uint32_t start, uint32_t length) \r
+ executor executor_;\r
+ \r
+ explicit implementation(const safe_ptr<diagnostics::graph> graph, const std::wstring& filename, bool loop, uint32_t start, uint32_t length) \r
: graph_(graph)\r
, format_context_(open_input(filename)) \r
, default_stream_index_(av_find_default_stream_index(format_context_.get()))\r
, start_(start)\r
, length_(length)\r
, frame_number_(0)\r
+ , executor_(print())\r
{ \r
- is_eof_ = false;\r
loop_ = loop;\r
buffer_size_ = 0;\r
\r
if(start_ > 0) \r
- do_seek(start_);\r
+ queued_seek(start_);\r
\r
graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f)); \r
graph_->set_color("buffer-count", diagnostics::color(0.7f, 0.4f, 0.4f));\r
graph_->set_color("buffer-size", diagnostics::color(1.0f, 1.0f, 0.0f)); \r
- \r
- is_running_ = true;\r
- thread_ = boost::thread([this]{run();});\r
\r
- CASPAR_LOG(info) << print() << L" Initialized.";\r
+ tick();\r
}\r
-\r
- ~implementation()\r
- {\r
- is_running_ = false;\r
- buffer_cond_.notify_all();\r
- thread_.join();\r
- }\r
- \r
+ \r
bool try_pop(std::shared_ptr<AVPacket>& packet)\r
{\r
- const bool result = buffer_.try_pop(packet);\r
-\r
+ auto result = buffer_.try_pop(packet);\r
+ \r
if(result)\r
{\r
if(packet)\r
buffer_size_ -= packet->size;\r
- buffer_cond_.notify_all();\r
+ tick();\r
}\r
\r
graph_->set_value("buffer-size", (static_cast<double>(buffer_size_)+0.001)/MAX_BUFFER_SIZE);\r
graph_->set_value("buffer-count", (static_cast<double>(buffer_.size()+0.001)/MAX_BUFFER_COUNT));\r
-\r
+ \r
return result;\r
}\r
- \r
- void run()\r
- { \r
- caspar::win32_exception::install_handler();\r
\r
- try\r
+ void seek(uint32_t target)\r
+ {\r
+ executor_.begin_invoke([=]\r
{\r
- CASPAR_LOG(info) << print() << " Thread Started.";\r
+ std::shared_ptr<AVPacket> packet;\r
+ while(buffer_.try_pop(packet) && packet)\r
+ buffer_size_ -= packet->size;\r
\r
- while(is_running_)\r
- {\r
- {\r
- boost::unique_lock<boost::mutex> lock(buffer_mutex_);\r
- while(full())\r
- buffer_cond_.timed_wait(lock, boost::posix_time::millisec(20));\r
- }\r
- read_next_packet(); \r
- }\r
+ queued_seek(target);\r
\r
- CASPAR_LOG(info) << print() << " Thread Stopped.";\r
- }\r
- catch(...)\r
- {\r
- CASPAR_LOG_CURRENT_EXCEPTION();\r
- is_running_ = false;\r
- }\r
+ tick();\r
+ }, high_priority);\r
}\r
- \r
- void read_next_packet()\r
+ \r
+ std::wstring print() const\r
+ {\r
+ return L"ffmpeg_input[" + filename_ + L")]";\r
+ }\r
+ \r
+ void tick()\r
{ \r
- tbb::recursive_mutex::scoped_lock lock(mutex_);\r
-\r
- auto packet = create_packet();\r
- auto ret = av_read_frame(format_context_.get(), packet.get()); // packet is only valid until next call of av_read_frame. Use av_dup_packet to extend its life. \r
- \r
- if(is_eof(ret)) \r
+ executor_.begin_invoke([this]\r
{\r
- frame_number_ = 0;\r
- is_eof_ = true;\r
-\r
- if(loop_)\r
+ auto packet = create_packet();\r
+ \r
+ auto ret = av_read_frame(format_context_.get(), packet.get()); // packet is only valid until next call of av_read_frame. Use av_dup_packet to extend its life. \r
+ \r
+ if(is_eof(ret)) \r
{\r
- do_seek(start_);\r
- graph_->set_tag("seek"); \r
- CASPAR_LOG(trace) << print() << " Looping."; \r
- } \r
- }\r
- else\r
- { \r
- THROW_ON_ERROR(ret, "av_read_frame", print());\r
+ frame_number_ = 0;\r
\r
- if(packet->stream_index == default_stream_index_)\r
- ++frame_number_;\r
+ if(loop_)\r
+ {\r
+ queued_seek(start_);\r
+ graph_->set_tag("seek"); \r
+ CASPAR_LOG(trace) << print() << " Looping."; \r
+ } \r
+ else\r
+ executor_.stop();\r
+ }\r
+ else\r
+ { \r
+ THROW_ON_ERROR(ret, "av_read_frame", print());\r
+\r
+ if(packet->stream_index == default_stream_index_)\r
+ ++frame_number_;\r
\r
- THROW_ON_ERROR2(av_dup_packet(packet.get()), print());\r
+ THROW_ON_ERROR2(av_dup_packet(packet.get()), print());\r
\r
- // Make sure that the packet is correctly deallocated even if size and data is modified during decoding.\r
- auto size = packet->size;\r
- auto data = packet->data;\r
+ // Make sure that the packet is correctly deallocated even if size and data is modified during decoding.\r
+ auto size = packet->size;\r
+ auto data = packet->data;\r
\r
- packet = safe_ptr<AVPacket>(packet.get(), [packet, size, data](AVPacket*)\r
- {\r
- packet->size = size;\r
- packet->data = data;\r
- });\r
+ packet = safe_ptr<AVPacket>(packet.get(), [packet, size, data](AVPacket*)\r
+ {\r
+ packet->size = size;\r
+ packet->data = data; \r
+ });\r
\r
- buffer_.try_push(packet);\r
- buffer_size_ += packet->size;\r
+ buffer_.try_push(packet);\r
+ buffer_size_ += packet->size;\r
\r
- graph_->set_value("buffer-size", (static_cast<double>(buffer_size_)+0.001)/MAX_BUFFER_SIZE);\r
- graph_->set_value("buffer-count", (static_cast<double>(buffer_.size()+0.001)/MAX_BUFFER_COUNT));\r
- } \r
- }\r
+ graph_->set_value("buffer-size", (static_cast<double>(buffer_size_)+0.001)/MAX_BUFFER_SIZE);\r
+ graph_->set_value("buffer-count", (static_cast<double>(buffer_.size()+0.001)/MAX_BUFFER_COUNT));\r
+ } \r
\r
- bool full() const\r
- {\r
- return is_running_ && (is_eof_ || (buffer_size_ > MAX_BUFFER_SIZE || buffer_.size() > MAX_BUFFER_COUNT) && buffer_.size() > MIN_BUFFER_COUNT);\r
- }\r
- \r
- void do_seek(const uint32_t target)\r
+ if((buffer_size_ < MAX_BUFFER_SIZE || buffer_.size() > MIN_BUFFER_COUNT) && buffer_.size() < MAX_BUFFER_COUNT && executor_.is_running()) \r
+ tick(); \r
+ });\r
+ } \r
+ \r
+ void queued_seek(const uint32_t target)\r
{ \r
CASPAR_LOG(debug) << print() << " Seeking: " << target;\r
\r
auto fixed_target = (target*stream->time_base.den*codec->time_base.num)/(stream->time_base.num*codec->time_base.den)*codec->ticks_per_frame;\r
\r
THROW_ON_ERROR2(avformat_seek_file(format_context_.get(), default_stream_index_, std::numeric_limits<int64_t>::min(), fixed_target, std::numeric_limits<int64_t>::max(), 0), print()); \r
-\r
- is_eof_ = false;\r
- buffer_cond_.notify_all();\r
-\r
+ \r
auto flush_packet = create_packet();\r
flush_packet->data = nullptr;\r
flush_packet->size = 0;\r
buffer_.push(flush_packet);\r
} \r
\r
- void seek(uint32_t target)\r
- {\r
- tbb::recursive_mutex::scoped_lock lock(mutex_);\r
-\r
- std::shared_ptr<AVPacket> packet;\r
- while(try_pop(packet))\r
- {\r
- }\r
-\r
- do_seek(target);\r
- }\r
-\r
bool is_eof(int ret)\r
{\r
if(ret == AVERROR(EIO))\r
\r
return ret == AVERROR_EOF || ret == AVERROR(EIO) || frame_number_ >= length_; // av_read_frame doesn't always correctly return AVERROR_EOF;\r
}\r
- \r
- std::wstring print() const\r
- {\r
- return L"ffmpeg_input[" + filename_ + L")]";\r
- }\r
};\r
\r
input::input(const safe_ptr<diagnostics::graph>& graph, const std::wstring& filename, bool loop, uint32_t start, uint32_t length) \r
: impl_(new implementation(graph, filename, loop, start, length)){}\r
-bool input::eof() const {return impl_->is_eof_;}\r
+bool input::eof() const {return !impl_->executor_.is_running();}\r
bool input::try_pop(std::shared_ptr<AVPacket>& packet){return impl_->try_pop(packet);}\r
safe_ptr<AVFormatContext> input::context(){return impl_->format_context_;}\r
void input::loop(bool value){impl_->loop_ = value;}\r