X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=main.cpp;h=8d60826fb3d7a8436607446f27b572ba294bc237;hp=fa63549bd7f8a00b1ce05d55ad09c5f594507cfa;hb=adbeb2f8972672ed1059509662d006df47762228;hpb=340489a8e732519182ecbc92116e7dfa2997143c diff --git a/main.cpp b/main.cpp index fa63549..8d60826 100644 --- a/main.cpp +++ b/main.cpp @@ -16,8 +16,8 @@ #include #include -#include "accesslog.h" #include "acceptor.h" +#include "accesslog.h" #include "config.h" #include "input.h" #include "log.h" @@ -25,6 +25,7 @@ #include "serverpool.h" #include "state.pb.h" #include "stats.h" +#include "stream.h" #include "util.h" #include "version.h" @@ -49,6 +50,10 @@ void hup(int signum) } } +void do_nothing(int signum) +{ +} + CubemapStateProto collect_state(const timeval &serialize_start, const vector acceptors, const multimap inputs, @@ -104,34 +109,41 @@ vector create_acceptors( return acceptors; } +void create_config_input(const string &src, multimap *inputs) +{ + if (src.empty()) { + return; + } + if (inputs->count(src) != 0) { + return; + } + + InputWithRefcount iwr; + iwr.input = create_input(src); + if (iwr.input == NULL) { + log(ERROR, "did not understand URL '%s', clients will not get any data.", + src.c_str()); + return; + } + iwr.refcount = 0; + inputs->insert(make_pair(src, iwr)); +} + // Find all streams in the configuration file, and create inputs for them. void create_config_inputs(const Config &config, multimap *inputs) { for (unsigned i = 0; i < config.streams.size(); ++i) { const StreamConfig &stream_config = config.streams[i]; - if (stream_config.src.empty()) { - continue; - } - - string src = stream_config.src; - if (inputs->count(src) != 0) { - continue; - } - - InputWithRefcount iwr; - iwr.input = create_input(src); - if (iwr.input == NULL) { - log(ERROR, "did not understand URL '%s', clients will not get any data.", - src.c_str()); - continue; - } - iwr.refcount = 0; - inputs->insert(make_pair(src, iwr)); + create_config_input(stream_config.src, inputs); + } + for (unsigned i = 0; i < config.udpstreams.size(); ++i) { + const UDPStreamConfig &udpstream_config = config.udpstreams[i]; + create_config_input(udpstream_config.src, inputs); } } void create_streams(const Config &config, - const set &deserialized_stream_ids, + const set &deserialized_urls, multimap *inputs) { for (unsigned i = 0; i < config.mark_pools.size(); ++i) { @@ -139,39 +151,64 @@ void create_streams(const Config &config, mark_pools.push_back(new MarkPool(mp_config.from, mp_config.to)); } - set expecting_stream_ids = deserialized_stream_ids; + // HTTP streams. + set expecting_urls = deserialized_urls; for (unsigned i = 0; i < config.streams.size(); ++i) { const StreamConfig &stream_config = config.streams[i]; - if (deserialized_stream_ids.count(stream_config.stream_id) == 0) { - servers->add_stream(stream_config.stream_id, stream_config.backlog_size); + int stream_index; + if (deserialized_urls.count(stream_config.url) == 0) { + stream_index = servers->add_stream(stream_config.url, + stream_config.backlog_size, + Stream::Encoding(stream_config.encoding)); } else { - servers->set_backlog_size(stream_config.stream_id, stream_config.backlog_size); + stream_index = servers->lookup_stream_by_url(stream_config.url); + assert(stream_index != -1); + servers->set_backlog_size(stream_index, stream_config.backlog_size); + servers->set_encoding(stream_index, + Stream::Encoding(stream_config.encoding)); } - expecting_stream_ids.erase(stream_config.stream_id); + expecting_urls.erase(stream_config.url); if (stream_config.mark_pool != -1) { - servers->set_mark_pool(stream_config.stream_id, - mark_pools[stream_config.mark_pool]); + servers->set_mark_pool(stream_index, mark_pools[stream_config.mark_pool]); } string src = stream_config.src; if (!src.empty()) { multimap::iterator input_it = inputs->find(src); assert(input_it != inputs->end()); - input_it->second.input->add_destination(stream_config.stream_id); + input_it->second.input->add_destination(stream_index); ++input_it->second.refcount; } } - // Warn about any servers we've lost. + // Warn about any HTTP servers we've lost. // TODO: Make an option (delete=yes?) to actually shut down streams. - for (set::const_iterator stream_it = expecting_stream_ids.begin(); - stream_it != expecting_stream_ids.end(); + for (set::const_iterator stream_it = expecting_urls.begin(); + stream_it != expecting_urls.end(); ++stream_it) { - string stream_id = *stream_it; + string url = *stream_it; log(WARNING, "stream '%s' disappeared from the configuration file. " "It will not be deleted, but clients will not get any new inputs.", - stream_id.c_str()); + url.c_str()); + } + + // UDP streams. + for (unsigned i = 0; i < config.udpstreams.size(); ++i) { + const UDPStreamConfig &udpstream_config = config.udpstreams[i]; + MarkPool *mark_pool = NULL; + if (udpstream_config.mark_pool != -1) { + mark_pool = mark_pools[udpstream_config.mark_pool]; + } + int stream_index = servers->add_udpstream(udpstream_config.dst, mark_pool); + + string src = udpstream_config.src; + if (!src.empty()) { + multimap::iterator input_it = inputs->find(src); + assert(input_it != inputs->end()); + input_it->second.input->add_destination(stream_index); + ++input_it->second.refcount; + } } } @@ -234,6 +271,7 @@ int main(int argc, char **argv) { signal(SIGHUP, hup); signal(SIGINT, hup); + signal(SIGUSR1, do_nothing); // Used in internal signalling. signal(SIGPIPE, SIG_IGN); // Parse options. @@ -318,7 +356,7 @@ start: CubemapStateProto loaded_state; struct timeval serialize_start; - set deserialized_stream_ids; + set deserialized_urls; map deserialized_acceptors; multimap inputs; // multimap due to older versions without deduplication. if (state_fd != -1) { @@ -337,8 +375,21 @@ start: // Deserialize the streams. for (int i = 0; i < loaded_state.streams_size(); ++i) { - servers->add_stream_from_serialized(loaded_state.streams(i)); - deserialized_stream_ids.insert(loaded_state.streams(i).stream_id()); + const StreamProto &stream = loaded_state.streams(i); + + vector data_fds; + for (int j = 0; j < stream.data_fds_size(); ++j) { + data_fds.push_back(stream.data_fds(j)); + } + + // Older versions stored the data once in the protobuf instead of + // sending around file descriptors. + if (data_fds.empty() && stream.has_data()) { + data_fds.push_back(make_tempfile(stream.data())); + } + + servers->add_stream_from_serialized(stream, data_fds); + deserialized_urls.insert(stream.url()); } // Deserialize the inputs. Note that we don't actually add them to any stream yet. @@ -363,7 +414,7 @@ start: create_config_inputs(config, &inputs); // Find all streams in the configuration file, create them, and connect to the inputs. - create_streams(config, deserialized_stream_ids, &inputs); + create_streams(config, deserialized_urls, &inputs); vector acceptors = create_acceptors(config, &deserialized_acceptors); // Put back the existing clients. It doesn't matter which server we @@ -381,6 +432,7 @@ start: if (input_it->second.refcount == 0) { log(WARNING, "Input '%s' no longer in use, closing.", input_it->first.c_str()); + input_it->second.input->close_socket(); delete input_it->second.input; inputs.erase(input_it++); } else {