#include <utility>
#include <vector>
-#include "accesslog.h"
#include "acceptor.h"
+#include "accesslog.h"
#include "config.h"
#include "input.h"
#include "log.h"
#include "serverpool.h"
#include "state.pb.h"
#include "stats.h"
+#include "stream.h"
#include "util.h"
#include "version.h"
for (unsigned i = 0; i < config.streams.size(); ++i) {
const StreamConfig &stream_config = config.streams[i];
if (deserialized_stream_ids.count(stream_config.stream_id) == 0) {
- servers->add_stream(stream_config.stream_id, stream_config.backlog_size);
+ servers->add_stream(stream_config.stream_id,
+ stream_config.backlog_size,
+ Stream::Encoding(stream_config.encoding));
} else {
servers->set_backlog_size(stream_config.stream_id, stream_config.backlog_size);
+ servers->set_encoding(stream_config.stream_id,
+ Stream::Encoding(stream_config.encoding));
}
expecting_stream_ids.erase(stream_config.stream_id);
// Deserialize the streams.
for (int i = 0; i < loaded_state.streams_size(); ++i) {
- servers->add_stream_from_serialized(loaded_state.streams(i));
- deserialized_stream_ids.insert(loaded_state.streams(i).stream_id());
+ const StreamProto &stream = loaded_state.streams(i);
+
+ vector<int> data_fds;
+ for (int j = 0; j < stream.data_fds_size(); ++j) {
+ data_fds.push_back(stream.data_fds(j));
+ }
+
+ // Older versions stored the data once in the protobuf instead of
+ // sending around file descriptors.
+ if (data_fds.empty() && stream.has_data()) {
+ data_fds.push_back(make_tempfile(stream.data()));
+ }
+
+ servers->add_stream_from_serialized(stream, data_fds);
+ deserialized_stream_ids.insert(stream.stream_id());
}
// Deserialize the inputs. Note that we don't actually add them to any stream yet.
if (input_it->second.refcount == 0) {
log(WARNING, "Input '%s' no longer in use, closing.",
input_it->first.c_str());
+ input_it->second.input->close_socket();
delete input_it->second.input;
inputs.erase(input_it++);
} else {