\r
namespace caspar { namespace ffmpeg {\r
\r
-static const size_t MAX_BUFFER_COUNT = 32;\r
+static const size_t MAX_TOKENS = 32;\r
\r
struct input::implementation : public Concurrency::agent, boost::noncopyable\r
{\r
tbb::atomic<size_t> nb_frames_;\r
tbb::atomic<size_t> nb_loops_; \r
tbb::atomic<size_t> packets_count_;\r
+ tbb::atomic<size_t> packets_size_;\r
\r
bool stop_;\r
- \r
+ \r
public:\r
explicit implementation(input::target_t& target,\r
const safe_ptr<diagnostics::graph>& graph, \r
, stop_(false)\r
{ \r
packets_count_ = 0;\r
+ packets_size_ = 0;\r
nb_frames_ = 0;\r
nb_loops_ = 0;\r
\r
seek_frame(start_);\r
\r
graph_->set_color("seek", diagnostics::color(1.0f, 0.5f, 0.0f));\r
- graph_->set_color("buffer-count", diagnostics::color(0.7f, 0.4f, 0.4f));\r
- graph_->set_color("buffer-size", diagnostics::color(1.0f, 1.0f, 0.0f)); \r
}\r
-\r
- ~implementation()\r
+ \r
+ void stop()\r
{\r
+ stop_ = true;\r
agent::wait(this);\r
}\r
\r
{\r
try\r
{\r
- while(!stop_ && read_next_packet())\r
+ while(!stop_)\r
{\r
+ auto packet = read_next_packet();\r
+ if(!packet)\r
+ break;\r
+\r
+ Concurrency::asend(target_, make_safe_ptr(packet));\r
+ Concurrency::wait(40);\r
}\r
}\r
catch(...)\r
\r
BOOST_FOREACH(auto stream, streams_)\r
Concurrency::send(target_, eof_packet(stream->index)); \r
- \r
+\r
done();\r
}\r
\r
- bool read_next_packet()\r
+ std::shared_ptr<AVPacket> read_next_packet()\r
{ \r
auto packet = create_packet();\r
\r
if(loop_)\r
{\r
CASPAR_LOG(trace) << print() << " Looping.";\r
- seek_frame(start_, AVSEEK_FLAG_BACKWARD); \r
+ seek_frame(start_, AVSEEK_FLAG_BACKWARD); \r
+ return read_next_packet();\r
} \r
else\r
{\r
CASPAR_LOG(trace) << print() << " Stopping.";\r
- return false;\r
+ return nullptr;\r
}\r
}\r
- else\r
- { \r
- THROW_ON_ERROR(ret, print(), "av_read_frame");\r
-\r
- if(packet->stream_index == default_stream_index_)\r
- {\r
- if(nb_loops_ == 0)\r
- ++nb_frames_;\r
- ++frame_number_;\r
- }\r
\r
- THROW_ON_ERROR2(av_dup_packet(packet.get()), print());\r
- \r
- // Make sure that the packet is correctly deallocated even if size and data is modified during decoding.\r
- auto size = packet->size;\r
- auto data = packet->data; \r
+ THROW_ON_ERROR(ret, print(), "av_read_frame");\r
\r
- packet = std::shared_ptr<AVPacket>(packet.get(), [=](AVPacket*)\r
- {\r
- packet->size = size;\r
- packet->data = data;\r
- --packets_count_;\r
- graph_->update_value("buffer-count", (static_cast<double>(packets_count_)+0.001)/MAX_BUFFER_COUNT); \r
- });\r
+ if(packet->stream_index == default_stream_index_)\r
+ {\r
+ if(nb_loops_ == 0)\r
+ ++nb_frames_;\r
+ ++frame_number_;\r
+ }\r
\r
- ++packets_count_;\r
+ THROW_ON_ERROR2(av_dup_packet(packet.get()), print());\r
+ \r
+ // Make sure that the packet is correctly deallocated even if size and data is modified during decoding.\r
+ auto size = packet->size;\r
+ auto data = packet->data; \r
\r
- Concurrency::asend(target_, packet);\r
- \r
- graph_->update_value("buffer-count", (static_cast<double>(packets_count_)+0.001)/MAX_BUFFER_COUNT);\r
- } \r
+ packet = safe_ptr<AVPacket>(packet.get(), [=](AVPacket*)\r
+ {\r
+ packet->size = size;\r
+ packet->data = data;\r
+ --packets_count_;\r
+ });\r
\r
- return true;\r
+ ++packets_count_;\r
+ \r
+ return packet;\r
}\r
\r
void seek_frame(int64_t frame, int flags = 0)\r
\r
THROW_ON_ERROR2(av_seek_frame(format_context_.get(), default_stream_index_, frame, flags), print()); \r
auto packet = create_packet();\r
- packet->size = 0; \r
+ packet->size = 0;\r
\r
BOOST_FOREACH(auto stream, streams_)\r
- Concurrency::send(target_, loop_packet(stream->index)); \r
+ Concurrency::asend(target_, loop_packet(stream->index)); \r
\r
graph_->add_tag("seek"); \r
} \r
\r
void input::stop()\r
{\r
- impl_->stop_ = true;\r
+ impl_->stop();\r
}\r
\r
}}
\ No newline at end of file