Store the stream header in the inputs, not just the streams.
authorSteinar H. Gunderson <sgunderson@bigfoot.com>
Fri, 16 Aug 2013 18:33:57 +0000 (20:33 +0200)
committerSteinar H. Gunderson <sgunderson@bigfoot.com>
Fri, 16 Aug 2013 18:33:57 +0000 (20:33 +0200)
This avoids an issue where a stream would get no stream header
when it reused an existing input.

httpinput.cpp
httpinput.h
main.cpp
state.proto

index 2fe6806..2e6e617 100644 (file)
@@ -49,6 +49,7 @@ HTTPInput::HTTPInput(const InputProto &serialized)
          request_bytes_sent(serialized.request_bytes_sent()),
          response(serialized.response()),
          http_header(serialized.http_header()),
+         stream_header(serialized.stream_header()),
          has_metacube_header(serialized.has_metacube_header()),
          sock(serialized.sock())
 {
@@ -95,6 +96,7 @@ InputProto HTTPInput::serialize() const
        serialized.set_request_bytes_sent(request_bytes_sent);
        serialized.set_response(response);
        serialized.set_http_header(http_header);
+       serialized.set_stream_header(stream_header);
        serialized.set_pending_data(string(pending_data.begin(), pending_data.end()));
        serialized.set_has_metacube_header(has_metacube_header);
        serialized.set_sock(sock);
@@ -263,7 +265,7 @@ bool HTTPInput::parse_response(const std::string &request)
        }
 
        for (size_t i = 0; i < stream_indices.size(); ++i) {
-               servers->set_header(stream_indices[i], http_header, "");
+               servers->set_header(stream_indices[i], http_header, stream_header);
        }
 
        return true;
@@ -504,9 +506,9 @@ void HTTPInput::process_data(char *ptr, size_t bytes)
                }
                char *inner_data = pending_data.data() + sizeof(metacube_block_header);
                if (flags & METACUBE_FLAGS_HEADER) {
-                       string header(inner_data, inner_data + size);
+                       stream_header = string(inner_data, inner_data + size);
                        for (size_t i = 0; i < stream_indices.size(); ++i) {
-                               servers->set_header(stream_indices[i], http_header, header);
+                               servers->set_header(stream_indices[i], http_header, stream_header);
                        }
                } else {
                        StreamStartSuitability suitable_for_stream_start;
@@ -542,7 +544,7 @@ void HTTPInput::drop_pending_data(size_t num_bytes)
 void HTTPInput::add_destination(int stream_index)
 {
        stream_indices.push_back(stream_index);
-       servers->set_header(stream_index, http_header, "");
+       servers->set_header(stream_index, http_header, stream_header);
 }
 
 InputStats HTTPInput::get_stats() const
index 78c49c3..2273e7d 100644 (file)
@@ -69,9 +69,12 @@ private:
        // The HTTP response we've received so far. Only relevant for RECEIVING_HEADER.
        std::string response;
 
-       // The HTTP respones headers we want to give clients for this input.
+       // The HTTP response headers we want to give clients for this input.
        std::string http_header;
 
+       // The stream heder we want to give clients for this input.
+       std::string stream_header;
+
        // Data we have received but not fully processed yet.
        std::vector<char> pending_data;
 
index 3f36f32..c4c5bd4 100644 (file)
--- a/main.cpp
+++ b/main.cpp
@@ -401,6 +401,7 @@ start:
                serialize_start.tv_usec = loaded_state.serialize_start_usec();
 
                // Deserialize the streams.
+               map<string, string> stream_headers_for_url;  // See below.
                for (int i = 0; i < loaded_state.streams_size(); ++i) {
                        const StreamProto &stream = loaded_state.streams(i);
 
@@ -423,15 +424,28 @@ start:
 
                                servers->add_stream_from_serialized(stream, data_fds);
                                deserialized_urls.insert(stream.url());
+
+                               stream_headers_for_url.insert(make_pair(stream.url(), stream.stream_header()));
                        }
                }
 
                // Deserialize the inputs. Note that we don't actually add them to any stream yet.
                for (int i = 0; i < loaded_state.inputs_size(); ++i) {
+                       InputProto serialized_input = loaded_state.inputs(i);
+
+                       // Older versions did not store the stream header in the input,
+                       // only in each stream. We need to have the stream header in the
+                       // input as well, in case we create a new stream reusing the same input.
+                       // Thus, we put it into place here if it's missing.
+                       if (!serialized_input.has_stream_header() &&
+                           stream_headers_for_url.count(serialized_input.url()) != 0) {
+                               serialized_input.set_stream_header(stream_headers_for_url[serialized_input.url()]);
+                       }
+
                        InputWithRefcount iwr;
-                       iwr.input = create_input(loaded_state.inputs(i));
+                       iwr.input = create_input(serialized_input);
                        iwr.refcount = 0;
-                       inputs.insert(make_pair(loaded_state.inputs(i).url(), iwr));
+                       inputs.insert(make_pair(serialized_input.url(), iwr));
                } 
 
                // Deserialize the acceptors.
index 99c1e92..9479c2a 100644 (file)
@@ -40,6 +40,7 @@ message InputProto {
        optional int32 request_bytes_sent = 5;
        optional bytes response = 6;
        optional bytes http_header = 10;
+       optional bytes stream_header = 14;
        optional bytes pending_data = 7;
        optional bool has_metacube_header = 8;
        optional int32 sock = 9;