From: Steinar H. Gunderson Date: Thu, 18 Apr 2013 22:41:19 +0000 (+0200) Subject: Send backlog file descriptors around instead of going through the protobuf. Much... X-Git-Tag: 1.0.0~76 X-Git-Url: https://git.sesse.net/?p=cubemap;a=commitdiff_plain;h=534764f026b82b144e974882c8e53c4cd8d21b68 Send backlog file descriptors around instead of going through the protobuf. Much faster restart with big backlog sizes, and does not hit the 64MB protobuf limit. --- diff --git a/main.cpp b/main.cpp index c626f46..c42fd19 100644 --- a/main.cpp +++ b/main.cpp @@ -341,8 +341,21 @@ start: // Deserialize the streams. for (int i = 0; i < loaded_state.streams_size(); ++i) { - servers->add_stream_from_serialized(loaded_state.streams(i)); - deserialized_stream_ids.insert(loaded_state.streams(i).stream_id()); + const StreamProto &stream = loaded_state.streams(i); + + vector data_fds; + for (int j = 0; j < stream.data_fds_size(); ++j) { + data_fds.push_back(stream.data_fds(j)); + } + + // Older versions stored the data once in the protobuf instead of + // sending around file descriptors. + if (data_fds.empty() && stream.has_data()) { + data_fds.push_back(make_tempfile(stream.data())); + } + + servers->add_stream_from_serialized(stream, data_fds); + deserialized_stream_ids.insert(stream.stream_id()); } // Deserialize the inputs. Note that we don't actually add them to any stream yet. diff --git a/server.cpp b/server.cpp index 8282a56..640f10a 100644 --- a/server.cpp +++ b/server.cpp @@ -202,10 +202,10 @@ void Server::add_stream(const string &stream_id, size_t backlog_size, Stream::En streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size, encoding))); } -void Server::add_stream_from_serialized(const StreamProto &stream) +void Server::add_stream_from_serialized(const StreamProto &stream, int data_fd) { MutexLock lock(&mutex); - streams.insert(make_pair(stream.stream_id(), new Stream(stream))); + streams.insert(make_pair(stream.stream_id(), new Stream(stream, data_fd))); } void Server::set_backlog_size(const string &stream_id, size_t new_size) diff --git a/server.h b/server.h index 523f5c5..a3e032d 100644 --- a/server.h +++ b/server.h @@ -55,7 +55,7 @@ public: CubemapStateProto serialize(); void add_client_from_serialized(const ClientProto &client); void add_stream(const std::string &stream_id, size_t bytes_received, Stream::Encoding encoding); - void add_stream_from_serialized(const StreamProto &stream); + void add_stream_from_serialized(const StreamProto &stream, int data_fd); void set_backlog_size(const std::string &stream_id, size_t new_size); void set_encoding(const std::string &stream_id, Stream::Encoding encoding); diff --git a/serverpool.cpp b/serverpool.cpp index 60a913d..6828199 100644 --- a/serverpool.cpp +++ b/serverpool.cpp @@ -1,9 +1,12 @@ -#include +#include +#include #include "client.h" +#include "log.h" #include "server.h" #include "serverpool.h" #include "state.pb.h" +#include "util.h" using namespace std; @@ -26,9 +29,16 @@ CubemapStateProto ServerPool::serialize() for (int i = 0; i < num_servers; ++i) { CubemapStateProto local_state = servers[i].serialize(); - // The stream state should be identical between the servers, so we only store it once. + // The stream state should be identical between the servers, so we only store it once, + // save for the fds, which we keep around to distribute to the servers after re-exec. if (i == 0) { state.mutable_streams()->MergeFrom(local_state.streams()); + } else { + assert(state.streams_size() == local_state.streams_size()); + for (int j = 0; j < local_state.streams_size(); ++j) { + assert(local_state.streams(j).data_fds_size() == 1); + state.mutable_streams(j)->add_data_fds(local_state.streams(j).data_fds(0)); + } } for (int j = 0; j < local_state.clients_size(); ++j) { state.add_clients()->MergeFrom(local_state.clients(j)); @@ -55,10 +65,39 @@ void ServerPool::add_stream(const string &stream_id, size_t backlog_size, Stream } } -void ServerPool::add_stream_from_serialized(const StreamProto &stream) +void ServerPool::add_stream_from_serialized(const StreamProto &stream, const vector &data_fds) { + assert(!data_fds.empty()); + string contents; for (int i = 0; i < num_servers; ++i) { - servers[i].add_stream_from_serialized(stream); + int data_fd; + if (i < int(data_fds.size())) { + // Reuse one of the existing file descriptors. + data_fd = data_fds[i]; + } else { + // Clone the first one. + if (contents.empty()) { + if (!read_tempfile(data_fds[0], &contents)) { + exit(1); + } + } + data_fd = make_tempfile(contents); + } + + servers[i].add_stream_from_serialized(stream, data_fd); + } + + // Close and delete any leftovers, if the number of servers was reduced. + for (size_t i = num_servers; i < data_fds.size(); ++i) { + int ret; + do { + ret = close(data_fds[i]); // Implicitly deletes the file. + } while (ret == -1 && errno == EINTR); + + if (ret == -1) { + log_perror("close"); + // Can still continue. + } } } diff --git a/serverpool.h b/serverpool.h index 84a201f..a027981 100644 --- a/serverpool.h +++ b/serverpool.h @@ -27,7 +27,7 @@ public: // Adds the given stream to all the servers. void add_stream(const std::string &stream_id, size_t backlog_size, Stream::Encoding encoding); - void add_stream_from_serialized(const StreamProto &stream); + void add_stream_from_serialized(const StreamProto &stream, const std::vector &data_fds); // Adds the given data to all the servers. void set_header(const std::string &stream_id, diff --git a/state.proto b/state.proto index 5d38a6b..88ad2d8 100644 --- a/state.proto +++ b/state.proto @@ -18,13 +18,17 @@ message ClientProto { message StreamProto { optional bytes http_header = 6; optional bytes stream_header = 7; - optional bytes data = 2; + repeated int32 data_fds = 8; optional int64 backlog_size = 5 [default=1048576]; optional int64 bytes_received = 3; optional string stream_id = 4; // Older versions stored the HTTP and video headers together in this field. optional bytes header = 1; + + // Older versions stored the data in the protobuf instead of sending file + // descriptors around. + optional bytes data = 2; }; // Corresponds to class Input. diff --git a/stream.cpp b/stream.cpp index e61428a..109d2fd 100644 --- a/stream.cpp +++ b/stream.cpp @@ -40,12 +40,12 @@ Stream::~Stream() } } -Stream::Stream(const StreamProto &serialized) +Stream::Stream(const StreamProto &serialized, int data_fd) : stream_id(serialized.stream_id()), http_header(serialized.http_header()), stream_header(serialized.stream_header()), encoding(Stream::STREAM_ENCODING_RAW), // Will be changed later. - data_fd(make_tempfile(serialized.data())), + data_fd(data_fd), backlog_size(serialized.backlog_size()), bytes_received(serialized.bytes_received()), mark_pool(NULL) @@ -73,9 +73,7 @@ StreamProto Stream::serialize() StreamProto serialized; serialized.set_http_header(http_header); serialized.set_stream_header(stream_header); - if (!read_tempfile(data_fd, serialized.mutable_data())) { // Closes data_fd. - exit(1); - } + serialized.add_data_fds(data_fd); serialized.set_backlog_size(backlog_size); serialized.set_bytes_received(bytes_received); serialized.set_stream_id(stream_id); @@ -90,7 +88,7 @@ void Stream::set_backlog_size(size_t new_size) } string existing_data; - if (!read_tempfile(data_fd, &existing_data)) { // Closes data_fd. + if (!read_tempfile_and_close(data_fd, &existing_data)) { exit(1); } diff --git a/stream.h b/stream.h index 22a162d..7e20aef 100644 --- a/stream.h +++ b/stream.h @@ -22,7 +22,7 @@ struct Stream { ~Stream(); // Serialization/deserialization. - Stream(const StreamProto &serialized); + Stream(const StreamProto &serialized, int data_fd); StreamProto serialize(); // Changes the backlog size, restructuring the data as needed. diff --git a/util.cpp b/util.cpp index 8159e38..b8862ed 100644 --- a/util.cpp +++ b/util.cpp @@ -40,24 +40,38 @@ int make_tempfile(const std::string &contents) return fd; } +bool read_tempfile_and_close(int fd, std::string *contents) +{ + bool ok = read_tempfile(fd, contents); + + int ret; + do { + ret = close(fd); // Implicitly deletes the file. + } while (ret == -1 && errno == EINTR); + + if (ret == -1) { + log_perror("close"); + // Can still continue. + } + + return ok; +} + bool read_tempfile(int fd, std::string *contents) { - bool ok = true; ssize_t ret, has_read; off_t len = lseek(fd, 0, SEEK_END); if (len == -1) { log_perror("lseek"); - ok = false; - goto done; + return false; } contents->resize(len); if (lseek(fd, 0, SEEK_SET) == -1) { log_perror("lseek"); - ok = false; - goto done; + return false; } has_read = 0; @@ -65,26 +79,14 @@ bool read_tempfile(int fd, std::string *contents) ret = read(fd, &((*contents)[has_read]), len - has_read); if (ret == -1) { log_perror("read"); - ok = false; - goto done; + return false; } if (ret == 0) { log(ERROR, "Unexpected EOF!"); - ok = false; - goto done; + return false; } has_read += ret; } -done: - do { - ret = close(fd); // Implicitly deletes the files. - } while (ret == -1 && errno == EINTR); - - if (ret == -1) { - log_perror("close"); - // Can still continue. - } - - return ok; + return true; } diff --git a/util.h b/util.h index f64e257..303de14 100644 --- a/util.h +++ b/util.h @@ -10,6 +10,9 @@ int make_tempfile(const std::string &contents); // Opposite of make_tempfile(). Returns false on failure. +bool read_tempfile_and_close(int fd, std::string *contents); + +// Same as read_tempfile_and_close(), without the close. bool read_tempfile(int fd, std::string *contents); #endif // !defined(_UTIL_H