// Deserialize the streams.
for (int i = 0; i < loaded_state.streams_size(); ++i) {
- servers->add_stream_from_serialized(loaded_state.streams(i));
- deserialized_stream_ids.insert(loaded_state.streams(i).stream_id());
+ const StreamProto &stream = loaded_state.streams(i);
+
+ vector<int> data_fds;
+ for (int j = 0; j < stream.data_fds_size(); ++j) {
+ data_fds.push_back(stream.data_fds(j));
+ }
+
+ // Older versions stored the data once in the protobuf instead of
+ // sending around file descriptors.
+ if (data_fds.empty() && stream.has_data()) {
+ data_fds.push_back(make_tempfile(stream.data()));
+ }
+
+ servers->add_stream_from_serialized(stream, data_fds);
+ deserialized_stream_ids.insert(stream.stream_id());
}
// Deserialize the inputs. Note that we don't actually add them to any stream yet.
streams.insert(make_pair(stream_id, new Stream(stream_id, backlog_size, encoding)));
}
-void Server::add_stream_from_serialized(const StreamProto &stream)
+void Server::add_stream_from_serialized(const StreamProto &stream, int data_fd)
{
MutexLock lock(&mutex);
- streams.insert(make_pair(stream.stream_id(), new Stream(stream)));
+ streams.insert(make_pair(stream.stream_id(), new Stream(stream, data_fd)));
}
void Server::set_backlog_size(const string &stream_id, size_t new_size)
CubemapStateProto serialize();
void add_client_from_serialized(const ClientProto &client);
void add_stream(const std::string &stream_id, size_t bytes_received, Stream::Encoding encoding);
- void add_stream_from_serialized(const StreamProto &stream);
+ void add_stream_from_serialized(const StreamProto &stream, int data_fd);
void set_backlog_size(const std::string &stream_id, size_t new_size);
void set_encoding(const std::string &stream_id, Stream::Encoding encoding);
-#include <google/protobuf/repeated_field.h>
+#include <unistd.h>
+#include <errno.h>
#include "client.h"
+#include "log.h"
#include "server.h"
#include "serverpool.h"
#include "state.pb.h"
+#include "util.h"
using namespace std;
for (int i = 0; i < num_servers; ++i) {
CubemapStateProto local_state = servers[i].serialize();
- // The stream state should be identical between the servers, so we only store it once.
+ // The stream state should be identical between the servers, so we only store it once,
+ // save for the fds, which we keep around to distribute to the servers after re-exec.
if (i == 0) {
state.mutable_streams()->MergeFrom(local_state.streams());
+ } else {
+ assert(state.streams_size() == local_state.streams_size());
+ for (int j = 0; j < local_state.streams_size(); ++j) {
+ assert(local_state.streams(j).data_fds_size() == 1);
+ state.mutable_streams(j)->add_data_fds(local_state.streams(j).data_fds(0));
+ }
}
for (int j = 0; j < local_state.clients_size(); ++j) {
state.add_clients()->MergeFrom(local_state.clients(j));
}
}
-void ServerPool::add_stream_from_serialized(const StreamProto &stream)
+void ServerPool::add_stream_from_serialized(const StreamProto &stream, const vector<int> &data_fds)
{
+ assert(!data_fds.empty());
+ string contents;
for (int i = 0; i < num_servers; ++i) {
- servers[i].add_stream_from_serialized(stream);
+ int data_fd;
+ if (i < int(data_fds.size())) {
+ // Reuse one of the existing file descriptors.
+ data_fd = data_fds[i];
+ } else {
+ // Clone the first one.
+ if (contents.empty()) {
+ if (!read_tempfile(data_fds[0], &contents)) {
+ exit(1);
+ }
+ }
+ data_fd = make_tempfile(contents);
+ }
+
+ servers[i].add_stream_from_serialized(stream, data_fd);
+ }
+
+ // Close and delete any leftovers, if the number of servers was reduced.
+ for (size_t i = num_servers; i < data_fds.size(); ++i) {
+ int ret;
+ do {
+ ret = close(data_fds[i]); // Implicitly deletes the file.
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret == -1) {
+ log_perror("close");
+ // Can still continue.
+ }
}
}
// Adds the given stream to all the servers.
void add_stream(const std::string &stream_id, size_t backlog_size, Stream::Encoding encoding);
- void add_stream_from_serialized(const StreamProto &stream);
+ void add_stream_from_serialized(const StreamProto &stream, const std::vector<int> &data_fds);
// Adds the given data to all the servers.
void set_header(const std::string &stream_id,
message StreamProto {
optional bytes http_header = 6;
optional bytes stream_header = 7;
- optional bytes data = 2;
+ repeated int32 data_fds = 8;
optional int64 backlog_size = 5 [default=1048576];
optional int64 bytes_received = 3;
optional string stream_id = 4;
// Older versions stored the HTTP and video headers together in this field.
optional bytes header = 1;
+
+ // Older versions stored the data in the protobuf instead of sending file
+ // descriptors around.
+ optional bytes data = 2;
};
// Corresponds to class Input.
}
}
-Stream::Stream(const StreamProto &serialized)
+Stream::Stream(const StreamProto &serialized, int data_fd)
: stream_id(serialized.stream_id()),
http_header(serialized.http_header()),
stream_header(serialized.stream_header()),
encoding(Stream::STREAM_ENCODING_RAW), // Will be changed later.
- data_fd(make_tempfile(serialized.data())),
+ data_fd(data_fd),
backlog_size(serialized.backlog_size()),
bytes_received(serialized.bytes_received()),
mark_pool(NULL)
StreamProto serialized;
serialized.set_http_header(http_header);
serialized.set_stream_header(stream_header);
- if (!read_tempfile(data_fd, serialized.mutable_data())) { // Closes data_fd.
- exit(1);
- }
+ serialized.add_data_fds(data_fd);
serialized.set_backlog_size(backlog_size);
serialized.set_bytes_received(bytes_received);
serialized.set_stream_id(stream_id);
}
string existing_data;
- if (!read_tempfile(data_fd, &existing_data)) { // Closes data_fd.
+ if (!read_tempfile_and_close(data_fd, &existing_data)) {
exit(1);
}
~Stream();
// Serialization/deserialization.
- Stream(const StreamProto &serialized);
+ Stream(const StreamProto &serialized, int data_fd);
StreamProto serialize();
// Changes the backlog size, restructuring the data as needed.
return fd;
}
+bool read_tempfile_and_close(int fd, std::string *contents)
+{
+ bool ok = read_tempfile(fd, contents);
+
+ int ret;
+ do {
+ ret = close(fd); // Implicitly deletes the file.
+ } while (ret == -1 && errno == EINTR);
+
+ if (ret == -1) {
+ log_perror("close");
+ // Can still continue.
+ }
+
+ return ok;
+}
+
bool read_tempfile(int fd, std::string *contents)
{
- bool ok = true;
ssize_t ret, has_read;
off_t len = lseek(fd, 0, SEEK_END);
if (len == -1) {
log_perror("lseek");
- ok = false;
- goto done;
+ return false;
}
contents->resize(len);
if (lseek(fd, 0, SEEK_SET) == -1) {
log_perror("lseek");
- ok = false;
- goto done;
+ return false;
}
has_read = 0;
ret = read(fd, &((*contents)[has_read]), len - has_read);
if (ret == -1) {
log_perror("read");
- ok = false;
- goto done;
+ return false;
}
if (ret == 0) {
log(ERROR, "Unexpected EOF!");
- ok = false;
- goto done;
+ return false;
}
has_read += ret;
}
-done:
- do {
- ret = close(fd); // Implicitly deletes the files.
- } while (ret == -1 && errno == EINTR);
-
- if (ret == -1) {
- log_perror("close");
- // Can still continue.
- }
-
- return ok;
+ return true;
}
int make_tempfile(const std::string &contents);
// Opposite of make_tempfile(). Returns false on failure.
+bool read_tempfile_and_close(int fd, std::string *contents);
+
+// Same as read_tempfile_and_close(), without the close.
bool read_tempfile(int fd, std::string *contents);
#endif // !defined(_UTIL_H