X-Git-Url: https://git.sesse.net/?p=cubemap;a=blobdiff_plain;f=main.cpp;h=fa63549bd7f8a00b1ce05d55ad09c5f594507cfa;hp=e953a8cea29d79da4579adb06a3580e989598a28;hb=340489a8e732519182ecbc92116e7dfa2997143c;hpb=4f4e1384b4299611924a39f59f536b4964806135 diff --git a/main.cpp b/main.cpp index e953a8c..fa63549 100644 --- a/main.cpp +++ b/main.cpp @@ -36,6 +36,11 @@ vector mark_pools; volatile bool hupped = false; volatile bool stopped = false; +struct InputWithRefcount { + Input *input; + int refcount; +}; + void hup(int signum) { hupped = true; @@ -46,7 +51,7 @@ void hup(int signum) CubemapStateProto collect_state(const timeval &serialize_start, const vector acceptors, - const vector inputs, + const multimap inputs, ServerPool *servers) { CubemapStateProto state = servers->serialize(); // Fills streams() and clients(). @@ -57,8 +62,10 @@ CubemapStateProto collect_state(const timeval &serialize_start, state.add_acceptors()->MergeFrom(acceptors[i]->serialize()); } - for (size_t i = 0; i < inputs.size(); ++i) { - state.add_inputs()->MergeFrom(inputs[i]->serialize()); + for (multimap::const_iterator input_it = inputs.begin(); + input_it != inputs.end(); + ++input_it) { + state.add_inputs()->MergeFrom(input_it->second.input->serialize()); } return state; @@ -98,50 +105,34 @@ vector create_acceptors( } // Find all streams in the configuration file, and create inputs for them. -vector create_inputs(const Config &config, - map *deserialized_inputs) +void create_config_inputs(const Config &config, multimap *inputs) { - vector inputs; for (unsigned i = 0; i < config.streams.size(); ++i) { const StreamConfig &stream_config = config.streams[i]; if (stream_config.src.empty()) { continue; } - string stream_id = stream_config.stream_id; string src = stream_config.src; - - Input *input = NULL; - map::iterator deserialized_input_it = - deserialized_inputs->find(stream_id); - if (deserialized_input_it != deserialized_inputs->end()) { - input = deserialized_input_it->second; - if (input->get_url() != src) { - log(INFO, "Stream '%s' has changed URL from '%s' to '%s', restarting input.", - stream_id.c_str(), input->get_url().c_str(), src.c_str()); - input->close_socket(); - delete input; - input = NULL; - } - deserialized_inputs->erase(deserialized_input_it); + if (inputs->count(src) != 0) { + continue; } - if (input == NULL) { - input = create_input(stream_id, src); - if (input == NULL) { - log(ERROR, "did not understand URL '%s', clients will not get any data.", - src.c_str()); - continue; - } + + InputWithRefcount iwr; + iwr.input = create_input(src); + if (iwr.input == NULL) { + log(ERROR, "did not understand URL '%s', clients will not get any data.", + src.c_str()); + continue; } - input->run(); - inputs.push_back(input); + iwr.refcount = 0; + inputs->insert(make_pair(src, iwr)); } - return inputs; } void create_streams(const Config &config, const set &deserialized_stream_ids, - map *deserialized_inputs) + multimap *inputs) { for (unsigned i = 0; i < config.mark_pools.size(); ++i) { const MarkPoolConfig &mp_config = config.mark_pools[i]; @@ -162,6 +153,14 @@ void create_streams(const Config &config, servers->set_mark_pool(stream_config.stream_id, mark_pools[stream_config.mark_pool]); } + + string src = stream_config.src; + if (!src.empty()) { + multimap::iterator input_it = inputs->find(src); + assert(input_it != inputs->end()); + input_it->second.input->add_destination(stream_config.stream_id); + ++input_it->second.refcount; + } } // Warn about any servers we've lost. @@ -173,10 +172,6 @@ void create_streams(const Config &config, log(WARNING, "stream '%s' disappeared from the configuration file. " "It will not be deleted, but clients will not get any new inputs.", stream_id.c_str()); - if (deserialized_inputs->count(stream_id) != 0) { - delete (*deserialized_inputs)[stream_id]; - deserialized_inputs->erase(stream_id); - } } } @@ -324,8 +319,8 @@ start: CubemapStateProto loaded_state; struct timeval serialize_start; set deserialized_stream_ids; - map deserialized_inputs; map deserialized_acceptors; + multimap inputs; // multimap due to older versions without deduplication. if (state_fd != -1) { log(INFO, "Deserializing state from previous process..."); string serialized; @@ -346,11 +341,12 @@ start: deserialized_stream_ids.insert(loaded_state.streams(i).stream_id()); } - // Deserialize the inputs. Note that we don't actually add them to any state yet. + // Deserialize the inputs. Note that we don't actually add them to any stream yet. for (int i = 0; i < loaded_state.inputs_size(); ++i) { - deserialized_inputs.insert(make_pair( - loaded_state.inputs(i).stream_id(), - create_input(loaded_state.inputs(i)))); + InputWithRefcount iwr; + iwr.input = create_input(loaded_state.inputs(i)); + iwr.refcount = 0; + inputs.insert(make_pair(loaded_state.inputs(i).url(), iwr)); } // Deserialize the acceptors. @@ -363,14 +359,12 @@ start: log(INFO, "Deserialization done."); } - // Find all streams in the configuration file, and create them. - create_streams(config, deserialized_stream_ids, &deserialized_inputs); - - vector acceptors = create_acceptors(config, &deserialized_acceptors); - vector inputs = create_inputs(config, &deserialized_inputs); + // Add any new inputs coming from the config. + create_config_inputs(config, &inputs); - // All deserialized inputs should now have been taken care of, one way or the other. - assert(deserialized_inputs.empty()); + // Find all streams in the configuration file, create them, and connect to the inputs. + create_streams(config, deserialized_stream_ids, &inputs); + vector acceptors = create_acceptors(config, &deserialized_acceptors); // Put back the existing clients. It doesn't matter which server we // allocate them to, so just do round-robin. However, we need to add @@ -381,6 +375,20 @@ start: servers->run(); + // Now delete all inputs that are longer in use, and start the others. + for (multimap::iterator input_it = inputs.begin(); + input_it != inputs.end(); ) { + if (input_it->second.refcount == 0) { + log(WARNING, "Input '%s' no longer in use, closing.", + input_it->first.c_str()); + delete input_it->second.input; + inputs.erase(input_it++); + } else { + input_it->second.input->run(); + ++input_it; + } + } + // Start writing statistics. StatsThread *stats_thread = NULL; if (!config.stats_file.empty()) { @@ -413,8 +421,10 @@ start: for (size_t i = 0; i < acceptors.size(); ++i) { acceptors[i]->stop(); } - for (size_t i = 0; i < inputs.size(); ++i) { - inputs[i]->stop(); + for (multimap::iterator input_it = inputs.begin(); + input_it != inputs.end(); + ++input_it) { + input_it->second.input->stop(); } servers->stop();