request_bytes_sent(serialized.request_bytes_sent()),
response(serialized.response()),
http_header(serialized.http_header()),
+ stream_header(serialized.stream_header()),
has_metacube_header(serialized.has_metacube_header()),
sock(serialized.sock())
{
serialized.set_request_bytes_sent(request_bytes_sent);
serialized.set_response(response);
serialized.set_http_header(http_header);
+ serialized.set_stream_header(stream_header);
serialized.set_pending_data(string(pending_data.begin(), pending_data.end()));
serialized.set_has_metacube_header(has_metacube_header);
serialized.set_sock(sock);
}
for (size_t i = 0; i < stream_indices.size(); ++i) {
- servers->set_header(stream_indices[i], http_header, "");
+ servers->set_header(stream_indices[i], http_header, stream_header);
}
return true;
}
char *inner_data = pending_data.data() + sizeof(metacube_block_header);
if (flags & METACUBE_FLAGS_HEADER) {
- string header(inner_data, inner_data + size);
+ stream_header = string(inner_data, inner_data + size);
for (size_t i = 0; i < stream_indices.size(); ++i) {
- servers->set_header(stream_indices[i], http_header, header);
+ servers->set_header(stream_indices[i], http_header, stream_header);
}
} else {
StreamStartSuitability suitable_for_stream_start;
void HTTPInput::add_destination(int stream_index)
{
stream_indices.push_back(stream_index);
- servers->set_header(stream_index, http_header, "");
+ servers->set_header(stream_index, http_header, stream_header);
}
InputStats HTTPInput::get_stats() const
// The HTTP response we've received so far. Only relevant for RECEIVING_HEADER.
std::string response;
- // The HTTP respones headers we want to give clients for this input.
+ // The HTTP response headers we want to give clients for this input.
std::string http_header;
+ // The stream heder we want to give clients for this input.
+ std::string stream_header;
+
// Data we have received but not fully processed yet.
std::vector<char> pending_data;
serialize_start.tv_usec = loaded_state.serialize_start_usec();
// Deserialize the streams.
+ map<string, string> stream_headers_for_url; // See below.
for (int i = 0; i < loaded_state.streams_size(); ++i) {
const StreamProto &stream = loaded_state.streams(i);
servers->add_stream_from_serialized(stream, data_fds);
deserialized_urls.insert(stream.url());
+
+ stream_headers_for_url.insert(make_pair(stream.url(), stream.stream_header()));
}
}
// Deserialize the inputs. Note that we don't actually add them to any stream yet.
for (int i = 0; i < loaded_state.inputs_size(); ++i) {
+ InputProto serialized_input = loaded_state.inputs(i);
+
+ // Older versions did not store the stream header in the input,
+ // only in each stream. We need to have the stream header in the
+ // input as well, in case we create a new stream reusing the same input.
+ // Thus, we put it into place here if it's missing.
+ if (!serialized_input.has_stream_header() &&
+ stream_headers_for_url.count(serialized_input.url()) != 0) {
+ serialized_input.set_stream_header(stream_headers_for_url[serialized_input.url()]);
+ }
+
InputWithRefcount iwr;
- iwr.input = create_input(loaded_state.inputs(i));
+ iwr.input = create_input(serialized_input);
iwr.refcount = 0;
- inputs.insert(make_pair(loaded_state.inputs(i).url(), iwr));
+ inputs.insert(make_pair(serialized_input.url(), iwr));
}
// Deserialize the acceptors.
optional int32 request_bytes_sent = 5;
optional bytes response = 6;
optional bytes http_header = 10;
+ optional bytes stream_header = 14;
optional bytes pending_data = 7;
optional bool has_metacube_header = 8;
optional int32 sock = 9;