X-Git-Url: https://git.sesse.net/?a=blobdiff_plain;f=serverpool.cpp;h=68281996847fa5686158d50e33b588f198d330d6;hb=534764f026b82b144e974882c8e53c4cd8d21b68;hp=60a913d56146ecf58f7f9b72b82af0bb7adeb1f5;hpb=3b8ad87137cff7522ed12f4675d5ff26933bc94a;p=cubemap diff --git a/serverpool.cpp b/serverpool.cpp index 60a913d..6828199 100644 --- a/serverpool.cpp +++ b/serverpool.cpp @@ -1,9 +1,12 @@ -#include +#include +#include #include "client.h" +#include "log.h" #include "server.h" #include "serverpool.h" #include "state.pb.h" +#include "util.h" using namespace std; @@ -26,9 +29,16 @@ CubemapStateProto ServerPool::serialize() for (int i = 0; i < num_servers; ++i) { CubemapStateProto local_state = servers[i].serialize(); - // The stream state should be identical between the servers, so we only store it once. + // The stream state should be identical between the servers, so we only store it once, + // save for the fds, which we keep around to distribute to the servers after re-exec. if (i == 0) { state.mutable_streams()->MergeFrom(local_state.streams()); + } else { + assert(state.streams_size() == local_state.streams_size()); + for (int j = 0; j < local_state.streams_size(); ++j) { + assert(local_state.streams(j).data_fds_size() == 1); + state.mutable_streams(j)->add_data_fds(local_state.streams(j).data_fds(0)); + } } for (int j = 0; j < local_state.clients_size(); ++j) { state.add_clients()->MergeFrom(local_state.clients(j)); @@ -55,10 +65,39 @@ void ServerPool::add_stream(const string &stream_id, size_t backlog_size, Stream } } -void ServerPool::add_stream_from_serialized(const StreamProto &stream) +void ServerPool::add_stream_from_serialized(const StreamProto &stream, const vector &data_fds) { + assert(!data_fds.empty()); + string contents; for (int i = 0; i < num_servers; ++i) { - servers[i].add_stream_from_serialized(stream); + int data_fd; + if (i < int(data_fds.size())) { + // Reuse one of the existing file descriptors. + data_fd = data_fds[i]; + } else { + // Clone the first one. + if (contents.empty()) { + if (!read_tempfile(data_fds[0], &contents)) { + exit(1); + } + } + data_fd = make_tempfile(contents); + } + + servers[i].add_stream_from_serialized(stream, data_fd); + } + + // Close and delete any leftovers, if the number of servers was reduced. + for (size_t i = num_servers; i < data_fds.size(); ++i) { + int ret; + do { + ret = close(data_fds[i]); // Implicitly deletes the file. + } while (ret == -1 && errno == EINTR); + + if (ret == -1) { + log_perror("close"); + // Can still continue. + } } }