git.sesse.net Git - cubemap/blobdiff - serverpool.cpp
Fix some issues with the last stats.cpp fix.
[cubemap] / serverpool.cpp
index 60a913d56146ecf58f7f9b72b82af0bb7adeb1f5..fb1668239a366b004313db3ab4b0efb2fd63ec00 100644 (file)
@@ -1,9 +1,15 @@
+#include <assert.h>
+#include <errno.h>
 #include <google/protobuf/repeated_field.h>
+#include <stdlib.h>
+#include <unistd.h>
 
 #include "client.h"
+#include "log.h"
 #include "server.h"
 #include "serverpool.h"
 #include "state.pb.h"
+#include "util.h"
 
 using namespace std;
 
@@ -26,9 +32,16 @@ CubemapStateProto ServerPool::serialize()
        for (int i = 0; i < num_servers; ++i) {
                 CubemapStateProto local_state = servers[i].serialize();
 
-               // The stream state should be identical between the servers, so we only store it once.
+               // The stream state should be identical between the servers, so we only store it once,
+               // save for the fds, which we keep around to distribute to the servers after re-exec.
                if (i == 0) {
                        state.mutable_streams()->MergeFrom(local_state.streams());
+               } else {
+                       assert(state.streams_size() == local_state.streams_size());
+                       for (int j = 0; j < local_state.streams_size(); ++j) {
+                               assert(local_state.streams(j).data_fds_size() == 1);
+                               state.mutable_streams(j)->add_data_fds(local_state.streams(j).data_fds(0));
+                       }
                }
                for (int j = 0; j < local_state.clients_size(); ++j) {
                        state.add_clients()->MergeFrom(local_state.clients(j));
@@ -55,10 +68,39 @@ void ServerPool::add_stream(const string &stream_id, size_t backlog_size, Stream
        }
 }
 
-void ServerPool::add_stream_from_serialized(const StreamProto &stream)
+void ServerPool::add_stream_from_serialized(const StreamProto &stream, const vector<int> &data_fds)
 {
+       assert(!data_fds.empty());
+       string contents;
        for (int i = 0; i < num_servers; ++i) {
-               servers[i].add_stream_from_serialized(stream);
+               int data_fd;
+               if (i < int(data_fds.size())) {
+                       // Reuse one of the existing file descriptors.
+                       data_fd = data_fds[i];
+               } else {
+                       // Clone the first one.
+                       if (contents.empty()) {
+                               if (!read_tempfile(data_fds[0], &contents)) {
+                                       exit(1);
+                               }
+                       }
+                       data_fd = make_tempfile(contents);
+               }
+
+               servers[i].add_stream_from_serialized(stream, data_fd);
+       }
+
+       // Close and delete any leftovers, if the number of servers was reduced.
+       for (size_t i = num_servers; i < data_fds.size(); ++i) {
+               int ret;
+               do {
+                       ret = close(data_fds[i]);  // Implicitly deletes the file.
+               } while (ret == -1 && errno == EINTR);
+
+               if (ret == -1) {
+                       log_perror("close");
+                       // Can still continue.
+               }
        }
 }