From 340489a8e732519182ecbc92116e7dfa2997143c Mon Sep 17 00:00:00 2001 From: "Steinar H. Gunderson" Date: Wed, 17 Apr 2013 23:36:46 +0200 Subject: [PATCH] Deduplicate inputs. --- httpinput.cpp | 50 +++++++++++++---------- httpinput.h | 9 ++++- input.cpp | 6 +-- input.h | 3 +- main.cpp | 110 +++++++++++++++++++++++++++----------------------- state.proto | 1 - udpinput.cpp | 19 ++++----- udpinput.h | 9 ++++- 8 files changed, 117 insertions(+), 90 deletions(-) diff --git a/httpinput.cpp b/httpinput.cpp index 72a8f3a..51b4f41 100644 --- a/httpinput.cpp +++ b/httpinput.cpp @@ -27,9 +27,8 @@ using namespace std; extern ServerPool *servers; -HTTPInput::HTTPInput(const string &stream_id, const string &url) +HTTPInput::HTTPInput(const string &url) : state(NOT_CONNECTED), - stream_id(stream_id), url(url), has_metacube_header(false), sock(-1) @@ -38,7 +37,6 @@ HTTPInput::HTTPInput(const string &stream_id, const string &url) HTTPInput::HTTPInput(const InputProto &serialized) : state(State(serialized.state())), - stream_id(serialized.stream_id()), url(serialized.url()), request(serialized.request()), request_bytes_sent(serialized.request_bytes_sent()), @@ -70,7 +68,6 @@ InputProto HTTPInput::serialize() const { InputProto serialized; serialized.set_state(state); - serialized.set_stream_id(stream_id); serialized.set_url(url); serialized.set_request(request); serialized.set_request_bytes_sent(request_bytes_sent); @@ -88,7 +85,7 @@ int HTTPInput::lookup_and_connect(const string &host, const string &port) int err = getaddrinfo(host.c_str(), port.c_str(), NULL, &ai); if (err == -1) { log(WARNING, "[%s] Lookup of '%s' failed (%s).", - stream_id.c_str(), host.c_str(), gai_strerror(err)); + url.c_str(), host.c_str(), gai_strerror(err)); freeaddrinfo(ai); return -1; } @@ -126,7 +123,7 @@ int HTTPInput::lookup_and_connect(const string &host, const string &port) // Give the last one as error. log(WARNING, "[%s] Connect to '%s' failed (%s)", - stream_id.c_str(), host.c_str(), strerror(errno)); + url.c_str(), host.c_str(), strerror(errno)); freeaddrinfo(base_ai); return -1; } @@ -135,21 +132,21 @@ bool HTTPInput::parse_response(const std::string &request) { vector lines = split_lines(response); if (lines.empty()) { - log(WARNING, "[%s] Empty HTTP response from input.", stream_id.c_str()); + log(WARNING, "[%s] Empty HTTP response from input.", url.c_str()); return false; } vector first_line_tokens = split_tokens(lines[0]); if (first_line_tokens.size() < 2) { log(WARNING, "[%s] Malformed response line '%s' from input.", - stream_id.c_str(), lines[0].c_str()); + url.c_str(), lines[0].c_str()); return false; } int response = atoi(first_line_tokens[1].c_str()); if (response != 200) { log(WARNING, "[%s] Non-200 response '%s' from input.", - stream_id.c_str(), lines[0].c_str()); + url.c_str(), lines[0].c_str()); return false; } @@ -158,7 +155,7 @@ bool HTTPInput::parse_response(const std::string &request) size_t split = lines[i].find(":"); if (split == string::npos) { log(WARNING, "[%s] Ignoring malformed HTTP response line '%s'", - stream_id.c_str(), lines[i].c_str()); + url.c_str(), lines[i].c_str()); continue; } @@ -203,8 +200,11 @@ bool HTTPInput::parse_response(const std::string &request) ++it) { http_header.append(it->first + ": " + it->second + "\r\n"); } - http_header.append("\r\n"); - servers->set_header(stream_id, http_header); + http_header.append("\r\n"); + + for (size_t i = 0; i < stream_ids.size(); ++i) { + servers->set_header(stream_ids[i], http_header); + } return true; } @@ -236,12 +236,14 @@ void HTTPInput::do_work() request_bytes_sent = 0; response.clear(); pending_data.clear(); - servers->set_header(stream_id, ""); + for (size_t i = 0; i < stream_ids.size(); ++i) { + servers->set_header(stream_ids[i], ""); + } { string protocol; // Thrown away. if (!parse_url(url, &protocol, &host, &port, &path)) { - log(WARNING, "[%s] Failed to parse URL '%s'", stream_id.c_str(), url.c_str()); + log(WARNING, "[%s] Failed to parse URL '%s'", url.c_str(), url.c_str()); break; } } @@ -299,7 +301,7 @@ void HTTPInput::do_work() if (ret == 0) { // This really shouldn't happen... log(ERROR, "[%s] Socket unexpectedly closed while reading header", - stream_id.c_str()); + url.c_str()); state = CLOSING_SOCKET; continue; } @@ -307,7 +309,7 @@ void HTTPInput::do_work() RequestParseStatus status = wait_for_double_newline(&response, buf, ret); if (status == RP_OUT_OF_SPACE) { - log(WARNING, "[%s] Sever sent overlong HTTP response!", stream_id.c_str()); + log(WARNING, "[%s] Sever sent overlong HTTP response!", url.c_str()); state = CLOSING_SOCKET; continue; } else if (status == RP_NOT_FINISHED_YET) { @@ -335,7 +337,7 @@ void HTTPInput::do_work() } log(INFO, "[%s] Connected to '%s', receiving data.", - stream_id.c_str(), url.c_str()); + url.c_str(), url.c_str()); state = RECEIVING_DATA; break; } @@ -356,7 +358,7 @@ void HTTPInput::do_work() if (ret == 0) { // This really shouldn't happen... log(ERROR, "[%s] Socket unexpectedly closed while reading header", - stream_id.c_str()); + url.c_str()); state = CLOSING_SOCKET; continue; } @@ -385,7 +387,7 @@ void HTTPInput::do_work() // or the connection just got closed. // The earlier steps have already given the error message, if any. if (state == NOT_CONNECTED && !should_stop) { - log(INFO, "[%s] Waiting 0.2 second and restarting...", stream_id.c_str()); + log(INFO, "[%s] Waiting 0.2 second and restarting...", url.c_str()); usleep(200000); } } @@ -441,9 +443,13 @@ void HTTPInput::process_data(char *ptr, size_t bytes) char *inner_data = pending_data.data() + sizeof(metacube_block_header); if (flags & METACUBE_FLAGS_HEADER) { string header(inner_data, inner_data + size); - servers->set_header(stream_id, http_header + header); + for (size_t i = 0; i < stream_ids.size(); ++i) { + servers->set_header(stream_ids[i], http_header + header); + } } else { - servers->add_data(stream_id, inner_data, size); + for (size_t i = 0; i < stream_ids.size(); ++i) { + servers->add_data(stream_ids[i], inner_data, size); + } } // Consume the block. This isn't the most efficient way of dealing with things @@ -460,7 +466,7 @@ void HTTPInput::drop_pending_data(size_t num_bytes) return; } log(WARNING, "[%s] Dropping %lld junk bytes from stream, maybe it is not a Metacube stream?", - stream_id.c_str(), (long long)num_bytes); + url.c_str(), (long long)num_bytes); pending_data.erase(pending_data.begin(), pending_data.begin() + num_bytes); } diff --git a/httpinput.h b/httpinput.h index f14b099..9a27871 100644 --- a/httpinput.h +++ b/httpinput.h @@ -11,7 +11,7 @@ class InputProto; class HTTPInput : public Input { public: - HTTPInput(const std::string &stream_id, const std::string &url); + HTTPInput(const std::string &url); // Serialization/deserialization. HTTPInput(const InputProto &serialized); @@ -21,6 +21,11 @@ public: virtual std::string get_url() const { return url; } + virtual void add_destination(const std::string &stream_id) + { + stream_ids.push_back(stream_id); + } + private: // Actually does the download. virtual void do_work(); @@ -48,7 +53,7 @@ private: }; State state; - std::string stream_id; + std::vector stream_ids; // The URL and its parsed components. std::string url; diff --git a/input.cpp b/input.cpp index 7c776e1..ec12381 100644 --- a/input.cpp +++ b/input.cpp @@ -53,17 +53,17 @@ bool parse_url(const string &url, string *protocol, string *host, string *port, return true; } -Input *create_input(const std::string &stream_id, const std::string &url) +Input *create_input(const std::string &url) { string protocol, host, port, path; if (!parse_url(url, &protocol, &host, &port, &path)) { return NULL; } if (protocol == "http") { - return new HTTPInput(stream_id, url); + return new HTTPInput(url); } if (protocol == "udp") { - return new UDPInput(stream_id, url); + return new UDPInput(url); } return NULL; } diff --git a/input.h b/input.h index 79cfc54..e0bd54b 100644 --- a/input.h +++ b/input.h @@ -13,7 +13,7 @@ bool parse_url(const std::string &url, std::string *protocol, std::string *host, // Figure out the right type of input based on the URL, and create a new Input of the right type. // Will return NULL if unknown. -Input *create_input(const std::string &stream_id, const std::string &url); +Input *create_input(const std::string &url); Input *create_input(const InputProto &serialized); class Input : public Thread { @@ -22,6 +22,7 @@ public: virtual InputProto serialize() const = 0; virtual std::string get_url() const = 0; virtual void close_socket() = 0; + virtual void add_destination(const std::string &stream_id) = 0; }; #endif // !defined(_INPUT_H) diff --git a/main.cpp b/main.cpp index e953a8c..fa63549 100644 --- a/main.cpp +++ b/main.cpp @@ -36,6 +36,11 @@ vector mark_pools; volatile bool hupped = false; volatile bool stopped = false; +struct InputWithRefcount { + Input *input; + int refcount; +}; + void hup(int signum) { hupped = true; @@ -46,7 +51,7 @@ void hup(int signum) CubemapStateProto collect_state(const timeval &serialize_start, const vector acceptors, - const vector inputs, + const multimap inputs, ServerPool *servers) { CubemapStateProto state = servers->serialize(); // Fills streams() and clients(). @@ -57,8 +62,10 @@ CubemapStateProto collect_state(const timeval &serialize_start, state.add_acceptors()->MergeFrom(acceptors[i]->serialize()); } - for (size_t i = 0; i < inputs.size(); ++i) { - state.add_inputs()->MergeFrom(inputs[i]->serialize()); + for (multimap::const_iterator input_it = inputs.begin(); + input_it != inputs.end(); + ++input_it) { + state.add_inputs()->MergeFrom(input_it->second.input->serialize()); } return state; @@ -98,50 +105,34 @@ vector create_acceptors( } // Find all streams in the configuration file, and create inputs for them. -vector create_inputs(const Config &config, - map *deserialized_inputs) +void create_config_inputs(const Config &config, multimap *inputs) { - vector inputs; for (unsigned i = 0; i < config.streams.size(); ++i) { const StreamConfig &stream_config = config.streams[i]; if (stream_config.src.empty()) { continue; } - string stream_id = stream_config.stream_id; string src = stream_config.src; - - Input *input = NULL; - map::iterator deserialized_input_it = - deserialized_inputs->find(stream_id); - if (deserialized_input_it != deserialized_inputs->end()) { - input = deserialized_input_it->second; - if (input->get_url() != src) { - log(INFO, "Stream '%s' has changed URL from '%s' to '%s', restarting input.", - stream_id.c_str(), input->get_url().c_str(), src.c_str()); - input->close_socket(); - delete input; - input = NULL; - } - deserialized_inputs->erase(deserialized_input_it); + if (inputs->count(src) != 0) { + continue; } - if (input == NULL) { - input = create_input(stream_id, src); - if (input == NULL) { - log(ERROR, "did not understand URL '%s', clients will not get any data.", - src.c_str()); - continue; - } + + InputWithRefcount iwr; + iwr.input = create_input(src); + if (iwr.input == NULL) { + log(ERROR, "did not understand URL '%s', clients will not get any data.", + src.c_str()); + continue; } - input->run(); - inputs.push_back(input); + iwr.refcount = 0; + inputs->insert(make_pair(src, iwr)); } - return inputs; } void create_streams(const Config &config, const set &deserialized_stream_ids, - map *deserialized_inputs) + multimap *inputs) { for (unsigned i = 0; i < config.mark_pools.size(); ++i) { const MarkPoolConfig &mp_config = config.mark_pools[i]; @@ -162,6 +153,14 @@ void create_streams(const Config &config, servers->set_mark_pool(stream_config.stream_id, mark_pools[stream_config.mark_pool]); } + + string src = stream_config.src; + if (!src.empty()) { + multimap::iterator input_it = inputs->find(src); + assert(input_it != inputs->end()); + input_it->second.input->add_destination(stream_config.stream_id); + ++input_it->second.refcount; + } } // Warn about any servers we've lost. @@ -173,10 +172,6 @@ void create_streams(const Config &config, log(WARNING, "stream '%s' disappeared from the configuration file. " "It will not be deleted, but clients will not get any new inputs.", stream_id.c_str()); - if (deserialized_inputs->count(stream_id) != 0) { - delete (*deserialized_inputs)[stream_id]; - deserialized_inputs->erase(stream_id); - } } } @@ -324,8 +319,8 @@ start: CubemapStateProto loaded_state; struct timeval serialize_start; set deserialized_stream_ids; - map deserialized_inputs; map deserialized_acceptors; + multimap inputs; // multimap due to older versions without deduplication. if (state_fd != -1) { log(INFO, "Deserializing state from previous process..."); string serialized; @@ -346,11 +341,12 @@ start: deserialized_stream_ids.insert(loaded_state.streams(i).stream_id()); } - // Deserialize the inputs. Note that we don't actually add them to any state yet. + // Deserialize the inputs. Note that we don't actually add them to any stream yet. for (int i = 0; i < loaded_state.inputs_size(); ++i) { - deserialized_inputs.insert(make_pair( - loaded_state.inputs(i).stream_id(), - create_input(loaded_state.inputs(i)))); + InputWithRefcount iwr; + iwr.input = create_input(loaded_state.inputs(i)); + iwr.refcount = 0; + inputs.insert(make_pair(loaded_state.inputs(i).url(), iwr)); } // Deserialize the acceptors. @@ -363,14 +359,12 @@ start: log(INFO, "Deserialization done."); } - // Find all streams in the configuration file, and create them. - create_streams(config, deserialized_stream_ids, &deserialized_inputs); - - vector acceptors = create_acceptors(config, &deserialized_acceptors); - vector inputs = create_inputs(config, &deserialized_inputs); + // Add any new inputs coming from the config. + create_config_inputs(config, &inputs); - // All deserialized inputs should now have been taken care of, one way or the other. - assert(deserialized_inputs.empty()); + // Find all streams in the configuration file, create them, and connect to the inputs. + create_streams(config, deserialized_stream_ids, &inputs); + vector acceptors = create_acceptors(config, &deserialized_acceptors); // Put back the existing clients. It doesn't matter which server we // allocate them to, so just do round-robin. However, we need to add @@ -381,6 +375,20 @@ start: servers->run(); + // Now delete all inputs that are longer in use, and start the others. + for (multimap::iterator input_it = inputs.begin(); + input_it != inputs.end(); ) { + if (input_it->second.refcount == 0) { + log(WARNING, "Input '%s' no longer in use, closing.", + input_it->first.c_str()); + delete input_it->second.input; + inputs.erase(input_it++); + } else { + input_it->second.input->run(); + ++input_it; + } + } + // Start writing statistics. StatsThread *stats_thread = NULL; if (!config.stats_file.empty()) { @@ -413,8 +421,10 @@ start: for (size_t i = 0; i < acceptors.size(); ++i) { acceptors[i]->stop(); } - for (size_t i = 0; i < inputs.size(); ++i) { - inputs[i]->stop(); + for (multimap::iterator input_it = inputs.begin(); + input_it != inputs.end(); + ++input_it) { + input_it->second.input->stop(); } servers->stop(); diff --git a/state.proto b/state.proto index bfacdbf..87b028c 100644 --- a/state.proto +++ b/state.proto @@ -26,7 +26,6 @@ message StreamProto { // Corresponds to class Input. message InputProto { optional int32 state = 1; - optional string stream_id = 2; optional string url = 3; optional bytes request = 4; optional int32 request_bytes_sent = 5; diff --git a/udpinput.cpp b/udpinput.cpp index ed53ede..bd83381 100644 --- a/udpinput.cpp +++ b/udpinput.cpp @@ -18,9 +18,8 @@ using namespace std; extern ServerPool *servers; -UDPInput::UDPInput(const string &stream_id, const string &url) - : stream_id(stream_id), - url(url), +UDPInput::UDPInput(const string &url) + : url(url), sock(-1) { // Should be verified by the caller. @@ -32,8 +31,7 @@ UDPInput::UDPInput(const string &stream_id, const string &url) } UDPInput::UDPInput(const InputProto &serialized) - : stream_id(serialized.stream_id()), - url(serialized.url()), + : url(serialized.url()), sock(serialized.sock()) { // Should be verified by the caller. @@ -47,7 +45,6 @@ UDPInput::UDPInput(const InputProto &serialized) InputProto UDPInput::serialize() const { InputProto serialized; - serialized.set_stream_id(stream_id); serialized.set_url(url); serialized.set_sock(sock); return serialized; @@ -75,7 +72,9 @@ void UDPInput::construct_header() "Cache-control: no-cache\r\n" "Server: " SERVER_IDENTIFICATION "\r\n" "\r\n"; - servers->set_header(stream_id, header); + for (size_t i = 0; i < stream_ids.size(); ++i) { + servers->set_header(stream_ids[i], header); + } } void UDPInput::do_work() @@ -86,7 +85,7 @@ void UDPInput::do_work() sock = create_server_socket(port_num, UDP_SOCKET); if (sock == -1) { log(WARNING, "[%s] UDP socket creation failed. Waiting 0.2 seconds and trying again...", - stream_id.c_str()); + url.c_str()); usleep(200000); continue; } @@ -120,6 +119,8 @@ void UDPInput::do_work() continue; } - servers->add_data(stream_id, buf, ret); + for (size_t i = 0; i < stream_ids.size(); ++i) { + servers->add_data(stream_ids[i], buf, ret); + } } } diff --git a/udpinput.h b/udpinput.h index a5f71b9..dc91bc2 100644 --- a/udpinput.h +++ b/udpinput.h @@ -10,7 +10,7 @@ class InputProto; class UDPInput : public Input { public: - UDPInput(const std::string &stream_id, const std::string &url); + UDPInput(const std::string &url); // Serialization/deserialization. UDPInput(const InputProto &serialized); @@ -19,6 +19,11 @@ public: virtual std::string get_url() const { return url; } virtual void close_socket(); + virtual void add_destination(const std::string &stream_id) + { + stream_ids.push_back(stream_id); + } + private: // Actually gets the packets. virtual void do_work(); @@ -26,7 +31,7 @@ private: // Create the HTTP header. void construct_header(); - std::string stream_id; + std::vector stream_ids; // The URL and its parsed components. std::string url; -- 2.39.2