X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=main.cpp;h=c40b78f5ef3ba20bea62cc86589903cd69aa9c0e;hp=5ec0cb487269649c59c32dde293b3c42133ab2a7;hb=bd694fdd3dd1417399aecead2c8b91fc4fe95ce8;hpb=9abb89bcf7940e2ada9d708f86a218a56334f68d diff --git a/main.cpp b/main.cpp index 5ec0cb4..c40b78f 100644 --- a/main.cpp +++ b/main.cpp @@ -20,6 +20,7 @@ #include "accesslog.h" #include "config.h" #include "input.h" +#include "input_stats.h" #include "log.h" #include "markpool.h" #include "serverpool.h" @@ -109,29 +110,36 @@ vector create_acceptors( return acceptors; } +void create_config_input(const string &src, multimap *inputs) +{ + if (src.empty()) { + return; + } + if (inputs->count(src) != 0) { + return; + } + + InputWithRefcount iwr; + iwr.input = create_input(src); + if (iwr.input == NULL) { + log(ERROR, "did not understand URL '%s', clients will not get any data.", + src.c_str()); + return; + } + iwr.refcount = 0; + inputs->insert(make_pair(src, iwr)); +} + // Find all streams in the configuration file, and create inputs for them. void create_config_inputs(const Config &config, multimap *inputs) { for (unsigned i = 0; i < config.streams.size(); ++i) { const StreamConfig &stream_config = config.streams[i]; - if (stream_config.src.empty()) { - continue; - } - - string src = stream_config.src; - if (inputs->count(src) != 0) { - continue; - } - - InputWithRefcount iwr; - iwr.input = create_input(src); - if (iwr.input == NULL) { - log(ERROR, "did not understand URL '%s', clients will not get any data.", - src.c_str()); - continue; - } - iwr.refcount = 0; - inputs->insert(make_pair(src, iwr)); + create_config_input(stream_config.src, inputs); + } + for (unsigned i = 0; i < config.udpstreams.size(); ++i) { + const UDPStreamConfig &udpstream_config = config.udpstreams[i]; + create_config_input(udpstream_config.src, inputs); } } @@ -144,6 +152,7 @@ void create_streams(const Config &config, mark_pools.push_back(new MarkPool(mp_config.from, mp_config.to)); } + // HTTP streams. set expecting_urls = deserialized_urls; for (unsigned i = 0; i < config.streams.size(); ++i) { const StreamConfig &stream_config = config.streams[i]; @@ -174,7 +183,7 @@ void create_streams(const Config &config, } } - // Warn about any servers we've lost. + // Warn about any HTTP servers we've lost. // TODO: Make an option (delete=yes?) to actually shut down streams. for (set::const_iterator stream_it = expecting_urls.begin(); stream_it != expecting_urls.end(); @@ -184,6 +193,24 @@ void create_streams(const Config &config, "It will not be deleted, but clients will not get any new inputs.", url.c_str()); } + + // UDP streams. + for (unsigned i = 0; i < config.udpstreams.size(); ++i) { + const UDPStreamConfig &udpstream_config = config.udpstreams[i]; + MarkPool *mark_pool = NULL; + if (udpstream_config.mark_pool != -1) { + mark_pool = mark_pools[udpstream_config.mark_pool]; + } + int stream_index = servers->add_udpstream(udpstream_config.dst, mark_pool); + + string src = udpstream_config.src; + if (!src.empty()) { + multimap::iterator input_it = inputs->find(src); + assert(input_it != inputs->end()); + input_it->second.input->add_destination(stream_index); + ++input_it->second.refcount; + } + } } void open_logs(const vector &log_destinations) @@ -422,6 +449,18 @@ start: stats_thread->run(); } + InputStatsThread *input_stats_thread = NULL; + if (!config.input_stats_file.empty()) { + vector inputs_no_refcount; + for (multimap::iterator input_it = inputs.begin(); + input_it != inputs.end(); ++input_it) { + inputs_no_refcount.push_back(input_it->second.input); + } + + input_stats_thread = new InputStatsThread(config.input_stats_file, config.input_stats_interval, inputs_no_refcount); + input_stats_thread->run(); + } + struct timeval server_start; gettimeofday(&server_start, NULL); if (state_fd != -1) { @@ -440,6 +479,10 @@ start: // OK, we've been HUPed. Time to shut down everything, serialize, and re-exec. gettimeofday(&serialize_start, NULL); + if (input_stats_thread != NULL) { + input_stats_thread->stop(); + delete input_stats_thread; + } if (stats_thread != NULL) { stats_thread->stop(); delete stats_thread;