Send backlog file descriptors around instead of going through the protobuf. Much...
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Thu, 18 Apr 2013 22:41:19 +0000 (00:41 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Thu, 18 Apr 2013 22:41:19 +0000 (00:41 +0200)
main.cpp
server.cpp
server.h
serverpool.cpp
serverpool.h
state.proto
stream.cpp
stream.h
util.cpp
util.h

index c626f46..c42fd19 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -341,8 +341,21 @@ start:
 
                // Deserialize the streams.
                for (int i = 0; i < loaded_state.streams_size(); ++i) {
-                       servers->add_stream_from_serialized(loaded_state.streams(i));
-                       deserialized_stream_ids.insert(loaded_state.streams(i).stream_id());
+                       const StreamProto &stream = loaded_state.streams(i);
+
+                       vector<int> data_fds;
+                       for (int j = 0; j < stream.data_fds_size(); ++j) {
+                               data_fds.push_back(stream.data_fds(j));
+                       }
+
+                       // Older versions stored the data once in the protobuf instead of
+                       // sending around file descriptors.
+                       if (data_fds.empty() && stream.has_data()) {
+                               data_fds.push_back(make_tempfile(stream.data()));
+                       }
+
+                       servers->add_stream_from_serialized(stream, data_fds);
+                       deserialized_stream_ids.insert(stream.stream_id());
                }
 
                // Deserialize the inputs. Note that we don't actually add them to any stream yet.
index 8282a56..640f10a 100644 (file)
@@ -202,10 +202,10 @@ void Server::add_stream(const string &stream_id, size_t backlog_size, Stream::En
        streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size, encoding)));
 }
 
-void Server::add_stream_from_serialized(const StreamProto &stream)
+void Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
 {
        MutexLock lock(&mutex);
-       streams.insert(make_pair(stream.stream_id(), new Stream(stream)));
+       streams.insert(make_pair(stream.stream_id(), new Stream(stream, data_fd)));
 }
        
 void Server::set_backlog_size(const string &stream_id, size_t new_size)
index 523f5c5..a3e032d 100644 (file)
--- a/server.h
+++ b/server.h
@@ -55,7 +55,7 @@ public:
        CubemapStateProto serialize();
        void add_client_from_serialized(const ClientProto &client);
        void add_stream(const std::string &stream_id, size_t bytes_received, Stream::Encoding encoding);
-       void add_stream_from_serialized(const StreamProto &stream);
+       void add_stream_from_serialized(const StreamProto &stream, int data_fd);
        void set_backlog_size(const std::string &stream_id, size_t new_size);
        void set_encoding(const std::string &stream_id, Stream::Encoding encoding);
 
index 60a913d..6828199 100644 (file)
@@ -1,9 +1,12 @@
-#include <google/protobuf/repeated_field.h>
+#include <unistd.h>
+#include <errno.h>
 
 #include "client.h"
+#include "log.h"
 #include "server.h"
 #include "serverpool.h"
 #include "state.pb.h"
+#include "util.h"
 
 using namespace std;
 
@@ -26,9 +29,16 @@ CubemapStateProto ServerPool::serialize()
        for (int i = 0; i < num_servers; ++i) {
                 CubemapStateProto local_state = servers[i].serialize();
 
-               // The stream state should be identical between the servers, so we only store it once.
+               // The stream state should be identical between the servers, so we only store it once,
+               // save for the fds, which we keep around to distribute to the servers after re-exec.
                if (i == 0) {
                        state.mutable_streams()->MergeFrom(local_state.streams());
+               } else {
+                       assert(state.streams_size() == local_state.streams_size());
+                       for (int j = 0; j < local_state.streams_size(); ++j) {
+                               assert(local_state.streams(j).data_fds_size() == 1);
+                               state.mutable_streams(j)->add_data_fds(local_state.streams(j).data_fds(0));
+                       }
                }
                for (int j = 0; j < local_state.clients_size(); ++j) {
                        state.add_clients()->MergeFrom(local_state.clients(j));
@@ -55,10 +65,39 @@ void ServerPool::add_stream(const string &stream_id, size_t backlog_size, Stream
        }
 }
 
-void ServerPool::add_stream_from_serialized(const StreamProto &stream)
+void ServerPool::add_stream_from_serialized(const StreamProto &stream, const vector<int> &data_fds)
 {
+       assert(!data_fds.empty());
+       string contents;
        for (int i = 0; i < num_servers; ++i) {
-               servers[i].add_stream_from_serialized(stream);
+               int data_fd;
+               if (i < int(data_fds.size())) {
+                       // Reuse one of the existing file descriptors.
+                       data_fd = data_fds[i];
+               } else {
+                       // Clone the first one.
+                       if (contents.empty()) {
+                               if (!read_tempfile(data_fds[0], &contents)) {
+                                       exit(1);
+                               }
+                       }
+                       data_fd = make_tempfile(contents);
+               }
+
+               servers[i].add_stream_from_serialized(stream, data_fd);
+       }
+
+       // Close and delete any leftovers, if the number of servers was reduced.
+       for (size_t i = num_servers; i < data_fds.size(); ++i) {
+               int ret;
+               do {
+                       ret = close(data_fds[i]);  // Implicitly deletes the file.
+               } while (ret == -1 && errno == EINTR);
+
+               if (ret == -1) {
+                       log_perror("close");
+                       // Can still continue.
+               }
        }
 }
 
index 84a201f..a027981 100644 (file)
@@ -27,7 +27,7 @@ public:
 
        // Adds the given stream to all the servers.
        void add_stream(const std::string &stream_id, size_t backlog_size, Stream::Encoding encoding);
-       void add_stream_from_serialized(const StreamProto &stream);
+       void add_stream_from_serialized(const StreamProto &stream, const std::vector<int> &data_fds);
 
        // Adds the given data to all the servers.
        void set_header(const std::string &stream_id,
index 5d38a6b..88ad2d8 100644 (file)
@@ -18,13 +18,17 @@ message ClientProto {
 message StreamProto {
        optional bytes http_header = 6;
        optional bytes stream_header = 7;
-       optional bytes data = 2;
+       repeated int32 data_fds = 8;
        optional int64 backlog_size = 5 [default=1048576];
        optional int64 bytes_received = 3;
        optional string stream_id = 4;
 
        // Older versions stored the HTTP and video headers together in this field.
        optional bytes header = 1;
+
+       // Older versions stored the data in the protobuf instead of sending file
+       // descriptors around.
+       optional bytes data = 2;
 };
 
 // Corresponds to class Input.
index e61428a..109d2fd 100644 (file)
@@ -40,12 +40,12 @@ Stream::~Stream()
        }
 }
 
-Stream::Stream(const StreamProto &serialized)
+Stream::Stream(const StreamProto &serialized, int data_fd)
        : stream_id(serialized.stream_id()),
          http_header(serialized.http_header()),
          stream_header(serialized.stream_header()),
          encoding(Stream::STREAM_ENCODING_RAW),  // Will be changed later.
-         data_fd(make_tempfile(serialized.data())),
+         data_fd(data_fd),
          backlog_size(serialized.backlog_size()),
          bytes_received(serialized.bytes_received()),
          mark_pool(NULL)
@@ -73,9 +73,7 @@ StreamProto Stream::serialize()
        StreamProto serialized;
        serialized.set_http_header(http_header);
        serialized.set_stream_header(stream_header);
-       if (!read_tempfile(data_fd, serialized.mutable_data())) {  // Closes data_fd.
-               exit(1);
-       }
+       serialized.add_data_fds(data_fd);
        serialized.set_backlog_size(backlog_size);
        serialized.set_bytes_received(bytes_received);
        serialized.set_stream_id(stream_id);
@@ -90,7 +88,7 @@ void Stream::set_backlog_size(size_t new_size)
        }
 
        string existing_data;
-       if (!read_tempfile(data_fd, &existing_data)) {  // Closes data_fd.
+       if (!read_tempfile_and_close(data_fd, &existing_data)) {
                exit(1);
        }
 
index 22a162d..7e20aef 100644 (file)
--- a/stream.h
+++ b/stream.h
@@ -22,7 +22,7 @@ struct Stream {
        ~Stream();
 
        // Serialization/deserialization.
-       Stream(const StreamProto &serialized);
+       Stream(const StreamProto &serialized, int data_fd);
        StreamProto serialize();
 
        // Changes the backlog size, restructuring the data as needed.
index 8159e38..b8862ed 100644 (file)
--- a/util.cpp
+++ b/util.cpp
@@ -40,24 +40,38 @@ int make_tempfile(const std::string &contents)
        return fd;
 }
 
+bool read_tempfile_and_close(int fd, std::string *contents)
+{
+       bool ok = read_tempfile(fd, contents);
+
+       int ret;
+       do {
+               ret = close(fd);  // Implicitly deletes the file.
+       } while (ret == -1 && errno == EINTR);
+       
+       if (ret == -1) {
+               log_perror("close");
+               // Can still continue.
+       }
+
+       return ok;
+}
+
 bool read_tempfile(int fd, std::string *contents)
 {
-       bool ok = true;
        ssize_t ret, has_read;
 
        off_t len = lseek(fd, 0, SEEK_END);
        if (len == -1) {
                log_perror("lseek");
-               ok = false;
-               goto done;
+               return false;
        }
 
        contents->resize(len);
 
        if (lseek(fd, 0, SEEK_SET) == -1) {
                log_perror("lseek");
-               ok = false;
-               goto done;
+               return false;
        }
 
        has_read = 0;
@@ -65,26 +79,14 @@ bool read_tempfile(int fd, std::string *contents)
                ret = read(fd, &((*contents)[has_read]), len - has_read);
                if (ret == -1) {
                        log_perror("read");
-                       ok = false;
-                       goto done;
+                       return false;
                }
                if (ret == 0) {
                        log(ERROR, "Unexpected EOF!");
-                       ok = false;
-                       goto done;
+                       return false;
                }
                has_read += ret;
        }
 
-done:
-       do {
-               ret = close(fd);  // Implicitly deletes the files.
-       } while (ret == -1 && errno == EINTR);
-       
-       if (ret == -1) {
-               log_perror("close");
-               // Can still continue.
-       }
-
-       return ok;
+       return true;
 }
diff --git a/util.h b/util.h
index f64e257..303de14 100644 (file)
--- a/util.h
+++ b/util.h
@@ -10,6 +10,9 @@
 int make_tempfile(const std::string &contents);
 
 // Opposite of make_tempfile(). Returns false on failure.
+bool read_tempfile_and_close(int fd, std::string *contents);
+
+// Same as read_tempfile_and_close(), without the close.
 bool read_tempfile(int fd, std::string *contents);
 
 #endif  // !defined(_UTIL_H