#include "../stdafx.h"\r
\r
#include "input.h"\r
+#include "util.h"\r
#include "../ffmpeg_error.h"\r
-#include "../tbb_avcodec.h"\r
\r
#include <core/video_format.h>\r
\r
#include <common/exception/exceptions.h>\r
#include <common/exception/win32_exception.h>\r
\r
-#include <tbb/concurrent_queue.h>\r
#include <tbb/atomic.h>\r
\r
-#include <boost/range/algorithm.hpp>\r
-#include <boost/thread/condition_variable.hpp>\r
-#include <boost/thread/mutex.hpp>\r
-#include <boost/thread/thread.hpp>\r
+#include <agents.h>\r
+#include <concrt_extras.h>\r
+#include <semaphore.h>\r
\r
#if defined(_MSC_VER)\r
#pragma warning (push)\r
#pragma warning (pop)\r
#endif\r
\r
-namespace caspar {\r
+using namespace Concurrency;\r
\r
-static const size_t MAX_BUFFER_COUNT = 128;\r
-static const size_t MAX_BUFFER_SIZE = 32 * 1000000;\r
- \r
-struct input::implementation : boost::noncopyable\r
-{ \r
- std::shared_ptr<AVFormatContext> format_context_; // Destroy this last\r
- int default_stream_index_;\r
+namespace caspar { namespace ffmpeg {\r
\r
- safe_ptr<diagnostics::graph> graph_;\r
- \r
- const std::wstring filename_;\r
- const bool loop_;\r
- const int start_; \r
+static const size_t MAX_TOKENS = 32;\r
\r
- tbb::concurrent_bounded_queue<std::shared_ptr<AVPacket>> buffer_;\r
- tbb::atomic<size_t> buffer_size_;\r
- boost::condition_variable buffer_cond_;\r
- boost::mutex buffer_mutex_;\r
- \r
- boost::thread thread_;\r
- tbb::atomic<bool> is_running_;\r
+struct input::implementation : public Concurrency::agent, boost::noncopyable\r
+{\r
+ input::target_t& target_;\r
\r
- tbb::atomic<size_t> nb_frames_;\r
- tbb::atomic<size_t> nb_loops_;\r
+ const std::wstring filename_;\r
+ const safe_ptr<AVFormatContext> format_context_; // Destroy this last\r
+ int default_stream_index_;\r
+ const boost::iterator_range<AVStream**> streams_;\r
\r
-public:\r
- explicit implementation(const safe_ptr<diagnostics::graph>& graph, const std::wstring& filename, bool loop, int start) \r
- : graph_(graph)\r
- , loop_(loop)\r
- , filename_(filename)\r
- , start_(std::max(start, 0))\r
- { \r
- is_running_ = true;\r
- nb_frames_ = 0;\r
- nb_loops_ = 0;\r
+ safe_ptr<diagnostics::graph> graph_;\r
\r
- AVFormatContext* weak_format_context_ = nullptr;\r
- THROW_ON_ERROR2(avformat_open_input(&weak_format_context_, narrow(filename).c_str(), nullptr, nullptr), print());\r
-\r
- format_context_.reset(weak_format_context_, av_close_input_file);\r
-\r
- //av_dump_format(weak_format_context_, 0, narrow(filename).c_str(), 0);\r
+ const bool loop_;\r
+ const size_t start_; \r
+ const size_t length_;\r
+ size_t frame_number_;\r
\r
- THROW_ON_ERROR2(avformat_find_stream_info(format_context_.get(), nullptr), print());\r
- \r
- default_stream_index_ = THROW_ON_ERROR2(av_find_default_stream_index(format_context_.get()), print());\r
-\r
- if(start_ != 0) \r
- seek_frame(start_);\r
- \r
- for(int n = 0; n < 16 && !full(); ++n)\r
- read_next_packet();\r
- \r
- graph_->set_color("seek", diagnostics::color(0.5f, 1.0f, 0.5f)); \r
- graph_->set_color("buffer-count", diagnostics::color(0.2f, 0.8f, 1.0f));\r
- graph_->set_color("buffer-size", diagnostics::color(0.2f, 0.4f, 1.0f)); \r
-\r
- thread_ = boost::thread([this]{run();});\r
- }\r
+ tbb::atomic<size_t> nb_frames_;\r
+ tbb::atomic<size_t> nb_loops_; \r
+ tbb::atomic<size_t> packets_count_;\r
+ tbb::atomic<size_t> packets_size_;\r
\r
- ~implementation()\r
- {\r
- is_running_ = false;\r
- buffer_cond_.notify_all();\r
- thread_.join();\r
- }\r
+ bool stop_;\r
\r
- bool try_pop(std::shared_ptr<AVPacket>& packet)\r
- {\r
- const bool result = buffer_.try_pop(packet);\r
-\r
- if(result)\r
- {\r
- if(packet)\r
- buffer_size_ -= packet->size;\r
- buffer_cond_.notify_all();\r
- }\r
-\r
- graph_->update_value("buffer-size", MAX_BUFFER_SIZE/static_cast<double>(buffer_size_));\r
- graph_->update_value("buffer-count", MAX_BUFFER_COUNT/static_cast<double>(buffer_.size()));\r
-\r
- return result;\r
- }\r
-\r
- size_t nb_frames() const\r
- {\r
- return nb_frames_;\r
+public:\r
+ explicit implementation(input::target_t& target,\r
+ const safe_ptr<diagnostics::graph>& graph, \r
+ const std::wstring& filename, \r
+ bool loop, \r
+ size_t start,\r
+ size_t length)\r
+ : target_(target)\r
+ , filename_(filename)\r
+ , format_context_(open_input(filename)) \r
+ , default_stream_index_(av_find_default_stream_index(format_context_.get()))\r
+ , streams_(format_context_->streams, format_context_->streams + format_context_->nb_streams)\r
+ , graph_(graph)\r
+ , loop_(loop)\r
+ , start_(start)\r
+ , length_(length)\r
+ , frame_number_(0)\r
+ , stop_(false)\r
+ { \r
+ packets_count_ = 0;\r
+ packets_size_ = 0;\r
+ nb_frames_ = 0;\r
+ nb_loops_ = 0;\r
+ \r
+ av_dump_format(format_context_.get(), 0, narrow(filename).c_str(), 0);\r
+ \r
+ if(start_ > 0) \r
+ seek_frame(start_);\r
+ \r
+ graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f));\r
}\r
-\r
- size_t nb_loops() const\r
+ \r
+ void stop()\r
{\r
- return nb_loops_;\r
+ stop_ = true;\r
+ agent::wait(this);\r
}\r
-\r
-private:\r
\r
- void run()\r
- { \r
- caspar::win32_exception::install_handler();\r
-\r
+ virtual void run()\r
+ {\r
try\r
{\r
- CASPAR_LOG(info) << print() << " Thread Started.";\r
-\r
- while(is_running_)\r
+ while(!stop_)\r
{\r
- {\r
- boost::unique_lock<boost::mutex> lock(buffer_mutex_);\r
- buffer_cond_.wait(lock, [this]{return !full();});\r
- }\r
- read_next_packet(); \r
- }\r
+ auto packet = read_next_packet();\r
+ if(!packet)\r
+ break;\r
\r
- CASPAR_LOG(info) << print() << " Thread Stopped.";\r
+ Concurrency::asend(target_, make_safe_ptr(packet));\r
+ Concurrency::wait(40);\r
+ }\r
}\r
catch(...)\r
{\r
CASPAR_LOG_CURRENT_EXCEPTION();\r
- is_running_ = false;\r
- }\r
+ } \r
+ \r
+ BOOST_FOREACH(auto stream, streams_)\r
+ Concurrency::send(target_, eof_packet(stream->index)); \r
+\r
+ done();\r
}\r
- \r
- void read_next_packet()\r
- { \r
- int ret = 0;\r
\r
- std::shared_ptr<AVPacket> read_packet(new AVPacket, [](AVPacket* p)\r
+ std::shared_ptr<AVPacket> read_next_packet()\r
+ { \r
+ auto packet = create_packet();\r
+ \r
+ int ret = [&]() -> int\r
{\r
- av_free_packet(p);\r
- delete p;\r
- });\r
- av_init_packet(read_packet.get());\r
+ Concurrency::scoped_oversubcription_token oversubscribe;\r
+ return av_read_frame(format_context_.get(), packet.get()); // packet is only valid until next call of av_read_frame. Use av_dup_packet to extend its life. \r
+ }();\r
\r
- ret = av_read_frame(format_context_.get(), read_packet.get()); // read_packet is only valid until next call of av_read_frame. Use av_dup_packet to extend its life. \r
- \r
if(is_eof(ret)) \r
{\r
++nb_loops_;\r
+ frame_number_ = 0;\r
\r
if(loop_)\r
{\r
- seek_frame(start_, AVSEEK_FLAG_BACKWARD);\r
- graph_->add_tag("seek"); \r
- CASPAR_LOG(trace) << print() << " Received EOF. Looping."; \r
+ CASPAR_LOG(trace) << print() << " Looping.";\r
+ seek_frame(start_, AVSEEK_FLAG_BACKWARD); \r
+ return read_next_packet();\r
} \r
else\r
{\r
- is_running_ = false;\r
- CASPAR_LOG(trace) << print() << " Received EOF. Stopping.";\r
+ CASPAR_LOG(trace) << print() << " Stopping.";\r
+ return nullptr;\r
}\r
}\r
- else\r
- { \r
- THROW_ON_ERROR(ret, print(), "av_read_frame");\r
\r
- if(read_packet->stream_index == default_stream_index_ && nb_loops_ == 0)\r
+ THROW_ON_ERROR(ret, print(), "av_read_frame");\r
+\r
+ if(packet->stream_index == default_stream_index_)\r
+ {\r
+ if(nb_loops_ == 0)\r
++nb_frames_;\r
+ ++frame_number_;\r
+ }\r
\r
- THROW_ON_ERROR2(av_dup_packet(read_packet.get()), print());\r
+ THROW_ON_ERROR2(av_dup_packet(packet.get()), print());\r
\r
- // Make sure that the packet is correctly deallocated even if size and data is modified during decoding.\r
- auto size = read_packet->size;\r
- auto data = read_packet->data;\r
-\r
- read_packet = std::shared_ptr<AVPacket>(read_packet.get(), [=](AVPacket*)\r
- {\r
- read_packet->size = size;\r
- read_packet->data = data;\r
- });\r
+ // Make sure that the packet is correctly deallocated even if size and data is modified during decoding.\r
+ auto size = packet->size;\r
+ auto data = packet->data; \r
\r
- buffer_.try_push(read_packet);\r
- buffer_size_ += read_packet->size;\r
- \r
- graph_->update_value("buffer-count", MAX_BUFFER_COUNT/static_cast<double>(buffer_.size()));\r
- graph_->update_value("buffer-size", MAX_BUFFER_SIZE/static_cast<double>(buffer_size_));\r
- } \r
- }\r
+ packet = safe_ptr<AVPacket>(packet.get(), [=](AVPacket*)\r
+ {\r
+ packet->size = size;\r
+ packet->data = data;\r
+ --packets_count_;\r
+ });\r
\r
- bool full() const\r
- {\r
- return is_running_ && (buffer_size_ > MAX_BUFFER_SIZE || buffer_.size() > MAX_BUFFER_COUNT);\r
+ ++packets_count_;\r
+ \r
+ return packet;\r
}\r
\r
void seek_frame(int64_t frame, int flags = 0)\r
- { \r
- THROW_ON_ERROR2(av_seek_frame(format_context_.get(), default_stream_index_, frame, flags), print()); \r
- buffer_.push(nullptr);\r
+ { \r
+ if(flags == AVSEEK_FLAG_BACKWARD)\r
+ {\r
+ // Fix VP6 seeking\r
+ int vid_stream_index = av_find_best_stream(format_context_.get(), AVMEDIA_TYPE_VIDEO, -1, -1, 0, 0);\r
+ if(vid_stream_index >= 0)\r
+ {\r
+ auto codec_id = format_context_->streams[vid_stream_index]->codec->codec_id;\r
+ if(codec_id == CODEC_ID_VP6A || codec_id == CODEC_ID_VP6F || codec_id == CODEC_ID_VP6)\r
+ flags |= AVSEEK_FLAG_BYTE;\r
+ }\r
+ }\r
+\r
+ THROW_ON_ERROR2(av_seek_frame(format_context_.get(), default_stream_index_, frame, flags), print()); \r
+ auto packet = create_packet();\r
+ packet->size = 0;\r
+\r
+ BOOST_FOREACH(auto stream, streams_)\r
+ Concurrency::asend(target_, loop_packet(stream->index)); \r
+\r
+ graph_->add_tag("seek"); \r
} \r
\r
bool is_eof(int ret)\r
if(ret == AVERROR_EOF)\r
CASPAR_LOG(trace) << print() << " Received EOF. " << nb_frames_;\r
\r
- return ret == AVERROR_EOF || ret == AVERROR(EIO); // av_read_frame doesn't always correctly return AVERROR_EOF;\r
+ CASPAR_VERIFY(ret >= 0 || ret == AVERROR_EOF || ret == AVERROR(EIO), ffmpeg_error() << source_info(narrow(print())));\r
+\r
+ return ret == AVERROR_EOF || ret == AVERROR(EIO) || frame_number_ >= length_; // av_read_frame doesn't always correctly return AVERROR_EOF;\r
}\r
\r
std::wstring print() const\r
}\r
};\r
\r
-input::input(const safe_ptr<diagnostics::graph>& graph, const std::wstring& filename, bool loop, int start, int length) \r
- : impl_(new implementation(graph, filename, loop, start)){}\r
-bool input::eof() const {return !impl_->is_running_;}\r
-bool input::try_pop(std::shared_ptr<AVPacket>& packet){return impl_->try_pop(packet);}\r
-safe_ptr<AVFormatContext> input::context(){return make_safe(impl_->format_context_);}\r
-size_t input::nb_frames() const {return impl_->nb_frames();}\r
-size_t input::nb_loops() const {return impl_->nb_loops();}\r
-}
\ No newline at end of file
+input::input(input::target_t& target,\r
+ const safe_ptr<diagnostics::graph>& graph, \r
+ const std::wstring& filename, \r
+ bool loop, \r
+ size_t start, \r
+ size_t length)\r
+ : impl_(new implementation(target, graph, filename, loop, start, length))\r
+{\r
+}\r
+\r
+safe_ptr<AVFormatContext> input::context()\r
+{\r
+ return safe_ptr<AVFormatContext>(impl_->format_context_);\r
+}\r
+\r
+size_t input::nb_frames() const\r
+{\r
+ return impl_->nb_frames_;\r
+}\r
+\r
+size_t input::nb_loops() const \r
+{\r
+ return impl_->nb_loops_;\r
+}\r
+\r
+void input::start()\r
+{\r
+ impl_->start();\r
+}\r
+\r
+void input::stop()\r
+{\r
+ impl_->stop();\r
+}\r
+\r
+}}
\ No newline at end of file