X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=main.cpp;h=c4c5bd4187d3a74850a721fb2d8b90f77835405d;hp=8d60826fb3d7a8436607446f27b572ba294bc237;hb=74cd48ffef90d7d0752e37a4515e4ecfb68f7c9d;hpb=7b3d494100ef1063578b1ef76818baee4ab53ada diff --git a/main.cpp b/main.cpp index 8d60826..c4c5bd4 100644 --- a/main.cpp +++ b/main.cpp @@ -20,6 +20,7 @@ #include "accesslog.h" #include "config.h" #include "input.h" +#include "input_stats.h" #include "log.h" #include "markpool.h" #include "serverpool.h" @@ -134,7 +135,9 @@ void create_config_inputs(const Config &config, multimapadd_stream(stream_config.url, stream_config.backlog_size, @@ -167,7 +179,6 @@ void create_streams(const Config &config, servers->set_encoding(stream_index, Stream::Encoding(stream_config.encoding)); } - expecting_urls.erase(stream_config.url); if (stream_config.mark_pool != -1) { servers->set_mark_pool(stream_index, mark_pools[stream_config.mark_pool]); @@ -176,20 +187,21 @@ void create_streams(const Config &config, string src = stream_config.src; if (!src.empty()) { multimap::iterator input_it = inputs->find(src); - assert(input_it != inputs->end()); - input_it->second.input->add_destination(stream_index); - ++input_it->second.refcount; + if (input_it != inputs->end()) { + input_it->second.input->add_destination(stream_index); + ++input_it->second.refcount; + } } } - // Warn about any HTTP servers we've lost. - // TODO: Make an option (delete=yes?) to actually shut down streams. + // Warn about any streams servers we've lost. for (set::const_iterator stream_it = expecting_urls.begin(); stream_it != expecting_urls.end(); ++stream_it) { string url = *stream_it; log(WARNING, "stream '%s' disappeared from the configuration file. " - "It will not be deleted, but clients will not get any new inputs.", + "It will not be deleted, but clients will not get any new inputs. " + "If you really meant to delete it, set src=delete and reload.", url.c_str()); } @@ -267,6 +279,17 @@ bool dry_run_config(const std::string &argv0, const std::string &config_filename return (WIFEXITED(status) && WEXITSTATUS(status) == 0); } +void find_deleted_streams(const Config &config, set *deleted_urls) +{ + for (unsigned i = 0; i < config.streams.size(); ++i) { + const StreamConfig &stream_config = config.streams[i]; + if (stream_config.src == "delete") { + log(INFO, "Deleting stream '%s'.", stream_config.url.c_str()); + deleted_urls->insert(stream_config.url); + } + } +} + int main(int argc, char **argv) { signal(SIGHUP, hup); @@ -354,6 +377,10 @@ start: servers = new ServerPool(config.num_servers); + // Find all the streams that are to be deleted. + set deleted_urls; + find_deleted_streams(config, &deleted_urls); + CubemapStateProto loaded_state; struct timeval serialize_start; set deserialized_urls; @@ -374,30 +401,51 @@ start: serialize_start.tv_usec = loaded_state.serialize_start_usec(); // Deserialize the streams. + map stream_headers_for_url; // See below. for (int i = 0; i < loaded_state.streams_size(); ++i) { const StreamProto &stream = loaded_state.streams(i); - vector data_fds; - for (int j = 0; j < stream.data_fds_size(); ++j) { - data_fds.push_back(stream.data_fds(j)); - } - - // Older versions stored the data once in the protobuf instead of - // sending around file descriptors. - if (data_fds.empty() && stream.has_data()) { - data_fds.push_back(make_tempfile(stream.data())); + if (deleted_urls.count(stream.url()) != 0) { + // Delete the stream backlogs. + for (int j = 0; j < stream.data_fds_size(); ++j) { + safe_close(stream.data_fds(j)); + } + } else { + vector data_fds; + for (int j = 0; j < stream.data_fds_size(); ++j) { + data_fds.push_back(stream.data_fds(j)); + } + + // Older versions stored the data once in the protobuf instead of + // sending around file descriptors. + if (data_fds.empty() && stream.has_data()) { + data_fds.push_back(make_tempfile(stream.data())); + } + + servers->add_stream_from_serialized(stream, data_fds); + deserialized_urls.insert(stream.url()); + + stream_headers_for_url.insert(make_pair(stream.url(), stream.stream_header())); } - - servers->add_stream_from_serialized(stream, data_fds); - deserialized_urls.insert(stream.url()); } // Deserialize the inputs. Note that we don't actually add them to any stream yet. for (int i = 0; i < loaded_state.inputs_size(); ++i) { + InputProto serialized_input = loaded_state.inputs(i); + + // Older versions did not store the stream header in the input, + // only in each stream. We need to have the stream header in the + // input as well, in case we create a new stream reusing the same input. + // Thus, we put it into place here if it's missing. + if (!serialized_input.has_stream_header() && + stream_headers_for_url.count(serialized_input.url()) != 0) { + serialized_input.set_stream_header(stream_headers_for_url[serialized_input.url()]); + } + InputWithRefcount iwr; - iwr.input = create_input(loaded_state.inputs(i)); + iwr.input = create_input(serialized_input); iwr.refcount = 0; - inputs.insert(make_pair(loaded_state.inputs(i).url(), iwr)); + inputs.insert(make_pair(serialized_input.url(), iwr)); } // Deserialize the acceptors. @@ -421,7 +469,11 @@ start: // allocate them to, so just do round-robin. However, we need to add // them after the mark pools have been set up. for (int i = 0; i < loaded_state.clients_size(); ++i) { - servers->add_client_from_serialized(loaded_state.clients(i)); + if (deleted_urls.count(loaded_state.clients(i).url()) != 0) { + safe_close(loaded_state.clients(i).sock()); + } else { + servers->add_client_from_serialized(loaded_state.clients(i)); + } } servers->run(); @@ -448,6 +500,18 @@ start: stats_thread->run(); } + InputStatsThread *input_stats_thread = NULL; + if (!config.input_stats_file.empty()) { + vector inputs_no_refcount; + for (multimap::iterator input_it = inputs.begin(); + input_it != inputs.end(); ++input_it) { + inputs_no_refcount.push_back(input_it->second.input); + } + + input_stats_thread = new InputStatsThread(config.input_stats_file, config.input_stats_interval, inputs_no_refcount); + input_stats_thread->run(); + } + struct timeval server_start; gettimeofday(&server_start, NULL); if (state_fd != -1) { @@ -466,6 +530,10 @@ start: // OK, we've been HUPed. Time to shut down everything, serialize, and re-exec. gettimeofday(&serialize_start, NULL); + if (input_stats_thread != NULL) { + input_stats_thread->stop(); + delete input_stats_thread; + } if (stats_thread != NULL) { stats_thread->stop(); delete stats_thread;