\r
#include "input.h"\r
\r
-#include "../../frame/frame_format.h"\r
-#include "../../../common/utility/memory.h"\r
-#include "../../../common/utility/scope_exit.h"\r
+#include "../../format/video_format.h"\r
+\r
+#include <common/concurrency/executor.h>\r
\r
#include <tbb/concurrent_queue.h>\r
#include <tbb/queuing_mutex.h>\r
\r
-#include <boost/thread.hpp>\r
+#include <boost/exception/error_info.hpp>\r
+#include <boost/thread/once.hpp>\r
\r
#include <errno.h>\r
#include <system_error>\r
-\r
-#pragma warning(disable : 4482)\r
\r
+#if defined(_MSC_VER)\r
+#pragma warning (disable : 4244)\r
+#endif\r
+\r
+\r
+extern "C" \r
+{\r
+ #define __STDC_CONSTANT_MACROS\r
+ #define __STDC_LIMIT_MACROS\r
+ #include <libavformat/avformat.h>\r
+}\r
+\r
namespace caspar { namespace core { namespace ffmpeg{\r
\r
struct input::implementation : boost::noncopyable\r
-{\r
- implementation(const frame_format_desc& format_desc) \r
- : video_frame_rate_(25.0), video_s_index_(-1), audio_s_index_(-1), video_codec_(nullptr), audio_codec_(nullptr), format_desc_(format_desc)\r
- {\r
- loop_ = false;\r
- //file_buffer_size_ = 0; \r
- video_packet_buffer_.set_capacity(25);\r
- audio_packet_buffer_.set_capacity(25);\r
- }\r
+{ \r
+ std::shared_ptr<AVFormatContext> format_context_; // Destroy this last\r
\r
- ~implementation()\r
- { \r
- stop();\r
- }\r
+ std::shared_ptr<AVCodecContext> video_codec_context_;\r
+ std::shared_ptr<AVCodecContext> audio_codex_context_;\r
+\r
+ tbb::queuing_mutex seek_mutex_;\r
+\r
+ const std::wstring filename_;\r
+\r
+ tbb::atomic<bool> loop_;\r
+ int video_s_index_;\r
+ int audio_s_index_;\r
+\r
+ tbb::atomic<size_t> buffer_size_;\r
\r
- void stop()\r
- {\r
- is_running_ = false;\r
- audio_packet_buffer_.clear();\r
- video_packet_buffer_.clear();\r
- //file_buffer_size_ = 0;\r
- //file_buffer_size_cond_.notify_all();\r
- io_thread_.join();\r
- }\r
+ tbb::concurrent_bounded_queue<std::shared_ptr<aligned_buffer>> video_packet_buffer_;\r
+ tbb::concurrent_bounded_queue<std::shared_ptr<aligned_buffer>> audio_packet_buffer_;\r
+ \r
+ executor executor_;\r
\r
- void load(const std::string& filename)\r
- { \r
- try\r
- {\r
- int errn;\r
- AVFormatContext* weak_format_context_;\r
- if((errn = -av_open_input_file(&weak_format_context_, filename.c_str(), nullptr, 0, nullptr)) > 0)\r
- BOOST_THROW_EXCEPTION(file_read_error() << msg_info("No video or audio codec found."));\r
- format_context_.reset(weak_format_context_, av_close_input_file);\r
- \r
- if((errn = -av_find_stream_info(format_context_.get())) > 0)\r
- throw std::runtime_error("File read error");\r
+ static const size_t BUFFER_SIZE = 2 << 25;\r
\r
- video_codec_context_ = open_video_stream();\r
- if(!video_codec_context_)\r
- CASPAR_LOG(warning) << "No video stream found.";\r
+public:\r
+ explicit implementation(const std::wstring& filename) \r
+ : video_s_index_(-1)\r
+ , audio_s_index_(-1)\r
+ , filename_(filename)\r
+ { \r
+ static boost::once_flag av_register_all_flag = BOOST_ONCE_INIT;\r
+ boost::call_once(av_register_all, av_register_all_flag); \r
\r
- audio_codex_context_ = open_audio_stream();\r
- if(!audio_codex_context_)\r
- CASPAR_LOG(warning) << "No audio stream found.";\r
+ static boost::once_flag avcodec_init_flag = BOOST_ONCE_INIT;\r
+ boost::call_once(avcodec_init, avcodec_init_flag); \r
\r
- if(!video_codec_context_ && !audio_codex_context_)\r
- BOOST_THROW_EXCEPTION(file_read_error() << msg_info("No video or audio codec found."));\r
+ loop_ = false; \r
+ \r
+ int errn;\r
+ AVFormatContext* weak_format_context_;\r
+ if((errn = -av_open_input_file(&weak_format_context_, narrow(filename).c_str(), nullptr, 0, nullptr)) > 0)\r
+ BOOST_THROW_EXCEPTION(\r
+ file_read_error() << \r
+ msg_info("No format context found.") << \r
+ boost::errinfo_api_function("av_open_input_file") <<\r
+ boost::errinfo_errno(errn) <<\r
+ boost::errinfo_file_name(narrow(filename)));\r
+\r
+ format_context_.reset(weak_format_context_, av_close_input_file);\r
\r
- video_frame_rate_ = static_cast<double>(video_codec_context_->time_base.den) / static_cast<double>(video_codec_context_->time_base.num); \r
- }\r
- catch(...)\r
- {\r
- video_codec_context_.reset();\r
- audio_codex_context_.reset();\r
- format_context_.reset();\r
- video_frame_rate_ = 25.0;\r
- video_s_index_ = -1;\r
- audio_s_index_ = -1; \r
- throw;\r
- }\r
- filename_ = filename;\r
+ if((errn = -av_find_stream_info(format_context_.get())) > 0)\r
+ BOOST_THROW_EXCEPTION(\r
+ file_read_error() << \r
+ boost::errinfo_api_function("av_find_stream_info") <<\r
+ msg_info("No stream found.") << \r
+ boost::errinfo_errno(errn));\r
+\r
+ video_codec_context_ = open_stream(CODEC_TYPE_VIDEO, video_s_index_);\r
+ if(!video_codec_context_)\r
+ CASPAR_LOG(warning) << "Could not open any video stream.";\r
+ \r
+ audio_codex_context_ = open_stream(CODEC_TYPE_AUDIO, audio_s_index_);\r
+ if(!audio_codex_context_)\r
+ CASPAR_LOG(warning) << "Could not open any audio stream.";\r
+\r
+ if(!video_codec_context_ && !audio_codex_context_)\r
+ BOOST_THROW_EXCEPTION(file_read_error() << msg_info("No video or audio codec context found.")); \r
+ \r
+ executor_.start();\r
+ executor_.begin_invoke([this]{read_file();});\r
+ CASPAR_LOG(info) << print() << " started.";\r
}\r
\r
- void start()\r
+ ~implementation()\r
{\r
- io_thread_ = boost::thread([=]{read_file();});\r
+ CASPAR_LOG(info) << print() << " ended.";\r
}\r
- \r
- std::shared_ptr<AVCodecContext> open_video_stream()\r
+ \r
+ std::shared_ptr<AVCodecContext> open_stream(int codec_type, int& s_index)\r
{ \r
AVStream** streams_end = format_context_->streams+format_context_->nb_streams;\r
- AVStream** video_stream = std::find_if(format_context_->streams, streams_end, \r
- [](AVStream* stream) { return stream != nullptr && stream->codec->codec_type == CODEC_TYPE_VIDEO ;});\r
-\r
- video_s_index_ = video_stream != streams_end ? (*video_stream)->index : -1;\r
- if(video_s_index_ == -1) \r
- return nullptr;\r
+ AVStream** stream = std::find_if(format_context_->streams, streams_end, \r
+ [&](AVStream* stream) { return stream != nullptr && stream->codec->codec_type == codec_type ;});\r
\r
- video_codec_ = avcodec_find_decoder((*video_stream)->codec->codec_id); \r
- if(video_codec_ == nullptr)\r
+ if(stream == streams_end) \r
return nullptr;\r
- \r
- if((-avcodec_open((*video_stream)->codec, video_codec_)) > 0) \r
- return nullptr;\r
-\r
- return std::shared_ptr<AVCodecContext>((*video_stream)->codec, avcodec_close);\r
- }\r
-\r
- std::shared_ptr<AVCodecContext> open_audio_stream()\r
- { \r
- AVStream** streams_end = format_context_->streams+format_context_->nb_streams;\r
- AVStream** audio_stream = std::find_if(format_context_->streams, streams_end, \r
- [](AVStream* stream) { return stream != nullptr && stream->codec->codec_type == CODEC_TYPE_AUDIO;});\r
\r
- audio_s_index_ = audio_stream != streams_end ? (*audio_stream)->index : -1;\r
- if(audio_s_index_ == -1)\r
- return nullptr;\r
+ s_index = (*stream)->index;\r
\r
- audio_codec_ = avcodec_find_decoder((*audio_stream)->codec->codec_id);\r
- if(audio_codec_ == nullptr)\r
+ auto codec = avcodec_find_decoder((*stream)->codec->codec_id); \r
+ if(codec == nullptr)\r
return nullptr;\r
-\r
- if((-avcodec_open((*audio_stream)->codec, audio_codec_)) > 0) \r
+ \r
+ if((-avcodec_open((*stream)->codec, codec)) > 0) \r
return nullptr;\r
\r
- return std::shared_ptr<AVCodecContext>((*audio_stream)->codec, avcodec_close);\r
- } \r
-\r
- void read_file()\r
- { \r
- CASPAR_LOG(info) << "Started ffmpeg_producer::read_file Thread for " << filename_.c_str();\r
- win32_exception::install_handler();\r
+ return std::shared_ptr<AVCodecContext>((*stream)->codec, avcodec_close);\r
+ }\r
\r
- is_running_ = true;\r
- AVPacket tmp_packet;\r
- while(is_running_)\r
+ void read_file() // For every packet taken: read in a number of packets.\r
+ { \r
+ for(size_t n = 0; buffer_size_ < BUFFER_SIZE && (n < 3 || video_packet_buffer_.size() < 3 || audio_packet_buffer_.size() < 3) && executor_.is_running(); ++n)\r
{\r
- std::shared_ptr<AVPacket> packet(&tmp_packet, av_free_packet); \r
+ AVPacket tmp_packet;\r
+ safe_ptr<AVPacket> read_packet(&tmp_packet, av_free_packet); \r
tbb::queuing_mutex::scoped_lock lock(seek_mutex_); \r
\r
- if (av_read_frame(format_context_.get(), packet.get()) >= 0) // NOTE: Packet is only valid until next call of av_read_frame or av_close_input_file\r
+ if (av_read_frame(format_context_.get(), read_packet.get()) >= 0) // NOTE: read_packet is only valid until next call of av_safe_ptr<read_frame> or av_close_input_file\r
{\r
- if(packet->stream_index == video_s_index_) \r
+ auto packet = std::make_shared<aligned_buffer>(read_packet->data, read_packet->data + read_packet->size);\r
+ if(read_packet->stream_index == video_s_index_) \r
{\r
- video_packet_buffer_.push(std::make_shared<video_packet>(packet, format_desc_, video_codec_context_.get(), video_codec_)); // NOTE: video_packet makes a copy of AVPacket\r
- //file_buffer_size_ += packet->size;\r
+ buffer_size_ += packet->size();\r
+ video_packet_buffer_.try_push(std::move(packet)); \r
}\r
- else if(packet->stream_index == audio_s_index_) \r
+ else if(read_packet->stream_index == audio_s_index_) \r
{\r
- audio_packet_buffer_.push(std::make_shared<audio_packet>(packet, audio_codex_context_.get(), audio_codec_, video_frame_rate_)); \r
- //file_buffer_size_ += packet->size;\r
+ buffer_size_ += packet->size();\r
+ audio_packet_buffer_.try_push(std::move(packet));\r
}\r
}\r
else if(!loop_ || av_seek_frame(format_context_.get(), -1, 0, AVSEEK_FLAG_BACKWARD) < 0) // TODO: av_seek_frame does not work for all formats\r
- is_running_ = false;\r
- \r
- //if(is_running_)\r
- //{\r
- // boost::unique_lock<boost::mutex> lock(file_buffer_size_mutex_);\r
- // while(file_buffer_size_ > 32*1000000)\r
- // file_buffer_size_cond_.wait(lock); \r
- //}\r
+ executor_.stop(executor::no_wait);\r
}\r
- \r
- is_running_ = false;\r
- \r
- CASPAR_LOG(info) << " Ended ffmpeg_producer::read_file Thread for " << filename_.c_str();\r
}\r
- \r
- video_packet_ptr get_video_packet()\r
+ \r
+ aligned_buffer get_video_packet()\r
{\r
- video_packet_ptr video_packet;\r
- if(video_packet_buffer_.try_pop(video_packet))\r
- {\r
- //file_buffer_size_ -= video_packet->size;\r
- //file_buffer_size_cond_.notify_all();\r
- }\r
- return video_packet;\r
+ return get_packet(video_packet_buffer_);\r
}\r
\r
- audio_packet_ptr get_audio_packet()\r
+ aligned_buffer get_audio_packet()\r
{\r
- audio_packet_ptr audio_packet;\r
- if(audio_packet_buffer_.try_pop(audio_packet))\r
+ return get_packet(audio_packet_buffer_);\r
+ }\r
+ \r
+ aligned_buffer get_packet(tbb::concurrent_bounded_queue<std::shared_ptr<aligned_buffer>>& buffer)\r
+ {\r
+ std::shared_ptr<aligned_buffer> packet;\r
+ if(buffer.try_pop(packet))\r
{\r
- //file_buffer_size_ -= audio_packet->size;\r
- //file_buffer_size_cond_.notify_all();\r
+ buffer_size_ -= packet->size();\r
+ if(executor_.size() < 4)\r
+ executor_.begin_invoke([this]{read_file();});\r
+ return std::move(*packet);\r
}\r
- return audio_packet;\r
+ return aligned_buffer();\r
}\r
\r
bool is_eof() const\r
{\r
- return !is_running_ && video_packet_buffer_.empty() && audio_packet_buffer_.empty();\r
+ return !executor_.is_running() && video_packet_buffer_.empty() && audio_packet_buffer_.empty();\r
}\r
- \r
+ \r
+ // TODO: Not properly done.\r
bool seek(unsigned long long seek_target)\r
{\r
tbb::queuing_mutex::scoped_lock lock(seek_mutex_);\r
- if(av_seek_frame(format_context_.get(), -1, seek_target*AV_TIME_BASE, 0) >= 0)\r
- {\r
- video_packet_buffer_.clear();\r
- audio_packet_buffer_.clear();\r
- // TODO: Not sure its enough to jsut flush in input class\r
- if(video_codec_context_)\r
- avcodec_flush_buffers(video_codec_context_.get());\r
- if(audio_codex_context_)\r
- avcodec_flush_buffers(audio_codex_context_.get());\r
- return true;\r
- }\r
- \r
- return false;\r
- }\r
- \r
- //int file_buffer_max_size_;\r
- //tbb::atomic<int> file_buffer_size_;\r
- //boost::condition_variable file_buffer_size_cond_;\r
- //boost::mutex file_buffer_size_mutex_;\r
- \r
- tbb::queuing_mutex seek_mutex_;\r
+ if(av_seek_frame(format_context_.get(), -1, seek_target*AV_TIME_BASE, 0) < 0)\r
+ return false;\r
\r
- std::string filename_;\r
- std::shared_ptr<AVFormatContext> format_context_; // Destroy this last\r
-\r
- std::shared_ptr<AVCodecContext> video_codec_context_;\r
- AVCodec* video_codec_;\r
-\r
- std::shared_ptr<AVCodecContext> audio_codex_context_;\r
- AVCodec* audio_codec_;\r
-\r
- tbb::atomic<bool> loop_;\r
- int video_s_index_;\r
- int audio_s_index_;\r
- \r
- tbb::concurrent_bounded_queue<video_packet_ptr> video_packet_buffer_;\r
- tbb::concurrent_bounded_queue<audio_packet_ptr> audio_packet_buffer_;\r
- boost::thread io_thread_;\r
- tbb::atomic<bool> is_running_;\r
+ return true;\r
+ }\r
\r
- double video_frame_rate_;\r
+ double fps() const\r
+ {\r
+ return static_cast<double>(video_codec_context_->time_base.den) / static_cast<double>(video_codec_context_->time_base.num);\r
+ }\r
\r
- frame_format_desc format_desc_;\r
+ std::wstring print() const\r
+ {\r
+ return L"ffmpeg[" + boost::filesystem::wpath(filename_).filename() + L"] Buffer thread";\r
+ }\r
};\r
\r
-input::input(const frame_format_desc& format_desc) : impl_(new implementation(format_desc)){}\r
-void input::load(const std::string& filename){impl_->load(filename);}\r
+input::input(const std::wstring& filename) : impl_(new implementation(filename)){}\r
void input::set_loop(bool value){impl_->loop_ = value;}\r
const std::shared_ptr<AVCodecContext>& input::get_video_codec_context() const{return impl_->video_codec_context_;}\r
const std::shared_ptr<AVCodecContext>& input::get_audio_codec_context() const{return impl_->audio_codex_context_;}\r
bool input::is_eof() const{return impl_->is_eof();}\r
-video_packet_ptr input::get_video_packet(){return impl_->get_video_packet();}\r
-audio_packet_ptr input::get_audio_packet(){return impl_->get_audio_packet();}\r
+aligned_buffer input::get_video_packet(){return impl_->get_video_packet();}\r
+aligned_buffer input::get_audio_packet(){return impl_->get_audio_packet();}\r
bool input::seek(unsigned long long frame){return impl_->seek(frame);}\r
-void input::start(){impl_->start();}\r
+double input::fps() const { return impl_->fps(); }\r
}}}
\ No newline at end of file