From cbdce14899459aca2e5331b6e1a969c359d28880 Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Fri, 16 Aug 2013 20:33:57 +0200 Subject: [PATCH] Store the stream header in the inputs, not just the streams. This avoids an issue where a stream would get no stream header when it reused an existing input. --- httpinput.cpp | 10 ++++++---- httpinput.h | 5 ++++- main.cpp | 18 ++++++++++++++++-- state.proto | 1 + 4 files changed, 27 insertions(+), 7 deletions(-) diff --git a/httpinput.cpp b/httpinput.cpp index 2fe6806..2e6e617 100644 --- a/httpinput.cpp +++ b/httpinput.cpp @@ -49,6 +49,7 @@ HTTPInput::HTTPInput(const InputProto &serialized) request_bytes_sent(serialized.request_bytes_sent()), response(serialized.response()), http_header(serialized.http_header()), + stream_header(serialized.stream_header()), has_metacube_header(serialized.has_metacube_header()), sock(serialized.sock()) { @@ -95,6 +96,7 @@ InputProto HTTPInput::serialize() const serialized.set_request_bytes_sent(request_bytes_sent); serialized.set_response(response); serialized.set_http_header(http_header); + serialized.set_stream_header(stream_header); serialized.set_pending_data(string(pending_data.begin(), pending_data.end())); serialized.set_has_metacube_header(has_metacube_header); serialized.set_sock(sock); @@ -263,7 +265,7 @@ bool HTTPInput::parse_response(const std::string &request) } for (size_t i = 0; i < stream_indices.size(); ++i) { - servers->set_header(stream_indices[i], http_header, ""); + servers->set_header(stream_indices[i], http_header, stream_header); } return true; @@ -504,9 +506,9 @@ void HTTPInput::process_data(char *ptr, size_t bytes) } char *inner_data = pending_data.data() + sizeof(metacube_block_header); if (flags & METACUBE_FLAGS_HEADER) { - string header(inner_data, inner_data + size); + stream_header = string(inner_data, inner_data + size); for (size_t i = 0; i < stream_indices.size(); ++i) { - servers->set_header(stream_indices[i], http_header, header); + servers->set_header(stream_indices[i], http_header, stream_header); } } else { StreamStartSuitability suitable_for_stream_start; @@ -542,7 +544,7 @@ void HTTPInput::drop_pending_data(size_t num_bytes) void HTTPInput::add_destination(int stream_index) { stream_indices.push_back(stream_index); - servers->set_header(stream_index, http_header, ""); + servers->set_header(stream_index, http_header, stream_header); } InputStats HTTPInput::get_stats() const diff --git a/httpinput.h b/httpinput.h index 78c49c3..2273e7d 100644 --- a/httpinput.h +++ b/httpinput.h @@ -69,9 +69,12 @@ private: // The HTTP response we've received so far. Only relevant for RECEIVING_HEADER. std::string response; - // The HTTP respones headers we want to give clients for this input. + // The HTTP response headers we want to give clients for this input. std::string http_header; + // The stream heder we want to give clients for this input. + std::string stream_header; + // Data we have received but not fully processed yet. std::vector pending_data; diff --git a/main.cpp b/main.cpp index 3f36f32..c4c5bd4 100644 --- a/main.cpp +++ b/main.cpp @@ -401,6 +401,7 @@ start: serialize_start.tv_usec = loaded_state.serialize_start_usec(); // Deserialize the streams. + map stream_headers_for_url; // See below. for (int i = 0; i < loaded_state.streams_size(); ++i) { const StreamProto &stream = loaded_state.streams(i); @@ -423,15 +424,28 @@ start: servers->add_stream_from_serialized(stream, data_fds); deserialized_urls.insert(stream.url()); + + stream_headers_for_url.insert(make_pair(stream.url(), stream.stream_header())); } } // Deserialize the inputs. Note that we don't actually add them to any stream yet. for (int i = 0; i < loaded_state.inputs_size(); ++i) { + InputProto serialized_input = loaded_state.inputs(i); + + // Older versions did not store the stream header in the input, + // only in each stream. We need to have the stream header in the + // input as well, in case we create a new stream reusing the same input. + // Thus, we put it into place here if it's missing. + if (!serialized_input.has_stream_header() && + stream_headers_for_url.count(serialized_input.url()) != 0) { + serialized_input.set_stream_header(stream_headers_for_url[serialized_input.url()]); + } + InputWithRefcount iwr; - iwr.input = create_input(loaded_state.inputs(i)); + iwr.input = create_input(serialized_input); iwr.refcount = 0; - inputs.insert(make_pair(loaded_state.inputs(i).url(), iwr)); + inputs.insert(make_pair(serialized_input.url(), iwr)); } // Deserialize the acceptors. diff --git a/state.proto b/state.proto index 99c1e92..9479c2a 100644 --- a/state.proto +++ b/state.proto @@ -40,6 +40,7 @@ message InputProto { optional int32 request_bytes_sent = 5; optional bytes response = 6; optional bytes http_header = 10; + optional bytes stream_header = 14; optional bytes pending_data = 7; optional bool has_metacube_header = 8; optional int32 sock = 9; -- 2.39.5