- Fixed streaming_consumer sometimes crashing the server during destruction.
return consumer_->has_synchronization_clock();\r
}\r
\r
- virtual size_t buffer_depth() const override\r
+ virtual int buffer_depth() const override\r
{\r
return consumer_->buffer_depth();\r
}\r
virtual int64_t presentation_frame_age_millis() const { return 0; }\r
virtual std::wstring print() const override {return L"empty";}\r
virtual bool has_synchronization_clock() const override {return false;}\r
- virtual size_t buffer_depth() const override {return 0;};\r
+ virtual int buffer_depth() const override {return 0;};\r
virtual int index() const{return -1;}\r
virtual boost::property_tree::wptree info() const override\r
{\r
virtual std::wstring print() const = 0;\r
virtual boost::property_tree::wptree info() const = 0;\r
virtual bool has_synchronization_clock() const {return true;}\r
- virtual size_t buffer_depth() const = 0;\r
+ virtual int buffer_depth() const = 0; // -1 to not participate in frame presentation synchronization\r
virtual int index() const = 0;\r
\r
static const safe_ptr<frame_consumer>& empty();\r
});\r
}\r
\r
- std::map<int, size_t> buffer_depths_snapshot() const\r
+ std::map<int, int> buffer_depths_snapshot() const\r
{\r
- std::map<int, size_t> result;\r
+ std::map<int, int> result;\r
\r
BOOST_FOREACH(auto& consumer, consumers_)\r
result.insert(std::make_pair(\r
consumer.first,\r
consumer.second->buffer_depth()));\r
\r
- return std::move(result);\r
+ return result;\r
}\r
\r
- std::pair<size_t, size_t> minmax_buffer_depth(\r
- const std::map<int, size_t>& buffer_depths) const\r
+ std::pair<int, int> minmax_buffer_depth(\r
+ const std::map<int, int>& buffer_depths) const\r
{ \r
if(consumers_.empty())\r
return std::make_pair(0, 0);\r
\r
- auto depths = buffer_depths | boost::adaptors::map_values; \r
+ auto depths = buffer_depths\r
+ | boost::adaptors::map_values\r
+ | boost::adaptors::filtered([](int v) { return v >= 0; });\r
+\r
+ if (depths.empty())\r
+ return std::make_pair(0, 0);\r
\r
return std::make_pair(\r
*boost::range::min_element(depths),\r
for (auto it = consumers_.begin(); it != consumers_.end();)\r
{\r
auto consumer = it->second;\r
- auto frame = frames_.at(buffer_depths[it->first]-minmax.first);\r
+ auto depth = buffer_depths[it->first];\r
+ auto frame = depth < 0 ? frames_.back() : frames_.at(depth - minmax.first);\r
\r
send_to_consumers_delays_[it->first] = frame->get_age_millis();\r
\r
for (auto result_it = send_results.begin(); result_it != send_results.end(); ++result_it)\r
{\r
auto consumer = consumers_.at(result_it->first);\r
- auto frame = frames_.at(buffer_depths[result_it->first]-minmax.first);\r
+ auto depth = buffer_depths[result_it->first];\r
+ auto frame = depth < 0 ? frames_.back() : frames_.at(depth - minmax.first);\r
auto& result_future = result_it->second;\r
\r
try\r
return false;\r
}\r
\r
- virtual size_t buffer_depth() const override\r
+ virtual int buffer_depth() const override\r
{\r
- return 1;\r
+ return -1;\r
}\r
\r
virtual int index() const override\r
return info;\r
}\r
\r
- virtual size_t buffer_depth() const override\r
+ virtual int buffer_depth() const override\r
{\r
return 1;\r
}\r
return info;\r
}\r
\r
- virtual size_t buffer_depth() const override\r
+ virtual int buffer_depth() const override\r
{\r
return config_.buffer_depth();\r
}\r
{\r
}\r
\r
- size_t buffer_depth() const\r
+ int buffer_depth() const\r
{\r
return base_buffer_depth + (latency == low_latency ? 0 : 1) + (embedded_audio ? 1 : 0);\r
}\r
return false;\r
}\r
\r
- virtual size_t buffer_depth() const override\r
+ virtual int buffer_depth() const override\r
{\r
- return 1;\r
+ return -1;\r
}\r
\r
virtual int index() const override\r
, video_pts_(0)
, audio_pts_(0)
, executor_(print())
- , audio_encoder_executor_(print() + L" video_encoder")
- , video_encoder_executor_(print() + L" audio_encoder")
+ , audio_encoder_executor_(print() + L" audio_encoder")
+ , video_encoder_executor_(print() + L" video_encoder")
, write_executor_(print() + L" io")
{
abort_request_ = false;
{
if(oc_)
{
- encode_video(nullptr, nullptr);
- encode_audio(nullptr, nullptr);
+ video_encoder_executor_.begin_invoke([&] { encode_video(nullptr, nullptr); });
+ audio_encoder_executor_.begin_invoke([&] { encode_audio(nullptr, nullptr); });
+
+ video_encoder_executor_.stop();
+ audio_encoder_executor_.stop();
+ video_encoder_executor_.join();
+ audio_encoder_executor_.join();
video_graph_.reset();
audio_graph_.reset();
-
- video_encoder_executor_.wait();
- audio_encoder_executor_.wait();
-
video_st_.reset();
audio_st_.reset();
write_packet(nullptr, nullptr);
- write_executor_.wait();
-
+ write_executor_.stop();
+ write_executor_.join();
+
FF(av_write_trailer(oc_.get()));
-
+
if (!(oc_->oformat->flags & AVFMT_NOFILE) && oc_->pb)
avio_close(oc_->pb);
std::wstring print() const override
{
- return L"ffmpeg_consumer[" + widen(path_.string()) + L"]";
+ return L"streaming_consumer[" + widen(path_.string()) + L"]";
}
virtual boost::property_tree::wptree info() const override
return false;
}
- size_t buffer_depth() const override
+ int buffer_depth() const override
{
- return 0;
+ return -1;
}
int index() const override
in_video_format_.width,
in_video_format_.height,
1));
- }
- FF(av_buffersrc_add_frame(
- video_graph_in_,
- src_av_frame.get()));
+ FF(av_buffersrc_add_frame(
+ video_graph_in_,
+ src_av_frame.get()));
+ }
int ret = 0;
src_av_frame->nb_samples,
static_cast<AVSampleFormat>(src_av_frame->format),
16));
- }
- FF(av_buffersrc_add_frame(
- audio_graph_in_,
- src_av_frame.get()));
+ FF(av_buffersrc_add_frame(
+ audio_graph_in_,
+ src_av_frame.get()));
+ }
int ret = 0;
return info;\r
}\r
\r
- virtual size_t buffer_depth() const override\r
+ virtual int buffer_depth() const override\r
{\r
- return 0;\r
+ return -1;\r
}\r
\r
virtual int index() const override\r
return info;
}
- virtual size_t buffer_depth() const override
+ virtual int buffer_depth() const override
{
- return 0;
+ return -1;
}
virtual int index() const override
return info;
}
- virtual size_t buffer_depth() const override
+ virtual int buffer_depth() const override
{
return 6;
}
return false;\r
}\r
\r
- virtual size_t buffer_depth() const override\r
+ virtual int buffer_depth() const override\r
{\r
return 1;\r
}\r