const StreamConfig &stream_config = config.streams[i];
if (deserialized_stream_ids.count(stream_config.stream_id) == 0) {
servers->add_stream(stream_config.stream_id, stream_config.backlog_size);
+ } else {
+ servers->set_backlog_size(stream_config.stream_id, stream_config.backlog_size);
}
expecting_stream_ids.erase(stream_config.stream_id);
streams.insert(make_pair(stream.stream_id(), new Stream(stream)));
}
+// Changes the backlog size of an existing stream on this server.
+// The stream must already exist (asserted below). Serialized against
+// other server operations via the server mutex.
+void Server::set_backlog_size(const std::string &stream_id, size_t new_size)
+{
+ MutexLock lock(&mutex);
+ assert(streams.count(stream_id) != 0);
+ streams[stream_id]->set_backlog_size(new_size);
+}
+
void Server::set_header(const string &stream_id, const string &header)
{
MutexLock lock(&mutex);
++queued_it) {
Stream *stream = find_stream(queued_it->first);
stream->add_data(queued_it->second.data(), queued_it->second.size());
+ stream->wake_up_all_clients();
}
queued_data.clear();
}
void add_client_from_serialized(const ClientProto &client);
void add_stream(const std::string &stream_id, size_t bytes_received);
void add_stream_from_serialized(const StreamProto &stream);
+ void set_backlog_size(const std::string &stream_id, size_t new_size);
private:
// Mutex protecting queued_data only. Note that if you want to hold both this
servers[i].set_mark_pool(stream_id, mark_pool);
}
}
+
+// Propagates the new backlog size for the given stream to every server
+// in the pool; each server applies it under its own lock.
+void ServerPool::set_backlog_size(const std::string &stream_id, size_t new_size)
+{
+ for (int i = 0; i < num_servers; ++i) {
+ servers[i].set_backlog_size(stream_id, new_size);
+ }
+}
// Connects the given stream to the given mark pool for all the servers.
void set_mark_pool(const std::string &stream_id, MarkPool *mark_pool);
+ // Changes the given stream's backlog size on all the servers.
+ void set_backlog_size(const std::string &stream_id, size_t new_size);
+
// Starts all the servers.
void run();
data_fd = -1;
return serialized;
}
+
+// Changes the backlog size, restructuring the on-disk data as needed:
+// the circular buffer in data_fd is read back, unwrapped into linear
+// order, truncated from the front if it no longer fits, and then fed
+// back through add_data() into a fresh tempfile of the new size.
+// Fatal-errors (exit(1)) on tempfile I/O failure, matching the
+// error handling used elsewhere for read_tempfile().
+void Stream::set_backlog_size(size_t new_size)
+{
+ // Nothing to do if the size is unchanged.
+ if (backlog_size == new_size) {
+ return;
+ }
+
+ string existing_data;
+ if (!read_tempfile(data_fd, &existing_data)) { // Closes data_fd.
+ exit(1);
+ }
+
+ // Unwrap the data so it's no longer circular.
+ if (bytes_received <= backlog_size) {
+ existing_data.resize(bytes_received);
+ } else {
+ size_t pos = bytes_received % backlog_size;
+ existing_data = existing_data.substr(pos, string::npos) +
+ existing_data.substr(0, pos);
+ }
+
+ // See if we need to discard data (the new backlog is smaller than
+ // what we currently hold).
+ if (new_size < existing_data.size()) {
+ size_t to_discard = existing_data.size() - new_size;
+ existing_data = existing_data.substr(to_discard, string::npos);
+ }
+
+ // Create a new, empty data file.
+ data_fd = make_tempfile("");
+ if (data_fd == -1) {
+ // NOTE(review): assumes make_tempfile() signals failure with -1,
+ // analogous to read_tempfile() above — confirm against its definition.
+ // Without this check, add_data() below would write to a bad fd.
+ exit(1);
+ }
+ backlog_size = new_size;
+
+ // Now cheat a bit by rewinding, and adding all the old data back.
+ bytes_received -= existing_data.size();
+ add_data(existing_data.data(), existing_data.size());
+}
void Stream::put_client_to_sleep(Client *client)
{
data += ret;
bytes -= ret;
}
-
- wake_up_all_clients();
}
void Stream::wake_up_all_clients()
Stream(const StreamProto &serialized);
StreamProto serialize();
+ // Changes the backlog size, restructuring the data as needed.
+ void set_backlog_size(size_t new_size);
+
std::string stream_id;
// The HTTP response header, plus the video stream header (if any).
// in the list of clients to wake up when we do.
void put_client_to_sleep(Client *client);
- // Add more input data to the stream, and wake up all clients that are sleeping.
+ // Add more input data to the stream. You should probably call wake_up_all_clients()
+ // after that.
void add_data(const char *data, ssize_t bytes);
-private:
- Stream(const Stream& other);
-
// We have more data, so mark all clients that are sleeping as ready to go.
void wake_up_all_clients();
+
+private:
+ Stream(const Stream& other);
};
#endif // !defined(_STREAM_H)