X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=main.cpp;h=cf728cadca6e04a709bc9e49ffa46ac1386ab209;hp=c42fd19cf200fcd60e16e1890d58b8dbf303a3dd;hb=4615dd4bda03a3e73bdeab2e9e81b950d266a6f5;hpb=534764f026b82b144e974882c8e53c4cd8d21b68 diff --git a/main.cpp b/main.cpp index c42fd19..cf728ca 100644 --- a/main.cpp +++ b/main.cpp @@ -16,15 +16,17 @@ #include #include -#include "accesslog.h" #include "acceptor.h" +#include "accesslog.h" #include "config.h" #include "input.h" +#include "input_stats.h" #include "log.h" #include "markpool.h" #include "serverpool.h" #include "state.pb.h" #include "stats.h" +#include "stream.h" #include "util.h" #include "version.h" @@ -49,6 +51,10 @@ void hup(int signum) } } +void do_nothing(int signum) +{ +} + CubemapStateProto collect_state(const timeval &serialize_start, const vector acceptors, const multimap inputs, @@ -104,34 +110,41 @@ vector create_acceptors( return acceptors; } +void create_config_input(const string &src, multimap *inputs) +{ + if (src.empty()) { + return; + } + if (inputs->count(src) != 0) { + return; + } + + InputWithRefcount iwr; + iwr.input = create_input(src); + if (iwr.input == NULL) { + log(ERROR, "did not understand URL '%s', clients will not get any data.", + src.c_str()); + return; + } + iwr.refcount = 0; + inputs->insert(make_pair(src, iwr)); +} + // Find all streams in the configuration file, and create inputs for them. void create_config_inputs(const Config &config, multimap *inputs) { for (unsigned i = 0; i < config.streams.size(); ++i) { const StreamConfig &stream_config = config.streams[i]; - if (stream_config.src.empty()) { - continue; - } - - string src = stream_config.src; - if (inputs->count(src) != 0) { - continue; - } - - InputWithRefcount iwr; - iwr.input = create_input(src); - if (iwr.input == NULL) { - log(ERROR, "did not understand URL '%s', clients will not get any data.", - src.c_str()); - continue; - } - iwr.refcount = 0; - inputs->insert(make_pair(src, iwr)); + create_config_input(stream_config.src, inputs); + } + for (unsigned i = 0; i < config.udpstreams.size(); ++i) { + const UDPStreamConfig &udpstream_config = config.udpstreams[i]; + create_config_input(udpstream_config.src, inputs); } } void create_streams(const Config &config, - const set &deserialized_stream_ids, + const set &deserialized_urls, multimap *inputs) { for (unsigned i = 0; i < config.mark_pools.size(); ++i) { @@ -139,43 +152,65 @@ void create_streams(const Config &config, mark_pools.push_back(new MarkPool(mp_config.from, mp_config.to)); } - set expecting_stream_ids = deserialized_stream_ids; + // HTTP streams. + set expecting_urls = deserialized_urls; for (unsigned i = 0; i < config.streams.size(); ++i) { const StreamConfig &stream_config = config.streams[i]; - if (deserialized_stream_ids.count(stream_config.stream_id) == 0) { - servers->add_stream(stream_config.stream_id, - stream_config.backlog_size, - Stream::Encoding(stream_config.encoding)); + int stream_index; + if (deserialized_urls.count(stream_config.url) == 0) { + stream_index = servers->add_stream(stream_config.url, + stream_config.backlog_size, + Stream::Encoding(stream_config.encoding)); } else { - servers->set_backlog_size(stream_config.stream_id, stream_config.backlog_size); - servers->set_encoding(stream_config.stream_id, + stream_index = servers->lookup_stream_by_url(stream_config.url); + assert(stream_index != -1); + servers->set_backlog_size(stream_index, stream_config.backlog_size); + servers->set_encoding(stream_index, Stream::Encoding(stream_config.encoding)); } - expecting_stream_ids.erase(stream_config.stream_id); + expecting_urls.erase(stream_config.url); if (stream_config.mark_pool != -1) { - servers->set_mark_pool(stream_config.stream_id, - mark_pools[stream_config.mark_pool]); + servers->set_mark_pool(stream_index, mark_pools[stream_config.mark_pool]); } string src = stream_config.src; if (!src.empty()) { multimap::iterator input_it = inputs->find(src); - assert(input_it != inputs->end()); - input_it->second.input->add_destination(stream_config.stream_id); - ++input_it->second.refcount; + if (input_it != inputs->end()) { + input_it->second.input->add_destination(stream_index); + ++input_it->second.refcount; + } } } - // Warn about any servers we've lost. + // Warn about any HTTP servers we've lost. // TODO: Make an option (delete=yes?) to actually shut down streams. - for (set::const_iterator stream_it = expecting_stream_ids.begin(); - stream_it != expecting_stream_ids.end(); + for (set::const_iterator stream_it = expecting_urls.begin(); + stream_it != expecting_urls.end(); ++stream_it) { - string stream_id = *stream_it; + string url = *stream_it; log(WARNING, "stream '%s' disappeared from the configuration file. " "It will not be deleted, but clients will not get any new inputs.", - stream_id.c_str()); + url.c_str()); + } + + // UDP streams. + for (unsigned i = 0; i < config.udpstreams.size(); ++i) { + const UDPStreamConfig &udpstream_config = config.udpstreams[i]; + MarkPool *mark_pool = NULL; + if (udpstream_config.mark_pool != -1) { + mark_pool = mark_pools[udpstream_config.mark_pool]; + } + int stream_index = servers->add_udpstream(udpstream_config.dst, mark_pool); + + string src = udpstream_config.src; + if (!src.empty()) { + multimap::iterator input_it = inputs->find(src); + assert(input_it != inputs->end()); + input_it->second.input->add_destination(stream_index); + ++input_it->second.refcount; + } } } @@ -238,6 +273,7 @@ int main(int argc, char **argv) { signal(SIGHUP, hup); signal(SIGINT, hup); + signal(SIGUSR1, do_nothing); // Used in internal signalling. signal(SIGPIPE, SIG_IGN); // Parse options. @@ -322,7 +358,7 @@ start: CubemapStateProto loaded_state; struct timeval serialize_start; - set deserialized_stream_ids; + set deserialized_urls; map deserialized_acceptors; multimap inputs; // multimap due to older versions without deduplication. if (state_fd != -1) { @@ -355,7 +391,7 @@ start: } servers->add_stream_from_serialized(stream, data_fds); - deserialized_stream_ids.insert(stream.stream_id()); + deserialized_urls.insert(stream.url()); } // Deserialize the inputs. Note that we don't actually add them to any stream yet. @@ -380,7 +416,7 @@ start: create_config_inputs(config, &inputs); // Find all streams in the configuration file, create them, and connect to the inputs. - create_streams(config, deserialized_stream_ids, &inputs); + create_streams(config, deserialized_urls, &inputs); vector acceptors = create_acceptors(config, &deserialized_acceptors); // Put back the existing clients. It doesn't matter which server we @@ -414,6 +450,18 @@ start: stats_thread->run(); } + InputStatsThread *input_stats_thread = NULL; + if (!config.input_stats_file.empty()) { + vector inputs_no_refcount; + for (multimap::iterator input_it = inputs.begin(); + input_it != inputs.end(); ++input_it) { + inputs_no_refcount.push_back(input_it->second.input); + } + + input_stats_thread = new InputStatsThread(config.input_stats_file, config.input_stats_interval, inputs_no_refcount); + input_stats_thread->run(); + } + struct timeval server_start; gettimeofday(&server_start, NULL); if (state_fd != -1) { @@ -432,6 +480,10 @@ start: // OK, we've been HUPed. Time to shut down everything, serialize, and re-exec. gettimeofday(&serialize_start, NULL); + if (input_stats_thread != NULL) { + input_stats_thread->stop(); + delete input_stats_thread; + } if (stats_thread != NULL) { stats_thread->stop(); delete stats_thread;