HTTPD::HTTPD()
{
global_metrics.add("num_connected_clients", &metric_num_connected_clients, Metrics::TYPE_GAUGE);
+ global_metrics.add("num_connected_multicam_clients", &metric_num_connected_multicam_clients, Metrics::TYPE_GAUGE);
}
HTTPD::~HTTPD()
void HTTPD::add_data(StreamType stream_type, const char *buf, size_t size, bool keyframe, int64_t time, AVRational timebase)
{
- unique_lock<mutex> lock(streams_mutex);
+ lock_guard<mutex> lock(streams_mutex);
for (Stream *stream : streams) {
if (stream->get_stream_type() == stream_type) {
stream->add_data(buf, size, keyframe ? Stream::DATA_TYPE_KEYFRAME : Stream::DATA_TYPE_OTHER, time, timebase);
HTTPD::Stream *stream = new HTTPD::Stream(this, framing, stream_type);
stream->add_data(header[stream_type].data(), header[stream_type].size(), Stream::DATA_TYPE_HEADER, AV_NOPTS_VALUE, AVRational{ 1, 0 });
{
- unique_lock<mutex> lock(streams_mutex);
+ lock_guard<mutex> lock(streams_mutex);
streams.insert(stream);
}
++metric_num_connected_clients;
+ if (stream_type == HTTPD::StreamType::MULTICAM_STREAM) {
+ ++metric_num_connected_multicam_clients;
+ }
*con_cls = stream;
// Does not strictly have to be equal to MUX_BUFFER_SIZE.
{
HTTPD::Stream *stream = (HTTPD::Stream *)cls;
HTTPD *httpd = stream->get_parent();
+ if (stream->get_stream_type() == HTTPD::StreamType::MULTICAM_STREAM) {
+ --httpd->metric_num_connected_multicam_clients;
+ }
{
- unique_lock<mutex> lock(httpd->streams_mutex);
+ lock_guard<mutex> lock(httpd->streams_mutex);
delete stream;
httpd->streams.erase(stream);
}
unique_lock<mutex> lock(buffer_mutex);
has_buffered_data.wait(lock, [this] { return should_quit || !buffered_data.empty(); });
if (should_quit) {
- return 0;
+ return -1;
}
ssize_t ret = 0;
buf += len;
ret += len;
max -= len;
+ buffered_data_bytes -= s.size();
buffered_data.pop_front();
used_of_buffered_data = 0;
} else {
void HTTPD::Stream::add_data(const char *buf, size_t buf_size, HTTPD::Stream::DataType data_type, int64_t time, AVRational timebase)
{
- if (buf_size == 0) {
+ if (buf_size == 0 || should_quit) {
return;
}
if (data_type == DATA_TYPE_KEYFRAME) {
return;
}
- unique_lock<mutex> lock(buffer_mutex);
+ lock_guard<mutex> lock(buffer_mutex);
+
+ if (buffered_data_bytes + buf_size > (1ULL << 30)) {
+ // More than 1GB of backlog; the client obviously isn't keeping up,
+ // so kill it instead of going out of memory. Note that this
+ // won't kill the client immediately, but will cause the next callback
+ // to kill the client.
+ should_quit = true;
+ buffered_data.clear();
+ has_buffered_data.notify_all();
+ return;
+ }
if (framing == FRAMING_METACUBE) {
int flags = 0;
hdr.csum = htons(metacube2_compute_crc(&hdr));
buffered_data.emplace_back((char *)&hdr, sizeof(hdr));
buffered_data.emplace_back((char *)&packet, sizeof(packet));
+ buffered_data_bytes += sizeof(hdr) + sizeof(packet);
}
metacube2_block_header hdr;
hdr.flags = htons(flags);
hdr.csum = htons(metacube2_compute_crc(&hdr));
buffered_data.emplace_back((char *)&hdr, sizeof(hdr));
+ buffered_data_bytes += sizeof(hdr);
}
buffered_data.emplace_back(buf, buf_size);
+ buffered_data_bytes += buf_size;
// Send a Metacube2 timestamp every keyframe.
if (framing == FRAMING_METACUBE && data_type == DATA_TYPE_KEYFRAME) {
hdr.csum = htons(metacube2_compute_crc(&hdr));
buffered_data.emplace_back((char *)&hdr, sizeof(hdr));
buffered_data.emplace_back((char *)&packet, sizeof(packet));
+ buffered_data_bytes += sizeof(hdr) + sizeof(packet);
}
has_buffered_data.notify_all();
void HTTPD::Stream::stop()
{
- unique_lock<mutex> lock(buffer_mutex);
+ lock_guard<mutex> lock(buffer_mutex);
should_quit = true;
has_buffered_data.notify_all();
}