unique_lock<mutex> lock(buffer_mutex);
has_buffered_data.wait(lock, [this] { return should_quit || !buffered_data.empty(); });
if (should_quit) {
- return 0;
+ return -1;
}
ssize_t ret = 0;
buf += len;
ret += len;
max -= len;
+ buffered_data_bytes -= s.size();
buffered_data.pop_front();
used_of_buffered_data = 0;
} else {
void HTTPD::Stream::add_data(const char *buf, size_t buf_size, HTTPD::Stream::DataType data_type, int64_t time, AVRational timebase)
{
- if (buf_size == 0) {
+ if (buf_size == 0 || should_quit) {
return;
}
if (data_type == DATA_TYPE_KEYFRAME) {
lock_guard<mutex> lock(buffer_mutex);
+ if (buffered_data_bytes + buf_size > (1ULL << 30)) {
+ // More than 1GB of backlog; the client obviously isn't keeping up,
+ // so kill it instead of going out of memory. Note that this
+ // won't kill the client immediately, but will cause the next callback
+ // to kill the client.
+ should_quit = true;
+ buffered_data.clear();
+ has_buffered_data.notify_all();
+ return;
+ }
+
if (framing == FRAMING_METACUBE) {
int flags = 0;
if (data_type == DATA_TYPE_HEADER) {
hdr.csum = htons(metacube2_compute_crc(&hdr));
buffered_data.emplace_back((char *)&hdr, sizeof(hdr));
buffered_data.emplace_back((char *)&packet, sizeof(packet));
+ buffered_data_bytes += sizeof(hdr) + sizeof(packet);
}
metacube2_block_header hdr;
hdr.flags = htons(flags);
hdr.csum = htons(metacube2_compute_crc(&hdr));
buffered_data.emplace_back((char *)&hdr, sizeof(hdr));
+ buffered_data_bytes += sizeof(hdr);
}
buffered_data.emplace_back(buf, buf_size);
+ buffered_data_bytes += buf_size;
// Send a Metacube2 timestamp every keyframe.
if (framing == FRAMING_METACUBE && data_type == DATA_TYPE_KEYFRAME) {
hdr.csum = htons(metacube2_compute_crc(&hdr));
buffered_data.emplace_back((char *)&hdr, sizeof(hdr));
buffered_data.emplace_back((char *)&packet, sizeof(packet));
+ buffered_data_bytes += sizeof(hdr) + sizeof(packet);
}
has_buffered_data.notify_all();
bool should_quit = false; // Under <buffer_mutex>.
std::condition_variable has_buffered_data;
std::deque<std::string> buffered_data; // Protected by <buffer_mutex>.
- size_t used_of_buffered_data = 0; // How many bytes of the first element of <buffered_data> that is already used. Protected by <mutex>.
+ size_t used_of_buffered_data = 0; // How many bytes of the first element of <buffered_data> that is already used. Protected by <buffer_mutex>.
+ size_t buffered_data_bytes = 0; // The sum of all size() in buffered_data. Protected by <buffer_mutex>.
size_t seen_keyframe = false;
StreamType stream_type;
};